SWC-DB  v0.5.12 C++ documentation
SWC-DB© (Super Wide Column Database) - High Performance Scalable Database (https://github.com/kashirin-alex/swc-db)
AppContext.h
Go to the documentation of this file.
1 /*
2  * SWC-DB© Copyright since 2019 Alex Kashirin <kashirin.alex@gmail.com>
3  * License details at <https://github.com/kashirin-alex/swc-db/#license>
4  */
5 
6 #ifndef swcdb_fsbroker_AppContext_h
7 #define swcdb_fsbroker_AppContext_h
8 
10 
13 
15 #include "swcdb/fs/Interface.h"
17 
38 
39 
40 namespace SWC { namespace FsBroker {
41 
42 
43 class AppContext final : public Comm::AppContext {
44 
45  struct CommandHandler {
    // Callable that pairs a connection with one received event. Invoking it
    // dispatches the event to the entry of `handlers` selected by
    // ev->header.command. It is move-constructible and not copyable.
46  // in-order of Protocol::FsBroker::Command
47  static constexpr const Comm::AppHandler_t handlers[] = {
68 
69  //&Comm::Protocol::FsBroker::Handler::debug,
70  //&Comm::Protocol::FsBroker::Handler::status,
71  //&Comm::Protocol::FsBroker::Handler::shutdown
72  };
77  const Comm::Event::Ptr& a_ev) noexcept
78  : conn(a_conn), ev(a_ev) {
80  }
    // Move constructor: takes over the connection and event pointers of `other`.
82  CommandHandler(CommandHandler&& other) noexcept
83  : conn(std::move(other.conn)), ev(std::move(other.ev)) {
84  }
85  CommandHandler(const CommandHandler&) = delete;
88  ~CommandHandler() noexcept { }
    // Runs the handler for the event's command. Nothing is done when the
    // event has expired or the connection is no longer open.
    // NOTE(review): the table index is not range-checked here; the visible
    // caller (AppContext::handle) rejects command 0 and commands >= MAX_CMD
    // before posting a CommandHandler.
89  void operator()() {
90  if(!ev->expired() && conn->is_open())
91  handlers[ev->header.command](conn, ev);
93  }
94  };
95 
96 
97  public:
98 
99  typedef std::shared_ptr<AppContext> Ptr;
100 
102  : Comm::AppContext(
103  Env::Config::settings()->get<Config::Property::Value_enum_g>(
104  "swc.FsBroker.comm.encoder")) {
105 
106  auto settings = Env::Config::settings();
107 
109  settings->get_bool("swc.FsBroker.concurrency.relative"),
110  settings->get_i32("swc.FsBroker.handlers")
111  );
112 
113  auto fs_type = FS::fs_type(settings->get_str("swc.fs.broker.underlying"));
115  Env::FsInterface::init(settings, fs_type);
116 
117  if(settings->get_bool("swc.FsBroker.metrics.enabled")) {
119  (settings->get_bool("swc.FsBroker.metrics.report.broker")
121  *settings,
122  Env::IoCtx::io(),
124  new client::ContextBroker(*settings))
125  )
127  *settings,
128  Env::IoCtx::io(),
130  new client::ContextManager(*settings)),
132  new client::ContextRanger(*settings))
133  )
134  )->init()
135  );
136  }
138 
139  auto period = settings->get<Config::Property::Value_int32_g>(
140  "swc.cfg.dyn.period");
141  if(period->get()) {
142  Env::IoCtx::io()->set_periodic_timer(
143  period,
144  []() noexcept { Env::Config::settings()->check_dynamic_files(); }
145  );
146  }
147  }
148 
149  void init(const std::string& host,
150  const Comm::EndPoints& endpoints) override {
     // Sets up signal handling on the IO context and then calls
     // shutting_down() with sig == 0, which takes the "install the signal
     // listener" path of that method rather than shutting anything down.
151  int sig = 0;
152  Env::IoCtx::io()->set_signals();
153  shutting_down(std::error_code(), sig);
154 
     // Metrics are configured with the host name and endpoints, then started.
     // NOTE(review): the condition guarding these two calls is not visible in
     // this listing; presumably `if(m_metrics)` as in the other methods.
156  m_metrics->configure_fsbroker(host.c_str(), endpoints);
157  m_metrics->start();
158  }
159  }
160 
162  m_srv = srv;
163  }
164 
165  virtual ~AppContext() noexcept { }
166 
168  m_srv->connection_add(conn);
169  if(m_metrics)
170  m_metrics->net->connected(conn);
171  }
172 
173  void handle_disconnect(Comm::ConnHandlerPtr conn) noexcept override {
     // Removes the connection from the server and, when metrics reporting is
     // present (m_metrics non-null), records the disconnect.
174  m_srv->connection_del(conn);
175  if(m_metrics)
176  m_metrics->net->disconnected(conn);
177  }
178 
179  void handle(Comm::ConnHandlerPtr conn, const Comm::Event::Ptr& ev) override {
     // Entry point for one received event on `conn`:
     //  - an event carrying an error is counted as a network error;
     //  - a command of 0 or >= Comm::Protocol::FsBroker::MAX_CMD is counted
     //    as a network error;
     //  - any other command is posted to the IO context as a CommandHandler
     //    and counted per command.
180  // SWC_LOG_OUT(LOG_DEBUG, ev->print(SWC_LOG_OSTREAM << "handle: "); );
     // NOTE(review): the condition guarding this early close is not visible
     // in this listing.
182  return conn->do_close();
183 
184  if(ev->error) {
     // NOTE(review): m_metrics is dereferenced here without the
     // `if(m_metrics)` guard used by the two branches below; confirm it can
     // never be null on this path, or add the guard.
185  m_metrics->net->error(conn);
186 
187  } else if(!ev->header.command ||
188  ev->header.command >= Comm::Protocol::FsBroker::MAX_CMD) {
190  if(m_metrics)
191  m_metrics->net->error(conn);
192 
193  } else {
     // The command is within (0, MAX_CMD): run it asynchronously.
194  Env::IoCtx::post(CommandHandler(conn, ev));
195  if(m_metrics)
196  m_metrics->net->command(conn, ev->header.command);
197  }
198 
200  }
201 
202  void net_bytes_sent(const Comm::ConnHandlerPtr& conn, size_t b)
203  noexcept override {
     // Forwards the sent byte count `b` for `conn` to the metrics reporter;
     // does nothing when m_metrics is null.
204  if(m_metrics)
205  m_metrics->net->sent(conn, b);
206  }
207 
208  void net_bytes_received(const Comm::ConnHandlerPtr& conn, size_t b)
209  noexcept override {
     // Forwards the received byte count `b` for `conn` to the metrics
     // reporter; does nothing when m_metrics is null.
210  if(m_metrics)
211  m_metrics->net->received(conn, b);
212  }
213 
214  void net_accepted(const Comm::EndPoint& endpoint, bool secure)
215  noexcept override {
     // Reports an accepted endpoint, together with the `secure` flag, to the
     // metrics reporter; does nothing when m_metrics is null.
216  if(m_metrics)
217  m_metrics->net->accepted(endpoint, secure);
218  }
219 
220  void shutting_down(const std::error_code &ec, const int &sig) {
     // Two modes, selected by `sig`:
     //  - sig == 0: registers an asynchronous wait on the IO context's signal
     //    set and returns; nothing is shut down.
     //  - sig != 0: logs the signal and runs stop() on a detached thread.
221  if(!sig) { // set signals listener
222  struct Task {
     // Raw, non-owning pointer to the context; the task does not keep the
     // AppContext alive.
223  AppContext* ptr;
225  Task(AppContext* a_ptr) noexcept : ptr(a_ptr) { }
226  void operator()(const std::error_code& ec, const int &sig) {
     // An aborted wait is ignored; it is not treated as a shutdown request.
227  if(ec == asio::error::operation_aborted)
228  return;
229  SWC_LOGF(LOG_INFO, "Received signal, sig=%d ec=%s",
230  sig, ec.message().c_str());
     // Re-enters shutting_down() with the signal number that was delivered.
231  ptr->shutting_down(ec, sig);
232  }
233  };
234  Env::IoCtx::io()->signals->async_wait(Task(this));
235 
236  SWC_LOGF(LOG_INFO, "Listening for Shutdown signal, set at sig=%d ec=%s",
237  sig, ec.message().c_str());
238  return;
239  }
240 
241  SWC_LOGF(LOG_INFO, "Shutdown signal, sig=%d ec=%s", sig, ec.message().c_str());
     // stop() runs on its own detached thread. The lambda captures `d` (the
     // shared_ptr owning the std::thread object) and a shared_ptr to this
     // context, so both stay referenced while the lambda exists.
242  std::shared_ptr<std::thread> d(new std::thread);
243  *d.get() = std::thread([d, ptr=shared_from_this()]{ ptr->stop(); });
244  d->detach();
245  }
246 
247  void stop() override {
     // Shutdown sequence, in the order visible here: stop accepting on the
     // server, stop client services (only when metrics are present), stop the
     // IO context, stop the filesystem interface, shut the server down, and
     // finally release the accept guard.
248 
249  auto guard = m_srv->stop_accepting(); // no further requests accepted
250 
252  if(m_metrics) {
253  #if defined(SWC_ENABLE_SANITIZER)
254  m_metrics->wait();
255  #endif
256  Env::Clients::get()->stop_services();
257  }
258 
259  Env::IoCtx::io()->stop();
260 
261  Env::FsInterface::interface()->stop();
262 
263  m_srv->shutdown();
264 
     // Sanitizer builds only: sleep two seconds, then drop the server and
     // metrics references held by this context.
     // NOTE(review): listing lines 269-272 are not visible here; further
     // teardown may happen inside this #if block.
265  #if defined(SWC_ENABLE_SANITIZER)
266  std::this_thread::sleep_for(std::chrono::seconds(2));
267  m_srv = nullptr;
268  m_metrics = nullptr;
273  #endif
274 
275  guard = nullptr;
276  }
277 
278  private:
281 };
282 
283 
284 }}
285 
286 
287 
288 #endif // swcdb_fsbroker_AppContext_h
SWC::Env::FsInterface::init
static void init(const SWC::Config::Settings::Ptr &settings, FS::Type typ)
Definition: Interface.cc:513
Write.h
SWC::Comm::Protocol::FsBroker::Handler::close
void close(const ConnHandlerPtr &conn, const Event::Ptr &ev)
Definition: Close.h:17
SWC::Comm::Protocol::FsBroker::Handler::seek
void seek(const ConnHandlerPtr &conn, const Event::Ptr &ev)
Definition: Seek.h:17
SWC::FsBroker::AppContext::CommandHandler::handlers
static constexpr const Comm::AppHandler_t handlers[]
Definition: AppContext.h:47
SWC::FsBroker::AppContext::handle_established
void handle_established(Comm::ConnHandlerPtr conn) override
Definition: AppContext.h:167
SWC::FS::fs_type
Type fs_type(const std::string &fs_name)
Definition: FileSystem.cc:19
Exists.h
Clients.h
SWC::Comm::Protocol::FsBroker::Handler::rmdir
void rmdir(const ConnHandlerPtr &conn, const Event::Ptr &ev)
Definition: Rmdir.h:17
SWC::Env::Clients::get
static SWC_CAN_INLINE client::Clients::Ptr & get() noexcept
Definition: Clients.h:299
SWC::FsBroker::AppContext::CommandHandler::operator()
void operator()()
Definition: AppContext.h:89
SWC_LOGF
#define SWC_LOGF(priority, fmt,...)
Definition: Logger.h:188
SWC::Env::FsBroker::init
static void init()
Definition: FsBrokerEnv.h:64
SWC::Env::IoCtx::post
static SWC_CAN_INLINE void post(T_Handler &&handler)
Definition: IoContext.h:139
SWC::FsBroker::AppContext::~AppContext
virtual ~AppContext() noexcept
Definition: AppContext.h:165
SWC::Comm::Protocol::FsBroker::Handler::read
void read(const ConnHandlerPtr &conn, const Event::Ptr &ev)
Definition: Read.h:17
SWC::client::ContextManager::Ptr
std::shared_ptr< ContextManager > Ptr
Definition: ContextManager.h:18
SWC::Comm::Protocol::FsBroker::Handler::exists
void exists(const ConnHandlerPtr &conn, const Event::Ptr &ev)
Definition: Exists.h:17
SWC::FsBroker::AppContext::m_srv
Comm::server::SerializedServer::Ptr m_srv
Definition: AppContext.h:279
SWC::Env::Config::settings
static SWC::Config::Settings::Ptr & settings()
Definition: Settings.h:128
SWC::Comm::EndPoint
asio::ip::tcp::endpoint EndPoint
Definition: Resolver.h:19
SWC::Env::FsInterface::interface
static SWC_CAN_INLINE FS::Interface::Ptr & interface() noexcept
Definition: Interface.h:150
SWC::FsBroker::AppContext
Definition: AppContext.h:43
SWC::LOG_INFO
@ LOG_INFO
Definition: Logger.h:35
Mkdirs.h
SWC::FsBroker::AppContext::CommandHandler::CommandHandler
CommandHandler(const CommandHandler &)=delete
Seek.h
SWC::client::ContextBroker::Ptr
std::shared_ptr< ContextBroker > Ptr
Definition: ContextBroker.h:18
Remove.h
SWC::FS::BROKER
@ BROKER
Definition: FileSystem.h:24
Rmdir.h
Length.h
SWC::FsBroker::AppContext::CommandHandler::ev
Comm::Event::Ptr ev
Definition: AppContext.h:74
SWC::Env::FsBroker::reset
static void reset() noexcept
Definition: FsBrokerEnv.h:74
SWC::FsBroker::AppContext::shutting_down
void shutting_down(const std::error_code &ec, const int &sig)
Definition: AppContext.h:220
SWC::Env::Clients::init
static void init(const client::Clients::Ptr &clients)
Definition: Clients.cc:182
SWC::FsBroker::AppContext::set_srv
void set_srv(Comm::server::SerializedServer::Ptr srv)
Definition: AppContext.h:161
SWC::Comm::Protocol::FsBroker::Handler::read_all
void read_all(const ConnHandlerPtr &conn, const Event::Ptr &ev)
Definition: ReadAll.h:27
CombiPread.h
SWC::FsBroker::AppContext::m_metrics
Metric::Reporting::Ptr m_metrics
Definition: AppContext.h:280
Rename.h
AppHandler.h
SWC::FsBroker::AppContext::CommandHandler::operator=
CommandHandler & operator=(const CommandHandler &)=delete
SWC::FsBroker::AppContext::CommandHandler::CommandHandler
SWC_CAN_INLINE CommandHandler(const Comm::ConnHandlerPtr &a_conn, const Comm::Event::Ptr &a_ev) noexcept
Definition: AppContext.h:76
SWC::Comm::Protocol::FsBroker::Handler::create
void create(const ConnHandlerPtr &conn, const Event::Ptr &ev)
Definition: Create.h:18
AppContext.h
SWC_CAN_INLINE
#define SWC_CAN_INLINE
Definition: Compat.h:102
SWC::Comm::Protocol::FsBroker::Handler::sync
void sync(const ConnHandlerPtr &conn, const Event::Ptr &ev)
Definition: Sync.h:17
SWC::Comm::Protocol::FsBroker::Handler::mkdirs
void mkdirs(const ConnHandlerPtr &conn, const Event::Ptr &ev)
Definition: Mkdirs.h:17
SWC::Comm::Protocol::FsBroker::Handler::readdir
void readdir(const ConnHandlerPtr &conn, const Event::Ptr &ev)
Definition: Readdir.h:17
SWC::FsBroker::AppContext::CommandHandler
Definition: AppContext.h:45
SWC::FsBroker::AppContext::net_bytes_sent
void net_bytes_sent(const Comm::ConnHandlerPtr &conn, size_t b) noexcept override
Definition: AppContext.h:202
Close.h
SWC::Comm::Protocol::FsBroker::Handler::open
void open(const ConnHandlerPtr &conn, const Event::Ptr &ev)
Definition: Open.h:17
SWC::FsBroker::AppContext::net_bytes_received
void net_bytes_received(const Comm::ConnHandlerPtr &conn, size_t b) noexcept override
Definition: AppContext.h:208
SWC
The SWC-DB C++ namespace 'SWC'.
Definition: main.cc:12
SWC::Comm::Protocol::FsBroker::Handler::pread
void pread(const ConnHandlerPtr &conn, const Event::Ptr &ev)
Definition: Pread.h:17
SWC::FsBroker::AppContext::stop
void stop() override
Definition: AppContext.h:247
SWC::Env::FsBroker::metrics_track
static SWC_CAN_INLINE SWC::FsBroker::Metric::Reporting::Ptr & metrics_track() noexcept
Definition: FsBrokerEnv.h:79
SWC::Env::FsBroker::can_process
static SWC_CAN_INLINE bool can_process() noexcept
Definition: FsBrokerEnv.h:99
SWC::Env::FsBroker::in_process
static SWC_CAN_INLINE int64_t in_process() noexcept
Definition: FsBrokerEnv.h:89
SWC::Comm::Protocol::FsBroker::MAX_CMD
@ MAX_CMD
Maximum code marker.
Definition: Commands.h:50
SWC::Env::IoCtx::init
static SWC_CAN_INLINE void init(int32_t size)
Definition: IoContext.h:118
SWC::Comm::AppContext
Definition: AppContext.h:21
Serialization.h
SWC::Comm::Protocol::FsBroker::Handler::length
void length(const ConnHandlerPtr &conn, const Event::Ptr &ev)
Definition: Length.h:17
SWC::Comm::Protocol::Common::Handler::not_implemented
void not_implemented(const ConnHandlerPtr &conn, const Event::Ptr &ev)
Definition: NotImplemented.h:16
SWC::Comm::server::SerializedServer::Ptr
std::shared_ptr< SerializedServer > Ptr
Definition: SerializedServer.h:62
Create.h
SWC::client::ContextManager
Definition: ContextManager.h:15
SWC::FsBroker::AppContext::AppContext
AppContext()
Definition: AppContext.h:101
SWC::FsBroker::AppContext::CommandHandler::~CommandHandler
~CommandHandler() noexcept
Definition: AppContext.h:88
SWC::Comm::Protocol::FsBroker::Handler::write
void write(const ConnHandlerPtr &conn, const Event::Ptr &ev)
Definition: Write.h:17
SWC::FsBroker::AppContext::CommandHandler::operator=
CommandHandler & operator=(CommandHandler &&)=delete
SWC::FsBroker::AppContext::CommandHandler::conn
Comm::ConnHandlerPtr conn
Definition: AppContext.h:73
SWC::Comm::ConnHandlerPtr
std::shared_ptr< ConnHandler > ConnHandlerPtr
Definition: AppContext.h:17
Sync.h
SWC::Core::Vector< EndPoint >
SWC::client::ContextRanger::Ptr
std::shared_ptr< ContextRanger > Ptr
Definition: ContextRanger.h:18
SWC::Comm::Protocol::FsBroker::Handler::remove
void remove(const ConnHandlerPtr &conn, const Event::Ptr &ev)
Definition: Remove.h:17
SWC::client::ContextBroker
Definition: ContextBroker.h:15
NotImplemented.h
SWC::Comm::AppHandler_t
void(* AppHandler_t)(const ConnHandlerPtr &conn, const Event::Ptr &ev)
Definition: AppHandler.h:15
SWC::FsBroker::AppContext::Ptr
std::shared_ptr< AppContext > Ptr
Definition: AppContext.h:99
Append.h
SWC::Comm::Protocol::FsBroker::Handler::rename
void rename(const ConnHandlerPtr &conn, const Event::Ptr &ev)
Definition: Rename.h:17
SWC::Env::FsBroker::shuttingdown
static void shuttingdown()
Definition: FsBrokerEnv.h:103
Flush.h
Read.h
SWC::Comm::Protocol::FsBroker::Handler::flush
void flush(const ConnHandlerPtr &conn, const Event::Ptr &ev)
Definition: Flush.h:17
SWC::Env::Clients::reset
static void reset() noexcept
Definition: Clients.h:308
SWC::FsBroker::AppContext::handle_disconnect
void handle_disconnect(Comm::ConnHandlerPtr conn) noexcept override
Definition: AppContext.h:173
SWC::FsBroker::AppContext::CommandHandler::CommandHandler
SWC_CAN_INLINE CommandHandler(CommandHandler &&other) noexcept
Definition: AppContext.h:82
SWC::client::Clients::make
static Ptr make(const Config::Settings &settings, const Comm::IoContextPtr &io_ctx, const ContextManager::Ptr &mngr_ctx, const ContextRanger::Ptr &rgr_ctx, const ContextBroker::Ptr &bkr_ctx)
Definition: Clients.cc:13
SWC::Env::IoCtx::io
static Comm::IoContextPtr io()
Definition: IoContext.h:132
SWC::Env::IoCtx::reset
static void reset() noexcept
Definition: IoContext.h:148
FsBrokerEnv.h
SWC::Comm::Protocol::FsBroker::Handler::combi_pread
void combi_pread(const ConnHandlerPtr &conn, const Event::Ptr &ev)
Definition: CombiPread.h:17
ReadAll.h
SWC::Comm::Event::Ptr
std::shared_ptr< Event > Ptr
Definition: Event.h:33
Pread.h
SWC::FsBroker::AppContext::net_accepted
void net_accepted(const Comm::EndPoint &endpoint, bool secure) noexcept override
Definition: AppContext.h:214
SWC_ASSERT
#define SWC_ASSERT(_e_)
Definition: Exception.h:165
SWC::Comm::Protocol::FsBroker::Handler::append
void append(const ConnHandlerPtr &conn, const Event::Ptr &ev)
Definition: Append.h:17
SWC::FsBroker::AppContext::init
void init(const std::string &host, const Comm::EndPoints &endpoints) override
Definition: AppContext.h:149
SWC::FsBroker::Metric::Reporting::Ptr
std::shared_ptr< Reporting > Ptr
Definition: MetricsReporting.h:26
Interface.h
SWC::FsBroker::AppContext::handle
void handle(Comm::ConnHandlerPtr conn, const Comm::Event::Ptr &ev) override
Definition: AppContext.h:179
Open.h
SWC::Config::Property::Value_int32_g
Definition: Property.h:586
SWC::Env::FsInterface::reset
static void reset() noexcept
Definition: Interface.h:159
SWC::client::ContextRanger
Definition: ContextRanger.h:15
Readdir.h