Commit 44f6acb4 authored by Poul-Henning Kamp's avatar Poul-Henning Kamp

Shuffle things a little bit closer to streaming

parent 1b5a1f6d
......@@ -97,6 +97,9 @@ cnt_stream(struct worker *wrk, struct req *req)
CHECK_OBJ_NOTNULL(req, REQ_MAGIC);
bo = req->busyobj;
CHECK_OBJ_NOTNULL(bo, BUSYOBJ_MAGIC);
AN(bo->do_stream);
INCOMPL();
CHECK_OBJ_NOTNULL(req->obj, OBJECT_MAGIC);
CHECK_OBJ_NOTNULL(req->vcl, VCL_CONF_MAGIC);
......@@ -106,7 +109,6 @@ cnt_stream(struct worker *wrk, struct req *req)
AZ(bo->do_esi);
if (wrk->handling == VCL_RET_RESTART) {
AN(bo->do_stream);
assert(req->obj == bo->fetch_obj);
req->obj = NULL;
VBO_DerefBusyObj(wrk, &req->busyobj);
......@@ -223,8 +225,13 @@ cnt_deliver(struct worker *wrk, struct req *req)
wrk->handling = VCL_RET_DELIVER;
if (req->busyobj != NULL) {
req->req_step = R_STP_STREAM;
return (REQ_FSM_MORE);
AN(req->busyobj->do_stream);
/* Don't stream if already finished */
if (req->busyobj->state != BOS_FINISHED) {
req->req_step = R_STP_STREAM;
return (REQ_FSM_MORE);
}
VBO_DerefBusyObj(wrk, &req->busyobj);
}
if (wrk->handling == VCL_RET_RESTART) {
......@@ -254,13 +261,11 @@ cnt_deliver(struct worker *wrk, struct req *req)
DOT subgraph xcluster_error {
DOT vcl_error [
DOT shape=record
DOT label="vcl_error()|resp."
DOT label="{{vcl_error()|resp.}|{<del>deliver?|<restart>restart?}}"
DOT ]
DOT ERROR -> vcl_error
DOT vcl_error-> deliver [label=deliver]
DOT vcl_error:del:s -> deliver [label=deliver]
DOT }
DOT vcl_error-> rsterr [label="restart",color=purple]
DOT rsterr [label="RESTART",shape=plaintext]
*/
static enum req_fsm_nxt
......@@ -350,40 +355,47 @@ cnt_error(struct worker *wrk, struct req *req)
DOT subgraph xcluster_body {
DOT fetch [
DOT shape=record
DOT label="{cnt_fetch:|start fetch_thread}"
DOT label="{cnt_fetch:|wait for fetch|{<ok>OK|<err>Failed}}"
DOT ]
DOT }
DOT fetch -> deliver [style=bold,color=red]
DOT fetch -> deliver [style=bold,color=blue]
DOT fetch:ok:s -> deliver [style=bold,color=red]
DOT fetch:ok:s -> deliver [style=bold,color=blue]
DOT fetch:err:s -> vcl_error
*/
static enum req_fsm_nxt
cnt_fetch(struct worker *wrk, struct req *req)
{
struct busyobj *bo;
CHECK_OBJ_NOTNULL(wrk, WORKER_MAGIC);
CHECK_OBJ_NOTNULL(req, REQ_MAGIC);
bo = req->busyobj;
CHECK_OBJ_NOTNULL(bo, BUSYOBJ_MAGIC);
req->acct_req.fetch++;
AN(req->busyobj);
assert(req->busyobj->refcount > 0);
assert(bo->refcount > 0);
(void)HTTP1_DiscardReqBody(req);
while (req->busyobj->state < BOS_FINISHED) {
printf("YYY\n");
(void)usleep(100000);
}
if (req->busyobj->state == BOS_FAILED) {
/* If we are not allowed to stream, don't. */
if (1 || !bo->do_stream) // XXX
VBO_waitstate(bo, BOS_FINISHED);
else
VBO_waitstate(bo, BOS_FETCHING);
if (bo->state == BOS_FAILED) {
VBO_DerefBusyObj(wrk, &req->busyobj);
req->err_code = 503;
req->req_step = R_STP_ERROR;
} else {
assert (req->busyobj->state == BOS_FINISHED);
req->err_code = req->busyobj->err_code;
req->obj = req->busyobj->fetch_obj;
VBO_DerefBusyObj(wrk, &req->busyobj);
assert(WRW_IsReleased(wrk));
req->req_step = R_STP_DELIVER;
return (REQ_FSM_MORE);
}
assert (bo->state >= BOS_FETCHING);
req->err_code = bo->err_code;
req->obj = bo->fetch_obj;
VBO_DerefBusyObj(wrk, &req->busyobj);
assert(WRW_IsReleased(wrk));
req->req_step = R_STP_DELIVER;
return (REQ_FSM_MORE);
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment