Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
T
trackrdrd
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
uplex-varnish
trackrdrd
Commits
4bb93144
Commit
4bb93144
authored
May 28, 2014
by
Geoff Simmons
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
trackrdrd version 3 of the MQ interface: MQ_WorkerInit() supplies the
worker number as a parameter
parent
257c6695
Changes
7
Hide whitespace changes
Inline
Side-by-side
Showing
7 changed files
with
24 additions
and
24 deletions
+24
-24
mq.h
trackrdrd/include/mq.h
+7
-3
Makefile.am
trackrdrd/src/mq/activemq/Makefile.am
+1
-1
mq.c
trackrdrd/src/mq/activemq/mq.c
+9
-15
test_activemq.c
trackrdrd/src/mq/activemq/test/test_activemq.c
+1
-1
test_mq.c
trackrdrd/src/test/test_mq.c
+4
-2
trackrdrd.h
trackrdrd/src/trackrdrd.h
+1
-1
worker.c
trackrdrd/src/worker.c
+1
-1
No files found.
trackrdrd/include/mq.h
View file @
4bb93144
...
...
@@ -31,8 +31,10 @@
/**
* \file mq.h
* \brief MQ -- the messaging interface for the Varnish log tracking
* reader.
* \brief MQ messaging interface for trackrdrd
* \details MQ -- the messaging interface for the Varnish log tracking
* reader
* \version 3
*
* This header defines the interface to a messaging system, such as
* ActiveMQ or Kafka, used by the tracking reader. It is responsible for
...
...
@@ -128,9 +130,11 @@ const char *MQ_InitConnections(void);
* @param priv pointer to a private object handle. The implementation is
* expected to place a pointer to its private data structure in this
* location.
* @param wrk_num the worker number, from 1 to the value of ``nworkers``
* supplied in ``MQ_GlobalInit()``, inclusive
* @return `NULL` on success, an error message on failure
*/
const
char
*
MQ_WorkerInit
(
void
**
priv
);
const
char
*
MQ_WorkerInit
(
void
**
priv
,
int
wrk_num
);
/**
* Send data to the messaging system.
...
...
trackrdrd/src/mq/activemq/Makefile.am
View file @
4bb93144
...
...
@@ -19,7 +19,7 @@ libtrackrdr_activemq_la_LIBADD = \
@APR_LIBS@
\
@APU_LIBS@
libtrackrdr_activemq_la_LDFLAGS
=
-version-info
2
:0:0
libtrackrdr_activemq_la_LDFLAGS
=
-version-info
3
:0:0
if
HAVE_RST2MAN
dist_man_MANS
=
libtrackrdr-activemq.3
...
...
trackrdrd/src/mq/activemq/mq.c
View file @
4bb93144
...
...
@@ -33,7 +33,6 @@
#include <stdlib.h>
#include <errno.h>
#include <string.h>
#include <pthread.h>
#include <assert.h>
#include "mq.h"
...
...
@@ -43,7 +42,6 @@
static
AMQ_Connection
**
connections
;
static
AMQ_Worker
**
workers
;
static
pthread_mutex_t
connection_lock
=
PTHREAD_MUTEX_INITIALIZER
;
static
unsigned
connection
=
0
;
static
unsigned
nwrk
=
0
;
...
...
@@ -113,29 +111,25 @@ MQ_InitConnections(void)
}
const
char
*
MQ_WorkerInit
(
void
**
priv
)
MQ_WorkerInit
(
void
**
priv
,
int
wrk_num
)
{
int
i
,
ret
;
const
char
*
err
=
NULL
;
ret
=
pthread_mutex_lock
(
&
connection_lock
);
assert
(
ret
==
0
);
i
=
connection
++
%
nwrk
;
ret
=
pthread_mutex_unlock
(
&
connection_lock
);
assert
(
ret
==
0
);
AMQ_Connection
*
conn
=
connections
[
i
];
assert
(
wrk_num
>=
1
&&
wrk_num
<=
nwrk
);
wrk_num
--
;
AMQ_Connection
*
conn
=
connections
[
wrk_num
];
if
(
conn
==
NULL
)
{
err
=
AMQ_ConnectionInit
(
&
conn
,
uri
[
i
%
n_uris
]);
err
=
AMQ_ConnectionInit
(
&
conn
,
uri
[
wrk_num
%
n_uris
]);
if
(
err
!=
NULL
)
return
err
;
else
connections
[
i
]
=
conn
;
connections
[
wrk_num
]
=
conn
;
}
err
=
AMQ_WorkerInit
((
AMQ_Worker
**
)
priv
,
conn
,
qname
,
i
);
err
=
AMQ_WorkerInit
((
AMQ_Worker
**
)
priv
,
conn
,
qname
,
wrk_num
);
if
(
err
==
NULL
)
workers
[
i
]
=
(
AMQ_Worker
*
)
*
priv
;
workers
[
wrk_num
]
=
(
AMQ_Worker
*
)
*
priv
;
else
workers
[
i
]
=
NULL
;
workers
[
wrk_num
]
=
NULL
;
return
err
;
}
...
...
trackrdrd/src/mq/activemq/test/test_activemq.c
View file @
4bb93144
...
...
@@ -81,7 +81,7 @@ static const char
printf
(
"... testing ActiveMQ worker init
\n
"
);
err
=
MQ_WorkerInit
(
&
worker
);
err
=
MQ_WorkerInit
(
&
worker
,
NWORKERS
);
VMASSERT
(
err
==
NULL
,
"MQ_WorkerInit: %s"
,
err
);
MASSERT0
(
worker
!=
NULL
,
"Worker is NULL after MQ_WorkerInit"
);
...
...
trackrdrd/src/test/test_mq.c
View file @
4bb93144
...
...
@@ -43,6 +43,8 @@
#define MQ_MODULE "../mq/activemq/.libs/libtrackrdr-activemq.so"
#define MQ_CONFIG "activemq.conf"
#define NWORKERS 1
int
tests_run
=
0
;
static
char
errmsg
[
BUFSIZ
];
static
void
*
mqh
;
...
...
@@ -87,7 +89,7 @@ static char
printf
(
"... testing MQ global initialization
\n
"
);
config
.
nworkers
=
1
;
config
.
nworkers
=
NWORKERS
;
strcpy
(
config
.
mq_config_file
,
MQ_CONFIG
);
err
=
mqf
.
global_init
(
config
.
nworkers
,
config
.
mq_config_file
);
sprintf
(
errmsg
,
"MQ_GlobalInit: %s"
,
err
);
...
...
@@ -121,7 +123,7 @@ static const char
printf
(
"... test worker init
\n
"
);
err
=
mqf
.
worker_init
(
&
worker
);
err
=
mqf
.
worker_init
(
&
worker
,
NWORKERS
);
sprintf
(
errmsg
,
"MQ_WorkerInit: %s"
,
err
);
mu_assert
(
errmsg
,
err
==
NULL
);
...
...
trackrdrd/src/trackrdrd.h
View file @
4bb93144
...
...
@@ -43,7 +43,7 @@
/* message queue methods, typedefs match the interface in mq.h */
typedef
const
char
*
global_init_f
(
unsigned
nworkers
,
const
char
*
config_fname
);
typedef
const
char
*
init_connections_f
(
void
);
typedef
const
char
*
worker_init_f
(
void
**
priv
);
typedef
const
char
*
worker_init_f
(
void
**
priv
,
int
wrk_num
);
typedef
const
char
*
send_f
(
void
*
priv
,
const
char
*
data
,
unsigned
len
,
const
char
*
key
,
unsigned
keylen
);
typedef
const
char
*
version_f
(
void
*
priv
,
char
*
version
);
...
...
trackrdrd/src/worker.c
View file @
4bb93144
...
...
@@ -190,7 +190,7 @@ static void
CHECK_OBJ_NOTNULL
(
wrk
,
WORKER_DATA_MAGIC
);
wrk
->
state
=
WRK_INITIALIZING
;
err
=
mqf
.
worker_init
(
&
amq_worker
);
err
=
mqf
.
worker_init
(
&
amq_worker
,
wrk
->
id
);
if
(
err
!=
NULL
)
{
LOG_Log
(
LOG_ALERT
,
"Worker %d: Cannot initialize queue connection: %s"
,
wrk
->
id
,
err
);
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment