/*
 * bluelib/compress/cstream.c
 */

#include "cstream.h"
#include <blue/compress/compressor.h>
#include <blue/compress/cstream.h>
#include <stdlib.h>
#include <string.h>
/* Create a compression stream layered on top of `endpoint`.
 *
 * The buffer sizes reported by b_compression_function_get_buffer_size() for
 * `func` and `mode` are used to initialise the input and output ring buffers
 * (each is requested one larger than the reported size), and a compressor is
 * then created over those two buffers.
 *
 * On success the new stream is stored in `*out` and B_SUCCESS is returned.
 * On failure everything that was constructed so far is released, `*out` is
 * left untouched, and the failing status is returned (B_ERR_NO_MEMORY when
 * the stream object itself cannot be allocated). */
enum b_status b_cstream_open(
	struct b_stream *endpoint, const struct b_compression_function *func,
	enum b_compression_mode mode, struct b_cstream **out)
{
	size_t inbuf_size = 0, outbuf_size = 0;
	enum b_status status = b_compression_function_get_buffer_size(
		func, mode, &inbuf_size, &outbuf_size);
	if (!B_OK(status)) {
		return status;
	}

	/* zero-initialised so that counters and the section depth start at 0 */
	struct b_cstream *stream = calloc(1, sizeof *stream);
	if (!stream) {
		return B_ERR_NO_MEMORY;
	}

	stream->s_mode = mode;
	stream->s_endpoint = endpoint;

	status = b_ringbuffer_init(&stream->s_in, inbuf_size + 1);
	if (!B_OK(status)) {
		goto err_free_stream;
	}

	status = b_ringbuffer_init(&stream->s_out, outbuf_size + 1);
	if (!B_OK(status)) {
		goto err_destroy_in;
	}

	status = b_compressor_create(
		func, mode, &stream->s_in, &stream->s_out, &stream->s_compressor);
	if (!B_OK(status)) {
		/* previously this path released the buffers but leaked `stream` */
		goto err_destroy_out;
	}

	*out = stream;
	return B_SUCCESS;

err_destroy_out:
	b_ringbuffer_destroy(&stream->s_out);
err_destroy_in:
	b_ringbuffer_destroy(&stream->s_in);
err_free_stream:
	free(stream);
	return status;
}
/* Release a stream: the compressor goes first, then the input and output
 * ring buffers, and finally the stream object itself. No flushing is done
 * here. Always returns B_SUCCESS. */
enum b_status b_cstream_close(struct b_cstream *stream)
{
	/* compressor first, then the two buffers it was created over */
	b_compressor_destroy(stream->s_compressor);

	b_ringbuffer_destroy(&stream->s_in);
	b_ringbuffer_destroy(&stream->s_out);

	free(stream);

	return B_SUCCESS;
}
/* Read up to `count` bytes of uncompressed data into `buf`.
 *
 * Bytes already sitting in the input ring buffer are returned first; whatever
 * is still missing afterwards is read directly from the endpoint. Every byte
 * delivered is added to s_tx_uncompressed_bytes.
 *
 * `*out_nr_read` receives the total number of bytes stored in `buf`.
 * Returns B_SUCCESS when the request was satisfied from the input buffer
 * alone, otherwise the status of the endpoint read. */
static enum b_status read_uncompressed(
	struct b_cstream *stream, void *buf, size_t count, size_t *out_nr_read)
{
	size_t remaining = count;
	unsigned char *dest = buf;
	size_t nr_read_from_buf = 0;
	size_t nr_read_from_endpoint = 0;
	enum b_status status = B_SUCCESS;

	/* liberal usage of begin_compressed_section and end_compressed_section
	 * can result in uncompressed data getting stuck in the input buffer.
	 * return any data remaining in the input buffer before reading more
	 * from the endpoint */
	while (remaining > 0) {
		const void *data;
		size_t available;
		status = b_ringbuffer_open_read_buffer(
			&stream->s_in, &data, &available);
		if (!B_OK(status)) {
			/* nothing (more) buffered; fall through to the endpoint */
			break;
		}

		/* clamp to what the caller still has room for. this used to be
		 * clamped to `count`, which could overrun `buf` (and underflow
		 * `remaining`) on the second and later passes. */
		size_t to_copy = remaining;
		if (to_copy > available) {
			to_copy = available;
		}

		memcpy(dest, data, to_copy);
		b_ringbuffer_close_read_buffer(&stream->s_in, &data, to_copy);

		stream->s_tx_uncompressed_bytes += to_copy;
		dest += to_copy;
		remaining -= to_copy;
		nr_read_from_buf += to_copy;

		if (to_copy == 0) {
			/* defensive: an empty read span would otherwise spin forever */
			break;
		}
	}

	if (remaining == 0) {
		*out_nr_read = nr_read_from_buf;
		return B_SUCCESS;
	}

	status = b_stream_read_bytes(
		stream->s_endpoint, dest, remaining, &nr_read_from_endpoint);
	stream->s_tx_uncompressed_bytes += nr_read_from_endpoint;

	*out_nr_read = nr_read_from_endpoint + nr_read_from_buf;
	return status;
}
/* Pull data from the endpoint into the input ring buffer.
 *
 * Write spans of the input buffer are filled one after another until a span
 * cannot be opened, the endpoint delivers less than a full span, or the
 * endpoint read reports a failure. Uncompressed data that trails the
 * compressed blob may be pulled in as well; read_uncompressed deals with it.
 *
 * A B_ERR_NO_SPACE status is reported as B_SUCCESS when at least one byte
 * was read; every other status is returned unchanged. */
static enum b_status refill_input_buffer(struct b_cstream *stream)
{
	size_t total = 0;
	enum b_status status = B_SUCCESS;

	for (;;) {
		void *span;
		size_t span_size;

		status = b_ringbuffer_open_write_buffer(
			&stream->s_in, &span, &span_size);
		if (!B_OK(status)) {
			break;
		}

		size_t got = 0;
		status = b_stream_read_bytes(
			stream->s_endpoint, span, span_size, &got);
		b_ringbuffer_close_write_buffer(&stream->s_in, &span, got);
		total += got;

		/* a short read or a failed read both end the refill */
		if (got < span_size || !B_OK(status)) {
			break;
		}
	}

	if (status == B_ERR_NO_SPACE && total > 0) {
		return B_SUCCESS;
	}

	return status;
}
/* push compressed data out of the input buffer, through the (de)compressor,
* and store the resulting uncompressed data in the output buffer */
static enum b_status refill_output_buffer(struct b_cstream *stream)
{
enum b_status status = B_SUCCESS;
if (!b_ringbuffer_available_data_remaining(&stream->s_in)) {
status = refill_input_buffer(stream);
}
if (!B_OK(status)) {
return status;
}
size_t bytes_before = b_ringbuffer_available_data_remaining(&stream->s_in);
status = b_compressor_step(stream->s_compressor);
size_t bytes_after = b_ringbuffer_available_data_remaining(&stream->s_in);
stream->s_tx_compressed_bytes += (bytes_before - bytes_after);
return status;
}
/* Read up to `count` bytes from the stream into `buf`.
 *
 * - Returns B_ERR_BAD_STATE (without touching `*out_nr_read`) unless the
 *   stream is in decompress mode.
 * - Outside a compressed section (depth 0) the request is handed to
 *   read_uncompressed().
 * - If the compressor already reports end-of-stream, zero bytes are reported
 *   with B_SUCCESS.
 * - Otherwise bytes are drained from the output buffer, which is refilled
 *   through refill_output_buffer() whenever it runs empty.
 *
 * `*out_nr_read` receives the number of bytes stored in `buf`. A final
 * B_ERR_NO_DATA status is reported as B_SUCCESS. */
enum b_status b_cstream_read(
	struct b_cstream *stream, void *buf, size_t count, size_t *out_nr_read)
{
	if (stream->s_mode != B_COMPRESSION_MODE_DECOMPRESS) {
		return B_ERR_BAD_STATE;
	}

	if (stream->s_compression_depth == 0) {
		return read_uncompressed(stream, buf, count, out_nr_read);
	}

	if (b_compressor_eof(stream->s_compressor)) {
		*out_nr_read = 0;
		return B_SUCCESS;
	}

	enum b_status status = B_SUCCESS;
	unsigned char *cursor = buf;
	size_t total = 0;

	while (total < count) {
		/* output buffer drained: decompress some more */
		if (!b_ringbuffer_available_data_remaining(&stream->s_out)) {
			status = refill_output_buffer(stream);
			if (!B_OK(status)) {
				break;
			}
		}

		const void *span;
		size_t span_size;
		status = b_ringbuffer_open_read_buffer(
			&stream->s_out, &span, &span_size);
		if (!B_OK(status)) {
			break;
		}

		size_t wanted = count - total;
		size_t chunk = (wanted > span_size) ? span_size : wanted;

		memcpy(cursor, span, chunk);
		b_ringbuffer_close_read_buffer(&stream->s_out, &span, chunk);

		stream->s_tx_uncompressed_bytes += chunk;
		cursor += chunk;
		total += chunk;
	}

	*out_nr_read = total;

	/* running out of data is not an error for the caller */
	return (status == B_ERR_NO_DATA) ? B_SUCCESS : status;
}
static enum b_status write_uncompressed(
struct b_cstream *stream, const void *buf, size_t count, size_t *nr_written)
{
size_t w = 0;
enum b_status status
= b_stream_write_bytes(stream->s_endpoint, buf, count, &w);
stream->s_tx_uncompressed_bytes += w;
*nr_written = w;
return status;
}
/* Run one compressor step over the data waiting in the input buffer, moving
 * the result into the output buffer. Returns B_ERR_NO_DATA when the input
 * buffer holds nothing, otherwise the status of b_compressor_step(). */
static enum b_status flush_input_buffer(struct b_cstream *stream)
{
	if (b_ringbuffer_available_data_remaining(&stream->s_in)) {
		return b_compressor_step(stream->s_compressor);
	}

	return B_ERR_NO_DATA;
}
/* Drain the output buffer into the endpoint.
 *
 * Read spans of the output buffer are written out one after another until a
 * span cannot be opened, the endpoint accepts less than a full span, or the
 * endpoint write reports a failure. Every byte the endpoint accepts is added
 * to s_tx_compressed_bytes.
 *
 * A B_ERR_NO_DATA status is reported as B_SUCCESS when at least one byte was
 * written; every other status is returned unchanged. */
static enum b_status flush_output_buffer(struct b_cstream *stream)
{
	size_t total_sent = 0;
	enum b_status status = B_SUCCESS;

	for (;;) {
		const void *span;
		size_t span_size;

		status = b_ringbuffer_open_read_buffer(
			&stream->s_out, &span, &span_size);
		if (!B_OK(status)) {
			break;
		}

		size_t sent = 0;
		status = b_stream_write_bytes(
			stream->s_endpoint, span, span_size, &sent);
		b_ringbuffer_close_read_buffer(&stream->s_out, &span, sent);

		total_sent += sent;
		stream->s_tx_compressed_bytes += sent;

		/* a short write or a failed write both end the flush */
		if (sent < span_size || !B_OK(status)) {
			break;
		}
	}

	if (status == B_ERR_NO_DATA && total_sent > 0) {
		return B_SUCCESS;
	}

	return status;
}
/* Write up to `count` bytes from `buf` to the stream.
 *
 * - Returns B_ERR_BAD_STATE (without touching `*out_nr_written`) unless the
 *   stream is in compress mode.
 * - Outside a compressed section (depth 0) the request is handed to
 *   write_uncompressed().
 * - Otherwise the bytes are copied into the input buffer. A full output
 *   buffer is first flushed to the endpoint, and a full input buffer is
 *   pushed through the compressor, to make room.
 *
 * `*out_nr_written` receives the number of bytes accepted from `buf`; each
 * of them is added to s_tx_uncompressed_bytes. A final B_ERR_NO_DATA status
 * is reported as B_SUCCESS; any other failure ends the write early and is
 * returned together with the partial count. */
enum b_status b_cstream_write(
	struct b_cstream *stream, const void *buf, size_t count,
	size_t *out_nr_written)
{
	if (stream->s_mode != B_COMPRESSION_MODE_COMPRESS) {
		return B_ERR_BAD_STATE;
	}

	if (stream->s_compression_depth == 0) {
		return write_uncompressed(stream, buf, count, out_nr_written);
	}

	const unsigned char *src = buf;
	size_t nr_written = 0;
	size_t remaining = count;
	enum b_status status = B_SUCCESS;

	while (remaining > 0) {
		if (!b_ringbuffer_write_capacity_remaining(&stream->s_out)) {
			status = flush_output_buffer(stream);
			/* check right away: this status used to be overwritten by
			 * the input flush below, hiding endpoint write failures */
			if (!B_OK(status)) {
				break;
			}
		}

		if (!b_ringbuffer_write_capacity_remaining(&stream->s_in)) {
			status = flush_input_buffer(stream);
			if (!B_OK(status)) {
				break;
			}
		}

		void *data;
		size_t available;
		status = b_ringbuffer_open_write_buffer(
			&stream->s_in, &data, &available);
		if (!B_OK(status)) {
			break;
		}

		size_t to_copy = remaining;
		if (to_copy > available) {
			to_copy = available;
		}

		memcpy(data, src, to_copy);
		b_ringbuffer_close_write_buffer(&stream->s_in, &data, to_copy);

		stream->s_tx_uncompressed_bytes += to_copy;
		src += to_copy;
		nr_written += to_copy;
		remaining -= to_copy;
	}

	if (status == B_ERR_NO_DATA) {
		status = B_SUCCESS;
	}

	*out_nr_written = nr_written;
	return status;
}
/* Enter a compressed section.
 *
 * If `tx_uncompressed_bytes` is non-null it receives the value of the
 * uncompressed byte counter as it stood on entry. Sections nest: when one is
 * already open only the depth is incremented. Opening the outermost section
 * sets the depth to 1, zeroes both byte counters and resets the compressor.
 * Always returns B_SUCCESS. */
enum b_status b_cstream_begin_compressed_section(
	struct b_cstream *stream, size_t *tx_uncompressed_bytes)
{
	if (tx_uncompressed_bytes) {
		*tx_uncompressed_bytes = stream->s_tx_uncompressed_bytes;
	}

	if (!(stream->s_compression_depth > 0)) {
		/* outermost section: start from a clean slate */
		stream->s_compression_depth = 1;
		stream->s_tx_uncompressed_bytes = 0;
		stream->s_tx_compressed_bytes = 0;
		b_compressor_reset(stream->s_compressor);
	} else {
		/* nested section: just track the depth */
		stream->s_compression_depth++;
	}

	return B_SUCCESS;
}
/* Leave a compressed section.
 *
 * `tx_compressed_bytes` and `tx_uncompressed_bytes` are optional; when
 * non-null they receive the current byte counters.
 *
 * - Leaving a nested section (depth > 1) only decrements the depth.
 * - Leaving the outermost section in decompress mode zeroes both counters.
 * - Leaving the outermost section in compress mode repeatedly calls
 *   b_compressor_end() and flushes the output buffer until the compressor
 *   reports end-of-stream, refreshes the two output values, zeroes both
 *   counters and returns the status of one last output flush.
 *
 * A failure while finishing the compressor or flushing is returned after the
 * output values have been refreshed; the counters are not reset then. */
enum b_status b_cstream_end_compressed_section(
	struct b_cstream *stream, size_t *tx_compressed_bytes,
	size_t *tx_uncompressed_bytes)
{
	if (tx_compressed_bytes) {
		*tx_compressed_bytes = stream->s_tx_compressed_bytes;
	}
	if (tx_uncompressed_bytes) {
		*tx_uncompressed_bytes = stream->s_tx_uncompressed_bytes;
	}

	if (stream->s_compression_depth > 1) {
		stream->s_compression_depth--;
		return B_SUCCESS;
	}

	stream->s_compression_depth = 0;

	if (stream->s_mode == B_COMPRESSION_MODE_DECOMPRESS) {
		stream->s_tx_compressed_bytes = 0;
		stream->s_tx_uncompressed_bytes = 0;
		return B_SUCCESS;
	}

	enum b_status status = B_SUCCESS;

	for (;;) {
		status = b_compressor_end(stream->s_compressor);
		/* B_ERR_NO_SPACE is tolerated here: flush and try again */
		if (!(B_OK(status) || status == B_ERR_NO_SPACE)) {
			break;
		}

		status = flush_output_buffer(stream);
		if (!B_OK(status)) {
			break;
		}

		if (b_compressor_eof(stream->s_compressor)) {
			status = B_SUCCESS;
			break;
		}
	}

	/* report the counters again so they include whatever was written while
	 * finishing the compressor */
	if (tx_compressed_bytes) {
		*tx_compressed_bytes = stream->s_tx_compressed_bytes;
	}
	if (tx_uncompressed_bytes) {
		*tx_uncompressed_bytes = stream->s_tx_uncompressed_bytes;
	}

	if (!B_OK(status)) {
		return status;
	}

	stream->s_tx_compressed_bytes = 0;
	stream->s_tx_uncompressed_bytes = 0;

	/* NOTE(review): the counters are zeroed before this last flush, so any
	 * bytes it writes are counted after the reset, and its status (whatever
	 * flush_output_buffer reports for an already-empty buffer) becomes the
	 * return value - confirm this is intended. */
	return flush_output_buffer(stream);
}