Commit 693abdec authored by Geoff Simmons's avatar Geoff Simmons

trackrdrd: added worker.c and unit test

parent 086e03de
...@@ -15,7 +15,8 @@ trackrdrd_SOURCES = \ ...@@ -15,7 +15,8 @@ trackrdrd_SOURCES = \
mq.c \ mq.c \
activemq/amq.h \ activemq/amq.h \
activemq/amq.cpp \ activemq/amq.cpp \
spmcq.c spmcq.c \
worker.c
trackrdrd_LDADD = \ trackrdrd_LDADD = \
$(VARNISHSRC)/lib/libvarnishcompat/libvarnishcompat.la \ $(VARNISHSRC)/lib/libvarnishcompat/libvarnishcompat.la \
......
...@@ -111,6 +111,7 @@ CONF_Add(const char *lval, const char *rval) ...@@ -111,6 +111,7 @@ CONF_Add(const char *lval, const char *rval)
confUnsigned("maxopen.scale", maxopen_scale); confUnsigned("maxopen.scale", maxopen_scale);
confUnsigned("maxdata.scale", maxdata_scale); confUnsigned("maxdata.scale", maxdata_scale);
confUnsigned("nworkers", nworkers);
if (strcmp(lval, "syslog.facility") == 0) { if (strcmp(lval, "syslog.facility") == 0) {
if ((ret = conf_getFacility(rval)) < 0) if ((ret = conf_getFacility(rval)) < 0)
...@@ -175,6 +176,7 @@ CONF_Init(void) ...@@ -175,6 +176,7 @@ CONF_Init(void)
config.maxdata_scale = 0; config.maxdata_scale = 0;
config.mq_uri[0] = '\0'; config.mq_uri[0] = '\0';
config.mq_qname[0] = '\0'; config.mq_qname[0] = '\0';
config.nworkers = 1;
} }
int int
...@@ -245,4 +247,5 @@ CONF_Dump(void) ...@@ -245,4 +247,5 @@ CONF_Dump(void)
confdump("maxdata.scale = %d", config.maxdata_scale); confdump("maxdata.scale = %d", config.maxdata_scale);
confdump("mq.uri = %s", config.mq_uri); confdump("mq.uri = %s", config.mq_uri);
confdump("mq.qname = %s", config.mq_qname); confdump("mq.qname = %s", config.mq_qname);
confdump("nworkers = %d", config.nworkers);
} }
...@@ -53,7 +53,7 @@ static void ...@@ -53,7 +53,7 @@ static void
spmcq_cleanup(void) spmcq_cleanup(void)
{ {
free(spmcq.data); free(spmcq.data);
pthread_mutex_destroy(&spmcq_deq_lock); AZ(pthread_mutex_destroy(&spmcq_deq_lock));
} }
int int
......
INCLUDES = -I$(VARNISHSRC)/include -I$(VARNISHSRC) @AMQ_CFLAGS@ INCLUDES = -I$(VARNISHSRC)/include -I$(VARNISHSRC) @AMQ_CFLAGS@
TESTS = test_parse test_data test_mq test_spmcq test_spmcq_loop.sh regress.sh TESTS = test_parse test_data test_mq test_spmcq test_spmcq_loop.sh \
test_worker regress.sh
check_PROGRAMS = test_parse test_data test_mq test_spmcq check_PROGRAMS = test_parse test_data test_mq test_spmcq test_worker
test_parse_SOURCES = \ test_parse_SOURCES = \
minunit.h \ minunit.h \
...@@ -41,3 +42,17 @@ test_spmcq_LDADD = \ ...@@ -41,3 +42,17 @@ test_spmcq_LDADD = \
$(VARNISHSRC)/lib/libvarnish/libvarnish.la \ $(VARNISHSRC)/lib/libvarnish/libvarnish.la \
../spmcq.$(OBJEXT) ../spmcq.$(OBJEXT)
test_worker_SOURCES = \
minunit.h \
test_worker.c \
../trackrdrd.h
test_worker_LDADD = \
$(VARNISHSRC)/lib/libvarnish/libvarnish.la \
../worker.$(OBJEXT) \
../log.$(OBJEXT) \
../mq.$(OBJEXT) \
../spmcq.$(OBJEXT) \
../amq.$(OBJEXT) \
../data.$(OBJEXT) \
@AMQ_LIBS@
...@@ -15,7 +15,7 @@ echo ...@@ -15,7 +15,7 @@ echo
echo "TEST: $0" echo "TEST: $0"
echo "... testing log output at debug level against a known checksum" echo "... testing log output at debug level against a known checksum"
CKSUM=$(../trackrdrd -f varnish.binlog -l - -d -c test.conf | cksum) CKSUM=$(../trackrdrd -f varnish.binlog -l - -d -c test.conf | cksum)
if [ "$CKSUM" != '1387393550 229074' ]; then if [ "$CKSUM" != '1094437405 229102' ]; then
echo "ERROR: Regression test incorrect cksum: $CKSUM" echo "ERROR: Regression test incorrect cksum: $CKSUM"
exit 1 exit 1
fi fi
......
...@@ -49,7 +49,7 @@ static char ...@@ -49,7 +49,7 @@ static char
const char *err; const char *err;
printf("... testing MQ global initialization\n"); printf("... testing MQ global initialization\n");
strcpy(config.mq_uri, "tcp://localhost:61616"); strcpy(config.mq_uri, "tcp://localhost:61616");
err = MQ_GlobalInit(); err = MQ_GlobalInit();
sprintf(errmsg, "MQ_GlobalInit: %s", err); sprintf(errmsg, "MQ_GlobalInit: %s", err);
......
#! /bin/bash #! /bin/bash
N=1000 N=500
echo echo
echo "TEST: $0" echo "TEST: $0"
......
/*-
* Copyright (c) 2012 UPLEX Nils Goroll Systemoptimierung
* Copyright (c) 2012 Otto Gmbh & Co KG
* All rights reserved
* Use only with permission
*
* Author: Geoffrey Simmons <geoffrey.simmons@uplex.de>
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL AUTHOR OR CONTRIBUTORS BE LIABLE
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
* OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
* OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
*
*/
#include <string.h>
#include "minunit.h"
#include "../trackrdrd.h"
#include "vas.h"
#include "miniobj.h"
#define DEBUG 0
#define debug_print(fmt, ...) \
do { if (DEBUG) fprintf(stderr, fmt, __VA_ARGS__); } while(0)
/* Automake exit code for "skipped" in make check */
#define EXIT_SKIPPED 77
#define NWORKERS 5
int tests_run = 0;
static char errmsg[BUFSIZ];
/* N.B.: Always run this test first */
static char
*test_worker_init(void)
{
int err;
const char *error;
printf("... testing worker initialization\n");
config.maxopen_scale = 0;
config.nworkers = NWORKERS;
strcpy(config.mq_uri, "tcp://localhost:61616");
strcpy(config.mq_qname, "lhoste/tracking/test");
error = MQ_GlobalInit();
sprintf(errmsg, "MQ_GlobalInit failed: %s", error);
mu_assert(errmsg, error == NULL);
err = WRK_Init();
sprintf(errmsg, "WRK_Init: %s", strerror(err));
mu_assert(errmsg, err == 0);
AZ(LOG_Open("test_worker"));
AZ(DATA_Init());
AZ(SPMCQ_Init());
return NULL;
}
static char
*test_worker_run(void)
{
dataentry *entry;
printf("... testing run of %d workers\n", NWORKERS);
srand48(time(NULL));
unsigned xid = (unsigned int) lrand48();
WRK_Start();
for (int i = 0; i < 1024; i++) {
entry = DATA_Insert(xid);
CHECK_OBJ_NOTNULL(entry, DATA_MAGIC);
entry->xid = xid;
sprintf(entry->data, "XID=%d&foo=bar&baz=quux&record=%d", xid, i+1);
entry->end = strlen(entry->data);
entry->state = DATA_DONE;
sprintf(errmsg, "SPMCQ_Enq: queue full");
mu_assert(errmsg, SPMCQ_Enq(entry) != NULL);
}
WRK_Halt();
WRK_Shutdown();
AZ(MQ_GlobalShutdown());
LOG_Close();
return NULL;
}
static const char
*all_tests(void)
{
mu_run_test(test_worker_init);
mu_run_test(test_worker_run);
return NULL;
}
TEST_RUNNER
...@@ -35,6 +35,13 @@ ...@@ -35,6 +35,13 @@
#define MIN_TABLE_SCALE 10 #define MIN_TABLE_SCALE 10
/* worker.c */
int WRK_Init(void);
void WRK_Start(void);
void WRK_Halt(void);
void WRK_Shutdown(void);
/* spmcq.c */ /* spmcq.c */
/* Single producer multiple consumer bounded FIFO queue */ /* Single producer multiple consumer bounded FIFO queue */
...@@ -100,6 +107,8 @@ typedef struct { ...@@ -100,6 +107,8 @@ typedef struct {
unsigned open; unsigned open;
unsigned done; unsigned done;
unsigned submitted; /* Records submitted */ unsigned submitted; /* Records submitted */
unsigned sent; /* Records sent to MQ */
unsigned failed; /* MQ send fails */
unsigned occ_hi; /* Occupancy high water mark */ unsigned occ_hi; /* Occupancy high water mark */
unsigned data_hi; /* Data high water mark */ unsigned data_hi; /* Data high water mark */
dataentry *entry; dataentry *entry;
...@@ -130,6 +139,7 @@ struct config { ...@@ -130,6 +139,7 @@ struct config {
unsigned maxdata_scale; unsigned maxdata_scale;
char mq_uri[BUFSIZ]; char mq_uri[BUFSIZ];
char mq_qname[BUFSIZ]; char mq_qname[BUFSIZ];
unsigned nworkers;
} config; } config;
void CONF_Init(void); void CONF_Init(void);
......
/*-
* Copyright (c) 2012 UPLEX Nils Goroll Systemoptimierung
* Copyright (c) 2012 Otto Gmbh & Co KG
* All rights reserved
* Use only with permission
*
* Author: Geoffrey Simmons <geoffrey.simmons@uplex.de>
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL AUTHOR OR CONTRIBUTORS BE LIABLE
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
* OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
* OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
*
*/
#include <pthread.h>
#include <stdlib.h>
#include <syslog.h>
#include <string.h>
#include "trackrdrd.h"
#include "vas.h"
#include "miniobj.h"
typedef struct {
unsigned magic;
#define WORKER_DATA_MAGIC 0xd8eef137
unsigned id;
unsigned status;
} worker_data_t;
typedef struct {
pthread_t worker;
worker_data_t *wrk_data;
} thread_data_t;
static unsigned run, cleaned = 0;
static thread_data_t *thread_data;
static inline void
wrk_send(void *amq_worker, dataentry *entry, unsigned id)
{
const char *err;
CHECK_OBJ_NOTNULL(entry, DATA_MAGIC);
assert(entry->state == DATA_DONE);
AN(amq_worker);
err = MQ_Send(amq_worker, entry->data, entry->end);
if (err != NULL) {
/* XXX: error recovery? reconnect? preserve the data? */
LOG_Log(LOG_ALERT, "Worker %d: Failed to send data: %s", id, err);
LOG_Log(LOG_ERR, "Worker %d: Data DISCARDED [%.*s]", id, entry->end,
entry->data);
tbl.failed++;
}
else
tbl.sent++;
entry->state = DATA_OPEN;
}
static void
*wrk_main(void *arg)
{
worker_data_t *wrk = (worker_data_t *) arg;
void *amq_worker;
dataentry *entry;
const char *err;
LOG_Log(LOG_INFO, "Worker %d: starting", wrk->id);
CHECK_OBJ_NOTNULL(wrk, WORKER_DATA_MAGIC);
err = MQ_WorkerInit(&amq_worker);
if (err != NULL) {
LOG_Log(LOG_ALERT, "Worker %d: Cannot initialize queue connection: %s",
wrk->id, err);
wrk->status = EXIT_FAILURE;
pthread_exit((void *) wrk);
}
while (run) {
entry = (dataentry *) SPMCQ_Deq();
if (entry != NULL) {
/* Dequeued a data entry */
wrk_send(amq_worker, entry, wrk->id);
continue;
}
/* Queue is empty, wait until data are available, or quit is
signaled.
Grab the CV lock, which also constitutes an implicit memory
barrier */
AZ(pthread_mutex_lock(&spmcq_nonempty_lock));
/* run is guaranteed to be fresh here */
if (run)
AZ(pthread_cond_wait(&spmcq_nonempty_cond,
&spmcq_nonempty_lock));
AZ(pthread_mutex_unlock(&spmcq_nonempty_lock));
}
/* Prepare to exit, drain the queue */
while ((entry = (dataentry *) SPMCQ_Deq()) != NULL)
wrk_send(amq_worker, entry, wrk->id);
wrk->status = EXIT_SUCCESS;
err = MQ_WorkerShutdown(&amq_worker);
if (err != NULL) {
LOG_Log(LOG_ALERT, "Worker %d: MQ worker shutdown failed: %s",
wrk->id, err);
wrk->status = EXIT_FAILURE;
}
LOG_Log(LOG_INFO, "Worker %d: exiting", wrk->id);
pthread_exit((void *) wrk);
}
static void wrk_cleanup(void)
{
if (cleaned) return;
for (int i = 0; i < config.nworkers; i++)
free(thread_data[i].wrk_data);
free(thread_data);
AZ(pthread_mutex_destroy(&spmcq_nonempty_lock));
AZ(pthread_cond_destroy(&spmcq_nonempty_cond));
AZ(pthread_mutex_destroy(&spmcq_nonfull_lock));
AZ(pthread_cond_destroy(&spmcq_nonfull_cond));
cleaned = 1;
}
int
WRK_Init(void)
{
thread_data
= (thread_data_t *) malloc(config.nworkers * sizeof(thread_data_t));
if (thread_data == NULL) {
LOG_Log(LOG_ALERT, "Cannot allocate thread data: %s", strerror(errno));
return(errno);
}
run = 1;
for (int i = 0; i < config.nworkers; i++) {
thread_data[i].wrk_data
= (worker_data_t *) malloc(sizeof(worker_data_t));
if (thread_data[i].wrk_data == NULL) {
LOG_Log(LOG_ALERT, "Cannot allocate worker data for worker %d: %s",
i+1, strerror(errno));
return(errno);
}
thread_data[i].wrk_data->magic = WORKER_DATA_MAGIC;
thread_data[i].wrk_data->id = i + 1;
}
AZ(pthread_mutex_init(&spmcq_nonempty_lock, NULL));
AZ(pthread_cond_init(&spmcq_nonempty_cond, NULL));
AZ(pthread_mutex_init(&spmcq_nonfull_lock, NULL));
AZ(pthread_cond_init(&spmcq_nonfull_cond, NULL));
atexit(wrk_cleanup);
return 0;
}
void
WRK_Start(void)
{
run = 1;
for (int i = 0; i < config.nworkers; i++)
AZ(pthread_create(&thread_data[i].worker, NULL, wrk_main,
thread_data[i].wrk_data));
}
void
WRK_Halt(void)
{
/*
* must only modify run under spmcq_nonempty_lock to ensure that
* we signal all waiting consumers (otherwise a consumer could go
* waiting _after_ we have broadcasted and so miss the event.
*/
AZ(pthread_mutex_lock(&spmcq_nonempty_lock));
run = 0;
AZ(pthread_cond_broadcast(&spmcq_nonempty_cond));
AZ(pthread_mutex_unlock(&spmcq_nonempty_lock));
for(int i = 0; i < config.nworkers; i++) {
AZ(pthread_join(thread_data[i].worker,
(void **) &thread_data[i].wrk_data));
CHECK_OBJ_NOTNULL(thread_data[i].wrk_data, WORKER_DATA_MAGIC);
if (thread_data[i].wrk_data->status != EXIT_SUCCESS)
LOG_Log(LOG_ERR, "Worker %d returned failure status", i+1);
}
}
void
WRK_Shutdown(void)
{
/* XXX: error if run=1? */
wrk_cleanup();
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment