Commit 301c1ba6 authored by Geoff Simmons's avatar Geoff Simmons

trackrdrd: allow configuration of multiple MQ URIs

parent eebb874d
...@@ -57,26 +57,30 @@ using namespace cms; ...@@ -57,26 +57,30 @@ using namespace cms;
using namespace decaf::lang; using namespace decaf::lang;
using namespace decaf::lang::exceptions; using namespace decaf::lang::exceptions;
ActiveMQConnectionFactory* AMQ_Worker::factory = NULL; std::vector<ActiveMQConnectionFactory*> AMQ_Worker::factories;
void
AMQ_Worker::initConnectionFactory(const std::string& brokerURI) {
factory = new ActiveMQConnectionFactory(brokerURI);
}
void void
AMQ_Worker::shutdown() { AMQ_Worker::shutdown() {
delete factory; for (unsigned i = 0; i < factories.size(); i++)
factory = NULL; delete factories[i];
factories.resize(0);
} }
AMQ_Worker::AMQ_Worker(std::string& qName, AMQ_Worker::AMQ_Worker(std::string& brokerURI, std::string& qName,
Session::AcknowledgeMode ackMode = Session::AUTO_ACKNOWLEDGE, Session::AcknowledgeMode ackMode = Session::AUTO_ACKNOWLEDGE,
int deliveryMode = DeliveryMode::NON_PERSISTENT) { int deliveryMode = DeliveryMode::NON_PERSISTENT) {
if (factory == NULL) ActiveMQConnectionFactory* factory = NULL;
throw cms::IllegalStateException("Connection factory not initialized"); for (unsigned i = 0; i < factories.size(); i++)
if (factories[i]->getBrokerURI().toString().compare(brokerURI) == 0) {
factory = factories[i];
break;
}
if (factory == NULL) {
factory = new ActiveMQConnectionFactory(brokerURI);
factories.push_back(factory);
}
connection = factory->createConnection(); connection = factory->createConnection();
connection->start(); connection->start();
session = connection->createSession(ackMode); session = connection->createSession(ackMode);
...@@ -100,15 +104,12 @@ AMQ_Worker::~AMQ_Worker() { ...@@ -100,15 +104,12 @@ AMQ_Worker::~AMQ_Worker() {
session = NULL; session = NULL;
} }
if (connection != NULL) { if (connection != NULL) {
#if 0
connection->close(); connection->close();
#endif
delete connection; delete connection;
connection = NULL; connection = NULL;
} }
} }
/* XXX: Timeout */
void void
AMQ_Worker::send(std::string& text) { AMQ_Worker::send(std::string& text) {
if (msg == NULL || producer == NULL) if (msg == NULL || producer == NULL)
...@@ -126,23 +127,22 @@ AMQ_Worker::getVersion() { ...@@ -126,23 +127,22 @@ AMQ_Worker::getVersion() {
} }
const char * const char *
AMQ_GlobalInit(char *uri) AMQ_GlobalInit(void)
{ {
activemq::library::ActiveMQCPP::initializeLibrary();
try { try {
string brokerURI (uri); activemq::library::ActiveMQCPP::initializeLibrary();
AMQ_Worker::initConnectionFactory(brokerURI);
return NULL; return NULL;
} }
CATCHALL CATCHALL
} }
const char * const char *
AMQ_WorkerInit(AMQ_Worker **worker, char *qName) AMQ_WorkerInit(AMQ_Worker **worker, char *uri, char *qName)
{ {
try { try {
string brokerURI (uri);
string queueName (qName); string queueName (qName);
std::auto_ptr<AMQ_Worker> w (new AMQ_Worker(queueName)); std::auto_ptr<AMQ_Worker> w (new AMQ_Worker(brokerURI, queueName));
*worker = w.release(); *worker = w.release();
return NULL; return NULL;
} }
......
...@@ -34,6 +34,8 @@ ...@@ -34,6 +34,8 @@
#ifdef __cplusplus #ifdef __cplusplus
#include <vector>
#include <activemq/core/ActiveMQConnectionFactory.h> #include <activemq/core/ActiveMQConnectionFactory.h>
#include <cms/Connection.h> #include <cms/Connection.h>
#include <cms/Session.h> #include <cms/Session.h>
...@@ -45,7 +47,7 @@ using namespace cms; ...@@ -45,7 +47,7 @@ using namespace cms;
class AMQ_Worker { class AMQ_Worker {
private: private:
static ActiveMQConnectionFactory* factory; static std::vector<ActiveMQConnectionFactory*> factories;
Connection* connection; Connection* connection;
Session* session; Session* session;
Queue* queue; Queue* queue;
...@@ -54,11 +56,10 @@ private: ...@@ -54,11 +56,10 @@ private:
AMQ_Worker() {}; AMQ_Worker() {};
public: public:
static void initConnectionFactory(const std::string& brokerURI);
static void shutdown(); static void shutdown();
AMQ_Worker(std::string& qName, Session::AcknowledgeMode ackMode, AMQ_Worker(std::string& brokerURI, std::string& qName,
int deliveryMode); Session::AcknowledgeMode ackMode, int deliveryMode);
virtual ~AMQ_Worker(); virtual ~AMQ_Worker();
void send(std::string& text); void send(std::string& text);
std::string getVersion(); std::string getVersion();
...@@ -71,8 +72,8 @@ typedef struct AMQ_Worker AMQ_Worker; ...@@ -71,8 +72,8 @@ typedef struct AMQ_Worker AMQ_Worker;
extern "C" { extern "C" {
#endif #endif
const char *AMQ_GlobalInit(char *uri); const char *AMQ_GlobalInit(void);
const char *AMQ_WorkerInit(AMQ_Worker **worker, char *qName); const char *AMQ_WorkerInit(AMQ_Worker **worker, char *uri, char *qName);
const char *AMQ_Send(AMQ_Worker *worker, const char *data, unsigned len); const char *AMQ_Send(AMQ_Worker *worker, const char *data, unsigned len);
const char *AMQ_Version(AMQ_Worker *worker, char *version); const char *AMQ_Version(AMQ_Worker *worker, char *version);
const char *AMQ_WorkerShutdown(AMQ_Worker **worker); const char *AMQ_WorkerShutdown(AMQ_Worker **worker);
......
...@@ -124,7 +124,6 @@ CONF_Add(const char *lval, const char *rval) ...@@ -124,7 +124,6 @@ CONF_Add(const char *lval, const char *rval)
confString("log.file", log_file); confString("log.file", log_file);
confString("varnish.bindump", varnish_bindump); confString("varnish.bindump", varnish_bindump);
confString("processor.log", processor_log); confString("processor.log", processor_log);
confString("mq.uri", mq_uri);
confString("mq.qname", mq_qname); confString("mq.qname", mq_qname);
confUnsignedMinVal("maxopen.scale", maxopen_scale, MIN_MAXOPEN_SCALE); confUnsignedMinVal("maxopen.scale", maxopen_scale, MIN_MAXOPEN_SCALE);
...@@ -178,6 +177,19 @@ CONF_Add(const char *lval, const char *rval) ...@@ -178,6 +177,19 @@ CONF_Add(const char *lval, const char *rval)
return(EINVAL); return(EINVAL);
} }
if (strcmp(lval, "mq.uri") == 0) {
int n = config.n_mq_uris++;
config.mq_uri = (char **) realloc(config.mq_uri,
config.n_mq_uris * sizeof(char **));
if (config.mq_uri == NULL)
return(errno);
config.mq_uri[n] = (char *) malloc(strlen(rval) + 1);
if (config.mq_uri[n] == NULL)
return(errno);
strcpy(config.mq_uri[n], rval);
return(0);
}
return EINVAL; return EINVAL;
} }
...@@ -226,7 +238,9 @@ CONF_Init(void) ...@@ -226,7 +238,9 @@ CONF_Init(void)
config.hash_ttl = DEF_HASH_TTL; config.hash_ttl = DEF_HASH_TTL;
config.hash_mlt = DEF_HASH_MTL; config.hash_mlt = DEF_HASH_MTL;
config.mq_uri[0] = '\0'; config.n_mq_uris = 0;
config.mq_uri = (char **) malloc (sizeof(char **));
AN(config.mq_uri);
config.mq_qname[0] = '\0'; config.mq_qname[0] = '\0';
config.nworkers = 1; config.nworkers = 1;
config.restarts = 1; config.restarts = 1;
...@@ -326,8 +340,12 @@ CONF_Dump(void) ...@@ -326,8 +340,12 @@ CONF_Dump(void)
confdump("hash_ttl = %u", config.hash_ttl); confdump("hash_ttl = %u", config.hash_ttl);
confdump("hash_mlt = %u", config.hash_mlt); confdump("hash_mlt = %u", config.hash_mlt);
if (config.n_mq_uris > 0)
confdump("mq.uri = %s", config.mq_uri); for (int i = 0; i < config.n_mq_uris; i++)
confdump("mq.uri = %s", config.mq_uri[i]);
else
LOG_Log0(LOG_DEBUG, "config: mq.uri = ");
confdump("mq.qname = %s", config.mq_qname); confdump("mq.qname = %s", config.mq_qname);
confdump("nworkers = %u", config.nworkers); confdump("nworkers = %u", config.nworkers);
confdump("restarts = %u", config.restarts); confdump("restarts = %u", config.restarts);
......
...@@ -35,13 +35,13 @@ ...@@ -35,13 +35,13 @@
const char * const char *
MQ_GlobalInit(void) MQ_GlobalInit(void)
{ {
return AMQ_GlobalInit(config.mq_uri); return AMQ_GlobalInit();
} }
const char * const char *
MQ_WorkerInit(void **priv) MQ_WorkerInit(void **priv, char *uri)
{ {
return AMQ_WorkerInit((AMQ_Worker **) priv, config.mq_qname); return AMQ_WorkerInit((AMQ_Worker **) priv, uri, config.mq_qname);
} }
const char * const char *
......
...@@ -50,7 +50,6 @@ static char ...@@ -50,7 +50,6 @@ static char
printf("... testing MQ global initialization\n"); printf("... testing MQ global initialization\n");
strcpy(config.mq_uri, "tcp://localhost:61616");
err = MQ_GlobalInit(); err = MQ_GlobalInit();
sprintf(errmsg, "MQ_GlobalInit: %s", err); sprintf(errmsg, "MQ_GlobalInit: %s", err);
mu_assert(errmsg, err == NULL); mu_assert(errmsg, err == NULL);
...@@ -62,11 +61,13 @@ static const char ...@@ -62,11 +61,13 @@ static const char
*test_worker_init(void) *test_worker_init(void)
{ {
const char *err; const char *err;
char uri[sizeof("tcp://localhost:61616")];
printf("... test worker init (including connect to ActiveMQ)\n"); printf("... test worker init (including connect to ActiveMQ)\n");
strcpy(uri, "tcp://localhost:61616");
strcpy(config.mq_qname, "lhoste/tracking/test"); strcpy(config.mq_qname, "lhoste/tracking/test");
err = MQ_WorkerInit(&worker); err = MQ_WorkerInit(&worker, uri);
if (err != NULL && strstr(err, "Connection refused") != NULL) { if (err != NULL && strstr(err, "Connection refused") != NULL) {
printf("Connection refused, ActiveMQ assumed not running\n"); printf("Connection refused, ActiveMQ assumed not running\n");
exit(EXIT_SKIPPED); exit(EXIT_SKIPPED);
...@@ -118,7 +119,7 @@ static const char ...@@ -118,7 +119,7 @@ static const char
printf("... testing worker shutdown\n"); printf("... testing worker shutdown\n");
mu_assert("MQ_WorkerShhutdown: worker is NULL before call", worker != NULL); mu_assert("MQ_WorkerShutdown: worker is NULL before call", worker != NULL);
err = MQ_WorkerShutdown(&worker); err = MQ_WorkerShutdown(&worker);
sprintf(errmsg, "MQ_WorkerShutdown: %s", err); sprintf(errmsg, "MQ_WorkerShutdown: %s", err);
mu_assert(errmsg, err == NULL); mu_assert(errmsg, err == NULL);
......
...@@ -63,8 +63,19 @@ static char ...@@ -63,8 +63,19 @@ static char
config.maxopen_scale = 10; config.maxopen_scale = 10;
config.maxdone_scale = 10; config.maxdone_scale = 10;
config.nworkers = NWORKERS; config.nworkers = NWORKERS;
strcpy(config.mq_uri, "tcp://localhost:61616");
strcpy(config.mq_qname, "lhoste/tracking/test"); strcpy(config.mq_qname, "lhoste/tracking/test");
config.n_mq_uris = 2;
config.mq_uri = (char **) malloc(2 * sizeof(char**));
AN(config.mq_uri);
config.mq_uri[0] = (char *) malloc(strlen("tcp://localhost:61616") + 1);
AN(config.mq_uri[0]);
strcpy(config.mq_uri[0], "tcp://localhost:61616");
config.mq_uri[1] = (char *)
malloc(strlen("tcp://localhost:61616?connection.sendTimeout=1000") + 1);
AN(config.mq_uri[1]);
strcpy(config.mq_uri[1],
"tcp://localhost:61616?connection.sendTimeout=1000");
error = MQ_GlobalInit(); error = MQ_GlobalInit();
sprintf(errmsg, "MQ_GlobalInit failed: %s", error); sprintf(errmsg, "MQ_GlobalInit failed: %s", error);
......
...@@ -151,7 +151,7 @@ int spmcq_datawaiter; ...@@ -151,7 +151,7 @@ int spmcq_datawaiter;
/* mq.c */ /* mq.c */
const char *MQ_GlobalInit(void); const char *MQ_GlobalInit(void);
const char *MQ_WorkerInit(void **priv); const char *MQ_WorkerInit(void **priv, char *uri);
const char *MQ_Send(void *priv, const char *data, unsigned len); const char *MQ_Send(void *priv, const char *data, unsigned len);
const char *MQ_Version(void *priv, char *version); const char *MQ_Version(void *priv, char *version);
const char *MQ_WorkerShutdown(void **priv); const char *MQ_WorkerShutdown(void **priv);
...@@ -305,7 +305,8 @@ struct config { ...@@ -305,7 +305,8 @@ struct config {
unsigned hash_mlt; unsigned hash_mlt;
#define DEF_HASH_MTL 5 #define DEF_HASH_MTL 5
char mq_uri[BUFSIZ]; unsigned n_mq_uris;
char **mq_uri;
char mq_qname[BUFSIZ]; char mq_qname[BUFSIZ];
unsigned nworkers; unsigned nworkers;
unsigned restarts; unsigned restarts;
......
...@@ -137,14 +137,15 @@ static void ...@@ -137,14 +137,15 @@ static void
void *amq_worker; void *amq_worker;
dataentry *entry; dataentry *entry;
const char *err; const char *err;
char version[VERSION_LEN]; char version[VERSION_LEN], *uri;
LOG_Log(LOG_INFO, "Worker %d: starting", wrk->id); LOG_Log(LOG_INFO, "Worker %d: starting", wrk->id);
CHECK_OBJ_NOTNULL(wrk, WORKER_DATA_MAGIC); CHECK_OBJ_NOTNULL(wrk, WORKER_DATA_MAGIC);
wrk->state = WRK_INITIALIZING; wrk->state = WRK_INITIALIZING;
wrk->tid = pthread_self(); wrk->tid = pthread_self();
uri = config.mq_uri[wrk->id % config.n_mq_uris];
err = MQ_WorkerInit(&amq_worker); err = MQ_WorkerInit(&amq_worker, uri);
if (err != NULL) { if (err != NULL) {
LOG_Log(LOG_ALERT, "Worker %d: Cannot initialize queue connection: %s", LOG_Log(LOG_ALERT, "Worker %d: Cannot initialize queue connection: %s",
wrk->id, err); wrk->id, err);
...@@ -167,7 +168,7 @@ static void ...@@ -167,7 +168,7 @@ static void
running++; running++;
AZ(pthread_mutex_unlock(&running_lock)); AZ(pthread_mutex_unlock(&running_lock));
LOG_Log(LOG_INFO, "Worker %d: running (%s)", wrk->id, version); LOG_Log(LOG_INFO, "Worker %d: running (%s, uri=%s)", wrk->id, version, uri);
while (run) { while (run) {
entry = (dataentry *) SPMCQ_Deq(); entry = (dataentry *) SPMCQ_Deq();
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment