SWC-DB  v0.5.12 C++ documentation
SWC-DB© (Super Wide Column Database) - High Performance Scalable Database (https://github.com/kashirin-alex/swc-db)
BrokerCommitter.cc
Go to the documentation of this file.
1 /*
2  * SWC-DB© Copyright since 2019 Alex Kashirin <kashirin.alex@gmail.com>
3  * License details at <https://github.com/kashirin-alex/swc-db/#license>
4  */
5 
6 
9 
10 namespace SWC { namespace client { namespace Query { namespace Update {
11 
12 
13 
// Debug-level logging helper for broker commit responses.
// Expands to one SWC_LOG_OUT(LOG_DEBUG, ...) invocation that writes, in order:
// the result of an unqualified print(...) call prefixed with `msg`,
// rsp.print(...), and " buff-sz=" followed by cells_buff.fill().
// `rsp` and `cells_buff` are NOT macro parameters; both names (and a callable
// print) must be in scope wherever the macro is expanded.
// NOTE(review): the expansion itself ends with ';', so a call site written as
// SWC_BROKER_COMMIT_RSP_DEBUG("..."); produces an additional empty statement.
14 #define SWC_BROKER_COMMIT_RSP_DEBUG(msg) \
15  SWC_LOG_OUT(LOG_DEBUG, \
16  print(SWC_LOG_OSTREAM << msg << ' '); \
17  rsp.print(SWC_LOG_OSTREAM << ' '); \
18  SWC_LOG_OSTREAM << " buff-sz=" << cells_buff.fill(); \
19  );
20 
21 
23 
24 void BrokerCommitter::print(std::ostream& out) {
25  out << "BrokerCommitter(cid=" << colp->get_cid()
26  << " completion=" << hdlr->completion.count()
27  << ')';
28 }
29 
// Body of BrokerCommitter::commit(). The cross-reference index of this listing
// places `void commit()` at BrokerCommitter.cc:30; that signature line, and
// several interior listing lines (e.g. 37, 58, 70-73), are absent from this
// extract, so the text below starts at the first statement of the body.
//
// Visible behaviour: bumps the handler's completion counter, bails out through
// hdlr->response() when the handler is no longer valid, then repeatedly pulls
// a buffer of cells from the column and passes it to a request call together
// with a timeout and this committer. Finally calls hdlr->response() when
// workload.is_last() reports true.
31  hdlr->completion.increment();
32  if(!hdlr->valid())
33  return hdlr->response();
34 
 // Per-request state handed to the request call below: the owning committer
 // plus the cells buffer, which the constructor takes over with std::move.
35  struct ReqData {
36  Ptr committer;
 // NOTE(review): listing line 37 is missing here; the constructor initialises
 // a member named `profile`, so its declaration presumably sat on that line.
38  DynamicBuffer cells_buff;
40  ReqData(const Ptr& a_committer, DynamicBuffer& a_cells_buff) noexcept
41  : committer(a_committer),
42  profile(committer->hdlr->profile.bkr()),
43  cells_buff(std::move(a_cells_buff)) {
44  }
46  ~ReqData() noexcept { }
 // Accessor for the clients object held by the committer's handler.
48  client::Clients::Ptr& get_clients() noexcept {
49  return committer->hdlr->clients;
50  }
 // A request is valid for as long as the committer's handler is valid.
52  bool valid() {
53  return committer->hdlr->valid();
54  }
 // Records rsp.err in `profile`, then forwards the request, the response and
 // the retained cells buffer to BrokerCommitter::committed().
56  void callback(
57  const ReqBase::Ptr& req,
 // NOTE(review): listing line 58 (remainder of the parameter list, which must
 // declare `rsp`) is missing from this extract.
59  profile.add(rsp.err);
60  committer->committed(req, rsp, cells_buff);
61  }
62  };
63 
65  bool more = true;
66  DynamicBuffer cells_buff;
67 
 // Keep going while get_buff() reports more data, the handler stays valid and
 // get_buff() returns true for the buffer it filled (up to hdlr->buff_sz).
68  while(more && hdlr->valid() &&
69  colp->get_buff(hdlr->buff_sz, more, cells_buff)) {
 // NOTE(review): listing lines 70-73 are missing; the lines below are the tail
 // of an argument list. The index lists Bkr::Req::CellsUpdate::request(params,
 // snd_buf, timeout, args...) - presumably that call; confirm in the full file.
74  cells_buff,
 // Timeout argument: hdlr->timeout plus the buffer fill size divided by
 // hdlr->timeout_ratio.
75  hdlr->timeout + cells_buff.fill()/hdlr->timeout_ratio,
76  shared_from_this(),
77  std::move(cells_buff)
78  );
79  }
80 
81  if(workload.is_last())
82  hdlr->response();
83 }
84 
// BrokerCommitter::committed(req, rsp, cells_buff): dispatches on rsp.err for
// a request previously issued by this committer.
// The index of this listing gives the full signature as
//   void committed(ReqBase::Ptr req,
//                  const Comm::Protocol::Bkr::Params::CellsUpdateRsp& rsp,
//                  const DynamicBuffer& cells_buff)
// at BrokerCommitter.cc:85; listing lines 85 and 87 (signature start and the
// `rsp` parameter) are absent from this extract, as are the case labels on
// listing lines 97 and 116.
86  ReqBase::Ptr req,
88  const DynamicBuffer& cells_buff) {
89  switch(rsp.err) {
 // Success: log, and call hdlr->response() if workload.is_last() is true.
90  case Error::OK: {
91  SWC_BROKER_COMMIT_RSP_DEBUG("bkr_commit");
92  if(workload.is_last())
93  hdlr->response();
94  return;
95  }
96 
 // NOTE(review): the case label (listing line 97) is missing here - presumably
 // Error::COLUMN_NOT_EXISTS, which the index lists; confirm in the full file.
 // This branch removes the column's schema from hdlr->clients->schemas, passes
 // cells_buff back to colp->add() (its result goes to add_resend_count()),
 // records the error against the column id and, when workload.is_last() is
 // true, responds with rsp.err.
98  SWC_BROKER_COMMIT_RSP_DEBUG("bkr_commit");
99  hdlr->clients->schemas.remove(colp->get_cid());
100  hdlr->add_resend_count(colp->add(cells_buff));
101  hdlr->error(colp->get_cid(), rsp.err);
102  if(workload.is_last())
103  hdlr->response(rsp.err);
104  return;
105  }
106 
 // Client stopping: pass cells_buff back to colp->add(), record the error on
 // the handler and respond with rsp.err if workload.is_last() is true.
107  case Error::CLIENT_STOPPING: {
108  SWC_BROKER_COMMIT_RSP_DEBUG("bkr_commit STOPPED");
109  hdlr->add_resend_count(colp->add(cells_buff));
110  hdlr->error(rsp.err);
111  if(workload.is_last())
112  hdlr->response(rsp.err);
113  return;
114  }
115 
 // NOTE(review): listing line 116 is missing - presumably another case label
 // sharing this branch (the index lists Error::COMM_NOT_CONNECTED); confirm.
 // While the handler is valid the same request is re-issued through
 // req->request_again(); otherwise control falls through to `default`.
117  case Error::REQUEST_TIMEOUT: {
118  if(hdlr->valid()) {
119  SWC_BROKER_COMMIT_RSP_DEBUG("bkr_commit RETRYING");
120  return req->request_again();
121  }
122  [[fallthrough]];
123  }
124 
 // Any other error: pass cells_buff back to colp->add(); if workload.is_last()
 // is true, run commit() again when the handler is still valid, then call
 // hdlr->response() in either case.
125  default: {
126  hdlr->add_resend_count(colp->add(cells_buff));
127  if(workload.is_last()) {
128  if(hdlr->valid()) {
129  SWC_BROKER_COMMIT_RSP_DEBUG("bkr_commit RETRYING");
130  commit();
131  }
132  hdlr->response();
133  }
134  return;
135  }
136  }
137 }
138 
139 
140 
141 }}}}
SWC::Error::REQUEST_TIMEOUT
@ REQUEST_TIMEOUT
Definition: Error.h:73
SWC::client::Query::Update::BrokerCommitter::Ptr
std::shared_ptr< BrokerCommitter > Ptr
Definition: BrokerCommitter.h:31
SWC::Comm::Protocol::Bkr::Params::CellsUpdateReq
Definition: CellsUpdate.h:17
SWC_BROKER_COMMIT_RSP_DEBUG
#define SWC_BROKER_COMMIT_RSP_DEBUG(msg)
Definition: BrokerCommitter.cc:14
SWC::client::Clients::Ptr
ClientsPtr Ptr
Definition: Clients.h:58
SWC::client::Query::Update::BrokerCommitter::committed
void committed(ReqBase::Ptr req, const Comm::Protocol::Bkr::Params::CellsUpdateRsp &rsp, const DynamicBuffer &cells_buff)
Definition: BrokerCommitter.cc:85
SWC::Comm::Protocol::Bkr::Params::CellsUpdateRsp::err
int32_t err
Definition: CellsUpdate.h:56
SWC::client::Query::Update::BrokerCommitter::colp
Query::Update::Handlers::Base::Column * colp
Definition: BrokerCommitter.h:33
SWC::client::Query::Update::BrokerCommitter::print
void print(std::ostream &out)
Definition: BrokerCommitter.cc:24
SWC::Error::OK
@ OK
Definition: Error.h:45
SWC_CAN_INLINE
#define SWC_CAN_INLINE
Definition: Compat.h:102
SWC::Comm::Protocol::Bkr::Req::CellsUpdate::request
static SWC_CAN_INLINE void request(const Params::CellsUpdateReq &params, StaticBuffer &snd_buf, const uint32_t timeout, DataArgsT &&... args)
Definition: CellsUpdate.h:52
CellsUpdate.h
SWC
The SWC-DB C++ namespace 'SWC'.
Definition: main.cc:12
SWC::Core::BufferDyn< StaticBuffer >
SWC::client::Query::Update::Handlers::Base::Column::get_cid
virtual cid_t get_cid() const noexcept=0
BrokerCommitter.h
SWC::client::Query::Update::BrokerCommitter::hdlr
Query::Update::Handlers::Base::Ptr hdlr
Definition: BrokerCommitter.h:32
SWC::client::Query::Update::Handlers::Base::Column::get_buff
virtual bool get_buff(const DB::Cell::Key &key_start, const DB::Cell::Key &key_end, size_t buff_sz, bool &more, DynamicBuffer &cells_buff)=0
SWC::client::Query::Update::BrokerCommitter::~BrokerCommitter
~BrokerCommitter() noexcept
Definition: BrokerCommitter.cc:22
SWC::Error::CLIENT_STOPPING
@ CLIENT_STOPPING
Definition: Error.h:127
SWC::Comm::Protocol::Bkr::Params::CellsUpdateRsp
Definition: CellsUpdate.h:44
SWC::Error::COMM_NOT_CONNECTED
@ COMM_NOT_CONNECTED
Definition: Error.h:64
SWC::Error::COLUMN_NOT_EXISTS
@ COLUMN_NOT_EXISTS
Definition: Error.h:100
SWC::Core::BufferDyn::fill
constexpr SWC_CAN_INLINE size_t fill() const noexcept
Definition: Buffer.h:192
SWC::Core::CompletionCounter::increment
constexpr SWC_CAN_INLINE void increment(CountT v=1) noexcept
Definition: CompletionCounter.h:32
SWC::client::Query::Update::BrokerCommitter::workload
Core::CompletionCounter workload
Definition: BrokerCommitter.h:34
SWC::client::Query::Profiling::Component::Start
Definition: Profiling.h:56
SWC::Core::CompletionCounter::is_last
constexpr SWC_CAN_INLINE bool is_last() noexcept
Definition: CompletionCounter.h:37
SWC::client::Query::Update::BrokerCommitter::commit
void commit()
Definition: BrokerCommitter.cc:30
SWC::Comm::client::ConnQueueReqBase::Ptr
std::shared_ptr< ConnQueueReqBase > Ptr
Definition: ClientConnQueue.h:25
SWC::client::Query::Update::Handlers::Base::Column::add
virtual size_t add(const DynamicBuffer &cells, const DB::Cell::Key &upto_key, const DB::Cell::Key &from_key, uint32_t skip, bool malformed)=0
SWC::client::Query::Profiling::Component::Start::add
SWC_CAN_INLINE void add(bool err) const noexcept
Definition: Profiling.h:66