#include "cstream.h" #include #include #include #include enum b_status b_cstream_open( struct b_stream *endpoint, const struct b_compression_function *func, enum b_compression_mode mode, struct b_cstream **out) { size_t inbuf_size = 0, outbuf_size = 0; enum b_status status = b_compression_function_get_buffer_size( func, mode, &inbuf_size, &outbuf_size); if (!B_OK(status)) { return status; } struct b_cstream *stream = malloc(sizeof *stream); if (!stream) { return B_ERR_NO_MEMORY; } memset(stream, 0x0, sizeof *stream); stream->s_mode = mode; stream->s_endpoint = endpoint; status = b_ringbuffer_init(&stream->s_in, inbuf_size + 1); if (!B_OK(status)) { free(stream); return status; } status = b_ringbuffer_init(&stream->s_out, outbuf_size + 1); if (!B_OK(status)) { b_ringbuffer_destroy(&stream->s_in); free(stream); return status; } status = b_compressor_create( func, mode, &stream->s_in, &stream->s_out, &stream->s_compressor); if (!B_OK(status)) { b_ringbuffer_destroy(&stream->s_in); b_ringbuffer_destroy(&stream->s_out); return status; } *out = stream; return B_SUCCESS; } enum b_status b_cstream_close(struct b_cstream *stream) { b_compressor_destroy(stream->s_compressor); b_ringbuffer_destroy(&stream->s_in); b_ringbuffer_destroy(&stream->s_out); free(stream); return B_SUCCESS; } static enum b_status read_uncompressed( struct b_cstream *stream, void *buf, size_t count, size_t *out_nr_read) { size_t remaining = count; unsigned char *dest = buf; size_t nr_read_from_buf = 0; size_t nr_read_from_endpoint = 0; enum b_status status = B_SUCCESS; /* liberal usage of begin_compressed_section and end_compressed_section * can result in uncompressed data getting stuck in the input buffer. * return any data remaining in the input buffer before reading more * from the endpoint */ while (remaining > 0) { const void *data; size_t available; status = b_ringbuffer_open_read_buffer( &stream->s_in, &data, &available); if (!B_OK(status)) { break; } size_t to_copy = count; if (to_copy > available) { to_copy = available; } memcpy(dest, data, to_copy); b_ringbuffer_close_read_buffer(&stream->s_in, &data, to_copy); stream->s_tx_uncompressed_bytes += to_copy; dest += to_copy; remaining -= to_copy; nr_read_from_buf += to_copy; } if (remaining == 0) { *out_nr_read = nr_read_from_buf; return B_SUCCESS; } status = b_stream_read_bytes( stream->s_endpoint, dest, remaining, &nr_read_from_endpoint); stream->s_tx_uncompressed_bytes += nr_read_from_endpoint; *out_nr_read = nr_read_from_endpoint + nr_read_from_buf; return status; } /* read compressed data from the endpoint and store it in the input buffer. * note that uncompressed data that is trailing the compressed blob may * also be read by this function, but this will be handled by read_uncompressed. */ static enum b_status refill_input_buffer(struct b_cstream *stream) { enum b_status status = B_SUCCESS; size_t nr_read = 0; while (1) { void *data; size_t capacity; status = b_ringbuffer_open_write_buffer( &stream->s_in, &data, &capacity); if (!B_OK(status)) { break; } size_t r = 0; status = b_stream_read_bytes( stream->s_endpoint, data, capacity, &r); b_ringbuffer_close_write_buffer(&stream->s_in, &data, r); nr_read += r; if (r < capacity) { break; } if (!B_OK(status)) { break; } } if (status == B_ERR_NO_SPACE && nr_read > 0) { status = B_SUCCESS; } return status; } /* push compressed data out of the input buffer, through the (de)compressor, * and store the resulting uncompressed data in the output buffer */ static enum b_status refill_output_buffer(struct b_cstream *stream) { enum b_status status = B_SUCCESS; if (!b_ringbuffer_available_data_remaining(&stream->s_in)) { status = refill_input_buffer(stream); } if (!B_OK(status)) { return status; } size_t bytes_before = b_ringbuffer_available_data_remaining(&stream->s_in); status = b_compressor_step(stream->s_compressor); size_t bytes_after = b_ringbuffer_available_data_remaining(&stream->s_in); stream->s_tx_compressed_bytes += (bytes_before - bytes_after); return status; } enum b_status b_cstream_read( struct b_cstream *stream, void *buf, size_t count, size_t *out_nr_read) { if (stream->s_mode != B_COMPRESSION_MODE_DECOMPRESS) { return B_ERR_BAD_STATE; } if (stream->s_compression_depth == 0) { return read_uncompressed(stream, buf, count, out_nr_read); } if (b_compressor_eof(stream->s_compressor)) { *out_nr_read = 0; return B_SUCCESS; } unsigned char *dest = buf; size_t nr_read = 0; size_t remaining = count; enum b_status status = B_SUCCESS; while (remaining > 0) { if (!b_ringbuffer_available_data_remaining(&stream->s_out)) { status = refill_output_buffer(stream); } if (!B_OK(status)) { break; } const void *data; size_t available; status = b_ringbuffer_open_read_buffer( &stream->s_out, &data, &available); if (!B_OK(status)) { break; } size_t to_copy = remaining; if (to_copy > available) { to_copy = available; } memcpy(dest, data, to_copy); b_ringbuffer_close_read_buffer(&stream->s_out, &data, to_copy); stream->s_tx_uncompressed_bytes += to_copy; dest += to_copy; nr_read += to_copy; remaining -= to_copy; } if (status == B_ERR_NO_DATA) { status = B_SUCCESS; } *out_nr_read = nr_read; return status; } static enum b_status write_uncompressed( struct b_cstream *stream, const void *buf, size_t count, size_t *nr_written) { size_t w = 0; enum b_status status = b_stream_write_bytes(stream->s_endpoint, buf, count, &w); stream->s_tx_uncompressed_bytes += w; *nr_written = w; return status; } /* push uncompressed data out of the input buffer, through the compressor, * and store the resulting compressed data in the output buffer */ static enum b_status flush_input_buffer(struct b_cstream *stream) { if (!b_ringbuffer_available_data_remaining(&stream->s_in)) { return B_ERR_NO_DATA; } return b_compressor_step(stream->s_compressor); } /* push compressed data from the output buffer into the endpoint */ static enum b_status flush_output_buffer(struct b_cstream *stream) { enum b_status status = B_SUCCESS; size_t nr_written = 0; while (1) { const void *data; size_t capacity; status = b_ringbuffer_open_read_buffer( &stream->s_out, &data, &capacity); if (!B_OK(status)) { break; } size_t w = 0; status = b_stream_write_bytes( stream->s_endpoint, data, capacity, &w); b_ringbuffer_close_read_buffer(&stream->s_out, &data, w); nr_written += w; stream->s_tx_compressed_bytes += w; if (w < capacity) { break; } if (!B_OK(status)) { break; } } if (status == B_ERR_NO_DATA && nr_written > 0) { status = B_SUCCESS; } return status; } enum b_status b_cstream_write( struct b_cstream *stream, const void *buf, size_t count, size_t *out_nr_written) { if (stream->s_mode != B_COMPRESSION_MODE_COMPRESS) { return B_ERR_BAD_STATE; } if (stream->s_compression_depth == 0) { return write_uncompressed(stream, buf, count, out_nr_written); } const unsigned char *src = buf; size_t nr_written = 0; size_t remaining = count; enum b_status status = B_SUCCESS; while (remaining > 0) { if (!b_ringbuffer_write_capacity_remaining(&stream->s_out)) { status = flush_output_buffer(stream); } if (!b_ringbuffer_write_capacity_remaining(&stream->s_in)) { status = flush_input_buffer(stream); } if (!B_OK(status)) { break; } void *data; size_t available; status = b_ringbuffer_open_write_buffer( &stream->s_in, &data, &available); if (!B_OK(status)) { break; } size_t to_copy = remaining; if (to_copy > available) { to_copy = available; } memcpy(data, src, to_copy); b_ringbuffer_close_write_buffer(&stream->s_in, &data, to_copy); stream->s_tx_uncompressed_bytes += to_copy; src += to_copy; nr_written += to_copy; remaining -= to_copy; } if (status == B_ERR_NO_DATA) { status = B_SUCCESS; } *out_nr_written = nr_written; return status; } enum b_status b_cstream_begin_compressed_section( struct b_cstream *stream, size_t *tx_uncompressed_bytes) { if (tx_uncompressed_bytes) { *tx_uncompressed_bytes = stream->s_tx_uncompressed_bytes; } if (stream->s_compression_depth > 0) { stream->s_compression_depth++; return B_SUCCESS; } stream->s_compression_depth = 1; stream->s_tx_uncompressed_bytes = 0; stream->s_tx_compressed_bytes = 0; b_compressor_reset(stream->s_compressor); return B_SUCCESS; } enum b_status b_cstream_end_compressed_section( struct b_cstream *stream, size_t *tx_compressed_bytes, size_t *tx_uncompressed_bytes) { tx_compressed_bytes && (*tx_compressed_bytes = stream->s_tx_compressed_bytes); tx_uncompressed_bytes && (*tx_uncompressed_bytes = stream->s_tx_uncompressed_bytes); if (stream->s_compression_depth > 1) { stream->s_compression_depth--; return B_SUCCESS; } stream->s_compression_depth = 0; if (stream->s_mode == B_COMPRESSION_MODE_DECOMPRESS) { stream->s_tx_compressed_bytes = 0; stream->s_tx_uncompressed_bytes = 0; return B_SUCCESS; } enum b_status status = B_SUCCESS; while (1) { status = b_compressor_end(stream->s_compressor); if (!B_OK(status) && status != B_ERR_NO_SPACE) { break; } status = flush_output_buffer(stream); if (!B_OK(status)) { break; } if (b_compressor_eof(stream->s_compressor)) { status = B_SUCCESS; break; } } /* refresh these output variables to account for any data written by * b_compressor_end */ tx_compressed_bytes && (*tx_compressed_bytes = stream->s_tx_compressed_bytes); tx_uncompressed_bytes && (*tx_uncompressed_bytes = stream->s_tx_uncompressed_bytes); if (!B_OK(status)) { return status; } stream->s_tx_compressed_bytes = 0; stream->s_tx_uncompressed_bytes = 0; return flush_output_buffer(stream); }