6 #ifndef swcdb_app_thriftbroker_AppHandler_h
7 #define swcdb_app_thriftbroker_AppHandler_h
12 #include "swcdb/thrift/gen-cpp/Broker.h"
15 #include <thrift/transport/TSocket.h>
27 namespace thrift = apache::thrift;
30 namespace ThriftBroker {
33 using namespace Thrift;
// Shared-ownership pointer alias for AppHandler instances.
39 typedef std::shared_ptr<AppHandler>
Ptr;
// Thrift transport socket held by the handler. The shared_ptr itself is
// const, so it cannot be reseated after construction.
// NOTE(review): presumably the client connection this handler serves - confirm.
41 const std::shared_ptr<thrift::transport::TSocket>
socket;
// Constructs a handler from the caller-supplied Thrift socket.
// The visible initialisers default-construct m_mutex and m_updaters, start
// m_processing at 0 and set m_run to true.
// NOTE(review): the head of the initialiser list (presumably storing a_socket
// into `socket`) is not part of this excerpt - confirm against the full file.
43 AppHandler(
const std::shared_ptr<thrift::transport::TSocket>& a_socket)
45 m_mutex(), m_updaters(), m_processing(0), m_run(true) {
54 for(
size_t updaters = 0;;) {
61 if(!(updaters = m_updaters.size()))
67 m_processing.load(), updaters, n
70 std::this_thread::sleep_for(std::chrono::milliseconds(300));
// Executes one SQL statement and fills the part of `_return` that matches the
// statement kind. The visible branches delegate to:
//   sql_mng_column      - receives only `sql`, nothing is written to _return here
//   sql_list_columns    - receives _return.schemas
//   sql_select          - receives _return.cells
//   sql_compact_columns - receives _return.compact
//   sql_update          - receives `sql` and the updater id 0
// NOTE(review): the condition that selects a branch is elided from this
// excerpt; presumably it switches on the parsed SQL command - confirm.
75 void exec_sql(Result& _return,
const std::string& sql)
override {
89 return sql_mng_column(sql);
92 sql_list_columns(_return.schemas, sql);
97 sql_select(_return.cells, sql);
102 sql_compact_columns(_return.compact, sql);
// NOTE(review): the literal 0 is passed as updater_id; presumably it means
// "no previously created updater" - confirm against updater().
107 return sql_update(sql, 0);
120 get_schemas(err,
"list", sql, dbschemas);
121 process_results(err, dbschemas, _return);
130 auto func = Comm::Protocol::Mngr::Params::ColumnMng::Function::CREATE;
138 mng_column(func, schema);
142 const std::string& sql)
override {
147 get_schemas(err,
"compact", sql, dbschemas);
148 process_results(err, dbschemas, _return);
158 uint8_t display_flags = 0;
161 hdlr->scan(err, std::move(specs));
// Runs a select statement and stores the cells in the member of `_return`
// that corresponds to the requested grouping `rslt`:
//   CellsResult::ON_COLUMN   -> _return.ccells (sql_select_rslt_on_column)
//   CellsResult::ON_KEY      -> _return.kcells (sql_select_rslt_on_key)
//   CellsResult::ON_FRACTION -> _return.fcells (sql_select_rslt_on_fraction)
// The final sql_select call receives _return.cells.
// NOTE(review): the switch header, the break/return statements and the label
// guarding the final call are elided here; presumably that call is the
// default branch - confirm against the full file.
170 void sql_query(CellsGroup& _return,
const std::string& sql,
171 const CellsResult::type rslt)
override {
175 case CellsResult::ON_COLUMN : {
176 sql_select_rslt_on_column(_return.ccells, sql);
179 case CellsResult::ON_KEY : {
180 sql_select_rslt_on_key(_return.kcells, sql);
183 case CellsResult::ON_FRACTION : {
184 sql_select_rslt_on_fraction(_return.fcells, sql);
188 sql_select(_return.cells, sql);
// Select by SQL text: obtains a handler from sync_select(sql) and passes it,
// together with `_return`, to process_results.
// NOTE(review): `err` is declared on a line that is not part of this excerpt,
// and any check between the two calls is elided - confirm in the full file.
194 void sql_select(Cells& _return,
const std::string& sql)
override {
197 auto hdlr = sync_select(sql);
200 process_results(err, hdlr, _return);
208 auto hdlr = sync_select(sql);
211 process_results(err, hdlr, _return);
217 const std::string& sql)
override {
220 auto hdlr = sync_select(sql);
223 process_results(err, hdlr, _return);
229 const std::string& sql)
override {
232 auto hdlr = sync_select(sql);
235 process_results(err, hdlr, _return);
241 const std::string& sql)
override {
244 auto hdlr = sync_select(sql);
247 process_results(err, hdlr, _return);
256 auto hdlr = sync_select(sql);
259 process_results(err, hdlr, _return);
268 auto hdlr = sync_select(sql);
271 process_results(err, hdlr, _return);
// Applies an SQL update statement using the updater identified by updater_id.
// Visible steps: updater(updater_id, hdlr) is called first, the handler is
// then committed through commit_or_wait() or commit_if_need(), and finally
// hdlr->error() is stored into `err` and tested.
// NOTE(review): the declarations of `hdlr` and `err`, the SQL parsing step,
// the condition choosing between the two commit calls and the action taken on
// a non-zero error are all elided from this excerpt - confirm in the full file.
278 void sql_update(
const std::string& sql,
const int64_t updater_id)
override {
// NOTE(review): presumably binds `hdlr` to the updater registered under
// updater_id - confirm against updater().
283 updater(updater_id, hdlr);
// Starts with no display flags set; its consumer is not visible here.
289 uint8_t display_flags = 0;
296 hdlr->commit_or_wait();
298 hdlr->commit_if_need();
// Assignment inside the condition is intentional: `err` keeps the handler's
// error code for the (elided) statement that follows.
301 if((err = hdlr->error()))
309 const SpecSchemas& spec)
override {
314 get_schemas(err, spec, dbschemas);
315 process_results(err, dbschemas, _return);
319 const Schema& schema)
override {
331 const SpecSchemas& spec)
override {
336 get_schemas(err, spec, dbschemas);
337 process_results(err, dbschemas, _return);
347 if(spec.__isset.flags)
352 for(
auto& col : spec.columns_plain) {
353 schema = hdlr->clients->get_schema(err, col.cid);
356 if(schema->col_type != DB::Types::Column::PLAIN)
360 for(
auto& intval : col.intervals) {
365 for(
auto& col : spec.columns_counter) {
366 schema = hdlr->clients->get_schema(err, col.cid);
373 for(
auto& intval : col.intervals) {
378 for(
auto& col : spec.columns_serial) {
379 schema = hdlr->clients->get_schema(err, col.cid);
386 for(
auto& intval : col.intervals) {
392 hdlr->scan(err, std::move(specs));
402 const CellsResult::type rslt)
override {
406 case CellsResult::ON_COLUMN : {
407 scan_rslt_on_column(_return.ccells, specs);
410 case CellsResult::ON_KEY : {
411 scan_rslt_on_key(_return.kcells, specs);
414 case CellsResult::ON_FRACTION : {
415 scan_rslt_on_fraction(_return.fcells, specs);
419 scan(_return.cells, specs);
// Select by scan specification: obtains a handler from sync_select(specs) and
// passes it, together with `_return`, to process_results.
// NOTE(review): `err` is declared on a line that is not part of this excerpt,
// and any check between the two calls is elided - confirm in the full file.
425 void scan(Cells& _return,
const SpecScan& specs)
override {
428 auto hdlr = sync_select(specs);
431 process_results(err, hdlr, _return);
439 auto hdlr = sync_select(specs);
442 process_results(err, hdlr, _return);
450 auto hdlr = sync_select(specs);
453 process_results(err, hdlr, _return);
461 auto hdlr = sync_select(specs);
464 process_results(err, hdlr, _return);
477 for(
auto it = m_updaters.cbegin();
478 it != m_updaters.cend();
479 it = m_updaters.find(++
id)
481 auto& hdlr = m_updaters[
id] =
485 hdlr->buff_sz.store(buffer_size);
488 sizeof(hdlr) +
sizeof(*hdlr.get()));
499 auto it = m_updaters.find(
id);
500 if(it == m_updaters.cend())
503 m_updaters.erase(it);
507 sizeof(hdlr) +
sizeof(*hdlr.get()));
512 const int64_t updater_id)
override {
517 updater(updater_id, hdlr);
522 set_cols(cells, hdlr);
526 hdlr->commit_or_wait();
528 hdlr->commit_if_need();
531 if(
int err = hdlr->error())
537 const int64_t updater_id)
override {
542 updater(updater_id, hdlr);
547 set_cols(cells, hdlr);
551 hdlr->commit_or_wait();
553 hdlr->commit_if_need();
556 if(
int err = hdlr->error())
562 const int64_t updater_id)
override {
567 updater(updater_id, hdlr);
572 set_cols(cells, hdlr);
576 hdlr->commit_or_wait();
578 hdlr->commit_if_need();
581 if(
int err = hdlr->error())
587 const UCCellsCounter& counter,
588 const UCCellsSerial& serial,
589 const int64_t updater_id)
override {
594 updater(updater_id, hdlr);
599 set_cols(plain, hdlr);
600 set_cols(counter, hdlr);
601 set_cols(serial, hdlr);
608 hdlr->commit_or_wait();
610 hdlr->commit_if_need();
613 if(
int err = hdlr->error())
624 for(
size_t idx=0;;++idx) {
627 if(idx >= m_updaters.size())
629 auto it = m_updaters.cbegin();
630 for(
size_t n=0; n < idx; ++it, ++n);
633 if(
size_t sz = hdlr->size_bytes()) {
635 if(release && release <= (released += sz))
// Helper templated on the update-cells container type UCCellsT.
// For every entry of `cells`, when hdlr->get(col_cells.first) yields nothing,
// the schema for that key is requested from hdlr->clients and handed to
// hdlr->create.
// NOTE(review): the function name and parameter list are elided from this
// excerpt (presumably this is the set_cols helper called by the update
// methods), as is whatever handling of `err` follows get_schema - confirm.
644 template <
typename UCCellsT>
649 for(
auto& col_cells : cells) {
650 if(!hdlr->get(col_cells.first)) {
651 auto schema = hdlr->clients->get_schema(err, col_cells.first);
654 hdlr->create(schema);
// Overload for UCCellsPlain update cells.
// Iterates the (column key, cell list) entries of `cells`; for each entry the
// column is fetched with hdlr->get(col_cells.first) and every thrift cell is
// translated into `dbcell`: the flag comes from cell.f narrowed to uint8_t,
// and ts_desc is consulted only when its __isset bit is set.
// hdlr->commit_or_wait is called with the raw column pointer.
// NOTE(review): the remaining parameters, the declaration of `dbcell`, the
// key/value/timestamp handling and the step that adds the cell to the column
// are elided. No null check of `col` is visible before col.get() is used -
// confirm the column is guaranteed to exist at this point.
660 static void set(
const UCCellsPlain& cells,
663 for(
auto& col_cells : cells) {
664 auto col = hdlr->get(col_cells.first);
665 for(
auto& cell : col_cells.second) {
666 dbcell.
flag = uint8_t(cell.f);
671 if(cell.__isset.ts_desc)
680 hdlr->commit_or_wait(col.get());
// Overload for UCCellsCounter update cells.
// Iterates the (column key, cell list) entries of `cells`; for each entry the
// column is fetched with hdlr->get(col_cells.first) and every thrift cell is
// translated into `dbcell`: the flag comes from cell.f narrowed to uint8_t,
// and ts_desc is consulted only when its __isset bit is set.
// hdlr->commit_or_wait is called with the raw column pointer.
// NOTE(review): the remaining parameters, the declaration of `dbcell`, the
// counter value/operation handling and the step that adds the cell to the
// column are elided. No null check of `col` is visible before col.get() is
// used - confirm the column is guaranteed to exist at this point.
686 static void set(
const UCCellsCounter& cells,
689 for(
auto& col_cells : cells) {
690 auto col = hdlr->get(col_cells.first);
691 for(
auto& cell : col_cells.second) {
692 dbcell.
flag = uint8_t(cell.f);
697 if(cell.__isset.ts_desc)
703 hdlr->commit_or_wait(col.get());
// Overload for UCCellsSerial update cells.
// Iterates the (column key, cell list) entries of `cells`; for each entry the
// column is fetched with hdlr->get(col_cells.first) and every thrift cell is
// translated into `dbcell`: the flag comes from cell.f narrowed to uint8_t,
// and ts_desc is consulted only when its __isset bit is set.
// hdlr->commit_or_wait is called with the raw column pointer.
// NOTE(review): the remaining parameters, the declaration of `dbcell`, the
// serialisation of the cell's value fields and the step that adds the cell to
// the column are elided. No null check of `col` is visible before col.get()
// is used - confirm the column is guaranteed to exist at this point.
709 static void set(
const UCCellsSerial& cells,
712 for(
auto& col_cells : cells) {
713 auto col = hdlr->get(col_cells.first);
714 for(
auto& cell : col_cells.second) {
715 dbcell.
flag = uint8_t(cell.f);
720 if(cell.__isset.ts_desc)
731 hdlr->commit_or_wait(col.get());
740 auto it = m_updaters.cbegin();
741 if(it == m_updaters.cend())
744 m_updaters.erase(it);
746 try { updater_close(hdlr); }
catch(...) {}
748 sizeof(hdlr) +
sizeof(*hdlr.get()));
756 auto it = m_updaters.find(
id);
757 if(it == m_updaters.cend())
764 hdlr->commit_if_need();
766 int err = hdlr->error();
// Collects the column schemas addressed by an SQL statement.
//   err - in/out error code shared with the calls below
//   cmd - command label forwarded as the last argument of the first visible
//         call; callers in this file pass "list" and "compact"
//   sql - the statement text
// NOTE(review): the rest of the parameter list (presumably the `dbschemas`
// output) and the callee names of the argument-only lines are elided from
// this excerpt - confirm in the full file.
771 void get_schemas(
int& err,
const char* cmd,
const std::string& sql,
// Argument list of an elided call; presumably the SQL parser that fills
// `dbschemas`, `params` and `message` - confirm.
777 err, clients, sql, dbschemas, params, message, cmd);
// Pattern-based lookup of schemas through the clients object.
784 clients->get_schema(err, params.
patterns, schemas);
// Arguments of an elided error-reporting call carrying this fixed message.
787 err,
"problem getting columns schemas on patterns");
791 }
else if(dbschemas.
empty()) {
// Taken only when no schema was collected above.
// NOTE(review): 300000 is presumably a timeout in milliseconds for a
// column-list request - confirm against the callee's signature.
793 params, 300000, clients, err, dbschemas);
802 bool has_patterns = !spec.patterns.names.empty() ||
803 !spec.patterns.tags.values.empty() ||
807 if(!spec.patterns.names.empty()) {
809 for(
auto& pattern : spec.patterns.names) {
814 if(!spec.patterns.tags.values.empty() ||
816 dbpatterns.
tags.
reserve(spec.patterns.tags.values.size());
819 for(
auto& pattern : spec.patterns.tags.values) {
825 clients->get_schema(err, dbpatterns, dbschemas);
828 err,
"problem getting columns schemas on patterns");
833 for(
auto& cid : spec.cids) {
834 schema = clients->get_schema(err, cid);
839 err,
"problem getting column cid='"+
std::to_string(cid)+
"' schema");
843 for(
auto& name : spec.names) {
844 schema = clients->get_schema(err, name);
849 err,
"problem getting column name='"+name+
"' schema");
853 if(!has_patterns && dbschemas.
empty()) {
856 clients, err, dbschemas
879 _return.resize(dbschemas.
size());
881 for(
auto& dbschema : dbschemas) {
888 CompactResults& _return) {
889 size_t sz = dbschemas.
size();
892 for(
size_t idx = 0; idx < sz; ++idx) {
893 auto& r = _return[idx];
894 r.cid = dbschemas[idx]->cid;
896 r.cid, 300000, clients, r.err);
907 for(
cid_t cid : hdlr->get_cids()) {
909 hdlr->get_cells(cid, cells);
911 schema = clients->get_schema(err, cid);
914 switch(schema->col_type) {
916 auto& rcells = _return.serial_cells;
917 size_t c = rcells.size();
918 rcells.resize(c + cells.size());
919 for(
auto dbcell : cells) {
920 auto& cell = rcells[c++];
921 cell.c = schema->col_name;
922 dbcell->key.convert_to(cell.k);
923 cell.ts = dbcell->get_timestamp();
929 case DB::Types::Column::COUNTER_I64:
930 case DB::Types::Column::COUNTER_I32:
931 case DB::Types::Column::COUNTER_I16:
932 case DB::Types::Column::COUNTER_I8: {
933 auto& rcells = _return.counter_cells;
934 size_t c = rcells.size();
935 rcells.resize(c + cells.size());
936 for(
auto dbcell : cells) {
937 auto& cell = rcells[c++];
938 cell.c = schema->col_name;
939 dbcell->key.convert_to(cell.k);
940 cell.ts = dbcell->get_timestamp();
942 cell.v = dbcell->get_counter(op, cell.eq);
944 cell.__isset.eq =
true;
952 auto& rcells = _return.plain_cells;
953 size_t c = rcells.size();
954 rcells.resize(c + cells.size());
955 for(
auto dbcell : cells) {
956 auto& cell = rcells[c++];
957 cell.c = schema->col_name;
958 dbcell->key.convert_to(cell.k);
959 cell.ts = dbcell->get_timestamp();
960 dbcell->get_value(cell.v);
969 CellsPlain& _return) {
973 for(
cid_t cid : hdlr->get_cids()) {
975 hdlr->get_cells(cid, cells);
976 schema = clients->get_schema(err, cid);
980 size_t c = _return.size();
981 _return.resize(c + cells.size());
982 for(
auto dbcell : cells) {
983 auto& cell = _return[c++];
984 cell.c = schema->col_name;
985 dbcell->key.convert_to(cell.k);
986 cell.ts = dbcell->get_timestamp();
987 dbcell->get_value(cell.v);
994 CellsCounter& _return) {
998 for(
cid_t cid : hdlr->get_cids()) {
1000 hdlr->get_cells(cid, cells);
1001 schema = clients->get_schema(err, cid);
1005 size_t c = _return.size();
1006 _return.resize(c + cells.size());
1007 for(
auto dbcell : cells) {
1008 auto& cell = _return[c++];
1009 cell.c = schema->col_name;
1010 dbcell->key.convert_to(cell.k);
1011 cell.ts = dbcell->get_timestamp();
1013 cell.v = dbcell->get_counter(op, cell.eq);
1015 cell.__isset.eq =
true;
1025 CellsSerial& _return) {
1029 for(
cid_t cid : hdlr->get_cids()) {
1031 hdlr->get_cells(cid, cells);
1032 schema = clients->get_schema(err, cid);
1036 size_t c = _return.size();
1037 _return.resize(c + cells.size());
1038 for(
auto dbcell : cells) {
1039 auto& cell = _return[c++];
1040 cell.c = schema->col_name;
1041 dbcell->key.convert_to(cell.k);
1042 cell.ts = dbcell->get_timestamp();
1056 for(
cid_t cid : hdlr->get_cids()) {
1058 hdlr->get_cells(cid, cells);
1060 schema = clients->get_schema(err, cid);
1063 auto& _col = _return[schema->col_name];
1065 switch(schema->col_type) {
1067 auto& rcells = _col.serial_cells;
1068 size_t c = rcells.size();
1069 rcells.resize(c + cells.size());
1070 for(
auto dbcell : cells) {
1071 auto& cell = rcells[c++];
1072 dbcell->key.convert_to(cell.k);
1073 cell.ts = dbcell->get_timestamp();
1079 case DB::Types::Column::COUNTER_I64:
1080 case DB::Types::Column::COUNTER_I32:
1081 case DB::Types::Column::COUNTER_I16:
1082 case DB::Types::Column::COUNTER_I8: {
1083 auto& rcells = _col.counter_cells;
1084 size_t c = rcells.size();
1085 rcells.resize(c + cells.size());
1086 for(
auto dbcell : cells) {
1087 auto& cell = rcells[c++];
1088 dbcell->key.convert_to(cell.k);
1089 cell.ts = dbcell->get_timestamp();
1091 cell.v = dbcell->get_counter(op, cell.eq);
1093 cell.__isset.eq =
true;
1101 auto& rcells = _col.plain_cells;
1102 size_t c = rcells.size();
1103 rcells.resize(c + cells.size());
1104 for(
auto dbcell : cells) {
1105 auto& cell = rcells[c++];
1106 dbcell->key.convert_to(cell.k);
1107 cell.ts = dbcell->get_timestamp();
1108 dbcell->get_value(cell.v);
1122 for(
cid_t cid : hdlr->get_cids()) {
1124 hdlr->get_cells(cid, cells);
1126 schema = clients->get_schema(err, cid);
1129 switch(schema->col_type) {
1131 for(
auto dbcell : cells) {
1132 auto it = std::find_if(_return.begin(), _return.end(),
1133 [dbcell](
const kCells& key_cells)
1134 {return dbcell->key.equal(key_cells.k);});
1135 if(it == _return.end()) {
1136 _return.emplace_back();
1137 it = _return.end()-1;
1138 dbcell->key.convert_to(it->k);
1141 auto& cell = it->serial_cells.emplace_back();
1142 cell.c = schema->col_name;
1143 cell.ts = dbcell->get_timestamp();
1149 case DB::Types::Column::COUNTER_I64:
1150 case DB::Types::Column::COUNTER_I32:
1151 case DB::Types::Column::COUNTER_I16:
1152 case DB::Types::Column::COUNTER_I8: {
1153 for(
auto dbcell : cells) {
1154 auto it = std::find_if(_return.begin(), _return.end(),
1155 [dbcell](
const kCells& key_cells)
1156 {return dbcell->key.equal(key_cells.k);});
1157 if(it == _return.end()) {
1158 _return.emplace_back();
1159 it = _return.end()-1;
1160 dbcell->key.convert_to(it->k);
1163 auto& cell = it->counter_cells.emplace_back();
1164 cell.c = schema->col_name;
1165 cell.ts = dbcell->get_timestamp();
1167 cell.v = dbcell->get_counter(op, cell.eq);
1169 cell.__isset.eq =
true;
1177 for(
auto dbcell : cells) {
1178 auto it = std::find_if(_return.begin(), _return.end(),
1179 [dbcell](
const kCells& key_cells)
1180 {return dbcell->key.equal(key_cells.k);});
1181 if(it == _return.end()) {
1182 _return.emplace_back();
1183 it = _return.end()-1;
1184 dbcell->key.convert_to(it->k);
1187 auto& cell = it->plain_cells.emplace_back();
1188 cell.c = schema->col_name;
1189 cell.ts = dbcell->get_timestamp();
1190 dbcell->get_value(cell.v);
1206 for(
cid_t cid : hdlr->get_cids()) {
1208 hdlr->get_cells(cid, cells);
1210 schema = clients->get_schema(err, cid);
1213 switch(schema->col_type) {
1215 FCells* fraction_cells;
1216 for(
auto dbcell : cells) {
1217 fraction_cells = &_return;
1219 dbcell->key.convert_to(key);
1221 fraction_cells = &fraction_cells->f[f];
1222 auto& cell = fraction_cells->serial_cells.emplace_back();
1223 cell.c = schema->col_name;
1224 cell.ts = dbcell->get_timestamp();
1230 case DB::Types::Column::COUNTER_I64:
1231 case DB::Types::Column::COUNTER_I32:
1232 case DB::Types::Column::COUNTER_I16:
1233 case DB::Types::Column::COUNTER_I8: {
1234 FCells* fraction_cells;
1235 for(
auto dbcell : cells) {
1236 fraction_cells = &_return;
1238 dbcell->key.convert_to(key);
1240 fraction_cells = &fraction_cells->f[f];
1241 auto& cell = fraction_cells->counter_cells.emplace_back();
1242 cell.c = schema->col_name;
1243 cell.ts = dbcell->get_timestamp();
1245 cell.v = dbcell->get_counter(op, cell.eq);
1247 cell.__isset.eq =
true;
1255 FCells* fraction_cells;
1256 for(
auto dbcell : cells) {
1257 fraction_cells = &_return;
1259 dbcell->key.convert_to(key);
1261 fraction_cells = &fraction_cells->f[f];
1262 auto& cell = fraction_cells->plain_cells.emplace_back();
1263 cell.c = schema->col_name;
1264 cell.ts = dbcell->get_timestamp();
1265 dbcell->get_value(cell.v);
1300 #endif // swcdb_app_thriftbroker_AppHandler_h