SWC-DB v0.5.12 C++ documentation
SWC-DB© (Super Wide Column Database) - High Performance Scalable Database (https://github.com/kashirin-alex/swc-db)
main.cc
Go to the documentation of this file.
1 /*
2  * SWC-DB© Copyright since 2019 Alex Kashirin <kashirin.alex@gmail.com>
3  * License details at <https://github.com/kashirin-alex/swc-db/#license>
4  */
5 
6 
9 
10 #include <thrift/server/TThreadPoolServer.h>
11 #include <thrift/concurrency/ThreadManager.h>
12 #include <thrift/concurrency/ThreadFactory.h>
13 #include <thrift/transport/TServerSocket.h>
14 #include <thrift/transport/TSocket.h>
15 #include <thrift/transport/TBufferTransports.h>
16 #include <thrift/transport/TZlibTransport.h>
17 #include <thrift/protocol/TBinaryProtocol.h>
18 
19 
20 namespace SWC {
21 
22 
28 namespace ThriftBroker {
29 
// File-local (anonymous) namespace.
// NOTE(review): listing lines 31-32 are absent from this extract. Threads_t and
// Servers_t are used below without a visible declaration, so presumably they are
// declared here - confirm against the repository source.
30 namespace {
33 }
34 
/**
 * Builds the Thrift-Broker listening servers and starts one serving thread
 * per server.
 *
 * Reads the swc.ThriftBroker.* settings (workers, connections.max, timeout,
 * transport, port) and the optional addr / host settings, picks the Thrift
 * transport factory (framed or zlib; any other value is logged at LOG_FATAL
 * and followed by SWC_QUICK_EXIT(EXIT_FAILURE)), then creates a
 * TThreadPoolServer for every endpoint and runs serve() on a new std::thread.
 *
 * @param threads  a newly allocated std::thread is emplaced here per server
 * @param servers  every created TThreadPoolServer is pushed here
 * @return the AppContext shared by the processor factories and the threads
 *
 * NOTE(review): this listing is an extract in which listing lines 73, 81, 133
 * and 141 are absent; the statements they open are visible only through their
 * continuation lines.
 */
35 SWC_SHOULD_NOT_INLINE
36 AppContext::Ptr make_service(Threads_t& threads, Servers_t& servers) {
37  auto settings = Env::Config::settings();
38 
    // The reactor count is fixed at 1; the settings lookup is commented out.
39  uint32_t reactors = 1; // settings->get_i32("swc.ThriftBroker.reactors");
40  int workers = settings->get_i32("swc.ThriftBroker.workers");
41  uint64_t conns_max = settings->get_i64("swc.ThriftBroker.connections.max");
42  uint32_t timeout_ms = settings->get_i32("swc.ThriftBroker.timeout");
43  std::string transport = settings->get_str("swc.ThriftBroker.transport");
44 
45  std::shared_ptr<thrift::transport::TTransportFactory> transportFactory;
46 
    // Select the transport factory from the configured name.
    // NOTE(review): str_case_eq takes a count (6 and 4 here); presumably only
    // that many leading characters are compared, in which case longer values
    // starting with these names would also match - confirm.
47  if(Condition::str_case_eq(transport.data(), "framed", 6)) {
48  transportFactory.reset(new thrift::transport::TFramedTransportFactory());
49 
50  } else if(Condition::str_case_eq(transport.data(), "zlib", 4)) {
51  transportFactory.reset(new thrift::transport::TZlibTransportFactory());
52 
53  } else {
54  SWC_LOGF(
55  LOG_FATAL, "No implementation for transport=%s", transport.c_str());
56  SWC_QUICK_EXIT(EXIT_FAILURE);
57  }
58 
    // Explicit listen addresses are optional; addrs stays empty when unset.
59  Config::Strings addrs;
60  if(settings->has("addr"))
61  addrs = settings->get_strs("addr");
62 
    // Host name: the configured value when present, else the system host name.
63  std::string host;
64  if(settings->has("host")) {
    // append() already modifies host in place, so assigning its result back
    // to host is a redundant self-assignment.
65  host = host.append(settings->get_str("host"));
66  } else {
67  char hostname[256];
    // NOTE(review): POSIX leaves null-termination unspecified when the name is
    // truncated to the buffer size - confirm whether that case needs handling.
68  if(gethostname(hostname, sizeof(hostname)) == -1)
69  SWC_THROW(errno, "gethostname");
70  host.append(hostname);
71  }
72 
    // NOTE(review): listing line 73 is absent. The arguments below and the later
    // use of endpoints suggest it opens a call that resolves the listen endpoints
    // (the symbol index of this page lists SWC::Comm::Resolver::get_endpoints,
    // whose parameters match) - confirm against the repository source.
74  settings->get_i16("swc.ThriftBroker.port"),
75  addrs,
76  host,
77  {},
78  true
79  );
80 
    // NOTE(review): listing line 81 is absent; presumably the opener of a
    // SWC_LOGF call that takes this format string and its arguments - confirm.
82  "STARTING SERVER: THRIFT-BROKER, reactors=%u workers=%d transport=%s",
83  reactors, workers, transport.c_str());
84 
85 
    // The application context is initialised with the host and only the first
    // endpoint.
    // NOTE(review): endpoints.front() assumes at least one endpoint was
    // resolved - confirm that an empty result cannot reach this point.
86  AppContext::Ptr app_ctx(new AppContext());
87  {
88  Comm::EndPoints tmp;
89  tmp.push_back(std::cref(endpoints.front()));
90  app_ctx->init(host, tmp); //missing socket->getLocalAddr
91  }
92 
    // With reactors fixed at 1 this loop body runs once.
93  for(uint32_t reactor=0; reactor < reactors; ++reactor) {
94 
    // One thread manager with the configured number of workers per reactor;
    // it is passed to every server created in the endpoint loop below.
95  std::shared_ptr<thrift::concurrency::ThreadManager> threadManager(
96  thrift::concurrency::ThreadManager::newSimpleThreadManager(workers));
97  threadManager->threadFactory(
98  std::make_shared<thrift::concurrency::ThreadFactory>());
99  threadManager->start();
100 
101  for(auto& endpoint : endpoints) {
    // is_plain is always true here; it only selects the PLAIN / SECURE label
    // in the log output of the serving thread.
102  bool is_plain = true; // if use_ssl && need ssl.. transportFactory.reset(..)
103  std::shared_ptr<thrift::transport::TServerSocket> socket;
    // Only the first reactor creates a listening socket; any further reactor
    // skips the endpoint (placeholder for per-reactor sockets).
104  if(!reactor) {
105  socket.reset(new thrift::transport::TServerSocket(
106  endpoint.address().to_string(), endpoint.port()));
107  } else {
108  continue;
109  //1st socket->getSocketFD dup >> init socket from fd (per reactor)
110  }
    // The same configured timeout is applied to both send and receive.
111  socket->setSendTimeout(timeout_ms);
112  socket->setRecvTimeout(timeout_ms);
113 
114  auto protocol = std::make_shared<thrift::protocol::TBinaryProtocolFactory>();
115  /*
116  protocol->setRecurisionLimit(...);
117  set the DEFAULT_RECURSION_LIMIT
118  FCells in sql_select_rslt_on_fraction
119  requirement depends on the length/depth of key-fractions
120  */
121  auto server = std::make_shared<thrift::server::TThreadPoolServer>(
122  std::make_shared<Thrift::BrokerProcessorFactory>(app_ctx),
123  socket,
124  transportFactory,
125  protocol,
126  threadManager
127  );
    // Limit concurrent clients to the configured connections.max value.
128  server->setConcurrentClientLimit(conns_max);
129 
130  servers.push_back(server);
    // The lambda captures everything by value, so the thread holds its own
    // copies of app_ctx, endpoint and server.
131  threads.emplace_back(
132  new std::thread([app_ctx, is_plain, endpoint, server] {
    // NOTE(review): listing line 133 is absent; presumably the opener of a
    // SWC_LOG_OUT(...) block streaming into SWC_LOG_OSTREAM - confirm.
134  << "Listening On: " << endpoint
135  << " fd=" << server->getServerTransport()->getSocketFD()
136  << ' ' << (is_plain ? "PLAIN" : "SECURE");
137  );
138 
    // serve() is called on this thread; the statements below run once it
    // returns.
139  server->serve();
140 
    // NOTE(review): listing line 141 is absent; presumably the same log opener
    // as for the message above - confirm.
142  << "Stopping to Listen On: " << endpoint
143  << " fd=" << server->getServerTransport()->getSocketFD()
144  << ' ' << (is_plain ? "PLAIN" : "SECURE");
145  );
    // After serving ends, report a shutdown to the application context.
146  app_ctx->shutting_down(std::error_code(), SIGINT);
147  })
148  );
149  }
150  }
151  return app_ctx;
152 }
153 
/**
 * Shuts the service down: stops every server and its thread manager, joins
 * the serving threads, logs the exit and returns.
 *
 * @param threads  serving threads to join; cleared afterwards
 * @param servers  servers to stop; cleared afterwards
 * @return 0
 *
 * NOTE(review): SWC_CAN_QUICK_EXIT(EXIT_SUCCESS) presumably may end the
 * process before the remaining statements run - confirm in Compat.h.
 */
154 SWC_SHOULD_NOT_INLINE
155 int exiting(Threads_t& threads, Servers_t& servers) {
    // Stop each server first, then the thread manager it uses.
156  for(auto& server : servers) {
157  server->stop();
158  server->getThreadManager()->stop();
159  }
160  servers.clear();
161 
    // Wait for every serving thread to finish before releasing it.
162  for(auto& th : threads)
163  th->join();
164  threads.clear();
165 
166  SWC_LOG(LOG_INFO, "Exit");
167  SWC_CAN_QUICK_EXIT(EXIT_SUCCESS);
168 
    // NOTE(review): listing line 169 is absent from this extract; the symbol
    // index of this page lists SWC::Env::Config::reset, presumably called
    // here - confirm against the repository source.
170 
    // One-second pause before returning.
171  std::this_thread::sleep_for(std::chrono::seconds(1));
172  return 0;
173 }
174 
/**
 * Runs the Thrift-Broker: creates the service, waits in wait_while_run() on
 * the returned application context, then shuts down through exiting().
 *
 * @return the value returned by exiting(), or 1 when control leaves the
 *         SWC_TRY_OR_LOG block without reaching that return
 *
 * NOTE(review): SWC_TRY_OR_LOG presumably catches and logs exceptions raised
 * by the wrapped statements - confirm in Exception.h.
 */
