fellow_storage: Handle inconsistency of ObjWaitExtend() vs boc->state

No matter when we read the boc state, before or after ObjWaitExtend(),
we might read an inconsistent, outdated or too fresh, value with respect
to the available bytes returned by ObjWaitExtend(), because our read
does not happen from the critical region of ObjWaitExtend().

We work around this issue with two changes:

- in fellow_stream_wait(), call ObjWaitExtend again if the state as
  seen from outside ObjWaitExtend() changes.

- in fellow_stream_f(), use ObjWaitState() to ensure that the busy
  object is done when we are called with OBJ_ITER_END, which means
  that we are called on the last fcs.

Should fix #44 - please open if not
parent b8923c29
...@@ -715,24 +715,36 @@ struct fellow_stream { ...@@ -715,24 +715,36 @@ struct fellow_stream {
ssize_t checkpoint; // up to this segment ssize_t checkpoint; // up to this segment
}; };
/*
* NOTE on state shenanigans:
*
* ObjWaitExtend does not provide a consistent read of available bytes and
* state.
*/
static enum boc_state_e static enum boc_state_e
fellow_stream_wait(struct fellow_stream *fs) fellow_stream_wait(struct fellow_stream *fs)
{ {
enum boc_state_e state; enum boc_state_e state1, state2;
volatile enum boc_state_e *statep =
(volatile enum boc_state_e *)&fs->boc->state;
uint64_t l; uint64_t l;
CHECK_OBJ_NOTNULL(fs, FELLOW_STREAM_MAGIC); CHECK_OBJ_NOTNULL(fs, FELLOW_STREAM_MAGIC);
l = (uint64_t)fs->available;
state2 = *statep;
do { do {
l = ObjWaitExtend(fs->wrk, fs->oc, (uint64_t)fs->available); state1 = state2;
state = fs->boc->state; l = ObjWaitExtend(fs->wrk, fs->oc, l);
state2 = *statep;
} while (state1 != state2);
if (state == BOS_FINISHED || state == BOS_FAILED) fs->state = state2;
break;
assert(state == BOS_STREAM);
} while (l == (uint64_t)fs->available);
fs->state = state;
fs->available = (ssize_t)l; fs->available = (ssize_t)l;
return (state);
return (state2);
} }
/* called on each (potentially busy) segment's data */ /* called on each (potentially busy) segment's data */
...@@ -757,9 +769,9 @@ fellow_stream_f(void *priv, unsigned flush, const void *ptr, ssize_t len) ...@@ -757,9 +769,9 @@ fellow_stream_f(void *priv, unsigned flush, const void *ptr, ssize_t len)
return (fs->func(fs->priv, flush, ptr, len)); return (fs->func(fs->priv, flush, ptr, len));
if (flush & OBJ_ITER_END) { if (flush & OBJ_ITER_END) {
for (r = 0; fs->state != BOS_FINISHED && r < 2; r++) ObjWaitState(fs->oc, BOS_FINISHED);
if (fellow_stream_wait(fs) == BOS_FAILED) if (fellow_stream_wait(fs) == BOS_FAILED)
return (-1); return (-1);
assert(fs->state == BOS_FINISHED); assert(fs->state == BOS_FINISHED);
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment