Commit 37d422e7 authored by Geoff Simmons's avatar Geoff Simmons

trackrdrd: - added amq_connection.* and connection pooling

	- reworked config params:
		- maxopen.scale can be an power of 2
		- maxdone, maxdata and qlen.goal not powers of 2
parent 45e72d0c
...@@ -15,6 +15,8 @@ trackrdrd_SOURCES = \ ...@@ -15,6 +15,8 @@ trackrdrd_SOURCES = \
mq.c \ mq.c \
activemq/amq.h \ activemq/amq.h \
activemq/amq.cpp \ activemq/amq.cpp \
activemq/amq_connection.h \
activemq/amq_connection.cpp \
spmcq.c \ spmcq.c \
worker.c \ worker.c \
sandbox.c \ sandbox.c \
......
...@@ -33,6 +33,7 @@ ...@@ -33,6 +33,7 @@
#include "amq.h" #include "amq.h"
#include <activemq/library/ActiveMQCPP.h> #include <activemq/library/ActiveMQCPP.h>
#include <activemq/core/ActiveMQConnection.h>
#include <decaf/lang/exceptions/NullPointerException.h> #include <decaf/lang/exceptions/NullPointerException.h>
#include <cms/IllegalStateException.h> #include <cms/IllegalStateException.h>
#include <cms/ConnectionMetaData.h> #include <cms/ConnectionMetaData.h>
...@@ -57,32 +58,11 @@ using namespace cms; ...@@ -57,32 +58,11 @@ using namespace cms;
using namespace decaf::lang; using namespace decaf::lang;
using namespace decaf::lang::exceptions; using namespace decaf::lang::exceptions;
std::vector<ActiveMQConnectionFactory*> AMQ_Worker::factories; AMQ_Worker::AMQ_Worker(Connection* cn, std::string& qName,
void
AMQ_Worker::shutdown() {
for (unsigned i = 0; i < factories.size(); i++)
delete factories[i];
factories.resize(0);
}
AMQ_Worker::AMQ_Worker(std::string& brokerURI, std::string& qName,
Session::AcknowledgeMode ackMode = Session::AUTO_ACKNOWLEDGE, Session::AcknowledgeMode ackMode = Session::AUTO_ACKNOWLEDGE,
int deliveryMode = DeliveryMode::NON_PERSISTENT) { int deliveryMode = DeliveryMode::NON_PERSISTENT) {
ActiveMQConnectionFactory* factory = NULL; connection = cn;
for (unsigned i = 0; i < factories.size(); i++)
if (factories[i]->getBrokerURI().toString().compare(brokerURI) == 0) {
factory = factories[i];
break;
}
if (factory == NULL) {
factory = new ActiveMQConnectionFactory(brokerURI);
factories.push_back(factory);
}
connection = factory->createConnection();
connection->start();
session = connection->createSession(ackMode); session = connection->createSession(ackMode);
queue = session->createQueue(qName); queue = session->createQueue(qName);
producer = session->createProducer(queue); producer = session->createProducer(queue);
...@@ -103,11 +83,6 @@ AMQ_Worker::~AMQ_Worker() { ...@@ -103,11 +83,6 @@ AMQ_Worker::~AMQ_Worker() {
delete session; delete session;
session = NULL; session = NULL;
} }
if (connection != NULL) {
connection->close();
delete connection;
connection = NULL;
}
} }
void void
...@@ -137,12 +112,12 @@ AMQ_GlobalInit(void) ...@@ -137,12 +112,12 @@ AMQ_GlobalInit(void)
} }
const char * const char *
AMQ_WorkerInit(AMQ_Worker **worker, char *uri, char *qName) AMQ_WorkerInit(AMQ_Worker **worker, AMQ_Connection *cn, char *qName)
{ {
try { try {
string brokerURI (uri); Connection *conn = cn->getConnection();
string queueName (qName); string queueName (qName);
std::auto_ptr<AMQ_Worker> w (new AMQ_Worker(brokerURI, queueName)); std::auto_ptr<AMQ_Worker> w (new AMQ_Worker(conn, queueName));
*worker = w.release(); *worker = w.release();
return NULL; return NULL;
} }
...@@ -188,7 +163,6 @@ const char * ...@@ -188,7 +163,6 @@ const char *
AMQ_GlobalShutdown() AMQ_GlobalShutdown()
{ {
try { try {
AMQ_Worker::shutdown();
activemq::library::ActiveMQCPP::shutdownLibrary(); activemq::library::ActiveMQCPP::shutdownLibrary();
return NULL; return NULL;
} }
......
...@@ -32,9 +32,9 @@ ...@@ -32,9 +32,9 @@
#ifndef _AMQ_H #ifndef _AMQ_H
#define _AMQ_H #define _AMQ_H
#ifdef __cplusplus #include "amq_connection.h"
#include <vector> #ifdef __cplusplus
#include <activemq/core/ActiveMQConnectionFactory.h> #include <activemq/core/ActiveMQConnectionFactory.h>
#include <cms/Connection.h> #include <cms/Connection.h>
...@@ -47,7 +47,6 @@ using namespace cms; ...@@ -47,7 +47,6 @@ using namespace cms;
class AMQ_Worker { class AMQ_Worker {
private: private:
static std::vector<ActiveMQConnectionFactory*> factories;
Connection* connection; Connection* connection;
Session* session; Session* session;
Queue* queue; Queue* queue;
...@@ -58,7 +57,7 @@ private: ...@@ -58,7 +57,7 @@ private:
public: public:
static void shutdown(); static void shutdown();
AMQ_Worker(std::string& brokerURI, std::string& qName, AMQ_Worker(Connection* connection, std::string& qName,
Session::AcknowledgeMode ackMode, int deliveryMode); Session::AcknowledgeMode ackMode, int deliveryMode);
virtual ~AMQ_Worker(); virtual ~AMQ_Worker();
void send(std::string& text); void send(std::string& text);
...@@ -73,7 +72,8 @@ extern "C" { ...@@ -73,7 +72,8 @@ extern "C" {
#endif #endif
const char *AMQ_GlobalInit(void); const char *AMQ_GlobalInit(void);
const char *AMQ_WorkerInit(AMQ_Worker **worker, char *uri, char *qName); const char *AMQ_WorkerInit(AMQ_Worker **worker, AMQ_Connection *connection,
char *qName);
const char *AMQ_Send(AMQ_Worker *worker, const char *data, unsigned len); const char *AMQ_Send(AMQ_Worker *worker, const char *data, unsigned len);
const char *AMQ_Version(AMQ_Worker *worker, char *version); const char *AMQ_Version(AMQ_Worker *worker, char *version);
const char *AMQ_WorkerShutdown(AMQ_Worker **worker); const char *AMQ_WorkerShutdown(AMQ_Worker **worker);
......
/*-
* Copyright (c) 2012 UPLEX Nils Goroll Systemoptimierung
* Copyright (c) 2012 Otto Gmbh & Co KG
* All rights reserved
* Use only with permission
*
* Author: Geoffrey Simmons <geoffrey.simmons@uplex.de>
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL AUTHOR OR CONTRIBUTORS BE LIABLE
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
* OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
* OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
*
*/
#include "amq_connection.h"
#define CATCHALL \
catch (CMSException& cex) { \
return cex.what(); \
} \
catch (Throwable& th) { \
return th.what(); \
} \
catch (std::exception& sex) { \
return sex.what(); \
} \
catch (...) { \
return "Unexpected error"; \
}
using namespace std;
using namespace activemq::core;
using namespace cms;
using namespace decaf::lang;
ActiveMQConnectionFactory* AMQ_Connection::factory = NULL;
AMQ_Connection::AMQ_Connection(std::string& brokerURI) {
factory = new ActiveMQConnectionFactory(brokerURI);
connection = factory->createConnection();
connection->start();
}
Connection *
AMQ_Connection::getConnection() {
return connection;
}
AMQ_Connection::~AMQ_Connection() {
if (connection != NULL) {
delete connection;
connection = NULL;
}
}
const char *
AMQ_ConnectionInit(AMQ_Connection **cn, char *uri)
{
try {
string brokerURI (uri);
auto_ptr<AMQ_Connection> c (new AMQ_Connection(brokerURI));
*cn = c.release();
return NULL;
}
CATCHALL
}
const char *
AMQ_ConnectionShutdown(AMQ_Connection *cn)
{
try {
delete cn;
return NULL;
}
CATCHALL
}
/*-
* Copyright (c) 2012 UPLEX Nils Goroll Systemoptimierung
* Copyright (c) 2012 Otto Gmbh & Co KG
* All rights reserved
* Use only with permission
*
* Author: Geoffrey Simmons <geoffrey.simmons@uplex.de>
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL AUTHOR OR CONTRIBUTORS BE LIABLE
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
* OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
* OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
*
*/
#ifndef _AMQ_CONNECTION_H
#define _AMQ_CONNECTION_H
#ifdef __cplusplus
#include <activemq/core/ActiveMQConnectionFactory.h>
#include <cms/Connection.h>
using namespace activemq::core;
using namespace cms;
class AMQ_Connection {
private:
static ActiveMQConnectionFactory* factory;
Connection* connection;
AMQ_Connection() {};
public:
AMQ_Connection(std::string& brokerURI);
Connection* getConnection();
virtual ~AMQ_Connection();
};
#else
typedef struct AMQ_Connection AMQ_Connection;
#endif
#ifdef __cplusplus
extern "C" {
#endif
const char *AMQ_ConnectionInit(AMQ_Connection **priv, char *uri);
const char *AMQ_ConnectionShutdown(AMQ_Connection *priv);
#ifdef __cplusplus
}
#endif
#endif /* _AMQ_CONNECTION_H */
...@@ -624,7 +624,7 @@ append(dataentry *entry, enum VSL_tag_e tag, unsigned xid, char *data, ...@@ -624,7 +624,7 @@ append(dataentry *entry, enum VSL_tag_e tag, unsigned xid, char *data,
{ {
CHECK_OBJ_NOTNULL(entry, DATA_MAGIC); CHECK_OBJ_NOTNULL(entry, DATA_MAGIC);
/* Data overflow */ /* Data overflow */
if (entry->end + datalen + 1 > (1 << (config.maxdata_scale))) { if (entry->end + datalen + 1 > config.maxdata) {
LOG_Log(LOG_ALERT, LOG_Log(LOG_ALERT,
"%s: Data too long, XID=%d, current length=%d, " "%s: Data too long, XID=%d, current length=%d, "
"DISCARDING data=[%.*s]", VSL_tags[tag], xid, entry->end, "DISCARDING data=[%.*s]", VSL_tags[tag], xid, entry->end,
...@@ -927,6 +927,13 @@ CHILD_Main(struct VSM_data *vd, int endless, int readconfig) ...@@ -927,6 +927,13 @@ CHILD_Main(struct VSM_data *vd, int endless, int readconfig)
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
} }
errmsg = MQ_InitConnections();
if (errmsg != NULL) {
LOG_Log(LOG_ERR, "Cannot initialize message broker connections: %s",
errmsg);
exit(EXIT_FAILURE);
}
errnum = WRK_Init(); errnum = WRK_Init();
if (errnum != 0) { if (errnum != 0) {
LOG_Log(LOG_ERR, "Cannot prepare worker threads: %s", LOG_Log(LOG_ERR, "Cannot prepare worker threads: %s",
......
...@@ -101,19 +101,6 @@ conf_getUnsignedInt(const char *rval, unsigned *i) ...@@ -101,19 +101,6 @@ conf_getUnsignedInt(const char *rval, unsigned *i)
return(0); \ return(0); \
} }
#define confUnsignedMinVal(name,fld,min) \
if (strcmp(lval, name) == 0) { \
unsigned int i; \
int err = conf_getUnsignedInt(rval, &i); \
if (err != 0) \
return err; \
if (i < min) \
return (EINVAL); \
config.fld = i; \
return(0); \
}
int int
CONF_Add(const char *lval, const char *rval) CONF_Add(const char *lval, const char *rval)
{ {
...@@ -126,14 +113,15 @@ CONF_Add(const char *lval, const char *rval) ...@@ -126,14 +113,15 @@ CONF_Add(const char *lval, const char *rval)
confString("processor.log", processor_log); confString("processor.log", processor_log);
confString("mq.qname", mq_qname); confString("mq.qname", mq_qname);
confUnsignedMinVal("maxopen.scale", maxopen_scale, MIN_MAXOPEN_SCALE); confUnsigned("maxopen.scale", maxopen_scale);
confUnsignedMinVal("maxdone.scale", maxdone_scale, MIN_MAXDONE_SCALE); confUnsigned("maxdone", maxdone);
confUnsignedMinVal("maxdata.scale", maxdata_scale, MIN_MAXDATA_SCALE); confUnsigned("maxdata", maxdata);
confUnsigned("qlen_goal.scale", qlen_goal_scale); confUnsigned("qlen.goal", qlen_goal);
confUnsigned("hash_max_probes", hash_max_probes); confUnsigned("hash.max_probes", hash_max_probes);
confUnsigned("hash_ttl", hash_ttl); confUnsigned("hash.ttl", hash_ttl);
confUnsigned("hash_mlt", hash_mlt); confUnsigned("hash.mlt", hash_mlt);
confUnsigned("nworkers", nworkers); confUnsigned("nworkers", nworkers);
confUnsigned("mq.pool_size", mq_pool_size);
confUnsigned("restarts", restarts); confUnsigned("restarts", restarts);
confUnsigned("monitor.interval", monitor_interval); confUnsigned("monitor.interval", monitor_interval);
...@@ -230,18 +218,19 @@ CONF_Init(void) ...@@ -230,18 +218,19 @@ CONF_Init(void)
config.monitor_interval = 30; config.monitor_interval = 30;
config.monitor_workers = false; config.monitor_workers = false;
config.processor_log[0] = '\0'; config.processor_log[0] = '\0';
config.maxopen_scale = MIN_MAXOPEN_SCALE; config.maxopen_scale = DEF_MAXOPEN_SCALE;
config.maxdone_scale = MIN_MAXDONE_SCALE; config.maxdone = DEF_MAXDONE;
config.maxdata_scale = MIN_MAXDATA_SCALE; config.maxdata = DEF_MAXDATA;
config.qlen_goal_scale = DEF_QLEN_GOAL_SCALE; config.qlen_goal = DEF_QLEN_GOAL;
config.hash_max_probes = DEF_HASH_MAX_PROBES; config.hash_max_probes = DEF_HASH_MAX_PROBES;
config.hash_ttl = DEF_HASH_TTL; config.hash_ttl = DEF_HASH_TTL;
config.hash_mlt = DEF_HASH_MTL; config.hash_mlt = DEF_HASH_MLT;
config.n_mq_uris = 0; config.n_mq_uris = 0;
config.mq_uri = (char **) malloc (sizeof(char **)); config.mq_uri = (char **) malloc (sizeof(char **));
AN(config.mq_uri); AN(config.mq_uri);
config.mq_qname[0] = '\0'; config.mq_qname[0] = '\0';
config.mq_pool_size = 5;
config.nworkers = 1; config.nworkers = 1;
config.restarts = 1; config.restarts = 1;
...@@ -334,11 +323,12 @@ CONF_Dump(void) ...@@ -334,11 +323,12 @@ CONF_Dump(void)
confdump("monitor.workers = %s", config.monitor_workers ? "true" : "false"); confdump("monitor.workers = %s", config.monitor_workers ? "true" : "false");
confdump("processor.log = %s", config.processor_log); confdump("processor.log = %s", config.processor_log);
confdump("maxopen.scale = %u", config.maxopen_scale); confdump("maxopen.scale = %u", config.maxopen_scale);
confdump("maxdata.scale = %u", config.maxdata_scale); confdump("maxdone = %u", config.maxdone);
confdump("qlen_goal.scale = %u", config.qlen_goal_scale); confdump("maxdata = %u", config.maxdata);
confdump("hash_max_probes = %u", config.hash_max_probes); confdump("qlen.goal = %u", config.qlen_goal);
confdump("hash_ttl = %u", config.hash_ttl); confdump("hash.max_probes = %u", config.hash_max_probes);
confdump("hash_mlt = %u", config.hash_mlt); confdump("hash.ttl = %u", config.hash_ttl);
confdump("hash.mlt = %u", config.hash_mlt);
if (config.n_mq_uris > 0) if (config.n_mq_uris > 0)
for (int i = 0; i < config.n_mq_uris; i++) for (int i = 0; i < config.n_mq_uris; i++)
...@@ -347,6 +337,7 @@ CONF_Dump(void) ...@@ -347,6 +337,7 @@ CONF_Dump(void)
LOG_Log0(LOG_DEBUG, "config: mq.uri = "); LOG_Log0(LOG_DEBUG, "config: mq.uri = ");
confdump("mq.qname = %s", config.mq_qname); confdump("mq.qname = %s", config.mq_qname);
confdump("mq.pool_size = %u", config.mq_pool_size);
confdump("nworkers = %u", config.nworkers); confdump("nworkers = %u", config.nworkers);
confdump("restarts = %u", config.restarts); confdump("restarts = %u", config.restarts);
confdump("user = %s", config.user_name); confdump("user = %s", config.user_name);
......
...@@ -53,13 +53,13 @@ DATA_Init(void) ...@@ -53,13 +53,13 @@ DATA_Init(void)
dataentry *entryptr; dataentry *entryptr;
char *bufptr; char *bufptr;
int bufsize = 1 << config.maxdata_scale; int bufsize = config.maxdata;
/* /*
* we want enough space to accomodate all open and done records * we want enough space to accomodate all open and done records
* *
*/ */
int entries = (1 << config.maxopen_scale) + (1 << config.maxdone_scale); int entries = (1 << config.maxopen_scale) + config.maxdone;
entryptr = (dataentry *) calloc(entries, sizeof(dataentry)); entryptr = (dataentry *) calloc(entries, sizeof(dataentry));
if (entryptr == NULL) if (entryptr == NULL)
......
...@@ -29,8 +29,20 @@ ...@@ -29,8 +29,20 @@
* *
*/ */
#include <stdlib.h>
#include <errno.h>
#include <string.h>
#include <pthread.h>
#include "trackrdrd.h" #include "trackrdrd.h"
#include "activemq/amq.h" #include "activemq/amq.h"
#include "activemq/amq_connection.h"
#include "vas.h"
static AMQ_Connection **connections;
static unsigned nconnections;
static pthread_mutex_t connection_lock = PTHREAD_MUTEX_INITIALIZER;
static unsigned connection = 0;
const char * const char *
MQ_GlobalInit(void) MQ_GlobalInit(void)
...@@ -39,9 +51,37 @@ MQ_GlobalInit(void) ...@@ -39,9 +51,37 @@ MQ_GlobalInit(void)
} }
const char * const char *
MQ_WorkerInit(void **priv, char *uri) MQ_InitConnections(void)
{ {
return AMQ_WorkerInit((AMQ_Worker **) priv, uri, config.mq_qname); AMQ_Connection *conn;
const char *err;
if (config.n_mq_uris == 0)
return NULL;
nconnections = config.n_mq_uris * config.mq_pool_size;
connections = (AMQ_Connection **) calloc(sizeof(AMQ_Connection *),
nconnections);
if (connections == NULL)
return strerror(errno);
for (int i = 0; i < config.n_mq_uris; i++)
for (int j = 0; j < config.mq_pool_size; j++) {
err = AMQ_ConnectionInit(&conn, config.mq_uri[i]);
if (err != NULL)
return err;
connections[i*config.mq_pool_size + j] = conn;
}
return NULL;
}
const char *
MQ_WorkerInit(void **priv)
{
AN(nconnections);
AZ(pthread_mutex_lock(&connection_lock));
AMQ_Connection *conn = connections[connection++ % nconnections];
AZ(pthread_mutex_unlock(&connection_lock));
return AMQ_WorkerInit((AMQ_Worker **) priv, conn, config.mq_qname);
} }
const char * const char *
...@@ -69,6 +109,11 @@ MQ_WorkerShutdown(void **priv) ...@@ -69,6 +109,11 @@ MQ_WorkerShutdown(void **priv)
const char * const char *
MQ_GlobalShutdown(void) MQ_GlobalShutdown(void)
{ {
const char *err;
for (int i; i < config.n_mq_uris; i++)
if ((err = AMQ_ConnectionShutdown(connections[i])) != NULL)
return err;
return AMQ_GlobalShutdown(); return AMQ_GlobalShutdown();
} }
...@@ -88,7 +88,7 @@ SPMCQ_Init(void) ...@@ -88,7 +88,7 @@ SPMCQ_Init(void)
{ {
void *buf; void *buf;
size_t n = 1 << config.maxdone_scale; size_t n = config.maxdone;
buf = calloc(n, sizeof(void *)); buf = calloc(n, sizeof(void *));
if (buf == NULL) if (buf == NULL)
return(errno); return(errno);
...@@ -101,7 +101,7 @@ SPMCQ_Init(void) ...@@ -101,7 +101,7 @@ SPMCQ_Init(void)
.tail = 0 }; .tail = 0 };
memcpy(&spmcq, &q, sizeof(spmcq_t)); memcpy(&spmcq, &q, sizeof(spmcq_t));
qlen_goal = 1 << config.qlen_goal_scale; qlen_goal = config.qlen_goal;
atexit(spmcq_cleanup); atexit(spmcq_cleanup);
return(0); return(0);
...@@ -174,7 +174,7 @@ main(int argc, char * const *argv) ...@@ -174,7 +174,7 @@ main(int argc, char * const *argv)
printf("\nTEST: %s\n", argv[0]); printf("\nTEST: %s\n", argv[0]);
printf("... test SMPCQ enqueue at UINT_MAX overflow\n"); printf("... test SMPCQ enqueue at UINT_MAX overflow\n");
config.maxdone_scale = 10; config.maxdone = 1024;
SPMCQ_Init(); SPMCQ_Init();
spmcq.head = spmcq.tail = UINT_MAX - 2; spmcq.head = spmcq.tail = UINT_MAX - 2;
......
...@@ -31,8 +31,10 @@ test_mq_SOURCES = \ ...@@ -31,8 +31,10 @@ test_mq_SOURCES = \
test_mq_LDADD = \ test_mq_LDADD = \
-lm \ -lm \
$(VARNISHSRC)/lib/libvarnish/libvarnish.la \
../mq.$(OBJEXT) \ ../mq.$(OBJEXT) \
../amq.$(OBJEXT) \ ../amq.$(OBJEXT) \
../amq_connection.$(OBJEXT) \
@AMQ_LIBS@ @AMQ_LIBS@
test_spmcq_SOURCES = \ test_spmcq_SOURCES = \
...@@ -67,6 +69,7 @@ test_worker_LDADD = \ ...@@ -67,6 +69,7 @@ test_worker_LDADD = \
../mq.$(OBJEXT) \ ../mq.$(OBJEXT) \
../spmcq.$(OBJEXT) \ ../spmcq.$(OBJEXT) \
../amq.$(OBJEXT) \ ../amq.$(OBJEXT) \
../amq_connection.$(OBJEXT) \
../data.$(OBJEXT) \ ../data.$(OBJEXT) \
@AMQ_LIBS@ @AMQ_LIBS@
......
...@@ -22,7 +22,7 @@ CMD="../trackrdrd -D -f varnish.binlog -l - -d -c test.conf" ...@@ -22,7 +22,7 @@ CMD="../trackrdrd -D -f varnish.binlog -l - -d -c test.conf"
# the user running it # the user running it
CKSUM=$( $CMD | sed -e 's/\(initializing\) \(.*\)/\1/' | sed -e 's/\(Running as\) \([a-zA-Z0-9]*\)$/\1/' | grep -v 'Not running as root' | cksum) CKSUM=$( $CMD | sed -e 's/\(initializing\) \(.*\)/\1/' | sed -e 's/\(Running as\) \([a-zA-Z0-9]*\)$/\1/' | grep -v 'Not running as root' | cksum)
if [ "$CKSUM" != '2485830008 234066' ]; then if [ "$CKSUM" != '3826901182 234120' ]; then
echo "ERROR: Regression test incorrect cksum: $CKSUM" echo "ERROR: Regression test incorrect cksum: $CKSUM"
exit 1 exit 1
fi fi
......
# Test configuration for the varnish log tracking reader # Test configuration for the varnish log tracking reader
log.file = /tmp/trackrdrd.log log.file = /tmp/trackrdrd.log
pid.file = trackrdrd.pid pid.file = trackrdrd.pid
maxdata.scale = 12 maxdata = 4096
maxopen.scale = 11 maxopen.scale = 11
maxdone.scale = 10 maxdone = 1024
monitor.interval = 0 monitor.interval = 0
nworkers = 0 nworkers = 0
...@@ -50,7 +50,7 @@ static char ...@@ -50,7 +50,7 @@ static char
printf("... testing data table initialization\n"); printf("... testing data table initialization\n");
config.maxopen_scale = 10; config.maxopen_scale = 10;
config.maxdone_scale = 10; config.maxdone = 1024;
err = DATA_Init(); err = DATA_Init();
sprintf(errmsg, "DATA_Init: %s", strerror(err)); sprintf(errmsg, "DATA_Init: %s", strerror(err));
mu_assert(errmsg, err == 0); mu_assert(errmsg, err == 0);
......
...@@ -34,6 +34,7 @@ ...@@ -34,6 +34,7 @@
#include "minunit.h" #include "minunit.h"
#include "../trackrdrd.h" #include "../trackrdrd.h"
#include "vas.h"
/* Automake exit code for "skipped" in make check */ /* Automake exit code for "skipped" in make check */
#define EXIT_SKIPPED 77 #define EXIT_SKIPPED 77
...@@ -57,21 +58,41 @@ static char ...@@ -57,21 +58,41 @@ static char
return NULL; return NULL;
} }
static const char static char
*test_worker_init(void) *test_init_connection(void)
{ {
const char *err; const char *err;
char uri[sizeof("tcp://localhost:61616")];
printf("... test worker init (including connect to ActiveMQ)\n"); printf("... testing MQ connection initialization\n");
strcpy(uri, "tcp://localhost:61616"); config.n_mq_uris = 1;
config.mq_uri = (char **) malloc(sizeof(char **));
AN(config.mq_uri);
config.mq_uri[0] = (char *) malloc(strlen("tcp://localhost:61616") + 1);
AN(config.mq_uri[0]);
strcpy(config.mq_uri[0], "tcp://localhost:61616");
strcpy(config.mq_qname, "lhoste/tracking/test"); strcpy(config.mq_qname, "lhoste/tracking/test");
err = MQ_WorkerInit(&worker, uri); config.mq_pool_size = 1;
err = MQ_InitConnections();
if (err != NULL && strstr(err, "Connection refused") != NULL) { if (err != NULL && strstr(err, "Connection refused") != NULL) {
printf("Connection refused, ActiveMQ assumed not running\n"); printf("Connection refused, ActiveMQ assumed not running\n");
exit(EXIT_SKIPPED); exit(EXIT_SKIPPED);
} }
sprintf(errmsg, "MQ_InitConnections: %s", err);
mu_assert(errmsg, err == NULL);
return NULL;
}
static const char
*test_worker_init(void)
{
const char *err;
printf("... test worker init (including connect to ActiveMQ)\n");
err = MQ_WorkerInit(&worker);
sprintf(errmsg, "MQ_WorkerInit: %s", err); sprintf(errmsg, "MQ_WorkerInit: %s", err);
mu_assert(errmsg, err == NULL); mu_assert(errmsg, err == NULL);
...@@ -149,6 +170,7 @@ static const char ...@@ -149,6 +170,7 @@ static const char
*all_tests(void) *all_tests(void)
{ {
mu_run_test(test_global_init); mu_run_test(test_global_init);
mu_run_test(test_init_connection);
mu_run_test(test_worker_init); mu_run_test(test_worker_init);
mu_run_test(test_version); mu_run_test(test_version);
mu_run_test(test_send); mu_run_test(test_send);
......
...@@ -44,7 +44,7 @@ ...@@ -44,7 +44,7 @@
#define NCON 10 #define NCON 10
#define MIN_TABLE_SCALE (MIN_MAXOPEN_SCALE + MIN_MAXDONE_SCALE) #define TABLE_SIZE ((1 << DEF_MAXOPEN_SCALE) + DEF_MAXDONE)
int run; int run;
...@@ -65,7 +65,7 @@ typedef struct { ...@@ -65,7 +65,7 @@ typedef struct {
int tests_run = 0; int tests_run = 0;
static char errmsg[BUFSIZ]; static char errmsg[BUFSIZ];
static unsigned xids[1 << MIN_TABLE_SCALE]; static unsigned xids[TABLE_SIZE];
static prod_con_data_t proddata; static prod_con_data_t proddata;
static prod_con_data_t condata[NCON]; static prod_con_data_t condata[NCON];
...@@ -82,7 +82,7 @@ static void ...@@ -82,7 +82,7 @@ static void
srand48(time(NULL)); srand48(time(NULL));
unsigned xid = (unsigned int) lrand48(); unsigned xid = (unsigned int) lrand48();
for (int i = 0; i < (1 << MIN_MAXOPEN_SCALE); i++) { for (int i = 0; i < (1 << DEF_MAXOPEN_SCALE); i++) {
xids[i] = xid; xids[i] = xid;
debug_print("Producer: enqueue %d (xid = %u)\n", ++enqs, xid); debug_print("Producer: enqueue %d (xid = %u)\n", ++enqs, xid);
if (!SPMCQ_Enq(&xids[i])) { if (!SPMCQ_Enq(&xids[i])) {
...@@ -173,8 +173,8 @@ static char ...@@ -173,8 +173,8 @@ static char
return(errmsg); return(errmsg);
} }
config.maxopen_scale = MIN_MAXOPEN_SCALE; config.maxopen_scale = DEF_MAXOPEN_SCALE;
config.maxdone_scale = MIN_MAXDONE_SCALE; config.maxdone = DEF_MAXDONE;
err = SPMCQ_Init(); err = SPMCQ_Init();
sprintf(errmsg, "SPMCQ_Init: %s", strerror(err)); sprintf(errmsg, "SPMCQ_Init: %s", strerror(err));
mu_assert(errmsg, err == 0); mu_assert(errmsg, err == 0);
......
...@@ -48,6 +48,9 @@ ...@@ -48,6 +48,9 @@
#define NWORKERS 5 #define NWORKERS 5
#define URI1 "tcp://localhost:61616?wireFormat.maxInactivityDuration=0"
#define URI2 "tcp://localhost:61616?connection.sendTimeout=1000&wireFormat.maxInactivityDuration=0"
int tests_run = 0; int tests_run = 0;
static char errmsg[BUFSIZ]; static char errmsg[BUFSIZ];
...@@ -61,26 +64,30 @@ static char ...@@ -61,26 +64,30 @@ static char
printf("... testing worker initialization\n"); printf("... testing worker initialization\n");
config.maxopen_scale = 10; config.maxopen_scale = 10;
config.maxdone_scale = 10; config.maxdone = 1024;
config.maxdata = 1024;
config.nworkers = NWORKERS; config.nworkers = NWORKERS;
strcpy(config.mq_qname, "lhoste/tracking/test"); strcpy(config.mq_qname, "lhoste/tracking/test");
config.mq_pool_size = 2;
config.n_mq_uris = 2; config.n_mq_uris = 2;
config.mq_uri = (char **) malloc(2 * sizeof(char**)); config.mq_uri = (char **) malloc(2 * sizeof(char**));
AN(config.mq_uri); AN(config.mq_uri);
config.mq_uri[0] = (char *) malloc(strlen("tcp://localhost:61616") + 1); config.mq_uri[0] = (char *) malloc(strlen(URI1) + 1);
AN(config.mq_uri[0]); AN(config.mq_uri[0]);
strcpy(config.mq_uri[0], "tcp://localhost:61616"); strcpy(config.mq_uri[0], URI1);
config.mq_uri[1] = (char *) config.mq_uri[1] = (char *) malloc(strlen(URI2) + 1);
malloc(strlen("tcp://localhost:61616?connection.sendTimeout=1000") + 1);
AN(config.mq_uri[1]); AN(config.mq_uri[1]);
strcpy(config.mq_uri[1], strcpy(config.mq_uri[1], URI2);
"tcp://localhost:61616?connection.sendTimeout=1000");
error = MQ_GlobalInit(); error = MQ_GlobalInit();
sprintf(errmsg, "MQ_GlobalInit failed: %s", error); sprintf(errmsg, "MQ_GlobalInit failed: %s", error);
mu_assert(errmsg, error == NULL); mu_assert(errmsg, error == NULL);
error = MQ_InitConnections();
sprintf(errmsg, "MQ_InitConnections failed: %s", error);
mu_assert(errmsg, error == NULL);
err = WRK_Init(); err = WRK_Init();
sprintf(errmsg, "WRK_Init: %s", strerror(err)); sprintf(errmsg, "WRK_Init: %s", strerror(err));
mu_assert(errmsg, err == 0); mu_assert(errmsg, err == 0);
......
...@@ -151,7 +151,8 @@ int spmcq_datawaiter; ...@@ -151,7 +151,8 @@ int spmcq_datawaiter;
/* mq.c */ /* mq.c */
const char *MQ_GlobalInit(void); const char *MQ_GlobalInit(void);
const char *MQ_WorkerInit(void **priv, char *uri); const char *MQ_InitConnections(void);
const char *MQ_WorkerInit(void **priv);
const char *MQ_Send(void *priv, const char *data, unsigned len); const char *MQ_Send(void *priv, const char *data, unsigned len);
const char *MQ_Version(void *priv, char *version); const char *MQ_Version(void *priv, char *version);
const char *MQ_WorkerShutdown(void **priv); const char *MQ_WorkerShutdown(void **priv);
...@@ -261,23 +262,25 @@ struct config { ...@@ -261,23 +262,25 @@ struct config {
/* scale: unit is log(2,n), iow scale is taken to the power of 2 */ /* scale: unit is log(2,n), iow scale is taken to the power of 2 */
unsigned maxopen_scale; /* max number of records in *_OPEN state */ unsigned maxopen_scale; /* max number of records in *_OPEN state */
#define MIN_MAXOPEN_SCALE 10 #define DEF_MAXOPEN_SCALE 10
unsigned maxdone_scale; /* max number of records in *_DONE state */
#define MIN_MAXDONE_SCALE 10 unsigned maxdone; /* max number of records in *_DONE state */
unsigned maxdata_scale; /* scale for char data buffer */ #define DEF_MAXDONE 1024
#define MIN_MAXDATA_SCALE 10
unsigned maxdata; /* size of char data buffer */
#define DEF_MAXDATA 1024
/* /*
* scale of queue-length goal: * queue-length goal:
* *
* we scale te number of running workers dynamically propotionally to the * we scale the number of running workers dynamically propotionally to
* queue length. * the queue length.
* *
* this scale (log(2,n)) specifies the queue length at which all workers * this specifies the queue length at which all workers should be
* should be running * running
*/ */
unsigned qlen_goal_scale; unsigned qlen_goal;
#define DEF_QLEN_GOAL_SCALE 10 #define DEF_QLEN_GOAL 1024
/* max number of probes for insert/lookup */ /* max number of probes for insert/lookup */
unsigned hash_max_probes; unsigned hash_max_probes;
...@@ -289,25 +292,27 @@ struct config { ...@@ -289,25 +292,27 @@ struct config {
* entries which are older than this ttl _may_ get expired from the * entries which are older than this ttl _may_ get expired from the
* trackrdrd state. * trackrdrd state.
* *
* set to a value significantly longer than your maximum session lifetime in * set to a value significantly longer than your maximum session
* varnish. * lifetime in varnish.
*/ */
unsigned hash_ttl; unsigned hash_ttl;
#define DEF_HASH_TTL 120 #define DEF_HASH_TTL 120
/* /*
* hash_mlt: min lifetime for entries in HASH_OPEN before they could get evacuated * hash_mlt: min lifetime for entries in HASH_OPEN before they could
* get evacuated
* *
* entries are guaranteed to remain in trackrdrd for this duration. * entries are guaranteed to remain in trackrdrd for this duration.
* once the mlt is reached, they _may_ get expired if trackrdrd needs space * once the mlt is reached, they _may_ get expired if trackrdrd needs
* in the hash * space in the hash
*/ */
unsigned hash_mlt; unsigned hash_mlt;
#define DEF_HASH_MTL 5 #define DEF_HASH_MLT 5
unsigned n_mq_uris; unsigned n_mq_uris;
char **mq_uri; char **mq_uri;
char mq_qname[BUFSIZ]; char mq_qname[BUFSIZ];
unsigned mq_pool_size;
unsigned nworkers; unsigned nworkers;
unsigned restarts; unsigned restarts;
char user_name[BUFSIZ]; char user_name[BUFSIZ];
......
...@@ -137,15 +137,14 @@ static void ...@@ -137,15 +137,14 @@ static void
void *amq_worker; void *amq_worker;
dataentry *entry; dataentry *entry;
const char *err; const char *err;
char version[VERSION_LEN], *uri; char version[VERSION_LEN];
LOG_Log(LOG_INFO, "Worker %d: starting", wrk->id); LOG_Log(LOG_INFO, "Worker %d: starting", wrk->id);
CHECK_OBJ_NOTNULL(wrk, WORKER_DATA_MAGIC); CHECK_OBJ_NOTNULL(wrk, WORKER_DATA_MAGIC);
wrk->state = WRK_INITIALIZING; wrk->state = WRK_INITIALIZING;
wrk->tid = pthread_self(); wrk->tid = pthread_self();
uri = config.mq_uri[wrk->id % config.n_mq_uris];
err = MQ_WorkerInit(&amq_worker, uri); err = MQ_WorkerInit(&amq_worker);
if (err != NULL) { if (err != NULL) {
LOG_Log(LOG_ALERT, "Worker %d: Cannot initialize queue connection: %s", LOG_Log(LOG_ALERT, "Worker %d: Cannot initialize queue connection: %s",
wrk->id, err); wrk->id, err);
...@@ -168,7 +167,7 @@ static void ...@@ -168,7 +167,7 @@ static void
running++; running++;
AZ(pthread_mutex_unlock(&running_lock)); AZ(pthread_mutex_unlock(&running_lock));
LOG_Log(LOG_INFO, "Worker %d: running (%s, uri=%s)", wrk->id, version, uri); LOG_Log(LOG_INFO, "Worker %d: running (%s)", wrk->id, version);
while (run) { while (run) {
entry = (dataentry *) SPMCQ_Deq(); entry = (dataentry *) SPMCQ_Deq();
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment