Commit c3cd67d6 authored by Geoff Simmons's avatar Geoff Simmons

trackrdrd version 3 of the MQ interface: MQ_WorkerInit() supplies the

worker number as a parameter
parent 6e631b41
...@@ -31,8 +31,10 @@ ...@@ -31,8 +31,10 @@
/** /**
* \file mq.h * \file mq.h
* \brief MQ -- the messaging interface for the Varnish log tracking * \brief MQ messaging interface for trackrdrd
* reader. * \details MQ -- the messaging interface for the Varnish log tracking
* reader
* \version 3
* *
* This header defines the interface to a messaging system, such as * This header defines the interface to a messaging system, such as
* ActiveMQ or Kafka, used by the tracking reader. It is responsible for * ActiveMQ or Kafka, used by the tracking reader. It is responsible for
...@@ -128,9 +130,11 @@ const char *MQ_InitConnections(void); ...@@ -128,9 +130,11 @@ const char *MQ_InitConnections(void);
* @param priv pointer to a private object handle. The implementation is * @param priv pointer to a private object handle. The implementation is
* expected to place a pointer to its private data structure in this * expected to place a pointer to its private data structure in this
* location. * location.
* @param wrk_num the worker number, from 1 to the value of ``nworkers``
* supplied in ``MQ_GlobalInit()``, inclusive
* @return `NULL` on success, an error message on failure * @return `NULL` on success, an error message on failure
*/ */
const char *MQ_WorkerInit(void **priv); const char *MQ_WorkerInit(void **priv, int wrk_num);
/** /**
* Send data to the messaging system. * Send data to the messaging system.
......
...@@ -19,7 +19,7 @@ libtrackrdr_activemq_la_LIBADD = \ ...@@ -19,7 +19,7 @@ libtrackrdr_activemq_la_LIBADD = \
@APR_LIBS@ \ @APR_LIBS@ \
@APU_LIBS@ @APU_LIBS@
libtrackrdr_activemq_la_LDFLAGS = -version-info 2:0:0 libtrackrdr_activemq_la_LDFLAGS = -version-info 3:0:0
if HAVE_RST2MAN if HAVE_RST2MAN
dist_man_MANS = libtrackrdr-activemq.3 dist_man_MANS = libtrackrdr-activemq.3
......
...@@ -33,7 +33,6 @@ ...@@ -33,7 +33,6 @@
#include <stdlib.h> #include <stdlib.h>
#include <errno.h> #include <errno.h>
#include <string.h> #include <string.h>
#include <pthread.h>
#include <assert.h> #include <assert.h>
#include "mq.h" #include "mq.h"
...@@ -43,7 +42,6 @@ ...@@ -43,7 +42,6 @@
static AMQ_Connection **connections; static AMQ_Connection **connections;
static AMQ_Worker **workers; static AMQ_Worker **workers;
static pthread_mutex_t connection_lock = PTHREAD_MUTEX_INITIALIZER;
static unsigned connection = 0; static unsigned connection = 0;
static unsigned nwrk = 0; static unsigned nwrk = 0;
...@@ -113,29 +111,25 @@ MQ_InitConnections(void) ...@@ -113,29 +111,25 @@ MQ_InitConnections(void)
} }
const char * const char *
MQ_WorkerInit(void **priv) MQ_WorkerInit(void **priv, int wrk_num)
{ {
int i, ret;
const char *err = NULL; const char *err = NULL;
ret = pthread_mutex_lock(&connection_lock); assert(wrk_num >= 1 && wrk_num <= nwrk);
assert(ret == 0); wrk_num--;
i = connection++ % nwrk; AMQ_Connection *conn = connections[wrk_num];
ret = pthread_mutex_unlock(&connection_lock);
assert(ret == 0);
AMQ_Connection *conn = connections[i];
if (conn == NULL) { if (conn == NULL) {
err = AMQ_ConnectionInit(&conn, uri[i % n_uris]); err = AMQ_ConnectionInit(&conn, uri[wrk_num % n_uris]);
if (err != NULL) if (err != NULL)
return err; return err;
else else
connections[i] = conn; connections[wrk_num] = conn;
} }
err = AMQ_WorkerInit((AMQ_Worker **) priv, conn, qname, i); err = AMQ_WorkerInit((AMQ_Worker **) priv, conn, qname, wrk_num);
if (err == NULL) if (err == NULL)
workers[i] = (AMQ_Worker *) *priv; workers[wrk_num] = (AMQ_Worker *) *priv;
else else
workers[i] = NULL; workers[wrk_num] = NULL;
return err; return err;
} }
......
...@@ -81,7 +81,7 @@ static const char ...@@ -81,7 +81,7 @@ static const char
printf("... testing ActiveMQ worker init\n"); printf("... testing ActiveMQ worker init\n");
err = MQ_WorkerInit(&worker); err = MQ_WorkerInit(&worker, NWORKERS);
VMASSERT(err == NULL, "MQ_WorkerInit: %s", err); VMASSERT(err == NULL, "MQ_WorkerInit: %s", err);
MASSERT0(worker != NULL, "Worker is NULL after MQ_WorkerInit"); MASSERT0(worker != NULL, "Worker is NULL after MQ_WorkerInit");
......
...@@ -43,6 +43,8 @@ ...@@ -43,6 +43,8 @@
#define MQ_MODULE "../mq/activemq/.libs/libtrackrdr-activemq.so" #define MQ_MODULE "../mq/activemq/.libs/libtrackrdr-activemq.so"
#define MQ_CONFIG "activemq.conf" #define MQ_CONFIG "activemq.conf"
#define NWORKERS 1
int tests_run = 0; int tests_run = 0;
static char errmsg[BUFSIZ]; static char errmsg[BUFSIZ];
static void *mqh; static void *mqh;
...@@ -87,7 +89,7 @@ static char ...@@ -87,7 +89,7 @@ static char
printf("... testing MQ global initialization\n"); printf("... testing MQ global initialization\n");
config.nworkers = 1; config.nworkers = NWORKERS;
strcpy(config.mq_config_file, MQ_CONFIG); strcpy(config.mq_config_file, MQ_CONFIG);
err = mqf.global_init(config.nworkers, config.mq_config_file); err = mqf.global_init(config.nworkers, config.mq_config_file);
sprintf(errmsg, "MQ_GlobalInit: %s", err); sprintf(errmsg, "MQ_GlobalInit: %s", err);
...@@ -121,7 +123,7 @@ static const char ...@@ -121,7 +123,7 @@ static const char
printf("... test worker init\n"); printf("... test worker init\n");
err = mqf.worker_init(&worker); err = mqf.worker_init(&worker, NWORKERS);
sprintf(errmsg, "MQ_WorkerInit: %s", err); sprintf(errmsg, "MQ_WorkerInit: %s", err);
mu_assert(errmsg, err == NULL); mu_assert(errmsg, err == NULL);
......
...@@ -43,7 +43,7 @@ ...@@ -43,7 +43,7 @@
/* message queue methods, typedefs match the interface in mq.h */ /* message queue methods, typedefs match the interface in mq.h */
typedef const char *global_init_f(unsigned nworkers, const char *config_fname); typedef const char *global_init_f(unsigned nworkers, const char *config_fname);
typedef const char *init_connections_f(void); typedef const char *init_connections_f(void);
typedef const char *worker_init_f(void **priv); typedef const char *worker_init_f(void **priv, int wrk_num);
typedef const char *send_f(void *priv, const char *data, unsigned len, typedef const char *send_f(void *priv, const char *data, unsigned len,
const char *key, unsigned keylen); const char *key, unsigned keylen);
typedef const char *version_f(void *priv, char *version); typedef const char *version_f(void *priv, char *version);
......
...@@ -190,7 +190,7 @@ static void ...@@ -190,7 +190,7 @@ static void
CHECK_OBJ_NOTNULL(wrk, WORKER_DATA_MAGIC); CHECK_OBJ_NOTNULL(wrk, WORKER_DATA_MAGIC);
wrk->state = WRK_INITIALIZING; wrk->state = WRK_INITIALIZING;
err = mqf.worker_init(&amq_worker); err = mqf.worker_init(&amq_worker, wrk->id);
if (err != NULL) { if (err != NULL) {
LOG_Log(LOG_ALERT, "Worker %d: Cannot initialize queue connection: %s", LOG_Log(LOG_ALERT, "Worker %d: Cannot initialize queue connection: %s",
wrk->id, err); wrk->id, err);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment