SWC-DB  v0.5.12 C++ documentations
SWC-DB© (Super Wide Column Database) - High Performance Scalable Database (https://github.com/kashirin-alex/swc-db)
AppContext.h
Go to the documentation of this file.
1 /*
2  * SWC-DB© Copyright since 2019 Alex Kashirin <kashirin.alex@gmail.com>
3  * License details at <https://github.com/kashirin-alex/swc-db/#license>
4  */
5 
6 #ifndef swcdb_broker_AppContext_h
7 #define swcdb_broker_AppContext_h
8 
9 
14 
16 #include "swcdb/broker/BrokerEnv.h"
17 
25 
26 
27 namespace SWC { namespace Broker {
28 
29 
30 
31 class AppContext final : public Comm::AppContext {
32 
33  struct CommandHandler {
34  // in-order of Protocol::Bkr::Command
35  static constexpr const Comm::AppHandler_t handlers[] = {
43  };
48  const Comm::Event::Ptr& a_ev) noexcept
49  : conn(a_conn), ev(a_ev) {
51  }
53  CommandHandler(CommandHandler&& other) noexcept
54  : conn(std::move(other.conn)), ev(std::move(other.ev)) {
55  }
56  CommandHandler(const CommandHandler&) = delete;
59  ~CommandHandler() noexcept { }
60  void operator()() {
61  ev->expired() || !conn->is_open()
63  : handlers[ev->header.command](conn, ev);
64  }
65  };
66 
67 
68  public:
69 
70  typedef std::shared_ptr<AppContext> Ptr;
71 
72  static Ptr make() {
73  auto settings = Env::Config::settings();
74 
75  settings->parse_file(
76  settings->get_str("swc.bkr.cfg", ""),
77  "swc.bkr.cfg.dyn"
78  );
79 
82  *settings,
84  "Clients",
85  settings->get_bool("swc.bkr.concurrency.relative"),
86  settings->get_i32("swc.bkr.clients.handlers")
87  ),
89  new client::ContextManager(*settings)),
91  new client::ContextRanger(*settings))
92  )->init()
93  );
94 
96 
97  auto period = settings->get<Config::Property::Value_int32_g>(
98  "swc.cfg.dyn.period");
99  if(period->get()) {
100  Env::Bkr::io()->set_periodic_timer(
101  period,
102  []() noexcept { Env::Config::settings()->check_dynamic_files(); }
103  );
104  }
105 
106  return Ptr(new AppContext());
107  }
108 
110  : Comm::AppContext(
111  Env::Config::settings()->get<Config::Property::Value_enum_g>(
112  "swc.bkr.comm.encoder")),
113  m_metrics(Env::Bkr::metrics_track()) {
114  }
115 
116  void init(const std::string& host,
117  const Comm::EndPoints& endpoints) override {
118 
119  int sig = 0;
120  Env::Bkr::io()->set_signals();
121  shutting_down(std::error_code(), sig);
122 
123  Env::Bkr::start();
124 
125  if(m_metrics) {
126  m_metrics->configure_bkr(host.c_str(), endpoints);
127  m_metrics->start();
128  }
129  }
130 
132  m_srv = srv;
133  }
134 
135  virtual ~AppContext() noexcept { }
136 
138  m_srv->connection_add(conn);
139  if(m_metrics)
140  m_metrics->net->connected(conn);
141  }
142 
143  void handle_disconnect(Comm::ConnHandlerPtr conn) noexcept override {
144  m_srv->connection_del(conn);
145  if(m_metrics)
146  m_metrics->net->disconnected(conn);
147  }
148 
149  void handle(Comm::ConnHandlerPtr conn, const Comm::Event::Ptr& ev) override {
150  // SWC_LOG_OUT(LOG_DEBUG, ev->print(SWC_LOG_OSTREAM << "handle: "); );
151 
152  if(!Env::Bkr::can_process())
153  return conn->do_close();
154 
155  if(ev->error) {
156  m_metrics->net->error(conn);
157 
158  } else if(!ev->header.command ||
159  ev->header.command >= Comm::Protocol::Bkr::MAX_CMD) {
161  if(m_metrics)
162  m_metrics->net->error(conn);
163 
164  } else {
165  Env::Bkr::post(CommandHandler(conn, ev));
166  if(m_metrics)
167  m_metrics->net->command(conn, ev->header.command);
168  }
169 
171  }
172 
173  void net_bytes_sent(const Comm::ConnHandlerPtr& conn, size_t b)
174  noexcept override {
175  if(m_metrics)
176  m_metrics->net->sent(conn, b);
177  }
178 
179  void net_bytes_received(const Comm::ConnHandlerPtr& conn, size_t b)
180  noexcept override {
181  if(m_metrics)
182  m_metrics->net->received(conn, b);
183  }
184 
185  void net_accepted(const Comm::EndPoint& endpoint, bool secure)
186  noexcept override {
187  if(m_metrics)
188  m_metrics->net->accepted(endpoint, secure);
189  }
190 
191  void shutting_down(const std::error_code &ec, const int &sig) {
192  if(!sig) { // set signals listener
193  struct Task {
194  AppContext* ptr;
196  Task(AppContext* a_ptr) noexcept : ptr(a_ptr) { }
197  void operator()(const std::error_code& ec, const int &sig) {
198  if(ec == asio::error::operation_aborted)
199  return;
200  SWC_LOGF(LOG_INFO, "Received signal, sig=%d ec=%s",
201  sig, ec.message().c_str());
202  ptr->shutting_down(ec, sig);
203  }
204  };
205  Env::Bkr::io()->signals->async_wait(Task(this));
206 
207  SWC_LOGF(LOG_INFO, "Listening for Shutdown signal, set at sig=%d ec=%s",
208  sig, ec.message().c_str());
209  return;
210  }
211  SWC_LOGF(LOG_INFO, "Shutdown signal, sig=%d ec=%s",
212  sig, ec.message().c_str());
213 
214  if(!m_srv) {
215  SWC_LOG(LOG_INFO, "Exit");
216  SWC_QUICK_EXIT(EXIT_SUCCESS);
217  }
218 
219  std::shared_ptr<std::thread> d(new std::thread);
220  *d.get() = std::thread([d, ptr=shared_from_this()]{ ptr->stop(); });
221  d->detach();
222  }
223 
224  void stop() override {
225  auto guard = m_srv->stop_accepting(); // no further requests accepted
226 
228 
229  #if defined(SWC_ENABLE_SANITIZER)
230  if(m_metrics)
231  m_metrics->wait();
232  #endif
233 
234  Env::Clients::get()->stop();
235 
236  Env::Bkr::io()->stop();
237 
238  m_srv->shutdown();
239 
240  #if defined(SWC_ENABLE_SANITIZER)
241  std::this_thread::sleep_for(std::chrono::seconds(2));
242  m_metrics = nullptr;
243  m_srv = nullptr;
244  Env::Bkr::reset();
246  #endif
247 
248  guard = nullptr;
249  }
250 
251  private:
252 
255 
256 };
257 
258 
259 }}
260 
261 
262 
263 #endif // swcdb_broker_AppContext_h
SWC::Broker::AppContext::CommandHandler::CommandHandler
SWC_CAN_INLINE CommandHandler(const Comm::ConnHandlerPtr &a_conn, const Comm::Event::Ptr &a_ev) noexcept
Definition: AppContext.h:47
SWC::Env::Bkr::in_process
static SWC_CAN_INLINE void in_process(int64_t count) noexcept
Definition: BrokerEnv.h:50
SWC::Comm::Protocol::Bkr::MAX_CMD
@ MAX_CMD
Definition: Commands.h:108
SWC::Broker::AppContext::m_metrics
Metric::Reporting::Ptr m_metrics
Definition: AppContext.h:254
SWC::Broker::AppContext::CommandHandler
Definition: AppContext.h:33
Clients.h
SWC::Comm::Protocol::Bkr::Handler::cells_select
void cells_select(const ConnHandlerPtr &conn, const Event::Ptr &ev)
Definition: CellsSelect.h:93
SWC::Env::Clients::get
static SWC_CAN_INLINE client::Clients::Ptr & get() noexcept
Definition: Clients.h:299
SWC_LOGF
#define SWC_LOGF(priority, fmt,...)
Definition: Logger.h:188
CellsSelect.h
SWC::Broker::AppContext::set_srv
void set_srv(Comm::server::SerializedServer::Ptr srv)
Definition: AppContext.h:131
SWC::client::ContextManager::Ptr
std::shared_ptr< ContextManager > Ptr
Definition: ContextManager.h:18
SWC::Env::Config::settings
static SWC::Config::Settings::Ptr & settings()
Definition: Settings.h:128
SWC::Env::Bkr::shuttingdown
static void shuttingdown()
Definition: BrokerEnv.h:32
SWC::Comm::Protocol::Bkr::Handler::column_get
void column_get(const ConnHandlerPtr &conn, const Event::Ptr &ev)
Definition: ColumnGet.h:79
SWC::Comm::EndPoint
asio::ip::tcp::endpoint EndPoint
Definition: Resolver.h:19
SWC::Broker::AppContext::net_bytes_received
void net_bytes_received(const Comm::ConnHandlerPtr &conn, size_t b) noexcept override
Definition: AppContext.h:179
SWC::LOG_INFO
@ LOG_INFO
Definition: Logger.h:35
SWC::Comm::Protocol::Bkr::Handler::column_compact
void column_compact(const ConnHandlerPtr &conn, const Event::Ptr &ev)
Definition: ColumnCompact.h:56
ColumnGet.h
SWC::Env::Bkr::init
static void init()
Definition: BrokerEnv.h:25
SWC::Env::Clients::init
static void init(const client::Clients::Ptr &clients)
Definition: Clients.cc:182
SWC::Comm::Protocol::Bkr::Handler::column_list
void column_list(const ConnHandlerPtr &conn, const Event::Ptr &ev)
Definition: ColumnList.h:74
SWC::Broker::AppContext::CommandHandler::CommandHandler
SWC_CAN_INLINE CommandHandler(CommandHandler &&other) noexcept
Definition: AppContext.h:53
AppHandler.h
ColumnList.h
SWC::Comm::Protocol::Bkr::Handler::cells_update
void cells_update(const ConnHandlerPtr &conn, const Event::Ptr &ev)
Definition: CellsUpdate.h:89
SWC::Broker::AppContext::CommandHandler::conn
Comm::ConnHandlerPtr conn
Definition: AppContext.h:44
AppContext.h
SWC_CAN_INLINE
#define SWC_CAN_INLINE
Definition: Compat.h:102
SWC::Broker::AppContext::stop
void stop() override
Definition: AppContext.h:224
SWC::Broker::AppContext::CommandHandler::~CommandHandler
~CommandHandler() noexcept
Definition: AppContext.h:59
SWC::Broker::AppContext
Definition: AppContext.h:31
SWC_LOG
#define SWC_LOG(priority, message)
Definition: Logger.h:191
SWC::Broker::AppContext::net_accepted
void net_accepted(const Comm::EndPoint &endpoint, bool secure) noexcept override
Definition: AppContext.h:185
SWC::Env::Bkr::reset
static void reset() noexcept
Definition: BrokerEnv.h:85
SWC::Broker::AppContext::CommandHandler::ev
Comm::Event::Ptr ev
Definition: AppContext.h:45
SWC
The SWC-DB C++ namespace 'SWC'.
Definition: main.cc:12
SWC::Broker::AppContext::handle_established
void handle_established(Comm::ConnHandlerPtr conn) override
Definition: AppContext.h:137
SWC::Broker::AppContext::CommandHandler::CommandHandler
CommandHandler(const CommandHandler &)=delete
SWC::Broker::AppContext::AppContext
AppContext()
Definition: AppContext.h:109
SWC::Comm::AppContext
Definition: AppContext.h:21
SWC::Comm::Protocol::Common::Handler::not_implemented
void not_implemented(const ConnHandlerPtr &conn, const Event::Ptr &ev)
Definition: NotImplemented.h:16
SWC::Comm::server::SerializedServer::Ptr
std::shared_ptr< SerializedServer > Ptr
Definition: SerializedServer.h:62
SWC::client::ContextManager
Definition: ContextManager.h:15
BrokerEnv.h
SWC::Broker::AppContext::CommandHandler::operator()
void operator()()
Definition: AppContext.h:60
SWC::Comm::ConnHandlerPtr
std::shared_ptr< ConnHandler > ConnHandlerPtr
Definition: AppContext.h:17
SWC::Broker::Metric::Reporting::Ptr
std::shared_ptr< Reporting > Ptr
Definition: MetricsReporting.h:26
SWC::Core::Vector< EndPoint >
SWC_QUICK_EXIT
#define SWC_QUICK_EXIT(_CODE_)
Definition: Compat.h:184
SWC::client::ContextRanger::Ptr
std::shared_ptr< ContextRanger > Ptr
Definition: ContextRanger.h:18
SWC::Broker::AppContext::handle_disconnect
void handle_disconnect(Comm::ConnHandlerPtr conn) noexcept override
Definition: AppContext.h:143
ColumnCompact.h
SWC::Broker::AppContext::init
void init(const std::string &host, const Comm::EndPoints &endpoints) override
Definition: AppContext.h:116
SWC::Broker::AppContext::shutting_down
void shutting_down(const std::error_code &ec, const int &sig)
Definition: AppContext.h:191
SWC::Env::Bkr::can_process
static SWC_CAN_INLINE bool can_process() noexcept
Definition: BrokerEnv.h:42
NotImplemented.h
SWC::Comm::AppHandler_t
void(* AppHandler_t)(const ConnHandlerPtr &conn, const Event::Ptr &ev)
Definition: AppHandler.h:15
SWC::Broker::AppContext::m_srv
Comm::server::SerializedServer::Ptr m_srv
Definition: AppContext.h:253
SWC::Env::Clients::reset
static void reset() noexcept
Definition: Clients.h:308
SWC::Broker::AppContext::handle
void handle(Comm::ConnHandlerPtr conn, const Comm::Event::Ptr &ev) override
Definition: AppContext.h:149
SWC::Broker::AppContext::net_bytes_sent
void net_bytes_sent(const Comm::ConnHandlerPtr &conn, size_t b) noexcept override
Definition: AppContext.h:173
SWC::Env::Bkr::post
static SWC_CAN_INLINE void post(T_Handler &&handler)
Definition: BrokerEnv.h:71
SWC::client::Clients::make
static Ptr make(const Config::Settings &settings, const Comm::IoContextPtr &io_ctx, const ContextManager::Ptr &mngr_ctx, const ContextRanger::Ptr &rgr_ctx, const ContextBroker::Ptr &bkr_ctx)
Definition: Clients.cc:13
SWC::Broker::AppContext::Ptr
std::shared_ptr< AppContext > Ptr
Definition: AppContext.h:70
ColumnMng.h
SWC::Comm::Event::Ptr
std::shared_ptr< Event > Ptr
Definition: Event.h:33
SWC::Comm::Protocol::Bkr::Handler::column_mng
void column_mng(const ConnHandlerPtr &conn, const Event::Ptr &ev)
Definition: ColumnMng.h:59
ResponseCallback.h
SWC::Broker::AppContext::make
static Ptr make()
Definition: AppContext.h:72
SWC::Broker::AppContext::CommandHandler::operator=
CommandHandler & operator=(const CommandHandler &)=delete
DispatchHandler.h
SWC::Env::Bkr::io
static SWC_CAN_INLINE Comm::IoContextPtr io() noexcept
Definition: BrokerEnv.h:65
SWC::Env::Bkr::start
static void start()
Definition: BrokerEnv.h:156
SWC::Broker::AppContext::~AppContext
virtual ~AppContext() noexcept
Definition: AppContext.h:135
SWC::Broker::AppContext::CommandHandler::handlers
static constexpr const Comm::AppHandler_t handlers[]
Definition: AppContext.h:35
SWC::Env::Bkr::processed
static SWC_CAN_INLINE void processed() noexcept
Definition: BrokerEnv.h:55
SWC::Comm::IoContext::make
static IoContextPtr make(std::string &&_name, int32_t size)
Definition: IoContext.h:41
CellsUpdate.h
SWC::Broker::AppContext::CommandHandler::operator=
CommandHandler & operator=(CommandHandler &&)=delete
SWC::Config::Property::Value_int32_g
Definition: Property.h:586
SWC::client::ContextRanger
Definition: ContextRanger.h:15