SWC-DB  v0.5.12 C++ documentation
SWC-DB© (Super Wide Column Database) - High Performance Scalable Database (https://github.com/kashirin-alex/swc-db)
SerializedServer.cc
Go to the documentation of this file.
1 /*
2  * SWC-DB© Copyright since 2019 Alex Kashirin <kashirin.alex@gmail.com>
3  * License details at <https://github.com/kashirin-alex/swc-db/#license>
4  */
5 
6 
9 
10 
11 namespace SWC { namespace Comm { namespace server {
12 
13 
14 
16 
// Completion handler passed to conn->handshake() for a server-side SSL
// connection. The type is move-only: copying is deleted below.
// NOTE(review): the declaration of the `conn` member is on a listing line
// that this rendering does not show; the cross-reference index gives it as
// ConnHandlerSSL::Ptr.
17  struct Handshaker {
// Stores a copy of the connection pointer the handshake belongs to.
20  Handshaker(const ConnHandlerSSL::Ptr& a_conn) : conn(a_conn) { }
// Move construction transfers the stored connection pointer.
22  Handshaker(Handshaker&& other) noexcept : conn(std::move(other.conn)) { }
23  Handshaker(const Handshaker&) = delete;
25  Handshaker& operator=(const Handshaker&) = delete;
26  ~Handshaker() noexcept { }
// Called with the result of the handshake.
27  void operator()(const asio::error_code& ec) {
28  if(!ec) {
// Success: the connection is handed over through new_connection().
29  conn->new_connection();
30  } else {
// Failure: the error is only logged at debug level; no other action is
// taken in this handler.
31  SWC_LOGF(LOG_DEBUG, "handshake error=%d(%s)",
32  ec.value(), ec.message().c_str());
33  }
34  }
35  };
36 
// Builds the accept handler used when an SSL configuration is present.
// Only the raw Acceptor pointer is stored; no ownership is taken here.
39  Mixed(Acceptor* a_acceptor) noexcept : acceptor(a_acceptor) { }
// Accept-completion handler that decides per connection between SSL and
// plain handling.
//   ec       - result of the accept; taken by value and reused below as an
//              output argument for the endpoint and close calls.
//   new_sock - the accepted socket.
// Unless the accept was cancelled, the handler reports the accept to the
// application context and queues the next async_accept before returning.
40  void operator()(std::error_code ec, asio::ip::tcp::socket new_sock) {
41  bool need_ssl = false;
42  if(ec) {
// ECANCELED returns without re-arming, so the accept chain ends here.
43  if(ec.value() == ECANCELED)
44  return;
45  SWC_LOGF(LOG_WARN, "SRV-accept error=%d(%s)",
46  ec.value(), ec.message().c_str());
47 
48  } else if(new_sock.is_open()) {
49  try {
// NOTE(review): both endpoint lookups write to the same `ec`, and the
// evaluation order of function arguments is unspecified, so an error from
// one lookup can be overwritten by the other - confirm this is acceptable.
50  need_ssl = acceptor->m_ssl_cfg->need_ssl(
51  new_sock.local_endpoint(ec), new_sock.remote_endpoint(ec));
52  if(!ec) {
53  if(need_ssl) {
// SSL path: new_connection() is deferred to Handshaker, which calls it
// only when the handshake completes without error.
54  auto conn = acceptor->m_ssl_cfg->make_connection(
55  acceptor->m_app_ctx, new_sock);
56  conn->handshake(SocketSSL::server, Handshaker(conn));
57  } else {
// Plain path: the connection is started immediately.
58  auto conn = ConnHandlerPlain::make(
59  acceptor->m_app_ctx, new_sock);
60  conn->new_connection();
61  }
62  } else if(new_sock.is_open()) {
// Endpoint lookup failed: drop the socket; the close result lands in `ec`
// and is not inspected afterwards.
63  new_sock.close(ec);
64  }
65  } catch(...) {
// NOTE(review): listing line 66 is absent from this rendering; a statement
// (possibly exception logging) may have been dropped - confirm against the
// real source.
67  new_sock.close(ec);
68  }
69  }
70 
// Reached on every path except cancellation. `need_ssl` keeps whatever
// value was computed above, even if the socket was closed afterwards.
71  acceptor->m_app_ctx->net_accepted(acceptor->local_endpoint(), need_ssl);
// Re-arm the acceptor with a fresh handler for the next connection.
72  acceptor->async_accept(Mixed(acceptor));
73  }
74 };
75 
// Builds the accept handler used when no SSL configuration is present.
// Only the raw Acceptor pointer is stored; no ownership is taken here.
79  Plain(Acceptor* a_acceptor) noexcept : acceptor(a_acceptor) { }
// Accept-completion handler for plain (non-SSL) connections.
//   ec       - result of the accept.
//   new_sock - the accepted socket.
// Unless the accept was cancelled, the handler reports the accept to the
// application context and queues the next async_accept before returning.
80  void operator()(const std::error_code& ec, asio::ip::tcp::socket new_sock) {
81  if(ec) {
// ECANCELED returns without re-arming, so the accept chain ends here.
82  if(ec.value() == ECANCELED)
83  return;
84  SWC_LOGF(LOG_WARN, "SRV-accept error=%d(%s)",
85  ec.value(), ec.message().c_str());
86 
87  } else if(new_sock.is_open()) {
88  try {
89  auto conn = ConnHandlerPlain::make(acceptor->m_app_ctx, new_sock);
90  conn->new_connection();
91  } catch(...) {
// NOTE(review): listing line 92 is absent from this rendering; a statement
// (possibly exception logging) may have been dropped - confirm against the
// real source.
// A local error_code is needed because `ec` is a const reference here.
93  std::error_code _ec;
94  new_sock.close(_ec);
95  }
96  }
97 
// Reached on every path except cancellation; always reported as non-SSL.
98  acceptor->m_app_ctx->net_accepted(acceptor->local_endpoint(), false);
// Re-arm the acceptor with a fresh handler for the next connection.
99  acceptor->async_accept(Plain(acceptor));
100  }
101 };
102 
// Constructs an Acceptor from an existing asio acceptor.
//   acceptor - moved into the base class; although taken by lvalue
//              reference, the caller's object is moved-from afterwards.
//   app_ctx  - application context, copied into m_app_ctx.
//   ssl_cfg  - SSL configuration pointer stored as-is; may be nullptr
//              (accept() checks it to choose the handler type).
103 SWC_SHOULD_NOT_INLINE
104 Acceptor::Acceptor(asio::ip::tcp::acceptor& acceptor,
105  AppContext::Ptr& app_ctx,
106  ConfigSSL* ssl_cfg)
107  : asio::ip::tcp::acceptor(std::move(acceptor)),
108  m_app_ctx(app_ctx),
109  m_ssl_cfg(ssl_cfg) {
// Both options are applied to the acceptor itself, after the move above.
110  set_option(asio::ip::tcp::acceptor::reuse_address(true));
111  set_option(asio::ip::tcp::no_delay(true));
112 }
113 
// Body of Acceptor::accept(); the signature line (listing line 114) is not
// shown in this rendering.
// Starts the first asynchronous accept: the Mixed handler is used when an
// SSL configuration is set, otherwise the Plain handler.
115  if(m_ssl_cfg)
116  async_accept(Mixed(this));
117  else
118  async_accept(Plain(this));
119 
// Log the listening endpoint, descriptor and mode. The opening of this log
// statement (listing line 120) is not shown in this rendering.
121  SWC_LOG_OSTREAM << "Listening On: " << local_endpoint()
122  << " fd=" << native_handle()
123  << ' ' << (m_ssl_cfg ? "SECURE" : "PLAIN");
124  );
125 }
126 
// Body of Acceptor::stop(); the signature line and the opening of the log
// statement (listing lines 127-128) are not shown in this rendering.
// Logs the endpoint and descriptor, then closes the acceptor if it is open.
129  SWC_LOG_OSTREAM << "Stopping to Listen On: " << local_endpoint()
130  << " fd=" << native_handle();
131  );
132 
133  if(is_open()) {
// The close result is captured in a local error_code and not inspected.
134  std::error_code ec;
135  close(ec);
136  }
137 }
138 
139 
// SerializedServer constructor. The line naming the function (listing line
// 141) and several statement openers (182, 185, 189, 199, 207, 217, 227) are
// not shown in this rendering; the comments below describe visible code only.
//   settings             - configuration source ("swc.comm.ssl", "addr",
//                          "host", "swc.comm.network.priority" are read).
//   name                 - application name, moved into m_appname.
//   concurrency_relative - when true, `reactors` is treated as a divisor of
//                          the hardware thread count.
//   reactors, workers    - number of reactors and the size passed to each
//                          IoContext::make call.
//   port                 - passed to the endpoint-resolving call.
//   app_ctx              - application context given to every Acceptor and
//                          initialised with the host and final endpoints.
// Raises through SWC_EXPECT when reactors < 1 and through SWC_THROW when
// gethostname fails.
140 SWC_SHOULD_NOT_INLINE
142  const Config::Settings& settings,
143  std::string&& name,
144  bool concurrency_relative,
145  uint32_t reactors, uint32_t workers,
146  uint16_t port,
147  AppContext::Ptr app_ctx
148  ) : m_appname(std::move(name)), m_run(true),
149  m_io_contexts(), m_acceptors(), m_mutex(), m_conns(),
150  m_ssl_cfg(
151  settings.get_bool("swc.comm.ssl")
152  ? new ConfigSSL(settings, false)
153  : nullptr
154  ) {
155 
156  SWC_EXPECT(reactors >= 1, Error::CONFIG_BAD_VALUE);
157  if(concurrency_relative) {
// Relative mode: reactors becomes cores / reactors, clamped to at least 1.
158  uint32_t num_cores = std::thread::hardware_concurrency();
159  reactors = num_cores / reactors;
160  if(reactors < 1) reactors = 1;
161  }
162 
163  SWC_LOGF(LOG_INFO, "STARTING SERVER: %s, reactors=%u, workers=%u",
164  m_appname.c_str(), reactors, workers);
165 
166  Config::Strings addrs;
167  if(settings.has("addr"))
168  addrs = settings.get_strs("addr");
169 
// Host name: taken from the "host" setting when present, otherwise from
// gethostname().
170  std::string host;
171  if(settings.has("host"))
// NOTE(review): append() already modifies `host`; assigning its result back
// to `host` looks redundant - confirm and simplify.
172  host = host.append(settings.get_str("host"));
173  else {
174  char hostname[256];
175  if(gethostname(hostname, sizeof(hostname)) == -1)
176  SWC_THROW(errno, "gethostname");
177  host.append(hostname);
178  }
179 
// NOTE(review): the call that receives the next arguments (listing line
// 182) and the statement executed when `ec` is set (line 185) are not
// shown; the index suggests Resolver::get_networks and SWC_THROWF - confirm.
180  Networks nets;
181  asio::error_code ec;
183  settings.get_strs("swc.comm.network.priority"), nets, ec);
184  if(ec)
186  "swc.comm.network.priority error(%s)",
187  ec.message().c_str());
188 
// NOTE(review): the declaration of `endpoints`, used in the loop below, is
// on listing line 189, which is not shown.
190  port,
191  addrs,
192  host,
193  nets,
194  true
195  );
196  EndPoints endpoints_final;
197 
// One iteration per reactor; each creates an io context named
// "<appname>-reactor-<n>" and one acceptor per endpoint.
198  for(uint32_t reactor=0; reactor<reactors; ++reactor) {
200  IoContext::make(m_appname + "-reactor-" + std::to_string(reactor+1),
201  workers
202  ));
203 
204  for (std::size_t i = 0; i < endpoints.size(); ++i) {
// SSL is used for this endpoint only when an SSL config exists and it
// reports that the endpoint needs it.
205  bool ssl_conn = m_ssl_cfg && m_ssl_cfg->need_ssl(endpoints[i]);
206 
208  SWC_LOG_OSTREAM << "Binding On: " << endpoints[i]
209  << ' ' << (ssl_conn ? "SECURE" : "PLAIN");
210  );
211 
212  if(!reactor) {
// First reactor: the asio acceptor is constructed directly on the endpoint,
// and the resulting local endpoint is recorded in endpoints_final.
213  auto acceptor = asio::ip::tcp::acceptor(
214  m_io_contexts.back()->executor(),
215  endpoints[i]
216  );
218  acceptor, app_ctx, ssl_conn ? m_ssl_cfg : nullptr));
219  endpoints_final.push_back(m_acceptors[i]->sock()->local_endpoint());
220 
221  } else {
// Later reactors: the acceptor is built on a dup() of the descriptor held
// by m_acceptors[i], i.e. the i-th acceptor created in the first pass.
// NOTE(review): the dup() result is used without a check for -1 - confirm
// whether failure should be handled here.
222  auto acceptor = asio::ip::tcp::acceptor(
223  m_io_contexts.back()->executor(),
224  endpoints[i].protocol(),
225  dup(m_acceptors[i]->sock()->native_handle())
226  );
228  acceptor, app_ctx, ssl_conn ? m_ssl_cfg : nullptr));
229  }
230  }
231 
// After the first reactor's acceptors exist, the application context is
// initialised once with the host and the bound endpoints; when networks
// are configured the endpoints go through Resolver::sort first.
232  if(!reactor) {
233  if(nets.empty()) {
234  app_ctx->init(host, endpoints_final);
235  } else {
236  EndPoints sorted;
237  Resolver::sort(nets, endpoints_final, sorted);
238  app_ctx->init(host, sorted);
239  }
240  }
241  }
242 }
243 
// SerializedServer::run(); the signature line (listing line 245) is not
// shown in this rendering.
// Starts accepting on every acceptor, then joins the pool of each io
// context and logs once all of them have returned.
244 SWC_SHOULD_NOT_INLINE
246  for(auto& acceptor : m_acceptors)
247  acceptor->accept();
248 
// Each join is called in turn; the log line below runs only after all of
// them return.
249  for(auto& io : m_io_contexts)
250  io->pool.join();
251 
252  SWC_LOGF(LOG_INFO, "STOPPED SERVER: %s", m_appname.c_str());
253 }
254 
// SerializedServer::stop_accepting(); the line naming the function (listing
// line 256) is not shown in this rendering.
// Stops and removes every acceptor and returns a work guard created on the
// executor of the last io context.
// NOTE(review): presumably the guard keeps that context running while
// existing connections finish - confirm against the callers.
255 std::shared_ptr<IoContext::ExecutorWorkGuard>
257  auto guard = std::shared_ptr<IoContext::ExecutorWorkGuard>(
258  new IoContext::ExecutorWorkGuard(m_io_contexts.back()->executor()));
259 
// The loop has no increment: each pass stops the element at `it` and
// erases it, ending once the iterator equals cend().
// NOTE(review): the value returned by erase() is discarded and `it` is
// reused afterwards; this relies on Core::Vector keeping that iterator
// valid after an erase - confirm.
260  for(auto it = m_acceptors.cbegin(); it != m_acceptors.cend(); ) {
261  (*it)->stop();
262  m_acceptors.erase(it);
263  }
264 
265  SWC_LOGF(LOG_INFO, "STOPPED ACCEPTING: %s", m_appname.c_str());
266  return guard;
267 }
268 
// Body of SerializedServer::shutdown(); the signature line (listing line
// 269) is not shown in this rendering.
// Clears the run flag, then removes connections from m_conns one at a time
// and closes each of them.
270  SWC_LOGF(LOG_INFO, "STOPPING SERVER: %s", m_appname.c_str());
271  m_run.store(false);
272 
273  ConnHandlerPtr conn;
274  for(;;) {
275  {
// NOTE(review): listing line 276 is absent from this rendering; the inner
// braces suggest a scoped lock on m_mutex was taken there - confirm.
277  auto it = m_conns.cbegin();
278  if(it == m_conns.cend())
279  break;
280  conn = *it;
281  m_conns.erase(it);
282  }
// do_close() is called after the inner scope has ended.
283  conn->do_close();
284  }
285  //for(auto& io : m_io_contexts)
286  // io->stop();
287 }
288 
// Body of SerializedServer::connection_add(conn); the signature line and the
// line after it (listing lines 289-290) are not shown in this rendering.
// NOTE(review): line 290 presumably takes a lock on m_mutex - confirm.
// Appends the connection to m_conns.
291  m_conns.push_back(conn);
292 
293  //SWC_LOGF(LOG_DEBUG, "%s, conn-add open=%d", m_appname.c_str(), m_conns.size());
294 }
295 
// Body of SerializedServer::connection_del(conn); the signature line and the
// line after it (listing lines 296-297) are not shown in this rendering.
// NOTE(review): line 297 presumably takes a lock on m_mutex - confirm.
// Removes the first stored connection whose endpoint_remote equals that of
// `conn`; matching is by remote endpoint, not by pointer identity.
298  for(auto it=m_conns.cbegin(); it != m_conns.cend(); ++it) {
299  if(conn->endpoint_remote == (*it)->endpoint_remote){
300  m_conns.erase(it);
// Stop at the first match; the iterator is not used after the erase.
301  break;
302  }
303  }
304  //SWC_LOGF(LOG_DEBUG, "%s, conn-del open=%d", m_appname.c_str(), m_conns.size());
305 }
306 
// Body of SerializedServer::~SerializedServer(); the signature line (listing
// line 307) is not shown in this rendering.
// Releases the ConfigSSL object that the constructor allocates with `new`
// when "swc.comm.ssl" is enabled; m_ssl_cfg is nullptr otherwise.
308  delete m_ssl_cfg;
309 }
310 
311 
312 
313 }}}
SWC::Core::Vector::erase
SWC_CAN_INLINE iterator erase(size_type offset) noexcept(_NoExceptMoveAssign &&_NoExceptDestructor)
Definition: Vector.h:464
SWC::Comm::Protocol::FsBroker::Handler::close
void close(const ConnHandlerPtr &conn, const Event::Ptr &ev)
Definition: Close.h:17
SWC::Comm::server::Acceptor::Plain::acceptor
Acceptor * acceptor
Definition: SerializedServer.cc:77
SWC_LOG_OSTREAM
#define SWC_LOG_OSTREAM
Definition: Logger.h:44
SWC::Config::Properties::get_strs
Strings get_strs(const char *name) const
Definition: Properties.h:85
SWC::Comm::server::Acceptor::Mixed::Handshaker::conn
ConnHandlerSSL::Ptr conn
Definition: SerializedServer.cc:18
SWC::Comm::Resolver::get_networks
void get_networks(const Config::Strings &networks, Networks &nets, asio::error_code &ec)
Definition: Resolver.cc:264
SWC_LOGF
#define SWC_LOGF(priority, fmt,...)
Definition: Logger.h:188
SWC::Comm::server::Acceptor::Mixed
Definition: SerializedServer.cc:15
SWC_LOG_OUT
#define SWC_LOG_OUT(pr, _code_)
Definition: Logger.h:178
Settings.h
SWC::Comm::ConnHandlerSSL::Ptr
std::shared_ptr< ConnHandlerSSL > Ptr
Definition: ConnHandler.h:299
SWC_THROW
#define SWC_THROW(_code_, _msg_)
Definition: Exception.h:134
SWC::LOG_INFO
@ LOG_INFO
Definition: Logger.h:35
SWC::Core::MutexSptd::scope
Definition: MutexSptd.h:96
SWC::Comm::server::Acceptor::Plain::operator()
void operator()(const std::error_code &ec, asio::ip::tcp::socket new_sock)
Definition: SerializedServer.cc:80
SWC::Comm::server::Acceptor::Acceptor
Acceptor(asio::ip::tcp::acceptor &acceptor, AppContext::Ptr &app_ctx, ConfigSSL *ssl_cfg)
Definition: SerializedServer.cc:104
SWC::Comm::server::SerializedServer::m_mutex
Core::MutexSptd m_mutex
Definition: SerializedServer.h:96
SWC::Comm::server::SerializedServer::m_ssl_cfg
ConfigSSL * m_ssl_cfg
Definition: SerializedServer.h:98
SWC::Comm::server::Acceptor::Mixed::operator()
void operator()(std::error_code ec, asio::ip::tcp::socket new_sock)
Definition: SerializedServer.cc:40
SWC::Comm::server::SerializedServer::connection_add
void connection_add(const ConnHandlerPtr &conn)
Definition: SerializedServer.cc:289
SWC::Comm::server::Acceptor::Mixed::Mixed
SWC_CAN_INLINE Mixed(Acceptor *a_acceptor) noexcept
Definition: SerializedServer.cc:39
SWC::Comm::server::SerializedServer::SerializedServer
SerializedServer(const Config::Settings &settings, std::string &&name, bool concurrency_relative, uint32_t reactors, uint32_t workers, uint16_t port, AppContext::Ptr app_ctx)
Definition: SerializedServer.cc:141
SWC::Comm::AppContext::Ptr
std::shared_ptr< AppContext > Ptr
Definition: AppContext.h:23
SWC::Comm::server::Acceptor::Mixed::Handshaker::~Handshaker
~Handshaker() noexcept
Definition: SerializedServer.cc:26
SWC::Comm::server::Acceptor::Mixed::Handshaker
Definition: SerializedServer.cc:17
SWC::Comm::server::SerializedServer::m_conns
Core::Vector< ConnHandlerPtr > m_conns
Definition: SerializedServer.h:97
SWC::Core::AtomicBase::store
constexpr SWC_CAN_INLINE void store(T v) noexcept
Definition: Atomic.h:37
SWC::Core::Vector::empty
constexpr SWC_CAN_INLINE bool empty() const noexcept
Definition: Vector.h:168
SWC_CAN_INLINE
#define SWC_CAN_INLINE
Definition: Compat.h:102
SWC::LOG_DEBUG
@ LOG_DEBUG
Definition: Logger.h:36
SWC::Core::Vector::back
constexpr SWC_CAN_INLINE reference back() noexcept
Definition: Vector.h:254
SWC::Comm::server::Acceptor::Mixed::Handshaker::Handshaker
SWC_CAN_INLINE Handshaker(Handshaker &&other) noexcept
Definition: SerializedServer.cc:22
SWC::Comm::server::SerializedServer::shutdown
void shutdown()
Definition: SerializedServer.cc:269
SWC
The SWC-DB C++ namespace 'SWC'.
Definition: main.cc:12
SWC::Comm::server::SerializedServer::m_run
Core::AtomicBool m_run
Definition: SerializedServer.h:92
SWC::Config::Properties::has
bool SWC_PURE_FUNC has(const char *name) const noexcept
Definition: Properties.cc:82
SWC::Comm::server::SerializedServer::m_io_contexts
Core::Vector< IoContextPtr > m_io_contexts
Definition: SerializedServer.h:93
SWC::Comm::server::Acceptor::Plain::Plain
SWC_CAN_INLINE Plain(Acceptor *a_acceptor) noexcept
Definition: SerializedServer.cc:79
SWC_THROWF
#define SWC_THROWF(_code_, _fmt_,...)
Definition: Exception.h:136
SWC::Comm::ConfigSSL
Definition: ConfigSSL.h:18
SWC::Comm::server::Acceptor::m_ssl_cfg
ConfigSSL * m_ssl_cfg
Definition: SerializedServer.h:53
SWC::Comm::server::Acceptor::Mixed::Handshaker::Handshaker
Handshaker(const Handshaker &)=delete
SWC::Comm::Resolver::sort
void sort(const Networks &nets, const EndPoints &endpoints, EndPoints &sorted)
Definition: Resolver.cc:243
SWC::Comm::server::Acceptor::Mixed::Handshaker::operator()
void operator()(const asio::error_code &ec)
Definition: SerializedServer.cc:27
SerializedServer.h
SWC::Config::Settings
Definition: Settings.h:25
SWC::Comm::server::Acceptor::stop
void stop()
Definition: SerializedServer.cc:127
SWC::Comm::server::Acceptor::m_app_ctx
AppContext::Ptr m_app_ctx
Definition: SerializedServer.h:50
SWC::Comm::ConnHandlerPtr
std::shared_ptr< ConnHandler > ConnHandlerPtr
Definition: AppContext.h:17
SWC::Comm::server::Acceptor::accept
void accept()
Definition: SerializedServer.cc:114
SWC::Error::CONFIG_BAD_VALUE
@ CONFIG_BAD_VALUE
Definition: Error.h:81
SWC::Comm::server::Acceptor::Mixed::Handshaker::operator=
Handshaker & operator=(Handshaker &&)=delete
SWC::Core::Vector< std::string >
SWC::Comm::server::Acceptor::Mixed::acceptor
Acceptor * acceptor
Definition: SerializedServer.cc:37
SWC::Config::Properties::get_str
std::string get_str(const char *name) const
Definition: Properties.h:77
SWC::Core::Vector::cend
constexpr SWC_CAN_INLINE const_iterator cend() const noexcept
Definition: Vector.h:232
SWC_EXPECT
#define SWC_EXPECT(_e_, _code_)
Definition: Exception.h:158
SWC::Comm::server::SerializedServer::run
void run()
Definition: SerializedServer.cc:245
SWC::Comm::server::Acceptor::Mixed::Handshaker::Handshaker
SWC_CAN_INLINE Handshaker(const ConnHandlerSSL::Ptr &a_conn)
Definition: SerializedServer.cc:20
SWC::Comm::server::SerializedServer::connection_del
void connection_del(const ConnHandlerPtr &conn)
Definition: SerializedServer.cc:296
SWC::Comm::server::SerializedServer::~SerializedServer
~SerializedServer() noexcept
Definition: SerializedServer.cc:307
SWC::LOG_WARN
@ LOG_WARN
Definition: Logger.h:33
SWC::Comm::ConfigSSL::need_ssl
SWC_CAN_INLINE bool need_ssl(const EndPoint &remote) const noexcept
Definition: ConfigSSL.h:28
SWC::Comm::IoContext::ExecutorWorkGuard
asio::executor_work_guard< Executor > ExecutorWorkGuard
Definition: IoContext.h:39
SWC::Comm::server::Acceptor::Plain
Definition: SerializedServer.cc:76
SWC::Comm::server::SerializedServer::stop_accepting
std::shared_ptr< IoContext::ExecutorWorkGuard > stop_accepting()
Definition: SerializedServer.cc:256
SWC::Comm::Resolver::get_endpoints
EndPoints get_endpoints(uint16_t defaul_port, const Config::Strings &addrs, const std::string &host, const Networks &nets, bool srv=false)
Definition: Resolver.cc:152
SWC::Core::Vector::push_back
SWC_CAN_INLINE void push_back(ArgsT &&... args)
Definition: Vector.h:331
SWC::Core::Vector::cbegin
constexpr SWC_CAN_INLINE const_iterator cbegin() const noexcept
Definition: Vector.h:216
SWC::Core::to_string
SWC_CAN_INLINE std::string to_string(const BitFieldInt< T, SZ > &v)
Definition: BitFieldInt.h:263
SWC::Comm::IoContext::make
static IoContextPtr make(std::string &&_name, int32_t size)
Definition: IoContext.h:41
SWC::Core::Vector::size
constexpr SWC_CAN_INLINE size_type size() const noexcept
Definition: Vector.h:189
SWC::Comm::ConnHandlerPlain::make
static Ptr make(AppContext::Ptr &app_ctx, SocketPlain &socket)
Definition: ConnHandler.cc:569
SWC_LOG_CURRENT_EXCEPTION
#define SWC_LOG_CURRENT_EXCEPTION(_s_)
Definition: Exception.h:144
SWC::Core::Vector::emplace_back
SWC_CAN_INLINE reference emplace_back(ArgsT &&... args)
Definition: Vector.h:349
SWC::Comm::server::SerializedServer::m_acceptors
Core::Vector< Acceptor::Ptr > m_acceptors
Definition: SerializedServer.h:94
SWC::Comm::server::Acceptor
Definition: SerializedServer.h:24
SWC::Comm::server::Acceptor::Mixed::Handshaker::operator=
Handshaker & operator=(const Handshaker &)=delete
SWC::Comm::ConfigSSL::make_connection
ConnHandlerSSL::Ptr make_connection(AppContext::Ptr &app_ctx, SocketPlain &socket) const
Definition: ConfigSSL.h:39
SWC::Comm::server::SerializedServer::m_appname
const std::string m_appname
Definition: SerializedServer.h:91