Commit a2af8368 authored by Geoff Simmons's avatar Geoff Simmons

complete refactoring of the internal data structures, so that the

reader thread only copies the data needed for formatting -- the first
matching record --, and all accesses are indexed table lookups

'make check' does *not* succeed in this commit
parent 529d30f3
This diff is collapsed.
/*-
* Copyright (c) 2015 UPLEX Nils Goroll Systemoptimierung
* Copyright (c) 2015 Otto Gmbh & Co KG
* All rights reserved.
* Use only with permission
*
* Author: Geoffrey Simmons <geoffrey.simmons@uplex.de>
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL AUTHOR OR CONTRIBUTORS BE LIABLE
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
* OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
* OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
*
*/
#include "varnishevent.h"
extern rec_t * const magic_end_rec;
This diff is collapsed.
......@@ -45,6 +45,7 @@ rec_t *get_tag(const tx_t *tx, enum VSL_tag_e tag);
char *get_hdr(const tx_t *tx, enum VSL_tag_e tag, const char *hdr);
char *get_fld(char *str, int n, size_t *len);
char *get_rec_fld(const rec_t *rec, int n, size_t *len);
int hdrcmp(const void *s1, const void *s2);
formatter_f format_b_client;
formatter_f format_b_backend;
......@@ -97,7 +98,9 @@ formatter_f format_Xt;
formatter_f format_Xttfb_client;
formatter_f format_Xttfb_backend;
#if 0
formatter_f format_VCL_disp;
#endif
formatter_f format_VCL_Log;
......
......@@ -29,6 +29,7 @@ test_format_LDADD = \
../config.$(OBJEXT) \
../strfTIM.$(OBJEXT) \
../base64.$(OBJEXT) \
../data.$(OBJEXT) \
../format.$(OBJEXT) \
@VARNISH_LIBS@ @VARNISH_LIBVARNISH_LIB@ -lm -lvarnish
......
......@@ -30,10 +30,15 @@
*/
#include <string.h>
#include <stddef.h>
#include "minunit.h"
#include "../varnishevent.h"
#include "../data.h"
/* __offsetof() used in VSTAILQ_LAST() */
#define __offsetof(t,f) offsetof(t,f)
int tests_run = 0;
......@@ -74,12 +79,26 @@ static char
MASSERT(txn[i].pvxid == -1);
MASSERT(txn[i].type == VSL_t_unknown);
MAZ(txn[i].t);
MASSERT(VSTAILQ_EMPTY(&txn[i].recs));
for (int j = 0; j <= max_idx; j++) {
MCHECK_OBJ_NOTNULL(txn[i].recs[j], REC_NODE_MAGIC);
MAZ(txn[i].recs[j]->rec);
if (txn[i].recs[j]->hdrs != NULL)
for (int k = 0; txn[i].recs[j]->hdrs[k] != magic_end_rec; k++)
MAZ(txn[i].recs[j]->hdrs[k]);
}
if (VSTAILQ_NEXT(&txn[i], freelist) != NULL)
tx_free++;
}
MASSERT(global_nfree_tx == config.max_data);
for (int i = 0; i < config.max_data * (max_idx + 1); i++) {
MCHECK_OBJ(&rec_nodes[i], REC_NODE_MAGIC);
MAZ(rec_nodes[i].rec);
if (rec_nodes[i].hdrs != NULL)
for (int j = 0; rec_nodes[i].hdrs[j] != magic_end_rec; j++)
MAZ(rec_nodes[i].hdrs[j]);
}
for (int i = 0; i < nrecords; i++) {
MCHECK_OBJ(&records[i], RECORD_MAGIC);
MASSERT(!OCCUPIED(&records[i]));
......@@ -256,14 +275,65 @@ static const char
return NULL;
}
static int chunks_filled = 0;
static void
fill_rec(rec_t *rec, chunk_t *c, int nc)
{
chunk_t *chunk;
rec->magic = RECORD_MAGIC;
rec->len = 42;
rec->tag = MAX_VSL_TAG/2;
rec->occupied = 1;
VSTAILQ_INIT(&rec->chunks);
for (int i = 0; i < nc; i++) {
chunk = &c[i];
VSTAILQ_INSERT_TAIL(&rec->chunks, chunk, chunklist);
chunk->magic = CHUNK_MAGIC;
chunk->data = (char *) calloc(1, config.chunk_size);
chunks_filled++;
}
}
#define REC_NODE_CLEARED(n) \
do { \
MCHECK_OBJ(n, REC_NODE_MAGIC); \
MAZ((n)->rec); \
if ((n)->hdrs != NULL) { \
for (int x = 0; (n)->hdrs[x] != magic_end_rec; x++) \
MAZ((n)->hdrs[x]); \
MASSERT((n)->hdrs[HDRS_PER_NODE] == magic_end_rec); \
} \
} while(0)
#define REC_CLEARED(rec) \
do { \
MCHECK_OBJ_NOTNULL((rec), RECORD_MAGIC); \
MASSERT(!OCCUPIED(rec)); \
MASSERT((rec)->tag == SLT__Bogus); \
MAZ((rec)->len); \
MASSERT(VSTAILQ_EMPTY(&(rec)->chunks)); \
} while(0)
#define CHUNK_CLEARED(chunk) \
do { \
MCHECK_OBJ_NOTNULL((chunk), CHUNK_MAGIC); \
MASSERT(!OCCUPIED(chunk)); \
MAZ((chunk)->data[0]); \
} while(0)
static const char
*test_data_clear_tx(void)
{
#define NRECS 10
#define N_NODES (max_idx + 1)
#define HDRS_PER_NODE 5
#define CHUNKS_PER_REC 3
#define NRECS ((N_NODES)/2 + (((N_NODES) - (N_NODES)/2) * HDRS_PER_NODE))
#define NCHUNKS ((NRECS) * (CHUNKS_PER_REC))
tx_t tx;
rec_t r[NRECS], *rec;
chunk_t c[NRECS * CHUNKS_PER_REC], *chunk;
rec_t *rec;
chunk_t *chunk;
int n = 0;
unsigned nfree_tx = 4711, nfree_recs = 815, nfree_chunks = 1147;
......@@ -274,17 +344,36 @@ static const char
VSTAILQ_INIT(&local_freechunk);
tx.magic = TX_MAGIC;
VSTAILQ_INIT(&tx.recs);
for (int i = 0; i < NRECS; i++) {
VSTAILQ_INSERT_TAIL(&tx.recs, &r[i], reclist);
r[i].magic = RECORD_MAGIC;
VSTAILQ_INIT(&r[i].chunks);
for (int j = 0; j < CHUNKS_PER_REC; j++) {
chunk = &c[i*CHUNKS_PER_REC + j];
VSTAILQ_INSERT_TAIL(&r[i].chunks, chunk, chunklist);
chunk->magic = CHUNK_MAGIC;
chunk->data = (char *) calloc(1, config.chunk_size);
tx.t = 123456789.0;
tx.vxid = 314159265;
tx.pvxid = 2718281828;
tx.type = VSL_t_req;
tx.occupied = 1;
tx.recs = (rec_node_t **) calloc(N_NODES, sizeof(rec_node_t *));
MAN(tx.recs);
for (int i = 0; i < N_NODES/2; i++) {
tx.recs[i] = &rec_nodes[i];
rec_nodes[i].magic = REC_NODE_MAGIC;
rec_nodes[i].rec = &records[i];
fill_rec(&records[i], &chunks[i * CHUNKS_PER_REC], CHUNKS_PER_REC);
MASSERT(&chunks[(i * CHUNKS_PER_REC) + CHUNKS_PER_REC - 1]
== VSTAILQ_LAST(&records[i].chunks, chunk_t, chunklist));
rec_nodes[i].hdrs = NULL;
}
for (int i = N_NODES/2; i < N_NODES; i++) {
tx.recs[i] = &rec_nodes[i];
rec_nodes[i].magic = REC_NODE_MAGIC;
rec_nodes[i].rec = NULL;
rec_nodes[i].hdrs = (rec_t **) calloc(HDRS_PER_NODE + 1,
sizeof(rec_t *));
MAN(rec_nodes[i].hdrs);
for (int j = 0; j < HDRS_PER_NODE; j++) {
int idx = N_NODES/2 + (i - N_NODES/2) * HDRS_PER_NODE + j;
rec_nodes[i].hdrs[j] = &records[idx];
fill_rec(&records[idx], &chunks[idx * CHUNKS_PER_REC],
CHUNKS_PER_REC);
}
rec_nodes[i].hdrs[HDRS_PER_NODE] = magic_end_rec;
}
DATA_Clear_Tx(&tx, &local_freetx, &local_freerec, &local_freechunk,
......@@ -292,7 +381,7 @@ static const char
MASSERT(nfree_tx == 4712);
MASSERT(nfree_recs == 815 + NRECS);
MASSERT(nfree_chunks == 1147 + NRECS*CHUNKS_PER_REC);
MASSERT(nfree_chunks == 1147 + NCHUNKS);
MCHECK_OBJ(&tx, TX_MAGIC);
MASSERT(!OCCUPIED(&tx));
......@@ -300,7 +389,10 @@ static const char
MASSERT(tx.pvxid == -1);
MASSERT(tx.type == VSL_t_unknown);
MAZ(tx.t);
MASSERT(VSTAILQ_EMPTY(&tx.recs));
for (int i = 0; i < N_NODES; i++) {
REC_NODE_CLEARED(tx.recs[i]);
REC_NODE_CLEARED(&rec_nodes[i]);
}
MASSERT(!VSTAILQ_EMPTY(&local_freetx));
MASSERT(VSTAILQ_FIRST(&local_freetx) == &tx);
......@@ -308,11 +400,7 @@ static const char
MASSERT(!VSTAILQ_EMPTY(&local_freerec));
VSTAILQ_FOREACH(rec, &local_freerec, freelist) {
MCHECK_OBJ_NOTNULL(rec, RECORD_MAGIC);
MASSERT(!OCCUPIED(rec));
MASSERT(rec->tag == SLT__Bogus);
MAZ(rec->len);
MASSERT(VSTAILQ_EMPTY(&tx.recs));
REC_CLEARED(rec);
n++;
}
MASSERT(n == NRECS);
......@@ -320,13 +408,16 @@ static const char
MASSERT(!VSTAILQ_EMPTY(&local_freechunk));
n = 0;
VSTAILQ_FOREACH(chunk, &local_freechunk, freelist) {
MCHECK_OBJ_NOTNULL(chunk, CHUNK_MAGIC);
MASSERT(!OCCUPIED(chunk));
MAZ(chunk->data[0]);
CHUNK_CLEARED(chunk);
n++;
free(chunk->data);
}
MASSERT(n == NRECS * CHUNKS_PER_REC);
MASSERT(n == NCHUNKS);
for (int i = 0; i < NRECS; i++)
REC_CLEARED(&records[i]);
for (int i = 0; i < NCHUNKS; i++)
CHUNK_CLEARED(&chunks[i]);
return NULL;
}
......
This diff is collapsed.
......@@ -247,11 +247,9 @@ event(struct VSL_data *vsl, struct VSL_transaction * const pt[], void *priv)
{
int status = DISPATCH_RETURN_OK;
unsigned nrec = 0, total_chunks = 0;
(void) vsl;
(void) priv;
if (term)
return DISPATCH_TERMINATE;
for (struct VSL_transaction *t = pt[0]; t != NULL; t = *++pt) {
struct tx_t *tx;
......@@ -270,7 +268,7 @@ event(struct VSL_data *vsl, struct VSL_transaction * const pt[], void *priv)
}
CHECK_OBJ_NOTNULL(tx, TX_MAGIC);
assert(!OCCUPIED(tx));
assert(VSTAILQ_EMPTY(&tx->recs));
AN(tx->recs);
tx->type = t->type;
tx->vxid = t->vxid;
tx->pvxid = t->vxid_parent;
......@@ -278,34 +276,51 @@ event(struct VSL_data *vsl, struct VSL_transaction * const pt[], void *priv)
tx->t = VTIM_real();
while ((status = VSL_Next(t->c)) > 0) {
int len, n, nchunk;
rec_t *rec;
enum VSL_tag_e tag;
int idx, hdr_idx, len, n, nchunk;
rec_t *rec, **rp;
chunk_t *chunk;
const char *p;
if (!VSL_Match(vsl, t->c))
if ((idx = tag2idx[VSL_TAG(t->c->rec.ptr)]) == -1)
continue;
CHECK_OBJ_NOTNULL(tx->recs[idx], REC_NODE_MAGIC);
if (tx->recs[idx]->rec != NULL)
continue;
if (tx->recs[idx]->hdrs != NULL) {
hdr_idx = DATA_FindHdrIdx(VSL_TAG(t->c->rec.ptr),
(const char *)
VSL_CDATA(t->c->rec.ptr));
if (hdr_idx == -1)
continue;
if (tx->recs[idx]->hdrs[hdr_idx] != NULL)
continue;
rp = &tx->recs[idx]->hdrs[hdr_idx];
}
else
rp = &tx->recs[idx]->rec;
tag = VSL_TAG(t->c->rec.ptr);
p = (const char *) VSL_CDATA(t->c->rec.ptr);
len = VSL_LEN(t->c->rec.ptr);
if (debug)
LOG_Log(LOG_DEBUG, "Record: [%u %s %.*s]",
VSL_ID(t->c->rec.ptr), VSL_tags[VSL_TAG(t->c->rec.ptr)],
len, VSL_CDATA(t->c->rec.ptr));
VSL_ID(t->c->rec.ptr), VSL_tags[tag], len, p);
rec = take_rec();
if (rec == NULL) {
no_free_rec++;
LOG_Log(LOG_DEBUG, "Freelist exhausted, record DISCARDED: "
"[%u %s %.*s]", VSL_ID(t->c->rec.ptr),
VSL_tags[VSL_TAG(t->c->rec.ptr)], len,
VSL_CDATA(t->c->rec.ptr));
"[%u %s %.*s]", VSL_ID(t->c->rec.ptr), VSL_tags[tag],
len, p);
continue;
}
CHECK_OBJ_NOTNULL(rec, RECORD_MAGIC);
assert(!OCCUPIED(rec));
assert(VSTAILQ_EMPTY(&rec->chunks));
*rp = rec;
rec->tag = VSL_TAG(t->c->rec.ptr);
rec->tag = tag;
n = len;
if (len > len_hi)
len_hi = len;
......@@ -316,7 +331,6 @@ event(struct VSL_data *vsl, struct VSL_transaction * const pt[], void *priv)
rec->len = n;
/* Copy the payload into chunks */
p = (const char *) VSL_CDATA(t->c->rec.ptr);
nchunk = (n + config.chunk_size - 1) / config.chunk_size;
for (int i = 0; i < nchunk; i++) {
assert(n > 0);
......@@ -326,8 +340,7 @@ event(struct VSL_data *vsl, struct VSL_transaction * const pt[], void *priv)
LOG_Log(LOG_DEBUG,
"Freelist exhausted, payload TRUNCATED: "
"[%u %s %.*s]", VSL_ID(t->c->rec.ptr),
VSL_tags[VSL_TAG(t->c->rec.ptr)], len,
VSL_CDATA(t->c->rec.ptr));
VSL_tags[tag], len, p);
continue;
}
CHECK_OBJ(chunk, CHUNK_MAGIC);
......@@ -343,7 +356,6 @@ event(struct VSL_data *vsl, struct VSL_transaction * const pt[], void *priv)
total_chunks++;
}
rec->occupied = 1;
VSTAILQ_INSERT_TAIL(&tx->recs, rec, reclist);
nrec++;
}
......@@ -364,6 +376,8 @@ event(struct VSL_data *vsl, struct VSL_transaction * const pt[], void *priv)
submit(tx);
}
if (term)
return DISPATCH_TERMINATE;
if (flush)
return DISPATCH_FLUSH;
if (!reopen)
......@@ -461,6 +475,7 @@ main(int argc, char *argv[])
struct VSM_data *vsm = NULL;
struct VSL_cursor *cursor;
enum VSL_grouping_e grouping = VSL_g_vxid;
struct vsb *hdrs = VSB_new_auto();
vsl = VSL_New();
......@@ -722,25 +737,31 @@ main(int argc, char *argv[])
LOG_Log(LOG_INFO, "Reading varnish instance %s", VSM_Name(vsm));
}
char **include_args = FMT_Get_I_Args();
if (include_args != NULL) {
assert(VSL_Arg(vsl, 'C', NULL) > 0);
for (int i = 0; include_args[i] != NULL; i++) {
LOG_Log(LOG_INFO, "Include filter: %s", include_args[i]);
assert(VSL_Arg(vsl, 'I', include_args[i]) > 0);
}
}
bprintf(scratch, "%s", FMT_Get_i_Arg());
if (!EMPTY(scratch)) {
LOG_Log(LOG_INFO, "Include tags: %s", scratch);
assert(VSL_Arg(vsl, 'i', scratch) > 0);
}
if (!VSB_EMPTY(config.cformat) && VSB_EMPTY(config.bformat))
assert(VSL_Arg(vsl, 'c', scratch) > 0);
else if (!VSB_EMPTY(config.bformat) && VSB_EMPTY(config.cformat))
assert(VSL_Arg(vsl, 'b', scratch) > 0);
for (int i = 0; i < MAX_VSL_TAG; i++) {
int idx = tag2idx[i];
if (idx == -1)
continue;
if (hdr_include_tbl[i] == NULL)
LOG_Log(LOG_INFO, "Reading tag %s", VSL_tags[i]);
else {
include_t *inc = hdr_include_tbl[i];
CHECK_OBJ_NOTNULL(inc, INCLUDE_MAGIC);
VSB_clear(hdrs);
for (int j = 0; j < inc->n; j++) {
VSB_cat(hdrs, inc->hdr[j]);
VSB_cat(hdrs, ",");
}
VSB_finish(hdrs);
LOG_Log(LOG_INFO, "Reading tags %s with headers: %s", VSL_tags[i],
VSB_data(hdrs));
}
}
if ((errnum = DATA_Init()) != 0) {
LOG_Log(LOG_CRIT, "Cannot init data tables: %s\n",
strerror(errnum));
......
......@@ -69,7 +69,10 @@
"%h %l %u %t \"%r\" %s %b \"%{Referer}i\" \"%{User-agent}i\""
struct sigaction default_action;
int tag2idx[MAX_VSL_TAG];
int max_idx;
typedef struct chunk_t {
unsigned magic;
#define CHUNK_MAGIC 0x676e0d19
......@@ -90,7 +93,6 @@ typedef struct rec_t {
unsigned len;
chunkhead_t chunks;
VSTAILQ_ENTRY(rec_t) freelist;
VSTAILQ_ENTRY(rec_t) reclist;
enum VSL_tag_e tag;
unsigned int occupied:1;
} rec_t;
......@@ -100,15 +102,24 @@ unsigned nrecords;
typedef VSTAILQ_HEAD(rechead_s, rec_t) rechead_t;
typedef struct rec_node_t {
unsigned magic;
#define REC_NODE_MAGIC 0x92d4933d
rec_t *rec;
rec_t **hdrs;
} rec_node_t;
rec_node_t *rec_nodes;
typedef struct tx_t {
unsigned magic;
#define TX_MAGIC 0xff463e42
int32_t vxid;
int32_t pvxid;
rechead_t recs;
VSTAILQ_ENTRY(tx_t) freelist;
VSTAILQ_ENTRY(tx_t) spscq;
rec_node_t **recs;
double t;
int32_t vxid;
int32_t pvxid;
enum VSL_transaction_e type:7;
unsigned int occupied:1;
} tx_t;
......@@ -122,6 +133,15 @@ typedef VSTAILQ_HEAD(txhead_s, tx_t) txhead_t;
unsigned tx_occ, rec_occ, chunk_occ, tx_occ_hi, rec_occ_hi, chunk_occ_hi,
global_nfree_tx, global_nfree_rec, global_nfree_chunk;
typedef struct include_s {
unsigned magic;
#define INCLUDE_MAGIC 0x4dd6fe3b
char **hdr;
int n;
} include_t;
include_t *hdr_include_tbl[MAX_VSL_TAG];
/* Writer (consumer) waits for this condition when the SPSC queue is empty.
Reader (producer) signals the condition after enqueue. */
pthread_cond_t spscq_ready_cond;
......@@ -205,6 +225,7 @@ unsigned DATA_Take_Freechunk(struct chunkhead_s *dst);
void DATA_Return_Freetx(struct txhead_s *returned, unsigned nreturned);
void DATA_Return_Freerec(struct rechead_s *returned, unsigned nreturned);
void DATA_Return_Freechunk(struct chunkhead_s *returned, unsigned nreturned);
int DATA_FindHdrIdx(enum VSL_tag_e, const char *hdr);
void DATA_Dump(void);
/* writer.c */
......@@ -239,8 +260,7 @@ void MON_Output(void);
/* format.c */
int FMT_Init(char *err);
char **FMT_Get_I_Args(void);
char *FMT_Get_i_Arg(void);
int FMT_GetMaxIdx(void);
int FMT_Estimate_RecsPerTx(void);
void FMT_Format(tx_t *tx, struct vsb *os);
void FMT_Fini(void);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment