Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
T
trackrdrd
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
uplex-varnish
trackrdrd
Commits
b9aa9b61
Commit
b9aa9b61
authored
May 05, 2015
by
Geoff Simmons
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
enforce maximum lengths for the version and client ID strings in
the MQ interface, bumps the MQ CURRENT version to 5
parent
b47249ff
Changes
15
Hide whitespace changes
Inline
Side-by-side
Showing
15 changed files
with
45 additions
and
39 deletions
+45
-39
mq.h
include/mq.h
+9
-5
Makefile.am
src/mq/activemq/Makefile.am
+1
-1
amq.cpp
src/mq/activemq/amq.cpp
+4
-4
amq.h
src/mq/activemq/amq.h
+2
-2
mq.c
src/mq/activemq/mq.c
+4
-4
test_activemq.c
src/mq/activemq/test/test_activemq.c
+2
-2
Makefile.am
src/mq/file/Makefile.am
+1
-1
mq.c
src/mq/file/mq.c
+4
-4
Makefile.am
src/mq/kafka/Makefile.am
+1
-1
mq.c
src/mq/kafka/mq.c
+4
-4
test_kafka.c
src/mq/kafka/test/test_kafka.c
+2
-2
regress.sh
src/test/regress.sh
+3
-2
test_mq.c
src/test/test_mq.c
+2
-2
trackrdrd.h
src/trackrdrd.h
+3
-2
worker.c
src/worker.c
+3
-3
No files found.
include/mq.h
View file @
b9aa9b61
/*-
/*-
* Copyright (c) 2014 UPLEX Nils Goroll Systemoptimierung
* Copyright (c) 2014
-2015
UPLEX Nils Goroll Systemoptimierung
* Copyright (c) 2014 Otto Gmbh & Co KG
* Copyright (c) 2014
-2015
Otto Gmbh & Co KG
* All rights reserved
* All rights reserved
* Use only with permission
* Use only with permission
*
*
...
@@ -29,12 +29,14 @@
...
@@ -29,12 +29,14 @@
*
*
*/
*/
#include <stddef.h>
/**
/**
* \file mq.h
* \file mq.h
* \brief MQ messaging interface for trackrdrd
* \brief MQ messaging interface for trackrdrd
* \details MQ -- the messaging interface for the Varnish log tracking
* \details MQ -- the messaging interface for the Varnish log tracking
* reader
* reader
* \version
4
* \version
5
*
*
* This header defines the interface to a messaging system, such as
* This header defines the interface to a messaging system, such as
* ActiveMQ or Kafka, used by the tracking reader. It is responsible for
* ActiveMQ or Kafka, used by the tracking reader. It is responsible for
...
@@ -188,9 +190,10 @@ int MQ_Send(void *priv, const char *data, unsigned len,
...
@@ -188,9 +190,10 @@ int MQ_Send(void *priv, const char *data, unsigned len,
* @param version pointer to the version string. The implementation is
* @param version pointer to the version string. The implementation is
* expected to place the starting address of a null-terminated string in
* expected to place the starting address of a null-terminated string in
* this location.
* this location.
* @param len maximum length of the version string
* @return `NULL` on success, an error message on failure
* @return `NULL` on success, an error message on failure
*/
*/
const
char
*
MQ_Version
(
void
*
priv
,
char
*
version
);
const
char
*
MQ_Version
(
void
*
priv
,
char
*
version
,
size_t
len
);
/**
/**
* Return an ID string for the client connection.
* Return an ID string for the client connection.
...
@@ -204,9 +207,10 @@ const char *MQ_Version(void *priv, char *version);
...
@@ -204,9 +207,10 @@ const char *MQ_Version(void *priv, char *version);
* @param clientID pointer to the client ID string. The implementation is
* @param clientID pointer to the client ID string. The implementation is
* expected to place the starting address of a null-terminated string in
* expected to place the starting address of a null-terminated string in
* this location.
* this location.
* @param len maximum length of the ID string
* @return `NULL` on success, an error message on failure
* @return `NULL` on success, an error message on failure
*/
*/
const
char
*
MQ_ClientID
(
void
*
priv
,
char
*
clientID
);
const
char
*
MQ_ClientID
(
void
*
priv
,
char
*
clientID
,
size_t
len
);
/**
/**
* Re-initialize a connection to the messaging system after a send
* Re-initialize a connection to the messaging system after a send
...
...
src/mq/activemq/Makefile.am
View file @
b9aa9b61
...
@@ -19,7 +19,7 @@ libtrackrdr_activemq_la_LIBADD = \
...
@@ -19,7 +19,7 @@ libtrackrdr_activemq_la_LIBADD = \
@APR_LIBS@
\
@APR_LIBS@
\
@APU_LIBS@
@APU_LIBS@
libtrackrdr_activemq_la_LDFLAGS
=
-version-info
4
:0:0
libtrackrdr_activemq_la_LDFLAGS
=
-version-info
5
:0:0
if
HAVE_RST2MAN
if
HAVE_RST2MAN
dist_man_MANS
=
libtrackrdr-activemq.3
dist_man_MANS
=
libtrackrdr-activemq.3
...
...
src/mq/activemq/amq.cpp
View file @
b9aa9b61
...
@@ -154,20 +154,20 @@ AMQ_Send(AMQ_Worker *worker, const char *data, unsigned len)
...
@@ -154,20 +154,20 @@ AMQ_Send(AMQ_Worker *worker, const char *data, unsigned len)
}
}
const
char
*
const
char
*
AMQ_Version
(
AMQ_Worker
*
worker
,
char
*
version
)
AMQ_Version
(
AMQ_Worker
*
worker
,
char
*
version
,
size_t
len
)
{
{
try
{
try
{
strcpy
(
version
,
worker
->
getVersion
().
c_str
()
);
strncpy
(
version
,
worker
->
getVersion
().
c_str
(),
len
);
return
NULL
;
return
NULL
;
}
}
CATCHALL
CATCHALL
}
}
const
char
*
const
char
*
AMQ_ClientID
(
AMQ_Worker
*
worker
,
char
*
clientID
)
AMQ_ClientID
(
AMQ_Worker
*
worker
,
char
*
clientID
,
size_t
len
)
{
{
try
{
try
{
strcpy
(
clientID
,
worker
->
getClientID
().
c_str
()
);
strncpy
(
clientID
,
worker
->
getClientID
().
c_str
(),
len
);
return
NULL
;
return
NULL
;
}
}
CATCHALL
CATCHALL
...
...
src/mq/activemq/amq.h
View file @
b9aa9b61
...
@@ -78,8 +78,8 @@ extern "C" {
...
@@ -78,8 +78,8 @@ extern "C" {
const
char
*
AMQ_WorkerInit
(
AMQ_Worker
**
worker
,
AMQ_Connection
*
connection
,
const
char
*
AMQ_WorkerInit
(
AMQ_Worker
**
worker
,
AMQ_Connection
*
connection
,
char
*
qName
,
int
num
);
char
*
qName
,
int
num
);
const
char
*
AMQ_Send
(
AMQ_Worker
*
worker
,
const
char
*
data
,
unsigned
len
);
const
char
*
AMQ_Send
(
AMQ_Worker
*
worker
,
const
char
*
data
,
unsigned
len
);
const
char
*
AMQ_Version
(
AMQ_Worker
*
worker
,
char
*
version
);
const
char
*
AMQ_Version
(
AMQ_Worker
*
worker
,
char
*
version
,
size_t
len
);
const
char
*
AMQ_ClientID
(
AMQ_Worker
*
worker
,
char
*
clientID
);
const
char
*
AMQ_ClientID
(
AMQ_Worker
*
worker
,
char
*
clientID
,
size_t
len
);
const
char
*
AMQ_GetNum
(
AMQ_Worker
*
worker
,
int
*
num
);
const
char
*
AMQ_GetNum
(
AMQ_Worker
*
worker
,
int
*
num
);
const
char
*
AMQ_WorkerShutdown
(
AMQ_Worker
**
worker
);
const
char
*
AMQ_WorkerShutdown
(
AMQ_Worker
**
worker
);
const
char
*
AMQ_GlobalShutdown
(
void
);
const
char
*
AMQ_GlobalShutdown
(
void
);
...
...
src/mq/activemq/mq.c
View file @
b9aa9b61
...
@@ -183,15 +183,15 @@ MQ_Reconnect(void **priv)
...
@@ -183,15 +183,15 @@ MQ_Reconnect(void **priv)
}
}
const
char
*
const
char
*
MQ_Version
(
void
*
priv
,
char
*
version
)
MQ_Version
(
void
*
priv
,
char
*
version
,
size_t
len
)
{
{
return
AMQ_Version
((
AMQ_Worker
*
)
priv
,
version
);
return
AMQ_Version
((
AMQ_Worker
*
)
priv
,
version
,
len
);
}
}
const
char
*
const
char
*
MQ_ClientID
(
void
*
priv
,
char
*
clientID
)
MQ_ClientID
(
void
*
priv
,
char
*
clientID
,
size_t
len
)
{
{
return
AMQ_ClientID
((
AMQ_Worker
*
)
priv
,
clientID
);
return
AMQ_ClientID
((
AMQ_Worker
*
)
priv
,
clientID
,
len
);
}
}
const
char
*
const
char
*
...
...
src/mq/activemq/test/test_activemq.c
View file @
b9aa9b61
...
@@ -98,7 +98,7 @@ static const char
...
@@ -98,7 +98,7 @@ static const char
printf
(
"... testing ActiveMQ version info
\n
"
);
printf
(
"... testing ActiveMQ version info
\n
"
);
MASSERT0
(
worker
!=
NULL
,
"MQ_Version: worker is NULL before call"
);
MASSERT0
(
worker
!=
NULL
,
"MQ_Version: worker is NULL before call"
);
err
=
MQ_Version
(
worker
,
version
);
err
=
MQ_Version
(
worker
,
version
,
BUFSIZ
);
VMASSERT
(
err
==
NULL
,
"MQ_Version: %s"
,
err
);
VMASSERT
(
err
==
NULL
,
"MQ_Version: %s"
,
err
);
MASSERT0
(
version
[
0
]
!=
'\0'
,
"MQ_Version: version is empty"
);
MASSERT0
(
version
[
0
]
!=
'\0'
,
"MQ_Version: version is empty"
);
...
@@ -114,7 +114,7 @@ static const char
...
@@ -114,7 +114,7 @@ static const char
printf
(
"... testing ActiveMQ client ID info
\n
"
);
printf
(
"... testing ActiveMQ client ID info
\n
"
);
MASSERT0
(
worker
!=
NULL
,
"MQ_ClientID: worker is NULL before call"
);
MASSERT0
(
worker
!=
NULL
,
"MQ_ClientID: worker is NULL before call"
);
err
=
MQ_ClientID
(
worker
,
clientID
);
err
=
MQ_ClientID
(
worker
,
clientID
,
BUFSIZ
);
VMASSERT
(
err
==
NULL
,
"MQ_ClientID: %s"
,
err
);
VMASSERT
(
err
==
NULL
,
"MQ_ClientID: %s"
,
err
);
MASSERT0
(
clientID
[
0
]
!=
'\0'
,
"MQ_ClientID: client ID is empty"
);
MASSERT0
(
clientID
[
0
]
!=
'\0'
,
"MQ_ClientID: client ID is empty"
);
...
...
src/mq/file/Makefile.am
View file @
b9aa9b61
INCLUDES
=
-I
$(top_srcdir)
/include
INCLUDES
=
-I
$(top_srcdir)
/include
CURRENT
=
1
CURRENT
=
5
REVISION
=
0
REVISION
=
0
AGE
=
0
AGE
=
0
...
...
src/mq/file/mq.c
View file @
b9aa9b61
...
@@ -174,19 +174,19 @@ MQ_Reconnect(void **priv)
...
@@ -174,19 +174,19 @@ MQ_Reconnect(void **priv)
}
}
const
char
*
const
char
*
MQ_Version
(
void
*
priv
,
char
*
version
)
MQ_Version
(
void
*
priv
,
char
*
version
,
size_t
len
)
{
{
(
void
)
priv
;
(
void
)
priv
;
str
cpy
(
version
,
_versio
n
);
str
ncpy
(
version
,
_version
,
le
n
);
return
NULL
;
return
NULL
;
}
}
const
char
*
const
char
*
MQ_ClientID
(
void
*
priv
,
char
*
clientID
)
MQ_ClientID
(
void
*
priv
,
char
*
clientID
,
size_t
len
)
{
{
wrk_t
*
wrk
;
wrk_t
*
wrk
;
CAST_OBJ_NOTNULL
(
wrk
,
priv
,
FILE_WRK_MAGIC
);
CAST_OBJ_NOTNULL
(
wrk
,
priv
,
FILE_WRK_MAGIC
);
s
printf
(
clientID
,
"worker %d"
,
wrk
->
n
);
s
nprintf
(
clientID
,
len
,
"worker %d"
,
wrk
->
n
);
return
NULL
;
return
NULL
;
}
}
...
...
src/mq/kafka/Makefile.am
View file @
b9aa9b61
INCLUDES
=
-I
$(top_srcdir)
/include
INCLUDES
=
-I
$(top_srcdir)
/include
CURRENT
=
4
CURRENT
=
5
REVISION
=
0
REVISION
=
0
AGE
=
0
AGE
=
0
...
...
src/mq/kafka/mq.c
View file @
b9aa9b61
...
@@ -361,19 +361,19 @@ MQ_Reconnect(void **priv)
...
@@ -361,19 +361,19 @@ MQ_Reconnect(void **priv)
}
}
const
char
*
const
char
*
MQ_Version
(
void
*
priv
,
char
*
version
)
MQ_Version
(
void
*
priv
,
char
*
version
,
size_t
len
)
{
{
(
void
)
priv
;
(
void
)
priv
;
str
cpy
(
version
,
_versio
n
);
str
ncpy
(
version
,
_version
,
le
n
);
return
NULL
;
return
NULL
;
}
}
const
char
*
const
char
*
MQ_ClientID
(
void
*
priv
,
char
*
clientID
)
MQ_ClientID
(
void
*
priv
,
char
*
clientID
,
size_t
len
)
{
{
kafka_wrk_t
*
wrk
;
kafka_wrk_t
*
wrk
;
CAST_OBJ_NOTNULL
(
wrk
,
priv
,
KAFKA_WRK_MAGIC
);
CAST_OBJ_NOTNULL
(
wrk
,
priv
,
KAFKA_WRK_MAGIC
);
str
cpy
(
clientID
,
rd_kafka_name
(
wrk
->
kafka
)
);
str
ncpy
(
clientID
,
rd_kafka_name
(
wrk
->
kafka
),
len
);
return
NULL
;
return
NULL
;
}
}
...
...
src/mq/kafka/test/test_kafka.c
View file @
b9aa9b61
...
@@ -98,7 +98,7 @@ static const char
...
@@ -98,7 +98,7 @@ static const char
printf
(
"... testing Kafka version info
\n
"
);
printf
(
"... testing Kafka version info
\n
"
);
MASSERT0
(
worker
!=
NULL
,
"MQ_Version: worker is NULL before call"
);
MASSERT0
(
worker
!=
NULL
,
"MQ_Version: worker is NULL before call"
);
err
=
MQ_Version
(
worker
,
version
);
err
=
MQ_Version
(
worker
,
version
,
BUFSIZ
);
VMASSERT
(
err
==
NULL
,
"MQ_Version: %s"
,
err
);
VMASSERT
(
err
==
NULL
,
"MQ_Version: %s"
,
err
);
MASSERT0
(
version
[
0
]
!=
'\0'
,
"MQ_Version: version is empty"
);
MASSERT0
(
version
[
0
]
!=
'\0'
,
"MQ_Version: version is empty"
);
...
@@ -114,7 +114,7 @@ static const char
...
@@ -114,7 +114,7 @@ static const char
printf
(
"... testing Kafka client ID info
\n
"
);
printf
(
"... testing Kafka client ID info
\n
"
);
MASSERT0
(
worker
!=
NULL
,
"MQ_ClientID: worker is NULL before call"
);
MASSERT0
(
worker
!=
NULL
,
"MQ_ClientID: worker is NULL before call"
);
err
=
MQ_ClientID
(
worker
,
clientID
);
err
=
MQ_ClientID
(
worker
,
clientID
,
BUFSIZ
);
VMASSERT
(
err
==
NULL
,
"MQ_ClientID: %s"
,
err
);
VMASSERT
(
err
==
NULL
,
"MQ_ClientID: %s"
,
err
);
MASSERT0
(
clientID
[
0
]
!=
'\0'
,
"MQ_ClientID: client ID is empty"
);
MASSERT0
(
clientID
[
0
]
!=
'\0'
,
"MQ_ClientID: client ID is empty"
);
...
...
src/test/regress.sh
View file @
b9aa9b61
...
@@ -37,8 +37,9 @@ fi
...
@@ -37,8 +37,9 @@ fi
# Now check the logs from the worker thread
# Now check the logs from the worker thread
# Filter the 'returned to free list' messages, since these may be different
# Filter the 'returned to free list' messages, since these may be different
# in different runs.
# in different runs.
CKSUM
=
$(
grep
'Worker 1'
$LOG
| egrep
-v
'returned [0-9]+ [^ ]+ to free list'
|
cksum
)
# Also filter the version/revision from the "connected" line.
if
[
"
$CKSUM
"
!=
'1219614274 35546'
]
;
then
CKSUM
=
$(
grep
'Worker 1'
$LOG
| egrep
-v
'returned [0-9]+ [^ ]+ to free list'
|
sed
-e
's/\(connected\) \(.*\)/\1/'
|
cksum
)
if
[
"
$CKSUM
"
!=
'4004155691 35506'
]
;
then
echo
"ERROR: Regression test incorrect worker log cksum:
$CKSUM
"
echo
"ERROR: Regression test incorrect worker log cksum:
$CKSUM
"
exit
1
exit
1
fi
fi
...
...
src/test/test_mq.c
View file @
b9aa9b61
...
@@ -130,7 +130,7 @@ static const char
...
@@ -130,7 +130,7 @@ static const char
printf
(
"... testing version info
\n
"
);
printf
(
"... testing version info
\n
"
);
MASSERT0
(
worker
!=
NULL
,
"MQ_Version: worker is NULL before call"
);
MASSERT0
(
worker
!=
NULL
,
"MQ_Version: worker is NULL before call"
);
err
=
mqf
.
version
(
worker
,
version
);
err
=
mqf
.
version
(
worker
,
version
,
BUFSIZ
);
VMASSERT
(
err
==
NULL
,
"MQ_Version: %s"
,
err
);
VMASSERT
(
err
==
NULL
,
"MQ_Version: %s"
,
err
);
MASSERT0
(
version
[
0
]
!=
'\0'
,
"MQ_Version: version is empty"
);
MASSERT0
(
version
[
0
]
!=
'\0'
,
"MQ_Version: version is empty"
);
...
@@ -146,7 +146,7 @@ static const char
...
@@ -146,7 +146,7 @@ static const char
printf
(
"... testing client ID info
\n
"
);
printf
(
"... testing client ID info
\n
"
);
MASSERT0
(
worker
!=
NULL
,
"MQ_ClientID: worker is NULL before call"
);
MASSERT0
(
worker
!=
NULL
,
"MQ_ClientID: worker is NULL before call"
);
err
=
mqf
.
client_id
(
worker
,
clientID
);
err
=
mqf
.
client_id
(
worker
,
clientID
,
BUFSIZ
);
VMASSERT
(
err
==
NULL
,
"MQ_ClientID: %s"
,
err
);
VMASSERT
(
err
==
NULL
,
"MQ_ClientID: %s"
,
err
);
MASSERT0
(
clientID
[
0
]
!=
'\0'
,
"MQ_ClientID: client ID is empty"
);
MASSERT0
(
clientID
[
0
]
!=
'\0'
,
"MQ_ClientID: client ID is empty"
);
...
...
src/trackrdrd.h
View file @
b9aa9b61
...
@@ -37,6 +37,7 @@
...
@@ -37,6 +37,7 @@
#include <signal.h>
#include <signal.h>
#include <stdint.h>
#include <stdint.h>
#include <limits.h>
#include <limits.h>
#include <stddef.h>
#include "vapi/vsl.h"
#include "vapi/vsl.h"
#include "vqueue.h"
#include "vqueue.h"
...
@@ -47,8 +48,8 @@ typedef const char *init_connections_f(void);
...
@@ -47,8 +48,8 @@ typedef const char *init_connections_f(void);
typedef
const
char
*
worker_init_f
(
void
**
priv
,
int
wrk_num
);
typedef
const
char
*
worker_init_f
(
void
**
priv
,
int
wrk_num
);
typedef
int
send_f
(
void
*
priv
,
const
char
*
data
,
unsigned
len
,
typedef
int
send_f
(
void
*
priv
,
const
char
*
data
,
unsigned
len
,
const
char
*
key
,
unsigned
keylen
,
const
char
**
error
);
const
char
*
key
,
unsigned
keylen
,
const
char
**
error
);
typedef
const
char
*
version_f
(
void
*
priv
,
char
*
version
);
typedef
const
char
*
version_f
(
void
*
priv
,
char
*
version
,
size_t
len
);
typedef
const
char
*
client_id_f
(
void
*
priv
,
char
*
clientID
);
typedef
const
char
*
client_id_f
(
void
*
priv
,
char
*
clientID
,
size_t
len
);
typedef
const
char
*
reconnect_f
(
void
**
priv
);
typedef
const
char
*
reconnect_f
(
void
**
priv
);
typedef
const
char
*
worker_shutdown_f
(
void
**
priv
,
int
wrk_num
);
typedef
const
char
*
worker_shutdown_f
(
void
**
priv
,
int
wrk_num
);
typedef
const
char
*
global_shutdown_f
(
void
);
typedef
const
char
*
global_shutdown_f
(
void
);
...
...
src/worker.c
View file @
b9aa9b61
...
@@ -41,7 +41,7 @@
...
@@ -41,7 +41,7 @@
#include "miniobj.h"
#include "miniobj.h"
#include "vsb.h"
#include "vsb.h"
#define VERSION_LEN
64
#define VERSION_LEN
80
#define CLIENT_ID_LEN 80
#define CLIENT_ID_LEN 80
static
int
running
=
0
,
exited
=
0
;
static
int
running
=
0
,
exited
=
0
;
...
@@ -112,12 +112,12 @@ wrk_log_connection(void *mq_worker, unsigned id)
...
@@ -112,12 +112,12 @@ wrk_log_connection(void *mq_worker, unsigned id)
const
char
*
err
;
const
char
*
err
;
char
version
[
VERSION_LEN
],
clientID
[
CLIENT_ID_LEN
];
char
version
[
VERSION_LEN
],
clientID
[
CLIENT_ID_LEN
];
err
=
mqf
.
version
(
mq_worker
,
version
);
err
=
mqf
.
version
(
mq_worker
,
version
,
VERSION_LEN
);
if
(
err
!=
NULL
)
{
if
(
err
!=
NULL
)
{
LOG_Log
(
LOG_ERR
,
"Worker %d: Failed to get MQ version"
,
id
,
err
);
LOG_Log
(
LOG_ERR
,
"Worker %d: Failed to get MQ version"
,
id
,
err
);
version
[
0
]
=
'\0'
;
version
[
0
]
=
'\0'
;
}
}
err
=
mqf
.
client_id
(
mq_worker
,
clientID
);
err
=
mqf
.
client_id
(
mq_worker
,
clientID
,
CLIENT_ID_LEN
);
if
(
err
!=
NULL
)
{
if
(
err
!=
NULL
)
{
LOG_Log
(
LOG_ERR
,
"Worker %d: Failed to get MQ client ID"
,
id
,
err
);
LOG_Log
(
LOG_ERR
,
"Worker %d: Failed to get MQ client ID"
,
id
,
err
);
clientID
[
0
]
=
'\0'
;
clientID
[
0
]
=
'\0'
;
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment