fellow_cache: Refactor fco error latching

parent db8b7ee3
...@@ -786,16 +786,6 @@ struct fellow_cache_obj { ...@@ -786,16 +786,6 @@ struct fellow_cache_obj {
#define FCR_ALLOCFAIL(str) FCR_ALLOCERR(FC_ERRSTR(str)) #define FCR_ALLOCFAIL(str) FCR_ALLOCERR(FC_ERRSTR(str))
#define FCR_IOFAIL(str) FCR_IOERR(FC_ERRSTR(str)) #define FCR_IOFAIL(str) FCR_IOERR(FC_ERRSTR(str))
static inline void
fco_latch_err(struct fellow_cache_obj *fco, struct fellow_cache_res fcr)
{
assert(fcr.status != fcr_ok);
if (fco->fcr.status != fcr_ok)
return;
fco->fcr = fcr;
}
const char * const fellow_cache_res_s[FCR_LIM] = { const char * const fellow_cache_res_s[FCR_LIM] = {
[fcr_ok] = "ok", [fcr_ok] = "ok",
[fcr_allocerr] = "allocation", [fcr_allocerr] = "allocation",
...@@ -1016,6 +1006,7 @@ fellow_cache_res_check(const struct fellow_cache *fc, ...@@ -1016,6 +1006,7 @@ fellow_cache_res_check(const struct fellow_cache *fc,
return; return;
break; break;
case fcr_ok: case fcr_ok:
return;
default: default:
FC_WRONG("fcr.status %d", fcr.status); FC_WRONG("fcr.status %d", fcr.status);
} }
...@@ -1024,6 +1015,31 @@ fellow_cache_res_check(const struct fellow_cache *fc, ...@@ -1024,6 +1015,31 @@ fellow_cache_res_check(const struct fellow_cache *fc,
fcr.status, fcr.r.err); fcr.status, fcr.r.err);
} }
// check with latch
static struct fellow_cache_res
fellow_cache_obj_res(const struct fellow_cache *fc,
struct fellow_cache_obj *fco, struct fellow_cache_res fcr)
{
CHECK_OBJ_NOTNULL(fc, FELLOW_CACHE_MAGIC);
CHECK_OBJ_NOTNULL(fco, FELLOW_CACHE_OBJ_MAGIC);
fellow_cache_res_check(fc, fcr);
if (fco->fcr.status != fcr_ok)
return (fco->fcr);
/*
* the latched ok value must be the fco, but we do return other values,
* depending on context
*/
assert(fco->fcr.status == fcr_ok);
if (fcr.status != fcr_ok)
fco->fcr = fcr;
else if (fco->fcr.r.ptr == NULL)
fco->fcr.r.ptr = fco;
else
assert(fco->fcr.r.ptr == fco);
return (fcr);
}
/* ============================================================ /* ============================================================
* fwd decl * fwd decl
...@@ -2133,7 +2149,7 @@ fellow_cache_seglists_load(struct worker *wrk, void *priv) ...@@ -2133,7 +2149,7 @@ fellow_cache_seglists_load(struct worker *wrk, void *priv)
struct fellow_lru_chgbatch lcb[1] = struct fellow_lru_chgbatch lcb[1] =
FELLOW_LRU_CHGBATCH_INIT(lcb, fco, 1); FELLOW_LRU_CHGBATCH_INIT(lcb, fco, 1);
struct fellow_cache_res res = FCR_OK(NULL); struct fellow_cache_res fcr = FCR_OK(fco);
// invalidates fclsp, no use after this point // invalidates fclsp, no use after this point
TAKE(mem, fclsp->mem); TAKE(mem, fclsp->mem);
...@@ -2169,26 +2185,26 @@ fellow_cache_seglists_load(struct worker *wrk, void *priv) ...@@ -2169,26 +2185,26 @@ fellow_cache_seglists_load(struct worker *wrk, void *priv)
next.size, 0); next.size, 0);
fdsl = fdsl_mem.ptr; fdsl = fdsl_mem.ptr;
if (FC_INJ || fdsl == NULL) { if (FC_INJ || fdsl == NULL) {
res = FCR_ALLOCFAIL("disk seglist fdsl"); fcr = FCR_ALLOCFAIL("disk seglist fdsl");
goto err; goto err;
} }
assert(fdsl_mem.size >= next.size); assert(fdsl_mem.size >= next.size);
ssz = fellow_io_pread_sync(fc->ffd, fdsl, next.size, next.off); ssz = fellow_io_pread_sync(fc->ffd, fdsl, next.size, next.off);
if (FC_INJ || ssz < 0 || (size_t)ssz != next.size) { if (FC_INJ || ssz < 0 || (size_t)ssz != next.size) {
res = FCR_IOFAIL("disk seglist read"); fcr = FCR_IOFAIL("disk seglist read");
goto err; goto err;
} }
err = fellow_disk_seglist_check(fdsl); err = fellow_disk_seglist_check(fdsl);
if (err != NULL) { if (err != NULL) {
res = FCR_IOFAIL(err); fcr = FCR_IOFAIL(err);
goto err; goto err;
} }
fcsl_mem = buddy_alloc1_ptr_extent_wait(fc->membuddy, FEP_META, fcsl_mem = buddy_alloc1_ptr_extent_wait(fc->membuddy, FEP_META,
SEGLIST_SIZE(fcsl, fdsl->nsegs), 0); SEGLIST_SIZE(fcsl, fdsl->nsegs), 0);
if (FC_INJ || fcsl_mem.ptr == NULL) { if (FC_INJ || fcsl_mem.ptr == NULL) {
res = FCR_ALLOCFAIL("disk seglist fcsl"); fcr = FCR_ALLOCFAIL("disk seglist fcsl");
goto err; goto err;
} }
...@@ -2237,7 +2253,7 @@ fellow_cache_seglists_load(struct worker *wrk, void *priv) ...@@ -2237,7 +2253,7 @@ fellow_cache_seglists_load(struct worker *wrk, void *priv)
fellow_cache_seg_transition_locked(lcb, FCO_FCS(fco), fellow_cache_seg_transition_locked(lcb, FCO_FCS(fco),
FCO_STATE(fco), FCO_READFAIL); FCO_STATE(fco), FCO_READFAIL);
} }
fco_latch_err(fco, res); (void) fellow_cache_obj_res(fc, fco, fcr);
refcnt = fellow_cache_obj_deref_locked(lcb, fc, fco); refcnt = fellow_cache_obj_deref_locked(lcb, fc, fco);
AZ(pthread_cond_broadcast(&fco->cond)); AZ(pthread_cond_broadcast(&fco->cond));
fellow_cache_lru_chgbatch_apply(lcb); fellow_cache_lru_chgbatch_apply(lcb);
...@@ -2348,11 +2364,10 @@ fellow_cache_obj_new( ...@@ -2348,11 +2364,10 @@ fellow_cache_obj_new(
{ {
struct fellow_disk_obj *fdo; struct fellow_disk_obj *fdo;
struct fellow_cache_obj *fco; struct fellow_cache_obj *fco;
struct fellow_cache_res fcr;
struct fellow_cache_seg *fcs; struct fellow_cache_seg *fcs;
struct buddy_reqs *reqs; struct buddy_reqs *reqs;
struct buddy_ptr_extent fco_mem = buddy_ptr_extent_nil; struct buddy_ptr_extent fco_mem;
struct buddy_ptr_extent fdo_mem = buddy_ptr_extent_nil; struct buddy_ptr_extent fdo_mem;
size_t mem_sz; size_t mem_sz;
unsigned u; unsigned u;
...@@ -2381,8 +2396,7 @@ fellow_cache_obj_new( ...@@ -2381,8 +2396,7 @@ fellow_cache_obj_new(
u = buddy_alloc_wait(reqs); u = buddy_alloc_wait(reqs);
if (FC_INJ || u != 2 + (dowry ? 1 : 0) + (fbo_mem ? 1 : 0)) { if (FC_INJ || u != 2 + (dowry ? 1 : 0) + (fbo_mem ? 1 : 0)) {
buddy_alloc_wait_done(reqs); buddy_alloc_wait_done(reqs);
fcr = FCR_ALLOCFAIL("fellow_cache_obj_new alloc failed"); return (FCR_ALLOCFAIL("fellow_cache_obj_new alloc failed"));
goto err;
} }
if (dowry != NULL) { if (dowry != NULL) {
...@@ -2420,14 +2434,7 @@ fellow_cache_obj_new( ...@@ -2420,14 +2434,7 @@ fellow_cache_obj_new(
fcs->alloc = fdo_mem; fcs->alloc = fdo_mem;
fcs->u.fco_fdo = fdo; fcs->u.fco_fdo = fdo;
return (fellow_cache_obj_res(fc, fco, FCR_OK(fco)));
return (FCR_OK(fco));
err:
AZ(fco_mem.ptr);
AZ(fdo_mem.ptr);
fellow_cache_res_check(fc, fcr);
return (fcr);
} }
static void static void
...@@ -4632,10 +4639,12 @@ fellow_cache_obj_iter(struct fellow_cache *fc, struct fellow_cache_obj *fco, ...@@ -4632,10 +4639,12 @@ fellow_cache_obj_iter(struct fellow_cache *fc, struct fellow_cache_obj *fco,
// stack usage // stack usage
assert(readahead <= 31); assert(readahead <= 31);
if (fco->fcr.status != fcr_ok)
return (fco->fcr);
// fcr_ok is also returned if func() != 0 // fcr_ok is also returned if func() != 0
fcr = fco->fcr; memset(&fcr, 0, sizeof fcr);
if (fcr.status != fcr_ok) fcr.status = fcr_ok;
return (fcr);
CHECK_OBJ_NOTNULL(fco, FELLOW_CACHE_OBJ_MAGIC); CHECK_OBJ_NOTNULL(fco, FELLOW_CACHE_OBJ_MAGIC);
...@@ -4784,13 +4793,10 @@ fellow_cache_obj_iter(struct fellow_cache *fc, struct fellow_cache_obj *fco, ...@@ -4784,13 +4793,10 @@ fellow_cache_obj_iter(struct fellow_cache *fc, struct fellow_cache_obj *fco,
} }
// if ok, wait for seglist loading to complete to see errors // if ok, wait for seglist loading to complete to see errors
if (fcr.status != fcr_ok) if (fcr.status == fcr_ok)
fco_latch_err(fco, fcr);
else
fellow_cache_seglists_wait(fco); fellow_cache_seglists_wait(fco);
// to properly return func() return value with fcr_ok return (fellow_cache_obj_res(fc, fco, fcr));
return (fco->fcr.status == fcr_ok ? fcr : fco->fcr);
} }
/* Auxiliary attributes /* Auxiliary attributes
...@@ -5749,7 +5755,8 @@ struct objcore **ocp, uintptr_t priv2, unsigned crit) ...@@ -5749,7 +5755,8 @@ struct objcore **ocp, uintptr_t priv2, unsigned crit)
break; break;
case FCO_READFAIL: case FCO_READFAIL:
fellow_cache_obj_deref(fc, fco); fellow_cache_obj_deref(fc, fco);
fco_latch_err(fco, FCR_IOFAIL("hit READFAIL object")); (void) fellow_cache_obj_res(fc, fco,
FCR_IOFAIL("hit READFAIL object"));
break; break;
case FCO_EVICT: case FCO_EVICT:
// race, retry // race, retry
...@@ -5764,6 +5771,7 @@ struct objcore **ocp, uintptr_t priv2, unsigned crit) ...@@ -5764,6 +5771,7 @@ struct objcore **ocp, uintptr_t priv2, unsigned crit)
TAKEZN(fco, nfco); TAKEZN(fco, nfco);
assert(fco->fdb.fdb == fdba.fdb); assert(fco->fdb.fdb == fdba.fdb);
fcr = FCR_OK(fco);
/* not using fellow_cache_seg_read_sync() /* not using fellow_cache_seg_read_sync()
* because our disk size is smaller than mem size in fcs * because our disk size is smaller than mem size in fcs
...@@ -5884,8 +5892,7 @@ struct objcore **ocp, uintptr_t priv2, unsigned crit) ...@@ -5884,8 +5892,7 @@ struct objcore **ocp, uintptr_t priv2, unsigned crit)
if (dowry.bits) if (dowry.bits)
AN(buddy_return_ptr_page(rets, &dowry)); AN(buddy_return_ptr_page(rets, &dowry));
buddy_return(rets); buddy_return(rets);
fco->fcr = FCR_OK(fco); return (fellow_cache_obj_res(fc, fco, FCR_OK(fco)));
return (fco->fcr);
err: err:
if (ocp) { if (ocp) {
AZ(*ocp); AZ(*ocp);
...@@ -5901,14 +5908,12 @@ struct objcore **ocp, uintptr_t priv2, unsigned crit) ...@@ -5901,14 +5908,12 @@ struct objcore **ocp, uintptr_t priv2, unsigned crit)
if (fcsl_mem.ptr) if (fcsl_mem.ptr)
AN(buddy_return_ptr_extent(rets, &fcsl_mem)); AN(buddy_return_ptr_extent(rets, &fcsl_mem));
buddy_return(rets);
if (strstr(err, "alloc")) if (strstr(err, "alloc"))
fcr = FCR_ALLOCERR(err); fcr = FCR_ALLOCERR(err);
else else
fcr = FCR_IOERR(err); fcr = FCR_IOERR(err);
fellow_cache_res_check(fc, fcr); return (fellow_cache_obj_res(fc, fco, fcr));
fco_latch_err(fco, fcr);
buddy_return(rets);
return (fco->fcr);
} }
static void static void
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment