Commit ee1a700f authored by Geoff Simmons's avatar Geoff Simmons

trackrdrd: added MQ plugin for Kafka

parent 4bb93144
ACLOCAL_AMFLAGS = -I m4
SUBDIRS = src src/test src/mq/activemq src/mq/activemq/test
SUBDIRS = src src/test src/mq/activemq src/mq/activemq/test \
src/mq/kafka src/mq/kafka/test
if HAVE_RST2MAN
dist_man_MANS = trackrdrd.3
......
......@@ -145,5 +145,7 @@ AC_CONFIG_FILES([
src/test/Makefile
src/mq/activemq/Makefile
src/mq/activemq/test/Makefile
src/mq/kafka/Makefile
src/mq/kafka/test/Makefile
])
AC_OUTPUT
/*
* Written by Poul-Henning Kamp <phk@phk.freebsd.dk>
*
* This file is in the public domain.
*
*/
#define ALLOC_OBJ(to, type_magic) \
do { \
(to) = calloc(sizeof *(to), 1); \
if ((to) != NULL) \
(to)->magic = (type_magic); \
} while (0)
#define FREE_OBJ(to) \
do { \
(to)->magic = (0); \
free(to); \
} while (0)
#define VALID_OBJ(ptr, type_magic) \
((ptr) != NULL && (ptr)->magic == (type_magic))
#define CHECK_OBJ(ptr, type_magic) \
do { \
assert((ptr)->magic == type_magic); \
} while (0)
#define CHECK_OBJ_NOTNULL(ptr, type_magic) \
do { \
assert((ptr) != NULL); \
assert((ptr)->magic == type_magic); \
} while (0)
#define CHECK_OBJ_ORNULL(ptr, type_magic) \
do { \
if ((ptr) != NULL) \
assert((ptr)->magic == type_magic); \
} while (0)
#define CAST_OBJ(to, from, type_magic) \
do { \
(to) = (from); \
if ((to) != NULL) \
CHECK_OBJ((to), (type_magic)); \
} while (0)
#define CAST_OBJ_NOTNULL(to, from, type_magic) \
do { \
(to) = (from); \
assert((to) != NULL); \
CHECK_OBJ((to), (type_magic)); \
} while (0)
#define REPLACE(ptr, val) \
do { \
if ((ptr) != NULL) \
free(ptr); \
if ((val) != NULL) { \
ptr = strdup(val); \
AN((ptr)); \
} else { \
ptr = NULL; \
} \
} while (0)
INCLUDES = -I$(VARNISHSRC)/include -I$(VARNISHSRC) -I$(top_srcdir)/include
CURRENT = 3
REVISION = 0
AGE = 0
pkglib_LTLIBRARIES = libtrackrdr-kafka.la
libtrackrdr_kafka_la_SOURCES = \
$(top_srcdir)/include/mq.h \
$(top_srcdir)/include/config_common.h \
mq_kafka.h \
mq.c \
log.c \
$(top_srcdir)/src/config_common.c
libtrackrdr_kafka_la_LIBADD = \
${PTHREAD_LIBS} \
-lrdkafka -lz -lpthread -lrt -lzookeeper_mt -lyajl
libtrackrdr_kafka_la_LDFLAGS = -version-info ${CURRENT}:${REVISION}:${AGE}
libtrackrdr_kafka_la_CFLAGS = \
-DCURRENT=${CURRENT} \
-DREVISION=${REVISION} \
-DAGE=${AGE}
if HAVE_RST2MAN
dist_man_MANS = libtrackrdr-kafka.3
MAINTAINERCLEANFILES = $(dist_man_MANS)
endif
libtrackrdr-kafka.3: README.rst
if HAVE_RST2MAN
${RST2MAN} README.rst $@
endif
.. _ref-trackrdrd:
==================
libtrackrdr-kafka
==================
--------------------------------------------------------------------
Kafka implementation of the MQ interface for the Tracking Log Reader
--------------------------------------------------------------------
:Author: Geoffrey Simmons
:Date: 2014-05-28
:Version: 3.0
:Manual section: 3
DESCRIPTION
===========
``libtrackrdr-kafka.so`` provides an implementation of the tracking
reader's MQ interface to send messages to Apache Kafka message
brokers. See ``include/mq.h`` in the ``trackrdrd`` source distribution
for documentation of the interface.
To use this implementation with ``trackrdrd``, specify the shared
object as the value of ``mq.module`` in the tracking reader's
configuration (see trackrdrd(3)). The configuration value may be the
absolute path of the shared object; or its name, provided that it can
be found by the dynamic linker (see ld.so(8)).
``libtrackrdr-kafka`` also requires a configuration file, whose path
is specified as ``mq.config_fname`` in the configuration of
``trackrdrd``.
``libtrackrdrd-kafka`` in turn depends on these libraries:
* ``rdkafka``, a client library for Kafka
* ``zookeeper_mt``, a client library for Apache ZooKeeper with
multi-threading
* ``yajl``, a library for JSON parsing
The dynamic linker must also be able to find ``librdkafka.so``,
``libzookeeper_mt.so`` and ``libyajl.so`` at runtime.
BUILD/INSTALL
=============
The sources for ``libtrackrdr-kafka`` are provided in the source
repository for ``trackrdrd``, in the subdirectory ``src/mq/kafka/``
of::
git@repo.org:trackrdrd
The sources for the library dependencies can be obtained from:
* https://github.com/edenhill/librdkafka
* http://zookeeper.apache.org/
* http://lloyd.github.io/yajl/
Building and installing the library dependencies
------------------------------------------------
The Kafka interface has been tested with these library versions:
* rdkafka 0.8.3
* zookeeper_mt 3.4.6
* yajl 2.0.4
If the libraries are already installed on the platform where
``trackrdrd`` will run, then no further action is necessary. To build
the libraries from source, it suffices to follow the instructions in
the source distributions -- no special configuration for the plugin is
necessary.
Building and installing libtrackrdr-kafka
-----------------------------------------
``libtrackrdr-kafka`` is built as part of the global build for
``trackrdrd``; for details and requirements of the build, see
trackrdrd(3).
To specifically build the MQ implementation (without building all of
the rest of ``trackrdrd``), it suffices to invoke ``make`` commands in
the subdirectory ``src/mq/kafka`` (after having executed the
``configure`` script for ``trackrdrd``)::
# in trackrdrd/trackrdrd
$ cd src/mq/kafka
$ make
For self-tests after the build::
$ cd src/mq/kafka/test
$ make check
The global ``make`` and ``make check`` commands for ``trackrdrd`` also
execute both of these for the Kafka plugin.
The self-tests depend on the configuration file ``kafka.conf`` in the
``test/`` subdirectory, which specifies ``localhost:2181`` as the
address of a ZooKeeper server. If a ZooKeeper is listening, then tests
are run against that instance of ZooKeeper and any running Kafka
brokers that the ZooKeeper server is managing. If connections to a
ZooKeeper server or Kafka brokers fail, then the ``make check`` test
exits with the status ``SKIPPED``.
To install the shared object ``libtrackrdr-kafka.so``, run ``make
install`` as root, for example with ``sudo``::
$ sudo make install
In standard configuration, the ``.so`` file will be installed by
``libtool(1)``, and its location may be affected by the ``--libdir``
option to ``configure``.
CONFIGURATION
=============
As mentioned above, a configuration file for ``libtrackrdr-kafka``
MUST be specified in the configuration parameter ``mq.config_fname``
for ``trackrdrd``, and initialization of the MQ implementation fails
if this file cannot be found or read by the process owner of
``trackrdrd`` (or if its syntax is false, or if required parameters
are missing).
The syntax of the configuration file is the same as that of
``trackrdrd``, and it may contain configuration parameters for
``rdkafka``, except as noted below -- thus the configuration applies
to both the messaging plugin and the ``rdkafka`` client library.
If the config parameter ``zookeeper.connect`` is set, then the plugin
obtains information about Kafka brokers from the specified ZooKeeper
server(s), and the value of the ``rdkafka`` parameter
``metadata.broker.list`` is ignored. If ``zookeeper.connect`` is not
set, then an initial list brokers MUST be specified by
``metadata.broker.list`` -- if neither of ``zookeeper.connect`` and
``metadata.broker.list`` are set, then the configuration fails and
``trackrdrd`` will exit.
In addition to configuration parameters for ``rdkafka``, these
parameters can be specified:
===================== ==========================================================
Parameter Description
===================== ==========================================================
``zookeeper.connect`` Comma-separated list of ``host:port`` pairs specifying
the addresses of ZooKeeper servers. If not set, then
``metadata.broker.list`` MUST be set, as described above.
--------------------- ----------------------------------------------------------
``zookeeper.timeout`` Timeout for connections to ZooKeeper servers (optional,
default 0,
--------------------- ----------------------------------------------------------
``zookeeper.log`` Path of a log file for the ZooKeeper client (optional)
--------------------- ----------------------------------------------------------
``log`` Path of a log file for the messaging plugin and Kafka
client (optional)
--------------------- ----------------------------------------------------------
``topic`` Name of the Kafka topic to which messages are sent
--------------------- ----------------------------------------------------------
``mq.debug`` If set to true, then log at DEBUG level
===================== ==========================================================
Except as noted below, the configuration can specify any parameters for
the ``rdkafka`` client, as documented at::
https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
The following ``rdkafka`` parameters in the config file are ignored
(they are set internally by the messaging plugin, or are only relevant
to consumers):
* ``client.id``
* ``error_cb``
* ``stats_cb``
* ``log_cb``
* ``socket_cb``
* ``open_cb``
* ``opaque``
* ``queued.*``
* ``fetch.*``
* ``group.id``
* ``dr_cb``
* ``dr_msg_cb``
* ``partitioner``
* ``opaque``
* ``auto.*``
* ``offset.*``
SHARDING
========
The plugin requires that calls to ``MQ_Send()`` supply a hexadecimal
string of up to 8 characters as the sharding key; ``MQ_Send()`` fails
if a key is not specified, or if it contains non-hex characters in the
first 8 bytes.
The plugin uses up to the first 8 hex digits of the key; if the string
is longer, then the remainder from the 9th byte is ignored.
LOGGING AND STATISTICS
======================
XXX: TuDu
MESSAGE SEND FAILURE AND RECOVERY
=================================
XXX: TuDu
* stats callback from rdkafka
* stats counters for missing shard keys or data
SIGNALS
=======
XXX: TuDu -- toggle DEBUG log level
SEE ALSO
========
* ``trackrdrd(3)``
* ``ld.so(8)``
* http://kafka.apache.org/
* http://zookeeper.apache.org/
* https://github.com/edenhill/librdkafka
* http://zookeeper.apache.org/doc/r3.4.6/zookeeperProgrammers.html#C+Binding
COPYRIGHT AND LICENCE
=====================
Both the software and this document are governed by a BSD 2-clause
licence.
| Copyright (c) 2014 UPLEX Nils Goroll Systemoptimierung
| Copyright (c) 2014 Otto Gmbh & Co KG
| All rights reserved
| Use only with permission
| Author: Geoffrey Simmons <geoffrey.simmons@uplex.de>
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions
are met:
1. Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
2. Redistributions in binary form must reproduce the above copyright
notice, this list of conditions and the following disclaimer in the
documentation and/or other materials provided with the distribution.
THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND
ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
ARE DISCLAIMED. IN NO EVENT SHALL AUTHOR OR CONTRIBUTORS BE LIABLE
FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
SUCH DAMAGE.
/*-
* Copyright (c) 2012-2014 UPLEX Nils Goroll Systemoptimierung
* Copyright (c) 2012-2014 Otto Gmbh & Co KG
* All rights reserved
* Use only with permission
*
* Author: Geoffrey Simmons <geoffrey.simmons@uplex.de>
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL AUTHOR OR CONTRIBUTORS BE LIABLE
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
* OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
* OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
*
*/
#include <syslog.h>
#include <stdio.h>
#include <stdarg.h>
#include <errno.h>
#include <string.h>
#include <time.h>
#include "mq_kafka.h"
static int lvl = LOG_INFO;
static const char *level2name[LOG_DEBUG+1];
static FILE *out = NULL;
static void
init_lvlnames(void)
{
level2name[LOG_EMERG] = "EMERG";
level2name[LOG_ALERT] = "ALERT";
level2name[LOG_CRIT] = "CRIT";
level2name[LOG_ERR] = "ERR";
level2name[LOG_WARNING] = "WARNING";
level2name[LOG_NOTICE] = "NOTICE";
level2name[LOG_INFO] = "INFO";
level2name[LOG_DEBUG] = "DEBUG";
}
void
MQ_LOG_Log(int level, const char *msg, ...)
{
time_t t;
char timestr[26];
va_list ap;
if (level > lvl || out == NULL)
return;
t = time(NULL);
ctime_r(&t, timestr);
timestr[24] = '\0';
flockfile(out);
fprintf(out, "%s [%s]: ", timestr, level2name[level]);
va_start(ap, msg);
(void) vfprintf(out, msg, ap);
va_end(ap);
fprintf(out, "\n");
fflush(out);
funlockfile(out);
}
void
MQ_LOG_SetLevel(int level)
{
lvl = level;
}
void
MQ_LOG_Close(void)
{
if (out != NULL)
fclose(out);
}
int
MQ_LOG_Open(const char *path)
{
AN(path);
if (path[0] == '\0')
return EINVAL;
if (strcmp(path, "-") == 0)
out = stdout;
else {
out = fopen(path, "a");
if (out == NULL)
return errno;
}
init_lvlnames();
return(0);
}
This diff is collapsed.
/*-
* Copyright (c) 2012-2014 UPLEX Nils Goroll Systemoptimierung
* Copyright (c) 2012-2014 Otto Gmbh & Co KG
* All rights reserved
* Use only with permission
*
* Authors: Geoffrey Simmons <geoffrey.simmons@uplex.de>
* Nils Goroll <nils.goroll@uplex.de>
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL AUTHOR OR CONTRIBUTORS BE LIABLE
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
* OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
* OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
*
*/
#include <assert.h>
#define AZ(foo) do { assert((foo) == 0); } while (0)
#define AN(foo) do { assert((foo) != 0); } while (0)
/* log.c */
int MQ_LOG_Open(const char *path);
void MQ_LOG_Log(int level, const char *msg, ...);
void MQ_LOG_SetLevel(int level);
void MQ_LOG_Close(void);
INCLUDES = -I$(top_srcdir)/include
TESTS = test_kafka
check_PROGRAMS = test_kafka
test_kafka_SOURCES = \
../../minunit.h \
../../../../include/mq.h \
test_kafka.c
test_kafka_LDADD = \
../../../config_common.$(OBJEXT) \
../mq.$(OBJEXT) \
../log.$(OBJEXT) \
${PTHREAD_LIBS} \
-lrdkafka -lz -lpthread -lrt -lzookeeper_mt -lyajl
CLEANFILES = kafka.log zoo.log *~
# test config for Kafka MQ plugin
log = kafka.log
zookeeper.connect = localhost:2181
zookeeper.timeout = 10000
zookeeper.log = zoo.log
topic = libtrackrdr_kafka_test
mq.debug = true
debug = all
/*-
* Copyright (c) 2012-2014 UPLEX Nils Goroll Systemoptimierung
* Copyright (c) 2012-2014 Otto Gmbh & Co KG
* All rights reserved
* Use only with permission
*
* Author: Geoffrey Simmons <geoffrey.simmons@uplex.de>
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL AUTHOR OR CONTRIBUTORS BE LIABLE
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
* OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
* OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
*
*/
#include <string.h>
#include "mq.h"
#include "../../../test/minunit.h"
/* Automake exit code for "skipped" in make check */
#define EXIT_SKIPPED 77
#define KAFKA_CONFIG "kafka.conf"
#define NWORKERS 1
int tests_run = 0;
void *worker;
/* N.B.: Always run the tests in this order */
static char
*test_global_init(void)
{
const char *err;
printf("... testing Kafka global initialization\n");
err = MQ_GlobalInit(NWORKERS, KAFKA_CONFIG);
VMASSERT(err == NULL, "MQ_GlobalInit: %s", err);
return NULL;
}
static char
*test_init_connection(void)
{
const char *err;
printf("... testing Kafka connection initialization\n");
err = MQ_InitConnections();
if (err != NULL && strstr(err, "connection loss") != NULL) {
printf("No connection, Kafka/Zookeeper assumed not running\n");
exit(EXIT_SKIPPED);
}
VMASSERT(err == NULL, "MQ_InitConnections: %s", err);
return NULL;
}
static const char
*test_worker_init(void)
{
const char *err;
printf("... testing Kafka worker init\n");
err = MQ_WorkerInit(&worker, NWORKERS);
VMASSERT(err == NULL, "MQ_WorkerInit: %s", err);
MASSERT0(worker != NULL, "Worker is NULL after MQ_WorkerInit");
return NULL;
}
static const char
*test_version(void)
{
const char *err;
char version[BUFSIZ];
printf("... testing Kafka version info\n");
MASSERT0(worker != NULL, "MQ_Version: worker is NULL before call");
err = MQ_Version(worker, version);
VMASSERT(err == NULL, "MQ_Version: %s", err);
MASSERT0(version[0] != '\0', "MQ_Version: version is empty");
return NULL;
}
static const char
*test_clientID(void)
{
const char *err;
char clientID[BUFSIZ];
printf("... testing Kafka client ID info\n");
MASSERT0(worker != NULL, "MQ_ClientID: worker is NULL before call");
err = MQ_ClientID(worker, clientID);
VMASSERT(err == NULL, "MQ_ClientID: %s", err);
MASSERT0(clientID[0] != '\0', "MQ_ClientID: client ID is empty");
return NULL;
}
static const char
*test_send(void)
{
const char *err;
printf("... testing Kafka message send\n");
MASSERT0(worker != NULL, "MQ_Send: worker is NULL before call");
err = MQ_Send(worker, "foo bar baz quux", 16, "12345678", 8);
VMASSERT(err == NULL, "MQ_Send: %s", err);
/* Keys shorter and longer than 8 hex digits */
err = MQ_Send(worker, "the quick brown fox", 19, "abcdef", 6);
VMASSERT(err == NULL, "MQ_Send: %s", err);
err = MQ_Send(worker, "jumps over the lazy dog", 23,
"fedcba9876543210", 16);
VMASSERT(err == NULL, "MQ_Send: %s", err);
/* No error if message is empty (silently discarded) */
err = MQ_Send(worker, "", 0, "12345678", 8);
VMASSERT(err == NULL, "MQ_Send: %s", err);
/* Fail if the worker object is null */
err = MQ_Send(NULL, "foo bar baz quux", 16, "12345678", 8);
MAN(err);
/* Fail if the key is empty */
err = MQ_Send(worker, "foo bar baz quux", 16, "", 0);
MAN(err);
VMASSERT(strstr("shard key is missing", err) == 0,
"MQ_Send unexpected error message: %s", err);
/* Fail if the key is NULL */
err = MQ_Send(worker, "foo bar baz quux", 16, NULL, 0);
MAN(err);
VMASSERT(strstr("shard key is missing", err) == 0,
"MQ_Send unexpected error message: %s", err);
/* Fail if the key contains non-hex characters */
err = MQ_Send(worker, "foo bar baz quux", 16, "key", 3);
MAN(err);
VMASSERT(strstr("shard key is not hex", err) == 0,
"MQ_Send unexpected error message: %s", err);
/* Fail if the message is NULL */
err = MQ_Send(worker, NULL, 16, "12345678", 8);
MAN(err);
VMASSERT(strstr("message payload is NULL", err) == 0,
"MQ_Send unexpected error message: %s", err);
return NULL;
}
static const char
*test_reconnect(void)
{
const char *err;
printf("... testing Kafka reconnect\n");
MASSERT0(worker != NULL, "MQ_Reconnect: worker is NULL before call");
err = MQ_Reconnect(&worker);
VMASSERT(err == NULL, "MQ_Reconnect: %s", err);
MASSERT0(worker != NULL, "MQ_Reconnect: worker is NULL after call");
err = MQ_Send(worker, "send after reconnect", 20, "12345678", 8);
VMASSERT(err == NULL, "MQ_Send() fails after reconnect: %s", err);
return NULL;
}
static const char
*test_worker_shutdown(void)
{
const char *err;
printf("... testing Kafka worker shutdown\n");
MASSERT0(worker != NULL, "MQ_Send: worker is NULL before call");
err = MQ_WorkerShutdown(&worker);
VMASSERT(err == NULL, "MQ_WorkerShutdown: %s", err);
MASSERT0(worker == NULL, "Worker not NULL after shutdown");
err = MQ_Send(worker, "foo bar baz quux", 16, "12345678", 8);
MASSERT0(err != NULL, "No failure on MQ_Send after worker shutdown");
return NULL;
}
static const char
*test_global_shutdown(void)
{
const char *err;
printf("... testing Kafka global shutdown\n");
err = MQ_GlobalShutdown();
VMASSERT(err == NULL, "MQ_GlobalShutdown: %s", err);
return NULL;
}
static const char
*all_tests(void)
{
mu_run_test(test_global_init);
mu_run_test(test_init_connection);
mu_run_test(test_worker_init);
mu_run_test(test_version);
mu_run_test(test_clientID);
mu_run_test(test_send);
mu_run_test(test_reconnect);
mu_run_test(test_worker_shutdown);
mu_run_test(test_global_shutdown);
return NULL;
}
TEST_RUNNER
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment