SWC-DB  v0.5.12 C++ documentations
SWC-DB© (Super Wide Column Database) - High Performance Scalable Database (https://github.com/kashirin-alex/swc-db)
Brokers.cc
Go to the documentation of this file.
1 /*
2  * SWC-DB© Copyright since 2019 Alex Kashirin <kashirin.alex@gmail.com>
3  * License details at <https://github.com/kashirin-alex/swc-db/#license>
4  */
5 
6 
8 
9 namespace SWC { namespace client {
10 
11 
// Brokers constructor (parameter list tail, member initializers and body).
// NOTE(review): the listing jumps from line 11 to 13, so the first signature
// line is not visible here; given the uses of `settings` below and the
// SWC::Config::Settings entry in the cross-reference index it is presumably
// `Brokers::Brokers(const Config::Settings& settings,` - confirm against the
// original Brokers.cc.
//
// ioctx   - I/O context handed to the Serialized client created below.
// bkr_ctx - optional broker context; when it evaluates to false a new
//           ContextBroker(settings) is created and used instead.
13  Comm::IoContextPtr ioctx,
14  const ContextBroker::Ptr& bkr_ctx)
// `queues`: a new ConnQueues built from a Serialized client labelled "BROKER"
// plus four configuration properties read from `settings`.
15  : queues(new Comm::client::ConnQueues(
16  Comm::client::Serialized::Ptr(new Comm::client::Serialized(
17  settings,
18  "BROKER",
19  ioctx,
// Use the caller-supplied context when present, otherwise a fresh one.
20  bkr_ctx
21  ? bkr_ctx
22  : ContextBroker::Ptr(new ContextBroker(settings))
23  )),
// The remaining ConnQueues arguments are the property handles for the
// connection timeout, probes, keepalive and the request "again" delay.
24  settings.get<Config::Property::Value_int32_g>(
25  "swc.client.Bkr.connection.timeout"),
26  settings.get<Config::Property::Value_uint16_g>(
27  "swc.client.Bkr.connection.probes"),
28  settings.get<Config::Property::Value_int32_g>(
29  "swc.client.Bkr.connection.keepalive"),
30  settings.get<Config::Property::Value_int32_g>(
31  "swc.client.request.again.delay")
32  )),
// `cfg_hosts` keeps the property handle for "swc.bkr.host"; `cfg_port` is the
// value of "swc.bkr.port" read once here.
33  cfg_hosts(
34  settings.get<Config::Property::Value_strings_g>("swc.bkr.host")),
35  cfg_port(settings.get_i16("swc.bkr.port")),
36  m_mutex(), m_brokers() {
// Build the initial broker endpoint list from the current configuration.
37  on_cfg_update();
38 }
39 
// Rebuilds the broker endpoint list from the current value of `cfg_hosts`.
//
// Each non-empty host entry is parsed as `<addr>[,<addr>...][|<port>]`:
//  - text after the first '|' is parsed as the port; without a '|' the
//    configured default `cfg_port` is used;
//  - the part before the '|' is split on ',' into individual addresses.
// The addresses are resolved with Comm::Resolver::get_endpoints() and every
// non-empty result becomes one entry of the new list, which is then installed
// through set(). An empty host list installs an empty broker list.
//
// The function is noexcept: the whole body runs inside try / catch(...).
40 void Brokers::on_cfg_update() noexcept {
41  try {
42  Config::Strings hosts(cfg_hosts->get());
43  if(hosts.empty()) {
// No hosts configured: install an empty list and stop.
44  set({});
45  return;
46  }
47  BrokersEndPoints _brokers;
48  _brokers.reserve(hosts.size());
49 
50  uint16_t port;
51  size_t at;
// `hosts` is a local copy, so each entry may be modified while it is parsed.
52  for(auto& host_str : hosts) {
53  if(host_str.empty())
54  continue;
55  if((at = host_str.find_first_of('|')) == std::string::npos) {
56  port = cfg_port;
57  } else {
// Parse the text following '|' as the port, then keep only the address part.
58  Config::Property::from_string(host_str.c_str() + at + 1, &port);
59  host_str = host_str.substr(0, at);
60  }
61  Config::Strings ips;
62  std::string host;
// Consume one comma-separated address per iteration; the loop ends after the
// last segment, i.e. once no further ',' is found.
63  do {
64  auto addr = host_str;
65  at = host_str.find_first_of(',');
66  if(at != std::string::npos) {
67  addr = host_str.substr(0, at);
68  host_str = host_str.substr(at+1, host_str.length());
69  }
// NOTE(review): listing lines 70-71 are missing here, so the condition that
// selects between the two branches below is not visible. The cross-reference
// index lists Comm::Resolver::is_ipv4_address / is_ipv6_address, so presumably
// IP literals are appended to `ips` and anything else is stored in `host`
// (each later non-IP address overwriting the previous one) - confirm against
// the original file.
72  ips.push_back(addr);
73  else
74  host = addr;
75  } while(at != std::string::npos);
76 
// Resolve the collected addresses / host name; skip entries that yield nothing.
77  auto tmp = Comm::Resolver::get_endpoints(port, ips, host, {});
78  if(!tmp.empty())
79  _brokers.emplace_back(std::move(tmp));
80  }
81 
// NOTE(review): listing line 82 is missing; the closing `);` below and the
// SWC_LOG_OUT / LOG_DEBUG index entries suggest the statements that follow are
// the body of a `SWC_LOG_OUT(LOG_DEBUG, ...` invocation - confirm.
// The visible statements print the number of broker groups and then every
// endpoint of each group.
83  SWC_LOG_OSTREAM << "Broker Hosts (size=" << _brokers.size() << "): ";
84  for(size_t n=0; n<_brokers.size(); ++n) {
85  SWC_LOG_OSTREAM << "\n " << n << ": ";
86  for(auto& e : _brokers[n])
87  SWC_LOG_OSTREAM << e << ',';
88  }
89  );
// Install the new list; the lvalue argument selects the copying overload of set().
90  set(_brokers);
91 
92  } catch(...) {
// NOTE(review): listing line 93 is missing; the index lists
// SWC_LOG_CURRENT_EXCEPTION, so the exception is presumably logged here rather
// than silently dropped - confirm.
94  }
95 
96 }
97 
// Returns the number of entries currently held in `m_brokers`.
// NOTE(review): listing line 99 is missing between the signature and the
// return; given the Core::MutexSptd::scope index entry it is presumably a
// scoped lock on `m_mutex` - confirm against the original file.
98 size_t Brokers::size() noexcept {
100  return m_brokers.size();
101 }
102 
// Body of Brokers::get.
// NOTE(review): listing lines 103-104 (the signature and possibly one more
// statement) are not visible; the cross-reference index gives the signature as
// `bool get(BrokerIdx &idx, Comm::EndPoints &endpoints)`, and line 104 is
// presumably a scoped lock on `m_mutex` - confirm against the original file.
//
// Returns false when `m_brokers` is empty. Otherwise copies the entry at
// `idx.pos` into `endpoints` - first resetting `idx.pos` to 0 when it is out of
// range - and returns true only if that entry is non-empty.
105  return m_brokers.empty()
106  ? false
107  : !(endpoints = m_brokers[idx.pos >= m_brokers.size()
108  ? (idx.pos = 0)
109  : idx.pos]).empty();
110 }
111 
// Returns true when `m_brokers` is non-empty and its first entry holds at
// least one endpoint; only the front entry is inspected.
// NOTE(review): listing line 113 is missing; presumably a scoped lock on
// `m_mutex` (see the Core::MutexSptd::scope index entry) - confirm.
112 bool Brokers::has_endpoints() noexcept {
114  return !m_brokers.empty() && !m_brokers.front().empty();
115 }
116 
// Replaces the stored broker list by moving from `_brokers`.
// NOTE(review): listing line 118 is missing; presumably a scoped lock on
// `m_mutex` (see the Core::MutexSptd::scope index entry) - confirm.
117 void Brokers::set(BrokersEndPoints&& _brokers) {
119  m_brokers = std::move(_brokers);
120 }
121 
// Replaces the stored broker list with a copy of `_brokers`.
// NOTE(review): listing line 123 is missing; presumably a scoped lock on
// `m_mutex` (see the Core::MutexSptd::scope index entry) - confirm.
122 void Brokers::set(const BrokersEndPoints& _brokers) {
124  m_brokers = _brokers;
125 }
126 
// Brokers::put - queues `req` on the connection queue of the broker selected
// by `idx`, retrying until a broker is available or the request is no longer
// valid.
// NOTE(review): listing line 127 (first signature line) is not visible; the
// cross-reference index gives the signature as
// `bool put(const Comm::client::ConnQueue::ReqBase::Ptr &req, BrokerIdx &idx)`.
//
// Returns true once the request has been handed to a queue, false when the
// request is not valid (after calling req->handle_no_conn()).
128  Brokers::BrokerIdx& idx) {
// `endpoints` lives across iterations; the loop only repeats while it is empty.
129  for(Comm::EndPoints endpoints;;) {
130  {
// NOTE(review): listing line 131 is missing; the enclosing bare scope suggests
// a scoped lock on `m_mutex` covering only the copy below - confirm.
132  if(!m_brokers.empty()) {
// Copy the entry at idx.pos, resetting idx.pos to 0 when it is out of range.
133  endpoints = m_brokers[idx.pos >= m_brokers.size()
134  ? (idx.pos = 0)
135  : idx.pos];
136  }
137  }
138  if(!endpoints.empty()) {
139  queues->get(endpoints)->put(req);
140  return true;
141  }
// No usable endpoints: give up if the request reports it is no longer valid.
142  if(!req->valid()) {
143  req->handle_no_conn();
144  return false;
145  }
// NOTE(review): listing line 146 is missing; the string below is presumably
// the message argument of `SWC_LOG(LOG_ERROR,` (both appear in the index) -
// confirm.
147  "Broker hosts cfg 'swc.bkr.host' is empty, waiting!");
// Block the calling thread for 3 seconds before checking the list again.
148  std::this_thread::sleep_for(std::chrono::seconds(3));
149  }
150 }
151 
// Body of Brokers::BrokerIdx::turn_around.
// NOTE(review): listing line 152 (the signature) is not visible; the
// cross-reference index gives it as `bool turn_around(Brokers &brks) noexcept`.
//
// Advances `pos` to the next broker. Returns false while the new position is
// still below brks.size(); once it runs past the end, `pos` is reset to 0 and
// true is returned.
153  if(++pos < brks.size())
154  return false;
155  pos = 0;
156  return true;
157 }
158 
159 
160 
161 }} //namespace SWC::client
162 
SWC::Core::Vector::front
constexpr SWC_CAN_INLINE reference front() noexcept
Definition: Vector.h:243
SWC_LOG_OSTREAM
#define SWC_LOG_OSTREAM
Definition: Logger.h:44
SWC::client::Brokers::BrokerIdx
Definition: Brokers.h:23
SWC_LOG_OUT
#define SWC_LOG_OUT(pr, _code_)
Definition: Logger.h:178
SWC::client::Brokers::queues
const Comm::client::ConnQueuesPtr queues
Definition: Brokers.h:60
SWC::client::Brokers
Definition: Brokers.h:18
SWC::Comm::IoContextPtr
std::shared_ptr< IoContext > IoContextPtr
Definition: IoContext.h:16
SWC::Core::MutexSptd::scope
Definition: MutexSptd.h:96
SWC::client::Brokers::m_mutex
Core::MutexSptd m_mutex
Definition: Brokers.h:65
SWC::client::ContextBroker::Ptr
std::shared_ptr< ContextBroker > Ptr
Definition: ContextBroker.h:18
SWC::client::Brokers::cfg_hosts
const Config::Property::Value_strings_g::Ptr cfg_hosts
Definition: Brokers.h:61
SWC::client::Brokers::BrokerIdx::pos
size_t pos
Definition: Brokers.h:24
Brokers.h
SWC::Core::Vector::empty
constexpr SWC_CAN_INLINE bool empty() const noexcept
Definition: Vector.h:168
SWC::LOG_DEBUG
@ LOG_DEBUG
Definition: Logger.h:36
SWC_LOG
#define SWC_LOG(priority, message)
Definition: Logger.h:191
SWC
The SWC-DB C++ namespace 'SWC'.
Definition: main.cc:12
SWC::client::Brokers::has_endpoints
bool has_endpoints() noexcept
Definition: Brokers.cc:112
SWC::client::Brokers::m_brokers
BrokersEndPoints m_brokers
Definition: Brokers.h:66
SWC::Config::Property::from_string
void from_string(const char *s, double *value)
Definition: Property.cc:109
SWC::Config::Settings
Definition: Settings.h:25
SWC::LOG_ERROR
@ LOG_ERROR
Definition: Logger.h:32
SWC::client::Brokers::put
bool put(const Comm::client::ConnQueue::ReqBase::Ptr &req, BrokerIdx &idx)
Definition: Brokers.cc:127
SWC::Config::Property::Value_strings_g::get
Strings get() const
Definition: Property.cc:1051
SWC::client::Brokers::BrokerIdx::turn_around
bool turn_around(Brokers &brks) noexcept
Definition: Brokers.cc:152
SWC::Core::Vector< std::string >
SWC::client::Brokers::set
void set(BrokersEndPoints &&endpoints)
Definition: Brokers.cc:117
SWC::Comm::Resolver::is_ipv4_address
bool is_ipv4_address(const std::string &str) noexcept
Definition: Resolver.cc:141
SWC::client::ContextBroker
Definition: ContextBroker.h:15
SWC::client::Brokers::get
bool get(BrokerIdx &idx, Comm::EndPoints &endpoints)
Definition: Brokers.cc:103
SWC::client::Brokers::size
size_t size() noexcept
Definition: Brokers.cc:98
SWC::Comm::client::ConnQueueReqBase::Ptr
std::shared_ptr< ConnQueueReqBase > Ptr
Definition: ClientConnQueue.h:25
SWC::Comm::Resolver::is_ipv6_address
bool is_ipv6_address(const std::string &str) noexcept
Definition: Resolver.cc:147
SWC::Comm::Resolver::get_endpoints
EndPoints get_endpoints(uint16_t defaul_port, const Config::Strings &addrs, const std::string &host, const Networks &nets, bool srv=false)
Definition: Resolver.cc:152
SWC::Core::Vector::push_back
SWC_CAN_INLINE void push_back(ArgsT &&... args)
Definition: Vector.h:331
SWC::client::Brokers::on_cfg_update
void on_cfg_update() noexcept
Definition: Brokers.cc:40
SWC::Core::Vector::size
constexpr SWC_CAN_INLINE size_type size() const noexcept
Definition: Vector.h:189
SWC_LOG_CURRENT_EXCEPTION
#define SWC_LOG_CURRENT_EXCEPTION(_s_)
Definition: Exception.h:144
SWC::Core::Vector::emplace_back
SWC_CAN_INLINE reference emplace_back(ArgsT &&... args)
Definition: Vector.h:349
SWC::client::Brokers::Brokers
SWC_CAN_INLINE Brokers() noexcept
Definition: Brokers.h:31
SWC::Core::Vector::reserve
SWC_CAN_INLINE void reserve(size_type cap)
Definition: Vector.h:288
SWC::client::Brokers::cfg_port
const uint16_t cfg_port
Definition: Brokers.h:62