Commit ca8b5997 authored by Geoff Simmons's avatar Geoff Simmons

Kafka MQ plugin: add the monitoring thread

parent b39b96f2
......@@ -12,6 +12,7 @@ libtrackrdr_kafka_la_SOURCES = \
mq_kafka.h \
mq.c \
log.c \
monitor.c \
$(top_srcdir)/src/config_common.c
libtrackrdr_kafka_la_LIBADD = \
......@@ -34,3 +35,5 @@ libtrackrdr-kafka.3: README.rst
if HAVE_RST2MAN
${RST2MAN} README.rst $@
endif
CLEANFILES = *~
......@@ -137,6 +137,8 @@ set, then an initial list brokers MUST be specified by
``metadata.broker.list`` are set, then the configuration fails and
``trackrdrd`` will exit.
The ``topic`` parameter MUST be set.
In addition to configuration parameters for ``rdkafka``, these
parameters can be specified:
......@@ -147,15 +149,17 @@ Parameter Description
the addresses of ZooKeeper servers. If not set, then
``metadata.broker.list`` MUST be set, as described above.
--------------------- ----------------------------------------------------------
``zookeeper.timeout`` Timeout for connections to ZooKeeper servers (optional,
default 0,
``zookeeper.timeout`` Timeout in milliseconds for connections to ZooKeeper
servers. If 0, then a connection attempt fails immediately
if the servers cannot be reached. (optional, default 0)
--------------------- ----------------------------------------------------------
``zookeeper.log`` Path of a log file for the ZooKeeper client (optional)
--------------------- ----------------------------------------------------------
``log`` Path of a log file for the messaging plugin and Kafka
``mq.log`` Path of a log file for the messaging plugin and Kafka
client (optional)
--------------------- ----------------------------------------------------------
``topic`` Name of the Kafka topic to which messages are sent
(required)
--------------------- ----------------------------------------------------------
``mq.debug`` If set to true, then log at DEBUG level
===================== ==========================================================
......@@ -194,21 +198,79 @@ string of up to 8 characters as the sharding key; ``MQ_Send()`` fails
if a key is not specified, or if it contains non-hex characters in the
first 8 bytes.
The plugin uses up to the first 8 hex digits of the key; if the string
is longer, then the remainder from the 9th byte is ignored.
Only the first 8 hex digits of the key are significant; if the string
is longer, then the remainder of the key from the 9th byte is ignored.
LOGGING AND STATISTICS
======================
XXX: TuDu
The parameter ``mq.log`` sets the path of a log file for
informational, error and debug messages from both the messaging plugin
and the rdkafka client library. If the parameter is not set, then no
log file is written.
If the rdkafka parameter ``statistics.interval.ms`` is set and
non-zero, then statistics from both the plugin and the client library
are emitted to the log at that interval for each worker object
(i.e. for each worker thread of the tracking reader).
Log lines beginning with ``rdkafka stats`` contain statistics from the
rdkafka library for a worker object. The format and content of these
lines are determined by the rdkafka library.
Log lines beginning with ``mq stats`` are generated by the MQ plugin,
and have the following form (possibly with additional formatting and
information from the logger)::
mq stats (ID = <CLIENTID>): nokey=0 badkey=0 nodata=0
``CLIENTID`` is the ID of the worker object (as returned from
``MQ_ClientID()``). The statistics are all cumulative counters.
===================== ==========================================================
Statistic Description
===================== ==========================================================
``nokey`` The number of ``MQ_Send()`` operations called for the
worker with no shard key.
--------------------- ----------------------------------------------------------
``badkey`` The number of send operations called with an illegal
shard key (not a hex string).
--------------------- ----------------------------------------------------------
``nodata`` The number of send operations called with no message
payload.
===================== ==========================================================
MESSAGE SEND FAILURE AND RECOVERY
=================================
XXX: TuDu
* stats callback from rdkafka
* stats counters for missing shard keys or data
The messaging plugin uses the rdkafka client library, whose send
operations are asynchronous -- messages to be sent are placed on an
internal queue, from which they are sent to Kafka brokers as
determined by the ``queue.*`` configuration parameters. Unless there
is some exceptional condition (for example, the internal queue is
full), rdkafka's "produce" operation succeeds immediately after the
message is placed on the queue. If a failure occurs when delivery of a
message to a broker is attempted, then the rdkafka library saves the
error status in its internal state, but this ordinarily becomes known
some time after the "produce" operation has been completed.
This means that in ordinary operation, the plugin's ``MQ_Send()`` call
will not fail immediately if in fact it turns out that the message
cannot be delivered to a broker.
The messaging plugin polls the internal state of an rdkafka producer
associated with a worker object during ``MQ_Send()`` once before
invoking the "produce" operation, once afterward, and also every time
rdkafka internal statistics are queried as described above. If a
pending unrecoverable error state is determined during the call to
``MQ_Send()``, then an unrecoverable error status is returned (also if
the "produce" operation fails immediately); the tracking reader can
then engage its process for error recovery. It should be understood
that an unrecoverable error status from ``MQ_Send()`` does not
necessarily indicate that delivery of the current message has failed
(unless it is due to failure of the "produce" operation), but rather
the delivery of a message submitted via ``MQ_Send()`` at an earlier
point in time.
SIGNALS
=======
......
/*-
* Copyright (c) 2014 UPLEX Nils Goroll Systemoptimierung
* Copyright (c) 2014 Otto Gmbh & Co KG
* All rights reserved
* Use only with permission
*
* Author: Geoffrey Simmons <geoffrey.simmons@uplex.de>
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL AUTHOR OR CONTRIBUTORS BE LIABLE
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
* OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
* OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
*
*/
#include <syslog.h>
#include <time.h>
#include <errno.h>
#include <string.h>
#include <pthread.h>
#include "mq_kafka.h"
#include "miniobj.h"
static pthread_t monitor;
static int run = 0;
/* Call rd_kafka_poll() for each worker to provoke callbacks */
static void
poll_workers(void)
{
for (int i = 0; i < nwrk; i++)
if (workers[i] != NULL) {
kafka_wrk_t *wrk = workers[i];
CHECK_OBJ(wrk, KAFKA_WRK_MAGIC);
rd_kafka_poll(wrk->kafka, 0);
}
}
static void
monitor_cleanup(void *arg)
{
(void) arg;
poll_workers();
MQ_LOG_Log(LOG_INFO, "libtrackrdr-kafka monitoring thread exiting");
}
static void
*monitor_thread(void *arg)
{
struct timespec t;
unsigned interval = *((unsigned *) arg);
/* Convert ms -> struct timespec */
t.tv_sec = (time_t) interval / 10e3;
t.tv_nsec = (interval % (unsigned) 10e3) * 10e6;
MQ_LOG_Log(LOG_INFO,
"libtrackrdr-kafka monitor thread running every %u.%03u secs",
t.tv_sec, t.tv_nsec / 10e6);
run = 1;
pthread_cleanup_push(monitor_cleanup, arg);
while (run) {
int err;
if (nanosleep(&t, NULL) != 0) {
if (errno == EINTR) {
if (run == 0)
break;
MQ_LOG_Log(LOG_INFO,
"libtrackrdr-kafka monitoring thread interrupted");
continue;
}
else {
MQ_LOG_Log(LOG_ERR, "libtrackrdr-kafka monitoring thread: %s\n",
strerror(errno));
err = errno;
pthread_exit(&err);
}
}
poll_workers();
}
pthread_cleanup_pop(0);
MQ_LOG_Log(LOG_INFO, "libtrackrdr-kafka monitoring thread exiting");
pthread_exit((void *) NULL);
}
int
MQ_MON_Init(unsigned interval)
{
if (interval == 0)
return 0;
return pthread_create(&monitor, NULL, monitor_thread, (void *) &interval);
}
void
MQ_MON_Fini(void)
{
if (run) {
run = 0;
AZ(pthread_cancel(monitor));
/* XXX: read and return an error status */
AZ(pthread_join(monitor, NULL));
}
}
......@@ -38,7 +38,6 @@
#include <syslog.h>
#include <ctype.h>
#include <librdkafka/rdkafka.h>
#include <zookeeper/zookeeper.h>
#include <zookeeper/zookeeper_version.h>
#include <yajl/yajl_tree.h>
......@@ -64,23 +63,6 @@
#define SO_VERSION "unknown version"
#endif
typedef struct kafka_wrk {
unsigned magic;
#define KAFKA_WRK_MAGIC 0xd14d4425
int n;
rd_kafka_t *kafka;
rd_kafka_topic_t *topic;
int err;
char reason[LINE_MAX]; /* errs from rdkafka callbacks */
char errmsg[LINE_MAX]; /* thread-safe return from MQ_*() */
unsigned nokey;
unsigned badkey;
unsigned nodata;
} kafka_wrk_t;
static kafka_wrk_t **workers;
static unsigned nwrk = 0;
static char logpath[PATH_MAX] = "";
static char zookeeper[LINE_MAX] = "";
......@@ -98,6 +80,7 @@ static char errmsg[LINE_MAX];
static char _version[LINE_MAX];
static int loglvl = LOG_INFO;
static unsigned stats_interval = 0;
static void
log_cb(const rd_kafka_t *rk, int level, const char *fac, const char *buf)
......@@ -194,7 +177,7 @@ partitioner_cb(const rd_kafka_topic_t *rkt, const void *keydata, size_t keylen,
static const char
*wrk_init(int wrk_num)
{
char clientid[sizeof("libtrackrdr-kafka-worker 2147483648")];
char clientid[sizeof("libtrackrdr-kafka-worker-2147483648")];
rd_kafka_conf_t *wrk_conf;
rd_kafka_topic_conf_t *wrk_topic_conf;
rd_kafka_t *rk;
......@@ -205,7 +188,7 @@ static const char
wrk_conf = rd_kafka_conf_dup(conf);
wrk_topic_conf = rd_kafka_topic_conf_dup(topic_conf);
sprintf(clientid, "libtrackrdr-kafka-worker %d", wrk_num);
sprintf(clientid, "libtrackrdr-kafka-worker-%d", wrk_num);
if (rd_kafka_conf_set(wrk_conf, "client.id", clientid, errmsg,
LINE_MAX) != RD_KAFKA_CONF_OK) {
MQ_LOG_Log(LOG_ERR, "rdkafka config error [client.id = %s]: %s",
......@@ -286,8 +269,7 @@ conf_add(const char *lval, const char *rval)
errstr[0] = '\0';
/* XXX: rename as "mq.log" */
if (strcmp(lval, "log") == 0) {
if (strcmp(lval, "mq.log") == 0) {
strncpy(logpath, rval, PATH_MAX);
return(0);
}
......@@ -311,6 +293,24 @@ conf_add(const char *lval, const char *rval)
zoo_timeout = val;
return(0);
}
if (strcmp(lval, "statistics.interval.ms") == 0) {
char *endptr = NULL;
long val;
errno = 0;
val = strtoul(rval, &endptr, 10);
if (errno != 0)
return errno;
if (*endptr != '\0')
return EINVAL;
if (val > UINT_MAX)
return ERANGE;
stats_interval = val;
result = rd_kafka_conf_set(conf, lval, rval, errstr, LINE_MAX);
if (result != RD_KAFKA_CONF_OK)
return EINVAL;
return(0);
}
if (strcmp(lval, "zookeeper.log") == 0) {
strncpy(zoolog, rval, PATH_MAX);
return(0);
......@@ -354,12 +354,7 @@ conf_add(const char *lval, const char *rval)
const char *
MQ_GlobalInit(unsigned nworkers, const char *config_fname)
{
snprintf(_version, LINE_MAX,
"libtrackrdr-kafka %s, rdkafka %s, zookeeper %d.%d.%d, "
"yajl %d.%d.%d", SO_VERSION, rd_kafka_version_str(),
ZOO_MAJOR_VERSION, ZOO_MINOR_VERSION, ZOO_PATCH_VERSION,
YAJL_MAJOR, YAJL_MINOR, YAJL_MICRO);
nwrk = nworkers;
conf = rd_kafka_conf_new();
topic_conf = rd_kafka_topic_conf_new();
......@@ -375,6 +370,11 @@ MQ_GlobalInit(unsigned nworkers, const char *config_fname)
}
MQ_LOG_SetLevel(loglvl);
}
snprintf(_version, LINE_MAX,
"libtrackrdr-kafka %s, rdkafka %s, zookeeper %d.%d.%d, "
"yajl %d.%d.%d", SO_VERSION, rd_kafka_version_str(),
ZOO_MAJOR_VERSION, ZOO_MINOR_VERSION, ZOO_PATCH_VERSION,
YAJL_MAJOR, YAJL_MINOR, YAJL_MICRO);
MQ_LOG_Log(LOG_INFO, "initializing (%s)", _version);
if (zookeeper[0] == '\0' && brokerlist[0] == '\0') {
......@@ -392,7 +392,6 @@ MQ_GlobalInit(unsigned nworkers, const char *config_fname)
MQ_LOG_Log(LOG_ERR, errmsg);
return errmsg;
}
nwrk = nworkers;
if (zoolog[0] != '\0') {
zoologf = fopen(zoolog, "a");
......@@ -405,6 +404,16 @@ MQ_GlobalInit(unsigned nworkers, const char *config_fname)
zoo_set_log_stream(zoologf);
}
if (stats_interval != 0) {
int err = MQ_MON_Init(stats_interval);
if (err != 0) {
snprintf(errmsg, LINE_MAX, "Cannot start monitoring thread: %s",
strerror(err));
MQ_LOG_Log(LOG_ERR, errmsg);
return errmsg;
}
}
rd_kafka_conf_set_dr_cb(conf, dr_cb);
rd_kafka_conf_set_error_cb(conf, error_cb);
rd_kafka_conf_set_log_cb(conf, log_cb);
......@@ -722,6 +731,7 @@ MQ_GlobalShutdown(void)
{
int zerr;
MQ_MON_Fini();
for (int i = 0; i < nwrk; i++)
if (workers[i] != NULL)
wrk_fini(workers[i]);
......@@ -741,6 +751,7 @@ MQ_GlobalShutdown(void)
MQ_LOG_Log(LOG_ERR, errmsg);
return errmsg;
}
fclose(zoologf);
MQ_LOG_Log(LOG_INFO, "shutting down");
MQ_LOG_Close();
......
/*-
* Copyright (c) 2012-2014 UPLEX Nils Goroll Systemoptimierung
* Copyright (c) 2012-2014 Otto Gmbh & Co KG
* Copyright (c) 2014 UPLEX Nils Goroll Systemoptimierung
* Copyright (c) 2014 Otto Gmbh & Co KG
* All rights reserved
* Use only with permission
*
* Authors: Geoffrey Simmons <geoffrey.simmons@uplex.de>
* Nils Goroll <nils.goroll@uplex.de>
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
......@@ -31,12 +30,36 @@
*/
#include <assert.h>
#include <limits.h>
#include <librdkafka/rdkafka.h>
#define AZ(foo) do { assert((foo) == 0); } while (0)
#define AN(foo) do { assert((foo) != 0); } while (0)
typedef struct kafka_wrk {
unsigned magic;
#define KAFKA_WRK_MAGIC 0xd14d4425
int n;
rd_kafka_t *kafka;
rd_kafka_topic_t *topic;
int err;
char reason[LINE_MAX]; /* errs from rdkafka callbacks */
char errmsg[LINE_MAX]; /* thread-safe return from MQ_*() */
unsigned nokey;
unsigned badkey;
unsigned nodata;
} kafka_wrk_t;
kafka_wrk_t **workers;
unsigned nwrk;
/* log.c */
int MQ_LOG_Open(const char *path);
void MQ_LOG_Log(int level, const char *msg, ...);
void MQ_LOG_SetLevel(int level);
void MQ_LOG_Close(void);
/* monitor.c */
int MQ_MON_Init(unsigned interval);
void MQ_MON_Fini(void);
......@@ -13,6 +13,7 @@ test_kafka_LDADD = \
../../../config_common.$(OBJEXT) \
../mq.$(OBJEXT) \
../log.$(OBJEXT) \
../monitor.$(OBJEXT) \
${PTHREAD_LIBS} \
-lrdkafka -lz -lpthread -lrt -lzookeeper_mt -lyajl
......
# test config for Kafka MQ plugin
log = kafka.log
mq.log = kafka.log
zookeeper.connect = localhost:2181
zookeeper.timeout = 10000
zookeeper.log = zoo.log
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment