Commit 257c6695 authored by Geoff Simmons's avatar Geoff Simmons

trackrdrd: extended the MQ interface to allow the use of shard keys

(but the ActiveMQ implementation does not use sharding)
parent 97f8d464
...@@ -140,9 +140,12 @@ const char *MQ_WorkerInit(void **priv); ...@@ -140,9 +140,12 @@ const char *MQ_WorkerInit(void **priv);
* @param priv private object handle * @param priv private object handle
* @param data pointer to the data to be sent * @param data pointer to the data to be sent
* @param len length of the data in bytes * @param len length of the data in bytes
* @param key an optional sharding key for the messaging system
* @param keylen length of the sharding key
* @return `NULL` on success, an error message on failure * @return `NULL` on success, an error message on failure
*/ */
const char *MQ_Send(void *priv, const char *data, unsigned len); const char *MQ_Send(void *priv, const char *data, unsigned len,
const char *key, unsigned keylen);
/** /**
* Return the version string of the messaging system. * Return the version string of the messaging system.
......
...@@ -19,7 +19,7 @@ libtrackrdr_activemq_la_LIBADD = \ ...@@ -19,7 +19,7 @@ libtrackrdr_activemq_la_LIBADD = \
@APR_LIBS@ \ @APR_LIBS@ \
@APU_LIBS@ @APU_LIBS@
libtrackrdr_activemq_la_LDFLAGS = -version-info 1:0:0 libtrackrdr_activemq_la_LDFLAGS = -version-info 2:0:0
if HAVE_RST2MAN if HAVE_RST2MAN
dist_man_MANS = libtrackrdr-activemq.3 dist_man_MANS = libtrackrdr-activemq.3
......
...@@ -37,6 +37,9 @@ path is specified as ``mq.config_fname`` in the configuration of ...@@ -37,6 +37,9 @@ path is specified as ``mq.config_fname`` in the configuration of
``libactivemq-cpp``. The dynamic linker must also be able to find ``libactivemq-cpp``. The dynamic linker must also be able to find
``libactivemq-cpp.so`` at runtime. ``libactivemq-cpp.so`` at runtime.
This implementation does not use sharding keys; the key data in the
call to ``MQ_Send()`` are silently discarded.
BUILD/INSTALL BUILD/INSTALL
============= =============
......
...@@ -140,8 +140,13 @@ MQ_WorkerInit(void **priv) ...@@ -140,8 +140,13 @@ MQ_WorkerInit(void **priv)
} }
const char * const char *
MQ_Send(void *priv, const char *data, unsigned len) MQ_Send(void *priv, const char *data, unsigned len, const char *key,
unsigned keylen)
{ {
/* The ActiveMQ implementation does not use sharding. */
(void) key;
(void) keylen;
return AMQ_Send((AMQ_Worker *) priv, data, len); return AMQ_Send((AMQ_Worker *) priv, data, len);
} }
......
...@@ -129,7 +129,7 @@ static const char ...@@ -129,7 +129,7 @@ static const char
printf("... testing ActiveMQ message send\n"); printf("... testing ActiveMQ message send\n");
MASSERT0(worker != NULL, "MQ_Send: worker is NULL before call"); MASSERT0(worker != NULL, "MQ_Send: worker is NULL before call");
err = MQ_Send(worker, "foo bar baz quux", 16); err = MQ_Send(worker, "foo bar baz quux", 16, "key", 3);
VMASSERT(err == NULL, "MQ_Send: %s", err); VMASSERT(err == NULL, "MQ_Send: %s", err);
return NULL; return NULL;
...@@ -146,7 +146,7 @@ static const char ...@@ -146,7 +146,7 @@ static const char
err = MQ_Reconnect(&worker); err = MQ_Reconnect(&worker);
VMASSERT(err == NULL, "MQ_Reconnect: %s", err); VMASSERT(err == NULL, "MQ_Reconnect: %s", err);
MASSERT0(worker != NULL, "MQ_Reconnect: worker is NULL after call"); MASSERT0(worker != NULL, "MQ_Reconnect: worker is NULL after call");
err = MQ_Send(worker, "send after reconnect", 20); err = MQ_Send(worker, "send after reconnect", 20, "key", 3);
VMASSERT(err == NULL, "MQ_Send() fails after reconnect: %s", err); VMASSERT(err == NULL, "MQ_Send() fails after reconnect: %s", err);
return NULL; return NULL;
...@@ -165,7 +165,7 @@ static const char ...@@ -165,7 +165,7 @@ static const char
MASSERT0(worker == NULL, "Worker not NULL after shutdown"); MASSERT0(worker == NULL, "Worker not NULL after shutdown");
err = MQ_Send(worker, "foo bar baz quux", 16); err = MQ_Send(worker, "foo bar baz quux", 16, "key", 3);
MASSERT0(err != NULL, "No failure on MQ_Send after worker shutdown"); MASSERT0(err != NULL, "No failure on MQ_Send after worker shutdown");
return NULL; return NULL;
......
...@@ -172,7 +172,7 @@ static const char ...@@ -172,7 +172,7 @@ static const char
printf("... testing message send\n"); printf("... testing message send\n");
mu_assert("MQ_Send: worker is NULL before call", worker != NULL); mu_assert("MQ_Send: worker is NULL before call", worker != NULL);
err = mqf.send(worker, "foo bar baz quux", 16); err = mqf.send(worker, "foo bar baz quux", 16, "key", 3);
sprintf(errmsg, "MQ_Send: %s", err); sprintf(errmsg, "MQ_Send: %s", err);
mu_assert(errmsg, err == NULL); mu_assert(errmsg, err == NULL);
...@@ -190,7 +190,7 @@ static const char ...@@ -190,7 +190,7 @@ static const char
err = mqf.reconnect(&worker); err = mqf.reconnect(&worker);
VMASSERT(err == NULL, "MQ_Reconnect: %s", err); VMASSERT(err == NULL, "MQ_Reconnect: %s", err);
MASSERT0(worker != NULL, "MQ_Reconnect: worker is NULL after call"); MASSERT0(worker != NULL, "MQ_Reconnect: worker is NULL after call");
err = mqf.send(worker, "send after reconnect", 20); err = mqf.send(worker, "send after reconnect", 20, "key", 3);
VMASSERT(err == NULL, "MQ_Send() fails after reconnect: %s", err); VMASSERT(err == NULL, "MQ_Send() fails after reconnect: %s", err);
return NULL; return NULL;
...@@ -210,7 +210,7 @@ static const char ...@@ -210,7 +210,7 @@ static const char
mu_assert("Worker not NULL after shutdown", worker == NULL); mu_assert("Worker not NULL after shutdown", worker == NULL);
err = mqf.send(worker, "foo bar baz quux", 16); err = mqf.send(worker, "foo bar baz quux", 16, "key", 3);
mu_assert("No failure on MQ_Send after worker shutdown", err != NULL); mu_assert("No failure on MQ_Send after worker shutdown", err != NULL);
return NULL; return NULL;
......
...@@ -44,7 +44,8 @@ ...@@ -44,7 +44,8 @@
typedef const char *global_init_f(unsigned nworkers, const char *config_fname); typedef const char *global_init_f(unsigned nworkers, const char *config_fname);
typedef const char *init_connections_f(void); typedef const char *init_connections_f(void);
typedef const char *worker_init_f(void **priv); typedef const char *worker_init_f(void **priv);
typedef const char *send_f(void *priv, const char *data, unsigned len); typedef const char *send_f(void *priv, const char *data, unsigned len,
const char *key, unsigned keylen);
typedef const char *version_f(void *priv, char *version); typedef const char *version_f(void *priv, char *version);
typedef const char *client_id_f(void *priv, char *clientID); typedef const char *client_id_f(void *priv, char *clientID);
typedef const char *reconnect_f(void **priv); typedef const char *reconnect_f(void **priv);
......
...@@ -127,7 +127,8 @@ wrk_send(void **amq_worker, dataentry *entry, worker_data_t *wrk) ...@@ -127,7 +127,8 @@ wrk_send(void **amq_worker, dataentry *entry, worker_data_t *wrk)
AN(amq_worker); AN(amq_worker);
/* XXX: report entry->incomplete to backend ? */ /* XXX: report entry->incomplete to backend ? */
err = mqf.send(*amq_worker, entry->data, entry->end); err = mqf.send(*amq_worker, entry->data, entry->end,
entry->key, entry->keylen);
if (err != NULL) { if (err != NULL) {
LOG_Log(LOG_WARNING, "Worker %d: Failed to send data: %s", LOG_Log(LOG_WARNING, "Worker %d: Failed to send data: %s",
wrk->id, err); wrk->id, err);
...@@ -144,7 +145,8 @@ wrk_send(void **amq_worker, dataentry *entry, worker_data_t *wrk) ...@@ -144,7 +145,8 @@ wrk_send(void **amq_worker, dataentry *entry, worker_data_t *wrk)
wrk->reconnects++; wrk->reconnects++;
wrk_log_connection(*amq_worker, wrk->id); wrk_log_connection(*amq_worker, wrk->id);
MON_StatsUpdate(STATS_RECONNECT); MON_StatsUpdate(STATS_RECONNECT);
err = mqf.send(*amq_worker, entry->data, entry->end); err = mqf.send(*amq_worker, entry->data, entry->end,
entry->key, entry->keylen);
if (err != NULL) { if (err != NULL) {
wrk->fails++; wrk->fails++;
wrk->status = EXIT_FAILURE; wrk->status = EXIT_FAILURE;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment