implement separate buffering of tag data and cluster table

This commit is contained in:
2025-02-16 08:46:22 +00:00
parent 8b10c344d6
commit 2e4ee5c1b6
6 changed files with 244 additions and 31 deletions

View File

@@ -94,6 +94,12 @@ static void node_set_entry(
memmove(&node->g_clusters[index], entry, sizeof(struct ec3_cluster));
}
/* b_tree_ops hook: erase the cluster entry in slot `index` of a group node. */
static void node_kill_entry(b_tree_node *n, unsigned long index)
{
	struct ec3_cluster_group *group = (struct ec3_cluster_group *)n;
	struct ec3_cluster *slot = &group->g_clusters[index];

	memset(slot, 0, sizeof *slot);
}
static unsigned long node_get_child(b_tree_node *n, unsigned long index)
{
struct ec3_cluster_group *node = (struct ec3_cluster_group *)n;
@@ -143,6 +149,7 @@ static const struct b_tree_ops cluster_table_ops = {
.node_set_nr_entries = node_set_nr_entries,
.node_get_entry = node_get_entry,
.node_set_entry = node_set_entry,
.node_kill_entry = node_kill_entry,
.node_get_child = node_get_child,
.node_set_child = node_set_child,
@@ -194,7 +201,7 @@ static void encode_cluster(const struct cluster *in, struct ec3_cluster *out)
out->c_bounds1 = b_i32_htob(bounds_hi);
}
static void decode_cluster(const struct ec3_cluster *in, struct cluster *out)
void cluster_decode(const struct ec3_cluster *in, struct cluster *out)
{
out->c_id = b_i32_btoh(in->c_id);
out->c_flags = b_i16_btoh(in->c_flags);
@@ -221,7 +228,7 @@ int cluster_table_get(
return err;
}
decode_cluster(&entry, out);
cluster_decode(&entry, out);
return 0;
}
@@ -232,3 +239,19 @@ int cluster_table_put(struct cluster_table *table, const struct cluster *in)
return b_tree_put(&table->t_base, (b_tree_node_entry *)&entry);
}
/*
 * Read cluster group `index` from the table's backing storage into `out`.
 *
 * The group is located at a fixed stride from the table's base offset.
 * Returns 0 on success, -1 on seek or read failure.
 */
int cluster_table_get_group(
	struct cluster_table *table,
	unsigned long index,
	struct ec3_cluster_group *out)
{
	size_t offset
		= table->t_offset + (index * sizeof(struct ec3_cluster_group));
	/* Fix: the original ignored fseek's return value; on a failed seek
	 * the subsequent fread would read from an arbitrary position and
	 * could "succeed" with the wrong group.  fseek also takes a long,
	 * so convert explicitly. */
	if (fseek(table->t_storage, (long)offset, SEEK_SET) != 0) {
		return -1;
	}
	size_t r = fread(out,
		sizeof(struct ec3_cluster_group),
		1,
		table->t_storage);
	return r == 1 ? 0 : -1;
}

View File

