Commit 867743f0 authored by Poul-Henning Kamp's avatar Poul-Henning Kamp

Split rushing of waiting list into two stages: 1 get the req's to be rushed

under the oh->mtx.  2 rush them out of the mtx.
parent 0895d2e6
...@@ -57,15 +57,21 @@ ...@@ -57,15 +57,21 @@
#include "cache.h" #include "cache.h"
#include "hash/hash_slinger.h" #include "hash/hash_slinger.h"
#include "vsha256.h" #include "vsha256.h"
#include "vtim.h" #include "vtim.h"
struct rush {
unsigned magic;
#define RUSH_MAGIC 0xa1af5f01
VTAILQ_HEAD(,req) reqs;
};
static const struct hash_slinger *hash; static const struct hash_slinger *hash;
static struct objhead *private_oh; static struct objhead *private_oh;
static void hsh_rush(struct worker *wrk, struct objhead *oh); static void hsh_rush1(struct worker *, struct objhead *, struct rush *, int);
static void hsh_rush2(struct worker *, struct rush *);
/*---------------------------------------------------------------------*/ /*---------------------------------------------------------------------*/
...@@ -258,6 +264,7 @@ HSH_Insert(struct worker *wrk, const void *digest, struct objcore *oc, ...@@ -258,6 +264,7 @@ HSH_Insert(struct worker *wrk, const void *digest, struct objcore *oc,
struct ban *ban) struct ban *ban)
{ {
struct objhead *oh; struct objhead *oh;
struct rush rush;
CHECK_OBJ_NOTNULL(wrk, WORKER_MAGIC); CHECK_OBJ_NOTNULL(wrk, WORKER_MAGIC);
AN(digest); AN(digest);
...@@ -266,6 +273,7 @@ HSH_Insert(struct worker *wrk, const void *digest, struct objcore *oc, ...@@ -266,6 +273,7 @@ HSH_Insert(struct worker *wrk, const void *digest, struct objcore *oc,
AN(oc->flags & OC_F_BUSY); AN(oc->flags & OC_F_BUSY);
AZ(oc->flags & OC_F_PRIVATE); AZ(oc->flags & OC_F_PRIVATE);
assert(oc->refcnt == 1); assert(oc->refcnt == 1);
INIT_OBJ(&rush, RUSH_MAGIC);
hsh_prealloc(wrk); hsh_prealloc(wrk);
...@@ -293,8 +301,9 @@ HSH_Insert(struct worker *wrk, const void *digest, struct objcore *oc, ...@@ -293,8 +301,9 @@ HSH_Insert(struct worker *wrk, const void *digest, struct objcore *oc,
VTAILQ_INSERT_HEAD(&oh->objcs, oc, hsh_list); VTAILQ_INSERT_HEAD(&oh->objcs, oc, hsh_list);
oc->flags &= ~OC_F_BUSY; oc->flags &= ~OC_F_BUSY;
if (!VTAILQ_EMPTY(&oh->waitinglist)) if (!VTAILQ_EMPTY(&oh->waitinglist))
hsh_rush(wrk, oh); hsh_rush1(wrk, oh, &rush, 0);
Lck_Unlock(&oh->mtx); Lck_Unlock(&oh->mtx);
hsh_rush2(wrk, &rush);
} }
/*--------------------------------------------------------------------- /*---------------------------------------------------------------------
...@@ -495,19 +504,21 @@ HSH_Lookup(struct req *req, struct objcore **ocp, struct objcore **bocp, ...@@ -495,19 +504,21 @@ HSH_Lookup(struct req *req, struct objcore **ocp, struct objcore **bocp,
} }
/*--------------------------------------------------------------------- /*---------------------------------------------------------------------
* Pick the req's we are going to rush from the waiting list
*/ */
static void static void
hsh_rush(struct worker *wrk, struct objhead *oh) hsh_rush1(struct worker *wrk, struct objhead *oh, struct rush *r, int all)
{ {
unsigned u; unsigned u;
struct req *req; struct req *req;
struct sess *sp;
CHECK_OBJ_NOTNULL(wrk, WORKER_MAGIC); CHECK_OBJ_NOTNULL(wrk, WORKER_MAGIC);
CHECK_OBJ_NOTNULL(oh, OBJHEAD_MAGIC); CHECK_OBJ_NOTNULL(oh, OBJHEAD_MAGIC);
CHECK_OBJ_NOTNULL(r, RUSH_MAGIC);
VTAILQ_INIT(&r->reqs);
Lck_AssertHeld(&oh->mtx); Lck_AssertHeld(&oh->mtx);
for (u = 0; u < cache_param->rush_exponent; u++) { for (u = 0; u < cache_param->rush_exponent || all; u++) {
req = VTAILQ_FIRST(&oh->waitinglist); req = VTAILQ_FIRST(&oh->waitinglist);
if (req == NULL) if (req == NULL)
break; break;
...@@ -515,14 +526,32 @@ hsh_rush(struct worker *wrk, struct objhead *oh) ...@@ -515,14 +526,32 @@ hsh_rush(struct worker *wrk, struct objhead *oh)
wrk->stats->busy_wakeup++; wrk->stats->busy_wakeup++;
AZ(req->wrk); AZ(req->wrk);
VTAILQ_REMOVE(&oh->waitinglist, req, w_list); VTAILQ_REMOVE(&oh->waitinglist, req, w_list);
DSL(DBG_WAITINGLIST, req->vsl->wid, "off waiting list"); VTAILQ_INSERT_TAIL(&r->reqs, req, w_list);
if (SES_Reschedule_Req(req)) { }
/* }
* In case of overloads, we ditch the entire
* waiting list. /*---------------------------------------------------------------------
* Rush req's that came from waiting list.
*/ */
static void
hsh_rush2(struct worker *wrk, struct rush *r)
{
struct req *req;
struct sess *sp;
CHECK_OBJ_NOTNULL(wrk, WORKER_MAGIC);
CHECK_OBJ_NOTNULL(r, RUSH_MAGIC);
while(!VTAILQ_EMPTY(&r->reqs)) {
req = VTAILQ_FIRST(&r->reqs);
CHECK_OBJ_NOTNULL(req, REQ_MAGIC);
VTAILQ_REMOVE(&r->reqs, req, w_list);
DSL(DBG_WAITINGLIST, req->vsl->wid, "off waiting list");
if (!SES_Reschedule_Req(req))
continue;
/* Couldn't schedule, ditch */
wrk->stats->busy_wakeup--; wrk->stats->busy_wakeup--;
while (1) {
wrk->stats->busy_killed++; wrk->stats->busy_killed++;
AN (req->vcl); AN (req->vcl);
VCL_Rel(&req->vcl); VCL_Rel(&req->vcl);
...@@ -531,16 +560,8 @@ hsh_rush(struct worker *wrk, struct objhead *oh) ...@@ -531,16 +560,8 @@ hsh_rush(struct worker *wrk, struct objhead *oh)
CNT_AcctLogCharge(wrk->stats, req); CNT_AcctLogCharge(wrk->stats, req);
Req_Release(req); Req_Release(req);
SES_Delete(sp, SC_OVERLOAD, NAN); SES_Delete(sp, SC_OVERLOAD, NAN);
req = VTAILQ_FIRST(&oh->waitinglist); DSL(DBG_WAITINGLIST, req->vsl->wid, "kill from waiting list");
if (req == NULL) (void)usleep(100000);
break;
CHECK_OBJ_NOTNULL(req, REQ_MAGIC);
VTAILQ_REMOVE(&oh->waitinglist, req, w_list);
DSL(DBG_WAITINGLIST, req->vsl->wid,
"kill from waiting list");
}
break;
}
} }
} }
...@@ -657,11 +678,13 @@ void ...@@ -657,11 +678,13 @@ void
HSH_Unbusy(struct worker *wrk, struct objcore *oc) HSH_Unbusy(struct worker *wrk, struct objcore *oc)
{ {
struct objhead *oh; struct objhead *oh;
struct rush rush;
CHECK_OBJ_NOTNULL(wrk, WORKER_MAGIC); CHECK_OBJ_NOTNULL(wrk, WORKER_MAGIC);
CHECK_OBJ_NOTNULL(oc, OBJCORE_MAGIC); CHECK_OBJ_NOTNULL(oc, OBJCORE_MAGIC);
oh = oc->objhead; oh = oc->objhead;
CHECK_OBJ(oh, OBJHEAD_MAGIC); CHECK_OBJ(oh, OBJHEAD_MAGIC);
INIT_OBJ(&rush, RUSH_MAGIC);
AN(oc->stobj->stevedore); AN(oc->stobj->stevedore);
AN(oc->flags & OC_F_BUSY); AN(oc->flags & OC_F_BUSY);
...@@ -684,10 +707,11 @@ HSH_Unbusy(struct worker *wrk, struct objcore *oc) ...@@ -684,10 +707,11 @@ HSH_Unbusy(struct worker *wrk, struct objcore *oc)
VTAILQ_INSERT_HEAD(&oh->objcs, oc, hsh_list); VTAILQ_INSERT_HEAD(&oh->objcs, oc, hsh_list);
oc->flags &= ~OC_F_BUSY; oc->flags &= ~OC_F_BUSY;
if (!VTAILQ_EMPTY(&oh->waitinglist)) if (!VTAILQ_EMPTY(&oh->waitinglist))
hsh_rush(wrk, oh); hsh_rush1(wrk, oh, &rush, 0);
Lck_Unlock(&oh->mtx); Lck_Unlock(&oh->mtx);
if (!(oc->flags & OC_F_PRIVATE)) if (!(oc->flags & OC_F_PRIVATE))
EXP_Insert(wrk, oc); EXP_Insert(wrk, oc);
hsh_rush2(wrk, &rush);
} }
/*==================================================================== /*====================================================================
...@@ -818,6 +842,7 @@ HSH_DerefObjCore(struct worker *wrk, struct objcore **ocp) ...@@ -818,6 +842,7 @@ HSH_DerefObjCore(struct worker *wrk, struct objcore **ocp)
{ {
struct objcore *oc; struct objcore *oc;
struct objhead *oh; struct objhead *oh;
struct rush rush;
unsigned r; unsigned r;
AN(ocp); AN(ocp);
...@@ -827,6 +852,7 @@ HSH_DerefObjCore(struct worker *wrk, struct objcore **ocp) ...@@ -827,6 +852,7 @@ HSH_DerefObjCore(struct worker *wrk, struct objcore **ocp)
CHECK_OBJ_NOTNULL(wrk, WORKER_MAGIC); CHECK_OBJ_NOTNULL(wrk, WORKER_MAGIC);
CHECK_OBJ_NOTNULL(oc, OBJCORE_MAGIC); CHECK_OBJ_NOTNULL(oc, OBJCORE_MAGIC);
assert(oc->refcnt > 0); assert(oc->refcnt > 0);
INIT_OBJ(&rush, RUSH_MAGIC);
oh = oc->objhead; oh = oc->objhead;
CHECK_OBJ_NOTNULL(oh, OBJHEAD_MAGIC); CHECK_OBJ_NOTNULL(oh, OBJHEAD_MAGIC);
...@@ -837,8 +863,9 @@ HSH_DerefObjCore(struct worker *wrk, struct objcore **ocp) ...@@ -837,8 +863,9 @@ HSH_DerefObjCore(struct worker *wrk, struct objcore **ocp)
if (!r) if (!r)
VTAILQ_REMOVE(&oh->objcs, oc, hsh_list); VTAILQ_REMOVE(&oh->objcs, oc, hsh_list);
if (!VTAILQ_EMPTY(&oh->waitinglist)) if (!VTAILQ_EMPTY(&oh->waitinglist))
hsh_rush(wrk, oh); hsh_rush1(wrk, oh, &rush, 0);
Lck_Unlock(&oh->mtx); Lck_Unlock(&oh->mtx);
hsh_rush2(wrk, &rush);
if (r != 0) if (r != 0)
return (r); return (r);
...@@ -861,10 +888,12 @@ int ...@@ -861,10 +888,12 @@ int
HSH_DerefObjHead(struct worker *wrk, struct objhead **poh) HSH_DerefObjHead(struct worker *wrk, struct objhead **poh)
{ {
struct objhead *oh; struct objhead *oh;
struct rush rush;
int r; int r;
CHECK_OBJ_NOTNULL(wrk, WORKER_MAGIC); CHECK_OBJ_NOTNULL(wrk, WORKER_MAGIC);
TAKE_OBJ_NOTNULL(oh, poh, OBJHEAD_MAGIC); TAKE_OBJ_NOTNULL(oh, poh, OBJHEAD_MAGIC);
INIT_OBJ(&rush, RUSH_MAGIC);
if (oh == private_oh) { if (oh == private_oh) {
assert(VTAILQ_EMPTY(&oh->waitinglist)); assert(VTAILQ_EMPTY(&oh->waitinglist));
...@@ -883,16 +912,14 @@ HSH_DerefObjHead(struct worker *wrk, struct objhead **poh) ...@@ -883,16 +912,14 @@ HSH_DerefObjHead(struct worker *wrk, struct objhead **poh)
* just make the hold the same ref's as objcore, that would * just make the hold the same ref's as objcore, that would
* confuse hashers. * confuse hashers.
*/ */
while (!VTAILQ_EMPTY(&oh->waitinglist)) {
Lck_Lock(&oh->mtx); Lck_Lock(&oh->mtx);
assert(oh->refcnt > 0); while (oh->refcnt == 1 && !VTAILQ_EMPTY(&oh->waitinglist)) {
r = oh->refcnt; hsh_rush1(wrk, oh, &rush, 1);
hsh_rush(wrk, oh);
Lck_Unlock(&oh->mtx); Lck_Unlock(&oh->mtx);
if (r > 1) hsh_rush2(wrk, &rush);
break; Lck_Lock(&oh->mtx);
usleep(100000);
} }
Lck_Unlock(&oh->mtx);
assert(oh->refcnt > 0); assert(oh->refcnt > 0);
r = hash->deref(oh); r = hash->deref(oh);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment