Rework logbuffer flushing

To avoid having to wait for a previous flush to finish (in most cases),
we now allocate the flush finish state dynamically and complete it
asynchronously.

For ordinary flushes, we can now start the next flush while a previous
one is still in flight; flush finishes are ordered in a list to preserve
log consistency.
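
The core of the scheme, reduced to a minimal C sketch with invented names
(ff_state, ff_list, flush_submit, flush_finish, flush_drain) rather than the
actual fellow code: a flush allocates its finish state and links it onto a
list under a mutex, the asynchronous finish later unlinks and frees it, and
close drains the list via a counter and a condition variable.

/*
 * Illustrative sketch only; all names are invented and do not exist in
 * the fellow sources.
 */
#include <assert.h>
#include <pthread.h>
#include <stdlib.h>
#include <sys/queue.h>

#define AZ(x)	assert((x) == 0)

struct ff_state {
	LIST_ENTRY(ff_state)	list;
};

struct ff_list {
	/* caller initializes mtx and cond and LIST_INIT()s head */
	pthread_mutex_t		mtx;
	pthread_cond_t		cond;
	LIST_HEAD(, ff_state)	head;
	unsigned		nff;	/* flush finishes in flight */
};

/* start a flush: allocate and queue the finish state, never blocking */
static struct ff_state *
flush_submit(struct ff_list *fl)
{
	struct ff_state *ff = calloc(1, sizeof *ff);

	assert(ff != NULL);
	AZ(pthread_mutex_lock(&fl->mtx));
	LIST_INSERT_HEAD(&fl->head, ff, list);
	fl->nff++;
	AZ(pthread_mutex_unlock(&fl->mtx));
	return (ff);
}

/* asynchronous finish completes: unlink, wake a draining closer, free */
static void
flush_finish(struct ff_list *fl, struct ff_state *ff)
{
	AZ(pthread_mutex_lock(&fl->mtx));
	LIST_REMOVE(ff, list);
	assert(fl->nff > 0);
	if (--fl->nff == 0)
		AZ(pthread_cond_broadcast(&fl->cond));
	AZ(pthread_mutex_unlock(&fl->mtx));
	free(ff);
}

/* close: wait until no flush finish is in flight any more */
static void
flush_drain(struct ff_list *fl)
{
	AZ(pthread_mutex_lock(&fl->mtx));
	while (fl->nff > 0)
		AZ(pthread_cond_wait(&fl->cond, &fl->mtx));
	AZ(pthread_mutex_unlock(&fl->mtx));
}

The real change additionally orders finishes and lets waiters take a
reference on a specific finish state (ff->ref), which the sketch leaves out.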
@@ -639,10 +639,11 @@ struct fellow_fd {
struct stvfe_tune *tune;
struct VSC_fellow *stats;
// open notification, reused by logbuffer_flush_finish
// open notification & logbuffer_flush_finish
pthread_mutex_t phase_mtx;
pthread_cond_t phase_cond;
VLIST_HEAD(,fellow_logbuffer_ff) ffhead;
unsigned nff;
fellow_task_run_t *taskrun;
@@ -1934,14 +1935,10 @@ logbuffer_wait_flush_fini(const struct fellow_logbuffer *lbuf)
pthread_mutex_t *phase_mtx;
pthread_cond_t *phase_cond;
if (lbuf->ff == NULL)
return;
// logbuffer zeroed?
phase_mtx = lbuf->phase_mtx;
phase_cond = lbuf->phase_cond;
if (phase_mtx == NULL || phase_cond == NULL)
return;
AN(phase_mtx);
AN(phase_cond);
AZ(pthread_mutex_lock(phase_mtx));
ff = lbuf->ff;
@@ -1953,6 +1950,7 @@ logbuffer_wait_flush_fini(const struct fellow_logbuffer *lbuf)
while (ff->state < FF_DONE)
AZ(pthread_cond_wait(phase_cond, phase_mtx));
AN(ff->ref);
assert(lbuf->ff != ff);
if (--ff->ref == 0)
AZ(pthread_cond_broadcast(phase_cond));
unlock:
@@ -2590,6 +2588,7 @@ logbuffer_flush(struct fellow_fd *ffd,
if (doclose) {
AN(logbuffer_can(lbuf, LBUF_CAN_REF));
logbuffer_wait_flush_fini(lbuf);
AZ(lbuf->ff);
}
// keep the options to flush at all
@@ -2607,6 +2606,8 @@ logbuffer_flush(struct fellow_fd *ffd,
AZ(lbuf->head.off);
if (! cap(can, LBUF_CAN_REF) || lbuf->ff != NULL)
return;
logbuffer_wait_flush_fini(lbuf);
AZ(lbuf->ff);
logbuffer_ref(ffd, lbuf);
logbuffer_flush_finish(ffd, lbuf, doclose, can);
return;
@@ -2619,6 +2620,8 @@ logbuffer_flush(struct fellow_fd *ffd,
if (lbuf->head.block == NULL) {
if (! cap(can, LBUF_CAN_REF) || lbuf->ff != NULL)
return;
logbuffer_wait_flush_fini(lbuf);
AZ(lbuf->ff);
// the active block is the head block
AZ(lbuf->n);
@@ -2640,6 +2643,8 @@ logbuffer_flush(struct fellow_fd *ffd,
(void) fellow_io_log_wait_completions(
lbuf->membuddy,
lbuf->fdil.ioctx, 1);
if (doclose)
AZ(lbuf->active.block);
FDBG(D_LOG_FLUSH, "%p ref active single %zu / %zu",
lbuf, ffd->log_info.off.logblk, ffd->log_info.off.pendblk);
@@ -2761,8 +2766,6 @@ logbuffer_flush(struct fellow_fd *ffd,
}
}
logbuffer_need_ioctx(ffd, lbuf);
/* On submitting early:
*
* As long as we calculate the hashes in the thread calling
@@ -2804,6 +2807,8 @@ logbuffer_flush(struct fellow_fd *ffd,
logblk->entry[logblk->nentries - 1].seq);
}
logbuffer_need_ioctx(ffd, lbuf);
FDBG(D_LOG_FLUSH, "%p to submit %u", lbuf, n);
AN(lbuf->regions_to_free);
@@ -2897,6 +2902,8 @@ logbuffer_flush(struct fellow_fd *ffd,
v += fellow_io_log_wait_completions(
lbuf->membuddy,
lbuf->fdil.ioctx, 1);
if (doclose)
AZ(lbuf->active.block);
// handled at top
assert(lbuf->active.block != lbuf->head.block);
@@ -2946,6 +2953,7 @@ logbuffer_flush_finish(struct fellow_fd *ffd,
AZ(pthread_mutex_lock(lbuf->phase_mtx));
needthread = VLIST_EMPTY(&ffd->ffhead);
VLIST_INSERT_HEAD(&ffd->ffhead, ff, list);
ffd->nff++;
lbuf->ff = ff;
ff->lbuf_ff = &lbuf->ff;
AZ(pthread_mutex_unlock(lbuf->phase_mtx));
@@ -2974,6 +2982,7 @@ logbuffer_flush_finish_work_one(struct fellow_logbuffer_ff *ff)
{
struct fellow_logbuffer_ff *next;
struct buddy_ptr_extent alloc;
struct fellow_fd *ffd;
pthread_mutex_t *phase_mtx;
pthread_cond_t *phase_cond;
unsigned n, u, v;
@@ -2981,12 +2990,14 @@ logbuffer_flush_finish_work_one(struct fellow_logbuffer_ff *ff)
vtim_mono dt, t2, t3;
#endif
CHECK_OBJ_NOTNULL(ff, FELLOW_LOGBUFFER_FF_MAGIC);
ffd = ff->ffd;
CHECK_OBJ_NOTNULL(ffd, FELLOW_FD_MAGIC);
alloc.ptr = ff;
alloc.size = buddy_rndup(ff->ffd->membuddy, sizeof *ff);
alloc.size = buddy_rndup(ffd->membuddy, sizeof *ff);
phase_mtx = &ff->ffd->phase_mtx;
phase_cond = &ff->ffd->phase_cond;
phase_mtx = &ffd->phase_mtx;
phase_cond = &ffd->phase_cond;
FF_TRANSITION(ff, FF_SCHEDULED, FF_WAIT_OUTSTANDING);
@@ -3000,7 +3011,7 @@ logbuffer_flush_finish_work_one(struct fellow_logbuffer_ff *ff)
do {
FDBG(D_LOG_FLUSH, "wait %u/%u", v, n);
u = fellow_io_log_wait_completions(
ff->ffd->membuddy,
ffd->membuddy,
ff->fdil.ioctx, UINT_MAX);
v += u;
} while (u);
@@ -3016,7 +3027,7 @@ logbuffer_flush_finish_work_one(struct fellow_logbuffer_ff *ff)
FDBG(D_LOG_FLUSH, "finish head blk %zu", ff->head.off);
// NULL ioctx is sync
XXXAN(fellow_io_log_submit(ff->ffd, NULL,
XXXAN(fellow_io_log_submit(ffd, NULL,
FAIOT_DLB_FINISH, &ff->head));
}
AZ(ff->head.block);
@@ -3028,25 +3039,25 @@ logbuffer_flush_finish_work_one(struct fellow_logbuffer_ff *ff)
t2 = VTIM_mono();
#endif
FDBG(D_LOG_FLUSH, "ref active %zu / %zu", ff->ffd->log_info.off.logblk,
ff->ffd->log_info.off.pendblk);
FDBG(D_LOG_FLUSH, "ref active %zu / %zu", ffd->log_info.off.logblk,
ffd->log_info.off.pendblk);
FF_TRANSITION(ff, FF_HEAD, FF_HDR);
AZ(fellow_io_write_hdr(ff->ffd));
AZ(fellow_io_write_hdr(ffd));
FF_TRANSITION(ff, FF_HDR, FF_FREE);
if (ff->regions_to_free) {
logbuffer_flush_finish_need_ioctx(ff);
regionlist_discard(ff->ffd, ff->fdil.ioctx,
regionlist_discard(ffd, ff->fdil.ioctx,
&ff->regions_to_free);
}
AZ(ff->regions_to_free);
if (ff->fdil.ioctx) {
(void) fellow_io_log_wait_completions(
ff->ffd->membuddy,
ffd->membuddy,
ff->fdil.ioctx, UINT_MAX);
fellow_fd_ioctx_return(&ff->fdil);
}
@@ -3066,10 +3077,8 @@ logbuffer_flush_finish_work_one(struct fellow_logbuffer_ff *ff)
(t2 - ff->t[1]) * 100 / dt,
(t3 - t2) * 100 / dt);
FF_TRANSITION(ff, FF_FREE, FF_DONE);
AZ(pthread_mutex_lock(phase_mtx));
assert(ff->state == FF_DONE);
FF_TRANSITION(ff, FF_FREE, FF_DONE);
AN(ff->lbuf_ff);
/* there might be a later flush */
@@ -3077,29 +3086,27 @@ logbuffer_flush_finish_work_one(struct fellow_logbuffer_ff *ff)
*ff->lbuf_ff = NULL;
ff->lbuf_ff = NULL;
AZ(VLIST_NEXT(ff, list));
next = VLIST_PREV(ff, &ffd->ffhead, fellow_logbuffer_ff, list);
VLIST_REMOVE(ff, list);
while (ff->ref > 0) {
// sync with logbuffer_wait_flush_fini()
AZ(pthread_cond_broadcast(phase_cond));
AZ(pthread_cond_wait(phase_cond, phase_mtx));
}
AZ(ff->ref);
AN(ffd->nff--);
AZ(VLIST_NEXT(ff, list));
next = VLIST_PREV(ff, &ff->ffd->ffhead, fellow_logbuffer_ff, list);
VLIST_REMOVE(ff, list);
if (UNLIKELY(ff->ffd->phase == FP_FINI && next == NULL)) {
/*
* sync with fellow_log_close(): return mem under mtx
* to avoid race with buddy destroy
*/
buddy_return1_ptr_extent(ff->ffd->membuddy, &alloc);
AZ(pthread_cond_broadcast(phase_cond));
AZ(pthread_mutex_unlock(phase_mtx));
return (NULL);
// return mem under mtx to avoid race with buddy destroy
if (UNLIKELY(ffd->phase == FP_FINI)) {
buddy_return1_ptr_extent(ffd->membuddy, &alloc);
if (ffd->nff == 0)
AZ(pthread_cond_broadcast(phase_cond));
}
AZ(pthread_mutex_unlock(phase_mtx));
buddy_return1_ptr_extent(ff->ffd->membuddy, &alloc);
if (LIKELY(alloc.ptr != NULL))
buddy_return1_ptr_extent(ffd->membuddy, &alloc);
return (next);
}
@@ -5099,6 +5106,7 @@ fellow_logs_iter(const struct flics *flics, struct flivs *flivs,
*
* once going backwards past the last block, the next_off must
* always be set, or we are dealing with an old active block
* (head on second flush)
*/
if (direction >= 0 && logblk->next_off == 0)
direction = -1;
@@ -6371,7 +6379,7 @@ fellow_log_close(struct fellow_fd **ffdp)
fellow_logs_close(ffd);
AZ(pthread_mutex_lock(&ffd->phase_mtx));
while (! VLIST_EMPTY(&ffd->ffhead))
while (ffd->nff > 0)
AZ(pthread_cond_wait(&ffd->phase_cond, &ffd->phase_mtx));
AZ(pthread_mutex_unlock(&ffd->phase_mtx));