Async allocation for logbuffer flush finish

parent bf9c6d6c
// from linux
#define LIKELY(x) __builtin_expect(!!(x), 1)
#define likely(x) __builtin_expect(!!(x), 1)
//#define unlikely(x) __builtin_expect(!!(x), 0)
#define UNLIKELY(x) __builtin_expect(!!(x), 0)
// not really compiler stuff, but compatibility with master for varnish-cache 7.3
// taken from varnish-cache
......
......@@ -277,13 +277,34 @@ enum ff_state_e {
FF_DONE
};
// flush finish
/* flush finish
*
 * flush finishes are organized as a *reverse* VLIST with the HEAD
* as the most recent element.
*
* HEAD IS GLOBAL XXX
*
 * e.g. for flushes 1, 2, 3 in that order:
*
* head
* V
* |1| -> |2| -> |3|
* | | <- | | <- | |
*
* lbuf_ff is where the logbuffer has a pointer to the _ff.
*
* lbuf finish code thus needs to wait for the flush finish to
* complete before returning memory
*/
struct fellow_logbuffer_ff {
unsigned magic;
#define FELLOW_LOGBUFFER_FF_MAGIC 0xcb1341d3
unsigned inuse;
unsigned can;
unsigned ref; // for wait_flush_fini
enum ff_state_e state;
VLIST_ENTRY(fellow_logbuffer_ff) list;
struct fellow_logbuffer_ff **lbuf_ff;
#ifdef DEBUG
vtim_mono t[2];
#endif
......@@ -387,6 +408,8 @@ assert_lbuf_dsk_resv(void)
BUDDY_REQS(lbuf_dskrsv_s, LBUF_DSK_RSV_REQS);
// this is the memory request for dskrsv
BUDDY_REQS(lbuf_dskrsv_mem_s, 1);
// memory for flush_finish
BUDDY_REQS(lbuf_ff_mem_s, 1);
struct fellow_logbuffer {
unsigned magic;
......@@ -402,8 +425,11 @@ struct fellow_logbuffer {
enum lbuf_state_e state;
struct buddy_ptr_page alloc; // arr allocation
enum f_phase_e *phase;
pthread_mutex_t *phase_mtx;
// protected by phase_mtx
pthread_cond_t *phase_cond;
buddy_t *membuddy;
buddy_t *dskbuddy;
struct fellow_fd_ioctx_lease fdil;
......@@ -421,7 +447,8 @@ struct fellow_logbuffer {
off_t active_off;
/* last written offset for coordination with _append() */
off_t tail_off;
struct fellow_logbuffer_ff ff;
struct fellow_logbuffer_ff *ff;
struct lbuf_ff_mem_s ff_mem;
struct lbuf_dskrsv_mem_s dskreqs_mem;
struct lbuf_dskrsv_s (*dskreqs)[2];
unsigned active_reqs;
......@@ -615,6 +642,7 @@ struct fellow_fd {
// open notification, reused by logbuffer_flush_finish
pthread_mutex_t phase_mtx;
pthread_cond_t phase_cond;
VLIST_HEAD(,fellow_logbuffer_ff) ffhead;
fellow_task_run_t *taskrun;
......@@ -682,7 +710,8 @@ fellow_fd_ioctx_get(struct fellow_fd *ffd, struct fellow_fd_ioctx_lease *fdil)
AZ(pthread_mutex_lock(&fdio->mtx));
f = bitf_ffs(fdio->bitf);
if (f == 0) {
while (f == 0) {
/* happens when a previous flush is still in progress */
/* this should never happen, but if it does,
* we do not want it to be fatal
*/
......@@ -754,7 +783,7 @@ fellow_wait_open(struct fellow_fd *ffd)
return;
ffd->diag("notice: operation waiting for FP_OPEN\n");
AZ(pthread_mutex_lock(&ffd->phase_mtx));
if (ffd->phase == FP_INIT)
while (ffd->phase == FP_INIT)
AZ(pthread_cond_wait(&ffd->phase_cond, &ffd->phase_mtx));
AZ(pthread_mutex_unlock(&ffd->phase_mtx));
assert(ffd->phase == FP_OPEN);
......@@ -1901,24 +1930,60 @@ logbuffer_assert_empty(const struct fellow_logbuffer *lbuf)
/*
 * Wait until the flush-finish currently attached to this logbuffer
 * (lbuf->ff) has completed (reached FF_DONE).
 *
 * Protocol: under the shared phase mutex, the waiter takes a reference
 * on the ff, sleeps on the phase condvar until the worker marks the ff
 * done, then drops the reference.  The worker in turn waits for
 * ff->ref to drop to zero before returning the ff memory, so holding a
 * reference here keeps the object alive while we sleep.
 */
static void
logbuffer_wait_flush_fini(const struct fellow_logbuffer *lbuf)
{
struct fellow_logbuffer_ff *ff;
pthread_mutex_t *phase_mtx;
pthread_cond_t *phase_cond;
/* XXX we might race logbuffer_fini */
/* NOTE(review): the two adjacent conditionals below look like the
 * old/new lines of a stripped diff -- confirm against VCS */
if (! lbuf->ff.inuse)
if (lbuf->ff == NULL)
return;
// logbuffer zeroed?
phase_mtx = lbuf->phase_mtx;
phase_cond = lbuf->phase_cond;
if (phase_mtx == NULL || phase_cond == NULL)
return;
AZ(pthread_mutex_lock(phase_mtx));
/* re-check under the lock: the worker may have detached the ff */
while (lbuf->ff.inuse)
ff = lbuf->ff;
if (ff == NULL)
goto unlock;
CHECK_OBJ(ff, FELLOW_LOGBUFFER_FF_MAGIC);
/* take a reference so the worker does not free ff while we sleep */
ff->ref++;
while (ff->state < FF_DONE)
AZ(pthread_cond_wait(phase_cond, phase_mtx));
AN(ff->ref);
/* last waiter out wakes the worker blocked on ff->ref in
 * logbuffer_flush_finish_work_one() */
if (--ff->ref == 0)
AZ(pthread_cond_broadcast(phase_cond));
unlock:
AZ(pthread_mutex_unlock(phase_mtx));
}
/*
 * Kick off an asynchronous membuddy allocation for a future
 * struct fellow_logbuffer_ff, so logbuffer_flush_finish() does not
 * need to allocate synchronously on the flush path.
 */
static void
logbuffer_alloc_ff_mem(struct fellow_logbuffer *lbuf)
{
	struct buddy_reqs *ffreqs = &lbuf->ff_mem.reqs;

	BUDDY_REQS_PRI(ffreqs, FEP_MEM_LOG);
	/* a single extent, sized for the ff object */
	AN(buddy_req_extent(ffreqs, sizeof *lbuf->ff, 0));
	/* fire and forget; the result is collected later via
	 * buddy_alloc_async_wait() */
	(void) buddy_alloc_async(ffreqs);
}
/*
 * Set up the logbuffer's flush-finish memory request on the membuddy
 * and immediately start the first async allocation, so memory is
 * (likely) already available by the time logbuffer_flush_finish()
 * needs it.
 */
static void
logbuffer_init_ff_mem(struct fellow_logbuffer *lbuf)
{
BUDDY_REQS_INIT(&lbuf->ff_mem, lbuf->membuddy);
logbuffer_alloc_ff_mem(lbuf);
}
static void
logbuffer_fini_ff_mem(struct fellow_logbuffer *lbuf)
{
buddy_alloc_wait_done(&lbuf->ff_mem.reqs);
lbuf->ff_mem.reqs.magic = 0;
}
static inline void
logbuffer_fini_dskreqs_mem(struct fellow_logbuffer *lbuf)
{
......@@ -1961,6 +2026,7 @@ logbuffer_fini(struct fellow_logbuffer *lbuf)
AZ(lbuf->regions_to_free);
logbuffer_fini_dskreqs(lbuf);
logbuffer_fini_ff_mem(lbuf);
if (lbuf->fdil.ioctx != NULL) {
(void)fellow_io_log_wait_completions(
......@@ -2096,6 +2162,8 @@ logbuffer_init(struct fellow_fd *ffd, struct fellow_logbuffer *lbuf,
assert(b <= UINT8_MAX);
logbuffer_grow(lbuf, (uint8_t)b);
logbuffer_init_ff_mem(lbuf);
lbuf->state = LBUF_INIT;
lbuf->phase = &ffd->phase;
lbuf->phase_mtx = &ffd->phase_mtx;
......@@ -2107,8 +2175,10 @@ static inline void
/*
 * Move a logbuffer: *to takes over the complete state of *from.
 *
 * Pending async memory requests owned by *from are finalized first
 * (presumably because TAKE() would otherwise move request state that
 * still references *from -- confirm), and any outstanding flush-finish
 * is waited for.  After the move, the async ff allocation is re-armed
 * for *to.
 */
logbuffer_take(struct fellow_logbuffer *to, struct fellow_logbuffer *from)
{
/* settle from's pending async allocations before moving state */
logbuffer_fini_dskreqs_mem(from);
logbuffer_fini_ff_mem(from);
/* do not move while a flush-finish still points at from->ff */
logbuffer_wait_flush_fini(from);
TAKE(*to, *from);
/* to needs its own pre-allocated ff memory request */
logbuffer_init_ff_mem(to);
}
static struct fellow_disk_log_block *
......@@ -2517,8 +2587,10 @@ logbuffer_flush(struct fellow_fd *ffd,
if (logbuffer_can(lbuf, LBUF_CAN_LOGREG))
AN(lbuf->logreg);
if (doclose)
if (doclose) {
AN(logbuffer_can(lbuf, LBUF_CAN_REF));
logbuffer_wait_flush_fini(lbuf);
}
// keep the options to flush at all
can |= (LBUF_CAN_FLUSH | LBUF_CAN_LOGREG);
......@@ -2528,29 +2600,12 @@ logbuffer_flush(struct fellow_fd *ffd,
lbuf->flush_t0 = VTIM_mono();
#endif
/*
* no use to flush if previous flush has not finished.
*/
if (! doclose && lbuf->ff.inuse)
return;
/*
* if closing, wait for flush to finish
*/
if (doclose) {
AZ(pthread_mutex_lock(&ffd->phase_mtx));
while (lbuf->ff.inuse)
AZ(pthread_cond_wait(&ffd->phase_cond,
&ffd->phase_mtx));
AZ(pthread_mutex_unlock(&ffd->phase_mtx));
}
if (lbuf->active.block == NULL) {
AZ(lbuf->n);
AZ(lbuf->active.off);
AZ(lbuf->head.block);
AZ(lbuf->head.off);
if (! cap(can, LBUF_CAN_REF))
if (! cap(can, LBUF_CAN_REF) || lbuf->ff != NULL)
return;
logbuffer_ref(ffd, lbuf);
logbuffer_flush_finish(ffd, lbuf, doclose, can);
......@@ -2562,7 +2617,7 @@ logbuffer_flush(struct fellow_fd *ffd,
}
if (lbuf->head.block == NULL) {
if (! cap(can, LBUF_CAN_REF))
if (! cap(can, LBUF_CAN_REF) || lbuf->ff != NULL)
return;
// the active block is the head block
......@@ -2861,14 +2916,23 @@ logbuffer_flush_finish(struct fellow_fd *ffd,
struct fellow_logbuffer *lbuf, unsigned doclose, unsigned can)
{
struct fellow_logbuffer_ff *ff;
struct buddy_ptr_extent alloc;
struct buddy_reqs *reqs;
int needthread;
assert(cap(can, LBUF_CAN_REF));
ff = &lbuf->ff;
AZ(ff->inuse);
AZ(ff->regions_to_free);
reqs = &lbuf->ff_mem.reqs;
AN(buddy_alloc_async_wait(reqs));
alloc = buddy_get_ptr_extent(reqs, 0);
AN(alloc.ptr);
buddy_alloc_wait_done(reqs);
logbuffer_alloc_ff_mem(lbuf);
assert(alloc.size >= sizeof *ff);
ff = alloc.ptr;
INIT_OBJ(ff, FELLOW_LOGBUFFER_FF_MAGIC);
ff->inuse = 1;
ff->can = can;
ff->active_off = lbuf->active_off;
ff->tail_off = &lbuf->tail_off;
......@@ -2877,39 +2941,55 @@ logbuffer_flush_finish(struct fellow_fd *ffd,
TAKE(ff->head, lbuf->head);
TAKE(ff->regions_to_free, lbuf->regions_to_free);
FF_TRANSITION(ff, FF_INVAL, FF_SCHEDULED);
AZ(pthread_mutex_lock(lbuf->phase_mtx));
needthread = VLIST_EMPTY(&ffd->ffhead);
VLIST_INSERT_HEAD(&ffd->ffhead, ff, list);
lbuf->ff = ff;
ff->lbuf_ff = &lbuf->ff;
AZ(pthread_mutex_unlock(lbuf->phase_mtx));
#ifdef DEBUG
ff->t[0] = lbuf->flush_t0;
ff->t[1] = VTIM_mono();
#endif
FF_TRANSITION(ff, FF_INVAL, FF_SCHEDULED);
if (doclose) {
if (needthread)
logbuffer_flush_finish_work(NULL, ff);
else
logbuffer_wait_flush_fini(lbuf);
return;
}
if (doclose ||
// if scheduling a task fails, run in this thread
if (needthread &&
ffd->taskrun(logbuffer_flush_finish_work, ff, &ff->taskstate))
logbuffer_flush_finish_work(NULL, ff);
}
static void
logbuffer_flush_finish_work(struct worker *wrk, void *priv)
static struct fellow_logbuffer_ff *
logbuffer_flush_finish_work_one(struct fellow_logbuffer_ff *ff)
{
struct fellow_logbuffer_ff *ff;
struct fellow_logbuffer_ff *next;
struct buddy_ptr_extent alloc;
pthread_mutex_t *phase_mtx;
pthread_cond_t *phase_cond;
unsigned n, u, v;
#ifdef DEBUG
vtim_mono dt, t2, t3;
#endif
CHECK_OBJ_NOTNULL(ff, FELLOW_LOGBUFFER_FF_MAGIC);
alloc.ptr = ff;
alloc.size = buddy_rndup(ff->ffd->membuddy, sizeof *ff);
(void) wrk;
CAST_OBJ_NOTNULL(ff, priv, FELLOW_LOGBUFFER_FF_MAGIC);
phase_mtx = &ff->ffd->phase_mtx;
phase_cond = &ff->ffd->phase_cond;
FF_TRANSITION(ff, FF_SCHEDULED, FF_WAIT_OUTSTANDING);
assert(ff->inuse == 1);
ff->inuse = 2;
assert(cap(ff->can, LBUF_CAN_REF));
n = 0;
......@@ -2988,17 +3068,52 @@ logbuffer_flush_finish_work(struct worker *wrk, void *priv)
FF_TRANSITION(ff, FF_FREE, FF_DONE);
if (ff->ffd->phase != FP_OPEN) {
ff->inuse = 0;
return;
}
AZ(pthread_mutex_lock(phase_mtx));
assert(ff->state == FF_DONE);
assert(ff->inuse == 2);
ff->inuse = 0;
AZ(pthread_cond_broadcast(phase_cond));
AN(ff->lbuf_ff);
/* there might be a later flush */
if (*ff->lbuf_ff == ff)
*ff->lbuf_ff = NULL;
ff->lbuf_ff = NULL;
while (ff->ref > 0) {
// sync with logbuffer_wait_flush_fini()
AZ(pthread_cond_broadcast(phase_cond));
AZ(pthread_cond_wait(phase_cond, phase_mtx));
}
AZ(ff->ref);
AZ(VLIST_NEXT(ff, list));
next = VLIST_PREV(ff, &ff->ffd->ffhead, fellow_logbuffer_ff, list);
VLIST_REMOVE(ff, list);
if (UNLIKELY(ff->ffd->phase == FP_FINI && next == NULL)) {
/*
* sync with fellow_log_close(): return mem while still
* holding the lock to avoid a race with buddy destroy
*/
buddy_return1_ptr_extent(ff->ffd->membuddy, &alloc);
AZ(pthread_cond_broadcast(phase_cond));
AZ(pthread_mutex_unlock(phase_mtx));
return (NULL);
}
AZ(pthread_mutex_unlock(phase_mtx));
buddy_return1_ptr_extent(ff->ffd->membuddy, &alloc);
return (next);
}
/*
 * Task entry point for flush-finish processing.
 *
 * Starting from the ff handed over as priv, process flush finishes
 * until logbuffer_flush_finish_work_one() reports no successor.
 */
static void
logbuffer_flush_finish_work(struct worker *wrk, void *priv)
{
	struct fellow_logbuffer_ff *ff;

	(void) wrk;
	CAST_OBJ_NOTNULL(ff, priv, FELLOW_LOGBUFFER_FF_MAGIC);

	/* ff is non-NULL on entry (checked by the cast above) */
	while (ff != NULL)
		ff = logbuffer_flush_finish_work_one(ff);
}
// XXX when should we flush anyway?
......@@ -5674,12 +5789,6 @@ fellow_logwatcher_thread(void *priv)
fellow_logwatcher_new_log_alloc(ffd, new_log_req, &new_log_fdr);
while (ffd->watcher_running) {
fellow_fd_update_space_stats(ffd);
if (ffd->logbuf->ff.inuse) {
AZ(pthread_mutex_unlock(&ffd->logmtx));
logbuffer_wait_flush_fini(ffd->logbuf);
AZ(pthread_mutex_lock(&ffd->logmtx));
continue;
}
if (new_log_fdr.size)
need = FLW_NEED_REWRITE;
......@@ -5946,6 +6055,7 @@ fellow_log_init(const char *path, size_t wantsize, size_t objsz_hint,
ffd->taskrun = taskrun;
ffd->diag = fellow_log_diag_stderr;
ffd->stats = stats;
VLIST_INIT(&ffd->ffhead);
fl = tryflags;
do {
......@@ -6230,13 +6340,28 @@ fellow_log_open(struct fellow_fd *ffd,
fellow_logwatcher_start(ffd);
}
/* looks like _destroy might hit an outdated value */
static void
fellow_mutex_destroy(pthread_mutex_t *mtx)
{
	int r = pthread_mutex_destroy(mtx);

	if (r != 0) {
		/*
		 * EBUSY: another thread is presumably just releasing
		 * the mutex.  Take and release it ourselves; destroying
		 * it afterwards must succeed.
		 */
		assert(r == EBUSY);
		AZ(pthread_mutex_lock(mtx));
		AZ(pthread_mutex_unlock(mtx));
		AZ(pthread_mutex_destroy(mtx));
	}
}
void
fellow_log_close(struct fellow_fd **ffdp)
{
struct buddy_off_extent e;
struct fellow_fd *ffd;
buddy_t *bp;
int err;
TAKE_OBJ_NOTNULL(ffd, ffdp, FELLOW_FD_MAGIC);
......@@ -6245,19 +6370,15 @@ fellow_log_close(struct fellow_fd **ffdp)
ffd->phase = FP_FINI;
fellow_logs_close(ffd);
AZ(pthread_mutex_lock(&ffd->phase_mtx));
while (! VLIST_EMPTY(&ffd->ffhead))
AZ(pthread_cond_wait(&ffd->phase_cond, &ffd->phase_mtx));
AZ(pthread_mutex_unlock(&ffd->phase_mtx));
fellow_mutex_destroy(&ffd->phase_mtx);
AZ(pthread_cond_destroy(&ffd->phase_cond));
AZ(pthread_mutex_destroy(&ffd->phase_mtx));
/* looks like _destroy might hit an outdated
* value
*/
err = pthread_mutex_destroy(&ffd->logmtx);
if (err != 0) {
assert(err == EBUSY);
AZ(pthread_mutex_lock(&ffd->logmtx));
AZ(pthread_mutex_unlock(&ffd->logmtx));
AZ(pthread_mutex_destroy(&ffd->logmtx));
}
fellow_mutex_destroy(&ffd->logmtx);
fellow_fd_ioctx_fini(ffd->fdio);
......@@ -6340,8 +6461,6 @@ fellow_log_flush(struct fellow_fd *ffd)
logbuffer_flush(ffd, lbuf, 0, LBUF_ALL);
}
AZ(pthread_mutex_unlock(&ffd->logmtx));
if (lbuf)
logbuffer_wait_flush_fini(lbuf);
}
void
......
......@@ -20,7 +20,7 @@
-emacro(774, REPLACE) // if (...)
-emacro(835, DLE_N) // A zero has been given...
-emacro(525, DLE_N) // Negative indent
-emacro(160, VSTAILQ_LAST) // sequence ({
-emacro(160, VSTAILQ_LAST, VLIST_PREV) // sequence ({
-emacro(160, _vtake) // sequence ({
// conflict between OA and VXID() macro
-esym(123, VXID)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment