Commit 579fba26 authored by Geoff Simmons

MQ plugin for Kafka: prior error states on a worker object are not
unrecoverable (since rdkafka attempts recovery); just log WARNINGs
parent 5d78e446
@@ -9,8 +9,8 @@ Kafka implementation of the MQ interface for the Tracking Log Reader
 --------------------------------------------------------------------
 :Author: Geoffrey Simmons
-:Date: 2014-05-31
+:Date: 2014-06-01
-:Version: 3.0
+:Version: 3.0.0
 :Manual section: 3
@@ -225,19 +225,34 @@ Log lines beginning with ``mq stats`` are generated by the MQ plugin,
 and have the following form (possibly with additional formatting and
 information from the logger)::
-  mq stats (ID = <CLIENTID>): nokey=0 badkey=0 nodata=0
+  mq stats (ID = $CLIENTID): seen=2 produced=2 delivered=2 failed=0 nokey=0 badkey=0 nodata=0
+  mq stats summary: seen=47 produced=47 delivered=47 failed=0 nokey=0 badkey=0 nodata=0
-``CLIENTID`` is the ID of the worker object (as returned from
-``MQ_ClientID()``). The statistics are all cumulative counters.
+``$CLIENTID`` is the ID of a worker object (as returned from
+``MQ_ClientID()``), and the statistics in that line pertain to that
+object. The line containing ``mq stats summary`` contains sums of the
+stats for all worker objects.
+The statistics are all cumulative counters:
 ===================== ==========================================================
 Statistic             Description
 ===================== ==========================================================
-``nokey``             The number of ``MQ_Send()`` operations called for the
-                      worker with no shard key.
+``seen``              The number of times that ``MQ_Send()`` was called
+--------------------- ----------------------------------------------------------
+``produced``          The number of successful invocations of the rdkafka
+                      client library's "produce" operation
+--------------------- ----------------------------------------------------------
+``delivered``         The number of messages successfully delivered to a broker
+--------------------- ----------------------------------------------------------
+``failed``            The number of failures, either of "produce" or failed
+                      deliveries to a broker
+--------------------- ----------------------------------------------------------
+``nokey``             The number of ``MQ_Send()`` operations called with no
+                      shard key.
 --------------------- ----------------------------------------------------------
 ``badkey``            The number of send operations called with an illegal
-                      shard key (not a hex string).
+                      shard key (not a hex string in the first 8 bytes)
 --------------------- ----------------------------------------------------------
 ``nodata``            The number of send operations called with no message
                       payload.
@@ -260,23 +275,25 @@ message to a broker is attempted, then the rdkafka library saves the
 error status in its internal state, but this ordinarily becomes known
 some time after the "produce" operation has been completed.
+The rdkafka library attempts error recovery on its own, for example by
+restoring lost connections to brokers, and then retries the delivery
+of messages that failed on prior attempts.
 This means that in ordinary operation, the plugin's ``MQ_Send()`` call
-will not fail immediately if in fact it turns out that the message
-cannot be delivered to a broker.
+will not fail immediately if in fact it turns out that, on the first
+attempt, the message cannot be delivered to a broker. The only
+unrecoverable error for ``MQ_Send()`` occurs when the "produce"
+operation fails immediately (such as when an rdkafka queue is full).
 The messaging plugin polls the internal state of an rdkafka producer
 associated with a worker object during ``MQ_Send()`` once before
 invoking the "produce" operation, once afterward, and also every time
-rdkafka internal statistics are queried as described above. If a
-pending unrecoverable error state is determined during the call to
-``MQ_Send()``, then an unrecoverable error status is returned (also if
-the "produce" operation fails immediately); the tracking reader can
-then engage its process for error recovery. It should be understood
-that an unrecoverable error status from ``MQ_Send()`` does not
-necessarily indicate that delivery of the current message has failed
-(unless it is due to failure of the "produce" operation), but rather
-the delivery of a message submitted via ``MQ_Send()`` at an earlier
-point in time.
+rdkafka internal statistics are queried as described above. If a prior
+error state is determined during the call to ``MQ_Send()``, then a log
+message at level WARNING is generated. It should be understood these
+warning messages describe an error that may have occurred at an
+earlier point in time, and recovery may have already succeeded (which
+can be ascertained from messages that appear earlier in the log).
 SIGNALS
 =======
......
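Note on the ``mq stats summary`` line documented above: it is described as the sum of the per-worker counters. The following is a minimal sketch of that aggregation, not the plugin's actual code. The type `struct wrk_stats` and the functions `stats_sum()` and `stats_format_summary()` are hypothetical names introduced for illustration; only the counter names and the line format are taken from the documentation in this commit.

```c
#include <assert.h>
#include <stdio.h>

/* Hypothetical per-worker counters; field names mirror the documented stats. */
struct wrk_stats {
    unsigned long seen, produced, delivered, failed, nokey, badkey, nodata;
};

/* Sum the cumulative counters of n workers into one total. */
static struct wrk_stats
stats_sum(const struct wrk_stats *w, size_t n)
{
    struct wrk_stats sum = { 0, 0, 0, 0, 0, 0, 0 };

    assert(w != NULL || n == 0);
    for (size_t i = 0; i < n; i++) {
        sum.seen += w[i].seen;
        sum.produced += w[i].produced;
        sum.delivered += w[i].delivered;
        sum.failed += w[i].failed;
        sum.nokey += w[i].nokey;
        sum.badkey += w[i].badkey;
        sum.nodata += w[i].nodata;
    }
    return sum;
}

/* Format a summary line in the documented form; returns snprintf()'s result. */
static int
stats_format_summary(char *buf, size_t len, const struct wrk_stats *s)
{
    assert(buf != NULL && s != NULL);
    return snprintf(buf, len,
        "mq stats summary: seen=%lu produced=%lu delivered=%lu "
        "failed=%lu nokey=%lu badkey=%lu nodata=%lu",
        s->seen, s->produced, s->delivered, s->failed,
        s->nokey, s->badkey, s->nodata);
}
```

Because every counter is cumulative, the summary can be recomputed from scratch at each stats interval without keeping any extra state.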
@@ -568,10 +568,6 @@ MQ_Send(void *priv, const char *data, unsigned len, const char *key,
     kafka_wrk_t *wrk;
     void *payload = NULL;
-    /* XXX: error? */
-    if (len == 0)
-        return 0;
     if (priv == NULL) {
         MQ_LOG_Log(LOG_ERR, "MQ_Send() called with NULL worker object");
         *error = "MQ_Send() called with NULL worker object";
@@ -580,20 +576,20 @@ MQ_Send(void *priv, const char *data, unsigned len, const char *key,
     CAST_OBJ(wrk, priv, KAFKA_WRK_MAGIC);
     wrk->seen++;
+    /* XXX: error? */
+    if (len == 0) {
+        wrk->nodata++;
+        return 0;
+    }
     /* Check for an error state */
     rd_kafka_poll(wrk->kafka, 0);
     if (wrk->err) {
-        snprintf(wrk->errmsg, LINE_MAX, "%s error state (%d): %s",
+        MQ_LOG_Log(LOG_WARNING, "%s error state (%d): %s",
             rd_kafka_name(wrk->kafka), wrk->err, wrk->reason);
-        MQ_LOG_Log(LOG_ERR, wrk->errmsg);
-        *error = wrk->errmsg;
-        return -1;
+        wrk->err = 0;
     }
+    /*
+     * XXX
+     * Toggle log level DEBUG with signals
+     */
     if (key == NULL || keylen == 0) {
         snprintf(wrk->errmsg, LINE_MAX, "%s message shard key is missing",
             rd_kafka_name(wrk->kafka));
@@ -644,12 +640,11 @@ MQ_Send(void *priv, const char *data, unsigned len, const char *key,
     /* Check for an error state again */
     rd_kafka_poll(wrk->kafka, 0);
     if (wrk->err) {
-        snprintf(wrk->errmsg, LINE_MAX, "%s error state (%d): %s",
+        MQ_LOG_Log(LOG_WARNING, "%s error state (%d): %s",
             rd_kafka_name(wrk->kafka), wrk->err, wrk->reason);
-        MQ_LOG_Log(LOG_ERR, wrk->errmsg);
-        *error = wrk->errmsg;
-        return -1;
+        wrk->err = 0;
     }
     return 0;
 }
......
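Note on the behavior change in ``MQ_Send()``: a prior error state is now logged at WARNING level and cleared, and only an immediate failure of the "produce" operation makes the call fail. The sketch below illustrates that control flow in isolation, under stated assumptions. It does not use librdkafka: `struct sketch_wrk`, `sketch_on_error()`, `sketch_check_prior_error()` and `sketch_send()` are hypothetical stand-ins, logging goes to stderr instead of `MQ_LOG_Log()`, and the `produce_ok` flag simulates the result of the real "produce" call.

```c
#include <assert.h>
#include <stdio.h>

/* Hypothetical stand-in for the worker object; only error-related fields. */
struct sketch_wrk {
    int err;             /* last error code recorded asynchronously, 0 if none */
    const char *reason;  /* description of that error */
    unsigned warnings;   /* number of warnings logged (for illustration only) */
};

/* Stand-in for an error callback that would be invoked while polling. */
static void
sketch_on_error(struct sketch_wrk *wrk, int err, const char *reason)
{
    assert(wrk != NULL);
    wrk->err = err;
    wrk->reason = reason;
}

/* Log a pending prior error as a warning, then clear it. Returns 1 if logged. */
static int
sketch_check_prior_error(struct sketch_wrk *wrk)
{
    assert(wrk != NULL);
    if (wrk->err == 0)
        return 0;
    fprintf(stderr, "WARNING: error state (%d): %s\n", wrk->err, wrk->reason);
    wrk->warnings++;
    wrk->err = 0;
    return 1;
}

/*
 * Control flow of a send: prior errors never fail the call; only an
 * immediate failure of the (simulated) produce step returns -1.
 */
static int
sketch_send(struct sketch_wrk *wrk, int produce_ok)
{
    sketch_check_prior_error(wrk);   /* check before producing */
    if (!produce_ok)
        return -1;                   /* the only unrecoverable case */
    sketch_check_prior_error(wrk);   /* check again afterward */
    return 0;
}
```

Clearing the error after logging means each recorded error produces one warning rather than one per subsequent send, which matches the `wrk->err = 0` assignments added in this commit.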