Commit 5b188225 by Jameson Miller

Support cancellation in push operation

This commit adds cancellation for the push operation. This work consists of:

1) Support cancellation during push operation
    - During object counting phase
    - During network transfer phase
        - Propagate GIT_EUSER error code out to caller
2) Improve cancellation support during fetch
    - Handle cancellation request during network transfer phase
    - Clear error string when cancelled during indexing
3) Fix error handling in git_smart__download_pack

Cancellation during push is still only handled in the pack building and
network transfer stages of push (and not during packbuilding).
parent 5bfead1d
...@@ -158,7 +158,7 @@ GIT_EXTERN(uint32_t) git_packbuilder_object_count(git_packbuilder *pb); ...@@ -158,7 +158,7 @@ GIT_EXTERN(uint32_t) git_packbuilder_object_count(git_packbuilder *pb);
GIT_EXTERN(uint32_t) git_packbuilder_written(git_packbuilder *pb); GIT_EXTERN(uint32_t) git_packbuilder_written(git_packbuilder *pb);
/** Packbuilder progress notification function */ /** Packbuilder progress notification function */
typedef void (*git_packbuilder_progress)( typedef int (*git_packbuilder_progress)(
int stage, int stage,
unsigned int current, unsigned int current,
unsigned int total, unsigned int total,
......
...@@ -40,7 +40,7 @@ typedef struct { ...@@ -40,7 +40,7 @@ typedef struct {
#define GIT_PUSH_OPTIONS_INIT { GIT_PUSH_OPTIONS_VERSION } #define GIT_PUSH_OPTIONS_INIT { GIT_PUSH_OPTIONS_VERSION }
/** Push network progress notification function */ /** Push network progress notification function */
typedef void (*git_push_transfer_progress)( typedef int (*git_push_transfer_progress)(
unsigned int current, unsigned int current,
unsigned int total, unsigned int total,
size_t bytes, size_t bytes,
......
...@@ -556,6 +556,7 @@ int git_indexer_stream_add(git_indexer_stream *idx, const void *data, size_t siz ...@@ -556,6 +556,7 @@ int git_indexer_stream_add(git_indexer_stream *idx, const void *data, size_t siz
stats->received_objects++; stats->received_objects++;
if (do_progress_callback(idx, stats) != 0) { if (do_progress_callback(idx, stats) != 0) {
giterr_clear();
error = GIT_EUSER; error = GIT_EUSER;
goto on_error; goto on_error;
} }
......
...@@ -216,15 +216,19 @@ int git_packbuilder_insert(git_packbuilder *pb, const git_oid *oid, ...@@ -216,15 +216,19 @@ int git_packbuilder_insert(git_packbuilder *pb, const git_oid *oid,
assert(ret != 0); assert(ret != 0);
kh_value(pb->object_ix, pos) = po; kh_value(pb->object_ix, pos) = po;
pb->done = false;
if (pb->progress_cb) { if (pb->progress_cb) {
double current_time = git__timer(); double current_time = git__timer();
if ((current_time - pb->last_progress_report_time) >= MIN_PROGRESS_UPDATE_INTERVAL) { if ((current_time - pb->last_progress_report_time) >= MIN_PROGRESS_UPDATE_INTERVAL) {
pb->last_progress_report_time = current_time; pb->last_progress_report_time = current_time;
pb->progress_cb(GIT_PACKBUILDER_ADDING_OBJECTS, pb->nr_objects, 0, pb->progress_cb_payload); if (pb->progress_cb(GIT_PACKBUILDER_ADDING_OBJECTS, pb->nr_objects, 0, pb->progress_cb_payload)) {
giterr_clear();
return GIT_EUSER;
}
} }
} }
pb->done = false;
return 0; return 0;
} }
...@@ -591,49 +595,50 @@ static int write_pack(git_packbuilder *pb, ...@@ -591,49 +595,50 @@ static int write_pack(git_packbuilder *pb,
enum write_one_status status; enum write_one_status status;
struct git_pack_header ph; struct git_pack_header ph;
unsigned int i = 0; unsigned int i = 0;
int error = 0;
write_order = compute_write_order(pb); write_order = compute_write_order(pb);
if (write_order == NULL) if (write_order == NULL) {
goto on_error; error = -1;
goto done;
}
/* Write pack header */ /* Write pack header */
ph.hdr_signature = htonl(PACK_SIGNATURE); ph.hdr_signature = htonl(PACK_SIGNATURE);
ph.hdr_version = htonl(PACK_VERSION); ph.hdr_version = htonl(PACK_VERSION);
ph.hdr_entries = htonl(pb->nr_objects); ph.hdr_entries = htonl(pb->nr_objects);
if (cb(&ph, sizeof(ph), data) < 0) if ((error = cb(&ph, sizeof(ph), data)) < 0)
goto on_error; goto done;
if (git_hash_update(&pb->ctx, &ph, sizeof(ph)) < 0) if ((error = git_hash_update(&pb->ctx, &ph, sizeof(ph))) < 0)
goto on_error; goto done;
pb->nr_remaining = pb->nr_objects; pb->nr_remaining = pb->nr_objects;
do { do {
pb->nr_written = 0; pb->nr_written = 0;
for ( ; i < pb->nr_objects; ++i) { for ( ; i < pb->nr_objects; ++i) {
po = write_order[i]; po = write_order[i];
if (write_one(&buf, pb, po, &status) < 0) if ((error = write_one(&buf, pb, po, &status)) < 0)
goto on_error; goto done;
if (cb(buf.ptr, buf.size, data) < 0) if ((error = cb(buf.ptr, buf.size, data)) < 0)
goto on_error; goto done;
git_buf_clear(&buf); git_buf_clear(&buf);
} }
pb->nr_remaining -= pb->nr_written; pb->nr_remaining -= pb->nr_written;
} while (pb->nr_remaining && i < pb->nr_objects); } while (pb->nr_remaining && i < pb->nr_objects);
git__free(write_order);
git_buf_free(&buf);
if (git_hash_final(&pb->pack_oid, &pb->ctx) < 0) if ((error = git_hash_final(&pb->pack_oid, &pb->ctx)) < 0)
goto on_error; goto done;
return cb(pb->pack_oid.id, GIT_OID_RAWSZ, data); error = cb(pb->pack_oid.id, GIT_OID_RAWSZ, data);
on_error: done:
git__free(write_order); git__free(write_order);
git_buf_free(&buf); git_buf_free(&buf);
return -1; return error;
} }
static int write_pack_buf(void *buf, size_t size, void *data) static int write_pack_buf(void *buf, size_t size, void *data)
......
...@@ -582,7 +582,7 @@ static int calculate_work(git_push *push) ...@@ -582,7 +582,7 @@ static int calculate_work(git_push *push)
static int do_push(git_push *push) static int do_push(git_push *push)
{ {
int error; int error = 0;
git_transport *transport = push->remote->transport; git_transport *transport = push->remote->transport;
if (!transport->push) { if (!transport->push) {
...@@ -611,8 +611,6 @@ static int do_push(git_push *push) ...@@ -611,8 +611,6 @@ static int do_push(git_push *push)
(error = transport->push(transport, push)) < 0) (error = transport->push(transport, push)) < 0)
goto on_error; goto on_error;
error = 0;
on_error: on_error:
git_packbuilder_free(push->pb); git_packbuilder_free(push->pb);
return error; return error;
......
...@@ -23,8 +23,13 @@ static int git_smart__recv_cb(gitno_buffer *buf) ...@@ -23,8 +23,13 @@ static int git_smart__recv_cb(gitno_buffer *buf)
buf->offset += bytes_read; buf->offset += bytes_read;
if (t->packetsize_cb) if (t->packetsize_cb && !t->cancelled.val)
t->packetsize_cb(bytes_read, t->packetsize_payload); if (t->packetsize_cb(bytes_read, t->packetsize_payload)) {
git_atomic_set(&t->cancelled, 1);
giterr_clear();
return GIT_EUSER;
}
return (int)(buf->offset - old_len); return (int)(buf->offset - old_len);
} }
......
...@@ -119,7 +119,7 @@ typedef struct transport_smart_caps { ...@@ -119,7 +119,7 @@ typedef struct transport_smart_caps {
report_status:1; report_status:1;
} transport_smart_caps; } transport_smart_caps;
typedef void (*packetsize_cb)(size_t received, void *payload); typedef int (*packetsize_cb)(size_t received, void *payload);
typedef struct { typedef struct {
git_transport parent; git_transport parent;
......
...@@ -425,7 +425,7 @@ struct network_packetsize_payload ...@@ -425,7 +425,7 @@ struct network_packetsize_payload
size_t last_fired_bytes; size_t last_fired_bytes;
}; };
static void network_packetsize(size_t received, void *payload) static int network_packetsize(size_t received, void *payload)
{ {
struct network_packetsize_payload *npp = (struct network_packetsize_payload*)payload; struct network_packetsize_payload *npp = (struct network_packetsize_payload*)payload;
...@@ -435,8 +435,12 @@ static void network_packetsize(size_t received, void *payload) ...@@ -435,8 +435,12 @@ static void network_packetsize(size_t received, void *payload)
/* Fire notification if the threshold is reached */ /* Fire notification if the threshold is reached */
if ((npp->stats->received_bytes - npp->last_fired_bytes) > NETWORK_XFER_THRESHOLD) { if ((npp->stats->received_bytes - npp->last_fired_bytes) > NETWORK_XFER_THRESHOLD) {
npp->last_fired_bytes = npp->stats->received_bytes; npp->last_fired_bytes = npp->stats->received_bytes;
npp->callback(npp->stats, npp->payload);
if(npp->callback(npp->stats, npp->payload))
return GIT_EUSER;
} }
return 0;
} }
int git_smart__download_pack( int git_smart__download_pack(
...@@ -450,7 +454,7 @@ int git_smart__download_pack( ...@@ -450,7 +454,7 @@ int git_smart__download_pack(
gitno_buffer *buf = &t->buffer; gitno_buffer *buf = &t->buffer;
git_odb *odb; git_odb *odb;
struct git_odb_writepack *writepack = NULL; struct git_odb_writepack *writepack = NULL;
int error = -1; int error = 0;
struct network_packetsize_payload npp = {0}; struct network_packetsize_payload npp = {0};
memset(stats, 0, sizeof(git_transfer_progress)); memset(stats, 0, sizeof(git_transfer_progress));
...@@ -463,13 +467,14 @@ int git_smart__download_pack( ...@@ -463,13 +467,14 @@ int git_smart__download_pack(
t->packetsize_payload = &npp; t->packetsize_payload = &npp;
/* We might have something in the buffer already from negotiate_fetch */ /* We might have something in the buffer already from negotiate_fetch */
if (t->buffer.offset > 0) if (t->buffer.offset > 0 && !t->cancelled.val)
t->packetsize_cb(t->buffer.offset, t->packetsize_payload); if(t->packetsize_cb(t->buffer.offset, t->packetsize_payload))
git_atomic_set(&t->cancelled, 1);
} }
if ((error = git_repository_odb__weakptr(&odb, repo)) < 0 || if ((error = git_repository_odb__weakptr(&odb, repo)) < 0 ||
((error = git_odb_write_pack(&writepack, odb, progress_cb, progress_payload)) < 0)) ((error = git_odb_write_pack(&writepack, odb, progress_cb, progress_payload)) < 0))
goto on_error; goto done;
/* /*
* If the remote doesn't support the side-band, we can feed * If the remote doesn't support the side-band, we can feed
...@@ -477,23 +482,29 @@ int git_smart__download_pack( ...@@ -477,23 +482,29 @@ int git_smart__download_pack(
* check which one belongs there. * check which one belongs there.
*/ */
if (!t->caps.side_band && !t->caps.side_band_64k) { if (!t->caps.side_band && !t->caps.side_band_64k) {
if (no_sideband(t, writepack, buf, stats) < 0) error = no_sideband(t, writepack, buf, stats);
goto on_error; goto done;
goto on_success;
} }
do { do {
git_pkt *pkt; git_pkt *pkt;
/* Check cancellation before network call */
if (t->cancelled.val) { if (t->cancelled.val) {
giterr_set(GITERR_NET, "The fetch was cancelled by the user"); giterr_set(GITERR_NET, "The fetch was cancelled by the user");
error = GIT_EUSER; error = GIT_EUSER;
goto on_error; goto done;
} }
if (recv_pkt(&pkt, buf) < 0) if ((error = recv_pkt(&pkt, buf)) < 0)
goto on_error; goto done;
/* Check cancellation after network call */
if (t->cancelled.val) {
giterr_set(GITERR_NET, "The fetch was cancelled by the user");
error = GIT_EUSER;
goto done;
}
if (pkt->type == GIT_PKT_PROGRESS) { if (pkt->type == GIT_PKT_PROGRESS) {
if (t->progress_cb) { if (t->progress_cb) {
...@@ -507,7 +518,7 @@ int git_smart__download_pack( ...@@ -507,7 +518,7 @@ int git_smart__download_pack(
git__free(pkt); git__free(pkt);
if (error < 0) if (error < 0)
goto on_error; goto done;
} else if (pkt->type == GIT_PKT_FLUSH) { } else if (pkt->type == GIT_PKT_FLUSH) {
/* A flush indicates the end of the packfile */ /* A flush indicates the end of the packfile */
git__free(pkt); git__free(pkt);
...@@ -515,13 +526,10 @@ int git_smart__download_pack( ...@@ -515,13 +526,10 @@ int git_smart__download_pack(
} }
} while (1); } while (1);
if (writepack->commit(writepack, stats) < 0) if ((error = writepack->commit(writepack, stats)) < 0)
goto on_error; goto done;
on_success:
error = 0;
on_error: done:
if (writepack) if (writepack)
writepack->free(writepack); writepack->free(writepack);
...@@ -828,7 +836,10 @@ static int stream_thunk(void *buf, size_t size, void *data) ...@@ -828,7 +836,10 @@ static int stream_thunk(void *buf, size_t size, void *data)
if ((current_time - payload->last_progress_report_time) >= MIN_PROGRESS_UPDATE_INTERVAL) { if ((current_time - payload->last_progress_report_time) >= MIN_PROGRESS_UPDATE_INTERVAL) {
payload->last_progress_report_time = current_time; payload->last_progress_report_time = current_time;
payload->cb(payload->pb->nr_written, payload->pb->nr_objects, payload->last_bytes, payload->cb_payload); if(payload->cb(payload->pb->nr_written, payload->pb->nr_objects, payload->last_bytes, payload->cb_payload)) {
giterr_clear();
error = GIT_EUSER;
}
} }
} }
...@@ -840,7 +851,7 @@ int git_smart__push(git_transport *transport, git_push *push) ...@@ -840,7 +851,7 @@ int git_smart__push(git_transport *transport, git_push *push)
transport_smart *t = (transport_smart *)transport; transport_smart *t = (transport_smart *)transport;
struct push_packbuilder_payload packbuilder_payload = {0}; struct push_packbuilder_payload packbuilder_payload = {0};
git_buf pktline = GIT_BUF_INIT; git_buf pktline = GIT_BUF_INIT;
int error = -1, need_pack = 0; int error = 0, need_pack = 0;
push_spec *spec; push_spec *spec;
unsigned int i; unsigned int i;
...@@ -882,20 +893,21 @@ int git_smart__push(git_transport *transport, git_push *push) ...@@ -882,20 +893,21 @@ int git_smart__push(git_transport *transport, git_push *push)
} }
} }
if (git_smart__get_push_stream(t, &packbuilder_payload.stream) < 0 || if ((error = git_smart__get_push_stream(t, &packbuilder_payload.stream)) < 0 ||
gen_pktline(&pktline, push) < 0 || (error = gen_pktline(&pktline, push)) < 0 ||
packbuilder_payload.stream->write(packbuilder_payload.stream, git_buf_cstr(&pktline), git_buf_len(&pktline)) < 0) (error = packbuilder_payload.stream->write(packbuilder_payload.stream, git_buf_cstr(&pktline), git_buf_len(&pktline))) < 0)
goto on_error; goto done;
if (need_pack && git_packbuilder_foreach(push->pb, &stream_thunk, &packbuilder_payload) < 0) if (need_pack &&
goto on_error; (error = git_packbuilder_foreach(push->pb, &stream_thunk, &packbuilder_payload)) < 0)
goto done;
/* If we sent nothing or the server doesn't support report-status, then /* If we sent nothing or the server doesn't support report-status, then
* we consider the pack to have been unpacked successfully */ * we consider the pack to have been unpacked successfully */
if (!push->specs.length || !push->report_status) if (!push->specs.length || !push->report_status)
push->unpack_ok = 1; push->unpack_ok = 1;
else if (parse_report(&t->buffer, push) < 0) else if ((error = parse_report(&t->buffer, push)) < 0)
goto on_error; goto done;
/* If progress is being reported write the final report */ /* If progress is being reported write the final report */
if (push->transfer_progress_cb) { if (push->transfer_progress_cb) {
...@@ -903,13 +915,10 @@ int git_smart__push(git_transport *transport, git_push *push) ...@@ -903,13 +915,10 @@ int git_smart__push(git_transport *transport, git_push *push)
} }
if (push->status.length && if (push->status.length &&
update_refs_from_report(&t->refs, &push->specs, &push->status) < 0) (error = update_refs_from_report(&t->refs, &push->specs, &push->status)) < 0)
goto on_error; goto done;
error = 0; done:
on_error:
git_buf_free(&pktline); git_buf_free(&pktline);
return error; return error;
} }
...@@ -349,16 +349,18 @@ void test_online_push__cleanup(void) ...@@ -349,16 +349,18 @@ void test_online_push__cleanup(void)
cl_git_sandbox_cleanup(); cl_git_sandbox_cleanup();
} }
static void push_pack_progress_cb(int stage, unsigned int current, unsigned int total, void* payload) static int push_pack_progress_cb(int stage, unsigned int current, unsigned int total, void* payload)
{ {
int *was_called = (int *) payload; int *was_called = (int *) payload;
*was_called = 1; *was_called = 1;
return 0;
} }
static void push_transfer_progress_cb(unsigned int current, unsigned int total, size_t bytes, void* payload) static int push_transfer_progress_cb(unsigned int current, unsigned int total, size_t bytes, void* payload)
{ {
int *was_called = (int *) payload; int *was_called = (int *) payload;
*was_called = 1; *was_called = 1;
return 0;
} }
/** /**
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment