Commit eba445d4 authored by Poul-Henning Kamp's avatar Poul-Henning Kamp

Restructure to put all the streaming code in one place.

parent ec31b720
......@@ -79,43 +79,102 @@ DOT acceptor -> recv [style=bold,color=green]
* We have a refcounted object on the session, and possibly the busyobj
* which is fetching it, prepare a response.
*
DOT subgraph xcluster_prepresp {
DOT prepresp [
DOT stream [
DOT shape=record
DOT label="{cnt_prepresp:|Filter obj.-\>resp.|{vcl_deliver\{\}|{req.|resp.}}|{error?|restart?}|stream ?}"
DOT label="{cnt_stream:}"
DOT ]
DOT prepresp -> deliver [style=bold,color=green,label=deliver]
DOT prepresp -> deliver [style=bold,color=red]
DOT prepresp -> deliver [style=bold,color=blue]
DOT }
DOT stream:deliver:s -> DONE [style=bold,color=red]
DOT stream:deliver:s -> DONE [style=bold,color=blue]
*
*/
static enum req_fsm_nxt
cnt_prepresp(struct worker *wrk, struct req *req)
cnt_stream(struct worker *wrk, struct req *req)
{
struct busyobj *bo;
CHECK_OBJ_NOTNULL(wrk, WORKER_MAGIC);
CHECK_OBJ_NOTNULL(req, REQ_MAGIC);
bo = req->busyobj;
CHECK_OBJ_ORNULL(bo, BUSYOBJ_MAGIC);
CHECK_OBJ_NOTNULL(bo, BUSYOBJ_MAGIC);
CHECK_OBJ_NOTNULL(req->obj, OBJECT_MAGIC);
CHECK_OBJ_NOTNULL(req->vcl, VCL_CONF_MAGIC);
req->res_mode = 0;
if (bo == NULL) {
if (!req->disable_esi && req->obj->esidata != NULL) {
/* In ESI mode, we can't know the aggregate length */
req->res_mode &= ~RES_LEN;
req->res_mode |= RES_ESI;
} else {
req->res_mode |= RES_LEN;
}
AZ(bo->do_esi);
if (wrk->handling == VCL_RET_RESTART) {
AN(bo->do_stream);
assert(req->obj == bo->fetch_obj);
req->obj = NULL;
VBO_DerefBusyObj(wrk, &req->busyobj);
AZ(req->obj);
http_Teardown(req->resp);
req->req_step = R_STP_RESTART;
return (REQ_FSM_MORE);
}
assert(wrk->handling == VCL_RET_DELIVER);
while (bo->state < BOS_FAILED)
(void)usleep(10000);
assert(bo->state >= BOS_FAILED);
if (bo->state == BOS_FAILED) {
(void)HSH_Deref(&wrk->stats, NULL, &req->obj);
VBO_DerefBusyObj(wrk, &req->busyobj);
req->err_code = 503;
req->req_step = R_STP_ERROR;
return (REQ_FSM_MORE);
}
VBO_DerefBusyObj(wrk, &req->busyobj);
AZ(req->busyobj);
RES_WriteObj(req);
/* No point in saving the body if it is hit-for-pass */
if (req->obj->objcore->flags & OC_F_PASS)
STV_Freestore(req->obj);
assert(WRW_IsReleased(wrk));
(void)HSH_Deref(&wrk->stats, NULL, &req->obj);
http_Teardown(req->resp);
return (REQ_FSM_DONE);
}
/*--------------------------------------------------------------------
* Deliver an already stored object
*
DOT deliver [
DOT shape=record
DOT label="{cnt_deliver:|Filter obj.-\>resp.|{vcl_deliver\{\}|{req.|resp.}}|{<stream>deliver(stream)?|restart?|<deliver>deliver?}}"
DOT ]
DOT deliver:deliver:s -> DONE [style=bold,color=green]
DOT deliver:stream:s -> stream [style=bold,color=red]
DOT deliver:stream:s -> stream [style=bold,color=blue]
*
*/
static enum req_fsm_nxt
cnt_deliver(struct worker *wrk, struct req *req)
{
CHECK_OBJ_NOTNULL(wrk, WORKER_MAGIC);
CHECK_OBJ_NOTNULL(req, REQ_MAGIC);
CHECK_OBJ_NOTNULL(req->obj, OBJECT_MAGIC);
CHECK_OBJ_NOTNULL(req->vcl, VCL_CONF_MAGIC);
req->res_mode = 0;
if (!req->disable_esi && req->obj->esidata != NULL) {
AZ(req->busyobj);
/* In ESI mode, we can't know the aggregate length */
req->res_mode &= ~RES_LEN;
req->res_mode |= RES_ESI;
} else {
AZ(bo->do_esi);
req->res_mode |= RES_LEN;
}
if (req->esi_level > 0) {
......@@ -159,75 +218,24 @@ cnt_prepresp(struct worker *wrk, struct req *req)
RES_BuildHttp(req);
VCL_deliver_method(req->vcl, wrk, req, NULL, req->http->ws);
switch (wrk->handling) {
case VCL_RET_DELIVER:
break;
case VCL_RET_RESTART:
if (req->restarts >= cache_param->max_restarts)
break;
if (bo != NULL) {
AN(bo->do_stream);
assert(req->obj == bo->fetch_obj);
req->obj = NULL;
VBO_DerefBusyObj(wrk, &req->busyobj);
} else {
(void)HSH_Deref(&wrk->stats, NULL, &req->obj);
}
/* Stop the insanity before it turns "Hotel California" on us */
if (req->restarts >= cache_param->max_restarts)
wrk->handling = VCL_RET_DELIVER;
if (req->busyobj != NULL) {
req->req_step = R_STP_STREAM;
return (REQ_FSM_MORE);
}
if (wrk->handling == VCL_RET_RESTART) {
(void)HSH_Deref(&wrk->stats, NULL, &req->obj);
AZ(req->obj);
http_Teardown(req->resp);
req->req_step = R_STP_RESTART;
return (REQ_FSM_MORE);
default:
WRONG("Illegal action in vcl_deliver{}");
}
req->req_step = R_STP_DELIVER;
return (REQ_FSM_MORE);
}
/*--------------------------------------------------------------------
* Deliver an already stored object
*
DOT subgraph xcluster_deliver {
DOT deliver [
DOT shape=record
DOT label="{cnt_deliver:|Send body}"
DOT ]
DOT }
DOT deliver -> DONE [style=bold,color=green]
DOT deliver -> DONE [style=bold,color=red]
DOT deliver -> DONE [style=bold,color=blue]
*
*/
static enum req_fsm_nxt
cnt_deliver(struct worker *wrk, struct req *req)
{
struct busyobj *bo;
CHECK_OBJ_NOTNULL(wrk, WORKER_MAGIC);
CHECK_OBJ_NOTNULL(req, REQ_MAGIC);
CHECK_OBJ_NOTNULL(req->obj, OBJECT_MAGIC);
bo = req->busyobj;
CHECK_OBJ_ORNULL(bo, BUSYOBJ_MAGIC);
if (bo != NULL) {
while (bo->state < BOS_FAILED)
(void)usleep(10000);
assert(bo->state >= BOS_FAILED);
if (bo->state == BOS_FAILED) {
(void)HSH_Deref(&wrk->stats, NULL, &req->obj);
VBO_DerefBusyObj(wrk, &req->busyobj);
req->err_code = 503;
req->req_step = R_STP_ERROR;
return (REQ_FSM_MORE);
}
VBO_DerefBusyObj(wrk, &req->busyobj);
}
AZ(req->busyobj);
req->director = NULL;
req->restarts = 0;
assert(wrk->handling == VCL_RET_DELIVER);
RES_WriteObj(req);
......@@ -240,6 +248,7 @@ cnt_deliver(struct worker *wrk, struct req *req)
http_Teardown(req->resp);
return (REQ_FSM_DONE);
}
/*--------------------------------------------------------------------
* Emit an error
*
......@@ -249,7 +258,7 @@ DOT shape=record
DOT label="vcl_error()|resp."
DOT ]
DOT ERROR -> vcl_error
DOT vcl_error-> prepresp [label=deliver]
DOT vcl_error-> deliver [label=deliver]
DOT }
DOT vcl_error-> rsterr [label="restart",color=purple]
DOT rsterr [label="RESTART",shape=plaintext]
......@@ -312,15 +321,16 @@ cnt_error(struct worker *wrk, struct req *req)
http_PutResponse(h, http_StatusMessage(req->err_code));
VCL_error_method(req->vcl, wrk, req, NULL, req->http->ws);
if (wrk->handling == VCL_RET_RESTART &&
req->restarts < cache_param->max_restarts) {
/* Stop the insanity before it turns "Hotel California" on us */
if (req->restarts >= cache_param->max_restarts)
wrk->handling = VCL_RET_DELIVER;
if (wrk->handling == VCL_RET_RESTART) {
HSH_Drop(wrk, &req->obj);
VBO_DerefBusyObj(wrk, &req->busyobj);
req->req_step = R_STP_RESTART;
return (REQ_FSM_MORE);
} else if (wrk->handling == VCL_RET_RESTART)
wrk->handling = VCL_RET_DELIVER;
}
/* We always close when we take this path */
req->doclose = SC_TX_ERROR;
......@@ -331,7 +341,7 @@ cnt_error(struct worker *wrk, struct req *req)
req->err_reason = NULL;
http_Teardown(bo->bereq);
VBO_DerefBusyObj(wrk, &req->busyobj);
req->req_step = R_STP_PREPRESP;
req->req_step = R_STP_DELIVER;
return (REQ_FSM_MORE);
}
......@@ -344,8 +354,8 @@ DOT shape=record
DOT label="{cnt_fetch:|start fetch_thread}"
DOT ]
DOT }
DOT fetch -> prepresp [style=bold,color=red]
DOT fetch -> prepresp [style=bold,color=blue]
DOT fetch -> deliver [style=bold,color=red]
DOT fetch -> deliver [style=bold,color=blue]
*/
static enum req_fsm_nxt
......@@ -373,7 +383,7 @@ cnt_fetch(struct worker *wrk, struct req *req)
req->obj = req->busyobj->fetch_obj;
VBO_DerefBusyObj(wrk, &req->busyobj);
assert(WRW_IsReleased(wrk));
req->req_step = R_STP_PREPRESP;
req->req_step = R_STP_DELIVER;
}
return (REQ_FSM_MORE);
}
......@@ -403,7 +413,7 @@ DOT lookup:eb:s -> lookup2 [style=bold,color=green]
DOT lookup:h:s -> lookup2 [style=bold,color=green]
DOT lookup2:pass:s -> pass [style=bold,color=red]
DOT lookup2:fetch:s -> miss [style=bold,color=blue]
DOT lookup2:deliver:s -> prepresp:nw [style=bold,color=green]
DOT lookup2:deliver:s -> deliver:n [style=bold,color=green]
*/
static enum req_fsm_nxt
......@@ -496,7 +506,7 @@ cnt_lookup(struct worker *wrk, struct req *req)
(void)HTTP1_DiscardReqBody(req);// XXX: handle err
}
wrk->stats.cache_hit++;
req->req_step = R_STP_PREPRESP;
req->req_step = R_STP_DELIVER;
return (REQ_FSM_MORE);
case VCL_RET_FETCH:
(void)HSH_Deref(&wrk->stats, NULL, &req->obj);
......
......@@ -44,7 +44,7 @@ REQ_STEP(lookup, LOOKUP, (wrk, req))
REQ_STEP(purge, PURGE, (wrk, req))
REQ_STEP(miss, MISS, (wrk, req))
REQ_STEP(fetch, FETCH, (wrk, req))
REQ_STEP(prepresp, PREPRESP, (wrk, req))
REQ_STEP(stream, STREAM, (wrk, req))
REQ_STEP(deliver, DELIVER, (wrk, req))
REQ_STEP(error, ERROR, (wrk, req))
#endif
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment