Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
L
liblongpath-rsync
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
liblongpath
liblongpath-rsync
Commits
763880ba
Commit
763880ba
authored
Dec 12, 2009
by
Wayne Davison
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
Turn iobuf.in into a circular input buffer.
parent
2885270b
Changes
1
Hide whitespace changes
Inline
Side-by-side
Showing
1 changed file
with
67 additions
and
70 deletions
+67
-70
io.c
io.c
+67
-70
No files found.
io.c
View file @
763880ba
...
...
@@ -489,19 +489,6 @@ void restore_iobuf_size(xbuf *out)
}
}
static
void
slide_iobuf_in
(
size_t
needed
)
{
memmove
(
iobuf
.
in
.
buf
,
iobuf
.
in
.
buf
+
iobuf
.
in
.
pos
,
iobuf
.
in
.
len
);
if
(
DEBUG_GTE
(
IO
,
4
))
{
rprintf
(
FINFO
,
"[%s] moved %ld bytes from %ld to 0 in the input buffer (size=%ld, needed=%ld).
\n
"
,
who_am_i
(),
(
long
)
iobuf
.
in
.
len
,
(
long
)
iobuf
.
in
.
pos
,
(
long
)
iobuf
.
in
.
size
,
(
long
)
needed
);
}
if
(
iobuf
.
raw_input_ends_before
)
iobuf
.
raw_input_ends_before
-=
iobuf
.
in
.
pos
;
iobuf
.
in
.
pos
=
0
;
}
/* Perform buffered input and output until specified conditions are met. When
* given a "needed" read requirement, we'll return without doing any I/O if the
* iobuf.in bytes are already available. When reading, we'll read as many
...
...
@@ -515,10 +502,11 @@ static void slide_iobuf_in(size_t needed)
* The iobuf.out and iobuf.msg buffers are circular, so some writes into them
* will need to be split when the data needs to wrap around to the start. In
* order to help make this easier for some operations (such as the use of
* SIVAL() into the buffer) the buffers MUST have 4 bytes of overflow space at
* the end that is not not counted in the "size". The iobuf.in buffer is not
* (currently) circular. To facilitate the handling of MSG_DATA bytes as they
* are read-from/written-into the buffers, see the three raw_* iobuf vars.
* SIVAL() into the buffer) a buffer may be temporarily shortened, but the
* original size will be automatically restored. The iobuf.in buffer is also
* circular, so callers may need to split their reading of the data if it spans
* the end. See also the 3 raw_* iobuf vars that are used in the handling of
* MSG_DATA bytes as they are read-from/written-into the buffers.
*
* When writing, we flush data in the following priority order:
*
...
...
@@ -535,8 +523,6 @@ static void slide_iobuf_in(size_t needed)
*
* - Make this routine able to read the generator-to-receiver batch flow?
*
* - Make the input buffer circular?
*
* Unlike the old routines that this replaces, it is OK to read ahead as far as
* we can because the read_a_msg() routine now reads its bytes out of the input
* buffer. In the old days, only raw data was in the input buffer, and any
...
...
@@ -558,24 +544,17 @@ static char *perform_io(size_t needed, int flags)
switch
(
flags
&
PIO_NEED_FLAGS
)
{
case
PIO_NEED_INPUT
:
/* We never resize the circular input buffer. */
if
(
iobuf
.
in
.
size
<
needed
)
{
rprintf
(
FERROR
,
"need to read %ld bytes, iobuf.in.buf is only %ld bytes.
\n
"
,
(
long
)
needed
,
(
long
)
iobuf
.
in
.
size
);
exit_cleanup
(
RERR_PROTOCOL
);
}
if
(
DEBUG_GTE
(
IO
,
3
))
{
rprintf
(
FINFO
,
"[%s] perform_io(%ld, %sinput)
\n
"
,
who_am_i
(),
(
long
)
needed
,
flags
&
PIO_CONSUME_INPUT
?
"consume&"
:
""
);
}
/* Make sure the input buffer is big enough to hold "needed" bytes.
* Also make sure it will fit in the free space at the end, or
* else we need to shift some bytes. */
if
(
needed
&&
iobuf
.
in
.
size
<
needed
)
{
size_t
new_size
=
ROUND_UP_1024
(
needed
);
if
(
DEBUG_GTE
(
IO
,
4
))
{
rprintf
(
FINFO
,
"[%s] resizing input buffer from %ld to %ld bytes.
\n
"
,
who_am_i
(),
(
long
)
iobuf
.
in
.
size
,
(
long
)
new_size
);
}
realloc_xbuf
(
&
iobuf
.
in
,
new_size
);
}
if
(
iobuf
.
in
.
size
-
iobuf
.
in
.
pos
<
needed
)
slide_iobuf_in
(
needed
);
break
;
case
PIO_NEED_OUTROOM
:
...
...
@@ -637,14 +616,11 @@ static char *perform_io(size_t needed, int flags)
break
;
}
if
(
iobuf
.
in
.
len
<
1024
&&
iobuf
.
in
.
size
-
(
iobuf
.
in
.
pos
+
iobuf
.
in
.
len
)
<
1024
)
slide_iobuf_in
(
flags
&
PIO_NEED_INPUT
?
needed
:
0
);
max_fd
=
-
1
;
FD_ZERO
(
&
r_fds
);
FD_ZERO
(
&
e_fds
);
if
(
iobuf
.
in_fd
>=
0
&&
iobuf
.
in
.
size
-
(
iobuf
.
in
.
pos
+
iobuf
.
in
.
len
)
)
{
if
(
iobuf
.
in_fd
>=
0
&&
iobuf
.
in
.
size
-
iobuf
.
in
.
len
)
{
if
(
!
read_batch
||
batch_fd
>=
0
)
{
FD_SET
(
iobuf
.
in_fd
,
&
r_fds
);
FD_SET
(
iobuf
.
in_fd
,
&
e_fds
);
...
...
@@ -761,9 +737,13 @@ static char *perform_io(size_t needed, int flags)
}
if
(
iobuf
.
in_fd
>=
0
&&
FD_ISSET
(
iobuf
.
in_fd
,
&
r_fds
))
{
size_t
pos
=
iobuf
.
in
.
pos
+
iobuf
.
in
.
len
;
size_t
len
=
iobuf
.
in
.
size
-
pos
;
size_t
len
,
pos
=
iobuf
.
in
.
pos
+
iobuf
.
in
.
len
;
int
n
;
if
(
pos
>=
iobuf
.
in
.
size
)
{
pos
-=
iobuf
.
in
.
size
;
len
=
iobuf
.
in
.
size
-
iobuf
.
in
.
len
;
}
else
len
=
iobuf
.
in
.
size
-
pos
;
if
((
n
=
read
(
iobuf
.
in_fd
,
iobuf
.
in
.
buf
+
pos
,
len
))
<=
0
)
{
if
(
n
==
0
)
{
/* Signal that input has become invalid. */
...
...
@@ -868,11 +848,40 @@ static char *perform_io(size_t needed, int flags)
if
(
flags
&
PIO_CONSUME_INPUT
)
{
iobuf
.
in
.
len
-=
needed
;
iobuf
.
in
.
pos
+=
needed
;
if
(
iobuf
.
in
.
pos
==
iobuf
.
raw_input_ends_before
)
iobuf
.
raw_input_ends_before
=
0
;
if
(
iobuf
.
in
.
pos
>=
iobuf
.
in
.
size
)
{
iobuf
.
in
.
pos
-=
iobuf
.
in
.
size
;
if
(
iobuf
.
raw_input_ends_before
)
iobuf
.
raw_input_ends_before
-=
iobuf
.
in
.
size
;
}
}
return
data
;
}
static
void
raw_read_buf
(
char
*
buf
,
size_t
len
)
{
size_t
pos
=
iobuf
.
in
.
pos
;
char
*
data
=
perform_io
(
len
,
PIO_INPUT_AND_CONSUME
);
if
(
iobuf
.
in
.
pos
<=
pos
&&
len
)
{
size_t
siz
=
len
-
iobuf
.
in
.
pos
;
memcpy
(
buf
,
data
,
siz
);
memcpy
(
buf
+
siz
,
iobuf
.
in
.
buf
,
iobuf
.
in
.
pos
);
}
else
memcpy
(
buf
,
data
,
len
);
}
static
int32
raw_read_int
(
void
)
{
char
*
data
,
buf
[
4
];
if
(
iobuf
.
in
.
size
-
iobuf
.
in
.
pos
>=
4
)
data
=
perform_io
(
4
,
PIO_INPUT_AND_CONSUME
);
else
raw_read_buf
(
data
=
buf
,
4
);
return
IVAL
(
data
,
0
);
}
void
noop_io_until_death
(
void
)
{
char
buf
[
1024
];
...
...
@@ -1338,12 +1347,11 @@ void stop_flist_forward(void)
/* Read a message from a multiplexed source. */
static
void
read_a_msg
(
void
)
{
char
*
data
,
line
[
BIGPATHBUFLEN
];
char
line
[
BIGPATHBUFLEN
];
int
tag
,
val
;
size_t
msg_bytes
;
data
=
perform_io
(
4
,
PIO_INPUT_AND_CONSUME
);
tag
=
IVAL
(
data
,
0
);
tag
=
raw_read_int
();
msg_bytes
=
tag
&
0xFFFFFF
;
tag
=
(
tag
>>
24
)
-
MPLEX_BASE
;
...
...
@@ -1364,20 +1372,17 @@ static void read_a_msg(void)
case
MSG_STATS
:
if
(
msg_bytes
!=
sizeof
stats
.
total_read
||
!
am_generator
)
goto
invalid_msg
;
data
=
perform_io
(
sizeof
stats
.
total_read
,
PIO_INPUT_AND_CONSUME
);
memcpy
((
char
*
)
&
stats
.
total_read
,
data
,
sizeof
stats
.
total_read
);
raw_read_buf
((
char
*
)
&
stats
.
total_read
,
sizeof
stats
.
total_read
);
break
;
case
MSG_REDO
:
if
(
msg_bytes
!=
4
||
!
am_generator
)
goto
invalid_msg
;
data
=
perform_io
(
4
,
PIO_INPUT_AND_CONSUME
);
got_flist_entry_status
(
FES_REDO
,
IVAL
(
data
,
0
));
got_flist_entry_status
(
FES_REDO
,
raw_read_int
());
break
;
case
MSG_IO_ERROR
:
if
(
msg_bytes
!=
4
||
am_sender
)
goto
invalid_msg
;
data
=
perform_io
(
4
,
PIO_INPUT_AND_CONSUME
);
val
=
IVAL
(
data
,
0
);
val
=
raw_read_int
();
io_error
|=
val
;
if
(
!
am_generator
)
send_msg_int
(
MSG_IO_ERROR
,
val
);
...
...
@@ -1385,8 +1390,7 @@ static void read_a_msg(void)
case
MSG_IO_TIMEOUT
:
if
(
msg_bytes
!=
4
||
am_server
||
am_generator
)
goto
invalid_msg
;
data
=
perform_io
(
4
,
PIO_INPUT_AND_CONSUME
);
val
=
IVAL
(
data
,
0
);
val
=
raw_read_int
();
if
(
!
io_timeout
||
io_timeout
>
val
)
{
if
(
INFO_GTE
(
MISC
,
2
))
rprintf
(
FINFO
,
"Setting --timeout=%d to match server
\n
"
,
val
);
...
...
@@ -1401,7 +1405,7 @@ static void read_a_msg(void)
if
(
msg_bytes
>=
sizeof
line
)
goto
overflow
;
if
(
am_generator
)
{
memcpy
(
line
,
perform_io
(
msg_bytes
,
PIO_INPUT_AND_CONSUME
)
,
msg_bytes
);
raw_read_buf
(
line
,
msg_bytes
);
send_msg
(
MSG_DELETED
,
line
,
msg_bytes
,
1
);
break
;
}
...
...
@@ -1417,7 +1421,7 @@ static void read_a_msg(void)
while
(
msg_bytes
)
{
size_t
len
=
msg_bytes
>
sizeof
ibuf
-
inbuf
.
len
?
sizeof
ibuf
-
inbuf
.
len
:
msg_bytes
;
memcpy
(
ibuf
+
inbuf
.
len
,
perform_io
(
len
,
PIO_INPUT_AND_CONSUME
)
,
len
);
raw_read_buf
(
ibuf
+
inbuf
.
len
,
len
);
inbuf
.
pos
=
0
;
inbuf
.
len
+=
len
;
if
(
!
(
msg_bytes
-=
len
)
&&
!
ibuf
[
inbuf
.
len
-
1
])
...
...
@@ -1439,7 +1443,7 @@ static void read_a_msg(void)
msg_bytes
=
outbuf
.
len
;
}
else
#endif
memcpy
(
line
,
perform_io
(
msg_bytes
,
PIO_INPUT_AND_CONSUME
)
,
msg_bytes
);
raw_read_buf
(
line
,
msg_bytes
);
/* A directory name was sent with the trailing null */
if
(
msg_bytes
>
0
&&
!
line
[
msg_bytes
-
1
])
log_delete
(
line
,
S_IFDIR
);
...
...
@@ -1456,8 +1460,7 @@ static void read_a_msg(void)
inc_recurse
?
"/inc"
:
""
);
exit_cleanup
(
RERR_STREAMIO
);
}
data
=
perform_io
(
4
,
PIO_INPUT_AND_CONSUME
);
val
=
IVAL
(
data
,
0
);
val
=
raw_read_int
();
if
(
am_generator
)
got_flist_entry_status
(
FES_SUCCESS
,
val
);
else
...
...
@@ -1466,8 +1469,7 @@ static void read_a_msg(void)
case
MSG_NO_SEND
:
if
(
msg_bytes
!=
4
)
goto
invalid_msg
;
data
=
perform_io
(
4
,
PIO_INPUT_AND_CONSUME
);
val
=
IVAL
(
data
,
0
);
val
=
raw_read_int
();
if
(
am_generator
)
got_flist_entry_status
(
FES_NO_SEND
,
val
);
else
...
...
@@ -1494,7 +1496,7 @@ static void read_a_msg(void)
inc_recurse
?
"/inc"
:
""
);
exit_cleanup
(
RERR_STREAMIO
);
}
memcpy
(
line
,
perform_io
(
msg_bytes
,
PIO_INPUT_AND_CONSUME
)
,
msg_bytes
);
raw_read_buf
(
line
,
msg_bytes
);
rwrite
((
enum
logcode
)
tag
,
line
,
msg_bytes
,
!
am_generator
);
if
(
first_message
)
{
if
(
list_only
&&
!
am_sender
&&
tag
==
1
&&
msg_bytes
<
sizeof
line
)
{
...
...
@@ -1518,8 +1520,7 @@ static void read_a_msg(void)
}
val
=
0
;
}
else
if
(
msg_bytes
==
4
)
{
data
=
perform_io
(
4
,
PIO_INPUT_AND_CONSUME
);
val
=
IVAL
(
data
,
0
);
val
=
raw_read_int
();
if
(
protocol_version
>=
31
)
{
if
(
am_generator
)
{
if
(
DEBUG_GTE
(
EXIT
,
3
))
{
...
...
@@ -1557,8 +1558,9 @@ static void drain_multiplex_messages(void)
iobuf
.
in
.
len
=
0
;
break
;
}
iobuf
.
in
.
pos
+=
raw_len
;
iobuf
.
in
.
len
-=
raw_len
;
if
((
iobuf
.
in
.
pos
+=
raw_len
)
>=
iobuf
.
in
.
size
)
iobuf
.
in
.
pos
-=
iobuf
.
in
.
size
;
}
read_a_msg
();
}
...
...
@@ -1724,7 +1726,7 @@ void read_buf(int f, char *buf, size_t len)
}
if
(
!
IN_MULTIPLEXED
)
{
memcpy
(
buf
,
perform_io
(
len
,
PIO_INPUT_AND_CONSUME
)
,
len
);
raw_read_buf
(
buf
,
len
);
total_data_read
+=
len
;
if
(
forward_flist_data
)
write_buf
(
iobuf
.
out_fd
,
buf
,
len
);
...
...
@@ -1735,20 +1737,15 @@ void read_buf(int f, char *buf, size_t len)
}
while
(
1
)
{
char
*
data
;
size_t
siz
;
while
(
!
iobuf
.
raw_input_ends_before
)
read_a_msg
();
siz
=
MIN
(
len
,
iobuf
.
raw_input_ends_before
-
iobuf
.
in
.
pos
);
data
=
perform_io
(
siz
,
PIO_INPUT_AND_CONSUME
);
if
(
iobuf
.
in
.
pos
==
iobuf
.
raw_input_ends_before
)
iobuf
.
raw_input_ends_before
=
0
;
/* The bytes at the "data" pointer will survive long
* enough to make a copy, but not past future I/O. */
memcpy
(
buf
,
data
,
siz
);
if
(
siz
>=
iobuf
.
in
.
size
)
siz
=
iobuf
.
in
.
size
;
raw_read_buf
(
buf
,
siz
);
total_data_read
+=
siz
;
if
(
forward_flist_data
)
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment