15 namespace SWC {
namespace Ranger {
35 inblock->move_last(
this);
83 bool set_begin =
true;
87 cell.
read(&ptr, &remain,
false);
128 const uint32_t a_cs_size,
129 const uint32_t a_blk_size)
132 a_compactor->cfg_read_ahead->get()/2, a_blk_size
135 compactor(a_compactor), range(a_range),
137 blk_cells(range->cfg->block_cells()),
138 blk_encoding(range->cfg->block_enc()),
139 m_required_key_last(),
140 tmp_dir(false), cs_writer(nullptr),
142 m_inblock(new
InBlock(range->cfg->key_seq, a_blk_size)),
144 m_q_intval(), m_q_encode(), m_q_write(),
145 total_cells(0), total_blocks(0),
146 time_intval(0), time_encode(0), time_write(0),
147 state_default(
Range::COMPACT_COMPACTING),
148 req_last_time(0), req_ts(Time::
now_ns()),
149 m_stopped(false), m_chk_final(false),
151 m_mutex(), m_log_sz(0),
153 asio::high_resolution_timer(Env::Rgr::io()->executor())) {
171 return std::dynamic_pointer_cast<CompactRange>(shared_from_this());
177 int64_t wait_ns = 60000000000 + 100000000 *
range->blocks.commitlog.size();
181 " waited %u-minutes for processing",
182 range->cfg->cid,
range->rid, uint32_t(wait_ns/60000000000));
186 if(
range->blocks.commitlog.cells_count(
true))
187 range->blocks.commitlog.commit_finalize();
190 if(
range->blocks.cellstores.size() >=
range->cfg->cellstore_max() * 2 &&
192 size_t split_at =
range->blocks.cellstores.size() / 2;
193 auto it =
range->blocks.cellstores.cbegin();
196 it =
range->blocks.cellstores.cbegin() + split_at;
198 if(!(*it)->interval.key_begin.equal((*(it-1))->interval.key_end))
200 }
while(++it !=
range->blocks.cellstores.cend());
202 if(it ==
range->blocks.cellstores.cend()) {
209 int err = splitter.
run();
227 uint8_t cointervaling =
range->cfg->log_compact_cointervaling();
232 if(
range->blocks.commitlog.need_compact(groups, {}, cointervaling) &&
235 &
range->blocks.commitlog, tnum, groups, cointervaling,
237 ptr->initial_commitlog_done(compact);
252 uint8_t cointervaling =
range->cfg->log_compact_cointervaling();
253 if(compact->
nfrags > 100 ||
254 compact->
ngroups > cointervaling ||
275 " early-split possible from scan offset ",
// Builds the task by moving the supplied pointer into the `ptr` member.
341 Task(
Ptr&& a_ptr) noexcept : ptr(std::move(a_ptr)) { }
// Move construction transfers `other.ptr` into this task's `ptr`.
343 Task(Task&& other) noexcept : ptr(std::move(other.ptr)) { }
// Copy construction is disabled.
344 Task(
const Task&) =
delete;
// Invoking the task calls `quit()` on the object held by `ptr`.
348 void operator()() { ptr->
quit(); }
359 range->blocks.wait_processing();
360 range->blocks.commitlog.commit_finalize();
382 " finishing early-split scan offset ",
// Builds the task by moving the supplied pointer into the `ptr` member.
396 NextTask(
Ptr&& a_ptr) noexcept : ptr(std::move(a_ptr)) { }
// Move construction transfers `other.ptr` into this task's `ptr`.
398 NextTask(NextTask&& other) noexcept : ptr(std::move(other.ptr)) { }
// Copy construction is disabled.
399 NextTask(
const NextTask&) =
delete;
// Move assignment is disabled.
400 NextTask&
operator=(NextTask&&) =
delete;
// Copy assignment is disabled.
401 NextTask&
operator=(
const NextTask&) =
delete;
// User-provided destructor with an empty body, declared noexcept.
402 ~NextTask() noexcept { }
424 size_t log_sz =
range->blocks.commitlog.size();
425 uint8_t cointervaling =
range->cfg->log_compact_cointervaling();
431 if(
range->blocks.commitlog.need_compact(
434 &
range->blocks.commitlog, tnum, groups, cointervaling,
436 ptr->commitlog_done(compact);
452 if(compact->
nfrags > 100 ||
459 size_t bytes =
range->blocks.commitlog.size_bytes_encoded();
460 float fits = float(bytes)/
cs_size;
461 if(
size_t(fits) + 1 >=
range->cfg->cellstore_max() &&
462 fits -
size_t(fits) >=
float(
range->cfg->compact_percent())/100) {
475 if(tnum &&
range->is_loaded())
493 if(
range->compacting_ifnot_applying(
497 if((median /= 1000000) < 1000)
503 m_chk_timer.expires_after(std::chrono::milliseconds(median));
// Builds the timer task by moving the supplied pointer into the `ptr` member.
507 TimerTask(
Ptr&& a_ptr) noexcept : ptr(std::move(a_ptr)) { }
// Move construction transfers `other.ptr` into this task's `ptr`.
509 TimerTask(TimerTask&& other) noexcept : ptr(std::move(other.ptr)) { }
// Copy construction is disabled.
510 TimerTask(
const TimerTask&) =
delete;
// Move assignment is disabled.
511 TimerTask&
operator=(TimerTask&&) =
delete;
// Copy assignment is disabled.
512 TimerTask&
operator=(
const TimerTask&) =
delete;
// User-provided destructor with an empty body, declared noexcept.
513 ~TimerTask() noexcept { }
514 void operator()(
const asio::error_code& ec) {
515 if(ec != asio::error::operation_aborted)
// Builds the task by moving the supplied pointer into the `ptr` member.
544 Task(
Ptr&& a_ptr) noexcept : ptr(std::move(a_ptr)) { }
// Move construction transfers `other.ptr` into this task's `ptr`.
546 Task(Task&& other) noexcept : ptr(std::move(other.ptr)) { }
// Copy construction is disabled.
547 Task(
const Task&) =
delete;
// Invoking the task calls `scan_internal` on the range reachable through
// `ptr`, passing `ptr` itself as the argument.
550 void operator()() { ptr->
range->scan_internal(ptr); }
// Builds the task by moving the supplied pointer into the `ptr` member.
561 NextTask(
Ptr&& a_ptr) noexcept : ptr(std::move(a_ptr)) { }
// Move construction transfers `other.ptr` into this task's `ptr`.
563 NextTask(NextTask&& other) noexcept : ptr(std::move(other.ptr)) { }
// Copy construction is disabled.
564 NextTask(
const NextTask&) =
delete;
// Move assignment is disabled.
565 NextTask&
operator=(NextTask&&) =
delete;
// Copy assignment is disabled.
566 NextTask&
operator=(
const NextTask&) =
delete;
// User-provided destructor with an empty body, declared noexcept.
567 ~NextTask() noexcept { }
573 if(!in_block->is_last) {
575 in_block->finalize_interval(
false,
false);
578 in_block->_other =
nullptr;
// Builds the task by moving the supplied pointer into the `ptr` member.
593 NextTask(
Ptr&& a_ptr) noexcept : ptr(std::move(a_ptr)) { }
// Move construction transfers `other.ptr` into this task's `ptr`.
595 NextTask(NextTask&& other) noexcept : ptr(std::move(other.ptr)) { }
// Copy construction is disabled.
596 NextTask(
const NextTask&) =
delete;
// Move assignment is disabled.
597 NextTask&
operator=(NextTask&&) =
delete;
// Copy assignment is disabled.
598 NextTask&
operator=(
const NextTask&) =
delete;
// User-provided destructor with an empty body, declared noexcept.
599 ~NextTask() noexcept { }
605 if(!in_block->is_last) {
610 in_block->_other =
nullptr;
626 if(in_block->is_last) {
672 ?
range->prev_range_end
678 uint32_t p =
range->cfg->compact_percent()/10;
679 if(csid ==
range->cfg->cellstore_max() * (p ? p : 1)) {
685 " reached cs-max(%u)",
720 auto max =
range->cfg->cellstore_max();
727 if((*it)->size < (
cs_size/100) *
range->cfg->compact_percent())
735 if(!(*it)->blocks.front()->header.interval.key_begin.equal(
736 (*(it-1))->blocks.back()->header.interval.key_end))
755 if(!
range->is_loaded())
760 bool empty_cs =
false;
764 for(uint32_t chk = 0; !
m_stopped && ptr.use_count() > 3; ++chk) {
770 range->cfg->cid,
range->rid,
size_t(ptr.use_count()));
773 if(ptr.use_count() > 3)
774 std::this_thread::sleep_for(std::chrono::microseconds(1));
782 bool any_begin =
false;
798 range->_get_interval(
822 range->blocks.wait_processing();
828 " fail(versions-over-cs) cs-count=" SWC_FMT_LD,
831 }
else if(split_at == -2) {
834 " skipping(last-cs-small) cs-count=" SWC_FMT_LD,
850 noexcept : ptr(a_ptr), split_at(a_split_at) {
852 ~ReqData() noexcept { }
854 cid_t get_cid()
const noexcept {
855 return ptr->range->cfg->cid;
862 bool valid() noexcept {
870 "Compact::Mngr::Req::RangeCreate err=%d(%s) "
874 if(rsp.
err && valid() &&
879 req->request_again();
883 ptr->split(rsp.
rid, split_at);
914 if(!col || col->removing())
923 auto new_range = col->internal_create(err, new_rid,
true);
925 new_range->internal_create_folders(err);
933 col->internal_remove(err, new_rid);
943 new_range->internal_create(err, new_cellstores);
950 col->internal_remove(err, new_rid);
955 if(
range->blocks.commitlog.cells_count()) {
959 range->blocks.commitlog.commit_finalize();
965 &
range->blocks.commitlog,
966 &new_range->blocks.commitlog
972 range->blocks.commitlog.commit_finalize();
973 new_range->blocks.commitlog.commit_finalize();
981 " unloading new-rid=" SWC_FMT_LU " reg-err=%d(%s)",
982 col->cfg->cid, ptr->range->rid, hdlr->range->rid,
985 col->internal_unload(hdlr->range->rid);
992 noexcept : ptr(a_ptr), new_rid(a_new_rid) {
994 ~ReqData() noexcept { }
996 cid_t get_cid()
const noexcept {
997 return ptr->range->cfg->cid;
1004 bool valid() noexcept {
1012 "Compact::Mngr::Req::RangeUnloaded err=%d(%s)"
1015 if(rsp.
err && valid() &&
1020 req->request_again();
1026 hdlr->range->cfg->cid, hdlr->range->rid),
1028 ptr, hdlr->range->
rid
1034 [t_measure, ptr=
shared()]
1040 ptr->range->cfg->cid, ptr->range->rid, t_measure.
elapsed());
1041 ptr->cellstores.back()->blocks.back()
1077 for(uint32_t chk = 0; ptr.use_count() > 3; ++chk) {
1078 if(chk == 3000000) {
1082 range->cfg->cid,
range->rid,
size_t(ptr.use_count()));
1085 if(ptr.use_count() > 3)
1086 std::this_thread::sleep_for(std::chrono::microseconds(1));
1108 range->compact_require(
false);