Commit e9a5539c authored by Nils Goroll's avatar Nils Goroll

Change return value of VSL_Next()-ish functions to enum vsl_status

This should help readability of the code, hopefully.

The API remains unchanged.

Local variables for vsl_status are called 'r', variables holding
a VSLQ_dispatch_f return value are called 'i'
parent 3a9541e5
...@@ -138,6 +138,9 @@ typedef int VSLQ_dispatch_f(struct VSL_data *vsl, ...@@ -138,6 +138,9 @@ typedef int VSLQ_dispatch_f(struct VSL_data *vsl,
* Return value: * Return value:
* 0: OK - continue * 0: OK - continue
* !=0: Makes VSLQ_Dispatch return with this return value immediatly * !=0: Makes VSLQ_Dispatch return with this return value immediatly
*
* Return values of the callback function should be distinct from the
* values of enum vsl_status except for 0
*/ */
typedef void VSL_tagfind_f(int tag, void *priv); typedef void VSL_tagfind_f(int tag, void *priv);
...@@ -301,7 +304,8 @@ void VSL_DeleteCursor(const struct VSL_cursor *c); ...@@ -301,7 +304,8 @@ void VSL_DeleteCursor(const struct VSL_cursor *c);
* Delete the cursor pointed to by c * Delete the cursor pointed to by c
*/ */
int VSL_ResetCursor(const struct VSL_cursor *c); enum vsl_status
VSL_ResetCursor(const struct VSL_cursor *c);
/* /*
* Reset the cursor position to the head, so that the next call to * Reset the cursor position to the head, so that the next call to
* VSL_Next returns the first record. For VSM cursor, it will * VSL_Next returns the first record. For VSM cursor, it will
...@@ -309,7 +313,9 @@ int VSL_ResetCursor(const struct VSL_cursor *c); ...@@ -309,7 +313,9 @@ int VSL_ResetCursor(const struct VSL_cursor *c);
* from the tail. * from the tail.
* *
* Return values: * Return values:
* -1: Operation not supported * - vsl_end == success
* - and see enum vsl_status
*
*/ */
enum vsl_check { enum vsl_check {
...@@ -332,17 +338,21 @@ VSL_Check(const struct VSL_cursor *c, const struct VSLC_ptr *ptr); ...@@ -332,17 +338,21 @@ VSL_Check(const struct VSL_cursor *c, const struct VSLC_ptr *ptr);
* 2: Valid * 2: Valid
*/ */
int VSL_Next(const struct VSL_cursor *c); enum vsl_status {
vsl_e_io = -4, // I/O read error - see errno
vsl_e_overrun = -3, // Overrun
vsl_e_abandon = -2, // Remote abandoned or closed
vsl_e_eof = -1, // End of file
vsl_end = 0, // End of log/cursor
vsl_more = 1 // Cursor points to next log record
};
enum vsl_status
VSL_Next(const struct VSL_cursor *c);
/* /*
* Return raw pointer to next VSL record. * Return raw pointer to next VSL record.
* *
* Return values: * Return values: see enum vsl_status
* 1: Cursor points to next log record
* 0: End of log
* -1: End of file
* -2: Remote abandoned or closed
* -3: Overrun
* -4: I/O read error - see errno
*/ */
int VSL_Match(struct VSL_data *vsl, const struct VSL_cursor *c); int VSL_Match(struct VSL_data *vsl, const struct VSL_cursor *c);
...@@ -521,7 +531,7 @@ int VSLQ_Dispatch(struct VSLQ *vslq, VSLQ_dispatch_f *func, void *priv); ...@@ -521,7 +531,7 @@ int VSLQ_Dispatch(struct VSLQ *vslq, VSLQ_dispatch_f *func, void *priv);
* Return values: * Return values:
* 1: Call again * 1: Call again
* 0: No more log records available * 0: No more log records available
* !=0: The error code from VSL_Next() or func returned non-zero * !=0: func returned non-zero or enum vsl_status
*/ */
int VSLQ_Flush(struct VSLQ *vslq, VSLQ_dispatch_f *func, void *priv); int VSLQ_Flush(struct VSLQ *vslq, VSLQ_dispatch_f *func, void *priv);
......
...@@ -37,8 +37,8 @@ void vsl_vbm_bitset(int bit, void *priv); ...@@ -37,8 +37,8 @@ void vsl_vbm_bitset(int bit, void *priv);
void vsl_vbm_bitclr(int bit, void *priv); void vsl_vbm_bitclr(int bit, void *priv);
typedef void vslc_delete_f(const struct VSL_cursor *); typedef void vslc_delete_f(const struct VSL_cursor *);
typedef int vslc_next_f(const struct VSL_cursor *); typedef enum vsl_status vslc_next_f(const struct VSL_cursor *);
typedef int vslc_reset_f(const struct VSL_cursor *); typedef enum vsl_status vslc_reset_f(const struct VSL_cursor *);
typedef enum vsl_check vslc_check_f(const struct VSL_cursor *, typedef enum vsl_check vslc_check_f(const struct VSL_cursor *,
const struct VSLC_ptr *); const struct VSLC_ptr *);
......
...@@ -115,7 +115,7 @@ vslc_vsm_check(const struct VSL_cursor *cursor, const struct VSLC_ptr *ptr) ...@@ -115,7 +115,7 @@ vslc_vsm_check(const struct VSL_cursor *cursor, const struct VSLC_ptr *ptr)
return (vsl_check_valid); return (vsl_check_valid);
} }
static int static enum vsl_status v_matchproto_(vslc_next_f)
vslc_vsm_next(const struct VSL_cursor *cursor) vslc_vsm_next(const struct VSL_cursor *cursor)
{ {
struct vslc_vsm *c; struct vslc_vsm *c;
...@@ -129,9 +129,9 @@ vslc_vsm_next(const struct VSL_cursor *cursor) ...@@ -129,9 +129,9 @@ vslc_vsm_next(const struct VSL_cursor *cursor)
i = vslc_vsm_check(&c->cursor, &c->next); i = vslc_vsm_check(&c->cursor, &c->next);
if (i < vsl_check_warn) { if (i < vsl_check_warn) {
if (VSM_StillValid(c->vsm, &c->vf) != VSM_valid) if (VSM_StillValid(c->vsm, &c->vf) != VSM_valid)
return (-2); /* VSL abandoned */ return (vsl_e_abandon);
else else
return (-3); /* Overrun */ return (vsl_e_overrun);
} }
t = *(volatile const uint32_t *)c->next.ptr; t = *(volatile const uint32_t *)c->next.ptr;
...@@ -148,10 +148,11 @@ vslc_vsm_next(const struct VSL_cursor *cursor) ...@@ -148,10 +148,11 @@ vslc_vsm_next(const struct VSL_cursor *cursor)
if (t == VSL_ENDMARKER) { if (t == VSL_ENDMARKER) {
if (VSM_StillValid(c->vsm, &c->vf) != VSM_valid) if (VSM_StillValid(c->vsm, &c->vf) != VSM_valid)
return (-2); /* VSL abandoned */ return (vsl_e_abandon);
if (c->options & VSL_COPT_TAILSTOP) if (c->options & VSL_COPT_TAILSTOP)
return (-1); /* EOF */ return (vsl_e_eof);
return (0); /* No new records available */ /* No new records available */
return (vsl_end);
} }
c->cursor.rec = c->next; c->cursor.rec = c->next;
...@@ -174,16 +175,16 @@ vslc_vsm_next(const struct VSL_cursor *cursor) ...@@ -174,16 +175,16 @@ vslc_vsm_next(const struct VSL_cursor *cursor)
assert(c->next.ptr >= c->head->log); assert(c->next.ptr >= c->head->log);
assert(c->next.ptr < c->end); assert(c->next.ptr < c->end);
return (1); return (vsl_more);
} }
} }
static int static enum vsl_status v_matchproto_(vslc_reset_f)
vslc_vsm_reset(const struct VSL_cursor *cursor) vslc_vsm_reset(const struct VSL_cursor *cursor)
{ {
struct vslc_vsm *c; struct vslc_vsm *c;
unsigned u, segment_n; unsigned u, segment_n;
int i; enum vsl_status r;
CAST_OBJ_NOTNULL(c, cursor->priv_data, VSLC_VSM_MAGIC); CAST_OBJ_NOTNULL(c, cursor->priv_data, VSLC_VSM_MAGIC);
assert(&c->cursor == cursor); assert(&c->cursor == cursor);
...@@ -204,12 +205,12 @@ vslc_vsm_reset(const struct VSL_cursor *cursor) ...@@ -204,12 +205,12 @@ vslc_vsm_reset(const struct VSL_cursor *cursor)
if (c->head->segment_n - u > 1) { if (c->head->segment_n - u > 1) {
/* Give up if varnishd is moving faster /* Give up if varnishd is moving faster
than us */ than us */
return (-3); /* overrun */ return (vsl_e_overrun);
} }
i = vslc_vsm_next(&c->cursor); r = vslc_vsm_next(&c->cursor);
} while (i == 1); } while (r == vsl_more);
if (i) if (r != vsl_end)
return (i); return (r);
} else { } else {
/* Starting (VSL_SEGMENTS - 3) behind varnishd. This way /* Starting (VSL_SEGMENTS - 3) behind varnishd. This way
* even if varnishd advances segment_n immediately, we'll * even if varnishd advances segment_n immediately, we'll
...@@ -229,7 +230,7 @@ vslc_vsm_reset(const struct VSL_cursor *cursor) ...@@ -229,7 +230,7 @@ vslc_vsm_reset(const struct VSL_cursor *cursor)
} }
assert(c->next.ptr >= c->head->log); assert(c->next.ptr >= c->head->log);
assert(c->next.ptr < c->end); assert(c->next.ptr < c->end);
return (0); return (vsl_end);
} }
static const struct vslc_tbl vslc_vsm_tbl = { static const struct vslc_tbl vslc_vsm_tbl = {
...@@ -246,7 +247,7 @@ VSL_CursorVSM(struct VSL_data *vsl, struct vsm *vsm, unsigned options) ...@@ -246,7 +247,7 @@ VSL_CursorVSM(struct VSL_data *vsl, struct vsm *vsm, unsigned options)
struct vslc_vsm *c; struct vslc_vsm *c;
struct vsm_fantom vf; struct vsm_fantom vf;
struct VSL_head *head; struct VSL_head *head;
int i; enum vsl_status r;
CHECK_OBJ_NOTNULL(vsl, VSL_MAGIC); CHECK_OBJ_NOTNULL(vsl, VSL_MAGIC);
...@@ -284,9 +285,9 @@ VSL_CursorVSM(struct VSL_data *vsl, struct vsm *vsm, unsigned options) ...@@ -284,9 +285,9 @@ VSL_CursorVSM(struct VSL_data *vsl, struct vsm *vsm, unsigned options)
c->end = c->head->log + c->head->segsize * VSL_SEGMENTS; c->end = c->head->log + c->head->segsize * VSL_SEGMENTS;
assert(c->end <= (const uint32_t *)vf.e); assert(c->end <= (const uint32_t *)vf.e);
i = vslc_vsm_reset(&c->cursor); r = vslc_vsm_reset(&c->cursor);
if (i) { if (r != vsl_end) {
(void)vsl_diag(vsl, "Cursor initialization failure (%d)", i); (void)vsl_diag(vsl, "Cursor initialization failure (%d)", r);
FREE_OBJ(c); FREE_OBJ(c);
return (NULL); return (NULL);
} }
...@@ -337,7 +338,7 @@ vslc_file_readn(int fd, void *buf, size_t n) ...@@ -337,7 +338,7 @@ vslc_file_readn(int fd, void *buf, size_t n)
return (t); return (t);
} }
static int static enum vsl_status v_matchproto_(vslc_next_f)
vslc_file_next(const struct VSL_cursor *cursor) vslc_file_next(const struct VSL_cursor *cursor)
{ {
struct vslc_file *c; struct vslc_file *c;
...@@ -352,9 +353,9 @@ vslc_file_next(const struct VSL_cursor *cursor) ...@@ -352,9 +353,9 @@ vslc_file_next(const struct VSL_cursor *cursor)
assert(c->buflen >= 2); assert(c->buflen >= 2);
i = vslc_file_readn(c->fd, c->buf, VSL_BYTES(2)); i = vslc_file_readn(c->fd, c->buf, VSL_BYTES(2));
if (i < 0) if (i < 0)
return (-4); /* I/O error */ return (vsl_e_io);
if (i == 0) if (i == 0)
return (-1); /* EOF */ return (vsl_e_eof);
assert(i == VSL_BYTES(2)); assert(i == VSL_BYTES(2));
l = 2 + VSL_WORDS(VSL_LEN(c->buf)); l = 2 + VSL_WORDS(VSL_LEN(c->buf));
if (c->buflen < l) { if (c->buflen < l) {
...@@ -367,22 +368,22 @@ vslc_file_next(const struct VSL_cursor *cursor) ...@@ -367,22 +368,22 @@ vslc_file_next(const struct VSL_cursor *cursor)
i = vslc_file_readn(c->fd, c->buf + 2, i = vslc_file_readn(c->fd, c->buf + 2,
VSL_BYTES(l - 2)); VSL_BYTES(l - 2));
if (i < 0) if (i < 0)
return (-4); /* I/O error */ return (vsl_e_io);
if (i == 0) if (i == 0)
return (-1); /* EOF */ return (vsl_e_eof);
assert(i == VSL_BYTES(l - 2)); assert(i == VSL_BYTES(l - 2));
} }
c->cursor.rec.ptr = c->buf; c->cursor.rec.ptr = c->buf;
} while (VSL_TAG(c->cursor.rec.ptr) == SLT__Batch); } while (VSL_TAG(c->cursor.rec.ptr) == SLT__Batch);
return (1); return (vsl_more);
} }
static int static enum vsl_status v_matchproto_(vslc_reset_f)
vslc_file_reset(const struct VSL_cursor *cursor) vslc_file_reset(const struct VSL_cursor *cursor)
{ {
(void)cursor; (void)cursor;
/* XXX: Implement me */ /* XXX: Implement me */
return (-1); return (vsl_e_eof);
} }
static const struct vslc_tbl vslc_file_tbl = { static const struct vslc_tbl vslc_file_tbl = {
...@@ -464,18 +465,18 @@ VSL_DeleteCursor(const struct VSL_cursor *cursor) ...@@ -464,18 +465,18 @@ VSL_DeleteCursor(const struct VSL_cursor *cursor)
(tbl->delete)(cursor); (tbl->delete)(cursor);
} }
int enum vsl_status
VSL_ResetCursor(const struct VSL_cursor *cursor) VSL_ResetCursor(const struct VSL_cursor *cursor)
{ {
const struct vslc_tbl *tbl; const struct vslc_tbl *tbl;
CAST_OBJ_NOTNULL(tbl, cursor->priv_tbl, VSLC_TBL_MAGIC); CAST_OBJ_NOTNULL(tbl, cursor->priv_tbl, VSLC_TBL_MAGIC);
if (tbl->reset == NULL) if (tbl->reset == NULL)
return (-1); return (vsl_e_eof);
return ((tbl->reset)(cursor)); return ((tbl->reset)(cursor));
} }
int enum vsl_status
VSL_Next(const struct VSL_cursor *cursor) VSL_Next(const struct VSL_cursor *cursor)
{ {
const struct vslc_tbl *tbl; const struct vslc_tbl *tbl;
......
...@@ -225,7 +225,7 @@ vtx_keycmp(const struct vtx_key *a, const struct vtx_key *b) ...@@ -225,7 +225,7 @@ vtx_keycmp(const struct vtx_key *a, const struct vtx_key *b)
VRB_PROTOTYPE_STATIC(vtx_tree, vtx_key, entry, vtx_keycmp) VRB_PROTOTYPE_STATIC(vtx_tree, vtx_key, entry, vtx_keycmp)
VRB_GENERATE_STATIC(vtx_tree, vtx_key, entry, vtx_keycmp) VRB_GENERATE_STATIC(vtx_tree, vtx_key, entry, vtx_keycmp)
static int static enum vsl_status v_matchproto_(vslc_next_f)
vslc_raw_next(const struct VSL_cursor *cursor) vslc_raw_next(const struct VSL_cursor *cursor)
{ {
struct vslc_raw *c; struct vslc_raw *c;
...@@ -236,14 +236,14 @@ vslc_raw_next(const struct VSL_cursor *cursor) ...@@ -236,14 +236,14 @@ vslc_raw_next(const struct VSL_cursor *cursor)
AN(c->ptr); AN(c->ptr);
if (c->cursor.rec.ptr == NULL) { if (c->cursor.rec.ptr == NULL) {
c->cursor.rec.ptr = c->ptr; c->cursor.rec.ptr = c->ptr;
return (1); return (vsl_more);
} else { } else {
c->cursor.rec.ptr = NULL; c->cursor.rec.ptr = NULL;
return (0); return (vsl_end);
} }
} }
static int static enum vsl_status v_matchproto_(vslc_reset_f)
vslc_raw_reset(const struct VSL_cursor *cursor) vslc_raw_reset(const struct VSL_cursor *cursor)
{ {
struct vslc_raw *c; struct vslc_raw *c;
...@@ -254,7 +254,7 @@ vslc_raw_reset(const struct VSL_cursor *cursor) ...@@ -254,7 +254,7 @@ vslc_raw_reset(const struct VSL_cursor *cursor)
AN(c->ptr); AN(c->ptr);
c->cursor.rec.ptr = NULL; c->cursor.rec.ptr = NULL;
return (0); return (vsl_end);
} }
static const struct vslc_tbl vslc_raw_tbl = { static const struct vslc_tbl vslc_raw_tbl = {
...@@ -265,7 +265,7 @@ static const struct vslc_tbl vslc_raw_tbl = { ...@@ -265,7 +265,7 @@ static const struct vslc_tbl vslc_raw_tbl = {
.check = NULL, .check = NULL,
}; };
static int static enum vsl_status v_matchproto_(vslc_next_f)
vslc_vtx_next(const struct VSL_cursor *cursor) vslc_vtx_next(const struct VSL_cursor *cursor)
{ {
struct vslc_vtx *c; struct vslc_vtx *c;
...@@ -285,8 +285,7 @@ vslc_vtx_next(const struct VSL_cursor *cursor) ...@@ -285,8 +285,7 @@ vslc_vtx_next(const struct VSL_cursor *cursor)
} else { } else {
assert(c->offset <= c->vtx->len); assert(c->offset <= c->vtx->len);
if (c->offset == c->vtx->len) if (c->offset == c->vtx->len)
/* End of cursor */ return (vsl_end);
return (0);
/* Advance chunk pointer */ /* Advance chunk pointer */
if (c->chunk == NULL) { if (c->chunk == NULL) {
...@@ -313,10 +312,10 @@ vslc_vtx_next(const struct VSL_cursor *cursor) ...@@ -313,10 +312,10 @@ vslc_vtx_next(const struct VSL_cursor *cursor)
} }
} while (VSL_TAG(c->cursor.rec.ptr) == SLT__Batch); } while (VSL_TAG(c->cursor.rec.ptr) == SLT__Batch);
return (1); return (vsl_more);
} }
static int static enum vsl_status v_matchproto_(vslc_reset_f)
vslc_vtx_reset(const struct VSL_cursor *cursor) vslc_vtx_reset(const struct VSL_cursor *cursor)
{ {
struct vslc_vtx *c; struct vslc_vtx *c;
...@@ -330,7 +329,7 @@ vslc_vtx_reset(const struct VSL_cursor *cursor) ...@@ -330,7 +329,7 @@ vslc_vtx_reset(const struct VSL_cursor *cursor)
c->offset = 0; c->offset = 0;
c->cursor.rec.ptr = NULL; c->cursor.rec.ptr = NULL;
return (0); return (vsl_end);
} }
static const struct vslc_tbl vslc_vtx_tbl = { static const struct vslc_tbl vslc_vtx_tbl = {
...@@ -513,7 +512,7 @@ vtx_new(struct VSLQ *vslq) ...@@ -513,7 +512,7 @@ vtx_new(struct VSLQ *vslq)
vtx->n_childready = 0; vtx->n_childready = 0;
vtx->n_descend = 0; vtx->n_descend = 0;
vtx->len = 0; vtx->len = 0;
(void)vslc_vtx_reset(&vtx->c.cursor); AN(vslc_vtx_reset(&vtx->c.cursor) == vsl_end);
return (vtx); return (vtx);
} }
...@@ -935,7 +934,7 @@ vslq_callback(const struct VSLQ *vslq, struct vtx *vtx, VSLQ_dispatch_f *func, ...@@ -935,7 +934,7 @@ vslq_callback(const struct VSLQ *vslq, struct vtx *vtx, VSLQ_dispatch_f *func,
return (0); return (0);
/* Build transaction array */ /* Build transaction array */
(void)vslc_vtx_reset(&vtx->c.cursor); AN(vslc_vtx_reset(&vtx->c.cursor) == vsl_end);
vtxs[0] = vtx; vtxs[0] = vtx;
trans[0].level = 1; trans[0].level = 1;
trans[0].vxid = vtx->key.vxid; trans[0].vxid = vtx->key.vxid;
...@@ -948,7 +947,7 @@ vslq_callback(const struct VSLQ *vslq, struct vtx *vtx, VSLQ_dispatch_f *func, ...@@ -948,7 +947,7 @@ vslq_callback(const struct VSLQ *vslq, struct vtx *vtx, VSLQ_dispatch_f *func,
while (j < i) { while (j < i) {
VTAILQ_FOREACH(vtx, &vtxs[j]->child, list_child) { VTAILQ_FOREACH(vtx, &vtxs[j]->child, list_child) {
assert(i < n); assert(i < n);
(void)vslc_vtx_reset(&vtx->c.cursor); AN(vslc_vtx_reset(&vtx->c.cursor) == vsl_end);
vtxs[i] = vtx; vtxs[i] = vtx;
if (vtx->reason == VSL_r_restart) if (vtx->reason == VSL_r_restart)
/* Restarts stay at the same level as parent */ /* Restarts stay at the same level as parent */
...@@ -1158,17 +1157,17 @@ VSLQ_SetCursor(struct VSLQ *vslq, struct VSL_cursor **cp) ...@@ -1158,17 +1157,17 @@ VSLQ_SetCursor(struct VSLQ *vslq, struct VSL_cursor **cp)
static int static int
vslq_raw(struct VSLQ *vslq, VSLQ_dispatch_f *func, void *priv) vslq_raw(struct VSLQ *vslq, VSLQ_dispatch_f *func, void *priv)
{ {
int i = 1; enum vsl_status r = vsl_more;
int r; int i;
assert(vslq->grouping == VSL_g_raw); assert(vslq->grouping == VSL_g_raw);
assert(vslq->raw.offset <= vslq->raw.len); assert(vslq->raw.offset <= vslq->raw.len);
do { do {
if (vslq->raw.offset == vslq->raw.len) { if (vslq->raw.offset == vslq->raw.len) {
i = VSL_Next(vslq->c); r = VSL_Next(vslq->c);
if (i <= 0) if (r != vsl_more)
return (i); return (r);
AN(vslq->c->rec.ptr); AN(vslq->c->rec.ptr);
vslq->raw.start = vslq->c->rec; vslq->raw.start = vslq->c->rec;
if (VSL_TAG(vslq->c->rec.ptr) == SLT__Batch) if (VSL_TAG(vslq->c->rec.ptr) == SLT__Batch)
...@@ -1188,67 +1187,70 @@ vslq_raw(struct VSLQ *vslq, VSLQ_dispatch_f *func, void *priv) ...@@ -1188,67 +1187,70 @@ vslq_raw(struct VSLQ *vslq, VSLQ_dispatch_f *func, void *priv)
vslq->raw.offset += VSL_NEXT(vslq->raw.c.ptr) - vslq->raw.c.ptr; vslq->raw.offset += VSL_NEXT(vslq->raw.c.ptr) - vslq->raw.c.ptr;
} while (VSL_TAG(vslq->raw.c.ptr) == SLT__Batch); } while (VSL_TAG(vslq->raw.c.ptr) == SLT__Batch);
assert (r == vsl_more);
if (func == NULL) if (func == NULL)
return (i); return (r);
if (vslq->query != NULL && if (vslq->query != NULL &&
!vslq_runquery(vslq->query, vslq->raw.ptrans)) !vslq_runquery(vslq->query, vslq->raw.ptrans))
return (i);
r = (func)(vslq->vsl, vslq->raw.ptrans, priv);
if (r)
return (r); return (r);
return (i); i = (func)(vslq->vsl, vslq->raw.ptrans, priv);
if (i)
return (i);
return (r);
} }
/* Check the beginning of the shmref list, and buffer refs that are at /* Check the beginning of the shmref list, and buffer refs that are at
* warning level. * warning level.
*
* Returns:
* 0: OK
* -3: Failure
*/ */
static int static enum vsl_status
vslq_shmref_check(struct VSLQ *vslq) vslq_shmref_check(struct VSLQ *vslq)
{ {
struct chunk *chunk; struct chunk *chunk;
int i; enum vsl_check i;
while ((chunk = VTAILQ_FIRST(&vslq->shmrefs)) != NULL) { while ((chunk = VTAILQ_FIRST(&vslq->shmrefs)) != NULL) {
CHECK_OBJ_NOTNULL(chunk, CHUNK_MAGIC); CHECK_OBJ_NOTNULL(chunk, CHUNK_MAGIC);
assert(chunk->type == chunk_t_shm); assert(chunk->type == chunk_t_shm);
i = VSL_Check(vslq->c, &chunk->shm.start); i = VSL_Check(vslq->c, &chunk->shm.start);
if (i == 2) switch (i) {
case vsl_check_valid:
/* First on list is OK, refs behind it must also /* First on list is OK, refs behind it must also
be OK */ be OK */
return (0); return (vsl_more);
else if (i == 1) case vsl_check_warn:
/* Warning level. Buffer this chunk */ /* Buffer this chunk */
chunk_shm_to_buf(vslq, chunk); chunk_shm_to_buf(vslq, chunk);
else break;
default:
/* Too late to buffer */ /* Too late to buffer */
return (-3); return (vsl_e_overrun);
}
} }
return (0); return (vsl_more);
} }
/* Process next input record */ /* Process next input record */
static int static enum vsl_status
vslq_next(struct VSLQ *vslq) vslq_next(struct VSLQ *vslq)
{ {
struct VSL_cursor *c; struct VSL_cursor *c;
int i; enum vsl_status r;
enum VSL_tag_e tag; enum VSL_tag_e tag;
ssize_t len; ssize_t len;
unsigned vxid; unsigned vxid;
struct vtx *vtx; struct vtx *vtx;
c = vslq->c; c = vslq->c;
i = VSL_Next(c); r = VSL_Next(c);
if (i != 1) if (r != vsl_more)
return (i); return (r);
assert (r == vsl_more);
tag = (enum VSL_tag_e)VSL_TAG(c->rec.ptr); tag = (enum VSL_tag_e)VSL_TAG(c->rec.ptr);
if (tag == SLT__Batch) { if (tag == SLT__Batch) {
...@@ -1256,7 +1258,7 @@ vslq_next(struct VSLQ *vslq) ...@@ -1256,7 +1258,7 @@ vslq_next(struct VSLQ *vslq)
len = VSL_END(c->rec.ptr, VSL_BATCHLEN(c->rec.ptr)) - len = VSL_END(c->rec.ptr, VSL_BATCHLEN(c->rec.ptr)) -
c->rec.ptr; c->rec.ptr;
if (len == 0) if (len == 0)
return (i); return (r);
tag = (enum VSL_tag_e)VSL_TAG(VSL_NEXT(c->rec.ptr)); tag = (enum VSL_tag_e)VSL_TAG(VSL_NEXT(c->rec.ptr));
} else { } else {
vxid = VSL_ID(c->rec.ptr); vxid = VSL_ID(c->rec.ptr);
...@@ -1265,7 +1267,7 @@ vslq_next(struct VSLQ *vslq) ...@@ -1265,7 +1267,7 @@ vslq_next(struct VSLQ *vslq)
assert(len > 0); assert(len > 0);
if (vxid == 0) if (vxid == 0)
/* Skip non-transactional records */ /* Skip non-transactional records */
return (i); return (r);
vtx = vtx_lookup(vslq, vxid); vtx = vtx_lookup(vslq, vxid);
if (vtx == NULL && tag == SLT_Begin) { if (vtx == NULL && tag == SLT_Begin) {
...@@ -1277,7 +1279,7 @@ vslq_next(struct VSLQ *vslq) ...@@ -1277,7 +1279,7 @@ vslq_next(struct VSLQ *vslq)
vtx_scan(vslq, vtx); vtx_scan(vslq, vtx);
} }
return (i); return (r);
} }
/* Test query and report any ready transactions */ /* Test query and report any ready transactions */
...@@ -1310,7 +1312,8 @@ vslq_process_ready(struct VSLQ *vslq, VSLQ_dispatch_f *func, void *priv) ...@@ -1310,7 +1312,8 @@ vslq_process_ready(struct VSLQ *vslq, VSLQ_dispatch_f *func, void *priv)
int int
VSLQ_Dispatch(struct VSLQ *vslq, VSLQ_dispatch_f *func, void *priv) VSLQ_Dispatch(struct VSLQ *vslq, VSLQ_dispatch_f *func, void *priv)
{ {
int i, r; enum vsl_status r;
int i;
double now; double now;
struct vtx *vtx; struct vtx *vtx;
...@@ -1318,23 +1321,25 @@ VSLQ_Dispatch(struct VSLQ *vslq, VSLQ_dispatch_f *func, void *priv) ...@@ -1318,23 +1321,25 @@ VSLQ_Dispatch(struct VSLQ *vslq, VSLQ_dispatch_f *func, void *priv)
/* Check that we have a cursor */ /* Check that we have a cursor */
if (vslq->c == NULL) if (vslq->c == NULL)
return (-2); return (vsl_e_abandon);
if (vslq->grouping == VSL_g_raw) if (vslq->grouping == VSL_g_raw)
return (vslq_raw(vslq, func, priv)); return (vslq_raw(vslq, func, priv));
/* Process next cursor input */ /* Process next cursor input */
i = vslq_next(vslq); r = vslq_next(vslq);
if (i <= 0) if (r != vsl_more)
/* At end of log or cursor reports error condition */ /* At end of log or cursor reports error condition */
return (i); return (r);
/* Check shmref list and buffer if necessary */ /* Check shmref list and buffer if necessary */
r = vslq_shmref_check(vslq); r = vslq_shmref_check(vslq);
if (r) if (r != vsl_more)
/* Buffering of shm ref failed */ /* Buffering of shm ref failed */
return (r); return (r);
assert (r == vsl_more);
/* Check vtx timeout */ /* Check vtx timeout */
now = VTIM_mono(); now = VTIM_mono();
while (!VTAILQ_EMPTY(&vslq->incomplete)) { while (!VTAILQ_EMPTY(&vslq->incomplete)) {
...@@ -1353,22 +1358,21 @@ VSLQ_Dispatch(struct VSLQ *vslq, VSLQ_dispatch_f *func, void *priv) ...@@ -1353,22 +1358,21 @@ VSLQ_Dispatch(struct VSLQ *vslq, VSLQ_dispatch_f *func, void *priv)
CHECK_OBJ_NOTNULL(vtx, VTX_MAGIC); CHECK_OBJ_NOTNULL(vtx, VTX_MAGIC);
vtx_force(vslq, vtx, "store overflow"); vtx_force(vslq, vtx, "store overflow");
AN(vtx->flags & VTX_F_COMPLETE); AN(vtx->flags & VTX_F_COMPLETE);
r = vslq_process_ready(vslq, func, priv); i = vslq_process_ready(vslq, func, priv);
if (r) if (i)
/* User return code */ /* User return code */
return (r); return (i);
} }
/* Check ready list */ /* Check ready list */
if (!VTAILQ_EMPTY(&vslq->ready)) { if (!VTAILQ_EMPTY(&vslq->ready)) {
r = vslq_process_ready(vslq, func, priv); i = vslq_process_ready(vslq, func, priv);
if (r) if (i)
/* User return code */ /* User return code */
return (r); return (i);
} }
/* Return cursor return value */ return (vsl_more);
return (i);
} }
/* Flush any incomplete vtx held on to. Do callbacks if func != NULL */ /* Flush any incomplete vtx held on to. Do callbacks if func != NULL */
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment