Commit e6b43a53 authored by Poul-Henning Kamp's avatar Poul-Henning Kamp

Slice and dice cache_center to make space for streaming.

NB: no streaming happening yet.
parent 949d5320
......@@ -52,7 +52,7 @@ DOT label="Request received"
DOT ]
DOT ERROR [shape=plaintext]
DOT RESTART [shape=plaintext]
DOT acceptor -> start [style=bold,color=green,weight=4]
DOT acceptor -> start [style=bold,color=green]
*/
#include "config.h"
......@@ -134,8 +134,8 @@ cnt_wait(struct sess *sp)
/*--------------------------------------------------------------------
* We have a refcounted object on the session, now deliver it.
*
DOT subgraph xcluster_deliver {
DOT deliver [
DOT subgraph xcluster_prepresp {
DOT prepresp [
DOT shape=ellipse
DOT label="Filter obj.->resp."
DOT ]
......@@ -143,30 +143,34 @@ DOT vcl_deliver [
DOT shape=record
DOT label="vcl_deliver()|resp."
DOT ]
DOT deliver2 [
DOT shape=ellipse
DOT label="Send resp + body"
DOT ]
DOT deliver -> vcl_deliver [style=bold,color=green,weight=4]
DOT vcl_deliver -> deliver2 [style=bold,color=green,weight=4,label=deliver]
DOT prepresp -> vcl_deliver [style=bold,color=green]
DOT prepresp -> vcl_deliver [style=bold,color=cyan]
DOT prepresp -> vcl_deliver [style=bold,color=red]
DOT prepresp -> vcl_deliver [style=bold,color=blue,]
DOT vcl_deliver -> deliver [style=bold,color=green,label=deliver]
DOT vcl_deliver -> deliver [style=bold,color=red]
DOT vcl_deliver -> deliver [style=bold,color=blue]
DOT vcl_deliver -> errdeliver [label="error"]
DOT errdeliver [label="ERROR",shape=plaintext]
DOT vcl_deliver -> rstdeliver [label="restart",color=purple]
DOT rstdeliver [label="RESTART",shape=plaintext]
DOT vcl_deliver -> streambody [style=bold,color=cyan,label="deliver"]
DOT }
DOT deliver2 -> DONE [style=bold,color=green,weight=4]
*
*/
static int
cnt_deliver(struct sess *sp)
cnt_prepresp(struct sess *sp)
{
CHECK_OBJ_NOTNULL(sp, SESS_MAGIC);
CHECK_OBJ_NOTNULL(sp->obj, OBJECT_MAGIC);
CHECK_OBJ_NOTNULL(sp->vcl, VCL_CONF_MAGIC);
sp->wrk->res_mode = RES_LEN;
sp->wrk->res_mode = 0;
if (!sp->wrk->do_stream)
sp->wrk->res_mode |= RES_LEN;
if (!sp->disable_esi && sp->obj->esidata != NULL) {
/* In ESI mode, we don't know the aggregate length */
......@@ -190,7 +194,7 @@ cnt_deliver(struct sess *sp)
}
if (!(sp->wrk->res_mode & (RES_LEN|RES_CHUNKED|RES_EOF))) {
if (sp->obj->len == 0)
if (sp->obj->len == 0 && !sp->wrk->do_stream)
/*
* If the object is empty, neither ESI nor GUNZIP
* can make it any different size
......@@ -222,7 +226,12 @@ cnt_deliver(struct sess *sp)
case VCL_RET_RESTART:
if (sp->restarts >= params->max_restarts)
break;
(void)HSH_Deref(sp->wrk, NULL, &sp->obj);
if (sp->wrk->do_stream) {
VDI_CloseFd(sp);
HSH_Drop(sp);
} else {
(void)HSH_Deref(sp->wrk, NULL, &sp->obj);
}
AZ(sp->obj);
sp->restarts++;
sp->director = NULL;
......@@ -234,6 +243,31 @@ cnt_deliver(struct sess *sp)
default:
WRONG("Illegal action in vcl_deliver{}");
}
if (sp->wrk->do_stream)
sp->step = STP_STREAMBODY;
else
sp->step = STP_DELIVER;
return (0);
}
/*--------------------------------------------------------------------
* Deliver an already stored object
*
DOT subgraph xcluster_deliver {
DOT deliver [
DOT shape=ellipse
DOT label="Send body"
DOT ]
DOT }
DOT deliver -> DONE [style=bold,color=green]
DOT deliver -> DONE [style=bold,color=red]
DOT deliver -> DONE [style=bold,color=blue]
*
*/
static int
cnt_deliver(struct sess *sp)
{
sp->director = NULL;
sp->restarts = 0;
......@@ -363,7 +397,7 @@ DOT shape=record
DOT label="vcl_error()|resp."
DOT ]
DOT ERROR -> vcl_error
DOT vcl_error-> deliver [label=deliver]
DOT vcl_error-> prepresp [label=deliver]
DOT }
*/
......@@ -426,7 +460,7 @@ cnt_error(struct sess *sp)
sp->err_code = 0;
sp->err_reason = NULL;
http_Setup(sp->wrk->bereq, NULL);
sp->step = STP_DELIVER;
sp->step = STP_PREPRESP;
return (0);
}
......@@ -442,15 +476,16 @@ DOT vcl_fetch [
DOT shape=record
DOT label="vcl_fetch()|req.\nbereq.\nberesp."
DOT ]
DOT fetch -> vcl_fetch [style=bold,color=blue,weight=2]
DOT fetch -> vcl_fetch [style=bold,color=blue]
DOT fetch -> vcl_fetch [style=bold,color=red]
DOT fetch_pass [
DOT shape=ellipse
DOT label="obj.pass=true"
DOT label="obj.f.pass=true"
DOT ]
DOT vcl_fetch -> fetch_pass [label="hit_for_pass",style=bold,color=red]
DOT }
DOT fetch_pass -> fetchbody [style=bold,color=red]
DOT vcl_fetch -> fetchbody [label="deliver",style=bold,color=blue,weight=2]
DOT vcl_fetch -> fetchbody [label="deliver",style=bold,color=blue]
DOT vcl_fetch -> rstfetch [label="restart",color=purple]
DOT rstfetch [label="RESTART",shape=plaintext]
DOT fetch -> errfetch
......@@ -571,11 +606,19 @@ cnt_fetch(struct sess *sp)
*
DOT subgraph xcluster_body {
DOT fetchbody [
DOT shape=diamond
DOT label="stream ?"
DOT ]
DOT fetchbody2 [
DOT shape=ellipse
DOT label="fetch body\nfrom backend\n"
DOT ]
DOT }
DOT fetchbody -> deliver [style=bold,color=red]
DOT fetchbody -> fetchbody2 [label=no,style=bold,color=red]
DOT fetchbody -> fetchbody2 [style=bold,color=blue]
DOT fetchbody -> prepresp [label=yes,style=bold,color=cyan]
DOT fetchbody2 -> prepresp [style=bold,color=red]
DOT fetchbody2 -> prepresp [style=bold,color=blue]
*/
......@@ -744,6 +787,13 @@ cnt_fetchbody(struct sess *sp)
else
sp->obj->last_modified = sp->wrk->entered;
assert(WRW_IsReleased(sp->wrk));
if (sp->wrk->do_stream) {
sp->step = STP_PREPRESP;
return (0);
}
/* Use unmodified headers*/
i = FetchBody(sp);
......@@ -764,6 +814,52 @@ cnt_fetchbody(struct sess *sp)
return (0);
}
if (sp->obj->objcore != NULL) {
EXP_Insert(sp->obj);
AN(sp->obj->objcore);
AN(sp->obj->objcore->ban);
HSH_Unbusy(sp);
}
sp->acct_tmp.fetch++;
sp->step = STP_PREPRESP;
return (0);
}
/*--------------------------------------------------------------------
* Stream the body as we fetch it
DOT subgraph xstreambody {
DOT streambody [
DOT shape=ellipse
DOT label="streaming\nfetch/deliver"
DOT ]
DOT }
DOT streambody -> DONE [style=bold,color=cyan]
*/
static int
cnt_streambody(struct sess *sp)
{
int i;
/* Use unmodified headers*/
i = FetchBody(sp);
sp->wrk->h_content_length = NULL;
http_Setup(sp->wrk->bereq, NULL);
http_Setup(sp->wrk->beresp, NULL);
sp->wrk->vfp = NULL;
AZ(sp->vbc);
AN(sp->director);
if (i) {
HSH_Drop(sp);
AZ(sp->obj);
sp->err_code = 503;
sp->step = STP_ERROR;
return (0);
}
if (sp->obj->objcore != NULL) {
EXP_Insert(sp->obj);
AN(sp->obj->objcore);
......@@ -821,7 +917,7 @@ DOT err_hit [label="ERROR",shape=plaintext]
DOT hit -> rst_hit [label="restart",color=purple]
DOT rst_hit [label="RESTART",shape=plaintext]
DOT hit -> pass [label=pass,style=bold,color=red]
DOT hit -> deliver [label="deliver",style=bold,color=green,weight=4]
DOT hit -> prepresp [label="deliver",style=bold,color=green]
*/
static int
......@@ -841,7 +937,7 @@ cnt_hit(struct sess *sp)
(void)FetchReqBody(sp);
AZ(sp->wrk->bereq->ws);
AZ(sp->wrk->beresp->ws);
sp->step = STP_DELIVER;
sp->step = STP_PREPRESP;
return (0);
}
......@@ -884,14 +980,14 @@ DOT label="obj in cache ?\ncreate if not"
DOT ]
DOT lookup2 [
DOT shape=diamond
DOT label="obj.pass ?"
DOT label="obj.f.pass ?"
DOT ]
DOT hash -> lookup [label="hash",style=bold,color=green,weight=4]
DOT lookup -> lookup2 [label="yes",style=bold,color=green,weight=4]
DOT hash -> lookup [label="hash",style=bold,color=green]
DOT lookup -> lookup2 [label="yes",style=bold,color=green]
DOT }
DOT lookup2 -> hit [label="no", style=bold,color=green,weight=4]
DOT lookup2 -> hit [label="no", style=bold,color=green]
DOT lookup2 -> pass [label="yes",style=bold,color=red]
DOT lookup -> miss [label="no",style=bold,color=blue,weight=2]
DOT lookup -> miss [label="no",style=bold,color=blue]
*/
static int
......@@ -960,13 +1056,13 @@ DOT vcl_miss [
DOT shape=record
DOT label="vcl_miss()|req.\nbereq."
DOT ]
DOT miss -> vcl_miss [style=bold,color=blue,weight=2]
DOT miss -> vcl_miss [style=bold,color=blue]
DOT }
DOT vcl_miss -> rst_miss [label="restart",color=purple]
DOT rst_miss [label="RESTART",shape=plaintext]
DOT vcl_miss -> err_miss [label="error"]
DOT err_miss [label="ERROR",shape=plaintext]
DOT vcl_miss -> fetch [label="fetch",style=bold,color=blue,weight=2]
DOT vcl_miss -> fetch [label="fetch",style=bold,color=blue]
DOT vcl_miss -> pass [label="pass",style=bold,color=red]
DOT
*/
......@@ -1145,7 +1241,7 @@ DOT recv -> pipe [label="pipe",style=bold,color=orange]
DOT recv -> pass2 [label="pass",style=bold,color=red]
DOT recv -> err_recv [label="error"]
DOT err_recv [label="ERROR",shape=plaintext]
DOT recv -> hash [label="lookup",style=bold,color=green,weight=4]
DOT recv -> hash [label="lookup",style=bold,color=green]
*/
static int
......@@ -1241,7 +1337,7 @@ cnt_recv(struct sess *sp)
* Handle a request, wherever it came from recv/restart.
*
DOT start [shape=box,label="Dissect request"]
DOT start -> recv [style=bold,color=green,weight=4]
DOT start -> recv [style=bold,color=green]
*/
static int
......
......@@ -41,6 +41,8 @@ STEP(miss, MISS)
STEP(hit, HIT)
STEP(fetch, FETCH)
STEP(fetchbody, FETCHBODY)
STEP(streambody,STREAMBODY)
STEP(prepresp, PREPRESP)
STEP(deliver, DELIVER)
STEP(error, ERROR)
STEP(done, DONE)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment