Commit 51cf99a4 authored by Geoff Simmons's avatar Geoff Simmons

MQ plugin for Kafka: only generate error messages in the callbacks when

appropriate -- the additional WARNING in MQ_Send() is superfluous
parent 6116b4fb
...@@ -290,10 +290,10 @@ associated with a worker object during ``MQ_Send()`` once before ...@@ -290,10 +290,10 @@ associated with a worker object during ``MQ_Send()`` once before
invoking the "produce" operation, once afterward, and also every time invoking the "produce" operation, once afterward, and also every time
rdkafka internal statistics are queried as described above. If a prior rdkafka internal statistics are queried as described above. If a prior
error state is determined during the call to ``MQ_Send()``, then a log error state is determined during the call to ``MQ_Send()``, then a log
message at level WARNING is generated. It should be understood these message at level ERROR is generated. It should be understood these
warning messages describe an error that may have occurred at an messages describe an error that may have occurred at an earlier point
earlier point in time, and recovery may have already succeeded (which in time, and recovery may have already succeeded (which can be
can be ascertained from messages that appear earlier in the log). ascertained from messages that appear earlier in the log).
SIGNALS SIGNALS
======= =======
......
...@@ -124,10 +124,10 @@ dr_cb(rd_kafka_t *rk, void *payload, size_t len, rd_kafka_resp_err_t err, ...@@ -124,10 +124,10 @@ dr_cb(rd_kafka_t *rk, void *payload, size_t len, rd_kafka_resp_err_t err,
CHECK_OBJ_NOTNULL(wrk, KAFKA_WRK_MAGIC); CHECK_OBJ_NOTNULL(wrk, KAFKA_WRK_MAGIC);
if (err != RD_KAFKA_RESP_ERR_NO_ERROR) { if (err != RD_KAFKA_RESP_ERR_NO_ERROR) {
strncpy(wrk->reason, rd_kafka_err2str(err), LINE_MAX); MQ_LOG_Log(LOG_ERR,
MQ_LOG_Log(LOG_ERR, "Delivery error (client ID = %s, msg = [%.*s]): %s", "Delivery error %d (client ID = %s, msg = [%.*s]): %s",
rd_kafka_name(rk), (int) len, (char *) payload, wrk->reason); err, rd_kafka_name(rk), (int) len, (char *) payload,
wrk->err = (int) err; rd_kafka_err2str(err));
wrk->failed++; wrk->failed++;
} }
else { else {
...@@ -141,12 +141,10 @@ dr_cb(rd_kafka_t *rk, void *payload, size_t len, rd_kafka_resp_err_t err, ...@@ -141,12 +141,10 @@ dr_cb(rd_kafka_t *rk, void *payload, size_t len, rd_kafka_resp_err_t err,
static void static void
error_cb(rd_kafka_t *rk, int err, const char *reason, void *opaque) error_cb(rd_kafka_t *rk, int err, const char *reason, void *opaque)
{ {
kafka_wrk_t *wrk = (kafka_wrk_t *) opaque; (void) opaque;
CHECK_OBJ_NOTNULL(wrk, KAFKA_WRK_MAGIC);
MQ_LOG_Log(LOG_ERR, "Client error (ID = %s) %d: %s", rd_kafka_name(rk), err, MQ_LOG_Log(LOG_ERR, "Client error (ID = %s) %d: %s", rd_kafka_name(rk), err,
reason); reason);
wrk->err = err;
strncpy(wrk->reason, reason, LINE_MAX);
} }
static int static int
...@@ -261,9 +259,7 @@ static const char ...@@ -261,9 +259,7 @@ static const char
wrk-> n = wrk_num; wrk-> n = wrk_num;
wrk->kafka = rk; wrk->kafka = rk;
wrk->topic = rkt; wrk->topic = rkt;
wrk->err = 0;
wrk->errmsg[0] = '\0'; wrk->errmsg[0] = '\0';
wrk->reason[0] = '\0';
wrk->seen = wrk->produced = wrk->delivered = wrk->failed = wrk->nokey wrk->seen = wrk->produced = wrk->delivered = wrk->failed = wrk->nokey
= wrk->badkey = wrk->nodata = 0; = wrk->badkey = wrk->nodata = 0;
workers[wrk_num] = wrk; workers[wrk_num] = wrk;
...@@ -582,13 +578,7 @@ MQ_Send(void *priv, const char *data, unsigned len, const char *key, ...@@ -582,13 +578,7 @@ MQ_Send(void *priv, const char *data, unsigned len, const char *key,
return 0; return 0;
} }
/* Check for an error state */
rd_kafka_poll(wrk->kafka, 0); rd_kafka_poll(wrk->kafka, 0);
if (wrk->err) {
MQ_LOG_Log(LOG_WARNING, "%s error state (%d): %s",
rd_kafka_name(wrk->kafka), wrk->err, wrk->reason);
wrk->err = 0;
}
if (key == NULL || keylen == 0) { if (key == NULL || keylen == 0) {
snprintf(wrk->errmsg, LINE_MAX, "%s message shard key is missing", snprintf(wrk->errmsg, LINE_MAX, "%s message shard key is missing",
...@@ -635,16 +625,9 @@ MQ_Send(void *priv, const char *data, unsigned len, const char *key, ...@@ -635,16 +625,9 @@ MQ_Send(void *priv, const char *data, unsigned len, const char *key,
*error = wrk->errmsg; *error = wrk->errmsg;
return -1; return -1;
} }
wrk->produced++;
/* Check for an error state again */ wrk->produced++;
rd_kafka_poll(wrk->kafka, 0); rd_kafka_poll(wrk->kafka, 0);
if (wrk->err) {
MQ_LOG_Log(LOG_WARNING, "%s error state (%d): %s",
rd_kafka_name(wrk->kafka), wrk->err, wrk->reason);
wrk->err = 0;
}
return 0; return 0;
} }
......
...@@ -43,8 +43,6 @@ typedef struct kafka_wrk { ...@@ -43,8 +43,6 @@ typedef struct kafka_wrk {
int n; int n;
rd_kafka_t *kafka; rd_kafka_t *kafka;
rd_kafka_topic_t *topic; rd_kafka_topic_t *topic;
int err;
char reason[LINE_MAX]; /* errs from rdkafka callbacks */
char errmsg[LINE_MAX]; /* thread-safe return from MQ_*() */ char errmsg[LINE_MAX]; /* thread-safe return from MQ_*() */
unsigned long seen; unsigned long seen;
unsigned long produced; unsigned long produced;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment