SWC-DB  v0.5.12 C++ documentations
SWC-DB© (Super Wide Column Database) - High Performance Scalable Database (https://github.com/kashirin-alex/swc-db)
FileSystem.cc
Go to the documentation of this file.
1 /*
2  * SWC-DB© Copyright since 2019 Alex Kashirin <kashirin.alex@gmail.com>
3  * License details at <https://github.com/kashirin-alex/swc-db/#license>
4  */
5 
6 
7 #include "swcdb/fs/FileSystem.h"
8 
9 
10 namespace SWC { namespace FS {
11 
12 
// Returns `s` with exactly one guarantee added: the result ends with '/'.
// A path that already ends with '/' is returned unchanged; an empty path
// becomes "/" (the previous form dereferenced the iterator before begin()
// for an empty string, which is undefined behaviour).
std::string normalize_pathname(std::string s) {
  if(s.empty() || s.back() != '/')
    s.push_back('/');
  return s;
}
18 
19 Type fs_type(const std::string& fs_name) {
20 
21 #if !defined (FS_BROKER_APP)
22  if(Condition::str_case_eq(fs_name.data(), "broker", fs_name.size()))
23  return Type::BROKER;
24 #endif
25 
26  if(Condition::str_case_eq(fs_name.data(), "local", fs_name.size()))
27  return Type::LOCAL;
28 
29  if(Condition::str_case_eq(fs_name.data(), "hadoop", fs_name.size()))
30  return Type::HADOOP;
31 
32  if(Condition::str_case_eq(fs_name.data(), "hadoop_jvm", fs_name.size()))
33  return Type::HADOOP_JVM;
34 
35  if(Condition::str_case_eq(fs_name.data(), "ceph", fs_name.size()))
36  return Type::CEPH;
37 
38  if(Condition::str_case_eq(fs_name.data(), "custom", fs_name.size()))
39  return Type::CUSTOM;
40 
41  else
43  "Unknown FileSystem name=%s", fs_name.c_str());
44  return Type::UNKNOWN;
45 }
46 
47 const char* to_string(Type typ) noexcept {
48  switch(typ) {
49  case Type::LOCAL:
50  return "local";
51  case Type::BROKER:
52  return "broker";
53  case Type::HADOOP:
54  return "hadoop";
55  case Type::HADOOP_JVM:
56  return "hadoopJVM";
57  case Type::CEPH:
58  return "ceph";
59  case Type::CUSTOM:
60  return "custom";
61  default:
62  return "unknown";
63  }
64 }
65 
66 SWC_SHOULD_NOT_INLINE
67 ImplOptions::ImplOptions(bool has_all) noexcept
68  : async_exists(has_all), async_remove(has_all), async_length(has_all),
69  async_mkdirs(has_all), async_readdir(has_all), async_rmdir(has_all),
70  async_rename(has_all), async_write(has_all), async_readall(has_all),
71  async_combi_pread(has_all), async_create(has_all), async_open(has_all),
72  async_read(has_all), async_pread(has_all), async_append(has_all),
73  async_seek(has_all), async_flush(has_all), async_sync(has_all),
74  async_close(has_all) {
75 }
76 
77 #define ADD_OPTION(_N) \
78  ImplOptions& ImplOptions::add_##_N() noexcept { _N = 1; return *this; }\
79  bool ImplOptions::has_##_N() const noexcept { return _N; }
80 ADD_OPTION(async_exists)
81 ADD_OPTION(async_remove)
82 ADD_OPTION(async_length)
83 ADD_OPTION(async_mkdirs)
84 ADD_OPTION(async_readdir)
85 ADD_OPTION(async_rmdir)
86 ADD_OPTION(async_rename)
87 ADD_OPTION(async_write)
88 ADD_OPTION(async_readall)
89 ADD_OPTION(async_combi_pread)
90 ADD_OPTION(async_create)
91 ADD_OPTION(async_open)
92 ADD_OPTION(async_read)
93 ADD_OPTION(async_pread)
94 ADD_OPTION(async_append)
95 ADD_OPTION(async_seek)
96 ADD_OPTION(async_flush)
97 ADD_OPTION(async_sync)
98 ADD_OPTION(async_close)
99 #undef ADD_OPTION
100 
101 
102 FileSystem::FileSystem(const Configurables* config, ImplOptions impl_opts)
103  : impl_options(impl_opts),
104  path_root(config->path_root.empty()
105  ? "" : normalize_pathname(config->path_root)),
106  path_data(
107  normalize_pathname(config->settings->get_str("swc.fs.path.data"))),
108  settings(config->settings),
109  cfg_fds_max(config->cfg_fds_max), m_run(true),
110  statistics(config->stats_enabled) {
111 }
112 
113 FileSystem::~FileSystem() noexcept { }
114 
115 void FileSystem::stop() {
116  m_run.store(false);
117  if(statistics.fds_count.load())
118  SWC_LOGF(LOG_WARN, "FS %s remained with open-fds=" SWC_FMT_LU,
119  to_string().c_str(), statistics.fds_count.load());
120 }
121 
122 Type FileSystem::get_type() const noexcept {
123  return Type::UNKNOWN;
124 }
125 
126 Type FileSystem::get_type_underlying() const noexcept {
127  return get_type();
128 }
129 
130 std::string FileSystem::to_string() const {
131  return format(
132  "(type=UNKNOWN path_root=%s path_data=%s)",
133  path_root.c_str(),
134  path_data.c_str()
135  );
136 }
137 
138 void FileSystem::get_abspath(const std::string& name, std::string& abspath,
139  size_t reserve) {
140  abspath.reserve(
141  path_root.length() +
142  path_data.length() +
143  (name.empty() ? -1 : name.length()) +
144  reserve
145  );
146  abspath.append(path_root);
147  if(name.empty()) {
148  abspath.append(path_data.substr(0, path_data.length() - 1));
149  } else {
150  abspath.append(path_data);
151  abspath.append(name);
152  }
153 }
154 
155 void FileSystem::fd_open_incr() noexcept {
156  statistics.fds_count.fetch_add(1);
157 }
158 
159 void FileSystem::fd_open_decr() noexcept {
160  statistics.fds_count.fetch_sub(1);
161 }
162 
163 bool FileSystem::need_fds() const noexcept {
164  return statistics.fds_count >= size_t(cfg_fds_max->get());
165 }
166 
167 size_t FileSystem::fds_open() const noexcept {
168  return statistics.fds_count.load();
169 }
170 
171 
172 /* Default implementations of the async calls,
173  * used when a FileSystem type does not implement its own async version.
174 */
175 
177  const std::string& name) {
     // Calls exists(err, name) directly, then passes the error code and the
     // returned state to cb.
178  int err = Error::OK;
179  bool state = exists(err, name);
180  cb(err, state);
181 }
182 
184  const std::string& name) {
     // Calls remove(err, name) directly, then passes the error code to cb.
185  int err = Error::OK;
186  remove(err, name);
187  cb(err);
188 }
189 
191  const std::string& name) {
     // Calls length(err, name) directly, then passes the error code and the
     // returned length to cb.
192  int err = Error::OK;
193  size_t len = length(err, name);
194  cb(err, len);
195 }
196 
198  const std::string& name) {
     // Calls mkdirs(err, name) directly, then passes the error code to cb.
199  int err = Error::OK;
200  mkdirs(err, name);
201  cb(err);
202 }
203 
205  const std::string& name) {
     // Calls readdir(err, name, listing) directly, then moves the collected
     // listing into cb together with the error code.
206  int err = Error::OK;
207  DirentList listing;
208  readdir(err, name, listing);
209  cb(err, std::move(listing));
210 }
211 
213  const std::string& name) {
     // Calls rmdir(err, name) directly, then passes the error code to cb.
214  int err = Error::OK;
215  rmdir(err, name);
216  cb(err);
217 }
218 
220  const std::string& from, const std::string& to) {
     // Calls rename(err, from, to) directly, then passes the error code to cb.
221  int err = Error::OK;
222  rename(err, from, to);
223  cb(err);
224 }
225 
226 void FileSystem::default_write(int& err, SmartFd::Ptr& smartfd,
227  uint8_t replication, StaticBuffer& buffer) {
228  auto tracker = statistics.tracker(Statistics::WRITE_SYNC);
229  SWC_FS_WRITE_START(smartfd, replication, buffer.size);
230 
231  create(err, smartfd, replication);
232  if(!err && !smartfd->valid()) {
233  err = EBADR;
234  }
235  if(!err && buffer.size) {
236  append(err, smartfd, buffer, Flags::FLUSH);
237  }
238  for(int errtmp; smartfd->valid(); ) {
239  close(errtmp=Error::OK, smartfd);
240  }
241 
242  if(!err && smartfd->flags() & OpenFlags::WRITE_VALIDATE_LENGTH &&
243  length(err, smartfd->filepath()) != buffer.size && !err) {
244  err = Error::FS_EOF;
245  }
246 
247  SWC_FS_WRITE_FINISH(err, smartfd, tracker);
248 }
249 
251  uint8_t replication, StaticBuffer&& buffer) {
     // Calls write(err, smartfd, replication, buffer) directly, then moves
     // the buffer into cb together with the error code.
252  int err = Error::OK;
253  write(err, smartfd, replication, buffer);
254  cb(err, std::move(buffer));
255 }
256 
257 void FileSystem::default_read(int& err, const std::string& name,
258  StaticBuffer* buffer) {
259  auto tracker = statistics.tracker(Statistics::READ_ALL_SYNC);
260  SWC_FS_READALL_START(name);
261 
262  size_t len;
263  FS::SmartFd::Ptr smartfd;
264 
265  if(!exists(err, name)) {
266  if(!err)
268  len = 0;
269  goto finish;
270  }
271  len = length(err, name);
272  if(err)
273  goto finish;
274 
275  smartfd = FS::SmartFd::make_ptr(name, 0);
276 
277  open(err, smartfd);
278  if(!err && !smartfd->valid()) {
279  err = EBADR;
280  }
281  if(!err) {
282  buffer->free();
283  if(read(err, smartfd, buffer, len) != len) {
284  err = Error::FS_EOF;
285  buffer->free();
286  }
287  }
288  for(int errtmp; smartfd->valid(); ) {
289  close(errtmp=Error::OK, smartfd);
290  }
291 
292  finish:
293  SWC_FS_READALL_FINISH(err, name, len, tracker);
294 }
295 
297  const std::string& name) {
     // Calls read(err, name, &dst) directly into a local buffer, then moves
     // that buffer into cb together with the error code.
298  int err = Error::OK;
299  StaticBuffer dst;
300  read(err, name, &dst);
301  cb(err, std::move(dst));
302 }
303 
304 void FileSystem::default_combi_pread(int& err, SmartFd::Ptr& smartfd,
305  uint64_t offset, uint32_t amount,
306  StaticBuffer* buffer) {
307  auto tracker = statistics.tracker(Statistics::COMBI_PREAD_SYNC);
308  SWC_FS_COMBI_PREAD_START(smartfd, offset, amount);
309 
310  open(err, smartfd);
311  if(!err && !smartfd->valid()) {
312  err = EBADR;
313  }
314  if(!err) {
315  buffer->free();
316  if(pread(err, smartfd, offset, buffer, amount) != amount) {
317  err = Error::FS_EOF;
318  buffer->free();
319  }
320  }
321  for(int errtmp; smartfd->valid(); ) {
322  close(errtmp=Error::OK, smartfd);
323  }
324 
325  SWC_FS_COMBI_PREAD_FINISH(err, smartfd, amount, tracker);
326 }
327 
329  SmartFd::Ptr& smartfd,
330  uint64_t offset, uint32_t amount) {
     // Calls combi_pread(err, smartfd, offset, amount, &dst) directly into a
     // local buffer, then moves that buffer into cb with the error code.
331  int err = Error::OK;
332  StaticBuffer dst;
333  combi_pread(err, smartfd, offset, amount, &dst);
334  cb(err, std::move(dst));
335 }
336 
338  uint8_t replication) {
     // Calls create(err, smartfd, replication) directly, then passes the
     // error code to cb.
339  int err = Error::OK;
340  create(err, smartfd, replication);
341  cb(err);
342 }
343 
345  int err = Error::OK;
     // Calls open(err, smartfd) directly, then passes the error code to cb.
346  open(err, smartfd);
347  cb(err);
348 }
349 
350 size_t FileSystem::default_read(int& err, SmartFd::Ptr& smartfd,
351  StaticBuffer* dst, size_t amount) {
352  if(!dst->size)
353  dst->reallocate(amount);
354  return read(err, smartfd, dst->base, amount);
355 }
356 
358  size_t amount) {
     // Calls read(err, smartfd, &dst, amount) directly into a local buffer,
     // then moves that buffer into cb together with the error code.
359  int err = Error::OK;
360  StaticBuffer dst;
361  read(err, smartfd, &dst, amount);
362  cb(err, std::move(dst));
363 }
364 
365 size_t FileSystem::default_pread(int& err, SmartFd::Ptr& smartfd,
366  uint64_t offset,
367  StaticBuffer* dst, size_t amount) {
368  if(!dst->size)
369  dst->reallocate(amount);
370  return pread(err, smartfd, offset, dst->base, amount);
371 }
372 
374  uint64_t offset, size_t amount) {
     // Calls pread(err, smartfd, offset, &dst, amount) directly into a local
     // buffer, then moves that buffer into cb together with the error code.
375  int err = Error::OK;
376  StaticBuffer dst;
377  pread(err, smartfd, offset, &dst, amount);
378  cb(err, std::move(dst));
379 }
380 
382  StaticBuffer& buffer, Flags flags) {
     // Calls append(err, smartfd, buffer, flags) directly, then passes the
     // error code and the returned length to cb.
383  int err = Error::OK;
384  size_t len = append(err, smartfd, buffer, flags);
385  cb(err, len);
386 }
387 
389  size_t offset) {
     // Calls seek(err, smartfd, offset) directly, then passes the error code
     // to cb.
390  int err = Error::OK;
391  seek(err, smartfd, offset);
392  cb(err);
393 }
394 
396  int err = Error::OK;
     // Calls flush(err, smartfd) directly, then passes the error code to cb.
