SWC-DB  v0.5.12 C++ documentations
SWC-DB© (Super Wide Column Database) - High Performance Scalable Database (https://github.com/kashirin-alex/swc-db)
Groups.cc
Go to the documentation of this file.
1 /*
2  * SWC-DB© Copyright since 2019 Alex Kashirin <kashirin.alex@gmail.com>
3  * License details at <https://github.com/kashirin-alex/swc-db/#license>
4  */
5 
6 
8 
9 
10 namespace SWC { namespace client { namespace Mngr {
11 
12 
13 Group::Group(uint8_t a_role, cid_t a_cid_begin, cid_t a_cid_end,
14  const Comm::EndPoints& endpoints)
15  : Hosts(1, endpoints),
16  role(a_role), cid_begin(a_cid_begin), cid_end(a_cid_end),
17  m_mutex() {
18 }
19 
20 Group::Group(uint8_t a_role, cid_t a_cid_begin, cid_t a_cid_end,
21  const Hosts& hosts)
22  : Hosts(hosts),
23  role(a_role), cid_begin(a_cid_begin), cid_end(a_cid_end),
24  m_mutex() {
25 }
26 
28  return Group::Ptr(new Group(role, cid_begin, cid_end, get_hosts()));
29 }
30 
31 void Group::add_host(Comm::EndPoints& new_endpoints) {
33 
34  Comm::EndPoints* found_host = nullptr;
35  for(auto& endpoint : new_endpoints) {
36  _get_host(endpoint, found_host);
37  if(found_host)
38  return found_host->swap(new_endpoints);
39  }
40  push_back(new_endpoints);
41 }
42 
45  return Hosts(cbegin(), cend());
46 }
47 
48 bool Group::is_in_group(const Comm::EndPoint& endpoint) noexcept {
49  Core::MutexSptd::scope lock(m_mutex);
50 
51  Comm::EndPoints* found_host;
52  _get_host(endpoint, found_host);
53  return bool(found_host);
54 }
55 
56 void Group::print(std::ostream& out) {
58 
59  out << "group:\n"
60  << " role=" << DB::Types::MngrRole::to_string(role)
61  << " column=" << cid_begin << '-' << cid_end << '\n';
62  for(auto& endpoints : *this) {
63  out << " host=\n";
64  for(auto& endpoint : endpoints)
65  out << " " << endpoint << '\n';
66  }
67 }
68 
71 
72  for(auto& endpoints : *this) {
73  for(auto& point : endpoints) {
74  if(std::find(to.cbegin(), to.cend(), point) == to.cend())
75  to.push_back(point);
76  }
77  }
78 }
79 
80 void Group::_get_host(const Comm::EndPoint& point,
81  Comm::EndPoints*& found_host) noexcept {
82  for(auto& points : *this) {
83  if(std::find(points.cbegin(), points.cend(), point) != points.cend()) {
84  found_host = &points;
85  return;
86  }
87  }
88  found_host = nullptr;
89 }
90 
91 
93  : cfg_hosts(
94  settings.get<Config::Property::Value_strings_g>("swc.mngr.host")),
95  cfg_port(settings.get_i16("swc.mngr.port")),
96  m_mutex(), m_active_g_host(), m_nets() {
97  asio::error_code ec;
99  settings.get_strs("swc.comm.network.priority"),
100  m_nets, ec
101  );
102  if(ec)
104  "swc.comm.network.priority error(%s)",
105  ec.message().c_str());
106 }
107 
108 
109 Groups::Groups(const Groups& other, Groups::Vec&& groups)
110  : Vec(std::move(groups)),
111  cfg_hosts(other.cfg_hosts), cfg_port(other.cfg_port),
112  m_mutex(), m_active_g_host(), m_nets(other.m_nets) {
113 }
114 
117  [cb=shared_from_this()]{ cb->on_cfg_update(); });
118  on_cfg_update();
119  return shared_from_this();
120 }
121 
123  Vec groups;
125  groups.reserve(size());
126  for(auto& group : *this)
127  groups.push_back(group->copy());
128  return Groups::Ptr(new Groups(*this, std::move(groups)));
129 }
130 
132  SWC_LOG(LOG_DEBUG, "update_cfg()");
133 
134  uint8_t role;
135  int64_t col_begin;
136  int64_t col_end;
137  std::string host_or_ips;
138  uint16_t port;
139 
140  size_t at;
141  size_t at_chk;
142  size_t at_offset;
143 
144  auto hosts = cfg_hosts->get();
145  int n = 0;
146  std::string cfg_chk;
147 
148  bool support = m_mutex.lock();
149  clear();
150  for(auto& cfg : hosts) {
151  SWC_LOGF(LOG_DEBUG, "cfg=%d swc.mngr.host=%s", n++, cfg.c_str());
152 
153  if((at = cfg.find_first_of('|')) == std::string::npos) {
155  continue;
156  }
157 
159  cfg_chk = cfg.substr(at_offset = 0, at);
160  if((at_chk = cfg_chk.find_first_of('{')) != std::string::npos) {
161  cfg_chk = cfg_chk.substr(++at_chk, cfg_chk.find_first_of('}')-1);
162  for(;;) {
163  if(Condition::str_case_eq(cfg_chk.data(), "schemas", 7))
165  else if(Condition::str_case_eq(cfg_chk.data(), "rangers", 7))
167  if((at_chk = cfg_chk.find_first_of(',')) == std::string::npos)
168  break;
169  cfg_chk = cfg_chk.substr(++at_chk);
170  }
171  at = cfg.find_first_of('|', at_offset = ++at);
172  }
173 
174 
175  col_begin = 0;
176  col_end = 0;
177  cfg_chk = cfg.substr(at_offset, at-at_offset);
178  if((at_chk = cfg_chk.find_first_of('[')) != std::string::npos) {
179  cfg_chk = cfg_chk.substr(++at_chk, cfg_chk.find_first_of(']')-1);
180  auto col_range_at = cfg_chk.find_first_of('-');
181  auto b = cfg_chk.substr(0, col_range_at);
182  auto e = cfg_chk.substr(col_range_at+1);
183  if(!b.empty())
184  Config::Property::from_string(b, &col_begin);
185  if(!e.empty())
186  Config::Property::from_string(e, &col_end);
187 
188  at = cfg.find_first_of('|', at_offset = ++at);
189  } else if(role != DB::Types::MngrRole::COLUMNS) {
192  }
193 
194  if(role == DB::Types::MngrRole::COLUMNS && !col_begin && !col_end)
196 
197  cfg_chk = cfg.substr(at_offset);
198  if((at_chk = cfg_chk.find_first_of('|')) == std::string::npos) {
199  host_or_ips = cfg_chk;
200  port = cfg_port;
201  } else {
202  host_or_ips = cfg_chk.substr(0, at_chk);
203  Config::Property::from_string(cfg_chk.c_str() + at_chk + 1, &port);
204  }
205 
206  _add_host(role, col_begin, col_end, port, host_or_ips);
207 
208  }
209  m_mutex.unlock(support);
210 
212 }
213 
214 void Groups::_add_host(uint8_t role, cid_t cid_begin, cid_t cid_end,
215  uint16_t port, std::string host_or_ips) {
216  Config::Strings ips;
217  std::string host;
218  size_t at;
219  do {
220  auto addr = host_or_ips;
221  at = host_or_ips.find_first_of(',');
222  if(at != std::string::npos) {
223  addr = host_or_ips.substr(0, at);
224  host_or_ips = host_or_ips.substr(at+1, host_or_ips.length());
225  }
228  ips.push_back(addr);
229  else
230  host = addr;
231 
232  } while(at != std::string::npos);
233 
235  port, ips, host, m_nets);
236 
237  if(endpoints.empty())
238  return;
239  for(auto& group : *this) {
240  if(group->role == role &&
241  group->cid_begin == cid_begin &&
242  group->cid_end == cid_end)
243  return group->add_host(endpoints);
244  }
245  emplace_back(new Group(role, cid_begin, cid_end, endpoints));
246 }
247 
250  return Vec(cbegin(), cend());
251 }
252 
253 void Groups::hosts(uint8_t role, cid_t cid, Hosts& hosts,
254  Groups::GroupHost &group_host) {
256 
257  for(auto& group : *this) {
258  if(group->role & role && (
259  !(role & DB::Types::MngrRole::COLUMNS) ||
260  (group->cid_begin <= cid && (!group->cid_end || group->cid_end >= cid))
261  )) {
262  hosts = group->get_hosts();
263  group_host.role = group->role;
264  group_host.cid_begin = group->cid_begin;
265  group_host.cid_end = group->cid_end;
266  break;
267  }
268  }
269 }
270 
272  Vec hgroups;
274 
275  for(auto& group : *this) {
276  for(auto& endpoint : endpoints) {
277  if(group->is_in_group(endpoint) &&
278  std::find(hgroups.cbegin(), hgroups.cend(), group) == hgroups.cend())
279  hgroups.push_back(group);
280  }
281  }
282  return hgroups;
283 }
284 
286  cid_t cid_end) {
287  Comm::EndPoints endpoints;
288  if(!cid_end)
289  cid_end = cid_begin;
291 
292  for(auto& group : *this) {
293  if((!role || group->role & role) &&
294  (!cid_begin || group->cid_begin <= cid_begin) &&
295  (!cid_end || group->cid_end ||
296  (cid_end && group->cid_end >= cid_end))) {
297  group->apply_endpoints(endpoints);
298  }
299  }
300  return endpoints;
301 }
302 
303 void Groups::print(std::ostream& out) {
304  out << "manager-groups:\n";
306 
307  for(auto& group : *this)
308  group->print(out);
309 }
310 
313 
314  for(auto it=m_active_g_host.begin(); it != m_active_g_host.cend(); ++it) {
315  if(Comm::has_endpoint(g_host.endpoints, it->endpoints))
316  return;
317  if(g_host.role == it->role &&
318  g_host.cid_begin == it->cid_begin &&
319  g_host.cid_end == it->cid_end) {
320  it->endpoints = std::move(g_host.endpoints);
321  return;
322  }
323  }
324  m_active_g_host.emplace_back(std::move(g_host));
325 }
326 
327 void Groups::remove(const Comm::EndPoints& endpoints) {
329 
330  for(auto it=m_active_g_host.cbegin(); it != m_active_g_host.cend(); ) {
331  if(Comm::has_endpoint(endpoints, it->endpoints))
332  m_active_g_host.erase(it);
333  else
334  ++it;
335  }
336 }
337 
338 void Groups::select(const cid_t& cid, Comm::EndPoints& endpoints) {
340 
341  for(auto& host : m_active_g_host) {
342  if(host.role & DB::Types::MngrRole::COLUMNS &&
343  host.cid_begin <= cid &&
344  (!host.cid_end || host.cid_end >= cid)) {
345  endpoints = host.endpoints;
346  return;
347  }
348  }
349 }
350 
351 void Groups::select(const uint8_t& role, Comm::EndPoints& endpoints) {
353 
354  for(auto& host : m_active_g_host) {
355  if(host.role & role) {
356  endpoints = host.endpoints;
357  return;
358  }
359  }
360 }
361 
362 
363 }}}
SWC::client::Mngr::Groups::cfg_port
const uint16_t cfg_port
Definition: Groups.h:122
SWC::client::Mngr::Groups
Definition: Groups.h:61
SWC_LOG_OSTREAM
#define SWC_LOG_OSTREAM
Definition: Logger.h:44
SWC::Config::Properties::get_strs
Strings get_strs(const char *name) const
Definition: Properties.h:85
SWC::client::Mngr::Group::_get_host
void _get_host(const Comm::EndPoint &point, Comm::EndPoints *&found_host) noexcept
Definition: Groups.cc:80
Groups.h
SWC::client::Mngr::Groups::copy
Ptr copy()
Definition: Groups.cc:122
SWC::client::Mngr::Groups::remove
void remove(const Comm::EndPoints &endpoints)
Definition: Groups.cc:327
SWC::Comm::Resolver::get_networks
void get_networks(const Config::Strings &networks, Networks &nets, asio::error_code &ec)
Definition: Resolver.cc:264
SWC_LOGF
#define SWC_LOGF(priority, fmt,...)
Definition: Logger.h:188
SWC::client::Mngr::Group::Group
Group(uint8_t role, cid_t cid_begin, cid_t cid_end, const Comm::EndPoints &endpoints)
Definition: Groups.cc:13
SWC::Core::Vector< Group::Ptr >::clear
SWC_CAN_INLINE void clear() noexcept(_NoExceptDestructor)
Definition: Vector.h:120
SWC::client::Mngr::Groups::hosts
void hosts(uint8_t role, cid_t cid, Hosts &hosts, GroupHost &group_host)
Definition: Groups.cc:253
SWC_LOG_OUT
#define SWC_LOG_OUT(pr, _code_)
Definition: Logger.h:178
SWC::client::Mngr::Groups::print
void print(std::ostream &out)
Definition: Groups.cc:303
SWC::client::Mngr::Groups::add
void add(GroupHost &&g_host)
Definition: Groups.cc:311
SWC::Comm::EndPoint
asio::ip::tcp::endpoint EndPoint
Definition: Resolver.h:19
SWC::DB::Types::MngrRole::NO_COLUMNS
const uint8_t NO_COLUMNS
Definition: MngrRole.h:17
SWC::client::Mngr::Groups::GroupHost
Definition: Groups.h:65
SWC::Core::MutexSptd::scope
Definition: MutexSptd.h:96
SWC::client::Mngr::Groups::select
void select(const cid_t &cid, Comm::EndPoints &endpoints)
Definition: Groups.cc:338
SWC::client::Mngr::Groups::Ptr
std::shared_ptr< Groups > Ptr
Definition: Groups.h:78
SWC::client::Mngr::Group::Ptr
std::shared_ptr< Group > Ptr
Definition: Groups.h:23
SWC::Config::Property::Value_strings_g::set_cb_on_chg
void set_cb_on_chg(OnChg_t &&cb)
Definition: Property.cc:1071
SWC::Core::MutexSptd::unlock
SWC_CAN_INLINE void unlock(const bool &support) noexcept
Definition: MutexSptd.h:71
SWC::client::Mngr::Group::m_mutex
Core::MutexSptd m_mutex
Definition: Groups.h:54
SWC::Comm::has_endpoint
bool SWC_PURE_FUNC has_endpoint(const EndPoint &e1, const EndPoints &endpoints_in) noexcept
Definition: Resolver.cc:93
SWC::client::Mngr::Group::cid_begin
const cid_t cid_begin
Definition: Groups.h:25
SWC::client::Mngr::Group::is_in_group
bool is_in_group(const Comm::EndPoint &endpoint) noexcept
Definition: Groups.cc:48
SWC::client::Mngr::Hosts
Core::Vector< Comm::EndPoints > Hosts
Definition: Groups.h:18
SWC::DB::Types::MngrRole::SCHEMAS
const uint8_t SCHEMAS
Definition: MngrRole.h:15
SWC::client::Mngr::Group::get_hosts
Hosts get_hosts()
Definition: Groups.cc:43
SWC::client::Mngr::Groups::GroupHost::cid_begin
cid_t cid_begin
Definition: Groups.h:67
SWC::Core::Vector::empty
constexpr SWC_CAN_INLINE bool empty() const noexcept
Definition: Vector.h:168
SWC::client::Mngr::Groups::get_groups
Vec get_groups()
Definition: Groups.cc:248
SWC::client::Mngr::Groups::on_cfg_update
void on_cfg_update()
Definition: Groups.cc:131
SWC::LOG_DEBUG
@ LOG_DEBUG
Definition: Logger.h:36
SWC::client::Mngr::Groups::get_endpoints
Comm::EndPoints get_endpoints(uint8_t role=0, cid_t cid_begin=0, cid_t cid_end=0)
Definition: Groups.cc:285
SWC_LOG
#define SWC_LOG(priority, message)
Definition: Logger.h:191
SWC
The SWC-DB C++ namespace 'SWC'.
Definition: main.cc:12
SWC::Core::MutexSptd::lock
SWC_CAN_INLINE bool lock() noexcept
Definition: MutexSptd.h:39
SWC::client::Mngr::Group::apply_endpoints
void apply_endpoints(Comm::EndPoints &to_endpoints)
Definition: Groups.cc:69
SWC::client::Mngr::Group::add_host
void add_host(Comm::EndPoints &new_endpoints)
Definition: Groups.cc:31
SWC::client::Mngr::Group
Definition: Groups.h:20
SWC::client::Mngr::Groups::m_active_g_host
Core::Vector< GroupHost > m_active_g_host
Definition: Groups.h:125
SWC::client::Mngr::Groups::m_mutex
Core::MutexSptd m_mutex
Definition: Groups.h:124
SWC_THROWF
#define SWC_THROWF(_code_, _fmt_,...)
Definition: Exception.h:136
SWC::client::Mngr::Groups::GroupHost::role
uint8_t role
Definition: Groups.h:66
SWC::Config::Property::from_string
void from_string(const char *s, double *value)
Definition: Property.cc:109
SWC::Core::Vector::swap
SWC_CAN_INLINE void swap(Vector &other) noexcept
Definition: Vector.h:161
SWC::Config::Settings
Definition: Settings.h:25
SWC::Config::Property::Value_strings_g::get
Strings get() const
Definition: Property.cc:1051
SWC::cid_t
uint64_t cid_t
Definition: Identifiers.h:16
SWC::client::Mngr::Groups::_add_host
void _add_host(uint8_t role, cid_t cid_begin, cid_t cid_end, uint16_t port, std::string host_or_ips)
Definition: Groups.cc:214
SWC::Error::CONFIG_BAD_VALUE
@ CONFIG_BAD_VALUE
Definition: Error.h:81
SWC::Core::Vector< EndPoint >
SWC::Comm::Resolver::is_ipv4_address
bool is_ipv4_address(const std::string &str) noexcept
Definition: Resolver.cc:141
SWC::client::Mngr::Group::cid_end
const cid_t cid_end
Definition: Groups.h:26
SWC::client::Mngr::Groups::GroupHost::cid_end
cid_t cid_end
Definition: Groups.h:68
SWC::client::Mngr::Groups::Groups
Groups(const Config::Settings &settings)
Definition: Groups.cc:92
SWC::Core::Vector< Comm::EndPoints >::cend
constexpr SWC_CAN_INLINE const_iterator cend() const noexcept
Definition: Vector.h:232
SWC::DB::Types::MngrRole::ALL
const uint8_t ALL
Definition: MngrRole.h:18
SWC::client::Mngr::Group::role
const uint8_t role
Definition: Groups.h:24
SWC::client::Mngr::Groups::m_nets
Comm::Networks m_nets
Definition: Groups.h:126
SWC::DB::Types::MngrRole::RANGERS
const uint8_t RANGERS
Definition: MngrRole.h:16
SWC::Comm::Resolver::is_ipv6_address
bool is_ipv6_address(const std::string &str) noexcept
Definition: Resolver.cc:147
SWC::Comm::Resolver::get_endpoints
EndPoints get_endpoints(uint16_t defaul_port, const Config::Strings &addrs, const std::string &host, const Networks &nets, bool srv=false)
Definition: Resolver.cc:152
SWC::Core::Vector< Comm::EndPoints >::push_back
SWC_CAN_INLINE void push_back(ArgsT &&... args)
Definition: Vector.h:331
SWC::Core::Vector< Comm::EndPoints >::cbegin
constexpr SWC_CAN_INLINE const_iterator cbegin() const noexcept
Definition: Vector.h:216
SWC::Core::Vector< Group::Ptr >::size
constexpr SWC_CAN_INLINE size_type size() const noexcept
Definition: Vector.h:189
SWC::client::Mngr::Groups::init
Ptr init()
Definition: Groups.cc:115
SWC::Condition::str_case_eq
bool str_case_eq(const char *s1, const char *s2, size_t count) noexcept SWC_ATTRIBS((SWC_ATTRIB_O3))
Definition: Comparators_basic.h:257
SWC::Core::Vector< Group::Ptr >::emplace_back
SWC_CAN_INLINE reference emplace_back(ArgsT &&... args)
Definition: Vector.h:349
SWC::client::Mngr::Groups::cfg_hosts
Config::Property::Value_strings_g::Ptr cfg_hosts
Definition: Groups.h:121
SWC::client::Mngr::Groups::Vec
Core::Vector< Group::Ptr > Vec
Definition: Groups.h:79
SWC::DB::Types::MngrRole::COLUMNS
const uint8_t COLUMNS
Definition: MngrRole.h:14
SWC::DB::Types::MngrRole::to_string
std::string to_string(uint8_t role)
Definition: MngrRole.cc:19
SWC::client::Mngr::Group::copy
Ptr copy()
Definition: Groups.cc:27
SWC::Core::Vector::reserve
SWC_CAN_INLINE void reserve(size_type cap)
Definition: Vector.h:288
SWC::client::Mngr::Group::print
void print(std::ostream &out)
Definition: Groups.cc:56