Commit 19a5542d authored by Martin Blix Grydeland's avatar Martin Blix Grydeland

Rework VSL_Dispatch

Highlights:

* Improved buffer handling. A vtx can now have a mix of buffered and
  shm chunks. And as shm refs are used and later buffered, they can be
  reused on the same vtx. This benefits long running transactions.

* Synth records are now returned by the cursor at the position the
  cursor was at when it was created. This solves a problem when
  writing to a file and a vtx times out, the reading of that file
  would also cause a time out delay.

* Begin records are now strictly honored, and any records before a
  Begin record are now ignored.

* More asserts on the structures

* More comments, should be easier to understand now

* Various other changes
parent 32ffec80
......@@ -46,7 +46,16 @@
#define VTX_CACHE 10
#define VTX_BUFSIZE_MIN 64
#define VTX_CHUNKS 3
#define VTX_SHMCHUNKS 3
static const char * const vsl_t_names[VSL_t__MAX] = {
[VSL_t_unknown] = "unknown",
[VSL_t_sess] = "sess",
[VSL_t_req] = "req",
[VSL_t_esireq] = "esireq",
[VSL_t_bereq] = "bereq",
[VSL_t_raw] = "raw",
};
struct vtx;
......@@ -61,20 +70,41 @@ struct vslc_raw {
const uint32_t *next;
};
struct vtx_chunk {
struct VSLC_ptr start;
ssize_t len;
ssize_t offset;
};
struct vtx_diag {
struct synth {
unsigned magic;
#define VTX_DIAG_MAGIC 0xC654479F
#define SYNTH_MAGIC 0xC654479F
VTAILQ_ENTRY(vtx_diag) list;
uint32_t chunk[2 + 256 / sizeof (uint32_t)];
VTAILQ_ENTRY(synth) list;
size_t offset;
uint32_t data[2 + 64 / sizeof (uint32_t)];
};
VTAILQ_HEAD(synthhead, synth);
enum chunk_t {
chunk_t__unassigned,
chunk_t_shm,
chunk_t_buf,
};
struct chunk {
unsigned magic;
#define CHUNK_MAGIC 0x48DC0194
enum chunk_t type;
union {
struct {
struct VSLC_ptr start;
VTAILQ_ENTRY(chunk) shmref;
} shm;
struct {
uint32_t *data;
size_t space;
} buf;
};
size_t len;
struct vtx *vtx;
VTAILQ_ENTRY(chunk) list;
};
VTAILQ_HEAD(chunkhead, chunk);
struct vslc_vtx {
unsigned magic;
......@@ -83,11 +113,10 @@ struct vslc_vtx {
struct VSL_cursor cursor;
struct vtx *vtx;
struct vtx_diag *diag; /* Current diag message pointer */
unsigned chunk; /* Current chunk */
ssize_t offset; /* Offset of next record */
struct synth *synth;
struct chunk *chunk;
size_t chunkstart;
size_t offset;
};
struct vtx_key {
......@@ -102,12 +131,15 @@ struct vtx {
#define VTX_MAGIC 0xACC21D09
VTAILQ_ENTRY(vtx) list_child;
VTAILQ_ENTRY(vtx) list_incomplete;
VTAILQ_ENTRY(vtx) list_shm;
double t_start;
unsigned flags;
#define VTX_F_COMPLETE 0x1
#define VTX_F_READY 0x2
#define VTX_F_BEGIN 0x1 /* Begin record processed */
#define VTX_F_END 0x2 /* End record processed */
#define VTX_F_COMPLETE 0x4 /* Marked complete. No new children
should be appended */
#define VTX_F_READY 0x8 /* This vtx and all it's children are
complete */
enum VSL_transaction_e type;
......@@ -117,17 +149,15 @@ struct vtx {
unsigned n_childready;
unsigned n_descend;
struct vslc_vtx c;
VTAILQ_HEAD(,vtx_diag) diag;
VTAILQ_HEAD(,synth) synth;
struct vtx_chunk chunk[VTX_CHUNKS];
unsigned n_chunk;
struct chunk shmchunks[VTX_SHMCHUNKS];
struct chunkhead shmchunks_free;
uint32_t *buf;
ssize_t bufsize;
struct chunkhead chunks;
size_t len;
ssize_t len;
struct vslc_vtx c;
};
struct VSLQ {
......@@ -145,14 +175,15 @@ struct VSLQ {
VTAILQ_HEAD(,vtx) incomplete;
unsigned n_incomplete;
VTAILQ_HEAD(,vtx) shmlist;
struct chunkhead shmrefs;
VTAILQ_HEAD(,vtx) cache;
unsigned n_cache;
};
static void vtx_synth_rec(struct vtx *vtx, unsigned tag, const char *fmt, ...);
/*lint -esym(534, vtx_diag) */
static int vtx_diag(struct vtx *vtx, const char *fmt, ...);
static int vtx_diag(struct vtx *vtx, const char *msg);
/*lint -esym(534, vtx_diag_tag) */
static int vtx_diag_tag(struct vtx *vtx, const uint32_t *ptr,
const char *reason);
......@@ -217,49 +248,49 @@ static int
vslc_vtx_next(struct VSL_cursor *cursor)
{
struct vslc_vtx *c;
struct vtx_chunk *chunk;
const uint32_t *ptr;
CAST_OBJ_NOTNULL(c, cursor->priv_data, VSLC_VTX_MAGIC);
assert(&c->cursor == cursor);
CHECK_OBJ_NOTNULL(c->vtx, VTX_MAGIC);
if (c->diag == NULL && VTAILQ_FIRST(&c->vtx->diag) != NULL) {
/* Send first diag msg */
c->diag = VTAILQ_FIRST(&c->vtx->diag);
c->cursor.rec.ptr = c->diag->chunk;
return (1);
} else if (c->diag != NULL && VTAILQ_NEXT(c->diag, list) != NULL) {
/* Send next diag msg */
c->diag = VTAILQ_NEXT(c->diag, list);
c->cursor.rec.ptr = c->diag->chunk;
return (1);
}
assert (c->offset <= c->vtx->len);
if (c->offset == c->vtx->len)
return (0);
if (c->vtx->n_chunk == 0) {
/* Buffer */
AN(c->vtx->buf);
assert(c->offset < c->vtx->bufsize);
c->cursor.rec.ptr = c->vtx->buf + c->offset;
c->offset += VSL_NEXT(c->cursor.rec.ptr) - c->cursor.rec.ptr;
return (1);
}
/* Shmptr chunks */
assert(c->chunk < c->vtx->n_chunk);
chunk = &c->vtx->chunk[c->chunk];
assert(c->offset >= chunk->offset);
assert(c->offset <= chunk->offset + chunk->len);
if (c->offset == chunk->offset + chunk->len) {
c->chunk++;
chunk = &c->vtx->chunk[c->chunk];
}
AN(chunk->start.ptr);
c->cursor.rec.ptr = chunk->start.ptr + c->offset - chunk->offset;
c->offset += VSL_NEXT(c->cursor.rec.ptr) - c->cursor.rec.ptr;
do {
CHECK_OBJ_ORNULL(c->synth, SYNTH_MAGIC);
if (c->synth != NULL && c->synth->offset == c->offset) {
/* We're at the offset of the next synth record,
point to it and advance the pointer */
c->cursor.rec.ptr = c->synth->data;
c->synth = VTAILQ_NEXT(c->synth, list);
} else {
assert(c->offset <= c->vtx->len);
if (c->offset == c->vtx->len)
/* End of cursor */
return (0);
/* Advance chunk pointer */
if (c->chunk == NULL) {
c->chunk = VTAILQ_FIRST(&c->vtx->chunks);
c->chunkstart = 0;
}
CHECK_OBJ_NOTNULL(c->chunk, CHUNK_MAGIC);
while (c->offset >= c->chunkstart + c->chunk->len) {
c->chunkstart += c->chunk->len;
c->chunk = VTAILQ_NEXT(c->chunk, list);
CHECK_OBJ_NOTNULL(c->chunk, CHUNK_MAGIC);
}
/* Point to the next stored record */
if (c->chunk->type == chunk_t_shm)
ptr = c->chunk->shm.start.ptr;
else {
assert(c->chunk->type == chunk_t_buf);
ptr = c->chunk->buf.data;
}
c->cursor.rec.ptr = ptr + c->offset - c->chunkstart;
c->offset += VSL_NEXT(c->cursor.rec.ptr) -
c->cursor.rec.ptr;
}
} while (VSL_TAG(c->cursor.rec.ptr) == SLT__Batch);
return (1);
}
......@@ -271,8 +302,10 @@ vslc_vtx_reset(struct VSL_cursor *cursor)
CAST_OBJ_NOTNULL(c, cursor->priv_data, VSLC_VTX_MAGIC);
assert(&c->cursor == cursor);
c->diag = NULL;
c->chunk = 0;
CHECK_OBJ_NOTNULL(c->vtx, VTX_MAGIC);
c->synth = VTAILQ_FIRST(&c->vtx->synth);
c->chunk = NULL;
c->chunkstart = 0;
c->offset = 0;
c->cursor.rec.ptr = NULL;
......@@ -288,10 +321,136 @@ static const struct vslc_tbl vslc_vtx_tbl = {
.check = NULL,
};
/* Create a buf chunk */
static struct chunk *
chunk_newbuf(struct vtx *vtx, const uint32_t *ptr, size_t len)
{
struct chunk *chunk;
ALLOC_OBJ(chunk, CHUNK_MAGIC);
chunk->type = chunk_t_buf;
chunk->vtx = vtx;
chunk->buf.space = VTX_BUFSIZE_MIN;
while (chunk->buf.space < len)
chunk->buf.space *= 2;
chunk->buf.data = malloc(sizeof (uint32_t) * chunk->buf.space);
AN(chunk->buf.data);
memcpy(chunk->buf.data, ptr, sizeof (uint32_t) * len);
chunk->len = len;
return (chunk);
}
/* Free a buf chunk */
static void
chunk_freebuf(struct chunk **pchunk)
{
CHECK_OBJ_NOTNULL(*pchunk, CHUNK_MAGIC);
assert((*pchunk)->type == chunk_t_buf);
free((*pchunk)->buf.data);
FREE_OBJ(*pchunk);
*pchunk = NULL;
}
/* Append a set of records to a chunk */
static void
chunk_appendbuf(struct chunk *chunk, const uint32_t *ptr, size_t len)
{
CHECK_OBJ_NOTNULL(chunk, CHUNK_MAGIC);
assert(chunk->type == chunk_t_buf);
if (chunk->buf.space < chunk->len + len) {
while (chunk->buf.space < chunk->len + len)
chunk->buf.space *= 2;
chunk->buf.data = realloc(chunk->buf.data,
sizeof (uint32_t) * chunk->buf.space);
}
memcpy(chunk->buf.data + chunk->len, ptr, sizeof (uint32_t) * len);
chunk->len += len;
}
/* Transform a shm chunk to a buf chunk */
static void
chunk_shm_to_buf(struct VSLQ *vslq, struct chunk *chunk)
{
struct vtx *vtx;
struct chunk *buf;
CHECK_OBJ_NOTNULL(chunk, CHUNK_MAGIC);
assert(chunk->type == chunk_t_shm);
vtx = chunk->vtx;
CHECK_OBJ_NOTNULL(vtx, VTX_MAGIC);
buf = VTAILQ_PREV(chunk, chunkhead, list);
if (buf != NULL && buf->type == chunk_t_buf)
/* Previous is a buf chunk, append to it */
chunk_appendbuf(buf, chunk->shm.start.ptr, chunk->len);
else {
/* Create a new buf chunk and insert it before this */
buf = chunk_newbuf(vtx, chunk->shm.start.ptr, chunk->len);
AN(buf);
VTAILQ_INSERT_BEFORE(chunk, buf, list);
}
/* Reset cursor chunk pointer, vslc_vtx_next will set it correctly */
vtx->c.chunk = NULL;
/* Remove from the shmref list and vtx, and put chunk back
on the free list */
VTAILQ_REMOVE(&vslq->shmrefs, chunk, shm.shmref);
VTAILQ_REMOVE(&vtx->chunks, chunk, list);
VTAILQ_INSERT_HEAD(&vtx->shmchunks_free, chunk, list);
}
/* Append a set of records to a vtx structure */
static void
vtx_append(struct VSLQ *vslq, struct vtx *vtx, const struct VSLC_ptr *start,
size_t len)
{
struct chunk *chunk;
AN(vtx);
if (len == 0)
return;
AN(start);
if (VSL_Check(vslq->c, start) == 2 &&
!VTAILQ_EMPTY(&vtx->shmchunks_free)) {
/* Shmref it */
chunk = VTAILQ_FIRST(&vtx->shmchunks_free);
CHECK_OBJ_NOTNULL(chunk, CHUNK_MAGIC);
assert(chunk->type == chunk_t_shm);
assert(chunk->vtx == vtx);
VTAILQ_REMOVE(&vtx->shmchunks_free, chunk, list);
chunk->shm.start = *start;
chunk->len = len;
VTAILQ_INSERT_TAIL(&vtx->chunks, chunk, list);
/* Append to shmref list */
VTAILQ_INSERT_TAIL(&vslq->shmrefs, chunk, shm.shmref);
} else {
/* Buffer it */
chunk = VTAILQ_LAST(&vtx->chunks, chunkhead);
CHECK_OBJ_ORNULL(chunk, CHUNK_MAGIC);
if (chunk != NULL && chunk->type == chunk_t_buf) {
/* Tail is a buf chunk, append to that */
chunk_appendbuf(chunk, start->ptr, len);
} else {
/* Append new buf chunk */
chunk = chunk_newbuf(vtx, start->ptr, len);
AN(chunk);
VTAILQ_INSERT_TAIL(&vtx->chunks, chunk, list);
}
}
vtx->len += len;
}
/* Allocate a new vtx structure */
static struct vtx *
vtx_new(struct VSLQ *vslq)
{
struct vtx *vtx;
int i;
AN(vslq);
if (vslq->n_cache) {
......@@ -302,52 +461,48 @@ vtx_new(struct VSLQ *vslq)
} else {
ALLOC_OBJ(vtx, VTX_MAGIC);
AN(vtx);
VTAILQ_INIT(&vtx->child);
VTAILQ_INIT(&vtx->shmchunks_free);
for (i = 0; i < VTX_SHMCHUNKS; i++) {
vtx->shmchunks[i].magic = CHUNK_MAGIC;
vtx->shmchunks[i].type = chunk_t_shm;
vtx->shmchunks[i].vtx = vtx;
VTAILQ_INSERT_TAIL(&vtx->shmchunks_free,
&vtx->shmchunks[i], list);
}
VTAILQ_INIT(&vtx->chunks);
VTAILQ_INIT(&vtx->synth);
vtx->c.magic = VSLC_VTX_MAGIC;
vtx->c.vtx = vtx;
vtx->c.cursor.priv_tbl = &vslc_vtx_tbl;
vtx->c.cursor.priv_data = &vtx->c;
}
CHECK_OBJ_NOTNULL(vtx, VTX_MAGIC);
vtx->key.vxid = 0;
vtx->t_start = VTIM_mono();
vtx->flags = 0;
vtx->type = VSL_t_unknown;
vtx->parent = NULL;
VTAILQ_INIT(&vtx->child);
vtx->n_child = 0;
vtx->n_childready = 0;
vtx->n_descend = 0;
VTAILQ_INIT(&vtx->diag);
memset(vtx->chunk, 0, sizeof vtx->chunk);
vtx->n_chunk = 0;
vtx->len = 0;
(void)vslc_vtx_reset(&vtx->c.cursor);
VTAILQ_INSERT_TAIL(&vslq->incomplete, vtx, list_incomplete);
vslq->n_incomplete++;
return (vtx);
}
static void
vtx_free(struct vtx **pvtx)
{
struct vtx *vtx;
AN(pvtx);
vtx = *pvtx;
*pvtx = NULL;
free(vtx->buf);
FREE_OBJ(vtx);
}
/* Disuse a vtx and all it's children, freeing any resources held. Free or
cache the vtx for later use */
static void
vtx_retire(struct VSLQ *vslq, struct vtx **pvtx)
{
struct vtx *vtx;
struct vtx *child;
struct vtx_diag *diag;
struct synth *synth;
struct chunk *chunk;
AN(vslq);
AN(pvtx);
......@@ -374,177 +529,161 @@ vtx_retire(struct VSLQ *vslq, struct vtx **pvtx)
}
AZ(vtx->n_child);
AZ(vtx->n_descend);
vtx->n_childready = 0;
AN(VRB_REMOVE(vtx_tree, &vslq->tree, &vtx->key));
vtx->key.vxid = 0;
vtx->flags = 0;
while (!VTAILQ_EMPTY(&vtx->diag)) {
diag = VTAILQ_FIRST(&vtx->diag);
VTAILQ_REMOVE(&vtx->diag, diag, list);
FREE_OBJ(diag);
while (!VTAILQ_EMPTY(&vtx->synth)) {
synth = VTAILQ_FIRST(&vtx->synth);
CHECK_OBJ_NOTNULL(synth, SYNTH_MAGIC);
VTAILQ_REMOVE(&vtx->synth, synth, list);
FREE_OBJ(synth);
}
if (vtx->n_chunk)
VTAILQ_REMOVE(&vslq->shmlist, vtx, list_shm);
while (!VTAILQ_EMPTY(&vtx->chunks)) {
chunk = VTAILQ_FIRST(&vtx->chunks);
CHECK_OBJ_NOTNULL(chunk, CHUNK_MAGIC);
VTAILQ_REMOVE(&vtx->chunks, chunk, list);
if (chunk->type == chunk_t_shm) {
VTAILQ_REMOVE(&vslq->shmrefs, chunk, shm.shmref);
VTAILQ_INSERT_HEAD(&vtx->shmchunks_free, chunk, list);
} else {
assert(chunk->type == chunk_t_buf);
chunk_freebuf(&chunk);
AZ(chunk);
}
}
vtx->len = 0;
if (vslq->n_cache < VTX_CACHE) {
VTAILQ_INSERT_HEAD(&vslq->cache, vtx, list_child);
vslq->n_cache++;
} else {
vtx_free(&vtx);
AZ(vtx);
FREE_OBJ(vtx);
vtx = NULL;
}
}
/* Lookup a vtx by vxid from the managed list */
static struct vtx *
vtx_lori(struct VSLQ *vslq, unsigned vxid)
vtx_lookup(struct VSLQ *vslq, unsigned vxid)
{
struct vtx *vtx;
struct vtx_key lkey, *key;
struct vtx *vtx;
AN(vslq);
lkey.vxid = vxid;
key = VRB_FIND(vtx_tree, &vslq->tree, &lkey);
if (key != NULL) {
CAST_OBJ_NOTNULL(vtx, (void *)key, VTX_MAGIC);
return (vtx);
}
vtx = vtx_new(vslq);
AN(vtx);
vtx->key.vxid = vxid;
AZ(VRB_INSERT(vtx_tree, &vslq->tree, &vtx->key));
if (key == NULL)
return (NULL);
CAST_OBJ_NOTNULL(vtx, (void *)key, VTX_MAGIC);
return (vtx);
}
static void
vtx_set_bufsize(struct vtx *vtx, ssize_t len)
{
AN(vtx);
assert(len >= 0);
if (vtx->bufsize >= len)
return;
if (vtx->bufsize == 0)
vtx->bufsize = VTX_BUFSIZE_MIN;
while (vtx->bufsize < len)
vtx->bufsize *= 2;
vtx->buf = realloc(vtx->buf, sizeof (uint32_t) * vtx->bufsize);
AN(vtx->buf);
}
static void
vtx_buffer(struct VSLQ *vslq, struct vtx *vtx)
{
int i;
AN(vtx->n_chunk);
AN(vtx->len);
vtx_set_bufsize(vtx, vtx->len);
AN(vtx->buf);
assert(vtx->bufsize >= vtx->len);
for (i = 0; i < vtx->n_chunk; i++)
memcpy(vtx->buf + vtx->chunk[i].offset, vtx->chunk[i].start.ptr,
sizeof (uint32_t) * vtx->chunk[i].len);
memset(vtx->chunk, 0, sizeof vtx->chunk);
VTAILQ_REMOVE(&vslq->shmlist, vtx, list_shm);
vtx->n_chunk = 0;
}
static void
vtx_append(struct VSLQ *vslq, struct vtx *vtx, const struct VSLC_ptr *start,
ssize_t len, int copy)
/* Insert a new vtx into the managed list */
static struct vtx *
vtx_add(struct VSLQ *vslq, unsigned vxid)
{
struct vtx *vtx;
AN(vslq);
vtx = vtx_new(vslq);
AN(vtx);
if (len == 0)
return;
AN(start);
if (vtx->len > 0 && vtx->n_chunk == 0)
/* Can't mix buffer and shmptr */
copy = 1;
if (!copy && vtx->n_chunk < VTX_CHUNKS) {
/* Add shmptr chunk */
AZ(vtx->chunk[vtx->n_chunk].len);
vtx->chunk[vtx->n_chunk].start = *start;
vtx->chunk[vtx->n_chunk].len = len;
vtx->chunk[vtx->n_chunk].offset = vtx->len;
vtx->len += len;
if (vtx->n_chunk == 0)
VTAILQ_INSERT_TAIL(&vslq->shmlist, vtx, list_shm);
vtx->n_chunk++;
return;
}
/* Append to buffer */
vtx_set_bufsize(vtx, vtx->len + len);
if (vtx->n_chunk)
vtx_buffer(vslq, vtx);
AZ(vtx->n_chunk);
AN(vtx->buf);
memcpy(vtx->buf + vtx->len, start->ptr, sizeof (uint32_t) * len);
vtx->len += len;
vtx->key.vxid = vxid;
AZ(VRB_INSERT(vtx_tree, &vslq->tree, &vtx->key));
VTAILQ_INSERT_TAIL(&vslq->incomplete, vtx, list_incomplete);
vslq->n_incomplete++;
return (vtx);
}
/* Mark a vtx complete, update child counters and if possible push it or
it's top parent to the ready state */
static struct vtx *
vtx_check_ready(struct VSLQ *vslq, struct vtx *vtx)
vtx_mark_complete(struct VSLQ *vslq, struct vtx *vtx)
{
struct vtx *ready;
AN(vslq);
AN(vtx->flags & VTX_F_COMPLETE);
AZ(vtx->flags & VTX_F_READY);
AN(vtx->flags & VTX_F_END);
AZ(vtx->flags & VTX_F_COMPLETE);
if (vtx->type == VSL_t_unknown)
vtx_diag(vtx, "vtx of unknown type marked complete");
ready = vtx;
vtx->flags |= VTX_F_COMPLETE;
VTAILQ_REMOVE(&vslq->incomplete, vtx, list_incomplete);
AN(vslq->n_incomplete);
vslq->n_incomplete--;
while (1) {
if (ready->flags & VTX_F_COMPLETE &&
ready->n_child == ready->n_childready)
ready->flags |= VTX_F_READY;
AZ(vtx->flags & VTX_F_READY);
if (vtx->flags & VTX_F_COMPLETE &&
vtx->n_child == vtx->n_childready)
vtx->flags |= VTX_F_READY;
else
break;
if (ready->parent == NULL)
break;
ready = ready->parent;
ready->n_childready++;
assert(ready->n_child >= ready->n_childready);
return (NULL);
if (vtx->parent == NULL) {
/* Top level vtx ready */
return (vtx);
}
vtx = vtx->parent;
vtx->n_childready++;
assert(vtx->n_child >= vtx->n_childready);
}
if (ready->flags & VTX_F_READY && ready->parent == NULL)
/* Top level vtx ready */
return (ready);
if (vtx->flags & VTX_F_READY && vtx->parent == NULL)
return (vtx);
return (NULL);
}
/* Add a child to a parent, and update child counters */
static void
vtx_set_parent(struct vtx *parent, struct vtx *child)
{
CHECK_OBJ_NOTNULL(parent, VTX_MAGIC);
CHECK_OBJ_NOTNULL(child, VTX_MAGIC);
AZ(parent->flags & VTX_F_COMPLETE);
AZ(child->flags & VTX_F_COMPLETE);
AZ(child->parent);
child->parent = parent;
VTAILQ_INSERT_TAIL(&parent->child, child, list_child);
parent->n_child++;
do
parent->n_descend += 1 + child->n_descend;
while ((parent = parent->parent));
}
/* Parse a begin or link record. Returns the number of elements that was
successfully parsed. */
static int
vtx_parsetag_bl(const char *str, enum VSL_transaction_e *ptype,
vtx_parse_beginlink(const char *str, enum VSL_transaction_e *ptype,
unsigned *pvxid)
{
char buf[7];
char buf[8];
unsigned vxid;
int i;
int i, j;
enum VSL_transaction_e type = VSL_t_unknown;
AN(str);
i = sscanf(str, "%6s %u", buf, &vxid);
i = sscanf(str, "%7s %u", buf, &vxid);
if (i < 1)
return (-1);
if (!strcmp(buf, "sess"))
type = VSL_t_sess;
else if (!strcmp(buf, "req"))
type = VSL_t_req;
else if (!strcmp(buf, "esireq"))
type = VSL_t_esireq;
else if (!strcmp(buf, "bereq"))
type = VSL_t_bereq;
else
for (j = 0; j < VSL_t__MAX; j++)
if (!strcmp(buf, vsl_t_names[j]))
break;
switch (j) {
case VSL_t_sess:
case VSL_t_req:
case VSL_t_esireq:
case VSL_t_bereq:
/* Valid types */
type = j;
break;
default:
return (-1);
}
if (i == 1)
vxid = 0;
if (ptype)
......@@ -554,23 +693,9 @@ vtx_parsetag_bl(const char *str, enum VSL_transaction_e *ptype,
return (i);
}
static void
vtx_set_parent(struct vtx *parent, struct vtx *child)
{
AN(parent);
AN(child);
AZ(child->parent);
child->parent = parent;
VTAILQ_INSERT_TAIL(&parent->child, child, list_child);
parent->n_child++;
do
parent->n_descend += 1 + child->n_descend;
while ((parent = parent->parent));
}
/* Parse and process a begin record */
static int
vtx_scan_begintag(struct VSLQ *vslq, struct vtx *vtx, const uint32_t *ptr)
vtx_scan_begin(struct VSLQ *vslq, struct vtx *vtx, const uint32_t *ptr)
{
int i;
enum VSL_transaction_e type;
......@@ -579,17 +704,19 @@ vtx_scan_begintag(struct VSLQ *vslq, struct vtx *vtx, const uint32_t *ptr)
assert(VSL_TAG(ptr) == SLT_Begin);
if (vtx->flags & VTX_F_READY)
return (vtx_diag_tag(vtx, ptr, "link too late"));
AZ(vtx->flags & VTX_F_READY);
i = vtx_parsetag_bl(VSL_CDATA(ptr), &type, &p_vxid);
i = vtx_parse_beginlink(VSL_CDATA(ptr), &type, &p_vxid);
if (i < 1)
return (vtx_diag_tag(vtx, ptr, "parse error"));
/* Check/set vtx type */
assert(type != VSL_t_unknown);
if (vtx->type != VSL_t_unknown && vtx->type != type)
if (vtx->type != VSL_t_unknown && vtx->type != type) {
/* Type not matching the one previously set by a link
record */
return (vtx_diag_tag(vtx, ptr, "type mismatch"));
}
vtx->type = type;
if (i == 1 || p_vxid == 0)
......@@ -600,25 +727,38 @@ vtx_scan_begintag(struct VSLQ *vslq, struct vtx *vtx, const uint32_t *ptr)
if (vslq->grouping == VSL_g_request && vtx->type == VSL_t_req)
return (0); /* No links */
/* Lookup and check parent vtx */
p_vtx = vtx_lori(vslq, p_vxid);
AN(p_vtx);
if (vtx->parent == p_vtx)
/* Link already exists */
if (vtx->parent != NULL) {
if (vtx->parent->key.vxid != p_vxid) {
/* This vtx already belongs to a different
parent */
return (vtx_diag_tag(vtx, ptr, "link mismatch"));
} else
/* Link already exists */
return (0);
}
p_vtx = vtx_lookup(vslq, p_vxid);
if (p_vtx == NULL) {
/* Not seen parent yet. Insert it and create link. */
p_vtx = vtx_add(vslq, p_vxid);
AN(p_vtx);
vtx_set_parent(p_vtx, vtx);
return (0);
}
if (vtx->parent != NULL)
return (vtx_diag_tag(vtx, ptr, "duplicate link"));
if (p_vtx->flags & VTX_F_READY)
CHECK_OBJ_NOTNULL(p_vtx, VTX_MAGIC);
if (p_vtx->flags & VTX_F_COMPLETE)
return (vtx_diag_tag(vtx, ptr, "link too late"));
/* Create link */
vtx_set_parent(p_vtx, vtx);
return (0);
}
/* Parse and process a link record */
static int
vtx_scan_linktag(struct VSLQ *vslq, struct vtx *vtx, const uint32_t *ptr)
vtx_scan_link(struct VSLQ *vslq, struct vtx *vtx, const uint32_t *ptr)
{
int i;
enum VSL_transaction_e c_type;
......@@ -627,13 +767,13 @@ vtx_scan_linktag(struct VSLQ *vslq, struct vtx *vtx, const uint32_t *ptr)
assert(VSL_TAG(ptr) == SLT_Link);
if (vtx->flags & VTX_F_READY)
return (vtx_diag_tag(vtx, ptr, "link too late"));
AZ(vtx->flags & VTX_F_READY);
i = vtx_parsetag_bl(VSL_CDATA(ptr), &c_type, &c_vxid);
i = vtx_parse_beginlink(VSL_CDATA(ptr), &c_type, &c_vxid);
if (i < 2)
return (vtx_diag_tag(vtx, ptr, "parse error"));
assert(i == 2);
assert(c_type != VSL_t_unknown);
if (vslq->grouping == VSL_g_vxid)
return (0); /* No links */
......@@ -641,88 +781,92 @@ vtx_scan_linktag(struct VSLQ *vslq, struct vtx *vtx, const uint32_t *ptr)
return (0); /* No links */
/* Lookup and check child vtx */
c_vtx = vtx_lori(vslq, c_vxid);
AN(c_vtx);
c_vtx = vtx_lookup(vslq, c_vxid);
if (c_vtx == NULL) {
/* Child not seen before. Insert it and create link */
c_vtx = vtx_add(vslq, c_vxid);
AN(c_vtx);
c_vtx->type = c_type;
vtx_set_parent(vtx, c_vtx);
return (0);
}
CHECK_OBJ_NOTNULL(c_vtx, VTX_MAGIC);
if (c_vtx->parent == vtx)
/* Link already exists */
return (0);
if (c_vtx->parent != NULL)
if (c_vtx->parent != vtx)
return (vtx_diag_tag(vtx, ptr, "duplicate link"));
if (c_vtx->flags & VTX_F_READY)
if (c_vtx->flags & VTX_F_COMPLETE)
return (vtx_diag_tag(vtx, ptr, "link too late"));
if (c_vtx->type != VSL_t_unknown && c_vtx->type != c_type)
return (vtx_diag_tag(vtx, ptr, "type mismatch"));
c_vtx->type = c_type;
c_vtx->type = c_type;
vtx_set_parent(vtx, c_vtx);
return (0);
}
/* Scan the records of a vtx, performing processing actions on specific
records */
static struct vtx *
vtx_scan(struct VSLQ *vslq, struct vtx *vtx)
{
const uint32_t *ptr;
enum VSL_tag_e tag;
struct vtx *ret = NULL;
if (vtx->flags & VTX_F_END)
return (NULL);
while (vslc_vtx_next(&vtx->c.cursor) == 1) {
ptr = vtx->c.cursor.rec.ptr;
tag = VSL_TAG(ptr);
if (tag == SLT__Batch || tag == SLT_VSL)
continue;
if (vtx->flags & VTX_F_COMPLETE) {
vtx_diag_tag(vtx, ptr, "late log rec");
continue;
}
if (vtx->type == VSL_t_unknown && tag != SLT_Begin)
vtx_diag_tag(vtx, ptr, "early log rec");
assert(tag != SLT__Batch);
switch (tag) {
case SLT_Begin:
(void)vtx_scan_begintag(vslq, vtx, ptr);
(void)vtx_scan_begin(vslq, vtx, ptr);
vtx->flags |= VTX_F_BEGIN;
break;
case SLT_Link:
(void)vtx_scan_linktag(vslq, vtx, ptr);
(void)vtx_scan_link(vslq, vtx, ptr);
break;
case SLT_End:
AZ(vtx->flags & VTX_F_COMPLETE);
AZ(ret);
VTAILQ_REMOVE(&vslq->incomplete, vtx, list_incomplete);
vtx->flags |= VTX_F_COMPLETE;
AN(vslq->n_incomplete);
vslq->n_incomplete--;
ret = vtx_check_ready(vslq, vtx);
break;
vtx->flags |= VTX_F_END;
return (vtx_mark_complete(vslq, vtx));
default:
break;
}
}
return (ret);
return (NULL);
}
/* Force a vtx into complete status by synthing the necessary outstanding
records */
static struct vtx *
vtx_force(struct VSLQ *vslq, struct vtx *vtx, const char *reason)
{
struct vtx *ret;
AZ(vtx->flags & VTX_F_COMPLETE);
AZ(vtx->flags & VTX_F_READY);
if (!(vtx->flags & VTX_F_BEGIN))
vtx_synth_rec(vtx, SLT_Begin, "%s %u synth",
vsl_t_names[vtx->type], vtx->key.vxid);
vtx_diag(vtx, reason);
VTAILQ_REMOVE(&vslq->incomplete, vtx, list_incomplete);
vtx->flags |= VTX_F_COMPLETE;
AN(vslq->n_incomplete);
vslq->n_incomplete--;
return (vtx_check_ready(vslq, vtx));
if (!(vtx->flags & VTX_F_END))
vtx_synth_rec(vtx, SLT_End, "synth");
ret = vtx_scan(vslq, vtx);
AN(vtx->flags & VTX_F_COMPLETE);
return (ret);
}
/* Build transaction array, do the query and callback. Returns 0 or the
return value from func */
static int
vslq_callback(const struct VSLQ *vslq, struct vtx *vtx, VSLQ_dispatch_f *func,
void *priv)
......@@ -735,6 +879,7 @@ vslq_callback(const struct VSLQ *vslq, struct vtx *vtx, VSLQ_dispatch_f *func,
AN(vslq);
CHECK_OBJ_NOTNULL(vtx, VTX_MAGIC);
AN(vtx->flags & VTX_F_READY);
if (func == NULL)
return (0);
......@@ -784,19 +929,21 @@ vslq_callback(const struct VSLQ *vslq, struct vtx *vtx, VSLQ_dispatch_f *func,
return ((func)(vslq->vsl, ptrans, priv));
}
static int
vtx_diag(struct vtx *vtx, const char *fmt, ...)
/* Create a synthetic log record. The record will be inserted at the
current cursor offset */
static void
vtx_synth_rec(struct vtx *vtx, unsigned tag, const char *fmt, ...)
{
struct vtx_diag *diag;
struct synth *synth, *it;
va_list ap;
char *buf;
int l, buflen;
ALLOC_OBJ(diag, VTX_DIAG_MAGIC);
AN(diag);
ALLOC_OBJ(synth, SYNTH_MAGIC);
AN(synth);
buf = (char *)&diag->chunk[2];
buflen = sizeof (diag->chunk) - 2 * sizeof (uint32_t);
buf = (char *)&synth->data[2];
buflen = sizeof (synth->data) - 2 * sizeof (uint32_t);
va_start(ap, fmt);
l = vsnprintf(buf, buflen, fmt, ap);
assert(l >= 0);
......@@ -804,29 +951,56 @@ vtx_diag(struct vtx *vtx, const char *fmt, ...)
if (l > buflen - 1)
l = buflen - 1;
buf[l++] = '\0'; /* NUL-terminated */
diag->chunk[1] = vtx->key.vxid;
synth->data[1] = vtx->key.vxid;
switch (vtx->type) {
case VSL_t_req:
case VSL_t_esireq:
diag->chunk[1] |= VSL_CLIENTMARKER;
synth->data[1] |= VSL_CLIENTMARKER;
break;
case VSL_t_bereq:
diag->chunk[1] |= VSL_BACKENDMARKER;
synth->data[1] |= VSL_BACKENDMARKER;
break;
default:
break;
}
diag->chunk[0] = ((((unsigned)SLT_VSL & 0xff) << 24) | l);
VTAILQ_INSERT_TAIL(&vtx->diag, diag, list);
synth->data[0] = (((tag & 0xff) << 24) | l);
synth->offset = vtx->c.offset;
VTAILQ_FOREACH_REVERSE(it, &vtx->synth, synthhead, list) {
/* Make sure the synth list is sorted on offset */
CHECK_OBJ_NOTNULL(it, SYNTH_MAGIC);
if (synth->offset >= it->offset)
break;
}
if (it != NULL)
VTAILQ_INSERT_AFTER(&vtx->synth, it, synth, list);
else
VTAILQ_INSERT_HEAD(&vtx->synth, synth, list);
/* Update cursor */
CHECK_OBJ_ORNULL(vtx->c.synth, SYNTH_MAGIC);
if (vtx->c.synth == NULL || vtx->c.synth->offset > synth->offset)
vtx->c.synth = synth;
}
/* Add a diagnostic SLT_VSL synth record to the vtx. */
static int
vtx_diag(struct vtx *vtx, const char *msg)
{
vtx_synth_rec(vtx, SLT_VSL, msg);
return (-1);
}
/* Add a SLT_VSL diag synth record to the vtx. Takes an offending record
that will be included in the log record */
static int
vtx_diag_tag(struct vtx *vtx, const uint32_t *ptr, const char *reason)
{
return (vtx_diag(vtx, "%s (%s: %.*s)", reason,
VSL_tags[VSL_TAG(ptr)], (int)VSL_LEN(ptr), VSL_CDATA(ptr)));
vtx_synth_rec(vtx, SLT_VSL, "%s (%s: %.*s)", reason,
VSL_tags[VSL_TAG(ptr)], (int)VSL_LEN(ptr), VSL_CDATA(ptr));
return (-1);
}
struct VSLQ *
......@@ -858,7 +1032,7 @@ VSLQ_New(struct VSL_data *vsl, struct VSL_cursor **cp,
vslq->query = query;
VRB_INIT(&vslq->tree);
VTAILQ_INIT(&vslq->incomplete);
VTAILQ_INIT(&vslq->shmlist);
VTAILQ_INIT(&vslq->shmrefs);
VTAILQ_INIT(&vslq->cache);
return (vslq);
......@@ -889,13 +1063,14 @@ VSLQ_Delete(struct VSLQ **pvslq)
vtx = VTAILQ_FIRST(&vslq->cache);
VTAILQ_REMOVE(&vslq->cache, vtx, list_child);
vslq->n_cache--;
vtx_free(&vtx);
AZ(vtx);
FREE_OBJ(vtx);
}
FREE_OBJ(vslq);
}
/* Regard each log line as a single transaction, feed it through the query
and do the callback */
static int
vslq_raw(const struct VSLQ *vslq, VSLQ_dispatch_f *func, void *priv)
{
......@@ -944,11 +1119,45 @@ vslq_raw(const struct VSLQ *vslq, VSLQ_dispatch_f *func, void *priv)
return (i);
}
/* Check the beginning of the shmref list, and buffer refs that are at
* warning level.
*
* Returns:
* 0: OK
* -3: Failure
*/
static int
vslq_shmref_check(struct VSLQ *vslq)
{
struct chunk *chunk;
int i;
while ((chunk = VTAILQ_FIRST(&vslq->shmrefs))) {
CHECK_OBJ_NOTNULL(chunk, CHUNK_MAGIC);
assert(chunk->type == chunk_t_shm);
i = VSL_Check(vslq->c, &chunk->shm.start);
if (i == 2)
/* First on list is OK, refs behind it must also
be OK */
return (0);
else if (i == 1)
/* Warning level. Buffer this chunk */
chunk_shm_to_buf(vslq, chunk);
else
/* Too late to buffer */
return (-3);
}
return (0);
}
/* Process the input cursor, calling the callback function on matching
transaction sets */
int
VSLQ_Dispatch(struct VSLQ *vslq, VSLQ_dispatch_f *func, void *priv)
{
struct VSL_cursor *c;
int i;
int i, batch;
enum VSL_tag_e tag;
ssize_t len;
unsigned vxid;
......@@ -962,39 +1171,45 @@ VSLQ_Dispatch(struct VSLQ *vslq, VSLQ_dispatch_f *func, void *priv)
c = vslq->c;
while (1) {
/* Check shmlist and buffer on warning */
while ((vtx = VTAILQ_FIRST(&vslq->shmlist))) {
AN(vtx->n_chunk);
i = VSL_Check(c, &vtx->chunk[0].start);
if (i == 2)
break;
else if (i == 1)
vtx_buffer(vslq, vtx);
else
/* Too late */
return (-3);
}
/* Check shmref list */
i = vslq_shmref_check(vslq);
if (i)
break;
i = VSL_Next(c);
if (i != 1)
break;
tag = VSL_TAG(c->rec.ptr);
if (tag == SLT__Batch) {
batch = 1;
vxid = VSL_BATCHID(c->rec.ptr);
len = VSL_END(c->rec.ptr, VSL_BATCHLEN(c->rec.ptr)) -
c->rec.ptr;
if (len == 0)
continue;
tag = VSL_TAG(VSL_NEXT(c->rec.ptr));
} else {
batch = 0;
vxid = VSL_ID(c->rec.ptr);
len = VSL_NEXT(c->rec.ptr) - c->rec.ptr;
}
assert(len > 0);
if (vxid == 0)
/* Skip non-transactional records */
continue;
vtx = vtx_lori(vslq, vxid);
AN(vtx);
vtx_append(vslq, vtx, &c->rec, len, VSL_Check(c, &c->rec) != 2);
if (tag == SLT__Batch)
vtx = vtx_lookup(vslq, vxid);
if (vtx == NULL && tag == SLT_Begin) {
vtx = vtx_add(vslq, vxid);
AN(vtx);
}
if (vtx != NULL) {
vtx_append(vslq, vtx, &c->rec, len);
vtx = vtx_scan(vslq, vtx);
}
if (batch)
AZ(vsl_skip(c, VSL_WORDS(VSL_BATCHLEN(c->rec.ptr))));
vtx = vtx_scan(vslq, vtx);
if (vtx) {
AN(vtx->flags & VTX_F_READY);
i = vslq_callback(vslq, vtx, func, priv);
......@@ -1042,6 +1257,8 @@ VSLQ_Dispatch(struct VSLQ *vslq, VSLQ_dispatch_f *func, void *priv)
return (i);
}
/* Flush incomplete any incomplete vtx held on to. Do callbacks if func !=
NULL */
int
VSLQ_Flush(struct VSLQ *vslq, VSLQ_dispatch_f *func, void *priv)
{
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment