8 namespace SWC {
namespace Ranger {
namespace CommitLog {
13 worker(a_worker), error(Error::
OK),
15 m_idx(0), m_running(0), m_finishing(0),
18 compact->log->range->cfg->key_seq,
19 compact->log->range->cfg->block_cells() * 2,
20 compact->log->range->cfg->cell_versions(),
21 compact->log->range->cfg->cell_ttl(),
22 compact->log->range->cfg->column_type()
24 m_remove(), m_fragments() {
30 running = compact->preload;
31 m_finishing.store(read_frags.size() + 1);
33 running = m_running.fetch_sub(1);
36 if(running == compact->preload)
do {
37 size_t idx = m_idx.fetch_add(1);
38 if(idx >= read_frags.size())
40 if(error || compact->log->stopping) {
41 m_finishing.fetch_sub(read_frags.size() - idx);
44 running = m_running.add_rslt(1);
45 read_frags[idx]->processing_increment();
46 read_frags[idx]->load(
this);
47 }
while(running < compact->
preload);
49 if(m_finishing.fetch_sub(1) == 1)
59 : g(a_g), frag(std::move(a_frag)) { }
61 Task(Task&& other) noexcept
62 : g(other.g), frag(std::move(other.frag)) { }
63 Task(
const Task&) =
delete;
67 void operator()() { g->
_loaded(frag); }
73 if(compact->log->stopping || error) {
74 frag->processing_decrement();
79 if(!frag->loaded(err)) {
85 std::this_thread::sleep_for(std::chrono::milliseconds(1000));
93 frag->load_cells(err, m_cells);
94 m_remove.push_back(frag);
103 size_t total_cells_count = 0;
105 uint32_t cells_count;
106 if(compact->log->stopping || error || m_cells.empty())
113 m_cells.write_and_free(
114 cells, cells_count = 0, interval,
115 compact->log->range->cfg->block_size(),
116 compact->log->range->cfg->block_cells()
118 total_cells_count += cells_count;
122 compact->get_filepath(compact->log->next_id()),
124 compact->log->range->cfg->block_enc(),
125 compact->log->range->cfg->cell_versions(),
133 m_fragments.push_back(frag);
138 compact->log->range->cfg->file_replication(),
139 std::move(buff_write),
142 }
while(!error && !m_cells.empty());
146 compact->finished(
this, total_cells_count);
151 if(!error && !compact->log->stopping &&
152 read_frags.size() == m_remove.size()) {
153 compact->log->take_ownership(err, m_fragments, m_remove);
156 if(!m_fragments.empty()) {
158 for(
auto frag : m_fragments) {
168 Task(
Compact* a_compact) noexcept : compact(a_compact) { }
169 void operator()() { compact->
finalized(); }
178 uint8_t cointervaling,
181 preload(
log->range->cfg->log_fragment_preload()),
185 m_cb(std::move(cb)) {
186 for(
auto frags : groups)
193 blks =
log->
range->cfg->log_rollout_ratio();
198 for(
auto frags : groups) {
203 for(
auto it = frags.cbegin(); it != frags.cend(); ++it) {
204 m_groups.back()->read_frags.push_back(*it);
206 if(
m_groups.back()->read_frags.size() < cointervaling)
212 if(!blks ||
m_groups.size() >= g_sz)
232 return p1->read_frags.size() >= p2->read_frags.size(); });
248 cells_count, cells_count ? took/cells_count: 0
258 log->
range->blocks.wait_processing();
265 Task(
Group* a_g) noexcept : g(a_g) { }
295 s.reserve(s.length() + 6 + tmp.length());