Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
T
trackrdrd
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
uplex-varnish
trackrdrd
Commits
80a25737
Commit
80a25737
authored
May 31, 2014
by
Geoff Simmons
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
Kafka MQ plugin: add the monitoring thread
parent
31d6c08d
Changes
7
Hide whitespace changes
Inline
Side-by-side
Showing
7 changed files
with
266 additions
and
43 deletions
+266
-43
Makefile.am
trackrdrd/src/mq/kafka/Makefile.am
+3
-0
README.rst
trackrdrd/src/mq/kafka/README.rst
+72
-10
monitor.c
trackrdrd/src/mq/kafka/monitor.c
+123
-0
mq.c
trackrdrd/src/mq/kafka/mq.c
+40
-29
mq_kafka.h
trackrdrd/src/mq/kafka/mq_kafka.h
+26
-3
Makefile.am
trackrdrd/src/mq/kafka/test/Makefile.am
+1
-0
kafka.conf
trackrdrd/src/mq/kafka/test/kafka.conf
+1
-1
No files found.
trackrdrd/src/mq/kafka/Makefile.am
View file @
80a25737
...
...
@@ -12,6 +12,7 @@ libtrackrdr_kafka_la_SOURCES = \
mq_kafka.h
\
mq.c
\
log.c
\
monitor.c
\
$(top_srcdir)
/src/config_common.c
libtrackrdr_kafka_la_LIBADD
=
\
...
...
@@ -34,3 +35,5 @@ libtrackrdr-kafka.3: README.rst
if
HAVE_RST2MAN
${RST2MAN}
README.rst
$@
endif
CLEANFILES
=
*
~
trackrdrd/src/mq/kafka/README.rst
View file @
80a25737
...
...
@@ -137,6 +137,8 @@ set, then an initial list brokers MUST be specified by
``metadata.broker.list`` are set, then the configuration fails and
``trackrdrd`` will exit.
The ``topic`` parameter MUST be set.
In addition to configuration parameters for ``rdkafka``, these
parameters can be specified:
...
...
@@ -147,15 +149,17 @@ Parameter Description
the addresses of ZooKeeper servers. If not set, then
``metadata.broker.list`` MUST be set, as described above.
--------------------- ----------------------------------------------------------
``zookeeper.timeout`` Timeout for connections to ZooKeeper servers (optional,
default 0,
``zookeeper.timeout`` Timeout in milliseconds for connections to ZooKeeper
servers. If 0, then a connection attempt fails immediately
if the servers cannot be reached. (optional, default 0)
--------------------- ----------------------------------------------------------
``zookeeper.log`` Path of a log file for the ZooKeeper client (optional)
--------------------- ----------------------------------------------------------
``
log``
Path of a log file for the messaging plugin and Kafka
``
mq.log``
Path of a log file for the messaging plugin and Kafka
client (optional)
--------------------- ----------------------------------------------------------
``topic`` Name of the Kafka topic to which messages are sent
(required)
--------------------- ----------------------------------------------------------
``mq.debug`` If set to true, then log at DEBUG level
===================== ==========================================================
...
...
@@ -194,21 +198,79 @@ string of up to 8 characters as the sharding key; ``MQ_Send()`` fails
if a key is not specified, or if it contains non-hex characters in the
first 8 bytes.
The plugin uses up to the first 8 hex digits of the key
; if the string
is longer, then the remainder from the 9th byte is ignored.
Only the first 8 hex digits of the key are significant
; if the string
is longer, then the remainder
of the key
from the 9th byte is ignored.
LOGGING AND STATISTICS
======================
XXX: TuDu
The parameter ``mq.log`` sets the path of a log file for
informational, error and debug messages from both the messaging plugin
and the rdkafka client library. If the parameter is not set, then no
log file is written.
If the rdkafka parameter ``statistics.interval.ms`` is set and
non-zero, then statistics from both the plugin and the client library
are emitted to the log at that interval for each worker object
(i.e. for each worker thread of the tracking reader).
Log lines beginning with ``rdkafka stats`` contain statistics from the
rdkafka library for a worker object. The format and content of these
lines are determined by the rdkafka library.
Log lines beginning with ``mq stats`` are generated by the MQ plugin,
and have the following form (possibly with additional formatting and
information from the logger)::
mq stats (ID = <CLIENTID>): nokey=0 badkey=0 nodata=0
``CLIENTID`` is the ID of the worker object (as returned from
``MQ_ClientID()``). The statistics are all cumulative counters.
===================== ==========================================================
Statistic Description
===================== ==========================================================
``nokey`` The number of ``MQ_Send()`` operations called for the
worker with no shard key.
--------------------- ----------------------------------------------------------
``badkey`` The number of send operations called with an illegal
shard key (not a hex string).
--------------------- ----------------------------------------------------------
``nodata`` The number of send operations called with no message
payload.
===================== ==========================================================
MESSAGE SEND FAILURE AND RECOVERY
=================================
XXX: TuDu
* stats callback from rdkafka
* stats counters for missing shard keys or data
The messaging plugin uses the rdkafka client library, whose send
operations are asynchronous -- messages to be sent are placed on an
internal queue, from which they are sent to Kafka brokers as
determined by the ``queue.*`` configuration parameters. Unless there
is some exceptional condition (for example, the internal queue is
full), rdkafka's "produce" operation succeeds immediately after the
message is placed on the queue. If a failure occurs when delivery of a
message to a broker is attempted, then the rdkafka library saves the
error status in its internal state, but this ordinarily becomes known
some time after the "produce" operation has been completed.
This means that in ordinary operation, the plugin's ``MQ_Send()`` call
will not fail immediately if in fact it turns out that the message
cannot be delivered to a broker.
The messaging plugin polls the internal state of an rdkafka producer
associated with a worker object during ``MQ_Send()`` once before
invoking the "produce" operation, once afterward, and also every time
rdkafka internal statistics are queried as described above. If a
pending unrecoverable error state is determined during the call to
``MQ_Send()``, then an unrecoverable error status is returned (also if
the "produce" operation fails immediately); the tracking reader can
then engage its process for error recovery. It should be understood
that an unrecoverable error status from ``MQ_Send()`` does not
necessarily indicate that delivery of the current message has failed
(unless it is due to failure of the "produce" operation), but rather
the delivery of a message submitted via ``MQ_Send()`` at an earlier
point in time.
SIGNALS
=======
...
...
trackrdrd/src/mq/kafka/monitor.c
0 → 100644
View file @
80a25737
/*-
* Copyright (c) 2014 UPLEX Nils Goroll Systemoptimierung
* Copyright (c) 2014 Otto Gmbh & Co KG
* All rights reserved
* Use only with permission
*
* Author: Geoffrey Simmons <geoffrey.simmons@uplex.de>
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL AUTHOR OR CONTRIBUTORS BE LIABLE
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
* OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
* OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
*
*/
#include <syslog.h>
#include <time.h>
#include <errno.h>
#include <string.h>
#include <pthread.h>
#include "mq_kafka.h"
#include "miniobj.h"
static
pthread_t
monitor
;
static
int
run
=
0
;
/* Call rd_kafka_poll() for each worker to provoke callbacks */
static
void
poll_workers
(
void
)
{
for
(
int
i
=
0
;
i
<
nwrk
;
i
++
)
if
(
workers
[
i
]
!=
NULL
)
{
kafka_wrk_t
*
wrk
=
workers
[
i
];
CHECK_OBJ
(
wrk
,
KAFKA_WRK_MAGIC
);
rd_kafka_poll
(
wrk
->
kafka
,
0
);
}
}
static
void
monitor_cleanup
(
void
*
arg
)
{
(
void
)
arg
;
poll_workers
();
MQ_LOG_Log
(
LOG_INFO
,
"libtrackrdr-kafka monitoring thread exiting"
);
}
static
void
*
monitor_thread
(
void
*
arg
)
{
struct
timespec
t
;
unsigned
interval
=
*
((
unsigned
*
)
arg
);
/* Convert ms -> struct timespec */
t
.
tv_sec
=
(
time_t
)
interval
/
10e3
;
t
.
tv_nsec
=
(
interval
%
(
unsigned
)
10e3
)
*
10e6
;
MQ_LOG_Log
(
LOG_INFO
,
"libtrackrdr-kafka monitor thread running every %u.%03u secs"
,
t
.
tv_sec
,
t
.
tv_nsec
/
10e6
);
run
=
1
;
pthread_cleanup_push
(
monitor_cleanup
,
arg
);
while
(
run
)
{
int
err
;
if
(
nanosleep
(
&
t
,
NULL
)
!=
0
)
{
if
(
errno
==
EINTR
)
{
if
(
run
==
0
)
break
;
MQ_LOG_Log
(
LOG_INFO
,
"libtrackrdr-kafka monitoring thread interrupted"
);
continue
;
}
else
{
MQ_LOG_Log
(
LOG_ERR
,
"libtrackrdr-kafka monitoring thread: %s
\n
"
,
strerror
(
errno
));
err
=
errno
;
pthread_exit
(
&
err
);
}
}
poll_workers
();
}
pthread_cleanup_pop
(
0
);
MQ_LOG_Log
(
LOG_INFO
,
"libtrackrdr-kafka monitoring thread exiting"
);
pthread_exit
((
void
*
)
NULL
);
}
int
MQ_MON_Init
(
unsigned
interval
)
{
if
(
interval
==
0
)
return
0
;
return
pthread_create
(
&
monitor
,
NULL
,
monitor_thread
,
(
void
*
)
&
interval
);
}
void
MQ_MON_Fini
(
void
)
{
if
(
run
)
{
run
=
0
;
AZ
(
pthread_cancel
(
monitor
));
/* XXX: read and return an error status */
AZ
(
pthread_join
(
monitor
,
NULL
));
}
}
trackrdrd/src/mq/kafka/mq.c
View file @
80a25737
...
...
@@ -38,7 +38,6 @@
#include <syslog.h>
#include <ctype.h>
#include <librdkafka/rdkafka.h>
#include <zookeeper/zookeeper.h>
#include <zookeeper/zookeeper_version.h>
#include <yajl/yajl_tree.h>
...
...
@@ -64,23 +63,6 @@
#define SO_VERSION "unknown version"
#endif
typedef
struct
kafka_wrk
{
unsigned
magic
;
#define KAFKA_WRK_MAGIC 0xd14d4425
int
n
;
rd_kafka_t
*
kafka
;
rd_kafka_topic_t
*
topic
;
int
err
;
char
reason
[
LINE_MAX
];
/* errs from rdkafka callbacks */
char
errmsg
[
LINE_MAX
];
/* thread-safe return from MQ_*() */
unsigned
nokey
;
unsigned
badkey
;
unsigned
nodata
;
}
kafka_wrk_t
;
static
kafka_wrk_t
**
workers
;
static
unsigned
nwrk
=
0
;
static
char
logpath
[
PATH_MAX
]
=
""
;
static
char
zookeeper
[
LINE_MAX
]
=
""
;
...
...
@@ -98,6 +80,7 @@ static char errmsg[LINE_MAX];
static
char
_version
[
LINE_MAX
];
static
int
loglvl
=
LOG_INFO
;
static
unsigned
stats_interval
=
0
;
static
void
log_cb
(
const
rd_kafka_t
*
rk
,
int
level
,
const
char
*
fac
,
const
char
*
buf
)
...
...
@@ -194,7 +177,7 @@ partitioner_cb(const rd_kafka_topic_t *rkt, const void *keydata, size_t keylen,
static
const
char
*
wrk_init
(
int
wrk_num
)
{
char
clientid
[
sizeof
(
"libtrackrdr-kafka-worker
2147483648"
)];
char
clientid
[
sizeof
(
"libtrackrdr-kafka-worker
-
2147483648"
)];
rd_kafka_conf_t
*
wrk_conf
;
rd_kafka_topic_conf_t
*
wrk_topic_conf
;
rd_kafka_t
*
rk
;
...
...
@@ -205,7 +188,7 @@ static const char
wrk_conf
=
rd_kafka_conf_dup
(
conf
);
wrk_topic_conf
=
rd_kafka_topic_conf_dup
(
topic_conf
);
sprintf
(
clientid
,
"libtrackrdr-kafka-worker
%d"
,
wrk_num
);
sprintf
(
clientid
,
"libtrackrdr-kafka-worker
-
%d"
,
wrk_num
);
if
(
rd_kafka_conf_set
(
wrk_conf
,
"client.id"
,
clientid
,
errmsg
,
LINE_MAX
)
!=
RD_KAFKA_CONF_OK
)
{
MQ_LOG_Log
(
LOG_ERR
,
"rdkafka config error [client.id = %s]: %s"
,
...
...
@@ -286,8 +269,7 @@ conf_add(const char *lval, const char *rval)
errstr
[
0
]
=
'\0'
;
/* XXX: rename as "mq.log" */
if
(
strcmp
(
lval
,
"log"
)
==
0
)
{
if
(
strcmp
(
lval
,
"mq.log"
)
==
0
)
{
strncpy
(
logpath
,
rval
,
PATH_MAX
);
return
(
0
);
}
...
...
@@ -311,6 +293,24 @@ conf_add(const char *lval, const char *rval)
zoo_timeout
=
val
;
return
(
0
);
}
if
(
strcmp
(
lval
,
"statistics.interval.ms"
)
==
0
)
{
char
*
endptr
=
NULL
;
long
val
;
errno
=
0
;
val
=
strtoul
(
rval
,
&
endptr
,
10
);
if
(
errno
!=
0
)
return
errno
;
if
(
*
endptr
!=
'\0'
)
return
EINVAL
;
if
(
val
>
UINT_MAX
)
return
ERANGE
;
stats_interval
=
val
;
result
=
rd_kafka_conf_set
(
conf
,
lval
,
rval
,
errstr
,
LINE_MAX
);
if
(
result
!=
RD_KAFKA_CONF_OK
)
return
EINVAL
;
return
(
0
);
}
if
(
strcmp
(
lval
,
"zookeeper.log"
)
==
0
)
{
strncpy
(
zoolog
,
rval
,
PATH_MAX
);
return
(
0
);
...
...
@@ -354,12 +354,7 @@ conf_add(const char *lval, const char *rval)
const
char
*
MQ_GlobalInit
(
unsigned
nworkers
,
const
char
*
config_fname
)
{
snprintf
(
_version
,
LINE_MAX
,
"libtrackrdr-kafka %s, rdkafka %s, zookeeper %d.%d.%d, "
"yajl %d.%d.%d"
,
SO_VERSION
,
rd_kafka_version_str
(),
ZOO_MAJOR_VERSION
,
ZOO_MINOR_VERSION
,
ZOO_PATCH_VERSION
,
YAJL_MAJOR
,
YAJL_MINOR
,
YAJL_MICRO
);
nwrk
=
nworkers
;
conf
=
rd_kafka_conf_new
();
topic_conf
=
rd_kafka_topic_conf_new
();
...
...
@@ -375,6 +370,11 @@ MQ_GlobalInit(unsigned nworkers, const char *config_fname)
}
MQ_LOG_SetLevel
(
loglvl
);
}
snprintf
(
_version
,
LINE_MAX
,
"libtrackrdr-kafka %s, rdkafka %s, zookeeper %d.%d.%d, "
"yajl %d.%d.%d"
,
SO_VERSION
,
rd_kafka_version_str
(),
ZOO_MAJOR_VERSION
,
ZOO_MINOR_VERSION
,
ZOO_PATCH_VERSION
,
YAJL_MAJOR
,
YAJL_MINOR
,
YAJL_MICRO
);
MQ_LOG_Log
(
LOG_INFO
,
"initializing (%s)"
,
_version
);
if
(
zookeeper
[
0
]
==
'\0'
&&
brokerlist
[
0
]
==
'\0'
)
{
...
...
@@ -392,7 +392,6 @@ MQ_GlobalInit(unsigned nworkers, const char *config_fname)
MQ_LOG_Log
(
LOG_ERR
,
errmsg
);
return
errmsg
;
}
nwrk
=
nworkers
;
if
(
zoolog
[
0
]
!=
'\0'
)
{
zoologf
=
fopen
(
zoolog
,
"a"
);
...
...
@@ -405,6 +404,16 @@ MQ_GlobalInit(unsigned nworkers, const char *config_fname)
zoo_set_log_stream
(
zoologf
);
}
if
(
stats_interval
!=
0
)
{
int
err
=
MQ_MON_Init
(
stats_interval
);
if
(
err
!=
0
)
{
snprintf
(
errmsg
,
LINE_MAX
,
"Cannot start monitoring thread: %s"
,
strerror
(
err
));
MQ_LOG_Log
(
LOG_ERR
,
errmsg
);
return
errmsg
;
}
}
rd_kafka_conf_set_dr_cb
(
conf
,
dr_cb
);
rd_kafka_conf_set_error_cb
(
conf
,
error_cb
);
rd_kafka_conf_set_log_cb
(
conf
,
log_cb
);
...
...
@@ -722,6 +731,7 @@ MQ_GlobalShutdown(void)
{
int
zerr
;
MQ_MON_Fini
();
for
(
int
i
=
0
;
i
<
nwrk
;
i
++
)
if
(
workers
[
i
]
!=
NULL
)
wrk_fini
(
workers
[
i
]);
...
...
@@ -741,6 +751,7 @@ MQ_GlobalShutdown(void)
MQ_LOG_Log
(
LOG_ERR
,
errmsg
);
return
errmsg
;
}
fclose
(
zoologf
);
MQ_LOG_Log
(
LOG_INFO
,
"shutting down"
);
MQ_LOG_Close
();
...
...
trackrdrd/src/mq/kafka/mq_kafka.h
View file @
80a25737
/*-
* Copyright (c) 201
2-201
4 UPLEX Nils Goroll Systemoptimierung
* Copyright (c) 201
2-201
4 Otto Gmbh & Co KG
* Copyright (c) 2014 UPLEX Nils Goroll Systemoptimierung
* Copyright (c) 2014 Otto Gmbh & Co KG
* All rights reserved
* Use only with permission
*
* Authors: Geoffrey Simmons <geoffrey.simmons@uplex.de>
* Nils Goroll <nils.goroll@uplex.de>
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
...
...
@@ -31,12 +30,36 @@
*/
#include <assert.h>
#include <limits.h>
#include <librdkafka/rdkafka.h>
#define AZ(foo) do { assert((foo) == 0); } while (0)
#define AN(foo) do { assert((foo) != 0); } while (0)
typedef
struct
kafka_wrk
{
unsigned
magic
;
#define KAFKA_WRK_MAGIC 0xd14d4425
int
n
;
rd_kafka_t
*
kafka
;
rd_kafka_topic_t
*
topic
;
int
err
;
char
reason
[
LINE_MAX
];
/* errs from rdkafka callbacks */
char
errmsg
[
LINE_MAX
];
/* thread-safe return from MQ_*() */
unsigned
nokey
;
unsigned
badkey
;
unsigned
nodata
;
}
kafka_wrk_t
;
kafka_wrk_t
**
workers
;
unsigned
nwrk
;
/* log.c */
int
MQ_LOG_Open
(
const
char
*
path
);
void
MQ_LOG_Log
(
int
level
,
const
char
*
msg
,
...);
void
MQ_LOG_SetLevel
(
int
level
);
void
MQ_LOG_Close
(
void
);
/* monitor.c */
int
MQ_MON_Init
(
unsigned
interval
);
void
MQ_MON_Fini
(
void
);
trackrdrd/src/mq/kafka/test/Makefile.am
View file @
80a25737
...
...
@@ -13,6 +13,7 @@ test_kafka_LDADD = \
../../../config_common.
$(OBJEXT)
\
../mq.
$(OBJEXT)
\
../log.
$(OBJEXT)
\
../monitor.
$(OBJEXT)
\
${
PTHREAD_LIBS
}
\
-lrdkafka
-lz
-lpthread
-lrt
-lzookeeper_mt
-lyajl
...
...
trackrdrd/src/mq/kafka/test/kafka.conf
View file @
80a25737
# test config for Kafka MQ plugin
log
=
kafka
.
log
mq
.
log
=
kafka
.
log
zookeeper
.
connect
=
localhost
:
2181
zookeeper
.
timeout
=
10000
zookeeper
.
log
=
zoo
.
log
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment