SWC-DB  v0.5.12 C++ documentations
SWC-DB© (Super Wide Column Database) - High Performance Scalable Database (https://github.com/kashirin-alex/swc-db)
AppContext.h
Go to the documentation of this file.
1 /*
2  * SWC-DB© Copyright since 2019 Alex Kashirin <kashirin.alex@gmail.com>
3  * License details at <https://github.com/kashirin-alex/swc-db/#license>
4  */
5 
6 #ifndef swcdb_app_thriftbroker_AppContext_h
7 #define swcdb_app_thriftbroker_AppContext_h
8 
9 
12 
13 
14 namespace SWC {
15 
16 
17 namespace client {
19 namespace ThriftBroker { }
20 }
21 
22 
23 namespace ThriftBroker {
24 
25 
26 class AppContext final : virtual public BrokerIfFactory,
27  public std::enable_shared_from_this<AppContext> {
28  public:
29 
30  typedef std::shared_ptr<AppContext> Ptr;
31 
33  : m_mutex(), m_run(true), m_cv(), m_connections(),
34  m_mutex_handlers(), m_metrics(nullptr), m_handlers() {
35  auto settings = Env::Config::settings();
36  Env::IoCtx::init(settings->get_i32("swc.ThriftBroker.clients.handlers"));
37 
38  int sig = 0;
39  Env::IoCtx::io()->set_signals();
40  shutting_down(std::error_code(), sig);
41 
44  *settings,
47  new client::ContextManager(*settings)),
49  new client::ContextRanger(*settings))
50  )->init()
51  );
52  /*
53  Env::FsInterface::init(
54  settings, FS::fs_type(settings->get_str("swc.fs")));
55  */
56  }
57 
58  void init(const std::string& host, const Comm::EndPoints& endpoints) {
60  [ptr=shared_from_this()] (size_t bytes) {
61  return ptr->release(bytes);
62  }
63  );
64 
65  auto period = Env::Config::settings()
66  ->get<Config::Property::Value_int32_g>("swc.cfg.dyn.period");
67  if(period->get()) {
68  Env::IoCtx::io()->set_periodic_timer(
69  period,
70  []() noexcept { Env::Config::settings()->check_dynamic_files(); }
71  );
72  }
74  m_metrics->configure_thriftbroker(host.c_str(), endpoints);
75  m_metrics->start();
76  }
77  }
78 
79  virtual ~AppContext() noexcept { }
80 
81  void wait_while_run() {
82  Core::UniqueLock lock_wait(m_mutex);
83  m_cv.wait(lock_wait, [this]{return !m_run;});
84  }
85 
86  BrokerIf* getHandler(const thrift::TConnectionInfo& connInfo) override {
87  auto socket = std::dynamic_pointer_cast<thrift::transport::TSocket>(
88  connInfo.transport);
89  if(!socket)
90  Converter::exception(Error::CANCELLED, "Bad Transport Socket");
91 
92  AppHandler* handler = new AppHandler(socket);
93  if(handler->socket) try {
95  SWC_LOG_OSTREAM << "Connection Opened(hdlr=" << size_t(handler)
96  << " [" << handler->socket->getPeerAddress() << "]:"
97  << handler->socket->getPeerPort()
98  << ") open=" << m_connections.increment_and_count();
99  );
100  } catch(...) {
102  }
103  if(m_metrics)
104  m_metrics->net->connected();
106  m_handlers.emplace(handler);
107  return handler;
108  }
109 
110  void releaseHandler(ServiceIf* hdlr) override {
111  AppHandler* handler = dynamic_cast<AppHandler*>(hdlr);
112  size_t remain_open = m_connections.decrement_and_count();
113  if(handler) {
114  handler->disconnected();
115 
116  if(handler->socket) try {
118  SWC_LOG_OSTREAM << "Connection Closed(hdlr=" << size_t(handler)
119  << " [" << handler->socket->getPeerAddress() << "]:"
120  << handler->socket->getPeerPort()
121  << ") open=" << remain_open;
122  );
123  } catch(...) {
125  }
126  } else {
128  SWC_LOG_OSTREAM << "Connection Closed(hdlr=" << size_t(handler)
129  << " BAD CAST) open=" << remain_open; );
130  }
131  if(m_metrics)
132  m_metrics->net->disconnected();
134  auto it = m_handlers.find(handler);
135  if(it != m_handlers.cend())
136  m_handlers.erase(it);
137  }
138 
139  void shutting_down(const std::error_code& ec, const int& sig) {
140  if(!sig) { // set signals listener
141  struct Task {
142  AppContext* ptr;
144  Task(AppContext* a_ptr) noexcept : ptr(a_ptr) { }
145  void operator()(const std::error_code& ec, const int &sig) {
146  if(ec == asio::error::operation_aborted)
147  return;
148  SWC_LOGF(LOG_INFO, "Received signal, sig=%d ec=%s",
149  sig, ec.message().c_str());
150  ptr->shutting_down(ec, sig);
151  }
152  };
153  Env::IoCtx::io()->signals->async_wait(Task(this));
154 
155  SWC_LOGF(LOG_INFO, "Listening for Shutdown signal, set at sig=%d ec=%s",
156  sig, ec.message().c_str());
157  return;
158  } else {
159 
160  bool at = true;
161  if(!m_run.compare_exchange_weak(at, false))
162  return;
163  }
164 
165  SWC_LOGF(LOG_INFO, "Shutdown signal, sig=%d ec=%s", sig, ec.message().c_str());
166  std::shared_ptr<std::thread> d(new std::thread);
167  *d.get() = std::thread([d, ptr=shared_from_this()]{ ptr->stop(); });
168  d->detach();
169  }
170 
171  private:
172 
173  void stop() {
174 
176 
177  #if defined(SWC_ENABLE_SANITIZER)
178  if(m_metrics)
179  m_metrics->wait();
180  #endif
181 
182  for(AppHandler::Ptr hdlr;;) {
183  {
185  if(m_handlers.empty())
186  break;
187  hdlr = *m_handlers.cbegin();
188  }
189  hdlr->stop();
190  std::this_thread::yield();
191  }
192 
193  Env::Clients::get()->stop_services();
194  Env::IoCtx::io()->stop();
195 
196  //Env::FsInterface::interface()->stop();
197 
198  {
200  m_cv.notify_all();
201  }
202 
203  #if defined(SWC_ENABLE_SANITIZER)
204  std::this_thread::sleep_for(std::chrono::seconds(2));
205  m_metrics = nullptr;
208  #endif
209  }
210 
211  size_t release(size_t bytes) {
212  size_t released = 0;
213  try {
214  AppHandler::Ptr handler;
215  for(size_t idx=0;;++idx) {
216  {
218  if(idx >= m_handlers.size())
219  break;
220  auto it = m_handlers.cbegin();
221  for(size_t i = idx; i; --i, ++it);
222  handler = *it;
223  }
224  size_t sz = handler->updaters_commit(bytes);
225  if(bytes) {
226  if(bytes <= (released += sz))
227  break;
228  bytes -= sz;
229  }
230  }
231  } catch (...) {
233  }
234  return released;
235  }
236 
237  std::mutex m_mutex;
239  std::condition_variable m_cv;
241 
243 
245 
246  struct HandlerLess {
247  using is_transparent = void;
248  bool operator()(const AppHandler::Ptr& lhs,
249  const AppHandler::Ptr& rhs) const noexcept {
250  return lhs.get() < rhs.get();
251  }
252  bool operator()(const AppHandler::Ptr& lhs,
253  const AppHandler* rhs) const noexcept {
254  return lhs.get() < rhs;
255  }
256  bool operator()(const AppHandler* lhs,
257  const AppHandler::Ptr& rhs) const noexcept {
258  return lhs < rhs.get();
259  }
260  };
261  std::set<AppHandler::Ptr, HandlerLess> m_handlers;
262 
263 };
264 
265 
266 
267 }}
268 
269 
270 #include "swcdb/thrift/gen-cpp/Broker.cpp"
271 #ifdef SWC_IMPL_SOURCE
272 #include "swcdb/thrift/gen-cpp/Service.cpp"
273 #include "swcdb/thrift/gen-cpp/Service_types.cpp"
274 //#include "swcdb/thrift/Converters.cc"
275 #endif
276 
277 
278 #endif // swcdb_app_thriftbroker_AppContext_h
SWC::Core::AtomicBase::compare_exchange_weak
constexpr SWC_CAN_INLINE bool compare_exchange_weak(T &at, T value) noexcept
Definition: Atomic.h:52
SWC_LOG_OSTREAM
#define SWC_LOG_OSTREAM
Definition: Logger.h:44
SWC::Core::AtomicBase< bool >
SWC::ThriftBroker::AppContext::HandlerLess::operator()
bool operator()(const AppHandler::Ptr &lhs, const AppHandler *rhs) const noexcept
Definition: AppContext.h:252
SWC::ThriftBroker::AppContext::wait_while_run
void wait_while_run()
Definition: AppContext.h:81
SWC::ThriftBroker::AppContext::~AppContext
virtual ~AppContext() noexcept
Definition: AppContext.h:79
SWC::Env::Clients::get
static SWC_CAN_INLINE client::Clients::Ptr & get() noexcept
Definition: Clients.h:299
SWC_LOGF
#define SWC_LOGF(priority, fmt,...)
Definition: Logger.h:188
SWC::ThriftBroker::AppHandler::Ptr
std::shared_ptr< AppHandler > Ptr
Definition: AppHandler.h:39
SWC::Core::UniqueLock
Definition: MutexLock.h:68
SWC_LOG_OUT
#define SWC_LOG_OUT(pr, _code_)
Definition: Logger.h:178
SWC::ThriftBroker::AppContext::m_cv
std::condition_variable m_cv
Definition: AppContext.h:239
SWC::client::ContextManager::Ptr
std::shared_ptr< ContextManager > Ptr
Definition: ContextManager.h:18
SWC::Core::ScopedLock
Definition: MutexLock.h:41
SWC::Env::Config::settings
static SWC::Config::Settings::Ptr & settings()
Definition: Settings.h:128
SWC::ThriftBroker::AppContext::HandlerLess
Definition: AppContext.h:246
SWC::LOG_INFO
@ LOG_INFO
Definition: Logger.h:35
SWC::Core::MutexSptd::scope
Definition: MutexSptd.h:96
SWC::Thrift::Converter::exception
void exception(int err, const std::string &msg="")
Definition: Converter.h:23
SWC::ThriftBroker::Metric::Reporting::Ptr
std::shared_ptr< Reporting > Ptr
Definition: MetricsReporting.h:37
SWC::ThriftBroker::AppContext::getHandler
BrokerIf * getHandler(const thrift::TConnectionInfo &connInfo) override
Definition: AppContext.h:86
SWC::ThriftBroker::AppHandler::socket
const std::shared_ptr< thrift::transport::TSocket > socket
Definition: AppHandler.h:41
SWC::ThriftBroker::AppContext::stop
void stop()
Definition: AppContext.h:173
SWC::Env::Clients::init
static void init(const client::Clients::Ptr &clients)
Definition: Clients.cc:182
AppHandler.h
SWC::ThriftBroker::AppHandler
Definition: AppHandler.h:36
ThriftBrokerEnv.h
SWC_CAN_INLINE
#define SWC_CAN_INLINE
Definition: Compat.h:102
SWC::ThriftBroker::AppContext::m_mutex
std::mutex m_mutex
Definition: AppContext.h:237
SWC::ThriftBroker::AppContext::m_connections
Core::CompletionCounter< size_t > m_connections
Definition: AppContext.h:240
SWC
The SWC-DB C++ namespace 'SWC'.
Definition: main.cc:12
SWC::ThriftBroker::AppContext::m_handlers
std::set< AppHandler::Ptr, HandlerLess > m_handlers
Definition: AppContext.h:261
SWC::Env::ThriftBroker::stop
static void stop()
Definition: ThriftBrokerEnv.h:38
SWC::ThriftBroker::AppContext::m_metrics
Metric::Reporting::Ptr m_metrics
Definition: AppContext.h:244
SWC::Core::CompletionCounter< size_t >
SWC::Env::IoCtx::init
static SWC_CAN_INLINE void init(int32_t size)
Definition: IoContext.h:118
SWC::ThriftBroker::AppContext::HandlerLess::operator()
bool operator()(const AppHandler::Ptr &lhs, const AppHandler::Ptr &rhs) const noexcept
Definition: AppContext.h:248
SWC::ThriftBroker::AppContext::release
size_t release(size_t bytes)
Definition: AppContext.h:211
SWC::client::ContextManager
Definition: ContextManager.h:15
SWC::ThriftBroker::AppHandler::disconnected
void disconnected()
Definition: AppHandler.h:617
SWC::ThriftBroker::AppContext
Definition: AppContext.h:27
SWC::ThriftBroker::AppContext::HandlerLess::is_transparent
void is_transparent
Definition: AppContext.h:247
SWC::Core::Vector< EndPoint >
SWC::client::ContextRanger::Ptr
std::shared_ptr< ContextRanger > Ptr
Definition: ContextRanger.h:18
SWC::ThriftBroker::AppContext::HandlerLess::operator()
bool operator()(const AppHandler *lhs, const AppHandler::Ptr &rhs) const noexcept
Definition: AppContext.h:256
SWC::ThriftBroker::AppContext::Ptr
std::shared_ptr< AppContext > Ptr
Definition: AppContext.h:30
SWC::Env::ThriftBroker::metrics_track
static SWC_CAN_INLINE SWC::ThriftBroker::Metric::Reporting::Ptr & metrics_track() noexcept
Definition: ThriftBrokerEnv.h:25
SWC::Env::Clients::reset
static void reset() noexcept
Definition: Clients.h:308
SWC::ThriftBroker::AppContext::m_mutex_handlers
Core::MutexSptd m_mutex_handlers
Definition: AppContext.h:242
SWC::ThriftBroker::AppContext::AppContext
AppContext()
Definition: AppContext.h:32
SWC::Core::MutexSptd
Definition: MutexSptd.h:16
SWC::client::Clients::make
static Ptr make(const Config::Settings &settings, const Comm::IoContextPtr &io_ctx, const ContextManager::Ptr &mngr_ctx, const ContextRanger::Ptr &rgr_ctx, const ContextBroker::Ptr &bkr_ctx)
Definition: Clients.cc:13
SWC::Env::IoCtx::io
static Comm::IoContextPtr io()
Definition: IoContext.h:132
SWC::ThriftBroker::AppContext::releaseHandler
void releaseHandler(ServiceIf *hdlr) override
Definition: AppContext.h:110
SWC::ThriftBroker::AppContext::m_run
Core::AtomicBool m_run
Definition: AppContext.h:238
SWC::Env::IoCtx::reset
static void reset() noexcept
Definition: IoContext.h:148
SWC::Env::ThriftBroker::init
static void init(System::Mem::ReleaseCall_t &&release_call)
Definition: ThriftBrokerEnv.h:19
SWC::LOG_WARN
@ LOG_WARN
Definition: Logger.h:33
SWC::ThriftBroker::AppContext::init
void init(const std::string &host, const Comm::EndPoints &endpoints)
Definition: AppContext.h:58
SWC::Error::CANCELLED
@ CANCELLED
Definition: Error.h:56
SWC::Core::CompletionCounter::decrement_and_count
constexpr SWC_CAN_INLINE CountT decrement_and_count() noexcept
Definition: CompletionCounter.h:52
SWC_LOG_CURRENT_EXCEPTION
#define SWC_LOG_CURRENT_EXCEPTION(_s_)
Definition: Exception.h:144
SWC::Core::CompletionCounter::increment_and_count
constexpr SWC_CAN_INLINE CountT increment_and_count() noexcept
Definition: CompletionCounter.h:47
SWC::Config::Property::Value_int32_g
Definition: Property.h:586
SWC::client::ContextRanger
Definition: ContextRanger.h:15
SWC::ThriftBroker::AppContext::shutting_down
void shutting_down(const std::error_code &ec, const int &sig)
Definition: AppContext.h:139