Commit 1ae2cd6e authored by Geoff Simmons's avatar Geoff Simmons

trackrdrd: integrated ActiveMQ (no worker threads yet)

parent 46c9f3fa
......@@ -20,7 +20,7 @@ cd $WORKSPACE/trackrdrd
./autogen.sh
[[ $? -ne 0 ]] && exit 1
CFLAGS=-m64 ./configure VARNISHSRC=$WORKSPACE/lhotse-varnish-cache
CFLAGS=-m64 CXXFLAGS=-m64 ./configure VARNISHSRC=$WORKSPACE/lhotse-varnish-cache
[[ $? -ne 0 ]] && exit 1
make clean
......
......@@ -12,11 +12,13 @@ AM_INIT_AUTOMAKE([foreign])
AC_GNU_SOURCE
AC_PROG_CC
AC_PROG_CXX
AC_PROG_CC_STDC
if test "x$ac_cv_prog_cc_c99" = xno; then
AC_MSG_ERROR([Could not find a C99 compatible compiler])
fi
AC_PROG_CPP
AC_PROG_CXXCPP
AC_PROG_INSTALL
AC_PROG_LIBTOOL
......@@ -47,6 +49,8 @@ AC_CHECK_FILE([$VARNISHSRC/include/varnishapi.h],
[AC_MSG_FAILURE(["$VARNISHSRC" is not a Varnish source directory])]
)
PKG_CHECK_MODULES([AMQ], [activemq-cpp])
# From Varnish configure.ac
# Now that we're done using the compiler to look for functions and
# libraries, set CFLAGS to what we want them to be for our own code
......@@ -54,23 +58,34 @@ AC_CHECK_FILE([$VARNISHSRC/include/varnishapi.h],
# This corresponds to FreeBSD's WARNS level 6
DEVELOPER_CFLAGS="-Wall -Wstrict-prototypes -Wmissing-prototypes -Wpointer-arith -Wreturn-type -Wcast-qual -Wwrite-strings -Wswitch -Wshadow -Wcast-align -Wunused-parameter -Wchar-subscripts -Winline -Wnested-externs -Wredundant-decls -Wformat"
# Added for compiling C++ for ActiveMQ.
# -Wstrict-prototypes, -Wmissing-prototypes & -Wnested-externs invalid for C++
# activemq-cpp fails on -Wshadow
DEVELOPER_CXXFLAGS="-Wall -Wpointer-arith -Wreturn-type -Wcast-qual -Wwrite-strings -Wswitch -Wcast-align -Wunused-parameter -Wchar-subscripts -Winline -Wredundant-decls -Wformat"
# Additional flags for GCC 4
EXTRA_DEVELOPER_CFLAGS="-Wextra -Wno-missing-field-initializers -Wno-sign-compare"
# --enable-developer-warnings
AC_ARG_ENABLE(developer-warnings,
AS_HELP_STRING([--enable-developer-warnings],[enable strict warnings (default is NO)]),
CFLAGS="${CFLAGS} ${DEVELOPER_CFLAGS}")
CFLAGS="${CFLAGS} ${DEVELOPER_CFLAGS}"
CXXFLAGS="${CXXFLAGS} ${DEVELOPER_CXXFLAGS}"
)
# --enable-debugging-symbols
AC_ARG_ENABLE(debugging-symbols,
AS_HELP_STRING([--enable-debugging-symbols],[enable debugging symbols (default is NO)]),
CFLAGS="${CFLAGS} -O0 -g -fno-inline")
CFLAGS="${CFLAGS} -O0 -g -fno-inline"
CXXFLAGS="${CXXFLAGS} -O0 -g -fno-inline"
)
# --enable-diagnostics
AC_ARG_ENABLE(diagnostics,
AS_HELP_STRING([--enable-diagnostics],[enable run-time diagnostics (default is NO)]),
CFLAGS="${CFLAGS} -DDIAGNOSTICS")
CFLAGS="${CFLAGS} -DDIAGNOSTICS"
CXXFLAGS="${CXXFLAGS} -DDIAGNOSTICS"
)
# --enable-extra-developer-warnings
AC_ARG_ENABLE(extra-developer-warnings,
......@@ -81,7 +96,9 @@ AC_ARG_ENABLE(extra-developer-warnings,
# --enable-werror
AC_ARG_ENABLE(werror,
AS_HELP_STRING([--enable-werror],[use -Werror (default is NO)]),
CFLAGS="${CFLAGS} -Werror")
CFLAGS="${CFLAGS} -Werror"
CXXFLAGS="${CXXFLAGS} -Werror"
)
AC_CONFIG_FILES([
Makefile
......
INCLUDES = -I$(VARNISHSRC)/include -I$(VARNISHSRC)
INCLUDES = -I$(VARNISHSRC)/include -I$(VARNISHSRC) @AMQ_CFLAGS@
bin_PROGRAMS = trackrdrd
......@@ -11,13 +11,17 @@ trackrdrd_SOURCES = \
log.c \
config.c \
data.c \
monitor.c
monitor.c \
mq.c \
activemq/amq.h \
activemq/amq.cpp
trackrdrd_LDADD = \
$(VARNISHSRC)/lib/libvarnishcompat/libvarnishcompat.la \
$(VARNISHSRC)/lib/libvarnishapi/libvarnishapi.la \
$(VARNISHSRC)/lib/libvarnish/libvarnish.la \
${PTHREAD_LIBS}
${PTHREAD_LIBS} \
@AMQ_LIBS@
BUILT_SOURCES = revision.h usage.h
MAINTAINERCLEANFILES = revision.h usage.h
......
/*-
* Copyright (c) 2012 UPLEX Nils Goroll Systemoptimierung
* Copyright (c) 2012 Otto Gmbh & Co KG
* All rights reserved
* Use only with permission
*
* Author: Geoffrey Simmons <geoffrey.simmons@uplex.de>
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL AUTHOR OR CONTRIBUTORS BE LIABLE
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
* OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
* OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
*
*/
#include "amq.h"
#include <activemq/library/ActiveMQCPP.h>
#include <decaf/lang/exceptions/NullPointerException.h>
#include <cms/IllegalStateException.h>
#define CATCHALL \
catch (CMSException& cex) { \
return cex.what(); \
} \
catch (Throwable& th) { \
return th.what(); \
} \
catch (std::exception& sex) { \
return sex.what(); \
} \
catch (...) { \
return "Unexpected error"; \
}
using namespace std;
using namespace activemq::core;
using namespace cms;
using namespace decaf::lang;
using namespace decaf::lang::exceptions;
ActiveMQConnectionFactory* AMQ_Worker::factory = NULL;
void
AMQ_Worker::initConnectionFactory(const std::string& brokerURI) {
factory = new ActiveMQConnectionFactory(brokerURI);
}
void
AMQ_Worker::shutdown() {
delete factory;
factory = NULL;
}
AMQ_Worker::AMQ_Worker(std::string& qName,
Session::AcknowledgeMode ackMode = Session::AUTO_ACKNOWLEDGE,
int deliveryMode = DeliveryMode::NON_PERSISTENT) {
if (factory == NULL)
throw cms::IllegalStateException("Connection factory not initialized");
connection = factory->createConnection();
connection->start();
session = connection->createSession(ackMode);
queue = session->createQueue(qName);
producer = session->createProducer(queue);
producer->setDeliveryMode(deliveryMode);
msg = session->createTextMessage();
}
AMQ_Worker::~AMQ_Worker() {
if (connection != NULL)
connection->close();
delete producer;
producer = NULL;
delete queue;
queue = NULL;
delete session;
session = NULL;
delete connection;
connection = NULL;
}
/* XXX: Timeout */
void
AMQ_Worker::send(std::string& text) {
if (msg == NULL || producer == NULL)
throw cms::IllegalStateException("Worker fields are NULL");
msg->setText(text);
producer->send(msg);
}
const char *
AMQ_GlobalInit(char *uri)
{
activemq::library::ActiveMQCPP::initializeLibrary();
try {
string brokerURI (uri);
AMQ_Worker::initConnectionFactory(brokerURI);
return NULL;
}
CATCHALL
}
const char *
AMQ_WorkerInit(AMQ_Worker **worker, char *qName)
{
try {
string queueName (qName);
std::auto_ptr<AMQ_Worker> w (new AMQ_Worker(queueName));
*worker = w.release();
return NULL;
}
CATCHALL
}
const char *
AMQ_Send(AMQ_Worker *worker, const char *data, unsigned len)
{
try {
if (worker == NULL)
throw NullPointerException(__FILE__, __LINE__, "AMQ_Worker NULL");
if (data == NULL)
throw IllegalArgumentException(__FILE__, __LINE__, "Data NULL");
string text (data, len);
worker->send(text);
return NULL;
}
CATCHALL
}
const char *
AMQ_WorkerShutdown(AMQ_Worker **worker)
{
try {
delete worker;
return NULL;
}
CATCHALL
}
const char *
AMQ_GlobalShutdown()
{
try {
AMQ_Worker::shutdown();
activemq::library::ActiveMQCPP::shutdownLibrary();
return NULL;
}
CATCHALL
}
/*-
* Copyright (c) 2012 UPLEX Nils Goroll Systemoptimierung
* Copyright (c) 2012 Otto Gmbh & Co KG
* All rights reserved
* Use only with permission
*
* Author: Geoffrey Simmons <geoffrey.simmons@uplex.de>
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL AUTHOR OR CONTRIBUTORS BE LIABLE
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
* OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
* OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
*
*/
#ifndef _AMQ_H
#define _AMQ_H
#ifdef __cplusplus
#include <activemq/core/ActiveMQConnectionFactory.h>
#include <cms/Connection.h>
#include <cms/Session.h>
#include <cms/Queue.h>
#include <cms/MessageProducer.h>
using namespace activemq::core;
using namespace cms;
class AMQ_Worker {
private:
static ActiveMQConnectionFactory* factory;
Connection* connection;
Session* session;
Queue* queue;
MessageProducer* producer;
TextMessage* msg;
AMQ_Worker() {};
public:
static void initConnectionFactory(const std::string& brokerURI);
static void shutdown();
AMQ_Worker(std::string& qName, Session::AcknowledgeMode ackMode,
int deliveryMode);
virtual ~AMQ_Worker();
void send(std::string& text);
};
#else
typedef struct AMQ_Worker AMQ_Worker;
#endif
#ifdef __cplusplus
extern "C" {
#endif
const char *AMQ_GlobalInit(char *uri);
const char *AMQ_WorkerInit(AMQ_Worker **worker, char *qName);
const char *AMQ_Send(AMQ_Worker *worker, const char *data, unsigned len);
const char *AMQ_WorkerShutdown(AMQ_Worker **worker);
const char *AMQ_GlobalShutdown(void);
#ifdef __cplusplus
}
#endif
#endif /* _AMQ_H */
......@@ -106,6 +106,8 @@ CONF_Add(const char *lval, const char *rval)
confString("log.file", log_file);
confString("varnish.bindump", varnish_bindump);
confString("processor.log", processor_log);
confString("mq.uri", mq_uri);
confString("mq.qname", mq_qname);
confUnsigned("maxopen.scale", maxopen_scale);
confUnsigned("maxdata.scale", maxdata_scale);
......@@ -171,6 +173,8 @@ CONF_Init(void)
config.processor_log[0] = '\0';
config.maxopen_scale = 0;
config.maxdata_scale = 0;
config.mq_uri[0] = '\0';
config.mq_qname[0] = '\0';
}
int
......@@ -225,6 +229,7 @@ CONF_ReadFile(const char *file) {
#define confdump(str,val) \
LOG_Log(LOG_DEBUG, "config: " str, (val))
void
CONF_Dump(void)
{
......@@ -238,4 +243,6 @@ CONF_Dump(void)
confdump("processor.log = %s", config.processor_log);
confdump("maxopen.scale = %d", config.maxopen_scale);
confdump("maxdata.scale = %d", config.maxdata_scale);
confdump("mq.uri = %s", config.mq_uri);
confdump("mq.qname = %s", config.mq_qname);
}
/*-
* Copyright (c) 2012 UPLEX Nils Goroll Systemoptimierung
* Copyright (c) 2012 Otto Gmbh & Co KG
* All rights reserved
* Use only with permission
*
* Author: Geoffrey Simmons <geoffrey.simmons@uplex.de>
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL AUTHOR OR CONTRIBUTORS BE LIABLE
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
* OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
* OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
*
*/
#include "trackrdrd.h"
#include "activemq/amq.h"
const char *
MQ_GlobalInit(void)
{
return AMQ_GlobalInit(config.mq_uri);
}
const char *
MQ_WorkerInit(void **priv)
{
return AMQ_WorkerInit((AMQ_Worker **) priv, config.mq_qname);
}
const char *
MQ_Send(void *priv, const char *data, unsigned len)
{
return AMQ_Send((AMQ_Worker *) priv, data, len);
}
const char *
MQ_WorkerShutdown(void **priv)
{
const char *err = AMQ_WorkerShutdown((AMQ_Worker **) priv);
if (err != NULL)
return err;
*priv = NULL;
return NULL;
}
const char *
MQ_GlobalShutdown(void)
{
return AMQ_GlobalShutdown();
}
INCLUDES = -I$(VARNISHSRC)/include -I$(VARNISHSRC)
INCLUDES = -I$(VARNISHSRC)/include -I$(VARNISHSRC) @AMQ_CFLAGS@
TESTS = test_parse test_data regress.sh
TESTS = test_parse test_data test_mq regress.sh
check_PROGRAMS = test_parse test_data
check_PROGRAMS = test_parse test_data test_mq
test_parse_SOURCES = \
minunit.h \
......@@ -20,3 +20,14 @@ test_data_SOURCES = \
test_data_LDADD = \
../data.$(OBJEXT)
test_mq_SOURCES = \
minunit.h \
test_mq.c \
../trackrdrd.h
test_mq_LDADD = \
-lm \
../mq.$(OBJEXT) \
../amq.$(OBJEXT) \
@AMQ_LIBS@
......@@ -13,8 +13,9 @@
echo
echo "TEST: $0"
echo "... testing log output at debug level against a known checksum"
CKSUM=$(../trackrdrd -f varnish.binlog -l - -d -c test.conf | cksum)
if [ "$CKSUM" != '3372973632 229022' ]; then
if [ "$CKSUM" != '1387393550 229074' ]; then
echo "ERROR: Regression test incorrect cksum: $CKSUM"
exit 1
fi
......
/*-
* Copyright (c) 2012 UPLEX Nils Goroll Systemoptimierung
* Copyright (c) 2012 Otto Gmbh & Co KG
* All rights reserved
* Use only with permission
*
* Author: Geoffrey Simmons <geoffrey.simmons@uplex.de>
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL AUTHOR OR CONTRIBUTORS BE LIABLE
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
* OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
* OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
*
*/
#include <string.h>
#include "minunit.h"
#include "../trackrdrd.h"
/* Automake exit code for "skipped" in make check */
#define EXIT_SKIPPED 77
int tests_run = 0;
static char errmsg[BUFSIZ];
void *worker;
/* N.B.: Always run the tests in this order */
static char
*test_global_init(void)
{
const char *err;
printf("... testing MQ global initialization\n");
strcpy(config.mq_uri, "tcp://localhost:61616");
err = MQ_GlobalInit();
sprintf(errmsg, "MQ_GlobalInit: %s", err);
mu_assert(errmsg, err == NULL);
return NULL;
}
static const char
*test_worker_init(void)
{
const char *err;
printf("... test worker init (including connect to ActiveMQ)\n");
strcpy(config.mq_qname, "lhoste/tracking/test");
err = MQ_WorkerInit(&worker);
if (err != NULL && strstr(err, "Connection refused") != NULL) {
printf("Connection refused, ActiveMQ assumed not running\n");
exit(EXIT_SKIPPED);
}
sprintf(errmsg, "MQ_WorkerInit: %s", err);
mu_assert(errmsg, err == NULL);
mu_assert("Worker is NULL after MQ_WorkerInit", worker != NULL);
return NULL;
}
static const char
*test_send(void)
{
const char *err;
printf("... testing message send\n");
mu_assert("MQ_Send: worker is NULL before call", worker != NULL);
err = MQ_Send(worker, "foo bar baz quux", 16);
sprintf(errmsg, "MQ_Send: %s", err);
mu_assert(errmsg, err == NULL);
return NULL;
}
static const char
*test_worker_shutdown(void)
{
const char *err;
printf("... testing worker shutdown\n");
mu_assert("MQ_WorkerShhutdown: worker is NULL before call", worker != NULL);
err = MQ_WorkerShutdown(&worker);
sprintf(errmsg, "MQ_WorkerShutdown: %s", err);
mu_assert(errmsg, err == NULL);
mu_assert("Worker not NULL after shutdown", worker == NULL);
err = MQ_Send(worker, "foo bar baz quux", 16);
mu_assert("No failure on MQ_Send after worker shutdown", err != NULL);
return NULL;
}
static const char
*test_global_shutdown(void)
{
const char *err;
printf("... testing global shutdown\n");
err = MQ_GlobalShutdown();
sprintf(errmsg, "MQ_GlobalShutdown: %s", err);
mu_assert(errmsg, err == NULL);
return NULL;
}
static const char
*all_tests(void)
{
mu_run_test(test_global_init);
mu_run_test(test_worker_init);
mu_run_test(test_send);
mu_run_test(test_worker_shutdown);
mu_run_test(test_global_shutdown);
return NULL;
}
TEST_RUNNER
......@@ -31,6 +31,13 @@
#include <stdio.h>
/* mq.c */
const char *MQ_GlobalInit(void);
const char *MQ_WorkerInit(void **priv);
const char *MQ_Send(void *priv, const char *data, unsigned len);
const char *MQ_WorkerShutdown(void **priv);
const char *MQ_GlobalShutdown(void);
/* data.c */
typedef enum {
DATA_EMPTY = 0,
......@@ -89,6 +96,8 @@ struct config {
char processor_log[BUFSIZ];
unsigned maxopen_scale;
unsigned maxdata_scale;
char mq_uri[BUFSIZ];
char mq_qname[BUFSIZ];
} config;
void CONF_Init(void);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment