SWC-DB  v0.5.12 C++ documentation
SWC-DB© (Super Wide Column Database) - High Performance Scalable Database (https://github.com/kashirin-alex/swc-db)
AppContext.h
Go to the documentation of this file.
1 /*
2  * SWC-DB© Copyright since 2019 Alex Kashirin <kashirin.alex@gmail.com>
3  * License details at <https://github.com/kashirin-alex/swc-db/#license>
4  */
5 
6 #ifndef swcdb_ranger_AppContext_h
7 #define swcdb_ranger_AppContext_h
8 
9 
14 
16 #include "swcdb/ranger/RangerEnv.h"
17 
20 
33 
34 
35 namespace SWC { namespace Ranger {
36 
37 
38 
39 class AppContext final : public Comm::AppContext {
40  public:
41  typedef std::shared_ptr<AppContext> Ptr;
42 
// Builds the ranger application context and returns it as a shared pointer.
//
// Visible steps: loads the settings file named by "swc.rgr.cfg" (passing
// "swc.rgr.cfg.dyn" as the second parse_file argument), optionally arms a
// periodic timer that re-checks the dynamic configuration files, then
// creates the AppContext and attaches its RgrMngId request object.
//
// NOTE(review): this listing skips original lines 52-53, 55, 60, 62, 64, 69
// and 74, so the argument lists at 54-67 and 70-72 appear without the calls
// that own them. The cross-reference index names Env::Clients::init,
// client::Clients::make, Comm::IoContext::make, Env::FsInterface::init and
// Env::Rgr::init -- presumably the missing calls; confirm against the real
// header before editing this function.
43  SWC_SHOULD_NOT_INLINE
44  static Ptr make() {
45  auto settings = Env::Config::settings();
46 
47  settings->parse_file(
48  settings->get_str("swc.rgr.cfg", ""),
49  "swc.rgr.cfg.dyn"
50  );
51 
54  *settings,
56  "Clients",
57  settings->get_bool("swc.rgr.concurrency.relative"),
58  settings->get_i32("swc.rgr.clients.handlers")
59  ),
61  new client::ContextManager(*settings)),
63  new client::ContextRanger(*settings)),
65  new client::ContextBroker(*settings))
66  )->init()
67  );
68 
70  settings,
71  FS::fs_type(settings->get_str("swc.fs"))
72  );
73 
75 
// The timer is only installed when the "swc.cfg.dyn.period" value is non-zero
// (period->get() is tested as a boolean).
76  auto period = settings->get<Config::Property::Value_int32_g>(
77  "swc.cfg.dyn.period");
78  if(period->get()) {
79  Env::Rgr::io()->set_periodic_timer(
80  period,
81  []() noexcept { Env::Config::settings()->check_dynamic_files(); }
82  );
83  }
84 
85  Ptr app(new AppContext());
// The callback handed to RgrMngId runs app->stop() on a freshly spawned,
// detached thread. `d` is also captured by the thread's own closure, so the
// std::thread object stays alive at least until that closure is destroyed.
// NOTE(review): the callback captures `app` by value; if RgrMngId stores the
// callback, app -> id_mngr -> callback -> app forms a shared_ptr cycle. The
// only visible place that breaks it is `id_mngr = nullptr` in stop(), which
// is compiled under SWC_ENABLE_SANITIZER only -- confirm this is intended.
86  app->id_mngr.reset(new Comm::Protocol::Mngr::Req::RgrMngId(
87  Env::Rgr::io(),
88  [app]() {
89  std::shared_ptr<std::thread> d(new std::thread);
90  *d.get() = std::thread([d, app]{ app->stop(); });
91  d->detach();
92  }
93  ));
94  return app;
95  }
96 
// Constructor initializer list and (empty) body.
// NOTE(review): the declarator on original line 97 is missing from this
// listing; the cross-reference index gives it as `AppContext()`.
// The base class receives the "swc.rgr.comm.encoder" setting; id_mngr, m_srv
// and m_guard start out null; m_metrics takes whatever
// Env::Rgr::metrics_track() returns (it is null-checked by most users below).
98  : Comm::AppContext(
99  Env::Config::settings()->get<Config::Property::Value_enum_g>(
100  "swc.rgr.comm.encoder")),
101  id_mngr(nullptr),
102  m_srv(nullptr),
103  m_metrics(Env::Rgr::metrics_track()),
104  m_guard(nullptr) {
105  }
106 
107  void init(const std::string& host,
108  const Comm::EndPoints& endpoints) override {
109  Env::Rgr::rgr_data()->endpoints = endpoints;
110 
111  int sig = 0;
112  Env::Rgr::io()->set_signals();
113  shutting_down(std::error_code(), sig);
114 
115  Env::Rgr::start();
116  id_mngr->request();
117 
118  if(m_metrics) {
119  m_metrics->configure_rgr(host.c_str(), endpoints);
120  m_metrics->start();
121  }
122  }
123 
// Stores the server handle in m_srv; it is later used by the connection
// callbacks, shutting_down() and stop().
// NOTE(review): the signature on original line 124 is missing from this
// listing; the cross-reference index gives it as
// `void set_srv(Comm::server::SerializedServer::Ptr srv)`.
125  m_srv = srv;
126  }
127 
// Destructor with an empty body: no explicit teardown happens here; the
// shutdown steps visible in this class live in stop().
128  virtual ~AppContext() noexcept { }
129 
// Registers a newly established connection with the server and, when metrics
// are tracked, reports it as connected.
// NOTE(review): the signature on original line 130 is missing from this
// listing; the cross-reference index gives it as
// `void handle_established(Comm::ConnHandlerPtr conn) override`.
// m_srv is dereferenced without a null check, so set_srv() has to have been
// called before any connection is established.
131  m_srv->connection_add(conn);
132  if(m_metrics)
133  m_metrics->net->connected(conn);
134  }
135 
136  void handle_disconnect(Comm::ConnHandlerPtr conn) noexcept override {
137  m_srv->connection_del(conn);
138  if(m_metrics)
139  m_metrics->net->disconnected(conn);
140  }
141 
// Dispatches one incoming event received on `conn`.
//
// Events flagged with `ev->error` are counted as a network error and dropped.
// Otherwise the command is selected by a switch: when rgr_data()->rgrid is
// non-zero the command comes from ev->header.command, else the ternary's
// other branch (original line 157, missing here) is used. Cases that `break`
// reach the final metrics call which records the command; the default branch
// records a network error and returns before that.
//
// NOTE(review): this listing has lost original lines 146, 157, 159, the
// `case` labels and handler calls between 163 and 212, and 232. The
// cross-reference index lists the Comm::Protocol::Rgr commands and handlers
// (and Common::Handler::not_implemented) that presumably belong there;
// restore them from the real header before changing this function.
142  void handle(Comm::ConnHandlerPtr conn, const Comm::Event::Ptr& ev) override {
143  // SWC_LOG_OUT(LOG_DEBUG, ev->print(SWC_LOG_OSTREAM << "handle: "); );
144 
// NOTE(review): line 146 is missing; as listed, the sanitizer build would
// close every connection unconditionally. The index references
// Env::Rgr::is_shuttingdown(), presumably the lost condition -- confirm.
145  #if defined(SWC_ENABLE_SANITIZER)
147  return conn->do_close();
148  #endif
149 
// NOTE(review): m_metrics is dereferenced here without a null check, unlike
// every other use in this class (e.g. the default branch below). If
// Env::Rgr::metrics_track() can return null this is a null dereference.
150  if(ev->error) {
151  m_metrics->net->error(conn);
152  return;
153  }
154 
155  switch(Env::Rgr::rgr_data()->rgrid
156  ? Comm::Protocol::Rgr::Command(ev->header.command)
158 
// Replies with RGR_NOT_READY; the owning case label (line 159) is missing.
160  conn->send_error(Error::RGR_NOT_READY, "", ev);
161  break;
162 
165  break;
166 
169  break;
170 
173  break;
174 
177  break;
178 
181  break;
182 
185  break;
186 
190  break;
191 
194  break;
195 
199  break;
200 
203  break;
204 
208  break;
209 
213  break;
214 
215  //&Comm::Protocol::Rgr::Handler::debug,
216  //&Comm::Protocol::Rgr::Handler::status,
217  /*
218  case Comm::Protocol::Rgr::Handler::shutdown: {
219  struct Task {
220  AppContext* ptr;
221  SWC_CAN_INLINE
222  Task(AppContext* a_ptr) noexcept : ptr(a_ptr) { }
223  void operator()() { ptr->shutting_down(std::error_code(), SIGINT); }
224  };
225  Env::Rgr::post(Task(this));
226  conn->response_ok(ev);
227  break;
228  }
229  */
230 
// Unknown commands: counted as a network error and NOT counted as a command
// (the early return skips the metrics call after the switch).
231  default: {
233  if(m_metrics)
234  m_metrics->net->error(conn);
235  return;
236  }
237  }
238 
239  if(m_metrics)
240  m_metrics->net->command(conn, ev->header.command);
241  }
242 
243  void net_bytes_sent(const Comm::ConnHandlerPtr& conn, size_t b)
244  noexcept override {
245  if(m_metrics)
246  m_metrics->net->sent(conn, b);
247  }
248 
249  void net_bytes_received(const Comm::ConnHandlerPtr& conn, size_t b)
250  noexcept override {
251  if(m_metrics)
252  m_metrics->net->received(conn, b);
253  }
254 
255  void net_accepted(const Comm::EndPoint& endpoint, bool secure)
256  noexcept override {
257  if(m_metrics)
258  m_metrics->net->accepted(endpoint, secure);
259  }
260 
// Two-mode shutdown entry point, selected by `sig`.
//
//  * sig == 0: registers a handler on the io context's signal set and
//    returns. When a signal later arrives (and the wait was not aborted) the
//    handler logs it and calls shutting_down() again with that signal.
//  * sig != 0: cancels the signal wait, logs, exits the process right away
//    if no server was set, otherwise stops accepting on the server (keeping
//    the returned guard in m_guard) and issues the id_mngr request.
//
// NOTE(review): original lines 265 and 291 are missing from this listing.
// The index references SWC_CAN_INLINE and Env::Rgr::shuttingdown(), which
// presumably sat there -- confirm against the real header.
261  void shutting_down(const std::error_code &ec, const int &sig) {
262  if(!sig) { // set signals listener
// Task keeps a raw pointer to this AppContext, so the context has to outlive
// the pending signal wait.
263  struct Task {
264  AppContext* ptr;
266  Task(AppContext* a_ptr) noexcept : ptr(a_ptr) { }
267  void operator()(const std::error_code& ec, const int &sig) {
// An aborted wait is ignored; the cancel() call below is one visible source.
268  if(ec == asio::error::operation_aborted)
269  return;
270  SWC_LOGF(LOG_INFO, "Received signal, sig=%d ec=%s",
271  sig, ec.message().c_str());
272  ptr->shutting_down(ec, sig);
273  }
274  };
275  Env::Rgr::io()->signals->async_wait(Task(this));
276 
277  SWC_LOGF(LOG_INFO, "Listening for Shutdown signal, set at sig=%d ec=%s",
278  sig, ec.message().c_str());
279  return;
280  }
281  Env::Rgr::io()->signals->cancel();
282  SWC_LOGF(LOG_INFO, "Shutdown signal, sig=%d ec=%s",
283  sig, ec.message().c_str());
284 
// NOTE(review): SWC_QUICK_EXIT presumably does not return; if it did, the
// m_srv dereference below would hit a null pointer.
285  if(!m_srv) {
286  SWC_LOG(LOG_INFO, "Exit");
287  SWC_QUICK_EXIT(EXIT_SUCCESS);
288  }
289 
290 
292 
293  m_guard = m_srv->stop_accepting(); // no further requests accepted
294 
295  id_mngr->request();
296  }
297 
// Stops the ranger's services in a fixed order: clients, filesystem
// interface, io context, then the server; finally releases m_guard.
// In make() this is invoked from a detached thread by the id_mngr callback.
//
// Under SWC_ENABLE_SANITIZER the metrics reporter is waited on first and,
// after a 2 second sleep, the members and the ranger environment are reset.
//
// NOTE(review): original lines 299, 320 and 321 are missing from this
// listing. The index references Env::Rgr::wait_if_in_process(),
// Env::Clients::reset() and Env::FsInterface::reset(), which presumably sat
// there -- confirm against the real header.
// NOTE(review): m_srv is dereferenced without a null check here.
298  void stop() override {
300 
301  #if defined(SWC_ENABLE_SANITIZER)
302  if(m_metrics)
303  m_metrics->wait();
304  #endif
305 
306  Env::Clients::get()->stop();
307 
308  Env::FsInterface::interface()->stop();
309 
310  Env::Rgr::io()->stop();
311 
312  m_srv->shutdown();
313 
314  #if defined(SWC_ENABLE_SANITIZER)
315  std::this_thread::sleep_for(std::chrono::seconds(2));
316  m_metrics = nullptr;
317  id_mngr = nullptr;
318  m_srv = nullptr;
319  Env::Rgr::reset();
322  #endif
323 
// Dropping the guard obtained from stop_accepting() is the last step.
324  m_guard = nullptr;
325  }
326 
327  private:
328 
// Null until shutting_down() stores the result of m_srv->stop_accepting()
// in it; reset to null at the end of stop().
// NOTE(review): the declarations of id_mngr, m_srv and m_metrics (original
// lines 329-331) are missing from this listing.
332  std::shared_ptr<Comm::IoContext::ExecutorWorkGuard> m_guard;
333 
334 };
335 
336 
337 }}
338 
339 
340 
341 #endif // swcdb_ranger_AppContext_h
SWC::Env::FsInterface::init
static void init(const SWC::Config::Settings::Ptr &settings, FS::Type typ)
Definition: Interface.cc:513
SWC::Comm::Protocol::Rgr::Handler::column_compact
void column_compact(const ConnHandlerPtr &conn, const Event::Ptr &ev)
Definition: ColumnCompact.h:17
SWC::Comm::Protocol::Rgr::Handler::Report
Definition: Report.h:17
SWC::Comm::Protocol::Rgr::REPORT
@ REPORT
Definition: Commands.h:36
SWC::Comm::Protocol::Rgr::Handler::RangeQuerySelect
Definition: RangeQuerySelect.h:25
SWC::FS::fs_type
Type fs_type(const std::string &fs_name)
Definition: FileSystem.cc:19
Clients.h
SWC::Comm::Protocol::Mngr::Req::RgrMngId
Definition: RgrMngId.h:16
SWC::Env::Clients::get
static SWC_CAN_INLINE client::Clients::Ptr & get() noexcept
Definition: Clients.h:299
SWC_LOGF
#define SWC_LOGF(priority, fmt,...)
Definition: Logger.h:188
SWC::Ranger::AppContext::m_srv
Comm::server::SerializedServer::Ptr m_srv
Definition: AppContext.h:330
SWC::Error::RGR_NOT_READY
@ RGR_NOT_READY
Definition: Error.h:90
SWC::client::ContextManager::Ptr
std::shared_ptr< ContextManager > Ptr
Definition: ContextManager.h:18
SWC::Env::Rgr::rgr_data
static SWC_CAN_INLINE DB::RgrData * rgr_data() noexcept
Definition: RangerEnv.h:48
SWC::Env::Config::settings
static SWC::Config::Settings::Ptr & settings()
Definition: Settings.h:128
SWC::Ranger::AppContext::handle_established
void handle_established(Comm::ConnHandlerPtr conn) override
Definition: AppContext.h:130
SWC::Comm::EndPoint
asio::ip::tcp::endpoint EndPoint
Definition: Resolver.h:19
SWC::Ranger::AppContext
Definition: AppContext.h:39
SWC::Ranger::AppContext::shutting_down
void shutting_down(const std::error_code &ec, const int &sig)
Definition: AppContext.h:261
SWC::Env::FsInterface::interface
static SWC_CAN_INLINE FS::Interface::Ptr & interface() noexcept
Definition: Interface.h:150
SWC::LOG_INFO
@ LOG_INFO
Definition: Logger.h:35
RangeIsLoaded.h
ColumnCompact.h
ColumnsUnload.h
RangeUnload.h
RangeQuerySelect.h
SWC::Comm::Protocol::Rgr::Handler::assign_id
void assign_id(const ConnHandlerPtr &conn, const Event::Ptr &ev, Mngr::Req::RgrMngId::Ptr id_mngr)
Definition: AssignId.h:15
SWC::Comm::Protocol::Rgr::RANGE_IS_LOADED
@ RANGE_IS_LOADED
Definition: Commands.h:30
SWC::client::ContextBroker::Ptr
std::shared_ptr< ContextBroker > Ptr
Definition: ContextBroker.h:18
SWC::Env::Rgr::init
static void init()
Definition: RangerEnv.h:36
SWC::Comm::Protocol::Rgr::Handler::RangeLocate
Definition: RangeLocate.h:18
SWC::Comm::Protocol::Rgr::Handler::range_unload
void range_unload(const ConnHandlerPtr &conn, const Event::Ptr &ev)
Definition: RangeUnload.h:17
SWC::Env::Clients::init
static void init(const client::Clients::Ptr &clients)
Definition: Clients.cc:182
SWC::Env::Rgr::start
static void start()
Definition: RangerEnv.h:329
SWC::Comm::Protocol::Rgr::Command
Command
Definition: Commands.h:25
SWC::Ranger::AppContext::id_mngr
Comm::Protocol::Mngr::Req::RgrMngId::Ptr id_mngr
Definition: AppContext.h:329
SWC::Comm::AppContext::Ptr
std::shared_ptr< AppContext > Ptr
Definition: AppContext.h:23
AppHandler.h
SWC::Comm::Protocol::Rgr::COLUMN_COMPACT
@ COLUMN_COMPACT
Definition: Commands.h:28
SWC::Comm::Protocol::Rgr::Handler::range_query_update
void range_query_update(const ConnHandlerPtr &conn, const Event::Ptr &ev)
Definition: RangeQueryUpdate.h:18
SWC::Comm::Protocol::Rgr::COLUMNS_UNLOAD
@ COLUMNS_UNLOAD
Definition: Commands.h:37
SWC::Comm::Protocol::Rgr::RANGE_LOCATE
@ RANGE_LOCATE
Definition: Commands.h:33
AppContext.h
SWC_CAN_INLINE
#define SWC_CAN_INLINE
Definition: Compat.h:102
RangeLocate.h
SWC::Ranger::AppContext::make
static SWC_SHOULD_NOT_INLINE Ptr make()
Definition: AppContext.h:44
Report.h
SWC_LOG
#define SWC_LOG(priority, message)
Definition: Logger.h:191
RangerEnv.h
SWC::Comm::Protocol::Rgr::Handler::range_is_loaded
void range_is_loaded(const ConnHandlerPtr &conn, const Event::Ptr &ev)
Definition: RangeIsLoaded.h:17
SWC
The SWC-DB C++ namespace 'SWC'.
Definition: main.cc:12
SWC::Comm::Protocol::Rgr::RANGE_UNLOAD
@ RANGE_UNLOAD
Definition: Commands.h:32
SWC::Comm::Protocol::Rgr::COLUMN_DELETE
@ COLUMN_DELETE
Definition: Commands.h:27
SWC::Ranger::AppContext::net_bytes_received
void net_bytes_received(const Comm::ConnHandlerPtr &conn, size_t b) noexcept override
Definition: AppContext.h:249
SWC::Comm::AppContext
Definition: AppContext.h:21
SWC::Comm::Protocol::Rgr::Handler::column_update
void column_update(const ConnHandlerPtr &conn, const Event::Ptr &ev)
Definition: ColumnUpdate.h:17
SWC::Comm::Protocol::Common::Handler::not_implemented
void not_implemented(const ConnHandlerPtr &conn, const Event::Ptr &ev)
Definition: NotImplemented.h:16
SWC::Ranger::AppContext::net_accepted
void net_accepted(const Comm::EndPoint &endpoint, bool secure) noexcept override
Definition: AppContext.h:255
SWC::Ranger::AppContext::stop
void stop() override
Definition: AppContext.h:298
SWC::Comm::server::SerializedServer::Ptr
std::shared_ptr< SerializedServer > Ptr
Definition: SerializedServer.h:62
SWC::client::ContextManager
Definition: ContextManager.h:15
RgrMngId.h
SWC::Ranger::AppContext::set_srv
void set_srv(Comm::server::SerializedServer::Ptr srv)
Definition: AppContext.h:124
SWC::Ranger::AppContext::init
void init(const std::string &host, const Comm::EndPoints &endpoints) override
Definition: AppContext.h:107
SWC::Ranger::Metric::Reporting::Ptr
std::shared_ptr< Reporting > Ptr
Definition: MetricsReporting.h:26
SWC::Comm::ConnHandlerPtr
std::shared_ptr< ConnHandler > ConnHandlerPtr
Definition: AppContext.h:17
SWC::Core::Vector< EndPoint >
SWC::Comm::Protocol::Rgr::Handler::RangeLoad
Definition: RangeLoad.h:18
SWC::Ranger::AppContext::Ptr
std::shared_ptr< AppContext > Ptr
Definition: AppContext.h:41
SWC_QUICK_EXIT
#define SWC_QUICK_EXIT(_CODE_)
Definition: Compat.h:184
SWC::client::ContextRanger::Ptr
std::shared_ptr< ContextRanger > Ptr
Definition: ContextRanger.h:18
SWC::client::ContextBroker
Definition: ContextBroker.h:15
NotImplemented.h
SWC::Comm::Protocol::Rgr::RANGE_QUERY_UPDATE
@ RANGE_QUERY_UPDATE
Definition: Commands.h:34
SWC::Comm::Protocol::Rgr::ASSIGN_ID_NEEDED
@ ASSIGN_ID_NEEDED
Definition: Commands.h:38
ColumnUpdate.h
SWC::Env::Rgr::reset
static void reset() noexcept
Definition: RangerEnv.h:158
SWC::Comm::Protocol::Rgr::Handler::column_delete
void column_delete(const ConnHandlerPtr &conn, const Event::Ptr &ev)
Definition: ColumnDelete.h:17
SWC::Env::Rgr::is_shuttingdown
static SWC_CAN_INLINE bool is_shuttingdown() noexcept
Definition: RangerEnv.h:58
SWC::Comm::Protocol::Rgr::RANGE_QUERY_SELECT
@ RANGE_QUERY_SELECT
Definition: Commands.h:35
SWC::Ranger::AppContext::net_bytes_sent
void net_bytes_sent(const Comm::ConnHandlerPtr &conn, size_t b) noexcept override
Definition: AppContext.h:243
SWC::Env::Clients::reset
static void reset() noexcept
Definition: Clients.h:308
ColumnDelete.h
SWC::Ranger::AppContext::AppContext
AppContext()
Definition: AppContext.h:97
SWC::client::Clients::make
static Ptr make(const Config::Settings &settings, const Comm::IoContextPtr &io_ctx, const ContextManager::Ptr &mngr_ctx, const ContextRanger::Ptr &rgr_ctx, const ContextBroker::Ptr &bkr_ctx)
Definition: Clients.cc:13
SWC::DB::RgrData::endpoints
Comm::EndPoints endpoints
Definition: RgrData.h:29
RangeQueryUpdate.h
SWC::Comm::Event::Ptr
std::shared_ptr< Event > Ptr
Definition: Event.h:33
ResponseCallback.h
SWC::Env::Rgr::wait_if_in_process
static void wait_if_in_process()
Definition: RangerEnv.h:357
SWC::Comm::Protocol::Rgr::RANGE_LOAD
@ RANGE_LOAD
Definition: Commands.h:31
SWC::Env::Rgr::post
static SWC_CAN_INLINE void post(T_Handler &&handler)
Definition: RangerEnv.h:114
DispatchHandler.h
SWC::Comm::Protocol::Mngr::Req::RgrMngId::Ptr
std::shared_ptr< RgrMngId > Ptr
Definition: RgrMngId.h:19
SWC::Ranger::AppContext::handle
void handle(Comm::ConnHandlerPtr conn, const Comm::Event::Ptr &ev) override
Definition: AppContext.h:142
SWC::Ranger::AppContext::handle_disconnect
void handle_disconnect(Comm::ConnHandlerPtr conn) noexcept override
Definition: AppContext.h:136
AssignId.h
SWC::Ranger::AppContext::m_guard
std::shared_ptr< Comm::IoContext::ExecutorWorkGuard > m_guard
Definition: AppContext.h:332
SWC::Comm::IoContext::make
static IoContextPtr make(std::string &&_name, int32_t size)
Definition: IoContext.h:41
SWC::Env::Rgr::shuttingdown
static void shuttingdown()
Definition: RangerEnv.h:334
SWC::Ranger::AppContext::m_metrics
Metric::Reporting::Ptr m_metrics
Definition: AppContext.h:331
SWC::Comm::Protocol::Rgr::SCHEMA_UPDATE
@ SCHEMA_UPDATE
Definition: Commands.h:29
SWC::Env::Rgr::io
static SWC_CAN_INLINE Comm::IoContextPtr io() noexcept
Definition: RangerEnv.h:108
RangeLoad.h
SWC::Config::Property::Value_int32_g
Definition: Property.h:586
SWC::Env::FsInterface::reset
static void reset() noexcept
Definition: Interface.h:159
SWC::client::ContextRanger
Definition: ContextRanger.h:15
SWC::Comm::Protocol::Rgr::MAX_CMD
@ MAX_CMD
Definition: Commands.h:39
SWC::Ranger::AppContext::~AppContext
virtual ~AppContext() noexcept
Definition: AppContext.h:128
SWC::Comm::Protocol::Rgr::Handler::columns_unload
void columns_unload(const ConnHandlerPtr &conn, const Event::Ptr &ev)
Definition: ColumnsUnload.h:17