adjust to changed VDP interface

parent 7aafb824
......@@ -2,23 +2,23 @@
#include <stdio.h>
#include "vtim.h"
#define Debug(fmt, ...) printf(fmt, __VA_ARGS__)
#define VSLdbgv(req, fmt, ...) \
#define VSLdbgv(tng, fmt, ...) \
do { \
VSL(SLT_Debug, 0, "xid=%u t=%.6f " fmt, \
VXID(req->vsl->wid), VTIM_real(), __VA_ARGS__); \
VSLb(req->vsl, SLT_Debug, fmt, __VA_ARGS__); \
VXID((tng)->vsl->wid), VTIM_real(), __VA_ARGS__); \
VSLb((tng)->vsl, SLT_Debug, fmt, __VA_ARGS__); \
} while(0)
#define VSLdbg(req, msg) \
#define VSLdbg(tng, msg) \
do { \
VSL(SLT_Debug, 0, "xid=%u t=%.6f " msg, \
VXID(req->vsl->wid), VTIM_real()); \
VSLb(req->vsl, SLT_Debug, msg); \
VXID((tng)->vsl->wid), VTIM_real()); \
VSLb((tng)->vsl, SLT_Debug, msg); \
} while(0)
#define VSL0dbg(...) VSL(SLT_Debug, 0, __VA_ARGS__)
#else
#define VSLdbgv(req, fmt, ...) (void)0
#define VSLdbg(req, msg) (void)0
#define VSLdbgv(tng, fmt, ...) (void)0
#define VSLdbg(tng, msg) (void)0
#define VSL0dbg(...) (void)0
#define Debug(fmt, ...) /**/
#endif
......@@ -77,7 +77,7 @@ vped_pretend_gzip_fini(struct req *req, void **priv)
}
static int v_matchproto_(vdp_bytes_f)
vped_pretend_gzip_bytes(struct req *req, enum vdp_action act, void **priv,
vped_pretend_gzip_bytes(struct vdp_ctx *vdx, enum vdp_action act, void **priv,
const void *pv, ssize_t l)
{
uint8_t buf1[5], buf2[5];
......@@ -85,11 +85,12 @@ vped_pretend_gzip_bytes(struct req *req, enum vdp_action act, void **priv,
uint16_t lx;
struct nexus_gzip *gz;
CHECK_OBJ_NOTNULL(req, REQ_MAGIC);
CHECK_OBJ_NOTNULL(vdx, VDP_CTX_MAGIC);
CHECK_OBJ_NOTNULL(vdx->req, REQ_MAGIC);
CAST_OBJ_NOTNULL(gz, *priv, NEXUS_GZIP_MAGIC);
if (l == 0)
return (VDP_bytes(req, act, pv, l));
return (VDP_bytes(vdx, act, pv, l));
p = pv;
......@@ -105,23 +106,23 @@ vped_pretend_gzip_bytes(struct req *req, enum vdp_action act, void **priv,
while (l > 0) {
if (l >= 65535) {
lx = 65535;
if (VDP_bytes(req, VDP_NULL, buf1, sizeof buf1))
if (VDP_bytes(vdx, VDP_NULL, buf1, sizeof buf1))
return (-1);
} else {
lx = (uint16_t)l;
buf2[0] = 0;
vle16enc(buf2 + 1, lx);
vle16enc(buf2 + 3, ~lx);
if (VDP_bytes(req, VDP_NULL, buf2, sizeof buf2))
if (VDP_bytes(vdx, VDP_NULL, buf2, sizeof buf2))
return (-1);
}
if (VDP_bytes(req, VDP_NULL, p, lx))
if (VDP_bytes(vdx, VDP_NULL, p, lx))
return (-1);
l -= lx;
p += lx;
}
/* buf1 & buf2 is local, have to flush */
return (VDP_bytes(req, VDP_FLUSH, NULL, 0));
return (VDP_bytes(vdx, VDP_FLUSH, NULL, 0));
}
const struct vdp vped_pretend_gz = {
......@@ -179,7 +180,7 @@ vped_gzgz_init(struct req *req, void **priv)
* thus we need to flush at the end of each call
*/
static int v_matchproto_(vdp_bytes_f)
vped_gzgz_bytes(struct req *req, enum vdp_action act, void **priv,
vped_gzgz_bytes(struct vdp_ctx *vdx, enum vdp_action act, void **priv,
const void *ptr, ssize_t len)
{
struct vped_gzgz_priv *foo;
......@@ -207,7 +208,7 @@ vped_gzgz_bytes(struct req *req, enum vdp_action act, void **priv,
if (dl > 0) {
if (dl > len)
dl = len;
if (VDP_bytes(req, act, pp, dl))
if (VDP_bytes(vdx, act, pp, dl))
return(-1);
foo->ll += dl;
len -= dl;
......@@ -218,7 +219,7 @@ vped_gzgz_bytes(struct req *req, enum vdp_action act, void **priv,
/* Remove the "LAST" bit */
foo->dbits[0] = *pp;
foo->dbits[0] &= ~(1U << (foo->last & 7));
if (VDP_bytes(req, VDP_NULL, foo->dbits, 1))
if (VDP_bytes(vdx, VDP_NULL, foo->dbits, 1))
return (-1);
foo->ll++;
len--;
......@@ -230,7 +231,7 @@ vped_gzgz_bytes(struct req *req, enum vdp_action act, void **priv,
if (dl > 0) {
if (dl > len)
dl = len;
if (VDP_bytes(req, act, pp, dl))
if (VDP_bytes(vdx, act, pp, dl))
return (-1);
foo->ll += dl;
len -= dl;
......@@ -292,7 +293,7 @@ vped_gzgz_bytes(struct req *req, enum vdp_action act, void **priv,
default:
WRONG("compiler must be broken");
}
if (VDP_bytes(req, VDP_NULL, foo->dbits + 1, foo->lpad))
if (VDP_bytes(vdx, VDP_NULL, foo->dbits + 1, foo->lpad))
return (-1);
}
if (len > 0) {
......@@ -313,7 +314,7 @@ vped_gzgz_bytes(struct req *req, enum vdp_action act, void **priv,
}
}
assert(len == 0);
return (VDP_bytes(req, VDP_FLUSH, NULL, 0));
return (VDP_bytes(vdx, VDP_FLUSH, NULL, 0));
}
static int v_matchproto_(vdp_fini_f)
......
/*-
* Copyright 2019 UPLEX Nils Goroll Systemoptimierung
* Copyright 2019-2020 UPLEX Nils Goroll Systemoptimierung
* All rights reserved
*
* Authors: Geoffrey Simmons <geoffrey.simmons@uplex.de>
......@@ -49,7 +49,7 @@ req_fini(struct req **reqp, struct worker *wrk)
VSLdbg(req, "req_fini called");
VDP_close(req);
req->acct.resp_bodybytes += VDP_Close(req);
Req_Cleanup(req->sp, wrk, req);
Req_Release(req);
}
......@@ -57,9 +57,9 @@
/* ============================================================
* node finalizers
*/
static void fini_final(struct req *, struct node *);
static void fini_subreq(struct req *, struct node *);
static void fini_data(struct req *, struct node *);
static void fini_final(struct vdp_ctx *, struct node *);
static void fini_subreq(struct vdp_ctx *, struct node *);
static void fini_data(struct vdp_ctx *, struct node *);
/* ============================================================
* tuneables which we may or may not want to expose
......@@ -230,17 +230,17 @@ node_alloc(struct pesi *pesi)
}
static void
node_fini(struct req *req, struct node *node)
node_fini(struct vdp_ctx *vdx, struct node *node)
{
switch (node->type) {
case T_FINAL:
fini_final(req, node);
fini_final(vdx, node);
break;
case T_SUBREQ:
fini_subreq(req, node);
fini_subreq(vdx, node);
break;
case T_DATA:
fini_data(req, node);
fini_data(vdx, node);
break;
case T_CRC:
break;
......@@ -249,7 +249,7 @@ node_fini(struct req *req, struct node *node)
if (node->parent == NULL)
break;
assert(VSTAILQ_EMPTY(&node->nexus.children));
req_fini(&node->nexus.req, req->wrk);
req_fini(&node->nexus.req, vdx->wrk);
break;
default:
INCOMPL();
......@@ -257,7 +257,7 @@ node_fini(struct req *req, struct node *node)
}
void
tree_prune(struct req *req, struct node *node)
tree_prune(struct vdp_ctx *vdx, struct node *node)
{
struct node *child;
......@@ -265,7 +265,7 @@ tree_prune(struct req *req, struct node *node)
while ((child = VSTAILQ_FIRST(&node->nexus.children)) != NULL) {
VSTAILQ_REMOVE_HEAD(&node->nexus.children, sibling);
tree_free(req, child);
tree_free(vdx, child);
}
node->state = ST_PRUNED;
......@@ -273,12 +273,12 @@ tree_prune(struct req *req, struct node *node)
/* recursively free all nodes == free the tree */
void
tree_free(struct req *req, struct node *node)
tree_free(struct vdp_ctx *vdx, struct node *node)
{
if (node->type == T_NEXUS)
tree_prune(req, node);
tree_prune(vdx, node);
node_fini(req, node);
node_fini(vdx, node);
switch (node->allocator) {
case NA_WS:
......@@ -563,9 +563,9 @@ subreq_fixup(struct node *node, struct req *req)
* must be idempotent
*/
static void
fini_final(struct req *req, struct node *node)
fini_final(struct vdp_ctx *vdx, struct node *node)
{
(void) req;
(void) vdx;
assert(node->type == T_FINAL);
......@@ -594,12 +594,16 @@ fini_final(struct req *req, struct node *node)
}
static void
fini_subreq(struct req *req, struct node *node)
fini_subreq(struct vdp_ctx *vdx, struct node *node)
{
struct req *subreq;
struct req *req, *subreq;
struct boc *boc = NULL;
struct pesi *pesi;
CHECK_OBJ_NOTNULL(vdx, VDP_CTX_MAGIC);
req = vdx->req;
CHECK_OBJ_NOTNULL(req, REQ_MAGIC);
assert(node->type == T_SUBREQ);
AN(node->subreq.done);
......@@ -618,27 +622,28 @@ fini_subreq(struct req *req, struct node *node)
TAKE_OBJ_NOTNULL(boc, &node->subreq.boc, BOC_MAGIC);
AZ(node->subreq.oc);
VDP_close(subreq); // need before pesi_destroy
// need before pesi_destroy
req->acct.resp_bodybytes += VDP_Close(subreq);
/* bottom of cnt_transmit() */
HSH_Cancel(req->wrk, subreq->objcore, boc);
HSH_Cancel(vdx->wrk, subreq->objcore, boc);
if (boc != NULL)
HSH_DerefBoc(req->wrk, subreq->objcore);
HSH_DerefBoc(vdx->wrk, subreq->objcore);
(void)HSH_DerefObjCore(req->wrk, &subreq->objcore, 0);
(void)HSH_DerefObjCore(vdx->wrk, &subreq->objcore, 0);
TAKE_OBJ_NOTNULL(pesi, &subreq->transport_priv, PESI_MAGIC);
pesi_destroy(&pesi);
req_fini(&subreq, req->wrk);
req_fini(&subreq, vdx->wrk);
AZ(pthread_cond_destroy(&node->subreq.cond));
}
static void
fini_data(struct req *req, struct node *node)
fini_data(struct vdp_ctx *vdx, struct node *node)
{
(void) req;
(void) vdx;
assert(node->type == T_DATA);
......@@ -655,10 +660,10 @@ fini_data(struct req *req, struct node *node)
*/
static int
push_final(struct req *req, struct bytes_tree *tree,
push_final(struct vdp_ctx *vdx, struct bytes_tree *tree,
struct node *node, const struct node *next)
{
(void) req;
(void) vdx;
(void) tree;
(void) next;
......@@ -681,7 +686,7 @@ push_final(struct req *req, struct bytes_tree *tree,
}
static int
push_subreq(struct req *req, struct bytes_tree *tree,
push_subreq(struct vdp_ctx *vdx, struct bytes_tree *tree,
struct node *node, const struct node *next)
{
assert(node->type == T_SUBREQ);
......@@ -697,13 +702,13 @@ push_subreq(struct req *req, struct bytes_tree *tree,
Lck_Unlock(&tree->tree_lock);
}
subreq_fixup(node, req);
subreq_fixup(node, vdx->req);
AZ(node->subreq.req->objcore->flags & OC_F_FINAL);
VSLdbg(req, "DeliverObj from top");
VSLdbg(vdx, "DeliverObj from top");
VDP_DeliverObj(node->subreq.req);
VDP_close(node->subreq.req);
vdx->req->acct.resp_bodybytes += VDP_Close(node->subreq.req);
// fini_subreq() happening later
......@@ -739,7 +744,7 @@ gzip_tailbuf(uint8_t tailbuf[GZIP_TAILBUF_SZ], struct nexus_gzip *gz)
}
static int
push_crc(struct req *req, struct bytes_tree *tree,
push_crc(struct vdp_ctx *vdx, struct bytes_tree *tree,
struct node *node, const struct node *next)
{
struct node *nex;
......@@ -748,7 +753,7 @@ push_crc(struct req *req, struct bytes_tree *tree,
assert(node->type == T_CRC);
(void) req;
(void) vdx;
(void) tree;
nex = node->parent;
......@@ -758,7 +763,7 @@ push_crc(struct req *req, struct bytes_tree *tree,
switch(node->crc.ctype) {
case GZIP_HDR:
return (VDP_bytes(nex->nexus.req,
return (VDP_bytes(nex->nexus.req->vdc,
VDP_NULL, gzip_hdr, sizeof gzip_hdr));
case UPDATE:
nex->nexus.gzip.crc = crc32_combine(
......@@ -778,7 +783,7 @@ push_crc(struct req *req, struct bytes_tree *tree,
parent_gzip->l_crc += nex->nexus.gzip.l_crc;
} else {
gzip_tailbuf(tailbuf, &nex->nexus.gzip);
return (VDP_bytes(nex->nexus.req,
return (VDP_bytes(nex->nexus.req->vdc,
VDP_FLUSH, tailbuf, sizeof(tailbuf)));
}
break;
......@@ -791,7 +796,7 @@ push_crc(struct req *req, struct bytes_tree *tree,
static int
push_data(struct req *req, struct bytes_tree *tree,
push_data(struct vdp_ctx *vdx, struct bytes_tree *tree,
struct node *node, const struct node *next)
{
const void *p;
......@@ -800,7 +805,7 @@ push_data(struct req *req, struct bytes_tree *tree,
assert(node->type == T_DATA);
(void) tree;
(void) req;
(void) vdx;
AN(node->parent);
assert(node->parent->type == T_NEXUS);
......@@ -820,7 +825,7 @@ push_data(struct req *req, struct bytes_tree *tree,
if (p == NULL || node->data.len == 0)
return (0);
return (VDP_bytes(node->parent->nexus.req, act, p, node->data.len));
return (VDP_bytes(node->parent->nexus.req->vdc, act, p, node->data.len));
}
/*
......@@ -829,7 +834,7 @@ push_data(struct req *req, struct bytes_tree *tree,
* returns VDP_bytes upon any error
*/
static int
worklist_push(struct req *req, struct bytes_tree *tree,
worklist_push(struct vdp_ctx *vdx, struct bytes_tree *tree,
struct node_head *work)
{
struct node *node, *next, *parent;
......@@ -844,9 +849,9 @@ worklist_push(struct req *req, struct bytes_tree *tree,
if (parent != NULL) {
assert (parent->type == T_NEXUS);
if (parent->nexus.req->wrk == NULL)
parent->nexus.req->wrk = req->wrk;
parent->nexus.req->wrk = vdx->wrk;
else
assert (parent->nexus.req->wrk == req->wrk);
assert (parent->nexus.req->wrk == vdx->wrk);
}
// not using assert_node: it needs locking
......@@ -854,16 +859,16 @@ worklist_push(struct req *req, struct bytes_tree *tree,
switch (node->type) {
case T_FINAL:
retval = push_final(req, tree, node, next);
retval = push_final(vdx, tree, node, next);
break;
case T_SUBREQ:
retval = push_subreq(req, tree, node, next);
retval = push_subreq(vdx, tree, node, next);
break;
case T_DATA:
retval = push_data(req, tree, node, next);
retval = push_data(vdx, tree, node, next);
break;
case T_CRC:
retval = push_crc(req, tree, node, next);
retval = push_crc(vdx, tree, node, next);
break;
default:
INCOMPL();
......@@ -878,13 +883,14 @@ worklist_push(struct req *req, struct bytes_tree *tree,
else if (parent->state < ST_CLOSED)
assert(next == NULL);
else
VDP_close(parent->nexus.req);
vdx->req->acct.resp_bodybytes +=
VDP_Close(parent->nexus.req);
}
node = next;
}
if (retval) {
(void) VDP_bytes(req, VDP_FLUSH, NULL, 0);
(void) VDP_bytes(vdx, VDP_FLUSH, NULL, 0);
return (retval);
}
......@@ -896,14 +902,14 @@ worklist_push(struct req *req, struct bytes_tree *tree,
VSTAILQ_FOREACH(node, work, unpend) {
assert(node->type != T_NEXUS);
node_fini(req, node);
node_fini(vdx, node);
}
return (retval);
}
static void
worklist_set_delivered(struct req *req, struct bytes_tree *tree,
worklist_set_delivered(struct vdp_ctx *vdx, struct bytes_tree *tree,
struct node_head *work)
{
struct node *node, *parent, *tmp;
......@@ -926,8 +932,8 @@ worklist_set_delivered(struct req *req, struct bytes_tree *tree,
parent->state = ST_PRUNED;
Lck_Unlock(&tree->tree_lock);
tree_prune(req, parent);
node_fini(req, parent);
tree_prune(vdx, parent);
node_fini(vdx, parent);
Lck_Lock(&tree->tree_lock);
}
......@@ -943,7 +949,7 @@ worklist_set_delivered(struct req *req, struct bytes_tree *tree,
} while(0)
static void
worklist_unpend(struct req *req, struct bytes_tree *tree,
worklist_unpend(struct vdp_ctx *vdx, struct bytes_tree *tree,
struct node_head *work)
{
struct node *node, *next;
......@@ -951,13 +957,13 @@ worklist_unpend(struct req *req, struct bytes_tree *tree,
(void) check; // for NODEBUG
CHECK_OBJ_NOTNULL(req, REQ_MAGIC);
CHECK_OBJ_NOTNULL(vdx, VDP_CTX_MAGIC);
CHECK_OBJ_NOTNULL(tree, BYTES_TREE_MAGIC);
AN(work);
assert(VSTAILQ_EMPTY(work));
Lck_AssertHeld(&tree->tree_lock);
VSLdbgv(req, "bytes_unpend: retval=%d front=%p root=%p",
VSLdbgv(vdx->vsl, "bytes_unpend: retval=%d front=%p root=%p",
tree->retval, tree->front, tree->root);
if (tree->retval)
return;
......@@ -967,7 +973,7 @@ worklist_unpend(struct req *req, struct bytes_tree *tree,
if (tree->root->state == ST_DELIVERED ||
tree->root->state == ST_PRUNED) {
VSLdbg(req, "bytes_unpend: whole tree is delivered");
VSLdbg(vdx, "bytes_unpend: whole tree is delivered");
assert_node(tree->root, CHK_DELI);
return;
}
......@@ -982,7 +988,7 @@ worklist_unpend(struct req *req, struct bytes_tree *tree,
*/
AN(tree->front);
if (tree->front_owner == req->wrk)
if (tree->front_owner == vdx->wrk)
node = tree->front;
else
node = tree->root;
......@@ -995,7 +1001,7 @@ worklist_unpend(struct req *req, struct bytes_tree *tree,
while (node->state >= ST_UNPENDING) {
assert_node(node, check);
VSLdbgv(req, "bytes_unpend: delivered/unpending "
VSLdbgv(vdx, "bytes_unpend: delivered/unpending "
"node %p state %u", node, node->state);
next = VSTAILQ_NEXT(node, sibling);
......@@ -1005,10 +1011,10 @@ worklist_unpend(struct req *req, struct bytes_tree *tree,
}
if (node->parent && node->parent->state == ST_OPEN) {
VSLdbgv(req, "bytes_unpend: return at "
VSLdbgv(vdx, "bytes_unpend: return at "
"node %p state %u - await sibling",
node, node->state);
set_front(tree, node, req->wrk);
set_front(tree, node, vdx->wrk);
return;
}
......@@ -1027,16 +1033,16 @@ worklist_unpend(struct req *req, struct bytes_tree *tree,
if (node != NULL)
continue;
VSLdbg(req, "bytes_unpend: reached root");
set_front(tree, tree->root, req->wrk);
VSLdbg(vdx, "bytes_unpend: reached root");
set_front(tree, tree->root, vdx->wrk);
return;
}
if (node->state == ST_PRIVATE) {
VSLdbgv(req, "bytes_unpend: return at "
VSLdbgv(vdx, "bytes_unpend: return at "
"node %p state %u - private",
node, node->state);
set_front(tree, node, req->wrk);
set_front(tree, node, vdx->wrk);
return;
}
......@@ -1053,9 +1059,9 @@ worklist_unpend(struct req *req, struct bytes_tree *tree,
node = next;
continue;
case ST_OPEN:
set_front(tree, node, req->wrk);
set_front(tree, node, vdx->wrk);
AN(node->nexus.owner);
if (node->nexus.owner == req->wrk) {
if (node->nexus.owner == vdx->wrk) {
node = next;
continue;
}
......@@ -1079,39 +1085,39 @@ worklist_unpend(struct req *req, struct bytes_tree *tree,
#undef set_front
void
tree_deliver(struct req *req, struct bytes_tree *tree)
tree_deliver(struct vdp_ctx *vdx, struct bytes_tree *tree)
{
struct node_head work;
int retval;
CHECK_OBJ_NOTNULL(req, REQ_MAGIC);
CHECK_OBJ_NOTNULL(req->wrk, WORKER_MAGIC);
CHECK_OBJ_NOTNULL(vdx, VDP_CTX_MAGIC);
CHECK_OBJ_NOTNULL(vdx->wrk, WORKER_MAGIC);
if (tree->unpend_owner != NULL)
return;
tree->unpend_owner = req->wrk;
tree->unpend_owner = vdx->wrk;
do {
Lck_AssertHeld(&tree->tree_lock);
assert(tree->unpend_owner == req->wrk);
assert(tree->unpend_owner == vdx->wrk);
VSTAILQ_INIT(&work);
worklist_unpend(req, tree, &work);
worklist_unpend(vdx, tree, &work);
if (VSTAILQ_EMPTY(&work) || tree->retval)
break;
Lck_Unlock(&tree->tree_lock);
retval = worklist_push(req, tree, &work);
retval = worklist_push(vdx, tree, &work);
Lck_Lock(&tree->tree_lock);
if (retval) {
tree->retval = retval;
break;
}
worklist_set_delivered(req, tree, &work);
worklist_set_delivered(vdx, tree, &work);
} while (1);
tree->unpend_owner = NULL;
......
......@@ -269,9 +269,9 @@ void set_closed(struct bytes_tree *, struct node *, const struct worker *);
//--------------
void tree_prune(struct req *, struct node *);
void tree_free(struct req *, struct node *);
void tree_prune(struct vdp_ctx *, struct node *);
void tree_free(struct vdp_ctx *, struct node *);
//--------------
void tree_deliver(struct req *, struct bytes_tree *);
void tree_deliver(struct vdp_ctx *, struct bytes_tree *);
......@@ -101,15 +101,16 @@ vped_to_parent_fini(struct req *req, void **priv)
}
static int v_matchproto_(vdp_bytes_f)
vped_to_parent_bytes(struct req *req, enum vdp_action act, void **priv,
vped_to_parent_bytes(struct vdp_ctx *vdx, enum vdp_action act, void **priv,
const void *ptr, ssize_t len)
{
struct req *preq;
CHECK_OBJ_NOTNULL(req, REQ_MAGIC);
CHECK_OBJ_NOTNULL(vdx, VDP_CTX_MAGIC);
CAST_OBJ_NOTNULL(preq, *priv, REQ_MAGIC);
req->acct.resp_bodybytes += len;
return (VDP_bytes(preq, act, ptr, len));
if (act == VDP_END)
act = VDP_FLUSH;
return (VDP_bytes(preq->vdc, act, ptr, len));
}
static const struct vdp vped_to_parent = {
......@@ -256,7 +257,7 @@ vped_task(struct worker *wrk, void *priv)
*/
// finish the request
VDP_close(req);
(void) VDP_Close(req);
if (req->transport_priv != NULL)
assert(req->transport_priv == pesi);
......@@ -494,9 +495,10 @@ pesi_buf_fini(struct req *req, void **priv)
}
static int v_matchproto_(vdp_bytes_f)
pesi_buf_bytes(struct req *req, enum vdp_action act, void **priv,
const void *ptr, ssize_t len)
pesi_buf_bytes(struct vdp_ctx *vdx, enum vdp_action act, void **priv,
const void *ptr, ssize_t len)
{
struct req *req;
struct bytes_tree *tree;
struct node *node, *parent;
struct pesi *pesi;
......@@ -510,17 +512,19 @@ pesi_buf_bytes(struct req *req, enum vdp_action act, void **priv,
if (tree->retval || ptr == NULL || len == 0)
return (tree->retval);
CHECK_OBJ_NOTNULL(vdx, VDP_CTX_MAGIC);
req = vdx->req;
CHECK_OBJ_NOTNULL(req, REQ_MAGIC);
parent = pesi->node;
CHECK_OBJ_NOTNULL(parent, NODE_MAGIC);
assert(parent->type == T_NEXUS);
VSLdbgv(req, "bytes_add: parent=%p front=%p",
VSLdbgv(vdx, "bytes_add: parent=%p front=%p",
parent, tree->front);
assert(req == parent->nexus.req);
VSLdbg(req, "bytes_add: adding data to node");
VSLdbg(vdx, "bytes_add: adding data to node");
node = node_alloc(pesi);
CHECK_OBJ_NOTNULL(node, NODE_MAGIC);
......@@ -555,17 +559,17 @@ pesi_buf_bytes(struct req *req, enum vdp_action act, void **priv,
AN(ptr);
AZ(node->data.ptr);
if (refok) {
VSLdbg(req, "bytes_add: stv ref");
VSLdbg(vdx, "bytes_add: stv ref");
node->data.ptr = ptr;
}
else {
VSLdbg(req, "bytes_add: allocating from transient");
VSLdbg(vdx, "bytes_add: allocating from transient");
node->data.st = stv_transient->sml_alloc(stv_transient, len);
if (node->data.st == NULL) {
VSLb(req->vsl, SLT_Error, "Cannot allocate transient "
VSLb(vdx->vsl, SLT_Error, "Cannot allocate transient "
"storage to buffer response data while waiting "
"for parallel ESI");
VSLdbg(req, "bytes_add: exit after transient alloc "
VSLdbg(vdx, "bytes_add: exit after transient alloc "
"failure");
Lck_Lock(&tree->tree_lock);
......@@ -601,21 +605,21 @@ pesi_buf_bytes(struct req *req, enum vdp_action act, void **priv,
(tree->front->parent == NULL ||
tree->front == parent ||
tree->front->parent == parent ||
tree->front_owner == req->wrk)) {
VSLdbg(req, "front push");
tree->front_owner == vdx->wrk)) {
VSLdbg(vdx, "front push");
// XXX for this node, we can spare buffering if got pushed
// XXX double locking with node_insert
Lck_Lock(&tree->tree_lock);
if (parent->state == ST_PRIVATE)
set_open(tree, parent, req->wrk);
set_open(tree, parent, vdx->wrk);
assert(parent->state = ST_OPEN);
tree_deliver(req, tree);
tree_deliver(vdx, tree);
Lck_Unlock(&tree->tree_lock);
}
VSLdbgv(req, "bytes_add to %s parent: exit",
VSLdbgv(vdx, "bytes_add to %s parent: exit",
parent->state == ST_PRIVATE ? "private" : "open");
return (tree->retval);
......@@ -820,7 +824,7 @@ vdp_pesi_fini(struct req *req, void **priv)
*
* we likely need more error handling
*/
tree_prune(req, bytes_tree->root);
tree_prune(req->vdc, bytes_tree->root);
AZ(pesi_tree->task_running);
assert(VTAILQ_EMPTY(&pesi_tree->task_head));
......@@ -830,7 +834,7 @@ vdp_pesi_fini(struct req *req, void **priv)
Lck_Delete(&bytes_tree->tree_lock);
AZ(pthread_cond_destroy(&bytes_tree->cond));
tree_free(req, bytes_tree->root);
tree_free(req->vdc, bytes_tree->root);
*priv = NULL;
return (0);
......@@ -867,9 +871,10 @@ vped_decode_len(struct req *req, const uint8_t **pp)
}
static int v_matchproto_(vdp_bytes_f)
vdp_pesi_bytes(struct req *req, enum vdp_action act, void **priv,
const void *ptr, ssize_t len)
vdp_pesi_bytes(struct vdp_ctx *vdx, enum vdp_action act, void **priv,
const void *ptr, ssize_t len)
{
struct req *req;
uint8_t *q, *r;
ssize_t l = 0;
const uint8_t *pp;
......@@ -880,6 +885,10 @@ vdp_pesi_bytes(struct req *req, enum vdp_action act, void **priv,
struct node *node, *child = NULL;
int retval = 0, parallel;
CHECK_OBJ_NOTNULL(vdx, VDP_CTX_MAGIC);
req = vdx->req;
CHECK_OBJ_NOTNULL(req, REQ_MAGIC);
AN(priv);
CAST_OBJ_NOTNULL(pesi, *priv, PESI_MAGIC);
CHECK_OBJ_NOTNULL(pesi->pecx, PECX_MAGIC);
......@@ -1005,20 +1014,20 @@ vdp_pesi_bytes(struct req *req, enum vdp_action act, void **priv,
node_insert(tree, node, child);
VSLdbg(req, "vped_vdp: call vped_include");
VSLdbg(vdx, "vped_vdp: call vped_include");
parallel =
vped_include(req, (const char*)q,
(const char*)pecx->p,
pesi, child);
child = NULL;
VSLdbgv(req, "vped_vdp: vped_include()=%d",
VSLdbgv(vdx, "vped_vdp: vped_include()=%d",
parallel);
assert(parallel >= 0);
Debug("INCL [%s][%s] END\n", q, pecx->p);
pecx->p = r + 1;
break;
default:
VSLb(req->vsl, SLT_Error,
VSLb(vdx->vsl, SLT_Error,
"ESI corruption line %d 0x%02x [%s]\n",
__LINE__, *pecx->p, pecx->p);
WRONG("ESI-codes: Illegal code");
......@@ -1046,17 +1055,17 @@ vdp_pesi_bytes(struct req *req, enum vdp_action act, void **priv,
* in the same storage segment, so loop over storage
* until we have processed them all.
*/
VSLdbgv(req, "vped_vdp: pecx->state=%d", pecx->state);
VSLdbgv(vdx, "vped_vdp: pecx->state=%d", pecx->state);
if (pecx->l <= len) {
if (pecx->state == 3)
retval = VDP_bytes(req, act, pp, pecx->l);
retval = VDP_bytes(vdx, act, pp, pecx->l);
len -= pecx->l;
pp += pecx->l;
pecx->state = 1;
break;
}
if (pecx->state == 3 && len > 0)
retval = VDP_bytes(req, act, pp, len);
retval = VDP_bytes(vdx, act, pp, len);
pecx->l -= len;
return (tree->retval);
case 99:
......@@ -1079,17 +1088,17 @@ vdp_pesi_bytes(struct req *req, enum vdp_action act, void **priv,
vped_close_vdp(req, 1, &VDP_pesi_buf);
assert_vdp_next_not(req, &VDP_pesi);
tree_deliver(req, tree);
tree_deliver(vdx, tree);
while (!tree->retval
&& (tree->root->state < ST_DELIVERED ||
tree->npending > 0)) {
VSLdbgv(req, "vdp_pesi_bytes: waiting - "
VSLdbgv(vdx, "vdp_pesi_bytes: waiting - "
"state=%d npending=%d",
tree->root->state,
tree->npending);
AZ(Lck_CondWait(&tree->cond,
&tree->tree_lock, 0));
tree_deliver(req, tree);
tree_deliver(vdx, tree);
}
Lck_Unlock(&tree->tree_lock);
......@@ -1252,14 +1261,14 @@ vped_deliver(struct req *req, struct boc *boc, int wantbody)
assert(parent->type == T_NEXUS);
if (wantbody == 0) {
VDP_close(req);
(void) VDP_Close(req);
return;
}
VSLdbgv(req, "vped_deliver: ObjGetLen=%lu",
ObjGetLen(req->wrk, req->objcore));
if (boc == NULL && ObjGetLen(req->wrk, req->objcore) == 0) {
VDP_close(req);
(void) VDP_Close(req);
return;
}
......@@ -1277,7 +1286,7 @@ vped_deliver(struct req *req, struct boc *boc, int wantbody)
if (req->objcore->flags & OC_F_FAILED) {
/* No way of signalling errors in the middle of
the ESI body. Omit this ESI fragment. */
VDP_close(req);
(void) VDP_Close(req);
return;
}
......@@ -1285,7 +1294,7 @@ vped_deliver(struct req *req, struct boc *boc, int wantbody)
if (vgzgz == NULL) {
VSLb(req->vsl, SLT_Error,
"Insufficient workspace for ESI gzip data");
VDP_close(req);
(void) VDP_Close(req);
return;
}
INIT_OBJ(vgzgz, VPED_GZGZ_PRIV_MAGIC);
......@@ -1373,7 +1382,7 @@ vped_deliver(struct req *req, struct boc *boc, int wantbody)
(void)VDP_DeliverObj(req);
VDP_close(req);
req->acct.resp_bodybytes += VDP_Close(req);
node->final.fi_state = FI_DONE;
AZ(pthread_cond_signal(&node->final.fi_cond));
......@@ -1406,7 +1415,7 @@ vped_deliver(struct req *req, struct boc *boc, int wantbody)
/*
* to be done in the other thread:
* - VDP_DeliverObj()
* - VDP_close()
* - VDP_Close()
*
* from vped_task():
* - req_fini()
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment