Commit fde11b39 authored by Geoff Simmons's avatar Geoff Simmons

MQ plugin for Kafka uses pcre instead of yajl for JSON parsing

parent e1d0dac9
......@@ -17,7 +17,7 @@ libtrackrdr_kafka_la_SOURCES = \
libtrackrdr_kafka_la_LIBADD = \
${PTHREAD_LIBS} \
-lrdkafka -lz -lpthread -lrt -lzookeeper_mt -lyajl
-lrdkafka -lz -lpthread -lrt -lzookeeper_mt -lpcre
libtrackrdr_kafka_la_LDFLAGS = -version-info ${CURRENT}:${REVISION}:${AGE}
......
......@@ -9,7 +9,7 @@ Kafka implementation of the MQ interface for the Tracking Log Reader
--------------------------------------------------------------------
:Author: Geoffrey Simmons
:Date: 2014-05-28
:Date: 2014-05-31
:Version: 3.0
:Manual section: 3
......@@ -37,10 +37,10 @@ is specified as ``mq.config_fname`` in the configuration of
* ``rdkafka``, a client library for Kafka
* ``zookeeper_mt``, a client library for Apache ZooKeeper with
multi-threading
* ``yajl``, a library for JSON parsing
* ``pcre``, a regular expression library (used for JSON parsing)
The dynamic linker must also be able to find ``librdkafka.so``,
``libzookeeper_mt.so`` and ``libyajl.so`` at runtime.
``libzookeeper_mt.so`` and ``libpcre.so`` at runtime.
BUILD/INSTALL
=============
......@@ -55,7 +55,7 @@ The sources for the library dependencies can be obtained from:
* https://github.com/edenhill/librdkafka
* http://zookeeper.apache.org/
* http://lloyd.github.io/yajl/
* http://www.pcre.org/
Building and installing the library dependencies
------------------------------------------------
......@@ -64,13 +64,16 @@ The Kafka interface has been tested with these library versions:
* rdkafka 0.8.3
* zookeeper_mt 3.4.6
* yajl 2.0.4
* pcre 8.30 2012-02-04
If the libraries are already installed on the platform where
``trackrdrd`` will run, then no further action is necessary. To build
the libraries from source, it suffices to follow the instructions in
the source distributions -- no special configuration for the plugin is
necessary.
``trackrdrd`` will run, then no further action is necessary. This is
almost certainly the case for the pcre library, since it is a
requirement for Varnish.
To build the libraries from source, it suffices to follow the
instructions in the source distributions -- no special configuration
for the plugin is necessary.
Building and installing libtrackrdr-kafka
-----------------------------------------
......
......@@ -40,8 +40,7 @@
#include <zookeeper/zookeeper.h>
#include <zookeeper/zookeeper_version.h>
#include <yajl/yajl_tree.h>
#include <yajl/yajl_version.h>
#include <pcre.h>
#ifdef HAVE_CONFIG_H
#include "../config.h"
......@@ -372,9 +371,9 @@ MQ_GlobalInit(unsigned nworkers, const char *config_fname)
}
snprintf(_version, LINE_MAX,
"libtrackrdr-kafka %s, rdkafka %s, zookeeper %d.%d.%d, "
"yajl %d.%d.%d", SO_VERSION, rd_kafka_version_str(),
"pcre %s", SO_VERSION, rd_kafka_version_str(),
ZOO_MAJOR_VERSION, ZOO_MINOR_VERSION, ZOO_PATCH_VERSION,
YAJL_MAJOR, YAJL_MINOR, YAJL_MICRO);
pcre_version());
MQ_LOG_Log(LOG_INFO, "initializing (%s)", _version);
if (zookeeper[0] == '\0' && brokerlist[0] == '\0') {
......@@ -456,9 +455,11 @@ MQ_InitConnections(void)
if (zookeeper[0] != '\0') {
struct String_vector brokers;
int zresult;
int result;
char zbrokerlist[LINE_MAX];
char *brokerptr = zbrokerlist;
const char *pcre_err;
pcre *host_regex, *port_regex;
/* XXX: set a watcher function; wait for ZOO_CONNECTED_STATE */
errno = 0;
......@@ -470,71 +471,72 @@ MQ_InitConnections(void)
return errmsg;
}
/* XXX: set watch param to non-zero for watcher callback */
if ((zresult = zoo_get_children(zh, "/brokers/ids", 0, &brokers))
if ((result = zoo_get_children(zh, "/brokers/ids", 0, &brokers))
!= ZOK) {
snprintf(errmsg, LINE_MAX,
"Cannot get broker ids from zookeeper: %s",
zerror(zresult));
zerror(result));
MQ_LOG_Log(LOG_ERR, errmsg);
return errmsg;
}
/* XXX: cache compiled pcre regexen for use with the watcher
* callback */
host_regex = pcre_compile("\"host\"\\s*:\\s*\"([^\"]+)\"", 0,
&pcre_err, &result, NULL);
AN(host_regex);
port_regex = pcre_compile("\"port\"\\s*:\\s*(\\d+)", 0,
&pcre_err, &result, NULL);
AN(port_regex);
memset(zbrokerlist, 0, LINE_MAX);
for (int i = 0; i < brokers.count; i++) {
char path[PATH_MAX], broker[LINE_MAX];
int len = LINE_MAX;
snprintf(path, PATH_MAX, "/brokers/ids/%s", brokers.data[i]);
/* XXX: set up a watcher? */
if ((zresult = zoo_get(zh, path, 0, broker, &len, NULL)) != ZOK) {
/* XXX: set up a watcher */
if ((result = zoo_get(zh, path, 0, broker, &len, NULL)) != ZOK) {
snprintf(errmsg, LINE_MAX,
"Cannot get config for broker id %s from zookeeper: %s",
brokers.data[i], zerror(zresult));
brokers.data[i], zerror(result));
MQ_LOG_Log(LOG_ERR, errmsg);
return errmsg;
}
if (len > 0) {
char jsonerr[LINE_MAX];
const char *hostpath[] = { "host", NULL };
const char *portpath[] = { "port", NULL };
yajl_val json, hostval, portval;
int ovector[6], r;
const char *host = NULL, *port = NULL;
broker[len] = '\0';
MQ_LOG_Log(LOG_DEBUG, "Zookeeper %s broker id %s config: %s",
zookeeper, brokers.data[i], broker);
json = yajl_tree_parse((const char *) broker, jsonerr,
LINE_MAX);
if (json == NULL) {
snprintf(errmsg, LINE_MAX,
"Cannot parse config for broker id %s from "
"zookeeper [%s]: %s", brokers.data[i], broker,
strlen(jsonerr) > 0 ? jsonerr : "unknown error");
MQ_LOG_Log(LOG_ERR, errmsg);
return errmsg;
}
hostval = yajl_tree_get(json, hostpath, yajl_t_string);
if (!hostval) {
r = pcre_exec(host_regex, NULL, broker, len, 0, 0, ovector, 6);
if (r <= PCRE_ERROR_NOMATCH) {
snprintf(errmsg, LINE_MAX,
"Host not found in config for broker id %s from "
"zookeeper [%s]", brokers.data[i], broker);
MQ_LOG_Log(LOG_ERR, errmsg);
return errmsg;
}
portval = yajl_tree_get(json, portpath, yajl_t_number);
if (!portval || !YAJL_IS_INTEGER(portval)) {
pcre_get_substring(broker, ovector, r, 1, &host);
AN(host);
r = pcre_exec(port_regex, NULL, broker, len, 0, 0, ovector, 6);
if (r <= PCRE_ERROR_NOMATCH) {
snprintf(errmsg, LINE_MAX,
"Port not found in config for broker id %s from "
"zookeeper [%s]", brokers.data[i], broker);
MQ_LOG_Log(LOG_ERR, errmsg);
return errmsg;
}
sprintf(brokerptr, "%s:%lld", YAJL_GET_STRING(hostval),
YAJL_GET_INTEGER(portval));
pcre_get_substring(broker, ovector, r, 1, &port);
AN(port);
sprintf(brokerptr, "%s:%s", host, port);
pcre_free_substring(host);
pcre_free_substring(port);
brokerptr += strlen(brokerptr);
if (i < brokers.count)
*brokerptr++ = ',';
yajl_tree_free(json);
}
else
MQ_LOG_Log(LOG_WARNING, "Empty config returned from zookeeper "
......
......@@ -15,6 +15,6 @@ test_kafka_LDADD = \
../log.$(OBJEXT) \
../monitor.$(OBJEXT) \
${PTHREAD_LIBS} \
-lrdkafka -lz -lpthread -lrt -lzookeeper_mt -lyajl
-lrdkafka -lz -lpthread -lrt -lzookeeper_mt -lpcre
CLEANFILES = kafka.log zoo.log *~
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment