Commit 7d51b837 authored by Wayne Davison's avatar Wayne Davison

Made the new msg2sndr handling even better.

parent a27042b5
...@@ -1063,6 +1063,7 @@ static void writefd_unbuffered(int fd,char *buf,size_t len) ...@@ -1063,6 +1063,7 @@ static void writefd_unbuffered(int fd,char *buf,size_t len)
size_t n, total = 0; size_t n, total = 0;
fd_set w_fds, r_fds; fd_set w_fds, r_fds;
int maxfd, count, cnt, using_r_fds; int maxfd, count, cnt, using_r_fds;
int defer_save = defer_forwarding_messages;
struct timeval tv; struct timeval tv;
no_flush++; no_flush++;
...@@ -1101,18 +1102,6 @@ static void writefd_unbuffered(int fd,char *buf,size_t len) ...@@ -1101,18 +1102,6 @@ static void writefd_unbuffered(int fd,char *buf,size_t len)
if (!FD_ISSET(fd, &w_fds)) if (!FD_ISSET(fd, &w_fds))
continue; continue;
if (msg2sndr.head && !defer_forwarding_messages) {
struct msg_list_item *m = msg2sndr.head;
int code = *((uchar*)m->buf+3) - MPLEX_BASE;
if (!(msg2sndr.head = m->next))
msg2sndr.tail = NULL;
defer_forwarding_messages = 1;
io_multiplex_write(code, m->buf+4, m->len-4);
defer_forwarding_messages = 0;
free(m);
continue;
}
n = len - total; n = len - total;
if (bwlimit_writemax && n > bwlimit_writemax) if (bwlimit_writemax && n > bwlimit_writemax)
n = bwlimit_writemax; n = bwlimit_writemax;
...@@ -1154,11 +1143,28 @@ static void writefd_unbuffered(int fd,char *buf,size_t len) ...@@ -1154,11 +1143,28 @@ static void writefd_unbuffered(int fd,char *buf,size_t len)
sleep_for_bwlimit(cnt); sleep_for_bwlimit(cnt);
} }
} }
defer_forwarding_messages = 0;
defer_forwarding_messages = defer_save;
no_flush--; no_flush--;
} }
static void msg2sndr_flush(void)
{
if (defer_forwarding_messages)
return;
while (msg2sndr.head) {
struct msg_list_item *m = msg2sndr.head;
if (!(msg2sndr.head = m->next))
msg2sndr.tail = NULL;
stats.total_written += m->len;
defer_forwarding_messages = 1;
writefd_unbuffered(sock_f_out, m->buf, m->len);
defer_forwarding_messages = 0;
free(m);
}
}
/** /**
* Write an message to a multiplexed stream. If this fails then rsync * Write an message to a multiplexed stream. If this fails then rsync
* exits. * exits.
...@@ -1180,13 +1186,18 @@ static void mplex_write(enum msgcode code, char *buf, size_t len) ...@@ -1180,13 +1186,18 @@ static void mplex_write(enum msgcode code, char *buf, size_t len)
len -= n; len -= n;
buf += n; buf += n;
if (len) if (len) {
defer_forwarding_messages = 1;
writefd_unbuffered(sock_f_out, buf, len); writefd_unbuffered(sock_f_out, buf, len);
defer_forwarding_messages = 0;
msg2sndr_flush();
}
} }
void io_flush(int flush_it_all) void io_flush(int flush_it_all)
{ {
msg2genr_flush(flush_it_all); msg2genr_flush(flush_it_all);
msg2sndr_flush();
if (!iobuf_out_cnt || no_flush) if (!iobuf_out_cnt || no_flush)
return; return;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment