|
SWC-DB
v0.5.12 C++ documentation
SWC-DB© (Super Wide Column Database) - High Performance Scalable Database (https://github.com/kashirin-alex/swc-db)
|
Go to the documentation of this file.
12 namespace SWC {
namespace Ranger {
namespace CommitLog {
19 : range(nullptr), stopping(false),
20 m_mutex_cells(), m_cells(key_seq),
26 m_sem(5), m_last_id(0),
27 m_releasable_bytes(0), m_modification_ts(0) {
44 range->cfg->block_cells() * 2,
45 range->cfg->cell_versions(),
46 range->cfg->cell_ttl(),
47 range->cfg->column_type(),
76 Task(
Fragments* a_ptr) noexcept : ptr(a_ptr) { }
77 void operator()() { ptr->
_commit(
false); }
94 m_cv.wait(lock_wait, [
this] {
103 size_t released_bytes = 0;
113 block_size =
range->cfg->block_size();
127 block_size,
range->cfg->block_cells());
137 range->cfg->block_enc(),
range->cfg->cell_versions(),
146 <<
range->cfg->cid <<
'/' <<
range->rid <<
' ', err);
156 range->cfg->file_replication(),
157 std::move(buff_write),
164 released_bytes += tmp - releasable;
182 return released_bytes;
193 it !=
cend(); ++it) {
195 (*it)->interval.key_begin, frag->interval.key_begin)
225 if(at && !(at=
range->compact_possible(
true)))
237 uint8_t cointervaling =
range->cfg->log_compact_cointervaling();
247 range->compacting(need / groups.
size() >
range->cfg->log_rollout_ratio()
251 new Compact(
this, tnum, groups, cointervaling);
257 range->compact_require(
true);
282 s.reserve(s.length() + 6 + tmp.length());
291 s.reserve(s.length() + 1 + frag.length());
309 for(
auto& entry : fragments) {
324 for(
auto& frag : *
this)
325 intval.
expand(frag->interval);
330 for(
auto& frag : *
this) {
331 intval.
expand(frag->interval);
332 intval.
align(frag->interval);
342 if(is_final && (is_final = base == vol)) {
351 for(
auto& frag : *
this) {
352 if(std::find(frags.
cbegin(), frags.
cend(), frag) == frags.
cend() &&
354 frag->processing_increment();
369 if(
m_mutex.try_lock_shared()) {
370 for(
auto& frag : *
this) {
371 released += frag->release();
372 if(released >= bytes)
384 _remove(err, fragments_old, &sem);
391 for(
auto& frag : fragments_old) {
393 frag->remove(err, semp);
415 m_cv.wait(lock_wait, [
this] {
420 for(
auto& frag : *
this)
421 frag->mark_removed();
432 m_cv.wait(lock_wait, [
this] {
444 err, take_frag->get_filepath(), smartfd->filepath());
453 err =
Error::OK, smartfd->filepath(), take_frag->get_filepath());
464 fs_if->rename(err, (*it)->get_filepath(), smartfd->filepath());
473 fs_if->remove(err, smartfd->filepath());
480 for(
auto& frag : tmp_frags) {
486 for(
auto& frag : tmp_frags)
507 for(
auto& frag : *
this)
508 count += frag->cells_count;
535 for(
auto& frag : *
this)
536 size += frag->size_bytes_encoded();
542 if(!(busy = !
m_mutex.try_lock())) {
561 <<
" Fragments::next_id was the same id=" << new_id; );
570 out <<
"CommitLog(count=" << count
580 for(
auto& frag : *
this){
601 auto cfg_cells =
range->cfg->block_cells();
602 auto cfg_bytes =
range->cfg->block_size();
603 if(cells < cfg_cells && bytes < cfg_bytes)
605 auto cfg_ratio =
range->cfg->log_rollout_ratio();
606 return bytes >= cfg_bytes * cfg_ratio ||
607 cells >= cfg_cells * cfg_ratio ||
619 if(without.
empty()) {
625 for(
auto& frag : *
this) {
626 if(std::find(without.
cbegin(), without.
cend(), frag) == without.
cend())
629 if(fragments.
size() < vol)
637 for(
auto it = fragments.
cbegin() + 1; it != fragments.
cend(); ++it) {
638 auto const& last = *groups.
back().back();
642 curt->cells_count <
range->cfg->block_cells() &&
643 curt->size_bytes() <
range->cfg->block_size() &&
644 last.cells_count <
range->cfg->block_cells() &&
645 last.size_bytes() <
range->cfg->block_size() ) ) ) {
646 if(groups.
back().size() < vol) {
647 need -= groups.
back().size();
648 groups.
back().clear();
653 groups.
back().push_back(curt);
656 for(
auto it=groups.
cbegin(); it != groups.
cend(); ) {
657 if(it->size() < vol) {
668 size_t ok =
range->cfg->cellstore_size();
670 ok *=
range->cfg->compact_percent();
673 range->blocks.cellstores.blocks_count() < ok/
range->cfg->block_size();
676 for(
auto& frag : *
this) {
677 if((need = (sz_bytes += frag->size_bytes_encoded()) > ok))
688 for(
auto& frag : *
this)
689 if(frag->processing())
700 for(
auto& frag : *
this)
701 size += frag->size_bytes(only_loaded);
717 offset += step >>= 1;
SWC_CAN_INLINE iterator erase(size_type offset) noexcept(_NoExceptMoveAssign &&_NoExceptDestructor)
size_t _narrow(const DB::Cell::Key &key) const
void load_cells(BlockLoader *loader, bool &is_final, Vec &frags, uint8_t vol)
constexpr SWC_CAN_INLINE reference front() noexcept
std::shared_ptr< Fragment > Ptr
constexpr SWC_CAN_INLINE bool compare_exchange_weak(T &at, T value) noexcept
static SWC_CAN_INLINE System::Resources & res() noexcept
constexpr SWC_CAN_INLINE void stop() noexcept
Fragment::Ptr take_ownership(int &err, Fragment::Ptr &frag)
Fragments(const DB::Types::KeySeq key_seq)
const Types::KeySeq key_seq
static void compaction_schedule(uint32_t ms)
void _load_cells(BlockLoader *loader, Vec &frags, uint8_t &vol)
SWC_SHOULD_NOT_INLINE int run()
std::shared_mutex m_mutex
SWC_CAN_INLINE void clear() noexcept(_NoExceptDestructor)
#define SWC_LOG_OUT(pr, _code_)
SWC_CAN_INLINE int64_t now_ns() noexcept
void configure(uint32_t split, const uint32_t revs, const uint64_t ttl_ns, const Types::Column typ, bool finalized) noexcept
SWC_CAN_INLINE bool align(const Interval &other)
Core::StateRunning m_commit
std::shared_mutex m_mutex_cells
static Ptr make_read(int &err, std::string &&filepath, const DB::Types::KeySeq key_seq)
static SWC_CAN_INLINE FS::Interface::Ptr & interface() noexcept
SWC_CAN_INLINE void more_mem_future(size_t sz) noexcept
static SWC_CAN_INLINE Ptr make_ptr(const std::string &filepath, uint32_t flags, int32_t fd=-1, uint64_t pos=0)
void print(std::ostream &out, bool minimal)
void expand(const Interval &other)
bool processing() noexcept
Core::AtomicBool stopping
static const uint8_t MAX_FRAGMENTS_NARROW
bool _processing() const noexcept
bool _need_roll() const noexcept
void expand(DB::Cells::Interval &intval)
size_t _need_compact(CompactGroups &groups, const Vec &without, size_t vol)
size_t _commit(bool finalize)
SWC_CAN_INLINE size_t size() const noexcept
void write_and_free(DynamicBuffer &cells, uint32_t &cell_count, Interval &intval, uint32_t threshold, uint32_t max_cells)
SWC_CAN_INLINE bool empty() const noexcept
static const uint8_t COMPACT_PREPARING
constexpr SWC_CAN_INLINE bool empty() const noexcept
constexpr SWC_CAN_INLINE void store(T v) noexcept
constexpr SWC_CAN_INLINE bool empty() const noexcept
std::shared_ptr< Range > RangePtr
SWC_CAN_INLINE void less_mem_releasable(size_t sz) noexcept
constexpr SWC_CAN_INLINE reference back() noexcept
static bool log_compact_possible() noexcept
The SWC-DB C++ namespace 'SWC'.
size_t size_bytes(bool only_loaded=false)
Core::Atomic< size_t > m_releasable_bytes
void finish_compact(const Compact *compact)
SWC_CAN_INLINE size_t need_ram() const noexcept
static void log_compact_finished() noexcept
SWC_CAN_INLINE void adj_mem_releasable(ssize_t sz) noexcept
const uint32_t repetition
void _remove(int &err, Vec &fragments_old, Core::Semaphore *semp)
void add_raw(const Cell &cell, bool finalized)
static constexpr const char LOG_DIR[]
constexpr SWC_CAN_INLINE bool running() noexcept
Core::Atomic< size_t > m_modification_ts
SWC_CAN_INLINE void less_mem_future(size_t sz) noexcept
static const uint8_t COMPACT_NONE
constexpr SWC_CAN_INLINE T exchange(T value) noexcept
SWC_CAN_INLINE size_t size_bytes() const noexcept
static const uint8_t COMPACT_COMPACTING
constexpr SWC_CAN_INLINE size_t fill() const noexcept
size_t size_bytes_encoded()
void add(const DB::Cells::Cell &cell)
bool _need_compact_major()
constexpr SWC_CAN_INLINE const_iterator cend() const noexcept
bool is_consist(const DB::Cells::Interval &intval) const
void expand_and_align(DB::Cells::Interval &intval)
size_t need_compact(CompactGroups &groups, const Vec &without, size_t vol)
std::string get_log_fragment(const int64_t frag) const
DB::Cells::MutableVec m_cells
std::condition_variable_any m_cv
bool try_compact(uint32_t tnum=1)
SWC_CAN_INLINE size_t size_of_internal() const noexcept
SWC_CAN_INLINE void assign(IteratorT first, IteratorT last)
static SWC_CAN_INLINE void post(T_Handler &&handler)
void init(const RangePtr &for_range)
bool is_compacting() const
SWC_CAN_INLINE void push_back(ArgsT &&... args)
size_t cells_count(bool only_current=false)
void print(std::ostream &out, int err)
size_t _size_bytes(bool only_loaded=false)
constexpr SWC_CAN_INLINE const_iterator cbegin() const noexcept
SWC_CAN_INLINE std::string to_string(const BitFieldInt< T, SZ > &v)
Core::AtomicBool m_compacting
SWC_CAN_INLINE void _add(const DB::Cells::Cell &cell, size_t *offset_itp, size_t *offsetp)
static Ptr make_write(int &err, std::string &&filepath, DB::Cells::Interval &&interval, DB::Types::Encoder encoder, const uint32_t cell_revs, const uint32_t cells_count, DynamicBuffer &cells, StaticBuffer &buffer)
constexpr SWC_CAN_INLINE size_type size() const noexcept
void load_final(const DB::Cells::MutableVec &cells)
SWC_CAN_INLINE reference emplace_back(ArgsT &&... args)
size_t release(size_t bytes)
SWC_CAN_INLINE iterator insert(size_type offset, ArgsT &&... args)
SWC_CAN_INLINE void reserve(size_type cap)
Condition::Comp compare(const Types::KeySeq seq, const Cell::Key &key, const Cell::Key &other) SWC_ATTRIBS((SWC_ATTRIB_O3))