11 namespace SWC {
namespace Ranger {
16 Env::Config::settings()->get<Config::Property::Value_uint8_g>(
17 "swc.rgr.compaction.read.ahead")),
19 Env::Config::settings()->get<Config::Property::Value_uint8_g>(
20 "swc.rgr.compaction.range.max")),
22 Env::Config::settings()->get<Config::Property::Value_uint8_g>(
23 "swc.rgr.compaction.commitlog.max")),
25 Env::Config::settings()->get<Config::Property::Value_int32_g>(
26 "swc.rgr.compaction.check.interval")),
28 Env::Config::settings()->get<Config::Property::Value_int32_g>(
29 "swc.rgr.compaction.range.uncompacted.max")),
31 m_schedule(), m_running(0),
32 m_log_chk(), m_log_compactions(0),
35 asio::high_resolution_timer(
36 Env::Rgr::maintenance_io()->executor())),
38 m_last_cid(0), m_idx_cid(0),
39 m_last_rid(0), m_idx_rid(0),
40 m_next(false), m_uncompacted(0),
76 for(uint8_t n=0; ; ++n) {
111 if(col->removing() ||
122 range->cfg->cid, range->rid);
124 if((!range->compact_required() && range->blocks.commitlog.try_compact()) ||
125 !range->compact_possible())
145 if(!range->is_loaded() ||
stopped()) {
150 auto& commitlog = range->blocks.commitlog;
152 uint32_t cs_size = range->cfg->cellstore_size();
153 size_t cs_max = range->cfg->cellstore_max();
155 uint32_t blk_size = range->cfg->block_size();
156 if(cs_size < blk_size)
158 uint8_t perc = range->cfg->compact_percent();
159 uint32_t allow_sz = (cs_size / 100) * perc;
160 uint32_t cell_revs = range->cfg->cell_versions();
161 uint64_t cell_ttl = range->cfg->cell_ttl();
164 bool do_compaction =
false;
167 if((do_compaction = range->compact_required() && !commitlog.empty())) {
168 need.append(
"Required");
170 }
else if((do_compaction = (value = commitlog.size_bytes(
true))
172 need.append(
"LogBytes=");
175 }
else if((do_compaction = (value = commitlog.size()) > cs_size/blk_size)) {
176 need.append(
"LogCount=");
179 }
else if((do_compaction = range->blocks.cellstores.need_compaction(
180 cs_max, cs_size+allow_sz, blk_size+(blk_size/100)*perc) )) {
181 need.append(
"CsResize");
183 }
else if((do_compaction = cell_ttl &&
184 int64_t(value = range->blocks.cellstores.get_ts_earliest())
187 need.append(
"CsTTL");
189 }
else if((do_compaction = range->blocks.cellstores.get_cell_revs()
191 need.append(
"CsVersions");
193 }
else if((do_compaction =
196 !commitlog.empty() &&
198 need.append(
"Uncompacted=");
202 if(
stopped() || !do_compaction) {
211 range->cfg->cid, range->rid, need.c_str());
// Move constructor: initializes `req` by moving it out of `other`; declared noexcept.
218 Task(Task&& other) noexcept : req(std::move(other.req)) { }
// Copy construction is explicitly deleted, so a Task cannot be copy-constructed.
219 Task(
const Task&) =
delete;
// Call operator: takes no arguments and invokes initialize() through `req`.
223 void operator()() { req->initialize(); }
226 Task task(
new CompactRange(
this, range, cs_size, blk_size));
237 range->blocks.reset_blocks();
240 range->blocks.release(bytes);
257 range->cfg->cid, range->rid);
271 SWC_SHOULD_NOT_INLINE
// Constructor: stores the given Compaction pointer in `ptr`; declared noexcept.
// No null check is performed here.
281 Task(
Compaction* a_ptr) noexcept : ptr(a_ptr) { }
// Call operator: takes no arguments and calls run() through the stored `ptr`.
282 void operator()() { ptr->
run(); }
290 auto set_in = std::chrono::milliseconds(t_ms);
291 auto now = asio::high_resolution_timer::clock_type::now();
295 if(set_on < now + set_in)
// Constructor: stores the given Compaction pointer in `ptr`; declared noexcept.
// No null check is performed here.
303 TimerTask(
Compaction* a_ptr) noexcept : ptr(a_ptr) { }
304 void operator()(
const asio::error_code& ec) {
305 if(ec != asio::error::operation_aborted)