#include #include #include /*** PRIVATE DATA *************************************************************/ struct fx_zstd_compressor_p { union { ZSTD_CCtx *zstd_c; ZSTD_DCtx *zstd_d; }; }; /*** PUBLIC FUNCTIONS *********************************************************/ fx_status fx_zstd_compressor_get_buffer_size( fx_compressor_mode mode, size_t *inbuf_size, size_t *outbuf_size) { switch (mode) { case FX_COMPRESSOR_MODE_COMPRESS: *inbuf_size = ZSTD_CStreamInSize(); *outbuf_size = ZSTD_CStreamOutSize(); break; case FX_COMPRESSOR_MODE_DECOMPRESS: *inbuf_size = ZSTD_DStreamInSize(); *outbuf_size = ZSTD_DStreamOutSize(); break; default: return FX_ERR_INVALID_ARGUMENT; } return FX_SUCCESS; } /*** VIRTUAL FUNCTIONS ********************************************************/ static void zstd_compressor_init(fx_object *obj, void *priv) { } static void zstd_compressor_fini(fx_object *obj, void *priv) { fx_compressor_data *c = fx_object_get_protected(obj, FX_TYPE_COMPRESSOR); struct fx_zstd_compressor_p *ctx = priv; switch (c->c_mode) { case FX_COMPRESSOR_MODE_COMPRESS: ZSTD_freeCCtx(ctx->zstd_c); break; case FX_COMPRESSOR_MODE_DECOMPRESS: ZSTD_freeDCtx(ctx->zstd_d); break; default: break; } } static enum fx_status reset(fx_compressor *compressor) { struct fx_zstd_compressor_p *ctx = fx_object_get_private(compressor, FX_TYPE_ZSTD_COMPRESSOR); fx_compressor_data *data = fx_object_get_protected(compressor, FX_TYPE_COMPRESSOR); if (!ctx || !data) { return FX_ERR_INVALID_ARGUMENT; } switch (data->c_mode) { case FX_COMPRESSOR_MODE_COMPRESS: ZSTD_CCtx_reset(ctx->zstd_c, ZSTD_reset_session_only); break; case FX_COMPRESSOR_MODE_DECOMPRESS: ZSTD_DCtx_reset(ctx->zstd_d, ZSTD_reset_session_only); break; default: return FX_ERR_BAD_STATE; } return FX_SUCCESS; } static enum fx_status compress(fx_compressor *compressor) { enum fx_status status = FX_SUCCESS; struct fx_zstd_compressor_p *ctx = fx_object_get_private(compressor, FX_TYPE_ZSTD_COMPRESSOR); fx_compressor_data *data = fx_object_get_protected(compressor, FX_TYPE_COMPRESSOR); if (!ctx || !data) { return FX_ERR_INVALID_ARGUMENT; } fx_ringbuffer *in = data->c_in; fx_ringbuffer *out = data->c_out; if (fx_ringbuffer_available_data_remaining(in) == 0) { return FX_ERR_NO_DATA; } if (fx_ringbuffer_write_capacity_remaining(out) == 0) { return FX_ERR_NO_SPACE; } size_t nr_consumed = 0; while (1) { size_t in_available = 0, out_capacity = 0; const void *in_buf = NULL; void *out_buf = NULL; status = fx_ringbuffer_open_read_buffer(in, &in_buf, &in_available); if (!FX_OK(status)) { break; } status = fx_ringbuffer_open_write_buffer( out, &out_buf, &out_capacity); if (!FX_OK(status)) { fx_ringbuffer_close_read_buffer(in, &in_buf, 0); break; } ZSTD_inBuffer z_in = { .src = in_buf, .pos = 0, .size = in_available, }; ZSTD_outBuffer z_out = { .dst = out_buf, .pos = 0, .size = out_capacity, }; do { size_t ret = ZSTD_compressStream2( ctx->zstd_c, &z_out, &z_in, ZSTD_e_continue); if (ZSTD_isError(ret)) { status = FX_ERR_COMPRESSION_FAILURE; break; } } while (z_in.pos < z_in.size && z_out.pos < z_out.size); nr_consumed += z_in.pos; fx_ringbuffer_close_read_buffer(in, &in_buf, z_in.pos); fx_ringbuffer_close_write_buffer(out, &out_buf, z_out.pos); } if ((status == FX_ERR_NO_SPACE || status == FX_ERR_NO_DATA) && nr_consumed > 0) { status = FX_SUCCESS; } return status; } static enum fx_status compress_end(fx_compressor *compressor) { enum fx_status status = FX_SUCCESS; struct fx_zstd_compressor_p *ctx = fx_object_get_private(compressor, FX_TYPE_ZSTD_COMPRESSOR); fx_compressor_data *data = fx_object_get_protected(compressor, FX_TYPE_COMPRESSOR); if (!ctx || !data) { return FX_ERR_INVALID_ARGUMENT; } fx_ringbuffer *out = data->c_out; if (fx_ringbuffer_write_capacity_remaining(out) == 0) { return FX_ERR_NO_SPACE; } bool finished = false; do { void *out_buf = NULL; size_t out_capacity = 0; status = fx_ringbuffer_open_write_buffer( out, &out_buf, &out_capacity); if (!FX_OK(status)) { break; } ZSTD_inBuffer z_in = {0}; ZSTD_outBuffer z_out = { .dst = out_buf, .pos = 0, .size = out_capacity, }; do { size_t ret = ZSTD_compressStream2( ctx->zstd_c, &z_out, &z_in, ZSTD_e_end); if (ZSTD_isError(ret)) { status = FX_ERR_COMPRESSION_FAILURE; finished = true; } if (ret == 0) { data->c_flags |= FX_COMPRESSOR_EOF; finished = true; } } while (!finished && z_out.pos < z_out.size); fx_ringbuffer_close_write_buffer(out, &out_buf, z_out.pos); } while (!finished); return status; } static enum fx_status decompress(fx_compressor *compressor) { enum fx_status status = FX_SUCCESS; struct fx_zstd_compressor_p *ctx = fx_object_get_private(compressor, FX_TYPE_ZSTD_COMPRESSOR); fx_compressor_data *data = fx_object_get_protected(compressor, FX_TYPE_COMPRESSOR); if (!ctx || !data) { return FX_ERR_INVALID_ARGUMENT; } fx_ringbuffer *in = data->c_in; fx_ringbuffer *out = data->c_out; if (fx_ringbuffer_available_data_remaining(in) == 0) { return FX_ERR_NO_DATA; } if (fx_ringbuffer_write_capacity_remaining(out) == 0) { return FX_ERR_NO_SPACE; } size_t nr_consumed = 0; while (!(data->c_flags & FX_COMPRESSOR_EOF)) { size_t in_available = 0, out_capacity = 0; const void *in_buf = NULL; void *out_buf = NULL; status = fx_ringbuffer_open_read_buffer(in, &in_buf, &in_available); if (!FX_OK(status)) { break; } status = fx_ringbuffer_open_write_buffer( out, &out_buf, &out_capacity); if (!FX_OK(status)) { fx_ringbuffer_close_read_buffer(in, &in_buf, 0); break; } ZSTD_inBuffer z_in = { .src = in_buf, .pos = 0, .size = in_available, }; ZSTD_outBuffer z_out = { .dst = out_buf, .pos = 0, .size = out_capacity, }; do { size_t ret = ZSTD_decompressStream( ctx->zstd_d, &z_out, &z_in); if (ZSTD_isError(ret)) { status = FX_ERR_COMPRESSION_FAILURE; break; } if (ret == 0) { data->c_flags |= FX_COMPRESSOR_EOF; break; } } while (z_in.pos < z_in.size && z_out.pos < z_out.size); nr_consumed += z_in.pos; fx_ringbuffer_close_read_buffer(in, &in_buf, z_in.pos); fx_ringbuffer_close_write_buffer(out, &out_buf, z_out.pos); } if ((status == FX_ERR_NO_SPACE || status == FX_ERR_NO_DATA) && nr_consumed > 0) { status = FX_SUCCESS; } return status; } static enum fx_status set_mode(fx_compressor *compressor, fx_compressor_mode mode) { struct fx_zstd_compressor_p *ctx = fx_object_get_private(compressor, FX_TYPE_ZSTD_COMPRESSOR); fx_compressor_data *data = fx_object_get_protected(compressor, FX_TYPE_COMPRESSOR); if (!ctx || !data) { return FX_ERR_INVALID_ARGUMENT; } if (mode == data->c_mode) { return FX_SUCCESS; } switch (data->c_mode) { case FX_COMPRESSOR_MODE_COMPRESS: ZSTD_freeCCtx(ctx->zstd_c); break; case FX_COMPRESSOR_MODE_DECOMPRESS: ZSTD_freeDCtx(ctx->zstd_d); break; default: break; } data->c_mode = mode; switch (data->c_mode) { case FX_COMPRESSOR_MODE_COMPRESS: ctx->zstd_c = ZSTD_createCCtx(); break; case FX_COMPRESSOR_MODE_DECOMPRESS: ctx->zstd_d = ZSTD_createDCtx(); break; default: return FX_ERR_INVALID_ARGUMENT; } return FX_SUCCESS; } /*** CLASS DEFINITION *********************************************************/ FX_TYPE_CLASS_DEFINITION_BEGIN(fx_zstd_compressor) FX_TYPE_CLASS_INTERFACE_BEGIN(fx_object, FX_TYPE_OBJECT) FX_INTERFACE_ENTRY(to_string) = NULL; FX_TYPE_CLASS_INTERFACE_END(fx_object, FX_TYPE_OBJECT) FX_TYPE_CLASS_INTERFACE_BEGIN(fx_compressor, FX_TYPE_COMPRESSOR) FX_INTERFACE_ENTRY(c_buffer_size) = fx_zstd_compressor_get_buffer_size; FX_INTERFACE_ENTRY(c_compress) = compress; FX_INTERFACE_ENTRY(c_compress_end) = compress_end; FX_INTERFACE_ENTRY(c_decompress) = decompress; FX_INTERFACE_ENTRY(c_reset) = reset; FX_INTERFACE_ENTRY(c_set_mode) = set_mode; FX_TYPE_CLASS_INTERFACE_END(fx_compressor, FX_TYPE_COMPRESSOR) FX_TYPE_CLASS_DEFINITION_END(fx_zstd_compressor) FX_TYPE_DEFINITION_BEGIN(fx_zstd_compressor) FX_TYPE_ID(0x51d437fc, 0xe789, 0x4105, 0xbac7, 0xe6b3f45df198); FX_TYPE_EXTENDS(FX_TYPE_COMPRESSOR); FX_TYPE_CLASS(fx_zstd_compressor_class); FX_TYPE_INSTANCE_PRIVATE(struct fx_zstd_compressor_p); FX_TYPE_INSTANCE_INIT(zstd_compressor_init); FX_TYPE_INSTANCE_FINI(zstd_compressor_fini); FX_TYPE_DEFINITION_END(fx_zstd_compressor)