Commit f1b3c487 authored by Geoff Simmons's avatar Geoff Simmons

MQ interface: MQ_Send() now distinguishes recoverable from non-recoverable

errors
parent 50b85ef6
# Ignore everything in this directory
*
# Except this file
!.gitignore
......@@ -53,10 +53,24 @@
* MQ_WorkerInit(). A thread-safe implementation must be provided for
* each operation defined with such an object as an argument.
*
* Each operation in this interface is expected to return `NULL` on
* success, or an error string on failure, to be used by the tracking
* reader to log error messages. The tracking reader does not attempt to
* free non-`NULL` pointers returned from the messaging interface.
* With the exception of MQ_Send(), each operation in this interface is
* expected to return `NULL` on success, or an error string on failure, to
* be used by the tracking reader to log error messages. MQ_Send() is
* expected to return 0 for a successful send, an integer greater than 0
* for a recoverable error, or an integer less than 0 for a
* non-recoverable error; and to place an error message in its `error`
* parameter. Non-recoverable errors should be signaled when internal
* structures of the messaging implementation must be shut down and
* re-initialized, for example when a network connection has been lost or
* has become unreliable; in this case, the tracking reader performs the
* error recovery procedure described below. After recoverable errors from
* MQ_Send(), the tracking reader simply logs the error message and
* continues.
*
* The pointers to error messages returned from operations in this
* interface, or set in the `error` parameter of MQ_Send(), should point
* to static storage. The tracking reader does not attempt to free
* non-`NULL` pointers returned from the interface.
*
* The methods in this interface are called in the following order:
*
......@@ -79,7 +93,7 @@
* logged, but the thread continues.
* - The main loop of the worker thread calls MQ_Send() for every data
* record that it processes. See below for a description of how the
* tracking reader handles message send failures.
* tracking reader handles non-recoverable message send failures.
* - MQ_WorkerShutdown() is called when the worker thread is shutting
* down.
*
......@@ -89,8 +103,8 @@
*
* Once a worker thread has entered its main loop (and hence global
* initialization, initialization of network connections and of a private
* worker object have succeeded), the tracking reader handles failures of
* message sends as follows:
* worker object have succeeded), the tracking reader handles
* non-recoverable failures of message sends as follows:
*
* - If MQ_Send() fails, the thread calls MQ_Reconnect(); the messaging
* implementation is expected to attempt a new connection, and may
......@@ -139,6 +153,14 @@ const char *MQ_WorkerInit(void **priv, int wrk_num);
/**
* Send data to the messaging system.
*
* On failure, the implementation must signal whether the error is
* recoverable or non-recoverable, with a return code greater than or less
* than zero, respectively. If the implementation can safely continue with
* the state referenced by its private object handle, the error is
* recoverable; if its private structures must be shut down / destroyed,
* the error is non-recoverable (and the tracking reader initiates the
* shutdown and possible re-initialization as described above).
*
* The implementation of this method must be thread-safe.
*
* @param priv private object handle
......@@ -146,10 +168,13 @@ const char *MQ_WorkerInit(void **priv, int wrk_num);
* @param len length of the data in bytes
* @param key an optional sharding key for the messaging system
* @param keylen length of the sharding key
* @return `NULL` on success, an error message on failure
* @param error pointer to an error message. The implementation is
* expected to place a message in this location when non-zero is returned.
* @return zero on success, >0 for a recoverable error, <0 for a
* non-recoverable error
*/
const char *MQ_Send(void *priv, const char *data, unsigned len,
const char *key, unsigned keylen);
int MQ_Send(void *priv, const char *data, unsigned len,
const char *key, unsigned keylen, const char **error);
/**
* Return the version string of the messaging system.
......
......@@ -133,15 +133,18 @@ MQ_WorkerInit(void **priv, int wrk_num)
return err;
}
const char *
int
MQ_Send(void *priv, const char *data, unsigned len, const char *key,
unsigned keylen)
unsigned keylen, const char **error)
{
/* The ActiveMQ implementation does not use sharding. */
(void) key;
(void) keylen;
return AMQ_Send((AMQ_Worker *) priv, data, len);
*error = AMQ_Send((AMQ_Worker *) priv, data, len);
if (*error != NULL)
return -1;
return 0;
}
const char *
......
......@@ -125,12 +125,13 @@ static const char
*test_send(void)
{
const char *err;
int ret;
printf("... testing ActiveMQ message send\n");
MASSERT0(worker != NULL, "MQ_Send: worker is NULL before call");
err = MQ_Send(worker, "foo bar baz quux", 16, "key", 3);
VMASSERT(err == NULL, "MQ_Send: %s", err);
ret = MQ_Send(worker, "foo bar baz quux", 16, "key", 3, &err);
VMASSERT(ret == 0, "MQ_Send: %s", err);
return NULL;
}
......@@ -139,6 +140,7 @@ static const char
*test_reconnect(void)
{
const char *err;
int ret;
printf("... testing ActiveMQ reconnect\n");
......@@ -146,8 +148,8 @@ static const char
err = MQ_Reconnect(&worker);
VMASSERT(err == NULL, "MQ_Reconnect: %s", err);
MASSERT0(worker != NULL, "MQ_Reconnect: worker is NULL after call");
err = MQ_Send(worker, "send after reconnect", 20, "key", 3);
VMASSERT(err == NULL, "MQ_Send() fails after reconnect: %s", err);
ret = MQ_Send(worker, "send after reconnect", 20, "key", 3, &err);
VMASSERT(ret == 0, "MQ_Send() fails after reconnect: %s", err);
return NULL;
}
......@@ -156,6 +158,7 @@ static const char
*test_worker_shutdown(void)
{
const char *err;
int ret;
printf("... testing ActiveMQ worker shutdown\n");
......@@ -165,8 +168,8 @@ static const char
MASSERT0(worker == NULL, "Worker not NULL after shutdown");
err = MQ_Send(worker, "foo bar baz quux", 16, "key", 3);
MASSERT0(err != NULL, "No failure on MQ_Send after worker shutdown");
ret = MQ_Send(worker, "foo bar baz quux", 16, "key", 3, &err);
MASSERT0(ret != 0, "No failure on MQ_Send after worker shutdown");
return NULL;
}
......
......@@ -126,7 +126,7 @@ error_cb(rd_kafka_t *rk, int err, const char *reason, void *opaque)
kafka_wrk_t *wrk = (kafka_wrk_t *) opaque;
CHECK_OBJ_NOTNULL(wrk, KAFKA_WRK_MAGIC);
MQ_LOG_Log(LOG_ERR, "Client error (ID = %s) %d: %s", rd_kafka_name(rk), err,
reason);
reason);
wrk->err = 1;
}
......@@ -136,7 +136,7 @@ stats_cb(rd_kafka_t *rk, char *json, size_t json_len, void *opaque)
(void) opaque;
MQ_LOG_Log(LOG_INFO, "Client stats (ID = %s): %.*s", rd_kafka_name(rk),
(int) json_len, json);
(int) json_len, json);
return 0;
}
......@@ -564,32 +564,28 @@ MQ_WorkerInit(void **priv, int wrk_num)
return NULL;
}
/*
* XXX: we really only want to set off trackrdrd error recovery on
* message send failure or priv == NULL, not e.g. if the key is
* missing or invalid. How to signal back a recoverable error?
*/
const char *
int
MQ_Send(void *priv, const char *data, unsigned len, const char *key,
unsigned keylen)
unsigned keylen, const char **error)
{
kafka_wrk_t *wrk;
void *payload = NULL;
char *ret = NULL;
int ret = 0;
/* XXX: error? */
if (len == 0)
return NULL;
return 0;
if (priv == NULL) {
MQ_LOG_Log(LOG_ERR, "MQ_Send() called with NULL worker object");
return "Worker object is NULL";
strncpy(errmsg, "Worker object is NULL", LINE_MAX);
*error = errmsg;
return -1;
}
CAST_OBJ(wrk, priv, KAFKA_WRK_MAGIC);
/*
* XXX
* These errors are recoverable
* Increment stats counters on error, so that they can be monitored
* Toggle log level DEBUG with signals
*/
......@@ -599,7 +595,8 @@ MQ_Send(void *priv, const char *data, unsigned len, const char *key,
MQ_LOG_Log(LOG_ERR, errmsg);
MQ_LOG_Log(LOG_DEBUG, "%s data=[%.*s] key=", rd_kafka_name(wrk->kafka),
len, data);
return errmsg;
*error = errmsg;
return 1;
}
if (data == NULL) {
snprintf(errmsg, LINE_MAX, "%s message payload is NULL",
......@@ -607,7 +604,8 @@ MQ_Send(void *priv, const char *data, unsigned len, const char *key,
MQ_LOG_Log(LOG_DEBUG, "%s data= key=[%.*s]", rd_kafka_name(wrk->kafka),
keylen, key);
MQ_LOG_Log(LOG_ERR, errmsg);
return errmsg;
*error = errmsg;
return 1;
}
if (keylen > 8)
......@@ -619,7 +617,8 @@ MQ_Send(void *priv, const char *data, unsigned len, const char *key,
MQ_LOG_Log(LOG_ERR, errmsg);
MQ_LOG_Log(LOG_DEBUG, "%s data=[%.*s] key=[%.*s]",
rd_kafka_name(wrk->kafka), len, data, keylen, key);
return errmsg;
*error = errmsg;
return 1;
}
REPLACE(payload, data);
......@@ -628,7 +627,8 @@ MQ_Send(void *priv, const char *data, unsigned len, const char *key,
snprintf(errmsg, LINE_MAX, rd_kafka_err2str(rd_kafka_errno2err(errno)));
MQ_LOG_Log(LOG_ERR, "%s message send failure (%d): %s",
rd_kafka_name(wrk->kafka), errno, errmsg);
ret = errmsg;
*error = errmsg;
ret = -1;
}
rd_kafka_poll(wrk->kafka, 0);
return ret;
......
......@@ -125,49 +125,50 @@ static const char
*test_send(void)
{
const char *err;
int ret;
printf("... testing Kafka message send\n");
MASSERT0(worker != NULL, "MQ_Send: worker is NULL before call");
err = MQ_Send(worker, "foo bar baz quux", 16, "12345678", 8);
VMASSERT(err == NULL, "MQ_Send: %s", err);
ret = MQ_Send(worker, "foo bar baz quux", 16, "12345678", 8, &err);
VMASSERT(ret == 0, "MQ_Send: %s", err);
/* Keys shorter and longer than 8 hex digits */
err = MQ_Send(worker, "the quick brown fox", 19, "abcdef", 6);
VMASSERT(err == NULL, "MQ_Send: %s", err);
err = MQ_Send(worker, "jumps over the lazy dog", 23,
"fedcba9876543210", 16);
VMASSERT(err == NULL, "MQ_Send: %s", err);
ret = MQ_Send(worker, "the quick brown fox", 19, "abcdef", 6, &err);
VMASSERT(ret == 0, "MQ_Send: %s", err);
ret = MQ_Send(worker, "jumps over the lazy dog", 23,
"fedcba9876543210", 16, &err);
VMASSERT(ret == 0, "MQ_Send: %s", err);
/* No error if message is empty (silently discarded) */
err = MQ_Send(worker, "", 0, "12345678", 8);
VMASSERT(err == NULL, "MQ_Send: %s", err);
ret = MQ_Send(worker, "", 0, "12345678", 8, &err);
VMASSERT(ret == 0, "MQ_Send: %s", err);
/* Fail if the worker object is null */
err = MQ_Send(NULL, "foo bar baz quux", 16, "12345678", 8);
MAN(err);
/* Non-recoverable error if the worker object is null */
ret = MQ_Send(NULL, "foo bar baz quux", 16, "12345678", 8, &err);
MASSERT(ret < 0);
/* Fail if the key is empty */
err = MQ_Send(worker, "foo bar baz quux", 16, "", 0);
MAN(err);
/* Recoverable error if the key is empty */
ret = MQ_Send(worker, "foo bar baz quux", 16, "", 0 ,&err);
MASSERT(ret > 0);
VMASSERT(strstr("shard key is missing", err) == 0,
"MQ_Send unexpected error message: %s", err);
/* Fail if the key is NULL */
err = MQ_Send(worker, "foo bar baz quux", 16, NULL, 0);
MAN(err);
/* Recoverable error if the key is NULL */
ret = MQ_Send(worker, "foo bar baz quux", 16, NULL, 0, &err);
MASSERT(ret > 0);
VMASSERT(strstr("shard key is missing", err) == 0,
"MQ_Send unexpected error message: %s", err);
/* Fail if the key contains non-hex characters */
err = MQ_Send(worker, "foo bar baz quux", 16, "key", 3);
MAN(err);
/* Recoverable error if the key contains non-hex characters */
ret = MQ_Send(worker, "foo bar baz quux", 16, "key", 3, &err);
MASSERT(ret > 0);
VMASSERT(strstr("shard key is not hex", err) == 0,
"MQ_Send unexpected error message: %s", err);
/* Fail if the message is NULL */
err = MQ_Send(worker, NULL, 16, "12345678", 8);
MAN(err);
/* Recoverable error if the message is NULL */
ret = MQ_Send(worker, NULL, 16, "12345678", 8, &err);
MASSERT(ret > 0);
VMASSERT(strstr("message payload is NULL", err) == 0,
"MQ_Send unexpected error message: %s", err);
......@@ -178,6 +179,7 @@ static const char
*test_reconnect(void)
{
const char *err;
int ret;
printf("... testing Kafka reconnect\n");
......@@ -185,8 +187,8 @@ static const char
err = MQ_Reconnect(&worker);
VMASSERT(err == NULL, "MQ_Reconnect: %s", err);
MASSERT0(worker != NULL, "MQ_Reconnect: worker is NULL after call");
err = MQ_Send(worker, "send after reconnect", 20, "12345678", 8);
VMASSERT(err == NULL, "MQ_Send() fails after reconnect: %s", err);
ret = MQ_Send(worker, "send after reconnect", 20, "12345678", 8, &err);
VMASSERT(ret == 0, "MQ_Send() fails after reconnect: %s", err);
return NULL;
}
......@@ -195,6 +197,7 @@ static const char
*test_worker_shutdown(void)
{
const char *err;
int ret;
printf("... testing Kafka worker shutdown\n");
......@@ -204,8 +207,8 @@ static const char
MASSERT0(worker == NULL, "Worker not NULL after shutdown");
err = MQ_Send(worker, "foo bar baz quux", 16, "12345678", 8);
MASSERT0(err != NULL, "No failure on MQ_Send after worker shutdown");
ret = MQ_Send(worker, "foo bar baz quux", 16, "12345678", 8, &err);
MASSERT0(ret != 0, "No failure on MQ_Send after worker shutdown");
return NULL;
}
......
......@@ -170,13 +170,14 @@ static const char
*test_send(void)
{
const char *err;
int ret;
printf("... testing message send\n");
mu_assert("MQ_Send: worker is NULL before call", worker != NULL);
err = mqf.send(worker, "foo bar baz quux", 16, "key", 3);
ret = mqf.send(worker, "foo bar baz quux", 16, "key", 3, &err);
sprintf(errmsg, "MQ_Send: %s", err);
mu_assert(errmsg, err == NULL);
mu_assert(errmsg, ret == 0);
return NULL;
}
......@@ -185,6 +186,7 @@ static const char
*test_reconnect(void)
{
const char *err;
int ret;
printf("... testing MQ reconnect\n");
......@@ -192,8 +194,8 @@ static const char
err = mqf.reconnect(&worker);
VMASSERT(err == NULL, "MQ_Reconnect: %s", err);
MASSERT0(worker != NULL, "MQ_Reconnect: worker is NULL after call");
err = mqf.send(worker, "send after reconnect", 20, "key", 3);
VMASSERT(err == NULL, "MQ_Send() fails after reconnect: %s", err);
ret = mqf.send(worker, "send after reconnect", 20, "key", 3, &err);
VMASSERT(ret == 0, "MQ_Send() fails after reconnect: %s", err);
return NULL;
}
......@@ -202,6 +204,7 @@ static const char
*test_worker_shutdown(void)
{
const char *err;
int ret;
printf("... testing worker shutdown\n");
......@@ -212,8 +215,8 @@ static const char
mu_assert("Worker not NULL after shutdown", worker == NULL);
err = mqf.send(worker, "foo bar baz quux", 16, "key", 3);
mu_assert("No failure on MQ_Send after worker shutdown", err != NULL);
ret = mqf.send(worker, "foo bar baz quux", 16, "key", 3, &err);
mu_assert("No failure on MQ_Send after worker shutdown", ret != 0);
return NULL;
}
......
......@@ -44,8 +44,8 @@
typedef const char *global_init_f(unsigned nworkers, const char *config_fname);
typedef const char *init_connections_f(void);
typedef const char *worker_init_f(void **priv, int wrk_num);
typedef const char *send_f(void *priv, const char *data, unsigned len,
const char *key, unsigned keylen);
typedef int send_f(void *priv, const char *data, unsigned len,
const char *key, unsigned keylen, const char **error);
typedef const char *version_f(void *priv, char *version);
typedef const char *client_id_f(void *priv, char *clientID);
typedef const char *reconnect_f(void **priv);
......
......@@ -81,6 +81,7 @@ struct worker_data_s {
unsigned waits;
unsigned sends;
unsigned fails;
unsigned recoverables;
unsigned reconnects;
unsigned restarts;
};
......@@ -98,17 +99,17 @@ static thread_data_t *thread_data;
static pthread_mutex_t running_lock;
static void
wrk_log_connection(void *amq_worker, unsigned id)
wrk_log_connection(void *mq_worker, unsigned id)
{
const char *err;
char version[VERSION_LEN], clientID[CLIENT_ID_LEN];
err = mqf.version(amq_worker, version);
err = mqf.version(mq_worker, version);
if (err != NULL) {
LOG_Log(LOG_ERR, "Worker %d: Failed to get MQ version", id, err);
version[0] = '\0';
}
err = mqf.client_id(amq_worker, clientID);
err = mqf.client_id(mq_worker, clientID);
if (err != NULL) {
LOG_Log(LOG_ERR, "Worker %d: Failed to get MQ client ID", id, err);
clientID[0] = '\0';
......@@ -118,48 +119,58 @@ wrk_log_connection(void *amq_worker, unsigned id)
}
static inline void
wrk_send(void **amq_worker, dataentry *entry, worker_data_t *wrk)
wrk_send(void **mq_worker, dataentry *entry, worker_data_t *wrk)
{
const char *err;
int errnum;
CHECK_OBJ_NOTNULL(entry, DATA_MAGIC);
assert(entry->state == DATA_DONE);
AN(amq_worker);
AN(mq_worker);
/* XXX: report entry->incomplete to backend ? */
err = mqf.send(*amq_worker, entry->data, entry->end,
entry->key, entry->keylen);
if (err != NULL) {
errnum = mqf.send(*mq_worker, entry->data, entry->end,
entry->key, entry->keylen, &err);
if (errnum != 0) {
LOG_Log(LOG_WARNING, "Worker %d: Failed to send data: %s",
wrk->id, err);
LOG_Log(LOG_INFO, "Worker %d: Reconnecting", wrk->id);
err = mqf.reconnect(amq_worker);
if (err != NULL) {
wrk->status = EXIT_FAILURE;
LOG_Log(LOG_ALERT, "Worker %d: Reconnect failed (%s)", wrk->id,
err);
LOG_Log(LOG_ERR, "Worker %d: Data DISCARDED [%.*s]", wrk->id,
entry->end, entry->data);
}
wrk->id, err);
if (errnum > 0)
wrk->recoverables++;
else {
wrk->reconnects++;
wrk_log_connection(*amq_worker, wrk->id);
MON_StatsUpdate(STATS_RECONNECT);
err = mqf.send(*amq_worker, entry->data, entry->end,
entry->key, entry->keylen);
/* Non-recoverable error */
LOG_Log(LOG_INFO, "Worker %d: Reconnecting", wrk->id);
err = mqf.reconnect(mq_worker);
if (err != NULL) {
wrk->fails++;
wrk->status = EXIT_FAILURE;
LOG_Log(LOG_ALERT,
"Worker %d: Failed to send data after reconnect: %s",
wrk->id, err);
LOG_Log(LOG_ALERT, "Worker %d: Reconnect failed (%s)", wrk->id,
err);
LOG_Log(LOG_ERR, "Worker %d: Data DISCARDED [%.*s]", wrk->id,
entry->end, entry->data);
MON_StatsUpdate(STATS_FAILED);
entry->end, entry->data);
}
else {
wrk->reconnects++;
wrk_log_connection(*mq_worker, wrk->id);
MON_StatsUpdate(STATS_RECONNECT);
errnum = mqf.send(*mq_worker, entry->data, entry->end,
entry->key, entry->keylen, &err);
if (errnum != 0) {
LOG_Log(LOG_WARNING, "Worker %d: Failed to send data "
"after reconnect: %s", wrk->id, err);
if (errnum > 0)
wrk->recoverables++;
else {
/* Fail after reconnect, give up */
wrk->fails++;
wrk->status = EXIT_FAILURE;
LOG_Log(LOG_ERR, "Worker %d: Data DISCARDED [%.*s]",
wrk->id, entry->end, entry->data);
MON_StatsUpdate(STATS_FAILED);
}
}
}
}
}
if (err == NULL) {
if (errnum == 0) {
wrk->sends++;
MON_StatsUpdate(STATS_SENT);
LOG_Log(LOG_DEBUG, "Worker %d: Successfully sent data [%.*s]", wrk->id,
......@@ -182,7 +193,7 @@ static void
*wrk_main(void *arg)
{
worker_data_t *wrk = (worker_data_t *) arg;
void *amq_worker;
void *mq_worker;
dataentry *entry;
const char *err;
......@@ -190,7 +201,7 @@ static void
CHECK_OBJ_NOTNULL(wrk, WORKER_DATA_MAGIC);
wrk->state = WRK_INITIALIZING;
err = mqf.worker_init(&amq_worker, wrk->id);
err = mqf.worker_init(&mq_worker, wrk->id);
if (err != NULL) {
LOG_Log(LOG_ALERT, "Worker %d: Cannot initialize queue connection: %s",
wrk->id, err);
......@@ -202,7 +213,7 @@ static void
pthread_exit((void *) wrk);
}
wrk_log_connection(amq_worker, wrk->id);
wrk_log_connection(mq_worker, wrk->id);
VSTAILQ_INIT(&wrk->wrk_freelist);
wrk->wrk_nfree = 0;
......@@ -215,7 +226,7 @@ static void
entry = SPMCQ_Deq();
if (entry != NULL) {
wrk->deqs++;
wrk_send(&amq_worker, entry, wrk);
wrk_send(&mq_worker, entry, wrk);
if (wrk->status == EXIT_FAILURE)
break;
......@@ -258,12 +269,12 @@ static void
/* Prepare to exit, drain the queue */
while ((entry = SPMCQ_Deq()) != NULL) {
wrk->deqs++;
wrk_send(&amq_worker, entry, wrk);
wrk_send(&mq_worker, entry, wrk);
}
wrk->status = EXIT_SUCCESS;
}
err = mqf.worker_shutdown(&amq_worker);
err = mqf.worker_shutdown(&mq_worker);
if (err != NULL) {
LOG_Log(LOG_ALERT, "Worker %d: MQ worker shutdown failed: %s",
wrk->id, err);
......@@ -318,7 +329,7 @@ WRK_Init(void)
wrk->magic = WORKER_DATA_MAGIC;
wrk->id = i + 1;
wrk->deqs = wrk->waits = wrk->sends = wrk->fails = wrk->reconnects
= wrk->restarts = 0;
= wrk->restarts = wrk->recoverables = 0;
wrk->state = WRK_NOTSTARTED;
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment