Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
T
trackrdrd
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
uplex-varnish
trackrdrd
Commits
b0be32c0
Commit
b0be32c0
authored
Jun 01, 2014
by
Geoff Simmons
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
MQ plugin for Kafka: encapsulate zookeeper code in zookeeper.c
parent
6708c236
Changes
5
Show whitespace changes
Inline
Side-by-side
Showing
5 changed files
with
249 additions
and
108 deletions
+249
-108
Makefile.am
trackrdrd/src/mq/kafka/Makefile.am
+2
-1
mq.c
trackrdrd/src/mq/kafka/mq.c
+18
-107
mq_kafka.h
trackrdrd/src/mq/kafka/mq_kafka.h
+7
-0
Makefile.am
trackrdrd/src/mq/kafka/test/Makefile.am
+1
-0
zookeeper.c
trackrdrd/src/mq/kafka/zookeeper.c
+221
-0
No files found.
trackrdrd/src/mq/kafka/Makefile.am
View file @
b0be32c0
INCLUDES
=
-I
$(
VARNISHSRC)
/include
-I
$(VARNISHSRC)
-I
$(
top_srcdir)
/include
INCLUDES
=
-I
$(top_srcdir)
/include
CURRENT
=
3
CURRENT
=
3
REVISION
=
0
REVISION
=
0
...
@@ -13,6 +13,7 @@ libtrackrdr_kafka_la_SOURCES = \
...
@@ -13,6 +13,7 @@ libtrackrdr_kafka_la_SOURCES = \
mq.c
\
mq.c
\
log.c
\
log.c
\
monitor.c
\
monitor.c
\
zookeeper.c
\
$(top_srcdir)
/src/config_common.c
$(top_srcdir)
/src/config_common.c
libtrackrdr_kafka_la_LIBADD
=
\
libtrackrdr_kafka_la_LIBADD
=
\
...
...
trackrdrd/src/mq/kafka/mq.c
View file @
b0be32c0
/*-
/*-
* Copyright (c) 201
2-201
4 UPLEX Nils Goroll Systemoptimierung
* Copyright (c) 2014 UPLEX Nils Goroll Systemoptimierung
* Copyright (c) 201
2-201
4 Otto Gmbh & Co KG
* Copyright (c) 2014 Otto Gmbh & Co KG
* All rights reserved
* All rights reserved
* Use only with permission
* Use only with permission
*
*
...
@@ -34,12 +34,10 @@
...
@@ -34,12 +34,10 @@
#include <errno.h>
#include <errno.h>
#include <string.h>
#include <string.h>
#include <strings.h>
#include <strings.h>
#include <limits.h>
#include <syslog.h>
#include <syslog.h>
#include <ctype.h>
#include <ctype.h>
#include <signal.h>
#include <signal.h>
#include <zookeeper/zookeeper.h>
#include <zookeeper/zookeeper_version.h>
#include <zookeeper/zookeeper_version.h>
#include <pcre.h>
#include <pcre.h>
...
@@ -68,8 +66,6 @@ static char logpath[PATH_MAX] = "";
...
@@ -68,8 +66,6 @@ static char logpath[PATH_MAX] = "";
static
char
zookeeper
[
LINE_MAX
]
=
""
;
static
char
zookeeper
[
LINE_MAX
]
=
""
;
static
char
brokerlist
[
LINE_MAX
]
=
""
;
static
char
brokerlist
[
LINE_MAX
]
=
""
;
static
char
zoolog
[
PATH_MAX
]
=
""
;
static
char
zoolog
[
PATH_MAX
]
=
""
;
static
FILE
*
zoologf
;
static
zhandle_t
*
zh
;
static
unsigned
zoo_timeout
=
0
;
static
unsigned
zoo_timeout
=
0
;
static
char
topic
[
LINE_MAX
]
=
""
;
static
char
topic
[
LINE_MAX
]
=
""
;
...
@@ -96,16 +92,15 @@ toggle_debug(int sig)
...
@@ -96,16 +92,15 @@ toggle_debug(int sig)
loglvl
=
saved_lvl
;
loglvl
=
saved_lvl
;
debug_toggle
=
0
;
debug_toggle
=
0
;
MQ_LOG_Log
(
LOG_INFO
,
"Debug toggle switched off"
);
MQ_LOG_Log
(
LOG_INFO
,
"Debug toggle switched off"
);
zoo_set_debug_level
(
ZOO_LOG_LEVEL_INFO
);
}
}
else
{
else
{
saved_lvl
=
loglvl
;
saved_lvl
=
loglvl
;
loglvl
=
LOG_DEBUG
;
loglvl
=
LOG_DEBUG
;
debug_toggle
=
1
;
debug_toggle
=
1
;
MQ_LOG_Log
(
LOG_INFO
,
"Debug toggle switched on"
);
MQ_LOG_Log
(
LOG_INFO
,
"Debug toggle switched on"
);
zoo_set_debug_level
(
ZOO_LOG_LEVEL_DEBUG
);
}
}
MQ_LOG_SetLevel
(
loglvl
);
MQ_LOG_SetLevel
(
loglvl
);
MQ_ZOO_SetLogLevel
(
loglvl
);
for
(
int
i
=
0
;
i
<
nwrk
;
i
++
)
for
(
int
i
=
0
;
i
<
nwrk
;
i
++
)
if
(
workers
[
i
]
!=
NULL
)
{
if
(
workers
[
i
]
!=
NULL
)
{
CHECK_OBJ
(
workers
[
i
],
KAFKA_WRK_MAGIC
);
CHECK_OBJ
(
workers
[
i
],
KAFKA_WRK_MAGIC
);
...
@@ -440,14 +435,13 @@ MQ_GlobalInit(unsigned nworkers, const char *config_fname)
...
@@ -440,14 +435,13 @@ MQ_GlobalInit(unsigned nworkers, const char *config_fname)
}
}
if
(
zoolog
[
0
]
!=
'\0'
)
{
if
(
zoolog
[
0
]
!=
'\0'
)
{
zoologf
=
fopen
(
zoolog
,
"a"
);
const
char
*
err
=
MQ_ZOO_SetLog
(
zoolog
);
if
(
zoologf
=
=
NULL
)
{
if
(
err
!
=
NULL
)
{
snprintf
(
errmsg
,
LINE_MAX
,
"Cannot open zookeeper.log %s: %s"
,
snprintf
(
errmsg
,
LINE_MAX
,
"Cannot open zookeeper.log %s: %s"
,
zoolog
,
strerror
(
errno
)
);
zoolog
,
err
);
MQ_LOG_Log
(
LOG_ERR
,
errmsg
);
MQ_LOG_Log
(
LOG_ERR
,
errmsg
);
return
errmsg
;
return
errmsg
;
}
}
zoo_set_log_stream
(
zoologf
);
}
}
if
(
stats_interval
!=
0
)
{
if
(
stats_interval
!=
0
)
{
...
@@ -487,7 +481,7 @@ MQ_GlobalInit(unsigned nworkers, const char *config_fname)
...
@@ -487,7 +481,7 @@ MQ_GlobalInit(unsigned nworkers, const char *config_fname)
MQ_LOG_Log
(
LOG_DEBUG
,
"%s = %s"
,
cfg
[
2
*
i
],
cfg
[
2
*
i
+
1
]);
MQ_LOG_Log
(
LOG_DEBUG
,
"%s = %s"
,
cfg
[
2
*
i
],
cfg
[
2
*
i
+
1
]);
rd_kafka_conf_dump_free
(
cfg
,
cfglen
);
rd_kafka_conf_dump_free
(
cfg
,
cfglen
);
zoo_set_debug_level
(
ZOO_LOG_LEVEL
_DEBUG
);
MQ_ZOO_SetLogLevel
(
LOG
_DEBUG
);
}
}
return
NULL
;
return
NULL
;
...
@@ -501,95 +495,17 @@ MQ_InitConnections(void)
...
@@ -501,95 +495,17 @@ MQ_InitConnections(void)
assert
(
zookeeper
[
0
]
!=
'\0'
||
brokerlist
[
0
]
!=
'\0'
);
assert
(
zookeeper
[
0
]
!=
'\0'
||
brokerlist
[
0
]
!=
'\0'
);
if
(
zookeeper
[
0
]
!=
'\0'
)
{
if
(
zookeeper
[
0
]
!=
'\0'
)
{
struct
String_vector
brokers
;
int
result
;
char
zbrokerlist
[
LINE_MAX
];
char
zbrokerlist
[
LINE_MAX
];
char
*
brokerptr
=
zbrokerlist
;
const
char
*
err
;
const
char
*
pcre_err
;
pcre
*
host_regex
,
*
port_regex
;
/* XXX: set a watcher function; wait for ZOO_CONNECTED_STATE */
errno
=
0
;
zh
=
zookeeper_init
(
zookeeper
,
NULL
,
zoo_timeout
,
0
,
0
,
0
);
if
(
zh
==
NULL
)
{
snprintf
(
errmsg
,
LINE_MAX
,
"Zookeeper init/connect failure: %s"
,
strerror
(
errno
));
MQ_LOG_Log
(
LOG_ERR
,
errmsg
);
return
errmsg
;
}
/* XXX: set watch param to non-zero for watcher callback */
if
((
result
=
zoo_get_children
(
zh
,
"/brokers/ids"
,
0
,
&
brokers
))
!=
ZOK
)
{
snprintf
(
errmsg
,
LINE_MAX
,
"Cannot get broker ids from zookeeper: %s"
,
zerror
(
result
));
MQ_LOG_Log
(
LOG_ERR
,
errmsg
);
return
errmsg
;
}
/* XXX: cache compiled pcre regexen for use with the watcher
* callback */
host_regex
=
pcre_compile
(
"
\"
host
\"\\
s*:
\\
s*
\"
([^
\"
]+)
\"
"
,
0
,
&
pcre_err
,
&
result
,
NULL
);
AN
(
host_regex
);
port_regex
=
pcre_compile
(
"
\"
port
\"\\
s*:
\\
s*(
\\
d+)"
,
0
,
&
pcre_err
,
&
result
,
NULL
);
AN
(
port_regex
);
memset
(
zbrokerlist
,
0
,
LINE_MAX
);
for
(
int
i
=
0
;
i
<
brokers
.
count
;
i
++
)
{
char
path
[
PATH_MAX
],
broker
[
LINE_MAX
];
int
len
=
LINE_MAX
;
snprintf
(
path
,
PATH_MAX
,
"/brokers/ids/%s"
,
brokers
.
data
[
i
]);
/* XXX: set up a watcher */
if
((
result
=
zoo_get
(
zh
,
path
,
0
,
broker
,
&
len
,
NULL
))
!=
ZOK
)
{
snprintf
(
errmsg
,
LINE_MAX
,
"Cannot get config for broker id %s from zookeeper: %s"
,
brokers
.
data
[
i
],
zerror
(
result
));
MQ_LOG_Log
(
LOG_ERR
,
errmsg
);
return
errmsg
;
}
if
(
len
>
0
)
{
int
ovector
[
6
],
r
;
const
char
*
host
=
NULL
,
*
port
=
NULL
;
broker
[
len
]
=
'\0'
;
MQ_LOG_Log
(
LOG_DEBUG
,
"Zookeeper %s broker id %s config: %s"
,
zookeeper
,
brokers
.
data
[
i
],
broker
);
r
=
pcre_exec
(
host_regex
,
NULL
,
broker
,
len
,
0
,
0
,
ovector
,
6
);
if
((
err
=
MQ_ZOO_Init
(
zookeeper
,
zoo_timeout
,
zbrokerlist
,
LINE_MAX
))
if
(
r
<=
PCRE_ERROR_NOMATCH
)
{
!=
NULL
)
{
snprintf
(
errmsg
,
LINE_MAX
,
"Host not found in config for broker id %s from "
"zookeeper [%s]"
,
brokers
.
data
[
i
],
broker
);
MQ_LOG_Log
(
LOG_ERR
,
errmsg
);
return
errmsg
;
}
pcre_get_substring
(
broker
,
ovector
,
r
,
1
,
&
host
);
AN
(
host
);
r
=
pcre_exec
(
port_regex
,
NULL
,
broker
,
len
,
0
,
0
,
ovector
,
6
);
if
(
r
<=
PCRE_ERROR_NOMATCH
)
{
snprintf
(
errmsg
,
LINE_MAX
,
snprintf
(
errmsg
,
LINE_MAX
,
"Port not found in config for broker id %s from "
"Failed to init/connect to zookeeper [%s]: %s"
,
"zookeeper [%s]"
,
brokers
.
data
[
i
],
broke
r
);
zookeeper
,
er
r
);
MQ_LOG_Log
(
LOG_ERR
,
errmsg
);
MQ_LOG_Log
(
LOG_ERR
,
errmsg
);
return
errmsg
;
return
errmsg
;
}
}
pcre_get_substring
(
broker
,
ovector
,
r
,
1
,
&
port
);
AN
(
port
);
sprintf
(
brokerptr
,
"%s:%s"
,
host
,
port
);
pcre_free_substring
(
host
);
pcre_free_substring
(
port
);
brokerptr
+=
strlen
(
brokerptr
);
if
(
i
<
brokers
.
count
)
*
brokerptr
++
=
','
;
}
else
MQ_LOG_Log
(
LOG_WARNING
,
"Empty config returned from zookeeper "
"for broker id %s"
,
brokers
.
data
[
i
]);
}
deallocate_String_vector
(
&
brokers
);
if
(
zbrokerlist
[
0
]
==
'\0'
)
if
(
zbrokerlist
[
0
]
==
'\0'
)
if
(
brokerlist
[
0
]
==
'\0'
)
{
if
(
brokerlist
[
0
]
==
'\0'
)
{
snprintf
(
errmsg
,
LINE_MAX
,
snprintf
(
errmsg
,
LINE_MAX
,
...
@@ -778,7 +694,7 @@ MQ_WorkerShutdown(void **priv)
...
@@ -778,7 +694,7 @@ MQ_WorkerShutdown(void **priv)
const
char
*
const
char
*
MQ_GlobalShutdown
(
void
)
MQ_GlobalShutdown
(
void
)
{
{
int
zerr
;
const
char
*
err
=
NULL
;
MQ_MON_Fini
();
MQ_MON_Fini
();
for
(
int
i
=
0
;
i
<
nwrk
;
i
++
)
for
(
int
i
=
0
;
i
<
nwrk
;
i
++
)
...
@@ -789,20 +705,15 @@ MQ_GlobalShutdown(void)
...
@@ -789,20 +705,15 @@ MQ_GlobalShutdown(void)
rd_kafka_conf_destroy
(
conf
);
rd_kafka_conf_destroy
(
conf
);
rd_kafka_topic_conf_destroy
(
topic_conf
);
rd_kafka_topic_conf_destroy
(
topic_conf
);
errno
=
0
;
err
=
MQ_ZOO_Fini
();
if
((
zerr
=
zookeeper_close
(
zh
))
!=
ZOK
)
{
if
(
err
!=
NULL
)
{
const
char
*
err
=
zerror
(
zerr
);
if
(
zerr
==
ZSYSTEMERROR
)
snprintf
(
errmsg
,
LINE_MAX
,
"Error closing zookeeper: %s (%s)"
,
err
,
strerror
(
errno
));
else
snprintf
(
errmsg
,
LINE_MAX
,
"Error closing zookeeper: %s"
,
err
);
snprintf
(
errmsg
,
LINE_MAX
,
"Error closing zookeeper: %s"
,
err
);
MQ_LOG_Log
(
LOG_ERR
,
errmsg
);
MQ_LOG_Log
(
LOG_ERR
,
errmsg
);
return
errmsg
;
}
}
fclose
(
zoologf
);
MQ_LOG_Log
(
LOG_INFO
,
"shutting down"
);
MQ_LOG_Log
(
LOG_INFO
,
"shutting down"
);
MQ_LOG_Close
();
MQ_LOG_Close
();
if
(
err
!=
NULL
)
return
errmsg
;
return
NULL
;
return
NULL
;
}
}
trackrdrd/src/mq/kafka/mq_kafka.h
View file @
b0be32c0
...
@@ -63,3 +63,10 @@ void MQ_LOG_Close(void);
...
@@ -63,3 +63,10 @@ void MQ_LOG_Close(void);
/* monitor.c */
/* monitor.c */
int
MQ_MON_Init
(
unsigned
interval
);
int
MQ_MON_Init
(
unsigned
interval
);
void
MQ_MON_Fini
(
void
);
void
MQ_MON_Fini
(
void
);
/* zookeeper.c */
const
char
*
MQ_ZOO_Init
(
char
*
zooservers
,
unsigned
timeout
,
char
*
brokerlist
,
int
max
);
const
char
*
MQ_ZOO_SetLog
(
const
char
*
path
);
void
MQ_ZOO_SetLogLevel
(
int
level
);
const
char
*
MQ_ZOO_Fini
(
void
);
trackrdrd/src/mq/kafka/test/Makefile.am
View file @
b0be32c0
...
@@ -14,6 +14,7 @@ test_kafka_LDADD = \
...
@@ -14,6 +14,7 @@ test_kafka_LDADD = \
../mq.
$(OBJEXT)
\
../mq.
$(OBJEXT)
\
../log.
$(OBJEXT)
\
../log.
$(OBJEXT)
\
../monitor.
$(OBJEXT)
\
../monitor.
$(OBJEXT)
\
../zookeeper.
$(OBJEXT)
\
${
PTHREAD_LIBS
}
\
${
PTHREAD_LIBS
}
\
-lrdkafka
-lz
-lpthread
-lrt
-lzookeeper_mt
-lpcre
-lrdkafka
-lz
-lpthread
-lrt
-lzookeeper_mt
-lpcre
...
...
trackrdrd/src/mq/kafka/zookeeper.c
0 → 100644
View file @
b0be32c0
/*-
* Copyright (c) 2014 UPLEX Nils Goroll Systemoptimierung
* Copyright (c) 2014 Otto Gmbh & Co KG
* All rights reserved
* Use only with permission
*
* Author: Geoffrey Simmons <geoffrey.simmons@uplex.de>
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL AUTHOR OR CONTRIBUTORS BE LIABLE
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
* OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
* OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
*
*/
/*
* Encapsulate interaction of the Kafka MQ plugin with Apache ZooKeeper
* servers
*/
#include <stdio.h>
#include <errno.h>
#include <string.h>
#include <syslog.h>
#include <zookeeper/zookeeper.h>
#include <pcre.h>
#include "mq_kafka.h"
static
zhandle_t
*
zh
=
NULL
;
static
pcre
*
host_regex
=
NULL
,
*
port_regex
=
NULL
;
static
char
errmsg
[
LINE_MAX
];
static
FILE
*
zoologf
=
NULL
;
static
const
char
*
setBrokerList
(
char
*
brokerlist
,
int
max
)
{
struct
String_vector
brokers
;
int
result
;
char
*
brokerptr
=
brokerlist
;
const
char
*
pcre_err
;
AN
(
zh
);
/* XXX: set watch param to non-zero for watcher callback */
if
((
result
=
zoo_get_children
(
zh
,
"/brokers/ids"
,
0
,
&
brokers
))
!=
ZOK
)
{
snprintf
(
errmsg
,
LINE_MAX
,
"Cannot get broker ids from zookeeper: %s"
,
zerror
(
result
));
return
errmsg
;
}
if
(
host_regex
==
NULL
)
{
host_regex
=
pcre_compile
(
"
\"
host
\"\\
s*:
\\
s*
\"
([^
\"
]+)
\"
"
,
0
,
&
pcre_err
,
&
result
,
NULL
);
AN
(
host_regex
);
}
if
(
port_regex
==
NULL
)
{
port_regex
=
pcre_compile
(
"
\"
port
\"\\
s*:
\\
s*(
\\
d+)"
,
0
,
&
pcre_err
,
&
result
,
NULL
);
AN
(
port_regex
);
}
memset
(
brokerlist
,
0
,
max
);
for
(
int
i
=
0
;
i
<
brokers
.
count
;
i
++
)
{
char
path
[
PATH_MAX
],
broker
[
LINE_MAX
];
int
len
=
LINE_MAX
;
snprintf
(
path
,
PATH_MAX
,
"/brokers/ids/%s"
,
brokers
.
data
[
i
]);
/* XXX: set up a watcher */
if
((
result
=
zoo_get
(
zh
,
path
,
0
,
broker
,
&
len
,
NULL
))
!=
ZOK
)
{
snprintf
(
errmsg
,
LINE_MAX
,
"Cannot get config for broker id %s: %s"
,
brokers
.
data
[
i
],
zerror
(
result
));
return
errmsg
;
}
if
(
len
>
0
)
{
int
ovector
[
6
],
r
;
const
char
*
host
=
NULL
,
*
port
=
NULL
;
broker
[
len
]
=
'\0'
;
MQ_LOG_Log
(
LOG_DEBUG
,
"Zookeeper broker id %s config: %s"
,
brokers
.
data
[
i
],
broker
);
r
=
pcre_exec
(
host_regex
,
NULL
,
broker
,
len
,
0
,
0
,
ovector
,
6
);
if
(
r
<=
PCRE_ERROR_NOMATCH
)
{
snprintf
(
errmsg
,
LINE_MAX
,
"Host not found in config for broker id %s [%s]"
,
brokers
.
data
[
i
],
broker
);
return
errmsg
;
}
pcre_get_substring
(
broker
,
ovector
,
r
,
1
,
&
host
);
AN
(
host
);
r
=
pcre_exec
(
port_regex
,
NULL
,
broker
,
len
,
0
,
0
,
ovector
,
6
);
if
(
r
<=
PCRE_ERROR_NOMATCH
)
{
snprintf
(
errmsg
,
LINE_MAX
,
"Port not found in config for broker id %s [%s]"
,
brokers
.
data
[
i
],
broker
);
return
errmsg
;
}
pcre_get_substring
(
broker
,
ovector
,
r
,
1
,
&
port
);
AN
(
port
);
if
(
strlen
(
brokerlist
)
+
strlen
(
host
)
+
strlen
(
port
)
+
2
>
max
)
{
snprintf
(
errmsg
,
LINE_MAX
,
"Broker list length exceeds max %d [%s%s:%s]"
,
max
,
brokerlist
,
host
,
port
);
return
errmsg
;
}
sprintf
(
brokerptr
,
"%s:%s"
,
host
,
port
);
pcre_free_substring
(
host
);
pcre_free_substring
(
port
);
brokerptr
+=
strlen
(
brokerptr
);
if
(
i
<
brokers
.
count
-
1
)
*
brokerptr
++
=
','
;
}
else
MQ_LOG_Log
(
LOG_WARNING
,
"Empty config returned for broker id %s"
,
brokers
.
data
[
i
]);
}
deallocate_String_vector
(
&
brokers
);
return
NULL
;
}
const
char
*
MQ_ZOO_Init
(
char
*
zooservers
,
unsigned
timeout
,
char
*
brokerlist
,
int
max
)
{
/* XXX: set a watcher function; wait for ZOO_CONNECTED_STATE */
errno
=
0
;
zh
=
zookeeper_init
(
zooservers
,
NULL
,
timeout
,
0
,
0
,
0
);
if
(
zh
==
NULL
)
{
snprintf
(
errmsg
,
LINE_MAX
,
"init/connect failure: %s"
,
strerror
(
errno
));
return
errmsg
;
}
return
setBrokerList
(
brokerlist
,
max
);
}
const
char
*
MQ_ZOO_SetLog
(
const
char
*
path
)
{
AN
(
path
);
AN
(
path
[
0
]);
zoologf
=
fopen
(
path
,
"a"
);
if
(
zoologf
==
NULL
)
{
strncpy
(
errmsg
,
strerror
(
errno
),
LINE_MAX
);
return
errmsg
;
}
zoo_set_log_stream
(
zoologf
);
return
NULL
;
}
void
MQ_ZOO_SetLogLevel
(
int
level
)
{
if
(
zh
==
NULL
)
return
;
if
(
zoologf
!=
NULL
)
/* level must be a syslog level */
switch
(
level
)
{
case
LOG_INFO
:
case
LOG_NOTICE
:
zoo_set_debug_level
(
ZOO_LOG_LEVEL_INFO
);
break
;
case
LOG_DEBUG
:
zoo_set_debug_level
(
ZOO_LOG_LEVEL_DEBUG
);
break
;
case
LOG_WARNING
:
zoo_set_debug_level
(
ZOO_LOG_LEVEL_WARN
);
break
;
case
LOG_ERR
:
case
LOG_CRIT
:
case
LOG_ALERT
:
case
LOG_EMERG
:
zoo_set_debug_level
(
ZOO_LOG_LEVEL_ERROR
);
break
;
default:
MQ_LOG_Log
(
LOG_ERR
,
"Unknown log level %d"
,
level
);
AN
(
0
);
break
;
}
}
const
char
*
MQ_ZOO_Fini
(
void
)
{
int
zerr
;
if
(
zh
==
NULL
)
return
NULL
;
errno
=
0
;
if
((
zerr
=
zookeeper_close
(
zh
))
!=
ZOK
)
{
const
char
*
err
=
zerror
(
zerr
);
if
(
zerr
==
ZSYSTEMERROR
)
snprintf
(
errmsg
,
LINE_MAX
,
"%s (%s)"
,
err
,
strerror
(
errno
));
else
strncpy
(
errmsg
,
err
,
LINE_MAX
);
return
errmsg
;
}
if
(
zoologf
!=
NULL
)
fclose
(
zoologf
);
return
NULL
;
}
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment