Commit 3479b0d8 authored by Poul-Henning Kamp's avatar Poul-Henning Kamp

Move VBO_*() for state management of objcores being filled to OBJ*().

parent de6e9b67
...@@ -680,10 +680,6 @@ double BAN_Time(const struct ban *ban); ...@@ -680,10 +680,6 @@ double BAN_Time(const struct ban *ban);
/* cache_busyobj.c */ /* cache_busyobj.c */
struct busyobj *VBO_GetBusyObj(struct worker *, const struct req *); struct busyobj *VBO_GetBusyObj(struct worker *, const struct req *);
void VBO_DerefBusyObj(struct worker *wrk, struct busyobj **busyobj); void VBO_DerefBusyObj(struct worker *wrk, struct busyobj **busyobj);
void VBO_extend(struct worker *, struct objcore *, struct boc *, ssize_t);
ssize_t VBO_waitlen(struct worker *, struct objcore *, struct boc *, ssize_t l);
void VBO_setstate(struct boc *, enum busyobj_state_e next);
void VBO_waitstate(struct boc *, enum busyobj_state_e want);
/* cache_cli.c [CLI] */ /* cache_cli.c [CLI] */
extern pthread_t cli_thread; extern pthread_t cli_thread;
...@@ -854,7 +850,10 @@ typedef int objiterate_f(void *priv, int flush, const void *ptr, ssize_t len); ...@@ -854,7 +850,10 @@ typedef int objiterate_f(void *priv, int flush, const void *ptr, ssize_t len);
int ObjIterate(struct worker *, struct objcore *, int ObjIterate(struct worker *, struct objcore *,
void *priv, objiterate_f *func); void *priv, objiterate_f *func);
int ObjGetSpace(struct worker *, struct objcore *, ssize_t *sz, uint8_t **ptr); int ObjGetSpace(struct worker *, struct objcore *, ssize_t *sz, uint8_t **ptr);
void ObjExtend(struct worker *, struct objcore *, ssize_t l); void ObjExtend(struct worker *, struct objcore *, struct boc *, ssize_t l);
ssize_t ObjWaitExtend(struct worker *, struct objcore *, struct boc *, ssize_t l);
void ObjSetState(struct boc *, enum busyobj_state_e next);
void ObjWaitState(struct boc *, enum busyobj_state_e want);
void ObjTrimStore(struct worker *, struct objcore *); void ObjTrimStore(struct worker *, struct objcore *);
void ObjTouch(struct worker *wrk, struct objcore *oc, double now); void ObjTouch(struct worker *wrk, struct objcore *oc, double now);
unsigned ObjGetXID(struct worker *, struct objcore *); unsigned ObjGetXID(struct worker *, struct objcore *);
......
...@@ -225,71 +225,3 @@ VBO_DerefBusyObj(struct worker *wrk, struct busyobj **pbo) ...@@ -225,71 +225,3 @@ VBO_DerefBusyObj(struct worker *wrk, struct busyobj **pbo)
vbo_Free(&bo); vbo_Free(&bo);
} }
void
VBO_extend(struct worker *wrk, struct objcore *oc, struct boc *boc,
ssize_t l)
{
CHECK_OBJ_NOTNULL(wrk, WORKER_MAGIC);
CHECK_OBJ_NOTNULL(oc, OBJCORE_MAGIC);
CHECK_OBJ_NOTNULL(boc, BOC_MAGIC);
if (l == 0)
return;
assert(l > 0);
Lck_Lock(&boc->mtx);
ObjExtend(wrk, oc, l);
AZ(pthread_cond_broadcast(&boc->cond));
Lck_Unlock(&boc->mtx);
}
ssize_t
VBO_waitlen(struct worker *wrk, struct objcore *oc, struct boc *boc,
ssize_t l)
{
ssize_t rv;
CHECK_OBJ_NOTNULL(wrk, WORKER_MAGIC);
CHECK_OBJ_NOTNULL(oc, OBJCORE_MAGIC);
CHECK_OBJ_NOTNULL(boc, BOC_MAGIC);
Lck_Lock(&boc->mtx);
rv = ObjGetLen(wrk, oc);
while (1) {
assert(l <= rv || boc->state == BOS_FAILED);
if (rv > l || boc->state >= BOS_FINISHED)
break;
(void)Lck_CondWait(&boc->cond, &boc->mtx, 0);
rv = ObjGetLen(wrk, oc);
}
Lck_Unlock(&boc->mtx);
return (rv);
}
void
VBO_setstate(struct boc *boc, enum busyobj_state_e next)
{
CHECK_OBJ_NOTNULL(boc, BOC_MAGIC);
// assert(bo->do_stream || next != BOS_STREAM);
assert(next > boc->state);
Lck_Lock(&boc->mtx);
boc->state = next;
AZ(pthread_cond_broadcast(&boc->cond));
Lck_Unlock(&boc->mtx);
}
void
VBO_waitstate(struct boc *boc, enum busyobj_state_e want)
{
CHECK_OBJ_NOTNULL(boc, BOC_MAGIC);
Lck_Lock(&boc->mtx);
while (1) {
if (boc->state >= want)
break;
(void)Lck_CondWait(&boc->cond, &boc->mtx, 0);
}
Lck_Unlock(&boc->mtx);
}
...@@ -689,7 +689,7 @@ ved_stripgzip(struct req *req, struct busyobj *bo) ...@@ -689,7 +689,7 @@ ved_stripgzip(struct req *req, struct busyobj *bo)
/* XXX: Is this really required ? */ /* XXX: Is this really required ? */
if (bo != NULL) if (bo != NULL)
VBO_waitstate(bo->boc, BOS_FINISHED); ObjWaitState(bo->boc, BOS_FINISHED);
AN(ObjCheckFlag(req->wrk, req->objcore, OF_GZIPED)); AN(ObjCheckFlag(req->wrk, req->objcore, OF_GZIPED));
......
...@@ -95,8 +95,10 @@ vfp_vep_callback(struct vfp_ctx *vc, void *priv, ssize_t l, enum vgz_flag flg) ...@@ -95,8 +95,10 @@ vfp_vep_callback(struct vfp_ctx *vc, void *priv, ssize_t l, enum vgz_flag flg)
VGZ_Obuf(vef->vgz, ptr, dl); VGZ_Obuf(vef->vgz, ptr, dl);
i = VGZ_Gzip(vef->vgz, &dp, &dl, flg); i = VGZ_Gzip(vef->vgz, &dp, &dl, flg);
VGZ_UpdateObj(vc, vef->vgz, VUA_UPDATE); VGZ_UpdateObj(vc, vef->vgz, VUA_UPDATE);
vef->tot += dl; if (dl > 0) {
VFP_Extend(vc, dl); vef->tot += dl;
VFP_Extend(vc, dl);
}
} while (i != VGZ_ERROR && } while (i != VGZ_ERROR &&
(!VGZ_IbufEmpty(vef->vgz) || VGZ_ObufFull(vef->vgz))); (!VGZ_IbufEmpty(vef->vgz) || VGZ_ObufFull(vef->vgz)));
assert(i == VGZ_ERROR || VGZ_IbufEmpty(vef->vgz)); assert(i == VGZ_ERROR || VGZ_IbufEmpty(vef->vgz));
......
...@@ -202,7 +202,7 @@ vbf_stp_mkbereq(const struct worker *wrk, struct busyobj *bo) ...@@ -202,7 +202,7 @@ vbf_stp_mkbereq(const struct worker *wrk, struct busyobj *bo)
bo->ws_bo = WS_Snapshot(bo->ws); bo->ws_bo = WS_Snapshot(bo->ws);
HTTP_Copy(bo->bereq, bo->bereq0); HTTP_Copy(bo->bereq, bo->bereq0);
VBO_setstate(bo->boc, BOS_REQ_DONE); ObjSetState(bo->boc, BOS_REQ_DONE);
return (F_STP_STARTFETCH); return (F_STP_STARTFETCH);
} }
...@@ -648,7 +648,7 @@ vbf_stp_fetch(struct worker *wrk, struct busyobj *bo) ...@@ -648,7 +648,7 @@ vbf_stp_fetch(struct worker *wrk, struct busyobj *bo)
if (bo->do_stream) { if (bo->do_stream) {
HSH_Unbusy(wrk, bo->fetch_objcore); HSH_Unbusy(wrk, bo->fetch_objcore);
VBO_setstate(bo->boc, BOS_STREAM); ObjSetState(bo->boc, BOS_STREAM);
} }
VSLb(bo->vsl, SLT_Fetch_Body, "%u %s %s", VSLb(bo->vsl, SLT_Fetch_Body, "%u %s %s",
...@@ -682,7 +682,7 @@ vbf_stp_fetch(struct worker *wrk, struct busyobj *bo) ...@@ -682,7 +682,7 @@ vbf_stp_fetch(struct worker *wrk, struct busyobj *bo)
give predictable backend reuse behavior for varnishtest */ give predictable backend reuse behavior for varnishtest */
VDI_Finish(bo->wrk, bo); VDI_Finish(bo->wrk, bo);
VBO_setstate(bo->boc, BOS_FINISHED); ObjSetState(bo->boc, BOS_FINISHED);
VSLb_ts_busyobj(bo, "BerespBody", W_TIM_real(wrk)); VSLb_ts_busyobj(bo, "BerespBody", W_TIM_real(wrk));
if (bo->stale_oc != NULL) if (bo->stale_oc != NULL)
EXP_Rearm(bo->stale_oc, bo->stale_oc->exp.t_origin, 0, 0, 0); EXP_Rearm(bo->stale_oc, bo->stale_oc->exp.t_origin, 0, 0, 0);
...@@ -736,7 +736,7 @@ vbf_stp_condfetch(struct worker *wrk, struct busyobj *bo) ...@@ -736,7 +736,7 @@ vbf_stp_condfetch(struct worker *wrk, struct busyobj *bo)
if (bo->do_stream) { if (bo->do_stream) {
HSH_Unbusy(wrk, bo->fetch_objcore); HSH_Unbusy(wrk, bo->fetch_objcore);
VBO_setstate(bo->boc, BOS_STREAM); ObjSetState(bo->boc, BOS_STREAM);
} }
if (ObjIterate(wrk, bo->stale_oc, bo, vbf_objiterator)) if (ObjIterate(wrk, bo->stale_oc, bo, vbf_objiterator))
...@@ -758,7 +758,7 @@ vbf_stp_condfetch(struct worker *wrk, struct busyobj *bo) ...@@ -758,7 +758,7 @@ vbf_stp_condfetch(struct worker *wrk, struct busyobj *bo)
give predictable backend reuse behavior for varnishtest */ give predictable backend reuse behavior for varnishtest */
VDI_Finish(bo->wrk, bo); VDI_Finish(bo->wrk, bo);
VBO_setstate(bo->boc, BOS_FINISHED); ObjSetState(bo->boc, BOS_FINISHED);
VSLb_ts_busyobj(bo, "BerespBody", W_TIM_real(wrk)); VSLb_ts_busyobj(bo, "BerespBody", W_TIM_real(wrk));
return (F_STP_DONE); return (F_STP_DONE);
} }
...@@ -858,7 +858,7 @@ vbf_stp_error(struct worker *wrk, struct busyobj *bo) ...@@ -858,7 +858,7 @@ vbf_stp_error(struct worker *wrk, struct busyobj *bo)
VSB_delete(synth_body); VSB_delete(synth_body);
HSH_Unbusy(wrk, bo->fetch_objcore); HSH_Unbusy(wrk, bo->fetch_objcore);
VBO_setstate(bo->boc, BOS_FINISHED); ObjSetState(bo->boc, BOS_FINISHED);
return (F_STP_DONE); return (F_STP_DONE);
} }
...@@ -881,7 +881,7 @@ vbf_stp_fail(struct worker *wrk, struct busyobj *bo) ...@@ -881,7 +881,7 @@ vbf_stp_fail(struct worker *wrk, struct busyobj *bo)
bo->fetch_objcore->exp.t_origin, 0, 0, 0); bo->fetch_objcore->exp.t_origin, 0, 0, 0);
} }
wrk->stats->fetch_failed++; wrk->stats->fetch_failed++;
VBO_setstate(bo->boc, BOS_FAILED); ObjSetState(bo->boc, BOS_FAILED);
return (F_STP_DONE); return (F_STP_DONE);
} }
...@@ -1031,9 +1031,9 @@ VBF_Fetch(struct worker *wrk, struct req *req, struct objcore *oc, ...@@ -1031,9 +1031,9 @@ VBF_Fetch(struct worker *wrk, struct req *req, struct objcore *oc,
} else { } else {
bo_fetch = NULL; /* ref transferred to fetch thread */ bo_fetch = NULL; /* ref transferred to fetch thread */
if (mode == VBF_BACKGROUND) { if (mode == VBF_BACKGROUND) {
VBO_waitstate(bo->boc, BOS_REQ_DONE); ObjWaitState(bo->boc, BOS_REQ_DONE);
} else { } else {
VBO_waitstate(bo->boc, BOS_STREAM); ObjWaitState(bo->boc, BOS_STREAM);
if (bo->boc->state == BOS_FAILED) { if (bo->boc->state == BOS_FAILED) {
AN((oc->flags & OC_F_FAILED)); AN((oc->flags & OC_F_FAILED));
} else { } else {
......
...@@ -98,7 +98,7 @@ VFP_Extend(const struct vfp_ctx *vc, ssize_t sz) ...@@ -98,7 +98,7 @@ VFP_Extend(const struct vfp_ctx *vc, ssize_t sz)
{ {
CHECK_OBJ_NOTNULL(vc, VFP_CTX_MAGIC); CHECK_OBJ_NOTNULL(vc, VFP_CTX_MAGIC);
VBO_extend(vc->wrk, vc->oc, vc->bo->boc, sz); ObjExtend(vc->wrk, vc->oc, vc->bo->boc, sz);
} }
/********************************************************************** /**********************************************************************
......
...@@ -107,15 +107,83 @@ ObjGetSpace(struct worker *wrk, struct objcore *oc, ssize_t *sz, uint8_t **ptr) ...@@ -107,15 +107,83 @@ ObjGetSpace(struct worker *wrk, struct objcore *oc, ssize_t *sz, uint8_t **ptr)
*/ */
void void
ObjExtend(struct worker *wrk, struct objcore *oc, ssize_t l) ObjExtend(struct worker *wrk, struct objcore *oc, struct boc *boc, ssize_t l)
{ {
const struct obj_methods *om = obj_getmethods(oc); const struct obj_methods *om = obj_getmethods(oc);
CHECK_OBJ_NOTNULL(wrk, WORKER_MAGIC); CHECK_OBJ_NOTNULL(wrk, WORKER_MAGIC);
AN(om->objextend);
assert(l > 0); assert(l > 0);
AN(om->objextend); if (boc != NULL) {
om->objextend(wrk, oc, l); CHECK_OBJ_NOTNULL(boc, BOC_MAGIC);
Lck_Lock(&boc->mtx);
om->objextend(wrk, oc, l);
AZ(pthread_cond_broadcast(&boc->cond));
Lck_Unlock(&boc->mtx);
} else {
om->objextend(wrk, oc, l);
}
}
/*====================================================================
*/
ssize_t
ObjWaitExtend(struct worker *wrk, struct objcore *oc, struct boc *boc,
ssize_t l)
{
ssize_t rv;
CHECK_OBJ_NOTNULL(wrk, WORKER_MAGIC);
CHECK_OBJ_NOTNULL(oc, OBJCORE_MAGIC);
CHECK_OBJ_NOTNULL(boc, BOC_MAGIC);
Lck_Lock(&boc->mtx);
rv = ObjGetLen(wrk, oc);
while (1) {
assert(l <= rv || boc->state == BOS_FAILED);
if (rv > l || boc->state >= BOS_FINISHED)
break;
(void)Lck_CondWait(&boc->cond, &boc->mtx, 0);
rv = ObjGetLen(wrk, oc);
}
Lck_Unlock(&boc->mtx);
return (rv);
}
/*====================================================================
*/
void
ObjSetState(struct boc *boc, enum busyobj_state_e next)
{
CHECK_OBJ_NOTNULL(boc, BOC_MAGIC);
// assert(bo->do_stream || next != BOS_STREAM);
assert(next > boc->state);
Lck_Lock(&boc->mtx);
boc->state = next;
AZ(pthread_cond_broadcast(&boc->cond));
Lck_Unlock(&boc->mtx);
}
/*====================================================================
*/
void
ObjWaitState(struct boc *boc, enum busyobj_state_e want)
{
CHECK_OBJ_NOTNULL(boc, BOC_MAGIC);
Lck_Lock(&boc->mtx);
while (1) {
if (boc->state >= want)
break;
(void)Lck_CondWait(&boc->cond, &boc->mtx, 0);
}
Lck_Unlock(&boc->mtx);
} }
/*==================================================================== /*====================================================================
......
...@@ -254,7 +254,7 @@ VRB_Cache(struct req *req, ssize_t maxsize) ...@@ -254,7 +254,7 @@ VRB_Cache(struct req *req, ssize_t maxsize)
req->acct.req_bodybytes += l; req->acct.req_bodybytes += l;
if (yet >= l) if (yet >= l)
yet -= l; yet -= l;
ObjExtend(req->wrk, req->body_oc, l); ObjExtend(req->wrk, req->body_oc, NULL, l);
} }
} while (vfps == VFP_OK); } while (vfps == VFP_OK);
......
...@@ -202,7 +202,7 @@ cnt_deliver(struct worker *wrk, struct req *req) ...@@ -202,7 +202,7 @@ cnt_deliver(struct worker *wrk, struct req *req)
if (req->esi_level == 0 && bo->boc->state == BOS_FINISHED) { if (req->esi_level == 0 && bo->boc->state == BOS_FINISHED) {
VBO_DerefBusyObj(wrk, &bo); VBO_DerefBusyObj(wrk, &bo);
} else if (!bo->do_stream) { } else if (!bo->do_stream) {
VBO_waitstate(bo->boc, BOS_FINISHED); ObjWaitState(bo->boc, BOS_FINISHED);
VBO_DerefBusyObj(wrk, &bo); VBO_DerefBusyObj(wrk, &bo);
} }
} }
...@@ -216,7 +216,7 @@ cnt_deliver(struct worker *wrk, struct req *req) ...@@ -216,7 +216,7 @@ cnt_deliver(struct worker *wrk, struct req *req)
if (req->objcore->flags & (OC_F_PRIVATE | OC_F_PASS)) { if (req->objcore->flags & (OC_F_PRIVATE | OC_F_PASS)) {
if (bo != NULL) if (bo != NULL)
VBO_waitstate(bo->boc, BOS_FINISHED); ObjWaitState(bo->boc, BOS_FINISHED);
ObjSlim(wrk, req->objcore); ObjSlim(wrk, req->objcore);
} }
...@@ -296,7 +296,7 @@ cnt_synth(struct worker *wrk, struct req *req) ...@@ -296,7 +296,7 @@ cnt_synth(struct worker *wrk, struct req *req)
sz = szl; sz = szl;
if (ObjGetSpace(wrk, req->objcore, &sz, &ptr) && sz >= szl) { if (ObjGetSpace(wrk, req->objcore, &sz, &ptr) && sz >= szl) {
memcpy(ptr, VSB_data(synth_body), szl); memcpy(ptr, VSB_data(synth_body), szl);
ObjExtend(wrk, req->objcore, szl); ObjExtend(wrk, req->objcore, NULL, szl);
} else } else
szl = -1; szl = -1;
} }
......
...@@ -257,7 +257,7 @@ sml_iterator(struct worker *wrk, struct objcore *oc, ...@@ -257,7 +257,7 @@ sml_iterator(struct worker *wrk, struct objcore *oc,
while (1) { while (1) {
ol = len; ol = len;
nl = VBO_waitlen(wrk, oc, bo->boc, ol); nl = ObjWaitExtend(wrk, oc, bo->boc, ol);
if (bo->boc->state == BOS_FAILED) { if (bo->boc->state == BOS_FAILED) {
ret = -1; ret = -1;
break; break;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment