Commit 2e1774fb authored by Geoff Simmons

MQ plugin for Kafka refactoring: encapsulate worker functions and the partitioner callback
parent 51cf99a4
......@@ -14,6 +14,8 @@ libtrackrdr_kafka_la_SOURCES = \
log.c \
monitor.c \
zookeeper.c \
worker.c \
callback.c \
$(top_srcdir)/src/config_common.c
libtrackrdr_kafka_la_LIBADD = \
......
/*-
* Copyright (c) 2014 UPLEX Nils Goroll Systemoptimierung
* Copyright (c) 2014 Otto Gmbh & Co KG
* All rights reserved
* Use only with permission
*
* Author: Geoffrey Simmons <geoffrey.simmons@uplex.de>
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL AUTHOR OR CONTRIBUTORS BE LIABLE
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
* OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
* OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
*
*/
/* Callbacks used by rdkafka */
#include <string.h>
#include <stdlib.h>
#include <errno.h>
#include <syslog.h>
#include "mq_kafka.h"
/*
 * Partitioner callback for rdkafka: maps a message key to a partition.
 *
 * The key is expected to be an unsigned 32-bit hexadecimal string of at
 * most 8 characters; it does not have to be NUL-terminated, keylen gives
 * its length.
 *
 * rkt            topic handle, used to check partition availability
 * keydata/keylen the message key
 * partition_cnt  number of partitions of the topic, must be > 0
 * rkt_opaque, msg_opaque  unused
 *
 * Returns the computed partition (key modulo partition_cnt), or
 * RD_KAFKA_PARTITION_UA if the key is missing, empty or not parseable
 * as hexadecimal, or if the computed partition is not available.
 *
 * NOTE(review): strtoul() itself tolerates leading whitespace, a sign
 * and a "0x" prefix, so such keys are still accepted here.
 */
int32_t
CB_Partitioner(const rd_kafka_topic_t *rkt, const void *keydata, size_t keylen,
               int32_t partition_cnt, void *rkt_opaque, void *msg_opaque)
{
    int32_t partition;
    unsigned long key;
    char keystr[sizeof("ffffffff")], *endptr = NULL;

    (void) rkt_opaque;
    (void) msg_opaque;
    assert(partition_cnt > 0);
    /* The key plus its terminating NUL must fit into keystr */
    assert(keylen < sizeof(keystr));

    /*
     * An absent or empty key is not a hexadecimal number; reject it
     * before touching keydata, which may be NULL in that case.
     */
    if (keydata == NULL || keylen == 0) {
        MQ_LOG_Log(LOG_ERR, "Cannot parse partition key: empty key");
        return RD_KAFKA_PARTITION_UA;
    }

    /* Make a NUL-terminated copy for strtoul() */
    memcpy(keystr, keydata, keylen);
    keystr[keylen] = '\0';

    errno = 0;
    key = strtoul(keystr, &endptr, 16);
    /*
     * endptr == keystr means that no digits were consumed at all (for
     * example when the key starts with a NUL byte); without this check
     * such a key would silently be treated as 0.
     */
    if (errno != 0 || endptr == keystr || *endptr != '\0'
        || key > 0xffffffffUL) {
        MQ_LOG_Log(LOG_ERR, "Cannot parse partition key: %.*s", (int) keylen,
                   (const char *) keydata);
        return RD_KAFKA_PARTITION_UA;
    }

    if ((partition_cnt & (partition_cnt - 1)) == 0)
        /* partition_cnt is a power of 2 */
        partition = key & (partition_cnt - 1);
    else
        partition = key % partition_cnt;

    if (! rd_kafka_topic_partition_available(rkt, partition)) {
        MQ_LOG_Log(LOG_ERR, "Partition %d not available", partition);
        return RD_KAFKA_PARTITION_UA;
    }
    MQ_LOG_Log(LOG_DEBUG, "Computed partition %d for key %.*s", partition,
               (int) keylen, (const char *) keydata);
    return partition;
}
......@@ -30,7 +30,6 @@
*/
#include <stdio.h>
#include <stdlib.h>
#include <errno.h>
#include <string.h>
#include <strings.h>
......@@ -68,7 +67,6 @@ static char brokerlist[LINE_MAX] = "";
static char zoolog[PATH_MAX] = "";
static unsigned zoo_timeout = 0;
static char topic[LINE_MAX] = "";
static rd_kafka_topic_conf_t *topic_conf;
static rd_kafka_conf_t *conf;
......@@ -77,7 +75,6 @@ static unsigned stats_interval = 0;
static char errmsg[LINE_MAX];
static char _version[LINE_MAX];
static int loglvl = LOG_INFO;
static int saved_lvl = LOG_INFO;
static int debug_toggle = 0;
struct sigaction toggle_action;
......@@ -162,135 +159,6 @@ stats_cb(rd_kafka_t *rk, char *json, size_t json_len, void *opaque)
return 0;
}
/*
 * Partitioner callback for rdkafka: maps a message key to a partition.
 *
 * The key is expected to be an unsigned 32-bit hexadecimal string of at
 * most 8 characters; it does not have to be NUL-terminated, keylen gives
 * its length.
 *
 * Returns the computed partition (key modulo partition_cnt), or
 * RD_KAFKA_PARTITION_UA if the key is missing, empty or not parseable
 * as hexadecimal, or if the computed partition is not available.
 * rkt_opaque and msg_opaque are unused.
 *
 * NOTE(review): strtoul() itself tolerates leading whitespace, a sign
 * and a "0x" prefix, so such keys are still accepted here.
 */
static int32_t
partitioner_cb(const rd_kafka_topic_t *rkt, const void *keydata, size_t keylen,
               int32_t partition_cnt, void *rkt_opaque, void *msg_opaque)
{
    int32_t partition;
    unsigned long key;
    char keystr[sizeof("ffffffff")], *endptr = NULL;

    (void) rkt_opaque;
    (void) msg_opaque;
    assert(partition_cnt > 0);
    /* The key plus its terminating NUL must fit into keystr */
    assert(keylen < sizeof(keystr));

    /*
     * An absent or empty key is not a hexadecimal number; reject it
     * before touching keydata, which may be NULL in that case.
     */
    if (keydata == NULL || keylen == 0) {
        MQ_LOG_Log(LOG_ERR, "Cannot parse partition key: empty key");
        return RD_KAFKA_PARTITION_UA;
    }

    /* Make a NUL-terminated copy for strtoul() */
    memcpy(keystr, keydata, keylen);
    keystr[keylen] = '\0';

    errno = 0;
    key = strtoul(keystr, &endptr, 16);
    /*
     * endptr == keystr means that no digits were consumed at all;
     * without this check such a key would silently be treated as 0.
     */
    if (errno != 0 || endptr == keystr || *endptr != '\0'
        || key > 0xffffffffUL) {
        MQ_LOG_Log(LOG_ERR, "Cannot parse partition key: %.*s", (int) keylen,
                   (const char *) keydata);
        return RD_KAFKA_PARTITION_UA;
    }

    if ((partition_cnt & (partition_cnt - 1)) == 0)
        /* partition_cnt is a power of 2 */
        partition = key & (partition_cnt - 1);
    else
        partition = key % partition_cnt;

    if (! rd_kafka_topic_partition_available(rkt, partition)) {
        MQ_LOG_Log(LOG_ERR, "Partition %d not available", partition);
        return RD_KAFKA_PARTITION_UA;
    }
    MQ_LOG_Log(LOG_DEBUG, "Computed partition %d for key %.*s", partition,
               (int) keylen, (const char *) keydata);
    return partition;
}
/* XXX: encapsulate wrk_init and _fini in a separate source */
/*
 * Create the worker object with index wrk_num: duplicate the global
 * rdkafka configurations, set a per-worker client.id, create a producer
 * handle and a topic handle, and store the worker in workers[wrk_num].
 *
 * Returns NULL on success, or a pointer to the static errmsg buffer
 * describing the failure.
 *
 * NOTE(review): the duplicated configurations, the worker object and the
 * producer handle are not released on the error paths below. Whether
 * rd_kafka_new() and rd_kafka_topic_new() consume their conf argument
 * when they fail appears to depend on the librdkafka version -- confirm
 * before adding cleanup.
 */
static const char
*wrk_init(int wrk_num)
{
    char clientid[sizeof("libtrackrdr-kafka-worker-2147483648")];
    rd_kafka_conf_t *wrk_conf;
    rd_kafka_topic_conf_t *wrk_topic_conf;
    rd_kafka_t *rk;
    rd_kafka_topic_t *rkt;
    kafka_wrk_t *wrk;

    assert(wrk_num >= 0 && wrk_num < nwrk);

    wrk_conf = rd_kafka_conf_dup(conf);
    wrk_topic_conf = rd_kafka_topic_conf_dup(topic_conf);
    /* Bounded formatting; the buffer is sized for the longest int */
    snprintf(clientid, sizeof(clientid), "libtrackrdr-kafka-worker-%d",
             wrk_num);
    if (rd_kafka_conf_set(wrk_conf, "client.id", clientid, errmsg,
                          LINE_MAX) != RD_KAFKA_CONF_OK) {
        MQ_LOG_Log(LOG_ERR, "rdkafka config error [client.id = %s]: %s",
                   clientid, errmsg);
        return errmsg;
    }
    rd_kafka_topic_conf_set_partitioner_cb(wrk_topic_conf, partitioner_cb);

    ALLOC_OBJ(wrk, KAFKA_WRK_MAGIC);
    if (wrk == NULL) {
        snprintf(errmsg, LINE_MAX, "Failed to create worker handle: %s",
                 strerror(errno));
        /* Never pass a composed message as the format string */
        MQ_LOG_Log(LOG_ERR, "%s", errmsg);
        return errmsg;
    }
    /* The worker object is the opaque pointer handed to callbacks */
    rd_kafka_conf_set_opaque(wrk_conf, (void *) wrk);
    rd_kafka_topic_conf_set_opaque(wrk_topic_conf, (void *) wrk);

    rk = rd_kafka_new(RD_KAFKA_PRODUCER, wrk_conf, errmsg, LINE_MAX);
    if (rk == NULL) {
        MQ_LOG_Log(LOG_ERR, "Failed to create producer: %s", errmsg);
        return errmsg;
    }
    CHECK_OBJ_NOTNULL((kafka_wrk_t *) rd_kafka_opaque(rk), KAFKA_WRK_MAGIC);
    rd_kafka_set_log_level(rk, loglvl);

    errno = 0;
    rkt = rd_kafka_topic_new(rk, topic, wrk_topic_conf);
    if (rkt == NULL) {
        rd_kafka_resp_err_t rkerr = rd_kafka_errno2err(errno);
        snprintf(errmsg, LINE_MAX, "Failed to initialize topic: %s",
                 rd_kafka_err2str(rkerr));
        /* Never pass a composed message as the format string */
        MQ_LOG_Log(LOG_ERR, "%s", errmsg);
        return errmsg;
    }

    wrk->n = wrk_num;
    wrk->kafka = rk;
    wrk->topic = rkt;
    wrk->errmsg[0] = '\0';
    wrk->seen = wrk->produced = wrk->delivered = wrk->failed = wrk->nokey
        = wrk->badkey = wrk->nodata = 0;
    workers[wrk_num] = wrk;
    MQ_LOG_Log(LOG_INFO, "initialized worker %d: %s", wrk_num,
               rd_kafka_name(wrk->kafka));
    rd_kafka_poll(wrk->kafka, 0);
    return NULL;
}
/*
 * Shut down a worker: poll until the producer's output queue is empty,
 * destroy its topic and producer handles, free the worker object and
 * clear its entry in the workers table.
 */
