Commit c433051c authored by Geoff Simmons's avatar Geoff Simmons

MQ interface version 4 -- pass in the worker number in MQ_WorkerShutdown()

(because the ActiveMQ implementation was crashing when it tried to look
up the number itself)
parent 0d7417df
...@@ -34,7 +34,7 @@ ...@@ -34,7 +34,7 @@
* \brief MQ messaging interface for trackrdrd * \brief MQ messaging interface for trackrdrd
* \details MQ -- the messaging interface for the Varnish log tracking * \details MQ -- the messaging interface for the Varnish log tracking
* reader * reader
* \version 3 * \version 4
* *
* This header defines the interface to a messaging system, such as * This header defines the interface to a messaging system, such as
* ActiveMQ or Kafka, used by the tracking reader. It is responsible for * ActiveMQ or Kafka, used by the tracking reader. It is responsible for
...@@ -236,9 +236,11 @@ const char *MQ_Reconnect(void **priv); ...@@ -236,9 +236,11 @@ const char *MQ_Reconnect(void **priv);
* this method (so it may, for example, be set to `NULL`). * this method (so it may, for example, be set to `NULL`).
* *
* @param priv pointer to the private object handle * @param priv pointer to the private object handle
* @param wrk_num worker number, the same value passed in the call
* to MQ_Init() when this object was initialized
* @return `NULL` on success, an error message on failure * @return `NULL` on success, an error message on failure
*/ */
const char *MQ_WorkerShutdown(void **priv); const char *MQ_WorkerShutdown(void **priv, int wrk_num);
/** /**
* Globally shut down the messaging implementation * Globally shut down the messaging implementation
......
...@@ -19,7 +19,7 @@ libtrackrdr_activemq_la_LIBADD = \ ...@@ -19,7 +19,7 @@ libtrackrdr_activemq_la_LIBADD = \
@APR_LIBS@ \ @APR_LIBS@ \
@APU_LIBS@ @APU_LIBS@
libtrackrdr_activemq_la_LDFLAGS = -version-info 3:0:0 libtrackrdr_activemq_la_LDFLAGS = -version-info 4:0:0
if HAVE_RST2MAN if HAVE_RST2MAN
dist_man_MANS = libtrackrdr-activemq.3 dist_man_MANS = libtrackrdr-activemq.3
......
...@@ -195,14 +195,11 @@ MQ_ClientID(void *priv, char *clientID) ...@@ -195,14 +195,11 @@ MQ_ClientID(void *priv, char *clientID)
} }
const char * const char *
MQ_WorkerShutdown(void **priv) MQ_WorkerShutdown(void **priv, int wrk_num)
{ {
const char *err; const char *err;
int wrk_num;
err = AMQ_GetNum((AMQ_Worker *) *priv, &wrk_num); wrk_num--;
if (err != NULL)
return err;
assert(wrk_num >= 0 && wrk_num < nwrk); assert(wrk_num >= 0 && wrk_num < nwrk);
if (connections[wrk_num] != NULL) { if (connections[wrk_num] != NULL) {
err = AMQ_ConnectionShutdown(connections[wrk_num]); err = AMQ_ConnectionShutdown(connections[wrk_num]);
...@@ -210,6 +207,8 @@ MQ_WorkerShutdown(void **priv) ...@@ -210,6 +207,8 @@ MQ_WorkerShutdown(void **priv)
return err; return err;
connections[wrk_num] = NULL; connections[wrk_num] = NULL;
} }
if (workers[wrk_num] != (AMQ_Worker *) *priv)
return "AMQ worker handle not found in worker table";
AMQ_WorkerShutdown((AMQ_Worker **) priv); AMQ_WorkerShutdown((AMQ_Worker **) priv);
if (err != NULL) if (err != NULL)
return err; return err;
......
...@@ -163,7 +163,7 @@ static const char ...@@ -163,7 +163,7 @@ static const char
printf("... testing ActiveMQ worker shutdown\n"); printf("... testing ActiveMQ worker shutdown\n");
MASSERT0(worker != NULL, "MQ_Send: worker is NULL before call"); MASSERT0(worker != NULL, "MQ_Send: worker is NULL before call");
err = MQ_WorkerShutdown(&worker); err = MQ_WorkerShutdown(&worker, NWORKERS);
VMASSERT(err == NULL, "MQ_WorkerShutdown: %s", err); VMASSERT(err == NULL, "MQ_WorkerShutdown: %s", err);
MASSERT0(worker == NULL, "Worker not NULL after shutdown"); MASSERT0(worker == NULL, "Worker not NULL after shutdown");
......
INCLUDES = -I$(top_srcdir)/include INCLUDES = -I$(top_srcdir)/include
CURRENT = 3 CURRENT = 4
REVISION = 0 REVISION = 0
AGE = 0 AGE = 0
......
...@@ -378,10 +378,11 @@ MQ_ClientID(void *priv, char *clientID) ...@@ -378,10 +378,11 @@ MQ_ClientID(void *priv, char *clientID)
} }
const char * const char *
MQ_WorkerShutdown(void **priv) MQ_WorkerShutdown(void **priv, int wrk_num)
{ {
kafka_wrk_t *wrk; kafka_wrk_t *wrk;
(void) wrk_num;
CAST_OBJ_NOTNULL(wrk, *priv, KAFKA_WRK_MAGIC); CAST_OBJ_NOTNULL(wrk, *priv, KAFKA_WRK_MAGIC);
WRK_Fini(wrk); WRK_Fini(wrk);
*priv = NULL; *priv = NULL;
......
...@@ -202,7 +202,7 @@ static const char ...@@ -202,7 +202,7 @@ static const char
printf("... testing Kafka worker shutdown\n"); printf("... testing Kafka worker shutdown\n");
MASSERT0(worker != NULL, "MQ_Send: worker is NULL before call"); MASSERT0(worker != NULL, "MQ_Send: worker is NULL before call");
err = MQ_WorkerShutdown(&worker); err = MQ_WorkerShutdown(&worker, NWORKERS);
VMASSERT(err == NULL, "MQ_WorkerShutdown: %s", err); VMASSERT(err == NULL, "MQ_WorkerShutdown: %s", err);
MASSERT0(worker == NULL, "Worker not NULL after shutdown"); MASSERT0(worker == NULL, "Worker not NULL after shutdown");
......
...@@ -209,7 +209,7 @@ static const char ...@@ -209,7 +209,7 @@ static const char
printf("... testing worker shutdown\n"); printf("... testing worker shutdown\n");
mu_assert("MQ_WorkerShutdown: worker is NULL before call", worker != NULL); mu_assert("MQ_WorkerShutdown: worker is NULL before call", worker != NULL);
err = mqf.worker_shutdown(&worker); err = mqf.worker_shutdown(&worker, NWORKERS);
sprintf(errmsg, "MQ_WorkerShutdown: %s", err); sprintf(errmsg, "MQ_WorkerShutdown: %s", err);
mu_assert(errmsg, err == NULL); mu_assert(errmsg, err == NULL);
......
...@@ -49,7 +49,7 @@ typedef int send_f(void *priv, const char *data, unsigned len, ...@@ -49,7 +49,7 @@ typedef int send_f(void *priv, const char *data, unsigned len,
typedef const char *version_f(void *priv, char *version); typedef const char *version_f(void *priv, char *version);
typedef const char *client_id_f(void *priv, char *clientID); typedef const char *client_id_f(void *priv, char *clientID);
typedef const char *reconnect_f(void **priv); typedef const char *reconnect_f(void **priv);
typedef const char *worker_shutdown_f(void **priv); typedef const char *worker_shutdown_f(void **priv, int wrk_num);
typedef const char *global_shutdown_f(void); typedef const char *global_shutdown_f(void);
struct mqf { struct mqf {
......
...@@ -274,7 +274,7 @@ static void ...@@ -274,7 +274,7 @@ static void
wrk->status = EXIT_SUCCESS; wrk->status = EXIT_SUCCESS;
} }
err = mqf.worker_shutdown(&mq_worker); err = mqf.worker_shutdown(&mq_worker, wrk->id);
if (err != NULL) { if (err != NULL) {
LOG_Log(LOG_ALERT, "Worker %d: MQ worker shutdown failed: %s", LOG_Log(LOG_ALERT, "Worker %d: MQ worker shutdown failed: %s",
wrk->id, err); wrk->id, err);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment