Commit b7e8827b by Vicent Martí

Merge pull request #895 from carlosmn/sideband

Add sideband support
parents 09fad506 0a1db746
...@@ -14,6 +14,13 @@ struct dl_data { ...@@ -14,6 +14,13 @@ struct dl_data {
int finished; int finished;
}; };
static void progress_cb(const char *str, int len, void *data)
{
data = data;
printf("remote: %.*s", len, str);
fflush(stdout); /* We don't have the \n to force the flush */
}
static void *download(void *ptr) static void *download(void *ptr)
{ {
struct dl_data *data = (struct dl_data *)ptr; struct dl_data *data = (struct dl_data *)ptr;
...@@ -43,6 +50,7 @@ exit: ...@@ -43,6 +50,7 @@ exit:
static int update_cb(const char *refname, const git_oid *a, const git_oid *b, void *data) static int update_cb(const char *refname, const git_oid *a, const git_oid *b, void *data)
{ {
char a_str[GIT_OID_HEXSZ+1], b_str[GIT_OID_HEXSZ+1]; char a_str[GIT_OID_HEXSZ+1], b_str[GIT_OID_HEXSZ+1];
data = data;
git_oid_fmt(b_str, b); git_oid_fmt(b_str, b);
b_str[GIT_OID_HEXSZ] = '\0'; b_str[GIT_OID_HEXSZ] = '\0';
...@@ -78,6 +86,7 @@ int fetch(git_repository *repo, int argc, char **argv) ...@@ -78,6 +86,7 @@ int fetch(git_repository *repo, int argc, char **argv)
// Set up the callbacks (only update_tips for now) // Set up the callbacks (only update_tips for now)
memset(&callbacks, 0, sizeof(callbacks)); memset(&callbacks, 0, sizeof(callbacks));
callbacks.update_tips = &update_cb; callbacks.update_tips = &update_cb;
callbacks.progress = &progress_cb;
git_remote_set_callbacks(remote, &callbacks); git_remote_set_callbacks(remote, &callbacks);
// Set up the information for the background worker thread // Set up the information for the background worker thread
...@@ -96,7 +105,10 @@ int fetch(git_repository *repo, int argc, char **argv) ...@@ -96,7 +105,10 @@ int fetch(git_repository *repo, int argc, char **argv)
// the download rate. // the download rate.
do { do {
usleep(10000); usleep(10000);
printf("\rReceived %d/%d objects in %zu bytes", stats.processed, stats.total, bytes);
if (stats.total > 0)
printf("Received %d/%d objects (%d) in %d bytes\r",
stats.received, stats.total, stats.processed, bytes);
} while (!data.finished); } while (!data.finished);
if (data.ret < 0) if (data.ret < 0)
......
...@@ -19,6 +19,8 @@ GIT_BEGIN_DECL ...@@ -19,6 +19,8 @@ GIT_BEGIN_DECL
typedef struct git_indexer_stats { typedef struct git_indexer_stats {
unsigned int total; unsigned int total;
unsigned int processed; unsigned int processed;
unsigned int received;
unsigned int data_received;
} git_indexer_stats; } git_indexer_stats;
......
...@@ -287,7 +287,7 @@ typedef enum git_remote_completion_type { ...@@ -287,7 +287,7 @@ typedef enum git_remote_completion_type {
* Set the calbacks to be called by the remote. * Set the calbacks to be called by the remote.
*/ */
struct git_remote_callbacks { struct git_remote_callbacks {
int (*progress)(const char *str, void *data); void (*progress)(const char *str, int len, void *data);
int (*completion)(git_remote_completion_type type, void *data); int (*completion)(git_remote_completion_type type, void *data);
int (*update_tips)(const char *refname, const git_oid *a, const git_oid *b, void *data); int (*update_tips)(const char *refname, const git_oid *a, const git_oid *b, void *data);
void *data; void *data;
......
...@@ -292,6 +292,31 @@ int git_fetch_download_pack(git_remote *remote, git_off_t *bytes, git_indexer_st ...@@ -292,6 +292,31 @@ int git_fetch_download_pack(git_remote *remote, git_off_t *bytes, git_indexer_st
} }
static int no_sideband(git_indexer_stream *idx, gitno_buffer *buf, git_off_t *bytes, git_indexer_stats *stats)
{
int recvd;
do {
if (git_indexer_stream_add(idx, buf->data, buf->offset, stats) < 0)
return -1;
gitno_consume_n(buf, buf->offset);
if ((recvd = gitno_recv(buf)) < 0)
return -1;
*bytes += recvd;
} while(recvd > 0 && stats->data_received);
if (!stats->data_received)
giterr_set(GITERR_NET, "Early EOF while downloading packfile");
if (git_indexer_stream_finalize(idx, stats))
return -1;
return 0;
}
/* Receiving data from a socket and storing it is pretty much the same for git and HTTP */ /* Receiving data from a socket and storing it is pretty much the same for git and HTTP */
int git_fetch__download_pack( int git_fetch__download_pack(
git_transport *t, git_transport *t,
...@@ -299,7 +324,6 @@ int git_fetch__download_pack( ...@@ -299,7 +324,6 @@ int git_fetch__download_pack(
git_off_t *bytes, git_off_t *bytes,
git_indexer_stats *stats) git_indexer_stats *stats)
{ {
int recvd;
git_buf path = GIT_BUF_INIT; git_buf path = GIT_BUF_INIT;
gitno_buffer *buf = &t->buffer; gitno_buffer *buf = &t->buffer;
git_indexer_stream *idx = NULL; git_indexer_stream *idx = NULL;
...@@ -314,20 +338,49 @@ int git_fetch__download_pack( ...@@ -314,20 +338,49 @@ int git_fetch__download_pack(
memset(stats, 0, sizeof(git_indexer_stats)); memset(stats, 0, sizeof(git_indexer_stats));
*bytes = 0; *bytes = 0;
do { /*
if (git_indexer_stream_add(idx, buf->data, buf->offset, stats) < 0) * If the remote doesn't support the side-band, we can feed
* the data directly to the indexer. Otherwise, we need to
* check which one belongs there.
*/
if (!t->caps.side_band && !t->caps.side_band_64k) {
if (no_sideband(idx, buf, bytes, stats) < 0)
goto on_error; goto on_error;
gitno_consume_n(buf, buf->offset); git_indexer_stream_free(idx);
return 0;
}
if ((recvd = gitno_recv(buf)) < 0) do {
git_pkt *pkt;
if (recv_pkt(&pkt, buf) < 0)
goto on_error; goto on_error;
*bytes += recvd; if (pkt->type == GIT_PKT_PROGRESS) {
} while(recvd > 0); if (t->progress_cb) {
git_pkt_progress *p = (git_pkt_progress *) pkt;
t->progress_cb(p->data, p->len, t->cb_data);
}
git__free(pkt);
} else if (pkt->type == GIT_PKT_DATA) {
git_pkt_data *p = (git_pkt_data *) pkt;
*bytes += p->len;
if (git_indexer_stream_add(idx, p->data, p->len, stats) < 0)
goto on_error;
git__free(pkt);
} else if (pkt->type == GIT_PKT_FLUSH) {
/* A flush indicates the end of the packfile */
git__free(pkt);
break;
}
} while (!stats->data_received);
if (!stats->data_received)
giterr_set(GITERR_NET, "Early EOF while downloading packfile");
if (git_indexer_stream_finalize(idx, stats)) if (git_indexer_stream_finalize(idx, stats))
goto on_error; return -1;
git_indexer_stream_free(idx); git_indexer_stream_free(idx);
return 0; return 0;
......
...@@ -324,8 +324,8 @@ int git_indexer_stream_add(git_indexer_stream *idx, const void *data, size_t siz ...@@ -324,8 +324,8 @@ int git_indexer_stream_add(git_indexer_stream *idx, const void *data, size_t siz
if (git_vector_init(&idx->deltas, (unsigned int)(idx->nr_objects / 2), NULL) < 0) if (git_vector_init(&idx->deltas, (unsigned int)(idx->nr_objects / 2), NULL) < 0)
return -1; return -1;
memset(stats, 0, sizeof(git_indexer_stats));
stats->total = (unsigned int)idx->nr_objects; stats->total = (unsigned int)idx->nr_objects;
stats->processed = 0;
} }
/* Now that we have data in the pack, let's try to parse it */ /* Now that we have data in the pack, let's try to parse it */
...@@ -361,6 +361,7 @@ int git_indexer_stream_add(git_indexer_stream *idx, const void *data, size_t siz ...@@ -361,6 +361,7 @@ int git_indexer_stream_add(git_indexer_stream *idx, const void *data, size_t siz
if (error < 0) if (error < 0)
return error; return error;
stats->received++;
continue; continue;
} }
...@@ -379,8 +380,17 @@ int git_indexer_stream_add(git_indexer_stream *idx, const void *data, size_t siz ...@@ -379,8 +380,17 @@ int git_indexer_stream_add(git_indexer_stream *idx, const void *data, size_t siz
git__free(obj.data); git__free(obj.data);
stats->processed = (unsigned int)++processed; stats->processed = (unsigned int)++processed;
stats->received++;
} }
/*
* If we've received all of the objects and our packfile is
* one hash beyond the end of the last object, all of the
* packfile is here.
*/
if (stats->received == idx->nr_objects && idx->pack->mwf.size >= idx->off + 20)
stats->data_received = 1;
return 0; return 0;
on_error: on_error:
......
...@@ -17,6 +17,7 @@ ...@@ -17,6 +17,7 @@
#include "netops.h" #include "netops.h"
#include "posix.h" #include "posix.h"
#include "buffer.h" #include "buffer.h"
#include "protocol.h"
#include <ctype.h> #include <ctype.h>
...@@ -130,6 +131,42 @@ static int err_pkt(git_pkt **out, const char *line, size_t len) ...@@ -130,6 +131,42 @@ static int err_pkt(git_pkt **out, const char *line, size_t len)
return 0; return 0;
} }
static int data_pkt(git_pkt **out, const char *line, size_t len)
{
git_pkt_data *pkt;
line++;
len--;
pkt = git__malloc(sizeof(git_pkt_data) + len);
GITERR_CHECK_ALLOC(pkt);
pkt->type = GIT_PKT_DATA;
pkt->len = (int) len;
memcpy(pkt->data, line, len);
*out = (git_pkt *) pkt;
return 0;
}
static int progress_pkt(git_pkt **out, const char *line, size_t len)
{
git_pkt_progress *pkt;
line++;
len--;
pkt = git__malloc(sizeof(git_pkt_progress) + len);
GITERR_CHECK_ALLOC(pkt);
pkt->type = GIT_PKT_PROGRESS;
pkt->len = (int) len;
memcpy(pkt->data, line, len);
*out = (git_pkt *) pkt;
return 0;
}
/* /*
* Parse an other-ref line. * Parse an other-ref line.
*/ */
...@@ -263,8 +300,11 @@ int git_pkt_parse_line( ...@@ -263,8 +300,11 @@ int git_pkt_parse_line(
len -= PKT_LEN_SIZE; /* the encoded length includes its own size */ len -= PKT_LEN_SIZE; /* the encoded length includes its own size */
/* Assming the minimal size is actually 4 */ if (*line == GIT_SIDE_BAND_DATA)
if (!git__prefixcmp(line, "ACK")) ret = data_pkt(head, line, len);
else if (*line == GIT_SIDE_BAND_PROGRESS)
ret = progress_pkt(head, line, len);
else if (!git__prefixcmp(line, "ACK"))
ret = ack_pkt(head, line, len); ret = ack_pkt(head, line, len);
else if (!git__prefixcmp(line, "NAK")) else if (!git__prefixcmp(line, "NAK"))
ret = nak_pkt(head); ret = nak_pkt(head);
...@@ -301,6 +341,13 @@ static int buffer_want_with_caps(git_remote_head *head, git_transport_caps *caps ...@@ -301,6 +341,13 @@ static int buffer_want_with_caps(git_remote_head *head, git_transport_caps *caps
char oid[GIT_OID_HEXSZ +1] = {0}; char oid[GIT_OID_HEXSZ +1] = {0};
unsigned int len; unsigned int len;
/* Prefer side-band-64k if the server supports both */
if (caps->side_band) {
if (caps->side_band_64k)
git_buf_printf(&str, "%s ", GIT_CAP_SIDE_BAND_64K);
else
git_buf_printf(&str, "%s ", GIT_CAP_SIDE_BAND);
}
if (caps->ofs_delta) if (caps->ofs_delta)
git_buf_puts(&str, GIT_CAP_OFS_DELTA " "); git_buf_puts(&str, GIT_CAP_OFS_DELTA " ");
......
...@@ -24,6 +24,8 @@ enum git_pkt_type { ...@@ -24,6 +24,8 @@ enum git_pkt_type {
GIT_PKT_PACK, GIT_PKT_PACK,
GIT_PKT_COMMENT, GIT_PKT_COMMENT,
GIT_PKT_ERR, GIT_PKT_ERR,
GIT_PKT_DATA,
GIT_PKT_PROGRESS,
}; };
/* Used for multi-ack */ /* Used for multi-ack */
...@@ -67,6 +69,14 @@ typedef struct { ...@@ -67,6 +69,14 @@ typedef struct {
typedef struct { typedef struct {
enum git_pkt_type type; enum git_pkt_type type;
int len;
char data[GIT_FLEX_ARRAY];
} git_pkt_data;
typedef git_pkt_data git_pkt_progress;
typedef struct {
enum git_pkt_type type;
char error[GIT_FLEX_ARRAY]; char error[GIT_FLEX_ARRAY];
} git_pkt_err; } git_pkt_err;
......
...@@ -80,6 +80,20 @@ int git_protocol_detect_caps(git_pkt_ref *pkt, git_transport_caps *caps) ...@@ -80,6 +80,20 @@ int git_protocol_detect_caps(git_pkt_ref *pkt, git_transport_caps *caps)
continue; continue;
} }
/* Keep side-band check after side-band-64k */
if(!git__prefixcmp(ptr, GIT_CAP_SIDE_BAND_64K)) {
caps->common = caps->side_band_64k = 1;
ptr += strlen(GIT_CAP_SIDE_BAND_64K);
continue;
}
if(!git__prefixcmp(ptr, GIT_CAP_SIDE_BAND)) {
caps->common = caps->side_band = 1;
ptr += strlen(GIT_CAP_SIDE_BAND);
continue;
}
/* We don't know this capability, so skip it */ /* We don't know this capability, so skip it */
ptr = strchr(ptr, ' '); ptr = strchr(ptr, ' ');
} }
......
...@@ -14,4 +14,8 @@ ...@@ -14,4 +14,8 @@
int git_protocol_store_refs(git_transport *t, int flushes); int git_protocol_store_refs(git_transport *t, int flushes);
int git_protocol_detect_caps(git_pkt_ref *pkt, git_transport_caps *caps); int git_protocol_detect_caps(git_pkt_ref *pkt, git_transport_caps *caps);
#define GIT_SIDE_BAND_DATA 1
#define GIT_SIDE_BAND_PROGRESS 2
#define GIT_SIDE_BAND_ERROR 3
#endif #endif
...@@ -386,6 +386,9 @@ int git_remote_connect(git_remote *remote, int direction) ...@@ -386,6 +386,9 @@ int git_remote_connect(git_remote *remote, int direction)
if (git_transport_new(&t, url) < 0) if (git_transport_new(&t, url) < 0)
return -1; return -1;
t->progress_cb = remote->callbacks.progress;
t->cb_data = remote->callbacks.data;
t->check_cert = remote->check_cert; t->check_cert = remote->check_cert;
if (t->connect(t, direction) < 0) { if (t->connect(t, direction) < 0) {
goto on_error; goto on_error;
...@@ -646,4 +649,9 @@ void git_remote_set_callbacks(git_remote *remote, git_remote_callbacks *callback ...@@ -646,4 +649,9 @@ void git_remote_set_callbacks(git_remote *remote, git_remote_callbacks *callback
assert(remote && callbacks); assert(remote && callbacks);
memcpy(&remote->callbacks, callbacks, sizeof(git_remote_callbacks)); memcpy(&remote->callbacks, callbacks, sizeof(git_remote_callbacks));
if (remote->transport) {
remote->transport->progress_cb = remote->callbacks.progress;
remote->transport->cb_data = remote->callbacks.data;
}
} }
...@@ -21,11 +21,15 @@ ...@@ -21,11 +21,15 @@
#define GIT_CAP_OFS_DELTA "ofs-delta" #define GIT_CAP_OFS_DELTA "ofs-delta"
#define GIT_CAP_MULTI_ACK "multi_ack" #define GIT_CAP_MULTI_ACK "multi_ack"
#define GIT_CAP_SIDE_BAND "side-band"
#define GIT_CAP_SIDE_BAND_64K "side-band-64k"
typedef struct git_transport_caps { typedef struct git_transport_caps {
int common:1, int common:1,
ofs_delta:1, ofs_delta:1,
multi_ack: 1; multi_ack: 1,
side_band:1,
side_band_64k:1;
} git_transport_caps; } git_transport_caps;
#ifdef GIT_SSL #ifdef GIT_SSL
...@@ -84,6 +88,7 @@ struct git_transport { ...@@ -84,6 +88,7 @@ struct git_transport {
gitno_buffer buffer; gitno_buffer buffer;
GIT_SOCKET socket; GIT_SOCKET socket;
git_transport_caps caps; git_transport_caps caps;
void *cb_data;
/** /**
* Connect and store the remote heads * Connect and store the remote heads
*/ */
...@@ -113,6 +118,11 @@ struct git_transport { ...@@ -113,6 +118,11 @@ struct git_transport {
* Free the associated resources * Free the associated resources
*/ */
void (*free)(struct git_transport *transport); void (*free)(struct git_transport *transport);
/**
* Callbacks for the progress and error output
*/
void (*progress_cb)(const char *str, int len, void *data);
void (*error_cb)(const char *str, int len, void *data);
}; };
......
...@@ -24,7 +24,7 @@ ...@@ -24,7 +24,7 @@
typedef struct { typedef struct {
git_transport parent; git_transport parent;
char buff[1024]; char buff[65536];
#ifdef GIT_WIN32 #ifdef GIT_WIN32
WSADATA wsd; WSADATA wsd;
#endif #endif
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment