compress: add byte-wise (de)compression stream data structure
This commit is contained in:
@@ -0,0 +1,444 @@
|
||||
#include "cstream.h"
|
||||
|
||||
#include <blue/compress/compressor.h>
|
||||
#include <blue/compress/cstream.h>
|
||||
#include <stdlib.h>
|
||||
#include <string.h>
|
||||
|
||||
enum b_status b_cstream_open(
|
||||
struct b_stream *endpoint, const struct b_compression_function *func,
|
||||
enum b_compression_mode mode, struct b_cstream **out)
|
||||
{
|
||||
size_t inbuf_size = 0, outbuf_size = 0;
|
||||
enum b_status status = b_compression_function_get_buffer_size(
|
||||
func, mode, &inbuf_size, &outbuf_size);
|
||||
if (!B_OK(status)) {
|
||||
return status;
|
||||
}
|
||||
|
||||
struct b_cstream *stream = malloc(sizeof *stream);
|
||||
if (!stream) {
|
||||
return B_ERR_NO_MEMORY;
|
||||
}
|
||||
|
||||
memset(stream, 0x0, sizeof *stream);
|
||||
|
||||
stream->s_mode = mode;
|
||||
stream->s_endpoint = endpoint;
|
||||
|
||||
status = b_ringbuffer_init(&stream->s_in, inbuf_size + 1);
|
||||
if (!B_OK(status)) {
|
||||
free(stream);
|
||||
return status;
|
||||
}
|
||||
|
||||
status = b_ringbuffer_init(&stream->s_out, outbuf_size + 1);
|
||||
if (!B_OK(status)) {
|
||||
b_ringbuffer_destroy(&stream->s_in);
|
||||
free(stream);
|
||||
return status;
|
||||
}
|
||||
|
||||
status = b_compressor_create(
|
||||
func, mode, &stream->s_in, &stream->s_out, &stream->s_compressor);
|
||||
if (!B_OK(status)) {
|
||||
b_ringbuffer_destroy(&stream->s_in);
|
||||
b_ringbuffer_destroy(&stream->s_out);
|
||||
|
||||
return status;
|
||||
}
|
||||
|
||||
*out = stream;
|
||||
return B_SUCCESS;
|
||||
}
|
||||
|
||||
enum b_status b_cstream_close(struct b_cstream *stream)
|
||||
{
|
||||
b_compressor_destroy(stream->s_compressor);
|
||||
b_ringbuffer_destroy(&stream->s_in);
|
||||
b_ringbuffer_destroy(&stream->s_out);
|
||||
free(stream);
|
||||
|
||||
return B_SUCCESS;
|
||||
}
|
||||
|
||||
static enum b_status read_uncompressed(
|
||||
struct b_cstream *stream, void *buf, size_t count, size_t *out_nr_read)
|
||||
{
|
||||
size_t remaining = count;
|
||||
unsigned char *dest = buf;
|
||||
size_t nr_read_from_buf = 0;
|
||||
size_t nr_read_from_endpoint = 0;
|
||||
enum b_status status = B_SUCCESS;
|
||||
|
||||
/* liberal usage of begin_compressed_section and end_compressed_section
|
||||
* can result in uncompressed data getting stuck in the input buffer.
|
||||
* return any data remaining in the input buffer before reading more
|
||||
* from the endpoint */
|
||||
|
||||
while (remaining > 0) {
|
||||
const void *data;
|
||||
size_t available;
|
||||
status = b_ringbuffer_open_read_buffer(
|
||||
&stream->s_in, &data, &available);
|
||||
if (!B_OK(status)) {
|
||||
break;
|
||||
}
|
||||
|
||||
size_t to_copy = count;
|
||||
if (to_copy > available) {
|
||||
to_copy = available;
|
||||
}
|
||||
|
||||
memcpy(dest, data, to_copy);
|
||||
|
||||
b_ringbuffer_close_read_buffer(&stream->s_in, &data, to_copy);
|
||||
|
||||
stream->s_tx_uncompressed_bytes += to_copy;
|
||||
dest += to_copy;
|
||||
remaining -= to_copy;
|
||||
nr_read_from_buf += to_copy;
|
||||
}
|
||||
|
||||
if (remaining == 0) {
|
||||
*out_nr_read = nr_read_from_buf;
|
||||
return B_SUCCESS;
|
||||
}
|
||||
|
||||
status = b_stream_read_bytes(
|
||||
stream->s_endpoint, dest, remaining, &nr_read_from_endpoint);
|
||||
stream->s_tx_uncompressed_bytes += nr_read_from_endpoint;
|
||||
|
||||
*out_nr_read = nr_read_from_endpoint + nr_read_from_buf;
|
||||
|
||||
return status;
|
||||
}
|
||||
|
||||
/* read compressed data from the endpoint and store it in the input buffer.
|
||||
* note that uncompressed data that is trailing the compressed blob may
|
||||
* also be read by this function, but this will be handled by read_uncompressed.
|
||||
*/
|
||||
static enum b_status refill_input_buffer(struct b_cstream *stream)
|
||||
{
|
||||
enum b_status status = B_SUCCESS;
|
||||
size_t nr_read = 0;
|
||||
|
||||
while (1) {
|
||||
void *data;
|
||||
size_t capacity;
|
||||
status = b_ringbuffer_open_write_buffer(
|
||||
&stream->s_in, &data, &capacity);
|
||||
if (!B_OK(status)) {
|
||||
break;
|
||||
}
|
||||
|
||||
size_t r = 0;
|
||||
status = b_stream_read_bytes(
|
||||
stream->s_endpoint, data, capacity, &r);
|
||||
|
||||
b_ringbuffer_close_write_buffer(&stream->s_in, &data, r);
|
||||
nr_read += r;
|
||||
|
||||
if (r < capacity) {
|
||||
break;
|
||||
}
|
||||
|
||||
if (!B_OK(status)) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (status == B_ERR_NO_SPACE && nr_read > 0) {
|
||||
status = B_SUCCESS;
|
||||
}
|
||||
|
||||
return status;
|
||||
}
|
||||
|
||||
/* push compressed data out of the input buffer, through the (de)compressor,
|
||||
* and store the resulting uncompressed data in the output buffer */
|
||||
static enum b_status refill_output_buffer(struct b_cstream *stream)
|
||||
{
|
||||
enum b_status status = B_SUCCESS;
|
||||
|
||||
if (!b_ringbuffer_available_data_remaining(&stream->s_in)) {
|
||||
status = refill_input_buffer(stream);
|
||||
}
|
||||
|
||||
if (!B_OK(status)) {
|
||||
return status;
|
||||
}
|
||||
|
||||
size_t bytes_before = b_ringbuffer_available_data_remaining(&stream->s_in);
|
||||
status = b_compressor_step(stream->s_compressor);
|
||||
size_t bytes_after = b_ringbuffer_available_data_remaining(&stream->s_in);
|
||||
|
||||
stream->s_tx_compressed_bytes += (bytes_before - bytes_after);
|
||||
|
||||
return status;
|
||||
}
|
||||
|
||||
enum b_status b_cstream_read(
|
||||
struct b_cstream *stream, void *buf, size_t count, size_t *out_nr_read)
|
||||
{
|
||||
if (stream->s_mode != B_COMPRESSION_MODE_DECOMPRESS) {
|
||||
return B_ERR_BAD_STATE;
|
||||
}
|
||||
|
||||
if (stream->s_compression_depth == 0) {
|
||||
return read_uncompressed(stream, buf, count, out_nr_read);
|
||||
}
|
||||
|
||||
if (b_compressor_eof(stream->s_compressor)) {
|
||||
*out_nr_read = 0;
|
||||
return B_SUCCESS;
|
||||
}
|
||||
|
||||
unsigned char *dest = buf;
|
||||
size_t nr_read = 0;
|
||||
size_t remaining = count;
|
||||
enum b_status status = B_SUCCESS;
|
||||
|
||||
while (remaining > 0) {
|
||||
if (!b_ringbuffer_available_data_remaining(&stream->s_out)) {
|
||||
status = refill_output_buffer(stream);
|
||||
}
|
||||
|
||||
if (!B_OK(status)) {
|
||||
break;
|
||||
}
|
||||
|
||||
const void *data;
|
||||
size_t available;
|
||||
status = b_ringbuffer_open_read_buffer(
|
||||
&stream->s_out, &data, &available);
|
||||
if (!B_OK(status)) {
|
||||
break;
|
||||
}
|
||||
|
||||
size_t to_copy = remaining;
|
||||
if (to_copy > available) {
|
||||
to_copy = available;
|
||||
}
|
||||
|
||||
memcpy(dest, data, to_copy);
|
||||
|
||||
b_ringbuffer_close_read_buffer(&stream->s_out, &data, to_copy);
|
||||
|
||||
stream->s_tx_uncompressed_bytes += to_copy;
|
||||
dest += to_copy;
|
||||
nr_read += to_copy;
|
||||
remaining -= to_copy;
|
||||
}
|
||||
|
||||
if (status == B_ERR_NO_DATA) {
|
||||
status = B_SUCCESS;
|
||||
}
|
||||
|
||||
*out_nr_read = nr_read;
|
||||
return status;
|
||||
}
|
||||
|
||||
static enum b_status write_uncompressed(
|
||||
struct b_cstream *stream, const void *buf, size_t count, size_t *nr_written)
|
||||
{
|
||||
size_t w = 0;
|
||||
enum b_status status
|
||||
= b_stream_write_bytes(stream->s_endpoint, buf, count, &w);
|
||||
|
||||
stream->s_tx_uncompressed_bytes += w;
|
||||
|
||||
*nr_written = w;
|
||||
return status;
|
||||
}
|
||||
|
||||
/* push uncompressed data out of the input buffer, through the compressor,
|
||||
* and store the resulting compressed data in the output buffer */
|
||||
static enum b_status flush_input_buffer(struct b_cstream *stream)
|
||||
{
|
||||
if (!b_ringbuffer_available_data_remaining(&stream->s_in)) {
|
||||
return B_ERR_NO_DATA;
|
||||
}
|
||||
|
||||
return b_compressor_step(stream->s_compressor);
|
||||
}
|
||||
|
||||
/* push compressed data from the output buffer into the endpoint */
|
||||
static enum b_status flush_output_buffer(struct b_cstream *stream)
|
||||
{
|
||||
enum b_status status = B_SUCCESS;
|
||||
size_t nr_written = 0;
|
||||
|
||||
while (1) {
|
||||
const void *data;
|
||||
size_t capacity;
|
||||
status = b_ringbuffer_open_read_buffer(
|
||||
&stream->s_out, &data, &capacity);
|
||||
if (!B_OK(status)) {
|
||||
break;
|
||||
}
|
||||
|
||||
size_t w = 0;
|
||||
status = b_stream_write_bytes(
|
||||
stream->s_endpoint, data, capacity, &w);
|
||||
|
||||
b_ringbuffer_close_read_buffer(&stream->s_out, &data, w);
|
||||
nr_written += w;
|
||||
stream->s_tx_compressed_bytes += w;
|
||||
|
||||
if (w < capacity) {
|
||||
break;
|
||||
}
|
||||
|
||||
if (!B_OK(status)) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (status == B_ERR_NO_DATA && nr_written > 0) {
|
||||
status = B_SUCCESS;
|
||||
}
|
||||
|
||||
return status;
|
||||
}
|
||||
|
||||
enum b_status b_cstream_write(
|
||||
struct b_cstream *stream, const void *buf, size_t count,
|
||||
size_t *out_nr_written)
|
||||
{
|
||||
if (stream->s_mode != B_COMPRESSION_MODE_COMPRESS) {
|
||||
return B_ERR_BAD_STATE;
|
||||
}
|
||||
|
||||
if (stream->s_compression_depth == 0) {
|
||||
return write_uncompressed(stream, buf, count, out_nr_written);
|
||||
}
|
||||
|
||||
const unsigned char *src = buf;
|
||||
size_t nr_written = 0;
|
||||
size_t remaining = count;
|
||||
enum b_status status = B_SUCCESS;
|
||||
|
||||
while (remaining > 0) {
|
||||
if (!b_ringbuffer_write_capacity_remaining(&stream->s_out)) {
|
||||
status = flush_output_buffer(stream);
|
||||
}
|
||||
|
||||
if (!b_ringbuffer_write_capacity_remaining(&stream->s_in)) {
|
||||
status = flush_input_buffer(stream);
|
||||
}
|
||||
|
||||
if (!B_OK(status)) {
|
||||
break;
|
||||
}
|
||||
|
||||
void *data;
|
||||
size_t available;
|
||||
status = b_ringbuffer_open_write_buffer(
|
||||
&stream->s_in, &data, &available);
|
||||
if (!B_OK(status)) {
|
||||
break;
|
||||
}
|
||||
|
||||
size_t to_copy = remaining;
|
||||
if (to_copy > available) {
|
||||
to_copy = available;
|
||||
}
|
||||
|
||||
memcpy(data, src, to_copy);
|
||||
|
||||
b_ringbuffer_close_write_buffer(&stream->s_in, &data, to_copy);
|
||||
|
||||
stream->s_tx_uncompressed_bytes += to_copy;
|
||||
src += to_copy;
|
||||
nr_written += to_copy;
|
||||
remaining -= to_copy;
|
||||
}
|
||||
|
||||
if (status == B_ERR_NO_DATA) {
|
||||
status = B_SUCCESS;
|
||||
}
|
||||
|
||||
*out_nr_written = nr_written;
|
||||
return status;
|
||||
}
|
||||
|
||||
enum b_status b_cstream_begin_compressed_section(
|
||||
struct b_cstream *stream, size_t *tx_uncompressed_bytes)
|
||||
{
|
||||
if (tx_uncompressed_bytes) {
|
||||
*tx_uncompressed_bytes = stream->s_tx_uncompressed_bytes;
|
||||
}
|
||||
|
||||
if (stream->s_compression_depth > 0) {
|
||||
stream->s_compression_depth++;
|
||||
return B_SUCCESS;
|
||||
}
|
||||
|
||||
stream->s_compression_depth = 1;
|
||||
stream->s_tx_uncompressed_bytes = 0;
|
||||
stream->s_tx_compressed_bytes = 0;
|
||||
b_compressor_reset(stream->s_compressor);
|
||||
|
||||
return B_SUCCESS;
|
||||
}
|
||||
|
||||
enum b_status b_cstream_end_compressed_section(
|
||||
struct b_cstream *stream, size_t *tx_compressed_bytes,
|
||||
size_t *tx_uncompressed_bytes)
|
||||
{
|
||||
tx_compressed_bytes
|
||||
&& (*tx_compressed_bytes = stream->s_tx_compressed_bytes);
|
||||
|
||||
tx_uncompressed_bytes
|
||||
&& (*tx_uncompressed_bytes = stream->s_tx_uncompressed_bytes);
|
||||
|
||||
if (stream->s_compression_depth > 1) {
|
||||
stream->s_compression_depth--;
|
||||
return B_SUCCESS;
|
||||
}
|
||||
|
||||
stream->s_compression_depth = 0;
|
||||
|
||||
if (stream->s_mode == B_COMPRESSION_MODE_DECOMPRESS) {
|
||||
stream->s_tx_compressed_bytes = 0;
|
||||
stream->s_tx_uncompressed_bytes = 0;
|
||||
return B_SUCCESS;
|
||||
}
|
||||
|
||||
enum b_status status = B_SUCCESS;
|
||||
while (1) {
|
||||
status = b_compressor_end(stream->s_compressor);
|
||||
if (!B_OK(status) && status != B_ERR_NO_SPACE) {
|
||||
break;
|
||||
}
|
||||
|
||||
status = flush_output_buffer(stream);
|
||||
|
||||
if (!B_OK(status)) {
|
||||
break;
|
||||
}
|
||||
|
||||
if (b_compressor_eof(stream->s_compressor)) {
|
||||
status = B_SUCCESS;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
/* refresh these output variables to account for any data written by
|
||||
* b_compressor_end */
|
||||
tx_compressed_bytes
|
||||
&& (*tx_compressed_bytes = stream->s_tx_compressed_bytes);
|
||||
|
||||
tx_uncompressed_bytes
|
||||
&& (*tx_uncompressed_bytes = stream->s_tx_uncompressed_bytes);
|
||||
|
||||
if (!B_OK(status)) {
|
||||
return status;
|
||||
}
|
||||
|
||||
stream->s_tx_compressed_bytes = 0;
|
||||
stream->s_tx_uncompressed_bytes = 0;
|
||||
|
||||
return flush_output_buffer(stream);
|
||||
}
|
||||
|
||||
@@ -0,0 +1,60 @@
|
||||
#ifndef _CSTREAM_H_
|
||||
#define _CSTREAM_H_
|
||||
|
||||
#include <blue/compress/function.h>
|
||||
#include <blue/core/ringbuffer.h>
|
||||
#include <stddef.h>
|
||||
|
||||
struct b_stream;
|
||||
struct b_compressor;
|
||||
|
||||
struct b_cstream {
|
||||
struct b_stream *s_endpoint;
|
||||
struct b_compressor *s_compressor;
|
||||
/* s_in is the input buffer, and s_out is the output buffer.
|
||||
*
|
||||
* the input buffer holds data that will be provided to the
|
||||
* (de)compression function. in compression mode, this data is provided
|
||||
* by the code using the cstream (via b_cstream_write) in decompression
|
||||
* mode, this data is read from s_endpoint.
|
||||
*
|
||||
* the output buffer holds data produced by the (de)compression
|
||||
* function. in compression mode, this data will be written to
|
||||
* s_endpoint. in decompression mode, this data will be returned to the
|
||||
* code using the cstream (via b_cstream_read)
|
||||
*
|
||||
* heavy usage of cstream's compressed sections facility can result
|
||||
* in the input buffer holding uncompressed data while the stream is in
|
||||
* decompression mode. this is handled by the uncompressed read code path.
|
||||
*/
|
||||
struct b_ringbuffer s_in, s_out;
|
||||
enum b_compression_mode s_mode;
|
||||
|
||||
unsigned int s_compression_depth;
|
||||
/* tracks the number of compressed bytes that have passed through this
|
||||
* stream in the current section.
|
||||
*
|
||||
* in compression mode, this tracks the number of post-compression bytes
|
||||
* that have been written to the endpoint within the current section,
|
||||
* including any bytes written during end_compression_section()
|
||||
*
|
||||
* in decompression mode, this tracks the number of compressed bytes
|
||||
* that were decompressed while reading the current section. it does not
|
||||
* include any uncompressed bytes that may have been read from the
|
||||
* endpoint while reading a compressed section due to cstream's
|
||||
* read-ahead caching behaviour.
|
||||
*/
|
||||
size_t s_tx_compressed_bytes;
|
||||
/* tracks the number of uncompressed bytes that have passed through this
|
||||
* stream in the current section.
|
||||
*
|
||||
* in compression mode, this tracks the number of bytes given to
|
||||
* b_cstream_write
|
||||
*
|
||||
* in decompression mode, this tracks the number of bytes returned by
|
||||
* b_cstream_read
|
||||
*/
|
||||
size_t s_tx_uncompressed_bytes;
|
||||
};
|
||||
|
||||
#endif
|
||||
|
||||
@@ -0,0 +1,25 @@
|
||||
#ifndef BLUELIB_COMPRESS_CSTREAM_H_
|
||||
#define BLUELIB_COMPRESS_CSTREAM_H_
|
||||
|
||||
#include <blue/compress/function.h>
|
||||
#include <blue/core/stream.h>
|
||||
|
||||
typedef struct b_cstream b_cstream;
|
||||
|
||||
BLUE_API b_status b_cstream_open(
|
||||
b_stream *endpoint, const b_compression_function *func,
|
||||
b_compression_mode mode, b_cstream **out);
|
||||
BLUE_API b_status b_cstream_close(b_cstream *stream);
|
||||
|
||||
BLUE_API b_status b_cstream_read(
|
||||
b_cstream *stream, void *buf, size_t count, size_t *nr_read);
|
||||
BLUE_API b_status b_cstream_write(
|
||||
b_cstream *stream, const void *buf, size_t count, size_t *nr_written);
|
||||
|
||||
BLUE_API b_status b_cstream_begin_compressed_section(
|
||||
b_cstream *stream, size_t *tx_uncompressed_bytes);
|
||||
BLUE_API b_status b_cstream_end_compressed_section(
|
||||
b_cstream *stream, size_t *tx_compressed_bytes,
|
||||
size_t *tx_uncompressed_bytes);
|
||||
|
||||
#endif
|
||||
|
||||
Reference in New Issue
Block a user