Commit 9e4ae90c authored by Geoff Simmons's avatar Geoff Simmons

Bump to version 3.0:

* message queue plugin interface implemented in trackrdrd
* ActiveMQ connections now implemented as plugin (builds shared object)
All 'make check' tests pass
parent 7cffa351
AC_PREREQ(2.59)
AC_COPYRIGHT([Copyright (c) 2012 Otto Gmbh & Co KG])
AC_INIT([trackrdrd], [2.0])
AC_PREREQ(2.63)
AC_COPYRIGHT([Copyright (c) 2012-2014 Otto Gmbh & Co KG])
AC_INIT([trackrdrd], [3.0])
AC_CONFIG_MACRO_DIR([m4])
AC_CONFIG_SRCDIR(src/trackrdrd.c)
AM_CONFIG_HEADER(config.h)
......@@ -9,6 +9,10 @@ AC_CANONICAL_SYSTEM
AC_LANG(C)
AM_INIT_AUTOMAKE([foreign])
AC_USE_SYSTEM_EXTENSIONS
LT_PREREQ(2.2)
LT_INIT
AC_GNU_SOURCE
AC_PROG_CC
......@@ -139,5 +143,6 @@ AC_CONFIG_FILES([
Makefile
src/Makefile
src/test/Makefile
src/mq/activemq/Makefile
])
AC_OUTPUT
/*-
* Copyright (c) 2014 UPLEX Nils Goroll Systemoptimierung
* Copyright (c) 2014 Otto Gmbh & Co KG
* All rights reserved
* Use only with permission
*
* Author: Geoffrey Simmons <geoffrey.simmons@uplex.de>
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL AUTHOR OR CONTRIBUTORS BE LIABLE
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
* OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
* OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
*
*/
/* XXX: should be: MQ_GlobalInit(unsigned nworkers, const char config_fname); */
const char *MQ_GlobalInit(unsigned nworkers, unsigned n_mq_uris, char **mq_uri,
char *mq_qname);
const char *MQ_InitConnections(void);
const char *MQ_WorkerInit(void **priv);
const char *MQ_Send(void *priv, const char *data, unsigned len);
const char *MQ_Version(void *priv, char *version);
const char *MQ_ClientID(void *priv, char *clientID);
const char *MQ_Reconnect(void **priv);
const char *MQ_WorkerShutdown(void **priv);
const char *MQ_GlobalShutdown(void);
INCLUDES = -I$(VARNISHSRC)/include -I$(VARNISHSRC) @AMQ_CFLAGS@ @APR_CFLAGS@ \
@APU_CFLAGS@
INCLUDES = -I$(VARNISHSRC)/include -I$(VARNISHSRC)
bin_PROGRAMS = trackrdrd
......@@ -13,11 +12,6 @@ trackrdrd_SOURCES = \
config.c \
data.c \
monitor.c \
mq.c \
activemq/amq.h \
activemq/amq.cpp \
activemq/amq_connection.h \
activemq/amq_connection.cpp \
spmcq.c \
worker.c \
sandbox.c \
......@@ -29,10 +23,9 @@ trackrdrd_LDADD = \
$(VARNISHSRC)/lib/libvarnishcompat/libvarnishcompat.la \
$(VARNISHSRC)/lib/libvarnishapi/libvarnishapi.la \
$(VARNISHSRC)/lib/libvarnish/libvarnish.la \
${PTHREAD_LIBS} \
@AMQ_LIBS@ \
@APR_LIBS@ \
@APU_LIBS@
${PTHREAD_LIBS}
trackrdrd_LDFLAGS = -ldl
BUILT_SOURCES = revision.h usage.h
MAINTAINERCLEANFILES = revision.h usage.h
......
......@@ -52,6 +52,7 @@
#include <pwd.h>
#include <limits.h>
#include <stdarg.h>
#include <dlfcn.h>
#include "vsb.h"
#include "vpf.h"
......@@ -854,6 +855,7 @@ CHILD_Main(struct VSM_data *vd, int endless, int readconfig)
const char *errmsg;
pthread_t monitor;
struct passwd *pw;
void *mqh;
AZ(pthread_mutexattr_init(&attr_lock));
AZ(pthread_condattr_init(&attr_cond));
......@@ -885,6 +887,28 @@ CHILD_Main(struct VSM_data *vd, int endless, int readconfig)
AN(pw);
LOG_Log(LOG_INFO, "Running as %s", pw->pw_name);
/* read messaging module */
if (config.mq_module[0] == '\0') {
LOG_Log0(LOG_ALERT, "mq.module not found in config (required)");
exit(EXIT_FAILURE);
}
dlerror(); // to clear errors
mqh = dlopen(config.mq_module, RTLD_NOW);
if ((errmsg = dlerror()) != NULL) {
LOG_Log(LOG_ALERT, "error reading mq module %s: %s", config.mq_module,
errmsg);
exit(EXIT_FAILURE);
}
#define METHOD(instm, intfm) \
mqf.instm = dlsym(mqh, #intfm); \
if ((errmsg = dlerror()) != NULL) { \
LOG_Log(LOG_ALERT, "error loading mq method %s: %s", #intfm, errmsg); \
exit(EXIT_FAILURE); \
}
#include "methods.h"
#undef METHOD
/* install signal handlers */
dump_action.sa_handler = dump;
AZ(sigemptyset(&dump_action.sa_mask));
......@@ -925,13 +949,14 @@ CHILD_Main(struct VSM_data *vd, int endless, int readconfig)
else
LOG_Log0(LOG_INFO, "Monitoring thread not running");
errmsg = MQ_GlobalInit();
errmsg = mqf.global_init(config.nworkers, config.n_mq_uris, config.mq_uri,
config.mq_qname);
if (errmsg != NULL) {
LOG_Log(LOG_ERR, "Cannot initialize message broker access: %s", errmsg);
exit(EXIT_FAILURE);
}
errmsg = MQ_InitConnections();
errmsg = mqf.init_connections();
if (errmsg != NULL) {
LOG_Log(LOG_ERR, "Cannot initialize message broker connections: %s",
errmsg);
......@@ -1000,8 +1025,11 @@ CHILD_Main(struct VSM_data *vd, int endless, int readconfig)
WRK_Halt();
WRK_Shutdown();
if ((errmsg = MQ_GlobalShutdown()) != NULL)
if ((errmsg = mqf.global_shutdown()) != NULL)
LOG_Log(LOG_ALERT, "Message queue shutdown failed: %s", errmsg);
if (dlclose(mqh) != 0)
LOG_Log(LOG_ALERT, "Error closing mq module %s: %s", config.mq_module,
dlerror());
if (config.monitor_interval > 0.0)
MON_StatusShutdown(monitor);
LOG_Log0(LOG_INFO, "Worker process exiting");
......
......@@ -115,6 +115,7 @@ CONF_Add(const char *lval, const char *rval)
confString("log.file", log_file);
confString("varnish.bindump", varnish_bindump);
confString("mq.qname", mq_qname);
confString("mq.module", mq_module);
confUnsigned("maxopen.scale", maxopen_scale);
confUnsigned("maxdata", maxdata);
......@@ -238,6 +239,7 @@ CONF_Init(void)
config.hash_ttl = DEF_HASH_TTL;
config.hash_mlt = DEF_HASH_MLT;
config.mq_module[0] = '\0';
config.n_mq_uris = 0;
config.mq_uri = (char **) malloc (sizeof(char **));
AN(config.mq_uri);
......@@ -347,6 +349,7 @@ CONF_Dump(void)
else
LOG_Log0(LOG_DEBUG, "config: mq.uri = ");
confdump("mq.module = %s", config.mq_module);
confdump("mq.qname = %s", config.mq_qname);
confdump("nworkers = %u", config.nworkers);
confdump("restarts = %u", config.restarts);
......
/*-
* Copyright (c) 2014 UPLEX Nils Goroll Systemoptimierung
* Copyright (c) 2014 Otto Gmbh & Co KG
* All rights reserved
* Use only with permission
*
* Author: Geoffrey Simmons <geoffrey.simmons@uplex.de>
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL AUTHOR OR CONTRIBUTORS BE LIABLE
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
* OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
* OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
*
*/
METHOD(global_init, MQ_GlobalInit)
METHOD(init_connections, MQ_InitConnections)
METHOD(worker_init, MQ_WorkerInit)
METHOD(send, MQ_Send)
METHOD(version, MQ_Version)
METHOD(client_id, MQ_ClientID)
METHOD(reconnect, MQ_Reconnect)
METHOD(worker_shutdown, MQ_WorkerShutdown)
METHOD(global_shutdown, MQ_GlobalShutdown)
INCLUDES = -I$(VARNISHSRC)/include -I$(VARNISHSRC) -I$(top_srcdir)/include \
@AMQ_CFLAGS@ @APR_CFLAGS@ @APU_CFLAGS@
pkglib_LTLIBRARIES = libtrackrdr-activemq.la
libtrackrdr_activemq_la_SOURCES = \
$(top_srcdir)/include/mq.h \
mq.c \
amq.h \
amq.cpp \
amq_connection.h \
amq_connection.cpp
libtrackrdr_activemq_la_LIBADD = \
${PTHREAD_LIBS} \
@AMQ_LIBS@ \
@APR_LIBS@ \
@APU_LIBS@
libtrackrdr_activemq_la_LDFLAGS = -version-info 0:0:0
......@@ -33,23 +33,37 @@
#include <errno.h>
#include <string.h>
#include <pthread.h>
#include <assert.h>
#include "trackrdrd.h"
#include "activemq/amq.h"
#include "activemq/amq_connection.h"
#include "vas.h"
#include "mq.h"
#include "amq.h"
#include "amq_connection.h"
static AMQ_Connection **connections;
static AMQ_Worker **workers;
static pthread_mutex_t connection_lock = PTHREAD_MUTEX_INITIALIZER;
static unsigned connection = 0;
static unsigned nwrk = 0;
/* XXX: Obtain from a config file for activemq */
static unsigned n_uris;
static char **uri;
static char *qname;
const char *
MQ_GlobalInit(void)
MQ_GlobalInit(unsigned nworkers, unsigned n_mq_uris, char **mq_uri,
char *mq_qname)
{
workers = (AMQ_Worker **) calloc(sizeof (AMQ_Worker *), config.nworkers);
workers = (AMQ_Worker **) calloc(sizeof (AMQ_Worker *), nworkers);
if (workers == NULL)
return strerror(errno);
nwrk = nworkers;
/* XXX: read these from config file for activemq */
n_uris = n_mq_uris;
uri = mq_uri;
qname = mq_qname;
return AMQ_GlobalInit();
}
......@@ -59,15 +73,14 @@ MQ_InitConnections(void)
AMQ_Connection *conn;
const char *err;
if (config.n_mq_uris == 0)
if (n_uris == 0)
return NULL;
connections = (AMQ_Connection **) calloc(sizeof(AMQ_Connection *),
config.nworkers);
connections = (AMQ_Connection **) calloc(sizeof(AMQ_Connection *), nwrk);
if (connections == NULL)
return strerror(errno);
for (int i = 0; i < config.nworkers; i++) {
err = AMQ_ConnectionInit(&conn, config.mq_uri[i % config.n_mq_uris]);
for (int i = 0; i < nwrk; i++) {
err = AMQ_ConnectionInit(&conn, uri[i % n_uris]);
if (err != NULL)
return err;
connections[i] = conn;
......@@ -78,19 +91,21 @@ MQ_InitConnections(void)
const char *
MQ_WorkerInit(void **priv)
{
int i;
int i, ret;
const char *err = NULL;
AZ(pthread_mutex_lock(&connection_lock));
i = connection++ % config.nworkers;
AZ(pthread_mutex_unlock(&connection_lock));
ret = pthread_mutex_lock(&connection_lock);
assert(ret == 0);
i = connection++ % nwrk;
ret = pthread_mutex_unlock(&connection_lock);
assert(ret == 0);
AMQ_Connection *conn = connections[i];
if (conn == NULL)
err = AMQ_ConnectionInit(&conn, config.mq_uri[i % config.n_mq_uris]);
err = AMQ_ConnectionInit(&conn, uri[i % n_uris]);
if (err != NULL)
return err;
connections[i] = conn;
err = AMQ_WorkerInit((AMQ_Worker **) priv, conn, config.mq_qname);
err = AMQ_WorkerInit((AMQ_Worker **) priv, conn, qname);
if (err == NULL)
workers[i] = (AMQ_Worker *) *priv;
return err;
......@@ -112,20 +127,20 @@ MQ_Reconnect(void **priv)
err = AMQ_WorkerShutdown((AMQ_Worker **) priv);
if (err != NULL)
return err;
for (int i = 0; i < config.nworkers; i++)
for (int i = 0; i < nwrk; i++)
if (workers[i] == (AMQ_Worker *) *priv) {
wrk_num = i;
break;
}
err = AMQ_ConnectionInit(&conn,
config.mq_uri[connection++ % config.n_mq_uris]);
uri[connection++ % n_uris]);
if (err != NULL) {
connections[wrk_num] = NULL;
return err;
}
else
connections[wrk_num] = conn;
return AMQ_WorkerInit((AMQ_Worker **) priv, conn, config.mq_qname);
return AMQ_WorkerInit((AMQ_Worker **) priv, conn, qname);
}
const char *
......@@ -155,10 +170,9 @@ MQ_GlobalShutdown(void)
{
const char *err;
for (int i; i < config.n_mq_uris; i++)
for (int i; i < n_uris; i++)
if (connections[i] != NULL
&& (err = AMQ_ConnectionShutdown(connections[i])) != NULL)
return err;
return AMQ_GlobalShutdown();
}
INCLUDES = -I$(VARNISHSRC)/include -I$(VARNISHSRC) @AMQ_CFLAGS@
INCLUDES = -I$(VARNISHSRC)/include -I$(VARNISHSRC)
TESTS = test_parse test_data test_hash test_mq test_spmcq \
test_spmcq_loop.sh test_worker regress.sh
......@@ -30,36 +30,31 @@ test_hash_SOURCES = \
../trackrdrd.h
test_hash_LDADD = \
-ldl \
$(VARNISHSRC)/lib/libvarnish/libvarnish.la \
$(VARNISHSRC)/lib/libvarnishapi/libvarnishapi.la \
../worker.$(OBJEXT) \
../log.$(OBJEXT) \
../mq.$(OBJEXT) \
../spmcq.$(OBJEXT) \
../amq.$(OBJEXT) \
../amq_connection.$(OBJEXT) \
../data.$(OBJEXT) \
../assert.$(OBJEXT) \
../monitor.$(OBJEXT) \
../parse.$(OBJEXT) \
../config.$(OBJEXT) \
../sandbox.$(OBJEXT) \
@AMQ_LIBS@
../sandbox.$(OBJEXT)
test_hash_CFLAGS = -DTEST_DRIVER
test_mq_SOURCES = \
minunit.h \
test_mq.c \
../trackrdrd.h
../trackrdrd.h \
../methods.h
test_mq_LDADD = \
-ldl \
-lm \
$(VARNISHSRC)/lib/libvarnish/libvarnish.la \
../mq.$(OBJEXT) \
../amq.$(OBJEXT) \
../amq_connection.$(OBJEXT) \
@AMQ_LIBS@
$(VARNISHSRC)/lib/libvarnish/libvarnish.la
test_spmcq_SOURCES = \
minunit.h \
......@@ -77,14 +72,11 @@ test_worker_SOURCES = \
../trackrdrd.h
test_worker_LDADD = \
-ldl \
$(VARNISHSRC)/lib/libvarnish/libvarnish.la \
../worker.$(OBJEXT) \
../log.$(OBJEXT) \
../mq.$(OBJEXT) \
../spmcq.$(OBJEXT) \
../amq.$(OBJEXT) \
../amq_connection.$(OBJEXT) \
../data.$(OBJEXT) \
@AMQ_LIBS@
../data.$(OBJEXT)
test_worker_CFLAGS = -DTEST_DRIVER
......@@ -22,7 +22,7 @@ CMD="../trackrdrd -D -f varnish.binlog -l - -d -c test.conf"
# the user running it
CKSUM=$( $CMD | sed -e 's/\(initializing\) \(.*\)/\1/' | sed -e 's/\(Running as\) \([a-zA-Z0-9]*\)$/\1/' | grep -v 'Not running as root' | cksum)
if [ "$CKSUM" != '3639141188 234061' ]; then
if [ "$CKSUM" != '1036301113 234133' ]; then
echo "ERROR: Regression test incorrect cksum: $CKSUM"
exit 1
fi
......
......@@ -6,3 +6,4 @@ maxopen.scale = 11
maxdone = 1024
monitor.interval = 0
nworkers = 0
mq.module = ../mq/activemq/.libs/libtrackrdr-activemq.so
\ No newline at end of file
......@@ -30,6 +30,7 @@
*/
#include <string.h>
#include <dlfcn.h>
#include "minunit.h"
......@@ -39,10 +40,44 @@
/* Automake exit code for "skipped" in make check */
#define EXIT_SKIPPED 77
#define MQ_MODULE "../mq/activemq/.libs/libtrackrdr-activemq.so"
int tests_run = 0;
static char errmsg[BUFSIZ];
static void *mqh;
void *worker;
static void
init(void)
{
char *err;
dlerror(); // to clear errors
mqh = dlopen(MQ_MODULE, RTLD_NOW);
if ((err = dlerror()) != NULL) {
fprintf(stderr, "error reading mq module %s: %s", MQ_MODULE, err);
exit(EXIT_FAILURE);
}
#define METHOD(instm, intfm) \
mqf.instm = dlsym(mqh, #intfm); \
if ((err = dlerror()) != NULL) { \
fprintf(stderr, "error loading mq method %s: %s", #intfm, err); \
exit(EXIT_FAILURE); \
}
#include "../methods.h"
#undef METHOD
}
static void
fini(void)
{
if (dlclose(mqh) != 0) {
fprintf(stderr, "Error closing mq module %s: %s", MQ_MODULE, dlerror());
exit(EXIT_FAILURE);
}
}
/* N.B.: Always run the tests in this order */
static char
*test_global_init(void)
......@@ -51,7 +86,17 @@ static char
printf("... testing MQ global initialization\n");
err = MQ_GlobalInit();
config.n_mq_uris = 1;
config.nworkers = 1;
config.mq_uri = (char **) malloc(sizeof(char **));
AN(config.mq_uri);
config.mq_uri[0] = (char *) malloc(strlen("tcp://localhost:61616") + 1);
AN(config.mq_uri[0]);
strcpy(config.mq_uri[0], "tcp://localhost:61616");
strcpy(config.mq_qname, "lhoste/tracking/test");
err = mqf.global_init(config.nworkers, config.n_mq_uris, config.mq_uri,
config.mq_qname);
sprintf(errmsg, "MQ_GlobalInit: %s", err);
mu_assert(errmsg, err == NULL);
......@@ -65,16 +110,7 @@ static char
printf("... testing MQ connection initialization\n");
config.n_mq_uris = 1;
config.nworkers = 1;
config.mq_uri = (char **) malloc(sizeof(char **));
AN(config.mq_uri);
config.mq_uri[0] = (char *) malloc(strlen("tcp://localhost:61616") + 1);
AN(config.mq_uri[0]);
strcpy(config.mq_uri[0], "tcp://localhost:61616");
strcpy(config.mq_qname, "lhoste/tracking/test");
err = MQ_InitConnections();
err = mqf.init_connections();
if (err != NULL && strstr(err, "Connection refused") != NULL) {
printf("Connection refused, ActiveMQ assumed not running\n");
exit(EXIT_SKIPPED);
......@@ -92,7 +128,7 @@ static const char
printf("... test worker init\n");
err = MQ_WorkerInit(&worker);
err = mqf.worker_init(&worker);
sprintf(errmsg, "MQ_WorkerInit: %s", err);
mu_assert(errmsg, err == NULL);
......@@ -110,7 +146,7 @@ static const char
printf("... testing version info\n");
mu_assert("MQ_Version: worker is NULL before call", worker != NULL);
err = MQ_Version(worker, version);
err = mqf.version(worker, version);
sprintf(errmsg, "MQ_Version: %s", err);
mu_assert(errmsg, err == NULL);
mu_assert("MQ_Version: version is empty", version[0] != '\0');
......@@ -127,7 +163,7 @@ static const char
printf("... testing client ID info\n");
mu_assert("MQ_ClientID: worker is NULL before call", worker != NULL);
err = MQ_ClientID(worker, clientID);
err = mqf.client_id(worker, clientID);
sprintf(errmsg, "MQ_Version: %s", err);
mu_assert(errmsg, err == NULL);
mu_assert("MQ_Version: client ID is empty", clientID[0] != '\0');
......@@ -143,7 +179,7 @@ static const char
printf("... testing message send\n");
mu_assert("MQ_Send: worker is NULL before call", worker != NULL);
err = MQ_Send(worker, "foo bar baz quux", 16);
err = mqf.send(worker, "foo bar baz quux", 16);
sprintf(errmsg, "MQ_Send: %s", err);
mu_assert(errmsg, err == NULL);
......@@ -158,13 +194,13 @@ static const char
printf("... testing worker shutdown\n");
mu_assert("MQ_WorkerShutdown: worker is NULL before call", worker != NULL);
err = MQ_WorkerShutdown(&worker);
err = mqf.worker_shutdown(&worker);
sprintf(errmsg, "MQ_WorkerShutdown: %s", err);
mu_assert(errmsg, err == NULL);
mu_assert("Worker not NULL after shutdown", worker == NULL);
err = MQ_Send(worker, "foo bar baz quux", 16);
err = mqf.send(worker, "foo bar baz quux", 16);
mu_assert("No failure on MQ_Send after worker shutdown", err != NULL);
return NULL;
......@@ -176,7 +212,7 @@ static const char
const char *err;
printf("... testing global shutdown\n");
err = MQ_GlobalShutdown();
err = mqf.global_shutdown();
sprintf(errmsg, "MQ_GlobalShutdown: %s", err);
mu_assert(errmsg, err == NULL);
......@@ -186,6 +222,7 @@ static const char
static const char
*all_tests(void)
{
init();
mu_run_test(test_global_init);
mu_run_test(test_init_connection);
mu_run_test(test_worker_init);
......@@ -194,6 +231,7 @@ static const char
mu_run_test(test_send);
mu_run_test(test_worker_shutdown);
mu_run_test(test_global_shutdown);
fini();
return NULL;
}
......
......@@ -31,6 +31,7 @@
#include <string.h>
#include <stdbool.h>
#include <dlfcn.h>
#include "minunit.h"
......@@ -48,11 +49,45 @@
#define NWORKERS 5
#define MQ_MODULE "../mq/activemq/.libs/libtrackrdr-activemq.so"
#define URI1 "tcp://localhost:61616?wireFormat.maxInactivityDuration=0"
#define URI2 "tcp://localhost:61616?connection.sendTimeout=1000&wireFormat.maxInactivityDuration=0"
int tests_run = 0;
static char errmsg[BUFSIZ];
static void *mqh;
static void
init(void)
{
char *err;
dlerror(); // to clear errors
mqh = dlopen(MQ_MODULE, RTLD_NOW);
if ((err = dlerror()) != NULL) {
fprintf(stderr, "error reading mq module %s: %s", MQ_MODULE, err);
exit(EXIT_FAILURE);
}
#define METHOD(instm, intfm) \
mqf.instm = dlsym(mqh, #intfm); \
if ((err = dlerror()) != NULL) { \
fprintf(stderr, "error loading mq method %s: %s", #intfm, err); \
exit(EXIT_FAILURE); \
}
#include "../methods.h"
#undef METHOD
}
static void
fini(void)
{
if (dlclose(mqh) != 0) {
fprintf(stderr, "Error closing mq module %s: %s", MQ_MODULE, dlerror());
exit(EXIT_FAILURE);
}
}
/* N.B.: Always run this test first */
static char
......@@ -79,11 +114,12 @@ static char
AN(config.mq_uri[1]);
strcpy(config.mq_uri[1], URI2);
error = MQ_GlobalInit();
error = mqf.global_init(config.nworkers, config.n_mq_uris, config.mq_uri,
config.mq_qname);
sprintf(errmsg, "MQ_GlobalInit failed: %s", error);
mu_assert(errmsg, error == NULL);
error = MQ_InitConnections();
error = mqf.init_connections();
if (error != NULL && strstr(error, "Connection refused") != NULL) {
printf("Connection refused, ActiveMQ assumed not running\n");
exit(EXIT_SKIPPED);
......@@ -135,7 +171,7 @@ static const char
WRK_Halt();
WRK_Shutdown();
AZ(MQ_GlobalShutdown());
AZ(mqf.global_shutdown());
LOG_Close();
return NULL;
......@@ -144,8 +180,10 @@ static const char
static const char
*all_tests(void)
{
init();
mu_run_test(test_worker_init);
mu_run_test(test_worker_run);
fini();
return NULL;
}
......
......@@ -40,6 +40,30 @@
#include "vqueue.h"
#include "varnishapi.h"
/* message queue methods, typedefs match the interface in mq.h */
typedef const char *global_init_f(unsigned nworkers, unsigned n_mq_uris,
char **mq_uri, char *mq_name);
typedef const char *init_connections_f(void);
typedef const char *worker_init_f(void **priv);
typedef const char *send_f(void *priv, const char *data, unsigned len);
typedef const char *version_f(void *priv, char *version);
typedef const char *client_id_f(void *priv, char *clientID);
typedef const char *reconnect_f(void **priv);
typedef const char *worker_shutdown_f(void **priv);
typedef const char *global_shutdown_f(void);
struct mqf {
global_init_f *global_init;
init_connections_f *init_connections;
worker_init_f *worker_init;
send_f *send;
version_f *version;
client_id_f *client_id;
reconnect_f *reconnect;
worker_shutdown_f *worker_shutdown;
global_shutdown_f *global_shutdown;
} mqf;
/* assert.c */
void ASRT_Fail(const char *func, const char *file, int line, const char *cond,
......@@ -89,17 +113,6 @@ int WRK_Exited(void);
void WRK_Halt(void);
void WRK_Shutdown(void);
/* mq.c */
const char *MQ_GlobalInit(void);
const char *MQ_InitConnections(void);
const char *MQ_WorkerInit(void **priv);
const char *MQ_Send(void *priv, const char *data, unsigned len);
const char *MQ_Version(void *priv, char *version);
const char *MQ_ClientID(void *priv, char *clientID);
const char *MQ_Reconnect(void **priv);
const char *MQ_WorkerShutdown(void **priv);
const char *MQ_GlobalShutdown(void);
/* data.c */
typedef enum {
DATA_EMPTY = 0,
......@@ -298,6 +311,7 @@ struct config {
unsigned hash_mlt;
#define DEF_HASH_MLT 5
char mq_module[BUFSIZ];
unsigned n_mq_uris;
char **mq_uri;
char mq_qname[BUFSIZ];
......
......@@ -103,12 +103,12 @@ wrk_log_connection(void *amq_worker, unsigned id)
const char *err;
char version[VERSION_LEN], clientID[CLIENT_ID_LEN];
err = MQ_Version(amq_worker, version);
err = mqf.version(amq_worker, version);
if (err != NULL) {
LOG_Log(LOG_ERR, "Worker %d: Failed to get MQ version", id, err);
version[0] = '\0';
}
err = MQ_ClientID(amq_worker, clientID);
err = mqf.client_id(amq_worker, clientID);
if (err != NULL) {
LOG_Log(LOG_ERR, "Worker %d: Failed to get MQ client ID", id, err);
clientID[0] = '\0';
......@@ -127,12 +127,12 @@ wrk_send(void **amq_worker, dataentry *entry, worker_data_t *wrk)
AN(amq_worker);
/* XXX: report entry->incomplete to backend ? */
err = MQ_Send(*amq_worker, entry->data, entry->end);
err = mqf.send(*amq_worker, entry->data, entry->end);
if (err != NULL) {
LOG_Log(LOG_WARNING, "Worker %d: Failed to send data: %s",
wrk->id, err);
LOG_Log(LOG_INFO, "Worker %d: Reconnecting", wrk->id);
err = MQ_Reconnect(amq_worker);
err = mqf.reconnect(amq_worker);
if (err != NULL) {
*amq_worker = NULL;
LOG_Log(LOG_ALERT, "Worker %d: Reconnect failed (%s)", wrk->id,
......@@ -144,7 +144,7 @@ wrk_send(void **amq_worker, dataentry *entry, worker_data_t *wrk)
wrk->reconnects++;
wrk_log_connection(*amq_worker, wrk->id);
MON_StatsUpdate(STATS_RECONNECT);
err = MQ_Send(*amq_worker, entry->data, entry->end);
err = mqf.send(*amq_worker, entry->data, entry->end);
if (err != NULL) {
wrk->fails++;
*amq_worker = NULL;
......@@ -188,7 +188,7 @@ static void
CHECK_OBJ_NOTNULL(wrk, WORKER_DATA_MAGIC);
wrk->state = WRK_INITIALIZING;
err = MQ_WorkerInit(&amq_worker);
err = mqf.worker_init(&amq_worker);
if (err != NULL) {
LOG_Log(LOG_ALERT, "Worker %d: Cannot initialize queue connection: %s",
wrk->id, err);
......@@ -262,7 +262,7 @@ static void
}
wrk->status = EXIT_SUCCESS;
err = MQ_WorkerShutdown(&amq_worker);
err = mqf.worker_shutdown(&amq_worker);
if (err != NULL) {
LOG_Log(LOG_ALERT, "Worker %d: MQ worker shutdown failed: %s",
wrk->id, err);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment