Commit b176eded by Jameson Miller

Initial Implementation of progress reports during push

This adds the basics of progress reporting during push. While progress
for all aspects of a push operation are not reported with this change,
it lays the foundation to add these later. Push progress reporting
can be improved in the future - and consumers of the API should
just get more accurate information at that point.

The main areas where this is lacking are:

1) packbuilding progress: does not report progress during deltafication,
   as this involves coordinating progress from multiple threads.

2) network progress: reports progress as objects and bytes are going
   to be written to the subtransport (instead of as client gets
   confirmation that they have been received by the server) and leaves
   out some of the bytes that are transfered as part of the push protocol.
   Basically, this reports the pack bytes that are written to the
   subtransport. It does not report the bytes sent on the wire that
   are received by the server. This should be a good estimate of
   progress (and an improvement over no progress).
parent 5b09db15
...@@ -56,6 +56,8 @@ FUNCTION(TARGET_OS_LIBRARIES target) ...@@ -56,6 +56,8 @@ FUNCTION(TARGET_OS_LIBRARIES target)
TARGET_LINK_LIBRARIES(${target} ws2_32) TARGET_LINK_LIBRARIES(${target} ws2_32)
ELSEIF(CMAKE_SYSTEM_NAME MATCHES "(Solaris|SunOS)") ELSEIF(CMAKE_SYSTEM_NAME MATCHES "(Solaris|SunOS)")
TARGET_LINK_LIBRARIES(${target} socket nsl) TARGET_LINK_LIBRARIES(${target} socket nsl)
ELSEIF(CMAKE_SYSTEM_NAME MATCHES "Linux")
TARGET_LINK_LIBRARIES(${target} rt)
ENDIF () ENDIF ()
IF(THREADSAFE) IF(THREADSAFE)
TARGET_LINK_LIBRARIES(${target} ${CMAKE_THREAD_LIBS_INIT}) TARGET_LINK_LIBRARIES(${target} ${CMAKE_THREAD_LIBS_INIT})
......
...@@ -46,6 +46,14 @@ ...@@ -46,6 +46,14 @@
GIT_BEGIN_DECL GIT_BEGIN_DECL
/** /**
* Stages that are reported by the packbuilder progress callback.
*/
typedef enum {
GIT_PACKBUILDER_ADDING_OBJECTS = 0,
GIT_PACKBUILDER_DELTAFICATION = 1,
} git_packbuilder_stage_t;
/**
* Initialize a new packbuilder * Initialize a new packbuilder
* *
* @param out The new packbuilder object * @param out The new packbuilder object
...@@ -149,6 +157,28 @@ GIT_EXTERN(uint32_t) git_packbuilder_object_count(git_packbuilder *pb); ...@@ -149,6 +157,28 @@ GIT_EXTERN(uint32_t) git_packbuilder_object_count(git_packbuilder *pb);
*/ */
GIT_EXTERN(uint32_t) git_packbuilder_written(git_packbuilder *pb); GIT_EXTERN(uint32_t) git_packbuilder_written(git_packbuilder *pb);
/** Packbuilder progress notification function */
typedef void (*git_packbuilder_progress)(
int stage,
unsigned int current,
unsigned int total,
void *payload);
/**
* Set the callbacks for a packbuilder
*
* @param pb The packbuilder object
* @param progress_cb Function to call with progress information during
* pack building. Be aware that this is called inline with pack building
* operations, so performance may be affected.
* @param progress_cb_payload Payload for progress callback.
* @return 0 or an error code
*/
GIT_EXTERN(int) git_packbuilder_set_callbacks(
git_packbuilder *pb,
git_packbuilder_progress progress_cb,
void *progress_cb_payload);
/** /**
* Free the packbuilder and all associated data * Free the packbuilder and all associated data
* *
......
...@@ -8,6 +8,7 @@ ...@@ -8,6 +8,7 @@
#define INCLUDE_git_push_h__ #define INCLUDE_git_push_h__
#include "common.h" #include "common.h"
#include "pack.h"
/** /**
* @file git2/push.h * @file git2/push.h
...@@ -38,6 +39,13 @@ typedef struct { ...@@ -38,6 +39,13 @@ typedef struct {
#define GIT_PUSH_OPTIONS_VERSION 1 #define GIT_PUSH_OPTIONS_VERSION 1
#define GIT_PUSH_OPTIONS_INIT { GIT_PUSH_OPTIONS_VERSION } #define GIT_PUSH_OPTIONS_INIT { GIT_PUSH_OPTIONS_VERSION }
/** Push network progress notification function */
typedef void (*git_push_transfer_progress)(
unsigned int current,
unsigned int total,
size_t bytes,
void* payload);
/** /**
* Create a new push object * Create a new push object
* *
...@@ -61,6 +69,27 @@ GIT_EXTERN(int) git_push_set_options( ...@@ -61,6 +69,27 @@ GIT_EXTERN(int) git_push_set_options(
const git_push_options *opts); const git_push_options *opts);
/** /**
* Set the callbacks for a push
*
* @param push The push object
* @param pack_progress_cb Function to call with progress information during
* pack building. Be aware that this is called inline with pack building
* operations, so performance may be affected.
* @param pack_progress_cb_payload Payload for the pack progress callback.
* @param transfer_progress_cb Function to call with progress information during
* the upload portion of a push. Be aware that this is called inline with
* pack building operations, so performance may be affected.
* @param transfer_progress_cb_payload Payload for the network progress callback.
* @return 0 or an error code
*/
GIT_EXTERN(int) git_push_set_callbacks(
git_push *push,
git_packbuilder_progress pack_progress_cb,
void *pack_progress_cb_payload,
git_push_transfer_progress transfer_progress_cb,
void *transfer_progress_cb_payload);
/**
* Add a refspec to be pushed * Add a refspec to be pushed
* *
* @param push The push object * @param push The push object
......
...@@ -14,6 +14,7 @@ ...@@ -14,6 +14,7 @@
#include "pack.h" #include "pack.h"
#include "thread-utils.h" #include "thread-utils.h"
#include "tree.h" #include "tree.h"
#include "util.h"
#include "git2/pack.h" #include "git2/pack.h"
#include "git2/commit.h" #include "git2/commit.h"
...@@ -57,6 +58,9 @@ struct pack_write_context { ...@@ -57,6 +58,9 @@ struct pack_write_context {
#define git_packbuilder__progress_lock(pb) GIT_PACKBUILDER__MUTEX_OP(pb, progress_mutex, lock) #define git_packbuilder__progress_lock(pb) GIT_PACKBUILDER__MUTEX_OP(pb, progress_mutex, lock)
#define git_packbuilder__progress_unlock(pb) GIT_PACKBUILDER__MUTEX_OP(pb, progress_mutex, unlock) #define git_packbuilder__progress_unlock(pb) GIT_PACKBUILDER__MUTEX_OP(pb, progress_mutex, unlock)
/* The minimal interval between progress updates (in seconds). */
#define MIN_PROGRESS_UPDATE_INTERVAL 0.5
static unsigned name_hash(const char *name) static unsigned name_hash(const char *name)
{ {
unsigned c, hash = 0; unsigned c, hash = 0;
...@@ -212,6 +216,14 @@ int git_packbuilder_insert(git_packbuilder *pb, const git_oid *oid, ...@@ -212,6 +216,14 @@ int git_packbuilder_insert(git_packbuilder *pb, const git_oid *oid,
assert(ret != 0); assert(ret != 0);
kh_value(pb->object_ix, pos) = po; kh_value(pb->object_ix, pos) = po;
if (pb->progress_cb) {
double current_time = git__timer();
if ((current_time - pb->last_progress_report_time) >= MIN_PROGRESS_UPDATE_INTERVAL) {
pb->last_progress_report_time = current_time;
pb->progress_cb(GIT_PACKBUILDER_ADDING_OBJECTS, pb->nr_objects, 0, pb->progress_cb_payload);
}
}
pb->done = false; pb->done = false;
return 0; return 0;
} }
...@@ -1207,6 +1219,13 @@ static int prepare_pack(git_packbuilder *pb) ...@@ -1207,6 +1219,13 @@ static int prepare_pack(git_packbuilder *pb)
if (pb->nr_objects == 0 || pb->done) if (pb->nr_objects == 0 || pb->done)
return 0; /* nothing to do */ return 0; /* nothing to do */
/*
* Although we do not report progress during deltafication, we
* at least report that we are in the deltafication stage
*/
if (pb->progress_cb)
pb->progress_cb(GIT_PACKBUILDER_DELTAFICATION, 0, pb->nr_objects, pb->progress_cb_payload);
delta_list = git__malloc(pb->nr_objects * sizeof(*delta_list)); delta_list = git__malloc(pb->nr_objects * sizeof(*delta_list));
GITERR_CHECK_ALLOC(delta_list); GITERR_CHECK_ALLOC(delta_list);
...@@ -1348,6 +1367,17 @@ uint32_t git_packbuilder_written(git_packbuilder *pb) ...@@ -1348,6 +1367,17 @@ uint32_t git_packbuilder_written(git_packbuilder *pb)
return pb->nr_written; return pb->nr_written;
} }
int git_packbuilder_set_callbacks(git_packbuilder *pb, git_packbuilder_progress progress_cb, void *progress_cb_payload)
{
if (!pb)
return -1;
pb->progress_cb = progress_cb;
pb->progress_cb_payload = progress_cb_payload;
return 0;
}
void git_packbuilder_free(git_packbuilder *pb) void git_packbuilder_free(git_packbuilder *pb)
{ {
if (pb == NULL) if (pb == NULL)
......
...@@ -16,6 +16,7 @@ ...@@ -16,6 +16,7 @@
#include "netops.h" #include "netops.h"
#include "git2/oid.h" #include "git2/oid.h"
#include "git2/pack.h"
#define GIT_PACK_WINDOW 10 /* number of objects to possibly delta against */ #define GIT_PACK_WINDOW 10 /* number of objects to possibly delta against */
#define GIT_PACK_DEPTH 50 /* max delta depth */ #define GIT_PACK_DEPTH 50 /* max delta depth */
...@@ -79,6 +80,10 @@ struct git_packbuilder { ...@@ -79,6 +80,10 @@ struct git_packbuilder {
int nr_threads; /* nr of threads to use */ int nr_threads; /* nr of threads to use */
git_packbuilder_progress progress_cb;
void *progress_cb_payload;
double last_progress_report_time; /* the time progress was last reported */
bool done; bool done;
}; };
......
...@@ -70,6 +70,25 @@ int git_push_set_options(git_push *push, const git_push_options *opts) ...@@ -70,6 +70,25 @@ int git_push_set_options(git_push *push, const git_push_options *opts)
return 0; return 0;
} }
int git_push_set_callbacks(
git_push *push,
git_packbuilder_progress pack_progress_cb,
void *pack_progress_cb_payload,
git_push_transfer_progress transfer_progress_cb,
void *transfer_progress_cb_payload)
{
if (!push)
return -1;
push->pack_progress_cb = pack_progress_cb;
push->pack_progress_cb_payload = pack_progress_cb_payload;
push->transfer_progress_cb = transfer_progress_cb;
push->transfer_progress_cb_payload = transfer_progress_cb_payload;
return 0;
}
static void free_refspec(push_spec *spec) static void free_refspec(push_spec *spec)
{ {
if (spec == NULL) if (spec == NULL)
...@@ -583,6 +602,10 @@ static int do_push(git_push *push) ...@@ -583,6 +602,10 @@ static int do_push(git_push *push)
git_packbuilder_set_threads(push->pb, push->pb_parallelism); git_packbuilder_set_threads(push->pb, push->pb_parallelism);
if (push->pack_progress_cb)
if ((error = git_packbuilder_set_callbacks(push->pb, push->pack_progress_cb, push->pack_progress_cb_payload)) < 0)
goto on_error;
if ((error = calculate_work(push)) < 0 || if ((error = calculate_work(push)) < 0 ||
(error = queue_objects(push)) < 0 || (error = queue_objects(push)) < 0 ||
(error = transport->push(transport, push)) < 0) (error = transport->push(transport, push)) < 0)
......
...@@ -39,6 +39,11 @@ struct git_push { ...@@ -39,6 +39,11 @@ struct git_push {
/* options */ /* options */
unsigned pb_parallelism; unsigned pb_parallelism;
git_packbuilder_progress pack_progress_cb;
void *pack_progress_cb_payload;
git_push_transfer_progress transfer_progress_cb;
void *transfer_progress_cb_payload;
}; };
/** /**
......
...@@ -13,8 +13,11 @@ ...@@ -13,8 +13,11 @@
#include "push.h" #include "push.h"
#include "pack-objects.h" #include "pack-objects.h"
#include "remote.h" #include "remote.h"
#include "util.h"
#define NETWORK_XFER_THRESHOLD (100*1024) #define NETWORK_XFER_THRESHOLD (100*1024)
/* The minimal interval between progress updates (in seconds). */
#define MIN_PROGRESS_UPDATE_INTERVAL 0.5
int git_smart__store_refs(transport_smart *t, int flushes) int git_smart__store_refs(transport_smart *t, int flushes)
{ {
...@@ -801,22 +804,53 @@ static int update_refs_from_report( ...@@ -801,22 +804,53 @@ static int update_refs_from_report(
return 0; return 0;
} }
struct push_packbuilder_payload
{
git_smart_subtransport_stream *stream;
git_packbuilder *pb;
git_push_transfer_progress cb;
void *cb_payload;
size_t last_bytes;
double last_progress_report_time;
};
static int stream_thunk(void *buf, size_t size, void *data) static int stream_thunk(void *buf, size_t size, void *data)
{ {
git_smart_subtransport_stream *s = (git_smart_subtransport_stream *)data; int error = 0;
struct push_packbuilder_payload *payload = data;
if ((error = payload->stream->write(payload->stream, (const char *)buf, size)) < 0)
return error;
if (payload->cb) {
double current_time = git__timer();
payload->last_bytes += size;
return s->write(s, (const char *)buf, size); if ((current_time - payload->last_progress_report_time) >= MIN_PROGRESS_UPDATE_INTERVAL) {
payload->last_progress_report_time = current_time;
payload->cb(payload->pb->nr_written, payload->pb->nr_objects, payload->last_bytes, payload->cb_payload);
}
}
return error;
} }
int git_smart__push(git_transport *transport, git_push *push) int git_smart__push(git_transport *transport, git_push *push)
{ {
transport_smart *t = (transport_smart *)transport; transport_smart *t = (transport_smart *)transport;
git_smart_subtransport_stream *s; struct push_packbuilder_payload packbuilder_payload = {0};
git_buf pktline = GIT_BUF_INIT; git_buf pktline = GIT_BUF_INIT;
int error = -1, need_pack = 0; int error = -1, need_pack = 0;
push_spec *spec; push_spec *spec;
unsigned int i; unsigned int i;
packbuilder_payload.pb = push->pb;
if (push->transfer_progress_cb) {
packbuilder_payload.cb = push->transfer_progress_cb;
packbuilder_payload.cb_payload = push->transfer_progress_cb_payload;
}
#ifdef PUSH_DEBUG #ifdef PUSH_DEBUG
{ {
git_remote_head *head; git_remote_head *head;
...@@ -848,12 +882,12 @@ int git_smart__push(git_transport *transport, git_push *push) ...@@ -848,12 +882,12 @@ int git_smart__push(git_transport *transport, git_push *push)
} }
} }
if (git_smart__get_push_stream(t, &s) < 0 || if (git_smart__get_push_stream(t, &packbuilder_payload.stream) < 0 ||
gen_pktline(&pktline, push) < 0 || gen_pktline(&pktline, push) < 0 ||
s->write(s, git_buf_cstr(&pktline), git_buf_len(&pktline)) < 0) packbuilder_payload.stream->write(packbuilder_payload.stream, git_buf_cstr(&pktline), git_buf_len(&pktline)) < 0)
goto on_error; goto on_error;
if (need_pack && git_packbuilder_foreach(push->pb, &stream_thunk, s) < 0) if (need_pack && git_packbuilder_foreach(push->pb, &stream_thunk, &packbuilder_payload) < 0)
goto on_error; goto on_error;
/* If we sent nothing or the server doesn't support report-status, then /* If we sent nothing or the server doesn't support report-status, then
...@@ -863,6 +897,11 @@ int git_smart__push(git_transport *transport, git_push *push) ...@@ -863,6 +897,11 @@ int git_smart__push(git_transport *transport, git_push *push)
else if (parse_report(&t->buffer, push) < 0) else if (parse_report(&t->buffer, push) < 0)
goto on_error; goto on_error;
/* If progress is being reported write the final report */
if (push->transfer_progress_cb) {
push->transfer_progress_cb(push->pb->nr_written, push->pb->nr_objects, packbuilder_payload.last_bytes, push->transfer_progress_cb_payload);
}
if (push->status.length && if (push->status.length &&
update_refs_from_report(&t->refs, &push->specs, &push->status) < 0) update_refs_from_report(&t->refs, &push->specs, &push->status) < 0)
goto on_error; goto on_error;
......
...@@ -353,4 +353,65 @@ GIT_INLINE(void) git__memzero(void *data, size_t size) ...@@ -353,4 +353,65 @@ GIT_INLINE(void) git__memzero(void *data, size_t size)
#endif #endif
} }
#ifdef GIT_WIN32
GIT_INLINE(double) git__timer(void)
{
/* We need the initial tick count to detect if the tick
* count has rolled over. */
static DWORD initial_tick_count = 0;
/* GetTickCount returns the number of milliseconds that have
* elapsed since the system was started. */
DWORD count = GetTickCount();
if(initial_tick_count == 0) {
initial_tick_count = count;
} else if (count < initial_tick_count) {
/* The tick count has rolled over - adjust for it. */
count = (0xFFFFFFFF - initial_tick_count) + count;
}
return (double) count / (double) 1000;
}
#elif __APPLE__
#include <mach/mach_time.h>
double git__timer(void)
{
uint64_t time = mach_absolute_time();
static double scaling_factor = 0;
if (scaling_factor == 0) {
mach_timebase_info_data_t info;
(void)mach_timebase_info(&info);
scaling_factor = (double)info.numer / (double)info.denom;
}
return (double)time * scaling_factor / 1.0E-9;
}
#else
#include <sys/time.h>
GIT_INLINE(double) git__timer(void)
{
struct timespec tp;
if (clock_gettime(CLOCK_MONOTONIC, &tp) == 0) {
return (double) tp.tv_sec + (double) tp.tv_nsec / 1E-9;
} else {
/* Fall back to using gettimeofday */
struct timeval tv;
struct timezone tz;
gettimeofday(&tv, &tz);
return (double)tv.tv_sec + (double)tv.tv_usec / 1E-6;
}
}
#endif
#endif /* INCLUDE_util_h__ */ #endif /* INCLUDE_util_h__ */
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment