Commit f784f4c3 authored by Geoff Simmons's avatar Geoff Simmons

Add a test program for the MQ/Kafka interface using SSL.

parent 52d08ab8
......@@ -55,6 +55,7 @@ src/mq/activemq/test/test_activemq
src/mq/kafka/test/test_kafka
src/mq/kafka/test/test_partition
src/mq/kafka/test/test_send
src/mq/kafka/test/test_send_ssl
# Test artifacts
src/test/stderr.txt
......@@ -66,6 +67,7 @@ src/test/trackrdrd_001.conf.new
src/test/trackrdrd_002.conf.new
src/test/trackrdrd_003.conf.new
src/mq/kafka/test/kafka.log
src/mq/kafka/test/kafka_ssl.log
src/mq/kafka/test/zoo.log
# make check artifacts
......
......@@ -2,7 +2,7 @@ AM_CPPFLAGS = -I$(top_srcdir)/include
TESTS = test_partition test_kafka
check_PROGRAMS = test_partition test_kafka test_send
check_PROGRAMS = test_partition test_kafka test_send test_send_ssl
test_partition_SOURCES = \
../../minunit.h \
......@@ -48,4 +48,21 @@ test_send_LDADD = \
${PTHREAD_LIBS} \
-lrdkafka -lz -lpthread -lrt -lzookeeper_mt -lpcre
test_send_ssl_SOURCES = \
../../minunit.h \
../../../../include/mq.h \
test_send_ssl.c
test_send_ssl_LDADD = \
../../../config_common.$(OBJEXT) \
../mq.$(OBJEXT) \
../log.$(OBJEXT) \
../monitor.$(OBJEXT) \
../zookeeper.$(OBJEXT) \
../worker.$(OBJEXT) \
../callback.$(OBJEXT) \
../config.$(OBJEXT) \
${PTHREAD_LIBS} \
-lrdkafka -lz -lpthread -lrt -lzookeeper_mt -lpcre
CLEANFILES = kafka.log zoo.log *~
# test config for Kafka MQ plugin using SSL
mq.log = kafka_ssl.log
metadata.broker.list = localhost:9092
topic = tracking
log_level = 3
debug = all
log_error_data = true
# SSL config (s. https://kafka.apache.org/documentation/#security_ssl)
security.protocol=ssl
ssl.key.location=/path/to/client-cert-key
ssl.certificate.location=/path/to/client-cert
ssl.ca.location=/path/to/ca-cert
/*-
* Copyright (c) 2012-2014 UPLEX Nils Goroll Systemoptimierung
* Copyright (c) 2012-2014 Otto Gmbh & Co KG
* All rights reserved
* Use only with permission
*
* Author: Geoffrey Simmons <geoffrey.simmons@uplex.de>
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL AUTHOR OR CONTRIBUTORS BE LIABLE
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
* OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
* OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
*
*/
#include <stdio.h>
#include <string.h>
#include "mq.h"
#include "../../../test/minunit.h"
/* Automake exit code for "skipped" in make check */
#define EXIT_SKIPPED 77
#define KAFKA_CONFIG "kafka_ssl.conf"
#define NWORKERS 20
#define DATAMAX 8192
#define IDLEN 96
#define MAXLEN (DATAMAX + IDLEN + 1)
int tests_run = 0;
void *worker[NWORKERS];
static char
*test_send(void)
{
const char *err;
char *line;
size_t n = MAXLEN;
int wrk_num = 0, i, ret;
printf("... testing message send from stdin\n");
line = malloc(MAXLEN);
MAN(line);
err = MQ_GlobalInit(NWORKERS, KAFKA_CONFIG);
VMASSERT(err == NULL, "MQ_GlobalInit: %s", err);
err = MQ_InitConnections();
if (err != NULL && strstr(err, "connection loss") != NULL) {
printf("No connection, Kafka/Zookeeper assumed not running\n");
exit(EXIT_SKIPPED);
}
VMASSERT(err == NULL, "MQ_InitConnections: %s", err);
for (i = 0; i < NWORKERS; i++) {
err = MQ_WorkerInit(&worker[i], i + 1);
VMASSERT(err == NULL, "MQ_WorkerInit: %s", err);
MASSERT0(worker[i] != NULL, "Worker is NULL after MQ_WorkerInit");
}
while (getline(&line, &n, stdin) > 1) {
char *key, *data;
key = strtok(line, ":");
MAN(key);
data = strtok(NULL, ":");
MAN(data);
VMASSERT(worker[wrk_num] != NULL,
"MQ_Send: worker %d is NULL before call", wrk_num + 1);
ret = MQ_Send(worker[wrk_num], data, strlen(data) - 1, key, 8,
&err);
VMASSERT(ret == 0, "MQ_Send: %s", err);
wrk_num++;
wrk_num %= NWORKERS;
MASSERT(wrk_num < NWORKERS);
}
for (i = 0; i < NWORKERS; i++) {
MASSERT0(worker[i] != NULL, "worker is NULL before shutdown");
err = MQ_WorkerShutdown(&worker[i], i + 1);
VMASSERT(err == NULL, "MQ_WorkerShutdown: %s", err);
MASSERT0(worker[i] == NULL, "Worker not NULL after shutdown");
}
err = MQ_GlobalShutdown();
VMASSERT(err == NULL, "MQ_GlobalShutdown: %s", err);
return NULL;
}
static const char
*all_tests(void)
{
mu_run_test(test_send);
return NULL;
}
TEST_RUNNER
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment