Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
T
trackrdrd
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
uplex-varnish
trackrdrd
Commits
b58f3d1b
Commit
b58f3d1b
authored
Dec 09, 2015
by
Geoff Simmons
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
add the log_error_data config parameter, based on a patch from
Thilo Keber (thx Thilo)
parent
5be64670
Changes
5
Show whitespace changes
Inline
Side-by-side
Showing
5 changed files
with
61 additions
and
12 deletions
+61
-12
README.rst
src/mq/kafka/README.rst
+7
-0
config.c
src/mq/kafka/config.c
+22
-2
mq.c
src/mq/kafka/mq.c
+27
-9
mq_kafka.h
src/mq/kafka/mq_kafka.h
+3
-0
kafka.conf
src/mq/kafka/test/kafka.conf
+2
-1
No files found.
src/mq/kafka/README.rst
View file @
b58f3d1b
...
...
@@ -174,6 +174,13 @@ Parameter Description
indefinitely for message delivery, but don't
wait for rdkafka finalization. (optional,
default 1000 ms)
----------------------------------- --------------------------------------------
``log_error_data`` Boolean. If false, only the error message is
logged for missing or illegal shard keys, or
missing message payloads. If true, the
offending message is also logged (an empty
field in the case of the missing payload).
(optional, default false)
=================================== ============================================
Except as noted below, the configuration can specify any parameters for
...
...
src/mq/kafka/config.c
View file @
b58f3d1b
...
...
@@ -35,6 +35,7 @@
#include <errno.h>
#include <syslog.h>
#include <stdlib.h>
#include <stdbool.h>
#include "mq_kafka.h"
...
...
@@ -70,6 +71,7 @@ CONF_Init(void)
zoolog
[
0
]
=
'\0'
;
brokerlist
[
0
]
=
'\0'
;
wrk_shutdown_timeout
=
1000
;
log_error_data
=
false
;
}
int
...
...
@@ -134,6 +136,23 @@ CONF_Add(const char *lval, const char *rval)
return
EINVAL
;
return
(
0
);
}
if
(
strcmp
(
lval
,
"log_error_data"
)
==
0
)
{
if
(
strcasecmp
(
rval
,
"true"
)
==
0
||
strcasecmp
(
rval
,
"on"
)
==
0
||
strcasecmp
(
rval
,
"yes"
)
==
0
||
strcmp
(
rval
,
"1"
)
==
0
)
{
log_error_data
=
true
;
return
(
0
);
}
if
(
strcasecmp
(
rval
,
"false"
)
==
0
||
strcasecmp
(
rval
,
"off"
)
==
0
||
strcasecmp
(
rval
,
"no"
)
==
0
||
strcmp
(
rval
,
"0"
)
==
0
)
{
log_error_data
=
false
;
return
(
0
);
}
return
(
EINVAL
);
}
result
=
rd_kafka_topic_conf_set
(
topic_conf
,
lval
,
rval
,
errstr
,
LINE_MAX
);
if
(
result
==
RD_KAFKA_CONF_UNKNOWN
)
...
...
@@ -152,6 +171,7 @@ CONF_Dump(void)
MQ_LOG_Log
(
LOG_DEBUG
,
"zookeeper.timeout = %u"
,
zoo_timeout
);
MQ_LOG_Log
(
LOG_DEBUG
,
"zookeeper.log = %s"
,
zoolog
);
MQ_LOG_Log
(
LOG_DEBUG
,
"topic = %s"
,
topic
);
MQ_LOG_Log
(
LOG_DEBUG
,
"worker.shutdown.timeout.ms = %u"
,
wrk_shutdown_timeout
);
MQ_LOG_Log
(
LOG_DEBUG
,
"worker.shutdown.timeout.ms = %u"
,
wrk_shutdown_timeout
);
MQ_LOG_Log
(
LOG_DEBUG
,
"log_error_data = %s"
,
log_error_data
?
"true"
:
"false"
);
}
src/mq/kafka/mq.c
View file @
b58f3d1b
...
...
@@ -293,9 +293,14 @@ MQ_Send(void *priv, const char *data, unsigned len, const char *key,
if
(
key
==
NULL
||
keylen
==
0
)
{
snprintf
(
wrk
->
errmsg
,
LINE_MAX
,
"%s message shard key is missing"
,
rd_kafka_name
(
wrk
->
kafka
));
if
(
log_error_data
)
{
MQ_LOG_Log
(
LOG_ERR
,
"%s: data=[%.*s] key="
,
wrk
->
errmsg
,
len
,
data
);
}
else
{
MQ_LOG_Log
(
LOG_ERR
,
wrk
->
errmsg
);
MQ_LOG_Log
(
LOG_DEBUG
,
"%s data=[%.*s] key="
,
rd_kafka_name
(
wrk
->
kafka
),
len
,
data
);
MQ_LOG_Log
(
LOG_DEBUG
,
"%s data=[%.*s] key="
,
rd_kafka_name
(
wrk
->
kafka
),
len
,
data
);
}
wrk
->
nokey
++
;
*
error
=
wrk
->
errmsg
;
return
1
;
...
...
@@ -303,9 +308,15 @@ MQ_Send(void *priv, const char *data, unsigned len, const char *key,
if
(
data
==
NULL
)
{
snprintf
(
wrk
->
errmsg
,
LINE_MAX
,
"%s message payload is NULL"
,
rd_kafka_name
(
wrk
->
kafka
));
MQ_LOG_Log
(
LOG_DEBUG
,
"%s data= key=[%.*s]"
,
rd_kafka_name
(
wrk
->
kafka
),
keylen
,
key
);
if
(
log_error_data
)
{
MQ_LOG_Log
(
LOG_ERR
,
"%s: data= key=[%.*s]"
,
wrk
->
errmsg
,
keylen
,
key
);
}
else
{
MQ_LOG_Log
(
LOG_ERR
,
wrk
->
errmsg
);
MQ_LOG_Log
(
LOG_DEBUG
,
"%s data= key=[%.*s]"
,
rd_kafka_name
(
wrk
->
kafka
),
keylen
,
key
);
}
wrk
->
nodata
++
;
*
error
=
wrk
->
errmsg
;
return
1
;
...
...
@@ -317,9 +328,16 @@ MQ_Send(void *priv, const char *data, unsigned len, const char *key,
if
(
!
isxdigit
(
key
[
i
]))
{
snprintf
(
wrk
->
errmsg
,
LINE_MAX
,
"%s message shard key is not hex"
,
rd_kafka_name
(
wrk
->
kafka
));
if
(
log_error_data
)
{
MQ_LOG_Log
(
LOG_ERR
,
"%s: data=[%.*s] key=[%.*s]"
,
wrk
->
errmsg
,
len
,
data
,
keylen
,
key
);
}
else
{
MQ_LOG_Log
(
LOG_ERR
,
wrk
->
errmsg
);
MQ_LOG_Log
(
LOG_DEBUG
,
"%s data=[%.*s] key=[%.*s]"
,
rd_kafka_name
(
wrk
->
kafka
),
len
,
data
,
keylen
,
key
);
}
*
error
=
wrk
->
errmsg
;
wrk
->
badkey
++
;
return
1
;
...
...
src/mq/kafka/mq_kafka.h
View file @
b58f3d1b
...
...
@@ -34,6 +34,8 @@
#include <librdkafka/rdkafka.h>
#include <syslog.h>
#define AZ(foo) do { assert((foo) == 0); } while (0)
#define AN(foo) do { assert((foo) != 0); } while (0)
...
...
@@ -66,6 +68,7 @@ char zoolog[PATH_MAX];
unsigned
zoo_timeout
;
unsigned
stats_interval
;
unsigned
wrk_shutdown_timeout
;
unsigned
log_error_data
;
rd_kafka_topic_conf_t
*
topic_conf
;
rd_kafka_conf_t
*
conf
;
...
...
src/mq/kafka/test/kafka.conf
View file @
b58f3d1b
...
...
@@ -4,5 +4,6 @@ zookeeper.connect = localhost:2181
zookeeper
.
connection
.
timeout
.
ms
=
10000
zookeeper
.
log
=
zoo
.
log
topic
=
libtrackrdr_kafka_test
log_level
=
7
log_level
=
3
debug
=
all
log_error_data
=
true
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment