SWC-DB  v0.5.12 C++ documentations
SWC-DB© (Super Wide Column Database) - High Performance Scalable Database (https://github.com/kashirin-alex/swc-db)
RangerEnv.h
Go to the documentation of this file.
1 /*
2  * SWC-DB© Copyright since 2019 Alex Kashirin <kashirin.alex@gmail.com>
3  * License details at <https://github.com/kashirin-alex/swc-db/#license>
4  */
5 
6 
7 #ifndef swcdb_ranger_RangerEnv_h
8 #define swcdb_ranger_RangerEnv_h
9 
10 
12 #include "swcdb/fs/Interface.h"
14 
20 
21 
22 namespace SWC {
23 
24 
25 namespace Ranger {
26 class Compaction;
27 class Columns;
28 }
29 
30 
31 namespace Env {
32 
33 class Rgr final {
34  public:
35 
36  static void init() {
37  SWC_ASSERT(!m_env);
38  m_env.reset(new Rgr());
39  }
40 
41  static void start();
42 
43  static void shuttingdown();
44 
45  static void wait_if_in_process();
46 
48  static DB::RgrData* rgr_data() noexcept {
49  return &m_env->m_rgr_data;
50  }
51 
53  static bool is_not_accepting() noexcept {
54  return m_env->m_not_accepting;
55  }
56 
58  static bool is_shuttingdown() noexcept {
59  return m_env->m_shuttingdown;
60  }
61 
63  static int64_t in_process() noexcept {
64  return m_env->m_in_process;
65  }
66 
68  static void in_process(int64_t count) noexcept {
69  m_env->m_in_process.fetch_add(count);
70  }
71 
73  static int64_t in_process_ranges() noexcept {
74  return m_env->m_in_process_ranges;
75  }
76 
78  static void in_process_ranges(int64_t count) noexcept {
79  m_env->m_in_process_ranges.fetch_add(count);
80  }
81 
83  static size_t scan_reserved_bytes() noexcept {
84  return m_env->m_scan_reserved_bytes;
85  }
86 
88  static void scan_reserved_bytes_add(uint32_t bytes) noexcept {
89  m_env->m_scan_reserved_bytes.fetch_add(bytes);
90  }
91 
93  static void scan_reserved_bytes_sub(uint32_t bytes) noexcept {
94  m_env->m_scan_reserved_bytes.fetch_sub(bytes);
95  }
96 
98  static Rgr* get() noexcept {
99  return m_env.get();
100  }
101 
103  static Comm::IoContextPtr maintenance_io() noexcept {
104  return m_env->mnt_io;
105  }
106 
108  static Comm::IoContextPtr io() noexcept {
109  return m_env->app_io;
110  }
111 
112  template <typename T_Handler>
114  static void post(T_Handler&& handler) {
115  m_env->app_io->post(std::move(handler));
116  }
117 
118  template <typename T_Handler>
120  static void maintenance_post(T_Handler&& handler) {
121  m_env->mnt_io->post(std::move(handler));
122  }
123 
124  template <typename T_Handler>
126  static void block_loader_post(T_Handler&& handler) {
127  m_env->loader_io->post(std::move(handler));
128  }
129 
131  static System::Resources& res() noexcept {
132  return m_env->_resources;
133  }
134 
135  static bool log_compact_possible() noexcept;
136 
137  static void log_compact_finished() noexcept;
138 
139  static bool compaction_available() noexcept;
140 
141  static void compaction_schedule(uint32_t ms);
142 
144  static Ranger::Columns* columns() noexcept {
145  return m_env->_columns;
146  }
147 
150  return m_env->_update_hdlr.get();
151  }
152 
155  return m_env->_reporting;
156  }
157 
158  static void reset() noexcept {
159  m_env = nullptr;
160  }
161 
163 
166 
170 
173 
177 
181 
187 
188  explicit Rgr();
189 
190  Rgr(const Rgr&) = delete;
191  Rgr(Rgr&&) = delete;
192  Rgr& operator=(const Rgr&) = delete;
193  Rgr& operator=(Rgr&&) = delete;
194 
195  ~Rgr() noexcept;
196 
197  private:
198  inline static std::shared_ptr<Rgr> m_env = nullptr;
199 
200  DB::RgrData m_rgr_data;
203  Core::Atomic<int64_t> m_in_process;
204  Core::Atomic<int64_t> m_in_process_ranges;
205  Core::Atomic<size_t> m_scan_reserved_bytes;
206 
207 };
208 
209 
210 } // namespace Env
211 
212 } // namespace SWC
213 
214 
215 #include "swcdb/ranger/db/Columns.h"
217 
218 
219 
220 namespace SWC { namespace Env {
221 
224  SWC::Env::Config::settings()
225  ->get<SWC::Config::Property::Value_uint8_g>(
226  "swc.rgr.Range.req.update.concurrency")),
227  cfg_cs_max(
228  SWC::Env::Config::settings()
229  ->get<SWC::Config::Property::Value_uint8_g>(
230  "swc.rgr.Range.CellStore.count.max")),
231  cfg_cs_sz(
232  SWC::Env::Config::settings()
233  ->get<SWC::Config::Property::Value_int32_g>(
234  "swc.rgr.Range.CellStore.size.max")),
236  SWC::Env::Config::settings()
237  ->get<SWC::Config::Property::Value_uint8_g>(
238  "swc.rgr.Range.CommitLog.rollout.ratio")),
240  SWC::Env::Config::settings()
241  ->get<SWC::Config::Property::Value_uint8_g>(
242  "swc.rgr.Range.CommitLog.Compact.cointervaling")),
244  SWC::Env::Config::settings()
245  ->get<SWC::Config::Property::Value_uint8_g>(
246  "swc.rgr.Range.CommitLog.Fragment.preload")),
248  SWC::Env::Config::settings()
249  ->get<SWC::Config::Property::Value_uint8_g>(
250  "swc.rgr.Range.compaction.percent")),
252  SWC::Env::Config::settings()
253  ->get<SWC::Config::Property::Value_uint8_g>(
254  "swc.rgr.Range.CellStore.replication")),
255  cfg_blk_size(
256  SWC::Env::Config::settings()
257  ->get<SWC::Config::Property::Value_int32_g>(
258  "swc.rgr.Range.block.size")),
260  SWC::Env::Config::settings()
261  ->get<SWC::Config::Property::Value_int32_g>(
262  "swc.rgr.Range.block.cells")),
263  cfg_blk_enc(
264  SWC::Env::Config::settings()
265  ->get<SWC::Config::Property::Value_enum_g>(
266  "swc.rgr.Range.block.encoding")),
267  app_io(
268  Comm::IoContext::make(
269  "Ranger",
270  SWC::Env::Config::settings()->get_bool(
271  "swc.rgr.concurrency.relative"),
272  SWC::Env::Config::settings()->get_i32(
273  "swc.rgr.handlers")
274  )
275  ),
276  loader_io(
277  Comm::IoContext::make(
278  "Loader",
279  SWC::Env::Config::settings()->get_bool(
280  "swc.rgr.concurrency.relative"),
281  SWC::Env::Config::settings()->get_i32(
282  "swc.rgr.loader.handlers")
283  )
284  ),
285  mnt_io(
286  Comm::IoContext::make(
287  "Maintenance",
288  SWC::Env::Config::settings()->get_bool(
289  "swc.rgr.concurrency.relative"),
290  SWC::Env::Config::settings()->get_i32(
291  "swc.rgr.maintenance.handlers")
292  )
293  ),
294  _compaction(nullptr),
295  _columns(new Ranger::Columns()),
296  _update_hdlr(
297  client::Query::Update::Handlers::Common::make(
298  Env::Clients::get(), nullptr, app_io)),
299  _reporting(
300  SWC::Env::Config::settings()->get_bool("swc.rgr.metrics.enabled")
301  ? new Ranger::Metric::Reporting(app_io)
302  : nullptr
303  ),
304  _resources(
305  app_io,
306  SWC::Env::Config::settings()
307  ->get<SWC::Config::Property::Value_int32_g>(
308  "swc.rgr.ram.allowed.percent"),
309  SWC::Env::Config::settings()
310  ->get<SWC::Config::Property::Value_int32_g>(
311  "swc.rgr.ram.reserved.percent"),
312  SWC::Env::Config::settings()
313  ->get<SWC::Config::Property::Value_int32_g>(
314  "swc.rgr.ram.release.rate"),
315  _reporting ? &_reporting->system : nullptr,
316  [this](size_t bytes) { return _columns->release(bytes); }
317  ),
318  m_rgr_data(),
319  m_shuttingdown(false), m_not_accepting(false),
320  m_in_process(0), m_in_process_ranges(0),
321  m_scan_reserved_bytes(0) {
322 }
323 
324 Rgr::~Rgr() noexcept {
325  delete _compaction;
326  delete _columns;
327 }
328 
329 void Rgr::start() {
330  m_env->_compaction = new Ranger::Compaction();
331  m_env->_compaction->schedule(30000);
332 }
333 
335  m_env->m_not_accepting.store(true);
336 
337  if(m_env->_reporting)
338  m_env->_reporting->stop();
339 
340  if(m_env->_compaction)
341  m_env->_compaction->stop();
342  m_env->mnt_io->stop();
343  m_env->loader_io->stop();
344 
345  m_env->_update_hdlr->commit_if_need();
346  m_env->_update_hdlr->wait();
347 
348  m_env->_columns->unload_all(false);
349 
351 
352  m_env->m_shuttingdown.store(true);
353 
354  m_env->_resources.stop();
355 }
356 
358  size_t n = 0;
359  while(in_process() || in_process_ranges()) {
360  std::this_thread::sleep_for(std::chrono::milliseconds(200));
361  m_env->_columns->unload_all(true); //re-check
362  if(!(++n % 10))
364  "In-process=" SWC_FMT_LD " ranges=" SWC_FMT_LD " check=" SWC_FMT_LU,
365  in_process(), in_process_ranges(), n);
366  }
367 }
368 
370 bool Rgr::log_compact_possible() noexcept {
371  return m_env->_compaction->log_compact_possible();
372 }
373 
375 void Rgr::log_compact_finished() noexcept {
376  return m_env->_compaction->log_compact_finished();
377 }
378 
380 bool Rgr::compaction_available() noexcept {
381  return m_env->_compaction->available();
382 }
383 
385 void Rgr::compaction_schedule(uint32_t ms) {
386  if(m_env && m_env->_compaction)
387  m_env->_compaction->schedule(ms);
388 }
389 
390 
391 }} // namespace SWC::Env
392 
393 
395 
396 
397 #endif // swcdb_ranger_RangerEnv_h
SWC::Ranger::Columns
Definition: Columns.h:34
SWC::Env::Rgr::m_env
static std::shared_ptr< Rgr > m_env
Definition: RangerEnv.h:198
SWC::System::Resources
Definition: Resources.h:50
SWC::Env::Rgr::res
static SWC_CAN_INLINE System::Resources & res() noexcept
Definition: RangerEnv.h:131
SWC::Env::Rgr::compaction_schedule
static void compaction_schedule(uint32_t ms)
Definition: RangerEnv.h:385
SWC::Env::Rgr::cfg_compact_percent
const SWC::Config::Property::Value_uint8_g::Ptr cfg_compact_percent
Definition: RangerEnv.h:171
SWC_LOGF
#define SWC_LOGF(priority, fmt,...)
Definition: Logger.h:188
SWC::Env::Rgr::maintenance_post
static SWC_CAN_INLINE void maintenance_post(T_Handler &&handler)
Definition: RangerEnv.h:120
SWC::Env::Rgr::m_in_process_ranges
Core::Atomic< int64_t > m_in_process_ranges
Definition: RangerEnv.h:204
SWC::Env::Rgr::scan_reserved_bytes_add
static SWC_CAN_INLINE void scan_reserved_bytes_add(uint32_t bytes) noexcept
Definition: RangerEnv.h:88
SWC::Env::Rgr::rgr_data
static SWC_CAN_INLINE DB::RgrData * rgr_data() noexcept
Definition: RangerEnv.h:48
SWC::Env::Rgr::in_process_ranges
static SWC_CAN_INLINE int64_t in_process_ranges() noexcept
Definition: RangerEnv.h:73
SWC::Env::Rgr::columns
static SWC_CAN_INLINE Ranger::Columns * columns() noexcept
Definition: RangerEnv.h:144
SWC::Env::Rgr::in_process_ranges
static SWC_CAN_INLINE void in_process_ranges(int64_t count) noexcept
Definition: RangerEnv.h:78
SWC::Env::Rgr::cfg_log_compact_cointervaling
const SWC::Config::Property::Value_uint8_g::Ptr cfg_log_compact_cointervaling
Definition: RangerEnv.h:168
SWC::Comm::IoContextPtr
std::shared_ptr< IoContext > IoContextPtr
Definition: IoContext.h:16
SWC::Env::Rgr::cfg_req_add_concurrency
const SWC::Config::Property::Value_uint8_g::Ptr cfg_req_add_concurrency
Definition: RangerEnv.h:162
SWC::Env::Rgr::cfg_log_rollout_ratio
const SWC::Config::Property::Value_uint8_g::Ptr cfg_log_rollout_ratio
Definition: RangerEnv.h:167
SWC::Env::Rgr::init
static void init()
Definition: RangerEnv.h:36
SWC::Env::Config
Definition: Settings.h:103
SWC::Env::Rgr::m_in_process
Core::Atomic< int64_t > m_in_process
Definition: RangerEnv.h:203
SWC::Env::Rgr::_resources
System::Resources _resources
Definition: RangerEnv.h:186
SWC::Env::Rgr::m_scan_reserved_bytes
Core::Atomic< size_t > m_scan_reserved_bytes
Definition: RangerEnv.h:205
SWC::Env::Rgr::cfg_cs_sz
const SWC::Config::Property::Value_int32_g::Ptr cfg_cs_sz
Definition: RangerEnv.h:165
SWC::Env::Rgr::_compaction
Ranger::Compaction * _compaction
Definition: RangerEnv.h:182
SWC::client::Query::Update::Handlers::Common
Definition: Common.h:18
SWC::Env::Rgr::start
static void start()
Definition: RangerEnv.h:329
SWC::Env::Rgr::block_loader_post
static SWC_CAN_INLINE void block_loader_post(T_Handler &&handler)
Definition: RangerEnv.h:126
MetricsReporting.cc
SWC::Env::Rgr::m_shuttingdown
Core::AtomicBool m_shuttingdown
Definition: RangerEnv.h:201
SWC::Env::Rgr
Definition: RangerEnv.h:33
Scanner.h
SWC_CAN_INLINE
#define SWC_CAN_INLINE
Definition: Compat.h:102
Compaction.h
SWC::Env::Rgr::Rgr
Rgr(Rgr &&)=delete
SWC::Env::Rgr::log_compact_possible
static bool log_compact_possible() noexcept
Definition: RangerEnv.h:370
SWC
The SWC-DB C++ namespace 'SWC'.
Definition: main.cc:12
SWC::Env::Rgr::updater
static SWC_CAN_INLINE client::Query::Update::Handlers::Common * updater() noexcept
Definition: RangerEnv.h:149
SWC::Env::Rgr::mnt_io
Comm::IoContextPtr mnt_io
Definition: RangerEnv.h:180
SWC::Env::Rgr::cfg_blk_enc
const SWC::Config::Property::Value_enum_g::Ptr cfg_blk_enc
Definition: RangerEnv.h:176
SWC::Env::Rgr::maintenance_io
static SWC_CAN_INLINE Comm::IoContextPtr maintenance_io() noexcept
Definition: RangerEnv.h:103
SWC::Env::Rgr::in_process
static SWC_CAN_INLINE int64_t in_process() noexcept
Definition: RangerEnv.h:63
SWC::Env::Rgr::log_compact_finished
static void log_compact_finished() noexcept
Definition: RangerEnv.h:375
SWC::Env::Rgr::cfg_cs_max
const SWC::Config::Property::Value_uint8_g::Ptr cfg_cs_max
Definition: RangerEnv.h:164
SWC::Env::Rgr::_columns
Ranger::Columns * _columns
Definition: RangerEnv.h:183
SWC::Env::Rgr::~Rgr
~Rgr() noexcept
Definition: RangerEnv.h:324
SWC::Env::Rgr::Rgr
Rgr()
Definition: RangerEnv.h:222
SWC::Env::Rgr::_reporting
Ranger::Metric::Reporting::Ptr _reporting
Definition: RangerEnv.h:185
Commands.h
SWC::DB::RgrData
Definition: RgrData.h:24
SWC_FMT_LU
#define SWC_FMT_LU
Definition: Compat.h:98
SWC::Config::Property::Value_uint8_g
Definition: Property.h:511
SWC::Ranger::Metric::Reporting::Ptr
std::shared_ptr< Reporting > Ptr
Definition: MetricsReporting.h:26
SWC::Env::Rgr::app_io
Comm::IoContextPtr app_io
Definition: RangerEnv.h:178
RgrData.h
SWC::Env::Rgr::compaction_available
static bool compaction_available() noexcept
Definition: RangerEnv.h:380
SWC::Config::Property::Value_enum_g
Definition: Property.h:662
SWC::client::Query::Update::Handlers::Common::Ptr
std::shared_ptr< Common > Ptr
Definition: Common.h:20
SWC::Env::Rgr::cfg_log_fragment_preload
const SWC::Config::Property::Value_uint8_g::Ptr cfg_log_fragment_preload
Definition: RangerEnv.h:169
SWC::Env::Rgr::operator=
Rgr & operator=(const Rgr &)=delete
SWC::Env::Rgr::cfg_blk_size
const SWC::Config::Property::Value_int32_g::Ptr cfg_blk_size
Definition: RangerEnv.h:174
SWC::Env::Rgr::cfg_blk_cells
const SWC::Config::Property::Value_int32_g::Ptr cfg_blk_cells
Definition: RangerEnv.h:175
MetricsReporting.h
SWC::Env::Rgr::metrics_track
static SWC_CAN_INLINE Ranger::Metric::Reporting::Ptr & metrics_track() noexcept
Definition: RangerEnv.h:154
Committer.h
SWC::Env::Rgr::in_process
static SWC_CAN_INLINE void in_process(int64_t count) noexcept
Definition: RangerEnv.h:68
SWC::Env::Clients
Definition: Clients.h:293
SWC::Env::Rgr::reset
static void reset() noexcept
Definition: RangerEnv.h:158
SWC::Env::Rgr::loader_io
Comm::IoContextPtr loader_io
Definition: RangerEnv.h:179
SWC::Env::Rgr::is_shuttingdown
static SWC_CAN_INLINE bool is_shuttingdown() noexcept
Definition: RangerEnv.h:58
SWC::Env::Rgr::operator=
Rgr & operator=(Rgr &&)=delete
SWC::Env::Rgr::is_not_accepting
static SWC_CAN_INLINE bool is_not_accepting() noexcept
Definition: RangerEnv.h:53
SWC::LOG_WARN
@ LOG_WARN
Definition: Logger.h:33
SWC::Env::Rgr::scan_reserved_bytes
static SWC_CAN_INLINE size_t scan_reserved_bytes() noexcept
Definition: RangerEnv.h:83
SWC::Env::Rgr::m_rgr_data
DB::RgrData m_rgr_data
Definition: RangerEnv.h:200
SWC::Env::Rgr::scan_reserved_bytes_sub
static SWC_CAN_INLINE void scan_reserved_bytes_sub(uint32_t bytes) noexcept
Definition: RangerEnv.h:93
SWC::Ranger::Compaction
Definition: Compaction.h:21
SWC::Env::Rgr::wait_if_in_process
static void wait_if_in_process()
Definition: RangerEnv.h:357
SWC::Env::Rgr::post
static SWC_CAN_INLINE void post(T_Handler &&handler)
Definition: RangerEnv.h:114
SWC_ASSERT
#define SWC_ASSERT(_e_)
Definition: Exception.h:165
SWC_FMT_LD
#define SWC_FMT_LD
Definition: Compat.h:99
SWC::Env::Rgr::_update_hdlr
client::Query::Update::Handlers::Common::Ptr _update_hdlr
Definition: RangerEnv.h:184
SWC::Env::Rgr::shuttingdown
static void shuttingdown()
Definition: RangerEnv.h:334
Interface.h
Resources.h
SWC::Env::Rgr::m_not_accepting
Core::AtomicBool m_not_accepting
Definition: RangerEnv.h:202
Common.h
SWC::Ranger::Columns::release
size_t release(size_t bytes=0)
Definition: Columns.cc:174
SWC::Env::Rgr::io
static SWC_CAN_INLINE Comm::IoContextPtr io() noexcept
Definition: RangerEnv.h:108
SWC::Env::Rgr::cfg_cs_replication
const SWC::Config::Property::Value_uint8_g::Ptr cfg_cs_replication
Definition: RangerEnv.h:172
SWC::Config::Property::Value_int32_g
Definition: Property.h:586
SWC::Env::Rgr::get
static SWC_CAN_INLINE Rgr * get() noexcept
Definition: RangerEnv.h:98
SWC::Core::AtomicBool
AtomicBase< bool > AtomicBool
Definition: Atomic.h:63
SWC::Env::Rgr::Rgr
Rgr(const Rgr &)=delete