175 int run() {
176  SWC_TRY_OR_LOG("",
177  Threads_t threads;
178  Servers_t servers;
    // The pointer returned by make_service is a temporary used only for this
    // call; threads and servers stay in scope for exiting().
179  make_service(threads, servers)->wait_while_run();
180  return exiting(threads, servers);
181  );
182  return 1;
183 }
184 
185 
186 
187 }} //namespace SWC::ThriftBroker
188 
189 
190 
/**
 * Process entry point. Passes argc and argv to an initialisation call, calls
 * init_process(true, `swc.ThriftBroker.port`) on the settings and returns the
 * result of SWC::ThriftBroker::run().
 *
 * NOTE(review): listing lines 192, 195 and 196 are absent from this extract.
 * The symbol index of this page lists SWC::Env::Config::init together with
 * SWC::Config::init_app_options and SWC::Config::init_post_cmd_args, which
 * presumably form the missing call opener and its last two arguments -
 * confirm against the repository source.
 */
191 int main(int argc, char** argv) {
193  argc,
194  argv,
197  );
198  SWC::Env::Config::settings()->init_process(true, "swc.ThriftBroker.port");
199  return SWC::ThriftBroker::run();
200 }
SWC::ThriftBroker::exiting
SWC_SHOULD_NOT_INLINE int exiting(Threads_t &threads, Servers_t &servers)
Definition: main.cc:155
SWC::Core::Vector::front
constexpr SWC_CAN_INLINE reference front() noexcept
Definition: Vector.h:243
SWC_LOG_OSTREAM
#define SWC_LOG_OSTREAM
Definition: Logger.h:44
main
int main(int argc, char **argv)
Definition: main.cc:68
SWC::Env::Config::reset
static void reset() noexcept
Definition: Settings.h:133
SWC_LOGF
#define SWC_LOGF(priority, fmt,...)
Definition: Logger.h:188
SWC_LOG_OUT
#define SWC_LOG_OUT(pr, _code_)
Definition: Logger.h:178
SWC_TRY_OR_LOG
#define SWC_TRY_OR_LOG(_s_, _code_)
Definition: Exception.h:153
SWC::LOG_FATAL
@ LOG_FATAL
Definition: Logger.h:29
SWC::Env::Config::settings
static SWC::Config::Settings::Ptr & settings()
Definition: Settings.h:128
SWC::Config::init_app_options
void init_app_options(Settings *settings)
Definition: load_generator.cc:166
SWC_THROW
#define SWC_THROW(_code_, _msg_)
Definition: Exception.h:134
Settings.h
SWC::LOG_INFO
@ LOG_INFO
Definition: Logger.h:35
SWC::Env::Config::init
static SWC_SHOULD_NOT_INLINE void init(int argc, char **argv, SWC::Config::Settings::init_option_t init_app_options, SWC::Config::Settings::init_option_t init_post_cmd_args)
Definition: Settings.h:110
timeout_ms
uint32_t timeout_ms
Request timeout.
Definition: Header.h:58
SWC_LOG
#define SWC_LOG(priority, message)
Definition: Logger.h:191
SWC
The SWC-DB C++ namespace 'SWC'.
Definition: main.cc:12
SWC_CAN_QUICK_EXIT
#define SWC_CAN_QUICK_EXIT(_CODE_)
Definition: Compat.h:191
SWC::ThriftBroker::make_service
SWC_SHOULD_NOT_INLINE AppContext::Ptr make_service(Threads_t &threads, Servers_t &servers)
Definition: main.cc:36
SWC::ThriftBroker::AppContext
Definition: AppContext.h:27
SWC::Core::Vector
Definition: Vector.h:14
SWC_QUICK_EXIT
#define SWC_QUICK_EXIT(_CODE_)
Definition: Compat.h:184
SWC::ThriftBroker::AppContext::Ptr
std::shared_ptr< AppContext > Ptr
Definition: AppContext.h:30
SWC::Config::init_post_cmd_args
void init_post_cmd_args(Settings *settings)
Definition: Settings.h:64
SWC::Comm::Resolver::get_endpoints
EndPoints get_endpoints(uint16_t defaul_port, const Config::Strings &addrs, const std::string &host, const Networks &nets, bool srv=false)
Definition: Resolver.cc:152
SWC::Core::Vector::push_back
SWC_CAN_INLINE void push_back(ArgsT &&... args)
Definition: Vector.h:331
SWC::Condition::str_case_eq
bool str_case_eq(const char *s1, const char *s2, size_t count) noexcept SWC_ATTRIBS((SWC_ATTRIB_O3))
Definition: Comparators_basic.h:257
SWC::ThriftBroker::run
int run()
Definition: main.cc:175
AppContext.h