Rework the disk LRU

It was too complicated and was limited by having to wait for flushes
to finish.

Now that we can issue multiple flushes, we can simplify it
substantially.
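
In rough outline, one round of the reworked LRU thread now reads as
follows (a condensed sketch of the loop in the diff below; logging
and the failure path are omitted):

	while (r->filled < r->nr) {
		sfedsk_reserve_req(r);		/* issue async chunk requests */
		while (buddy->waiting) {	/* allocations are starving */
			sfedsk_reserve_release(r);
			nuked += sfedsk_nuke(wrk, stvfe->ffd, stv, ++n);
		}
		sfedsk_reserve_fill(r);		/* collect ready chunks */
	}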

As a result of intermediate efforts, there is now also a facility to
base nuking on the amount of data currently in the process of being
freed. It is left in, #ifdef'ed out, in case we need it again.
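
For reference, a caller of that facility might look like this.
sfedsk_want_nuke() and its threshold policy are made up for
illustration; only fellow_log_freeing() is the function added below:

	/* illustrative policy: only nuke if less than the needed
	 * amount of space is already on its way to being freed */
	static int
	sfedsk_want_nuke(struct fellow_fd *ffd, size_t need)
	{
		return (fellow_log_freeing(ffd) < need);
	}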
@@ -329,6 +329,7 @@ struct fellow_logbuffer_ff {
struct fellow_fd_ioctx_lease fdil;
struct fellow_alloc_log_block head;
struct regionlist *regions_to_free;
size_t freeing; // from regionlist
fellow_task_privstate taskstate;
};
@@ -848,7 +849,6 @@ struct fellow_log_prep {
};
// fwd decl
static void fellow_logwatcher_kick_locked(struct fellow_fd *);
static void
fellow_privatelog_submit(struct fellow_fd *ffd, struct fellow_logbuffer *lbuf,
struct fellow_dle *entry, unsigned n);
@@ -2985,6 +2985,8 @@ logbuffer_flush_finish(struct fellow_fd *ffd,
TAKE(ff->head, lbuf->head);
TAKE(ff->regions_to_free, lbuf->regions_to_free);
TAKE(ff->todo, lbuf->todo);
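/* remember how much data this flush is about to free */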
if (ff->regions_to_free != NULL)
ff->freeing = ff->regions_to_free->size;
FF_TRANSITION(ff, FF_INVAL, FF_SCHEDULED);
@@ -3091,6 +3093,7 @@ logbuffer_flush_finish_work_one(struct fellow_logbuffer_ff *ff)
logbuffer_flush_finish_need_ioctx(ff);
regionlist_discard(ffd, ff->fdil.ioctx,
&ff->regions_to_free);
ff->freeing = 0;
}
AZ(ff->regions_to_free);
@@ -5913,7 +5916,7 @@ fellow_logwatcher_thread(void *priv)
return (NULL);
}
static void
void
fellow_logwatcher_kick_locked(struct fellow_fd *ffd)
{
if (ffd->phase == FP_OPEN && ffd->watcher_running)
@@ -6586,6 +6589,32 @@ fellow_rndup(const struct fellow_fd *ffd, size_t sz)
return (buddy_rndup(ffd->dskbuddy, sz));
}
#ifdef UNUSED
size_t
fellow_log_freeing(struct fellow_fd *ffd)
{
struct fellow_logbuffer_ff *ff;
struct regionlist *rl;
size_t sz = 0;
CHECK_OBJ_NOTNULL(ffd, FELLOW_FD_MAGIC);
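/* best effort under trylock: a contended lock simply leaves
 * that component out of the snapshot */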
if (pthread_mutex_trylock(&ffd->phase_mtx) == 0) {
VLIST_FOREACH(ff, &ffd->ffhead, list)
sz += ff->freeing;
AZ(pthread_mutex_unlock(&ffd->phase_mtx)); //lint !e455
}
if (pthread_mutex_trylock(&ffd->logmtx) == 0) {
rl = ffd->logbuf->regions_to_free;
if (rl != NULL)
sz += rl->size;
AZ(pthread_mutex_unlock(&ffd->logmtx)); //lint !e455
}
return (sz);
}
#endif
// XXX SHOULD NOT BE NEEDED
int
fellow_fd(const struct fellow_fd *ffd)
......
@@ -269,3 +269,7 @@ size_t fellow_minsize(void);
typedef void fellow_log_diag_f(const char *fmt, ...);
void fellow_log_set_diag(struct fellow_fd *ffd, fellow_log_diag_f *diag);
void fellow_fd_update_space_stats(struct fellow_fd *ffd);
#ifdef UNUSED
size_t fellow_log_freeing(struct fellow_fd *ffd);
#endif
void fellow_logwatcher_kick_locked(struct fellow_fd *ffd);
@@ -1664,16 +1664,15 @@ sfedsk_nuke(struct worker *wrk, struct fellow_fd *ffd,
wrk->strangelove++;
break;
}
fellow_log_flush(ffd);
fellow_logwatcher_kick_locked(ffd);
return ((unsigned)((int)STVELOVE_NUKE - wrk->strangelove));
}
static void
sfedsk_reserve_req(struct sfedsk_reserve *r)
{
unsigned n = r->nr - r->filled;
AN(n);
unsigned n = r->nr;
if (n > BUDDY_REQS_MAX)
n = BUDDY_REQS_MAX;
@@ -1689,15 +1688,10 @@ sfedsk_reserve_fill(struct sfedsk_reserve *r)
uint8_t u;
unsigned n = r->nr - r->filled;
AN(n);
if (n > BUDDY_REQS_MAX)
n = BUDDY_REQS_MAX;
u = buddy_alloc_async_ready(r->reqs);
if (u < n)
n = u;
assert(n == u);
assert(r->filled + u <= r->nr);
arr = r->arr + r->filled;
for (u = 0; u < n; u++) {
@@ -1722,7 +1716,7 @@ sfedsk_lru_thread(struct worker *wrk, void *arg)
buddy_t *buddy;
struct sfedsk_reserve r[1];
unsigned nuked, rc, cb, u, n;
unsigned nuked, n;
struct buddy_reqs *reqs;
@@ -1739,39 +1733,36 @@ sfedsk_lru_thread(struct worker *wrk, void *arg)
wrk->vsl = &vsl;
VSL_Setup(wrk->vsl, NULL, (size_t)0);
while (!stvfe->shutdown) {
cb = stvfe->tune.chunk_exponent;
rc = stvfe->tune.dsk_reserve_chunks;
buddy_wait_needspace(buddy);
// reserve pointers
sfedsk_reserve_resize(r, rc, cb);
while (!stvfe->shutdown) {
sfedsk_reserve_resize(r,
stvfe->tune.dsk_reserve_chunks,
stvfe->tune.chunk_exponent);
if (r->filled < r->nr)
if (r->filled < r->nr) {
LOG(wrk, "reserve need %u * %s",
r->nr - r->filled, pow2_units[cb]);
r->nr - r->filled,
pow2_units[stvfe->tune.chunk_exponent]);
} else {
LOG(wrk, "lru wakeup reserve full");
}
nuked = 0;
n = 0;
while (r->filled < r->nr) {
n = 0;
sfedsk_reserve_req(r);
if (buddy->waiting)
nuked += sfedsk_nuke(wrk, stvfe->ffd, stv, ++n);
while (buddy->waiting && buddy->wait_pri == 0)
nuked += sfedsk_nuke(wrk, stvfe->ffd, stv, ++n);
sfedsk_reserve_release(r);
nuked += sfedsk_nuke(wrk, stvfe->ffd, stv, ++n);
if (buddy->waiting) {
buddy_alloc_async_done(r->reqs);
break;
}
sfedsk_reserve_fill(r);
sfedsk_reserve_req(r);
while (buddy->waiting) {
sfedsk_reserve_release(r);
nuked += sfedsk_nuke(wrk, stvfe->ffd, stv, ++n);
}
sfedsk_reserve_fill(r);
if (nuked != UINT_MAX)
LOG(wrk, "reserve %u rounds nuked %u", n, nuked);
LOG(wrk, "%u rounds nuked %u", n, nuked);
if (r->filled < r->nr && nuked == 0) {
LOG(wrk, "reserve fail");
@@ -1781,38 +1772,8 @@ sfedsk_lru_thread(struct worker *wrk, void *arg)
Pool_Sumstat(wrk);
VSL_Flush(wrk->vsl, 0);
buddy_wait_needspace(buddy);
while (buddy->waiting && r->filled)
sfedsk_reserve_release(r);
if (r->nr)
LOG(wrk, "waiting %u", buddy->waiting);
/*
* while there are allocations waiting,
* drain reserve and nuke
*/
nuked = 0;
n = 0;
while (buddy->waiting && r->filled)
sfedsk_reserve_release(r);
while (buddy->waiting) {
u = sfedsk_nuke(wrk, stvfe->ffd, stv, ++n);
if (u == 0 && r->filled == 0 &&
buddy->waiting > 0) {
LOG(wrk, "waiting %u fail", buddy->waiting);
VSL_Flush(wrk->vsl, 0);
buddy_wait_fail(buddy);
}
nuked += u;
}
if (nuked)
LOG(wrk, "acute nuked %u", nuked);
buddy_wait_needspace(buddy);
}
sfedsk_reserve_free(r);
return (NULL);
......