|
SWC-DB
v0.5.12 C++ documentations
SWC-DB© (Super Wide Column Database) - High Performance Scalable Database (https://github.com/kashirin-alex/swc-db)
|
Go to the documentation of this file.
7 #ifndef swcdb_common_Files_TSV_h
8 #define swcdb_common_Files_TSV_h
15 namespace SWC {
namespace DB {
namespace Cells {
57 if(ext.length() == 3) {
92 for(
cid_t cid : hdlr->get_cids()) {
100 for(
cid_t cid : hdlr->get_cids()) {
101 col_type =
schemas[cid]->col_type;
104 hdlr->get_cells(cid,
cells);
110 for(
auto cell :
cells) {
125 write(*cell, col_type);
140 header.append(
"TIMESTAMP\t");
142 header.append(
"FLEN\tKEY\tFLAG\t");
147 :
"ORDER\tVLEN\tVALUE");
150 header.append(
"\tENCODER");
153 _stream->add(
reinterpret_cast<const uint8_t*
>(header.c_str()), header.size());
166 const uint8_t* ptr = cell.
key.
data;
167 for(uint32_t n=1; n<=cell.
key.
count; ++n, ptr+=len) {
175 for(uint32_t n=1; n<=cell.
key.
count; ++n, ptr+=len) {
177 cell_buff.
add(ptr, len);
196 std::string v(i > 0 ?
"+" :
"");
225 cell_buff.
add(std::string(
"\t1"));
268 fd->pos(
interface->get_fs()->length(
err, fd->filepath()));
289 std::unordered_map<cid_t, DB::Schema::Ptr>
schemas;
290 std::unique_ptr<Core::BufferStreamOut>
_stream;
312 client::Query::Update::Handlers::Common::make(
313 clients, nullptr, nullptr, flag)),
338 for(
auto& entry : entries) {
341 for(
auto it = files.
cbegin(); ; ++it) {
342 if(it == files.
cend() ||
344 reinterpret_cast<const uint8_t*
>((*it)->name.c_str()),
346 reinterpret_cast<const uint8_t*
>(entry.name.c_str()),
347 entry.name.size() ) ) {
354 for(
auto file : files) {
356 p.reserve(
base_path.size() + file->name.size());
358 p.append(file->name);
374 for(
auto& fd :
fds) {
380 hdlr->commit_if_need();
390 auto colp =
hdlr->get_base_ptr(
cid);
392 std::unique_ptr<Core::BufferStreamIn> instream;
393 const std::string& path = smartfd->filepath();
395 "zst", path.c_str()+(path.length()-3), path.length()-3)) {
409 size_t cell_mark = 0;
422 r_sz =
length - offset > 1048576 ? 1048576 :
length - offset;
428 err, smartfd, offset, &buffer, r_sz) != r_sz) {
442 instream->add(buffer);
445 if(!instream->get(buffer_read)) {
451 const uint8_t* ptr = buffer_read.
base;
452 size_t remain = buffer_read.
size;
457 message.append(smartfd->filepath());
460 header.
empty() ?
"columns defintion" :
"value columns");
461 message.append(
" in header\n");
469 ok =
read(&ptr, &remain, has_ts,
schema->col_type, cell);
470 cell_pos += cell_mark-remain;
476 hdlr->commit_or_wait(colp);
480 instream->put_back(ptr, remain);
489 message.append(
", corrupted '");
490 message.append(smartfd->filepath());
491 message.append(
"' starting at-offset=");
504 if(!instream->empty()) {
505 message.append(
"early file end");
506 message.append(
", corrupted '");
507 message.append(smartfd->filepath());
508 message.append(
"' starting at-offset=");
518 const uint8_t* ptr = *bufp;
519 size_t remain = *remainp;
521 const uint8_t* s = *bufp;
522 while(remain && *ptr !=
'\n') {
525 if(*ptr ==
'\t' || *ptr ==
'\n') {
526 header.
emplace_back(
reinterpret_cast<const char*
>(s), ptr-s);
531 if(header.
empty() || !remain || *ptr !=
'\n')
535 header.
front().c_str(),
"timestamp", 9);
537 header.
back().c_str(),
"encoder", 7);
538 if(header.
size() <
size_t(6 + has_ts + has_encoder))
549 bool read(
const uint8_t** bufp,
size_t* remainp,
bool has_ts,
551 const uint8_t* ptr = *bufp;
552 size_t remain = *remainp;
553 const uint8_t* s = ptr;
556 while(remain && *ptr !=
'\t') {
563 std::stoll(std::string(
reinterpret_cast<const char*
>(s), ptr-s)));
570 if(*ptr ==
',' || *ptr ==
'\t') {
572 std::stoul(std::string(
reinterpret_cast<const char*
>(s), ptr-s)));
587 for(
auto len : flen) {
598 while(remain && (*ptr !=
'\t' && *ptr !=
'\n')) {
605 throw std::runtime_error(
"Bad cell Flag");
609 throw std::runtime_error(
"Expected end of line");
613 throw std::runtime_error(
"Expected a tab");
617 while(remain && (*ptr !=
'\t' && *ptr !=
'\n')) {
625 int64_t counter = std::stoll(
626 std::string(
reinterpret_cast<const char*
>(s), ptr-s));
634 throw std::runtime_error(
"Expected EQ symbol");
644 while(remain && *ptr !=
'\n') {
651 std::string(
reinterpret_cast<const char*
>(s), ptr-s));
662 while(remain && *ptr !=
'\t') {
669 cell.
vlen = std::stoul(
670 std::string(
reinterpret_cast<const char*
>(s), ptr-s));
671 if(--remain < cell.
vlen+1)
675 cell.
value =
const_cast<uint8_t*
>(ptr);
679 if(remain && *ptr ==
'\t') {
693 if(!remain || *ptr !=
'\n')
712 #endif // swcdb_common_Files_TSV_h
client::Query::Update::Handlers::Common::Ptr hdlr
void set_counter(uint8_t op, int64_t v, Types::Column typ=Types::Column::COUNTER_I64, int64_t rev=TIMESTAMP_NULL)
constexpr SWC_CAN_INLINE reference front() noexcept
virtual ~FileWriter() noexcept
client::Clients::Ptr clients
FileReader(const client::Clients::Ptr &clients, const client::Clients::Flag flag)
@ SQL_BAD_LOAD_FILE_FORMAT
value_type * add(const value_type *data, size_t len)
void write(const Cell &cell, Types::Column typ)
std::shared_ptr< Schema > Ptr
bool header_read(const uint8_t **bufp, size_t *remainp, Types::Column, bool &has_ts, Core::Vector< std::string > &header)
const char *SWC_CONST_FUNC to_string(Flag flag) noexcept
FS::Interface::Ptr interface
std::shared_ptr< BaseUnorderedMap > Ptr
SWC_CAN_INLINE bool lt_volume(const uint8_t *p1, uint32_t p1_len, const uint8_t *p2, uint32_t p2_len) noexcept
void get_length(Core::Vector< FS::SmartFd::Ptr > &files)
bool read(const uint8_t **bufp, size_t *remainp, bool has_ts, Types::Column typ, Cell &cell)
SWC_CAN_INLINE void add(const std::string_view &fraction)
bool str_eq(const char *s1, const char *s2) noexcept SWC_ATTRIBS((SWC_ATTRIB_O3))
Core::Vector< FS::SmartFd::Ptr > fds
constexpr SWC_CAN_INLINE bool empty() const noexcept
constexpr SWC_CAN_INLINE bool is_time_order_desc() const noexcept
constexpr SWC_CAN_INLINE reference back() noexcept
#define SWC_CURRENT_EXCEPTION(_msg_)
The SWC-DB C++ namespace 'SWC'.
constexpr SWC_CAN_INLINE bool have_encoder() const noexcept
bool SWC_CONST_FUNC is_counter(const Column typ) noexcept
constexpr const uint8_t HAVE_ENCODER
Types::Encoder get_value(StaticBuffer &v, bool owner) const
void length(const ConnHandlerPtr &conn, const Event::Ptr &ev)
constexpr SWC_CAN_INLINE void set_time_order_desc(bool desc) noexcept
int set_extension(const std::string &ext, int level)
constexpr const int64_t TIMESTAMP_NULL
SWC_CAN_INLINE void free() noexcept
std::unique_ptr< Core::BufferStreamOut > _stream
std::shared_ptr< Common > Ptr
void read(FS::SmartFd::Ptr &smartfd)
void write_header(Types::Column typ)
constexpr SWC_CAN_INLINE size_t fill() const noexcept
std::shared_ptr< SmartFd > Ptr
std::string message() const
constexpr SWC_CAN_INLINE size_t encoded_length(bool no_value=false) const noexcept
constexpr SWC_CAN_INLINE int64_t get_timestamp() const noexcept
constexpr const uint8_t OP_EQUAL
FileWriter(const client::Clients::Ptr &a_clients)
std::unordered_map< cid_t, DB::Schema::Ptr > schemas
constexpr SWC_CAN_INLINE const_iterator cend() const noexcept
constexpr SWC_CAN_INLINE int64_t get_counter() const
std::string filepath(cid_t cid)
std::shared_ptr< Interface > Ptr
constexpr SWC_CAN_INLINE size_t size_bytes() const noexcept
SWC_CAN_INLINE void push_back(ArgsT &&... args)
Flag SWC_PURE_FUNC flag_from(const uint8_t *rptr, uint32_t len) noexcept
constexpr SWC_CAN_INLINE const_iterator cbegin() const noexcept
SWC_CAN_INLINE std::string to_string(const BitFieldInt< T, SZ > &v)
Core::Vector< FS::SmartFd::Ptr > fds
constexpr SWC_CAN_INLINE size_type size() const noexcept
bool str_case_eq(const char *s1, const char *s2, size_t count) noexcept SWC_ATTRIBS((SWC_ATTRIB_O3))
FS::Interface::Ptr interface
SWC_CAN_INLINE reference emplace_back(ArgsT &&... args)
void write(const client::Query::Select::Handlers::BaseUnorderedMap::Ptr &hdlr)
SWC_CAN_INLINE iterator insert(size_type offset, ArgsT &&... args)
SWC_CAN_INLINE void reserve(size_type cap)
constexpr SWC_CAN_INLINE uint32_t decode_vi32(const uint8_t **bufp, size_t *remainp)
constexpr SWC_CAN_INLINE void set_timestamp(int64_t ts) noexcept