compress: cstream: record the total number of bytes that pass through the endpoint across all (un)compressed sections

This commit is contained in:
2025-07-31 11:23:30 +01:00
parent ede5e72fc2
commit 17b6a02a4f
3 changed files with 69 additions and 20 deletions

View File

@@ -100,7 +100,8 @@ static enum b_status read_uncompressed(
b_ringbuffer_close_read_buffer(&stream->s_in, &data, to_copy); b_ringbuffer_close_read_buffer(&stream->s_in, &data, to_copy);
stream->s_tx_uncompressed_bytes += to_copy; stream->s_tx_bytes_uncompressed += to_copy;
stream->s_tx_bytes += to_copy;
dest += to_copy; dest += to_copy;
remaining -= to_copy; remaining -= to_copy;
nr_read_from_buf += to_copy; nr_read_from_buf += to_copy;
@@ -113,7 +114,8 @@ static enum b_status read_uncompressed(
status = b_stream_read_bytes( status = b_stream_read_bytes(
stream->s_endpoint, dest, remaining, &nr_read_from_endpoint); stream->s_endpoint, dest, remaining, &nr_read_from_endpoint);
stream->s_tx_uncompressed_bytes += nr_read_from_endpoint; stream->s_tx_bytes_uncompressed += nr_read_from_endpoint;
stream->s_tx_bytes += nr_read_from_endpoint;
*out_nr_read = nr_read_from_endpoint + nr_read_from_buf; *out_nr_read = nr_read_from_endpoint + nr_read_from_buf;
@@ -179,7 +181,8 @@ static enum b_status refill_output_buffer(struct b_cstream *stream)
status = b_compressor_step(stream->s_compressor); status = b_compressor_step(stream->s_compressor);
size_t bytes_after = b_ringbuffer_available_data_remaining(&stream->s_in); size_t bytes_after = b_ringbuffer_available_data_remaining(&stream->s_in);
stream->s_tx_compressed_bytes += (bytes_before - bytes_after); stream->s_tx_bytes_compressed += (bytes_before - bytes_after);
stream->s_tx_bytes += (bytes_before - bytes_after);
return status; return status;
} }
@@ -235,7 +238,7 @@ enum b_status b_cstream_read(
b_ringbuffer_close_read_buffer(&stream->s_out, &data, to_copy); b_ringbuffer_close_read_buffer(&stream->s_out, &data, to_copy);
stream->s_tx_uncompressed_bytes += to_copy; stream->s_tx_bytes_uncompressed += to_copy;
dest += to_copy; dest += to_copy;
nr_read += to_copy; nr_read += to_copy;
remaining -= to_copy; remaining -= to_copy;
@@ -262,7 +265,8 @@ static enum b_status write_uncompressed(
enum b_status status enum b_status status
= b_stream_write_bytes(stream->s_endpoint, buf, count, &w); = b_stream_write_bytes(stream->s_endpoint, buf, count, &w);
stream->s_tx_uncompressed_bytes += w; stream->s_tx_bytes_uncompressed += w;
stream->s_tx_bytes += w;
*nr_written = w; *nr_written = w;
return status; return status;
@@ -300,7 +304,8 @@ static enum b_status flush_output_buffer(struct b_cstream *stream)
b_ringbuffer_close_read_buffer(&stream->s_out, &data, w); b_ringbuffer_close_read_buffer(&stream->s_out, &data, w);
nr_written += w; nr_written += w;
stream->s_tx_compressed_bytes += w; stream->s_tx_bytes_compressed += w;
stream->s_tx_bytes += w;
if (w < capacity) { if (w < capacity) {
break; break;
@@ -369,7 +374,7 @@ enum b_status b_cstream_write(
b_ringbuffer_close_write_buffer(&stream->s_in, &data, to_copy); b_ringbuffer_close_write_buffer(&stream->s_in, &data, to_copy);
stream->s_tx_uncompressed_bytes += to_copy; stream->s_tx_bytes_uncompressed += to_copy;
src += to_copy; src += to_copy;
nr_written += to_copy; nr_written += to_copy;
remaining -= to_copy; remaining -= to_copy;
@@ -391,7 +396,7 @@ enum b_status b_cstream_begin_compressed_section(
} }
if (tx_uncompressed_bytes) { if (tx_uncompressed_bytes) {
*tx_uncompressed_bytes = stream->s_tx_uncompressed_bytes; *tx_uncompressed_bytes = stream->s_tx_bytes_uncompressed;
} }
if (stream->s_compression_depth > 0) { if (stream->s_compression_depth > 0) {
@@ -400,8 +405,8 @@ enum b_status b_cstream_begin_compressed_section(
} }
stream->s_compression_depth = 1; stream->s_compression_depth = 1;
stream->s_tx_uncompressed_bytes = 0; stream->s_tx_bytes_uncompressed = 0;
stream->s_tx_compressed_bytes = 0; stream->s_tx_bytes_compressed = 0;
b_compressor_reset(stream->s_compressor); b_compressor_reset(stream->s_compressor);
return B_SUCCESS; return B_SUCCESS;
@@ -416,10 +421,10 @@ enum b_status b_cstream_end_compressed_section(
} }
tx_compressed_bytes tx_compressed_bytes
&& (*tx_compressed_bytes = stream->s_tx_compressed_bytes); && (*tx_compressed_bytes = stream->s_tx_bytes_compressed);
tx_uncompressed_bytes tx_uncompressed_bytes
&& (*tx_uncompressed_bytes = stream->s_tx_uncompressed_bytes); && (*tx_uncompressed_bytes = stream->s_tx_bytes_uncompressed);
if (stream->s_compression_depth > 1) { if (stream->s_compression_depth > 1) {
stream->s_compression_depth--; stream->s_compression_depth--;
@@ -429,8 +434,8 @@ enum b_status b_cstream_end_compressed_section(
stream->s_compression_depth = 0; stream->s_compression_depth = 0;
if (stream->s_mode == B_COMPRESSION_MODE_DECOMPRESS) { if (stream->s_mode == B_COMPRESSION_MODE_DECOMPRESS) {
stream->s_tx_compressed_bytes = 0; stream->s_tx_bytes_compressed = 0;
stream->s_tx_uncompressed_bytes = 0; stream->s_tx_bytes_uncompressed = 0;
return B_SUCCESS; return B_SUCCESS;
} }
@@ -456,21 +461,46 @@ enum b_status b_cstream_end_compressed_section(
/* refresh these output variables to account for any data written by /* refresh these output variables to account for any data written by
* b_compressor_end */ * b_compressor_end */
tx_compressed_bytes tx_compressed_bytes
&& (*tx_compressed_bytes = stream->s_tx_compressed_bytes); && (*tx_compressed_bytes = stream->s_tx_bytes_compressed);
tx_uncompressed_bytes tx_uncompressed_bytes
&& (*tx_uncompressed_bytes = stream->s_tx_uncompressed_bytes); && (*tx_uncompressed_bytes = stream->s_tx_bytes_uncompressed);
if (!B_OK(status)) { if (!B_OK(status)) {
return status; return status;
} }
stream->s_tx_compressed_bytes = 0; stream->s_tx_bytes_compressed = 0;
stream->s_tx_uncompressed_bytes = 0; stream->s_tx_bytes_uncompressed = 0;
return flush_output_buffer(stream); return flush_output_buffer(stream);
} }
bool b_cstream_in_compressed_section(const struct b_cstream *stream)
{
return stream->s_compression_depth > 0;
}
enum b_status b_cstream_tx_bytes(const struct b_cstream *stream, size_t *out)
{
*out = stream->s_tx_bytes;
return B_SUCCESS;
}
enum b_status b_cstream_tx_bytes_compressed(
const struct b_cstream *stream, size_t *out)
{
*out = stream->s_tx_bytes_compressed;
return B_SUCCESS;
}
enum b_status b_cstream_tx_bytes_uncompressed(
const struct b_cstream *stream, size_t *out)
{
*out = stream->s_tx_bytes_uncompressed;
return B_SUCCESS;
}
enum b_status b_cstream_set_cursor_position(struct b_cstream *stream, size_t pos) enum b_status b_cstream_set_cursor_position(struct b_cstream *stream, size_t pos)
{ {
if (stream->s_compression_depth > 0) { if (stream->s_compression_depth > 0) {

View File

@@ -36,6 +36,18 @@ struct b_cstream {
enum b_compression_mode s_mode; enum b_compression_mode s_mode;
unsigned int s_compression_depth; unsigned int s_compression_depth;
/* tracks the number of bytes read from or written to the endpoint.
* this counter is not reset at the beginning/end of each section.
*
* during compressed sections, this counter is incremented by the number
* of compressed bytes written/consumed.
*
* during uncompressed sections, this counter is incremented by the
* number of uncompressed bytes written/returned.
*
* this does not include bytes read/written while the cursor is moved.
*/
size_t s_tx_bytes;
/* tracks the number of compressed bytes that have passed through this /* tracks the number of compressed bytes that have passed through this
* stream in the current section. * stream in the current section.
* *
@@ -49,7 +61,7 @@ struct b_cstream {
* endpoint while reading a compressed section due to cstream's * endpoint while reading a compressed section due to cstream's
* read-ahead caching behaviour. * read-ahead caching behaviour.
*/ */
size_t s_tx_compressed_bytes; size_t s_tx_bytes_compressed;
/* tracks the number of uncompressed bytes that have passed through this /* tracks the number of uncompressed bytes that have passed through this
* stream in the current section. * stream in the current section.
* *
@@ -59,7 +71,7 @@ struct b_cstream {
* in decompression mode, this tracks the number of bytes returned by * in decompression mode, this tracks the number of bytes returned by
* b_cstream_read * b_cstream_read
*/ */
size_t s_tx_uncompressed_bytes; size_t s_tx_bytes_uncompressed;
/* when the endpoint cursor is moved, the previous cursor position is /* when the endpoint cursor is moved, the previous cursor position is
* saved here so it can be restored later */ * saved here so it can be restored later */

View File

@@ -3,6 +3,7 @@
#include <blue/compress/function.h> #include <blue/compress/function.h>
#include <blue/core/stream.h> #include <blue/core/stream.h>
#include <stdbool.h>
typedef struct b_cstream b_cstream; typedef struct b_cstream b_cstream;
@@ -21,6 +22,12 @@ BLUE_API b_status b_cstream_begin_compressed_section(
BLUE_API b_status b_cstream_end_compressed_section( BLUE_API b_status b_cstream_end_compressed_section(
b_cstream *stream, size_t *tx_compressed_bytes, b_cstream *stream, size_t *tx_compressed_bytes,
size_t *tx_uncompressed_bytes); size_t *tx_uncompressed_bytes);
BLUE_API bool b_cstream_in_compressed_section(const b_cstream *stream);
BLUE_API b_status b_cstream_tx_bytes(const b_cstream *stream, size_t *out);
BLUE_API b_status b_cstream_tx_bytes_compressed(
const b_cstream *stream, size_t *out);
BLUE_API b_status b_cstream_tx_bytes_uncompressed(
const b_cstream *stream, size_t *out);
BLUE_API b_status b_cstream_set_cursor_position(b_cstream *stream, size_t pos); BLUE_API b_status b_cstream_set_cursor_position(b_cstream *stream, size_t pos);
BLUE_API b_status b_cstream_restore_cursor_position(b_cstream *stream); BLUE_API b_status b_cstream_restore_cursor_position(b_cstream *stream);