Commit 0249a503 by Vicent Martí

Merge pull request #1091 from carlosmn/stream-object

Indexer speedup with large objects
parents 25992373 3908c254
......@@ -38,15 +38,20 @@ struct git_indexer {
struct git_indexer_stream {
unsigned int parsed_header :1,
opened_pack;
opened_pack :1,
have_stream :1,
have_delta :1;
struct git_pack_file *pack;
git_filebuf pack_file;
git_filebuf index_file;
git_off_t off;
git_off_t entry_start;
git_packfile_stream stream;
size_t nr_objects;
git_vector objects;
git_vector deltas;
unsigned int fanout[256];
git_hash_ctx hash_ctx;
git_oid hash;
git_transfer_progress_callback progress_cb;
void *progress_payload;
......@@ -176,56 +181,169 @@ cleanup:
}
/*
 * Remember the delta that starts at idx->entry_start so it can be
 * resolved later, once its base is available.
 *
 * Returns 0 on success, -1 on allocation or vector-insert failure.
 */
static int store_delta(git_indexer_stream *idx)
{
	struct delta_info *delta;

	delta = git__calloc(1, sizeof(struct delta_info));
	GITERR_CHECK_ALLOC(delta);
	delta->delta_off = idx->entry_start;

	if (git_vector_insert(&idx->deltas, delta) < 0) {
		/* The vector did not take the record; free it here so it isn't leaked */
		git__free(delta);
		return -1;
	}

	return 0;
}
/*
 * Feed the object header for an object of the given type and length into
 * the hash context, formatted by git_odb__format_object_header().
 */
static void hash_header(git_hash_ctx *ctx, git_off_t len, git_otype type)
{
	char hdr[64];
	size_t hdr_len = git_odb__format_object_header(hdr, sizeof(hdr), len, type);

	git_hash_update(ctx, hdr, hdr_len);
}
/*
 * Inflate the object behind `stream` chunk by chunk and feed the inflated
 * bytes into `ctx`.
 *
 * Returns 0 once the stream reports end-of-object. Returns the negative
 * value from git_packfile_stream_read() (which may be GIT_EBUFS when more
 * input is needed), or -1 if updating the hash fails.
 */
static int hash_object_stream(git_hash_ctx *ctx, git_packfile_stream *stream)
{
	char buffer[8*1024];
	ssize_t nread;

	assert(ctx && stream);

	do {
		nread = git_packfile_stream_read(stream, buffer, sizeof(buffer));
		if (nread < 0)
			return (int)nread;

		/* A failed update would silently yield a wrong object id */
		if (nread > 0 && git_hash_update(ctx, buffer, (size_t)nread) < 0)
			return -1;
	} while (nread > 0);

	return 0;
}
/*
 * In order to create the packfile stream, we need to skip over the delta
 * base description that sits between the entry header and the compressed
 * data.
 *
 * For GIT_OBJ_REF_DELTA the base is an object id, so idx->off is moved
 * forward by GIT_OID_RAWSZ. For GIT_OBJ_OFS_DELTA, get_delta_base() is
 * handed &idx->off to step over the encoded offset.
 *
 * Returns 0 on success, or the negative value from get_delta_base().
 */
static int advance_delta_offset(git_indexer_stream *idx, git_otype type)
{
	git_mwindow *w = NULL;

	assert(type == GIT_OBJ_REF_DELTA || type == GIT_OBJ_OFS_DELTA);

	if (type == GIT_OBJ_REF_DELTA) {
		idx->off += GIT_OID_RAWSZ;
	} else {
		git_off_t base_off = get_delta_base(idx->pack, &w, &idx->off, type, idx->entry_start);
		git_mwindow_close(&w);
		if (base_off < 0)
			return (int)base_off;
	}

	return 0;
}
/*
 * Drain the stream, throwing away everything it inflates.
 *
 * Returns 0 when the stream is exhausted, or the negative value reported
 * by git_packfile_stream_read().
 */
static int read_object_stream(git_packfile_stream *stream)
{
	char scratch[4*1024];
	ssize_t nread;

	assert(stream);

	while ((nread = git_packfile_stream_read(stream, scratch, sizeof(scratch))) > 0)
		; /* output is intentionally discarded */

	return (nread < 0) ? (int)nread : 0;
}
/*
 * Compute the CRC32 of `size` bytes of the pack starting at `start`,
 * opening one window at a time so an entry larger than a single window is
 * still covered.
 *
 * On success stores the checksum, converted with htonl(), in *crc_out and
 * returns 0. Returns -1 if a window cannot be opened.
 */
static int crc_object(uint32_t *crc_out, git_mwindow_file *mwf, git_off_t start, git_off_t size)
{
	void *ptr;
	uint32_t crc;
	unsigned int left, len;
	git_mwindow *w = NULL;

	crc = crc32(0L, Z_NULL, 0);

	/* `size > 0` rather than `size`: a negative size must not start the loop */
	while (size > 0) {
		ptr = git_mwindow_open(mwf, &w, start, size, &left);
		if (ptr == NULL)
			return -1;

		/* Only `left` bytes are available in this window */
		len = min(left, size);
		crc = crc32(crc, ptr, len);
		size -= len;
		start += len;
		git_mwindow_close(&w);
	}

	*crc_out = htonl(crc);
	return 0;
}
git__free(obj.data);
/*
 * Record the non-delta object whose data has just been streamed and hashed.
 *
 * Finalises idx->hash_ctx into the object id, registers a pack entry in
 * the pack's cache, computes the CRC of the packed bytes in
 * [idx->entry_start, idx->off), appends an index entry to idx->objects and
 * bumps the fanout table.
 *
 * Returns 0 on success, -1 on failure.
 */
static int store_object(git_indexer_stream *idx)
{
	int i;
	git_oid oid;
	struct entry *entry;
	git_off_t entry_size;
	struct git_pack_entry *pentry;
	git_hash_ctx *ctx = &idx->hash_ctx;
	git_off_t entry_start = idx->entry_start;

	entry = git__calloc(1, sizeof(*entry));
	GITERR_CHECK_ALLOC(entry);

	/* Zero-initialise so no field of the cached entry is left indeterminate */
	pentry = git__calloc(1, sizeof(struct git_pack_entry));
	if (pentry == NULL)
		goto on_error; /* releases entry instead of leaking it */

	git_hash_final(&oid, ctx);
	entry_size = idx->off - entry_start;

	/* Offsets that don't fit in 31 bits are flagged and kept in offset_long */
	if (entry_start > UINT31_MAX) {
		entry->offset = UINT32_MAX;
		entry->offset_long = entry_start;
	} else {
		entry->offset = (uint32_t)entry_start;
	}

	git_oid_cpy(&pentry->sha1, &oid);
	pentry->offset = entry_start;
	if (git_vector_insert(&idx->pack->cache, pentry) < 0) {
		git__free(pentry);
		goto on_error;
	}

	git_oid_cpy(&entry->oid, &oid);

	if (crc_object(&entry->crc, &idx->pack->mwf, entry_start, entry_size) < 0)
		goto on_error;

	/* Add the object to the list */
	if (git_vector_insert(&idx->objects, entry) < 0)
		goto on_error;

	/* Every fanout slot from the id's first byte upwards counts this object */
	for (i = oid.id[0]; i < 256; ++i) {
		idx->fanout[i]++;
	}

	return 0;

on_error:
	git__free(entry);
	return -1;
}
static int hash_and_save(git_indexer_stream *idx, git_rawobj *obj, git_off_t entry_start)
{
int i;
git_oid oid;
void *packed;
size_t entry_size;
unsigned int left;
struct entry *entry;
git_mwindow *w = NULL;
git_mwindow_file *mwf = &idx->pack->mwf;
struct git_pack_entry *pentry;
entry = git__calloc(1, sizeof(*entry));
......@@ -258,13 +376,9 @@ static int hash_and_save(git_indexer_stream *idx, git_rawobj *obj, git_off_t ent
entry->crc = crc32(0L, Z_NULL, 0);
entry_size = (size_t)(idx->off - entry_start);
packed = git_mwindow_open(mwf, &w, entry_start, entry_size, &left);
if (packed == NULL)
if (crc_object(&entry->crc, &idx->pack->mwf, entry_start, entry_size) < 0)
goto on_error;
entry->crc = htonl(crc32(entry->crc, packed, (uInt)entry_size));
git_mwindow_close(&w);
/* Add the object to the list */
if (git_vector_insert(&idx->objects, entry) < 0)
goto on_error;
......@@ -349,7 +463,7 @@ int git_indexer_stream_add(git_indexer_stream *idx, const void *data, size_t siz
/* As the file grows any windows we try to use will be out of date */
git_mwindow_free_all(mwf);
while (processed < idx->nr_objects) {
git_rawobj obj;
git_packfile_stream *stream = &idx->stream;
git_off_t entry_start = idx->off;
size_t entry_size;
git_otype type;
......@@ -358,46 +472,71 @@ int git_indexer_stream_add(git_indexer_stream *idx, const void *data, size_t siz
if (idx->pack->mwf.size <= idx->off + 20)
return 0;
error = git_packfile_unpack_header(&entry_size, &type, mwf, &w, &idx->off);
if (error == GIT_EBUFS) {
idx->off = entry_start;
return 0;
}
if (error < 0)
return -1;
git_mwindow_close(&w);
if (type == GIT_OBJ_REF_DELTA || type == GIT_OBJ_OFS_DELTA) {
error = store_delta(idx, entry_start, entry_size, type);
if (!idx->have_stream) {
error = git_packfile_unpack_header(&entry_size, &type, mwf, &w, &idx->off);
if (error == GIT_EBUFS) {
idx->off = entry_start;
return 0;
}
if (error < 0)
return error;
return -1;
git_mwindow_close(&w);
idx->entry_start = entry_start;
git_hash_ctx_init(&idx->hash_ctx);
if (type == GIT_OBJ_REF_DELTA || type == GIT_OBJ_OFS_DELTA) {
error = advance_delta_offset(idx, type);
if (error == GIT_EBUFS) {
idx->off = entry_start;
return 0;
}
if (error < 0)
return -1;
idx->have_delta = 1;
} else {
idx->have_delta = 0;
hash_header(&idx->hash_ctx, entry_size, type);
}
idx->have_stream = 1;
if (git_packfile_stream_open(stream, idx->pack, idx->off) < 0)
goto on_error;
stats->received_objects++;
do_progress_callback(idx, stats);
continue;
}
idx->off = entry_start;
error = git_packfile_unpack(&obj, idx->pack, &idx->off);
if (error == GIT_EBUFS) {
idx->off = entry_start;
return 0;
if (idx->have_delta) {
error = read_object_stream(stream);
} else {
error = hash_object_stream(&idx->hash_ctx, stream);
}
if (error < 0)
return -1;
if (hash_and_save(idx, &obj, entry_start) < 0)
idx->off = stream->curpos;
if (error == GIT_EBUFS)
return 0;
/* We want to free the stream resources no matter what here */
idx->have_stream = 0;
git_packfile_stream_free(stream);
if (error < 0)
goto on_error;
git__free(obj.data);
if (idx->have_delta) {
error = store_delta(idx);
} else {
error = store_object(idx);
}
stats->indexed_objects = (unsigned int)++processed;
if (error < 0)
goto on_error;
if (!idx->have_delta) {
stats->indexed_objects = (unsigned int)++processed;
}
stats->received_objects++;
do_progress_callback(idx, stats);
}
......
......@@ -34,7 +34,7 @@ typedef struct
static int load_alternates(git_odb *odb, const char *objects_dir, int alternate_depth);
static int format_object_header(char *hdr, size_t n, size_t obj_len, git_otype obj_type)
int git_odb__format_object_header(char *hdr, size_t n, size_t obj_len, git_otype obj_type)
{
const char *type_str = git_object_type2string(obj_type);
int len = p_snprintf(hdr, n, "%s %"PRIuZ, type_str, obj_len);
......@@ -55,7 +55,7 @@ int git_odb__hashobj(git_oid *id, git_rawobj *obj)
if (!obj->data && obj->len != 0)
return -1;
hdrlen = format_object_header(header, sizeof(header), obj->len, obj->type);
hdrlen = git_odb__format_object_header(header, sizeof(header), obj->len, obj->type);
vec[0].data = header;
vec[0].len = hdrlen;
......@@ -133,7 +133,7 @@ int git_odb__hashfd(git_oid *out, git_file fd, size_t size, git_otype type)
if ((error = git_hash_ctx_init(&ctx)) < 0)
return -1;
hdr_len = format_object_header(hdr, sizeof(hdr), size, type);
hdr_len = git_odb__format_object_header(hdr, sizeof(hdr), size, type);
if ((error = git_hash_update(&ctx, hdr, hdr_len)) < 0)
goto done;
......
......@@ -46,6 +46,10 @@ struct git_odb {
int git_odb__hashobj(git_oid *id, git_rawobj *obj);
/*
* Format the object header as it would appear in the on-disk object
*/
int git_odb__format_object_header(char *hdr, size_t n, size_t obj_len, git_otype obj_type);
/*
* Hash an open file descriptor.
* This is a performance call when the contents of a fd need to be hashed,
* but the fd is already open and we have the size of the contents.
......
......@@ -441,6 +441,72 @@ static void use_git_free(void *opaq, void *ptr)
git__free(ptr);
}
/*
 * Prepare `obj` for inflating the compressed data that starts at offset
 * `curpos` of pack `p`.
 *
 * `obj` is storage owned by the caller (it may be embedded in another
 * structure); it is reset and initialised here, never allocated or freed.
 *
 * Returns 0 on success, or -1 with a GITERR_ZLIB error set if zlib could
 * not be initialised.
 */
int git_packfile_stream_open(git_packfile_stream *obj, struct git_pack_file *p, git_off_t curpos)
{
	int st;

	memset(obj, 0, sizeof(git_packfile_stream));
	obj->curpos = curpos;
	obj->p = p;
	obj->zstream.zalloc = use_git_alloc;
	obj->zstream.zfree = use_git_free;
	obj->zstream.next_in = Z_NULL;
	obj->zstream.next_out = Z_NULL;

	st = inflateInit(&obj->zstream);
	if (st != Z_OK) {
		/* Do not free obj: we did not allocate it and the caller still owns it */
		giterr_set(GITERR_ZLIB, "Failed to inflate packfile");
		return -1;
	}

	return 0;
}
/*
 * Inflate the next piece of the stream into `buffer` (at most `len` bytes).
 *
 * Returns the number of bytes written to `buffer`, 0 once the end of the
 * compressed stream has been reached, GIT_EBUFS when no window could be
 * opened or no output could be produced yet, and -1 (with a GITERR_ZLIB
 * error set) if zlib reports a failure.
 */
ssize_t git_packfile_stream_read(git_packfile_stream *obj, void *buffer, size_t len)
{
	z_stream *zs = &obj->zstream;
	unsigned char *window;
	size_t produced;
	int zret;

	if (obj->done)
		return 0;

	/* The window open also tells zlib how much input is available */
	window = pack_window_open(obj->p, &obj->mw, obj->curpos, &zs->avail_in);
	if (window == NULL)
		return GIT_EBUFS;

	zs->next_out = buffer;
	zs->avail_out = len;
	zs->next_in = window;

	zret = inflate(zs, Z_SYNC_FLUSH);
	git_mwindow_close(&obj->mw);

	/* Advance by however much input zlib consumed */
	obj->curpos += zs->next_in - window;
	produced = len - zs->avail_out;

	switch (zret) {
	case Z_STREAM_END:
		obj->done = 1;
		return (ssize_t)produced;

	case Z_OK:
		/* If we didn't write anything out but we're not done, we need more data */
		if (produced == 0)
			return GIT_EBUFS;
		return (ssize_t)produced;

	default:
		giterr_set(GITERR_ZLIB, "Failed to inflate packfile");
		return -1;
	}
}
/*
 * Release the zlib state held by the stream. The stream structure itself
 * is not freed.
 */
void git_packfile_stream_free(git_packfile_stream *obj)
{
	z_stream *zs = &obj->zstream;

	inflateEnd(zs);
}
int packfile_unpack_compressed(
git_rawobj *obj,
struct git_pack_file *p,
......
......@@ -8,6 +8,8 @@
#ifndef INCLUDE_pack_h__
#define INCLUDE_pack_h__
#include <zlib.h>
#include "git2/oid.h"
#include "common.h"
......@@ -76,6 +78,14 @@ struct git_pack_entry {
struct git_pack_file *p;
};
/* State for incrementally inflating one object's data out of a packfile */
typedef struct git_packfile_stream {
/* Pack offset of the next compressed byte; advanced by each read */
git_off_t curpos;
/* Set to 1 once inflate() has reported the end of the stream */
int done;
/* zlib inflate state */
z_stream zstream;
/* The pack being read from */
struct git_pack_file *p;
/* Window handle used while a read has the pack data open */
git_mwindow *mw;
} git_packfile_stream;
int git_packfile_unpack_header(
size_t *size_p,
git_otype *type_p,
......@@ -98,6 +108,10 @@ int packfile_unpack_compressed(
size_t size,
git_otype type);
/* Initialise `obj` to inflate the data starting at offset `curpos` of pack `p`; 0 on success, -1 on error */
int git_packfile_stream_open(git_packfile_stream *obj, struct git_pack_file *p, git_off_t curpos);
/* Inflate up to `len` bytes into `buffer`; returns bytes written, 0 at end of stream, or a negative value (GIT_EBUFS when more data is needed) */
ssize_t git_packfile_stream_read(git_packfile_stream *obj, void *buffer, size_t len);
/* Release the zlib state held by `obj` */
void git_packfile_stream_free(git_packfile_stream *obj);
git_off_t get_delta_base(struct git_pack_file *p, git_mwindow **w_curs,
git_off_t *curpos, git_otype type,
git_off_t delta_obj_offset);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment