 |
SWC-DB
v0.5.12 C++ documentations
SWC-DB© (Super Wide Column Database) - High Performance Scalable Database (https://github.com/kashirin-alex/swc-db)
|
Go to the documentation of this file.
13 #include "hdfspp/config_parser.h"
16 namespace SWC {
namespace FS {
20 config->
settings->file_desc.add_options()
22 "Hadoop FileSystem's base root path")
27 "Namenode Host + optional(:Port), muliple")
33 (
"swc.fs.hadoop.concurrency.relative",
Config::boo(
true),
34 "Determined ratio by HW-Concurrency")
36 "Number or HW-Concurrency base of Handlers for hadoop tasks")
38 (
"swc.fs.hadoop.metrics.enabled",
Config::boo(
true),
39 "Enable or Disable Metrics Tracking")
42 "Max Open Fds for opt. without closing")
46 config->
settings->get_str(
"swc.fs.hadoop.cfg",
""),
47 "swc.fs.hadoop.cfg.dyn"
51 "swc.fs.hadoop.path.root");
55 "swc.fs.hadoop.metrics.enabled");
70 smart_fd->filepath(), smart_fd->flags(),
71 smart_fd->fd(), smart_fd->pos()
76 const std::string&
filepath, uint32_t
flags, int32_t fd, uint64_t pos)
94 auto hd_fd = std::dynamic_pointer_cast<SmartFdHadoop>(smartfd);
97 smartfd = std::static_pointer_cast<SmartFd>(hd_fd);
117 "(type=HADOOP path_root=%s path_data=%s)",
139 "FS-Hadoop, unable to initialize connection to hadoop, try=%u",
141 std::this_thread::sleep_for(std::chrono::milliseconds(1000));
169 hdfs::ConfigParser parser;
170 if(!parser.LoadDefaultResources())
171 SWC_LOG_FATAL(
"hdfs::ConfigParser could not load default resources.");
173 auto stats = parser.ValidateResources();
174 for(
auto& s : stats) {
180 hdfs::Options options;
181 if(!parser.get_options(options))
182 SWC_LOG_FATAL(
"hdfs::ConfigParser could not load Options object.");
184 hdfs::IoService* io_service = hdfs::IoService::New();
185 io_service->InitWorkers(
187 settings->get_bool(
"swc.fs.hadoop.concurrency.relative"),
188 settings->get_i32(
"swc.fs.hadoop.handlers")
192 hdfs::FileSystem* connection = hdfs::FileSystem::New(
194 settings->get_str(
"swc.fs.hadoop.user",
""),
199 hdfs::FsInfo fs_info;
202 if(
settings->has(
"swc.fs.hadoop.namenode")) {
203 for(
auto& h :
settings->get_strs(
"swc.fs.hadoop.namenode")) {
205 "swc.fs.hadoop.namenode.port", 0));
208 h.c_str(), port.c_str());
210 status = connection->Connect(h, port);
212 status = connection->GetFsStats(fs_info);
216 SWC_PRINT <<
"FS-Hadoop, Could not connect to " << h <<
":" << port
221 SWC_LOGF(
LOG_DEBUG,
"FS-Hadoop, connecting to default namenode=%s", options.defaultFS.get_path().c_str());
222 status = connection->ConnectToDefaultFs();
224 status = connection->GetFsStats(fs_info);
226 SWC_PRINT <<
"FS-Hadoop, Could not connect to " << options.defaultFS
232 fs.reset(
new Service(connection));
242 bool connect =
false;
304 err = (errno == EIO || errno == ENOENT ?
Error::OK: errno);
398 err = errno == EIO ? ENOENT: errno;
406 const std::string& to) {
409 std::string abspath_from;
411 std::string abspath_to;
424 uint8_t replication) {
430 int oflags = O_WRONLY;
437 if(!(tmperr = err)) {
438 auto hadoop_fd =
get_fd(smartfd);
441 hdfs::FileHandle* fd =
nullptr;
449 if(tmperr == EACCES || tmperr == ENOENT)
451 else if (tmperr == EPERM)
469 int oflags = O_RDONLY;
473 if(!(tmperr = err)) {
474 auto hadoop_fd =
get_fd(smartfd);
477 hdfs::FileHandle* fd =
nullptr;
485 if(tmperr == EACCES || tmperr == ENOENT)
487 else if (tmperr == EPERM)
501 void *dst,
size_t amount) {
503 auto hadoop_fd =
get_fd(smartfd);
517 if((ret = nread) != amount)
519 hadoop_fd->forward(nread);
527 uint64_t offset,
void *dst,
size_t amount) {
529 auto hadoop_fd =
get_fd(smartfd);
543 if((ret = nread) != amount)
545 hadoop_fd->pos(offset + nread);
555 auto hadoop_fd =
get_fd(smartfd);
557 ssize_t nwritten = 0;
577 hadoop_fd->forward(nwritten);
593 auto hadoop_fd =
get_fd(smartfd);
602 hadoop_fd->pos(offset);
609 auto hadoop_fd =
get_fd(smartfd);
623 auto hadoop_fd =
get_fd(smartfd);
637 auto hadoop_fd =
get_fd(smartfd);
640 if(hadoop_fd->file()) {
647 hadoop_fd->file(
nullptr);
#define SWC_FS_FLUSH_FINISH(_error, _smartfd, _tracker)
Property::Value_int32::Ptr i32(const int32_t &v)
#define SWC_FS_APPEND_START(_smartfd, _amount, _flags)
void mkdirs(int &err, const std::string &name) override
Property::Value_int32_g::Ptr g_i32(const int32_t &v)
SmartFdHadoop::Ptr get_fd(SmartFd::Ptr &smartfd)
void close(int &err, SmartFd::Ptr &smartfd) override
void seek(int &err, SmartFd::Ptr &smartfd, size_t offset) override
#define SWC_LOGF(priority, fmt,...)
void rename(int &err, const std::string &from, const std::string &to) override
std::shared_ptr< SmartFdHadoop > Ptr
#define SWC_FS_FLUSH_START(_smartfd)
#define SWC_FS_RMDIR_FINISH(_error, _path, _tracker)
static Ptr make_ptr(const std::string &filepath, uint32_t flags)
#define SWC_FS_RENAME_FINISH(_error, _from, _to, _tracker)
virtual ~FileSystemHadoop() noexcept
SWC_CAN_INLINE Metric::Tracker tracker(Command cmd) noexcept
bool exists(int &err, const std::string &name) override
Property::Value_bool::Ptr boo(const bool &v)
Service::Ptr setup_connection()
size_t append(int &err, SmartFd::Ptr &smartfd, StaticBuffer &buffer, Flags flags) override
void need_reconnect(int &err, Service::Ptr &fs)
FileSystemHadoop(Configurables *config)
Type SWC_CONST_FUNC get_type() const noexcept override
#define SWC_FS_PREAD_START(_smartfd, _offset, _amount)
SWC::FS::FileSystem * fs_make_new_hadoop(SWC::FS::Configurables *config)
#define SWC_FS_MKDIRS_FINISH(_error, _path, _tracker)
std::shared_ptr< Service > Ptr
Property::Value_uint16::Ptr i16(const uint16_t &v)
SmartFdHadoop(const std::string &filepath, uint32_t flags, int32_t fd=-1, uint64_t pos=0)
#define SWC_FS_READDIR_FINISH(_error, _path, _sz, _tracker)
#define SWC_FS_CLOSE_START(_smartfd)
#define SWC_FS_OPEN_FINISH(_error, _smartfd, _open_fds, _tracker)
constexpr SWC_CAN_INLINE T add_rslt(T v) noexcept
void create(int &err, SmartFd::Ptr &smartfd, uint8_t replication) override
bool initialize(Service::Ptr &fs)
constexpr SWC_CAN_INLINE void store(T v) noexcept
static SWC_CAN_INLINE uint32_t get_number_of_threads(bool relative, int32_t size) noexcept
#define SWC_FS_RENAME_START(_from, _to)
#define SWC_FS_MKDIRS_START(_path)
#define SWC_FS_LENGTH_FINISH(_error, _path, _len, _tracker)
#define SWC_FS_CREATE_FINISH(_error, _smartfd, _open_fds, _tracker)
void read(int &err, const std::string &name, StaticBuffer *dst) override
#define SWC_FS_SYNC_START(_smartfd)
#define SWC_LOG_FATAL(msg)
The SWC-DB C++ namespace 'SWC'.
virtual ~SmartFdHadoop() noexcept
#define SWC_FS_APPEND_FINISH(_error, _smartfd, _amount, _tracker)
Core::Atomic< int32_t > m_nxt_fd
void remove(int &err, const std::string &name) override
void readdir(int &err, const std::string &name, DirentList &results) override
Config::Settings::Ptr settings
#define SWC_FS_CREATE_START(_smartfd, _replication)
Property::Value_strings::Ptr strs(Strings &&v)
Config::Property::Value_int32_g::Ptr cfg_fds_max
#define SWC_FS_OPEN_START(_smartfd)
Property::Value_string::Ptr str(std::string &&v)
#define SWC_FS_READ_FINISH(_error, _smartfd, _amount, _tracker)
void flush(int &err, SmartFd::Ptr &smartfd) override
constexpr SWC_CAN_INLINE uint32_t flags() const noexcept
void fd_open_incr() noexcept
void sync(int &err, SmartFd::Ptr &smartfd) override
const Config::Settings::Ptr settings
std::string format(const char *fmt,...) __attribute__((format(printf
constexpr SWC_CAN_INLINE const std::string & filepath() const noexcept
hdfs::FileHandle * file() const
Comp from(const char **buf, uint32_t *remainp, uint8_t extended=0x00) noexcept
std::shared_ptr< SmartFd > Ptr
#define SWC_FS_READDIR_START(_path)
#define SWC_FS_PREAD_FINISH(_error, _smartfd, _amount, _tracker)
std::string to_string() const override
void fd_open_decr() noexcept
Service::Ptr get_fs(int &err)
void open(int &err, SmartFd::Ptr &smartfd) override
const std::string path_root
#define SWC_FS_READ_START(_smartfd, _amount)
#define SWC_FS_SEEK_FINISH(_error, _smartfd, _tracker)
size_t pread(int &err, SmartFd::Ptr &smartfd, uint64_t offset, void *dst, size_t amount) override
std::string filepath(cid_t cid)
size_t fds_open() const noexcept
const std::string path_data
#define SWC_FS_EXISTS_START(_path)
#define SWC_FS_RMDIR_START(_path)
#define SWC_FS_SEEK_START(_smartfd, _offset)
#define SWC_FS_LENGTH_START(_path)
std::condition_variable m_cv
#define SWC_FS_CLOSE_FINISH(_error, _smartfd, _tracker)
#define SWC_FS_REMOVE_FINISH(_error, _path, _tracker)
virtual void get_abspath(const std::string &name, std::string &abspath, size_t reserve=0)
SWC_CAN_INLINE std::string to_string(const BitFieldInt< T, SZ > &v)
constexpr SWC_CAN_INLINE size_type size() const noexcept
#define SWC_FS_SYNC_FINISH(_error, _smartfd, _tracker)
size_t length(int &err, const std::string &name) override
#define SWC_FS_REMOVE_START(_path)
Configurables * apply_hadoop(Configurables *config)
void rmdir(int &err, const std::string &name) override
#define SWC_FS_EXISTS_FINISH(_error, _path, _state, _tracker)