Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
T
trackrdrd
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
uplex-varnish
trackrdrd
Commits
c24a4d46
Commit
c24a4d46
authored
Jun 16, 2014
by
Geoff Simmons
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
MQ plugin for Kafka: test the partitioner function
parent
888e7ca9
Changes
4
Show whitespace changes
Inline
Side-by-side
Showing
4 changed files
with
118 additions
and
14 deletions
+118
-14
callback.c
trackrdrd/src/mq/kafka/callback.c
+30
-12
mq_kafka.h
trackrdrd/src/mq/kafka/mq_kafka.h
+2
-0
Makefile.am
trackrdrd/src/mq/kafka/test/Makefile.am
+12
-2
test_partition.c
trackrdrd/src/mq/kafka/test/test_partition.c
+74
-0
No files found.
trackrdrd/src/mq/kafka/callback.c
View file @
c24a4d46
...
@@ -35,6 +35,7 @@
...
@@ -35,6 +35,7 @@
#include <stdlib.h>
#include <stdlib.h>
#include <errno.h>
#include <errno.h>
#include <syslog.h>
#include <syslog.h>
#include <stdint.h>
#include "mq_kafka.h"
#include "mq_kafka.h"
#include "miniobj.h"
#include "miniobj.h"
...
@@ -43,15 +44,12 @@
...
@@ -43,15 +44,12 @@
* Partitioner assumes that the key string is an unsigned 32-bit
* Partitioner assumes that the key string is an unsigned 32-bit
* hexadecimal.
* hexadecimal.
*/
*/
int32_t
static
inline
int32_t
CB_Partitioner
(
const
rd_kafka_topic_t
*
rkt
,
const
void
*
keydata
,
size_t
keylen
,
get_partition
(
const
void
*
keydata
,
size_t
keylen
,
int32_t
partition_cnt
)
int32_t
partition_cnt
,
void
*
rkt_opaque
,
void
*
msg_opaque
)
{
{
int32_t
partition
;
int32_t
partition
;
unsigned
long
key
;
unsigned
long
key
;
char
keystr
[
sizeof
(
"ffffffff"
)],
*
endptr
=
NULL
;
char
keystr
[
sizeof
(
"ffffffff"
)],
*
endptr
=
NULL
;
(
void
)
rkt_opaque
;
(
void
)
msg_opaque
;
assert
(
partition_cnt
>
0
);
assert
(
partition_cnt
>
0
);
assert
(
keylen
<=
8
);
assert
(
keylen
<=
8
);
...
@@ -60,23 +58,43 @@ CB_Partitioner(const rd_kafka_topic_t *rkt, const void *keydata, size_t keylen,
...
@@ -60,23 +58,43 @@ CB_Partitioner(const rd_kafka_topic_t *rkt, const void *keydata, size_t keylen,
keystr
[
keylen
]
=
'\0'
;
keystr
[
keylen
]
=
'\0'
;
errno
=
0
;
errno
=
0
;
key
=
strtoul
(
keystr
,
&
endptr
,
16
);
key
=
strtoul
(
keystr
,
&
endptr
,
16
);
if
(
errno
!=
0
||
*
endptr
!=
'\0'
||
key
>
0xffffffffUL
)
{
if
(
errno
!=
0
||
*
endptr
!=
'\0'
||
key
<
0
||
key
>
UINT32_MAX
)
MQ_LOG_Log
(
LOG_ERR
,
"Cannot parse partition key: %.*s"
,
(
int
)
keylen
,
return
-
1
;
(
const
char
*
)
keydata
);
return
RD_KAFKA_PARTITION_UA
;
}
if
((
partition_cnt
&
(
partition_cnt
-
1
))
==
0
)
if
((
partition_cnt
&
(
partition_cnt
-
1
))
==
0
)
/* partition_cnt is a power of 2 */
/* partition_cnt is a power of 2 */
partition
=
key
&
(
partition_cnt
-
1
);
partition
=
key
&
(
partition_cnt
-
1
);
else
else
partition
=
key
%
partition_cnt
;
partition
=
key
%
partition_cnt
;
return
partition
;
}
int32_t
TEST_Partition
(
const
void
*
keydata
,
size_t
keylen
,
int32_t
partition_cnt
)
{
return
get_partition
(
keydata
,
keylen
,
partition_cnt
);
}
int32_t
CB_Partitioner
(
const
rd_kafka_topic_t
*
rkt
,
const
void
*
keydata
,
size_t
keylen
,
int32_t
partition_cnt
,
void
*
rkt_opaque
,
void
*
msg_opaque
)
{
int32_t
partition
;
(
void
)
rkt_opaque
;
(
void
)
msg_opaque
;
partition
=
get_partition
(
keydata
,
keylen
,
partition_cnt
);
if
(
partition
<
0
)
{
MQ_LOG_Log
(
LOG_ERR
,
"Cannot parse partition key: %.*s"
,
(
int
)
keylen
,
(
const
char
*
)
keydata
);
return
RD_KAFKA_PARTITION_UA
;
}
if
(
!
rd_kafka_topic_partition_available
(
rkt
,
partition
))
{
if
(
!
rd_kafka_topic_partition_available
(
rkt
,
partition
))
{
MQ_LOG_Log
(
LOG_ERR
,
"Partition %d not available"
,
partition
);
MQ_LOG_Log
(
LOG_ERR
,
"Partition %d not available"
,
partition
);
return
RD_KAFKA_PARTITION_UA
;
return
RD_KAFKA_PARTITION_UA
;
}
}
MQ_LOG_Log
(
LOG_DEBUG
,
"Computed partition %d for key %.*s"
,
partition
,
MQ_LOG_Log
(
LOG_DEBUG
,
(
int
)
keylen
,
(
const
char
*
)
keydata
);
"Computed partition %d for key %.*s (%d partitions)"
,
partition
,
(
int
)
keylen
,
(
const
char
*
)
keydata
,
partition_cnt
);
return
partition
;
return
partition
;
}
}
...
...
trackrdrd/src/mq/kafka/mq_kafka.h
View file @
c24a4d46
...
@@ -95,6 +95,8 @@ void WRK_Fini(kafka_wrk_t *wrk);
...
@@ -95,6 +95,8 @@ void WRK_Fini(kafka_wrk_t *wrk);
int32_t
CB_Partitioner
(
const
rd_kafka_topic_t
*
rkt
,
const
void
*
keydata
,
int32_t
CB_Partitioner
(
const
rd_kafka_topic_t
*
rkt
,
const
void
*
keydata
,
size_t
keylen
,
int32_t
partition_cnt
,
void
*
rkt_opaque
,
size_t
keylen
,
int32_t
partition_cnt
,
void
*
rkt_opaque
,
void
*
msg_opaque
);
void
*
msg_opaque
);
int32_t
TEST_Partition
(
const
void
*
keydata
,
size_t
keylen
,
int32_t
partition_cnt
);
void
CB_Log
(
const
rd_kafka_t
*
rk
,
int
level
,
const
char
*
fac
,
const
char
*
buf
);
void
CB_Log
(
const
rd_kafka_t
*
rk
,
int
level
,
const
char
*
fac
,
const
char
*
buf
);
void
CB_DeliveryReport
(
rd_kafka_t
*
rk
,
void
*
payload
,
size_t
len
,
void
CB_DeliveryReport
(
rd_kafka_t
*
rk
,
void
*
payload
,
size_t
len
,
rd_kafka_resp_err_t
err
,
void
*
opaque
,
void
*
msg_opaque
);
rd_kafka_resp_err_t
err
,
void
*
opaque
,
void
*
msg_opaque
);
...
...
trackrdrd/src/mq/kafka/test/Makefile.am
View file @
c24a4d46
INCLUDES
=
-I
$(top_srcdir)
/include
INCLUDES
=
-I
$(top_srcdir)
/include
TESTS
=
test_kafka
TESTS
=
test_
partition test_
kafka
check_PROGRAMS
=
test_kafka
check_PROGRAMS
=
test_partition test_kafka
test_partition_SOURCES
=
\
../../minunit.h
\
../mq_kafka.h
\
test_partition.c
test_partition_LDADD
=
\
../callback.
$(OBJEXT)
\
../log.
$(OBJEXT)
\
-lrdkafka
test_kafka_SOURCES
=
\
test_kafka_SOURCES
=
\
../../minunit.h
\
../../minunit.h
\
...
...
trackrdrd/src/mq/kafka/test/test_partition.c
0 → 100644
View file @
c24a4d46
/*-
* Copyright (c) 2012-2014 UPLEX Nils Goroll Systemoptimierung
* Copyright (c) 2012-2014 Otto Gmbh & Co KG
* All rights reserved
* Use only with permission
*
* Author: Geoffrey Simmons <geoffrey.simmons@uplex.de>
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL AUTHOR OR CONTRIBUTORS BE LIABLE
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
* OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
* OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
*
*/
#include <stdint.h>
#include "../mq_kafka.h"
#include "../../../test/minunit.h"
int
tests_run
=
0
;
static
char
*
test_partitioner
(
void
)
{
int32_t
partition
;
printf
(
"... testing partitioner function
\n
"
);
partition
=
TEST_Partition
((
const
void
*
)
"5ff1b68d"
,
8
,
4
);
VMASSERT
(
partition
==
1
,
"key 5ff1b68d, expected 1, got %d"
,
partition
);
partition
=
TEST_Partition
((
const
void
*
)
"5f9f78d5"
,
8
,
4
);
VMASSERT
(
partition
==
1
,
"key 5f9f78d5, expected 1, got %d"
,
partition
);
partition
=
TEST_Partition
((
const
void
*
)
"7c735b38"
,
8
,
4
);
VMASSERT
(
partition
==
0
,
"key 7c735b38, expected 1, got %d"
,
partition
);
partition
=
TEST_Partition
((
const
void
*
)
"80ffd2a7"
,
8
,
4
);
VMASSERT
(
partition
==
3
,
"key 80ffd2a7, expected 1, got %d"
,
partition
);
partition
=
TEST_Partition
((
const
void
*
)
"a3d0b3e9"
,
8
,
4
);
VMASSERT
(
partition
==
1
,
"key a3d0b3e9, expected 1, got %d"
,
partition
);
partition
=
TEST_Partition
((
const
void
*
)
"c923ca00"
,
8
,
4
);
VMASSERT
(
partition
==
0
,
"key c923ca00, expected 0, got %d"
,
partition
);
return
NULL
;
}
static
const
char
*
all_tests
(
void
)
{
mu_run_test
(
test_partitioner
);
return
NULL
;
}
TEST_RUNNER
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment