10 namespace SWC {
namespace FS {
14 config->
settings->file_desc.add_options()
16 "HadoopJVM FileSystem's base root path")
21 "Namenode Host + optional(:Port), muliple")
27 (
"swc.fs.hadoop_jvm.metrics.enabled",
Config::boo(
true),
28 "Enable or Disable Metrics Tracking")
31 "Max Open Fds for opt. without closing")
33 "In ms delay use of connection after re-connect")
36 "Size of read buffer in bytes")
38 "Size of write buffer in bytes")
40 "Size of block in aligned 512 bytes")
44 config->
settings->get_str(
"swc.fs.hadoop_jvm.cfg",
""),
45 "swc.fs.hadoop_jvm.cfg.dyn"
49 "swc.fs.hadoop_jvm.path.root");
53 "swc.fs.hadoop_jvm.metrics.enabled");
68 smart_fd->filepath(), smart_fd->flags(),
69 smart_fd->fd(), smart_fd->pos()
74 const std::string&
filepath, uint32_t
flags, int32_t fd, uint64_t pos)
81 auto c = m_use_count.fetch_add(1);
82 auto hfile = m_hfile.load();
84 m_use_count.fetch_sub(1);
89 do std::this_thread::sleep_for(std::chrono::microseconds(1));
90 while(m_use_count > 1);
103 m_use_count.fetch_sub(1);
111 auto hfile = m_hfile.exchange(
nullptr);
121 auto hd_fd = std::dynamic_pointer_cast<SmartFdHadoopJVM>(smartfd);
124 smartfd = std::static_pointer_cast<SmartFd>(hd_fd);
133 settings->get<Config::Property::Value_int32_g>(
134 "swc.fs.hadoop_jvm.reconnect.delay.ms")),
136 settings->get<Config::Property::Value_int32_g>(
137 "swc.fs.hadoop_jvm.read.buffer.size")),
139 settings->get<Config::Property::Value_int32_g>(
140 "swc.fs.hadoop_jvm.write.buffer.size")),
142 settings->get<Config::Property::Value_int32_g>(
143 "swc.fs.hadoop_jvm.block.size")),
156 "(type=HADOOP_JVM path_root=%s path_data=%s)",
178 "FS-HadoopJVM, unable to initialize connection to hadoop_jvm, try=%u",
180 std::this_thread::sleep_for(std::chrono::milliseconds(1000));
187 hdfsSetWorkingDirectory(fs->srv, abspath.c_str());
193 SWC_LOGF(
LOG_WARN,
"FS-HadoopJVM, block-size=%d is not 512-aligned changing to=%d",
198 hdfsConfGetInt(
"dfs.namenode.fs-limits.min-block-size", &value);
219 hdfsGetWorkingDirectory(fs->srv, buffer, 256);
226 hdfsFS connection =
nullptr;
227 if (
settings->has(
"swc.fs.hadoop_jvm.namenode")) {
228 for(
auto& h :
settings->get_strs(
"swc.fs.hadoop_jvm.namenode")) {
229 hdfsBuilder* bld = hdfsNewBuilder();
230 hdfsBuilderSetNameNode(bld, h.c_str());
232 if (
settings->has(
"swc.fs.hadoop_jvm.namenode.port"))
233 hdfsBuilderSetNameNodePort(
234 bld,
settings->get_i16(
"swc.fs.hadoop_jvm.namenode.port"));
236 if (
settings->has(
"swc.fs.hadoop_jvm.user"))
237 hdfsBuilderSetUserName(
239 settings->get_str(
"swc.fs.hadoop_jvm.user").c_str()
243 connection = hdfsBuilderConnect(bld);
250 int64_t sz_used = hdfsGetUsed(connection);
253 hdfsDisconnect(connection);
254 connection =
nullptr;
262 sz_used = hdfsGetCapacity(connection);
265 hdfsDisconnect(connection);
266 connection =
nullptr;
282 connection = hdfsConnect(
"default", 0);
285 hdfsConfGetStr(
"fs.defaultFS", &value);
287 "FS-HadoopJVM, connecting to default namenode=%s", value);
290 fs.reset(
new Service(connection));
299 bool connect =
false;
310 std::this_thread::sleep_for(
347 state = !hdfsExists(fs->srv, abspath.c_str());
363 if(hdfsDelete(fs->srv, abspath.c_str(),
false) == -1) {
364 err = errno == EIO || errno == ENOENT ?
Error::OK: errno;
376 hdfsFileInfo *fileInfo;
382 if(!(fileInfo = hdfsGetPathInfo(fs->srv, abspath.c_str()))) {
385 len = fileInfo->mSize;
386 hdfsFreeFileInfo(fileInfo, 1);
402 if(hdfsCreateDirectory(fs->srv, abspath.c_str()) == -1)
414 hdfsFileInfo *fileInfo;
420 if (!(fileInfo = hdfsListDirectory(
421 fs->srv, abspath.c_str(), &numEntries))) {
427 for(
int i=0; i < numEntries; ++i) {
428 if(fileInfo[i].mName[0] ==
'.' || !fileInfo[i].mName[0])
430 const char *ptr = strrchr(fileInfo[i].mName,
'/');
432 ptr ? ++ptr : fileInfo[i].mName,
433 fileInfo[i].mLastMod,
434 fileInfo[i].mKind == kObjectKindDirectory,
439 hdfsFreeFileInfo(fileInfo, numEntries);
455 if(hdfsDelete(fs->srv, abspath.c_str(),
true) == -1) {
464 const std::string& to) {
467 std::string abspath_from;
469 std::string abspath_to;
475 if(hdfsRename(fs->srv, abspath_from.c_str(), abspath_to.c_str()) == -1)
482 uint8_t replication) {
488 int oflags = O_WRONLY;
494 if(!(tmperr = err)) {
495 auto hadoop_fd =
get_fd(smartfd);
498 auto hfile = hdfsOpenFile(
499 fs->srv, abspath.c_str(), oflags,
504 if(tmperr == EACCES || tmperr == ENOENT)
506 else if (tmperr == EPERM)
511 hadoop_fd->file(hfile);
524 int oflags = O_RDONLY;
528 if(!(tmperr = err)) {
529 auto hadoop_fd =
get_fd(smartfd);
532 auto hfile = hdfsOpenFile(
533 fs->srv, abspath.c_str(), oflags,
538 if(tmperr == EACCES || tmperr == ENOENT)
540 else if (tmperr == EPERM)
545 hadoop_fd->file(hfile);
554 void *dst,
size_t amount) {
556 auto hadoop_fd =
get_fd(smartfd);
562 auto hfile = hadoop_fd->file();
565 ssize_t nread = hdfsRead(fs->srv, hfile, dst, tSize(amount));
566 hadoop_fd->use_release();
571 if((ret = nread) != amount)
573 hadoop_fd->forward(nread);
584 uint64_t offset,
void *dst,
size_t amount) {
586 auto hadoop_fd =
get_fd(smartfd);
592 auto hfile = hadoop_fd->file();
595 ssize_t nread = hdfsPread(
596 fs->srv, hfile, tOffset(offset), dst, tSize(amount));
597 hadoop_fd->use_release();
602 if((ret = nread) != amount)
604 hadoop_fd->pos(offset + nread);
617 auto hadoop_fd =
get_fd(smartfd);
620 ssize_t nwritten = 0;
624 auto hfile = hadoop_fd->file();
627 nwritten = hdfsWrite(fs->srv, hfile, buffer.
base, tSize(buffer.
size));
628 if(nwritten == -1 || nwritten != ssize_t(buffer.
size)) {
629 hadoop_fd->use_release();
635 hadoop_fd->forward(nwritten);
638 auto res = hdfsFlush(fs->srv, hfile);
639 hadoop_fd->use_release();
644 hadoop_fd->use_release();
658 auto hadoop_fd =
get_fd(smartfd);
663 auto hfile = hadoop_fd->file();
666 auto res = hdfsSeek(fs->srv, hfile, tOffset(offset));
667 hadoop_fd->use_release();
671 hadoop_fd->pos(offset);
681 auto hadoop_fd =
get_fd(smartfd);
686 auto hfile = hadoop_fd->file();
689 auto res = hdfsHFlush(fs->srv, hfile);
690 hadoop_fd->use_release();
702 auto hadoop_fd =
get_fd(smartfd);
707 auto hfile = hadoop_fd->file();
710 auto res = hdfsHSync(fs->srv, hfile);
711 hadoop_fd->use_release();
723 auto hadoop_fd =
get_fd(smartfd);
728 auto hfile = hadoop_fd->invalidate();
731 if(hadoop_fd->file_used()) {
733 hadoop_fd->to_string().c_str());
734 do std::this_thread::sleep_for(std::chrono::microseconds(1));
735 while(hadoop_fd->file_used());
738 hadoop_fd->to_string().c_str());
741 if(hdfsCloseFile(fs->srv, hfile) == -1)