Commit f4400e0c authored by Martin Blix Grydeland's avatar Martin Blix Grydeland

Allow delivery processors to finish early

A delivery processor may want to finish the delivery early. Change the
return value from VDP_bytes and the delivery processors into 3
categories. 0 means continue, negative is error and positive is
finished.

The latching of errors (to help ESI delivery continue while still
recording the error) will latch the lowest non-zero value observed.

VDP_gunzip is fixed to not unconditionally return errors.

Also add some text to explain the behaviour of VDP_bytes and its
delivery processors.
parent 6c701681
......@@ -602,7 +602,7 @@ struct req {
/* Deliver pipeline */
struct vdp_entry_s vdp;
struct vdp_entry *vdp_nxt;
unsigned vdp_errval;
unsigned vdp_retval;
/* Delivery mode */
unsigned res_mode;
......
......@@ -32,6 +32,22 @@
#include "cache.h"
#include "cache_filter.h"
/* VDP_bytes
*
* Pushes len bytes at ptr down the delivery processor list.
*
* This function picks and calls the next delivery processor from the
* list. The return value is the return value of the delivery
* processor. Upon seeing a non-zero return value, that lowest value
* observed is latched in req->vdp_retval and all subsequent calls to
* VDP_bytes will return that value directly without calling the next
* processor.
*
* Valid return values (of VDP_bytes and any VDP function):
* r < 0: Error, breaks out early on an error condition
* r == 0: Continue
* r > 0: Stop, breaks out early without error condition
*/
int
VDP_bytes(struct req *req, enum vdp_action act, const void *ptr, ssize_t len)
{
......@@ -40,8 +56,8 @@ VDP_bytes(struct req *req, enum vdp_action act, const void *ptr, ssize_t len)
CHECK_OBJ_NOTNULL(req, REQ_MAGIC);
assert(act == VDP_NULL || act == VDP_FLUSH);
if (req->vdp_errval)
return (req->vdp_errval);
if (req->vdp_retval)
return (req->vdp_retval);
vdp = req->vdp_nxt;
CHECK_OBJ_NOTNULL(vdp, VDP_ENTRY_MAGIC);
req->vdp_nxt = VTAILQ_NEXT(vdp, list);
......@@ -49,10 +65,10 @@ VDP_bytes(struct req *req, enum vdp_action act, const void *ptr, ssize_t len)
assert(act > VDP_NULL || len > 0);
/* Call the present layer, while pointing to the next layer down */
retval = vdp->func(req, act, &vdp->priv, ptr, len);
if (retval)
req->vdp_errval = retval; /* Latch error value */
if (retval && (req->vdp_retval == 0 || retval < req->vdp_retval))
req->vdp_retval = retval; /* Latch error value */
req->vdp_nxt = vdp;
return (retval);
return (req->vdp_retval);
}
void
......@@ -126,8 +142,12 @@ vdp_objiterator(void *priv, int flush, const void *ptr, ssize_t len)
int
VDP_DeliverObj(struct req *req)
{
int r;
CHECK_OBJ_NOTNULL(req, REQ_MAGIC);
return (ObjIterate(req->wrk, req->objcore, req, vdp_objiterator,
req->objcore->flags & OC_F_PRIVATE ? 1 : 0));
r = ObjIterate(req->wrk, req->objcore, req, vdp_objiterator,
req->objcore->flags & OC_F_PRIVATE ? 1 : 0);
if (r < 0)
return (r);
return (0);
}
......@@ -347,7 +347,7 @@ VDP_gunzip(struct req *req, enum vdp_action act, void **priv,
return (-1);
if (vg->m_len == vg->m_sz || vr != VGZ_OK) {
if (VDP_bytes(req, VDP_FLUSH, vg->m_buf, vg->m_len))
return (-1);
return (req->vdp_retval);
vg->m_len = 0;
VGZ_Obuf(vg, vg->m_buf, vg->m_sz);
}
......
......@@ -668,6 +668,7 @@ cnt_recv(struct worker *wrk, struct req *req)
req->director_hint = VCL_DefaultDirector(req->vcl);
AN(req->director_hint);
req->vdp_retval = 0;
req->d_ttl = -1;
req->disable_esi = 0;
req->hash_always_miss = 0;
......
......@@ -263,8 +263,8 @@ sml_iterator(struct worker *wrk, struct objcore *oc,
if (boc == NULL) {
VTAILQ_FOREACH_SAFE(st, &obj->list, list, checkpoint) {
AN(st->len);
if (ret == 0 && func(priv, 1, st->ptr, st->len))
ret = -1;
if (ret == 0)
ret = func(priv, 1, st->ptr, st->len);
if (final) {
VTAILQ_REMOVE(&obj->list, st, list);
sml_stv_free(stv, st);
......@@ -329,10 +329,9 @@ sml_iterator(struct worker *wrk, struct objcore *oc,
st = NULL;
Lck_Unlock(&boc->mtx);
assert(l > 0 || boc->state == BOS_FINISHED);
if (func(priv, st != NULL ? final : 1, p, l)) {
ret = -1;
ret = func(priv, st != NULL ? final : 1, p, l);
if (ret)
break;
}
}
HSH_DerefBoc(wrk, oc);
return (ret);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment