compress: cstream: convert to a b_object type
This commit is contained in:
@@ -1,75 +1,90 @@
|
||||
#include "cstream.h"
|
||||
|
||||
#include <blue/compress/compressor.h>
|
||||
#include <blue/compress/cstream.h>
|
||||
#include <stdlib.h>
|
||||
#include <string.h>
|
||||
|
||||
enum b_status b_cstream_open(
|
||||
b_stream *endpoint, const struct b_compression_function *func,
|
||||
enum b_compression_mode mode, struct b_cstream **out)
|
||||
{
|
||||
size_t inbuf_size = 0, outbuf_size = 0;
|
||||
enum b_status status = b_compression_function_get_buffer_size(
|
||||
func, mode, &inbuf_size, &outbuf_size);
|
||||
if (!B_OK(status)) {
|
||||
return status;
|
||||
}
|
||||
/*** PRIVATE DATA *************************************************************/
|
||||
|
||||
struct b_cstream *stream = malloc(sizeof *stream);
|
||||
if (!stream) {
|
||||
return B_ERR_NO_MEMORY;
|
||||
}
|
||||
enum cstream_flags {
|
||||
CSTREAM_CURSOR_MOVED = 0x01u,
|
||||
};
|
||||
|
||||
memset(stream, 0x0, sizeof *stream);
|
||||
struct b_cstream_p {
|
||||
enum cstream_flags s_flags;
|
||||
b_stream *s_endpoint;
|
||||
b_compressor *s_compressor;
|
||||
/* s_in is the input buffer, and s_out is the output buffer.
|
||||
*
|
||||
* the input buffer holds data that will be provided to the
|
||||
* (de)compression function. in compression mode, this data is provided
|
||||
* by the code using the cstream (via b_cstream_write). in decompression
|
||||
* mode, this data is read from s_endpoint.
|
||||
*
|
||||
* the output buffer holds data produced by the (de)compression
|
||||
* function. in compression mode, this data will be written to
|
||||
* s_endpoint. in decompression mode, this data will be returned to the
|
||||
* code using the cstream (via b_cstream_read)
|
||||
*
|
||||
* heavy usage of cstream's compressed sections facility can result
|
||||
* in the input buffer holding uncompressed data while the stream is in
|
||||
* decompression mode. this is handled by the uncompressed read code path.
|
||||
*/
|
||||
b_ringbuffer *s_in, *s_out;
|
||||
b_compressor_mode s_mode;
|
||||
|
||||
stream->s_mode = mode;
|
||||
stream->s_endpoint = endpoint;
|
||||
unsigned int s_compression_depth;
|
||||
/* tracks the number of bytes read from or written to the endpoint.
|
||||
* this counter is not reset at the beginning/end of each section.
|
||||
*
|
||||
* during compressed sections, this counter is incremented by the number
|
||||
* of compressed bytes written/consumed.
|
||||
*
|
||||
* during uncompressed sections, this counter is incremented by the
|
||||
* number of uncompressed bytes written/returned.
|
||||
*
|
||||
* this does not include bytes read/written while the cursor is moved.
|
||||
*/
|
||||
size_t s_tx_bytes;
|
||||
/* tracks the number of compressed bytes that have passed through this
|
||||
* stream in the current section.
|
||||
*
|
||||
* in compression mode, this tracks the number of post-compression bytes
|
||||
* that have been written to the endpoint within the current section,
|
||||
* including any bytes written during end_compression_section()
|
||||
*
|
||||
* in decompression mode, this tracks the number of compressed bytes
|
||||
* that were decompressed while reading the current section. it does not
|
||||
* include any uncompressed bytes that may have been read from the
|
||||
* endpoint while reading a compressed section due to cstream's
|
||||
* read-ahead caching behaviour.
|
||||
*/
|
||||
size_t s_tx_bytes_compressed;
|
||||
/* tracks the number of uncompressed bytes that have passed through this
|
||||
* stream in the current section.
|
||||
*
|
||||
* in compression mode, this tracks the number of bytes given to
|
||||
* b_cstream_write
|
||||
*
|
||||
* in decompression mode, this tracks the number of bytes returned by
|
||||
* b_cstream_read
|
||||
*/
|
||||
size_t s_tx_bytes_uncompressed;
|
||||
|
||||
status = b_ringbuffer_init(&stream->s_in, inbuf_size + 1);
|
||||
if (!B_OK(status)) {
|
||||
free(stream);
|
||||
return status;
|
||||
}
|
||||
/* when the endpoint cursor is moved, the previous cursor position is
|
||||
* saved here so it can be restored later */
|
||||
size_t s_cursor;
|
||||
};
|
||||
|
||||
status = b_ringbuffer_init(&stream->s_out, outbuf_size + 1);
|
||||
if (!B_OK(status)) {
|
||||
b_ringbuffer_destroy(&stream->s_in);
|
||||
free(stream);
|
||||
return status;
|
||||
}
|
||||
|
||||
status = b_compressor_create(
|
||||
func, mode, &stream->s_in, &stream->s_out, &stream->s_compressor);
|
||||
if (!B_OK(status)) {
|
||||
b_ringbuffer_destroy(&stream->s_in);
|
||||
b_ringbuffer_destroy(&stream->s_out);
|
||||
|
||||
return status;
|
||||
}
|
||||
|
||||
*out = stream;
|
||||
return B_SUCCESS;
|
||||
}
|
||||
|
||||
enum b_status b_cstream_close(struct b_cstream *stream)
|
||||
{
|
||||
b_compressor_destroy(stream->s_compressor);
|
||||
b_ringbuffer_destroy(&stream->s_in);
|
||||
b_ringbuffer_destroy(&stream->s_out);
|
||||
free(stream);
|
||||
|
||||
return B_SUCCESS;
|
||||
}
|
||||
/*** PRIVATE FUNCTIONS ********************************************************/
|
||||
|
||||
static enum b_status read_cursor(
|
||||
struct b_cstream *stream, void *buf, size_t count, size_t *out_nr_read)
|
||||
struct b_cstream_p *stream, void *buf, size_t count, size_t *out_nr_read)
|
||||
{
|
||||
return b_stream_read_bytes(stream->s_endpoint, buf, count, out_nr_read);
|
||||
}
|
||||
|
||||
static enum b_status read_uncompressed(
|
||||
struct b_cstream *stream, void *buf, size_t count, size_t *out_nr_read)
|
||||
struct b_cstream_p *stream, void *buf, size_t count, size_t *out_nr_read)
|
||||
{
|
||||
size_t remaining = count;
|
||||
unsigned char *dest = buf;
|
||||
@@ -86,7 +101,7 @@ static enum b_status read_uncompressed(
|
||||
const void *data;
|
||||
size_t available;
|
||||
status = b_ringbuffer_open_read_buffer(
|
||||
&stream->s_in, &data, &available);
|
||||
stream->s_in, &data, &available);
|
||||
if (!B_OK(status)) {
|
||||
break;
|
||||
}
|
||||
@@ -98,7 +113,7 @@ static enum b_status read_uncompressed(
|
||||
|
||||
memcpy(dest, data, to_copy);
|
||||
|
||||
b_ringbuffer_close_read_buffer(&stream->s_in, &data, to_copy);
|
||||
b_ringbuffer_close_read_buffer(stream->s_in, &data, to_copy);
|
||||
|
||||
stream->s_tx_bytes_uncompressed += to_copy;
|
||||
stream->s_tx_bytes += to_copy;
|
||||
@@ -126,7 +141,7 @@ static enum b_status read_uncompressed(
|
||||
* note that uncompressed data that is trailing the compressed blob may
|
||||
* also be read by this function, but this will be handled by read_uncompressed.
|
||||
*/
|
||||
static enum b_status refill_input_buffer(struct b_cstream *stream)
|
||||
static enum b_status refill_input_buffer(struct b_cstream_p *stream)
|
||||
{
|
||||
enum b_status status = B_SUCCESS;
|
||||
size_t nr_read = 0;
|
||||
@@ -135,7 +150,7 @@ static enum b_status refill_input_buffer(struct b_cstream *stream)
|
||||
void *data;
|
||||
size_t capacity;
|
||||
status = b_ringbuffer_open_write_buffer(
|
||||
&stream->s_in, &data, &capacity);
|
||||
stream->s_in, &data, &capacity);
|
||||
if (!B_OK(status)) {
|
||||
break;
|
||||
}
|
||||
@@ -144,7 +159,7 @@ static enum b_status refill_input_buffer(struct b_cstream *stream)
|
||||
status = b_stream_read_bytes(
|
||||
stream->s_endpoint, data, capacity, &r);
|
||||
|
||||
b_ringbuffer_close_write_buffer(&stream->s_in, &data, r);
|
||||
b_ringbuffer_close_write_buffer(stream->s_in, &data, r);
|
||||
nr_read += r;
|
||||
|
||||
if (r < capacity) {
|
||||
@@ -165,7 +180,7 @@ static enum b_status refill_input_buffer(struct b_cstream *stream)
|
||||
|
||||
/* push compressed data out of the input buffer, through the (de)compressor,
|
||||
* and store the resulting uncompressed data in the output buffer */
|
||||
static enum b_status refill_output_buffer(struct b_cstream *stream)
|
||||
static enum b_status refill_output_buffer(struct b_cstream_p *stream)
|
||||
{
|
||||
enum b_status status = B_SUCCESS;
|
||||
|
||||
@@ -173,7 +188,7 @@ static enum b_status refill_output_buffer(struct b_cstream *stream)
|
||||
return B_ERR_NO_DATA;
|
||||
}
|
||||
|
||||
if (!b_ringbuffer_available_data_remaining(&stream->s_in)) {
|
||||
if (!b_ringbuffer_available_data_remaining(stream->s_in)) {
|
||||
status = refill_input_buffer(stream);
|
||||
}
|
||||
|
||||
@@ -181,9 +196,9 @@ static enum b_status refill_output_buffer(struct b_cstream *stream)
|
||||
return status;
|
||||
}
|
||||
|
||||
size_t bytes_before = b_ringbuffer_available_data_remaining(&stream->s_in);
|
||||
size_t bytes_before = b_ringbuffer_available_data_remaining(stream->s_in);
|
||||
status = b_compressor_step(stream->s_compressor);
|
||||
size_t bytes_after = b_ringbuffer_available_data_remaining(&stream->s_in);
|
||||
size_t bytes_after = b_ringbuffer_available_data_remaining(stream->s_in);
|
||||
|
||||
stream->s_tx_bytes_compressed += (bytes_before - bytes_after);
|
||||
stream->s_tx_bytes += (bytes_before - bytes_after);
|
||||
@@ -191,10 +206,10 @@ static enum b_status refill_output_buffer(struct b_cstream *stream)
|
||||
return status;
|
||||
}
|
||||
|
||||
enum b_status b_cstream_read(
|
||||
struct b_cstream *stream, void *buf, size_t count, size_t *out_nr_read)
|
||||
static enum b_status cstream_read(
|
||||
struct b_cstream_p *stream, void *buf, size_t count, size_t *out_nr_read)
|
||||
{
|
||||
if (stream->s_mode != B_COMPRESSION_MODE_DECOMPRESS) {
|
||||
if (stream->s_mode != B_COMPRESSOR_MODE_DECOMPRESS) {
|
||||
return B_ERR_BAD_STATE;
|
||||
}
|
||||
|
||||
@@ -212,7 +227,7 @@ enum b_status b_cstream_read(
|
||||
enum b_status status = B_SUCCESS;
|
||||
|
||||
while (remaining > 0) {
|
||||
if (!b_ringbuffer_available_data_remaining(&stream->s_out)) {
|
||||
if (!b_ringbuffer_available_data_remaining(stream->s_out)) {
|
||||
status = refill_output_buffer(stream);
|
||||
}
|
||||
|
||||
@@ -223,7 +238,7 @@ enum b_status b_cstream_read(
|
||||
const void *data;
|
||||
size_t available;
|
||||
status = b_ringbuffer_open_read_buffer(
|
||||
&stream->s_out, &data, &available);
|
||||
stream->s_out, &data, &available);
|
||||
if (!B_OK(status)) {
|
||||
break;
|
||||
}
|
||||
@@ -235,7 +250,7 @@ enum b_status b_cstream_read(
|
||||
|
||||
memcpy(dest, data, to_copy);
|
||||
|
||||
b_ringbuffer_close_read_buffer(&stream->s_out, &data, to_copy);
|
||||
b_ringbuffer_close_read_buffer(stream->s_out, &data, to_copy);
|
||||
|
||||
stream->s_tx_bytes_uncompressed += to_copy;
|
||||
dest += to_copy;
|
||||
@@ -252,13 +267,13 @@ enum b_status b_cstream_read(
|
||||
}
|
||||
|
||||
static enum b_status write_cursor(
|
||||
struct b_cstream *stream, const void *buf, size_t count, size_t *nr_written)
|
||||
struct b_cstream_p *stream, const void *buf, size_t count, size_t *nr_written)
|
||||
{
|
||||
return b_stream_write_bytes(stream->s_endpoint, buf, count, nr_written);
|
||||
}
|
||||
|
||||
static enum b_status write_uncompressed(
|
||||
struct b_cstream *stream, const void *buf, size_t count, size_t *nr_written)
|
||||
struct b_cstream_p *stream, const void *buf, size_t count, size_t *nr_written)
|
||||
{
|
||||
size_t w = 0;
|
||||
enum b_status status
|
||||
@@ -273,9 +288,9 @@ static enum b_status write_uncompressed(
|
||||
|
||||
/* push uncompressed data out of the input buffer, through the compressor,
|
||||
* and store the resulting compressed data in the output buffer */
|
||||
static enum b_status flush_input_buffer(struct b_cstream *stream)
|
||||
static enum b_status flush_input_buffer(struct b_cstream_p *stream)
|
||||
{
|
||||
if (!b_ringbuffer_available_data_remaining(&stream->s_in)) {
|
||||
if (!b_ringbuffer_available_data_remaining(stream->s_in)) {
|
||||
return B_ERR_NO_DATA;
|
||||
}
|
||||
|
||||
@@ -283,7 +298,7 @@ static enum b_status flush_input_buffer(struct b_cstream *stream)
|
||||
}
|
||||
|
||||
/* push compressed data from the output buffer into the endpoint */
|
||||
static enum b_status flush_output_buffer(struct b_cstream *stream)
|
||||
static enum b_status flush_output_buffer(struct b_cstream_p *stream)
|
||||
{
|
||||
enum b_status status = B_SUCCESS;
|
||||
size_t nr_written = 0;
|
||||
@@ -292,7 +307,7 @@ static enum b_status flush_output_buffer(struct b_cstream *stream)
|
||||
const void *data;
|
||||
size_t capacity;
|
||||
status = b_ringbuffer_open_read_buffer(
|
||||
&stream->s_out, &data, &capacity);
|
||||
stream->s_out, &data, &capacity);
|
||||
if (!B_OK(status)) {
|
||||
break;
|
||||
}
|
||||
@@ -301,7 +316,7 @@ static enum b_status flush_output_buffer(struct b_cstream *stream)
|
||||
status = b_stream_write_bytes(
|
||||
stream->s_endpoint, data, capacity, &w);
|
||||
|
||||
b_ringbuffer_close_read_buffer(&stream->s_out, &data, w);
|
||||
b_ringbuffer_close_read_buffer(stream->s_out, &data, w);
|
||||
nr_written += w;
|
||||
stream->s_tx_bytes_compressed += w;
|
||||
stream->s_tx_bytes += w;
|
||||
@@ -322,11 +337,11 @@ static enum b_status flush_output_buffer(struct b_cstream *stream)
|
||||
return status;
|
||||
}
|
||||
|
||||
enum b_status b_cstream_write(
|
||||
struct b_cstream *stream, const void *buf, size_t count,
|
||||
static enum b_status cstream_write(
|
||||
struct b_cstream_p *stream, const void *buf, size_t count,
|
||||
size_t *out_nr_written)
|
||||
{
|
||||
if (stream->s_mode != B_COMPRESSION_MODE_COMPRESS) {
|
||||
if (stream->s_mode != B_COMPRESSOR_MODE_COMPRESS) {
|
||||
return B_ERR_BAD_STATE;
|
||||
}
|
||||
|
||||
@@ -344,11 +359,11 @@ enum b_status b_cstream_write(
|
||||
enum b_status status = B_SUCCESS;
|
||||
|
||||
while (remaining > 0) {
|
||||
if (!b_ringbuffer_write_capacity_remaining(&stream->s_out)) {
|
||||
if (!b_ringbuffer_write_capacity_remaining(stream->s_out)) {
|
||||
status = flush_output_buffer(stream);
|
||||
}
|
||||
|
||||
if (!b_ringbuffer_write_capacity_remaining(&stream->s_in)) {
|
||||
if (!b_ringbuffer_write_capacity_remaining(stream->s_in)) {
|
||||
status = flush_input_buffer(stream);
|
||||
}
|
||||
|
||||
@@ -359,7 +374,7 @@ enum b_status b_cstream_write(
|
||||
void *data;
|
||||
size_t available;
|
||||
status = b_ringbuffer_open_write_buffer(
|
||||
&stream->s_in, &data, &available);
|
||||
stream->s_in, &data, &available);
|
||||
if (!B_OK(status)) {
|
||||
break;
|
||||
}
|
||||
@@ -371,7 +386,7 @@ enum b_status b_cstream_write(
|
||||
|
||||
memcpy(data, src, to_copy);
|
||||
|
||||
b_ringbuffer_close_write_buffer(&stream->s_in, &data, to_copy);
|
||||
b_ringbuffer_close_write_buffer(stream->s_in, &data, to_copy);
|
||||
|
||||
stream->s_tx_bytes_uncompressed += to_copy;
|
||||
src += to_copy;
|
||||
@@ -388,7 +403,7 @@ enum b_status b_cstream_write(
|
||||
}
|
||||
|
||||
static enum b_status skip_uncompressed(
|
||||
struct b_cstream *stream, size_t count, size_t *out_nr_skipped)
|
||||
struct b_cstream_p *stream, size_t count, size_t *out_nr_skipped)
|
||||
{
|
||||
size_t remaining = count;
|
||||
size_t nr_read_from_buf = 0;
|
||||
@@ -404,7 +419,7 @@ static enum b_status skip_uncompressed(
|
||||
const void *data;
|
||||
size_t available;
|
||||
status = b_ringbuffer_open_read_buffer(
|
||||
&stream->s_in, &data, &available);
|
||||
stream->s_in, &data, &available);
|
||||
if (!B_OK(status)) {
|
||||
break;
|
||||
}
|
||||
@@ -414,7 +429,7 @@ static enum b_status skip_uncompressed(
|
||||
to_copy = available;
|
||||
}
|
||||
|
||||
b_ringbuffer_close_read_buffer(&stream->s_in, &data, to_copy);
|
||||
b_ringbuffer_close_read_buffer(stream->s_in, &data, to_copy);
|
||||
|
||||
stream->s_tx_bytes_uncompressed += to_copy;
|
||||
stream->s_tx_bytes += to_copy;
|
||||
@@ -444,10 +459,10 @@ static enum b_status skip_uncompressed(
|
||||
return status;
|
||||
}
|
||||
|
||||
enum b_status b_cstream_skip(
|
||||
struct b_cstream *stream, size_t count, size_t *out_nr_skipped)
|
||||
static enum b_status cstream_skip(
|
||||
struct b_cstream_p *stream, size_t count, size_t *out_nr_skipped)
|
||||
{
|
||||
if (stream->s_mode != B_COMPRESSION_MODE_DECOMPRESS) {
|
||||
if (stream->s_mode != B_COMPRESSOR_MODE_DECOMPRESS) {
|
||||
return B_ERR_BAD_STATE;
|
||||
}
|
||||
|
||||
@@ -460,7 +475,7 @@ enum b_status b_cstream_skip(
|
||||
}
|
||||
|
||||
if (b_compressor_eof(stream->s_compressor)
|
||||
&& !b_ringbuffer_available_data_remaining(&stream->s_out)) {
|
||||
&& !b_ringbuffer_available_data_remaining(stream->s_out)) {
|
||||
if (out_nr_skipped) {
|
||||
*out_nr_skipped = 0;
|
||||
}
|
||||
@@ -473,7 +488,7 @@ enum b_status b_cstream_skip(
|
||||
enum b_status status = B_SUCCESS;
|
||||
|
||||
while (remaining > 0) {
|
||||
if (!b_ringbuffer_available_data_remaining(&stream->s_out)) {
|
||||
if (!b_ringbuffer_available_data_remaining(stream->s_out)) {
|
||||
status = refill_output_buffer(stream);
|
||||
}
|
||||
|
||||
@@ -484,7 +499,7 @@ enum b_status b_cstream_skip(
|
||||
const void *data;
|
||||
size_t available;
|
||||
status = b_ringbuffer_open_read_buffer(
|
||||
&stream->s_out, &data, &available);
|
||||
stream->s_out, &data, &available);
|
||||
if (!B_OK(status)) {
|
||||
break;
|
||||
}
|
||||
@@ -494,7 +509,7 @@ enum b_status b_cstream_skip(
|
||||
to_copy = available;
|
||||
}
|
||||
|
||||
b_ringbuffer_close_read_buffer(&stream->s_out, &data, to_copy);
|
||||
b_ringbuffer_close_read_buffer(stream->s_out, &data, to_copy);
|
||||
|
||||
stream->s_tx_bytes_uncompressed += to_copy;
|
||||
nr_read += to_copy;
|
||||
@@ -512,9 +527,9 @@ enum b_status b_cstream_skip(
|
||||
return status;
|
||||
}
|
||||
|
||||
enum b_status b_cstream_reset(struct b_cstream *stream)
|
||||
static enum b_status cstream_reset(struct b_cstream_p *stream)
|
||||
{
|
||||
if (stream->s_mode != B_COMPRESSION_MODE_DECOMPRESS) {
|
||||
if (stream->s_mode != B_COMPRESSOR_MODE_DECOMPRESS) {
|
||||
return B_ERR_BAD_STATE;
|
||||
}
|
||||
|
||||
@@ -525,8 +540,8 @@ enum b_status b_cstream_reset(struct b_cstream *stream)
|
||||
stream->s_flags = 0;
|
||||
|
||||
b_stream_seek(stream->s_endpoint, 0, B_STREAM_SEEK_START);
|
||||
b_ringbuffer_reset(&stream->s_in);
|
||||
b_ringbuffer_reset(&stream->s_out);
|
||||
b_ringbuffer_clear(stream->s_in);
|
||||
b_ringbuffer_clear(stream->s_out);
|
||||
b_compressor_reset(stream->s_compressor);
|
||||
|
||||
stream->s_compression_depth = 0;
|
||||
@@ -538,8 +553,8 @@ enum b_status b_cstream_reset(struct b_cstream *stream)
|
||||
return B_SUCCESS;
|
||||
}
|
||||
|
||||
enum b_status b_cstream_begin_compressed_section(
|
||||
struct b_cstream *stream, size_t *tx_uncompressed_bytes)
|
||||
static enum b_status cstream_begin_compressed_section(
|
||||
struct b_cstream_p *stream, size_t *tx_uncompressed_bytes)
|
||||
{
|
||||
if (stream->s_flags & CSTREAM_CURSOR_MOVED) {
|
||||
return B_ERR_BAD_STATE;
|
||||
@@ -562,8 +577,8 @@ enum b_status b_cstream_begin_compressed_section(
|
||||
return B_SUCCESS;
|
||||
}
|
||||
|
||||
enum b_status b_cstream_end_compressed_section(
|
||||
struct b_cstream *stream, size_t *tx_compressed_bytes,
|
||||
static enum b_status cstream_end_compressed_section(
|
||||
struct b_cstream_p *stream, size_t *tx_compressed_bytes,
|
||||
size_t *tx_uncompressed_bytes)
|
||||
{
|
||||
if (stream->s_flags & CSTREAM_CURSOR_MOVED) {
|
||||
@@ -583,7 +598,7 @@ enum b_status b_cstream_end_compressed_section(
|
||||
|
||||
stream->s_compression_depth = 0;
|
||||
|
||||
if (stream->s_mode == B_COMPRESSION_MODE_DECOMPRESS) {
|
||||
if (stream->s_mode == B_COMPRESSOR_MODE_DECOMPRESS) {
|
||||
stream->s_tx_bytes_compressed = 0;
|
||||
stream->s_tx_bytes_uncompressed = 0;
|
||||
return B_SUCCESS;
|
||||
@@ -608,8 +623,8 @@ enum b_status b_cstream_end_compressed_section(
|
||||
}
|
||||
}
|
||||
|
||||
/* refresh these output variables to account for any data written by
|
||||
* b_compressor_end */
|
||||
/* refresh these output variables to account for any data
|
||||
* written by b_compressor_end */
|
||||
tx_compressed_bytes
|
||||
&& (*tx_compressed_bytes = stream->s_tx_bytes_compressed);
|
||||
|
||||
@@ -626,32 +641,33 @@ enum b_status b_cstream_end_compressed_section(
|
||||
return flush_output_buffer(stream);
|
||||
}
|
||||
|
||||
bool b_cstream_in_compressed_section(const struct b_cstream *stream)
|
||||
static bool cstream_in_compressed_section(const struct b_cstream_p *stream)
|
||||
{
|
||||
return stream->s_compression_depth > 0;
|
||||
}
|
||||
|
||||
enum b_status b_cstream_tx_bytes(const struct b_cstream *stream, size_t *out)
|
||||
static enum b_status cstream_tx_bytes(const struct b_cstream_p *stream, size_t *out)
|
||||
{
|
||||
*out = stream->s_tx_bytes;
|
||||
return B_SUCCESS;
|
||||
}
|
||||
|
||||
enum b_status b_cstream_tx_bytes_compressed(
|
||||
const struct b_cstream *stream, size_t *out)
|
||||
static enum b_status cstream_tx_bytes_compressed(
|
||||
const struct b_cstream_p *stream, size_t *out)
|
||||
{
|
||||
*out = stream->s_tx_bytes_compressed;
|
||||
return B_SUCCESS;
|
||||
}
|
||||
|
||||
enum b_status b_cstream_tx_bytes_uncompressed(
|
||||
const struct b_cstream *stream, size_t *out)
|
||||
static enum b_status cstream_tx_bytes_uncompressed(
|
||||
const struct b_cstream_p *stream, size_t *out)
|
||||
{
|
||||
*out = stream->s_tx_bytes_uncompressed;
|
||||
return B_SUCCESS;
|
||||
}
|
||||
|
||||
enum b_status b_cstream_set_cursor_position(struct b_cstream *stream, size_t pos)
|
||||
static enum b_status cstream_set_cursor_position(
|
||||
struct b_cstream_p *stream, size_t pos)
|
||||
{
|
||||
if (stream->s_compression_depth > 0) {
|
||||
return B_ERR_BAD_STATE;
|
||||
@@ -675,7 +691,7 @@ enum b_status b_cstream_set_cursor_position(struct b_cstream *stream, size_t pos
|
||||
return B_SUCCESS;
|
||||
}
|
||||
|
||||
enum b_status b_cstream_restore_cursor_position(struct b_cstream *stream)
|
||||
static enum b_status cstream_restore_cursor_position(struct b_cstream_p *stream)
|
||||
{
|
||||
if (!(stream->s_flags & CSTREAM_CURSOR_MOVED)) {
|
||||
return B_ERR_BAD_STATE;
|
||||
@@ -693,3 +709,182 @@ enum b_status b_cstream_restore_cursor_position(struct b_cstream *stream)
|
||||
|
||||
return B_SUCCESS;
|
||||
}
|
||||
|
||||
/*** PUBLIC FUNCTIONS *********************************************************/
|
||||
|
||||
enum b_status b_cstream_open(
|
||||
b_stream *endpoint, b_type compressor_type, b_compressor_mode mode,
|
||||
b_cstream **out)
|
||||
{
|
||||
size_t inbuf_size = 0, outbuf_size = 0;
|
||||
enum b_status status = b_compressor_get_buffer_size(
|
||||
compressor_type, mode, &inbuf_size, &outbuf_size);
|
||||
if (!B_OK(status)) {
|
||||
return status;
|
||||
}
|
||||
|
||||
b_cstream *stream = b_object_create(B_TYPE_CSTREAM);
|
||||
if (!stream) {
|
||||
return B_ERR_NO_MEMORY;
|
||||
}
|
||||
|
||||
struct b_cstream_p *p = b_object_get_private(stream, B_TYPE_CSTREAM);
|
||||
b_stream_cfg *cfg = b_object_get_protected(stream, B_TYPE_STREAM);
|
||||
|
||||
p->s_mode = mode;
|
||||
p->s_endpoint = endpoint;
|
||||
|
||||
cfg->s_mode = (mode == B_COMPRESSOR_MODE_COMPRESS) ? B_STREAM_WRITE
|
||||
: B_STREAM_READ;
|
||||
|
||||
p->s_in = b_ringbuffer_create(inbuf_size + 1);
|
||||
if (!B_OK(status)) {
|
||||
free(stream);
|
||||
return status;
|
||||
}
|
||||
|
||||
p->s_out = b_ringbuffer_create(outbuf_size + 1);
|
||||
if (!B_OK(status)) {
|
||||
b_cstream_unref(stream);
|
||||
return status;
|
||||
}
|
||||
|
||||
p->s_compressor = b_object_create(compressor_type);
|
||||
if (!p->s_compressor) {
|
||||
b_cstream_unref(stream);
|
||||
|
||||
return B_ERR_INVALID_ARGUMENT;
|
||||
}
|
||||
|
||||
b_compressor_set_buffer(p->s_compressor, p->s_in, p->s_out);
|
||||
b_compressor_set_mode(p->s_compressor, mode);
|
||||
|
||||
*out = stream;
|
||||
return B_SUCCESS;
|
||||
}
|
||||
|
||||
enum b_status b_cstream_read(
|
||||
b_cstream *stream, void *buf, size_t count, size_t *out_nr_read)
|
||||
{
|
||||
B_CLASS_DISPATCH_STATIC(
|
||||
B_TYPE_CSTREAM, cstream_read, stream, buf, count, out_nr_read);
|
||||
}
|
||||
|
||||
enum b_status b_cstream_write(
|
||||
b_cstream *stream, const void *buf, size_t count, size_t *out_nr_written)
|
||||
{
|
||||
B_CLASS_DISPATCH_STATIC(
|
||||
B_TYPE_CSTREAM, cstream_write, stream, buf, count, out_nr_written);
|
||||
}
|
||||
|
||||
enum b_status b_cstream_skip(b_cstream *stream, size_t count, size_t *out_nr_skipped)
|
||||
{
|
||||
B_CLASS_DISPATCH_STATIC(
|
||||
B_TYPE_CSTREAM, cstream_skip, stream, count, out_nr_skipped);
|
||||
}
|
||||
|
||||
enum b_status b_cstream_reset(b_cstream *stream)
|
||||
{
|
||||
B_CLASS_DISPATCH_STATIC_0(B_TYPE_CSTREAM, cstream_reset, stream);
|
||||
}
|
||||
|
||||
enum b_status b_cstream_begin_compressed_section(
|
||||
b_cstream *stream, size_t *tx_uncompressed_bytes)
|
||||
{
|
||||
B_CLASS_DISPATCH_STATIC(
|
||||
B_TYPE_CSTREAM, cstream_begin_compressed_section, stream,
|
||||
tx_uncompressed_bytes);
|
||||
}
|
||||
|
||||
enum b_status b_cstream_end_compressed_section(
|
||||
b_cstream *stream, size_t *tx_compressed_bytes, size_t *tx_uncompressed_bytes)
|
||||
{
|
||||
B_CLASS_DISPATCH_STATIC(
|
||||
B_TYPE_CSTREAM, cstream_end_compressed_section, stream,
|
||||
tx_compressed_bytes, tx_uncompressed_bytes);
|
||||
}
|
||||
|
||||
bool b_cstream_in_compressed_section(const b_cstream *stream)
|
||||
{
|
||||
B_CLASS_DISPATCH_STATIC_0(
|
||||
B_TYPE_CSTREAM, cstream_in_compressed_section, stream);
|
||||
}
|
||||
|
||||
enum b_status b_cstream_tx_bytes(const b_cstream *stream, size_t *out)
|
||||
{
|
||||
B_CLASS_DISPATCH_STATIC(B_TYPE_CSTREAM, cstream_tx_bytes, stream, out);
|
||||
}
|
||||
|
||||
enum b_status b_cstream_tx_bytes_compressed(const b_cstream *stream, size_t *out)
|
||||
{
|
||||
B_CLASS_DISPATCH_STATIC(
|
||||
B_TYPE_CSTREAM, cstream_tx_bytes_compressed, stream, out);
|
||||
}
|
||||
|
||||
enum b_status b_cstream_tx_bytes_uncompressed(const b_cstream *stream, size_t *out)
|
||||
{
|
||||
B_CLASS_DISPATCH_STATIC(
|
||||
B_TYPE_CSTREAM, cstream_tx_bytes_uncompressed, stream, out);
|
||||
}
|
||||
|
||||
enum b_status b_cstream_set_cursor_position(b_cstream *stream, size_t pos)
|
||||
{
|
||||
B_CLASS_DISPATCH_STATIC(
|
||||
B_TYPE_CSTREAM, cstream_set_cursor_position, stream, pos);
|
||||
}
|
||||
|
||||
enum b_status b_cstream_restore_cursor_position(b_cstream *stream)
|
||||
{
|
||||
B_CLASS_DISPATCH_STATIC_0(
|
||||
B_TYPE_CSTREAM, cstream_restore_cursor_position, stream);
|
||||
}
|
||||
|
||||
/*** VIRTUAL FUNCTIONS ********************************************************/
|
||||
|
||||
static void cstream_init(b_object *obj, void *priv)
|
||||
{
|
||||
}
|
||||
|
||||
static void cstream_fini(b_object *obj, void *priv)
|
||||
{
|
||||
struct b_cstream_p *stream = priv;
|
||||
|
||||
if (stream->s_compressor) {
|
||||
b_compressor_unref(stream->s_compressor);
|
||||
}
|
||||
|
||||
if (stream->s_in) {
|
||||
b_ringbuffer_unref(stream->s_in);
|
||||
}
|
||||
|
||||
if (stream->s_out) {
|
||||
b_ringbuffer_unref(stream->s_out);
|
||||
}
|
||||
}
|
||||
|
||||
/*** CLASS DEFINITION *********************************************************/
|
||||
|
||||
B_TYPE_CLASS_DEFINITION_BEGIN(b_cstream)
|
||||
B_TYPE_CLASS_INTERFACE_BEGIN(b_object, B_TYPE_OBJECT)
|
||||
B_INTERFACE_ENTRY(to_string) = NULL;
|
||||
B_TYPE_CLASS_INTERFACE_END(b_object, B_TYPE_OBJECT)
|
||||
|
||||
B_TYPE_CLASS_INTERFACE_BEGIN(b_stream, B_TYPE_STREAM)
|
||||
B_INTERFACE_ENTRY(s_close) = NULL;
|
||||
B_INTERFACE_ENTRY(s_seek) = NULL;
|
||||
B_INTERFACE_ENTRY(s_tell) = NULL;
|
||||
B_INTERFACE_ENTRY(s_getc) = NULL;
|
||||
B_INTERFACE_ENTRY(s_read) = b_cstream_read;
|
||||
B_INTERFACE_ENTRY(s_write) = b_cstream_write;
|
||||
B_INTERFACE_ENTRY(s_reserve) = NULL;
|
||||
B_TYPE_CLASS_INTERFACE_END(b_stream, B_TYPE_STREAM)
|
||||
B_TYPE_CLASS_DEFINITION_END(b_cstream)
|
||||
|
||||
B_TYPE_DEFINITION_BEGIN(b_cstream)
|
||||
B_TYPE_ID(0xe1e899b5, 0x6a3c, 0x4f9c, 0xafd0, 0xaab3f156615c);
|
||||
B_TYPE_EXTENDS(B_TYPE_STREAM);
|
||||
B_TYPE_CLASS(b_cstream_class);
|
||||
B_TYPE_INSTANCE_PRIVATE(struct b_cstream_p);
|
||||
B_TYPE_INSTANCE_INIT(cstream_init);
|
||||
B_TYPE_INSTANCE_FINI(cstream_fini);
|
||||
B_TYPE_DEFINITION_END(b_cstream)
|
||||
|
||||
@@ -1,82 +0,0 @@
|
||||
#ifndef _CSTREAM_H_
|
||||
#define _CSTREAM_H_
|
||||
|
||||
#include <blue/compress/function.h>
|
||||
#include <blue/core/ringbuffer.h>
|
||||
#include <blue/core/stream.h>
|
||||
#include <stddef.h>
|
||||
|
||||
struct b_stream;
|
||||
struct b_compressor;
|
||||
|
||||
enum cstream_flags {
|
||||
CSTREAM_CURSOR_MOVED = 0x01u,
|
||||
};
|
||||
|
||||
struct b_cstream {
|
||||
enum cstream_flags s_flags;
|
||||
b_stream *s_endpoint;
|
||||
struct b_compressor *s_compressor;
|
||||
/* s_in is the input buffer, and s_out is the output buffer.
|
||||
*
|
||||
* the input buffer holds data that will be provided to the
|
||||
* (de)compression function. in compression mode, this data is provided
|
||||
* by the code using the cstream (via b_cstream_write). in decompression
|
||||
* mode, this data is read from s_endpoint.
|
||||
*
|
||||
* the output buffer holds data produced by the (de)compression
|
||||
* function. in compression mode, this data will be written to
|
||||
* s_endpoint. in decompression mode, this data will be returned to the
|
||||
* code using the cstream (via b_cstream_read)
|
||||
*
|
||||
* heavy usage of cstream's compressed sections facility can result
|
||||
* in the input buffer holding uncompressed data while the stream is in
|
||||
* decompression mode. this is handled by the uncompressed read code path.
|
||||
*/
|
||||
struct b_ringbuffer s_in, s_out;
|
||||
enum b_compression_mode s_mode;
|
||||
|
||||
unsigned int s_compression_depth;
|
||||
/* tracks the number of bytes read from or written to the endpoint.
|
||||
* this counter is not reset at the beginning/end of each section.
|
||||
*
|
||||
* during compressed sections, this counter is incremented by the number
|
||||
* of compressed bytes written/consumed.
|
||||
*
|
||||
* during uncompressed sections, this counter is incremented by the
|
||||
* number of uncompressed bytes written/returned.
|
||||
*
|
||||
* this does not include bytes read/written while the cursor is moved.
|
||||
*/
|
||||
size_t s_tx_bytes;
|
||||
/* tracks the number of compressed bytes that have passed through this
|
||||
* stream in the current section.
|
||||
*
|
||||
* in compression mode, this tracks the number of post-compression bytes
|
||||
* that have been written to the endpoint within the current section,
|
||||
* including any bytes written during end_compression_section()
|
||||
*
|
||||
* in decompression mode, this tracks the number of compressed bytes
|
||||
* that were decompressed while reading the current section. it does not
|
||||
* include any uncompressed bytes that may have been read from the
|
||||
* endpoint while reading a compressed section due to cstream's
|
||||
* read-ahead caching behaviour.
|
||||
*/
|
||||
size_t s_tx_bytes_compressed;
|
||||
/* tracks the number of uncompressed bytes that have passed through this
|
||||
* stream in the current section.
|
||||
*
|
||||
* in compression mode, this tracks the number of bytes given to
|
||||
* b_cstream_write
|
||||
*
|
||||
* in decompression mode, this tracks the number of bytes returned by
|
||||
* b_cstream_read
|
||||
*/
|
||||
size_t s_tx_bytes_uncompressed;
|
||||
|
||||
/* when the endpoint cursor is moved, the previous cursor position is
|
||||
* saved here so it can be restored later */
|
||||
size_t s_cursor;
|
||||
};
|
||||
|
||||
#endif
|
||||
@@ -1,16 +1,26 @@
|
||||
#ifndef BLUELIB_COMPRESS_CSTREAM_H_
|
||||
#define BLUELIB_COMPRESS_CSTREAM_H_
|
||||
#ifndef BLUE_COMPRESS_CSTREAM_H_
|
||||
#define BLUE_COMPRESS_CSTREAM_H_
|
||||
|
||||
#include <blue/compress/function.h>
|
||||
#include <blue/core/macros.h>
|
||||
#include <blue/core/stream.h>
|
||||
#include <stdbool.h>
|
||||
|
||||
typedef struct b_cstream b_cstream;
|
||||
B_DECLS_BEGIN;
|
||||
|
||||
enum b_compressor_mode;
|
||||
|
||||
#define B_TYPE_CSTREAM (b_cstream_get_type())
|
||||
|
||||
B_DECLARE_TYPE(b_cstream);
|
||||
|
||||
B_TYPE_CLASS_DECLARATION_BEGIN(b_cstream)
|
||||
B_TYPE_CLASS_DECLARATION_END(b_cstream)
|
||||
|
||||
BLUE_API b_type b_cstream_get_type(void);
|
||||
|
||||
BLUE_API b_status b_cstream_open(
|
||||
b_stream *endpoint, const b_compression_function *func,
|
||||
b_compression_mode mode, b_cstream **out);
|
||||
BLUE_API b_status b_cstream_close(b_cstream *stream);
|
||||
b_stream *endpoint, b_type compressor_type, enum b_compressor_mode mode,
|
||||
b_cstream **out);
|
||||
|
||||
BLUE_API b_status b_cstream_read(
|
||||
b_cstream *stream, void *buf, size_t count, size_t *nr_read);
|
||||
@@ -35,4 +45,6 @@ BLUE_API b_status b_cstream_tx_bytes_uncompressed(
|
||||
BLUE_API b_status b_cstream_set_cursor_position(b_cstream *stream, size_t pos);
|
||||
BLUE_API b_status b_cstream_restore_cursor_position(b_cstream *stream);
|
||||
|
||||
B_DECLS_END;
|
||||
|
||||
#endif
|
||||
|
||||
Reference in New Issue
Block a user