397  flush(err, smartfd);
398  cb(err);
399 }
400 
402  int err = Error::OK;
     // Calls sync(err, smartfd) directly, then passes the error code to cb.
403  sync(err, smartfd);
404  cb(err);
405 }
406 
408  int err = Error::OK;
     // Calls close(err, smartfd) directly, then passes the error code to cb.
409  close(err, smartfd);
410  cb(err);
411 }
412 
413 
414 }}
SWC::Comm::Protocol::FsBroker::Handler::close
void close(const ConnHandlerPtr &conn, const Event::Ptr &ev)
Definition: Close.h:17
SWC::FS::Callback::OpenCb_t
std::function< void(int)> OpenCb_t
Definition: Callbacks.h:34
SWC::Comm::Protocol::FsBroker::Handler::seek
void seek(const ConnHandlerPtr &conn, const Event::Ptr &ev)
Definition: Seek.h:17
SWC::FS::ADD_OPTION
ADD_OPTION(async_exists) ADD_OPTION(async_remove) ADD_OPTION(async_length) ADD_OPTION(async_mkdirs) ADD_OPTION(async_readdir) ADD_OPTION(async_rmdir) ADD_OPTION(async_rename) ADD_OPTION(async_write) ADD_OPTION(async_readall) ADD_OPTION(async_combi_pread) ADD_OPTION(async_create) ADD_OPTION(async_open) ADD_OPTION(async_read) ADD_OPTION(async_pread) ADD_OPTION(async_append) ADD_OPTION(async_seek) ADD_OPTION(async_flush) ADD_OPTION(async_sync) ADD_OPTION(async_close) FileSystem
Definition: FileSystem.cc:80
FileSystem.h
SWC::FS::Callback::WriteCb_t
std::function< void(int, StaticBuffer &&)> WriteCb_t
Definition: Callbacks.h:29
SWC::FS::fs_type
Type fs_type(const std::string &fs_name)
Definition: FileSystem.cc:19
SWC::FS::Callback::PreadCb_t
std::function< void(int, StaticBuffer &&)> PreadCb_t
Definition: Callbacks.h:36
SWC::FS::LOCAL
@ LOCAL
Definition: FileSystem.h:23
SWC::Core::Buffer::reallocate
SWC_CAN_INLINE void reallocate(size_t len)
Definition: Buffer.h:92
SWC::Comm::Protocol::FsBroker::Handler::rmdir
void rmdir(const ConnHandlerPtr &conn, const Event::Ptr &ev)
Definition: Rmdir.h:17
SWC_LOGF
#define SWC_LOGF(priority, fmt,...)
Definition: Logger.h:188
SWC::FS::Callback::CombiPreadCb_t
std::function< void(int, StaticBuffer &&)> CombiPreadCb_t
Definition: Callbacks.h:31
SWC::FS::HADOOP
@ HADOOP
Definition: FileSystem.h:26
SWC::FS::Callback::ReadCb_t
std::function< void(int, StaticBuffer &&)> ReadCb_t
Definition: Callbacks.h:35
SWC::FS::Type
Type
Definition: FileSystem.h:20
SWC::FS::WRITE_VALIDATE_LENGTH
@ WRITE_VALIDATE_LENGTH
Definition: FileSystem.h:37
SWC::Comm::Protocol::FsBroker::Handler::exists
void exists(const ConnHandlerPtr &conn, const Event::Ptr &ev)
Definition: Exists.h:17
SWC::FS::Callback::LengthCb_t
std::function< void(int, size_t)> LengthCb_t
Definition: Callbacks.h:22
SWC::FS::Callback::SyncCb_t
std::function< void(int)> SyncCb_t
Definition: Callbacks.h:40
SWC::FS::SmartFd::make_ptr
static SWC_CAN_INLINE Ptr make_ptr(const std::string &filepath, uint32_t flags, int32_t fd=-1, uint64_t pos=0)
Definition: SmartFd.h:40
SWC::FS::Flags
Flags
Definition: FileSystem.h:41
SWC::Error::FS_PATH_NOT_FOUND
@ FS_PATH_NOT_FOUND
Definition: Error.h:97
SWC::FS::BROKER
@ BROKER
Definition: FileSystem.h:24
SWC::Common::Files::RgrData::read
static void read(DB::RgrData &data, int &err, const std::string &filepath)
Definition: RgrData.h:27
SWC::FS::Callback::ExistsCb_t
std::function< void(int, bool)> ExistsCb_t
Definition: Callbacks.h:20
SWC_FS_COMBI_PREAD_FINISH
#define SWC_FS_COMBI_PREAD_FINISH(_error, _smartfd, _amount, _tracker)
Definition: Logger.h:207
SWC::FS::Configurables
Definition: FileSystem.h:48
SWC_FS_COMBI_PREAD_START
#define SWC_FS_COMBI_PREAD_START(_smartfd, _offset, _amount)
Definition: Logger.h:201
SWC::FS::Callback::CloseCb_t
std::function< void(int)> CloseCb_t
Definition: Callbacks.h:41
SWC::FS::Callback::RenameCb_t
std::function< void(int)> RenameCb_t
Definition: Callbacks.h:27
SWC::FS::ImplOptions
Definition: FileSystem.h:66
SWC::Error::OK
@ OK
Definition: Error.h:45
SWC::Comm::Protocol::FsBroker::Handler::create
void create(const ConnHandlerPtr &conn, const Event::Ptr &ev)
Definition: Create.h:18
SWC::Comm::Protocol::FsBroker::Handler::sync
void sync(const ConnHandlerPtr &conn, const Event::Ptr &ev)
Definition: Sync.h:17
SWC::Comm::Protocol::FsBroker::Handler::mkdirs
void mkdirs(const ConnHandlerPtr &conn, const Event::Ptr &ev)
Definition: Mkdirs.h:17
SWC::Error::FS_EOF
@ FS_EOF
Definition: Error.h:96
SWC::Comm::Protocol::FsBroker::Handler::readdir
void readdir(const ConnHandlerPtr &conn, const Event::Ptr &ev)
Definition: Readdir.h:17
SWC::DB::Cell::Serial::Value::UNKNOWN
@ UNKNOWN
Definition: CellValueSerialField.h:34
SWC::FS::CEPH
@ CEPH
Definition: FileSystem.h:28
SWC::Comm::Protocol::FsBroker::Handler::open
void open(const ConnHandlerPtr &conn, const Event::Ptr &ev)
Definition: Open.h:17
SWC::Core::Buffer::base
value_type * base
Definition: Buffer.h:131
SWC_FS_WRITE_START
#define SWC_FS_WRITE_START(_smartfd, _replication, _amount)
Definition: Logger.h:156
SWC
The SWC-DB C++ namespace 'SWC'.
Definition: main.cc:12
SWC::Comm::Protocol::FsBroker::Handler::pread
void pread(const ConnHandlerPtr &conn, const Event::Ptr &ev)
Definition: Pread.h:17
SWC::Common::Files::Schema::remove
void remove(int &err, cid_t cid)
Definition: Schema.h:44
SWC::Core::Buffer
Definition: Buffer.h:18
SWC::Core::Buffer::size
size_t size
Definition: Buffer.h:130
SWC::FS::Callback::ReadAllCb_t
std::function< void(int, StaticBuffer &&)> ReadAllCb_t
Definition: Callbacks.h:30
SWC_FS_READALL_FINISH
#define SWC_FS_READALL_FINISH(_error, _name, _amount, _tracker)
Definition: Logger.h:120
SWC_THROWF
#define SWC_THROWF(_code_, _fmt_,...)
Definition: Exception.h:136
SWC::Comm::Protocol::FsBroker::Handler::length
void length(const ConnHandlerPtr &conn, const Event::Ptr &ev)
Definition: Length.h:17
SWC::FS::HADOOP_JVM
@ HADOOP_JVM
Definition: FileSystem.h:27
SWC::FS::CUSTOM
@ CUSTOM
Definition: FileSystem.h:25
SWC::FS::Callback::ReaddirCb_t
std::function< void(int, DirentList &&)> ReaddirCb_t
Definition: Callbacks.h:25
SWC::FS::Callback::RemoveCb_t
std::function< void(int)> RemoveCb_t
Definition: Callbacks.h:21
SWC_FMT_LU
#define SWC_FMT_LU
Definition: Compat.h:98
SWC::FS::Callback::RmdirCb_t
std::function< void(int)> RmdirCb_t
Definition: Callbacks.h:26
SWC::Core::Buffer::free
SWC_CAN_INLINE void free() noexcept
Definition: Buffer.h:85
SWC::FS::ImplOptions::ImplOptions
ImplOptions(bool has_all=false) noexcept
Definition: FileSystem.cc:67
SWC_FS_WRITE_FINISH
#define SWC_FS_WRITE_FINISH(_error, _smartfd, _tracker)
Definition: Logger.h:163
SWC::Error::CONFIG_BAD_VALUE
@ CONFIG_BAD_VALUE
Definition: Error.h:81
SWC::format
std::string format(const char *fmt,...) __attribute__((format(printf
Definition: String.cc:17
SWC::FS::Callback::CreateCb_t
std::function< void(int)> CreateCb_t
Definition: Callbacks.h:33
SWC::Core::Vector< Dirent >
SWC::Condition::from
Comp from(const char **buf, uint32_t *remainp, uint8_t extended=0x00) noexcept
Definition: Comparators.cc:15
SWC::FS::SmartFd::Ptr
std::shared_ptr< SmartFd > Ptr
Definition: SmartFd.h:37
SWC::FS::FileSystem::FileSystem
FileSystem(const Configurables *config, ImplOptions impl_opts)
SWC::FS::normalize_pathname
std::string normalize_pathname(std::string s)
Definition: FileSystem.cc:13
SWC::FS::Callback::FlushCb_t
std::function< void(int)> FlushCb_t
Definition: Callbacks.h:39
SWC::Comm::Protocol::FsBroker::Handler::rename
void rename(const ConnHandlerPtr &conn, const Event::Ptr &ev)
Definition: Rename.h:17
SWC::Comm::Protocol::FsBroker::Handler::flush
void flush(const ConnHandlerPtr &conn, const Event::Ptr &ev)
Definition: Flush.h:17
SWC::Common::Files::Schema::write
void write(SWC::DynamicBuffer &dst_buf, const DB::Schema::Ptr &schema)
Definition: Schema.h:50
SWC::FS::FLUSH
@ FLUSH
Definition: FileSystem.h:43
SWC::LOG_WARN
@ LOG_WARN
Definition: Logger.h:33
SWC::Comm::Protocol::FsBroker::Handler::combi_pread
void combi_pread(const ConnHandlerPtr &conn, const Event::Ptr &ev)
Definition: CombiPread.h:17
SWC::FS::Callback::SeekCb_t
std::function< void(int)> SeekCb_t
Definition: Callbacks.h:38
flags
uint8_t flags
Flags.
Definition: Header.h:55
SWC::Comm::Protocol::FsBroker::Handler::append
void append(const ConnHandlerPtr &conn, const Event::Ptr &ev)
Definition: Append.h:17
SWC::Core::to_string
SWC_CAN_INLINE std::string to_string(const BitFieldInt< T, SZ > &v)
Definition: BitFieldInt.h:263
SWC::FS::Callback::AppendCb_t
std::function< void(int, size_t)> AppendCb_t
Definition: Callbacks.h:37
SWC::Condition::str_case_eq
bool str_case_eq(const char *s1, const char *s2, size_t count) noexcept SWC_ATTRIBS((SWC_ATTRIB_O3))
Definition: Comparators_basic.h:257
SWC_FS_READALL_START
#define SWC_FS_READALL_START(_name)
Definition: Logger.h:115
SWC::FS::Callback::MkdirsCb_t
std::function< void(int)> MkdirsCb_t
Definition: Callbacks.h:24
SWC::FS::to_string
const char *SWC_CONST_FUNC to_string(Type typ) noexcept
Definition: FileSystem.cc:47