@@ -5,6 +5,9 @@
#include <stdio.h>
struct ec3_cluster;
struct ec3_cluster_group;
struct cluster_table {
struct b_tree t_base;
size_t t_nr_groups;
@@ -20,6 +23,8 @@ struct cluster {
unsigned int c_flags;
};
extern void cluster_decode(const struct ec3_cluster *in, struct cluster *out);
extern void cluster_table_init(
struct cluster_table *table,
FILE *storage,
@@ -34,4 +39,9 @@ extern int cluster_table_put(
struct cluster_table *table,
const struct cluster *in);
extern int cluster_table_get_group(
struct cluster_table *table,
unsigned long index,
struct ec3_cluster_group *out);
#endif

View File

@@ -271,6 +271,82 @@ enum ec3_status ec3_pipeline_read_cluster(
return EC3_SUCCESS;
}
/*
 * Merge a buffered tag's clusters and data into the destination pipeline.
 *
 * Every cluster in `clusters` is re-based before insertion: its id is
 * shifted past the destination's current id space and its data offset
 * past the data already written to the destination target.  The raw
 * `data` stream is then appended verbatim to the destination target.
 *
 * Returns EC3_SUCCESS, EC3_ERR_NO_MEMORY, or EC3_ERR_IO_FAILURE.
 */
enum ec3_status ec3_pipeline_copy_all(
	struct ec3_pipeline *dest,
	struct cluster_table *clusters,
	FILE *data)
{
	size_t cluster_id_offset = dest->p_next_cluster_id;
	size_t cluster_id_limit = cluster_id_offset;
	size_t data_offset = dest->p_data_offset;
	/* One group's worth of scratch; it doubles as the raw copy buffer
	 * in the second phase below. */
	struct ec3_cluster_group *group = malloc(sizeof *group);
	if (!group) {
		return EC3_ERR_NO_MEMORY;
	}
	for (size_t i = 0; i < clusters->t_nr_groups; i++) {
		int err = cluster_table_get_group(clusters, i, group);
		if (err != 0) {
			free(group);
			return EC3_ERR_IO_FAILURE;
		}
		unsigned int nr_entries = b_i16_btoh(group->g_nr_clusters);
		for (unsigned int ii = 0; ii < nr_entries; ii++) {
			struct ec3_cluster *raw_cluster
				= &group->g_clusters[ii];
			struct cluster cluster;
			cluster_decode(raw_cluster, &cluster);
			cluster.c_id += cluster_id_offset;
			cluster.c_base += data_offset;
			if (cluster.c_id > cluster_id_limit) {
				cluster_id_limit = cluster.c_id;
			}
			/* Fix: the original discarded put failures, which
			 * could silently drop clusters from the merged
			 * table. */
			if (cluster_table_put(&dest->p_cluster_table,
					&cluster) != 0) {
				free(group);
				return EC3_ERR_IO_FAILURE;
			}
		}
	}
	/* Fix: the original ignored fseek failures and assigned ftell's
	 * long result (which is -1 on error) straight into a size_t. */
	if (fseek(data, 0, SEEK_END) != 0) {
		free(group);
		return EC3_ERR_IO_FAILURE;
	}
	long data_end = ftell(data);
	if (data_end < 0) {
		free(group);
		return EC3_ERR_IO_FAILURE;
	}
	size_t data_len = (size_t)data_end;
	if (fseek(data, 0, SEEK_SET) != 0
		|| fseek(dest->p_target, 0, SEEK_END) != 0) {
		free(group);
		return EC3_ERR_IO_FAILURE;
	}
	unsigned char *buf = (unsigned char *)group;
	size_t buf_len = sizeof *group;
	for (size_t i = 0; i < data_len; i += buf_len) {
		size_t r = fread(buf, 1, buf_len, data);
		/* Fix: advance the data offset by what was actually
		 * written, not by what was read. */
		size_t written = fwrite(buf, 1, r, dest->p_target);
		dest->p_data_offset += written;
		if (r == buf_len && written == r) {
			continue;
		}
		if (ferror(data) || ferror(dest->p_target)) {
			free(group);
			return EC3_ERR_IO_FAILURE;
		}
		break;
	}
	dest->p_next_cluster_id = cluster_id_limit + 1;
	free(group);
	return EC3_SUCCESS;
}
size_t ec3_get_cluster_size(unsigned int v)
{
return cluster_sizes[v];

View File

@@ -90,6 +90,11 @@ extern enum ec3_status ec3_pipeline_read_cluster(
size_t cluster_id,
size_t *nr_read);
extern enum ec3_status ec3_pipeline_copy_all(
struct ec3_pipeline *dest,
struct cluster_table *clusters,
FILE *data);
extern size_t ec3_get_cluster_size(unsigned int v);
extern enum ec3_pipeline_stage_type_id
ec3_get_pipeline_stage_for_encryption_func(unsigned int func);

View File

@@ -41,8 +41,13 @@ static enum ec3_status add_file(
}
struct ec3_tag_writer *tag = NULL;
enum ec3_status status
= ec3_writer_create_tag(writer, 0, type, id, 0, &tag);
enum ec3_status status = ec3_writer_create_tag(
writer,
EC3_TAG_WRITER_BUFFERED,
type,
id,
0,
&tag);
if (status != EC3_SUCCESS) {
b_err("cannot initialise EC3 tag writer");

View File

@@ -3,6 +3,7 @@
#include "bin.h"
#include "cluster.h"
#include "pipeline.h"
#include "string-table.h"
#include <blue/core/queue.h>
#include <stddef.h>
@@ -29,12 +30,20 @@ struct ec3_writer {
FILE *w_extent_table;
FILE *w_tag_table;
struct string_table w_strings;
struct ec3_pipeline *w_pipeline;
};
struct ec3_tag_writer {
size_t w_index;
struct ec3_writer *w_parent;
struct ec3_pipeline *w_pipeline;
FILE *w_data;
FILE *w_cluster_table;
unsigned long w_type;
uint64_t w_ident;
unsigned long w_flags;
@@ -45,6 +54,39 @@ struct ec3_tag_writer {
b_queue_entry w_entry;
};
/*
 * Construct an EC3 pipeline for the given parameters, directing cluster
 * payloads to `data` and cluster metadata to `cluster_table`, each at
 * the given starting offset.  On success *out holds the new pipeline.
 */
static enum ec3_status create_pipeline(
	const struct ec3_parameters *param,
	FILE *data,
	size_t data_offset,
	FILE *cluster_table,
	size_t cluster_table_offset,
	struct ec3_pipeline **out)
{
	size_t cluster_size = ec3_get_cluster_size(param->p_cluster_size);
	/* Stage 0 is compression, stage 1 encryption; either may remain
	 * the zero-initialised no-op stage when disabled. */
	struct ec3_pipeline_stage_args stages[2] = {0};

	if (param->p_compression_func != EC3_COMPRESSION_NONE) {
		stages[0].type = ec3_get_pipeline_stage_for_compression_func(
			param->p_compression_func);
	}
	if (param->p_encryption_func != EC3_ENCRYPTION_NONE) {
		stages[1].type = ec3_get_pipeline_stage_for_encryption_func(
			param->p_encryption_func);
	}

	size_t nr_stages = sizeof stages / sizeof stages[0];
	return ec3_pipeline_create(
		stages,
		nr_stages,
		cluster_size,
		data,
		data_offset,
		cluster_table,
		cluster_table_offset,
		out);
}
enum ec3_status ec3_writer_create(
const struct ec3_parameters *param,
struct ec3_writer **out)
@@ -57,8 +99,6 @@ enum ec3_status ec3_writer_create(
memset(writer, 0x0, sizeof *writer);
memcpy(&writer->w_param, param, sizeof *param);
size_t cluster_size = ec3_get_cluster_size(param->p_cluster_size);
writer->w_data = param->p_outp;
writer->w_extent_table = tmpfile();
writer->w_tag_table = tmpfile();
@@ -70,34 +110,19 @@ enum ec3_status ec3_writer_create(
return EC3_ERR_IO_FAILURE;
}
struct ec3_pipeline_stage_args stages[2] = {0};
if (param->p_compression_func != EC3_COMPRESSION_NONE) {
stages[0].type = ec3_get_pipeline_stage_for_compression_func(
param->p_compression_func);
}
if (param->p_encryption_func != EC3_ENCRYPTION_NONE) {
stages[1].type = ec3_get_pipeline_stage_for_encryption_func(
param->p_encryption_func);
}
struct ec3_pipeline *pipeline = NULL;
enum ec3_status status = ec3_pipeline_create(
stages,
sizeof stages / sizeof stages[0],
cluster_size,
enum ec3_status status = create_pipeline(
param,
writer->w_data,
0,
writer->w_cluster_table,
0,
&pipeline);
&writer->w_pipeline);
if (status != EC3_SUCCESS) {
return status;
}
writer->w_pipeline = pipeline;
string_table_init(&writer->w_strings);
*out = writer;
return EC3_SUCCESS;
@@ -138,13 +163,18 @@ static enum ec3_status copy_file(FILE *src, FILE *dest)
return status;
}
static enum ec3_status flush_extent_entry(struct ec3_writer *w)
static enum ec3_status write_extent_entry(
struct ec3_writer *w,
uint64_t tag,
size_t physical_start,
size_t logical_start,
size_t nr_clusters)
{
struct ec3_extent extent = {0};
extent.ex_owner = b_i64_htob(w->w_extent_tag);
extent.ex_physical_cluster = b_i32_htob(w->w_extent_physical_start);
extent.ex_logical_cluster = b_i32_htob(w->w_extent_logical_start);
extent.ex_count = b_i32_htob(w->w_extent_nr_clusters);
extent.ex_owner = b_i64_htob(tag);
extent.ex_physical_cluster = b_i32_htob(physical_start);
extent.ex_logical_cluster = b_i32_htob(logical_start);
extent.ex_count = b_i32_htob(nr_clusters);
size_t written = fwrite(&extent, sizeof extent, 1, w->w_extent_table);
if (written != 1) {
@@ -155,6 +185,16 @@ static enum ec3_status flush_extent_entry(struct ec3_writer *w)
return EC3_SUCCESS;
}
/* Emit the writer's currently accumulated extent to the extent table,
 * forwarding its staged fields to write_extent_entry(). */
static enum ec3_status flush_extent_entry(struct ec3_writer *w)
{
	uint64_t tag = w->w_extent_tag;
	size_t physical = w->w_extent_physical_start;
	size_t logical = w->w_extent_logical_start;
	size_t count = w->w_extent_nr_clusters;

	return write_extent_entry(w, tag, physical, logical, count);
}
void ec3_writer_finish(struct ec3_writer *w)
{
enum ec3_status status = EC3_SUCCESS;
@@ -244,6 +284,25 @@ enum ec3_status ec3_writer_create_tag(
return EC3_ERR_NO_MEMORY;
}
enum ec3_status status = EC3_SUCCESS;
if (writer_flags & EC3_TAG_WRITER_BUFFERED) {
tag->w_data = tmpfile();
tag->w_cluster_table = tmpfile();
status = create_pipeline(
&w->w_param,
tag->w_data,
0,
tag->w_cluster_table,
0,
&tag->w_pipeline);
}
if (status != EC3_SUCCESS) {
return status;
}
tag->w_index = w->w_nr_tags++;
*out_writer = tag;
return EC3_SUCCESS;
@@ -255,12 +314,18 @@ static enum ec3_status flush_tag_buffer(struct ec3_tag_writer *w)
w->w_nr_clusters,
w->w_ident);
struct ec3_writer *container = w->w_parent;
struct ec3_pipeline *pipeline = w->w_pipeline;
if (!pipeline) {
pipeline = container->w_pipeline;
}
unsigned char *buf = w->w_buf;
enum ec3_status status = EC3_SUCCESS;
size_t nr_written = 0;
status = ec3_pipeline_write_cluster(
container->w_pipeline,
pipeline,
buf,
w->w_ptr,
&nr_written);
@@ -272,6 +337,12 @@ static enum ec3_status flush_tag_buffer(struct ec3_tag_writer *w)
w->w_ptr = 0;
w->w_nr_clusters++;
if (w->w_pipeline) {
/* when buffering is enabled, extent data will be written when
* the tag writer is finished. */
return EC3_SUCCESS;
}
if (container->w_extent_tag == w->w_ident) {
container->w_extent_nr_clusters++;
return EC3_SUCCESS;
@@ -356,5 +427,28 @@ enum ec3_status ec3_tag_writer_finish(struct ec3_tag_writer *w)
return EC3_ERR_IO_FAILURE;
}
if (w->w_pipeline) {
write_extent_entry(
w->w_parent,
w->w_ident,
w->w_parent->w_next_cluster_id,
0,
w->w_pipeline->p_next_cluster_id);
w->w_parent->w_next_cluster_id
+= w->w_pipeline->p_next_cluster_id;
status = ec3_pipeline_copy_all(
w->w_parent->w_pipeline,
&w->w_pipeline->p_cluster_table,
w->w_data);
fclose(w->w_data);
fclose(w->w_cluster_table);
ec3_pipeline_destroy(w->w_pipeline);
}
free(w);
return status;
}