Commit abb29f41 authored by Poul-Henning Kamp's avatar Poul-Henning Kamp

This is a megacommit which introduces VFP's: Fetch-Processors.

parent 2d229657
...@@ -123,7 +123,6 @@ struct req; ...@@ -123,7 +123,6 @@ struct req;
struct sess; struct sess;
struct sesspool; struct sesspool;
struct vbc; struct vbc;
struct vef_priv;
struct vrt_backend; struct vrt_backend;
struct vsb; struct vsb;
struct waitinglist; struct waitinglist;
...@@ -265,20 +264,18 @@ struct dstat { ...@@ -265,20 +264,18 @@ struct dstat {
/* Fetch processors --------------------------------------------------*/ /* Fetch processors --------------------------------------------------*/
typedef void vfp_begin_f(struct busyobj *bo, size_t ); enum vfp_status {
typedef int vfp_bytes_f(struct busyobj *bo, struct http_conn *, ssize_t); VFP_ERROR = -1,
typedef int vfp_end_f(struct busyobj *bo); VFP_OK = 0,
VFP_END = 1,
struct vfp {
vfp_begin_f *begin;
vfp_bytes_f *bytes;
vfp_end_f *end;
}; };
typedef enum vfp_status vfp_pull_f(struct busyobj *bo, void *p, ssize_t *len, intptr_t *priv);
extern const struct vfp vfp_gunzip; extern vfp_pull_f vfp_gunzip_pull;
extern const struct vfp vfp_gzip; extern vfp_pull_f vfp_gzip_pull;
extern const struct vfp vfp_testgzip; extern vfp_pull_f vfp_testgunzip_pull;
extern const struct vfp vfp_esi; extern vfp_pull_f vfp_esi_pull;
extern vfp_pull_f vfp_esi_gzip_pull;
/* Deliver processors ------------------------------------------------*/ /* Deliver processors ------------------------------------------------*/
...@@ -545,10 +542,12 @@ struct busyobj { ...@@ -545,10 +542,12 @@ struct busyobj {
unsigned is_gzip; unsigned is_gzip;
unsigned is_gunzip; unsigned is_gunzip;
const struct vfp *vfp; #define N_VFPS 5
struct vep_state *vep; vfp_pull_f *vfps[N_VFPS];
intptr_t vfps_priv[N_VFPS];
int vfp_nxt;
enum busyobj_state_e state; enum busyobj_state_e state;
struct vgz *vgz_rx;
struct ws ws[1]; struct ws ws[1];
struct vbc *vbc; struct vbc *vbc;
...@@ -564,8 +563,6 @@ struct busyobj { ...@@ -564,8 +563,6 @@ struct busyobj {
struct pool_task fetch_task; struct pool_task fetch_task;
struct vef_priv *vef_priv;
unsigned should_close; unsigned should_close;
char *h_content_length; char *h_content_length;
...@@ -859,7 +856,7 @@ void VBO_waitstate(struct busyobj *bo, enum busyobj_state_e want); ...@@ -859,7 +856,7 @@ void VBO_waitstate(struct busyobj *bo, enum busyobj_state_e want);
/* cache_http1_fetch.c [V1F] */ /* cache_http1_fetch.c [V1F] */
int V1F_fetch_hdr(struct worker *wrk, struct busyobj *bo, struct req *req); int V1F_fetch_hdr(struct worker *wrk, struct busyobj *bo, struct req *req);
void V1F_fetch_body(struct busyobj *bo); ssize_t V1F_Setup_Fetch(struct busyobj *bo);
/* cache_http1_fsm.c [HTTP1] */ /* cache_http1_fsm.c [HTTP1] */
typedef int (req_body_iter_f)(struct req *, void *priv, void *ptr, size_t); typedef int (req_body_iter_f)(struct req *, void *priv, void *ptr, size_t);
...@@ -944,10 +941,14 @@ void VBF_Fetch(struct worker *wrk, struct req *req, ...@@ -944,10 +941,14 @@ void VBF_Fetch(struct worker *wrk, struct req *req,
/* cache_fetch_proc.c */ /* cache_fetch_proc.c */
struct storage *VFP_GetStorage(struct busyobj *, ssize_t sz); struct storage *VFP_GetStorage(struct busyobj *, ssize_t sz);
int VFP_Error2(struct busyobj *, const char *error, const char *more); enum vfp_status VFP_Error(struct busyobj *, const char *fmt, ...)
int VFP_Error(struct busyobj *, const char *error); __printflike(2, 3);
void VFP_Init(void); void VFP_Init(void);
extern const struct vfp VFP_nop; void VFP_Fetch_Body(struct busyobj *bo, ssize_t est);
void VFP_Push(struct busyobj *, vfp_pull_f *func, intptr_t priv);
enum vfp_status VFP_Suck(struct busyobj *, void *p, ssize_t *lp);
extern char vfp_init[];
extern char vfp_fini[];
/* cache_gzip.c */ /* cache_gzip.c */
struct vgz; struct vgz;
...@@ -966,7 +967,6 @@ void VGZ_Ibuf(struct vgz *, const void *, ssize_t len); ...@@ -966,7 +967,6 @@ void VGZ_Ibuf(struct vgz *, const void *, ssize_t len);
int VGZ_IbufEmpty(const struct vgz *vg); int VGZ_IbufEmpty(const struct vgz *vg);
void VGZ_Obuf(struct vgz *, void *, ssize_t len); void VGZ_Obuf(struct vgz *, void *, ssize_t len);
int VGZ_ObufFull(const struct vgz *vg); int VGZ_ObufFull(const struct vgz *vg);
int VGZ_ObufStorage(struct busyobj *, struct vgz *vg);
enum vgzret_e VGZ_Gzip(struct vgz *, const void **, size_t *len, enum vgz_flag); enum vgzret_e VGZ_Gzip(struct vgz *, const void **, size_t *len, enum vgz_flag);
enum vgzret_e VGZ_Gunzip(struct vgz *, const void **, size_t *len); enum vgzret_e VGZ_Gunzip(struct vgz *, const void **, size_t *len);
enum vgzret_e VGZ_Destroy(struct vgz **); enum vgzret_e VGZ_Destroy(struct vgz **);
...@@ -1141,6 +1141,7 @@ void VSM_Free(void *ptr); ...@@ -1141,6 +1141,7 @@ void VSM_Free(void *ptr);
#ifdef VSL_ENDMARKER #ifdef VSL_ENDMARKER
void VSL(enum VSL_tag_e tag, uint32_t vxid, const char *fmt, ...) void VSL(enum VSL_tag_e tag, uint32_t vxid, const char *fmt, ...)
__printflike(3, 4); __printflike(3, 4);
void VSLbv(struct vsl_log *, enum VSL_tag_e tag, const char *fmt, va_list va);
void VSLb(struct vsl_log *, enum VSL_tag_e tag, const char *fmt, ...) void VSLb(struct vsl_log *, enum VSL_tag_e tag, const char *fmt, ...)
__printflike(3, 4); __printflike(3, 4);
void VSLbt(struct vsl_log *, enum VSL_tag_e tag, txt t); void VSLbt(struct vsl_log *, enum VSL_tag_e tag, txt t);
......
...@@ -39,8 +39,10 @@ ...@@ -39,8 +39,10 @@
#define VEC_S8 (0x60 + 8) #define VEC_S8 (0x60 + 8)
#define VEC_INCL 'I' #define VEC_INCL 'I'
typedef ssize_t vep_callback_t(struct busyobj *, ssize_t l, enum vgz_flag flg); typedef ssize_t vep_callback_t(struct busyobj *, void *priv, ssize_t l,
enum vgz_flag flg);
void VEP_Init(struct busyobj *, vep_callback_t *cb); struct vep_state *VEP_Init(struct busyobj *, vep_callback_t *cb, void *cb_priv);
void VEP_Parse(const struct busyobj *, const char *p, size_t l); void VEP_Parse(struct vep_state *, const struct busyobj *, const char *p,
struct vsb *VEP_Finish(struct busyobj *); size_t l);
struct vsb *VEP_Finish(struct vep_state *, const struct busyobj *);
This diff is collapsed.
...@@ -63,6 +63,7 @@ struct vep_state { ...@@ -63,6 +63,7 @@ struct vep_state {
struct busyobj *bo; struct busyobj *bo;
int dogzip; int dogzip;
vep_callback_t *cb; vep_callback_t *cb;
void *cb_priv;
/* Internal Counter for default call-back function */ /* Internal Counter for default call-back function */
ssize_t cb_x; ssize_t cb_x;
...@@ -329,7 +330,7 @@ vep_mark_common(struct vep_state *vep, const char *p, enum vep_mark mark) ...@@ -329,7 +330,7 @@ vep_mark_common(struct vep_state *vep, const char *p, enum vep_mark mark)
*/ */
if (vep->last_mark != mark && (vep->o_wait > 0 || vep->startup)) { if (vep->last_mark != mark && (vep->o_wait > 0 || vep->startup)) {
lcb = vep->cb(vep->bo, 0, lcb = vep->cb(vep->bo, vep->cb_priv, 0,
mark == VERBATIM ? VGZ_RESET : VGZ_ALIGN); mark == VERBATIM ? VGZ_RESET : VGZ_ALIGN);
if (lcb - vep->o_last > 0) if (lcb - vep->o_last > 0)
vep_emit_common(vep, lcb - vep->o_last, vep->last_mark); vep_emit_common(vep, lcb - vep->o_last, vep->last_mark);
...@@ -339,7 +340,8 @@ vep_mark_common(struct vep_state *vep, const char *p, enum vep_mark mark) ...@@ -339,7 +340,8 @@ vep_mark_common(struct vep_state *vep, const char *p, enum vep_mark mark)
/* Transfer pending bytes CRC into active mode CRC */ /* Transfer pending bytes CRC into active mode CRC */
if (vep->o_pending) { if (vep->o_pending) {
(void)vep->cb(vep->bo, vep->o_pending, VGZ_NORMAL); (void)vep->cb(vep->bo, vep->cb_priv, vep->o_pending,
VGZ_NORMAL);
if (vep->o_crc == 0) { if (vep->o_crc == 0) {
vep->crc = vep->crcp; vep->crc = vep->crcp;
vep->o_crc = vep->o_pending; vep->o_crc = vep->o_pending;
...@@ -363,7 +365,7 @@ vep_mark_common(struct vep_state *vep, const char *p, enum vep_mark mark) ...@@ -363,7 +365,7 @@ vep_mark_common(struct vep_state *vep, const char *p, enum vep_mark mark)
vep->o_wait += l; vep->o_wait += l;
vep->last_mark = mark; vep->last_mark = mark;
(void)vep->cb(vep->bo, l, VGZ_NORMAL); (void)vep->cb(vep->bo, vep->cb_priv, l, VGZ_NORMAL);
} }
static void static void
...@@ -565,15 +567,14 @@ vep_do_include(struct vep_state *vep, enum dowhat what) ...@@ -565,15 +567,14 @@ vep_do_include(struct vep_state *vep, enum dowhat what)
*/ */
void void
VEP_Parse(const struct busyobj *bo, const char *p, size_t l) VEP_Parse(struct vep_state *vep, const struct busyobj *bo, const char *p,
size_t l)
{ {
struct vep_state *vep;
const char *e; const char *e;
struct vep_match *vm; struct vep_match *vm;
int i; int i;
CHECK_OBJ_NOTNULL(bo, BUSYOBJ_MAGIC); CHECK_OBJ_NOTNULL(bo, BUSYOBJ_MAGIC);
vep = bo->vep;
CHECK_OBJ_NOTNULL(vep, VEP_MAGIC); CHECK_OBJ_NOTNULL(vep, VEP_MAGIC);
assert(l > 0); assert(l > 0);
...@@ -1013,29 +1014,27 @@ VEP_Parse(const struct busyobj *bo, const char *p, size_t l) ...@@ -1013,29 +1014,27 @@ VEP_Parse(const struct busyobj *bo, const char *p, size_t l)
*/ */
static ssize_t __match_proto__() static ssize_t __match_proto__()
vep_default_cb(struct busyobj *bo, ssize_t l, enum vgz_flag flg) vep_default_cb(struct busyobj *bo, void *priv, ssize_t l, enum vgz_flag flg)
{ {
struct vep_state *vep; ssize_t *s;
CHECK_OBJ_NOTNULL(bo, BUSYOBJ_MAGIC); CHECK_OBJ_NOTNULL(bo, BUSYOBJ_MAGIC);
vep = bo->vep; AN(priv);
CHECK_OBJ_NOTNULL(vep, VEP_MAGIC); s = priv;
assert(vep->bo == bo); *s += l;
(void)flg; (void)flg;
vep->cb_x += l; return (*s);
return (vep->cb_x);
} }
/*--------------------------------------------------------------------- /*---------------------------------------------------------------------
*/ */
void struct vep_state *
VEP_Init(struct busyobj *bo, vep_callback_t *cb) VEP_Init(struct busyobj *bo, vep_callback_t *cb, void *cb_priv)
{ {
struct vep_state *vep; struct vep_state *vep;
CHECK_OBJ_NOTNULL(bo, BUSYOBJ_MAGIC); CHECK_OBJ_NOTNULL(bo, BUSYOBJ_MAGIC);
AZ(bo->vep);
vep = (void*)WS_Alloc(bo->ws, sizeof *vep); vep = (void*)WS_Alloc(bo->ws, sizeof *vep);
AN(vep); AN(vep);
...@@ -1044,15 +1043,16 @@ VEP_Init(struct busyobj *bo, vep_callback_t *cb) ...@@ -1044,15 +1043,16 @@ VEP_Init(struct busyobj *bo, vep_callback_t *cb)
vep->bo = bo; vep->bo = bo;
vep->vsb = VSB_new_auto(); vep->vsb = VSB_new_auto();
AN(vep->vsb); AN(vep->vsb);
bo->vep = vep;
if (cb != NULL) { if (cb != NULL) {
vep->dogzip = 1; vep->dogzip = 1;
/* XXX */ /* XXX */
VSB_printf(vep->vsb, "%c", VEC_GZ); VSB_printf(vep->vsb, "%c", VEC_GZ);
vep->cb = cb; vep->cb = cb;
vep->cb_priv = cb_priv;
} else { } else {
vep->cb = vep_default_cb; vep->cb = vep_default_cb;
vep->cb_priv = &vep->cb_x;
} }
vep->state = VEP_START; vep->state = VEP_START;
...@@ -1069,31 +1069,29 @@ VEP_Init(struct busyobj *bo, vep_callback_t *cb) ...@@ -1069,31 +1069,29 @@ VEP_Init(struct busyobj *bo, vep_callback_t *cb)
vep->last_mark = SKIP; vep->last_mark = SKIP;
vep_mark_common(vep, vep->ver_p, VERBATIM); vep_mark_common(vep, vep->ver_p, VERBATIM);
vep->startup = 0; vep->startup = 0;
return (vep);
} }
/*--------------------------------------------------------------------- /*---------------------------------------------------------------------
*/ */
struct vsb * struct vsb *
VEP_Finish(struct busyobj *bo) VEP_Finish(struct vep_state *vep, const struct busyobj *bo)
{ {
struct vep_state *vep;
ssize_t l, lcb; ssize_t l, lcb;
CHECK_OBJ_NOTNULL(bo, BUSYOBJ_MAGIC); CHECK_OBJ_NOTNULL(bo, BUSYOBJ_MAGIC);
vep = bo->vep;
CHECK_OBJ_NOTNULL(vep, VEP_MAGIC); CHECK_OBJ_NOTNULL(vep, VEP_MAGIC);
assert(vep->bo == bo); assert(vep->bo == bo);
if (vep->o_pending) if (vep->o_pending)
vep_mark_common(vep, vep->ver_p, vep->last_mark); vep_mark_common(vep, vep->ver_p, vep->last_mark);
if (vep->o_wait > 0) { if (vep->o_wait > 0) {
lcb = vep->cb(vep->bo, 0, VGZ_ALIGN); lcb = vep->cb(vep->bo, vep->cb_priv, 0, VGZ_ALIGN);
vep_emit_common(vep, lcb - vep->o_last, vep->last_mark); vep_emit_common(vep, lcb - vep->o_last, vep->last_mark);
} }
(void)vep->cb(vep->bo, 0, VGZ_FINISH); (void)vep->cb(vep->bo, vep->cb_priv, 0, VGZ_FINISH);
bo->vep = NULL;
AZ(VSB_finish(vep->vsb)); AZ(VSB_finish(vep->vsb));
l = VSB_len(vep->vsb); l = VSB_len(vep->vsb);
if (vep->esi_found && l > 0) if (vep->esi_found && l > 0)
......
...@@ -259,6 +259,7 @@ vbf_stp_fetch(struct worker *wrk, struct busyobj *bo) ...@@ -259,6 +259,7 @@ vbf_stp_fetch(struct worker *wrk, struct busyobj *bo)
struct vsb *vary = NULL; struct vsb *vary = NULL;
int varyl = 0; int varyl = 0;
struct object *obj; struct object *obj;
ssize_t est = -1;
CHECK_OBJ_NOTNULL(wrk, WORKER_MAGIC); CHECK_OBJ_NOTNULL(wrk, WORKER_MAGIC);
CHECK_OBJ_NOTNULL(bo, BUSYOBJ_MAGIC); CHECK_OBJ_NOTNULL(bo, BUSYOBJ_MAGIC);
...@@ -309,24 +310,27 @@ vbf_stp_fetch(struct worker *wrk, struct busyobj *bo) ...@@ -309,24 +310,27 @@ vbf_stp_fetch(struct worker *wrk, struct busyobj *bo)
/* But we can't do both at the same time */ /* But we can't do both at the same time */
assert(bo->do_gzip == 0 || bo->do_gunzip == 0); assert(bo->do_gzip == 0 || bo->do_gunzip == 0);
/* ESI takes precedence and handles gzip/gunzip itself */ if (bo->vbc != NULL)
if (bo->do_esi) { est = V1F_Setup_Fetch(bo);
bo->vfp = &vfp_esi;
/* if (bo->do_gunzip || (bo->is_gzip && bo->do_esi)) {
* The one case were we do not weaken Etag is where RFC2616_Weaken_Etag(bo->beresp);
* incoming obj is not gzip'ed and we don't gzip either VFP_Push(bo, vfp_gunzip_pull, 0);
* If we ESI expand it on deliver, we weaken there. }
*/
if (bo->is_gzip || bo->do_gzip | bo->do_gunzip) if (bo->do_esi && bo->do_gzip) {
RFC2616_Weaken_Etag(bo->beresp); VFP_Push(bo, vfp_esi_gzip_pull, 0);
} else if (bo->do_gunzip) { RFC2616_Weaken_Etag(bo->beresp);
bo->vfp = &vfp_gunzip; } else if (bo->do_esi && bo->is_gzip && !bo->do_gunzip) {
VFP_Push(bo, vfp_esi_gzip_pull, 0);
RFC2616_Weaken_Etag(bo->beresp); RFC2616_Weaken_Etag(bo->beresp);
} else if (bo->do_esi) {
VFP_Push(bo, vfp_esi_pull, 0);
} else if (bo->do_gzip) { } else if (bo->do_gzip) {
bo->vfp = &vfp_gzip; VFP_Push(bo, vfp_gzip_pull, 0);
RFC2616_Weaken_Etag(bo->beresp); RFC2616_Weaken_Etag(bo->beresp);
} else if (bo->is_gzip) { } else if (bo->is_gzip && !bo->do_gunzip) {
bo->vfp = &vfp_testgzip; VFP_Push(bo, vfp_testgunzip_pull, 0);
} }
if (bo->fetch_objcore->flags & OC_F_PRIVATE) if (bo->fetch_objcore->flags & OC_F_PRIVATE)
...@@ -437,9 +441,6 @@ vbf_stp_fetch(struct worker *wrk, struct busyobj *bo) ...@@ -437,9 +441,6 @@ vbf_stp_fetch(struct worker *wrk, struct busyobj *bo)
if (bo->do_stream) if (bo->do_stream)
HSH_Unbusy(&wrk->stats, obj->objcore); HSH_Unbusy(&wrk->stats, obj->objcore);
if (bo->vfp == NULL)
bo->vfp = &VFP_nop;
assert(bo->state == BOS_REQ_DONE); assert(bo->state == BOS_REQ_DONE);
VBO_setstate(bo, BOS_FETCHING); VBO_setstate(bo, BOS_FETCHING);
...@@ -455,8 +456,7 @@ vbf_stp_fetch(struct worker *wrk, struct busyobj *bo) ...@@ -455,8 +456,7 @@ vbf_stp_fetch(struct worker *wrk, struct busyobj *bo)
if (bo->vbc == NULL) if (bo->vbc == NULL)
(void)VFP_Error(bo, "Backend connection gone"); (void)VFP_Error(bo, "Backend connection gone");
else else
V1F_fetch_body(bo); VFP_Fetch_Body(bo, est);
break;
} }
bo->stats = NULL; bo->stats = NULL;
...@@ -471,14 +471,12 @@ vbf_stp_fetch(struct worker *wrk, struct busyobj *bo) ...@@ -471,14 +471,12 @@ vbf_stp_fetch(struct worker *wrk, struct busyobj *bo)
AZ(bo->vbc); AZ(bo->vbc);
} }
bo->vfp = NULL; http_Teardown(bo->bereq);
http_Teardown(bo->beresp);
VSLb(bo->vsl, SLT_Fetch_Body, "%u(%s)", VSLb(bo->vsl, SLT_Fetch_Body, "%u(%s)",
bo->htc.body_status, body_status_2str(bo->htc.body_status)); bo->htc.body_status, body_status_2str(bo->htc.body_status));
http_Teardown(bo->bereq);
http_Teardown(bo->beresp);
if (bo->state == BOS_FAILED) { if (bo->state == BOS_FAILED) {
wrk->stats.fetch_failed++; wrk->stats.fetch_failed++;
} else { } else {
......
...@@ -43,6 +43,9 @@ ...@@ -43,6 +43,9 @@
static unsigned fetchfrag; static unsigned fetchfrag;
char vfp_init[] = "<init>";
char vfp_fini[] = "<fini>";
/*-------------------------------------------------------------------- /*--------------------------------------------------------------------
* We want to issue the first error we encounter on fetching and * We want to issue the first error we encounter on fetching and
* supress the rest. This function does that. * supress the rest. This function does that.
...@@ -52,113 +55,23 @@ static unsigned fetchfrag; ...@@ -52,113 +55,23 @@ static unsigned fetchfrag;
* For convenience, always return -1 * For convenience, always return -1
*/ */
int enum vfp_status
VFP_Error2(struct busyobj *bo, const char *error, const char *more) VFP_Error(struct busyobj *bo, const char *fmt, ...)
{ {
va_list ap;
CHECK_OBJ_NOTNULL(bo, BUSYOBJ_MAGIC); CHECK_OBJ_NOTNULL(bo, BUSYOBJ_MAGIC);
if (bo->state < BOS_FAILED) { if (bo->state < BOS_FAILED) {
if (more == NULL) va_start(ap, fmt);
VSLb(bo->vsl, SLT_FetchError, "%s", error); VSLbv(bo->vsl, SLT_FetchError, fmt, ap);
else va_end(ap);
VSLb(bo->vsl, SLT_FetchError, "%s: %s", error, more);
if (bo->fetch_objcore != NULL) if (bo->fetch_objcore != NULL)
HSH_Fail(bo->fetch_objcore); HSH_Fail(bo->fetch_objcore);
VBO_setstate(bo, BOS_FAILED); VBO_setstate(bo, BOS_FAILED);
} }
return (-1); return (VFP_ERROR);
}
int
VFP_Error(struct busyobj *bo, const char *error)
{
return(VFP_Error2(bo, error, NULL));
}
/*--------------------------------------------------------------------
* VFP_NOP
*
* This fetch-processor does nothing but store the object.
* It also documents the API
*/
/*--------------------------------------------------------------------
* VFP_BEGIN
*
* Called to set up stuff.
*
* 'estimate' is the estimate of the number of bytes we expect to receive,
* as seen on the socket, or zero if unknown.
*/
static void __match_proto__(vfp_begin_f)
vfp_nop_begin(struct busyobj *bo, size_t estimate)
{
CHECK_OBJ_NOTNULL(bo, BUSYOBJ_MAGIC);
if (estimate > 0)
(void)VFP_GetStorage(bo, estimate);
}
/*--------------------------------------------------------------------
* VFP_BYTES
*
* Process (up to) 'bytes' from the socket.
*
* Return -1 on error, issue VFP_Error()
* will not be called again, once error happens.
* Return 0 on EOF on socket even if bytes not reached.
* Return 1 when 'bytes' have been processed.
*/
static int __match_proto__(vfp_bytes_f)
vfp_nop_bytes(struct busyobj *bo, struct http_conn *htc, ssize_t bytes)
{
ssize_t l, wl;
struct storage *st;
CHECK_OBJ_NOTNULL(bo, BUSYOBJ_MAGIC);
while (bytes > 0) {
st = VFP_GetStorage(bo, 0);
if (st == NULL)
return(-1);
l = st->space - st->len;
if (l > bytes)
l = bytes;
wl = HTTP1_Read(htc, st->ptr + st->len, l);
if (wl <= 0)
return (wl);
VBO_extend(bo, wl);
bytes -= wl;
}
return (1);
} }
/*--------------------------------------------------------------------
* VFP_END
*
* Finish & cleanup
*
* Return -1 for error
* Return 0 for OK
*/
static int __match_proto__(vfp_end_f)
vfp_nop_end(struct busyobj *bo)
{
(void)bo;
return (0);
}
const struct vfp VFP_nop = {
.begin = vfp_nop_begin,
.bytes = vfp_nop_bytes,
.end = vfp_nop_end,
};
/*-------------------------------------------------------------------- /*--------------------------------------------------------------------
* Fetch Storage to put object into. * Fetch Storage to put object into.
* *
...@@ -196,6 +109,160 @@ VFP_GetStorage(struct busyobj *bo, ssize_t sz) ...@@ -196,6 +109,160 @@ VFP_GetStorage(struct busyobj *bo, ssize_t sz)
return (st); return (st);
} }
/**********************************************************************
*/
static enum vfp_status
vfp_call(struct busyobj *bo, int nbr, void *p, ssize_t *lp)
{
AN(bo->vfps[nbr]);
return (bo->vfps[nbr](bo, p, lp, &bo->vfps_priv[nbr]));
}
static void
vfp_suck_fini(struct busyobj *bo)
{
int i;
for (i = 0; i < bo->vfp_nxt; i++) {
if(bo->vfps[i] != NULL)
(void)vfp_call(bo, i, vfp_fini, NULL);
}
}
static enum vfp_status
vfp_suck_init(struct busyobj *bo)
{
enum vfp_status retval = VFP_ERROR;
int i;
for (i = 0; i < bo->vfp_nxt; i++) {
retval = vfp_call(bo, i, vfp_init, NULL);
if (retval != VFP_OK) {
vfp_suck_fini(bo);
break;
}
}
return (retval);
}
/**********************************************************************
* Suck data up from lower levels.
* Once a layer return non VFP_OK, clean it up and produce the same
* return value for any subsequent calls.
*/
enum vfp_status
VFP_Suck(struct busyobj *bo, void *p, ssize_t *lp)
{
enum vfp_status vp;
CHECK_OBJ_NOTNULL(bo, BUSYOBJ_MAGIC);
AN(p);
AN(lp);
assert(bo->vfp_nxt > 0);
bo->vfp_nxt--;
if (bo->vfps[bo->vfp_nxt] == NULL) {
*lp = 0;
vp = (enum vfp_status)bo->vfps_priv[bo->vfp_nxt];
} else {
vp = vfp_call(bo, bo->vfp_nxt, p, lp);
if (vp != VFP_OK) {
(void)vfp_call(bo, bo->vfp_nxt, vfp_fini, NULL);
bo->vfps[bo->vfp_nxt] = NULL;
bo->vfps_priv[bo->vfp_nxt] = vp;
}
}
bo->vfp_nxt++;
return (vp);
}
/*--------------------------------------------------------------------
*/
void
VFP_Fetch_Body(struct busyobj *bo, ssize_t est)
{
ssize_t l;
enum vfp_status vfps = VFP_ERROR;
struct storage *st = NULL;
CHECK_OBJ_NOTNULL(bo, BUSYOBJ_MAGIC);
AN(bo->vfp_nxt);
if (est < 0)
est = 0;
if (vfp_suck_init(bo) != VFP_OK) {
(void)VFP_Error(bo, "Fetch Pipeline failed to initialize");
bo->should_close = 1;
return;
}
do {
if (st == NULL) {
l = fetchfrag;
if (l == 0) {
l = est;
est = 0;
}
if (l == 0)
l = cache_param->fetch_chunksize;
st = STV_alloc(bo, l);
if (st == NULL) {
bo->should_close = 1;
/* XXX Close VFP stack */
(void)VFP_Error(bo, "Out of storage");
break;
}
AZ(st->len);
Lck_Lock(&bo->mtx);
VTAILQ_INSERT_TAIL(&bo->fetch_obj->store, st, list);
Lck_Unlock(&bo->mtx);
}
l = st->space - st->len;
vfps = VFP_Suck(bo, st->ptr + st->len, &l);
if (l > 0)
VBO_extend(bo, l);
if (st->len == st->space)
st = NULL;
} while (vfps == VFP_OK);
if (vfps == VFP_ERROR) {
(void)VFP_Error(bo, "Fetch Pipeline failed to process");
bo->should_close = 1;
}
vfp_suck_fini(bo);
/*
* Trim or delete the last segment, if any
*/
st = VTAILQ_LAST(&bo->fetch_obj->store, storagehead);
/* XXX: Temporary: Only trim if we are not streaming */
if (st != NULL && !bo->do_stream) {
/* None of this this is safe under streaming */
if (st->len == 0) {
VTAILQ_REMOVE(&bo->fetch_obj->store, st, list);
STV_free(st);
} else if (st->len < st->space) {
STV_trim(st, st->len, 1);
}
}
}
void
VFP_Push(struct busyobj *bo, vfp_pull_f *func, intptr_t priv)
{
CHECK_OBJ_NOTNULL(bo, BUSYOBJ_MAGIC);
bo->vfps_priv[bo->vfp_nxt] = priv;
bo->vfps[bo->vfp_nxt] = func;
bo->vfp_nxt++;
}
/*-------------------------------------------------------------------- /*--------------------------------------------------------------------
* Debugging aids * Debugging aids
*/ */
......
This diff is collapsed.
...@@ -69,22 +69,35 @@ vbf_fetch_number(const char *nbr, int radix) ...@@ -69,22 +69,35 @@ vbf_fetch_number(const char *nbr, int radix)
/*--------------------------------------------------------------------*/ /*--------------------------------------------------------------------*/
static int static enum vfp_status __match_proto__(vfp_pull_f)
vbf_fetch_straight(struct busyobj *bo, struct http_conn *htc, ssize_t cl) v1f_pull_straight(struct busyobj *bo, void *p, ssize_t *lp, intptr_t *priv)
{ {
int i; ssize_t l, lr;
assert(htc->body_status == BS_LENGTH);
if (cl < 0) {
return (VFP_Error(bo, "straight length field bogus"));
} else if (cl == 0)
return (0);
i = bo->vfp->bytes(bo, htc, cl); CHECK_OBJ_NOTNULL(bo, BUSYOBJ_MAGIC);
if (i <= 0) if (p == vfp_init)
return (VFP_OK);
if (p == vfp_fini)
return (VFP_ERROR);
AN(p);
AN(lp);
AN(priv);
l = *lp;
*lp = 0;
if (!*priv) // XXX: Optimize Content-Len: 0 out earlier
return (VFP_END);
if (*priv < l)
l = *priv;
lr = HTTP1_Read(&bo->htc, p, l);
if (lr <= 0)
return (VFP_Error(bo, "straight insufficient bytes")); return (VFP_Error(bo, "straight insufficient bytes"));
return (0); *lp = lr;
*priv -= lr;
if (*priv == 0)
return (VFP_END);
return (VFP_OK);
} }
/*-------------------------------------------------------------------- /*--------------------------------------------------------------------
...@@ -93,29 +106,38 @@ vbf_fetch_straight(struct busyobj *bo, struct http_conn *htc, ssize_t cl) ...@@ -93,29 +106,38 @@ vbf_fetch_straight(struct busyobj *bo, struct http_conn *htc, ssize_t cl)
* XXX: Reading one byte at a time is pretty pessimal. * XXX: Reading one byte at a time is pretty pessimal.
*/ */
static int static enum vfp_status __match_proto__(vfp_pull_f)
vbf_fetch_chunked(struct busyobj *bo, struct http_conn *htc) v1f_pull_chunked(struct busyobj *bo, void *p, ssize_t *lp, intptr_t *priv)
{ {
int i; int i;
char buf[20]; /* XXX: 20 is arbitrary */ char buf[20]; /* XXX: 20 is arbitrary */
unsigned u; unsigned u;
ssize_t cl; ssize_t cl, l, lr;
assert(htc->body_status == BS_CHUNKED); CHECK_OBJ_NOTNULL(bo, BUSYOBJ_MAGIC);
do { if (p == vfp_init)
return (VFP_OK);
if (p == vfp_fini)
return (VFP_ERROR);
AN(p);
AN(lp);
AN(priv);
l = *lp;
*lp = 0;
if (*priv == -1) {
/* Skip leading whitespace */ /* Skip leading whitespace */
do { do {
if (HTTP1_Read(htc, buf, 1) <= 0) if (HTTP1_Read(&bo->htc, buf, 1) <= 0)
return (VFP_Error(bo, "chunked read err")); return (VFP_Error(bo, "chunked read err"));
} while (vct_islws(buf[0])); } while (vct_islws(buf[0]));
if (!vct_ishex(buf[0])) if (!vct_ishex(buf[0]))
return (VFP_Error(bo, "chunked header non-hex")); return (VFP_Error(bo, "chunked header non-hex"));
/* Collect hex digits, skipping leading zeros */ /* Collect hex digits, skipping leading zeros */
for (u = 1; u < sizeof buf; u++) { for (u = 1; u < sizeof buf; u++) {
do { do {
if (HTTP1_Read(htc, buf + u, 1) <= 0) if (HTTP1_Read(&bo->htc, buf + u, 1) <= 0)
return (VFP_Error(bo, return (VFP_Error(bo,
"chunked read err")); "chunked read err"));
} while (u == 1 && buf[0] == '0' && buf[u] == '0'); } while (u == 1 && buf[0] == '0' && buf[u] == '0');
...@@ -128,40 +150,98 @@ vbf_fetch_chunked(struct busyobj *bo, struct http_conn *htc) ...@@ -128,40 +150,98 @@ vbf_fetch_chunked(struct busyobj *bo, struct http_conn *htc)
/* Skip trailing white space */ /* Skip trailing white space */
while(vct_islws(buf[u]) && buf[u] != '\n') while(vct_islws(buf[u]) && buf[u] != '\n')
if (HTTP1_Read(htc, buf + u, 1) <= 0) if (HTTP1_Read(&bo->htc, buf + u, 1) <= 0)
return (VFP_Error(bo, "chunked read err")); return (VFP_Error(bo, "chunked read err"));
if (buf[u] != '\n') if (buf[u] != '\n')
return (VFP_Error(bo,"chunked header no NL")); return (VFP_Error(bo,"chunked header no NL"));
buf[u] = '\0'; buf[u] = '\0';
cl = vbf_fetch_number(buf, 16); cl = vbf_fetch_number(buf, 16);
if (cl < 0) if (cl < 0)
return (VFP_Error(bo,"chunked header number syntax")); return (VFP_Error(bo,"chunked header number syntax"));
*priv = cl;
if (cl > 0 && bo->vfp->bytes(bo, htc, cl) <= 0) }
return (VFP_Error(bo, "chunked read err")); if (*priv > 0) {
if (*priv < l)
i = HTTP1_Read(htc, buf, 1); l = *priv;
if (i <= 0) lr = HTTP1_Read(&bo->htc, p, l);
return (VFP_Error(bo, "chunked read err")); if (lr <= 0)
if (buf[0] == '\r' && HTTP1_Read( htc, buf, 1) <= 0) return (VFP_Error(bo, "straight insufficient bytes"));
return (VFP_Error(bo, "chunked read err")); *lp = lr;
if (buf[0] != '\n') *priv -= lr;
return (VFP_Error(bo,"chunked tail no NL")); if (*priv == 0)
} while (cl > 0); *priv = -1;
return (0); return (VFP_OK);
}
AZ(*priv);
i = HTTP1_Read(&bo->htc, buf, 1);
if (i <= 0)
return (VFP_Error(bo, "chunked read err"));
if (buf[0] == '\r' && HTTP1_Read(&bo->htc, buf, 1) <= 0)
return (VFP_Error(bo, "chunked read err"));
if (buf[0] != '\n')
return (VFP_Error(bo,"chunked tail no NL"));
return (VFP_END);
} }
/*--------------------------------------------------------------------*/ /*--------------------------------------------------------------------*/
static void static enum vfp_status __match_proto__(vfp_pull_f)
vbf_fetch_eof(struct busyobj *bo, struct http_conn *htc) v1f_pull_eof(struct busyobj *bo, void *p, ssize_t *lp, intptr_t *priv)
{ {
ssize_t l, lr;
assert(htc->body_status == BS_EOF); CHECK_OBJ_NOTNULL(bo, BUSYOBJ_MAGIC);
if (bo->vfp->bytes(bo, htc, SSIZE_MAX) < 0) if (p == vfp_init)
(void)VFP_Error(bo,"eof socket fail"); return (VFP_OK);
if (p == vfp_fini)
return (VFP_ERROR);
AN(p);
AN(lp);
AN(priv);
l = *lp;
*lp = 0;
lr = HTTP1_Read(&bo->htc, p, l);
if (lr < 0)
return (VFP_Error(bo,"eof socket fail"));
if (lr == 0)
return (VFP_END);
*lp = lr;
return (VFP_OK);
}
/*--------------------------------------------------------------------
*/
ssize_t
V1F_Setup_Fetch(struct busyobj *bo)
{
struct http_conn *htc;
ssize_t cl;
CHECK_OBJ_NOTNULL(bo, BUSYOBJ_MAGIC);
htc = &bo->htc;
CHECK_OBJ_NOTNULL(htc, HTTP_CONN_MAGIC);
CHECK_OBJ_NOTNULL(bo->vbc, VBC_MAGIC);
switch(htc->body_status) {
case BS_EOF:
VFP_Push(bo, v1f_pull_eof, 0);
return(-1);
case BS_LENGTH:
cl = vbf_fetch_number(bo->h_content_length, 10);
VFP_Push(bo, v1f_pull_straight, cl);
return (cl);
case BS_CHUNKED:
VFP_Push(bo, v1f_pull_chunked, -1);
return (-1);
default:
break;
}
return (-1);
} }
/*-------------------------------------------------------------------- /*--------------------------------------------------------------------
...@@ -267,6 +347,8 @@ V1F_fetch_hdr(struct worker *wrk, struct busyobj *bo, struct req *req) ...@@ -267,6 +347,8 @@ V1F_fetch_hdr(struct worker *wrk, struct busyobj *bo, struct req *req)
HTTP1_Init(htc, bo->ws, vc->fd, vc->vsl, HTTP1_Init(htc, bo->ws, vc->fd, vc->vsl,
cache_param->http_resp_size, cache_param->http_resp_size,
cache_param->http_resp_hdr_len); cache_param->http_resp_hdr_len);
CHECK_OBJ_NOTNULL(htc, HTTP_CONN_MAGIC);
CHECK_OBJ_NOTNULL(&bo->htc, HTTP_CONN_MAGIC);
VTCP_set_read_timeout(vc->fd, vc->first_byte_timeout); VTCP_set_read_timeout(vc->fd, vc->first_byte_timeout);
...@@ -308,81 +390,3 @@ V1F_fetch_hdr(struct worker *wrk, struct busyobj *bo, struct req *req) ...@@ -308,81 +390,3 @@ V1F_fetch_hdr(struct worker *wrk, struct busyobj *bo, struct req *req)
return (0); return (0);
} }
/*--------------------------------------------------------------------
* This function is either called by the requesting thread OR by a
* dedicated body-fetch work-thread.
*
* We get passed the busyobj in the priv arg, and we inherit a
* refcount on it, which we must release, when done fetching.
*/
void
V1F_fetch_body(struct busyobj *bo)
{
struct storage *st;
ssize_t cl;
struct http_conn *htc;
struct object *obj;
CHECK_OBJ_NOTNULL(bo, BUSYOBJ_MAGIC);
htc = &bo->htc;
CHECK_OBJ_ORNULL(bo->vbc, VBC_MAGIC);
obj = bo->fetch_obj;
CHECK_OBJ_NOTNULL(obj, OBJECT_MAGIC);
CHECK_OBJ_NOTNULL(obj->http, HTTP_MAGIC);
AN(bo->vbc);
assert(bo->state == BOS_FETCHING);
AN(bo->vfp);
AZ(bo->vgz_rx);
assert(VTAILQ_EMPTY(&obj->store));
/* XXX: pick up estimate from objdr ? */
cl = 0;
switch (htc->body_status) {
case BS_LENGTH:
cl = vbf_fetch_number(bo->h_content_length, 10);
bo->vfp->begin(bo, cl);
if (bo->state == BOS_FETCHING && cl > 0)
bo->should_close |= vbf_fetch_straight(bo, htc, cl);
if (bo->vfp->end(bo))
assert(bo->state == BOS_FAILED);
break;
case BS_CHUNKED:
bo->vfp->begin(bo, cl > 0 ? cl : 0);
if (bo->state == BOS_FETCHING)
bo->should_close |= vbf_fetch_chunked(bo, htc);
if (bo->vfp->end(bo))
assert(bo->state == BOS_FAILED);
break;
case BS_EOF:
bo->vfp->begin(bo, cl > 0 ? cl : 0);
if (bo->state == BOS_FETCHING)
vbf_fetch_eof(bo, htc);
bo->should_close = 1;
if (bo->vfp->end(bo))
assert(bo->state == BOS_FAILED);
break;
default:
WRONG("Wrong body_status");
}
AZ(bo->vgz_rx);
/*
* Trim or delete the last segment, if any
*/
st = VTAILQ_LAST(&bo->fetch_obj->store, storagehead);
/* XXX: Temporary: Only trim if we are not streaming */
if (st != NULL && !bo->do_stream) {
/* XXX: is any of this safe under streaming ? */
if (st->len == 0) {
VTAILQ_REMOVE(&bo->fetch_obj->store, st, list);
STV_free(st);
} else if (st->len < st->space) {
STV_trim(st, st->len, 1);
}
}
}
...@@ -296,12 +296,11 @@ VSLbt(struct vsl_log *vsl, enum VSL_tag_e tag, txt t) ...@@ -296,12 +296,11 @@ VSLbt(struct vsl_log *vsl, enum VSL_tag_e tag, txt t)
*/ */
void void
VSLb(struct vsl_log *vsl, enum VSL_tag_e tag, const char *fmt, ...) VSLbv(struct vsl_log *vsl, enum VSL_tag_e tag, const char *fmt, va_list ap)
{ {
char *p; char *p;
const char *u, *f; const char *u, *f;
unsigned n, mlen; unsigned n, mlen;
va_list ap;
txt t; txt t;
AN(fmt); AN(fmt);
...@@ -329,9 +328,7 @@ VSLb(struct vsl_log *vsl, enum VSL_tag_e tag, const char *fmt, ...) ...@@ -329,9 +328,7 @@ VSLb(struct vsl_log *vsl, enum VSL_tag_e tag, const char *fmt, ...)
VSL_Flush(vsl, 1); VSL_Flush(vsl, 1);
p = VSL_DATA(vsl->wlp); p = VSL_DATA(vsl->wlp);
va_start(ap, fmt);
n = vsnprintf(p, mlen, fmt, ap); n = vsnprintf(p, mlen, fmt, ap);
va_end(ap);
if (n > mlen - 1) if (n > mlen - 1)
n = mlen - 1; /* we truncate long fields */ n = mlen - 1; /* we truncate long fields */
p[n++] = '\0'; /* NUL-terminated */ p[n++] = '\0'; /* NUL-terminated */
...@@ -343,6 +340,16 @@ VSLb(struct vsl_log *vsl, enum VSL_tag_e tag, const char *fmt, ...) ...@@ -343,6 +340,16 @@ VSLb(struct vsl_log *vsl, enum VSL_tag_e tag, const char *fmt, ...)
VSL_Flush(vsl, 0); VSL_Flush(vsl, 0);
} }
void
VSLb(struct vsl_log *vsl, enum VSL_tag_e tag, const char *fmt, ...)
{
va_list ap;
va_start(ap, fmt);
VSLbv(vsl, tag, fmt, ap);
va_end(ap);
}
/*-------------------------------------------------------------------- /*--------------------------------------------------------------------
* Setup a VSL buffer, allocate space if none provided. * Setup a VSL buffer, allocate space if none provided.
*/ */
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment