SWC-DB v0.5.12 C++ documentation
SWC-DB© (Super Wide Column Database) - High Performance Scalable Database (https://github.com/kashirin-alex/swc-db)
CompactRange.cc
Go to the documentation of this file.
1 /*
2  * SWC-DB© Copyright since 2019 Alex Kashirin <kashirin.alex@gmail.com>
3  * License details at <https://github.com/kashirin-alex/swc-db/#license>
4  */
5 
6 
10 
14 
15 namespace SWC { namespace Ranger {
16 
17 struct CompactRange::InBlock final : Core::QueuePointer<InBlock*>::Pointer {
18 
19  const bool is_last;
20 
22  InBlock(const DB::Types::KeySeq key_seq) noexcept
23  : is_last(true),
24  cells(), header(key_seq), err(Error::OK),
25  last_cell(nullptr) {
26  }
27 
29  InBlock(const DB::Types::KeySeq key_seq, size_t size,
30  InBlock* inblock = nullptr)
31  : is_last(false),
32  cells(size + 1000000), header(key_seq), err(Error::OK),
33  last_cell(nullptr) {
34  if(inblock)
35  inblock->move_last(this);
36  }
37 
38  InBlock(const InBlock&) = delete;
39 
40  InBlock(const InBlock&&) = delete;
41 
42  InBlock& operator=(const InBlock&) = delete;
43 
44  ~InBlock() noexcept { }
45 
47  size_t cell_avg_size() const {
48  return cells.fill()/header.cells_count;
49  }
50 
52  void add(const DB::Cells::Cell& cell) {
54  cells.set_mark(); // start of last cell
55  cell.write(cells);
57  }
58 
61  const uint8_t* ptr = last_cell;
62  size_t remain = cells.ptr - ptr;
63  DB::Cells::Cell cell(&ptr, &remain, false);
64  spec.offset_key.copy(cell.key);
65  spec.offset_rev = cell.get_timestamp();
66  }
67 
69  void move_last(InBlock* to) {
70  const uint8_t* ptr = last_cell;
71  size_t remain = cells.ptr - ptr;
72 
73  to->add(DB::Cells::Cell(&ptr, &remain, false));
74 
76  cells.ptr = const_cast<uint8_t*>(last_cell);
77  cells.mark = nullptr;
78  }
79 
80  void finalize_interval(bool any_begin, bool any_end) {
81  const uint8_t* ptr = cells.base;
82  size_t remain = cells.fill();
83  bool set_begin = true;
84 
85  DB::Cells::Cell cell;
86  while(remain) {
87  cell.read(&ptr, &remain, false);
88  header.interval.align(cell.key);
90 
91  if(set_begin) {
93  set_begin = false;
94  }
95  }
97 
98  if(any_begin)
100  if(any_end)
102  }
103 
106  header.encoder = encoding;
107  DynamicBuffer output;
109  if(err)
110  return;
111  cells.free();
112  cells.take_ownership(output);
113  }
114 
117 
118  int err;
119 
120  private:
121 
122  const uint8_t* last_cell;
123 };
124 
125 
127  const RangePtr& a_range,
128  const uint32_t a_cs_size,
129  const uint32_t a_blk_size)
130  : ReqScan(
131  ReqScan::Type::COMPACTION, true,
132  a_compactor->cfg_read_ahead->get()/2, a_blk_size
133  ),
134  fragments_old(),
135  compactor(a_compactor), range(a_range),
136  cs_size(a_cs_size),
137  blk_cells(range->cfg->block_cells()),
138  blk_encoding(range->cfg->block_enc()),
139  m_required_key_last(),
140  tmp_dir(false), cs_writer(nullptr),
141  cellstores(),
142  m_inblock(new InBlock(range->cfg->key_seq, a_blk_size)),
143  m_processing(0),
144  m_q_intval(), m_q_encode(), m_q_write(),
145  total_cells(0), total_blocks(0),
146  time_intval(0), time_encode(0), time_write(0),
147  state_default(Range::COMPACT_COMPACTING),
148  req_last_time(0), req_ts(Time::now_ns()),
149  m_stopped(false), m_chk_final(false),
150  m_get(true),
151  m_mutex(), m_log_sz(0),
152  m_chk_timer(
153  asio::high_resolution_timer(Env::Rgr::io()->executor())) {
154  spec.flags.max_versions = range->cfg->cell_versions();
155 }
156 
158  delete m_inblock;
159 
160  InBlock* blk;
161  while(m_q_intval.pop(&blk))
162  delete blk;
163  while(m_q_encode.pop(&blk))
164  delete blk;
165  while(m_q_write.pop(&blk))
166  delete blk;
167 }
168 
171  return std::dynamic_pointer_cast<CompactRange>(shared_from_this());
172 }
173 
175  // sync processing state
176  range->compacting(Range::COMPACT_APPLYING);
177  int64_t wait_ns = 60000000000 + 100000000 * range->blocks.commitlog.size();
178  if(!range->blocks.wait_processing(Time::now_ns() + wait_ns)) {
180  "COMPACT-ERROR cancelled " SWC_FMT_LU "/" SWC_FMT_LU
181  " waited %u-minutes for processing",
182  range->cfg->cid, range->rid, uint32_t(wait_ns/60000000000));
183  return compactor->compacted(shared(), range);
184  }
185 
186  if(range->blocks.commitlog.cells_count(true))
187  range->blocks.commitlog.commit_finalize();
188 
189  // ? immediate split
190  if(range->blocks.cellstores.size() >= range->cfg->cellstore_max() * 2 &&
191  range->blocks.cellstores.size_bytes(false) > cs_size) {
192  size_t split_at = range->blocks.cellstores.size() / 2;
193  auto it = range->blocks.cellstores.cbegin();
194 
195  split_option:
196  it = range->blocks.cellstores.cbegin() + split_at;
197  do {
198  if(!(*it)->interval.key_begin.equal((*(it-1))->interval.key_end))
199  break;
200  } while(++it != range->blocks.cellstores.cend());
201 
202  if(it == range->blocks.cellstores.cend()) {
203  if(split_at > 1) {
204  split_at = 1;
205  goto split_option;
206  }
207  } else {
208  RangeSplit splitter(range, it - range->blocks.cellstores.cbegin());
209  int err = splitter.run();
210  if(!err)
211  return finished(true);
212 
215  "COMPACT-PROGRESS SPLIT RANGE cancelled " SWC_FMT_LU "/" SWC_FMT_LU,
216  range->cfg->cid, range->rid);
218  );
219  }
220  }
221 
223 }
224 
// Initial commit-log compaction pass, run before the range scan starts.
//
// Records the current commit-log size in m_log_sz and reads the configured
// `log_compact_cointervaling` threshold. When the log is smaller than that
// threshold, or compaction has been stopped, it proceeds straight to
// initial_commitlog_done(nullptr).
//
// Otherwise it asks the commit-log whether a compaction is needed and, if so,
// starts a CommitLog::Compact whose completion callback forwards to
// initial_commitlog_done(compact). The callback captures `shared()`, so this
// CompactRange stays alive until the callback has run.
//
// tnum: forwarded unchanged to CommitLog::Compact; the caller at listing
//       line 260 passes `compact->repetition + 1` — presumably a repetition
//       counter, TODO confirm against CommitLog::Compact.
//
// NOTE: listing line 231 (the declaration of `groups`) is elided in this
//       rendering of the file.
225 void CompactRange::initial_commitlog(uint32_t tnum) {
226  m_log_sz = range->blocks.commitlog.size();
227  uint8_t cointervaling = range->cfg->log_compact_cointervaling();
228  if(m_log_sz < cointervaling || m_stopped)
229  return initial_commitlog_done(nullptr);
230 
// m_stopped is re-checked here because it may have been set since the test above.
232  if(range->blocks.commitlog.need_compact(groups, {}, cointervaling) &&
233  !m_stopped) {
// The object allocated here is not freed in this function; the body at listing
// lines 246-285 (presumably initial_commitlog_done) does `delete compact`.
234  new CommitLog::Compact(
235  &range->blocks.commitlog, tnum, groups, cointervaling,
236  [ptr=shared()] (const CommitLog::Compact* compact) {
237  ptr->initial_commitlog_done(compact);
238  }
239  );
240  } else {
241  initial_commitlog_done(nullptr);
242  }
243 }
244 
246  if(m_stopped) {
247  delete compact;
248  return;
249  }
250  if(compact) {
251  uint32_t tnum = 0;
252  uint8_t cointervaling = range->cfg->log_compact_cointervaling();
253  if(compact->nfrags > 100 ||
254  compact->ngroups > cointervaling ||
255  compact->nfrags / compact->ngroups > cointervaling)
256  tnum += compact->repetition + 1;
257  delete compact;
258  if(tnum) {
259  range->compacting(Range::COMPACT_PREPARING); // range scan can continue
260  return initial_commitlog(tnum);
261  }
262  }
263 
264  range->blocks.commitlog.get(fragments_old); // fragments for removal
265 
266  range->blocks.cellstores.get_key_end(m_required_key_last);
267  for(auto& frag : fragments_old) {
268  if(DB::KeySeq::compare(range->cfg->key_seq,
269  m_required_key_last, frag->interval.key_end) == Condition::GT)
270  m_required_key_last.copy(frag->interval.key_end);
271  }
274  "COMPACT-PROGRESS " SWC_FMT_LU "/" SWC_FMT_LU
275  " early-split possible from scan offset ",
276  range->cfg->cid, range->rid);
278  );
279 
280  range->compacting(state_default); // range scan &/ add can continue
283 
284  range->scan_internal(shared());
285 }
286 
// Always returns true. The cross-reference index of this page lists the
// declaration as `with_block() const noexcept override`, i.e. this is an
// override of a base-class hook.
// NOTE(review): presumably tells the scan machinery that this request
// consumes whole blocks — confirm against ReqScan.
287 bool CompactRange::with_block() const noexcept {
288  return true;
289 }
290 
292  const DB::Cells::Cell& cell, bool&) {
293  return spec.is_matching(
294  key_seq, cell.key, cell.get_timestamp(), cell.is_time_order_desc())
295  &&
296  spec.key_intervals.is_matching_start(key_seq, cell.key);
297 }
298 
300  return m_stopped
301  || (m_inblock->header.cells_count > 1 &&
304 }
305 
307  auto sz = m_inblock->cells.fill();
308  m_inblock->add(cell);
310  return !reached_limits();
311 }
312 
313 void CompactRange::response(int& err) {
314  if(m_stopped)
315  return;
316 
321 
322  profile.finished();
325  "COMPACT-PROGRESS " SWC_FMT_LU "/" SWC_FMT_LU " blocks=" SWC_FMT_LU
326  " avg(i=" SWC_FMT_LU " e=" SWC_FMT_LU " w=" SWC_FMT_LU ")us ",
327  range->cfg->cid, range->rid,
328  total_blocks.load(),
329  (time_intval / total_blocks)/1000,
330  (time_encode / total_blocks)/1000,
331  (time_write / total_blocks)/1000
332  );
334  profile.print(SWC_LOG_OSTREAM << ' ');
335  );
336 
337  if(err) {
338  struct Task {
339  Ptr ptr;
341  Task(Ptr&& a_ptr) noexcept : ptr(std::move(a_ptr)) { }
343  Task(Task&& other) noexcept : ptr(std::move(other.ptr)) { }
344  Task(const Task&) = delete;
345  Task& operator=(Task&&) = delete;
346  Task& operator=(const Task&) = delete;
347  ~Task() noexcept { }
348  void operator()() { ptr->quit(); }
349  };
351  return;
352  }
353 
354  bool finishing;
355  if((finishing = !reached_limits())) {
358  range->compacting(Range::COMPACT_APPLYING);
359  range->blocks.wait_processing();
360  range->blocks.commitlog.commit_finalize();
361  }
362 
363  bool is_last;
364  auto in_block = m_inblock->header.cells_count <= 1 ? nullptr : m_inblock;
365  if(in_block) {
367  is_last = false;
368  m_inblock = new InBlock(range->cfg->key_seq, blk_size, in_block);
370 
371  if(can_split_at() > 0 && DB::KeySeq::compare(range->cfg->key_seq,
373  if(spec.key_intervals.empty())
378 
381  "COMPACT-PROGRESS " SWC_FMT_LU "/" SWC_FMT_LU
382  " finishing early-split scan offset ",
383  range->cfg->cid, range->rid
384  );
386  );
387  }
388  } else {
389  is_last = true;
390  in_block = new InBlock(range->cfg->key_seq);
391  }
392 
393  struct NextTask {
394  Ptr ptr;
396  NextTask(Ptr&& a_ptr) noexcept : ptr(std::move(a_ptr)) { }
398  NextTask(NextTask&& other) noexcept : ptr(std::move(other.ptr)) { }
399  NextTask(const NextTask&) = delete;
400  NextTask& operator=(NextTask&&) = delete;
401  NextTask& operator=(const NextTask&) = delete;
402  ~NextTask() noexcept { }
403  void operator()() { ptr->process_interval(); }
404  };
405  if(m_q_intval.push_and_is_1st(in_block))
406  Env::Rgr::maintenance_post(NextTask(shared()));
407 
408  if(m_stopped || is_last)
409  return;
410 
411  finishing
412  ? commitlog_done(nullptr)
413  : commitlog(1);
414 }
415 
// Decides whether the current scan request counts as slow.
//
// median (out): set to a time budget computed as
//     per-cell time * blk_cells * 3
//   where per-cell time is (now - profile.ts_start) / total_cells, or the
//   fallback 10000 when total_cells is still zero (which also avoids a
//   division by zero). The value is in the units of Time::now_ns(); the
//   caller at listing line 497 divides it by 1000000 before using it as
//   milliseconds.
//
// Returns true when either req_last_time or the time elapsed since req_ts
// exceeds that budget.
416 bool CompactRange::is_slow_req(int64_t& median) const {
417  median = (total_cells
418  ? (Time::now_ns() - profile.ts_start) / total_cells : 10000)
419  * blk_cells * 3;
420  return req_last_time > median || Time::now_ns() - req_ts > median;
421 }
422 
// Commit-log compaction pass run between scan responses.
//
// Compares the current commit-log size with the size remembered in m_log_sz.
// When the absolute difference is below `log_compact_cointervaling`, or
// compaction has been stopped, it goes straight to commitlog_done(nullptr).
//
// Otherwise it asks the commit-log whether a compaction is needed, passing
// `fragments_old` as the second argument (initial_commitlog passes `{}`
// there). If needed, it starts a CommitLog::Compact whose callback forwards
// to commitlog_done(compact). If not, it stores the new size in m_log_sz and
// calls commitlog_done(nullptr).
//
// tnum: forwarded unchanged to CommitLog::Compact.
//
// NOTE: listing line 430 (the declaration of `groups`) is elided in this
//       rendering of the file.
423 void CompactRange::commitlog(uint32_t tnum) {
424  size_t log_sz = range->blocks.commitlog.size();
425  uint8_t cointervaling = range->cfg->log_compact_cointervaling();
// Both operands are unsigned, so the ternary computes |log_sz - m_log_sz|
// without wrapping.
426  if((log_sz > m_log_sz ? log_sz - m_log_sz : m_log_sz - log_sz)
427  < cointervaling || m_stopped)
428  return commitlog_done(nullptr);
429 
431  if(range->blocks.commitlog.need_compact(
432  groups, fragments_old, cointervaling) && !m_stopped) {
// Not freed here; commitlog_done (listing lines 446-456) does `delete compact`.
433  new CommitLog::Compact(
434  &range->blocks.commitlog, tnum, groups, cointervaling,
435  [ptr=shared()] (const CommitLog::Compact* compact) {
436  ptr->commitlog_done(compact);
437  }
438  );
439  } else {
// m_log_sz is refreshed only on this branch, not when a compaction is started.
440  m_log_sz = log_sz;
441  commitlog_done(nullptr);
442  }
443 }
444 
446  if(m_stopped) {
447  delete compact;
448  return;
449  }
450  if(compact) {
451  uint32_t tnum = 0;
452  if(compact->nfrags > 100 ||
453  compact->ngroups > range->cfg->log_rollout_ratio() ||
454  compact->nfrags / compact->ngroups > range->cfg->log_rollout_ratio())
455  tnum += compact->repetition + 1;
456  delete compact;
457 
458  if(!m_stopped && !m_chk_final) {
459  size_t bytes = range->blocks.commitlog.size_bytes_encoded();
460  float fits = float(bytes)/cs_size;
461  if(size_t(fits) + 1 >= range->cfg->cellstore_max() &&
462  fits - size_t(fits) >= float(range->cfg->compact_percent())/100) {
465  range->compacting(Range::COMPACT_PREPARING);
467  "COMPACT-MITIGATE(add req.) " SWC_FMT_LU "/" SWC_FMT_LU
468  " reached max-log-size(" SWC_FMT_LU ")",
469  range->cfg->cid, range->rid, bytes);
470  } else {
471  int64_t median;
472  range->compacting(tnum && is_slow_req(median)
474  }
475  if(tnum && range->is_loaded())
476  return commitlog(tnum);
477  }
478  }
479 
480  m_get.stop();
481  request_more();
482 }
483 
485  {
487  if(m_stopped || m_chk_final)
488  return;
489  }
490 
491  int64_t median;
492  bool slow = is_slow_req(median);
493  if(range->compacting_ifnot_applying(
495  request_more();
496  }
497  if((median /= 1000000) < 1000)
498  median = 1000;
499 
501  if(m_stopped || m_chk_final)
502  return;
503  m_chk_timer.expires_after(std::chrono::milliseconds(median));
504  struct TimerTask {
505  Ptr ptr;
507  TimerTask(Ptr&& a_ptr) noexcept : ptr(std::move(a_ptr)) { }
509  TimerTask(TimerTask&& other) noexcept : ptr(std::move(other.ptr)) { }
510  TimerTask(const TimerTask&) = delete;
511  TimerTask& operator=(TimerTask&&) = delete;
512  TimerTask& operator=(const TimerTask&) = delete;
513  ~TimerTask() noexcept { }
514  void operator()(const asio::error_code& ec) {
515  if(ec != asio::error::operation_aborted)
516  ptr->progress_check_timer();
517  }
518  };
519  m_chk_timer.async_wait(TimerTask(shared()));
520 }
521 
523  bool at = false;
524  if(m_chk_final.compare_exchange_weak(at, true)) {
526  m_chk_timer.cancel();
527  }
528 }
529 
531  if(m_get || m_stopped)
532  return;
533 
534  size_t sz = m_processing;
535  if(sz && (sz >= compactor->cfg_read_ahead->get() ||
536  (sz > Env::Rgr::res().avail_ram()/blk_size &&
537  range->blocks.release(sz * blk_size) < sz * blk_size))) {
538  return;
539  }
540 
541  struct Task {
542  Ptr ptr;
544  Task(Ptr&& a_ptr) noexcept : ptr(std::move(a_ptr)) { }
546  Task(Task&& other) noexcept : ptr(std::move(other.ptr)) { }
547  Task(const Task&) = delete;
548  Task& operator=(Task&&) = delete;
549  Task& operator=(const Task&) = delete;
550  void operator()() { ptr->range->scan_internal(ptr); }
551  };
552  if(!m_get.running())
554 }
555 
558  struct NextTask {
559  Ptr ptr;
561  NextTask(Ptr&& a_ptr) noexcept : ptr(std::move(a_ptr)) { }
563  NextTask(NextTask&& other) noexcept : ptr(std::move(other.ptr)) { }
564  NextTask(const NextTask&) = delete;
565  NextTask& operator=(NextTask&&) = delete;
566  NextTask& operator=(const NextTask&) = delete;
567  ~NextTask() noexcept { }
568  void operator()() { ptr->process_encode(); }
569  };
570  Time::Measure_ns t_measure;
571  _do: {
572  auto in_block(m_q_intval.front());
573  if(!in_block->is_last) {
574  request_more();
575  in_block->finalize_interval(false, false);
576  }
577  bool more = m_q_intval.pop_and_more() && !m_stopped;
578  in_block->_other = nullptr;
579  if(m_q_encode.push_and_is_1st(in_block))
580  Env::Rgr::maintenance_post(NextTask(shared()));
581  if(more)
582  goto _do;
583  }
584  time_intval.fetch_add(t_measure.elapsed());
585  request_more();
586 }
587 
590  struct NextTask {
591  Ptr ptr;
593  NextTask(Ptr&& a_ptr) noexcept : ptr(std::move(a_ptr)) { }
595  NextTask(NextTask&& other) noexcept : ptr(std::move(other.ptr)) { }
596  NextTask(const NextTask&) = delete;
597  NextTask& operator=(NextTask&&) = delete;
598  NextTask& operator=(const NextTask&) = delete;
599  ~NextTask() noexcept { }
600  void operator()() { ptr->process_write(); }
601  };
602  Time::Measure_ns t_measure;
603  _do: {
604  auto in_block(m_q_encode.front());
605  if(!in_block->is_last) {
606  request_more();
607  in_block->finalize_encode(blk_encoding);
608  }
609  bool more = m_q_encode.pop_and_more() && !m_stopped;
610  in_block->_other = nullptr;
611  if(m_q_write.push_and_is_1st(in_block))
612  Env::Rgr::maintenance_post(NextTask(shared()));
613  if(more)
614  goto _do;
615  }
616  time_encode.fetch_add(t_measure.elapsed());
617  request_more();
618 }
619 
622  Time::Measure_ns t_measure;
623  int err = Error::OK;
624  _do: {
625  auto in_block(m_q_write.front());
626  if(in_block->is_last) {
627  time_write.fetch_add(t_measure.elapsed());
628  return finalize();
629  }
630 
631  if(in_block->err)
632  return quit();
633 
634  write_cells(err, in_block);
635 
636  if(err || !range->is_loaded() || compactor->stopped())
637  return quit();
638 
639  bool more = m_q_write.pop_and_more() && !m_stopped;
640  delete in_block;
641 
643  request_more();
644  if(more)
645  goto _do;
646  }
647 
648  time_write.fetch_add(t_measure.elapsed());
649  request_more();
650 }
651 
653  if(!tmp_dir) {
654  err = Error::OK;
655  auto cs_tmp_dir = range->get_path(Range::CELLSTORES_TMP_DIR);
656  if(Env::FsInterface::interface()->exists(err, cs_tmp_dir) && !err)
657  Env::FsInterface::interface()->rmdir(err, cs_tmp_dir);
658  if(!err)
659  Env::FsInterface::interface()->mkdirs(err, cs_tmp_dir);
660  if(err)
661  return 0;
662  tmp_dir = true;
663  }
664 
665  csid_t csid = cellstores.size() + 1;
666  cs_writer.reset(new CellStore::Write(
667  csid,
668  range->get_path_cs_on(Range::CELLSTORES_TMP_DIR, csid),
669  range,
671  cellstores.empty()
672  ? range->prev_range_end
673  : cellstores.back()->blocks.back()->header.interval.key_end
674  ));
675  cs_writer->create(err, range->cfg->file_replication());
676 
677  if(!m_chk_final) {
678  uint32_t p = range->cfg->compact_percent()/10;
679  if(csid == range->cfg->cellstore_max() * (p ? p : 1)) {
682  if(range->compacting_ifnot_applying(Range::COMPACT_PREPARING)) {
684  "COMPACT-MITIGATE(add req.) " SWC_FMT_LU "/" SWC_FMT_LU
685  " reached cs-max(%u)",
686  range->cfg->cid, range->rid, csid);
687  }
688  }
689  }
690  return csid;
691 }
692 
// Writes one block to the current cell-store writer.
//
// err (in/out): set by create_cs / block_write / add_cs; on a create_cs
//               error the function returns without writing.
// in_block:     its `cells` buffer is passed to block_write and its `header`
//               is moved from, so the header must not be relied on afterwards.
//
// When there is no active writer, one is created first via create_cs().
// After a successful write the cell-store is closed with add_cs() once the
// writer's size has reached cs_size or its block count has reached
// CellStore::Read::MAX_BLOCKS - 1.
//
// NOTE: listing line 697 (the statement guarded by the `if` on line 696) is
//       elided in this rendering of the file.
694 void CompactRange::write_cells(int& err, InBlock* in_block) {
695  if(!cs_writer) {
// create_cs() returns the new cell-store id (0 on directory setup failure), so
// `== 1` selects the first cell-store created by this compaction.
696  if(create_cs(err) == 1 && range->_is_any_begin())
698  if(err)
699  return;
700  }
701 
702  cs_writer->block_write(err, in_block->cells, std::move(in_block->header));
703  if(!err &&
704  (cs_writer->size >= cs_size ||
705  cs_writer->blocks.size() == CellStore::Read::MAX_BLOCKS - 1)) {
706  add_cs(err);
707  }
708 }
709 
// Finalizes the current cell-store writer and releases it.
//
// err (in/out): set by cs_writer->finalize().
//
// Precondition: cs_writer is non-null — it is dereferenced unconditionally.
// cs_writer is reset to nullptr whether or not finalize() succeeded.
//
// NOTE: listing lines 713-714 (the body of the success branch) are elided in
//       this rendering; presumably they append the finished writer to
//       `cellstores` — TODO confirm against the original source.
710 void CompactRange::add_cs(int& err) {
711  cs_writer->finalize(err);
712  if(!err) {
715  }
716  cs_writer = nullptr;
717 }
718 
720  auto max = range->cfg->cellstore_max();
721 
723  if(cellstores.size() < (max < 2 ? 2 : max))
724  return 0;
725 
726  auto it = cellstores.cbegin() + 1;
727  if((*it)->size < (cs_size/100) * range->cfg->compact_percent())
728  return -2;
729 
730  size_t at = cellstores.size() / 2;
731 
732  split_option:
733  it = cellstores.cbegin() + at;
734  do {
735  if(!(*it)->blocks.front()->header.interval.key_begin.equal(
736  (*(it-1))->blocks.back()->header.interval.key_end))
737  break;
738  } while(++it != cellstores.cend());
739 
740  if(it == cellstores.cend()) {
741  if(at > 1) {
742  at = 1;
743  goto split_option;
744  }
745  return -1;
746  }
747  return it - cellstores.cbegin();
748 }
749 
751  if(m_stopped)
752  return;
754 
755  if(!range->is_loaded())
756  return quit();
757 
758  Time::Measure_ns t_measure;
759  int err = Error::OK;
760  bool empty_cs = false;
761 
762  {
763  auto ptr = shared();
764  for(uint32_t chk = 0; !m_stopped && ptr.use_count() > 3; ++chk) {
765  // insure sane
766  if(chk == 3000000) {
768  "COMPACT-FINALIZING " SWC_FMT_LU "/" SWC_FMT_LU
769  " use_count=" SWC_FMT_LU,
770  range->cfg->cid, range->rid, size_t(ptr.use_count()));
771  chk = 0;
772  }
773  if(ptr.use_count() > 3)
774  std::this_thread::sleep_for(std::chrono::microseconds(1));
775  }
776  if(m_stopped)
777  return;
778  }
779 
781  // first or/and last block of any-type set with empty-key
782  bool any_begin = false;
783  if(!cs_writer) {
784  any_begin = create_cs(err) == 1 && range->_is_any_begin();
785  if(err)
786  return quit();
787  }
788  m_inblock->finalize_interval(any_begin, range->_is_any_end());
789  cs_writer->block_encode(
790  err, m_inblock->cells, std::move(m_inblock->header));
791 
792  } else if(!cellstores.size() && !cs_writer) {
793  // as an initial empty range cs with range intervals
794  empty_cs = true;
795  create_cs(err); //csid_t csid =
796  if(err)
797  return quit();
798  range->_get_interval(
801  );
806  cs_writer->block_encode(
807  err, m_inblock->cells, std::move(m_inblock->header));
808  }
809  if(err)
810  return quit();
811 
812  if(cs_writer) {
813  if(!cs_writer->size)
814  return quit();
815  add_cs(err);
816  if(err)
817  return quit();
818  }
819  time_write.fetch_add(t_measure.elapsed());
820 
821  range->compacting(Range::COMPACT_APPLYING);
822  range->blocks.wait_processing();
823 
824  ssize_t split_at = can_split_at();
825  if(split_at == -1) {
827  "COMPACT-SPLIT " SWC_FMT_LU "/" SWC_FMT_LU
828  " fail(versions-over-cs) cs-count=" SWC_FMT_LD,
829  range->cfg->cid, range->rid, int64_t(cellstores.size()));
830 
831  } else if(split_at == -2) {
833  "COMPACT-SPLIT " SWC_FMT_LU "/" SWC_FMT_LU
834  " skipping(last-cs-small) cs-count=" SWC_FMT_LD,
835  range->cfg->cid, range->rid, int64_t(cellstores.size()));
836  }
837 
838  split_at > 0
839  ? mngr_create_range(split_at)
840  : apply_new(empty_cs);
841 
842 }
843 
844 void CompactRange::mngr_create_range(uint32_t split_at) {
845  struct ReqData {
846  CompactRange::Ptr ptr;
847  uint32_t split_at;
849  ReqData(const CompactRange::Ptr& a_ptr, uint32_t a_split_at)
850  noexcept : ptr(a_ptr), split_at(a_split_at) {
851  }
852  ~ReqData() noexcept { }
854  cid_t get_cid() const noexcept {
855  return ptr->range->cfg->cid;
856  }
858  client::Clients::Ptr& get_clients() noexcept {
859  return Env::Clients::get();
860  }
862  bool valid() noexcept {
863  return !ptr->m_stopped && !Env::Rgr::is_not_accepting();
864  }
866  void callback(
870  "Compact::Mngr::Req::RangeCreate err=%d(%s) "
872  rsp.err, Error::get_text(rsp.err), get_cid(), rsp.rid);
873 
874  if(rsp.err && valid() &&
875  rsp.err != Error::CLIENT_STOPPING &&
876  rsp.err != Error::COLUMN_NOT_EXISTS &&
878  rsp.err != Error::COLUMN_NOT_READY) {
879  req->request_again();
880  return;
881  }
882  if((!rsp.err || rsp.err == Error::COLUMN_NOT_READY) && rsp.rid)
883  ptr->split(rsp.rid, split_at);
884  else
885  ptr->apply_new();
886  }
887  };
890  range->cfg->cid, Env::Rgr::rgr_data()->rgrid),
891  10000,
892  shared(), split_at
893  );
894 }
895 
896 
897 template<bool clear>
901  TaskFinished(Ptr&& a_ptr) noexcept : ptr(std::move(a_ptr)) { }
903  TaskFinished(TaskFinished&& other) noexcept : ptr(std::move(other.ptr)) { }
904  TaskFinished(const TaskFinished&) = delete;
907  ~TaskFinished() noexcept { }
908  void operator()() { ptr->finished(clear); }
909 };
910 
911 
912 void CompactRange::split(rid_t new_rid, uint32_t split_at) {
913  ColumnPtr col = Env::Rgr::columns()->get_column(range->cfg->cid);
914  if(!col || col->removing())
915  return quit();
916 
917  Time::Measure_ns t_measure;
919  "COMPACT-SPLIT " SWC_FMT_LU "/" SWC_FMT_LU " new-rid=" SWC_FMT_LU,
920  range->cfg->cid, range->rid, new_rid);
921 
922  int err = Error::OK;
923  auto new_range = col->internal_create(err, new_rid, true);
924  if(!err)
925  new_range->internal_create_folders(err);
926  if(err) {
928  "COMPACT-SPLIT cancelled err=%d " SWC_FMT_LU "/" SWC_FMT_LU
929  " new-rid=" SWC_FMT_LU,
930  err, range->cfg->cid, range->rid, new_rid);
931  err = Error::OK;
932  new_range->compacting(Range::COMPACT_NONE);
933  col->internal_remove(err, new_rid);
934  mngr_remove_range(new_range);
935  return apply_new();
936  }
937 
938  CellStore::Writers new_cellstores;
939  auto it = cellstores.cbegin() + split_at;
940  new_cellstores.assign(it, cellstores.cend());
942 
943  new_range->internal_create(err, new_cellstores);
944  if(!err)
945  range->apply_new(err, cellstores, fragments_old);
946 
947  if(err) {
948  err = Error::OK;
949  new_range->compacting(Range::COMPACT_NONE);
950  col->internal_remove(err, new_rid);
951  mngr_remove_range(new_range);
952  return quit();
953  }
954 
955  if(range->blocks.commitlog.cells_count()) {
956  /* split latest fragments to new_range from new interval key_end */
957 
959  range->blocks.commitlog.commit_finalize();
960  range->blocks.commitlog.get(fragments_old); // fragments for removal
961 
962  CommitLog::Splitter splitter(
963  cellstores.back()->blocks.back()->header.interval.key_end,
965  &range->blocks.commitlog,
966  &new_range->blocks.commitlog
967  );
968 
969  splitter.run();
970  range->blocks.commitlog.remove(err, fragments_old);
971 
972  range->blocks.commitlog.commit_finalize();
973  new_range->blocks.commitlog.commit_finalize();
974  }
975 
976  new_range->expand_and_align(false, Query::Update::CommonMeta::make(
977  new_range,
978  [col, ptr=shared()] (const Query::Update::CommonMeta::Ptr& hdlr) {
980  "COMPACT-SPLIT " SWC_FMT_LU "/" SWC_FMT_LU
981  " unloading new-rid=" SWC_FMT_LU " reg-err=%d(%s)",
982  col->cfg->cid, ptr->range->rid, hdlr->range->rid,
983  hdlr->error(), Error::get_text(hdlr->error()));
984  hdlr->range->compacting(Range::COMPACT_NONE);
985  col->internal_unload(hdlr->range->rid);
986 
987  struct ReqData {
988  CompactRange::Ptr ptr;
989  const rid_t new_rid;
991  ReqData(const CompactRange::Ptr& a_ptr, rid_t a_new_rid)
992  noexcept : ptr(a_ptr), new_rid(a_new_rid) {
993  }
994  ~ReqData() noexcept { }
996  cid_t get_cid() const noexcept {
997  return ptr->range->cfg->cid;
998  }
1000  client::Clients::Ptr& get_clients() noexcept {
1001  return Env::Clients::get();
1002  }
1004  bool valid() noexcept {
1005  return !Env::Rgr::is_not_accepting();
1006  }
1008  void callback(
1012  "Compact::Mngr::Req::RangeUnloaded err=%d(%s)"
1013  SWC_FMT_LU "/" SWC_FMT_LU,
1014  rsp.err, Error::get_text(rsp.err), get_cid(), new_rid);
1015  if(rsp.err && valid() &&
1016  rsp.err != Error::CLIENT_STOPPING &&
1017  rsp.err != Error::COLUMN_NOT_EXISTS &&
1019  rsp.err != Error::COLUMN_NOT_READY) {
1020  req->request_again();
1021  }
1022  }
1023  };
1026  hdlr->range->cfg->cid, hdlr->range->rid),
1027  10000,
1028  ptr, hdlr->range->rid
1029  );
1030  }));
1031 
1032  range->expand_and_align(true, Query::Update::CommonMeta::make(
1033  range,
1034  [t_measure, ptr=shared()]
1035  (const Query::Update::CommonMeta::Ptr&) mutable {
1038  "COMPACT-SPLITTED " SWC_FMT_LU "/" SWC_FMT_LU
1039  " took=" SWC_FMT_LU "ns new-end=",
1040  ptr->range->cfg->cid, ptr->range->rid, t_measure.elapsed());
1041  ptr->cellstores.back()->blocks.back()
1042  ->header.interval.key_end.print(SWC_LOG_OSTREAM);
1043  );
1045  }
1046  ));
1047 }
1048 
// Applies the newly written cell-stores to the range.
//
// clear: selects which of two completion callbacks is built. Both capture
//        `shared()`, keeping this object alive until the callback runs.
//        Other call sites invoke apply_new() with no argument, so the
//        declaration (not shown here) must supply a default.
//
// Calls range->apply_new() with `cellstores`, `fragments_old` and a
// Query::Update::CommonMeta handler wrapping the chosen callback; on error it
// aborts the compaction via quit().
//
// NOTE: listing lines 1051 (declaration of `cb`), 1054 and 1058 (the two
//       lambda bodies) are elided in this rendering. Presumably the bodies
//       schedule TaskFinished<true> / TaskFinished<false>, which call
//       finished(clear) — TODO confirm against the original source.
1049 void CompactRange::apply_new(bool clear) {
1050  int err = Error::OK;
1052  if(clear) {
1053  cb = [ptr=shared()] (const Query::Update::CommonMeta::Ptr&) mutable {
1055  };
1056  } else {
1057  cb = [ptr=shared()] (const Query::Update::CommonMeta::Ptr&) mutable {
1059  };
1060  }
1061  range->apply_new(
1062  err, cellstores, fragments_old,
1063  Query::Update::CommonMeta::make(range, std::move(cb))
1064  );
1065  if(err)
1066  return quit();
1067 }
1068 
1070  bool at = false;
1071  if(!m_stopped.compare_exchange_weak(at, true))
1072  return false;
1073 
1074  stop_check_timer();
1075 
1076  auto ptr = shared();
1077  for(uint32_t chk = 0; ptr.use_count() > 3; ++chk) { // insure sane
1078  if(chk == 3000000) {
1080  "COMPACT-STOPPING " SWC_FMT_LU "/" SWC_FMT_LU
1081  " use_count=" SWC_FMT_LU,
1082  range->cfg->cid, range->rid, size_t(ptr.use_count()));
1083  chk = 0;
1084  }
1085  if(ptr.use_count() > 3)
1086  std::this_thread::sleep_for(std::chrono::microseconds(1));
1087  }
1088  return true;
1089 }
1090 
// Final step of a successful compaction.
//
// clear: forwarded as the third argument of compactor->compacted().
//
// Order of operations: mark the compaction as completed via completion(),
// close the scan profile, log totals (cells, blocks, and total / interval /
// encode / write times converted from ns to ms by dividing by 1000000), clear
// the range's compact-required flag, and — only when completion() returned
// true — notify the compactor.
//
// NOTE: listing lines 1095-1096 and 1105 (the opening of the log statement
//       and one of its arguments) are elided in this rendering of the file.
//
// NOTE(review): the body at listing lines 1070-1088 (presumably completion())
//       sets m_stopped with a single compare_exchange_weak outside a retry
//       loop. If that weak CAS can fail spuriously on the target platform,
//       `ok` would be false here and compacted() would be skipped even though
//       nobody else stopped the compaction — confirm, and consider a strong
//       CAS.
1091 void CompactRange::finished(bool clear) {
1092  bool ok = completion();
1093  profile.finished();
1094 
1097  "COMPACT-FINISHED " SWC_FMT_LU "/" SWC_FMT_LU
1098  " cells=" SWC_FMT_LU " blocks=" SWC_FMT_LU
1099  " (total=" SWC_FMT_LD " intval=" SWC_FMT_LU
1100  " encode=" SWC_FMT_LU " write=" SWC_FMT_LU ")ms ",
1101  range->cfg->cid, range->rid, total_cells.load(), total_blocks.load(),
1102  (Time::now_ns() - profile.ts_start)/1000000,
1103  time_intval/1000000, time_encode/1000000, time_write/1000000
1104  );
1106  );
1107 
1108  range->compact_require(false);
// Notify the compactor only if this call was the one that completed the run.
1109  if(ok)
1110  compactor->compacted(shared(), range, clear);
1111 }
1112 
1114  if(!completion())
1115  return;
1116 
1117  int err = Error::OK;
1118  if(cs_writer) {
1119  cs_writer->remove(err);
1120  cs_writer = nullptr;
1121  }
1122  if(tmp_dir) {
1123  tmp_dir = false;
1124  Env::FsInterface::interface()->rmdir(
1125  err, range->get_path(Range::CELLSTORES_TMP_DIR));
1126  }
1127  SWC_LOGF(LOG_INFO, "COMPACT-ERROR cancelled " SWC_FMT_LU "/" SWC_FMT_LU,
1128  range->cfg->cid, range->rid);
1129 
1131 }
1132 
1133 
1134 
1135 
1136 
1137 
1138 }}
SWC::Core::Vector::erase
SWC_CAN_INLINE iterator erase(size_type offset) noexcept(_NoExceptMoveAssign &&_NoExceptDestructor)
Definition: Vector.h:464
SWC::Ranger::CellStore::Block::Header
Definition: CellStoreBlockHeader.h:23
SWC::Core::BufferDyn::ptr
value_type * ptr
Definition: Buffer.h:293
SWC::Core::AtomicBase::compare_exchange_weak
constexpr SWC_CAN_INLINE bool compare_exchange_weak(T &at, T value) noexcept
Definition: Atomic.h:52
SWC::Config::Property::Value_uint8_g::get
SWC_CAN_INLINE uint8_t get() const noexcept
Definition: Property.h:535
SWC::Env::Rgr::res
static SWC_CAN_INLINE System::Resources & res() noexcept
Definition: RangerEnv.h:131
SWC::Core::StateRunning::stop
constexpr SWC_CAN_INLINE void stop() noexcept
Definition: StateRunning.h:32
SWC::DB::Cells::Interval::key_end
DB::Cell::Key key_end
Definition: Interval.h:225
SWC::Ranger::CompactRange::stop_check_timer
void stop_check_timer()
Definition: CompactRange.cc:522
SWC::Ranger::ReqScan
Definition: ReqScan.h:21
SWC::Ranger::CompactRange::InBlock
Definition: CompactRange.cc:17
SWC_LOG_OSTREAM
#define SWC_LOG_OSTREAM
Definition: Logger.h:44
SWC::Ranger::CommitLog::Compact
Definition: CommitLogCompact.h:14
SWC::Comm::Protocol::Mngr::Params::RangeUnloadedRsp::err
int err
Definition: RangeUnloaded.h:78
SWC::DB::Specs::Flags::max_versions
uint32_t max_versions
Definition: SpecsFlags.h:143
RangeSplit.h
SWC::System::Resources::avail_ram
SWC_CAN_INLINE size_t avail_ram() const noexcept
Definition: Resources.h:94
SWC::DB::Specs::Interval::key_intervals
KeyIntervals key_intervals
Definition: SpecsInterval.h:239
SWC::Ranger::CellStore::Block::Write::encode
static void encode(int &err, DynamicBuffer &cells, DynamicBuffer &output, Header &header)
Definition: CellStoreBlock.cc:370
CommitLogSplitter.h
SWC::Ranger::CellStore::Block::Header::is_any
uint8_t is_any
Definition: CellStoreBlockHeader.h:31
SWC::Ranger::CompactRange::cs_size
const uint32_t cs_size
Definition: CompactRange.h:100
SWC::Ranger::CompactRange::InBlock::is_last
const bool is_last
Definition: CompactRange.cc:19
SWC::Ranger::CompactRange::m_chk_timer
asio::high_resolution_timer m_chk_timer
Definition: CompactRange.h:133
SWC::Env::Clients::get
static SWC_CAN_INLINE client::Clients::Ptr & get() noexcept
Definition: Clients.h:299
SWC::Ranger::CompactRange::TaskFinished::TaskFinished
SWC_CAN_INLINE TaskFinished(Ptr &&a_ptr) noexcept
Definition: CompactRange.cc:901
SWC_LOGF
#define SWC_LOGF(priority, fmt,...)
Definition: Logger.h:188
SWC::Env::Rgr::maintenance_post
static SWC_CAN_INLINE void maintenance_post(T_Handler &&handler)
Definition: RangerEnv.h:120
SWC::Core::Vector::clear
SWC_CAN_INLINE void clear() noexcept(_NoExceptDestructor)
Definition: Vector.h:120
SWC::Ranger::CompactRange::finished
void finished(bool clear)
Definition: CompactRange.cc:1091
SWC_LOG_OUT
#define SWC_LOG_OUT(pr, _code_)
Definition: Logger.h:178
SWC::Time::now_ns
SWC_CAN_INLINE int64_t now_ns() noexcept
Definition: Time.h:43
RangeRemove.h
SWC::Ranger::CompactRange::commitlog_done
void commitlog_done(const CommitLog::Compact *compact)
Definition: CompactRange.cc:445
SWC::DB::Cells::ReqScan::Profile::print
void print(std::ostream &out) const
Definition: ReqScan.h:177
SWC::Ranger::CompactRange::cellstores
CellStore::Writers cellstores
Definition: CompactRange.h:107
SWC::Ranger::CompactRange::TaskFinished::~TaskFinished
~TaskFinished() noexcept
Definition: CompactRange.cc:907
SWC::Comm::Protocol::FsBroker::Handler::exists
void exists(const ConnHandlerPtr &conn, const Event::Ptr &ev)
Definition: Exists.h:17
SWC::Env::Rgr::rgr_data
static SWC_CAN_INLINE DB::RgrData * rgr_data() noexcept
Definition: RangerEnv.h:48
SWC::Error::get_text
const char * get_text(const int err) noexcept
Definition: Error.cc:173
SWC::Comm::Protocol::Mngr::Params::RangeCreateReq
Definition: RangeCreate.h:16
SWC::DB::Cells::Interval::align
SWC_CAN_INLINE bool align(const Interval &other)
Definition: Interval.h:152
SWC_LOG_PRINTF
#define SWC_LOG_PRINTF(fmt,...)
Definition: Logger.h:165
SWC::Core::Encoder::Type
Type
Definition: Encoder.h:28
SWC::Ranger::CompactRange::TaskFinished::ptr
Ptr ptr
Definition: CompactRange.cc:899
SWC::Condition::GT
@ GT
Definition: Comparators.h:30
SWC::Core::BufferDyn::mark
value_type * mark
Definition: Buffer.h:294
SWC::Env::Rgr::columns
static SWC_CAN_INLINE Ranger::Columns * columns() noexcept
Definition: RangerEnv.h:144
SWC::Env::FsInterface::interface
static SWC_CAN_INLINE FS::Interface::Ptr & interface() noexcept
Definition: Interface.h:150
SWC::DB::Cells::Interval::expand_end
SWC_CAN_INLINE void expand_end(const Cell &cell)
Definition: Interval.h:136
SWC::Ranger::CompactRange::split
void split(rid_t new_rid, uint32_t split_at)
Definition: CompactRange.cc:912
SWC::LOG_INFO
@ LOG_INFO
Definition: Logger.h:35
SWC::Ranger::CompactRange::process_write
void process_write()
Definition: CompactRange.cc:621
SWC::client::Clients::Ptr
ClientsPtr Ptr
Definition: Clients.h:58
SWC::csid_t
uint32_t csid_t
Definition: Identifiers.h:19
SWC::Ranger::CompactRange::m_log_sz
size_t m_log_sz
Definition: CompactRange.h:132
SWC::Core::MutexSptd::scope
Definition: MutexSptd.h:96
SWC::Ranger::Compaction::compacted
void compacted(const CompactRange::Ptr req, const RangePtr &range, bool all=false)
Definition: Compaction.cc:234
SWC::Comm::Protocol::Mngr::Params::RangeCreateRsp
Definition: RangeCreate.h:59
SWC::Ranger::CompactRange::InBlock::InBlock
SWC_CAN_INLINE InBlock(const DB::Types::KeySeq key_seq) noexcept
Definition: CompactRange.cc:22
SWC::Ranger::CompactRange::response
void response(int &err) override
Definition: CompactRange.cc:313
SWC::Ranger::CellStore::Block::Header::interval
DB::Cells::Interval interval
Definition: CellStoreBlockHeader.h:30
SWC::Ranger::CompactRange::with_block
bool SWC_CONST_FUNC with_block() const noexcept override
Definition: CompactRange.cc:287
SWC::Ranger::CompactRange::commitlog
void commitlog(uint32_t tnum)
Definition: CompactRange.cc:423
SWC::Ranger::Columns::get_column
ColumnPtr get_column(const cid_t cid)
Definition: Columns.cc:14
SWC::Ranger::CompactRange::is_slow_req
bool is_slow_req(int64_t &median) const
Definition: CompactRange.cc:416
SWC::DB::Cells::Interval::expand
void expand(const Interval &other)
Definition: Interval.cc:38
SWC::Ranger::CompactRange::Ptr
std::shared_ptr< CompactRange > Ptr
Definition: CompactRange.h:14
SWC::Ranger::CompactRange::mngr_create_range
void mngr_create_range(uint32_t split_at)
Definition: CompactRange.cc:844
SWC::DB::Cells::Cell
Definition: Cell.h:92
SWC::Ranger::Query::Update::CommonMeta::Cb_t
std::function< void(const Ptr &)> Cb_t
Definition: CommonMeta.h:16
SWC::Ranger::CompactRange::m_mutex
Core::MutexSptd m_mutex
Definition: CompactRange.h:131
SWC::Ranger::CompactRange::InBlock::set_offset
SWC_CAN_INLINE void set_offset(DB::Specs::Interval &spec) const
Definition: CompactRange.cc:60
SWC::Ranger::CompactRange::blk_cells
const uint32_t blk_cells
Definition: CompactRange.h:101
SWC::DB::RgrData::rgrid
Core::Atomic< rgrid_t > rgrid
Definition: RgrData.h:28
SWC::Ranger::CompactRange::range
RangePtr range
Definition: CompactRange.h:99
CommitLogCompact.h
SWC::Ranger::CompactRange::InBlock::InBlock
SWC_CAN_INLINE InBlock(const DB::Types::KeySeq key_seq, size_t size, InBlock *inblock=nullptr)
Definition: CompactRange.cc:29
SWC::DB::Cells::Cell::key
DB::Cell::Key key
Definition: Cell.h:357
SWC::Ranger::CommitLog::Splitter
Definition: CommitLogSplitter.h:14
SWC::Ranger::CompactRange::InBlock::err
int err
Definition: CompactRange.cc:118
SWC::Time::Measure::elapsed
SWC_CAN_INLINE uint64_t elapsed() const noexcept
Definition: Time.h:74
SWC::Ranger::CompactRange::blk_encoding
const DB::Types::Encoder blk_encoding
Definition: CompactRange.h:102
SWC::DB::Cells::ReqScan::profile
Profile profile
Definition: ReqScan.h:194
SWC::Ranger::ColumnPtr
std::shared_ptr< Column > ColumnPtr
Definition: Columns.h:13
SWC::Ranger::RangeSplit
Definition: RangeSplit.h:69
SWC::Comm::Protocol::Mngr::Req::RangeUnloaded::request
static SWC_CAN_INLINE void request(const Params::RangeUnloadedReq &params, const uint32_t timeout, DataArgsT &&... args)
Definition: RangeUnloaded.h:36
SWC::Ranger::CellStore::Block::Header::ANY_BEGIN
static const uint8_t ANY_BEGIN
Definition: CellStoreBlockHeader.h:26
SWC::Ranger::CompactRange::m_get
Core::StateRunning m_get
Definition: CompactRange.h:129
SWC::Ranger::Range::COMPACT_PREPARING
static const uint8_t COMPACT_PREPARING
Definition: Range.h:54
SWC::DB::Types::KeySeq
KeySeq
Definition: KeySeq.h:13
SWC::Ranger::CompactRange::~CompactRange
virtual ~CompactRange() noexcept
Definition: CompactRange.cc:157
SWC::DB::Cell::Key::empty
constexpr SWC_CAN_INLINE bool empty() const noexcept
Definition: CellKey.h:186
SWC::Ranger::CompactRange::m_q_encode
Core::QueuePointer< InBlock * > m_q_encode
Definition: CompactRange.h:112
SWC::Ranger::CompactRange::total_cells
Core::Atomic< uint64_t > total_cells
Definition: CompactRange.h:115
SWC::Ranger::CompactRange::m_q_intval
Core::QueuePointer< InBlock * > m_q_intval
Definition: CompactRange.h:111
SWC::Core::AtomicBase::store
constexpr SWC_CAN_INLINE void store(T v) noexcept
Definition: Atomic.h:37
SWC::Ranger::CellStore::Block::Header::cells_count
uint32_t cells_count
Definition: CellStoreBlockHeader.h:36
SWC::Ranger::CompactRange::compactor
Compaction * compactor
Definition: CompactRange.h:98
SWC::Error::OK
@ OK
Definition: Error.h:45
SWC::Ranger::CompactRange::initial_commitlog_done
void initial_commitlog_done(const CommitLog::Compact *compact)
Definition: CompactRange.cc:245
SWC::Core::Vector::empty
constexpr SWC_CAN_INLINE bool empty() const noexcept
Definition: Vector.h:168
SWC::Ranger::CellStore::Read::MAX_BLOCKS
static constexpr uint32_t MAX_BLOCKS
Definition: CellStore.h:47
SWC::Ranger::CommitLog::Compact::nfrags
size_t nfrags
Definition: CommitLogCompact.h:65
SWC::Ranger::ReqScan::Type
Type
Definition: ReqScan.h:24
SWC::Ranger::RangePtr
std::shared_ptr< Range > RangePtr
Definition: Columns.h:15
SWC_CAN_INLINE
#define SWC_CAN_INLINE
Definition: Compat.h:102
SWC::LOG_DEBUG
@ LOG_DEBUG
Definition: Logger.h:36
SWC::DB::Cells::Cell::is_time_order_desc
constexpr SWC_CAN_INLINE bool is_time_order_desc() const noexcept
Definition: Cell.h:177
SWC::Comm::Protocol::Mngr::Params::RangeUnloadedReq
Definition: RangeUnloaded.h:16
SWC::Core::Vector::back
constexpr SWC_CAN_INLINE reference back() noexcept
Definition: Vector.h:254
SWC::Ranger::CompactRange::shared
Ptr shared()
Definition: CompactRange.cc:170
SWC::Ranger::Query::Update::CommonMeta::Ptr
std::shared_ptr< CommonMeta > Ptr
Definition: CommonMeta.h:15
SWC::Ranger::mngr_remove_range
static void mngr_remove_range(const RangePtr &new_range)
Definition: RangeSplit.h:17
SWC::Ranger::CompactRange::apply_new
void apply_new(bool clear=false)
Definition: CompactRange.cc:1049
SWC::DB::Specs::KeyIntervals::add
KeyInterval & add()
Definition: SpecsKeyIntervals.cc:28
SWC::DB::Cells::ReqScan::Profile::finished
SWC_CAN_INLINE void finished()
Definition: ReqScan.h:130
SWC::Ranger::CompactRange::req_ts
Core::Atomic< int64_t > req_ts
Definition: CompactRange.h:124
SWC::Core::Buffer::base
value_type * base
Definition: Buffer.h:131
SWC::Ranger::CompactRange::InBlock::add
SWC_CAN_INLINE void add(const DB::Cells::Cell &cell)
Definition: CompactRange.cc:52
SWC
The SWC-DB C++ namespace 'SWC'.
Definition: main.cc:12
SWC::DB::Cells::ReqScan::Profile::cells_count
uint64_t cells_count
Definition: ReqScan.h:114
SWC::Ranger::CompactRange::can_split_at
ssize_t can_split_at()
Definition: CompactRange.cc:719
SWC::Ranger::CompactRange::InBlock::InBlock
InBlock(const InBlock &&)=delete
SWC::Ranger::CompactRange::m_required_key_last
DB::Cell::Key m_required_key_last
Definition: CompactRange.h:103
SWC::Core::BufferDyn::set_mark
constexpr SWC_CAN_INLINE void set_mark() noexcept
Definition: Buffer.h:202
SWC::Ranger::CompactRange::InBlock::cell_avg_size
SWC_CAN_INLINE size_t cell_avg_size() const
Definition: CompactRange.cc:47
SWC::Ranger::CompactRange::add_cs
void add_cs(int &err)
Definition: CompactRange.cc:710
SWC::Time::Measure
Definition: Time.h:55
SWC::Ranger::CompactRange::m_inblock
InBlock * m_inblock
Definition: CompactRange.h:108
SWC::Core::BufferDyn< StaticBuffer >
size
uint32_t size
Buffer size.
Definition: HeaderBufferInfo.h:47
SWC::Ranger::CompactRange::InBlock::move_last
SWC_CAN_INLINE void move_last(InBlock *to)
Definition: CompactRange.cc:69
SWC::Condition::EQ
@ EQ
Definition: Comparators.h:32
SWC::Ranger::CompactRange::m_q_write
Core::QueuePointer< InBlock * > m_q_write
Definition: CompactRange.h:113
SWC::DB::Cells::ReqScan::Profile::ts_start
const int64_t ts_start
Definition: ReqScan.h:112
SWC::Ranger::CompactRange::add_cell_and_more
bool add_cell_and_more(const DB::Cells::Cell &cell) override
Definition: CompactRange.cc:306
SWC::Ranger::CompactRange::fragments_old
CommitLog::Fragments::Vec fragments_old
Definition: CompactRange.h:19
SWC::Ranger::CompactRange::TaskFinished::operator=
TaskFinished & operator=(const TaskFinished &)=delete
SWC::DB::Cell::Key::copy
void copy(const Key &other)
Definition: CellKey.h:314
SWC::Ranger::CompactRange::CompactRange
CompactRange(Compaction *compactor, const RangePtr &range, const uint32_t cs_size, const uint32_t blk_size)
Definition: CompactRange.cc:126
SWC::Ranger::CommitLog::Compact::repetition
const uint32_t repetition
Definition: CommitLogCompact.h:63
SWC::Ranger::Range::CELLSTORES_TMP_DIR
static constexpr const char CELLSTORES_TMP_DIR[]
Definition: Range.h:39
SWC::Ranger::CompactRange::InBlock::finalize_encode
SWC_CAN_INLINE void finalize_encode(DB::Types::Encoder encoding)
Definition: CompactRange.cc:105
SWC::Comm::ResponseCallback::Ptr
std::shared_ptr< ResponseCallback > Ptr
Definition: ResponseCallback.h:18
SWC::Ranger::CompactRange::completion
bool completion()
Definition: CompactRange.cc:1069
SWC::Ranger::Compaction::stopped
bool stopped()
Definition: Compaction.cc:101
SWC::Comm::Protocol::Mngr::Params::RangeUnloadedReq::rid
rid_t rid
Definition: RangeUnloaded.h:27
SWC::Ranger::Compaction::cfg_read_ahead
const Config::Property::Value_uint8_g::Ptr cfg_read_ahead
Definition: Compaction.h:24
SWC::Error::COLUMN_NOT_READY
@ COLUMN_NOT_READY
Definition: Error.h:99
SWC::Core::StateRunning::running
constexpr SWC_CAN_INLINE bool running() noexcept
Definition: StateRunning.h:37
SWC::Ranger::CompactRange::process_encode
void process_encode()
Definition: CompactRange.cc:589
SWC::Comm::Protocol::Mngr::Req::RangeCreate::request
static SWC_CAN_INLINE void request(const Params::RangeCreateReq &params, const uint32_t timeout, DataArgsT &&... args)
Definition: RangeCreate.h:36
SWC::Ranger::CompactRange::TaskFinished::TaskFinished
TaskFinished(const TaskFinished &)=delete
SWC::Error::CLIENT_STOPPING
@ CLIENT_STOPPING
Definition: Error.h:127
SWC::Ranger::CompactRange::progress_check_timer
void progress_check_timer()
Definition: CompactRange.cc:484
SWC::Ranger::RangeSplit::run
int run()
Definition: RangeSplit.h:89
SWC::DB::Cells::Cell::read
void read(const uint8_t **bufp, size_t *remainp, bool owner)
Definition: Cell.h:484
SWC_FMT_LU
#define SWC_FMT_LU
Definition: Compat.h:98
RangeUnloaded.h
SWC::DB::Specs::Interval::flags
Flags flags
Definition: SpecsInterval.h:242
SWC::Ranger::Range::COMPACT_NONE
static const uint8_t COMPACT_NONE
Definition: Range.h:51
SWC::Core::BufferDyn::free
SWC_CAN_INLINE void free()
Definition: Buffer.h:171
SWC::Ranger::CompactRange::InBlock::~InBlock
~InBlock() noexcept
Definition: CompactRange.cc:44
SWC::cid_t
uint64_t cid_t
Definition: Identifiers.h:16
SWC::Ranger::CompactRange::finalize
void finalize()
Definition: CompactRange.cc:750
SWC::Ranger::CompactRange::time_encode
Core::Atomic< uint64_t > time_encode
Definition: CompactRange.h:119
SWC::Error::COLUMN_NOT_EXISTS
@ COLUMN_NOT_EXISTS
Definition: Error.h:100
SWC::Core::QueuePointer
Definition: QueuePointer.h:15
SWC::DB::Specs::Interval::range_end
Cell::Key range_end
Definition: SpecsInterval.h:238
SWC::Ranger::Range::COMPACT_APPLYING
static const uint8_t COMPACT_APPLYING
Definition: Range.h:55
SWC::Ranger::Range
Definition: Range.h:27
SWC::Core::Vector
Definition: Vector.h:14
SWC::DB::Cells::ReqScan::spec
DB::Specs::Interval spec
Definition: ReqScan.h:106
SWC::Ranger::CompactRange::m_chk_final
Core::AtomicBool m_chk_final
Definition: CompactRange.h:127
SWC::Ranger::CompactRange::selector
bool selector(const DB::Types::KeySeq key_seq, const DB::Cells::Cell &cell, bool &stop) override
Definition: CompactRange.cc:291
SWC::Comm::Protocol::Mngr::Params::RangeUnloadedRsp
Definition: RangeUnloaded.h:59
SWC::Core::BufferDyn::take_ownership
void take_ownership(BufferDyn< BufferT > &other) noexcept
Definition: Buffer.h:272
SWC::DB::Cells::Cell::write
void write(DynamicBuffer &dst_buf, bool no_value=false) const
Definition: Cell.h:513
SWC::Core::BufferDyn::fill
constexpr SWC_CAN_INLINE size_t fill() const noexcept
Definition: Buffer.h:192
SWC::DB::Specs::Interval
Definition: SpecsInterval.h:25
SWC::DB::Cells::Interval::expand_begin
SWC_CAN_INLINE void expand_begin(const Cell &cell)
Definition: Interval.h:129
SWC::DB::Cells::Cell::get_timestamp
constexpr SWC_CAN_INLINE int64_t get_timestamp() const noexcept
Definition: Cell.h:317
SWC::Core::AtomicBase::load
constexpr SWC_CAN_INLINE T load() const noexcept
Definition: Atomic.h:42
SWC::Ranger::CellStore::Block::Header::encoder
DB::Types::Encoder encoder
Definition: CellStoreBlockHeader.h:33
SWC::Ranger::CompactRange::m_processing
Core::Atomic< size_t > m_processing
Definition: CompactRange.h:110
SWC::Core::Vector::cend
constexpr SWC_CAN_INLINE const_iterator cend() const noexcept
Definition: Vector.h:232
SWC::DB::Specs::KeyIntervals::is_matching_start
bool is_matching_start(const Types::KeySeq key_seq, const DB::Cell::Key &cellkey) const
Definition: SpecsKeyIntervals.h:229
SWC::Ranger::CompactRange::total_blocks
Core::Atomic< uint64_t > total_blocks
Definition: CompactRange.h:116
SWC::Ranger::CompactRange::InBlock::cells
DynamicBuffer cells
Definition: CompactRange.cc:115
SWC::DB::Specs::Interval::offset_rev
int64_t offset_rev
Definition: SpecsInterval.h:245
SWC::Error::COLUMN_MARKED_REMOVED
@ COLUMN_MARKED_REMOVED
Definition: Error.h:102
SWC::Comm::Protocol::Mngr::Params::RangeCreateRsp::err
int err
Definition: RangeCreate.h:78
SWC::Ranger::CompactRange::time_write
Core::Atomic< uint64_t > time_write
Definition: CompactRange.h:120
SWC::rid_t
uint64_t rid_t
Definition: Identifiers.h:17
SWC::DB::Specs::Interval::offset_key
Cell::Key offset_key
Definition: SpecsInterval.h:244
SWC::Ranger::CompactRange::req_last_time
Core::Atomic< int64_t > req_last_time
Definition: CompactRange.h:123
SWC::Env::Rgr::is_not_accepting
static SWC_CAN_INLINE bool is_not_accepting() noexcept
Definition: RangerEnv.h:53
SWC::Ranger::CompactRange::quit
void quit()
Definition: CompactRange.cc:1113
SWC::Ranger::CompactRange::TaskFinished::TaskFinished
SWC_CAN_INLINE TaskFinished(TaskFinished &&other) noexcept
Definition: CompactRange.cc:903
SWC::Ranger::CompactRange::InBlock::header
CellStore::Block::Header header
Definition: CompactRange.cc:116
SWC::LOG_WARN
@ LOG_WARN
Definition: Logger.h:33
SWC::Ranger::ReqScan::blk_size
const uint32_t blk_size
Definition: ReqScan.h:91
SWC::Ranger::CompactRange::InBlock::last_cell
const uint8_t * last_cell
Definition: CompactRange.cc:122
SWC::Ranger::CompactRange::initialize
void initialize()
Definition: CompactRange.cc:174
SWC::Ranger::CompactRange::process_interval
void process_interval()
Definition: CompactRange.cc:557
SWC::DB::Cell::Key::print
void print(std::ostream &out) const
Definition: CellKey.cc:182
SWC::Ranger::CommitLog::Compact::ngroups
size_t ngroups
Definition: CommitLogCompact.h:64
SWC::Ranger::CompactRange::InBlock::InBlock
InBlock(const InBlock &)=delete
SWC::Ranger::CompactRange::initial_commitlog
void initial_commitlog(uint32_t tnum)
Definition: CompactRange.cc:225
SWC::Ranger::Compaction
Definition: Compaction.h:21
SWC::Ranger::CompactRange::TaskFinished::operator=
TaskFinished & operator=(TaskFinished &&)=delete
SWC::Comm::client::ConnQueueReqBase::Ptr
std::shared_ptr< ConnQueueReqBase > Ptr
Definition: ClientConnQueue.h:25
SWC::Core::Vector::assign
SWC_CAN_INLINE void assign(IteratorT first, IteratorT last)
Definition: Vector.h:452
SWC::DB::Specs::Interval::set_opt__key_equal
constexpr SWC_CAN_INLINE void set_opt__key_equal() noexcept
Definition: SpecsInterval.h:148
SWC::Ranger::CompactRange::InBlock::operator=
InBlock & operator=(const InBlock &)=delete
SWC::Ranger::CompactRange::cs_writer
CellStore::Write::Ptr cs_writer
Definition: CompactRange.h:106
SWC::Core::Atomic::fetch_sub
constexpr SWC_CAN_INLINE T fetch_sub(T v) noexcept
Definition: Atomic.h:88
SWC::Ranger::CellStore::Block::Header::ANY_END
static const uint8_t ANY_END
Definition: CellStoreBlockHeader.h:27
SWC::Core::Vector::push_back
SWC_CAN_INLINE void push_back(ArgsT &&... args)
Definition: Vector.h:331
SWC::DB::Cells::ReqScan::Profile::add_cell
SWC_CAN_INLINE void add_cell(uint32_t bytes) noexcept
Definition: ReqScan.h:135
SWC::Ranger::CompactRange::InBlock::finalize_interval
void finalize_interval(bool any_begin, bool any_end)
Definition: CompactRange.cc:80
SWC::Error::print
void print(std::ostream &out, int err)
Definition: Error.cc:191
SWC::Ranger::CompactRange::time_intval
Core::Atomic< uint64_t > time_intval
Definition: CompactRange.h:118
SWC::Ranger::CompactRange::operator=
CompactRange & operator=(const CompactRange &)=delete
SWC_FMT_LD
#define SWC_FMT_LD
Definition: Compat.h:99
SWC::Core::Vector::cbegin
constexpr SWC_CAN_INLINE const_iterator cbegin() const noexcept
Definition: Vector.h:216
SWC::Core::Atomic::fetch_add
constexpr SWC_CAN_INLINE T fetch_add(T v) noexcept
Definition: Atomic.h:93
SWC::Ranger::CompactRange::state_default
Core::Atomic< uint8_t > state_default
Definition: CompactRange.h:122
SWC::Core::Vector::size
constexpr SWC_CAN_INLINE size_type size() const noexcept
Definition: Vector.h:189
SWC::DB::Specs::Interval::is_matching
SWC_CAN_INLINE bool is_matching(const Types::KeySeq key_seq, const Cell::Key &key, int64_t timestamp, bool desc) const
Definition: SpecsInterval.h:91
SWC::Ranger::CompactRange::request_more
void request_more()
Definition: CompactRange.cc:530
SWC::Ranger::CompactRange::tmp_dir
bool tmp_dir
Definition: CompactRange.h:105
SWC::Ranger::CommitLog::Splitter::run
void run()
Definition: CommitLogSplitter.h:32
SWC::Ranger::CompactRange::TaskFinished
Definition: CompactRange.cc:898
SWC::Ranger::CompactRange::reached_limits
bool reached_limits() override
Definition: CompactRange.cc:299
SWC::Ranger::CompactRange::write_cells
void write_cells(int &err, InBlock *inblock)
Definition: CompactRange.cc:694
SWC::Ranger::CompactRange::create_cs
csid_t create_cs(int &err)
Definition: CompactRange.cc:652
SWC::DB::KeySeq::compare
Condition::Comp compare(const Types::KeySeq seq, const Cell::Key &key, const Cell::Key &other) SWC_ATTRIBS((SWC_ATTRIB_O3))
Definition: KeyComparator.h:294
SWC::Comm::Protocol::Mngr::Params::RangeCreateRsp::rid
rid_t rid
Definition: RangeCreate.h:79
RangeCreate.h
SWC::DB::Cells::Interval::key_begin
DB::Cell::Key key_begin
Definition: Interval.h:224
SWC::Ranger::CompactRange::m_stopped
Core::AtomicBool m_stopped
Definition: CompactRange.h:126
SWC::Ranger::CellStore::Write
Definition: CellStore.h:128
SWC::Ranger::CompactRange::TaskFinished::operator()
void operator()()
Definition: CompactRange.cc:908
SWC::Ranger::Query::Update::CommonMeta::make
static SWC_CAN_INLINE Ptr make(const RangePtr &range, Cb_t &&cb)
Definition: CommonMeta.h:19