SWC-DB v0.5.12 C++ documentation
SWC-DB© (Super Wide Column Database) - High Performance Scalable Database (https://github.com/kashirin-alex/swc-db)
FsBrokerEnv.h
Go to the documentation of this file.
1 /*
2  * SWC-DB© Copyright since 2019 Alex Kashirin <kashirin.alex@gmail.com>
3  * License details at <https://github.com/kashirin-alex/swc-db/#license>
4  */
5 
6 #ifndef swcdb_fsbroker_FsBrokerEnv_h
7 #define swcdb_fsbroker_FsBrokerEnv_h
8 
9 
13 #include <fcntl.h>
14 
15 
16 namespace SWC { namespace FsBroker {
17 
// Registry of open file descriptors: maps an int32_t id handed out by add()
// to the FS::SmartFd::Ptr it stands for. The map base class is inherited
// privately, so callers only reach it through add/remove/select/pop_next.
// NOTE(review): listing lines 32, 34, 40-41 and 52 are absent from this extract.
// The constructor initialises m_mutex and the symbol index names
// Core::MutexSptd::scope, so the gaps presumably hold the lock statements, the
// pop_next signature and the m_mutex declaration - confirm against the header.
18 class Fds final : private std::unordered_map<int32_t, FS::SmartFd::Ptr> {
19 
20  public:
21 
    // Plain (non-owning) pointer alias for this type.
22  typedef Fds* Ptr;
23 
    // Starts with an empty map and the id counter at 0.
24  Fds() noexcept : m_mutex(), m_next_fd(0) {}
25 
26  ~Fds() noexcept { }
27 
    // Registers smartfd and returns the id assigned to it (defined later in this file).
28  int32_t add(const FS::SmartFd::Ptr& smartfd);
29 
    // Unregisters the entry for fd and returns it, or nullptr when the id is unknown
    // (defined later in this file).
30  FS::SmartFd::Ptr remove(int32_t fd);
31 
    // Looks up fd without removing it; returns nullptr when the id is not present.
33  FS::SmartFd::Ptr select(int32_t fd) noexcept {
    // (listing line 34 is absent here)
35 
36  auto it = find(fd);
37  return it == cend() ? nullptr : it->second;
38  }
39 
    // (listing lines 40-41 are absent; the symbol index gives the signature
    //  FS::SmartFd::Ptr pop_next() at line 40)
    // Takes an arbitrary entry (the first in map iteration order) out of the map
    // and returns it; returns nullptr once the map is empty.
42 
43  auto it = cbegin();
44  if(it == cend())
45  return nullptr;
    // NOTE(review): it is a const_iterator, so it->second is const and std::move
    // here selects the copy constructor of the shared pointer, not the move one.
46  FS::SmartFd::Ptr smartfd = std::move(it->second);
47  erase(it);
48  return smartfd;
49  }
50 
51  private:
    // (listing line 52, the m_mutex declaration per the symbol index, is absent)
    // Last id handed out by add(); 0 means none has been assigned yet.
53  int32_t m_next_fd;
54 };
55 
56 } // namespace FsBroker
57 
58 
59 
60 namespace Env {
// Process-wide environment of the FsBroker service. A single instance is held
// in the static m_env pointer and reached through the static accessors below.
// NOTE(review): the static accessors dereference m_env without a null check,
// so they are only usable between init() and reset().
// NOTE(review): several listing lines are absent from this extract (78-79,
// 118-120, 133, 148, 165-167 and 170-174); comments mark each gap.
61 class FsBroker final {
62  public:
63 
    // Creates the singleton; asserts that it has not been created already.
64  static void init() {
65  SWC_ASSERT(!m_env);
66  m_env.reset(new FsBroker());
67  }
68 
    // Registry of open file descriptors owned by the environment.
70  static SWC::FsBroker::Fds& fds() noexcept {
71  return m_env->m_fds;
72  }
73 
    // Drops the singleton reference held by m_env.
74  static void reset() noexcept {
75  m_env = nullptr;
76  }
77 
    // (listing lines 78-79 are absent; the symbol index gives the signature
    //  static SWC::FsBroker::Metric::Reporting::Ptr& metrics_track() noexcept)
    // Returns the metrics reporting pointer, which is null when metrics are disabled.
80  return m_env->_reporting;
81  }
82 
    // System resources tracker of the environment.
84  static System::Resources& res() noexcept {
85  return m_env->_resources;
86  }
87 
    // Current value of the in-process counter.
89  static int64_t in_process() noexcept {
90  return m_env->m_in_process;
91  }
92 
    // Adds count to the in-process counter; pass a negative value to decrease it.
94  static void in_process(int64_t count) noexcept {
95  m_env->m_in_process.fetch_add(count);
96  }
97 
    // Static forwarder to _can_process() of the singleton.
99  static bool can_process() noexcept {
100  return m_env->_can_process();
101  }
102 
    // Static forwarder to _shuttingdown() of the singleton.
103  static void shuttingdown() {
104  return m_env->_shuttingdown();
105  }
106 
    // Builds the environment: RAM configuration values with their defaults, the
    // optional metrics reporting, the resources tracker, and cleared state flags.
    // NOTE(review): declared noexcept although it calls new and reads settings;
    // an exception escaping from either would terminate the process.
107  FsBroker() noexcept
108  : cfg_ram_percent_allowed(100, nullptr),
109  cfg_ram_percent_reserved(0, nullptr),
110  cfg_ram_release_rate(100, nullptr),
111  _reporting(
    // Reporting is only created when the metrics setting is enabled.
112  SWC::Env::Config::settings()->get_bool("swc.FsBroker.metrics.enabled")
113  ? new SWC::FsBroker::Metric::Reporting()
114  : nullptr
115  ),
116  _resources(
117  Env::IoCtx::io(),
    // (listing lines 118-120, further constructor arguments, are absent)
121  _reporting ? &_reporting->system : nullptr,
122  nullptr
123  ),
124  m_shuttingdown(false), m_in_process(0), m_fds() {
125  }
126 
127  ~FsBroker() noexcept { }
128 
    // Returns false once shutdown has been flagged, true otherwise.
130  bool _can_process() noexcept {
131  if(m_shuttingdown)
132  return false;
    // (listing line 133 is absent - TODO confirm what it does before relying on this)
134  return true;
135  }
136 
    // Shutdown sequence: flag the shutdown, stop metrics reporting, wait for
    // outstanding work to drain, then sync and close every descriptor still registered.
137  void _shuttingdown() {
138  m_shuttingdown.store(true);
139 
140  if(_reporting)
141  _reporting->stop();
142 
143  auto fs = Env::FsInterface::fs();
144  size_t n = 0;
    // Poll every 200 ms (the first check happens after one sleep) until the
    // in-process counter is zero and the filesystem use count is no more than
    // 2, plus 1 when reporting is set.
145  do {
146  std::this_thread::sleep_for(std::chrono::milliseconds(200));
    // Every 10th check a progress message is emitted.
147  if(!(++n % 10))
    // (listing line 148, the start of the logging call, is absent; the symbol
    //  index references SWC_LOGF and LOG_WARN)
149  "In-process=" SWC_FMT_LD " fs-use-count=" SWC_FMT_LU
150  " check=" SWC_FMT_LU,
151  m_in_process.load(), size_t(fs.use_count()), n);
152  } while(m_in_process || fs.use_count() > 2 + bool(_reporting));
153 
154  int err;
    // Drain the descriptor registry; pop_next() yields nullptr when it is empty.
155  for(FS::SmartFd::Ptr fd; (fd = m_fds.pop_next()); ) {
156  if(fd->valid()) {
157  err = Error::OK;
    // NOTE(review): this tests the O_WRONLY bit only; on common platforms
    // O_RDWR does not include that bit, so read-write descriptors would be
    // closed without a sync - confirm whether O_ACCMODE should be used.
    // The err result of sync and close is not inspected.
158  if(fd->flags() & O_WRONLY)
159  Env::FsInterface::fs()->sync(err, fd);
160  Env::FsInterface::fs()->close(err, fd);
161  }
162  }
163  }
164 
    // (listing lines 165-167: the cfg_ram_* member declarations per the symbol
    //  index, are absent)
168 
169  private:
    // (listing lines 170-174: _reporting, _resources, m_shuttingdown,
    //  m_in_process and m_fds per the symbol index, are absent)
    // The singleton instance; null until init() and after reset().
175  inline static std::shared_ptr<FsBroker> m_env = nullptr;
176 
177 };
178 
179 
180 } // namespace Env
181 
182 
183 
184 namespace FsBroker {
185 
// Registers smartfd under a newly assigned id and returns that id.
// Any other entry whose underlying fd() equals that of smartfd is removed, so
// one underlying descriptor is kept under a single id. When metrics tracking
// is enabled the fds counter is incremented.
187 int32_t Fds::add(const FS::SmartFd::Ptr& smartfd) {
188  int32_t fd;
189  assign_fd: {
    // (listing line 190 is absent; presumably the scoped lock on m_mutex - confirm)
    // Advance the id counter, restarting at 1 when the increment yields a value
    // below 1, and retry while the chosen id is already in use.
    // NOTE(review): incrementing a signed int32_t at its maximum is undefined
    // behaviour, so the wrap-around this check relies on is not guaranteed.
191  if(!emplace(++m_next_fd < 1 ? m_next_fd=1 : m_next_fd, smartfd).second)
192  goto assign_fd;
    // Linear scan for an older entry that refers to the same underlying fd;
    // at most one such entry is erased.
193  for(auto it = cbegin(); it != cend(); ++it) {
194  if(it->first != m_next_fd && it->second->fd() == smartfd->fd()) {
195  erase(it);
196  break;
197  }
198  }
199  fd = m_next_fd;
200  }
    // Metrics are updated after the labelled block has been left.
201  if(auto& m = Env::FsBroker::metrics_track())
202  m->fds->increment();
203  return fd;
204 }
205 
// Body of Fds::remove(int32_t fd); the signature (listing line 207 per the
// symbol index) is absent from this extract.
// Removes the entry registered under fd and returns its pointer, or returns
// nullptr when the id is unknown. When an entry was removed and metrics
// tracking is enabled, the fds counter is decremented.
208  FS::SmartFd::Ptr smartfd;
209  {
    // (listing line 210 is absent; presumably the scoped lock on m_mutex - confirm)
211  auto it = find(fd);
    // Unknown id: return early, leaving the metrics counter untouched.
212  if(it == cend())
213  return nullptr;
214  smartfd = std::move(it->second);
215  erase(it);
216  }
217  if(auto& m = Env::FsBroker::metrics_track())
218  m->fds->decrement();
219  return smartfd;
220 }
221 
222 
223 }}
224 
225 
226 
228 
229 
230 #endif // swcdb_fsbroker_FsBrokerEnv_h
SWC::FsBroker::Fds::pop_next
FS::SmartFd::Ptr pop_next()
Definition: FsBrokerEnv.h:40
SWC::System::Resources
Definition: Resources.h:50
SWC::FsBroker::Fds::Ptr
Fds * Ptr
Definition: FsBrokerEnv.h:22
SWC::Env::FsBroker::cfg_ram_percent_allowed
SWC::Config::Property::Value_int32_g cfg_ram_percent_allowed
Definition: FsBrokerEnv.h:165
SWC::Core::AtomicBase< bool >
SWC_LOGF
#define SWC_LOGF(priority, fmt,...)
Definition: Logger.h:188
SWC::Env::FsBroker::init
static void init()
Definition: FsBrokerEnv.h:64
MetricsReporting.cc
SWC::Core::Atomic< int64_t >
SWC::FsBroker::Fds::select
SWC_CAN_INLINE FS::SmartFd::Ptr select(int32_t fd) noexcept
Definition: FsBrokerEnv.h:33
SWC::Env::FsBroker::~FsBroker
~FsBroker() noexcept
Definition: FsBrokerEnv.h:127
SWC::Core::MutexSptd::scope
Definition: MutexSptd.h:96
SWC::Env::Config
Definition: Settings.h:103
SWC::Env::FsBroker::_can_process
SWC_CAN_INLINE bool _can_process() noexcept
Definition: FsBrokerEnv.h:130
SWC::Env::FsBroker::reset
static void reset() noexcept
Definition: FsBrokerEnv.h:74
SWC::FsBroker::Fds::remove
FS::SmartFd::Ptr remove(int32_t fd)
Definition: FsBrokerEnv.h:207
SWC::Env::FsBroker::_reporting
SWC::FsBroker::Metric::Reporting::Ptr _reporting
Definition: FsBrokerEnv.h:170
SWC::Env::FsBroker::m_env
static std::shared_ptr< FsBroker > m_env
Definition: FsBrokerEnv.h:175
SWC::Env::FsBroker::_shuttingdown
void _shuttingdown()
Definition: FsBrokerEnv.h:137
SWC::Env::IoCtx
Definition: IoContext.h:114
SWC::Core::AtomicBase::store
constexpr SWC_CAN_INLINE void store(T v) noexcept
Definition: Atomic.h:37
SWC::Error::OK
@ OK
Definition: Error.h:45
SWC_CAN_INLINE
#define SWC_CAN_INLINE
Definition: Compat.h:102
SWC::Env::FsBroker::m_fds
SWC::FsBroker::Fds m_fds
Definition: FsBrokerEnv.h:174
SWC
The SWC-DB C++ namespace 'SWC'.
Definition: main.cc:12
SWC::Env::FsBroker::metrics_track
static SWC_CAN_INLINE SWC::FsBroker::Metric::Reporting::Ptr & metrics_track() noexcept
Definition: FsBrokerEnv.h:79
SWC::Env::FsBroker::can_process
static SWC_CAN_INLINE bool can_process() noexcept
Definition: FsBrokerEnv.h:99
SWC::Env::FsBroker::cfg_ram_percent_reserved
SWC::Config::Property::Value_int32_g cfg_ram_percent_reserved
Definition: FsBrokerEnv.h:166
SWC::Env::FsBroker::in_process
static SWC_CAN_INLINE int64_t in_process() noexcept
Definition: FsBrokerEnv.h:89
SWC::Env::FsInterface::fs
static SWC_CAN_INLINE FS::FileSystem::Ptr & fs() noexcept
Definition: Interface.h:155
SWC::Env::FsBroker::m_shuttingdown
Core::AtomicBool m_shuttingdown
Definition: FsBrokerEnv.h:172
SWC_FMT_LU
#define SWC_FMT_LU
Definition: Compat.h:98
SWC::Env::FsBroker::res
static SWC_CAN_INLINE System::Resources & res() noexcept
Definition: FsBrokerEnv.h:84
SWC::FsBroker::Fds::m_next_fd
int32_t m_next_fd
Definition: FsBrokerEnv.h:53
Commands.h
SWC::FS::SmartFd::Ptr
std::shared_ptr< SmartFd > Ptr
Definition: SmartFd.h:37
SWC::Env::FsBroker::m_in_process
Core::Atomic< int64_t > m_in_process
Definition: FsBrokerEnv.h:173
SWC::Env::FsBroker::in_process
static SWC_CAN_INLINE void in_process(int64_t count) noexcept
Definition: FsBrokerEnv.h:94
SWC::Core::AtomicBase::load
constexpr SWC_CAN_INLINE T load() const noexcept
Definition: Atomic.h:42
SWC::FsBroker::Fds::Fds
Fds() noexcept
Definition: FsBrokerEnv.h:24
SWC::Env::FsBroker::shuttingdown
static void shuttingdown()
Definition: FsBrokerEnv.h:103
SWC::Core::MutexSptd
Definition: MutexSptd.h:16
SWC::LOG_WARN
@ LOG_WARN
Definition: Logger.h:33
SWC::FsBroker::Fds::m_mutex
Core::MutexSptd m_mutex
Definition: FsBrokerEnv.h:52
SWC::Env::FsBroker::fds
static SWC_CAN_INLINE SWC::FsBroker::Fds & fds() noexcept
Definition: FsBrokerEnv.h:70
SWC::FsBroker::Fds::~Fds
~Fds() noexcept
Definition: FsBrokerEnv.h:26
SWC::Env::FsBroker
Definition: FsBrokerEnv.h:61
SWC::FsBroker::Fds
Definition: FsBrokerEnv.h:18
SWC_ASSERT
#define SWC_ASSERT(_e_)
Definition: Exception.h:165
SWC::Env::FsBroker::_resources
System::Resources _resources
Definition: FsBrokerEnv.h:171
SWC_FMT_LD
#define SWC_FMT_LD
Definition: Compat.h:99
SWC::Core::Atomic::fetch_add
constexpr SWC_CAN_INLINE T fetch_add(T v) noexcept
Definition: Atomic.h:93
SWC::FsBroker::Metric::Reporting::Ptr
std::shared_ptr< Reporting > Ptr
Definition: MetricsReporting.h:26
Resources.h
SWC::FsBroker::Fds::add
int32_t add(const FS::SmartFd::Ptr &smartfd)
Definition: FsBrokerEnv.h:187
SWC::Env::FsBroker::FsBroker
FsBroker() noexcept
Definition: FsBrokerEnv.h:107
SWC::Env::FsBroker::cfg_ram_release_rate
SWC::Config::Property::Value_int32_g cfg_ram_release_rate
Definition: FsBrokerEnv.h:167
MetricsReporting.h
SWC::Config::Property::Value_int32_g
Definition: Property.h:586