SWC-DB v0.5.12 C++ documentation
SWC-DB© (Super Wide Column Database) - High Performance Scalable Database (https://github.com/kashirin-alex/swc-db)
ConnHandler.h
Go to the documentation of this file.
1 /*
2  * SWC-DB© Copyright since 2019 Alex Kashirin <kashirin.alex@gmail.com>
3  * License details at <https://github.com/kashirin-alex/swc-db/#license>
4  */
5 
6 
7 #ifndef swcdb_core_comm_ConnHandler_h
8 #define swcdb_core_comm_ConnHandler_h
9 
10 
11 #include "swcdb/core/Exception.h"
15 #include "swcdb/core/comm/Event.h"
20 
21 
22 namespace SWC { namespace Comm {
23 
// Socket type aliases used by the connection handlers declared in this header.
// Lowest-layer type of a TCP socket; socket_layer() returns a pointer to it.
24 using SocketLayer = asio::ip::tcp::socket::lowest_layer_type;
// TCP socket used by the plain connection handler.
25 using SocketPlain = asio::ip::tcp::socket;
// SSL stream layered on top of a TCP socket.
26 using SocketSSL = asio::ssl::stream<asio::ip::tcp::socket>;
27 
28 
29 class ConnHandler : public std::enable_shared_from_this<ConnHandler> {
30 
31 
// Move-only pair of a buffers pointer (`cbuf`) and a dispatch-handler
// pointer (`hdlr`); this is the argument type of write_or_queue() and write().
// NOTE(review): the declarations of `cbuf` and `hdlr` (listing lines 33-34)
// are not present in this extract of the header.
32  struct Outgoing final {
35 
// Default state: both pointers are null.
37  Outgoing() noexcept : cbuf(nullptr), hdlr(nullptr) { }
38 
// Takes ownership of both pointers by moving from the arguments.
40  Outgoing(Buffers::Ptr&& a_cbuf, DispatchHandler::Ptr&& a_hdlr) noexcept
41  : cbuf(std::move(a_cbuf)), hdlr(std::move(a_hdlr)) {
42  }
43 
// Move construction: both pointers are moved out of `other`.
45  Outgoing(Outgoing&& other) noexcept
46  : cbuf(std::move(other.cbuf)), hdlr(std::move(other.hdlr)) {
47  }
48 
// Nothing to release by hand; the members clean up after themselves.
50  ~Outgoing() noexcept { }
51 
// Move assignment: member-wise move of both pointers.
53  Outgoing& operator=(Outgoing&& other) noexcept {
54  cbuf = std::move(other.cbuf);
55  hdlr = std::move(other.hdlr);
56  return *this;
57  }
58 
// Copying is disabled; instances can only be moved.
59  Outgoing(const Outgoing&) = delete;
60 
61  Outgoing& operator=(const Outgoing&) = delete;
62  };
63 
64  struct Pending final {
66  asio::high_resolution_timer* timer;
67 
69  Pending() noexcept : hdlr(nullptr), timer(nullptr) { }
70 
72  Pending(DispatchHandler::Ptr&& a_hdlr) noexcept
73  : hdlr(std::move(a_hdlr)), timer(nullptr) {
74  }
75 
77  Pending(Pending&& other) noexcept
78  : hdlr(std::move(other.hdlr)), timer(other.timer) {
79  other.timer = nullptr;
80  }
81 
83  ~Pending() noexcept {
84  delete timer;
85  }
86 
88  Pending& operator=(Pending&& other) noexcept {
89  hdlr = std::move(other.hdlr);
90  timer = other.timer;
91  other.timer = nullptr;
92  return *this;
93  }
94 
95  Pending(const Pending&) = delete;
96 
97  Pending& operator=(const Pending&) = delete;
98 
99  };
100 
101 
102  public:
103 
// A connection handler cannot be copy-constructed or copy-assigned.
104  ConnHandler(const ConnHandler&) = delete;
106  ConnHandler& operator=(const ConnHandler&) = delete;
108 
113 
115  size_t endpoint_remote_hash() const noexcept {
117  }
118 
120  size_t endpoint_local_hash() const noexcept {
122  }
123 
// Chooses the encoder type for this connection.
// Returns PLAIN when the application context has no `cfg_encoder`, or when
// the local and the remote endpoint have the same address; otherwise the
// value read from `cfg_encoder` converted to Core::Encoder::Type.
125  Core::Encoder::Type get_encoder() const noexcept {
// `||` binds tighter than `?:`, so both checks form the ternary condition.
126  return !app_ctx->cfg_encoder ||
127  endpoint_local.address() == endpoint_remote.address()
128  ? Core::Encoder::Type::PLAIN
129  : Core::Encoder::Type(app_ctx->cfg_encoder->get());
130  }
131 
// Whether the connection is secured; this base implementation reports
// false (ConnHandlerSSL overrides it to report true).
132  virtual bool is_secure() const noexcept { return false; };
133 
// Defined in ConnHandler.cc.
// NOTE(review): presumably called once the connection is established -
// confirm against the implementation.
134  void new_connection();
135 
// Implemented by the transport-specific subclasses.
136  virtual bool is_open() const noexcept = 0;
137 
// Defined in ConnHandler.cc; due() treats a non-zero result as "work left".
138  size_t pending_read() noexcept;
139 
// Returns true when the outgoing queue reports itself active, when it
// still holds entries, or when pending_read() returns a non-zero count.
141  bool due() {
142  return m_outgoing.is_active() || m_outgoing.size() || pending_read();
143  }
144 
// Closes the connection; implemented by the transport-specific subclasses.
145  virtual void do_close() noexcept = 0;
146 
// Defined in ConnHandler.cc.
// NOTE(review): presumably answers the request event `ev` with an error
// code and message, the bool reporting whether it was sent - confirm.
147  bool send_error(int error, const std::string& msg,
148  const Event::Ptr& ev) noexcept;
149 
// Defined in ConnHandler.cc.
// NOTE(review): presumably answers `ev` with a success response - confirm.
150  bool response_ok(const Event::Ptr& ev) noexcept;
151 
// Defined in ConnHandler.cc; the handler is optional and defaults to null.
152  bool send_response(Buffers::Ptr cbuf,
153  DispatchHandler::Ptr hdlr=nullptr) noexcept;
154 
// Defined in ConnHandler.cc; unlike send_response() it is not declared
// noexcept and the handler argument has no default.
155  bool send_request(Buffers::Ptr cbuf, DispatchHandler::Ptr hdlr);
156 
// Writes a description of the handler to `out`; defined in ConnHandler.cc.
157  void print(std::ostream& out) const;
158 
159  protected:
160 
162  ConnHandler(AppContext::Ptr& a_app_ctx) noexcept
163  : connected(true), app_ctx(a_app_ctx),
165  m_mutex(), m_next_req_id(0),
167  }
168 
// Returns a shared pointer to this handler obtained through
// enable_shared_from_this, so the object must already be owned by a
// std::shared_ptr when this is called.
170  ConnHandlerPtr ptr() noexcept {
171  return shared_from_this();
172  }
173 
// Protected destructor of the polymorphic base; virtual so that the
// subclasses' destructors run.
174  virtual ~ConnHandler() noexcept { }
175 
// Access to the lowest socket layer of the concrete transport.
176  virtual SocketLayer* socket_layer() noexcept = 0;
177 
// Transport-specific read that reports failures through `ec`.
// NOTE(review): `bufp` / `remainp` look like an in-out buffer cursor and
// remaining byte count - confirm against the implementations.
178  virtual void read(uint8_t** bufp, size_t* remainp, asio::error_code& ec) = 0;
179 
180 
// Completion-handler types for asynchronous writes; only declared here,
// the definitions live in ConnHandler.cc.
181  struct Sender_noAck;
182  struct Sender_Ack;
183 
// Asynchronous write of `buffers`, one overload per completion-handler type
// so each subclass can start the operation on its own socket type.
184  virtual void do_async_write(
185  Core::Vector<asio::const_buffer>&& buffers, Sender_noAck&& hdlr)
186  noexcept = 0;
187  virtual void do_async_write(
188  Core::Vector<asio::const_buffer>&& buffers, Sender_Ack&& hdlr)
189  noexcept = 0;
190 
191 
// Completion-handler types for asynchronous reads; only declared here,
// the definitions live in ConnHandler.cc.
192  struct Receiver_HeaderPrefix;
193  struct Receiver_Header;
194  struct Receiver_Buffer;
195 
// Asynchronous read of `sz` bytes into `data`, one overload per
// completion-handler type.
196  virtual void do_async_read(uint8_t* data, uint32_t sz,
197  Receiver_HeaderPrefix&& hdlr) noexcept = 0;
198  virtual void do_async_read(uint8_t* data, uint32_t sz,
199  Receiver_Header&& hdlr) noexcept = 0;
200  virtual void do_async_read(uint8_t* data, uint32_t sz,
201  Receiver_Buffer&& hdlr) noexcept = 0;
202 
203 
// Both are defined in ConnHandler.cc.
204  void do_close_run() noexcept;
205 
206  void disconnected() noexcept;
207 
// Mutex available to the subclasses.
// NOTE(review): which members it guards is not visible in this header.
208  Core::MutexSptd m_mutex;
209 
210  private:
211 
// Private write/read pipeline; all of these are defined in ConnHandler.cc.
// NOTE(review): judging by the name, write_or_queue() either writes the
// message or queues it in m_outgoing - confirm.
212  void write_or_queue(Outgoing&& outgoing);
213 
214  void write_next();
215 
216  void write(Outgoing& outgoing);
217 
// Parameterless overload, distinct from the protected virtual
// read(bufp, remainp, ec).
218  void read() noexcept;
219 
220  void recv_buffers(Event::Ptr&& ev);
221 
222  void received(Event::Ptr&& ev) noexcept;
223 
// Closes the connection from the receive path: when m_recv_bytes is
// non-zero its value is first reported to the application context through
// net_bytes_received(), then do_close() is called.
225  void do_close_recv() noexcept {
226  if(m_recv_bytes)
227  app_ctx->net_bytes_received(ptr(), m_recv_bytes);
228  do_close();
229  }
230 
// Defined in ConnHandler.cc.
// NOTE(review): presumably dispatches `ev` to the matching entry of the
// pending-requests map - confirm against the implementation.
231  void run_pending(Event::Ptr&& ev);
232 
// Hash functor for 32-bit ids: the hash is the id with its low 12 bits
// shifted out, so ids that differ only in those bits hash to the same value.
// NOTE(review): consecutive ids therefore collide in blocks of 4096; verify
// this grouping is intended for the container that uses this functor.
233  struct PendingHash {
235  size_t operator()(const uint32_t id) const {
236  return id >> 12;
237  }
238  };
239 
240  uint32_t m_next_req_id;
242  std::unordered_map<uint32_t,
243  Pending,
245  size_t m_recv_bytes;
247 };
248 
249 
250 
251 
252 
// Connection handler bound to a plain TCP socket (SocketPlain). It overrides
// every transport hook of ConnHandler; the definitions are in ConnHandler.cc.
// NOTE(review): the `SocketPlain m_sock` member (listing line 290) is not
// present in this extract of the header.
253 class ConnHandlerPlain final : public ConnHandler {
254  public:
255 
256  typedef std::shared_ptr<ConnHandlerPlain> Ptr;
257 
// Factory; the constructor is private, so instances are obtained as
// shared pointers through this function.
258  static Ptr make(AppContext::Ptr& app_ctx, SocketPlain& socket);
259 
260  virtual ~ConnHandlerPlain() noexcept;
261 
262  void do_close() noexcept override;
263 
264  bool is_open() const noexcept override;
265 
266  protected:
267 
268  SocketLayer* SWC_CONST_FUNC socket_layer() noexcept override;
269 
270  void read(uint8_t** bufp, size_t* remainp, asio::error_code& ec) override;
271 
// Asynchronous write, one overload per completion-handler type.
272  void do_async_write(
273  Core::Vector<asio::const_buffer>&& buffers, Sender_noAck&& hdlr)
274  noexcept override;
275  void do_async_write(
276  Core::Vector<asio::const_buffer>&& buffers, Sender_Ack&& hdlr)
277  noexcept override;
278 
// Asynchronous read, one overload per completion-handler type.
279  void do_async_read(uint8_t* data, uint32_t sz,
280  Receiver_HeaderPrefix&& hdlr) noexcept override;
281  void do_async_read(uint8_t* data, uint32_t sz,
282  Receiver_Header&& hdlr) noexcept override;
283  void do_async_read(uint8_t* data, uint32_t sz,
284  Receiver_Buffer&& hdlr) noexcept override;
285 
286  private:
287 
// Private: construction goes through make().
288  ConnHandlerPlain(AppContext::Ptr& app_ctx, SocketPlain& socket) noexcept;
289 
291 
292 };
293 
294 
295 
// Connection handler bound to an SSL stream (SocketSSL). Besides the
// transport hooks of ConnHandler it exposes verification-callback and
// handshake entry points; out-of-line definitions are in ConnHandler.cc.
// NOTE(review): the `m_sock` member (listing line 357) is not present in
// this extract of the header.
296 class ConnHandlerSSL final : public ConnHandler {
297  public:
298 
299  typedef std::shared_ptr<ConnHandlerSSL> Ptr;
300 
// Factory; the constructor is private, so instances are obtained as
// shared pointers through this function.
301  static Ptr make(AppContext::Ptr& app_ctx, asio::ssl::context& ssl_ctx,
302  SocketPlain& socket);
303 
304  virtual ~ConnHandlerSSL() noexcept;
305 
// This transport reports itself as secure.
306  bool is_secure() const noexcept override { return true; }
307 
308  void do_close() noexcept override;
309 
310  bool is_open() const noexcept override;
311 
// Installs `hdlr` as the verification callback of the SSL stream.
// NOTE(review): `hdlr` is a forwarding reference but is passed on with
// std::move, so an lvalue argument is moved from; std::forward<T> may be
// what is intended.
312  template<typename T>
313  void set_verify(T&& hdlr) noexcept {
314  m_sock.set_verify_callback(std::move(hdlr));
315  }
316 
// Starts an asynchronous handshake of kind `typ`; `hdlr` is handed to
// async_handshake as the completion handler. The commented-out code is a
// disabled attempt to switch the socket to non-blocking mode first.
// NOTE(review): same std::move-on-forwarding-reference remark as set_verify.
317  template<typename T>
318  void handshake(SocketSSL::handshake_type typ, T&& hdlr) noexcept {
319  /* any options ?
320  asio::error_code ec;
321  m_sock.lowest_layer().non_blocking(true, ec);
322  if(ec)
323  hdlr(ec);
324  else
325  */
326  m_sock.async_handshake(typ, std::move(hdlr));
327  }
328 
// Overload that reports the outcome through `ec` instead of a handler.
329  void handshake(SocketSSL::handshake_type typ,
330  asio::error_code& ec) noexcept;
331 
332  protected:
333 
334  SocketLayer* SWC_CONST_FUNC socket_layer() noexcept override;
335 
336  void read(uint8_t** bufp, size_t* remainp, asio::error_code& ec) override;
337 
// Asynchronous write, one overload per completion-handler type.
338  void do_async_write(
339  Core::Vector<asio::const_buffer>&& buffers, Sender_noAck&& hdlr)
340  noexcept override;
341  void do_async_write(
342  Core::Vector<asio::const_buffer>&& buffers, Sender_Ack&& hdlr)
343  noexcept override;
344 
// Asynchronous read, one overload per completion-handler type.
345  void do_async_read(uint8_t* data, uint32_t sz,
346  Receiver_HeaderPrefix&& hdlr) noexcept override;
347  void do_async_read(uint8_t* data, uint32_t sz,
348  Receiver_Header&& hdlr) noexcept override;
349  void do_async_read(uint8_t* data, uint32_t sz,
350  Receiver_Buffer&& hdlr) noexcept override;
351 
352  private:
353 
// Private: construction goes through make().
354  ConnHandlerSSL(AppContext::Ptr& app_ctx, asio::ssl::context& ssl_ctx,
355  SocketPlain& socket) noexcept;
356 
// Strand over the SSL stream's executor type.
// NOTE(review): presumably used to serialize the asynchronous operations on
// the stream - confirm in ConnHandler.cc.
358  asio::strand<SocketSSL::executor_type> m_strand;
359 
360 };
361 
362 
363 }} // namespace SWC::Comm
364 
365 
366 
367 #ifdef SWC_IMPL_SOURCE
369 #endif
370 
371 #endif // swcdb_core_comm_ConnHandler_h
StateRunning.h
SWC::Comm::ConnHandler::endpoint_remote_hash
SWC_CAN_INLINE size_t endpoint_remote_hash() const noexcept
Definition: ConnHandler.h:115
SWC::Comm::ConnHandler::do_async_read
virtual void do_async_read(uint8_t *data, uint32_t sz, Receiver_HeaderPrefix &&hdlr) noexcept=0
SWC::Comm::ConnHandler::Outgoing::hdlr
DispatchHandler::Ptr hdlr
Definition: ConnHandler.h:34
SWC::Comm::ConnHandler::Pending::Pending
SWC_CAN_INLINE Pending(Pending &&other) noexcept
Definition: ConnHandler.h:77
SWC::Comm::ConnHandler::Pending
Definition: ConnHandler.h:64
SWC::Comm::ConnHandler::socket_layer
virtual SocketLayer * socket_layer() noexcept=0
SWC::Comm::ConnHandler::Outgoing::operator=
Outgoing & operator=(const Outgoing &)=delete
SWC::Comm::ConnHandler::Sender_noAck
Definition: ConnHandler.cc:94
SWC::Comm::ConnHandler::send_response
bool send_response(Buffers::Ptr cbuf, DispatchHandler::Ptr hdlr=nullptr) noexcept
Definition: ConnHandler.cc:51
SWC::Core::AtomicBase< bool >
SWC::Comm::ConnHandlerPlain::m_sock
SocketPlain m_sock
Definition: ConnHandler.h:290
SWC::Comm::ConnHandler::Outgoing::Outgoing
Outgoing(const Outgoing &)=delete
SWC::Comm::Buffers::Ptr
std::shared_ptr< Buffers > Ptr
Definition: Buffers.h:23
QueueSafeStated.h
data
T data
Definition: BitFieldInt.h:1
SWC::Comm::ConnHandler::Outgoing::operator=
SWC_CAN_INLINE Outgoing & operator=(Outgoing &&other) noexcept
Definition: ConnHandler.h:53
SWC::Comm::ConnHandler::Receiver_Header
Definition: ConnHandler.cc:270
SWC::Comm::ConnHandlerSSL::Ptr
std::shared_ptr< ConnHandlerSSL > Ptr
Definition: ConnHandler.h:299
SWC::Comm::ConnHandler::is_open
virtual bool is_open() const noexcept=0
SWC::Core::Encoder::Type
Type
Definition: Encoder.h:28
SWC::Comm::ConnHandler::Outgoing
Definition: ConnHandler.h:32
SWC::Comm::EndPoint
asio::ip::tcp::endpoint EndPoint
Definition: Resolver.h:19
SWC::Comm::ConnHandler::app_ctx
const AppContext::Ptr app_ctx
Definition: ConnHandler.h:110
SWC::Comm::ConnHandler::read
void read() noexcept
Definition: ConnHandler.cc:411
SWC::Comm::ConnHandlerSSL::handshake
void handshake(SocketSSL::handshake_type typ, T &&hdlr) noexcept
Definition: ConnHandler.h:318
SWC::Comm::ConnHandler::Pending::timer
asio::high_resolution_timer * timer
Definition: ConnHandler.h:66
SWC::Comm::ConnHandler::received
void received(Event::Ptr &&ev) noexcept
Definition: ConnHandler.cc:438
SWC::Comm::ConnHandler::endpoint_remote
EndPoint endpoint_remote
Definition: ConnHandler.h:111
SWC::Comm::SocketPlain
asio::ip::tcp::socket SocketPlain
Definition: ConnHandler.h:25
SWC::Comm::ConnHandler::Pending::Pending
SWC_CAN_INLINE Pending(DispatchHandler::Ptr &&a_hdlr) noexcept
Definition: ConnHandler.h:72
SWC::Comm::SocketLayer
asio::ip::tcp::socket::lowest_layer_type SocketLayer
Definition: ConnHandler.h:24
SWC::Comm::ConnHandlerPlain::do_async_read
void do_async_read(uint8_t *data, uint32_t sz, Receiver_HeaderPrefix &&hdlr) noexcept override
Definition: ConnHandler.cc:626
SWC::Comm::Header::MAX_LENGTH
static const uint8_t MAX_LENGTH
Definition: Header.h:23
SWC::Comm::ConnHandler::write_or_queue
void write_or_queue(Outgoing &&outgoing)
Definition: ConnHandler.cc:79
SWC::Comm::ConnHandler::Pending::operator=
Pending & operator=(const Pending &)=delete
SWC::Comm::ConnHandler::m_recv_bytes
size_t m_recv_bytes
Definition: ConnHandler.h:245
SWC::Comm::ConnHandlerPlain::~ConnHandlerPlain
virtual ~ConnHandlerPlain() noexcept
Definition: ConnHandler.cc:580
SWC::Comm::ConnHandler::do_close
virtual void do_close() noexcept=0
SWC::Comm::ConnHandler::m_next_req_id
uint32_t m_next_req_id
Definition: ConnHandler.h:240
SWC::Comm::ConnHandlerPlain
Definition: ConnHandler.h:253
SWC::Comm::ConnHandler::Pending::Pending
Pending(const Pending &)=delete
SWC::Comm::ConnHandler::print
void print(std::ostream &out) const
Definition: ConnHandler.cc:69
SWC::Comm::AppContext::Ptr
std::shared_ptr< AppContext > Ptr
Definition: AppContext.h:23
SWC::Core::QueueSafeStated
Definition: QueueSafeStated.h:18
Buffers.h
Resolver.h
SWC::Comm::ConnHandler::ConnHandler
ConnHandler(const ConnHandler &)=delete
SWC::Comm::ConnHandler::new_connection
void new_connection()
Definition: ConnHandler.cc:16
SWC::Comm::ConnHandler::write_next
void write_next()
Definition: ConnHandler.cc:86
SWC_CONST_FUNC
#define SWC_CONST_FUNC
Definition: Compat.h:107
SWC::Comm::ConnHandler::recv_buffers
void recv_buffers(Event::Ptr &&ev)
Definition: ConnHandler.cc:421
SWC::Comm::ConnHandler::operator=
ConnHandler & operator=(const ConnHandler &)=delete
AppContext.h
SWC_CAN_INLINE
#define SWC_CAN_INLINE
Definition: Compat.h:102
SWC::Comm::ConnHandler::due
SWC_CAN_INLINE bool due()
Definition: ConnHandler.h:141
Exception.h
SWC::Comm::ConnHandlerPlain::is_open
bool is_open() const noexcept override
Definition: ConnHandler.cc:600
SWC::Comm::ConnHandler::~ConnHandler
virtual ~ConnHandler() noexcept
Definition: ConnHandler.h:174
SWC::Comm::ConnHandler::run_pending
void run_pending(Event::Ptr &&ev)
Definition: ConnHandler.cc:489
SWC::Comm::Event
Definition: Event.h:20
SWC::Comm::ConnHandler::_buf_header
uint8_t _buf_header[Header::MAX_LENGTH]
Definition: ConnHandler.h:246
SWC::Comm::ConnHandler::Pending::Pending
SWC_CAN_INLINE Pending() noexcept
Definition: ConnHandler.h:69
SWC::Comm::ConnHandler::send_error
bool send_error(int error, const std::string &msg, const Event::Ptr &ev) noexcept
Definition: ConnHandler.cc:36
SWC
The SWC-DB C++ namespace 'SWC'.
Definition: main.cc:12
SWC::Comm::ConnHandler::operator=
ConnHandler & operator=(ConnHandler &&)=delete
SWC::Comm::ConnHandlerSSL
Definition: ConnHandler.h:296
SWC::Comm::ConnHandler::response_ok
bool response_ok(const Event::Ptr &ev) noexcept
Definition: ConnHandler.cc:45
SWC::Comm::ConnHandler::Outgoing::Outgoing
SWC_CAN_INLINE Outgoing(Outgoing &&other) noexcept
Definition: ConnHandler.h:45
SWC::Comm::ConnHandler::do_async_write
virtual void do_async_write(Core::Vector< asio::const_buffer > &&buffers, Sender_noAck &&hdlr) noexcept=0
SWC::Comm::ConnHandler::Sender_Ack
Definition: ConnHandler.cc:123
SWC::Comm::ConnHandler
Definition: ConnHandler.h:29
SWC::Comm::AppContext
Definition: AppContext.h:21
SWC::Comm::ConnHandler::get_encoder
SWC_CAN_INLINE Core::Encoder::Type get_encoder() const noexcept
Definition: ConnHandler.h:125
SWC::Comm::ConnHandler::endpoint_local
EndPoint endpoint_local
Definition: ConnHandler.h:112
SWC::Comm::ConnHandler::Pending::operator=
SWC_CAN_INLINE Pending & operator=(Pending &&other) noexcept
Definition: ConnHandler.h:88
SWC::Comm::ConnHandler::connected
Core::AtomicBool connected
Definition: ConnHandler.h:109
SWC::Comm::ConnHandlerPlain::socket_layer
SocketLayer *SWC_CONST_FUNC socket_layer() noexcept override
Definition: ConnHandler.cc:604
Event.h
SWC::Comm::ConnHandler::PendingHash
Definition: ConnHandler.h:233
SWC::Comm::Buffers
Definition: Buffers.h:20
SWC::Comm::SocketSSL
asio::ssl::stream< asio::ip::tcp::socket > SocketSSL
Definition: ConnHandler.h:26
SWC::Comm::ConnHandler::ptr
SWC_CAN_INLINE ConnHandlerPtr ptr() noexcept
Definition: ConnHandler.h:170
buffers
uint8_t buffers
number of buffers from 0 to 2 (data+data_ext)
Definition: Header.h:56
SWC::Comm::ConnHandlerPlain::Ptr
std::shared_ptr< ConnHandlerPlain > Ptr
Definition: ConnHandler.h:256
SWC::Comm::ConnHandlerPtr
std::shared_ptr< ConnHandler > ConnHandlerPtr
Definition: AppContext.h:17
SWC::Comm::ConnHandler::PendingHash::operator()
SWC_CAN_INLINE size_t operator()(const uint32_t id) const
Definition: ConnHandler.h:235
asio_wrap.h
SWC::Comm::ConnHandler::write
void write(Outgoing &outgoing)
Definition: ConnHandler.cc:162
SWC::Comm::ConnHandler::do_close_recv
SWC_CAN_INLINE void do_close_recv() noexcept
Definition: ConnHandler.h:225
ConnHandler.cc
SWC::Comm::ConnHandlerPlain::do_async_write
void do_async_write(Core::Vector< asio::const_buffer > &&buffers, Sender_noAck &&hdlr) noexcept override
Definition: ConnHandler.cc:612
SWC::Comm::ConnHandler::m_pending
std::unordered_map< uint32_t, Pending, PendingHash > m_pending
Definition: ConnHandler.h:244
SWC::Comm::DispatchHandler
Definition: DispatchHandler.h:17
SWC::Comm::ConnHandler::Pending::hdlr
DispatchHandler::Ptr hdlr
Definition: ConnHandler.h:65
SWC::Config::T
const uint64_t T
Definition: Property.h:27
SWC::Comm::ConnHandler::send_request
bool send_request(Buffers::Ptr cbuf, DispatchHandler::Ptr hdlr)
Definition: ConnHandler.cc:60
SWC::Comm::ConnHandler::Outgoing::Outgoing
SWC_CAN_INLINE Outgoing() noexcept
Definition: ConnHandler.h:37
SWC::Comm::ConnHandler::pending_read
size_t pending_read() noexcept
Definition: ConnHandler.cc:27
SWC::Comm::ConnHandlerPlain::do_close
void do_close() noexcept override
Definition: ConnHandler.cc:587
SWC::Comm::ConnHandler::Receiver_HeaderPrefix
Definition: ConnHandler.cc:255
SWC::Comm::Event::Ptr
std::shared_ptr< Event > Ptr
Definition: Event.h:33
SWC::Comm::endpoint_hash
size_t SWC_PURE_FUNC endpoint_hash(const EndPoint &endpoint) noexcept
Definition: Resolver.cc:133
SWC::Comm::ConnHandler::disconnected
void disconnected() noexcept
Definition: ConnHandler.cc:459
DispatchHandler.h
SWC::Comm::ConnHandler::Outgoing::Outgoing
SWC_CAN_INLINE Outgoing(Buffers::Ptr &&a_cbuf, DispatchHandler::Ptr &&a_hdlr) noexcept
Definition: ConnHandler.h:40
SWC::Comm::ConnHandler::ConnHandler
ConnHandler(ConnHandler &&)=delete
SWC::Comm::ConnHandler::Pending::~Pending
SWC_CAN_INLINE ~Pending() noexcept
Definition: ConnHandler.h:83
SWC::Comm::DispatchHandler::Ptr
std::shared_ptr< DispatchHandler > Ptr
Definition: DispatchHandler.h:20
SWC::Comm::ConnHandler::endpoint_local_hash
SWC_CAN_INLINE size_t endpoint_local_hash() const noexcept
Definition: ConnHandler.h:120
SWC::Comm::ConnHandler::m_mutex
Core::MutexSptd m_mutex
Definition: ConnHandler.h:208
SWC::Comm::ConnHandlerPlain::make
static Ptr make(AppContext::Ptr &app_ctx, SocketPlain &socket)
Definition: ConnHandler.cc:569
SWC::Comm::ConnHandler::is_secure
virtual bool is_secure() const noexcept
Definition: ConnHandler.h:132
SWC::Comm::ConnHandler::Outgoing::cbuf
Buffers::Ptr cbuf
Definition: ConnHandler.h:33
SWC::Comm::ConnHandler::Outgoing::~Outgoing
SWC_CAN_INLINE ~Outgoing() noexcept
Definition: ConnHandler.h:50
SWC::Comm::ConnHandler::do_close_run
void do_close_run() noexcept
Definition: ConnHandler.cc:32
SWC::Comm::ConnHandler::m_outgoing
Core::QueueSafeStated< Outgoing > m_outgoing
Definition: ConnHandler.h:241
SWC::Comm::ConnHandler::Receiver_Buffer
Definition: ConnHandler.cc:286