Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
T
trackrdrd
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
uplex-varnish
trackrdrd
Commits
434fd065
Commit
434fd065
authored
Jun 03, 2014
by
Geoff Simmons
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
MQ plugin for Kafka: added config param worker.shutdown.timeout.ms
parent
d8609795
Changes
5
Hide whitespace changes
Inline
Side-by-side
Showing
5 changed files
with
92 additions
and
47 deletions
+92
-47
README.rst
trackrdrd/src/mq/kafka/README.rst
+35
-21
config.c
trackrdrd/src/mq/kafka/config.c
+31
-24
mq.c
trackrdrd/src/mq/kafka/mq.c
+5
-0
mq_kafka.h
trackrdrd/src/mq/kafka/mq_kafka.h
+1
-0
worker.c
trackrdrd/src/mq/kafka/worker.c
+20
-2
No files found.
trackrdrd/src/mq/kafka/README.rst
View file @
434fd065
...
@@ -145,27 +145,41 @@ The ``topic`` parameter MUST be set.
...
@@ -145,27 +145,41 @@ The ``topic`` parameter MUST be set.
In addition to configuration parameters for ``rdkafka``, these
In addition to configuration parameters for ``rdkafka``, these
parameters can be specified:
parameters can be specified:
===================== ==========================================================
=================================== ============================================
Parameter Description
Parameter Description
===================== ==========================================================
=================================== ============================================
``zookeeper.connect`` Comma-separated list of ``host:port`` pairs specifying
``zookeeper.connect`` Comma-separated list of ``host:port`` pairs
the addresses of ZooKeeper servers. If not set, then
specifying the addresses of ZooKeeper
``metadata.broker.list`` MUST be set, as described above.
servers. If not set, then
--------------------- ----------------------------------------------------------
``metadata.broker.list`` MUST be set, as
``zookeeper.timeout`` Timeout in milliseconds for connections to ZooKeeper
described above.
servers. If 0, then a connection attempt fails immediately
----------------------------------- --------------------------------------------
if the servers cannot be reached. (optional, default 0)
``zookeeper.timeout`` Timeout in milliseconds for connections to
--------------------- ----------------------------------------------------------
ZooKeeper servers. If 0, then a connection
``zookeeper.log`` Path of a log file for the ZooKeeper client (optional)
attempt fails immediately if the servers
--------------------- ----------------------------------------------------------
cannot be reached. (optional, default 0)
``mq.log`` Path of a log file for the messaging plugin and Kafka
----------------------------------- --------------------------------------------
client (optional)
``zookeeper.log`` Path of a log file for the ZooKeeper client
--------------------- ----------------------------------------------------------
(optional)
``topic`` Name of the Kafka topic to which messages are sent
----------------------------------- --------------------------------------------
(required)
``mq.log`` Path of a log file for the messaging plugin
--------------------- ----------------------------------------------------------
and Kafka client (optional)
``mq.debug`` If set to true, then log at DEBUG level
----------------------------------- --------------------------------------------
===================== ==========================================================
``topic`` Name of the Kafka topic to which messages
are sent (required)
----------------------------------- --------------------------------------------
``mq.debug`` If set to true, then log at DEBUG level
----------------------------------- --------------------------------------------
``worker.shutdown.timeout.ms`` If non-zero, workers will wait this long
before they shut down for acknowledgements
that all of the messages that they produced
are delivered; and on global shutdown, the
plugin will wait this long for all rdkafka
client objects to finalize. If zero, wait
indefinitely for message delivery, but don't
wait for rdkafka finalization. (optional,
default 1000 ms)
=================================== ============================================
Except as noted below, the configuration can specify any parameters for
Except as noted below, the configuration can specify any parameters for
the ``rdkafka`` client, as documented at::
the ``rdkafka`` client, as documented at::
...
...
trackrdrd/src/mq/kafka/config.c
View file @
434fd065
...
@@ -38,6 +38,24 @@
...
@@ -38,6 +38,24 @@
#include "mq_kafka.h"
#include "mq_kafka.h"
static
int
conf_getUnsignedInt
(
const
char
*
rval
,
unsigned
*
i
)
{
unsigned
long
n
;
char
*
p
;
errno
=
0
;
n
=
strtoul
(
rval
,
&
p
,
10
);
if
(
errno
)
return
(
errno
);
if
(
strlen
(
p
)
!=
0
)
return
(
EINVAL
);
if
(
n
>
UINT_MAX
)
return
(
ERANGE
);
*
i
=
(
unsigned
int
)
n
;
return
(
0
);
}
void
void
CONF_Init
(
void
)
CONF_Init
(
void
)
{
{
...
@@ -51,6 +69,7 @@ CONF_Init(void)
...
@@ -51,6 +69,7 @@ CONF_Init(void)
stats_interval
=
0
;
stats_interval
=
0
;
zoolog
[
0
]
=
'\0'
;
zoolog
[
0
]
=
'\0'
;
brokerlist
[
0
]
=
'\0'
;
brokerlist
[
0
]
=
'\0'
;
wrk_shutdown_timeout
=
1000
;
}
}
int
int
...
@@ -58,6 +77,7 @@ CONF_Add(const char *lval, const char *rval)
...
@@ -58,6 +77,7 @@ CONF_Add(const char *lval, const char *rval)
{
{
rd_kafka_conf_res_t
result
;
rd_kafka_conf_res_t
result
;
char
errstr
[
LINE_MAX
];
char
errstr
[
LINE_MAX
];
int
err
;
errstr
[
0
]
=
'\0'
;
errstr
[
0
]
=
'\0'
;
...
@@ -71,33 +91,18 @@ CONF_Add(const char *lval, const char *rval)
...
@@ -71,33 +91,18 @@ CONF_Add(const char *lval, const char *rval)
}
}
/* XXX: "zookeeper.connection.timeout.ms", to match Kafka config */
/* XXX: "zookeeper.connection.timeout.ms", to match Kafka config */
if
(
strcmp
(
lval
,
"zookeeper.timeout"
)
==
0
)
{
if
(
strcmp
(
lval
,
"zookeeper.timeout"
)
==
0
)
{
char
*
endptr
=
NULL
;
if
((
err
=
conf_getUnsignedInt
(
rval
,
&
zoo_timeout
))
!=
0
)
long
val
;
return
(
err
);
return
(
0
);
errno
=
0
;
}
val
=
strtoul
(
rval
,
&
endptr
,
10
);
if
(
strcmp
(
lval
,
"worker.shutdown.timeout.ms"
)
==
0
)
{
if
(
errno
!=
0
)
if
((
err
=
conf_getUnsignedInt
(
rval
,
&
wrk_shutdown_timeout
))
!=
0
)
return
errno
;
return
(
err
);
if
(
*
endptr
!=
'\0'
)
return
EINVAL
;
if
(
val
>
UINT_MAX
)
return
ERANGE
;
zoo_timeout
=
val
;
return
(
0
);
return
(
0
);
}
}
if
(
strcmp
(
lval
,
"statistics.interval.ms"
)
==
0
)
{
if
(
strcmp
(
lval
,
"statistics.interval.ms"
)
==
0
)
{
char
*
endptr
=
NULL
;
if
((
err
=
conf_getUnsignedInt
(
rval
,
&
stats_interval
))
!=
0
)
long
val
;
return
(
err
);
errno
=
0
;
val
=
strtoul
(
rval
,
&
endptr
,
10
);
if
(
errno
!=
0
)
return
errno
;
if
(
*
endptr
!=
'\0'
)
return
EINVAL
;
if
(
val
>
UINT_MAX
)
return
ERANGE
;
stats_interval
=
val
;
result
=
rd_kafka_conf_set
(
conf
,
lval
,
rval
,
errstr
,
LINE_MAX
);
result
=
rd_kafka_conf_set
(
conf
,
lval
,
rval
,
errstr
,
LINE_MAX
);
if
(
result
!=
RD_KAFKA_CONF_OK
)
if
(
result
!=
RD_KAFKA_CONF_OK
)
return
EINVAL
;
return
EINVAL
;
...
@@ -150,5 +155,7 @@ CONF_Dump(void)
...
@@ -150,5 +155,7 @@ CONF_Dump(void)
MQ_LOG_Log
(
LOG_DEBUG
,
"zookeeper.timeout = %u"
,
zoo_timeout
);
MQ_LOG_Log
(
LOG_DEBUG
,
"zookeeper.timeout = %u"
,
zoo_timeout
);
MQ_LOG_Log
(
LOG_DEBUG
,
"zookeeper.log = %s"
,
zoolog
);
MQ_LOG_Log
(
LOG_DEBUG
,
"zookeeper.log = %s"
,
zoolog
);
MQ_LOG_Log
(
LOG_DEBUG
,
"topic = %s"
,
topic
);
MQ_LOG_Log
(
LOG_DEBUG
,
"topic = %s"
,
topic
);
MQ_LOG_Log
(
LOG_DEBUG
,
"worker.shutdown.timeout.ms = %u"
,
wrk_shutdown_timeout
);
// leaving out mq.debug for now
// leaving out mq.debug for now
}
}
trackrdrd/src/mq/kafka/mq.c
View file @
434fd065
...
@@ -400,6 +400,11 @@ MQ_GlobalShutdown(void)
...
@@ -400,6 +400,11 @@ MQ_GlobalShutdown(void)
WRK_Fini
(
workers
[
i
]);
WRK_Fini
(
workers
[
i
]);
free
(
workers
);
free
(
workers
);
if
(
wrk_shutdown_timeout
&&
rd_kafka_wait_destroyed
(
wrk_shutdown_timeout
)
!=
0
)
MQ_LOG_Log
(
LOG_WARNING
,
"timeout (%u ms) waiting for "
"rdkafka clients to shut down"
,
wrk_shutdown_timeout
);
rd_kafka_conf_destroy
(
conf
);
rd_kafka_conf_destroy
(
conf
);
rd_kafka_topic_conf_destroy
(
topic_conf
);
rd_kafka_topic_conf_destroy
(
topic_conf
);
...
...
trackrdrd/src/mq/kafka/mq_kafka.h
View file @
434fd065
...
@@ -65,6 +65,7 @@ char brokerlist[LINE_MAX];
...
@@ -65,6 +65,7 @@ char brokerlist[LINE_MAX];
char
zoolog
[
PATH_MAX
];
char
zoolog
[
PATH_MAX
];
unsigned
zoo_timeout
;
unsigned
zoo_timeout
;
unsigned
stats_interval
;
unsigned
stats_interval
;
unsigned
wrk_shutdown_timeout
;
rd_kafka_topic_conf_t
*
topic_conf
;
rd_kafka_topic_conf_t
*
topic_conf
;
rd_kafka_conf_t
*
conf
;
rd_kafka_conf_t
*
conf
;
...
...
trackrdrd/src/mq/kafka/worker.c
View file @
434fd065
...
@@ -33,12 +33,21 @@
...
@@ -33,12 +33,21 @@
#include <stdlib.h>
#include <stdlib.h>
#include <errno.h>
#include <errno.h>
#include <string.h>
#include <string.h>
#include <time.h>
#include "mq_kafka.h"
#include "mq_kafka.h"
#include "miniobj.h"
#include "miniobj.h"
static
char
errmsg
[
LINE_MAX
];
static
char
errmsg
[
LINE_MAX
];
static
unsigned
get_clock_ms
(
void
)
{
struct
timespec
t
;
AZ
(
clock_gettime
(
CLOCK_REALTIME
,
&
t
));
return
(
t
.
tv_sec
*
1e3
)
+
(
t
.
tv_nsec
/
1e6
);
}
const
char
const
char
*
WRK_Init
(
int
wrk_num
)
*
WRK_Init
(
int
wrk_num
)
{
{
...
@@ -123,6 +132,7 @@ void
...
@@ -123,6 +132,7 @@ void
WRK_Fini
(
kafka_wrk_t
*
wrk
)
WRK_Fini
(
kafka_wrk_t
*
wrk
)
{
{
int
wrk_num
;
int
wrk_num
;
unsigned
t
=
0
;
CHECK_OBJ_NOTNULL
(
wrk
,
KAFKA_WRK_MAGIC
);
CHECK_OBJ_NOTNULL
(
wrk
,
KAFKA_WRK_MAGIC
);
...
@@ -130,9 +140,17 @@ WRK_Fini(kafka_wrk_t *wrk)
...
@@ -130,9 +140,17 @@ WRK_Fini(kafka_wrk_t *wrk)
assert
(
wrk_num
>=
0
&&
wrk_num
<
nwrk
);
assert
(
wrk_num
>=
0
&&
wrk_num
<
nwrk
);
/* Wait for messages to be delivered */
/* Wait for messages to be delivered */
/* XXX: timeout? configure poll timeout? */
if
(
wrk_shutdown_timeout
)
while
(
rd_kafka_outq_len
(
wrk
->
kafka
)
>
0
)
t
=
get_clock_ms
();
while
(
rd_kafka_outq_len
(
wrk
->
kafka
)
>
0
)
{
rd_kafka_poll
(
wrk
->
kafka
,
100
);
rd_kafka_poll
(
wrk
->
kafka
,
100
);
if
(
t
&&
(
get_clock_ms
()
-
t
>
wrk_shutdown_timeout
))
{
MQ_LOG_Log
(
LOG_WARNING
,
"%s: timeout (%u ms) waiting for message delivery"
,
rd_kafka_name
(
wrk
->
kafka
),
wrk_shutdown_timeout
);
break
;
}
}
rd_kafka_topic_destroy
(
wrk
->
topic
);
rd_kafka_topic_destroy
(
wrk
->
topic
);
rd_kafka_destroy
(
wrk
->
kafka
);
rd_kafka_destroy
(
wrk
->
kafka
);
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment