Commit bba038cb authored by Nils Goroll's avatar Nils Goroll

register an owner with ST_OPEN nodes

This is the first step towards bringing back safe front pushes: For a
thread in vdp_push bytes, it is safe to push upwards if it owns all
ST_OPEN T_NEXUS nodes upwards or if they are ST_CLOSED or ST_DELIVERED.

more to come in later commits
parent 3534fee1
......@@ -123,8 +123,8 @@ enum n_type {
* T_NEXUS only
*
* ST_OPEN: may receive pushes creating children
* unpending may deliver in parallel ("front delivery")
* any changes below loced only
* owning thread may run front delivery
* any changes below locked only
*
* T_NEXUS only
*
......@@ -160,6 +160,8 @@ struct node_nexus {
struct node_head children;
struct objcore *oc;
struct req *req;
const struct worker *owner; // ST_OPEN only
/* number of nodes pending under this node while state == ST_PRIVATE */
int npending_private;
/*
......@@ -726,7 +728,12 @@ assert_node(struct node *node, enum check_state check)
assert(node->type != T_NEXUS);
break;
case ST_OPEN:
AN(node->nexus.owner);
AZ(node->nexus.npending_private);
assert_nexus(node, CHK_ORDER);
break;
case ST_CLOSED:
AZ(node->nexus.owner);
AZ(node->nexus.npending_private);
assert_nexus(node, CHK_ORDER);
break;
......@@ -828,32 +835,27 @@ set_delivered(struct bytes_tree *tree, struct node *node)
/* ST_PRIVATE -> ST_OPEN transition
*
* make child notes available to unpending and require locking
* for all other operations
* child nodes may get unpended by owner
* any access locked only
*/
static void
set_open(struct bytes_tree *tree, struct node *node)
set_open(struct bytes_tree *tree, struct node *node, const struct worker *wrk)
{
CHECK_OBJ_NOTNULL(tree, BYTES_TREE_MAGIC);
CHECK_OBJ_NOTNULL(node, NODE_MAGIC);
Lck_AssertHeld(&tree->tree_lock);
VSL0dbg("set_open node=%p", node);
VSL0dbg("set_open node=%p wrk=%p", node, wrk);
assert(node->state == ST_PRIVATE);
AZ(node->nexus.owner);
node->state = ST_OPEN;
tree->npending += node->nexus.npending_private;
node->nexus.npending_private = 0;
/* unpending may wait for this node to open */
if (node->type == T_NEXUS &&
(node->parent == NULL ||
node->parent->state != ST_PRIVATE))
AZ(pthread_cond_signal(&tree->cond));
node->nexus.owner = wrk;
}
/* (ST_PRIVATE | ST_OPEN) -> CLOSE transition
......@@ -865,7 +867,7 @@ set_open(struct bytes_tree *tree, struct node *node)
*/
static void
set_closed(struct bytes_tree *tree, struct node *node)
set_closed(struct bytes_tree *tree, struct node *node, const struct worker *wrk)
{
struct node *child;
......@@ -874,15 +876,16 @@ set_closed(struct bytes_tree *tree, struct node *node)
Lck_AssertHeld(&tree->tree_lock);
if (node->state == ST_PRIVATE)
set_open(tree, node);
set_open(tree, node, wrk);
VSL0dbg("set_closed node=%p", node);
VSL0dbg("set_closed node=%p wrk=%p", node, wrk);
assert(node->state == ST_OPEN);
assert(node->nexus.owner == wrk);
assert_node(node, CHK_ORDER);
node->state = ST_CLOSED;
node->nexus.owner = NULL;
/* unpending may wait for this node to close */
AZ(pthread_cond_signal(&tree->cond));
......@@ -1014,7 +1017,7 @@ ved_task(struct worker *wrk, void *priv)
pesi = NULL;
Lck_Lock(&pesi_tree->tree->tree_lock);
set_closed(pesi_tree->tree, node);
set_closed(pesi_tree->tree, node, wrk);
Lck_Unlock(&pesi_tree->tree->tree_lock);
AZ(pesi);
task_fini(pesi_tree, pesi);
......@@ -1997,7 +2000,7 @@ pesi_buf_bytes(struct req *req, enum vdp_action act, void **priv,
if (parent->state == ST_PRIVATE && (refok == 0 || req->objcore->boc)) {
Lck_Lock(&tree->tree_lock);
set_open(tree, parent);
set_open(tree, parent, req->wrk);
Lck_Unlock(&tree->tree_lock);
}
#endif
......@@ -2461,7 +2464,7 @@ vdp_pesi_bytes(struct req *req, enum vdp_action act, void **priv,
* fini time, the V1D is already closed
*/
Lck_Lock(&tree->tree_lock);
set_closed(tree, node);
set_closed(tree, node, req->wrk);
assert_vdp_next_not(req, &VDP_pesi);
vped_close_vdp(req, 1, &VDP_pesi_buf);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment