Commit 09737052 authored by Geoff Simmons's avatar Geoff Simmons

MQ plugin for Kafka uses pcre instead of yajl for JSON parsing

parent ca8b5997
...@@ -17,7 +17,7 @@ libtrackrdr_kafka_la_SOURCES = \ ...@@ -17,7 +17,7 @@ libtrackrdr_kafka_la_SOURCES = \
libtrackrdr_kafka_la_LIBADD = \ libtrackrdr_kafka_la_LIBADD = \
${PTHREAD_LIBS} \ ${PTHREAD_LIBS} \
-lrdkafka -lz -lpthread -lrt -lzookeeper_mt -lyajl -lrdkafka -lz -lpthread -lrt -lzookeeper_mt -lpcre
libtrackrdr_kafka_la_LDFLAGS = -version-info ${CURRENT}:${REVISION}:${AGE} libtrackrdr_kafka_la_LDFLAGS = -version-info ${CURRENT}:${REVISION}:${AGE}
......
...@@ -9,7 +9,7 @@ Kafka implementation of the MQ interface for the Tracking Log Reader ...@@ -9,7 +9,7 @@ Kafka implementation of the MQ interface for the Tracking Log Reader
-------------------------------------------------------------------- --------------------------------------------------------------------
:Author: Geoffrey Simmons :Author: Geoffrey Simmons
:Date: 2014-05-28 :Date: 2014-05-31
:Version: 3.0 :Version: 3.0
:Manual section: 3 :Manual section: 3
...@@ -37,10 +37,10 @@ is specified as ``mq.config_fname`` in the configuration of ...@@ -37,10 +37,10 @@ is specified as ``mq.config_fname`` in the configuration of
* ``rdkafka``, a client library for Kafka * ``rdkafka``, a client library for Kafka
* ``zookeeper_mt``, a client library for Apache ZooKeeper with * ``zookeeper_mt``, a client library for Apache ZooKeeper with
multi-threading multi-threading
* ``yajl``, a library for JSON parsing * ``pcre``, a regular expression library (used for JSON parsing)
The dynamic linker must also be able to find ``librdkafka.so``, The dynamic linker must also be able to find ``librdkafka.so``,
``libzookeeper_mt.so`` and ``libyajl.so`` at runtime. ``libzookeeper_mt.so`` and ``libpcre.so`` at runtime.
BUILD/INSTALL BUILD/INSTALL
============= =============
...@@ -55,7 +55,7 @@ The sources for the library dependencies can be obtained from: ...@@ -55,7 +55,7 @@ The sources for the library dependencies can be obtained from:
* https://github.com/edenhill/librdkafka * https://github.com/edenhill/librdkafka
* http://zookeeper.apache.org/ * http://zookeeper.apache.org/
* http://lloyd.github.io/yajl/ * http://www.pcre.org/
Building and installing the library dependencies Building and installing the library dependencies
------------------------------------------------ ------------------------------------------------
...@@ -64,13 +64,16 @@ The Kafka interface has been tested with these library versions: ...@@ -64,13 +64,16 @@ The Kafka interface has been tested with these library versions:
* rdkafka 0.8.3 * rdkafka 0.8.3
* zookeeper_mt 3.4.6 * zookeeper_mt 3.4.6
* yajl 2.0.4 * pcre 8.30 2012-02-04
If the libraries are already installed on the platform where If the libraries are already installed on the platform where
``trackrdrd`` will run, then no further action is necessary. To build ``trackrdrd`` will run, then no further action is necessary. This is
the libraries from source, it suffices to follow the instructions in almost certainly the case for the pcre library, since it is a
the source distributions -- no special configuration for the plugin is requirement for Varnish.
necessary.
To build the libraries from source, it suffices to follow the
instructions in the source distributions -- no special configuration
for the plugin is necessary.
Building and installing libtrackrdr-kafka Building and installing libtrackrdr-kafka
----------------------------------------- -----------------------------------------
......
...@@ -40,8 +40,7 @@ ...@@ -40,8 +40,7 @@
#include <zookeeper/zookeeper.h> #include <zookeeper/zookeeper.h>
#include <zookeeper/zookeeper_version.h> #include <zookeeper/zookeeper_version.h>
#include <yajl/yajl_tree.h> #include <pcre.h>
#include <yajl/yajl_version.h>
#ifdef HAVE_CONFIG_H #ifdef HAVE_CONFIG_H
#include "../config.h" #include "../config.h"
...@@ -372,9 +371,9 @@ MQ_GlobalInit(unsigned nworkers, const char *config_fname) ...@@ -372,9 +371,9 @@ MQ_GlobalInit(unsigned nworkers, const char *config_fname)
} }
snprintf(_version, LINE_MAX, snprintf(_version, LINE_MAX,
"libtrackrdr-kafka %s, rdkafka %s, zookeeper %d.%d.%d, " "libtrackrdr-kafka %s, rdkafka %s, zookeeper %d.%d.%d, "
"yajl %d.%d.%d", SO_VERSION, rd_kafka_version_str(), "pcre %s", SO_VERSION, rd_kafka_version_str(),
ZOO_MAJOR_VERSION, ZOO_MINOR_VERSION, ZOO_PATCH_VERSION, ZOO_MAJOR_VERSION, ZOO_MINOR_VERSION, ZOO_PATCH_VERSION,
YAJL_MAJOR, YAJL_MINOR, YAJL_MICRO); pcre_version());
MQ_LOG_Log(LOG_INFO, "initializing (%s)", _version); MQ_LOG_Log(LOG_INFO, "initializing (%s)", _version);
if (zookeeper[0] == '\0' && brokerlist[0] == '\0') { if (zookeeper[0] == '\0' && brokerlist[0] == '\0') {
...@@ -456,9 +455,11 @@ MQ_InitConnections(void) ...@@ -456,9 +455,11 @@ MQ_InitConnections(void)
if (zookeeper[0] != '\0') { if (zookeeper[0] != '\0') {
struct String_vector brokers; struct String_vector brokers;
int zresult; int result;
char zbrokerlist[LINE_MAX]; char zbrokerlist[LINE_MAX];
char *brokerptr = zbrokerlist; char *brokerptr = zbrokerlist;
const char *pcre_err;
pcre *host_regex, *port_regex;
/* XXX: set a watcher function; wait for ZOO_CONNECTED_STATE */ /* XXX: set a watcher function; wait for ZOO_CONNECTED_STATE */
errno = 0; errno = 0;
...@@ -470,71 +471,72 @@ MQ_InitConnections(void) ...@@ -470,71 +471,72 @@ MQ_InitConnections(void)
return errmsg; return errmsg;
} }
/* XXX: set watch param to non-zero for watcher callback */ /* XXX: set watch param to non-zero for watcher callback */
if ((zresult = zoo_get_children(zh, "/brokers/ids", 0, &brokers)) if ((result = zoo_get_children(zh, "/brokers/ids", 0, &brokers))
!= ZOK) { != ZOK) {
snprintf(errmsg, LINE_MAX, snprintf(errmsg, LINE_MAX,
"Cannot get broker ids from zookeeper: %s", "Cannot get broker ids from zookeeper: %s",
zerror(zresult)); zerror(result));
MQ_LOG_Log(LOG_ERR, errmsg); MQ_LOG_Log(LOG_ERR, errmsg);
return errmsg; return errmsg;
} }
/* XXX: cache compiled pcre regexen for use with the watcher
* callback */
host_regex = pcre_compile("\"host\"\\s*:\\s*\"([^\"]+)\"", 0,
&pcre_err, &result, NULL);
AN(host_regex);
port_regex = pcre_compile("\"port\"\\s*:\\s*(\\d+)", 0,
&pcre_err, &result, NULL);
AN(port_regex);
memset(zbrokerlist, 0, LINE_MAX); memset(zbrokerlist, 0, LINE_MAX);
for (int i = 0; i < brokers.count; i++) { for (int i = 0; i < brokers.count; i++) {
char path[PATH_MAX], broker[LINE_MAX]; char path[PATH_MAX], broker[LINE_MAX];
int len = LINE_MAX; int len = LINE_MAX;
snprintf(path, PATH_MAX, "/brokers/ids/%s", brokers.data[i]); snprintf(path, PATH_MAX, "/brokers/ids/%s", brokers.data[i]);
/* XXX: set up a watcher? */ /* XXX: set up a watcher */
if ((zresult = zoo_get(zh, path, 0, broker, &len, NULL)) != ZOK) { if ((result = zoo_get(zh, path, 0, broker, &len, NULL)) != ZOK) {
snprintf(errmsg, LINE_MAX, snprintf(errmsg, LINE_MAX,
"Cannot get config for broker id %s from zookeeper: %s", "Cannot get config for broker id %s from zookeeper: %s",
brokers.data[i], zerror(zresult)); brokers.data[i], zerror(result));
MQ_LOG_Log(LOG_ERR, errmsg); MQ_LOG_Log(LOG_ERR, errmsg);
return errmsg; return errmsg;
} }
if (len > 0) { if (len > 0) {
char jsonerr[LINE_MAX]; int ovector[6], r;
const char *hostpath[] = { "host", NULL }; const char *host = NULL, *port = NULL;
const char *portpath[] = { "port", NULL };
yajl_val json, hostval, portval;
broker[len] = '\0'; broker[len] = '\0';
MQ_LOG_Log(LOG_DEBUG, "Zookeeper %s broker id %s config: %s", MQ_LOG_Log(LOG_DEBUG, "Zookeeper %s broker id %s config: %s",
zookeeper, brokers.data[i], broker); zookeeper, brokers.data[i], broker);
json = yajl_tree_parse((const char *) broker, jsonerr, r = pcre_exec(host_regex, NULL, broker, len, 0, 0, ovector, 6);
LINE_MAX); if (r <= PCRE_ERROR_NOMATCH) {
if (json == NULL) {
snprintf(errmsg, LINE_MAX,
"Cannot parse config for broker id %s from "
"zookeeper [%s]: %s", brokers.data[i], broker,
strlen(jsonerr) > 0 ? jsonerr : "unknown error");
MQ_LOG_Log(LOG_ERR, errmsg);
return errmsg;
}
hostval = yajl_tree_get(json, hostpath, yajl_t_string);
if (!hostval) {
snprintf(errmsg, LINE_MAX, snprintf(errmsg, LINE_MAX,
"Host not found in config for broker id %s from " "Host not found in config for broker id %s from "
"zookeeper [%s]", brokers.data[i], broker); "zookeeper [%s]", brokers.data[i], broker);
MQ_LOG_Log(LOG_ERR, errmsg); MQ_LOG_Log(LOG_ERR, errmsg);
return errmsg; return errmsg;
} }
portval = yajl_tree_get(json, portpath, yajl_t_number); pcre_get_substring(broker, ovector, r, 1, &host);
if (!portval || !YAJL_IS_INTEGER(portval)) { AN(host);
r = pcre_exec(port_regex, NULL, broker, len, 0, 0, ovector, 6);
if (r <= PCRE_ERROR_NOMATCH) {
snprintf(errmsg, LINE_MAX, snprintf(errmsg, LINE_MAX,
"Port not found in config for broker id %s from " "Port not found in config for broker id %s from "
"zookeeper [%s]", brokers.data[i], broker); "zookeeper [%s]", brokers.data[i], broker);
MQ_LOG_Log(LOG_ERR, errmsg); MQ_LOG_Log(LOG_ERR, errmsg);
return errmsg; return errmsg;
} }
sprintf(brokerptr, "%s:%lld", YAJL_GET_STRING(hostval), pcre_get_substring(broker, ovector, r, 1, &port);
YAJL_GET_INTEGER(portval)); AN(port);
sprintf(brokerptr, "%s:%s", host, port);
pcre_free_substring(host);
pcre_free_substring(port);
brokerptr += strlen(brokerptr); brokerptr += strlen(brokerptr);
if (i < brokers.count) if (i < brokers.count)
*brokerptr++ = ','; *brokerptr++ = ',';
yajl_tree_free(json);
} }
else else
MQ_LOG_Log(LOG_WARNING, "Empty config returned from zookeeper " MQ_LOG_Log(LOG_WARNING, "Empty config returned from zookeeper "
......
...@@ -15,6 +15,6 @@ test_kafka_LDADD = \ ...@@ -15,6 +15,6 @@ test_kafka_LDADD = \
../log.$(OBJEXT) \ ../log.$(OBJEXT) \
../monitor.$(OBJEXT) \ ../monitor.$(OBJEXT) \
${PTHREAD_LIBS} \ ${PTHREAD_LIBS} \
-lrdkafka -lz -lpthread -lrt -lzookeeper_mt -lyajl -lrdkafka -lz -lpthread -lrt -lzookeeper_mt -lpcre
CLEANFILES = kafka.log zoo.log *~ CLEANFILES = kafka.log zoo.log *~
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment