Commit df698e43 authored by Geoff Simmons's avatar Geoff Simmons

MQ plugin for Kafka: encapsulate all callbacks

parent 2e1774fb
......@@ -37,6 +37,7 @@
#include <syslog.h>
#include "mq_kafka.h"
#include "miniobj.h"
/*
* Partitioner assumes that the key string is an unsigned 32-bit
......@@ -78,3 +79,57 @@ CB_Partitioner(const rd_kafka_topic_t *rkt, const void *keydata, size_t keylen,
(int) keylen, (const char *) keydata);
return partition;
}
void
CB_Log(const rd_kafka_t *rk, int level, const char *fac, const char *buf)
{
(void) fac;
MQ_LOG_Log(level, "rdkafka %s: %s", rd_kafka_name(rk), buf);
}
void
CB_DeliveryReport(rd_kafka_t *rk, void *payload, size_t len,
rd_kafka_resp_err_t err, void *opaque, void *msg_opaque)
{
(void) msg_opaque;
kafka_wrk_t *wrk = (kafka_wrk_t *) opaque;
CHECK_OBJ_NOTNULL(wrk, KAFKA_WRK_MAGIC);
if (err != RD_KAFKA_RESP_ERR_NO_ERROR) {
MQ_LOG_Log(LOG_ERR,
"Delivery error %d (client ID = %s, msg = [%.*s]): %s",
err, rd_kafka_name(rk), (int) len, (char *) payload,
rd_kafka_err2str(err));
wrk->failed++;
}
else {
wrk->delivered++;
if (loglvl == LOG_DEBUG)
MQ_LOG_Log(LOG_DEBUG, "Delivered (client ID = %s): msg = [%.*s]",
rd_kafka_name(rk), (int) len, (char *) payload);
}
}
void
CB_Error(rd_kafka_t *rk, int err, const char *reason, void *opaque)
{
(void) opaque;
MQ_LOG_Log(LOG_ERR, "Client error (ID = %s) %d: %s", rd_kafka_name(rk), err,
reason);
}
int
CB_Stats(rd_kafka_t *rk, char *json, size_t json_len, void *opaque)
{
kafka_wrk_t *wrk = (kafka_wrk_t *) opaque;
CHECK_OBJ_NOTNULL(wrk, KAFKA_WRK_MAGIC);
MQ_LOG_Log(LOG_INFO, "rdkafka stats (ID = %s): %.*s", rd_kafka_name(rk),
(int) json_len, json);
MQ_LOG_Log(LOG_INFO,
"mq stats (ID = %s): seen=%u produced=%u delivered=%u failed=%u "
"nokey=%u badkey=%u nodata=%u",
rd_kafka_name(rk), wrk->seen, wrk->produced, wrk->delivered,
wrk->failed, wrk->nokey, wrk->badkey, wrk->nodata);
return 0;
}
......@@ -105,60 +105,6 @@ toggle_debug(int sig)
}
}
static void
log_cb(const rd_kafka_t *rk, int level, const char *fac, const char *buf)
{
(void) fac;
MQ_LOG_Log(level, "rdkafka %s: %s", rd_kafka_name(rk), buf);
}
static void
dr_cb(rd_kafka_t *rk, void *payload, size_t len, rd_kafka_resp_err_t err,
void *opaque, void *msg_opaque)
{
(void) msg_opaque;
kafka_wrk_t *wrk = (kafka_wrk_t *) opaque;
CHECK_OBJ_NOTNULL(wrk, KAFKA_WRK_MAGIC);
if (err != RD_KAFKA_RESP_ERR_NO_ERROR) {
MQ_LOG_Log(LOG_ERR,
"Delivery error %d (client ID = %s, msg = [%.*s]): %s",
err, rd_kafka_name(rk), (int) len, (char *) payload,
rd_kafka_err2str(err));
wrk->failed++;
}
else {
wrk->delivered++;
if (loglvl == LOG_DEBUG)
MQ_LOG_Log(LOG_DEBUG, "Delivered (client ID = %s): msg = [%.*s]",
rd_kafka_name(rk), (int) len, (char *) payload);
}
}
static void
error_cb(rd_kafka_t *rk, int err, const char *reason, void *opaque)
{
(void) opaque;
MQ_LOG_Log(LOG_ERR, "Client error (ID = %s) %d: %s", rd_kafka_name(rk), err,
reason);
}
static int
stats_cb(rd_kafka_t *rk, char *json, size_t json_len, void *opaque)
{
kafka_wrk_t *wrk = (kafka_wrk_t *) opaque;
CHECK_OBJ_NOTNULL(wrk, KAFKA_WRK_MAGIC);
MQ_LOG_Log(LOG_INFO, "rdkafka stats (ID = %s): %.*s", rd_kafka_name(rk),
(int) json_len, json);
MQ_LOG_Log(LOG_INFO,
"mq stats (ID = %s): seen=%u produced=%u delivered=%u failed=%u "
"nokey=%u badkey=%u nodata=%u",
rd_kafka_name(rk), wrk->seen, wrk->produced, wrk->delivered,
wrk->failed, wrk->nokey, wrk->badkey, wrk->nodata);
return 0;
}
static int
conf_add(const char *lval, const char *rval)
{
......@@ -328,10 +274,10 @@ MQ_GlobalInit(unsigned nworkers, const char *config_fname)
}
}
rd_kafka_conf_set_dr_cb(conf, dr_cb);
rd_kafka_conf_set_error_cb(conf, error_cb);
rd_kafka_conf_set_log_cb(conf, log_cb);
rd_kafka_conf_set_stats_cb(conf, stats_cb);
rd_kafka_conf_set_dr_cb(conf, CB_DeliveryReport);
rd_kafka_conf_set_error_cb(conf, CB_Error);
rd_kafka_conf_set_log_cb(conf, CB_Log);
rd_kafka_conf_set_stats_cb(conf, CB_Stats);
rd_kafka_topic_conf_set_partitioner_cb(topic_conf, CB_Partitioner);
if (loglvl == LOG_DEBUG) {
......
......@@ -87,3 +87,8 @@ void WRK_Fini(kafka_wrk_t *wrk);
int32_t CB_Partitioner(const rd_kafka_topic_t *rkt, const void *keydata,
size_t keylen, int32_t partition_cnt, void *rkt_opaque,
void *msg_opaque);
void CB_Log(const rd_kafka_t *rk, int level, const char *fac, const char *buf);
void CB_DeliveryReport(rd_kafka_t *rk, void *payload, size_t len,
rd_kafka_resp_err_t err, void *opaque, void *msg_opaque);
void CB_Error(rd_kafka_t *rk, int err, const char *reason, void *opaque);
int CB_Stats(rd_kafka_t *rk, char *json, size_t json_len, void *opaque);
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment