Commit 78705f7d authored by Geoff Simmons's avatar Geoff Simmons

trackrdrd: - added amq_connection.* and connection pooling

	- reworked config params:
		- maxopen.scale can be an power of 2
		- maxdone, maxdata and qlen.goal not powers of 2
parent 301c1ba6
......@@ -15,6 +15,8 @@ trackrdrd_SOURCES = \
mq.c \
activemq/amq.h \
activemq/amq.cpp \
activemq/amq_connection.h \
activemq/amq_connection.cpp \
spmcq.c \
worker.c \
sandbox.c \
......
......@@ -33,6 +33,7 @@
#include "amq.h"
#include <activemq/library/ActiveMQCPP.h>
#include <activemq/core/ActiveMQConnection.h>
#include <decaf/lang/exceptions/NullPointerException.h>
#include <cms/IllegalStateException.h>
#include <cms/ConnectionMetaData.h>
......@@ -57,32 +58,11 @@ using namespace cms;
using namespace decaf::lang;
using namespace decaf::lang::exceptions;
std::vector<ActiveMQConnectionFactory*> AMQ_Worker::factories;
void
AMQ_Worker::shutdown() {
for (unsigned i = 0; i < factories.size(); i++)
delete factories[i];
factories.resize(0);
}
AMQ_Worker::AMQ_Worker(std::string& brokerURI, std::string& qName,
AMQ_Worker::AMQ_Worker(Connection* cn, std::string& qName,
Session::AcknowledgeMode ackMode = Session::AUTO_ACKNOWLEDGE,
int deliveryMode = DeliveryMode::NON_PERSISTENT) {
ActiveMQConnectionFactory* factory = NULL;
for (unsigned i = 0; i < factories.size(); i++)
if (factories[i]->getBrokerURI().toString().compare(brokerURI) == 0) {
factory = factories[i];
break;
}
if (factory == NULL) {
factory = new ActiveMQConnectionFactory(brokerURI);
factories.push_back(factory);
}
connection = factory->createConnection();
connection->start();
connection = cn;
session = connection->createSession(ackMode);
queue = session->createQueue(qName);
producer = session->createProducer(queue);
......@@ -103,11 +83,6 @@ AMQ_Worker::~AMQ_Worker() {
delete session;
session = NULL;
}
if (connection != NULL) {
connection->close();
delete connection;
connection = NULL;
}
}
void
......@@ -137,12 +112,12 @@ AMQ_GlobalInit(void)
}
const char *
AMQ_WorkerInit(AMQ_Worker **worker, char *uri, char *qName)
AMQ_WorkerInit(AMQ_Worker **worker, AMQ_Connection *cn, char *qName)
{
try {
string brokerURI (uri);
Connection *conn = cn->getConnection();
string queueName (qName);
std::auto_ptr<AMQ_Worker> w (new AMQ_Worker(brokerURI, queueName));
std::auto_ptr<AMQ_Worker> w (new AMQ_Worker(conn, queueName));
*worker = w.release();
return NULL;
}
......@@ -188,7 +163,6 @@ const char *
AMQ_GlobalShutdown()
{
try {
AMQ_Worker::shutdown();
activemq::library::ActiveMQCPP::shutdownLibrary();
return NULL;
}
......
......@@ -32,9 +32,9 @@
#ifndef _AMQ_H
#define _AMQ_H
#ifdef __cplusplus
#include "amq_connection.h"
#include <vector>
#ifdef __cplusplus
#include <activemq/core/ActiveMQConnectionFactory.h>
#include <cms/Connection.h>
......@@ -47,7 +47,6 @@ using namespace cms;
class AMQ_Worker {
private:
static std::vector<ActiveMQConnectionFactory*> factories;
Connection* connection;
Session* session;
Queue* queue;
......@@ -58,7 +57,7 @@ private:
public:
static void shutdown();
AMQ_Worker(std::string& brokerURI, std::string& qName,
AMQ_Worker(Connection* connection, std::string& qName,
Session::AcknowledgeMode ackMode, int deliveryMode);
virtual ~AMQ_Worker();
void send(std::string& text);
......@@ -73,7 +72,8 @@ extern "C" {
#endif
const char *AMQ_GlobalInit(void);
const char *AMQ_WorkerInit(AMQ_Worker **worker, char *uri, char *qName);
const char *AMQ_WorkerInit(AMQ_Worker **worker, AMQ_Connection *connection,
char *qName);
const char *AMQ_Send(AMQ_Worker *worker, const char *data, unsigned len);
const char *AMQ_Version(AMQ_Worker *worker, char *version);
const char *AMQ_WorkerShutdown(AMQ_Worker **worker);
......
/*-
* Copyright (c) 2012 UPLEX Nils Goroll Systemoptimierung
* Copyright (c) 2012 Otto Gmbh & Co KG
* All rights reserved
* Use only with permission
*
* Author: Geoffrey Simmons <geoffrey.simmons@uplex.de>
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL AUTHOR OR CONTRIBUTORS BE LIABLE
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
* OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
* OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
*
*/
#include "amq_connection.h"
#define CATCHALL \
catch (CMSException& cex) { \
return cex.what(); \
} \
catch (Throwable& th) { \
return th.what(); \
} \
catch (std::exception& sex) { \
return sex.what(); \
} \
catch (...) { \
return "Unexpected error"; \
}
using namespace std;
using namespace activemq::core;
using namespace cms;
using namespace decaf::lang;
ActiveMQConnectionFactory* AMQ_Connection::factory = NULL;
AMQ_Connection::AMQ_Connection(std::string& brokerURI) {
factory = new ActiveMQConnectionFactory(brokerURI);
connection = factory->createConnection();
connection->start();
}
Connection *
AMQ_Connection::getConnection() {
return connection;
}
AMQ_Connection::~AMQ_Connection() {
if (connection != NULL) {
delete connection;
connection = NULL;
}
}
const char *
AMQ_ConnectionInit(AMQ_Connection **cn, char *uri)
{
try {
string brokerURI (uri);
auto_ptr<AMQ_Connection> c (new AMQ_Connection(brokerURI));
*cn = c.release();
return NULL;
}
CATCHALL
}
const char *
AMQ_ConnectionShutdown(AMQ_Connection *cn)
{
try {
delete cn;
return NULL;
}
CATCHALL
}
/*-
* Copyright (c) 2012 UPLEX Nils Goroll Systemoptimierung
* Copyright (c) 2012 Otto Gmbh & Co KG
* All rights reserved
* Use only with permission
*
* Author: Geoffrey Simmons <geoffrey.simmons@uplex.de>
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL AUTHOR OR CONTRIBUTORS BE LIABLE
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
* OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
* OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
*
*/
#ifndef _AMQ_CONNECTION_H
#define _AMQ_CONNECTION_H
#ifdef __cplusplus
#include <activemq/core/ActiveMQConnectionFactory.h>
#include <cms/Connection.h>
using namespace activemq::core;
using namespace cms;
class AMQ_Connection {
private:
static ActiveMQConnectionFactory* factory;
Connection* connection;
AMQ_Connection() {};
public:
AMQ_Connection(std::string& brokerURI);
Connection* getConnection();
virtual ~AMQ_Connection();
};
#else
typedef struct AMQ_Connection AMQ_Connection;
#endif
#ifdef __cplusplus
extern "C" {
#endif
const char *AMQ_ConnectionInit(AMQ_Connection **priv, char *uri);
const char *AMQ_ConnectionShutdown(AMQ_Connection *priv);
#ifdef __cplusplus
}
#endif
#endif /* _AMQ_CONNECTION_H */
......@@ -624,7 +624,7 @@ append(dataentry *entry, enum VSL_tag_e tag, unsigned xid, char *data,
{
CHECK_OBJ_NOTNULL(entry, DATA_MAGIC);
/* Data overflow */
if (entry->end + datalen + 1 > (1 << (config.maxdata_scale))) {
if (entry->end + datalen + 1 > config.maxdata) {
LOG_Log(LOG_ALERT,
"%s: Data too long, XID=%d, current length=%d, "
"DISCARDING data=[%.*s]", VSL_tags[tag], xid, entry->end,
......@@ -927,6 +927,13 @@ CHILD_Main(struct VSM_data *vd, int endless, int readconfig)
exit(EXIT_FAILURE);
}
errmsg = MQ_InitConnections();
if (errmsg != NULL) {
LOG_Log(LOG_ERR, "Cannot initialize message broker connections: %s",
errmsg);
exit(EXIT_FAILURE);
}
errnum = WRK_Init();
if (errnum != 0) {
LOG_Log(LOG_ERR, "Cannot prepare worker threads: %s",
......
......@@ -101,19 +101,6 @@ conf_getUnsignedInt(const char *rval, unsigned *i)
return(0); \
}
#define confUnsignedMinVal(name,fld,min) \
if (strcmp(lval, name) == 0) { \
unsigned int i; \
int err = conf_getUnsignedInt(rval, &i); \
if (err != 0) \
return err; \
if (i < min) \
return (EINVAL); \
config.fld = i; \
return(0); \
}
int
CONF_Add(const char *lval, const char *rval)
{
......@@ -126,14 +113,15 @@ CONF_Add(const char *lval, const char *rval)
confString("processor.log", processor_log);
confString("mq.qname", mq_qname);
confUnsignedMinVal("maxopen.scale", maxopen_scale, MIN_MAXOPEN_SCALE);
confUnsignedMinVal("maxdone.scale", maxdone_scale, MIN_MAXDONE_SCALE);
confUnsignedMinVal("maxdata.scale", maxdata_scale, MIN_MAXDATA_SCALE);
confUnsigned("qlen_goal.scale", qlen_goal_scale);
confUnsigned("hash_max_probes", hash_max_probes);
confUnsigned("hash_ttl", hash_ttl);
confUnsigned("hash_mlt", hash_mlt);
confUnsigned("maxopen.scale", maxopen_scale);
confUnsigned("maxdone", maxdone);
confUnsigned("maxdata", maxdata);
confUnsigned("qlen.goal", qlen_goal);
confUnsigned("hash.max_probes", hash_max_probes);
confUnsigned("hash.ttl", hash_ttl);
confUnsigned("hash.mlt", hash_mlt);
confUnsigned("nworkers", nworkers);
confUnsigned("mq.pool_size", mq_pool_size);
confUnsigned("restarts", restarts);
confUnsigned("monitor.interval", monitor_interval);
......@@ -230,18 +218,19 @@ CONF_Init(void)
config.monitor_interval = 30;
config.monitor_workers = false;
config.processor_log[0] = '\0';
config.maxopen_scale = MIN_MAXOPEN_SCALE;
config.maxdone_scale = MIN_MAXDONE_SCALE;
config.maxdata_scale = MIN_MAXDATA_SCALE;
config.qlen_goal_scale = DEF_QLEN_GOAL_SCALE;
config.maxopen_scale = DEF_MAXOPEN_SCALE;
config.maxdone = DEF_MAXDONE;
config.maxdata = DEF_MAXDATA;
config.qlen_goal = DEF_QLEN_GOAL;
config.hash_max_probes = DEF_HASH_MAX_PROBES;
config.hash_ttl = DEF_HASH_TTL;
config.hash_mlt = DEF_HASH_MTL;
config.hash_mlt = DEF_HASH_MLT;
config.n_mq_uris = 0;
config.mq_uri = (char **) malloc (sizeof(char **));
AN(config.mq_uri);
config.mq_qname[0] = '\0';
config.mq_pool_size = 5;
config.nworkers = 1;
config.restarts = 1;
......@@ -334,11 +323,12 @@ CONF_Dump(void)
confdump("monitor.workers = %s", config.monitor_workers ? "true" : "false");
confdump("processor.log = %s", config.processor_log);
confdump("maxopen.scale = %u", config.maxopen_scale);
confdump("maxdata.scale = %u", config.maxdata_scale);
confdump("qlen_goal.scale = %u", config.qlen_goal_scale);
confdump("hash_max_probes = %u", config.hash_max_probes);
confdump("hash_ttl = %u", config.hash_ttl);
confdump("hash_mlt = %u", config.hash_mlt);
confdump("maxdone = %u", config.maxdone);
confdump("maxdata = %u", config.maxdata);
confdump("qlen.goal = %u", config.qlen_goal);
confdump("hash.max_probes = %u", config.hash_max_probes);
confdump("hash.ttl = %u", config.hash_ttl);
confdump("hash.mlt = %u", config.hash_mlt);
if (config.n_mq_uris > 0)
for (int i = 0; i < config.n_mq_uris; i++)
......@@ -347,6 +337,7 @@ CONF_Dump(void)
LOG_Log0(LOG_DEBUG, "config: mq.uri = ");
confdump("mq.qname = %s", config.mq_qname);
confdump("mq.pool_size = %u", config.mq_pool_size);
confdump("nworkers = %u", config.nworkers);
confdump("restarts = %u", config.restarts);
confdump("user = %s", config.user_name);
......
......@@ -53,13 +53,13 @@ DATA_Init(void)
dataentry *entryptr;
char *bufptr;
int bufsize = 1 << config.maxdata_scale;
int bufsize = config.maxdata;
/*
* we want enough space to accomodate all open and done records
*
*/
int entries = (1 << config.maxopen_scale) + (1 << config.maxdone_scale);
int entries = (1 << config.maxopen_scale) + config.maxdone;
entryptr = (dataentry *) calloc(entries, sizeof(dataentry));
if (entryptr == NULL)
......
......@@ -29,8 +29,20 @@
*
*/
#include <stdlib.h>
#include <errno.h>
#include <string.h>
#include <pthread.h>
#include "trackrdrd.h"
#include "activemq/amq.h"
#include "activemq/amq_connection.h"
#include "vas.h"
static AMQ_Connection **connections;
static unsigned nconnections;
static pthread_mutex_t connection_lock = PTHREAD_MUTEX_INITIALIZER;
static unsigned connection = 0;
const char *
MQ_GlobalInit(void)
......@@ -39,9 +51,37 @@ MQ_GlobalInit(void)
}
const char *
MQ_WorkerInit(void **priv, char *uri)
MQ_InitConnections(void)
{
return AMQ_WorkerInit((AMQ_Worker **) priv, uri, config.mq_qname);
AMQ_Connection *conn;
const char *err;
if (config.n_mq_uris == 0)
return NULL;
nconnections = config.n_mq_uris * config.mq_pool_size;
connections = (AMQ_Connection **) calloc(sizeof(AMQ_Connection *),
nconnections);
if (connections == NULL)
return strerror(errno);
for (int i = 0; i < config.n_mq_uris; i++)
for (int j = 0; j < config.mq_pool_size; j++) {
err = AMQ_ConnectionInit(&conn, config.mq_uri[i]);
if (err != NULL)
return err;
connections[i*config.mq_pool_size + j] = conn;
}
return NULL;
}
const char *
MQ_WorkerInit(void **priv)
{
AN(nconnections);
AZ(pthread_mutex_lock(&connection_lock));
AMQ_Connection *conn = connections[connection++ % nconnections];
AZ(pthread_mutex_unlock(&connection_lock));
return AMQ_WorkerInit((AMQ_Worker **) priv, conn, config.mq_qname);
}
const char *
......@@ -69,6 +109,11 @@ MQ_WorkerShutdown(void **priv)
const char *
MQ_GlobalShutdown(void)
{
const char *err;
for (int i; i < config.n_mq_uris; i++)
if ((err = AMQ_ConnectionShutdown(connections[i])) != NULL)
return err;
return AMQ_GlobalShutdown();
}
......@@ -88,7 +88,7 @@ SPMCQ_Init(void)
{
void *buf;
size_t n = 1 << config.maxdone_scale;
size_t n = config.maxdone;
buf = calloc(n, sizeof(void *));
if (buf == NULL)
return(errno);
......@@ -101,7 +101,7 @@ SPMCQ_Init(void)
.tail = 0 };
memcpy(&spmcq, &q, sizeof(spmcq_t));
qlen_goal = 1 << config.qlen_goal_scale;
qlen_goal = config.qlen_goal;
atexit(spmcq_cleanup);
return(0);
......@@ -174,7 +174,7 @@ main(int argc, char * const *argv)
printf("\nTEST: %s\n", argv[0]);
printf("... test SMPCQ enqueue at UINT_MAX overflow\n");
config.maxdone_scale = 10;
config.maxdone = 1024;
SPMCQ_Init();
spmcq.head = spmcq.tail = UINT_MAX - 2;
......
......@@ -31,8 +31,10 @@ test_mq_SOURCES = \
test_mq_LDADD = \
-lm \
$(VARNISHSRC)/lib/libvarnish/libvarnish.la \
../mq.$(OBJEXT) \
../amq.$(OBJEXT) \
../amq_connection.$(OBJEXT) \
@AMQ_LIBS@
test_spmcq_SOURCES = \
......@@ -67,6 +69,7 @@ test_worker_LDADD = \
../mq.$(OBJEXT) \
../spmcq.$(OBJEXT) \
../amq.$(OBJEXT) \
../amq_connection.$(OBJEXT) \
../data.$(OBJEXT) \
@AMQ_LIBS@
......
......@@ -22,7 +22,7 @@ CMD="../trackrdrd -D -f varnish.binlog -l - -d -c test.conf"
# the user running it
CKSUM=$( $CMD | sed -e 's/\(initializing\) \(.*\)/\1/' | sed -e 's/\(Running as\) \([a-zA-Z0-9]*\)$/\1/' | grep -v 'Not running as root' | cksum)
if [ "$CKSUM" != '2485830008 234066' ]; then
if [ "$CKSUM" != '3826901182 234120' ]; then
echo "ERROR: Regression test incorrect cksum: $CKSUM"
exit 1
fi
......
# Test configuration for the varnish log tracking reader
log.file = /tmp/trackrdrd.log
pid.file = trackrdrd.pid
maxdata.scale = 12
maxdata = 4096
maxopen.scale = 11
maxdone.scale = 10
maxdone = 1024
monitor.interval = 0
nworkers = 0
......@@ -50,7 +50,7 @@ static char
printf("... testing data table initialization\n");
config.maxopen_scale = 10;
config.maxdone_scale = 10;
config.maxdone = 1024;
err = DATA_Init();
sprintf(errmsg, "DATA_Init: %s", strerror(err));
mu_assert(errmsg, err == 0);
......
......@@ -34,6 +34,7 @@
#include "minunit.h"
#include "../trackrdrd.h"
#include "vas.h"
/* Automake exit code for "skipped" in make check */
#define EXIT_SKIPPED 77
......@@ -57,21 +58,41 @@ static char
return NULL;
}
static const char
*test_worker_init(void)
static char
*test_init_connection(void)
{
const char *err;
char uri[sizeof("tcp://localhost:61616")];
printf("... test worker init (including connect to ActiveMQ)\n");
printf("... testing MQ connection initialization\n");
strcpy(uri, "tcp://localhost:61616");
config.n_mq_uris = 1;
config.mq_uri = (char **) malloc(sizeof(char **));
AN(config.mq_uri);
config.mq_uri[0] = (char *) malloc(strlen("tcp://localhost:61616") + 1);
AN(config.mq_uri[0]);
strcpy(config.mq_uri[0], "tcp://localhost:61616");
strcpy(config.mq_qname, "lhoste/tracking/test");
err = MQ_WorkerInit(&worker, uri);
config.mq_pool_size = 1;
err = MQ_InitConnections();
if (err != NULL && strstr(err, "Connection refused") != NULL) {
printf("Connection refused, ActiveMQ assumed not running\n");
exit(EXIT_SKIPPED);
}
sprintf(errmsg, "MQ_InitConnections: %s", err);
mu_assert(errmsg, err == NULL);
return NULL;
}
static const char
*test_worker_init(void)
{
const char *err;
printf("... test worker init (including connect to ActiveMQ)\n");
err = MQ_WorkerInit(&worker);
sprintf(errmsg, "MQ_WorkerInit: %s", err);
mu_assert(errmsg, err == NULL);
......@@ -149,6 +170,7 @@ static const char
*all_tests(void)
{
mu_run_test(test_global_init);
mu_run_test(test_init_connection);
mu_run_test(test_worker_init);
mu_run_test(test_version);
mu_run_test(test_send);
......
......@@ -44,7 +44,7 @@
#define NCON 10
#define MIN_TABLE_SCALE (MIN_MAXOPEN_SCALE + MIN_MAXDONE_SCALE)
#define TABLE_SIZE ((1 << DEF_MAXOPEN_SCALE) + DEF_MAXDONE)
int run;
......@@ -65,7 +65,7 @@ typedef struct {
int tests_run = 0;
static char errmsg[BUFSIZ];
static unsigned xids[1 << MIN_TABLE_SCALE];
static unsigned xids[TABLE_SIZE];
static prod_con_data_t proddata;
static prod_con_data_t condata[NCON];
......@@ -82,7 +82,7 @@ static void
srand48(time(NULL));
unsigned xid = (unsigned int) lrand48();
for (int i = 0; i < (1 << MIN_MAXOPEN_SCALE); i++) {
for (int i = 0; i < (1 << DEF_MAXOPEN_SCALE); i++) {
xids[i] = xid;
debug_print("Producer: enqueue %d (xid = %u)\n", ++enqs, xid);
if (!SPMCQ_Enq(&xids[i])) {
......@@ -173,8 +173,8 @@ static char
return(errmsg);
}
config.maxopen_scale = MIN_MAXOPEN_SCALE;
config.maxdone_scale = MIN_MAXDONE_SCALE;
config.maxopen_scale = DEF_MAXOPEN_SCALE;
config.maxdone = DEF_MAXDONE;
err = SPMCQ_Init();
sprintf(errmsg, "SPMCQ_Init: %s", strerror(err));
mu_assert(errmsg, err == 0);
......
......@@ -48,6 +48,9 @@
#define NWORKERS 5
#define URI1 "tcp://localhost:61616?wireFormat.maxInactivityDuration=0"
#define URI2 "tcp://localhost:61616?connection.sendTimeout=1000&wireFormat.maxInactivityDuration=0"
int tests_run = 0;
static char errmsg[BUFSIZ];
......@@ -61,26 +64,30 @@ static char
printf("... testing worker initialization\n");
config.maxopen_scale = 10;
config.maxdone_scale = 10;
config.maxdone = 1024;
config.maxdata = 1024;
config.nworkers = NWORKERS;
strcpy(config.mq_qname, "lhoste/tracking/test");
config.mq_pool_size = 2;
config.n_mq_uris = 2;
config.mq_uri = (char **) malloc(2 * sizeof(char**));
AN(config.mq_uri);
config.mq_uri[0] = (char *) malloc(strlen("tcp://localhost:61616") + 1);
config.mq_uri[0] = (char *) malloc(strlen(URI1) + 1);
AN(config.mq_uri[0]);
strcpy(config.mq_uri[0], "tcp://localhost:61616");
config.mq_uri[1] = (char *)
malloc(strlen("tcp://localhost:61616?connection.sendTimeout=1000") + 1);
strcpy(config.mq_uri[0], URI1);
config.mq_uri[1] = (char *) malloc(strlen(URI2) + 1);
AN(config.mq_uri[1]);
strcpy(config.mq_uri[1],
"tcp://localhost:61616?connection.sendTimeout=1000");
strcpy(config.mq_uri[1], URI2);
error = MQ_GlobalInit();
sprintf(errmsg, "MQ_GlobalInit failed: %s", error);
mu_assert(errmsg, error == NULL);
error = MQ_InitConnections();
sprintf(errmsg, "MQ_InitConnections failed: %s", error);
mu_assert(errmsg, error == NULL);
err = WRK_Init();
sprintf(errmsg, "WRK_Init: %s", strerror(err));
mu_assert(errmsg, err == 0);
......
......@@ -151,7 +151,8 @@ int spmcq_datawaiter;
/* mq.c */
const char *MQ_GlobalInit(void);
const char *MQ_WorkerInit(void **priv, char *uri);
const char *MQ_InitConnections(void);
const char *MQ_WorkerInit(void **priv);
const char *MQ_Send(void *priv, const char *data, unsigned len);
const char *MQ_Version(void *priv, char *version);
const char *MQ_WorkerShutdown(void **priv);
......@@ -261,23 +262,25 @@ struct config {
/* scale: unit is log(2,n), iow scale is taken to the power of 2 */
unsigned maxopen_scale; /* max number of records in *_OPEN state */
#define MIN_MAXOPEN_SCALE 10
unsigned maxdone_scale; /* max number of records in *_DONE state */
#define MIN_MAXDONE_SCALE 10
unsigned maxdata_scale; /* scale for char data buffer */
#define MIN_MAXDATA_SCALE 10
#define DEF_MAXOPEN_SCALE 10
unsigned maxdone; /* max number of records in *_DONE state */
#define DEF_MAXDONE 1024
unsigned maxdata; /* size of char data buffer */
#define DEF_MAXDATA 1024
/*
* scale of queue-length goal:
* queue-length goal:
*
* we scale te number of running workers dynamically propotionally to the
* queue length.
* we scale the number of running workers dynamically propotionally to
* the queue length.
*
* this scale (log(2,n)) specifies the queue length at which all workers
* should be running
* this specifies the queue length at which all workers should be
* running
*/
unsigned qlen_goal_scale;
#define DEF_QLEN_GOAL_SCALE 10
unsigned qlen_goal;
#define DEF_QLEN_GOAL 1024
/* max number of probes for insert/lookup */
unsigned hash_max_probes;
......@@ -289,25 +292,27 @@ struct config {
* entries which are older than this ttl _may_ get expired from the
* trackrdrd state.
*
* set to a value significantly longer than your maximum session lifetime in
* varnish.
* set to a value significantly longer than your maximum session
* lifetime in varnish.
*/
unsigned hash_ttl;
#define DEF_HASH_TTL 120
/*
* hash_mlt: min lifetime for entries in HASH_OPEN before they could get evacuated
* hash_mlt: min lifetime for entries in HASH_OPEN before they could
* get evacuated
*
* entries are guaranteed to remain in trackrdrd for this duration.
* once the mlt is reached, they _may_ get expired if trackrdrd needs space
* in the hash
* once the mlt is reached, they _may_ get expired if trackrdrd needs
* space in the hash
*/
unsigned hash_mlt;
#define DEF_HASH_MTL 5
#define DEF_HASH_MLT 5
unsigned n_mq_uris;
char **mq_uri;
char mq_qname[BUFSIZ];
unsigned mq_pool_size;
unsigned nworkers;
unsigned restarts;
char user_name[BUFSIZ];
......
......@@ -137,15 +137,14 @@ static void
void *amq_worker;
dataentry *entry;
const char *err;
char version[VERSION_LEN], *uri;
char version[VERSION_LEN];
LOG_Log(LOG_INFO, "Worker %d: starting", wrk->id);
CHECK_OBJ_NOTNULL(wrk, WORKER_DATA_MAGIC);
wrk->state = WRK_INITIALIZING;
wrk->tid = pthread_self();
uri = config.mq_uri[wrk->id % config.n_mq_uris];
err = MQ_WorkerInit(&amq_worker, uri);
err = MQ_WorkerInit(&amq_worker);
if (err != NULL) {
LOG_Log(LOG_ALERT, "Worker %d: Cannot initialize queue connection: %s",
wrk->id, err);
......@@ -168,7 +167,7 @@ static void
running++;
AZ(pthread_mutex_unlock(&running_lock));
LOG_Log(LOG_INFO, "Worker %d: running (%s, uri=%s)", wrk->id, version, uri);
LOG_Log(LOG_INFO, "Worker %d: running (%s)", wrk->id, version);
while (run) {
entry = (dataentry *) SPMCQ_Deq();
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment