SWC-DB v0.5.12 C++ documentation
SWC-DB© (Super Wide Column Database) - High Performance Scalable Database (https://github.com/kashirin-alex/swc-db)
FileSystem.cc
Go to the documentation of this file.
1 /*
2  * SWC-DB© Copyright since 2019 Alex Kashirin <kashirin.alex@gmail.com>
3  * License details at <https://github.com/kashirin-alex/swc-db/#license>
4  */
5 
6 
8 
9 
10 namespace SWC { namespace FS {
11 
12 
// Body of apply_hadoop_jvm(Configurables* config); the signature line is
// not shown in this listing.
// Registers the "swc.fs.hadoop_jvm.*" options with their defaults on the
// settings' file description.
14  config->settings->file_desc.add_options()
15  ("swc.fs.hadoop_jvm.path.root", Config::str(""),
16  "HadoopJVM FileSystem's base root path")
17  ("swc.fs.hadoop_jvm.cfg.dyn", Config::strs(),
18  "Dyn-config file")
19 
20  ("swc.fs.hadoop_jvm.namenode", Config::strs(),
21  "Namenode Host + optional(:Port), muliple")
22  ("swc.fs.hadoop_jvm.namenode.port", Config::i16(),
23  "Namenode Port")
24  ("swc.fs.hadoop_jvm.user", Config::str(),
25  "HadoopJVM user")
26 
27  ("swc.fs.hadoop_jvm.metrics.enabled", Config::boo(true),
28  "Enable or Disable Metrics Tracking")
29 
30  ("swc.fs.hadoop_jvm.fds.max", Config::g_i32(256),
31  "Max Open Fds for opt. without closing")
32  ("swc.fs.hadoop_jvm.reconnect.delay.ms", Config::g_i32(3000),
33  "In ms delay use of connection after re-connect")
34 
35  ("swc.fs.hadoop_jvm.read.buffer.size", Config::g_i32(0),
36  "Size of read buffer in bytes")
37  ("swc.fs.hadoop_jvm.write.buffer.size", Config::g_i32(0),
38  "Size of write buffer in bytes")
39  ("swc.fs.hadoop_jvm.block.size", Config::g_i32(0),
40  "Size of block in aligned 512 bytes")
41  ;
42 
// Parses the file named by "swc.fs.hadoop_jvm.cfg" (empty string when unset),
// passing "swc.fs.hadoop_jvm.cfg.dyn" as the second argument of parse_file.
43  config->settings->parse_file(
44  config->settings->get_str("swc.fs.hadoop_jvm.cfg", ""),
45  "swc.fs.hadoop_jvm.cfg.dyn"
46  );
47 
// Copies the root path, the max-open-fds property and the metrics switch
// into `config`, which is then returned to the caller.
48  config->path_root = config->settings->get_str(
49  "swc.fs.hadoop_jvm.path.root");
50  config->cfg_fds_max = config->settings
51  ->get<Config::Property::Value_int32_g>("swc.fs.hadoop_jvm.fds.max");
52  config->stats_enabled = config->settings->get_bool(
53  "swc.fs.hadoop_jvm.metrics.enabled");
54  return config;
55 }
56 
57 
58 
61  const std::string& filepath, uint32_t flags) {
63 }
64 
68  smart_fd->filepath(), smart_fd->flags(),
69  smart_fd->fd(), smart_fd->pos()
70  ));
71 }
72 
74  const std::string& filepath, uint32_t flags, int32_t fd, uint64_t pos)
75  : SmartFd(filepath, flags, fd, pos), m_hfile(nullptr), m_use_count(0) {
76 }
77 
79 
// Body of SmartFdHadoopJVM::file() (getter; signature line not shown).
// Registers one use in m_use_count and returns the currently stored handle.
// The read/seek/flush/sync/append callers in this file pair a non-null
// result with use_release().
81  auto c = m_use_count.fetch_add(1);
82  auto hfile = m_hfile.load();
83  if(!hfile) {
// No handle stored: drop the use that was just registered.
84  m_use_count.fetch_sub(1);
85  } else if(c) {
// Another use was already registered before this one (c != 0): log, then
// poll with 1us sleeps until this call is the only registered use.
86  // Debugging Info -- that should not be happening!
87  SWC_LOGF(LOG_WARN, "hfile '%s' waiting for file-use-released=" SWC_FMT_LU,
88  to_string().c_str(), c);
89  do std::this_thread::sleep_for(std::chrono::microseconds(1));
90  while(m_use_count > 1);
91  SWC_LOGF(LOG_WARN, "hfile '%s' waited for file-use-released",
92  to_string().c_str());
93  }
94  return hfile;
95 }
96 
98  noexcept {
99  m_hfile.store(file);
100 }
101 
103  m_use_count.fetch_sub(1);
104 }
105 
107  return m_use_count;
108 }
109 
// Body of SmartFdHadoopJVM::invalidate() (signature line not shown).
// Atomically takes the stored handle (leaving nullptr behind), resets the
// tracked position to 0 and the fd to -1, and returns the taken handle.
111  auto hfile = m_hfile.exchange(nullptr);
112  pos(0);
113  fd(-1);
114  return hfile;
115 }
116 
117 
118 
// Body of FileSystemHadoopJVM::get_fd(SmartFd::Ptr& smartfd) (signature line
// not shown).
// Returns `smartfd` as a SmartFdHadoopJVM. When the dynamic cast fails, a
// SmartFdHadoopJVM is built from `smartfd` via make_ptr and the caller's
// `smartfd` is re-pointed at that new object.
121  auto hd_fd = std::dynamic_pointer_cast<SmartFdHadoopJVM>(smartfd);
122  if(!hd_fd){
123  hd_fd = SmartFdHadoopJVM::make_ptr(smartfd);
124  smartfd = std::static_pointer_cast<SmartFd>(hd_fd);
125  }
126  return hd_fd;
127 }
128 
129 
133  settings->get<Config::Property::Value_int32_g>(
134  "swc.fs.hadoop_jvm.reconnect.delay.ms")),
136  settings->get<Config::Property::Value_int32_g>(
137  "swc.fs.hadoop_jvm.read.buffer.size")),
139  settings->get<Config::Property::Value_int32_g>(
140  "swc.fs.hadoop_jvm.write.buffer.size")),
142  settings->get<Config::Property::Value_int32_g>(
143  "swc.fs.hadoop_jvm.block.size")),
144  m_nxt_fd(0), m_mutex(), m_cv(), m_connecting(false),
146 }
147 
149 
151  return Type::HADOOP_JVM;
152 }
153 
154 std::string FileSystemHadoopJVM::to_string() const {
155  return format(
156  "(type=HADOOP_JVM path_root=%s path_data=%s)",
157  path_root.c_str(),
158  path_data.c_str()
159  );
160 }
161 
163  m_run.store(false);
164  {
166  m_fs = nullptr;
167  m_cv.notify_all();
168  }
170 }
171 
// Body of FileSystemHadoopJVM::setup_connection() (signature line not shown).
// Retries initialize(fs) once per second while m_run is set; the line that
// opens the log call inside the retry loop is not shown in this listing.
// Returns a null Ptr when the loop ends without a connection.
174  Service::Ptr fs;
175  uint32_t tries=0;
176  while(m_run && !initialize(fs)) {
178  "FS-HadoopJVM, unable to initialize connection to hadoop_jvm, try=%u",
179  ++tries);
180  std::this_thread::sleep_for(std::chrono::milliseconds(1000));
181  }
182  if(!fs)
183  return fs;
184 
// Use the absolute path of "" (as produced by get_abspath) as working directory.
// NOTE(review): the result of hdfsSetWorkingDirectory is not checked.
185  std::string abspath;
186  get_abspath("", abspath);
187  hdfsSetWorkingDirectory(fs->srv, abspath.c_str());
188 
// A non-zero configured block size is rounded down to a multiple of 512 and
// then raised to "dfs.namenode.fs-limits.min-block-size" (1048576 when that
// lookup leaves the value at 0) if it is below it.
189  if(cfg_block_size->get()) {
190  int value;
191  if(cfg_block_size->get() % 512 != 0) {
192  value = (cfg_block_size->get()/512)*512;
193  SWC_LOGF(LOG_WARN, "FS-HadoopJVM, block-size=%d is not 512-aligned changing to=%d",
194  cfg_block_size->get(), value);
195  cfg_block_size->value.store(value);
196  }
197  value = 0;
198  hdfsConfGetInt("dfs.namenode.fs-limits.min-block-size", &value);
199  if(!value)
200  value = 1048576;
201  if(cfg_block_size->get() < value) {
202  SWC_LOGF(LOG_WARN, "FS-HadoopJVM, block-size=%d is to low changing to=%d",
203  cfg_block_size->get(), value);
204  cfg_block_size->value.store(value);
205  }
206  }
207  /*
208  char* host;
209  uint16_t port;
210  hdfsConfGetStr("hdfs.namenode.host", &host);
211  hdfsConfGetInt("hdfs.namenode.port", &port);
212  SWC_LOGF(LOG_INFO,
213  "FS-HadoopJV<, connected to namenode=[%s]:%d", host, port);
214  hdfsConfStrFree(host);
215  */
216 
// NOTE(review): `buffer` is not initialized and the hdfsGetWorkingDirectory
// result is not checked before `buffer` is logged with %s - confirm the call
// always fills it, otherwise an indeterminate string is printed.
217  // status check
218  char buffer[256];
219  hdfsGetWorkingDirectory(fs->srv, buffer, 256);
220  SWC_LOGF(LOG_DEBUG, "FS-HadoopJVM, working Dir='%s'", buffer);
221  return fs;
222 }
223 
225 
// Body of FileSystemHadoopJVM::initialize(Service::Ptr& fs) (signature line
// not shown).
// Tries to obtain an hdfsFS connection. On success `fs` is reset to a new
// Service wrapping it and true is returned; otherwise `fs` is set to nullptr
// and false is returned.
226  hdfsFS connection = nullptr;
// With "swc.fs.hadoop_jvm.namenode" configured, each listed namenode is tried
// in order until one connects and answers both status queries.
227  if (settings->has("swc.fs.hadoop_jvm.namenode")) {
228  for(auto& h : settings->get_strs("swc.fs.hadoop_jvm.namenode")) {
229  hdfsBuilder* bld = hdfsNewBuilder();
230  hdfsBuilderSetNameNode(bld, h.c_str());
231 
232  if (settings->has("swc.fs.hadoop_jvm.namenode.port"))
233  hdfsBuilderSetNameNodePort(
234  bld, settings->get_i16("swc.fs.hadoop_jvm.namenode.port"));
235 
236  if (settings->has("swc.fs.hadoop_jvm.user"))
237  hdfsBuilderSetUserName(
238  bld,
239  settings->get_str("swc.fs.hadoop_jvm.user").c_str()
240  );
241 
242  SWC_LOGF(LOG_INFO, "Connecting to namenode=%s", h.c_str());
243  connection = hdfsBuilderConnect(bld);
244  // java.lang.IllegalArgumentException: java.net.UnknownHostException:
245  if(!connection)
246  continue;
247 
// A -1 result from hdfsGetUsed rejects this namenode; the connection is
// disconnected unless errno is 255, and the next namenode is tried.
248  // check status, namenode need to be active
249  errno = 0;
250  int64_t sz_used = hdfsGetUsed(connection);
251  if(sz_used == -1) {
252  if(errno != 255)
253  hdfsDisconnect(connection);
254  connection = nullptr;
255  SWC_LOGF(LOG_ERROR, "hdfsGetUsed('%s') failed - %d(%s)",
256  h.c_str(), errno, Error::get_text(errno));
257  continue;
258  }
259  SWC_LOGF(LOG_INFO, "Non DFS Used bytes: " SWC_FMT_LD, sz_used);
260 
// Same rejection handling for the capacity query.
261  errno = 0;
262  sz_used = hdfsGetCapacity(connection);
263  if(sz_used == -1) {
264  if(errno != 255)
265  hdfsDisconnect(connection);
266  connection = nullptr;
267  SWC_LOGF(LOG_ERROR, "hdfsGetCapacity('%s') failed - %d(%s)",
268  h.c_str(), errno, Error::get_text(errno));
269  continue;
270  }
271  SWC_LOGF(LOG_INFO, "Configured Capacity bytes: " SWC_FMT_LD, sz_used);
272  break;
273  }
274 
275  } else {
276  //"default" > from the XML configuration file
277  /*
278  hdfsBuilder* bld = hdfsNewBuilder();
279  hdfsBuilderSetNameNode(bld, "default");
280  connection = hdfsBuilderConnect(bld);
281  */
282  connection = hdfsConnect("default", 0);
283 
// NOTE(review): `value` is not initialized, the hdfsConfGetStr result is not
// checked, and no hdfsConfStrFree(value) is visible in this listing - confirm
// the string is neither read uninitialized nor leaked. The line opening the
// log call that prints it is not shown here.
284  char* value;
285  hdfsConfGetStr("fs.defaultFS", &value);
287  "FS-HadoopJVM, connecting to default namenode=%s", value);
288  }
289  if(connection) {
290  fs.reset(new Service(connection));
291  return true;
292  }
293  fs = nullptr;
294  return false;
295 }
296 
// Body of FileSystemHadoopJVM::get_fs(int& err) (signature line not shown).
// Returns the current connection in m_fs, establishing one first when the
// filesystem is running and none is set. Several lines of this function
// (312, 321-322) are not shown in this listing.
298  if(m_run && !m_fs) {
299  bool connect = false;
300  {
// Only one caller performs the connect; callers arriving while m_connecting
// is set wait on m_cv until it is cleared or m_run becomes false.
301  Core::UniqueLock lock_wait(m_mutex);
302  if(m_connecting) {
303  m_cv.wait(lock_wait, [this](){ return !m_connecting || !m_run; });
304  } else {
305  connect = m_connecting = true;
306  }
307  }
308  if(connect) {
// After setup_connection() the thread sleeps for the configured
// "reconnect.delay.ms" before the connection is published in m_fs.
309  auto fs = setup_connection();
310  std::this_thread::sleep_for(
311  std::chrono::milliseconds(cfg_use_delay->get()));
313  m_fs = fs;
314  m_connecting = false;
315  m_cv.notify_all();
316  }
317  }
318 
// `err` is derived from m_run; the remainder of that expression is elided in
// this listing (presumably it selects between OK and a not-ready /
// shutting-down error - TODO confirm against the full source).
319  auto fs = m_fs;
320  err = m_run
323  return fs;
324 }
325 
// Tail of the need_reconnect(int& err, Service::Ptr& fs) signature and its
// body; the first signature line and line 329 are not shown in this listing.
// When `err` is 255 and `fs` is still the connection held in m_fs, m_fs is
// cleared so that the next get_fs() call establishes a new connection.
327  int& err, FileSystemHadoopJVM::Service::Ptr& fs) {
328  if(err == 255) {
330  if(m_fs == fs)
331  m_fs = nullptr;
332  }
333  // ? org.apache.hadoop.ipc.StandbyException
334 }
335 
336 
// Reports whether `name` (resolved through get_abspath) exists.
// Returns true when hdfsExists() returns 0. `err` receives the errno of the
// call, except that ENOENT is reported as Error::OK; the resulting value is
// also handed to need_reconnect().
// The line declaring `tracker` (338) is not shown in this listing.
337 bool FileSystemHadoopJVM::exists(int& err, const std::string& name) {
339  SWC_FS_EXISTS_START(name);
340  std::string abspath;
341  get_abspath(name, abspath);
342 
343  bool state = false;
344  auto fs = get_fs(err);
345  if(!err) {
346  errno = 0;
347  state = !hdfsExists(fs->srv, abspath.c_str());
348  need_reconnect(err = errno == ENOENT ? Error::OK : errno, fs);
349  }
350  SWC_FS_EXISTS_FINISH(err, abspath, state, tracker);
351  return state;
352 }
353 
// Deletes the path `name` (resolved through get_abspath) with a
// non-recursive hdfsDelete().
// On failure `err` is set to errno, except that EIO and ENOENT are reported
// as Error::OK; the resulting value is handed to need_reconnect().
// The line declaring `tracker` (355) is not shown in this listing.
354 void FileSystemHadoopJVM::remove(int& err, const std::string& name) {
356  SWC_FS_REMOVE_START(name);
357  std::string abspath;
358  get_abspath(name, abspath);
359 
360  auto fs = get_fs(err);
361  if(!err) {
362  errno = 0;
363  if(hdfsDelete(fs->srv, abspath.c_str(), false) == -1) {
364  err = errno == EIO || errno == ENOENT ? Error::OK: errno;
365  need_reconnect(err, fs);
366  }
367  }
368  SWC_FS_REMOVE_FINISH(err, abspath, tracker);
369 }
370 
// Returns the size of `name` as reported in hdfsFileInfo::mSize, or 0 when
// no connection is available or hdfsGetPathInfo() fails (in which case `err`
// is set to errno and handed to need_reconnect()).
// The line declaring `tracker` (372) is not shown in this listing.
371 size_t FileSystemHadoopJVM::length(int& err, const std::string& name) {
373  SWC_FS_LENGTH_START(name);
374  std::string abspath;
375  get_abspath(name, abspath);
376  hdfsFileInfo *fileInfo;
377  size_t len = 0;
378 
379  auto fs = get_fs(err);
380  if(!err) {
381  errno = 0;
382  if(!(fileInfo = hdfsGetPathInfo(fs->srv, abspath.c_str()))) {
383  need_reconnect(err = errno, fs);
384  } else {
385  len = fileInfo->mSize;
// The info record is released here, after its size has been copied out.
386  hdfsFreeFileInfo(fileInfo, 1);
387  }
388  }
389  SWC_FS_LENGTH_FINISH(err, abspath, len, tracker);
390  return len;
391 }
392 
// Creates the directory `name` (resolved through get_abspath) with
// hdfsCreateDirectory(); on failure `err` is set to errno and handed to
// need_reconnect().
// The line declaring `tracker` (394) is not shown in this listing.
393 void FileSystemHadoopJVM::mkdirs(int& err, const std::string& name) {
395  SWC_FS_MKDIRS_START(name);
396  std::string abspath;
397  get_abspath(name, abspath);
398 
399  auto fs = get_fs(err);
400  if(!err) {
401  errno = 0;
402  if(hdfsCreateDirectory(fs->srv, abspath.c_str()) == -1)
403  need_reconnect(err = errno, fs);
404  }
405  SWC_FS_MKDIRS_FINISH(err, abspath, tracker);
406 }
407 
// Lists the directory `name` and appends one entry per listed item to
// `results`: the name after the last '/', the last-modification time, a
// directory flag and the size.
// When hdfsListDirectory() returns null, `err` is set to errno (which stays 0
// if the call did not set it) and handed to need_reconnect().
// The line declaring `tracker` (410) is not shown in this listing.
408 void FileSystemHadoopJVM::readdir(int& err, const std::string& name,
409  DirentList& results) {
411  SWC_FS_READDIR_START(name);
412  std::string abspath;
413  get_abspath(name, abspath);
414  hdfsFileInfo *fileInfo;
415  int numEntries;
416 
417  auto fs = get_fs(err);
418  if(!err) {
419  errno = 0;
420  if (!(fileInfo = hdfsListDirectory(
421  fs->srv, abspath.c_str(), &numEntries))) {
422  need_reconnect(err = errno, fs);
423 
424  } else {
425  if(numEntries > 0) {
426  results.reserve(numEntries);
427  for(int i=0; i < numEntries; ++i) {
// Entries whose mName is empty or starts with '.' are skipped.
// NOTE(review): the test looks at the first character of mName, not of the
// base name extracted below - confirm whether mName carries a full path.
428  if(fileInfo[i].mName[0] == '.' || !fileInfo[i].mName[0])
429  continue;
430  const char *ptr = strrchr(fileInfo[i].mName, '/');
431  results.emplace_back(
432  ptr ? ++ptr : fileInfo[i].mName,
433  fileInfo[i].mLastMod,
434  fileInfo[i].mKind == kObjectKindDirectory,
435  fileInfo[i].mSize
436  );
437  }
438  }
439  hdfsFreeFileInfo(fileInfo, numEntries);
440  }
441  }
442 
443  SWC_FS_READDIR_FINISH(err, abspath, results.size(), tracker);
444 }
445 
446 void FileSystemHadoopJVM::rmdir(int& err, const std::string& name) {
447  auto tracker = statistics.tracker(Statistics::RMDIR_SYNC);
448  SWC_FS_RMDIR_START(name);
449  std::string abspath;
450  get_abspath(name, abspath);
451 
452  auto fs = get_fs(err);
453  if(!err) {
454  errno = 0;
455  if(hdfsDelete(fs->srv, abspath.c_str(), true) == -1) {
456  // io error(not-exists)
457  need_reconnect(err = errno == EIO ? ENOENT: errno, fs);
458  }
459  }
460  SWC_FS_RMDIR_FINISH(err == ENOENT ? Error::OK : err, abspath, tracker);
461 }
462 
// Renames `from` to `to` (both resolved through get_abspath) with
// hdfsRename(). On failure `err` is set to errno, with EIO reported as
// ENOENT, and handed to need_reconnect().
// Lines 465-466 (where `tracker` is declared) are not shown in this listing.
463 void FileSystemHadoopJVM::rename(int& err, const std::string& from,
464  const std::string& to) {
467  std::string abspath_from;
468  get_abspath(from, abspath_from);
469  std::string abspath_to;
470  get_abspath(to, abspath_to);
471 
472  auto fs = get_fs(err);
473  if(!err) {
474  errno = 0;
475  if(hdfsRename(fs->srv, abspath_from.c_str(), abspath_to.c_str()) == -1)
476  need_reconnect(err = errno == EIO ? ENOENT : errno, fs);
477  }
478  SWC_FS_RENAME_FINISH(err, abspath_from, abspath_to, tracker);
479 }
480 
// Tail of the create(int& err, SmartFd::Ptr& smartfd, uint8_t replication)
// signature and its body; the first signature line and lines 483, 505 and
// 507 are not shown in this listing.
// Opens the file for writing and, on success, stores the handle in the
// smart-fd, assigns it the next fd number and counts it as open.
482  uint8_t replication) {
484  SWC_FS_CREATE_START(smartfd, replication);
485  std::string abspath;
486  get_abspath(smartfd->filepath(), abspath);
487 
// Write-only; O_APPEND is added unless the smart-fd asks for overwrite.
488  int oflags = O_WRONLY;
489  if(!(smartfd->flags() & OpenFlags::OPEN_FLAG_OVERWRITE))
490  oflags |= O_APPEND;
491 
492  int tmperr;
493  auto fs = get_fs(err);
494  if(!(tmperr = err)) {
495  auto hadoop_fd = get_fd(smartfd);
496  errno = 0;
497  /* Open the file */
498  auto hfile = hdfsOpenFile(
499  fs->srv, abspath.c_str(), oflags,
500  cfg_w_buffer_size->get(), replication, cfg_block_size->get()
501  );
502  if(!hfile) {
// The raw errno is kept in tmperr (used for need_reconnect and the finish
// macro); the statements assigning `err` for the EACCES/ENOENT and EPERM
// cases are elided in this listing.
503  need_reconnect(tmperr = errno, fs);
504  if(tmperr == EACCES || tmperr == ENOENT)
506  else if (tmperr == EPERM)
508  else
509  err = tmperr;
510  } else {
511  hadoop_fd->file(hfile);
512  hadoop_fd->fd(m_nxt_fd.add_rslt(1));
513  fd_open_incr();
514  }
515  }
516  SWC_FS_CREATE_FINISH(tmperr, smartfd, fds_open(), tracker);
517 }
518 
// Opens the smart-fd's file read-only with the configured read buffer size.
// On success the handle is stored in the smart-fd, it gets the next fd number
// and is counted as open.
// Lines 539 and 541 (the statements assigning `err` for the EACCES/ENOENT
// and EPERM cases) are not shown in this listing.
519 void FileSystemHadoopJVM::open(int& err, SmartFd::Ptr& smartfd) {
520  auto tracker = statistics.tracker(Statistics::OPEN_SYNC);
521  SWC_FS_OPEN_START(smartfd);
522  std::string abspath;
523  get_abspath(smartfd->filepath(), abspath);
524  int oflags = O_RDONLY;
525  int tmperr;
526 
527  auto fs = get_fs(err);
528  if(!(tmperr = err)) {
529  auto hadoop_fd = get_fd(smartfd);
530  errno = 0;
531  /* Open the file */
532  auto hfile = hdfsOpenFile(
533  fs->srv, abspath.c_str(), oflags,
534  cfg_r_buffer_size->get(), 0, 0
535  );
536  if(!hfile) {
// The raw errno is kept in tmperr, which is what need_reconnect and the
// finish macro receive.
537  need_reconnect(tmperr = errno, fs);
538  if(tmperr == EACCES || tmperr == ENOENT)
540  else if (tmperr == EPERM)
542  else
543  err = tmperr;
544  } else {
545  hadoop_fd->file(hfile);
546  hadoop_fd->fd(m_nxt_fd.add_rslt(1));
547  fd_open_incr();
548  }
549  }
550  SWC_FS_OPEN_FINISH(tmperr, smartfd, fds_open(), tracker);
551 }
552 
553 size_t FileSystemHadoopJVM::read(int& err, SmartFd::Ptr& smartfd,
554  void *dst, size_t amount) {
555  auto tracker = statistics.tracker(Statistics::READ_SYNC);
556  auto hadoop_fd = get_fd(smartfd);
557  SWC_FS_READ_START(hadoop_fd, amount);
558  size_t ret = 0;
559 
560  auto fs = get_fs(err);
561  if(!err) {
562  auto hfile = hadoop_fd->file();
563  if(hfile) {
564  errno = 0;
565  ssize_t nread = hdfsRead(fs->srv, hfile, dst, tSize(amount));
566  hadoop_fd->use_release();
567  if(nread == -1) {
568  nread = 0;
569  need_reconnect(err = errno, fs);
570  } else {
571  if((ret = nread) != amount)
572  err = Error::FS_EOF;
573  hadoop_fd->forward(nread);
574  }
575  } else {
576  err = EBADR;
577  }
578  }
579  SWC_FS_READ_FINISH(err, hadoop_fd, ret, tracker);
580  return ret;
581 }
582 
583 size_t FileSystemHadoopJVM::pread(int& err, SmartFd::Ptr& smartfd,
584  uint64_t offset, void *dst, size_t amount) {
585  auto tracker = statistics.tracker(Statistics::PREAD_SYNC);
586  auto hadoop_fd = get_fd(smartfd);
587  SWC_FS_PREAD_START(hadoop_fd, offset, amount);
588 
589  size_t ret = 0;
590  auto fs = get_fs(err);
591  if(!err) {
592  auto hfile = hadoop_fd->file();
593  if(hfile) {
594  errno = 0;
595  ssize_t nread = hdfsPread(
596  fs->srv, hfile, tOffset(offset), dst, tSize(amount));
597  hadoop_fd->use_release();
598  if(nread == -1) {
599  nread = 0;
600  need_reconnect(err = errno, fs);
601  } else {
602  if((ret = nread) != amount)
603  err = Error::FS_EOF;
604  hadoop_fd->pos(offset + nread);
605  }
606  } else {
607  err = EBADR;
608  }
609  }
610  SWC_FS_PREAD_FINISH(err, hadoop_fd, ret, tracker);
611  return ret;
612 }
613 
// Writes the whole `buffer` to the open file with hdfsWrite() and, for
// Flags::FLUSH or Flags::SYNC, follows up with hdfsFlush().
// Returns the number of bytes written, or 0 when the write failed or was
// short. Every path that obtained a handle releases it with use_release().
// The line declaring `tracker` (616) is not shown in this listing.
614 size_t FileSystemHadoopJVM::append(int& err, SmartFd::Ptr& smartfd,
615  StaticBuffer& buffer, Flags flags) {
617  auto hadoop_fd = get_fd(smartfd);
618  SWC_FS_APPEND_START(hadoop_fd, buffer.size, flags);
619 
620  ssize_t nwritten = 0;
621 
622  auto fs = get_fs(err);
623  if(!err) {
624  auto hfile = hadoop_fd->file();
625  if(hfile) {
626  errno = 0;
627  nwritten = hdfsWrite(fs->srv, hfile, buffer.base, tSize(buffer.size));
628  if(nwritten == -1 || nwritten != ssize_t(buffer.size)) {
629  hadoop_fd->use_release();
630  nwritten = 0;
631  need_reconnect(err = errno, fs);
// A short write that left errno at 0 is still reported as a failure.
632  if(!err)
633  err = ECANCELED;
634  } else {
635  hadoop_fd->forward(nwritten);
636 
637  if(flags == Flags::FLUSH || flags == Flags::SYNC) {
// NOTE(review): errno is not reset between hdfsWrite and hdfsFlush; it is
// only read when hdfsFlush returns -1.
638  auto res = hdfsFlush(fs->srv, hfile);
639  hadoop_fd->use_release();
640  if(res == -1) {
641  need_reconnect(err = errno, fs);
642  }
643  } else {
644  hadoop_fd->use_release();
645  }
646  }
647  } else {
648  err = EBADR;
649  }
650  }
651  SWC_FS_APPEND_FINISH(err, hadoop_fd, nwritten, tracker);
652  return nwritten;
653 }
654 
655 void FileSystemHadoopJVM::seek(int& err, SmartFd::Ptr& smartfd,
656  size_t offset) {
657  auto tracker = statistics.tracker(Statistics::SEEK_SYNC);
658  auto hadoop_fd = get_fd(smartfd);
659  SWC_FS_SEEK_START(hadoop_fd, offset);
660 
661  auto fs = get_fs(err);
662  if(!err) {
663  auto hfile = hadoop_fd->file();
664  if(hfile) {
665  errno = 0;
666  auto res = hdfsSeek(fs->srv, hfile, tOffset(offset));
667  hadoop_fd->use_release();
668  if(res == -1)
669  need_reconnect(err = errno, fs);
670  else
671  hadoop_fd->pos(offset);
672  } else {
673  err = EBADR;
674  }
675  }
676  SWC_FS_SEEK_FINISH(err, hadoop_fd, tracker);
677 }
678 
679 void FileSystemHadoopJVM::flush(int& err, SmartFd::Ptr& smartfd) {
680  auto tracker = statistics.tracker(Statistics::FLUSH_SYNC);
681  auto hadoop_fd = get_fd(smartfd);
682  SWC_FS_FLUSH_START(hadoop_fd);
683 
684  auto fs = get_fs(err);
685  if(!err) {
686  auto hfile = hadoop_fd->file();
687  if(hfile) {
688  errno = 0;
689  auto res = hdfsHFlush(fs->srv, hfile);
690  hadoop_fd->use_release();
691  if(res == -1)
692  need_reconnect(err = errno, fs);
693  } else {
694  err = EBADR;
695  }
696  }
697  SWC_FS_FLUSH_FINISH(err, hadoop_fd, tracker);
698 }
699 
700 void FileSystemHadoopJVM::sync(int& err, SmartFd::Ptr& smartfd) {
701  auto tracker = statistics.tracker(Statistics::SYNC_SYNC);
702  auto hadoop_fd = get_fd(smartfd);
703  SWC_FS_SYNC_START(hadoop_fd);
704 
705  auto fs = get_fs(err);
706  if(!err) {
707  auto hfile = hadoop_fd->file();
708  if(hfile) {
709  errno = 0;
710  auto res = hdfsHSync(fs->srv, hfile);
711  hadoop_fd->use_release();
712  if(res == -1)
713  need_reconnect(err = errno, fs);
714  } else {
715  err = EBADR;
716  }
717  }
718  SWC_FS_SYNC_FINISH(err, hadoop_fd, tracker);
719 }
720 
721 void FileSystemHadoopJVM::close(int& err, SmartFd::Ptr& smartfd) {
722  auto tracker = statistics.tracker(Statistics::CLOSE_SYNC);
723  auto hadoop_fd = get_fd(smartfd);
724  SWC_FS_CLOSE_START(hadoop_fd);
725 
726  auto fs = get_fs(err);
727  if(!err) {
728  auto hfile = hadoop_fd->invalidate();
729  if(hfile) {
730  fd_open_decr();
731  if(hadoop_fd->file_used()) {
732  SWC_LOGF(LOG_WARN, "close '%s' waiting for file-use-released",
733  hadoop_fd->to_string().c_str());
734  do std::this_thread::sleep_for(std::chrono::microseconds(1));
735  while(hadoop_fd->file_used());
736  hadoop_fd->pos(0);
737  SWC_LOGF(LOG_WARN, "close '%s' waited for file-use-released",
738  hadoop_fd->to_string().c_str());
739  }
740  errno = 0;
741  if(hdfsCloseFile(fs->srv, hfile) == -1)
742  err = errno == 255 ? Error::FS_BAD_FILE_HANDLE : errno;
743  } else {
744  err = EBADR;
745  }
746  }
747  SWC_FS_CLOSE_FINISH(err, hadoop_fd, tracker);
748 }
749 
750 
751 
752 
753 }} // namespace SWC
754 
755 
756 
757 extern "C" {
759  return static_cast<SWC::FS::FileSystem*>(
760  new SWC::FS::FileSystemHadoopJVM(config));
761 }
762 }
SWC::FS::FileSystemHadoopJVM::SmartFdHadoopJVM::~SmartFdHadoopJVM
virtual ~SmartFdHadoopJVM() noexcept
Definition: FileSystem.cc:78
SWC_FS_FLUSH_FINISH
#define SWC_FS_FLUSH_FINISH(_error, _smartfd, _tracker)
Definition: Logger.h:246
SWC_FS_APPEND_START
#define SWC_FS_APPEND_START(_smartfd, _amount, _flags)
Definition: Logger.h:217
SWC::FS::Statistics::FLUSH_SYNC
@ FLUSH_SYNC
Definition: Statistics.h:42
SWC::FS::FileSystemHadoopJVM::Service
Definition: FileSystem.h:24
SWC::FS::FileSystemHadoopJVM::need_reconnect
void need_reconnect(int &err, Service::Ptr &fs)
Definition: FileSystem.cc:326
SWC::FS::FileSystemHadoopJVM::get_type
Type SWC_CONST_FUNC get_type() const noexcept override
Definition: FileSystem.cc:150
SWC::FS::FileSystemHadoopJVM::FileSystemHadoopJVM
FileSystemHadoopJVM(Configurables *config)
Definition: FileSystem.cc:130
SWC::Config::g_i32
Property::Value_int32_g::Ptr g_i32(const int32_t &v)
Definition: PropertiesParser.cc:77
SWC::FS::Statistics::REMOVE_SYNC
@ REMOVE_SYNC
Definition: Statistics.h:34
SWC::FS::FileSystemHadoopJVM::length
size_t length(int &err, const std::string &name) override
Definition: FileSystem.cc:371
SWC::FS::Statistics::READDIR_SYNC
@ READDIR_SYNC
Definition: Statistics.h:46
SWC::FS::FileSystemHadoopJVM::~FileSystemHadoopJVM
virtual ~FileSystemHadoopJVM() noexcept
Definition: FileSystem.cc:148
SWC::Error::SERVER_SHUTTING_DOWN
@ SERVER_SHUTTING_DOWN
Definition: Error.h:84
SWC_LOGF
#define SWC_LOGF(priority, fmt,...)
Definition: Logger.h:188
SWC::Core::UniqueLock
Definition: MutexLock.h:68
SWC::FS::FileSystemHadoopJVM::sync
void sync(int &err, SmartFd::Ptr &smartfd) override
Definition: FileSystem.cc:700
SWC::FS::Type
Type
Definition: FileSystem.h:20
SWC::FS::Statistics::CLOSE_SYNC
@ CLOSE_SYNC
Definition: Statistics.h:26
SWC::FS::Statistics::RENAME_SYNC
@ RENAME_SYNC
Definition: Statistics.h:50
SWC::FS::Statistics::RMDIR_SYNC
@ RMDIR_SYNC
Definition: Statistics.h:44
SWC_FS_FLUSH_START
#define SWC_FS_FLUSH_START(_smartfd)
Definition: Logger.h:241
SWC::FS::FileSystemHadoopJVM::SmartFdHadoopJVM::SmartFdHadoopJVM
SmartFdHadoopJVM(const std::string &filepath, uint32_t flags, int32_t fd=-1, uint64_t pos=0)
Definition: FileSystem.cc:73
SWC::Core::ScopedLock
Definition: MutexLock.h:41
SWC::Error::FS_PERMISSION_DENIED
@ FS_PERMISSION_DENIED
Definition: Error.h:95
SWC::Error::get_text
const char * get_text(const int err) noexcept
Definition: Error.cc:173
FileSystem.h
SWC::FS::FileSystemHadoopJVM::cfg_r_buffer_size
const Config::Property::Value_int32_g::Ptr cfg_r_buffer_size
Definition: FileSystem.h:162
SWC::FS::FileSystemHadoopJVM::SmartFdHadoopJVM::file
hdfsFile file() noexcept
Definition: FileSystem.cc:80
SWC_FS_RMDIR_FINISH
#define SWC_FS_RMDIR_FINISH(_error, _path, _tracker)
Definition: Logger.h:96
SWC::LOG_INFO
@ LOG_INFO
Definition: Logger.h:35
SWC::FS::FileSystemHadoopJVM::close
void close(int &err, SmartFd::Ptr &smartfd) override
Definition: FileSystem.cc:721
SWC_FS_RENAME_FINISH
#define SWC_FS_RENAME_FINISH(_error, _from, _to, _tracker)
Definition: Logger.h:108
SWC::FS::Flags
Flags
Definition: FileSystem.h:41
SWC::FS::Statistics::tracker
SWC_CAN_INLINE Metric::Tracker tracker(Command cmd) noexcept
Definition: Statistics.h:119
SWC::FS::FileSystemHadoopJVM::SmartFdHadoopJVM::invalidate
hdfsFile invalidate() noexcept
Definition: FileSystem.cc:110
SWC::Error::FS_PATH_NOT_FOUND
@ FS_PATH_NOT_FOUND
Definition: Error.h:97
SWC::FS::FileSystem
Definition: FileSystem.h:101
SWC::FS::FileSystemHadoopJVM::pread
size_t pread(int &err, SmartFd::Ptr &smartfd, uint64_t offset, void *dst, size_t amount) override
Definition: FileSystem.cc:583
SWC::FS::Statistics::SYNC_SYNC
@ SYNC_SYNC
Definition: Statistics.h:52
SWC::Config::boo
Property::Value_bool::Ptr boo(const bool &v)
Definition: PropertiesParser.cc:21
SWC::FS::FileSystemHadoopJVM::readdir
void readdir(int &err, const std::string &name, DirentList &results) override
Definition: FileSystem.cc:408
SWC::Config::Property::Value_int32_g::value
Core::Atomic< int32_t > value
Definition: Property.h:618
SWC::Config::Property::Value_int32_g::get
SWC_CAN_INLINE int32_t get() const noexcept
Definition: Property.h:610
SWC::FS::FileSystemHadoopJVM::exists
bool exists(int &err, const std::string &name) override
Definition: FileSystem.cc:337
SWC::FS::Statistics::SEEK_SYNC
@ SEEK_SYNC
Definition: Statistics.h:32
SWC::FS::Statistics::CREATE_SYNC
@ CREATE_SYNC
Definition: Statistics.h:24
SWC_FS_PREAD_START
#define SWC_FS_PREAD_START(_smartfd, _offset, _amount)
Definition: Logger.h:185
SWC::FS::FileSystemHadoopJVM::SmartFdHadoopJVM::use_release
void use_release() noexcept
Definition: FileSystem.cc:102
SWC_FS_MKDIRS_FINISH
#define SWC_FS_MKDIRS_FINISH(_error, _path, _tracker)
Definition: Logger.h:72
SWC::FS::Configurables
Definition: FileSystem.h:48
SWC::FS::Statistics::MKDIRS_SYNC
@ MKDIRS_SYNC
Definition: Statistics.h:40
SWC::FS::FileSystemHadoopJVM::cfg_w_buffer_size
const Config::Property::Value_int32_g::Ptr cfg_w_buffer_size
Definition: FileSystem.h:163
SWC::FS::Statistics::EXISTS_SYNC
@ EXISTS_SYNC
Definition: Statistics.h:48
SWC::Config::i16
Property::Value_uint16::Ptr i16(const uint16_t &v)
Definition: PropertiesParser.cc:29
SWC_FS_READDIR_FINISH
#define SWC_FS_READDIR_FINISH(_error, _path, _sz, _tracker)
Definition: Logger.h:84
SWC_FS_CLOSE_START
#define SWC_FS_CLOSE_START(_smartfd)
Definition: Logger.h:263
SWC::FS::ImplOptions
Definition: FileSystem.h:66
SWC_FS_OPEN_FINISH
#define SWC_FS_OPEN_FINISH(_error, _smartfd, _open_fds, _tracker)
Definition: Logger.h:149
SWC::FS::FileSystemHadoopJVM::SmartFdHadoopJVM::Ptr
std::shared_ptr< SmartFdHadoopJVM > Ptr
Definition: FileSystem.h:130
SWC::Core::Atomic::add_rslt
constexpr SWC_CAN_INLINE T add_rslt(T v) noexcept
Definition: Atomic.h:120
SWC::FS::FileSystemHadoopJVM::cfg_block_size
const Config::Property::Value_int32_g::Ptr cfg_block_size
Definition: FileSystem.h:164
SWC::FS::Configurables::path_root
std::string path_root
Definition: FileSystem.h:51
SWC::Core::AtomicBase::store
constexpr SWC_CAN_INLINE void store(T v) noexcept
Definition: Atomic.h:37
SWC::Error::OK
@ OK
Definition: Error.h:45
SWC::FS::Statistics::READ_SYNC
@ READ_SYNC
Definition: Statistics.h:28
SWC::FS::FileSystemHadoopJVM::mkdirs
void mkdirs(int &err, const std::string &name) override
Definition: FileSystem.cc:393
SWC::LOG_DEBUG
@ LOG_DEBUG
Definition: Logger.h:36
SWC_FS_RENAME_START
#define SWC_FS_RENAME_START(_from, _to)
Definition: Logger.h:103
SWC::Error::FS_EOF
@ FS_EOF
Definition: Error.h:96
SWC::FS::FileSystemHadoopJVM::open
void open(int &err, SmartFd::Ptr &smartfd) override
Definition: FileSystem.cc:519
SWC_FS_MKDIRS_START
#define SWC_FS_MKDIRS_START(_path)
Definition: Logger.h:67
SWC_FS_LENGTH_FINISH
#define SWC_FS_LENGTH_FINISH(_error, _path, _len, _tracker)
Definition: Logger.h:60
SWC_FS_CREATE_FINISH
#define SWC_FS_CREATE_FINISH(_error, _smartfd, _open_fds, _tracker)
Definition: Logger.h:137
SWC_FS_SYNC_START
#define SWC_FS_SYNC_START(_smartfd)
Definition: Logger.h:252
SWC::Core::Buffer::base
value_type * base
Definition: Buffer.h:131
SWC
The SWC-DB C++ namespace 'SWC'.
Definition: main.cc:12
SWC_FS_APPEND_FINISH
#define SWC_FS_APPEND_FINISH(_error, _smartfd, _amount, _tracker)
Definition: Logger.h:223
SWC::FS::OPEN_FLAG_OVERWRITE
@ OPEN_FLAG_OVERWRITE
Definition: FileSystem.h:35
SWC::FS::FileSystemHadoopJVM::cfg_use_delay
const Config::Property::Value_int32_g::Ptr cfg_use_delay
Definition: FileSystem.h:161
SWC::FS::Statistics::LENGTH_SYNC
@ LENGTH_SYNC
Definition: Statistics.h:36
SWC::FS::apply_hadoop_jvm
Configurables * apply_hadoop_jvm(Configurables *config)
Definition: FileSystem.cc:13
SWC::FS::FileSystemHadoopJVM::remove
void remove(int &err, const std::string &name) override
Definition: FileSystem.cc:354
SWC::FS::Configurables::settings
Config::Settings::Ptr settings
Definition: FileSystem.h:49
SWC::Core::Buffer
Definition: Buffer.h:18
SWC::Error::FS_BAD_FILE_HANDLE
@ FS_BAD_FILE_HANDLE
Definition: Error.h:94
SWC::Core::Buffer::size
size_t size
Definition: Buffer.h:130
SWC_FS_CREATE_START
#define SWC_FS_CREATE_START(_smartfd, _replication)
Definition: Logger.h:131
SWC::Config::strs
Property::Value_strings::Ptr strs(Strings &&v)
Definition: PropertiesParser.cc:49
SWC::FS::Configurables::cfg_fds_max
Config::Property::Value_int32_g::Ptr cfg_fds_max
Definition: FileSystem.h:50
SWC::FS::FileSystemHadoopJVM::rename
void rename(int &err, const std::string &from, const std::string &to) override
Definition: FileSystem.cc:463
SWC::FS::HADOOP_JVM
@ HADOOP_JVM
Definition: FileSystem.h:27
SWC::FS::FileSystemHadoopJVM::setup_connection
Service::Ptr setup_connection()
Definition: FileSystem.cc:173
SWC_FS_OPEN_START
#define SWC_FS_OPEN_START(_smartfd)
Definition: Logger.h:144
SWC::FS::FileSystemHadoopJVM::append
size_t append(int &err, SmartFd::Ptr &smartfd, StaticBuffer &buffer, Flags flags) override
Definition: FileSystem.cc:614
SWC::Config::str
Property::Value_string::Ptr str(std::string &&v)
Definition: PropertiesParser.cc:45
SWC_FS_READ_FINISH
#define SWC_FS_READ_FINISH(_error, _smartfd, _amount, _tracker)
Definition: Logger.h:175
SWC::FS::FileSystemHadoopJVM::read
void read(int &err, const std::string &name, StaticBuffer *dst) override
Definition: FileSystem.h:84
SWC::FS::Statistics::APPEND_SYNC
@ APPEND_SYNC
Definition: Statistics.h:30
SWC::FS::Statistics::OPEN_SYNC
@ OPEN_SYNC
Definition: Statistics.h:22
SWC::FS::FileSystemHadoopJVM::stop
void stop() override
Definition: FileSystem.cc:162
SWC::FS::FileSystemHadoopJVM::create
void create(int &err, SmartFd::Ptr &smartfd, uint8_t replication) override
Definition: FileSystem.cc:481
SWC::FS::SmartFd::flags
constexpr SWC_CAN_INLINE uint32_t flags() const noexcept
Definition: SmartFd.h:77
SWC::LOG_ERROR
@ LOG_ERROR
Definition: Logger.h:32
SWC_FMT_LU
#define SWC_FMT_LU
Definition: Compat.h:98
SWC::FS::FileSystem::fd_open_incr
void fd_open_incr() noexcept
Definition: FileSystem.cc:155
SWC::FS::SmartFd
Smart FileDescriptor.
Definition: SmartFd.h:34
SWC::FS::FileSystemHadoopJVM::m_fs
Service::Ptr m_fs
Definition: FileSystem.h:170
SWC::FS::FileSystem::settings
const Config::Settings::Ptr settings
Definition: FileSystem.h:110
SWC::FS::FileSystemHadoopJVM
Definition: FileSystem.h:21
SWC::format
std::string format(const char *fmt,...) __attribute__((format(printf
Definition: String.cc:17
SWC::Core::Vector< Dirent >
SWC::FS::SmartFd::filepath
constexpr SWC_CAN_INLINE const std::string & filepath() const noexcept
Definition: SmartFd.h:67
SWC::FS::Statistics::PREAD_SYNC
@ PREAD_SYNC
Definition: Statistics.h:38
SWC::Condition::from
Comp from(const char **buf, uint32_t *remainp, uint8_t extended=0x00) noexcept
Definition: Comparators.cc:15
SWC::FS::SmartFd::Ptr
std::shared_ptr< SmartFd > Ptr
Definition: SmartFd.h:37
SWC_FS_READDIR_START
#define SWC_FS_READDIR_START(_path)
Definition: Logger.h:79
SWC::FS::FileSystemHadoopJVM::Service::Ptr
std::shared_ptr< Service > Ptr
Definition: FileSystem.h:25
SWC::FS::FileSystem::m_run
Core::AtomicBool m_run
Definition: FileSystem.h:113
SWC::FS::FileSystem::stop
virtual void stop()
Definition: FileSystem.cc:115
SWC_FS_PREAD_FINISH
#define SWC_FS_PREAD_FINISH(_error, _smartfd, _amount, _tracker)
Definition: Logger.h:191
SWC::FS::FileSystemHadoopJVM::SmartFdHadoopJVM::file_used
bool file_used() const noexcept
Definition: FileSystem.cc:106
SWC::FS::FileSystemHadoopJVM::m_mutex
std::mutex m_mutex
Definition: FileSystem.h:167
SWC::FS::FileSystem::fd_open_decr
void fd_open_decr() noexcept
Definition: FileSystem.cc:159
SWC::FS::FileSystemHadoopJVM::SmartFdHadoopJVM::make_ptr
static Ptr make_ptr(const std::string &filepath, uint32_t flags)
Definition: FileSystem.cc:60
SWC::FS::Configurables::stats_enabled
bool stats_enabled
Definition: FileSystem.h:52
SWC::FS::FLUSH
@ FLUSH
Definition: FileSystem.h:43
SWC::FS::FileSystem::path_root
const std::string path_root
Definition: FileSystem.h:107
SWC_FS_READ_START
#define SWC_FS_READ_START(_smartfd, _amount)
Definition: Logger.h:170
SWC_FS_SEEK_FINISH
#define SWC_FS_SEEK_FINISH(_error, _smartfd, _tracker)
Definition: Logger.h:235
SWC::Common::Files::Schema::filepath
std::string filepath(cid_t cid)
Definition: Schema.h:34
SWC::FS::FileSystem::fds_open
size_t fds_open() const noexcept
Definition: FileSystem.cc:167
SWC::FS::FileSystemHadoopJVM::m_connecting
bool m_connecting
Definition: FileSystem.h:169
SWC::FS::FileSystem::path_data
const std::string path_data
Definition: FileSystem.h:108
SWC::FS::FileSystemHadoopJVM::m_cv
std::condition_variable m_cv
Definition: FileSystem.h:168
SWC_FS_EXISTS_START
#define SWC_FS_EXISTS_START(_path)
Definition: Logger.h:31
SWC::LOG_WARN
@ LOG_WARN
Definition: Logger.h:33
SWC::FS::FileSystemHadoopJVM::flush
void flush(int &err, SmartFd::Ptr &smartfd) override
Definition: FileSystem.cc:679
SWC_FS_RMDIR_START
#define SWC_FS_RMDIR_START(_path)
Definition: Logger.h:91
SWC::FS::FileSystemHadoopJVM::to_string
std::string to_string() const override
Definition: FileSystem.cc:154
flags
uint8_t flags
Flags.
Definition: Header.h:55
SWC::FS::SYNC
@ SYNC
Definition: FileSystem.h:44
SWC_FS_SEEK_START
#define SWC_FS_SEEK_START(_smartfd, _offset)
Definition: Logger.h:230
SWC_FS_LENGTH_START
#define SWC_FS_LENGTH_START(_path)
Definition: Logger.h:55
SWC_FS_CLOSE_FINISH
#define SWC_FS_CLOSE_FINISH(_error, _smartfd, _tracker)
Definition: Logger.h:268
SWC_FS_REMOVE_FINISH
#define SWC_FS_REMOVE_FINISH(_error, _path, _tracker)
Definition: Logger.h:48
SWC::FS::FileSystem::get_abspath
virtual void get_abspath(const std::string &name, std::string &abspath, size_t reserve=0)
Definition: FileSystem.cc:138
SWC_FMT_LD
#define SWC_FMT_LD
Definition: Compat.h:99
SWC::FS::FileSystemHadoopJVM::get_fs
Service::Ptr get_fs(int &err)
Definition: FileSystem.cc:297
SWC::FS::FileSystemHadoopJVM::SmartFdHadoopJVM
Definition: FileSystem.h:127
SWC::Core::Vector::size
constexpr SWC_CAN_INLINE size_type size() const noexcept
Definition: Vector.h:189
SWC::Core::Vector::emplace_back
SWC_CAN_INLINE reference emplace_back(ArgsT &&... args)
Definition: Vector.h:349
SWC::FS::FileSystem::statistics
Statistics statistics
Definition: FileSystem.h:114
SWC_FS_SYNC_FINISH
#define SWC_FS_SYNC_FINISH(_error, _smartfd, _tracker)
Definition: Logger.h:257
SWC_FS_REMOVE_START
#define SWC_FS_REMOVE_START(_path)
Definition: Logger.h:43
SWC::FS::FileSystemHadoopJVM::seek
void seek(int &err, SmartFd::Ptr &smartfd, size_t offset) override
Definition: FileSystem.cc:655
SWC::FS::FileSystemHadoopJVM::get_fd
SmartFdHadoopJVM::Ptr get_fd(SmartFd::Ptr &smartfd)
Definition: FileSystem.cc:120
SWC::Config::Property::Value_int32_g
Definition: Property.h:586
SWC::Core::Vector::reserve
SWC_CAN_INLINE void reserve(size_type cap)
Definition: Vector.h:288
fs_make_new_hadoop_jvm
SWC::FS::FileSystem * fs_make_new_hadoop_jvm(SWC::FS::Configurables *config)
Definition: FileSystem.cc:758
SWC::FS::FileSystemHadoopJVM::m_nxt_fd
Core::Atomic< int32_t > m_nxt_fd
Definition: FileSystem.h:166
SWC::FS::FileSystemHadoopJVM::initialize
bool initialize(Service::Ptr &fs)
Definition: FileSystem.cc:224
SWC_FS_EXISTS_FINISH
#define SWC_FS_EXISTS_FINISH(_error, _path, _state, _tracker)
Definition: Logger.h:36
SWC::Error::SERVER_NOT_READY
@ SERVER_NOT_READY
Definition: Error.h:83
SWC::FS::FileSystemHadoopJVM::rmdir
void rmdir(int &err, const std::string &name) override
Definition: FileSystem.cc:446