SWC-DB v0.5.12 C++ documentation
SWC-DB© (Super Wide Column Database) - High Performance Scalable Database (https://github.com/kashirin-alex/swc-db)
Rangers.cc
Go to the documentation of this file.
1 /*
2  * SWC-DB© Copyright since 2019 Alex Kashirin <kashirin.alex@gmail.com>
3  * License details at <https://github.com/kashirin-alex/swc-db/#license>
4  */
5 
8 
11 
15 
17 
19 
20 
21 
22 namespace SWC { namespace Manager {
23 
24 
26  : cfg_rgr_failures(
27  Env::Config::settings()->get<Config::Property::Value_uint16_g>(
28  "swc.mngr.ranges.assign.Rgr.remove.failures")),
29  cfg_delay_rgr_chg(
30  Env::Config::settings()->get<Config::Property::Value_int32_g>(
31  "swc.mngr.ranges.assign.delay.onRangerChange")),
32  cfg_chk_assign(
33  Env::Config::settings()->get<Config::Property::Value_int32_g>(
34  "swc.mngr.ranges.assign.interval.check")),
35  cfg_assign_due(
36  Env::Config::settings()->get<Config::Property::Value_int32_g>(
37  "swc.mngr.ranges.assign.due")),
38  cfg_column_health_chk(
39  Env::Config::settings()->get<Config::Property::Value_int32_g>(
40  "swc.mngr.column.health.interval.check")),
41  cfg_column_health_chkers(
42  Env::Config::settings()->get<Config::Property::Value_int32_g>(
43  "swc.mngr.column.health.checks")),
44  cfg_column_health_chkers_delay(
45  Env::Config::settings()->get<Config::Property::Value_int32_g>(
46  "swc.mngr.column.health.checks.delay")),
47  m_run(true),
48  m_mutex(),
49  m_timer(asio::high_resolution_timer(app_io->executor())),
50  m_rangers(), m_rangers_resources(), m_assign(), m_assignments(0),
51  m_mutex_columns_check(),
52  m_columns_check(), m_columns_check_ts(0) {
53 }
54 
// Cancels the check timer and calls stop() on the rangers held in m_rangers.
// When `shuttingdown` is true it additionally clears m_run and waits for the
// assignment runner (m_assign) to stop running before calling m_assign.stop().
// NOTE(review): listing lines 60, 65, 69 and 76 are absent from this extract —
// presumably the m_mutex scope locks, the removal of the front element from
// m_rangers, and the wait_health_check() call announced by the log message;
// confirm against the original Rangers.cc.
55 void Rangers::stop(bool shuttingdown) {
56  SWC_LOG(LOG_INFO, "Stopping Rangers");
57  if(shuttingdown)
58  m_run.store(false);
59  {
61  m_timer.cancel();
62  }
    // Move the front ranger into h and stop it; leave once m_rangers is empty.
63  for(Ranger::Ptr h;;) {
64  {
66  if(m_rangers.empty())
67  break;
68  h = std::move(m_rangers.front());
70  }
71  h->stop();
72  h = nullptr;
73  }
74  if(shuttingdown) {
75  SWC_LOG(LOG_INFO, "Stopping Rangers wait_health_check");
77  SWC_LOG(LOG_INFO, "Stopping Rangers assignments");
    // Poll in 100-microsecond sleeps for as long as m_assign.running() is true.
78  while(m_assign.running())
79  std::this_thread::sleep_for(std::chrono::microseconds(100));
80  m_assign.stop();
81  }
82 }
83 
// Returns true when m_rangers holds no rangers.
// NOTE(review): listing line 86 is absent from this extract — presumably the
// m_mutex scope lock; confirm against the original file.
85 bool Rangers::empty() noexcept {
87  return m_rangers.empty();
88 }
89 
// Arms m_timer so that schedule_run() is called after `t_ms` milliseconds.
// Does nothing when m_run is false, or when the timer already expires in the
// future and sooner than now + t_ms (the earlier pending deadline is kept).
// NOTE(review): listing lines 94 and 105 are absent from this extract —
// line 94 is presumably the m_mutex scope lock; confirm.
90 void Rangers::schedule_check(uint32_t t_ms) {
91  if(!m_run)
92  return;
93 
95 
96  auto set_in = std::chrono::milliseconds(t_ms);
97  auto set_on = m_timer.expiry();
98  auto now = asio::high_resolution_timer::clock_type::now();
    // Keep the pending timer when it fires earlier than the requested delay.
99  if(set_on > now && set_on < now + set_in)
100  return;
101 
102  m_timer.expires_after(set_in);
    // Completion handler: runs schedule_run() unless the wait was aborted.
103  struct TimerTask {
104  Rangers* ptr;
106  TimerTask(Rangers* a_ptr) noexcept : ptr(a_ptr) { }
107  void operator()(const asio::error_code& ec) {
108  if(ec != asio::error::operation_aborted)
109  ptr->schedule_run();
110  }
111  };
112  m_timer.async_wait(TimerTask(this));
113 
    // Only delays of 5000 ms or more are logged.
114  if(t_ms >= 5000)
115  SWC_LOGF(LOG_DEBUG, "Rangers scheduled in ms=%u", t_ms);
116 }
117 
// Body of Rangers::schedule_run() (per the cross-reference index, defined at
// listing line 118, which is absent from this extract).
// `chk` is the delay handed to schedule_check(); nothing is scheduled when it
// stays 0. With active managed columns, assign_ranges() is called and `chk`
// is capped at cfg_column_health_chk.
// NOTE(review): listing lines 121-122 and 126 are absent — presumably where
// `chk` is computed for the RANGERS role and where the column health check
// is started; confirm against the original file.
119  int64_t chk = 0;
120  if(Env::Mngr::role()->is_active_role(DB::Types::MngrRole::RANGERS)) {
123  }
124  if(Env::Mngr::mngd_columns()->has_active()) {
125  assign_ranges();
127  if(chk > cfg_column_health_chk->get())
128  chk = cfg_column_health_chk->get();
129  }
130  if(chk)
131  schedule_check(chk);
132 }
133 
// Parameter list (partial) and body of Rangers::rgr_report(rgrid, err, rsp).
// Feeds the report into m_rangers_resources and returns early when
// add_and_more() returns true. Otherwise every ranger in ACK state whose
// failure count reached cfg_rgr_failures is stored as MARKED_OFFLINE and
// collected in `changed`, which is then passed to changes().
// NOTE(review): listing lines 134, 137, 141, 143, 145 and 157-158 are absent
// from this extract; the declaration of `sync_all` and the body of the
// `changed.empty()` branch are among them — confirm against the original file.
135  rgrid_t rgrid,
136  int err,
138  if(m_rangers_resources.add_and_more(rgrid, err, rsp))
139  return;
140 
142  RangerList changed;
144  {
146  for(auto& rgr : m_rangers) {
147  if(rgr->state == RangerState::ACK &&
148  rgr->failures >= cfg_rgr_failures->get()) {
    // Ranges are marked unassigned for this ranger only when sync_all is set.
149  if(sync_all)
150  Env::Mngr::columns()->set_rgr_unassigned(rgr->rgrid);
151  rgr->state.store(RangerState::MARKED_OFFLINE);
152  changed.push_back(rgr);
153  }
154  }
155  }
156  if(changed.empty()) {
159  }
160  if(!changed.empty())
161  changes(changed, sync_all);
162 }
163 
// Body of Ranger::Ptr Rangers::rgr_get(const rgrid_t rgrid): returns the first
// ranger in m_rangers whose rgrid equals `rgrid`, or nullptr when none matches.
// NOTE(review): the signature and listing line 165 are absent from this
// extract (line 165 is presumably the m_mutex scope lock); confirm.
166  for(auto& rgr : m_rangers) {
167  if(rgr->rgrid == rgrid)
168  return rgr;
169  }
170  return nullptr;
171 }
172 
// Copies the endpoints of the ranger with id `rgrid` into `endpoints`, but
// only when that ranger's state has the ACK bit set; otherwise `endpoints`
// is left untouched. The search stops at the first ranger with a matching id.
// NOTE(review): listing line 175 is absent from this extract — presumably the
// m_mutex scope lock; confirm.
174 void Rangers::rgr_get(const rgrid_t rgrid, Comm::EndPoints& endpoints) {
176  for(auto& rgr : m_rangers) {
177  if(rgr->rgrid == rgrid) {
178  if(rgr->state & RangerState::ACK)
179  endpoints = rgr->endpoints;
180  break;
181  }
182  }
183 }
184 
// Copies the endpoints of the given ranger into `endpoints`.
// NOTE(review): listing line 186 is absent from this extract — presumably the
// m_mutex scope lock guarding the copy; confirm.
185 void Rangers::rgr_get(const Ranger::Ptr& rgr, Comm::EndPoints& endpoints) {
187  endpoints = rgr->endpoints;
188 }
189 
// Appends rangers to `rangers`: all of m_rangers when `rgrid` is 0, otherwise
// only the first ranger whose id equals `rgrid`.
// NOTE(review): listing line 191 is absent from this extract — presumably the
// m_mutex scope lock; confirm.
190 void Rangers::rgr_list(const rgrid_t rgrid, RangerList& rangers) {
192  for(auto& rgr : m_rangers) {
193  if(!rgrid || rgr->rgrid == rgrid) {
194  rangers.push_back(rgr);
    // A specific id was requested, so the first match ends the search.
195  if(rgrid)
196  break;
197  }
198  }
199 }
200 
// Tail of the signature and body of Rangers::rgr_set_id(endpoints, opt_rgrid):
// returns the rgrid of the ranger produced by rgr_set(endpoints, opt_rgrid).
// NOTE(review): the first signature line (listing line 201) is absent from
// this extract.
202  rgrid_t opt_rgrid) {
203  return rgr_set(endpoints, opt_rgrid)->rgrid;
204 }
205 
// Acknowledges the ranger that matches both `rgrid` and one of `endpoints`.
// Returns true when such a ranger exists. If its state did not have the ACK
// bit set, the state is stored as ACK, the ranger is announced via changes()
// and a check is scheduled in 500 ms.
// NOTE(review): listing line 211 is absent from this extract — presumably the
// m_mutex scope lock; confirm.
206 bool Rangers::rgr_ack_id(rgrid_t rgrid, const Comm::EndPoints& endpoints) {
207  bool ack = false;
208  Ranger::Ptr new_ack = nullptr;
209  {
210  uint8_t state;
212 
213  for(auto& h : m_rangers) {
214  if(Comm::has_endpoint(h->endpoints, endpoints) && rgrid == h->rgrid) {
215  state = h->state.load();
216  if(!(state & RangerState::ACK)) {
    // NOTE(review): with a non-zero ACK flag this inner test looks always
    // true inside the enclosing branch — confirm whether it is intentional.
217  if(state != RangerState::ACK)
218  new_ack = h;
219  h->state.store(RangerState::ACK);
220  }
221  ack = true;
222  break;
223  }
224  }
225  }
226 
    // Announce the newly acknowledged ranger outside the scoped block above.
227  if(new_ack) {
228  RangerList chged;
229  chged.push_back(new_ack);
230  changes(chged);
231  schedule_check(500);
232  }
233  return ack;
234 }
235 
// Body of rgrid_t Rangers::rgr_had_id(rgrid, endpoints).
// Returns 0 when the ranger registered under `rgrid` has one of `endpoints`.
// Otherwise returns the id from rgr_set_id(): a new id is requested (0 passed)
// when `rgrid` belongs to a ranger with other endpoints, and `rgrid` itself is
// requested when no ranger uses it.
// NOTE(review): the signature (listing line 236) and line 239 are absent from
// this extract — line 239 is presumably the m_mutex scope lock; confirm.
237  bool new_id_required = false;
238  {
240  for(auto& h : m_rangers) {
241  if(rgrid == h->rgrid) {
242  if(Comm::has_endpoint(h->endpoints, endpoints))
243  return 0; // zero=OK
244  new_id_required = true;
245  break;
246  }
247  }
248  }
249  return rgr_set_id(endpoints, new_id_required ? 0 : rgrid);
250 }
251 
// Body of Rangers::rgr_shutdown(rgrid, endpoints).
// Erases the first ranger that has one of `endpoints` from m_rangers, stores
// its state as REMOVED and marks its ranges unassigned. The removed ranger is
// then stopped, announced via changes(), and a check is scheduled in 5000 ms.
// NOTE(review): the signature (listing line 252) and line 255 are absent from
// this extract — line 255 is presumably the m_mutex scope lock; confirm.
253  Ranger::Ptr removed = nullptr;
254  {
256  for(auto it=m_rangers.cbegin(); it != m_rangers.cend(); ++it) {
257  if(Comm::has_endpoint((*it)->endpoints, endpoints)) {
    // NOTE(review): `it` comes from cbegin(), so std::move(*it) presumably
    // degrades to a copy of the shared pointer — confirm.
258  removed = std::move(*it);
259  m_rangers.erase(it);
260  removed->state.store(RangerState::REMOVED);
261  Env::Mngr::columns()->set_rgr_unassigned(removed->rgrid);
262  break;
263  }
264  }
265  }
266  if(removed) {
267  removed->stop();
268  RangerList chged;
269  chged.push_back(removed);
270  changes(chged);
271  schedule_check(5000);
272  }
273 }
274 
275 
// Body of Rangers::sync(): passes the whole m_rangers list to changes() with
// sync_all=true.
// NOTE(review): the signature (listing line 277 per the cross-reference
// index) is absent from this extract.
278  changes(m_rangers, true);
279 }
280 
// Merges the ranger entries of `new_rgr_status` into m_rangers and forwards
// the resulting list of changed rangers to changes().
// For every incoming entry the local ranger sharing an endpoint is updated
// (rgrid, endpoints, state, load_scale, rebalance); entries without a local
// match are added only when their state equals ACK.
// NOTE(review): listing lines 283, 296, 328-329, 336, 381 and 388 are absent
// from this extract. Line 283 holds the role argument of is_active_role(),
// line 296 is presumably the m_mutex scope lock, and line 381 presumably
// declares `ranges`; confirm against the original file.
281 void Rangers::update_status(const RangerList& new_rgr_status, bool sync_all) {
282  bool rangers_mngr = Env::Mngr::role()->is_active_role(
284 
285  if(rangers_mngr && !sync_all)
286  return;
287 
288  RangerList changed;
289  RangerList balance_rangers;
290  RangerList removing;
291  {
292  Ranger::Ptr h;
293  bool found;
294  bool chg;
295 
297 
298  for(auto& rgr_new : new_rgr_status) {
299  found = false;
300  for(auto it = m_rangers.begin(); it != m_rangers.cend(); ++it) {
301  h = *it;
302  if(!Comm::has_endpoint(h->endpoints, rgr_new->endpoints))
303  continue;
304 
305  found = true;
    // Ids differ: with rangers_mngr the local id is written into the incoming
    // entry; otherwise the local ranger takes the incoming id and the columns
    // are told about the id change.
306  if((chg = rgr_new->rgrid != h->rgrid)) {
307  if(rangers_mngr) {
308  rgr_new->rgrid.store(h->rgrid);
309  } else {
310  rgrid_t was = h->rgrid.exchange(rgr_new->rgrid);
311  Env::Mngr::columns()->change_rgr(was, h->rgrid);
312  }
313  }
    // Endpoint lists differ: the slot gets a new Ranger built from the old one
    // plus the new endpoints; the old object is queued in `removing`.
314  if(!Comm::equal_endpoints(rgr_new->endpoints, h->endpoints)) {
315  removing.push_back(h);
316  it->reset(new Ranger(*h.get(), rgr_new->endpoints));
317  (h = *it)->init_queue();
318  chg = true;
319  }
320  if(rgr_new->state != h->state) {
321  if(rgr_new->state & RangerState::ACK) {
322  if(h->state & RangerState::SHUTTINGDOWN) {
    // A local SHUTTINGDOWN state overrides the incoming state.
323  rgr_new->state.store(h->state);
324  } else if(rgr_new->state == RangerState::ACK &&
325  h->state == RangerState::MARKED_OFFLINE) {
326  cid_t cid_begin, cid_end = DB::Schema::NO_CID;
327  if(Env::Mngr::mngd_columns()->active(cid_begin, cid_end)) {
330  h, cid_begin, cid_end)
331  ));
332  }
333  h->failures.store(0);
334  }
335  } else {
337  if(rgr_new->state == RangerState::REMOVED) {
    // `it` is not used again after this erase: the inner loop ends at the
    // `break` below.
338  m_rangers.erase(it);
339  removing.push_back(h);
340  }
341  }
342  h->state.store(rgr_new->state);
343  chg = true;
344  }
345 
346  if(rgr_new->load_scale != h->load_scale) {
347  h->load_scale.store(rgr_new->load_scale);
348  chg = true;
349  }
350 
351  if(rgr_new->rebalance()) {
352  h->rebalance(rgr_new->rebalance());
353  balance_rangers.push_back(h);
354  } else {
355  h->rebalance(0);
356  }
357 
358  if(chg || sync_all)
359  changed.push_back(h);
360  break;
361  }
362 
    // Unknown ranger: only added when its state is exactly ACK.
363  if(!found) {
364  if(rgr_new->state == RangerState::ACK) {
365  rgr_new->init_queue();
366  m_rangers.push_back(rgr_new);
367  changed.push_back(rgr_new);
368  }
369  }
370  }
371 
372  for(auto& r : m_rangers)
373  r->interm_ranges.store(0);
374  }
375 
    // Replaced or removed ranger objects are stopped after the scoped block.
