Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
T
trackrdrd
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
uplex-varnish
trackrdrd
Commits
4b573cf5
Commit
4b573cf5
authored
Jun 03, 2014
by
Geoff Simmons
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
MQ plugin for Kafka refactoring: encapsulate worker functions and the
partitioner callback
parent
bf7c925e
Changes
7
Show whitespace changes
Inline
Side-by-side
Showing
7 changed files
with
249 additions
and
151 deletions
+249
-151
Makefile.am
trackrdrd/src/mq/kafka/Makefile.am
+2
-0
callback.c
trackrdrd/src/mq/kafka/callback.c
+80
-0
mq.c
trackrdrd/src/mq/kafka/mq.c
+8
-138
mq_kafka.h
trackrdrd/src/mq/kafka/mq_kafka.h
+15
-0
Makefile.am
trackrdrd/src/mq/kafka/test/Makefile.am
+2
-0
worker.c
trackrdrd/src/mq/kafka/worker.c
+141
-0
zookeeper.c
trackrdrd/src/mq/kafka/zookeeper.c
+1
-13
No files found.
trackrdrd/src/mq/kafka/Makefile.am
View file @
4b573cf5
...
...
@@ -14,6 +14,8 @@ libtrackrdr_kafka_la_SOURCES = \
log.c
\
monitor.c
\
zookeeper.c
\
worker.c
\
callback.c
\
$(top_srcdir)
/src/config_common.c
libtrackrdr_kafka_la_LIBADD
=
\
...
...
trackrdrd/src/mq/kafka/callback.c
0 → 100644
View file @
4b573cf5
/*-
* Copyright (c) 2014 UPLEX Nils Goroll Systemoptimierung
* Copyright (c) 2014 Otto Gmbh & Co KG
* All rights reserved
* Use only with permission
*
* Author: Geoffrey Simmons <geoffrey.simmons@uplex.de>
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL AUTHOR OR CONTRIBUTORS BE LIABLE
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
* OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
* OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
*
*/
/* Callbacks used by rdkafka */
#include <string.h>
#include <stdlib.h>
#include <errno.h>
#include <syslog.h>
#include "mq_kafka.h"
/*
* Partitioner assumes that the key string is an unsigned 32-bit
* hexadecimal.
*/
int32_t
CB_Partitioner
(
const
rd_kafka_topic_t
*
rkt
,
const
void
*
keydata
,
size_t
keylen
,
int32_t
partition_cnt
,
void
*
rkt_opaque
,
void
*
msg_opaque
)
{
int32_t
partition
;
unsigned
long
key
;
char
keystr
[
sizeof
(
"ffffffff"
)],
*
endptr
=
NULL
;
(
void
)
rkt_opaque
;
(
void
)
msg_opaque
;
assert
(
partition_cnt
>
0
);
assert
(
keylen
<=
8
);
strncpy
(
keystr
,
(
const
char
*
)
keydata
,
keylen
);
keystr
[
keylen
]
=
'\0'
;
errno
=
0
;
key
=
strtoul
(
keystr
,
&
endptr
,
16
);
if
(
errno
!=
0
||
*
endptr
!=
'\0'
||
key
>
0xffffffffUL
)
{
MQ_LOG_Log
(
LOG_ERR
,
"Cannot parse partition key: %.*s"
,
(
int
)
keylen
,
(
const
char
*
)
keydata
);
return
RD_KAFKA_PARTITION_UA
;
}
if
((
partition_cnt
&
(
partition_cnt
-
1
))
==
0
)
/* partition_cnt is a power of 2 */
partition
=
key
&
(
partition_cnt
-
1
);
else
partition
=
key
%
partition_cnt
;
if
(
!
rd_kafka_topic_partition_available
(
rkt
,
partition
))
{
MQ_LOG_Log
(
LOG_ERR
,
"Partition %d not available"
,
partition
);
return
RD_KAFKA_PARTITION_UA
;
}
MQ_LOG_Log
(
LOG_DEBUG
,
"Computed partition %d for key %.*s"
,
partition
,
(
int
)
keylen
,
(
const
char
*
)
keydata
);
return
partition
;
}
trackrdrd/src/mq/kafka/mq.c
View file @
4b573cf5
...
...
@@ -30,7 +30,6 @@
*/
#include <stdio.h>
#include <stdlib.h>
#include <errno.h>
#include <string.h>
#include <strings.h>
...
...
@@ -68,7 +67,6 @@ static char brokerlist[LINE_MAX] = "";
static
char
zoolog
[
PATH_MAX
]
=
""
;
static
unsigned
zoo_timeout
=
0
;
static
char
topic
[
LINE_MAX
]
=
""
;
static
rd_kafka_topic_conf_t
*
topic_conf
;
static
rd_kafka_conf_t
*
conf
;
...
...
@@ -77,7 +75,6 @@ static unsigned stats_interval = 0;
static
char
errmsg
[
LINE_MAX
];
static
char
_version
[
LINE_MAX
];
static
int
loglvl
=
LOG_INFO
;
static
int
saved_lvl
=
LOG_INFO
;
static
int
debug_toggle
=
0
;
struct
sigaction
toggle_action
;
...
...
@@ -162,135 +159,6 @@ stats_cb(rd_kafka_t *rk, char *json, size_t json_len, void *opaque)
return
0
;
}
/*
* Partitioner assumes that the key string is an unsigned 32-bit
* hexadecimal.
*/
static
int32_t
partitioner_cb
(
const
rd_kafka_topic_t
*
rkt
,
const
void
*
keydata
,
size_t
keylen
,
int32_t
partition_cnt
,
void
*
rkt_opaque
,
void
*
msg_opaque
)
{
int32_t
partition
;
unsigned
long
key
;
char
keystr
[
sizeof
(
"ffffffff"
)],
*
endptr
=
NULL
;
(
void
)
rkt_opaque
;
(
void
)
msg_opaque
;
assert
(
partition_cnt
>
0
);
assert
(
keylen
<=
8
);
strncpy
(
keystr
,
(
const
char
*
)
keydata
,
keylen
);
keystr
[
keylen
]
=
'\0'
;
errno
=
0
;
key
=
strtoul
(
keystr
,
&
endptr
,
16
);
if
(
errno
!=
0
||
*
endptr
!=
'\0'
||
key
>
0xffffffffUL
)
{
MQ_LOG_Log
(
LOG_ERR
,
"Cannot parse partition key: %.*s"
,
(
int
)
keylen
,
(
const
char
*
)
keydata
);
return
RD_KAFKA_PARTITION_UA
;
}
if
((
partition_cnt
&
(
partition_cnt
-
1
))
==
0
)
/* partition_cnt is a power of 2 */
partition
=
key
&
(
partition_cnt
-
1
);
else
partition
=
key
%
partition_cnt
;
if
(
!
rd_kafka_topic_partition_available
(
rkt
,
partition
))
{
MQ_LOG_Log
(
LOG_ERR
,
"Partition %d not available"
,
partition
);
return
RD_KAFKA_PARTITION_UA
;
}
MQ_LOG_Log
(
LOG_DEBUG
,
"Computed partition %d for key %.*s"
,
partition
,
(
int
)
keylen
,
(
const
char
*
)
keydata
);
return
partition
;
}
/* XXX: encapsulate wrk_init and _fini in a separate source */
static
const
char
*
wrk_init
(
int
wrk_num
)
{
char
clientid
[
sizeof
(
"libtrackrdr-kafka-worker-2147483648"
)];
rd_kafka_conf_t
*
wrk_conf
;
rd_kafka_topic_conf_t
*
wrk_topic_conf
;
rd_kafka_t
*
rk
;
rd_kafka_topic_t
*
rkt
;
kafka_wrk_t
*
wrk
;
assert
(
wrk_num
>=
0
&&
wrk_num
<
nwrk
);
wrk_conf
=
rd_kafka_conf_dup
(
conf
);
wrk_topic_conf
=
rd_kafka_topic_conf_dup
(
topic_conf
);
sprintf
(
clientid
,
"libtrackrdr-kafka-worker-%d"
,
wrk_num
);
if
(
rd_kafka_conf_set
(
wrk_conf
,
"client.id"
,
clientid
,
errmsg
,
LINE_MAX
)
!=
RD_KAFKA_CONF_OK
)
{
MQ_LOG_Log
(
LOG_ERR
,
"rdkafka config error [client.id = %s]: %s"
,
clientid
,
errmsg
);
return
errmsg
;
}
rd_kafka_topic_conf_set_partitioner_cb
(
wrk_topic_conf
,
partitioner_cb
);
ALLOC_OBJ
(
wrk
,
KAFKA_WRK_MAGIC
);
if
(
wrk
==
NULL
)
{
snprintf
(
errmsg
,
LINE_MAX
,
"Failed to create worker handle: %s"
,
strerror
(
errno
));
MQ_LOG_Log
(
LOG_ERR
,
errmsg
);
return
errmsg
;
}
rd_kafka_conf_set_opaque
(
wrk_conf
,
(
void
*
)
wrk
);
rd_kafka_topic_conf_set_opaque
(
wrk_topic_conf
,
(
void
*
)
wrk
);
rk
=
rd_kafka_new
(
RD_KAFKA_PRODUCER
,
wrk_conf
,
errmsg
,
LINE_MAX
);
if
(
rk
==
NULL
)
{
MQ_LOG_Log
(
LOG_ERR
,
"Failed to create producer: %s"
,
errmsg
);
return
errmsg
;
}
CHECK_OBJ_NOTNULL
((
kafka_wrk_t
*
)
rd_kafka_opaque
(
rk
),
KAFKA_WRK_MAGIC
);
rd_kafka_set_log_level
(
rk
,
loglvl
);
errno
=
0
;
rkt
=
rd_kafka_topic_new
(
rk
,
topic
,
wrk_topic_conf
);
if
(
rkt
==
NULL
)
{
rd_kafka_resp_err_t
rkerr
=
rd_kafka_errno2err
(
errno
);
snprintf
(
errmsg
,
LINE_MAX
,
"Failed to initialize topic: %s"
,
rd_kafka_err2str
(
rkerr
));
MQ_LOG_Log
(
LOG_ERR
,
errmsg
);
return
errmsg
;
}
wrk
->
n
=
wrk_num
;
wrk
->
kafka
=
rk
;
wrk
->
topic
=
rkt
;
wrk
->
errmsg
[
0
]
=
'\0'
;
wrk
->
seen
=
wrk
->
produced
=
wrk
->
delivered
=
wrk
->
failed
=
wrk
->
nokey
=
wrk
->
badkey
=
wrk
->
nodata
=
0
;
workers
[
wrk_num
]
=
wrk
;
MQ_LOG_Log
(
LOG_INFO
,
"initialized worker %d: %s"
,
wrk_num
,
rd_kafka_name
(
wrk
->
kafka
));
rd_kafka_poll
(
wrk
->
kafka
,
0
);
return
NULL
;
}
static
void
wrk_fini
(
kafka_wrk_t
*
wrk
)
{
int
wrk_num
;
CHECK_OBJ_NOTNULL
(
wrk
,
KAFKA_WRK_MAGIC
);
wrk_num
=
wrk
->
n
;
assert
(
wrk_num
>=
0
&&
wrk_num
<
nwrk
);
/* Wait for messages to be delivered */
/* XXX: timeout? configure poll timeout? */
while
(
rd_kafka_outq_len
(
wrk
->
kafka
)
>
0
)
rd_kafka_poll
(
wrk
->
kafka
,
100
);
rd_kafka_topic_destroy
(
wrk
->
topic
);
rd_kafka_destroy
(
wrk
->
kafka
);
FREE_OBJ
(
wrk
);
AN
(
wrk
);
workers
[
wrk_num
]
=
NULL
;
}
static
int
conf_add
(
const
char
*
lval
,
const
char
*
rval
)
{
...
...
@@ -386,6 +254,8 @@ MQ_GlobalInit(unsigned nworkers, const char *config_fname)
nwrk
=
nworkers
;
conf
=
rd_kafka_conf_new
();
topic_conf
=
rd_kafka_topic_conf_new
();
loglvl
=
LOG_INFO
;
topic
[
0
]
=
'\0'
;
if
(
CONF_ReadFile
(
config_fname
,
conf_add
)
!=
0
)
return
"Error reading config file for Kafka"
;
...
...
@@ -462,7 +332,7 @@ MQ_GlobalInit(unsigned nworkers, const char *config_fname)
rd_kafka_conf_set_error_cb
(
conf
,
error_cb
);
rd_kafka_conf_set_log_cb
(
conf
,
log_cb
);
rd_kafka_conf_set_stats_cb
(
conf
,
stats_cb
);
rd_kafka_topic_conf_set_partitioner_cb
(
topic_conf
,
partitioner_cb
);
rd_kafka_topic_conf_set_partitioner_cb
(
topic_conf
,
CB_Partitioner
);
if
(
loglvl
==
LOG_DEBUG
)
{
size_t
cfglen
;
...
...
@@ -537,7 +407,7 @@ MQ_InitConnections(void)
}
for
(
int
i
=
0
;
i
<
nwrk
;
i
++
)
{
const
char
*
err
=
wrk_init
(
i
);
const
char
*
err
=
WRK_Init
(
i
,
conf
,
topic_conf
);
if
(
err
!=
NULL
)
return
err
;
}
...
...
@@ -641,9 +511,9 @@ MQ_Reconnect(void **priv)
CAST_OBJ_NOTNULL
(
wrk
,
*
priv
,
KAFKA_WRK_MAGIC
);
wrk_num
=
wrk
->
n
;
assert
(
wrk_num
>=
0
&&
wrk_num
<
nwrk
);
wrk_f
ini
(
wrk
);
WRK_F
ini
(
wrk
);
err
=
wrk_init
(
wrk_num
);
err
=
WRK_Init
(
wrk_num
,
conf
,
topic_conf
);
if
(
err
!=
NULL
)
return
err
;
*
priv
=
workers
[
wrk_num
];
...
...
@@ -673,7 +543,7 @@ MQ_WorkerShutdown(void **priv)
kafka_wrk_t
*
wrk
;
CAST_OBJ_NOTNULL
(
wrk
,
*
priv
,
KAFKA_WRK_MAGIC
);
wrk_f
ini
(
wrk
);
WRK_F
ini
(
wrk
);
*
priv
=
NULL
;
return
NULL
;
...
...
@@ -687,7 +557,7 @@ MQ_GlobalShutdown(void)
MQ_MON_Fini
();
for
(
int
i
=
0
;
i
<
nwrk
;
i
++
)
if
(
workers
[
i
]
!=
NULL
)
wrk_f
ini
(
workers
[
i
]);
WRK_F
ini
(
workers
[
i
]);
free
(
workers
);
rd_kafka_conf_destroy
(
conf
);
...
...
trackrdrd/src/mq/kafka/mq_kafka.h
View file @
4b573cf5
...
...
@@ -56,6 +56,10 @@ typedef struct kafka_wrk {
kafka_wrk_t
**
workers
;
unsigned
nwrk
;
char
topic
[
LINE_MAX
];
int
loglvl
;
/* log.c */
int
MQ_LOG_Open
(
const
char
*
path
);
void
MQ_LOG_Log
(
int
level
,
const
char
*
msg
,
...);
...
...
@@ -72,3 +76,14 @@ const char *MQ_ZOO_Init(char *zooservers, unsigned timeout, char *brokerlist,
const
char
*
MQ_ZOO_SetLog
(
const
char
*
path
);
void
MQ_ZOO_SetLogLevel
(
int
level
);
const
char
*
MQ_ZOO_Fini
(
void
);
/* worker.c */
const
char
*
WRK_Init
(
int
wrk_num
,
rd_kafka_conf_t
*
conf
,
rd_kafka_topic_conf_t
*
topic_conf
);
void
WRK_AddBrokers
(
const
char
*
brokers
);
void
WRK_Fini
(
kafka_wrk_t
*
wrk
);
/* callback.c */
int32_t
CB_Partitioner
(
const
rd_kafka_topic_t
*
rkt
,
const
void
*
keydata
,
size_t
keylen
,
int32_t
partition_cnt
,
void
*
rkt_opaque
,
void
*
msg_opaque
);
trackrdrd/src/mq/kafka/test/Makefile.am
View file @
4b573cf5
...
...
@@ -15,6 +15,8 @@ test_kafka_LDADD = \
../log.
$(OBJEXT)
\
../monitor.
$(OBJEXT)
\
../zookeeper.
$(OBJEXT)
\
../worker.
$(OBJEXT)
\
../callback.
$(OBJEXT)
\
${
PTHREAD_LIBS
}
\
-lrdkafka
-lz
-lpthread
-lrt
-lzookeeper_mt
-lpcre
...
...
trackrdrd/src/mq/kafka/worker.c
0 → 100644
View file @
4b573cf5
/*-
* Copyright (c) 2014 UPLEX Nils Goroll Systemoptimierung
* Copyright (c) 2014 Otto Gmbh & Co KG
* All rights reserved
* Use only with permission
*
* Author: Geoffrey Simmons <geoffrey.simmons@uplex.de>
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL AUTHOR OR CONTRIBUTORS BE LIABLE
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
* OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
* OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
*
*/
#include <syslog.h>
#include <stdlib.h>
#include <errno.h>
#include <string.h>
#include "mq_kafka.h"
#include "miniobj.h"
static
char
errmsg
[
LINE_MAX
];
const
char
*
WRK_Init
(
int
wrk_num
,
rd_kafka_conf_t
*
conf
,
rd_kafka_topic_conf_t
*
topic_conf
)
{
char
clientid
[
sizeof
(
"libtrackrdr-kafka-worker-2147483648"
)];
rd_kafka_conf_t
*
wrk_conf
;
rd_kafka_topic_conf_t
*
wrk_topic_conf
;
rd_kafka_t
*
rk
;
rd_kafka_topic_t
*
rkt
;
kafka_wrk_t
*
wrk
;
assert
(
wrk_num
>=
0
&&
wrk_num
<
nwrk
);
wrk_conf
=
rd_kafka_conf_dup
(
conf
);
wrk_topic_conf
=
rd_kafka_topic_conf_dup
(
topic_conf
);
sprintf
(
clientid
,
"libtrackrdr-kafka-worker-%d"
,
wrk_num
);
if
(
rd_kafka_conf_set
(
wrk_conf
,
"client.id"
,
clientid
,
errmsg
,
LINE_MAX
)
!=
RD_KAFKA_CONF_OK
)
{
MQ_LOG_Log
(
LOG_ERR
,
"rdkafka config error [client.id = %s]: %s"
,
clientid
,
errmsg
);
return
errmsg
;
}
rd_kafka_topic_conf_set_partitioner_cb
(
wrk_topic_conf
,
CB_Partitioner
);
ALLOC_OBJ
(
wrk
,
KAFKA_WRK_MAGIC
);
if
(
wrk
==
NULL
)
{
snprintf
(
errmsg
,
LINE_MAX
,
"Failed to create worker handle: %s"
,
strerror
(
errno
));
MQ_LOG_Log
(
LOG_ERR
,
errmsg
);
return
errmsg
;
}
rd_kafka_conf_set_opaque
(
wrk_conf
,
(
void
*
)
wrk
);
rd_kafka_topic_conf_set_opaque
(
wrk_topic_conf
,
(
void
*
)
wrk
);
rk
=
rd_kafka_new
(
RD_KAFKA_PRODUCER
,
wrk_conf
,
errmsg
,
LINE_MAX
);
if
(
rk
==
NULL
)
{
MQ_LOG_Log
(
LOG_ERR
,
"Failed to create producer: %s"
,
errmsg
);
return
errmsg
;
}
CHECK_OBJ_NOTNULL
((
kafka_wrk_t
*
)
rd_kafka_opaque
(
rk
),
KAFKA_WRK_MAGIC
);
rd_kafka_set_log_level
(
rk
,
loglvl
);
errno
=
0
;
rkt
=
rd_kafka_topic_new
(
rk
,
topic
,
wrk_topic_conf
);
if
(
rkt
==
NULL
)
{
rd_kafka_resp_err_t
rkerr
=
rd_kafka_errno2err
(
errno
);
snprintf
(
errmsg
,
LINE_MAX
,
"Failed to initialize topic: %s"
,
rd_kafka_err2str
(
rkerr
));
MQ_LOG_Log
(
LOG_ERR
,
errmsg
);
return
errmsg
;
}
wrk
->
n
=
wrk_num
;
wrk
->
kafka
=
rk
;
wrk
->
topic
=
rkt
;
wrk
->
errmsg
[
0
]
=
'\0'
;
wrk
->
seen
=
wrk
->
produced
=
wrk
->
delivered
=
wrk
->
failed
=
wrk
->
nokey
=
wrk
->
badkey
=
wrk
->
nodata
=
0
;
workers
[
wrk_num
]
=
wrk
;
MQ_LOG_Log
(
LOG_INFO
,
"initialized worker %d: %s"
,
wrk_num
,
rd_kafka_name
(
wrk
->
kafka
));
rd_kafka_poll
(
wrk
->
kafka
,
0
);
return
NULL
;
}
void
WRK_AddBrokers
(
const
char
*
brokers
)
{
for
(
int
i
=
0
;
i
<
nwrk
;
i
++
)
if
(
workers
[
i
]
!=
NULL
)
{
int
nbrokers
;
CHECK_OBJ
(
workers
[
i
],
KAFKA_WRK_MAGIC
);
nbrokers
=
rd_kafka_brokers_add
(
workers
[
i
]
->
kafka
,
brokers
);
/* XXX: poll timeout configurable? */
rd_kafka_poll
(
workers
[
i
]
->
kafka
,
10
);
MQ_LOG_Log
(
LOG_INFO
,
"%s: added %d brokers [%s]"
,
rd_kafka_name
(
workers
[
i
]
->
kafka
),
nbrokers
,
brokers
);
}
}
void
WRK_Fini
(
kafka_wrk_t
*
wrk
)
{
int
wrk_num
;
CHECK_OBJ_NOTNULL
(
wrk
,
KAFKA_WRK_MAGIC
);
wrk_num
=
wrk
->
n
;
assert
(
wrk_num
>=
0
&&
wrk_num
<
nwrk
);
/* Wait for messages to be delivered */
/* XXX: timeout? configure poll timeout? */
while
(
rd_kafka_outq_len
(
wrk
->
kafka
)
>
0
)
rd_kafka_poll
(
wrk
->
kafka
,
100
);
rd_kafka_topic_destroy
(
wrk
->
topic
);
rd_kafka_destroy
(
wrk
->
kafka
);
FREE_OBJ
(
wrk
);
workers
[
wrk_num
]
=
NULL
;
}
trackrdrd/src/mq/kafka/zookeeper.c
View file @
4b573cf5
...
...
@@ -162,19 +162,7 @@ watcher(zhandle_t *zzh, int type, int state, const char *path, void *watcherCtx)
return
;
}
if
(
brokers
[
0
]
!=
'\0'
)
/* XXX: encapsulate */
for
(
int
i
=
0
;
i
<
nwrk
;
i
++
)
if
(
workers
[
i
]
!=
NULL
)
{
int
nbrokers
;
CHECK_OBJ
(
workers
[
i
],
KAFKA_WRK_MAGIC
);
nbrokers
=
rd_kafka_brokers_add
(
workers
[
i
]
->
kafka
,
brokers
);
/* XXX: poll timeout configurable? */
rd_kafka_poll
(
workers
[
i
]
->
kafka
,
10
);
MQ_LOG_Log
(
LOG_INFO
,
"%s: added %d brokers [%s]"
,
rd_kafka_name
(
workers
[
i
]
->
kafka
),
nbrokers
,
brokers
);
}
WRK_AddBrokers
((
const
char
*
)
brokers
);
}
}
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment