12 #if defined(SWC_FS_LOCAL_USE_IO_URING)
13 #include <asio/stream_file.hpp>
17 #if defined(__MINGW64__) || defined(_WIN32)
// Positional-read helper that appears under the __MINGW64__/_WIN32 guard:
// issues ReadFile() on the OS handle behind `fd`, passing the file offset
// through an OVERLAPPED record.
//   fd     - CRT file descriptor, converted with _get_osfhandle()
//   ptr    - destination buffer
//   n      - number of bytes requested
//   offset - absolute file offset to read from
24 ssize_t __pread(
int fd, uint8_t* ptr,
size_t n, off_t offset) noexcept {
25 long unsigned int read_bytes = 0;
26 OVERLAPPED overlapped;
27 memset(&overlapped, 0,
sizeof(OVERLAPPED));
// Split the offset into the high and low halves stored in OVERLAPPED.
// NOTE(review): both expressions shift by 32; if off_t is only 32 bits wide
// on this toolchain those shifts are undefined - confirm a 64-bit off_t.
28 overlapped.OffsetHigh = offset >> 32;
29 overlapped.Offset = ((offset << 32) >> 32);
31 if(!ReadFile(HANDLE(_get_osfhandle(fd)), ptr, n, &read_bytes, &overlapped)) {
// NOTE(review): GetLastError() yields a Win32 error code and it is stored in
// errno unchanged - confirm callers that compare against errno constants.
32 errno = GetLastError();
// Platform shims for mkdir / fsync / pread. Two alternative definition sets
// are visible. The first maps to ::_commit and the __pread helper, and its
// SWC_MKDIR drops `perms` (::mkdir is called with the path only).
38 #define SWC_MKDIR(path, perms) ::mkdir(path)
39 #define SWC_FSYNC(fd) ::_commit(fd)
40 #define SWC_PREAD(fd, ptr, nleft, offset) __pread(fd, ptr, nleft, offset)
// Second set: same macro signatures, forwarding directly to ::mkdir with
// permissions, ::fsync and ::pread.
43 #define SWC_MKDIR(path, perms) ::mkdir(path, perms)
44 #define SWC_FSYNC(fd) ::fsync(fd)
45 #define SWC_PREAD(fd, ptr, nleft, offset) ::pread(fd, ptr, nleft, offset)
50 namespace SWC {
namespace FS {
54 config->
settings->file_desc.add_options()
56 "Local FileSystem's base root path")
61 "Enable or Disable Metrics Tracking")
63 #
if defined(SWC_FS_LOCAL_USE_IO_URING)
65 "Handlers for async filesystem")
69 "Max Open Fds for opt. without closing")
72 config->
settings->get_str(
"swc.fs.local.cfg",
""),
73 "swc.fs.local.cfg.dyn"
77 "swc.fs.local.path.root");
81 "swc.fs.local.metrics.enabled");
90 #if defined(SWC_FS_LOCAL_USE_IO_URING)
94 m_directio(settings->get_bool(
95 "swc.fs.local.DirectIO", false))
96 #if defined(SWC_FS_LOCAL_USE_IO_URING)
98 m_io(new Comm::IoContext(
99 "FsLocal", settings->get_i32(
"swc.fs.local.handlers")))
106 #if defined(SWC_FS_LOCAL_USE_IO_URING)
119 "(type=LOCAL path_root=%s path_data=%s)",
134 bool state = !::stat(abspath.c_str(), &statbuf);
135 err = errno == ENOENT ?
Error::OK : errno;
146 err = ::unlink(abspath.c_str())==-1 && errno != ENOENT ? errno :
Error::OK;
158 size_t len = stat(abspath.c_str(), &statbuf) ? 0 : statbuf.st_size;
// Helper that scans `dirname` one character at a time and stat()s the path.
// Returns an int (presumably an error code - the return statements are not
// part of this excerpt). `dirname` is taken by non-const reference.
167 int _mkdirs(std::string& dirname) {
// The iterator is advanced inside the loop condition, before the first
// comparison, so the scan starts at the second character.
// NOTE(review): for an empty `dirname` that first ++it steps past end() -
// confirm callers never pass an empty path.
171 for(
auto it = dirname.begin(); ++it != dirname.cend(); ) {
// stat() returns non-zero when the path cannot be examined.
175 if(stat(dirname.c_str(), &statbuf)) {
196 err = _mkdirs(abspath);
207 #if defined(__MINGW64__) || defined(_WIN32)
208 const std::filesystem::path _dir_path(std::move(abspath));
211 for(
auto const& de : std::filesystem::directory_iterator(_dir_path, ec)) {
212 bool is_dir = de.is_directory(ec);
215 sz = is_dir ? 0 : de.file_size(ec);
219 if(ec.value() == ENOENT) {
227 de.path().filename().string().c_str(), 0, is_dir, sz);
233 std::string full_entry_path;
238 dirp = ::opendir(abspath.c_str());
244 for(
struct dirent* dep; (dep =
::readdir(dirp)); ) {
245 if(!dep->d_name[0] || dep->d_name[0] ==
'.')
247 full_entry_path.clear();
248 full_entry_path.reserve(abspath.length() + 1 + strlen(dep->d_name));
249 full_entry_path.append(abspath);
250 full_entry_path.append(
"/");
251 full_entry_path.append(dep->d_name);
252 if(::stat(full_entry_path.c_str(), &statbuf) == -1) {
253 if(errno == ENOENT) {
261 dep->d_name, statbuf.st_mtime, dep->d_type == DT_DIR, statbuf.st_size);
279 std::filesystem::remove_all(abspath, ec);
285 const std::string& to) {
288 std::string abspath_from;
290 std::string abspath_to;
298 #if defined(SWC_FS_LOCAL_USE_IO_URING)
374 const std::string& name) {
// Local handler type built around an asio::stream_file (`sf`); its
// constructor opens the file read-only and queries its size.
377 struct HandlerReadAll {
381 asio::stream_file sf;
// Constructor initialisers: keep the name and callback, bind `sf` to the
// io executor, start the received-byte counter at zero and obtain a
// statistics tracker.
388 : name(a_name), cb(std::move(a_cb)),
389 sf(io->executor()), recved(0),
390 tracker(stats.tracker(
Statistics::READ_ALL_SYNC)) {
// Open and size queries both report failure through `ec` rather than throwing.
395 sf.open(abspath, asio::stream_file::read_only, ec);
398 size_t sz = sf.size(ec);
// Hands the numeric value of `ec` to callback().
408 callback(ec.value());
// Call operator taking an asio error code and a byte count; presumably used
// as the completion handler of a read on `sf` (the initiating call is not
// part of this excerpt).
//   ec    - error code passed to the handler
//   bytes - byte count passed to the handler
410 void operator()(
const asio::error_code& ec,
size_t bytes) {
// Trace output for each completion.
// NOTE(review): "left" is derived from `bytes` (this completion only), not
// from the running total - confirm that is the intended value to log.
411 SWC_PRINT <<
"HandlerReadAll() ec=" << ec <<
" is_open=" << sf.is_open() <<
" bytes=" << bytes <<
" left=" << (buffer->size - bytes) <<
SWC_PRINT_CLOSE;
// Progress is accumulated in `recved`; the branch is taken while there is no
// error and the buffer is not yet full.
// NOTE(review): the buffer window below is offset and sized using `bytes`
// while progress is tracked in `recved`; after more than one partial
// completion the two differ - confirm whether `recved` was intended.
412 if(!ec && (recved += bytes) < buffer->size) {
414 asio::buffer(buffer->base + bytes, buffer->size - bytes),
418 callback(ec.value());
421 void callback(
int err) {
424 asio::error_code tmp;
432 std::move(*buffer.get())
438 HandlerReadAll(
this, name, std::move(cb), m_io,
statistics);
444 uint8_t replication) {
450 int oflags = O_WRONLY | O_CREAT
461 smartfd->fd(::
open(abspath.c_str(), oflags, 0644));
462 if (!smartfd->valid()) {
464 if(tmperr == EACCES || tmperr == ENOENT)
466 else if (tmperr == EPERM)
473 #if defined(__APPLE__)
475 fcntl(smartfd->fd(), F_NOCACHE, 1);
477 #elif defined(__sun__)
479 directio(smartfd->fd(), DIRECTIO_ON);
491 int oflags = O_RDONLY;
500 smartfd->fd(::
open(abspath.c_str(), oflags));
501 if (!smartfd->valid()) {
503 if(tmperr == EACCES || tmperr == ENOENT)
505 else if (tmperr == EPERM)
515 directio(smartfd->fd(), DIRECTIO_ON);
// Loops over ::read() on `fd`, filling `ptr`, until the remaining count
// `nleft` reaches zero (`nleft` is declared outside this excerpt, presumably
// initialised from `n`).
//   fd  - descriptor to read from
//   ptr - destination buffer, advanced as data arrives
//   n   - byte count argument
524 ssize_t _read(
int fd, uint8_t* ptr,
size_t n) noexcept {
// After every pass the remaining count shrinks and the write pointer advances
// by the number of bytes just read.
526 for(ssize_t nread; nleft; nleft -= nread, ptr += nread) {
// A negative return from ::read() enters the error branch.
527 if ((nread = ::
read(fd, ptr, nleft)) < 0) {
// EAGAIN is tested separately from the condition on the preceding,
// not-shown line.
530 else if (errno == EAGAIN)
543 void *dst,
size_t amount) {
548 ssize_t nread = _read(smartfd->fd(),
static_cast<uint8_t*
>(dst), amount);
555 smartfd->forward(ret);
// Positional counterpart of the read loop: repeats SWC_PREAD until the
// remaining count `nleft` reaches zero (`nleft` is declared outside this
// excerpt, presumably initialised from `n`).
// Note the parameter order here (fd, offset, ptr, n) differs from the
// SWC_PREAD macro's (fd, ptr, nleft, offset).
564 ssize_t _pread(
int fd, off_t offset, uint8_t* ptr,
size_t n) noexcept {
// Each pass shrinks the remaining count and advances both the buffer pointer
// and the file offset by the bytes just read.
566 for(ssize_t nread; nleft; nleft -= nread, ptr += nread, offset += nread) {
// A negative return enters the error branch.
567 if((nread =
SWC_PREAD(fd, ptr, nleft, offset)) < 0) {
// EAGAIN is tested separately from the condition on the preceding,
// not-shown line.
570 else if(errno == EAGAIN)
583 uint64_t offset,
void* dst,
590 ssize_t nread = _pread(
591 smartfd->fd(), off_t(offset),
static_cast<uint8_t*
>(dst), amount);
598 smartfd->pos(offset + ret);
// Loops over ::write() on `fd` until `nleft` bytes from `ptr` are consumed.
// Returns bool; the visible call site treats a false result as failure and
// then reads errno.
//   fd    - descriptor to write to
//   ptr   - source buffer, advanced as data is written
//   nleft - number of bytes still to write
607 bool _write(
int fd,
const uint8_t* ptr,
size_t nleft) noexcept {
// Each pass shrinks the remaining count and advances the source pointer by
// the bytes just written.
608 for(ssize_t nwritten; nleft; nleft -= nwritten, ptr += nwritten) {
// Both a negative and a zero return from ::write() enter this branch.
609 if((nwritten = ::
write(fd, ptr, nleft)) <= 0) {
625 if(!_write(smartfd->fd(), buffer.
base, buffer.
size)) {
626 err = errno ? errno : ECANCELED;
628 smartfd->forward(buffer.
size);
645 uint64_t at = ::lseek(smartfd->fd(), offset, SEEK_SET);
647 if (at == uint64_t(-1) || at != offset || err) {
651 smartfd->pos(offset);
672 int32_t fd = smartfd->invalidate();