|
SWC-DB
v0.5.12 C++ documentations
SWC-DB© (Super Wide Column Database) - High Performance Scalable Database (https://github.com/kashirin-alex/swc-db)
|
Go to the documentation of this file.
22 namespace SWC {
namespace Manager {
27 Env::Config::settings()->get<Config::Property::Value_uint16_g>(
28 "swc.mngr.ranges.assign.Rgr.remove.failures")),
30 Env::Config::settings()->get<Config::Property::Value_int32_g>(
31 "swc.mngr.ranges.assign.delay.onRangerChange")),
33 Env::Config::settings()->get<Config::Property::Value_int32_g>(
34 "swc.mngr.ranges.assign.interval.check")),
36 Env::Config::settings()->get<Config::Property::Value_int32_g>(
37 "swc.mngr.ranges.assign.due")),
38 cfg_column_health_chk(
39 Env::Config::settings()->get<Config::Property::Value_int32_g>(
40 "swc.mngr.column.health.interval.check")),
41 cfg_column_health_chkers(
42 Env::Config::settings()->get<Config::Property::Value_int32_g>(
43 "swc.mngr.column.health.checks")),
44 cfg_column_health_chkers_delay(
45 Env::Config::settings()->get<Config::Property::Value_int32_g>(
46 "swc.mngr.column.health.checks.delay")),
49 m_timer(asio::high_resolution_timer(app_io->executor())),
50 m_rangers(), m_rangers_resources(), m_assign(), m_assignments(0),
51 m_mutex_columns_check(),
52 m_columns_check(), m_columns_check_ts(0) {
79 std::this_thread::sleep_for(std::chrono::microseconds(100));
96 auto set_in = std::chrono::milliseconds(t_ms);
98 auto now = asio::high_resolution_timer::clock_type::now();
99 if(set_on > now && set_on < now + set_in)
106 TimerTask(
Rangers* a_ptr) noexcept : ptr(a_ptr) { }
107 void operator()(
const asio::error_code& ec) {
108 if(ec != asio::error::operation_aborted)
112 m_timer.async_wait(TimerTask(
this));
156 if(changed.
empty()) {
167 if(rgr->rgrid == rgrid)
177 if(rgr->rgrid == rgrid) {
179 endpoints = rgr->endpoints;
187 endpoints = rgr->endpoints;
193 if(!rgrid || rgr->rgrid == rgrid) {
203 return rgr_set(endpoints, opt_rgrid)->rgrid;
215 state = h->state.load();
237 bool new_id_required =
false;
241 if(rgrid == h->rgrid) {
244 new_id_required =
true;
249 return rgr_set_id(endpoints, new_id_required ? 0 : rgrid);
258 removed = std::move(*it);
285 if(rangers_mngr && !sync_all)
298 for(
auto& rgr_new : new_rgr_status) {
306 if((chg = rgr_new->rgrid != h->rgrid)) {
308 rgr_new->rgrid.store(h->rgrid);
310 rgrid_t was = h->rgrid.exchange(rgr_new->rgrid);
316 it->reset(
new Ranger(*h.get(), rgr_new->endpoints));
317 (h = *it)->init_queue();
320 if(rgr_new->state != h->state) {
323 rgr_new->state.store(h->state);
330 h, cid_begin, cid_end)
333 h->failures.store(0);
342 h->state.store(rgr_new->state);
346 if(rgr_new->load_scale != h->load_scale) {
347 h->load_scale.store(rgr_new->load_scale);
351 if(rgr_new->rebalance()) {
352 h->rebalance(rgr_new->rebalance());
365 rgr_new->init_queue();
373 r->interm_ranges.store(0);
376 for(
auto& h : removing) {
380 for(
auto& h : balance_rangers) {
384 for(
auto& range : ranges) {
386 if(err || !h->can_rebalance())
391 h->interm_ranges.fetch_add(1);
393 if(h->rebalance() && !sync_all) {
394 if(std::find(changed.
cbegin(), changed.
cend(), h) == changed.
cend())
400 changes(changed, sync_all && !rangers_mngr);
405 int err,
bool failure,
bool verbose) {
407 if(!range->deleted()) {
411 rgr->failures.fetch_add(1);
422 range->set_state_none();
432 rgr->failures.store(0);
433 range->set_state_assigned(rgr->rgrid, revision);
443 uint64_t req_id,
bool ack_required) {
445 col->need_schema_sync(schema->revision, rgrids);
446 bool undergo =
false;
457 rgr, col, schema, req_id)
472 if(rgrid != rgr->rgrid)
483 col->assigned(rgrids);
500 auto it = std::find_if(
506 col->reset_health_check();
529 return chk->col->cfg->cid == cid; });
533 std::this_thread::sleep_for(std::chrono::microseconds(100));
541 h->print(out <<
"\n ");
552 Task(
Rangers* a_ptr) noexcept : ptr(a_ptr) { }
576 if(!(range = columns.get_next_unassigned(state=
false))) {
583 range->cfg->cid, range->rid);
590 !(col = columns.get_column(err =
Error::OK, range->cfg->cid))) {
594 range->set_state_queued(rgr->rgrid);
595 rgr->interm_ranges.fetch_add(1);
612 size_t avg_ranges = 0;
614 const DB::RgrData& last_rgr = range->get_last_rgr();
621 state = rgr->state.load();
639 avg_ranges += rgr->interm_ranges;
646 avg_ranges /= n_rgrs;
648 size_t interm_ranges = UINT64_MAX;
650 state = rgr->state.load();
652 avg_ranges >= rgr->interm_ranges &&
653 interm_ranges >= rgr->interm_ranges &&
654 rgr->load_scale >= best &&
657 best = rgr->load_scale;
658 interm_ranges = rgr->interm_ranges;
671 if(ts - m_columns_check_ts < cfg_column_health_chkers_delay->get()) {
679 Task(Task&& other) noexcept : ptr(std::move(other.ptr)) { }
680 Task(
const Task&) =
delete;
684 void operator()() { ptr->run(); }
713 it->reset(
new Ranger(*removing.get(), endpoints));
714 (rgr = *it)->init_queue();
738 if(nxt == h->rgrid) {
771 for(
auto it = hosts.
cbegin(); it != hosts.
cend(); ) {
774 it += n > 1000 ? 1000 : n;
SWC_CAN_INLINE iterator erase(size_type offset) noexcept(_NoExceptMoveAssign &&_NoExceptDestructor)
const Config::Property::Value_int32_g::Ptr cfg_chk_assign
constexpr SWC_CAN_INLINE reference front() noexcept
constexpr SWC_CAN_INLINE void stop() noexcept
static constexpr const cid_t NO_CID
#define SWC_LOGF(priority, fmt,...)
std::shared_ptr< Schema > Ptr
SWC_CAN_INLINE uint16_t get() const noexcept
#define SWC_LOG_OUT(pr, _code_)
void health_check_columns()
std::shared_ptr< ColumnCompact > Ptr
std::shared_ptr< ColumnDelete > Ptr
static SWC_CAN_INLINE void post(T_Handler &&handler)
void column_compact(const Column::Ptr &col)
std::shared_ptr< ColumnHealthCheck > Ptr
Ranger::Ptr rgr_set(const Comm::EndPoints &endpoints, rgrid_t opt_rgrid=0)
constexpr SWC_CAN_INLINE bool is_data(cid_t cid) noexcept
bool active(cid_t &cid_begin, cid_t &cid_end) noexcept
std::shared_ptr< IoContext > IoContextPtr
bool add_and_more(rgrid_t rgrid, int err, const Comm::Protocol::Rgr::Params::Report::RspRes &rsp)
static SWC_CAN_INLINE Manager::Columns * columns() noexcept
void _changes(const RangerList &hosts, bool sync_all=false)
bool update(const Column::Ptr &col, const DB::Schema::Ptr &schema, uint64_t req_id, bool ack_required)
rgrid_t rgr_had_id(rgrid_t rgrid, const Comm::EndPoints &endpoints)
SWC_CAN_INLINE void unlock(const bool &support) noexcept
SWC_CAN_INLINE int32_t get() const noexcept
bool SWC_PURE_FUNC has_endpoint(const EndPoint &e1, const EndPoints &endpoints_in) noexcept
std::shared_ptr< Range > Ptr
int64_t m_columns_check_ts
static SWC_CAN_INLINE bool is_shuttingdown() noexcept
Core::MutexSptd m_mutex_columns_check
void changes(const RangerList &rangers, RangerList &changed)
void need_health_check(const Column::Ptr &col)
std::shared_ptr< RgrUpdate > Ptr
Rangers & operator=(const Rangers &)=delete
void range_loaded(Ranger::Ptr rgr, Range::Ptr range, int64_t revision, int err, bool failure=false, bool verbose=true)
void assigned(rgrid_t rgrid, size_t num, Core::Vector< Range::Ptr > &ranges)
constexpr SWC_CAN_INLINE T add_rslt(T v) noexcept
Core::StateRunning m_assign
constexpr SWC_CAN_INLINE void store(T v) noexcept
const uint8_t SHUTTINGDOWN
constexpr SWC_CAN_INLINE bool empty() const noexcept
rgrid_t rgr_set_id(const Comm::EndPoints &endpoints, rgrid_t opt_rgrid=0)
std::shared_ptr< ColumnUpdate > Ptr
const Config::Property::Value_int32_g::Ptr cfg_delay_rgr_chg
#define SWC_LOG(priority, message)
void changes(const RangerList &hosts, bool sync_all=false)
void rgr_list(const rgrid_t rgrid, RangerList &rangers)
bool rgr_ack_id(rgrid_t rgrid, const Comm::EndPoints &endpoints)
The SWC-DB C++ namespace 'SWC'.
static SWC_CAN_INLINE Manager::MngdColumns * mngd_columns() noexcept
void set_rgr_unassigned(rgrid_t rgrid)
void rgr_report(rgrid_t rgrid, int err, const Comm::Protocol::Rgr::Params::Report::RspRes &rsp)
void stop(bool shuttingdown=true)
bool is_active_role(uint8_t role)
int64_t check(const RangerList &rangers)
std::shared_ptr< Ranger > Ptr
constexpr SWC_CAN_INLINE bool running() noexcept
void rgr_shutdown(rgrid_t rgrid, const Comm::EndPoints &endpoints)
const uint8_t MARKED_OFFLINE
Ranger::Ptr rgr_get(const rgrid_t rgrid)
RangersResources m_rangers_resources
void change_rgr(rgrid_t rgrid_old, rgrid_t rgrid)
void next_rgr(const Range::Ptr &range, Ranger::Ptr &rgr_set)
std::shared_ptr< RangeLoad > Ptr
const Config::Property::Value_uint16_g::Ptr cfg_rgr_failures
void schedule_check(uint32_t t_ms=10000)
const Config::Property::Value_int32_g::Ptr cfg_column_health_chkers
Comp from(const char **buf, uint32_t *remainp, uint8_t extended=0x00) noexcept
const Config::Property::Value_int32_g::Ptr cfg_column_health_chk
Column::Ptr get_column(int &err, const cid_t cid)
Column::Ptr get_need_health_check(int64_t ts, uint32_t ms)
static SWC_CAN_INLINE Manager::Schemas * schemas() noexcept
constexpr SWC_CAN_INLINE const_iterator cend() const noexcept
void print(std::ostream &out)
ColumnHealthChecks m_columns_check
SWC_CAN_INLINE int64_t now_ms() noexcept
const Config::Property::Value_int32_g::Ptr cfg_column_health_chkers_delay
static SWC_CAN_INLINE Manager::MngrRole * role() noexcept
void req_mngr_inchain(const Comm::client::ConnQueue::ReqBase::Ptr &req)
void wait_health_check(cid_t cid=DB::Schema::NO_CID)
Comm::EndPoints endpoints
Core::Atomic< int32_t > m_assignments
void health_check_finished(const ColumnHealthCheck::Ptr &chk)
std::shared_ptr< Column > Ptr
std::shared_ptr< ColumnsUnload > Ptr
Core::Vector< Ranger::Ptr > RangerList
void column_delete(const DB::Schema::Ptr &schema, uint64_t req_id, const Core::Vector< rgrid_t > &rgrids)
asio::high_resolution_timer m_timer
std::shared_ptr< RangeUnload > Ptr
constexpr SWC_CAN_INLINE T fetch_sub(T v) noexcept
SWC_CAN_INLINE void push_back(ArgsT &&... args)
void print(std::ostream &out, int err)
constexpr SWC_CAN_INLINE const_iterator cbegin() const noexcept
constexpr SWC_CAN_INLINE size_type size() const noexcept
bool try_full_lock(bool &support) noexcept
void update_status(const RangerList &new_rgr_status, bool sync_all)
SWC_CAN_INLINE reference emplace_back(ArgsT &&... args)
Rangers(const Comm::IoContextPtr &app_io)
bool SWC_PURE_FUNC equal_endpoints(const EndPoints &endpoints1, const EndPoints &endpoints2) noexcept
const Config::Property::Value_int32_g::Ptr cfg_assign_due
constexpr SWC_CAN_INLINE iterator begin() noexcept