Commit fdbb6f03 authored by Poul-Henning Kamp's avatar Poul-Henning Kamp

Add a stream_context structure to hold the state for streaming

when we need it.
parent 5d5c3453
...@@ -247,7 +247,6 @@ struct exp { ...@@ -247,7 +247,6 @@ struct exp {
/*--------------------------------------------------------------------*/ /*--------------------------------------------------------------------*/
/* WRW related fields */
struct wrw { struct wrw {
int *wfd; int *wfd;
unsigned werr; /* valid after WRK_Flush() */ unsigned werr; /* valid after WRK_Flush() */
...@@ -259,6 +258,21 @@ struct wrw { ...@@ -259,6 +258,21 @@ struct wrw {
unsigned ciov; /* Chunked header marker */ unsigned ciov; /* Chunked header marker */
}; };
/*--------------------------------------------------------------------*/
struct stream_ctx {
unsigned magic;
#define STREAM_CTX_MAGIC 0x8213728b
#if 0
struct vgz *vgz;
void *obuf;
ssize_t obuf_len;
ssize_t obuf_ptr;
#endif
ssize_t stream_next;
};
/*--------------------------------------------------------------------*/
struct worker { struct worker {
unsigned magic; unsigned magic;
#define WORKER_MAGIC 0x6391adcf #define WORKER_MAGIC 0x6391adcf
...@@ -312,7 +326,7 @@ struct worker { ...@@ -312,7 +326,7 @@ struct worker {
char *h_content_length; char *h_content_length;
/* Stream state */ /* Stream state */
ssize_t stream_next; struct stream_ctx *sctx;
/* ESI stuff */ /* ESI stuff */
struct vep_state *vep; struct vep_state *vep;
......
...@@ -839,6 +839,13 @@ static int ...@@ -839,6 +839,13 @@ static int
cnt_streambody(struct sess *sp) cnt_streambody(struct sess *sp)
{ {
int i; int i;
struct stream_ctx sctx;
memset(&sctx, 0, sizeof sctx);
sctx.magic = STREAM_CTX_MAGIC;
AZ(sp->wrk->sctx);
sp->wrk->sctx = &sctx;
RES_StreamStart(sp); RES_StreamStart(sp);
...@@ -854,6 +861,7 @@ cnt_streambody(struct sess *sp) ...@@ -854,6 +861,7 @@ cnt_streambody(struct sess *sp)
AN(sp->director); AN(sp->director);
if (i) { if (i) {
sp->wrk->sctx = NULL;
HSH_Drop(sp); HSH_Drop(sp);
AZ(sp->obj); AZ(sp->obj);
sp->err_code = 503; sp->err_code = 503;
...@@ -873,6 +881,7 @@ cnt_streambody(struct sess *sp) ...@@ -873,6 +881,7 @@ cnt_streambody(struct sess *sp)
RES_StreamEnd(sp); RES_StreamEnd(sp);
sp->wrk->sctx = NULL;
assert(WRW_IsReleased(sp->wrk)); assert(WRW_IsReleased(sp->wrk));
assert(sp->wrk->wrw.ciov == sp->wrk->wrw.siov); assert(sp->wrk->wrw.ciov == sp->wrk->wrw.siov);
(void)HSH_Deref(sp->wrk, NULL, &sp->obj); (void)HSH_Deref(sp->wrk, NULL, &sp->obj);
......
...@@ -394,6 +394,10 @@ RES_WriteObj(struct sess *sp) ...@@ -394,6 +394,10 @@ RES_WriteObj(struct sess *sp)
void void
RES_StreamStart(struct sess *sp) RES_StreamStart(struct sess *sp)
{ {
struct stream_ctx *sctx;
sctx = sp->wrk->sctx;
CHECK_OBJ_NOTNULL(sctx, STREAM_CTX_MAGIC);
AZ(sp->wrk->res_mode & RES_ESI_CHILD); AZ(sp->wrk->res_mode & RES_ESI_CHILD);
AN(sp->wantbody); AN(sp->wantbody);
...@@ -410,31 +414,32 @@ RES_StreamStart(struct sess *sp) ...@@ -410,31 +414,32 @@ RES_StreamStart(struct sess *sp)
if (sp->wrk->res_mode & RES_CHUNKED) if (sp->wrk->res_mode & RES_CHUNKED)
WRW_Chunked(sp->wrk); WRW_Chunked(sp->wrk);
sp->wrk->stream_next = 0;
} }
void void
RES_StreamPoll(const struct sess *sp) RES_StreamPoll(const struct sess *sp)
{ {
struct stream_ctx *sctx;
struct storage *st; struct storage *st;
ssize_t l, l2; ssize_t l, l2;
void *ptr; void *ptr;
if (sp->obj->len == sp->wrk->stream_next) sctx = sp->wrk->sctx;
CHECK_OBJ_NOTNULL(sctx, STREAM_CTX_MAGIC);
if (sp->obj->len == sctx->stream_next)
return; return;
assert(sp->obj->len > sp->wrk->stream_next); assert(sp->obj->len > sctx->stream_next);
l = 0; l = 0;
VTAILQ_FOREACH(st, &sp->obj->store, list) { VTAILQ_FOREACH(st, &sp->obj->store, list) {
if (st->len + l <= sp->wrk->stream_next) { if (st->len + l <= sctx->stream_next) {
l += st->len; l += st->len;
continue; continue;
} }
l2 = st->len + l - sp->wrk->stream_next; l2 = st->len + l - sctx->stream_next;
ptr = st->ptr + (sp->wrk->stream_next - l); ptr = st->ptr + (sctx->stream_next - l);
(void)WRW_Write(sp->wrk, ptr, l2); (void)WRW_Write(sp->wrk, ptr, l2);
l += st->len; l += st->len;
sp->wrk->stream_next += l2; sctx->stream_next += l2;
} }
(void)WRW_Flush(sp->wrk); (void)WRW_Flush(sp->wrk);
} }
...@@ -442,6 +447,10 @@ RES_StreamPoll(const struct sess *sp) ...@@ -442,6 +447,10 @@ RES_StreamPoll(const struct sess *sp)
void void
RES_StreamEnd(struct sess *sp) RES_StreamEnd(struct sess *sp)
{ {
struct stream_ctx *sctx;
sctx = sp->wrk->sctx;
CHECK_OBJ_NOTNULL(sctx, STREAM_CTX_MAGIC);
if (sp->wrk->res_mode & RES_GUNZIP) { if (sp->wrk->res_mode & RES_GUNZIP) {
INCOMPL(); INCOMPL();
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment