SWC-DB  v0.5.12 C++ documentation
SWC-DB© (Super Wide Column Database) - High Performance Scalable Database (https://github.com/kashirin-alex/swc-db)
RangeBlockLoader.cc
Go to the documentation of this file.
1 /*
2  * SWC-DB© Copyright since 2019 Alex Kashirin <kashirin.alex@gmail.com>
3  * License details at <https://github.com/kashirin-alex/swc-db/#license>
4  */
5 
6 
8 
9 
10 namespace SWC { namespace Ranger {
11 
13  public:
14  LoadQueue() noexcept { }
15  LoadQueue(LoadQueue&&) = delete;
16  LoadQueue(const LoadQueue&) = delete;
18  LoadQueue& operator=(const LoadQueue&) = delete;
19  virtual ~LoadQueue() noexcept { }
20  virtual void load_cells(BlockLoader* loader) = 0;
21 };
22 
23 template<typename ItemT, int ErrorT>
24 class LoadQueueItem final : public BlockLoader::LoadQueue {
25  public:
26 
27  LoadQueueItem(ItemT&& a_ptr, size_t& a_counter) noexcept
28  : ptr(std::move(a_ptr)), counter(a_counter) {
29  }
30 
32  LoadQueueItem(const LoadQueueItem&) = delete;
35 
36  virtual ~LoadQueueItem() noexcept { }
37 
38  virtual void load_cells(BlockLoader* loader) override {
39  int err = Error::OK;
40  bool loaded = ptr->loaded(err);
41 
42  if(loaded)
43  ptr->load_cells(err, loader->block);
44 
45  if(err) {
46  if(!loader->error)
47  loader->error = ErrorT;
49  LOG_WARN,
50  ptr->print(SWC_LOG_OSTREAM << "BlockLoader error(" << err << ')');
51  );
52  if(!loaded)
53  ptr->processing_decrement();
54  }
55  ++counter;
56  }
57 
58  private:
59  ItemT ptr;
60  size_t& counter;
61 };
62 
63 
64 
67  : block(a_block),
68  count_cs_blocks(0), count_fragments(0),
69  q_req(),
70  error(Error::OK),
71  preload(block->blocks->range->cfg->log_fragment_preload()),
72  m_logs(0), m_cs_blks(1),
73  m_mutex(), m_cv(), m_queue(), m_f_selected(),
74  m_cs_selected(), m_cs_ready() {
75 }
76 
// Queues a scan request on `q_req`, wrapped in a ReqQueue together with the
// current Time::now_ns() value taken at the moment of the call.
78 void BlockLoader::add(const ReqScan::Ptr& req) {
79  q_req.push(ReqQueue(req, Time::now_ns()));
80 }
81 
 // Move-only callable that carries a BlockLoader pointer; invoking it calls
 // load_cells() on that loader. An instance built from `this` is passed to
 // Env::Rgr::block_loader_post right after this definition.
 // NOTE(review): the pointer is stored raw, so the BlockLoader presumably has
 // to stay alive until the posted task has run -- confirm with the owner.
84  struct Task {
85  BlockLoader* ptr;
87  Task(BlockLoader* a_ptr) noexcept : ptr(a_ptr) { }
 // Moving copies the pointer; the source object keeps its value.
88  Task(Task&& other) noexcept : ptr(other.ptr) { }
89  Task(const Task&) = delete;
90  Task& operator=(Task&&) = delete;
91  Task& operator=(const Task&) = delete;
92  ~Task() noexcept { }
 // Entry point executed by whatever runs the posted task.
93  void operator()() { ptr->load_cells(); }
94  };
95  Env::Rgr::block_loader_post(Task(this));
96 
98 
100  --m_cs_blks;
101  m_queue.push(nullptr);
102  m_cv.notify_one();
103 }
104 
105 //CellStores
109  ++m_cs_blks;
110  m_cs_selected.push(blk);
111 }
112 
115  {
117  if(m_cs_selected.front() == blk) {
118  m_queue.push(
119  new LoadQueueItem<
122  >(std::move(blk), count_cs_blocks)
123  );
124  m_cs_selected.pop();
125  --m_cs_blks;
126  m_cv.notify_one();
127  if(m_cs_ready.empty()) {
128  return;
129  }
130  } else {
131  m_cs_ready.push_back(blk);
132  }
133  }
134  bool init = true;
137  if(init) {
138  it = m_cs_ready.begin();
139  init = false;
140  }
141  if(it == m_cs_ready.end())
142  break;
143  if(m_cs_selected.front() == *it) {
144  m_queue.push(
145  new LoadQueueItem<
148  >(std::move(*it), count_cs_blocks)
149  );
150  m_cs_selected.pop();
151  --m_cs_blks;
152  m_cv.notify_one();
153  m_cs_ready.erase(it);
154  it = m_cs_ready.begin();
155  } else {
156  ++it;
157  }
158  }
159 }
160 
162  LoadQueue* ptr(
163  new LoadQueueItem<
166  >(std::move(frag), count_fragments)
167  );
169  m_queue.push(ptr);
170  --m_logs;
171  m_cv.notify_one();
172 }
173 
176  for(LoadQueue* ptr; ;) {
177  size_t sz;
178  {
179  Core::UniqueLock lock_wait(m_mutex);
180  if(m_queue.empty())
181  m_cv.wait(lock_wait, [this] { return !m_queue.empty(); });
182  ptr = m_queue.front();
183  m_queue.pop();
184  sz = m_cs_blks ? preload : (m_queue.size() + m_logs);
185  }
186  bool is_final = false;
187  if(sz < preload) {
188  uint8_t vol = preload - sz;
189  if((is_final = vol == preload) && ptr) {
190  ptr->load_cells(this);
191  delete ptr;
192  ptr = nullptr;
193  }
194  size_t offset = m_f_selected.size();
195  block->blocks->commitlog.load_cells(this, is_final, m_f_selected, vol);
196  if((sz = m_f_selected.size() - offset)) {
197  {
199  m_logs += sz;
200  }
201  for(auto it=m_f_selected.cbegin() + offset;
202  it != m_f_selected.cend(); ++it) {
203  (*it)->load(this);
204  }
205  } else if(is_final) {
206  SWC_LOG_OUT(LOG_DEBUG, print(SWC_LOG_OSTREAM << "Loaded "););
207  block->loader_loaded();
208  return;
209  }
210  }
211  if(ptr) {
212  ptr->load_cells(this);
213  delete ptr;
214  ptr = nullptr;
215  }
216  }
217 }
218 
219 
220 }}
SWC::Ranger::CommitLog::Fragments::load_cells
void load_cells(BlockLoader *loader, bool &is_final, Vec &frags, uint8_t vol)
Definition: CommitLog.cc:337
SWC::Ranger::BlockLoader::preload
const uint8_t preload
Definition: RangeBlockLoader.h:42
SWC::Ranger::CommitLog::Fragment::Ptr
std::shared_ptr< Fragment > Ptr
Definition: CommitLogFragment.h:31
SWC::Ranger::BlockLoader::LoadQueue::load_cells
virtual void load_cells(BlockLoader *loader)=0
RangeBlockLoader.h
SWC::Ranger::Block::blocks
Blocks * blocks
Definition: RangeBlock.h:41
SWC_LOG_OSTREAM
#define SWC_LOG_OSTREAM
Definition: Logger.h:44
SWC::Ranger::BlockLoader::count_fragments
size_t count_fragments
Definition: RangeBlockLoader.h:24
SWC::Ranger::LoadQueueItem::counter
size_t & counter
Definition: RangeBlockLoader.cc:60
SWC::Ranger::Block
Definition: RangeBlock.h:23
SWC::Ranger::BlockLoader::m_f_selected
CommitLog::Fragments::Vec m_f_selected
Definition: RangeBlockLoader.h:89
SWC::Core::UniqueLock
Definition: MutexLock.h:68
SWC_LOG_OUT
#define SWC_LOG_OUT(pr, _code_)
Definition: Logger.h:178
SWC::Time::now_ns
SWC_CAN_INLINE int64_t now_ns() noexcept
Definition: Time.h:43
SWC::Ranger::LoadQueueItem
Definition: RangeBlockLoader.cc:24
SWC::Core::Vector::iterator
value_type * iterator
Definition: Vector.h:50
SWC::Core::ScopedLock
Definition: MutexLock.h:41
SWC::Ranger::BlockLoader::BlockLoader
BlockLoader(Block::Ptr block)
Definition: RangeBlockLoader.cc:66
SWC::Ranger::BlockLoader::q_req
std::queue< ReqQueue > q_req
Definition: RangeBlockLoader.h:40
SWC::Ranger::BlockLoader::print
void print(std::ostream &out)
Definition: RangeBlockLoader.h:65
SWC::Ranger::LoadQueueItem::operator=
LoadQueueItem & operator=(const LoadQueueItem &)=delete
SWC::Ranger::BlockLoader::add
void add(const ReqScan::Ptr &req)
Definition: RangeBlockLoader.cc:78
SWC::Error::RANGE_COMMITLOG
@ RANGE_COMMITLOG
Definition: Error.h:118
SWC::Ranger::BlockLoader::loaded
void loaded(CellStore::Block::Read::Ptr &&blk)
Definition: RangeBlockLoader.cc:114
SWC::Error::RANGE_CELLSTORES
@ RANGE_CELLSTORES
Definition: Error.h:117
SWC::Ranger::Blocks::commitlog
CommitLog::Fragments commitlog
Definition: RangeBlocks.h:30
SWC::Ranger::LoadQueueItem::LoadQueueItem
LoadQueueItem(ItemT &&a_ptr, size_t &a_counter) noexcept
Definition: RangeBlockLoader.cc:27
SWC::Ranger::BlockLoader::m_cv
std::condition_variable m_cv
Definition: RangeBlockLoader.h:87
SWC::Ranger::BlockLoader::LoadQueue::LoadQueue
LoadQueue(const LoadQueue &)=delete
SWC::Env::Rgr::block_loader_post
static SWC_CAN_INLINE void block_loader_post(T_Handler &&handler)
Definition: RangerEnv.h:126
SWC::Ranger::BlockLoader::m_cs_blks
size_t m_cs_blks
Definition: RangeBlockLoader.h:84
SWC::Error::OK
@ OK
Definition: Error.h:45
SWC_CAN_INLINE
#define SWC_CAN_INLINE
Definition: Compat.h:102
SWC::Ranger::BlockLoader::error
int error
Definition: RangeBlockLoader.h:41
SWC::LOG_DEBUG
@ LOG_DEBUG
Definition: Logger.h:36
SWC::Ranger::BlockLoader::load_cells
void load_cells()
Definition: RangeBlockLoader.cc:175
SWC
The SWC-DB C++ namespace 'SWC'.
Definition: main.cc:12
SWC::Ranger::LoadQueueItem::~LoadQueueItem
virtual ~LoadQueueItem() noexcept
Definition: RangeBlockLoader.cc:36
SWC::Ranger::BlockLoader::operator=
BlockLoader & operator=(BlockLoader &&)=delete
SWC::Ranger::BlockLoader::LoadQueue::LoadQueue
LoadQueue() noexcept
Definition: RangeBlockLoader.cc:14
SWC::Ranger::LoadQueueItem::ptr
ItemT ptr
Definition: RangeBlockLoader.cc:59
SWC::Ranger::BlockLoader
Definition: RangeBlockLoader.h:19
SWC::Ranger::CellStore::Block::Read
Definition: CellStoreBlock.h:28
SWC::Ranger::BlockLoader::m_logs
uint8_t m_logs
Definition: RangeBlockLoader.h:83
SWC::Ranger::BlockLoader::block
Block::Ptr block
Definition: RangeBlockLoader.h:22
SWC::Ranger::BlockLoader::LoadQueue
Definition: RangeBlockLoader.cc:12
SWC::Ranger::BlockLoader::LoadQueue::LoadQueue
LoadQueue(LoadQueue &&)=delete
SWC::Ranger::BlockLoader::LoadQueue::operator=
LoadQueue & operator=(LoadQueue &&)=delete
SWC::Ranger::Blocks::cellstores
CellStore::Readers cellstores
Definition: RangeBlocks.h:31
SWC::Ranger::BlockLoader::ReqQueue
Definition: RangeBlockLoader.h:26
SWC::Ranger::Block::loader_loaded
void loader_loaded()
Definition: RangeBlock.cc:304
SWC::Ranger::LoadQueueItem::LoadQueueItem
LoadQueueItem(const LoadQueueItem &)=delete
SWC::Ranger::BlockLoader::m_mutex
std::mutex m_mutex
Definition: RangeBlockLoader.h:86
SWC::Ranger::BlockLoader::count_cs_blocks
size_t count_cs_blocks
Definition: RangeBlockLoader.h:23
SWC::Ranger::BlockLoader::m_queue
std::queue< LoadQueue * > m_queue
Definition: RangeBlockLoader.h:88
SWC::Core::Vector::cend
constexpr SWC_CAN_INLINE const_iterator cend() const noexcept
Definition: Vector.h:232
SWC::Ranger::BlockLoader::m_cs_ready
Core::Vector< CellStore::Block::Read::Ptr > m_cs_ready
Definition: RangeBlockLoader.h:91
SWC::Ranger::LoadQueueItem::load_cells
virtual void load_cells(BlockLoader *loader) override
Definition: RangeBlockLoader.cc:38
SWC::LOG_WARN
@ LOG_WARN
Definition: Logger.h:33
SWC::Ranger::CellStore::Readers::load_cells
void load_cells(BlockLoader *loader)
Definition: CellStoreReaders.cc:100
SWC::Ranger::BlockLoader::LoadQueue::~LoadQueue
virtual ~LoadQueue() noexcept
Definition: RangeBlockLoader.cc:19
SWC::Ranger::LoadQueueItem::LoadQueueItem
LoadQueueItem(LoadQueueItem &&)=delete
SWC::Ranger::BlockLoader::LoadQueue::operator=
LoadQueue & operator=(const LoadQueue &)=delete
SWC::Core::Vector::cbegin
constexpr SWC_CAN_INLINE const_iterator cbegin() const noexcept
Definition: Vector.h:216
SWC::Ranger::BlockLoader::run
void run()
Definition: RangeBlockLoader.cc:83
SWC::Core::Vector::size
constexpr SWC_CAN_INLINE size_type size() const noexcept
Definition: Vector.h:189
SWC::Ranger::BlockLoader::m_cs_selected
std::queue< CellStore::Block::Read::Ptr > m_cs_selected
Definition: RangeBlockLoader.h:90
SWC::Ranger::LoadQueueItem::operator=
LoadQueueItem & operator=(LoadQueueItem &&)=delete
SWC::Ranger::ReqScan::Ptr
std::shared_ptr< ReqScan > Ptr
Definition: ReqScan.h:30