From a1c1fee30135c815e0f7f3e89fcb55352e497276 Mon Sep 17 00:00:00 2001 From: Max Wash Date: Sat, 9 Aug 2025 19:45:34 +0100 Subject: [PATCH] compress: cstream: add skip() and reset() functions --- compress/cstream.c | 162 ++++++++++++++++++++++- compress/cstream.h | 2 +- compress/include/blue/compress/cstream.h | 3 + 3 files changed, 160 insertions(+), 7 deletions(-) diff --git a/compress/cstream.c b/compress/cstream.c index 1650e0a..04d969c 100644 --- a/compress/cstream.c +++ b/compress/cstream.c @@ -91,7 +91,7 @@ static enum b_status read_uncompressed( break; } - size_t to_copy = count; + size_t to_copy = remaining; if (to_copy > available) { to_copy = available; } @@ -169,6 +169,10 @@ static enum b_status refill_output_buffer(struct b_cstream *stream) { enum b_status status = B_SUCCESS; + if (b_compressor_eof(stream->s_compressor)) { + return B_ERR_NO_DATA; + } + if (!b_ringbuffer_available_data_remaining(&stream->s_in)) { status = refill_input_buffer(stream); } @@ -202,11 +206,6 @@ enum b_status b_cstream_read( return read_uncompressed(stream, buf, count, out_nr_read); } - if (b_compressor_eof(stream->s_compressor)) { - *out_nr_read = 0; - return B_SUCCESS; - } - unsigned char *dest = buf; size_t nr_read = 0; size_t remaining = count; @@ -388,6 +387,157 @@ enum b_status b_cstream_write( return status; } +static enum b_status skip_uncompressed( + struct b_cstream *stream, size_t count, size_t *out_nr_skipped) +{ + size_t remaining = count; + size_t nr_read_from_buf = 0; + size_t nr_read_from_endpoint = 0; + enum b_status status = B_SUCCESS; + + /* liberal usage of begin_compressed_section and end_compressed_section + * can result in uncompressed data getting stuck in the input buffer. + * return any data remaining in the input buffer before reading more + * from the endpoint */ + + while (remaining > 0) { + const void *data; + size_t available; + status = b_ringbuffer_open_read_buffer( + &stream->s_in, &data, &available); + if (!B_OK(status)) { + break; + } + + size_t to_copy = remaining; + if (to_copy > available) { + to_copy = available; + } + + b_ringbuffer_close_read_buffer(&stream->s_in, &data, to_copy); + + stream->s_tx_bytes_uncompressed += to_copy; + stream->s_tx_bytes += to_copy; + remaining -= to_copy; + nr_read_from_buf += to_copy; + } + + if (remaining == 0) { + if (out_nr_skipped) { + *out_nr_skipped = nr_read_from_buf; + } + + return B_SUCCESS; + } + + size_t cursor = b_stream_cursor(stream->s_endpoint); + + status = b_stream_seek(stream->s_endpoint, remaining, B_STREAM_SEEK_CURRENT); + nr_read_from_endpoint = b_stream_cursor(stream->s_endpoint) - cursor; + stream->s_tx_bytes_uncompressed += nr_read_from_endpoint; + stream->s_tx_bytes += nr_read_from_endpoint; + + if (out_nr_skipped) { + *out_nr_skipped = nr_read_from_endpoint + nr_read_from_buf; + } + + return status; +} + +enum b_status b_cstream_skip( + struct b_cstream *stream, size_t count, size_t *out_nr_skipped) +{ + if (stream->s_mode != B_COMPRESSION_MODE_DECOMPRESS) { + return B_ERR_BAD_STATE; + } + + if (stream->s_flags & CSTREAM_CURSOR_MOVED) { + return B_ERR_BAD_STATE; + } + + if (stream->s_compression_depth == 0) { + return skip_uncompressed(stream, count, out_nr_skipped); + } + + if (b_compressor_eof(stream->s_compressor) + && !b_ringbuffer_available_data_remaining(&stream->s_out)) { + if (out_nr_skipped) { + *out_nr_skipped = 0; + } + + return B_SUCCESS; + } + + size_t nr_read = 0; + size_t remaining = count; + enum b_status status = B_SUCCESS; + + while (remaining > 0) { + if (!b_ringbuffer_available_data_remaining(&stream->s_out)) { + status = refill_output_buffer(stream); + } + + if (!B_OK(status)) { + break; + } + + const void *data; + size_t available; + status = b_ringbuffer_open_read_buffer( + &stream->s_out, &data, &available); + if (!B_OK(status)) { + break; + } + + size_t to_copy = remaining; + if (to_copy > available) { + to_copy = available; + } + + b_ringbuffer_close_read_buffer(&stream->s_out, &data, to_copy); + + stream->s_tx_bytes_uncompressed += to_copy; + nr_read += to_copy; + remaining -= to_copy; + } + + if (status == B_ERR_NO_DATA) { + status = B_SUCCESS; + } + + if (out_nr_skipped) { + *out_nr_skipped = nr_read; + } + + return status; +} + +enum b_status b_cstream_reset(struct b_cstream *stream) +{ + if (stream->s_mode != B_COMPRESSION_MODE_DECOMPRESS) { + return B_ERR_BAD_STATE; + } + + if (stream->s_flags & CSTREAM_CURSOR_MOVED) { + return B_ERR_BAD_STATE; + } + + stream->s_flags = 0; + + b_stream_seek(stream->s_endpoint, 0, B_STREAM_SEEK_START); + b_ringbuffer_reset(&stream->s_in); + b_ringbuffer_reset(&stream->s_out); + b_compressor_reset(stream->s_compressor); + + stream->s_compression_depth = 0; + stream->s_tx_bytes = 0; + stream->s_tx_bytes_uncompressed = 0; + stream->s_tx_bytes_compressed = 0; + stream->s_cursor = 0; + + return B_SUCCESS; +} + enum b_status b_cstream_begin_compressed_section( struct b_cstream *stream, size_t *tx_uncompressed_bytes) { diff --git a/compress/cstream.h b/compress/cstream.h index e27dc4e..70c906d 100644 --- a/compress/cstream.h +++ b/compress/cstream.h @@ -20,7 +20,7 @@ struct b_cstream { * * the input buffer holds data that will be provided to the * (de)compression function. in compression mode, this data is provided - * by the code using the cstream (via b_cstream_write) in decompression + * by the code using the cstream (via b_cstream_write). in decompression * mode, this data is read from s_endpoint. * * the output buffer holds data produced by the (de)compression diff --git a/compress/include/blue/compress/cstream.h b/compress/include/blue/compress/cstream.h index 08750ef..9dd176e 100644 --- a/compress/include/blue/compress/cstream.h +++ b/compress/include/blue/compress/cstream.h @@ -16,6 +16,9 @@ BLUE_API b_status b_cstream_read( b_cstream *stream, void *buf, size_t count, size_t *nr_read); BLUE_API b_status b_cstream_write( b_cstream *stream, const void *buf, size_t count, size_t *nr_written); +BLUE_API b_status b_cstream_skip( + b_cstream *stream, size_t count, size_t *nr_skipped); +BLUE_API b_status b_cstream_reset(b_cstream *stream); BLUE_API b_status b_cstream_begin_compressed_section( b_cstream *stream, size_t *tx_uncompressed_bytes);