SWC-DB  v0.5.12 C++ documentations
SWC-DB© (Super Wide Column Database) - High Performance Scalable Database (https://github.com/kashirin-alex/swc-db)
SerializedClient.h
Go to the documentation of this file.
1 /*
2  * SWC-DB© Copyright since 2019 Alex Kashirin <kashirin.alex@gmail.com>
3  * License details at <https://github.com/kashirin-alex/swc-db/#license>
4  */
5 
6 
7 #ifndef swcdb_core_comm_SerializedClient_h
8 #define swcdb_core_comm_SerializedClient_h
9 
10 
11 #include "swcdb/core/QueueSafe.h"
15 
16 
17 namespace SWC { namespace Comm {
18 
19 
21 namespace client {
22 
23 
24 
25 class Serialized final : public std::enable_shared_from_this<Serialized> {
26  public:
27 
28  typedef std::shared_ptr<Serialized> Ptr;
29 
30  Serialized(const Config::Settings& settings,
31  std::string&& srv_name,
32  const IoContextPtr& ioctx,
33  const AppContext::Ptr& ctx);
34 
35  Serialized(Serialized&&) = delete;
36  Serialized(const Serialized&) = delete;
38  Serialized& operator=(const Serialized&) = delete;
39 
40  virtual ~Serialized() noexcept;
41 
43  IoContextPtr io() noexcept {
44  return m_ioctx;
45  }
46 
48  const EndPoints& endpoints,
49  const std::chrono::milliseconds& timeout,
50  uint16_t probes
51  ) noexcept;
52 
53 
54  template<typename HdlrT>
57  const EndPoints& endpoints,
58  HdlrT&& cb,
59  const std::chrono::milliseconds& timeout,
60  uint16_t probes) noexcept {
61  get_connection<HdlrT>(endpoints, timeout, probes, std::move(cb));
62  }
63 
64  template<typename HdlrT, typename... DataArgsT>
67  const EndPoints& endpoints,
68  const std::chrono::milliseconds& timeout,
69  uint16_t probes,
70  DataArgsT&&... args) noexcept {
71  m_calls.fetch_add(1);
72  if(m_run) try {
73  if(endpoints.empty()) {
74  SWC_LOGF(LOG_WARN, "get_connection: %s, Empty-Endpoints",
75  m_srv_name.c_str());
76  } else {
77  (void)timeout;
78  return Connector<HdlrT>(
79  shared_from_this(),
80  endpoints,
81  probes,
82  std::forward<DataArgsT>(args)...
83  ).connect();
84  }
85  } catch (...) {
87  }
88  HdlrT(std::forward<DataArgsT>(args)...)(nullptr);
89  m_calls.fetch_sub(1);
90  }
91 
92  void print(std::ostream& out, ConnHandlerPtr& conn);
93 
94  void stop();
95 
96  private:
97 
98  template<typename HdlrT>
99  struct Connector {
100  using SocketPtr = std::shared_ptr<asio::ip::tcp::socket>;
101 
104  //std::chrono::milliseconds timeout;
105  uint16_t probes;
106  uint16_t tries;
107  uint32_t next;
109  HdlrT callback;
110 
111  template<typename... DataArgsT>
114  const EndPoints& a_endpoints, uint16_t a_probes,
115  DataArgsT&&... args)
116  : ptr(std::move(a_ptr)),
117  endpoints(a_endpoints), probes(a_probes), tries(a_probes), next(0),
118  socket(nullptr), callback(std::forward<DataArgsT>(args)...) {
119  }
120 
121  Connector(Connector&& other) noexcept
122  : ptr(std::move(other.ptr)),
123  endpoints(std::move(other.endpoints)),
124  probes(other.probes), tries(other.tries), next(other.next),
125  socket(std::move(other.socket)),
126  callback(std::move(other.callback)) {
127  }
128 
129  Connector(const Connector&) = delete;
131  Connector& operator=(const Connector&&) = delete;
132 
133  void connect() {
134  if(next >= endpoints.size())
135  next = 0;
136  auto endpoint = endpoints[next];
137  SWC_LOG_OUT(LOG_DEBUG, SWC_LOG_OSTREAM << "Connecting Async: "
138  << ptr->m_srv_name << ' ' << endpoint << ' '
139  << (ptr->m_ssl_cfg && ptr->m_ssl_cfg->need_ssl(endpoint)
140  ? "SECURE" : "PLAIN"); );
141 
142  socket.reset(new asio::ip::tcp::socket(ptr->m_ioctx->executor()));
143  auto sock = socket;
144  sock->async_connect(endpoint, std::move(*this));
145  }
146 
147  ~Connector() noexcept { }
148 
149  void operator()(const std::error_code& _ec) noexcept {
150  if(_ec) {
151  asio::error_code ec;
152  socket->close(ec);
153  } else if(socket->is_open()) try {
154 
155  if(ptr->m_ssl_cfg &&
156  ptr->m_ssl_cfg->need_ssl(endpoints[next]) &&
157  socket->remote_endpoint().address()
158  != socket->local_endpoint().address()) {
159  struct Handshaker {
160  ConnHandlerSSL::Ptr conn;
161  Connector<HdlrT> connector;
163  Handshaker(const ConnHandlerSSL::Ptr& a_conn,
164  Connector<HdlrT>&& hdlr)
165  : conn(a_conn), connector(std::move(hdlr)) {
166  }
167  Handshaker(Handshaker&& other) noexcept
168  : conn(std::move(other.conn)),
169  connector(std::move(other.connector)) {
170  }
171  Handshaker(const Handshaker&) = delete;
172  Handshaker& operator=(Handshaker&&) = delete;
173  Handshaker& operator=(const Handshaker&) = delete;
174  ~Handshaker() noexcept { }
175  void operator()(const asio::error_code& ec) {
176  if(conn->is_open()) {
177  if(!ec) {
178  conn->new_connection();
179  connector.callback(conn);
180  connector.ptr->m_calls.fetch_sub(1);
181  return;
182  }
183  conn->do_close();
184  }
185  connector.reconnect();
186  }
187  };
188  auto conn = ptr->m_ssl_cfg->make_client(ptr->m_ctx, *socket.get());
189  conn->handshake(
190  SocketSSL::client, Handshaker(conn, std::move(*this)));
191  return;
192  }
193 
194  auto conn = ConnHandlerPlain::make(ptr->m_ctx, *socket.get());
195  if(conn->is_open()) {
196  conn->new_connection();
197  callback(conn);
198  ptr->m_calls.fetch_sub(1);
199  return;
200  }
201  } catch(...) {
203  }
204 
205  reconnect();
206  }
207 
208  void reconnect() noexcept {
209  if(ptr->m_run && ptr->m_ioctx->running && !endpoints.empty() &&
210  (!probes || tries)) try {
211  SWC_LOGF(LOG_DEBUG, "get_connection: %s, tries=%u",
212  ptr->m_srv_name.c_str(), tries);
213  std::this_thread::sleep_for(std::chrono::milliseconds(3000)); // ? cfg-setting
214 
215  if(++next == endpoints.size())
216  --tries;
217  return connect();
218 
219  } catch(...) {
221  }
222  callback(nullptr);
223  ptr->m_calls.fetch_sub(1);
224  }
225 
226  };
227 
228  const std::string m_srv_name;
234 
235 };
236 
237 
238 }}} // namespace SWC::Comm::client
239 
240 
241 #ifdef SWC_IMPL_SOURCE
243 #endif
244 
245 #endif // swcdb_core_comm_SerializedClient_h
SWC_LOG_OSTREAM
#define SWC_LOG_OSTREAM
Definition: Logger.h:44
SWC::Core::AtomicBase< bool >
SWC::Comm::client::Serialized::Connector::callback
HdlrT callback
Definition: SerializedClient.h:109
SWC::Comm::client::Serialized::m_ioctx
IoContextPtr m_ioctx
Definition: SerializedClient.h:229
SWC_LOGF
#define SWC_LOGF(priority, fmt,...)
Definition: Logger.h:188
SWC_LOG_OUT
#define SWC_LOG_OUT(pr, _code_)
Definition: Logger.h:178
SWC::Comm::client::Serialized::Connector::operator=
Connector & operator=(Connector &&)=delete
SWC::Comm::ConnHandlerSSL::Ptr
std::shared_ptr< ConnHandlerSSL > Ptr
Definition: ConnHandler.h:299
SWC::Comm::client::Serialized::Connector::reconnect
void reconnect() noexcept
Definition: SerializedClient.h:208
SWC::Core::Atomic< size_t >
SWC::Comm::client::Serialized::Connector::ptr
Serialized::Ptr ptr
Definition: SerializedClient.h:102
SWC::Comm::client::Serialized::~Serialized
virtual ~Serialized() noexcept
Definition: SerializedClient.cc:28
SerializedClient.cc
SWC::Comm::client::Serialized::Connector
Definition: SerializedClient.h:99
ConnHandler.h
SWC::Comm::IoContextPtr
std::shared_ptr< IoContext > IoContextPtr
Definition: IoContext.h:16
SWC::Comm::client::Serialized::Connector::next
uint32_t next
Definition: SerializedClient.h:107
SWC::Comm::client::Serialized::io
SWC_CAN_INLINE IoContextPtr io() noexcept
Definition: SerializedClient.h:43
SWC::Comm::client::Serialized::Connector::tries
uint16_t tries
Definition: SerializedClient.h:106
IoContext.h
SWC::Comm::client::Serialized::Connector::Connector
Connector(const Connector &)=delete
ConfigSSL.h
SWC::Comm::client::Serialized::print
void print(std::ostream &out, ConnHandlerPtr &conn)
Definition: SerializedClient.cc:88
SWC::Comm::client::Serialized::m_srv_name
const std::string m_srv_name
Definition: SerializedClient.h:228
SWC::Comm::client::Serialized::Connector::probes
uint16_t probes
Definition: SerializedClient.h:105
SWC::Comm::AppContext::Ptr
std::shared_ptr< AppContext > Ptr
Definition: AppContext.h:23
SWC::Comm::client::Serialized::Connector::SocketPtr
std::shared_ptr< asio::ip::tcp::socket > SocketPtr
Definition: SerializedClient.h:100
SWC::Core::Vector::empty
constexpr SWC_CAN_INLINE bool empty() const noexcept
Definition: Vector.h:168
SWC_CAN_INLINE
#define SWC_CAN_INLINE
Definition: Compat.h:102
SWC::LOG_DEBUG
@ LOG_DEBUG
Definition: Logger.h:36
SWC::Comm::client::Serialized::get_connection
SWC_CAN_INLINE void get_connection(const EndPoints &endpoints, const std::chrono::milliseconds &timeout, uint16_t probes, DataArgsT &&... args) noexcept
Definition: SerializedClient.h:66
SWC::Comm::client::Serialized::get_connection
ConnHandlerPtr get_connection(const EndPoints &endpoints, const std::chrono::milliseconds &timeout, uint16_t probes) noexcept
Definition: SerializedClient.cc:33
SWC
The SWC-DB C++ namespace 'SWC'.
Definition: main.cc:12
SWC::Comm::client::Serialized::Serialized
Serialized(Serialized &&)=delete
SWC::Comm::client::Serialized::Connector::operator()
void operator()(const std::error_code &_ec) noexcept
Definition: SerializedClient.h:149
SWC::Comm::client::Serialized::Connector::~Connector
~Connector() noexcept
Definition: SerializedClient.h:147
SWC::Comm::client::Serialized::m_run
Core::AtomicBool m_run
Definition: SerializedClient.h:232
SWC::Comm::client::Serialized::m_ctx
AppContext::Ptr m_ctx
Definition: SerializedClient.h:230
SWC::Comm::client::Serialized::Connector::connect
void connect()
Definition: SerializedClient.h:133
SWC::Comm::ConfigSSL
Definition: ConfigSSL.h:18
SWC::Comm::client::Serialized::Connector::socket
SocketPtr socket
Definition: SerializedClient.h:108
QueueSafe.h
SWC::Config::Settings
Definition: Settings.h:25
SWC::Comm::client::Serialized::m_ssl_cfg
ConfigSSL * m_ssl_cfg
Definition: SerializedClient.h:231
SWC::Comm::ConnHandlerPtr
std::shared_ptr< ConnHandler > ConnHandlerPtr
Definition: AppContext.h:17
SWC::Core::Vector< EndPoint >
SWC::Comm::client::Serialized::Serialized
Serialized(const Config::Settings &settings, std::string &&srv_name, const IoContextPtr &ioctx, const AppContext::Ptr &ctx)
Definition: SerializedClient.cc:15
SWC::Comm::client::Serialized::get_connection
SWC_CAN_INLINE void get_connection(const EndPoints &endpoints, HdlrT &&cb, const std::chrono::milliseconds &timeout, uint16_t probes) noexcept
Definition: SerializedClient.h:56
SWC::Comm::client::Serialized::operator=
Serialized & operator=(const Serialized &)=delete
SWC::Comm::client::Serialized::operator=
Serialized & operator=(Serialized &&)=delete
SWC::Comm::client::Serialized::Connector::operator=
Connector & operator=(const Connector &&)=delete
SWC::Comm::client::Serialized::Ptr
std::shared_ptr< Serialized > Ptr
Definition: SerializedClient.h:28
SWC::Comm::client::Serialized::Connector::endpoints
EndPoints endpoints
Definition: SerializedClient.h:103
SWC::LOG_WARN
@ LOG_WARN
Definition: Logger.h:33
SWC::Comm::client::Serialized
Definition: SerializedClient.h:25
SWC::Core::Atomic::fetch_sub
constexpr SWC_CAN_INLINE T fetch_sub(T v) noexcept
Definition: Atomic.h:88
SWC::Comm::client::Serialized::Connector::Connector
SWC_CAN_INLINE Connector(Serialized::Ptr &&a_ptr, const EndPoints &a_endpoints, uint16_t a_probes, DataArgsT &&... args)
Definition: SerializedClient.h:113
SWC::Core::Atomic::fetch_add
constexpr SWC_CAN_INLINE T fetch_add(T v) noexcept
Definition: Atomic.h:93
SWC::Comm::client::Serialized::stop
void stop()
Definition: SerializedClient.cc:92
SWC::Comm::client::Serialized::m_calls
Core::Atomic< size_t > m_calls
Definition: SerializedClient.h:233
SWC::Core::Vector::size
constexpr SWC_CAN_INLINE size_type size() const noexcept
Definition: Vector.h:189
SWC::Comm::ConnHandlerPlain::make
static Ptr make(AppContext::Ptr &app_ctx, SocketPlain &socket)
Definition: ConnHandler.cc:569
SWC_LOG_CURRENT_EXCEPTION
#define SWC_LOG_CURRENT_EXCEPTION(_s_)
Definition: Exception.h:144
SWC::Comm::client::Serialized::Connector::Connector
Connector(Connector &&other) noexcept
Definition: SerializedClient.h:121
SWC::Comm::client::Serialized::Serialized
Serialized(const Serialized &)=delete