Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
T
trackrdrd
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
uplex-varnish
trackrdrd
Commits
d6a7ba55
Commit
d6a7ba55
authored
Feb 21, 2013
by
Geoff Simmons
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
trackrdrd: reworked SPMCQ, waits for a full queue are never necessary
parent
6b84c169
Changes
9
Hide whitespace changes
Inline
Side-by-side
Showing
9 changed files
with
149 additions
and
181 deletions
+149
-181
child.c
src/child.c
+1
-5
monitor.c
src/monitor.c
+0
-2
spmcq.c
src/spmcq.c
+61
-60
Makefile.am
src/test/Makefile.am
+2
-12
test_spmcq.c
src/test/test_spmcq.c
+26
-30
test_spmcq_loop.sh
src/test/test_spmcq_loop.sh
+3
-3
test_worker.c
src/test/test_worker.c
+1
-1
trackrdrd.h
src/trackrdrd.h
+51
-66
worker.c
src/worker.c
+4
-2
No files found.
src/child.c
View file @
d6a7ba55
...
@@ -232,11 +232,7 @@ data_submit(dataentry *de)
...
@@ -232,11 +232,7 @@ data_submit(dataentry *de)
return
;
return
;
}
}
while
(
!
SPMCQ_Enq
((
void
*
)
de
))
{
SPMCQ_Enq
(
de
);
dtbl
.
w_stats
.
wait_qfull
++
;
LOG_Log
(
LOG_ALERT
,
"%s"
,
"Internal queue full, waiting for dequeue"
);
spmcq_wait
(
room
);
}
dtbl
.
w_stats
.
submitted
++
;
dtbl
.
w_stats
.
submitted
++
;
/* should we wake up another worker? */
/* should we wake up another worker? */
...
...
src/monitor.c
View file @
d6a7ba55
...
@@ -54,7 +54,6 @@ log_output(void)
...
@@ -54,7 +54,6 @@ log_output(void)
"len=%u "
"len=%u "
"nodata=%u "
"nodata=%u "
"submitted=%u "
"submitted=%u "
"wait_qfull=%u "
"wait_room=%u "
"wait_room=%u "
"data_hi=%u "
"data_hi=%u "
"data_overflows=%u "
"data_overflows=%u "
...
@@ -68,7 +67,6 @@ log_output(void)
...
@@ -68,7 +67,6 @@ log_output(void)
dtbl
.
len
,
dtbl
.
len
,
dtbl
.
w_stats
.
nodata
,
dtbl
.
w_stats
.
nodata
,
dtbl
.
w_stats
.
submitted
,
dtbl
.
w_stats
.
submitted
,
dtbl
.
w_stats
.
wait_qfull
,
dtbl
.
w_stats
.
wait_room
,
dtbl
.
w_stats
.
wait_room
,
dtbl
.
w_stats
.
data_hi
,
dtbl
.
w_stats
.
data_hi
,
dtbl
.
w_stats
.
data_overflows
,
dtbl
.
w_stats
.
data_overflows
,
...
...
src/spmcq.c
View file @
d6a7ba55
...
@@ -27,6 +27,7 @@
...
@@ -27,6 +27,7 @@
* OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
* SUCH DAMAGE.
*
*
* Single producer multiple consumer bounded FIFO queue
*/
*/
#include <stdlib.h>
#include <stdlib.h>
...
@@ -37,79 +38,100 @@
...
@@ -37,79 +38,100 @@
#include "trackrdrd.h"
#include "trackrdrd.h"
#include "vas.h"
#include "vas.h"
#include "vmb.h"
#include "vqueue.h"
#if 0
typedef struct {
unsigned magic;
#define SPMCQ_MAGIC 0xe9a5d0a8
const unsigned mask;
void **data;
volatile unsigned head;
volatile unsigned tail;
} spmcq_t;
spmcq_t spmcq;
#endif
static
volatile
unsigned
long
enqs
=
0
,
deqs
=
0
;
static
pthread_mutex_t
spmcq_lock
;
static
pthread_mutex_t
spmcq_deq_lock
;
static
pthread_mutex_t
spmcq_deq_lock
;
static
unsigned
qlen_goal
;
static
unsigned
qlen_goal
;
VSTAILQ_HEAD
(
spmcq_s
,
dataentry_s
);
struct
spmcq_s
spmcq_head
=
VSTAILQ_HEAD_INITIALIZER
(
spmcq_head
);
struct
spmcq_s
enq_head
=
VSTAILQ_HEAD_INITIALIZER
(
enq_head
);
struct
spmcq_s
deq_head
=
VSTAILQ_HEAD_INITIALIZER
(
deq_head
);
static
inline
unsigned
static
inline
unsigned
spmcq_len
(
void
)
spmcq_len
(
void
)
{
{
if
(
spmcq
.
tail
>=
spmcq
.
head
)
return
enqs
-
deqs
;
return
spmcq
.
tail
-
spmcq
.
head
;
return
UINT_MAX
-
spmcq
.
head
+
1
+
spmcq
.
tail
;
}
}
static
void
static
void
spmcq_cleanup
(
void
)
spmcq_cleanup
(
void
)
{
{
free
(
spmcq
.
data
);
AZ
(
pthread_mutex_destroy
(
&
spmcq_lock
)
);
AZ
(
pthread_mutex_destroy
(
&
spmcq_deq_lock
));
AZ
(
pthread_mutex_destroy
(
&
spmcq_deq_lock
));
}
}
static
inline
int
spmcq_wrk_len_ratio
(
int
working
,
int
running
)
{
return
working
*
qlen_goal
/
running
;
}
int
int
SPMCQ_Init
(
void
)
SPMCQ_Init
(
void
)
{
{
void
*
buf
;
if
(
pthread_mutex_init
(
&
spmcq_lock
,
&
attr_lock
)
!=
0
)
size_t
n
=
config
.
maxdone
;
buf
=
calloc
(
n
,
sizeof
(
void
*
));
if
(
buf
==
NULL
)
return
(
errno
);
return
(
errno
);
if
(
pthread_mutex_init
(
&
spmcq_deq_lock
,
&
attr_lock
)
!=
0
)
if
(
pthread_mutex_init
(
&
spmcq_deq_lock
,
NULL
)
!=
0
)
return
(
errno
);
return
(
errno
);
spmcq_t
q
=
{
.
magic
=
SPMCQ_MAGIC
,
.
mask
=
n
-
1
,
.
data
=
buf
,
.
head
=
0
,
.
tail
=
0
};
memcpy
(
&
spmcq
,
&
q
,
sizeof
(
spmcq_t
));
qlen_goal
=
config
.
qlen_goal
;
qlen_goal
=
config
.
qlen_goal
;
atexit
(
spmcq_cleanup
);
atexit
(
spmcq_cleanup
);
return
(
0
);
return
(
0
);
}
}
bool
void
SPMCQ_Enq
(
void
*
ptr
)
SPMCQ_Enq
(
dataentry
*
ptr
)
{
{
if
(
spmcq_len
()
>
spmcq
.
mask
)
AZ
(
pthread_mutex_lock
(
&
spmcq_lock
));
return
false
;
assert
(
enqs
-
deqs
<
config
.
maxdone
);
spmcq
.
data
[
spmcq
.
tail
++
&
spmcq
.
mask
]
=
ptr
;
enqs
++
;
return
true
;
VSTAILQ_INSERT_TAIL
(
&
enq_head
,
ptr
,
spmcq
);
if
(
VSTAILQ_EMPTY
(
&
spmcq_head
))
VSTAILQ_CONCAT
(
&
spmcq_head
,
&
enq_head
);
AZ
(
pthread_mutex_unlock
(
&
spmcq_lock
));
}
}
void
dataentry
*
SPMCQ_Deq
(
void
)
*
SPMCQ_Deq
(
void
)
{
{
void
*
ptr
;
void
*
ptr
;
AZ
(
pthread_mutex_lock
(
&
spmcq_deq_lock
));
AZ
(
pthread_mutex_lock
(
&
spmcq_deq_lock
));
if
(
spmcq_len
()
==
0
)
if
(
VSTAILQ_EMPTY
(
&
deq_head
))
{
AZ
(
pthread_mutex_lock
(
&
spmcq_lock
));
VSTAILQ_CONCAT
(
&
deq_head
,
&
spmcq_head
);
AZ
(
pthread_mutex_unlock
(
&
spmcq_lock
));
}
if
(
VSTAILQ_EMPTY
(
&
deq_head
))
ptr
=
NULL
;
ptr
=
NULL
;
else
else
{
ptr
=
spmcq
.
data
[
spmcq
.
head
++
&
spmcq
.
mask
];
ptr
=
VSTAILQ_FIRST
(
&
deq_head
);
VSTAILQ_REMOVE_HEAD
(
&
deq_head
,
spmcq
);
deqs
++
;
}
AZ
(
pthread_mutex_unlock
(
&
spmcq_deq_lock
));
AZ
(
pthread_mutex_unlock
(
&
spmcq_deq_lock
));
return
ptr
;
return
ptr
;
}
}
void
SPMCQ_Drain
(
void
)
{
AZ
(
pthread_mutex_lock
(
&
spmcq_lock
));
VSTAILQ_CONCAT
(
&
spmcq_head
,
&
enq_head
);
AZ
(
pthread_mutex_unlock
(
&
spmcq_lock
));
}
/*
/*
* should we wake up another worker?
* should we wake up another worker?
*
*
...
@@ -127,6 +149,12 @@ void
...
@@ -127,6 +149,12 @@ void
* Q_Len > working * qlen_goal / max_workers
* Q_Len > working * qlen_goal / max_workers
*/
*/
static
inline
int
spmcq_wrk_len_ratio
(
int
working
,
int
running
)
{
return
working
*
qlen_goal
/
running
;
}
bool
bool
SPMCQ_NeedWorker
(
int
running
)
SPMCQ_NeedWorker
(
int
running
)
{
{
...
@@ -144,30 +172,3 @@ SPMCQ_StopWorker(int running)
...
@@ -144,30 +172,3 @@ SPMCQ_StopWorker(int running)
return
spmcq_len
()
<
spmcq_wrk_len_ratio
(
running
-
spmcq_datawaiter
-
1
,
return
spmcq_len
()
<
spmcq_wrk_len_ratio
(
running
-
spmcq_datawaiter
-
1
,
running
);
running
);
}
}
#ifdef TEST_DRIVER
int
main
(
int
argc
,
char
*
const
*
argv
)
{
(
void
)
argc
;
printf
(
"
\n
TEST: %s
\n
"
,
argv
[
0
]);
printf
(
"... test SMPCQ enqueue at UINT_MAX overflow
\n
"
);
config
.
maxdone
=
1024
;
SPMCQ_Init
();
spmcq
.
head
=
spmcq
.
tail
=
UINT_MAX
-
2
;
assert
(
SPMCQ_Enq
(
NULL
));
assert
(
SPMCQ_Enq
(
NULL
));
assert
(
SPMCQ_Enq
(
NULL
));
assert
(
SPMCQ_Enq
(
NULL
));
assert
(
SPMCQ_Enq
(
NULL
));
assert
(
SPMCQ_Enq
(
NULL
));
assert
(
SPMCQ_Enq
(
NULL
));
assert
(
spmcq_len
()
==
7
);
printf
(
"%s: 1 test run
\n
"
,
argv
[
0
]);
exit
(
0
);
}
#endif
src/test/Makefile.am
View file @
d6a7ba55
INCLUDES
=
-I
$(VARNISHSRC)
/include
-I
$(VARNISHSRC)
@AMQ_CFLAGS@
INCLUDES
=
-I
$(VARNISHSRC)
/include
-I
$(VARNISHSRC)
@AMQ_CFLAGS@
TESTS
=
test_parse test_data test_hash test_mq test_spmcq
\
TESTS
=
test_parse test_data test_hash test_mq test_spmcq
\
test_spmcq_loop.sh test_
spmcq_len test_
worker regress.sh
test_spmcq_loop.sh test_worker regress.sh
check_PROGRAMS
=
test_parse test_data test_hash test_mq test_spmcq
\
check_PROGRAMS
=
test_parse test_data test_hash test_mq test_spmcq
\
test_
spmcq_len test_
worker
test_worker
test_parse_SOURCES
=
\
test_parse_SOURCES
=
\
minunit.h
\
minunit.h
\
...
@@ -70,16 +70,6 @@ test_spmcq_LDADD = \
...
@@ -70,16 +70,6 @@ test_spmcq_LDADD = \
$(VARNISHSRC)
/lib/libvarnish/libvarnish.la
\
$(VARNISHSRC)
/lib/libvarnish/libvarnish.la
\
../spmcq.
$(OBJEXT)
../spmcq.
$(OBJEXT)
test_spmcq_len_SOURCES
=
\
$(VARNISHSRC)
/lib/libvarnish/libvarnish.la
\
../spmcq.c
\
../trackrdrd.h
test_spmcq_len_LDADD
=
\
$(VARNISHSRC)
/lib/libvarnish/libvarnish.la
test_spmcq_len_CFLAGS
=
-DTEST_DRIVER
test_worker_SOURCES
=
\
test_worker_SOURCES
=
\
minunit.h
\
minunit.h
\
test_worker.c
\
test_worker.c
\
...
...
src/test/test_spmcq.c
View file @
d6a7ba55
...
@@ -50,7 +50,6 @@ int run;
...
@@ -50,7 +50,6 @@ int run;
typedef
enum
{
typedef
enum
{
SUCCESS
=
0
,
SUCCESS
=
0
,
PRODUCER_QFULL
,
PRODUCER_BCAST
,
PRODUCER_BCAST
,
CONSUMER_MUTEX
,
CONSUMER_MUTEX
,
CONSUMER_WAIT
,
CONSUMER_WAIT
,
...
@@ -65,7 +64,7 @@ typedef struct {
...
@@ -65,7 +64,7 @@ typedef struct {
int
tests_run
=
0
;
int
tests_run
=
0
;
static
char
errmsg
[
BUFSIZ
];
static
char
errmsg
[
BUFSIZ
];
static
unsigned
xid
s
[
TABLE_SIZE
];
static
dataentry
entrie
s
[
TABLE_SIZE
];
static
prod_con_data_t
proddata
;
static
prod_con_data_t
proddata
;
static
prod_con_data_t
condata
[
NCON
];
static
prod_con_data_t
condata
[
NCON
];
...
@@ -83,12 +82,9 @@ static void
...
@@ -83,12 +82,9 @@ static void
unsigned
xid
=
(
unsigned
int
)
lrand48
();
unsigned
xid
=
(
unsigned
int
)
lrand48
();
for
(
int
i
=
0
;
i
<
(
1
<<
DEF_MAXOPEN_SCALE
);
i
++
)
{
for
(
int
i
=
0
;
i
<
(
1
<<
DEF_MAXOPEN_SCALE
);
i
++
)
{
xids
[
i
]
=
xid
;
entries
[
i
].
xid
=
xid
;
debug_print
(
"Producer: enqueue %d (xid = %u)
\n
"
,
++
enqs
,
xid
);
debug_print
(
"Producer: enqueue %d (xid = %u)
\n
"
,
++
enqs
,
xid
);
if
(
!
SPMCQ_Enq
(
&
xids
[
i
]))
{
SPMCQ_Enq
(
&
entries
[
i
]);
proddata
.
fail
=
PRODUCER_QFULL
;
pthread_exit
(
&
proddata
);
}
debug_print
(
"%s
\n
"
,
"Producer: broadcast"
);
debug_print
(
"%s
\n
"
,
"Producer: broadcast"
);
if
(
pthread_cond_broadcast
(
&
spmcq_datawaiter_cond
)
!=
0
)
{
if
(
pthread_cond_broadcast
(
&
spmcq_datawaiter_cond
)
!=
0
)
{
proddata
.
fail
=
PRODUCER_BCAST
;
proddata
.
fail
=
PRODUCER_BCAST
;
...
@@ -114,13 +110,13 @@ static void
...
@@ -114,13 +110,13 @@ static void
prod_con_data_t
*
pcdata
=
&
condata
[
id
-
1
];
prod_con_data_t
*
pcdata
=
&
condata
[
id
-
1
];
pcdata
->
sum
=
0
;
pcdata
->
sum
=
0
;
pcdata
->
fail
=
SUCCESS
;
pcdata
->
fail
=
SUCCESS
;
unsigned
*
xid
;
dataentry
*
entry
;
while
(
run
)
{
while
(
run
)
{
/* run may be stale at this point */
/* run may be stale at this point */
debug_print
(
"Consumer %d: attempt dequeue
\n
"
,
id
);
debug_print
(
"Consumer %d: attempt dequeue
\n
"
,
id
);
xid
=
(
unsigned
*
)
SPMCQ_Deq
();
entry
=
SPMCQ_Deq
();
if
(
xid
==
NULL
)
{
if
(
entry
==
NULL
)
{
/* grab the CV lock, which also constitutes an implicit memory
/* grab the CV lock, which also constitutes an implicit memory
barrier */
barrier */
debug_print
(
"Consumer %d: mutex
\n
"
,
id
);
debug_print
(
"Consumer %d: mutex
\n
"
,
id
);
...
@@ -141,16 +137,17 @@ static void
...
@@ -141,16 +137,17 @@ static void
break
;
break
;
}
}
}
else
{
}
else
{
/*
xid
!= NULL */
/*
entry
!= NULL */
debug_print
(
"Consumer %d: dequeue %d (xid = %u)
\n
"
,
id
,
++
deqs
,
debug_print
(
"Consumer %d: dequeue %d (xid = %u)
\n
"
,
id
,
++
deqs
,
*
xid
);
entry
->
xid
);
pcdata
->
sum
+=
*
xid
;
pcdata
->
sum
+=
entry
->
xid
;
}
}
}
}
debug_print
(
"Consumer %d: drain queue, run = %d
\n
"
,
id
,
run
);
debug_print
(
"Consumer %d: drain queue, run = %d
\n
"
,
id
,
run
);
while
((
xid
=
(
unsigned
*
)
SPMCQ_Deq
())
!=
NULL
)
{
while
((
entry
=
SPMCQ_Deq
())
!=
NULL
)
{
debug_print
(
"Consumer %d: dequeue %d (xid = %u)
\n
"
,
id
,
++
deqs
,
*
xid
);
debug_print
(
"Consumer %d: dequeue %d (xid = %u)
\n
"
,
id
,
++
deqs
,
pcdata
->
sum
+=
*
xid
;
entry
->
xid
);
pcdata
->
sum
+=
entry
->
xid
;
}
}
debug_print
(
"Consumer %d: exit
\n
"
,
id
);
debug_print
(
"Consumer %d: exit
\n
"
,
id
);
pthread_exit
((
void
*
)
pcdata
);
pthread_exit
((
void
*
)
pcdata
);
...
@@ -185,17 +182,18 @@ static char
...
@@ -185,17 +182,18 @@ static char
static
const
char
static
const
char
*
test_spmcq_enq_deq
(
void
)
*
test_spmcq_enq_deq
(
void
)
{
{
bool
r
;
#define XID 1234567890
unsigned
xid
=
1234567890
,
*
xid
2
;
dataentry
entry1
,
*
entry
2
;
printf
(
"... testing SPMCQ enqueue and dequeue
\n
"
);
printf
(
"... testing SPMCQ enqueue and dequeue
\n
"
);
r
=
SPMCQ_Enq
(
&
xid
);
mu_assert
(
"SPMCQ_Enq failed"
,
r
);
xid2
=
SPMCQ_Deq
();
entry1
.
xid
=
1234567890
;
sprintf
(
errmsg
,
"SMPCQ_Deq: expected %d, got %d"
,
xid
,
*
xid2
);
SPMCQ_Enq
(
&
entry1
);
mu_assert
(
errmsg
,
xid
==
*
xid2
);
entry2
=
SPMCQ_Deq
();
mu_assert
(
"SPMCQ_Deq: returned NULL from non-empty queue"
,
entry2
!=
NULL
);
sprintf
(
errmsg
,
"SMPCQ_Deq: expected %d, got %d"
,
XID
,
entry2
->
xid
);
mu_assert
(
errmsg
,
XID
==
entry2
->
xid
);
return
NULL
;
return
NULL
;
}
}
...
@@ -232,6 +230,7 @@ static const char
...
@@ -232,6 +230,7 @@ static const char
* waiting _after_ we have broadcasted and so miss the event.
* waiting _after_ we have broadcasted and so miss the event.
*/
*/
MAZ
(
pthread_mutex_lock
(
&
spmcq_datawaiter_lock
));
MAZ
(
pthread_mutex_lock
(
&
spmcq_datawaiter_lock
));
SPMCQ_Drain
();
run
=
0
;
run
=
0
;
MAZ
(
pthread_cond_broadcast
(
&
spmcq_datawaiter_cond
));
MAZ
(
pthread_cond_broadcast
(
&
spmcq_datawaiter_cond
));
MAZ
(
pthread_mutex_unlock
(
&
spmcq_datawaiter_lock
));
MAZ
(
pthread_mutex_unlock
(
&
spmcq_datawaiter_lock
));
...
@@ -244,9 +243,7 @@ static const char
...
@@ -244,9 +243,7 @@ static const char
mu_assert
(
errmsg
,
err
==
0
);
mu_assert
(
errmsg
,
err
==
0
);
if
(
prod_data
->
fail
!=
SUCCESS
)
{
if
(
prod_data
->
fail
!=
SUCCESS
)
{
if
(
prod_data
->
fail
==
PRODUCER_QFULL
)
if
(
prod_data
->
fail
==
PRODUCER_BCAST
)
sprintf
(
errmsg
,
"Producer: queue full"
);
else
if
(
prod_data
->
fail
==
PRODUCER_BCAST
)
sprintf
(
errmsg
,
"Producer: broadcast failed"
);
sprintf
(
errmsg
,
"Producer: broadcast failed"
);
mu_assert
(
errmsg
,
prod_data
->
fail
==
SUCCESS
);
mu_assert
(
errmsg
,
prod_data
->
fail
==
SUCCESS
);
}
}
...
@@ -313,6 +310,7 @@ static const char
...
@@ -313,6 +310,7 @@ static const char
* waiting _after_ we have broadcasted and so miss the event.
* waiting _after_ we have broadcasted and so miss the event.
*/
*/
MAZ
(
pthread_mutex_lock
(
&
spmcq_datawaiter_lock
));
MAZ
(
pthread_mutex_lock
(
&
spmcq_datawaiter_lock
));
SPMCQ_Drain
();
run
=
0
;
run
=
0
;
MAZ
(
pthread_cond_broadcast
(
&
spmcq_datawaiter_cond
));
MAZ
(
pthread_cond_broadcast
(
&
spmcq_datawaiter_cond
));
MAZ
(
pthread_mutex_unlock
(
&
spmcq_datawaiter_lock
));
MAZ
(
pthread_mutex_unlock
(
&
spmcq_datawaiter_lock
));
...
@@ -324,9 +322,7 @@ static const char
...
@@ -324,9 +322,7 @@ static const char
}
}
if
(
prod_fail
!=
SUCCESS
)
{
if
(
prod_fail
!=
SUCCESS
)
{
if
(
prod_fail
==
PRODUCER_QFULL
)
if
(
prod_fail
==
PRODUCER_BCAST
)
sprintf
(
errmsg
,
"Producer: queue full"
);
else
if
(
prod_fail
==
PRODUCER_BCAST
)
sprintf
(
errmsg
,
"Producer: broadcast failed"
);
sprintf
(
errmsg
,
"Producer: broadcast failed"
);
else
else
sprintf
(
errmsg
,
"Producer: unknown error %d"
,
prod_fail
);
sprintf
(
errmsg
,
"Producer: unknown error %d"
,
prod_fail
);
...
...
src/test/test_spmcq_loop.sh
View file @
d6a7ba55
...
@@ -9,13 +9,13 @@ echo "... running test_spmcq $N times"
...
@@ -9,13 +9,13 @@ echo "... running test_spmcq $N times"
I
=
1
I
=
1
while
[[
$I
-le
$N
]]
while
[[
$I
-le
$N
]]
do
do
# echo -en "Test $N\r"
MSG
=
$(
./test_spmcq
)
./test_spmcq
>
/dev/null
if
[
$?
-ne
0
]
;
then
if
[
$?
-ne
0
]
;
then
echo
"Test
$I
FAILED"
echo
"Test
$I
FAILED"
echo
$MSG
exit
1
exit
1
fi
fi
((
I++
))
((
I++
))
done
done
exit
0
exit
0
\ No newline at end of file
src/test/test_worker.c
View file @
d6a7ba55
...
@@ -130,7 +130,7 @@ static const char
...
@@ -130,7 +130,7 @@ static const char
sprintf
(
entry
->
data
,
"XID=%d&foo=bar&baz=quux&record=%d"
,
xid
,
i
+
1
);
sprintf
(
entry
->
data
,
"XID=%d&foo=bar&baz=quux&record=%d"
,
xid
,
i
+
1
);
entry
->
end
=
strlen
(
entry
->
data
);
entry
->
end
=
strlen
(
entry
->
data
);
entry
->
state
=
DATA_DONE
;
entry
->
state
=
DATA_DONE
;
mu_assert
(
"SPMCQ full"
,
SPMCQ_Enq
(
entry
)
==
true
);
SPMCQ_Enq
(
entry
);
}
}
WRK_Halt
();
WRK_Halt
();
...
...
src/trackrdrd.h
View file @
d6a7ba55
...
@@ -86,66 +86,6 @@ int WRK_Running(void);
...
@@ -86,66 +86,6 @@ int WRK_Running(void);
void
WRK_Halt
(
void
);
void
WRK_Halt
(
void
);
void
WRK_Shutdown
(
void
);
void
WRK_Shutdown
(
void
);
/* spmcq.c */
/* Single producer multiple consumer bounded FIFO queue */
typedef
struct
{
unsigned
magic
;
#define SPMCQ_MAGIC 0xe9a5d0a8
const
unsigned
mask
;
void
**
data
;
volatile
unsigned
head
;
volatile
unsigned
tail
;
}
spmcq_t
;
spmcq_t
spmcq
;
int
SPMCQ_Init
(
void
);
bool
SPMCQ_Enq
(
void
*
ptr
);
void
*
SPMCQ_Deq
(
void
);
bool
SPMCQ_NeedWorker
(
int
running
);
bool
SPMCQ_StopWorker
(
int
running
);
#define spmcq_wait(what) \
do { \
AZ(pthread_mutex_lock(&spmcq_##what##waiter_lock)); \
spmcq_##what##waiter++; \
AZ(pthread_cond_wait(&spmcq_##what##waiter_cond, \
&spmcq_##what##waiter_lock)); \
spmcq_##what##waiter--; \
AZ(pthread_mutex_unlock(&spmcq_##what##waiter_lock)); \
} while (0)
/*
* the first test is not synced, so we might enter the if body too late or
* unnecessarily
*
* * too late: doesn't matter, will come back next time
* * unnecessarily: we'll find out now
*/
#define spmcq_signal(what) \
do { \
if (spmcq_##what##waiter) { \
AZ(pthread_mutex_lock(&spmcq_##what##waiter_lock)); \
if (spmcq_##what##waiter) \
AZ(pthread_cond_signal(&spmcq_##what##waiter_cond)); \
AZ(pthread_mutex_unlock(&spmcq_##what##waiter_lock)); \
} \
} while (0)
/* Producer waits for this condition when the spmc queue is full.
Consumers signal this condition after dequeue. */
pthread_cond_t
spmcq_roomwaiter_cond
;
pthread_mutex_t
spmcq_roomwaiter_lock
;
int
spmcq_roomwaiter
;
/* Consumers wait for this condition when the spmc queue is empty.
Producer signals this condition after enqueue. */
pthread_cond_t
spmcq_datawaiter_cond
;
pthread_mutex_t
spmcq_datawaiter_lock
;
int
spmcq_datawaiter
;
/* mq.c */
/* mq.c */
const
char
*
MQ_GlobalInit
(
void
);
const
char
*
MQ_GlobalInit
(
void
);
const
char
*
MQ_InitConnections
(
void
);
const
char
*
MQ_InitConnections
(
void
);
...
@@ -169,6 +109,7 @@ struct dataentry_s {
...
@@ -169,6 +109,7 @@ struct dataentry_s {
unsigned
magic
;
unsigned
magic
;
#define DATA_MAGIC 0xb41cb1e1
#define DATA_MAGIC 0xb41cb1e1
VSTAILQ_ENTRY
(
dataentry_s
)
freelist
;
VSTAILQ_ENTRY
(
dataentry_s
)
freelist
;
VSTAILQ_ENTRY
(
dataentry_s
)
spmcq
;
data_state_e
state
;
data_state_e
state
;
unsigned
xid
;
unsigned
xid
;
...
@@ -187,14 +128,9 @@ VSTAILQ_HEAD(freehead_s, dataentry_s);
...
@@ -187,14 +128,9 @@ VSTAILQ_HEAD(freehead_s, dataentry_s);
struct
data_writer_stats_s
{
struct
data_writer_stats_s
{
unsigned
nodata
;
/* Not submitted, no data */
unsigned
nodata
;
/* Not submitted, no data */
unsigned
submitted
;
/* Submitted to worker threads */
unsigned
submitted
;
/* Submitted to worker threads */
unsigned
wait_qfull
;
/* Waits for SPMCQ - should not happen */
unsigned
wait_room
;
/* waits for space in dtbl */
unsigned
wait_room
;
/* waits for space in dtbl */
unsigned
data_hi
;
/* max string length of entry->data */
unsigned
data_hi
;
/* max string length of entry->data */
unsigned
data_overflows
;
/* config.maxdata exceeded */
#ifdef REMOVE
unsigned
len_overflows
;
#endif
unsigned
data_overflows
;
};
};
/* stats protected by mutex */
/* stats protected by mutex */
...
@@ -235,6 +171,55 @@ void DATA_Return_Freelist(struct freehead_s *returned, unsigned nreturned);
...
@@ -235,6 +171,55 @@ void DATA_Return_Freelist(struct freehead_s *returned, unsigned nreturned);
void
DATA_Dump1
(
dataentry
*
entry
,
int
i
);
void
DATA_Dump1
(
dataentry
*
entry
,
int
i
);
void
DATA_Dump
(
void
);
void
DATA_Dump
(
void
);
/* spmcq.c */
int
SPMCQ_Init
(
void
);
void
SPMCQ_Enq
(
dataentry
*
ptr
);
dataentry
*
SPMCQ_Deq
(
void
);
void
SPMCQ_Drain
(
void
);
bool
SPMCQ_NeedWorker
(
int
running
);
bool
SPMCQ_StopWorker
(
int
running
);
#define spmcq_wait(what) \
do { \
AZ(pthread_mutex_lock(&spmcq_##what##waiter_lock)); \
spmcq_##what##waiter++; \
AZ(pthread_cond_wait(&spmcq_##what##waiter_cond, \
&spmcq_##what##waiter_lock)); \
spmcq_##what##waiter--; \
AZ(pthread_mutex_unlock(&spmcq_##what##waiter_lock)); \
} while (0)
/*
* the first test is not synced, so we might enter the if body too late or
* unnecessarily
*
* * too late: doesn't matter, will come back next time
* * unnecessarily: we'll find out now
*/
#define spmcq_signal(what) \
do { \
if (spmcq_##what##waiter) { \
AZ(pthread_mutex_lock(&spmcq_##what##waiter_lock)); \
if (spmcq_##what##waiter) \
AZ(pthread_cond_signal(&spmcq_##what##waiter_cond)); \
AZ(pthread_mutex_unlock(&spmcq_##what##waiter_lock)); \
} \
} while (0)
/* Producer waits for this condition when the spmc queue is full.
Consumers signal this condition after dequeue. */
pthread_cond_t
spmcq_roomwaiter_cond
;
pthread_mutex_t
spmcq_roomwaiter_lock
;
int
spmcq_roomwaiter
;
/* Consumers wait for this condition when the spmc queue is empty.
Producer signals this condition after enqueue. */
pthread_cond_t
spmcq_datawaiter_cond
;
pthread_mutex_t
spmcq_datawaiter_lock
;
int
spmcq_datawaiter
;
/* trackrdrd.c */
/* trackrdrd.c */
void
HASH_Stats
(
void
);
void
HASH_Stats
(
void
);
...
...
src/worker.c
View file @
d6a7ba55
...
@@ -177,7 +177,7 @@ static void
...
@@ -177,7 +177,7 @@ static void
clientID
);
clientID
);
while
(
run
)
{
while
(
run
)
{
entry
=
(
dataentry
*
)
SPMCQ_Deq
();
entry
=
SPMCQ_Deq
();
if
(
entry
!=
NULL
)
{
if
(
entry
!=
NULL
)
{
wrk
->
deqs
++
;
wrk
->
deqs
++
;
wrk_send
(
amq_worker
,
entry
,
wrk
);
wrk_send
(
amq_worker
,
entry
,
wrk
);
...
@@ -206,6 +206,7 @@ static void
...
@@ -206,6 +206,7 @@ static void
*
*
* also re-check the stop condition under the lock
* also re-check the stop condition under the lock
*/
*/
SPMCQ_Drain
();
if
(
run
&&
((
!
entry
)
||
SPMCQ_StopWorker
(
running
)))
{
if
(
run
&&
((
!
entry
)
||
SPMCQ_StopWorker
(
running
)))
{
wrk
->
waits
++
;
wrk
->
waits
++
;
spmcq_datawaiter
++
;
spmcq_datawaiter
++
;
...
@@ -221,7 +222,7 @@ static void
...
@@ -221,7 +222,7 @@ static void
wrk
->
state
=
WRK_SHUTTINGDOWN
;
wrk
->
state
=
WRK_SHUTTINGDOWN
;
/* Prepare to exit, drain the queue */
/* Prepare to exit, drain the queue */
while
((
entry
=
(
dataentry
*
)
SPMCQ_Deq
())
!=
NULL
)
{
while
((
entry
=
SPMCQ_Deq
())
!=
NULL
)
{
wrk
->
deqs
++
;
wrk
->
deqs
++
;
wrk_send
(
amq_worker
,
entry
,
wrk
);
wrk_send
(
amq_worker
,
entry
,
wrk
);
}
}
...
@@ -335,6 +336,7 @@ WRK_Halt(void)
...
@@ -335,6 +336,7 @@ WRK_Halt(void)
* waiting _after_ we have broadcasted and so miss the event.
* waiting _after_ we have broadcasted and so miss the event.
*/
*/
AZ
(
pthread_mutex_lock
(
&
spmcq_datawaiter_lock
));
AZ
(
pthread_mutex_lock
(
&
spmcq_datawaiter_lock
));
SPMCQ_Drain
();
run
=
0
;
run
=
0
;
AZ
(
pthread_cond_broadcast
(
&
spmcq_datawaiter_cond
));
AZ
(
pthread_cond_broadcast
(
&
spmcq_datawaiter_cond
));
AZ
(
pthread_mutex_unlock
(
&
spmcq_datawaiter_lock
));
AZ
(
pthread_mutex_unlock
(
&
spmcq_datawaiter_lock
));
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment