pipeline: handle incompressible clusters

This commit is contained in:
2025-06-23 13:21:36 +01:00
parent a98e51a869
commit 34ab04fe4e
7 changed files with 183 additions and 26 deletions

View File

@@ -1,8 +1,36 @@
#include "pipeline.h"
#include <stdlib.h>
#include <zstd.h>
static enum ec3_status compress(
#define CHECK(cond, ...) \
do { \
if (!(cond)) { \
fprintf(stderr, \
"%s:%d CHECK(%s) failed: ", \
__FILE__, \
__LINE__, \
#cond); \
fprintf(stderr, "" __VA_ARGS__); \
fprintf(stderr, "\n"); \
exit(1); \
} \
} while (0)
/*! CHECK_ZSTD
* Check the zstd error code and die if an error occurred after printing a
* message.
*/
#define CHECK_ZSTD(fn) \
do { \
size_t const err = (fn); \
CHECK(!ZSTD_isError(err), \
"%s (%d)", \
ZSTD_getErrorName(err), \
ZSTD_getErrorCode(err)); \
} while (0)
static struct ec3_pipeline_stage_result compress(
struct ec3_pipeline_stage *stage,
const void *src,
size_t len,
@@ -10,8 +38,20 @@ static enum ec3_status compress(
size_t dest_max,
size_t *nr_written)
{
*nr_written = ZSTD_compress(dest, dest_max, src, len, 10);
return EC3_SUCCESS;
size_t w = ZSTD_compress(dest, dest_max, src, len, 10);
if (!ZSTD_isError(w)) {
*nr_written = w;
return EC3_PIPELINE_STAGE_RESULT_OK();
}
int err = ZSTD_getErrorCode(w);
switch (err) {
case ZSTD_error_dstSize_tooSmall:
return EC3_PIPELINE_STAGE_RESULT_UNSUPPORTED();
default:
CHECK_ZSTD(w);
return EC3_PIPELINE_STAGE_RESULT_ERR(EC3_ERR_INTERNAL_FAILURE);
}
}
static enum ec3_status decompress(
@@ -22,12 +62,19 @@ static enum ec3_status decompress(
size_t dest_max,
size_t *nr_read)
{
*nr_read = ZSTD_decompress(dest, dest_max, src, len);
return EC3_SUCCESS;
size_t r = ZSTD_decompress(dest, dest_max, src, len);
if (!ZSTD_isError(r)) {
*nr_read = r;
return EC3_SUCCESS;
}
CHECK_ZSTD(r);
return EC3_ERR_INTERNAL_FAILURE;
}
const struct ec3_pipeline_stage_type pipeline_zstd = {
.t_id = EC3_PIPELINE_ZSTD,
.t_class = EC3_PIPELINE_CLASS_COMPRESSION,
.t_flags = EC3_PIPELINE_F_BUFFERED,
.t_cluster_in = decompress,
.t_cluster_out = compress,