SWC-DB  v0.5.12 C++ documentations
SWC-DB© (Super Wide Column Database) - High Performance Scalable Database (https://github.com/kashirin-alex/swc-db)
CommitLogCompact.cc
Go to the documentation of this file.
1 /*
2  * SWC-DB© Copyright since 2019 Alex Kashirin <kashirin.alex@gmail.com>
3  * License details at <https://github.com/kashirin-alex/swc-db/#license>
4  */
5 
7 
8 namespace SWC { namespace Ranger { namespace CommitLog {
9 
11 Compact::Group::Group(Compact* a_compact, uint8_t a_worker)
12  : ts(), read_frags(),
13  worker(a_worker), error(Error::OK),
14  compact(a_compact),
15  m_idx(0), m_running(0), m_finishing(0),
16  m_mutex(),
17  m_cells(
18  compact->log->range->cfg->key_seq,
19  compact->log->range->cfg->block_cells() * 2,
20  compact->log->range->cfg->cell_versions(),
21  compact->log->range->cfg->cell_ttl(),
22  compact->log->range->cfg->column_type()
23  ),
24  m_remove(), m_fragments() {
25 }
26 
27 void Compact::Group::run(bool initial) {
28  size_t running;
29  if(initial) {
30  running = compact->preload;
31  m_finishing.store(read_frags.size() + 1);
32  } else {
33  running = m_running.fetch_sub(1);
34  }
35 
36  if(running == compact->preload) do {
37  size_t idx = m_idx.fetch_add(1);
38  if(idx >= read_frags.size())
39  break;
40  if(error || compact->log->stopping) {
41  m_finishing.fetch_sub(read_frags.size() - idx);
42  break;
43  }
44  running = m_running.add_rslt(1);
45  read_frags[idx]->processing_increment();
46  read_frags[idx]->load(this);
47  } while(running < compact->preload);
48 
49  if(m_finishing.fetch_sub(1) == 1)
50  write();
51 }
52 
54  struct Task {
55  Group* g;
56  Fragment::Ptr frag;
58  Task(Group* a_g, Fragment::Ptr&& a_frag) noexcept
59  : g(a_g), frag(std::move(a_frag)) { }
61  Task(Task&& other) noexcept
62  : g(other.g), frag(std::move(other.frag)) { }
63  Task(const Task&) = delete;
64  Task& operator=(Task&&) = delete;
65  Task& operator=(const Task&) = delete;
66  ~Task() noexcept { }
67  void operator()() { g->_loaded(frag); }
68  };
69  Env::Rgr::post(Task(this, std::move(frag)));
70 }
71 
73  if(compact->log->stopping || error) {
74  frag->processing_decrement();
75  return run(false);
76  }
77 
78  int err;
79  if(!frag->loaded(err)) {
82  SWC_LOG_OSTREAM << "COMPACT-LOG fragment retrying to ", err);
83  frag->print(SWC_LOG_OSTREAM << ' ');
84  );
85  std::this_thread::sleep_for(std::chrono::milliseconds(1000));
86  frag->load(this);
87  return;
88  }
89 
90  Env::Rgr::res().more_mem_future(frag->size_bytes());
91  {
92  Core::MutexSptd::scope lock(m_mutex);
93  frag->load_cells(err, m_cells);
94  m_remove.push_back(frag);
95  }
96  frag->release();
97  Env::Rgr::res().less_mem_future(frag->size_bytes());
98  run(false);
99 }
100 
102  Core::Semaphore sem(5);
103  size_t total_cells_count = 0;
104  int err;
105  uint32_t cells_count;
106  if(compact->log->stopping || error || m_cells.empty())
107  goto finished_write;
108 
109  do {
110  DynamicBuffer cells;
111  DB::Cells::Interval interval(m_cells.key_seq);
112  StaticBuffer buff_write;
113  m_cells.write_and_free(
114  cells, cells_count = 0, interval,
115  compact->log->range->cfg->block_size(),
116  compact->log->range->cfg->block_cells()
117  );
118  total_cells_count += cells_count;
119 
120  auto frag = Fragment::make_write(
121  err = Error::OK,
122  compact->get_filepath(compact->log->next_id()),
123  std::move(interval),
124  compact->log->range->cfg->block_enc(),
125  compact->log->range->cfg->cell_versions(),
126  cells_count, cells,
127  buff_write
128  );
129  if(err)
130  error.store(err);
131  if(!frag)
132  break;
133  m_fragments.push_back(frag);
134 
135  sem.acquire();
136  frag->write(
138  compact->log->range->cfg->file_replication(),
139  std::move(buff_write),
140  &sem
141  );
142  } while(!error && !m_cells.empty());
143 
144  finished_write:
145  sem.wait_all();
146  compact->finished(this, total_cells_count);
147 }
148 
150  int err = Error::OK;
151  if(!error && !compact->log->stopping &&
152  read_frags.size() == m_remove.size()) {
153  compact->log->take_ownership(err, m_fragments, m_remove);
154  }
155 
156  if(!m_fragments.empty()) {
157  Core::Semaphore sem(10);
158  for(auto frag : m_fragments) {
159  sem.acquire();
160  frag->remove(err = Error::OK, &sem);
161  }
162  sem.wait_all();
163  }
164 
165  struct Task {
166  Compact* compact;
168  Task(Compact* a_compact) noexcept : compact(a_compact) { }
169  void operator()() { compact->finalized(); }
170  };
171  Env::Rgr::post(Task(compact));
172 }
173 
174 
175 
176 Compact::Compact(Fragments* a_log, uint32_t a_repetition,
177  const Fragments::CompactGroups& groups,
178  uint8_t cointervaling,
179  Compact::Cb_t&& cb)
180  : log(a_log), ts(),
181  preload(log->range->cfg->log_fragment_preload()),
182  repetition(a_repetition),
183  ngroups(groups.size()), nfrags(0),
184  m_workers(), m_groups(),
185  m_cb(std::move(cb)) {
186  for(auto frags : groups)
187  nfrags += frags.size();
188 
189  uint32_t blks = Env::Rgr::res().avail_ram() / log->range->cfg->block_size();
190  if(blks < nfrags)
191  log->range->blocks.release((nfrags-blks) * log->range->cfg->block_size());
192  if(!blks)
193  blks = log->range->cfg->log_rollout_ratio();
194 
195  size_t g_sz = Env::Rgr::res().concurrency() / 2;
196  m_groups.reserve(g_sz + 1);
197 
198  for(auto frags : groups) {
199  if(frags.empty())
200  continue;
201  m_groups.push_back(new Group(this, m_groups.size()+1));
202 
203  for(auto it = frags.cbegin(); it != frags.cend(); ++it) {
204  m_groups.back()->read_frags.push_back(*it);
205  if(!blks) {
206  if(m_groups.back()->read_frags.size() < cointervaling)
207  continue;
208  break;
209  }
210  --blks;
211  }
212  if(!blks || m_groups.size() >= g_sz)
213  break;
214  }
215 
216  if(m_groups.empty()) {
217  m_cb ? m_cb(this) : log->finish_compact(this);
218  return;
219  }
220 
222  "COMPACT-LOG-START " SWC_FMT_LU "/" SWC_FMT_LU " w=" SWC_FMT_LD
223  " frags=" SWC_FMT_LU "(" SWC_FMT_LU ")/" SWC_FMT_LU " repetition=%u",
224  log->range->cfg->cid, log->range->rid,
225  int64_t(m_groups.size()), nfrags, ngroups, log->size(), repetition
226  );
227 
228  m_workers.store(m_groups.size());
229 
230  std::sort(m_groups.begin(), m_groups.end(),
231  [](const Group* p1, const Group* p2) {
232  return p1->read_frags.size() >= p2->read_frags.size(); });
233 
234  for(auto g : m_groups)
235  g->run(true);
236 }
237 
238 void Compact::finished(Group* group, size_t cells_count) {
239  size_t running(m_workers.sub_rslt(1));
240 
241  auto took = group->ts.elapsed();
243  "COMPACT-LOG-PROGRESS " SWC_FMT_LU "/" SWC_FMT_LU
244  " running=" SWC_FMT_LU
245  " worker=%u " SWC_FMT_LU "us cells=" SWC_FMT_LU "(" SWC_FMT_LU "ns)",
246  log->range->cfg->cid, log->range->rid, running,
247  group->worker, took/1000,
248  cells_count, cells_count ? took/cells_count: 0
249  );
250  if(running)
251  return;
252 
254  "COMPACT-LOG-FINISHING " SWC_FMT_LU "/" SWC_FMT_LU" w=" SWC_FMT_LD,
255  log->range->cfg->cid, log->range->rid, int64_t(m_groups.size()));
256 
257  log->range->compacting(Range::COMPACT_APPLYING);
258  log->range->blocks.wait_processing(); // sync processing state
259 
260  m_workers.store(m_groups.size());
261 
262  struct Task {
263  Group* g;
265  Task(Group* a_g) noexcept : g(a_g) { }
266  void operator()() {
267  g->finalize();
268  delete g;
269  }
270  };
271  for(auto g : m_groups)
272  Env::Rgr::post(Task(g));
273 }
274 
276  if(m_workers.fetch_sub(1) != 1)
277  return;
278 
280  "COMPACT-LOG-FINISH " SWC_FMT_LU "/" SWC_FMT_LU " w=" SWC_FMT_LD
281  " frags=" SWC_FMT_LU "(" SWC_FMT_LU ")/" SWC_FMT_LU
282  " repetition=%u " SWC_FMT_LU "ns",
283  log->range->cfg->cid, log->range->rid,
284  int64_t(m_groups.size()), nfrags, ngroups, log->size(), repetition,
285  ts.elapsed()
286  );
287 
288  m_cb ? m_cb(this) : log->finish_compact(this);
289 }
290 
292 std::string Compact::get_filepath(const int64_t frag) const {
293  std::string s(log->range->get_path(Range::LOG_TMP_DIR));
294  std::string tmp(std::to_string(frag));
295  s.reserve(s.length() + 6 + tmp.length());
296  s.append("/");
297  s.append(tmp);
298  s.append(".frag");
299  return s;
300 }
301 
302 
303 
304 }}} // namespace SWC::Ranger::CommitLog
305 
SWC::Ranger::CommitLog::Fragment::Ptr
std::shared_ptr< Fragment > Ptr
Definition: CommitLogFragment.h:31
SWC::Core::Semaphore
Definition: Semaphore.h:16
SWC::Env::Rgr::res
static SWC_CAN_INLINE System::Resources & res() noexcept
Definition: RangerEnv.h:131
SWC::System::Resources::concurrency
SWC_CAN_INLINE uint32_t concurrency() const noexcept
Definition: Resources.h:134
SWC_LOG_OSTREAM
#define SWC_LOG_OSTREAM
Definition: Logger.h:44
SWC::Ranger::CommitLog::Compact
Definition: CommitLogCompact.h:14
SWC::Ranger::CommitLog::Compact::Group::write
void write()
Definition: CommitLogCompact.cc:101
SWC::Ranger::CommitLog::Compact::operator=
Compact & operator=(const Compact &)=delete
SWC::System::Resources::avail_ram
SWC_CAN_INLINE size_t avail_ram() const noexcept
Definition: Resources.h:94
SWC::Ranger::run
SWC_SHOULD_NOT_INLINE int run()
Definition: main.cc:54
SWC::Ranger::CommitLog::Compact::m_groups
Core::Vector< Group * > m_groups
Definition: CommitLogCompact.h:89
SWC::Ranger::CommitLog::Compact::finalized
void finalized()
Definition: CommitLogCompact.cc:275
SWC_LOGF
#define SWC_LOGF(priority, fmt,...)
Definition: Logger.h:188
SWC::Ranger::Range::LOG_TMP_DIR
static constexpr const char LOG_TMP_DIR[]
Definition: Range.h:40
SWC_LOG_OUT
#define SWC_LOG_OUT(pr, _code_)
Definition: Logger.h:178
SWC::System::Resources::more_mem_future
SWC_CAN_INLINE void more_mem_future(size_t sz) noexcept
Definition: Resources.h:104
SWC::LOG_INFO
@ LOG_INFO
Definition: Logger.h:35
SWC::Ranger::CommitLog::Compact::Compact
Compact(Fragments *log, uint32_t repetition, const Fragments::CompactGroups &groups, uint8_t cointervaling, Compact::Cb_t &&cb=nullptr)
Definition: CommitLogCompact.cc:176
SWC::Core::MutexSptd::scope
Definition: MutexSptd.h:96
SWC::Core::Semaphore::acquire
void acquire()
Definition: Semaphore.cc:29
SWC::Error::UNPOSSIBLE
@ UNPOSSIBLE
Definition: Error.h:42
SWC::Ranger::CommitLog::Compact::preload
const uint8_t preload
Definition: CommitLogCompact.h:62
SWC::Ranger::CommitLog::Compact::m_workers
Core::Atomic< size_t > m_workers
Definition: CommitLogCompact.h:88
CommitLogCompact.h
SWC::Time::Measure::elapsed
SWC_CAN_INLINE uint64_t elapsed() const noexcept
Definition: Time.h:74
SWC::Core::Atomic::sub_rslt
constexpr SWC_CAN_INLINE T sub_rslt(T v) noexcept
Definition: Atomic.h:115
SWC::Ranger::CommitLog::Fragments
Definition: CommitLog.h:19
SWC::Core::Semaphore::wait_all
void wait_all()
Definition: Semaphore.cc:62
SWC::Core::AtomicBase::store
constexpr SWC_CAN_INLINE void store(T v) noexcept
Definition: Atomic.h:37
SWC::Error::OK
@ OK
Definition: Error.h:45
SWC::Ranger::CommitLog::Compact::nfrags
size_t nfrags
Definition: CommitLogCompact.h:65
SWC_CAN_INLINE
#define SWC_CAN_INLINE
Definition: Compat.h:102
SWC::Ranger::CommitLog::Compact::Group::Group
Group(Compact *compact, uint8_t worker)
Definition: CommitLogCompact.cc:11
SWC::Ranger::CommitLog::Compact::Cb_t
const std::function< void(const Compact *)> Cb_t
Definition: CommitLogCompact.h:58
SWC
The SWC-DB C++ namespace 'SWC'.
Definition: main.cc:12
SWC::Ranger::CommitLog::Compact::Group::ts
const Time::Measure_ns ts
Definition: CommitLogCompact.h:19
SWC::Core::BufferDyn< StaticBuffer >
SWC::Core::Buffer
Definition: Buffer.h:18
size
uint32_t size
Buffer size.
Definition: HeaderBufferInfo.h:47
SWC::Ranger::CommitLog::Fragments::finish_compact
void finish_compact(const Compact *compact)
Definition: CommitLog.cc:263
SWC::Ranger::CommitLog::Compact::Group::loaded
void loaded(Fragment::Ptr &&frag) override
Definition: CommitLogCompact.cc:53
SWC::Ranger::CommitLog::Compact::ts
const Time::Measure_ns ts
Definition: CommitLogCompact.h:61
SWC::Ranger::CommitLog::Compact::repetition
const uint32_t repetition
Definition: CommitLogCompact.h:63
SWC::Comm::Resolver::sort
void sort(const Networks &nets, const EndPoints &endpoints, EndPoints &sorted)
Definition: Resolver.cc:243
SWC::Ranger::CommitLog::Compact::get_filepath
std::string get_filepath(const int64_t frag) const
Definition: CommitLogCompact.cc:292
SWC::Ranger::CommitLog::Compact::m_cb
Cb_t m_cb
Definition: CommitLogCompact.h:90
SWC::System::Resources::less_mem_future
SWC_CAN_INLINE void less_mem_future(size_t sz) noexcept
Definition: Resources.h:109
SWC::Ranger::CommitLog::Compact::log
Fragments * log
Definition: CommitLogCompact.h:60
SWC_FMT_LU
#define SWC_FMT_LU
Definition: Compat.h:98
SWC::Ranger::CommitLog::Compact::Group::run
void run(bool initial)
Definition: CommitLogCompact.cc:27
SWC::Ranger::Range::COMPACT_APPLYING
static const uint8_t COMPACT_APPLYING
Definition: Range.h:55
SWC::Core::Vector
Definition: Vector.h:14
SWC::Ranger::CommitLog::Compact::Group
Definition: CommitLogCompact.h:16
SWC::Ranger::CommitLog::Fragments::size
size_t size() noexcept
Definition: CommitLog.cc:522
SWC::Common::Files::Schema::write
void write(SWC::DynamicBuffer &dst_buf, const DB::Schema::Ptr &schema)
Definition: Schema.h:50
SWC::Ranger::CommitLog::Compact::Group::finalize
void finalize()
Definition: CommitLogCompact.cc:149
SWC::LOG_WARN
@ LOG_WARN
Definition: Logger.h:33
SWC::Ranger::CommitLog::Fragments::range
RangePtr range
Definition: CommitLog.h:31
SWC::Ranger::CommitLog::Compact::ngroups
size_t ngroups
Definition: CommitLogCompact.h:64
SWC::Env::Rgr::post
static SWC_CAN_INLINE void post(T_Handler &&handler)
Definition: RangerEnv.h:114
SWC::Core::Atomic::fetch_sub
constexpr SWC_CAN_INLINE T fetch_sub(T v) noexcept
Definition: Atomic.h:88
SWC::DB::Cells::Interval
Definition: Interval.h:17
SWC::Ranger::CommitLog::Compact::Group::worker
const uint8_t worker
Definition: CommitLogCompact.h:21
SWC::Error::print
void print(std::ostream &out, int err)
Definition: Error.cc:191
SWC_FMT_LD
#define SWC_FMT_LD
Definition: Compat.h:99
SWC::Core::to_string
SWC_CAN_INLINE std::string to_string(const BitFieldInt< T, SZ > &v)
Definition: BitFieldInt.h:263
SWC::Ranger::CommitLog::Fragment::make_write
static Ptr make_write(int &err, std::string &&filepath, DB::Cells::Interval &&interval, DB::Types::Encoder encoder, const uint32_t cell_revs, const uint32_t cells_count, DynamicBuffer &cells, StaticBuffer &buffer)
Definition: CommitLogFragment.cc:135
SWC::Ranger::CommitLog::Compact::finished
void finished(Group *group, size_t cells_count)
Definition: CommitLogCompact.cc:238
SWC::Ranger::CommitLog::Compact::Group::_loaded
void _loaded(const Fragment::Ptr &frag)
Definition: CommitLogCompact.cc:72