static void
wrk_fini(kafka_wrk_t *wrk)
{
    int wrk_num;

    CHECK_OBJ_NOTNULL(wrk, KAFKA_WRK_MAGIC);
    wrk_num = wrk->n;
    assert(wrk_num >= 0 && wrk_num < nwrk);

    /*
     * Wait for messages to be delivered
     * XXX: timeout? configure poll timeout?
     */
    for (;;) {
        if (rd_kafka_outq_len(wrk->kafka) <= 0)
            break;
        rd_kafka_poll(wrk->kafka, 100);
    }

    /* The topic handle is destroyed before the producer handle */
    rd_kafka_topic_destroy(wrk->topic);
    rd_kafka_destroy(wrk->kafka);

    FREE_OBJ(wrk);
    AN(wrk);
    workers[wrk_num] = NULL;
}
static int
conf_add(const char *lval, const char *rval)
{
......@@ -386,6 +254,8 @@ MQ_GlobalInit(unsigned nworkers, const char *config_fname)
nwrk = nworkers;
conf = rd_kafka_conf_new();
topic_conf = rd_kafka_topic_conf_new();
loglvl = LOG_INFO;
topic[0] = '\0';
if (CONF_ReadFile(config_fname, conf_add) != 0)
return "Error reading config file for Kafka";
......@@ -462,7 +332,7 @@ MQ_GlobalInit(unsigned nworkers, const char *config_fname)
rd_kafka_conf_set_error_cb(conf, error_cb);
rd_kafka_conf_set_log_cb(conf, log_cb);
rd_kafka_conf_set_stats_cb(conf, stats_cb);
rd_kafka_topic_conf_set_partitioner_cb(topic_conf, partitioner_cb);
rd_kafka_topic_conf_set_partitioner_cb(topic_conf, CB_Partitioner);
if (loglvl == LOG_DEBUG) {
size_t cfglen;
......@@ -537,7 +407,7 @@ MQ_InitConnections(void)
}
for (int i = 0; i < nwrk; i++) {
const char *err = wrk_init(i);
const char *err = WRK_Init(i, conf, topic_conf);
if (err != NULL)
return err;
}
......@@ -641,9 +511,9 @@ MQ_Reconnect(void **priv)
CAST_OBJ_NOTNULL(wrk, *priv, KAFKA_WRK_MAGIC);
wrk_num = wrk->n;
assert(wrk_num >= 0 && wrk_num < nwrk);
wrk_fini(wrk);
WRK_Fini(wrk);
err = wrk_init(wrk_num);
err = WRK_Init(wrk_num, conf, topic_conf);
if (err != NULL)
return err;
*priv = workers[wrk_num];
......@@ -673,7 +543,7 @@ MQ_WorkerShutdown(void **priv)
kafka_wrk_t *wrk;
CAST_OBJ_NOTNULL(wrk, *priv, KAFKA_WRK_MAGIC);
wrk_fini(wrk);
WRK_Fini(wrk);
*priv = NULL;
return NULL;
......@@ -687,7 +557,7 @@ MQ_GlobalShutdown(void)
MQ_MON_Fini();
for (int i = 0; i < nwrk; i++)
if (workers[i] != NULL)
wrk_fini(workers[i]);
WRK_Fini(workers[i]);
free(workers);
rd_kafka_conf_destroy(conf);
......
......@@ -56,6 +56,10 @@ typedef struct kafka_wrk {
kafka_wrk_t **workers;
unsigned nwrk;
char topic[LINE_MAX];
int loglvl;
/* log.c */
int MQ_LOG_Open(const char *path);
void MQ_LOG_Log(int level, const char *msg, ...);
......@@ -72,3 +76,14 @@ const char *MQ_ZOO_Init(char *zooservers, unsigned timeout, char *brokerlist,
const char *MQ_ZOO_SetLog(const char *path);
void MQ_ZOO_SetLogLevel(int level);
const char *MQ_ZOO_Fini(void);
/* worker.c */
const char *WRK_Init(int wrk_num, rd_kafka_conf_t *conf,
rd_kafka_topic_conf_t *topic_conf);
void WRK_AddBrokers(const char *brokers);
void WRK_Fini(kafka_wrk_t *wrk);
/* callback.c */
int32_t CB_Partitioner(const rd_kafka_topic_t *rkt, const void *keydata,
size_t keylen, int32_t partition_cnt, void *rkt_opaque,
void *msg_opaque);
......@@ -15,6 +15,8 @@ test_kafka_LDADD = \
../log.$(OBJEXT) \
../monitor.$(OBJEXT) \
../zookeeper.$(OBJEXT) \
../worker.$(OBJEXT) \
../callback.$(OBJEXT) \
${PTHREAD_LIBS} \
-lrdkafka -lz -lpthread -lrt -lzookeeper_mt -lpcre
......
/*-
* Copyright (c) 2014 UPLEX Nils Goroll Systemoptimierung
* Copyright (c) 2014 Otto Gmbh & Co KG
* All rights reserved
* Use only with permission
*
* Author: Geoffrey Simmons <geoffrey.simmons@uplex.de>
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL AUTHOR OR CONTRIBUTORS BE LIABLE
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
* OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
* OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
*
*/
#include <syslog.h>
#include <stdlib.h>
#include <errno.h>
#include <string.h>
#include "mq_kafka.h"
#include "miniobj.h"
/* Buffer for error messages; WRK_Init returns a pointer to it on failure */
static char errmsg[LINE_MAX];
/*
 * Create the worker object with index wrk_num: duplicate the given
 * rdkafka configurations, set a per-worker client.id and the partitioner
 * callback, create a producer handle and a topic handle, and store the
 * worker in workers[wrk_num].
 *
 * wrk_num     index of the worker, 0 <= wrk_num < nwrk
 * conf        rdkafka configuration to duplicate for this worker
 * topic_conf  rdkafka topic configuration to duplicate for this worker
 *
 * Returns NULL on success, or a pointer to the static errmsg buffer
 * describing the failure.
 *
 * NOTE(review): the duplicated configurations, the worker object and the
 * producer handle are not released on the error paths below. Whether
 * rd_kafka_new() and rd_kafka_topic_new() consume their conf argument
 * when they fail appears to depend on the librdkafka version -- confirm
 * before adding cleanup.
 */
const char
*WRK_Init(int wrk_num, rd_kafka_conf_t *conf, rd_kafka_topic_conf_t *topic_conf)
{
    char clientid[sizeof("libtrackrdr-kafka-worker-2147483648")];
    rd_kafka_conf_t *wrk_conf;
    rd_kafka_topic_conf_t *wrk_topic_conf;
    rd_kafka_t *rk;
    rd_kafka_topic_t *rkt;
    kafka_wrk_t *wrk;

    assert(wrk_num >= 0 && wrk_num < nwrk);

    wrk_conf = rd_kafka_conf_dup(conf);
    wrk_topic_conf = rd_kafka_topic_conf_dup(topic_conf);
    /* Bounded formatting; the buffer is sized for the longest int */
    snprintf(clientid, sizeof(clientid), "libtrackrdr-kafka-worker-%d",
             wrk_num);
    if (rd_kafka_conf_set(wrk_conf, "client.id", clientid, errmsg,
                          LINE_MAX) != RD_KAFKA_CONF_OK) {
        MQ_LOG_Log(LOG_ERR, "rdkafka config error [client.id = %s]: %s",
                   clientid, errmsg);
        return errmsg;
    }
    rd_kafka_topic_conf_set_partitioner_cb(wrk_topic_conf, CB_Partitioner);

    ALLOC_OBJ(wrk, KAFKA_WRK_MAGIC);
    if (wrk == NULL) {
        snprintf(errmsg, LINE_MAX, "Failed to create worker handle: %s",
                 strerror(errno));
        /* Never pass a composed message as the format string */
        MQ_LOG_Log(LOG_ERR, "%s", errmsg);
        return errmsg;
    }
    /* The worker object is the opaque pointer handed to callbacks */
    rd_kafka_conf_set_opaque(wrk_conf, (void *) wrk);
    rd_kafka_topic_conf_set_opaque(wrk_topic_conf, (void *) wrk);

    rk = rd_kafka_new(RD_KAFKA_PRODUCER, wrk_conf, errmsg, LINE_MAX);
    if (rk == NULL) {
        MQ_LOG_Log(LOG_ERR, "Failed to create producer: %s", errmsg);
        return errmsg;
    }
    CHECK_OBJ_NOTNULL((kafka_wrk_t *) rd_kafka_opaque(rk), KAFKA_WRK_MAGIC);
    rd_kafka_set_log_level(rk, loglvl);

    errno = 0;
    rkt = rd_kafka_topic_new(rk, topic, wrk_topic_conf);
    if (rkt == NULL) {
        rd_kafka_resp_err_t rkerr = rd_kafka_errno2err(errno);
        snprintf(errmsg, LINE_MAX, "Failed to initialize topic: %s",
                 rd_kafka_err2str(rkerr));
        /* Never pass a composed message as the format string */
        MQ_LOG_Log(LOG_ERR, "%s", errmsg);
        return errmsg;
    }

    wrk->n = wrk_num;
    wrk->kafka = rk;
    wrk->topic = rkt;
    wrk->errmsg[0] = '\0';
    wrk->seen = wrk->produced = wrk->delivered = wrk->failed = wrk->nokey
        = wrk->badkey = wrk->nodata = 0;
    workers[wrk_num] = wrk;
    MQ_LOG_Log(LOG_INFO, "initialized worker %d: %s", wrk_num,
               rd_kafka_name(wrk->kafka));
    rd_kafka_poll(wrk->kafka, 0);
    return NULL;
}
/*
 * Add the given broker list to the producer handle of every worker that
 * is currently present in the workers table, poll each handle briefly,
 * and log how many brokers were added.
 */
void
WRK_AddBrokers(const char *brokers)
{
    for (int i = 0; i < nwrk; i++) {
        int nbrokers;

        /* Skip slots without a worker */
        if (workers[i] == NULL)
            continue;

        CHECK_OBJ(workers[i], KAFKA_WRK_MAGIC);
        nbrokers = rd_kafka_brokers_add(workers[i]->kafka, brokers);
        /* XXX: poll timeout configurable? */
        rd_kafka_poll(workers[i]->kafka, 10);
        MQ_LOG_Log(LOG_INFO, "%s: added %d brokers [%s]",
                   rd_kafka_name(workers[i]->kafka), nbrokers, brokers);
    }
}
/*
 * Shut down a worker: poll until the producer's output queue is empty,
 * destroy its topic and producer handles, free the worker object and
 * clear its entry in the workers table.
 */
void
WRK_Fini(kafka_wrk_t *wrk)
{
    int wrk_num;

    CHECK_OBJ_NOTNULL(wrk, KAFKA_WRK_MAGIC);
    wrk_num = wrk->n;
    assert(wrk_num >= 0 && wrk_num < nwrk);

    /*
     * Wait for messages to be delivered
     * XXX: timeout? configure poll timeout?
     */
    for (;;) {
        if (rd_kafka_outq_len(wrk->kafka) <= 0)
            break;
        rd_kafka_poll(wrk->kafka, 100);
    }

    /* The topic handle is destroyed before the producer handle */
    rd_kafka_topic_destroy(wrk->topic);
    rd_kafka_destroy(wrk->kafka);

    FREE_OBJ(wrk);
    workers[wrk_num] = NULL;
}
......@@ -162,19 +162,7 @@ watcher(zhandle_t *zzh, int type, int state, const char *path, void *watcherCtx)
return;
}
if (brokers[0] != '\0')
/* XXX: encapsulate */
for (int i = 0; i < nwrk; i++)
if (workers[i] != NULL) {
int nbrokers;
CHECK_OBJ(workers[i], KAFKA_WRK_MAGIC);
nbrokers = rd_kafka_brokers_add(workers[i]->kafka, brokers);
/* XXX: poll timeout configurable? */
rd_kafka_poll(workers[i]->kafka, 10);
MQ_LOG_Log(LOG_INFO, "%s: added %d brokers [%s]",
rd_kafka_name(workers[i]->kafka), nbrokers,
brokers);
}
WRK_AddBrokers((const char *) brokers);
}
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment