SWC-DB v0.5.12 C++ documentation
SWC-DB© (Super Wide Column Database) - High Performance Scalable Database (https://github.com/kashirin-alex/swc-db)
CellStoreBlock.cc
Go to the documentation of this file.
1 /*
2  * SWC-DB© Copyright since 2019 Alex Kashirin <kashirin.alex@gmail.com>
3  * License details at <https://github.com/kashirin-alex/swc-db/#license>
4  */
5 
6 
8 
9 
10 namespace SWC { namespace Ranger { namespace CellStore { namespace Block {
11 
12 
13 
14 const char* Read::to_string(const Read::State state) noexcept {
15  switch(state) {
16  case State::LOADED:
17  return "LOADED";
18  case State::LOADING:
19  return "LOADING";
20  default:
21  return "NONE";
22  }
23 }
24 
25 /*
26 Read::Ptr Read::make(int& err,
27  CellStore::Read* cellstore,
28  const DB::Cells::Interval& interval,
29  const uint64_t offset) {
30  Header header(interval.key_seq);
31  header.interval.copy(interval);
32  header.offset_data = offset;
33  load_header(err, cellstore->smartfd, header);
34  return err ? nullptr : new Read(cellstore, header);
35 }
36 */
37 
// Reads Header::SIZE bytes at header.offset_data from the file behind
// `smartfd` and decodes them into `header`.
//
// err     - out: reset to Error::OK on entry; holds the last error on return.
// smartfd - file descriptor; opened here when it is not valid.
// header  - in: offset_data selects the read position; out: decoded fields.
//
// The loop keeps retrying until an attempt reaches the final return, the
// error becomes Error::FS_EOF, or the open attempt fails (see below).
//
// NOTE(review): this listing omits source lines 46, 63, 68, 70 and 73, so
// the statements they hold are not visible here.
38 void Read::load_header(int& err, FS::SmartFd::Ptr& smartfd,
39  Header& header) {
40  const auto& fs_if = Env::FsInterface::interface();
41  const auto& fs = Env::FsInterface::fs();
42  err = Error::OK;
43  while(err != Error::FS_EOF) {
44 
// Retry path: a previous pass left an error, so report it, close the
// descriptor and start again with a clean error state.
// NOTE(review): the opener of the logging statement (line 46) is omitted;
// presumably a SWC_LOG_OUT(...) invocation - confirm against the real file.
45  if(err) {
47  Error::print(SWC_LOG_OSTREAM << "Retrying to ", err);
48  smartfd->print(SWC_LOG_OSTREAM << ' ');
49  SWC_LOG_OSTREAM << " blk-offset=" << header.offset_data;
50  );
51  fs_if->close(err, smartfd);
52  err = Error::OK;
53  }
54 
// Give up only when the descriptor is not valid, open() returns false and
// an error is set; any other error goes back through the retry path.
55  if(!smartfd->valid() && !fs_if->open(err, smartfd) && err)
56  return;
57  if(err)
58  continue;
59 
// NOTE(review): the rest of the pread() condition (line 63) is omitted.
60  uint8_t buf[Header::SIZE];
61  const uint8_t *ptr = buf;
62  if(fs->pread(err, smartfd, header.offset_data, buf,
64  continue;
65 
// Decode the header fields, then read a trailing 32-bit value that is
// passed together with `buf` and Header::SIZE - 4 to a call whose name is on
// the omitted line 68 - presumably the header checksum check
// (Core::checksum_i32_chk is in this page's cross-reference index); confirm.
// The statement on the omitted line 70 runs before this `continue`.
66  size_t remain = Header::SIZE;
67  header.decode(&ptr, &remain);
69  Serialization::decode_i32(&ptr, &remain), buf, Header::SIZE - 4)) {
71  continue;
72  }
74  return;
75  }
76 }
77 
79 Read::Read(Header&& a_header) noexcept
80  : header(std::move(a_header)), cellstore(nullptr),
81  m_mutex(),
82  m_state(header.size_plain ? State::NONE : State::LOADED),
83  m_err(Error::OK), m_cells_remain(header.cells_count),
84  m_processing(0), m_buffer(), m_queue() {
85 }
86 
88 void Read::init(CellStore::Read* _cellstore) noexcept {
89  cellstore = _cellstore;
90 }
91 
// Destructor: performs one extra step only when the block is in the LOADED
// state.
// NOTE(review): the guarded statement (source line 94) is omitted from this
// listing; presumably it adjusts resource accounting (the cross-reference
// index lists Resources::less_mem_releasable) - confirm against the real file.
92 Read::~Read() noexcept {
93  if(m_state == State::LOADED)
95 }
96 
97 /*
98 size_t Read::size_of() const noexcept {
99  return sizeof(*this) + sizeof(Ptr) +
100  header.interval.size_of_internal();
101 }
102 */
103 
// Requests this block on behalf of `loader`.
//
// Returns true only on the State::NONE branch; returns false when the block
// is already LOADING or LOADED. In the LOADED case the loader is notified
// immediately through loader->loaded(this).
//
// NOTE(review): this listing omits source lines 106, 109, 110, 116 and 117.
// Presumably lines 109-110 take m_mutex and update `at` with the current
// state (the cross-reference index lists compare_exchange_weak), and lines
// 116-117 hold the work done on the NONE branch - confirm against the file.
105 bool Read::load(BlockLoader* loader) {
107  auto at(State::NONE);
108  {
// Every loader that did not observe LOADED is queued; load_finish() drains
// m_queue and calls loaded() on each entry.
111  if(at != State::LOADED)
112  m_queue.push(loader);
113  }
114  switch(at) {
115  case State::NONE: {
118  return true;
119  }
120  case State::LOADING:
121  return false;
122  default: //case State::LOADED:
123  loader->loaded(this);
124  return false;
125  }
126 }
127 
// Starts the load: hands a small callable to Env::Rgr::post() that invokes
// load_open(Error::OK) on this block.
// NOTE(review): source line 132 (inside Task, before its constructor) is
// omitted from this listing.
129 void Read::load() {
// Callable carrying only the raw pointer to this block.
130  struct Task {
131  Read* ptr;
133  Task(Read* a_ptr) noexcept : ptr(a_ptr) { }
134  void operator()() { ptr->load_open(Error::OK); }
135  };
136  Env::Rgr::post(Task(this));
137 }
138 
// Passes this block's buffer to cells_block->load_cells() and then drops one
// processing reference.
//
// The unnamed int& parameter is not used. `remain_hint` is 0 when
// m_buffer.size is zero; otherwise it is derived from the load_cells() call.
//
// NOTE(review): this listing omits source lines 143, 145 and 146 (part of the
// conditional expression and the leading load_cells() arguments), so how the
// hint is computed from the call's result is not visible here.
140 void Read::load_cells(int&, Ranger::Block::Ptr cells_block) {
141  bool was_splitted = false;
142  ssize_t remain_hint = m_buffer.size
144  cells_block->load_cells(
147  was_splitted,
148  true
149  )
150  )
151  : 0;
152 
// Release the buffer when fetch_sub(1) yields 1 (presumably the value before
// the decrement, as with std::atomic - confirm), the target block was not
// split, and either the hint is not positive or the resource manager reports
// a need for RAM of header.size_plain bytes.
153  if(m_processing.fetch_sub(1) == 1 &&
154  !was_splitted &&
155  (remain_hint <= 0 || Env::Rgr::res().need_ram(header.size_plain)))
156  release();
157 }
158 
// NOTE(review): the only body line (source line 161) is omitted from this
// listing. Presumably it decrements m_processing (the cross-reference index
// lists Core::Atomic::fetch_sub and sub_rslt) - confirm against the real file.
160 void Read::processing_decrement() noexcept {
162 }
163 
// Tries to free the loaded buffer.
//
// Returns header.size_plain when the buffer was freed, otherwise 0.
//
// NOTE(review): this listing omits source lines 172, 175 and 180: the second
// half of the inner condition, a statement after m_buffer.free(), and the
// statement guarded by `if(released)`.
165 size_t Read::release() {
166  size_t released = 0;
167  bool support;
// Cheap pre-checks before attempting the lock: the block has data, nothing is
// processing it, it is loaded, and the mutex can be taken without waiting.
168  if(header.size_plain && !m_processing && loaded() &&
169  m_mutex.try_full_lock(support)) {
// Re-check under the lock. `at` presumably serves as the expected value of a
// compare-exchange on m_state on the omitted line 172 - confirm.
170  State at = State::LOADED;
171  if(m_queue.empty() && !m_processing &&
173  released += header.size_plain;
174  m_buffer.free();
176  }
177  m_mutex.unlock(support);
178  }
// The guarded statement (line 180) is omitted; presumably resource accounting
// (the index lists Resources::less_mem_releasable) - confirm.
179  if(released)
181  return released;
182 }
183 
185 bool Read::processing() noexcept {
186  bool support;
187  bool busy = m_processing ||
188  m_state == State::LOADING ||
189  !m_mutex.try_full_lock(support);
190  if(!busy) {
191  busy = m_state == State::LOADING ||
192  !m_queue.empty() ||
193  m_processing;
194  m_mutex.unlock(support);
195  }
196  return busy;
197 }
198 
199 
201 bool Read::loaded() const noexcept {
202  return m_state == State::LOADED;
203 }
204 
206 bool Read::loaded(int& err) noexcept {
207  Core::MutexSptd::scope lock(m_mutex);
208  return !(err = m_err) && loaded();
209 }
210 
212 size_t Read::size_bytes(bool only_loaded) const noexcept {
213  return only_loaded && !loaded() ? 0 : header.size_plain;
214 }
215 
217 size_t Read::size_bytes_enc(bool only_loaded) const noexcept {
218  return only_loaded && !loaded() ? 0 : header.size_enc;
219 }
220 
// Writes a one-line description of the block to `out`: the header, the
// state name, the processing counter, the loader-queue length and, when set,
// the stored error.
// NOTE(review): source line 227 (first line of the inner scope) is omitted
// from this listing; presumably it takes m_mutex before m_queue and m_err are
// read - confirm against the real file.
221 void Read::print(std::ostream& out) {
222  out << "Block(";
223  header.print(out);
224  out << " state=" << to_string(m_state)
225  << " processing=" << m_processing.load();
226  {
228  out << " queue=" << m_queue.size();
229  if(m_err)
230  Error::print(out << ' ', m_err);
231  }
232  out << ')';
233 }
234 
// One step of the asynchronous load: dispatches on the error from the
// previous step, then either reads the block data or opens the file first.
//
// err - Error::OK to proceed; some codes end the load through load_finish();
//       any other code is logged and retried.
//
// NOTE(review): this listing omits source lines 237, 238, 243, 249, 251, 284
// and 288: the case labels in front of `return load_finish(err)`, the opener
// of the logging statement, the call that receives the retry callback, and
// the trailing arguments of pread() and open().
235 void Read::load_open(int err) {
// The labels that lead to load_finish() are on the omitted lines 237-238;
// presumably Error::FS_PATH_NOT_FOUND and Error::SERVER_SHUTTING_DOWN (both
// appear in the cross-reference index) - confirm.
236  switch(err) {
239  return load_finish(err);
240  case Error::OK:
241  break;
242  default: {
244  Error::print(SWC_LOG_OSTREAM << "Retrying to ", err);
245  cellstore->smartfd->print(SWC_LOG_OSTREAM << ' ');
246  print(SWC_LOG_OSTREAM << ' ');
247  );
// With a descriptor that is still valid, an asynchronous call (its name is on
// the omitted line 249) is issued and its callback restarts load_open() with
// Error::OK. Otherwise control falls out of the switch to the open() below.
248  if(cellstore->smartfd->valid()) {
250  [this](int) { load_open(Error::OK); },
252  );
253  return;
254  }
255  //std::this_thread::sleep_for(std::chrono::microseconds(10000));
256  }
257  }
258 
// Valid descriptor: issue pread(); its callback moves the result into a Task
// and posts it through Env::Rgr::post(), where it calls load_read().
// Invalid descriptor: issue open(); its callback re-enters load_open() with
// the open result.
259  cellstore->smartfd->valid()
260  ? Env::FsInterface::fs()->pread(
261  [ptr=this](int _err, StaticBuffer&& buff) {
262  struct Task {
263  Read* ptr;
264  int error;
265  StaticBuffer buffer;
267  Task(Read* a_ptr, int a_error, StaticBuffer&& a_buffer)
268  noexcept : ptr(a_ptr), error(a_error),
269  buffer(std::move(a_buffer)) {
270  }
272  Task(Task&& other) noexcept
273  : ptr(other.ptr), error(other.error),
274  buffer(std::move(other.buffer)) {
275  }
276  Task(const Task&) = delete;
277  Task& operator=(Task&&) = delete;
278  Task& operator=(const Task&) = delete;
279  ~Task() noexcept { }
280  void operator()() { ptr->load_read(error, std::move(buffer)); }
281  };
282  Env::Rgr::post(Task(ptr, _err, std::move(buff)));
283  },
285  )
286  : Env::FsInterface::fs()->open(
287  [this](int _err) { load_open(_err); },
289  );
290 }
291 
// Handles the result of the asynchronous read: validates the data, decodes
// it when the block is not stored PLAIN, and keeps the result in m_buffer.
//
// err    - error reported by the read.
// buffer - bytes read; expected to be header.size_enc long.
//
// On any error the load is retried through load_open(err); on success it
// completes through load_finish(Error::OK).
//
// NOTE(review): this listing omits source lines 294, 296 and 304: the start
// of the first condition, the statement on its true branch, and the name of
// the call that receives the decode arguments. Presumably these are the data
// checksum check (Core::checksum_i32_chk), an assignment of
// Error::CHECKSUM_MISMATCH, and Core::Encoder::decode - all three names are
// in the cross-reference index; confirm against the real file.
//
// NOTE(review): as listed, the first condition is given buffer.base and
// header.size_enc before buffer.size is compared with header.size_enc -
// confirm a short read cannot reach that first check.
292 void Read::load_read(int err, StaticBuffer&& buffer) {
293  if(!err) {
295  buffer.base, header.size_enc)) {
297 
298  } else if(buffer.size != header.size_enc) {
299  err = Error::FS_EOF;
300 
301  } else {
// Encoded block: decode into a fresh buffer of header.size_plain bytes and
// keep it only when decoding reported no error. PLAIN block: keep the bytes
// as read.
302  if(header.encoder != DB::Types::Encoder::PLAIN) {
303  StaticBuffer decoded_buf(static_cast<size_t>(header.size_plain));
305  err, header.encoder,
306  buffer.base, header.size_enc,
307  decoded_buf.base, header.size_plain
308  );
309  if(!err)
310  m_buffer.set(decoded_buf);
311  } else {
312  m_buffer.set(buffer);
313  }
314  }
315  }
316 
317  err ? load_open(err) : load_finish(Error::OK);
318 }
319 
// Completes a load attempt: records the outcome, asks the cell store to run
// its queued work, and notifies every queued loader.
//
// err - final error of the load; Error::OK on success.
//
// NOTE(review): this listing omits source lines 321, 323, 335, 340, 344 and
// 352. Presumably 323 opens the logging statement, 335 and 352 take m_mutex
// for the scopes they start, and 340 is the statement guarded by `if(err)`
// (it does not guard the Task declaration that follows it here) - confirm
// against the real file.
320 void Read::load_finish(int err) {
// Failure path: report the error and drop the buffer. A missing file
// (FS_PATH_NOT_FOUND) is turned into Error::OK, so the block ends up LOADED
// with its buffer freed.
322  if(err) {
324  Error::print(SWC_LOG_OSTREAM << "CellStore::Block load ", err);
325  cellstore->smartfd->print(SWC_LOG_OSTREAM << ' ');
326  print(SWC_LOG_OSTREAM << ' ');
327  );
328  if(err == Error::FS_PATH_NOT_FOUND)
329  err = Error::OK;
330 
331  m_buffer.free();
332  }
333 
// Publish the result: NONE on error (the block may be requested again),
// LOADED otherwise, together with the error code.
334  {
336  m_state.store(err ? State::NONE : State::LOADED);
337  m_err = err;
338  }
339  if(err)
341 
// Post a callable that invokes the cell store's _run_queued().
342  struct Task {
343  Read* ptr;
345  Task(Read* a_ptr) noexcept : ptr(a_ptr) { }
346  void operator()() { ptr->cellstore->_run_queued(); }
347  };
348  Env::Rgr::post(Task(this));
349 
// Drain the loader queue one entry at a time; loaded() is called after the
// inner scope that pops the entry has closed.
350  for(BlockLoader* loader;;) {
351  {
353  if(m_queue.empty())
354  break;
355  loader = m_queue.front();
356  m_queue.pop();
357  }
358  loader->loaded(this);
359  }
360 }
361 
362 
363 
364 
365 
367 Write::Write(Header&& a_header) noexcept : header(std::move(a_header)) { }
368 
// Encodes the cells buffer into `output` and writes the block header at the
// start of `output`.
//
// err    - out: error reported by the encoding step; on error the function
//          returns before the header is written.
// cells  - plain cell data; its fill() becomes header.size_plain.
// output - destination buffer; the call ending on line 375 is given
//          Header::SIZE as an argument, and the header is then encoded at
//          output.base.
// header - in/out: size_plain, size_enc and possibly encoder are updated
//          before it is serialised.
//
// NOTE(review): this listing omits source lines 374 and 380. Presumably 374
// starts the Core::Encoder::encode call (listed in the cross-reference index,
// where that argument is named `reserve`) and 380 sets header.size_enc for
// the PLAIN fallback - confirm against the real file.
370 void Write::encode(int& err, DynamicBuffer& cells, DynamicBuffer& output,
371  Header& header) {
372  header.size_plain = cells.fill();
373  size_t len_enc = 0;
375  &len_enc, output, Header::SIZE);
376  if(err)
377  return;
// A zero encoded length switches the header to the PLAIN encoder; otherwise
// the encoded length is recorded.
378  if(!len_enc) {
379  header.encoder = DB::Types::Encoder::PLAIN;
381  } else {
382  header.size_enc = len_enc;
383  }
384 
385  uint8_t* ptr = output.base;
386  header.encode(&ptr);
387 }
388 
389 void Write::print(std::ostream& out) const {
390  header.print(out << "Block(");
391  out << ')';
392 }
393 
394 
395 
396 }}}} // namespace SWC::Ranger::CellStore::Block
397 
SWC::Ranger::CellStore::Block::Write::Write
Write(Header &&header) noexcept
Definition: CellStoreBlock.cc:367
SWC::Ranger::CellStore::Block::Header
Definition: CellStoreBlockHeader.h:23
SWC::Core::AtomicBase::compare_exchange_weak
constexpr SWC_CAN_INLINE bool compare_exchange_weak(T &at, T value) noexcept
Definition: Atomic.h:52
SWC::Env::Rgr::res
static SWC_CAN_INLINE System::Resources & res() noexcept
Definition: RangerEnv.h:131
SWC::Ranger::CellStore::Block::Read::load_cells
void load_cells(int &err, Ranger::Block::Ptr cells_block)
Definition: CellStoreBlock.cc:140
SWC_LOG_OSTREAM
#define SWC_LOG_OSTREAM
Definition: Logger.h:44
SWC::Ranger::CellStore::Block::Write::encode
static void encode(int &err, DynamicBuffer &cells, DynamicBuffer &output, Header &header)
Definition: CellStoreBlock.cc:370
SWC::Ranger::Block::load_cells
size_t load_cells(const uint8_t *buf, size_t remain, uint32_t revs, size_t avail, bool &was_splitted, bool synced=false)
Definition: RangeBlock.cc:209
SWC::Ranger::Block
Definition: RangeBlock.h:23
SWC::Ranger::CellStore::Block::Header::offset_data
uint64_t offset_data
Definition: CellStoreBlockHeader.h:29
SWC::Error::SERVER_SHUTTING_DOWN
@ SERVER_SHUTTING_DOWN
Definition: Error.h:84
SWC_LOG_OUT
#define SWC_LOG_OUT(pr, _code_)
Definition: Logger.h:178
SWC::Ranger::CellStore::Block::Read::~Read
~Read() noexcept
Definition: CellStoreBlock.cc:92
SWC::Ranger::CellStore::Block::Read::m_state
Core::Atomic< State > m_state
Definition: CellStoreBlock.h:98
SWC::Env::FsInterface::interface
static SWC_CAN_INLINE FS::Interface::Ptr & interface() noexcept
Definition: Interface.h:150
SWC::System::Resources::more_mem_future
SWC_CAN_INLINE void more_mem_future(size_t sz) noexcept
Definition: Resources.h:104
SWC::Ranger::CellStore::Block::Read::load_finish
void load_finish(int err)
Definition: CellStoreBlock.cc:320
SWC::Ranger::CellStore::Block::Read::State
State
Definition: CellStoreBlock.h:32
SWC::Core::checksum_i32_chk
SWC_CAN_INLINE bool checksum_i32_chk(uint32_t checksum, const uint8_t *base, uint32_t len)
Definition: Checksum.h:94
SWC::Ranger::CellStore::Block::Read::operator=
Read & operator=(const Read &)=delete
SWC::Core::MutexSptd::scope
Definition: MutexSptd.h:96
SWC::Error::FS_PATH_NOT_FOUND
@ FS_PATH_NOT_FOUND
Definition: Error.h:97
SWC::Ranger::CellStore::Block::Read::m_err
int m_err
Definition: CellStoreBlock.h:99
SWC::Ranger::BlockLoader::loaded
void loaded(CellStore::Block::Read::Ptr &&blk)
Definition: RangeBlockLoader.cc:114
SWC::Ranger::CellStore::Block::Read::header
const Header header
Definition: CellStoreBlock.h:49
SWC::Ranger::CellStore::Block::Read::m_buffer
StaticBuffer m_buffer
Definition: CellStoreBlock.h:102
SWC::Ranger::CellStore::Block::Read::m_cells_remain
Core::Atomic< uint32_t > m_cells_remain
Definition: CellStoreBlock.h:100
SWC::Core::MutexSptd::unlock
SWC_CAN_INLINE void unlock(const bool &support) noexcept
Definition: MutexSptd.h:71
SWC::Ranger::CellStore::Block::Read::processing_decrement
void processing_decrement() noexcept
Definition: CellStoreBlock.cc:160
SWC::Core::Atomic::sub_rslt
constexpr SWC_CAN_INLINE T sub_rslt(T v) noexcept
Definition: Atomic.h:115
SWC::Ranger::CellStore::Block::Write::print
void print(std::ostream &out) const
Definition: CellStoreBlock.cc:389
SWC::Ranger::CellStore::Block::Write::header
const Header header
Definition: CellStoreBlock.h:122
SWC::Ranger::CellStore::Block::Read::size_bytes_enc
size_t size_bytes_enc(bool only_loaded=false) const noexcept
Definition: CellStoreBlock.cc:217
SWC::Ranger::CellStore::Block::Header::SIZE
static const uint8_t SIZE
Definition: CellStoreBlockHeader.h:24
SWC::Ranger::CellStore::Block::Read::load_open
void load_open(int err)
Definition: CellStoreBlock.cc:235
SWC::Ranger::CellStore::Block::Read::size_bytes
size_t size_bytes(bool only_loaded=false) const noexcept
Definition: CellStoreBlock.cc:212
SWC::Core::AtomicBase::store
constexpr SWC_CAN_INLINE void store(T v) noexcept
Definition: Atomic.h:37
SWC::Ranger::CellStore::Block::Header::cells_count
uint32_t cells_count
Definition: CellStoreBlockHeader.h:36
SWC::Error::OK
@ OK
Definition: Error.h:45
SWC::System::Resources::more_mem_releasable
SWC_CAN_INLINE void more_mem_releasable(size_t sz) noexcept
Definition: Resources.h:114
SWC_CAN_INLINE
#define SWC_CAN_INLINE
Definition: Compat.h:102
SWC::Error::FS_EOF
@ FS_EOF
Definition: Error.h:96
CellStoreBlock.h
SWC::System::Resources::less_mem_releasable
SWC_CAN_INLINE void less_mem_releasable(size_t sz) noexcept
Definition: Resources.h:119
SWC::Core::Encoder::decode
void decode(int &err, Type encoder, const uint8_t *src, size_t sz_enc, uint8_t *dst, size_t sz)
Definition: Encoder.cc:99
SWC::Ranger::CellStore::Block::Read::Read
Read(Header &&header) noexcept
Definition: CellStoreBlock.cc:79
SWC::Core::Buffer::base
value_type * base
Definition: Buffer.h:131
SWC::Ranger::CellStore::Block::Read::init
void init(CellStore::Read *cellstore) noexcept
Definition: CellStoreBlock.cc:88
SWC
The SWC-DB C++ namespace 'SWC'.
Definition: main.cc:12
SWC::Core::BufferDyn< StaticBuffer >
SWC::Ranger::CellStore::Read::cell_revs
const uint32_t cell_revs
Definition: CellStore.h:73
SWC::Core::Buffer
Definition: Buffer.h:18
SWC::System::Resources::need_ram
SWC_CAN_INLINE size_t need_ram() const noexcept
Definition: Resources.h:89
SWC::Core::Buffer::size
size_t size
Definition: Buffer.h:130
SWC::Env::FsInterface::fs
static SWC_CAN_INLINE FS::FileSystem::Ptr & fs() noexcept
Definition: Interface.h:155
SWC::Ranger::CellStore::Read
Definition: CellStore.h:41
SWC::Ranger::CellStore::Read::smartfd
FS::SmartFd::Ptr smartfd
Definition: CellStore.h:74
SWC::Ranger::CellStore::Block::Read::m_queue
std::queue< BlockLoader * > m_queue
Definition: CellStoreBlock.h:103
SWC::Ranger::BlockLoader
Definition: RangeBlockLoader.h:19
SWC::Ranger::CellStore::Block::Read
Definition: CellStoreBlock.h:28
SWC::Ranger::CellStore::Block::Read::load_header
static void load_header(int &err, FS::SmartFd::Ptr &smartfd, Header &header)
Definition: CellStoreBlock.cc:38
SWC::Ranger::CellStore::Block::Read::m_mutex
Core::MutexSptd m_mutex
Definition: CellStoreBlock.h:97
SWC::Ranger::CellStore::Block::Read::m_processing
Core::Atomic< size_t > m_processing
Definition: CellStoreBlock.h:101
SWC::Ranger::CellStore::Block::Header::decode
void decode(const uint8_t **bufp, size_t *remainp)
Definition: CellStoreBlockHeader.cc:65
SWC::Ranger::CellStore::Read::_run_queued
void _run_queued()
Definition: CellStore.cc:311
SWC::Ranger::CellStore::Block::Header::checksum_data
uint32_t checksum_data
Definition: CellStoreBlockHeader.h:37
SWC::LOG_ERROR
@ LOG_ERROR
Definition: Logger.h:32
SWC::Ranger::CellStore::Block::Read::loaded
bool loaded() const noexcept
Definition: CellStoreBlock.cc:201
SWC::System::Resources::less_mem_future
SWC_CAN_INLINE void less_mem_future(size_t sz) noexcept
Definition: Resources.h:109
SWC::Core::Buffer::free
SWC_CAN_INLINE void free() noexcept
Definition: Buffer.h:85
SWC::Condition::NONE
@ NONE
Definition: Comparators.h:28
SWC::Core::BufferDyn::fill
constexpr SWC_CAN_INLINE size_t fill() const noexcept
Definition: Buffer.h:192
SWC::FS::SmartFd::Ptr
std::shared_ptr< SmartFd > Ptr
Definition: SmartFd.h:37
SWC::Error::CHECKSUM_MISMATCH
@ CHECKSUM_MISMATCH
Definition: Error.h:62
SWC::Core::AtomicBase::load
constexpr SWC_CAN_INLINE T load() const noexcept
Definition: Atomic.h:42
SWC::Ranger::CellStore::Block::Header::encoder
DB::Types::Encoder encoder
Definition: CellStoreBlockHeader.h:33
SWC::DB::Types::MngrColumn::LOADING
@ LOADING
Definition: MngrColumnState.h:19
SWC::Ranger::CellStore::Block::Read::release
size_t release()
Definition: CellStoreBlock.cc:165
SWC::Core::Encoder::encode
void encode(int &err, Type encoder, const uint8_t *src, size_t src_sz, size_t *sz_enc, DynamicBuffer &output, uint32_t reserve, bool no_plain_out=false, bool ok_more=false)
Definition: Encoder.cc:222
SWC::Ranger::CellStore::Block::Header::encode
void encode(uint8_t **bufp)
Definition: CellStoreBlockHeader.cc:51
SWC::LOG_WARN
@ LOG_WARN
Definition: Logger.h:33
SWC::Ranger::CellStore::Block::Read::print
void print(std::ostream &out)
Definition: CellStoreBlock.cc:221
SWC::Ranger::CellStore::Block::Read::load_read
void load_read(int err, StaticBuffer &&buffer)
Definition: CellStoreBlock.cc:292
SWC::Ranger::CellStore::Block::Read::load
void load()
Definition: CellStoreBlock.cc:129
SWC::Env::Rgr::post
static SWC_CAN_INLINE void post(T_Handler &&handler)
Definition: RangerEnv.h:114
SWC::Ranger::CellStore::Block::Header::size_plain
uint32_t size_plain
Definition: CellStoreBlockHeader.h:34
SWC::Core::Atomic::fetch_sub
constexpr SWC_CAN_INLINE T fetch_sub(T v) noexcept
Definition: Atomic.h:88
SWC::Ranger::CellStore::Block::Read::processing
bool processing() noexcept
Definition: CellStoreBlock.cc:185
SWC::Error::print
void print(std::ostream &out, int err)
Definition: Error.cc:191
SWC::Core::Atomic::fetch_add
constexpr SWC_CAN_INLINE T fetch_add(T v) noexcept
Definition: Atomic.h:93
SWC::Ranger::CellStore::Block::Read::to_string
static const char *SWC_CONST_FUNC to_string(const State state) noexcept
Definition: CellStoreBlock.cc:14
SWC::Ranger::CellStore::Block::Header::size_enc
uint32_t size_enc
Definition: CellStoreBlockHeader.h:35
SWC::Ranger::CellStore::Block::Header::print
void print(std::ostream &out) const
Definition: CellStoreBlockHeader.cc:109
SWC::Core::MutexSptd::try_full_lock
bool try_full_lock(bool &support) noexcept
Definition: MutexSptd.h:55
SWC::Serialization::decode_i32
SWC_CAN_INLINE uint32_t decode_i32(const uint8_t **bufp, size_t *remainp)
Definition: Serialization.h:143
SWC::Ranger::CellStore::Block::Read::cellstore
CellStore::Read * cellstore
Definition: CellStoreBlock.h:50
SWC::Core::Buffer::set
void set(value_type *data, size_t len, bool take_ownership) noexcept
Definition: Buffer.h:109