Commit 9a5d149b authored by Geoff Simmons's avatar Geoff Simmons

add the log_error_data config parameter, based on a patch from

Thilo Keber (thx Thilo)
parent 3c448e40
......@@ -174,6 +174,13 @@ Parameter Description
indefinitely for message delivery, but don't
wait for rdkafka finalization. (optional,
default 1000 ms)
----------------------------------- --------------------------------------------
``log_error_data`` Boolean. If false, only the error message is
logged for missing or illegal shard keys, or
missing message payloads. If true, the
offending message is also logged (an empty
field in the case of the missing payload).
(optional, default false)
=================================== ============================================
Except as noted below, the configuration can specify any parameters for
......
......@@ -35,6 +35,7 @@
#include <errno.h>
#include <syslog.h>
#include <stdlib.h>
#include <stdbool.h>
#include "mq_kafka.h"
......@@ -70,6 +71,7 @@ CONF_Init(void)
zoolog[0] = '\0';
brokerlist[0] = '\0';
wrk_shutdown_timeout = 1000;
log_error_data = false;
}
int
......@@ -134,6 +136,23 @@ CONF_Add(const char *lval, const char *rval)
return EINVAL;
return(0);
}
if (strcmp(lval, "log_error_data") == 0) {
if (strcasecmp(rval, "true") == 0
|| strcasecmp(rval, "on") == 0
|| strcasecmp(rval, "yes") == 0
|| strcmp(rval, "1") == 0) {
log_error_data = true;
return(0);
}
if (strcasecmp(rval, "false") == 0
|| strcasecmp(rval, "off") == 0
|| strcasecmp(rval, "no") == 0
|| strcmp(rval, "0") == 0) {
log_error_data = false;
return(0);
}
return(EINVAL);
}
result = rd_kafka_topic_conf_set(topic_conf, lval, rval, errstr, LINE_MAX);
if (result == RD_KAFKA_CONF_UNKNOWN)
......@@ -152,6 +171,7 @@ CONF_Dump(void)
MQ_LOG_Log(LOG_DEBUG, "zookeeper.timeout = %u", zoo_timeout);
MQ_LOG_Log(LOG_DEBUG, "zookeeper.log = %s", zoolog);
MQ_LOG_Log(LOG_DEBUG, "topic = %s", topic);
MQ_LOG_Log(LOG_DEBUG, "worker.shutdown.timeout.ms = %u",
wrk_shutdown_timeout);
MQ_LOG_Log(LOG_DEBUG, "worker.shutdown.timeout.ms = %u", wrk_shutdown_timeout);
MQ_LOG_Log(LOG_DEBUG, "log_error_data = %s",
log_error_data ? "true" : "false");
}
......@@ -293,9 +293,14 @@ MQ_Send(void *priv, const char *data, unsigned len, const char *key,
if (key == NULL || keylen == 0) {
snprintf(wrk->errmsg, LINE_MAX, "%s message shard key is missing",
rd_kafka_name(wrk->kafka));
MQ_LOG_Log(LOG_ERR, wrk->errmsg);
MQ_LOG_Log(LOG_DEBUG, "%s data=[%.*s] key=", rd_kafka_name(wrk->kafka),
len, data);
if (log_error_data) {
MQ_LOG_Log(LOG_ERR, "%s: data=[%.*s] key=", wrk->errmsg, len, data);
}
else {
MQ_LOG_Log(LOG_ERR, wrk->errmsg);
MQ_LOG_Log(LOG_DEBUG, "%s data=[%.*s] key=",
rd_kafka_name(wrk->kafka), len, data);
}
wrk->nokey++;
*error = wrk->errmsg;
return 1;
......@@ -303,9 +308,15 @@ MQ_Send(void *priv, const char *data, unsigned len, const char *key,
if (data == NULL) {
snprintf(wrk->errmsg, LINE_MAX, "%s message payload is NULL",
rd_kafka_name(wrk->kafka));
MQ_LOG_Log(LOG_DEBUG, "%s data= key=[%.*s]", rd_kafka_name(wrk->kafka),
keylen, key);
MQ_LOG_Log(LOG_ERR, wrk->errmsg);
if (log_error_data) {
MQ_LOG_Log(LOG_ERR, "%s: data= key=[%.*s]", wrk->errmsg, keylen,
key);
}
else {
MQ_LOG_Log(LOG_ERR, wrk->errmsg);
MQ_LOG_Log(LOG_DEBUG, "%s data= key=[%.*s]",
rd_kafka_name(wrk->kafka), keylen, key);
}
wrk->nodata++;
*error = wrk->errmsg;
return 1;
......@@ -317,9 +328,16 @@ MQ_Send(void *priv, const char *data, unsigned len, const char *key,
if (!isxdigit(key[i])) {
snprintf(wrk->errmsg, LINE_MAX, "%s message shard key is not hex",
rd_kafka_name(wrk->kafka));
MQ_LOG_Log(LOG_ERR, wrk->errmsg);
MQ_LOG_Log(LOG_DEBUG, "%s data=[%.*s] key=[%.*s]",
rd_kafka_name(wrk->kafka), len, data, keylen, key);
if (log_error_data) {
MQ_LOG_Log(LOG_ERR, "%s: data=[%.*s] key=[%.*s]", wrk->errmsg,
len, data, keylen, key);
}
else {
MQ_LOG_Log(LOG_ERR, wrk->errmsg);
MQ_LOG_Log(LOG_DEBUG, "%s data=[%.*s] key=[%.*s]",
rd_kafka_name(wrk->kafka), len, data, keylen, key);
}
*error = wrk->errmsg;
wrk->badkey++;
return 1;
......
......@@ -34,6 +34,8 @@
#include <librdkafka/rdkafka.h>
#include <syslog.h>
#define AZ(foo) do { assert((foo) == 0); } while (0)
#define AN(foo) do { assert((foo) != 0); } while (0)
......@@ -66,6 +68,7 @@ char zoolog[PATH_MAX];
unsigned zoo_timeout;
unsigned stats_interval;
unsigned wrk_shutdown_timeout;
unsigned log_error_data;
rd_kafka_topic_conf_t *topic_conf;
rd_kafka_conf_t *conf;
......
......@@ -4,5 +4,6 @@ zookeeper.connect = localhost:2181
zookeeper.connection.timeout.ms = 10000
zookeeper.log = zoo.log
topic = libtrackrdr_kafka_test
log_level = 7
log_level = 3
debug = all
log_error_data = true
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment