Commit 682e878f authored by Geoff Simmons's avatar Geoff Simmons

MQ plugin for Kafka: encapsulate configuration

parent df698e43
......@@ -16,6 +16,7 @@ libtrackrdr_kafka_la_SOURCES = \
zookeeper.c \
worker.c \
callback.c \
config.c \
$(top_srcdir)/src/config_common.c
libtrackrdr_kafka_la_LIBADD = \
......
/*-
* Copyright (c) 2014 UPLEX Nils Goroll Systemoptimierung
* Copyright (c) 2014 Otto Gmbh & Co KG
* All rights reserved
* Use only with permission
*
* Author: Geoffrey Simmons <geoffrey.simmons@uplex.de>
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL AUTHOR OR CONTRIBUTORS BE LIABLE
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
* OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
* OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
*
*/
/* Callbacks used by rdkafka */
#include <string.h>
#include <errno.h>
#include <syslog.h>
#include <stdlib.h>
#include "mq_kafka.h"
void
CONF_Init(void)
{
conf = rd_kafka_conf_new();
topic_conf = rd_kafka_topic_conf_new();
loglvl = LOG_INFO;
topic[0] = '\0';
logpath[0] = '\0';
zookeeper[0] = '\0';
zoo_timeout = 0;
stats_interval = 0;
zoolog[0] = '\0';
brokerlist[0] = '\0';
}
int
CONF_Add(const char *lval, const char *rval)
{
rd_kafka_conf_res_t result;
char errstr[LINE_MAX];
errstr[0] = '\0';
if (strcmp(lval, "mq.log") == 0) {
strncpy(logpath, rval, PATH_MAX);
return(0);
}
if (strcmp(lval, "zookeeper.connect") == 0) {
strncpy(zookeeper, rval, LINE_MAX);
return(0);
}
/* XXX: "zookeeper.connection.timeout.ms", to match Kafka config */
if (strcmp(lval, "zookeeper.timeout") == 0) {
char *endptr = NULL;
long val;
errno = 0;
val = strtoul(rval, &endptr, 10);
if (errno != 0)
return errno;
if (*endptr != '\0')
return EINVAL;
if (val > UINT_MAX)
return ERANGE;
zoo_timeout = val;
return(0);
}
if (strcmp(lval, "statistics.interval.ms") == 0) {
char *endptr = NULL;
long val;
errno = 0;
val = strtoul(rval, &endptr, 10);
if (errno != 0)
return errno;
if (*endptr != '\0')
return EINVAL;
if (val > UINT_MAX)
return ERANGE;
stats_interval = val;
result = rd_kafka_conf_set(conf, lval, rval, errstr, LINE_MAX);
if (result != RD_KAFKA_CONF_OK)
return EINVAL;
return(0);
}
if (strcmp(lval, "zookeeper.log") == 0) {
strncpy(zoolog, rval, PATH_MAX);
return(0);
}
if (strcmp(lval, "topic") == 0) {
strncpy(topic, rval, LINE_MAX);
return(0);
}
if (strcmp(lval, "metadata.broker.list") == 0) {
strncpy(brokerlist, rval, LINE_MAX);
result = rd_kafka_conf_set(conf, lval, rval, errstr, LINE_MAX);
if (result != RD_KAFKA_CONF_OK)
return EINVAL;
return(0);
}
/* XXX: use the rdkakfka param "log_level" instead */
if (strcmp(lval, "mq.debug") == 0) {
if (strcmp(rval, "1") == 0
|| strcasecmp(rval, "true") == 0
|| strcasecmp(rval, "yes") == 0
|| strcasecmp(rval, "on") == 0)
loglvl = LOG_DEBUG;
else if (strcmp(rval, "0") != 0
&& strcasecmp(rval, "false") != 0
&& strcasecmp(rval, "no") != 0
&& strcasecmp(rval, "off") != 0)
return EINVAL;
return(0);
}
result = rd_kafka_topic_conf_set(topic_conf, lval, rval, errstr, LINE_MAX);
if (result == RD_KAFKA_CONF_UNKNOWN)
result = rd_kafka_conf_set(conf, lval, rval, errstr, LINE_MAX);
if (result != RD_KAFKA_CONF_OK)
return EINVAL;
else
return(0);
}
void
CONF_Dump(void)
{
MQ_LOG_Log(LOG_DEBUG, "mq.log = %s", logpath);
MQ_LOG_Log(LOG_DEBUG, "zookeeper.connect = %s", zookeeper);
MQ_LOG_Log(LOG_DEBUG, "zookeeper.timeout = %u", zoo_timeout);
MQ_LOG_Log(LOG_DEBUG, "zookeeper.log = %s", zoolog);
MQ_LOG_Log(LOG_DEBUG, "topic = %s", topic);
// leaving out mq.debug for now
}
......@@ -77,6 +77,8 @@ static void
struct timespec t;
unsigned interval = *((unsigned *) arg);
AN(interval);
/* Convert ms -> struct timespec */
t.tv_sec = (time_t) interval / 1e3;
t.tv_nsec = (interval % (unsigned) 1e3) * 1e6;
......
......@@ -60,18 +60,6 @@
#define SO_VERSION "unknown version"
#endif
static char logpath[PATH_MAX] = "";
static char zookeeper[LINE_MAX] = "";
static char brokerlist[LINE_MAX] = "";
static char zoolog[PATH_MAX] = "";
static unsigned zoo_timeout = 0;
static rd_kafka_topic_conf_t *topic_conf;
static rd_kafka_conf_t *conf;
static unsigned stats_interval = 0;
static char errmsg[LINE_MAX];
static char _version[LINE_MAX];
......@@ -105,105 +93,13 @@ toggle_debug(int sig)
}
}
static int
conf_add(const char *lval, const char *rval)
{
rd_kafka_conf_res_t result;
char errstr[LINE_MAX];
errstr[0] = '\0';
if (strcmp(lval, "mq.log") == 0) {
strncpy(logpath, rval, PATH_MAX);
return(0);
}
if (strcmp(lval, "zookeeper.connect") == 0) {
strncpy(zookeeper, rval, LINE_MAX);
return(0);
}
/* XXX: "zookeeper.connection.timeout.ms", to match Kafka config */
if (strcmp(lval, "zookeeper.timeout") == 0) {
char *endptr = NULL;
long val;
errno = 0;
val = strtoul(rval, &endptr, 10);
if (errno != 0)
return errno;
if (*endptr != '\0')
return EINVAL;
if (val > UINT_MAX)
return ERANGE;
zoo_timeout = val;
return(0);
}
if (strcmp(lval, "statistics.interval.ms") == 0) {
char *endptr = NULL;
long val;
errno = 0;
val = strtoul(rval, &endptr, 10);
if (errno != 0)
return errno;
if (*endptr != '\0')
return EINVAL;
if (val > UINT_MAX)
return ERANGE;
stats_interval = val;
result = rd_kafka_conf_set(conf, lval, rval, errstr, LINE_MAX);
if (result != RD_KAFKA_CONF_OK)
return EINVAL;
return(0);
}
if (strcmp(lval, "zookeeper.log") == 0) {
strncpy(zoolog, rval, PATH_MAX);
return(0);
}
if (strcmp(lval, "topic") == 0) {
strncpy(topic, rval, LINE_MAX);
return(0);
}
if (strcmp(lval, "metadata.broker.list") == 0) {
strncpy(brokerlist, rval, LINE_MAX);
result = rd_kafka_conf_set(conf, lval, rval, errstr, LINE_MAX);
if (result != RD_KAFKA_CONF_OK)
return EINVAL;
return(0);
}
/* XXX: use the rdkakfka param "log_level" instead */
if (strcmp(lval, "mq.debug") == 0) {
if (strcmp(rval, "1") == 0
|| strcasecmp(rval, "true") == 0
|| strcasecmp(rval, "yes") == 0
|| strcasecmp(rval, "on") == 0)
loglvl = LOG_DEBUG;
else if (strcmp(rval, "0") != 0
&& strcasecmp(rval, "false") != 0
&& strcasecmp(rval, "no") != 0
&& strcasecmp(rval, "off") != 0)
return EINVAL;
return(0);
}
result = rd_kafka_topic_conf_set(topic_conf, lval, rval, errstr, LINE_MAX);
if (result == RD_KAFKA_CONF_UNKNOWN)
result = rd_kafka_conf_set(conf, lval, rval, errstr, LINE_MAX);
if (result != RD_KAFKA_CONF_OK)
return EINVAL;
else
return(0);
}
const char *
MQ_GlobalInit(unsigned nworkers, const char *config_fname)
{
CONF_Init();
nwrk = nworkers;
conf = rd_kafka_conf_new();
topic_conf = rd_kafka_topic_conf_new();
loglvl = LOG_INFO;
topic[0] = '\0';
if (CONF_ReadFile(config_fname, conf_add) != 0)
if (CONF_ReadFile(config_fname, CONF_Add) != 0)
return "Error reading config file for Kafka";
if (logpath[0] != '\0') {
......@@ -255,7 +151,7 @@ MQ_GlobalInit(unsigned nworkers, const char *config_fname)
}
if (zoolog[0] != '\0') {
const char *err = MQ_ZOO_SetLog(zoolog);
const char *err = MQ_ZOO_OpenLog();
if (err != NULL) {
snprintf(errmsg, LINE_MAX, "Cannot open zookeeper.log %s: %s",
zoolog, err);
......@@ -285,8 +181,7 @@ MQ_GlobalInit(unsigned nworkers, const char *config_fname)
const char **cfg;
/* Dump config */
MQ_LOG_Log(LOG_DEBUG, "zookeeper.connect = %s", zookeeper);
MQ_LOG_Log(LOG_DEBUG, "topic = %s", topic);
CONF_Dump();
cfg = rd_kafka_conf_dump(conf, &cfglen);
if (cfg != NULL && cfglen > 0)
for (int i = 0; i < cfglen >> 1; i++) {
......@@ -318,8 +213,7 @@ MQ_InitConnections(void)
char zbrokerlist[LINE_MAX];
const char *err;
if ((err = MQ_ZOO_Init(zookeeper, zoo_timeout, zbrokerlist, LINE_MAX))
!= NULL) {
if ((err = MQ_ZOO_Init(zbrokerlist, LINE_MAX)) != NULL) {
snprintf(errmsg, LINE_MAX,
"Failed to init/connect to zookeeper [%s]: %s",
zookeeper, err);
......@@ -353,7 +247,7 @@ MQ_InitConnections(void)
}
for (int i = 0; i < nwrk; i++) {
const char *err = WRK_Init(i, conf, topic_conf);
const char *err = WRK_Init(i);
if (err != NULL)
return err;
}
......@@ -459,7 +353,7 @@ MQ_Reconnect(void **priv)
assert(wrk_num >= 0 && wrk_num < nwrk);
WRK_Fini(wrk);
err = WRK_Init(wrk_num, conf, topic_conf);
err = WRK_Init(wrk_num);
if (err != NULL)
return err;
*priv = workers[wrk_num];
......
......@@ -56,9 +56,18 @@ typedef struct kafka_wrk {
kafka_wrk_t **workers;
unsigned nwrk;
/* configuration */
char topic[LINE_MAX];
int loglvl;
char logpath[PATH_MAX];
char zookeeper[LINE_MAX];
char brokerlist[LINE_MAX];
char zoolog[PATH_MAX];
unsigned zoo_timeout;
unsigned stats_interval;
rd_kafka_topic_conf_t *topic_conf;
rd_kafka_conf_t *conf;
/* log.c */
int MQ_LOG_Open(const char *path);
......@@ -71,15 +80,13 @@ int MQ_MON_Init(unsigned interval);
void MQ_MON_Fini(void);
/* zookeeper.c */
const char *MQ_ZOO_Init(char *zooservers, unsigned timeout, char *brokerlist,
int max);
const char *MQ_ZOO_SetLog(const char *path);
const char *MQ_ZOO_Init(char *brokers, int max);
const char *MQ_ZOO_OpenLog(void);
void MQ_ZOO_SetLogLevel(int level);
const char *MQ_ZOO_Fini(void);
/* worker.c */
const char *WRK_Init(int wrk_num, rd_kafka_conf_t *conf,
rd_kafka_topic_conf_t *topic_conf);
const char *WRK_Init(int wrk_num);
void WRK_AddBrokers(const char *brokers);
void WRK_Fini(kafka_wrk_t *wrk);
......@@ -92,3 +99,8 @@ void CB_DeliveryReport(rd_kafka_t *rk, void *payload, size_t len,
rd_kafka_resp_err_t err, void *opaque, void *msg_opaque);
void CB_Error(rd_kafka_t *rk, int err, const char *reason, void *opaque);
int CB_Stats(rd_kafka_t *rk, char *json, size_t json_len, void *opaque);
/* config.c */
void CONF_Init(void);
int CONF_Add(const char *lval, const char *rval);
void CONF_Dump(void);
......@@ -17,6 +17,7 @@ test_kafka_LDADD = \
../zookeeper.$(OBJEXT) \
../worker.$(OBJEXT) \
../callback.$(OBJEXT) \
../config.$(OBJEXT) \
${PTHREAD_LIBS} \
-lrdkafka -lz -lpthread -lrt -lzookeeper_mt -lpcre
......
......@@ -40,7 +40,7 @@
static char errmsg[LINE_MAX];
const char
*WRK_Init(int wrk_num, rd_kafka_conf_t *conf, rd_kafka_topic_conf_t *topic_conf)
*WRK_Init(int wrk_num)
{
char clientid[sizeof("libtrackrdr-kafka-worker-2147483648")];
rd_kafka_conf_t *wrk_conf;
......
......@@ -57,16 +57,16 @@ static char errmsg[LINE_MAX];
static FILE *zoologf = NULL;
static const char
*setBrokerList(char *brokerlist, int max)
*setBrokerList(char *brokers, int max)
{
struct String_vector brokers;
struct String_vector broker_ids;
int result;
char *brokerptr = brokerlist;
char *brokerptr = brokers;
const char *pcre_err;
AN(zh);
if ((result = zoo_get_children(zh, BROKER_PATH, 1, &brokers)) != ZOK) {
if ((result = zoo_get_children(zh, BROKER_PATH, 1, &broker_ids)) != ZOK) {
snprintf(errmsg, LINE_MAX, "Cannot get broker ids from zookeeper: %s",
zerror(result));
return errmsg;
......@@ -83,16 +83,16 @@ static const char
AN(port_regex);
}
memset(brokerlist, 0, max);
for (int i = 0; i < brokers.count; i++) {
memset(brokers, 0, max);
for (int i = 0; i < broker_ids.count; i++) {
char path[PATH_MAX], broker[LINE_MAX];
int len = LINE_MAX;
snprintf(path, PATH_MAX, "/brokers/ids/%s", brokers.data[i]);
snprintf(path, PATH_MAX, "/brokers/ids/%s", broker_ids.data[i]);
if ((result = zoo_get(zh, path, 0, broker, &len, NULL)) != ZOK) {
snprintf(errmsg, LINE_MAX,
"Cannot get config for broker id %s: %s",
brokers.data[i], zerror(result));
broker_ids.data[i], zerror(result));
return errmsg;
}
if (len > 0) {
......@@ -101,13 +101,13 @@ static const char
broker[len] = '\0';
MQ_LOG_Log(LOG_DEBUG, "Zookeeper broker id %s config: %s",
brokers.data[i], broker);
broker_ids.data[i], broker);
r = pcre_exec(host_regex, NULL, broker, len, 0, 0, ovector, 6);
if (r <= PCRE_ERROR_NOMATCH) {
snprintf(errmsg, LINE_MAX,
"Host not found in config for broker id %s [%s]",
brokers.data[i], broker);
broker_ids.data[i], broker);
return errmsg;
}
pcre_get_substring(broker, ovector, r, 1, &host);
......@@ -116,30 +116,30 @@ static const char
if (r <= PCRE_ERROR_NOMATCH) {
snprintf(errmsg, LINE_MAX,
"Port not found in config for broker id %s [%s]",
brokers.data[i], broker);
broker_ids.data[i], broker);
return errmsg;
}
pcre_get_substring(broker, ovector, r, 1, &port);
AN(port);
if (strlen(brokerlist) + strlen(host) + strlen(port) + 2 > max) {
if (strlen(brokers) + strlen(host) + strlen(port) + 2 > max) {
snprintf(errmsg, LINE_MAX,
"Broker list length exceeds max %d [%s%s:%s]",
max, brokerlist, host, port);
max, brokers, host, port);
return errmsg;
}
sprintf(brokerptr, "%s:%s", host, port);
pcre_free_substring(host);
pcre_free_substring(port);
brokerptr += strlen(brokerptr);
if (i < brokers.count - 1)
if (i < broker_ids.count - 1)
*brokerptr++ = ',';
}
else
MQ_LOG_Log(LOG_WARNING, "Empty config returned for broker id %s",
brokers.data[i]);
broker_ids.data[i]);
}
deallocate_String_vector(&brokers);
deallocate_String_vector(&broker_ids);
return NULL;
}
......@@ -167,25 +167,27 @@ watcher(zhandle_t *zzh, int type, int state, const char *path, void *watcherCtx)
}
const char
*MQ_ZOO_Init(char *zooservers, unsigned timeout, char *brokerlist, int max)
*MQ_ZOO_Init(char *brokers, int max)
{
AN(zookeeper[0]);
/* XXX: wait for ZOO_CONNECTED_STATE */
errno = 0;
zh = zookeeper_init(zooservers, watcher, timeout, 0, 0, 0);
zh = zookeeper_init(zookeeper, watcher, zoo_timeout, 0, 0, 0);
if (zh == NULL) {
snprintf(errmsg, LINE_MAX, "init/connect failure: %s", strerror(errno));
return errmsg;
}
return setBrokerList(brokerlist, max);
return setBrokerList(brokers, max);
}
const char
*MQ_ZOO_SetLog(const char *path)
*MQ_ZOO_OpenLog(void)
{
AN(path);
AN(path[0]);
AN(zoolog);
AN(zoolog[0]);
zoologf = fopen(path, "a");
zoologf = fopen(zoolog, "a");
if (zoologf == NULL) {
strncpy(errmsg, strerror(errno), LINE_MAX);
return errmsg;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment