SWC-DB  v0.5.12 C++ documentations
SWC-DB© (Super Wide Column Database) - High Performance Scalable Database (https://github.com/kashirin-alex/swc-db)
TSV.h
Go to the documentation of this file.
1 /*
2  * SWC-DB© Copyright since 2019 Alex Kashirin <kashirin.alex@gmail.com>
3  * License details at <https://github.com/kashirin-alex/swc-db/#license>
4  */
5 
6 
7 #ifndef swcdb_common_Files_TSV_h
8 #define swcdb_common_Files_TSV_h
9 
10 
12 #include "swcdb/db/Cells/Cell.h"
13 
14 
15 namespace SWC { namespace DB { namespace Cells {
16 
17 
19 namespace TSV {
20 
21 
22 class FileWriter {
23  public:
26  int err;
27  std::string base_path;
28  std::string file_ext;
29 
30  uint64_t split_size;
31  uint8_t output_flags;
32  size_t cells_count;
33  size_t cells_bytes;
34  size_t file_num;
35 
36  FileWriter(const client::Clients::Ptr& a_clients)
37  : clients(a_clients), interface(nullptr),
38  err(Error::OK), base_path(), file_ext(".tsv"),
39  split_size(1073741824),
41  file_num(0), smartfd(nullptr), fds(),
42  cells(), has_encoder(false), flush_vol(0),
43  schemas(), _stream(nullptr) {
44  }
45 
46  virtual ~FileWriter() noexcept { }
47 
48  int set_extension(const std::string& ext, int level) {
49  if(ext.empty()) {
50  _stream.reset(new Core::BufferStreamOut());
51  return _stream->error;
52  }
53  file_ext.reserve(1 + ext.length());
54  file_ext.append(".");
55  file_ext.append(ext);
56 
57  if(ext.length() == 3) {
58  if(Condition::str_eq("zst", ext.c_str(), ext.length())) {
59  _stream.reset(new Core::BufferStreamOut_ZSTD(level));
60  return _stream->error;
61  }
62  }
64  }
65 
66  void initialize() {
67  if(base_path.back() != '/')
68  base_path.append("/");
69  interface->get_fs()->mkdirs(err, base_path);
70  }
71 
72  void finalize() {
73  for(auto& s : schemas) {
74  if(!_stream->empty()) {
75  write();
76  } else if(!file_num) { // header-only-file
77  roll_file();
78  if(!err) {
79  write_header(s.second->col_type);
80  if(!err)
81  write();
82  }
83  }
84  if(err)
85  break;
86  }
87  close();
88  }
89 
90  void write(
92  for(cid_t cid : hdlr->get_cids()) {
93  schemas.emplace(cid, clients->get_schema(err, cid));
94  if(err)
95  break;
96  }
97 
98  Types::Column col_type;
99  do {
100  for(cid_t cid : hdlr->get_cids()) {
101  col_type = schemas[cid]->col_type;
102 
103  cells.free();
104  hdlr->get_cells(cid, cells);
105 
106  cells_count += cells.size();
108 
109  bool need_roll;
110  for(auto cell : cells) {
111  need_roll = !smartfd ||
112  smartfd->pos() + _stream->available() > split_size;
113  if(need_roll) {
114  if(!_stream->empty()) {
115  write();
116  if(err)
117  break;
118  }
119  roll_file();
120  if(err)
121  break;
122  write_header(col_type);
123  }
124 
125  write(*cell, col_type);
126  if(_stream->full()) {
127  write();
128  if(err)
129  break;
130  }
131  }
132  }
133  } while(!cells.empty() && !err);
134  }
135 
137  std::string header;
138 
140  header.append("TIMESTAMP\t");
141 
142  header.append("FLEN\tKEY\tFLAG\t");
143 
145  header.append(Types::is_counter(typ)
146  ? "COUNT\tEQ\tSINCE"
147  : "ORDER\tVLEN\tVALUE");
148 
149  if(has_encoder)
150  header.append("\tENCODER");
151 
152  header.append("\n");
153  _stream->add(reinterpret_cast<const uint8_t*>(header.c_str()), header.size());
154  err = _stream->error;
155  }
156 
157  void write(const Cell &cell, Types::Column typ) {
158  DynamicBuffer cell_buff(8096);
159 
161  cell_buff.add(std::to_string(cell.get_timestamp()));
162  cell_buff.add('\t'); //
163  }
164 
165  uint32_t len = 0;
166  const uint8_t* ptr = cell.key.data;
167  for(uint32_t n=1; n<=cell.key.count; ++n, ptr+=len) {
168  cell_buff.add(std::to_string((len = Serialization::decode_vi32(&ptr))));
169  if(n < cell.key.count)
170  cell_buff.add(',');
171  }
172  cell_buff.add('\t'); //
173 
174  ptr = cell.key.data;
175  for(uint32_t n=1; n<=cell.key.count; ++n, ptr+=len) {
176  if((len = Serialization::decode_vi32(&ptr)))
177  cell_buff.add(ptr, len);
178  if(n < cell.key.count)
179  cell_buff.add(',');
180  }
181  cell_buff.add('\t'); //
182 
183  cell_buff.add(DB::Cells::to_string(DB::Cells::Flag(cell.flag)));
184 
186  cell.flag == Flag::DELETE_LE || cell.flag == Flag::DELETE_EQ) {
187  cell_buff.add('\n');
188  return;
189  }
190  cell_buff.add('\t'); //
191 
192  if(Types::is_counter(typ)) {
193  uint8_t op;
194  int64_t eq_rev = TIMESTAMP_NULL;
195  int64_t i = cell.get_counter(op, eq_rev);
196  std::string v(i > 0 ? "+" : "");
197  v.append(std::to_string(i));
198  cell_buff.add(v);
199 
200  if(op & OP_EQUAL) {
201  cell_buff.add('\t'); //
202  cell_buff.add('=');
203  if(eq_rev != TIMESTAMP_NULL) {
204  cell_buff.add('\t'); //
205  cell_buff.add(std::to_string(eq_rev));
206  }
207  }
208  } else {
209 
210  cell_buff.add(cell.is_time_order_desc() ? 'D' : 'A');
211  cell_buff.add('\t'); //
212 
214  StaticBuffer v;
215  cell.get_value(v, false);
216  cell_buff.add(std::to_string(v.size));
217  cell_buff.add('\t');
218  cell_buff.add(v.base, v.size);
219  } else {
220  cell_buff.add(std::to_string(cell.vlen));
221  cell_buff.add('\t');
222  cell_buff.add(cell.value, cell.vlen);
223  if(cell.have_encoder()) {
224  has_encoder = true;
225  cell_buff.add(std::string("\t1"));
226  }
227  }
228  }
229 
230  cell_buff.add('\n');
231  _stream->add(cell_buff.base, cell_buff.fill());
232  err = _stream->error;
233  }
234 
235  void roll_file() {
236  close();
237 
238  std::string filepath;
239  auto n_str = std::to_string(++file_num);
240  filepath.reserve(base_path.length() + n_str.length() + file_ext.length());
241  filepath.append(base_path);
242  filepath.append(n_str);
243  filepath.append(file_ext);
245  new FS::SmartFd(
247  while(interface->create(err, smartfd, 0));
248  if(!err)
249  flush_vol = 0;
250  }
251 
252  void write() {
253  if((flush_vol += _stream->available()) > 1073741824)
254  flush_vol = 0;
255  StaticBuffer buff_write;
256  _stream->get(buff_write);
257  err = _stream->error;
258  if(err)
259  return;
260  interface->get_fs()->append(
261  err, smartfd, buff_write,
263  );
264  }
265 
267  for(auto fd : fds) {
268  fd->pos(interface->get_fs()->length(err, fd->filepath()));
269  files.push_back(fd);
270  if(err)
271  break;
272  }
273  }
274 
275  private:
276 
277  void close() {
278  if(smartfd && smartfd->valid()) {
279  interface->get_fs()->flush(err, smartfd);
280  interface->close(err, smartfd);
281  }
282  }
283 
288  size_t flush_vol;
289  std::unordered_map<cid_t, DB::Schema::Ptr> schemas;
290  std::unique_ptr<Core::BufferStreamOut> _stream;
291 
292 };
293 
294 
295 
296 class FileReader {
297  public:
300  int err;
302  std::string base_path;
303  std::string message;
305  size_t cells_count;
306  size_t resend_cells;
307  size_t cells_bytes;
308 
310  const client::Clients::Flag flag)
311  : hdlr(
312  client::Query::Update::Handlers::Common::make(
313  clients, nullptr, nullptr, flag)),
314  interface(nullptr),
315  err(Error::CANCELLED), cid(DB::Schema::NO_CID),
316  base_path(), message(), schema(nullptr),
318  fds() {
319  }
320 
321  ~FileReader() noexcept {
322  base_path.clear();
323  base_path.shrink_to_fit();
324  }
325 
326  void initialize() {
327  err = Error::OK;
328  if(base_path.back() != '/')
329  base_path.append("/");
330 
331  FS::DirentList entries;
332  interface->get_fs()->readdir(err, base_path, entries);
333  if(err)
334  return;
335 
337  files.reserve(entries.size());
338  for(auto& entry : entries) {
339  if(entry.is_dir)
340  continue;
341  for(auto it = files.cbegin(); ; ++it) {
342  if(it == files.cend() ||
344  reinterpret_cast<const uint8_t*>((*it)->name.c_str()),
345  (*it)->name.size(),
346  reinterpret_cast<const uint8_t*>(entry.name.c_str()),
347  entry.name.size() ) ) {
348  files.insert(it, &entry);
349  break;
350  }
351  }
352  }
353  fds.reserve(files.size());
354  for(auto file : files) {
355  std::string p;
356  p.reserve(base_path.size() + file->name.size());
357  p.append(base_path);
358  p.append(file->name);
359  fds.emplace_back(new FS::SmartFd(std::move(p), 0));
360  }
361  if(fds.empty()) {
362  err = ENOENT;
363  } else {
364  schema = hdlr->clients->get_schema(err, cid);
365  }
366  }
367 
368  void read_and_load() {
369  if(err)
370  return;
371 
372  hdlr->create(schema);
373 
374  for(auto& fd : fds) {
375  read(fd);
376  if(err)
377  break;
378  }
379 
380  hdlr->commit_if_need();
381  hdlr->wait();
382  resend_cells += hdlr->get_resend_count();
383  }
384 
385  void read(FS::SmartFd::Ptr& smartfd) {
386  size_t length = interface->get_fs()->length(err, smartfd->filepath());
387  if(err)
388  return;
389 
390  auto colp = hdlr->get_base_ptr(cid);
391 
392  std::unique_ptr<Core::BufferStreamIn> instream;
393  const std::string& path = smartfd->filepath();
395  "zst", path.c_str()+(path.length()-3), path.length()-3)) {
396  instream.reset(new Core::BufferStreamIn_ZSTD());
397  } else {
398  instream.reset(new Core::BufferStreamIn());
399  }
400  err = instream ? instream->error : Error::BAD_MEMORY_ALLOCATION;
401  if(err)
402  return;
403 
404  size_t offset = 0;
405  size_t r_sz;
406  StaticBuffer buffer;
407  bool ok;
408  size_t cell_pos = 0;
409  size_t cell_mark = 0;
411  bool has_ts = false;
412  StaticBuffer buffer_read;
413 
414  auto current_bytes = cells_bytes;
415  do {
416  err = Error::OK;
417  if(!smartfd->valid() && !interface->open(err, smartfd) && err)
418  break;
419  if(err)
420  continue;
421 
422  r_sz = length - offset > 1048576 ? 1048576 : length - offset;
423  for(;;) {
424  buffer.free();
425  err = Error::OK;
426 
427  if(interface->get_fs()->pread(
428  err, smartfd, offset, &buffer, r_sz) != r_sz) {
429  int tmperr = Error::OK;
430  interface->close(tmperr, smartfd);
431  if(err != Error::FS_EOF){
432  if(!interface->open(err, smartfd))
433  break;
434  continue;
435  }
436  }
437  break;
438  }
439  if(err)
440  break;
441  offset += r_sz;
442  instream->add(buffer);
443 
444  more_available:
445  if(!instream->get(buffer_read)) {
446  if(offset < length)
447  continue;
448  break;
449  }
450 
451  const uint8_t* ptr = buffer_read.base;
452  size_t remain = buffer_read.size;
453 
454  if(header.empty() &&
455  !header_read(&ptr, &remain, schema->col_type, has_ts, header)) {
456  message.append("TSV file '");
457  message.append(smartfd->filepath());
458  message.append("' missing ");
459  message.append(
460  header.empty() ? "columns defintion" : "value columns");
461  message.append(" in header\n");
462  break;
463  }
464 
465  try {
466  while(remain) {
467  DB::Cells::Cell cell;
468  cell_mark = remain;
469  ok = read(&ptr, &remain, has_ts, schema->col_type, cell);
470  cell_pos += cell_mark-remain;
471  if(ok) {
472  colp->add(cell);
473  ++cells_count;
474  cells_bytes += cell.encoded_length();
475  if(cells_bytes - current_bytes >= hdlr->buff_sz) {
476  hdlr->commit_or_wait(colp);
477  current_bytes = cells_bytes;
478  }
479  } else {
480  instream->put_back(ptr, remain);
481  break;
482  }
483  }
484  if(!remain)
485  goto more_available;
486  } catch(...) {
488  message.append(e.message());
489  message.append(", corrupted '");
490  message.append(smartfd->filepath());
491  message.append("' starting at-offset=");
492  message.append(std::to_string(cell_pos));
493  message.append("\n");
494  offset = length;
495  break;
496  }
497  buffer_read.free();
498  resend_cells += hdlr->get_resend_count();
499  } while(offset < length);
500 
501  if(smartfd->valid())
502  interface->close(err, smartfd);
503 
504  if(!instream->empty()) {
505  message.append("early file end");
506  message.append(", corrupted '");
507  message.append(smartfd->filepath());
508  message.append("' starting at-offset=");
509  message.append(std::to_string(cell_pos));
510  message.append("\n");
511  }
512  if(!message.empty())
514  }
515 
516  bool header_read(const uint8_t** bufp, size_t* remainp, Types::Column, //typ
517  bool& has_ts, Core::Vector<std::string>& header) {
518  const uint8_t* ptr = *bufp;
519  size_t remain = *remainp;
520 
521  const uint8_t* s = *bufp;
522  while(remain && *ptr != '\n') {
523  ++ptr;
524  --remain;
525  if(*ptr == '\t' || *ptr == '\n') {
526  header.emplace_back(reinterpret_cast<const char*>(s), ptr-s);
527  s = ptr+1;
528  }
529  }
530 
531  if(header.empty() || !remain || *ptr != '\n')
532  return false;
533 
534  has_ts = Condition::str_case_eq(
535  header.front().c_str(), "timestamp", 9);
536  bool has_encoder = Condition::str_case_eq(
537  header.back().c_str(), "encoder", 7);
538  if(header.size() < size_t(6 + has_ts + has_encoder))
539  return false;
540 
541  ++ptr; // header's newline
542  --remain;
543 
544  *bufp = ptr;
545  *remainp = remain;
546  return true;
547  }
548 
549  bool read(const uint8_t** bufp, size_t* remainp, bool has_ts,
550  Types::Column typ, Cell &cell) {
551  const uint8_t* ptr = *bufp;
552  size_t remain = *remainp;
553  const uint8_t* s = ptr;
554 
555  if(has_ts) {
556  while(remain && *ptr != '\t') {
557  --remain;
558  ++ptr;
559  }
560  if(!remain)
561  return false;
562  cell.set_timestamp(
563  std::stoll(std::string(reinterpret_cast<const char*>(s), ptr-s)));
564  s = ++ptr; // tab
565  --remain;
566  }
567 
569  while(remain) {
570  if(*ptr == ',' || *ptr == '\t') {
571  flen.push_back(
572  std::stoul(std::string(reinterpret_cast<const char*>(s), ptr-s)));
573  if(*ptr == '\t')
574  break;
575  if(!--remain)
576  return false;
577  s = ++ptr; // comma
578  }
579  ++ptr;
580  --remain;
581  }
582  if(!remain)
583  return false;
584 
585  s = ++ptr; // tab
586  --remain;
587  for(auto len : flen) {
588  if(remain <= len+1)
589  return false;
590  cell.key.add(ptr, len);
591  ptr += len+1;
592  remain -= (len+1);
593  };
594  if(!remain)
595  return false;
596 
597  s = ptr;
598  while(remain && (*ptr != '\t' && *ptr != '\n')) {
599  --remain;
600  ++ptr;
601  }
602  if(!remain)
603  return false;
604  if((cell.flag = DB::Cells::flag_from(s, ptr-s)) == Flag::NONE)
605  throw std::runtime_error("Bad cell Flag");
606 
607  if(cell.flag == Flag::DELETE_LE || cell.flag == Flag::DELETE_EQ) {
608  if(*ptr == '\t')
609  throw std::runtime_error("Expected end of line");
610  goto cell_filled;
611  }
612  if(*ptr == '\n')
613  throw std::runtime_error("Expected a tab");
614 
615  s = ++ptr; // tab
616  --remain;
617  while(remain && (*ptr != '\t' && *ptr != '\n')) {
618  --remain;
619  ++ptr;
620  }
621  if(!remain)
622  return false;
623 
624  if(Types::is_counter(typ)) {
625  int64_t counter = std::stoll(
626  std::string(reinterpret_cast<const char*>(s), ptr-s));
627  int64_t eq_rev = TIMESTAMP_NULL;
628  uint8_t op = 0;
629  if(*ptr == '\t') {
630  if(!--remain)
631  return false;
632  ++ptr; // tab
633  if(*ptr != '=')
634  throw std::runtime_error("Expected EQ symbol");
635  op = OP_EQUAL;
636  if(!--remain)
637  return false;
638  ++ptr;
639 
640  if(*ptr == '\t') {
641  if(!--remain)
642  return false;
643  s = ++ptr; // tab
644  while(remain && *ptr != '\n') {
645  --remain;
646  ++ptr;
647  }
648  if(!remain)
649  return false;
650  eq_rev = std::stoll(
651  std::string(reinterpret_cast<const char*>(s), ptr-s));
652  }
653  }
654  cell.set_counter(op, counter, typ, eq_rev);
655 
656  } else {
657 
658  cell.set_time_order_desc(*s == 'D' || *s == 'd');
659  if(!--remain)
660  return false;
661  s = ++ptr; // tab
662  while(remain && *ptr != '\t') {
663  --remain;
664  ++ptr;
665  }
666  if(!remain)
667  return false;
668 
669  cell.vlen = std::stoul(
670  std::string(reinterpret_cast<const char*>(s), ptr-s));
671  if(--remain < cell.vlen+1)
672  return false;
673  ++ptr; // tab
674  if(cell.vlen)
675  cell.value = const_cast<uint8_t*>(ptr);
676  ptr += cell.vlen;
677  remain -= cell.vlen;
678 
679  if(remain && *ptr == '\t') {
680  --remain;
681  ++ptr; // tab
682  if(!remain)
683  return false;
684  if(*ptr == '1') {
685  cell.control |= HAVE_ENCODER;
686  --remain;
687  ++ptr;
688  }
689  }
690  }
691 
692  cell_filled:
693  if(!remain || *ptr != '\n')
694  return false;
695 
696  *bufp = ++ptr; // newline
697  *remainp = --remain;
698 
699  return true;
700  }
701 
702  private:
704 
705 };
706 
707 
708 
709 }}}} // namespace SWC::DB::Cells::TSV
710 
711 
712 #endif // swcdb_common_Files_TSV_h
SWC::DB::Cells::TSV::FileReader::hdlr
client::Query::Update::Handlers::Common::Ptr hdlr
Definition: TSV.h:298
SWC::DB::Cells::Cell::set_counter
void set_counter(uint8_t op, int64_t v, Types::Column typ=Types::Column::COUNTER_I64, int64_t rev=TIMESTAMP_NULL)
Definition: Cell.h:450
SWC::Core::Vector::front
constexpr SWC_CAN_INLINE reference front() noexcept
Definition: Vector.h:243
SWC::DB::Cells::TSV::FileReader::message
std::string message
Definition: TSV.h:303
SWC::DB::Cells::TSV::FileWriter::~FileWriter
virtual ~FileWriter() noexcept
Definition: TSV.h:46
SWC::DB::Cells::TSV::FileWriter::clients
client::Clients::Ptr clients
Definition: TSV.h:24
SWC::DB::Cells::TSV::FileReader::FileReader
FileReader(const client::Clients::Ptr &clients, const client::Clients::Flag flag)
Definition: TSV.h:309
Cell.h
SWC::Error::SQL_BAD_LOAD_FILE_FORMAT
@ SQL_BAD_LOAD_FILE_FORMAT
Definition: Error.h:123
SWC::DB::Cells::DELETE_EQ
@ DELETE_EQ
Definition: Cell.h:64
SWC::Core::BufferDyn::add
value_type * add(const value_type *data, size_t len)
Definition: Buffer.h:249
SWC::DB::NO_VALUE
@ NO_VALUE
Definition: Cell.h:52
SWC::DB::Cells::TSV::FileWriter::write
void write(const Cell &cell, Types::Column typ)
Definition: TSV.h:157
SWC::DB::Cells::TSV::FileReader::read_and_load
void read_and_load()
Definition: TSV.h:368
SWC::DB::Schema::Ptr
std::shared_ptr< Schema > Ptr
Definition: Schema.h:185
SWC::DB::Cells::Flag
Flag
Definition: Cell.h:60
SWC::DB::Cells::TSV::FileReader::header_read
bool header_read(const uint8_t **bufp, size_t *remainp, Types::Column, bool &has_ts, Core::Vector< std::string > &header)
Definition: TSV.h:516
SWC::DB::Cells::to_string
const char *SWC_CONST_FUNC to_string(Flag flag) noexcept
Definition: Cell.cc:17
SWC::DB::Cells::TSV::FileWriter::interface
FS::Interface::Ptr interface
Definition: TSV.h:25
SWC::client::Query::Select::Handlers::BaseUnorderedMap::Ptr
std::shared_ptr< BaseUnorderedMap > Ptr
Definition: BaseUnorderedMap.h:24
SWC::DB::Cells::TSV::FileReader::cells_count
size_t cells_count
Definition: TSV.h:305
SWC::client::Clients::Ptr
ClientsPtr Ptr
Definition: Clients.h:58
SWC::DB::NO_ENCODE
@ NO_ENCODE
Definition: Cell.h:53
SWC::Condition::lt_volume
SWC_CAN_INLINE bool lt_volume(const uint8_t *p1, uint32_t p1_len, const uint8_t *p2, uint32_t p2_len) noexcept
Definition: Comparators.h:222
SWC::DB::Types::Column
Column
Definition: Column.h:18
SWC::DB::Cells::DELETE_LE
@ DELETE_LE
Definition: Cell.h:63
SWC::DB::Cells::TSV::FileReader
Definition: TSV.h:296
SWC::DB::Cells::TSV::FileWriter::get_length
void get_length(Core::Vector< FS::SmartFd::Ptr > &files)
Definition: TSV.h:266
SWC::DB::Cells::TSV::FileReader::read
bool read(const uint8_t **bufp, size_t *remainp, bool has_ts, Types::Column typ, Cell &cell)
Definition: TSV.h:549
SWC::DB::Cells::TSV::FileWriter::flush_vol
size_t flush_vol
Definition: TSV.h:288
SWC::Core::BufferStreamIn
Definition: BufferStream.h:151
SWC::DB::Cells::TSV::FileWriter::initialize
void initialize()
Definition: TSV.h:66
SWC::DB::Cells::Cell
Definition: Cell.h:92
SWC::DB::Cells::TSV::FileWriter::close
void close()
Definition: TSV.h:277
SWC::DB::Cells::Cell::key
DB::Cell::Key key
Definition: Cell.h:357
SWC::DB::Cells::TSV::FileWriter::roll_file
void roll_file()
Definition: TSV.h:235
SWC::DB::Cells::TSV::FileWriter::has_encoder
bool has_encoder
Definition: TSV.h:287
SWC::DB::Cells::Result
Definition: Result.h:16
SWC::DB::Cell::Key::add
SWC_CAN_INLINE void add(const std::string_view &fraction)
Definition: CellKey.h:86
BufferStream.h
SWC::DB::Cells::TSV::FileWriter::split_size
uint64_t split_size
Definition: TSV.h:30
SWC::DB::Cells::TSV::FileReader::resend_cells
size_t resend_cells
Definition: TSV.h:306
SWC::DB::Schema
Definition: Schema.h:182
SWC::DB::Cell::Key::data
uint8_t * data
Definition: CellKey.h:257
SWC::Condition::str_eq
bool str_eq(const char *s1, const char *s2) noexcept SWC_ATTRIBS((SWC_ATTRIB_O3))
Definition: Comparators_basic.h:237
SWC::DB::Cells::TSV::FileReader::fds
Core::Vector< FS::SmartFd::Ptr > fds
Definition: TSV.h:703
SWC::DB::Cells::TSV::FileWriter::cells
DB::Cells::Result cells
Definition: TSV.h:286
SWC::Error::OK
@ OK
Definition: Error.h:45
SWC::Core::Vector::empty
constexpr SWC_CAN_INLINE bool empty() const noexcept
Definition: Vector.h:168
SWC::Error::FS_EOF
@ FS_EOF
Definition: Error.h:96
SWC::DB::Cells::Cell::is_time_order_desc
constexpr SWC_CAN_INLINE bool is_time_order_desc() const noexcept
Definition: Cell.h:177
SWC::Core::Vector::back
constexpr SWC_CAN_INLINE reference back() noexcept
Definition: Vector.h:254
SWC::DB::Cells::TSV::FileWriter::base_path
std::string base_path
Definition: TSV.h:27
SWC::Core::BufferStreamOut_ZSTD
Definition: BufferStream.h:100
SWC::DB::Cells::TSV::FileWriter::file_ext
std::string file_ext
Definition: TSV.h:28
SWC::DB::Cells::TSV::FileReader::cells_bytes
size_t cells_bytes
Definition: TSV.h:307
SWC_CURRENT_EXCEPTION
#define SWC_CURRENT_EXCEPTION(_msg_)
Definition: Exception.h:119
SWC::DB::Cells::TSV::FileWriter::file_num
size_t file_num
Definition: TSV.h:34
SWC::Core::BufferStreamOut
Definition: BufferStream.h:22
SWC::Core::Buffer::base
value_type * base
Definition: Buffer.h:131
SWC
The SWC-DB C++ namespace 'SWC'.
Definition: main.cc:12
SWC::FS::OPEN_FLAG_OVERWRITE
@ OPEN_FLAG_OVERWRITE
Definition: FileSystem.h:35
SWC::DB::Cells::Cell::have_encoder
constexpr SWC_CAN_INLINE bool have_encoder() const noexcept
Definition: Cell.h:334
SWC::DB::Types::is_counter
bool SWC_CONST_FUNC is_counter(const Column typ) noexcept
Definition: Column.cc:26
SWC::Core::BufferDyn< StaticBuffer >
SWC::Core::Buffer
Definition: Buffer.h:18
SWC::DB::Cells::HAVE_ENCODER
constexpr const uint8_t HAVE_ENCODER
Definition: Cell.h:78
SWC::Core::Buffer::size
size_t size
Definition: Buffer.h:130
SWC::client::Clients::Flag
Flag
Definition: Clients.h:60
SWC::DB::Cells::TSV::FileWriter::write
void write()
Definition: TSV.h:252
SWC::DB::Cells::Cell::get_value
Types::Encoder get_value(StaticBuffer &v, bool owner) const
Definition: Cell.cc:77
SWC::DB::Cells::TSV::FileReader::cid
cid_t cid
Definition: TSV.h:301
SWC::DB::Cells::Cell::value
uint8_t * value
Definition: Cell.h:362
SWC::DB::Cells::TSV::FileReader::err
int err
Definition: TSV.h:300
SWC::DB::Cells::TSV::FileWriter
Definition: TSV.h:22
SWC::Comm::Protocol::FsBroker::Handler::length
void length(const ConnHandlerPtr &conn, const Event::Ptr &ev)
Definition: Length.h:17
SWC::DB::Cells::Result::free
void free() noexcept
Definition: Result.h:56
SWC::DB::Cells::Cell::control
uint8_t control
Definition: Cell.h:360
SWC::Core::BufferStreamIn_ZSTD
Definition: BufferStream.h:183
SWC::DB::Cells::Cell::set_time_order_desc
constexpr SWC_CAN_INLINE void set_time_order_desc(bool desc) noexcept
Definition: Cell.h:169
SWC::DB::Cells::TSV::FileWriter::set_extension
int set_extension(const std::string &ext, int level)
Definition: TSV.h:48
SWC::DB::Cells::TIMESTAMP_NULL
constexpr const int64_t TIMESTAMP_NULL
Definition: Cell.h:72
SWC::Core::Buffer::free
SWC_CAN_INLINE void free() noexcept
Definition: Buffer.h:85
SWC::Condition::NONE
@ NONE
Definition: Comparators.h:28
SWC::cid_t
uint64_t cid_t
Definition: Identifiers.h:16
SWC::FS::SmartFd
Smart FileDescriptor.
Definition: SmartFd.h:34
SWC::DB::Cells::TSV::FileWriter::_stream
std::unique_ptr< Core::BufferStreamOut > _stream
Definition: TSV.h:290
SWC::client::Query::Update::Handlers::Common::Ptr
std::shared_ptr< Common > Ptr
Definition: Common.h:20
SWC::DB::Cells::Cell::vlen
uint32_t vlen
Definition: Cell.h:361
SWC::Core::Vector< FS::SmartFd::Ptr >
SWC::DB::Cells::TSV::FileReader::read
void read(FS::SmartFd::Ptr &smartfd)
Definition: TSV.h:385
SWC::DB::Cells::TSV::FileWriter::write_header
void write_header(Types::Column typ)
Definition: TSV.h:136
SWC::Core::BufferDyn::fill
constexpr SWC_CAN_INLINE size_t fill() const noexcept
Definition: Buffer.h:192
SWC::FS::SmartFd::Ptr
std::shared_ptr< SmartFd > Ptr
Definition: SmartFd.h:37
SWC::Error::Exception::message
std::string message() const
Definition: Exception.cc:163
SWC::DB::Cells::Cell::encoded_length
constexpr SWC_CAN_INLINE size_t encoded_length(bool no_value=false) const noexcept
Definition: Cell.h:285
SWC::DB::Cell::Key::count
uint24_t count
Definition: CellKey.h:255
SWC::DB::Cells::Cell::get_timestamp
constexpr SWC_CAN_INLINE int64_t get_timestamp() const noexcept
Definition: Cell.h:317
SWC::DB::Cells::OP_EQUAL
constexpr const uint8_t OP_EQUAL
Definition: Cell.h:83
SWC::DB::Cells::TSV::FileWriter::FileWriter
FileWriter(const client::Clients::Ptr &a_clients)
Definition: TSV.h:36
SWC::DB::Cells::TSV::FileWriter::schemas
std::unordered_map< cid_t, DB::Schema::Ptr > schemas
Definition: TSV.h:289
SWC::Core::Vector::cend
constexpr SWC_CAN_INLINE const_iterator cend() const noexcept
Definition: Vector.h:232
SWC::DB::Cells::Cell::get_counter
constexpr SWC_CAN_INLINE int64_t get_counter() const
Definition: Cell.h:260
SWC::FS::FLUSH
@ FLUSH
Definition: FileSystem.h:43
SWC::DB::NO_TS
@ NO_TS
Definition: Cell.h:51
SWC::DB::Cells::TSV::FileWriter::cells_bytes
size_t cells_bytes
Definition: TSV.h:33
SWC::DB::Cells::TSV::FileWriter::smartfd
FS::SmartFd::Ptr smartfd
Definition: TSV.h:284
SWC::Common::Files::Schema::filepath
std::string filepath(cid_t cid)
Definition: Schema.h:34
SWC::DB::Cells::TSV::FileWriter::output_flags
uint8_t output_flags
Definition: TSV.h:31
SWC::DB::Cells::TSV::FileReader::base_path
std::string base_path
Definition: TSV.h:302
SWC::DB::Cells::TSV::FileReader::schema
DB::Schema::Ptr schema
Definition: TSV.h:304
SWC::FS::Interface::Ptr
std::shared_ptr< Interface > Ptr
Definition: Interface.h:48
SWC::DB::Cells::Result::size_bytes
constexpr SWC_CAN_INLINE size_t size_bytes() const noexcept
Definition: Result.h:64
SWC::Core::Vector::push_back
SWC_CAN_INLINE void push_back(ArgsT &&... args)
Definition: Vector.h:331
SWC::DB::Cells::flag_from
Flag SWC_PURE_FUNC flag_from(const uint8_t *rptr, uint32_t len) noexcept
Definition: Cell.cc:32
SWC::Error::CANCELLED
@ CANCELLED
Definition: Error.h:56
SWC::Core::Vector::cbegin
constexpr SWC_CAN_INLINE const_iterator cbegin() const noexcept
Definition: Vector.h:216
SWC::Core::to_string
SWC_CAN_INLINE std::string to_string(const BitFieldInt< T, SZ > &v)
Definition: BitFieldInt.h:263
SWC::DB::Cells::TSV::FileWriter::fds
Core::Vector< FS::SmartFd::Ptr > fds
Definition: TSV.h:285
SWC::Error::INVALID_ARGUMENT
@ INVALID_ARGUMENT
Definition: Error.h:58
SWC::Core::Vector::size
constexpr SWC_CAN_INLINE size_type size() const noexcept
Definition: Vector.h:189
SWC::Condition::str_case_eq
bool str_case_eq(const char *s1, const char *s2, size_t count) noexcept SWC_ATTRIBS((SWC_ATTRIB_O3))
Definition: Comparators_basic.h:257
SWC::DB::Cells::TSV::FileReader::interface
FS::Interface::Ptr interface
Definition: TSV.h:299
SWC::Core::Vector::emplace_back
SWC_CAN_INLINE reference emplace_back(ArgsT &&... args)
Definition: Vector.h:349
SWC::DB::Cells::TSV::FileWriter::write
void write(const client::Query::Select::Handlers::BaseUnorderedMap::Ptr &hdlr)
Definition: TSV.h:90
SWC::DB::Cells::TSV::FileReader::initialize
void initialize()
Definition: TSV.h:326
SWC::Error::Exception
Definition: Exception.h:21
SWC::Core::Vector::insert
SWC_CAN_INLINE iterator insert(size_type offset, ArgsT &&... args)
Definition: Vector.h:367
SWC::DB::Cells::TSV::FileReader::~FileReader
~FileReader() noexcept
Definition: TSV.h:321
SWC::Core::Vector::reserve
SWC_CAN_INLINE void reserve(size_type cap)
Definition: Vector.h:288
SWC::DB::Cells::Cell::flag
uint8_t flag
Definition: Cell.h:359
SWC::DB::Cells::TSV::FileWriter::finalize
void finalize()
Definition: TSV.h:72
SWC::DB::Cells::TSV::FileWriter::err
int err
Definition: TSV.h:26
SWC::DB::Cells::TSV::FileWriter::cells_count
size_t cells_count
Definition: TSV.h:32
SWC::Serialization::decode_vi32
constexpr SWC_CAN_INLINE uint32_t decode_vi32(const uint8_t **bufp, size_t *remainp)
Definition: Serialization.h:254
SWC::Error::BAD_MEMORY_ALLOCATION
@ BAD_MEMORY_ALLOCATION
Definition: Error.h:48
SWC::DB::Cells::Cell::set_timestamp
constexpr SWC_CAN_INLINE void set_timestamp(int64_t ts) noexcept
Definition: Cell.h:182