Commit f2fdfead authored by Nils Goroll's avatar Nils Goroll

Rush waiters specifically

Waiters hang off the objhead, so the simple fact that we manipulate
some objcore on the same objhead does not imply that there are
relevant news to the waiters.

We now are more specific about when we wake up waiters and how many.

Never for
- expiry / ban / purge / nuke
- stale objects we just happen to reference

We do want to rush when
- fetching a busy object failed or got aborted: We should wake up one
  waiter to re-attempt the fetch
- we succeeded fetching a busy object or delivering: Rush more waiters
  by policy

This fixes the general issue described in #1928 that we should not
rush when expiring.

Closes #1945
parent 9dea74f7
......@@ -265,7 +265,7 @@ ban_lurker_test_ban(struct worker *wrk, struct vsl_log *vsl, struct ban *bt,
if (i)
ObjSendEvent(wrk, oc, OEV_BANCHG);
}
(void)HSH_DerefObjCore(wrk, &oc);
(void)HSH_DerefObjCore(wrk, &oc, 0);
}
}
......
......@@ -171,7 +171,8 @@ VBO_ReleaseBusyObj(struct worker *wrk, struct busyobj **pbo)
if (bo->fetch_objcore != NULL) {
AN(wrk);
(void)HSH_DerefObjCore(wrk, &bo->fetch_objcore);
(void)HSH_DerefObjCore(wrk, &bo->fetch_objcore,
HSH_RUSH_POLICY);
}
VCL_Rel(&bo->vcl);
......
......@@ -194,7 +194,7 @@ exp_inbox(struct exp_priv *ep, struct objcore *oc, unsigned flags)
assert(oc->refcnt > 0);
AZ(oc->exp_flags);
ObjSendEvent(ep->wrk, oc, OEV_EXPIRE);
(void)HSH_DerefObjCore(ep->wrk, &oc);
(void)HSH_DerefObjCore(ep->wrk, &oc, 0);
return;
}
......@@ -271,7 +271,7 @@ exp_expire(struct exp_priv *ep, double now)
VSLb(&ep->vsl, SLT_ExpKill, "EXP_Expired x=%u t=%.0f",
ObjGetXID(ep->wrk, oc), EXP_Ttl(NULL, oc) - now);
ObjSendEvent(ep->wrk, oc, OEV_EXPIRE);
(void)HSH_DerefObjCore(ep->wrk, &oc);
(void)HSH_DerefObjCore(ep->wrk, &oc, 0);
}
return (0);
}
......
......@@ -952,7 +952,7 @@ vbf_fetch_thread(struct worker *wrk, void *priv)
CHECK_OBJ_NOTNULL(bo->stale_oc, OBJCORE_MAGIC);
/* We don't want the oc/stevedore ops in fetching thread */
if (!ObjCheckFlag(wrk, bo->stale_oc, OF_IMSCAND))
(void)HSH_DerefObjCore(wrk, &bo->stale_oc);
(void)HSH_DerefObjCore(wrk, &bo->stale_oc, 0);
}
#endif
......@@ -984,7 +984,7 @@ vbf_fetch_thread(struct worker *wrk, void *priv)
// AZ(bo->fetch_objcore->boc); // XXX
if (bo->stale_oc != NULL)
(void)HSH_DerefObjCore(wrk, &bo->stale_oc);
(void)HSH_DerefObjCore(wrk, &bo->stale_oc, 0);
wrk->vsl = NULL;
HSH_DerefBoc(wrk, bo->fetch_objcore);
......@@ -1061,7 +1061,7 @@ VBF_Fetch(struct worker *wrk, struct req *req, struct objcore *oc,
wrk->stats->fetch_no_thread++;
(void)vbf_stp_fail(req->wrk, bo);
if (bo->stale_oc != NULL)
(void)HSH_DerefObjCore(wrk, &bo->stale_oc);
(void)HSH_DerefObjCore(wrk, &bo->stale_oc, 0);
HSH_DerefBoc(wrk, oc);
SES_Rel(bo->sp);
VBO_ReleaseBusyObj(wrk, &bo);
......@@ -1083,6 +1083,6 @@ VBF_Fetch(struct worker *wrk, struct req *req, struct objcore *oc,
assert(oc->boc == boc);
HSH_DerefBoc(wrk, oc);
if (mode == VBF_BACKGROUND)
(void)HSH_DerefObjCore(wrk, &oc);
(void)HSH_DerefObjCore(wrk, &oc, HSH_RUSH_POLICY);
THR_SetBusyobj(NULL);
}
......@@ -625,7 +625,7 @@ double keep)
oc = ocp[n];
CHECK_OBJ_NOTNULL(oc, OBJCORE_MAGIC);
EXP_Rearm(oc, now, ttl, grace, keep);
(void)HSH_DerefObjCore(wrk, &oc);
(void)HSH_DerefObjCore(wrk, &oc, 0);
}
} while (more);
WS_Release(wrk->aws, 0);
......@@ -843,7 +843,7 @@ HSH_DerefBoc(struct worker *wrk, struct objcore *oc)
*/
int
HSH_DerefObjCore(struct worker *wrk, struct objcore **ocp)
HSH_DerefObjCore(struct worker *wrk, struct objcore **ocp, int rushmax)
{
struct objcore *oc;
struct objhead *oh;
......@@ -868,7 +868,7 @@ HSH_DerefObjCore(struct worker *wrk, struct objcore **ocp)
if (!r)
VTAILQ_REMOVE(&oh->objcs, oc, hsh_list);
if (!VTAILQ_EMPTY(&oh->waitinglist))
hsh_rush1(wrk, oh, &rush, HSH_RUSH_POLICY);
hsh_rush1(wrk, oh, &rush, rushmax);
Lck_Unlock(&oh->mtx);
hsh_rush2(wrk, &rush);
if (r != 0)
......
......@@ -67,7 +67,7 @@ vrb_pull(struct req *req, ssize_t maxsize, objiterate_f *func, void *priv)
if (VFP_Open(vfc) < 0) {
req->req_body_status = REQ_BODY_FAIL;
HSH_DerefBoc(req->wrk, req->body_oc);
AZ(HSH_DerefObjCore(req->wrk, &req->body_oc));
AZ(HSH_DerefObjCore(req->wrk, &req->body_oc, 0));
return (-1);
}
......@@ -108,7 +108,7 @@ vrb_pull(struct req *req, ssize_t maxsize, objiterate_f *func, void *priv)
VSLb_ts_req(req, "ReqBody", VTIM_real());
if (func != NULL) {
HSH_DerefBoc(req->wrk, req->body_oc);
AZ(HSH_DerefObjCore(req->wrk, &req->body_oc));
AZ(HSH_DerefObjCore(req->wrk, &req->body_oc, 0));
if (vfps != VFP_END) {
req->req_body_status = REQ_BODY_FAIL;
if (r == 0)
......@@ -123,7 +123,7 @@ vrb_pull(struct req *req, ssize_t maxsize, objiterate_f *func, void *priv)
if (vfps != VFP_END) {
req->req_body_status = REQ_BODY_FAIL;
AZ(HSH_DerefObjCore(req->wrk, &req->body_oc));
AZ(HSH_DerefObjCore(req->wrk, &req->body_oc, 0));
return (-1);
}
......@@ -239,7 +239,7 @@ VRB_Free(struct req *req)
CHECK_OBJ_NOTNULL(req, REQ_MAGIC);
if (req->body_oc != NULL)
AZ(HSH_DerefObjCore(req->wrk, &req->body_oc));
AZ(HSH_DerefObjCore(req->wrk, &req->body_oc, 0));
}
/*----------------------------------------------------------------------
......
......@@ -109,7 +109,7 @@ cnt_deliver(struct worker *wrk, struct req *req)
wrk->handling = VCL_RET_DELIVER;
if (wrk->handling != VCL_RET_DELIVER) {
(void)HSH_DerefObjCore(wrk, &req->objcore);
(void)HSH_DerefObjCore(wrk, &req->objcore, HSH_RUSH_POLICY);
http_Teardown(req->resp);
switch (wrk->handling) {
......@@ -216,7 +216,7 @@ cnt_synth(struct worker *wrk, struct req *req)
VSLb(req->vsl, SLT_Error, "Could not get storage");
req->doclose = SC_OVERLOAD;
VSLb_ts_req(req, "Resp", W_TIM_real(wrk));
(void)HSH_DerefObjCore(wrk, &req->objcore);
(void)HSH_DerefObjCore(wrk, &req->objcore, 1);
http_Teardown(req->resp);
return (REQ_FSM_DONE);
}
......@@ -311,7 +311,7 @@ cnt_transmit(struct worker *wrk, struct req *req)
if (boc != NULL)
HSH_DerefBoc(wrk, req->objcore);
(void)HSH_DerefObjCore(wrk, &req->objcore);
(void)HSH_DerefObjCore(wrk, &req->objcore, HSH_RUSH_POLICY);
http_Teardown(req->resp);
return (REQ_FSM_DONE);
......@@ -335,7 +335,7 @@ cnt_fetch(struct worker *wrk, struct req *req)
if (req->objcore->flags & OC_F_FAILED) {
req->err_code = 503;
req->req_step = R_STP_SYNTH;
(void)HSH_DerefObjCore(wrk, &req->objcore);
(void)HSH_DerefObjCore(wrk, &req->objcore, 1);
AZ(req->objcore);
return (REQ_FSM_MORE);
}
......@@ -426,7 +426,8 @@ cnt_lookup(struct worker *wrk, struct req *req)
req->stale_oc = oc;
req->req_step = R_STP_MISS;
} else {
(void)HSH_DerefObjCore(wrk, &req->objcore);
(void)HSH_DerefObjCore(wrk, &req->objcore,
HSH_RUSH_POLICY);
/*
* We don't have a busy object, so treat this
* like a pass
......@@ -453,10 +454,10 @@ cnt_lookup(struct worker *wrk, struct req *req)
}
/* Drop our object, we won't need it */
(void)HSH_DerefObjCore(wrk, &req->objcore);
(void)HSH_DerefObjCore(wrk, &req->objcore, HSH_RUSH_POLICY);
if (busy != NULL) {
(void)HSH_DerefObjCore(wrk, &busy);
(void)HSH_DerefObjCore(wrk, &busy, 0);
VRY_Clear(req);
}
......@@ -482,7 +483,7 @@ cnt_miss(struct worker *wrk, struct req *req)
wrk->stats->cache_miss++;
VBF_Fetch(wrk, req, req->objcore, req->stale_oc, VBF_NORMAL);
if (req->stale_oc != NULL)
(void)HSH_DerefObjCore(wrk, &req->stale_oc);
(void)HSH_DerefObjCore(wrk, &req->stale_oc, 0);
req->req_step = R_STP_FETCH;
return (REQ_FSM_MORE);
case VCL_RET_SYNTH:
......@@ -499,8 +500,8 @@ cnt_miss(struct worker *wrk, struct req *req)
}
VRY_Clear(req);
if (req->stale_oc != NULL)
(void)HSH_DerefObjCore(wrk, &req->stale_oc);
AZ(HSH_DerefObjCore(wrk, &req->objcore));
(void)HSH_DerefObjCore(wrk, &req->stale_oc, 0);
AZ(HSH_DerefObjCore(wrk, &req->objcore, 1));
return (REQ_FSM_MORE);
}
......@@ -766,7 +767,7 @@ cnt_purge(struct worker *wrk, struct req *req)
HSH_Purge(wrk, boc->objhead, 0, 0, 0);
AZ(HSH_DerefObjCore(wrk, &boc));
AZ(HSH_DerefObjCore(wrk, &boc, 1));
VCL_purge_method(req->vcl, wrk, req, NULL, NULL);
switch (wrk->handling) {
......
......@@ -107,9 +107,9 @@ struct objhead {
void HSH_Fail(struct objcore *);
void HSH_Unbusy(struct worker *, struct objcore *);
void HSH_DeleteObjHead(struct worker *, struct objhead *oh);
int HSH_DerefObjHead(struct worker *, struct objhead **poh);
int HSH_DerefObjCore(struct worker *, struct objcore **ocp);
void HSH_DeleteObjHead(struct worker *, struct objhead *);
int HSH_DerefObjHead(struct worker *, struct objhead **);
int HSH_DerefObjCore(struct worker *, struct objcore **, int);
#define HSH_RUSH_POLICY -1
#define HSH_RUSH_ALL INT_MAX
#endif /* VARNISH_CACHE_CHILD */
......
......@@ -198,6 +198,6 @@ LRU_NukeOne(struct worker *wrk, struct lru *lru)
ObjSlim(wrk, oc);
VSLb(wrk->vsl, SLT_ExpKill, "LRU x=%u", ObjGetXID(wrk, oc));
(void)HSH_DerefObjCore(wrk, &oc); // Ref from HSH_Snipe
(void)HSH_DerefObjCore(wrk, &oc, 0); // Ref from HSH_Snipe
return (1);
}
......@@ -173,7 +173,7 @@ smp_load_seg(struct worker *wrk, const struct smp_sc *sc,
HSH_Insert(wrk, so->hash, oc, ban);
AN(oc->ban);
HSH_DerefBoc(wrk, oc); // XXX Keep it an stream resurrection?
(void)HSH_DerefObjCore(wrk, &oc);
(void)HSH_DerefObjCore(wrk, &oc, HSH_RUSH_POLICY);
wrk->stats->n_vampireobject++;
}
Pool_Sumstat(wrk);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment