SWC-DB  v0.5.12 C++ documentations
SWC-DB© (Super Wide Column Database) - High Performance Scalable Database (https://github.com/kashirin-alex/swc-db)
FileSystem.h
Go to the documentation of this file.
1 /*
2  * SWC-DB© Copyright since 2019 Alex Kashirin <kashirin.alex@gmail.com>
3  * License details at <https://github.com/kashirin-alex/swc-db/#license>
4  */
5 
6 #ifndef swcdb_fs_HadoopJVM_FileSystem_h
7 #define swcdb_fs_HadoopJVM_FileSystem_h
8 
9 
10 #include "swcdb/fs/FileSystem.h"
11 #include <hdfs.h>
12 #include <condition_variable>
13 
14 
15 namespace SWC { namespace FS {
16 
17 
18 Configurables* apply_hadoop_jvm(Configurables* config);
19 
20 
21 class FileSystemHadoopJVM final : public FileSystem {
22  public:
23 
24  struct Service {
25  typedef std::shared_ptr<Service> Ptr;
26 
27  Service(hdfsFS a_srv) noexcept : srv(a_srv) { }
28 
29  Service(Service&&) = delete;
30  Service(const Service&) = delete;
31  Service& operator=(Service&&) = delete;
32  Service& operator=(const Service&) = delete;
33 
34  ~Service() noexcept { if(srv) hdfsDisconnect(srv); }
35 
36  hdfsFS srv;
37  };
38 
40 
45 
46  virtual ~FileSystemHadoopJVM() noexcept;
47 
48  Type SWC_CONST_FUNC get_type() const noexcept override;
49 
50  std::string to_string() const override;
51 
52  void stop() override;
53 
55 
56  bool initialize(Service::Ptr& fs);
57 
58  Service::Ptr get_fs(int& err);
59 
60  void need_reconnect(int& err, Service::Ptr& fs);
61 
62 
63  bool exists(int& err, const std::string& name) override;
64 
65  void remove(int& err, const std::string& name) override;
66 
67  size_t length(int& err, const std::string& name) override;
68 
69  void mkdirs(int& err, const std::string& name) override;
70 
71  void readdir(int& err, const std::string& name,
72  DirentList& results) override;
73 
74  void rmdir(int& err, const std::string& name) override;
75 
76  void rename(int& err, const std::string& from,
77  const std::string& to) override;
78 
79  void write(int& err, SmartFd::Ptr& smartfd,
80  uint8_t replication, StaticBuffer& buffer) override {
81  default_write(err, smartfd, replication, buffer);
82  }
83 
84  void read(int& err, const std::string& name, StaticBuffer* dst) override {
85  default_read(err, name, dst);
86  }
87 
88  void combi_pread(int& err, SmartFd::Ptr& smartfd,
89  uint64_t offset, uint32_t amount,
90  StaticBuffer* dst) override {
91  default_combi_pread(err, smartfd, offset, amount, dst);
92  }
93 
94  void create(int& err, SmartFd::Ptr& smartfd, uint8_t replication) override;
95 
96  void open(int& err, SmartFd::Ptr& smartfd) override;
97 
98  size_t read(int& err, SmartFd::Ptr& smartfd,
99  void *dst, size_t amount) override;
100 
101  size_t read(int& err, SmartFd::Ptr& smartfd,
102  StaticBuffer* dst, size_t amount) override {
103  return default_read(err, smartfd, dst, amount);
104  }
105 
106  size_t pread(int& err, SmartFd::Ptr& smartfd,
107  uint64_t offset, void *dst, size_t amount) override;
108 
109  size_t pread(int& err, SmartFd::Ptr& smartfd, uint64_t offset,
110  StaticBuffer* dst, size_t amount) override {
111  return default_pread(err, smartfd, offset, dst, amount);
112  }
113 
114  size_t append(int& err, SmartFd::Ptr& smartfd,
115  StaticBuffer& buffer, Flags flags) override;
116 
117  void seek(int& err, SmartFd::Ptr& smartfd, size_t offset) override;
118 
119  void flush(int& err, SmartFd::Ptr& smartfd) override;
120 
121  void sync(int& err, SmartFd::Ptr& smartfd) override;
122 
123  void close(int& err, SmartFd::Ptr& smartfd) override;
124 
125  private:
126 
127  struct SmartFdHadoopJVM final : public SmartFd {
128  public:
129 
130  typedef std::shared_ptr<SmartFdHadoopJVM> Ptr;
131 
132  static Ptr make_ptr(const std::string& filepath, uint32_t flags);
133 
134  static Ptr make_ptr(SmartFd::Ptr& smart_fd);
135 
136  SmartFdHadoopJVM(const std::string& filepath, uint32_t flags,
137  int32_t fd=-1, uint64_t pos=0);
138 
139  virtual ~SmartFdHadoopJVM() noexcept;
140 
141  hdfsFile file() noexcept;
142 
143  void file(const hdfsFile& file) noexcept;
144 
145  void use_release() noexcept;
146 
147  bool file_used() const noexcept;
148 
149  hdfsFile invalidate() noexcept;
150 
151  private:
152 
153  std::atomic<hdfsFile> m_hfile;
154  Core::Atomic<size_t> m_use_count;
155 
156  };
157 
158  SmartFdHadoopJVM::Ptr get_fd(SmartFd::Ptr& smartfd);
159 
160 
161  const Config::Property::Value_int32_g::Ptr cfg_use_delay;
162  const Config::Property::Value_int32_g::Ptr cfg_r_buffer_size;
163  const Config::Property::Value_int32_g::Ptr cfg_w_buffer_size;
164  const Config::Property::Value_int32_g::Ptr cfg_block_size;
165 
166  Core::Atomic<int32_t> m_nxt_fd;
167  std::mutex m_mutex;
168  std::condition_variable m_cv;
171 
172 
173 };
174 
175 
176 }}
177 
178 
179 
180 extern "C" {
182 }
183 
184 #ifdef SWC_IMPL_SOURCE
186 #endif
187 
188 
189 #endif // swcdb_fs_HadoopJVM_FileSystem_h
SWC::FS::FileSystemHadoopJVM::SmartFdHadoopJVM::~SmartFdHadoopJVM
virtual ~SmartFdHadoopJVM() noexcept
Definition: FileSystem.cc:78
SWC::FS::FileSystemHadoopJVM::operator=
FileSystemHadoopJVM & operator=(const FileSystemHadoopJVM &)=delete
SWC::FS::FileSystemHadoopJVM::Service
Definition: FileSystem.h:24
SWC::FS::FileSystemHadoopJVM::need_reconnect
void need_reconnect(int &err, Service::Ptr &fs)
Definition: FileSystem.cc:326
SWC::FS::FileSystemHadoopJVM::get_type
Type SWC_CONST_FUNC get_type() const noexcept override
Definition: FileSystem.cc:150
SWC::FS::FileSystemHadoopJVM::FileSystemHadoopJVM
FileSystemHadoopJVM(Configurables *config)
Definition: FileSystem.cc:130
SWC::FS::FileSystemHadoopJVM::Service::operator=
Service & operator=(const Service &)=delete
SWC::FS::FileSystem::Ptr
std::shared_ptr< FileSystem > Ptr
Definition: FileSystem.h:104
SWC::FS::FileSystemHadoopJVM::Service::srv
hdfsFS srv
Definition: FileSystem.h:36
FileSystem.h
SWC::FS::FileSystemHadoopJVM::length
size_t length(int &err, const std::string &name) override
Definition: FileSystem.cc:371
SWC::FS::FileSystemHadoopJVM::~FileSystemHadoopJVM
virtual ~FileSystemHadoopJVM() noexcept
Definition: FileSystem.cc:148
SWC::FS::FileSystem::default_read
void default_read(int &err, const std::string &name, StaticBuffer *dst)
Definition: FileSystem.cc:257
SWC::FS::FileSystemHadoopJVM::SmartFdHadoopJVM::m_hfile
std::atomic< hdfsFile > m_hfile
Definition: FileSystem.h:153
SWC::FS::FileSystemHadoopJVM::write
void write(int &err, SmartFd::Ptr &smartfd, uint8_t replication, StaticBuffer &buffer) override
Definition: FileSystem.h:79
SWC::FS::FileSystemHadoopJVM::SmartFdHadoopJVM::m_use_count
Core::Atomic< size_t > m_use_count
Definition: FileSystem.h:154
SWC::FS::FileSystemHadoopJVM::sync
void sync(int &err, SmartFd::Ptr &smartfd) override
Definition: FileSystem.cc:700
SWC::FS::Type
Type
Definition: FileSystem.h:20
SWC::FS::FileSystemHadoopJVM::combi_pread
void combi_pread(int &err, SmartFd::Ptr &smartfd, uint64_t offset, uint32_t amount, StaticBuffer *dst) override
Definition: FileSystem.h:88
SWC::FS::SmartFd::pos
constexpr SWC_CAN_INLINE uint64_t pos() const noexcept
Definition: SmartFd.h:108
SWC::FS::FileSystemHadoopJVM::SmartFdHadoopJVM::SmartFdHadoopJVM
SmartFdHadoopJVM(const std::string &filepath, uint32_t flags, int32_t fd=-1, uint64_t pos=0)
Definition: FileSystem.cc:73
SWC::FS::FileSystemHadoopJVM::cfg_r_buffer_size
const Config::Property::Value_int32_g::Ptr cfg_r_buffer_size
Definition: FileSystem.h:162
SWC::FS::FileSystemHadoopJVM::SmartFdHadoopJVM::file
hdfsFile file() noexcept
Definition: FileSystem.cc:80
SWC::FS::FileSystemHadoopJVM::close
void close(int &err, SmartFd::Ptr &smartfd) override
Definition: FileSystem.cc:721
SWC::FS::Flags
Flags
Definition: FileSystem.h:41
SWC::FS::FileSystem::default_combi_pread
void default_combi_pread(int &err, SmartFd::Ptr &smartfd, uint64_t offset, uint32_t amount, StaticBuffer *dst)
Definition: FileSystem.cc:304
SWC::FS::FileSystemHadoopJVM::SmartFdHadoopJVM::invalidate
hdfsFile invalidate() noexcept
Definition: FileSystem.cc:110
SWC::FS::FileSystem
Definition: FileSystem.h:101
SWC::FS::FileSystemHadoopJVM::pread
size_t pread(int &err, SmartFd::Ptr &smartfd, uint64_t offset, void *dst, size_t amount) override
Definition: FileSystem.cc:583
SWC::FS::FileSystemHadoopJVM::Service::~Service
~Service() noexcept
Definition: FileSystem.h:34
SWC::FS::FileSystemHadoopJVM::readdir
void readdir(int &err, const std::string &name, DirentList &results) override
Definition: FileSystem.cc:408
SWC::FS::FileSystemHadoopJVM::exists
bool exists(int &err, const std::string &name) override
Definition: FileSystem.cc:337
SWC::FS::FileSystemHadoopJVM::Service::operator=
Service & operator=(Service &&)=delete
SWC::FS::FileSystemHadoopJVM::SmartFdHadoopJVM::use_release
void use_release() noexcept
Definition: FileSystem.cc:102
SWC::FS::Configurables
Definition: FileSystem.h:48
SWC::FS::FileSystemHadoopJVM::cfg_w_buffer_size
const Config::Property::Value_int32_g::Ptr cfg_w_buffer_size
Definition: FileSystem.h:163
SWC::FS::FileSystemHadoopJVM::SmartFdHadoopJVM::Ptr
std::shared_ptr< SmartFdHadoopJVM > Ptr
Definition: FileSystem.h:130
SWC::FS::FileSystemHadoopJVM::read
size_t read(int &err, SmartFd::Ptr &smartfd, StaticBuffer *dst, size_t amount) override
Definition: FileSystem.h:101
SWC::FS::FileSystemHadoopJVM::cfg_block_size
const Config::Property::Value_int32_g::Ptr cfg_block_size
Definition: FileSystem.h:164
SWC_CONST_FUNC
#define SWC_CONST_FUNC
Definition: Compat.h:107
SWC::FS::FileSystemHadoopJVM::mkdirs
void mkdirs(int &err, const std::string &name) override
Definition: FileSystem.cc:393
SWC::FS::FileSystemHadoopJVM::open
void open(int &err, SmartFd::Ptr &smartfd) override
Definition: FileSystem.cc:519
fs_make_new_hadoop_jvm
SWC::FS::FileSystem * fs_make_new_hadoop_jvm(SWC::FS::Configurables *config)
Definition: FileSystem.cc:758
SWC
The SWC-DB C++ namespace 'SWC'.
Definition: main.cc:12
SWC::FS::FileSystemHadoopJVM::cfg_use_delay
const Config::Property::Value_int32_g::Ptr cfg_use_delay
Definition: FileSystem.h:161
SWC::FS::apply_hadoop_jvm
Configurables * apply_hadoop_jvm(Configurables *config)
Definition: FileSystem.cc:13
SWC::FS::FileSystemHadoopJVM::remove
void remove(int &err, const std::string &name) override
Definition: FileSystem.cc:354
SWC::FS::FileSystemHadoopJVM::Service::Service
Service(hdfsFS a_srv) noexcept
Definition: FileSystem.h:27
SWC::Core::Buffer
Definition: Buffer.h:18
SWC::FS::FileSystemHadoopJVM::operator=
FileSystemHadoopJVM & operator=(FileSystemHadoopJVM &&)=delete
SWC::FS::FileSystemHadoopJVM::rename
void rename(int &err, const std::string &from, const std::string &to) override
Definition: FileSystem.cc:463
SWC::FS::FileSystemHadoopJVM::setup_connection
Service::Ptr setup_connection()
Definition: FileSystem.cc:173
SWC::FS::FileSystemHadoopJVM::append
size_t append(int &err, SmartFd::Ptr &smartfd, StaticBuffer &buffer, Flags flags) override
Definition: FileSystem.cc:614
SWC::FS::FileSystemHadoopJVM::read
void read(int &err, const std::string &name, StaticBuffer *dst) override
Definition: FileSystem.h:84
SWC::FS::FileSystemHadoopJVM::stop
void stop() override
Definition: FileSystem.cc:162
SWC::FS::FileSystemHadoopJVM::create
void create(int &err, SmartFd::Ptr &smartfd, uint8_t replication) override
Definition: FileSystem.cc:481
SWC::FS::SmartFd::flags
constexpr SWC_CAN_INLINE uint32_t flags() const noexcept
Definition: SmartFd.h:77
SWC::FS::SmartFd
Smart FileDescriptor.
Definition: SmartFd.h:34
SWC::FS::FileSystemHadoopJVM::m_fs
Service::Ptr m_fs
Definition: FileSystem.h:170
SWC::FS::FileSystemHadoopJVM::pread
size_t pread(int &err, SmartFd::Ptr &smartfd, uint64_t offset, StaticBuffer *dst, size_t amount) override
Definition: FileSystem.h:109
SWC::FS::FileSystemHadoopJVM
Definition: FileSystem.h:21
SWC::Core::Vector< Dirent >
SWC::FS::SmartFd::filepath
constexpr SWC_CAN_INLINE const std::string & filepath() const noexcept
Definition: SmartFd.h:67
SWC::FS::FileSystem::default_write
void default_write(int &err, SmartFd::Ptr &smartfd, uint8_t replication, StaticBuffer &buffer)
Definition: FileSystem.cc:226
SWC::Condition::from
Comp from(const char **buf, uint32_t *remainp, uint8_t extended=0x00) noexcept
Definition: Comparators.cc:15
SWC::FS::SmartFd::Ptr
std::shared_ptr< SmartFd > Ptr
Definition: SmartFd.h:37
SWC::FS::FileSystemHadoopJVM::Service::Ptr
std::shared_ptr< Service > Ptr
Definition: FileSystem.h:25
SWC::FS::FileSystemHadoopJVM::SmartFdHadoopJVM::file_used
bool file_used() const noexcept
Definition: FileSystem.cc:106
SWC::FS::FileSystemHadoopJVM::m_mutex
std::mutex m_mutex
Definition: FileSystem.h:167
SWC::FS::FileSystemHadoopJVM::SmartFdHadoopJVM::make_ptr
static Ptr make_ptr(const std::string &filepath, uint32_t flags)
Definition: FileSystem.cc:60
SWC::FS::FileSystemHadoopJVM::Service::Service
Service(Service &&)=delete
SWC::FS::FileSystemHadoopJVM::m_connecting
bool m_connecting
Definition: FileSystem.h:169
SWC::FS::FileSystemHadoopJVM::m_cv
std::condition_variable m_cv
Definition: FileSystem.h:168
SWC::FS::FileSystemHadoopJVM::flush
void flush(int &err, SmartFd::Ptr &smartfd) override
Definition: FileSystem.cc:679
SWC::FS::FileSystemHadoopJVM::to_string
std::string to_string() const override
Definition: FileSystem.cc:154
flags
uint8_t flags
Flags.
Definition: Header.h:55
SWC::FS::FileSystemHadoopJVM::FileSystemHadoopJVM
FileSystemHadoopJVM(FileSystemHadoopJVM &&)=delete
SWC::FS::SmartFd::fd
constexpr SWC_CAN_INLINE int32_t fd() const noexcept
Definition: SmartFd.h:87
SWC::FS::FileSystemHadoopJVM::get_fs
Service::Ptr get_fs(int &err)
Definition: FileSystem.cc:297
SWC::FS::FileSystemHadoopJVM::FileSystemHadoopJVM
FileSystemHadoopJVM(const FileSystemHadoopJVM &)=delete
SWC::FS::FileSystemHadoopJVM::SmartFdHadoopJVM
Definition: FileSystem.h:127
FileSystem.cc
SWC::FS::FileSystemHadoopJVM::seek
void seek(int &err, SmartFd::Ptr &smartfd, size_t offset) override
Definition: FileSystem.cc:655
SWC::FS::FileSystemHadoopJVM::get_fd
SmartFdHadoopJVM::Ptr get_fd(SmartFd::Ptr &smartfd)
Definition: FileSystem.cc:120
SWC::FS::FileSystem::default_pread
size_t default_pread(int &err, SmartFd::Ptr &smartfd, uint64_t offset, StaticBuffer *dst, size_t amount)
Definition: FileSystem.cc:365
SWC::FS::FileSystemHadoopJVM::m_nxt_fd
Core::Atomic< int32_t > m_nxt_fd
Definition: FileSystem.h:166
SWC::FS::FileSystemHadoopJVM::initialize
bool initialize(Service::Ptr &fs)
Definition: FileSystem.cc:224
SWC::FS::FileSystemHadoopJVM::Service::Service
Service(const Service &)=delete
SWC::FS::FileSystemHadoopJVM::rmdir
void rmdir(int &err, const std::string &name) override
Definition: FileSystem.cc:446