Store new active block offsets to be updated at just the right time

with more than once flush finish, writing a header from an
old flush could race the logbuffer_ref() from a more recent one,
leading to an inconsistent log where a logblock with next_off == 0
became reachable.
parent e23a2bc2
...@@ -262,6 +262,20 @@ struct fellow_fd_ioctx_lease { ...@@ -262,6 +262,20 @@ struct fellow_fd_ioctx_lease {
fellow_ioctx_t *ioctx; fellow_ioctx_t *ioctx;
}; };
enum logoff_where {
LOGOFF_INVAL = 0,
LOGOFF_LOGBLK,
LOGOFF_PENDBLK
};
// saving new offsets to be written at the right time
struct fellow_disk_log_off_todo {
unsigned magic;
#define FELLOW_DISK_LOG_OFF_TODO_MAGIC 0x6f6bd249
enum logoff_where where;
off_t off, alt;
};
#define FF_TRANSITION(ff, from, to) do { \ #define FF_TRANSITION(ff, from, to) do { \
assert((ff)->state == from); \ assert((ff)->state == from); \
(ff)->state = to; \ (ff)->state = to; \
...@@ -308,6 +322,7 @@ struct fellow_logbuffer_ff { ...@@ -308,6 +322,7 @@ struct fellow_logbuffer_ff {
#ifdef DEBUG #ifdef DEBUG
vtim_mono t[2]; vtim_mono t[2];
#endif #endif
struct fellow_disk_log_off_todo todo;
off_t active_off; off_t active_off;
off_t *tail_off; off_t *tail_off;
struct fellow_fd *ffd; struct fellow_fd *ffd;
...@@ -434,6 +449,7 @@ struct fellow_logbuffer { ...@@ -434,6 +449,7 @@ struct fellow_logbuffer {
buddy_t *dskbuddy; buddy_t *dskbuddy;
struct fellow_fd_ioctx_lease fdil; struct fellow_fd_ioctx_lease fdil;
struct fellow_log_region *logreg; struct fellow_log_region *logreg;
struct fellow_disk_log_off_todo todo;
struct fellow_alloc_log_block head; // first logblk struct fellow_alloc_log_block head; // first logblk
struct fellow_alloc_log_block active; // only element or after array struct fellow_alloc_log_block active; // only element or after array
struct fellow_alloc_log_block *arr; struct fellow_alloc_log_block *arr;
...@@ -2435,35 +2451,56 @@ logbuffer_alloc(struct fellow_logbuffer *lbuf) ...@@ -2435,35 +2451,56 @@ logbuffer_alloc(struct fellow_logbuffer *lbuf)
} }
static void static void
logbuffer_ref(struct fellow_fd *ffd, const struct fellow_logbuffer *lbuf) log_off_todo_commit(struct fellow_fd *ffd,
struct fellow_disk_log_off_todo *todo)
{
CHECK_OBJ_NOTNULL(ffd, FELLOW_FD_MAGIC);
CHECK_OBJ_NOTNULL(todo, FELLOW_DISK_LOG_OFF_TODO_MAGIC);
switch(todo->where) {
case LOGOFF_LOGBLK:
ffd->log_info.off.logblk = todo->off;
ffd->log_info.alt.logblk = todo->alt;
break;
case LOGOFF_PENDBLK:
ffd->log_info.off.pendblk = todo->off;
ffd->log_info.alt.pendblk = todo->alt;
break;
default:
WRONG("todo->where");
}
memset(todo, 0, sizeof *todo);
}
static void
logbuffer_ref(struct fellow_logbuffer *lbuf)
{ {
struct fellow_disk_log_off_todo *todo = &lbuf->todo;
const struct fellow_disk_log_block *logblk; const struct fellow_disk_log_block *logblk;
off_t off, alt = 0;
assert(logbuffer_can(lbuf, LBUF_CAN_REF)); assert(logbuffer_can(lbuf, LBUF_CAN_REF));
off = lbuf->active.off; AZ(todo->magic);
assert(todo->where == LOGOFF_INVAL);
INIT_OBJ(todo, FELLOW_DISK_LOG_OFF_TODO_MAGIC);
todo->off = lbuf->active.off;
logblk = lbuf->active.block; logblk = lbuf->active.block;
if (off != 0 && logblk != NULL) { if (todo->off != 0 && logblk != NULL) {
CHECK_OBJ(logblk, FELLOW_DISK_LOG_BLOCK_MAGIC); CHECK_OBJ(logblk, FELLOW_DISK_LOG_BLOCK_MAGIC);
alt = logblk->prev_off; todo->alt = logblk->prev_off;
} }
// not necessarily written if (lbuf->state == LBUF_PEND)
if (lbuf->state == LBUF_PEND) { todo->where = LOGOFF_PENDBLK;
ffd->log_info.off.pendblk = off; else
ffd->log_info.alt.pendblk = alt; todo->where = LOGOFF_LOGBLK;
}
else {
ffd->log_info.off.logblk = off;
ffd->log_info.alt.logblk = alt;
}
} }
// avoid code duplication // avoid code duplication
static unsigned static unsigned
flush_active(struct fellow_fd *ffd, struct fellow_logbuffer *lbuf, flush_active(const struct fellow_fd *ffd, struct fellow_logbuffer *lbuf,
unsigned opts) unsigned opts)
{ {
struct fellow_alloc_log_block blk; struct fellow_alloc_log_block blk;
...@@ -2474,7 +2511,7 @@ flush_active(struct fellow_fd *ffd, struct fellow_logbuffer *lbuf, ...@@ -2474,7 +2511,7 @@ flush_active(struct fellow_fd *ffd, struct fellow_logbuffer *lbuf,
AN(lbuf->fdil.ioctx); AN(lbuf->fdil.ioctx);
AN(lbuf->active.off); AN(lbuf->active.off);
if (logbuffer_can(lbuf, LBUF_CAN_REF)) if (logbuffer_can(lbuf, LBUF_CAN_REF))
logbuffer_ref(ffd, lbuf); logbuffer_ref(lbuf);
blk = lbuf->active; blk = lbuf->active;
lbuf->active_off = blk.off; lbuf->active_off = blk.off;
...@@ -2608,7 +2645,7 @@ logbuffer_flush(struct fellow_fd *ffd, ...@@ -2608,7 +2645,7 @@ logbuffer_flush(struct fellow_fd *ffd,
return; return;
logbuffer_wait_flush_fini(lbuf); logbuffer_wait_flush_fini(lbuf);
AZ(lbuf->ff); AZ(lbuf->ff);
logbuffer_ref(ffd, lbuf); logbuffer_ref(lbuf);
logbuffer_flush_finish(ffd, lbuf, doclose, can); logbuffer_flush_finish(ffd, lbuf, doclose, can);
return; return;
} }
...@@ -2947,6 +2984,7 @@ logbuffer_flush_finish(struct fellow_fd *ffd, ...@@ -2947,6 +2984,7 @@ logbuffer_flush_finish(struct fellow_fd *ffd,
TAKE(ff->fdil, lbuf->fdil); TAKE(ff->fdil, lbuf->fdil);
TAKE(ff->head, lbuf->head); TAKE(ff->head, lbuf->head);
TAKE(ff->regions_to_free, lbuf->regions_to_free); TAKE(ff->regions_to_free, lbuf->regions_to_free);
TAKE(ff->todo, lbuf->todo);
FF_TRANSITION(ff, FF_INVAL, FF_SCHEDULED); FF_TRANSITION(ff, FF_INVAL, FF_SCHEDULED);
...@@ -3044,6 +3082,7 @@ logbuffer_flush_finish_work_one(struct fellow_logbuffer_ff *ff) ...@@ -3044,6 +3082,7 @@ logbuffer_flush_finish_work_one(struct fellow_logbuffer_ff *ff)
FF_TRANSITION(ff, FF_HEAD, FF_HDR); FF_TRANSITION(ff, FF_HEAD, FF_HDR);
log_off_todo_commit(ffd, &ff->todo);
AZ(fellow_io_write_hdr(ffd)); AZ(fellow_io_write_hdr(ffd));
FF_TRANSITION(ff, FF_HDR, FF_FREE); FF_TRANSITION(ff, FF_HDR, FF_FREE);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment