Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
T
trackrdrd
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
uplex-varnish
trackrdrd
Commits
d8609795
Commit
d8609795
authored
Jun 03, 2014
by
Geoff Simmons
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
MQ plugin for Kafka: encapsulate configuration
parent
f037540d
Changes
8
Show whitespace changes
Inline
Side-by-side
Showing
8 changed files
with
209 additions
and
143 deletions
+209
-143
Makefile.am
trackrdrd/src/mq/kafka/Makefile.am
+1
-0
config.c
trackrdrd/src/mq/kafka/config.c
+154
-0
monitor.c
trackrdrd/src/mq/kafka/monitor.c
+2
-0
mq.c
trackrdrd/src/mq/kafka/mq.c
+7
-113
mq_kafka.h
trackrdrd/src/mq/kafka/mq_kafka.h
+18
-6
Makefile.am
trackrdrd/src/mq/kafka/test/Makefile.am
+1
-0
worker.c
trackrdrd/src/mq/kafka/worker.c
+1
-1
zookeeper.c
trackrdrd/src/mq/kafka/zookeeper.c
+25
-23
No files found.
trackrdrd/src/mq/kafka/Makefile.am
View file @
d8609795
...
...
@@ -16,6 +16,7 @@ libtrackrdr_kafka_la_SOURCES = \
zookeeper.c
\
worker.c
\
callback.c
\
config.c
\
$(top_srcdir)
/src/config_common.c
libtrackrdr_kafka_la_LIBADD
=
\
...
...
trackrdrd/src/mq/kafka/config.c
0 → 100644
View file @
d8609795
/*-
* Copyright (c) 2014 UPLEX Nils Goroll Systemoptimierung
* Copyright (c) 2014 Otto Gmbh & Co KG
* All rights reserved
* Use only with permission
*
* Author: Geoffrey Simmons <geoffrey.simmons@uplex.de>
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL AUTHOR OR CONTRIBUTORS BE LIABLE
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
* OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
* OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
*
*/
/* Callbacks used by rdkafka */
#include <string.h>
#include <errno.h>
#include <syslog.h>
#include <stdlib.h>
#include "mq_kafka.h"
void
CONF_Init
(
void
)
{
conf
=
rd_kafka_conf_new
();
topic_conf
=
rd_kafka_topic_conf_new
();
loglvl
=
LOG_INFO
;
topic
[
0
]
=
'\0'
;
logpath
[
0
]
=
'\0'
;
zookeeper
[
0
]
=
'\0'
;
zoo_timeout
=
0
;
stats_interval
=
0
;
zoolog
[
0
]
=
'\0'
;
brokerlist
[
0
]
=
'\0'
;
}
int
CONF_Add
(
const
char
*
lval
,
const
char
*
rval
)
{
rd_kafka_conf_res_t
result
;
char
errstr
[
LINE_MAX
];
errstr
[
0
]
=
'\0'
;
if
(
strcmp
(
lval
,
"mq.log"
)
==
0
)
{
strncpy
(
logpath
,
rval
,
PATH_MAX
);
return
(
0
);
}
if
(
strcmp
(
lval
,
"zookeeper.connect"
)
==
0
)
{
strncpy
(
zookeeper
,
rval
,
LINE_MAX
);
return
(
0
);
}
/* XXX: "zookeeper.connection.timeout.ms", to match Kafka config */
if
(
strcmp
(
lval
,
"zookeeper.timeout"
)
==
0
)
{
char
*
endptr
=
NULL
;
long
val
;
errno
=
0
;
val
=
strtoul
(
rval
,
&
endptr
,
10
);
if
(
errno
!=
0
)
return
errno
;
if
(
*
endptr
!=
'\0'
)
return
EINVAL
;
if
(
val
>
UINT_MAX
)
return
ERANGE
;
zoo_timeout
=
val
;
return
(
0
);
}
if
(
strcmp
(
lval
,
"statistics.interval.ms"
)
==
0
)
{
char
*
endptr
=
NULL
;
long
val
;
errno
=
0
;
val
=
strtoul
(
rval
,
&
endptr
,
10
);
if
(
errno
!=
0
)
return
errno
;
if
(
*
endptr
!=
'\0'
)
return
EINVAL
;
if
(
val
>
UINT_MAX
)
return
ERANGE
;
stats_interval
=
val
;
result
=
rd_kafka_conf_set
(
conf
,
lval
,
rval
,
errstr
,
LINE_MAX
);
if
(
result
!=
RD_KAFKA_CONF_OK
)
return
EINVAL
;
return
(
0
);
}
if
(
strcmp
(
lval
,
"zookeeper.log"
)
==
0
)
{
strncpy
(
zoolog
,
rval
,
PATH_MAX
);
return
(
0
);
}
if
(
strcmp
(
lval
,
"topic"
)
==
0
)
{
strncpy
(
topic
,
rval
,
LINE_MAX
);
return
(
0
);
}
if
(
strcmp
(
lval
,
"metadata.broker.list"
)
==
0
)
{
strncpy
(
brokerlist
,
rval
,
LINE_MAX
);
result
=
rd_kafka_conf_set
(
conf
,
lval
,
rval
,
errstr
,
LINE_MAX
);
if
(
result
!=
RD_KAFKA_CONF_OK
)
return
EINVAL
;
return
(
0
);
}
/* XXX: use the rdkakfka param "log_level" instead */
if
(
strcmp
(
lval
,
"mq.debug"
)
==
0
)
{
if
(
strcmp
(
rval
,
"1"
)
==
0
||
strcasecmp
(
rval
,
"true"
)
==
0
||
strcasecmp
(
rval
,
"yes"
)
==
0
||
strcasecmp
(
rval
,
"on"
)
==
0
)
loglvl
=
LOG_DEBUG
;
else
if
(
strcmp
(
rval
,
"0"
)
!=
0
&&
strcasecmp
(
rval
,
"false"
)
!=
0
&&
strcasecmp
(
rval
,
"no"
)
!=
0
&&
strcasecmp
(
rval
,
"off"
)
!=
0
)
return
EINVAL
;
return
(
0
);
}
result
=
rd_kafka_topic_conf_set
(
topic_conf
,
lval
,
rval
,
errstr
,
LINE_MAX
);
if
(
result
==
RD_KAFKA_CONF_UNKNOWN
)
result
=
rd_kafka_conf_set
(
conf
,
lval
,
rval
,
errstr
,
LINE_MAX
);
if
(
result
!=
RD_KAFKA_CONF_OK
)
return
EINVAL
;
else
return
(
0
);
}
void
CONF_Dump
(
void
)
{
MQ_LOG_Log
(
LOG_DEBUG
,
"mq.log = %s"
,
logpath
);
MQ_LOG_Log
(
LOG_DEBUG
,
"zookeeper.connect = %s"
,
zookeeper
);
MQ_LOG_Log
(
LOG_DEBUG
,
"zookeeper.timeout = %u"
,
zoo_timeout
);
MQ_LOG_Log
(
LOG_DEBUG
,
"zookeeper.log = %s"
,
zoolog
);
MQ_LOG_Log
(
LOG_DEBUG
,
"topic = %s"
,
topic
);
// leaving out mq.debug for now
}
trackrdrd/src/mq/kafka/monitor.c
View file @
d8609795
...
...
@@ -77,6 +77,8 @@ static void
struct
timespec
t
;
unsigned
interval
=
*
((
unsigned
*
)
arg
);
AN
(
interval
);
/* Convert ms -> struct timespec */
t
.
tv_sec
=
(
time_t
)
interval
/
1e3
;
t
.
tv_nsec
=
(
interval
%
(
unsigned
)
1e3
)
*
1e6
;
...
...
trackrdrd/src/mq/kafka/mq.c
View file @
d8609795
...
...
@@ -60,18 +60,6 @@
#define SO_VERSION "unknown version"
#endif
static
char
logpath
[
PATH_MAX
]
=
""
;
static
char
zookeeper
[
LINE_MAX
]
=
""
;
static
char
brokerlist
[
LINE_MAX
]
=
""
;
static
char
zoolog
[
PATH_MAX
]
=
""
;
static
unsigned
zoo_timeout
=
0
;
static
rd_kafka_topic_conf_t
*
topic_conf
;
static
rd_kafka_conf_t
*
conf
;
static
unsigned
stats_interval
=
0
;
static
char
errmsg
[
LINE_MAX
];
static
char
_version
[
LINE_MAX
];
...
...
@@ -105,105 +93,13 @@ toggle_debug(int sig)
}
}
static
int
conf_add
(
const
char
*
lval
,
const
char
*
rval
)
{
rd_kafka_conf_res_t
result
;
char
errstr
[
LINE_MAX
];
errstr
[
0
]
=
'\0'
;
if
(
strcmp
(
lval
,
"mq.log"
)
==
0
)
{
strncpy
(
logpath
,
rval
,
PATH_MAX
);
return
(
0
);
}
if
(
strcmp
(
lval
,
"zookeeper.connect"
)
==
0
)
{
strncpy
(
zookeeper
,
rval
,
LINE_MAX
);
return
(
0
);
}
/* XXX: "zookeeper.connection.timeout.ms", to match Kafka config */
if
(
strcmp
(
lval
,
"zookeeper.timeout"
)
==
0
)
{
char
*
endptr
=
NULL
;
long
val
;
errno
=
0
;
val
=
strtoul
(
rval
,
&
endptr
,
10
);
if
(
errno
!=
0
)
return
errno
;
if
(
*
endptr
!=
'\0'
)
return
EINVAL
;
if
(
val
>
UINT_MAX
)
return
ERANGE
;
zoo_timeout
=
val
;
return
(
0
);
}
if
(
strcmp
(
lval
,
"statistics.interval.ms"
)
==
0
)
{
char
*
endptr
=
NULL
;
long
val
;
errno
=
0
;
val
=
strtoul
(
rval
,
&
endptr
,
10
);
if
(
errno
!=
0
)
return
errno
;
if
(
*
endptr
!=
'\0'
)
return
EINVAL
;
if
(
val
>
UINT_MAX
)
return
ERANGE
;
stats_interval
=
val
;
result
=
rd_kafka_conf_set
(
conf
,
lval
,
rval
,
errstr
,
LINE_MAX
);
if
(
result
!=
RD_KAFKA_CONF_OK
)
return
EINVAL
;
return
(
0
);
}
if
(
strcmp
(
lval
,
"zookeeper.log"
)
==
0
)
{
strncpy
(
zoolog
,
rval
,
PATH_MAX
);
return
(
0
);
}
if
(
strcmp
(
lval
,
"topic"
)
==
0
)
{
strncpy
(
topic
,
rval
,
LINE_MAX
);
return
(
0
);
}
if
(
strcmp
(
lval
,
"metadata.broker.list"
)
==
0
)
{
strncpy
(
brokerlist
,
rval
,
LINE_MAX
);
result
=
rd_kafka_conf_set
(
conf
,
lval
,
rval
,
errstr
,
LINE_MAX
);
if
(
result
!=
RD_KAFKA_CONF_OK
)
return
EINVAL
;
return
(
0
);
}
/* XXX: use the rdkakfka param "log_level" instead */
if
(
strcmp
(
lval
,
"mq.debug"
)
==
0
)
{
if
(
strcmp
(
rval
,
"1"
)
==
0
||
strcasecmp
(
rval
,
"true"
)
==
0
||
strcasecmp
(
rval
,
"yes"
)
==
0
||
strcasecmp
(
rval
,
"on"
)
==
0
)
loglvl
=
LOG_DEBUG
;
else
if
(
strcmp
(
rval
,
"0"
)
!=
0
&&
strcasecmp
(
rval
,
"false"
)
!=
0
&&
strcasecmp
(
rval
,
"no"
)
!=
0
&&
strcasecmp
(
rval
,
"off"
)
!=
0
)
return
EINVAL
;
return
(
0
);
}
result
=
rd_kafka_topic_conf_set
(
topic_conf
,
lval
,
rval
,
errstr
,
LINE_MAX
);
if
(
result
==
RD_KAFKA_CONF_UNKNOWN
)
result
=
rd_kafka_conf_set
(
conf
,
lval
,
rval
,
errstr
,
LINE_MAX
);
if
(
result
!=
RD_KAFKA_CONF_OK
)
return
EINVAL
;
else
return
(
0
);
}
const
char
*
MQ_GlobalInit
(
unsigned
nworkers
,
const
char
*
config_fname
)
{
CONF_Init
();
nwrk
=
nworkers
;
conf
=
rd_kafka_conf_new
();
topic_conf
=
rd_kafka_topic_conf_new
();
loglvl
=
LOG_INFO
;
topic
[
0
]
=
'\0'
;
if
(
CONF_ReadFile
(
config_fname
,
conf_a
dd
)
!=
0
)
if
(
CONF_ReadFile
(
config_fname
,
CONF_A
dd
)
!=
0
)
return
"Error reading config file for Kafka"
;
if
(
logpath
[
0
]
!=
'\0'
)
{
...
...
@@ -255,7 +151,7 @@ MQ_GlobalInit(unsigned nworkers, const char *config_fname)
}
if
(
zoolog
[
0
]
!=
'\0'
)
{
const
char
*
err
=
MQ_ZOO_
SetLog
(
zoolog
);
const
char
*
err
=
MQ_ZOO_
OpenLog
(
);
if
(
err
!=
NULL
)
{
snprintf
(
errmsg
,
LINE_MAX
,
"Cannot open zookeeper.log %s: %s"
,
zoolog
,
err
);
...
...
@@ -285,8 +181,7 @@ MQ_GlobalInit(unsigned nworkers, const char *config_fname)
const
char
**
cfg
;
/* Dump config */
MQ_LOG_Log
(
LOG_DEBUG
,
"zookeeper.connect = %s"
,
zookeeper
);
MQ_LOG_Log
(
LOG_DEBUG
,
"topic = %s"
,
topic
);
CONF_Dump
();
cfg
=
rd_kafka_conf_dump
(
conf
,
&
cfglen
);
if
(
cfg
!=
NULL
&&
cfglen
>
0
)
for
(
int
i
=
0
;
i
<
cfglen
>>
1
;
i
++
)
{
...
...
@@ -318,8 +213,7 @@ MQ_InitConnections(void)
char
zbrokerlist
[
LINE_MAX
];
const
char
*
err
;
if
((
err
=
MQ_ZOO_Init
(
zookeeper
,
zoo_timeout
,
zbrokerlist
,
LINE_MAX
))
!=
NULL
)
{
if
((
err
=
MQ_ZOO_Init
(
zbrokerlist
,
LINE_MAX
))
!=
NULL
)
{
snprintf
(
errmsg
,
LINE_MAX
,
"Failed to init/connect to zookeeper [%s]: %s"
,
zookeeper
,
err
);
...
...
@@ -353,7 +247,7 @@ MQ_InitConnections(void)
}
for
(
int
i
=
0
;
i
<
nwrk
;
i
++
)
{
const
char
*
err
=
WRK_Init
(
i
,
conf
,
topic_conf
);
const
char
*
err
=
WRK_Init
(
i
);
if
(
err
!=
NULL
)
return
err
;
}
...
...
@@ -459,7 +353,7 @@ MQ_Reconnect(void **priv)
assert
(
wrk_num
>=
0
&&
wrk_num
<
nwrk
);
WRK_Fini
(
wrk
);
err
=
WRK_Init
(
wrk_num
,
conf
,
topic_conf
);
err
=
WRK_Init
(
wrk_num
);
if
(
err
!=
NULL
)
return
err
;
*
priv
=
workers
[
wrk_num
];
...
...
trackrdrd/src/mq/kafka/mq_kafka.h
View file @
d8609795
...
...
@@ -56,9 +56,18 @@ typedef struct kafka_wrk {
kafka_wrk_t
**
workers
;
unsigned
nwrk
;
/* configuration */
char
topic
[
LINE_MAX
];
int
loglvl
;
char
logpath
[
PATH_MAX
];
char
zookeeper
[
LINE_MAX
];
char
brokerlist
[
LINE_MAX
];
char
zoolog
[
PATH_MAX
];
unsigned
zoo_timeout
;
unsigned
stats_interval
;
rd_kafka_topic_conf_t
*
topic_conf
;
rd_kafka_conf_t
*
conf
;
/* log.c */
int
MQ_LOG_Open
(
const
char
*
path
);
...
...
@@ -71,15 +80,13 @@ int MQ_MON_Init(unsigned interval);
void
MQ_MON_Fini
(
void
);
/* zookeeper.c */
const
char
*
MQ_ZOO_Init
(
char
*
zooservers
,
unsigned
timeout
,
char
*
brokerlist
,
int
max
);
const
char
*
MQ_ZOO_SetLog
(
const
char
*
path
);
const
char
*
MQ_ZOO_Init
(
char
*
brokers
,
int
max
);
const
char
*
MQ_ZOO_OpenLog
(
void
);
void
MQ_ZOO_SetLogLevel
(
int
level
);
const
char
*
MQ_ZOO_Fini
(
void
);
/* worker.c */
const
char
*
WRK_Init
(
int
wrk_num
,
rd_kafka_conf_t
*
conf
,
rd_kafka_topic_conf_t
*
topic_conf
);
const
char
*
WRK_Init
(
int
wrk_num
);
void
WRK_AddBrokers
(
const
char
*
brokers
);
void
WRK_Fini
(
kafka_wrk_t
*
wrk
);
...
...
@@ -92,3 +99,8 @@ void CB_DeliveryReport(rd_kafka_t *rk, void *payload, size_t len,
rd_kafka_resp_err_t
err
,
void
*
opaque
,
void
*
msg_opaque
);
void
CB_Error
(
rd_kafka_t
*
rk
,
int
err
,
const
char
*
reason
,
void
*
opaque
);
int
CB_Stats
(
rd_kafka_t
*
rk
,
char
*
json
,
size_t
json_len
,
void
*
opaque
);
/* config.c */
void
CONF_Init
(
void
);
int
CONF_Add
(
const
char
*
lval
,
const
char
*
rval
);
void
CONF_Dump
(
void
);
trackrdrd/src/mq/kafka/test/Makefile.am
View file @
d8609795
...
...
@@ -17,6 +17,7 @@ test_kafka_LDADD = \
../zookeeper.
$(OBJEXT)
\
../worker.
$(OBJEXT)
\
../callback.
$(OBJEXT)
\
../config.
$(OBJEXT)
\
${
PTHREAD_LIBS
}
\
-lrdkafka
-lz
-lpthread
-lrt
-lzookeeper_mt
-lpcre
...
...
trackrdrd/src/mq/kafka/worker.c
View file @
d8609795
...
...
@@ -40,7 +40,7 @@
static
char
errmsg
[
LINE_MAX
];
const
char
*
WRK_Init
(
int
wrk_num
,
rd_kafka_conf_t
*
conf
,
rd_kafka_topic_conf_t
*
topic_conf
)
*
WRK_Init
(
int
wrk_num
)
{
char
clientid
[
sizeof
(
"libtrackrdr-kafka-worker-2147483648"
)];
rd_kafka_conf_t
*
wrk_conf
;
...
...
trackrdrd/src/mq/kafka/zookeeper.c
View file @
d8609795
...
...
@@ -57,16 +57,16 @@ static char errmsg[LINE_MAX];
static
FILE
*
zoologf
=
NULL
;
static
const
char
*
setBrokerList
(
char
*
broker
list
,
int
max
)
*
setBrokerList
(
char
*
broker
s
,
int
max
)
{
struct
String_vector
brokers
;
struct
String_vector
broker
_id
s
;
int
result
;
char
*
brokerptr
=
broker
list
;
char
*
brokerptr
=
broker
s
;
const
char
*
pcre_err
;
AN
(
zh
);
if
((
result
=
zoo_get_children
(
zh
,
BROKER_PATH
,
1
,
&
brokers
))
!=
ZOK
)
{
if
((
result
=
zoo_get_children
(
zh
,
BROKER_PATH
,
1
,
&
broker
_id
s
))
!=
ZOK
)
{
snprintf
(
errmsg
,
LINE_MAX
,
"Cannot get broker ids from zookeeper: %s"
,
zerror
(
result
));
return
errmsg
;
...
...
@@ -83,16 +83,16 @@ static const char
AN
(
port_regex
);
}
memset
(
broker
list
,
0
,
max
);
for
(
int
i
=
0
;
i
<
brokers
.
count
;
i
++
)
{
memset
(
broker
s
,
0
,
max
);
for
(
int
i
=
0
;
i
<
broker
_id
s
.
count
;
i
++
)
{
char
path
[
PATH_MAX
],
broker
[
LINE_MAX
];
int
len
=
LINE_MAX
;
snprintf
(
path
,
PATH_MAX
,
"/brokers/ids/%s"
,
brokers
.
data
[
i
]);
snprintf
(
path
,
PATH_MAX
,
"/brokers/ids/%s"
,
broker
_id
s
.
data
[
i
]);
if
((
result
=
zoo_get
(
zh
,
path
,
0
,
broker
,
&
len
,
NULL
))
!=
ZOK
)
{
snprintf
(
errmsg
,
LINE_MAX
,
"Cannot get config for broker id %s: %s"
,
brokers
.
data
[
i
],
zerror
(
result
));
broker
_id
s
.
data
[
i
],
zerror
(
result
));
return
errmsg
;
}
if
(
len
>
0
)
{
...
...
@@ -101,13 +101,13 @@ static const char
broker
[
len
]
=
'\0'
;
MQ_LOG_Log
(
LOG_DEBUG
,
"Zookeeper broker id %s config: %s"
,
brokers
.
data
[
i
],
broker
);
broker
_id
s
.
data
[
i
],
broker
);
r
=
pcre_exec
(
host_regex
,
NULL
,
broker
,
len
,
0
,
0
,
ovector
,
6
);
if
(
r
<=
PCRE_ERROR_NOMATCH
)
{
snprintf
(
errmsg
,
LINE_MAX
,
"Host not found in config for broker id %s [%s]"
,
brokers
.
data
[
i
],
broker
);
broker
_id
s
.
data
[
i
],
broker
);
return
errmsg
;
}
pcre_get_substring
(
broker
,
ovector
,
r
,
1
,
&
host
);
...
...
@@ -116,30 +116,30 @@ static const char
if
(
r
<=
PCRE_ERROR_NOMATCH
)
{
snprintf
(
errmsg
,
LINE_MAX
,
"Port not found in config for broker id %s [%s]"
,
brokers
.
data
[
i
],
broker
);
broker
_id
s
.
data
[
i
],
broker
);
return
errmsg
;
}
pcre_get_substring
(
broker
,
ovector
,
r
,
1
,
&
port
);
AN
(
port
);
if
(
strlen
(
broker
list
)
+
strlen
(
host
)
+
strlen
(
port
)
+
2
>
max
)
{
if
(
strlen
(
broker
s
)
+
strlen
(
host
)
+
strlen
(
port
)
+
2
>
max
)
{
snprintf
(
errmsg
,
LINE_MAX
,
"Broker list length exceeds max %d [%s%s:%s]"
,
max
,
broker
list
,
host
,
port
);
max
,
broker
s
,
host
,
port
);
return
errmsg
;
}
sprintf
(
brokerptr
,
"%s:%s"
,
host
,
port
);
pcre_free_substring
(
host
);
pcre_free_substring
(
port
);
brokerptr
+=
strlen
(
brokerptr
);
if
(
i
<
brokers
.
count
-
1
)
if
(
i
<
broker
_id
s
.
count
-
1
)
*
brokerptr
++
=
','
;
}
else
MQ_LOG_Log
(
LOG_WARNING
,
"Empty config returned for broker id %s"
,
brokers
.
data
[
i
]);
broker
_id
s
.
data
[
i
]);
}
deallocate_String_vector
(
&
brokers
);
deallocate_String_vector
(
&
broker
_id
s
);
return
NULL
;
}
...
...
@@ -167,25 +167,27 @@ watcher(zhandle_t *zzh, int type, int state, const char *path, void *watcherCtx)
}
const
char
*
MQ_ZOO_Init
(
char
*
zooservers
,
unsigned
timeout
,
char
*
brokerlist
,
int
max
)
*
MQ_ZOO_Init
(
char
*
brokers
,
int
max
)
{
AN
(
zookeeper
[
0
]);
/* XXX: wait for ZOO_CONNECTED_STATE */
errno
=
0
;
zh
=
zookeeper_init
(
zoo
servers
,
watcher
,
timeout
,
0
,
0
,
0
);
zh
=
zookeeper_init
(
zoo
keeper
,
watcher
,
zoo_
timeout
,
0
,
0
,
0
);
if
(
zh
==
NULL
)
{
snprintf
(
errmsg
,
LINE_MAX
,
"init/connect failure: %s"
,
strerror
(
errno
));
return
errmsg
;
}
return
setBrokerList
(
broker
list
,
max
);
return
setBrokerList
(
broker
s
,
max
);
}
const
char
*
MQ_ZOO_
SetLog
(
const
char
*
path
)
*
MQ_ZOO_
OpenLog
(
void
)
{
AN
(
path
);
AN
(
path
[
0
]);
AN
(
zoolog
);
AN
(
zoolog
[
0
]);
zoologf
=
fopen
(
path
,
"a"
);
zoologf
=
fopen
(
zoolog
,
"a"
);
if
(
zoologf
==
NULL
)
{
strncpy
(
errmsg
,
strerror
(
errno
),
LINE_MAX
);
return
errmsg
;
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment