SWC-DB  v0.5.12 C++ documentations
SWC-DB© (Super Wide Column Database) - High Performance Scalable Database (https://github.com/kashirin-alex/swc-db)
CellsUpdate.h
Go to the documentation of this file.
1 /*
2  * SWC-DB© Copyright since 2019 Alex Kashirin <kashirin.alex@gmail.com>
3  * License details at <https://github.com/kashirin-alex/swc-db/#license>
4  */
5 
6 
7 #ifndef swcdb_broker_Protocol_handlers_CellsUpdate_h
8 #define swcdb_broker_Protocol_handlers_CellsUpdate_h
9 
10 
14 
15 
16 namespace SWC { namespace Comm { namespace Protocol {
17 namespace Bkr { namespace Handler {
18 
19 
20 class Updater final
22  public:
23  typedef std::shared_ptr<Updater> Ptr;
24 
27 
29  Updater(const DB::Schema::Ptr& schema,
30  const ConnHandlerPtr& a_conn, const Event::Ptr& a_ev)
31  : SWC::client::Query::Update::Handlers::BaseSingleColumn(
32  Env::Clients::get(),
33  schema->cid, schema->col_seq,
34  schema->cell_versions, schema->cell_ttl, schema->col_type,
35  a_ev->data_ext
36  ),
37  conn(a_conn), ev(a_ev) {
38  ev->data_ext.free();
39  }
40 
41  virtual ~Updater() noexcept { }
42 
43  bool valid() noexcept override {
44  return !error() && !ev->expired() && conn->is_open() &&
46  }
47 
48  void response(int err=Error::OK) override {
49  if(!completion.is_last())
50  return;
51 
52  if(!err && requires_commit() && Env::Bkr::is_accepting()) {
53  struct Task {
54  Updater::Ptr ptr;
56  Task(Updater::Ptr&& a_ptr) noexcept : ptr(std::move(a_ptr)) { }
57  void operator()() { ptr->commit(&ptr->column); }
58  };
60  Task(std::dynamic_pointer_cast<Updater>(shared_from_this()))
61  );
62  return;
63  }
64 
65  if(err)
66  error(err);
67  else if(!empty())
69 
70  if(!ev->expired() && conn->is_open()) {
73  conn->send_response(Buffers::make(ev, Params::CellsUpdateRsp(error())));
74  }
75  profile.finished();
76 
78  SWC_LOG_OSTREAM << "Column(" << column.cid << ")";
79  profile.print(SWC_LOG_OSTREAM << " Update-");
81  );
82 
84  }
85 
86 };
87 
88 
89 void cells_update(const ConnHandlerPtr& conn, const Event::Ptr& ev) {
90  int err = Error::OK;
92 
93  try {
94  const uint8_t *ptr = ev->data.base;
95  size_t remain = ev->data.size;
96  params.decode(&ptr, &remain);
97 
98  if(!ev->data_ext.size) {
100  } else {
101  auto schema = Env::Clients::get()->get_schema(err, params.cid);
102  if(!err) {
103  Updater::Ptr hdlr(new Updater(schema, conn, ev));
104  hdlr->commit(&hdlr->column);
105  return;
106  }
107  }
108 
109  } catch(...) {
112  err = e.code();
113  }
114  conn->send_response(Buffers::make(ev, Params::CellsUpdateRsp(err)));
116 }
117 
118 
119 }}}}}
120 
121 #endif // swcdb_broker_Protocol_handlers_CellsUpdate_h
SWC::Comm::Protocol::Bkr::Handler::Updater::ev
Event::Ptr ev
Definition: CellsUpdate.h:26
BaseSingleColumn.h
SWC::Error::Exception::code
constexpr SWC_CAN_INLINE int code() const noexcept
Definition: Exception.h:51
SWC_LOG_OSTREAM
#define SWC_LOG_OSTREAM
Definition: Logger.h:44
SWC::Error::SERVER_SHUTTING_DOWN
@ SERVER_SHUTTING_DOWN
Definition: Error.h:84
SWC::Env::Clients::get
static SWC_CAN_INLINE client::Clients::Ptr & get() noexcept
Definition: Clients.h:299
SWC::DB::Schema::Ptr
std::shared_ptr< Schema > Ptr
Definition: Schema.h:185
SWC_LOG_OUT
#define SWC_LOG_OUT(pr, _code_)
Definition: Logger.h:178
SWC::Comm::Protocol::Bkr::Params::CellsUpdateReq
Definition: CellsUpdate.h:17
SWC::client::Query::Update::Handlers::BaseSingleColumn::column
ColumnMutable column
Definition: BaseSingleColumn.h:24
data_ext
BufferInfo data_ext
Data Extended Buffer.
Definition: Header.h:63
SWC::Comm::Protocol::Bkr::Handler::Updater
Definition: CellsUpdate.h:21
SWC::client::Query::Update::Handlers::ColumnMutable::cid
cid_t cid
Definition: BaseColumnMutable.h:25
SWC::client::Query::Update::Handlers::BaseSingleColumn::empty
virtual bool empty() noexcept override
Definition: BaseSingleColumn.h:53
SWC::Env::Bkr::is_accepting
static SWC_CAN_INLINE bool is_accepting() noexcept
Definition: BrokerEnv.h:37
SWC::Comm::Protocol::Bkr::Handler::cells_update
void cells_update(const ConnHandlerPtr &conn, const Event::Ptr &ev)
Definition: CellsUpdate.h:89
SWC::Error::OK
@ OK
Definition: Error.h:45
SWC_CAN_INLINE
#define SWC_CAN_INLINE
Definition: Compat.h:102
SWC::LOG_DEBUG
@ LOG_DEBUG
Definition: Logger.h:36
SWC_CURRENT_EXCEPTION
#define SWC_CURRENT_EXCEPTION(_msg_)
Definition: Exception.h:119
SWC
The SWC-DB C++ namespace 'SWC'.
Definition: main.cc:12
SWC::client::Query::Update::Handlers::Base::completion
Core::CompletionCounter< uint64_t > completion
Definition: Base.h:113
CellsUpdate.h
SWC::Comm::Protocol::Bkr::Handler::Updater::~Updater
virtual ~Updater() noexcept
Definition: CellsUpdate.h:41
SWC::client::Query::Update::Handlers::BaseSingleColumn
Definition: BaseSingleColumn.h:18
SWC::client::Query::Profiling::finished
SWC_CAN_INLINE void finished() noexcept
Definition: Profiling.h:127
SWC::Error::CLIENT_DATA_REMAINED
@ CLIENT_DATA_REMAINED
Definition: Error.h:125
SWC::Error::CLIENT_STOPPING
@ CLIENT_STOPPING
Definition: Error.h:127
SWC::LOG_ERROR
@ LOG_ERROR
Definition: Logger.h:32
SWC::Comm::Protocol::Bkr::Params::CellsUpdateReq::cid
cid_t cid
Definition: CellsUpdate.h:30
SWC::Comm::Protocol::Bkr::Params::CellsUpdateRsp
Definition: CellsUpdate.h:44
SWC::Comm::ConnHandlerPtr
std::shared_ptr< ConnHandler > ConnHandlerPtr
Definition: AppContext.h:17
SWC::client::Query::Update::Handlers::Base::error
virtual int error() noexcept
Definition: Base.h:159
SWC::Comm::Protocol::Bkr::Handler::Updater::Ptr
std::shared_ptr< Updater > Ptr
Definition: CellsUpdate.h:23
Committer.h
SWC::Comm::Buffers::make
static SWC_CAN_INLINE Ptr make(uint32_t reserve=0)
Definition: Buffers.h:27
SWC::Comm::Protocol::Bkr::Handler::Updater::response
void response(int err=Error::OK) override
Definition: CellsUpdate.h:48
SWC::Comm::Protocol::Bkr::Handler::Updater::Updater
SWC_CAN_INLINE Updater(const DB::Schema::Ptr &schema, const ConnHandlerPtr &a_conn, const Event::Ptr &a_ev)
Definition: CellsUpdate.h:29
SWC::Comm::Protocol::Bkr::Handler::Updater::conn
ConnHandlerPtr conn
Definition: CellsUpdate.h:25
SWC::Env::Bkr::post
static SWC_CAN_INLINE void post(T_Handler &&handler)
Definition: BrokerEnv.h:71
SWC::Comm::Serializable::decode
void decode(const uint8_t **bufp, size_t *remainp)
Definition: Serializable.h:59
SWC::Comm::Event::Ptr
std::shared_ptr< Event > Ptr
Definition: Event.h:33
SWC::Comm::Protocol::Bkr::Handler::Updater::valid
bool valid() noexcept override
Definition: CellsUpdate.h:43
SWC::Core::CompletionCounter::is_last
constexpr SWC_CAN_INLINE bool is_last() noexcept
Definition: CompletionCounter.h:37
SWC::client::Query::Update::Handlers::Base::profile
Profiling profile
Definition: Base.h:111
SWC::Error::print
void print(std::ostream &out, int err)
Definition: Error.cc:191
SWC::Env::Bkr::processed
static SWC_CAN_INLINE void processed() noexcept
Definition: BrokerEnv.h:55
SWC::client::Query::Profiling::print
void print(std::ostream &out) const
Definition: Profiling.h:204
SWC::Error::INVALID_ARGUMENT
@ INVALID_ARGUMENT
Definition: Error.h:58
SWC::client::Query::Update::Handlers::BaseSingleColumn::BaseSingleColumn
SWC_CAN_INLINE BaseSingleColumn(const Clients::Ptr &a_clients, const cid_t cid, DB::Types::KeySeq seq, uint32_t versions, uint32_t ttl_secs, DB::Types::Column type, Clients::Flag a_executor=Clients::DEFAULT)
Definition: BaseSingleColumn.h:27
SWC::Error::Exception
Definition: Exception.h:21
SWC::client::Query::Update::Handlers::BaseSingleColumn::requires_commit
virtual bool requires_commit() noexcept override
Definition: BaseSingleColumn.h:49