fellow_log: share logblk mempool and differentiate priorities

Share the mempool between reading (flc) and writing (logbuffer).

Use different mempools and priorities for log rewrite and log flushes.
parent 777a3a3f
......@@ -484,7 +484,7 @@ struct fellow_logbuffer {
off_t tail_off;
struct fellow_logbuffer_ff *ff;
struct lbuf_ffpool ffpool[1];
struct logblk_mempool mempool[1];
struct logblk_mempool *mempool;
struct lbuf_dskpool_mem_s dskpool_mem;
struct lbuf_dskpool *dskpool;
};
......@@ -703,6 +703,7 @@ struct fellow_fd {
struct fellow_disk_log_info log_info;
struct fellow_disk_log_info last_log_info;
struct fellow_log_region logreg[LOGREGIONS];
struct logblk_mempool logblk_pool[1];
struct fellow_logbuffer logbuf[1];
uint8_t logbuf_id;
......@@ -945,6 +946,7 @@ struct flivs {
// const
struct fellow_logbuffer * const lbuf;
struct bitfs * const bitfs;
struct logblk_mempool * const mempool;
// non const
struct regionlist *tofree;
......@@ -1101,7 +1103,7 @@ struct fellow_logcache {
struct fellow_fd_ioctx_lease fdil;
const struct buddy_off_extent *region;
struct buddy_ptr_page alloc_entry;
struct logblk_mempool mempool[1];
struct logblk_mempool *mempool;
struct fellow_logcache_entry *current;
struct flehead free;
struct flehead used;
......@@ -1160,8 +1162,6 @@ fellow_logcache_fini(struct fellow_logcache *flc)
CHECK_OBJ_NOTNULL(flc, FELLOW_LOGCACHE_MAGIC);
rets = BUDDY_RETURNS_STK(flc->ffd->membuddy, BUDDY_RETURNS_MAX);
BUDDY_POOL_FINI(flc->mempool);
/* wait for all IO to complete before returning memory and
* the ioctx
*/
......@@ -1185,6 +1185,7 @@ fellow_logcache_fini(struct fellow_logcache *flc)
static void
fellow_logcache_init(struct fellow_logcache *flc, struct fellow_fd *ffd,
struct logblk_mempool *mempool,
uint8_t id, const struct buddy_off_extent *region)
{
struct fellow_logcache_entry *fle;
......@@ -1205,6 +1206,7 @@ fellow_logcache_init(struct fellow_logcache *flc, struct fellow_fd *ffd,
INIT_OBJ(flc, FELLOW_LOGCACHE_MAGIC);
flc->ffd = ffd;
flc->mempool = mempool;
flc->id = id;
flc->region = region;
......@@ -1225,8 +1227,6 @@ fellow_logcache_init(struct fellow_logcache *flc, struct fellow_fd *ffd,
assert(sz >= sizeof *fle);
sz /= sizeof *fle;
BUDDY_POOL_INIT(flc->mempool, ffd->membuddy, FEP_MEM_FLC,
logblk_mempool_fill, NULL);
assert(sz < UINT_MAX);
flc->n = (unsigned)sz;
......@@ -2078,7 +2078,6 @@ logbuffer_fini(struct fellow_logbuffer *lbuf)
AZ(lbuf->regions_to_free);
logbuffer_fini_dskpool(lbuf);
BUDDY_POOL_FINI(lbuf->mempool);
BUDDY_POOL_FINI(lbuf->ffpool);
if (lbuf->fdil.ioctx != NULL) {
......@@ -2195,7 +2194,7 @@ logbuffer_recycle(struct fellow_fd *ffd, struct fellow_logbuffer *lbuf,
static void
logbuffer_init(struct fellow_fd *ffd, struct fellow_logbuffer *lbuf,
struct fellow_log_region *logreg)
struct logblk_mempool *mempool, struct fellow_log_region *logreg)
{
const struct stvfe_tune *tune;
size_t b;
......@@ -2208,6 +2207,7 @@ logbuffer_init(struct fellow_fd *ffd, struct fellow_logbuffer *lbuf,
INIT_OBJ(lbuf, FELLOW_LOGBUFFER_MAGIC);
lbuf->membuddy = ffd->membuddy;
lbuf->dskbuddy = ffd->dskbuddy;
lbuf->mempool = mempool;
b = log2up(tune->logbuffer_size *
sizeof *lbuf->arr / (FELLOW_DISK_LOG_BLOCK_ENTRIES / 2));
......@@ -2215,8 +2215,6 @@ logbuffer_init(struct fellow_fd *ffd, struct fellow_logbuffer *lbuf,
assert(b <= UINT8_MAX);
logbuffer_grow(lbuf, (uint8_t)b);
BUDDY_POOL_INIT(lbuf->mempool, lbuf->membuddy, FEP_MEM_LOG,
logblk_mempool_fill, lbuf);
BUDDY_POOL_INIT(lbuf->ffpool, lbuf->membuddy, FEP_MEM_LOG,
logbuffer_fill_ffpool, lbuf);
......@@ -2227,18 +2225,18 @@ logbuffer_init(struct fellow_fd *ffd, struct fellow_logbuffer *lbuf,
logbuffer_recycle(ffd, lbuf, logreg);
}
/* the ffd->logbuf needs to use the ffd->logblk_pool */
static inline void
logbuffer_take(struct fellow_logbuffer *to, struct fellow_logbuffer *from)
logbuffer_take(struct fellow_logbuffer *to, struct logblk_mempool *to_mempool,
struct fellow_logbuffer *from)
{
logbuffer_wait_flush_fini(from);
*to = *from;
BUDDY_POOL_INIT(to->mempool, from->membuddy, FEP_MEM_LOG,
logblk_mempool_fill, to);
to->mempool = to_mempool;
BUDDY_POOL_INIT(to->ffpool, from->membuddy, FEP_MEM_LOG,
logbuffer_fill_ffpool, to);
logbuffer_fini_dskpool_mem(from);
BUDDY_POOL_FINI(from->mempool);
BUDDY_POOL_FINI(from->ffpool);
memset(from, 0, sizeof *from);
......@@ -2246,7 +2244,7 @@ logbuffer_take(struct fellow_logbuffer *to, struct fellow_logbuffer *from)
static struct fellow_disk_log_block *
fellow_logblk_new(
struct fellow_logbuffer *lbuf,
const struct fellow_logbuffer *lbuf,
unsigned wait, uint8_t fht, uint8_t id)
{
struct fellow_disk_log_block *logblk;
......@@ -3540,7 +3538,8 @@ logbuffer_append(struct fellow_fd *ffd,
*/
dsk_init:
AN(from->id);
fellow_logcache_init(flc, ffd, from->id, from->logreg->region);
fellow_logcache_init(flc, ffd, to->mempool,
from->id, from->logreg->region);
fellow_logcache_prune_direction(flc, -1);
AZ(locked);
dsk:
......@@ -5153,7 +5152,8 @@ fellow_logs_iter(const struct flics *flics, struct flivs *flivs,
CHECK_OBJ_NOTNULL(flics, FLICS_MAGIC);
CHECK_OBJ_NOTNULL(flivs, FLIVS_MAGIC);
fellow_logcache_init(flc, flics->ffd, 0, active_logregion);
fellow_logcache_init(flc, flics->ffd, flivs->mempool,
0, active_logregion);
assert(flics->ffd->phase > FP_INVAL);
assert(flics->ffd->phase < FP_LIM);
......@@ -5432,6 +5432,7 @@ fellow_logs_rewrite(struct fellow_fd *ffd,
struct fellow_logbuffer *lbuf;
struct fellow_logbuffer lbuf_save[1], lbuf_pend[1];
struct fellow_log_region *logreg;
struct logblk_mempool logblk_mempool[1];
struct buddy_off_extent fdr_tmp, *fdr;
off_t logblk_off, pendblk_off = 0;
struct regionlist *tofree = NULL;
......@@ -5477,6 +5478,10 @@ fellow_logs_rewrite(struct fellow_fd *ffd,
//flexelint
memset(lbuf_pend, 0, sizeof lbuf_pend);
/* shared pool for logbuffer and logcache */
BUDDY_POOL_INIT(logblk_mempool, ffd->membuddy, FEP_MEM_REWR,
logblk_mempool_fill, NULL);
/*
* we put in place as the global buffer our temporary buffer
* backed by the pending region, which ultimately it is going to
......@@ -5485,7 +5490,7 @@ fellow_logs_rewrite(struct fellow_fd *ffd,
if (ffd->phase == FP_OPEN) {
logreg = &ffd->logreg[pend_region(log_info->region)];
logregion_reset(logreg);
logbuffer_init(ffd, lbuf_pend, logreg);
logbuffer_init(ffd, lbuf_pend, logblk_mempool, logreg);
}
struct bitfalloc bfa[1];
......@@ -5501,6 +5506,7 @@ fellow_logs_rewrite(struct fellow_fd *ffd,
assert(lbuf_pend->state == LBUF_INIT);
lbuf_pend->state = LBUF_FINI;
logbuffer_fini(lbuf_pend);
BUDDY_POOL_FINI(logblk_mempool);
return;
}
// IMPORTANT: post the rewriting flag only AFTER the BFA is alloc'ed
......@@ -5525,7 +5531,7 @@ fellow_logs_rewrite(struct fellow_fd *ffd,
pendblk_off = log_info->off.pendblk;
AZ(ffd->logbuf->magic);
logbuffer_init(ffd, ffd->logbuf,
logbuffer_init(ffd, ffd->logbuf, ffd->logblk_pool,
&ffd->logreg[empty_region(log_info->region)]);
lbuf = ffd->logbuf;
......@@ -5539,8 +5545,8 @@ fellow_logs_rewrite(struct fellow_fd *ffd,
assert(lbuf_pend->state == LBUF_INIT);
lbuf_pend->state = LBUF_PEND;
logbuffer_take(lbuf_save, ffd->logbuf);
logbuffer_take(ffd->logbuf, lbuf_pend);
logbuffer_take(lbuf_save, logblk_mempool, ffd->logbuf);
logbuffer_take(ffd->logbuf, ffd->logblk_pool, lbuf_pend);
AZ(pthread_cond_broadcast(&ffd->new_logbuf_cond));
AZ(pthread_mutex_unlock(&ffd->logmtx));
......@@ -5648,6 +5654,7 @@ fellow_logs_rewrite(struct fellow_fd *ffd,
.oob = 0,
.lbuf = lbuf,
.bitfs = bfa->u.bitfs,
.mempool = logblk_mempool,
.tofree = tofree,
.ban_dles = {{0}},
.fdct = {{0}},
......@@ -5717,6 +5724,7 @@ fellow_logs_rewrite(struct fellow_fd *ffd,
break;
case FP_OPEN:
assert(lbuf == lbuf_save);
assert(lbuf->mempool == logblk_mempool);
logbuffer_append(ffd,
&ffd->logmtx,
ffd->logreg[active_region(log_info->region)].region,
......@@ -5730,7 +5738,7 @@ fellow_logs_rewrite(struct fellow_fd *ffd,
/* put back the old global buffer which we saved to tmp */
assert(lbuf == lbuf_save);
logbuffer_take(ffd->logbuf, lbuf);
logbuffer_take(ffd->logbuf, ffd->logblk_pool, lbuf);
lbuf = ffd->logbuf;
ffd->log_info.region = empty_region(log_info->region);
......@@ -5751,6 +5759,13 @@ fellow_logs_rewrite(struct fellow_fd *ffd,
AZ(pthread_cond_broadcast(&ffd->new_logbuf_cond));
AZ(pthread_mutex_unlock(&ffd->logmtx));
if (ffd->phase != FP_FINI) {
assert(ffd->logbuf->mempool == ffd->logblk_pool);
assert(ffd->logblk_pool->magic == BUDDY_POOL_MAGIC);
}
BUDDY_POOL_FINI(logblk_mempool);
if (tofree != NULL) {
struct fellow_fd_ioctx_lease fdil = {0};
......@@ -6628,6 +6643,9 @@ fellow_log_open(struct fellow_fd *ffd,
AZ(pthread_cond_init(&ffd->new_logbuf_cond, NULL));
fellow_logwatcher_init(ffd);
BUDDY_POOL_INIT(ffd->logblk_pool, ffd->membuddy, FEP_MEM_LOG,
logblk_mempool_fill, NULL);
INIT_OBJ(mf, FELLOW_LOG_MEMFAIL_MAGIC);
mf->membuddy = ffd->membuddy;
mf->phase = 1;
......@@ -6677,6 +6695,8 @@ fellow_log_close(struct fellow_fd **ffdp)
ffd->phase = FP_FINI;
fellow_logs_close(ffd);
BUDDY_POOL_FINI(ffd->logblk_pool);
AZ(pthread_mutex_lock(&ffd->phase_mtx));
while (ffd->nff > 0)
AZ(pthread_cond_wait(&ffd->phase_cond, &ffd->phase_mtx));
......
......@@ -60,6 +60,7 @@
//buddy.h
-esym(768, buddy_reqs::func) // member not referenced
-esym(768, buddy_reqs::line) // member not referenced
-emacro(506, BUDDY_POOL_FINI) // Constant value Boolean
//buddy.c API not used externally yet
-esym(7*,
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment