Fix races for streaming busy objects

For streaming busy objects, we basically rely on the varnish-cache
ObjExtend() / ObjWaitExtend() API to never read past the object: In
fellow_stream_f(), we always wait for more data (or the end of the
object) before returning, such that fellow_cache_obj_iter(), which
iterates over segments, should never touch a segment past the final
FCS_BUSY segment.

Yet - it did, by means of the read-ahead and the peek-ahead to determine
whether or not OBJ_ITER_END should be signaled.

We fix this issue by reading/peeking ahead only for segments with a
state beyond FCS_BUSY.

There is now also extensive test infrastructure to specifically test
concurrent access to busy objects. To keep layers separate,
fellow_cache_test uses a lightweight signal/wait implementation
analogous to the ObjExtend() / ObjWaitExtend() Varnish-Cache
interface.

An earlier version of t_busyobj() had run on my dev laptop for 3.5
hours without crashing, while without the fixes it had run into
assertion failures within seconds.

Fixes #35 and #36 (I hope)
parent 83bc6afe
......@@ -407,6 +407,17 @@ fcsc_next(struct fcscursor *c)
DBG("fcsc " #c " %p u=%u nsegs=%u next=%p", c, (c)->u, \
(c)->fdsl->nsegs, (c)->fcsl->next), fcsc_next(c))
// look at the next element, but do not change the cursor
static inline struct fellow_cache_seg *
fcsc_peek(const struct fcscursor *ca)
{
struct fcscursor c;
AN(ca);
c = *ca;
return (FCSC_NEXT(&c));
}
/*
* fdo attributes: we want to keep our own enum and, consequently, bitfield
* stable. so we define the enum manually and then generate code from the table
......@@ -2196,6 +2207,10 @@ fellow_busy_region_free(struct fellow_busy *fbo, struct buddy_off_extent *fdr)
WRONG("region to free not found");
}
#ifdef TEST_DRIVER
static uint16_t XXX_LIMIT_LDSEGS = 0;
#endif
static struct fellow_cache_res
fellow_body_seglist_alloc(struct fellow_busy *fbo,
struct fellow_cache_seglist *ofcsl)
......@@ -2238,6 +2253,10 @@ fellow_body_seglist_alloc(struct fellow_busy *fbo,
if (sz > FELLOW_DISK_SEGLIST_MAX_SEGS)
sz = FELLOW_DISK_SEGLIST_MAX_SEGS;
ldsegs = (uint16_t)sz;
#ifdef TEST_DRIVER
if (XXX_LIMIT_LDSEGS > 0)
ldsegs = XXX_LIMIT_LDSEGS;
#endif
sz = fellow_disk_seglist_size(fdsl, ldsegs);
......@@ -3764,7 +3783,7 @@ fellow_cache_obj_iter(struct fellow_cache *fc, struct fellow_cache_obj *fco,
unsigned mod = readahead + 1;
struct fellow_cache_seg *fcsnext, *fcs, *fcsra,
*ra[mod], *raio[mod], *deref[mod];
struct fcscursor c, rac, nextc;
struct fcscursor c, rac;
unsigned n = 0, ran = 0, raion, derefn = 0, flags, flush;
const char *err;
ssize_t sz;
......@@ -3785,6 +3804,12 @@ fellow_cache_obj_iter(struct fellow_cache *fc, struct fellow_cache_obj *fco,
flags = final ? OBJ_ITER_FLUSH : 0;
flush = 0;
while ((fcs = FCSC_NEXT(&c)) != NULL) {
/*
* fellow_stream_f/test_iter_f ensure
* that we do not read past the last busy segment
*/
assert(FCOS(fcs->state) >= FCOS_BUSY);
if (ra[n % mod] == NULL) {
assert(n == ran);
......@@ -3797,18 +3822,25 @@ fellow_cache_obj_iter(struct fellow_cache *fc, struct fellow_cache_obj *fco,
}
assert(ra[n % mod] == fcs);
/* only read ahead if both the current segment and the next
* are > BUSY
*/
raion = 0;
while (ran <= n + readahead) {
fcsra = FCSC_NEXT(&rac);
if (fcsra == NULL)
break;
raio[raion++] = fcsra;
TAKEZN(ra[ran % mod], fcsra);
ran++;
if (FCOS(fcs->state) > FCOS_BUSY) {
while (ran <= n + readahead &&
(fcsnext = fcsc_peek(&rac)) != NULL &&
FCOS(fcsnext->state) > FCOS_BUSY) {
fcsra = FCSC_NEXT(&rac);
assert(fcsra == fcsnext); // because peek
raio[raion++] = fcsra;
TAKEZN(ra[ran % mod], fcsra);
ran++;
}
if (raion) {
fellow_cache_seg_ref_in(fc, FCIO_ASYNC,
raio, raion);
}
}
if (raion)
fellow_cache_seg_ref_in(fc, FCIO_ASYNC,
raio, raion);
AZ(fcr.r.integer);
if (fcs->state == FCS_READING) {
......@@ -3848,12 +3880,12 @@ fellow_cache_obj_iter(struct fellow_cache *fc, struct fellow_cache_obj *fco,
* derefn == mod - 1 == 0
*/
AZ(flags & OBJ_ITER_END);
nextc = c;
fcsnext = FCSC_NEXT(&nextc);
flush = 0;
if (fcs->state == FCS_BUSY)
if (fcs->state == FCS_BUSY) {
flush = OBJ_ITER_FLUSH;
else if ((fcsnext == NULL || fcsnext->state == FCS_USABLE)) {
}
else if (((fcsnext = fcsc_peek(&c)) == NULL) ||
fcsnext->state == FCS_USABLE) {
flags |= OBJ_ITER_END;
flush = OBJ_ITER_FLUSH;
}
......@@ -5519,6 +5551,141 @@ static void test_fellow_cache_obj_iter(
test_fellow_cache_obj_iter_final(fc, fcop, h1, final);
}
/*
 * simulate v-c ObjExtend/ObjWaitExtend
 *
 * Lightweight reader/writer rendezvous: the writer advances w under the
 * mutex and broadcasts; the reader blocks while r == w and !done.
 */
struct test_iter_wait_s {
	unsigned	magic;
#define TEST_ITER_WAIT_MAGIC 0x9b229927
	unsigned	done;	// writer finished; reader must not wait any more
	unsigned	flush;	// records OBJ_ITER_END, may be seen only once
	ssize_t		r, w;	// bytes delivered to reader / written so far
	pthread_mutex_t	mtx;	// protects done, r, w
	pthread_cond_t	cond;	// signaled whenever w (or done) changes
	// wrapped iter func (called by test_iter_f)
	void		*priv;
	objiterate_f	*func;
};
/*
 * Prepare a test_iter_wait_s for use: zero it, stamp the magic, then
 * set up the pthread primitives.
 */
static void
init_test_iter_wait(struct test_iter_wait_s *tiw)
{

	INIT_OBJ(tiw, TEST_ITER_WAIT_MAGIC);
	AZ(pthread_cond_init(&tiw->cond, NULL));
	AZ(pthread_mutex_init(&tiw->mtx, NULL));
}
/* Tear down the pthread primitives; counterpart to init_test_iter_wait(). */
static void
fini_test_iter_wait(struct test_iter_wait_s *tiw)
{

	CHECK_OBJ_NOTNULL(tiw, TEST_ITER_WAIT_MAGIC);
	AZ(pthread_cond_destroy(&tiw->cond));
	AZ(pthread_mutex_destroy(&tiw->mtx));
}
/*
 * Writer side, analogous to Varnish-Cache ObjExtend(): account for sz
 * newly written bytes and wake any reader blocked in test_iter_wait().
 */
static void
test_iter_extend(struct test_iter_wait_s *tiw, ssize_t sz)
{
	CHECK_OBJ_NOTNULL(tiw, TEST_ITER_WAIT_MAGIC);
	AZ(pthread_mutex_lock(&tiw->mtx));
	// advance the write position under the mutex, then signal
	tiw->w += sz;
	AZ(pthread_cond_broadcast(&tiw->cond));
	AZ(pthread_mutex_unlock(&tiw->mtx));
}
/*
 * Reader side, analogous to Varnish-Cache ObjWaitExtend(): block until
 * the writer has made new data available (w > r) or has flagged done.
 */
static void
test_iter_wait(struct test_iter_wait_s *tiw)
{
	CHECK_OBJ_NOTNULL(tiw, TEST_ITER_WAIT_MAGIC);
	AZ(pthread_mutex_lock(&tiw->mtx));
	// the reader must never have consumed more than was written
	assert(tiw->r <= tiw->w);
	// once done is set, return immediately even if no new data arrived
	while (! tiw->done && tiw->r == tiw->w)
		AZ(pthread_cond_wait(&tiw->cond, &tiw->mtx));
	AZ(pthread_mutex_unlock(&tiw->mtx));
}
// analogous to fellow_stream_f in fellow_storage.c
/*
 * objiterate_f wrapper which throttles delivery to the wrapped function
 * (tiw->func): it hands out at most (w - r) bytes — what the simulated
 * writer has "extended" so far — and waits in test_iter_wait() for the
 * writer before delivering the remainder of the segment.
 */
static int
test_iter_f(void *priv, unsigned flush, const void *ptr, ssize_t len)
{
	struct test_iter_wait_s *tiw;
	const char *p;
	ssize_t l;
	int r = 0;

	CAST_OBJ_NOTNULL(tiw, priv, TEST_ITER_WAIT_MAGIC);

	DBG("r=%zd w=%zd (%zd) done=%u flush=%u ptr=%p len=%zd",
	    tiw->r, tiw->w, tiw->w - tiw->r, tiw->done, flush, ptr, len);

	// pure flush call without data: pass straight through
	if (ptr == NULL || len == 0)
		return (tiw->func(tiw->priv, flush, ptr, len));

	// can only see OBJ_ITER_END once
	if (flush & OBJ_ITER_END) {
		AZ(tiw->flush);
		tiw->flush = flush;
	}

	p = ptr;
	assert(tiw->w >= tiw->r);
	// deliver no more than the writer has made available
	l = vmin_t(ssize_t, len, tiw->w - tiw->r);
	if (flush & OBJ_ITER_END)
		// NOTE(review): trivially true (l is a min with len);
		// presumably assert(l == len) was intended - confirm
		assert(l <= len);
	do {
		r = tiw->func(tiw->priv, flush, p, l);
		if (r)
			return (r);
		assert(len >= l);
		// consume l bytes, then wait for the writer to extend
		tiw->r += l;
		len -= l;
		p += l;
		test_iter_wait(tiw);
		assert(tiw->w >= tiw->r);
		l = vmin_t(ssize_t, len, tiw->w - tiw->r);
	} while (len > 0);
	AZ(len);
	return (r);
}
/* to pass arguments to the iter thread */
struct test_iter_thread_s {
	unsigned	magic;
#define TEST_ITER_THREAD_MAGIC 0xcbcd1884
	// arguments handed to fellow_cache_obj_iter()
	struct fellow_cache	*fc;
	struct fellow_cache_obj	*fco;
	void			*priv;	// the test_iter_wait_s for test_iter_f
	objiterate_f		*func;
	int			final;
	// result of the iteration, filled in by the thread function
	struct fellow_cache_res	res;
	// the thread itself
	pthread_t		thr;
};
/* iterate in a separate thread */
static void *
test_fellow_cache_obj_iter_thread_f(void *p)
{
	struct test_iter_thread_s *args;
	struct test_iter_wait_s *tiw;

	CAST_OBJ_NOTNULL(args, p, TEST_ITER_THREAD_MAGIC);

	// analogous to sfemem_iterator():
	// have some data ready for obj_iter
	CAST_OBJ_NOTNULL(tiw, args->priv, TEST_ITER_WAIT_MAGIC);
	test_iter_wait(tiw);

	// result is read by the spawning thread after pthread_join()
	args->res = fellow_cache_obj_iter(args->fc, args->fco,
	    args->priv, args->func, args->final);
	return (NULL);
}
#define DBGSZ(x) \
DBG(#x "\t%3zu", sizeof(struct x))
......@@ -5600,6 +5767,96 @@ t_lcb(struct fellow_cache *fc)
DBG("done %s","---");
}
// test concurrent iter while filling object
/*
 * Exercise the FCS_BUSY streaming races: fill a busy object segment by
 * segment while a second thread concurrently iterates over it via
 * test_iter_f(). Both the writer and the reader compute a SHA256 over
 * the body; the two digests must match when the iteration completes.
 *
 * XXX_LIMIT_LDSEGS = 1 forces minimal disk seglists so many segments
 * (and seglist extensions) are exercised.
 */
static void
t_busyobj(unsigned chksum, struct fellow_cache *fc)
{
	unsigned char h1[SHA256_LEN], h2[SHA256_LEN];
	struct fellow_cache_obj *fco;
	uint8_t hash[DIGEST_LEN];
	struct fellow_busy *fbo;
	VSHA256_CTX sha256ctx;
	uintptr_t priv2;
	unsigned u;
	char *ptr;
	size_t sz;

	(void) chksum;	// unused; kept for call-site compatibility

	VSHA256_Init(&sha256ctx);
	fbo = fellow_busy_obj_alloc(fc, &fco, &priv2, 1234).r.ptr;
	CHECK_OBJ_NOTNULL(fbo, FELLOW_BUSY_MAGIC);

	// --- set up thread to iterate while we create the busy obj
	VSHA256_CTX thr_sha256;
	VSHA256_Init(&thr_sha256);

	struct test_iter_wait_s tiw[1];
	init_test_iter_wait(tiw);
	tiw->priv = &thr_sha256;
	tiw->func = iter_sha256;

	struct test_iter_thread_s iter_thr[1];
	INIT_OBJ(iter_thr, TEST_ITER_THREAD_MAGIC);
	iter_thr->fc = fc;
	iter_thr->fco = fco;
	iter_thr->priv = tiw;
	iter_thr->func = test_iter_f;

	AZ(pthread_create(&iter_thr->thr, NULL,
	    test_fellow_cache_obj_iter_thread_f, iter_thr));
	DBG("concurrent test_iter thread %p", (void *)iter_thr->thr);

	// enough iterations to force seglist growth (checked below)
	u = fco->seglist.fdsl->lsegs + 72 * 2;
	XXX_LIMIT_LDSEGS = 1;
	while (u-- > 0) {
		void *p;
		sz = 1234;
		AZ(fellow_busy_obj_getspace(fbo, &sz, (uint8_t **)&ptr).status);
		memset(ptr, 0, sz);
		p = ptr;
		(void) snprintf(ptr, sz, "We got %zu bytes at %p", sz, p);
		// getspace may return more; only commit 1234 bytes per round
		if (u > 0 && sz > 1234)
			sz = 1234;
		VSHA256_Update(&sha256ctx, ptr, sz);
		fellow_busy_obj_extend(fbo, sz);
		// mirror the extend to the concurrent reader
		test_iter_extend(tiw, sz);
		if (u & 1)
			usleep(100);
	}
	XXX_LIMIT_LDSEGS = 0;
	VSHA256_Final(h1, &sha256ctx);

	// test trim of a 0-sized busy fcs
	AZ(fellow_busy_obj_getspace(fbo, &sz, (uint8_t **)&ptr).status);
	fellow_busy_obj_trimstore(fbo);

	// let the reader drain and terminate
	tiw->done = 1;
	test_iter_extend(tiw, 0);

	// if this fails, u is too low
	AN(fco->seglist.next);

	// fixattr always return a pointer
	for (u = OA_VARY; u < OA__MAX; u++)
		AZ(fellow_cache_obj_getattr(fc, fco, u, &sz));

	for (u = 0; u < OA__MAX; u++)
		AN(fellow_busy_setattr(fbo, u, strlen(oatest[u]), oatest[u]));

	t_getattr(fc, fco);

	test_bocdone(fbo, TRUST_ME(hash), 1);
	priv2 = fco->fdb.fdb;

	// check thread status
	DBG("concurrent test_iter thread join %p", (void *)iter_thr->thr);
	AZ(pthread_join(iter_thr->thr, NULL));
	VSHA256_Final(h2, &thr_sha256);
	/*
	 * compare the full digests: the previous sizeof *h1 evaluated to
	 * sizeof(unsigned char) == 1 and compared only the first byte
	 */
	AZ(memcmp(h1, h2, sizeof h1));
	DBG("concurrent test_iter hash ok %02x%02x%02x%02x...",
	    h1[0], h1[1], h1[2], h1[3]);
	fini_test_iter_wait(tiw);

	/* dumb wait until writes are complete */
	while (FCO_REFCNT(fco) > 1)
		usleep(1000);

	//fellow_cache_obj_deref(fc, fco);
	fellow_cache_obj_delete(fc, fco, hash);
}
static void
t_cache(unsigned chksum)
{
......@@ -5620,6 +5877,7 @@ t_cache(unsigned chksum)
const size_t dsksz = 100 * 1024 * 1024;
const size_t objsize_hint = 1 * 1024 * 1024;
struct fellow_cache_res fcr;
char *ptr;
DBGSZ(fellow_disk_seg);
DBGSZ(fellow_disk_seglist);
......@@ -5666,7 +5924,6 @@ t_cache(unsigned chksum)
fbo = fellow_busy_obj_alloc(fc, &fco, &priv2, 1234).r.ptr;
CHECK_OBJ_NOTNULL(fbo, FELLOW_BUSY_MAGIC);
{
char *ptr;
sz = 1234;
AZ(fellow_busy_obj_getspace(fbo, &sz, (uint8_t **)&ptr).status);
}
......@@ -5677,7 +5934,6 @@ t_cache(unsigned chksum)
fbo = fellow_busy_obj_alloc(fc, &fco, &priv2, 1234).r.ptr;
CHECK_OBJ_NOTNULL(fbo, FELLOW_BUSY_MAGIC);
{
char *ptr;
sz = 1234;
AZ(fellow_busy_obj_getspace(fbo, &sz, (uint8_t **)&ptr).status);
}
......@@ -5689,8 +5945,6 @@ t_cache(unsigned chksum)
fbo = fellow_busy_obj_alloc(fc, &fco, &priv2, 1234).r.ptr;
CHECK_OBJ_NOTNULL(fbo, FELLOW_BUSY_MAGIC);
do {
char *ptr;
sz = 1234;
fcr = fellow_busy_obj_getspace(fbo, &sz, (uint8_t **)&ptr);
if (fcr.status == fcr_ok) {
......@@ -5709,8 +5963,6 @@ t_cache(unsigned chksum)
fbo = fellow_busy_obj_alloc(fc, &fco, &priv2, 1234).r.ptr;
CHECK_OBJ_NOTNULL(fbo, FELLOW_BUSY_MAGIC);
do {
char *ptr;
if (fbo->nregion < FCO_MAX_REGIONS - FCO_REGIONS_RESERVE)
FC_INJ_SZLIM_SET(4096);
......@@ -5730,6 +5982,9 @@ t_cache(unsigned chksum)
test_fellow_cache_unbusy_inject(fc);
for (u = 0; u < 100; u++)
t_busyobj(chksum, fc);
// === alloc, then evict
VSHA256_Init(&sha256ctx);
fbo = fellow_busy_obj_alloc(fc, &fco, &priv2, 1234).r.ptr;
......@@ -5737,7 +5992,6 @@ t_cache(unsigned chksum)
u = fco->seglist.fdsl->lsegs + 72 * 2;
while (u-- > 0) {
void *p;
char *ptr;
sz = 1234;
AZ(fellow_busy_obj_getspace(fbo, &sz, (uint8_t **)&ptr).status);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment