SWC-DB  v0.5.12 C++ documentation
SWC-DB© (Super Wide Column Database) - High Performance Scalable Database (https://github.com/kashirin-alex/swc-db)
Interface.cc
Go to the documentation of this file.
1 /*
2  * SWC-DB© Copyright since 2019 Alex Kashirin <kashirin.alex@gmail.com>
3  * License details at <https://github.com/kashirin-alex/swc-db/#license>
4  */
5 
6 
7 #include "swcdb/fs/Interface.h"
8 #include <dlfcn.h>
9 
10 
11 namespace SWC { namespace FS {
12 
13 namespace { // local ns
// Pauses the calling thread for 10 milliseconds; the blocking FS wrappers in
// this file call it before repeating a failed filesystem request.
void hold_delay() {
  const std::chrono::milliseconds pause(10);
  std::this_thread::sleep_for(pause);
}
17 }
18 
20  : m_type(typ),
21  m_fs(use_filesystem(settings)),
22  loaded_dl() {
23  SWC_LOGF(LOG_INFO, "INIT-%s", to_string().c_str());
24 }
25 
28  Configurables config(settings);
29 
30  std::string fs_name;
31  switch(m_type) {
32 
33  case Type::LOCAL:{
34  #if defined (BUILTIN_FS_LOCAL) || defined (BUILTIN_FS_ALL)
35  return FileSystem::Ptr(new FileSystemLocal(&config));
36  #endif
37  fs_name.append("local");
38  break;
39  }
40 
41  case Type::BROKER:{
42  #if defined (BUILTIN_FS_BROKER) || defined (BUILTIN_FS_ALL)
43  return FileSystem::Ptr(new FileSystemBroker(&config));
44  #endif
45  fs_name.append("broker");
46  break;
47  }
48 
49  case Type::HADOOP: {
50  #if defined (BUILTIN_FS_HADOOP) || defined (BUILTIN_FS_ALL)
51  return FileSystem::Ptr(new FileSystemHadoop(&config));
52  #endif
53  fs_name.append("hadoop");
54  break;
55  }
56 
57  case Type::HADOOP_JVM: {
58  #if defined (BUILTIN_FS_HADOOP_JVM) || defined (BUILTIN_FS_ALL)
59  return FileSystem::Ptr(new FileSystemHadoopJVM(&config));
60  #endif
61  fs_name.append("hadoop_jvm");
62  break;
63  }
64 
65  case Type::CEPH:{
66  #if defined (BUILTIN_FS_CEPH) || defined (BUILTIN_FS_ALL)
67  return FileSystem::Ptr(new FileSystemCeph(&config));
68  #endif
69  fs_name.append("ceph");
70  break;
71  }
72 
73  case Type::CUSTOM: {
74  fs_name.append("custom");
75  break;
76  }
77 
78  default:
80  "Not implemented FileSystem name=%s type=%d",
81  fs_name.c_str(), int(m_type));
82  }
83 
84  std::string cfg_lib("swc.fs.lib." + fs_name);
85  std::string fs_lib;
86  if(settings->has(cfg_lib.c_str())) {
87  fs_lib = settings->get_str(cfg_lib.c_str());
88  } else {
89  fs_lib.reserve(settings->install_path.length() + 20 + fs_name.length());
90  fs_lib.append(settings->install_path);
91  fs_lib.append("/lib/libswcdb_fs_"); // (./lib/libswcdb_fs_local.so)
92  fs_lib.append(fs_name);
93  fs_lib.append(SWC_DSO_EXT);
94  }
95 
96  const char* err = dlerror();
97  void* handle = dlopen(fs_lib.c_str(), RTLD_NOW | RTLD_LAZY | RTLD_LOCAL);
98  if (err || !handle)
100  "Shared Lib %s, open fail: %s\n",
101  fs_lib.c_str(), err);
102 
103  err = dlerror();
104  std::string handler_name("fs_make_new_" + fs_name);
105  void* f_new_ptr = dlsym(handle, handler_name.c_str());
106  if (err || !f_new_ptr)
108  "Shared Lib %s, link(%s) fail: %s handle=%p\n",
109  fs_lib.c_str(), handler_name.c_str(), err, handle);
110 
111  loaded_dl = {.lib=handle, .make=f_new_ptr};
112  return FileSystem::Ptr(
113  reinterpret_cast<fs_make_new_t*>(f_new_ptr)(&config));
114 }
115 
116 
118  m_fs = nullptr;
119  if(loaded_dl.lib) {
120  dlclose(loaded_dl.lib);
121  }
122 }
123 
125  return m_fs->get_type_underlying();
126 }
127 
128 std::string Interface::to_string() const {
129  return format("FS::Interface(type=%d, details=%s)",
130  int(m_type), m_fs ? m_fs->to_string().c_str() : "NULL");
131 }
132 
133 bool Interface::need_fds() const noexcept {
134  return m_fs->need_fds();
135 }
136 
138  m_fs->stop();
139 }
140 
141 void Interface::get_structured_ids(int& err, const std::string& base_path,
142  IdEntries_t& entries,
143  const std::string& base_id) {
144  //path(base_path) .../111/222/333/444f >> IdEntries_t{(int64_t)111222333444}
145 
146  DirentList dirs;
147  readdir(err, base_path, dirs);
148  if(err)
149  return;
150 
151  entries.reserve(entries.size() + dirs.size());
152  for(auto& entry : dirs) {
153  if(!entry.is_dir) continue;
154 
155  std::string id_name;
156  id_name.reserve(base_id.length() + entry.name.length());
157  id_name.append(base_id);
158  if(entry.name.back() == ID_SPLIT_LAST) {
159  id_name.append(entry.name.substr(0, entry.name.length()-1));
160  try {
161  entries.push_back(std::stoull(id_name));
162  } catch(...){
163  SWC_LOGF(LOG_ERROR, "Error converting id_name=%s to uint64",
164  id_name.c_str());
165  }
166  continue;
167  }
168 
169  std::string new_base_path;
170  new_base_path.reserve(base_path.length() + 1 + entry.name.length());
171  new_base_path.append(base_path);
172  new_base_path.append("/");
173  new_base_path.append(entry.name);
174  id_name.append(entry.name);
175  get_structured_ids(err, new_base_path, entries, id_name);
176  if(err)
177  return;
178  }
179 }
180 
181 // default form to FS methods
182 
// Blocking directory listing with retry.
// Each attempt clears `dirs`, resets `err` to Error::OK and calls
// m_fs->readdir(). Error::OK and ENOENT end the loop with `err` left as
// reported; an error reaching `default` is followed by hold_delay(), a
// warning log and another attempt.
// NOTE(review): this listing jumps from line 190 to 192, so one more case
// label may belong between `case ENOENT:` and `return` - confirm against the
// original file.
183 void Interface::readdir(int& err, const std::string& base_path,
184  DirentList& dirs) {
185  for(;;) {
186  dirs.clear();
187  m_fs->readdir(err = Error::OK, base_path, dirs);
188  switch(err) {
189  case Error::OK:
190  case ENOENT:
192  return;
193  default:
194  hold_delay();
195  SWC_LOGF(LOG_WARN, "readdir, retrying to err=%d(%s) dir(%s)",
196  err, Error::get_text(err), base_path.c_str());
197  }
198  }
199 }
200 
// Blocking existence check with retry.
// Each attempt resets `err` to Error::OK and asks m_fs->exists(); on
// Error::OK the reported state is returned. An error reaching `default` is
// followed by hold_delay(), a warning log and another attempt.
// NOTE(review): this listing jumps from line 205 to 207, so a further case
// label may belong between `case Error::OK:` and `return` - confirm against
// the original file.
201 bool Interface::exists(int& err, const std::string& name) {
202  for(bool state;;) {
203  state = m_fs->exists(err = Error::OK, name);
204  switch(err) {
205  case Error::OK:
207  return state;
208  default:
209  hold_delay();
210  SWC_LOGF(LOG_WARN, "exists, retrying to err=%d(%s) path(%s)",
211  err, Error::get_text(err), name.c_str());
212  }
213  }
214 }
215 
217  const std::string& name) {
218  m_fs->exists([name, cb=std::move(cb), ptr=ptr()]
219  (int err, bool state) mutable {
220  if(!err || err == Error::SERVER_SHUTTING_DOWN)
221  cb(err, state);
222  else
223  ptr->exists(std::move(cb), name);
224  },
225  name
226  );
227 }
228 
// Blocking directory creation with retry.
// Each attempt resets `err` to Error::OK and calls m_fs->mkdirs().
// Error::OK and EEXIST end the loop with `err` left as reported; an error
// reaching `default` is followed by hold_delay(), a warning log and another
// attempt.
// NOTE(review): this listing jumps from line 234 to 236, so one more case
// label may belong between `case EEXIST:` and `return` - confirm against the
// original file.
229 void Interface::mkdirs(int& err, const std::string& name) {
230  for(;;) {
231  m_fs->mkdirs(err = Error::OK, name);
232  switch(err) {
233  case Error::OK:
234  case EEXIST:
236  return;
237  default:
238  hold_delay();
239  SWC_LOGF(LOG_WARN, "mkdirs, retrying to err=%d(%s) dir(%s)",
240  err, Error::get_text(err), name.c_str());
241  }
242  }
243 }
244 
// Blocking directory removal with retry.
// Each attempt resets `err` to Error::OK and calls m_fs->rmdir().
// Error::OK and ENOENT end the loop with `err` left as reported; an error
// reaching `default` is followed by hold_delay(), a warning log and another
// attempt.
// NOTE(review): this listing jumps from line 250 to 252, so one more case
// label may belong between `case ENOENT:` and `return` - confirm against the
// original file.
245 void Interface::rmdir(int& err, const std::string& name) {
246  for(;;) {
247  m_fs->rmdir(err = Error::OK, name);
248  switch(err) {
249  case Error::OK:
250  case ENOENT:
252  return;
253  default:
254  hold_delay();
255  SWC_LOGF(LOG_WARN, "rmdir, retrying to err=%d(%s) dir(%s)",
256  err, Error::get_text(err), name.c_str());
257  }
258  }
259 }
260 
261 void Interface::rmdir_incl_opt_subs(int& err, const std::string& name,
262  const std::string& up_to) {
263  rmdir(err, name);
264  if(err)
265  return;
266 
267  const char* p=name.data();
268  std::string base_path;
269  for(const char* c=p+name.length(); c>p; --c) {
270  if(*c != '/')
271  continue;
272  base_path = std::string(p, c-p);
273  if(Condition::str_eq(up_to, base_path))
274  break;
275 
276  DirentList entrs;
277  readdir(err, base_path, entrs);
278  if(!err && !entrs.size()) {
279  rmdir(err, base_path);
280  if(!err)
281  continue;
282  }
283  break;
284  }
285  if(err)
286  err = Error::OK;
287 }
288 
// Blocking file removal with retry.
// Each attempt resets `err` to Error::OK and calls m_fs->remove().
// Error::OK and ENOENT end the loop with `err` left as reported; an error
// reaching `default` is followed by hold_delay(), a warning log and another
// attempt.
// NOTE(review): this listing jumps from line 294 to 296, so one more case
// label may belong between `case ENOENT:` and `return` - confirm against the
// original file.
289 void Interface::remove(int& err, const std::string& name) {
290  for(;;) {
291  m_fs->remove(err = Error::OK, name);
292  switch(err) {
293  case Error::OK:
294  case ENOENT:
296  return;
297  default:
298  hold_delay();
299  SWC_LOGF(LOG_WARN, "remove, retrying to err=%d(%s) file(%s)",
300  err, Error::get_text(err), name.c_str());
301  }
302  }
303 }
304 
306  const std::string& name) {
307  m_fs->remove([name, cb=std::move(cb), ptr=ptr()]
308  (int err) mutable {
309  switch(err) {
310  case Error::OK:
311  case ENOENT:
313  return cb(err);
314  default: {
315  hold_delay();
316  SWC_LOGF(LOG_WARN, "remove, retrying to err=%d(%s) file(%s)",
317  err, Error::get_text(err), name.c_str());
318  ptr->remove(std::move(cb), name);
319  }
320  }
321  },
322  name
323  );
324 }
325 
// Blocking rename with retry.
// Each attempt resets `err` to Error::OK and calls m_fs->rename(from, to).
//  - Error::OK: returns.
//  - ENOENT: if `from` no longer exists while `to` does (both checks finished
//    without error), `err` is reset to Error::OK and the function returns -
//    presumably an earlier attempt already completed the rename (confirm).
//    Otherwise control falls through to the ENOTEMPTY handling.
//  - ENOTEMPTY: rmdir() is called on `to` (its result is ignored), then
//    control falls through to `default`.
//  - default: hold_delay(), a warning log, and another attempt.
// NOTE(review): this listing jumps from line 331 to 333, so a further case
// label may belong between `case Error::OK:` and `return` - confirm against
// the original file.
326 void Interface::rename(int& err, const std::string& from,
327  const std::string& to) {
328  for(;;) {
329  m_fs->rename(err = Error::OK, from, to);
330  switch(err) {
331  case Error::OK:
333  return;
334  case ENOENT: {
335  int e_err;
336  if(!exists(e_err, from) && !e_err && exists(e_err, to) && !e_err) {
337  err = Error::OK;
338  return;
339  }
340  [[fallthrough]];
341  }
342  case ENOTEMPTY: { // overwrite dir like rename of file
343  int e_err;
344  rmdir(e_err, to);
345  [[fallthrough]];
346  }
347  default:
348  hold_delay();
349  SWC_LOGF(LOG_WARN, "rename, retrying to err=%d(%s) from(%s) to(%s)",
350  err, Error::get_text(err), from.c_str(), to.c_str());
351  }
352  }
353 }
354 
// Blocking file-length query with retry.
// Each attempt resets `err` to Error::OK and calls m_fs->length(). For
// Error::OK and ENOENT the value returned by the filesystem is handed back
// with `err` left as reported; an error reaching `default` is followed by
// hold_delay(), a warning log and another attempt.
// NOTE(review): this listing skips lines 360 and 363, so further case labels
// may belong in the returning group - confirm against the original file.
355 size_t Interface::length(int& err, const std::string& name) {
356  for(size_t length;;) {
357  length = m_fs->length(err = Error::OK, name);
358  switch(err) {
359  case Error::OK:
361  case ENOENT:
362  //case Error::FS_PERMISSION_DENIED:
364  return length;
365  default:
366  hold_delay();
367  SWC_LOGF(LOG_WARN, "length, retrying to err=%d(%s) file(%s)",
368  err, Error::get_text(err), name.c_str());
369  }
370  }
371 }
372 
373 
// Blocking whole-buffer write with retry.
// `buffer.own` is set to false before the first attempt and back to true just
// before returning - presumably so the buffer's data is not released by a
// failed attempt and can be sent again (confirm against StaticBuffer).
// Each attempt resets `err` to Error::OK and calls m_fs->write(); an error
// reaching `default` is followed by hold_delay(), a warning log and another
// attempt.
// NOTE(review): this listing skips lines 380 and 382; the closing brace after
// `return` has no visible opening brace, so at least one case label that
// opens the block is missing here - confirm against the original file.
374 void Interface::write(int& err, SmartFd::Ptr smartfd,
375  uint8_t replication, StaticBuffer& buffer) {
376  for(buffer.own=false;;) {
377  m_fs->write(err = Error::OK, smartfd, replication, buffer);
378  switch(err) {
379  case Error::OK:
381  //case Error::FS_PERMISSION_DENIED:
383  buffer.own=true;
384  return;
385  }
386  default:
387  hold_delay();
388  SWC_LOGF(LOG_WARN, "write, retrying to err=%d(%s) file(%s)",
389  err, Error::get_text(err), smartfd->filepath().c_str());
390  }
391  }
392 }
393 
// Blocking read of a file by name into `dst`, with retry.
// Each attempt resets `err` to Error::OK and calls m_fs->read(name, dst).
// Error::OK and Error::FS_EOF end the loop with `err` left as reported; an
// error reaching `default` is followed by hold_delay(), a warning log and
// another attempt.
// NOTE(review): this listing skips lines 400 and 402, so further case labels
// may belong in the returning group - confirm against the original file.
394 void Interface::read(int& err, const std::string& name, StaticBuffer* dst) {
395  for(;;) {
396  m_fs->read(err = Error::OK, name, dst);
397  switch(err) {
398  case Error::OK:
399  case Error::FS_EOF:
401  //case Error::FS_PERMISSION_DENIED:
403  return;
404  default:
405  hold_delay();
406  SWC_LOGF(LOG_WARN, "read-all, retrying to err=%d(%s) file(%s)",
407  err, Error::get_text(err), name.c_str());
408  }
409  }
410 }
411 
// Single open attempt (no retry loop inside this function).
// Resets `err` to Error::OK and calls m_fs->open(). Returns false for the
// case group headed by Error::OK. For any other error the descriptor is
// closed through m_fs->close() if it is still valid (the close result is
// discarded) and true is returned - presumably telling the caller that the
// open may be attempted again (confirm against the call sites).
// NOTE(review): this listing skips lines 416 and 418, so further case labels
// may belong in the group that returns false - confirm against the original
// file.
412 bool Interface::open(int& err, SmartFd::Ptr& smartfd) {
413  m_fs->open(err = Error::OK, smartfd);
414  switch(err) {
415  case Error::OK:
417  //case Error::FS_PERMISSION_DENIED:
419  return false;
420  default:
421  if(smartfd->valid()) {
422  int tmperr = Error::OK;
423  m_fs->close(tmperr, smartfd);
424  }
425  return true;
426  }
427 }
428 
// Single create attempt (no retry loop inside this function).
// Resets `err` to Error::OK and calls m_fs->create() with the requested
// replication. Returns false for the case group headed by Error::OK. For any
// other error the descriptor is closed through m_fs->close() if it is still
// valid (the close result is discarded) and true is returned - presumably
// telling the caller that the create may be attempted again (confirm against
// the call sites).
// NOTE(review): this listing skips lines 433 and 435, so further case labels
// may belong in the group that returns false - confirm against the original
// file.
429 bool Interface::create(int& err, SmartFd::Ptr& smartfd, uint8_t replication) {
430  m_fs->create(err = Error::OK, smartfd, replication);
431  switch(err) {
432  case Error::OK:
434  //case Error::FS_PERMISSION_DENIED:
436  return false;
437  default:
438  if(smartfd->valid()) {
439  int tmperr = Error::OK;
440  m_fs->close(tmperr, smartfd);
441  }
442  return true;
443  }
444 }
445 
// Blocking close with retry.
// Loops for as long as smartfd->valid() holds; if the descriptor is not valid
// on entry nothing is called and `err` is left untouched. Each attempt resets
// `err` to Error::OK and calls m_fs->close(). Error::OK, EACCES, ENOENT,
// EBADR and EBADF end the function with `err` left as reported; an error
// reaching `default` is followed by hold_delay(), a warning log and another
// attempt (provided the descriptor is still valid).
// NOTE(review): this listing skips lines 455 and 456, so further case labels
// may belong in the returning group - confirm against the original file.
446 void Interface::close(int& err, SmartFd::Ptr& smartfd) {
447  while(smartfd->valid()) {
448  m_fs->close(err = Error::OK, smartfd);
449  switch(err) {
450  case Error::OK:
451  case EACCES:
452  case ENOENT:
453  case EBADR:
454  case EBADF:
457  return;
458  default:
459  hold_delay();
460  SWC_LOGF(LOG_WARN, "close, retrying to err=%d(%s) file(%s)",
461  err, Error::get_text(err), smartfd->filepath().c_str());
462  }
463  }
464 }
465 
467  SmartFd::Ptr& smartfd) {
468  m_fs->close(
469  [cb=std::move(cb), _smartfd=smartfd, ptr=ptr()] (int err) mutable {
470  switch(err) {
471  case Error::OK:
472  case EACCES:
473  case ENOENT:
474  case EBADR:
475  case EBADF:
478  return cb(err);
479  default: {
480  if(!_smartfd->valid())
481  return cb(EBADR);
482  hold_delay();
483  SWC_LOGF(LOG_WARN, "close, retrying to err=%d(%s) file(%s)",
484  err, Error::get_text(err), _smartfd->filepath().c_str());
485  ptr->close(std::move(cb), _smartfd);
486  }
487  }
488  },
489  smartfd
490  );
491 }
492 
493 
494 void set_structured_id(const std::string& number, std::string& s) {
495  auto it = number.cbegin();
496  for(size_t n = 0;
497  n + ID_SPLIT_LEN < number.length();
498  n += ID_SPLIT_LEN, it += ID_SPLIT_LEN) {
499  s.reserve(s.length() + ID_SPLIT_LEN + 1);
500  s.append(it, it + ID_SPLIT_LEN);
501  s.append("/");
502  }
503  s.append(it, number.cend());
504  s += ID_SPLIT_LAST;
505 }
506 
507 } // namespace FS
508 
509 
510 
511 namespace Env {
512 
514  FS::Type typ) {
515  m_env.reset(new FsInterface(settings, typ));
516 }
517 
519  FS::Type typ)
520  : m_interface(new FS::Interface(settings ,typ)) {
521 }
522 
523 }
524 
525 
526 } // namespace SWC
SWC::Env::FsInterface::init
static void init(const SWC::Config::Settings::Ptr &settings, FS::Type typ)
Definition: Interface.cc:513
SWC::FS::FileSystem::Ptr
std::shared_ptr< FileSystem > Ptr
Definition: FileSystem.h:104
SWC::FS::LOCAL
@ LOCAL
Definition: FileSystem.h:23
SWC::FS::Interface::rmdir_incl_opt_subs
void rmdir_incl_opt_subs(int &err, const std::string &name, const std::string &up_to)
Definition: Interface.cc:261
SWC::FS::Interface::to_string
std::string to_string() const
Definition: Interface.cc:128
SWC::Error::SERVER_SHUTTING_DOWN
@ SERVER_SHUTTING_DOWN
Definition: Error.h:84
SWC::Config::Settings::Ptr
std::shared_ptr< Settings > Ptr
Definition: Settings.h:29
SWC_LOGF
#define SWC_LOGF(priority, fmt,...)
Definition: Logger.h:188
SWC::FS::HADOOP
@ HADOOP
Definition: FileSystem.h:26
SWC::Core::Vector::clear
SWC_CAN_INLINE void clear() noexcept(_NoExceptDestructor)
Definition: Vector.h:120
SWC::Env::FsInterface::FsInterface
FsInterface(const SWC::Config::Settings::Ptr &settings, FS::Type typ)
Definition: Interface.cc:518
SWC::FS::Type
Type
Definition: FileSystem.h:20
SWC::FS::Interface::write
void write(int &err, SmartFd::Ptr smartfd, uint8_t replication, StaticBuffer &buffer)
Definition: Interface.cc:374
SWC::Error::get_text
const char * get_text(const int err) noexcept
Definition: Error.cc:173
SWC::FS::Interface::open
bool open(int &err, SmartFd::Ptr &smartfd)
Definition: Interface.cc:412
SWC::LOG_INFO
@ LOG_INFO
Definition: Logger.h:35
SWC::FS::FileSystemCeph
Definition: FileSystem.h:20
SWC::FS::Interface::rmdir
void rmdir(int &err, const std::string &name)
Definition: Interface.cc:245
SWC::Error::FS_PATH_NOT_FOUND
@ FS_PATH_NOT_FOUND
Definition: Error.h:97
SWC::FS::BROKER
@ BROKER
Definition: FileSystem.h:24
SWC::FS::Interface::remove
void remove(int &err, const std::string &name)
Definition: Interface.cc:289
SWC::FS::Callback::ExistsCb_t
std::function< void(int, bool)> ExistsCb_t
Definition: Callbacks.h:20
SWC::FS::FileSystemLocal
Definition: FileSystem.h:22
SWC::FS::Interface::~Interface
~Interface() noexcept
Definition: Interface.cc:117
SWC::FS::FileSystemBroker
Definition: FileSystem.h:30
SWC::FS::Configurables
Definition: FileSystem.h:48
SWC::FS::Callback::CloseCb_t
std::function< void(int)> CloseCb_t
Definition: Callbacks.h:41
SWC::FS::Interface::stop
void stop()
Definition: Interface.cc:137
SWC::Condition::str_eq
bool str_eq(const char *s1, const char *s2) noexcept SWC_ATTRIBS((SWC_ATTRIB_O3))
Definition: Comparators_basic.h:237
SWC::FS::Interface::LoadedDL::lib
void * lib
Definition: Interface.h:119
SWC::Error::OK
@ OK
Definition: Error.h:45
SWC::Error::FS_EOF
@ FS_EOF
Definition: Error.h:96
SWC::FS::Interface::get_structured_ids
void get_structured_ids(int &err, const std::string &base_path, IdEntries_t &entries, const std::string &base_id="")
Definition: Interface.cc:141
SWC::FS::Interface::exists
bool exists(int &err, const std::string &name)
Definition: Interface.cc:201
SWC::FS::CEPH
@ CEPH
Definition: FileSystem.h:28
SWC
The SWC-DB C++ namespace 'SWC'.
Definition: main.cc:12
SWC::FS::FileSystemHadoop
Definition: FileSystem.h:21
SWC::FS::Interface::readdir
void readdir(int &err, const std::string &base_path, DirentList &dirs)
Definition: Interface.cc:183
SWC::FS::ID_SPLIT_LAST
const char ID_SPLIT_LAST
Definition: Interface.h:41
SWC_DSO_EXT
#define SWC_DSO_EXT
Definition: Compat.h:185
SWC::Core::Buffer
Definition: Buffer.h:18
SWC::Error::FS_BAD_FILE_HANDLE
@ FS_BAD_FILE_HANDLE
Definition: Error.h:94
SWC::FS::Interface::create
bool create(int &err, SmartFd::Ptr &smartfd, uint8_t replication)
Definition: Interface.cc:429
SWC::FS::Interface::Interface
Interface(const Config::Settings::Ptr &settings, Type typ)
Definition: Interface.cc:19
SWC::FS::Interface::need_fds
bool need_fds() const noexcept
Definition: Interface.cc:133
SWC::FS::Interface::close
void close(int &err, SmartFd::Ptr &smartfd)
Definition: Interface.cc:446
SWC::FS::ID_SPLIT_LEN
const int ID_SPLIT_LEN
Definition: Interface.h:40
SWC_THROWF
#define SWC_THROWF(_code_, _fmt_,...)
Definition: Exception.h:136
SWC::FS::HADOOP_JVM
@ HADOOP_JVM
Definition: FileSystem.h:27
SWC::FS::CUSTOM
@ CUSTOM
Definition: FileSystem.h:25
SWC::FS::Callback::RemoveCb_t
std::function< void(int)> RemoveCb_t
Definition: Callbacks.h:21
SWC::FS::Interface::rename
void rename(int &err, const std::string &from, const std::string &to)
Definition: Interface.cc:326
SWC::LOG_ERROR
@ LOG_ERROR
Definition: Logger.h:32
SWC::Env::FsInterface::m_env
static Ptr m_env
Definition: Interface.h:169
SWC::FS::Interface::mkdirs
void mkdirs(int &err, const std::string &name)
Definition: Interface.cc:229
fs_make_new_t
SWC::FS::FileSystem * fs_make_new_t(SWC::FS::Configurables *)
Definition: FileSystem.h:249
SWC::Error::CONFIG_BAD_VALUE
@ CONFIG_BAD_VALUE
Definition: Error.h:81
SWC::FS::FileSystemHadoopJVM
Definition: FileSystem.h:21
SWC::format
std::string format(const char *fmt,...) __attribute__((format(printf
Definition: String.cc:17
SWC::Core::Vector
Definition: Vector.h:14
SWC::Condition::from
Comp from(const char **buf, uint32_t *remainp, uint8_t extended=0x00) noexcept
Definition: Comparators.cc:15
SWC::FS::SmartFd::Ptr
std::shared_ptr< SmartFd > Ptr
Definition: SmartFd.h:37
SWC::FS::Interface::use_filesystem
FileSystem::Ptr use_filesystem(const Config::Settings::Ptr &settings)
Definition: Interface.cc:27
SWC::FS::Interface::length
size_t length(int &err, const std::string &name)
Definition: Interface.cc:355
SWC::FS::Interface::m_fs
FileSystem::Ptr m_fs
Definition: Interface.h:116
SWC::FS::Interface::get_type_underlying
Type get_type_underlying() const noexcept
Definition: Interface.cc:124
SWC::FS::Interface::ptr
SWC_CAN_INLINE Ptr ptr() noexcept
Definition: Interface.h:55
SWC::FS::Interface::read
void read(int &err, const std::string &name, StaticBuffer *dst)
Definition: Interface.cc:394
SWC::LOG_WARN
@ LOG_WARN
Definition: Logger.h:33
SWC::FS::set_structured_id
void set_structured_id(const std::string &number, std::string &s)
Definition: Interface.cc:494
SWC::Core::Vector::push_back
SWC_CAN_INLINE void push_back(ArgsT &&... args)
Definition: Vector.h:331
SWC::FS::Interface::loaded_dl
LoadedDL loaded_dl
Definition: Interface.h:123
SWC::Core::Vector::size
constexpr SWC_CAN_INLINE size_type size() const noexcept
Definition: Vector.h:189
Interface.h
SWC::Core::Buffer::own
bool own
Definition: Buffer.h:129
SWC::Core::Vector::reserve
SWC_CAN_INLINE void reserve(size_type cap)
Definition: Vector.h:288
SWC::FS::Interface::m_type
Type m_type
Definition: Interface.h:115