Commit 76c21947 authored by Wayne Davison's avatar Wayne Davison

Applying my updated version of Craig Barratt's buffered I/O patch.

parent 968c8030
...@@ -69,6 +69,21 @@ static int write_sparse(int f,char *buf,size_t len) ...@@ -69,6 +69,21 @@ static int write_sparse(int f,char *buf,size_t len)
return len; return len;
} }
static char *wf_writeBuf;
static size_t wf_writeBufSize;
static size_t wf_writeBufCnt;
int flush_write_file(int f)
{
int ret = write(f, wf_writeBuf, wf_writeBufCnt);
if (ret < 0)
return ret;
/* if (ret < wf_writeBufCnt) ??? */
wf_writeBufCnt = 0;
return ret;
}
/* /*
* write_file does not allow incomplete writes. It loops internally * write_file does not allow incomplete writes. It loops internally
* until len bytes are written or errno is set. * until len bytes are written or errno is set.
...@@ -83,7 +98,22 @@ int write_file(int f,char *buf,size_t len) ...@@ -83,7 +98,22 @@ int write_file(int f,char *buf,size_t len)
int len1 = MIN(len, SPARSE_WRITE_SIZE); int len1 = MIN(len, SPARSE_WRITE_SIZE);
r1 = write_sparse(f, buf, len1); r1 = write_sparse(f, buf, len1);
} else { } else {
r1 = write(f, buf, len); if (!wf_writeBuf) {
wf_writeBufSize = MAX_MAP_SIZE;
wf_writeBufCnt = 0;
wf_writeBuf = new_array(char, MAX_MAP_SIZE);
if (!wf_writeBuf) out_of_memory("write_file");
}
r1 = MIN(len, wf_writeBufSize - wf_writeBufCnt);
if (r1) {
memcpy(wf_writeBuf + wf_writeBufCnt, buf, r1);
wf_writeBufCnt += r1;
}
if (wf_writeBufCnt == wf_writeBufSize) {
if (flush_write_file(f) < 0) return -1;
if (!r1 && len)
continue;
}
} }
if (r1 <= 0) { if (r1 <= 0) {
if (ret > 0) return ret; if (ret > 0) return ret;
......
...@@ -924,7 +924,7 @@ struct file_list *send_file_list(int f, int argc, char *argv[]) ...@@ -924,7 +924,7 @@ struct file_list *send_file_list(int f, int argc, char *argv[])
flist = flist_new(); flist = flist_new();
if (f != -1) { if (f != -1) {
io_start_buffering(f); io_start_buffering_out(f);
if (filesfrom_fd >= 0) { if (filesfrom_fd >= 0) {
if (argv[0] && !push_dir(argv[0], 0)) { if (argv[0] && !push_dir(argv[0], 0)) {
rprintf(FERROR, "push_dir %s failed: %s\n", rprintf(FERROR, "push_dir %s failed: %s\n",
......
...@@ -41,8 +41,8 @@ ...@@ -41,8 +41,8 @@
static int io_multiplexing_out; static int io_multiplexing_out;
static int io_multiplexing_in; static int io_multiplexing_in;
static int multiplex_in_fd; static int multiplex_in_fd = -1;
static int multiplex_out_fd; static int multiplex_out_fd = -1;
static time_t last_io; static time_t last_io;
static int no_flush; static int no_flush;
...@@ -440,17 +440,31 @@ static int read_unbuffered(int fd, char *buf, size_t len) ...@@ -440,17 +440,31 @@ static int read_unbuffered(int fd, char *buf, size_t len)
static size_t remaining; static size_t remaining;
int tag, ret = 0; int tag, ret = 0;
char line[1024]; char line[1024];
static char *buffer;
static size_t bufferIdx = 0;
static size_t bufferSz;
if (!io_multiplexing_in || fd != multiplex_in_fd) if (fd != multiplex_in_fd)
return read_timeout(fd, buf, len); return read_timeout(fd, buf, len);
if (!io_multiplexing_in && remaining == 0) {
if (!buffer) {
bufferSz = 2 * IO_BUFFER_SIZE;
buffer = new_array(char, bufferSz);
if (!buffer) out_of_memory("read_unbuffered");
}
remaining = read_timeout(fd, buffer, bufferSz);
bufferIdx = 0;
}
while (ret == 0) { while (ret == 0) {
if (remaining) { if (remaining) {
len = MIN(len, remaining); len = MIN(len, remaining);
read_loop(fd, buf, len); memcpy(buf, buffer + bufferIdx, len);
bufferIdx += len;
remaining -= len; remaining -= len;
ret = len; ret = len;
continue; break;
} }
read_loop(fd, line, 4); read_loop(fd, line, 4);
...@@ -459,8 +473,16 @@ static int read_unbuffered(int fd, char *buf, size_t len) ...@@ -459,8 +473,16 @@ static int read_unbuffered(int fd, char *buf, size_t len)
remaining = tag & 0xFFFFFF; remaining = tag & 0xFFFFFF;
tag = tag >> 24; tag = tag >> 24;
if (tag == MPLEX_BASE) if (tag == MPLEX_BASE) {
if (!buffer || remaining > bufferSz) {
buffer = realloc_array(buffer, char, remaining);
if (!buffer) out_of_memory("read_unbuffered");
bufferSz = remaining;
}
read_loop(fd, buffer, remaining);
bufferIdx = 0;
continue; continue;
}
tag -= MPLEX_BASE; tag -= MPLEX_BASE;
...@@ -482,6 +504,9 @@ static int read_unbuffered(int fd, char *buf, size_t len) ...@@ -482,6 +504,9 @@ static int read_unbuffered(int fd, char *buf, size_t len)
remaining = 0; remaining = 0;
} }
if (remaining == 0)
io_flush();
return ret; return ret;
} }
...@@ -498,8 +523,6 @@ static void readfd(int fd, char *buffer, size_t N) ...@@ -498,8 +523,6 @@ static void readfd(int fd, char *buffer, size_t N)
size_t total=0; size_t total=0;
while (total < N) { while (total < N) {
io_flush();
ret = read_unbuffered(fd, buffer + total, N-total); ret = read_unbuffered(fd, buffer + total, N-total);
total += ret; total += ret;
} }
...@@ -682,7 +705,7 @@ static void writefd_unbuffered(int fd,char *buf,size_t len) ...@@ -682,7 +705,7 @@ static void writefd_unbuffered(int fd,char *buf,size_t len)
static char *io_buffer; static char *io_buffer;
static int io_buffer_count; static int io_buffer_count;
void io_start_buffering(int fd) void io_start_buffering_out(int fd)
{ {
if (io_buffer) return; if (io_buffer) return;
multiplex_out_fd = fd; multiplex_out_fd = fd;
...@@ -691,6 +714,11 @@ void io_start_buffering(int fd) ...@@ -691,6 +714,11 @@ void io_start_buffering(int fd)
io_buffer_count = 0; io_buffer_count = 0;
} }
void io_start_buffering_in(int fd)
{
multiplex_in_fd = fd;
}
/** /**
* Write an message to a multiplexed stream. If this fails then rsync * Write an message to a multiplexed stream. If this fails then rsync
* exits. * exits.
...@@ -881,7 +909,7 @@ void io_start_multiplex_out(int fd) ...@@ -881,7 +909,7 @@ void io_start_multiplex_out(int fd)
{ {
multiplex_out_fd = fd; multiplex_out_fd = fd;
io_flush(); io_flush();
io_start_buffering(fd); io_start_buffering_out(fd);
io_multiplexing_out = 1; io_multiplexing_out = 1;
} }
......
...@@ -380,6 +380,8 @@ static void do_server_sender(int f_in, int f_out, int argc,char *argv[]) ...@@ -380,6 +380,8 @@ static void do_server_sender(int f_in, int f_out, int argc,char *argv[])
exit_cleanup(0); exit_cleanup(0);
} }
io_start_buffering_in(f_in);
io_start_buffering_out(f_out);
send_files(flist,f_out,f_in); send_files(flist,f_out,f_in);
io_flush(); io_flush();
report(f_out); report(f_out);
...@@ -454,7 +456,7 @@ static int do_recv(int f_in,int f_out,struct file_list *flist,char *local_name) ...@@ -454,7 +456,7 @@ static int do_recv(int f_in,int f_out,struct file_list *flist,char *local_name)
close(error_pipe[1]); close(error_pipe[1]);
if (f_in != f_out) close(f_in); if (f_in != f_out) close(f_in);
io_start_buffering(f_out); io_start_buffering_out(f_out);
io_set_error_fd(error_pipe[0]); io_set_error_fd(error_pipe[0]);
...@@ -508,6 +510,7 @@ static void do_server_recv(int f_in, int f_out, int argc,char *argv[]) ...@@ -508,6 +510,7 @@ static void do_server_recv(int f_in, int f_out, int argc,char *argv[])
} }
} }
io_start_buffering_in(f_in);
if (delete_mode && !delete_excluded) if (delete_mode && !delete_excluded)
recv_exclude_list(f_in); recv_exclude_list(f_in);
...@@ -606,6 +609,7 @@ int client_run(int f_in, int f_out, pid_t pid, int argc, char *argv[]) ...@@ -606,6 +609,7 @@ int client_run(int f_in, int f_out, pid_t pid, int argc, char *argv[])
extern int cvs_exclude; extern int cvs_exclude;
extern int delete_mode; extern int delete_mode;
extern int delete_excluded; extern int delete_excluded;
io_start_buffering_out(f_out);
if (cvs_exclude) if (cvs_exclude)
add_cvs_excludes(); add_cvs_excludes();
if (delete_mode && !delete_excluded) if (delete_mode && !delete_excluded)
...@@ -617,7 +621,10 @@ int client_run(int f_in, int f_out, pid_t pid, int argc, char *argv[]) ...@@ -617,7 +621,10 @@ int client_run(int f_in, int f_out, pid_t pid, int argc, char *argv[])
if (verbose > 3) if (verbose > 3)
rprintf(FINFO,"file list sent\n"); rprintf(FINFO,"file list sent\n");
io_flush();
io_start_buffering_out(f_out);
send_files(flist,f_out,f_in); send_files(flist,f_out,f_in);
io_flush();
if (protocol_version >= 24) { if (protocol_version >= 24) {
/* final goodbye message */ /* final goodbye message */
read_int(f_in); read_int(f_in);
...@@ -629,6 +636,7 @@ int client_run(int f_in, int f_out, pid_t pid, int argc, char *argv[]) ...@@ -629,6 +636,7 @@ int client_run(int f_in, int f_out, pid_t pid, int argc, char *argv[])
wait_process(pid, &status); wait_process(pid, &status);
} }
report(-1); report(-1);
io_flush();
exit_cleanup(status); exit_cleanup(status);
} }
......
...@@ -303,6 +303,8 @@ static int receive_data(int f_in,struct map_struct *mapbuf,int fd,char *fname, ...@@ -303,6 +303,8 @@ static int receive_data(int f_in,struct map_struct *mapbuf,int fd,char *fname,
offset += len; offset += len;
} }
flush_write_file(fd);
if (do_progress) if (do_progress)
end_progress(total_size); end_progress(total_size);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment