Commit f5fa81b2 authored by Geoff Simmons's avatar Geoff Simmons

trackrdrd: added MQ_Version/AMQ_Worker::getVersion

parent 594bc60a
......@@ -29,10 +29,13 @@
*
*/
#include <string.h>
#include "amq.h"
#include <activemq/library/ActiveMQCPP.h>
#include <decaf/lang/exceptions/NullPointerException.h>
#include <cms/IllegalStateException.h>
#include <cms/ConnectionMetaData.h>
#define CATCHALL \
catch (CMSException& cex) { \
......@@ -114,6 +117,14 @@ AMQ_Worker::send(std::string& text) {
producer->send(msg);
}
std::string
AMQ_Worker::getVersion() {
if (connection == NULL)
throw cms::IllegalStateException("Connection uninitialized");
const ConnectionMetaData *md = connection->getMetaData();
return md->getCMSProviderName() + " " + md->getProviderVersion();
}
const char *
AMQ_GlobalInit(char *uri)
{
......@@ -153,6 +164,16 @@ AMQ_Send(AMQ_Worker *worker, const char *data, unsigned len)
CATCHALL
}
const char *
AMQ_Version(AMQ_Worker *worker, char *version)
{
try {
strcpy(version, worker->getVersion().c_str());
return NULL;
}
CATCHALL
}
const char *
AMQ_WorkerShutdown(AMQ_Worker **worker)
{
......
......@@ -61,6 +61,7 @@ public:
int deliveryMode);
virtual ~AMQ_Worker();
void send(std::string& text);
std::string getVersion();
};
#else
typedef struct AMQ_Worker AMQ_Worker;
......@@ -73,6 +74,7 @@ extern "C" {
const char *AMQ_GlobalInit(char *uri);
const char *AMQ_WorkerInit(AMQ_Worker **worker, char *qName);
const char *AMQ_Send(AMQ_Worker *worker, const char *data, unsigned len);
const char *AMQ_Version(AMQ_Worker *worker, char *version);
const char *AMQ_WorkerShutdown(AMQ_Worker **worker);
const char *AMQ_GlobalShutdown(void);
......
......@@ -50,6 +50,12 @@ MQ_Send(void *priv, const char *data, unsigned len)
return AMQ_Send((AMQ_Worker *) priv, data, len);
}
const char *
MQ_Version(void *priv, char *version)
{
return AMQ_Version((AMQ_Worker *) priv, version);
}
const char *
MQ_WorkerShutdown(void **priv)
{
......
......@@ -79,6 +79,23 @@ static const char
return NULL;
}
static const char
*test_version(void)
{
const char *err;
char version[BUFSIZ];
printf("... testing version info\n");
mu_assert("MQ_Version: worker is NULL before call", worker != NULL);
err = MQ_Version(worker, version);
sprintf(errmsg, "MQ_Version: %s", err);
mu_assert(errmsg, err == NULL);
mu_assert("MQ_Version: version is empty", version[0] != '\0');
return NULL;
}
static const char
*test_send(void)
{
......@@ -132,6 +149,7 @@ static const char
{
mu_run_test(test_global_init);
mu_run_test(test_worker_init);
mu_run_test(test_version);
mu_run_test(test_send);
mu_run_test(test_worker_shutdown);
mu_run_test(test_global_shutdown);
......
......@@ -30,6 +30,7 @@
*/
#include <string.h>
#include <stdbool.h>
#include "minunit.h"
......@@ -80,7 +81,7 @@ static char
return NULL;
}
static char
static const char
*test_worker_run(void)
{
dataentry *entry;
......@@ -91,6 +92,15 @@ static char
unsigned xid = (unsigned int) lrand48();
WRK_Start();
int wrk_running, wrk_wait = 0;
while ((wrk_running = WRK_Running()) < NWORKERS) {
if (wrk_wait++ > 10)
break;
TIM_sleep(1);
}
sprintf(errmsg, "%d of %d worker threads running", wrk_running, NWORKERS);
mu_assert(errmsg, wrk_running == NWORKERS);
for (int i = 0; i < 1024; i++) {
entry = &dtbl.entry[i];
CHECK_OBJ_NOTNULL(entry, DATA_MAGIC);
......@@ -98,7 +108,7 @@ static char
sprintf(entry->data, "XID=%d&foo=bar&baz=quux&record=%d", xid, i+1);
entry->end = strlen(entry->data);
entry->state = DATA_DONE;
assert(SPMCQ_Enq(entry));
mu_assert("SPMCQ full", SPMCQ_Enq(entry) == true);
}
WRK_Halt();
......
......@@ -153,6 +153,7 @@ int spmcq_datawaiter;
const char *MQ_GlobalInit(void);
const char *MQ_WorkerInit(void **priv);
const char *MQ_Send(void *priv, const char *data, unsigned len);
const char *MQ_Version(void *priv, char *version);
const char *MQ_WorkerShutdown(void **priv);
const char *MQ_GlobalShutdown(void);
......
......@@ -39,6 +39,8 @@
#include "vas.h"
#include "miniobj.h"
#define VERSION_LEN 64
static int running = 0;
typedef enum {
......@@ -135,6 +137,7 @@ static void
void *amq_worker;
dataentry *entry;
const char *err;
char version[VERSION_LEN];
LOG_Log(LOG_INFO, "Worker %d: starting", wrk->id);
CHECK_OBJ_NOTNULL(wrk, WORKER_DATA_MAGIC);
......@@ -150,6 +153,12 @@ static void
pthread_exit((void *) wrk);
}
err = MQ_Version(amq_worker, version);
if (err != NULL) {
LOG_Log(LOG_ERR, "Worker %d: Failed to get MQ version", wrk->id, err);
version[0] = '\0';
}
VSTAILQ_INIT(&wrk->wrk_freelist);
wrk->wrk_nfree = 0;
......@@ -157,6 +166,8 @@ static void
AZ(pthread_mutex_lock(&running_lock));
running++;
AZ(pthread_mutex_unlock(&running_lock));
LOG_Log(LOG_INFO, "Worker %d: running (%s)", wrk->id, version);
while (run) {
entry = (dataentry *) SPMCQ_Deq();
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment