15 namespace SWC {
namespace Manager {
22 : col_checker(a_col_checker), rgr(a_rgr),
24 m_ranges(), m_checkings(0),
25 m_success(0), m_failures(0) {
29 if(!m_success && m_failures)
30 rgr->failures.fetch_add(1);
41 col_checker->col->need_health_check(
42 col_checker->check_ts, col_checker->check_intval,
43 ranges, rgr->rgrid, more
46 for(
auto& range : ranges)
55 int err, uint8_t
flags) {
60 while(!m_ranges.empty() && m_checkings < 10) {
61 auto r = m_ranges.front();
62 if(r->assigned() && r->get_rgr_id() == rgr->rgrid)
66 size_t sz = m_checkings + m_ranges.size();
67 more = sz < 10 ? 10 - sz : 0;
71 m_failures.fetch_add(1);
73 m_success.fetch_add(1);
75 rgr->failures.store(0);
77 col_checker->add_mergeable(range);
80 col_checker->col->set_unloaded(range);
88 range->cfg->cid, range->rid, rgr->rgrid.load(),
93 col_checker->finishing(
true);
97 if(m_checkings == 10) {
100 col_checker->completion.increment();
108 range->cfg->cid, range->rid, rgr->rgrid.load());
118 uint32_t a_check_intval)
138 for(
auto& range : ranges) {
139 if(!range->assigned())
141 rgrid = range->get_rgr_id();
144 auto it = std::find_if(
147 return rgrid == _checker->rgr->rgrid; }
164 checker->add_range(range);
210 return merger->run_master();
225 int err = _hdlr->state_error;
227 auto _col = _hdlr->get_columnn(meta_cid);
228 if(!(err = _col->error()) && !_col->empty())
229 _col->get_cells(merger->cells);
231 (err || merger->cells.empty())
232 ? merger->completion()
238 hdlr->scan(
col->cfg->key_seq, meta_cid, std::move(spec));
247 : col_checker(a_col_checker),
248 m_ranges(std::move(ranges)), cells(),
249 m_mutex(), m_mergers() {
257 for(
auto it=sorted.
cbegin(); it != sorted.
cend(); ++it) {
258 if((*it)->after(range)) {
269 for(
auto& range : sorted) {
273 }
else if(group.
empty()) {
276 }
else if(left->rid == group.
back()->rid) {
281 new RangesMerger(shared_from_this(), std::move(group)));
288 new RangesMerger(shared_from_this(), std::move(group)));
306 for(
auto& cell : cells) {
311 cell->get_value(v,
false);
312 const uint8_t* ptr = v.
base;
313 size_t remain = v.
size;
316 key_end.
decode(&ptr, &remain,
false);
320 Range::Ptr range = col_checker->col->get_range(rid);
327 }
else if(group.
empty()) {
330 }
else if(left->rid == group.
back()->rid) {
334 m_mergers.emplace_back(
335 new RangesMerger(shared_from_this(), std::move(group)));
342 m_mergers.emplace_back(
343 new RangesMerger(shared_from_this(), std::move(group)));
345 if(m_mergers.empty()) {
348 col_checker->col->cfg->cid, int64_t(m_ranges.size()));
353 col_checker->col->cfg->cid, int64_t(m_mergers.size()));
359 if(m_mergers.empty()) {
361 col_checker->col->cfg->cid);
365 auto merger = m_mergers.back();
366 m_mergers.erase(m_mergers.cend() - 1);
376 : col_merger(a_col_merger),
378 m_ranges(std::move(ranges)),
383 for(
auto& range : m_ranges) {
384 auto rgrid = range->get_rgr_id();
390 rgr, shared_from_this(), range)
395 range->cfg->cid, range->rid, rgr->rgrid.load());
411 m_ready.push_back(
nullptr);
412 }
else if(!empty && m_ranges.front() != _range) {
415 " NOT-EMPTY cancelling-merge",
416 _range->cfg->cid, _range->rid);
417 m_ready.push_back(_range);
420 }
else if(col_merger->col_checker->col->set_merging(_range)) {
421 m_ready.push_back(_range);
423 m_ready.push_back(
nullptr);
427 if(m_ranges.size() > m_ready.size())
432 col_merger->col_checker->col->state(m_err=
Error::OK);
434 for(
auto& range : m_ready) {
436 col_merger->col_checker->col->set_unloaded(range);
440 return col_merger->completion();
445 m_ranges.erase(m_ranges.cbegin());
447 const std::string main_range_path =
449 main_range->cfg->cid, main_range->rid);
450 const std::string main_cs_path =
461 fs->readdir(err, main_cs_path, files);
468 for(
auto& entry : files) {
469 if(entry.name.find(
".cs", entry.name.length()-3) != std::string::npos) {
470 auto idn = entry.name.substr(0, entry.name.length()-3);
471 entries.
push_back(strtoull(idn.c_str(),
nullptr, 0));
475 if(entries.
empty()) {
479 last_cs_id = entries.
back();
482 for(
auto& range : m_ranges) {
485 range->cfg->cid, range->rid);
491 if(err || !files.
empty())
497 fs->readdir(err, cs_path, files);
498 if(err || files.
size() != 1 || !files.
front().name.ends_with(
".cs"))
504 fs->rename(err, cs_path + files.
front().name, to_cs);
509 fs->rmdir(err, range_path);
517 for(
auto& range : merged)
518 col_merger->col_checker->col->remove_range(range->rid);
520 if(!merged.empty() &&
525 main_range->cfg->key_seq,
527 auto&
col = hdlr->create(
529 for(
auto& cell : col_merger->cells) {
531 cell->get_value(v,
false);
532 const uint8_t* ptr = v.
base;
533 size_t remain = v.
size;
536 key_end.
decode(&ptr, &remain,
false);
540 for(
auto& range : merged) {
541 if(rid == range->rid) {
549 hdlr->commit_if_need();
554 for(
auto& range : m_ranges) {
556 col_merger->col_checker->col->set_unloaded(range);
559 #ifdef SWC_RANGER_WITH_RANGEDATA
561 fs->remove(err, DB::RangeBase::get_path_range_data(main_range_path));
564 col_merger->col_checker->col->set_unloaded(main_range);
570 main_range->cfg->cid, int64_t(merged.size()), int64_t(m_ranges.size()),
572 return col_merger->completion();