SWC-DB  v0.5.12 C++ documentations
SWC-DB© (Super Wide Column Database) - High Performance Scalable Database (https://github.com/kashirin-alex/swc-db)
BufferStream.cc
Go to the documentation of this file.
1 /*
2  * SWC-DB© Copyright since 2019 Alex Kashirin <kashirin.alex@gmail.com>
3  * License details at <https://github.com/kashirin-alex/swc-db/#license>
4  */
5 
6 
7 #include "swcdb/core/Error.h"
9 
10 
11 namespace SWC { namespace Core {
12 
13 
14 // STREAM-OUT
15 
16 bool BufferStreamOut::empty() const {
17  return !buffer.fill();
18 }
19 
20 bool BufferStreamOut::full() const {
21  return buffer.fill() >= commit_size;
22 }
23 
25  return buffer.fill();
26 }
27 
28 void BufferStreamOut::add(const uint8_t* ptr, size_t len) {
29  buffer.add(ptr, len);
30 }
31 
// Tail of BufferStreamOut::get(StaticBuffer& output).
// NOTE(review): the signature line (32) and line 34 are absent from this
// listing; only the hand-off of `buffer` to `output.set()` is visible.
33  output.set(buffer);
35 }
36 
37 
38 
// Constructor of BufferStreamOut_ZSTD_OnAdd.
// Forwards a_pre_alloc / a_commit_size to BufferStreamOut, creates the ZSTD
// compression stream, clears has_data / plain_size, then initialises the
// stream with `level` (0 selects ZSTD_CLEVEL_DEFAULT) and sets
// ZSTD_c_checksumFlag to 1.
// NOTE(review): the opening signature line (39) and the statements guarded by
// the two ZSTD_isError checks (lines 47 and 50) are absent from this listing;
// presumably they record an error code — confirm against the repository.
40  int level, size_t a_pre_alloc, size_t a_commit_size)
41  : BufferStreamOut(a_pre_alloc, a_commit_size),
42  cstream(ZSTD_createCStream()),
43  has_data(false), plain_size(0),
44  tmp_buff(), out_buff() {
45  if(ZSTD_isError(
46  ZSTD_initCStream(cstream, level ? level : ZSTD_CLEVEL_DEFAULT)))
48  else if(ZSTD_isError(
49  ZSTD_CCtx_setParameter(cstream, ZSTD_c_checksumFlag, 1)))
51 }
52 
54  ZSTD_freeCStream(cstream);
55 }
56 
58  return !has_data && BufferStreamOut::empty();
59 }
60 
63 }
64 
67 }
68 
// Feeds `len` bytes at `ptr` into the ZSTD compression stream as they are
// added, tracking the uncompressed total in `plain_size`.
// NOTE(review): lines 78, 81, 87 and 91 are absent from this listing —
// presumably the error assignments and the copies of the compressed bytes
// from tmp_buff into the stream buffer; confirm against the repository.
69 void BufferStreamOut_ZSTD_OnAdd::add(const uint8_t* ptr, size_t len) {
70  has_data = true;
71  plain_size += len;
// Size the scratch buffer for the worst-case compressed size of this input
// and point the ZSTD output descriptor at it.
72  tmp_buff.ensure(ZSTD_compressBound(len));
73  out_buff.dst = tmp_buff.base;
74  out_buff.size = tmp_buff.size;
75  out_buff.pos = 0;
76  ZSTD_inBuffer inBuff = { ptr, len, 0 };
// NOTE(review): inBuff.pos is not compared with `len` in the visible lines,
// so full consumption of the input appears to be assumed — verify.
77  if(ZSTD_isError(ZSTD_compressStream(cstream, &out_buff, &inBuff))) {
79 
80  } else if (out_buff.pos) {
82  size_t remain;
83  out_buff.pos = 0;
// Keep flushing until ZSTD reports that nothing is left in its internal
// buffers (ZSTD_flushStream returns 0).
84  do {
85  remain = ZSTD_flushStream(cstream, &out_buff);
86  if(ZSTD_isError(remain)) {
88  return;
89  }
90  if(out_buff.pos) {
92  out_buff.pos = 0;
93  }
94  } while (remain);
95  }
96 }
97 
// Body of BufferStreamOut_ZSTD_OnAdd::get(StaticBuffer& output): ends the
// current ZSTD frame, then delegates to BufferStreamOut::get and resets the
// plain_size / has_data bookkeeping.
// NOTE(review): the signature line (98) and lines 107 and 111 are absent from
// this listing — presumably the error assignment and the copy of the frame
// epilogue into the stream buffer; confirm against the repository.
99  tmp_buff.ensure(ZSTD_compressBound(128));
100  out_buff.dst = tmp_buff.base;
101  out_buff.size = tmp_buff.size;
102  out_buff.pos = 0;
103  size_t remain;
// ZSTD_endStream returns the number of bytes still to be flushed; loop until
// it reports 0.
104  do {
105  remain = ZSTD_endStream(cstream, &out_buff);
106  if(ZSTD_isError(remain)) {
108  return;
109  }
110  if(out_buff.pos) {
112  out_buff.pos = 0;
113  }
114  } while (remain);
115 
116  BufferStreamOut::get(output);
117  plain_size = 0;
118  has_data = false;
119 }
120 
121 
122 
// Constructor of BufferStreamOut_ZSTD.
// Forwards a_pre_alloc / a_commit_size to BufferStreamOut, creates the ZSTD
// compression stream, initialises it with `level` (0 selects
// ZSTD_CLEVEL_DEFAULT) and sets ZSTD_c_checksumFlag and
// ZSTD_c_contentSizeFlag to 1.
// NOTE(review): the opening signature line (123) and the statements guarded
// by the three ZSTD_isError checks (lines 129, 132, 135) are absent from this
// listing; presumably they record an error code — confirm.
124  int level, size_t a_pre_alloc, size_t a_commit_size)
125  : BufferStreamOut(a_pre_alloc, a_commit_size),
126  cstream(ZSTD_createCStream()) {
127  if(ZSTD_isError(
128  ZSTD_initCStream(cstream, level ? level : ZSTD_CLEVEL_DEFAULT)))
130  else if(ZSTD_isError(
131  ZSTD_CCtx_setParameter(cstream, ZSTD_c_checksumFlag, 1)))
133  else if(ZSTD_isError(
134  ZSTD_CCtx_setParameter(cstream, ZSTD_c_contentSizeFlag, 1)))
136 }
137 
139  ZSTD_freeCStream(cstream);
140 }
141 
// Body of BufferStreamOut_ZSTD::get(StaticBuffer& output): compresses the
// whole accumulated `buffer` as one frame (ZSTD_e_end) into a temporary
// buffer sized by ZSTD_compressBound, clears `buffer` and passes the
// temporary to `output.set()`.
// NOTE(review): the signature line (142) and line 151 are absent from this
// listing — the latter presumably records the error; confirm.
143  ZSTD_inBuffer input = {buffer.base, buffer.fill(), 0};
144 
145  DynamicBuffer tmp_buff(ZSTD_compressBound(input.size));
146  ZSTD_outBuffer out_buff = {tmp_buff.base, tmp_buff.size, 0};
147 
148  size_t remain = ZSTD_compressStream2(
149  cstream, &out_buff, &input, ZSTD_e_end);
// Any non-zero result is treated as failure: either an error code or bytes
// that did not fit into the output buffer.
150  if(remain || ZSTD_isError(remain)) {
152  return;
153  }
// Advance the write cursor past the bytes ZSTD produced.
154  tmp_buff.ptr += out_buff.pos;
155  buffer.clear();
156  output.set(tmp_buff);
157 }
158 
159 
160 
162  Encoder::Type a_encoder, size_t a_pre_alloc, size_t a_commit_size)
163  : BufferStreamOut(a_pre_alloc, a_commit_size),
164  encoder(a_encoder) {
165 }
166 
// Body of BufferStreamOut_ENCODER::get(StaticBuffer& output): encodes the
// accumulated `buffer` into a temporary buffer and, when an encoded size is
// reported, clears `buffer` and passes the temporary to `output.set()`.
// NOTE(review): the signature line (167), the start of the encode call
// (line 170) and the statement of the `!sz_enc` branch (line 175) are absent
// from this listing — confirm against the repository.
168  DynamicBuffer tmp_buff;
// NOTE(review): sz_enc is uninitialised here; the check below relies on the
// encode call always writing it — verify.
169  size_t sz_enc;
171  buffer.base, buffer.fill(),
172  &sz_enc, tmp_buff,
// Trailing arguments: reserve = 0, no_plain_out = true, ok_more = true.
173  0, true, true);
174  if(!sz_enc) {
176  } else {
177  buffer.clear();
178  output.set(tmp_buff);
179  }
180 }
181 
182 
183 
184 
185 
186 
187 // STREAM-IN
188 
189 bool BufferStreamIn::empty() const {
190  return !buffer.fill();
191  }
192 
194  if(buffer.fill()) {
195  buffer.add(inbuffer.base, inbuffer.size);
196  inbuffer.free();
197  } else {
198  buffer.take_ownership(inbuffer);
199  }
200 }
201 
202 void BufferStreamIn::put_back(const uint8_t* ptr, size_t len) {
203  buffer.add(ptr, len);
204 }
205 
207  if(buffer.fill()) {
208  output.set(buffer);
209  return true;
210  }
211  return false;
212 }
213 
214 
215 
// Constructor of BufferStreamIn_ZSTD: creates and initialises the ZSTD
// decompression stream and starts with offset 0 and frame_complete == true.
// NOTE(review): the signature line (216) and the statement guarded by the
// ZSTD_isError check (line 222) are absent from this listing; presumably the
// latter records an error code — confirm against the repository.
217  : dstream(ZSTD_createDStream()),
218  buffer_enc(),
219  offset(0), frame_complete(true),
220  tmp_buff() {
221  if(ZSTD_isError(ZSTD_initDStream(dstream)))
223 }
224 
226  ZSTD_freeDStream(dstream);
227 }
228 
230  return !buffer.fill() && !buffer_enc.fill();
231 }
232 
234  if(buffer_enc.fill()) {
235  if(offset) {
236  StaticBuffer tmp(
237  buffer_enc.base + offset, buffer_enc.fill() - offset, true);
238  buffer_enc.clear();
239  buffer_enc.add(tmp.base, tmp.size);
240  }
241  buffer_enc.add(inbuffer.base, inbuffer.size);
242  inbuffer.free();
243  } else {
244  buffer_enc.take_ownership(inbuffer);
245  }
246 }
247 
249  size_t fill = buffer_enc.fill();
250  if(fill <= offset)
251  return false;
252  size_t remain;
253  if(frame_complete) {
254  remain = ZSTD_getFrameContentSize(buffer_enc.base+offset, fill-offset);
255  if(remain == ZSTD_CONTENTSIZE_ERROR || remain == ZSTD_CONTENTSIZE_UNKNOWN)
256  remain = 262144;
257  } else {
258  remain = 262144;
259  }
260  buffer.ensure(remain);
261  tmp_buff.ensure(remain);
262  ZSTD_outBuffer out_buff = {tmp_buff.base, tmp_buff.size, 0};
263  ZSTD_inBuffer in_buff = {buffer_enc.base, fill, offset};
264  frame_complete = false;
265  do {
266  remain = ZSTD_decompressStream(dstream, &out_buff, &in_buff);
267  if(!out_buff.pos || ZSTD_isError(remain))
268  break;
269  buffer.add(tmp_buff.base, out_buff.pos);
270  out_buff.pos = 0;
271  } while(remain);
272 
273  if(!remain)
274  frame_complete = true;
275 
276  if(in_buff.size > in_buff.pos) {
277  offset = in_buff.pos;
278  } else {
279  buffer_enc.clear();
280  offset = 0;
281  }
282  if(buffer.fill()) {
283  output.set(buffer);
284  return true;
285  }
286  return false;
287 }
288 
289 
290 }} // namespace SWC::Core
SWC::Core::BufferStreamOut::commit_size
size_t commit_size
Definition: BufferStream.h:57
SWC::Core::BufferDyn::ptr
value_type * ptr
Definition: Buffer.h:293
SWC::Core::BufferStreamOut_ZSTD::~BufferStreamOut_ZSTD
virtual ~BufferStreamOut_ZSTD() noexcept
Definition: BufferStream.cc:138
SWC::Core::BufferDyn::add
value_type * add(const value_type *data, size_t len)
Definition: Buffer.h:249
SWC::Core::BufferStreamOut::buffer
DynamicBuffer buffer
Definition: BufferStream.h:58
SWC::Core::BufferStreamIn::empty
virtual bool SWC_PURE_FUNC empty() const
Definition: BufferStream.cc:189
SWC::Core::BufferStreamOut_ZSTD_OnAdd::available
virtual size_t SWC_PURE_FUNC available() override
Definition: BufferStream.cc:65
SWC::Core::Encoder::Type
Type
Definition: Encoder.h:28
SWC::Core::BufferStreamOut::pre_alloc
size_t pre_alloc
Definition: BufferStream.h:56
SWC::Core::BufferStreamIn_ZSTD::BufferStreamIn_ZSTD
BufferStreamIn_ZSTD()
Definition: BufferStream.cc:216
SWC::Core::BufferStreamOut::error
int error
Definition: BufferStream.h:24
SWC::Core::BufferDyn::ensure
SWC_CAN_INLINE void ensure(size_t len)
Definition: Buffer.h:212
SWC::Core::BufferStreamIn::buffer
DynamicBuffer buffer
Definition: BufferStream.h:177
SWC::Core::BufferStreamOut::empty
virtual bool SWC_PURE_FUNC empty() const
Definition: BufferStream.cc:16
SWC::Core::BufferStreamOut_ZSTD_OnAdd::has_data
bool has_data
Definition: BufferStream.h:91
SWC::Core::BufferStreamOut_ZSTD_OnAdd::add
virtual void add(const uint8_t *ptr, size_t len) override
Definition: BufferStream.cc:69
SWC::Core::BufferStreamIn_ZSTD::empty
virtual bool SWC_PURE_FUNC empty() const override
Definition: BufferStream.cc:229
BufferStream.h
SWC::Core::BufferDyn::clear
constexpr SWC_CAN_INLINE void clear() noexcept
Definition: Buffer.h:207
encoder
Core::Encoder::Type encoder
Buffer Encoder.
Definition: HeaderBufferInfo.h:50
SWC::Core::BufferStreamIn_ZSTD::~BufferStreamIn_ZSTD
virtual ~BufferStreamIn_ZSTD() noexcept
Definition: BufferStream.cc:225
SWC::Core::BufferStreamOut_ZSTD::cstream
ZSTD_CStream *const cstream
Definition: BufferStream.h:119
SWC::Core::BufferStreamIn_ZSTD::dstream
ZSTD_DStream *const dstream
Definition: BufferStream.h:204
SWC::Core::BufferStreamIn_ZSTD::offset
size_t offset
Definition: BufferStream.h:206
SWC::Core::BufferStreamOut_ZSTD_OnAdd::empty
virtual bool SWC_PURE_FUNC empty() const override
Definition: BufferStream.cc:57
SWC::Core::BufferStreamIn::get
virtual bool get(StaticBuffer &output)
Definition: BufferStream.cc:206
SWC::Core::BufferStreamOut
Definition: BufferStream.h:22
SWC::Core::Buffer::base
value_type * base
Definition: Buffer.h:131
SWC
The SWC-DB C++ namespace 'SWC'.
Definition: main.cc:12
SWC::Core::BufferStreamOut_ZSTD_OnAdd::~BufferStreamOut_ZSTD_OnAdd
virtual ~BufferStreamOut_ZSTD_OnAdd() noexcept
Definition: BufferStream.cc:53
SWC::Core::BufferStreamOut_ZSTD::BufferStreamOut_ZSTD
BufferStreamOut_ZSTD(int level=0, size_t pre_alloc=12582912, size_t commit_size=8388608)
Definition: BufferStream.cc:123
SWC::Core::BufferDyn< StaticBuffer >
SWC::Core::Buffer
Definition: Buffer.h:18
SWC::Core::BufferStreamIn_ZSTD::tmp_buff
DynamicBuffer tmp_buff
Definition: BufferStream.h:208
SWC::Core::Buffer::size
size_t size
Definition: Buffer.h:130
Error.h
SWC::Core::BufferStreamOut_ENCODER::encoder
const Encoder::Type encoder
Definition: BufferStream.h:127
SWC::Core::BufferStreamOut_ZSTD_OnAdd::tmp_buff
DynamicBuffer tmp_buff
Definition: BufferStream.h:93
SWC::Error::ENCODER_ENCODE
@ ENCODER_ENCODE
Definition: Error.h:76
SWC::Core::BufferStreamOut_ZSTD_OnAdd::out_buff
ZSTD_outBuffer out_buff
Definition: BufferStream.h:94
SWC::Core::BufferStreamOut_ZSTD::get
virtual void get(StaticBuffer &output) override
Definition: BufferStream.cc:142
SWC::Core::BufferStreamOut_ZSTD_OnAdd::plain_size
size_t plain_size
Definition: BufferStream.h:92
SWC::Core::Buffer::free
SWC_CAN_INLINE void free() noexcept
Definition: Buffer.h:85
SWC::Core::BufferStreamOut::available
virtual size_t SWC_PURE_FUNC available()
Definition: BufferStream.cc:24
SWC::Core::BufferStreamIn_ZSTD::add
virtual void add(StaticBuffer &inbuffer) override
Definition: BufferStream.cc:233
SWC::Core::BufferStreamOut_ZSTD_OnAdd::BufferStreamOut_ZSTD_OnAdd
BufferStreamOut_ZSTD_OnAdd(int level=0, size_t pre_alloc=12582912, size_t commit_size=8388608)
Definition: BufferStream.cc:39
SWC::Core::BufferStreamOut::add
virtual void add(const uint8_t *ptr, size_t len)
Definition: BufferStream.cc:28
SWC::Core::BufferStreamOut::get
virtual void get(StaticBuffer &output)
Definition: BufferStream.cc:32
SWC::Core::BufferDyn::take_ownership
void take_ownership(BufferDyn< BufferT > &other) noexcept
Definition: Buffer.h:272
SWC::Core::BufferDyn::fill
constexpr SWC_CAN_INLINE size_t fill() const noexcept
Definition: Buffer.h:192
SWC::Core::BufferStreamIn_ZSTD::buffer_enc
DynamicBuffer buffer_enc
Definition: BufferStream.h:205
SWC::Core::BufferStreamIn::error
int error
Definition: BufferStream.h:153
SWC::Core::BufferStreamOut_ENCODER::get
virtual void get(StaticBuffer &output) override
Definition: BufferStream.cc:167
SWC::Core::Encoder::encode
void encode(int &err, Type encoder, const uint8_t *src, size_t src_sz, size_t *sz_enc, DynamicBuffer &output, uint32_t reserve, bool no_plain_out=false, bool ok_more=false)
Definition: Encoder.cc:222
SWC::Core::BufferStreamOut_ZSTD_OnAdd::get
virtual void get(StaticBuffer &output) override
Definition: BufferStream.cc:98
SWC::Core::BufferStreamIn_ZSTD::frame_complete
bool frame_complete
Definition: BufferStream.h:207
SWC::Core::BufferStreamOut::full
virtual bool SWC_PURE_FUNC full() const
Definition: BufferStream.cc:20
SWC::Core::BufferStreamIn::add
virtual void add(StaticBuffer &inbuffer)
Definition: BufferStream.cc:193
SWC::Core::BufferStreamOut_ZSTD_OnAdd::full
virtual bool SWC_PURE_FUNC full() const override
Definition: BufferStream.cc:61
SWC::Core::BufferStreamIn::put_back
virtual void put_back(const uint8_t *ptr, size_t len)
Definition: BufferStream.cc:202
SWC::Core::BufferStreamOut_ENCODER::BufferStreamOut_ENCODER
BufferStreamOut_ENCODER(Encoder::Type encoder, size_t pre_alloc=12582912, size_t commit_size=8388608)
Definition: BufferStream.cc:161
SWC::Core::BufferStreamOut_ZSTD_OnAdd::cstream
ZSTD_CStream *const cstream
Definition: BufferStream.h:90
SWC::Core::BufferStreamIn_ZSTD::get
virtual bool get(StaticBuffer &output) override
Definition: BufferStream.cc:248
SWC::Core::Buffer::set
void set(value_type *data, size_t len, bool take_ownership) noexcept
Definition: Buffer.h:109