376  for(auto& h : removing) {
377  h->stop();
378  }
379 
    // For each ranger flagged for rebalance, RangeUnload requests are built
    // for its assigned ranges until an error occurs or can_rebalance() is false.
380  for(auto& h : balance_rangers) {
382  Env::Mngr::columns()->assigned(h->rgrid, 1, ranges);
383  int err = Error::OK;
384  for(auto& range : ranges) {
385  auto col = Env::Mngr::columns()->get_column(err, range->cfg->cid);
386  if(err || !h->can_rebalance())
387  break;
389  new Comm::Protocol::Rgr::Req::RangeUnload(h, col, range, true)
390  ));
391  h->interm_ranges.fetch_add(1);
392  }
393  if(h->rebalance() && !sync_all) {
    // Avoid listing the same ranger twice in `changed`.
394  if(std::find(changed.cbegin(), changed.cend(), h) == changed.cend())
395  changed.push_back(h);
396  }
397  }
398 
399  if(!changed.empty())
400  changes(changed, sync_all && !rangers_mngr);
401 }
402 
// Tail of the signature and body of Rangers::range_loaded(rgr, range,
// revision, err, failure, verbose): handles the outcome of a range-load
// request sent to `rgr`.
// m_assignments is decremented first; `run_assign` is true when the value
// returned by fetch_sub(1) equals cfg_assign_due, in which case
// assign_ranges() is called at the end.
// NOTE(review): listing lines 403 (first signature line) and 426 (presumably
// the opening of the log macro closed on line 429) are absent from this
// extract; confirm against the original file.
404  int64_t revision,
405  int err, bool failure, bool verbose) {
406  bool run_assign = m_assignments.fetch_sub(1) == cfg_assign_due->get();
    // Deleted ranges get no state update.
407  if(!range->deleted()) {
408  if(err) {
409 
410  if(failure) {
411  rgr->failures.fetch_add(1);
412 
413  } else if(err == Error::SERVER_SHUTTING_DOWN &&
414  rgr->state == RangerState::ACK) {
    // The ranger reported it is shutting down: add the SHUTTINGDOWN bit,
    // announce the change and schedule a check in 1000 ms.
415  rgr->state.fetch_or(RangerState::SHUTTINGDOWN);
416  RangerList chged;
417  chged.push_back(rgr);
418  changes(chged, true);
419  schedule_check(1000);
420  }
421 
422  range->set_state_none();
423  if(!run_assign)
424  schedule_check(2000);
425  if(verbose)
427  Error::print(SWC_LOG_OSTREAM << "RANGE-STATUS ", err);
428  range->print(SWC_LOG_OSTREAM << ", ");
429  );
430 
431  } else {
    // Success: reset the ranger's failure counter and record the assignment.
432  rgr->failures.store(0);
433  range->set_state_assigned(rgr->rgrid, revision);
434  }
435  }
436 
437  if(run_assign)
438  assign_ranges();
439 }
440 
441 
// Looks up the rangers that need the schema revision of `col`
// (col->need_schema_sync fills `rgrids`) and returns true when at least one
// of them is in ACK state with fewer failures than cfg_rgr_failures.
// When `ack_required` is set, a request is queued on each such ranger via
// rgr->put().
// NOTE(review): listing lines 448 and 455-456 are absent from this extract —
// presumably the m_mutex scope lock and the construction of the request
// (ColumnUpdate appears in the cross-reference index); confirm.
442 bool Rangers::update(const Column::Ptr& col, const DB::Schema::Ptr& schema,
443  uint64_t req_id, bool ack_required) {
444  Core::Vector<rgrid_t> rgrids;
445  col->need_schema_sync(schema->revision, rgrids);
446  bool undergo = false;
447  for(rgrid_t rgrid : rgrids) {
449  for(auto& rgr : m_rangers) {
450  if(rgr->failures < cfg_rgr_failures->get() &&
451  rgr->state == RangerState::ACK && rgr->rgrid == rgrid) {
452  undergo = true;
453  if(ack_required) {
454  rgr->put(
457  rgr, col, schema, req_id)
458  )
459  );
460  }
461  }
462  }
463  }
464  return undergo;
465 }
466 
// For every id in `rgrids`, builds a ColumnDelete request (schema, req_id)
// for each ranger in m_rangers with that id.
// NOTE(review): listing lines 470 and 474 are absent from this extract —
// presumably the m_mutex scope lock and the call that queues the request;
// confirm against the original file.
467 void Rangers::column_delete(const DB::Schema::Ptr& schema, uint64_t req_id,
468  const Core::Vector<rgrid_t>& rgrids) {
469  for(rgrid_t rgrid : rgrids) {
471  for(auto& rgr : m_rangers) {
472  if(rgrid != rgr->rgrid)
473  continue;
475  new Comm::Protocol::Rgr::Req::ColumnDelete(rgr, schema, req_id)
476  ));
477  }
478  }
479 }
480 
// Body of Rangers::column_compact(const Column::Ptr& col).
// Collects the ranger ids assigned to `col` and builds a ColumnCompact
// request (for col->cfg->cid) for each ranger with such an id that is in ACK
// state with fewer failures than cfg_rgr_failures.
// NOTE(review): the signature (listing line 481) and lines 485 and 489 are
// absent from this extract — presumably the m_mutex scope lock and the call
// that queues the request; confirm.
482  Core::Vector<rgrid_t> rgrids;
483  col->assigned(rgrids);
484  for(rgrid_t rgrid : rgrids) {
486  for(auto& rgr : m_rangers) {
487  if(rgr->failures < cfg_rgr_failures->get() &&
488  rgr->state == RangerState::ACK && rgr->rgrid == rgrid) {
490  new Comm::Protocol::Rgr::Req::ColumnCompact(col->cfg->cid)
491  ));
492  }
493  }
494  }
495 }
496 
// Body of Rangers::need_health_check(const Column::Ptr& col).
// Returns without action when a health check whose `col` equals the given
// column is already found; otherwise calls col->reset_health_check().
// NOTE(review): the signature (listing line 497) and lines 499, 501 and 507
// are absent from this extract — presumably the lock, the range searched
// (m_columns_check) and a follow-up call; confirm against the original file.
498  {
500  auto it = std::find_if(
502  [&col](const ColumnHealthCheck::Ptr chk) { return chk->col == col; });
503  if(it != m_columns_check.cend())
504  return;
505  }
506  col->reset_health_check();
508 }
509 
// Fragment of Rangers::health_check_finished(const ColumnHealthCheck::Ptr& chk).
// Visible part: searches with std::find and acts when the result is not
// m_columns_check.cend().
// NOTE(review): the signature (listing line 510) and lines 512, 514, 516 and
// 518 are absent from this extract — presumably the lock, the find arguments,
// the erase of the found entry and a follow-up call; confirm.
511  {
513  auto it = std::find(
515  if(it != m_columns_check.cend())
517  }
519 }
520 
// Body of Rangers::wait_health_check(cid_t cid): blocks, re-checking after
// 100-microsecond sleeps, until the iterator computed below equals
// m_columns_check.cend(). For a specific `cid` that means no pending check
// refers to a column with that id.
// NOTE(review): the signature (listing line 521) and lines 524 and 526 are
// absent from this extract — presumably the lock and the NO_CID branch of
// the conditional; confirm against the original file.
522  for(;;) {
523  {
525  auto it = cid == DB::Schema::NO_CID
527  : std::find_if(m_columns_check.cbegin(), m_columns_check.cend(),
528  [cid](const ColumnHealthCheck::Ptr chk) {
529  return chk->col->cfg->cid == cid; });
530  if(it == m_columns_check.cend())
531  return;
532  }
533  std::this_thread::sleep_for(std::chrono::microseconds(100));
534  }
535 }
536 
// Writes "Rangers:" to `out`, followed by each ranger printed after a
// newline-plus-space separator.
// NOTE(review): listing line 539 is absent from this extract — presumably the
// m_mutex scope lock; confirm.
537 void Rangers::print(std::ostream& out) {
538  out << "Rangers:";
540  for(auto& h : m_rangers)
541  h->print(out << "\n ");
542 }
543 
// Body of Rangers::assign_ranges().
// When the guard condition holds (managed columns not expected ready, or the
// condition on the missing line), a check is scheduled in 5001 ms instead.
// Otherwise, if m_run is set and m_assign is not running,
// assign_ranges_run() is posted through Env::Mngr::post().
// NOTE(review): the signature (listing line 544) and lines 546 and 551 are
// absent from this extract; confirm against the original file.
545  if(!Env::Mngr::mngd_columns()->expected_ready() ||
547  return schedule_check(5001);
548 
    // Callable handed to Env::Mngr::post(); it only forwards to assign_ranges_run().
549  struct Task {
550  Rangers* ptr;
552  Task(Rangers* a_ptr) noexcept : ptr(a_ptr) { }
553  void operator()() { ptr->assign_ranges_run(); }
554  };
555  if(m_run && !m_assign.running())
556  Env::Mngr::post(Task(this));
557 }
558 
// Body of Rangers::assign_ranges_run(): loops over unassigned ranges, picks a
// ranger for each with next_rgr() and builds a RangeLoad request for it.
// The loop ends (after m_assign.stop()) when there are no rangers or m_run is
// false, when no unassigned range is left, when no ranger can be chosen, or
// when the result of m_assignments.add_rslt(1) is no longer below
// cfg_assign_due.
// NOTE(review): the signature (listing line 559) and lines 569, 600 and 607
// are absent from this extract — presumably the m_mutex scope lock, the call
// that queues the RangeLoad request, and a final call after the loop; confirm.
560  auto& columns = *Env::Mngr::columns();
561  int err;
562  DB::Schema::Ptr schema;
563  Column::Ptr col;
564  Range::Ptr range;
565  Ranger::Ptr rgr;
566 
567  for(bool state; ;) {
568  {
569  
570  state = m_rangers.empty() || !m_run;
571  }
572  if(state) {
573  m_assign.stop();
574  return schedule_check();
575  }
    // `state` is reset to false and passed to get_next_unassigned(); it is
    // read back below when no range was returned.
576  if(!(range = columns.get_next_unassigned(state=false))) {
577  m_assign.stop();
578  if(state) // waiting-on-meta-ranges
579  return schedule_check(2000);
580  break;
581  }
582  SWC_LOGF(LOG_DEBUG, "Assigning Range(" SWC_FMT_LU "/" SWC_FMT_LU ")",
583  range->cfg->cid, range->rid);
584  next_rgr(range, rgr = nullptr);
585  if(!rgr) {
586  m_assign.stop();
587  return schedule_check();
588  }
    // Skip the range when its schema or column cannot be obtained.
589  if(!(schema = Env::Mngr::schemas()->get(range->cfg->cid)) ||
590  !(col = columns.get_column(err = Error::OK, range->cfg->cid))) {
591  continue;
592  }
593 
594  range->set_state_queued(rgr->rgrid);
595  rgr->interm_ranges.fetch_add(1);
    // `state` now means "more assignments may still be started".
596  state = m_assignments.add_rslt(1) < cfg_assign_due->get();
597  if(!state)
598  m_assign.stop();
599 
601  new Comm::Protocol::Rgr::Req::RangeLoad(rgr, col, range, schema)
602  ));
603  if(!state)
604  return;
605  }
606 
608 }
609 
// Chooses a ranger for `range` and stores it in `_rgr_set`, which is left
// untouched when no ranger qualifies.
// First pass: rangers whose failures reached cfg_rgr_failures are stored as
// MARKED_OFFLINE; an ACK ranger that is not shutting down and shares an
// endpoint with the range's last ranger is chosen immediately; other ACK
// rangers feed the average of interm_ranges.
// Second pass: among ACK rangers at or below that average, later candidates
// replace earlier ones when their interm_ranges is not higher and their
// load_scale is not lower. A SHUTTINGDOWN ranger is accepted only when it was
// the single ACK ranger counted and is_data(cid) is false for the range.
// NOTE(review): listing lines 615 and 618 are absent from this extract —
// line 615 is the condition guarding the early `return`, line 618 presumably
// the m_mutex scope lock; confirm against the original file.
610 void Rangers::next_rgr(const Range::Ptr& range, Ranger::Ptr& _rgr_set) {
611  size_t n_rgrs = 0;
612  size_t avg_ranges = 0;
613 
614  const DB::RgrData& last_rgr = range->get_last_rgr();
616  return;
617  uint8_t state;
619 
620  for(auto& rgr : m_rangers) {
621  state = rgr->state.load();
622  if(state == RangerState::MARKED_OFFLINE) {
623  continue;
624 
625  } else if(rgr->failures >= cfg_rgr_failures->get()) {
626  Env::Mngr::columns()->set_rgr_unassigned(rgr->rgrid);
627  rgr->state.store(RangerState::MARKED_OFFLINE);
628  RangerList chged;
629  chged.push_back(rgr);
630  _changes(chged, true);
631 
632  } else if(state & RangerState::ACK) {
633  if(!last_rgr.endpoints.empty() &&
634  !(state & RangerState::SHUTTINGDOWN) &&
635  Comm::has_endpoint(rgr->endpoints, last_rgr.endpoints)) {
636  _rgr_set = rgr;
637  return;
638  }
639  avg_ranges += rgr->interm_ranges;
640  ++n_rgrs;
641  }
642  }
    // No ACK ranger was counted, so there is nothing to choose from.
643  if(!n_rgrs)
644  return;
645 
646  avg_ranges /= n_rgrs;
647  uint16_t best = 0;
648  size_t interm_ranges = UINT64_MAX;
649  for(auto& rgr : m_rangers) {
650  state = rgr->state.load();
651  if(state & RangerState::ACK &&
652  avg_ranges >= rgr->interm_ranges &&
653  interm_ranges >= rgr->interm_ranges &&
654  rgr->load_scale >= best &&
655  (!(state & RangerState::SHUTTINGDOWN) ||
656  (n_rgrs == 1 && !DB::Types::SystemColumn::is_data(range->cfg->cid)) )) {
657  best = rgr->load_scale;
658  interm_ranges = rgr->interm_ranges;
659  _rgr_set = rgr;
660  }
661  }
662 }
663 
// Body of Rangers::health_check_columns().
// Returns early when m_run is false, when the managed columns are not
// expected ready, or when the condition on the missing line holds.
// If less than cfg_column_health_chkers_delay ms passed since
// m_columns_check_ts, the first branch runs; otherwise the timestamp is
// updated and a ColumnHealthCheck is created for each column returned by
// get_need_health_check(ts, intval). m_mutex_columns_check is unlocked at the end.
// NOTE(review): the signature (listing line 664) and lines 668, 672, 675-676,
// 678, 689 and 691 are absent from this extract. Line 668 is presumably the
// try_full_lock(support) call matching the unlock below, and line 689
// presumably limits the number of concurrent checks; confirm.
665  bool support;
666  if(!m_run ||
667  !Env::Mngr::mngd_columns()->expected_ready() ||
669  return;
670  int64_t ts = Time::now_ms();
671  if(ts - m_columns_check_ts < cfg_column_health_chkers_delay->get()) {
673  } else {
    // Move-only callable that runs one ColumnHealthCheck.
674  struct Task {
677  Task(const ColumnHealthCheck::Ptr& a_ptr) noexcept : ptr(a_ptr) { }
679  Task(Task&& other) noexcept : ptr(std::move(other.ptr)) { }
680  Task(const Task&) = delete;
681  Task& operator=(Task&&) = delete;
682  Task& operator=(const Task&) = delete;
683  ~Task() noexcept { }
684  void operator()() { ptr->run(); }
685  };
686  m_columns_check_ts = ts;
687  uint32_t intval = cfg_column_health_chk->get();
688  for(Column::Ptr col;
690  (col = Env::Mngr::columns()->get_need_health_check(ts, intval)); ) {
692  new ColumnHealthCheck(col, ts, intval))));
693  }
694  }
695  m_mutex_columns_check.unlock(support);
696 }
697 
698 
// Tail of the signature and body of Ranger::Ptr Rangers::rgr_set(endpoints,
// opt_rgrid): returns the ranger registered for `endpoints`, creating one
// when needed.
// - A ranger sharing an endpoint whose state has the ACK bit is reused as-is
//   when its endpoints are equal, or replaced in its slot by a new Ranger
//   built from it with the new endpoints.
// - A ranger sharing an endpoint without the ACK bit is erased and its ranges
//   are marked unassigned; a new ranger is then created.
// - A new ranger gets `opt_rgrid` when that id is non-zero and unused,
//   otherwise the lowest unused id counting up from 1.
// Any replaced or erased ranger is stopped before returning.
// NOTE(review): the first signature line (listing line 699) and line 704 are
// absent from this extract — line 704 is presumably the m_mutex scope lock;
// confirm against the original file.
700  rgrid_t opt_rgrid) {
701  Ranger::Ptr removing;
702  Ranger::Ptr rgr;
703  {
705 
706  for(auto it = m_rangers.begin(); it != m_rangers.cend(); ++it) {
707  if(Comm::has_endpoint((*it)->endpoints, endpoints)) {
708  if((*it)->state & RangerState::ACK) {
709  if(Comm::equal_endpoints(endpoints, (*it)->endpoints)) {
710  rgr = *it;
711  } else {
712  removing = *it;
713  it->reset(new Ranger(*removing.get(), endpoints));
714  (rgr = *it)->init_queue();
715  }
    // Leaves the scoped block; no new ranger is created on this path.
716  goto _return;
717  }
718  removing = *it;
719  m_rangers.erase(it);
720  Env::Mngr::columns()->set_rgr_unassigned(removing->rgrid);
721  break;
722  }
723  }
724 
725  rgrid_t next_id=0;
726  rgrid_t nxt;
727  bool ok;
    // Try opt_rgrid once (it is zeroed after the first attempt), then 1, 2, ...
    // until an id is found that no ranger in m_rangers uses.
728  do {
729  if(!opt_rgrid) {
730  nxt = ++next_id;
731  } else {
732  nxt = opt_rgrid;
733  opt_rgrid = 0;
734  }
735 
736  ok = true;
737  for(auto& h : m_rangers) {
738  if(nxt == h->rgrid) {
739  ok = false;
740  break;
741  }
742  }
743  } while(!ok);
744 
745  rgr = m_rangers.emplace_back(new Ranger(nxt, endpoints));
746  rgr->init_queue();
747  }
748 
749  _return:
750  if(removing)
751  removing->stop();
752  return rgr;
753 }
754 
755 
// Forwards `hosts` and `sync_all` to _changes() inside a scoped block, then
// performs a follow-up action when the managed columns have an active range.
// NOTE(review): listing lines 758 and 763 are absent from this extract —
// presumably the m_mutex scope lock and the statement guarded by the
// has_active() test; confirm against the original file.
756 void Rangers::changes(const RangerList& hosts, bool sync_all) {
757  {
759  _changes(hosts, sync_all);
760  }
761 
762  if(Env::Mngr::mngd_columns()->has_active())
764 }
765 
// Splits `hosts` into consecutive sub-lists of at most 1000 rangers and builds
// one request per sub-list, passing `sync_all` along; then logs every host.
// Does nothing when `hosts` is empty.
// NOTE(review): listing lines 775-777 and 785 are absent from this extract —
// presumably the call that sends the request (RgrUpdate and req_mngr_inchain
// appear in the cross-reference index) and the opening of the log macro
// closed on line 789; confirm against the original file.
766 void Rangers::_changes(const RangerList& hosts, bool sync_all) {
767  if(hosts.empty())
768  return;
769 
770  // batches of 1000 hosts a req
771  for(auto it = hosts.cbegin(); it != hosts.cend(); ) {
772  auto from = it;
    // n is the number of hosts left; advance by at most 1000 of them.
773  size_t n = hosts.cend() - from;
774  it += n > 1000 ? 1000 : n;
778  RangerList(from, it),
779  sync_all
780  )
781  )
782  );
783  }
784 
786  SWC_LOG_OSTREAM << " Rangers::changes:";
787  for(auto& h : hosts)
788  h->print(SWC_LOG_OSTREAM << "\n ");
789  );
790 }
791 
792 
793 
794 }} // namespace SWC::Manager
795 
796 
797 
798 
SWC::Core::Vector::erase
SWC_CAN_INLINE iterator erase(size_type offset) noexcept(_NoExceptMoveAssign &&_NoExceptDestructor)
Definition: Vector.h:464
SWC::Manager::Rangers::cfg_chk_assign
const Config::Property::Value_int32_g::Ptr cfg_chk_assign
Definition: Rangers.h:23
SWC::Comm::Protocol::Rgr::Req::ColumnUpdate
Definition: ColumnUpdate.h:14
SWC::Core::Vector::front
constexpr SWC_CAN_INLINE reference front() noexcept
Definition: Vector.h:243
SWC::Comm::Protocol::Rgr::Req::ColumnDelete
Definition: ColumnDelete.h:14
SWC::Core::StateRunning::stop
constexpr SWC_CAN_INLINE void stop() noexcept
Definition: StateRunning.h:32
ColumnCompact.h
SWC_LOG_OSTREAM
#define SWC_LOG_OSTREAM
Definition: Logger.h:44
SWC::Comm::Protocol::Rgr::Req::ColumnsUnload
Definition: ColumnsUnload.h:14
ReportRes.cc
ColumnMng.h
SWC::Error::SERVER_SHUTTING_DOWN
@ SERVER_SHUTTING_DOWN
Definition: Error.h:84
SWC::DB::SchemaPrimitives::NO_CID
static constexpr const cid_t NO_CID
Definition: Schema.h:25
SWC_LOGF
#define SWC_LOGF(priority, fmt,...)
Definition: Logger.h:188
SWC::DB::Schema::Ptr
std::shared_ptr< Schema > Ptr
Definition: Schema.h:185
SWC::Config::Property::Value_uint16_g::get
SWC_CAN_INLINE uint16_t get() const noexcept
Definition: Property.h:572
SWC_LOG_OUT
#define SWC_LOG_OUT(pr, _code_)
Definition: Logger.h:178
SWC::Manager::Rangers::health_check_columns
void health_check_columns()
Definition: Rangers.cc:664
SWC::Comm::Protocol::Rgr::Req::ColumnCompact::Ptr
std::shared_ptr< ColumnCompact > Ptr
Definition: ColumnCompact.h:18
SWC::Comm::Protocol::Mngr::Req::RgrUpdate
Definition: RgrUpdate.h:15
ColumnDelete.h
SWC::Comm::Protocol::Rgr::Req::ColumnDelete::Ptr
std::shared_ptr< ColumnDelete > Ptr
Definition: ColumnDelete.h:16
SWC::Env::Mngr::post
static SWC_CAN_INLINE void post(T_Handler &&handler)
Definition: MngrEnv.h:39
SWC::Manager::Rangers::column_compact
void column_compact(const Column::Ptr &col)
Definition: Rangers.cc:481
SWC::Manager::ColumnHealthCheck::Ptr
std::shared_ptr< ColumnHealthCheck > Ptr
Definition: ColumnHealthCheck.h:23
SWC::Manager::Rangers::rgr_set
Ranger::Ptr rgr_set(const Comm::EndPoints &endpoints, rgrid_t opt_rgrid=0)
Definition: Rangers.cc:699
ColumnUpdate.h
SWC::DB::Types::SystemColumn::is_data
constexpr SWC_CAN_INLINE bool is_data(cid_t cid) noexcept
Definition: SystemColumn.h:46
Rangers.h
SWC::LOG_INFO
@ LOG_INFO
Definition: Logger.h:35
SWC::Manager::MngdColumns::active
bool active(cid_t &cid_begin, cid_t &cid_end) noexcept
Definition: MngdColumns.cc:146
SWC::Comm::IoContextPtr
std::shared_ptr< IoContext > IoContextPtr
Definition: IoContext.h:16
SWC::Manager::RangersResources::add_and_more
bool add_and_more(rgrid_t rgrid, int err, const Comm::Protocol::Rgr::Params::Report::RspRes &rsp)
Definition: RangersResources.h:109
SWC::Core::MutexSptd::scope
Definition: MutexSptd.h:96
SWC::Env::Mngr::columns
static SWC_CAN_INLINE Manager::Columns * columns() noexcept
Definition: MngrEnv.h:59
SWC::Manager::Rangers::schedule_run
void schedule_run()
Definition: Rangers.cc:118
SWC::Manager::Rangers::_changes
void _changes(const RangerList &hosts, bool sync_all=false)
Definition: Rangers.cc:766
SWC::Manager::Rangers::update
bool update(const Column::Ptr &col, const DB::Schema::Ptr &schema, uint64_t req_id, bool ack_required)
Definition: Rangers.cc:442
SWC::Manager::Rangers::rgr_had_id
rgrid_t rgr_had_id(rgrid_t rgrid, const Comm::EndPoints &endpoints)
Definition: Rangers.cc:236
SWC::Core::MutexSptd::unlock
SWC_CAN_INLINE void unlock(const bool &support) noexcept
Definition: MutexSptd.h:71
SWC::Config::Property::Value_int32_g::get
SWC_CAN_INLINE int32_t get() const noexcept
Definition: Property.h:610
SWC::Comm::has_endpoint
bool SWC_PURE_FUNC has_endpoint(const EndPoint &e1, const EndPoints &endpoints_in) noexcept
Definition: Resolver.cc:93
SWC::Manager::Range::Ptr
std::shared_ptr< Range > Ptr
Definition: Range.h:24
SWC::Manager::Rangers::m_columns_check_ts
int64_t m_columns_check_ts
Definition: Rangers.h:125
SWC::Env::Mngr::is_shuttingdown
static SWC_CAN_INLINE bool is_shuttingdown() noexcept
Definition: MngrEnv.h:79
SWC::Manager::Rangers::m_mutex_columns_check
Core::MutexSptd m_mutex_columns_check
Definition: Rangers.h:123
SWC::Manager::RangersResources::changes
void changes(const RangerList &rangers, RangerList &changed)
Definition: RangersResources.h:186
SWC::Manager::Rangers::need_health_check
void need_health_check(const Column::Ptr &col)
Definition: Rangers.cc:497
SWC::Comm::Protocol::Mngr::Req::RgrUpdate::Ptr
std::shared_ptr< RgrUpdate > Ptr
Definition: RgrUpdate.h:17
ColumnUpdate.cc
SWC::Manager::Rangers::operator=
Rangers & operator=(const Rangers &)=delete
SWC::Manager::Rangers::range_loaded
void range_loaded(Ranger::Ptr rgr, Range::Ptr range, int64_t revision, int err, bool failure=false, bool verbose=true)
Definition: Rangers.cc:403
SWC::Manager::Columns::assigned
void assigned(rgrid_t rgrid, size_t num, Core::Vector< Range::Ptr > &ranges)
Definition: Columns.h:123
SWC::DB::Types::MngrRangerState::ACK
const uint8_t ACK
Definition: MngrRangerState.h:18
SWC::Core::Atomic::add_rslt
constexpr SWC_CAN_INLINE T add_rslt(T v) noexcept
Definition: Atomic.h:120
RangeLoad.cc
SWC::Manager::ColumnHealthCheck
Definition: ColumnHealthCheck.h:20
SWC::Manager::Rangers::m_assign
Core::StateRunning m_assign
Definition: Rangers.h:120
SWC::Manager::Rangers::m_rangers
RangerList m_rangers
Definition: Rangers.h:117
SWC::Core::AtomicBase::store
constexpr SWC_CAN_INLINE void store(T v) noexcept
Definition: Atomic.h:37
SWC::Error::OK
@ OK
Definition: Error.h:45
SWC::DB::Types::MngrRangerState::SHUTTINGDOWN
const uint8_t SHUTTINGDOWN
Definition: MngrRangerState.h:21
SWC::Core::Vector::empty
constexpr SWC_CAN_INLINE bool empty() const noexcept
Definition: Vector.h:168
SWC::Manager::Rangers::rgr_set_id
rgrid_t rgr_set_id(const Comm::EndPoints &endpoints, rgrid_t opt_rgrid=0)
Definition: Rangers.cc:201
SWC_CAN_INLINE
#define SWC_CAN_INLINE
Definition: Compat.h:102
SWC::LOG_DEBUG
@ LOG_DEBUG
Definition: Logger.h:36
SWC::Comm::Protocol::Rgr::Req::ColumnUpdate::Ptr
std::shared_ptr< ColumnUpdate > Ptr
Definition: ColumnUpdate.h:16
SWC::Manager::Rangers::cfg_delay_rgr_chg
const Config::Property::Value_int32_g::Ptr cfg_delay_rgr_chg
Definition: Rangers.h:22
SWC_LOG
#define SWC_LOG(priority, message)
Definition: Logger.h:191
SWC::Manager::Rangers::changes
void changes(const RangerList &hosts, bool sync_all=false)
Definition: Rangers.cc:756
SWC::Manager::Rangers::rgr_list
void rgr_list(const rgrid_t rgrid, RangerList &rangers)
Definition: Rangers.cc:190
SWC::Manager::Rangers::rgr_ack_id
bool rgr_ack_id(rgrid_t rgrid, const Comm::EndPoints &endpoints)
Definition: Rangers.cc:206
SWC
The SWC-DB C++ namespace 'SWC'.
Definition: main.cc:12
SWC::Env::Mngr::mngd_columns
static SWC_CAN_INLINE Manager::MngdColumns * mngd_columns() noexcept
Definition: MngrEnv.h:74
ColumnsUnload.h
SWC::Manager::Columns::set_rgr_unassigned
void set_rgr_unassigned(rgrid_t rgrid)
Definition: Columns.h:111
RangeLoad.h
SWC::DB::Types::MngrRangerState::REMOVED
const uint8_t REMOVED
Definition: MngrRangerState.h:19
SWC::Comm::Protocol::Rgr::Req::RangeLoad
Definition: RangeLoad.h:13
SWC::Manager::Rangers::m_mutex
Core::MutexSptd m_mutex
Definition: Rangers.h:115
SWC::Manager::Rangers::rgr_report
void rgr_report(rgrid_t rgrid, int err, const Comm::Protocol::Rgr::Params::Report::RspRes &rsp)
Definition: Rangers.cc:134
SWC::Manager::Rangers::stop
void stop(bool shuttingdown=true)
Definition: Rangers.cc:55
SWC::Manager::MngrRole::is_active_role
bool is_active_role(uint8_t role)
Definition: MngrRole.cc:80
SWC::Manager::RangersResources::check
int64_t check(const RangerList &rangers)
Definition: RangersResources.h:82
SWC::Manager::Ranger::Ptr
std::shared_ptr< Ranger > Ptr
Definition: Ranger.h:21
SWC::Core::StateRunning::running
constexpr SWC_CAN_INLINE bool running() noexcept
Definition: StateRunning.h:37
SWC::rgrid_t
uint64_t rgrid_t
Definition: Identifiers.h:18
SWC::Manager::Rangers::rgr_shutdown
void rgr_shutdown(rgrid_t rgrid, const Comm::EndPoints &endpoints)
Definition: Rangers.cc:252
SWC::DB::Types::MngrRangerState::MARKED_OFFLINE
const uint8_t MARKED_OFFLINE
Definition: MngrRangerState.h:20
SWC::Manager::Rangers::rgr_get
Ranger::Ptr rgr_get(const rgrid_t rgrid)
Definition: Rangers.cc:164
SWC::Manager::Rangers::sync
void sync()
Definition: Rangers.cc:277
SWC::DB::RgrData
Definition: RgrData.h:24
SWC_FMT_LU
#define SWC_FMT_LU
Definition: Compat.h:98
SWC::Manager::Rangers::m_rangers_resources
RangersResources m_rangers_resources
Definition: Rangers.h:118
SWC::Manager::Columns::change_rgr
void change_rgr(rgrid_t rgrid_old, rgrid_t rgrid)
Definition: Columns.h:117
SWC::Manager::Rangers::next_rgr
void next_rgr(const Range::Ptr &range, Ranger::Ptr &rgr_set)
Definition: Rangers.cc:610
SWC::cid_t
uint64_t cid_t
Definition: Identifiers.h:16
RangeUnload.h
SWC::Comm::Protocol::Rgr::Req::RangeLoad::Ptr
std::shared_ptr< RangeLoad > Ptr
Definition: RangeLoad.h:15
SWC::Manager::Rangers::cfg_rgr_failures
const Config::Property::Value_uint16_g::Ptr cfg_rgr_failures
Definition: Rangers.h:21
SWC::Core::Vector< Ranger::Ptr >
SWC::Manager::Rangers::schedule_check
void schedule_check(uint32_t t_ms=10000)
Definition: Rangers.cc:90
SWC::Manager::Rangers::cfg_column_health_chkers
const Config::Property::Value_int32_g::Ptr cfg_column_health_chkers
Definition: Rangers.h:27
SWC::Condition::from
Comp from(const char **buf, uint32_t *remainp, uint8_t extended=0x00) noexcept
Definition: Comparators.cc:15
SWC::Manager::Rangers::cfg_column_health_chk
const Config::Property::Value_int32_g::Ptr cfg_column_health_chk
Definition: Rangers.h:26
SWC::Manager::Columns::get_column
Column::Ptr get_column(int &err, const cid_t cid)
Definition: Columns.h:71
RgrUpdate.h
SWC::Manager::Columns::get_need_health_check
Column::Ptr get_need_health_check(int64_t ts, uint32_t ms)
Definition: Columns.h:143
SWC::Manager::RangersResources::evaluate
void evaluate()
Definition: RangersResources.h:122
SWC::Env::Mngr::schemas
static SWC_CAN_INLINE Manager::Schemas * schemas() noexcept
Definition: MngrEnv.h:54
SWC::Manager::Rangers::m_run
Core::AtomicBool m_run
Definition: Rangers.h:113
ColumnDelete.cc
SWC::Manager::Rangers::assign_ranges
void assign_ranges()
Definition: Rangers.cc:544
SWC::Core::Vector::cend
constexpr SWC_CAN_INLINE const_iterator cend() const noexcept
Definition: Vector.h:232
SWC::Manager::Rangers::print
void print(std::ostream &out)
Definition: Rangers.cc:537
SWC::Manager::Rangers::assign_ranges_run
void assign_ranges_run()
Definition: Rangers.cc:559
SWC::Manager::Rangers::m_columns_check
ColumnHealthChecks m_columns_check
Definition: Rangers.h:124
SWC::Time::now_ms
SWC_CAN_INLINE int64_t now_ms() noexcept
Definition: Time.h:36
SWC::Manager::Rangers::cfg_column_health_chkers_delay
const Config::Property::Value_int32_g::Ptr cfg_column_health_chkers_delay
Definition: Rangers.h:28
SWC::Env::Mngr::role
static SWC_CAN_INLINE Manager::MngrRole * role() noexcept
Definition: MngrEnv.h:64
SWC::Manager::MngrRole::req_mngr_inchain
void req_mngr_inchain(const Comm::client::ConnQueue::ReqBase::Ptr &req)
Definition: MngrRole.cc:137
SWC::Manager::Rangers::wait_health_check
void wait_health_check(cid_t cid=DB::Schema::NO_CID)
Definition: Rangers.cc:521
SWC::DB::RgrData::endpoints
Comm::EndPoints endpoints
Definition: RgrData.h:29
SWC::Manager::Rangers
Definition: Rangers.h:18
SWC::Manager::Rangers::m_assignments
Core::Atomic< int32_t > m_assignments
Definition: Rangers.h:121
SWC::Manager::Rangers::health_check_finished
void health_check_finished(const ColumnHealthCheck::Ptr &chk)
Definition: Rangers.cc:510
SWC::Manager::Column::Ptr
std::shared_ptr< Column > Ptr
Definition: Column.h:24
SWC::Comm::Protocol::Rgr::Req::ColumnsUnload::Ptr
std::shared_ptr< ColumnsUnload > Ptr
Definition: ColumnsUnload.h:16
SWC::Comm::Protocol::Rgr::Req::RangeUnload
Definition: RangeUnload.h:15
SWC::Manager::RangerList
Core::Vector< Ranger::Ptr > RangerList
Definition: Ranger.h:146
SWC::Manager::Ranger
Definition: Ranger.h:17
SWC::Manager::Rangers::column_delete
void column_delete(const DB::Schema::Ptr &schema, uint64_t req_id, const Core::Vector< rgrid_t > &rgrids)
Definition: Rangers.cc:467
SWC::DB::Types::MngrRole::RANGERS
const uint8_t RANGERS
Definition: MngrRole.h:16
SWC::Manager::Rangers::empty
bool empty() noexcept
Definition: Rangers.cc:85
SWC::Manager::Rangers::m_timer
asio::high_resolution_timer m_timer
Definition: Rangers.h:116
SWC::Comm::Protocol::Rgr::Req::RangeUnload::Ptr
std::shared_ptr< RangeUnload > Ptr
Definition: RangeUnload.h:17
SWC::Core::Atomic::fetch_sub
constexpr SWC_CAN_INLINE T fetch_sub(T v) noexcept
Definition: Atomic.h:88
SWC::Core::Vector::push_back
SWC_CAN_INLINE void push_back(ArgsT &&... args)
Definition: Vector.h:331
SWC::Error::print
void print(std::ostream &out, int err)
Definition: Error.cc:191
SWC::Core::Vector::cbegin
constexpr SWC_CAN_INLINE const_iterator cbegin() const noexcept
Definition: Vector.h:216
SWC::Comm::Protocol::Rgr::Params::Report::RspRes
Definition: Report.h:53
SWC::Core::Vector::size
constexpr SWC_CAN_INLINE size_type size() const noexcept
Definition: Vector.h:189
SWC::Core::MutexSptd::try_full_lock
bool try_full_lock(bool &support) noexcept
Definition: MutexSptd.h:55
SWC::Manager::Rangers::update_status
void update_status(const RangerList &new_rgr_status, bool sync_all)
Definition: Rangers.cc:281
SWC::Comm::Protocol::Rgr::Req::ColumnCompact
Definition: ColumnCompact.h:16
SWC::Core::Vector::emplace_back
SWC_CAN_INLINE reference emplace_back(ArgsT &&... args)
Definition: Vector.h:349
SWC::Manager::Rangers::Rangers
Rangers(const Comm::IoContextPtr &app_io)
Definition: Rangers.cc:25
SWC::DB::Types::MngrRole::COLUMNS
const uint8_t COLUMNS
Definition: MngrRole.h:14
SWC::Comm::equal_endpoints
bool SWC_PURE_FUNC equal_endpoints(const EndPoints &endpoints1, const EndPoints &endpoints2) noexcept
Definition: Resolver.cc:108
SWC::Manager::Rangers::cfg_assign_due
const Config::Property::Value_int32_g::Ptr cfg_assign_due
Definition: Rangers.h:24
SWC::Core::Vector::begin
constexpr SWC_CAN_INLINE iterator begin() noexcept
Definition: Vector.h:211