SWC-DB  v0.5.12 C++ documentations
SWC-DB© (Super Wide Column Database) - High Performance Scalable Database (https://github.com/kashirin-alex/swc-db)
ClientConnQueue.cc
Go to the documentation of this file.
1 /*
2  * SWC-DB© Copyright since 2019 Alex Kashirin <kashirin.alex@gmail.com>
3  * License details at <https://github.com/kashirin-alex/swc-db/#license>
4  */
5 
7 
8 namespace SWC { namespace Comm { namespace client {
9 
10 
11 
13  if(queue)
14  queue->delay(req());
15  else
16  run();
17 }
18 
19 void ConnQueueReqBase::print(std::ostream& out) {
20  cbp->header.print(out << "ReqBase(insistent=" << insistent() << ' ');
21  out << " derived=" << Core::type_name(*this) << ')';
22 }
23 
24 
25 
27  if(m_timer) {
29  m_timer->cancel();
30  }
31  for(;;) {
32  {
34  if(m_delayed.empty())
35  break;
36  (*m_delayed.cbegin())->cancel();
37  m_delayed.erase(m_delayed.cbegin());
38  }
39  std::this_thread::yield();
40  }
41 
42  while(m_q_state.running())
43  std::this_thread::yield();
44 
45  ConnHandlerPtr conn;
46  for(ReqBase::Ptr req;;) {
47  {
49  if(empty()) {
50  m_conn.swap(conn);
51  break;
52  }
53  req = front();
54  pop();
55  }
56  req->handle_no_conn();
57  }
58  if(conn)
59  conn->do_close();
60  m_q_state.stop();
61 }
62 
65  return m_conn && m_conn->is_open() ? m_conn->endpoint_remote : EndPoint();
66 }
67 
70  return m_conn && m_conn->is_open() ? m_conn->endpoint_local : EndPoint();
71 }
72 
74  if(req->queue.get() != this)
75  req->queue = shared_from_this();
76  bool make_conn;
77  {
79  push(req);
80  if((make_conn = (!m_conn || !m_conn->is_open()))) {
81  if(m_connecting)
82  return;
83  m_connecting = true;
84  }
85  }
86  if(make_conn && connect())
87  return;
88  exec_queue();
89 }
90 
91 void ConnQueue::set(const ConnHandlerPtr& conn) {
92  {
94  m_conn = conn;
95  m_connecting = false;
96  }
97  exec_queue();
98 }
99 
101  if(!cfg_again_delay_ms)
102  return put(req);
103 
104  auto tm = new asio::high_resolution_timer(m_ioctx->executor());
105  tm->expires_after(std::chrono::milliseconds(cfg_again_delay_ms->get()));
106 
108  m_delayed.insert(tm);
109 
110  struct TimerTask {
111  ConnQueue* queue;
112  ReqBase::Ptr req;
113  asio::high_resolution_timer* tm;
115  TimerTask(ConnQueue* a_queue, ReqBase::Ptr&& a_req,
116  asio::high_resolution_timer* a_tm) noexcept
117  : queue(a_queue), req(std::move(a_req)), tm(a_tm) {
118  }
120  TimerTask(TimerTask&& other) noexcept
121  : queue(other.queue), req(std::move(other.req)), tm(other.tm) {
122  }
123  TimerTask(const TimerTask&) = delete;
124  TimerTask& operator=(TimerTask&&) = delete;
125  TimerTask& operator=(const TimerTask&) = delete;
126  ~TimerTask() noexcept { }
127  void operator()(const asio::error_code& ec) {
128  if(ec == asio::error::operation_aborted) {
129  req->handle_no_conn();
130  } else {
131  {
132  Core::MutexSptd::scope lock(queue->m_mutex);
133  queue->m_delayed.erase(tm);
134  }
135  queue->put(req);
136  }
137  delete tm;
138  }
139  };
140  tm->async_wait(TimerTask(this, std::move(req), tm));
141 }
142 
143 void ConnQueue::print(std::ostream& out) {
144  out << "ConnQueue(size=";
146  out << size() << " conn=";
147  if(m_conn)
148  m_conn->print(out);
149  else
150  out << "no";
151  out << ')';
152 }
153 
155  if(m_q_state.running())
156  return;
157  {
159  if(empty()) {
160  m_q_state.stop();
161  return;
162  }
163  }
164  struct Task {
165  ConnQueuePtr queue;
167  Task(ConnQueuePtr&& a_queue) noexcept : queue(std::move(a_queue)) { }
169  Task(Task&& other) noexcept : queue(std::move(other.queue)) { }
170  Task(const Task&) = delete;
171  Task& operator=(Task&&) = delete;
172  Task& operator=(const Task&) = delete;
173  ~Task() noexcept { }
174  void operator()() { queue->run_queue(); }
175  };
176  m_ioctx->post(Task(shared_from_this()));
177 }
178 
180  if(m_timer) {
182  m_timer->cancel();
183  }
184 
185  ConnHandlerPtr conn;
186  for(ReqBase::Ptr req;;) {
187  {
189  conn = (m_conn && m_conn->is_open()) ? m_conn : nullptr;
190  req = front();
191  }
192  if(!conn || !conn->send_request(req->cbp, req)) {
193  if(req->insistent()) {
194  m_q_state.stop();
195  req->handle_no_conn();
196  break;
197  }
198  req->handle_no_conn();
199  }
200  {
202  pop();
203  if(empty()) {
204  m_q_state.stop();
205  break;
206  }
207  }
208  }
209 
210  if(m_timer) // nullptr -eq persistent
211  schedule_close(false);
212  //~ on timer after ms+ OR socket_opt ka(0)+interval(ms+)
213 }
214 
215 void ConnQueue::schedule_close(bool closing) {
216  if(closing) {
217  ConnHandlerPtr conn;
218  {
220  if((closing = empty() && m_delayed.empty() &&
221  (!m_conn || !m_conn->due()))) {
222  m_conn.swap(conn);
223  m_timer->cancel();
224  }
225  }
226  if(closing) {
227  if(conn)
228  conn->do_close();
229  return close_issued();
230  }
231  }
232 
234  if(!m_conn)
235  return;
236  m_timer->cancel();
237  m_timer->expires_after(std::chrono::milliseconds(cfg_keepalive_ms->get()));
238  struct TimerTask {
239  ConnQueuePtr queue;
241  TimerTask(ConnQueuePtr&& a_queue) noexcept : queue(std::move(a_queue)) { }
243  TimerTask(TimerTask&& other) noexcept : queue(std::move(other.queue)) { }
244  TimerTask(const TimerTask&) = delete;
245  TimerTask& operator=(TimerTask&&) = delete;
246  TimerTask& operator=(const TimerTask&) = delete;
247  ~TimerTask() noexcept { }
248  void operator()(const asio::error_code& ec) {
249  if(ec != asio::error::operation_aborted) {
250  queue->schedule_close(true);
251  }
252  }
253  };
254  m_timer->async_wait(TimerTask(shared_from_this()));
255 }
256 
257 }}}
SWC::Comm::client::ConnQueue::m_delayed
std::unordered_set< asio::high_resolution_timer * > m_delayed
Definition: ClientConnQueue.h:127
SWC::Core::StateRunning::stop
constexpr SWC_CAN_INLINE void stop() noexcept
Definition: StateRunning.h:32
SWC::Comm::client::ConnQueue::get_endpoint_remote
EndPoint get_endpoint_remote() noexcept
Definition: ClientConnQueue.cc:63
SWC::Comm::client::ConnQueueReqBase::queue
ConnQueuePtr queue
Definition: ClientConnQueue.h:27
SWC::Comm::client::ConnQueueReqBase::req
SWC_CAN_INLINE Ptr req() noexcept
Definition: ClientConnQueue.h:39
SWC::Comm::EndPoint
asio::ip::tcp::endpoint EndPoint
Definition: Resolver.h:19
SWC::Comm::client::ConnQueue::stop
void stop()
Definition: ClientConnQueue.cc:26
SWC::Comm::client::ConnQueue::set
void set(const ConnHandlerPtr &conn)
Definition: ClientConnQueue.cc:91
SWC::Core::MutexSptd::scope
Definition: MutexSptd.h:96
SWC::Comm::client::ConnQueue
Definition: ClientConnQueue.h:59
SWC::Comm::client::ConnQueueReqBase::request_again
void request_again()
Definition: ClientConnQueue.cc:12
SWC::Config::Property::Value_int32_g::get
SWC_CAN_INLINE int32_t get() const noexcept
Definition: Property.h:610
SWC::Comm::client::ConnQueue::run_queue
void run_queue()
Definition: ClientConnQueue.cc:179
SWC::Comm::client::ConnQueue::print
void print(std::ostream &out)
Definition: ClientConnQueue.cc:143
SWC_CAN_INLINE
#define SWC_CAN_INLINE
Definition: Compat.h:102
SWC
The SWC-DB C++ namespace 'SWC'.
Definition: main.cc:12
SWC::Comm::client::ConnQueue::cfg_again_delay_ms
const Config::Property::Value_int32_g::Ptr cfg_again_delay_ms
Definition: ClientConnQueue.h:111
size
uint32_t size
Buffer size.
Definition: HeaderBufferInfo.h:47
SWC::Comm::client::ConnQueue::put
void put(const ReqBase::Ptr &req)
Definition: ClientConnQueue.cc:73
SWC::Comm::client::ConnQueue::exec_queue
void exec_queue()
Definition: ClientConnQueue.cc:154
SWC::Comm::client::ConnQueue::m_timer
asio::high_resolution_timer * m_timer
Definition: ClientConnQueue.h:126
SWC::Comm::client::ConnQueue::m_q_state
Core::StateRunning m_q_state
Definition: ClientConnQueue.h:125
ClientConnQueue.h
SWC::Core::StateRunning::running
constexpr SWC_CAN_INLINE bool running() noexcept
Definition: StateRunning.h:37
SWC::Comm::client::ConnQueue::schedule_close
void schedule_close(bool closing)
Definition: ClientConnQueue.cc:215
SWC::Comm::client::ConnQueue::delay
void delay(ReqBase::Ptr &&req)
Definition: ClientConnQueue.cc:100
SWC::Comm::client::ConnQueue::get_endpoint_local
EndPoint get_endpoint_local() noexcept
Definition: ClientConnQueue.cc:68
SWC::Comm::ConnHandlerPtr
std::shared_ptr< ConnHandler > ConnHandlerPtr
Definition: AppContext.h:17
SWC::Comm::client::ConnQueueReqBase::print
void print(std::ostream &out)
Definition: ClientConnQueue.cc:19
SWC::Comm::client::ConnQueue::close_issued
virtual void close_issued()
Definition: ClientConnQueue.h:92
SWC::Comm::client::ConnQueueReqBase::cbp
Buffers::Ptr cbp
Definition: ClientConnQueue.h:26
SWC::Core::type_name
SWC_CAN_INLINE const char * type_name(const T &obj) noexcept
Definition: Compat.h:204
SWC::Comm::client::ConnQueue::m_mutex
Core::MutexSptd m_mutex
Definition: ClientConnQueue.h:121
SWC::Comm::client::ConnQueue::m_connecting
bool m_connecting
Definition: ClientConnQueue.h:124
SWC::Comm::client::ConnQueueReqBase::Ptr
std::shared_ptr< ConnQueueReqBase > Ptr
Definition: ClientConnQueue.h:25
SWC::Comm::client::ConnQueueReqBase::insistent
virtual bool insistent() noexcept
Definition: ClientConnQueue.h:43
SWC::Comm::client::ConnQueue::m_conn
ConnHandlerPtr m_conn
Definition: ClientConnQueue.h:123
SWC::Comm::client::ConnQueue::operator=
ConnQueue & operator=(ConnQueue &&)=delete
SWC::Comm::client::ConnQueue::cfg_keepalive_ms
const Config::Property::Value_int32_g::Ptr cfg_keepalive_ms
Definition: ClientConnQueue.h:110
SWC::Comm::client::ConnQueuePtr
std::shared_ptr< ConnQueue > ConnQueuePtr
Definition: ClientConnQueue.h:19
SWC::Comm::client::ConnQueue::m_ioctx
IoContextPtr m_ioctx
Definition: ClientConnQueue.h:122
SWC::Comm::DispatchHandler::run
virtual bool run()
Definition: DispatchHandler.h:26
SWC::Comm::client::ConnQueue::connect
virtual bool connect()
Definition: ClientConnQueue.h:88