Commit 864a3eb1 authored by Geoff Simmons's avatar Geoff Simmons

MQ config now read from a separate file in config value mq.config_file

All other mq.* config parameters retired except for mq.module
parent 5da6aa30
ACLOCAL_AMFLAGS = -I m4
SUBDIRS = src src/test
SUBDIRS = src src/test src/mq/activemq
if HAVE_RST2MAN
dist_man_MANS = trackrdrd.3
MAINTAINERCLEANFILES = $(dist_man_MANS)
MAINTAINERCLEANFILES = $(dist_man_MANS) *~
endif
trackrdrd.3: README.rst
......
......@@ -29,9 +29,7 @@
*
*/
/* XXX: should be: MQ_GlobalInit(unsigned nworkers, const char config_fname); */
const char *MQ_GlobalInit(unsigned nworkers, unsigned n_mq_uris, char **mq_uri,
char *mq_qname);
const char *MQ_GlobalInit(unsigned nworkers, const char *config_fname);
const char *MQ_InitConnections(void);
const char *MQ_WorkerInit(void **priv);
const char *MQ_Send(void *priv, const char *data, unsigned len);
......
......@@ -950,8 +950,7 @@ CHILD_Main(struct VSM_data *vd, int endless, int readconfig)
else
LOG_Log0(LOG_INFO, "Monitoring thread not running");
errmsg = mqf.global_init(config.nworkers, config.n_mq_uris, config.mq_uri,
config.mq_qname);
errmsg = mqf.global_init(config.nworkers, config.mq_config_file);
if (errmsg != NULL) {
LOG_Log(LOG_ERR, "Cannot initialize message broker access: %s", errmsg);
exit(EXIT_FAILURE);
......
......@@ -115,8 +115,8 @@ CONF_Add(const char *lval, const char *rval)
confString("varnish.name", varnish_name);
confString("log.file", log_file);
confString("varnish.bindump", varnish_bindump);
confString("mq.qname", mq_qname);
confString("mq.module", mq_module);
confString("mq.config_file", mq_config_file);
confUnsigned("maxopen.scale", maxopen_scale);
confUnsigned("maxdata", maxdata);
......@@ -180,19 +180,6 @@ CONF_Add(const char *lval, const char *rval)
return(EINVAL);
}
if (strcmp(lval, "mq.uri") == 0) {
int n = config.n_mq_uris++;
config.mq_uri = (char **) realloc(config.mq_uri,
config.n_mq_uris * sizeof(char **));
if (config.mq_uri == NULL)
return(errno);
config.mq_uri[n] = (char *) malloc(strlen(rval) + 1);
if (config.mq_uri[n] == NULL)
return(errno);
strcpy(config.mq_uri[n], rval);
return(0);
}
return EINVAL;
}
......@@ -217,10 +204,7 @@ CONF_Init(void)
config.hash_mlt = DEF_HASH_MLT;
config.mq_module[0] = '\0';
config.n_mq_uris = 0;
config.mq_uri = (char **) malloc (sizeof(char **));
AN(config.mq_uri);
config.mq_qname[0] = '\0';
config.mq_config_file[0] = '\0';
config.nworkers = 1;
config.restarts = 1;
config.thread_restarts = 1;
......@@ -270,14 +254,8 @@ CONF_Dump(void)
confdump("hash.ttl = %u", config.hash_ttl);
confdump("hash.mlt = %u", config.hash_mlt);
if (config.n_mq_uris > 0)
for (int i = 0; i < config.n_mq_uris; i++)
confdump("mq.uri = %s", config.mq_uri[i]);
else
LOG_Log0(LOG_DEBUG, "config: mq.uri = ");
confdump("mq.module = %s", config.mq_module);
confdump("mq.qname = %s", config.mq_qname);
confdump("mq.config_file = %s", config.mq_config_file);
confdump("nworkers = %u", config.nworkers);
confdump("restarts = %u", config.restarts);
confdump("thread.restarts = %u", config.thread_restarts);
......
......@@ -5,11 +5,13 @@ pkglib_LTLIBRARIES = libtrackrdr-activemq.la
libtrackrdr_activemq_la_SOURCES = \
$(top_srcdir)/include/mq.h \
$(top_srcdir)/include/config_common.h \
mq.c \
amq.h \
amq.cpp \
amq_connection.h \
amq_connection.cpp
amq_connection.cpp \
$(top_srcdir)/src/config_common.c
libtrackrdr_activemq_la_LIBADD = \
${PTHREAD_LIBS} \
......@@ -17,4 +19,4 @@ libtrackrdr_activemq_la_LIBADD = \
@APR_LIBS@ \
@APU_LIBS@
libtrackrdr_activemq_la_LDFLAGS = -version-info 0:0:0
libtrackrdr_activemq_la_LDFLAGS = -version-info 1:0:0
/*-
* Copyright (c) 2012 UPLEX Nils Goroll Systemoptimierung
* Copyright (c) 2012 Otto Gmbh & Co KG
* Copyright (c) 2012-2014 UPLEX Nils Goroll Systemoptimierung
* Copyright (c) 2012-2014 Otto Gmbh & Co KG
* All rights reserved
* Use only with permission
*
......@@ -29,6 +29,7 @@
*
*/
#include <stdio.h>
#include <stdlib.h>
#include <errno.h>
#include <string.h>
......@@ -36,6 +37,7 @@
#include <assert.h>
#include "mq.h"
#include "config_common.h"
#include "amq.h"
#include "amq_connection.h"
......@@ -45,24 +47,46 @@ static pthread_mutex_t connection_lock = PTHREAD_MUTEX_INITIALIZER;
static unsigned connection = 0;
static unsigned nwrk = 0;
/* XXX: Obtain from a config file for activemq */
static unsigned n_uris;
static unsigned n_uris = 0;
static char **uri;
static char *qname;
static char qname[BUFSIZ];
static int
conf_add(const char *lval, const char *rval)
{
if (strcmp(lval, "mq.qname") == 0) {
strcpy(qname, rval);
return(0);
}
if (strcmp(lval, "mq.uri") == 0) {
int n = n_uris++;
uri = (char **) realloc(uri, n_uris * sizeof(char **));
if (uri == NULL)
return(errno);
uri[n] = (char *) malloc(strlen(rval) + 1);
if (uri[n] == NULL)
return(errno);
strcpy(uri[n], rval);
return(0);
}
return EINVAL;
}
const char *
MQ_GlobalInit(unsigned nworkers, unsigned n_mq_uris, char **mq_uri,
char *mq_qname)
MQ_GlobalInit(unsigned nworkers, const char *config_fname)
{
workers = (AMQ_Worker **) calloc(sizeof (AMQ_Worker *), nworkers);
if (workers == NULL)
return strerror(errno);
nwrk = nworkers;
/* XXX: read these from config file for activemq */
n_uris = n_mq_uris;
uri = mq_uri;
qname = mq_qname;
uri = (char **) malloc (sizeof(char **));
if (uri == NULL)
return strerror(errno);
if (CONF_ReadFile(config_fname, conf_add) != 0)
return "Error reading config file for ActiveMQ";
return AMQ_GlobalInit();
}
......
# test config for ActiveMQ
mq.uri = tcp://localhost:61616
mq.qname = lhoste/tracking/test
# test config for ActiveMQ with 2 MQ URIs
mq.uri = tcp://localhost:61616?wireFormat.maxInactivityDuration=0
mq.uri = tcp://localhost:61616?connection.sendTimeout=1000&wireFormat.maxInactivityDuration=0
mq.qname = lhoste/tracking/test
......@@ -22,7 +22,7 @@ CMD="../trackrdrd -D -f varnish.binlog -l - -d -c test.conf"
# the user running it
CKSUM=$( $CMD | sed -e 's/\(initializing\) \(.*\)/\1/' | sed -e 's/\(Running as\) \([a-zA-Z0-9]*\)$/\1/' | grep -v 'Not running as root' | cksum)
if [ "$CKSUM" != '1036301113 234133' ]; then
if [ "$CKSUM" != '2459426004 234127' ]; then
echo "ERROR: Regression test incorrect cksum: $CKSUM"
exit 1
fi
......
......@@ -6,4 +6,5 @@ maxopen.scale = 11
maxdone = 1024
monitor.interval = 0
nworkers = 0
mq.module = ../mq/activemq/.libs/libtrackrdr-activemq.so
\ No newline at end of file
mq.module = ../mq/activemq/.libs/libtrackrdr-activemq.so
mq.config_file = activemq.conf
......@@ -41,6 +41,7 @@
#define EXIT_SKIPPED 77
#define MQ_MODULE "../mq/activemq/.libs/libtrackrdr-activemq.so"
#define MQ_CONFIG "activemq.conf"
int tests_run = 0;
static char errmsg[BUFSIZ];
......@@ -86,17 +87,9 @@ static char
printf("... testing MQ global initialization\n");
config.n_mq_uris = 1;
config.nworkers = 1;
config.mq_uri = (char **) malloc(sizeof(char **));
AN(config.mq_uri);
config.mq_uri[0] = (char *) malloc(strlen("tcp://localhost:61616") + 1);
AN(config.mq_uri[0]);
strcpy(config.mq_uri[0], "tcp://localhost:61616");
strcpy(config.mq_qname, "lhoste/tracking/test");
err = mqf.global_init(config.nworkers, config.n_mq_uris, config.mq_uri,
config.mq_qname);
strcpy(config.mq_config_file, MQ_CONFIG);
err = mqf.global_init(config.nworkers, config.mq_config_file);
sprintf(errmsg, "MQ_GlobalInit: %s", err);
mu_assert(errmsg, err == NULL);
......
/*-
* Copyright (c) 2012 UPLEX Nils Goroll Systemoptimierung
* Copyright (c) 2012 Otto Gmbh & Co KG
* Copyright (c) 2012-2014 UPLEX Nils Goroll Systemoptimierung
* Copyright (c) 2012-2014 Otto Gmbh & Co KG
* All rights reserved
* Use only with permission
*
......@@ -50,9 +50,7 @@
#define NWORKERS 5
#define MQ_MODULE "../mq/activemq/.libs/libtrackrdr-activemq.so"
#define URI1 "tcp://localhost:61616?wireFormat.maxInactivityDuration=0"
#define URI2 "tcp://localhost:61616?connection.sendTimeout=1000&wireFormat.maxInactivityDuration=0"
#define MQ_CONFIG "activemq2.conf"
int tests_run = 0;
static char errmsg[BUFSIZ];
......@@ -102,20 +100,9 @@ static char
config.maxdone = 1024;
config.maxdata = 1024;
config.nworkers = NWORKERS;
strcpy(config.mq_qname, "lhoste/tracking/test");
config.n_mq_uris = 2;
config.mq_uri = (char **) malloc(2 * sizeof(char**));
AN(config.mq_uri);
config.mq_uri[0] = (char *) malloc(strlen(URI1) + 1);
AN(config.mq_uri[0]);
strcpy(config.mq_uri[0], URI1);
config.mq_uri[1] = (char *) malloc(strlen(URI2) + 1);
AN(config.mq_uri[1]);
strcpy(config.mq_uri[1], URI2);
error = mqf.global_init(config.nworkers, config.n_mq_uris, config.mq_uri,
config.mq_qname);
strcpy(config.mq_config_file, MQ_CONFIG);
error = mqf.global_init(config.nworkers, config.mq_config_file);
sprintf(errmsg, "MQ_GlobalInit failed: %s", error);
mu_assert(errmsg, error == NULL);
......
/*-
* Copyright (c) 2012 UPLEX Nils Goroll Systemoptimierung
* Copyright (c) 2012 Otto Gmbh & Co KG
* Copyright (c) 2012-2014 UPLEX Nils Goroll Systemoptimierung
* Copyright (c) 2012-2014 Otto Gmbh & Co KG
* All rights reserved
* Use only with permission
*
......@@ -41,8 +41,7 @@
#include "varnishapi.h"
/* message queue methods, typedefs match the interface in mq.h */
typedef const char *global_init_f(unsigned nworkers, unsigned n_mq_uris,
char **mq_uri, char *mq_name);
typedef const char *global_init_f(unsigned nworkers, const char *config_fname);
typedef const char *init_connections_f(void);
typedef const char *worker_init_f(void **priv);
typedef const char *send_f(void *priv, const char *data, unsigned len);
......@@ -312,9 +311,7 @@ struct config {
#define DEF_HASH_MLT 5
char mq_module[BUFSIZ];
unsigned n_mq_uris;
char **mq_uri;
char mq_qname[BUFSIZ];
char mq_config_file[BUFSIZ];
unsigned nworkers;
unsigned restarts;
unsigned thread_restarts;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment