Commit 45e72d0c authored by Geoff Simmons's avatar Geoff Simmons

trackrdrd: allow configuration of multiple MQ URIs

parent 43054c9e
......@@ -57,26 +57,30 @@ using namespace cms;
using namespace decaf::lang;
using namespace decaf::lang::exceptions;
ActiveMQConnectionFactory* AMQ_Worker::factory = NULL;
void
AMQ_Worker::initConnectionFactory(const std::string& brokerURI) {
factory = new ActiveMQConnectionFactory(brokerURI);
}
std::vector<ActiveMQConnectionFactory*> AMQ_Worker::factories;
void
AMQ_Worker::shutdown() {
delete factory;
factory = NULL;
for (unsigned i = 0; i < factories.size(); i++)
delete factories[i];
factories.resize(0);
}
AMQ_Worker::AMQ_Worker(std::string& qName,
AMQ_Worker::AMQ_Worker(std::string& brokerURI, std::string& qName,
Session::AcknowledgeMode ackMode = Session::AUTO_ACKNOWLEDGE,
int deliveryMode = DeliveryMode::NON_PERSISTENT) {
if (factory == NULL)
throw cms::IllegalStateException("Connection factory not initialized");
ActiveMQConnectionFactory* factory = NULL;
for (unsigned i = 0; i < factories.size(); i++)
if (factories[i]->getBrokerURI().toString().compare(brokerURI) == 0) {
factory = factories[i];
break;
}
if (factory == NULL) {
factory = new ActiveMQConnectionFactory(brokerURI);
factories.push_back(factory);
}
connection = factory->createConnection();
connection->start();
session = connection->createSession(ackMode);
......@@ -100,15 +104,12 @@ AMQ_Worker::~AMQ_Worker() {
session = NULL;
}
if (connection != NULL) {
#if 0
connection->close();
#endif
delete connection;
connection = NULL;
}
}
/* XXX: Timeout */
void
AMQ_Worker::send(std::string& text) {
if (msg == NULL || producer == NULL)
......@@ -126,23 +127,22 @@ AMQ_Worker::getVersion() {
}
const char *
AMQ_GlobalInit(char *uri)
AMQ_GlobalInit(void)
{
activemq::library::ActiveMQCPP::initializeLibrary();
try {
string brokerURI (uri);
AMQ_Worker::initConnectionFactory(brokerURI);
activemq::library::ActiveMQCPP::initializeLibrary();
return NULL;
}
CATCHALL
}
const char *
AMQ_WorkerInit(AMQ_Worker **worker, char *qName)
AMQ_WorkerInit(AMQ_Worker **worker, char *uri, char *qName)
{
try {
string brokerURI (uri);
string queueName (qName);
std::auto_ptr<AMQ_Worker> w (new AMQ_Worker(queueName));
std::auto_ptr<AMQ_Worker> w (new AMQ_Worker(brokerURI, queueName));
*worker = w.release();
return NULL;
}
......
......@@ -34,6 +34,8 @@
#ifdef __cplusplus
#include <vector>
#include <activemq/core/ActiveMQConnectionFactory.h>
#include <cms/Connection.h>
#include <cms/Session.h>
......@@ -45,7 +47,7 @@ using namespace cms;
class AMQ_Worker {
private:
static ActiveMQConnectionFactory* factory;
static std::vector<ActiveMQConnectionFactory*> factories;
Connection* connection;
Session* session;
Queue* queue;
......@@ -54,11 +56,10 @@ private:
AMQ_Worker() {};
public:
static void initConnectionFactory(const std::string& brokerURI);
static void shutdown();
AMQ_Worker(std::string& qName, Session::AcknowledgeMode ackMode,
int deliveryMode);
AMQ_Worker(std::string& brokerURI, std::string& qName,
Session::AcknowledgeMode ackMode, int deliveryMode);
virtual ~AMQ_Worker();
void send(std::string& text);
std::string getVersion();
......@@ -71,8 +72,8 @@ typedef struct AMQ_Worker AMQ_Worker;
extern "C" {
#endif
const char *AMQ_GlobalInit(char *uri);
const char *AMQ_WorkerInit(AMQ_Worker **worker, char *qName);
const char *AMQ_GlobalInit(void);
const char *AMQ_WorkerInit(AMQ_Worker **worker, char *uri, char *qName);
const char *AMQ_Send(AMQ_Worker *worker, const char *data, unsigned len);
const char *AMQ_Version(AMQ_Worker *worker, char *version);
const char *AMQ_WorkerShutdown(AMQ_Worker **worker);
......
......@@ -124,7 +124,6 @@ CONF_Add(const char *lval, const char *rval)
confString("log.file", log_file);
confString("varnish.bindump", varnish_bindump);
confString("processor.log", processor_log);
confString("mq.uri", mq_uri);
confString("mq.qname", mq_qname);
confUnsignedMinVal("maxopen.scale", maxopen_scale, MIN_MAXOPEN_SCALE);
......@@ -178,6 +177,19 @@ CONF_Add(const char *lval, const char *rval)
return(EINVAL);
}
if (strcmp(lval, "mq.uri") == 0) {
int n = config.n_mq_uris++;
config.mq_uri = (char **) realloc(config.mq_uri,
config.n_mq_uris * sizeof(char **));
if (config.mq_uri == NULL)
return(errno);
config.mq_uri[n] = (char *) malloc(strlen(rval) + 1);
if (config.mq_uri[n] == NULL)
return(errno);
strcpy(config.mq_uri[n], rval);
return(0);
}
return EINVAL;
}
......@@ -226,7 +238,9 @@ CONF_Init(void)
config.hash_ttl = DEF_HASH_TTL;
config.hash_mlt = DEF_HASH_MTL;
config.mq_uri[0] = '\0';
config.n_mq_uris = 0;
config.mq_uri = (char **) malloc (sizeof(char **));
AN(config.mq_uri);
config.mq_qname[0] = '\0';
config.nworkers = 1;
config.restarts = 1;
......@@ -326,8 +340,12 @@ CONF_Dump(void)
confdump("hash_ttl = %u", config.hash_ttl);
confdump("hash_mlt = %u", config.hash_mlt);
confdump("mq.uri = %s", config.mq_uri);
if (config.n_mq_uris > 0)
for (int i = 0; i < config.n_mq_uris; i++)
confdump("mq.uri = %s", config.mq_uri[i]);
else
LOG_Log0(LOG_DEBUG, "config: mq.uri = ");
confdump("mq.qname = %s", config.mq_qname);
confdump("nworkers = %u", config.nworkers);
confdump("restarts = %u", config.restarts);
......
......@@ -35,13 +35,13 @@
const char *
MQ_GlobalInit(void)
{
return AMQ_GlobalInit(config.mq_uri);
return AMQ_GlobalInit();
}
const char *
MQ_WorkerInit(void **priv)
MQ_WorkerInit(void **priv, char *uri)
{
return AMQ_WorkerInit((AMQ_Worker **) priv, config.mq_qname);
return AMQ_WorkerInit((AMQ_Worker **) priv, uri, config.mq_qname);
}
const char *
......
......@@ -50,7 +50,6 @@ static char
printf("... testing MQ global initialization\n");
strcpy(config.mq_uri, "tcp://localhost:61616");
err = MQ_GlobalInit();
sprintf(errmsg, "MQ_GlobalInit: %s", err);
mu_assert(errmsg, err == NULL);
......@@ -62,11 +61,13 @@ static const char
*test_worker_init(void)
{
const char *err;
char uri[sizeof("tcp://localhost:61616")];
printf("... test worker init (including connect to ActiveMQ)\n");
strcpy(uri, "tcp://localhost:61616");
strcpy(config.mq_qname, "lhoste/tracking/test");
err = MQ_WorkerInit(&worker);
err = MQ_WorkerInit(&worker, uri);
if (err != NULL && strstr(err, "Connection refused") != NULL) {
printf("Connection refused, ActiveMQ assumed not running\n");
exit(EXIT_SKIPPED);
......@@ -118,7 +119,7 @@ static const char
printf("... testing worker shutdown\n");
mu_assert("MQ_WorkerShhutdown: worker is NULL before call", worker != NULL);
mu_assert("MQ_WorkerShutdown: worker is NULL before call", worker != NULL);
err = MQ_WorkerShutdown(&worker);
sprintf(errmsg, "MQ_WorkerShutdown: %s", err);
mu_assert(errmsg, err == NULL);
......
......@@ -63,8 +63,19 @@ static char
config.maxopen_scale = 10;
config.maxdone_scale = 10;
config.nworkers = NWORKERS;
strcpy(config.mq_uri, "tcp://localhost:61616");
strcpy(config.mq_qname, "lhoste/tracking/test");
config.n_mq_uris = 2;
config.mq_uri = (char **) malloc(2 * sizeof(char**));
AN(config.mq_uri);
config.mq_uri[0] = (char *) malloc(strlen("tcp://localhost:61616") + 1);
AN(config.mq_uri[0]);
strcpy(config.mq_uri[0], "tcp://localhost:61616");
config.mq_uri[1] = (char *)
malloc(strlen("tcp://localhost:61616?connection.sendTimeout=1000") + 1);
AN(config.mq_uri[1]);
strcpy(config.mq_uri[1],
"tcp://localhost:61616?connection.sendTimeout=1000");
error = MQ_GlobalInit();
sprintf(errmsg, "MQ_GlobalInit failed: %s", error);
......
......@@ -151,7 +151,7 @@ int spmcq_datawaiter;
/* mq.c */
const char *MQ_GlobalInit(void);
const char *MQ_WorkerInit(void **priv);
const char *MQ_WorkerInit(void **priv, char *uri);
const char *MQ_Send(void *priv, const char *data, unsigned len);
const char *MQ_Version(void *priv, char *version);
const char *MQ_WorkerShutdown(void **priv);
......@@ -305,7 +305,8 @@ struct config {
unsigned hash_mlt;
#define DEF_HASH_MTL 5
char mq_uri[BUFSIZ];
unsigned n_mq_uris;
char **mq_uri;
char mq_qname[BUFSIZ];
unsigned nworkers;
unsigned restarts;
......
......@@ -137,14 +137,15 @@ static void
void *amq_worker;
dataentry *entry;
const char *err;
char version[VERSION_LEN];
char version[VERSION_LEN], *uri;
LOG_Log(LOG_INFO, "Worker %d: starting", wrk->id);
CHECK_OBJ_NOTNULL(wrk, WORKER_DATA_MAGIC);
wrk->state = WRK_INITIALIZING;
wrk->tid = pthread_self();
uri = config.mq_uri[wrk->id % config.n_mq_uris];
err = MQ_WorkerInit(&amq_worker);
err = MQ_WorkerInit(&amq_worker, uri);
if (err != NULL) {
LOG_Log(LOG_ALERT, "Worker %d: Cannot initialize queue connection: %s",
wrk->id, err);
......@@ -167,7 +168,7 @@ static void
running++;
AZ(pthread_mutex_unlock(&running_lock));
LOG_Log(LOG_INFO, "Worker %d: running (%s)", wrk->id, version);
LOG_Log(LOG_INFO, "Worker %d: running (%s, uri=%s)", wrk->id, version, uri);
while (run) {
entry = (dataentry *) SPMCQ_Deq();
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment