diff --git a/compress/cstream.c b/compress/cstream.c index 283f145..5da6148 100644 --- a/compress/cstream.c +++ b/compress/cstream.c @@ -1,75 +1,90 @@ -#include "cstream.h" - #include #include #include #include -enum b_status b_cstream_open( - b_stream *endpoint, const struct b_compression_function *func, - enum b_compression_mode mode, struct b_cstream **out) -{ - size_t inbuf_size = 0, outbuf_size = 0; - enum b_status status = b_compression_function_get_buffer_size( - func, mode, &inbuf_size, &outbuf_size); - if (!B_OK(status)) { - return status; - } +/*** PRIVATE DATA *************************************************************/ - struct b_cstream *stream = malloc(sizeof *stream); - if (!stream) { - return B_ERR_NO_MEMORY; - } +enum cstream_flags { + CSTREAM_CURSOR_MOVED = 0x01u, +}; - memset(stream, 0x0, sizeof *stream); +struct b_cstream_p { + enum cstream_flags s_flags; + b_stream *s_endpoint; + b_compressor *s_compressor; + /* s_in is the input buffer, and s_out is the output buffer. + * + * the input buffer holds data that will be provided to the + * (de)compression function. in compression mode, this data is provided + * by the code using the cstream (via b_cstream_write). in decompression + * mode, this data is read from s_endpoint. + * + * the output buffer holds data produced by the (de)compression + * function. in compression mode, this data will be written to + * s_endpoint. in decompression mode, this data will be returned to the + * code using the cstream (via b_cstream_read) + * + * heavy usage of cstream's compressed sections facility can result + * in the input buffer holding uncompressed data while the stream is in + * decompression mode. this is handled by the uncompressed read code path. + */ + b_ringbuffer *s_in, *s_out; + b_compressor_mode s_mode; - stream->s_mode = mode; - stream->s_endpoint = endpoint; + unsigned int s_compression_depth; + /* tracks the number of bytes read from or written to the endpoint. + * this counter is not reset at the beginning/end of each section. + * + * during compressed sections, this counter is incremented by the number + * of compressed bytes written/consumed. + * + * during uncompressed sections, this counter is incremented by the + * number of uncompressed bytes written/returned. + * + * this does not include bytes read/written while the cursor is moved. + */ + size_t s_tx_bytes; + /* tracks the number of compressed bytes that have passed through this + * stream in the current section. + * + * in compression mode, this tracks the number of post-compression bytes + * that have been written to the endpoint within the current section, + * including any bytes written during end_compression_section() + * + * in decompression mode, this tracks the number of compressed bytes + * that were decompressed while reading the current section. it does not + * include any uncompressed bytes that may have been read from the + * endpoint while reading a compressed section due to cstream's + * read-ahead caching behaviour. + */ + size_t s_tx_bytes_compressed; + /* tracks the number of uncompressed bytes that have passed through this + * stream in the current section. + * + * in compression mode, this tracks the number of bytes given to + * b_cstream_write + * + * in decompression mode, this tracks the number of bytes returned by + * b_cstream_read + */ + size_t s_tx_bytes_uncompressed; - status = b_ringbuffer_init(&stream->s_in, inbuf_size + 1); - if (!B_OK(status)) { - free(stream); - return status; - } + /* when the endpoint cursor is moved, the previous cursor position is + * saved here so it can be restored later */ + size_t s_cursor; +}; - status = b_ringbuffer_init(&stream->s_out, outbuf_size + 1); - if (!B_OK(status)) { - b_ringbuffer_destroy(&stream->s_in); - free(stream); - return status; - } - - status = b_compressor_create( - func, mode, &stream->s_in, &stream->s_out, &stream->s_compressor); - if (!B_OK(status)) { - b_ringbuffer_destroy(&stream->s_in); - b_ringbuffer_destroy(&stream->s_out); - - return status; - } - - *out = stream; - return B_SUCCESS; -} - -enum b_status b_cstream_close(struct b_cstream *stream) -{ - b_compressor_destroy(stream->s_compressor); - b_ringbuffer_destroy(&stream->s_in); - b_ringbuffer_destroy(&stream->s_out); - free(stream); - - return B_SUCCESS; -} +/*** PRIVATE FUNCTIONS ********************************************************/ static enum b_status read_cursor( - struct b_cstream *stream, void *buf, size_t count, size_t *out_nr_read) + struct b_cstream_p *stream, void *buf, size_t count, size_t *out_nr_read) { return b_stream_read_bytes(stream->s_endpoint, buf, count, out_nr_read); } static enum b_status read_uncompressed( - struct b_cstream *stream, void *buf, size_t count, size_t *out_nr_read) + struct b_cstream_p *stream, void *buf, size_t count, size_t *out_nr_read) { size_t remaining = count; unsigned char *dest = buf; @@ -86,7 +101,7 @@ static enum b_status read_uncompressed( const void *data; size_t available; status = b_ringbuffer_open_read_buffer( - &stream->s_in, &data, &available); + stream->s_in, &data, &available); if (!B_OK(status)) { break; } @@ -98,7 +113,7 @@ static enum b_status read_uncompressed( memcpy(dest, data, to_copy); - b_ringbuffer_close_read_buffer(&stream->s_in, &data, to_copy); + b_ringbuffer_close_read_buffer(stream->s_in, &data, to_copy); stream->s_tx_bytes_uncompressed += to_copy; stream->s_tx_bytes += to_copy; @@ -126,7 +141,7 @@ static enum b_status read_uncompressed( * note that uncompressed data that is trailing the compressed blob may * also be read by this function, but this will be handled by read_uncompressed. */ -static enum b_status refill_input_buffer(struct b_cstream *stream) +static enum b_status refill_input_buffer(struct b_cstream_p *stream) { enum b_status status = B_SUCCESS; size_t nr_read = 0; @@ -135,7 +150,7 @@ static enum b_status refill_input_buffer(struct b_cstream *stream) void *data; size_t capacity; status = b_ringbuffer_open_write_buffer( - &stream->s_in, &data, &capacity); + stream->s_in, &data, &capacity); if (!B_OK(status)) { break; } @@ -144,7 +159,7 @@ static enum b_status refill_input_buffer(struct b_cstream *stream) status = b_stream_read_bytes( stream->s_endpoint, data, capacity, &r); - b_ringbuffer_close_write_buffer(&stream->s_in, &data, r); + b_ringbuffer_close_write_buffer(stream->s_in, &data, r); nr_read += r; if (r < capacity) { @@ -165,7 +180,7 @@ static enum b_status refill_input_buffer(struct b_cstream *stream) /* push compressed data out of the input buffer, through the (de)compressor, * and store the resulting uncompressed data in the output buffer */ -static enum b_status refill_output_buffer(struct b_cstream *stream) +static enum b_status refill_output_buffer(struct b_cstream_p *stream) { enum b_status status = B_SUCCESS; @@ -173,7 +188,7 @@ static enum b_status refill_output_buffer(struct b_cstream *stream) return B_ERR_NO_DATA; } - if (!b_ringbuffer_available_data_remaining(&stream->s_in)) { + if (!b_ringbuffer_available_data_remaining(stream->s_in)) { status = refill_input_buffer(stream); } @@ -181,9 +196,9 @@ static enum b_status refill_output_buffer(struct b_cstream *stream) return status; } - size_t bytes_before = b_ringbuffer_available_data_remaining(&stream->s_in); + size_t bytes_before = b_ringbuffer_available_data_remaining(stream->s_in); status = b_compressor_step(stream->s_compressor); - size_t bytes_after = b_ringbuffer_available_data_remaining(&stream->s_in); + size_t bytes_after = b_ringbuffer_available_data_remaining(stream->s_in); stream->s_tx_bytes_compressed += (bytes_before - bytes_after); stream->s_tx_bytes += (bytes_before - bytes_after); @@ -191,10 +206,10 @@ static enum b_status refill_output_buffer(struct b_cstream *stream) return status; } -enum b_status b_cstream_read( - struct b_cstream *stream, void *buf, size_t count, size_t *out_nr_read) +static enum b_status cstream_read( + struct b_cstream_p *stream, void *buf, size_t count, size_t *out_nr_read) { - if (stream->s_mode != B_COMPRESSION_MODE_DECOMPRESS) { + if (stream->s_mode != B_COMPRESSOR_MODE_DECOMPRESS) { return B_ERR_BAD_STATE; } @@ -212,7 +227,7 @@ enum b_status b_cstream_read( enum b_status status = B_SUCCESS; while (remaining > 0) { - if (!b_ringbuffer_available_data_remaining(&stream->s_out)) { + if (!b_ringbuffer_available_data_remaining(stream->s_out)) { status = refill_output_buffer(stream); } @@ -223,7 +238,7 @@ enum b_status b_cstream_read( const void *data; size_t available; status = b_ringbuffer_open_read_buffer( - &stream->s_out, &data, &available); + stream->s_out, &data, &available); if (!B_OK(status)) { break; } @@ -235,7 +250,7 @@ enum b_status b_cstream_read( memcpy(dest, data, to_copy); - b_ringbuffer_close_read_buffer(&stream->s_out, &data, to_copy); + b_ringbuffer_close_read_buffer(stream->s_out, &data, to_copy); stream->s_tx_bytes_uncompressed += to_copy; dest += to_copy; @@ -252,13 +267,13 @@ enum b_status b_cstream_read( } static enum b_status write_cursor( - struct b_cstream *stream, const void *buf, size_t count, size_t *nr_written) + struct b_cstream_p *stream, const void *buf, size_t count, size_t *nr_written) { return b_stream_write_bytes(stream->s_endpoint, buf, count, nr_written); } static enum b_status write_uncompressed( - struct b_cstream *stream, const void *buf, size_t count, size_t *nr_written) + struct b_cstream_p *stream, const void *buf, size_t count, size_t *nr_written) { size_t w = 0; enum b_status status @@ -273,9 +288,9 @@ static enum b_status write_uncompressed( /* push uncompressed data out of the input buffer, through the compressor, * and store the resulting compressed data in the output buffer */ -static enum b_status flush_input_buffer(struct b_cstream *stream) +static enum b_status flush_input_buffer(struct b_cstream_p *stream) { - if (!b_ringbuffer_available_data_remaining(&stream->s_in)) { + if (!b_ringbuffer_available_data_remaining(stream->s_in)) { return B_ERR_NO_DATA; } @@ -283,7 +298,7 @@ static enum b_status flush_input_buffer(struct b_cstream *stream) } /* push compressed data from the output buffer into the endpoint */ -static enum b_status flush_output_buffer(struct b_cstream *stream) +static enum b_status flush_output_buffer(struct b_cstream_p *stream) { enum b_status status = B_SUCCESS; size_t nr_written = 0; @@ -292,7 +307,7 @@ static enum b_status flush_output_buffer(struct b_cstream *stream) const void *data; size_t capacity; status = b_ringbuffer_open_read_buffer( - &stream->s_out, &data, &capacity); + stream->s_out, &data, &capacity); if (!B_OK(status)) { break; } @@ -301,7 +316,7 @@ static enum b_status flush_output_buffer(struct b_cstream *stream) status = b_stream_write_bytes( stream->s_endpoint, data, capacity, &w); - b_ringbuffer_close_read_buffer(&stream->s_out, &data, w); + b_ringbuffer_close_read_buffer(stream->s_out, &data, w); nr_written += w; stream->s_tx_bytes_compressed += w; stream->s_tx_bytes += w; @@ -322,11 +337,11 @@ static enum b_status flush_output_buffer(struct b_cstream *stream) return status; } -enum b_status b_cstream_write( - struct b_cstream *stream, const void *buf, size_t count, +static enum b_status cstream_write( + struct b_cstream_p *stream, const void *buf, size_t count, size_t *out_nr_written) { - if (stream->s_mode != B_COMPRESSION_MODE_COMPRESS) { + if (stream->s_mode != B_COMPRESSOR_MODE_COMPRESS) { return B_ERR_BAD_STATE; } @@ -344,11 +359,11 @@ enum b_status b_cstream_write( enum b_status status = B_SUCCESS; while (remaining > 0) { - if (!b_ringbuffer_write_capacity_remaining(&stream->s_out)) { + if (!b_ringbuffer_write_capacity_remaining(stream->s_out)) { status = flush_output_buffer(stream); } - if (!b_ringbuffer_write_capacity_remaining(&stream->s_in)) { + if (!b_ringbuffer_write_capacity_remaining(stream->s_in)) { status = flush_input_buffer(stream); } @@ -359,7 +374,7 @@ enum b_status b_cstream_write( void *data; size_t available; status = b_ringbuffer_open_write_buffer( - &stream->s_in, &data, &available); + stream->s_in, &data, &available); if (!B_OK(status)) { break; } @@ -371,7 +386,7 @@ enum b_status b_cstream_write( memcpy(data, src, to_copy); - b_ringbuffer_close_write_buffer(&stream->s_in, &data, to_copy); + b_ringbuffer_close_write_buffer(stream->s_in, &data, to_copy); stream->s_tx_bytes_uncompressed += to_copy; src += to_copy; @@ -388,7 +403,7 @@ enum b_status b_cstream_write( } static enum b_status skip_uncompressed( - struct b_cstream *stream, size_t count, size_t *out_nr_skipped) + struct b_cstream_p *stream, size_t count, size_t *out_nr_skipped) { size_t remaining = count; size_t nr_read_from_buf = 0; @@ -404,7 +419,7 @@ static enum b_status skip_uncompressed( const void *data; size_t available; status = b_ringbuffer_open_read_buffer( - &stream->s_in, &data, &available); + stream->s_in, &data, &available); if (!B_OK(status)) { break; } @@ -414,7 +429,7 @@ static enum b_status skip_uncompressed( to_copy = available; } - b_ringbuffer_close_read_buffer(&stream->s_in, &data, to_copy); + b_ringbuffer_close_read_buffer(stream->s_in, &data, to_copy); stream->s_tx_bytes_uncompressed += to_copy; stream->s_tx_bytes += to_copy; @@ -444,10 +459,10 @@ static enum b_status skip_uncompressed( return status; } -enum b_status b_cstream_skip( - struct b_cstream *stream, size_t count, size_t *out_nr_skipped) +static enum b_status cstream_skip( + struct b_cstream_p *stream, size_t count, size_t *out_nr_skipped) { - if (stream->s_mode != B_COMPRESSION_MODE_DECOMPRESS) { + if (stream->s_mode != B_COMPRESSOR_MODE_DECOMPRESS) { return B_ERR_BAD_STATE; } @@ -460,7 +475,7 @@ enum b_status b_cstream_skip( } if (b_compressor_eof(stream->s_compressor) - && !b_ringbuffer_available_data_remaining(&stream->s_out)) { + && !b_ringbuffer_available_data_remaining(stream->s_out)) { if (out_nr_skipped) { *out_nr_skipped = 0; } @@ -473,7 +488,7 @@ enum b_status b_cstream_skip( enum b_status status = B_SUCCESS; while (remaining > 0) { - if (!b_ringbuffer_available_data_remaining(&stream->s_out)) { + if (!b_ringbuffer_available_data_remaining(stream->s_out)) { status = refill_output_buffer(stream); } @@ -484,7 +499,7 @@ enum b_status b_cstream_skip( const void *data; size_t available; status = b_ringbuffer_open_read_buffer( - &stream->s_out, &data, &available); + stream->s_out, &data, &available); if (!B_OK(status)) { break; } @@ -494,7 +509,7 @@ enum b_status b_cstream_skip( to_copy = available; } - b_ringbuffer_close_read_buffer(&stream->s_out, &data, to_copy); + b_ringbuffer_close_read_buffer(stream->s_out, &data, to_copy); stream->s_tx_bytes_uncompressed += to_copy; nr_read += to_copy; @@ -512,9 +527,9 @@ enum b_status b_cstream_skip( return status; } -enum b_status b_cstream_reset(struct b_cstream *stream) +static enum b_status cstream_reset(struct b_cstream_p *stream) { - if (stream->s_mode != B_COMPRESSION_MODE_DECOMPRESS) { + if (stream->s_mode != B_COMPRESSOR_MODE_DECOMPRESS) { return B_ERR_BAD_STATE; } @@ -525,8 +540,8 @@ enum b_status b_cstream_reset(struct b_cstream *stream) stream->s_flags = 0; b_stream_seek(stream->s_endpoint, 0, B_STREAM_SEEK_START); - b_ringbuffer_reset(&stream->s_in); - b_ringbuffer_reset(&stream->s_out); + b_ringbuffer_clear(stream->s_in); + b_ringbuffer_clear(stream->s_out); b_compressor_reset(stream->s_compressor); stream->s_compression_depth = 0; @@ -538,8 +553,8 @@ enum b_status b_cstream_reset(struct b_cstream *stream) return B_SUCCESS; } -enum b_status b_cstream_begin_compressed_section( - struct b_cstream *stream, size_t *tx_uncompressed_bytes) +static enum b_status cstream_begin_compressed_section( + struct b_cstream_p *stream, size_t *tx_uncompressed_bytes) { if (stream->s_flags & CSTREAM_CURSOR_MOVED) { return B_ERR_BAD_STATE; @@ -562,8 +577,8 @@ enum b_status b_cstream_begin_compressed_section( return B_SUCCESS; } -enum b_status b_cstream_end_compressed_section( - struct b_cstream *stream, size_t *tx_compressed_bytes, +static enum b_status cstream_end_compressed_section( + struct b_cstream_p *stream, size_t *tx_compressed_bytes, size_t *tx_uncompressed_bytes) { if (stream->s_flags & CSTREAM_CURSOR_MOVED) { @@ -583,7 +598,7 @@ enum b_status b_cstream_end_compressed_section( stream->s_compression_depth = 0; - if (stream->s_mode == B_COMPRESSION_MODE_DECOMPRESS) { + if (stream->s_mode == B_COMPRESSOR_MODE_DECOMPRESS) { stream->s_tx_bytes_compressed = 0; stream->s_tx_bytes_uncompressed = 0; return B_SUCCESS; @@ -608,8 +623,8 @@ enum b_status b_cstream_end_compressed_section( } } - /* refresh these output variables to account for any data written by - * b_compressor_end */ + /* refresh these output variables to account for any data + * written by b_compressor_end */ tx_compressed_bytes && (*tx_compressed_bytes = stream->s_tx_bytes_compressed); @@ -626,32 +641,33 @@ enum b_status b_cstream_end_compressed_section( return flush_output_buffer(stream); } -bool b_cstream_in_compressed_section(const struct b_cstream *stream) +static bool cstream_in_compressed_section(const struct b_cstream_p *stream) { return stream->s_compression_depth > 0; } -enum b_status b_cstream_tx_bytes(const struct b_cstream *stream, size_t *out) +static enum b_status cstream_tx_bytes(const struct b_cstream_p *stream, size_t *out) { *out = stream->s_tx_bytes; return B_SUCCESS; } -enum b_status b_cstream_tx_bytes_compressed( - const struct b_cstream *stream, size_t *out) +static enum b_status cstream_tx_bytes_compressed( + const struct b_cstream_p *stream, size_t *out) { *out = stream->s_tx_bytes_compressed; return B_SUCCESS; } -enum b_status b_cstream_tx_bytes_uncompressed( - const struct b_cstream *stream, size_t *out) +static enum b_status cstream_tx_bytes_uncompressed( + const struct b_cstream_p *stream, size_t *out) { *out = stream->s_tx_bytes_uncompressed; return B_SUCCESS; } -enum b_status b_cstream_set_cursor_position(struct b_cstream *stream, size_t pos) +static enum b_status cstream_set_cursor_position( + struct b_cstream_p *stream, size_t pos) { if (stream->s_compression_depth > 0) { return B_ERR_BAD_STATE; @@ -675,7 +691,7 @@ enum b_status b_cstream_set_cursor_position(struct b_cstream *stream, size_t pos return B_SUCCESS; } -enum b_status b_cstream_restore_cursor_position(struct b_cstream *stream) +static enum b_status cstream_restore_cursor_position(struct b_cstream_p *stream) { if (!(stream->s_flags & CSTREAM_CURSOR_MOVED)) { return B_ERR_BAD_STATE; @@ -693,3 +709,182 @@ enum b_status b_cstream_restore_cursor_position(struct b_cstream *stream) return B_SUCCESS; } + +/*** PUBLIC FUNCTIONS *********************************************************/ + +enum b_status b_cstream_open( + b_stream *endpoint, b_type compressor_type, b_compressor_mode mode, + b_cstream **out) +{ + size_t inbuf_size = 0, outbuf_size = 0; + enum b_status status = b_compressor_get_buffer_size( + compressor_type, mode, &inbuf_size, &outbuf_size); + if (!B_OK(status)) { + return status; + } + + b_cstream *stream = b_object_create(B_TYPE_CSTREAM); + if (!stream) { + return B_ERR_NO_MEMORY; + } + + struct b_cstream_p *p = b_object_get_private(stream, B_TYPE_CSTREAM); + b_stream_cfg *cfg = b_object_get_protected(stream, B_TYPE_STREAM); + + p->s_mode = mode; + p->s_endpoint = endpoint; + + cfg->s_mode = (mode == B_COMPRESSOR_MODE_COMPRESS) ? B_STREAM_WRITE + : B_STREAM_READ; + + p->s_in = b_ringbuffer_create(inbuf_size + 1); + if (!B_OK(status)) { + free(stream); + return status; + } + + p->s_out = b_ringbuffer_create(outbuf_size + 1); + if (!B_OK(status)) { + b_cstream_unref(stream); + return status; + } + + p->s_compressor = b_object_create(compressor_type); + if (!p->s_compressor) { + b_cstream_unref(stream); + + return B_ERR_INVALID_ARGUMENT; + } + + b_compressor_set_buffer(p->s_compressor, p->s_in, p->s_out); + b_compressor_set_mode(p->s_compressor, mode); + + *out = stream; + return B_SUCCESS; +} + +enum b_status b_cstream_read( + b_cstream *stream, void *buf, size_t count, size_t *out_nr_read) +{ + B_CLASS_DISPATCH_STATIC( + B_TYPE_CSTREAM, cstream_read, stream, buf, count, out_nr_read); +} + +enum b_status b_cstream_write( + b_cstream *stream, const void *buf, size_t count, size_t *out_nr_written) +{ + B_CLASS_DISPATCH_STATIC( + B_TYPE_CSTREAM, cstream_write, stream, buf, count, out_nr_written); +} + +enum b_status b_cstream_skip(b_cstream *stream, size_t count, size_t *out_nr_skipped) +{ + B_CLASS_DISPATCH_STATIC( + B_TYPE_CSTREAM, cstream_skip, stream, count, out_nr_skipped); +} + +enum b_status b_cstream_reset(b_cstream *stream) +{ + B_CLASS_DISPATCH_STATIC_0(B_TYPE_CSTREAM, cstream_reset, stream); +} + +enum b_status b_cstream_begin_compressed_section( + b_cstream *stream, size_t *tx_uncompressed_bytes) +{ + B_CLASS_DISPATCH_STATIC( + B_TYPE_CSTREAM, cstream_begin_compressed_section, stream, + tx_uncompressed_bytes); +} + +enum b_status b_cstream_end_compressed_section( + b_cstream *stream, size_t *tx_compressed_bytes, size_t *tx_uncompressed_bytes) +{ + B_CLASS_DISPATCH_STATIC( + B_TYPE_CSTREAM, cstream_end_compressed_section, stream, + tx_compressed_bytes, tx_uncompressed_bytes); +} + +bool b_cstream_in_compressed_section(const b_cstream *stream) +{ + B_CLASS_DISPATCH_STATIC_0( + B_TYPE_CSTREAM, cstream_in_compressed_section, stream); +} + +enum b_status b_cstream_tx_bytes(const b_cstream *stream, size_t *out) +{ + B_CLASS_DISPATCH_STATIC(B_TYPE_CSTREAM, cstream_tx_bytes, stream, out); +} + +enum b_status b_cstream_tx_bytes_compressed(const b_cstream *stream, size_t *out) +{ + B_CLASS_DISPATCH_STATIC( + B_TYPE_CSTREAM, cstream_tx_bytes_compressed, stream, out); +} + +enum b_status b_cstream_tx_bytes_uncompressed(const b_cstream *stream, size_t *out) +{ + B_CLASS_DISPATCH_STATIC( + B_TYPE_CSTREAM, cstream_tx_bytes_uncompressed, stream, out); +} + +enum b_status b_cstream_set_cursor_position(b_cstream *stream, size_t pos) +{ + B_CLASS_DISPATCH_STATIC( + B_TYPE_CSTREAM, cstream_set_cursor_position, stream, pos); +} + +enum b_status b_cstream_restore_cursor_position(b_cstream *stream) +{ + B_CLASS_DISPATCH_STATIC_0( + B_TYPE_CSTREAM, cstream_restore_cursor_position, stream); +} + +/*** VIRTUAL FUNCTIONS ********************************************************/ + +static void cstream_init(b_object *obj, void *priv) +{ +} + +static void cstream_fini(b_object *obj, void *priv) +{ + struct b_cstream_p *stream = priv; + + if (stream->s_compressor) { + b_compressor_unref(stream->s_compressor); + } + + if (stream->s_in) { + b_ringbuffer_unref(stream->s_in); + } + + if (stream->s_out) { + b_ringbuffer_unref(stream->s_out); + } +} + +/*** CLASS DEFINITION *********************************************************/ + +B_TYPE_CLASS_DEFINITION_BEGIN(b_cstream) + B_TYPE_CLASS_INTERFACE_BEGIN(b_object, B_TYPE_OBJECT) + B_INTERFACE_ENTRY(to_string) = NULL; + B_TYPE_CLASS_INTERFACE_END(b_object, B_TYPE_OBJECT) + + B_TYPE_CLASS_INTERFACE_BEGIN(b_stream, B_TYPE_STREAM) + B_INTERFACE_ENTRY(s_close) = NULL; + B_INTERFACE_ENTRY(s_seek) = NULL; + B_INTERFACE_ENTRY(s_tell) = NULL; + B_INTERFACE_ENTRY(s_getc) = NULL; + B_INTERFACE_ENTRY(s_read) = b_cstream_read; + B_INTERFACE_ENTRY(s_write) = b_cstream_write; + B_INTERFACE_ENTRY(s_reserve) = NULL; + B_TYPE_CLASS_INTERFACE_END(b_stream, B_TYPE_STREAM) +B_TYPE_CLASS_DEFINITION_END(b_cstream) + +B_TYPE_DEFINITION_BEGIN(b_cstream) + B_TYPE_ID(0xe1e899b5, 0x6a3c, 0x4f9c, 0xafd0, 0xaab3f156615c); + B_TYPE_EXTENDS(B_TYPE_STREAM); + B_TYPE_CLASS(b_cstream_class); + B_TYPE_INSTANCE_PRIVATE(struct b_cstream_p); + B_TYPE_INSTANCE_INIT(cstream_init); + B_TYPE_INSTANCE_FINI(cstream_fini); +B_TYPE_DEFINITION_END(b_cstream) diff --git a/compress/cstream.h b/compress/cstream.h deleted file mode 100644 index 7add9a6..0000000 --- a/compress/cstream.h +++ /dev/null @@ -1,82 +0,0 @@ -#ifndef _CSTREAM_H_ -#define _CSTREAM_H_ - -#include -#include -#include -#include - -struct b_stream; -struct b_compressor; - -enum cstream_flags { - CSTREAM_CURSOR_MOVED = 0x01u, -}; - -struct b_cstream { - enum cstream_flags s_flags; - b_stream *s_endpoint; - struct b_compressor *s_compressor; - /* s_in is the input buffer, and s_out is the output buffer. - * - * the input buffer holds data that will be provided to the - * (de)compression function. in compression mode, this data is provided - * by the code using the cstream (via b_cstream_write). in decompression - * mode, this data is read from s_endpoint. - * - * the output buffer holds data produced by the (de)compression - * function. in compression mode, this data will be written to - * s_endpoint. in decompression mode, this data will be returned to the - * code using the cstream (via b_cstream_read) - * - * heavy usage of cstream's compressed sections facility can result - * in the input buffer holding uncompressed data while the stream is in - * decompression mode. this is handled by the uncompressed read code path. - */ - struct b_ringbuffer s_in, s_out; - enum b_compression_mode s_mode; - - unsigned int s_compression_depth; - /* tracks the number of bytes read from or written to the endpoint. - * this counter is not reset at the beginning/end of each section. - * - * during compressed sections, this counter is incremented by the number - * of compressed bytes written/consumed. - * - * during uncompressed sections, this counter is incremented by the - * number of uncompressed bytes written/returned. - * - * this does not include bytes read/written while the cursor is moved. - */ - size_t s_tx_bytes; - /* tracks the number of compressed bytes that have passed through this - * stream in the current section. - * - * in compression mode, this tracks the number of post-compression bytes - * that have been written to the endpoint within the current section, - * including any bytes written during end_compression_section() - * - * in decompression mode, this tracks the number of compressed bytes - * that were decompressed while reading the current section. it does not - * include any uncompressed bytes that may have been read from the - * endpoint while reading a compressed section due to cstream's - * read-ahead caching behaviour. - */ - size_t s_tx_bytes_compressed; - /* tracks the number of uncompressed bytes that have passed through this - * stream in the current section. - * - * in compression mode, this tracks the number of bytes given to - * b_cstream_write - * - * in decompression mode, this tracks the number of bytes returned by - * b_cstream_read - */ - size_t s_tx_bytes_uncompressed; - - /* when the endpoint cursor is moved, the previous cursor position is - * saved here so it can be restored later */ - size_t s_cursor; -}; - -#endif diff --git a/compress/include/blue/compress/cstream.h b/compress/include/blue/compress/cstream.h index 9dd176e..f382326 100644 --- a/compress/include/blue/compress/cstream.h +++ b/compress/include/blue/compress/cstream.h @@ -1,16 +1,26 @@ -#ifndef BLUELIB_COMPRESS_CSTREAM_H_ -#define BLUELIB_COMPRESS_CSTREAM_H_ +#ifndef BLUE_COMPRESS_CSTREAM_H_ +#define BLUE_COMPRESS_CSTREAM_H_ -#include +#include #include #include -typedef struct b_cstream b_cstream; +B_DECLS_BEGIN; + +enum b_compressor_mode; + +#define B_TYPE_CSTREAM (b_cstream_get_type()) + +B_DECLARE_TYPE(b_cstream); + +B_TYPE_CLASS_DECLARATION_BEGIN(b_cstream) +B_TYPE_CLASS_DECLARATION_END(b_cstream) + +BLUE_API b_type b_cstream_get_type(void); BLUE_API b_status b_cstream_open( - b_stream *endpoint, const b_compression_function *func, - b_compression_mode mode, b_cstream **out); -BLUE_API b_status b_cstream_close(b_cstream *stream); + b_stream *endpoint, b_type compressor_type, enum b_compressor_mode mode, + b_cstream **out); BLUE_API b_status b_cstream_read( b_cstream *stream, void *buf, size_t count, size_t *nr_read); @@ -35,4 +45,6 @@ BLUE_API b_status b_cstream_tx_bytes_uncompressed( BLUE_API b_status b_cstream_set_cursor_position(b_cstream *stream, size_t pos); BLUE_API b_status b_cstream_restore_cursor_position(b_cstream *stream); +B_DECLS_END; + #endif