/* NOTE(review): the header names of the four directives below are missing
 * from this copy of the file; they must be restored before it can be built. */
#include
#include
#include
#include

/*** PRIVATE DATA *************************************************************/

/* bit flags stored in fx_cstream_p.s_flags */
enum cstream_flags {
	/* set by cstream_set_cursor_position and cleared again by
	 * cstream_restore_cursor_position. while set, reads and writes go
	 * straight to the endpoint. */
	CSTREAM_CURSOR_MOVED = 0x01u,
};

/* private per-instance state of an fx_cstream */
struct fx_cstream_p {
	/* combination of cstream_flags values */
	enum cstream_flags s_flags;
	/* the stream that data is ultimately read from or written to.
	 * assigned from the endpoint argument of fx_cstream_open. */
	fx_stream *s_endpoint;
	/* the (de)compressor that moves data from s_in to s_out */
	fx_compressor *s_compressor;
	/* s_in is the input buffer, and s_out is the output buffer.
	 *
	 * the input buffer holds data that will be provided to the
	 * (de)compression function. in compression mode, this data is provided
	 * by the code using the cstream (via fx_cstream_write). in decompression
	 * mode, this data is read from s_endpoint.
	 *
	 * the output buffer holds data produced by the (de)compression
	 * function. in compression mode, this data will be written to
	 * s_endpoint. in decompression mode, this data will be returned to the
	 * code using the cstream (via fx_cstream_read)
	 *
	 * heavy usage of cstream's compressed sections facility can result
	 * in the input buffer holding uncompressed data while the stream is in
	 * decompression mode. this is handled by the uncompressed read code path.
	 */
	fx_ringbuffer *s_in, *s_out;
	/* whether this stream compresses or decompresses. assigned once in
	 * fx_cstream_open. */
	fx_compressor_mode s_mode;
	/* nesting depth of compressed sections. zero means the stream is
	 * currently outside of any compressed section. */
	unsigned int s_compression_depth;
	/* tracks the number of bytes read from or written to the endpoint.
	 * this counter is not reset at the beginning/end of each section.
	 *
	 * during compressed sections, this counter is incremented by the number
	 * of compressed bytes written/consumed.
	 *
	 * during uncompressed sections, this counter is incremented by the
	 * number of uncompressed bytes written/returned.
	 *
	 * this does not include bytes read/written while the cursor is moved.
	 */
	size_t s_tx_bytes;
	/* tracks the number of compressed bytes that have passed through this
	 * stream in the current section.
	 *
	 * in compression mode, this tracks the number of post-compression bytes
	 * that have been written to the endpoint within the current section,
	 * including any bytes written during end_compression_section()
	 *
	 * in decompression mode, this tracks the number of compressed bytes
	 * that were decompressed while reading the current section. it does not
	 * include any uncompressed bytes that may have been read from the
	 * endpoint while reading a compressed section due to cstream's
	 * read-ahead caching behaviour.
	 */
	size_t s_tx_bytes_compressed;
	/* tracks the number of uncompressed bytes that have passed through this
	 * stream in the current section.
	 *
	 * in compression mode, this tracks the number of bytes given to
	 * fx_cstream_write
	 *
	 * in decompression mode, this tracks the number of bytes returned by
	 * fx_cstream_read
	 */
	size_t s_tx_bytes_uncompressed;
	/* when the endpoint cursor is moved, the previous cursor position is
	 * saved here so it can be restored later
	 */
	size_t s_cursor;
};

/*** PRIVATE FUNCTIONS ********************************************************/

/* read path used while CSTREAM_CURSOR_MOVED is set: forwards the request
 * directly to the endpoint and returns its status. none of the s_tx_*
 * counters are updated here. */
static enum fx_status read_cursor(
	struct fx_cstream_p *stream, void *buf, size_t count,
	size_t *out_nr_read)
{
	return fx_stream_read_bytes(stream->s_endpoint, buf, count, out_nr_read);
}

/* read path used outside of compressed sections.
 *
 * copies up to `count` bytes into `buf`, taking them from the input buffer
 * first and from the endpoint afterwards. every byte delivered is added to
 * both s_tx_bytes_uncompressed and s_tx_bytes.
 *
 * *out_nr_read receives the total number of bytes delivered (it is written
 * unconditionally, so it must not be NULL).
 *
 * returns FX_SUCCESS if the input buffer alone satisfied the request,
 * otherwise the status of the endpoint read. */
static enum fx_status read_uncompressed(
	struct fx_cstream_p *stream, void *buf, size_t count,
	size_t *out_nr_read)
{
	size_t remaining = count;
	unsigned char *dest = buf;
	size_t nr_read_from_buf = 0;
	size_t nr_read_from_endpoint = 0;
	enum fx_status status = FX_SUCCESS;

	/* liberal usage of begin_compressed_section and end_compressed_section
	 * can result in uncompressed data getting stuck in the input buffer.
	 * return any data remaining in the input buffer before reading more
	 * from the endpoint
	 */
	while (remaining > 0) {
		const void *data;
		size_t available;
		status = fx_ringbuffer_open_read_buffer(
			stream->s_in, &data, &available);
		/* any failure here (presumably an empty buffer) just ends the
		 * draining phase; the status is not returned to the caller */
		if (!FX_OK(status)) {
			break;
		}

		size_t to_copy = remaining;
		if (to_copy > available) {
			to_copy = available;
		}

		memcpy(dest, data, to_copy);
		fx_ringbuffer_close_read_buffer(stream->s_in, &data, to_copy);

		stream->s_tx_bytes_uncompressed += to_copy;
		stream->s_tx_bytes += to_copy;
		dest += to_copy;
		remaining -= to_copy;
		nr_read_from_buf += to_copy;
	}

	if (remaining == 0) {
		*out_nr_read = nr_read_from_buf;
		return FX_SUCCESS;
	}

	/* the buffer could not cover the request: fetch the rest directly
	 * from the endpoint */
	status = fx_stream_read_bytes(
		stream->s_endpoint, dest, remaining, &nr_read_from_endpoint);
	stream->s_tx_bytes_uncompressed += nr_read_from_endpoint;
	stream->s_tx_bytes += nr_read_from_endpoint;

	*out_nr_read = nr_read_from_endpoint + nr_read_from_buf;

	return status;
}

/* read compressed data from the endpoint and store it in the input buffer.
 * note that uncompressed data that is trailing the compressed blob may
 * also be read by this function, but this will be handled by read_uncompressed.
*/ static enum fx_status refill_input_buffer(struct fx_cstream_p *stream) { enum fx_status status = FX_SUCCESS; size_t nr_read = 0; while (1) { void *data; size_t capacity; status = fx_ringbuffer_open_write_buffer( stream->s_in, &data, &capacity); if (!FX_OK(status)) { break; } size_t r = 0; status = fx_stream_read_bytes( stream->s_endpoint, data, capacity, &r); fx_ringbuffer_close_write_buffer(stream->s_in, &data, r); nr_read += r; if (r < capacity) { break; } if (!FX_OK(status)) { break; } } if (status == FX_ERR_NO_SPACE && nr_read > 0) { status = FX_SUCCESS; } return status; } /* push compressed data out of the input buffer, through the (de)compressor, * and store the resulting uncompressed data in the output buffer */ static enum fx_status refill_output_buffer(struct fx_cstream_p *stream) { enum fx_status status = FX_SUCCESS; if (fx_compressor_eof(stream->s_compressor)) { return FX_ERR_NO_DATA; } if (!fx_ringbuffer_available_data_remaining(stream->s_in)) { status = refill_input_buffer(stream); } if (!FX_OK(status)) { return status; } size_t bytes_before = fx_ringbuffer_available_data_remaining(stream->s_in); status = fx_compressor_step(stream->s_compressor); size_t bytes_after = fx_ringbuffer_available_data_remaining(stream->s_in); stream->s_tx_bytes_compressed += (bytes_before - bytes_after); stream->s_tx_bytes += (bytes_before - bytes_after); return status; } static enum fx_status cstream_read( struct fx_cstream_p *stream, void *buf, size_t count, size_t *out_nr_read) { if (stream->s_mode != FX_COMPRESSOR_MODE_DECOMPRESS) { return FX_ERR_BAD_STATE; } if (stream->s_flags & CSTREAM_CURSOR_MOVED) { return read_cursor(stream, buf, count, out_nr_read); } if (stream->s_compression_depth == 0) { return read_uncompressed(stream, buf, count, out_nr_read); } unsigned char *dest = buf; size_t nr_read = 0; size_t remaining = count; enum fx_status status = FX_SUCCESS; while (remaining > 0) { if (!fx_ringbuffer_available_data_remaining(stream->s_out)) { status = 
refill_output_buffer(stream); } if (!FX_OK(status)) { break; } const void *data; size_t available; status = fx_ringbuffer_open_read_buffer( stream->s_out, &data, &available); if (!FX_OK(status)) { break; } size_t to_copy = remaining; if (to_copy > available) { to_copy = available; } memcpy(dest, data, to_copy); fx_ringbuffer_close_read_buffer(stream->s_out, &data, to_copy); stream->s_tx_bytes_uncompressed += to_copy; dest += to_copy; nr_read += to_copy; remaining -= to_copy; } if (status == FX_ERR_NO_DATA) { status = FX_SUCCESS; } *out_nr_read = nr_read; return status; } static enum fx_status write_cursor( struct fx_cstream_p *stream, const void *buf, size_t count, size_t *nr_written) { return fx_stream_write_bytes(stream->s_endpoint, buf, count, nr_written); } static enum fx_status write_uncompressed( struct fx_cstream_p *stream, const void *buf, size_t count, size_t *nr_written) { size_t w = 0; enum fx_status status = fx_stream_write_bytes(stream->s_endpoint, buf, count, &w); stream->s_tx_bytes_uncompressed += w; stream->s_tx_bytes += w; *nr_written = w; return status; } /* push uncompressed data out of the input buffer, through the compressor, * and store the resulting compressed data in the output buffer */ static enum fx_status flush_input_buffer(struct fx_cstream_p *stream) { if (!fx_ringbuffer_available_data_remaining(stream->s_in)) { return FX_ERR_NO_DATA; } return fx_compressor_step(stream->s_compressor); } /* push compressed data from the output buffer into the endpoint */ static enum fx_status flush_output_buffer(struct fx_cstream_p *stream) { enum fx_status status = FX_SUCCESS; size_t nr_written = 0; while (1) { const void *data; size_t capacity; status = fx_ringbuffer_open_read_buffer( stream->s_out, &data, &capacity); if (!FX_OK(status)) { break; } size_t w = 0; status = fx_stream_write_bytes( stream->s_endpoint, data, capacity, &w); fx_ringbuffer_close_read_buffer(stream->s_out, &data, w); nr_written += w; stream->s_tx_bytes_compressed += w; 
stream->s_tx_bytes += w; if (w < capacity) { break; } if (!FX_OK(status)) { break; } } if (status == FX_ERR_NO_DATA && nr_written > 0) { status = FX_SUCCESS; } return status; } static enum fx_status cstream_write( struct fx_cstream_p *stream, const void *buf, size_t count, size_t *out_nr_written) { if (stream->s_mode != FX_COMPRESSOR_MODE_COMPRESS) { return FX_ERR_BAD_STATE; } if (stream->s_flags & CSTREAM_CURSOR_MOVED) { return write_cursor(stream, buf, count, out_nr_written); } if (stream->s_compression_depth == 0) { return write_uncompressed(stream, buf, count, out_nr_written); } const unsigned char *src = buf; size_t nr_written = 0; size_t remaining = count; enum fx_status status = FX_SUCCESS; while (remaining > 0) { if (!fx_ringbuffer_write_capacity_remaining(stream->s_out)) { status = flush_output_buffer(stream); } if (!fx_ringbuffer_write_capacity_remaining(stream->s_in)) { status = flush_input_buffer(stream); } if (!FX_OK(status)) { break; } void *data; size_t available; status = fx_ringbuffer_open_write_buffer( stream->s_in, &data, &available); if (!FX_OK(status)) { break; } size_t to_copy = remaining; if (to_copy > available) { to_copy = available; } memcpy(data, src, to_copy); fx_ringbuffer_close_write_buffer(stream->s_in, &data, to_copy); stream->s_tx_bytes_uncompressed += to_copy; src += to_copy; nr_written += to_copy; remaining -= to_copy; } if (status == FX_ERR_NO_DATA) { status = FX_SUCCESS; } *out_nr_written = nr_written; return status; } static enum fx_status skip_uncompressed( struct fx_cstream_p *stream, size_t count, size_t *out_nr_skipped) { size_t remaining = count; size_t nr_read_from_buf = 0; size_t nr_read_from_endpoint = 0; enum fx_status status = FX_SUCCESS; /* liberal usage of begin_compressed_section and end_compressed_section * can result in uncompressed data getting stuck in the input buffer. 
* return any data remaining in the input buffer before reading more * from the endpoint */ while (remaining > 0) { const void *data; size_t available; status = fx_ringbuffer_open_read_buffer( stream->s_in, &data, &available); if (!FX_OK(status)) { break; } size_t to_copy = remaining; if (to_copy > available) { to_copy = available; } fx_ringbuffer_close_read_buffer(stream->s_in, &data, to_copy); stream->s_tx_bytes_uncompressed += to_copy; stream->s_tx_bytes += to_copy; remaining -= to_copy; nr_read_from_buf += to_copy; } if (remaining == 0) { if (out_nr_skipped) { *out_nr_skipped = nr_read_from_buf; } return FX_SUCCESS; } size_t cursor = fx_stream_cursor(stream->s_endpoint); status = fx_stream_seek(stream->s_endpoint, remaining, FX_STREAM_SEEK_CURRENT); nr_read_from_endpoint = fx_stream_cursor(stream->s_endpoint) - cursor; stream->s_tx_bytes_uncompressed += nr_read_from_endpoint; stream->s_tx_bytes += nr_read_from_endpoint; if (out_nr_skipped) { *out_nr_skipped = nr_read_from_endpoint + nr_read_from_buf; } return status; } static enum fx_status cstream_skip( struct fx_cstream_p *stream, size_t count, size_t *out_nr_skipped) { if (stream->s_mode != FX_COMPRESSOR_MODE_DECOMPRESS) { return FX_ERR_BAD_STATE; } if (stream->s_flags & CSTREAM_CURSOR_MOVED) { return FX_ERR_BAD_STATE; } if (stream->s_compression_depth == 0) { return skip_uncompressed(stream, count, out_nr_skipped); } if (fx_compressor_eof(stream->s_compressor) && !fx_ringbuffer_available_data_remaining(stream->s_out)) { if (out_nr_skipped) { *out_nr_skipped = 0; } return FX_SUCCESS; } size_t nr_read = 0; size_t remaining = count; enum fx_status status = FX_SUCCESS; while (remaining > 0) { if (!fx_ringbuffer_available_data_remaining(stream->s_out)) { status = refill_output_buffer(stream); } if (!FX_OK(status)) { break; } const void *data; size_t available; status = fx_ringbuffer_open_read_buffer( stream->s_out, &data, &available); if (!FX_OK(status)) { break; } size_t to_copy = remaining; if (to_copy > 
available) { to_copy = available; } fx_ringbuffer_close_read_buffer(stream->s_out, &data, to_copy); stream->s_tx_bytes_uncompressed += to_copy; nr_read += to_copy; remaining -= to_copy; } if (status == FX_ERR_NO_DATA) { status = FX_SUCCESS; } if (out_nr_skipped) { *out_nr_skipped = nr_read; } return status; } static enum fx_status cstream_reset(struct fx_cstream_p *stream) { if (stream->s_mode != FX_COMPRESSOR_MODE_DECOMPRESS) { return FX_ERR_BAD_STATE; } if (stream->s_flags & CSTREAM_CURSOR_MOVED) { return FX_ERR_BAD_STATE; } stream->s_flags = 0; fx_stream_seek(stream->s_endpoint, 0, FX_STREAM_SEEK_START); fx_ringbuffer_clear(stream->s_in); fx_ringbuffer_clear(stream->s_out); fx_compressor_reset(stream->s_compressor); stream->s_compression_depth = 0; stream->s_tx_bytes = 0; stream->s_tx_bytes_uncompressed = 0; stream->s_tx_bytes_compressed = 0; stream->s_cursor = 0; return FX_SUCCESS; } static enum fx_status cstream_begin_compressed_section( struct fx_cstream_p *stream, size_t *tx_uncompressed_bytes) { if (stream->s_flags & CSTREAM_CURSOR_MOVED) { return FX_ERR_BAD_STATE; } if (tx_uncompressed_bytes) { *tx_uncompressed_bytes = stream->s_tx_bytes_uncompressed; } if (stream->s_compression_depth > 0) { stream->s_compression_depth++; return FX_SUCCESS; } stream->s_compression_depth = 1; stream->s_tx_bytes_uncompressed = 0; stream->s_tx_bytes_compressed = 0; fx_compressor_reset(stream->s_compressor); return FX_SUCCESS; } static enum fx_status cstream_end_compressed_section( struct fx_cstream_p *stream, size_t *tx_compressed_bytes, size_t *tx_uncompressed_bytes) { if (stream->s_flags & CSTREAM_CURSOR_MOVED) { return FX_ERR_BAD_STATE; } tx_compressed_bytes && (*tx_compressed_bytes = stream->s_tx_bytes_compressed); tx_uncompressed_bytes && (*tx_uncompressed_bytes = stream->s_tx_bytes_uncompressed); if (stream->s_compression_depth > 1) { stream->s_compression_depth--; return FX_SUCCESS; } stream->s_compression_depth = 0; if (stream->s_mode == FX_COMPRESSOR_MODE_DECOMPRESS) 
{ stream->s_tx_bytes_compressed = 0; stream->s_tx_bytes_uncompressed = 0; return FX_SUCCESS; } enum fx_status status = FX_SUCCESS; while (1) { status = fx_compressor_end(stream->s_compressor); if (!FX_OK(status) && status != FX_ERR_NO_SPACE) { break; } status = flush_output_buffer(stream); if (!FX_OK(status)) { break; } if (fx_compressor_eof(stream->s_compressor)) { status = FX_SUCCESS; break; } } /* refresh these output variables to account for any data * written by fx_compressor_end */ tx_compressed_bytes && (*tx_compressed_bytes = stream->s_tx_bytes_compressed); tx_uncompressed_bytes && (*tx_uncompressed_bytes = stream->s_tx_bytes_uncompressed); if (!FX_OK(status)) { return status; } stream->s_tx_bytes_compressed = 0; stream->s_tx_bytes_uncompressed = 0; return flush_output_buffer(stream); } static bool cstream_in_compressed_section(const struct fx_cstream_p *stream) { return stream->s_compression_depth > 0; } static enum fx_status cstream_tx_bytes(const struct fx_cstream_p *stream, size_t *out) { *out = stream->s_tx_bytes; return FX_SUCCESS; } static enum fx_status cstream_tx_bytes_compressed( const struct fx_cstream_p *stream, size_t *out) { *out = stream->s_tx_bytes_compressed; return FX_SUCCESS; } static enum fx_status cstream_tx_bytes_uncompressed( const struct fx_cstream_p *stream, size_t *out) { *out = stream->s_tx_bytes_uncompressed; return FX_SUCCESS; } static enum fx_status cstream_set_cursor_position( struct fx_cstream_p *stream, size_t pos) { if (stream->s_compression_depth > 0) { return FX_ERR_BAD_STATE; } if (stream->s_flags & CSTREAM_CURSOR_MOVED) { return FX_ERR_BAD_STATE; } stream->s_cursor = fx_stream_cursor(stream->s_endpoint); enum fx_status status = fx_stream_seek(stream->s_endpoint, pos, FX_STREAM_SEEK_START); if (!FX_OK(status)) { stream->s_cursor = 0; return status; } stream->s_flags |= CSTREAM_CURSOR_MOVED; return FX_SUCCESS; } static enum fx_status cstream_restore_cursor_position(struct fx_cstream_p *stream) { if (!(stream->s_flags & 
CSTREAM_CURSOR_MOVED)) { return FX_ERR_BAD_STATE; } enum fx_status status = fx_stream_seek( stream->s_endpoint, stream->s_cursor, FX_STREAM_SEEK_START); stream->s_cursor = 0; if (!FX_OK(status)) { return status; } stream->s_flags &= ~CSTREAM_CURSOR_MOVED; return FX_SUCCESS; } /*** PUBLIC FUNCTIONS *********************************************************/ enum fx_status fx_cstream_open( fx_stream *endpoint, fx_type compressor_type, fx_compressor_mode mode, fx_cstream **out) { size_t inbuf_size = 0, outbuf_size = 0; enum fx_status status = fx_compressor_get_buffer_size( compressor_type, mode, &inbuf_size, &outbuf_size); if (!FX_OK(status)) { return status; } fx_cstream *stream = fx_object_create(FX_TYPE_CSTREAM); if (!stream) { return FX_ERR_NO_MEMORY; } struct fx_cstream_p *p = fx_object_get_private(stream, FX_TYPE_CSTREAM); fx_stream_cfg *cfg = fx_object_get_protected(stream, FX_TYPE_STREAM); p->s_mode = mode; p->s_endpoint = endpoint; cfg->s_mode = (mode == FX_COMPRESSOR_MODE_COMPRESS) ? 
FX_STREAM_WRITE : FX_STREAM_READ; p->s_in = fx_ringbuffer_create(inbuf_size + 1); if (!FX_OK(status)) { free(stream); return status; } p->s_out = fx_ringbuffer_create(outbuf_size + 1); if (!FX_OK(status)) { fx_cstream_unref(stream); return status; } p->s_compressor = fx_object_create(compressor_type); if (!p->s_compressor) { fx_cstream_unref(stream); return FX_ERR_INVALID_ARGUMENT; } fx_compressor_set_buffer(p->s_compressor, p->s_in, p->s_out); fx_compressor_set_mode(p->s_compressor, mode); *out = stream; return FX_SUCCESS; } enum fx_status fx_cstream_read( fx_cstream *stream, void *buf, size_t count, size_t *out_nr_read) { FX_CLASS_DISPATCH_STATIC( FX_TYPE_CSTREAM, cstream_read, stream, buf, count, out_nr_read); } enum fx_status fx_cstream_write( fx_cstream *stream, const void *buf, size_t count, size_t *out_nr_written) { FX_CLASS_DISPATCH_STATIC( FX_TYPE_CSTREAM, cstream_write, stream, buf, count, out_nr_written); } enum fx_status fx_cstream_skip(fx_cstream *stream, size_t count, size_t *out_nr_skipped) { FX_CLASS_DISPATCH_STATIC( FX_TYPE_CSTREAM, cstream_skip, stream, count, out_nr_skipped); } enum fx_status fx_cstream_reset(fx_cstream *stream) { FX_CLASS_DISPATCH_STATIC_0(FX_TYPE_CSTREAM, cstream_reset, stream); } enum fx_status fx_cstream_begin_compressed_section( fx_cstream *stream, size_t *tx_uncompressed_bytes) { FX_CLASS_DISPATCH_STATIC( FX_TYPE_CSTREAM, cstream_begin_compressed_section, stream, tx_uncompressed_bytes); } enum fx_status fx_cstream_end_compressed_section( fx_cstream *stream, size_t *tx_compressed_bytes, size_t *tx_uncompressed_bytes) { FX_CLASS_DISPATCH_STATIC( FX_TYPE_CSTREAM, cstream_end_compressed_section, stream, tx_compressed_bytes, tx_uncompressed_bytes); } bool fx_cstream_in_compressed_section(const fx_cstream *stream) { FX_CLASS_DISPATCH_STATIC_0( FX_TYPE_CSTREAM, cstream_in_compressed_section, stream); } enum fx_status fx_cstream_tx_bytes(const fx_cstream *stream, size_t *out) { FX_CLASS_DISPATCH_STATIC(FX_TYPE_CSTREAM, 
cstream_tx_bytes, stream, out); } enum fx_status fx_cstream_tx_bytes_compressed(const fx_cstream *stream, size_t *out) { FX_CLASS_DISPATCH_STATIC( FX_TYPE_CSTREAM, cstream_tx_bytes_compressed, stream, out); } enum fx_status fx_cstream_tx_bytes_uncompressed(const fx_cstream *stream, size_t *out) { FX_CLASS_DISPATCH_STATIC( FX_TYPE_CSTREAM, cstream_tx_bytes_uncompressed, stream, out); } enum fx_status fx_cstream_set_cursor_position(fx_cstream *stream, size_t pos) { FX_CLASS_DISPATCH_STATIC( FX_TYPE_CSTREAM, cstream_set_cursor_position, stream, pos); } enum fx_status fx_cstream_restore_cursor_position(fx_cstream *stream) { FX_CLASS_DISPATCH_STATIC_0( FX_TYPE_CSTREAM, cstream_restore_cursor_position, stream); } /*** VIRTUAL FUNCTIONS ********************************************************/ static void cstream_init(fx_object *obj, void *priv) { } static void cstream_fini(fx_object *obj, void *priv) { struct fx_cstream_p *stream = priv; if (stream->s_compressor) { fx_compressor_unref(stream->s_compressor); } if (stream->s_in) { fx_ringbuffer_unref(stream->s_in); } if (stream->s_out) { fx_ringbuffer_unref(stream->s_out); } } /*** CLASS DEFINITION *********************************************************/ FX_TYPE_CLASS_DEFINITION_BEGIN(fx_cstream) FX_TYPE_CLASS_INTERFACE_BEGIN(fx_object, FX_TYPE_OBJECT) FX_INTERFACE_ENTRY(to_string) = NULL; FX_TYPE_CLASS_INTERFACE_END(fx_object, FX_TYPE_OBJECT) FX_TYPE_CLASS_INTERFACE_BEGIN(fx_stream, FX_TYPE_STREAM) FX_INTERFACE_ENTRY(s_close) = NULL; FX_INTERFACE_ENTRY(s_seek) = NULL; FX_INTERFACE_ENTRY(s_tell) = NULL; FX_INTERFACE_ENTRY(s_getc) = NULL; FX_INTERFACE_ENTRY(s_read) = fx_cstream_read; FX_INTERFACE_ENTRY(s_write) = fx_cstream_write; FX_INTERFACE_ENTRY(s_reserve) = NULL; FX_TYPE_CLASS_INTERFACE_END(fx_stream, FX_TYPE_STREAM) FX_TYPE_CLASS_DEFINITION_END(fx_cstream) FX_TYPE_DEFINITION_BEGIN(fx_cstream) FX_TYPE_ID(0xe1e899b5, 0x6a3c, 0x4f9c, 0xafd0, 0xaab3f156615c); FX_TYPE_EXTENDS(FX_TYPE_STREAM); 
FX_TYPE_CLASS(fx_cstream_class); FX_TYPE_INSTANCE_PRIVATE(struct fx_cstream_p); FX_TYPE_INSTANCE_INIT(cstream_init); FX_TYPE_INSTANCE_FINI(cstream_fini); FX_TYPE_DEFINITION_END(fx_cstream)