|
SWC-DB
v0.5.12 C++ documentation
SWC-DB© (Super Wide Column Database) - High Performance Scalable Database (https://github.com/kashirin-alex/swc-db)
|
Go to the documentation of this file.
17 namespace SWC {
namespace Ranger {
23 typedef std::shared_ptr<MetaRegOnLoadReq>
Ptr;
45 Task(
Ptr&& a_ptr) noexcept : ptr(std::move(a_ptr)) { }
47 Task(Task&& other) noexcept : ptr(std::move(other.ptr)) { }
48 Task(
const Task&) =
delete;
52 void operator()() { ptr->callback(); }
56 std::dynamic_pointer_cast<MetaRegOnLoadReq>(shared_from_this())));
70 case Range::State::NOTLOADED:
74 case Range::State::LOADED:
76 case Range::State::UNLOADING:
92 :
ptr(std::move(other.ptr)) {
108 :
ptr(std::move(other.ptr)) {
120 : cfg(a_cfg), rid(a_rid),
121 blocks(cfg->key_seq),
123 m_path(DB::RangeBase::get_path(cfg->cid, rid)),
124 m_mutex_intval(), m_mutex_intval_alignment(),
125 m_interval(cfg->key_seq), m_load_revision(0),
127 m_state(
State::NOTLOADED),
128 m_compacting(COMPACT_NONE), m_require_compact(false),
129 m_q_run_add(false), m_q_run_scan(false),
131 m_q_add(), m_q_scan(), m_cv() {
146 s.reserve(
m_path.length() + suff.length());
159 const csid_t csid)
const {
203 return m_state == State::LOADED &&
255 return m_state == State::LOADED;
292 while(check.pop(&req)) {
321 : ptr(std::move(a_ptr)), req(std::move(a_req)) {
322 ptr->blocks.processing_increment();
325 Task(Task&& other) noexcept
326 : ptr(std::move(other.ptr)), req(std::move(other.req)) {
328 Task(
const Task&) =
delete;
329 Task& operator=(Task&&) =
delete;
330 Task& operator=(
const Task&) =
delete;
333 ptr->blocks.scan(std::move(req));
334 ptr->blocks.processing_decrement();
339 if(!req->expired()) {
362 auto at(State::NOTLOADED);
407 typedef std::shared_ptr<SetRgr> Ptr;
412 : range(std::move(a_range)), req(a_req) { }
413 virtual ~SetRgr() noexcept { }
414 void callback()
override {
418 Task(SetRgr::Ptr&& a_ptr) noexcept : ptr(std::move(a_ptr)) { }
420 Task(Task&& other) noexcept : ptr(std::move(other.ptr)) { }
421 Task(
const Task&) =
delete;
422 Task& operator=(Task&&) =
delete;
423 Task& operator=(
const Task&) =
delete;
426 int err = ptr->error();
428 ? ptr->range->loaded(err, ptr->req)
429 : ptr->range->load(err, ptr->req);
433 std::dynamic_pointer_cast<SetRgr>(shared_from_this())));
436 SetRgr::Ptr(
new SetRgr(shared_from_this(), req))
475 if(col && !col->removing())
484 return req->removed(shared_from_this());
500 hdlr->range->wait_queue();
501 hdlr->range->blocks.remove(err);
508 req->removed(hdlr->range);
537 std::this_thread::sleep_for(std::chrono::milliseconds(1));
617 if(
cfg->range_type == DB::Types::Range::MASTER) {
631 hdlr->column.add(cell);
645 aligned_min.
add(cid_f);
646 aligned_max.
add(cid_f);
661 wfields.
add(fid, key_end);
662 wfields.
add(fid, int64_t(
rid));
663 wfields.
add(++fid, aligned_min);
664 wfields.
add(++fid, aligned_max);
668 hdlr->column.add(cell);
678 hdlr->column.add(cell);
682 hdlr->commit(&hdlr->column);
733 on_change(
false, hdlr, w_chg_chk ? &old_key_begin :
nullptr);
757 for(
auto& cs : w_cellstores) {
760 cs->smartfd->filepath(),
773 #ifdef SWC_RANGER_WITH_RANGEDATA
789 #ifdef SWC_RANGER_WITH_RANGEDATA
798 cfg->print(out <<
'(');
823 return load(err, req);
844 typedef std::shared_ptr<GetRgr> Ptr;
849 noexcept : range(std::move(a_range)), req(a_req) { }
850 virtual ~GetRgr() noexcept { }
851 void callback()
override {
855 Task(GetRgr::Ptr&& a_ptr) noexcept : ptr(std::move(a_ptr)) { }
857 Task(Task&& other) noexcept : ptr(std::move(other.ptr)) { }
858 Task(
const Task&) =
delete;
859 Task& operator=(Task&&) =
delete;
860 Task& operator=(
const Task&) =
delete;
866 if(err && err != ENOKEY)
867 return ptr->range->loaded(err, ptr->req);
871 return ptr->range->load(err, ptr->req);
882 ptr->range, ptr->req)));
886 return ptr->range->internal_take_ownership(err, ptr->req);
890 std::dynamic_pointer_cast<GetRgr>(shared_from_this())));
893 GetRgr::Ptr(
new GetRgr(shared_from_this(), req))->scan(
cfg->cid,
rid);
904 bool is_initial_column_range =
false;
905 #ifdef SWC_RANGER_WITH_RANGEDATA
919 is_initial_column_range =
true;
940 if(is_initial_column_range) {
941 #ifdef SWC_RANGER_WITH_RANGEDATA
947 if(
cfg->range_type == DB::Types::Range::MASTER)
957 int err = hdlr->state_error;
962 return loaded(err, hdlr->req);
969 <<
"Range MetaData missing cid=" <<
cfg->cid <<
" rid=" <<
rid;
980 hdlr->get_cells(cells);
983 bool synced = cells.size() == 1;
987 <<
"Range MetaData DUPLICATE-RID cid=" <<
cfg->cid <<
" rid=" <<
rid;
995 auto& cell = *cells[0];
1008 cell.get_value(v,
false);
1009 const uint8_t* ptr = v.
base;
1010 size_t remain = v.
size;
1043 return loaded(err, hdlr->req);
1046 for(
auto cell : cells) {
1049 updater->column.add(*cell);
1090 [
this, from_state]() {
1111 typedef std::shared_ptr<MetaRegOnAddReq>
Ptr;
1146 range->blocks.commitlog.commit();
1147 range->blocks.processing_decrement();
1154 const uint64_t ttl =
cfg->cell_ttl();
1163 auto check(std::move(
m_q_add));
1165 if(req->expired()) {
1168 req->_other =
nullptr;
1199 const uint8_t* buf = req->
ev->data_ext.base;
1200 size_t remain = req->
ev->data_ext.size;
1201 bool aligned_chg =
false;
1205 cell.read(&buf, &remain,
false);
1228 if(cell.has_expired(ttl))
1233 ? cell.set_revision(ts)
1234 : cell.set_timestamp_with_rev_is_ts(ts);
1238 switch(
cfg->range_type) {
void apply_new(int &err, CellStore::Writers &w_cellstores, CommitLog::Fragments::Vec &fragments_old, const Query::Update::BaseMeta::Ptr &hdlr=nullptr)
std::string get_path_cs_on(const std::string folder, const csid_t csid) const
void save(int &err, CellStore::Readers &cellstores)
constexpr SWC_CAN_INLINE bool compare_exchange_weak(T &at, T value) noexcept
SWC_CAN_INLINE uint8_t get() const noexcept
static const std::string get_path_cs(const std::string &range_path, const std::string &folder, const csid_t csid)
SWC_CAN_INLINE bool push_and_is_1st(const ItemT &item)
void add(Callback::RangeQueryUpdate *req)
SWC_CAN_INLINE void set_value(uint8_t *v, uint32_t len, bool owner)
void loaded(int err, const Callback::RangeLoad::Ptr &req)
Range(const ColumnCfg::Ptr &cfg, const rid_t rid)
bool align(const Types::KeySeq seq, const Cell::Key &key, Cell::KeyVec &start, Cell::KeyVec &finish) SWC_ATTRIBS((SWC_ATTRIB_O3))
static SWC_CAN_INLINE client::Clients::Ptr & get() noexcept
#define SWC_LOGF(priority, fmt,...)
void internal_take_ownership(int &err, const Callback::RangeLoad::Ptr &req)
static constexpr const char LOG_TMP_DIR[]
#define SWC_LOG_OUT(pr, _code_)
SWC_CAN_INLINE int64_t now_ns() noexcept
Core::QueueSafe< ReqScan::Ptr > m_q_scan
TaskRunQueueScan & operator=(const TaskRunQueueScan &)=delete
void exists(const ConnHandlerPtr &conn, const Event::Ptr &ev)
std::string get_path(const std::string suff) const
static SWC_CAN_INLINE DB::RgrData * rgr_data() noexcept
void expand_and_align(bool w_chg_chk, const Query::Update::BaseMeta::Ptr &hdlr)
static SWC_CAN_INLINE int64_t in_process_ranges() noexcept
const char * get_text(const int err) noexcept
void remove(const uint32_t idx)
SWC_CAN_INLINE bool align(const Interval &other)
SWC_CAN_INLINE void free() noexcept
TaskRunQueueScan(const TaskRunQueueScan &)=delete
constexpr SWC_CAN_INLINE bool is_data(cid_t cid) noexcept
void expand_and_align(DB::Cells::Interval &intval)
static SWC_CAN_INLINE Ranger::Columns * columns() noexcept
std::shared_ptr< Base > Ptr
static SWC_CAN_INLINE FS::Interface::Ptr & interface() noexcept
DB::Cell::KeyVec aligned_max
void print(std::ostream &out) const
constexpr SWC_CAN_INLINE void copy(const Timestamp &other) noexcept
SWC_CAN_INLINE void insert(uint32_t idx, const std::string &fraction)
static const uint8_t COMPACT_CHECKING
void remove_rgr(int &err) const noexcept
void check_meta(const Query::Select::CheckMeta::Ptr &hdlr)
void load(const Callback::RangeLoad::Ptr &req)
const SWC::Config::Property::Value_uint8_g::Ptr cfg_req_add_concurrency
SWC_CAN_INLINE void ensure(size_t len)
bool _is_any_begin() const
void schema_update(bool compact)
constexpr SWC_CAN_INLINE bool equal(const Key &other) const noexcept
ColumnPtr get_column(const cid_t cid)
void load(int &err, CellStore::Readers &cellstores)
TaskRunQueueAdd & operator=(const TaskRunQueueAdd &)=delete
bool wait(uint8_t from_state=COMPACT_CHECKING, bool incr=false)
Core::AtomicBool stopping
void print(std::ostream &out)
bool SWC_PURE_FUNC has_endpoint(const EndPoint &e1, const EndPoints &endpoints_in) noexcept
size_t size_bytes_total(bool only_loaded=false)
void processing_increment()
SWC_CAN_INLINE void add(Field *field)
Core::MutexAtomic m_mutex_intval_alignment
CommitLog::Fragments commitlog
Core::QueuePointer< Callback::RangeQueryUpdate * > m_q_add
void decode(const uint8_t **bufp, size_t *remainp, bool owner)
SWC_CAN_INLINE void add(const std::string_view &fraction)
static Read::Ptr create_initial(int &err, const RangePtr &range)
void init(const RangePtr &for_range)
void internal_create_folders(int &err)
size_t size_bytes(bool only_loaded=false) const
Base & operator=(const Base &)=delete
void scan(ReqScan::Ptr &&req)
void internal_create(int &err, const CellStore::Writers &w_cellstores)
static const uint8_t COMPACT_PREPARING
SWC_CAN_INLINE void push(const ItemT &item)
~TaskRunQueueScan() noexcept
SWC_CAN_INLINE bool empty() noexcept
void get_interval(DB::Cells::Interval &interval)
static void get_rgr(DB::RgrData &data, const std::string &filepath) noexcept
constexpr SWC_CAN_INLINE bool empty() const noexcept
SWC_CAN_INLINE bool SWC_PURE_FUNC equal(const Interval &other) const noexcept
std::shared_ptr< RangeUnloadInternal > Ptr
constexpr SWC_CAN_INLINE void store(T v) noexcept
constexpr SWC_CAN_INLINE bool empty() const noexcept
std::shared_ptr< Range > RangePtr
void processing_decrement()
Comm::Protocol::Rgr::Params::RangeQueryUpdateRsp rsp
DB::Cell::KeyVec aligned_min
DB::Cell::Key range_prev_end
The SWC-DB C++ namespace 'SWC'.
Core::Atomic< State > m_state
SWC_CAN_INLINE TaskRunQueueScan(TaskRunQueueScan &&other) noexcept
static constexpr const char CELLSTORES_BAK_DIR[]
void reset_load_revision()
static Ptr make(int &err, const csid_t csid, const RangePtr &range, bool chk_base=false)
void internal_remove(int &err)
void load_from_path(int &err)
TaskRunQueueAdd(const TaskRunQueueAdd &)=delete
DB::Cells::Interval m_interval
void print(std::ostream &out, bool minimal=true)
static void remove(int &err, cid_t cid, rid_t rid) noexcept
void last_rgr_chk(int &err, const Callback::RangeLoad::Ptr &req)
void copy(const Key &other)
void move_from(int &err, Readers::Vec &mv_css)
static constexpr const char CELLSTORES_DIR[]
SWC_CAN_INLINE TaskRunQueueAdd(TaskRunQueueAdd &&other) noexcept
static constexpr const char LOG_DIR[]
static std::string get_path_ranger(const std::string &range_path)
uint24_t known_interval_count()
constexpr SWC_CAN_INLINE void set_time_order_desc(bool desc) noexcept
bool state_unloading() const noexcept
constexpr SWC_CAN_INLINE uint8_t encoded_length_vi64(uint64_t val) noexcept
constexpr const uint8_t HAVE_TIMESTAMP
constexpr uint32_t encoded_length() const noexcept
void scan(ReqScan::Ptr req, Block::Ptr blk_ptr=nullptr)
std::shared_mutex m_mutex
void internal_unload(bool completely, bool &chk_empty)
static const uint8_t COMPACT_NONE
constexpr SWC_CAN_INLINE T exchange(T value) noexcept
const char *SWC_CONST_FUNC to_string(Range::State state) noexcept
std::string get_path_cs(const csid_t csid) const
void scan_internal(ReqScan::Ptr &&req)
void apply_new(int &err, CellStore::Writers &w_cellstores, CommitLog::Fragments::Vec &fragments_old)
virtual int error() noexcept
static const uint8_t COMPACT_APPLYING
CellStore::Readers cellstores
void on_change(bool removal, const Query::Update::BaseMeta::Ptr &hdlr, const DB::Cell::Key *old_key_begin=nullptr)
bool processing() noexcept
constexpr SWC_CAN_INLINE size_t fill() const noexcept
static void set_rgr(const DB::RgrData &data, int &err, std::string &&filepath, uint8_t replication) noexcept
void copy(const Interval &other)
Core::MutexAtomic m_mutex_intval
constexpr SWC_CAN_INLINE T load() const noexcept
std::shared_ptr< ColumnDelete > Ptr
SWC_CAN_INLINE void skip_type_and_id(const uint8_t **bufp, size_t *remainp)
void copy(const KeyVec &other)
static SWC_CAN_INLINE bool is_shuttingdown() noexcept
Core::Atomic< int64_t > m_load_revision
SWC_CAN_INLINE TaskRunQueueAdd(const RangePtr &a_ptr) noexcept
void remove(const Callback::ColumnDelete::Ptr &req)
constexpr SWC_CAN_INLINE uint64_t decode_vi64(const uint8_t **bufp, size_t *remainp)
void expand(DB::Cells::Interval &intval)
DB::Cell::Key prev_range_end
void set_rgr(int &err) const noexcept
TaskRunQueueScan & operator=(TaskRunQueueScan &&)=delete
Specs::Timestamp ts_earliest
~TaskRunQueueAdd() noexcept
void state(int &err) const
SWC_CAN_INLINE void response()
bool equal(const KeyVec &other) const noexcept
Core::AtomicBool m_require_compact
void compact_require(bool require)
void print(std::ostream &out, Types::Column col_type=Types::Column::PLAIN, bool with_cells=false) const
std::shared_ptr< RangeLoad > Ptr
bool compacting_ifnot_applying(uint8_t state)
void set_state(State new_state)
static SWC_CAN_INLINE bool is_not_accepting() noexcept
Comm::EndPoints endpoints
int64_t get_load_revision()
void print(std::ostream &out) const
SWC_CAN_INLINE TaskRunQueueScan(RangePtr &&a_ptr) noexcept
void add_logged(const DB::Cells::Cell &cell)
void remove(uint32_t idx, bool recursive=false)
void print(std::ostream &out, bool minimal)
std::condition_variable_any m_cv
static SWC_CAN_INLINE void post(T_Handler &&handler)
TaskRunQueueAdd & operator=(TaskRunQueueAdd &&)=delete
std::shared_ptr< RangeUnload > Ptr
constexpr SWC_CAN_INLINE T fetch_sub(T v) noexcept
SWC_CAN_INLINE bool expired() const
void print(std::ostream &out, int err)
constexpr SWC_CAN_INLINE T fetch_add(T v) noexcept
bool compact_possible(bool minor=true)
Specs::Timestamp ts_latest
constexpr SWC_CAN_INLINE size_type size() const noexcept
constexpr SWC_CAN_INLINE bool is_rgr_data_on_fs(cid_t cid) noexcept
static void get_rgr(const SyncSelector::Ptr &hdlr, RgrData &data, cid_t cid, rid_t rid) noexcept
std::shared_ptr< ReqScan > Ptr
#define SWC_LOG_CURRENT_EXCEPTION(_s_)
void _get_interval(DB::Cells::Interval &interval) const
Core::Atomic< uint32_t > m_adding
void set_rgr(int &err, cid_t cid, rid_t rid) noexcept
void get_prev_key_end(uint32_t idx, DB::Cell::Key &key) const
SWC_CAN_INLINE void reserve(size_type cap)
bool SWC_PURE_FUNC equal_endpoints(const EndPoints &endpoints1, const EndPoints &endpoints2) noexcept
Condition::Comp compare(const Types::KeySeq seq, const Cell::Key &key, const Cell::Key &other) SWC_ATTRIBS((SWC_ATTRIB_O3))
void decode(const uint8_t **bufp, size_t *remainp)
static SWC_CAN_INLINE Rgr * get() noexcept
TaskRunQueueAdd(RangePtr &&a_ptr) noexcept
SWC_CAN_INLINE bool pop(ItemT *item)