Commit 0a3eb15f authored by Geoff Simmons's avatar Geoff Simmons

trackrdrd: added MQ plugin for Kafka

parent c3cd67d6
ACLOCAL_AMFLAGS = -I m4
SUBDIRS = src src/test src/mq/activemq src/mq/activemq/test
SUBDIRS = src src/test src/mq/activemq src/mq/activemq/test \
src/mq/kafka src/mq/kafka/test
if HAVE_RST2MAN
dist_man_MANS = trackrdrd.3
......
......@@ -145,5 +145,7 @@ AC_CONFIG_FILES([
src/test/Makefile
src/mq/activemq/Makefile
src/mq/activemq/test/Makefile
src/mq/kafka/Makefile
src/mq/kafka/test/Makefile
])
AC_OUTPUT
/*
* Written by Poul-Henning Kamp <phk@phk.freebsd.dk>
*
* This file is in the public domain.
*
*/
/*
 * "Miniobj" object-lifecycle macros: each managed struct carries an
 * unsigned "magic" member, stamped on allocation and cleared on free,
 * so that stale, freed or mistyped pointers trip an assert.
 *
 * All macro parameters are parenthesized in the expansions; the
 * original left type_magic (and ptr/val in REPLACE) bare in several
 * places, which misbehaves when callers pass compound expressions.
 */

/* Allocate a zeroed object and stamp its magic; (to) is NULL on OOM. */
#define ALLOC_OBJ(to, type_magic)				\
	do {							\
		(to) = calloc(sizeof *(to), 1);			\
		if ((to) != NULL)				\
			(to)->magic = (type_magic);		\
	} while (0)

/* Clear the magic before freeing, so reuse trips CHECK_OBJ. */
#define FREE_OBJ(to)						\
	do {							\
		(to)->magic = (0);				\
		free(to);					\
	} while (0)

/* Non-asserting check: true iff ptr is non-NULL with the right magic. */
#define VALID_OBJ(ptr, type_magic)				\
	((ptr) != NULL && (ptr)->magic == (type_magic))

/* Assert that ptr (assumed non-NULL) carries the right magic. */
#define CHECK_OBJ(ptr, type_magic)				\
	do {							\
		assert((ptr)->magic == (type_magic));		\
	} while (0)

/* Assert that ptr is non-NULL and carries the right magic. */
#define CHECK_OBJ_NOTNULL(ptr, type_magic)			\
	do {							\
		assert((ptr) != NULL);				\
		assert((ptr)->magic == (type_magic));		\
	} while (0)

/* Like CHECK_OBJ, but a NULL pointer is acceptable. */
#define CHECK_OBJ_ORNULL(ptr, type_magic)			\
	do {							\
		if ((ptr) != NULL)				\
			assert((ptr)->magic == (type_magic));	\
	} while (0)

/* Assign from to to, checking the magic when non-NULL. */
#define CAST_OBJ(to, from, type_magic)				\
	do {							\
		(to) = (from);					\
		if ((to) != NULL)				\
			CHECK_OBJ((to), (type_magic));		\
	} while (0)

/* Assign from to to, asserting non-NULL and the right magic. */
#define CAST_OBJ_NOTNULL(to, from, type_magic)			\
	do {							\
		(to) = (from);					\
		assert((to) != NULL);				\
		CHECK_OBJ((to), (type_magic));			\
	} while (0)

/* Replace string ptr with a heap copy of val (or NULL); AN aborts on OOM. */
#define REPLACE(ptr, val)					\
	do {							\
		if ((ptr) != NULL)				\
			free(ptr);				\
		if ((val) != NULL) {				\
			(ptr) = strdup(val);			\
			AN((ptr));				\
		} else {					\
			(ptr) = NULL;				\
		}						\
	} while (0)
# Automake fragment for the Kafka MQ plugin (libtrackrdr-kafka).
# Headers come from the Varnish source tree and the trackrdrd include dir.
INCLUDES = -I$(VARNISHSRC)/include -I$(VARNISHSRC) -I$(top_srcdir)/include
# Libtool -version-info triple (current:revision:age) for the shared object.
CURRENT = 3
REVISION = 0
AGE = 0
pkglib_LTLIBRARIES = libtrackrdr-kafka.la
libtrackrdr_kafka_la_SOURCES = \
	$(top_srcdir)/include/mq.h \
	$(top_srcdir)/include/config_common.h \
	mq_kafka.h \
	mq.c \
	log.c \
	$(top_srcdir)/src/config_common.c
# Link against rdkafka (Kafka client), zookeeper_mt (broker discovery)
# and yajl (JSON parsing of broker configs), plus their dependencies.
libtrackrdr_kafka_la_LIBADD = \
	${PTHREAD_LIBS} \
	-lrdkafka -lz -lpthread -lrt -lzookeeper_mt -lyajl
libtrackrdr_kafka_la_LDFLAGS = -version-info ${CURRENT}:${REVISION}:${AGE}
# Expose the version triple to the code (used to build SO_VERSION in mq.c).
libtrackrdr_kafka_la_CFLAGS = \
	-DCURRENT=${CURRENT} \
	-DREVISION=${REVISION} \
	-DAGE=${AGE}
if HAVE_RST2MAN
dist_man_MANS = libtrackrdr-kafka.3
MAINTAINERCLEANFILES = $(dist_man_MANS)
endif
# The man page is generated from README.rst when rst2man is available.
libtrackrdr-kafka.3: README.rst
if HAVE_RST2MAN
	${RST2MAN} README.rst $@
endif
.. _ref-trackrdrd:
==================
libtrackrdr-kafka
==================
--------------------------------------------------------------------
Kafka implementation of the MQ interface for the Tracking Log Reader
--------------------------------------------------------------------
:Author: Geoffrey Simmons
:Date: 2014-05-28
:Version: 3.0
:Manual section: 3
DESCRIPTION
===========
``libtrackrdr-kafka.so`` provides an implementation of the tracking
reader's MQ interface to send messages to Apache Kafka message
brokers. See ``include/mq.h`` in the ``trackrdrd`` source distribution
for documentation of the interface.
To use this implementation with ``trackrdrd``, specify the shared
object as the value of ``mq.module`` in the tracking reader's
configuration (see trackrdrd(3)). The configuration value may be the
absolute path of the shared object; or its name, provided that it can
be found by the dynamic linker (see ld.so(8)).
``libtrackrdr-kafka`` also requires a configuration file, whose path
is specified as ``mq.config_fname`` in the configuration of
``trackrdrd``.
``libtrackrdr-kafka`` in turn depends on these libraries:
* ``rdkafka``, a client library for Kafka
* ``zookeeper_mt``, a client library for Apache ZooKeeper with
multi-threading
* ``yajl``, a library for JSON parsing
The dynamic linker must also be able to find ``librdkafka.so``,
``libzookeeper_mt.so`` and ``libyajl.so`` at runtime.
BUILD/INSTALL
=============
The sources for ``libtrackrdr-kafka`` are provided in the source
repository for ``trackrdrd``, in the subdirectory ``src/mq/kafka/``
of::
git@repo.org:lhotse-tracking-varnish
The sources for the library dependencies can be obtained from:
* https://github.com/edenhill/librdkafka
* http://zookeeper.apache.org/
* http://lloyd.github.io/yajl/
Building and installing the library dependencies
------------------------------------------------
The Kafka interface has been tested with these library versions:
* rdkafka 0.8.3
* zookeeper_mt 3.4.6
* yajl 2.0.4
If the libraries are already installed on the platform where
``trackrdrd`` will run, then no further action is necessary. To build
the libraries from source, it suffices to follow the instructions in
the source distributions -- no special configuration for the plugin is
necessary.
Building and installing libtrackrdr-kafka
-----------------------------------------
``libtrackrdr-kafka`` is built as part of the global build for
``trackrdrd``; for details and requirements of the build, see
trackrdrd(3).
To specifically build the MQ implementation (without building all of
the rest of ``trackrdrd``), it suffices to invoke ``make`` commands in
the subdirectory ``src/mq/kafka`` (after having executed the
``configure`` script for ``trackrdrd``)::
# in lhotse-tracking-varnish/trackrdrd
$ cd src/mq/kafka
$ make
For self-tests after the build::
$ cd src/mq/kafka/test
$ make check
The global ``make`` and ``make check`` commands for ``trackrdrd`` also
execute both of these for the Kafka plugin.
The self-tests depend on the configuration file ``kafka.conf`` in the
``test/`` subdirectory, which specifies ``localhost:2181`` as the
address of a ZooKeeper server. If a ZooKeeper is listening, then tests
are run against that instance of ZooKeeper and any running Kafka
brokers that the ZooKeeper server is managing. If connections to a
ZooKeeper server or Kafka brokers fail, then the ``make check`` test
exits with the status ``SKIPPED``.
To install the shared object ``libtrackrdr-kafka.so``, run ``make
install`` as root, for example with ``sudo``::
$ sudo make install
In standard configuration, the ``.so`` file will be installed by
``libtool(1)``, and its location may be affected by the ``--libdir``
option to ``configure``.
CONFIGURATION
=============
As mentioned above, a configuration file for ``libtrackrdr-kafka``
MUST be specified in the configuration parameter ``mq.config_fname``
for ``trackrdrd``, and initialization of the MQ implementation fails
if this file cannot be found or read by the process owner of
``trackrdrd`` (or if its syntax is false, or if required parameters
are missing).
The syntax of the configuration file is the same as that of
``trackrdrd``, and it may contain configuration parameters for
``rdkafka``, except as noted below -- thus the configuration applies
to both the messaging plugin and the ``rdkafka`` client library.
If the config parameter ``zookeeper.connect`` is set, then the plugin
obtains information about Kafka brokers from the specified ZooKeeper
server(s), and the value of the ``rdkafka`` parameter
``metadata.broker.list`` is ignored. If ``zookeeper.connect`` is not
set, then an initial list of brokers MUST be specified by
``metadata.broker.list`` -- if neither of ``zookeeper.connect`` and
``metadata.broker.list`` are set, then the configuration fails and
``trackrdrd`` will exit.
In addition to configuration parameters for ``rdkafka``, these
parameters can be specified:
===================== ==========================================================
Parameter Description
===================== ==========================================================
``zookeeper.connect`` Comma-separated list of ``host:port`` pairs specifying
the addresses of ZooKeeper servers. If not set, then
``metadata.broker.list`` MUST be set, as described above.
--------------------- ----------------------------------------------------------
``zookeeper.timeout`` Timeout for connections to ZooKeeper servers (optional,
default 0)
--------------------- ----------------------------------------------------------
``zookeeper.log`` Path of a log file for the ZooKeeper client (optional)
--------------------- ----------------------------------------------------------
``log`` Path of a log file for the messaging plugin and Kafka
client (optional)
--------------------- ----------------------------------------------------------
``topic`` Name of the Kafka topic to which messages are sent
--------------------- ----------------------------------------------------------
``mq.debug`` If set to true, then log at DEBUG level
===================== ==========================================================
Except as noted below, the configuration can specify any parameters for
the ``rdkafka`` client, as documented at::
https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
The following ``rdkafka`` parameters in the config file are ignored
(they are set internally by the messaging plugin, or are only relevant
to consumers):
* ``client.id``
* ``error_cb``
* ``stats_cb``
* ``log_cb``
* ``socket_cb``
* ``open_cb``
* ``opaque``
* ``queued.*``
* ``fetch.*``
* ``group.id``
* ``dr_cb``
* ``dr_msg_cb``
* ``partitioner``
* ``opaque``
* ``auto.*``
* ``offset.*``
SHARDING
========
The plugin requires that calls to ``MQ_Send()`` supply a hexadecimal
string of up to 8 characters as the sharding key; ``MQ_Send()`` fails
if a key is not specified, or if it contains non-hex characters in the
first 8 bytes.
The plugin uses up to the first 8 hex digits of the key; if the string
is longer, then the remainder from the 9th byte is ignored.
LOGGING AND STATISTICS
======================
XXX: TuDu
MESSAGE SEND FAILURE AND RECOVERY
=================================
XXX: TuDu
* stats callback from rdkafka
* stats counters for missing shard keys or data
SIGNALS
=======
XXX: TuDu -- toggle DEBUG log level
SEE ALSO
========
* ``trackrdrd(3)``
* ``ld.so(8)``
* http://kafka.apache.org/
* http://zookeeper.apache.org/
* https://github.com/edenhill/librdkafka
* http://zookeeper.apache.org/doc/r3.4.6/zookeeperProgrammers.html#C+Binding
COPYRIGHT AND LICENCE
=====================
Both the software and this document are governed by a BSD 2-clause
licence.
| Copyright (c) 2014 UPLEX Nils Goroll Systemoptimierung
| Copyright (c) 2014 Otto Gmbh & Co KG
| All rights reserved
| Use only with permission
| Author: Geoffrey Simmons <geoffrey.simmons@uplex.de>
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions
are met:
1. Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
2. Redistributions in binary form must reproduce the above copyright
notice, this list of conditions and the following disclaimer in the
documentation and/or other materials provided with the distribution.
THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND
ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
ARE DISCLAIMED. IN NO EVENT SHALL AUTHOR OR CONTRIBUTORS BE LIABLE
FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
SUCH DAMAGE.
/*-
* Copyright (c) 2012-2014 UPLEX Nils Goroll Systemoptimierung
* Copyright (c) 2012-2014 Otto Gmbh & Co KG
* All rights reserved
* Use only with permission
*
* Author: Geoffrey Simmons <geoffrey.simmons@uplex.de>
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL AUTHOR OR CONTRIBUTORS BE LIABLE
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
* OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
* OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
*
*/
#include <syslog.h>
#include <stdio.h>
#include <stdarg.h>
#include <errno.h>
#include <string.h>
#include <time.h>
#include "mq_kafka.h"
static int lvl = LOG_INFO;	/* current threshold; messages above it are dropped */
static const char *level2name[LOG_DEBUG+1];	/* syslog level -> tag, filled by init_lvlnames() */
static FILE *out = NULL;	/* log destination; NULL until MQ_LOG_Open() succeeds */
/* Populate the level-to-name table used in log line prefixes. */
static void
init_lvlnames(void)
{
	static const struct {
		int		level;
		const char	*name;
	} tbl[] = {
		{ LOG_EMERG,	"EMERG" },
		{ LOG_ALERT,	"ALERT" },
		{ LOG_CRIT,	"CRIT" },
		{ LOG_ERR,	"ERR" },
		{ LOG_WARNING,	"WARNING" },
		{ LOG_NOTICE,	"NOTICE" },
		{ LOG_INFO,	"INFO" },
		{ LOG_DEBUG,	"DEBUG" },
	};

	for (unsigned i = 0; i < sizeof(tbl) / sizeof(tbl[0]); i++)
		level2name[tbl[i].level] = tbl[i].name;
}
/*
 * Write one timestamped, level-tagged printf-style line to the log
 * stream.  A no-op when the level is above the configured threshold or
 * the log has not been opened.  The stream is locked for the duration
 * of the write so concurrent workers do not interleave lines.
 */
void
MQ_LOG_Log(int level, const char *msg, ...)
{
	char tstamp[26];
	time_t now;
	va_list args;

	if (out == NULL || level > lvl)
		return;

	now = time(NULL);
	ctime_r(&now, tstamp);
	tstamp[24] = '\0';	/* strip ctime_r's trailing newline */

	flockfile(out);
	fprintf(out, "%s [%s]: ", tstamp, level2name[level]);
	va_start(args, msg);
	(void) vfprintf(out, msg, args);
	va_end(args);
	fprintf(out, "\n");
	fflush(out);
	funlockfile(out);
}
void
MQ_LOG_SetLevel(int level)
{
	/* Set the maximum syslog level that will be written (e.g. LOG_DEBUG). */
	lvl = level;
}
/*
 * Close the log stream, if open.  Resets the stream handle so that a
 * later MQ_LOG_Log() or a second MQ_LOG_Close() is a safe no-op; the
 * original left a dangling FILE* behind after fclose().
 */
void
MQ_LOG_Close(void)
{
	if (out != NULL) {
		fclose(out);
		out = NULL;
	}
}
/*
 * Open the plugin log at path ("-" means stdout) in append mode and
 * initialize the level-name table.  Returns 0 on success, an errno
 * value on failure.
 */
int
MQ_LOG_Open(const char *path)
{
	AN(path);
	if (*path == '\0')
		return EINVAL;

	if (strcmp(path, "-") == 0)
		out = stdout;
	else if ((out = fopen(path, "a")) == NULL)
		return errno;

	init_lvlnames();
	return(0);
}
/*-
* Copyright (c) 2012-2014 UPLEX Nils Goroll Systemoptimierung
* Copyright (c) 2012-2014 Otto Gmbh & Co KG
* All rights reserved
* Use only with permission
*
* Author: Geoffrey Simmons <geoffrey.simmons@uplex.de>
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL AUTHOR OR CONTRIBUTORS BE LIABLE
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
* OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
* OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
*
*/
#include <stdio.h>
#include <stdlib.h>
#include <errno.h>
#include <string.h>
#include <strings.h>
#include <limits.h>
#include <syslog.h>
#include <ctype.h>
#include <librdkafka/rdkafka.h>
#include <zookeeper/zookeeper.h>
#include <zookeeper/zookeeper_version.h>
#include <yajl/yajl_tree.h>
#include <yajl/yajl_version.h>
#ifdef HAVE_CONFIG_H
#include "../config.h"
#endif
#include "mq.h"
#include "mq_kafka.h"
#include "config_common.h"
#include "miniobj.h"
/* Stringify helpers: str(X) first expands macro X, then stringifies it. */
#define xstr(X) #X
#define str(X) xstr(X)
/* SO_VERSION: version string reported by MQ_Version(); prefers the
 * libtool current.revision.age triple injected by the build (see
 * Makefile.am), falling back to the package VERSION. */
#if defined(CURRENT) && defined(REVISION) && defined(AGE)
#define SO_VERSION (str(CURRENT) "." str(REVISION) "." str(AGE))
#elif defined(VERSION)
#define SO_VERSION VERSION
#else
#define SO_VERSION "unknown version"
#endif
/* Per-worker state: one rdkafka producer and topic handle per tracking
 * reader worker thread (miniobj-style magic for pointer validation). */
typedef struct kafka_wrk {
	unsigned magic;
#define KAFKA_WRK_MAGIC 0xd14d4425
	int n;				/* worker index in the workers table */
	rd_kafka_t *kafka;		/* rdkafka producer handle */
	rd_kafka_topic_t *topic;	/* topic to which messages are produced */
	int err;			/* set non-zero by error/delivery callbacks */
} kafka_wrk_t;
static kafka_wrk_t **workers;		/* worker table, nwrk entries, set in MQ_GlobalInit() */
static unsigned nwrk = 0;
/* Values read from the config file by conf_add(): */
static char logpath[PATH_MAX] = "";	/* "log": plugin/rdkafka log file path */
static char zookeeper[LINE_MAX] = "";	/* "zookeeper.connect": host:port list */
static char brokerlist[LINE_MAX] = "";	/* "metadata.broker.list" (or from zookeeper) */
static char zoolog[PATH_MAX] = "";	/* "zookeeper.log": zookeeper client log path */
static FILE *zoologf;			/* stream for the zookeeper client log */
static zhandle_t *zh;			/* zookeeper connection handle */
static unsigned zoo_timeout = 0;	/* "zookeeper.timeout" */
static char topic[LINE_MAX];		/* "topic": Kafka topic name */
static rd_kafka_topic_conf_t *topic_conf;	/* global topic config template */
static rd_kafka_conf_t *conf;		/* global rdkafka config template */
static char errmsg[LINE_MAX];		/* buffer for error strings returned to callers */
static char _version[LINE_MAX];		/* version string reported by MQ_Version() */
static int loglvl = LOG_INFO;		/* LOG_DEBUG when "mq.debug" is enabled */
/* rdkafka log callback: forward client-library messages to the plugin log. */
static void
log_cb(const rd_kafka_t *rk, int level, const char *fac, const char *buf)
{
	const char *client = rd_kafka_name(rk);

	(void) fac;
	MQ_LOG_Log(level, "rdkafka %s: %s", client, buf);
}
/*
 * rdkafka delivery report callback: log the outcome of each produced
 * message, and flag the owning worker object on delivery failure.
 */
static void
dr_cb(rd_kafka_t *rk, void *payload, size_t len, rd_kafka_resp_err_t err,
      void *opaque, void *msg_opaque)
{
	kafka_wrk_t *wrk;

	(void) msg_opaque;
	if (err == RD_KAFKA_RESP_ERR_NO_ERROR) {
		if (loglvl == LOG_DEBUG)
			MQ_LOG_Log(LOG_DEBUG,
			    "Delivered (client ID = %s): msg = [%.*s]",
			    rd_kafka_name(rk), (int) len, (char *) payload);
		return;
	}
	wrk = (kafka_wrk_t *) opaque;
	CHECK_OBJ_NOTNULL(wrk, KAFKA_WRK_MAGIC);
	MQ_LOG_Log(LOG_ERR, "Delivery error (client ID = %s, msg = [%.*s]): %s",
	    rd_kafka_name(rk), (int) len, (char *) payload,
	    rd_kafka_err2str(err));
	wrk->err = 1;
}
/* rdkafka error callback: log the error and flag the worker object. */
static void
error_cb(rd_kafka_t *rk, int err, const char *reason, void *opaque)
{
	kafka_wrk_t *wrk;

	CAST_OBJ_NOTNULL(wrk, opaque, KAFKA_WRK_MAGIC);
	MQ_LOG_Log(LOG_ERR, "Client error (ID = %s) %d: %s", rd_kafka_name(rk),
	    err, reason);
	wrk->err = 1;
}
/* rdkafka statistics callback: log the JSON stats blob at INFO level. */
static int
stats_cb(rd_kafka_t *rk, char *json, size_t json_len, void *opaque)
{
	const char *client = rd_kafka_name(rk);

	(void) opaque;
	MQ_LOG_Log(LOG_INFO, "Client stats (ID = %s): %.*s", client,
	    (int) json_len, json);
	return 0;
}
/*
 * Partitioner: the shard key is expected to be a string of up to 8 hex
 * digits (an unsigned 32-bit value).  The partition is the key modulo
 * the partition count (a mask when the count is a power of two).
 *
 * Fixes over the original:
 * - keylen is clamped to 8, so an over-long key cannot overflow keystr
 *   (rdkafka passes whatever keylen the producer supplied).
 * - a partition count <= 0 no longer causes division by zero.
 */
static int32_t
partitioner_cb(const rd_kafka_topic_t *rkt, const void *keydata, size_t keylen,
               int32_t partition_cnt, void *rkt_opaque, void *msg_opaque)
{
	int32_t partition;
	unsigned long key;
	char keystr[sizeof("ffffffff")], *endptr = NULL;

	(void) rkt_opaque;
	(void) msg_opaque;

	if (partition_cnt <= 0) {
		MQ_LOG_Log(LOG_ERR, "No partitions available");
		return RD_KAFKA_PARTITION_UA;
	}
	if (keylen > sizeof(keystr) - 1)
		keylen = sizeof(keystr) - 1;
	strncpy(keystr, (const char *) keydata, keylen);
	keystr[keylen] = '\0';
	errno = 0;
	key = strtoul(keystr, &endptr, 16);
	if (errno != 0 || *endptr != '\0' || key > 0xffffffffUL) {
		/* XXX: should use opaque objs to identify msg & maybe wrk obj */
		MQ_LOG_Log(LOG_ERR, "Cannot parse partition key: %.*s", (int) keylen,
		           (const char *) keydata);
		return RD_KAFKA_PARTITION_UA;
	}
	if ((partition_cnt & (partition_cnt - 1)) == 0)
		/* partition_cnt is a power of 2 */
		partition = key & (partition_cnt - 1);
	else
		partition = key % partition_cnt;
	assert(partition >= 0);
	if (! rd_kafka_topic_partition_available(rkt, partition)) {
		MQ_LOG_Log(LOG_ERR, "Partition %d not available", partition);
		return RD_KAFKA_PARTITION_UA;
	}
	MQ_LOG_Log(LOG_DEBUG, "Computed partition %d for key %.*s", partition,
	           (int) keylen, (const char *) keydata);
	return partition;
}
/* XXX: encapsulate wrk_init and _fini in a separate source */
static const char
*wrk_init(int wrk_num)
{
char clientid[sizeof("libtrackrdr-kafka-worker-99999")];
rd_kafka_conf_t *wrk_conf;
rd_kafka_topic_conf_t *wrk_topic_conf;
rd_kafka_t *rk;
rd_kafka_topic_t *rkt;
kafka_wrk_t *wrk;
wrk_conf = rd_kafka_conf_dup(conf);
wrk_topic_conf = rd_kafka_topic_conf_dup(topic_conf);
sprintf(clientid, "libtrackrdr-kafka-worker %d", wrk_num);
if (rd_kafka_conf_set(wrk_conf, "client.id", clientid, errmsg,
LINE_MAX) != RD_KAFKA_CONF_OK) {
MQ_LOG_Log(LOG_ERR, "rdkafka config error [client.id = %s]: %s",
clientid, errmsg);
return errmsg;
}
rd_kafka_topic_conf_set_partitioner_cb(wrk_topic_conf, partitioner_cb);
ALLOC_OBJ(wrk, KAFKA_WRK_MAGIC);
if (wrk == NULL) {
snprintf(errmsg, LINE_MAX, "Failed to create worker handle: %s",
strerror(errno));
MQ_LOG_Log(LOG_ERR, errmsg);
return errmsg;
}
rd_kafka_conf_set_opaque(wrk_conf, (void *) wrk);
rd_kafka_topic_conf_set_opaque(wrk_topic_conf, (void *) wrk);
rk = rd_kafka_new(RD_KAFKA_PRODUCER, wrk_conf, errmsg, LINE_MAX);
if (rk == NULL) {
MQ_LOG_Log(LOG_ERR, "Failed to create producer: %s", errmsg);
return errmsg;
}
CHECK_OBJ_NOTNULL((kafka_wrk_t *) rd_kafka_opaque(rk), KAFKA_WRK_MAGIC);
rd_kafka_set_log_level(rk, loglvl);
errno = 0;
rkt = rd_kafka_topic_new(rk, topic, wrk_topic_conf);
if (rkt == NULL) {
rd_kafka_resp_err_t rkerr = rd_kafka_errno2err(errno);
snprintf(errmsg, LINE_MAX, "Failed to initialize topic: %s",
rd_kafka_err2str(rkerr));
MQ_LOG_Log(LOG_ERR, errmsg);
return errmsg;
}
wrk-> n = wrk_num;
wrk->kafka = rk;
wrk->topic = rkt;
wrk->err = 0;
workers[wrk_num] = wrk;
MQ_LOG_Log(LOG_INFO, "initialized worker %d", wrk_num);
rd_kafka_poll(wrk->kafka, 0);
return NULL;
}
/*
 * Tear down a worker: drain the rdkafka output queue, destroy the
 * topic and producer handles, free the worker object, and clear its
 * slot in the global table.  (The original asserted AN(wrk) after
 * FREE_OBJ(wrk), which inspected a freed pointer and verified nothing;
 * removed.)
 */
static void
wrk_fini(kafka_wrk_t *wrk)
{
	int wrk_num;

	CHECK_OBJ_NOTNULL(wrk, KAFKA_WRK_MAGIC);
	wrk_num = wrk->n;
	assert(wrk_num >= 0 && (unsigned) wrk_num < nwrk);

	/* Wait for messages to be delivered */
	/* XXX: timeout? configure poll timeout? */
	while (rd_kafka_outq_len(wrk->kafka) > 0)
		rd_kafka_poll(wrk->kafka, 100);

	rd_kafka_topic_destroy(wrk->topic);
	rd_kafka_destroy(wrk->kafka);
	FREE_OBJ(wrk);
	workers[wrk_num] = NULL;
}
/*
 * Config callback for CONF_ReadFile(): stores the plugin-specific
 * parameters, and passes everything else through to the rdkafka topic
 * config, then the rdkafka global config.  Returns 0 on success, an
 * errno value on failure.
 *
 * String parameters are copied with snprintf() to guarantee NUL
 * termination; the original strncpy() could leave the buffers
 * unterminated for maximum-length values.
 */
static int
conf_add(const char *lval, const char *rval)
{
	rd_kafka_conf_res_t result;
	char errstr[LINE_MAX];
	errstr[0] = '\0';

	/* XXX: rename as "mq.log" */
	if (strcmp(lval, "log") == 0) {
		snprintf(logpath, PATH_MAX, "%s", rval);
		return(0);
	}
	if (strcmp(lval, "zookeeper.connect") == 0) {
		snprintf(zookeeper, LINE_MAX, "%s", rval);
		return(0);
	}
	/* XXX: "zookeeper.connection.timeout.ms", to match Kafka config */
	if (strcmp(lval, "zookeeper.timeout") == 0) {
		char *endptr = NULL;
		unsigned long val;	/* unsigned, so the UINT_MAX check works on LP64 */

		errno = 0;
		val = strtoul(rval, &endptr, 10);
		if (errno != 0)
			return errno;
		if (*endptr != '\0')
			return EINVAL;
		if (val > UINT_MAX)
			return ERANGE;
		zoo_timeout = val;
		return(0);
	}
	if (strcmp(lval, "zookeeper.log") == 0) {
		snprintf(zoolog, PATH_MAX, "%s", rval);
		return(0);
	}
	if (strcmp(lval, "topic") == 0) {
		snprintf(topic, LINE_MAX, "%s", rval);
		return(0);
	}
	if (strcmp(lval, "metadata.broker.list") == 0) {
		snprintf(brokerlist, LINE_MAX, "%s", rval);
		result = rd_kafka_conf_set(conf, lval, rval, errstr, LINE_MAX);
		if (result != RD_KAFKA_CONF_OK)
			return EINVAL;
		return(0);
	}
	/* XXX: use the rdkakfka param "log_level" instead */
	if (strcmp(lval, "mq.debug") == 0) {
		if (strcmp(rval, "1") == 0
		    || strcasecmp(rval, "true") == 0
		    || strcasecmp(rval, "yes") == 0
		    || strcasecmp(rval, "on") == 0)
			loglvl = LOG_DEBUG;
		else if (strcmp(rval, "0") != 0
		         && strcasecmp(rval, "false") != 0
		         && strcasecmp(rval, "no") != 0
		         && strcasecmp(rval, "off") != 0)
			return EINVAL;
		return(0);
	}

	/* Unknown to the plugin: try the topic config, then the global config. */
	result = rd_kafka_topic_conf_set(topic_conf, lval, rval, errstr, LINE_MAX);
	if (result == RD_KAFKA_CONF_UNKNOWN)
		result = rd_kafka_conf_set(conf, lval, rval, errstr, LINE_MAX);
	if (result != RD_KAFKA_CONF_OK)
		return EINVAL;
	return(0);
}
/* XXX: fail if "topic" is not set */
const char *
MQ_GlobalInit(unsigned nworkers, const char *config_fname)
{
snprintf(_version, LINE_MAX,
"libtrackrdr-kafka %s, rdkafka %s, zookeeper %d.%d.%d, "
"yajl %d.%d.%d", SO_VERSION, rd_kafka_version_str(),
ZOO_MAJOR_VERSION, ZOO_MINOR_VERSION, ZOO_PATCH_VERSION,
YAJL_MAJOR, YAJL_MINOR, YAJL_MICRO);
conf = rd_kafka_conf_new();
topic_conf = rd_kafka_topic_conf_new();
if (CONF_ReadFile(config_fname, conf_add) != 0)
return "Error reading config file for Kafka";
if (logpath[0] != '\0') {
int err;
if ((err = MQ_LOG_Open(logpath)) != 0) {
snprintf(errmsg, LINE_MAX, "Cannot open %s: %s", logpath,
strerror(err));
return errmsg;
}
MQ_LOG_SetLevel(loglvl);
}
MQ_LOG_Log(LOG_INFO, "initializing (%s)", _version);
if (zookeeper[0] == '\0' && brokerlist[0] == '\0') {
snprintf(errmsg, LINE_MAX,
"zookeeper.connect and metadata.broker.list not set in %s",
config_fname);
MQ_LOG_Log(LOG_ERR, errmsg);
return errmsg;
}
workers = (kafka_wrk_t **) calloc(sizeof (kafka_wrk_t *), nworkers);
if (workers == NULL) {
snprintf(errmsg, LINE_MAX, "Cannot allocate worker table: %s",
strerror(errno));
MQ_LOG_Log(LOG_ERR, errmsg);
return errmsg;
}
nwrk = nworkers;
if (zoolog[0] != '\0') {
zoologf = fopen(zoolog, "a");
if (zoologf == NULL) {
snprintf(errmsg, LINE_MAX, "Cannot open zookeeper.log %s: %s",
zoolog, strerror(errno));
MQ_LOG_Log(LOG_ERR, errmsg);
return errmsg;
}
zoo_set_log_stream(zoologf);
}
rd_kafka_conf_set_dr_cb(conf, dr_cb);
rd_kafka_conf_set_error_cb(conf, error_cb);
rd_kafka_conf_set_log_cb(conf, log_cb);
rd_kafka_conf_set_stats_cb(conf, stats_cb);
rd_kafka_topic_conf_set_partitioner_cb(topic_conf, partitioner_cb);
if (loglvl == LOG_DEBUG) {
size_t cfglen;
const char **cfg;
/* Dump config */
MQ_LOG_Log(LOG_DEBUG, "zookeeper.connect = %s", zookeeper);
MQ_LOG_Log(LOG_DEBUG, "topic = %s", topic);
cfg = rd_kafka_conf_dump(conf, &cfglen);
if (cfg != NULL && cfglen > 0)
for (int i = 0; i < cfglen >> 1; i++) {
if (cfg[2*i] == NULL)
break;
MQ_LOG_Log(LOG_DEBUG, "%s = %s", cfg[2*i], cfg[2*i + 1]);
}
rd_kafka_conf_dump_free(cfg, cfglen);
cfg = rd_kafka_topic_conf_dump(topic_conf, &cfglen);
if (cfg != NULL && cfglen > 0)
for (int i = 0; i < cfglen >> 1; i++)
MQ_LOG_Log(LOG_DEBUG, "%s = %s", cfg[2*i], cfg[2*i + 1]);
rd_kafka_conf_dump_free(cfg, cfglen);
zoo_set_debug_level(ZOO_LOG_LEVEL_DEBUG);
}
return NULL;
}
/*
 * MQ interface: establish connections.  If zookeeper.connect was
 * configured, obtain the broker list from ZooKeeper (falling back to
 * metadata.broker.list if ZooKeeper returns none); then create the
 * per-worker producer objects.  Returns NULL on success, an error
 * string otherwise.
 *
 * Fixes over the original: the broker list no longer gets a trailing
 * comma after the last entry (off-by-one in the separator test),
 * broker entries are written with a bounds check instead of an
 * unchecked sprintf, zoo_get() is given room for the NUL terminator,
 * and error strings are logged via a "%s" format.
 */
const char *
MQ_InitConnections(void)
{
	AN(conf);
	AN(topic_conf);
	assert(zookeeper[0] != '\0' || brokerlist[0] != '\0');
	if (zookeeper[0] != '\0') {
		struct String_vector brokers;
		int zresult;
		char zbrokerlist[LINE_MAX];
		char *brokerptr = zbrokerlist;

		/* XXX: set a watcher function; wait for ZOO_CONNECTED_STATE */
		errno = 0;
		zh = zookeeper_init(zookeeper, NULL, zoo_timeout, 0, 0, 0);
		if (zh == NULL) {
			snprintf(errmsg, LINE_MAX, "Zookeeper init/connect failure: %s",
			         strerror(errno));
			MQ_LOG_Log(LOG_ERR, "%s", errmsg);
			return errmsg;
		}
		/* XXX: set watch param to non-zero for watcher callback */
		if ((zresult = zoo_get_children(zh, "/brokers/ids", 0, &brokers))
		    != ZOK) {
			snprintf(errmsg, LINE_MAX,
			         "Cannot get broker ids from zookeeper: %s",
			         zerror(zresult));
			MQ_LOG_Log(LOG_ERR, "%s", errmsg);
			return errmsg;
		}
		memset(zbrokerlist, 0, LINE_MAX);
		for (int i = 0; i < brokers.count; i++) {
			char path[PATH_MAX], broker[LINE_MAX];
			int len = LINE_MAX - 1;	/* leave room for the NUL below */

			snprintf(path, PATH_MAX, "/brokers/ids/%s", brokers.data[i]);
			/* XXX: set up a watcher? */
			if ((zresult = zoo_get(zh, path, 0, broker, &len, NULL)) != ZOK) {
				snprintf(errmsg, LINE_MAX,
				    "Cannot get config for broker id %s from zookeeper: %s",
				    brokers.data[i], zerror(zresult));
				MQ_LOG_Log(LOG_ERR, "%s", errmsg);
				return errmsg;
			}
			if (len > 0) {
				char jsonerr[LINE_MAX];
				const char *hostpath[] = { "host", NULL };
				const char *portpath[] = { "port", NULL };
				yajl_val json, hostval, portval;
				size_t space;

				broker[len] = '\0';
				MQ_LOG_Log(LOG_DEBUG, "Zookeeper %s broker id %s config: %s",
				           zookeeper, brokers.data[i], broker);
				json = yajl_tree_parse((const char *) broker, jsonerr,
				                       LINE_MAX);
				if (json == NULL) {
					snprintf(errmsg, LINE_MAX,
					    "Cannot parse config for broker id %s from "
					    "zookeeper [%s]: %s", brokers.data[i], broker,
					    strlen(jsonerr) > 0 ? jsonerr : "unknown error");
					MQ_LOG_Log(LOG_ERR, "%s", errmsg);
					return errmsg;
				}
				hostval = yajl_tree_get(json, hostpath, yajl_t_string);
				if (!hostval) {
					snprintf(errmsg, LINE_MAX,
					    "Host not found in config for broker id %s from "
					    "zookeeper [%s]", brokers.data[i], broker);
					MQ_LOG_Log(LOG_ERR, "%s", errmsg);
					return errmsg;
				}
				portval = yajl_tree_get(json, portpath, yajl_t_number);
				if (!portval || !YAJL_IS_INTEGER(portval)) {
					snprintf(errmsg, LINE_MAX,
					    "Port not found in config for broker id %s from "
					    "zookeeper [%s]", brokers.data[i], broker);
					MQ_LOG_Log(LOG_ERR, "%s", errmsg);
					return errmsg;
				}
				/* Append host:port, bounded by the space left in
				   zbrokerlist (the original used an unchecked sprintf). */
				space = LINE_MAX - (brokerptr - zbrokerlist);
				snprintf(brokerptr, space, "%s:%lld",
				    YAJL_GET_STRING(hostval), YAJL_GET_INTEGER(portval));
				brokerptr += strlen(brokerptr);
				/* Comma-separate entries; no comma after the last one. */
				if (i < brokers.count - 1
				    && brokerptr < zbrokerlist + LINE_MAX - 1)
					*brokerptr++ = ',';
				yajl_tree_free(json);
			}
			else
				MQ_LOG_Log(LOG_WARNING, "Empty config returned from zookeeper "
				           "for broker id %s", brokers.data[i]);
		}
		deallocate_String_vector(&brokers);
		if (zbrokerlist[0] == '\0') {
			if (brokerlist[0] == '\0') {
				snprintf(errmsg, LINE_MAX,
				    "Zookeeper at %s returned no brokers, and "
				    "metadata.broker.list not configured", zookeeper);
				MQ_LOG_Log(LOG_ERR, "%s", errmsg);
				return errmsg;
			}
			MQ_LOG_Log(LOG_WARNING, "Zookeeper at %s returned no brokers, "
			           "using value of metadata.broker.list instead",
			           zookeeper);
		}
		else {
			strcpy(brokerlist, zbrokerlist);
			MQ_LOG_Log(LOG_DEBUG, "Zookeeper %s broker list %s", zookeeper,
			           brokerlist);
		}
	}
	if (rd_kafka_conf_set(conf, "metadata.broker.list", brokerlist, errmsg,
	                      LINE_MAX) != RD_KAFKA_CONF_OK) {
		MQ_LOG_Log(LOG_ERR,
		    "rdkafka config error [metadata.broker.list = %s]: %s",
		    brokerlist, errmsg);
		return errmsg;
	}
	for (unsigned i = 0; i < nwrk; i++) {
		const char *err = wrk_init((int) i);
		if (err != NULL)
			return err;
	}
	return NULL;
}
const char *
MQ_WorkerInit(void **priv, int wrk_num)
{
kafka_wrk_t *wrk;
assert(wrk_num >= 1 && wrk_num <= nwrk);
wrk = workers[wrk_num - 1];
CHECK_OBJ_NOTNULL(wrk, KAFKA_WRK_MAGIC);
*priv = (void *) wrk;
return NULL;
}
/*
 * XXX: we really only want to set off trackrdrd error recovery on
 * message send failure or priv == NULL, not e.g. if the key is
 * missing or invalid. How to signal back a recoverable error?
 */
/*
 * MQ interface: send a message.  data need not be NUL-terminated (len
 * gives its length); key must be 1 to 8 hex digits (longer keys are
 * truncated to 8).  Returns NULL on success, an error string otherwise.
 *
 * Fixes over the original:
 * - the payload copy uses malloc/memcpy of exactly len bytes; the
 *   original used REPLACE() (strdup), which overreads a payload that
 *   is not NUL-terminated and may copy fewer than len bytes.
 * - rd_kafka_err2str() output and errmsg are never used as printf
 *   format strings.
 * - the payload copy is freed when rd_kafka_produce() fails (rdkafka
 *   only takes ownership on success).
 * - isxdigit() is given an unsigned char, avoiding UB for negative chars.
 */
const char *
MQ_Send(void *priv, const char *data, unsigned len, const char *key,
        unsigned keylen)
{
	kafka_wrk_t *wrk;
	void *payload = NULL;
	char *ret = NULL;

	/* XXX: error? */
	if (len == 0)
		return NULL;
	if (priv == NULL) {
		MQ_LOG_Log(LOG_ERR, "MQ_Send() called with NULL worker object");
		return "Worker object is NULL";
	}
	CAST_OBJ(wrk, priv, KAFKA_WRK_MAGIC);

	/*
	 * XXX
	 * These errors are recoverable
	 * Increment stats counters on error, so that they can be monitored
	 * Toggle log level DEBUG with signals
	 */
	if (key == NULL || keylen == 0) {
		snprintf(errmsg, LINE_MAX, "%s message shard key is missing",
		         rd_kafka_name(wrk->kafka));
		MQ_LOG_Log(LOG_ERR, "%s", errmsg);
		MQ_LOG_Log(LOG_DEBUG, "%s data=[%.*s] key=", rd_kafka_name(wrk->kafka),
		           len, data);
		return errmsg;
	}
	if (data == NULL) {
		snprintf(errmsg, LINE_MAX, "%s message payload is NULL",
		         rd_kafka_name(wrk->kafka));
		MQ_LOG_Log(LOG_DEBUG, "%s data= key=[%.*s]", rd_kafka_name(wrk->kafka),
		           keylen, key);
		MQ_LOG_Log(LOG_ERR, "%s", errmsg);
		return errmsg;
	}
	if (keylen > 8)
		keylen = 8;
	for (unsigned i = 0; i < keylen; i++)
		if (!isxdigit((unsigned char) key[i])) {
			snprintf(errmsg, LINE_MAX, "%s message shard key is not hex",
			         rd_kafka_name(wrk->kafka));
			MQ_LOG_Log(LOG_ERR, "%s", errmsg);
			MQ_LOG_Log(LOG_DEBUG, "%s data=[%.*s] key=[%.*s]",
			           rd_kafka_name(wrk->kafka), len, data, keylen, key);
			return errmsg;
		}

	/* rdkafka frees the copy (RD_KAFKA_MSG_F_FREE) once delivered. */
	payload = malloc(len);
	if (payload == NULL) {
		snprintf(errmsg, LINE_MAX, "%s cannot allocate payload: %s",
		         rd_kafka_name(wrk->kafka), strerror(errno));
		MQ_LOG_Log(LOG_ERR, "%s", errmsg);
		return errmsg;
	}
	memcpy(payload, data, len);
	if (rd_kafka_produce(wrk->topic, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_FREE,
	                     payload, len, key, keylen, NULL) == -1) {
		snprintf(errmsg, LINE_MAX, "%s",
		         rd_kafka_err2str(rd_kafka_errno2err(errno)));
		MQ_LOG_Log(LOG_ERR, "%s message send failure (%d): %s",
		           rd_kafka_name(wrk->kafka), errno, errmsg);
		free(payload);	/* rdkafka did not take ownership on failure */
		ret = errmsg;
	}
	rd_kafka_poll(wrk->kafka, 0);
	return ret;
}
const char *
MQ_Reconnect(void **priv)
{
kafka_wrk_t *wrk;
int wrk_num;
const char *err;
CAST_OBJ_NOTNULL(wrk, *priv, KAFKA_WRK_MAGIC);
wrk_num = wrk->n;
assert(wrk_num >= 0 && wrk_num < nwrk);
wrk_fini(wrk);
err = wrk_init(wrk_num);
if (err != NULL)
return err;
*priv = workers[wrk_num];
return NULL;
}
/* MQ interface: copy the plugin/library version string into version.
 * NB: the caller must supply a sufficiently large buffer -- TODO confirm
 * the size contract in include/mq.h. */
