 |
SWC-DB
v0.5.12 C++ documentations
SWC-DB© (Super Wide Column Database) - High Performance Scalable Database (https://github.com/kashirin-alex/swc-db)
|
Go to the documentation of this file.
17 namespace SWC {
namespace Manager {
23 : cfg_schema_replication(
24 Env::Config::settings()->get<Config::Property::Value_uint8_g>(
25 "swc.mngr.schema.replication")),
26 cfg_schemas_store_cap(
27 Env::Config::settings()->get<Config::Property::Value_uint64_g>(
28 "swc.mngr.schemas.store.from.capacity")),
29 cfg_schemas_store_blksz(
30 Env::Config::settings()->get<Config::Property::Value_int32_g>(
31 "swc.mngr.schemas.store.block.size")),
32 cfg_schemas_store_encoder(
33 Env::Config::settings()->get<Config::Property::Value_enum_g>(
34 "swc.mngr.schemas.store.block.encoder")),
36 Env::Config::settings()->get<Config::Property::Value_int32_g>(
37 "swc.mngr.ranges.assign.delay.afterColumnsInit")),
38 m_run(true), m_columns_load(),
39 m_mutex_schemas(), m_schemas_set(false),
40 m_mutex_active(), m_cid_active(false),
41 m_cid_begin(DB::Schema::NO_CID), m_cid_end(DB::Schema::NO_CID),
42 m_expected_remain(STATE_COLUMNS_NOT_INITIALIZED),
43 m_last_used_cid(DB::Types::SystemColumn::
SYS_CID_END),
45 m_mutex_actions(), m_actions(), m_actions_pending() {
55 bool expecting =
false;
57 for(
size_t c=0; ; ++c) {
67 if(!expecting && !pending)
70 std::this_thread::sleep_for(std::chrono::milliseconds(10));
73 "Stop-Waiting colums load=%s actions=" SWC_FMT_LU
76 expecting ?
"True" :
"False", pending);
141 return m_cid_active && cid &&
142 (!m_cid_begin || m_cid_begin <= cid) &&
143 (!m_cid_end || m_cid_end >= cid);
149 cid_begin = m_cid_begin;
250 cids_t&& columns,
bool initial) {
264 cid_begin, cid_end, total, std::move(columns))
275 for(
auto cid : columns) {
278 !_cols->get_column(err =
Error::OK, cid)) {
292 "Expected Columns to Load CANCELLED for cid(begin=" SWC_FMT_LU
299 need, total, cid_begin, cid_end);
306 uint64_t req_id,
bool initial) {
310 if(!initial && schemas_mngr)
315 return update(func, schema, err, req_id);
319 <<
" req_id=" << req_id;
323 bool do_update =
false;
326 case ColumnMngFunc::REMOVE: {
335 remove(schema, 0, req_id);
338 }
else if(!col->do_remove()) {
344 col->assigned(rgrids);
345 do_update = rgrids.
empty();
347 remove(schema, 0, req_id);
354 case ColumnMngFunc::CREATE:
355 case ColumnMngFunc::INTERNAL_LOAD: {
358 if(init || !err || !
m_run) {
381 case ColumnMngFunc::MODIFY: {
385 if(col && !schemas_mngr) {
387 if((do_update = !existing || !existing->equal(schema)))
390 do_update = col && do_update &&
403 :
update(co_func, schema, err, req_id);
408 rgrid_t rgrid, uint64_t req_id) {
412 if(col && !col->finalize_remove(err, rgrid))
424 ColumnMngFunc::INTERNAL_ACK_DELETE, schema,
Error::OK, req_id);
428 [
this, req_id, schema]
436 int err = _hdlr->state_error;
438 auto col = _hdlr->get_columnn(meta_cid);
439 if(!(err = col->error()) && !col->empty())
440 col->get_cells(cells_meta);
443 if(!col->error() && !col->empty())
444 col->get_cells(cells_rgrdata);
449 "Range MetaData might remained, result-err=%d(%s)",
452 if(!cells_meta.empty() || !cells_rgrdata.empty()) {
455 <<
" deleted Ranges MetaData:\n meta_cid=" << meta_cid
456 <<
" remained(" << cells_meta.size() <<
") cells=[";
457 for(
auto cell : cells_meta)
460 <<
" remained(" << cells_rgrdata.size() <<
") cells=[";
461 for(
auto cell : cells_rgrdata)
467 ColumnMngFunc::INTERNAL_ACK_DELETE, schema,
Error::OK, req_id);
474 spec.set_opt__deleting();
475 spec.flags.set_only_keys();
476 auto& key_intval = spec.key_intervals.add();
477 key_intval.start.reserve(2);
484 hdlr->scan(schema->col_seq, meta_cid, std::move(spec));
486 hdlr->completion.increment();
487 hdlr->scan(schema->col_seq, meta_cid, spec);
488 spec.values.col_type = DB::Types::Column::PLAIN;
490 DB::Types::KeySeq::VOLUME,
494 if(hdlr->completion.is_last())
537 if(std::find(entries.
cbegin(), entries.
cend(), cid) == entries.
cend()) {
546 auto it = entries.
cbegin();
560 uint8_t a_replicas) noexcept
561 : ptr(a_ptr), pending(a_pending),
562 it_begin(a_it_begin), it_end(a_it_end),
563 replicas(a_replicas) {
568 for(
cid_t cid; ptr->
m_run && it_begin != it_end; ++it_begin) {
583 it_to = entries.
cend() - it > vol ? (it + vol) : entries.
cend();
587 }
while(
m_run && it_to != entries.
cend());
611 std::this_thread::sleep_for(std::chrono::milliseconds(100));
629 if(!
m_run || groups.empty() || entries.
empty()) {
634 std::sort(groups.begin(), groups.end(), []
636 return (!g1->cid_begin || g1->cid_begin < g2->cid_begin) &&
637 (g2->cid_end && g1->cid_end < g2->cid_end); });
640 std::this_thread::sleep_for(std::chrono::milliseconds(100));
644 return s1->cid < s2->cid; });
646 auto it = entries.
cbegin();
649 for(
auto& g : groups) {
658 itc != entries.
cend() &&
659 (!g->cid_begin || g->cid_begin <= (*itc)->cid) &&
660 (!g->cid_end || g->cid_end >= (*itc)->cid); ++itc) {
669 for(;it != entries.
cend() && columns.
size() < 1000 &&
670 (!g->cid_begin || g->cid_begin <= (*it)->cid) &&
671 (!g->cid_end || g->cid_end >= (*it)->cid); ++it) {
674 if(++g_batches > 1 && columns.
empty())
677 int64_t sz = columns.
size();
681 g->cid_begin, g->cid_end, g_batches, sz, total);
682 set_expect(g->cid_begin, g->cid_end, total, std::move(columns),
true);
685 for(; it_batch != it; ++it_batch) {
687 "Set Expected Load cid(" SWC_FMT_LU ")", (*it_batch)->cid);
689 ColumnMngFunc::INTERNAL_LOAD, *it_batch,
Error::OK, 0,
true);
694 g->cid_begin, g->cid_end,
700 if(it != entries.
cend() && sz == 1000)
745 schema->cell_versions = 1;
748 schema_save->cid = cid;
749 if(!schema_save->revision)
768 if(old->col_seq != schema->col_seq ||
778 schema->cell_versions = 1;
782 schema->cell_versions = 1;
783 schema->cell_ttl = 0;
788 schema_save->cid = old->cid;
789 if(!schema_save->revision)
794 if(schema_save->equal(old,
false))
823 <<
" req_id=" << req_id;
829 case ColumnMngFunc::INTERNAL_LOAD_ALL: {
833 case ColumnMngFunc::INTERNAL_ACK_LOAD:
838 ColumnMngFunc::INTERNAL_LOAD, existing,
Error::OK, req_id,
true);
841 case ColumnMngFunc::INTERNAL_ACK_CREATE:
842 case ColumnMngFunc::INTERNAL_ACK_MODIFY:
844 case ColumnMngFunc::INTERNAL_ACK_DELETE: {
884 pending = std::move(it->second);
890 pending->response(err);
894 <<
"Missing Pending Req-Ack for func=" << co_func
895 <<
" req_id=" << req_id;
916 }
else if(req->expired(1000)) {
919 }
else if(req->schema->col_name.empty()) {
929 req->schema->col_name);
936 if(!err)
switch(req->function) {
937 case ColumnMngFunc::CREATE: {
939 err = req->schema->cid == schema->cid
946 case ColumnMngFunc::MODIFY: {
950 schema->cid != req->schema->cid)
953 update(err, req->schema, schema);
956 case ColumnMngFunc::REMOVE: {
959 else if(schema->cid != req->schema->cid ||
965 req->schema = schema;
std::unordered_map< uint64_t, ColumnReq::Ptr > m_actions_pending
static void columns_by_fs(int &err, FS::IdEntries_t &entries)
void add(int &err, const Schema::Ptr &schema)
bool is_schemas_mngr(int &err)
SWC_CAN_INLINE uint8_t get() const noexcept
Core::Atomic< cid_t > m_last_used_cid
constexpr SWC_CAN_INLINE void stop() noexcept
static SWC_CAN_INLINE Ptr make(const Clients::Ptr &clients, Cb_t &&cb=nullptr, bool rsp_partials=false, const Comm::IoContextPtr &io=nullptr, Clients::Flag executor=Clients::DEFAULT)
SWC_CAN_INLINE bool push_and_is_1st(const ItemT &item)
Core::StateRunning m_columns_load
const Config::Property::Value_int32_g::Ptr cfg_delay_cols_init
Column::Ptr get_column(int &err, cid_t cid)
std::shared_ptr< Common > Ptr
@ COLUMN_SCHEMA_NAME_NOT_EXISTS
SWC_CAN_INLINE uint32_t concurrency() const noexcept
SWC_CAN_INLINE uint64_t get() const noexcept
bool has_active() noexcept
std::shared_ptr< ColumnReq > Ptr
void set_expect(cid_t cid_begin, cid_t cid_end, uint64_t total, cids_t &&columns, bool initial)
void print(std::ostream &out)
Core::AtomicBool m_schemas_set
static constexpr const cid_t NO_CID
static SWC_CAN_INLINE client::Clients::Ptr & get() noexcept
#define SWC_LOGF(priority, fmt,...)
bool is_active(cid_t cid) noexcept
std::shared_ptr< Schema > Ptr
SWC_CAN_INLINE void clear() noexcept(_NoExceptDestructor)
#define SWC_LOG_OUT(pr, _code_)
SWC_CAN_INLINE int64_t now_ns() noexcept
void create(int &err, DB::Schema::Ptr &schema)
static SWC_CAN_INLINE void post(T_Handler &&handler)
@ COLUMN_SCHEMA_IS_SYSTEM
@ COLUMN_SCHEMA_NOT_DIFFERENT
const char * get_text(const int err) noexcept
void change_active(const cid_t cid_begin, const cid_t cid_end, bool has_cols)
void load(int &err, const std::string &filepath, DB::Schema::Ptr &schema)
Core::MutexSptd m_mutex_active
void store_create(int &err, uint8_t replicas, uint32_t blksz, const DB::Types::Encoder cfg_encoder)
bool active(cid_t &cid_begin, cid_t &cid_end) noexcept
static SWC_CAN_INLINE Manager::Columns * columns() noexcept
static bool remove(int &err, const cid_t cid)
void remove(const cid_t cid)
void update_status(Comm::Protocol::Mngr::Params::ColumnMng::Function func, const DB::Schema::Ptr &schema, int err, uint64_t req_id, bool initial=false)
bool update(const Column::Ptr &col, const DB::Schema::Ptr &schema, uint64_t req_id, bool ack_required)
std::shared_ptr< Group > Ptr
SWC_CAN_INLINE int32_t get() const noexcept
const value_type * const_iterator
@ COLUMN_SCHEMA_NAME_EMPTY
constexpr SWC_CAN_INLINE bool is_master(cid_t cid) noexcept
void remove(const DB::Schema::Ptr &schema, rgrid_t rgrid, uint64_t req_id)
void print(std::ostream &out)
void columns_ready(int &err)
bool str_eq(const char *s1, const char *s2) noexcept SWC_ATTRIBS((SWC_ATTRIB_O3))
SWC_CAN_INLINE ItemT & front() noexcept
constexpr SWC_CAN_INLINE Range get_range_type(cid_t cid) noexcept
SWC_CAN_INLINE bool empty() noexcept
constexpr SWC_CAN_INLINE T add_rslt(T v) noexcept
std::forward_list< cid_t > m_expected_load
constexpr SWC_CAN_INLINE void store(T v) noexcept
constexpr SWC_CAN_INLINE bool empty() const noexcept
SWC_CAN_INLINE bool pop_and_more()
Schema::Ptr get(cid_t cid) noexcept
#define SWC_LOG(priority, message)
void all(SchemasVec &entries)
constexpr SWC_CAN_INLINE iterator end() noexcept
const Config::Property::Value_uint8_g::Ptr cfg_schema_replication
The SWC-DB C++ namespace 'SWC'.
const Config::Property::Value_enum_g::Ptr cfg_schemas_store_encoder
bool is_an_initialization(int &err, const DB::Schema::Ptr &schema)
void replace(const Schema::Ptr &schema)
const Config::Property::Value_int32_g::Ptr cfg_schemas_store_blksz
static SWC_CAN_INLINE Ptr make()
bool SWC_CONST_FUNC is_counter(const Column typ) noexcept
uint64_t m_expected_remain
void reset(bool schemas_mngr)
Comm::Protocol::Mngr::Params::ColumnMng::Function ColumnMngFunc
bool is_active_role(uint8_t role)
static SWC_CAN_INLINE System::Resources & res() noexcept
void sort(const Networks &nets, const EndPoints &endpoints, EndPoints &sorted)
constexpr SWC_CAN_INLINE bool running() noexcept
uint8_t SWC_CONST_FUNC get_sys_cid(KeySeq col_seq, Range col_type) noexcept
@ COLUMN_CHANGE_INCOMPATIBLE
@ COLUMN_SCHEMA_NAME_NOT_CORRES
void create_schemas_store()
void schedule_check(uint32_t t_ms=10000)
void update(Comm::Protocol::Mngr::Params::ColumnMng::Function func, const DB::Schema::Ptr &schema, int err, uint64_t req_id)
Column::Ptr get_column(int &err, const cid_t cid)
void save_with_validation(int &err, const DB::Schema::Ptr &schema_save, uint8_t replication)
Core::MutexSptd m_mutex_actions
static SWC_CAN_INLINE Manager::Schemas * schemas() noexcept
@ COLUMN_SCHEMA_NAME_EXISTS
constexpr static uint64_t STATE_COLUMNS_NOT_INITIALIZED
constexpr SWC_CAN_INLINE const_iterator cend() const noexcept
const Config::Property::Value_uint64_g::Ptr cfg_schemas_store_cap
SWC_CAN_INLINE size_t size() noexcept
bool expected_ready() noexcept
void print(std::ostream &out, Types::Column col_type=Types::Column::PLAIN, bool with_cells=false) const
static SWC_CAN_INLINE Manager::MngrRole * role() noexcept
void req_mngr_inchain(const Comm::client::ConnQueue::ReqBase::Ptr &req)
void wait_health_check(cid_t cid=DB::Schema::NO_CID)
Core::MutexSptd m_mutex_schemas
std::shared_ptr< Column > Ptr
static bool create(int &err, const cid_t cid)
static SWC_CAN_INLINE Comm::IoContextPtr io() noexcept
void column_delete(const DB::Schema::Ptr &schema, uint64_t req_id, const Core::Vector< rgrid_t > &rgrids)
@ COLUMN_SCHEMA_ID_EXISTS
SWC_CAN_INLINE void push_back(ArgsT &&... args)
void print(std::ostream &out, int err)
constexpr SWC_CAN_INLINE const_iterator cbegin() const noexcept
Core::QueueSafe< ColumnReq::Ptr > m_actions
SWC_CAN_INLINE std::string to_string(const BitFieldInt< T, SZ > &v)
constexpr SWC_CAN_INLINE size_type size() const noexcept
constexpr SWC_CAN_INLINE bool is_rgr_data_on_fs(cid_t cid) noexcept
#define SWC_LOG_CURRENT_EXCEPTION(_s_)
std::shared_ptr< ColumnUpdate > Ptr
@ COLUMN_REACHED_ID_LIMIT
static SWC_CAN_INLINE Manager::Rangers * rangers() noexcept
SWC_CAN_INLINE void reserve(size_type cap)
SWC_CAN_INLINE int32_t get() const noexcept
void update_status_ack(Comm::Protocol::Mngr::Params::ColumnMng::Function func, const DB::Schema::Ptr &schema, int err, uint64_t req_id)
void action(const ColumnReq::Ptr &req)
constexpr SWC_CAN_INLINE iterator begin() noexcept