Commit 9f8dba34 authored by Poul-Henning Kamp's avatar Poul-Henning Kamp

Start the divorce that separates struct busyobj into a "busy objcore"

(struct boc) and "the backend fetch context" (struct bereq).
parent 559e1cf5
...@@ -374,6 +374,33 @@ struct storeobj { ...@@ -374,6 +374,33 @@ struct storeobj {
uintptr_t priv2; uintptr_t priv2;
}; };
/* Busy Objcore structure --------------------------------------------
*
*/
/*
* The macro-states we expose outside the fetch code
*/
enum busyobj_state_e {
BOS_INVALID = 0, /* don't touch (yet) */
BOS_REQ_DONE, /* beresp.* can be examined */
BOS_STREAM, /* beresp.* can be examined */
BOS_FINISHED, /* object is complete */
BOS_FAILED, /* something went wrong */
};
struct boc {
unsigned magic;
#define BOC_MAGIC 0x70c98476
unsigned refcount;
struct lock mtx;
pthread_cond_t cond;
void *stevedore_priv;
enum busyobj_state_e state;
uint8_t *vary;
};
/* Object core structure --------------------------------------------- /* Object core structure ---------------------------------------------
* Objects have sideways references in the binary heap and the LRU list * Objects have sideways references in the binary heap and the LRU list
* and we want to avoid paging in a lot of objects just to move them up * and we want to avoid paging in a lot of objects just to move them up
...@@ -425,17 +452,6 @@ struct objcore { ...@@ -425,17 +452,6 @@ struct objcore {
* streaming delivery will make use of. * streaming delivery will make use of.
*/ */
/*
* The macro-states we expose outside the fetch code
*/
enum busyobj_state_e {
BOS_INVALID = 0, /* don't touch (yet) */
BOS_REQ_DONE, /* beresp.* can be examined */
BOS_STREAM, /* beresp.* can be examined */
BOS_FINISHED, /* object is complete */
BOS_FAILED, /* something went wrong */
};
enum director_state_e { enum director_state_e {
DIR_S_NULL = 0, DIR_S_NULL = 0,
DIR_S_HDRS = 1, DIR_S_HDRS = 1,
...@@ -445,25 +461,21 @@ enum director_state_e { ...@@ -445,25 +461,21 @@ enum director_state_e {
struct busyobj { struct busyobj {
unsigned magic; unsigned magic;
#define BUSYOBJ_MAGIC 0x23b95567 #define BUSYOBJ_MAGIC 0x23b95567
struct lock mtx;
pthread_cond_t cond; struct boc boc[1];
char *end; char *end;
/* /*
* All fields from refcount and down are zeroed when the busyobj * All fields from refcount and down are zeroed when the busyobj
* is recycled. * is recycled.
*/ */
unsigned refcount;
int retries; int retries;
struct req *req; struct req *req;
struct worker *wrk; struct worker *wrk;
uint8_t *vary;
struct vfp_ctx vfc[1]; struct vfp_ctx vfc[1];
enum busyobj_state_e state;
struct ws ws[1]; struct ws ws[1];
char *ws_bo; char *ws_bo;
struct http *bereq0; struct http *bereq0;
...@@ -500,8 +512,6 @@ struct busyobj { ...@@ -500,8 +512,6 @@ struct busyobj {
struct vsl_log vsl[1]; struct vsl_log vsl[1];
void *stevedore_priv;
uint8_t digest[DIGEST_LEN]; uint8_t digest[DIGEST_LEN];
struct vrt_privs privs[1]; struct vrt_privs privs[1];
}; };
......
...@@ -68,8 +68,9 @@ vbo_New(void) ...@@ -68,8 +68,9 @@ vbo_New(void)
XXXAN(bo); XXXAN(bo);
bo->magic = BUSYOBJ_MAGIC; bo->magic = BUSYOBJ_MAGIC;
bo->end = (char *)bo + sz; bo->end = (char *)bo + sz;
Lck_New(&bo->mtx, lck_busyobj); INIT_OBJ(bo->boc, BOC_MAGIC);
AZ(pthread_cond_init(&bo->cond, NULL)); Lck_New(&bo->boc->mtx, lck_busyobj);
AZ(pthread_cond_init(&bo->boc->cond, NULL));
return (bo); return (bo);
} }
...@@ -83,9 +84,9 @@ vbo_Free(struct busyobj **bop) ...@@ -83,9 +84,9 @@ vbo_Free(struct busyobj **bop)
*bop = NULL; *bop = NULL;
CHECK_OBJ_NOTNULL(bo, BUSYOBJ_MAGIC); CHECK_OBJ_NOTNULL(bo, BUSYOBJ_MAGIC);
AZ(bo->htc); AZ(bo->htc);
AZ(bo->refcount); AZ(bo->boc->refcount);
AZ(pthread_cond_destroy(&bo->cond)); AZ(pthread_cond_destroy(&bo->boc->cond));
Lck_Delete(&bo->mtx); Lck_Delete(&bo->boc->mtx);
MPL_Free(vbopool, bo); MPL_Free(vbopool, bo);
} }
...@@ -101,9 +102,9 @@ VBO_GetBusyObj(struct worker *wrk, const struct req *req) ...@@ -101,9 +102,9 @@ VBO_GetBusyObj(struct worker *wrk, const struct req *req)
bo = vbo_New(); bo = vbo_New();
CHECK_OBJ_NOTNULL(bo, BUSYOBJ_MAGIC); CHECK_OBJ_NOTNULL(bo, BUSYOBJ_MAGIC);
AZ(bo->refcount); AZ(bo->boc->refcount);
bo->refcount = 1; bo->boc->refcount = 1;
p = (void*)(bo + 1); p = (void*)(bo + 1);
p = (void*)PRNDUP(p); p = (void*)PRNDUP(p);
...@@ -171,14 +172,14 @@ VBO_DerefBusyObj(struct worker *wrk, struct busyobj **pbo) ...@@ -171,14 +172,14 @@ VBO_DerefBusyObj(struct worker *wrk, struct busyobj **pbo)
CHECK_OBJ_NOTNULL(oc, OBJCORE_MAGIC); CHECK_OBJ_NOTNULL(oc, OBJCORE_MAGIC);
CHECK_OBJ_NOTNULL(oc->objhead, OBJHEAD_MAGIC); CHECK_OBJ_NOTNULL(oc->objhead, OBJHEAD_MAGIC);
Lck_Lock(&oc->objhead->mtx); Lck_Lock(&oc->objhead->mtx);
assert(bo->refcount > 0); assert(bo->boc->refcount > 0);
r = --bo->refcount; r = --bo->boc->refcount;
Lck_Unlock(&oc->objhead->mtx); Lck_Unlock(&oc->objhead->mtx);
} else { } else {
Lck_Lock(&bo->mtx); Lck_Lock(&bo->boc->mtx);
assert(bo->refcount > 0); assert(bo->boc->refcount > 0);
r = --bo->refcount; r = --bo->boc->refcount;
Lck_Unlock(&bo->mtx); Lck_Unlock(&bo->boc->mtx);
} }
if (r) if (r)
...@@ -211,11 +212,16 @@ VBO_DerefBusyObj(struct worker *wrk, struct busyobj **pbo) ...@@ -211,11 +212,16 @@ VBO_DerefBusyObj(struct worker *wrk, struct busyobj **pbo)
VCL_Rel(&bo->vcl); VCL_Rel(&bo->vcl);
if (bo->vary != NULL) AZ(bo->boc->stevedore_priv);
free(bo->vary); AZ(bo->boc->refcount);
bo->boc->state = BOS_INVALID;
if (bo->boc->vary != NULL) {
free(bo->boc->vary);
bo->boc->vary = NULL;
}
memset(&bo->refcount, 0, memset(&bo->retries, 0,
sizeof *bo - offsetof(struct busyobj, refcount)); sizeof *bo - offsetof(struct busyobj, retries));
vbo_Free(&bo); vbo_Free(&bo);
} }
...@@ -231,10 +237,10 @@ VBO_extend(struct worker *wrk, struct objcore *oc, struct busyobj *bo, ...@@ -231,10 +237,10 @@ VBO_extend(struct worker *wrk, struct objcore *oc, struct busyobj *bo,
if (l == 0) if (l == 0)
return; return;
assert(l > 0); assert(l > 0);
Lck_Lock(&bo->mtx); Lck_Lock(&bo->boc->mtx);
ObjExtend(wrk, oc, l); ObjExtend(wrk, oc, l);
AZ(pthread_cond_broadcast(&bo->cond)); AZ(pthread_cond_broadcast(&bo->boc->cond));
Lck_Unlock(&bo->mtx); Lck_Unlock(&bo->boc->mtx);
} }
ssize_t ssize_t
...@@ -246,16 +252,16 @@ VBO_waitlen(struct worker *wrk, struct objcore *oc, struct busyobj *bo, ...@@ -246,16 +252,16 @@ VBO_waitlen(struct worker *wrk, struct objcore *oc, struct busyobj *bo,
CHECK_OBJ_NOTNULL(wrk, WORKER_MAGIC); CHECK_OBJ_NOTNULL(wrk, WORKER_MAGIC);
CHECK_OBJ_NOTNULL(bo, BUSYOBJ_MAGIC); CHECK_OBJ_NOTNULL(bo, BUSYOBJ_MAGIC);
xxxassert(bo->fetch_objcore == oc); xxxassert(bo->fetch_objcore == oc);
Lck_Lock(&bo->mtx); Lck_Lock(&bo->boc->mtx);
rv = ObjGetLen(wrk, oc); rv = ObjGetLen(wrk, oc);
while (1) { while (1) {
assert(l <= rv || bo->state == BOS_FAILED); assert(l <= rv || bo->boc->state == BOS_FAILED);
if (rv > l || bo->state >= BOS_FINISHED) if (rv > l || bo->boc->state >= BOS_FINISHED)
break; break;
(void)Lck_CondWait(&bo->cond, &bo->mtx, 0); (void)Lck_CondWait(&bo->boc->cond, &bo->boc->mtx, 0);
rv = ObjGetLen(wrk, oc); rv = ObjGetLen(wrk, oc);
} }
Lck_Unlock(&bo->mtx); Lck_Unlock(&bo->boc->mtx);
return (rv); return (rv);
} }
...@@ -264,21 +270,21 @@ VBO_setstate(struct busyobj *bo, enum busyobj_state_e next) ...@@ -264,21 +270,21 @@ VBO_setstate(struct busyobj *bo, enum busyobj_state_e next)
{ {
CHECK_OBJ_NOTNULL(bo, BUSYOBJ_MAGIC); CHECK_OBJ_NOTNULL(bo, BUSYOBJ_MAGIC);
assert(bo->do_stream || next != BOS_STREAM); assert(bo->do_stream || next != BOS_STREAM);
assert(next > bo->state); assert(next > bo->boc->state);
Lck_Lock(&bo->mtx); Lck_Lock(&bo->boc->mtx);
bo->state = next; bo->boc->state = next;
AZ(pthread_cond_broadcast(&bo->cond)); AZ(pthread_cond_broadcast(&bo->boc->cond));
Lck_Unlock(&bo->mtx); Lck_Unlock(&bo->boc->mtx);
} }
void void
VBO_waitstate(struct busyobj *bo, enum busyobj_state_e want) VBO_waitstate(struct busyobj *bo, enum busyobj_state_e want)
{ {
Lck_Lock(&bo->mtx); Lck_Lock(&bo->boc->mtx);
while (1) { while (1) {
if (bo->state >= want) if (bo->boc->state >= want)
break; break;
(void)Lck_CondWait(&bo->cond, &bo->mtx, 0); (void)Lck_CondWait(&bo->boc->cond, &bo->boc->mtx, 0);
} }
Lck_Unlock(&bo->mtx); Lck_Unlock(&bo->boc->mtx);
} }
...@@ -168,7 +168,7 @@ vbf_stp_mkbereq(const struct worker *wrk, struct busyobj *bo) ...@@ -168,7 +168,7 @@ vbf_stp_mkbereq(const struct worker *wrk, struct busyobj *bo)
CHECK_OBJ_NOTNULL(bo, BUSYOBJ_MAGIC); CHECK_OBJ_NOTNULL(bo, BUSYOBJ_MAGIC);
CHECK_OBJ_NOTNULL(bo->req, REQ_MAGIC); CHECK_OBJ_NOTNULL(bo->req, REQ_MAGIC);
assert(bo->state == BOS_INVALID); assert(bo->boc->state == BOS_INVALID);
AZ(bo->storage_hint); AZ(bo->storage_hint);
HTTP_Setup(bo->bereq0, bo->ws, bo->vsl, SLT_BereqMethod); HTTP_Setup(bo->bereq0, bo->ws, bo->vsl, SLT_BereqMethod);
...@@ -221,7 +221,7 @@ vbf_stp_retry(struct worker *wrk, struct busyobj *bo) ...@@ -221,7 +221,7 @@ vbf_stp_retry(struct worker *wrk, struct busyobj *bo)
vfc = bo->vfc; vfc = bo->vfc;
CHECK_OBJ_NOTNULL(vfc, VFP_CTX_MAGIC); CHECK_OBJ_NOTNULL(vfc, VFP_CTX_MAGIC);
assert(bo->state == BOS_REQ_DONE); assert(bo->boc->state == BOS_REQ_DONE);
VSLb_ts_busyobj(bo, "Retry", W_TIM_real(wrk)); VSLb_ts_busyobj(bo, "Retry", W_TIM_real(wrk));
...@@ -281,7 +281,7 @@ vbf_stp_startfetch(struct worker *wrk, struct busyobj *bo) ...@@ -281,7 +281,7 @@ vbf_stp_startfetch(struct worker *wrk, struct busyobj *bo)
HTTP_Setup(bo->beresp, bo->ws, bo->vsl, SLT_BerespMethod); HTTP_Setup(bo->beresp, bo->ws, bo->vsl, SLT_BerespMethod);
assert(bo->state <= BOS_REQ_DONE); assert(bo->boc->state <= BOS_REQ_DONE);
AZ(bo->htc); AZ(bo->htc);
i = VDI_GetHdr(wrk, bo); i = VDI_GetHdr(wrk, bo);
...@@ -451,7 +451,7 @@ vbf_stp_startfetch(struct worker *wrk, struct busyobj *bo) ...@@ -451,7 +451,7 @@ vbf_stp_startfetch(struct worker *wrk, struct busyobj *bo)
return (F_STP_ERROR); return (F_STP_ERROR);
} }
assert(bo->state == BOS_REQ_DONE); assert(bo->boc->state == BOS_REQ_DONE);
if (bo->do_esi) if (bo->do_esi)
bo->do_stream = 0; bo->do_stream = 0;
...@@ -642,9 +642,9 @@ vbf_stp_fetch(struct worker *wrk, struct busyobj *bo) ...@@ -642,9 +642,9 @@ vbf_stp_fetch(struct worker *wrk, struct busyobj *bo)
if (bo->htc->body_status != BS_NONE) if (bo->htc->body_status != BS_NONE)
AZ(VDI_GetBody(bo->wrk, bo)); AZ(VDI_GetBody(bo->wrk, bo));
assert(bo->refcount >= 1); assert(bo->boc->refcount >= 1);
assert(bo->state == BOS_REQ_DONE); assert(bo->boc->state == BOS_REQ_DONE);
if (bo->do_stream) { if (bo->do_stream) {
HSH_Unbusy(wrk, bo->fetch_objcore); HSH_Unbusy(wrk, bo->fetch_objcore);
...@@ -663,7 +663,7 @@ vbf_stp_fetch(struct worker *wrk, struct busyobj *bo) ...@@ -663,7 +663,7 @@ vbf_stp_fetch(struct worker *wrk, struct busyobj *bo)
if (bo->vfc->failed) { if (bo->vfc->failed) {
VDI_Finish(bo->wrk, bo); VDI_Finish(bo->wrk, bo);
if (!bo->do_stream) { if (!bo->do_stream) {
assert(bo->state < BOS_STREAM); assert(bo->boc->state < BOS_STREAM);
// XXX: doclose = ? // XXX: doclose = ?
return (F_STP_ERROR); return (F_STP_ERROR);
} else { } else {
...@@ -672,9 +672,9 @@ vbf_stp_fetch(struct worker *wrk, struct busyobj *bo) ...@@ -672,9 +672,9 @@ vbf_stp_fetch(struct worker *wrk, struct busyobj *bo)
} }
if (bo->do_stream) if (bo->do_stream)
assert(bo->state == BOS_STREAM); assert(bo->boc->state == BOS_STREAM);
else { else {
assert(bo->state == BOS_REQ_DONE); assert(bo->boc->state == BOS_REQ_DONE);
HSH_Unbusy(wrk, bo->fetch_objcore); HSH_Unbusy(wrk, bo->fetch_objcore);
} }
...@@ -872,7 +872,7 @@ vbf_stp_fail(struct worker *wrk, struct busyobj *bo) ...@@ -872,7 +872,7 @@ vbf_stp_fail(struct worker *wrk, struct busyobj *bo)
CHECK_OBJ_NOTNULL(bo, BUSYOBJ_MAGIC); CHECK_OBJ_NOTNULL(bo, BUSYOBJ_MAGIC);
CHECK_OBJ_NOTNULL(bo->fetch_objcore, OBJCORE_MAGIC); CHECK_OBJ_NOTNULL(bo->fetch_objcore, OBJCORE_MAGIC);
assert(bo->state < BOS_FINISHED); assert(bo->boc->state < BOS_FINISHED);
HSH_Fail(bo->fetch_objcore); HSH_Fail(bo->fetch_objcore);
if (bo->fetch_objcore->exp_flags & OC_EF_EXP) { if (bo->fetch_objcore->exp_flags & OC_EF_EXP) {
/* Already unbusied - expire it */ /* Already unbusied - expire it */
...@@ -926,7 +926,7 @@ vbf_fetch_thread(struct worker *wrk, void *priv) ...@@ -926,7 +926,7 @@ vbf_fetch_thread(struct worker *wrk, void *priv)
while (stp != F_STP_DONE) { while (stp != F_STP_DONE) {
CHECK_OBJ_NOTNULL(bo, BUSYOBJ_MAGIC); CHECK_OBJ_NOTNULL(bo, BUSYOBJ_MAGIC);
assert(bo->refcount >= 1); assert(bo->boc->refcount >= 1);
switch(stp) { switch(stp) {
#define FETCH_STEP(l, U, arg) \ #define FETCH_STEP(l, U, arg) \
case F_STP_##U: \ case F_STP_##U: \
...@@ -944,7 +944,7 @@ vbf_fetch_thread(struct worker *wrk, void *priv) ...@@ -944,7 +944,7 @@ vbf_fetch_thread(struct worker *wrk, void *priv)
http_Teardown(bo->bereq); http_Teardown(bo->bereq);
http_Teardown(bo->beresp); http_Teardown(bo->beresp);
if (bo->state == BOS_FINISHED) { if (bo->boc->state == BOS_FINISHED) {
AZ(bo->fetch_objcore->flags & OC_F_FAILED); AZ(bo->fetch_objcore->flags & OC_F_FAILED);
HSH_Complete(bo->fetch_objcore); HSH_Complete(bo->fetch_objcore);
VSLb(bo->vsl, SLT_Length, "%ju", VSLb(bo->vsl, SLT_Length, "%ju",
...@@ -992,7 +992,7 @@ VBF_Fetch(struct worker *wrk, struct req *req, struct objcore *oc, ...@@ -992,7 +992,7 @@ VBF_Fetch(struct worker *wrk, struct req *req, struct objcore *oc,
THR_SetBusyobj(bo); THR_SetBusyobj(bo);
bo_fetch = bo; bo_fetch = bo;
bo->refcount = 2; bo->boc->refcount = 2;
oc->busyobj = bo; oc->busyobj = bo;
...@@ -1001,7 +1001,7 @@ VBF_Fetch(struct worker *wrk, struct req *req, struct objcore *oc, ...@@ -1001,7 +1001,7 @@ VBF_Fetch(struct worker *wrk, struct req *req, struct objcore *oc,
if (mode == VBF_PASS) if (mode == VBF_PASS)
bo->do_pass = 1; bo->do_pass = 1;
bo->vary = req->vary_b; bo->boc->vary = req->vary_b;
req->vary_b = NULL; req->vary_b = NULL;
if (mode != VBF_BACKGROUND) if (mode != VBF_BACKGROUND)
...@@ -1034,7 +1034,7 @@ VBF_Fetch(struct worker *wrk, struct req *req, struct objcore *oc, ...@@ -1034,7 +1034,7 @@ VBF_Fetch(struct worker *wrk, struct req *req, struct objcore *oc,
VBO_waitstate(bo, BOS_REQ_DONE); VBO_waitstate(bo, BOS_REQ_DONE);
} else { } else {
VBO_waitstate(bo, BOS_STREAM); VBO_waitstate(bo, BOS_STREAM);
if (bo->state == BOS_FAILED) { if (bo->boc->state == BOS_FAILED) {
AN((oc->flags & OC_F_FAILED)); AN((oc->flags & OC_F_FAILED));
} else { } else {
AZ(bo->fetch_objcore->flags & OC_F_BUSY); AZ(bo->fetch_objcore->flags & OC_F_BUSY);
......
...@@ -397,8 +397,8 @@ HSH_Lookup(struct req *req, struct objcore **ocp, struct objcore **bocp, ...@@ -397,8 +397,8 @@ HSH_Lookup(struct req *req, struct objcore **ocp, struct objcore **bocp,
continue; continue;
if (oc->busyobj != NULL && if (oc->busyobj != NULL &&
oc->busyobj->vary != NULL && oc->busyobj->boc->vary != NULL &&
!VRY_Match(req, oc->busyobj->vary)) !VRY_Match(req, oc->busyobj->boc->vary))
continue; continue;
busy_found = 1; busy_found = 1;
...@@ -720,7 +720,7 @@ HSH_RefBusy(const struct objcore *oc) ...@@ -720,7 +720,7 @@ HSH_RefBusy(const struct objcore *oc)
bo = oc->busyobj; bo = oc->busyobj;
CHECK_OBJ_ORNULL(bo, BUSYOBJ_MAGIC); CHECK_OBJ_ORNULL(bo, BUSYOBJ_MAGIC);
if (bo != NULL) if (bo != NULL)
bo->refcount++; bo->boc->refcount++;
Lck_Unlock(&oh->mtx); Lck_Unlock(&oh->mtx);
return (bo); return (bo);
} }
......
...@@ -272,10 +272,10 @@ pan_busyobj(struct vsb *vsb, const struct busyobj *bo) ...@@ -272,10 +272,10 @@ pan_busyobj(struct vsb *vsb, const struct busyobj *bo)
VSB_printf(vsb, "busyobj = %p {\n", bo); VSB_printf(vsb, "busyobj = %p {\n", bo);
VSB_indent(vsb, 2); VSB_indent(vsb, 2);
pan_ws(vsb, bo->ws); pan_ws(vsb, bo->ws);
VSB_printf(vsb, "refcnt = %u,\n", bo->refcount); VSB_printf(vsb, "refcnt = %u,\n", bo->boc->refcount);
VSB_printf(vsb, "retries = %d, ", bo->retries); VSB_printf(vsb, "retries = %d, ", bo->retries);
VSB_printf(vsb, "failed = %d, ", bo->vfc->failed); VSB_printf(vsb, "failed = %d, ", bo->vfc->failed);
VSB_printf(vsb, "state = %d,\n", (int)bo->state); VSB_printf(vsb, "state = %d,\n", (int)bo->boc->state);
VSB_printf(vsb, "flags = {"); VSB_printf(vsb, "flags = {");
p = ""; p = "";
/*lint -save -esym(438,p) */ /*lint -save -esym(438,p) */
......
...@@ -199,7 +199,7 @@ cnt_deliver(struct worker *wrk, struct req *req) ...@@ -199,7 +199,7 @@ cnt_deliver(struct worker *wrk, struct req *req)
/* Grab a ref to the bo if there is one, and hand it down */ /* Grab a ref to the bo if there is one, and hand it down */
bo = HSH_RefBusy(req->objcore); bo = HSH_RefBusy(req->objcore);
if (bo != NULL) { if (bo != NULL) {
if (req->esi_level == 0 && bo->state == BOS_FINISHED) { if (req->esi_level == 0 && bo->boc->state == BOS_FINISHED) {
VBO_DerefBusyObj(wrk, &bo); VBO_DerefBusyObj(wrk, &bo);
} else if (!bo->do_stream) { } else if (!bo->do_stream) {
VBO_waitstate(bo, BOS_FINISHED); VBO_waitstate(bo, BOS_FINISHED);
......
...@@ -258,16 +258,16 @@ sml_iterator(struct worker *wrk, struct objcore *oc, ...@@ -258,16 +258,16 @@ sml_iterator(struct worker *wrk, struct objcore *oc,
while (1) { while (1) {
ol = len; ol = len;
nl = VBO_waitlen(wrk, oc, bo, ol); nl = VBO_waitlen(wrk, oc, bo, ol);
if (bo->state == BOS_FAILED) { if (bo->boc->state == BOS_FAILED) {
ret = -1; ret = -1;
break; break;
} }
if (nl == ol) { if (nl == ol) {
if (bo->state == BOS_FINISHED) if (bo->boc->state == BOS_FINISHED)
break; break;
continue; continue;
} }
Lck_Lock(&bo->mtx); Lck_Lock(&bo->boc->mtx);
AZ(VTAILQ_EMPTY(&obj->list)); AZ(VTAILQ_EMPTY(&obj->list));
if (checkpoint == NULL) { if (checkpoint == NULL) {
st = VTAILQ_FIRST(&obj->list); st = VTAILQ_FIRST(&obj->list);
...@@ -300,8 +300,8 @@ sml_iterator(struct worker *wrk, struct objcore *oc, ...@@ -300,8 +300,8 @@ sml_iterator(struct worker *wrk, struct objcore *oc,
st = VTAILQ_NEXT(st, list); st = VTAILQ_NEXT(st, list);
if (st != NULL && st->len == 0) if (st != NULL && st->len == 0)
st = NULL; st = NULL;
Lck_Unlock(&bo->mtx); Lck_Unlock(&bo->boc->mtx);
assert(l > 0 || bo->state == BOS_FINISHED); assert(l > 0 || bo->boc->state == BOS_FINISHED);
if (func(priv, st != NULL ? 0 : 1, p, l)) { if (func(priv, st != NULL ? 0 : 1, p, l)) {
ret = -1; ret = -1;
break; break;
...@@ -375,9 +375,9 @@ sml_getspace(struct worker *wrk, struct objcore *oc, ssize_t *sz, ...@@ -375,9 +375,9 @@ sml_getspace(struct worker *wrk, struct objcore *oc, ssize_t *sz,
if (oc->busyobj != NULL) { if (oc->busyobj != NULL) {
CHECK_OBJ_NOTNULL(oc->busyobj, BUSYOBJ_MAGIC); CHECK_OBJ_NOTNULL(oc->busyobj, BUSYOBJ_MAGIC);
Lck_Lock(&oc->busyobj->mtx); Lck_Lock(&oc->busyobj->boc->mtx);
VTAILQ_INSERT_TAIL(&o->list, st, list); VTAILQ_INSERT_TAIL(&o->list, st, list);
Lck_Unlock(&oc->busyobj->mtx); Lck_Unlock(&oc->busyobj->boc->mtx);
} else { } else {
AN(oc->flags & (OC_F_PRIVATE)); AN(oc->flags & (OC_F_PRIVATE));
VTAILQ_INSERT_TAIL(&o->list, st, list); VTAILQ_INSERT_TAIL(&o->list, st, list);
...@@ -443,9 +443,9 @@ sml_trimstore(struct worker *wrk, struct objcore *oc) ...@@ -443,9 +443,9 @@ sml_trimstore(struct worker *wrk, struct objcore *oc)
if (st->len == 0) { if (st->len == 0) {
if (oc->busyobj != NULL) { if (oc->busyobj != NULL) {
Lck_Lock(&oc->busyobj->mtx); Lck_Lock(&oc->busyobj->boc->mtx);
VTAILQ_REMOVE(&o->list, st, list); VTAILQ_REMOVE(&o->list, st, list);
Lck_Unlock(&oc->busyobj->mtx); Lck_Unlock(&oc->busyobj->boc->mtx);
} else { } else {
VTAILQ_REMOVE(&o->list, st, list); VTAILQ_REMOVE(&o->list, st, list);
} }
...@@ -468,10 +468,10 @@ sml_trimstore(struct worker *wrk, struct objcore *oc) ...@@ -468,10 +468,10 @@ sml_trimstore(struct worker *wrk, struct objcore *oc)
memcpy(st1->ptr, st->ptr, st->len); memcpy(st1->ptr, st->ptr, st->len);
st1->len = st->len; st1->len = st->len;
if (oc->busyobj != NULL) { if (oc->busyobj != NULL) {
Lck_Lock(&oc->busyobj->mtx); Lck_Lock(&oc->busyobj->boc->mtx);
VTAILQ_REMOVE(&o->list, st, list); VTAILQ_REMOVE(&o->list, st, list);
VTAILQ_INSERT_TAIL(&o->list, st1, list); VTAILQ_INSERT_TAIL(&o->list, st1, list);
Lck_Unlock(&oc->busyobj->mtx); Lck_Unlock(&oc->busyobj->boc->mtx);
} else { } else {
VTAILQ_REMOVE(&o->list, st, list); VTAILQ_REMOVE(&o->list, st, list);
VTAILQ_INSERT_TAIL(&o->list, st1, list); VTAILQ_INSERT_TAIL(&o->list, st1, list);
...@@ -480,8 +480,8 @@ sml_trimstore(struct worker *wrk, struct objcore *oc) ...@@ -480,8 +480,8 @@ sml_trimstore(struct worker *wrk, struct objcore *oc)
sml_stv_free(stv, st); sml_stv_free(stv, st);
} else { } else {
/* sml_stable frees this */ /* sml_stable frees this */
AZ(oc->busyobj->stevedore_priv); AZ(oc->busyobj->boc->stevedore_priv);
oc->busyobj->stevedore_priv = st; oc->busyobj->boc->stevedore_priv = st;
} }
} }
...@@ -495,10 +495,10 @@ sml_stable(struct worker *wrk, struct objcore *oc, struct busyobj *bo) ...@@ -495,10 +495,10 @@ sml_stable(struct worker *wrk, struct objcore *oc, struct busyobj *bo)
CHECK_OBJ_NOTNULL(oc, OBJCORE_MAGIC); CHECK_OBJ_NOTNULL(oc, OBJCORE_MAGIC);
CHECK_OBJ_NOTNULL(bo, BUSYOBJ_MAGIC); CHECK_OBJ_NOTNULL(bo, BUSYOBJ_MAGIC);
if (bo->stevedore_priv == NULL) if (bo->boc->stevedore_priv == NULL)
return; return;
CAST_OBJ_NOTNULL(st, bo->stevedore_priv, STORAGE_MAGIC); CAST_OBJ_NOTNULL(st, bo->boc->stevedore_priv, STORAGE_MAGIC);
bo->stevedore_priv = 0; bo->boc->stevedore_priv = 0;
CHECK_OBJ_NOTNULL(st, STORAGE_MAGIC); CHECK_OBJ_NOTNULL(st, STORAGE_MAGIC);
stv = oc->stobj->stevedore; stv = oc->stobj->stevedore;
CHECK_OBJ_NOTNULL(stv, STEVEDORE_MAGIC); CHECK_OBJ_NOTNULL(stv, STEVEDORE_MAGIC);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment