Commit 5d78e446 authored by Geoff Simmons's avatar Geoff Simmons

MQ plugin for Kafka: add a zookeeper watcher for notifications about

changes in the list of brokers
parent e565f94f
...@@ -6,6 +6,10 @@ ...@@ -6,6 +6,10 @@
* *
* Author: Geoffrey Simmons <geoffrey.simmons@uplex.de> * Author: Geoffrey Simmons <geoffrey.simmons@uplex.de>
* *
* Portions adapted from rdkafka_zookeeper_example.c from librdkafka
* https://github.com/edenhill/librdkafka
* Copyright (c) 2012, Magnus Edenhill
*
* Redistribution and use in source and binary forms, with or without * Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions * modification, are permitted provided that the following conditions
* are met: * are met:
...@@ -43,6 +47,9 @@ ...@@ -43,6 +47,9 @@
#include <pcre.h> #include <pcre.h>
#include "mq_kafka.h" #include "mq_kafka.h"
#include "miniobj.h"
#define BROKER_PATH "/brokers/ids"
static zhandle_t *zh = NULL; static zhandle_t *zh = NULL;
static pcre *host_regex = NULL, *port_regex = NULL; static pcre *host_regex = NULL, *port_regex = NULL;
...@@ -59,8 +66,7 @@ static const char ...@@ -59,8 +66,7 @@ static const char
AN(zh); AN(zh);
/* XXX: set watch param to non-zero for watcher callback */ if ((result = zoo_get_children(zh, BROKER_PATH, 1, &brokers)) != ZOK) {
if ((result = zoo_get_children(zh, "/brokers/ids", 0, &brokers)) != ZOK) {
snprintf(errmsg, LINE_MAX, "Cannot get broker ids from zookeeper: %s", snprintf(errmsg, LINE_MAX, "Cannot get broker ids from zookeeper: %s",
zerror(result)); zerror(result));
return errmsg; return errmsg;
...@@ -83,7 +89,6 @@ static const char ...@@ -83,7 +89,6 @@ static const char
int len = LINE_MAX; int len = LINE_MAX;
snprintf(path, PATH_MAX, "/brokers/ids/%s", brokers.data[i]); snprintf(path, PATH_MAX, "/brokers/ids/%s", brokers.data[i]);
/* XXX: set up a watcher */
if ((result = zoo_get(zh, path, 0, broker, &len, NULL)) != ZOK) { if ((result = zoo_get(zh, path, 0, broker, &len, NULL)) != ZOK) {
snprintf(errmsg, LINE_MAX, snprintf(errmsg, LINE_MAX,
"Cannot get config for broker id %s: %s", "Cannot get config for broker id %s: %s",
...@@ -138,12 +143,47 @@ static const char ...@@ -138,12 +143,47 @@ static const char
return NULL; return NULL;
} }
/* cf. rdkafka_zookeeper_example.c */
static void
watcher(zhandle_t *zzh, int type, int state, const char *path, void *watcherCtx)
{
char brokers[LINE_MAX] = "";
(void) state;
(void) watcherCtx;
assert(zzh == zh);
if (type == ZOO_CHILD_EVENT
&& strncmp(path, BROKER_PATH, sizeof(BROKER_PATH) - 1) == 0) {
const char *err = setBrokerList(brokers, LINE_MAX);
if (err != NULL) {
MQ_LOG_Log(LOG_ERR, "Error obtaining broker list from watcher: %s",
err);
return;
}
if (brokers[0] != '\0')
/* XXX: encapsulate */
for (int i = 0; i < nwrk; i++)
if (workers[i] != NULL) {
int nbrokers;
CHECK_OBJ(workers[i], KAFKA_WRK_MAGIC);
nbrokers = rd_kafka_brokers_add(workers[i]->kafka, brokers);
/* XXX: poll timeout configurable? */
rd_kafka_poll(workers[i]->kafka, 10);
MQ_LOG_Log(LOG_INFO, "%s: added %d brokers [%s]",
rd_kafka_name(workers[i]->kafka), nbrokers,
brokers);
}
}
}
const char const char
*MQ_ZOO_Init(char *zooservers, unsigned timeout, char *brokerlist, int max) *MQ_ZOO_Init(char *zooservers, unsigned timeout, char *brokerlist, int max)
{ {
/* XXX: set a watcher function; wait for ZOO_CONNECTED_STATE */ /* XXX: wait for ZOO_CONNECTED_STATE */
errno = 0; errno = 0;
zh = zookeeper_init(zooservers, NULL, timeout, 0, 0, 0); zh = zookeeper_init(zooservers, watcher, timeout, 0, 0, 0);
if (zh == NULL) { if (zh == NULL) {
snprintf(errmsg, LINE_MAX, "init/connect failure: %s", strerror(errno)); snprintf(errmsg, LINE_MAX, "init/connect failure: %s", strerror(errno));
return errmsg; return errmsg;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment