SWC-DB  v0.5.12 C++ documentations
SWC-DB© (Super Wide Column Database) - High Performance Scalable Database (https://github.com/kashirin-alex/swc-db)
Schemas.h
Go to the documentation of this file.
1 /*
2  * SWC-DB© Copyright since 2019 Alex Kashirin <kashirin.alex@gmail.com>
3  * License details at <https://github.com/kashirin-alex/swc-db/#license>
4  */
5 
6 #ifndef swcdb_manager_Schemas_h
7 #define swcdb_manager_Schemas_h
8 
9 
11 
12 
13 namespace SWC { namespace Manager {
14 
15 
16 class Schemas final : public DB::Schemas {
17  public:
18 
19  static constexpr const char schemas_file[] = "schemas.store";
20 
22  Schemas() noexcept { }
23 
24  ~Schemas() noexcept { }
25 
26  void store_create(int& err, uint8_t replicas, uint32_t blksz,
27  const DB::Types::Encoder cfg_encoder) {
28  auto& fs_if = Env::FsInterface::interface();
29  auto smartfd = FS::SmartFd::make_ptr(
31 
32  Env::FsInterface::interface()->create(err, smartfd, replicas);
33  if(err)
34  return store_remove(smartfd);
35 
36  uint64_t size = 0;
37  DynamicBuffer blk_buff;
38  blk_buff.ensure(blksz);
39 
40  DB::SchemasVec entries;
41  DB::Schemas::all(entries);
42  for(auto it = entries.cbegin(); it != entries.cend(); ) {
43  blk_buff.ensure((*it)->encoded_length());
44  (*it)->encode(&blk_buff.ptr);
45  if(++it == entries.cend() || blk_buff.fill() >= blksz) {
47  err,
48  &size,
49  blk_buff,
50  cfg_encoder,
51  smartfd,
52  it == entries.cend()
53  );
54  if(err)
55  return store_remove(smartfd);
56  blk_buff.clear();
57  blk_buff.ensure(blksz);
58  }
59  }
60  fs_if->close(err, smartfd);
61  if(!err && (fs_if->length(err, smartfd->filepath()) != size || err)) {
62  if(!err)
63  err = Error::FS_EOF;
64  SWC_LOGF(LOG_DEBUG, "Not-matching file-size=" SWC_FMT_LU, size);
65  }
66  if(err)
67  store_remove(smartfd);
68  }
69 
70  bool store_load(int& err) {
71  auto& fs_if = Env::FsInterface::interface();
72  auto smartfd = FS::SmartFd::make_ptr(schemas_file, 0);
73 
74  if(!fs_if->exists(err, smartfd->filepath())) {
75  if(err)
76  store_remove(smartfd);
77  return false;
78  }
79 
80  size_t length = fs_if->length(err, smartfd->filepath());
81  if(!err)
82  fs_if->open(err, smartfd);
83  auto& fs = fs_if->get_fs();
84  const uint8_t* ptr;
85  size_t remain;
86  if(!err) {
87  uint8_t buf[8];
88  ptr = buf;
89  remain = 8;
90  if(fs->pread(err, smartfd, length - 8, buf, 8) != 8 ||
91  Serialization::decode_i64(&ptr, &remain) != length) {
92  err = Error::FS_EOF;
93  } else {
94  length -= 8;
95  }
96  }
97  uint8_t buf[BLOCK_HEADER_SIZE];
98  StaticBuffer buffer;
99  for(size_t offset = 0; !err && offset < length; ) {
100  if(fs->pread(err, smartfd, offset, buf, BLOCK_HEADER_SIZE)
101  != BLOCK_HEADER_SIZE) {
102  err = Error::FS_EOF;
103  break;
104  }
105  offset += BLOCK_HEADER_SIZE;
106  ptr = buf;
107  remain = BLOCK_HEADER_SIZE;
109  Serialization::decode_i8(&ptr, &remain));
110  uint32_t size_enc = Serialization::decode_i32(&ptr, &remain);
111  uint32_t size_plain = Serialization::decode_i32(&ptr, &remain);
112  uint32_t checksum_data = Serialization::decode_i32(&ptr, &remain);
114  buf, BLOCK_HEADER_SIZE - 4)) {
116  break;
117  }
118 
119  if(fs->pread(err, smartfd, offset, &buffer, size_enc) != size_enc) {
120  err = Error::FS_EOF;
121  break;
122  }
123  offset += size_enc;
124  if(!Core::checksum_i32_chk(checksum_data, buffer.base, size_enc)) {
126  break;
127  }
128  if(encoder != DB::Types::Encoder::PLAIN) {
129  StaticBuffer decoded_buf(static_cast<size_t>(size_plain));
131  err, encoder,
132  buffer.base, size_enc,
133  decoded_buf.base, size_plain
134  );
135  if(err)
136  break;
137  buffer.set(decoded_buf);
138  }
139  ptr = buffer.base;
140  remain = size_plain;
141  while(remain) {
142  DB::Schemas::replace(DB::Schema::make(&ptr, &remain));
143  }
144  buffer.free();
145  }
146  store_remove(smartfd);
147  return err ? false : true;
148  }
149 
150 
151  private:
152 
153  static constexpr const uint8_t BLOCK_HEADER_SIZE = 1 + 4 + 4 + 4 + 4;
154 
156  int err = Error::OK;
157  if(smartfd->valid())
158  Env::FsInterface::interface()->close(err, smartfd);
159  Env::FsInterface::interface()->remove(err, smartfd->filepath());
160  }
161 
162  void store_make_block(int& err, uint64_t* sizep, DynamicBuffer& blk_buff,
164  bool last_blk) {
165  uint32_t size_plain = blk_buff.fill();
166  size_t len_enc = 0;
167  DynamicBuffer output;
169  &len_enc, output, BLOCK_HEADER_SIZE);
170  if(err)
171  return;
172 
173  uint32_t size_enc;
174  if(len_enc) {
175  size_enc = len_enc;
176  } else {
177  encoder = DB::Types::Encoder::PLAIN;
178  size_enc = size_plain;
179  }
180  uint8_t* ptr = output.base;
181  const uint8_t* base = ptr;
182  Serialization::encode_i8(&ptr, uint8_t(encoder));
183  Serialization::encode_i32(&ptr, size_enc);
185  Core::checksum_i32(base + BLOCK_HEADER_SIZE, size_enc, &ptr);
186  Core::checksum_i32(base, ptr, &ptr);
187 
188  *sizep += output.fill();
189  if(last_blk) {
190  // trailer (total-size)
191  *sizep += 8;
192  output.ensure(8);
193  Serialization::encode_i64(&output.ptr, *sizep);
194  }
195  StaticBuffer buff_write(output);
196  Env::FsInterface::fs()->append(
197  err,
198  smartfd,
199  buff_write,
201  );
202  }
203 
204 };
205 
206 
207 
208 }}
209 
210 #endif // swcdb_manager_Schemas_h
SWC::Manager::Schemas::store_make_block
void store_make_block(int &err, uint64_t *sizep, DynamicBuffer &blk_buff, DB::Types::Encoder encoder, FS::SmartFd::Ptr &smartfd, bool last_blk)
Definition: Schemas.h:162
SWC::Manager::Schemas::store_remove
void store_remove(FS::SmartFd::Ptr &smartfd)
Definition: Schemas.h:155
SWC::Core::BufferDyn::ptr
value_type * ptr
Definition: Buffer.h:293
SWC::Manager::Schemas::store_load
bool store_load(int &err)
Definition: Schemas.h:70
SWC_LOGF
#define SWC_LOGF(priority, fmt,...)
Definition: Logger.h:188
SWC::Serialization::encode_i64
SWC_CAN_INLINE void encode_i64(uint8_t **bufp, uint64_t val) noexcept
Definition: Serialization.h:151
SWC::Core::Encoder::Type
Type
Definition: Encoder.h:28
SWC::Serialization::encode_i32
SWC_CAN_INLINE void encode_i32(uint8_t **bufp, uint32_t val) noexcept
Definition: Serialization.h:138
SWC::Env::FsInterface::interface
static SWC_CAN_INLINE FS::Interface::Ptr & interface() noexcept
Definition: Interface.h:150
SWC::FS::SmartFd::make_ptr
static SWC_CAN_INLINE Ptr make_ptr(const std::string &filepath, uint32_t flags, int32_t fd=-1, uint64_t pos=0)
Definition: SmartFd.h:40
SWC::Manager::Schemas::store_create
void store_create(int &err, uint8_t replicas, uint32_t blksz, const DB::Types::Encoder cfg_encoder)
Definition: Schemas.h:26
SWC::Core::checksum_i32_chk
SWC_CAN_INLINE bool checksum_i32_chk(uint32_t checksum, const uint8_t *base, uint32_t len)
Definition: Checksum.h:94
SWC::Manager::Schemas
Definition: Schemas.h:16
SWC::Core::BufferDyn::ensure
SWC_CAN_INLINE void ensure(size_t len)
Definition: Buffer.h:212
SWC::Serialization::encode_i8
constexpr SWC_CAN_INLINE void encode_i8(uint8_t **bufp, uint8_t val) noexcept
Definition: Serialization.h:85
SWC::Manager::Schemas::Schemas
SWC_CAN_INLINE Schemas() noexcept
Definition: Schemas.h:22
SWC::Core::BufferDyn::clear
constexpr SWC_CAN_INLINE void clear() noexcept
Definition: Buffer.h:207
encoder
Core::Encoder::Type encoder
Buffer Encoder.
Definition: HeaderBufferInfo.h:50
SWC::DB::Schemas::size
uint64_t size()
Definition: Schemas.cc:14
SWC::Error::OK
@ OK
Definition: Error.h:45
SWC_CAN_INLINE
#define SWC_CAN_INLINE
Definition: Compat.h:102
SWC::LOG_DEBUG
@ LOG_DEBUG
Definition: Logger.h:36
SWC::Error::FS_EOF
@ FS_EOF
Definition: Error.h:96
SWC::Core::Encoder::decode
void decode(int &err, Type encoder, const uint8_t *src, size_t sz_enc, uint8_t *dst, size_t sz)
Definition: Encoder.cc:99
SWC::DB::Schemas::all
void all(SchemasVec &entries)
Definition: Schemas.cc:79
SWC::Core::Buffer::base
value_type * base
Definition: Buffer.h:131
SWC
The SWC-DB C++ namespace 'SWC'.
Definition: main.cc:12
SWC::FS::OPEN_FLAG_OVERWRITE
@ OPEN_FLAG_OVERWRITE
Definition: FileSystem.h:35
SWC::DB::Schemas::replace
void replace(const Schema::Ptr &schema)
Definition: Schemas.cc:45
SWC::DB::Schema::make
static SWC_CAN_INLINE Ptr make()
Definition: Schema.h:189
SWC::Core::BufferDyn< StaticBuffer >
SWC::Core::Buffer
Definition: Buffer.h:18
SWC::Env::FsInterface::fs
static SWC_CAN_INLINE FS::FileSystem::Ptr & fs() noexcept
Definition: Interface.h:155
size_plain
uint32_t size_plain
Buffer set if Encoder not PLAIN.
Definition: HeaderBufferInfo.h:48
SWC::Comm::Protocol::FsBroker::Handler::length
void length(const ConnHandlerPtr &conn, const Event::Ptr &ev)
Definition: Length.h:17
SWC::DB::Schemas
Definition: Schemas.h:18
SWC_FMT_LU
#define SWC_FMT_LU
Definition: Compat.h:98
SWC::Core::Buffer::free
SWC_CAN_INLINE void free() noexcept
Definition: Buffer.h:85
SWC::Manager::Schemas::BLOCK_HEADER_SIZE
static constexpr const uint8_t BLOCK_HEADER_SIZE
Definition: Schemas.h:153
SWC::Core::Vector< Schema::Ptr >
SWC::DB::Types::Encoder
Core::Encoder::Type Encoder
Definition: Encoder.h:15
SWC::Core::BufferDyn::fill
constexpr SWC_CAN_INLINE size_t fill() const noexcept
Definition: Buffer.h:192
SWC::FS::SmartFd::Ptr
std::shared_ptr< SmartFd > Ptr
Definition: SmartFd.h:37
SWC::Manager::Schemas::schemas_file
static constexpr const char schemas_file[]
Definition: Schemas.h:19
SWC::Error::CHECKSUM_MISMATCH
@ CHECKSUM_MISMATCH
Definition: Error.h:62
Schemas.h
SWC::Core::Vector::cend
constexpr SWC_CAN_INLINE const_iterator cend() const noexcept
Definition: Vector.h:232
SWC::FS::FLUSH
@ FLUSH
Definition: FileSystem.h:43
SWC::Core::checksum_i32
SWC_CAN_INLINE void checksum_i32(const uint8_t *start, size_t len, uint8_t **ptr) noexcept
Definition: Checksum.h:62
SWC::Core::Encoder::encode
void encode(int &err, Type encoder, const uint8_t *src, size_t src_sz, size_t *sz_enc, DynamicBuffer &output, uint32_t reserve, bool no_plain_out=false, bool ok_more=false)
Definition: Encoder.cc:222
SWC::Core::Vector::cbegin
constexpr SWC_CAN_INLINE const_iterator cbegin() const noexcept
Definition: Vector.h:216
SWC::Serialization::decode_i8
constexpr SWC_CAN_INLINE uint8_t decode_i8(const uint8_t **bufp, size_t *remainp)
Definition: Serialization.h:91
SWC::Manager::Schemas::~Schemas
~Schemas() noexcept
Definition: Schemas.h:24
SWC::Serialization::decode_i64
SWC_CAN_INLINE uint64_t decode_i64(const uint8_t **bufp, size_t *remainp)
Definition: Serialization.h:156
SWC::Serialization::decode_i32
SWC_CAN_INLINE uint32_t decode_i32(const uint8_t **bufp, size_t *remainp)
Definition: Serialization.h:143
SWC::Core::Buffer::set
void set(value_type *data, size_t len, bool take_ownership) noexcept
Definition: Buffer.h:109