Commit 5a3ad89d by Carlos Martín Nieto

indexer: make use of streaming also for deltas

Up to now, deltas needed to be entirely in the packfile, and we tried
to decompress them in their entirety over and over again.

Adjust the logic so we read them as they come, just as we do for full
objects. This also allows us to simplify the logic and have less
nested code. The delta resolving phase still needs to decompress the
whole object into memory, as there is not yet any streaming
delta-apply support, but it helps in speeding up the downloading
process and reduces the amount of memory allocations we need to do.
parent f56f8585
...@@ -39,7 +39,8 @@ struct git_indexer { ...@@ -39,7 +39,8 @@ struct git_indexer {
struct git_indexer_stream { struct git_indexer_stream {
unsigned int parsed_header :1, unsigned int parsed_header :1,
opened_pack :1, opened_pack :1,
have_stream :1; have_stream :1,
have_delta :1;
struct git_pack_file *pack; struct git_pack_file *pack;
git_filebuf pack_file; git_filebuf pack_file;
git_filebuf index_file; git_filebuf index_file;
...@@ -180,39 +181,13 @@ cleanup: ...@@ -180,39 +181,13 @@ cleanup:
} }
/* Try to store the delta so we can try to resolve it later */ /* Try to store the delta so we can try to resolve it later */
static int store_delta(git_indexer_stream *idx, git_off_t entry_start, size_t entry_size, git_otype type) static int store_delta(git_indexer_stream *idx)
{ {
git_mwindow *w = NULL;
struct delta_info *delta; struct delta_info *delta;
git_rawobj obj;
int error;
assert(type == GIT_OBJ_REF_DELTA || type == GIT_OBJ_OFS_DELTA);
if (type == GIT_OBJ_REF_DELTA) {
idx->off += GIT_OID_RAWSZ;
} else {
git_off_t base_off;
base_off = get_delta_base(idx->pack, &w, &idx->off, type, entry_start);
git_mwindow_close(&w);
if (base_off < 0)
return (int)base_off;
}
error = packfile_unpack_compressed(&obj, idx->pack, &w, &idx->off, entry_size, type);
if (error == GIT_EBUFS) {
idx->off = entry_start;
return GIT_EBUFS;
} else if (error < 0){
return -1;
}
delta = git__calloc(1, sizeof(struct delta_info)); delta = git__calloc(1, sizeof(struct delta_info));
GITERR_CHECK_ALLOC(delta); GITERR_CHECK_ALLOC(delta);
delta->delta_off = entry_start; delta->delta_off = idx->entry_start;
git__free(obj.data);
if (git_vector_insert(&idx->deltas, delta) < 0) if (git_vector_insert(&idx->deltas, delta) < 0)
return -1; return -1;
...@@ -249,7 +224,44 @@ static int hash_object_stream(git_hash_ctx *ctx, git_packfile_stream *stream) ...@@ -249,7 +224,44 @@ static int hash_object_stream(git_hash_ctx *ctx, git_packfile_stream *stream)
return 0; return 0;
} }
static int store_cache(git_indexer_stream *idx, git_hash_ctx *ctx, git_off_t entry_start) /* In order to create the packfile stream, we need to skip over the delta base description */
static int advance_delta_offset(git_indexer_stream *idx, git_otype type)
{
	/*
	 * Moves idx->off past the delta base description so that it can be
	 * used as the starting offset for a packfile stream.
	 *
	 * Returns 0 on success; a negative result from get_delta_base() is
	 * handed back to the caller unchanged (narrowed to int).
	 */
	git_mwindow *window = NULL;
	git_off_t base_offset;

	assert(type == GIT_OBJ_REF_DELTA || type == GIT_OBJ_OFS_DELTA);

	/* A ref-delta names its base with a raw object id: step over it */
	if (type == GIT_OBJ_REF_DELTA) {
		idx->off += GIT_OID_RAWSZ;
		return 0;
	}

	/*
	 * Offset-delta: get_delta_base() is given &idx->off and is presumably
	 * what advances it past the encoded offset -- confirm against its
	 * implementation. The base offset itself is only checked for errors.
	 */
	base_offset = get_delta_base(idx->pack, &window, &idx->off, type, idx->entry_start);
	git_mwindow_close(&window);

	return base_offset < 0 ? (int)base_offset : 0;
}
/*
 * Drain a packfile stream, throwing away everything it produces.
 *
 * Keeps calling git_packfile_stream_read() until it stops returning a
 * positive count. Returns 0 if the last call returned 0, otherwise the
 * negative value it returned (narrowed to int).
 */
static int read_object_stream(git_packfile_stream *stream)
{
	char scratch[4*1024];
	ssize_t nread;

	assert(stream);

	for (;;) {
		nread = git_packfile_stream_read(stream, scratch, sizeof(scratch));
		if (nread <= 0)
			break;
	}

	return nread < 0 ? (int)nread : 0;
}
static int store_object(git_indexer_stream *idx)
{ {
int i; int i;
git_oid oid; git_oid oid;
...@@ -258,8 +270,10 @@ static int store_cache(git_indexer_stream *idx, git_hash_ctx *ctx, git_off_t ent ...@@ -258,8 +270,10 @@ static int store_cache(git_indexer_stream *idx, git_hash_ctx *ctx, git_off_t ent
struct entry *entry; struct entry *entry;
git_off_t entry_size; git_off_t entry_size;
git_mwindow *w = NULL; git_mwindow *w = NULL;
git_mwindow_file *mwf = &idx->pack->mwf;
struct git_pack_entry *pentry; struct git_pack_entry *pentry;
git_hash_ctx *ctx = &idx->hash_ctx;
git_mwindow_file *mwf = &idx->pack->mwf;
git_off_t entry_start = idx->entry_start;
entry = git__calloc(1, sizeof(*entry)); entry = git__calloc(1, sizeof(*entry));
GITERR_CHECK_ALLOC(entry); GITERR_CHECK_ALLOC(entry);
...@@ -278,8 +292,10 @@ static int store_cache(git_indexer_stream *idx, git_hash_ctx *ctx, git_off_t ent ...@@ -278,8 +292,10 @@ static int store_cache(git_indexer_stream *idx, git_hash_ctx *ctx, git_off_t ent
git_oid_cpy(&pentry->sha1, &oid); git_oid_cpy(&pentry->sha1, &oid);
pentry->offset = entry_start; pentry->offset = entry_start;
if (git_vector_insert(&idx->pack->cache, pentry) < 0) if (git_vector_insert(&idx->pack->cache, pentry) < 0) {
git__free(pentry);
goto on_error; goto on_error;
}
git_oid_cpy(&entry->oid, &oid); git_oid_cpy(&entry->oid, &oid);
entry->crc = crc32(0L, Z_NULL, 0); entry->crc = crc32(0L, Z_NULL, 0);
...@@ -302,7 +318,6 @@ static int store_cache(git_indexer_stream *idx, git_hash_ctx *ctx, git_off_t ent ...@@ -302,7 +318,6 @@ static int store_cache(git_indexer_stream *idx, git_hash_ctx *ctx, git_off_t ent
return 0; return 0;
on_error: on_error:
git__free(pentry);
git__free(entry); git__free(entry);
return -1; return -1;
...@@ -461,44 +476,59 @@ int git_indexer_stream_add(git_indexer_stream *idx, const void *data, size_t siz ...@@ -461,44 +476,59 @@ int git_indexer_stream_add(git_indexer_stream *idx, const void *data, size_t siz
git_mwindow_close(&w); git_mwindow_close(&w);
idx->entry_start = entry_start; idx->entry_start = entry_start;
git_hash_ctx_init(&idx->hash_ctx);
if (type == GIT_OBJ_REF_DELTA || type == GIT_OBJ_OFS_DELTA) { if (type == GIT_OBJ_REF_DELTA || type == GIT_OBJ_OFS_DELTA) {
error = store_delta(idx, entry_start, entry_size, type); error = advance_delta_offset(idx, type);
if (error == GIT_EBUFS) { if (error == GIT_EBUFS) {
idx->off = entry_start; idx->off = entry_start;
return 0; return 0;
} }
if (error < 0) if (error < 0)
return error; return -1;
stats->received_objects++; idx->have_delta = 1;
do_progress_callback(idx, stats); } else {
continue; idx->have_delta = 0;
hash_header(&idx->hash_ctx, entry_size, type);
} }
/* If we got this far, we create the stream for our object */
idx->have_stream = 1; idx->have_stream = 1;
git_hash_ctx_init(&idx->hash_ctx);
hash_header(&idx->hash_ctx, entry_size, type);
idx->entry_start = entry_start;
if (git_packfile_stream_open(stream, idx->pack, idx->off) < 0) if (git_packfile_stream_open(stream, idx->pack, idx->off) < 0)
goto on_error; goto on_error;
} }
if (idx->have_delta) {
error = read_object_stream(stream);
} else {
error = hash_object_stream(&idx->hash_ctx, stream); error = hash_object_stream(&idx->hash_ctx, stream);
idx->off = idx->stream.curpos; }
idx->off = stream->curpos;
if (error == GIT_EBUFS) if (error == GIT_EBUFS)
return 0; return 0;
/* We want to free the stream resources no matter what here */
idx->have_stream = 0;
git_packfile_stream_free(stream);
if (error < 0) if (error < 0)
goto on_error; goto on_error;
git_packfile_stream_free(&idx->stream); if (idx->have_delta) {
if (store_cache(idx, &idx->hash_ctx, idx->entry_start) < 0) error = store_delta(idx);
} else {
error = store_object(idx);
}
if (error < 0)
goto on_error; goto on_error;
if (!idx->have_delta) {
stats->indexed_objects = (unsigned int)++processed; stats->indexed_objects = (unsigned int)++processed;
}
stats->received_objects++; stats->received_objects++;
idx->have_stream = 0;
do_progress_callback(idx, stats); do_progress_callback(idx, stats);
} }
...@@ -506,7 +536,6 @@ int git_indexer_stream_add(git_indexer_stream *idx, const void *data, size_t siz ...@@ -506,7 +536,6 @@ int git_indexer_stream_add(git_indexer_stream *idx, const void *data, size_t siz
return 0; return 0;
on_error: on_error:
git_packfile_stream_free(&idx->stream);
git_mwindow_free_all(mwf); git_mwindow_free_all(mwf);
return -1; return -1;
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment