Batch LRU changes

The lru_mtx is our most contended mutex.

As a first improvement, batch LRU changes for multiple segments and
maintain the effective change locally, outside the lru_mtx (but while
still holding the obj mtx).
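
A minimal sketch of the intended usage pattern (identifiers as
introduced by this change; error handling and surrounding logic
elided):

    struct fellow_lru_chgbatch lcb[1] =
        FELLOW_LRU_CHGBATCH_INIT(lcb, fc, 1);

    AZ(pthread_mutex_lock(&fco->mtx));
    /* transitions and (de)refs record their LRU effect in the batch */
    fellow_cache_seg_transition_locked(lcb, fcs, from, to);
    /* a single apply then takes lru_mtx once for the whole batch */
    fellow_cache_lru_chgbatch_apply(lcb);
    AZ(pthread_mutex_unlock(&fco->mtx));

Bulk paths (fellow_cache_seg_deref(), fellow_cache_seg_ref_in()) use
larger batches (DEREF_BATCH / REF_BATCH, 64 entries) to amortize the
lock over many segments.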
parent e73e97da
@@ -179,13 +179,32 @@ assert_fcos_transition(enum fcos_state f, enum fcos_state t)
fcos_state_s[f], fcos_state_s[t]);
}
/* batch of changes to be applied on fcses */
struct fellow_lru_chgbatch {
unsigned magic;
#define FELLOW_LRU_CHGBATCH_MAGIC 0xaab452d9
uint16_t l, n;
struct fellow_cache *fc;
struct fellow_cache_lru_head add;
struct fellow_cache_seg **fcs;
};
#define FELLOW_LRU_CHGBATCH_INIT(name, fca, size) {{ \
.magic = FELLOW_LRU_CHGBATCH_MAGIC, \
.l = (size), \
.n = 0, \
.fc = (fca), \
.add = VTAILQ_HEAD_INITIALIZER((name)->add), \
.fcs = (struct fellow_cache_seg*[size + 1]){0} \
}}
static inline void
fellow_cache_seg_transition_locked_notincore(struct fellow_cache_seg *fcs,
enum fcos_state to);
static inline void
fellow_cache_seg_transition_locked(
struct fellow_cache *fc, struct fellow_cache_seg *fcs,
struct fellow_lru_chgbatch *lcb, struct fellow_cache_seg *fcs,
enum fcos_state from, enum fcos_state to);
struct fellow_disk_seg {
@@ -238,6 +257,8 @@ struct fellow_cache_seg {
enum fcos_state state;
unsigned fcs_onlru:1;
unsigned fco_infdb:1;
unsigned lcb_add:1;
unsigned lcb_remove:1;
/*
* for FCO, protected by fdb_mtx
@@ -668,6 +689,7 @@ fellow_cache_res_check(const struct fellow_cache *fc,
/* ============================================================
* fwd decl
*/
static void
fellow_cache_obj_fini(const struct fellow_cache_obj *fco);
static void
@@ -675,16 +697,14 @@ fellow_cache_obj_redundant(struct fellow_cache *fc,
struct fellow_cache_obj **fcop);
static inline unsigned
fellow_cache_seg_ref_locked(struct fellow_cache *fc,
fellow_cache_seg_ref_locked(struct fellow_lru_chgbatch *lcb,
struct fellow_cache_seg *fcs);
static inline unsigned
fellow_cache_seg_deref_locked(struct fellow_cache *fc,
fellow_cache_seg_deref_locked(struct fellow_lru_chgbatch *lcb,
struct fellow_cache_seg *fcs);
static unsigned
fellow_cache_obj_deref_locked(struct fellow_cache *fc,
struct fellow_cache_obj *fco);
static const char *
fellow_cache_seg_check(struct fellow_cache *fc, struct fellow_cache_seg *fcs);
fellow_cache_obj_deref_locked(struct fellow_lru_chgbatch *lcb,
struct fellow_cache *fc, struct fellow_cache_obj *fco);
static void
fellow_cache_async_write_complete(struct fellow_cache *fc,
void *fbio, int32_t result);
@@ -903,34 +923,120 @@ fellow_cache_shouldlru(enum fcos_state state, const struct objcore *oc,
return (refcnt == 0);
}
static void
fellow_cache_lru_chgbatch_apply(struct fellow_lru_chgbatch *lcb)
{
struct fellow_cache_seg *fcs;
struct fellow_cache *fc;
unsigned n;
CHECK_OBJ_NOTNULL(lcb, FELLOW_LRU_CHGBATCH_MAGIC);
//DBG("%u/%u", lcb->n, !VTAILQ_EMPTY(&lcb->add));
if (lcb->n == 0 && VTAILQ_EMPTY(&lcb->add))
return;
fc = lcb->fc;
CHECK_OBJ_NOTNULL(fc, FELLOW_CACHE_MAGIC);
n = lcb->n;
while (n--) {
fcs = lcb->fcs[n];
AZ(fcs->lcb_add);
AN(fcs->lcb_remove);
fcs->lcb_remove = 0;
}
VTAILQ_FOREACH(fcs, &lcb->add, lru_list) {
AZ(fcs->lcb_remove);
AN(fcs->lcb_add);
fcs->lcb_add = 0;
}
AZ(pthread_mutex_lock(&fc->lru_mtx));
while (lcb->n) {
lcb->n--;
TAKE_OBJ_NOTNULL(fcs, &lcb->fcs[lcb->n],
FELLOW_CACHE_SEG_MAGIC);
VTAILQ_REMOVE(&fc->lru_head, fcs, lru_list);
}
VTAILQ_CONCAT(&fc->lru_head, &lcb->add, lru_list);
AZ(pthread_mutex_unlock(&fc->lru_mtx));
AZ(lcb->n);
}
/* chg is fellow_cache_shouldlru(new) - fellow_cache_shouldlru(old)
*
* iow: 0 -> noop, 1 -> add, -1 -> remove
*
* to be called after the change
*
* the lcb can be null if the caller knows that always chg == 0
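*
* e.g.: a change after which fellow_cache_shouldlru() flips from 0 to 1
* gives chg == 1, so the segment is queued for addition (or a pending
* removal is cancelled); a flip from 1 to 0 gives chg == -1 and the
* segment is queued for removal (or a pending addition is cancelled)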
*/
static inline void
fellow_cache_lru_chg(struct fellow_cache *fc, struct fellow_cache_seg *fcs,
int chg)
fellow_cache_lru_chg(struct fellow_lru_chgbatch *lcb,
struct fellow_cache_seg *fcs, int chg)
{
uint16_t i;
if (chg == 0)
return;
int add = chg > 0;
CHECK_OBJ_NOTNULL(lcb, FELLOW_LRU_CHGBATCH_MAGIC);
AN(lcb->fcs);
unsigned add = chg > 0;
#ifdef EXTRA_ASSERTIONS
assert(add ==
fellow_cache_shouldlru(fcs->state, fcs->fco->oc, fcs->refcnt));
#endif
assert(fcs->fcs_onlru == (unsigned)!add);
fcs->fcs_onlru = (unsigned)add;
if (add) {
AZ(pthread_mutex_lock(&fc->lru_mtx));
VTAILQ_INSERT_TAIL(&fc->lru_head, fcs, lru_list);
AZ(pthread_mutex_unlock(&fc->lru_mtx));
} else {
AZ(pthread_mutex_lock(&fc->lru_mtx));
VTAILQ_REMOVE(&fc->lru_head, fcs, lru_list);
AZ(pthread_mutex_unlock(&fc->lru_mtx));
fcs->fcs_onlru = add;
if (add && fcs->lcb_remove) {
AZ(fcs->lcb_add);
//DBG("%p -rem", fcs);
// remove the remove
AN(lcb->n);
for (i = 0; i < lcb->n; i++) {
if (lcb->fcs[i] != fcs)
continue;
lcb->fcs[i] = NULL;
break;
}
assert(i < lcb->n);
if (i + 1 < lcb->n) {
memmove(&lcb->fcs[i], &lcb->fcs[i + 1],
sizeof lcb->fcs[0] * (lcb->n - (i + 1)));
}
lcb->n--;
fcs->lcb_remove = 0;
}
else if (add) {
//DBG("%p +add", fcs);
AZ(fcs->lcb_add);
AZ(fcs->lcb_remove);
fcs->lcb_add = 1;
VTAILQ_INSERT_TAIL(&lcb->add, fcs, lru_list);
}
else if (fcs->lcb_add) {
//DBG("%p -add", fcs);
AZ(fcs->lcb_remove);
VTAILQ_REMOVE(&lcb->add, fcs, lru_list);
fcs->lcb_add = 0;
}
else {
//DBG("%p +rem", fcs);
AZ(fcs->lcb_remove);
AZ(fcs->lcb_add);
if (lcb->n == lcb->l) {
fellow_cache_lru_chgbatch_apply(lcb);
AZ(lcb->n);
}
fcs->lcb_remove = 1;
lcb->fcs[lcb->n++] = fcs;
}
}
@@ -1015,7 +1121,7 @@ fellow_cache_seg_free(struct fellow_cache *fc,
DBG("%p deref %u", fcs, deref);
while (fcs->state == FCS_READING || fcs->state == FCS_WRITING) {
(void) fellow_cache_seg_ref_locked(fc, fcs);
(void) fellow_cache_seg_ref_locked(NULL, fcs);
fellow_cache_seg_wait_locked(fcs);
deref++;
}
@@ -1031,20 +1137,20 @@ fellow_cache_seg_free(struct fellow_cache *fc,
fcs->state == FCS_CHECK ||
fcs->state == FCS_READFAIL);
assert_cache_seg_consistency(fcs);
assert(fcs->refcnt == deref);
if (fcs->refcnt) {
fcs->refcnt = 1;
AZ(fellow_cache_seg_deref_locked(fc, fcs));
}
if (fcs->alloc.ptr) {
buddy_return1_ptr_extent(fc->membuddy, &fcs->alloc);
fcs->len = 0;
}
if (fcs->state == FCS_INCORE) {
fellow_cache_seg_transition_locked(fc, fcs,
struct fellow_lru_chgbatch lcb[1] =
FELLOW_LRU_CHGBATCH_INIT(lcb, fc, 1);
fellow_cache_seg_transition_locked(lcb, fcs,
FCS_INCORE, FCS_DISK);
assert_cache_seg_consistency(fcs);
fellow_cache_lru_chgbatch_apply(lcb);
}
if (fcs->alloc.ptr)
buddy_return1_ptr_extent(fc->membuddy, &fcs->alloc);
AZ(fcs->fcs_onlru);
assert(fcs->refcnt == deref);
fcs->refcnt = 0;
fcs->len = 0;
/*
* at this point, the fcs is not consistent in all cases, e.g. FCS_EVICT
* has no memory - but this is the point where it does no longer exist
@@ -1211,6 +1317,8 @@ fellow_obj_regions(const struct fellow_fd *ffd,
static void
fellow_cache_obj_free(struct fellow_cache *fc, struct fellow_cache_obj **fcop)
{
struct fellow_lru_chgbatch lcb[1] =
FELLOW_LRU_CHGBATCH_INIT(lcb, fc, 1);
struct fellow_cache_obj *fco;
struct fellow_cache_seg *fcs;
@@ -1219,8 +1327,9 @@ fellow_cache_obj_free(struct fellow_cache *fc, struct fellow_cache_obj **fcop)
fcs = FCO_FCS(fco);
AZ(FCO_REFCNT(fco));
assert_cache_seg_consistency(fcs);
fellow_cache_seg_transition_locked(fc, FCO_FCS(fco),
fellow_cache_seg_transition_locked(lcb, FCO_FCS(fco),
FCO_STATE(fco), FCO_EVICT);
fellow_cache_lru_chgbatch_apply(lcb);
DBG("fco %p", fco);
fellow_cache_seglist_free(fc, &fco->seglist);
@@ -2157,7 +2266,7 @@ fellow_cache_seg_transition_locked_notincore(struct fellow_cache_seg *fcs,
static inline void
fellow_cache_seg_transition_locked(
struct fellow_cache *fc, struct fellow_cache_seg *fcs,
struct fellow_lru_chgbatch *lcb, struct fellow_cache_seg *fcs,
enum fcos_state from, enum fcos_state to)
{
int o, n;
@@ -2169,7 +2278,7 @@ fellow_cache_seg_transition_locked(
assert(fcs->state == from);
fcs->state = to;
fellow_cache_lru_chg(fc, fcs, n - o);
fellow_cache_lru_chg(lcb, fcs, n - o);
}
static void
@@ -2178,6 +2287,8 @@ fellow_cache_seg_transition(
struct fellow_cache_seg *fcs,
enum fcos_state from, enum fcos_state to)
{
struct fellow_lru_chgbatch lcb[1] =
FELLOW_LRU_CHGBATCH_INIT(lcb, fc, 1);
struct fellow_cache_obj *fco;
CHECK_OBJ_NOTNULL(fcs, FELLOW_CACHE_SEG_MAGIC);
@@ -2188,7 +2299,8 @@
assert_fcos_transition(from, to);
AZ(pthread_mutex_lock(&fco->mtx));
fellow_cache_seg_transition_locked(fc, fcs, from, to);
fellow_cache_seg_transition_locked(lcb, fcs, from, to);
fellow_cache_lru_chgbatch_apply(lcb);
assert_cache_seg_consistency(fcs);
if (fcs->refcnt)
AZ(pthread_cond_broadcast(&fco->cond));
@@ -2197,7 +2309,7 @@
/* return old refcnt */
static inline unsigned
fellow_cache_seg_ref_locked(struct fellow_cache *fc,
fellow_cache_seg_ref_locked(struct fellow_lru_chgbatch *lcb,
struct fellow_cache_seg *fcs)
{
unsigned refcnt;
@@ -2208,13 +2320,13 @@ fellow_cache_seg_ref_locked(struct fellow_cache *fc,
o = fellow_cache_shouldlru(fcs->state, fcs->fco->oc, refcnt);
n = fellow_cache_shouldlru(fcs->state, fcs->fco->oc, fcs->refcnt);
fellow_cache_lru_chg(fc, fcs, n - o);
fellow_cache_lru_chg(lcb, fcs, n - o);
return (refcnt);
}
/* return new refcnt */
static inline unsigned
fellow_cache_seg_deref_locked(struct fellow_cache *fc,
fellow_cache_seg_deref_locked(struct fellow_lru_chgbatch *lcb,
struct fellow_cache_seg *fcs)
{
unsigned refcnt;
@@ -2226,7 +2338,7 @@ fellow_cache_seg_deref_locked(struct fellow_cache *fc,
o = fellow_cache_shouldlru(fcs->state, fcs->fco->oc, refcnt + 1);
n = fellow_cache_shouldlru(fcs->state, fcs->fco->oc, refcnt);
fellow_cache_lru_chg(fc, fcs, n - o);
fellow_cache_lru_chg(lcb, fcs, n - o);
return (refcnt);
}
@@ -2246,10 +2358,14 @@ fellow_cache_seg_unbusy(struct fellow_busy *fbo, struct fellow_cache_seg *fcs){
AN(fcs->len);
assert_fcos_transition(from, to);
struct fellow_lru_chgbatch lcb[1] =
FELLOW_LRU_CHGBATCH_INIT(lcb, fbo->fc, 1);
AZ(pthread_mutex_lock(&fco->mtx));
fbo->io_outstanding++;
fbio = fellow_busy_io_get(fbo, fbiostk);
fellow_cache_seg_transition_locked(fbo->fc, fcs, from, to);
fellow_cache_seg_transition_locked(lcb, fcs, from, to);
fellow_cache_lru_chgbatch_apply(lcb);
assert_cache_seg_consistency(fcs);
#ifdef WAIT_FOR_FCS_WRITING
// we should never have a wait for FCS_WRITING
@@ -2284,6 +2400,9 @@ fellow_cache_obj_unbusy(struct fellow_busy *fbo, enum fcol_state wantlog)
fco = fbo->fco;
CHECK_OBJ_NOTNULL(fco, FELLOW_CACHE_OBJ_MAGIC);
struct fellow_lru_chgbatch lcb[1] =
FELLOW_LRU_CHGBATCH_INIT(lcb, fc, 1);
fcs = FCO_FCS(fco);
assert_fcos_transition(from, to);
@@ -2307,7 +2426,8 @@ fellow_cache_obj_unbusy(struct fellow_busy *fbo, enum fcol_state wantlog)
* fbo->io_outstanding++;
*/
fbio = fellow_busy_io_get(fbo, fbiostk);
fellow_cache_seg_transition_locked(fc, fcs, from, to);
fellow_cache_seg_transition_locked(lcb, fcs, from, to);
fellow_cache_lru_chgbatch_apply(lcb);
assert_cache_seg_consistency(fcs);
#ifdef WAIT_FOR_FCS_WRITING
// we should never have a wait for FCS_WRITING
@@ -2335,6 +2455,8 @@ fellow_cache_obj_unbusy(struct fellow_busy *fbo, enum fcol_state wantlog)
static void
fellow_cache_read_complete(struct fellow_cache *fc, void *ptr, int32_t result)
{
struct fellow_lru_chgbatch lcb[1] =
FELLOW_LRU_CHGBATCH_INIT(lcb, fc, 1);
enum fcos_state fcos_next = (typeof(fcos_next))FCOS_CHECK;
struct fellow_cache_seg *fcs;
struct fellow_cache_obj *fco;
@@ -2364,11 +2486,12 @@ fellow_cache_read_complete(struct fellow_cache *fc, void *ptr, int32_t result)
//lint --e{456} Two execution paths are being combined...
//lint --e{454} A thread mutex has been locked...
AZ(pthread_mutex_lock(&fco->mtx));
fellow_cache_seg_transition_locked(fc, fcs, fcs->state, fcos_next);
fellow_cache_seg_transition_locked(lcb, fcs, fcs->state, fcos_next);
// io holds a ref on the seg and the fco
if (fellow_cache_seg_deref_locked(fc, fcs))
if (fellow_cache_seg_deref_locked(lcb, fcs))
AZ(pthread_cond_broadcast(&fco->cond));
refcount = fellow_cache_obj_deref_locked(fc, fco);
refcount = fellow_cache_obj_deref_locked(lcb, fc, fco);
fellow_cache_lru_chgbatch_apply(lcb);
if (refcount == 0)
fellow_cache_obj_free(fc, &fco);
else
@@ -2380,6 +2503,9 @@ static void
fellow_cache_async_write_complete(struct fellow_cache *fc,
void *ptr, int32_t result)
{
// can potentially change the object and a segment
struct fellow_lru_chgbatch lcb[1] =
FELLOW_LRU_CHGBATCH_INIT(lcb, fc, 2);
struct fellow_busy_io *fbio;
struct fellow_busy *fbo;
struct fellow_cache_seg *fcs = NULL;
@@ -2460,9 +2586,9 @@ fellow_cache_async_write_complete(struct fellow_cache *fc,
if (FCOS_HIGH(fcs->state) == FCS_HIGH) {
/* transition a segment */
assert(fcos_next != FCOS_INVAL);
fellow_cache_seg_transition_locked(fc, fcs,
fellow_cache_seg_transition_locked(lcb, fcs,
fcs->state, fcos_next);
if (fellow_cache_seg_deref_locked(fc, fcs))
if (fellow_cache_seg_deref_locked(lcb, fcs))
AZ(pthread_cond_broadcast(&fco->cond));
}
}
@@ -2501,9 +2627,10 @@ fellow_cache_async_write_complete(struct fellow_cache *fc,
fcos_next = FCO_INCORE;
/* transition the FCO */
fellow_cache_seg_transition_locked(fc, fcs,
fellow_cache_seg_transition_locked(lcb, fcs,
fcs->state, fcos_next);
refcount = fellow_cache_obj_deref_locked(fc, fco);
refcount = fellow_cache_obj_deref_locked(lcb, fc, fco);
fellow_cache_lru_chgbatch_apply(lcb);
if (refcount == 0) {
fellow_cache_obj_free(fc, &fco);
}
@@ -2512,6 +2639,7 @@ fellow_cache_async_write_complete(struct fellow_cache *fc,
AZ(pthread_mutex_unlock(&fco->mtx));
}
} else {
fellow_cache_lru_chgbatch_apply(lcb);
AZ(pthread_mutex_unlock(&fco->mtx));
}
@@ -2672,6 +2800,8 @@ static inline void
fellow_cache_seg_evict_locked(
struct fellow_cache_seg *fcs, struct buddy_ptr_extent *alloc)
{
AZ(fcs->lcb_add);
AZ(fcs->lcb_remove);
AZ(fcs->refcnt);
AZ(fcs->fcs_onlru);
assert(fcs->state == FCS_INCORE);
@@ -2701,6 +2831,7 @@ fellow_cache_obj_slim(struct fellow_cache *fc, struct fellow_cache_obj *fco)
struct buddy_ptr_extent alloc;
struct fellow_cache_seg *fcs;
struct fcscursor c;
unsigned ref;
fcsc_init(&c, &fco->seglist);
@@ -2719,19 +2850,37 @@ fellow_cache_obj_slim(struct fellow_cache *fc, struct fellow_cache_obj *fco)
while ((fcs = fcsc_next(&c)) != NULL) {
if (fcs->alloc.ptr == NULL)
break;
ref = 0;
while (fcs->state == FCS_READING || fcs->state == FCS_WRITING) {
AZ(pthread_mutex_unlock(&fc->lru_mtx));
(void) fellow_cache_seg_ref_locked(fc, fcs);
// NULL: can not be on lru
(void) fellow_cache_seg_ref_locked(NULL, fcs);
ref = 1;
fellow_cache_seg_wait_locked(fcs);
AZ(fellow_cache_seg_deref_locked(fc, fcs));
AZ(pthread_mutex_lock(&fc->lru_mtx));
}
if (fcs->refcnt || fcs->state != FCS_INCORE)
if (fcs->state == FCS_INCORE && ref == 1 && fcs->refcnt == ref) {
/* we hold the only ref, can not be on LRU,
* so no need to go through full deref
*/
fcs->refcnt = 0;
fellow_cache_seg_evict_locked(fcs, &alloc);
AN(buddy_return_ptr_extent(rets, &alloc));
continue;
}
if (fcs->state == FCS_INCORE && fcs->refcnt == 0) {
AZ(ref);
// remove from LRU
fellow_cache_lru_seg_evict_locked(fcs, &alloc, fc);
AN(buddy_return_ptr_extent(rets, &alloc));
continue;
fellow_cache_lru_seg_evict_locked(fcs, &alloc, fc);
AN(buddy_return_ptr_extent(rets, &alloc));
}
AZ(fcs->fcs_onlru);
if (ref)
(void) fellow_cache_seg_deref_locked(NULL, fcs);
}
AZ(pthread_mutex_unlock(&fc->lru_mtx));
@@ -3039,6 +3188,9 @@ static void
fellow_cache_seg_deref(struct fellow_cache *fc,
struct fellow_cache_seg * const *segs, unsigned n)
{
#define DEREF_BATCH 64
struct fellow_lru_chgbatch lcb[1] =
FELLOW_LRU_CHGBATCH_INIT(lcb, fc, DEREF_BATCH);
struct fellow_cache_seg *fcs;
struct fellow_cache_obj *fco;
@@ -3051,13 +3203,17 @@ fellow_cache_seg_deref(struct fellow_cache *fc,
while (n--) {
fcs = *segs++;
CHECK_OBJ_NOTNULL(fcs, FELLOW_CACHE_SEG_MAGIC);
(void) fellow_cache_seg_deref_locked(fc, fcs);
(void) fellow_cache_seg_deref_locked(lcb, fcs);
if (lcb->n == DEREF_BATCH)
fellow_cache_lru_chgbatch_apply(lcb);
}
fellow_cache_lru_chgbatch_apply(lcb);
AZ(pthread_mutex_unlock(&fco->mtx));
#undef DEREF_BATCH
}
static const char *
fellow_cache_seg_check(struct fellow_cache *fc, struct fellow_cache_seg *fcs)
fellow_cache_seg_check(struct fellow_cache_seg *fcs)
{
const struct fellow_disk_seg *fds;
struct fellow_cache_obj *fco;
@@ -3089,9 +3245,10 @@ fellow_cache_seg_check(struct fellow_cache *fc, struct fellow_cache_seg *fcs)
fco = fcs->fco;
AZ(pthread_mutex_lock(&fco->mtx));
AN(fcs->refcnt);
if (fcs->state == FCS_CHECK) {
// re-check, could have raced
fellow_cache_seg_transition_locked(fc, fcs,
fellow_cache_seg_transition_locked(NULL, fcs,
FCS_CHECK, to);
AZ(pthread_cond_broadcast(&fco->cond));
}
@@ -3113,6 +3270,9 @@ static void
fellow_cache_seg_ref_in(struct fellow_cache *fc, enum fellow_cache_io_e type,
struct fellow_cache_seg * const *segs, const unsigned n)
{
#define REF_BATCH 64
struct fellow_lru_chgbatch lcb[1] =
FELLOW_LRU_CHGBATCH_INIT(lcb, fc, REF_BATCH);
struct fellow_cache_seg *fcs, *iosegs[n], *racesegs[n];
struct fellow_cache_obj *fco;
unsigned u, ion = 0, racen = 0;
@@ -3188,11 +3348,15 @@ fellow_cache_seg_ref_in(struct fellow_cache *fc, enum fellow_cache_io_e type,
for (u = 0; u < n; u++) {
fcs = segs[u];
(void) fellow_cache_seg_ref_locked(fc, fcs);
(void) fellow_cache_seg_ref_locked(lcb, fcs);
if (lcb->n == REF_BATCH)
fellow_cache_lru_chgbatch_apply(lcb);
while (type == FCIO_SYNC &&
(fcs->state == FCS_BUSY || fcs->state == FCS_READING))
(fcs->state == FCS_BUSY || fcs->state == FCS_READING)) {
fellow_cache_lru_chgbatch_apply(lcb);
fellow_cache_seg_wait_locked(fcs);
}
switch (fcs->state) {
case FCS_BUSY:
@@ -3213,13 +3377,13 @@ fellow_cache_seg_ref_in(struct fellow_cache *fc, enum fellow_cache_io_e type,
* want to malloc under the fco mtx, so we
* restart later
*/
(void) fellow_cache_seg_deref_locked(fc, fcs);
(void) fellow_cache_seg_deref_locked(NULL, fcs);
racesegs[racen++] = fcs;
break;
}
// reference for io
(void) fellow_cache_seg_ref_locked(fc, fcs);
(void) fellow_cache_seg_ref_locked(NULL, fcs);
fcs->alloc = mem[u];
mem[u] = buddy_ptr_extent_nil;
@@ -3239,6 +3403,7 @@ fellow_cache_seg_ref_in(struct fellow_cache *fc, enum fellow_cache_io_e type,
*/
AN(FCO_REFCNT(fco));
FCO_REFCNT(fco) += ion;
fellow_cache_lru_chgbatch_apply(lcb);
AZ(pthread_mutex_unlock(&fco->mtx));
/* free unused allocations */
@@ -3264,6 +3429,7 @@ fellow_cache_seg_ref_in(struct fellow_cache *fc, enum fellow_cache_io_e type,
/* retry any raced */
if (racen > 0)
fellow_cache_seg_ref_in(fc, type, racesegs, racen);
#undef REF_BATCH
}
/*
@@ -3359,7 +3525,7 @@ fellow_cache_obj_iter(struct fellow_cache *fc, struct fellow_cache_obj *fco,
if (fcs->state == FCS_USABLE)
break;
err = fellow_cache_seg_check(fc, fcs);
err = fellow_cache_seg_check(fcs);
if (err != NULL) {
fcr = FCR_IOFAIL(err);
break;
@@ -3407,8 +3573,10 @@ fellow_cache_obj_iter(struct fellow_cache *fc, struct fellow_cache_obj *fco,
AZ(pthread_mutex_lock(&fcs->fco->mtx));
if (fcs->refcnt == 1)
fellow_cache_seg_free(fc, fcs, 1);
else
AN(fellow_cache_seg_deref_locked(fc, fcs));
else {
// NULL: refcount must be > 1, so cant be on LRU
AN(fellow_cache_seg_deref_locked(NULL, fcs));
}
AZ(pthread_mutex_unlock(&fcs->fco->mtx));
} else if (flush) {
assert(derefn < mod);
@@ -4099,8 +4267,8 @@ fellow_cache_obj_redundant(struct fellow_cache *fc,
* if returns 0, fellow_cache_obj_free() must be called under the lock
*/
static unsigned
fellow_cache_obj_deref_locked(struct fellow_cache *fc,
struct fellow_cache_obj *fco)
fellow_cache_obj_deref_locked(struct fellow_lru_chgbatch *lcb,
struct fellow_cache *fc, struct fellow_cache_obj *fco)
{
struct fellow_cache_seg *fcs;
unsigned last, refcnt;
@@ -4114,7 +4282,7 @@ fellow_cache_obj_deref_locked(struct fellow_cache *fc,
assert_cache_seg_consistency(fcs);
last = fcs->refcnt == 1 && fcs->fco_infdb;
refcnt = fellow_cache_seg_deref_locked(fc, fcs);
refcnt = fellow_cache_seg_deref_locked(lcb, fcs);
if (last) {
fcs->fco_infdb = 0;
AZ(refcnt);
@@ -4129,13 +4297,16 @@ fellow_cache_obj_deref_locked(struct fellow_cache *fc,
void
fellow_cache_obj_deref(struct fellow_cache *fc, struct fellow_cache_obj *fco)
{
struct fellow_lru_chgbatch lcb[1] =
FELLOW_LRU_CHGBATCH_INIT(lcb, fc, 1);
unsigned refcount;
CHECK_OBJ_NOTNULL(fc, FELLOW_CACHE_MAGIC);
CHECK_OBJ_NOTNULL(fco, FELLOW_CACHE_OBJ_MAGIC);
AZ(pthread_mutex_lock(&fco->mtx));
refcount = fellow_cache_obj_deref_locked(fc, fco);
refcount = fellow_cache_obj_deref_locked(lcb, fc, fco);
fellow_cache_lru_chgbatch_apply(lcb);
DBG("fco %p refcount %u", fco, refcount);
//lint --e{456} Two execution paths are being combined...
@@ -4156,6 +4327,8 @@ struct fellow_cache_res
fellow_cache_obj_get(struct fellow_cache *fc,
struct objcore **ocp, uintptr_t priv2, unsigned crit)
{
struct fellow_lru_chgbatch lcb[1] =
FELLOW_LRU_CHGBATCH_INIT(lcb, fc, 1);
struct fellow_cache_res fcr;
struct fellow_cache_obj *fco, *nfco;
struct fellow_cache_seg *fcs;
@@ -4287,8 +4460,8 @@ struct objcore **ocp, uintptr_t priv2, unsigned crit)
assert_cache_seg_consistency(fcs);
AN(fcs->fco_infdb);
fcs->disk_seg = &fdo->fdo_fds;
fellow_cache_seg_transition_locked(fc, fcs,
fcs->state, FCO_INCORE);
fellow_cache_seg_transition_locked(lcb, fcs, fcs->state, FCO_INCORE);
fellow_cache_lru_chgbatch_apply(lcb);
if (fcs->refcnt > 1)
AZ(pthread_cond_broadcast(&fco->cond));
assert_cache_seg_consistency(fcs);
@@ -4351,6 +4524,8 @@ void
fellow_cache_obj_delete(struct fellow_cache *fc,
struct fellow_cache_obj *fco, const uint8_t hash[DIGEST_LEN])
{
struct fellow_lru_chgbatch lcb[1] =
FELLOW_LRU_CHGBATCH_INIT(lcb, fc, 1);
const struct fellow_disk_obj *fdo;
/* one additional region for the object itself */
struct buddy_off_extent region[FCO_MAX_REGIONS + 1] = {{0}};
@@ -4398,12 +4573,10 @@ fellow_cache_obj_delete(struct fellow_cache *fc,
//lint --e{456} Two execution paths are being combined...
//lint --e{454} A thread mutex has been locked...
u = fellow_cache_obj_deref_locked(fc, fco);
DBG("fco %p refcount %u", fco, u);
if (u == 0)
fellow_cache_obj_free(fc, &fco);
else
AZ(pthread_mutex_unlock(&fco->mtx));
// must have been the last reference
AZ(fellow_cache_obj_deref_locked(lcb, fc, fco));
fellow_cache_lru_chgbatch_apply(lcb);
fellow_cache_obj_free(fc, &fco);
/* UNLOCKED because these states are all final */
switch (logstate) {
@@ -4480,7 +4653,7 @@ fellow_cache_obj_getattr(struct fellow_cache *fc,
if (fcs->disk_seg->seg.size == 0) \
return (NULL); \
fellow_cache_seg_ref_in(fc, FCIO_SYNC, &fcs, 1); \
XXXAZ(fellow_cache_seg_check(fc, fcs)); \
XXXAZ(fellow_cache_seg_check(fcs)); \
*len = fcs->len; \
return (fcs->alloc.ptr); \
}
@@ -5026,7 +5199,76 @@ void fellow_busy_obj_trimstore(struct fellow_busy *fbo);
#define DBGSZ(x) \
DBG(#x "\t%3zu", sizeof(struct x))
void
static unsigned
lru_find(struct fellow_cache *fc, struct fellow_cache_seg *fcs)
{
struct fellow_cache_seg *needle;
VTAILQ_FOREACH(needle, &fc->lru_head, lru_list)
if (needle == fcs)
return (1);
return (0);
}
#define LCBMAX 16
static void
t_1lcb(struct fellow_cache *fc, struct fellow_cache_seg *fcs,
uint8_t n, uint8_t i, uint8_t j, uint8_t k)
{
uint8_t u,v;
struct fellow_lru_chgbatch lcb[1] =
FELLOW_LRU_CHGBATCH_INIT(lcb, fc, LCBMAX);
assert(n <= LCBMAX);
for (u = 1; u < n + 2; u++) { // length of remove
AZ(lcb->n);
lcb->l = u;
#define chg(from, to) \
DBG("%x->%x", from, to); \
for (v = 0; v < n; v++) { \
fellow_cache_lru_chg(lcb, &fcs[v], \
(int)(!!(to & 1<<v)) - (int)(!!(from & 1<<v))); \
}
#define apply(pos, to) \
if (k & (1<<pos)) { \
fellow_cache_lru_chgbatch_apply(lcb); \
for (v = 0; v < n; v++) { \
assert(!!(to & 1<<v) == \
lru_find(fc, &fcs[v])); \
} \
}
chg(0, i);
apply(0, i);
chg(i, j);
apply(1, j);
chg(j, k);
apply(2, k);
chg(k, i);
apply(3, i);
chg(i, 0);
fellow_cache_lru_chgbatch_apply(lcb);
}
}
static void
t_lcb(struct fellow_cache *fc)
{
const uint8_t nfcs = 4;
uint8_t i,j,k;
struct fellow_cache_seg fcs[nfcs];
for (i = 0; i < nfcs; i++)
INIT_OBJ(&fcs[i], FELLOW_CACHE_SEG_MAGIC);
for (i = 0; i < 1<<nfcs; i++)
for (j = 0; j < 1<<nfcs; j++)
for (k = 0; k < 1<<nfcs; k++)
t_1lcb(fc, fcs, nfcs, i, j, k);
DBG("done %s","---");
}
static void
t_cache(unsigned chksum)
{
size_t sz;
@@ -5076,6 +5318,8 @@ t_cache(unsigned chksum)
fc = fellow_cache_init(ffd, membuddy, tune, fellow_simple_task_run);
t_lcb(fc);
// === empty obj
fbo = fellow_busy_obj_alloc(fc, &fco, &priv2, 1234).r.ptr;
CHECK_OBJ_NOTNULL(fbo, FELLOW_BUSY_MAGIC);
......