SWC-DB  v0.5.12 C++ documentations
SWC-DB© (Super Wide Column Database) - High Performance Scalable Database (https://github.com/kashirin-alex/swc-db)
CommitLogSplitter.h
Go to the documentation of this file.
1 /*
2  * SWC-DB© Copyright since 2019 Alex Kashirin <kashirin.alex@gmail.com>
3  * License details at <https://github.com/kashirin-alex/swc-db/#license>
4  */
5 
6 
7 #ifndef swcdb_ranger_db_CommitLogSplitter_h
8 #define swcdb_ranger_db_CommitLogSplitter_h
9 
10 
11 namespace SWC { namespace Ranger { namespace CommitLog {
12 
13 
14 class Splitter final : private Fragment::LoadCallback {
15  public:
16 
17 
18  Splitter(const DB::Cell::Key& a_key, Fragments::Vec& fragments,
19  Fragments::Ptr a_log_left, Fragments::Ptr a_log_right)
20  : m_sem(4), m_fragments(fragments), m_splitting(),
21  key(a_key), log_left(a_log_left), log_right(a_log_right) {
22  }
23 
24  Splitter(const Splitter&) = delete;
25 
26  Splitter(const Splitter&&) = delete;
27 
28  Splitter& operator=(const Splitter&) = delete;
29 
30  ~Splitter() noexcept { }
31 
32  void run () {
34  "COMPACT-SPLIT commitlog START "
35  "from(" SWC_FMT_LU "/" SWC_FMT_LU ") to(" SWC_FMT_LU "/" SWC_FMT_LU
36  ") fragments=" SWC_FMT_LD,
37  log_left->range->cfg->cid, log_left->range->rid,
38  log_right->range->cfg->cid, log_right->range->rid,
39  int64_t(m_fragments.size())
40  );
41 
42  int err;
43  size_t skipped = 0;
44  size_t moved = 0;
45  size_t splitted = 0;
46  Fragment::Ptr frag;
47  for(auto it = m_fragments.cbegin(); it != m_fragments.cend();) {
48  frag = *it;
49  if(DB::KeySeq::compare(frag->interval.key_seq,
50  key, frag->interval.key_end) != Condition::GT) {
51  m_fragments.erase(it);
52  ++skipped;
53 
54  } else if(DB::KeySeq::compare(frag->interval.key_seq,
55  key, frag->interval.key_begin) == Condition::GT &&
56  log_right->take_ownership(err= Error::OK, frag)) {
57  log_left->remove(err, frag, false);
58  m_fragments.erase(it);
59  ++moved;
60 
61  } else {
62  frag->processing_increment();
63  m_sem.acquire();
64  frag->load(this);
65  ++splitted;
66  ++it;
67  }
68  }
69  m_sem.wait_all();
70 
72  "COMPACT-SPLIT commitlog FINISH "
73  "from(" SWC_FMT_LU "/" SWC_FMT_LU ") to(" SWC_FMT_LU "/" SWC_FMT_LU
74  ") skipped=" SWC_FMT_LU " moved=" SWC_FMT_LU " splitted=" SWC_FMT_LU,
75  log_left->range->cfg->cid, log_left->range->rid,
76  log_right->range->cfg->cid, log_right->range->rid,
77  skipped, moved, splitted
78  );
79  }
80 
81  void loaded(Fragment::Ptr&& frag) override {
82  int err;
83  if(!frag->loaded(err)) {
86  SWC_LOG_OSTREAM << "COMPACT-SPLIT fragment retrying to ", err);
87  frag->print(SWC_LOG_OSTREAM << ' ');
88  );
89  std::this_thread::sleep_for(std::chrono::milliseconds(1000));
90  frag->load(this);
91 
92  } else if(m_splitting.push_and_is_1st(std::move(frag))) {
93  struct Task {
94  Splitter* ptr;
96  Task(Splitter* a_ptr) noexcept : ptr(a_ptr) { }
97  void operator()() { ptr->split(); }
98  };
99  Env::Rgr::post(Task(this));
100  }
101  }
102 
103  private:
104 
106  void split() {
107  int err;
108  bool more;
109  do {
110  m_splitting.front()->split(err, key, log_left, log_right);
111  more = m_splitting.pop_and_more();
112  m_sem.release();
113  } while(more);
114  }
115 
119 
123 };
124 
125 
126 }}} // namespace SWC::Ranger::CommitLog
127 
128 #endif // swcdb_ranger_db_CommitLogSplitter_h
SWC::Core::Vector::erase
SWC_CAN_INLINE iterator erase(size_type offset) noexcept(_NoExceptMoveAssign &&_NoExceptDestructor)
Definition: Vector.h:464
SWC::Ranger::CommitLog::Fragment::Ptr
std::shared_ptr< Fragment > Ptr
Definition: CommitLogFragment.h:31
SWC::Ranger::CommitLog::Splitter::split
SWC_CAN_INLINE void split()
Definition: CommitLogSplitter.h:106
SWC::Core::Semaphore
Definition: Semaphore.h:16
SWC::Core::QueueSafe::push_and_is_1st
SWC_CAN_INLINE bool push_and_is_1st(const ItemT &item)
Definition: QueueSafe.h:54
SWC::Ranger::CommitLog::Fragments::take_ownership
Fragment::Ptr take_ownership(int &err, Fragment::Ptr &frag)
Definition: CommitLog.cc:441
SWC_LOG_OSTREAM
#define SWC_LOG_OSTREAM
Definition: Logger.h:44
SWC::Ranger::CommitLog::Splitter::key
const DB::Cell::Key key
Definition: CommitLogSplitter.h:120
SWC::Ranger::CommitLog::Splitter::m_fragments
Fragments::Vec & m_fragments
Definition: CommitLogSplitter.h:117
SWC_LOGF
#define SWC_LOGF(priority, fmt,...)
Definition: Logger.h:188
SWC_LOG_OUT
#define SWC_LOG_OUT(pr, _code_)
Definition: Logger.h:178
SWC::Condition::GT
@ GT
Definition: Comparators.h:30
SWC::Core::Semaphore::acquire
void acquire()
Definition: Semaphore.cc:29
SWC::Ranger::CommitLog::Splitter::log_left
Fragments::Ptr log_left
Definition: CommitLogSplitter.h:121
SWC::DB::Cell::Key
Definition: CellKey.h:24
SWC::Ranger::CommitLog::Splitter
Definition: CommitLogSplitter.h:14
SWC::Ranger::CommitLog::Splitter::operator=
Splitter & operator=(const Splitter &)=delete
SWC::Ranger::CommitLog::Fragments
Definition: CommitLog.h:19
SWC::Core::QueueSafe::front
SWC_CAN_INLINE ItemT & front() noexcept
Definition: QueueSafe.h:72
SWC::Core::Semaphore::wait_all
void wait_all()
Definition: Semaphore.cc:62
SWC::Error::OK
@ OK
Definition: Error.h:45
SWC::Core::QueueSafe::pop_and_more
SWC_CAN_INLINE bool pop_and_more()
Definition: QueueSafe.h:90
SWC_CAN_INLINE
#define SWC_CAN_INLINE
Definition: Compat.h:102
SWC::LOG_DEBUG
@ LOG_DEBUG
Definition: Logger.h:36
SWC::Ranger::CommitLog::Splitter::log_right
Fragments::Ptr log_right
Definition: CommitLogSplitter.h:122
SWC::Ranger::CommitLog::Splitter::~Splitter
~Splitter() noexcept
Definition: CommitLogSplitter.h:30
SWC
The SWC-DB C++ namespace 'SWC'.
Definition: main.cc:12
SWC::Ranger::CommitLog::Fragments::remove
void remove(int &err, Vec &fragments_old)
Definition: CommitLog.cc:380
SWC::Ranger::CommitLog::Splitter::loaded
void loaded(Fragment::Ptr &&frag) override
Definition: CommitLogSplitter.h:81
SWC::Ranger::CommitLog::Splitter::Splitter
Splitter(const Splitter &&)=delete
SWC_FMT_LU
#define SWC_FMT_LU
Definition: Compat.h:98
SWC::Ranger::CommitLog::Splitter::Splitter
Splitter(const Splitter &)=delete
SWC::Core::Vector< Fragment::Ptr >
SWC::Ranger::CommitLog::Splitter::Splitter
Splitter(const DB::Cell::Key &a_key, Fragments::Vec &fragments, Fragments::Ptr a_log_left, Fragments::Ptr a_log_right)
Definition: CommitLogSplitter.h:18
SWC::Core::Vector::cend
constexpr SWC_CAN_INLINE const_iterator cend() const noexcept
Definition: Vector.h:232
SWC::Ranger::CommitLog::Splitter::m_splitting
Core::QueueSafe< Fragment::Ptr > m_splitting
Definition: CommitLogSplitter.h:118
SWC::LOG_WARN
@ LOG_WARN
Definition: Logger.h:33
SWC::Ranger::CommitLog::Fragments::range
RangePtr range
Definition: CommitLog.h:31
SWC::Core::Semaphore::release
void release()
Definition: Semaphore.cc:36
SWC::Env::Rgr::post
static SWC_CAN_INLINE void post(T_Handler &&handler)
Definition: RangerEnv.h:114
SWC::Error::print
void print(std::ostream &out, int err)
Definition: Error.cc:191
SWC_FMT_LD
#define SWC_FMT_LD
Definition: Compat.h:99
SWC::Core::Vector::cbegin
constexpr SWC_CAN_INLINE const_iterator cbegin() const noexcept
Definition: Vector.h:216
SWC::Core::QueueSafe< Fragment::Ptr >
SWC::Core::Vector::size
constexpr SWC_CAN_INLINE size_type size() const noexcept
Definition: Vector.h:189
SWC::Ranger::CommitLog::Splitter::m_sem
Core::Semaphore m_sem
Definition: CommitLogSplitter.h:116
SWC::Ranger::CommitLog::Splitter::run
void run()
Definition: CommitLogSplitter.h:32
SWC::DB::KeySeq::compare
Condition::Comp compare(const Types::KeySeq seq, const Cell::Key &key, const Cell::Key &other) SWC_ATTRIBS((SWC_ATTRIB_O3))
Definition: KeyComparator.h:294
SWC::Ranger::CommitLog::Fragment::LoadCallback
Definition: CommitLogFragment.h:32