Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
T
trackrdrd
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
uplex-varnish
trackrdrd
Commits
8039432a
Commit
8039432a
authored
Nov 05, 2012
by
Geoff Simmons
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
trackrdrd: added worker.c and unit test
parent
52a7aa8d
Changes
10
Hide whitespace changes
Inline
Side-by-side
Showing
10 changed files
with
369 additions
and
7 deletions
+369
-7
Makefile.am
trackrdrd/src/Makefile.am
+2
-1
config.c
trackrdrd/src/config.c
+3
-0
spmcq.c
trackrdrd/src/spmcq.c
+1
-1
Makefile.am
trackrdrd/src/test/Makefile.am
+17
-2
regress.sh
trackrdrd/src/test/regress.sh
+1
-1
test_mq.c
trackrdrd/src/test/test_mq.c
+1
-1
test_spmcq_loop.sh
trackrdrd/src/test/test_spmcq_loop.sh
+1
-1
test_worker.c
trackrdrd/src/test/test_worker.c
+120
-0
trackrdrd.h
trackrdrd/src/trackrdrd.h
+10
-0
worker.c
trackrdrd/src/worker.c
+213
-0
No files found.
trackrdrd/src/Makefile.am
View file @
8039432a
...
...
@@ -15,7 +15,8 @@ trackrdrd_SOURCES = \
mq.c
\
activemq/amq.h
\
activemq/amq.cpp
\
spmcq.c
spmcq.c
\
worker.c
trackrdrd_LDADD
=
\
$(VARNISHSRC)
/lib/libvarnishcompat/libvarnishcompat.la
\
...
...
trackrdrd/src/config.c
View file @
8039432a
...
...
@@ -111,6 +111,7 @@ CONF_Add(const char *lval, const char *rval)
confUnsigned
(
"maxopen.scale"
,
maxopen_scale
);
confUnsigned
(
"maxdata.scale"
,
maxdata_scale
);
confUnsigned
(
"nworkers"
,
nworkers
);
if
(
strcmp
(
lval
,
"syslog.facility"
)
==
0
)
{
if
((
ret
=
conf_getFacility
(
rval
))
<
0
)
...
...
@@ -175,6 +176,7 @@ CONF_Init(void)
config
.
maxdata_scale
=
0
;
config
.
mq_uri
[
0
]
=
'\0'
;
config
.
mq_qname
[
0
]
=
'\0'
;
config
.
nworkers
=
1
;
}
int
...
...
@@ -245,4 +247,5 @@ CONF_Dump(void)
confdump
(
"maxdata.scale = %d"
,
config
.
maxdata_scale
);
confdump
(
"mq.uri = %s"
,
config
.
mq_uri
);
confdump
(
"mq.qname = %s"
,
config
.
mq_qname
);
confdump
(
"nworkers = %d"
,
config
.
nworkers
);
}
trackrdrd/src/spmcq.c
View file @
8039432a
...
...
@@ -53,7 +53,7 @@ static void
spmcq_cleanup
(
void
)
{
free
(
spmcq
.
data
);
pthread_mutex_destroy
(
&
spmcq_deq_lock
);
AZ
(
pthread_mutex_destroy
(
&
spmcq_deq_lock
)
);
}
int
...
...
trackrdrd/src/test/Makefile.am
View file @
8039432a
INCLUDES
=
-I
$(VARNISHSRC)
/include
-I
$(VARNISHSRC)
@AMQ_CFLAGS@
TESTS
=
test_parse test_data test_mq test_spmcq test_spmcq_loop.sh regress.sh
TESTS
=
test_parse test_data test_mq test_spmcq test_spmcq_loop.sh
\
test_worker regress.sh
check_PROGRAMS
=
test_parse test_data test_mq test_spmcq
check_PROGRAMS
=
test_parse test_data test_mq test_spmcq
test_worker
test_parse_SOURCES
=
\
minunit.h
\
...
...
@@ -41,3 +42,17 @@ test_spmcq_LDADD = \
$(VARNISHSRC)
/lib/libvarnish/libvarnish.la
\
../spmcq.
$(OBJEXT)
test_worker_SOURCES
=
\
minunit.h
\
test_worker.c
\
../trackrdrd.h
test_worker_LDADD
=
\
$(VARNISHSRC)
/lib/libvarnish/libvarnish.la
\
../worker.
$(OBJEXT)
\
../log.
$(OBJEXT)
\
../mq.
$(OBJEXT)
\
../spmcq.
$(OBJEXT)
\
../amq.
$(OBJEXT)
\
../data.
$(OBJEXT)
\
@AMQ_LIBS@
trackrdrd/src/test/regress.sh
View file @
8039432a
...
...
@@ -15,7 +15,7 @@ echo
echo
"TEST:
$0
"
echo
"... testing log output at debug level against a known checksum"
CKSUM
=
$(
../trackrdrd
-f
varnish.binlog
-l
-
-d
-c
test.conf |
cksum
)
if
[
"
$CKSUM
"
!=
'1
387393550 229074
'
]
;
then
if
[
"
$CKSUM
"
!=
'1
094437405 229102
'
]
;
then
echo
"ERROR: Regression test incorrect cksum:
$CKSUM
"
exit
1
fi
...
...
trackrdrd/src/test/test_mq.c
View file @
8039432a
...
...
@@ -49,7 +49,7 @@ static char
const
char
*
err
;
printf
(
"... testing MQ global initialization
\n
"
);
strcpy
(
config
.
mq_uri
,
"tcp://localhost:61616"
);
err
=
MQ_GlobalInit
();
sprintf
(
errmsg
,
"MQ_GlobalInit: %s"
,
err
);
...
...
trackrdrd/src/test/test_spmcq_loop.sh
View file @
8039432a
#! /bin/bash
N
=
10
00
N
=
5
00
echo
echo
"TEST:
$0
"
...
...
trackrdrd/src/test/test_worker.c
0 → 100644
View file @
8039432a
/*-
* Copyright (c) 2012 UPLEX Nils Goroll Systemoptimierung
* Copyright (c) 2012 Otto Gmbh & Co KG
* All rights reserved
* Use only with permission
*
* Author: Geoffrey Simmons <geoffrey.simmons@uplex.de>
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL AUTHOR OR CONTRIBUTORS BE LIABLE
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
* OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
* OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
*
*/
#include <string.h>
#include "minunit.h"
#include "../trackrdrd.h"
#include "vas.h"
#include "miniobj.h"
#define DEBUG 0
#define debug_print(fmt, ...) \
do { if (DEBUG) fprintf(stderr, fmt, __VA_ARGS__); } while(0)
/* Automake exit code for "skipped" in make check */
#define EXIT_SKIPPED 77
#define NWORKERS 5
int
tests_run
=
0
;
static
char
errmsg
[
BUFSIZ
];
/* N.B.: Always run this test first */
static
char
*
test_worker_init
(
void
)
{
int
err
;
const
char
*
error
;
printf
(
"... testing worker initialization
\n
"
);
config
.
maxopen_scale
=
0
;
config
.
nworkers
=
NWORKERS
;
strcpy
(
config
.
mq_uri
,
"tcp://localhost:61616"
);
strcpy
(
config
.
mq_qname
,
"lhoste/tracking/test"
);
error
=
MQ_GlobalInit
();
sprintf
(
errmsg
,
"MQ_GlobalInit failed: %s"
,
error
);
mu_assert
(
errmsg
,
error
==
NULL
);
err
=
WRK_Init
();
sprintf
(
errmsg
,
"WRK_Init: %s"
,
strerror
(
err
));
mu_assert
(
errmsg
,
err
==
0
);
AZ
(
LOG_Open
(
"test_worker"
));
AZ
(
DATA_Init
());
AZ
(
SPMCQ_Init
());
return
NULL
;
}
static
char
*
test_worker_run
(
void
)
{
dataentry
*
entry
;
printf
(
"... testing run of %d workers
\n
"
,
NWORKERS
);
srand48
(
time
(
NULL
));
unsigned
xid
=
(
unsigned
int
)
lrand48
();
WRK_Start
();
for
(
int
i
=
0
;
i
<
1024
;
i
++
)
{
entry
=
DATA_Insert
(
xid
);
CHECK_OBJ_NOTNULL
(
entry
,
DATA_MAGIC
);
entry
->
xid
=
xid
;
sprintf
(
entry
->
data
,
"XID=%d&foo=bar&baz=quux&record=%d"
,
xid
,
i
+
1
);
entry
->
end
=
strlen
(
entry
->
data
);
entry
->
state
=
DATA_DONE
;
sprintf
(
errmsg
,
"SPMCQ_Enq: queue full"
);
mu_assert
(
errmsg
,
SPMCQ_Enq
(
entry
)
!=
NULL
);
}
WRK_Halt
();
WRK_Shutdown
();
AZ
(
MQ_GlobalShutdown
());
LOG_Close
();
return
NULL
;
}
static
const
char
*
all_tests
(
void
)
{
mu_run_test
(
test_worker_init
);
mu_run_test
(
test_worker_run
);
return
NULL
;
}
TEST_RUNNER
trackrdrd/src/trackrdrd.h
View file @
8039432a
...
...
@@ -35,6 +35,13 @@
#define MIN_TABLE_SCALE 10
/* worker.c */
int
WRK_Init
(
void
);
void
WRK_Start
(
void
);
void
WRK_Halt
(
void
);
void
WRK_Shutdown
(
void
);
/* spmcq.c */
/* Single producer multiple consumer bounded FIFO queue */
...
...
@@ -100,6 +107,8 @@ typedef struct {
unsigned
open
;
unsigned
done
;
unsigned
submitted
;
/* Records submitted */
unsigned
sent
;
/* Records sent to MQ */
unsigned
failed
;
/* MQ send fails */
unsigned
occ_hi
;
/* Occupancy high water mark */
unsigned
data_hi
;
/* Data high water mark */
dataentry
*
entry
;
...
...
@@ -130,6 +139,7 @@ struct config {
unsigned
maxdata_scale
;
char
mq_uri
[
BUFSIZ
];
char
mq_qname
[
BUFSIZ
];
unsigned
nworkers
;
}
config
;
void
CONF_Init
(
void
);
...
...
trackrdrd/src/worker.c
0 → 100644
View file @
8039432a
/*-
* Copyright (c) 2012 UPLEX Nils Goroll Systemoptimierung
* Copyright (c) 2012 Otto Gmbh & Co KG
* All rights reserved
* Use only with permission
*
* Author: Geoffrey Simmons <geoffrey.simmons@uplex.de>
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL AUTHOR OR CONTRIBUTORS BE LIABLE
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
* OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
* OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
*
*/
#include <pthread.h>
#include <stdlib.h>
#include <syslog.h>
#include <string.h>
#include "trackrdrd.h"
#include "vas.h"
#include "miniobj.h"
typedef
struct
{
unsigned
magic
;
#define WORKER_DATA_MAGIC 0xd8eef137
unsigned
id
;
unsigned
status
;
}
worker_data_t
;
typedef
struct
{
pthread_t
worker
;
worker_data_t
*
wrk_data
;
}
thread_data_t
;
static
unsigned
run
,
cleaned
=
0
;
static
thread_data_t
*
thread_data
;
static
inline
void
wrk_send
(
void
*
amq_worker
,
dataentry
*
entry
,
unsigned
id
)
{
const
char
*
err
;
CHECK_OBJ_NOTNULL
(
entry
,
DATA_MAGIC
);
assert
(
entry
->
state
==
DATA_DONE
);
AN
(
amq_worker
);
err
=
MQ_Send
(
amq_worker
,
entry
->
data
,
entry
->
end
);
if
(
err
!=
NULL
)
{
/* XXX: error recovery? reconnect? preserve the data? */
LOG_Log
(
LOG_ALERT
,
"Worker %d: Failed to send data: %s"
,
id
,
err
);
LOG_Log
(
LOG_ERR
,
"Worker %d: Data DISCARDED [%.*s]"
,
id
,
entry
->
end
,
entry
->
data
);
tbl
.
failed
++
;
}
else
tbl
.
sent
++
;
entry
->
state
=
DATA_OPEN
;
}
static
void
*
wrk_main
(
void
*
arg
)
{
worker_data_t
*
wrk
=
(
worker_data_t
*
)
arg
;
void
*
amq_worker
;
dataentry
*
entry
;
const
char
*
err
;
LOG_Log
(
LOG_INFO
,
"Worker %d: starting"
,
wrk
->
id
);
CHECK_OBJ_NOTNULL
(
wrk
,
WORKER_DATA_MAGIC
);
err
=
MQ_WorkerInit
(
&
amq_worker
);
if
(
err
!=
NULL
)
{
LOG_Log
(
LOG_ALERT
,
"Worker %d: Cannot initialize queue connection: %s"
,
wrk
->
id
,
err
);
wrk
->
status
=
EXIT_FAILURE
;
pthread_exit
((
void
*
)
wrk
);
}
while
(
run
)
{
entry
=
(
dataentry
*
)
SPMCQ_Deq
();
if
(
entry
!=
NULL
)
{
/* Dequeued a data entry */
wrk_send
(
amq_worker
,
entry
,
wrk
->
id
);
continue
;
}
/* Queue is empty, wait until data are available, or quit is
signaled.
Grab the CV lock, which also constitutes an implicit memory
barrier */
AZ
(
pthread_mutex_lock
(
&
spmcq_nonempty_lock
));
/* run is guaranteed to be fresh here */
if
(
run
)
AZ
(
pthread_cond_wait
(
&
spmcq_nonempty_cond
,
&
spmcq_nonempty_lock
));
AZ
(
pthread_mutex_unlock
(
&
spmcq_nonempty_lock
));
}
/* Prepare to exit, drain the queue */
while
((
entry
=
(
dataentry
*
)
SPMCQ_Deq
())
!=
NULL
)
wrk_send
(
amq_worker
,
entry
,
wrk
->
id
);
wrk
->
status
=
EXIT_SUCCESS
;
err
=
MQ_WorkerShutdown
(
&
amq_worker
);
if
(
err
!=
NULL
)
{
LOG_Log
(
LOG_ALERT
,
"Worker %d: MQ worker shutdown failed: %s"
,
wrk
->
id
,
err
);
wrk
->
status
=
EXIT_FAILURE
;
}
LOG_Log
(
LOG_INFO
,
"Worker %d: exiting"
,
wrk
->
id
);
pthread_exit
((
void
*
)
wrk
);
}
static
void
wrk_cleanup
(
void
)
{
if
(
cleaned
)
return
;
for
(
int
i
=
0
;
i
<
config
.
nworkers
;
i
++
)
free
(
thread_data
[
i
].
wrk_data
);
free
(
thread_data
);
AZ
(
pthread_mutex_destroy
(
&
spmcq_nonempty_lock
));
AZ
(
pthread_cond_destroy
(
&
spmcq_nonempty_cond
));
AZ
(
pthread_mutex_destroy
(
&
spmcq_nonfull_lock
));
AZ
(
pthread_cond_destroy
(
&
spmcq_nonfull_cond
));
cleaned
=
1
;
}
int
WRK_Init
(
void
)
{
thread_data
=
(
thread_data_t
*
)
malloc
(
config
.
nworkers
*
sizeof
(
thread_data_t
));
if
(
thread_data
==
NULL
)
{
LOG_Log
(
LOG_ALERT
,
"Cannot allocate thread data: %s"
,
strerror
(
errno
));
return
(
errno
);
}
run
=
1
;
for
(
int
i
=
0
;
i
<
config
.
nworkers
;
i
++
)
{
thread_data
[
i
].
wrk_data
=
(
worker_data_t
*
)
malloc
(
sizeof
(
worker_data_t
));
if
(
thread_data
[
i
].
wrk_data
==
NULL
)
{
LOG_Log
(
LOG_ALERT
,
"Cannot allocate worker data for worker %d: %s"
,
i
+
1
,
strerror
(
errno
));
return
(
errno
);
}
thread_data
[
i
].
wrk_data
->
magic
=
WORKER_DATA_MAGIC
;
thread_data
[
i
].
wrk_data
->
id
=
i
+
1
;
}
AZ
(
pthread_mutex_init
(
&
spmcq_nonempty_lock
,
NULL
));
AZ
(
pthread_cond_init
(
&
spmcq_nonempty_cond
,
NULL
));
AZ
(
pthread_mutex_init
(
&
spmcq_nonfull_lock
,
NULL
));
AZ
(
pthread_cond_init
(
&
spmcq_nonfull_cond
,
NULL
));
atexit
(
wrk_cleanup
);
return
0
;
}
void
WRK_Start
(
void
)
{
run
=
1
;
for
(
int
i
=
0
;
i
<
config
.
nworkers
;
i
++
)
AZ
(
pthread_create
(
&
thread_data
[
i
].
worker
,
NULL
,
wrk_main
,
thread_data
[
i
].
wrk_data
));
}
void
WRK_Halt
(
void
)
{
/*
* must only modify run under spmcq_nonempty_lock to ensure that
* we signal all waiting consumers (otherwise a consumer could go
* waiting _after_ we have broadcasted and so miss the event.
*/
AZ
(
pthread_mutex_lock
(
&
spmcq_nonempty_lock
));
run
=
0
;
AZ
(
pthread_cond_broadcast
(
&
spmcq_nonempty_cond
));
AZ
(
pthread_mutex_unlock
(
&
spmcq_nonempty_lock
));
for
(
int
i
=
0
;
i
<
config
.
nworkers
;
i
++
)
{
AZ
(
pthread_join
(
thread_data
[
i
].
worker
,
(
void
**
)
&
thread_data
[
i
].
wrk_data
));
CHECK_OBJ_NOTNULL
(
thread_data
[
i
].
wrk_data
,
WORKER_DATA_MAGIC
);
if
(
thread_data
[
i
].
wrk_data
->
status
!=
EXIT_SUCCESS
)
LOG_Log
(
LOG_ERR
,
"Worker %d returned failure status"
,
i
+
1
);
}
}
void
WRK_Shutdown
(
void
)
{
/* XXX: error if run=1? */
wrk_cleanup
();
}
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment