SWC-DB  v0.5.12 C++ documentations
SWC-DB© (Super Wide Column Database) - High Performance Scalable Database (https://github.com/kashirin-alex/swc-db)
Compaction.cc
Go to the documentation of this file.
1 /*
2  * SWC-DB© Copyright since 2019 Alex Kashirin <kashirin.alex@gmail.com>
3  * License details at <https://github.com/kashirin-alex/swc-db/#license>
4  */
5 
6 
7 
9 
10 
11 namespace SWC { namespace Ranger {
12 
13 
15  : cfg_read_ahead(
16  Env::Config::settings()->get<Config::Property::Value_uint8_g>(
17  "swc.rgr.compaction.read.ahead")),
18  cfg_max_range(
19  Env::Config::settings()->get<Config::Property::Value_uint8_g>(
20  "swc.rgr.compaction.range.max")),
21  cfg_max_log(
22  Env::Config::settings()->get<Config::Property::Value_uint8_g>(
23  "swc.rgr.compaction.commitlog.max")),
24  cfg_check_interval(
25  Env::Config::settings()->get<Config::Property::Value_int32_g>(
26  "swc.rgr.compaction.check.interval")),
27  cfg_uncompacted_max(
28  Env::Config::settings()->get<Config::Property::Value_int32_g>(
29  "swc.rgr.compaction.range.uncompacted.max")),
30  m_run(true),
31  m_schedule(), m_running(0),
32  m_log_chk(), m_log_compactions(0),
33  m_mutex(),
34  m_check_timer(
35  asio::high_resolution_timer(
36  Env::Rgr::maintenance_io()->executor())),
37  m_cv(),
38  m_last_cid(0), m_idx_cid(0),
39  m_last_rid(0), m_idx_rid(0),
40  m_next(false), m_uncompacted(0),
41  m_compacting() {
42 }
43 
46  if(m_log_chk.running())
47  return false;
48  bool ok = uint16_t(m_log_compactions) + m_running
49  < uint16_t(cfg_max_log->get());
50  if(ok)
52  m_log_chk.stop();
53  return ok;
54 }
55 
59 }
60 
62 bool Compaction::available() noexcept {
63  return m_running < cfg_max_range->get() &&
65 }
66 
68  m_run.store(false);
69  {
70  Core::UniqueLock lock_wait(m_mutex);
71  if(m_schedule.running())
72  m_cv.wait(lock_wait, [this](){ return !m_schedule.running(); });
73  m_check_timer.cancel();
74  }
75  uint8_t sz = 0;
76  for(uint8_t n=0; ; ++n) {
78  {
80  if(m_compacting.empty())
81  break;
82  if(sz && sz == m_compacting.size()) {
83  if(n >= m_compacting.size())
84  break;
85  } else {
86  sz = m_compacting.size();
87  n = 0;
88  }
89  req = m_compacting[n];
90  }
91  req->quit();
92  }
93  Core::UniqueLock lock_wait(m_mutex);
94  m_check_timer.cancel();
95  if(m_running)
96  m_cv.wait(lock_wait, [this](){ return !m_running; });
97  m_schedule.stop();
98 }
99 
102  return !m_run;
103 }
104 
106  for(ColumnPtr col = nullptr;
107  !stopped() && available() &&
108  (col || (col=Env::Rgr::columns()->get_next(m_last_cid, m_idx_cid)));) {
109 
110  RangePtr range;
111  if(col->removing() ||
112  !(range = col->get_next(m_last_rid, m_idx_rid))) {
113  ++m_idx_cid;
114  col = nullptr;
115  m_last_rid = 0;
116  m_idx_rid = 0;
117  continue;
118  }
119  ++m_idx_rid;
120 
121  SWC_LOGF(LOG_DEBUG, "COMPACT-CHECKING " SWC_FMT_LU "/" SWC_FMT_LU,
122  range->cfg->cid, range->rid);
123 
124  if((!range->compact_required() && range->blocks.commitlog.try_compact()) ||
125  !range->compact_possible())
126  continue;
127 
128  compact(range);
129  }
131  if(!m_idx_cid)
132  m_uncompacted = 0;
133  m_schedule.stop();
134 
135  if(stopped()) {
137  m_cv.notify_all();
138  } else {
140  }
141 }
142 
143 void Compaction::compact(const RangePtr& range) {
144 
145  if(!range->is_loaded() || stopped()) {
146  range->compacting(Range::COMPACT_NONE);
147  return;
148  }
149 
150  auto& commitlog = range->blocks.commitlog;
151 
152  uint32_t cs_size = range->cfg->cellstore_size();
153  size_t cs_max = range->cfg->cellstore_max();
154 
155  uint32_t blk_size = range->cfg->block_size();
156  if(cs_size < blk_size)
157  blk_size = cs_size;
158  uint8_t perc = range->cfg->compact_percent();
159  uint32_t allow_sz = (cs_size / 100) * perc;
160  uint32_t cell_revs = range->cfg->cell_versions();
161  uint64_t cell_ttl = range->cfg->cell_ttl();
162 
163  size_t value;
164  bool do_compaction = false;
165  std::string need;
166 
167  if((do_compaction = range->compact_required() && !commitlog.empty())) {
168  need.append("Required");
169 
170  } else if((do_compaction = (value = commitlog.size_bytes(true))
171  >= allow_sz)) {
172  need.append("LogBytes=");
173  need.append(std::to_string(value-allow_sz));
174 
175  } else if((do_compaction = (value = commitlog.size()) > cs_size/blk_size)) {
176  need.append("LogCount=");
177  need.append(std::to_string(value-cs_size/blk_size));
178 
179  } else if((do_compaction = range->blocks.cellstores.need_compaction(
180  cs_max, cs_size+allow_sz, blk_size+(blk_size/100)*perc) )) {
181  need.append("CsResize");
182 
183  } else if((do_compaction = cell_ttl &&
184  int64_t(value = range->blocks.cellstores.get_ts_earliest())
186  value < Time::now_ns()-cell_ttl*100)) {
187  need.append("CsTTL");
188 
189  } else if((do_compaction = range->blocks.cellstores.get_cell_revs()
190  > cell_revs)) {
191  need.append("CsVersions");
192 
193  } else if((do_compaction =
194  (Time::now_ns() - commitlog.modification_ts())/1000000
195  > cfg_check_interval->get() &&
196  !commitlog.empty() &&
197  ++m_uncompacted > size_t(cfg_uncompacted_max->get()))) {
198  need.append("Uncompacted=");
199  need.append(std::to_string(m_uncompacted--));
200  }
201 
202  if(stopped() || !do_compaction) {
203  range->compacting(Range::COMPACT_NONE);
204  return;
205  }
206 
207  Env::Rgr::res().more_mem_future(blk_size);
208  m_running.fetch_add(1);
209 
210  SWC_LOGF(LOG_INFO, "COMPACT-STARTED " SWC_FMT_LU "/" SWC_FMT_LU " %s",
211  range->cfg->cid, range->rid, need.c_str());
212 
213  struct Task {
214  CompactRange::Ptr req;
216  Task(CompactRange* a_req) noexcept : req(a_req) { }
218  Task(Task&& other) noexcept : req(std::move(other.req)) { }
219  Task(const Task&) = delete;
220  Task& operator=(Task&&) = delete;
221  Task& operator=(const Task&) = delete;
222  ~Task() noexcept { }
223  void operator()() { req->initialize(); }
224  };
225 
226  Task task(new CompactRange(this, range, cs_size, blk_size));
227  {
229  m_compacting.push_back(task.req);
230  }
231  Env::Rgr::maintenance_post(std::move(task));
232 }
233 
235  const RangePtr& range, bool all) {
236  if(all) {
237  range->blocks.reset_blocks();
238 
239  } else if(size_t bytes = Env::Rgr::res().need_ram()) {
240  range->blocks.release(bytes);
241  }
242 
243  range->compacting(Range::COMPACT_NONE);
244  bool ok;
245  {
247  auto it = std::find(m_compacting.cbegin(), m_compacting.cend(), req);
248  if((ok = it != m_compacting.cend()))
249  m_compacting.erase(it);
250  }
251  if(ok) {
252  Env::Rgr::res().less_mem_future(req->blk_size);
253  compacted();
254  } else {
256  "Ranger compaction track missed(" SWC_FMT_LU "/" SWC_FMT_LU ")",
257  range->cfg->cid, range->rid);
258  }
259 }
260 
263  if(m_running.fetch_sub(1) == 1 && stopped()) {
265  m_cv.notify_all();
266  } else {
268  }
269 }
270 
271 SWC_SHOULD_NOT_INLINE
272 void Compaction::schedule(uint32_t t_ms) {
273  if(stopped() || !available())
274  return;
275 
276  if(t_ms < 3 || m_next) {
277  if(!m_schedule.running()) {
278  struct Task {
279  Compaction* ptr;
281  Task(Compaction* a_ptr) noexcept : ptr(a_ptr) { }
282  void operator()() { ptr->run(); }
283  };
284  Env::Rgr::maintenance_post(Task(this));
285  }
286  return;
287  }
288 
289  {
290  auto set_in = std::chrono::milliseconds(t_ms);
291  auto now = asio::high_resolution_timer::clock_type::now();
293  auto set_on = m_check_timer.expiry();
294  if(set_on > now) {
295  if(set_on < now + set_in)
296  return;
297  m_check_timer.cancel();
298  }
299  m_check_timer.expires_after(set_in);
300  struct TimerTask {
301  Compaction* ptr;
303  TimerTask(Compaction* a_ptr) noexcept : ptr(a_ptr) { }
304  void operator()(const asio::error_code& ec) {
305  if(ec != asio::error::operation_aborted)
306  ptr->schedule(0);
307  }
308  };
309  m_check_timer.async_wait(TimerTask(this));
310  }
311  SWC_LOGF(LOG_DEBUG, "Ranger compaction scheduled in ms=%u", t_ms);
312 }
313 
314 
315 
316 }}
SWC::Core::Vector::erase
SWC_CAN_INLINE iterator erase(size_type offset) noexcept(_NoExceptMoveAssign &&_NoExceptDestructor)
Definition: Vector.h:464
SWC::Config::Property::Value_uint8_g::get
SWC_CAN_INLINE uint8_t get() const noexcept
Definition: Property.h:535
SWC::Env::Rgr::res
static SWC_CAN_INLINE System::Resources & res() noexcept
Definition: RangerEnv.h:131
SWC::Core::StateRunning::stop
constexpr SWC_CAN_INLINE void stop() noexcept
Definition: StateRunning.h:32
SWC::Ranger::Compaction::cfg_uncompacted_max
const Config::Property::Value_int32_g::Ptr cfg_uncompacted_max
Definition: Compaction.h:28
SWC::Ranger::Compaction::m_log_chk
Core::StateRunning m_log_chk
Definition: Compaction.h:67
SWC_LOGF
#define SWC_LOGF(priority, fmt,...)
Definition: Logger.h:188
SWC::Env::Rgr::maintenance_post
static SWC_CAN_INLINE void maintenance_post(T_Handler &&handler)
Definition: RangerEnv.h:120
SWC::Core::UniqueLock
Definition: MutexLock.h:68
SWC::Time::now_ns
SWC_CAN_INLINE int64_t now_ns() noexcept
Definition: Time.h:43
SWC::Core::ScopedLock
Definition: MutexLock.h:41
SWC::Ranger::Compaction::schedule
void schedule(uint32_t t_ms)
Definition: Compaction.cc:272
SWC::Ranger::Compaction::m_mutex
std::mutex m_mutex
Definition: Compaction.h:70
SWC::Env::Rgr::columns
static SWC_CAN_INLINE Ranger::Columns * columns() noexcept
Definition: RangerEnv.h:144
SWC::Ranger::Compaction::m_schedule
Core::StateRunning m_schedule
Definition: Compaction.h:65
SWC::Ranger::Compaction::log_compact_finished
void log_compact_finished() noexcept
Definition: Compaction.cc:57
SWC::System::Resources::more_mem_future
SWC_CAN_INLINE void more_mem_future(size_t sz) noexcept
Definition: Resources.h:104
SWC::Ranger::Compaction::m_next
Core::AtomicBool m_next
Definition: Compaction.h:78
SWC::LOG_INFO
@ LOG_INFO
Definition: Logger.h:35
SWC::Ranger::Compaction::operator=
Compaction & operator=(const Compaction &)=delete
SWC::Ranger::Compaction::cfg_max_range
const Config::Property::Value_uint8_g::Ptr cfg_max_range
Definition: Compaction.h:25
SWC::DB::Cells::TIMESTAMP_AUTO
constexpr const int64_t TIMESTAMP_AUTO
Definition: Cell.h:73
SWC::Ranger::CompactRange::Ptr
std::shared_ptr< CompactRange > Ptr
Definition: CompactRange.h:14
SWC::Config::Property::Value_int32_g::get
SWC_CAN_INLINE int32_t get() const noexcept
Definition: Property.h:610
SWC::Ranger::Compaction::available
bool available() noexcept
Definition: Compaction.cc:62
SWC::Ranger::Compaction::m_last_cid
cid_t m_last_cid
Definition: Compaction.h:74
SWC::Ranger::CompactRange
Definition: CompactRange.h:13
SWC::Ranger::ColumnPtr
std::shared_ptr< Column > ColumnPtr
Definition: Columns.h:13
SWC::Ranger::Compaction::m_uncompacted
size_t m_uncompacted
Definition: Compaction.h:79
SWC::Ranger::Compaction::m_run
Core::AtomicBool m_run
Definition: Compaction.h:64
SWC::Core::AtomicBase::store
constexpr SWC_CAN_INLINE void store(T v) noexcept
Definition: Atomic.h:37
SWC::Ranger::Compaction::m_log_compactions
Core::Atomic< uint8_t > m_log_compactions
Definition: Compaction.h:68
SWC::Core::Vector::empty
constexpr SWC_CAN_INLINE bool empty() const noexcept
Definition: Vector.h:168
SWC::Ranger::RangePtr
std::shared_ptr< Range > RangePtr
Definition: Columns.h:15
SWC_CAN_INLINE
#define SWC_CAN_INLINE
Definition: Compat.h:102
SWC::LOG_DEBUG
@ LOG_DEBUG
Definition: Logger.h:36
Compaction.h
SWC::Ranger::Compaction::log_compact_possible
bool log_compact_possible() noexcept
Definition: Compaction.cc:45
SWC::Ranger::Compaction::m_idx_cid
size_t m_idx_cid
Definition: Compaction.h:75
SWC::Ranger::Columns::get_next
ColumnPtr get_next(cid_t &last_cid, size_t &idx)
Definition: Columns.cc:30
SWC::Ranger::Compaction::run
void run()
Definition: Compaction.cc:105
SWC
The SWC-DB C++ namespace 'SWC'.
Definition: main.cc:12
SWC::Ranger::Compaction::compacted
void compacted()
Definition: Compaction.cc:262
SWC::Ranger::Compaction::stop
void stop()
Definition: Compaction.cc:67
SWC::Ranger::Compaction::stopped
bool stopped()
Definition: Compaction.cc:101
SWC::Ranger::Compaction::m_running
Core::Atomic< uint8_t > m_running
Definition: Compaction.h:66
SWC::Core::StateRunning::running
constexpr SWC_CAN_INLINE bool running() noexcept
Definition: StateRunning.h:37
SWC::System::Resources::less_mem_future
SWC_CAN_INLINE void less_mem_future(size_t sz) noexcept
Definition: Resources.h:109
SWC_FMT_LU
#define SWC_FMT_LU
Definition: Compat.h:98
SWC::Ranger::Compaction::m_compacting
Core::Vector< CompactRange::Ptr > m_compacting
Definition: Compaction.h:81
SWC::Ranger::Range::COMPACT_NONE
static const uint8_t COMPACT_NONE
Definition: Range.h:51
SWC::Ranger::Compaction::Compaction
Compaction()
Definition: Compaction.cc:14
SWC::Ranger::Compaction::m_idx_rid
size_t m_idx_rid
Definition: Compaction.h:77
SWC::Ranger::Compaction::compact
void compact(const RangePtr &range)
Definition: Compaction.cc:143
SWC::Ranger::Compaction::cfg_check_interval
const Config::Property::Value_int32_g::Ptr cfg_check_interval
Definition: Compaction.h:27
SWC::Ranger::Compaction::cfg_max_log
const Config::Property::Value_uint8_g::Ptr cfg_max_log
Definition: Compaction.h:26
SWC::Ranger::Compaction::m_check_timer
asio::high_resolution_timer m_check_timer
Definition: Compaction.h:71
SWC::System::Resources::is_low_mem_state
SWC_CAN_INLINE bool is_low_mem_state() const noexcept
Definition: Resources.h:84
SWC::Core::Vector::cend
constexpr SWC_CAN_INLINE const_iterator cend() const noexcept
Definition: Vector.h:232
SWC::Ranger::Compaction::m_cv
std::condition_variable m_cv
Definition: Compaction.h:72
SWC::LOG_WARN
@ LOG_WARN
Definition: Logger.h:33
SWC::Ranger::Compaction
Definition: Compaction.h:21
SWC::Core::Atomic::fetch_sub
constexpr SWC_CAN_INLINE T fetch_sub(T v) noexcept
Definition: Atomic.h:88
SWC::Core::Vector::push_back
SWC_CAN_INLINE void push_back(ArgsT &&... args)
Definition: Vector.h:331
SWC::Core::Vector::cbegin
constexpr SWC_CAN_INLINE const_iterator cbegin() const noexcept
Definition: Vector.h:216
SWC::Core::Atomic::fetch_add
constexpr SWC_CAN_INLINE T fetch_add(T v) noexcept
Definition: Atomic.h:93
SWC::Core::to_string
SWC_CAN_INLINE std::string to_string(const BitFieldInt< T, SZ > &v)
Definition: BitFieldInt.h:263
SWC::Core::Vector::size
constexpr SWC_CAN_INLINE size_type size() const noexcept
Definition: Vector.h:189
SWC::Ranger::Compaction::m_last_rid
cid_t m_last_rid
Definition: Compaction.h:76