Commit ae1946e3 authored by Geoff Simmons's avatar Geoff Simmons

trackrdrd ActiveMQ MQ implementation: bugfix MQ_Reconnect by managing

worker and connection objects correctly
parent 3da6fdab
......@@ -58,11 +58,12 @@ using namespace cms;
using namespace decaf::lang;
using namespace decaf::lang::exceptions;
AMQ_Worker::AMQ_Worker(Connection* cn, std::string& qName,
AMQ_Worker::AMQ_Worker(Connection* cn, std::string& qName, int n,
Session::AcknowledgeMode ackMode = Session::AUTO_ACKNOWLEDGE,
int deliveryMode = DeliveryMode::NON_PERSISTENT) {
connection = cn;
num = n;
session = connection->createSession(ackMode);
queue = session->createQueue(qName);
producer = session->createProducer(queue);
......@@ -86,6 +87,11 @@ AMQ_Worker::~AMQ_Worker() {
}
}
int
AMQ_Worker::getNum() {
return num;
}
void
AMQ_Worker::send(std::string& text) {
if (msg == NULL || producer == NULL)
......@@ -120,12 +126,12 @@ AMQ_GlobalInit(void)
}
const char *
AMQ_WorkerInit(AMQ_Worker **worker, AMQ_Connection *cn, char *qName)
AMQ_WorkerInit(AMQ_Worker **worker, AMQ_Connection *cn, char *qName, int n)
{
try {
Connection *conn = cn->getConnection();
string queueName (qName);
std::auto_ptr<AMQ_Worker> w (new AMQ_Worker(conn, queueName));
std::auto_ptr<AMQ_Worker> w (new AMQ_Worker(conn, queueName, n));
*worker = w.release();
return NULL;
}
......@@ -167,6 +173,16 @@ AMQ_ClientID(AMQ_Worker *worker, char *clientID)
CATCHALL
}
const char *
AMQ_GetNum(AMQ_Worker *worker, int *n)
{
try {
*n = worker->getNum();
return NULL;
}
CATCHALL
}
const char *
AMQ_WorkerShutdown(AMQ_Worker **worker)
{
......
......@@ -52,13 +52,15 @@ private:
Queue* queue;
MessageProducer* producer;
TextMessage* msg;
int num;
AMQ_Worker() {};
public:
static void shutdown();
AMQ_Worker(Connection* connection, std::string& qName,
AMQ_Worker(Connection* connection, std::string& qName, int n,
Session::AcknowledgeMode ackMode, int deliveryMode);
int getNum();
virtual ~AMQ_Worker();
void send(std::string& text);
std::string getVersion();
......@@ -74,10 +76,11 @@ extern "C" {
const char *AMQ_GlobalInit(void);
const char *AMQ_WorkerInit(AMQ_Worker **worker, AMQ_Connection *connection,
char *qName);
char *qName, int num);
const char *AMQ_Send(AMQ_Worker *worker, const char *data, unsigned len);
const char *AMQ_Version(AMQ_Worker *worker, char *version);
const char *AMQ_ClientID(AMQ_Worker *worker, char *clientID);
const char *AMQ_GetNum(AMQ_Worker *worker, int *num);
const char *AMQ_WorkerShutdown(AMQ_Worker **worker);
const char *AMQ_GlobalShutdown(void);
......
......@@ -124,14 +124,18 @@ MQ_WorkerInit(void **priv)
ret = pthread_mutex_unlock(&connection_lock);
assert(ret == 0);
AMQ_Connection *conn = connections[i];
if (conn == NULL)
if (conn == NULL) {
err = AMQ_ConnectionInit(&conn, uri[i % n_uris]);
if (err != NULL)
return err;
connections[i] = conn;
err = AMQ_WorkerInit((AMQ_Worker **) priv, conn, qname);
if (err != NULL)
return err;
else
connections[i] = conn;
}
err = AMQ_WorkerInit((AMQ_Worker **) priv, conn, qname, i);
if (err == NULL)
workers[i] = (AMQ_Worker *) *priv;
else
workers[i] = NULL;
return err;
}
......@@ -148,23 +152,32 @@ MQ_Reconnect(void **priv)
AMQ_Connection *conn;
int wrk_num;
err = AMQ_GetNum((AMQ_Worker *) priv, &wrk_num);
if (err != NULL)
return err;
assert(wrk_num >= 0 && wrk_num < nwrk);
err = AMQ_WorkerShutdown((AMQ_Worker **) priv);
if (err != NULL)
return err;
for (int i = 0; i < nwrk; i++)
if (workers[i] == (AMQ_Worker *) *priv) {
wrk_num = i;
break;
}
err = AMQ_ConnectionInit(&conn,
uri[connection++ % n_uris]);
if (err != NULL) {
else
workers[wrk_num] = NULL;
if (connections[wrk_num] != NULL) {
err = AMQ_ConnectionShutdown(connections[wrk_num]);
if (err != NULL)
return err;
connections[wrk_num] = NULL;
return err;
}
err = AMQ_ConnectionInit(&conn, uri[connection++ % n_uris]);
if (err != NULL)
return err;
else
connections[wrk_num] = conn;
return AMQ_WorkerInit((AMQ_Worker **) priv, conn, qname);
err = AMQ_WorkerInit((AMQ_Worker **) priv, conn, qname, wrk_num);
if (err != NULL)
workers[wrk_num] = NULL;
else
workers[wrk_num] = (AMQ_Worker *) *priv;
return err;
}
const char *
......@@ -182,23 +195,41 @@ MQ_ClientID(void *priv, char *clientID)
const char *
MQ_WorkerShutdown(void **priv)
{
const char *err = AMQ_WorkerShutdown((AMQ_Worker **) priv);
const char *err;
int wrk_num;
err = AMQ_GetNum((AMQ_Worker *) *priv, &wrk_num);
if (err != NULL)
return err;
if (connections[wrk_num] != NULL) {
err = AMQ_ConnectionShutdown(connections[wrk_num]);
if (err != NULL)
return err;
connections[wrk_num] = NULL;
}
AMQ_WorkerShutdown((AMQ_Worker **) priv);
if (err != NULL)
return err;
*priv = NULL;
workers[wrk_num] = NULL;
return NULL;
}
const char *
MQ_GlobalShutdown(void)
{
if (n_uris > 0) {
const char *err;
for (int i = 0; i < n_uris; i++)
if (connections[i] != NULL
&& (err = AMQ_ConnectionShutdown(connections[i])) != NULL)
return err;
free(connections);
}
const char *err;
if (n_uris > 0)
free(uri);
for (int i = 0; i < nwrk; i++)
if (connections[i] != NULL
&& (err = AMQ_ConnectionShutdown(connections[i])) != NULL)
return err;
free(connections);
free(workers);
return AMQ_GlobalShutdown();
}
......@@ -135,6 +135,23 @@ static const char
return NULL;
}
static const char
*test_reconnect(void)
{
const char *err;
printf("... testing ActiveMQ reconnect\n");
MASSERT0(worker != NULL, "MQ_Reconnect: worker is NULL before call");
err = MQ_Reconnect(&worker);
VMASSERT(err == NULL, "MQ_Reconnect: %s", err);
MASSERT0(worker != NULL, "MQ_Reconnect: worker is NULL after call");
err = MQ_Send(worker, "send after reconnect", 20);
VMASSERT(err == NULL, "MQ_Send() fails after reconnect: %s", err);
return NULL;
}
static const char
*test_worker_shutdown(void)
{
......@@ -176,6 +193,7 @@ static const char
mu_run_test(test_version);
mu_run_test(test_clientID);
mu_run_test(test_send);
mu_run_test(test_reconnect);
mu_run_test(test_worker_shutdown);
mu_run_test(test_global_shutdown);
return NULL;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment