Commit 4e034900 authored by Geoff Simmons's avatar Geoff Simmons

MQ plugin for Kafka: prior error states on a worker object are not

unrecoverable (since rdkafka attempts recovery); just log WARNINGs
parent 9af1fa30
......@@ -9,8 +9,8 @@ Kafka implementation of the MQ interface for the Tracking Log Reader
--------------------------------------------------------------------
:Author: Geoffrey Simmons
:Date: 2014-05-31
:Version: 3.0
:Date: 2014-06-01
:Version: 3.0.0
:Manual section: 3
......@@ -225,19 +225,34 @@ Log lines beginning with ``mq stats`` are generated by the MQ plugin,
and have the following form (possibly with additional formatting and
information from the logger)::
mq stats (ID = <CLIENTID>): nokey=0 badkey=0 nodata=0
mq stats (ID = $CLIENTID): seen=2 produced=2 delivered=2 failed=0 nokey=0 badkey=0 nodata=0
mq stats summary: seen=47 produced=47 delivered=47 failed=0 nokey=0 badkey=0 nodata=0
``CLIENTID`` is the ID of the worker object (as returned from
``MQ_ClientID()``). The statistics are all cumulative counters.
``$CLIENTID`` is the ID of a worker object (as returned from
``MQ_ClientID()``), and the statistics in that line pertain to that
object. The line containing ``mq stats summary`` contains sums of the
stats for all worker objects.
The statistics are all cumulative counters:
===================== ==========================================================
Statistic Description
===================== ==========================================================
``nokey`` The number of ``MQ_Send()`` operations called for the
worker with no shard key.
``seen`` The number of times that ``MQ_Send()`` was called
--------------------- ----------------------------------------------------------
``produced`` The number of successful invocations of the rdkafka
client library's "produce" operation
--------------------- ----------------------------------------------------------
``delivered`` The number of messages successfully delivered to a broker
--------------------- ----------------------------------------------------------
``failed`` The number of failures, either of "produce" or failed
deliveries to a broker
--------------------- ----------------------------------------------------------
``nokey`` The number of ``MQ_Send()`` operations called with no
shard key.
--------------------- ----------------------------------------------------------
``badkey`` The number of send operations called with an illegal
shard key (not a hex string).
shard key (not a hex string in the first 8 bytes)
--------------------- ----------------------------------------------------------
``nodata`` The number of send operations called with no message
payload.
......@@ -260,23 +275,25 @@ message to a broker is attempted, then the rdkafka library saves the
error status in its internal state, but this ordinarily becomes known
some time after the "produce" operation has been completed.
The rdkafka library attempts error recovery on its own, for example by
restoring lost connections to brokers, and then retries the delivery
of messages that failed on prior attemepts.
This means that in ordinary operation, the plugin's ``MQ_Send()`` call
will not fail immediately if in fact it turns out that the message
cannot be delivered to a broker.
will not fail immediately if in fact it turns out that, on the first
attempt, the message cannot be delivered to a broker. The only
unrecoverable error for ``MQ_Send()`` occurs when the "produce"
operation fails immediately (such as when an rdkafka queue is full).
The messaging plugin polls the internal state of an rdkafka producer
associated with a worker object during ``MQ_Send()`` once before
invoking the "produce" operation, once afterward, and also every time
rdkafka internal statistics are queried as described above. If a
pending unrecoverable error state is determined during the call to
``MQ_Send()``, then an unrecoverable error status is returned (also if
the "produce" operation fails immediately); the tracking reader can
then engage its process for error recovery. It should be understood
that an unrecoverable error status from ``MQ_Send()`` does not
necessarily indicate that delivery of the current message has failed
(unless it is due to failure of the "produce" operation), but rather
the delivery of a message submitted via ``MQ_Send()`` at an earlier
point in time.
rdkafka internal statistics are queried as described above. If a prior
error state is determined during the call to ``MQ_Send()``, then a log
message at level WARNING is generated. It should be understood these
warning messages describe an error that may have occurred at an
earlier point in time, and recovery may have already succeeded (which
can be ascertained from messages that appear earlier in the log).
SIGNALS
=======
......
......@@ -568,10 +568,6 @@ MQ_Send(void *priv, const char *data, unsigned len, const char *key,
kafka_wrk_t *wrk;
void *payload = NULL;
/* XXX: error? */
if (len == 0)
return 0;
if (priv == NULL) {
MQ_LOG_Log(LOG_ERR, "MQ_Send() called with NULL worker object");
*error = "MQ_Send() called with NULL worker object";
......@@ -580,20 +576,20 @@ MQ_Send(void *priv, const char *data, unsigned len, const char *key,
CAST_OBJ(wrk, priv, KAFKA_WRK_MAGIC);
wrk->seen++;
/* XXX: error? */
if (len == 0) {
wrk->nodata++;
return 0;
}
/* Check for an error state */
rd_kafka_poll(wrk->kafka, 0);
if (wrk->err) {
snprintf(wrk->errmsg, LINE_MAX, "%s error state (%d): %s",
rd_kafka_name(wrk->kafka), wrk->err, wrk->reason);
MQ_LOG_Log(LOG_ERR, wrk->errmsg);
*error = wrk->errmsg;
return -1;
MQ_LOG_Log(LOG_WARNING, "%s error state (%d): %s",
rd_kafka_name(wrk->kafka), wrk->err, wrk->reason);
wrk->err = 0;
}
/*
* XXX
* Toggle log level DEBUG with signals
*/
if (key == NULL || keylen == 0) {
snprintf(wrk->errmsg, LINE_MAX, "%s message shard key is missing",
rd_kafka_name(wrk->kafka));
......@@ -644,12 +640,11 @@ MQ_Send(void *priv, const char *data, unsigned len, const char *key,
/* Check for an error state again */
rd_kafka_poll(wrk->kafka, 0);
if (wrk->err) {
snprintf(wrk->errmsg, LINE_MAX, "%s error state (%d): %s",
rd_kafka_name(wrk->kafka), wrk->err, wrk->reason);
MQ_LOG_Log(LOG_ERR, wrk->errmsg);
*error = wrk->errmsg;
return -1;
MQ_LOG_Log(LOG_WARNING, "%s error state (%d): %s",
rd_kafka_name(wrk->kafka), wrk->err, wrk->reason);
wrk->err = 0;
}
return 0;
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment