SWC-DB  v0.5.12 C++ documentation
SWC-DB© (Super Wide Column Database) - High Performance Scalable Database (https://github.com/kashirin-alex/swc-db)
Schemas.cc
Go to the documentation of this file.
1 /*
2  * SWC-DB© Copyright since 2019 Alex Kashirin <kashirin.alex@gmail.com>
3  * License details at <https://github.com/kashirin-alex/swc-db/#license>
4  */
5 
12 
13 namespace SWC { namespace client {
14 
15 
// Body of the cid-based remove (the index at the end of this page lists
// `void remove(cid_t cid)` at Schemas.cc:16; listing lines 16-17 are not
// present in this extract).
// Drops the cached entry for `cid` from m_schemas when one exists; a miss is a no-op.
18  auto it = m_schemas.find(cid);
19  if(it != m_schemas.cend())
20  m_schemas.erase(it);
21 }
22 
// Removes the first cached schema whose column name equals `name`.
// m_schemas is keyed by cid, so a name lookup is a linear scan over the entries.
// NOTE(review): listing line 24 is not present in this extract.
23 void Schemas::remove(const std::string& name) {
25  for(auto it = m_schemas.cbegin(); it != m_schemas.cend(); ++it) {
26  if(Condition::str_eq(name, it->second.schema->col_name)) {
    // Leave immediately after erasing: `it` must not be advanced once erased.
27  m_schemas.erase(it);
28  return;
29  }
30  }
31 }
32 
// Body of Schemas::clear_expired() (the index at the end of this page places
// its definition at Schemas.cc:33; listing lines 33 and 35 are not present
// in this extract).
// Evicts every entry of m_schemas whose timestamp plus the configured expiry
// (m_expiry_ms) is not later than the time sampled once at entry.
34  auto ts = Time::now_ms();
36  for(auto it = m_schemas.cbegin(); it != m_schemas.cend(); ) {
37  if(ts >= it->second.ts + m_expiry_ms->get()) {
    // erase() invalidates the erased iterator; resume from the iterator it
    // returns instead of touching the stale one on the next loop check.
38  it = m_schemas.erase(it);
39  } else {
40  ++it;
41  }
42  }
43 }
44 
45 
// Members of Schemas::ColumnGetData (the class head at listing line 46 and
// several member lines are not present in this extract).
// The object stores the outcome of one column-schema request (`schema`, `err`)
// and hands it to callers through wait().
47  public:
48 
    // Starts with no schema, err == Error::OK and the `once` flag cleared.
50  ColumnGetData(const SWC::client::Clients::Ptr& a_clients) noexcept
51  : clients(a_clients), await(), schema(),
52  err(Error::OK), once(false) {
53  }
54 
56  ~ColumnGetData() noexcept { }
57 
    // Tail of get_clients() (head at listing line 59 not shown): exposes `clients`.
60  return clients;
61  }
62 
    // Always reports true.
64  bool valid() {
65  return true;
66  }
67 
    // Fragment of callback(req, error, rsp) (head at listing line 69 not shown):
    // records the response schema and the error code.
    // NOTE(review): the index gives `rsp` as a const reference, so std::move
    // here would copy rather than move — confirm.
    // NOTE(review): listing line 74 is not shown; the index lists
    // StateSynchronization::acknowledge, presumably called there — confirm.
70  int error,
72  schema = std::move(rsp.schema);
73  err = error;
75  }
76 
    // Calls await.wait(), then copies the stored schema and error code into the
    // out-parameters. Returns true only to the caller whose compare-exchange
    // flips `once` from false to true.
    // NOTE(review): a weak compare-exchange on std::atomic may fail spuriously;
    // confirm the AtomicBase wrapper cannot leave every caller with `false`.
78  bool wait(int& _err, DB::Schema::Ptr& _schema) {
79  await.wait();
80  _schema = schema;
81  _err = err;
82  bool at = false;
83  return once.compare_exchange_weak(at, true);
84  }
85 
86  private:
    // Error code recorded by callback(); Error::OK until then.
90  int err;
92 
93 };
94 
95 
96 
// Returns the schema for `cid`: served from m_schemas while the cached entry
// is unexpired, otherwise obtained through a request that is shared between
// callers via m_pending_cid.
// The return-type line (listing line 97) is not present in this extract; the
// index at the end of this page gives the signature as
// `DB::Schema::Ptr get(int &err, cid_t cid, uint32_t timeout=300000)`.
// `err` is written by wait(); the visible cache-hit path does not write it.
98 Schemas::get(int& err, cid_t cid, uint32_t timeout) {
99  Pending pending;
100  bool has_req;
101  {
    // NOTE(review): listing line 102 is not present in this extract.
103  auto it = m_schemas.find(cid);
104  if(it != m_schemas.cend() &&
105  Time::now_ms() < it->second.ts + m_expiry_ms->get()) {
106  return it->second.schema;
107  }
    // Reuse the request already registered for this cid (non-null datap);
    // otherwise create one and register it in m_pending_cid.
108  Pending& tmp = m_pending_cid[cid];
109  if(!(has_req = tmp.datap))
110  tmp = _request(cid, timeout);
111  pending = tmp;
112  }
    // Only the caller that created the request starts it.
113  if(!has_req)
114  pending.req->run();
115 
116  DB::Schema::Ptr schema;
    // wait() yields true for the caller that flips its `once` flag; that caller
    // stores a non-null schema in the cache and drops the pending entry.
117  if(pending.datap->wait(err, schema)) {
118  auto ts = Time::now_ms();
    // NOTE(review): listing line 119 is not present in this extract.
120  if(schema)
121  m_schemas[schema->cid].assign(ts, schema);
122  m_pending_cid.erase(cid);
123  }
    // NOTE(review): listing line 125 (the body of this `if`) is not present in
    // this extract; the index lists Error::COLUMN_SCHEMA_MISSING, presumably
    // assigned to `err` there — confirm against the source.
124  if(!schema && !err)
126  return schema;
127 }
128 
// Returns the schema of the column called `name`: served from m_schemas while
// a matching cached entry is unexpired, otherwise obtained through a request
// that is shared between callers via m_pending_name.
// The return-type line (listing line 129) is not present in this extract.
// `err` is written by wait(); the visible cache-hit path does not write it.
130 Schemas::get(int& err, const std::string& name, uint32_t timeout) {
131  Pending pending;
132  bool has_req;
133  {
    // NOTE(review): listing line 134 is not present in this extract.
    // m_schemas is keyed by cid, so the name is matched by scanning all entries;
    // the scan stops at the first name match even when that entry is expired.
135  for(const auto& data : m_schemas) {
136  if(Condition::str_eq(name, data.second.schema->col_name)) {
137  if(Time::now_ms() < data.second.ts + m_expiry_ms->get())
138  return data.second.schema;
139  break;
140  }
141  }
    // Reuse the request already registered for this name (non-null datap);
    // otherwise create one and register it in m_pending_name.
142  Pending& tmp = m_pending_name[name];
143  if(!(has_req = tmp.datap))
144  tmp = _request(name, timeout);
145  pending = tmp;
146  }
    // Only the caller that created the request starts it.
147  if(!has_req)
148  pending.req->run();
149 
150  DB::Schema::Ptr schema;
    // wait() yields true for the caller that flips its `once` flag; that caller
    // stores a non-null schema in the cache (keyed by its cid) and drops the
    // pending entry for this name.
151  if(pending.datap->wait(err, schema)) {
152  auto ts = Time::now_ms();
    // NOTE(review): listing line 153 is not present in this extract.
154  if(schema)
155  m_schemas[schema->cid].assign(ts, schema);
156  m_pending_name.erase(name);
157  }
    // NOTE(review): listing line 159 (the body of this `if`) is not present in
    // this extract; the index lists Error::COLUMN_SCHEMA_MISSING, presumably
    // assigned to `err` there — confirm against the source.
158  if(!schema && !err)
160  return schema;
161 }
162 
// Body of a cache-only lookup by cid (its head, listing lines 163-164, is not
// present in this extract — presumably an overload of Schemas::get; confirm).
// Yields the cached schema while its entry is unexpired, otherwise nullptr.
// No request is issued in the visible code.
165  auto it = m_schemas.find(cid);
166  return it != m_schemas.cend() &&
167  Time::now_ms() < it->second.ts + m_expiry_ms->get()
168  ? it->second.schema : nullptr;
169 }
170 
// Cache-only lookup by column name: returns the cached schema whose col_name
// equals `name` while that entry is unexpired, otherwise nullptr.
// No request is issued in the visible code.
// NOTE(review): listing line 172 is not present in this extract.
171 DB::Schema::Ptr Schemas::get(const std::string& name) {
173  for(const auto& data : m_schemas) { // ? cross-map m_schemas_names
174  if(Condition::str_eq(name, data.second.schema->col_name)) {
175  if(Time::now_ms() < data.second.ts + m_expiry_ms->get())
176  return data.second.schema;
    // First name match is expired: stop scanning and report a miss.
177  break;
178  }
179  }
180  return nullptr;
181 }
182 
// Fills `schemas` with the result of a pattern-based request and, when the
// request reports no error, stores each returned schema in m_schemas with a
// common timestamp.
183 void
184 Schemas::get(int& err, const DB::Schemas::SelectorPatterns& patterns,
185  DB::SchemasVec& schemas, uint32_t timeout) {
186  _request(err, patterns, schemas, timeout);
187 
    // NOTE(review): listing line 189 (the body of the empty-result branch) is
    // not present in this extract; the index lists Error::COLUMN_SCHEMA_MISSING,
    // presumably assigned to `err` there — confirm against the source.
188  if(!err && schemas.empty()) {
190 
191  } else if(!err) {
192  auto ts = Time::now_ms();
    // NOTE(review): listing line 193 is not present in this extract.
194  for(auto& schema : schemas) {
195  m_schemas[schema->cid].assign(ts, schema);
196  }
197  }
198 }
199 
// Convenience overload: runs the pattern-based get() into a local vector and
// returns that vector by value. `err` is forwarded to the underlying call.
// The return-type line (listing line 200) is not present in this extract.
201 Schemas::get(int& err, const DB::Schemas::SelectorPatterns& patterns,
202  uint32_t timeout) {
203  DB::SchemasVec schemas;
204  get(err, patterns, schemas, timeout);
205  return schemas;
206 }
207 
// Stores `schema` in m_schemas under its own cid, stamped with the current
// time. `schema` is dereferenced unconditionally, so it must be non-null.
// NOTE(review): listing line 210 is not present in this extract.
208 void Schemas::set(const DB::Schema::Ptr& schema) {
209  auto ts = Time::now_ms();
211  m_schemas[schema->cid].assign(ts, schema);
212 }
213 
// Stores every schema of `schemas` in m_schemas under its own cid; all entries
// receive the same timestamp, sampled once before the loop.
// Each element is dereferenced unconditionally, so none may be null.
// NOTE(review): listing line 216 is not present in this extract.
214 void Schemas::set(const DB::SchemasVec& schemas) {
215  auto ts = Time::now_ms();
217  for(auto& schema : schemas) {
218  m_schemas[schema->cid].assign(ts, schema);
219  }
220 }
221 
222 
// Creates a schema-by-id request for `cid` and returns it wrapped in a Pending
// together with a pointer to the request's `data` member. The request is not
// started in the visible code; get() calls req->run() on it.
// The branch is chosen from _clients->flags: the two broker flag values take
// the first branch, every other value takes `default`.
// NOTE(review): the return-type line (223) and the lines that open each
// request construction (228-229, 239-240) are not present in this extract;
// the index lists Bkr::Req::ColumnGet::make and Mngr::Req::ColumnGet::make,
// presumably one per branch — confirm against the source.
224 Schemas::_request(cid_t cid, uint32_t timeout) {
225  switch(_clients->flags) {
226  case Clients::Flag::DEFAULT |Clients::Flag::BROKER |Clients::Flag::SCHEMA:
227  case Clients::Flag::BROKER: {
230  Comm::Protocol::Mngr::Params::ColumnGetReq::Flag::SCHEMA_BY_ID,
231  cid
232  ),
233  timeout,
234  _clients->shared()
235  );
236  return Schemas::Pending(req, &req->data);
237  }
238  default: {
241  Comm::Protocol::Mngr::Params::ColumnGetReq::Flag::SCHEMA_BY_ID,
242  cid
243  ),
244  timeout,
245  _clients->shared()
246  );
247  return Schemas::Pending(req, &req->data);
248  }
249  }
250 }
251 
// Creates a schema-by-name request for `name` and returns it wrapped in a
// Pending together with a pointer to the request's `data` member. The request
// is not started in the visible code; get() calls req->run() on it.
// The branch is chosen from _clients->flags: the two broker flag values take
// the first branch, every other value takes `default`.
// NOTE(review): the return-type line (252) and the lines that open each
// request construction (257-258, 268-269) are not present in this extract;
// the index lists Bkr::Req::ColumnGet::make and Mngr::Req::ColumnGet::make,
// presumably one per branch — confirm against the source.
253 Schemas::_request(const std::string& name, uint32_t timeout) {
254  switch(_clients->flags) {
255  case Clients::Flag::DEFAULT |Clients::Flag::BROKER |Clients::Flag::SCHEMA:
256  case Clients::Flag::BROKER: {
259  Comm::Protocol::Mngr::Params::ColumnGetReq::Flag::SCHEMA_BY_NAME,
260  name
261  ),
262  timeout,
263  _clients->shared()
264  );
265  return Schemas::Pending(req, &req->data);
266  }
267  default: {
270  Comm::Protocol::Mngr::Params::ColumnGetReq::Flag::SCHEMA_BY_NAME,
271  name
272  ),
273  timeout,
274  _clients->shared()
275  );
276  return Schemas::Pending(req, &req->data);
277  }
278  }
279 }
280 
// Issues the pattern-based column-list request; `err` and `schemas` are passed
// through to the request call as its output arguments. The branch is chosen
// from _clients->flags.
// NOTE(review): listing lines 285, 288-289 and 292 are not present in this
// extract, so the construction of `params` and the head of each call are not
// visible; the index lists Bkr::Req::ColumnList_Sync::request and
// Mngr::Req::ColumnList_Sync::request, presumably one per branch. Whether the
// first case returns or falls through into `default` cannot be told from the
// visible lines — confirm against the source.
281 void Schemas::_request(int& err,
282  const DB::Schemas::SelectorPatterns& patterns,
283  DB::SchemasVec& schemas,
284  uint32_t timeout) {
286  switch(_clients->flags) {
287  case Clients::Flag::DEFAULT |Clients::Flag::BROKER |Clients::Flag::SCHEMA:
290  params, timeout, _clients->shared(), err, schemas);
291  default:
293  params, timeout, _clients->shared(), err, schemas);
294  }
295 }
296 
297 
298 }}
SWC::Core::AtomicBase::compare_exchange_weak
constexpr SWC_CAN_INLINE bool compare_exchange_weak(T &at, T value) noexcept
Definition: Atomic.h:52
SWC::Comm::Protocol::Mngr::Params::ColumnListReq
Definition: ColumnList.h:18
SWC::client::Schemas::ColumnGetData::ColumnGetData
SWC_CAN_INLINE ColumnGetData(const SWC::client::Clients::Ptr &a_clients) noexcept
Definition: Schemas.cc:50
SWC::Comm::Protocol::Mngr::Params::ColumnGetReq
Definition: ColumnGet.h:18
SWC::client::Schemas::m_pending_name
std::unordered_map< std::string, Pending > m_pending_name
Definition: Schemas.h:102
SWC::Comm::Protocol::Bkr::Req::ColumnGet::make
static SWC_CAN_INLINE Ptr make(const Mngr::Params::ColumnGetReq &params, const uint32_t timeout, DataArgsT &&... args)
Definition: ColumnGet.h:27
SWC::Core::AtomicBase< bool >
Clients.h
SWC::DB::Schema::Ptr
std::shared_ptr< Schema > Ptr
Definition: Schema.h:185
StateSynchronization.h
data
T data
Definition: BitFieldInt.h:1
SWC::client::Schemas::ColumnGetData::~ColumnGetData
SWC_CAN_INLINE ~ColumnGetData() noexcept
Definition: Schemas.cc:56
SWC::client::Schemas::m_schemas
std::unordered_map< cid_t, SchemaData > m_schemas
Definition: Schemas.h:98
SWC::client::Clients::Ptr
ClientsPtr Ptr
Definition: Clients.h:58
SWC::Core::MutexSptd::scope
Definition: MutexSptd.h:96
SWC::client::Schemas::set
void set(const DB::Schema::Ptr &schema)
Definition: Schemas.cc:208
SWC::client::Schemas::clear_expired
void clear_expired()
Definition: Schemas.cc:33
SWC::FS::BROKER
@ BROKER
Definition: FileSystem.h:24
SWC::client::Schemas::ColumnGetData::callback
SWC_CAN_INLINE void callback(const Comm::client::ConnQueue::ReqBase::Ptr &, int error, const Comm::Protocol::Mngr::Params::ColumnGetRsp &rsp)
Definition: Schemas.cc:69
SWC::client::Schemas::m_expiry_ms
Config::Property::Value_int32_g::Ptr m_expiry_ms
Definition: Schemas.h:100
SWC::DB::Schemas::SelectorPatterns
Definition: Schemas.h:100
SWC::Config::Property::Value_int32_g::get
SWC_CAN_INLINE int32_t get() const noexcept
Definition: Property.h:610
SWC::client::Schemas::Pending::datap
ColumnGetData * datap
Definition: Schemas.h:65
SWC::client::Schemas::m_mutex
Core::MutexSptd m_mutex
Definition: Schemas.h:97
SWC::Condition::str_eq
bool str_eq(const char *s1, const char *s2) noexcept SWC_ATTRIBS((SWC_ATTRIB_O3))
Definition: Comparators_basic.h:237
SWC::client::Schemas::ColumnGetData::schema
DB::Schema::Ptr schema
Definition: Schemas.cc:89
SWC::client::Schemas::Pending
Definition: Schemas.h:63
SWC::Error::OK
@ OK
Definition: Error.h:45
SWC::Core::Vector::empty
constexpr SWC_CAN_INLINE bool empty() const noexcept
Definition: Vector.h:168
SWC_CAN_INLINE
#define SWC_CAN_INLINE
Definition: Compat.h:102
SWC::Core::StateSynchronization::wait
SWC_CAN_INLINE void wait()
Definition: StateSynchronization.h:25
SWC
The SWC-DB C++ namespace 'SWC'.
Definition: main.cc:12
ColumnGet.h
SWC::client::Schemas::ColumnGetData
Definition: Schemas.cc:46
SWC::client::Schemas::ColumnGetData::once
Core::AtomicBool once
Definition: Schemas.cc:91
SWC::client::Schemas::ColumnGetData::clients
SWC::client::Clients::Ptr clients
Definition: Schemas.cc:87
ColumnList_Sync.h
SWC::client::Schemas::Pending::req
Comm::DispatchHandler::Ptr req
Definition: Schemas.h:64
SWC::client::Schemas::ColumnGetData::get_clients
SWC_CAN_INLINE SWC::client::Clients::Ptr & get_clients() noexcept
Definition: Schemas.cc:59
SWC::cid_t
uint64_t cid_t
Definition: Identifiers.h:16
SWC::client::Schemas::ColumnGetData::err
int err
Definition: Schemas.cc:90
SWC::client::Schemas::_request
Pending _request(cid_t cid, uint32_t timeout)
Definition: Schemas.cc:224
SWC::Core::Vector< Schema::Ptr >
SWC::client::Schemas::remove
void remove(cid_t cid)
Definition: Schemas.cc:16
SWC::Error::COLUMN_SCHEMA_MISSING
@ COLUMN_SCHEMA_MISSING
Definition: Error.h:112
SWC::client::Schemas::m_pending_cid
std::unordered_map< cid_t, Pending > m_pending_cid
Definition: Schemas.h:101
SWC::client::Schemas::get
DB::Schema::Ptr get(int &err, cid_t cid, uint32_t timeout=300000)
Definition: Schemas.cc:98
SWC::Comm::Protocol::Mngr::Params::ColumnGetRsp
Definition: ColumnGet.h:59
ColumnList_Sync.h
ColumnGet.h
SWC::Comm::Protocol::Bkr::Req::ColumnList_Sync::request
static SWC_CAN_INLINE void request(const Mngr::Params::ColumnListReq &params, const uint32_t timeout, DataArgsT &&... args)
Definition: ColumnList_Sync.h:32
SWC::Core::StateSynchronization
Definition: StateSynchronization.h:16
SWC::Time::now_ms
SWC_CAN_INLINE int64_t now_ms() noexcept
Definition: Time.h:36
SWC::client::Schemas::_clients
Clients * _clients
Definition: Schemas.h:99
SWC::client::Schemas::ColumnGetData::valid
SWC_CAN_INLINE bool valid()
Definition: Schemas.cc:64
SWC::Comm::Protocol::Mngr::Req::ColumnList_Sync::request
static SWC_CAN_INLINE void request(const Params::ColumnListReq &params, const uint32_t timeout, DataArgsT &&... args)
Definition: ColumnList_Sync.h:32
SWC::Core::StateSynchronization::acknowledge
SWC_CAN_INLINE void acknowledge() noexcept
Definition: StateSynchronization.h:31
SWC::Comm::Protocol::Mngr::Params::ColumnGetRsp::schema
DB::Schema::Ptr schema
Definition: ColumnGet.h:74
SWC::Comm::client::ConnQueueReqBase::Ptr
std::shared_ptr< ConnQueueReqBase > Ptr
Definition: ClientConnQueue.h:25
SWC::client::Schemas::ColumnGetData::wait
SWC_CAN_INLINE bool wait(int &_err, DB::Schema::Ptr &_schema)
Definition: Schemas.cc:78
SWC::client::Clients::shared
Ptr shared()
Definition: Clients.h:87
SWC::client::Schemas::ColumnGetData::await
Core::StateSynchronization await
Definition: Schemas.cc:88
SWC::Comm::Protocol::Mngr::Req::ColumnGet::make
static SWC_CAN_INLINE Ptr make(const Params::ColumnGetReq &params, const uint32_t timeout, DataArgsT &&... args)
Definition: ColumnGet.h:27
SWC::client::Clients::flags
Core::Atomic< uint8_t > flags
Definition: Clients.h:267