Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
T
trackrdrd
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
uplex-varnish
trackrdrd
Commits
f037540d
Commit
f037540d
authored
Jun 03, 2014
by
Geoff Simmons
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
MQ plugin for Kafka: encapsulate all callbacks
parent
4b573cf5
Changes
3
Hide whitespace changes
Inline
Side-by-side
Showing
3 changed files
with
64 additions
and
58 deletions
+64
-58
callback.c
trackrdrd/src/mq/kafka/callback.c
+55
-0
mq.c
trackrdrd/src/mq/kafka/mq.c
+4
-58
mq_kafka.h
trackrdrd/src/mq/kafka/mq_kafka.h
+5
-0
No files found.
trackrdrd/src/mq/kafka/callback.c
View file @
f037540d
...
@@ -37,6 +37,7 @@
...
@@ -37,6 +37,7 @@
#include <syslog.h>
#include <syslog.h>
#include "mq_kafka.h"
#include "mq_kafka.h"
#include "miniobj.h"
/*
/*
* Partitioner assumes that the key string is an unsigned 32-bit
* Partitioner assumes that the key string is an unsigned 32-bit
...
@@ -78,3 +79,57 @@ CB_Partitioner(const rd_kafka_topic_t *rkt, const void *keydata, size_t keylen,
...
@@ -78,3 +79,57 @@ CB_Partitioner(const rd_kafka_topic_t *rkt, const void *keydata, size_t keylen,
(
int
)
keylen
,
(
const
char
*
)
keydata
);
(
int
)
keylen
,
(
const
char
*
)
keydata
);
return
partition
;
return
partition
;
}
}
void
CB_Log
(
const
rd_kafka_t
*
rk
,
int
level
,
const
char
*
fac
,
const
char
*
buf
)
{
(
void
)
fac
;
MQ_LOG_Log
(
level
,
"rdkafka %s: %s"
,
rd_kafka_name
(
rk
),
buf
);
}
void
CB_DeliveryReport
(
rd_kafka_t
*
rk
,
void
*
payload
,
size_t
len
,
rd_kafka_resp_err_t
err
,
void
*
opaque
,
void
*
msg_opaque
)
{
(
void
)
msg_opaque
;
kafka_wrk_t
*
wrk
=
(
kafka_wrk_t
*
)
opaque
;
CHECK_OBJ_NOTNULL
(
wrk
,
KAFKA_WRK_MAGIC
);
if
(
err
!=
RD_KAFKA_RESP_ERR_NO_ERROR
)
{
MQ_LOG_Log
(
LOG_ERR
,
"Delivery error %d (client ID = %s, msg = [%.*s]): %s"
,
err
,
rd_kafka_name
(
rk
),
(
int
)
len
,
(
char
*
)
payload
,
rd_kafka_err2str
(
err
));
wrk
->
failed
++
;
}
else
{
wrk
->
delivered
++
;
if
(
loglvl
==
LOG_DEBUG
)
MQ_LOG_Log
(
LOG_DEBUG
,
"Delivered (client ID = %s): msg = [%.*s]"
,
rd_kafka_name
(
rk
),
(
int
)
len
,
(
char
*
)
payload
);
}
}
void
CB_Error
(
rd_kafka_t
*
rk
,
int
err
,
const
char
*
reason
,
void
*
opaque
)
{
(
void
)
opaque
;
MQ_LOG_Log
(
LOG_ERR
,
"Client error (ID = %s) %d: %s"
,
rd_kafka_name
(
rk
),
err
,
reason
);
}
int
CB_Stats
(
rd_kafka_t
*
rk
,
char
*
json
,
size_t
json_len
,
void
*
opaque
)
{
kafka_wrk_t
*
wrk
=
(
kafka_wrk_t
*
)
opaque
;
CHECK_OBJ_NOTNULL
(
wrk
,
KAFKA_WRK_MAGIC
);
MQ_LOG_Log
(
LOG_INFO
,
"rdkafka stats (ID = %s): %.*s"
,
rd_kafka_name
(
rk
),
(
int
)
json_len
,
json
);
MQ_LOG_Log
(
LOG_INFO
,
"mq stats (ID = %s): seen=%u produced=%u delivered=%u failed=%u "
"nokey=%u badkey=%u nodata=%u"
,
rd_kafka_name
(
rk
),
wrk
->
seen
,
wrk
->
produced
,
wrk
->
delivered
,
wrk
->
failed
,
wrk
->
nokey
,
wrk
->
badkey
,
wrk
->
nodata
);
return
0
;
}
trackrdrd/src/mq/kafka/mq.c
View file @
f037540d
...
@@ -105,60 +105,6 @@ toggle_debug(int sig)
...
@@ -105,60 +105,6 @@ toggle_debug(int sig)
}
}
}
}
static
void
log_cb
(
const
rd_kafka_t
*
rk
,
int
level
,
const
char
*
fac
,
const
char
*
buf
)
{
(
void
)
fac
;
MQ_LOG_Log
(
level
,
"rdkafka %s: %s"
,
rd_kafka_name
(
rk
),
buf
);
}
static
void
dr_cb
(
rd_kafka_t
*
rk
,
void
*
payload
,
size_t
len
,
rd_kafka_resp_err_t
err
,
void
*
opaque
,
void
*
msg_opaque
)
{
(
void
)
msg_opaque
;
kafka_wrk_t
*
wrk
=
(
kafka_wrk_t
*
)
opaque
;
CHECK_OBJ_NOTNULL
(
wrk
,
KAFKA_WRK_MAGIC
);
if
(
err
!=
RD_KAFKA_RESP_ERR_NO_ERROR
)
{
MQ_LOG_Log
(
LOG_ERR
,
"Delivery error %d (client ID = %s, msg = [%.*s]): %s"
,
err
,
rd_kafka_name
(
rk
),
(
int
)
len
,
(
char
*
)
payload
,
rd_kafka_err2str
(
err
));
wrk
->
failed
++
;
}
else
{
wrk
->
delivered
++
;
if
(
loglvl
==
LOG_DEBUG
)
MQ_LOG_Log
(
LOG_DEBUG
,
"Delivered (client ID = %s): msg = [%.*s]"
,
rd_kafka_name
(
rk
),
(
int
)
len
,
(
char
*
)
payload
);
}
}
static
void
error_cb
(
rd_kafka_t
*
rk
,
int
err
,
const
char
*
reason
,
void
*
opaque
)
{
(
void
)
opaque
;
MQ_LOG_Log
(
LOG_ERR
,
"Client error (ID = %s) %d: %s"
,
rd_kafka_name
(
rk
),
err
,
reason
);
}
static
int
stats_cb
(
rd_kafka_t
*
rk
,
char
*
json
,
size_t
json_len
,
void
*
opaque
)
{
kafka_wrk_t
*
wrk
=
(
kafka_wrk_t
*
)
opaque
;
CHECK_OBJ_NOTNULL
(
wrk
,
KAFKA_WRK_MAGIC
);
MQ_LOG_Log
(
LOG_INFO
,
"rdkafka stats (ID = %s): %.*s"
,
rd_kafka_name
(
rk
),
(
int
)
json_len
,
json
);
MQ_LOG_Log
(
LOG_INFO
,
"mq stats (ID = %s): seen=%u produced=%u delivered=%u failed=%u "
"nokey=%u badkey=%u nodata=%u"
,
rd_kafka_name
(
rk
),
wrk
->
seen
,
wrk
->
produced
,
wrk
->
delivered
,
wrk
->
failed
,
wrk
->
nokey
,
wrk
->
badkey
,
wrk
->
nodata
);
return
0
;
}
static
int
static
int
conf_add
(
const
char
*
lval
,
const
char
*
rval
)
conf_add
(
const
char
*
lval
,
const
char
*
rval
)
{
{
...
@@ -328,10 +274,10 @@ MQ_GlobalInit(unsigned nworkers, const char *config_fname)
...
@@ -328,10 +274,10 @@ MQ_GlobalInit(unsigned nworkers, const char *config_fname)
}
}
}
}
rd_kafka_conf_set_dr_cb
(
conf
,
dr_cb
);
rd_kafka_conf_set_dr_cb
(
conf
,
CB_DeliveryReport
);
rd_kafka_conf_set_error_cb
(
conf
,
error_cb
);
rd_kafka_conf_set_error_cb
(
conf
,
CB_Error
);
rd_kafka_conf_set_log_cb
(
conf
,
log_cb
);
rd_kafka_conf_set_log_cb
(
conf
,
CB_Log
);
rd_kafka_conf_set_stats_cb
(
conf
,
stats_cb
);
rd_kafka_conf_set_stats_cb
(
conf
,
CB_Stats
);
rd_kafka_topic_conf_set_partitioner_cb
(
topic_conf
,
CB_Partitioner
);
rd_kafka_topic_conf_set_partitioner_cb
(
topic_conf
,
CB_Partitioner
);
if
(
loglvl
==
LOG_DEBUG
)
{
if
(
loglvl
==
LOG_DEBUG
)
{
...
...
trackrdrd/src/mq/kafka/mq_kafka.h
View file @
f037540d
...
@@ -87,3 +87,8 @@ void WRK_Fini(kafka_wrk_t *wrk);
...
@@ -87,3 +87,8 @@ void WRK_Fini(kafka_wrk_t *wrk);
int32_t
CB_Partitioner
(
const
rd_kafka_topic_t
*
rkt
,
const
void
*
keydata
,
int32_t
CB_Partitioner
(
const
rd_kafka_topic_t
*
rkt
,
const
void
*
keydata
,
size_t
keylen
,
int32_t
partition_cnt
,
void
*
rkt_opaque
,
size_t
keylen
,
int32_t
partition_cnt
,
void
*
rkt_opaque
,
void
*
msg_opaque
);
void
*
msg_opaque
);
void
CB_Log
(
const
rd_kafka_t
*
rk
,
int
level
,
const
char
*
fac
,
const
char
*
buf
);
void
CB_DeliveryReport
(
rd_kafka_t
*
rk
,
void
*
payload
,
size_t
len
,
rd_kafka_resp_err_t
err
,
void
*
opaque
,
void
*
msg_opaque
);
void
CB_Error
(
rd_kafka_t
*
rk
,
int
err
,
const
char
*
reason
,
void
*
opaque
);
int
CB_Stats
(
rd_kafka_t
*
rk
,
char
*
json
,
size_t
json_len
,
void
*
opaque
);
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment