Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
T
trackrdrd
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
uplex-varnish
trackrdrd
Commits
b12e6ce9
Commit
b12e6ce9
authored
Nov 14, 2017
by
Geoff Simmons
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
Discontinue support for ActiveMQ.
parent
dfe07d59
Changes
14
Show whitespace changes
Inline
Side-by-side
Showing
14 changed files
with
22 additions
and
1233 deletions
+22
-1233
.gitignore
.gitignore
+0
-3
Makefile.am
Makefile.am
+2
-6
README.rst
README.rst
+20
-34
configure.ac
configure.ac
+0
-40
Makefile.am
src/mq/activemq/Makefile.am
+0
-34
README.rst
src/mq/activemq/README.rst
+0
-173
amq.cpp
src/mq/activemq/amq.cpp
+0
-204
amq.h
src/mq/activemq/amq.h
+0
-91
amq_connection.cpp
src/mq/activemq/amq_connection.cpp
+0
-113
amq_connection.h
src/mq/activemq/amq_connection.h
+0
-69
mq.c
src/mq/activemq/mq.c
+0
-237
Makefile.am
src/mq/activemq/test/Makefile.am
+0
-21
activemq.conf
src/mq/activemq/test/activemq.conf
+0
-3
test_activemq.c
src/mq/activemq/test/test_activemq.c
+0
-205
No files found.
.gitignore
View file @
b12e6ce9
...
...
@@ -51,7 +51,6 @@ src/test/test_mq
src/test/test_parse
src/test/test_spmcq
src/test/test_worker
src/mq/activemq/test/test_activemq
src/mq/kafka/test/test_kafka
src/mq/kafka/test/test_partition
src/mq/kafka/test/test_send
...
...
@@ -73,8 +72,6 @@ src/mq/kafka/test/zoo.log
# make check artifacts
test-suite.log
/test-driver
src/mq/activemq/test/test_*.log
src/mq/activemq/test/test_*.trs
src/mq/kafka/test/test_*.log
src/mq/kafka/test/test_*.trs
src/test/test_*.log
...
...
Makefile.am
View file @
b12e6ce9
ACLOCAL_AMFLAGS
=
-I
m4
if
AMQ_OPT
MAYBE_AMQ
=
src/mq/activemq src/mq/activemq/test
endif
if
KAFKA_OPT
MAYBE_KAFKA
=
src/mq/kafka src/mq/kafka/test
endif
SUBDIRS
=
src src/mq/file src/test
$(MAYBE_
AMQ)
$(MAYBE_
KAFKA)
SUBDIRS
=
src src/mq/file src/test
$(MAYBE_KAFKA)
if
HAVE_RST2MAN
dist_man_MANS
=
trackrdrd.3
...
...
README.rst
View file @
b12e6ce9
...
...
@@ -9,7 +9,7 @@ Tracking Log Reader demon
-------------------------
:Author: Geoffrey Simmons
:Date: 2017-
09-08
:Date: 2017-
11-14
:Version: trunk
:Manual section: 3
...
...
@@ -29,7 +29,7 @@ DESCRIPTION
The ``trackrdrd`` demon reads from the shared memory log of a running
instance of Varnish, aggregates data logged in a specific format for
requests and ESI subrequests, and forwards the data to a messaging
system, such as
ActiveMQ or
Kafka.
system, such as Kafka.
``trackrdrd`` reads data from ``VCL_Log`` entries that are displayed
in this format by the ``varnishlog`` tool for client request
...
...
@@ -92,9 +92,9 @@ declared in the MQ interface in ``include/mq.h``. See ``mq.h`` for
documentation of the interface.
The source distribution for ``trackrdrd`` includes implementations of
the MQ interface for Kafka
, ActiveMQ and for file output (the latter
for testing and debugging); see libtrackrdr-kafka(3),
libtrackrdr-activemq(3) and libtrackrdr-file(3) for
details.
the MQ interface for Kafka
and for file output (the latter for testing
and debugging); see libtrackrdr-kafka(3) and libtrackrdr-file(3) for
details.
EXAMPLE
=======
...
...
@@ -235,16 +235,10 @@ Zookeeper (``libzookeeper_mt``)::
https://github.com/edenhill/librdkafka
http://zookeeper.apache.org/
To build the messaging plugin for ActiveMQ (``libtrackrdr-activemq``)
it is neccessary to link with the CMS or ActiveMQ-CPP library
(``libactivemq-cpp``). The sources can be obtained from::
http://activemq.apache.org/cms/
The messaging plugins for Kafka and ActiveMQ are optional, and you can
choose to disable the builds of either or both of them in the
``configure`` step, as explained below. Requirements do not need to be
met for plugins that are not built.
The messaging plugin for Kafka is optional, and you can choose to
disable its build in the ``configure`` step, as explained
below. Requirements do not need to be met for plugins that are not
built.
Building and installing trackrdrd
---------------------------------
...
...
@@ -257,11 +251,6 @@ machine (with the same architecture on which they will run), then they
will likely match by default. When in doubt, set compile-time flags
such as ``CFLAGS=-m64`` for ``gcc``.
For ActiveMQ, the flag ``CXXFLAGS`` should be set similarly to
``CFLAGS``, because C++ code is also compiled (unless you choose to
disable the ActiveMQ plugin). Settings for ``CXXFLAGS`` can be
obtained from ``pkg-config --cflags apr-1``.
At minimum, run these steps::
$ git clone $TRACKRDRD_GIT_URL
...
...
@@ -288,10 +277,10 @@ be shown with::
$ configure --help
To disable the build of the Kafka
or ActiveMQ MQ implementations,
specify the options ``--disable-kafka`` or ``disable-activemq`` for
``configure``. Both are enabled by default. A file output plugin,
suitable for testing and debugging, is
always built.
To disable the build of the Kafka
MQ implementation, specify the
option ``--disable-kafka`` for ``configure``. It is enabled by
default. A file output plugin, suitable for testing and debugging, is
always built.
To specify a non-standard installation prefix, add the ``--prefix``
option::
...
...
@@ -309,8 +298,8 @@ a non-standard location, set these env variables before running
* export PATH=$PREFIX/bin:$PREFIX/sbin:$PATH
``PKG_CONFIG_PATH`` might also have to include pkg-config directories
for other requirements, such as the
ActiveMQ C++ libraries, if they
have
been installed into non-default locations.
for other requirements, such as the
Kafka client library, if they have
been installed into non-default locations.
If the Varnish installation against which ``trackrdrd`` is *run* has a
non-standard location, it is necessary to specify runtime paths to the
...
...
@@ -341,17 +330,15 @@ Building and installing packaged MQ implementations
---------------------------------------------------
The ``trackrdrd`` distribution includes implementations of the MQ
interface for Kafka and ActiveMQ message brokers, as well as the file
output plugin. For details of the builds and their dependencies, see
libtrackrdr-kafka(3), libtrackrdr-activemq(3) and libtrackrdr-file(3)
(``README.rst`` in ``src/mq/kafka``, ``src/mq/activemq`` and
``src/mq/file``).
interface for the Kafka message broker as well as the file output
plugin. For details of the builds and their dependencies, see
libtrackrdr-kafka(3) and libtrackrdr-file(3) (``README.rst`` in
``src/mq/kafka`` and ``src/mq/file``).
The global make targets for ``trackrdrd`` also build the MQ
implementations, unless their builds are disabled in the ``configure``
step as explained above. If they are enabled, then it is necessary to
configure the build for them as well, for example by setting
``CXXFLAGS`` to compile C++ sources.
configure the build for them as well.
STARTUP AND SHUTDOWN
====================
...
...
@@ -821,7 +808,6 @@ SEE ALSO
* ``varnishd(1)``
* ``libtrackrdr-file(3)``
* ``libtrackrdr-kafka(3)``
* ``libtrackrdr-activemq(3)``
* ``ld.so(8)``
* ``syslog(3)``
...
...
configure.ac
View file @
b12e6ce9
...
...
@@ -16,19 +16,16 @@ LT_INIT
AC_GNU_SOURCE
AC_PROG_CC
AC_PROG_CXX
AC_PROG_CC_STDC
if test "x$ac_cv_prog_cc_c99" = xno; then
AC_MSG_ERROR([Could not find a C99 compatible compiler])
fi
AC_PROG_CPP
AC_PROG_CXXCPP
AX_PTHREAD(,[AC_MSG_ERROR([Could not configure pthreads support])])
LIBS="$PTHREAD_LIBS $LIBS"
CFLAGS="$CFLAGS $PTHREAD_CFLAGS"
CXXFLAGS="$CXXFLAGS -std=c++98 $PTHREAD_CFLAGS"
CC="$PTHREAD_CC"
AC_PROG_INSTALL
...
...
@@ -96,17 +93,6 @@ AS_IF([test "x$enable_kafka" != xno],
AC_MSG_ERROR([librdkafka is required]))
], [])
AC_ARG_ENABLE([activemq], [AS_HELP_STRING([--enable-activemq],
[build the ActiveMQ MQ plugin @<:@default=yes@:>@])],
[], [enable_activemq=yes])
AM_CONDITIONAL([AMQ_OPT], [test "$enable_activemq" = yes])
AS_IF([test "x$enable_activemq" != xno],
[
PKG_CHECK_MODULES([AMQ], [activemq-cpp])
PKG_CHECK_MODULES([APR], [apr-1])
PKG_CHECK_MODULES([APU], [apr-util-1])
], [])
# From Varnish configure.ac
# Now that we're done using the compiler to look for functions and
# libraries, set CFLAGS to what we want them to be for our own code
...
...
@@ -133,27 +119,6 @@ DEVELOPER_CFLAGS=" \
-Wno-sign-compare \
"
# Added for compiling C++ for ActiveMQ.
# -Wstrict-prototypes, -Wmissing-prototypes & -Wnested-externs invalid for C++
# activemq-cpp fails on: -Wshadow, -Wcast-qual,-Woverloaded-virtual and
# -Wunused-parameter
DEVELOPER_CXXFLAGS=" \
-Werror \
-Wall \
-Wno-format-y2k \
-W \
-Wpointer-arith \
-Wreturn-type \
-Wwrite-strings \
-Wswitch \
-Wcast-align \
-Wchar-subscripts \
-Wextra \
-Wno-sign-compare \
-Wno-overloaded-virtual \
-Wno-unused-parameter \
"
# These are not compliable yet
DEVELOPER_GCC_CFLAGS="-Wold-style-definition -Wredundant-decls "
#DEVELOPER_CFLAGS="${DEVELOPER_CFLAGS} ${DEVELOPER_GCC_CFLAGS}"
...
...
@@ -184,17 +149,14 @@ AS_IF([test "x$enable_stack_protector" != xno],
AC_ARG_ENABLE(developer-warnings,
AS_HELP_STRING([--enable-developer-warnings],[enable strict warnings (default is NO)]),
CFLAGS="${CFLAGS} ${DEVELOPER_CFLAGS}"
CXXFLAGS="${CXXFLAGS} ${DEVELOPER_CXXFLAGS}"
)
# --enable-debugging-symbols
AC_ARG_ENABLE(debugging-symbols,
AS_HELP_STRING([--enable-debugging-symbols],[enable debugging symbols (default is NO)]),
CFLAGS="${CFLAGS} -O0 -g -fno-inline"
CXXFLAGS="${CXXFLAGS} -O0 -g -fno-inline"
)
## From activemq-cpp configure.ac
## Execute Doxygen macros
DX_HTML_FEATURE(ON)
DX_CHM_FEATURE(OFF)
...
...
@@ -211,8 +173,6 @@ AC_CONFIG_FILES([
src/Makefile
src/mq/file/Makefile
src/test/Makefile
src/mq/activemq/Makefile
src/mq/activemq/test/Makefile
src/mq/kafka/Makefile
src/mq/kafka/test/Makefile
])
...
...
src/mq/activemq/Makefile.am
deleted
100644 → 0
View file @
dfe07d59
AUTOMAKE_OPTIONS
=
subdir-objects
AM_CPPFLAGS
=
@VARNISH_CFLAGS@
-I
$(top_srcdir)
/include
\
@AMQ_CFLAGS@ @APR_CFLAGS@ @APU_CFLAGS@
pkglib_LTLIBRARIES
=
libtrackrdr-activemq.la
libtrackrdr_activemq_la_SOURCES
=
\
$(top_srcdir)
/include/mq.h
\
$(top_srcdir)
/include/config_common.h
\
mq.c
\
amq.h
\
amq.cpp
\
amq_connection.h
\
amq_connection.cpp
\
$(top_srcdir)
/src/config_common.c
libtrackrdr_activemq_la_LIBADD
=
\
${
PTHREAD_LIBS
}
\
@AMQ_LIBS@
\
@APR_LIBS@
\
@APU_LIBS@
libtrackrdr_activemq_la_LDFLAGS
=
-version-info
5:0:0
if
HAVE_RST2MAN
dist_man_MANS
=
libtrackrdr-activemq.3
MAINTAINERCLEANFILES
=
$(dist_man_MANS)
endif
libtrackrdr-activemq.3
:
README.rst
if
HAVE_RST2MAN
${RST2MAN}
README.rst
$@
endif
src/mq/activemq/README.rst
deleted
100644 → 0
View file @
dfe07d59
.. _ref-trackrdrd:
=====================
libtrackrdr-activemq
=====================
-----------------------------------------------------------------------
ActiveMQ implementation of the MQ interface for the Tracking Log Reader
-----------------------------------------------------------------------
:Author: Geoffrey Simmons
:Date: 2015-04-26
:Version: 3.0
:Manual section: 3
DESCRIPTION
===========
``libtrackrdr-activemq.so`` provides an implementation of the tracking
reader's MQ interface to send messages to ActiveMQ message
brokers. See ``include/mq.h`` in the ``trackrdrd`` source distribution
for documentation of the interface.
To use this implementation with ``trackrdrd``, specify the shared
object as the value of ``mq.module`` in the tracking reader's
configuration (see trackrdrd(3)). The configuration value may be the
absolute path of the shared object; or its name, provided that it can
be found by the dynamic linker (see ld.so(8)).
``libtrackrdr-activemq`` also requires a configuration file, whose
path is specified as ``mq.config_fname`` in the configuration of
``trackrdrd``.
``libtrackrdrd-activemq`` in turn depends on the library ActiveMQ-CPP
(or CMS), a client library for ActiveMQ, packaged on many systems as
``libactivemq-cpp``. The dynamic linker must also be able to find
``libactivemq-cpp.so`` at runtime.
This implementation does not use sharding keys; the key data in the
call to ``MQ_Send()`` are silently discarded.
BUILD/INSTALL
=============
The sources for ``libtrackrdr-activemq`` are provided in the source
repository for ``trackrdrd``, in the subdirectory
``src/mq/activemq/``.
The sources for ActiveMQ-CPP can be obtained from::
http://activemq.apache.org/cms/
Building and installing ActiveMQ-CPP
------------------------------------
The ActiveMQ interface has been tested with versions 3.4.4 through
3.8.2 of ActiveMQ-CPP. If the library ``libactivemq-cpp`` is already
installed on the platform where ``trackrdrd`` will run, then no
further action is necessary. To build the library from source, follow
the instructions in the ``README.txt`` file of its source
distribution.
Building and installing libtrackrdr-activemq
--------------------------------------------
``libactivemq-trackrdr`` is built as part of the global build for
``trackrdrd``; for details and requirements of the build, see
trackrdrd(3). Note that for the ActiveMQ implementation, which
includes C++ classes, it is necessary to configure the build for C++
compilation (for example by setting ``CXXFLAGS``), and it may be
necessary to add additional configuration for compiling and linking
with the ActiveMQ-CPP library, for example as obtained from
``pkg-config --cflags apr-1``.
To specifically build the MQ implementation (without building all of
the rest of ``trackrdrd``), it suffices to invoke ``make`` commands in
the subdirectory ``src/mq/activemq`` (after having executed the
``configure`` script for ``trackrdrd``)::
# in the trackrdrd repo
$ cd src/mq/activemq
$ make
For self-tests after the build, run::
$ make check
If a connection is open at port 61616 on the host where the self-tests
are run, then it is assumed that an ActiveMQ message broker is
listening, and tests are run against the MQ URI
``tcp://localhost:61616``. If the port is not open, then the ``make
check`` test exits with the status ``SKIPPED``.
(When ``make check`` is run globally for the ``trackrdrd`` build, then
it is also run for ``libtrackrdr-activemq``).
To install the shared object ``libtrackrdr-activemq``, run ``make
install`` as root, for example with ``sudo``::
$ sudo make install
In standard configuration, the ``.so`` file will be installed by
``libtool(1)``, and its location may be affected by the ``--libdir``
option to ``configure``.
CONFIGURATION
=============
As mentioned above, a configuration file for ``libtrackrdr-activemq``
must be specified in the configuration parameter ``mq.config_fname``
for ``trackrdrd``, and initialization of the MQ implementation fails
if this file cannot be found or read by the process owner of
``trackrdrd`` (or if its syntax is false, or if either of the two
required parameters are missing).
The syntax of the configuration file is the same as that of
``trackrdrd``. Both of the parameters ``mq.uri`` and ``mq.qname`` are
required (there are no default values), and (only) ``mq.uri`` may be
specified more than once.
================== ============================================================
Parameter Description
================== ============================================================
``mq.uri`` URIs for the message broker. If more than one MQ URI is
specified, than worker threads distribute their connections
to the different message brokers.
------------------ ------------------------------------------------------------
``mq.qname`` Name of the queue (destination) to which messages are sent
at the message broker(s)
================== ============================================================
SEE ALSO
========
* ``trackrdrd(3)``
* ``ld.so(8)``
* http://activemq.apache.org/cms/
COPYRIGHT AND LICENCE
=====================
For both the software and this document are governed by a BSD 2-clause
licence.
| Copyright (c) 2012-2015 UPLEX Nils Goroll Systemoptimierung
| Copyright (c) 2012-2015 Otto Gmbh & Co KG
| All rights reserved
| Use only with permission
| Author: Geoffrey Simmons <geoffrey.simmons@uplex.de>
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions
are met:
1. Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
2. Redistributions in binary form must reproduce the above copyright
notice, this list of conditions and the following disclaimer in the
documentation and/or other materials provided with the distribution.
THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND
ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
ARE DISCLAIMED. IN NO EVENT SHALL AUTHOR OR CONTRIBUTORS BE LIABLE
FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
SUCH DAMAGE.
src/mq/activemq/amq.cpp
deleted
100644 → 0
View file @
dfe07d59
/*-
* Copyright (c) 2012 UPLEX Nils Goroll Systemoptimierung
* Copyright (c) 2012 Otto Gmbh & Co KG
* All rights reserved
* Use only with permission
*
* Author: Geoffrey Simmons <geoffrey.simmons@uplex.de>
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL AUTHOR OR CONTRIBUTORS BE LIABLE
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
* OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
* OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
*
*/
#include <string.h>
#include "amq.h"
#include <activemq/library/ActiveMQCPP.h>
#include <activemq/core/ActiveMQConnection.h>
#include <decaf/lang/exceptions/NullPointerException.h>
#include <cms/IllegalStateException.h>
#include <cms/ConnectionMetaData.h>
#define CATCHALL \
catch (CMSException& cex) { \
return cex.what(); \
} \
catch (Throwable& th) { \
return th.what(); \
} \
catch (std::exception& sex) { \
return sex.what(); \
} \
catch (...) { \
return "Unexpected error"; \
}
using
namespace
std
;
using
namespace
activemq
::
core
;
using
namespace
cms
;
using
namespace
decaf
::
lang
;
using
namespace
decaf
::
lang
::
exceptions
;
AMQ_Worker
::
AMQ_Worker
(
Connection
*
cn
,
std
::
string
&
qName
,
int
n
,
Session
::
AcknowledgeMode
ackMode
=
Session
::
AUTO_ACKNOWLEDGE
,
int
deliveryMode
=
DeliveryMode
::
NON_PERSISTENT
)
{
connection
=
cn
;
num
=
n
;
session
=
connection
->
createSession
(
ackMode
);
queue
=
session
->
createQueue
(
qName
);
producer
=
session
->
createProducer
(
queue
);
producer
->
setDeliveryMode
(
deliveryMode
);
msg
=
session
->
createTextMessage
();
}
AMQ_Worker
::~
AMQ_Worker
()
{
if
(
queue
!=
NULL
)
{
delete
queue
;
queue
=
NULL
;
}
if
(
producer
!=
NULL
)
{
delete
producer
;
producer
=
NULL
;
}
if
(
session
!=
NULL
)
{
session
->
stop
();
delete
session
;
session
=
NULL
;
}
}
int
AMQ_Worker
::
getNum
()
{
return
num
;
}
void
AMQ_Worker
::
send
(
std
::
string
&
text
)
{
if
(
msg
==
NULL
||
producer
==
NULL
)
throw
cms
::
IllegalStateException
(
"Worker fields are NULL"
);
msg
->
setText
(
text
);
producer
->
send
(
msg
);
}
std
::
string
AMQ_Worker
::
getVersion
()
{
if
(
connection
==
NULL
)
throw
cms
::
IllegalStateException
(
"Connection uninitialized"
);
const
ConnectionMetaData
*
md
=
connection
->
getMetaData
();
return
md
->
getCMSProviderName
()
+
" "
+
md
->
getProviderVersion
();
}
std
::
string
AMQ_Worker
::
getClientID
()
{
if
(
connection
==
NULL
)
throw
cms
::
IllegalStateException
(
"Connection uninitialized"
);
return
connection
->
getClientID
();
}
const
char
*
AMQ_GlobalInit
(
void
)
{
try
{
activemq
::
library
::
ActiveMQCPP
::
initializeLibrary
();
return
NULL
;
}
CATCHALL
}
const
char
*
AMQ_WorkerInit
(
AMQ_Worker
**
worker
,
AMQ_Connection
*
cn
,
char
*
qName
,
int
n
)
{
try
{
Connection
*
conn
=
cn
->
getConnection
();
string
queueName
(
qName
);
std
::
auto_ptr
<
AMQ_Worker
>
w
(
new
AMQ_Worker
(
conn
,
queueName
,
n
));
*
worker
=
w
.
release
();
return
NULL
;
}
CATCHALL
}
const
char
*
AMQ_Send
(
AMQ_Worker
*
worker
,
const
char
*
data
,
unsigned
len
)
{
try
{
if
(
worker
==
NULL
)
throw
NullPointerException
(
__FILE__
,
__LINE__
,
"AMQ_Worker NULL"
);
if
(
data
==
NULL
)
throw
IllegalArgumentException
(
__FILE__
,
__LINE__
,
"Data NULL"
);
string
text
(
data
,
len
);
worker
->
send
(
text
);
return
NULL
;
}
CATCHALL
}
const
char
*
AMQ_Version
(
AMQ_Worker
*
worker
,
char
*
version
,
size_t
len
)
{
try
{
strncpy
(
version
,
worker
->
getVersion
().
c_str
(),
len
);
return
NULL
;
}
CATCHALL
}
const
char
*
AMQ_ClientID
(
AMQ_Worker
*
worker
,
char
*
clientID
,
size_t
len
)
{
try
{
strncpy
(
clientID
,
worker
->
getClientID
().
c_str
(),
len
);
return
NULL
;
}
CATCHALL
}
const
char
*
AMQ_GetNum
(
AMQ_Worker
*
worker
,
int
*
n
)
{
try
{
*
n
=
worker
->
getNum
();
return
NULL
;
}
CATCHALL
}
const
char
*
AMQ_WorkerShutdown
(
AMQ_Worker
**
worker
)
{
try
{
delete
*
worker
;
return
NULL
;
}
CATCHALL
}
const
char
*
AMQ_GlobalShutdown
()
{
try
{
activemq
::
library
::
ActiveMQCPP
::
shutdownLibrary
();
return
NULL
;
}
CATCHALL
}
src/mq/activemq/amq.h
deleted
100644 → 0
View file @
dfe07d59
/*-
* Copyright (c) 2012-2014 UPLEX Nils Goroll Systemoptimierung
* Copyright (c) 2012-2014 Otto Gmbh & Co KG
* All rights reserved
* Use only with permission
*
* Author: Geoffrey Simmons <geoffrey.simmons@uplex.de>
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL AUTHOR OR CONTRIBUTORS BE LIABLE
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
* OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
* OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
*
*/
#ifndef _AMQ_H
#define _AMQ_H
#include "amq_connection.h"
#ifdef __cplusplus
#include <activemq/core/ActiveMQConnectionFactory.h>
#include <cms/Connection.h>
#include <cms/Session.h>
#include <cms/Queue.h>
#include <cms/MessageProducer.h>
using
namespace
activemq
::
core
;
using
namespace
cms
;
class
AMQ_Worker
{
private
:
Connection
*
connection
;
Session
*
session
;
Queue
*
queue
;
MessageProducer
*
producer
;
TextMessage
*
msg
;
int
num
;
AMQ_Worker
()
{};
public
:
static
void
shutdown
();
AMQ_Worker
(
Connection
*
connection
,
std
::
string
&
qName
,
int
n
,
Session
::
AcknowledgeMode
ackMode
,
int
deliveryMode
);
int
getNum
();
virtual
~
AMQ_Worker
();
void
send
(
std
::
string
&
text
);
std
::
string
getVersion
();
std
::
string
getClientID
();
};
#else
typedef
struct
AMQ_Worker
AMQ_Worker
;
#endif
#ifdef __cplusplus
extern
"C"
{
#endif
const
char
*
AMQ_GlobalInit
(
void
);
const
char
*
AMQ_WorkerInit
(
AMQ_Worker
**
worker
,
AMQ_Connection
*
connection
,
char
*
qName
,
int
num
);
const
char
*
AMQ_Send
(
AMQ_Worker
*
worker
,
const
char
*
data
,
unsigned
len
);
const
char
*
AMQ_Version
(
AMQ_Worker
*
worker
,
char
*
version
,
size_t
len
);
const
char
*
AMQ_ClientID
(
AMQ_Worker
*
worker
,
char
*
clientID
,
size_t
len
);
const
char
*
AMQ_GetNum
(
AMQ_Worker
*
worker
,
int
*
num
);
const
char
*
AMQ_WorkerShutdown
(
AMQ_Worker
**
worker
);
const
char
*
AMQ_GlobalShutdown
(
void
);
#ifdef __cplusplus
}
#endif
#endif
/* _AMQ_H */
src/mq/activemq/amq_connection.cpp
deleted
100644 → 0
View file @
dfe07d59
/*-
* Copyright (c) 2012 UPLEX Nils Goroll Systemoptimierung
* Copyright (c) 2012 Otto Gmbh & Co KG
* All rights reserved
* Use only with permission
*
* Author: Geoffrey Simmons <geoffrey.simmons@uplex.de>
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL AUTHOR OR CONTRIBUTORS BE LIABLE
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
* OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
* OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
*
*/
#include "amq_connection.h"
#include <decaf/lang/exceptions/NullPointerException.h>
#define CATCHALL \
catch (CMSException& cex) { \
string *msg = new string(cex.getMessage()); \
return msg->c_str(); \
} \
catch (Throwable& th) { \
return th.what(); \
} \
catch (std::exception& sex) { \
return sex.what(); \
} \
catch (...) { \
return "Unexpected error"; \
}
using
namespace
std
;
using
namespace
activemq
::
core
;
using
namespace
cms
;
using
namespace
decaf
::
lang
;
using
namespace
decaf
::
lang
::
exceptions
;
ActiveMQConnectionFactory
*
AMQ_Connection
::
factory
=
NULL
;
AMQ_Connection
::
AMQ_Connection
(
std
::
string
&
brokerURI
)
{
if
(
brokerURI
.
length
()
==
0
)
throw
IllegalArgumentException
(
__FILE__
,
__LINE__
,
"Broker URI is empty"
);
factory
=
new
ActiveMQConnectionFactory
(
brokerURI
);
if
(
factory
==
NULL
)
throw
NullPointerException
(
__FILE__
,
__LINE__
,
"Factory created for %s is NULL"
,
brokerURI
.
c_str
());
factory
->
setSendTimeout
(
5000
);
// number of milliseconds to avoid blocking during F5 failover
factory
->
setCloseTimeout
(
5000
);
connection
=
factory
->
createConnection
();
if
(
connection
==
NULL
)
throw
NullPointerException
(
__FILE__
,
__LINE__
,
"Connection created for %s is NULL"
,
brokerURI
.
c_str
());
connection
->
start
();
}
Connection
*
AMQ_Connection
::
getConnection
()
{
return
connection
;
}
AMQ_Connection
::~
AMQ_Connection
()
{
if
(
connection
!=
NULL
)
{
connection
->
close
();
delete
connection
;
connection
=
NULL
;
}
}
const
char
*
AMQ_ConnectionInit
(
AMQ_Connection
**
cn
,
char
*
uri
)
{
try
{
string
brokerURI
(
uri
);
auto_ptr
<
AMQ_Connection
>
c
(
new
AMQ_Connection
(
brokerURI
));
*
cn
=
c
.
release
();
return
NULL
;
}
CATCHALL
}
const
char
*
AMQ_ConnectionShutdown
(
AMQ_Connection
*
cn
)
{
try
{
delete
cn
;
return
NULL
;
}
CATCHALL
}
src/mq/activemq/amq_connection.h
deleted
100644 → 0
View file @
dfe07d59
/*-
* Copyright (c) 2012-2014 UPLEX Nils Goroll Systemoptimierung
* Copyright (c) 2012-2014 Otto Gmbh & Co KG
* All rights reserved
* Use only with permission
*
* Author: Geoffrey Simmons <geoffrey.simmons@uplex.de>
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL AUTHOR OR CONTRIBUTORS BE LIABLE
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
* OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
* OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
*
*/
#ifndef _AMQ_CONNECTION_H
#define _AMQ_CONNECTION_H
#ifdef __cplusplus
#include <activemq/core/ActiveMQConnectionFactory.h>
#include <cms/Connection.h>
using
namespace
activemq
::
core
;
using
namespace
cms
;
class
AMQ_Connection
{
private
:
static
ActiveMQConnectionFactory
*
factory
;
Connection
*
connection
;
AMQ_Connection
()
{};
public
:
AMQ_Connection
(
std
::
string
&
brokerURI
);
Connection
*
getConnection
();
virtual
~
AMQ_Connection
();
};
#else
typedef
struct
AMQ_Connection
AMQ_Connection
;
#endif
#ifdef __cplusplus
extern
"C"
{
#endif
const
char
*
AMQ_ConnectionInit
(
AMQ_Connection
**
priv
,
char
*
uri
);
const
char
*
AMQ_ConnectionShutdown
(
AMQ_Connection
*
priv
);
#ifdef __cplusplus
}
#endif
#endif
/* _AMQ_CONNECTION_H */
src/mq/activemq/mq.c
deleted
100644 → 0
View file @
dfe07d59
/*-
* Copyright (c) 2012-2014 UPLEX Nils Goroll Systemoptimierung
* Copyright (c) 2012-2014 Otto Gmbh & Co KG
* All rights reserved
* Use only with permission
*
* Author: Geoffrey Simmons <geoffrey.simmons@uplex.de>
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL AUTHOR OR CONTRIBUTORS BE LIABLE
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
* OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
* OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
*
*/
#include <stdio.h>
#include <stdlib.h>
#include <errno.h>
#include <string.h>
#include <assert.h>
#include "mq.h"
#include "config_common.h"
#include "amq.h"
#include "amq_connection.h"
static
AMQ_Connection
**
connections
;
static
AMQ_Worker
**
workers
;
static
unsigned
connection
=
0
;
static
unsigned
nwrk
=
0
;
static
unsigned
n_uris
=
0
;
static
char
**
uri
;
static
char
qname
[
BUFSIZ
];
static
int
conf_add
(
const
char
*
lval
,
const
char
*
rval
)
{
if
(
strcmp
(
lval
,
"mq.qname"
)
==
0
)
{
strcpy
(
qname
,
rval
);
return
(
0
);
}
if
(
strcmp
(
lval
,
"mq.uri"
)
==
0
)
{
int
n
=
n_uris
++
;
uri
=
(
char
**
)
realloc
(
uri
,
n_uris
*
sizeof
(
char
**
));
if
(
uri
==
NULL
)
return
(
errno
);
uri
[
n
]
=
(
char
*
)
malloc
(
strlen
(
rval
)
+
1
);
if
(
uri
[
n
]
==
NULL
)
return
(
errno
);
strcpy
(
uri
[
n
],
rval
);
return
(
0
);
}
return
EINVAL
;
}
const
char
*
MQ_GlobalInit
(
unsigned
nworkers
,
const
char
*
config_fname
)
{
workers
=
(
AMQ_Worker
**
)
calloc
(
sizeof
(
AMQ_Worker
*
),
nworkers
);
if
(
workers
==
NULL
)
return
strerror
(
errno
);
nwrk
=
nworkers
;
uri
=
(
char
**
)
malloc
(
sizeof
(
char
**
));
if
(
uri
==
NULL
)
return
strerror
(
errno
);
if
(
CONF_ReadFile
(
config_fname
,
conf_add
)
!=
0
)
return
"Error reading config file for ActiveMQ"
;
return
AMQ_GlobalInit
();
}
const
char
*
MQ_InitConnections
(
void
)
{
AMQ_Connection
*
conn
;
const
char
*
err
;
if
(
n_uris
==
0
)
return
NULL
;
connections
=
(
AMQ_Connection
**
)
calloc
(
sizeof
(
AMQ_Connection
*
),
nwrk
);
if
(
connections
==
NULL
)
return
strerror
(
errno
);
for
(
int
i
=
0
;
i
<
nwrk
;
i
++
)
{
err
=
AMQ_ConnectionInit
(
&
conn
,
uri
[
i
%
n_uris
]);
if
(
err
!=
NULL
)
return
err
;
connections
[
i
]
=
conn
;
}
return
NULL
;
}
const
char
*
MQ_WorkerInit
(
void
**
priv
,
int
wrk_num
)
{
const
char
*
err
=
NULL
;
assert
(
wrk_num
>=
1
&&
wrk_num
<=
nwrk
);
wrk_num
--
;
AMQ_Connection
*
conn
=
connections
[
wrk_num
];
if
(
conn
==
NULL
)
{
err
=
AMQ_ConnectionInit
(
&
conn
,
uri
[
wrk_num
%
n_uris
]);
if
(
err
!=
NULL
)
return
err
;
else
connections
[
wrk_num
]
=
conn
;
}
err
=
AMQ_WorkerInit
((
AMQ_Worker
**
)
priv
,
conn
,
qname
,
wrk_num
);
if
(
err
==
NULL
)
workers
[
wrk_num
]
=
(
AMQ_Worker
*
)
*
priv
;
else
workers
[
wrk_num
]
=
NULL
;
return
err
;
}
int
MQ_Send
(
void
*
priv
,
const
char
*
data
,
unsigned
len
,
const
char
*
key
,
unsigned
keylen
,
const
char
**
error
)
{
/* The ActiveMQ implementation does not use sharding. */
(
void
)
key
;
(
void
)
keylen
;
*
error
=
AMQ_Send
((
AMQ_Worker
*
)
priv
,
data
,
len
);
if
(
*
error
!=
NULL
)
return
-
1
;
return
0
;
}
const
char
*
MQ_Reconnect
(
void
**
priv
)
{
const
char
*
err
;
AMQ_Connection
*
conn
;
int
wrk_num
;
err
=
AMQ_GetNum
((
AMQ_Worker
*
)
priv
,
&
wrk_num
);
if
(
err
!=
NULL
)
return
err
;
assert
(
wrk_num
>=
0
&&
wrk_num
<
nwrk
);
err
=
AMQ_WorkerShutdown
((
AMQ_Worker
**
)
priv
);
if
(
err
!=
NULL
)
return
err
;
else
workers
[
wrk_num
]
=
NULL
;
if
(
connections
[
wrk_num
]
!=
NULL
)
{
err
=
AMQ_ConnectionShutdown
(
connections
[
wrk_num
]);
if
(
err
!=
NULL
)
return
err
;
connections
[
wrk_num
]
=
NULL
;
}
err
=
AMQ_ConnectionInit
(
&
conn
,
uri
[
connection
++
%
n_uris
]);
if
(
err
!=
NULL
)
return
err
;
else
connections
[
wrk_num
]
=
conn
;
err
=
AMQ_WorkerInit
((
AMQ_Worker
**
)
priv
,
conn
,
qname
,
wrk_num
);
if
(
err
!=
NULL
)
workers
[
wrk_num
]
=
NULL
;
else
workers
[
wrk_num
]
=
(
AMQ_Worker
*
)
*
priv
;
return
err
;
}
const
char
*
MQ_Version
(
void
*
priv
,
char
*
version
,
size_t
len
)
{
return
AMQ_Version
((
AMQ_Worker
*
)
priv
,
version
,
len
);
}
const
char
*
MQ_ClientID
(
void
*
priv
,
char
*
clientID
,
size_t
len
)
{
return
AMQ_ClientID
((
AMQ_Worker
*
)
priv
,
clientID
,
len
);
}
const
char
*
MQ_WorkerShutdown
(
void
**
priv
,
int
wrk_num
)
{
const
char
*
err
;
wrk_num
--
;
assert
(
wrk_num
>=
0
&&
wrk_num
<
nwrk
);
if
(
connections
[
wrk_num
]
!=
NULL
)
{
err
=
AMQ_ConnectionShutdown
(
connections
[
wrk_num
]);
if
(
err
!=
NULL
)
return
err
;
connections
[
wrk_num
]
=
NULL
;
}
if
(
workers
[
wrk_num
]
!=
(
AMQ_Worker
*
)
*
priv
)
return
"AMQ worker handle not found in worker table"
;
AMQ_WorkerShutdown
((
AMQ_Worker
**
)
priv
);
if
(
err
!=
NULL
)
return
err
;
*
priv
=
NULL
;
workers
[
wrk_num
]
=
NULL
;
return
NULL
;
}
const
char
*
MQ_GlobalShutdown
(
void
)
{
const
char
*
err
;
if
(
n_uris
>
0
)
free
(
uri
);
for
(
int
i
=
0
;
i
<
nwrk
;
i
++
)
if
(
connections
[
i
]
!=
NULL
&&
(
err
=
AMQ_ConnectionShutdown
(
connections
[
i
]))
!=
NULL
)
return
err
;
free
(
connections
);
free
(
workers
);
return
AMQ_GlobalShutdown
();
}
src/mq/activemq/test/Makefile.am
deleted
100644 → 0
View file @
dfe07d59
AM_CPPFLAGS
=
@VARNISH_CFLAGS@
-I
$(top_srcdir)
/include
TESTS
=
test_activemq
check_PROGRAMS
=
test_activemq
test_activemq_SOURCES
=
\
../../minunit.h
\
../../../../include/mq.h
\
test_activemq.c
test_activemq_LDADD
=
\
../amq_connection.
$(OBJEXT)
\
../amq.
$(OBJEXT)
\
$(abs_top_srcdir)
/src/config_common.
$(OBJEXT)
\
../mq.
$(OBJEXT)
\
${
PTHREAD_LIBS
}
\
@AMQ_LIBS@
\
@APR_LIBS@
\
@APU_LIBS@
\
-lstdc
++
src/mq/activemq/test/activemq.conf
deleted
100644 → 0
View file @
dfe07d59
# test config for ActiveMQ
mq
.
uri
=
tcp
://
localhost
:
61616
mq
.
qname
=
lhoste
/
tracking
/
test
src/mq/activemq/test/test_activemq.c
deleted
100644 → 0
View file @
dfe07d59
/*-
* Copyright (c) 2012-2014 UPLEX Nils Goroll Systemoptimierung
* Copyright (c) 2012-2014 Otto Gmbh & Co KG
* All rights reserved
* Use only with permission
*
* Author: Geoffrey Simmons <geoffrey.simmons@uplex.de>
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL AUTHOR OR CONTRIBUTORS BE LIABLE
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
* OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
* OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
*
*/
#include <string.h>
#include "mq.h"
#include "../../../test/minunit.h"
/* Automake exit code for "skipped" in make check */
#define EXIT_SKIPPED 77
#define AMQ_CONFIG "activemq.conf"
#define NWORKERS 1
int
tests_run
=
0
;
void
*
worker
;
/* N.B.: Always run the tests in this order */
static
char
*
test_global_init
(
void
)
{
const
char
*
err
;
printf
(
"... testing ActiveMQ global initialization
\n
"
);
err
=
MQ_GlobalInit
(
NWORKERS
,
AMQ_CONFIG
);
VMASSERT
(
err
==
NULL
,
"MQ_GlobalInit: %s"
,
err
);
return
NULL
;
}
static
char
*
test_init_connection
(
void
)
{
const
char
*
err
;
printf
(
"... testing ActiveMQ connection initialization
\n
"
);
err
=
MQ_InitConnections
();
if
(
err
!=
NULL
&&
strstr
(
err
,
"Connection refused"
)
!=
NULL
)
{
printf
(
"Connection refused, ActiveMQ assumed not running
\n
"
);
exit
(
EXIT_SKIPPED
);
}
VMASSERT
(
err
==
NULL
,
"MQ_InitConnections: %s"
,
err
);
return
NULL
;
}
static
const
char
*
test_worker_init
(
void
)
{
const
char
*
err
;
printf
(
"... testing ActiveMQ worker init
\n
"
);
err
=
MQ_WorkerInit
(
&
worker
,
NWORKERS
);
VMASSERT
(
err
==
NULL
,
"MQ_WorkerInit: %s"
,
err
);
MASSERT0
(
worker
!=
NULL
,
"Worker is NULL after MQ_WorkerInit"
);
return
NULL
;
}
static
const
char
*
test_version
(
void
)
{
const
char
*
err
;
char
version
[
BUFSIZ
];
printf
(
"... testing ActiveMQ version info
\n
"
);
MASSERT0
(
worker
!=
NULL
,
"MQ_Version: worker is NULL before call"
);
err
=
MQ_Version
(
worker
,
version
,
BUFSIZ
);
VMASSERT
(
err
==
NULL
,
"MQ_Version: %s"
,
err
);
MASSERT0
(
version
[
0
]
!=
'\0'
,
"MQ_Version: version is empty"
);
return
NULL
;
}
static
const
char
*
test_clientID
(
void
)
{
const
char
*
err
;
char
clientID
[
BUFSIZ
];
printf
(
"... testing ActiveMQ client ID info
\n
"
);
MASSERT0
(
worker
!=
NULL
,
"MQ_ClientID: worker is NULL before call"
);
err
=
MQ_ClientID
(
worker
,
clientID
,
BUFSIZ
);
VMASSERT
(
err
==
NULL
,
"MQ_ClientID: %s"
,
err
);
MASSERT0
(
clientID
[
0
]
!=
'\0'
,
"MQ_ClientID: client ID is empty"
);
return
NULL
;
}
static
const
char
*
test_send
(
void
)
{
const
char
*
err
;
int
ret
;
printf
(
"... testing ActiveMQ message send
\n
"
);
MASSERT0
(
worker
!=
NULL
,
"MQ_Send: worker is NULL before call"
);
ret
=
MQ_Send
(
worker
,
"foo bar baz quux"
,
16
,
"key"
,
3
,
&
err
);
VMASSERT
(
ret
==
0
,
"MQ_Send: %s"
,
err
);
return
NULL
;
}
static
const
char
*
test_reconnect
(
void
)
{
const
char
*
err
;
int
ret
;
printf
(
"... testing ActiveMQ reconnect
\n
"
);
MASSERT0
(
worker
!=
NULL
,
"MQ_Reconnect: worker is NULL before call"
);
err
=
MQ_Reconnect
(
&
worker
);
VMASSERT
(
err
==
NULL
,
"MQ_Reconnect: %s"
,
err
);
MASSERT0
(
worker
!=
NULL
,
"MQ_Reconnect: worker is NULL after call"
);
ret
=
MQ_Send
(
worker
,
"send after reconnect"
,
20
,
"key"
,
3
,
&
err
);
VMASSERT
(
ret
==
0
,
"MQ_Send() fails after reconnect: %s"
,
err
);
return
NULL
;
}
static
const
char
*
test_worker_shutdown
(
void
)
{
const
char
*
err
;
int
ret
;
printf
(
"... testing ActiveMQ worker shutdown
\n
"
);
MASSERT0
(
worker
!=
NULL
,
"MQ_Send: worker is NULL before call"
);
err
=
MQ_WorkerShutdown
(
&
worker
,
NWORKERS
);
VMASSERT
(
err
==
NULL
,
"MQ_WorkerShutdown: %s"
,
err
);
MASSERT0
(
worker
==
NULL
,
"Worker not NULL after shutdown"
);
ret
=
MQ_Send
(
worker
,
"foo bar baz quux"
,
16
,
"key"
,
3
,
&
err
);
MASSERT0
(
ret
!=
0
,
"No failure on MQ_Send after worker shutdown"
);
return
NULL
;
}
static
const
char
*
test_global_shutdown
(
void
)
{
const
char
*
err
;
printf
(
"... testing ActiveMQ global shutdown
\n
"
);
err
=
MQ_GlobalShutdown
();
VMASSERT
(
err
==
NULL
,
"MQ_GlobalShutdown: %s"
,
err
);
return
NULL
;
}
static
const
char
*
all_tests
(
void
)
{
mu_run_test
(
test_global_init
);
mu_run_test
(
test_init_connection
);
mu_run_test
(
test_worker_init
);
mu_run_test
(
test_version
);
mu_run_test
(
test_clientID
);
mu_run_test
(
test_send
);
mu_run_test
(
test_reconnect
);
mu_run_test
(
test_worker_shutdown
);
mu_run_test
(
test_global_shutdown
);
return
NULL
;
}
TEST_RUNNER
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment