SWC-DB v0.5.12 C++ documentation
SWC-DB© (Super Wide Column Database) - High Performance Scalable Database (https://github.com/kashirin-alex/swc-db)
FileSystem.cc
Go to the documentation of this file.
1 /*
2  * SWC-DB© Copyright since 2019 Alex Kashirin <kashirin.alex@gmail.com>
3  * License details at <https://github.com/kashirin-alex/swc-db/#license>
4  */
5 
6 
8 
9 extern "C" {
10 #include <fcntl.h>
11 }
12 
13 #include "hdfspp/config_parser.h"
14 
15 
16 namespace SWC { namespace FS {
17 
18 
20  config->settings->file_desc.add_options()
21  ("swc.fs.hadoop.path.root", Config::str(""),
22  "Hadoop FileSystem's base root path")
23  ("swc.fs.hadoop.cfg.dyn", Config::strs(),
24  "Dyn-config file")
25 
26  ("swc.fs.hadoop.namenode", Config::strs(),
27  "Namenode Host + optional(:Port), muliple")
28  ("swc.fs.hadoop.namenode.port", Config::i16(),
29  "Namenode Port")
30  ("swc.fs.hadoop.user", Config::str(),
31  "Hadoop user")
32 
33  ("swc.fs.hadoop.concurrency.relative", Config::boo(true),
34  "Determined ratio by HW-Concurrency")
35  ("swc.fs.hadoop.handlers", Config::i32(6),
36  "Number or HW-Concurrency base of Handlers for hadoop tasks")
37 
38  ("swc.fs.hadoop.metrics.enabled", Config::boo(true),
39  "Enable or Disable Metrics Tracking")
40 
41  ("swc.fs.hadoop.fds.max", Config::g_i32(256),
42  "Max Open Fds for opt. without closing")
43  ;
44 
45  config->settings->parse_file(
46  config->settings->get_str("swc.fs.hadoop.cfg", ""),
47  "swc.fs.hadoop.cfg.dyn"
48  );
49 
50  config->path_root = config->settings->get_str(
51  "swc.fs.hadoop.path.root");
52  config->cfg_fds_max = config->settings
53  ->get<Config::Property::Value_int32_g>("swc.fs.hadoop.fds.max");
54  config->stats_enabled = config->settings->get_bool(
55  "swc.fs.hadoop.metrics.enabled");
56  return config;
57 }
58 
59 
60 
63  const std::string& filepath, uint32_t flags) {
65 }
66 
70  smart_fd->filepath(), smart_fd->flags(),
71  smart_fd->fd(), smart_fd->pos()
72  ));
73 }
74 
76  const std::string& filepath, uint32_t flags, int32_t fd, uint64_t pos)
77  : SmartFd(filepath, flags, fd, pos), m_file(nullptr) {
78 }
79 
81 
82 hdfs::FileHandle* FileSystemHadoop::SmartFdHadoop::file() const {
83  return m_file;
84 }
85 
86 void FileSystemHadoop::SmartFdHadoop::file(hdfs::FileHandle* file) {
87  m_file = file;
88 }
89 
90 
91 
94  auto hd_fd = std::dynamic_pointer_cast<SmartFdHadoop>(smartfd);
95  if(!hd_fd){
96  hd_fd = SmartFdHadoop::make_ptr(smartfd);
97  smartfd = std::static_pointer_cast<SmartFd>(hd_fd);
98  }
99  return hd_fd;
100 }
101 
102 
104  : FileSystem(apply_hadoop(config), ImplOptions()),
105  m_nxt_fd(0), m_connecting(false),
107 }
108 
110 
112  return Type::HADOOP;
113 }
114 
115 std::string FileSystemHadoop::to_string() const {
116  return format(
117  "(type=HADOOP path_root=%s path_data=%s)",
118  path_root.c_str(),
119  path_data.c_str()
120  );
121 }
122 
124  m_run.store(false);
125  {
127  m_fs = nullptr;
128  m_cv.notify_all();
129  }
131 }
132 
135  Service::Ptr fs;
136  uint32_t tries=0;
137  while(m_run && !initialize(fs)) {
139  "FS-Hadoop, unable to initialize connection to hadoop, try=%u",
140  ++tries);
141  std::this_thread::sleep_for(std::chrono::milliseconds(1000));
142  }
143  if(!fs)
144  return fs;
145 
146  //std::string abspath;
147  //get_abspath("", abspath);
148  //hdfsSetWorkingDirectory(fs->srv, abspath.c_str());
149 
150  /*
151  char* host;
152  uint16_t port;
153  hdfsConfGetStr("hdfs.namenode.host", &host);
154  hdfsConfGetInt("hdfs.namenode.port", &port);
155  SWC_LOGF(LOG_INFO,
156  "FS-HadoopJV<, connected to namenode=[%s]:%d", host, port);
157  hdfsConfStrFree(host);
158  */
159 
160  // status check
161  //char buffer[256];
162  //hdfsGetWorkingDirectory(fs->srv, buffer, 256);
163  //SWC_LOGF(LOG_DEBUG, "FS-Hadoop, working Dir='%s'", buffer);
164  return fs;
165 }
166 
168 
169  hdfs::ConfigParser parser;
170  if(!parser.LoadDefaultResources())
171  SWC_LOG_FATAL("hdfs::ConfigParser could not load default resources.");
172 
173  auto stats = parser.ValidateResources();
174  for(auto& s : stats) {
175  SWC_PRINT << s.first << "=" << s.second.ToString() << SWC_PRINT_CLOSE;
176  SWC_ASSERT(s.second.ok());
177  }
178  SWC_PRINT << " fs.defaultFS=" << parser.get_string_or("fs.defaultFS", "NONE") << SWC_PRINT_CLOSE;
179 
180  hdfs::Options options;
181  if(!parser.get_options(options))
182  SWC_LOG_FATAL("hdfs::ConfigParser could not load Options object.");
183 
184  hdfs::IoService* io_service = hdfs::IoService::New();
185  io_service->InitWorkers(
187  settings->get_bool("swc.fs.hadoop.concurrency.relative"),
188  settings->get_i32("swc.fs.hadoop.handlers")
189  )
190  );
191 
192  hdfs::FileSystem* connection = hdfs::FileSystem::New(
193  io_service,
194  settings->get_str("swc.fs.hadoop.user", ""),
195  options
196  );
197  SWC_PRINT << "hdfs::FileSystem Initialized" << SWC_PRINT_CLOSE;
198 
199  hdfs::FsInfo fs_info;
200  hdfs::Status status;
201  std::string port;
202  if(settings->has("swc.fs.hadoop.namenode")) {
203  for(auto& h : settings->get_strs("swc.fs.hadoop.namenode")) {
204  port = std::to_string(settings->get_i16(
205  "swc.fs.hadoop.namenode.port", 0));
206 
207  SWC_LOGF(LOG_DEBUG, "FS-Hadoop, Connecting to namenode=[%s]:%s",
208  h.c_str(), port.c_str());
209 
210  status = connection->Connect(h, port);
211  if(status.ok()) {
212  status = connection->GetFsStats(fs_info);
213  if(status.ok())
214  break;
215  }
216  SWC_PRINT << "FS-Hadoop, Could not connect to " << h << ":" << port
217  << ". " << status.ToString() << SWC_PRINT_CLOSE;
218  }
219 
220  } else {
221  SWC_LOGF(LOG_DEBUG, "FS-Hadoop, connecting to default namenode=%s", options.defaultFS.get_path().c_str());
222  status = connection->ConnectToDefaultFs();
223  if(status.ok())
224  status = connection->GetFsStats(fs_info);
225  if(!status.ok())
226  SWC_PRINT << "FS-Hadoop, Could not connect to " << options.defaultFS
227  << ". " << status.ToString() << SWC_PRINT_CLOSE;
228  }
229 
230  if(status.ok()) {
231  SWC_LOGF(LOG_INFO, "%s", fs_info.str("FS-Hadoop").c_str());
232  fs.reset(new Service(connection));
233  } else {
234  delete connection;
235  }
236 
237  return bool(fs);
238 }
239 
241  if(m_run && !m_fs) {
242  bool connect = false;
243  {
244  Core::UniqueLock lock_wait(m_mutex);
245  if(m_connecting) {
246  m_cv.wait(lock_wait, [this](){ return !m_connecting || !m_run; });
247  } else {
248  connect = m_connecting = true;
249  }
250  }
251  if(connect) {
252  auto fs = setup_connection();
254  m_fs = fs;
255  m_connecting = false;
256  m_cv.notify_all();
257  }
258  }
259 
260  auto fs = m_fs;
261  err = m_run
264  return fs;
265 }
266 
268  int& err, FileSystemHadoop::Service::Ptr& fs) {
269  if(err == 255) {
271  if(m_fs == fs)
272  m_fs = nullptr;
273  }
274  // ? org.apache.hadoop.ipc.StandbyException
275 }
276 
277 
278 bool FileSystemHadoop::exists(int& err, const std::string& name) {
280  SWC_FS_EXISTS_START(name);
281  std::string abspath;
282  get_abspath(name, abspath);
283  bool state = false;
284  auto fs = get_fs(err);
285  if(!err) {
286  errno = 0;
287  state = false;
288  need_reconnect(err = errno == ENOENT ? Error::OK : errno, fs);
289  }
290  SWC_FS_EXISTS_FINISH(err, abspath, state, tracker);
291  return state;
292 }
293 
294 void FileSystemHadoop::remove(int& err, const std::string& name) {
296  SWC_FS_REMOVE_START(name);
297  std::string abspath;
298  get_abspath(name, abspath);
299 
300  auto fs = get_fs(err);
301  if(!err) {
302  errno = 0;
303  if (true) { // hdfsDelete(fs->srv, abspath.c_str(), false) == -1) {
304  err = (errno == EIO || errno == ENOENT ? Error::OK: errno);
305  need_reconnect(err, fs);
306  }
307  }
308  SWC_FS_REMOVE_FINISH(err, abspath, tracker);
309 }
310 
311 size_t FileSystemHadoop::length(int& err, const std::string& name) {
313  SWC_FS_LENGTH_START(name);
314  std::string abspath;
315  get_abspath(name, abspath);
316  //hdfsFileInfo *fileInfo;
317  size_t len = 0;
318 
319  auto fs = get_fs(err);
320  if(!err) {
321  errno = 0;
322  if(true) {
323  need_reconnect(err = errno, fs);
324  } else {
325  //len = fileInfo->mSize;
326  //hdfsFreeFileInfo(fileInfo, 1);
327  }
328  }
329  SWC_FS_LENGTH_FINISH(err, abspath, len, tracker);
330  return len;
331 }
332 
333 void FileSystemHadoop::mkdirs(int& err, const std::string& name) {
335  SWC_FS_MKDIRS_START(name);
336  std::string abspath;
337  get_abspath(name, abspath);
338 
339  auto fs = get_fs(err);
340  if(!err) {
341  errno = 0;
342  if(false) //hdfsCreateDirectory(fs->srv, abspath.c_str()) == -1)
343  need_reconnect(err = errno, fs);
344  }
345  SWC_FS_MKDIRS_FINISH(err, abspath, tracker);
346 }
347 
348 void FileSystemHadoop::readdir(int& err, const std::string& name,
349  DirentList& results) {
351  SWC_FS_READDIR_START(name);
352  std::string abspath;
353  get_abspath(name, abspath);
354  (void)results;
355  /*
356  hdfsFileInfo *fileInfo;
357  int numEntries;
358 
359  auto fs = get_fs(err);
360  if(!err) {
361  errno = 0;
362  if (!(fileInfo = hdfsListDirectory(
363  fs->srv, abspath.c_str(), &numEntries))) {
364  need_reconnect(err = errno, fs);
365 
366  } else {
367  for (int i=0; i<numEntries; ++i) {
368  if (fileInfo[i].mName[0] == '.' || !fileInfo[i].mName[0])
369  continue;
370  auto& entry = results.emplace_back();
371  const char *ptr;
372  if((ptr = strrchr(fileInfo[i].mName, '/')))
373  entry.name = (std::string)(ptr+1);
374  else
375  entry.name = (std::string)fileInfo[i].mName;
376 
377  entry.length = fileInfo[i].mSize;
378  entry.last_modification_time = fileInfo[i].mLastMod;
379  entry.is_dir = fileInfo[i].mKind == kObjectKindDirectory;
380  }
381  hdfsFreeFileInfo(fileInfo, numEntries);
382  }
383  }
384  */
385  SWC_FS_READDIR_FINISH(err, abspath, results.size(), tracker);
386 }
387 
388 void FileSystemHadoop::rmdir(int& err, const std::string& name) {
389  auto tracker = statistics.tracker(Statistics::RMDIR_SYNC);
390  SWC_FS_RMDIR_START(name);
391  std::string abspath;
392  get_abspath(name, abspath);
393 
394  auto fs = get_fs(err);
395  if(!err) {
396  errno = 0;
397  if(false) { // hdfsDelete(fs->srv, abspath.c_str(), true) == -1) {
398  err = errno == EIO ? ENOENT: errno;
399  need_reconnect(err, fs);
400  }
401  }
402  SWC_FS_RMDIR_FINISH(err == ENOENT? Error::OK : err, abspath, tracker);
403 }
404 
405 void FileSystemHadoop::rename(int& err, const std::string& from,
406  const std::string& to) {
409  std::string abspath_from;
410  get_abspath(from, abspath_from);
411  std::string abspath_to;
412  get_abspath(to, abspath_to);
413 
414  auto fs = get_fs(err);
415  if(!err) {
416  errno = 0;
417  if(false) // hdfsRename(fs->srv, abspath_from.c_str(), abspath_to.c_str()) == -1)
418  need_reconnect(err = errno == EIO ? ENOENT : errno, fs);
419  }
420  SWC_FS_RENAME_FINISH(err, abspath_from, abspath_to, tracker);
421 }
422 
423 void FileSystemHadoop::create(int& err, SmartFd::Ptr& smartfd,
424  uint8_t replication) {
426  SWC_FS_CREATE_START(smartfd, replication);
427  std::string abspath;
428  get_abspath(smartfd->filepath(), abspath);
429 
430  int oflags = O_WRONLY;
431  if(!(smartfd->flags() & OpenFlags::OPEN_FLAG_OVERWRITE))
432  oflags |= O_APPEND;
433 
434  int tmperr;
435 
436  auto fs = get_fs(err);
437  if(!(tmperr = err)) {
438  auto hadoop_fd = get_fd(smartfd);
439  errno = 0;
440  /* Open the file */
441  hdfs::FileHandle* fd = nullptr;
442  (void)oflags;
443  // fd = hdfsOpenFile(fs->srv, abspath.c_str(), oflags, replication, 0);
444  if(!fd) {
445  need_reconnect(tmperr = errno, fs);
446  hadoop_fd->file(fd);
447  hadoop_fd->fd(-1);
448 
449  if(tmperr == EACCES || tmperr == ENOENT)
451  else if (tmperr == EPERM)
453  else
454  err = tmperr;
455  } else {
456  hadoop_fd->file(fd);
457  hadoop_fd->fd(m_nxt_fd.add_rslt(1));
458  fd_open_incr();
459  }
460  }
461  SWC_FS_CREATE_FINISH(tmperr, smartfd, fds_open(), tracker);
462 }
463 
464 void FileSystemHadoop::open(int& err, SmartFd::Ptr& smartfd) {
465  auto tracker = statistics.tracker(Statistics::OPEN_SYNC);
466  SWC_FS_OPEN_START(smartfd);
467  std::string abspath;
468  get_abspath(smartfd->filepath(), abspath);
469  int oflags = O_RDONLY;
470  int tmperr;
471 
472  auto fs = get_fs(err);
473  if(!(tmperr = err)) {
474  auto hadoop_fd = get_fd(smartfd);
475  errno = 0;
476  /* Open the file */
477  hdfs::FileHandle* fd = nullptr;
478  (void)oflags;
479  //fd = hdfsOpenFile(fs->srv, abspath.c_str(), oflags, 0, 0, 0);
480  if(!fd) {
481  need_reconnect(tmperr = errno, fs);
482  hadoop_fd->file(fd);
483  hadoop_fd->fd(-1);
484 
485  if(tmperr == EACCES || tmperr == ENOENT)
487  else if (tmperr == EPERM)
489  else
490  err = tmperr;
491  } else {
492  hadoop_fd->file(fd);
493  hadoop_fd->fd(m_nxt_fd.add_rslt(1));
494  fd_open_incr();
495  }
496  }
497  SWC_FS_OPEN_FINISH(tmperr, smartfd, fds_open(), tracker);
498 }
499 
500 size_t FileSystemHadoop::read(int& err, SmartFd::Ptr& smartfd,
501  void *dst, size_t amount) {
502  auto tracker = statistics.tracker(Statistics::READ_SYNC);
503  auto hadoop_fd = get_fd(smartfd);
504  SWC_FS_READ_START(hadoop_fd, amount);
505  size_t ret = 0;
506 
507  auto fs = get_fs(err);
508  if(!err) {
509  (void)dst;
510  errno = 0;
511  ssize_t nread = -1; //(ssize_t)hdfsRead(fs->srv, hadoop_fd->file(),
512  // dst, (tSize)amount);
513  if(nread == -1) {
514  nread = 0;
515  need_reconnect(err = errno, fs);
516  } else {
517  if((ret = nread) != amount)
518  err = Error::FS_EOF;
519  hadoop_fd->forward(nread);
520  }
521  }
522  SWC_FS_READ_FINISH(err, hadoop_fd, ret, tracker);
523  return ret;
524 }
525 
526 size_t FileSystemHadoop::pread(int& err, SmartFd::Ptr& smartfd,
527  uint64_t offset, void *dst, size_t amount) {
528  auto tracker = statistics.tracker(Statistics::PREAD_SYNC);
529  auto hadoop_fd = get_fd(smartfd);
530  SWC_FS_PREAD_START(hadoop_fd, offset, amount);
531  size_t ret = 0;
532 
533  auto fs = get_fs(err);
534  if(!err) {
535  errno = 0;
536  (void)dst;
537  ssize_t nread = -1; //(ssize_t)hdfsPread(
538  // fs->srv, hadoop_fd->file(), (tOffset)offset, dst, (tSize)amount);
539  if(nread == -1) {
540  nread = 0;
541  need_reconnect(err = errno, fs);
542  } else {
543  if((ret = nread) != amount)
544  err = Error::FS_EOF;
545  hadoop_fd->pos(offset + nread);
546  }
547  }
548  SWC_FS_PREAD_FINISH(err, hadoop_fd, ret, tracker);
549  return ret;
550 }
551 
552 size_t FileSystemHadoop::append(int& err, SmartFd::Ptr& smartfd,
553  StaticBuffer& buffer, Flags flags) {
555  auto hadoop_fd = get_fd(smartfd);
556  SWC_FS_APPEND_START(hadoop_fd, buffer.size, flags);
557  ssize_t nwritten = 0;
558 
559  auto fs = get_fs(err);
560  if(!err) {
561  /*
562  uint64_t offset;
563  if ((offset = (uint64_t)hdfsTell(fs->srv, hadoop_fd->file()))
564  == (uint64_t)-1) {
565  err = errno;
566  SWC_LOGF(LOG_ERROR, "write, tell failed: %d(%s), %s offset=" SWC_FMT_LU,
567  err, Error::get_text(err), smartfd->to_string().c_str(), offset);
568  return nwritten;
569  }
570  */
571  errno = 0;
572  if(true) { //(nwritten = (ssize_t)hdfsWrite(fs->srv, hadoop_fd->file(),
573  // buffer.base, (tSize)buffer.size)) == -1) {
574  nwritten = 0;
575  need_reconnect(err = errno, fs);
576  } else {
577  hadoop_fd->forward(nwritten);
578 
579  if(flags == Flags::FLUSH || flags == Flags::SYNC) {
580  if(true) { //hdfsFlush(fs->srv, hadoop_fd->file()) == -1) {
581  need_reconnect(err = errno, fs);
582  }
583  }
584  }
585  }
586  SWC_FS_APPEND_FINISH(err, hadoop_fd, nwritten, tracker);
587  return nwritten;
588 }
589 
590 void FileSystemHadoop::seek(int& err, SmartFd::Ptr& smartfd,
591  size_t offset) {
592  auto tracker = statistics.tracker(Statistics::SEEK_SYNC);
593  auto hadoop_fd = get_fd(smartfd);
594  SWC_FS_SEEK_START(hadoop_fd, offset);
595 
596  auto fs = get_fs(err);
597  if(!err) {
598  errno = 0;
599  if(true)//hdfsSeek(fs->srv, hadoop_fd->file(), (tOffset)offset) == -1)
600  need_reconnect(err = errno, fs);
601  else
602  hadoop_fd->pos(offset);
603  }
604  SWC_FS_SEEK_FINISH(err, hadoop_fd, tracker);
605 }
606 
607 void FileSystemHadoop::flush(int& err, SmartFd::Ptr& smartfd) {
608  auto tracker = statistics.tracker(Statistics::FLUSH_SYNC);
609  auto hadoop_fd = get_fd(smartfd);
610  SWC_FS_FLUSH_START(hadoop_fd);
611 
612  auto fs = get_fs(err);
613  if(!err) {
614  errno = 0;
615  if(true)//hdfsHFlush(fs->srv, hadoop_fd->file()) == -1)
616  need_reconnect(err = errno, fs);
617  }
618  SWC_FS_FLUSH_FINISH(err, hadoop_fd, tracker);
619 }
620 
621 void FileSystemHadoop::sync(int& err, SmartFd::Ptr& smartfd) {
622  auto tracker = statistics.tracker(Statistics::SYNC_SYNC);
623  auto hadoop_fd = get_fd(smartfd);
624  SWC_FS_SYNC_START(hadoop_fd);
625 
626  auto fs = get_fs(err);
627  if(!err) {
628  errno = 0;
629  if(true)//hdfsHSync(fs->srv, hadoop_fd->file()) == -1)
630  need_reconnect(err = errno, fs);
631  }
632  SWC_FS_SYNC_FINISH(err, hadoop_fd, tracker);
633 }
634 
635 void FileSystemHadoop::close(int& err, SmartFd::Ptr& smartfd) {
636  auto tracker = statistics.tracker(Statistics::CLOSE_SYNC);
637  auto hadoop_fd = get_fd(smartfd);
638  SWC_FS_CLOSE_START(hadoop_fd);
639 
640  if(hadoop_fd->file()) {
641  fd_open_decr();
642  auto fs = get_fs(err);
643  if(!err) {
644  errno = 0;
645  if(true)//hdfsCloseFile(fs->srv, hadoop_fd->file()) == -1)
646  need_reconnect(err = errno, fs);
647  hadoop_fd->file(nullptr);
648  }
649  } else {
650  err = EBADR;
651  }
652  hadoop_fd->fd(-1);
653  hadoop_fd->pos(0);
654 
655  SWC_FS_CLOSE_FINISH(err, hadoop_fd, tracker);
656 }
657 
658 
659 
660 
661 
662 
663 
664 }} // namespace SWC
665 
666 
667 
668 extern "C" {
670  return static_cast<SWC::FS::FileSystem*>(
671  new SWC::FS::FileSystemHadoop(config));
672 }
673 }
SWC_FS_FLUSH_FINISH
#define SWC_FS_FLUSH_FINISH(_error, _smartfd, _tracker)
Definition: Logger.h:246
SWC::Config::i32
Property::Value_int32::Ptr i32(const int32_t &v)
Definition: PropertiesParser.cc:33
SWC_FS_APPEND_START
#define SWC_FS_APPEND_START(_smartfd, _amount, _flags)
Definition: Logger.h:217
SWC::FS::Statistics::FLUSH_SYNC
@ FLUSH_SYNC
Definition: Statistics.h:42
SWC::FS::FileSystemHadoop::mkdirs
void mkdirs(int &err, const std::string &name) override
Definition: FileSystem.cc:333
SWC::Config::g_i32
Property::Value_int32_g::Ptr g_i32(const int32_t &v)
Definition: PropertiesParser.cc:77
SWC::FS::FileSystemHadoop::get_fd
SmartFdHadoop::Ptr get_fd(SmartFd::Ptr &smartfd)
Definition: FileSystem.cc:93
SWC::FS::Statistics::REMOVE_SYNC
@ REMOVE_SYNC
Definition: Statistics.h:34
SWC::FS::FileSystemHadoop::close
void close(int &err, SmartFd::Ptr &smartfd) override
Definition: FileSystem.cc:635
SWC::FS::FileSystemHadoop::seek
void seek(int &err, SmartFd::Ptr &smartfd, size_t offset) override
Definition: FileSystem.cc:590
SWC::FS::Statistics::READDIR_SYNC
@ READDIR_SYNC
Definition: Statistics.h:46
SWC::Error::SERVER_SHUTTING_DOWN
@ SERVER_SHUTTING_DOWN
Definition: Error.h:84
SWC_LOGF
#define SWC_LOGF(priority, fmt,...)
Definition: Logger.h:188
SWC::FS::HADOOP
@ HADOOP
Definition: FileSystem.h:26
SWC::FS::FileSystemHadoop::rename
void rename(int &err, const std::string &from, const std::string &to) override
Definition: FileSystem.cc:405
SWC::FS::FileSystemHadoop::SmartFdHadoop::Ptr
std::shared_ptr< SmartFdHadoop > Ptr
Definition: FileSystem.h:132
SWC::Core::UniqueLock
Definition: MutexLock.h:68
SWC::FS::Type
Type
Definition: FileSystem.h:20
SWC::FS::Statistics::CLOSE_SYNC
@ CLOSE_SYNC
Definition: Statistics.h:26
SWC::FS::Statistics::RENAME_SYNC
@ RENAME_SYNC
Definition: Statistics.h:50
SWC::FS::Statistics::RMDIR_SYNC
@ RMDIR_SYNC
Definition: Statistics.h:44
SWC_FS_FLUSH_START
#define SWC_FS_FLUSH_START(_smartfd)
Definition: Logger.h:241
SWC::Core::ScopedLock
Definition: MutexLock.h:41
SWC::Error::FS_PERMISSION_DENIED
@ FS_PERMISSION_DENIED
Definition: Error.h:95
SWC_FS_RMDIR_FINISH
#define SWC_FS_RMDIR_FINISH(_error, _path, _tracker)
Definition: Logger.h:96
SWC::FS::FileSystemHadoop::SmartFdHadoop::make_ptr
static Ptr make_ptr(const std::string &filepath, uint32_t flags)
Definition: FileSystem.cc:62
SWC::LOG_INFO
@ LOG_INFO
Definition: Logger.h:35
SWC_FS_RENAME_FINISH
#define SWC_FS_RENAME_FINISH(_error, _from, _to, _tracker)
Definition: Logger.h:108
SWC::FS::FileSystemHadoop::~FileSystemHadoop
virtual ~FileSystemHadoop() noexcept
Definition: FileSystem.cc:109
SWC::FS::Flags
Flags
Definition: FileSystem.h:41
SWC::FS::Statistics::tracker
SWC_CAN_INLINE Metric::Tracker tracker(Command cmd) noexcept
Definition: Statistics.h:119
SWC::Error::FS_PATH_NOT_FOUND
@ FS_PATH_NOT_FOUND
Definition: Error.h:97
SWC::FS::FileSystem
Definition: FileSystem.h:101
SWC::FS::FileSystemHadoop::exists
bool exists(int &err, const std::string &name) override
Definition: FileSystem.cc:278
SWC::FS::Statistics::SYNC_SYNC
@ SYNC_SYNC
Definition: Statistics.h:52
SWC_PRINT
#define SWC_PRINT
Definition: Logger.h:167
SWC::Config::boo
Property::Value_bool::Ptr boo(const bool &v)
Definition: PropertiesParser.cc:21
SWC_PRINT_CLOSE
#define SWC_PRINT_CLOSE
Definition: Logger.h:171
SWC::FS::FileSystemHadoop::setup_connection
Service::Ptr setup_connection()
Definition: FileSystem.cc:134
SWC::FS::FileSystemHadoop::m_connecting
bool m_connecting
Definition: FileSystem.h:158
SWC::FS::FileSystemHadoop::append
size_t append(int &err, SmartFd::Ptr &smartfd, StaticBuffer &buffer, Flags flags) override
Definition: FileSystem.cc:552
SWC::FS::Statistics::SEEK_SYNC
@ SEEK_SYNC
Definition: Statistics.h:32
SWC::FS::FileSystemHadoop::need_reconnect
void need_reconnect(int &err, Service::Ptr &fs)
Definition: FileSystem.cc:267
SWC::FS::FileSystemHadoop::FileSystemHadoop
FileSystemHadoop(Configurables *config)
Definition: FileSystem.cc:103
SWC::FS::Statistics::CREATE_SYNC
@ CREATE_SYNC
Definition: Statistics.h:24
SWC::FS::FileSystemHadoop::get_type
Type SWC_CONST_FUNC get_type() const noexcept override
Definition: FileSystem.cc:111
SWC_FS_PREAD_START
#define SWC_FS_PREAD_START(_smartfd, _offset, _amount)
Definition: Logger.h:185
fs_make_new_hadoop
SWC::FS::FileSystem * fs_make_new_hadoop(SWC::FS::Configurables *config)
Definition: FileSystem.cc:669
SWC_FS_MKDIRS_FINISH
#define SWC_FS_MKDIRS_FINISH(_error, _path, _tracker)
Definition: Logger.h:72
SWC::FS::Configurables
Definition: FileSystem.h:48
SWC::FS::FileSystemHadoop::Service::Ptr
std::shared_ptr< Service > Ptr
Definition: FileSystem.h:25
SWC::FS::Statistics::MKDIRS_SYNC
@ MKDIRS_SYNC
Definition: Statistics.h:40
SWC::FS::Statistics::EXISTS_SYNC
@ EXISTS_SYNC
Definition: Statistics.h:48
SWC::Config::i16
Property::Value_uint16::Ptr i16(const uint16_t &v)
Definition: PropertiesParser.cc:29
SWC::FS::FileSystemHadoop::SmartFdHadoop::SmartFdHadoop
SmartFdHadoop(const std::string &filepath, uint32_t flags, int32_t fd=-1, uint64_t pos=0)
Definition: FileSystem.cc:75
SWC_FS_READDIR_FINISH
#define SWC_FS_READDIR_FINISH(_error, _path, _sz, _tracker)
Definition: Logger.h:84
SWC_FS_CLOSE_START
#define SWC_FS_CLOSE_START(_smartfd)
Definition: Logger.h:263
SWC::FS::ImplOptions
Definition: FileSystem.h:66
SWC_FS_OPEN_FINISH
#define SWC_FS_OPEN_FINISH(_error, _smartfd, _open_fds, _tracker)
Definition: Logger.h:149
SWC::Core::Atomic::add_rslt
constexpr SWC_CAN_INLINE T add_rslt(T v) noexcept
Definition: Atomic.h:120
SWC::FS::FileSystemHadoop::create
void create(int &err, SmartFd::Ptr &smartfd, uint8_t replication) override
Definition: FileSystem.cc:423
SWC::FS::Configurables::path_root
std::string path_root
Definition: FileSystem.h:51
SWC::FS::FileSystemHadoop::initialize
bool initialize(Service::Ptr &fs)
Definition: FileSystem.cc:167
SWC::Core::AtomicBase::store
constexpr SWC_CAN_INLINE void store(T v) noexcept
Definition: Atomic.h:37
SWC::Comm::IoContext::get_number_of_threads
static SWC_CAN_INLINE uint32_t get_number_of_threads(bool relative, int32_t size) noexcept
Definition: IoContext.h:47
SWC::Error::OK
@ OK
Definition: Error.h:45
SWC::FS::Statistics::READ_SYNC
@ READ_SYNC
Definition: Statistics.h:28
SWC::LOG_DEBUG
@ LOG_DEBUG
Definition: Logger.h:36
SWC_FS_RENAME_START
#define SWC_FS_RENAME_START(_from, _to)
Definition: Logger.h:103
SWC::Error::FS_EOF
@ FS_EOF
Definition: Error.h:96
SWC_FS_MKDIRS_START
#define SWC_FS_MKDIRS_START(_path)
Definition: Logger.h:67
SWC_FS_LENGTH_FINISH
#define SWC_FS_LENGTH_FINISH(_error, _path, _len, _tracker)
Definition: Logger.h:60
SWC_FS_CREATE_FINISH
#define SWC_FS_CREATE_FINISH(_error, _smartfd, _open_fds, _tracker)
Definition: Logger.h:137
SWC::FS::FileSystemHadoop::read
void read(int &err, const std::string &name, StaticBuffer *dst) override
Definition: FileSystem.h:86
SWC_FS_SYNC_START
#define SWC_FS_SYNC_START(_smartfd)
Definition: Logger.h:252
SWC_LOG_FATAL
#define SWC_LOG_FATAL(msg)
Definition: Logger.h:201
SWC
The SWC-DB C++ namespace 'SWC'.
Definition: main.cc:12
SWC::FS::FileSystemHadoop::SmartFdHadoop::~SmartFdHadoop
virtual ~SmartFdHadoop() noexcept
Definition: FileSystem.cc:80
SWC::FS::FileSystemHadoop
Definition: FileSystem.h:21
SWC_FS_APPEND_FINISH
#define SWC_FS_APPEND_FINISH(_error, _smartfd, _amount, _tracker)
Definition: Logger.h:223
SWC::FS::OPEN_FLAG_OVERWRITE
@ OPEN_FLAG_OVERWRITE
Definition: FileSystem.h:35
SWC::FS::FileSystemHadoop::m_nxt_fd
Core::Atomic< int32_t > m_nxt_fd
Definition: FileSystem.h:154
SWC::FS::FileSystemHadoop::remove
void remove(int &err, const std::string &name) override
Definition: FileSystem.cc:294
SWC::FS::Statistics::LENGTH_SYNC
@ LENGTH_SYNC
Definition: Statistics.h:36
SWC::FS::FileSystemHadoop::readdir
void readdir(int &err, const std::string &name, DirentList &results) override
Definition: FileSystem.cc:348
SWC::FS::Configurables::settings
Config::Settings::Ptr settings
Definition: FileSystem.h:49
SWC::Core::Buffer
Definition: Buffer.h:18
SWC::Core::Buffer::size
size_t size
Definition: Buffer.h:130
SWC_FS_CREATE_START
#define SWC_FS_CREATE_START(_smartfd, _replication)
Definition: Logger.h:131
SWC::Config::strs
Property::Value_strings::Ptr strs(Strings &&v)
Definition: PropertiesParser.cc:49
SWC::FS::Configurables::cfg_fds_max
Config::Property::Value_int32_g::Ptr cfg_fds_max
Definition: FileSystem.h:50
SWC_FS_OPEN_START
#define SWC_FS_OPEN_START(_smartfd)
Definition: Logger.h:144
SWC::Config::str
Property::Value_string::Ptr str(std::string &&v)
Definition: PropertiesParser.cc:45
SWC_FS_READ_FINISH
#define SWC_FS_READ_FINISH(_error, _smartfd, _amount, _tracker)
Definition: Logger.h:175
SWC::FS::Statistics::APPEND_SYNC
@ APPEND_SYNC
Definition: Statistics.h:30
SWC::FS::FileSystemHadoop::flush
void flush(int &err, SmartFd::Ptr &smartfd) override
Definition: FileSystem.cc:607
SWC::FS::Statistics::OPEN_SYNC
@ OPEN_SYNC
Definition: Statistics.h:22
SWC::FS::SmartFd::flags
constexpr SWC_CAN_INLINE uint32_t flags() const noexcept
Definition: SmartFd.h:77
SWC::LOG_ERROR
@ LOG_ERROR
Definition: Logger.h:32
SWC::FS::FileSystemHadoop::Service
Definition: FileSystem.h:24
SWC::FS::FileSystem::fd_open_incr
void fd_open_incr() noexcept
Definition: FileSystem.cc:155
SWC::FS::SmartFd
Smart FileDescriptor.
Definition: SmartFd.h:34
SWC::FS::FileSystemHadoop::sync
void sync(int &err, SmartFd::Ptr &smartfd) override
Definition: FileSystem.cc:621
SWC::FS::FileSystem::settings
const Config::Settings::Ptr settings
Definition: FileSystem.h:110
SWC::FS::FileSystemHadoop::SmartFdHadoop
Definition: FileSystem.h:129
SWC::format
std::string format(const char *fmt,...) __attribute__((format(printf
Definition: String.cc:17
SWC::Core::Vector< Dirent >
SWC::FS::SmartFd::filepath
constexpr SWC_CAN_INLINE const std::string & filepath() const noexcept
Definition: SmartFd.h:67
SWC::FS::Statistics::PREAD_SYNC
@ PREAD_SYNC
Definition: Statistics.h:38
SWC::FS::FileSystemHadoop::SmartFdHadoop::file
hdfs::FileHandle * file() const
Definition: FileSystem.cc:82
SWC::Condition::from
Comp from(const char **buf, uint32_t *remainp, uint8_t extended=0x00) noexcept
Definition: Comparators.cc:15
SWC::FS::SmartFd::Ptr
std::shared_ptr< SmartFd > Ptr
Definition: SmartFd.h:37
SWC_FS_READDIR_START
#define SWC_FS_READDIR_START(_path)
Definition: Logger.h:79
SWC::FS::FileSystemHadoop::m_fs
Service::Ptr m_fs
Definition: FileSystem.h:160
SWC::FS::FileSystem::m_run
Core::AtomicBool m_run
Definition: FileSystem.h:113
SWC::FS::FileSystem::stop
virtual void stop()
Definition: FileSystem.cc:115
SWC_FS_PREAD_FINISH
#define SWC_FS_PREAD_FINISH(_error, _smartfd, _amount, _tracker)
Definition: Logger.h:191
SWC::FS::FileSystemHadoop::to_string
std::string to_string() const override
Definition: FileSystem.cc:115
SWC::FS::FileSystem::fd_open_decr
void fd_open_decr() noexcept
Definition: FileSystem.cc:159
SWC::FS::Configurables::stats_enabled
bool stats_enabled
Definition: FileSystem.h:52
SWC::FS::FLUSH
@ FLUSH
Definition: FileSystem.h:43
SWC::FS::FileSystemHadoop::get_fs
Service::Ptr get_fs(int &err)
Definition: FileSystem.cc:240
SWC::FS::FileSystemHadoop::open
void open(int &err, SmartFd::Ptr &smartfd) override
Definition: FileSystem.cc:464
SWC::FS::FileSystem::path_root
const std::string path_root
Definition: FileSystem.h:107
SWC_FS_READ_START
#define SWC_FS_READ_START(_smartfd, _amount)
Definition: Logger.h:170
SWC_FS_SEEK_FINISH
#define SWC_FS_SEEK_FINISH(_error, _smartfd, _tracker)
Definition: Logger.h:235
SWC::FS::FileSystemHadoop::pread
size_t pread(int &err, SmartFd::Ptr &smartfd, uint64_t offset, void *dst, size_t amount) override
Definition: FileSystem.cc:526
SWC::FS::FileSystemHadoop::m_mutex
std::mutex m_mutex
Definition: FileSystem.h:156
SWC::Common::Files::Schema::filepath
std::string filepath(cid_t cid)
Definition: Schema.h:34
SWC::FS::FileSystem::fds_open
size_t fds_open() const noexcept
Definition: FileSystem.cc:167
SWC::FS::FileSystem::path_data
const std::string path_data
Definition: FileSystem.h:108
SWC_FS_EXISTS_START
#define SWC_FS_EXISTS_START(_path)
Definition: Logger.h:31
FileSystem.h
SWC_FS_RMDIR_START
#define SWC_FS_RMDIR_START(_path)
Definition: Logger.h:91
flags
uint8_t flags
Flags.
Definition: Header.h:55
SWC::FS::SYNC
@ SYNC
Definition: FileSystem.h:44
SWC_FS_SEEK_START
#define SWC_FS_SEEK_START(_smartfd, _offset)
Definition: Logger.h:230
SWC_FS_LENGTH_START
#define SWC_FS_LENGTH_START(_path)
Definition: Logger.h:55
SWC::FS::FileSystemHadoop::m_cv
std::condition_variable m_cv
Definition: FileSystem.h:157
SWC_FS_CLOSE_FINISH
#define SWC_FS_CLOSE_FINISH(_error, _smartfd, _tracker)
Definition: Logger.h:268
SWC_ASSERT
#define SWC_ASSERT(_e_)
Definition: Exception.h:165
SWC_FS_REMOVE_FINISH
#define SWC_FS_REMOVE_FINISH(_error, _path, _tracker)
Definition: Logger.h:48
SWC::FS::FileSystem::get_abspath
virtual void get_abspath(const std::string &name, std::string &abspath, size_t reserve=0)
Definition: FileSystem.cc:138
SWC::Core::to_string
SWC_CAN_INLINE std::string to_string(const BitFieldInt< T, SZ > &v)
Definition: BitFieldInt.h:263
SWC::FS::FileSystemHadoop::stop
void stop() override
Definition: FileSystem.cc:123
SWC::Core::Vector::size
constexpr SWC_CAN_INLINE size_type size() const noexcept
Definition: Vector.h:189
SWC::FS::FileSystem::statistics
Statistics statistics
Definition: FileSystem.h:114
SWC_FS_SYNC_FINISH
#define SWC_FS_SYNC_FINISH(_error, _smartfd, _tracker)
Definition: Logger.h:257
SWC::FS::FileSystemHadoop::length
size_t length(int &err, const std::string &name) override
Definition: FileSystem.cc:311
SWC_FS_REMOVE_START
#define SWC_FS_REMOVE_START(_path)
Definition: Logger.h:43
SWC::FS::apply_hadoop
Configurables * apply_hadoop(Configurables *config)
Definition: FileSystem.cc:19
SWC::Config::Property::Value_int32_g
Definition: Property.h:586
SWC::FS::FileSystemHadoop::rmdir
void rmdir(int &err, const std::string &name) override
Definition: FileSystem.cc:388
SWC_FS_EXISTS_FINISH
#define SWC_FS_EXISTS_FINISH(_error, _path, _state, _tracker)
Definition: Logger.h:36
SWC::Error::SERVER_NOT_READY
@ SERVER_NOT_READY
Definition: Error.h:83