Commit d49b7092 authored by Geoff Simmons's avatar Geoff Simmons

Add a file implementation for the MQ interface, and use it for the

general 'make check' tests, to test without a message broker running.
Also add --enable/--disable options to configure to optionally build
the MQ implementations.
parent fb0bf89d
......@@ -59,6 +59,7 @@ src/mq/kafka/test/test_send
# Test artifacts
src/test/stderr.txt
src/test/testing.log
src/test/mq_test.log
src/test/trackrdrd.pid
src/test/trackrdrd_001.conf.new
src/test/trackrdrd_002.conf.new
......
......@@ -8,7 +8,7 @@ if KAFKA_OPT
MAYBE_KAFKA = src/mq/kafka src/mq/kafka/test
endif
SUBDIRS = src src/test $(MAYBE_AMQ) $(MAYBE_KAFKA)
SUBDIRS = src src/mq/file src/test $(MAYBE_AMQ) $(MAYBE_KAFKA)
if HAVE_RST2MAN
dist_man_MANS = trackrdrd.3
......
......@@ -9,7 +9,7 @@ Tracking Log Reader demon
-------------------------
:Author: Geoffrey Simmons
:Date: 2015-04-26
:Date: 2015-04-27
:Version: 3.0
:Manual section: 3
......@@ -155,9 +155,11 @@ builds and their dependencies, see libtrackrdr-kafka(3) and
libtrackrdr-activemq(3) (``README.rst`` in ``src/mq/kafka`` and
``src/mq/activemq``).
Since the global make targets for ``trackrdrd`` also build the MQ
implementations, it is necessary to configure the build for them as
well, for example by setting ``CXXFLAGS`` to compile C++ sources.
The global make targets for ``trackrdrd`` also build the MQ
implementations, unless their builds are disabled in the ``configure``
step as explained below. If they are enabled, then it is necessary to
configure the build for them as well, for example by setting
``CXXFLAGS`` to compile C++ sources.
Building and installing trackrdrd
---------------------------------
......@@ -205,8 +207,12 @@ be shown with::
$ configure --help
For example, to specify a non-standard installation prefix, add the
``--prefix`` option::
To disable the build of the Kafka or ActiveMQ MQ implementations,
specify the options ``--disable-kafka`` or ``disable-activemq`` for
``configure``. Both are enabled by default.
To specify a non-standard installation prefix, add the ``--prefix``
option::
$ CFLAGS=-m64 CXXFLAGS=-m64 ./configure \\
VARNISHSRC=/path/to/varnish_build \\
......
......@@ -153,6 +153,7 @@ DX_INIT_DOXYGEN(trackrdrd, doxygen.cfg, doc)
AC_CONFIG_FILES([
Makefile
src/Makefile
src/mq/file/Makefile
src/test/Makefile
src/mq/activemq/Makefile
src/mq/activemq/test/Makefile
......
......@@ -37,7 +37,7 @@
/* Automake exit code for "skipped" in make check */
#define EXIT_SKIPPED 77
#define AMQ_CONFIG "../../../test/activemq.conf"
#define AMQ_CONFIG "activemq.conf"
#define NWORKERS 1
int tests_run = 0;
......
INCLUDES = -I$(top_srcdir)/include
CURRENT = 1
REVISION = 0
AGE = 0
pkglib_LTLIBRARIES = libtrackrdr-file.la
libtrackrdr_file_la_SOURCES = \
$(top_srcdir)/include/mq.h \
$(top_srcdir)/include/config_common.h \
$(top_srcdir)/include/miniobj.h \
$(top_srcdir)/src/config_common.c \
mq.c
libtrackrdr_file_la_LDFLAGS = -version-info ${CURRENT}:${REVISION}:${AGE}
libtrackrdr_file_la_CFLAGS = \
-DCURRENT=${CURRENT} \
-DREVISION=${REVISION} \
-DAGE=${AGE}
if HAVE_RST2MAN
dist_man_MANS = libtrackrdr-file.3
MAINTAINERCLEANFILES = $(dist_man_MANS)
endif
libtrackrdr-file.3: README.rst
if HAVE_RST2MAN
${RST2MAN} README.rst $@
endif
CLEANFILES = *~
.. _ref-trackrdrd:
=================
libtrackrdr-file
=================
-------------------------------------------------------------------
File implementation of the MQ interface for the Tracking Log Reader
-------------------------------------------------------------------
:Author: Geoffrey Simmons
:Date: 2015-04-27
:Version: 1.0.0
:Manual section: 3
DESCRIPTION
===========
``libtrackrdr-file.so`` provides an implementation of the tracking
reader's MQ interface to write messages to a file, as an aid to
testing and debugging. See ``include/mq.h`` in the ``trackrdrd``
source distribution for documentation of the interface.
To use this implementation with ``trackrdrd``, specify the shared
object as the value of ``mq.module`` in the tracking reader's
configuration (see trackrdrd(3)). The configuration value may be the
absolute path of the shared object; or its name, provided that it can
be found by the dynamic linker (see ld.so(8)).
``libtrackrdr-file`` also requires a configuration file, whose path is
specified as ``mq.config_fname`` in the configuration of
``trackrdrd``.
BUILD/INSTALL
=============
The sources for ``libtrackrdr-file`` are provided in the source
repository for ``trackrdrd``, in the subdirectory ``src/mq/file/``.
``libtrackrdr-file`` is built as part of the global build for
``trackrdrd``; for details and requirements of the build, see
trackrdrd(3).
To specifically build the MQ implementation (without building all of
the rest of ``trackrdrd``), it suffices to invoke ``make`` commands in
the subdirectory ``src/mq/file`` (after having executed the
``configure`` script for ``trackrdrd``)::
# in the trackrdrd repo
$ cd src/mq/file
$ make
The global ``make`` command for ``trackrdrd`` also executes both of
these for the file plugin.
To install the shared object ``libtrackrdr-file.so``, run ``make
install`` as root, for example with ``sudo``::
$ sudo make install
In standard configuration, the ``.so`` file will be installed by
``libtool(1)``, and its location may be affected by the ``--libdir``
option to ``configure``.
CONFIGURATION
=============
As mentioned above, a configuration file for ``libtrackrdr-file``
MUST be specified in the configuration parameter ``mq.config_fname``
for ``trackrdrd``, and initialization of the MQ implementation fails
if this file cannot be found or read by the process owner of
``trackrdrd`` (or if its syntax is false, or if required parameters
are missing).
The syntax of the configuration file is the same as that of
``trackrdrd``.
These parameters can be specified:
=================================== ============================================
Parameter Description
=================================== ============================================
``output.file`` The file to which messages are written.
If exactly equal to ``-``, then messages
are written to stdout.
----------------------------------- --------------------------------------------
``append`` If true, then the output file is appended
to, otherwise it is overwritten. True by
default. This parameter has no effect if
output is written to stdout.
=================================== ============================================
The parameter ``output.file`` MUST be set.
SEE ALSO
========
* ``trackrdrd(3)``
* ``ld.so(8)``
COPYRIGHT AND LICENCE
=====================
Both the software and this document are governed by a BSD 2-clause
licence.
| Copyright (c) 2015 UPLEX Nils Goroll Systemoptimierung
| Copyright (c) 2015 Otto Gmbh & Co KG
| All rights reserved
| Use only with permission
| Author: Geoffrey Simmons <geoffrey.simmons@uplex.de>
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions
are met:
1. Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
2. Redistributions in binary form must reproduce the above copyright
notice, this list of conditions and the following disclaimer in the
documentation and/or other materials provided with the distribution.
THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND
ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
ARE DISCLAIMED. IN NO EVENT SHALL AUTHOR OR CONTRIBUTORS BE LIABLE
FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
SUCH DAMAGE.
/*-
* Copyright (c) 2014 UPLEX Nils Goroll Systemoptimierung
* Copyright (c) 2014 Otto Gmbh & Co KG
* All rights reserved
* Use only with permission
*
* Author: Geoffrey Simmons <geoffrey.simmons@uplex.de>
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL AUTHOR OR CONTRIBUTORS BE LIABLE
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
* OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
* OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
*
*/
#include <stdio.h>
#include <errno.h>
#include <string.h>
#include <limits.h>
#include <stdlib.h>
#include <assert.h>
#include "mq.h"
#include "config_common.h"
#include "miniobj.h"
#define xstr(X) #X
#define str(X) xstr(X)
#if defined(CURRENT) && defined(REVISION) && defined(AGE)
#define SO_VERSION (str(CURRENT) "." str(REVISION) "." str(AGE))
#elif defined(VERSION)
#define SO_VERSION VERSION
#else
#define SO_VERSION "unknown version"
#endif
typedef struct wrk_s {
unsigned magic;
#define FILE_WRK_MAGIC 0x50bff5f0
int n;
char errmsg[LINE_MAX];
} wrk_t;
static FILE *out;
static int append = 1;
static unsigned nwrk;
static wrk_t *workers;
static char fname[PATH_MAX + 1] = "";
static char errmsg[LINE_MAX];
static char _version[LINE_MAX];
static int
conf_add(const char *lval, const char *rval)
{
if (strcmp(lval, "output.file") == 0) {
strncpy(fname, rval, PATH_MAX);
return 0;
}
if (strcmp(lval, "append") == 0) {
if (strcasecmp(rval, "yes") == 0
|| strcasecmp(rval, "true") == 0
|| strcmp(rval, "1") == 0) {
append = 1;
return 0;
}
else if (strcasecmp(rval, "no") == 0
|| strcasecmp(rval, "false") == 0
|| strcmp(rval, "0") == 0) {
append = 0;
return 0;
}
return EINVAL;
}
return EINVAL;
}
const char *
MQ_GlobalInit(unsigned nworkers, const char *config_fname)
{
int errnum;
nwrk = nworkers;
if ((errnum = CONF_ReadFile(config_fname, conf_add)) != 0) {
snprintf(errmsg, LINE_MAX,
"Error reading config file %s for the file plugin: %s",
config_fname, strerror(errnum));
return errmsg;
}
if (strcmp(fname, "-") == 0)
out = stdout;
else {
errno = 0;
out = fopen(fname, append ? "a" : "w");
if (out == NULL) {
snprintf(errmsg, LINE_MAX, "Cannot open output file %s: %s",
fname, strerror(errno));
return errmsg;
}
}
snprintf(_version, LINE_MAX, "libtrackrdr-file %s", SO_VERSION);
return NULL;
}
const char *
MQ_InitConnections(void)
{
workers = (wrk_t *) calloc(nwrk, sizeof(wrk_t));
for (int i = 0; i < nwrk; i++)
workers[i].magic = FILE_WRK_MAGIC;
return NULL;
}
const char *
MQ_WorkerInit(void **priv, int wrk_num)
{
wrk_t *wrk;
assert(wrk_num >= 1 && wrk_num <= nwrk);
wrk = &workers[wrk_num - 1];
wrk->n = wrk_num;
*priv = (void *) wrk;
return NULL;
}
int
MQ_Send(void *priv, const char *data, unsigned len, const char *key,
unsigned keylen, const char **error)
{
wrk_t *wrk;
if (priv == NULL) {
*error = "MQ_Send() called with NULL worker object";
return -1;
}
CAST_OBJ(wrk, priv, FILE_WRK_MAGIC);
if (fprintf(out, "key=%.*s: %.*s\n", keylen, key, len, data) < 0) {
snprintf(wrk->errmsg, LINE_MAX, "worker %d: error writing output",
wrk->n);
*error = wrk->errmsg;
return 1;
}
return 0;
}
const char *
MQ_Reconnect(void **priv)
{
wrk_t *wrk;
CAST_OBJ_NOTNULL(wrk, *priv, FILE_WRK_MAGIC);
assert(wrk->n > 0 && wrk->n <= nwrk);
return NULL;
}
const char *
MQ_Version(void *priv, char *version)
{
(void) priv;
strcpy(version, _version);
return NULL;
}
const char *
MQ_ClientID(void *priv, char *clientID)
{
wrk_t *wrk;
CAST_OBJ_NOTNULL(wrk, priv, FILE_WRK_MAGIC);
sprintf(clientID, "worker %d", wrk->n);
return NULL;
}
const char *
MQ_WorkerShutdown(void **priv, int wrk_num)
{
wrk_t *wrk;
(void) wrk_num;
CAST_OBJ_NOTNULL(wrk, *priv, FILE_WRK_MAGIC);
assert(wrk->n > 0 && wrk->n <= nwrk);
*priv = NULL;
return NULL;
}
const char *
MQ_GlobalShutdown(void)
{
free(workers);
if (out != stdout) {
errno = 0;
if (fclose(out) != 0) {
snprintf(errmsg, LINE_MAX, "Error closing output file %s: %s",
fname, strerror(errno));
return errmsg;
}
}
return NULL;
}
# test config for ActiveMQ with 2 MQ URIs
mq.uri = tcp://localhost:61616?wireFormat.maxInactivityDuration=0
mq.uri = tcp://localhost:61616?connection.sendTimeout=1000&wireFormat.maxInactivityDuration=0
mq.qname = lhoste/tracking/test
# test config for the file MQ plugin
output.file = mq_test.log
append = false
......@@ -22,7 +22,7 @@ CMD="../trackrdrd -D -f varnish.binlog -l - -d -c test.conf"
# the user running it
CKSUM=$( $CMD | sed -e 's/\(initializing\) \(.*\)/\1/' | sed -e 's/\(Running as\) \([a-zA-Z0-9]*\)$/\1/' | grep -v 'Not running as root' | cksum)
if [ "$CKSUM" != '2777461934 234191' ]; then
if [ "$CKSUM" != '422255173 234182' ]; then
echo "ERROR: Regression test incorrect cksum: $CKSUM"
exit 1
fi
......
......@@ -6,5 +6,5 @@ maxopen.scale = 11
maxdone = 1024
monitor.interval = 0
nworkers = 0
mq.module = ../mq/activemq/.libs/libtrackrdr-activemq.so
mq.config_file = activemq.conf
mq.module = ../mq/file/.libs/libtrackrdr-file.so
mq.config_file = file_mq.conf
......@@ -37,11 +37,8 @@
#include "../trackrdrd.h"
#include "vas.h"
/* Automake exit code for "skipped" in make check */
#define EXIT_SKIPPED 77
#define MQ_MODULE "../mq/activemq/.libs/libtrackrdr-activemq.so"
#define MQ_CONFIG "activemq.conf"
#define MQ_MODULE "../mq/file/.libs/libtrackrdr-file.so"
#define MQ_CONFIG "file_mq.conf"
#define NWORKERS 1
......@@ -92,8 +89,7 @@ static char
config.nworkers = NWORKERS;
strcpy(config.mq_config_file, MQ_CONFIG);
err = mqf.global_init(config.nworkers, config.mq_config_file);
sprintf(errmsg, "MQ_GlobalInit: %s", err);
mu_assert(errmsg, err == NULL);
VMASSERT(err == NULL, "MQ_GlobalInit: %s", err);
return NULL;
}
......@@ -106,12 +102,7 @@ static char
printf("... testing MQ connection initialization\n");
err = mqf.init_connections();
if (err != NULL && strstr(err, "Connection refused") != NULL) {
printf("Connection refused, ActiveMQ assumed not running\n");
exit(EXIT_SKIPPED);
}
sprintf(errmsg, "MQ_InitConnections: %s", err);
mu_assert(errmsg, err == NULL);
VMASSERT(err == NULL, "MQ_InitConnections: %s", err);
return NULL;
}
......@@ -124,10 +115,8 @@ static const char
printf("... test worker init\n");
err = mqf.worker_init(&worker, NWORKERS);
sprintf(errmsg, "MQ_WorkerInit: %s", err);
mu_assert(errmsg, err == NULL);
mu_assert("Worker is NULL after MQ_WorkerInit", worker != NULL);
VMASSERT(err == NULL, "MQ_WorkerInit: %s", err);
MASSERT0(worker != NULL, "Worker is NULL after MQ_WorkerInit");
return NULL;
}
......@@ -140,11 +129,10 @@ static const char
printf("... testing version info\n");
mu_assert("MQ_Version: worker is NULL before call", worker != NULL);
MASSERT0(worker != NULL, "MQ_Version: worker is NULL before call");
err = mqf.version(worker, version);
sprintf(errmsg, "MQ_Version: %s", err);
mu_assert(errmsg, err == NULL);
mu_assert("MQ_Version: version is empty", version[0] != '\0');
VMASSERT(err == NULL, "MQ_Version: %s", err);
MASSERT0(version[0] != '\0', "MQ_Version: version is empty");
return NULL;
}
......@@ -157,11 +145,10 @@ static const char
printf("... testing client ID info\n");
mu_assert("MQ_ClientID: worker is NULL before call", worker != NULL);
MASSERT0(worker != NULL, "MQ_ClientID: worker is NULL before call");
err = mqf.client_id(worker, clientID);
sprintf(errmsg, "MQ_Version: %s", err);
mu_assert(errmsg, err == NULL);
mu_assert("MQ_Version: client ID is empty", clientID[0] != '\0');
VMASSERT(err == NULL, "MQ_ClientID: %s", err);
MASSERT0(clientID[0] != '\0', "MQ_ClientID: client ID is empty");
return NULL;
}
......@@ -174,10 +161,9 @@ static const char
printf("... testing message send\n");
mu_assert("MQ_Send: worker is NULL before call", worker != NULL);
MASSERT0(worker != NULL, "MQ_Send: worker is NULL before call");
ret = mqf.send(worker, "foo bar baz quux", 16, "key", 3, &err);
sprintf(errmsg, "MQ_Send: %s", err);
mu_assert(errmsg, ret == 0);
VMASSERT(ret == 0, "MQ_Send: %s", err);
return NULL;
}
......
......@@ -44,13 +44,10 @@
#define debug_print(fmt, ...) \
do { if (DEBUG) fprintf(stderr, fmt, __VA_ARGS__); } while(0)
/* Automake exit code for "skipped" in make check */
#define EXIT_SKIPPED 77
#define NWORKERS 5
#define MQ_MODULE "../mq/activemq/.libs/libtrackrdr-activemq.so"
#define MQ_CONFIG "activemq2.conf"
#define MQ_MODULE "../mq/file/.libs/libtrackrdr-file.so"
#define MQ_CONFIG "file_mq.conf"
int tests_run = 0;
static char errmsg[BUFSIZ];
......@@ -108,10 +105,6 @@ static char
mu_assert(errmsg, error == NULL);
error = mqf.init_connections();
if (error != NULL && strstr(error, "Connection refused") != NULL) {
printf("Connection refused, ActiveMQ assumed not running\n");
exit(EXIT_SKIPPED);
}
sprintf(errmsg, "MQ_InitConnections failed: %s", error);
mu_assert(errmsg, error == NULL);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment