fellow_log: Throttle DLE_OBJ_CHG submission if dlechg limit reached

parent 2494942f
......@@ -680,6 +680,8 @@ struct fellow_fd {
buddy_t dskbuddy[1];
// mtx protects api functions on logbuf
pthread_mutex_t logmtx;
// notify when new logbuffer
pthread_cond_t new_logbuf_cond;
// watcher thread wakeup - under logmtx
pthread_cond_t watcher_cond;
pthread_t watcher_thread;
......@@ -3727,6 +3729,17 @@ logbuffer_append(struct fellow_fd *ffd,
AZ(from->regions_to_free);
}
// safe max obj_chg a logbuffer can take so rewrite does not run out of memory
static size_t
logbuffer_max_obj_chg(const struct fellow_logbuffer *lbuf)
{
size_t sz;
sz = buddy_size(lbuf->membuddy) / 8;
return (sz / sizeof(struct fellow_dlechg));
}
/*
* return a new disk region for the log if it needs
* resizing
......@@ -5469,6 +5482,7 @@ fellow_logs_rewrite(struct fellow_fd *ffd,
logbuffer_fini(lbuf_pend);
return;
}
// IMPORTANT: post the rewriting flag only AFTER the BFA is alloc'ed
ffd->rewriting = 1;
switch (ffd->phase) {
......@@ -5506,6 +5520,7 @@ fellow_logs_rewrite(struct fellow_fd *ffd,
logbuffer_take(lbuf_save, ffd->logbuf);
logbuffer_take(ffd->logbuf, lbuf_pend);
AZ(pthread_cond_broadcast(&ffd->new_logbuf_cond));
AZ(pthread_mutex_unlock(&ffd->logmtx));
......@@ -5712,6 +5727,7 @@ fellow_logs_rewrite(struct fellow_fd *ffd,
if (lbuf != NULL)
logbuffer_flush(ffd, lbuf, 0, LBUF_ALL);
AZ(pthread_cond_broadcast(&ffd->new_logbuf_cond));
AZ(pthread_mutex_unlock(&ffd->logmtx));
if (tofree != NULL) {
......@@ -5751,6 +5767,7 @@ fellow_logs_rewrite(struct fellow_fd *ffd,
ffd->rewriting = 0;
if (ffd->watcher_running)
AZ(pthread_cond_signal(&ffd->watcher_cond));
AZ(pthread_cond_broadcast(&ffd->new_logbuf_cond));
AZ(pthread_mutex_unlock(&ffd->logmtx));
ffd->diag("... done: %fs\n", VTIM_mono() - t0);
}
......@@ -5879,10 +5896,9 @@ fellow_logwatcher_need_openlog(const struct fellow_logbuffer *lbuf,
* substantial amount of DLE changes
*/
sz = (buddy_size(lbuf->membuddy) / 8) /
sizeof(struct fellow_dlechg);
sz = logbuffer_max_obj_chg(lbuf);
if (s->obj_chg > sz) {
if (s->obj_chg >= sz) {
if (why) {
bprintf(why->s, "obj_chg %u > %zu",
s->obj_chg, sz);
......@@ -6588,6 +6604,7 @@ fellow_log_open(struct fellow_fd *ffd,
pthread_t mf_thread;
AZ(pthread_mutex_init(&ffd->logmtx, NULL));
AZ(pthread_cond_init(&ffd->new_logbuf_cond, NULL));
fellow_logwatcher_init(ffd);
INIT_OBJ(mf, FELLOW_LOG_MEMFAIL_MAGIC);
......@@ -6646,6 +6663,7 @@ fellow_log_close(struct fellow_fd **ffdp)
fellow_mutex_destroy(&ffd->phase_mtx);
AZ(pthread_cond_destroy(&ffd->phase_cond));
AZ(pthread_cond_destroy(&ffd->new_logbuf_cond));
fellow_mutex_destroy(&ffd->logmtx);
......@@ -6693,10 +6711,32 @@ fellow_log_dle_submit(struct fellow_fd *ffd,
prealloc = regionlist_alloc_nowait(ffd->membuddy);
}
/*
* for now, only a single obj_chg is submitted at a time
* (assertion for below)
*/
assert(prep->dle_stats.obj_chg <= 1);
AZ(pthread_mutex_lock(&ffd->logmtx));
if (ffd->logbuf->regions_to_free == NULL && prealloc != NULL)
TAKEZN(ffd->logbuf->regions_to_free, prealloc);
/*
* for now, only a single obj_chg is submitted at a time
*
* if we hit the max, we kick for a rewrite or, if rewrite is
* underway, wait for the new log to appear
*/
if (prep->dle_stats.obj_chg > 0 &&
ffd->logbuf->dle_stats.obj_chg
>= logbuffer_max_obj_chg(ffd->logbuf)) {
if (ffd->rewriting) {
AZ(pthread_cond_wait(&ffd->new_logbuf_cond,
&ffd->logmtx));
}
else
fellow_logwatcher_kick_locked(ffd);
}
fellow_log_entries_add(ffd, ffd->logbuf, prep);
AZ(pthread_mutex_unlock(&ffd->logmtx));
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment