Commit b5f18785 authored by Poul-Henning Kamp's avatar Poul-Henning Kamp

Remove the current "ping-pong" streaming code, it is in the way of

the "real streaming" code we are working on.
parent 071f86ba
......@@ -608,15 +608,6 @@ struct req {
#define RES_ESI_CHILD (1<<5)
#define RES_GUNZIP (1<<6)
/* Stream gunzip instance */
struct vgz *stream_vgz;
/* Next byte we will take from storage */
ssize_t stream_next;
/* First byte of storage if we free it as we go (pass) */
ssize_t stream_front;
/* Transaction VSL buffer */
struct vsl_log vsl[1];
......@@ -958,9 +949,6 @@ void WSL_Flush(struct vsl_log *, int overflow);
/* cache_response.c */
void RES_BuildHttp(const struct sess *sp);
void RES_WriteObj(struct sess *sp);
void RES_StreamStart(struct sess *sp);
void RES_StreamEnd(struct sess *sp);
void RES_StreamPoll(struct worker *);
/* cache_vary.c */
struct vsb *VRY_Create(const struct sess *sp, const struct http *hp);
......
......@@ -194,7 +194,6 @@ DOT ]
DOT prepresp -> deliver [style=bold,color=green,label=deliver]
DOT prepresp -> deliver [style=bold,color=red]
DOT prepresp -> deliver [style=bold,color=blue]
DOT prepresp -> streambody [style=bold,color=cyan,label="deliver"]
DOT }
*
*/
......@@ -300,12 +299,7 @@ cnt_prepresp(struct sess *sp, struct worker *wrk, struct req *req)
default:
WRONG("Illegal action in vcl_deliver{}");
}
if (wrk->busyobj != NULL && wrk->busyobj->do_stream) {
AssertObjCorePassOrBusy(req->obj->objcore);
sp->step = STP_STREAMBODY;
} else {
sp->step = STP_DELIVER;
}
sp->step = STP_DELIVER;
return (0);
}
......@@ -858,13 +852,8 @@ cnt_prepfetch(struct sess *sp, struct worker *wrk, struct req *req)
AssertObjCorePassOrBusy(req->obj->objcore);
if (bo->do_stream) {
sp->step = STP_PREPRESP;
return (0);
} else {
sp->step = STP_FETCHBODY;
return (0);
}
sp->step = STP_FETCHBODY;
return (0);
}
/*--------------------------------------------------------------------
......@@ -924,63 +913,6 @@ cnt_fetchbody(struct sess *sp, struct worker *wrk, struct req *req)
return (0);
}
/*--------------------------------------------------------------------
* Stream the body as we fetch it
DOT subgraph xstreambody {
DOT streambody [
DOT shape=record
DOT label="{cnt_streambody:|ping_pong\nfetch/deliver}"
DOT ]
DOT }
DOT streambody -> DONE [style=bold,color=cyan]
*/
static int
cnt_streambody(struct sess *sp, struct worker *wrk, struct req *req)
{
int i;
CHECK_OBJ_NOTNULL(sp, SESS_MAGIC);
CHECK_OBJ_NOTNULL(wrk, WORKER_MAGIC);
CHECK_OBJ_NOTNULL(req, REQ_MAGIC);
CHECK_OBJ_NOTNULL(wrk->busyobj, BUSYOBJ_MAGIC);
RES_StreamStart(sp);
AssertObjCorePassOrBusy(req->obj->objcore);
i = FetchBody(wrk, req->obj);
http_Teardown(wrk->busyobj->bereq);
http_Teardown(wrk->busyobj->beresp);
wrk->busyobj->vfp = NULL;
AZ(wrk->busyobj->vbc);
AN(req->director);
if (!i && req->obj->objcore != NULL) {
EXP_Insert(req->obj);
AN(req->obj->objcore);
AN(req->obj->objcore->ban);
AZ(req->obj->ws_o->overflow);
HSH_Unbusy(req->obj->objcore);
} else {
req->doclose = "Stream error";
}
wrk->acct_tmp.fetch++;
req->director = NULL;
req->restarts = 0;
RES_StreamEnd(sp);
assert(WRW_IsReleased(wrk));
(void)HSH_Deref(&wrk->stats, NULL, &req->obj);
VBO_DerefBusyObj(wrk, &wrk->busyobj);
http_Teardown(req->resp);
sp->step = STP_DONE;
return (0);
}
/*--------------------------------------------------------------------
* A freshly accepted socket
*
......
......@@ -126,8 +126,6 @@ vfp_nop_bytes(struct worker *wrk, struct http_conn *htc, ssize_t bytes)
st->len += wl;
wrk->busyobj->fetch_obj->len += wl;
bytes -= wl;
if (wrk->busyobj->do_stream)
RES_StreamPoll(wrk);
}
return (1);
}
......
......@@ -472,8 +472,6 @@ vfp_gunzip_bytes(struct worker *wrk, struct http_conn *htc, ssize_t bytes)
if (i != VGZ_OK && i != VGZ_END)
return(FetchError(wrk->busyobj, "Gunzip data error"));
wrk->busyobj->fetch_obj->len += dl;
if (wrk->busyobj->do_stream)
RES_StreamPoll(wrk);
}
assert(i == Z_OK || i == Z_STREAM_END);
return (1);
......@@ -554,8 +552,6 @@ vfp_gzip_bytes(struct worker *wrk, struct http_conn *htc, ssize_t bytes)
i = VGZ_Gzip(vg, &dp, &dl, VGZ_NORMAL);
assert(i == Z_OK);
wrk->busyobj->fetch_obj->len += dl;
if (wrk->busyobj->do_stream)
RES_StreamPoll(wrk);
}
return (1);
}
......@@ -584,8 +580,6 @@ vfp_gzip_end(struct worker *wrk)
i = VGZ_Gzip(vg, &dp, &dl, VGZ_FINISH);
wrk->busyobj->fetch_obj->len += dl;
} while (i != Z_STREAM_END);
if (wrk->busyobj->do_stream)
RES_StreamPoll(wrk);
VGZ_UpdateObj(vg, wrk->busyobj->fetch_obj);
if (VGZ_Destroy(&vg) != VGZ_END)
return(FetchError(wrk->busyobj, "Gzip error at the very end"));
......@@ -646,8 +640,6 @@ vfp_testgzip_bytes(struct worker *wrk, struct http_conn *htc, ssize_t bytes)
VGZ_Ibuf(vg, st->ptr + st->len, wl);
st->len += wl;
wrk->busyobj->fetch_obj->len += wl;
if (wrk->busyobj->do_stream)
RES_StreamPoll(wrk);
while (!VGZ_IbufEmpty(vg)) {
VGZ_Obuf(vg, vg->m_buf, vg->m_sz);
......
......@@ -305,115 +305,3 @@ RES_WriteObj(struct sess *sp)
if (WRW_FlushRelease(sp->wrk) && sp->fd >= 0)
SES_Close(sp, "remote closed");
}
/*--------------------------------------------------------------------*/
void
RES_StreamStart(struct sess *sp)
{
struct req *req;
req = sp->req;
CHECK_OBJ_NOTNULL(req, REQ_MAGIC);
AZ(req->res_mode & RES_ESI_CHILD);
AN(req->wantbody);
AZ(req->stream_vgz);
AZ(req->stream_next);
AZ(req->stream_front);
WRW_Reserve(sp->wrk, &sp->fd);
if (req->res_mode & RES_GUNZIP) {
req->stream_vgz = VGZ_NewUngzip(sp->wrk->vsl, "U S -");
AZ(VGZ_WrwInit(req->stream_vgz));
http_Unset(req->resp, H_Content_Encoding);
}
if (!(req->res_mode & RES_CHUNKED) &&
sp->wrk->busyobj->h_content_length != NULL)
http_PrintfHeader(sp->req->resp,
"Content-Length: %s", sp->wrk->busyobj->h_content_length);
sp->wrk->acct_tmp.hdrbytes +=
http_Write(sp->wrk, sp->req->resp, 1);
if (req->res_mode & RES_CHUNKED)
WRW_Chunked(sp->wrk);
}
void
RES_StreamPoll(struct worker *wrk)
{
struct storage *st;
struct busyobj *bo;
struct req *req;
ssize_t l, l2;
void *ptr;
CHECK_OBJ_NOTNULL(wrk, WORKER_MAGIC);
bo = wrk->busyobj;
CHECK_OBJ_NOTNULL(bo, BUSYOBJ_MAGIC);
CHECK_OBJ_NOTNULL(bo->fetch_obj, OBJECT_MAGIC);
req = wrk->sp->req;
CHECK_OBJ_NOTNULL(req, REQ_MAGIC);
if (bo->fetch_obj->len == req->stream_next)
return;
assert(bo->fetch_obj->len > req->stream_next);
l = req->stream_front;
VTAILQ_FOREACH(st, &bo->fetch_obj->store, list) {
if (st->len + l <= req->stream_next) {
l += st->len;
continue;
}
l2 = st->len + l - req->stream_next;
ptr = st->ptr + (req->stream_next - l);
if (wrk->sp->req->res_mode & RES_GUNZIP)
(void)VGZ_WrwGunzip(wrk, req->stream_vgz, ptr, l2);
else
(void)WRW_Write(wrk, ptr, l2);
l += st->len;
req->stream_next += l2;
}
if (!(wrk->sp->req->res_mode & RES_GUNZIP))
(void)WRW_Flush(wrk);
if (bo->fetch_obj->objcore == NULL ||
(bo->fetch_obj->objcore->flags & OC_F_PASS)) {
/*
* This is a pass object, release storage as soon as we
* have delivered it.
*/
while (1) {
st = VTAILQ_FIRST(&bo->fetch_obj->store);
if (st == NULL ||
req->stream_front + st->len > req->stream_next)
break;
VTAILQ_REMOVE(&bo->fetch_obj->store, st, list);
req->stream_front += st->len;
STV_free(st);
}
}
}
void
RES_StreamEnd(struct sess *sp)
{
struct req *req;
req = sp->req;
CHECK_OBJ_NOTNULL(req, REQ_MAGIC);
if (req->res_mode & RES_GUNZIP) {
AN(req->stream_vgz);
VGZ_WrwFlush(sp->wrk, req->stream_vgz);
(void)VGZ_Destroy(&req->stream_vgz);
}
if (req->res_mode & RES_CHUNKED && !(req->res_mode & RES_ESI_CHILD))
WRW_EndChunk(sp->wrk);
if (WRW_FlushRelease(sp->wrk))
SES_Close(sp, "remote closed");
req->stream_vgz = NULL;
req->stream_next = 0;
req->stream_front = 0;
}
......@@ -41,7 +41,6 @@ STEP(hit, HIT, (sp, sp->wrk, sp->req))
STEP(fetch, FETCH, (sp, sp->wrk, sp->req))
STEP(prepfetch, PREPFETCH, (sp, sp->wrk, sp->req))
STEP(fetchbody, FETCHBODY, (sp, sp->wrk, sp->req))
STEP(streambody, STREAMBODY, (sp, sp->wrk, sp->req))
STEP(prepresp, PREPRESP, (sp, sp->wrk, sp->req))
STEP(deliver, DELIVER, (sp, sp->wrk, sp->req))
STEP(error, ERROR, (sp, sp->wrk, sp->req))
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment