SWC-DB  v0.5.12 C++ documentations
SWC-DB© (Super Wide Column Database) - High Performance Scalable Database (https://github.com/kashirin-alex/swc-db)
CommitLogFragment.cc
Go to the documentation of this file.
1 /*
2  * SWC-DB© Copyright since 2019 Alex Kashirin <kashirin.alex@gmail.com>
3  * License details at <https://github.com/kashirin-alex/swc-db/#license>
4  */
5 
6 
8 
9 namespace SWC { namespace Ranger { namespace CommitLog {
10 
11 
// Maps a Fragment::State value to a constant, human-readable name.
// Values other than NONE, LOADING, LOADED and WRITING take the default label.
// NOTE(review): "UKNOWN" reads like a misspelling of "UNKNOWN"; it is a runtime
// string, so confirm nothing matches on it before changing it.
12 const char* Fragment::to_string(Fragment::State state) noexcept {
13  switch(state) {
14  case State::NONE:
15  return "NONE";
16  case State::LOADING:
17  return "LOADING";
18  case State::LOADED:
19  return "LOADED";
20  case State::WRITING:
21  return "WRITING";
22  default:
23  return "UKNOWN";
24  }
25 }
26 
27 
// Convenience overload: wraps `filepath` (moved) in a new FS::SmartFd created
// with flags 0 and forwards to the SmartFd-based make_read().
// `err` and the returned pointer are whatever that overload produces.
29 Fragment::Ptr Fragment::make_read(int& err, std::string&& filepath,
30  const DB::Types::KeySeq key_seq) {
31  auto smartfd = FS::SmartFd::make_ptr(std::move(filepath), 0);
32  return make_read(err, smartfd, key_seq);
33 }
34 
36  const DB::Types::KeySeq key_seq) {
37  uint8_t version = 0;
40  size_t size_plain = 0;
41  size_t size_enc = 0;
42  uint32_t cell_revs = 0;
43  uint32_t cells_count = 0;
44  uint32_t data_checksum = 0;
45  uint32_t offset_data = 0;
46 
48  err, smartfd,
52  );
53 
54  return Fragment::Ptr(err ? nullptr : new Fragment(
55  smartfd,
56  version, std::move(interval),
60  ));
61 }
62 
// Reads a fragment file's header through the filesystem interface.
// The fixed part (HEADER_SIZE bytes at file offset 0) yields `header_extlen`,
// the length of the extended header, which is then read from offset
// HEADER_SIZE. Visible here: `interval` is decoded from the extended header and
// `offset_data` is set to HEADER_SIZE + header_extlen on success.
// NOTE(review): the decoding of the remaining output fields is not part of the
// lines shown here - confirm against the full source.
// `err` is both input (a non-zero value on entry is treated as a retry) and
// output.
64 void Fragment::load_header(int& err, FS::SmartFd::Ptr& smartfd,
65  uint8_t& version,
66  DB::Cells::Interval& interval,
68  size_t& size_plain, size_t& size_enc,
69  uint32_t& cell_revs, uint32_t& cells_count,
70  uint32_t& data_checksum, uint32_t& offset_data) {
71  const auto& fs_if = Env::FsInterface::interface();
72  const auto& fs = Env::FsInterface::fs();
73 
// Retry loop: it is not (re)entered once err is Error::FS_PATH_NOT_FOUND.
// Every `continue` below starts another attempt with err still set.
74  while(err != Error::FS_PATH_NOT_FOUND &&
76  if(err) {
78  Error::print(SWC_LOG_OSTREAM << "Retrying to ", err);
79  smartfd->print(SWC_LOG_OSTREAM);
80  );
81  fs_if->close(err, smartfd);
82  err = Error::OK;
83  }
84 
// Open the file when the fd is not valid; return early (without reaching the
// close at the end) if open() returns false with err set.
85  if(!smartfd->valid() && !fs_if->open(err, smartfd) && err)
86  return;
87  if(err)
88  continue;
89 
// Fixed-size header: a short read restarts the loop.
90  StaticBuffer buf;
91  if(fs->pread(err, smartfd, 0, &buf, HEADER_SIZE) != HEADER_SIZE)
92  continue;
93 
94  const uint8_t *ptr = buf.base;
95 
96  size_t remain = HEADER_SIZE;
98  uint32_t header_extlen = Serialization::decode_i32(&ptr, &remain);
100  buf.base, HEADER_SIZE-4)) {
102  continue;
103  }
104  buf.free();
105 
// Extended header: header_extlen bytes starting at file offset HEADER_SIZE.
106  if(fs->pread(err, smartfd, HEADER_SIZE, &buf, header_extlen)
107  != header_extlen)
108  continue;
109 
110  ptr = buf.base;
111  remain = header_extlen;
112 
113  interval.decode(&ptr, &remain, true);
120 
122  buf.base, header_extlen-4)) {
124  continue;
125  }
// The cell data begins right after both headers.
126  offset_data = HEADER_SIZE+header_extlen;
127  break;
128  }
129 
// After the loop the fd is closed; the close error goes to a local and is
// discarded so it cannot overwrite the caller-visible err.
130  int tmperr = Error::OK;
131  fs_if->close(tmperr, smartfd);
132 }
133 
134 
136  DB::Cells::Interval&& interval,
138  const uint32_t cell_revs,
139  const uint32_t cells_count,
140  DynamicBuffer& cells,
141  StaticBuffer& buffer) {
142  auto smartfd = FS::SmartFd::make_ptr(
143  std::move(filepath),
145  );
146 
147  const uint8_t version = VERSION;
148  const size_t size_plain = cells.fill();
149  size_t size_enc = 0;
150  uint32_t data_checksum = 0;
151  uint32_t offset_data = 0;
152 
153  write(
154  err,
155  version, interval,
158  cells, buffer
159  );
160  if(err)
161  return nullptr;
162 
163  auto frag = new Fragment(
164  smartfd,
165  version, std::move(interval),
168  State::WRITING
169  );
170  frag->m_buffer.set(cells);
171  return Fragment::Ptr(frag);
172 }
173 
// Serializes a fragment into `buffer`.
// The encode step writes the cell data into `output` and is passed
// `offset_data` as its reserve argument; the header and the extended header
// are then written from the start of `output`.
// Outputs visible here:
//  - err: reset to Error::OK, then set by the encode step (early return on failure)
//  - size_enc: filled by the encode step
//  - offset_data: incremented by header_extlen; used below as the start of the
//    data region inside `output`
//  - data_checksum: receives the checksum of output[offset_data, fill())
//  - buffer: set from `output` at the end
// NOTE(review): `encoder` is assigned below but its parameter declaration is
// not among the lines shown here.
175 void Fragment::write(int& err,
176  const uint8_t version,
177  const DB::Cells::Interval& interval,
179  const size_t size_plain, size_t& size_enc,
180  const uint32_t cell_revs, const uint32_t cells_count,
181  uint32_t& data_checksum, uint32_t& offset_data,
182  DynamicBuffer& cells, StaticBuffer& buffer) {
// Extended header length = encoded interval + the fixed-size fields.
183  uint32_t header_extlen = interval.encoded_length()+HEADER_EXT_FIXED_SIZE;
185  offset_data += header_extlen;
186 
187  DynamicBuffer output;
188  err = Error::OK;
190  &size_enc, output, offset_data);
191  if(err)
192  return;
193 
// No encoded size was produced: the encoder recorded in the header becomes PLAIN.
194  if(!size_enc) {
196  encoder = DB::Types::Encoder::PLAIN;
197  }
198 
// Header: written from the start of `output`, ending with header_extlen and a
// checksum over the bytes written so far.
199  uint8_t * bufp = output.base;
201  Serialization::encode_i32(&bufp, header_extlen);
202  Core::checksum_i32(output.base, bufp, &bufp);
203 
// Extended header: starts at header_extptr with the interval and encoder byte.
204  uint8_t * header_extptr = bufp;
205  interval.encode(&bufp);
206  Serialization::encode_i8(&bufp, uint8_t(encoder));
211 
// First the checksum of the data region (also returned via data_checksum),
// then the checksum covering the extended header written since header_extptr.
212  Core::checksum_i32(output.base+offset_data, output.base+output.fill(),
213  &bufp, data_checksum);
214  Core::checksum_i32(header_extptr, bufp, &bufp);
215 
216  buffer.set(output);
217 }
218 
219 
222  const uint8_t a_version,
223  DB::Cells::Interval&& a_interval,
224  const DB::Types::Encoder a_encoder,
225  const size_t a_size_plain, const size_t a_size_enc,
226  const uint32_t a_cell_revs, const uint32_t a_cells_count,
227  const uint32_t a_data_checksum,
228  const uint32_t a_offset_data,
229  Fragment::State state) noexcept
230  : version(a_version), interval(std::move(a_interval)),
231  encoder(a_encoder),
232  size_plain(a_size_plain), size_enc(a_size_enc),
233  cell_revs(a_cell_revs), cells_count(a_cells_count),
234  data_checksum(a_data_checksum),
235  offset_data(a_offset_data),
236  m_mutex(),
237  m_state(state), m_marked_removed(false), m_err(Error::OK),
238  m_processing(m_state == State::WRITING),
239  m_cells_remain(cells_count),
240  m_smartfd(smartfd), m_buffer(), m_queue() {
241  if(m_state != State::NONE)
243 }
244 
247  if(m_state == State::LOADED && !m_marked_removed)
249 }
250 
253  return shared_from_this();
254 }
255 
// Returns the file path held by this fragment's SmartFd (m_smartfd).
257 const std::string& Fragment::get_filepath() const noexcept {
258  return m_smartfd->filepath();
259 }
260 
// Drives the write of this fragment to the filesystem and handles its result.
//  - err set and not SERVER_SHUTTING_DOWN: (re)issue the filesystem write with
//    `buff_write`; the write callback builds a Task whose call operator
//    re-enters this method with the callback's error and buffer.
//  - otherwise (OK or SERVER_SHUTTING_DOWN): finish - reset the fd flags,
//    release `sem`, publish state/error, then either post run_queued() or
//    release().
// NOTE(review): Error::UNPOSSIBLE is the only error that skips the
// "Retrying write to" log line - presumably it is what the first call passes;
// confirm against callers.
261 void Fragment::write(int err, uint8_t blk_replicas,
262  StaticBuffer&& buff_write,
263  Core::Semaphore* sem) {
264  if(err && err != Error::SERVER_SHUTTING_DOWN) {
265  if(err != Error::UNPOSSIBLE)
267  Error::print(SWC_LOG_OSTREAM << "Retrying write to ", err);
268  print(SWC_LOG_OSTREAM << ' ');
269  );
270 
// The callback captures a shared pointer from ptr(), plus the replica count
// and semaphore needed to re-enter this method.
271  Env::FsInterface::fs()->write(
272  [frag=ptr(), blk_replicas, sem]
273  (int _err, StaticBuffer&& buff_w) mutable {
274  struct Task {
275  Ptr frag;
276  StaticBuffer buff_w;
277  Core::Semaphore* sem;
278  int error;
279  uint8_t blk_replicas;
281  Task(Ptr&& a_frag, uint8_t a_blk_replicas,
282  StaticBuffer&& a_buff_w,
283  Core::Semaphore* a_sem, int a_error) noexcept
284  : frag(std::move(a_frag)),
285  buff_w(std::move(a_buff_w)), sem(a_sem),
286  error(a_error),
287  blk_replicas(a_blk_replicas) {
288  }
290  Task(Task&& other) noexcept
291  : frag(std::move(other.frag)),
292  buff_w(std::move(other.buff_w)),
293  sem(other.sem), error(other.error),
294  blk_replicas(other.blk_replicas) {
295  }
296  Task(const Task&) = delete;
297  Task& operator=(Task&&) = delete;
298  Task& operator=(const Task&) = delete;
299  ~Task() noexcept { }
300  void operator()() {
301  frag->write(error, blk_replicas, std::move(buff_w), sem);
302  }
303  };
305  Task(
306  std::move(frag), blk_replicas, std::move(buff_w), sem, _err
307  )
308  );
309  },
310  m_smartfd, blk_replicas, std::move(buff_write)
311  );
312  return;
313  }
314 
315  //if(err) remains Error::SERVER_SHUTTING_DOWN -- write local dump
316 
// The write is finished (successfully or on shutdown): reset fd flags and
// signal the semaphore.
317  m_smartfd->flags(0);
318  sem->release();
319 
320  if(err) {
321  m_buffer.free();
323  }
324 
// Drop one processing count (the constructor starts it at 1 for the WRITING
// state) and publish the final state: NONE on error, LOADED otherwise.
// `keep` is true when other processing counts remain or callbacks are queued
// (assuming fetch_sub returns the previous value, as std::atomic does).
325  bool keep;
326  {
328  keep = m_processing.fetch_sub(1) > 1 || !m_queue.empty();
329  m_state.store(err ? State::NONE : State::LOADED);
330  m_err = err;
331  }
332 
// Still needed: run the queued callbacks via a posted task; otherwise try to
// release the buffer.
333  if(keep) {
334  struct Task {
335  Ptr frag;
337  Task(Ptr&& a_frag) noexcept : frag(std::move(a_frag)) { }
339  Task(Task&& other) noexcept : frag(std::move(other.frag)) { }
340  Task(const Task&) = delete;
341  Task& operator=(Task&&) = delete;
342  Task& operator=(const Task&) = delete;
343  ~Task() noexcept { }
344  void operator()() { frag->run_queued(); }
345  };
346  Env::Rgr::post(Task(ptr()));
347  } else {
348  release();
349  }
350 }
351 
354  int err;
357  TaskLoadRead(Ptr&& a_frag, int a_err,
358  StaticBuffer&& a_buffer) noexcept
359  : frag(std::move(a_frag)), err(a_err),
360  buffer(std::move(a_buffer)) { }
362  TaskLoadRead(TaskLoadRead&& other) noexcept
363  : frag(std::move(other.frag)), err(other.err),
364  buffer(std::move(other.buffer)) {
365  }
366  TaskLoadRead(const TaskLoadRead&) = delete;
369  ~TaskLoadRead() noexcept { }
370  void operator()() { frag->load_read(err, std::move(buffer)); }
371 };
372 
374  auto at(State::NONE);
375  {
378  if(at != State::LOADED)
379  m_queue.push(cb);
380  }
381  switch(at) {
382  case State::NONE: {
385  struct Task {
386  Ptr frag;
388  Task(Ptr&& a_frag) noexcept : frag(std::move(a_frag)) { }
390  Task(Task&& other) noexcept : frag(std::move(other.frag)) { }
391  Task(const Task&) = delete;
392  Task& operator=(Task&&) = delete;
393  Task& operator=(const Task&) = delete;
394  ~Task() noexcept { }
395  void operator()() {
396  Env::FsInterface::fs()->combi_pread(
397  [frag=frag] (int err, StaticBuffer&& buffer) mutable {
399  TaskLoadRead(std::move(frag), err, std::move(buffer))
400  );
401  },
402  frag->m_smartfd, frag->offset_data, frag->size_enc
403  );
404  }
405  };
406  Env::Rgr::post(Task(ptr()));
407  return;
408  }
409  case State::WRITING:
410  case State::LOADING:
411  return;
412  default: // case State::LOADED:
413  return cb->loaded(ptr());
414  }
415 }
416 
// Feeds this fragment's buffered cells into `cells_block`, then drops one
// processing count.
// Nothing is loaded when the fragment is marked removed; an empty buffer is
// only logged. The int& parameter is unnamed and unused.
// remain_hint receives the result of m_cells_remain.sub_rslt(<count returned
// by cells_block->load_cells>) - presumably the number of cells still not
// consumed from this fragment; confirm against Core::Atomic::sub_rslt.
418 void Fragment::load_cells(int&, Ranger::Block::Ptr cells_block) {
419  ssize_t remain_hint(0);
420  if(!marked_removed()) {
421  if(m_buffer.size) {
422  bool was_splitted;
423  remain_hint = m_cells_remain.sub_rslt(
424  cells_block->load_cells(
427  was_splitted
428  )
429  );
430  } else {
432  print(SWC_LOG_OSTREAM << "Fragment::load_cells empty buf ");
433  );
434  }
435  }
436 
// If this was the last processing count (fetch_sub assumed to return the
// previous value), release the buffer when nothing remains to be consumed
// (remain_hint <= 0, which includes the removed/empty cases) or when the
// resource manager reports that RAM is needed.
437  if(m_processing.fetch_sub(1) == 1 &&
438  (remain_hint <= 0 || Env::Rgr::res().need_ram(size_plain)))
439  release();
440 }
441 
442 SWC_SHOULD_NOT_INLINE
444  if(!marked_removed()) {
445  if(m_buffer.size) {
446  size_t count = 0;
447  bool synced = cells.empty();
448  size_t offset_hint = 0;
449  size_t offset_it = 0;
450  const uint8_t* buf = m_buffer.base;
451  size_t remain = m_buffer.size;
452  try { for(DB::Cells::Cell cell; remain; ++count) {
453  cell.read(&buf, &remain, false);
454  synced
455  ? cells.add_sorted(cell)
456  : cells.add_raw(cell, &offset_it, &offset_hint, false);
457 
458  } } catch(...) {
461  << "Cell trunclated at count=" << count << '/' << cells_count
462  << " remain=" << remain << ' ';
465  );
466  }
467  } else {
469  print(SWC_LOG_OSTREAM << "Fragment::load_cells empty buf ");
470  );
471  }
472  }
474 }
475 
// Reads every cell from this fragment's buffer and adds each one to either
// `log_right` or `log_left`, then drops one processing count.
// NOTE(review): the expression choosing between right and left is not among
// the lines shown here - presumably a comparison of the cell key with `key`.
// Skipped entirely when the fragment is marked removed; an empty buffer is
// only logged. Any exception raised while reading/adding cells is caught and
// logged, ending the loop. The int& parameter is unnamed and unused.
// NOTE(review): the empty-buffer log text says "Fragment::load_cells" although
// this is split() - looks copy-pasted; it is a runtime string, left unchanged.
477 void Fragment::split(int&, const DB::Cell::Key& key,
478  Fragments::Ptr log_left, Fragments::Ptr log_right) {
479  if(!marked_removed()) {
480  if(m_buffer.size) {
481  size_t count = 0;
482  const uint8_t* buf = m_buffer.base;
483  size_t remain = m_buffer.size;
484 
// Loop until the buffer is consumed; `count` tracks cells read for the log.
485  try { for(DB::Cells::Cell cell; remain; ++count) {
486  cell.read(&buf, &remain, false);
488  ? log_right->add(cell)
489  : log_left->add(cell);
490 
491  } } catch(...) {
494  << "Cell trunclated at count=" << count << '/' << cells_count
495  << " remain=" << remain << ' ';
498  );
499  }
500  } else {
502  print(SWC_LOG_OSTREAM << "Fragment::load_cells empty buf ");
503  );
504  }
505  }
// Last processing count gone (fetch_sub assumed to return the previous
// value): try to release the buffer.
506  if(m_processing.fetch_sub(1) == 1)
507  release();
508 }
509 
513 }
514 
518 }
519 
521  size_t released = 0;
522  bool support;
523  if(!m_processing &&
524  !marked_removed() &&
525  m_state == State::LOADED &&
526  m_mutex.try_full_lock(support)) {
527  State at = State::LOADED;
528  if(!marked_removed() &&
529  m_state == State::LOADED &&
530  m_queue.empty() &&
531  !m_processing &&
533  released += size_plain;
534  m_buffer.free();
536  }
537  m_mutex.unlock(support);
538  }
539  if(released)
541  return released;
542 }
543 
// Under m_mutex: copies the stored error (m_err) into `err` and returns true
// only when that error is zero and the state is LOADED.
545 bool Fragment::loaded(int& err) noexcept {
546  Core::MutexSptd::scope lock(m_mutex);
547  return !(err = m_err) && m_state == State::LOADED;
548 }
549 
// Returns size_plain, regardless of the fragment's state.
551 size_t Fragment::size_bytes() const noexcept {
552  return size_plain;
553 }
554 
// Returns size_plain, except that 0 is returned when `only_loaded` is true and
// the state is NONE. Any other state (LOADING, WRITING, LOADED) still reports
// size_plain.
556 size_t Fragment::size_bytes(bool only_loaded) const noexcept {
557  return only_loaded && m_state == State::NONE ? 0 : size_plain;
558 }
559 
// Returns size_enc, the encoded size recorded for this fragment's data.
561 size_t Fragment::size_bytes_encoded() const noexcept {
562  return size_enc;
563 }
564 
// Reports whether the fragment is currently busy.
// Returns true when a processing count is held, the state is WRITING or
// LOADING, load callbacks are queued, or m_mutex could not be acquired by
// try_full_lock().
566 bool Fragment::processing() noexcept {
567  bool support;
// Unlocked pre-check. Because of short-circuit evaluation the lock is only
// attempted when the preceding checks all say "idle"; a failed try also
// counts as busy, and then nothing needs unlocking.
568  bool busy = m_processing ||
569  m_state == State::WRITING ||
570  m_state == State::LOADING ||
571  !m_mutex.try_full_lock(support);
// !busy implies the lock is held: re-evaluate under the lock (now including
// the callback queue), then unlock with the same `support` value.
572  if(!busy) {
573  busy = m_state == State::WRITING ||
574  m_state == State::LOADING ||
575  !m_queue.empty() ||
576  m_processing;
577  m_mutex.unlock(support);
578  }
579  return busy;
580 }
581 
// Returns the current value of the m_marked_removed flag.
583 bool Fragment::marked_removed() const noexcept {
584  return m_marked_removed;
585 }
586 
// Sets the removed flag and returns true only for the call that changed it
// from false to true (exchange assumed to return the previous value, as
// std::atomic does).
// The state is unconditionally exchanged to LOADED; load_finish() treats an
// already-LOADED state as its "marked removed" case and leaves it untouched.
587 bool Fragment::mark_removed() noexcept {
588  bool do_remove = !m_marked_removed.exchange(true);
589  State state;
590  {
592  state = m_state.exchange(State::LOADED);
593  }
594  if(do_remove && state != State::NONE)
596  return do_remove;
597 }
598 
// Synchronous removal of the fragment file.
// Acts only when this call is the one that marks the fragment removed
// (mark_removed() returned true): closes the fd if it is valid, ignoring the
// close error, then removes the file by path; `err` receives the result of
// that removal. When the fragment was already marked, `err` is left untouched.
599 void Fragment::remove(int &err) {
600  if(mark_removed()) {
601  if(m_smartfd->valid()) {
602  int tmperr = Error::OK;
603  Env::FsInterface::interface()->close(tmperr, m_smartfd);
604  }
605  Env::FsInterface::interface()->remove(err, m_smartfd->filepath());
606  }
607 }
608 
610  if(m_smartfd->valid()) {
612  [sem, frag=ptr()](int err) {
613  frag->remove(err=Error::OK, sem);
614  },
615  m_smartfd
616  );
617  } else if(mark_removed()) {
618  Env::FsInterface::interface()->remove(
619  [sem, frag=ptr()](int) noexcept { sem->release(); },
620  m_smartfd->filepath()
621  );
622  }
623 }
624 
// Writes a single-line description of the fragment to `out`: version, cell
// count, data offset, encoder name, encoded/plain sizes, state name,
// processing count, queued-callback count, the SmartFd, the stored error (only
// when non-zero), a MARKED-REMOVED tag (only when set) and the interval.
625 void Fragment::print(std::ostream& out) {
626  out << "Fragment(version=" << int(version)
627  << " count=" << cells_count
628  << " offset=" << offset_data
629  << " encoder=" << Core::Encoder::to_string(encoder)
630  << " enc/size=" << size_enc << '/' << size_plain
631  << " state=" << to_string(m_state)
632  << " processing=" << m_processing.load();
633  {
635  out << " queue=" << m_queue.size();
636  m_smartfd->print(out << ' ');
637  if(m_err)
638  Error::print(out << ' ', m_err);
639  }
640  if(m_marked_removed)
641  out << " MARKED-REMOVED";
642  out << ' ' << interval << ')';
643 }
644 
// Handles the outcome of reading the fragment's data from the filesystem.
// `err` is the read result and `buffer` the bytes read.
//  - fragment marked removed: go straight to load_finish(err)
//  - err is OK: validate and decode the data into m_buffer, then load_finish()
//  - other errors: either load_finish(err) or a logged re-read through
//    combi_pread, whose callback builds a TaskLoadRead with the new result
// NOTE(review): the case labels that lead to the first load_finish(err) inside
// the switch are not among the lines shown here.
645 void Fragment::load_read(int err, StaticBuffer&& buffer) {
646  do_load_read:
647 
648  if(marked_removed())
649  return load_finish(err);
650 
// Dispatch on the read error; only Error::OK falls through to validation.
651  switch(err) {
654  return load_finish(err);
655  case Error::OK:
656  break;
657  default: {
659  Error::print(SWC_LOG_OSTREAM << "Retrying to ", err);
660  print(SWC_LOG_OSTREAM << ' ');
661  );
662  Env::FsInterface::fs()->combi_pread(
663  [frag=ptr()] (int _err, StaticBuffer&& buff) mutable {
665  TaskLoadRead(std::move(frag), _err, std::move(buff))
666  );
667  },
669  );
670  //std::this_thread::sleep_for(std::chrono::microseconds(10000));
671  return;
672  }
673  }
// err is OK here. A buffer whose size differs from size_enc becomes
// Error::FS_EOF. Otherwise the data is decoded into a size_plain buffer and
// stored in m_buffer, or adopted as-is when the encoder is PLAIN.
674  if(!err) {
677 
678  } else if(buffer.size != size_enc) {
679  err = Error::FS_EOF;
680 
681  } else {
682  if(encoder != DB::Types::Encoder::PLAIN) {
683  StaticBuffer decoded_buf(size_plain);
685  err, encoder, buffer.base, size_enc, decoded_buf.base, size_plain);
686  if(!err)
687  m_buffer.set(decoded_buf);
688  } else {
689  m_buffer.set(buffer);
690  }
691  }
692  }
// A validation or decoding failure frees the read buffer and jumps back to the
// top, where the switch dispatches the new error (finish or re-read).
693  if(err) {
694  buffer.free();
695  goto do_load_read;
696  } else {
697  load_finish(err);
698  }
699 }
700 
// Final step of a load attempt: records the outcome and runs the queued load
// callbacks. A non-zero `err` is logged first.
701 void Fragment::load_finish(int err) {
703  if(err)
705  Error::print(SWC_LOG_OSTREAM << "CommitLog::Fragment load ", err);
706  print(SWC_LOG_OSTREAM << ' ');
707  );
708 
// A fragment already marked removed only drops its buffer; state and m_err are
// left as they are.
709  if(m_marked_removed) {
710  m_buffer.free();
711  } else {
// A missing file is not reported as a load error: the buffer is dropped and
// err is cleared before the state is published.
712  if(err == Error::FS_PATH_NOT_FOUND) {
713  m_buffer.free();
714  err = Error::OK;
715  }
// A state that is already LOADED at this point is taken to mean the fragment
// was marked removed in the meantime (mark_removed() forces LOADED), so the
// state is not overwritten; otherwise it becomes NONE on error or LOADED.
716  bool _marked_removed;
717  {
719  if(!(_marked_removed = m_state == State::LOADED))
720  m_state.store(err ? State::NONE : State::LOADED);
721  m_err = err;
722  }
723  if(err && !_marked_removed)
725  }
726 
// Callbacks queued by load() are invoked in every case.
727  run_queued();
728 }
729 
730 
732  for(LoadCallback* cb;;) {
733  {
735  if(m_queue.empty())
736  return;
737  cb = m_queue.front();
738  m_queue.pop();
739  }
740  cb->loaded(ptr());
741  }
742 }
743 
744 
745 
746 }}} // namespace SWC::Ranger::CommitLog
SWC::Ranger::CommitLog::Fragment::offset_data
const uint32_t offset_data
Definition: CommitLogFragment.h:92
SWC::DB::Cells::Interval::encoded_length
size_t SWC_PURE_FUNC encoded_length() const noexcept
Definition: Interval.cc:117
SWC::Ranger::CommitLog::Fragment::load_finish
void load_finish(int err)
Definition: CommitLogFragment.cc:701
SWC::Ranger::CommitLog::Fragment::TaskLoadRead
Definition: CommitLogFragment.cc:352
SWC::Ranger::CommitLog::Fragment::Ptr
std::shared_ptr< Fragment > Ptr
Definition: CommitLogFragment.h:31
SWC::Core::AtomicBase::compare_exchange_weak
constexpr SWC_CAN_INLINE bool compare_exchange_weak(T &at, T value) noexcept
Definition: Atomic.h:52
SWC::Core::Semaphore
Definition: Semaphore.h:16
SWC::Env::Rgr::res
static SWC_CAN_INLINE System::Resources & res() noexcept
Definition: RangerEnv.h:131
SWC_LOG_OSTREAM
#define SWC_LOG_OSTREAM
Definition: Logger.h:44
SWC::Ranger::CommitLog::Fragment::TaskLoadRead::TaskLoadRead
SWC_CAN_INLINE TaskLoadRead(Ptr &&a_frag, int a_err, StaticBuffer &&a_buffer) noexcept
Definition: CommitLogFragment.cc:357
SWC::Ranger::CommitLog::Fragment::version
const uint8_t version
Definition: CommitLogFragment.h:84
SWC::Ranger::Block::load_cells
size_t load_cells(const uint8_t *buf, size_t remain, uint32_t revs, size_t avail, bool &was_splitted, bool synced=false)
Definition: RangeBlock.cc:209
SWC::Ranger::Block
Definition: RangeBlock.h:23
SWC::Ranger::CommitLog::Fragment::TaskLoadRead::frag
Ptr frag
Definition: CommitLogFragment.cc:353
SWC::Error::SERVER_SHUTTING_DOWN
@ SERVER_SHUTTING_DOWN
Definition: Error.h:84
SWC::Ranger::CommitLog::Fragment::load_read
void load_read(int err, StaticBuffer &&buffer)
Definition: CommitLogFragment.cc:645
SWC::Ranger::CommitLog::Fragment::TaskLoadRead::operator=
TaskLoadRead & operator=(TaskLoadRead &&)=delete
SWC_LOG_OUT
#define SWC_LOG_OUT(pr, _code_)
Definition: Logger.h:178
SWC::Ranger::CommitLog::Fragment::cells_count
const uint32_t cells_count
Definition: CommitLogFragment.h:90
SWC::FS::WRITE_VALIDATE_LENGTH
@ WRITE_VALIDATE_LENGTH
Definition: FileSystem.h:37
SWC::Ranger::CommitLog::Fragment::make_read
static Ptr make_read(int &err, std::string &&filepath, const DB::Types::KeySeq key_seq)
Definition: CommitLogFragment.cc:29
SWC::Core::Encoder::Type
Type
Definition: Encoder.h:28
SWC::Ranger::CommitLog::Fragment::to_string
static const char *SWC_CONST_FUNC to_string(State state) noexcept
Definition: CommitLogFragment.cc:12
SWC::Condition::GT
@ GT
Definition: Comparators.h:30
SWC::Ranger::CommitLog::Fragment::size_enc
const size_t size_enc
Definition: CommitLogFragment.h:88
SWC::Serialization::encode_i32
SWC_CAN_INLINE void encode_i32(uint8_t **bufp, uint32_t val) noexcept
Definition: Serialization.h:138
SWC::Env::FsInterface::interface
static SWC_CAN_INLINE FS::Interface::Ptr & interface() noexcept
Definition: Interface.h:150
SWC::System::Resources::more_mem_future
SWC_CAN_INLINE void more_mem_future(size_t sz) noexcept
Definition: Resources.h:104
SWC::FS::SmartFd::make_ptr
static SWC_CAN_INLINE Ptr make_ptr(const std::string &filepath, uint32_t flags, int32_t fd=-1, uint64_t pos=0)
Definition: SmartFd.h:40
SWC::DB::Cells::Interval::encode
void encode(uint8_t **ptr) const
Definition: Interval.cc:126
SWC::DB::Cells::Interval::key_seq
const Types::KeySeq key_seq
Definition: Interval.h:25
SWC::Core::checksum_i32_chk
SWC_CAN_INLINE bool checksum_i32_chk(uint32_t checksum, const uint8_t *base, uint32_t len)
Definition: Checksum.h:94
SWC::Core::MutexSptd::scope
Definition: MutexSptd.h:96
SWC::Error::FS_PATH_NOT_FOUND
@ FS_PATH_NOT_FOUND
Definition: Error.h:97
SWC::DB::Cells::Interval::decode
void decode(const uint8_t **ptr, size_t *remain, bool owner)
Definition: Interval.cc:135
SWC::Error::UNPOSSIBLE
@ UNPOSSIBLE
Definition: Error.h:42
SWC::Ranger::CommitLog::Fragment::encoder
const DB::Types::Encoder encoder
Definition: CommitLogFragment.h:86
SWC::Ranger::CommitLog::Fragment::TaskLoadRead::TaskLoadRead
SWC_CAN_INLINE TaskLoadRead(TaskLoadRead &&other) noexcept
Definition: CommitLogFragment.cc:362
SWC::DB::Cells::Cell
Definition: Cell.h:92
SWC::Ranger::CommitLog::Fragment::m_buffer
StaticBuffer m_buffer
Definition: CommitLogFragment.h:170
SWC::Core::MutexSptd::unlock
SWC_CAN_INLINE void unlock(const bool &support) noexcept
Definition: MutexSptd.h:71
SWC::Ranger::CommitLog::Fragment::processing
bool processing() noexcept
Definition: CommitLogFragment.cc:566
SWC::Ranger::CommitLog::Fragment::size_plain
const size_t size_plain
Definition: CommitLogFragment.h:87
SWC::DB::Cell::Key
Definition: CellKey.h:24
SWC::Ranger::CommitLog::Fragment::interval
const DB::Cells::Interval interval
Definition: CommitLogFragment.h:85
SWC::Ranger::CommitLog::Fragment::m_cells_remain
Core::Atomic< uint32_t > m_cells_remain
Definition: CommitLogFragment.h:168
SWC::Serialization::encode_i8
constexpr SWC_CAN_INLINE void encode_i8(uint8_t **bufp, uint8_t val) noexcept
Definition: Serialization.h:85
SWC::Ranger::CommitLog::Fragment::split
void split(int &err, const DB::Cell::Key &key, FragmentsPtr log_left, FragmentsPtr log_right)
Definition: CommitLogFragment.cc:477
SWC::Ranger::CommitLog::Fragment::loaded
bool loaded(int &err) noexcept
Definition: CommitLogFragment.cc:545
SWC::Ranger::CommitLog::Fragment::TaskLoadRead::err
int err
Definition: CommitLogFragment.cc:354
CommitLogFragment.h
SWC::Core::Atomic::sub_rslt
constexpr SWC_CAN_INLINE T sub_rslt(T v) noexcept
Definition: Atomic.h:115
SWC::DB::Cells::MutableVec::empty
SWC_CAN_INLINE bool empty() const noexcept
Definition: MutableVec.h:66
SWC::Ranger::CommitLog::Fragments
Definition: CommitLog.h:19
SWC::DB::Types::KeySeq
KeySeq
Definition: KeySeq.h:13
encoder
Core::Encoder::Type encoder
Buffer Encoder.
Definition: HeaderBufferInfo.h:50
SWC::Core::AtomicBase::store
constexpr SWC_CAN_INLINE void store(T v) noexcept
Definition: Atomic.h:37
SWC::Error::OK
@ OK
Definition: Error.h:45
SWC::Ranger::CommitLog::Fragment::mark_removed
bool mark_removed() noexcept
Definition: CommitLogFragment.cc:587
SWC::System::Resources::more_mem_releasable
SWC_CAN_INLINE void more_mem_releasable(size_t sz) noexcept
Definition: Resources.h:114
SWC_CAN_INLINE
#define SWC_CAN_INLINE
Definition: Compat.h:102
SWC::Error::FS_EOF
@ FS_EOF
Definition: Error.h:96
SWC::Ranger::CommitLog::Fragment::release
size_t release()
Definition: CommitLogFragment.cc:520
SWC::System::Resources::less_mem_releasable
SWC_CAN_INLINE void less_mem_releasable(size_t sz) noexcept
Definition: Resources.h:119
SWC::DB::Cell::Serial::Value::UNKNOWN
@ UNKNOWN
Definition: CellValueSerialField.h:34
SWC::Core::Encoder::decode
void decode(int &err, Type encoder, const uint8_t *src, size_t sz_enc, uint8_t *dst, size_t sz)
Definition: Encoder.cc:99
SWC::Ranger::CommitLog::Fragment::ptr
Ptr ptr()
Definition: CommitLogFragment.cc:252
SWC_CURRENT_EXCEPTION
#define SWC_CURRENT_EXCEPTION(_msg_)
Definition: Exception.h:119
SWC::Ranger::CommitLog::Fragment::TaskLoadRead::~TaskLoadRead
~TaskLoadRead() noexcept
Definition: CommitLogFragment.cc:369
SWC::Core::Buffer::base
value_type * base
Definition: Buffer.h:131
SWC
The SWC-DB C++ namespace 'SWC'.
Definition: main.cc:12
SWC::FS::OPEN_FLAG_OVERWRITE
@ OPEN_FLAG_OVERWRITE
Definition: FileSystem.h:35
SWC::Ranger::CommitLog::Fragment::print
void print(std::ostream &out)
Definition: CommitLogFragment.cc:625
SWC::Ranger::CommitLog::Fragment::LoadCallback::loaded
virtual void loaded(Ptr &&frag)=0
SWC::Core::BufferDyn< StaticBuffer >
SWC::Core::Buffer
Definition: Buffer.h:18
SWC::System::Resources::need_ram
SWC_CAN_INLINE size_t need_ram() const noexcept
Definition: Resources.h:89
SWC::Core::Buffer::size
size_t size
Definition: Buffer.h:130
SWC::Env::FsInterface::fs
static SWC_CAN_INLINE FS::FileSystem::Ptr & fs() noexcept
Definition: Interface.h:155
size_plain
uint32_t size_plain
Buffer set if Encoder not PLAIN.
Definition: HeaderBufferInfo.h:48
SWC::Ranger::CommitLog::Fragment::run_queued
void run_queued()
Definition: CommitLogFragment.cc:731
SWC::Ranger::CommitLog::Fragment::TaskLoadRead::buffer
StaticBuffer buffer
Definition: CommitLogFragment.cc:355
SWC::DB::Cells::MutableVec::add_raw
void add_raw(const Cell &cell, bool finalized)
Definition: MutableVec.cc:35
SWC::Ranger::CommitLog::Fragment::size_bytes
size_t size_bytes() const noexcept
Definition: CommitLogFragment.cc:551
SWC::Ranger::CommitLog::Fragment::m_smartfd
FS::SmartFd::Ptr m_smartfd
Definition: CommitLogFragment.h:169
SWC::LOG_ERROR
@ LOG_ERROR
Definition: Logger.h:32
SWC::System::Resources::less_mem_future
SWC_CAN_INLINE void less_mem_future(size_t sz) noexcept
Definition: Resources.h:109
SWC::Ranger::CommitLog::Fragment::m_err
int m_err
Definition: CommitLogFragment.h:166
SWC::Ranger::CommitLog::Fragment::marked_removed
bool marked_removed() const noexcept
Definition: CommitLogFragment.cc:583
SWC::Core::Buffer::free
SWC_CAN_INLINE void free() noexcept
Definition: Buffer.h:85
SWC::Ranger::CommitLog::Fragment::m_queue
std::queue< LoadCallback * > m_queue
Definition: CommitLogFragment.h:171
SWC::Condition::NONE
@ NONE
Definition: Comparators.h:28
SWC::Core::AtomicBase::exchange
constexpr SWC_CAN_INLINE T exchange(T value) noexcept
Definition: Atomic.h:47
SWC::Ranger::CommitLog::Fragment::State
State
Definition: CommitLogFragment.h:41
SWC::Ranger::CommitLog::Fragment::write
static void write(int &err, const uint8_t version, const DB::Cells::Interval &interval, DB::Types::Encoder &encoder, const size_t size_plain, size_t &size_enc, const uint32_t cell_revs, const uint32_t cells_count, uint32_t &data_checksum, uint32_t &offset_data, DynamicBuffer &cells, StaticBuffer &buffer)
Definition: CommitLogFragment.cc:175
SWC::DB::Types::Encoder
Core::Encoder::Type Encoder
Definition: Encoder.h:15
SWC::Ranger::CommitLog::Fragment::m_marked_removed
Core::Atomic< bool > m_marked_removed
Definition: CommitLogFragment.h:165
SWC::Core::BufferDyn::fill
constexpr SWC_CAN_INLINE size_t fill() const noexcept
Definition: Buffer.h:192
SWC::FS::SmartFd::Ptr
std::shared_ptr< SmartFd > Ptr
Definition: SmartFd.h:37
SWC::Ranger::CommitLog::Fragment::load
void load(LoadCallback *cb)
Definition: CommitLogFragment.cc:373
SWC::Error::CHECKSUM_MISMATCH
@ CHECKSUM_MISMATCH
Definition: Error.h:62
SWC::Core::AtomicBase::load
constexpr SWC_CAN_INLINE T load() const noexcept
Definition: Atomic.h:42
SWC::Ranger::CommitLog::Fragment::remove
void remove(int &err)
Definition: CommitLogFragment.cc:599
SWC::Ranger::CommitLog::Fragments::add
void add(const DB::Cells::Cell &cell)
Definition: CommitLog.cc:54
SWC::Ranger::CommitLog::Fragment::processing_decrement
void processing_decrement() noexcept
Definition: CommitLogFragment.cc:516
SWC::DB::Types::MngrColumn::LOADING
@ LOADING
Definition: MngrColumnState.h:19
SWC::Ranger::CommitLog::Fragment::VERSION
static const uint8_t VERSION
Definition: CommitLogFragment.h:38
SWC::Ranger::CommitLog::Fragment::processing_increment
void processing_increment() noexcept
Definition: CommitLogFragment.cc:511
SWC::Ranger::CommitLog::Fragment::~Fragment
~Fragment() noexcept
Definition: CommitLogFragment.cc:246
SWC::Core::Encoder::to_string
const char *SWC_CONST_FUNC to_string(Type typ) noexcept
Definition: Encoder.cc:31
SWC::Core::checksum_i32
SWC_CAN_INLINE void checksum_i32(const uint8_t *start, size_t len, uint8_t **ptr) noexcept
Definition: Checksum.h:62
SWC::Ranger::CommitLog::Fragment::data_checksum
const uint32_t data_checksum
Definition: CommitLogFragment.h:91
SWC::Core::Encoder::encode
void encode(int &err, Type encoder, const uint8_t *src, size_t src_sz, size_t *sz_enc, DynamicBuffer &output, uint32_t reserve, bool no_plain_out=false, bool ok_more=false)
Definition: Encoder.cc:222
SWC::Ranger::CommitLog::Fragment::load_cells
void load_cells(int &err, Ranger::Block::Ptr cells_block)
Definition: CommitLogFragment.cc:418
SWC::DB::Cells::MutableVec
Definition: MutableVec.h:20
SWC::Ranger::CommitLog::Fragment::HEADER_EXT_FIXED_SIZE
static const uint8_t HEADER_EXT_FIXED_SIZE
Definition: CommitLogFragment.h:39
SWC::Common::Files::Schema::filepath
std::string filepath(cid_t cid)
Definition: Schema.h:34
SWC::LOG_WARN
@ LOG_WARN
Definition: Logger.h:33
SWC::Ranger::CommitLog::Fragment::get_filepath
const std::string & get_filepath() const noexcept
Definition: CommitLogFragment.cc:257
SWC::Core::Semaphore::release
void release()
Definition: Semaphore.cc:36
SWC::Ranger::CommitLog::Fragment::TaskLoadRead::TaskLoadRead
TaskLoadRead(const TaskLoadRead &)=delete
SWC::Ranger::CommitLog::Fragment::m_processing
Core::Atomic< size_t > m_processing
Definition: CommitLogFragment.h:167
SWC::DB::Cells::MutableVec::add_sorted
void add_sorted(const Cell &cell)
Definition: MutableVec.cc:28
SWC::Env::Rgr::post
static SWC_CAN_INLINE void post(T_Handler &&handler)
Definition: RangerEnv.h:114
SWC::Ranger::CommitLog::Fragment::size_bytes_encoded
size_t size_bytes_encoded() const noexcept
Definition: CommitLogFragment.cc:561
SWC::Core::Atomic::fetch_sub
constexpr SWC_CAN_INLINE T fetch_sub(T v) noexcept
Definition: Atomic.h:88
SWC::DB::Cells::Interval
Definition: Interval.h:17
SWC::Error::print
void print(std::ostream &out, int err)
Definition: Error.cc:191
SWC::Ranger::CommitLog::Fragment::m_state
Core::Atomic< State > m_state
Definition: CommitLogFragment.h:164
SWC::Core::Atomic::fetch_add
constexpr SWC_CAN_INLINE T fetch_add(T v) noexcept
Definition: Atomic.h:93
SWC::Serialization::decode_i8
constexpr SWC_CAN_INLINE uint8_t decode_i8(const uint8_t **bufp, size_t *remainp)
Definition: Serialization.h:91
SWC::Ranger::CommitLog::Fragment::HEADER_SIZE
static const uint8_t HEADER_SIZE
Definition: CommitLogFragment.h:37
SWC::Ranger::CommitLog::Fragment::TaskLoadRead::operator=
TaskLoadRead & operator=(const TaskLoadRead &)=delete
SWC::Ranger::CommitLog::Fragment::make_write
static Ptr make_write(int &err, std::string &&filepath, DB::Cells::Interval &&interval, DB::Types::Encoder encoder, const uint32_t cell_revs, const uint32_t cells_count, DynamicBuffer &cells, StaticBuffer &buffer)
Definition: CommitLogFragment.cc:135
SWC::Ranger::CommitLog::Fragment::cell_revs
const uint32_t cell_revs
Definition: CommitLogFragment.h:89
SWC::Ranger::CommitLog::Fragment::operator=
Fragment & operator=(const Fragment &)=delete
SWC::Core::MutexSptd::try_full_lock
bool try_full_lock(bool &support) noexcept
Definition: MutexSptd.h:55
SWC::Ranger::CommitLog::Fragment::Fragment
Fragment(const FS::SmartFd::Ptr &smartfd, const uint8_t version, DB::Cells::Interval &&interval, const DB::Types::Encoder encoder, const size_t size_plain, const size_t size_enc, const uint32_t cell_revs, const uint32_t cells_count, const uint32_t data_checksum, const uint32_t offset_data, Fragment::State state) noexcept
Definition: CommitLogFragment.cc:221
SWC::Ranger::CommitLog::Fragment::load_header
static void load_header(int &err, FS::SmartFd::Ptr &smartfd, uint8_t &version, DB::Cells::Interval &interval, DB::Types::Encoder &encoder, size_t &size_plain, size_t &size_enc, uint32_t &cell_revs, uint32_t &cells_count, uint32_t &data_checksum, uint32_t &offset_data)
Definition: CommitLogFragment.cc:64
version
uint8_t version
Protocol version.
Definition: Header.h:52
SWC::Serialization::decode_i32
SWC_CAN_INLINE uint32_t decode_i32(const uint8_t **bufp, size_t *remainp)
Definition: Serialization.h:143
SWC::DB::KeySeq::compare
Condition::Comp compare(const Types::KeySeq seq, const Cell::Key &key, const Cell::Key &other) SWC_ATTRIBS((SWC_ATTRIB_O3))
Definition: KeyComparator.h:294
SWC::Ranger::CommitLog::Fragment::m_mutex
Core::MutexSptd m_mutex
Definition: CommitLogFragment.h:163
SWC::Ranger::CommitLog::Fragment::TaskLoadRead::operator()
void operator()()
Definition: CommitLogFragment.cc:370
SWC::Core::Buffer::set
void set(value_type *data, size_t len, bool take_ownership) noexcept
Definition: Buffer.h:109
SWC::Ranger::CommitLog::Fragment::LoadCallback
Definition: CommitLogFragment.h:32