SWC-DB v0.5.12 C++ documentation
SWC-DB© (Super Wide Column Database) - High Performance Scalable Database (https://github.com/kashirin-alex/swc-db)
Common.cc
Go to the documentation of this file.
1 /*
2  * SWC-DB© Copyright since 2019 Alex Kashirin <kashirin.alex@gmail.com>
3  * License details at <https://github.com/kashirin-alex/swc-db/#license>
4  */
5 
6 
9 
10 
11 
12 namespace SWC { namespace client { namespace Query { namespace Select {
13 
14 namespace Handlers {
15 
16 
// Constructs the handler.
//   a_clients    - forwarded to the BaseUnorderedMap base.
//   cb           - result callback; moved into m_cb (may be empty).
//   rsp_partials - caller's request for partial-result notifications.
//   io           - stored in m_dispatcher_io; when it is null, send_result
//                  posts on clients->get_io() instead.
//   a_executor   - forwarded to the BaseUnorderedMap base.
// The handler starts valid (valid_state = true) with no result being sent
// (m_sending_result = false).
17 Common::Common(const Clients::Ptr& a_clients,
18  Cb_t&& cb, bool rsp_partials,
19  const Comm::IoContextPtr& io,
20  Clients::Flag a_executor) noexcept
21  : BaseUnorderedMap(a_clients, a_executor),
22  valid_state(true),
23  m_mutex(),
24  m_cb(std::move(cb)), m_dispatcher_io(io),
25  m_notify(m_cb && rsp_partials), // partials are enabled only when a callback is set AND they were requested
26  m_cv(),
27  m_sending_result(false) {
28 }
29 
// Forwards the received cells to BaseUnorderedMap::add_cells and returns
// that call's result unchanged.
// NOTE(review): source line 34 (between the base call and the return) is
// absent from this listing - check Common.cc for the statement that runs there.
30 bool Common::add_cells(const cid_t cid, StaticBuffer& buffer,
31  bool reached_limit, DB::Specs::Interval& interval) {
32  bool more = BaseUnorderedMap::add_cells(
33  cid, buffer, reached_limit, interval);
35  return more;
36 }
37 
// Hands the cells of column `cid` to `cells` via BaseUnorderedMap::get_cells,
// then, when partial notifications are enabled, wakes every thread waiting
// on m_cv.
// NOTE(review): source line 41 (first statement inside the if) is absent from
// this listing - presumably a lock on m_mutex; confirm against Common.cc.
38 void Common::get_cells(const cid_t cid, DB::Cells::Result& cells) {
39  BaseUnorderedMap::get_cells(cid, cells);
40  if(m_notify) {
42  m_cv.notify_all();
43  }
44 }
45 
// Overrides BaseUnorderedMap::free for column `cid`; when partial
// notifications are enabled, wakes every thread waiting on m_cv.
// NOTE(review): source lines 47 and 49 are absent from this listing -
// presumably the base-class free call and a lock on m_mutex; confirm
// against Common.cc.
46 void Common::free(const cid_t cid) {
48  if(m_notify) {
50  m_cv.notify_all();
51  }
52 }
53 
// Final-response hook: marks profiling as finished, then either dispatches
// the result callback (when m_cb is set) or wakes the threads blocked on m_cv.
//   err - error code; a non-zero value enters the error branch below.
// NOTE(review): source lines 57, 63 and 66 are absent from this listing.
54 void Common::response(int err) {
55  if(err) {
// `at` holds Error::OK; the statement that uses it (source line 57) is not
// shown - presumably a compare-exchange on state_error; confirm.
56  int at = Error::OK;
58  }
59 
60  profile.finished();
61 
62  if(m_cb) {
64  send_result();
65  } else {
67  m_cv.notify_all();
68  }
69 }
70 
// Returns the current value of valid_state.
71 bool Common::valid() noexcept {
72  return valid_state;
73 }
74 
// Body of Common::response_partials() (the page's symbol index places its
// definition at source line 75).
// NOTE(review): the signature line and source lines 79 and 81 are absent
// from this listing, so the conditions opening the two blocks closed at
// lines 88 and 89 are not visible here.
// Visible flow: do nothing unless partial notifications are enabled; on one
// path, hold m_mutex, optionally block on m_cv, and return without sending;
// otherwise fall through to send_result().
76  if(!m_notify)
77  return;
78 
80  Core::UniqueLock lock_wait(m_mutex);
82  if(wait_on_partials()) {
// Blocks until no result is being sent or wait_on_partials() turns false.
83  m_cv.wait(lock_wait, [this] () {
84  return !m_sending_result || !wait_on_partials();
85  });
86  }
87  return;
88  }
89  }
90  send_result();
91 }
92 
// Body of Common::wait_on_partials() (the page's symbol index places its
// definition at source line 93; the signature line is absent from this listing).
// True while the handler is valid and the buffered size in bytes exceeds
// buff_sz * buff_ahead.
94  return valid() && get_size_bytes() > buff_sz * buff_ahead;
95 }
96 
// Blocks the calling thread on m_cv until no result is being sent and
// completion.count() is zero. Afterwards, if partial notifications are
// enabled and the handler is valid and not empty, it calls send_result()
// and jumps back to wait again.
// NOTE(review): source lines 101 and 108 are absent from this listing; the
// block closed at line 105 is opened by the missing line 101.
97 void Common::wait() {
98  _wait: {
99  {
100  Core::UniqueLock lock_wait(m_mutex);
102  m_cv.wait(lock_wait, [this] () {
103  return !m_sending_result && !completion.count();
104  });
105  }
// The inner scope ends here, so lock_wait is released before the checks below.
106  }
107  if(m_notify && valid() && !empty()) {
109  send_result();
110  goto _wait;
111  }
112  }
113 }
114 
// Body of Common::send_result() (the page's symbol index places its
// definition at source line 115).
// NOTE(review): the signature line and source lines 118 and 120 are absent
// from this listing.
// Posts a Task that runs the user callback and then signals waiters.
// Task owns a shared pointer to the handler; it is move-constructible only
// (copy construction and both assignments are deleted).
116  struct Task {
117  Ptr hdlr;
119  Task(Ptr&& a_hdlr) noexcept : hdlr(std::move(a_hdlr)) { }
121  Task(Task&& other) noexcept : hdlr(std::move(other.hdlr)) { }
122  Task(const Task&) = delete;
123  Task& operator=(Task&&) = delete;
124  Task& operator=(const Task&) = delete;
125  ~Task() noexcept { }
126  void operator()() {
// The callback runs without m_mutex held; the lock is taken only afterwards.
127  hdlr->m_cb(hdlr);
128 
// Under the lock: stop the sending state, then wake all threads on m_cv.
129  Core::ScopedLock lock(hdlr->m_mutex);
130  hdlr->m_sending_result.stop();
131  hdlr->m_cv.notify_all();
132  }
133  };
// Uses m_dispatcher_io when set, otherwise the clients' io-context.
134  (m_dispatcher_io ? m_dispatcher_io : clients->get_io())
135  ->post(Task(std::dynamic_pointer_cast<Common>(shared_from_this())));
136 }
137 
138 
139 
140 
141 
142 
143 }}}}}
SWC::client::Query::Select::Handlers::Common::wait_on_partials
bool wait_on_partials()
Definition: Common.cc:93
SWC::Core::AtomicBase::compare_exchange_weak
constexpr SWC_CAN_INLINE bool compare_exchange_weak(T &at, T value) noexcept
Definition: Atomic.h:52
SWC::client::Query::Select::Handlers::BaseUnorderedMap::get_size_bytes
virtual size_t get_size_bytes() noexcept override
Definition: BaseUnorderedMap.cc:76
SWC::client::Query::Select::Handlers::Common::Cb_t
std::function< void(Ptr)> Cb_t
Definition: Common.h:23
Clients.h
SWC::Core::UniqueLock
Definition: MutexLock.h:68
SWC::Core::ScopedLock
Definition: MutexLock.h:41
SWC::client::Query::Select::Handlers::Common::response_partials
void response_partials()
Definition: Common.cc:75
SWC::client::Clients::Ptr
ClientsPtr Ptr
Definition: Clients.h:58
SWC::Comm::IoContextPtr
std::shared_ptr< IoContext > IoContextPtr
Definition: IoContext.h:16
SWC::client::Query::Select::Handlers::BaseUnorderedMap::empty
bool empty() noexcept
Definition: BaseUnorderedMap.cc:97
SWC::client::Query::Select::Handlers::Base::operator=
Base & operator=(const Base &)=delete
SWC::client::Query::Select::Handlers::Base::buff_ahead
Core::Atomic< uint8_t > buff_ahead
Definition: Base.h:37
SWC::client::Query::Select::Handlers::Common::Common
Common(const Clients::Ptr &clients, Cb_t &&cb, bool rsp_partials=false, const Comm::IoContextPtr &io=nullptr, Clients::Flag executor=Clients::DEFAULT) noexcept
Definition: Common.cc:17
SWC::client::Query::Select::Handlers::Common::m_cb
const Cb_t m_cb
Definition: Common.h:68
SWC::client::Query::Select::Handlers::BaseUnorderedMap::add_cells
virtual bool add_cells(const cid_t cid, StaticBuffer &buffer, bool reached_limit, DB::Specs::Interval &interval) override
Definition: BaseUnorderedMap.h:41
SWC::client::Query::Select::Handlers::Base::buff_sz
Core::Atomic< uint32_t > buff_sz
Definition: Base.h:36
SWC::client::Query::Select::Handlers::BaseUnorderedMap::free
virtual void free(const cid_t cid)
Definition: BaseUnorderedMap.h:53
SWC::client::Query::Select::Handlers::Common::m_mutex
std::mutex m_mutex
Definition: Common.h:58
SWC::client::Query::Select::Handlers::Common::m_sending_result
Core::StateRunning m_sending_result
Definition: Common.h:72
SWC::DB::Cells::Result
Definition: Result.h:16
SWC::client::Query::Select::Handlers::Common::m_dispatcher_io
Comm::IoContextPtr m_dispatcher_io
Definition: Common.h:69
SWC::client::Query::Select::Handlers::Base::clients
Clients::Ptr clients
Definition: Base.h:30
SWC::client::Query::Select::Handlers::Common::wait
void wait()
Definition: Common.cc:97
SWC::client::Query::Select::Handlers::Common::m_cv
std::condition_variable m_cv
Definition: Common.h:71
SWC::Error::OK
@ OK
Definition: Error.h:45
SWC_CAN_INLINE
#define SWC_CAN_INLINE
Definition: Compat.h:102
SWC::client::Query::Select::Handlers::Common::valid
virtual bool valid() noexcept override
Definition: Common.cc:71
SWC::client::Query::Select::Handlers::BaseUnorderedMap
Definition: BaseUnorderedMap.h:21
SWC
The SWC-DB C++ namespace 'SWC'.
Definition: main.cc:12
SWC::Core::Buffer
Definition: Buffer.h:18
SWC::client::Query::Select::Handlers::Common::response
virtual void response(int err) override
Definition: Common.cc:54
SWC::client::Clients::Flag
Flag
Definition: Clients.h:60
SWC::client::Query::Select::Handlers::Base::completion
Core::CompletionCounter< uint64_t > completion
Definition: Base.h:33
SWC::client::Query::Select::Handlers::Common::m_notify
const bool m_notify
Definition: Common.h:70
SWC::client::Query::Profiling::finished
SWC_CAN_INLINE void finished() noexcept
Definition: Profiling.h:127
SWC::Core::StateRunning::running
constexpr SWC_CAN_INLINE bool running() noexcept
Definition: StateRunning.h:37
SWC::client::Query::Select::Handlers::Common::add_cells
virtual bool add_cells(const cid_t cid, StaticBuffer &buffer, bool reached_limit, DB::Specs::Interval &interval) override
Definition: Common.cc:30
SWC::cid_t
uint64_t cid_t
Definition: Identifiers.h:16
SWC::client::Query::Select::Handlers::Base::Ptr
std::shared_ptr< Base > Ptr
Definition: Base.h:28
SWC::Core::CompletionCounter::count
constexpr SWC_CAN_INLINE CountT count() const noexcept
Definition: CompletionCounter.h:42
SWC::DB::Specs::Interval
Definition: SpecsInterval.h:25
SWC::client::Query::Select::Handlers::Common::free
virtual void free(const cid_t cid) override
Definition: Common.cc:46
SWC::client::Query::Select::Handlers::Base::state_error
Core::Atomic< int > state_error
Definition: Base.h:32
SWC::client::Query::Select::Handlers::Common::valid_state
Core::AtomicBool valid_state
Definition: Common.h:34
SWC::client::Query::Select::Handlers::Common::get_cells
virtual void get_cells(const cid_t cid, DB::Cells::Result &cells) override
Definition: Common.cc:38
SWC::client::Query::Select::Handlers::BaseUnorderedMap::get_cells
virtual void get_cells(const cid_t cid, DB::Cells::Result &cells)
Definition: BaseUnorderedMap.h:49
Common.h
SWC::client::Query::Select::Handlers::Base::profile
Profiling profile
Definition: Base.h:31
SWC::client::Query::Select::Handlers::Common::send_result
void send_result()
Definition: Common.cc:115