Commit 31d6c08d authored by Geoff Simmons

Kafka MQ plugin: MQ_Send() polls rdkafka callbacks and fails on pending
error states; wrk objects have private data for error states and stats;
stats callback emits wrk object stats
parent 1440690d
...@@ -71,6 +71,11 @@ typedef struct kafka_wrk { ...@@ -71,6 +71,11 @@ typedef struct kafka_wrk {
rd_kafka_t *kafka; rd_kafka_t *kafka;
rd_kafka_topic_t *topic; rd_kafka_topic_t *topic;
int err; int err;
char reason[LINE_MAX]; /* errs from rdkafka callbacks */
char errmsg[LINE_MAX]; /* thread-safe return from MQ_*() */
unsigned nokey;
unsigned badkey;
unsigned nodata;
} kafka_wrk_t; } kafka_wrk_t;
static kafka_wrk_t **workers; static kafka_wrk_t **workers;
...@@ -110,10 +115,10 @@ dr_cb(rd_kafka_t *rk, void *payload, size_t len, rd_kafka_resp_err_t err, ...@@ -110,10 +115,10 @@ dr_cb(rd_kafka_t *rk, void *payload, size_t len, rd_kafka_resp_err_t err,
if (err != RD_KAFKA_RESP_ERR_NO_ERROR) { if (err != RD_KAFKA_RESP_ERR_NO_ERROR) {
kafka_wrk_t *wrk = (kafka_wrk_t *) opaque; kafka_wrk_t *wrk = (kafka_wrk_t *) opaque;
CHECK_OBJ_NOTNULL(wrk, KAFKA_WRK_MAGIC); CHECK_OBJ_NOTNULL(wrk, KAFKA_WRK_MAGIC);
strncpy(wrk->reason, rd_kafka_err2str(err), LINE_MAX);
MQ_LOG_Log(LOG_ERR, "Delivery error (client ID = %s, msg = [%.*s]): %s", MQ_LOG_Log(LOG_ERR, "Delivery error (client ID = %s, msg = [%.*s]): %s",
rd_kafka_name(rk), (int) len, (char *) payload, rd_kafka_name(rk), (int) len, (char *) payload, wrk->reason);
rd_kafka_err2str(err)); wrk->err = (int) err;
wrk->err = 1;
} }
else if (loglvl == LOG_DEBUG) else if (loglvl == LOG_DEBUG)
MQ_LOG_Log(LOG_DEBUG, "Delivered (client ID = %s): msg = [%.*s]", MQ_LOG_Log(LOG_DEBUG, "Delivered (client ID = %s): msg = [%.*s]",
...@@ -127,16 +132,19 @@ error_cb(rd_kafka_t *rk, int err, const char *reason, void *opaque) ...@@ -127,16 +132,19 @@ error_cb(rd_kafka_t *rk, int err, const char *reason, void *opaque)
CHECK_OBJ_NOTNULL(wrk, KAFKA_WRK_MAGIC); CHECK_OBJ_NOTNULL(wrk, KAFKA_WRK_MAGIC);
MQ_LOG_Log(LOG_ERR, "Client error (ID = %s) %d: %s", rd_kafka_name(rk), err, MQ_LOG_Log(LOG_ERR, "Client error (ID = %s) %d: %s", rd_kafka_name(rk), err,
reason); reason);
wrk->err = 1; wrk->err = err;
strncpy(wrk->reason, reason, LINE_MAX);
} }
static int static int
stats_cb(rd_kafka_t *rk, char *json, size_t json_len, void *opaque) stats_cb(rd_kafka_t *rk, char *json, size_t json_len, void *opaque)
{ {
(void) opaque; kafka_wrk_t *wrk = (kafka_wrk_t *) opaque;
CHECK_OBJ_NOTNULL(wrk, KAFKA_WRK_MAGIC);
MQ_LOG_Log(LOG_INFO, "Client stats (ID = %s): %.*s", rd_kafka_name(rk), MQ_LOG_Log(LOG_INFO, "rdkafka stats (ID = %s): %.*s", rd_kafka_name(rk),
(int) json_len, json); (int) json_len, json);
MQ_LOG_Log(LOG_INFO, "mq stats (ID = %s): nokey=%u badkey=%u nodata=%u",
rd_kafka_name(rk), wrk->nokey, wrk->badkey, wrk->nodata);
return 0; return 0;
} }
...@@ -154,22 +162,23 @@ partitioner_cb(const rd_kafka_topic_t *rkt, const void *keydata, size_t keylen, ...@@ -154,22 +162,23 @@ partitioner_cb(const rd_kafka_topic_t *rkt, const void *keydata, size_t keylen,
(void) rkt_opaque; (void) rkt_opaque;
(void) msg_opaque; (void) msg_opaque;
assert(partition_cnt > 0);
assert(keylen <= 8);
strncpy(keystr, (const char *) keydata, keylen); strncpy(keystr, (const char *) keydata, keylen);
keystr[keylen] = '\0'; keystr[keylen] = '\0';
errno = 0; errno = 0;
key = strtoul(keystr, &endptr, 16); key = strtoul(keystr, &endptr, 16);
if (errno != 0 || *endptr != '\0' || key > 0xffffffffUL) { if (errno != 0 || *endptr != '\0' || key > 0xffffffffUL) {
/* XXX: should use opaque objs to identify msg & maybe wrk obj */
MQ_LOG_Log(LOG_ERR, "Cannot parse partition key: %.*s", (int) keylen, MQ_LOG_Log(LOG_ERR, "Cannot parse partition key: %.*s", (int) keylen,
(const char *) keydata); (const char *) keydata);
return RD_KAFKA_PARTITION_UA; return RD_KAFKA_PARTITION_UA;
} }
if (partition_cnt != 0 && (partition_cnt & (partition_cnt - 1)) == 0) if ((partition_cnt & (partition_cnt - 1)) == 0)
/* partition_cnt is a power of 2 */ /* partition_cnt is a power of 2 */
partition = key & (partition_cnt - 1); partition = key & (partition_cnt - 1);
else else
partition = key % partition_cnt; partition = key % partition_cnt;
assert(partition >= 0);
if (! rd_kafka_topic_partition_available(rkt, partition)) { if (! rd_kafka_topic_partition_available(rkt, partition)) {
MQ_LOG_Log(LOG_ERR, "Partition %d not available", partition); MQ_LOG_Log(LOG_ERR, "Partition %d not available", partition);
...@@ -185,13 +194,15 @@ partitioner_cb(const rd_kafka_topic_t *rkt, const void *keydata, size_t keylen, ...@@ -185,13 +194,15 @@ partitioner_cb(const rd_kafka_topic_t *rkt, const void *keydata, size_t keylen,
static const char static const char
*wrk_init(int wrk_num) *wrk_init(int wrk_num)
{ {
char clientid[sizeof("libtrackrdr-kafka-worker-99999")]; char clientid[sizeof("libtrackrdr-kafka-worker 2147483648")];
rd_kafka_conf_t *wrk_conf; rd_kafka_conf_t *wrk_conf;
rd_kafka_topic_conf_t *wrk_topic_conf; rd_kafka_topic_conf_t *wrk_topic_conf;
rd_kafka_t *rk; rd_kafka_t *rk;
rd_kafka_topic_t *rkt; rd_kafka_topic_t *rkt;
kafka_wrk_t *wrk; kafka_wrk_t *wrk;
assert(wrk_num >= 0 && wrk_num < nwrk);
wrk_conf = rd_kafka_conf_dup(conf); wrk_conf = rd_kafka_conf_dup(conf);
wrk_topic_conf = rd_kafka_topic_conf_dup(topic_conf); wrk_topic_conf = rd_kafka_topic_conf_dup(topic_conf);
sprintf(clientid, "libtrackrdr-kafka-worker %d", wrk_num); sprintf(clientid, "libtrackrdr-kafka-worker %d", wrk_num);
...@@ -235,8 +246,12 @@ static const char ...@@ -235,8 +246,12 @@ static const char
wrk->kafka = rk; wrk->kafka = rk;
wrk->topic = rkt; wrk->topic = rkt;
wrk->err = 0; wrk->err = 0;
wrk->errmsg[0] = '\0';
wrk->reason[0] = '\0';
wrk->nokey = wrk->badkey = wrk->nodata = 0;
workers[wrk_num] = wrk; workers[wrk_num] = wrk;
MQ_LOG_Log(LOG_INFO, "initialized worker %d", wrk_num); MQ_LOG_Log(LOG_INFO, "initialized worker %d: %s", wrk_num,
rd_kafka_name(wrk->kafka));
rd_kafka_poll(wrk->kafka, 0); rd_kafka_poll(wrk->kafka, 0);
return NULL; return NULL;
} }
...@@ -570,7 +585,6 @@ MQ_Send(void *priv, const char *data, unsigned len, const char *key, ...@@ -570,7 +585,6 @@ MQ_Send(void *priv, const char *data, unsigned len, const char *key,
{ {
kafka_wrk_t *wrk; kafka_wrk_t *wrk;
void *payload = NULL; void *payload = NULL;
int ret = 0;
/* XXX: error? */ /* XXX: error? */
if (len == 0) if (len == 0)
...@@ -578,33 +592,43 @@ MQ_Send(void *priv, const char *data, unsigned len, const char *key, ...@@ -578,33 +592,43 @@ MQ_Send(void *priv, const char *data, unsigned len, const char *key,
if (priv == NULL) { if (priv == NULL) {
MQ_LOG_Log(LOG_ERR, "MQ_Send() called with NULL worker object"); MQ_LOG_Log(LOG_ERR, "MQ_Send() called with NULL worker object");
strncpy(errmsg, "Worker object is NULL", LINE_MAX); *error = "MQ_Send() called with NULL worker object";
*error = errmsg;
return -1; return -1;
} }
CAST_OBJ(wrk, priv, KAFKA_WRK_MAGIC); CAST_OBJ(wrk, priv, KAFKA_WRK_MAGIC);
/* Check for an error state */
rd_kafka_poll(wrk->kafka, 0);
if (wrk->err) {
snprintf(wrk->errmsg, LINE_MAX, "%s error state (%d): %s",
rd_kafka_name(wrk->kafka), wrk->err, wrk->reason);
MQ_LOG_Log(LOG_ERR, wrk->errmsg);
*error = wrk->errmsg;
return -1;
}
/* /*
* XXX * XXX
* Increment stats counters on error, so that they can be monitored
* Toggle log level DEBUG with signals * Toggle log level DEBUG with signals
*/ */
if (key == NULL || keylen == 0) { if (key == NULL || keylen == 0) {
snprintf(errmsg, LINE_MAX, "%s message shard key is missing", snprintf(wrk->errmsg, LINE_MAX, "%s message shard key is missing",
rd_kafka_name(wrk->kafka)); rd_kafka_name(wrk->kafka));
MQ_LOG_Log(LOG_ERR, errmsg); MQ_LOG_Log(LOG_ERR, wrk->errmsg);
MQ_LOG_Log(LOG_DEBUG, "%s data=[%.*s] key=", rd_kafka_name(wrk->kafka), MQ_LOG_Log(LOG_DEBUG, "%s data=[%.*s] key=", rd_kafka_name(wrk->kafka),
len, data); len, data);
*error = errmsg; wrk->nokey++;
*error = wrk->errmsg;
return 1; return 1;
} }
if (data == NULL) { if (data == NULL) {
snprintf(errmsg, LINE_MAX, "%s message payload is NULL", snprintf(wrk->errmsg, LINE_MAX, "%s message payload is NULL",
rd_kafka_name(wrk->kafka)); rd_kafka_name(wrk->kafka));
MQ_LOG_Log(LOG_DEBUG, "%s data= key=[%.*s]", rd_kafka_name(wrk->kafka), MQ_LOG_Log(LOG_DEBUG, "%s data= key=[%.*s]", rd_kafka_name(wrk->kafka),
keylen, key); keylen, key);
MQ_LOG_Log(LOG_ERR, errmsg); MQ_LOG_Log(LOG_ERR, wrk->errmsg);
*error = errmsg; wrk->nodata++;
*error = wrk->errmsg;
return 1; return 1;
} }
...@@ -612,26 +636,37 @@ MQ_Send(void *priv, const char *data, unsigned len, const char *key, ...@@ -612,26 +636,37 @@ MQ_Send(void *priv, const char *data, unsigned len, const char *key,
keylen = 8; keylen = 8;
for (int i = 0; i < keylen; i++) for (int i = 0; i < keylen; i++)
if (!isxdigit(key[i])) { if (!isxdigit(key[i])) {
snprintf(errmsg, LINE_MAX, "%s message shard key is not hex", snprintf(wrk->errmsg, LINE_MAX, "%s message shard key is not hex",
rd_kafka_name(wrk->kafka)); rd_kafka_name(wrk->kafka));
MQ_LOG_Log(LOG_ERR, errmsg); MQ_LOG_Log(LOG_ERR, wrk->errmsg);
MQ_LOG_Log(LOG_DEBUG, "%s data=[%.*s] key=[%.*s]", MQ_LOG_Log(LOG_DEBUG, "%s data=[%.*s] key=[%.*s]",
rd_kafka_name(wrk->kafka), len, data, keylen, key); rd_kafka_name(wrk->kafka), len, data, keylen, key);
*error = errmsg; *error = wrk->errmsg;
wrk->badkey++;
return 1; return 1;
} }
REPLACE(payload, data); REPLACE(payload, data);
if (rd_kafka_produce(wrk->topic, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_FREE, if (rd_kafka_produce(wrk->topic, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_FREE,
payload, len, key, keylen, NULL) == -1) { payload, len, key, keylen, NULL) == -1) {
snprintf(errmsg, LINE_MAX, rd_kafka_err2str(rd_kafka_errno2err(errno))); snprintf(wrk->errmsg, LINE_MAX,
rd_kafka_err2str(rd_kafka_errno2err(errno)));
MQ_LOG_Log(LOG_ERR, "%s message send failure (%d): %s", MQ_LOG_Log(LOG_ERR, "%s message send failure (%d): %s",
rd_kafka_name(wrk->kafka), errno, errmsg); rd_kafka_name(wrk->kafka), errno, wrk->errmsg);
*error = errmsg; *error = wrk->errmsg;
ret = -1; return -1;
} }
/* Check for an error state again */
rd_kafka_poll(wrk->kafka, 0); rd_kafka_poll(wrk->kafka, 0);
return ret; if (wrk->err) {
snprintf(wrk->errmsg, LINE_MAX, "%s error state (%d): %s",
rd_kafka_name(wrk->kafka), wrk->err, wrk->reason);
MQ_LOG_Log(LOG_ERR, wrk->errmsg);
*error = wrk->errmsg;
return -1;
}
return 0;
} }
const char * const char *
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment