Commit 05b21bc1 authored by Geoff Simmons's avatar Geoff Simmons

MQ plugin for Kafka: encapsulate zookeeper code in zookeeper.c

parent 02e288a2
INCLUDES = -I$(VARNISHSRC)/include -I$(VARNISHSRC) -I$(top_srcdir)/include
INCLUDES = -I$(top_srcdir)/include
CURRENT = 3
REVISION = 0
......@@ -13,6 +13,7 @@ libtrackrdr_kafka_la_SOURCES = \
mq.c \
log.c \
monitor.c \
zookeeper.c \
$(top_srcdir)/src/config_common.c
libtrackrdr_kafka_la_LIBADD = \
......
/*-
* Copyright (c) 2012-2014 UPLEX Nils Goroll Systemoptimierung
* Copyright (c) 2012-2014 Otto Gmbh & Co KG
* Copyright (c) 2014 UPLEX Nils Goroll Systemoptimierung
* Copyright (c) 2014 Otto Gmbh & Co KG
* All rights reserved
* Use only with permission
*
......@@ -34,12 +34,10 @@
#include <errno.h>
#include <string.h>
#include <strings.h>
#include <limits.h>
#include <syslog.h>
#include <ctype.h>
#include <signal.h>
#include <zookeeper/zookeeper.h>
#include <zookeeper/zookeeper_version.h>
#include <pcre.h>
......@@ -68,8 +66,6 @@ static char logpath[PATH_MAX] = "";
static char zookeeper[LINE_MAX] = "";
static char brokerlist[LINE_MAX] = "";
static char zoolog[PATH_MAX] = "";
static FILE *zoologf;
static zhandle_t *zh;
static unsigned zoo_timeout = 0;
static char topic[LINE_MAX] = "";
......@@ -96,16 +92,15 @@ toggle_debug(int sig)
loglvl = saved_lvl;
debug_toggle = 0;
MQ_LOG_Log(LOG_INFO, "Debug toggle switched off");
zoo_set_debug_level(ZOO_LOG_LEVEL_INFO);
}
else {
saved_lvl = loglvl;
loglvl = LOG_DEBUG;
debug_toggle = 1;
MQ_LOG_Log(LOG_INFO, "Debug toggle switched on");
zoo_set_debug_level(ZOO_LOG_LEVEL_DEBUG);
}
MQ_LOG_SetLevel(loglvl);
MQ_ZOO_SetLogLevel(loglvl);
for (int i = 0; i < nwrk; i++)
if (workers[i] != NULL) {
CHECK_OBJ(workers[i], KAFKA_WRK_MAGIC);
......@@ -440,14 +435,13 @@ MQ_GlobalInit(unsigned nworkers, const char *config_fname)
}
if (zoolog[0] != '\0') {
zoologf = fopen(zoolog, "a");
if (zoologf == NULL) {
const char *err = MQ_ZOO_SetLog(zoolog);
if (err != NULL) {
snprintf(errmsg, LINE_MAX, "Cannot open zookeeper.log %s: %s",
zoolog, strerror(errno));
zoolog, err);
MQ_LOG_Log(LOG_ERR, errmsg);
return errmsg;
}
zoo_set_log_stream(zoologf);
}
if (stats_interval != 0) {
......@@ -487,7 +481,7 @@ MQ_GlobalInit(unsigned nworkers, const char *config_fname)
MQ_LOG_Log(LOG_DEBUG, "%s = %s", cfg[2*i], cfg[2*i + 1]);
rd_kafka_conf_dump_free(cfg, cfglen);
zoo_set_debug_level(ZOO_LOG_LEVEL_DEBUG);
MQ_ZOO_SetLogLevel(LOG_DEBUG);
}
return NULL;
......@@ -501,95 +495,17 @@ MQ_InitConnections(void)
assert(zookeeper[0] != '\0' || brokerlist[0] != '\0');
if (zookeeper[0] != '\0') {
struct String_vector brokers;
int result;
char zbrokerlist[LINE_MAX];
char *brokerptr = zbrokerlist;
const char *pcre_err;
pcre *host_regex, *port_regex;
const char *err;
/* XXX: set a watcher function; wait for ZOO_CONNECTED_STATE */
errno = 0;
zh = zookeeper_init(zookeeper, NULL, zoo_timeout, 0, 0, 0);
if (zh == NULL) {
snprintf(errmsg, LINE_MAX, "Zookeeper init/connect failure: %s",
strerror(errno));
MQ_LOG_Log(LOG_ERR, errmsg);
return errmsg;
}
/* XXX: set watch param to non-zero for watcher callback */
if ((result = zoo_get_children(zh, "/brokers/ids", 0, &brokers))
!= ZOK) {
if ((err = MQ_ZOO_Init(zookeeper, zoo_timeout, zbrokerlist, LINE_MAX))
!= NULL) {
snprintf(errmsg, LINE_MAX,
"Cannot get broker ids from zookeeper: %s",
zerror(result));
"Failed to init/connect to zookeeper [%s]: %s",
zookeeper, err);
MQ_LOG_Log(LOG_ERR, errmsg);
return errmsg;
}
/* XXX: cache compiled pcre regexen for use with the watcher
* callback */
host_regex = pcre_compile("\"host\"\\s*:\\s*\"([^\"]+)\"", 0,
&pcre_err, &result, NULL);
AN(host_regex);
port_regex = pcre_compile("\"port\"\\s*:\\s*(\\d+)", 0,
&pcre_err, &result, NULL);
AN(port_regex);
memset(zbrokerlist, 0, LINE_MAX);
for (int i = 0; i < brokers.count; i++) {
char path[PATH_MAX], broker[LINE_MAX];
int len = LINE_MAX;
snprintf(path, PATH_MAX, "/brokers/ids/%s", brokers.data[i]);
/* XXX: set up a watcher */
if ((result = zoo_get(zh, path, 0, broker, &len, NULL)) != ZOK) {
snprintf(errmsg, LINE_MAX,
"Cannot get config for broker id %s from zookeeper: %s",
brokers.data[i], zerror(result));
MQ_LOG_Log(LOG_ERR, errmsg);
return errmsg;
}
if (len > 0) {
int ovector[6], r;
const char *host = NULL, *port = NULL;
broker[len] = '\0';
MQ_LOG_Log(LOG_DEBUG, "Zookeeper %s broker id %s config: %s",
zookeeper, brokers.data[i], broker);
r = pcre_exec(host_regex, NULL, broker, len, 0, 0, ovector, 6);
if (r <= PCRE_ERROR_NOMATCH) {
snprintf(errmsg, LINE_MAX,
"Host not found in config for broker id %s from "
"zookeeper [%s]", brokers.data[i], broker);
MQ_LOG_Log(LOG_ERR, errmsg);
return errmsg;
}
pcre_get_substring(broker, ovector, r, 1, &host);
AN(host);
r = pcre_exec(port_regex, NULL, broker, len, 0, 0, ovector, 6);
if (r <= PCRE_ERROR_NOMATCH) {
snprintf(errmsg, LINE_MAX,
"Port not found in config for broker id %s from "
"zookeeper [%s]", brokers.data[i], broker);
MQ_LOG_Log(LOG_ERR, errmsg);
return errmsg;
}
pcre_get_substring(broker, ovector, r, 1, &port);
AN(port);
sprintf(brokerptr, "%s:%s", host, port);
pcre_free_substring(host);
pcre_free_substring(port);
brokerptr += strlen(brokerptr);
if (i < brokers.count)
*brokerptr++ = ',';
}
else
MQ_LOG_Log(LOG_WARNING, "Empty config returned from zookeeper "
"for broker id %s", brokers.data[i]);
}
deallocate_String_vector(&brokers);
if (zbrokerlist[0] == '\0')
if (brokerlist[0] == '\0') {
snprintf(errmsg, LINE_MAX,
......@@ -778,7 +694,7 @@ MQ_WorkerShutdown(void **priv)
const char *
MQ_GlobalShutdown(void)
{
int zerr;
const char *err = NULL;
MQ_MON_Fini();
for (int i = 0; i < nwrk; i++)
......@@ -789,20 +705,15 @@ MQ_GlobalShutdown(void)
rd_kafka_conf_destroy(conf);
rd_kafka_topic_conf_destroy(topic_conf);
errno = 0;
if ((zerr = zookeeper_close(zh)) != ZOK) {
const char *err = zerror(zerr);
if (zerr == ZSYSTEMERROR)
snprintf(errmsg, LINE_MAX, "Error closing zookeeper: %s (%s)", err,
strerror(errno));
else
snprintf(errmsg, LINE_MAX, "Error closing zookeeper: %s", err);
err = MQ_ZOO_Fini();
if (err != NULL) {
snprintf(errmsg, LINE_MAX, "Error closing zookeeper: %s", err);
MQ_LOG_Log(LOG_ERR, errmsg);
return errmsg;
}
fclose(zoologf);
MQ_LOG_Log(LOG_INFO, "shutting down");
MQ_LOG_Close();
if (err != NULL)
return errmsg;
return NULL;
}
......@@ -63,3 +63,10 @@ void MQ_LOG_Close(void);
/* monitor.c */
int MQ_MON_Init(unsigned interval);
void MQ_MON_Fini(void);
/* zookeeper.c */
const char *MQ_ZOO_Init(char *zooservers, unsigned timeout, char *brokerlist,
int max);
const char *MQ_ZOO_SetLog(const char *path);
void MQ_ZOO_SetLogLevel(int level);
const char *MQ_ZOO_Fini(void);
......@@ -14,6 +14,7 @@ test_kafka_LDADD = \
../mq.$(OBJEXT) \
../log.$(OBJEXT) \
../monitor.$(OBJEXT) \
../zookeeper.$(OBJEXT) \
${PTHREAD_LIBS} \
-lrdkafka -lz -lpthread -lrt -lzookeeper_mt -lpcre
......
/*-
* Copyright (c) 2014 UPLEX Nils Goroll Systemoptimierung
* Copyright (c) 2014 Otto Gmbh & Co KG
* All rights reserved
* Use only with permission
*
* Author: Geoffrey Simmons <geoffrey.simmons@uplex.de>
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL AUTHOR OR CONTRIBUTORS BE LIABLE
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
* OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
* OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
*
*/
/*
* Encapsulate interaction of the Kafka MQ plugin with Apache ZooKeeper
* servers
*/
#include <stdio.h>
#include <errno.h>
#include <string.h>
#include <syslog.h>
#include <zookeeper/zookeeper.h>
#include <pcre.h>
#include "mq_kafka.h"
static zhandle_t *zh = NULL;
static pcre *host_regex = NULL, *port_regex = NULL;
static char errmsg[LINE_MAX];
static FILE *zoologf = NULL;
static const char
*setBrokerList(char *brokerlist, int max)
{
struct String_vector brokers;
int result;
char *brokerptr = brokerlist;
const char *pcre_err;
AN(zh);
/* XXX: set watch param to non-zero for watcher callback */
if ((result = zoo_get_children(zh, "/brokers/ids", 0, &brokers)) != ZOK) {
snprintf(errmsg, LINE_MAX, "Cannot get broker ids from zookeeper: %s",
zerror(result));
return errmsg;
}
if (host_regex == NULL) {
host_regex = pcre_compile("\"host\"\\s*:\\s*\"([^\"]+)\"", 0,
&pcre_err, &result, NULL);
AN(host_regex);
}
if (port_regex == NULL) {
port_regex = pcre_compile("\"port\"\\s*:\\s*(\\d+)", 0,
&pcre_err, &result, NULL);
AN(port_regex);
}
memset(brokerlist, 0, max);
for (int i = 0; i < brokers.count; i++) {
char path[PATH_MAX], broker[LINE_MAX];
int len = LINE_MAX;
snprintf(path, PATH_MAX, "/brokers/ids/%s", brokers.data[i]);
/* XXX: set up a watcher */
if ((result = zoo_get(zh, path, 0, broker, &len, NULL)) != ZOK) {
snprintf(errmsg, LINE_MAX,
"Cannot get config for broker id %s: %s",
brokers.data[i], zerror(result));
return errmsg;
}
if (len > 0) {
int ovector[6], r;
const char *host = NULL, *port = NULL;
broker[len] = '\0';
MQ_LOG_Log(LOG_DEBUG, "Zookeeper broker id %s config: %s",
brokers.data[i], broker);
r = pcre_exec(host_regex, NULL, broker, len, 0, 0, ovector, 6);
if (r <= PCRE_ERROR_NOMATCH) {
snprintf(errmsg, LINE_MAX,
"Host not found in config for broker id %s [%s]",
brokers.data[i], broker);
return errmsg;
}
pcre_get_substring(broker, ovector, r, 1, &host);
AN(host);
r = pcre_exec(port_regex, NULL, broker, len, 0, 0, ovector, 6);
if (r <= PCRE_ERROR_NOMATCH) {
snprintf(errmsg, LINE_MAX,
"Port not found in config for broker id %s [%s]",
brokers.data[i], broker);
return errmsg;
}
pcre_get_substring(broker, ovector, r, 1, &port);
AN(port);
if (strlen(brokerlist) + strlen(host) + strlen(port) + 2 > max) {
snprintf(errmsg, LINE_MAX,
"Broker list length exceeds max %d [%s%s:%s]",
max, brokerlist, host, port);
return errmsg;
}
sprintf(brokerptr, "%s:%s", host, port);
pcre_free_substring(host);
pcre_free_substring(port);
brokerptr += strlen(brokerptr);
if (i < brokers.count - 1)
*brokerptr++ = ',';
}
else
MQ_LOG_Log(LOG_WARNING, "Empty config returned for broker id %s",
brokers.data[i]);
}
deallocate_String_vector(&brokers);
return NULL;
}
const char
*MQ_ZOO_Init(char *zooservers, unsigned timeout, char *brokerlist, int max)
{
/* XXX: set a watcher function; wait for ZOO_CONNECTED_STATE */
errno = 0;
zh = zookeeper_init(zooservers, NULL, timeout, 0, 0, 0);
if (zh == NULL) {
snprintf(errmsg, LINE_MAX, "init/connect failure: %s", strerror(errno));
return errmsg;
}
return setBrokerList(brokerlist, max);
}
const char
*MQ_ZOO_SetLog(const char *path)
{
AN(path);
AN(path[0]);
zoologf = fopen(path, "a");
if (zoologf == NULL) {
strncpy(errmsg, strerror(errno), LINE_MAX);
return errmsg;
}
zoo_set_log_stream(zoologf);
return NULL;
}
void
MQ_ZOO_SetLogLevel(int level)
{
if (zh == NULL)
return;
if (zoologf != NULL)
/* level must be a syslog level */
switch(level) {
case LOG_INFO:
case LOG_NOTICE:
zoo_set_debug_level(ZOO_LOG_LEVEL_INFO);
break;
case LOG_DEBUG:
zoo_set_debug_level(ZOO_LOG_LEVEL_DEBUG);
break;
case LOG_WARNING:
zoo_set_debug_level(ZOO_LOG_LEVEL_WARN);
break;
case LOG_ERR:
case LOG_CRIT:
case LOG_ALERT:
case LOG_EMERG:
zoo_set_debug_level(ZOO_LOG_LEVEL_ERROR);
break;
default:
MQ_LOG_Log(LOG_ERR, "Unknown log level %d", level);
AN(0);
break;
}
}
const char
*MQ_ZOO_Fini(void)
{
int zerr;
if (zh == NULL)
return NULL;
errno = 0;
if ((zerr = zookeeper_close(zh)) != ZOK) {
const char *err = zerror(zerr);
if (zerr == ZSYSTEMERROR)
snprintf(errmsg, LINE_MAX, "%s (%s)", err, strerror(errno));
else
strncpy(errmsg, err, LINE_MAX);
return errmsg;
}
if (zoologf != NULL)
fclose(zoologf);
return NULL;
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment