Commit 2885270b authored by Wayne Davison's avatar Wayne Davison

Fix a hang that can happen when the sender is sending an extra file-list

and no one is reading (i.e. do advantageous reading in perform_io()).
parent 0c2e8f93
......@@ -69,6 +69,7 @@ int ignore_timeout = 0;
int batch_fd = -1;
int msgdone_cnt = 0;
int forward_flist_data = 0;
BOOL flist_receiving_enabled = False;
/* Ignore an EOF error if non-zero. See whine_about_eof(). */
int kluge_around_eof = 0;
......@@ -147,6 +148,7 @@ enum festatus { FES_SUCCESS, FES_REDO, FES_NO_SEND };
static flist_ndx_list redo_list, hlink_list;
static void read_a_msg(void);
static void drain_multiplex_messages(void);
static void sleep_for_bwlimit(int bytes_written);
......@@ -487,6 +489,19 @@ void restore_iobuf_size(xbuf *out)
}
}
static void slide_iobuf_in(size_t needed)
{
memmove(iobuf.in.buf, iobuf.in.buf + iobuf.in.pos, iobuf.in.len);
if (DEBUG_GTE(IO, 4)) {
rprintf(FINFO,
"[%s] moved %ld bytes from %ld to 0 in the input buffer (size=%ld, needed=%ld).\n",
who_am_i(), (long)iobuf.in.len, (long)iobuf.in.pos, (long)iobuf.in.size, (long)needed);
}
if (iobuf.raw_input_ends_before)
iobuf.raw_input_ends_before -= iobuf.in.pos;
iobuf.in.pos = 0;
}
/* Perform buffered input and output until specified conditions are met. When
* given a "needed" read requirement, we'll return without doing any I/O if the
* iobuf.in bytes are already available. When reading, we'll read as many
......@@ -559,19 +574,8 @@ static char *perform_io(size_t needed, int flags)
}
realloc_xbuf(&iobuf.in, new_size);
}
if (iobuf.in.size - iobuf.in.pos < needed
|| (iobuf.in.len < needed && iobuf.in.len < 1024
&& iobuf.in.size - (iobuf.in.pos + iobuf.in.len) < 1024)) {
memmove(iobuf.in.buf, iobuf.in.buf + iobuf.in.pos, iobuf.in.len);
if (DEBUG_GTE(IO, 4)) {
rprintf(FINFO,
"[%s] moved %ld bytes from %ld to 0 in the input buffer (size=%ld, needed=%ld).\n",
who_am_i(), (long)iobuf.in.len, (long)iobuf.in.pos, (long)iobuf.in.size, (long)needed);
}
if (iobuf.raw_input_ends_before)
iobuf.raw_input_ends_before -= iobuf.in.pos;
iobuf.in.pos = 0;
}
if (iobuf.in.size - iobuf.in.pos < needed)
slide_iobuf_in(needed);
break;
case PIO_NEED_OUTROOM:
......@@ -633,6 +637,9 @@ static char *perform_io(size_t needed, int flags)
break;
}
if (iobuf.in.len < 1024 && iobuf.in.size - (iobuf.in.pos + iobuf.in.len) < 1024)
slide_iobuf_in(flags & PIO_NEED_INPUT ? needed : 0);
max_fd = -1;
FD_ZERO(&r_fds);
......@@ -837,6 +844,15 @@ static char *perform_io(size_t needed, int flags)
}
}
/* We need to help prevent deadlock by doing what reading
* we can whenever we are here trying to write. */
if (IN_MULTIPLEXED && !(flags & PIO_NEED_INPUT)) {
while (!iobuf.raw_input_ends_before && iobuf.in.len > 512)
read_a_msg();
if (flist_receiving_enabled && iobuf.in.len > 512)
wait_for_receiver(); /* generator only */
}
if (ff_forward_fd >= 0 && FD_ISSET(ff_forward_fd, &r_fds)) {
/* This can potentially flush all output and enable
* multiplexed output, so keep this last in the loop
......@@ -1570,6 +1586,7 @@ void wait_for_receiver(void)
}
} else {
struct file_list *flist;
flist_receiving_enabled = False;
if (DEBUG_GTE(FLIST, 2)) {
rprintf(FINFO, "[%s] receiving flist for dir %d\n",
who_am_i(), ndx);
......@@ -1580,6 +1597,7 @@ void wait_for_receiver(void)
if (preserve_hard_links)
match_hard_links(flist);
#endif
flist_receiving_enabled = True;
}
}
}
......
......@@ -73,6 +73,7 @@ extern int send_msgs_to_gen;
extern pid_t cleanup_child_pid;
extern size_t bwlimit_writemax;
extern unsigned int module_dirlen;
extern BOOL flist_receiving_enabled;
extern BOOL shutting_down;
extern struct stats stats;
extern char *stdout_format;
......@@ -898,6 +899,7 @@ static int do_recv(int f_in, int f_out, char *local_name)
}
am_generator = 1;
flist_receiving_enabled = True;
io_end_multiplex_in(MPLX_SWITCHING);
if (write_batch && !am_server)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment