Commit a7d362ab authored by Geoff Simmons's avatar Geoff Simmons

buffers are now divided into chunks, to waste less space

parent aa1edbb2
......@@ -90,34 +90,43 @@ const char *version = PACKAGE_TARNAME "-" PACKAGE_VERSION " revision " \
VCS_Version " branch " VCS_Branch;
static unsigned len_hi = 0, debug = 0, data_exhausted = 0;
// chunk_exhausted = 0;
static unsigned long seen = 0, submitted = 0, len_overflows = 0, no_data = 0,
no_free_data = 0, vcl_log_err = 0, vsl_errs = 0, closed = 0, overrun = 0,
ioerr = 0, reacquire = 0, truncated = 0, key_hi = 0, key_overflows = 0;
// no_free_chunk = 0;
ioerr = 0, reacquire = 0, truncated = 0, key_hi = 0, key_overflows = 0,
no_free_chunk = 0;
static volatile sig_atomic_t flush = 0, term = 0;
static struct sigaction terminate_action, dump_action, flush_action;
/* Local freelist */
static struct freehead_s reader_freelist =
VSTAILQ_HEAD_INITIALIZER(reader_freelist);
static unsigned rdr_data_free = 0;
/* Local freelists */
static struct rechead_s reader_freerec =
VSTAILQ_HEAD_INITIALIZER(reader_freerec);
static chunkhead_t reader_freechunk =
VSTAILQ_HEAD_INITIALIZER(reader_freechunk);
static unsigned rdr_rec_free = 0, rdr_chunk_free = 0;
/*--------------------------------------------------------------------*/
void
RDR_Stats(void)
{
LOG_Log(LOG_INFO, "Reader: seen=%lu submitted=%lu nodata=%lu free=%u "
"no_free_rec=%lu len_hi=%u key_hi=%lu len_overflows=%lu "
"truncated=%lu key_overflows=%lu vcl_log_err=%lu vsl_err=%lu "
"closed=%lu overrun=%lu ioerr=%lu reacquire=%lu",
seen, submitted, no_data, rdr_data_free, no_free_data, len_hi,
key_hi, len_overflows, truncated, key_overflows, vcl_log_err,
vsl_errs, closed, overrun, ioerr, reacquire);
LOG_Log(LOG_INFO, "Reader: seen=%lu submitted=%lu nodata=%lu free_rec=%u "
"free_chunk=%u no_free_rec=%lu no_free_chunk=%lu len_hi=%u "
"key_hi=%lu len_overflows=%lu truncated=%lu key_overflows=%lu "
"vcl_log_err=%lu vsl_err=%lu closed=%lu overrun=%lu ioerr=%lu "
"reacquire=%lu",
seen, submitted, no_data, rdr_rec_free, rdr_chunk_free,
no_free_data, no_free_chunk, len_hi, key_hi, len_overflows,
truncated, key_overflows, vcl_log_err, vsl_errs, closed, overrun,
ioerr, reacquire);
}
int
RDR_Exhausted(void)
{
return data_exhausted;
}
/*--------------------------------------------------------------------*/
......@@ -175,24 +184,47 @@ static inline dataentry
{
dataentry *data;
while (VSTAILQ_EMPTY(&reader_freelist)) {
while (VSTAILQ_EMPTY(&reader_freerec)) {
spmcq_signal();
rdr_data_free = DATA_Take_Freelist(&reader_freelist);
if (VSTAILQ_EMPTY(&reader_freelist)) {
rdr_rec_free = DATA_Take_Freerec(&reader_freerec);
if (VSTAILQ_EMPTY(&reader_freerec)) {
data_exhausted = 1;
return NULL;
}
if (debug)
LOG_Log(LOG_DEBUG, "Reader: took %u free data entries",
rdr_data_free);
rdr_rec_free);
}
data_exhausted = 0;
data = VSTAILQ_FIRST(&reader_freelist);
VSTAILQ_REMOVE_HEAD(&reader_freelist, freelist);
rdr_data_free--;
data = VSTAILQ_FIRST(&reader_freerec);
VSTAILQ_REMOVE_HEAD(&reader_freerec, freelist);
rdr_rec_free--;
return (data);
}
static inline chunk_t
*take_chunk(void)
{
chunk_t *chunk;
while (VSTAILQ_EMPTY(&reader_freechunk)) {
spmcq_signal();
rdr_chunk_free = DATA_Take_Freechunk(&reader_freechunk);
if (VSTAILQ_EMPTY(&reader_freechunk)) {
data_exhausted = 1;
return NULL;
}
if (debug)
LOG_Log(LOG_DEBUG, "Reader: took %u free chunks",
rdr_chunk_free);
}
data_exhausted = 0;
chunk = VSTAILQ_FIRST(&reader_freechunk);
VSTAILQ_REMOVE_HEAD(&reader_freechunk, freelist);
rdr_chunk_free--;
return (chunk);
}
/* return to our own local cache */
static inline void
......@@ -200,7 +232,8 @@ data_free(dataentry *de)
{
AN(de);
assert(!OCCUPIED(de));
VSTAILQ_INSERT_HEAD(&reader_freelist, de, freelist);
rdr_chunk_free += DATA_Reset(de, &reader_freechunk);
VSTAILQ_INSERT_HEAD(&reader_freerec, de, freelist);
}
static inline void
......@@ -210,9 +243,27 @@ data_submit(dataentry *de)
CHECK_OBJ_NOTNULL(de, DATA_MAGIC);
assert(OCCUPIED(de));
AZ(memchr(de->data, '\0', de->end));
if (debug)
LOG_Log(LOG_DEBUG, "submit: data=[%.*s]", de->end, de->data);
if (debug) {
chunk_t *chunk;
char *p, *data = (char *) malloc(de->end);
int n = de->end;
p = data;
chunk = VSTAILQ_FIRST(&de->chunks);
while (n > 0) {
CHECK_OBJ_NOTNULL(chunk, CHUNK_MAGIC);
assert(OCCUPIED(chunk));
int cp = n;
if (cp > config.chunk_size)
cp = config.chunk_size;
memcpy(p, chunk->data, cp);
chunk = VSTAILQ_NEXT(chunk, chunklist);
n -= cp;
p += cp;
}
assert(p == data + de->end);
LOG_Log(LOG_DEBUG, "submit: data=[%.*s]", de->end, data);
free(data);
}
SPMCQ_Enq(de);
submitted++;
......@@ -231,7 +282,8 @@ data_submit(dataentry *de)
static inline void
take_free(void)
{
rdr_data_free += DATA_Take_Freelist(&reader_freelist);
rdr_rec_free += DATA_Take_Freerec(&reader_freerec);
rdr_chunk_free += DATA_Take_Freechunk(&reader_freechunk);
}
/*--------------------------------------------------------------------*/
......@@ -250,11 +302,35 @@ all_wrk_abandoned(void)
/*--------------------------------------------------------------------*/
static void
static chunk_t *
get_chunk(dataentry *entry)
{
chunk_t *chunk;
CHECK_OBJ_NOTNULL(entry, DATA_MAGIC);
chunk = take_chunk();
if (chunk == NULL) {
no_free_chunk++;
return NULL;
}
CHECK_OBJ(chunk, CHUNK_MAGIC);
assert(!OCCUPIED(chunk));
entry->curchunk = chunk;
entry->curchunkidx = 0;
VSTAILQ_INSERT_TAIL(&entry->chunks, chunk, chunklist);
chunk->occupied = 1;
return chunk;
}
static unsigned
append(dataentry *entry, enum VSL_tag_e tag, unsigned xid, char *data,
int datalen)
{
char *null;
chunk_t *chunk;
char *null, *p;
unsigned chunks_added = 0;
int n;
CHECK_OBJ_NOTNULL(entry, DATA_MAGIC);
/* Data overflow */
......@@ -263,7 +339,7 @@ append(dataentry *entry, enum VSL_tag_e tag, unsigned xid, char *data,
"DISCARDING data=[%.*s]", VSL_tags[tag], xid, entry->end,
datalen, data);
len_overflows++;
return;
return -1;
}
/* Null chars in the payload means that the data was truncated in the
log, due to exceeding shm_reclen. */
......@@ -273,14 +349,41 @@ append(dataentry *entry, enum VSL_tag_e tag, unsigned xid, char *data,
VSL_tags[tag], xid, datalen, data);
truncated++;
}
entry->data[entry->end] = '&';
assert(entry->curchunkidx <= config.chunk_size);
if (entry->curchunkidx == config.chunk_size) {
chunk = get_chunk(entry);
if (chunk == NULL)
return -1;
chunks_added++;
}
entry->curchunk->data[entry->curchunkidx] = '&';
entry->curchunkidx++;
entry->end++;
memcpy(&entry->data[entry->end], data, datalen);
p = data;
n = datalen;
while (n > 0) {
assert(entry->curchunkidx <= config.chunk_size);
if (entry->curchunkidx == config.chunk_size) {
chunk = get_chunk(entry);
if (chunk == NULL)
return -1;
chunks_added++;
}
int cp = n;
if (cp + entry->curchunkidx > config.chunk_size)
cp = config.chunk_size - entry->curchunkidx;
memcpy(&entry->curchunk->data[entry->curchunkidx], p, cp);
entry->curchunkidx += cp;
p += cp;
n -= cp;
}
assert(p == data + datalen);
entry->end += datalen;
if (entry->end > len_hi)
len_hi = entry->end;
return;
return chunks_added;
}
static inline void
......@@ -306,11 +409,12 @@ addkey(dataentry *entry, enum VSL_tag_e tag, unsigned xid, char *key,
static int
dispatch(struct VSL_data *vsl, struct VSL_transaction * const pt[], void *priv)
{
int status = DISPATCH_RETURN_OK, hasdata = 0;
int status = DISPATCH_RETURN_OK, hasdata = 0, chunks = 0;
dataentry *de = NULL;
char reqend_str[REQEND_T_LEN];
int32_t vxid;
struct timeval latest_t = { 0 };
unsigned chunks_added = 0;
(void) priv;
if (all_wrk_abandoned())
......@@ -342,11 +446,27 @@ dispatch(struct VSL_data *vsl, struct VSL_transaction * const pt[], void *priv)
assert(VSL_CLIENT(t->c->rec.ptr));
if (de->end == 0) {
chunk_t *chunk;
chunk = get_chunk(de);
if (chunk == NULL) {
if (debug)
LOG_Log(LOG_DEBUG, "Free chunks exhausted, "
"DATA DISCARDED: [Tx %d]", t->vxid);
data_free(de);
return status;
}
vxid = t->vxid;
snprintf(de->data, config.max_reclen, "XID=%u", t->vxid);
de->end = strlen(de->data);
de->curchunk = chunk;
/* XXX: minimum chunk size */
snprintf(de->curchunk->data, config.chunk_size, "XID=%u",
t->vxid);
de->curchunkidx = strlen(de->curchunk->data);
de->end = de->curchunkidx;
de->occupied = 1;
if (de->end > len_hi)
len_hi = de->end;
chunks_added++;
}
len = VSL_LEN(t->c->rec.ptr) - 1;
......@@ -382,7 +502,15 @@ dispatch(struct VSL_data *vsl, struct VSL_transaction * const pt[], void *priv)
datalen, data);
if (data_type == VCL_LOG_DATA) {
append(de, tag, xid, data, datalen);
chunks = append(de, tag, xid, data, datalen);
if (chunks < 0) {
if (debug)
LOG_Log(LOG_DEBUG, "Chunks exhausted, DATA "
"DISCARDED: %.*s", datalen, data);
data_free(de);
return status;
}
chunks_added += chunks;
hasdata = 1;
}
else
......@@ -422,9 +550,16 @@ dispatch(struct VSL_data *vsl, struct VSL_transaction * const pt[], void *priv)
snprintf(reqend_str, REQEND_T_LEN, "%s=%u.%06lu", REQEND_T_VAR,
(unsigned) latest_t.tv_sec, latest_t.tv_usec);
append(de, SLT_Timestamp, vxid, reqend_str, REQEND_T_LEN - 1);
chunks = append(de, SLT_Timestamp, vxid, reqend_str, REQEND_T_LEN - 1);
if (chunks < 0) {
if (debug)
LOG_Log(LOG_DEBUG, "Chunks exhausted, DATA DISCARDED: Tx %u", vxid);
data_free(de);
return status;
}
chunks_added += chunks;
de->occupied = 1;
MON_StatsUpdate(STATS_OCCUPANCY, 0);
MON_StatsUpdate(STATS_OCCUPANCY, chunks_added, 0);
data_submit(de);
if (term)
......@@ -751,26 +886,91 @@ int tests_run = 0;
static char
*test_append(void)
{
dataentry *entry;
dataentry entry;
chunk_t chunk, *c;
char data[DEF_MAX_RECLEN - 1], result[DEF_MAX_RECLEN];
printf("... testing data append\n");
config.max_reclen = DEF_MAX_RECLEN;
config.chunk_size = DEF_CHUNK_SIZE;
config.max_records = DEF_MAX_RECORDS;
MAZ(DATA_Init());
entry.magic = DATA_MAGIC;
VSTAILQ_INIT(&entry.chunks);
chunk.magic = CHUNK_MAGIC;
chunk.data = (char *) calloc(1, config.max_reclen);
VSTAILQ_INSERT_TAIL(&entry.chunks, &chunk, chunklist);
entry.curchunk = &chunk;
entry.curchunkidx = 0;
entry.end = 0;
entry.occupied = 1;
chunk.occupied = 1;
truncated = len_overflows = len_hi = 0;
strcpy(config.log_file, "-");
AZ(LOG_Open("test_append"));
for (int i = 0; i < DEF_MAX_RECLEN - 1; i++)
data[i] = (i % 10) + '0';
append(&entry, SLT_VCL_Log, 12345678, data, DEF_MAX_RECLEN - 1);
MASSERT(entry.end == DEF_MAX_RECLEN);
MASSERT(len_hi == DEF_MAX_RECLEN);
MAZ(truncated);
MAZ(len_overflows);
int idx = 0;
int n = entry.end;
c = VSTAILQ_FIRST(&entry.chunks);
while (n > 0) {
CHECK_OBJ_NOTNULL(c, CHUNK_MAGIC);
int cp = n;
if (cp > config.chunk_size)
cp = config.chunk_size;
memcpy(&result[idx], c->data, cp);
n -= cp;
idx += cp;
c = VSTAILQ_NEXT(c, chunklist);
}
MASSERT(result[0] == '&');
MASSERT(memcmp(&result[1], data, DEF_MAX_RECLEN - 1) == 0);
return NULL;
}
static char
*test_truncated(void)
{
dataentry entry;
chunk_t chunk;
char data_with_null[8];
printf("... testing data append (expect an ERR)\n");
printf("... testing data append with truncated data (expect an ERR)\n");
config.max_reclen = DEF_MAX_RECLEN;
entry = calloc(1, sizeof(dataentry));
AN(entry);
entry->data = calloc(1, config.max_reclen);
AN(entry->data);
entry->magic = DATA_MAGIC;
config.chunk_size = DEF_CHUNK_SIZE;
entry.magic = DATA_MAGIC;
VSTAILQ_INIT(&entry.chunks);
chunk.magic = CHUNK_MAGIC;
chunk.data = (char *) calloc(1, config.max_reclen);
VSTAILQ_INSERT_TAIL(&entry.chunks, &chunk, chunklist);
entry.curchunk = &chunk;
entry.curchunkidx = 0;
entry.end = 0;
entry.occupied = 1;
chunk.occupied = 1;
truncated = len_hi = 0;
strcpy(config.log_file, "-");
AZ(LOG_Open("test_append"));
memcpy(data_with_null, "foo\0bar", 8);
append(entry, SLT_VCL_Log, 12345678, data_with_null, 7);
append(&entry, SLT_VCL_Log, 12345678, data_with_null, 7);
MASSERT(memcmp(entry->data, "&foo\0\0", 6) == 0);
MASSERT(entry->end == 4);
MASSERT(memcmp(chunk.data, "&foo\0\0", 6) == 0);
MASSERT(entry.end == 4);
MASSERT(entry.curchunkidx == 4);
MASSERT(truncated == 1);
MASSERT(len_hi == 4);
......@@ -781,6 +981,7 @@ static const char
*all_tests(void)
{
mu_run_test(test_append);
mu_run_test(test_truncated);
return NULL;
}
......
......@@ -125,6 +125,7 @@ CONF_Add(const char *lval, const char *rval)
confUnsigned("max.reclen", max_reclen);
confUnsigned("maxkeylen", maxkeylen);
confUnsigned("chunk.size", chunk_size);
confUnsigned("qlen.goal", qlen_goal);
confUnsigned("nworkers", nworkers);
confUnsigned("restarts", restarts);
......@@ -215,6 +216,7 @@ CONF_Init(void)
config.monitor_workers = false;
config.max_records = DEF_MAX_RECORDS;
config.max_reclen = DEF_MAX_RECLEN;
config.chunk_size = DEF_CHUNK_SIZE;
config.maxkeylen = DEF_MAXKEYLEN;
config.qlen_goal = DEF_QLEN_GOAL;
config.idle_pause = DEF_IDLE_PAUSE;
......@@ -267,6 +269,7 @@ CONF_Dump(int level)
config.monitor_workers ? "true" : "false");
confdump(level, "max.records = %u", config.max_records);
confdump(level, "max.reclen = %u", config.max_reclen);
confdump(level, "chunk.size = %u", config.chunk_size);
confdump(level, "maxkeylen = %u", config.maxkeylen);
confdump(level, "qlen.goal = %u", config.qlen_goal);
......
......@@ -40,6 +40,7 @@
#include "vas.h"
#include "miniobj.h"
#include "vsb.h"
/* Preprend head2 before head1, result in head1, head2 empty afterward */
#define VSTAILQ_PREPEND(head1, head2) do { \
......@@ -53,108 +54,181 @@
VSTAILQ_INIT((head2)); \
} while (0)
static pthread_mutex_t freelist_lock;
static char *buf;
static pthread_mutex_t freerec_lock, freechunk_lock;
static char *buf, *keybuf;
static void
data_Cleanup(void)
{
free(chunktbl);
free(entrytbl);
free(keybuf);
free(buf);
AZ(pthread_mutex_destroy(&freelist_lock));
AZ(pthread_mutex_destroy(&freerec_lock));
AZ(pthread_mutex_destroy(&freechunk_lock));
}
int
DATA_Init(void)
{
unsigned bufsize = config.max_reclen + config.maxkeylen;
/*
* we want enough space to accomodate all open and done records
*
*/
unsigned chunks_per_rec
= (config.max_reclen + config.chunk_size - 1) / config.chunk_size;
unsigned nchunks = chunks_per_rec * config.max_records;
entrytbl = (dataentry *) calloc(config.max_records, sizeof(dataentry));
if (entrytbl == NULL)
return(errno);
buf = (char *) calloc(config.max_records, bufsize);
chunktbl = (chunk_t *) calloc(nchunks, sizeof(chunk_t));
if (chunktbl == NULL) {
free(entrytbl);
return(errno);
}
buf = (char *) calloc(nchunks, config.chunk_size);
if (buf == NULL) {
free(entrytbl);
free(chunktbl);
return(errno);
}
VSTAILQ_INIT(&freehead);
AZ(pthread_mutex_init(&freelist_lock, NULL));
keybuf = (char *) calloc(config.max_records, config.maxkeylen);
if (keybuf == NULL) {
free(entrytbl);
free(chunktbl);
free(buf);
return(errno);
}
VSTAILQ_INIT(&freechunkhead);
VSTAILQ_INIT(&freerechead);
AZ(pthread_mutex_init(&freerec_lock, NULL));
AZ(pthread_mutex_init(&freechunk_lock, NULL));
global_nfree_rec = config.max_records;
global_nfree_chunk = nchunks;
global_nfree = 0;
for (int i = 0; i < nchunks; i++) {
chunktbl[i].magic = CHUNK_MAGIC;
chunktbl[i].data = &buf[i * config.chunk_size];
VSTAILQ_INSERT_TAIL(&freechunkhead, &chunktbl[i], freelist);
}
for (unsigned i = 0; i < config.max_records; i++) {
entrytbl[i].magic = DATA_MAGIC;
entrytbl[i].data = &buf[i * bufsize];
entrytbl[i].key = &buf[(i * bufsize) + config.max_reclen];
VSTAILQ_INSERT_TAIL(&freehead, &entrytbl[i], freelist);
global_nfree++;
entrytbl[i].key = &keybuf[(i * config.maxkeylen)];
VSTAILQ_INIT(&entrytbl[i].chunks);
VSTAILQ_INSERT_TAIL(&freerechead, &entrytbl[i], freelist);
}
assert(global_nfree == config.max_records);
assert(VSTAILQ_FIRST(&freehead));
atexit(data_Cleanup);
return(0);
}
void
DATA_Reset(dataentry *entry)
unsigned
DATA_Reset(dataentry *entry, chunkhead_t * const freechunk)
{
chunk_t *chunk;
unsigned nchunk = 0;
CHECK_OBJ_NOTNULL(entry, DATA_MAGIC);
entry->occupied = 0;
entry->end = 0;
*entry->data = '\0';
entry->keylen = 0;
*entry->key = '\0';
entry->curchunk = NULL;
entry->curchunkidx = 0;
while ((chunk = VSTAILQ_FIRST(&entry->chunks)) != NULL) {
CHECK_OBJ(chunk, CHUNK_MAGIC);
chunk->occupied = 0;
*chunk->data = '\0';
VSTAILQ_REMOVE_HEAD(&entry->chunks, chunklist);
VSTAILQ_INSERT_HEAD(freechunk, chunk, freelist);
nchunk++;
}
assert(VSTAILQ_EMPTY(&entry->chunks));
return nchunk;
}
/*
* take all free entries from the datatable for lockless
* allocation
* prepend a global freelist to the reader's freelist for access with rare
* locking
*/
unsigned
DATA_Take_Freelist(struct freehead_s *dst)
{
unsigned nfree;
AZ(pthread_mutex_lock(&freelist_lock));
nfree = global_nfree;
global_nfree = 0;
VSTAILQ_PREPEND(dst, &freehead);
AZ(pthread_mutex_unlock(&freelist_lock));
return nfree;
#define DATA_Take_Free(type) \
unsigned \
DATA_Take_Free##type(struct type##head_s *dst) \
{ \
unsigned nfree; \
\
AZ(pthread_mutex_lock(&free##type##_lock)); \
VSTAILQ_PREPEND(dst, &free##type##head); \
nfree = global_nfree_##type; \
global_nfree_##type = 0; \
AZ(pthread_mutex_unlock(&free##type##_lock)); \
return nfree; \
}
DATA_Take_Free(rec)
DATA_Take_Free(chunk)
/*
* return to freehead
*
* return to global freelist
* returned must be locked by caller, if required
*/
void
DATA_Return_Freelist(struct freehead_s *returned, unsigned nreturned)
{
AZ(pthread_mutex_lock(&freelist_lock));
VSTAILQ_PREPEND(&freehead, returned);
global_nfree += nreturned;
AZ(pthread_mutex_unlock(&freelist_lock));
#define DATA_Return_Free(type) \
void \
DATA_Return_Free##type(struct type##head_s *returned, unsigned nreturned) \
{ \
AZ(pthread_mutex_lock(&free##type##_lock)); \
VSTAILQ_PREPEND(&free##type##head, returned); \
global_nfree_##type += nreturned; \
AZ(pthread_mutex_unlock(&free##type##_lock)); \
}
DATA_Return_Free(rec)
DATA_Return_Free(chunk)
void
DATA_Dump(void)
{
struct vsb *data = VSB_new_auto();
for (int i = 0; i < config.max_records; i++) {
dataentry *entry = &entrytbl[i];
if (entry == NULL)
continue;
if (entry->magic != DATA_MAGIC) {
LOG_Log(LOG_ERR, "Invalid data entry at index %d, magic = 0x%08x, "
"expected 0x%08x", i, entry->magic, DATA_MAGIC);
}
if (!OCCUPIED(entry))
continue;
VSB_clear(data);
if (entry->end) {
int n = entry->end;
chunk_t *chunk = VSTAILQ_FIRST(&entry->chunks);
while (n > 0 && chunk != NULL) {
if (chunk->magic != CHUNK_MAGIC) {
LOG_Log(LOG_ERR,
"Invalid chunk at index %d, magic = 0x%08x, "
"expected 0x%08x",
i, chunk->magic, CHUNK_MAGIC);
continue;
}
int cp = n;
if (cp > config.chunk_size)
cp = config.chunk_size;
VSB_bcat(data, chunk->data, cp);
n -= cp;
chunk = VSTAILQ_NEXT(chunk, chunklist);
}
}
VSB_finish(data);
LOG_Log(LOG_INFO,
"Data entry %d: data=[%.*s] key=[%.*s]",
i, entry->end, entry->data, entry->keylen, entry->key);
i, entry->end, VSB_data(data), entry->keylen, entry->key);
}
}
......@@ -30,8 +30,10 @@
*
*/
/* Head of the global free list */
struct freehead_s freehead;
/* Heads of the global free lists */
struct rechead_s freerechead;
chunkhead_t freechunkhead;
/* Table of data entries */
/* Tables of records and chunks */
dataentry *entrytbl;
chunk_t *chunktbl;
......@@ -42,6 +42,7 @@ static int run;
static pthread_mutex_t mutex;
static unsigned occ = 0;
static unsigned occ_chunk = 0;
static unsigned long sent = 0; /* Sent successfully to MQ */
static unsigned long bytes = 0; /* Total bytes successfully sent */
static unsigned long failed = 0; /* MQ send fails */
......@@ -50,6 +51,8 @@ static unsigned long restarts = 0; /* Worker thread restarts */
static unsigned occ_hi = 0; /* Occupancy high water mark */
static unsigned occ_hi_this = 0;/* Occupancy high water mark
this reporting interval */
static unsigned occ_chunk_hi = 0;
static unsigned occ_chunk_hi_this = 0;
static void
log_output(void)
......@@ -61,9 +64,12 @@ log_output(void)
if (wrk_running > wrk_running_hi)
wrk_running_hi = wrk_running;
LOG_Log(LOG_INFO, "Data table: len=%u occ=%u occ_hi=%u occ_hi_this=%u "
"global_free=%u",
config.max_records, occ, occ_hi, occ_hi_this, global_nfree);
LOG_Log(LOG_INFO, "Data table: len=%u occ_rec=%u occ_rec_hi=%u "
"occ_rec_hi_this=%u occ_chunk=%u occ_chunk_hi=%u "
"occ_chunk_hi_this=%u global_free_rec=%u global_free_chunk=%u",
config.max_records, occ, occ_hi, occ_hi_this, occ_chunk,
occ_chunk_hi, occ_chunk_hi_this, global_nfree_rec,
global_nfree_chunk);
/* Eliminate the dependency of trackrdrd.o for unit tests */
#ifndef TEST_DRIVER
......@@ -82,6 +88,7 @@ log_output(void)
/* locking would be overkill */
occ_hi_this = 0;
occ_chunk_hi_this = 0;
if (config.monitor_workers)
WRK_Stats();
......@@ -154,20 +161,22 @@ MON_StatsInit(void)
}
void
MON_StatsUpdate(stats_update_t update, unsigned n)
MON_StatsUpdate(stats_update_t update, unsigned nchunks, unsigned nbytes)
{
AZ(pthread_mutex_lock(&mutex));
switch(update) {
case STATS_SENT:
sent++;
bytes += n;
bytes += nbytes;
occ--;
occ_chunk -= nchunks;
break;
case STATS_FAILED:
failed++;
occ--;
occ_chunk -= nchunks;
break;
case STATS_RECONNECT:
......@@ -176,10 +185,15 @@ MON_StatsUpdate(stats_update_t update, unsigned n)
case STATS_OCCUPANCY:
occ++;
occ_chunk += nchunks;
if (occ > occ_hi)
occ_hi = occ;
if (occ > occ_hi_this)
occ_hi_this = occ;
if (occ_chunk > occ_chunk_hi)
occ_chunk_hi = occ_chunk;
if (occ_chunk > occ_chunk_hi_this)
occ_chunk_hi_this = occ_chunk;
break;
case STATS_RESTART:
......
......@@ -27,7 +27,8 @@ test_data_SOURCES = \
test_data_LDADD = \
../data.$(OBJEXT) \
../assert.$(OBJEXT)
../assert.$(OBJEXT) \
@VARNISH_LIBS@ -L${VARNISH_PKG_LIB}/varnish -lvarnish
test_append_SOURCES = \
../child.c \
......@@ -46,7 +47,7 @@ test_append_LDADD = \
../config_common.$(OBJEXT) \
../sandbox.$(OBJEXT) \
../vtim.$(OBJEXT) \
@VARNISH_LIBS@
@VARNISH_LIBS@ -L${VARNISH_PKG_LIB}/varnish -lvarnish
test_append_CFLAGS = -DTEST_DRIVER
......@@ -97,4 +98,5 @@ test_worker_LDADD = \
../spmcq.$(OBJEXT) \
../data.$(OBJEXT) \
../assert.$(OBJEXT) \
../vtim.$(OBJEXT)
../vtim.$(OBJEXT) \
-L${VARNISH_PKG_LIB}/varnish -lvarnish
......@@ -29,14 +29,14 @@ rm -f $LOG $MSG
# "Not running as root" filtered so that the test is independent of
# the user running it
CKSUM=$( grep -v 'Worker 1' $LOG | sed -e 's/\(initializing\) \(.*\)/\1/' | sed -e 's/\(Running as\) \([a-zA-Z0-9]*\)$/\1/' | grep -v 'Not running as root' | cksum)
if [ "$CKSUM" != '3218424934 214113' ]; then
if [ "$CKSUM" != '2709219299 214213' ]; then
echo "ERROR: Regression test incorrect log cksum: $CKSUM"
exit 1
fi
# Now check the logs from the worker thread
CKSUM=$( grep 'Worker 1' $LOG | cksum)
if [ "$CKSUM" != '1219614274 35546' ]; then
if [ "$CKSUM" != '1419634713 52909' ]; then
echo "ERROR: Regression test incorrect output cksum: $CKSUM"
exit 1
fi
......
/*-
* Copyright (c) 2012-2014 UPLEX Nils Goroll Systemoptimierung
* Copyright (c) 2012-2014 Otto Gmbh & Co KG
* Copyright (c) 2012-2015 UPLEX Nils Goroll Systemoptimierung
* Copyright (c) 2012-2015 Otto Gmbh & Co KG
* All rights reserved
* Use only with permission
*
......@@ -38,106 +38,269 @@
int tests_run = 0;
static unsigned nfree = 0;
static struct rechead_s local_freerechead
= VSTAILQ_HEAD_INITIALIZER(local_freerechead);
static chunkhead_t local_freechunk = VSTAILQ_HEAD_INITIALIZER(local_freechunk);
static struct freehead_s local_freehead
= VSTAILQ_HEAD_INITIALIZER(local_freehead);
unsigned nchunks;
/* N.B.: Always run this test first */
static char
*test_data_init(void)
{
int err;
unsigned chunks_per_rec, free_chunk = 0, free_rec = 0;
chunk_t *chunk;
dataentry *entry;
printf("... testing data table initialization\n");
config.max_records = DEF_MAX_RECORDS;
config.max_reclen = DEF_MAX_RECLEN;
config.maxkeylen = DEF_MAXKEYLEN;
config.chunk_size = DEF_CHUNK_SIZE;
err = DATA_Init();
VMASSERT(err == 0, "DATA_Init: %s", strerror(err));
MAN(entrytbl);
MAN(chunktbl);
MASSERT(!VSTAILQ_EMPTY(&freerechead));
MASSERT(!VSTAILQ_EMPTY(&freechunkhead));
chunks_per_rec = (DEF_MAX_RECLEN + DEF_CHUNK_SIZE - 1) / DEF_CHUNK_SIZE;
nchunks = chunks_per_rec * DEF_MAX_RECORDS;
MASSERT(global_nfree_chunk == nchunks);
MASSERT(global_nfree_rec == DEF_MAX_RECORDS);
for (int i = 0; i < nchunks; i++) {
MCHECK_OBJ_NOTNULL(&chunktbl[i], CHUNK_MAGIC);
MASSERT(!OCCUPIED(&chunktbl[i]));
MAN(chunktbl[i].data);
}
for (int i = 0; i < config.max_records; i++) {
MCHECK_OBJ_NOTNULL(&entrytbl[i], DATA_MAGIC);
MASSERT(!OCCUPIED(&entrytbl[i]));
MAN(entrytbl[i].data);
MASSERT(VSTAILQ_EMPTY(&entrytbl[i].chunks));
MAN(entrytbl[i].key);
MAZ(entrytbl[i].end);
MAZ(entrytbl[i].keylen);
MAZ(entrytbl[i].curchunk);
MAZ(entrytbl[i].curchunkidx);
}
VSTAILQ_FOREACH(chunk, &freechunkhead, freelist) {
MCHECK_OBJ_NOTNULL(chunk, CHUNK_MAGIC);
free_chunk++;
}
MASSERT(free_chunk == global_nfree_chunk);
VSTAILQ_FOREACH(entry, &freerechead, freelist) {
MCHECK_OBJ_NOTNULL(entry, DATA_MAGIC);
free_rec++;
}
MASSERT(free_rec == global_nfree_rec);
return NULL;
}
static const char
*test_data_set_get(void)
*test_data_take_rec(void)
{
char data[DEF_MAX_RECLEN], key[DEF_MAXKEYLEN];
printf("... testing data write and read\n");
unsigned nfree, cfree = 0;
dataentry *entry;
for (int i = 0; i < config.max_records; i++) {
memset(entrytbl[i].data, 'd', DEF_MAX_RECLEN);
memset(entrytbl[i].key, 'k', DEF_MAXKEYLEN);
printf("... testing record freelist take\n");
nfree = DATA_Take_Freerec(&local_freerechead);
MASSERT(nfree == config.max_records);
MASSERT(!VSTAILQ_EMPTY(&local_freerechead));
MAZ(global_nfree_rec);
MASSERT(VSTAILQ_EMPTY(&freerechead));
VSTAILQ_FOREACH(entry, &local_freerechead, freelist) {
MCHECK_OBJ_NOTNULL(entry, DATA_MAGIC);
cfree++;
}
MASSERT(nfree == cfree);
memset(data, 'd', DEF_MAX_RECLEN);
memset(key, 'k', DEF_MAXKEYLEN);
return NULL;
}
for (int i = 0; i < config.max_records; i++) {
MASSERT(memcmp(entrytbl[i].data, data, DEF_MAX_RECLEN) == 0);
MASSERT(memcmp(entrytbl[i].key, key, DEF_MAXKEYLEN) == 0);
static const char
*test_data_take_chunk(void)
{
unsigned nfree, cfree = 0;
chunk_t *chunk;
printf("... testing chunk freelist take\n");
nfree = DATA_Take_Freechunk(&local_freechunk);
MASSERT(nfree == nchunks);
MASSERT(!VSTAILQ_EMPTY(&local_freechunk));
MAZ(global_nfree_chunk);
MASSERT(VSTAILQ_EMPTY(&freechunkhead));
VSTAILQ_FOREACH(chunk, &local_freechunk, freelist) {
MCHECK_OBJ_NOTNULL(chunk, CHUNK_MAGIC);
cfree++;
}
MASSERT(nfree == cfree);
return NULL;
}
static const char
*test_data_take(void)
*test_data_return_rec(void)
{
printf("... testing freelist take\n");
unsigned cfree = 0;
dataentry *entry;
nfree = DATA_Take_Freelist(&local_freehead);
printf("... testing record freelist return\n");
MASSERT(nfree == config.max_records);
DATA_Return_Freerec(&local_freerechead, config.max_records);
MASSERT0(!VSTAILQ_EMPTY(&local_freehead),
"Local freelist empty after take");
VMASSERT(global_nfree == 0, "Global free count non-zero after take (%u)",
global_nfree);
MASSERT(VSTAILQ_EMPTY(&local_freerechead));
MASSERT(global_nfree_rec == DEF_MAX_RECORDS);
MASSERT(!VSTAILQ_EMPTY(&freerechead));
VSTAILQ_FOREACH(entry, &freerechead, freelist) {
MCHECK_OBJ_NOTNULL(entry, DATA_MAGIC);
cfree++;
}
MASSERT(global_nfree_rec == cfree);
return NULL;
}
static const char
*test_data_return_chunk(void)
{
unsigned cfree = 0;
chunk_t *chunk;
MASSERT0(VSTAILQ_EMPTY(&freehead),
"Global free list non-empty after take");
printf("... testing chunk freelist return\n");
DATA_Return_Freechunk(&local_freechunk, nchunks);
MASSERT(VSTAILQ_EMPTY(&local_freechunk));
MASSERT(global_nfree_chunk == nchunks);
MASSERT(!VSTAILQ_EMPTY(&freechunkhead));
VSTAILQ_FOREACH(chunk, &freechunkhead, freelist) {
MCHECK_OBJ_NOTNULL(chunk, CHUNK_MAGIC);
cfree++;
}
MASSERT(global_nfree_chunk == cfree);
return NULL;
}
static const char
*test_data_return(void)
*test_data_prepend(void)
{
printf("... testing freelist return\n");
dataentry *entry;
int n = 0;
DATA_Return_Freelist(&local_freehead, nfree);
printf("... testing freelist prepend\n");
MASSERT0(VSTAILQ_EMPTY(&local_freehead),
"Local freelist non-empty after return");
MASSERT(global_nfree == DEF_MAX_RECORDS);
MASSERT(VSTAILQ_EMPTY(&local_freerechead));
/* Return an empty list */
DATA_Return_Freerec(&local_freerechead, 0);
MASSERT(VSTAILQ_EMPTY(&local_freerechead));
MASSERT(global_nfree_rec == config.max_records);
MASSERT0(!VSTAILQ_EMPTY(&freehead),
"Global free list empty after take");
DATA_Take_Freerec(&local_freerechead);
VSTAILQ_INIT(&local_freerechead);
/* insert the first 10 records to the local list */
for (int i = 0; i < 10; i++)
VSTAILQ_INSERT_TAIL(&local_freerechead, &entrytbl[i], freelist);
/* Prepend them to the global free list */
DATA_Return_Freerec(&local_freerechead, 10);
/* insert the next 10 records */
VSTAILQ_INIT(&local_freerechead);
for (int i = 10; i < 20; i++)
VSTAILQ_INSERT_TAIL(&local_freerechead, &entrytbl[i], freelist);
/* Prepend them to the global list */
DATA_Return_Freerec(&local_freerechead, 10);
/*
* Take the global list, and verify that records 10-19 are at the front,
* followed by records 0-9.
*/
DATA_Take_Freerec(&local_freerechead);
VSTAILQ_FOREACH(entry, &local_freerechead, freelist) {
if (n < 10)
MASSERT(entry == &entrytbl[n + 10]);
else
MASSERT(entry == &entrytbl[n - 10]);
n++;
}
MASSERT(n == 20);
return NULL;
}
static const char
*test_data_clear(void)
{
#define CHUNKS_PER_REC 4
dataentry entry;
chunk_t c[CHUNKS_PER_REC], *chunk;
int n = 0;
unsigned nfree_chunks;
printf("... testing data record clear\n");
VSTAILQ_INIT(&local_freechunk);
entry.magic = DATA_MAGIC;
VSTAILQ_INIT(&entry.chunks);
entry.end = 4711;
entry.keylen = 815;
entry.key = (char *) malloc(config.maxkeylen);
for (int i = 0; i < CHUNKS_PER_REC; i++) {
VSTAILQ_INSERT_TAIL(&entry.chunks, &c[i], chunklist);
c[i].magic = CHUNK_MAGIC;
c[i].data = (char *) malloc(config.chunk_size);
}
nfree_chunks = DATA_Reset(&entry, &local_freechunk);
MASSERT(nfree_chunks == CHUNKS_PER_REC);
MCHECK_OBJ(&entry, DATA_MAGIC);
MASSERT(!OCCUPIED(&entry));
MAZ(entry.end);
MAZ(entry.keylen);
MAZ(entry.curchunk);
MAZ(entry.curchunkidx);
MASSERT(EMPTY(entry.key));
MASSERT(VSTAILQ_EMPTY(&entry.chunks));
free(entry.key);
MASSERT(!VSTAILQ_EMPTY(&local_freechunk));
VSTAILQ_FOREACH(chunk, &local_freechunk, freelist) {
MCHECK_OBJ_NOTNULL(chunk, CHUNK_MAGIC);
MASSERT(!OCCUPIED(chunk));
MAZ(chunk->data[0]);
n++;
free(chunk->data);
}
MASSERT(n == CHUNKS_PER_REC);
return NULL;
}
static const char
*all_tests(void)
{
mu_run_test(test_data_init);
mu_run_test(test_data_set_get);
mu_run_test(test_data_take);
mu_run_test(test_data_return);
mu_run_test(test_data_take_rec);
mu_run_test(test_data_take_chunk);
mu_run_test(test_data_return_rec);
mu_run_test(test_data_return_chunk);
mu_run_test(test_data_prepend);
mu_run_test(test_data_clear);
return NULL;
}
......
......@@ -87,7 +87,7 @@ static void
proddata.fail = PRODUCER_BCAST;
pthread_exit(&proddata);
}
proddata.sum += (uintptr_t) entries[i].data;
proddata.sum += (uintptr_t) &entries[i].chunks;
}
debug_print("%s\n", "Producer: exit");
pthread_exit((void *) &proddata);
......@@ -135,13 +135,13 @@ static void
} else {
/* entry != NULL */
debug_print("Consumer %d: dequeue %d\n", id, ++deqs);
pcdata->sum += (uintptr_t) entry->data;
pcdata->sum += (uintptr_t) &entry->chunks;
}
}
debug_print("Consumer %d: drain queue, run = %d\n", id, run);
while ((entry = SPMCQ_Deq()) != NULL) {
debug_print("Consumer %d: dequeue %d\n", id, ++deqs);
pcdata->sum += (uintptr_t) entry->data;
pcdata->sum += (uintptr_t) &entry->chunks;
}
debug_print("Consumer %d: exit\n", id);
pthread_exit((void *) pcdata);
......
......@@ -54,10 +54,19 @@ static void *mqh;
/* Called from worker.c, but we don't want to pull in all of monitor.c's
dependecies. */
void
MON_StatsUpdate(stats_update_t update, unsigned n)
MON_StatsUpdate(stats_update_t update, unsigned nchunks, unsigned nbytes)
{
(void) update;
(void) n;
(void) nchunks;
(void) nbytes;
}
/* Called from worker.c, but we don't want to pull in all of child.c's
dependecies. */
int
RDR_Exhausted(void)
{
return 0;
}
static void
......@@ -104,6 +113,7 @@ static char
config.max_reclen = DEF_MAX_RECLEN;
config.maxkeylen = DEF_MAXKEYLEN;
config.nworkers = NWORKERS;
config.chunk_size = DEF_CHUNK_SIZE;
strcpy(config.mq_config_file, MQ_CONFIG);
error = mqf.global_init(config.nworkers, config.mq_config_file);
......@@ -126,6 +136,7 @@ static const char
*test_worker_run(void)
{
dataentry *entry;
chunk_t *chunk;
printf("... testing run of %d workers\n", NWORKERS);
......@@ -142,8 +153,14 @@ static const char
for (int i = 0; i < config.max_records; i++) {
entry = &entrytbl[i];
MCHECK_OBJ_NOTNULL(entry, DATA_MAGIC);
sprintf(entry->data, "foo=bar&baz=quux&record=%d", i+1);
entry->end = strlen(entry->data);
chunk = &chunktbl[i];
MCHECK_OBJ_NOTNULL(chunk, CHUNK_MAGIC);
chunk->data = (char *) malloc(sizeof("foo=bar&baz=quux&record=9999"));
sprintf(chunk->data, "foo=bar&baz=quux&record=%d", i+1);
chunk->occupied = 1;
VSTAILQ_INSERT_TAIL(&entry->chunks, chunk, chunklist);
entry->end = strlen(chunk->data);
entry->occupied = 1;
SPMCQ_Enq(entry);
}
......@@ -163,9 +180,11 @@ static const char
MCHECK_OBJ_NOTNULL(entry, DATA_MAGIC);
MASSERT(!OCCUPIED(entry));
MAZ(entry->end);
MAZ(*entry->data);
MAZ(entry->keylen);
MAZ(*entry->key);
MAZ(entry->curchunk);
MAZ(entry->curchunkidx);
MASSERT(VSTAILQ_EMPTY(&entry->chunks));
}
return NULL;
......
/*-
* Copyright (c) 2012-2014 UPLEX Nils Goroll Systemoptimierung
* Copyright (c) 2012-2014 Otto Gmbh & Co KG
* Copyright (c) 2012-2015 UPLEX Nils Goroll Systemoptimierung
* Copyright (c) 2012-2015 Otto Gmbh & Co KG
* All rights reserved
* Use only with permission
*
......@@ -112,29 +112,42 @@ void WRK_Shutdown(void);
#define OCCUPIED(e) ((e)->occupied == 1)
unsigned global_nfree;
unsigned global_nfree_rec, global_nfree_chunk;
typedef struct chunk_t {
unsigned magic;
#define CHUNK_MAGIC 0x224a86ed
char *data;
VSTAILQ_ENTRY(chunk_t) freelist;
VSTAILQ_ENTRY(chunk_t) chunklist;
unsigned char occupied;
} chunk_t;
typedef VSTAILQ_HEAD(chunkhead_s, chunk_t) chunkhead_t;
struct dataentry_s {
unsigned magic;
#define DATA_MAGIC 0xb41cb1e1
chunkhead_t chunks;
VSTAILQ_ENTRY(dataentry_s) freelist;
VSTAILQ_ENTRY(dataentry_s) spmcq;
char *data;
char *key;
unsigned end; /* End of string index in data */
chunk_t *curchunk;
unsigned curchunkidx;
unsigned keylen;
char occupied;
unsigned end; /* End of string index in data */
unsigned char occupied;
};
typedef struct dataentry_s dataentry;
VSTAILQ_HEAD(freehead_s, dataentry_s);
VSTAILQ_HEAD(rechead_s, dataentry_s);
int DATA_Init(void);
void DATA_Reset(dataentry *entry);
unsigned DATA_Take_Freelist(struct freehead_s *dst);
void DATA_Return_Freelist(struct freehead_s *returned, unsigned nreturned);
unsigned DATA_Reset(dataentry *entry, chunkhead_t * const freechunk);
unsigned DATA_Take_Freerec(struct rechead_s *dst);
void DATA_Return_Freerec(struct rechead_s *returned, unsigned nreturned);
unsigned DATA_Take_Freechunk(struct chunkhead_s *dst);
void DATA_Return_Freechunk(struct chunkhead_s *returned, unsigned nreturned);
void DATA_Dump(void);
/* spmcq.c */
......@@ -154,6 +167,7 @@ int spmcq_datawaiter;
/* child.c */
void RDR_Stats(void);
void CHILD_Main(int readconfig);
int RDR_Exhausted(void);
/* config.c */
#define EMPTY(s) (s[0] == '\0')
......@@ -206,6 +220,8 @@ struct config {
unsigned restarts;
unsigned restart_pause;
unsigned thread_restarts;
unsigned chunk_size;
#define DEF_CHUNK_SIZE 64
} config;
void CONF_Init(void);
......@@ -259,7 +275,7 @@ void *MON_StatusThread(void *arg);
void MON_Output(void);
void MON_StatusShutdown(pthread_t monitor);
void MON_StatsInit(void);
void MON_StatsUpdate(stats_update_t update, unsigned n);
void MON_StatsUpdate(stats_update_t update, unsigned nchunks, unsigned nbytes);
/* parse.c */
......
......@@ -39,6 +39,7 @@
#include "trackrdrd.h"
#include "vas.h"
#include "miniobj.h"
#include "vsb.h"
#define VERSION_LEN 64
#define CLIENT_ID_LEN 80
......@@ -72,10 +73,13 @@ struct worker_data_s {
unsigned id;
unsigned status; /* exit status */
wrk_state_e state;
struct vsb *sb;
/* per-worker freelist - return space in chunks */
struct freehead_s wrk_freelist;
unsigned wrk_nfree;
/* per-worker freelists */
struct rechead_s freerec;
unsigned nfree_rec;
chunkhead_t freechunk;
unsigned nfree_chunk;
/* stats */
unsigned long deqs;
......@@ -95,11 +99,13 @@ typedef struct {
worker_data_t *wrk_data;
} thread_data_t;
static unsigned run, cleaned = 0;
static unsigned run, cleaned = 0, rec_thresh, chunk_thresh;
static thread_data_t *thread_data;
static pthread_mutex_t running_lock;
static char empty[1] = "";
static void
wrk_log_connection(void *mq_worker, unsigned id)
{
......@@ -120,26 +126,77 @@ wrk_log_connection(void *mq_worker, unsigned id)
clientID);
}
static char *
wrk_get_data(dataentry *entry, worker_data_t *wrk) {
CHECK_OBJ_NOTNULL(entry, DATA_MAGIC);
assert(OCCUPIED(entry));
if (entry->end == 0)
return empty;
chunk_t *chunk = VSTAILQ_FIRST(&entry->chunks);
CHECK_OBJ_NOTNULL(chunk, CHUNK_MAGIC);
assert(OCCUPIED(chunk));
if (entry->end <= config.chunk_size)
return chunk->data;
VSB_clear(wrk->sb);
int n = entry->end;
while (n > 0) {
CHECK_OBJ_NOTNULL(chunk, CHUNK_MAGIC);
int cp = n;
if (cp > config.chunk_size)
cp = config.chunk_size;
VSB_bcat(wrk->sb, chunk->data, cp);
n -= cp;
chunk = VSTAILQ_NEXT(chunk, chunklist);
}
assert(VSB_len(wrk->sb) == entry->end);
VSB_finish(wrk->sb);
return VSB_data(wrk->sb);
}
static inline void
wrk_return_freelist(worker_data_t *wrk)
{
if (wrk->nfree_rec > 0) {
DATA_Return_Freerec(&wrk->freerec, wrk->nfree_rec);
LOG_Log(LOG_DEBUG, "Worker %d: returned %u records to free list",
wrk->id, wrk->nfree_rec);
wrk->nfree_rec = 0;
assert(VSTAILQ_EMPTY(&wrk->freerec));
}
if (wrk->nfree_chunk > 0) {
DATA_Return_Freechunk(&wrk->freechunk, wrk->nfree_chunk);
LOG_Log(LOG_DEBUG, "Worker %d: returned %u chunks to free list",
wrk->id, wrk->nfree_chunk);
wrk->nfree_chunk = 0;
assert(VSTAILQ_EMPTY(&wrk->freechunk));
}
}
static inline void
wrk_send(void **mq_worker, dataentry *entry, worker_data_t *wrk)
{
char *data;
const char *err;
int errnum;
stats_update_t stat = STATS_FAILED;
unsigned bytes = 0;
CHECK_OBJ_NOTNULL(entry, DATA_MAGIC);
assert(OCCUPIED(entry));
AN(mq_worker);
AZ(memchr(entry->data, '\0', entry->end));
errnum = mqf.send(*mq_worker, entry->data, entry->end,
data = wrk_get_data(entry, wrk);
AZ(memchr(data, '\0', entry->end));
errnum = mqf.send(*mq_worker, data, entry->end,
entry->key, entry->keylen, &err);
if (errnum != 0) {
LOG_Log(LOG_WARNING, "Worker %d: Failed to send data: %s",
wrk->id, err);
if (errnum > 0) {
if (errnum > 0)
wrk->recoverables++;
MON_StatsUpdate(STATS_FAILED, 0);
}
else {
/* Non-recoverable error */
LOG_Log(LOG_INFO, "Worker %d: Reconnecting", wrk->id);
......@@ -149,29 +206,25 @@ wrk_send(void **mq_worker, dataentry *entry, worker_data_t *wrk)
LOG_Log(LOG_ALERT, "Worker %d: Reconnect failed (%s)", wrk->id,
err);
LOG_Log(LOG_ERR, "Worker %d: Data DISCARDED [%.*s]", wrk->id,
entry->end, entry->data);
MON_StatsUpdate(STATS_FAILED, 0);
entry->end, data);
}
else {
wrk->reconnects++;
wrk_log_connection(*mq_worker, wrk->id);
MON_StatsUpdate(STATS_RECONNECT, 0);
errnum = mqf.send(*mq_worker, entry->data, entry->end,
MON_StatsUpdate(STATS_RECONNECT, 0, 0);
errnum = mqf.send(*mq_worker, data, entry->end,
entry->key, entry->keylen, &err);
if (errnum != 0) {
LOG_Log(LOG_WARNING, "Worker %d: Failed to send data "
"after reconnect: %s", wrk->id, err);
if (errnum > 0) {
if (errnum > 0)
wrk->recoverables++;
MON_StatsUpdate(STATS_FAILED, 0);
}
else {
/* Fail after reconnect, give up */
wrk->fails++;
wrk->status = EXIT_FAILURE;
LOG_Log(LOG_ERR, "Worker %d: Data DISCARDED [%.*s]",
wrk->id, entry->end, entry->data);
MON_StatsUpdate(STATS_FAILED, 0);
wrk->id, entry->end, data);
}
}
}
......@@ -180,19 +233,20 @@ wrk_send(void **mq_worker, dataentry *entry, worker_data_t *wrk)
if (errnum == 0) {
wrk->sends++;
wrk->bytes += entry->end;
MON_StatsUpdate(STATS_SENT, entry->end);
stat = STATS_SENT;
bytes = entry->end;
LOG_Log(LOG_DEBUG, "Worker %d: Successfully sent data [%.*s]", wrk->id,
entry->end, entry->data);
}
DATA_Reset(entry);
VSTAILQ_INSERT_TAIL(&wrk->wrk_freelist, entry, freelist);
wrk->wrk_nfree++;
if (global_nfree == 0) {
DATA_Return_Freelist(&wrk->wrk_freelist, wrk->wrk_nfree);
wrk->wrk_nfree = 0;
assert(VSTAILQ_EMPTY(&wrk->wrk_freelist));
entry->end, data);
}
unsigned chunks = DATA_Reset(entry, &wrk->freechunk);
MON_StatsUpdate(stat, chunks, bytes);
VSTAILQ_INSERT_HEAD(&wrk->freerec, entry, freelist);
wrk->nfree_rec++;
wrk->nfree_chunk += chunks;
if (RDR_Exhausted() || wrk->nfree_rec > rec_thresh
|| wrk->nfree_chunk > chunk_thresh)
wrk_return_freelist(wrk);
}
static void
......@@ -220,8 +274,10 @@ static void
}
wrk_log_connection(mq_worker, wrk->id);
VSTAILQ_INIT(&wrk->wrk_freelist);
wrk->wrk_nfree = 0;
VSTAILQ_INIT(&wrk->freerec);
wrk->nfree_rec = 0;
VSTAILQ_INIT(&wrk->freechunk);
wrk->nfree_chunk = 0;
wrk->state = WRK_RUNNING;
AZ(pthread_mutex_lock(&running_lock));
......@@ -240,10 +296,7 @@ static void
}
/* return space before sleeping */
if (wrk->wrk_nfree > 0) {
DATA_Return_Freelist(&wrk->wrk_freelist, wrk->wrk_nfree);
wrk->wrk_nfree = 0;
}
wrk_return_freelist(wrk);
/*
* Queue is empty, wait until data are available, or quit is
......@@ -331,6 +384,9 @@ WRK_Init(void)
worker_data_t *wrk = thread_data[i].wrk_data;
wrk->magic = WORKER_DATA_MAGIC;
wrk->sb = (struct vsb *) malloc(sizeof(struct vsb));
AN(wrk->sb);
AN(VSB_new(wrk->sb, NULL, config.max_reclen + 1, VSB_FIXEDLEN));
wrk->id = i + 1;
wrk->deqs = wrk->waits = wrk->sends = wrk->fails = wrk->reconnects
= wrk->restarts = wrk->recoverables = wrk->bytes = 0;
......@@ -341,6 +397,10 @@ WRK_Init(void)
AZ(pthread_mutex_init(&spmcq_datawaiter_lock, NULL));
AZ(pthread_cond_init(&spmcq_datawaiter_cond, NULL));
rec_thresh = (config.max_records >> 1) / config.nworkers;
chunk_thresh = rec_thresh *
((config.max_reclen + config.chunk_size - 1) / config.chunk_size);
atexit(wrk_cleanup);
return 0;
}
......@@ -378,7 +438,7 @@ WRK_Restart(void)
wrk->deqs = wrk->waits = wrk->sends = wrk->fails = wrk->reconnects
= 0;
wrk->restarts++;
MON_StatsUpdate(STATS_RESTART, 0);
MON_StatsUpdate(STATS_RESTART, 0, 0);
wrk->state = WRK_NOTSTARTED;
if (pthread_create(&thread_data[i].worker, NULL, wrk_main, wrk)
!= 0) {
......@@ -405,10 +465,11 @@ WRK_Stats(void)
wrk = thread_data[i].wrk_data;
LOG_Log(LOG_INFO,
"Worker %d (%s): seen=%lu waits=%lu sent=%lu bytes=%lu "
"reconnects=%lu restarts=%lu failed_recoverable=%lu failed=%lu",
"free_rec=%u free_chunk=%u reconnects=%lu restarts=%lu "
"failed_recoverable=%lu failed=%lu",
wrk->id, statename[wrk->state], wrk->deqs, wrk->waits,
wrk->sends, wrk->bytes, wrk->reconnects, wrk->restarts,
wrk->recoverables, wrk->fails);
wrk->sends, wrk->bytes, wrk->nfree_rec, wrk->nfree_chunk,
wrk->reconnects, wrk->restarts, wrk->recoverables, wrk->fails);
}
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment