Commit c5118848 authored by Geoff Simmons's avatar Geoff Simmons

MQ plugin for Kafka: test the partitioner function

parent a15d6ab9
......@@ -35,6 +35,7 @@
#include <stdlib.h>
#include <errno.h>
#include <syslog.h>
#include <stdint.h>
#include "mq_kafka.h"
#include "miniobj.h"
......@@ -43,15 +44,12 @@
* Partitioner assumes that the key string is an unsigned 32-bit
* hexadecimal.
*/
int32_t
CB_Partitioner(const rd_kafka_topic_t *rkt, const void *keydata, size_t keylen,
int32_t partition_cnt, void *rkt_opaque, void *msg_opaque)
static inline int32_t
get_partition(const void *keydata, size_t keylen, int32_t partition_cnt)
{
int32_t partition;
unsigned long key;
char keystr[sizeof("ffffffff")], *endptr = NULL;
(void) rkt_opaque;
(void) msg_opaque;
assert(partition_cnt > 0);
assert(keylen <= 8);
......@@ -60,23 +58,43 @@ CB_Partitioner(const rd_kafka_topic_t *rkt, const void *keydata, size_t keylen,
keystr[keylen] = '\0';
errno = 0;
key = strtoul(keystr, &endptr, 16);
if (errno != 0 || *endptr != '\0' || key > 0xffffffffUL) {
MQ_LOG_Log(LOG_ERR, "Cannot parse partition key: %.*s", (int) keylen,
(const char *) keydata);
return RD_KAFKA_PARTITION_UA;
}
if (errno != 0 || *endptr != '\0' || key < 0 || key > UINT32_MAX)
return -1;
if ((partition_cnt & (partition_cnt - 1)) == 0)
/* partition_cnt is a power of 2 */
partition = key & (partition_cnt - 1);
else
partition = key % partition_cnt;
return partition;
}
int32_t
TEST_Partition(const void *keydata, size_t keylen, int32_t partition_cnt)
{
return get_partition(keydata, keylen, partition_cnt);
}
int32_t
CB_Partitioner(const rd_kafka_topic_t *rkt, const void *keydata, size_t keylen,
int32_t partition_cnt, void *rkt_opaque, void *msg_opaque)
{
int32_t partition;
(void) rkt_opaque;
(void) msg_opaque;
partition = get_partition(keydata, keylen, partition_cnt);
if (partition < 0) {
MQ_LOG_Log(LOG_ERR, "Cannot parse partition key: %.*s", (int) keylen,
(const char *) keydata);
return RD_KAFKA_PARTITION_UA;
}
if (! rd_kafka_topic_partition_available(rkt, partition)) {
MQ_LOG_Log(LOG_ERR, "Partition %d not available", partition);
return RD_KAFKA_PARTITION_UA;
}
MQ_LOG_Log(LOG_DEBUG, "Computed partition %d for key %.*s", partition,
(int) keylen, (const char *) keydata);
MQ_LOG_Log(LOG_DEBUG,
"Computed partition %d for key %.*s (%d partitions)",
partition, (int) keylen, (const char *) keydata, partition_cnt);
return partition;
}
......
......@@ -95,6 +95,8 @@ void WRK_Fini(kafka_wrk_t *wrk);
int32_t CB_Partitioner(const rd_kafka_topic_t *rkt, const void *keydata,
size_t keylen, int32_t partition_cnt, void *rkt_opaque,
void *msg_opaque);
int32_t TEST_Partition(const void *keydata, size_t keylen,
int32_t partition_cnt);
void CB_Log(const rd_kafka_t *rk, int level, const char *fac, const char *buf);
void CB_DeliveryReport(rd_kafka_t *rk, void *payload, size_t len,
rd_kafka_resp_err_t err, void *opaque, void *msg_opaque);
......
INCLUDES = -I$(top_srcdir)/include
TESTS = test_kafka
TESTS = test_partition test_kafka
check_PROGRAMS = test_kafka
check_PROGRAMS = test_partition test_kafka
test_partition_SOURCES = \
../../minunit.h \
../mq_kafka.h \
test_partition.c
test_partition_LDADD = \
../callback.$(OBJEXT) \
../log.$(OBJEXT) \
-lrdkafka
test_kafka_SOURCES = \
../../minunit.h \
......
/*-
* Copyright (c) 2012-2014 UPLEX Nils Goroll Systemoptimierung
* Copyright (c) 2012-2014 Otto Gmbh & Co KG
* All rights reserved
* Use only with permission
*
* Author: Geoffrey Simmons <geoffrey.simmons@uplex.de>
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL AUTHOR OR CONTRIBUTORS BE LIABLE
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
* OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
* OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
*
*/
#include <stdint.h>
#include "../mq_kafka.h"
#include "../../../test/minunit.h"
int tests_run = 0;
static char
*test_partitioner(void)
{
int32_t partition;
printf("... testing partitioner function\n");
partition = TEST_Partition((const void *) "5ff1b68d", 8, 4);
VMASSERT(partition == 1, "key 5ff1b68d, expected 1, got %d", partition);
partition = TEST_Partition((const void *) "5f9f78d5", 8, 4);
VMASSERT(partition == 1, "key 5f9f78d5, expected 1, got %d", partition);
partition = TEST_Partition((const void *) "7c735b38", 8, 4);
VMASSERT(partition == 0, "key 7c735b38, expected 1, got %d", partition);
partition = TEST_Partition((const void *) "80ffd2a7", 8, 4);
VMASSERT(partition == 3, "key 80ffd2a7, expected 1, got %d", partition);
partition = TEST_Partition((const void *) "a3d0b3e9", 8, 4);
VMASSERT(partition == 1, "key a3d0b3e9, expected 1, got %d", partition);
partition = TEST_Partition((const void *) "c923ca00", 8, 4);
VMASSERT(partition == 0, "key c923ca00, expected 0, got %d", partition);
return NULL;
}
static const char
*all_tests(void)
{
mu_run_test(test_partitioner);
return NULL;
}
TEST_RUNNER
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment