SWC-DB  v0.5.12 C++ documentations
SWC-DB© (Super Wide Column Database) - High Performance Scalable Database (https://github.com/kashirin-alex/swc-db)
RgrMngId.h
Go to the documentation of this file.
1 /*
2  * SWC-DB© Copyright since 2019 Alex Kashirin <kashirin.alex@gmail.com>
3  * License details at <https://github.com/kashirin-alex/swc-db/#license>
4  */
5 
6 #ifndef swcdb_ranger_Protocol_mngr_req_RgrMngId_h
7 #define swcdb_ranger_Protocol_mngr_req_RgrMngId_h
8 
11 
12 namespace SWC { namespace Comm { namespace Protocol {
13 namespace Mngr { namespace Req {
14 
15 
17 
18  public:
19  typedef std::shared_ptr<RgrMngId> Ptr;
20 
21  RgrMngId(const IoContextPtr& ioctx,
22  std::function<void()>&& cb = nullptr)
23  : client::ConnQueue::ReqBase(nullptr),
25  Env::Config::settings()->get<Config::Property::Value_int32_g>(
26  "swc.rgr.id.validation.interval")),
27  cb_shutdown(std::move(cb)),
28  endpoints(),
29  m_mutex(),
30  m_timer(asio::high_resolution_timer(ioctx->executor())),
31  m_run(true), m_failures(0) {
32  }
33 
34  RgrMngId(RgrMngId&&) = delete;
35  RgrMngId(const RgrMngId&) = delete;
36  RgrMngId& operator=(RgrMngId&&) = delete;
37  RgrMngId& operator=(const RgrMngId&) = delete;
38 
39  virtual ~RgrMngId() noexcept { }
40 
41  void create(const Params::RgrMngId& params) {
43  cbp = Buffers::make(params, 0, RGR_MNG_ID, 60000);
44  }
45 
46  void request() {
47  cancel();
48 
49  auto rgr_data = Env::Rgr::rgr_data();
52  rgr_data->print(SWC_LOG_OSTREAM << "RGR_SHUTTINGDOWN(req) "); );
53  create(
55  rgr_data->rgrid.load(),
56  Params::RgrMngId::Flag::RGR_SHUTTINGDOWN,
57  rgr_data->endpoints
58  )
59  );
60 
61  } else if(Env::Rgr::is_not_accepting()) {
62  return set(0);
63 
64  } else {
65  create(
67  0,
68  Params::RgrMngId::Flag::RGR_REQ,
69  rgr_data->endpoints
70  )
71  );
72  }
73 
74  run();
75  }
76 
77  void handle_no_conn() override {
79  set(200);
80  }
81 
82  bool run() override {
83  if(endpoints.empty()) {
85  if(endpoints.empty()) {
88  DB::Types::MngrRole::RANGERS, shared_from_this())->run();
89  return false;
90  }
91  }
92  Env::Clients::get()->get_mngr_queue(endpoints)->put(req());
93  return true;
94  }
95 
96  void handle(ConnHandlerPtr, const Event::Ptr& ev) override {
97 
98  if(ev->error || ev->response_code() != Error::OK) {
99  return set(1000);
100  }
101 
102  Params::RgrMngId rsp_params;
103  try {
104  const uint8_t *ptr = ev->data.base + 4;
105  size_t remain = ev->data.size - 4;
106  rsp_params.decode(&ptr, &remain);
107 
108  } catch(...) {
110  return set(500);
111  }
112 
113  switch(rsp_params.flag) {
114 
115  case Params::RgrMngId::Flag::MNGR_ACK: {
116  return set(0);
117  }
118 
119  case Params::RgrMngId::Flag::MNGR_REREQ: {
120  return request();
121  }
122 
123  case Params::RgrMngId::Flag::RGR_SHUTTINGDOWN: {
125  Env::Rgr::rgr_data()->print(SWC_LOG_OSTREAM << "RGR_SHUTTINGDOWN ");
126  );
127  stop();
128  if(cb_shutdown)
129  cb_shutdown();
130  else
131  SWC_LOG(LOG_WARN, "Shutdown flag without Callback!");
132  return;
133  }
134 
135  case Params::RgrMngId::Flag::MNGR_REASSIGN:
136  case Params::RgrMngId::Flag::MNGR_ASSIGNED: {
137 
138  auto rgr_data = Env::Rgr::rgr_data();
139 
140  if(rsp_params.flag == Params::RgrMngId::Flag::MNGR_ASSIGNED &&
141  rsp_params.fs
142  != Env::FsInterface::interface()->get_type_underlying()) {
143 
146  << "Ranger's " << Env::FsInterface::interface()->to_string()
147  << " not matching with Mngr's FS-type="
148  << FS::to_string(rsp_params.fs);
149  rgr_data->print(SWC_LOG_OSTREAM << ", RGR_SHUTTINGDOWN ");
150  );
151 
152  std::raise(SIGTERM);
153  return;
154  }
155 
157  if(!rgr_data->rgrid || rgr_data->rgrid == rsp_params.rgrid ||
158  (rgr_data->rgrid != rsp_params.rgrid &&
159  rsp_params.flag == Params::RgrMngId::Flag::MNGR_REASSIGN)) {
160 
161  rgr_data->rgrid.store(rsp_params.rgrid);
162  flag = Params::RgrMngId::Flag::RGR_ACK;
164  rgr_data->print(SWC_LOG_OSTREAM << "RGR_ACK "); );
165  } else {
166 
167  flag = Params::RgrMngId::Flag::RGR_DISAGREE;
169  rgr_data->print(SWC_LOG_OSTREAM << "RGR_DISAGREE "); );
170  }
171 
172  create(Params::RgrMngId(rgr_data->rgrid, flag, rgr_data->endpoints));
173  run();
174  return;
175  }
176 
177  default: {
178  clear_endpoints();
179  // remain Flag can be only MNGR_NOT_ACTIVE || no other action
180  return set(1000);
181  }
182  }
183  }
184 
185  private:
186 
188  Env::Clients::get()->remove_mngr(endpoints);
189  endpoints.clear();
190  }
191 
192  void stop() {
193  bool at = true;
194  if(m_run.compare_exchange_weak(at, false)) {
196  m_timer.cancel();
197  }
198  }
199 
200  void set(uint32_t ms) {
201  cancel();
202 
204  if(!m_run)
205  return;
206 
207  m_timer.expires_after(
208  std::chrono::milliseconds(ms ? ms : cfg_check_interval->get()));
209  struct TimerTask {
210  RgrMngId* ptr;
212  TimerTask(RgrMngId* a_ptr) noexcept : ptr(a_ptr) { }
213  void operator()(const asio::error_code& ec) {
214  if(ec != asio::error::operation_aborted)
215  ptr->request();
216  }
217  };
218  m_timer.async_wait(TimerTask(this));
219  }
220 
221  void cancel() {
223  m_timer.cancel();
224  }
225 
227  const std::function<void()> cb_shutdown;
229 
231  asio::high_resolution_timer m_timer;
234 
235 };
236 
237 }}}}}
238 
239 #endif // swcdb_ranger_Protocol_mngr_req_RgrMngId_h
SWC::Core::AtomicBase::compare_exchange_weak
constexpr SWC_CAN_INLINE bool compare_exchange_weak(T &at, T value) noexcept
Definition: Atomic.h:52
SWC::Comm::Protocol::Mngr::Req::RgrMngId::operator=
RgrMngId & operator=(RgrMngId &&)=delete
SWC_LOG_OSTREAM
#define SWC_LOG_OSTREAM
Definition: Logger.h:44
SWC::Core::AtomicBase< bool >
SWC::Comm::Protocol::Mngr::Req::RgrMngId
Definition: RgrMngId.h:16
SWC::Comm::client::ConnQueueReqBase::req
SWC_CAN_INLINE Ptr req() noexcept
Definition: ClientConnQueue.h:39
SWC::Comm::Protocol::Mngr::Req::RgrMngId::handle
void handle(ConnHandlerPtr, const Event::Ptr &ev) override
Definition: RgrMngId.h:96
SWC::Env::Clients::get
static SWC_CAN_INLINE client::Clients::Ptr & get() noexcept
Definition: Clients.h:299
SWC::Comm::Protocol::Mngr::Req::RgrMngId::create
void create(const Params::RgrMngId &params)
Definition: RgrMngId.h:41
SWC::Comm::Protocol::Mngr::Req::RgrMngId::m_mutex
Core::MutexAtomic m_mutex
Definition: RgrMngId.h:230
SWC::Comm::Protocol::Mngr::Req::RgrMngId::operator=
RgrMngId & operator=(const RgrMngId &)=delete
SWC::Core::Vector::clear
SWC_CAN_INLINE void clear() noexcept(_NoExceptDestructor)
Definition: Vector.h:120
SWC_LOG_OUT
#define SWC_LOG_OUT(pr, _code_)
Definition: Logger.h:178
SWC::Core::Atomic< size_t >
SWC::Comm::Protocol::Mngr::Req::RgrMngId::RgrMngId
RgrMngId(const RgrMngId &)=delete
SWC::Env::Rgr::rgr_data
static SWC_CAN_INLINE DB::RgrData * rgr_data() noexcept
Definition: RangerEnv.h:48
SWC::Comm::Protocol::Mngr::Req::RgrMngId::cb_shutdown
const std::function< void()> cb_shutdown
Definition: RgrMngId.h:227
SWC::Comm::Protocol::Mngr::Params::RgrMngId::flag
Flag flag
Definition: RgrMngId.h:59
SWC::client::Query::ReqBase
Comm::client::ConnQueue::ReqBase ReqBase
Definition: Profiling.h:21
SWC::Env::FsInterface::interface
static SWC_CAN_INLINE FS::Interface::Ptr & interface() noexcept
Definition: Interface.h:150
SWC::Comm::client::ConnQueueReqBase
Definition: ClientConnQueue.h:22
SWC::Comm::IoContextPtr
std::shared_ptr< IoContext > IoContextPtr
Definition: IoContext.h:16
SWC::Comm::Protocol::Mngr::Params::RgrMngId::rgrid
rgrid_t rgrid
Definition: RgrMngId.h:58
SWC::Comm::Protocol::Mngr::Req::RgrMngId::request
void request()
Definition: RgrMngId.h:46
SWC::Comm::Protocol::Mngr::Req::RgrMngId::m_timer
asio::high_resolution_timer m_timer
Definition: RgrMngId.h:231
SWC::Comm::Protocol::Mngr::Req::RgrMngId::m_run
Core::AtomicBool m_run
Definition: RgrMngId.h:232
SWC::Config::Property::Value_int32_g::get
SWC_CAN_INLINE int32_t get() const noexcept
Definition: Property.h:610
SWC::Comm::Protocol::Mngr::Params::RgrMngId::Flag
Flag
Definition: RgrMngId.h:20
SWC::Core::MutexAtomic::scope
Definition: MutexAtomic.h:77
SWC::Comm::Protocol::Mngr::Req::RgrMngId::m_failures
Core::Atomic< size_t > m_failures
Definition: RgrMngId.h:233
SWC::Comm::Protocol::Mngr::Req::RgrMngId::clear_endpoints
void clear_endpoints()
Definition: RgrMngId.h:187
SWC::Error::OK
@ OK
Definition: Error.h:45
SWC::Core::Vector::empty
constexpr SWC_CAN_INLINE bool empty() const noexcept
Definition: Vector.h:168
SWC_CAN_INLINE
#define SWC_CAN_INLINE
Definition: Compat.h:102
SWC::LOG_DEBUG
@ LOG_DEBUG
Definition: Logger.h:36
SWC::Comm::Protocol::Mngr::Req::RgrMngId::cfg_check_interval
const Config::Property::Value_int32_g::Ptr cfg_check_interval
Definition: RgrMngId.h:226
SWC_LOG
#define SWC_LOG(priority, message)
Definition: Logger.h:191
RgrMngId.h
SWC
The SWC-DB C++ namespace 'SWC'.
Definition: main.cc:12
SWC::Core::MutexAtomic
Definition: MutexAtomic.h:17
SWC::LOG_ERROR
@ LOG_ERROR
Definition: Logger.h:32
SWC::Comm::Protocol::Mngr::Req::MngrActive::make
static SWC_CAN_INLINE Ptr make(const SWC::client::Clients::Ptr &clients, const cid_t &a_cid, const DispatchHandler::Ptr &hdlr, uint32_t timeout_ms=60000)
Definition: MngrActive.h:25
SWC::Comm::ConnHandlerPtr
std::shared_ptr< ConnHandler > ConnHandlerPtr
Definition: AppContext.h:17
SWC::Comm::Protocol::Mngr::Params::RgrMngId::fs
FS::Type fs
Definition: RgrMngId.h:60
SWC::Comm::client::ConnQueueReqBase::print
void print(std::ostream &out)
Definition: ClientConnQueue.cc:19
SWC::Core::Vector< EndPoint >
SWC::Comm::Protocol::Mngr::Req::RgrMngId::endpoints
EndPoints endpoints
Definition: RgrMngId.h:228
SWC::Comm::Buffers::make
static SWC_CAN_INLINE Ptr make(uint32_t reserve=0)
Definition: Buffers.h:27
MngrActive.h
SWC::Env::Rgr::is_shuttingdown
static SWC_CAN_INLINE bool is_shuttingdown() noexcept
Definition: RangerEnv.h:58
SWC::Comm::client::ConnQueueReqBase::cbp
Buffers::Ptr cbp
Definition: ClientConnQueue.h:26
SWC::Comm::Protocol::Mngr::Params::RgrMngId
Definition: RgrMngId.h:17
SWC::Comm::Protocol::Mngr::Req::RgrMngId::RgrMngId
RgrMngId(RgrMngId &&)=delete
SWC::Comm::Serializable::decode
void decode(const uint8_t **bufp, size_t *remainp)
Definition: Serializable.h:59
SWC::Env::Rgr::is_not_accepting
static SWC_CAN_INLINE bool is_not_accepting() noexcept
Definition: RangerEnv.h:53
SWC::LOG_WARN
@ LOG_WARN
Definition: Logger.h:33
SWC::Comm::Event::Ptr
std::shared_ptr< Event > Ptr
Definition: Event.h:33
SWC::DB::Types::MngrRole::RANGERS
const uint8_t RANGERS
Definition: MngrRole.h:16
SWC::Comm::Protocol::Mngr::Req::RgrMngId::Ptr
std::shared_ptr< RgrMngId > Ptr
Definition: RgrMngId.h:19
SWC::Comm::Protocol::Mngr::Req::RgrMngId::~RgrMngId
virtual ~RgrMngId() noexcept
Definition: RgrMngId.h:39
SWC::Comm::Protocol::Mngr::Req::RgrMngId::run
bool run() override
Definition: RgrMngId.h:82
SWC::Core::to_string
SWC_CAN_INLINE std::string to_string(const BitFieldInt< T, SZ > &v)
Definition: BitFieldInt.h:263
SWC::Comm::Protocol::Mngr::Req::RgrMngId::set
void set(uint32_t ms)
Definition: RgrMngId.h:200
SWC::Comm::Protocol::Mngr::Req::RgrMngId::handle_no_conn
void handle_no_conn() override
Definition: RgrMngId.h:77
SWC_LOG_CURRENT_EXCEPTION
#define SWC_LOG_CURRENT_EXCEPTION(_s_)
Definition: Exception.h:144
SWC::Comm::Protocol::Mngr::RGR_MNG_ID
@ RGR_MNG_ID
Definition: Commands.h:69
SWC::Config::Property::Value_int32_g
Definition: Property.h:586
SWC::FS::to_string
const char *SWC_CONST_FUNC to_string(Type typ) noexcept
Definition: FileSystem.cc:47
SWC::Comm::Protocol::Mngr::Req::RgrMngId::RgrMngId
RgrMngId(const IoContextPtr &ioctx, std::function< void()> &&cb=nullptr)
Definition: RgrMngId.h:21
SWC::Comm::Protocol::Mngr::Req::RgrMngId::cancel
void cancel()
Definition: RgrMngId.h:221
SWC::Comm::Protocol::Mngr::Req::RgrMngId::stop
void stop()
Definition: RgrMngId.h:192