add cluster i/o pipeline
This commit is contained in:
156
src/pipeline.c
Normal file
156
src/pipeline.c
Normal file
@@ -0,0 +1,156 @@
|
||||
#include "pipeline.h"
|
||||
|
||||
#include <stdlib.h>
|
||||
#include <string.h>
|
||||
|
||||
extern const struct ec3_pipeline_stage_type pipeline_zstd;
|
||||
extern const struct ec3_pipeline_stage_type pipeline_aes256;
|
||||
extern const struct ec3_pipeline_stage_type pipeline_file;
|
||||
|
||||
static const struct ec3_pipeline_stage_type *stage_types[] = {
|
||||
[EC3_PIPELINE_ZSTD] = &pipeline_zstd,
|
||||
[EC3_PIPELINE_AES256] = &pipeline_aes256,
|
||||
[EC3_PIPELINE_FILE] = &pipeline_file,
|
||||
};
|
||||
static const size_t nr_stage_types = sizeof stage_types / sizeof stage_types[0];
|
||||
|
||||
static enum ec3_status create_pipeline_stage(
|
||||
const struct ec3_pipeline_stage_type *type,
|
||||
size_t cluster_size,
|
||||
void *arg,
|
||||
struct ec3_pipeline_stage **out)
|
||||
{
|
||||
struct ec3_pipeline_stage *stage = malloc(sizeof *stage);
|
||||
if (!stage) {
|
||||
return EC3_ERR_NO_MEMORY;
|
||||
}
|
||||
|
||||
memset(stage, 0x0, sizeof *stage);
|
||||
|
||||
stage->s_type = type;
|
||||
stage->s_arg = arg;
|
||||
|
||||
if (type->t_flags & EC3_PIPELINE_F_BUFFERED) {
|
||||
stage->s_buf = malloc(cluster_size);
|
||||
|
||||
if (!stage) {
|
||||
free(stage);
|
||||
return EC3_ERR_NO_MEMORY;
|
||||
}
|
||||
|
||||
memset(stage->s_buf, 0x0, cluster_size);
|
||||
}
|
||||
|
||||
*out = stage;
|
||||
return EC3_SUCCESS;
|
||||
}
|
||||
|
||||
extern enum ec3_status ec3_pipeline_create(
|
||||
struct ec3_pipeline_stage_args stages[],
|
||||
size_t nr_stages,
|
||||
size_t cluster_size,
|
||||
struct ec3_pipeline **out)
|
||||
{
|
||||
enum ec3_status status = EC3_SUCCESS;
|
||||
struct ec3_pipeline *pipeline = malloc(sizeof *pipeline);
|
||||
|
||||
if (!pipeline) {
|
||||
return EC3_ERR_NO_MEMORY;
|
||||
}
|
||||
|
||||
memset(pipeline, 0x0, sizeof *pipeline);
|
||||
|
||||
for (size_t i = 0; i < nr_stages; i++) {
|
||||
struct ec3_pipeline_stage_args *args = &stages[i];
|
||||
|
||||
if (args->type == EC3_PIPELINE_NONE) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (args->type < 0 || args->type >= nr_stage_types) {
|
||||
return EC3_ERR_NOT_SUPPORTED;
|
||||
}
|
||||
|
||||
const struct ec3_pipeline_stage_type *type
|
||||
= stage_types[stages[i].type];
|
||||
|
||||
if (!type) {
|
||||
return EC3_ERR_NOT_SUPPORTED;
|
||||
}
|
||||
|
||||
struct ec3_pipeline_stage *stage = NULL;
|
||||
status = create_pipeline_stage(
|
||||
type,
|
||||
cluster_size,
|
||||
args->arg,
|
||||
&stage);
|
||||
if (status != EC3_SUCCESS) {
|
||||
return status;
|
||||
}
|
||||
|
||||
b_queue_push_back(&pipeline->p_stages, &stage->s_entry);
|
||||
}
|
||||
|
||||
*out = pipeline;
|
||||
return status;
|
||||
}
|
||||
|
||||
void ec3_pipeline_destroy(struct ec3_pipeline *p)
|
||||
{
|
||||
}
|
||||
|
||||
enum ec3_status ec3_pipeline_data_out(
|
||||
struct ec3_pipeline *pipeline,
|
||||
void *p,
|
||||
size_t len,
|
||||
size_t *nr_written)
|
||||
{
|
||||
b_queue_entry *cur = b_queue_first(&pipeline->p_stages);
|
||||
enum ec3_status status = EC3_SUCCESS;
|
||||
void *src = p;
|
||||
|
||||
size_t stage_in_size = len;
|
||||
size_t stage_out_size = 0;
|
||||
|
||||
while (cur) {
|
||||
struct ec3_pipeline_stage *stage
|
||||
= b_unbox(struct ec3_pipeline_stage, cur, s_entry);
|
||||
|
||||
void *dest;
|
||||
if (stage->s_type->t_flags & EC3_PIPELINE_F_BUFFERED) {
|
||||
dest = stage->s_buf;
|
||||
} else {
|
||||
dest = src;
|
||||
}
|
||||
|
||||
status = stage->s_type->t_data_out(
|
||||
stage,
|
||||
src,
|
||||
stage_in_size,
|
||||
dest,
|
||||
&stage_out_size);
|
||||
|
||||
if (status != EC3_SUCCESS) {
|
||||
return status;
|
||||
}
|
||||
|
||||
src = dest;
|
||||
stage_in_size = stage_out_size;
|
||||
cur = b_queue_next(cur);
|
||||
}
|
||||
|
||||
if (nr_written) {
|
||||
*nr_written = stage_out_size;
|
||||
}
|
||||
|
||||
return EC3_SUCCESS;
|
||||
}
|
||||
|
||||
enum ec3_status ec3_pipeline_data_in(
|
||||
struct ec3_pipeline *pipeline,
|
||||
void *p,
|
||||
size_t max,
|
||||
size_t *nr_read)
|
||||
{
|
||||
return EC3_ERR_NOT_SUPPORTED;
|
||||
}
|
||||
Reference in New Issue
Block a user