Add missing bits for full streaming support

Previously, the code would only deliver fully received segments, even
while a busy object was still being written to by the backend side
(streaming).

At some point before the public release, I must have decided that
streaming only completed segments would be acceptable to begin with,
but I overlooked that the previous implementation could lead to short
body writes due to a lack of coordination between the writing and the
reading side.

This commit introduces proper streaming support, including within busy
segments.
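
For illustration, here is a minimal sketch of the missing coordination
pattern, reduced to a plain mutex/condition-variable pair. All names in
this sketch are hypothetical; in the actual code, varnish-cache's
ObjWaitExtend() plays this role, wrapped in fellow_stream_wait() below:

	#include <assert.h>
	#include <pthread.h>
	#include <sys/types.h>

	#define AZ(x) assert((x) == 0)	/* varnish-style assert-zero */

	struct extent {
		pthread_mutex_t	mtx;
		pthread_cond_t	cond;
		ssize_t		len;	/* bytes produced so far */
		int		done;	/* writer has finished */
	};

	/* writer side: publish n more bytes, then wake any readers */
	static void
	extent_publish(struct extent *e, ssize_t n, int done)
	{

		AZ(pthread_mutex_lock(&e->mtx));
		e->len += n;
		e->done = done;
		AZ(pthread_cond_broadcast(&e->cond));
		AZ(pthread_mutex_unlock(&e->mtx));
	}

	/* reader side: block until more than `seen` bytes exist or the
	 * writer is done, and return the new extent */
	static ssize_t
	extent_wait(struct extent *e, ssize_t seen)
	{
		ssize_t l;

		AZ(pthread_mutex_lock(&e->mtx));
		while (e->len == seen && !e->done)
			AZ(pthread_cond_wait(&e->cond, &e->mtx));
		l = e->len;
		AZ(pthread_mutex_unlock(&e->mtx));
		return (l);
	}

A reader which clamps every delivery to what extent_wait() returned and
waits for growth in between can not produce short writes; this is the
pattern which the fellow_stream_f() wrapper below implements against
ObjWaitExtend().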

In particular, this should also solve an issue reported by @tomazz75 on
GitLab where no body was sent at all, leaving the client stalled
waiting for body data which never arrived.
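
With the hypothetical helpers from the sketch above, that half of the
fix can be outlined as follows: make sure data (or the end of the
object) exists before the iteration starts, rather than racing with the
backend thread:

	/* hypothetical sketch, mirroring the fellow_stream_wait() call
	 * which this commit adds before fellow_cache_obj_iter() */
	static int
	stream_begin(struct extent *e,
	    int (*iterate)(struct extent *, ssize_t avail))
	{
		ssize_t avail;

		/* block until the backend has produced data or finished;
		 * previously, iteration could start with zero complete
		 * segments, deliver nothing and return, leaving the
		 * client waiting for body data which never came */
		avail = extent_wait(e, 0);
		return (iterate(e, avail));
	}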

The underlying problem was a race condition, which I was only able to
reproduce on my system with the following patch to make the race more
likely:

diff --git a/src/fellow_cache.c b/src/fellow_cache.c
index 3073b35..c9d9a4e 100644
--- a/src/fellow_cache.c
+++ b/src/fellow_cache.c
@@ -3338,6 +3338,9 @@ fellow_busy_obj_getspace(struct fellow_busy *fbo, size_t *sz, uint8_t **ptr)
        assert(*sz > 0);
        AN(ptr);

+       // XXX DEBUG
+       usleep(10000);
+
        CHECK_OBJ_NOTNULL(fbo->fc, FELLOW_CACHE_MAGIC);
        CHECK_OBJ_NOTNULL(fbo->fc->tune, STVFE_TUNE_MAGIC);
        max = (size_t)1 << fbo->fc->tune->chunk_exponent;

The fix survived 1000 iterations of the respective test case:

	/tmp/bin/varnishtest \
		-Dlibvmod_slash=/tmp/lib/varnish/vmods/libvmod_slash.so \
		src/vtc/fellow_c00093.vtc \
		-n 1000 -j 20

Thanks to @tomazz75 for reporting the bug and for helping to track it
down with additional information.

Fixes #2
@@ -2845,6 +2845,7 @@ fellow_cache_seg_check(struct fellow_cache *fc, struct fellow_cache_seg *fcs)
 	switch (fcs->state) {
 	case FCS_READFAIL:
 		return ("segment FCS_READFAIL");
+	case FCS_BUSY:
 	case FCS_WRITING:
 	case FCS_INCORE:
 		return (NULL);
@@ -3043,25 +3044,6 @@ fellow_cache_seg_ref_in(struct fellow_cache *fc, enum fellow_cache_io_e type,
 		fellow_cache_seg_ref_in(fc, type, racesegs, racen);
 }
 
-static inline int
-fcs_state_read_needwait(enum fcos_state s)
-{
-
-	switch (s) {
-	case FCS_BUSY:
-	case FCS_READING:
-		return (1);
-	case FCS_CHECK:
-	case FCS_WRITING:
-	case FCS_INCORE:
-	case FCS_READFAIL:
-	case FCS_USABLE:
-		return (0);
-	default:
-		// INIT, DISK
-		WRONG("state when waiting for read");
-	}
-}
 /*
  * ra[] is a ring of pointers to fcses which we (potentially)
  * read ahead.
@@ -3089,6 +3071,7 @@ fellow_cache_obj_iter(struct fellow_cache *fc, struct fellow_cache_obj *fco,
 	struct fcscursor c, rac, nextc;
 	unsigned n = 0, ran = 0, raion, derefn = 0, flags, flush;
 	const char *err;
+	ssize_t sz;
 	int ret2;
 
 	fcr.status = fcr_ok; // also if func() != 0
@@ -3132,7 +3115,7 @@ fellow_cache_obj_iter(struct fellow_cache *fc, struct fellow_cache_obj *fco,
 	    raio, raion);
 	AZ(fcr.r.integer);
 
-	if (fcs_state_read_needwait(fcs->state)) {
+	if (fcs->state == FCS_READING) {
 		/* We can not get here for readahead == 0, because of
		 * the _ref_in(FCIO_SYNC) above
		 *
......@@ -3145,7 +3128,7 @@ fellow_cache_obj_iter(struct fellow_cache *fc, struct fellow_cache_obj *fco,
AZ(derefn);
AZ(pthread_mutex_lock(&fcs->fco->mtx));
while (fcs_state_read_needwait(fcs->state))
while (fcs->state == FCS_READING)
fellow_cache_seg_wait_locked(fcs);
AZ(pthread_mutex_unlock(&fcs->fco->mtx));
}
@@ -3172,7 +3155,9 @@ fellow_cache_obj_iter(struct fellow_cache *fc, struct fellow_cache_obj *fco,
 		nextc = c;
 		fcsnext = fcsc_next(&nextc);
 		flush = 0;
-		if (fcsnext == NULL || fcsnext->state == FCS_USABLE) {
+		if (fcs->state == FCS_BUSY)
+			flush = OBJ_ITER_FLUSH;
+		else if ((fcsnext == NULL || fcsnext->state == FCS_USABLE)) {
 			flags |= OBJ_ITER_END;
 			flush = OBJ_ITER_FLUSH;
 		}
@@ -3182,8 +3167,15 @@ fellow_cache_obj_iter(struct fellow_cache *fc, struct fellow_cache_obj *fco,
 		assert(ra[n % mod] == fcs);
 		ra[n % mod] = NULL;
 
-		fcr.r.integer = func(priv, flags | flush,
-		    fcs->alloc.ptr, (ssize_t)fcs->disk_seg->seg.size);
+		// for BUSY, we pass the maximum segment size
+		// and our func-wrapper will look after the amount
+		// to write
+		if (fcs->state == FCS_BUSY)
+			sz = (ssize_t)fcs->alloc.size;
+		else
+			sz = (ssize_t)fcs->disk_seg->seg.size;
+		fcr.r.integer = func(priv, flags | flush, fcs->alloc.ptr, sz);
 
 		if (final) {
 			AZ(derefn);
@@ -483,6 +483,102 @@ stvfe_fcr_handle_iter(struct worker *wrk, struct objcore *oc,
 	return (-1);
 }
 
+/*
+ * our iterator at the cache layer should not need to know about varnish-cache
+ * specifics, so we encapsulate the busy object / streaming coordination in an
+ * iterator function.
+ *
+ * the basic assumption at the cache layer is that there always exists a segment
+ * if there is any data to be iterated over, so we always call ObjWaitExtend()
+ * via fellow_stream_wait() before returning.
+ */
+
+struct fellow_stream {
+	unsigned		magic;
+#define FELLOW_STREAM_MAGIC	0x374d85b8
+	enum boc_state_e	state;
+	struct worker		*wrk;
+	struct objcore		*oc;
+	void			*priv;
+	objiterate_f		*func;
+	struct boc		*boc;
+	ssize_t			available;
+	ssize_t			written;
+	ssize_t			checkpoint;	// up to this segment
+};
+
+static enum boc_state_e
+fellow_stream_wait(struct fellow_stream *fs)
+{
+	enum boc_state_e state;
+	uint64_t l;
+
+	CHECK_OBJ_NOTNULL(fs, FELLOW_STREAM_MAGIC);
+
+	do {
+		l = ObjWaitExtend(fs->wrk, fs->oc, (uint64_t)fs->available);
+		state = fs->boc->state;
+		if (state == BOS_FINISHED || state == BOS_FAILED)
+			break;
+		assert(state == BOS_STREAM);
+	} while (l == (uint64_t)fs->available);
+
+	fs->state = state;
+	fs->available = (ssize_t)l;
+	return (state);
+}
+
+/* called on each (potentially busy) segment's data */
+static int v_matchproto_(objiterate_f)
+fellow_stream_f(void *priv, unsigned flush, const void *ptr, ssize_t len)
+{
+	struct fellow_stream *fs;
+	const char *p;
+	ssize_t l;
+	int r;
+
+	CAST_OBJ_NOTNULL(fs, priv, FELLOW_STREAM_MAGIC);
+
+	assert((fs->state == BOS_FINISHED && fs->available >= fs->written) ||
+	    (fs->state == BOS_STREAM && fs->available > fs->written));
+	assert(fs->checkpoint == fs->written);
+
+	if (ptr == NULL || len == 0)
+		return (fs->func(fs->priv, flush, ptr, len));
+
+	if (flush & OBJ_ITER_END) {
+		if (fellow_stream_wait(fs) == BOS_FAILED)
+			return (-1);
+		assert(fs->state == BOS_FINISHED);
+	}
+
+	p = ptr;
+	assert(fs->available >= fs->written);
+	l = vmin_t(ssize_t, len, fs->available - fs->written);
+	if (flush & OBJ_ITER_END)
+		assert(l <= len);
+
+	do {
+		r = fs->func(fs->priv, flush, p, l);
+		if (r)
+			return (r);
+		assert(len >= l);
+		fs->written += l;
+		len -= l;
+		p += l;
+		if (fellow_stream_wait(fs) == BOS_FAILED)
+			return (-1);
+		assert(fs->available >= fs->written);
+		l = vmin_t(ssize_t, len, fs->available - fs->written);
+	} while (l > 0);
+
+	fs->checkpoint = fs->written;
+	return (r);
+}
+
 static int v_matchproto_(objiterator_f)
 sfemem_iterator(struct worker *wrk, struct objcore *oc,
     void *priv, objiterate_f *func, int final)
@@ -491,9 +587,44 @@ sfemem_iterator(struct worker *wrk, struct objcore *oc,
 	const struct stvfe *stvfe = stv_stvfe(stv);
 	struct fellow_cache_obj *fco = stvfe_memoc_fco(stv, stvfe, oc);
 	struct fellow_cache_res fcr;
+	struct fellow_stream fs;
+	struct boc *boc;
+	int ret;
 
-	fcr = fellow_cache_obj_iter(stvfe->fc, fco, priv, func, final);
-	return (stvfe_fcr_handle_iter(wrk, oc, stv, fcr));
+	boc = HSH_RefBoc(oc);
+
+	if (boc == NULL) {
+		fcr = fellow_cache_obj_iter(stvfe->fc, fco, priv, func, final);
+		return (stvfe_fcr_handle_iter(wrk, oc, stv, fcr));
+	}
+
+	INIT_OBJ(&fs, FELLOW_STREAM_MAGIC);
+	fs.wrk = wrk;
+	fs.oc = oc;
+	fs.priv = priv;
+	fs.func = func;
+	fs.boc = boc;
+
+	// flush header if waiting for data
+	if (fs.boc->BOC_FETCHED_SO_FAR == 0) {
+		ret = func(priv, OBJ_ITER_FLUSH, NULL, (ssize_t)0);
+		if (ret)
+			goto out;
+	}
+
+	// have some data ready for obj_iter
+	if (fellow_stream_wait(&fs) == BOS_FAILED) {
+		ret = -1;
+		goto out;
+	}
+
+	fcr = fellow_cache_obj_iter(stvfe->fc, fco,
+	    &fs, fellow_stream_f, final);
+	ret = stvfe_fcr_handle_iter(wrk, oc, stv, fcr);
+
+  out:
+	HSH_DerefBoc(wrk, oc);
+	return (ret);
 }
 
 static int v_matchproto_(objiterator_f)
@@ -508,6 +639,8 @@ sfedsk_iterator(struct worker *wrk, struct objcore *dskoc,
 	if (fcoc.fco == NULL)
 		return (-1);
 
+	AZ(HSH_RefBoc(dskoc));
+
 	fcr = fellow_cache_obj_iter(stvfe->fc, fcoc.fco, priv, func, final);
 	fcoc_fini(stvfe->fc, &fcoc);
 	return (stvfe_fcr_handle_iter(wrk, dskoc, stv, fcr));
@@ -21,6 +21,7 @@
 -emacro(835, DLE_N)		// A zero has been given...
 -emacro(525, DLE_N)		// Negative indent
 -emacro(160, VSTAILQ_LAST)	// sequence ({
+-emacro(160, _vtake)		// sequence ({
 -esym(760, FH_*)		// defined identically...
 //bitf.h only used for test
 -esym(768, bitf::extra)
@@ -7,16 +7,21 @@ barrier b2 sock 2
 
 server s1 -listen "${tmpdir}/s1.sock" {
 	rxreq
-	txresp -nolen -hdr "Content-Length: 65536"
+	txresp -nolen -hdr "Transfer-Encoding: chunked"
 	barrier b1 sync
 	barrier b2 sync
-	delay .5
-	send_n 128 "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"
-	delay .5
-	send_n 128 "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"
+	chunkedlen 65536
+	delay .2
+	chunkedlen 65536
+	delay .2
+	chunkedlen 65536
+	delay .2
+	chunkedlen 65536
+	chunkedlen 0
 } -start
 
 varnish v1 \
+	-arg "-p fetch_maxchunksize=64k" \
 	-arg "-E${libvmod_slash}" \
 	-arg "-sfellow=fellow,${tmpdir}/fellow_global.stv,100MB,1MB,64KB" \
 	-vcl+backend {
@@ -45,7 +50,7 @@ client c1 {
 	txreq
 	rxresp
 	expect resp.http.streaming == true
-	expect resp.bodylen == 65536
+	expect resp.bodylen == 262144
 } -run
 
 logexpect l1 -wait
@@ -56,5 +61,5 @@ client c2 {
 	txreq
 	rxresp
 	expect resp.http.streaming == false
-	expect resp.bodylen == 65536
+	expect resp.bodylen == 262144
 } -run