/*
 * zstd backend for the compression-function interface: implements the
 * callbacks referenced by z__b_compression_function_zstd at the end of this
 * file on top of the libzstd streaming API.
 */
#include "../compressor.h"
#include "../function.h"

/*
 * NOTE(review): two angle-bracket #include targets were unreadable in the
 * reviewed copy of this file. <zstd.h> is required by the ZSTD_* calls below;
 * the second target could not be recovered. Restore it from version control
 * if the project headers above do not already declare everything used here.
 */
#include <stdbool.h>
#include <stddef.h>
#include <zstd.h>

/*
 * Per-compressor state. A compressor runs in exactly one mode, so the
 * compression and decompression contexts share storage; c_mode of the owning
 * compressor selects which member is valid.
 */
struct zstd_ctx {
	union {
		ZSTD_CCtx *zstd_c;
		ZSTD_DCtx *zstd_d;
	};
};

/*
 * Report the input/output buffer sizes zstd recommends for streaming in the
 * given mode.
 *
 * Returns B_SUCCESS after writing both sizes, or B_ERR_INVALID_ARGUMENT (with
 * the outputs untouched) if `mode` is neither compress nor decompress.
 */
static enum b_status buffer_size(
	enum b_compression_mode mode, size_t *inbuf_size, size_t *outbuf_size)
{
	switch (mode) {
	case B_COMPRESSION_MODE_COMPRESS:
		*inbuf_size = ZSTD_CStreamInSize();
		*outbuf_size = ZSTD_CStreamOutSize();
		break;
	case B_COMPRESSION_MODE_DECOMPRESS:
		*inbuf_size = ZSTD_DStreamInSize();
		*outbuf_size = ZSTD_DStreamOutSize();
		break;
	default:
		return B_ERR_INVALID_ARGUMENT;
	}

	return B_SUCCESS;
}

/*
 * Create the zstd context that matches the compressor's mode.
 *
 * Returns B_SUCCESS, B_ERR_NO_MEMORY if zstd could not create the context, or
 * B_ERR_BAD_STATE if the compressor's mode is not recognised.
 */
static enum b_status init(struct b_compressor *compressor)
{
	struct zstd_ctx *ctx = b_compressor_get_function_ctx(compressor);

	switch (compressor->c_mode) {
	case B_COMPRESSION_MODE_COMPRESS:
		ctx->zstd_c = ZSTD_createCCtx();
		if (!ctx->zstd_c) {
			return B_ERR_NO_MEMORY;
		}
		break;
	case B_COMPRESSION_MODE_DECOMPRESS:
		ctx->zstd_d = ZSTD_createDCtx();
		if (!ctx->zstd_d) {
			return B_ERR_NO_MEMORY;
		}
		break;
	default:
		return B_ERR_BAD_STATE;
	}

	return B_SUCCESS;
}

/*
 * Free the zstd context created by init().
 *
 * The stored pointer is cleared afterwards so that a repeated call frees NULL
 * (which zstd accepts) instead of a dangling pointer.
 *
 * Returns B_SUCCESS, or B_ERR_BAD_STATE if the compressor's mode is not
 * recognised.
 */
static enum b_status fini(struct b_compressor *compressor)
{
	struct zstd_ctx *ctx = b_compressor_get_function_ctx(compressor);

	switch (compressor->c_mode) {
	case B_COMPRESSION_MODE_COMPRESS:
		ZSTD_freeCCtx(ctx->zstd_c);
		ctx->zstd_c = NULL;
		break;
	case B_COMPRESSION_MODE_DECOMPRESS:
		ZSTD_freeDCtx(ctx->zstd_d);
		ctx->zstd_d = NULL;
		break;
	default:
		return B_ERR_BAD_STATE;
	}

	return B_SUCCESS;
}

/*
 * Reset the zstd session (not its parameters) so the context can start a new
 * stream.
 *
 * Returns B_SUCCESS, or B_ERR_BAD_STATE if the mode is not recognised or zstd
 * reports an error from the reset call.
 */
static enum b_status reset(struct b_compressor *compressor)
{
	struct zstd_ctx *ctx = b_compressor_get_function_ctx(compressor);
	size_t ret = 0;

	switch (compressor->c_mode) {
	case B_COMPRESSION_MODE_COMPRESS:
		ret = ZSTD_CCtx_reset(ctx->zstd_c, ZSTD_reset_session_only);
		break;
	case B_COMPRESSION_MODE_DECOMPRESS:
		ret = ZSTD_DCtx_reset(ctx->zstd_d, ZSTD_reset_session_only);
		break;
	default:
		return B_ERR_BAD_STATE;
	}

	if (ZSTD_isError(ret)) {
		return B_ERR_BAD_STATE;
	}

	return B_SUCCESS;
}

/*
 * Compress data from the compressor's input ring buffer (c_in) into its
 * output ring buffer (c_out).
 *
 * Repeatedly opens a read window on c_in and a write window on c_out and
 * feeds them to ZSTD_compressStream2(ZSTD_e_continue) until one of the ring
 * buffers can no longer be opened or zstd reports an error.
 *
 * Returns:
 *   B_ERR_NO_DATA             c_in holds no data on entry.
 *   B_ERR_NO_SPACE            c_out has no write capacity on entry.
 *   B_ERR_COMPRESSION_FAILURE zstd reported an error.
 *   B_SUCCESS                 a ring buffer open failed with B_ERR_NO_SPACE
 *                             or B_ERR_NO_DATA after some input was consumed.
 *   otherwise                 the status of the failed ring buffer open.
 */
static enum b_status compress(struct b_compressor *compressor)
{
	enum b_status status = B_SUCCESS;
	struct zstd_ctx *ctx = b_compressor_get_function_ctx(compressor);
	struct b_ringbuffer *in = compressor->c_in;
	struct b_ringbuffer *out = compressor->c_out;

	if (b_ringbuffer_available_data_remaining(in) == 0) {
		return B_ERR_NO_DATA;
	}

	if (b_ringbuffer_write_capacity_remaining(out) == 0) {
		return B_ERR_NO_SPACE;
	}

	size_t nr_consumed = 0;

	while (1) {
		size_t in_available = 0, out_capacity = 0;
		const void *in_buf = NULL;
		void *out_buf = NULL;

		status = b_ringbuffer_open_read_buffer(
			in, &in_buf, &in_available);
		if (!B_OK(status)) {
			break;
		}

		status = b_ringbuffer_open_write_buffer(
			out, &out_buf, &out_capacity);
		if (!B_OK(status)) {
			/* Nothing was read; release the window untouched. */
			b_ringbuffer_close_read_buffer(in, &in_buf, 0);
			break;
		}

		ZSTD_inBuffer z_in = {
			.src = in_buf,
			.pos = 0,
			.size = in_available,
		};

		ZSTD_outBuffer z_out = {
			.dst = out_buf,
			.pos = 0,
			.size = out_capacity,
		};

		do {
			size_t ret = ZSTD_compressStream2(
				ctx->zstd_c, &z_out, &z_in, ZSTD_e_continue);
			if (ZSTD_isError(ret)) {
				status = B_ERR_COMPRESSION_FAILURE;
				break;
			}
		} while (z_in.pos < z_in.size && z_out.pos < z_out.size);

		/* Commit whatever zstd consumed/produced, even on failure. */
		nr_consumed += z_in.pos;
		b_ringbuffer_close_read_buffer(in, &in_buf, z_in.pos);
		b_ringbuffer_close_write_buffer(out, &out_buf, z_out.pos);

		/*
		 * Both opens succeeded above, so a non-OK status here can only
		 * be the zstd failure. Stop now: looping again would overwrite
		 * the error with the result of the next buffer open.
		 */
		if (!B_OK(status)) {
			break;
		}
	}

	/* Running out of input or output after making progress is success. */
	if ((status == B_ERR_NO_SPACE || status == B_ERR_NO_DATA)
	    && nr_consumed > 0) {
		status = B_SUCCESS;
	}

	return status;
}

/*
 * Finish the compressed stream by driving ZSTD_compressStream2(ZSTD_e_end)
 * with no further input, writing the remaining output into c_out.
 *
 * Sets COMPRESSOR_EOF in c_flags once zstd reports that everything has been
 * flushed (return value 0).
 *
 * Returns:
 *   B_ERR_NO_SPACE            c_out has no write capacity on entry.
 *   B_ERR_COMPRESSION_FAILURE zstd reported an error.
 *   otherwise                 the status of the last ring buffer open; a
 *                             failed open ends the call before the stream is
 *                             finished (COMPRESSOR_EOF is then not set).
 */
static enum b_status compress_end(struct b_compressor *compressor)
{
	enum b_status status = B_SUCCESS;
	struct zstd_ctx *ctx = b_compressor_get_function_ctx(compressor);
	struct b_ringbuffer *out = compressor->c_out;

	if (b_ringbuffer_write_capacity_remaining(out) == 0) {
		return B_ERR_NO_SPACE;
	}

	bool finished = false;

	do {
		void *out_buf = NULL;
		size_t out_capacity = 0;

		status = b_ringbuffer_open_write_buffer(
			out, &out_buf, &out_capacity);
		if (!B_OK(status)) {
			break;
		}

		/* No new input: only flush what zstd still holds. */
		ZSTD_inBuffer z_in = {0};
		ZSTD_outBuffer z_out = {
			.dst = out_buf,
			.pos = 0,
			.size = out_capacity,
		};

		do {
			size_t ret = ZSTD_compressStream2(
				ctx->zstd_c, &z_out, &z_in, ZSTD_e_end);
			if (ZSTD_isError(ret)) {
				status = B_ERR_COMPRESSION_FAILURE;
				finished = true;
			} else if (ret == 0) {
				/* Frame fully written. */
				compressor->c_flags |= COMPRESSOR_EOF;
				finished = true;
			}
		} while (!finished && z_out.pos < z_out.size);

		b_ringbuffer_close_write_buffer(out, &out_buf, z_out.pos);
	} while (!finished);

	return status;
}

/*
 * Decompress data from the compressor's input ring buffer (c_in) into its
 * output ring buffer (c_out).
 *
 * Repeatedly opens a read window on c_in and a write window on c_out and
 * feeds them to ZSTD_decompressStream() until COMPRESSOR_EOF is set, one of
 * the ring buffers can no longer be opened, or zstd reports an error.
 * COMPRESSOR_EOF is set in c_flags when ZSTD_decompressStream() returns 0.
 *
 * Returns:
 *   B_ERR_NO_DATA             c_in holds no data on entry.
 *   B_ERR_NO_SPACE            c_out has no write capacity on entry.
 *   B_ERR_COMPRESSION_FAILURE zstd reported an error.
 *   B_SUCCESS                 a ring buffer open failed with B_ERR_NO_SPACE
 *                             or B_ERR_NO_DATA after some input was consumed,
 *                             or COMPRESSOR_EOF was already set on entry.
 *   otherwise                 the status of the last ring buffer open.
 */
static enum b_status decompress(struct b_compressor *compressor)
{
	enum b_status status = B_SUCCESS;
	struct zstd_ctx *ctx = b_compressor_get_function_ctx(compressor);
	struct b_ringbuffer *in = compressor->c_in;
	struct b_ringbuffer *out = compressor->c_out;

	if (b_ringbuffer_available_data_remaining(in) == 0) {
		return B_ERR_NO_DATA;
	}

	if (b_ringbuffer_write_capacity_remaining(out) == 0) {
		return B_ERR_NO_SPACE;
	}

	size_t nr_consumed = 0;

	while (!(compressor->c_flags & COMPRESSOR_EOF)) {
		size_t in_available = 0, out_capacity = 0;
		const void *in_buf = NULL;
		void *out_buf = NULL;

		status = b_ringbuffer_open_read_buffer(
			in, &in_buf, &in_available);
		if (!B_OK(status)) {
			break;
		}

		status = b_ringbuffer_open_write_buffer(
			out, &out_buf, &out_capacity);
		if (!B_OK(status)) {
			/* Nothing was read; release the window untouched. */
			b_ringbuffer_close_read_buffer(in, &in_buf, 0);
			break;
		}

		ZSTD_inBuffer z_in = {
			.src = in_buf,
			.pos = 0,
			.size = in_available,
		};

		ZSTD_outBuffer z_out = {
			.dst = out_buf,
			.pos = 0,
			.size = out_capacity,
		};

		do {
			size_t ret = ZSTD_decompressStream(
				ctx->zstd_d, &z_out, &z_in);
			if (ZSTD_isError(ret)) {
				status = B_ERR_COMPRESSION_FAILURE;
				break;
			}

			if (ret == 0) {
				compressor->c_flags |= COMPRESSOR_EOF;
				break;
			}
		} while (z_in.pos < z_in.size && z_out.pos < z_out.size);

		/* Commit whatever zstd consumed/produced, even on failure. */
		nr_consumed += z_in.pos;
		b_ringbuffer_close_read_buffer(in, &in_buf, z_in.pos);
		b_ringbuffer_close_write_buffer(out, &out_buf, z_out.pos);

		/*
		 * Both opens succeeded above, so a non-OK status here can only
		 * be the zstd failure. Stop now: looping again would overwrite
		 * the error with the result of the next buffer open and could
		 * retry the same bad input indefinitely.
		 */
		if (!B_OK(status)) {
			break;
		}
	}

	/* Running out of input or output after making progress is success. */
	if ((status == B_ERR_NO_SPACE || status == B_ERR_NO_DATA)
	    && nr_consumed > 0) {
		status = B_SUCCESS;
	}

	return status;
}

/* Callback table exposing the zstd implementation under the name "zstd". */
const struct b_compression_function z__b_compression_function_zstd = {
	.f_name = "zstd",
	.f_buffer_size = buffer_size,
	.f_init = init,
	.f_fini = fini,
	.f_reset = reset,
	.f_compress = compress,
	.f_compress_end = compress_end,
	.f_decompress = decompress,
};