Commit 81a9b29f authored by Geoff Simmons's avatar Geoff Simmons

Discontinue support for ActiveMQ.

parent f784f4c3
......@@ -51,7 +51,6 @@ src/test/test_mq
src/test/test_parse
src/test/test_spmcq
src/test/test_worker
src/mq/activemq/test/test_activemq
src/mq/kafka/test/test_kafka
src/mq/kafka/test/test_partition
src/mq/kafka/test/test_send
......@@ -73,8 +72,6 @@ src/mq/kafka/test/zoo.log
# make check artifacts
test-suite.log
/test-driver
src/mq/activemq/test/test_*.log
src/mq/activemq/test/test_*.trs
src/mq/kafka/test/test_*.log
src/mq/kafka/test/test_*.trs
src/test/test_*.log
......
ACLOCAL_AMFLAGS = -I m4
if AMQ_OPT
MAYBE_AMQ = src/mq/activemq src/mq/activemq/test
endif
if KAFKA_OPT
MAYBE_KAFKA = src/mq/kafka src/mq/kafka/test
endif
SUBDIRS = src src/mq/file src/test $(MAYBE_AMQ) $(MAYBE_KAFKA)
SUBDIRS = src src/mq/file src/test $(MAYBE_KAFKA)
if HAVE_RST2MAN
dist_man_MANS = trackrdrd.3
......@@ -28,4 +24,4 @@ include doxygen-include.am
EXTRA_DIST = README.rst autogen.sh m4/* $(DX_CONFIG) doc/html
doc/html: doxygen-run
\ No newline at end of file
doc/html: doxygen-run
......@@ -9,7 +9,7 @@ Tracking Log Reader demon
-------------------------
:Author: Geoffrey Simmons
:Date: 2017-09-08
:Date: 2017-11-14
:Version: trunk
:Manual section: 3
......@@ -29,7 +29,7 @@ DESCRIPTION
The ``trackrdrd`` demon reads from the shared memory log of a running
instance of Varnish, aggregates data logged in a specific format for
requests and ESI subrequests, and forwards the data to a messaging
system, such as ActiveMQ or Kafka.
system, such as Kafka.
``trackrdrd`` reads data from ``VCL_Log`` entries that are displayed
in this format by the ``varnishlog`` tool for client request
......@@ -92,9 +92,9 @@ declared in the MQ interface in ``include/mq.h``. See ``mq.h`` for
documentation of the interface.
The source distribution for ``trackrdrd`` includes implementations of
the MQ interface for Kafka, ActiveMQ and for file output (the latter
for testing and debugging); see libtrackrdr-kafka(3),
libtrackrdr-activemq(3) and libtrackrdr-file(3) for details.
the MQ interface for Kafka and for file output (the latter for testing
and debugging); see libtrackrdr-kafka(3) and libtrackrdr-file(3) for
details.
EXAMPLE
=======
......@@ -235,16 +235,10 @@ Zookeeper (``libzookeeper_mt``)::
https://github.com/edenhill/librdkafka
http://zookeeper.apache.org/
To build the messaging plugin for ActiveMQ (``libtrackrdr-activemq``)
it is neccessary to link with the CMS or ActiveMQ-CPP library
(``libactivemq-cpp``). The sources can be obtained from::
http://activemq.apache.org/cms/
The messaging plugins for Kafka and ActiveMQ are optional, and you can
choose to disable the builds of either or both of them in the
``configure`` step, as explained below. Requirements do not need to be
met for plugins that are not built.
The messaging plugin for Kafka is optional, and you can choose to
disable its build in the ``configure`` step, as explained
below. Requirements do not need to be met for plugins that are not
built.
Building and installing trackrdrd
---------------------------------
......@@ -257,11 +251,6 @@ machine (with the same architecture on which they will run), then they
will likely match by default. When in doubt, set compile-time flags
such as ``CFLAGS=-m64`` for ``gcc``.
For ActiveMQ, the flag ``CXXFLAGS`` should be set similarly to
``CFLAGS``, because C++ code is also compiled (unless you choose to
disable the ActiveMQ plugin). Settings for ``CXXFLAGS`` can be
obtained from ``pkg-config --cflags apr-1``.
At minimum, run these steps::
$ git clone $TRACKRDRD_GIT_URL
......@@ -288,10 +277,10 @@ be shown with::
$ configure --help
To disable the build of the Kafka or ActiveMQ MQ implementations,
specify the options ``--disable-kafka`` or ``disable-activemq`` for
``configure``. Both are enabled by default. A file output plugin,
suitable for testing and debugging, is always built.
To disable the build of the Kafka MQ implementation, specify the
option ``--disable-kafka`` for ``configure``. It is enabled by
default. A file output plugin, suitable for testing and debugging, is
always built.
To specify a non-standard installation prefix, add the ``--prefix``
option::
......@@ -309,8 +298,8 @@ a non-standard location, set these env variables before running
* export PATH=$PREFIX/bin:$PREFIX/sbin:$PATH
``PKG_CONFIG_PATH`` might also have to include pkg-config directories
for other requirements, such as the ActiveMQ C++ libraries, if they
have been installed into non-default locations.
for other requirements, such as the Kafka client library, if they have
been installed into non-default locations.
If the Varnish installation against which ``trackrdrd`` is *run* has a
non-standard location, it is necessary to specify runtime paths to the
......@@ -341,17 +330,15 @@ Building and installing packaged MQ implementations
---------------------------------------------------
The ``trackrdrd`` distribution includes implementations of the MQ
interface for Kafka and ActiveMQ message brokers, as well as the file
output plugin. For details of the builds and their dependencies, see
libtrackrdr-kafka(3), libtrackrdr-activemq(3) and libtrackrdr-file(3)
(``README.rst`` in ``src/mq/kafka``, ``src/mq/activemq`` and
``src/mq/file``).
interface for the Kafka message broker as well as the file output
plugin. For details of the builds and their dependencies, see
libtrackrdr-kafka(3) and libtrackrdr-file(3) (``README.rst`` in
``src/mq/kafka`` and ``src/mq/file``).
The global make targets for ``trackrdrd`` also build the MQ
implementations, unless their builds are disabled in the ``configure``
step as explained above. If they are enabled, then it is necessary to
configure the build for them as well, for example by setting
``CXXFLAGS`` to compile C++ sources.
configure the build for them as well.
STARTUP AND SHUTDOWN
====================
......@@ -821,7 +808,6 @@ SEE ALSO
* ``varnishd(1)``
* ``libtrackrdr-file(3)``
* ``libtrackrdr-kafka(3)``
* ``libtrackrdr-activemq(3)``
* ``ld.so(8)``
* ``syslog(3)``
......
......@@ -16,19 +16,16 @@ LT_INIT
AC_GNU_SOURCE
AC_PROG_CC
AC_PROG_CXX
AC_PROG_CC_STDC
if test "x$ac_cv_prog_cc_c99" = xno; then
AC_MSG_ERROR([Could not find a C99 compatible compiler])
fi
AC_PROG_CPP
AC_PROG_CXXCPP
AX_PTHREAD(,[AC_MSG_ERROR([Could not configure pthreads support])])
LIBS="$PTHREAD_LIBS $LIBS"
CFLAGS="$CFLAGS $PTHREAD_CFLAGS"
CXXFLAGS="$CXXFLAGS -std=c++98 $PTHREAD_CFLAGS"
CC="$PTHREAD_CC"
AC_PROG_INSTALL
......@@ -96,17 +93,6 @@ AS_IF([test "x$enable_kafka" != xno],
AC_MSG_ERROR([librdkafka is required]))
], [])
AC_ARG_ENABLE([activemq], [AS_HELP_STRING([--enable-activemq],
[build the ActiveMQ MQ plugin @<:@default=yes@:>@])],
[], [enable_activemq=yes])
AM_CONDITIONAL([AMQ_OPT], [test "$enable_activemq" = yes])
AS_IF([test "x$enable_activemq" != xno],
[
PKG_CHECK_MODULES([AMQ], [activemq-cpp])
PKG_CHECK_MODULES([APR], [apr-1])
PKG_CHECK_MODULES([APU], [apr-util-1])
], [])
# From Varnish configure.ac
# Now that we're done using the compiler to look for functions and
# libraries, set CFLAGS to what we want them to be for our own code
......@@ -133,27 +119,6 @@ DEVELOPER_CFLAGS=" \
-Wno-sign-compare \
"
# Added for compiling C++ for ActiveMQ.
# -Wstrict-prototypes, -Wmissing-prototypes & -Wnested-externs invalid for C++
# activemq-cpp fails on: -Wshadow, -Wcast-qual,-Woverloaded-virtual and
# -Wunused-parameter
DEVELOPER_CXXFLAGS=" \
-Werror \
-Wall \
-Wno-format-y2k \
-W \
-Wpointer-arith \
-Wreturn-type \
-Wwrite-strings \
-Wswitch \
-Wcast-align \
-Wchar-subscripts \
-Wextra \
-Wno-sign-compare \
-Wno-overloaded-virtual \
-Wno-unused-parameter \
"
# These are not compliable yet
DEVELOPER_GCC_CFLAGS="-Wold-style-definition -Wredundant-decls "
#DEVELOPER_CFLAGS="${DEVELOPER_CFLAGS} ${DEVELOPER_GCC_CFLAGS}"
......@@ -184,17 +149,14 @@ AS_IF([test "x$enable_stack_protector" != xno],
AC_ARG_ENABLE(developer-warnings,
AS_HELP_STRING([--enable-developer-warnings],[enable strict warnings (default is NO)]),
CFLAGS="${CFLAGS} ${DEVELOPER_CFLAGS}"
CXXFLAGS="${CXXFLAGS} ${DEVELOPER_CXXFLAGS}"
)
# --enable-debugging-symbols
AC_ARG_ENABLE(debugging-symbols,
AS_HELP_STRING([--enable-debugging-symbols],[enable debugging symbols (default is NO)]),
CFLAGS="${CFLAGS} -O0 -g -fno-inline"
CXXFLAGS="${CXXFLAGS} -O0 -g -fno-inline"
)
## From activemq-cpp configure.ac
## Execute Doxygen macros
DX_HTML_FEATURE(ON)
DX_CHM_FEATURE(OFF)
......@@ -211,8 +173,6 @@ AC_CONFIG_FILES([
src/Makefile
src/mq/file/Makefile
src/test/Makefile
src/mq/activemq/Makefile
src/mq/activemq/test/Makefile
src/mq/kafka/Makefile
src/mq/kafka/test/Makefile
])
......
AUTOMAKE_OPTIONS = subdir-objects
AM_CPPFLAGS = @VARNISH_CFLAGS@ -I$(top_srcdir)/include \
@AMQ_CFLAGS@ @APR_CFLAGS@ @APU_CFLAGS@
pkglib_LTLIBRARIES = libtrackrdr-activemq.la
libtrackrdr_activemq_la_SOURCES = \
$(top_srcdir)/include/mq.h \
$(top_srcdir)/include/config_common.h \
mq.c \
amq.h \
amq.cpp \
amq_connection.h \
amq_connection.cpp \
$(top_srcdir)/src/config_common.c
libtrackrdr_activemq_la_LIBADD = \
${PTHREAD_LIBS} \
@AMQ_LIBS@ \
@APR_LIBS@ \
@APU_LIBS@
libtrackrdr_activemq_la_LDFLAGS = -version-info 5:0:0
if HAVE_RST2MAN
dist_man_MANS = libtrackrdr-activemq.3
MAINTAINERCLEANFILES = $(dist_man_MANS)
endif
libtrackrdr-activemq.3: README.rst
if HAVE_RST2MAN
${RST2MAN} README.rst $@
endif
.. _ref-trackrdrd:
=====================
libtrackrdr-activemq
=====================
-----------------------------------------------------------------------
ActiveMQ implementation of the MQ interface for the Tracking Log Reader
-----------------------------------------------------------------------
:Author: Geoffrey Simmons
:Date: 2015-04-26
:Version: 3.0
:Manual section: 3
DESCRIPTION
===========
``libtrackrdr-activemq.so`` provides an implementation of the tracking
reader's MQ interface to send messages to ActiveMQ message
brokers. See ``include/mq.h`` in the ``trackrdrd`` source distribution
for documentation of the interface.
To use this implementation with ``trackrdrd``, specify the shared
object as the value of ``mq.module`` in the tracking reader's
configuration (see trackrdrd(3)). The configuration value may be the
absolute path of the shared object; or its name, provided that it can
be found by the dynamic linker (see ld.so(8)).
``libtrackrdr-activemq`` also requires a configuration file, whose
path is specified as ``mq.config_fname`` in the configuration of
``trackrdrd``.
``libtrackrdrd-activemq`` in turn depends on the library ActiveMQ-CPP
(or CMS), a client library for ActiveMQ, packaged on many systems as
``libactivemq-cpp``. The dynamic linker must also be able to find
``libactivemq-cpp.so`` at runtime.
This implementation does not use sharding keys; the key data in the
call to ``MQ_Send()`` are silently discarded.
BUILD/INSTALL
=============
The sources for ``libtrackrdr-activemq`` are provided in the source
repository for ``trackrdrd``, in the subdirectory
``src/mq/activemq/``.
The sources for ActiveMQ-CPP can be obtained from::
http://activemq.apache.org/cms/
Building and installing ActiveMQ-CPP
------------------------------------
The ActiveMQ interface has been tested with versions 3.4.4 through
3.8.2 of ActiveMQ-CPP. If the library ``libactivemq-cpp`` is already
installed on the platform where ``trackrdrd`` will run, then no
further action is necessary. To build the library from source, follow
the instructions in the ``README.txt`` file of its source
distribution.
Building and installing libtrackrdr-activemq
--------------------------------------------
``libactivemq-trackrdr`` is built as part of the global build for
``trackrdrd``; for details and requirements of the build, see
trackrdrd(3). Note that for the ActiveMQ implementation, which
includes C++ classes, it is necessary to configure the build for C++
compilation (for example by setting ``CXXFLAGS``), and it may be
necessary to add additional configuration for compiling and linking
with the ActiveMQ-CPP library, for example as obtained from
``pkg-config --cflags apr-1``.
To specifically build the MQ implementation (without building all of
the rest of ``trackrdrd``), it suffices to invoke ``make`` commands in
the subdirectory ``src/mq/activemq`` (after having executed the
``configure`` script for ``trackrdrd``)::
# in the trackrdrd repo
$ cd src/mq/activemq
$ make
For self-tests after the build, run::
$ make check
If a connection is open at port 61616 on the host where the self-tests
are run, then it is assumed that an ActiveMQ message broker is
listening, and tests are run against the MQ URI
``tcp://localhost:61616``. If the port is not open, then the ``make
check`` test exits with the status ``SKIPPED``.
(When ``make check`` is run globally for the ``trackrdrd`` build, then
it is also run for ``libtrackrdr-activemq``).
To install the shared object ``libtrackrdr-activemq``, run ``make
install`` as root, for example with ``sudo``::
$ sudo make install
In standard configuration, the ``.so`` file will be installed by
``libtool(1)``, and its location may be affected by the ``--libdir``
option to ``configure``.
CONFIGURATION
=============
As mentioned above, a configuration file for ``libtrackrdr-activemq``
must be specified in the configuration parameter ``mq.config_fname``
for ``trackrdrd``, and initialization of the MQ implementation fails
if this file cannot be found or read by the process owner of
``trackrdrd`` (or if its syntax is false, or if either of the two
required parameters are missing).
The syntax of the configuration file is the same as that of
``trackrdrd``. Both of the parameters ``mq.uri`` and ``mq.qname`` are
required (there are no default values), and (only) ``mq.uri`` may be
specified more than once.
================== ============================================================
Parameter Description
================== ============================================================
``mq.uri`` URIs for the message broker. If more than one MQ URI is
specified, than worker threads distribute their connections
to the different message brokers.
------------------ ------------------------------------------------------------
``mq.qname`` Name of the queue (destination) to which messages are sent
at the message broker(s)
================== ============================================================
SEE ALSO
========
* ``trackrdrd(3)``
* ``ld.so(8)``
* http://activemq.apache.org/cms/
COPYRIGHT AND LICENCE
=====================
For both the software and this document are governed by a BSD 2-clause
licence.
| Copyright (c) 2012-2015 UPLEX Nils Goroll Systemoptimierung
| Copyright (c) 2012-2015 Otto Gmbh & Co KG
| All rights reserved
| Use only with permission
| Author: Geoffrey Simmons <geoffrey.simmons@uplex.de>
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions
are met:
1. Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
2. Redistributions in binary form must reproduce the above copyright
notice, this list of conditions and the following disclaimer in the
documentation and/or other materials provided with the distribution.
THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND
ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
ARE DISCLAIMED. IN NO EVENT SHALL AUTHOR OR CONTRIBUTORS BE LIABLE
FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
SUCH DAMAGE.
/*-
* Copyright (c) 2012 UPLEX Nils Goroll Systemoptimierung
* Copyright (c) 2012 Otto Gmbh & Co KG
* All rights reserved
* Use only with permission
*
* Author: Geoffrey Simmons <geoffrey.simmons@uplex.de>
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL AUTHOR OR CONTRIBUTORS BE LIABLE
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
* OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
* OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
*
*/
#include <string.h>
#include "amq.h"
#include <activemq/library/ActiveMQCPP.h>
#include <activemq/core/ActiveMQConnection.h>
#include <decaf/lang/exceptions/NullPointerException.h>
#include <cms/IllegalStateException.h>
#include <cms/ConnectionMetaData.h>
#define CATCHALL \
catch (CMSException& cex) { \
return cex.what(); \
} \
catch (Throwable& th) { \
return th.what(); \
} \
catch (std::exception& sex) { \
return sex.what(); \
} \
catch (...) { \
return "Unexpected error"; \
}
using namespace std;
using namespace activemq::core;
using namespace cms;
using namespace decaf::lang;
using namespace decaf::lang::exceptions;
AMQ_Worker::AMQ_Worker(Connection* cn, std::string& qName, int n,
Session::AcknowledgeMode ackMode = Session::AUTO_ACKNOWLEDGE,
int deliveryMode = DeliveryMode::NON_PERSISTENT) {
connection = cn;
num = n;
session = connection->createSession(ackMode);
queue = session->createQueue(qName);
producer = session->createProducer(queue);
producer->setDeliveryMode(deliveryMode);
msg = session->createTextMessage();
}
AMQ_Worker::~AMQ_Worker() {
if (queue != NULL) {
delete queue;
queue = NULL;
}
if (producer != NULL) {
delete producer;
producer = NULL;
}
if (session != NULL) {
session->stop();
delete session;
session = NULL;
}
}
int
AMQ_Worker::getNum() {
return num;
}
void
AMQ_Worker::send(std::string& text) {
if (msg == NULL || producer == NULL)
throw cms::IllegalStateException("Worker fields are NULL");
msg->setText(text);
producer->send(msg);
}
std::string
AMQ_Worker::getVersion() {
if (connection == NULL)
throw cms::IllegalStateException("Connection uninitialized");
const ConnectionMetaData *md = connection->getMetaData();
return md->getCMSProviderName() + " " + md->getProviderVersion();
}
std::string
AMQ_Worker::getClientID() {
if (connection == NULL)
throw cms::IllegalStateException("Connection uninitialized");
return connection->getClientID();
}
const char *
AMQ_GlobalInit(void)
{
try {
activemq::library::ActiveMQCPP::initializeLibrary();
return NULL;
}
CATCHALL
}
const char *
AMQ_WorkerInit(AMQ_Worker **worker, AMQ_Connection *cn, char *qName, int n)
{
try {
Connection *conn = cn->getConnection();
string queueName (qName);
std::auto_ptr<AMQ_Worker> w (new AMQ_Worker(conn, queueName, n));
*worker = w.release();
return NULL;
}
CATCHALL
}
const char *
AMQ_Send(AMQ_Worker *worker, const char *data, unsigned len)
{
try {
if (worker == NULL)
throw NullPointerException(__FILE__, __LINE__, "AMQ_Worker NULL");
if (data == NULL)
throw IllegalArgumentException(__FILE__, __LINE__, "Data NULL");
string text (data, len);
worker->send(text);
return NULL;
}
CATCHALL
}
const char *
AMQ_Version(AMQ_Worker *worker, char *version, size_t len)
{
try {
strncpy(version, worker->getVersion().c_str(), len);
return NULL;
}
CATCHALL
}
const char *
AMQ_ClientID(AMQ_Worker *worker, char *clientID, size_t len)
{
try {
strncpy(clientID, worker->getClientID().c_str(), len);
return NULL;
}
CATCHALL
}
const char *
AMQ_GetNum(AMQ_Worker *worker, int *n)
{
try {
*n = worker->getNum();
return NULL;
}
CATCHALL
}
const char *
AMQ_WorkerShutdown(AMQ_Worker **worker)
{
try {
delete *worker;
return NULL;
}
CATCHALL
}
const char *
AMQ_GlobalShutdown()
{
try {
activemq::library::ActiveMQCPP::shutdownLibrary();
return NULL;
}
CATCHALL
}
/*-
* Copyright (c) 2012-2014 UPLEX Nils Goroll Systemoptimierung
* Copyright (c) 2012-2014 Otto Gmbh & Co KG
* All rights reserved
* Use only with permission
*
* Author: Geoffrey Simmons <geoffrey.simmons@uplex.de>
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL AUTHOR OR CONTRIBUTORS BE LIABLE
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
* OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
* OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
*
*/
#ifndef _AMQ_H
#define _AMQ_H
#include "amq_connection.h"
#ifdef __cplusplus
#include <activemq/core/ActiveMQConnectionFactory.h>
#include <cms/Connection.h>
#include <cms/Session.h>
#include <cms/Queue.h>
#include <cms/MessageProducer.h>
using namespace activemq::core;
using namespace cms;
class AMQ_Worker {
private:
Connection* connection;
Session* session;
Queue* queue;
MessageProducer* producer;
TextMessage* msg;
int num;
AMQ_Worker() {};
public:
static void shutdown();
AMQ_Worker(Connection* connection, std::string& qName, int n,
Session::AcknowledgeMode ackMode, int deliveryMode);
int getNum();
virtual ~AMQ_Worker();
void send(std::string& text);
std::string getVersion();
std::string getClientID();
};
#else
typedef struct AMQ_Worker AMQ_Worker;
#endif
#ifdef __cplusplus
extern "C" {
#endif
const char *AMQ_GlobalInit(void);
const char *AMQ_WorkerInit(AMQ_Worker **worker, AMQ_Connection *connection,
char *qName, int num);
const char *AMQ_Send(AMQ_Worker *worker, const char *data, unsigned len);
const char *AMQ_Version(AMQ_Worker *worker, char *version, size_t len);
const char *AMQ_ClientID(AMQ_Worker *worker, char *clientID, size_t len);
const char *AMQ_GetNum(AMQ_Worker *worker, int *num);
const char *AMQ_WorkerShutdown(AMQ_Worker **worker);
const char *AMQ_GlobalShutdown(void);
#ifdef __cplusplus
}
#endif
#endif /* _AMQ_H */
/*-
* Copyright (c) 2012 UPLEX Nils Goroll Systemoptimierung
* Copyright (c) 2012 Otto Gmbh & Co KG
* All rights reserved
* Use only with permission
*
* Author: Geoffrey Simmons <geoffrey.simmons@uplex.de>
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL AUTHOR OR CONTRIBUTORS BE LIABLE
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
* OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
* OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
*
*/
#include "amq_connection.h"
#include <decaf/lang/exceptions/NullPointerException.h>
#define CATCHALL \
catch (CMSException& cex) { \
string *msg = new string(cex.getMessage()); \
return msg->c_str(); \
} \
catch (Throwable& th) { \
return th.what(); \
} \
catch (std::exception& sex) { \
return sex.what(); \
} \
catch (...) { \
return "Unexpected error"; \
}
using namespace std;
using namespace activemq::core;
using namespace cms;
using namespace decaf::lang;
using namespace decaf::lang::exceptions;
ActiveMQConnectionFactory* AMQ_Connection::factory = NULL;
AMQ_Connection::AMQ_Connection(std::string& brokerURI) {
if (brokerURI.length() == 0)
throw IllegalArgumentException(__FILE__, __LINE__,
"Broker URI is empty");
factory = new ActiveMQConnectionFactory(brokerURI);
if (factory == NULL)
throw NullPointerException(__FILE__, __LINE__,
"Factory created for %s is NULL",
brokerURI.c_str());
factory->setSendTimeout( 5000 ); // number of milliseconds to avoid blocking during F5 failover
factory->setCloseTimeout( 5000 );
connection = factory->createConnection();
if (connection == NULL)
throw NullPointerException(__FILE__, __LINE__,
"Connection created for %s is NULL",
brokerURI.c_str());
connection->start();
}
Connection *
AMQ_Connection::getConnection() {
return connection;
}
AMQ_Connection::~AMQ_Connection() {
if (connection != NULL) {
connection->close();
delete connection;
connection = NULL;
}
}
const char *
AMQ_ConnectionInit(AMQ_Connection **cn, char *uri)
{
try {
string brokerURI (uri);
auto_ptr<AMQ_Connection> c (new AMQ_Connection(brokerURI));
*cn = c.release();
return NULL;
}
CATCHALL
}
const char *
AMQ_ConnectionShutdown(AMQ_Connection *cn)
{
try {
delete cn;
return NULL;
}
CATCHALL
}
/*-
* Copyright (c) 2012-2014 UPLEX Nils Goroll Systemoptimierung
* Copyright (c) 2012-2014 Otto Gmbh & Co KG
* All rights reserved
* Use only with permission
*
* Author: Geoffrey Simmons <geoffrey.simmons@uplex.de>
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL AUTHOR OR CONTRIBUTORS BE LIABLE
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
* OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
* OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
*
*/
#ifndef _AMQ_CONNECTION_H
#define _AMQ_CONNECTION_H
#ifdef __cplusplus
#include <activemq/core/ActiveMQConnectionFactory.h>
#include <cms/Connection.h>
using namespace activemq::core;
using namespace cms;
class AMQ_Connection {
private:
static ActiveMQConnectionFactory* factory;
Connection* connection;
AMQ_Connection() {};
public:
AMQ_Connection(std::string& brokerURI);
Connection* getConnection();
virtual ~AMQ_Connection();
};
#else
typedef struct AMQ_Connection AMQ_Connection;
#endif
#ifdef __cplusplus
extern "C" {
#endif
const char *AMQ_ConnectionInit(AMQ_Connection **priv, char *uri);
const char *AMQ_ConnectionShutdown(AMQ_Connection *priv);
#ifdef __cplusplus
}
#endif
#endif /* _AMQ_CONNECTION_H */
/*-
* Copyright (c) 2012-2014 UPLEX Nils Goroll Systemoptimierung
* Copyright (c) 2012-2014 Otto Gmbh & Co KG
* All rights reserved
* Use only with permission
*
* Author: Geoffrey Simmons <geoffrey.simmons@uplex.de>
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL AUTHOR OR CONTRIBUTORS BE LIABLE
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
* OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
* OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
*
*/
#include <stdio.h>
#include <stdlib.h>
#include <errno.h>
#include <string.h>
#include <assert.h>
#include "mq.h"
#include "config_common.h"
#include "amq.h"
#include "amq_connection.h"
static AMQ_Connection **connections;
static AMQ_Worker **workers;
static unsigned connection = 0;
static unsigned nwrk = 0;
static unsigned n_uris = 0;
static char **uri;
static char qname[BUFSIZ];
static int
conf_add(const char *lval, const char *rval)
{
if (strcmp(lval, "mq.qname") == 0) {
strcpy(qname, rval);
return(0);
}
if (strcmp(lval, "mq.uri") == 0) {
int n = n_uris++;
uri = (char **) realloc(uri, n_uris * sizeof(char **));
if (uri == NULL)
return(errno);
uri[n] = (char *) malloc(strlen(rval) + 1);
if (uri[n] == NULL)
return(errno);
strcpy(uri[n], rval);
return(0);
}
return EINVAL;
}
const char *
MQ_GlobalInit(unsigned nworkers, const char *config_fname)
{
workers = (AMQ_Worker **) calloc(sizeof (AMQ_Worker *), nworkers);
if (workers == NULL)
return strerror(errno);
nwrk = nworkers;
uri = (char **) malloc (sizeof(char **));
if (uri == NULL)
return strerror(errno);
if (CONF_ReadFile(config_fname, conf_add) != 0)
return "Error reading config file for ActiveMQ";
return AMQ_GlobalInit();
}
const char *
MQ_InitConnections(void)
{
AMQ_Connection *conn;
const char *err;
if (n_uris == 0)
return NULL;
connections = (AMQ_Connection **) calloc(sizeof(AMQ_Connection *), nwrk);
if (connections == NULL)
return strerror(errno);
for (int i = 0; i < nwrk; i++) {
err = AMQ_ConnectionInit(&conn, uri[i % n_uris]);
if (err != NULL)
return err;
connections[i] = conn;
}
return NULL;
}
const char *
MQ_WorkerInit(void **priv, int wrk_num)
{
const char *err = NULL;
assert(wrk_num >= 1 && wrk_num <= nwrk);
wrk_num--;
AMQ_Connection *conn = connections[wrk_num];
if (conn == NULL) {
err = AMQ_ConnectionInit(&conn, uri[wrk_num % n_uris]);
if (err != NULL)
return err;
else
connections[wrk_num] = conn;
}
err = AMQ_WorkerInit((AMQ_Worker **) priv, conn, qname, wrk_num);
if (err == NULL)
workers[wrk_num] = (AMQ_Worker *) *priv;
else
workers[wrk_num] = NULL;
return err;
}
int
MQ_Send(void *priv, const char *data, unsigned len, const char *key,
unsigned keylen, const char **error)
{
/* The ActiveMQ implementation does not use sharding. */
(void) key;
(void) keylen;
*error = AMQ_Send((AMQ_Worker *) priv, data, len);
if (*error != NULL)
return -1;
return 0;
}
const char *
MQ_Reconnect(void **priv)
{
const char *err;
AMQ_Connection *conn;
int wrk_num;
err = AMQ_GetNum((AMQ_Worker *) priv, &wrk_num);
if (err != NULL)
return err;
assert(wrk_num >= 0 && wrk_num < nwrk);
err = AMQ_WorkerShutdown((AMQ_Worker **) priv);
if (err != NULL)
return err;
else
workers[wrk_num] = NULL;
if (connections[wrk_num] != NULL) {
err = AMQ_ConnectionShutdown(connections[wrk_num]);
if (err != NULL)
return err;
connections[wrk_num] = NULL;
}
err = AMQ_ConnectionInit(&conn, uri[connection++ % n_uris]);
if (err != NULL)
return err;
else
connections[wrk_num] = conn;
err = AMQ_WorkerInit((AMQ_Worker **) priv, conn, qname, wrk_num);
if (err != NULL)
workers[wrk_num] = NULL;
else
workers[wrk_num] = (AMQ_Worker *) *priv;
return err;
}
const char *
MQ_Version(void *priv, char *version, size_t len)
{
return AMQ_Version((AMQ_Worker *) priv, version, len);
}
const char *
MQ_ClientID(void *priv, char *clientID, size_t len)
{
return AMQ_ClientID((AMQ_Worker *) priv, clientID, len);
}
const char *
MQ_WorkerShutdown(void **priv, int wrk_num)
{
const char *err;
wrk_num--;
assert(wrk_num >= 0 && wrk_num < nwrk);
if (connections[wrk_num] != NULL) {
err = AMQ_ConnectionShutdown(connections[wrk_num]);
if (err != NULL)
return err;
connections[wrk_num] = NULL;
}
if (workers[wrk_num] != (AMQ_Worker *) *priv)
return "AMQ worker handle not found in worker table";
AMQ_WorkerShutdown((AMQ_Worker **) priv);
if (err != NULL)
return err;
*priv = NULL;
workers[wrk_num] = NULL;
return NULL;
}
const char *
MQ_GlobalShutdown(void)
{
const char *err;
if (n_uris > 0)
free(uri);
for (int i = 0; i < nwrk; i++)
if (connections[i] != NULL
&& (err = AMQ_ConnectionShutdown(connections[i])) != NULL)
return err;
free(connections);
free(workers);
return AMQ_GlobalShutdown();
}
AM_CPPFLAGS = @VARNISH_CFLAGS@ -I$(top_srcdir)/include
TESTS = test_activemq
check_PROGRAMS = test_activemq
test_activemq_SOURCES = \
../../minunit.h \
../../../../include/mq.h \
test_activemq.c
test_activemq_LDADD = \
../amq_connection.$(OBJEXT) \
../amq.$(OBJEXT) \
$(abs_top_srcdir)/src/config_common.$(OBJEXT) \
../mq.$(OBJEXT) \
${PTHREAD_LIBS} \
@AMQ_LIBS@ \
@APR_LIBS@ \
@APU_LIBS@ \
-lstdc++
# test config for ActiveMQ
mq.uri = tcp://localhost:61616
mq.qname = lhoste/tracking/test
/*-
* Copyright (c) 2012-2014 UPLEX Nils Goroll Systemoptimierung
* Copyright (c) 2012-2014 Otto Gmbh & Co KG
* All rights reserved
* Use only with permission
*
* Author: Geoffrey Simmons <geoffrey.simmons@uplex.de>
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL AUTHOR OR CONTRIBUTORS BE LIABLE
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
* OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
* OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
*
*/
#include <string.h>
#include "mq.h"
#include "../../../test/minunit.h"
/* Automake exit code for "skipped" in make check */
#define EXIT_SKIPPED 77
#define AMQ_CONFIG "activemq.conf"
#define NWORKERS 1
int tests_run = 0;
void *worker;
/* N.B.: Always run the tests in this order */
static char
*test_global_init(void)
{
const char *err;
printf("... testing ActiveMQ global initialization\n");
err = MQ_GlobalInit(NWORKERS, AMQ_CONFIG);
VMASSERT(err == NULL, "MQ_GlobalInit: %s", err);
return NULL;
}
static char
*test_init_connection(void)
{
const char *err;
printf("... testing ActiveMQ connection initialization\n");
err = MQ_InitConnections();
if (err != NULL && strstr(err, "Connection refused") != NULL) {
printf("Connection refused, ActiveMQ assumed not running\n");
exit(EXIT_SKIPPED);
}
VMASSERT(err == NULL, "MQ_InitConnections: %s", err);
return NULL;
}
static const char
*test_worker_init(void)
{
const char *err;
printf("... testing ActiveMQ worker init\n");
err = MQ_WorkerInit(&worker, NWORKERS);
VMASSERT(err == NULL, "MQ_WorkerInit: %s", err);
MASSERT0(worker != NULL, "Worker is NULL after MQ_WorkerInit");
return NULL;
}
static const char
*test_version(void)
{
const char *err;
char version[BUFSIZ];
printf("... testing ActiveMQ version info\n");
MASSERT0(worker != NULL, "MQ_Version: worker is NULL before call");
err = MQ_Version(worker, version, BUFSIZ);
VMASSERT(err == NULL, "MQ_Version: %s", err);
MASSERT0(version[0] != '\0', "MQ_Version: version is empty");
return NULL;
}
static const char
*test_clientID(void)
{
const char *err;
char clientID[BUFSIZ];
printf("... testing ActiveMQ client ID info\n");
MASSERT0(worker != NULL, "MQ_ClientID: worker is NULL before call");
err = MQ_ClientID(worker, clientID, BUFSIZ);
VMASSERT(err == NULL, "MQ_ClientID: %s", err);
MASSERT0(clientID[0] != '\0', "MQ_ClientID: client ID is empty");
return NULL;
}
static const char
*test_send(void)
{
const char *err;
int ret;
printf("... testing ActiveMQ message send\n");
MASSERT0(worker != NULL, "MQ_Send: worker is NULL before call");
ret = MQ_Send(worker, "foo bar baz quux", 16, "key", 3, &err);
VMASSERT(ret == 0, "MQ_Send: %s", err);
return NULL;
}
static const char
*test_reconnect(void)
{
const char *err;
int ret;
printf("... testing ActiveMQ reconnect\n");
MASSERT0(worker != NULL, "MQ_Reconnect: worker is NULL before call");
err = MQ_Reconnect(&worker);
VMASSERT(err == NULL, "MQ_Reconnect: %s", err);
MASSERT0(worker != NULL, "MQ_Reconnect: worker is NULL after call");
ret = MQ_Send(worker, "send after reconnect", 20, "key", 3, &err);
VMASSERT(ret == 0, "MQ_Send() fails after reconnect: %s", err);
return NULL;
}
static const char
*test_worker_shutdown(void)
{
const char *err;
int ret;
printf("... testing ActiveMQ worker shutdown\n");
MASSERT0(worker != NULL, "MQ_Send: worker is NULL before call");
err = MQ_WorkerShutdown(&worker, NWORKERS);
VMASSERT(err == NULL, "MQ_WorkerShutdown: %s", err);
MASSERT0(worker == NULL, "Worker not NULL after shutdown");
ret = MQ_Send(worker, "foo bar baz quux", 16, "key", 3, &err);
MASSERT0(ret != 0, "No failure on MQ_Send after worker shutdown");
return NULL;
}
static const char
*test_global_shutdown(void)
{
const char *err;
printf("... testing ActiveMQ global shutdown\n");
err = MQ_GlobalShutdown();
VMASSERT(err == NULL, "MQ_GlobalShutdown: %s", err);
return NULL;
}
static const char
*all_tests(void)
{
mu_run_test(test_global_init);
mu_run_test(test_init_connection);
mu_run_test(test_worker_init);
mu_run_test(test_version);
mu_run_test(test_clientID);
mu_run_test(test_send);
mu_run_test(test_reconnect);
mu_run_test(test_worker_shutdown);
mu_run_test(test_global_shutdown);
return NULL;
}
TEST_RUNNER
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment