Commit cef9da17 authored by Martin Blix Grydeland

Make Dispatch use VSL_Check to handle SHM ptrs.

This allows Dispatch to run completely in SHM with no copying in most
cases, even when transactions come in multiple batches.
parent 63619877
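The core of the change: the dispatcher now keeps raw SHM pointers per transaction and consults VSL_Check to decide when they must be copied out. A minimal sketch of that policy, following the 0/1/2 return convention visible in the VSLQ_Dispatch hunk below (2 = fully valid, 1 = warning zone, anything else = overrun); the helper name is hypothetical:

static int
vtx_shm_policy(struct VSLQ *vslq, struct VSL_cursor *c, struct vtx *vtx)
{
	switch (VSL_Check(c, &vtx->chunk[0].start)) {
	case 2:
		return (0);		/* SHM pointers still valid, keep them */
	case 1:
		vtx_buffer(vslq, vtx);	/* Warning zone: copy out now */
		return (0);
	default:
		return (-3);		/* Overrun: the records are gone */
	}
}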
@@ -89,6 +89,8 @@ struct VSL_head {
#define VSL_BACKEND(ptr) (((ptr)[1]) & VSL_BACKENDMARKER)
#define VSL_DATA(ptr) ((char*)((ptr)+2))
#define VSL_CDATA(ptr) ((const char*)((ptr)+2))
#define VSL_BATCHLEN(ptr) ((ptr)[1])
#define VSL_BATCHID(ptr) (VSL_ID((ptr) + 2))
#define VSL_ENDMARKER (((uint32_t)SLT__Reserved << 24) | 0x454545) /* "EEE" */
#define VSL_WRAPMARKER (((uint32_t)SLT__Reserved << 24) | 0x575757) /* "WWW" */
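The two new macros give the dispatcher cheap access to a batch envelope: VSL_BATCHLEN reads the payload length from the word that normally holds the vxid, and VSL_BATCHID reads the vxid of the first record inside the batch, which starts two words in, after the envelope header. A sketch of walking a batch with them, assuming the length is in bytes (its pairing with VSL_WORDS in VSLQ_Dispatch below suggests so); batch and process_record are hypothetical:

	const uint32_t *p;
	const uint32_t *end = VSL_END(batch, VSL_BATCHLEN(batch));
	unsigned vxid = VSL_BATCHID(batch);	/* vxid of the first record */

	for (p = batch + 2; p < end; p = VSL_NEXT(p))
		process_record(vxid, p);	/* hypothetical consumer */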
@@ -46,6 +46,9 @@
#define VTX_CACHE 10
#define VTX_BUFSIZE_MIN 64
#define VTX_CHUNKS 3
struct vtx;
enum vtx_type_e {
vtx_t_unknown,
@@ -62,6 +65,33 @@ enum vtx_link_e {
vtx_l_bereq,
};
struct vslc_raw {
struct vslc c;
unsigned magic;
#define VSLC_RAW_MAGIC 0x247EBD44
const uint32_t *start;
ssize_t len;
const uint32_t *next;
};
struct vtx_chunk {
struct VSLC_ptr start;
ssize_t len;
ssize_t offset;
};
struct vslc_vtx {
struct vslc c;
unsigned magic;
#define VSLC_VTX_MAGIC 0x74C6523F
struct vtx *vtx;
unsigned chunk; /* Current chunk */
ssize_t offset; /* Offset of next record */
};
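The cursor state is just a (chunk, offset) pair: chunk[i].offset records the cumulative word offset of chunk i within the transaction, so the record at transaction offset off lives off - chunk[i].offset words into that chunk's SHM region. A hypothetical helper (not part of this change) making the mapping explicit:

static const uint32_t *
vtx_chunk_ptr(const struct vtx *vtx, unsigned chunkno, ssize_t off)
{
	const struct vtx_chunk *ch;

	assert(chunkno < vtx->n_chunk);
	ch = &vtx->chunk[chunkno];
	assert(off >= ch->offset);
	assert(off < ch->offset + ch->len);
	return (ch->start.ptr + (off - ch->offset));
}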
struct vtx_key {
unsigned vxid;
VRB_ENTRY(vtx_key) entry;
@@ -74,12 +104,12 @@ struct vtx {
#define VTX_MAGIC 0xACC21D09
VTAILQ_ENTRY(vtx) list_child;
VTAILQ_ENTRY(vtx) list_incomplete;
VTAILQ_ENTRY(vtx) list_shm;
double t_start;
unsigned flags;
#define VTX_F_SHM 0x1
#define VTX_F_COMPLETE 0x2
#define VTX_F_READY 0x4
#define VTX_F_COMPLETE 0x1
#define VTX_F_READY 0x2
enum vtx_type_e type;
@@ -89,33 +119,16 @@ struct vtx {
unsigned n_childready;
unsigned n_descend;
const uint32_t *start;
struct vslc_vtx c;
ssize_t len;
unsigned index;
struct vtx_chunk chunk[VTX_CHUNKS];
unsigned n_chunk;
uint32_t *buf;
ssize_t bufsize;
};
struct vslc_raw {
struct vslc c;
unsigned magic;
#define VSLC_RAW_MAGIC 0x247EBD44
const uint32_t *start;
ssize_t len;
const uint32_t *next;
};
struct vslc_vtx {
struct vslc c;
unsigned magic;
#define VSLC_VTX_MAGIC 0x74C6523F
struct vtx *vtx;
const uint32_t *next;
};
struct VSLQ {
unsigned magic;
#define VSLQ_MAGIC 0x23A8BE97
@@ -131,6 +144,8 @@ struct VSLQ {
VTAILQ_HEAD(,vtx) incomplete;
unsigned n_incomplete;
VTAILQ_HEAD(,vtx) shmlist;
VTAILQ_HEAD(,vtx) cache;
unsigned n_cache;
};
@@ -215,17 +230,37 @@ static int
vslc_vtx_next(void *cursor)
{
struct vslc_vtx *c;
struct vtx_chunk *chunk;
CAST_OBJ_NOTNULL(c, cursor, VSLC_VTX_MAGIC);
assert(c->next >= c->vtx->start);
assert(c->next <= c->vtx->start + c->vtx->len);
if (c->next < c->vtx->start + c->vtx->len) {
c->c.c.rec.ptr = c->next;
c->next = VSL_NEXT(c->next);
assert (c->offset <= c->vtx->len);
if (c->offset == c->vtx->len)
return (0);
if (c->vtx->n_chunk == 0) {
/* Buffer */
AN(c->vtx->buf);
assert(c->offset < c->vtx->bufsize);
c->c.c.rec.ptr = c->vtx->buf + c->offset;
c->offset += VSL_NEXT(c->c.c.rec.ptr) - c->c.c.rec.ptr;
return (1);
}
return (0);
/* Shmptr chunks */
assert(c->chunk < c->vtx->n_chunk);
chunk = &c->vtx->chunk[c->chunk];
assert(c->offset >= chunk->offset);
assert(c->offset <= chunk->offset + chunk->len);
if (c->offset == chunk->offset + chunk->len) {
c->chunk++;
chunk = &c->vtx->chunk[c->chunk];
}
AN(chunk->start.ptr);
c->c.c.rec.ptr = chunk->start.ptr + c->offset - chunk->offset;
c->offset += VSL_NEXT(c->c.c.rec.ptr) - c->c.c.rec.ptr;
return (1);
}
static int
@@ -234,10 +269,8 @@ vslc_vtx_reset(void *cursor)
struct vslc_vtx *c;
CAST_OBJ_NOTNULL(c, cursor, VSLC_VTX_MAGIC);
assert(c->next >= c->vtx->start);
assert(c->next <= c->vtx->start + c->vtx->len);
c->next = c->vtx->start;
c->chunk = 0;
c->offset = 0;
c->c.c.rec.ptr = NULL;
return (0);
@@ -251,23 +284,6 @@ static struct vslc_tbl vslc_vtx_tbl = {
.check = NULL,
};
static void
vslc_vtx_setup(struct vslc_vtx *c, struct vtx *vtx, unsigned level)
{
AN(c);
AN(vtx);
memset(c, 0, sizeof *c);
c->c.c.vxid = vtx->key.vxid;
c->c.c.level = level;
c->c.magic = VSLC_MAGIC;
c->c.tbl = &vslc_vtx_tbl;
c->magic = VSLC_VTX_MAGIC;
c->vtx = vtx;
c->next = c->vtx->start;
}
static struct vtx *
vtx_new(struct VSLQ *vslq)
{
@@ -282,6 +298,10 @@ vtx_new(struct VSLQ *vslq)
} else {
ALLOC_OBJ(vtx, VTX_MAGIC);
AN(vtx);
vtx->c.c.magic = VSLC_MAGIC;
vtx->c.c.tbl = &vslc_vtx_tbl;
vtx->c.magic = VSLC_VTX_MAGIC;
vtx->c.vtx = vtx;
}
vtx->key.vxid = 0;
@@ -293,9 +313,10 @@
vtx->n_child = 0;
vtx->n_childready = 0;
vtx->n_descend = 0;
vtx->start = vtx->buf;
(void)vslc_vtx_reset(&vtx->c);
vtx->len = 0;
vtx->index = 0;
memset(&vtx->chunk, 0, sizeof vtx->chunk);
vtx->n_chunk = 0;
VTAILQ_INSERT_TAIL(&vslq->incomplete, vtx, list_incomplete);
vslq->n_incomplete++;
@@ -349,6 +370,9 @@ vtx_retire(struct VSLQ *vslq, struct vtx **pvtx)
AZ(vtx->n_descend);
AN(VRB_REMOVE(vtx_tree, &vslq->tree, &vtx->key));
if (vtx->n_chunk)
VTAILQ_REMOVE(&vslq->shmlist, vtx, list_shm);
if (vslq->n_cache < VTX_CACHE) {
VTAILQ_INSERT_HEAD(&vslq->cache, vtx, list_child);
vslq->n_cache++;
@@ -375,52 +399,81 @@ vtx_lori(struct VSLQ *vslq, unsigned vxid)
vtx = vtx_new(vslq);
AN(vtx);
vtx->key.vxid = vxid;
vtx->c.c.c.vxid = vxid;
AZ(VRB_INSERT(vtx_tree, &vslq->tree, &vtx->key));
return (vtx);
}
static void
vtx_append(struct vtx *vtx, const uint32_t *ptr, ssize_t len, int shmptr_ok)
vtx_set_bufsize(struct vtx *vtx, ssize_t len)
{
ssize_t bufsize;
const uint32_t *ptr2;
ssize_t len2;
AN(vtx);
assert(len >= 0);
if (vtx->bufsize >= len)
return;
if (vtx->bufsize == 0)
vtx->bufsize = VTX_BUFSIZE_MIN;
while (vtx->bufsize < len)
vtx->bufsize *= 2;
vtx->buf = realloc(vtx->buf, sizeof (uint32_t) * vtx->bufsize);
AN(vtx->buf);
}
if (vtx->flags & VTX_F_SHM) {
assert(vtx->start != vtx->buf);
ptr2 = vtx->start;
vtx->start = vtx->buf;
len2 = vtx->len;
vtx->len = 0;
vtx->flags &= ~VTX_F_SHM;
vtx_append(vtx, ptr2, len2, 0);
}
static void
vtx_buffer(struct VSLQ *vslq, struct vtx *vtx)
{
int i;
AN(vtx->n_chunk);
AN(vtx->len);
vtx_set_bufsize(vtx, vtx->len);
AN(vtx->buf);
assert(vtx->bufsize >= vtx->len);
for (i = 0; i < vtx->n_chunk; i++)
memcpy(vtx->buf + vtx->chunk[i].offset, vtx->chunk[i].start.ptr,
sizeof (uint32_t) * vtx->chunk[i].len);
memset(&vtx->chunk, 0, sizeof vtx->chunk);
VTAILQ_REMOVE(&vslq->shmlist, vtx, list_shm);
vtx->n_chunk = 0;
}
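vtx_buffer() linearizes the transaction: because every chunk carries its cumulative offset, each memcpy lands at exactly the position the records had in the live log, and the chunk boundaries disappear. A self-contained demonstration of the offset arithmetic, with hypothetical sizes:

#include <assert.h>
#include <stdint.h>
#include <string.h>

int
main(void)
{
	uint32_t shm_a[5] = {1, 2, 3, 4, 5};	/* chunk 0: offset 0, len 5 */
	uint32_t shm_b[3] = {6, 7, 8};		/* chunk 1: offset 5, len 3 */
	uint32_t buf[8];

	memcpy(buf + 0, shm_a, sizeof (uint32_t) * 5);
	memcpy(buf + 5, shm_b, sizeof (uint32_t) * 3);
	assert(buf[5] == 6);	/* boundaries vanish in the buffer */
	return (0);
}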
static void
vtx_append(struct VSLQ *vslq, struct vtx *vtx, const struct VSLC_ptr *start,
ssize_t len, int copy)
{
AN(vtx);
if (len == 0)
return;
AN(ptr);
if (shmptr_ok && vtx->len == 0) {
vtx->start = ptr;
vtx->len = len;
vtx->flags |= VTX_F_SHM;
AN(start);
if (vtx->len > 0 && vtx->n_chunk == 0)
/* Can't mix buffer and shmptr */
copy = 1;
if (!copy && vtx->n_chunk < VTX_CHUNKS) {
/* Add shmptr chunk */
AZ(vtx->chunk[vtx->n_chunk].len);
vtx->chunk[vtx->n_chunk].start = *start;
vtx->chunk[vtx->n_chunk].len = len;
vtx->chunk[vtx->n_chunk].offset = vtx->len;
vtx->len += len;
if (vtx->n_chunk == 0)
VTAILQ_INSERT_TAIL(&vslq->shmlist, vtx, list_shm);
vtx->n_chunk++;
return;
}
bufsize = vtx->bufsize;
if (bufsize == 0)
bufsize = VTX_BUFSIZE_MIN;
while (vtx->len + len > bufsize)
bufsize *= 2;
if (bufsize != vtx->bufsize) {
vtx->buf = realloc(vtx->buf, 4 * bufsize);
AN(vtx->buf);
vtx->bufsize = bufsize;
vtx->start = vtx->buf;
}
memcpy(&vtx->buf[vtx->len], ptr, 4 * len);
/* Append to buffer */
vtx_set_bufsize(vtx, vtx->len + len);
if (vtx->n_chunk)
vtx_buffer(vslq, vtx);
AZ(vtx->n_chunk);
AN(vtx->buf);
memcpy(vtx->buf + vtx->len, start->ptr, sizeof (uint32_t) * len);
vtx->len += len;
}
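For reference, the single call site in VSLQ_Dispatch (further down) derives the copy flag from VSL_Check, so a zero-copy chunk reference is kept only while the cursor vouches for the pointer:

	vtx_append(vslq, vtx, &c->rec, len, VSL_Check(c, &c->rec) != 2);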
@@ -454,13 +507,6 @@ vtx_check_ready(struct VSLQ *vslq, struct vtx *vtx)
/* Top level vtx ready */
return (ready);
if (vtx->flags & VTX_F_SHM) {
/* Not ready, append zero to make sure it's not a shm
reference */
vtx_append(vtx, NULL, 0, 0);
AZ(vtx->flags & VTX_F_SHM);
}
return (NULL);
}
@@ -518,7 +564,7 @@ static int
vtx_scan_begintag(struct VSLQ *vslq, struct vtx *vtx, const uint32_t *ptr)
{
int i;
enum vtx_type_e p_type;
enum vtx_type_e type;
unsigned p_vxid;
struct vtx *p_vtx;
@@ -527,25 +573,31 @@ vtx_scan_begintag(struct VSLQ *vslq, struct vtx *vtx, const uint32_t *ptr)
if (vtx->flags & VTX_F_READY)
return (vtx_diag_tag(vtx, ptr, "link too late"));
i = vtx_parsetag_bl(VSL_CDATA(ptr), VSL_LEN(ptr), &p_type, &p_vxid);
i = vtx_parsetag_bl(VSL_CDATA(ptr), VSL_LEN(ptr), &type, &p_vxid);
if (i < 1)
return (vtx_diag_tag(vtx, ptr, "parse error"));
/* Check/set vtx type */
assert(p_type != vtx_t_unknown);
if (vtx->type != vtx_t_unknown && vtx->type != p_type)
assert(type != vtx_t_unknown);
if (vtx->type != vtx_t_unknown && vtx->type != type)
return (vtx_diag_tag(vtx, ptr, "type mismatch"));
vtx->type = p_type;
vtx->type = type;
if (i == 1 || p_vxid == 0)
return (0);
if (vslq->grouping == VSL_g_vxid)
return (0); /* No links */
if (vslq->grouping == VSL_g_request && vtx->type == vtx_t_req)
return (0); /* No links */
/* Lookup and check parent vtx */
p_vtx = vtx_lori(vslq, p_vxid);
AN(p_vtx);
if (vtx->parent == p_vtx)
/* Link already exists */
return (0);
if (vtx->parent != NULL)
return (vtx_diag_tag(vtx, ptr, "duplicate link"));
if (p_vtx->flags & VTX_F_READY)
@@ -574,6 +626,11 @@ vtx_scan_linktag(struct VSLQ *vslq, struct vtx *vtx, const uint32_t *ptr)
return (vtx_diag_tag(vtx, ptr, "parse error"));
assert(i == 2);
if (vslq->grouping == VSL_g_vxid)
return (0); /* No links */
if (vslq->grouping == VSL_g_request && vtx->type == vtx_t_sess)
return (0); /* No links */
/* Lookup and check child vtx */
c_vtx = vtx_lori(vslq, c_vxid);
AN(c_vtx);
@@ -584,7 +641,6 @@ vtx_scan_linktag(struct VSLQ *vslq, struct vtx *vtx, const uint32_t *ptr)
return (vtx_diag_tag(vtx, ptr, "duplicate link"));
if (c_vtx->flags & VTX_F_READY)
return (vtx_diag_tag(vtx, ptr, "link too late"));
assert(c_type != vtx_t_unknown);
if (c_vtx->type != vtx_t_unknown && c_vtx->type != c_type)
return (vtx_diag_tag(vtx, ptr, "type mismatch"));
c_vtx->type = c_type;
@@ -599,16 +655,17 @@ vtx_scan(struct VSLQ *vslq, struct vtx *vtx)
{
const uint32_t *ptr;
enum VSL_tag_e tag;
int complete;
struct vtx *ret = NULL;
complete = (vtx->flags & VTX_F_COMPLETE ? 1 : 0);
ptr = vtx->start + vtx->index;
assert(ptr <= vtx->start + vtx->len);
for (; ptr < vtx->start + vtx->len; ptr = VSL_NEXT(ptr)) {
while (vslc_vtx_next(&vtx->c) == 1) {
ptr = vtx->c.c.c.rec.ptr;
tag = VSL_TAG(ptr);
if (complete) {
vtx_diag(vtx, "late log rec");
if (tag == SLT__Batch)
continue;
if (vtx->flags & VTX_F_COMPLETE) {
vtx_diag_tag(vtx, ptr, "late log rec");
continue;
}
@@ -625,31 +682,21 @@ vtx_scan(struct VSLQ *vslq, struct vtx *vtx)
break;
case SLT_End:
complete = 1;
AZ(vtx->flags & VTX_F_COMPLETE);
AZ(ret);
VTAILQ_REMOVE(&vslq->incomplete, vtx, list_incomplete);
vtx->flags |= VTX_F_COMPLETE;
AN(vslq->n_incomplete);
vslq->n_incomplete--;
ret = vtx_check_ready(vslq, vtx);
break;
default:
break;
}
}
vtx->index = ptr - vtx->start;
assert(vtx->index <= vtx->len);
if (!complete && vtx->flags & VTX_F_SHM) {
/* Append zero to make sure it's not a shm reference */
vtx_append(vtx, NULL, 0, 0);
AZ(vtx->flags & VTX_F_SHM);
}
if (complete) {
VTAILQ_REMOVE(&vslq->incomplete, vtx, list_incomplete);
vtx->flags |= VTX_F_COMPLETE;
AN(vslq->n_incomplete);
vslq->n_incomplete--;
return (vtx_check_ready(vslq, vtx));
}
return (NULL);
return (ret);
}
static struct vtx *
@@ -672,9 +719,9 @@ vslq_callback(struct VSLQ *vslq, struct vtx *vtx, VSLQ_dispatch_f *func,
void *priv)
{
unsigned n = vtx->n_descend + 1;
struct vslc_vtx *cp[n];
struct VSL_cursor *CP[n + 1];
unsigned i, j;
struct vslc_vtx c[n];
struct VSL_cursor *cp[n + 1];
AN(vslq);
CHECK_OBJ_NOTNULL(vtx, VTX_MAGIC);
@@ -689,13 +736,17 @@ vslq_callback(struct VSLQ *vslq, struct vtx *vtx, VSLQ_dispatch_f *func,
return (0);
i = j = 0;
vslc_vtx_setup(&c[i], vtx, 0);
cp[i] = &vtx->c;
vslc_vtx_reset(cp[i]);
cp[i]->c.c.level = 0;
i++;
while (j < i) {
vtx = VTAILQ_FIRST(&c[j].vtx->child);
vtx = VTAILQ_FIRST(&cp[j]->vtx->child);
while (vtx) {
assert(i < n);
vslc_vtx_setup(&c[i], vtx, c[j].c.c.level + 1);
cp[i] = &vtx->c;
vslc_vtx_reset(cp[i]);
cp[i]->c.c.level = cp[j]->c.c.level + 1;
i++;
vtx = VTAILQ_NEXT(vtx, list_child);
}
@@ -705,12 +756,12 @@ vslq_callback(struct VSLQ *vslq, struct vtx *vtx, VSLQ_dispatch_f *func,
/* Reverse order */
for (i = 0; i < n; i++)
cp[i] = &c[n - i - 1].c.c;
cp[i] = NULL;
CP[i] = &cp[n - i - 1]->c.c;
CP[i] = NULL;
/* Query test goes here */
if (vslq->query == NULL ? 1 : vslq_runquery(vslq->query, cp))
return ((func)(vslq->vsl, cp, priv));
if (vslq->query == NULL ? 1 : vslq_runquery(vslq->query, CP))
return ((func)(vslq->vsl, CP, priv));
else
return (0);
}
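The callback thus receives a NULL-terminated array of per-transaction cursors, collected breadth-first and handed over in reverse order. A sketch of a matching consumer; the signature is assumed from the (func)(vslq->vsl, CP, priv) call above, and VSL_Print is assumed available from the wider libvarnishapi API:

static int
my_dispatch(struct VSL_data *vsl, struct VSL_cursor **cp, void *priv)
{
	int i;

	(void)priv;
	for (i = 0; cp[i] != NULL; i++)
		while (VSL_Next(cp[i]) == 1)
			(void)VSL_Print(vsl, cp[i], stdout);
	return (0);
}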
@@ -744,6 +795,7 @@ VSLQ_New(struct VSL_data *vsl, struct VSL_cursor **cp,
vslq->query = query;
VRB_INIT(&vslq->tree);
VTAILQ_INIT(&vslq->incomplete);
VTAILQ_INIT(&vslq->shmlist);
VTAILQ_INIT(&vslq->cache);
return (vslq);
@@ -829,7 +881,6 @@ VSLQ_Dispatch(struct VSLQ *vslq, VSLQ_dispatch_f *func, void *priv)
struct VSL_cursor *c;
int i;
enum VSL_tag_e tag;
const uint32_t *ptr;
ssize_t len;
unsigned vxid;
struct vtx *vtx;
@@ -842,24 +893,38 @@ VSLQ_Dispatch(struct VSLQ *vslq, VSLQ_dispatch_f *func, void *priv)
c = vslq->c;
while (1) {
/* Check shmlist and buffer on warning */
while ((vtx = VTAILQ_FIRST(&vslq->shmlist))) {
AN(vtx->n_chunk);
i = VSL_Check(c, &vtx->chunk[0].start);
if (i == 2)
break;
else if (i == 1)
vtx_buffer(vslq, vtx);
else
/* Too late */
return (-3);
}
i = VSL_Next(c);
if (i != 1)
break;
return (i);
tag = VSL_TAG(c->rec.ptr);
if (tag == SLT__Batch) {
ptr = VSL_NEXT(c->rec.ptr);
len = VSL_WORDS(c->rec.ptr[1]);
AZ(vsl_skip(c, len));
vxid = VSL_BATCHID(c->rec.ptr);
len = VSL_END(c->rec.ptr, VSL_BATCHLEN(c->rec.ptr)) -
c->rec.ptr;
} else {
ptr = c->rec.ptr;
len = VSL_NEXT(ptr) - ptr;
vxid = VSL_ID(c->rec.ptr);
len = VSL_NEXT(c->rec.ptr) - c->rec.ptr;
}
vxid = VSL_ID(ptr);
if (vxid == 0)
continue;
vtx = vtx_lori(vslq, vxid);
AN(vtx);
vtx_append(vtx, ptr, len, c->shmptr_ok);
vtx_append(vslq, vtx, &c->rec, len, VSL_Check(c, &c->rec) != 2);
if (tag == SLT__Batch)
AZ(vsl_skip(c, VSL_WORDS(VSL_BATCHLEN(c->rec.ptr))));
vtx = vtx_scan(vslq, vtx);
if (vtx) {
AN(vtx->flags & VTX_F_READY);
@@ -867,7 +932,7 @@ VSLQ_Dispatch(struct VSLQ *vslq, VSLQ_dispatch_f *func, void *priv)
vtx_retire(vslq, &vtx);
AZ(vtx);
if (i)
break;
return (i);
}
}
if (i)
......
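Putting it together, a typical polling loop around VSLQ_Dispatch, assuming the return convention above (nonzero positive = stopped by the callback, 0 = no more records for now, negative = error such as the -3 overrun); stop is a hypothetical termination flag:

	while (!stop) {
		i = VSLQ_Dispatch(vslq, my_dispatch, NULL);
		if (i == 0)
			usleep(10000);	/* Idle: wait for new log records */
		else if (i < 0)
			break;		/* Overrun (-3) or cursor error */
	}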