Commit 8ef246e0 authored by Wayne Davison's avatar Wayne Davison

- Handle the new incremental-recursion mode.

- Changed some function names to make them more consistent.
parent c7e6f84f
......@@ -41,7 +41,10 @@ extern int am_server;
extern int am_daemon;
extern int am_sender;
extern int am_generator;
extern int incremental;
extern int io_error;
extern int eol_nulls;
extern int flist_eof;
extern int read_batch;
extern int csum_length;
extern int checksum_seed;
......@@ -50,12 +53,12 @@ extern int remove_source_files;
extern int preserve_hard_links;
extern char *filesfrom_host;
extern struct stats stats;
extern struct file_list *the_file_list;
extern struct file_list *cur_flist, *first_flist;
const char phase_unknown[] = "unknown";
int ignore_timeout = 0;
int batch_fd = -1;
int batch_gen_fd = -1;
int done_cnt = 0;
/* Ignore an EOF error if non-zero. See whine_about_eof(). */
int kluge_around_eof = 0;
......@@ -65,6 +68,18 @@ int msg_fd_out = -1;
int sock_f_in = -1;
int sock_f_out = -1;
static int iobuf_f_in = -1;
static char *iobuf_in;
static size_t iobuf_in_siz;
static size_t iobuf_in_ndx;
static size_t iobuf_in_remaining;
static int iobuf_f_out = -1;
static char *iobuf_out;
static int iobuf_out_cnt;
int flist_forward_from = -1;
static int io_multiplexing_out;
static int io_multiplexing_in;
static time_t last_io_in;
......@@ -92,7 +107,11 @@ static char int_byte_cnt[64] = {
5, 5, 5, 5, 5, 5, 5, 5, 6, 6, 6, 6, 7, 7, 8, 9, /* (C0 - FF)/4 */
};
static void read_loop(int fd, char *buf, size_t len);
static int readfd_unbuffered(int fd, char *buf, size_t len);
static void writefd(int fd, const char *buf, size_t len);
static void writefd_unbuffered(int fd, const char *buf, size_t len);
static void decrement_active_files(int ndx);
static void decrement_flist_in_progress(int ndx, int redo);
struct flist_ndx_item {
struct flist_ndx_item *next;
......@@ -237,6 +256,7 @@ static void read_msg_fd(void)
{
char buf[2048];
size_t n;
struct file_list *flist;
int fd = msg_fd_in;
int tag, len;
......@@ -244,7 +264,7 @@ static void read_msg_fd(void)
* to this routine from writefd_unbuffered(). */
msg_fd_in = -1;
read_loop(fd, buf, 4);
readfd_unbuffered(fd, buf, 4);
tag = IVAL(buf, 0);
len = tag & 0xFFFFFF;
......@@ -253,50 +273,71 @@ static void read_msg_fd(void)
switch (tag) {
case MSG_DONE:
if (len != 0 || !am_generator) {
rprintf(FERROR, "invalid message %d:%d\n", tag, len);
invalid_msg:
rprintf(FERROR, "invalid message %d:%d [%s%s]\n",
tag, len, who_am_i(),
incremental ? "/incremental" : "");
exit_cleanup(RERR_STREAMIO);
}
flist_ndx_push(&redo_list, -1);
done_cnt++;
break;
case MSG_REDO:
if (len != 4 || !am_generator) {
rprintf(FERROR, "invalid message %d:%d\n", tag, len);
exit_cleanup(RERR_STREAMIO);
}
read_loop(fd, buf, 4);
if (len != 4 || !am_generator)
goto invalid_msg;
readfd_unbuffered(fd, buf, 4);
if (remove_source_files)
decrement_active_files(IVAL(buf,0));
flist_ndx_push(&redo_list, IVAL(buf,0));
if (incremental)
decrement_flist_in_progress(IVAL(buf,0), 1);
break;
case MSG_FLIST:
if (len != 4 || !am_generator || !incremental)
goto invalid_msg;
readfd_unbuffered(fd, buf, 4);
/* Read extra file list from receiver. */
assert(iobuf_in != NULL);
assert(iobuf_f_in == fd);
flist = recv_file_list(fd);
flist->parent_ndx = IVAL(buf,0);
break;
case MSG_FLIST_EOF:
if (len != 0 || !am_generator || !incremental)
goto invalid_msg;
flist_eof = 1;
break;
case MSG_DELETED:
if (len >= (int)sizeof buf || !am_generator) {
rprintf(FERROR, "invalid message %d:%d\n", tag, len);
exit_cleanup(RERR_STREAMIO);
}
read_loop(fd, buf, len);
if (len >= (int)sizeof buf || !am_generator)
goto invalid_msg;
readfd_unbuffered(fd, buf, len);
send_msg(MSG_DELETED, buf, len);
break;
case MSG_SUCCESS:
if (len != 4 || !am_generator) {
rprintf(FERROR, "invalid message %d:%d\n", tag, len);
exit_cleanup(RERR_STREAMIO);
}
read_loop(fd, buf, len);
if (len != 4 || !am_generator)
goto invalid_msg;
readfd_unbuffered(fd, buf, len);
if (remove_source_files) {
decrement_active_files(IVAL(buf,0));
send_msg(MSG_SUCCESS, buf, len);
}
if (preserve_hard_links)
flist_ndx_push(&hlink_list, IVAL(buf,0));
if (incremental)
decrement_flist_in_progress(IVAL(buf,0), 0);
break;
case MSG_NO_SEND:
if (len != 4 || !am_generator)
goto invalid_msg;
readfd_unbuffered(fd, buf, len);
if (incremental)
decrement_flist_in_progress(IVAL(buf,0), 0);
break;
case MSG_SOCKERR:
case MSG_CLIENT:
if (!am_generator) {
rprintf(FERROR, "invalid message %d:%d\n", tag, len);
exit_cleanup(RERR_STREAMIO);
}
if (!am_generator)
goto invalid_msg;
if (tag == MSG_SOCKERR)
close_multiplexing_out();
io_end_multiplex_out();
/* FALL THROUGH */
case MSG_INFO:
case MSG_ERROR:
......@@ -305,7 +346,7 @@ static void read_msg_fd(void)
n = len;
if (n >= sizeof buf)
n = sizeof buf - 1;
read_loop(fd, buf, n);
readfd_unbuffered(fd, buf, n);
rwrite((enum logcode)tag, buf, n);
len -= n;
}
......@@ -334,51 +375,65 @@ void increment_active_files(int ndx, int itemizing, enum logcode code)
}
active_filecnt++;
active_bytecnt += F_LENGTH(the_file_list->files[ndx]);
active_bytecnt += F_LENGTH(cur_flist->files[ndx]);
}
void decrement_active_files(int ndx)
static void decrement_active_files(int ndx)
{
struct file_list *flist = flist_for_ndx(ndx);
assert(flist != NULL);
active_filecnt--;
active_bytecnt -= F_LENGTH(the_file_list->files[ndx]);
active_bytecnt -= F_LENGTH(flist->files[ndx - flist->ndx_start]);
}
static void decrement_flist_in_progress(int ndx, int redo)
{
struct file_list *flist = cur_flist ? cur_flist : first_flist;
while (ndx < flist->ndx_start) {
if (flist == first_flist) {
invalid_ndx:
rprintf(FERROR,
"Invalid file index: %d (%d - %d) [%s]\n",
ndx, first_flist->ndx_start,
first_flist->prev->ndx_start + first_flist->prev->count - 1,
who_am_i());
exit_cleanup(RERR_PROTOCOL);
}
flist = flist->prev;
}
while (ndx >= flist->ndx_start + flist->count) {
if (!(flist = flist->next))
goto invalid_ndx;
}
flist->in_progress--;
if (redo)
flist->to_redo++;
}
/* Try to push messages off the list onto the wire. If we leave with more
* to do, return 0. On error, return -1. If everything flushed, return 1.
* This is only active in the receiver. */
static int msg2genr_flush(int flush_it_all)
static int msg2genr_flush(void)
{
static int written = 0;
struct timeval tv;
fd_set fds;
if (msg_fd_out < 0)
if (msg_fd_out < 0 || no_flush)
return -1;
no_flush++;
while (msg2genr.head) {
struct msg_list_item *m = msg2genr.head;
int n = write(msg_fd_out, m->buf + written, m->len - written);
if (n < 0) {
if (errno == EINTR)
continue;
if (errno != EWOULDBLOCK && errno != EAGAIN)
return -1;
if (!flush_it_all)
return 0;
FD_ZERO(&fds);
FD_SET(msg_fd_out, &fds);
tv.tv_sec = select_timeout;
tv.tv_usec = 0;
if (!select(msg_fd_out+1, NULL, &fds, NULL, &tv))
check_timeout();
} else if ((written += n) == m->len) {
msg2genr.head = m->next;
if (!msg2genr.head)
msg2genr.tail = NULL;
free(m);
written = 0;
}
writefd(msg_fd_out, m->buf, m->len);
msg2genr.head = m->next;
if (!msg2genr.head)
msg2genr.tail = NULL;
free(m);
}
if (iobuf_out_cnt) {
writefd_unbuffered(iobuf_f_out, iobuf_out, iobuf_out_cnt);
iobuf_out_cnt = 0;
}
no_flush--;
return 1;
}
......@@ -393,7 +448,7 @@ int send_msg(enum msgcode code, const char *buf, int len)
return 1;
}
msg_list_add(&msg2genr, code, buf, len);
msg2genr_flush(NORMAL_FLUSH);
msg2genr_flush();
return 1;
}
......@@ -404,18 +459,13 @@ void send_msg_int(enum msgcode code, int num)
send_msg(code, numbuf, 4);
}
int get_redo_num(int itemizing, enum logcode code)
void wait_for_receiver(void)
{
while (1) {
#ifdef SUPPORT_HARD_LINKS
if (hlink_list.head)
check_for_finished_hlinks(itemizing, code);
#endif
if (redo_list.head)
break;
read_msg_fd();
}
read_msg_fd();
}
int get_redo_num(void)
{
return flist_ndx_pop(&redo_list);
}
......@@ -538,7 +588,7 @@ static int read_timeout(int fd, char *buf, size_t len)
}
if (msg2genr.head && FD_ISSET(msg_fd_out, &w_fds))
msg2genr_flush(NORMAL_FLUSH);
msg2genr_flush();
if (io_filesfrom_f_out >= 0) {
if (io_filesfrom_buflen) {
......@@ -617,7 +667,7 @@ static int read_timeout(int fd, char *buf, size_t len)
/* Don't write errors on a dead socket. */
if (fd == sock_f_in) {
close_multiplexing_out();
io_end_multiplex_out();
rsyserr(FSOCKERR, errno, "read error");
} else
rsyserr(FERROR, errno, "read error");
......@@ -688,37 +738,51 @@ int read_filesfrom_line(int fd, char *fname)
return s - fname;
}
static char *iobuf_out;
static int iobuf_out_cnt;
void io_start_buffering_out(void)
int io_start_buffering_out(int f_out)
{
if (iobuf_out)
return;
if (iobuf_out) {
assert(f_out == iobuf_f_out);
return 0;
}
if (!(iobuf_out = new_array(char, IO_BUFFER_SIZE)))
out_of_memory("io_start_buffering_out");
iobuf_out_cnt = 0;
iobuf_f_out = f_out;
return 1;
}
static char *iobuf_in;
static size_t iobuf_in_siz;
void io_start_buffering_in(void)
int io_start_buffering_in(int f_in)
{
if (iobuf_in)
return;
if (iobuf_in) {
assert(f_in == iobuf_f_in);
return 0;
}
iobuf_in_siz = 2 * IO_BUFFER_SIZE;
if (!(iobuf_in = new_array(char, iobuf_in_siz)))
out_of_memory("io_start_buffering_in");
iobuf_f_in = f_in;
return 1;
}
void io_end_buffering(void)
void io_end_buffering_in(void)
{
io_flush(NORMAL_FLUSH);
if (!io_multiplexing_out) {
free(iobuf_out);
iobuf_out = NULL;
}
if (!iobuf_in)
return;
free(iobuf_in);
iobuf_in = NULL;
iobuf_in_ndx = 0;
iobuf_in_remaining = 0;
iobuf_f_in = -1;
}
void io_end_buffering_out(void)
{
if (!iobuf_out)
return;
io_flush(FULL_FLUSH);
free(iobuf_out);
iobuf_out = NULL;
iobuf_f_out = -1;
}
void maybe_flush_socket(void)
......@@ -733,14 +797,31 @@ void maybe_send_keepalive(void)
if (!iobuf_out || !iobuf_out_cnt) {
if (protocol_version < 29)
return; /* there's nothing we can do */
write_int(sock_f_out, the_file_list->count);
write_shortint(sock_f_out, ITEM_IS_NEW);
if (protocol_version >= 30)
send_msg(MSG_NOOP, "", 0);
else {
write_int(sock_f_out, cur_flist->count);
write_shortint(sock_f_out, ITEM_IS_NEW);
}
}
if (iobuf_out)
io_flush(NORMAL_FLUSH);
}
}
void start_flist_forward(int f_in)
{
assert(iobuf_out != NULL);
assert(iobuf_f_out == msg_fd_out);
flist_forward_from = f_in;
}
void stop_flist_forward()
{
io_flush(NORMAL_FLUSH);
flist_forward_from = -1;
}
/**
* Continue trying to read len bytes - don't return until len has been
* read.
......@@ -763,26 +844,24 @@ static void read_loop(int fd, char *buf, size_t len)
*/
static int readfd_unbuffered(int fd, char *buf, size_t len)
{
static size_t remaining;
static size_t iobuf_in_ndx;
size_t msg_bytes;
int tag, cnt = 0;
char line[BIGPATHBUFLEN];
if (!iobuf_in || fd != sock_f_in)
if (!iobuf_in || fd != iobuf_f_in)
return read_timeout(fd, buf, len);
if (!io_multiplexing_in && remaining == 0) {
remaining = read_timeout(fd, iobuf_in, iobuf_in_siz);
if (!io_multiplexing_in && iobuf_in_remaining == 0) {
iobuf_in_remaining = read_timeout(fd, iobuf_in, iobuf_in_siz);
iobuf_in_ndx = 0;
}
while (cnt == 0) {
if (remaining) {
len = MIN(len, remaining);
if (iobuf_in_remaining) {
len = MIN(len, iobuf_in_remaining);
memcpy(buf, iobuf_in + iobuf_in_ndx, len);
iobuf_in_ndx += len;
remaining -= len;
iobuf_in_remaining -= len;
cnt = len;
break;
}
......@@ -802,9 +881,19 @@ static int readfd_unbuffered(int fd, char *buf, size_t len)
iobuf_in_siz = msg_bytes;
}
read_loop(fd, iobuf_in, msg_bytes);
remaining = msg_bytes;
iobuf_in_remaining = msg_bytes;
iobuf_in_ndx = 0;
break;
case MSG_NOOP:
if (am_sender)
maybe_send_keepalive();
break;
case MSG_IO_ERROR:
if (msg_bytes != 4)
goto invalid_msg;
read_loop(fd, line, msg_bytes);
io_error |= IVAL(line, 0);
break;
case MSG_DELETED:
if (msg_bytes >= sizeof line)
goto overflow;
......@@ -819,6 +908,7 @@ static int readfd_unbuffered(int fd, char *buf, size_t len)
break;
case MSG_SUCCESS:
if (msg_bytes != 4) {
invalid_msg:
rprintf(FERROR, "invalid multi-message %d:%ld [%s]\n",
tag, (long)msg_bytes, who_am_i());
exit_cleanup(RERR_STREAMIO);
......@@ -826,6 +916,12 @@ static int readfd_unbuffered(int fd, char *buf, size_t len)
read_loop(fd, line, msg_bytes);
successful_send(IVAL(line, 0));
break;
case MSG_NO_SEND:
if (msg_bytes != 4)
goto invalid_msg;
read_loop(fd, line, msg_bytes);
send_msg_int(MSG_NO_SEND, IVAL(line, 0));
break;
case MSG_INFO:
case MSG_ERROR:
if (msg_bytes >= sizeof line) {
......@@ -845,7 +941,7 @@ static int readfd_unbuffered(int fd, char *buf, size_t len)
}
}
if (remaining == 0)
if (iobuf_in_remaining == 0)
io_flush(NORMAL_FLUSH);
return cnt;
......@@ -871,6 +967,9 @@ static void readfd(int fd, char *buffer, size_t N)
exit_cleanup(RERR_FILEIO);
}
if (fd == flist_forward_from)
writefd(iobuf_f_out, buffer, total);
if (fd == sock_f_in)
stats.total_read += total;
}
......@@ -1171,7 +1270,7 @@ static void writefd_unbuffered(int fd, const char *buf, size_t len)
/* Don't try to write errors back across the stream. */
if (fd == sock_f_out)
close_multiplexing_out();
io_end_multiplex_out();
rsyserr(FERROR, errno,
"writefd_unbuffered failed to write %ld bytes [%s]",
(long)len, who_am_i());
......@@ -1246,9 +1345,9 @@ static void mplex_write(enum msgcode code, const char *buf, size_t len)
}
}
void io_flush(int flush_it_all)
void io_flush(UNUSED(int flush_it_all))
{
msg2genr_flush(flush_it_all);
msg2genr_flush();
msg2sndr_flush();
if (!iobuf_out_cnt || no_flush)
......@@ -1257,17 +1356,12 @@ void io_flush(int flush_it_all)
if (io_multiplexing_out)
mplex_write(MSG_DATA, iobuf_out, iobuf_out_cnt);
else
writefd_unbuffered(sock_f_out, iobuf_out, iobuf_out_cnt);
writefd_unbuffered(iobuf_f_out, iobuf_out, iobuf_out_cnt);
iobuf_out_cnt = 0;
}
static void writefd(int fd, const char *buf, size_t len)
{
if (fd == msg_fd_out) {
rprintf(FERROR, "Internal error: wrong write used in receiver.\n");
exit_cleanup(RERR_PROTOCOL);
}
if (fd == sock_f_out)
stats.total_written += len;
......@@ -1276,7 +1370,7 @@ static void writefd(int fd, const char *buf, size_t len)
exit_cleanup(RERR_FILEIO);
}
if (!iobuf_out || fd != sock_f_out) {
if (!iobuf_out || fd != iobuf_f_out) {
writefd_unbuffered(fd, buf, len);
return;
}
......@@ -1499,7 +1593,7 @@ void io_printf(int fd, const char *format, ...)
void io_start_multiplex_out(void)
{
io_flush(NORMAL_FLUSH);
io_start_buffering_out();
io_start_buffering_out(sock_f_out);
io_multiplexing_out = 1;
}
......@@ -1507,7 +1601,7 @@ void io_start_multiplex_out(void)
void io_start_multiplex_in(void)
{
io_flush(NORMAL_FLUSH);
io_start_buffering_in();
io_start_buffering_in(sock_f_in);
io_multiplexing_in = 1;
}
......@@ -1516,22 +1610,23 @@ int io_multiplex_write(enum msgcode code, const char *buf, size_t len)
{
if (!io_multiplexing_out)
return 0;
io_flush(NORMAL_FLUSH);
stats.total_written += (len+4);
mplex_write(code, buf, len);
return 1;
}
void close_multiplexing_in(void)
void io_end_multiplex_in(void)
{
io_multiplexing_in = 0;
io_end_buffering_in();
}
/** Stop output multiplexing. */
void close_multiplexing_out(void)
void io_end_multiplex_out(void)
{
io_multiplexing_out = 0;
io_end_buffering_out();
}
void start_write_batch(int fd)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment