10 #include <thrift/server/TThreadPoolServer.h>
11 #include <thrift/concurrency/ThreadManager.h>
12 #include <thrift/concurrency/ThreadFactory.h>
13 #include <thrift/transport/TServerSocket.h>
14 #include <thrift/transport/TSocket.h>
15 #include <thrift/transport/TBufferTransports.h>
16 #include <thrift/transport/TZlibTransport.h>
17 #include <thrift/protocol/TBinaryProtocol.h>
28 namespace ThriftBroker {
39 uint32_t reactors = 1;
40 int workers = settings->get_i32(
"swc.ThriftBroker.workers");
41 uint64_t conns_max = settings->get_i64(
"swc.ThriftBroker.connections.max");
42 uint32_t
timeout_ms = settings->get_i32(
"swc.ThriftBroker.timeout");
43 std::string transport = settings->get_str(
"swc.ThriftBroker.transport");
45 std::shared_ptr<thrift::transport::TTransportFactory> transportFactory;
48 transportFactory.reset(
new thrift::transport::TFramedTransportFactory());
51 transportFactory.reset(
new thrift::transport::TZlibTransportFactory());
55 LOG_FATAL,
"No implementation for transport=%s", transport.c_str());
60 if(settings->has(
"addr"))
61 addrs = settings->get_strs(
"addr");
64 if(settings->has(
"host")) {
65 host = host.append(settings->get_str(
"host"));
68 if(gethostname(hostname,
sizeof(hostname)) == -1)
70 host.append(hostname);
74 settings->get_i16(
"swc.ThriftBroker.port"),
82 "STARTING SERVER: THRIFT-BROKER, reactors=%u workers=%d transport=%s",
83 reactors, workers, transport.c_str());
90 app_ctx->init(host, tmp);
93 for(uint32_t reactor=0; reactor < reactors; ++reactor) {
95 std::shared_ptr<thrift::concurrency::ThreadManager> threadManager(
96 thrift::concurrency::ThreadManager::newSimpleThreadManager(workers));
97 threadManager->threadFactory(
98 std::make_shared<thrift::concurrency::ThreadFactory>());
99 threadManager->start();
101 for(
auto& endpoint : endpoints) {
102 bool is_plain =
true;
103 std::shared_ptr<thrift::transport::TServerSocket> socket;
105 socket.reset(
new thrift::transport::TServerSocket(
106 endpoint.address().to_string(), endpoint.port()));
114 auto protocol = std::make_shared<thrift::protocol::TBinaryProtocolFactory>();
121 auto server = std::make_shared<thrift::server::TThreadPoolServer>(
122 std::make_shared<Thrift::BrokerProcessorFactory>(app_ctx),
128 server->setConcurrentClientLimit(conns_max);
130 servers.push_back(server);
131 threads.emplace_back(
132 new std::thread([app_ctx, is_plain, endpoint, server] {
134 <<
"Listening On: " << endpoint
135 <<
" fd=" << server->getServerTransport()->getSocketFD()
136 <<
' ' << (is_plain ?
"PLAIN" :
"SECURE");
142 <<
"Stopping to Listen On: " << endpoint
143 <<
" fd=" << server->getServerTransport()->getSocketFD()
144 <<
' ' << (is_plain ?
"PLAIN" :
"SECURE");
146 app_ctx->shutting_down(std::error_code(), SIGINT);
154 SWC_SHOULD_NOT_INLINE
155 int exiting(Threads_t& threads, Servers_t& servers) {
156 for(
auto& server : servers) {
158 server->getThreadManager()->stop();
162 for(
auto& th : threads)
171 std::this_thread::sleep_for(std::chrono::seconds(1));
180 return exiting(threads, servers);
191 int main(
int argc,
char** argv) {