Commit 5634d68e authored by Nils Goroll

cleanup push functions: make node type specific

parent 27ace870
......@@ -1168,130 +1168,157 @@ bytes_unpend_worklist(struct req *req, struct bytes_tree *tree,
}
/*
* deliver unpended notes while not holding the tree lock
*
* returns VDP_bytes upon any error
* node type delivery not holding the tree lock
*/
static int
bytes_push_worklist(struct req *req, struct bytes_tree *tree,
struct node_head *work)
push_subreq(struct req *req, struct bytes_tree *tree,
struct node *node, const struct node *next)
{
struct node *node, *next;
uint8_t tailbuf[GZIP_TAILBUF_SZ];
const void *p;
int retval = 0, tailbuf_used = 0;
struct pesi *pesi;
node = VSTAILQ_FIRST(work);
AN(node);
while (node) {
next = VSTAILQ_NEXT(node, unpend);
(void) next;
// assert_node needs lock
assert(node->state == ST_UNPENDING);
if (node->subreq.done == 0) {
Lck_Lock(&tree->tree_lock);
if (node->subreq.done == 0)
AZ(Lck_CondWait(
&node->subreq.cond,
&tree->tree_lock, 0));
Lck_Unlock(&tree->tree_lock);
}
AN(node->subreq.done);
/* XXX CLEANUP */
if (node->type == T_SUBREQ) {
struct pesi *pesi;
/* transfer all to local variables */
struct req *subreq = node->subreq.req;
struct boc *boc = node->subreq.boc;
struct objcore *objcore = node->subreq.oc;
if (node->subreq.done == 0) {
Lck_Lock(&tree->tree_lock);
if (node->subreq.done == 0)
AZ(Lck_CondWait(
&node->subreq.cond,
&tree->tree_lock, 0));
Lck_Unlock(&tree->tree_lock);
}
AN(node->subreq.done);
node->subreq.req = NULL;
node->subreq.boc = NULL;
node->subreq.oc = NULL;
/* transfer all to local variables */
struct req *subreq = node->subreq.req;
struct boc *boc = node->subreq.boc;
struct objcore *objcore = node->subreq.oc;
if ((node->subreq.oc_flags_saved & OC_F_FINAL) != 0)
objcore->flags = node->subreq.oc_flags_saved;
else
assert(objcore->flags == node->subreq.oc_flags_saved);
node->subreq.req = NULL;
node->subreq.boc = NULL;
node->subreq.oc = NULL;
CHECK_OBJ_NOTNULL(subreq, REQ_MAGIC);
CHECK_OBJ_ORNULL(boc, BOC_MAGIC);
if ((node->subreq.oc_flags_saved & OC_F_FINAL) != 0)
objcore->flags = node->subreq.oc_flags_saved;
else
assert(objcore->flags == node->subreq.oc_flags_saved);
AZ(subreq->objcore);
subreq->objcore = objcore;
objcore = NULL;
CHECK_OBJ_NOTNULL(subreq->objcore, OBJCORE_MAGIC);
CHECK_OBJ_NOTNULL(subreq, REQ_MAGIC);
CHECK_OBJ_ORNULL(boc, BOC_MAGIC);
// XXX NEED Bytes to topreq
// VDP_close(node->subreq.req);
AZ(subreq->objcore);
subreq->objcore = objcore;
objcore = NULL;
CHECK_OBJ_NOTNULL(subreq->objcore, OBJCORE_MAGIC);
/* wrk from topreq ! */
subreq->wrk = req->wrk;
// XXX HACK
node->subreq.topreq = req;
// XXX NEED Bytes to topreq
// VDP_close(node->subreq.req);
VSLdbg(subreq, "DeliverObj from top");
VDP_DeliverObj(subreq);
/* wrk from topreq ! */
subreq->wrk = req->wrk;
// XXX HACK
node->subreq.topreq = req;
/* bottom of cnt_transmit() */
HSH_Cancel(req->wrk, subreq->objcore, boc);
VSLdbg(subreq, "DeliverObj from top");
VDP_DeliverObj(subreq);
if (boc != NULL)
HSH_DerefBoc(req->wrk, subreq->objcore);
/* bottom of cnt_transmit() */
HSH_Cancel(req->wrk, subreq->objcore, boc);
(void)HSH_DerefObjCore(req->wrk, &subreq->objcore, 0);
if (boc != NULL)
HSH_DerefBoc(req->wrk, subreq->objcore);
TAKE_OBJ_NOTNULL(pesi, &subreq->transport_priv, PESI_MAGIC);
pesi_destroy(&pesi);
req_fini(subreq, req->wrk);
(void)HSH_DerefObjCore(req->wrk, &subreq->objcore, 0);
// tree->retval is sort-of inside out here, but
// that detail really should not matter
return (tree->retval);
}
TAKE_OBJ_NOTNULL(pesi, &subreq->transport_priv, PESI_MAGIC);
pesi_destroy(&pesi);
req_fini(subreq, req->wrk);
node = next;
continue;
}
/*
 * Deliver a T_DATA node: push its bytes down the top request's VDP chain.
 *
 * Called from bytes_push_worklist() while NOT holding the tree lock.
 *
 * next is the following node on the unpend worklist (or NULL for the
 * last node); it only determines whether we flush.
 *
 * Returns the VDP_bytes() status (non-zero upon any error).
 */
static int
push_data(struct req *req, struct bytes_tree *tree,
    struct node *node, const struct node *next)
{
	uint8_t tailbuf[GZIP_TAILBUF_SZ];
	const void *p;
	enum vdp_action act;

	assert(node->type == T_DATA);

	/* data may live either directly on the node or in a storage seg */
	p = node->data.ptr;
	if (p == NULL && node->data.len > 0) {
		CHECK_OBJ_NOTNULL(node->data.st, STORAGE_MAGIC);
		p = node->data.st->ptr;
	}

	/*
	 * we ignore any act from the node and push all we got
	 * with VDP_FLUSH at the last node
	 */
	act = (next == NULL) ? VDP_FLUSH : VDP_NULL;

	/* sentinel: generate and send the gzip trailer in place of data */
	if (p == gzip_fini) {
		VSLdbgv(req, "bytes_unpend: gzip_fini "
		    "crc=%x l_crc=%zd", tree->bad_crc, tree->bad_l_crc);
		// XXX unlocked ok?
		gzip_tailbuf(tailbuf, tree);
		node->data.ptr = p = tailbuf;
		node->data.len = GZIP_TAILBUF_SZ;
		node->data.act = VDP_FLUSH;
		VSLdbgv(req, "bytes_unpend: "
		    "tailbuf=\\%o\\%o\\%o\\%o\\%o\\%o\\%o\\%o\\%o"
		    "\\%o\\%o\\%o\\%o="
		    "%x %x %x %x %x %x %x %x %x %x %x %x %x",
		    tailbuf[0], tailbuf[1], tailbuf[2], tailbuf[3],
		    tailbuf[4], tailbuf[5], tailbuf[6], tailbuf[7],
		    tailbuf[8], tailbuf[9], tailbuf[10],
		    tailbuf[11], tailbuf[12],
		    tailbuf[0], tailbuf[1], tailbuf[2], tailbuf[3],
		    tailbuf[4], tailbuf[5], tailbuf[6], tailbuf[7],
		    tailbuf[8], tailbuf[9], tailbuf[10],
		    tailbuf[11], tailbuf[12]);
		act = VDP_FLUSH;
	}

	/* pesi_buf is bypassed while we run. */
	return (VDP_bytes(req, act, p, node->data.len));
}
/*
* deliver unpended notes while not holding the tree lock
*
* returns VDP_bytes upon any error
*/
static int
bytes_push_worklist(struct req *req, struct bytes_tree *tree,
struct node_head *work)
{
struct node *node, *next;
int retval = 0;
node = VSTAILQ_FIRST(work);
AN(node);
while (node) {
next = VSTAILQ_NEXT(node, unpend);
// not using assert_node: it needs locking
assert(node->state == ST_UNPENDING);
switch (node->type) {
case T_SUBREQ:
retval = push_subreq(req, tree, node, next);
break;
case T_DATA:
retval = push_data(req, tree, node, next);
break;
default:
INCOMPL();
}
if (retval)
break;
node = next;
}
......@@ -1299,6 +1326,10 @@ bytes_push_worklist(struct req *req, struct bytes_tree *tree,
(void) VDP_bytes(req, VDP_FLUSH, NULL, 0);
VSTAILQ_FOREACH(node, work, unpend) {
// XXX UNIFY
if (node->type != T_DATA)
continue;
// assert (node->type == T_DATA);
if (node->data.st == NULL)
continue;
stv_transient->sml_free(node->data.st);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment