Commit c84abb64 authored by Geoff Simmons's avatar Geoff Simmons

MQ plugin for Kafka: add stats seen, produced, delivered and failed,

and add a summary stats line to the log
parent 05b21bc1
......@@ -40,6 +40,7 @@
static pthread_t monitor;
static int run = 0;
static unsigned seen, produced, delivered, failed, nokey, badkey, nodata;
/* Call rd_kafka_poll() for each worker to provoke callbacks */
static void
......@@ -50,6 +51,13 @@ poll_workers(void)
kafka_wrk_t *wrk = workers[i];
CHECK_OBJ(wrk, KAFKA_WRK_MAGIC);
rd_kafka_poll(wrk->kafka, 0);
seen += wrk->seen;
produced += wrk->produced;
delivered += wrk->delivered;
failed += wrk->failed;
nokey += wrk->nokey;
badkey += wrk->badkey;
nodata += wrk->nodata;
}
}
......@@ -89,13 +97,17 @@ static void
continue;
}
else {
MQ_LOG_Log(LOG_ERR, "libtrackrdr-kafka monitoring thread: %s\n",
MQ_LOG_Log(LOG_ERR, "libtrackrdr-kafka monitoring thread: %s",
strerror(errno));
err = errno;
pthread_exit(&err);
}
}
seen = produced = delivered = failed = nokey = badkey = nodata = 0;
poll_workers();
MQ_LOG_Log(LOG_INFO, "mq stats summary: seen=%u produced=%u "
"delivered=%u failed=%u nokey=%u badkey=%u nodata=%u",
seen, produced, delivered, failed, nokey, badkey, nodata);
}
pthread_cleanup_pop(0);
......
......@@ -120,18 +120,22 @@ dr_cb(rd_kafka_t *rk, void *payload, size_t len, rd_kafka_resp_err_t err,
void *opaque, void *msg_opaque)
{
(void) msg_opaque;
if (err != RD_KAFKA_RESP_ERR_NO_ERROR) {
kafka_wrk_t *wrk = (kafka_wrk_t *) opaque;
CHECK_OBJ_NOTNULL(wrk, KAFKA_WRK_MAGIC);
if (err != RD_KAFKA_RESP_ERR_NO_ERROR) {
strncpy(wrk->reason, rd_kafka_err2str(err), LINE_MAX);
MQ_LOG_Log(LOG_ERR, "Delivery error (client ID = %s, msg = [%.*s]): %s",
rd_kafka_name(rk), (int) len, (char *) payload, wrk->reason);
wrk->err = (int) err;
wrk->failed++;
}
else if (loglvl == LOG_DEBUG)
else {
wrk->delivered++;
if (loglvl == LOG_DEBUG)
MQ_LOG_Log(LOG_DEBUG, "Delivered (client ID = %s): msg = [%.*s]",
rd_kafka_name(rk), (int) len, (char *) payload);
}
}
static void
......@@ -152,8 +156,11 @@ stats_cb(rd_kafka_t *rk, char *json, size_t json_len, void *opaque)
CHECK_OBJ_NOTNULL(wrk, KAFKA_WRK_MAGIC);
MQ_LOG_Log(LOG_INFO, "rdkafka stats (ID = %s): %.*s", rd_kafka_name(rk),
(int) json_len, json);
MQ_LOG_Log(LOG_INFO, "mq stats (ID = %s): nokey=%u badkey=%u nodata=%u",
rd_kafka_name(rk), wrk->nokey, wrk->badkey, wrk->nodata);
MQ_LOG_Log(LOG_INFO,
"mq stats (ID = %s): seen=%u produced=%u delivered=%u failed=%u "
"nokey=%u badkey=%u nodata=%u",
rd_kafka_name(rk), wrk->seen, wrk->produced, wrk->delivered,
wrk->failed, wrk->nokey, wrk->badkey, wrk->nodata);
return 0;
}
......@@ -257,7 +264,8 @@ static const char
wrk->err = 0;
wrk->errmsg[0] = '\0';
wrk->reason[0] = '\0';
wrk->nokey = wrk->badkey = wrk->nodata = 0;
wrk->seen = wrk->produced = wrk->delivered = wrk->failed = wrk->nokey
= wrk->badkey = wrk->nodata = 0;
workers[wrk_num] = wrk;
MQ_LOG_Log(LOG_INFO, "initialized worker %d: %s", wrk_num,
rd_kafka_name(wrk->kafka));
......@@ -570,6 +578,7 @@ MQ_Send(void *priv, const char *data, unsigned len, const char *key,
return -1;
}
CAST_OBJ(wrk, priv, KAFKA_WRK_MAGIC);
wrk->seen++;
/* Check for an error state */
rd_kafka_poll(wrk->kafka, 0);
......@@ -630,6 +639,7 @@ MQ_Send(void *priv, const char *data, unsigned len, const char *key,
*error = wrk->errmsg;
return -1;
}
wrk->produced++;
/* Check for an error state again */
rd_kafka_poll(wrk->kafka, 0);
......
......@@ -46,6 +46,10 @@ typedef struct kafka_wrk {
int err;
char reason[LINE_MAX]; /* errs from rdkafka callbacks */
char errmsg[LINE_MAX]; /* thread-safe return from MQ_*() */
unsigned seen;
unsigned produced;
unsigned delivered;
unsigned failed;
unsigned nokey;
unsigned badkey;
unsigned nodata;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment