SWC-DB v0.5.12 C++ documentation
SWC-DB© (Super Wide Column Database) - High Performance Scalable Database (https://github.com/kashirin-alex/swc-db)
RangeSplit.h
Go to the documentation of this file.
1 /*
2  * SWC-DB© Copyright since 2019 Alex Kashirin <kashirin.alex@gmail.com>
3  * License details at <https://github.com/kashirin-alex/swc-db/#license>
4  */
5 
6 
7 #ifndef swcdb_ranger_db_RangeSplit_h
8 #define swcdb_ranger_db_RangeSplit_h
9 
10 
12 
13 
14 namespace SWC { namespace Ranger {
15 
16 
// Asks the manager to remove `new_range` (the log text below names the
// request "Mngr::Req::RangeRemove") and blocks on `res.wait()` until the
// request's callback calls `res.acknowledge()`.
// NOTE(review): this listing has gaps (see the jumps in the embedded line
// numbers); `res`, the body of valid(), the callback signature and the
// request call itself are on lines not present here. Comments below describe
// only what is visible.
17 static void mngr_remove_range(const RangePtr& new_range) {
 // Per-request state handed to the request together with `new_range, res`.
19  struct ReqData {
 // Held by reference: the enclosing function stays blocked in res.wait()
 // while the request is outstanding.
20  const RangePtr& new_range;
23  ReqData(const RangePtr& a_new_range, Core::StateSynchronization& a_res)
24  noexcept : new_range(a_new_range), res(a_res) {
25  }
 // Column id of the range being removed.
27  cid_t get_cid() const noexcept {
28  return new_range->cfg->cid;
29  }
31  client::Clients::Ptr& get_clients() noexcept {
32  return Env::Clients::get();
33  }
 // NOTE(review): the body of valid() is missing from this listing.
35  bool valid() noexcept {
37  }
 // Response handler: logs the outcome, then either re-issues the request or
 // releases the waiter.
39  void callback(
42 
44  "Mngr::Req::RangeRemove err=%d(%s) " SWC_FMT_LU "/" SWC_FMT_LU,
45  rsp.err, Error::get_text(rsp.err),
46  new_range->cfg->cid, new_range->rid);
47 
 // Retry only while an error is reported and valid() holds; the remaining
 // conditions of this `if` are on lines not shown here.
48  if(rsp.err && valid() &&
52  req->request_again();
53  } else {
54  res.acknowledge();
55  }
56  }
57  };
 // Tail of the request call: request params built from (cid, rid), then 10000
 // (presumably the `timeout` argument of Mngr::Req::RangeRemove::request;
 // units not shown here), then the ReqData constructor arguments.
60  new_range->cfg->cid, new_range->rid),
61  10000,
62  new_range, res
63  );
 // Block until callback() acknowledges.
64  res.wait();
65 }
66 
67 
68 
// Splits `range` at cell-store index `split_at`: the cell-stores from that
// index onward, plus a share of the commit log decided by CommitLog::Splitter,
// are handed to a newly created range (see run()).
// NOTE(review): this listing has gaps (see the jumps in the embedded line
// numbers); the logging-macro openers, the declarations of `res` and `mv_css`,
// the callback signatures and the request calls are on lines not present here.
69 class RangeSplit final {
70  public:
71 
72 
 // Stores the range and the split index; the visible remainder of the body is
 // the argument list of a log statement ("COMPACT-SPLIT RANGE START ...").
73  RangeSplit(const RangePtr& a_range, const size_t a_split_at)
74  : range(a_range), split_at(a_split_at) {
76  "COMPACT-SPLIT RANGE START " SWC_FMT_LU "/" SWC_FMT_LU
77  " at=" SWC_FMT_LU,
78  range->cfg->cid, range->rid, split_at);
79  }
80 
81  RangeSplit(const RangeSplit&) = delete;
82 
 // NOTE(review): takes a *const* rvalue reference; no move-assignment operator
 // is declared in the visible lines.
83  RangeSplit(const RangeSplit&&) = delete;
84 
85  RangeSplit& operator=(const RangeSplit&) = delete;
86 
87  ~RangeSplit() noexcept { }
88 
 // Performs the split and blocks until the final meta update is acknowledged.
 // Returns Error::OK on success, Error::CANCELLED when the column cannot be
 // used, otherwise the error of the step that failed.
89  int run() {
90  Time::Measure_ns t_measure;
91  int err = Error::OK;
92 
 // Step 1: obtain a range id for the new range from the manager.
93  rid_t new_rid = 0;
94  mngr_create_range(err, new_rid);
 // COLUMN_NOT_READY is tolerated when a rid was returned. When no rid came
 // back, `err` is returned as-is, which may be Error::OK.
95  if((err && err != Error::COLUMN_NOT_READY) || !new_rid)
96  return err;
97 
98  ColumnPtr col = Env::Rgr::columns()->get_column(range->cfg->cid);
99  if(!col || col->removing())
100  return Error::CANCELLED;
101 
103  "COMPACT-SPLIT RANGE " SWC_FMT_LU "/" SWC_FMT_LU " new-rid=" SWC_FMT_LU,
104  range->cfg->cid, range->rid, new_rid);
105 
 // Step 2: create the new range in the column and its folders.
106  auto new_range = col->internal_create(err, new_rid, true);
107  if(!err)
108  new_range->internal_create_folders(err);
109 
 // Roll back on failure: clear the compacting state, drop the range from the
 // column and ask the manager to remove it.
 // NOTE(review): `new_range` is dereferenced here even when internal_create()
 // itself reported the error - confirm it never returns null in that case.
110  if(err) {
112  "COMPACT-SPLIT RANGE cancelled err=%d " SWC_FMT_LU "/" SWC_FMT_LU
113  " new-rid=" SWC_FMT_LU,
114  err, range->cfg->cid, range->rid, new_rid);
115  int tmperr = Error::OK;
116  new_range->compacting(Range::COMPACT_NONE);
117  col->internal_remove(tmperr, new_rid);
118  mngr_remove_range(new_range);
119  return err;
120  }
121 
 // Step 3: hand the cell-stores [split_at, end) to the new range.
 // `mv_css` is declared on a line not present in this listing.
124  auto it = range->blocks.cellstores.cbegin() + split_at;
125  mv_css.assign(it, range->blocks.cellstores.cend());
126 
127  new_range->internal_create(err, mv_css);
128  if(!err) {
 // Delete and erase the moved entries from the source range.
 // NOTE(review): erase() is called at `it` and `it` is reused without being
 // reassigned; this relies on the container keeping the position usable
 // after erase - confirm for the cellstores container type.
129  for(; it != range->blocks.cellstores.cend(); ) {
130  delete *it;
131  range->blocks.cellstores.erase(it);
132  }
133  } else {
 // Same rollback as above.
134  int tmperr = Error::OK;
135  new_range->compacting(Range::COMPACT_NONE);
136  col->internal_remove(tmperr, new_rid);
137  mngr_remove_range(new_range);
138  return err;
139  }
140 
 // Step 4: split the commit log between the two ranges, only when it holds
 // cells.
141  if(range->blocks.commitlog.cells_count()) {
142  CommitLog::Fragments::Vec fragments_old;
143  range->blocks.commitlog.get(fragments_old); // fragments for removal
144 
 // The splitter is given the key_end of the last cell-store left in `range`.
 // NOTE(review): back() assumes at least one cell-store remains (split_at > 0)
 // - confirm against the caller.
145  CommitLog::Splitter splitter(
146  range->blocks.cellstores.back()->interval.key_end,
147  fragments_old,
148  &range->blocks.commitlog,
149  &new_range->blocks.commitlog
150  );
151 
152  splitter.run();
 // NOTE(review): `err` written by remove() is not checked in the visible
 // lines; run() ends with `return Error::OK`.
153  range->blocks.commitlog.remove(err, fragments_old);
154 
155  range->blocks.commitlog.commit_finalize();
156  new_range->blocks.commitlog.commit_finalize();
157  }
158 
 // Step 5: chained meta updates. The callback of the new range's
 // expand_and_align() unloads the new range, notifies the manager, then starts
 // expand_and_align() on the original range, whose callback acknowledges.
 // `res` is declared on a line not present in this listing.
160  new_range->expand_and_align(false, Query::Update::CommonMeta::make(
161  new_range,
 // Captures `this` and a pointer to `res`; run() stays blocked in res.wait()
 // below until the inner callback acknowledges.
162  [this, col, await=&res]
163  (const Query::Update::CommonMeta::Ptr& hdlr) {
165  "COMPACT-SPLIT RANGE " SWC_FMT_LU "/" SWC_FMT_LU
166  " unloading new-rid=" SWC_FMT_LU " reg-err=%d(%s)",
167  col->cfg->cid, range->rid, hdlr->range->rid,
168  hdlr->error(), Error::get_text(hdlr->error()));
169  hdlr->range->compacting(Range::COMPACT_NONE);
170  col->internal_unload(hdlr->range->rid);
171 
 // State for the manager notification (the log text below names it
 // "RangeSplit::Mngr::Req::RangeUnloaded"); holds ids by value.
172  struct ReqData {
173  const cid_t cid;
174  const rid_t new_rid;
176  ReqData(cid_t a_cid, rid_t a_new_rid) noexcept
177  : cid(a_cid), new_rid(a_new_rid) {
178  }
180  cid_t get_cid() const noexcept {
181  return cid;
182  }
184  client::Clients::Ptr& get_clients() noexcept {
185  return Env::Clients::get();
186  }
188  bool valid() noexcept {
189  return !Env::Rgr::is_not_accepting();
190  }
 // Response handler: only retries; nothing waits on this request.
192  void callback(
196  "RangeSplit::Mngr::Req::RangeUnloaded err=%d(%s) "
198  rsp.err, Error::get_text(rsp.err), cid, new_rid);
 // One condition of this `if` is on a line not shown here.
199  if(rsp.err && valid() &&
200  rsp.err != Error::CLIENT_STOPPING &&
201  rsp.err != Error::COLUMN_NOT_EXISTS &&
203  rsp.err != Error::COLUMN_NOT_READY) {
204  req->request_again();
205  }
206  }
207  };
 // Tail of the request call: params from (cid, rid), 10000 (presumably the
 // `timeout` argument; units not shown here), then the ReqData arguments.
210  col->cfg->cid, hdlr->range->rid),
211  10000,
212  col->cfg->cid, hdlr->range->rid
213  );
214 
215  range->expand_and_align(true, Query::Update::CommonMeta::make(
216  range,
217  [await] (const Query::Update::CommonMeta::Ptr&) noexcept {
218  await->acknowledge();
219  })
220  );
221  }));
 // Drop the local reference to the new range before blocking.
222  new_range = nullptr;
223  res.wait();
224 
227  "COMPACT-SPLITTED RANGE " SWC_FMT_LU "/" SWC_FMT_LU
228  " took=" SWC_FMT_LU "ns new-end=",
229  range->cfg->cid, range->rid, t_measure.elapsed());
230  range->blocks.cellstores.back()->interval.key_end.print(SWC_LOG_OSTREAM);
231  );
232  return Error::OK;
233  }
234 
 // Requests a new range from the manager (the log text below names it
 // "RangeSplit::Mngr::Req::RangeCreate") and blocks until the callback has
 // written the response's error and rid into `err` and `new_rid`.
235  void mngr_create_range(int& err, rid_t& new_rid) {
237 
 // Per-request state; `err` and `new_rid` are references to the caller's
 // variables, written by callback().
238  struct ReqData {
239  const cid_t cid;
240  int& err;
241  rid_t& new_rid;
244  ReqData(cid_t a_cid, int& a_err, rid_t& a_new_rid,
245  Core::StateSynchronization& a_res) noexcept
246  : cid(a_cid), err(a_err), new_rid(a_new_rid), res(a_res) {
247  }
249  cid_t get_cid() const noexcept {
250  return cid;
251  }
253  client::Clients::Ptr& get_clients() noexcept {
254  return Env::Clients::get();
255  }
257  bool valid() noexcept {
258  return !Env::Rgr::is_not_accepting();
259  }
 // Response handler: retries on the errors not excluded below, otherwise
 // publishes the result and releases the waiter.
261  void callback(
265  "RangeSplit::Mngr::Req::RangeCreate err=%d(%s) "
267  rsp.err, Error::get_text(rsp.err), cid, rsp.rid);
268 
 // One condition of this `if` is on a line not shown here.
269  if(rsp.err && valid() &&
270  rsp.err != Error::CLIENT_STOPPING &&
271  rsp.err != Error::COLUMN_NOT_EXISTS &&
273  rsp.err != Error::COLUMN_NOT_READY) {
274  req->request_again();
275  return;
276  }
277  err = rsp.err;
278  new_rid = rsp.rid;
279  res.acknowledge();
280  }
281  };
 // Tail of the request call: params from (cid, this ranger's rgrid), 10000
 // (presumably the `timeout` argument; units not shown here), then the
 // ReqData arguments.
284  range->cfg->cid, Env::Rgr::rgr_data()->rgrid),
285  10000,
286  range->cfg->cid, err, new_rid, res
287  );
288  res.wait();
289  }
290 
 // Cell-store index at which `range` is split (see run()).
292  const size_t split_at;
293 };
294 
295 
296 }} // namespace SWC::Ranger
297 
298 #endif // swcdb_ranger_db_RangeSplit_h
SWC::Comm::Protocol::Mngr::Params::RangeRemoveReq
Definition: RangeRemove.h:16
SWC_LOG_OSTREAM
#define SWC_LOG_OSTREAM
Definition: Logger.h:44
SWC::Ranger::RangeSplit::operator=
RangeSplit & operator=(const RangeSplit &)=delete
SWC::Comm::Protocol::Mngr::Params::RangeUnloadedRsp::err
int err
Definition: RangeUnloaded.h:78
SWC::Ranger::RangeSplit::mngr_create_range
void mngr_create_range(int &err, rid_t &new_rid)
Definition: RangeSplit.h:235
SWC::Env::Clients::get
static SWC_CAN_INLINE client::Clients::Ptr & get() noexcept
Definition: Clients.h:299
SWC_LOGF
#define SWC_LOGF(priority, fmt,...)
Definition: Logger.h:188
SWC_LOG_OUT
#define SWC_LOG_OUT(pr, _code_)
Definition: Logger.h:178
SWC::Comm::Protocol::Mngr::Params::RangeRemoveRsp::err
int err
Definition: RangeRemove.h:78
SWC::Ranger::RangeSplit::RangeSplit
RangeSplit(const RangePtr &a_range, const size_t a_split_at)
Definition: RangeSplit.h:73
StateSynchronization.h
SWC::Ranger::RangeSplit::RangeSplit
RangeSplit(const RangeSplit &&)=delete
SWC::Env::Rgr::rgr_data
static SWC_CAN_INLINE DB::RgrData * rgr_data() noexcept
Definition: RangerEnv.h:48
SWC::Comm::Protocol::Mngr::Params::RangeRemoveRsp
Definition: RangeRemove.h:59
SWC::Comm::Protocol::Mngr::Params::RangeUnloadedReq::cid
cid_t cid
Definition: RangeUnloaded.h:26
SWC::Error::get_text
const char * get_text(const int err) noexcept
Definition: Error.cc:173
SWC::Comm::Protocol::Mngr::Params::RangeCreateReq
Definition: RangeCreate.h:16
SWC_LOG_PRINTF
#define SWC_LOG_PRINTF(fmt,...)
Definition: Logger.h:165
SWC::Env::Rgr::columns
static SWC_CAN_INLINE Ranger::Columns * columns() noexcept
Definition: RangerEnv.h:144
SWC::LOG_INFO
@ LOG_INFO
Definition: Logger.h:35
SWC::client::Clients::Ptr
ClientsPtr Ptr
Definition: Clients.h:58
SWC::Comm::Protocol::Mngr::Params::RangeCreateRsp
Definition: RangeCreate.h:59
SWC::Ranger::Columns::get_column
ColumnPtr get_column(const cid_t cid)
Definition: Columns.cc:14
SWC::DB::RgrData::rgrid
Core::Atomic< rgrid_t > rgrid
Definition: RgrData.h:28
SWC::Ranger::CommitLog::Splitter
Definition: CommitLogSplitter.h:14
SWC::Time::Measure::elapsed
SWC_CAN_INLINE uint64_t elapsed() const noexcept
Definition: Time.h:74
SWC::Ranger::ColumnPtr
std::shared_ptr< Column > ColumnPtr
Definition: Columns.h:13
SWC::Ranger::RangeSplit
Definition: RangeSplit.h:69
SWC::Comm::Protocol::Mngr::Req::RangeUnloaded::request
static SWC_CAN_INLINE void request(const Params::RangeUnloadedReq &params, const uint32_t timeout, DataArgsT &&... args)
Definition: RangeUnloaded.h:36
SWC::Ranger::RangeSplit::~RangeSplit
~RangeSplit() noexcept
Definition: RangeSplit.h:87
SWC::Error::OK
@ OK
Definition: Error.h:45
SWC_CAN_INLINE
#define SWC_CAN_INLINE
Definition: Compat.h:102
SWC::Ranger::RangePtr
std::shared_ptr< Range > RangePtr
Definition: Columns.h:15
SWC::LOG_DEBUG
@ LOG_DEBUG
Definition: Logger.h:36
SWC::Comm::Protocol::Mngr::Params::RangeUnloadedReq
Definition: RangeUnloaded.h:16
SWC::Core::StateSynchronization::wait
SWC_CAN_INLINE void wait()
Definition: StateSynchronization.h:25
SWC::Ranger::Query::Update::CommonMeta::Ptr
std::shared_ptr< CommonMeta > Ptr
Definition: CommonMeta.h:15
SWC::Ranger::mngr_remove_range
static void mngr_remove_range(const RangePtr &new_range)
Definition: RangeSplit.h:17
SWC
The SWC-DB C++ namespace 'SWC'.
Definition: main.cc:12
SWC::Time::Measure
Definition: Time.h:55
SWC::Error::COLUMN_NOT_READY
@ COLUMN_NOT_READY
Definition: Error.h:99
SWC::Comm::Protocol::Mngr::Req::RangeCreate::request
static SWC_CAN_INLINE void request(const Params::RangeCreateReq &params, const uint32_t timeout, DataArgsT &&... args)
Definition: RangeCreate.h:36
SWC::Error::CLIENT_STOPPING
@ CLIENT_STOPPING
Definition: Error.h:127
SWC::Ranger::RangeSplit::split_at
const size_t split_at
Definition: RangeSplit.h:292
SWC::Ranger::RangeSplit::run
int run()
Definition: RangeSplit.h:89
SWC_FMT_LU
#define SWC_FMT_LU
Definition: Compat.h:98
SWC::Ranger::Range::COMPACT_NONE
static const uint8_t COMPACT_NONE
Definition: Range.h:51
SWC::Ranger::RangeSplit::RangeSplit
RangeSplit(const RangeSplit &)=delete
SWC::cid_t
uint64_t cid_t
Definition: Identifiers.h:16
SWC::Error::COLUMN_NOT_EXISTS
@ COLUMN_NOT_EXISTS
Definition: Error.h:100
SWC::Core::Vector< Read::Ptr >
SWC::Comm::Protocol::Mngr::Params::RangeUnloadedRsp
Definition: RangeUnloaded.h:59
SWC::Error::COLUMN_MARKED_REMOVED
@ COLUMN_MARKED_REMOVED
Definition: Error.h:102
SWC::Comm::Protocol::Mngr::Params::RangeCreateRsp::err
int err
Definition: RangeCreate.h:78
SWC::Core::StateSynchronization
Definition: StateSynchronization.h:16
SWC::rid_t
uint64_t rid_t
Definition: Identifiers.h:17
SWC::Env::Rgr::is_not_accepting
static SWC_CAN_INLINE bool is_not_accepting() noexcept
Definition: RangerEnv.h:53
SWC::Core::StateSynchronization::acknowledge
SWC_CAN_INLINE void acknowledge() noexcept
Definition: StateSynchronization.h:31
SWC::Comm::Protocol::Mngr::Req::RangeRemove::request
static SWC_CAN_INLINE void request(const Params::RangeRemoveReq &params, const uint32_t timeout, DataArgsT &&... args)
Definition: RangeRemove.h:37
SWC::Comm::client::ConnQueueReqBase::Ptr
std::shared_ptr< ConnQueueReqBase > Ptr
Definition: ClientConnQueue.h:25
SWC::Core::Vector::assign
SWC_CAN_INLINE void assign(IteratorT first, IteratorT last)
Definition: Vector.h:452
SWC::Ranger::RangeSplit::range
const RangePtr range
Definition: RangeSplit.h:291
SWC::Error::CANCELLED
@ CANCELLED
Definition: Error.h:56
SWC::Ranger::CommitLog::Splitter::run
void run()
Definition: CommitLogSplitter.h:32
SWC::Comm::Protocol::Mngr::Params::RangeCreateRsp::rid
rid_t rid
Definition: RangeCreate.h:79
SWC::Ranger::Query::Update::CommonMeta::make
static SWC_CAN_INLINE Ptr make(const RangePtr &range, Cb_t &&cb)
Definition: CommonMeta.h:19