diff --git a/compress/cstream.c b/compress/cstream.c index e69de29..776d34e 100644 --- a/compress/cstream.c +++ b/compress/cstream.c @@ -0,0 +1,444 @@ +#include "cstream.h" + +#include +#include +#include +#include + +enum b_status b_cstream_open( + struct b_stream *endpoint, const struct b_compression_function *func, + enum b_compression_mode mode, struct b_cstream **out) +{ + size_t inbuf_size = 0, outbuf_size = 0; + enum b_status status = b_compression_function_get_buffer_size( + func, mode, &inbuf_size, &outbuf_size); + if (!B_OK(status)) { + return status; + } + + struct b_cstream *stream = malloc(sizeof *stream); + if (!stream) { + return B_ERR_NO_MEMORY; + } + + memset(stream, 0x0, sizeof *stream); + + stream->s_mode = mode; + stream->s_endpoint = endpoint; + + status = b_ringbuffer_init(&stream->s_in, inbuf_size + 1); + if (!B_OK(status)) { + free(stream); + return status; + } + + status = b_ringbuffer_init(&stream->s_out, outbuf_size + 1); + if (!B_OK(status)) { + b_ringbuffer_destroy(&stream->s_in); + free(stream); + return status; + } + + status = b_compressor_create( + func, mode, &stream->s_in, &stream->s_out, &stream->s_compressor); + if (!B_OK(status)) { + b_ringbuffer_destroy(&stream->s_in); + b_ringbuffer_destroy(&stream->s_out); + + return status; + } + + *out = stream; + return B_SUCCESS; +} + +enum b_status b_cstream_close(struct b_cstream *stream) +{ + b_compressor_destroy(stream->s_compressor); + b_ringbuffer_destroy(&stream->s_in); + b_ringbuffer_destroy(&stream->s_out); + free(stream); + + return B_SUCCESS; +} + +static enum b_status read_uncompressed( + struct b_cstream *stream, void *buf, size_t count, size_t *out_nr_read) +{ + size_t remaining = count; + unsigned char *dest = buf; + size_t nr_read_from_buf = 0; + size_t nr_read_from_endpoint = 0; + enum b_status status = B_SUCCESS; + + /* liberal usage of begin_compressed_section and end_compressed_section + * can result in uncompressed data getting stuck in the input buffer. + * return any data remaining in the input buffer before reading more + * from the endpoint */ + + while (remaining > 0) { + const void *data; + size_t available; + status = b_ringbuffer_open_read_buffer( + &stream->s_in, &data, &available); + if (!B_OK(status)) { + break; + } + + size_t to_copy = count; + if (to_copy > available) { + to_copy = available; + } + + memcpy(dest, data, to_copy); + + b_ringbuffer_close_read_buffer(&stream->s_in, &data, to_copy); + + stream->s_tx_uncompressed_bytes += to_copy; + dest += to_copy; + remaining -= to_copy; + nr_read_from_buf += to_copy; + } + + if (remaining == 0) { + *out_nr_read = nr_read_from_buf; + return B_SUCCESS; + } + + status = b_stream_read_bytes( + stream->s_endpoint, dest, remaining, &nr_read_from_endpoint); + stream->s_tx_uncompressed_bytes += nr_read_from_endpoint; + + *out_nr_read = nr_read_from_endpoint + nr_read_from_buf; + + return status; +} + +/* read compressed data from the endpoint and store it in the input buffer. + * note that uncompressed data that is trailing the compressed blob may + * also be read by this function, but this will be handled by read_uncompressed. + */ +static enum b_status refill_input_buffer(struct b_cstream *stream) +{ + enum b_status status = B_SUCCESS; + size_t nr_read = 0; + + while (1) { + void *data; + size_t capacity; + status = b_ringbuffer_open_write_buffer( + &stream->s_in, &data, &capacity); + if (!B_OK(status)) { + break; + } + + size_t r = 0; + status = b_stream_read_bytes( + stream->s_endpoint, data, capacity, &r); + + b_ringbuffer_close_write_buffer(&stream->s_in, &data, r); + nr_read += r; + + if (r < capacity) { + break; + } + + if (!B_OK(status)) { + break; + } + } + + if (status == B_ERR_NO_SPACE && nr_read > 0) { + status = B_SUCCESS; + } + + return status; +} + +/* push compressed data out of the input buffer, through the (de)compressor, + * and store the resulting uncompressed data in the output buffer */ +static enum b_status refill_output_buffer(struct b_cstream *stream) +{ + enum b_status status = B_SUCCESS; + + if (!b_ringbuffer_available_data_remaining(&stream->s_in)) { + status = refill_input_buffer(stream); + } + + if (!B_OK(status)) { + return status; + } + + size_t bytes_before = b_ringbuffer_available_data_remaining(&stream->s_in); + status = b_compressor_step(stream->s_compressor); + size_t bytes_after = b_ringbuffer_available_data_remaining(&stream->s_in); + + stream->s_tx_compressed_bytes += (bytes_before - bytes_after); + + return status; +} + +enum b_status b_cstream_read( + struct b_cstream *stream, void *buf, size_t count, size_t *out_nr_read) +{ + if (stream->s_mode != B_COMPRESSION_MODE_DECOMPRESS) { + return B_ERR_BAD_STATE; + } + + if (stream->s_compression_depth == 0) { + return read_uncompressed(stream, buf, count, out_nr_read); + } + + if (b_compressor_eof(stream->s_compressor)) { + *out_nr_read = 0; + return B_SUCCESS; + } + + unsigned char *dest = buf; + size_t nr_read = 0; + size_t remaining = count; + enum b_status status = B_SUCCESS; + + while (remaining > 0) { + if (!b_ringbuffer_available_data_remaining(&stream->s_out)) { + status = refill_output_buffer(stream); + } + + if (!B_OK(status)) { + break; + } + + const void *data; + size_t available; + status = b_ringbuffer_open_read_buffer( + &stream->s_out, &data, &available); + if (!B_OK(status)) { + break; + } + + size_t to_copy = remaining; + if (to_copy > available) { + to_copy = available; + } + + memcpy(dest, data, to_copy); + + b_ringbuffer_close_read_buffer(&stream->s_out, &data, to_copy); + + stream->s_tx_uncompressed_bytes += to_copy; + dest += to_copy; + nr_read += to_copy; + remaining -= to_copy; + } + + if (status == B_ERR_NO_DATA) { + status = B_SUCCESS; + } + + *out_nr_read = nr_read; + return status; +} + +static enum b_status write_uncompressed( + struct b_cstream *stream, const void *buf, size_t count, size_t *nr_written) +{ + size_t w = 0; + enum b_status status + = b_stream_write_bytes(stream->s_endpoint, buf, count, &w); + + stream->s_tx_uncompressed_bytes += w; + + *nr_written = w; + return status; +} + +/* push uncompressed data out of the input buffer, through the compressor, + * and store the resulting compressed data in the output buffer */ +static enum b_status flush_input_buffer(struct b_cstream *stream) +{ + if (!b_ringbuffer_available_data_remaining(&stream->s_in)) { + return B_ERR_NO_DATA; + } + + return b_compressor_step(stream->s_compressor); +} + +/* push compressed data from the output buffer into the endpoint */ +static enum b_status flush_output_buffer(struct b_cstream *stream) +{ + enum b_status status = B_SUCCESS; + size_t nr_written = 0; + + while (1) { + const void *data; + size_t capacity; + status = b_ringbuffer_open_read_buffer( + &stream->s_out, &data, &capacity); + if (!B_OK(status)) { + break; + } + + size_t w = 0; + status = b_stream_write_bytes( + stream->s_endpoint, data, capacity, &w); + + b_ringbuffer_close_read_buffer(&stream->s_out, &data, w); + nr_written += w; + stream->s_tx_compressed_bytes += w; + + if (w < capacity) { + break; + } + + if (!B_OK(status)) { + break; + } + } + + if (status == B_ERR_NO_DATA && nr_written > 0) { + status = B_SUCCESS; + } + + return status; +} + +enum b_status b_cstream_write( + struct b_cstream *stream, const void *buf, size_t count, + size_t *out_nr_written) +{ + if (stream->s_mode != B_COMPRESSION_MODE_COMPRESS) { + return B_ERR_BAD_STATE; + } + + if (stream->s_compression_depth == 0) { + return write_uncompressed(stream, buf, count, out_nr_written); + } + + const unsigned char *src = buf; + size_t nr_written = 0; + size_t remaining = count; + enum b_status status = B_SUCCESS; + + while (remaining > 0) { + if (!b_ringbuffer_write_capacity_remaining(&stream->s_out)) { + status = flush_output_buffer(stream); + } + + if (!b_ringbuffer_write_capacity_remaining(&stream->s_in)) { + status = flush_input_buffer(stream); + } + + if (!B_OK(status)) { + break; + } + + void *data; + size_t available; + status = b_ringbuffer_open_write_buffer( + &stream->s_in, &data, &available); + if (!B_OK(status)) { + break; + } + + size_t to_copy = remaining; + if (to_copy > available) { + to_copy = available; + } + + memcpy(data, src, to_copy); + + b_ringbuffer_close_write_buffer(&stream->s_in, &data, to_copy); + + stream->s_tx_uncompressed_bytes += to_copy; + src += to_copy; + nr_written += to_copy; + remaining -= to_copy; + } + + if (status == B_ERR_NO_DATA) { + status = B_SUCCESS; + } + + *out_nr_written = nr_written; + return status; +} + +enum b_status b_cstream_begin_compressed_section( + struct b_cstream *stream, size_t *tx_uncompressed_bytes) +{ + if (tx_uncompressed_bytes) { + *tx_uncompressed_bytes = stream->s_tx_uncompressed_bytes; + } + + if (stream->s_compression_depth > 0) { + stream->s_compression_depth++; + return B_SUCCESS; + } + + stream->s_compression_depth = 1; + stream->s_tx_uncompressed_bytes = 0; + stream->s_tx_compressed_bytes = 0; + b_compressor_reset(stream->s_compressor); + + return B_SUCCESS; +} + +enum b_status b_cstream_end_compressed_section( + struct b_cstream *stream, size_t *tx_compressed_bytes, + size_t *tx_uncompressed_bytes) +{ + tx_compressed_bytes + && (*tx_compressed_bytes = stream->s_tx_compressed_bytes); + + tx_uncompressed_bytes + && (*tx_uncompressed_bytes = stream->s_tx_uncompressed_bytes); + + if (stream->s_compression_depth > 1) { + stream->s_compression_depth--; + return B_SUCCESS; + } + + stream->s_compression_depth = 0; + + if (stream->s_mode == B_COMPRESSION_MODE_DECOMPRESS) { + stream->s_tx_compressed_bytes = 0; + stream->s_tx_uncompressed_bytes = 0; + return B_SUCCESS; + } + + enum b_status status = B_SUCCESS; + while (1) { + status = b_compressor_end(stream->s_compressor); + if (!B_OK(status) && status != B_ERR_NO_SPACE) { + break; + } + + status = flush_output_buffer(stream); + + if (!B_OK(status)) { + break; + } + + if (b_compressor_eof(stream->s_compressor)) { + status = B_SUCCESS; + break; + } + } + + /* refresh these output variables to account for any data written by + * b_compressor_end */ + tx_compressed_bytes + && (*tx_compressed_bytes = stream->s_tx_compressed_bytes); + + tx_uncompressed_bytes + && (*tx_uncompressed_bytes = stream->s_tx_uncompressed_bytes); + + if (!B_OK(status)) { + return status; + } + + stream->s_tx_compressed_bytes = 0; + stream->s_tx_uncompressed_bytes = 0; + + return flush_output_buffer(stream); +} diff --git a/compress/cstream.h b/compress/cstream.h index e69de29..a40127d 100644 --- a/compress/cstream.h +++ b/compress/cstream.h @@ -0,0 +1,60 @@ +#ifndef _CSTREAM_H_ +#define _CSTREAM_H_ + +#include +#include +#include + +struct b_stream; +struct b_compressor; + +struct b_cstream { + struct b_stream *s_endpoint; + struct b_compressor *s_compressor; + /* s_in is the input buffer, and s_out is the output buffer. + * + * the input buffer holds data that will be provided to the + * (de)compression function. in compression mode, this data is provided + * by the code using the cstream (via b_cstream_write) in decompression + * mode, this data is read from s_endpoint. + * + * the output buffer holds data produced by the (de)compression + * function. in compression mode, this data will be written to + * s_endpoint. in decompression mode, this data will be returned to the + * code using the cstream (via b_cstream_read) + * + * heavy usage of cstream's compressed sections facility can result + * in the input buffer holding uncompressed data while the stream is in + * decompression mode. this is handled by the uncompressed read code path. + */ + struct b_ringbuffer s_in, s_out; + enum b_compression_mode s_mode; + + unsigned int s_compression_depth; + /* tracks the number of compressed bytes that have passed through this + * stream in the current section. + * + * in compression mode, this tracks the number of post-compression bytes + * that have been written to the endpoint within the current section, + * including any bytes written during end_compression_section() + * + * in decompression mode, this tracks the number of compressed bytes + * that were decompressed while reading the current section. it does not + * include any uncompressed bytes that may have been read from the + * endpoint while reading a compressed section due to cstream's + * read-ahead caching behaviour. + */ + size_t s_tx_compressed_bytes; + /* tracks the number of uncompressed bytes that have passed through this + * stream in the current section. + * + * in compression mode, this tracks the number of bytes given to + * b_cstream_write + * + * in decompression mode, this tracks the number of bytes returned by + * b_cstream_read + */ + size_t s_tx_uncompressed_bytes; +}; + +#endif diff --git a/compress/include/blue/compress/cstream.h b/compress/include/blue/compress/cstream.h index e69de29..cdffc80 100644 --- a/compress/include/blue/compress/cstream.h +++ b/compress/include/blue/compress/cstream.h @@ -0,0 +1,25 @@ +#ifndef BLUELIB_COMPRESS_CSTREAM_H_ +#define BLUELIB_COMPRESS_CSTREAM_H_ + +#include +#include + +typedef struct b_cstream b_cstream; + +BLUE_API b_status b_cstream_open( + b_stream *endpoint, const b_compression_function *func, + b_compression_mode mode, b_cstream **out); +BLUE_API b_status b_cstream_close(b_cstream *stream); + +BLUE_API b_status b_cstream_read( + b_cstream *stream, void *buf, size_t count, size_t *nr_read); +BLUE_API b_status b_cstream_write( + b_cstream *stream, const void *buf, size_t count, size_t *nr_written); + +BLUE_API b_status b_cstream_begin_compressed_section( + b_cstream *stream, size_t *tx_uncompressed_bytes); +BLUE_API b_status b_cstream_end_compressed_section( + b_cstream *stream, size_t *tx_compressed_bytes, + size_t *tx_uncompressed_bytes); + +#endif