Commit 195fa50c authored by Poul-Henning Kamp's avatar Poul-Henning Kamp

Split the expire lock into expire and per-lru list locks.

The lock order is lru->exp lock, so that the timer_index
can be safely examined while holding only the lru lock,
eliminating the need for the separate OC_F_ONLRU flag.

The critical trick is that in EXP_Touch() the lru_lock
is sufficient:  We just move the object around on the
lru list, we don't add or delete it.

This hopefully reduces lock contention on these locks.
parent f9848a68
......@@ -100,8 +100,6 @@ struct vsc_lck;
struct waitinglist;
struct vef_priv;
struct lock { void *priv; }; // Opaque
#define DIGEST_LEN 32
/* Name of transient storage */
......@@ -375,7 +373,6 @@ struct objcore {
struct objhead *objhead;
double timer_when;
unsigned flags;
#define OC_F_ONLRU (1<<0)
#define OC_F_BUSY (1<<1)
#define OC_F_PASS (1<<2)
#define OC_F_LRUDONTMOVE (1<<4)
......@@ -391,6 +388,9 @@ static inline struct object *
oc_getobj(struct worker *wrk, struct objcore *oc)
{
CHECK_OBJ_NOTNULL(oc, OBJCORE_MAGIC);
AN(oc->methods);
AN(oc->methods->getobj);
return (oc->methods->getobj(wrk, oc));
}
......@@ -398,6 +398,8 @@ static inline void
oc_updatemeta(struct objcore *oc)
{
CHECK_OBJ_NOTNULL(oc, OBJCORE_MAGIC);
AN(oc->methods);
if (oc->methods->updatemeta != NULL)
oc->methods->updatemeta(oc);
}
......@@ -406,6 +408,9 @@ static inline void
oc_freeobj(struct objcore *oc)
{
CHECK_OBJ_NOTNULL(oc, OBJCORE_MAGIC);
AN(oc->methods);
AN(oc->methods->freeobj);
oc->methods->freeobj(oc);
}
......@@ -413,6 +418,9 @@ static inline struct lru *
oc_getlru(const struct objcore *oc)
{
CHECK_OBJ_NOTNULL(oc, OBJCORE_MAGIC);
AN(oc->methods);
AN(oc->methods->getlru);
return (oc->methods->getlru(oc));
}
......
......@@ -62,7 +62,7 @@ static struct lock exp_mtx;
* so that other users of the object will not stumble trying to change the
* ttl or lru position.
*/
#define BINHEAP_NOIDX 0
#define BINHEAP_NOIDX 0 /* XXX: should be in binary_heap.h */
/*--------------------------------------------------------------------
* When & why does the timer fire for this object ?
......@@ -99,7 +99,6 @@ exp_insert(struct objcore *oc, struct lru *lru)
binheap_insert(exp_heap, oc);
assert(oc->timer_idx != BINHEAP_NOIDX);
VTAILQ_INSERT_TAIL(&lru->lru_head, oc, lru_list);
oc->flags |= OC_F_ONLRU;
}
/*--------------------------------------------------------------------
......@@ -115,10 +114,12 @@ EXP_Inject(struct objcore *oc, struct lru *lru, double when)
CHECK_OBJ_NOTNULL(oc, OBJCORE_MAGIC);
CHECK_OBJ_NOTNULL(lru, LRU_MAGIC);
Lck_Lock(&lru->mtx);
Lck_Lock(&exp_mtx);
oc->timer_when = when;
exp_insert(oc, lru);
Lck_Unlock(&exp_mtx);
Lck_Unlock(&lru->mtx);
}
/*--------------------------------------------------------------------
......@@ -145,10 +146,12 @@ EXP_Insert(struct object *o)
lru = oc_getlru(oc);
CHECK_OBJ_NOTNULL(lru, LRU_MAGIC);
Lck_Lock(&lru->mtx);
Lck_Lock(&exp_mtx);
(void)update_object_when(o);
exp_insert(oc, lru);
Lck_Unlock(&exp_mtx);
Lck_Unlock(&lru->mtx);
oc_updatemeta(oc);
}
......@@ -183,17 +186,23 @@ EXP_Touch(struct object *o, double tnow)
lru = oc_getlru(oc);
CHECK_OBJ_NOTNULL(lru, LRU_MAGIC);
if (Lck_Trylock(&exp_mtx))
/*
* We only need the LRU lock here. The locking order is LRU->EXP
* so we can trust the content of the oc->timer_idx without the
* EXP lock. Since each lru list has its own lock, this should
* reduce contention a fair bit
*/
if (Lck_Trylock(&lru->mtx))
return;
if (oc->flags & OC_F_ONLRU) { /* XXX ?? */
if (oc->timer_idx != BINHEAP_NOIDX) {
VTAILQ_REMOVE(&lru->lru_head, oc, lru_list);
VTAILQ_INSERT_TAIL(&lru->lru_head, oc, lru_list);
VSC_main->n_lru_moved++;
o->last_lru = tnow;
}
Lck_Unlock(&exp_mtx);
Lck_Unlock(&lru->mtx);
}
/*--------------------------------------------------------------------
......@@ -209,12 +218,15 @@ void
EXP_Rearm(const struct object *o)
{
struct objcore *oc;
struct lru *lru;
CHECK_OBJ_NOTNULL(o, OBJECT_MAGIC);
oc = o->objcore;
if (oc == NULL)
return;
CHECK_OBJ_NOTNULL(oc, OBJCORE_MAGIC);
lru = oc_getlru(oc);
Lck_Lock(&lru->mtx);
Lck_Lock(&exp_mtx);
/*
* The hang-man might have this object of the binheap while
......@@ -226,10 +238,10 @@ EXP_Rearm(const struct object *o)
assert(oc->timer_idx != BINHEAP_NOIDX);
}
Lck_Unlock(&exp_mtx);
Lck_Unlock(&lru->mtx);
oc_updatemeta(oc);
}
/*--------------------------------------------------------------------
* This thread monitors the root of the binary heap and whenever an
* object expires, accounting also for graceability, it is killed.
......@@ -244,27 +256,50 @@ exp_timer(struct sess *sp, void *priv)
(void)priv;
t = TIM_real();
oc = NULL;
while (1) {
if (oc == NULL) {
WSL_Flush(sp->wrk, 0);
WRK_SumStat(sp->wrk);
TIM_sleep(params->expiry_sleep);
t = TIM_real();
}
Lck_Lock(&exp_mtx);
oc = binheap_root(exp_heap);
CHECK_OBJ_ORNULL(oc, OBJCORE_MAGIC);
if (oc == NULL) {
Lck_Unlock(&exp_mtx);
continue;
}
CHECK_OBJ_NOTNULL(oc, OBJCORE_MAGIC);
/*
* We may have expired so many objects that our timestamp
* got out of date, refresh it and check again.
*/
if (oc != NULL && oc->timer_when > t)
if (oc->timer_when > t)
t = TIM_real();
if (oc == NULL || oc->timer_when > t) { /* XXX: > or >= ? */
if (oc->timer_when > t) {
Lck_Unlock(&exp_mtx);
WSL_Flush(sp->wrk, 0);
WRK_SumStat(sp->wrk);
TIM_sleep(params->expiry_sleep);
t = TIM_real();
oc = NULL;
continue;
}
/* It's time... */
CHECK_OBJ_NOTNULL(oc->objhead, OBJHEAD_MAGIC);
/*
* It's time...
* Technically we should drop the exp_mtx, get the lru->mtx
* get the exp_mtx again and then check that the oc is still
* on the binheap. We take the shorter route and try to
* get the lru->mtx and punt if we fail.
*/
lru = oc_getlru(oc);
CHECK_OBJ_NOTNULL(lru, LRU_MAGIC);
if (Lck_Trylock(&lru->mtx)) {
Lck_Unlock(&exp_mtx);
oc = NULL;
continue;
}
/* Remove from binheap */
assert(oc->timer_idx != BINHEAP_NOIDX);
......@@ -272,13 +307,11 @@ exp_timer(struct sess *sp, void *priv)
assert(oc->timer_idx == BINHEAP_NOIDX);
/* And from LRU */
if (oc->flags & OC_F_ONLRU) {
lru = oc_getlru(oc);
VTAILQ_REMOVE(&lru->lru_head, oc, lru_list);
oc->flags &= ~OC_F_ONLRU;
}
lru = oc_getlru(oc);
VTAILQ_REMOVE(&lru->lru_head, oc, lru_list);
Lck_Unlock(&exp_mtx);
Lck_Unlock(&lru->mtx);
VSC_main->n_expired++;
......@@ -301,26 +334,32 @@ EXP_NukeOne(const struct sess *sp, struct lru *lru)
struct object *o;
/* Find the first currently unused object on the LRU. */
Lck_Lock(&lru->mtx);
Lck_Lock(&exp_mtx);
VTAILQ_FOREACH(oc, &lru->lru_head, lru_list) {
CHECK_OBJ_NOTNULL(oc, OBJCORE_MAGIC);
if (oc->timer_idx == BINHEAP_NOIDX) /* exp_timer has it */
continue;
assert (oc->timer_idx != BINHEAP_NOIDX);
/*
* It wont release any space if we cannot release the last
* reference, besides, if somebody else has a reference,
* it's a bad idea to nuke this object anyway.
*/
if (oc->refcnt == 1)
break;
}
if (oc != NULL) {
VTAILQ_REMOVE(&lru->lru_head, oc, lru_list);
oc->flags &= ~OC_F_ONLRU;
binheap_delete(exp_heap, oc->timer_idx);
assert(oc->timer_idx == BINHEAP_NOIDX);
VSC_main->n_lru_nuked++;
}
Lck_Unlock(&exp_mtx);
Lck_Unlock(&lru->mtx);
if (oc == NULL)
return (-1);
/* XXX: bad idea for -spersistent */
o = oc_getobj(sp->wrk, oc);
WSL(sp->wrk, SLT_ExpKill, 0, "%u LRU", o->xid);
(void)HSH_Deref(sp->wrk, NULL, &o);
......
......@@ -162,6 +162,7 @@ Lck__New(struct lock *lck, struct vsc_lck *st, const char *w)
{
struct ilck *ilck;
AN(st);
AZ(lck->priv);
ALLOC_OBJ(ilck, ILCK_MAGIC);
AN(ilck);
......
......@@ -90,3 +90,6 @@ void vsm_iter_n(struct vsm_chunk **pp);
#define VSM_CLASS_PARAM "Params"
#define VSM_CLASS_MARK "MgrCld"
#define VSM_COOL_TIME 5
/* cache_lck.c */
struct lock { void *priv; }; // Opaque
......@@ -44,6 +44,7 @@ LOCK(herder)
LOCK(wq)
LOCK(objhdr)
LOCK(exp)
LOCK(lru)
LOCK(cli)
LOCK(ban)
LOCK(vbp)
......
......@@ -71,6 +71,7 @@ LRU_Alloc(void)
ALLOC_OBJ(l, LRU_MAGIC);
AN(l);
VTAILQ_INIT(&l->lru_head);
Lck_New(&l->mtx, lck_lru);
return (l);
}
......
......@@ -56,6 +56,7 @@ struct lru {
unsigned magic;
#define LRU_MAGIC 0x3fec7bb0
VTAILQ_HEAD(,objcore) lru_head;
struct lock mtx;
};
/*--------------------------------------------------------------------*/
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment