Commit 9af1fa30 authored by Geoff Simmons's avatar Geoff Simmons

MQ plugin for Kafka: add a zookeeper watcher for notifications about

changes in the list of brokers
parent 98c25ef6
......@@ -6,6 +6,10 @@
*
* Author: Geoffrey Simmons <geoffrey.simmons@uplex.de>
*
* Portions adapted from rdkafka_zookeeper_example.c from librdkafka
* https://github.com/edenhill/librdkafka
* Copyright (c) 2012, Magnus Edenhill
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
......@@ -43,6 +47,9 @@
#include <pcre.h>
#include "mq_kafka.h"
#include "miniobj.h"
#define BROKER_PATH "/brokers/ids"
static zhandle_t *zh = NULL;
static pcre *host_regex = NULL, *port_regex = NULL;
......@@ -59,8 +66,7 @@ static const char
AN(zh);
/* XXX: set watch param to non-zero for watcher callback */
if ((result = zoo_get_children(zh, "/brokers/ids", 0, &brokers)) != ZOK) {
if ((result = zoo_get_children(zh, BROKER_PATH, 1, &brokers)) != ZOK) {
snprintf(errmsg, LINE_MAX, "Cannot get broker ids from zookeeper: %s",
zerror(result));
return errmsg;
......@@ -83,7 +89,6 @@ static const char
int len = LINE_MAX;
snprintf(path, PATH_MAX, "/brokers/ids/%s", brokers.data[i]);
/* XXX: set up a watcher */
if ((result = zoo_get(zh, path, 0, broker, &len, NULL)) != ZOK) {
snprintf(errmsg, LINE_MAX,
"Cannot get config for broker id %s: %s",
......@@ -138,12 +143,47 @@ static const char
return NULL;
}
/* cf. rdkafka_zookeeper_example.c */
static void
watcher(zhandle_t *zzh, int type, int state, const char *path, void *watcherCtx)
{
char brokers[LINE_MAX] = "";
(void) state;
(void) watcherCtx;
assert(zzh == zh);
if (type == ZOO_CHILD_EVENT
&& strncmp(path, BROKER_PATH, sizeof(BROKER_PATH) - 1) == 0) {
const char *err = setBrokerList(brokers, LINE_MAX);
if (err != NULL) {
MQ_LOG_Log(LOG_ERR, "Error obtaining broker list from watcher: %s",
err);
return;
}
if (brokers[0] != '\0')
/* XXX: encapsulate */
for (int i = 0; i < nwrk; i++)
if (workers[i] != NULL) {
int nbrokers;
CHECK_OBJ(workers[i], KAFKA_WRK_MAGIC);
nbrokers = rd_kafka_brokers_add(workers[i]->kafka, brokers);
/* XXX: poll timeout configurable? */
rd_kafka_poll(workers[i]->kafka, 10);
MQ_LOG_Log(LOG_INFO, "%s: added %d brokers [%s]",
rd_kafka_name(workers[i]->kafka), nbrokers,
brokers);
}
}
}
const char
*MQ_ZOO_Init(char *zooservers, unsigned timeout, char *brokerlist, int max)
{
/* XXX: set a watcher function; wait for ZOO_CONNECTED_STATE */
/* XXX: wait for ZOO_CONNECTED_STATE */
errno = 0;
zh = zookeeper_init(zooservers, NULL, timeout, 0, 0, 0);
zh = zookeeper_init(zooservers, watcher, timeout, 0, 0, 0);
if (zh == NULL) {
snprintf(errmsg, LINE_MAX, "init/connect failure: %s", strerror(errno));
return errmsg;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment