The cursor can only be moved during uncompressed I/O. While the cursor is moved, any read/write operations are performed directly on the underlying endpoint with no buffering, and don't count towards the transacted byte statistics. The cursor can only be moved once, after which its position must be restored.
516 lines
11 KiB
C
516 lines
11 KiB
C
#include "cstream.h"
|
|
|
|
#include <blue/compress/compressor.h>
|
|
#include <blue/compress/cstream.h>
|
|
#include <stdlib.h>
|
|
#include <string.h>
|
|
|
|
enum b_status b_cstream_open(
|
|
struct b_stream *endpoint, const struct b_compression_function *func,
|
|
enum b_compression_mode mode, struct b_cstream **out)
|
|
{
|
|
size_t inbuf_size = 0, outbuf_size = 0;
|
|
enum b_status status = b_compression_function_get_buffer_size(
|
|
func, mode, &inbuf_size, &outbuf_size);
|
|
if (!B_OK(status)) {
|
|
return status;
|
|
}
|
|
|
|
struct b_cstream *stream = malloc(sizeof *stream);
|
|
if (!stream) {
|
|
return B_ERR_NO_MEMORY;
|
|
}
|
|
|
|
memset(stream, 0x0, sizeof *stream);
|
|
|
|
stream->s_mode = mode;
|
|
stream->s_endpoint = endpoint;
|
|
|
|
status = b_ringbuffer_init(&stream->s_in, inbuf_size + 1);
|
|
if (!B_OK(status)) {
|
|
free(stream);
|
|
return status;
|
|
}
|
|
|
|
status = b_ringbuffer_init(&stream->s_out, outbuf_size + 1);
|
|
if (!B_OK(status)) {
|
|
b_ringbuffer_destroy(&stream->s_in);
|
|
free(stream);
|
|
return status;
|
|
}
|
|
|
|
status = b_compressor_create(
|
|
func, mode, &stream->s_in, &stream->s_out, &stream->s_compressor);
|
|
if (!B_OK(status)) {
|
|
b_ringbuffer_destroy(&stream->s_in);
|
|
b_ringbuffer_destroy(&stream->s_out);
|
|
|
|
return status;
|
|
}
|
|
|
|
*out = stream;
|
|
return B_SUCCESS;
|
|
}
|
|
|
|
enum b_status b_cstream_close(struct b_cstream *stream)
|
|
{
|
|
b_compressor_destroy(stream->s_compressor);
|
|
b_ringbuffer_destroy(&stream->s_in);
|
|
b_ringbuffer_destroy(&stream->s_out);
|
|
free(stream);
|
|
|
|
return B_SUCCESS;
|
|
}
|
|
|
|
static enum b_status read_cursor(
|
|
struct b_cstream *stream, void *buf, size_t count, size_t *out_nr_read)
|
|
{
|
|
return b_stream_read_bytes(stream->s_endpoint, buf, count, out_nr_read);
|
|
}
|
|
|
|
static enum b_status read_uncompressed(
|
|
struct b_cstream *stream, void *buf, size_t count, size_t *out_nr_read)
|
|
{
|
|
size_t remaining = count;
|
|
unsigned char *dest = buf;
|
|
size_t nr_read_from_buf = 0;
|
|
size_t nr_read_from_endpoint = 0;
|
|
enum b_status status = B_SUCCESS;
|
|
|
|
/* liberal usage of begin_compressed_section and end_compressed_section
|
|
* can result in uncompressed data getting stuck in the input buffer.
|
|
* return any data remaining in the input buffer before reading more
|
|
* from the endpoint */
|
|
|
|
while (remaining > 0) {
|
|
const void *data;
|
|
size_t available;
|
|
status = b_ringbuffer_open_read_buffer(
|
|
&stream->s_in, &data, &available);
|
|
if (!B_OK(status)) {
|
|
break;
|
|
}
|
|
|
|
size_t to_copy = count;
|
|
if (to_copy > available) {
|
|
to_copy = available;
|
|
}
|
|
|
|
memcpy(dest, data, to_copy);
|
|
|
|
b_ringbuffer_close_read_buffer(&stream->s_in, &data, to_copy);
|
|
|
|
stream->s_tx_uncompressed_bytes += to_copy;
|
|
dest += to_copy;
|
|
remaining -= to_copy;
|
|
nr_read_from_buf += to_copy;
|
|
}
|
|
|
|
if (remaining == 0) {
|
|
*out_nr_read = nr_read_from_buf;
|
|
return B_SUCCESS;
|
|
}
|
|
|
|
status = b_stream_read_bytes(
|
|
stream->s_endpoint, dest, remaining, &nr_read_from_endpoint);
|
|
stream->s_tx_uncompressed_bytes += nr_read_from_endpoint;
|
|
|
|
*out_nr_read = nr_read_from_endpoint + nr_read_from_buf;
|
|
|
|
return status;
|
|
}
|
|
|
|
/* read compressed data from the endpoint and store it in the input buffer.
|
|
* note that uncompressed data that is trailing the compressed blob may
|
|
* also be read by this function, but this will be handled by read_uncompressed.
|
|
*/
|
|
static enum b_status refill_input_buffer(struct b_cstream *stream)
|
|
{
|
|
enum b_status status = B_SUCCESS;
|
|
size_t nr_read = 0;
|
|
|
|
while (1) {
|
|
void *data;
|
|
size_t capacity;
|
|
status = b_ringbuffer_open_write_buffer(
|
|
&stream->s_in, &data, &capacity);
|
|
if (!B_OK(status)) {
|
|
break;
|
|
}
|
|
|
|
size_t r = 0;
|
|
status = b_stream_read_bytes(
|
|
stream->s_endpoint, data, capacity, &r);
|
|
|
|
b_ringbuffer_close_write_buffer(&stream->s_in, &data, r);
|
|
nr_read += r;
|
|
|
|
if (r < capacity) {
|
|
break;
|
|
}
|
|
|
|
if (!B_OK(status)) {
|
|
break;
|
|
}
|
|
}
|
|
|
|
if (status == B_ERR_NO_SPACE && nr_read > 0) {
|
|
status = B_SUCCESS;
|
|
}
|
|
|
|
return status;
|
|
}
|
|
|
|
/* push compressed data out of the input buffer, through the (de)compressor,
|
|
* and store the resulting uncompressed data in the output buffer */
|
|
static enum b_status refill_output_buffer(struct b_cstream *stream)
|
|
{
|
|
enum b_status status = B_SUCCESS;
|
|
|
|
if (!b_ringbuffer_available_data_remaining(&stream->s_in)) {
|
|
status = refill_input_buffer(stream);
|
|
}
|
|
|
|
if (!B_OK(status)) {
|
|
return status;
|
|
}
|
|
|
|
size_t bytes_before = b_ringbuffer_available_data_remaining(&stream->s_in);
|
|
status = b_compressor_step(stream->s_compressor);
|
|
size_t bytes_after = b_ringbuffer_available_data_remaining(&stream->s_in);
|
|
|
|
stream->s_tx_compressed_bytes += (bytes_before - bytes_after);
|
|
|
|
return status;
|
|
}
|
|
|
|
enum b_status b_cstream_read(
|
|
struct b_cstream *stream, void *buf, size_t count, size_t *out_nr_read)
|
|
{
|
|
if (stream->s_mode != B_COMPRESSION_MODE_DECOMPRESS) {
|
|
return B_ERR_BAD_STATE;
|
|
}
|
|
|
|
if (stream->s_flags & CSTREAM_CURSOR_MOVED) {
|
|
return read_cursor(stream, buf, count, out_nr_read);
|
|
}
|
|
|
|
if (stream->s_compression_depth == 0) {
|
|
return read_uncompressed(stream, buf, count, out_nr_read);
|
|
}
|
|
|
|
if (b_compressor_eof(stream->s_compressor)) {
|
|
*out_nr_read = 0;
|
|
return B_SUCCESS;
|
|
}
|
|
|
|
unsigned char *dest = buf;
|
|
size_t nr_read = 0;
|
|
size_t remaining = count;
|
|
enum b_status status = B_SUCCESS;
|
|
|
|
while (remaining > 0) {
|
|
if (!b_ringbuffer_available_data_remaining(&stream->s_out)) {
|
|
status = refill_output_buffer(stream);
|
|
}
|
|
|
|
if (!B_OK(status)) {
|
|
break;
|
|
}
|
|
|
|
const void *data;
|
|
size_t available;
|
|
status = b_ringbuffer_open_read_buffer(
|
|
&stream->s_out, &data, &available);
|
|
if (!B_OK(status)) {
|
|
break;
|
|
}
|
|
|
|
size_t to_copy = remaining;
|
|
if (to_copy > available) {
|
|
to_copy = available;
|
|
}
|
|
|
|
memcpy(dest, data, to_copy);
|
|
|
|
b_ringbuffer_close_read_buffer(&stream->s_out, &data, to_copy);
|
|
|
|
stream->s_tx_uncompressed_bytes += to_copy;
|
|
dest += to_copy;
|
|
nr_read += to_copy;
|
|
remaining -= to_copy;
|
|
}
|
|
|
|
if (status == B_ERR_NO_DATA) {
|
|
status = B_SUCCESS;
|
|
}
|
|
|
|
*out_nr_read = nr_read;
|
|
return status;
|
|
}
|
|
|
|
static enum b_status write_cursor(
|
|
struct b_cstream *stream, const void *buf, size_t count, size_t *nr_written)
|
|
{
|
|
return b_stream_write_bytes(stream->s_endpoint, buf, count, nr_written);
|
|
}
|
|
|
|
static enum b_status write_uncompressed(
|
|
struct b_cstream *stream, const void *buf, size_t count, size_t *nr_written)
|
|
{
|
|
size_t w = 0;
|
|
enum b_status status
|
|
= b_stream_write_bytes(stream->s_endpoint, buf, count, &w);
|
|
|
|
stream->s_tx_uncompressed_bytes += w;
|
|
|
|
*nr_written = w;
|
|
return status;
|
|
}
|
|
|
|
/* push uncompressed data out of the input buffer, through the compressor,
|
|
* and store the resulting compressed data in the output buffer */
|
|
static enum b_status flush_input_buffer(struct b_cstream *stream)
|
|
{
|
|
if (!b_ringbuffer_available_data_remaining(&stream->s_in)) {
|
|
return B_ERR_NO_DATA;
|
|
}
|
|
|
|
return b_compressor_step(stream->s_compressor);
|
|
}
|
|
|
|
/* push compressed data from the output buffer into the endpoint */
|
|
static enum b_status flush_output_buffer(struct b_cstream *stream)
|
|
{
|
|
enum b_status status = B_SUCCESS;
|
|
size_t nr_written = 0;
|
|
|
|
while (1) {
|
|
const void *data;
|
|
size_t capacity;
|
|
status = b_ringbuffer_open_read_buffer(
|
|
&stream->s_out, &data, &capacity);
|
|
if (!B_OK(status)) {
|
|
break;
|
|
}
|
|
|
|
size_t w = 0;
|
|
status = b_stream_write_bytes(
|
|
stream->s_endpoint, data, capacity, &w);
|
|
|
|
b_ringbuffer_close_read_buffer(&stream->s_out, &data, w);
|
|
nr_written += w;
|
|
stream->s_tx_compressed_bytes += w;
|
|
|
|
if (w < capacity) {
|
|
break;
|
|
}
|
|
|
|
if (!B_OK(status)) {
|
|
break;
|
|
}
|
|
}
|
|
|
|
if (status == B_ERR_NO_DATA && nr_written > 0) {
|
|
status = B_SUCCESS;
|
|
}
|
|
|
|
return status;
|
|
}
|
|
|
|
enum b_status b_cstream_write(
|
|
struct b_cstream *stream, const void *buf, size_t count,
|
|
size_t *out_nr_written)
|
|
{
|
|
if (stream->s_mode != B_COMPRESSION_MODE_COMPRESS) {
|
|
return B_ERR_BAD_STATE;
|
|
}
|
|
|
|
if (stream->s_flags & CSTREAM_CURSOR_MOVED) {
|
|
return write_cursor(stream, buf, count, out_nr_written);
|
|
}
|
|
|
|
if (stream->s_compression_depth == 0) {
|
|
return write_uncompressed(stream, buf, count, out_nr_written);
|
|
}
|
|
|
|
const unsigned char *src = buf;
|
|
size_t nr_written = 0;
|
|
size_t remaining = count;
|
|
enum b_status status = B_SUCCESS;
|
|
|
|
while (remaining > 0) {
|
|
if (!b_ringbuffer_write_capacity_remaining(&stream->s_out)) {
|
|
status = flush_output_buffer(stream);
|
|
}
|
|
|
|
if (!b_ringbuffer_write_capacity_remaining(&stream->s_in)) {
|
|
status = flush_input_buffer(stream);
|
|
}
|
|
|
|
if (!B_OK(status)) {
|
|
break;
|
|
}
|
|
|
|
void *data;
|
|
size_t available;
|
|
status = b_ringbuffer_open_write_buffer(
|
|
&stream->s_in, &data, &available);
|
|
if (!B_OK(status)) {
|
|
break;
|
|
}
|
|
|
|
size_t to_copy = remaining;
|
|
if (to_copy > available) {
|
|
to_copy = available;
|
|
}
|
|
|
|
memcpy(data, src, to_copy);
|
|
|
|
b_ringbuffer_close_write_buffer(&stream->s_in, &data, to_copy);
|
|
|
|
stream->s_tx_uncompressed_bytes += to_copy;
|
|
src += to_copy;
|
|
nr_written += to_copy;
|
|
remaining -= to_copy;
|
|
}
|
|
|
|
if (status == B_ERR_NO_DATA) {
|
|
status = B_SUCCESS;
|
|
}
|
|
|
|
*out_nr_written = nr_written;
|
|
return status;
|
|
}
|
|
|
|
enum b_status b_cstream_begin_compressed_section(
|
|
struct b_cstream *stream, size_t *tx_uncompressed_bytes)
|
|
{
|
|
if (stream->s_flags & CSTREAM_CURSOR_MOVED) {
|
|
return B_ERR_BAD_STATE;
|
|
}
|
|
|
|
if (tx_uncompressed_bytes) {
|
|
*tx_uncompressed_bytes = stream->s_tx_uncompressed_bytes;
|
|
}
|
|
|
|
if (stream->s_compression_depth > 0) {
|
|
stream->s_compression_depth++;
|
|
return B_SUCCESS;
|
|
}
|
|
|
|
stream->s_compression_depth = 1;
|
|
stream->s_tx_uncompressed_bytes = 0;
|
|
stream->s_tx_compressed_bytes = 0;
|
|
b_compressor_reset(stream->s_compressor);
|
|
|
|
return B_SUCCESS;
|
|
}
|
|
|
|
enum b_status b_cstream_end_compressed_section(
|
|
struct b_cstream *stream, size_t *tx_compressed_bytes,
|
|
size_t *tx_uncompressed_bytes)
|
|
{
|
|
if (stream->s_flags & CSTREAM_CURSOR_MOVED) {
|
|
return B_ERR_BAD_STATE;
|
|
}
|
|
|
|
tx_compressed_bytes
|
|
&& (*tx_compressed_bytes = stream->s_tx_compressed_bytes);
|
|
|
|
tx_uncompressed_bytes
|
|
&& (*tx_uncompressed_bytes = stream->s_tx_uncompressed_bytes);
|
|
|
|
if (stream->s_compression_depth > 1) {
|
|
stream->s_compression_depth--;
|
|
return B_SUCCESS;
|
|
}
|
|
|
|
stream->s_compression_depth = 0;
|
|
|
|
if (stream->s_mode == B_COMPRESSION_MODE_DECOMPRESS) {
|
|
stream->s_tx_compressed_bytes = 0;
|
|
stream->s_tx_uncompressed_bytes = 0;
|
|
return B_SUCCESS;
|
|
}
|
|
|
|
enum b_status status = B_SUCCESS;
|
|
while (1) {
|
|
status = b_compressor_end(stream->s_compressor);
|
|
if (!B_OK(status) && status != B_ERR_NO_SPACE) {
|
|
break;
|
|
}
|
|
|
|
status = flush_output_buffer(stream);
|
|
|
|
if (!B_OK(status)) {
|
|
break;
|
|
}
|
|
|
|
if (b_compressor_eof(stream->s_compressor)) {
|
|
status = B_SUCCESS;
|
|
break;
|
|
}
|
|
}
|
|
|
|
/* refresh these output variables to account for any data written by
|
|
* b_compressor_end */
|
|
tx_compressed_bytes
|
|
&& (*tx_compressed_bytes = stream->s_tx_compressed_bytes);
|
|
|
|
tx_uncompressed_bytes
|
|
&& (*tx_uncompressed_bytes = stream->s_tx_uncompressed_bytes);
|
|
|
|
if (!B_OK(status)) {
|
|
return status;
|
|
}
|
|
|
|
stream->s_tx_compressed_bytes = 0;
|
|
stream->s_tx_uncompressed_bytes = 0;
|
|
|
|
return flush_output_buffer(stream);
|
|
}
|
|
|
|
enum b_status b_cstream_set_cursor_position(struct b_cstream *stream, size_t pos)
|
|
{
|
|
if (stream->s_compression_depth > 0) {
|
|
return B_ERR_BAD_STATE;
|
|
}
|
|
|
|
if (stream->s_flags & CSTREAM_CURSOR_MOVED) {
|
|
return B_ERR_BAD_STATE;
|
|
}
|
|
|
|
stream->s_cursor = b_stream_cursor(stream->s_endpoint);
|
|
|
|
enum b_status status
|
|
= b_stream_seek(stream->s_endpoint, pos, B_STREAM_SEEK_START);
|
|
if (!B_OK(status)) {
|
|
stream->s_cursor = 0;
|
|
return status;
|
|
}
|
|
|
|
stream->s_flags |= CSTREAM_CURSOR_MOVED;
|
|
|
|
return B_SUCCESS;
|
|
}
|
|
|
|
enum b_status b_cstream_restore_cursor_position(struct b_cstream *stream)
|
|
{
|
|
if (!(stream->s_flags & CSTREAM_CURSOR_MOVED)) {
|
|
return B_ERR_BAD_STATE;
|
|
}
|
|
|
|
enum b_status status = b_stream_seek(
|
|
stream->s_endpoint, stream->s_cursor, B_STREAM_SEEK_START);
|
|
stream->s_cursor = 0;
|
|
|
|
if (!B_OK(status)) {
|
|
return status;
|
|
}
|
|
|
|
stream->s_flags &= ~CSTREAM_CURSOR_MOVED;
|
|
|
|
return B_SUCCESS;
|
|
}
|