SWC-DB  v0.5.12 C++ documentations
SWC-DB© (Super Wide Column Database) - High Performance Scalable Database (https://github.com/kashirin-alex/swc-db)
CommitLog.cc
Go to the documentation of this file.
1 /*
2  * SWC-DB© Copyright since 2019 Alex Kashirin <kashirin.alex@gmail.com>
3  * License details at <https://github.com/kashirin-alex/swc-db/#license>
4  */
5 
6 
8 #include "swcdb/core/Time.h"
9 #include "swcdb/core/Semaphore.h"
10 
11 
12 namespace SWC { namespace Ranger { namespace CommitLog {
13 
14 static const uint8_t MAX_FRAGMENTS_NARROW = 20;
15 
16 
19  : range(nullptr), stopping(false),
20  m_mutex_cells(), m_cells(key_seq),
21  m_commit(),
22  m_compacting(false),
23  m_mutex(),
24  m_deleting(false),
25  m_cv(),
26  m_sem(5), m_last_id(0),
27  m_releasable_bytes(0), m_modification_ts(0) {
28 }
29 
32 }
33 
35 void Fragments::init(const RangePtr& for_range) {
36  range = for_range;
37  schema_update();
38 }
39 
44  range->cfg->block_cells() * 2,
45  range->cfg->cell_versions(),
46  range->cfg->cell_ttl(),
47  range->cfg->column_type(),
48  false
49  );
50 }
51 
52 
// Adds `cell` to the in-memory cells buffer (m_cells) through
// add_raw(cell, false).
// NOTE(review): listing line 55 is absent from this extract — presumably a
// lock on m_mutex_cells; confirm against the original CommitLog.cc.
54 void Fragments::add(const DB::Cells::Cell& cell) {
56  m_cells.add_raw(cell, false);
57 }
58 
// Decides whether a roll-out of the buffered cells is needed and, if so,
// posts an asynchronous task that calls _commit(false).
// Returns immediately when m_commit.running() reports true.
// NOTE(review): running() presumably also marks the state as running (the
// "no run" path below calls m_commit.stop()) — confirm in StateRunning.h.
// NOTE(review): listing lines 60, 66 and 75 are absent from this extract;
// `sz` is only consumed by the missing line 66.
59 void Fragments::commit() noexcept {
61  if(m_commit.running())
62  return;
// Non-blocking: if the shared lock is not available no roll-out is tried.
63  bool run = m_mutex_cells.try_lock_shared();
64  if(run) {
65  ssize_t sz = m_cells.size_of_internal();
67  run = _need_roll();
68  m_mutex_cells.unlock_shared();
69  }
70  if(!run)
71  return m_commit.stop();
72 
// Small functor carrying `this` into the posted task.
73  struct Task {
74  Fragments* ptr;
76  Task(Fragments* a_ptr) noexcept : ptr(a_ptr) { }
77  void operator()() { ptr->_commit(false); }
78  };
79  Env::Rgr::post(Task(this));
80 }
81 
83  if(!m_releasable_bytes || !m_mutex.try_lock_shared())
84  return 0;
85  bool ok = !m_compacting && !m_commit.running();
86  m_mutex.unlock_shared();
87  return ok ? _commit(true) : 0;
88 }
89 
91  {
92  Core::UniqueLock lock_wait(m_mutex);
93  if(m_compacting || m_commit.running()) {
94  m_cv.wait(lock_wait, [this] {
95  return !m_compacting && !m_commit.running(); });
96  }
97  }
98  _commit(true);
99 }
100 
// Writes the buffered cells (m_cells) out as log fragments, one fragment per
// loop iteration, and returns the number of bytes by which the releasable
// size shrank (accumulated in released_bytes).
//  finalize == false: loop ends once _need_roll() is no longer true.
//  finalize == true : loop runs until m_cells is empty and then waits for all
//                     outstanding writes via m_sem.wait_all().
// NOTE(review): listing lines 108, 121, 124-125, 144, 155, 166 and 175 are
// absent from this extract (presumably lock statements, the start of the
// m_cells.write_and_free(...) call and the start of the SWC_LOG_OUT macro);
// confirm against the original CommitLog.cc.
101 size_t Fragments::_commit(bool finalize) {
102  Fragment::Ptr frag;
103  size_t released_bytes = 0;
104  ssize_t releasable;
105  uint32_t block_size;
106  for(int err; ;) {
107  {
109  if(m_cells.empty() || (!finalize && !_need_roll()))
110  break;
111  }
112 
113  block_size = range->cfg->block_size();
// NOTE(review): the two `break`s below leave the loop without reaching the
// less_mem_future(block_size) call at the end of the iteration — verify that
// the future-memory accounting is balanced on those paths.
114  Env::Rgr::res().more_mem_future(block_size);
115 
116  StaticBuffer buff_write;
117  {
118  DynamicBuffer cells;
119  uint32_t cells_count = 0;
120  DB::Cells::Interval interval(range->cfg->key_seq);
122  size_t nxt_id = _next_id();
123  {
126  cells, cells_count, interval,
127  block_size, range->cfg->block_cells());
// Stop when the log is being deleted or nothing was serialized.
128  if(m_deleting || !cells.fill())
129  break;
130  releasable = m_cells.size_of_internal();
131  }
132 
133  frag = Fragment::make_write(
134  err = Error::OK,
135  get_log_fragment(nxt_id),
136  std::move(interval),
137  range->cfg->block_enc(), range->cfg->cell_versions(),
138  cells_count, cells,
139  buff_write
140  );
141  if(!frag) {
142  // if can happen checkup (fallbacks to Plain at Encoder err)
143  // put cells back to m_cells
145  Error::print(SWC_LOG_OSTREAM << "Bad Fragment Write "
146  << range->cfg->cid << '/' << range->rid << ' ', err);
147  );
148  break;
149  }
150  _add(frag);
151  }
152 
// m_sem (constructed with 5) is acquired before each write and its address is
// handed to frag->write(); presumably the write releases it on completion.
153  m_sem.acquire();
154  frag->write(
156  range->cfg->file_replication(),
157  std::move(buff_write),
158  &m_sem
159  );
160 
// Publish the new releasable size and report the delta to the resources
// manager; only a decrease counts towards the returned released_bytes.
161  ssize_t tmp = m_releasable_bytes.exchange(releasable);
162  Env::Rgr::res().adj_mem_releasable(releasable - tmp);
163  if(tmp > releasable)
164  released_bytes += tmp - releasable;
165 
167  Env::Rgr::res().less_mem_future(block_size);
168  }
169 
170  if(finalize)
171  m_sem.wait_all();
172 
// Clear the running state and wake threads waiting on m_cv (e.g. the waits
// on `!m_compacting && !m_commit.running()` elsewhere in this file).
173  m_commit.stop();
174  {
176  m_cv.notify_all();
177  }
178 
179  if(!finalize)
180  try_compact();
181 
182  return released_bytes;
183 }
184 
188  _add(frag);
189 }
190 
192  for(auto it = cbegin() + _narrow(frag->interval.key_begin);
193  it != cend(); ++it) {
195  (*it)->interval.key_begin, frag->interval.key_begin)
196  != Condition::GT) {
197  insert(it, frag);
198  return;
199  }
200  }
201  push_back(frag);
202 }
203 
206  return m_compacting;
207 }
208 
210  const Fragments::Vec& without,
211  size_t vol) {
213  return _need_compact(groups, without, vol);
214 }
215 
// Tries to start a log compaction. Returns true only when a Compact job was
// created; returns false when stopping, in low-memory state, when another
// compaction already holds m_compacting, or when nothing needs compacting.
//  tnum: repetition number handed to the Compact job (finish_compact()
//        re-enters with compact->repetition+1).
// NOTE(review): listing lines 224, 226, 230, 241 and 258 are absent from this
// extract; line 224 presumably re-assigns `at` before the check on line 225 —
// confirm against the original CommitLog.cc.
216 bool Fragments::try_compact(uint32_t tnum) {
217  if(stopping || Env::Rgr::res().is_low_mem_state())
218  return false;
219 
// Claim the compacting flag (false -> true).
// NOTE(review): compare_exchange_weak is used without a retry loop; if it
// maps to std::atomic semantics it may fail spuriously — confirm this is
// acceptable for a "try" operation.
220  bool at = false;
221  if(!m_compacting.compare_exchange_weak(at, true))
222  return false;
223 
225  if(at && !(at=range->compact_possible(true)))
227  if(!at) {
// Not possible now: give the flag back and wake waiters on m_cv.
228  m_compacting.store(false);
229  {
231  m_cv.notify_all();
232  }
233  return false;
234  }
235 
236  CompactGroups groups;
237  uint8_t cointervaling = range->cfg->log_compact_cointervaling();
238  size_t need = 0;
239  bool need_major;
240  {
// A required major compaction takes precedence over grouping fragments.
242  if(!(need_major = _need_compact_major())) {
243  need = _need_compact(groups, {}, cointervaling);
244  }
245  }
246  if(need) {
// Average fragments per group decides the range's compacting state.
247  range->compacting(need / groups.size() > range->cfg->log_rollout_ratio()
248  ? Range::COMPACT_PREPARING // mitigate add
249  : Range::COMPACT_COMPACTING // continue scan & add
250  );
// NOTE(review): the raw `new` result is not stored here; finish_compact()
// deletes the Compact pointer it is given — presumably the job calls back
// into finish_compact(this) when done.
251  new Compact(this, tnum, groups, cointervaling);
252  return true;
253  }
254 
255  finish_compact(nullptr);
256  if(need_major) {
257  range->compact_require(true);
259  }
260  return false;
261 }
262 
// Ends a compaction cycle: clears m_compacting, wakes waiters on m_cv and
// sets the range's compacting state to Range::COMPACT_NONE.
//  compact: the finished job, or nullptr when no job was started. A non-null
//           job is deleted here; unless `stopping`, another try_compact() is
//           attempted first with repetition+1.
// NOTE(review): listing lines 266 and 270 are absent from this extract.
263 void Fragments::finish_compact(const Compact* compact) {
264  m_compacting.store(false);
265  {
267  m_cv.notify_all();
268  }
269  range->compacting(Range::COMPACT_NONE);
271 
272  if(compact) {
273  if(!stopping)
274  try_compact(compact->repetition+1);
275  delete compact;
276  }
277 }
278 
279 std::string Fragments::get_log_fragment(const int64_t frag) const {
280  std::string s(range->get_path(DB::RangeBase::LOG_DIR));
281  std::string tmp(std::to_string(frag));
282  s.reserve(s.length() + 6 + tmp.length());
283  s.append("/");
284  s.append(tmp);
285  s.append(".frag");
286  return s;
287 }
288 
289 std::string Fragments::get_log_fragment(const std::string& frag) const {
290  std::string s(range->get_path(DB::RangeBase::LOG_DIR));
291  s.reserve(s.length() + 1 + frag.length());
292  s.append("/");
293  s.append(frag);
294  return s;
295 }
296 
297 void Fragments::load(int &err) {
298  //Core::ScopedLock lock(m_mutex);
299  // fragments header OR log.data >> file.frag(intervals)
300 
301  err = Error::OK;
302  FS::DirentList fragments;
303  Env::FsInterface::interface()->readdir(
304  err, range->get_path(DB::RangeBase::LOG_DIR), fragments);
305  if(err)
306  return;
307 
308  Fragment::Ptr frag;
309  for(auto& entry : fragments) {
310  frag = Fragment::make_read(
311  err, get_log_fragment(entry.name), range->cfg->key_seq);
312  if(err == Error::FS_PATH_NOT_FOUND) {
313  err = Error::OK;
314  continue;
315  }
316  if(!frag)
317  return;
318  add(frag);
319  }
320 }
321 
324  for(auto& frag : *this)
325  intval.expand(frag->interval);
326 }
327 
330  for(auto& frag : *this) {
331  intval.expand(frag->interval);
332  intval.align(frag->interval);
333  }
334 }
335 
// Selects fragments for `loader` through _load_cells() and, when this is the
// final round, also loads the not-yet-committed cells (m_cells) into the
// loader's block.
//  is_final: in/out — stays true only if it was true on entry AND
//            _load_cells() did not decrement `vol` (base == vol).
//  frags:    fragments already chosen; _load_cells() receives it.
//  vol:      passed to _load_cells() by reference as its selection budget.
// NOTE(review): listing line 340 is absent from this extract — presumably a
// lock on m_mutex; confirm against the original CommitLog.cc.
337 void Fragments::load_cells(BlockLoader* loader, bool& is_final,
338  Fragments::Vec& frags, uint8_t vol) {
339  uint8_t base = vol;
341  _load_cells(loader, frags, vol);
342  if(is_final && (is_final = base == vol)) {
343  Core::SharedLock lock_cells(m_mutex_cells);
344  loader->block->load_final(m_cells);
345  }
346 }
347 
350  uint8_t& vol) {
351  for(auto& frag : *this) {
352  if(std::find(frags.cbegin(), frags.cend(), frag) == frags.cend() &&
353  loader->block->is_consist(frag->interval)) {
354  frag->processing_increment();
355  frags.push_back(frag);
356  if(!--vol)
357  return;
358  }
359  }
360 }
361 
// Replaces the content of `fragments` with a copy of all fragment pointers
// currently held by this container.
// NOTE(review): listing line 363 is absent from this extract — presumably a
// lock on m_mutex; confirm against the original CommitLog.cc.
362 void Fragments::get(Fragments::Vec& fragments) {
364  fragments.assign(cbegin(), cend());
365 }
366 
367 size_t Fragments::release(size_t bytes) {
368  size_t released = 0;
369  if(m_mutex.try_lock_shared()) {
370  for(auto& frag : *this) {
371  released += frag->release();
372  if(released >= bytes)
373  break;
374  }
375  m_mutex.unlock_shared();
376  }
377  return released;
378 }
379 
// Removes the given fragments through _remove() using a local semaphore
// constructed with 10, then waits (sem.wait_all()) before returning.
//  err:           forwarded to _remove() / Fragment::remove().
//  fragments_old: fragments to remove.
// NOTE(review): listing line 383 is absent from this extract — presumably a
// lock on m_mutex scoped to the inner block; confirm against the original.
380 void Fragments::remove(int &err, Fragments::Vec& fragments_old) {
381  Core::Semaphore sem(10);
382  {
384  _remove(err, fragments_old, &sem);
385  }
386  sem.wait_all();
387 }
388 
389 void Fragments::_remove(int &err, Fragments::Vec& fragments_old,
390  Core::Semaphore* semp) {
391  for(auto& frag : fragments_old) {
392  semp->acquire();
393  frag->remove(err, semp);
394  auto it = std::find(cbegin(), cend(), frag);
395  if(it != cend())
396  erase(it);
397  }
398 }
399 
// Drops a single fragment from this container, optionally removing its file.
//  err:         passed to frag->remove(err) when remove_file is true.
//  frag:        the fragment to drop; a fragment not found is left alone.
//  remove_file: when true, frag->remove(err) is called before the erase.
// NOTE(review): listing line 401 is absent from this extract — presumably a
// lock on m_mutex; confirm against the original CommitLog.cc.
400 void Fragments::remove(int &err, Fragment::Ptr& frag, bool remove_file) {
402  if(remove_file)
403  frag->remove(err);
404  auto it = std::find(cbegin(), cend(), frag);
405  if(it != cend())
406  erase(it);
407 }
408 
410  stopping.store(true);
411  {
412  Core::UniqueLock lock_wait(m_mutex);
413  m_deleting = true;
414  if(m_compacting || m_commit.running())
415  m_cv.wait(lock_wait, [this] {
416  return !m_compacting && !m_commit.running(); });
417  }
418  m_sem.wait_all();
420  for(auto& frag : *this)
421  frag->mark_removed();
422  clear();
423  range = nullptr;
424  m_commit.stop();
425 }
426 
428  stopping.store(true);
429  {
430  Core::UniqueLock lock_wait(m_mutex);
431  if(m_compacting || m_commit.running())
432  m_cv.wait(lock_wait, [this] {
433  return !m_compacting && !m_commit.running(); });
434  }
436  clear();
437  range = nullptr;
438  m_commit.stop();
439 }
440 
442  auto smartfd = FS::SmartFd::make_ptr(get_log_fragment(next_id()), 0);
443  Env::FsInterface::interface()->rename(
444  err, take_frag->get_filepath(), smartfd->filepath());
445 
446  if(!err) {
447  auto frag = Fragment::make_read(err, smartfd, range->cfg->key_seq);
448  if(frag) {
449  add(frag);
450  return frag;
451  }
452  Env::FsInterface::interface()->rename(
453  err = Error::OK, smartfd->filepath(), take_frag->get_filepath());
454  }
455  return nullptr;
456 }
457 
459  Fragments::Vec& removing) {
460  const auto& fs_if = Env::FsInterface::interface();
461  Fragments::Vec tmp_frags;
462  for(auto it = frags.cbegin(); !stopping && it != frags.cend(); ) {
463  auto smartfd = FS::SmartFd::make_ptr(get_log_fragment(next_id()), 0);
464  fs_if->rename(err, (*it)->get_filepath(), smartfd->filepath());
465  if(err)
466  break;
467  frags.erase(it);
468 
469  auto frag = Fragment::make_read(err, smartfd, range->cfg->key_seq);
470  if(frag) {
471  tmp_frags.push_back(frag);
472  } else {
473  fs_if->remove(err, smartfd->filepath());
474  break;
475  }
476  }
477 
478  Core::Semaphore sem(10);
479  if(stopping || !frags.empty()) {
480  for(auto& frag : tmp_frags) {
481  sem.acquire();
482  frag->remove(err = Error::OK, &sem);
483  }
484  } else {
486  for(auto& frag : tmp_frags)
487  _add(frag);
488  _remove(err = Error::OK, removing, &sem);
489  }
490  removing.clear();
491  sem.wait_all();
492 }
493 
496  return m_deleting;
497 }
498 
// Returns the number of cells in the in-memory buffer (m_cells.size()); when
// only_current is false the cells_count of every fragment is added as well.
// NOTE(review): listing lines 502 and 506 are absent from this extract —
// presumably locks on m_mutex_cells and m_mutex respectively; confirm
// against the original CommitLog.cc.
499 size_t Fragments::cells_count(bool only_current) {
500  size_t count = 0;
501  {
503  count += m_cells.size();
504  }
505  if(!only_current) {
507  for(auto& frag : *this)
508  count += frag->cells_count;
509  }
510  return count;
511 }
512 
515  Core::SharedLock lock1(m_mutex);
516  if(!Vec::empty())
517  return false;
519  return m_cells.empty();
520 }
521 
// Returns the number of fragments held by the underlying Vec.
// NOTE(review): listing line 523 is absent from this extract — presumably a
// lock on m_mutex; confirm against the original CommitLog.cc.
522 size_t Fragments::size() noexcept {
524  return Vec::size();
525 }
526 
528 size_t Fragments::size_bytes(bool only_loaded) {
529  return _size_bytes(only_loaded);
530 }
531 
534  size_t size = 0;
535  for(auto& frag : *this)
536  size += frag->size_bytes_encoded();
537  return size;
538 }
539 
540 bool Fragments::processing() noexcept {
541  bool busy;
542  if(!(busy = !m_mutex.try_lock())) {
543  busy = _processing();
544  m_mutex.unlock();
545  }
546  return busy;
547 }
548 
// Public wrapper returning the value of _next_id().
// NOTE(review): listing line 550 is absent from this extract — presumably a
// lock on m_mutex; confirm against the original CommitLog.cc.
549 uint64_t Fragments::next_id() {
551  return _next_id();
552 }
553 
556  uint64_t new_id = Time::now_ns();
557  if(m_last_id == new_id) {
558  ++new_id;
559  // debug
561  << " Fragments::next_id was the same id=" << new_id; );
562  }
563  return m_last_id = new_id;
564 }
565 
// Writes a one-line textual summary of the commit-log to `out`:
// total cells count, buffered cells, number of fragments, optionally every
// fragment (when `minimal` is false), the processing state and the
// used/actual byte sizes.
// NOTE(review): listing line 568 is absent from this extract — presumably a
// lock on m_mutex taken after cells_count(); confirm against the original.
566 void Fragments::print(std::ostream& out, bool minimal) {
567  size_t count = cells_count();
569 
570  out << "CommitLog(count=" << count
571  << " cells=";
572  {
// m_cells is read under a shared lock on m_mutex_cells.
573  Core::SharedLock lock_cells(m_mutex_cells);
574  out << m_cells.size();
575  }
576 
577  out << " fragments=" << Vec::size();
578  if(!minimal) {
579  out << " [";
580  for(auto& frag : *this){
581  frag->print(out);
582  out << ", ";
583  }
584  out << ']';
585  }
586 
587  out << " processing=" << _processing()
588  << " used/actual=" << _size_bytes(true) << '/' << _size_bytes()
589  << ')';
590 }
591 
592 
594 bool Fragments::_need_roll() const noexcept {
595  if(m_cells.empty())
596  return false;
597  auto cells = m_cells.size();
598  if(!cells)
599  return false;
600  auto bytes = m_cells.size_bytes();
601  auto cfg_cells = range->cfg->block_cells();
602  auto cfg_bytes = range->cfg->block_size();
603  if(cells < cfg_cells && bytes < cfg_bytes)
604  return false;
605  auto cfg_ratio = range->cfg->log_rollout_ratio();
606  return bytes >= cfg_bytes * cfg_ratio ||
607  cells >= cfg_cells * cfg_ratio ||
608  Env::Rgr::res().need_ram(bytes);
609 }
610 
612  const Fragments::Vec& without,
613  size_t vol) {
614  size_t need = 0;
615  if(stopping || Vec::size() < vol)
616  return need;
617 
618  Fragments::Vec fragments;
619  if(without.empty()) {
620  fragments.assign(Vec::cbegin(), Vec::cend());
621  } else {
622  if(Vec::size() < without.size() + vol)
623  return need;
624  fragments.reserve(Vec::size() - without.size());
625  for(auto& frag : *this) {
626  if(std::find(without.cbegin(), without.cend(), frag) == without.cend())
627  fragments.push_back(frag);
628  }
629  if(fragments.size() < vol)
630  return need;
631  }
632 
633  groups.emplace_back().push_back(fragments.front());
634  ++need;
635  Fragment::Ptr curt;
636  Condition::Comp cond;
637  for(auto it = fragments.cbegin() + 1; it != fragments.cend(); ++it) {
638  auto const& last = *groups.back().back();
639  if(!( (cond = DB::KeySeq::compare(m_cells.key_seq, last.interval.key_end,
640  (curt = *it)->interval.key_begin)) == Condition::LT ||
641  (cond == Condition::EQ &&
642  curt->cells_count < range->cfg->block_cells() &&
643  curt->size_bytes() < range->cfg->block_size() &&
644  last.cells_count < range->cfg->block_cells() &&
645  last.size_bytes() < range->cfg->block_size() ) ) ) {
646  if(groups.back().size() < vol) {
647  need -= groups.back().size();
648  groups.back().clear();
649  } else {
650  groups.emplace_back();
651  }
652  }
653  groups.back().push_back(curt);
654  ++need;
655  }
656  for(auto it=groups.cbegin(); it != groups.cend(); ) {
657  if(it->size() < vol) {
658  need -= it->size();
659  groups.erase(it);
660  } else {
661  ++it;
662  }
663  }
664  return need;
665 }
666 
668  size_t ok = range->cfg->cellstore_size();
669  ok /= 100;
670  ok *= range->cfg->compact_percent();
671 
672  bool need = Vec::size() > ok/range->cfg->block_size() &&
673  range->blocks.cellstores.blocks_count() < ok/range->cfg->block_size();
674  if(!need) {
675  size_t sz_bytes = 0;
676  for(auto& frag : *this) {
677  if((need = (sz_bytes += frag->size_bytes_encoded()) > ok))
678  break;
679  }
680  }
681  return need;
682 }
683 
685 bool Fragments::_processing() const noexcept {
686  if(m_commit)
687  return true;
688  for(auto& frag : *this)
689  if(frag->processing())
690  return true;
691  return false;
692 }
693 
// Returns m_cells.size_bytes() plus the sum of size_bytes(only_loaded) over
// all fragments.
//  only_loaded: forwarded unchanged to Fragment::size_bytes().
// NOTE(review): listing line 697 is absent from this extract — presumably a
// lock on m_mutex_cells scoped to the inner block; confirm against the
// original CommitLog.cc.
694 size_t Fragments::_size_bytes(bool only_loaded) {
695  size_t size = 0;
696  {
698  size += m_cells.size_bytes();
699  }
700  for(auto& frag : *this)
701  size += frag->size_bytes(only_loaded);
702  return size;
703 }
704 
// Returns a start offset into the fragments vector for a scan looking for
// `key` (e.g. `cbegin() + _narrow(...)` is used as the loop start elsewhere
// in this file). Returns 0 for an empty key or when the vector holds at most
// MAX_FRAGMENTS_NARROW fragments.
// Otherwise the offset starts at the middle and is moved by a halving `step`
// in a goto-driven loop, depending on a comparison of the fragment's
// interval.key_begin at `offset` with `key`.
// NOTE(review): listing line 713 is absent from this extract — it holds the
// opening of the `if(...compare(...` condition closed on line 714, so the
// exact comparison arguments cannot be confirmed from here.
706 size_t Fragments::_narrow(const DB::Cell::Key& key) const {
707  size_t offset = 0;
708  if(key.empty() || Vec::size() <= MAX_FRAGMENTS_NARROW)
709  return offset;
710 
711  size_t step = offset = Vec::size() >> 1;
712  try_narrow:
714  (*(cbegin() + offset))->interval.key_begin, key) == Condition::GT) {
// Comparison gave GT: stop once the step is small, otherwise move forward.
715  if(step < MAX_FRAGMENTS_NARROW)
716  return offset;
717  offset += step >>= 1;
718  goto try_narrow;
719  }
// Otherwise move backward; the step is bumped by one once it is small so it
// never reaches zero.
720  if((step >>= 1) <= MAX_FRAGMENTS_NARROW)
721  ++step;
722  if(offset < step)
723  return 0;
724  offset -= step;
725  goto try_narrow;
726 }
727 
728 
729 }}} // namespace SWC::Ranger::CommitLog
SWC::Core::Vector< Fragment::Ptr >::erase
SWC_CAN_INLINE iterator erase(size_type offset) noexcept(_NoExceptMoveAssign &&_NoExceptDestructor)
Definition: Vector.h:464
SWC::Ranger::CommitLog::Fragments::_narrow
size_t _narrow(const DB::Cell::Key &key) const
Definition: CommitLog.cc:706
SWC::Ranger::CommitLog::Fragments::load_cells
void load_cells(BlockLoader *loader, bool &is_final, Vec &frags, uint8_t vol)
Definition: CommitLog.cc:337
SWC::Core::Vector::front
constexpr SWC_CAN_INLINE reference front() noexcept
Definition: Vector.h:243
SWC::Ranger::CommitLog::Fragment::Ptr
std::shared_ptr< Fragment > Ptr
Definition: CommitLogFragment.h:31
SWC::Core::AtomicBase::compare_exchange_weak
constexpr SWC_CAN_INLINE bool compare_exchange_weak(T &at, T value) noexcept
Definition: Atomic.h:52
SWC::Core::Semaphore
Definition: Semaphore.h:16
SWC::Env::Rgr::res
static SWC_CAN_INLINE System::Resources & res() noexcept
Definition: RangerEnv.h:131
SWC::Core::StateRunning::stop
constexpr SWC_CAN_INLINE void stop() noexcept
Definition: StateRunning.h:32
SWC::Ranger::CommitLog::Fragments::empty
bool empty()
Definition: CommitLog.cc:514
SWC::Ranger::CommitLog::Fragments::take_ownership
Fragment::Ptr take_ownership(int &err, Fragment::Ptr &frag)
Definition: CommitLog.cc:441
SWC::Ranger::CommitLog::Fragments::Fragments
Fragments(const DB::Types::KeySeq key_seq)
Definition: CommitLog.cc:18
SWC_LOG_OSTREAM
#define SWC_LOG_OSTREAM
Definition: Logger.h:44
SWC::DB::Cells::MutableVec::key_seq
const Types::KeySeq key_seq
Definition: MutableVec.h:29
SWC::Ranger::CommitLog::Compact
Definition: CommitLogCompact.h:14
SWC::Env::Rgr::compaction_schedule
static void compaction_schedule(uint32_t ms)
Definition: RangerEnv.h:385
SWC::Ranger::CommitLog::Fragments::_load_cells
void _load_cells(BlockLoader *loader, Vec &frags, uint8_t &vol)
Definition: CommitLog.cc:349
SWC::Ranger::run
SWC_SHOULD_NOT_INLINE int run()
Definition: main.cc:54
SWC::Ranger::CommitLog::Fragments::m_mutex
std::shared_mutex m_mutex
Definition: CommitLog.h:167
SWC::Core::Vector< Fragment::Ptr >::clear
SWC_CAN_INLINE void clear() noexcept(_NoExceptDestructor)
Definition: Vector.h:120
SWC::Core::UniqueLock
Definition: MutexLock.h:68
SWC_LOG_OUT
#define SWC_LOG_OUT(pr, _code_)
Definition: Logger.h:178
SWC::Time::now_ns
SWC_CAN_INLINE int64_t now_ns() noexcept
Definition: Time.h:43
SWC::Ranger::CommitLog::Fragments::unload
void unload()
Definition: CommitLog.cc:427
CommitLog.h
SWC::Ranger::CommitLog::Fragments::deleting
bool deleting()
Definition: CommitLog.cc:494
SWC::Core::ScopedLock
Definition: MutexLock.h:41
SWC::DB::Cells::MutableVec::configure
void configure(uint32_t split, const uint32_t revs, const uint64_t ttl_ns, const Types::Column typ, bool finalized) noexcept
Definition: MutableVec.cc:17
SWC::DB::Cells::Interval::align
SWC_CAN_INLINE bool align(const Interval &other)
Definition: Interval.h:152
SWC::Ranger::CommitLog::Fragments::m_commit
Core::StateRunning m_commit
Definition: CommitLog.h:164
SWC::Ranger::CommitLog::Fragments::m_mutex_cells
std::shared_mutex m_mutex_cells
Definition: CommitLog.h:161
SWC::Ranger::CommitLog::Fragment::make_read
static Ptr make_read(int &err, std::string &&filepath, const DB::Types::KeySeq key_seq)
Definition: CommitLogFragment.cc:29
SWC::Condition::GT
@ GT
Definition: Comparators.h:30
SWC::Env::FsInterface::interface
static SWC_CAN_INLINE FS::Interface::Ptr & interface() noexcept
Definition: Interface.h:150
SWC::Core::SharedLock
Definition: MutexLock.h:19
SWC::System::Resources::more_mem_future
SWC_CAN_INLINE void more_mem_future(size_t sz) noexcept
Definition: Resources.h:104
SWC::FS::SmartFd::make_ptr
static SWC_CAN_INLINE Ptr make_ptr(const std::string &filepath, uint32_t flags, int32_t fd=-1, uint64_t pos=0)
Definition: SmartFd.h:40
SWC::Error::FS_PATH_NOT_FOUND
@ FS_PATH_NOT_FOUND
Definition: Error.h:97
SWC::Ranger::CommitLog::Fragments::print
void print(std::ostream &out, bool minimal)
Definition: CommitLog.cc:566
SWC::Core::Semaphore::acquire
void acquire()
Definition: Semaphore.cc:29
SWC::Error::UNPOSSIBLE
@ UNPOSSIBLE
Definition: Error.h:42
SWC::Condition::LT
@ LT
Definition: Comparators.h:34
SWC::DB::Cells::Interval::expand
void expand(const Interval &other)
Definition: Interval.cc:38
SWC::DB::Cells::Cell
Definition: Cell.h:92
SWC::Ranger::CommitLog::Fragments::processing
bool processing() noexcept
Definition: CommitLog.cc:540
SWC::Ranger::CommitLog::Fragments::stopping
Core::AtomicBool stopping
Definition: CommitLog.h:32
SWC::Ranger::CommitLog::MAX_FRAGMENTS_NARROW
static const uint8_t MAX_FRAGMENTS_NARROW
Definition: CommitLog.cc:14
SWC::DB::Cell::Key
Definition: CellKey.h:24
SWC::Ranger::CommitLog::Fragments::_processing
bool _processing() const noexcept
Definition: CommitLog.cc:685
SWC::Ranger::CommitLog::Fragments::_need_roll
bool _need_roll() const noexcept
Definition: CommitLog.cc:594
SWC::Ranger::CommitLog::Fragments::expand
void expand(DB::Cells::Interval &intval)
Definition: CommitLog.cc:322
SWC::Ranger::CommitLog::Fragments::_need_compact
size_t _need_compact(CompactGroups &groups, const Vec &without, size_t vol)
Definition: CommitLog.cc:611
SWC::Ranger::CommitLog::Fragments::_commit
size_t _commit(bool finalize)
Definition: CommitLog.cc:101
SWC::DB::Cells::MutableVec::size
SWC_CAN_INLINE size_t size() const noexcept
Definition: MutableVec.h:71
SWC::DB::Cells::MutableVec::write_and_free
void write_and_free(DynamicBuffer &cells, uint32_t &cell_count, Interval &intval, uint32_t threshold, uint32_t max_cells)
Definition: MutableVec.cc:71
SWC::DB::Cells::MutableVec::empty
SWC_CAN_INLINE bool empty() const noexcept
Definition: MutableVec.h:66
SWC::Ranger::Range::COMPACT_PREPARING
static const uint8_t COMPACT_PREPARING
Definition: Range.h:54
SWC::Ranger::CommitLog::Fragments
Definition: CommitLog.h:19
SWC::Core::Semaphore::wait_all
void wait_all()
Definition: Semaphore.cc:62
SWC::DB::Types::KeySeq
KeySeq
Definition: KeySeq.h:13
SWC::Ranger::CommitLog::Fragments::get
void get(Vec &fragments)
Definition: CommitLog.cc:362
SWC::DB::Cell::Key::empty
constexpr SWC_CAN_INLINE bool empty() const noexcept
Definition: CellKey.h:186
SWC::Core::AtomicBase::store
constexpr SWC_CAN_INLINE void store(T v) noexcept
Definition: Atomic.h:37
SWC::Error::OK
@ OK
Definition: Error.h:45
SWC::Condition::Comp
Comp
Definition: Comparators.h:27
SWC::Core::Vector::empty
constexpr SWC_CAN_INLINE bool empty() const noexcept
Definition: Vector.h:168
SWC::Ranger::RangePtr
std::shared_ptr< Range > RangePtr
Definition: Columns.h:15
SWC_CAN_INLINE
#define SWC_CAN_INLINE
Definition: Compat.h:102
SWC::System::Resources::less_mem_releasable
SWC_CAN_INLINE void less_mem_releasable(size_t sz) noexcept
Definition: Resources.h:119
SWC::Core::Vector::back
constexpr SWC_CAN_INLINE reference back() noexcept
Definition: Vector.h:254
SWC::Ranger::CommitLog::Fragments::remove
void remove()
Definition: CommitLog.cc:409
SWC::Ranger::CommitLog::Fragments::load
void load(int &err)
Definition: CommitLog.cc:297
SWC::Env::Rgr::log_compact_possible
static bool log_compact_possible() noexcept
Definition: RangerEnv.h:370
SWC
The SWC-DB C++ namespace 'SWC'.
Definition: main.cc:12
SWC::Ranger::CommitLog::Fragments::commit_finalize
void commit_finalize()
Definition: CommitLog.cc:90
SWC::Ranger::CommitLog::Fragments::size_bytes
size_t size_bytes(bool only_loaded=false)
Definition: CommitLog.cc:528
SWC::Ranger::CommitLog::Fragments::_next_id
uint64_t _next_id()
Definition: CommitLog.cc:555
SWC::Ranger::CommitLog::Fragments::m_releasable_bytes
Core::Atomic< size_t > m_releasable_bytes
Definition: CommitLog.h:172
SWC::Core::BufferDyn< StaticBuffer >
SWC::Core::Buffer
Definition: Buffer.h:18
SWC::Ranger::CommitLog::Fragments::finish_compact
void finish_compact(const Compact *compact)
Definition: CommitLog.cc:263
SWC::System::Resources::need_ram
SWC_CAN_INLINE size_t need_ram() const noexcept
Definition: Resources.h:89
SWC::Env::Rgr::log_compact_finished
static void log_compact_finished() noexcept
Definition: RangerEnv.h:375
SWC::Condition::EQ
@ EQ
Definition: Comparators.h:32
SWC::System::Resources::adj_mem_releasable
SWC_CAN_INLINE void adj_mem_releasable(ssize_t sz) noexcept
Definition: Resources.h:124
SWC::Ranger::CommitLog::Compact::repetition
const uint32_t repetition
Definition: CommitLogCompact.h:63
SWC::Ranger::BlockLoader
Definition: RangeBlockLoader.h:19
SWC::Ranger::CommitLog::Fragments::_remove
void _remove(int &err, Vec &fragments_old, Core::Semaphore *semp)
Definition: CommitLog.cc:389
SWC::DB::Cells::MutableVec::add_raw
void add_raw(const Cell &cell, bool finalized)
Definition: MutableVec.cc:35
SWC::DB::RangeBase::LOG_DIR
static constexpr const char LOG_DIR[]
Definition: RangeBase.h:22
SWC::Core::StateRunning::running
constexpr SWC_CAN_INLINE bool running() noexcept
Definition: StateRunning.h:37
SWC::Ranger::BlockLoader::block
Block::Ptr block
Definition: RangeBlockLoader.h:22
SWC::Ranger::CommitLog::Fragments::m_modification_ts
Core::Atomic< size_t > m_modification_ts
Definition: CommitLog.h:173
Time.h
SWC::System::Resources::less_mem_future
SWC_CAN_INLINE void less_mem_future(size_t sz) noexcept
Definition: Resources.h:109
SWC::Ranger::CommitLog::Fragments::m_sem
Core::Semaphore m_sem
Definition: CommitLog.h:170
SWC::Ranger::Range::COMPACT_NONE
static const uint8_t COMPACT_NONE
Definition: Range.h:51
Semaphore.h
SWC::Core::AtomicBase::exchange
constexpr SWC_CAN_INLINE T exchange(T value) noexcept
Definition: Atomic.h:47
SWC::Ranger::CommitLog::Fragments::next_id
uint64_t next_id()
Definition: CommitLog.cc:549
SWC::DB::Cells::MutableVec::size_bytes
SWC_CAN_INLINE size_t size_bytes() const noexcept
Definition: MutableVec.h:79
SWC::Core::Vector
Definition: Vector.h:14
SWC::Ranger::Range::COMPACT_COMPACTING
static const uint8_t COMPACT_COMPACTING
Definition: Range.h:53
SWC::Core::BufferDyn::fill
constexpr SWC_CAN_INLINE size_t fill() const noexcept
Definition: Buffer.h:192
SWC::Ranger::CommitLog::Fragments::schema_update
void schema_update()
Definition: CommitLog.cc:41
SWC::Ranger::CommitLog::Fragments::size_bytes_encoded
size_t size_bytes_encoded()
Definition: CommitLog.cc:532
SWC::Ranger::CommitLog::Fragments::commit
void commit() noexcept
Definition: CommitLog.cc:59
SWC::Ranger::CommitLog::Fragments::add
void add(const DB::Cells::Cell &cell)
Definition: CommitLog.cc:54
SWC::Core::Semaphore::wait_available
size_t wait_available()
Definition: Semaphore.cc:55
SWC::Ranger::CommitLog::Fragments::_need_compact_major
bool _need_compact_major()
Definition: CommitLog.cc:667
SWC::Core::Vector< Fragment::Ptr >::cend
constexpr SWC_CAN_INLINE const_iterator cend() const noexcept
Definition: Vector.h:232
SWC::Ranger::CommitLog::Fragments::size
size_t size() noexcept
Definition: CommitLog.cc:522
SWC::Ranger::Block::is_consist
bool is_consist(const DB::Cells::Interval &intval) const
Definition: RangeBlock.cc:96
SWC::Ranger::CommitLog::Fragments::expand_and_align
void expand_and_align(DB::Cells::Interval &intval)
Definition: CommitLog.cc:328
SWC::Ranger::CommitLog::Fragments::need_compact
size_t need_compact(CompactGroups &groups, const Vec &without, size_t vol)
Definition: CommitLog.cc:209
SWC::Ranger::CommitLog::Fragments::get_log_fragment
std::string get_log_fragment(const int64_t frag) const
Definition: CommitLog.cc:279
SWC::Ranger::CommitLog::Fragments::m_cells
DB::Cells::MutableVec m_cells
Definition: CommitLog.h:162
SWC::Ranger::CommitLog::Fragments::m_cv
std::condition_variable_any m_cv
Definition: CommitLog.h:169
SWC::Ranger::CommitLog::Fragments::try_compact
bool try_compact(uint32_t tnum=1)
Definition: CommitLog.cc:216
SWC::LOG_WARN
@ LOG_WARN
Definition: Logger.h:33
SWC::Ranger::CommitLog::Fragments::~Fragments
~Fragments() noexcept
Definition: CommitLog.cc:30
SWC::Ranger::CommitLog::Fragments::range
RangePtr range
Definition: CommitLog.h:31
SWC::DB::Cells::MutableVec::size_of_internal
SWC_CAN_INLINE size_t size_of_internal() const noexcept
Definition: MutableVec.h:87
SWC::Core::Vector::assign
SWC_CAN_INLINE void assign(IteratorT first, IteratorT last)
Definition: Vector.h:452
SWC::Ranger::CommitLog::Fragments::m_deleting
bool m_deleting
Definition: CommitLog.h:168
SWC::Env::Rgr::post
static SWC_CAN_INLINE void post(T_Handler &&handler)
Definition: RangerEnv.h:114
SWC::Ranger::CommitLog::Fragments::init
void init(const RangePtr &for_range)
Definition: CommitLog.cc:35
SWC::DB::Cells::Interval
Definition: Interval.h:17
SWC::Ranger::CommitLog::Fragments::is_compacting
bool is_compacting() const
Definition: CommitLog.cc:205
SWC::Core::Vector< Fragment::Ptr >::push_back
SWC_CAN_INLINE void push_back(ArgsT &&... args)
Definition: Vector.h:331
SWC::Ranger::CommitLog::Fragments::cells_count
size_t cells_count(bool only_current=false)
Definition: CommitLog.cc:499
SWC::Error::print
void print(std::ostream &out, int err)
Definition: Error.cc:191
SWC::Ranger::CommitLog::Fragments::_size_bytes
size_t _size_bytes(bool only_loaded=false)
Definition: CommitLog.cc:694
SWC::Core::Vector< Fragment::Ptr >::cbegin
constexpr SWC_CAN_INLINE const_iterator cbegin() const noexcept
Definition: Vector.h:216
SWC::Core::to_string
SWC_CAN_INLINE std::string to_string(const BitFieldInt< T, SZ > &v)
Definition: BitFieldInt.h:263
SWC::Ranger::CommitLog::Fragments::m_compacting
Core::AtomicBool m_compacting
Definition: CommitLog.h:165
SWC::Ranger::CommitLog::Fragments::m_last_id
uint64_t m_last_id
Definition: CommitLog.h:171
SWC::Ranger::CommitLog::Fragments::_add
SWC_CAN_INLINE void _add(const DB::Cells::Cell &cell, size_t *offset_itp, size_t *offsetp)
Definition: CommitLog.h:61
SWC::Ranger::CommitLog::Fragment::make_write
static Ptr make_write(int &err, std::string &&filepath, DB::Cells::Interval &&interval, DB::Types::Encoder encoder, const uint32_t cell_revs, const uint32_t cells_count, DynamicBuffer &cells, StaticBuffer &buffer)
Definition: CommitLogFragment.cc:135
SWC::Core::Vector::size
constexpr SWC_CAN_INLINE size_type size() const noexcept
Definition: Vector.h:189
SWC::Ranger::Block::load_final
void load_final(const DB::Cells::MutableVec &cells)
Definition: RangeBlock.cc:190
SWC::Core::Vector::emplace_back
SWC_CAN_INLINE reference emplace_back(ArgsT &&... args)
Definition: Vector.h:349
SWC::Ranger::CommitLog::Fragments::release
size_t release(size_t bytes)
Definition: CommitLog.cc:367
SWC::Core::Vector< Fragment::Ptr >::insert
SWC_CAN_INLINE iterator insert(size_type offset, ArgsT &&... args)
Definition: Vector.h:367
SWC::Core::Vector::reserve
SWC_CAN_INLINE void reserve(size_type cap)
Definition: Vector.h:288
SWC::DB::KeySeq::compare
Condition::Comp compare(const Types::KeySeq seq, const Cell::Key &key, const Cell::Key &other) SWC_ATTRIBS((SWC_ATTRIB_O3))
Definition: KeyComparator.h:294
SWC::Ranger::CommitLog::Fragments::commit_release
size_t commit_release()
Definition: CommitLog.cc:82