Allocate additional log blocks early

This, hopefully, is part of a possible solution to the nasty issue #28:

When we do not have a sufficiently large pre-allocated log (log region)
as determined by objsize_hint in relation to the storage size, we need
to dynamically allocate disk blocks while we flush the log.

When the log flush includes object deletions (in particular when
triggered from the disk LRU), we run into a typical deadlock: To
complete the transaction to free space, we need the space...

This commit is part of an attempt to make this work by allocating
space early on: When we only have 20% of the log region left, we start
to reserve more blocks for the log.

The problem can, for example, be reproduced with an objsize_hint of 1MB
and an actual object size on the order of 32KB.

Ref #28
parent 11d86e62
......@@ -371,6 +371,23 @@ assert_seq_macros(void)
== (seqvar)); \
} while (0)
// <8K for BUDDY_REQS
#define LBUF_DSK_RSV_REQS \
(((1<<13) - sizeof(struct buddy_reqs)) / \
sizeof(struct i_reqalloc))
//lint -e{506, 527}
/* Compile-cannot-check invariant, verified once at load time: the number of
 * disk-reservation requests that fit into the <8K budget (LBUF_DSK_RSV_REQS)
 * must not exceed what a single buddy_reqs structure can hold. */
static void __attribute__((constructor))
assert_lbuf_dsk_resv(void)
{
	assert(LBUF_DSK_RSV_REQS <= BUDDY_REQS_MAX);
}
// dsk request
BUDDY_REQS(lbuf_dskrsv_s, LBUF_DSK_RSV_REQS);
// this is the memory request for dskrsv
BUDDY_REQS(lbuf_dskrsv_mem_s, 1);
struct fellow_logbuffer {
unsigned magic;
#define FELLOW_LOGBUFFER_MAGIC 0xe8454b5a
......@@ -405,6 +422,9 @@ struct fellow_logbuffer {
/* last written offset for coordination with _append() */
off_t tail_off;
struct fellow_logbuffer_ff ff;
struct lbuf_dskrsv_mem_s dskreqs_mem;
struct lbuf_dskrsv_s (*dskreqs)[2];
unsigned active_reqs;
};
#define LBUF_OK(lbuf) ( \
......@@ -1902,6 +1922,10 @@ logbuffer_wait_flush_fini(const struct fellow_logbuffer *lbuf)
static void
logbuffer_fini(struct fellow_logbuffer *lbuf)
{
struct buddy_returns *rets;
struct buddy_ptr_extent alloc;
struct buddy_ptr_page palloc;
size_t sz;
CHECK_OBJ_NOTNULL(lbuf, FELLOW_LOGBUFFER_MAGIC);
......@@ -1918,9 +1942,21 @@ logbuffer_fini(struct fellow_logbuffer *lbuf)
AZ(lbuf->fdil.ioctxp);
}
logbuffer_wait_flush_fini(lbuf);
rets = BUDDY_RETURNS_STK(lbuf->membuddy, 2);
if (lbuf->dskreqs != NULL) {
buddy_alloc_wait_done(&(*lbuf->dskreqs)[0].reqs);
buddy_alloc_wait_done(&(*lbuf->dskreqs)[1].reqs);
sz = buddy_rndup(lbuf->membuddy, sizeof *lbuf->dskreqs);
alloc = BUDDY_PTR_EXTENT(lbuf->dskreqs, sz);
AN(buddy_return_ptr_extent(rets, &alloc));
}
else if (lbuf->dskreqs_mem.reqs.magic == BUDDY_REQS_MAGIC)
buddy_alloc_wait_done(&lbuf->dskreqs_mem.reqs);
buddy_return1_ptr_page(lbuf->membuddy, &lbuf->alloc);
logbuffer_wait_flush_fini(lbuf);
TAKE(palloc, lbuf->alloc);
AN(buddy_return_ptr_page(rets, &palloc));
buddy_return(rets);
memset(lbuf, 0, sizeof *lbuf);
}
......@@ -2134,27 +2170,69 @@ static void
log_blocks_alloc_from_reqs(struct buddy_reqs *reqs,
struct fellow_alloc_log_block **arr, unsigned *n)
{
uint8_t nreq, r;
nreq = BUDDYF(alloc_async_ready)(reqs);
CHECK_OBJ_NOTNULL(reqs, BUDDY_REQS_MAGIC);
(void) BUDDYF(alloc_async_ready)(reqs);
for (r = 0; r < nreq; (*arr)++, (*n)--) {
for (; *n; (*arr)++, (*n)--) {
if ((*arr)->off > 0)
continue;
(*arr)->off = buddy_get_off_page(reqs, r++).off;
(*arr)->off = buddy_get_next_off_page(reqs).off;
if ((*arr)->off == BUDDY_OFF_NIL)
break;
assert((*arr)->off > 0);
FDBG(D_LOG_ALLOC, "next[extra] = %zu", (*arr)->off);
}
CHECK_OBJ_NOTNULL(reqs, BUDDY_REQS_MAGIC);
}
static void
logbuffer_alloc_some(const struct fellow_logbuffer *lbuf,
/* (Re)fill one disk-reservation request set: release any completed
 * allocation state, file one page request of MIN_FELLOW_BITS order per
 * available request slot (reqs->space), and kick off the allocation
 * asynchronously so the flush path does not block on it.
 * NOTE(review): assumes buddy_alloc_wait_done() is safe to call on a
 * reqs whose allocations were already consumed — confirm against the
 * buddy API contract. */
logbuffer_fill_dskreq(struct buddy_reqs *reqs) {
	unsigned i;

	CHECK_OBJ_NOTNULL(reqs, BUDDY_REQS_MAGIC);
	buddy_alloc_wait_done(reqs);

	for (i = 0; i < reqs->space; i++)
		AN(buddy_req_page(reqs, MIN_FELLOW_BITS, 0));
	/* fire-and-forget: readiness is checked later in
	 * logbuffer_get_dskreqs() */
	(void) buddy_alloc_async(reqs);
	CHECK_OBJ_NOTNULL(reqs, BUDDY_REQS_MAGIC);
}
/* Return a request set which has at least one ready (pre-allocated) disk
 * block available.
 *
 * lbuf->dskreqs points to a pair of request sets used double-buffered;
 * lbuf->active_reqs selects the current one and must only ever be 0 or 1
 * (asserted below). If the active set has nothing ready, wait for its
 * outstanding async allocation, re-check, then refill it and toggle to
 * the other set.
 *
 * Never returns NULL: after at most two refills the third iteration must
 * find a filled set, otherwise WRONG() aborts. */
static struct buddy_reqs *
logbuffer_get_dskreqs(struct fellow_logbuffer *lbuf)
{
	struct buddy_reqs *reqs;
	int i;

	AN(lbuf->dskreqs);

	/* i < 3: If both reqs are empty, at iteration 3 we
	 * must hit the first filled alloc
	 */
	for (i = 0; i < 3; i++) {
		/* active_reqs is a 0/1 index into the pair */
		AZ(lbuf->active_reqs & ~1);
		reqs = &(*lbuf->dskreqs)[lbuf->active_reqs].reqs;
		CHECK_OBJ_NOTNULL(reqs, BUDDY_REQS_MAGIC);

		if (buddy_reqs_next_ready(reqs))
			return (reqs);
		/* nothing ready yet: block on the async allocation */
		(void) buddy_alloc_async_wait(reqs);
		if (buddy_reqs_next_ready(reqs))
			return (reqs); // ret2
		/* set is exhausted: refill it async and switch sides */
		logbuffer_fill_dskreq(reqs);
		lbuf->active_reqs = ! lbuf->active_reqs;
	}
	WRONG("Expected ret2 to return");
	//NEEDLESS(return(NULL));
}
static void
logbuffer_alloc_some(struct fellow_logbuffer *lbuf,
struct fellow_alloc_log_block *arr, unsigned n)
{
const size_t blksz = sizeof(struct fellow_disk_log_block);
struct buddy_reqs *reqs;
struct lbuf_dskrsv_s (*dskreqs)[2];
struct buddy_ptr_extent alloc;
unsigned i;
uint8_t nreq, r;
//lint -e{506} constant boolean
assert(blksz == ((size_t)1 << MIN_FELLOW_BITS));
......@@ -2176,32 +2254,57 @@ logbuffer_alloc_some(const struct fellow_logbuffer *lbuf,
n--;
}
/*
* when the log region is becoming full, start
* pre-allocating disk blocks. For this, we need
* buddy_reqs, which, in turn, needs memory. We
* also allocate that async to not delay the happy
* path (we are in the middle of flushing logs under the lock)
*
* XXX is 20% a good measure?
*/
if (lbuf->dskreqs == NULL &&
lbuf->dskreqs_mem.reqs.magic == 0 &&
lbuf->logreg->free_n * 5 < lbuf->logreg->space) {
BUDDY_REQS_INIT(&lbuf->dskreqs_mem, lbuf->membuddy);
BUDDY_REQS_PRI(&lbuf->dskreqs_mem.reqs, FEP_LOG);
AN(buddy_req_extent(&lbuf->dskreqs_mem.reqs,
sizeof *lbuf->dskreqs, 0));
(void) buddy_alloc_async(&lbuf->dskreqs_mem.reqs);
}
if (lbuf->dskreqs == NULL && n > 0)
(void) buddy_alloc_async_wait(&lbuf->dskreqs_mem.reqs);
if (lbuf->dskreqs == NULL &&
lbuf->dskreqs_mem.reqs.magic == BUDDY_REQS_MAGIC &&
buddy_alloc_async_ready(&lbuf->dskreqs_mem.reqs)) {
alloc = buddy_get_ptr_extent(&lbuf->dskreqs_mem.reqs, 0);
buddy_alloc_wait_done(&lbuf->dskreqs_mem.reqs);
AN(alloc.ptr);
assert(alloc.size >= sizeof *lbuf->dskreqs);
dskreqs = alloc.ptr;
for (i = 0; i < 2; i++) {
BUDDY_REQS_INIT(&(*dskreqs)[i], lbuf->dskbuddy);
BUDDY_REQS_PRI(&(*dskreqs)[i].reqs, FEP_LOG);
logbuffer_fill_dskreq(&(*dskreqs)[i].reqs);
}
lbuf->dskreqs = dskreqs;
}
if (n == 0)
return;
reqs = BUDDY_REQS_STK(lbuf->dskbuddy, BUDDY_REQS_MAX);
BUDDY_REQS_PRI(reqs, FEP_LOG);
AN(lbuf->dskreqs);
while (n > 0) {
for (nreq = 0, i = 0;
nreq < BUDDY_REQS_MAX && i < n;
i++) {
if (arr[i].off > 0)
continue;
AN(buddy_req_page(reqs, MIN_FELLOW_BITS, 0));
nreq++;
}
if (nreq == 0)
break;
// XXX should not wait here
r = buddy_alloc_wait(reqs);
assert(r == nreq);
assert(r <= n);
log_blocks_alloc_from_reqs(reqs, &arr, &n);
buddy_alloc_wait_done(reqs);
log_blocks_alloc_from_reqs(
logbuffer_get_dskreqs(lbuf),
&arr, &n);
}
}
static void
......@@ -2356,7 +2459,6 @@ logbuffer_flush(struct fellow_fd *ffd,
CHECK_OBJ_NOTNULL(tune, STVFE_TUNE_MAGIC);
CHECK_OBJ_NOTNULL(lbuf, FELLOW_LOGBUFFER_MAGIC);
/* capability assertions on the logbuffer
* - we later limi by what we actually want to use
*/
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment