Commit 1a6a3aa3 authored by Geoff Simmons's avatar Geoff Simmons

trackrdrd: added MQ_Version/AMQ_Worker::getVersion

parent d0fd842e
...@@ -29,10 +29,13 @@ ...@@ -29,10 +29,13 @@
* *
*/ */
#include <string.h>
#include "amq.h" #include "amq.h"
#include <activemq/library/ActiveMQCPP.h> #include <activemq/library/ActiveMQCPP.h>
#include <decaf/lang/exceptions/NullPointerException.h> #include <decaf/lang/exceptions/NullPointerException.h>
#include <cms/IllegalStateException.h> #include <cms/IllegalStateException.h>
#include <cms/ConnectionMetaData.h>
#define CATCHALL \ #define CATCHALL \
catch (CMSException& cex) { \ catch (CMSException& cex) { \
...@@ -114,6 +117,14 @@ AMQ_Worker::send(std::string& text) { ...@@ -114,6 +117,14 @@ AMQ_Worker::send(std::string& text) {
producer->send(msg); producer->send(msg);
} }
std::string
AMQ_Worker::getVersion() {
if (connection == NULL)
throw cms::IllegalStateException("Connection uninitialized");
const ConnectionMetaData *md = connection->getMetaData();
return md->getCMSProviderName() + " " + md->getProviderVersion();
}
const char * const char *
AMQ_GlobalInit(char *uri) AMQ_GlobalInit(char *uri)
{ {
...@@ -153,6 +164,16 @@ AMQ_Send(AMQ_Worker *worker, const char *data, unsigned len) ...@@ -153,6 +164,16 @@ AMQ_Send(AMQ_Worker *worker, const char *data, unsigned len)
CATCHALL CATCHALL
} }
const char *
AMQ_Version(AMQ_Worker *worker, char *version)
{
try {
strcpy(version, worker->getVersion().c_str());
return NULL;
}
CATCHALL
}
const char * const char *
AMQ_WorkerShutdown(AMQ_Worker **worker) AMQ_WorkerShutdown(AMQ_Worker **worker)
{ {
......
...@@ -61,6 +61,7 @@ public: ...@@ -61,6 +61,7 @@ public:
int deliveryMode); int deliveryMode);
virtual ~AMQ_Worker(); virtual ~AMQ_Worker();
void send(std::string& text); void send(std::string& text);
std::string getVersion();
}; };
#else #else
typedef struct AMQ_Worker AMQ_Worker; typedef struct AMQ_Worker AMQ_Worker;
...@@ -73,6 +74,7 @@ extern "C" { ...@@ -73,6 +74,7 @@ extern "C" {
const char *AMQ_GlobalInit(char *uri); const char *AMQ_GlobalInit(char *uri);
const char *AMQ_WorkerInit(AMQ_Worker **worker, char *qName); const char *AMQ_WorkerInit(AMQ_Worker **worker, char *qName);
const char *AMQ_Send(AMQ_Worker *worker, const char *data, unsigned len); const char *AMQ_Send(AMQ_Worker *worker, const char *data, unsigned len);
const char *AMQ_Version(AMQ_Worker *worker, char *version);
const char *AMQ_WorkerShutdown(AMQ_Worker **worker); const char *AMQ_WorkerShutdown(AMQ_Worker **worker);
const char *AMQ_GlobalShutdown(void); const char *AMQ_GlobalShutdown(void);
......
...@@ -50,6 +50,12 @@ MQ_Send(void *priv, const char *data, unsigned len) ...@@ -50,6 +50,12 @@ MQ_Send(void *priv, const char *data, unsigned len)
return AMQ_Send((AMQ_Worker *) priv, data, len); return AMQ_Send((AMQ_Worker *) priv, data, len);
} }
const char *
MQ_Version(void *priv, char *version)
{
return AMQ_Version((AMQ_Worker *) priv, version);
}
const char * const char *
MQ_WorkerShutdown(void **priv) MQ_WorkerShutdown(void **priv)
{ {
......
...@@ -79,6 +79,23 @@ static const char ...@@ -79,6 +79,23 @@ static const char
return NULL; return NULL;
} }
static const char
*test_version(void)
{
const char *err;
char version[BUFSIZ];
printf("... testing version info\n");
mu_assert("MQ_Version: worker is NULL before call", worker != NULL);
err = MQ_Version(worker, version);
sprintf(errmsg, "MQ_Version: %s", err);
mu_assert(errmsg, err == NULL);
mu_assert("MQ_Version: version is empty", version[0] != '\0');
return NULL;
}
static const char static const char
*test_send(void) *test_send(void)
{ {
...@@ -132,6 +149,7 @@ static const char ...@@ -132,6 +149,7 @@ static const char
{ {
mu_run_test(test_global_init); mu_run_test(test_global_init);
mu_run_test(test_worker_init); mu_run_test(test_worker_init);
mu_run_test(test_version);
mu_run_test(test_send); mu_run_test(test_send);
mu_run_test(test_worker_shutdown); mu_run_test(test_worker_shutdown);
mu_run_test(test_global_shutdown); mu_run_test(test_global_shutdown);
......
...@@ -30,6 +30,7 @@ ...@@ -30,6 +30,7 @@
*/ */
#include <string.h> #include <string.h>
#include <stdbool.h>
#include "minunit.h" #include "minunit.h"
...@@ -80,7 +81,7 @@ static char ...@@ -80,7 +81,7 @@ static char
return NULL; return NULL;
} }
static char static const char
*test_worker_run(void) *test_worker_run(void)
{ {
dataentry *entry; dataentry *entry;
...@@ -91,6 +92,15 @@ static char ...@@ -91,6 +92,15 @@ static char
unsigned xid = (unsigned int) lrand48(); unsigned xid = (unsigned int) lrand48();
WRK_Start(); WRK_Start();
int wrk_running, wrk_wait = 0;
while ((wrk_running = WRK_Running()) < NWORKERS) {
if (wrk_wait++ > 10)
break;
TIM_sleep(1);
}
sprintf(errmsg, "%d of %d worker threads running", wrk_running, NWORKERS);
mu_assert(errmsg, wrk_running == NWORKERS);
for (int i = 0; i < 1024; i++) { for (int i = 0; i < 1024; i++) {
entry = &dtbl.entry[i]; entry = &dtbl.entry[i];
CHECK_OBJ_NOTNULL(entry, DATA_MAGIC); CHECK_OBJ_NOTNULL(entry, DATA_MAGIC);
...@@ -98,7 +108,7 @@ static char ...@@ -98,7 +108,7 @@ static char
sprintf(entry->data, "XID=%d&foo=bar&baz=quux&record=%d", xid, i+1); sprintf(entry->data, "XID=%d&foo=bar&baz=quux&record=%d", xid, i+1);
entry->end = strlen(entry->data); entry->end = strlen(entry->data);
entry->state = DATA_DONE; entry->state = DATA_DONE;
assert(SPMCQ_Enq(entry)); mu_assert("SPMCQ full", SPMCQ_Enq(entry) == true);
} }
WRK_Halt(); WRK_Halt();
......
...@@ -153,6 +153,7 @@ int spmcq_datawaiter; ...@@ -153,6 +153,7 @@ int spmcq_datawaiter;
const char *MQ_GlobalInit(void); const char *MQ_GlobalInit(void);
const char *MQ_WorkerInit(void **priv); const char *MQ_WorkerInit(void **priv);
const char *MQ_Send(void *priv, const char *data, unsigned len); const char *MQ_Send(void *priv, const char *data, unsigned len);
const char *MQ_Version(void *priv, char *version);
const char *MQ_WorkerShutdown(void **priv); const char *MQ_WorkerShutdown(void **priv);
const char *MQ_GlobalShutdown(void); const char *MQ_GlobalShutdown(void);
......
...@@ -39,6 +39,8 @@ ...@@ -39,6 +39,8 @@
#include "vas.h" #include "vas.h"
#include "miniobj.h" #include "miniobj.h"
#define VERSION_LEN 64
static int running = 0; static int running = 0;
typedef enum { typedef enum {
...@@ -135,6 +137,7 @@ static void ...@@ -135,6 +137,7 @@ static void
void *amq_worker; void *amq_worker;
dataentry *entry; dataentry *entry;
const char *err; const char *err;
char version[VERSION_LEN];
LOG_Log(LOG_INFO, "Worker %d: starting", wrk->id); LOG_Log(LOG_INFO, "Worker %d: starting", wrk->id);
CHECK_OBJ_NOTNULL(wrk, WORKER_DATA_MAGIC); CHECK_OBJ_NOTNULL(wrk, WORKER_DATA_MAGIC);
...@@ -150,6 +153,12 @@ static void ...@@ -150,6 +153,12 @@ static void
pthread_exit((void *) wrk); pthread_exit((void *) wrk);
} }
err = MQ_Version(amq_worker, version);
if (err != NULL) {
LOG_Log(LOG_ERR, "Worker %d: Failed to get MQ version", wrk->id, err);
version[0] = '\0';
}
VSTAILQ_INIT(&wrk->wrk_freelist); VSTAILQ_INIT(&wrk->wrk_freelist);
wrk->wrk_nfree = 0; wrk->wrk_nfree = 0;
...@@ -157,6 +166,8 @@ static void ...@@ -157,6 +166,8 @@ static void
AZ(pthread_mutex_lock(&running_lock)); AZ(pthread_mutex_lock(&running_lock));
running++; running++;
AZ(pthread_mutex_unlock(&running_lock)); AZ(pthread_mutex_unlock(&running_lock));
LOG_Log(LOG_INFO, "Worker %d: running (%s)", wrk->id, version);
while (run) { while (run) {
entry = (dataentry *) SPMCQ_Deq(); entry = (dataentry *) SPMCQ_Deq();
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment