From ede5e72fc2ce9c85193b3f75e8ca7c4e91c0079d Mon Sep 17 00:00:00 2001 From: Max Wash Date: Thu, 31 Jul 2025 11:18:10 +0100 Subject: [PATCH] compress: cstream: add support for temporarily moving the cursor and performing limited i/o operations the cursor can only be moved during uncompressed i/o, and any read/write operations are performed directly on the underlying endpoint with no buffering, and don't count towards the transacted byte statistics. the cursor can only be moved once, after which it's position must be restored. --- compress/cstream.c | 71 ++++++++++++++++++++++++ compress/cstream.h | 9 +++ compress/include/blue/compress/cstream.h | 3 + 3 files changed, 83 insertions(+) diff --git a/compress/cstream.c b/compress/cstream.c index 776d34e..d104bdb 100644 --- a/compress/cstream.c +++ b/compress/cstream.c @@ -62,6 +62,12 @@ enum b_status b_cstream_close(struct b_cstream *stream) return B_SUCCESS; } +static enum b_status read_cursor( + struct b_cstream *stream, void *buf, size_t count, size_t *out_nr_read) +{ + return b_stream_read_bytes(stream->s_endpoint, buf, count, out_nr_read); +} + static enum b_status read_uncompressed( struct b_cstream *stream, void *buf, size_t count, size_t *out_nr_read) { @@ -185,6 +191,10 @@ enum b_status b_cstream_read( return B_ERR_BAD_STATE; } + if (stream->s_flags & CSTREAM_CURSOR_MOVED) { + return read_cursor(stream, buf, count, out_nr_read); + } + if (stream->s_compression_depth == 0) { return read_uncompressed(stream, buf, count, out_nr_read); } @@ -239,6 +249,12 @@ enum b_status b_cstream_read( return status; } +static enum b_status write_cursor( + struct b_cstream *stream, const void *buf, size_t count, size_t *nr_written) +{ + return b_stream_write_bytes(stream->s_endpoint, buf, count, nr_written); +} + static enum b_status write_uncompressed( struct b_cstream *stream, const void *buf, size_t count, size_t *nr_written) { @@ -310,6 +326,10 @@ enum b_status b_cstream_write( return B_ERR_BAD_STATE; } + if (stream->s_flags & CSTREAM_CURSOR_MOVED) { + return write_cursor(stream, buf, count, out_nr_written); + } + if (stream->s_compression_depth == 0) { return write_uncompressed(stream, buf, count, out_nr_written); } @@ -366,6 +386,10 @@ enum b_status b_cstream_write( enum b_status b_cstream_begin_compressed_section( struct b_cstream *stream, size_t *tx_uncompressed_bytes) { + if (stream->s_flags & CSTREAM_CURSOR_MOVED) { + return B_ERR_BAD_STATE; + } + if (tx_uncompressed_bytes) { *tx_uncompressed_bytes = stream->s_tx_uncompressed_bytes; } @@ -387,6 +411,10 @@ enum b_status b_cstream_end_compressed_section( struct b_cstream *stream, size_t *tx_compressed_bytes, size_t *tx_uncompressed_bytes) { + if (stream->s_flags & CSTREAM_CURSOR_MOVED) { + return B_ERR_BAD_STATE; + } + tx_compressed_bytes && (*tx_compressed_bytes = stream->s_tx_compressed_bytes); @@ -442,3 +470,46 @@ enum b_status b_cstream_end_compressed_section( return flush_output_buffer(stream); } + +enum b_status b_cstream_set_cursor_position(struct b_cstream *stream, size_t pos) +{ + if (stream->s_compression_depth > 0) { + return B_ERR_BAD_STATE; + } + + if (stream->s_flags & CSTREAM_CURSOR_MOVED) { + return B_ERR_BAD_STATE; + } + + stream->s_cursor = b_stream_cursor(stream->s_endpoint); + + enum b_status status + = b_stream_seek(stream->s_endpoint, pos, B_STREAM_SEEK_START); + if (!B_OK(status)) { + stream->s_cursor = 0; + return status; + } + + stream->s_flags |= CSTREAM_CURSOR_MOVED; + + return B_SUCCESS; +} + +enum b_status b_cstream_restore_cursor_position(struct b_cstream *stream) +{ + if (!(stream->s_flags & CSTREAM_CURSOR_MOVED)) { + return B_ERR_BAD_STATE; + } + + enum b_status status = b_stream_seek( + stream->s_endpoint, stream->s_cursor, B_STREAM_SEEK_START); + stream->s_cursor = 0; + + if (!B_OK(status)) { + return status; + } + + stream->s_flags &= ~CSTREAM_CURSOR_MOVED; + + return B_SUCCESS; +} diff --git a/compress/cstream.h b/compress/cstream.h index a40127d..90cc46a 100644 --- a/compress/cstream.h +++ b/compress/cstream.h @@ -8,7 +8,12 @@ struct b_stream; struct b_compressor; +enum cstream_flags { + CSTREAM_CURSOR_MOVED = 0x01u, +}; + struct b_cstream { + enum cstream_flags s_flags; struct b_stream *s_endpoint; struct b_compressor *s_compressor; /* s_in is the input buffer, and s_out is the output buffer. @@ -55,6 +60,10 @@ struct b_cstream { * b_cstream_read */ size_t s_tx_uncompressed_bytes; + + /* when the endpoint cursor is moved, the previous cursor position is + * saved here so it can be restored later */ + size_t s_cursor; }; #endif diff --git a/compress/include/blue/compress/cstream.h b/compress/include/blue/compress/cstream.h index cdffc80..270468c 100644 --- a/compress/include/blue/compress/cstream.h +++ b/compress/include/blue/compress/cstream.h @@ -22,4 +22,7 @@ BLUE_API b_status b_cstream_end_compressed_section( b_cstream *stream, size_t *tx_compressed_bytes, size_t *tx_uncompressed_bytes); +BLUE_API b_status b_cstream_set_cursor_position(b_cstream *stream, size_t pos); +BLUE_API b_status b_cstream_restore_cursor_position(b_cstream *stream); + #endif