const char *
MQ_Version(void *priv, char *version)
{
	(void) priv;
	strcpy(version, _version);
	return NULL;
}
/* MQ interface: copy the worker's rdkafka client ID into clientID
 * (caller supplies the buffer). */
const char *
MQ_ClientID(void *priv, char *clientID)
{
	kafka_wrk_t *w;

	CAST_OBJ_NOTNULL(w, priv, KAFKA_WRK_MAGIC);
	strcpy(clientID, rd_kafka_name(w->kafka));
	return NULL;
}
/* MQ interface: release a worker's Kafka resources and clear *priv. */
const char *
MQ_WorkerShutdown(void **priv)
{
	kafka_wrk_t *w;

	CAST_OBJ_NOTNULL(w, *priv, KAFKA_WRK_MAGIC);
	*priv = NULL;
	wrk_fini(w);
	return NULL;
}
/*
 * Release all global plugin state: remaining worker objects, the
 * librdkafka global and topic configurations, the zookeeper connection
 * and its log file, and finally the plugin log.
 *
 * Returns NULL on success, or an error message if closing the zookeeper
 * handle fails.
 */
const char *
MQ_GlobalShutdown(void)
{
    int zerr;

    for (int i = 0; i < nwrk; i++)
        if (workers[i] != NULL)
            wrk_fini(workers[i]);
    free(workers);
    /* Guard against use-after-free if shutdown is (erroneously) re-entered. */
    workers = NULL;

    rd_kafka_conf_destroy(conf);
    rd_kafka_topic_conf_destroy(topic_conf);

    errno = 0;
    if ((zerr = zookeeper_close(zh)) != ZOK) {
        const char *err = zerror(zerr);
        if (zerr == ZSYSTEMERROR)
            /* System errors carry additional detail in errno. */
            snprintf(errmsg, LINE_MAX, "Error closing zookeeper: %s (%s)", err,
                     strerror(errno));
        else
            snprintf(errmsg, LINE_MAX, "Error closing zookeeper: %s", err);
        /*
         * Never pass errmsg as the format string: it may contain '%'
         * from strerror()/zerror() text (printf-style injection hazard).
         */
        MQ_LOG_Log(LOG_ERR, "%s", errmsg);
        return errmsg;
    }
    fclose(zoologf);
    MQ_LOG_Log(LOG_INFO, "shutting down");
    MQ_LOG_Close();
    return NULL;
}
/*-
* Copyright (c) 2012-2014 UPLEX Nils Goroll Systemoptimierung
* Copyright (c) 2012-2014 Otto Gmbh & Co KG
* All rights reserved
* Use only with permission
*
* Authors: Geoffrey Simmons <geoffrey.simmons@uplex.de>
* Nils Goroll <nils.goroll@uplex.de>
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL AUTHOR OR CONTRIBUTORS BE LIABLE
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
* OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
* OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
*
*/
#include <assert.h>

/* Varnish-style assertion shorthands: assert zero / assert non-zero. */
#define AZ(foo) do { assert((foo) == 0); } while (0)
#define AN(foo) do { assert((foo) != 0); } while (0)

/* log.c -- logging facility for the Kafka MQ plugin */
int MQ_LOG_Open(const char *path);
void MQ_LOG_Log(int level, const char *msg, ...);
void MQ_LOG_SetLevel(int level);
void MQ_LOG_Close(void);
# Automake rules for the Kafka MQ plugin unit tests.
INCLUDES = -I$(top_srcdir)/include
# test_kafka is built and run by "make check".
TESTS = test_kafka
check_PROGRAMS = test_kafka
test_kafka_SOURCES = \
../../minunit.h \
../../../../include/mq.h \
test_kafka.c
# Link the plugin objects under test together with their external
# dependencies (librdkafka, zlib, zookeeper, yajl).
test_kafka_LDADD = \
../../../config_common.$(OBJEXT) \
../mq.$(OBJEXT) \
../log.$(OBJEXT) \
${PTHREAD_LIBS} \
-lrdkafka -lz -lpthread -lrt -lzookeeper_mt -lyajl
# Log files written by a test run are removed by "make clean".
CLEANFILES = kafka.log zoo.log *~
# test config for Kafka MQ plugin
# Log file written by the plugin itself
log = kafka.log
# Zookeeper connection string (host:port) and its separate log file;
# timeout is presumably the session timeout in ms -- confirm against mq.c
zookeeper.connect = localhost:2181
zookeeper.timeout = 10000
zookeeper.log = zoo.log
# Kafka topic that test messages are sent to
topic = libtrackrdr_kafka_test
# Enable debug output from the plugin and from librdkafka ("all" categories)
mq.debug = true
debug = all
/*-
* Copyright (c) 2012-2014 UPLEX Nils Goroll Systemoptimierung
* Copyright (c) 2012-2014 Otto Gmbh & Co KG
* All rights reserved
* Use only with permission
*
* Author: Geoffrey Simmons <geoffrey.simmons@uplex.de>
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL AUTHOR OR CONTRIBUTORS BE LIABLE
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
* OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
* OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
*
*/
#include <string.h>

#include "mq.h"
#include "../../../test/minunit.h"

/* Automake exit code for "skipped" in make check */
#define EXIT_SKIPPED 77

/* Plugin configuration file read by MQ_GlobalInit() */
#define KAFKA_CONFIG "kafka.conf"
/* Number of worker objects requested at global init */
#define NWORKERS 1

/* Test counter required by the minunit framework */
int tests_run = 0;
/* Worker handle shared by the ordered test cases below */
void *worker;

/* N.B.: Always run the tests in this order */
/*
 * Test global initialization of the Kafka plugin from KAFKA_CONFIG.
 * Return type changed to const char * for consistency with every other
 * test case in this file (backward-compatible for the mu_run_test caller).
 */
static const char
*test_global_init(void)
{
    const char *err;

    printf("... testing Kafka global initialization\n");
    err = MQ_GlobalInit(NWORKERS, KAFKA_CONFIG);
    VMASSERT(err == NULL, "MQ_GlobalInit: %s", err);
    return NULL;
}
/*
 * Test connection setup to Kafka/Zookeeper; skips the whole suite (via
 * automake exit code 77) when no broker appears to be running.
 * Return type changed to const char * for consistency with the other
 * test cases in this file.
 */
static const char
*test_init_connection(void)
{
    const char *err;

    printf("... testing Kafka connection initialization\n");
    err = MQ_InitConnections();
    /* "connection loss" in the error means the services are not up. */
    if (err != NULL && strstr(err, "connection loss") != NULL) {
        printf("No connection, Kafka/Zookeeper assumed not running\n");
        exit(EXIT_SKIPPED);
    }
    VMASSERT(err == NULL, "MQ_InitConnections: %s", err);
    return NULL;
}
/* Verify that a worker object can be initialized and is non-NULL. */
static const char
*test_worker_init(void)
{
    const char *errmsg;

    printf("... testing Kafka worker init\n");
    errmsg = MQ_WorkerInit(&worker, NWORKERS);
    VMASSERT(errmsg == NULL, "MQ_WorkerInit: %s", errmsg);
    MASSERT0(worker != NULL, "Worker is NULL after MQ_WorkerInit");
    return NULL;
}
/* Verify that MQ_Version() succeeds and fills in a non-empty string. */
static const char
*test_version(void)
{
    char buf[BUFSIZ];
    const char *errmsg;

    printf("... testing Kafka version info\n");
    MASSERT0(worker != NULL, "MQ_Version: worker is NULL before call");
    errmsg = MQ_Version(worker, buf);
    VMASSERT(errmsg == NULL, "MQ_Version: %s", errmsg);
    MASSERT0(buf[0] != '\0', "MQ_Version: version is empty");
    return NULL;
}
/* Verify that MQ_ClientID() succeeds and fills in a non-empty string. */
static const char
*test_clientID(void)
{
    char buf[BUFSIZ];
    const char *errmsg;

    printf("... testing Kafka client ID info\n");
    MASSERT0(worker != NULL, "MQ_ClientID: worker is NULL before call");
    errmsg = MQ_ClientID(worker, buf);
    VMASSERT(errmsg == NULL, "MQ_ClientID: %s", errmsg);
    MASSERT0(buf[0] != '\0', "MQ_ClientID: client ID is empty");
    return NULL;
}
/*
 * Test MQ_Send() with valid and invalid payloads and shard keys.
 *
 * Bug fix: the error-message checks called strstr() with the arguments
 * reversed -- strstr(haystack, needle) must search the returned error
 * for the expected substring -- and compared the result == 0, so the
 * assertions passed vacuously whenever the substring was NOT found.
 * They now require strstr(err, "...") != NULL.
 */
static const char
*test_send(void)
{
    const char *err;

    printf("... testing Kafka message send\n");
    MASSERT0(worker != NULL, "MQ_Send: worker is NULL before call");
    err = MQ_Send(worker, "foo bar baz quux", 16, "12345678", 8);
    VMASSERT(err == NULL, "MQ_Send: %s", err);
    /* Keys shorter and longer than 8 hex digits */
    err = MQ_Send(worker, "the quick brown fox", 19, "abcdef", 6);
    VMASSERT(err == NULL, "MQ_Send: %s", err);
    err = MQ_Send(worker, "jumps over the lazy dog", 23,
                  "fedcba9876543210", 16);
    VMASSERT(err == NULL, "MQ_Send: %s", err);
    /* No error if message is empty (silently discarded) */
    err = MQ_Send(worker, "", 0, "12345678", 8);
    VMASSERT(err == NULL, "MQ_Send: %s", err);
    /* Fail if the worker object is null */
    err = MQ_Send(NULL, "foo bar baz quux", 16, "12345678", 8);
    MAN(err);
    /* Fail if the key is empty */
    err = MQ_Send(worker, "foo bar baz quux", 16, "", 0);
    MAN(err);
    VMASSERT(strstr(err, "shard key is missing") != NULL,
             "MQ_Send unexpected error message: %s", err);
    /* Fail if the key is NULL */
    err = MQ_Send(worker, "foo bar baz quux", 16, NULL, 0);
    MAN(err);
    VMASSERT(strstr(err, "shard key is missing") != NULL,
             "MQ_Send unexpected error message: %s", err);
    /* Fail if the key contains non-hex characters */
    err = MQ_Send(worker, "foo bar baz quux", 16, "key", 3);
    MAN(err);
    VMASSERT(strstr(err, "shard key is not hex") != NULL,
             "MQ_Send unexpected error message: %s", err);
    /* Fail if the message is NULL */
    err = MQ_Send(worker, NULL, 16, "12345678", 8);
    MAN(err);
    VMASSERT(strstr(err, "message payload is NULL") != NULL,
             "MQ_Send unexpected error message: %s", err);
    return NULL;
}
/* Verify that the worker survives MQ_Reconnect() and can still send. */
static const char
*test_reconnect(void)
{
    const char *errmsg;

    printf("... testing Kafka reconnect\n");
    MASSERT0(worker != NULL, "MQ_Reconnect: worker is NULL before call");
    errmsg = MQ_Reconnect(&worker);
    VMASSERT(errmsg == NULL, "MQ_Reconnect: %s", errmsg);
    MASSERT0(worker != NULL, "MQ_Reconnect: worker is NULL after call");
    errmsg = MQ_Send(worker, "send after reconnect", 20, "12345678", 8);
    VMASSERT(errmsg == NULL, "MQ_Send() fails after reconnect: %s", errmsg);
    return NULL;
}
/*
 * Test worker shutdown: the handle must be set to NULL, and a send via
 * the shut-down worker must fail.
 *
 * Bug fix: the first assertion message named "MQ_Send" (copy/paste from
 * test_send); it now names MQ_WorkerShutdown.
 */
static const char
*test_worker_shutdown(void)
{
    const char *err;

    printf("... testing Kafka worker shutdown\n");
    MASSERT0(worker != NULL, "MQ_WorkerShutdown: worker is NULL before call");
    err = MQ_WorkerShutdown(&worker);
    VMASSERT(err == NULL, "MQ_WorkerShutdown: %s", err);
    MASSERT0(worker == NULL, "Worker not NULL after shutdown");
    err = MQ_Send(worker, "foo bar baz quux", 16, "12345678", 8);
    MASSERT0(err != NULL, "No failure on MQ_Send after worker shutdown");
    return NULL;
}
/* Verify that global shutdown completes without error. */
static const char
*test_global_shutdown(void)
{
    const char *errmsg;

    printf("... testing Kafka global shutdown\n");
    errmsg = MQ_GlobalShutdown();
    VMASSERT(errmsg == NULL, "MQ_GlobalShutdown: %s", errmsg);
    return NULL;
}
/*
 * Run the full test sequence.
 * The order is significant: global init, connection and worker setup
 * must precede the send/reconnect tests, and the worker/global shutdown
 * tests must come last.
 */
static const char
*all_tests(void)
{
    mu_run_test(test_global_init);
    mu_run_test(test_init_connection);
    mu_run_test(test_worker_init);
    mu_run_test(test_version);
    mu_run_test(test_clientID);
    mu_run_test(test_send);
    mu_run_test(test_reconnect);
    mu_run_test(test_worker_shutdown);
    mu_run_test(test_global_shutdown);
    return NULL;
}
TEST_RUNNER
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment