Refactor LRU as a separate object

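Move all LRU state (the list head, its mutex and the LRU thread handle) out of
struct fellow_cache into a new struct fellow_cache_lru with its own
constructor and destructor. Each fellow_cache_obj now carries a pointer to its
LRU (fco->lru), so LRU change batches are anchored at the object rather than
at the cache, and the LRU thread is joined by fellow_cache_lru_fini() instead
of being handed to fellow_cache_fini().

Signature changes:

fellow_cache_fini(&fc, &lru_thread)    -> fellow_cache_fini(&fc)
fellow_cache_obj_lru_touch(fc, fco)    -> fellow_cache_obj_lru_touch(fco)
fellow_cache_obj_evict_mutate(fc, fco) -> fellow_cache_obj_evict_mutate(lru, fco)
stvfe_mutate(wrk, oc)                  -> stvfe_mutate(wrk, lru, oc)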
parent d1a3e476
@@ -62,6 +62,50 @@ VRBT_HEAD(fellow_cache_fdb_head, fellow_cache_obj);
struct fellow_cache_seg;
VTAILQ_HEAD(fellow_cache_lru_head, fellow_cache_seg);
/* ============================================================
* LRU
*/
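/*
 * All LRU state (list head, mutex, thread handle) now lives here
 * instead of in struct fellow_cache. Each fellow_cache_obj keeps a
 * pointer to its lru (fco->lru), so per-object LRU operations no
 * longer need the fellow_cache.
 */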
struct fellow_cache_lru {
unsigned magic;
#define FELLOW_CACHE_LRU_MAGIC 0x5fd80809
struct fellow_cache *fc;
pthread_mutex_t lru_mtx;
struct fellow_cache_lru_head lru_head;
pthread_t lru_thread;
};
static struct fellow_cache_lru *
fellow_cache_lru_new(struct fellow_cache *fc)
{
struct fellow_cache_lru *lru;
AN(fc);
ALLOC_OBJ(lru, FELLOW_CACHE_LRU_MAGIC);
AN(lru);
lru->fc = fc;
AZ(pthread_mutex_init(&lru->lru_mtx, NULL));
VTAILQ_INIT(&lru->lru_head);
return (lru);
}
static void
fellow_cache_lru_fini(struct fellow_cache_lru **lrup)
{
struct fellow_cache_lru *lru;
void *r;
TAKE_OBJ_NOTNULL(lru, lrup, FELLOW_CACHE_LRU_MAGIC);
if (lru->lru_thread != 0) {
AZ(pthread_join(lru->lru_thread, &r));
AZ(r);
}
assert(VTAILQ_EMPTY(&lru->lru_head));
AZ(pthread_mutex_destroy(&lru->lru_mtx));
FREE_OBJ(lru);
}
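/*
 * Lifecycle sketch, as wired up in fellow_cache_init()/_fini() and
 * fellow_cache_lru_thread() below:
 *
 *	fc->lru = fellow_cache_lru_new(fc);
 *	// the LRU thread registers itself once started:
 *	AZ(fc->lru->lru_thread);
 *	fc->lru->lru_thread = pthread_self();
 *	...
 *	fellow_cache_lru_fini(&fc->lru);	// joins lru_thread
 *	AZ(fc->lru);
 */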
/*
* lifetime of a fellow_obj in cache
*
@@ -186,16 +230,16 @@ struct fellow_lru_chgbatch {
unsigned magic;
#define FELLOW_LRU_CHGBATCH_MAGIC 0xaab452d9
uint16_t l, n;
struct fellow_cache *fc;
struct fellow_cache_obj *fco;
struct fellow_cache_lru_head add;
struct fellow_cache_seg **fcs;
};
#define FELLOW_LRU_CHGBATCH_INIT(name, fca, size) {{ \
#define FELLOW_LRU_CHGBATCH_INIT(name, fcoa, size) {{ \
.magic = FELLOW_LRU_CHGBATCH_MAGIC, \
.l = (size), \
.n = 0, \
.fc = (fca), \
.fco = (fcoa), \
.add = VTAILQ_HEAD_INITIALIZER((name)->add), \
.fcs = (struct fellow_cache_seg*[size + 1]){0} \
}}
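/*
 * A change batch is now anchored at a single object (fcoa) rather
 * than at the cache: every segment queued in one batch must belong
 * to that fco, which fellow_cache_lru_chgbatch_apply() asserts, and
 * the lru mutex to take is found via fco->lru.
 */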
@@ -519,6 +563,8 @@ struct fellow_cache_obj {
// protected by fco mtx (in fcs)
enum fcol_state logstate;
struct fellow_cache_lru *lru;
struct buddy_ptr_page fco_mem; // nil == embedded
pthread_mutex_t mtx;
@@ -646,8 +692,7 @@ struct fellow_cache {
pthread_mutex_t fdb_mtx;
struct fellow_cache_fdb_head fdb_head;
pthread_mutex_t lru_mtx;
struct fellow_cache_lru_head lru_head;
struct fellow_cache_lru *lru;
pthread_mutex_t async_mtx;
pthread_cond_t async_cond;
@@ -696,7 +741,7 @@ fellow_cache_res_check(const struct fellow_cache *fc,
static void
fellow_cache_obj_fini(const struct fellow_cache_obj *fco);
static void
fellow_cache_obj_redundant(struct fellow_cache *fc,
fellow_cache_obj_redundant(const struct fellow_cache *fc,
struct fellow_cache_obj **fcop);
static inline unsigned
@@ -947,8 +992,9 @@ fellow_cache_shouldlru(enum fcos_state state, const struct objcore *oc,
static void
fellow_cache_lru_chgbatch_apply(struct fellow_lru_chgbatch *lcb)
{
struct fellow_cache_obj *fco;
struct fellow_cache_seg *fcs;
struct fellow_cache *fc;
struct fellow_cache_lru *lru;
unsigned n;
CHECK_OBJ_NOTNULL(lcb, FELLOW_LRU_CHGBATCH_MAGIC);
@@ -958,8 +1004,10 @@ fellow_cache_lru_chgbatch_apply(struct fellow_lru_chgbatch *lcb)
if (lcb->n == 0 && VTAILQ_EMPTY(&lcb->add))
return;
fc = lcb->fc;
CHECK_OBJ_NOTNULL(fc, FELLOW_CACHE_MAGIC);
fco = lcb->fco;
CHECK_OBJ_NOTNULL(fco, FELLOW_CACHE_OBJ_MAGIC);
lru = fco->lru;
CHECK_OBJ_NOTNULL(lru, FELLOW_CACHE_LRU_MAGIC);
n = lcb->n;
while (n--) {
@@ -974,15 +1022,16 @@ fellow_cache_lru_chgbatch_apply(struct fellow_lru_chgbatch *lcb)
fcs->lcb_add = 0;
}
AZ(pthread_mutex_lock(&fc->lru_mtx));
AZ(pthread_mutex_lock(&lru->lru_mtx));
while (lcb->n) {
lcb->n--;
TAKE_OBJ_NOTNULL(fcs, &lcb->fcs[lcb->n],
FELLOW_CACHE_SEG_MAGIC);
VTAILQ_REMOVE(&fc->lru_head, fcs, lru_list);
assert(fcs->fco == fco);
VTAILQ_REMOVE(&lru->lru_head, fcs, lru_list);
}
VTAILQ_CONCAT(&fc->lru_head, &lcb->add, lru_list);
AZ(pthread_mutex_unlock(&fc->lru_mtx));
VTAILQ_CONCAT(&lru->lru_head, &lcb->add, lru_list);
AZ(pthread_mutex_unlock(&lru->lru_mtx));
AZ(lcb->n);
}
@@ -1006,6 +1055,8 @@ fellow_cache_lru_chg(struct fellow_lru_chgbatch *lcb,
CHECK_OBJ_NOTNULL(lcb, FELLOW_LRU_CHGBATCH_MAGIC);
AN(lcb->fcs);
assert(lcb->fco == fcs->fco);
unsigned add = chg > 0;
#ifdef EXTRA_ASSERTIONS
@@ -1133,7 +1184,7 @@ fellow_cache_seg_wait_locked(const struct fellow_cache_seg *fcs)
* called holding the fco lock
*/
static void
fellow_cache_seg_free(struct fellow_cache *fc,
fellow_cache_seg_free(const struct fellow_cache *fc,
struct fellow_cache_seg *fcs, unsigned deref)
{
@@ -1160,7 +1211,7 @@ fellow_cache_seg_free(struct fellow_cache *fc,
assert_cache_seg_consistency(fcs);
if (fcs->state == FCS_INCORE) {
struct fellow_lru_chgbatch lcb[1] =
FELLOW_LRU_CHGBATCH_INIT(lcb, fc, 1);
FELLOW_LRU_CHGBATCH_INIT(lcb, fcs->fco, 1);
fellow_cache_seg_transition_locked(lcb, fcs,
FCS_INCORE, FCS_DISK);
@@ -1185,7 +1236,7 @@ fellow_cache_seg_free(struct fellow_cache *fc,
* fco->mtx held
*/
static void
fellow_cache_seg_auxattr_free(struct fellow_cache *fc,
fellow_cache_seg_auxattr_free(const struct fellow_cache *fc,
struct fellow_cache_seg *fcs)
{
@@ -1196,7 +1247,7 @@ fellow_cache_seg_auxattr_free(struct fellow_cache *fc,
/* fco->mtx held unless surplus seglist */
static void
fellow_cache_seglist_free(struct fellow_cache *fc,
fellow_cache_seglist_free(const struct fellow_cache *fc,
struct fellow_cache_seglist *fcsl)
{
struct fellow_cache_seglist *next;
@@ -1336,15 +1387,17 @@ fellow_obj_regions(const struct fellow_fd *ffd,
/* fco mtx must be held, will be unlocked */
static void
fellow_cache_obj_free(struct fellow_cache *fc, struct fellow_cache_obj **fcop)
fellow_cache_obj_free(const struct fellow_cache *fc,
struct fellow_cache_obj **fcop)
{
struct fellow_lru_chgbatch lcb[1] =
FELLOW_LRU_CHGBATCH_INIT(lcb, fc, 1);
struct fellow_cache_obj *fco;
struct fellow_cache_seg *fcs;
TAKE_OBJ_NOTNULL(fco, fcop, FELLOW_CACHE_OBJ_MAGIC);
struct fellow_lru_chgbatch lcb[1] =
FELLOW_LRU_CHGBATCH_INIT(lcb, fco, 1);
fcs = FCO_FCS(fco);
AZ(FCO_REFCNT(fco));
assert_cache_seg_consistency(fcs);
@@ -1388,15 +1441,20 @@ fellow_cache_obj_free(struct fellow_cache *fc, struct fellow_cache_obj **fcop)
*/
void
fellow_cache_obj_evict_mutate(struct fellow_cache *fc,
fellow_cache_obj_evict_mutate(struct fellow_cache_lru *lru,
struct fellow_cache_obj *fco)
{
struct fellow_cache_seg *fcs;
struct fellow_cache *fc;
CHECK_OBJ_NOTNULL(lru, FELLOW_CACHE_LRU_MAGIC);
fc = lru->fc;
CHECK_OBJ_NOTNULL(fc, FELLOW_CACHE_MAGIC);
CHECK_OBJ_NOTNULL(fco, FELLOW_CACHE_OBJ_MAGIC);
CHECK_OBJ_NOTNULL(fco, FELLOW_CACHE_OBJ_MAGIC);
assert(lru == fco->lru);
fcs = FCO_FCS(fco);
assert_cache_seg_consistency(fcs);
@@ -1417,7 +1475,7 @@ fellow_cache_obj_evict_mutate(struct fellow_cache *fc,
fco->oc = NULL;
// lru
VTAILQ_REMOVE(&fc->lru_head, fcs, lru_list);
VTAILQ_REMOVE(&lru->lru_head, fcs, lru_list);
AN(fcs->fcs_onlru);
fcs->fcs_onlru = 0;
@@ -1782,6 +1840,7 @@ fellow_cache_obj_new(const struct fellow_cache *fc,
INIT_OBJ(fco, FELLOW_CACHE_OBJ_MAGIC);
DBG("fco %p", fco);
fco->fco_mem = fco_mem;
fco->lru = fc->lru;
AZ(pthread_mutex_init(&fco->mtx, NULL));
AZ(pthread_cond_init(&fco->cond, NULL));
@@ -2305,17 +2364,18 @@ fellow_cache_seg_transition_locked(
static void
fellow_cache_seg_transition(
struct fellow_cache *fc,
struct fellow_cache_seg *fcs,
enum fcos_state from, enum fcos_state to)
{
struct fellow_lru_chgbatch lcb[1] =
FELLOW_LRU_CHGBATCH_INIT(lcb, fc, 1);
struct fellow_cache_obj *fco;
CHECK_OBJ_NOTNULL(fcs, FELLOW_CACHE_SEG_MAGIC);
fco = fcs->fco;
CHECK_OBJ_NOTNULL(fco, FELLOW_CACHE_OBJ_MAGIC);
struct fellow_lru_chgbatch lcb[1] =
FELLOW_LRU_CHGBATCH_INIT(lcb, fco, 1);
assert(from != to);
assert_fcos_transition(from, to);
@@ -2381,7 +2441,7 @@ fellow_cache_seg_unbusy(struct fellow_busy *fbo, struct fellow_cache_seg *fcs){
assert_fcos_transition(from, to);
struct fellow_lru_chgbatch lcb[1] =
FELLOW_LRU_CHGBATCH_INIT(lcb, fbo->fc, 1);
FELLOW_LRU_CHGBATCH_INIT(lcb, fco, 1);
AZ(pthread_mutex_lock(&fco->mtx));
fbo->io_outstanding++;
@@ -2423,7 +2483,7 @@ fellow_cache_obj_unbusy(struct fellow_busy *fbo, enum fcol_state wantlog)
CHECK_OBJ_NOTNULL(fco, FELLOW_CACHE_OBJ_MAGIC);
struct fellow_lru_chgbatch lcb[1] =
FELLOW_LRU_CHGBATCH_INIT(lcb, fc, 1);
FELLOW_LRU_CHGBATCH_INIT(lcb, fco, 1);
fcs = FCO_FCS(fco);
assert_fcos_transition(from, to);
@@ -2477,8 +2537,6 @@
static void
fellow_cache_read_complete(struct fellow_cache *fc, void *ptr, int32_t result)
{
struct fellow_lru_chgbatch lcb[1] =
FELLOW_LRU_CHGBATCH_INIT(lcb, fc, 1);
enum fcos_state fcos_next = (typeof(fcos_next))FCOS_CHECK;
struct fellow_cache_seg *fcs;
struct fellow_cache_obj *fco;
@@ -2503,6 +2561,9 @@ fellow_cache_read_complete(struct fellow_cache *fc, void *ptr, int32_t result)
fco = fcs->fco;
CHECK_OBJ_NOTNULL(fco, FELLOW_CACHE_OBJ_MAGIC);
struct fellow_lru_chgbatch lcb[1] =
FELLOW_LRU_CHGBATCH_INIT(lcb, fco, 1);
assert_fcos_transition(fcs->state, fcos_next);
//lint --e{456} Two execution paths are being combined...
@@ -2525,9 +2586,6 @@ static void
fellow_cache_async_write_complete(struct fellow_cache *fc,
void *ptr, int32_t result)
{
// can potentially change the object and a segment
struct fellow_lru_chgbatch lcb[1] =
FELLOW_LRU_CHGBATCH_INIT(lcb, fc, 2);
struct fellow_busy_io *fbio;
struct fellow_busy *fbo;
struct fellow_cache_seg *fcs = NULL;
@@ -2555,6 +2613,10 @@ fellow_cache_async_write_complete(struct fellow_cache *fc,
fco = fbo->fco;
CHECK_OBJ_NOTNULL(fco, FELLOW_CACHE_OBJ_MAGIC);
// can potentially change the object and a segment
struct fellow_lru_chgbatch lcb[1] =
FELLOW_LRU_CHGBATCH_INIT(lcb, fco, 2);
if (type == FBIO_SEG) {
fcs = fbio->u.fcs;
CHECK_OBJ_NOTNULL(fcs, FELLOW_CACHE_SEG_MAGIC);
@@ -2836,27 +2898,32 @@ fellow_cache_seg_evict_locked(
static inline void
fellow_cache_lru_seg_evict_locked(
struct fellow_cache_seg *fcs, struct buddy_ptr_extent *alloc,
struct fellow_cache *fc)
struct fellow_cache_lru *lru)
{
AN(fcs->fcs_onlru);
fcs->fcs_onlru = 0;
VTAILQ_REMOVE(&fc->lru_head, fcs, lru_list);
VTAILQ_REMOVE(&lru->lru_head, fcs, lru_list);
fellow_cache_seg_evict_locked(fcs, alloc);
}
/* evict all segments of an object */
void
fellow_cache_obj_slim(struct fellow_cache *fc, struct fellow_cache_obj *fco)
fellow_cache_obj_slim(const struct fellow_cache *fc,
struct fellow_cache_obj *fco)
{
struct buddy_returns *rets =
BUDDY_RETURNS_STK(fc->membuddy, BUDDY_RETURNS_MAX);
struct buddy_ptr_extent alloc;
struct fellow_cache_seg *fcs;
struct fellow_cache_lru *lru;
struct fcscursor c;
unsigned ref;
fcsc_init(&c, &fco->seglist);
lru = fco->lru;
CHECK_OBJ_NOTNULL(lru, FELLOW_CACHE_LRU_MAGIC);
AZ(pthread_mutex_lock(&fco->mtx));
/* anything to do at all? */
@@ -2868,20 +2935,20 @@ fellow_cache_obj_slim(struct fellow_cache *fc, struct fellow_cache_obj *fco)
goto out;
fcsc_init(&c, &fco->seglist);
AZ(pthread_mutex_lock(&fc->lru_mtx));
AZ(pthread_mutex_lock(&lru->lru_mtx));
while ((fcs = fcsc_next(&c)) != NULL) {
if (fcs->alloc.ptr == NULL)
break;
ref = 0;
while (fcs->state == FCS_READING || fcs->state == FCS_WRITING) {
AZ(pthread_mutex_unlock(&fc->lru_mtx));
AZ(pthread_mutex_unlock(&lru->lru_mtx));
// NULL: can not be on lru
(void) fellow_cache_seg_ref_locked(NULL, fcs);
ref = 1;
fellow_cache_seg_wait_locked(fcs);
AZ(pthread_mutex_lock(&fc->lru_mtx));
AZ(pthread_mutex_lock(&lru->lru_mtx));
}
if (fcs->state == FCS_INCORE && ref == 1 && fcs->refcnt == ref) {
/* we hold the only ref, can not be on LRU,
@@ -2895,7 +2962,7 @@ fellow_cache_obj_slim(struct fellow_cache *fc, struct fellow_cache_obj *fco)
if (fcs->state == FCS_INCORE && fcs->refcnt == 0) {
AZ(ref);
// remove from LRU
fellow_cache_lru_seg_evict_locked(fcs, &alloc, fc);
fellow_cache_lru_seg_evict_locked(fcs, &alloc, lru);
AN(buddy_return_ptr_extent(rets, &alloc));
continue;
}
@@ -2904,7 +2971,7 @@ fellow_cache_obj_slim(struct fellow_cache *fc, struct fellow_cache_obj *fco)
if (ref)
(void) fellow_cache_seg_deref_locked(NULL, fcs);
}
AZ(pthread_mutex_unlock(&fc->lru_mtx));
AZ(pthread_mutex_unlock(&lru->lru_mtx));
out:
AZ(pthread_mutex_unlock(&fco->mtx));
@@ -2912,9 +2979,9 @@ fellow_cache_obj_slim(struct fellow_cache *fc, struct fellow_cache_obj *fco)
}
static int
fellow_cache_lru_work(struct worker *wrk, struct fellow_cache *fc)
fellow_cache_lru_work(struct worker *wrk, struct fellow_cache_lru *lru)
{
buddy_t *buddy = fc->membuddy;
buddy_t *buddy;
struct fellow_cache_seg *fcs, *fcss;
struct fellow_cache_obj *fco;
struct objcore *oc;
@@ -2923,11 +2990,14 @@ fellow_cache_lru_work(struct worker *wrk, struct fellow_cache *fc)
//lint --e{456} flexelint does not grok trylock
CHECK_OBJ_NOTNULL(lru, FELLOW_CACHE_LRU_MAGIC);
buddy = lru->fc->membuddy;
alloc = buddy_ptr_extent_nil;
oc = NULL;
fco = NULL;
AZ(pthread_mutex_lock(&fc->lru_mtx));
VTAILQ_FOREACH_SAFE(fcs, &fc->lru_head, lru_list, fcss) {
AZ(pthread_mutex_lock(&lru->lru_mtx));
VTAILQ_FOREACH_SAFE(fcs, &lru->lru_head, lru_list, fcss) {
// no use trying the same object again and again
if (fcs->fco == fco)
continue;
@@ -2940,7 +3010,7 @@ fellow_cache_lru_work(struct worker *wrk, struct fellow_cache *fc)
}
if (fcs->state == FCS_INCORE) {
fellow_cache_lru_seg_evict_locked(fcs,
&alloc, fc);
&alloc, lru);
AZ(pthread_mutex_unlock(&fco->mtx));
fco = NULL;
break;
@@ -2948,7 +3018,7 @@ fellow_cache_lru_work(struct worker *wrk, struct fellow_cache *fc)
if (fcs->state == FCO_INCORE) {
oc = fco->oc;
CHECK_OBJ_NOTNULL(oc, OBJCORE_MAGIC);
r = stvfe_mutate(wrk, oc);
r = stvfe_mutate(wrk, lru, oc);
AZ(pthread_mutex_unlock(&fco->mtx));
if (r) {
/* mutate was successful
@@ -2963,7 +3033,7 @@ fellow_cache_lru_work(struct worker *wrk, struct fellow_cache *fc)
AZ(pthread_mutex_unlock(&fco->mtx));
WRONG("fcs state in lru");
}
AZ(pthread_mutex_unlock(&fc->lru_mtx));
AZ(pthread_mutex_unlock(&lru->lru_mtx));
if (fcs == NULL) {
// VSLb(wrk->vsl, SLT_ExpKill, "LRU_Fail");
@@ -2981,7 +3051,7 @@ fellow_cache_lru_work(struct worker *wrk, struct fellow_cache *fc)
// VSLb(wrk->vsl, SLT_ExpKill, "LRU x=%u", ObjGetXID(wrk, oc));
// we removed the oc's reference on fco in stvfe_mutate()
fellow_cache_obj_deref(fc, fco);
fellow_cache_obj_deref(lru->fc, fco);
} else {
INCOMPL();
}
@@ -3028,6 +3098,7 @@ reserve_fill(struct buddy_ptr_page *r, const struct buddy_reqs *reqs, uint8_t n)
void *
fellow_cache_lru_thread(struct worker *wrk, void *priv)
{
struct fellow_cache_lru *lru;
struct vsl_log vsl;
struct buddy_reqs *reqs;
struct fellow_cache *fc;
@@ -3038,9 +3109,15 @@ fellow_cache_lru_thread(struct worker *wrk, void *priv)
size_t sz;
CAST_OBJ_NOTNULL(fc, priv, FELLOW_CACHE_MAGIC);
lru = fc->lru;
CHECK_OBJ_NOTNULL(lru, FELLOW_CACHE_LRU_MAGIC);
buddy = fc->membuddy;
CHECK_OBJ(buddy, BUDDY_MAGIC);
AZ(lru->lru_thread);
lru->lru_thread = pthread_self();
// fellow_cache_lru_seg_evict_locked()
assert_fcos_transition(FCS_INCORE, FCS_DISK);
@@ -3085,7 +3162,7 @@ fellow_cache_lru_thread(struct worker *wrk, void *priv)
(void) BUDDYF(alloc_async)(reqs);
while (buddy->waiting)
if (! fellow_cache_lru_work(wrk, fc))
if (! fellow_cache_lru_work(wrk, lru))
break;
n = BUDDYF(alloc_async_ready)(reqs);
@@ -3121,7 +3198,7 @@ fellow_cache_lru_thread(struct worker *wrk, void *priv)
#endif
while (buddy->waiting)
if (! fellow_cache_lru_work(wrk, fc)) {
if (! fellow_cache_lru_work(wrk, lru)) {
/* give other threads a chance to
* get the lru lock */
(void)usleep(10*1000);
@@ -3135,14 +3212,15 @@ fellow_cache_lru_thread(struct worker *wrk, void *priv)
/* returns if moved */
int
fellow_cache_obj_lru_touch(struct fellow_cache *fc,
struct fellow_cache_obj *fco)
fellow_cache_obj_lru_touch(struct fellow_cache_obj *fco)
{
struct fellow_cache_lru *lru;
struct fellow_cache_seg *fcs;
int r;
CHECK_OBJ_NOTNULL(fc, FELLOW_CACHE_MAGIC);
CHECK_OBJ_NOTNULL(fco, FELLOW_CACHE_OBJ_MAGIC);
lru = fco->lru;
CHECK_OBJ_NOTNULL(lru, FELLOW_CACHE_LRU_MAGIC);
fcs = FCO_FCS(fco);
CHECK_OBJ_NOTNULL(fcs, FELLOW_CACHE_SEG_MAGIC);
@@ -3151,15 +3229,15 @@ fellow_cache_obj_lru_touch(struct fellow_cache *fc,
assert(fellow_cache_shouldlru(fcs->state, fco->oc, fcs->refcnt));
r = pthread_mutex_trylock(&fc->lru_mtx);
r = pthread_mutex_trylock(&lru->lru_mtx);
if (r != 0) {
assert(r == EBUSY);
return (0);
}
VTAILQ_REMOVE(&fc->lru_head, fcs, lru_list);
VTAILQ_INSERT_TAIL(&fc->lru_head, fcs, lru_list);
VTAILQ_REMOVE(&lru->lru_head, fcs, lru_list);
VTAILQ_INSERT_TAIL(&lru->lru_head, fcs, lru_list);
//lint -e{455} flexelint does not grok trylock
AZ(pthread_mutex_unlock(&fc->lru_mtx));
AZ(pthread_mutex_unlock(&lru->lru_mtx));
return (1);
}
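/*
 * Best effort only: when the trylock loses the race, the touch is
 * skipped, and callers such as sfemem_touch() ignore the result.
 */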
@@ -3207,12 +3285,9 @@ fellow_cache_async_fini(struct fellow_cache *fc)
*/
static void
fellow_cache_seg_deref(struct fellow_cache *fc,
struct fellow_cache_seg * const *segs, unsigned n)
fellow_cache_seg_deref(struct fellow_cache_seg * const *segs, unsigned n)
{
#define DEREF_BATCH 64
struct fellow_lru_chgbatch lcb[1] =
FELLOW_LRU_CHGBATCH_INIT(lcb, fc, DEREF_BATCH);
struct fellow_cache_seg *fcs;
struct fellow_cache_obj *fco;
@@ -3221,6 +3296,9 @@ fellow_cache_seg_deref(struct fellow_cache *fc,
fco = (*segs)->fco;
CHECK_OBJ_NOTNULL(fco, FELLOW_CACHE_OBJ_MAGIC);
struct fellow_lru_chgbatch lcb[1] =
FELLOW_LRU_CHGBATCH_INIT(lcb, fco, DEREF_BATCH);
AZ(pthread_mutex_lock(&fco->mtx));
while (n--) {
fcs = *segs++;
@@ -3293,8 +3371,6 @@ fellow_cache_seg_ref_in(struct fellow_cache *fc, enum fellow_cache_io_e type,
struct fellow_cache_seg * const *segs, const unsigned n)
{
#define REF_BATCH 64
struct fellow_lru_chgbatch lcb[1] =
FELLOW_LRU_CHGBATCH_INIT(lcb, fc, REF_BATCH);
struct fellow_cache_seg *fcs, *iosegs[n], *racesegs[n];
struct fellow_cache_obj *fco;
unsigned u, ion = 0, racen = 0;
@@ -3309,6 +3385,9 @@ fellow_cache_seg_ref_in(struct fellow_cache *fc, enum fellow_cache_io_e type,
fco = (*segs)->fco;
CHECK_OBJ_NOTNULL(fco, FELLOW_CACHE_OBJ_MAGIC);
struct fellow_lru_chgbatch lcb[1] =
FELLOW_LRU_CHGBATCH_INIT(lcb, fco, REF_BATCH);
assert(n <= BUDDY_REQS_MAX);
reqs = BUDDY_REQS_STK(fc->membuddy, BUDDY_REQS_MAX);
rets = BUDDY_RETURNS_STK(fc->membuddy, BUDDY_RETURNS_MAX);
@@ -3603,7 +3682,7 @@ fellow_cache_obj_iter(struct fellow_cache *fc, struct fellow_cache_obj *fco,
} else if (flush) {
assert(derefn < mod);
deref[derefn++] = fcs;
fellow_cache_seg_deref(fc, deref, derefn);
fellow_cache_seg_deref(deref, derefn);
derefn = 0;
} else {
assert(derefn < mod);
@@ -3618,14 +3697,14 @@ fellow_cache_obj_iter(struct fellow_cache *fc, struct fellow_cache_obj *fco,
if (fcr.status != fcr_ok || fcr.r.integer != 0)
while (mod--) {
if (ra[mod] != NULL)
fellow_cache_seg_deref(fc, &ra[mod], 1);
fellow_cache_seg_deref(&ra[mod], 1);
}
else
while (mod--)
AZ(ra[mod]);
if (derefn)
fellow_cache_seg_deref(fc, deref, derefn);
fellow_cache_seg_deref(deref, derefn);
if ((flags & OBJ_ITER_END) == 0) {
ret2 = func(priv, OBJ_ITER_END, NULL, (size_t)0);
@@ -3994,7 +4073,7 @@ fellow_busy_obj_trimstore(struct fellow_busy *fbo)
fcs->alloc.ptr = NULL;
AZ(fcs->disk_seg->seg.size);
fcs->disk_seg->seg.off = 0;
fellow_cache_seg_transition(fbo->fc, fcs,
fellow_cache_seg_transition(fcs,
FCS_BUSY, FCS_USABLE);
fcsl->fdsl->nsegs--;
assert(fcs == &fcsl->segs[fcsl->fdsl->nsegs]);
@@ -4260,7 +4339,7 @@ fellow_cache_obj_prepread(const struct fellow_cache *fc, fellow_disk_block fdba,
// undo fellow_cache_obj_prepread()
static void
fellow_cache_obj_redundant(struct fellow_cache *fc,
fellow_cache_obj_redundant(const struct fellow_cache *fc,
struct fellow_cache_obj **fcop)
{
struct fellow_cache_obj *fco;
@@ -4321,13 +4400,14 @@ fellow_cache_obj_deref_locked(struct fellow_lru_chgbatch *lcb,
void
fellow_cache_obj_deref(struct fellow_cache *fc, struct fellow_cache_obj *fco)
{
struct fellow_lru_chgbatch lcb[1] =
FELLOW_LRU_CHGBATCH_INIT(lcb, fc, 1);
unsigned refcount;
CHECK_OBJ_NOTNULL(fc, FELLOW_CACHE_MAGIC);
CHECK_OBJ_NOTNULL(fco, FELLOW_CACHE_OBJ_MAGIC);
struct fellow_lru_chgbatch lcb[1] =
FELLOW_LRU_CHGBATCH_INIT(lcb, fco, 1);
AZ(pthread_mutex_lock(&fco->mtx));
refcount = fellow_cache_obj_deref_locked(lcb, fc, fco);
fellow_cache_lru_chgbatch_apply(lcb);
@@ -4351,8 +4431,6 @@ struct fellow_cache_res
fellow_cache_obj_get(struct fellow_cache *fc,
struct objcore **ocp, uintptr_t priv2, unsigned crit)
{
struct fellow_lru_chgbatch lcb[1] =
FELLOW_LRU_CHGBATCH_INIT(lcb, fc, 1);
struct fellow_cache_res fcr;
struct fellow_cache_obj *fco, *nfco;
struct fellow_cache_seg *fcs;
@@ -4480,6 +4558,9 @@ struct objcore **ocp, uintptr_t priv2, unsigned crit)
FCS_USABLE : FCS_DISK);
#include "tbl/fellow_obj_attr.h"
struct fellow_lru_chgbatch lcb[1] =
FELLOW_LRU_CHGBATCH_INIT(lcb, fco, 1);
AZ(pthread_mutex_lock(&fco->mtx));
assert(fcs->state == FCO_READING);
assert_cache_seg_consistency(fcs);
@@ -4551,8 +4632,6 @@ void
fellow_cache_obj_delete(struct fellow_cache *fc,
struct fellow_cache_obj *fco, const uint8_t hash[DIGEST_LEN])
{
struct fellow_lru_chgbatch lcb[1] =
FELLOW_LRU_CHGBATCH_INIT(lcb, fc, 1);
const struct fellow_disk_obj *fdo;
/* one additional region for the object itself */
struct buddy_off_extent region[FCO_MAX_REGIONS + 1] = {{0}};
@@ -4566,6 +4645,9 @@ fellow_cache_obj_delete(struct fellow_cache *fc,
fdo = FCO_FDO(fco);
CHECK_OBJ_NOTNULL(fdo, FELLOW_DISK_OBJ_MAGIC);
struct fellow_lru_chgbatch lcb[1] =
FELLOW_LRU_CHGBATCH_INIT(lcb, fco, 1);
fdba = fco->fdb;
n = fellow_obj_regions(fc->ffd, fco, region);
@@ -4850,11 +4932,10 @@ fellow_cache_init(struct fellow_fd *ffd, buddy_t *membuddy,
fc->tune = tune;
fc->running = 1;
AZ(pthread_mutex_init(&fc->fdb_mtx, NULL));
AZ(pthread_mutex_init(&fc->lru_mtx, NULL));
fc->lru = fellow_cache_lru_new(fc);
AZ(pthread_mutex_init(&fc->fdb_mtx, NULL));
VRBT_INIT(&fc->fdb_head);
VTAILQ_INIT(&fc->lru_head);
fellow_cache_async_init(fc, taskrun);
@@ -4865,26 +4946,21 @@ fellow_cache_init(struct fellow_fd *ffd, buddy_t *membuddy,
* the LRU thread is started by the storage layer after loading
*/
void
fellow_cache_fini(struct fellow_cache **fcp, const pthread_t *lru_thread)
fellow_cache_fini(struct fellow_cache **fcp)
{
struct fellow_cache *fc;
void *r;
TAKE_OBJ_NOTNULL(fc, fcp, FELLOW_CACHE_MAGIC);
fc->running = 0;
buddy_wait_kick(fc->membuddy);
if (lru_thread != NULL) {
AZ(pthread_join(*lru_thread, &r));
AZ(r);
}
fellow_cache_lru_fini(&fc->lru);
AZ(fc->lru);
assert(VTAILQ_EMPTY(&fc->lru_head));
assert(VRBT_EMPTY(&fc->fdb_head));
fellow_cache_async_fini(fc);
AZ(pthread_mutex_destroy(&fc->lru_mtx));
AZ(pthread_mutex_destroy(&fc->fdb_mtx));
FREE_OBJ(fc);
@@ -5209,10 +5285,13 @@ static void test_fellow_cache_obj_iter(
DBG(#x "\t%3zu", sizeof(struct x))
static unsigned
lru_find(struct fellow_cache *fc, struct fellow_cache_seg *fcs)
lru_find(struct fellow_cache_seg *fcs)
{
struct fellow_cache_seg *needle;
VTAILQ_FOREACH(needle, &fc->lru_head, lru_list)
struct fellow_cache_lru *lru;
lru = fcs->fco->lru;
VTAILQ_FOREACH(needle, &lru->lru_head, lru_list)
if (needle == fcs)
return (1);
return (0);
@@ -5220,12 +5299,12 @@ lru_find(struct fellow_cache *fc, struct fellow_cache_seg *fcs)
#define LCBMAX 16
static void
t_1lcb(struct fellow_cache *fc, struct fellow_cache_seg *fcs,
t_1lcb(struct fellow_cache_seg *fcs,
uint8_t n, uint8_t i, uint8_t j, uint8_t k)
{
uint8_t u,v;
struct fellow_lru_chgbatch lcb[1] =
FELLOW_LRU_CHGBATCH_INIT(lcb, fc, LCBMAX);
FELLOW_LRU_CHGBATCH_INIT(lcb, fcs->fco, LCBMAX);
assert(n <= LCBMAX);
for (u = 1; u < n + 2; u++) { // length of remove
@@ -5243,7 +5322,7 @@ t_1lcb(struct fellow_cache *fc, struct fellow_cache_seg *fcs,
fellow_cache_lru_chgbatch_apply(lcb); \
for (v = 0; v < n; v++) { \
assert(!!(to & 1<<v) == \
lru_find(fc, &fcs[v])); \
lru_find(&fcs[v])); \
} \
}
@@ -5265,15 +5344,21 @@ t_lcb(struct fellow_cache *fc)
{
const uint8_t nfcs = 4;
uint8_t i,j,k;
struct fellow_cache_obj fco[1];
struct fellow_cache_seg fcs[nfcs];
for (i = 0; i < nfcs; i++)
INIT_OBJ(fco, FELLOW_CACHE_OBJ_MAGIC);
fco->lru = fc->lru;
for (i = 0; i < nfcs; i++) {
INIT_OBJ(&fcs[i], FELLOW_CACHE_SEG_MAGIC);
fcs[i].fco = fco;
}
for (i = 0; i < 1<<nfcs; i++)
for (j = 0; j < 1<<nfcs; j++)
for (k = 0; k < 1<<nfcs; k++)
t_1lcb(fc, fcs, nfcs, i, j, k);
t_1lcb(fcs, nfcs, i, j, k);
DBG("done %s","---");
}
@@ -5307,7 +5392,7 @@ t_cache(unsigned chksum)
// canary so size increase does not happen unnoticed
sz = sizeof(struct fellow_cache_obj);
assert(sz <= 320);
assert(sz <= 328);
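// presumably the new fco->lru pointer (8 bytes on LP64)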
AZ(stvfe_tune_init(tune, memsz, dsksz, objsize_hint));
tune->hash_obj = chksum;
@@ -5426,7 +5511,7 @@ t_cache(unsigned chksum)
fellow_cache_obj_delete(fc, fco, hash);
fellow_cache_fini(&fc, NULL);
fellow_cache_fini(&fc);
AZ(fc);
fellow_log_close(&ffd);
BWIT_ISEMPTY(membuddy->witness);
@@ -50,7 +50,7 @@ struct fellow_cache_res {
struct fellow_cache *
fellow_cache_init(struct fellow_fd *, buddy_t *, struct stvfe_tune *,
fellow_task_run_t);
void fellow_cache_fini(struct fellow_cache **, const pthread_t *);
void fellow_cache_fini(struct fellow_cache **);
void fellow_cache_obj_delete(struct fellow_cache *fc,
struct fellow_cache_obj *fco, const uint8_t hash[32]);
@@ -65,8 +65,7 @@ fellow_cache_obj_iter(struct fellow_cache *fc, struct fellow_cache_obj *fco,
void *fellow_cache_obj_getattr(struct fellow_cache *fc,
struct fellow_cache_obj *fco,
enum obj_attr attr, size_t *len);
int fellow_cache_obj_lru_touch(struct fellow_cache *fc,
struct fellow_cache_obj *fco);
int fellow_cache_obj_lru_touch(struct fellow_cache_obj *fco);
struct fellow_cache_res
fellow_busy_obj_alloc(struct fellow_cache *fc,
@@ -20,14 +20,16 @@
* electronic and paper mail.
*/
void fellow_cache_obj_evict_mutate(struct fellow_cache *fc,
struct fellow_cache_lru;
void fellow_cache_obj_evict_mutate(struct fellow_cache_lru *lru,
struct fellow_cache_obj *fco);
void fellow_cache_obj_slim(struct fellow_cache *fc,
void fellow_cache_obj_slim(const struct fellow_cache *fc,
struct fellow_cache_obj *fco);
void stvfe_oc_log_removed(struct objcore *oc);
void stvfe_oc_log_submitted(struct objcore *oc);
void stvfe_oc_dle_obj(struct objcore *oc, struct fellow_dle *e);
int stvfe_mutate(struct worker *wrk, struct objcore *oc);
int stvfe_mutate(struct worker *wrk, struct fellow_cache_lru *lru,
struct objcore *oc);
void stvfe_sumstat(struct worker *wrk); // wraps Pool_Sumstat(wrk);
@@ -296,7 +296,6 @@ struct stvfe {
buddy_t my_membuddy;
buddy_t *membuddy;
pthread_t mem_lru_thread;
pthread_t dsk_lru_thread;
uintptr_t oev;
@@ -1034,7 +1033,7 @@ sfemem_getattr(struct worker *wrk, struct objcore *memoc, enum obj_attr attr,
*slen = (ssize_t)len;
if (attr == OA_VARY)
(void) fellow_cache_obj_lru_touch(stvfe->fc, fco);
(void) fellow_cache_obj_lru_touch(fco);
return (r);
}
@@ -1109,7 +1108,7 @@ sfemem_touch(struct worker *wrk, struct objcore *memoc, vtim_real now)
* last_lru. Does it matter?
*/
(void) fellow_cache_obj_lru_touch(stvfe->fc, fco);
(void) fellow_cache_obj_lru_touch(fco);
LRU_Touch(wrk, memoc, now);
}
@@ -1781,7 +1780,8 @@ stvfe_sumstat(struct worker *wrk)
* fco->mtx held!
*/
int
stvfe_mutate(struct worker *wrk, struct objcore *oc)
stvfe_mutate(struct worker *wrk, struct fellow_cache_lru *lru,
struct objcore *oc)
{
const struct stevedore *stv;
const struct stvfe *stvfe;
@@ -1809,8 +1809,8 @@ stvfe_mutate(struct worker *wrk, struct objcore *oc)
AN(oc->stobj->priv);
AN(oc->stobj->priv2);
fellow_cache_obj_evict_mutate(stvfe->fc,
oc->stobj->priv);
// XXX LRU with change, reduce LRU mtx scope
fellow_cache_obj_evict_mutate(lru, oc->stobj->priv);
oc->stobj->priv = NULL;
oc->stobj->stevedore = stvfe->dskstv;
@@ -2327,6 +2327,7 @@ sfe_open_scope(struct stevedore *stv)
struct sfe_resurrect_priv sfer[1];
struct sfe_open_worker_priv sow[1];
const char *err = NULL;
pthread_t mem_lru_thread;
ASSERT_CLI();
CAST_OBJ_NOTNULL(stvfe, stv->priv, STVFE_MAGIC);
@@ -2412,8 +2413,9 @@ sfe_open_scope(struct stevedore *stv)
goto err;
}
WRK_BgThread(&stvfe->mem_lru_thread, "sfe-mem-lru",
WRK_BgThread(&mem_lru_thread, "sfe-mem-lru",
fellow_cache_lru_thread, stvfe->fc);
AN(mem_lru_thread);
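/* the handle is only needed for this check; fellow_cache_lru_fini()
 * joins the thread via lru->lru_thread */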
WRK_BgThread(&stvfe->dsk_lru_thread, "sfe-dsk-lru",
sfedsk_lru_thread, stvfe->dskstv);
@@ -2430,7 +2432,7 @@ sfe_open_scope(struct stevedore *stv)
}
if (fc != NULL)
fellow_cache_fini(&fc, NULL);
fellow_cache_fini(&fc);
AZ(fc);
if (ffd != NULL)
@@ -2530,7 +2532,7 @@ sfe_close_real(struct stvfe *stvfe)
sfe_ban_space_return(stvfe);
buddy_alloc_async_done(&stvfe->ban_reqs->reqs);
fellow_cache_fini(&stvfe->fc, &stvfe->mem_lru_thread);
fellow_cache_fini(&stvfe->fc);
AZ(stvfe->fc);
fellow_log_close(&stvfe->ffd);