fellow_cache: Refactor fco error latching

parent a2192e8d
@@ -789,16 +789,6 @@ struct fellow_cache_obj {
#define FCR_ALLOCFAIL(str) FCR_ALLOCERR(FC_ERRSTR(str))
#define FCR_IOFAIL(str) FCR_IOERR(FC_ERRSTR(str))
static inline void
fco_latch_err(struct fellow_cache_obj *fco, struct fellow_cache_res fcr)
{
assert(fcr.status != fcr_ok);
if (fco->fcr.status != fcr_ok)
return;
fco->fcr = fcr;
}
const char * const fellow_cache_res_s[FCR_LIM] = {
[fcr_ok] = "ok",
[fcr_allocerr] = "allocation",
@@ -1019,6 +1009,7 @@ fellow_cache_res_check(const struct fellow_cache *fc,
return;
break;
case fcr_ok:
return;
default:
FC_WRONG("fcr.status %d", fcr.status);
}
@@ -1027,6 +1018,31 @@ fellow_cache_res_check(const struct fellow_cache *fc,
fcr.status, fcr.r.err);
}
// check with latch
static struct fellow_cache_res
fellow_cache_obj_res(const struct fellow_cache *fc,
struct fellow_cache_obj *fco, struct fellow_cache_res fcr)
{
CHECK_OBJ_NOTNULL(fc, FELLOW_CACHE_MAGIC);
CHECK_OBJ_NOTNULL(fco, FELLOW_CACHE_OBJ_MAGIC);
fellow_cache_res_check(fc, fcr);
if (fco->fcr.status != fcr_ok)
return (fco->fcr);
/*
* the latched ok value must be the fco, but we do return other values,
* depending on context
*/
assert(fco->fcr.status == fcr_ok);
if (fcr.status != fcr_ok)
fco->fcr = fcr;
else if (fco->fcr.r.ptr == NULL)
fco->fcr.r.ptr = fco;
else
assert(fco->fcr.r.ptr == fco);
return (fcr);
}
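
/*
 * Illustration only, not part of this commit: a self-contained toy model
 * of the latch semantics implemented by fellow_cache_obj_res() above.
 * All toy_* names are hypothetical stand-ins, not the fellow types.
 *
 * Rules modeled:
 *  - a previously latched error always wins and is returned
 *  - the first non-ok result gets latched on the object
 *  - an ok result latches the object pointer itself
 */
#include <assert.h>
#include <stddef.h>
#include <stdio.h>

enum toy_status { TOY_OK = 0, TOY_ALLOCERR, TOY_IOERR };

struct toy_res {
	enum toy_status	 status;
	const void	*ptr;	/* object on ok, error detail otherwise */
};

struct toy_obj {
	struct toy_res	 latched;	/* plays the role of fco->fcr */
};

static struct toy_res
toy_obj_res(struct toy_obj *o, struct toy_res r)
{

	if (o->latched.status != TOY_OK)
		return (o->latched);	/* earlier error wins */
	if (r.status != TOY_OK)
		o->latched = r;		/* latch the first error */
	else if (o->latched.ptr == NULL)
		o->latched.ptr = o;	/* latch the object on ok */
	else
		assert(o->latched.ptr == o);
	return (r);
}

int
main(void)
{
	struct toy_obj o = { .latched = { TOY_OK, NULL } };
	struct toy_res r;

	r = toy_obj_res(&o, (struct toy_res){ TOY_OK, &o });
	assert(r.status == TOY_OK && o.latched.ptr == &o);

	r = toy_obj_res(&o, (struct toy_res){ TOY_IOERR, "read failed" });
	assert(r.status == TOY_IOERR);	/* error is latched */

	r = toy_obj_res(&o, (struct toy_res){ TOY_OK, &o });
	assert(r.status == TOY_IOERR);	/* latched error overrides later ok */

	printf("latched status: %d\n", (int)o.latched.status);
	return (0);
}
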
/* ============================================================
* fwd decl
@@ -2154,7 +2170,7 @@ fellow_cache_seglists_load(struct worker *wrk, void *priv)
struct fellow_lru_chgbatch lcb[1] =
FELLOW_LRU_CHGBATCH_INIT(lcb, fco, 1);
struct fellow_cache_res res = FCR_OK(NULL);
struct fellow_cache_res fcr = FCR_OK(fco);
// invalidates fclsp, no use after this point
TAKE(mem, fclsp->mem);
@@ -2190,26 +2206,26 @@ fellow_cache_seglists_load(struct worker *wrk, void *priv)
next.size, 0);
fdsl = fdsl_mem.ptr;
if (FC_INJ || fdsl == NULL) {
res = FCR_ALLOCFAIL("disk seglist fdsl");
fcr = FCR_ALLOCFAIL("disk seglist fdsl");
goto err;
}
assert(fdsl_mem.size >= next.size);
ssz = fellow_io_pread_sync(fc->ffd, fdsl, next.size, next.off);
if (FC_INJ || ssz < 0 || (size_t)ssz != next.size) {
res = FCR_IOFAIL("disk seglist read");
fcr = FCR_IOFAIL("disk seglist read");
goto err;
}
err = fellow_disk_seglist_check(fdsl);
if (err != NULL) {
res = FCR_IOFAIL(err);
fcr = FCR_IOFAIL(err);
goto err;
}
fcsl_mem = buddy_alloc1_ptr_extent_wait(fc->membuddy, FEP_META,
SEGLIST_SIZE(fcsl, fdsl->nsegs), 0);
if (FC_INJ || fcsl_mem.ptr == NULL) {
res = FCR_ALLOCFAIL("disk seglist fcsl");
fcr = FCR_ALLOCFAIL("disk seglist fcsl");
goto err;
}
@@ -2258,7 +2274,7 @@ fellow_cache_seglists_load(struct worker *wrk, void *priv)
fellow_cache_seg_transition_locked(lcb, FCO_FCS(fco),
FCO_STATE(fco), FCO_READFAIL);
}
fco_latch_err(fco, res);
(void) fellow_cache_obj_res(fc, fco, fcr);
refcnt = fellow_cache_obj_deref_locked(lcb, fc, fco);
AZ(pthread_cond_broadcast(&fco->cond));
fellow_cache_lru_chgbatch_apply(lcb);
@@ -2369,11 +2385,10 @@ fellow_cache_obj_new(
{
struct fellow_disk_obj *fdo;
struct fellow_cache_obj *fco;
struct fellow_cache_res fcr;
struct fellow_cache_seg *fcs;
struct buddy_reqs *reqs;
struct buddy_ptr_extent fco_mem = buddy_ptr_extent_nil;
struct buddy_ptr_extent fdo_mem = buddy_ptr_extent_nil;
struct buddy_ptr_extent fco_mem;
struct buddy_ptr_extent fdo_mem;
size_t mem_sz;
unsigned u;
@@ -2402,8 +2417,7 @@ fellow_cache_obj_new(
u = buddy_alloc_wait(reqs);
if (FC_INJ || u != 2 + (dowry ? 1 : 0) + (fbo_mem ? 1 : 0)) {
buddy_alloc_wait_done(reqs);
fcr = FCR_ALLOCFAIL("fellow_cache_obj_new alloc failed");
goto err;
return (FCR_ALLOCFAIL("fellow_cache_obj_new alloc failed"));
}
if (dowry != NULL) {
@@ -2441,14 +2455,7 @@ fellow_cache_obj_new(
fcs->alloc = fdo_mem;
fcs->u.fco_fdo = fdo;
return (FCR_OK(fco));
err:
AZ(fco_mem.ptr);
AZ(fdo_mem.ptr);
fellow_cache_res_check(fc, fcr);
return (fcr);
return (fellow_cache_obj_res(fc, fco, FCR_OK(fco)));
}
static void
@@ -4654,10 +4661,12 @@ fellow_cache_obj_iter(struct fellow_cache *fc, struct fellow_cache_obj *fco,
// stack usage
assert(readahead <= 31);
if (fco->fcr.status != fcr_ok)
return (fco->fcr);
// fcr_ok is also returned if func() != 0
fcr = fco->fcr;
if (fcr.status != fcr_ok)
return (fcr);
memset(&fcr, 0, sizeof fcr);
fcr.status = fcr_ok;
CHECK_OBJ_NOTNULL(fco, FELLOW_CACHE_OBJ_MAGIC);
@@ -4806,13 +4815,10 @@ fellow_cache_obj_iter(struct fellow_cache *fc, struct fellow_cache_obj *fco,
}
// if ok, wait for seglist loading to complete to see errors
if (fcr.status != fcr_ok)
fco_latch_err(fco, fcr);
else
if (fcr.status == fcr_ok)
fellow_cache_seglists_wait(fco);
// to properly return func() return value with fcr_ok
return (fco->fcr.status == fcr_ok ? fcr : fco->fcr);
return (fellow_cache_obj_res(fc, fco, fcr));
}
/* Auxiliary attributes
@@ -5771,7 +5777,8 @@ struct objcore **ocp, uintptr_t priv2, unsigned crit)
break;
case FCO_READFAIL:
fellow_cache_obj_deref(fc, fco);
fco_latch_err(fco, FCR_IOFAIL("hit READFAIL object"));
(void) fellow_cache_obj_res(fc, fco,
FCR_IOFAIL("hit READFAIL object"));
break;
case FCO_EVICT:
// race, retry
@@ -5786,6 +5793,7 @@ struct objcore **ocp, uintptr_t priv2, unsigned crit)
TAKEZN(fco, nfco);
assert(fco->fdb.fdb == fdba.fdb);
fcr = FCR_OK(fco);
/* not using fellow_cache_seg_read_sync()
* because our disk size is smaller than mem size in fcs
@@ -5907,8 +5915,7 @@ struct objcore **ocp, uintptr_t priv2, unsigned crit)
if (dowry.bits)
AN(buddy_return_ptr_page(rets, &dowry));
buddy_return(rets);
fco->fcr = FCR_OK(fco);
return (fco->fcr);
return (fellow_cache_obj_res(fc, fco, FCR_OK(fco)));
err:
if (ocp) {
AZ(*ocp);
@@ -5924,14 +5931,12 @@ struct objcore **ocp, uintptr_t priv2, unsigned crit)
if (fcsl_mem.ptr)
AN(buddy_return_ptr_extent(rets, &fcsl_mem));
buddy_return(rets);
if (strstr(err, "alloc"))
fcr = FCR_ALLOCERR(err);
else
fcr = FCR_IOERR(err);
fellow_cache_res_check(fc, fcr);
fco_latch_err(fco, fcr);
buddy_return(rets);
return (fco->fcr);
return (fellow_cache_obj_res(fc, fco, fcr));
}
static void