Commit b4491b99 by Philip Kelley

Incremental improvements to pack-objects logic

Incorporate feedback for incr. improvements to pack-objects
parent 03452b34
...@@ -30,6 +30,25 @@ struct unpacked { ...@@ -30,6 +30,25 @@ struct unpacked {
unsigned int depth; unsigned int depth;
}; };
#ifdef GIT_THREADS
#define GIT_PACKBUILDER__MUTEX_OP(pb, mtx, op) do { \
int result = git_mutex_##op(&(pb)->mtx); \
assert(!result); \
GIT_UNUSED(result); \
} while (0)
#else
#define GIT_PACKBUILDER__MUTEX_OP(pb,mtx,op) GIT_UNUSED(pb)
#endif /* GIT_THREADS */
#define git_packbuilder__cache_lock(pb) GIT_PACKBUILDER__MUTEX_OP(pb, cache_mutex, lock)
#define git_packbuilder__cache_unlock(pb) GIT_PACKBUILDER__MUTEX_OP(pb, cache_mutex, unlock)
#define git_packbuilder__progress_lock(pb) GIT_PACKBUILDER__MUTEX_OP(pb, progress_mutex, lock)
#define git_packbuilder__progress_unlock(pb) GIT_PACKBUILDER__MUTEX_OP(pb, progress_mutex, unlock)
static unsigned name_hash(const char *name) static unsigned name_hash(const char *name)
{ {
unsigned c, hash = 0; unsigned c, hash = 0;
...@@ -84,29 +103,37 @@ int git_packbuilder_new(git_packbuilder **out, git_repository *repo) ...@@ -84,29 +103,37 @@ int git_packbuilder_new(git_packbuilder **out, git_repository *repo)
*out = NULL; *out = NULL;
pb = git__malloc(sizeof(*pb)); pb = git__calloc(sizeof(*pb), 1);
GITERR_CHECK_ALLOC(pb); GITERR_CHECK_ALLOC(pb);
memset(pb, 0x0, sizeof(*pb));
pb->object_ix = git_oidmap_alloc(); pb->object_ix = git_oidmap_alloc();
GITERR_CHECK_ALLOC(pb->object_ix);
if (!pb->object_ix)
goto on_error;
pb->repo = repo; pb->repo = repo;
pb->nr_threads = 1; /* do not spawn any thread by default */ pb->nr_threads = 1; /* do not spawn any thread by default */
pb->ctx = git_hash_new_ctx(); pb->ctx = git_hash_new_ctx();
if (git_repository_odb(&pb->odb, repo) < 0) if (!pb->ctx ||
git_repository_odb(&pb->odb, repo) < 0 ||
packbuilder_config(pb) < 0)
goto on_error; goto on_error;
if (packbuilder_config(pb) < 0) #ifdef GIT_THREADS
if (git_mutex_init(&pb->cache_mutex) ||
git_mutex_init(&pb->progress_mutex) ||
git_cond_init(&pb->progress_cond))
goto on_error; goto on_error;
#endif
*out = pb; *out = pb;
return 0; return 0;
on_error: on_error:
git__free(pb); git_packbuilder_free(pb);
return -1; return -1;
} }
...@@ -134,12 +161,13 @@ int git_packbuilder_insert(git_packbuilder *pb, const git_oid *oid, ...@@ -134,12 +161,13 @@ int git_packbuilder_insert(git_packbuilder *pb, const git_oid *oid,
const char *name) const char *name)
{ {
git_pobject *po; git_pobject *po;
git_odb_object *obj;
khiter_t pos; khiter_t pos;
int ret; int ret;
assert(pb && oid); assert(pb && oid);
/* If the object already exists in the hash table, then we don't
* have any work to do */
pos = kh_get(oid, pb->object_ix, oid); pos = kh_get(oid, pb->object_ix, oid);
if (pos != kh_end(pb->object_ix)) if (pos != kh_end(pb->object_ix))
return 0; return 0;
...@@ -152,16 +180,14 @@ int git_packbuilder_insert(git_packbuilder *pb, const git_oid *oid, ...@@ -152,16 +180,14 @@ int git_packbuilder_insert(git_packbuilder *pb, const git_oid *oid,
rehash(pb); rehash(pb);
} }
if (git_odb_read(&obj, pb->odb, oid) < 0) po = pb->object_list + pb->nr_objects;
return -1;
po = pb->object_list + pb->nr_objects++;
memset(po, 0x0, sizeof(*po)); memset(po, 0x0, sizeof(*po));
if (git_odb_read_header(&po->size, &po->type, pb->odb, oid) < 0)
return -1;
pb->nr_objects++;
git_oid_cpy(&po->id, oid); git_oid_cpy(&po->id, oid);
po->type = git_odb_object_type(obj);
po->size = git_odb_object_size(obj);
git_odb_object_free(obj);
po->hash = name_hash(name); po->hash = name_hash(name);
pos = kh_put(oid, pb->object_ix, &po->id, &ret); pos = kh_put(oid, pb->object_ix, &po->id, &ret);
...@@ -653,24 +679,6 @@ static int delta_cacheable(git_packbuilder *pb, unsigned long src_size, ...@@ -653,24 +679,6 @@ static int delta_cacheable(git_packbuilder *pb, unsigned long src_size,
return 0; return 0;
} }
#ifdef GIT_THREADS
static git_mutex cache_mutex;
#define cache_lock() git_mutex_lock(&cache_mutex);
#define cache_unlock() git_mutex_unlock(&cache_mutex);
static git_mutex progress_mutex;
#define progress_lock() git_mutex_lock(&progress_mutex);
#define progress_unlock() git_mutex_unlock(&progress_mutex);
static git_cond progress_cond;
#else
#define cache_lock() (void)0;
#define cache_unlock() (void)0;
#define progress_lock() (void)0;
#define progress_unlock() (void)0;
#endif
static int try_delta(git_packbuilder *pb, struct unpacked *trg, static int try_delta(git_packbuilder *pb, struct unpacked *trg,
struct unpacked *src, unsigned int max_depth, struct unpacked *src, unsigned int max_depth,
unsigned long *mem_usage, int *ret) unsigned long *mem_usage, int *ret)
...@@ -780,7 +788,7 @@ static int try_delta(git_packbuilder *pb, struct unpacked *trg, ...@@ -780,7 +788,7 @@ static int try_delta(git_packbuilder *pb, struct unpacked *trg,
} }
} }
cache_lock(); git_packbuilder__cache_lock(pb);
if (trg_object->delta_data) { if (trg_object->delta_data) {
git__free(trg_object->delta_data); git__free(trg_object->delta_data);
pb->delta_cache_size -= trg_object->delta_size; pb->delta_cache_size -= trg_object->delta_size;
...@@ -788,13 +796,13 @@ static int try_delta(git_packbuilder *pb, struct unpacked *trg, ...@@ -788,13 +796,13 @@ static int try_delta(git_packbuilder *pb, struct unpacked *trg,
} }
if (delta_cacheable(pb, src_size, trg_size, delta_size)) { if (delta_cacheable(pb, src_size, trg_size, delta_size)) {
pb->delta_cache_size += delta_size; pb->delta_cache_size += delta_size;
cache_unlock(); git_packbuilder__cache_unlock(pb);
trg_object->delta_data = git__realloc(delta_buf, delta_size); trg_object->delta_data = git__realloc(delta_buf, delta_size);
GITERR_CHECK_ALLOC(trg_object->delta_data); GITERR_CHECK_ALLOC(trg_object->delta_data);
} else { } else {
/* create delta when writing the pack */ /* create delta when writing the pack */
cache_unlock(); git_packbuilder__cache_unlock(pb);
git__free(delta_buf); git__free(delta_buf);
} }
...@@ -855,15 +863,15 @@ static int find_deltas(git_packbuilder *pb, git_pobject **list, ...@@ -855,15 +863,15 @@ static int find_deltas(git_packbuilder *pb, git_pobject **list,
unsigned int max_depth; unsigned int max_depth;
int j, best_base = -1; int j, best_base = -1;
progress_lock(); git_packbuilder__progress_lock(pb);
if (!*list_size) { if (!*list_size) {
progress_unlock(); git_packbuilder__progress_unlock(pb);
break; break;
} }
po = *list++; po = *list++;
(*list_size)--; (*list_size)--;
progress_unlock(); git_packbuilder__progress_unlock(pb);
mem_usage -= free_unpacked(n); mem_usage -= free_unpacked(n);
n->object = po; n->object = po;
...@@ -935,10 +943,10 @@ static int find_deltas(git_packbuilder *pb, git_pobject **list, ...@@ -935,10 +943,10 @@ static int find_deltas(git_packbuilder *pb, git_pobject **list,
po->z_delta_size = zbuf.size; po->z_delta_size = zbuf.size;
git_buf_clear(&zbuf); git_buf_clear(&zbuf);
cache_lock(); git_packbuilder__cache_lock(pb);
pb->delta_cache_size -= po->delta_size; pb->delta_cache_size -= po->delta_size;
pb->delta_cache_size += po->z_delta_size; pb->delta_cache_size += po->z_delta_size;
cache_unlock(); git_packbuilder__cache_unlock(pb);
} }
/* /*
...@@ -1006,22 +1014,6 @@ struct thread_params { ...@@ -1006,22 +1014,6 @@ struct thread_params {
int data_ready; int data_ready;
}; };
static void init_threaded_search(void)
{
git_mutex_init(&cache_mutex);
git_mutex_init(&progress_mutex);
git_cond_init(&progress_cond);
}
static void cleanup_threaded_search(void)
{
git_cond_free(&progress_cond);
git_mutex_free(&cache_mutex);
git_mutex_free(&progress_mutex);
}
static void *threaded_find_deltas(void *arg) static void *threaded_find_deltas(void *arg)
{ {
struct thread_params *me = arg; struct thread_params *me = arg;
...@@ -1032,10 +1024,10 @@ static void *threaded_find_deltas(void *arg) ...@@ -1032,10 +1024,10 @@ static void *threaded_find_deltas(void *arg)
; /* TODO */ ; /* TODO */
} }
progress_lock(); git_packbuilder__progress_lock(me->pb);
me->working = 0; me->working = 0;
git_cond_signal(&progress_cond); git_cond_signal(&me->pb->progress_cond);
progress_unlock(); git_packbuilder__progress_unlock(me->pb);
/* /*
* We must not set ->data_ready before we wait on the * We must not set ->data_ready before we wait on the
...@@ -1062,13 +1054,11 @@ static int ll_find_deltas(git_packbuilder *pb, git_pobject **list, ...@@ -1062,13 +1054,11 @@ static int ll_find_deltas(git_packbuilder *pb, git_pobject **list,
struct thread_params *p; struct thread_params *p;
int i, ret, active_threads = 0; int i, ret, active_threads = 0;
init_threaded_search();
if (!pb->nr_threads) if (!pb->nr_threads)
pb->nr_threads = git_online_cpus(); pb->nr_threads = git_online_cpus();
if (pb->nr_threads <= 1) { if (pb->nr_threads <= 1) {
find_deltas(pb, list, &list_size, window, depth); find_deltas(pb, list, &list_size, window, depth);
cleanup_threaded_search();
return 0; return 0;
} }
...@@ -1133,20 +1123,28 @@ static int ll_find_deltas(git_packbuilder *pb, git_pobject **list, ...@@ -1133,20 +1123,28 @@ static int ll_find_deltas(git_packbuilder *pb, git_pobject **list,
struct thread_params *victim = NULL; struct thread_params *victim = NULL;
unsigned sub_size = 0; unsigned sub_size = 0;
progress_lock(); /* Start by locating a thread that has transitioned its
* 'working' flag from 1 -> 0. This indicates that it is
* ready to receive more work using our work-stealing
* algorithm. */
git_packbuilder__progress_lock(pb);
for (;;) { for (;;) {
for (i = 0; !target && i < pb->nr_threads; i++) for (i = 0; !target && i < pb->nr_threads; i++)
if (!p[i].working) if (!p[i].working)
target = &p[i]; target = &p[i];
if (target) if (target)
break; break;
git_cond_wait(&progress_cond, &progress_mutex); git_cond_wait(&pb->progress_cond, &pb->progress_mutex);
} }
/* At this point we hold the progress lock and have located
* a thread to receive more work. We still need to locate a
* thread from which to steal work (the victim). */
for (i = 0; i < pb->nr_threads; i++) for (i = 0; i < pb->nr_threads; i++)
if (p[i].remaining > 2*window && if (p[i].remaining > 2*window &&
(!victim || victim->remaining < p[i].remaining)) (!victim || victim->remaining < p[i].remaining))
victim = &p[i]; victim = &p[i];
if (victim) { if (victim) {
sub_size = victim->remaining / 2; sub_size = victim->remaining / 2;
list = victim->list + victim->list_size - sub_size; list = victim->list + victim->list_size - sub_size;
...@@ -1172,7 +1170,7 @@ static int ll_find_deltas(git_packbuilder *pb, git_pobject **list, ...@@ -1172,7 +1170,7 @@ static int ll_find_deltas(git_packbuilder *pb, git_pobject **list,
target->list_size = sub_size; target->list_size = sub_size;
target->remaining = sub_size; target->remaining = sub_size;
target->working = 1; target->working = 1;
progress_unlock(); git_packbuilder__progress_unlock(pb);
git_mutex_lock(&target->mutex); git_mutex_lock(&target->mutex);
target->data_ready = 1; target->data_ready = 1;
...@@ -1187,7 +1185,6 @@ static int ll_find_deltas(git_packbuilder *pb, git_pobject **list, ...@@ -1187,7 +1185,6 @@ static int ll_find_deltas(git_packbuilder *pb, git_pobject **list,
} }
} }
cleanup_threaded_search();
git__free(p); git__free(p);
return 0; return 0;
} }
...@@ -1196,18 +1193,6 @@ static int ll_find_deltas(git_packbuilder *pb, git_pobject **list, ...@@ -1196,18 +1193,6 @@ static int ll_find_deltas(git_packbuilder *pb, git_pobject **list,
#define ll_find_deltas(pb, l, ls, w, d) find_deltas(pb, l, &ls, w, d) #define ll_find_deltas(pb, l, ls, w, d) find_deltas(pb, l, &ls, w, d)
#endif #endif
static void get_object_details(git_packbuilder *pb)
{
git_pobject *po;
unsigned int i;
for (i = 0; i < pb->nr_objects; ++i) {
po = &pb->object_list[i];
if (pb->big_file_threshold < po->size)
po->no_try_delta = 1;
}
}
static int prepare_pack(git_packbuilder *pb) static int prepare_pack(git_packbuilder *pb)
{ {
git_pobject **delta_list; git_pobject **delta_list;
...@@ -1216,18 +1201,14 @@ static int prepare_pack(git_packbuilder *pb) ...@@ -1216,18 +1201,14 @@ static int prepare_pack(git_packbuilder *pb)
if (pb->nr_objects == 0 || pb->done) if (pb->nr_objects == 0 || pb->done)
return 0; /* nothing to do */ return 0; /* nothing to do */
get_object_details(pb);
delta_list = git__malloc(pb->nr_objects * sizeof(*delta_list)); delta_list = git__malloc(pb->nr_objects * sizeof(*delta_list));
GITERR_CHECK_ALLOC(delta_list); GITERR_CHECK_ALLOC(delta_list);
for (i = 0; i < pb->nr_objects; ++i) { for (i = 0; i < pb->nr_objects; ++i) {
git_pobject *po = pb->object_list + i; git_pobject *po = pb->object_list + i;
if (po->size < 50) /* Make sure the item is within our size limits */
continue; if (po->size < 50 || po->size > pb->big_file_threshold)
if (po->no_try_delta)
continue; continue;
delta_list[n++] = po; delta_list[n++] = po;
...@@ -1310,9 +1291,25 @@ void git_packbuilder_free(git_packbuilder *pb) ...@@ -1310,9 +1291,25 @@ void git_packbuilder_free(git_packbuilder *pb)
if (pb == NULL) if (pb == NULL)
return; return;
git_odb_free(pb->odb); #ifdef GIT_THREADS
git_hash_free_ctx(pb->ctx);
git_oidmap_free(pb->object_ix); git_mutex_free(&pb->cache_mutex);
git__free(pb->object_list); git_mutex_free(&pb->progress_mutex);
git_cond_free(&pb->progress_cond);
#endif
if (pb->odb)
git_odb_free(pb->odb);
if (pb->ctx)
git_hash_free_ctx(pb->ctx);
if (pb->object_ix)
git_oidmap_free(pb->object_ix);
if (pb->object_list)
git__free(pb->object_list);
git__free(pb); git__free(pb);
} }
...@@ -43,7 +43,6 @@ typedef struct git_pobject { ...@@ -43,7 +43,6 @@ typedef struct git_pobject {
int written:1, int written:1,
recursing:1, recursing:1,
no_try_delta:1,
tagged:1, tagged:1,
filled:1; filled:1;
} git_pobject; } git_pobject;
...@@ -65,6 +64,11 @@ struct git_packbuilder { ...@@ -65,6 +64,11 @@ struct git_packbuilder {
git_oid pack_oid; /* hash of written pack */ git_oid pack_oid; /* hash of written pack */
/* synchronization objects */
git_mutex cache_mutex;
git_mutex progress_mutex;
git_cond progress_cond;
/* configs */ /* configs */
unsigned long delta_cache_size; unsigned long delta_cache_size;
unsigned long max_delta_cache_size; unsigned long max_delta_cache_size;
...@@ -77,8 +81,7 @@ struct git_packbuilder { ...@@ -77,8 +81,7 @@ struct git_packbuilder {
bool done; bool done;
}; };
int git_packbuilder_send(git_packbuilder *pb, git_transport *t); int git_packbuilder_send(git_packbuilder *pb, git_transport *t);
int git_packbuilder_write_buf(git_buf *buf, git_packbuilder *pb); int git_packbuilder_write_buf(git_buf *buf, git_packbuilder *pb);
#endif #endif /* INCLUDE_pack_objects_h__ */
...@@ -78,6 +78,7 @@ int pthread_cond_destroy(pthread_cond_t *cond) ...@@ -78,6 +78,7 @@ int pthread_cond_destroy(pthread_cond_t *cond)
closed = CloseHandle(*cond); closed = CloseHandle(*cond);
assert(closed); assert(closed);
GIT_UNUSED(closed);
*cond = NULL; *cond = NULL;
return 0; return 0;
...@@ -99,6 +100,7 @@ int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex) ...@@ -99,6 +100,7 @@ int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex)
wait_result = WaitForSingleObject(*cond, INFINITE); wait_result = WaitForSingleObject(*cond, INFINITE);
assert(WAIT_OBJECT_0 == wait_result); assert(WAIT_OBJECT_0 == wait_result);
GIT_UNUSED(wait_result);
return pthread_mutex_lock(mutex); return pthread_mutex_lock(mutex);
} }
...@@ -112,6 +114,7 @@ int pthread_cond_signal(pthread_cond_t *cond) ...@@ -112,6 +114,7 @@ int pthread_cond_signal(pthread_cond_t *cond)
signaled = SetEvent(*cond); signaled = SetEvent(*cond);
assert(signaled); assert(signaled);
GIT_UNUSED(signaled);
return 0; return 0;
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment