Commit 778656ba authored by Geoff Simmons's avatar Geoff Simmons

MQ plugin for Kafka: added config param worker.shutdown.timeout.ms

parent 682e878f
...@@ -145,27 +145,41 @@ The ``topic`` parameter MUST be set. ...@@ -145,27 +145,41 @@ The ``topic`` parameter MUST be set.
In addition to configuration parameters for ``rdkafka``, these In addition to configuration parameters for ``rdkafka``, these
parameters can be specified: parameters can be specified:
===================== ========================================================== =================================== ============================================
Parameter Description Parameter Description
===================== ========================================================== =================================== ============================================
``zookeeper.connect`` Comma-separated list of ``host:port`` pairs specifying ``zookeeper.connect`` Comma-separated list of ``host:port`` pairs
the addresses of ZooKeeper servers. If not set, then specifying the addresses of ZooKeeper
``metadata.broker.list`` MUST be set, as described above. servers. If not set, then
--------------------- ---------------------------------------------------------- ``metadata.broker.list`` MUST be set, as
``zookeeper.timeout`` Timeout in milliseconds for connections to ZooKeeper described above.
servers. If 0, then a connection attempt fails immediately ----------------------------------- --------------------------------------------
if the servers cannot be reached. (optional, default 0) ``zookeeper.timeout`` Timeout in milliseconds for connections to
--------------------- ---------------------------------------------------------- ZooKeeper servers. If 0, then a connection
``zookeeper.log`` Path of a log file for the ZooKeeper client (optional) attempt fails immediately if the servers
--------------------- ---------------------------------------------------------- cannot be reached. (optional, default 0)
``mq.log`` Path of a log file for the messaging plugin and Kafka ----------------------------------- --------------------------------------------
client (optional) ``zookeeper.log`` Path of a log file for the ZooKeeper client
--------------------- ---------------------------------------------------------- (optional)
``topic`` Name of the Kafka topic to which messages are sent ----------------------------------- --------------------------------------------
(required) ``mq.log`` Path of a log file for the messaging plugin
--------------------- ---------------------------------------------------------- and Kafka client (optional)
``mq.debug`` If set to true, then log at DEBUG level ----------------------------------- --------------------------------------------
===================== ========================================================== ``topic`` Name of the Kafka topic to which messages
are sent (required)
----------------------------------- --------------------------------------------
``mq.debug`` If set to true, then log at DEBUG level
----------------------------------- --------------------------------------------
``worker.shutdown.timeout.ms`` If non-zero, workers will wait this long
before they shut down for acknowledgements
that all of the messages that they produced
are delivered; and on global shutdown, the
plugin will wait this long for all rdkafka
client objects to finalize. If zero, wait
indefinitely for message delivery, but don't
wait for rdkafka finalization. (optional,
default 1000 ms)
=================================== ============================================
Except as noted below, the configuration can specify any parameters for Except as noted below, the configuration can specify any parameters for
the ``rdkafka`` client, as documented at:: the ``rdkafka`` client, as documented at::
......
...@@ -38,6 +38,24 @@ ...@@ -38,6 +38,24 @@
#include "mq_kafka.h" #include "mq_kafka.h"
static int
conf_getUnsignedInt(const char *rval, unsigned *i)
{
unsigned long n;
char *p;
errno = 0;
n = strtoul(rval, &p, 10);
if (errno)
return(errno);
if (strlen(p) != 0)
return(EINVAL);
if (n > UINT_MAX)
return(ERANGE);
*i = (unsigned int) n;
return(0);
}
void void
CONF_Init(void) CONF_Init(void)
{ {
...@@ -51,6 +69,7 @@ CONF_Init(void) ...@@ -51,6 +69,7 @@ CONF_Init(void)
stats_interval = 0; stats_interval = 0;
zoolog[0] = '\0'; zoolog[0] = '\0';
brokerlist[0] = '\0'; brokerlist[0] = '\0';
wrk_shutdown_timeout = 1000;
} }
int int
...@@ -58,6 +77,7 @@ CONF_Add(const char *lval, const char *rval) ...@@ -58,6 +77,7 @@ CONF_Add(const char *lval, const char *rval)
{ {
rd_kafka_conf_res_t result; rd_kafka_conf_res_t result;
char errstr[LINE_MAX]; char errstr[LINE_MAX];
int err;
errstr[0] = '\0'; errstr[0] = '\0';
...@@ -71,33 +91,18 @@ CONF_Add(const char *lval, const char *rval) ...@@ -71,33 +91,18 @@ CONF_Add(const char *lval, const char *rval)
} }
/* XXX: "zookeeper.connection.timeout.ms", to match Kafka config */ /* XXX: "zookeeper.connection.timeout.ms", to match Kafka config */
if (strcmp(lval, "zookeeper.timeout") == 0) { if (strcmp(lval, "zookeeper.timeout") == 0) {
char *endptr = NULL; if ((err = conf_getUnsignedInt(rval, &zoo_timeout)) != 0)
long val; return(err);
return(0);
errno = 0; }
val = strtoul(rval, &endptr, 10); if (strcmp(lval, "worker.shutdown.timeout.ms") == 0) {
if (errno != 0) if ((err = conf_getUnsignedInt(rval, &wrk_shutdown_timeout)) != 0)
return errno; return(err);
if (*endptr != '\0')
return EINVAL;
if (val > UINT_MAX)
return ERANGE;
zoo_timeout = val;
return(0); return(0);
} }
if (strcmp(lval, "statistics.interval.ms") == 0) { if (strcmp(lval, "statistics.interval.ms") == 0) {
char *endptr = NULL; if ((err = conf_getUnsignedInt(rval, &stats_interval)) != 0)
long val; return(err);
errno = 0;
val = strtoul(rval, &endptr, 10);
if (errno != 0)
return errno;
if (*endptr != '\0')
return EINVAL;
if (val > UINT_MAX)
return ERANGE;
stats_interval = val;
result = rd_kafka_conf_set(conf, lval, rval, errstr, LINE_MAX); result = rd_kafka_conf_set(conf, lval, rval, errstr, LINE_MAX);
if (result != RD_KAFKA_CONF_OK) if (result != RD_KAFKA_CONF_OK)
return EINVAL; return EINVAL;
...@@ -150,5 +155,7 @@ CONF_Dump(void) ...@@ -150,5 +155,7 @@ CONF_Dump(void)
MQ_LOG_Log(LOG_DEBUG, "zookeeper.timeout = %u", zoo_timeout); MQ_LOG_Log(LOG_DEBUG, "zookeeper.timeout = %u", zoo_timeout);
MQ_LOG_Log(LOG_DEBUG, "zookeeper.log = %s", zoolog); MQ_LOG_Log(LOG_DEBUG, "zookeeper.log = %s", zoolog);
MQ_LOG_Log(LOG_DEBUG, "topic = %s", topic); MQ_LOG_Log(LOG_DEBUG, "topic = %s", topic);
MQ_LOG_Log(LOG_DEBUG, "worker.shutdown.timeout.ms = %u",
wrk_shutdown_timeout);
// leaving out mq.debug for now // leaving out mq.debug for now
} }
...@@ -400,6 +400,11 @@ MQ_GlobalShutdown(void) ...@@ -400,6 +400,11 @@ MQ_GlobalShutdown(void)
WRK_Fini(workers[i]); WRK_Fini(workers[i]);
free(workers); free(workers);
if (wrk_shutdown_timeout
&& rd_kafka_wait_destroyed(wrk_shutdown_timeout) != 0)
MQ_LOG_Log(LOG_WARNING, "timeout (%u ms) waiting for "
"rdkafka clients to shut down", wrk_shutdown_timeout);
rd_kafka_conf_destroy(conf); rd_kafka_conf_destroy(conf);
rd_kafka_topic_conf_destroy(topic_conf); rd_kafka_topic_conf_destroy(topic_conf);
......
...@@ -65,6 +65,7 @@ char brokerlist[LINE_MAX]; ...@@ -65,6 +65,7 @@ char brokerlist[LINE_MAX];
char zoolog[PATH_MAX]; char zoolog[PATH_MAX];
unsigned zoo_timeout; unsigned zoo_timeout;
unsigned stats_interval; unsigned stats_interval;
unsigned wrk_shutdown_timeout;
rd_kafka_topic_conf_t *topic_conf; rd_kafka_topic_conf_t *topic_conf;
rd_kafka_conf_t *conf; rd_kafka_conf_t *conf;
......
...@@ -33,12 +33,21 @@ ...@@ -33,12 +33,21 @@
#include <stdlib.h> #include <stdlib.h>
#include <errno.h> #include <errno.h>
#include <string.h> #include <string.h>
#include <time.h>
#include "mq_kafka.h" #include "mq_kafka.h"
#include "miniobj.h" #include "miniobj.h"
static char errmsg[LINE_MAX]; static char errmsg[LINE_MAX];
static unsigned
get_clock_ms(void)
{
struct timespec t;
AZ(clock_gettime(CLOCK_REALTIME, &t));
return (t.tv_sec * 1e3) + (t.tv_nsec / 1e6);
}
const char const char
*WRK_Init(int wrk_num) *WRK_Init(int wrk_num)
{ {
...@@ -123,6 +132,7 @@ void ...@@ -123,6 +132,7 @@ void
WRK_Fini(kafka_wrk_t *wrk) WRK_Fini(kafka_wrk_t *wrk)
{ {
int wrk_num; int wrk_num;
unsigned t = 0;
CHECK_OBJ_NOTNULL(wrk, KAFKA_WRK_MAGIC); CHECK_OBJ_NOTNULL(wrk, KAFKA_WRK_MAGIC);
...@@ -130,9 +140,17 @@ WRK_Fini(kafka_wrk_t *wrk) ...@@ -130,9 +140,17 @@ WRK_Fini(kafka_wrk_t *wrk)
assert(wrk_num >= 0 && wrk_num < nwrk); assert(wrk_num >= 0 && wrk_num < nwrk);
/* Wait for messages to be delivered */ /* Wait for messages to be delivered */
/* XXX: timeout? configure poll timeout? */ if (wrk_shutdown_timeout)
while (rd_kafka_outq_len(wrk->kafka) > 0) t = get_clock_ms();
while (rd_kafka_outq_len(wrk->kafka) > 0) {
rd_kafka_poll(wrk->kafka, 100); rd_kafka_poll(wrk->kafka, 100);
if (t && (get_clock_ms() - t > wrk_shutdown_timeout)) {
MQ_LOG_Log(LOG_WARNING,
"%s: timeout (%u ms) waiting for message delivery",
rd_kafka_name(wrk->kafka), wrk_shutdown_timeout);
break;
}
}
rd_kafka_topic_destroy(wrk->topic); rd_kafka_topic_destroy(wrk->topic);
rd_kafka_destroy(wrk->kafka); rd_kafka_destroy(wrk->kafka);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment