SWC-DB  v0.5.12 C++ documentations
SWC-DB© (Super Wide Column Database) - High Performance Scalable Database (https://github.com/kashirin-alex/swc-db)
Column.cc
Go to the documentation of this file.
1 /*
2  * SWC-DB© Copyright since 2019 Alex Kashirin <kashirin.alex@gmail.com>
3  * License details at <https://github.com/kashirin-alex/swc-db/#license>
4  */
5 
6 
8 
9 namespace SWC { namespace Ranger {
10 
11 
12 
14 Column::Column(const cid_t cid, const DB::SchemaPrimitives& schema)
15  : cfg(new ColumnCfg(cid, schema)),
16  m_mutex(), m_q_mng(), m_release() {
18 }
19 
20 Column::~Column() noexcept {
22 }
23 
24 size_t Column::ranges_count() noexcept {
26  return size();
27 }
28 
30 bool Column::removing() noexcept {
32  return cfg->deleting;
33 }
34 
35 bool Column::is_not_used() noexcept {
36  return cfg.use_count() == 1 && m_q_mng.empty() && !ranges_count();
37 }
38 
42  auto it = find(rid);
43  return it == cend() ? nullptr : it->second;
44 }
45 
46 RangePtr Column::get_next(cid_t& last_rid, size_t &idx) {
48  if(last_rid) {
49  auto it = find(last_rid);
50  if(it != cend()) {
51  if(++it != cend()) {
52  last_rid = it->second->rid;
53  return it->second;
54  }
55  idx = 0;
56  last_rid = 0;
57  return nullptr;
58  }
59  }
60  if(size() > idx) {
61  auto it = cbegin();
62  for(size_t i=idx; i; --i, ++it);
63  last_rid = it->second->rid;
64  return it->second;
65  }
66  idx = 0;
67  last_rid = 0;
68  return nullptr;
69 }
70 
71 void Column::get_rids(rids_t& rids) {
73  rids.reserve(size());
74  for(auto it = cbegin(); it != cend(); ++it)
75  rids.push_back(it->first);
76 }
77 
80  ranges.reserve(size());
81  for(auto it = cbegin(); it != cend(); ++it)
82  ranges.push_back(it->second);
83 }
84 
86  bool compact =
87  cfg->c_versions > schema.cell_versions ||
88  (!cfg->c_ttl && schema.cell_ttl) ||
89  cfg->c_ttl > uint64_t(schema.cell_ttl) * 1000000000;
90  bool and_cells = cfg->c_versions != schema.cell_versions ||
91  cfg->col_type != schema.col_type ||
92  cfg->c_ttl != uint64_t(schema.cell_ttl) * 1000000000;
93  cfg->update(schema);
94  if(and_cells) {
96  for(auto it = cbegin(); it != cend(); ++it)
97  it->second->schema_update(compact);
98  }
99  if(compact)
101 }
102 
104  {
106  for(auto it = cbegin(); it != cend(); ++it)
107  it->second->compact_require(true);
108  }
110 }
111 
112 
114 size_t Column::release(size_t bytes, uint8_t level) {
115  if(m_release.running())
116  return 0;
117 
118  RangePtr range = nullptr;
119  const_iterator it;
120  size_t released = 0;
121  for(size_t offset = 0; ; ++offset) {
122  {
124  if(cfg->deleting)
125  break;
126  if(range && (it = find(range->rid)) != cend()) {
127  ++it;
128  } else {
129  it = cbegin();
130  for(size_t i=0; i<offset && it != cend(); ++it, ++i);
131  }
132  if(it == cend())
133  break;
134  range = it->second;
135  }
136  if(!range->is_loaded() || range->compacting())
137  continue;
138  released += range->blocks.release(bytes - released, level);
139  if(released >= bytes)
140  break;
141  }
142  m_release.stop();
143  return released;
144 }
145 
146 void Column::print(std::ostream& out, bool minimal) {
148  cfg->print(out << '(');
149  out << " ranges=[";
150 
151  for(auto it = cbegin(); it != cend(); ++it){
152  it->second->print(out, minimal);
153  out << ", ";
154  }
155  out << "])";
156 }
157 
158 
159 RangePtr Column::internal_create(int& err, rid_t rid, bool compacting) {
165  } else if(cfg->deleting) {
167  }
168  if(err)
169  return nullptr;
170 
171  auto res = emplace(rid, nullptr);
172  if(res.second) {
173  res.first->second.reset(new Range(cfg, rid));
174  res.first->second->init();
175  if(compacting)
176  res.first->second->compacting(Range::COMPACT_APPLYING);
177  }
178  return res.first->second;
179 }
180 
182  auto range = get_range(rid);
183  if(range) {
184  bool chk_empty = false;
185  range->internal_unload(true, chk_empty);
186  range = nullptr;
187  internal_delete(rid);
188  }
189 }
190 
191 void Column::internal_unload(const rid_t rid, bool& chk_empty) {
192  auto range = get_range(rid);
193  if(range) {
194  range->internal_unload(true, chk_empty);
195  range = nullptr;
196  internal_delete(rid);
197  }
198 }
199 
200 void Column::internal_remove(int &err, const rid_t rid) {
201  auto range = get_range(rid);
202  if(range) {
203  range->internal_remove(err);
204  range = nullptr;
205  internal_delete(rid);
206  }
207 }
208 
211  auto it = find(rid);
212  if(it != cend())
213  erase(it);
214 }
215 
216 
222  noexcept : ptr(std::move(a_ptr)), req(std::move(a_req)) {
223  }
225  TaskRunMngReq(TaskRunMngReq&& other) noexcept
226  : ptr(std::move(other.ptr)), req(std::move(other.req)) {
227  }
228  TaskRunMngReq(const TaskRunMngReq&) = delete;
231  ~TaskRunMngReq() noexcept { }
232  void operator()() {
233  switch(req->action) {
235  ptr->load(
236  std::dynamic_pointer_cast<Callback::RangeLoad>(req));
237  break;
238  }
239 
241  ptr->unload(
242  std::dynamic_pointer_cast<Callback::RangeUnload>(req));
243  break;
244  }
245 
247  ptr->unload(
248  std::dynamic_pointer_cast<Callback::RangeUnloadInternal>(req));
249  break;
250  }
251 
253  ptr->unload_all(
254  std::dynamic_pointer_cast<Callback::ColumnsUnload>(req));
255  break;
256  }
257 
259  ptr->remove(
260  std::dynamic_pointer_cast<Callback::ColumnDelete>(req));
261  break;
262  }
263  /*
264  default: {
265  req->send_error(Error::NOT_IMPLEMENTED);
266  run_mng_queue();
267  break;
268  }
269  */
270  }
271  }
272 };
273 
275  if(m_q_mng.activating(std::move(req)))
276  Env::Rgr::post(TaskRunMngReq(shared_from_this(), std::move(req)));
277 }
278 
281  if(!m_q_mng.deactivating(req))
282  Env::Rgr::post(TaskRunMngReq(shared_from_this(), std::move(req)));
283 }
284 
285 
287  if(req->expired(1000))
288  return run_mng_queue();
289 
290  int err = Error::OK;
291  auto range = internal_create(err, req->rid, false);
292  req->col = shared_from_this();
293  err ? req->loaded(err) : range->load(req);
294 }
295 
297  if(req->expired(1000))
298  return run_mng_queue();
299 
300  bool chk_empty = true;
301  internal_unload(req->rid, chk_empty);
302  if(chk_empty)
303  req->rsp_params.set_empty();
304  req->response_params();
306  run_mng_queue();
307 }
308 
310  if(req) {
311  bool chk_empty = false;
312  internal_unload(req->rid, chk_empty);
313  req->response();
315  }
316  run_mng_queue();
317 }
318 
320  if(!cfg->deleting) for(RangePtr range;;) {
321  {
323  if(empty())
324  break;
325  range = cbegin()->second;
326  }
327  if(range) {
328  bool chk_empty = false;
329  range->internal_unload(req->completely, chk_empty);
330  req->unloaded(range);
331  internal_delete(range->rid);
332  }
333  }
334  req->unloaded(shared_from_this());
336  run_mng_queue();
337 }
338 
340  if(req->expired(1000))
341  return run_mng_queue();
342 
343  if(cfg->deleting)
344  return req->response_ok();
345 
346  cfg->deleting.store(true);
347  Core::Vector<RangePtr> ranges;
348  {
350  ranges.reserve(size());
351  for(auto it=cbegin(); it != cend(); ++it) {
352  req->add(it->second);
353  ranges.push_back(it->second);
354  }
355  }
356  req->col = shared_from_this();
357  if(ranges.empty()) {
358  req->complete();
359  } else {
360  for(auto& range : ranges)
361  range->remove(req);
362  }
363 }
364 
365 
366 }}
SWC::Ranger::Column::internal_unload
void internal_unload(const rid_t rid)
Definition: Column.cc:181
SWC::Ranger::Column::Column
Column(const cid_t cid, const DB::SchemaPrimitives &schema)
Definition: Column.cc:14
SWC::Core::StateRunning::stop
constexpr SWC_CAN_INLINE void stop() noexcept
Definition: StateRunning.h:32
SWC::Ranger::Column::m_q_mng
Core::QueueSafeStated< Callback::ManageBase::Ptr > m_q_mng
Definition: Column.h:80
SWC::Ranger::Column::ranges_count
size_t ranges_count() noexcept
Definition: Column.cc:24
SWC::Env::Rgr::compaction_schedule
static void compaction_schedule(uint32_t ms)
Definition: RangerEnv.h:385
SWC::Ranger::Callback::ColumnsUnload::Ptr
std::shared_ptr< ColumnsUnload > Ptr
Definition: ColumnsUnload.h:20
SWC::Ranger::Column::TaskRunMngReq::TaskRunMngReq
SWC_CAN_INLINE TaskRunMngReq(ColumnPtr &&a_ptr, Callback::ManageBase::Ptr &&a_req) noexcept
Definition: Column.cc:221
SWC::Ranger::Column::TaskRunMngReq
Definition: Column.cc:217
SWC::Ranger::Column::TaskRunMngReq::~TaskRunMngReq
~TaskRunMngReq() noexcept
Definition: Column.cc:231
SWC::Error::SERVER_SHUTTING_DOWN
@ SERVER_SHUTTING_DOWN
Definition: Error.h:84
SWC::Ranger::Column::TaskRunMngReq::operator=
TaskRunMngReq & operator=(TaskRunMngReq &&)=delete
SWC::Ranger::Column::TaskRunMngReq::req
Callback::ManageBase::Ptr req
Definition: Column.cc:219
SWC::Ranger::Column::run_mng_queue
void run_mng_queue()
Definition: Column.cc:279
SWC::Ranger::Column::cfg
const ColumnCfg::Ptr cfg
Definition: Column.h:22
SWC::DB::Types::SystemColumn::is_data
constexpr SWC_CAN_INLINE bool is_data(cid_t cid) noexcept
Definition: SystemColumn.h:46
SWC::Ranger::Column::print
void print(std::ostream &out, bool minimal=true)
Definition: Column.cc:146
SWC::Env::Rgr::columns
static SWC_CAN_INLINE Ranger::Columns * columns() noexcept
Definition: RangerEnv.h:144
SWC::Ranger::Column::internal_delete
void internal_delete(rid_t rid)
Definition: Column.cc:209
SWC::Ranger::Callback::ManageBase::RANGE_LOAD
@ RANGE_LOAD
Definition: ManageBase.h:26
SWC::Core::MutexSptd::scope
Definition: MutexSptd.h:96
SWC::DB::SchemaPrimitives
Definition: Schema.h:23
SWC::Ranger::Column::TaskRunMngReq::ptr
ColumnPtr ptr
Definition: Column.cc:218
SWC::Ranger::Column::TaskRunMngReq::operator()
void operator()()
Definition: Column.cc:232
SWC::Ranger::Column::load
void load(const Callback::RangeLoad::Ptr &req)
Definition: Column.cc:286
SWC::Ranger::Column::m_release
Core::StateRunning m_release
Definition: Column.h:81
SWC::Ranger::Column::TaskRunMngReq::TaskRunMngReq
SWC_CAN_INLINE TaskRunMngReq(TaskRunMngReq &&other) noexcept
Definition: Column.cc:225
SWC::Ranger::Callback::ManageBase::RANGE_UNLOAD
@ RANGE_UNLOAD
Definition: ManageBase.h:27
SWC::Ranger::Column::add_managing
void add_managing(Callback::ManageBase::Ptr &&req)
Definition: Column.cc:274
SWC::Ranger::Column::get_next
RangePtr get_next(cid_t &last_rid, size_t &idx)
Definition: Column.cc:46
SWC::Ranger::Callback::ManageBase::COLUMN_DELETE
@ COLUMN_DELETE
Definition: ManageBase.h:30
SWC::Ranger::ColumnPtr
std::shared_ptr< Column > ColumnPtr
Definition: Columns.h:13
SWC::Ranger::Column::compact
void compact()
Definition: Column.cc:103
Column.h
SWC::Ranger::Callback::RangeUnload::Ptr
std::shared_ptr< RangeUnload > Ptr
Definition: RangeUnload.h:19
SWC::Ranger::Callback::RangeUnloadInternal::Ptr
std::shared_ptr< RangeUnloadInternal > Ptr
Definition: RangeUnloadInternal.h:18
SWC::Ranger::Columns::erase_if_empty
void erase_if_empty(cid_t cid)
Definition: Columns.cc:159
SWC::Error::OK
@ OK
Definition: Error.h:45
SWC::Core::Vector::empty
constexpr SWC_CAN_INLINE bool empty() const noexcept
Definition: Vector.h:168
SWC::Ranger::RangePtr
std::shared_ptr< Range > RangePtr
Definition: Columns.h:15
SWC_CAN_INLINE
#define SWC_CAN_INLINE
Definition: Compat.h:102
SWC
The SWC-DB C++ namespace 'SWC'.
Definition: main.cc:12
SWC::Ranger::Callback::ManageBase::RANGE_UNLOAD_INTERNAL
@ RANGE_UNLOAD_INTERNAL
Definition: ManageBase.h:28
SWC::Ranger::Column::removing
bool removing() noexcept
Definition: Column.cc:30
SWC::Env::Rgr::in_process
static SWC_CAN_INLINE int64_t in_process() noexcept
Definition: RangerEnv.h:63
size
uint32_t size
Buffer size.
Definition: HeaderBufferInfo.h:47
SWC::DB::SchemaPrimitives::cell_ttl
uint32_t cell_ttl
Definition: Schema.h:164
SWC::Ranger::Callback::ManageBase::COLUMNS_UNLOAD
@ COLUMNS_UNLOAD
Definition: ManageBase.h:29
SWC::Ranger::Column::schema_update
void schema_update(const DB::SchemaPrimitives &schema)
Definition: Column.cc:85
SWC::Ranger::Column::internal_create
RangePtr internal_create(int &err, rid_t rid, bool compacting)
Definition: Column.cc:159
SWC::Core::StateRunning::running
constexpr SWC_CAN_INLINE bool running() noexcept
Definition: StateRunning.h:37
SWC::Core::QueueSafeStated::deactivating
SWC_CAN_INLINE bool deactivating()
Definition: QueueSafeStated.h:77
SWC::Ranger::Column::get_range
RangePtr get_range(const rid_t rid)
Definition: Column.cc:40
SWC::cid_t
uint64_t cid_t
Definition: Identifiers.h:16
SWC::Ranger::Column::get_rids
void get_rids(rids_t &rids)
Definition: Column.cc:71
SWC::Ranger::Column::is_not_used
bool is_not_used() noexcept
Definition: Column.cc:35
SWC::Ranger::Callback::ManageBase::Ptr
std::shared_ptr< ManageBase > Ptr
Definition: ManageBase.h:23
SWC::Ranger::Range::COMPACT_APPLYING
static const uint8_t COMPACT_APPLYING
Definition: Range.h:55
SWC::Core::Vector< rid_t, rid_t >
SWC::Ranger::Column::remove
void remove(const Callback::ColumnDelete::Ptr &req)
Definition: Column.cc:339
SWC::Core::QueueSafeStated::empty
SWC_CAN_INLINE bool empty() noexcept
Definition: QueueSafeStated.h:53
SWC::Ranger::Callback::ColumnDelete::Ptr
std::shared_ptr< ColumnDelete > Ptr
Definition: ColumnDelete.h:16
SWC::Error::COLUMN_MARKED_REMOVED
@ COLUMN_MARKED_REMOVED
Definition: Error.h:102
SWC::Env::Rgr::is_shuttingdown
static SWC_CAN_INLINE bool is_shuttingdown() noexcept
Definition: RangerEnv.h:58
SWC::Ranger::Column::unload_all
void unload_all(const Callback::ColumnsUnload::Ptr &req)
Definition: Column.cc:319
SWC::rid_t
uint64_t rid_t
Definition: Identifiers.h:17
SWC::Ranger::Callback::RangeLoad::Ptr
std::shared_ptr< RangeLoad > Ptr
Definition: RangeLoad.h:16
SWC::Env::Rgr::is_not_accepting
static SWC_CAN_INLINE bool is_not_accepting() noexcept
Definition: RangerEnv.h:53
SWC::Ranger::Column::TaskRunMngReq::operator=
TaskRunMngReq & operator=(const TaskRunMngReq &)=delete
SWC::Core::QueueSafeStated::activating
SWC_CAN_INLINE bool activating() noexcept
Definition: QueueSafeStated.h:71
SWC::Ranger::ColumnCfg
Definition: ColumnCfg.h:15
SWC::DB::SchemaPrimitives::cell_versions
uint32_t cell_versions
Definition: Schema.h:163
SWC::Env::Rgr::post
static SWC_CAN_INLINE void post(T_Handler &&handler)
Definition: RangerEnv.h:114
SWC::DB::SchemaPrimitives::col_type
Types::Column col_type
Definition: Schema.h:169
SWC::Ranger::Column::release
size_t release(size_t bytes, uint8_t level)
Definition: Column.cc:114
SWC::Ranger::Column::get_ranges
void get_ranges(Core::Vector< RangePtr > &ranges)
Definition: Column.cc:78
SWC::Core::Vector::push_back
SWC_CAN_INLINE void push_back(ArgsT &&... args)
Definition: Vector.h:331
SWC::Ranger::Column::m_mutex
Core::MutexSptd m_mutex
Definition: Column.h:79
SWC::Ranger::Column::unload
void unload(const Callback::RangeUnload::Ptr &req)
Definition: Column.cc:296
SWC::DB::Types::Range
Range
Definition: Range.h:14
SWC::Ranger::Column::internal_remove
void internal_remove(int &err, const rid_t rid)
Definition: Column.cc:200
SWC::Ranger::Column::TaskRunMngReq::TaskRunMngReq
TaskRunMngReq(const TaskRunMngReq &)=delete
SWC::Core::Vector::reserve
SWC_CAN_INLINE void reserve(size_type cap)
Definition: Vector.h:288
SWC::Ranger::Column::~Column
~Column() noexcept
Definition: Column.cc:20