Commit c51e7ce9 authored by Geoff Simmons's avatar Geoff Simmons

trackrdrd: reworked SPMCQ, waits for a full queue are never necessary

parent a3d1bc25
......@@ -232,11 +232,7 @@ data_submit(dataentry *de)
return;
}
while (!SPMCQ_Enq((void *) de)) {
dtbl.w_stats.wait_qfull++;
LOG_Log(LOG_ALERT, "%s", "Internal queue full, waiting for dequeue");
spmcq_wait(room);
}
SPMCQ_Enq(de);
dtbl.w_stats.submitted++;
/* should we wake up another worker? */
......
......@@ -54,7 +54,6 @@ log_output(void)
"len=%u "
"nodata=%u "
"submitted=%u "
"wait_qfull=%u "
"wait_room=%u "
"data_hi=%u "
"data_overflows=%u "
......@@ -68,7 +67,6 @@ log_output(void)
dtbl.len,
dtbl.w_stats.nodata,
dtbl.w_stats.submitted,
dtbl.w_stats.wait_qfull,
dtbl.w_stats.wait_room,
dtbl.w_stats.data_hi,
dtbl.w_stats.data_overflows,
......
......@@ -27,6 +27,7 @@
* OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
*
* Single producer multiple consumer bounded FIFO queue
*/
#include <stdlib.h>
......@@ -37,79 +38,100 @@
#include "trackrdrd.h"
#include "vas.h"
#include "vmb.h"
#include "vqueue.h"
#if 0
typedef struct {
unsigned magic;
#define SPMCQ_MAGIC 0xe9a5d0a8
const unsigned mask;
void **data;
volatile unsigned head;
volatile unsigned tail;
} spmcq_t;
spmcq_t spmcq;
#endif
static volatile unsigned long enqs = 0, deqs = 0;
static pthread_mutex_t spmcq_lock;
static pthread_mutex_t spmcq_deq_lock;
static unsigned qlen_goal;
VSTAILQ_HEAD(spmcq_s, dataentry_s);
struct spmcq_s spmcq_head = VSTAILQ_HEAD_INITIALIZER(spmcq_head);
struct spmcq_s enq_head = VSTAILQ_HEAD_INITIALIZER(enq_head);
struct spmcq_s deq_head = VSTAILQ_HEAD_INITIALIZER(deq_head);
static inline unsigned
spmcq_len(void)
{
if (spmcq.tail >= spmcq.head)
return spmcq.tail - spmcq.head;
return UINT_MAX - spmcq.head + 1 + spmcq.tail;
return enqs - deqs;
}
static void
spmcq_cleanup(void)
{
free(spmcq.data);
AZ(pthread_mutex_destroy(&spmcq_lock));
AZ(pthread_mutex_destroy(&spmcq_deq_lock));
}
static inline int
spmcq_wrk_len_ratio(int working, int running)
{
return working * qlen_goal / running;
}
int
SPMCQ_Init(void)
{
void *buf;
size_t n = config.maxdone;
buf = calloc(n, sizeof(void *));
if (buf == NULL)
if (pthread_mutex_init(&spmcq_lock, &attr_lock) != 0)
return(errno);
if (pthread_mutex_init(&spmcq_deq_lock, NULL) != 0)
if (pthread_mutex_init(&spmcq_deq_lock, &attr_lock) != 0)
return(errno);
spmcq_t q =
{ .magic = SPMCQ_MAGIC, .mask = n - 1, .data = buf, .head = 0,
.tail = 0 };
memcpy(&spmcq, &q, sizeof(spmcq_t));
qlen_goal = config.qlen_goal;
atexit(spmcq_cleanup);
return(0);
}
bool
SPMCQ_Enq(void *ptr)
void
SPMCQ_Enq(dataentry *ptr)
{
if (spmcq_len() > spmcq.mask)
return false;
spmcq.data[spmcq.tail++ & spmcq.mask] = ptr;
return true;
AZ(pthread_mutex_lock(&spmcq_lock));
assert(enqs - deqs < config.maxdone);
enqs++;
VSTAILQ_INSERT_TAIL(&enq_head, ptr, spmcq);
if (VSTAILQ_EMPTY(&spmcq_head))
VSTAILQ_CONCAT(&spmcq_head, &enq_head);
AZ(pthread_mutex_unlock(&spmcq_lock));
}
void
dataentry
*SPMCQ_Deq(void)
{
void *ptr;
AZ(pthread_mutex_lock(&spmcq_deq_lock));
if (spmcq_len() == 0)
if (VSTAILQ_EMPTY(&deq_head)) {
AZ(pthread_mutex_lock(&spmcq_lock));
VSTAILQ_CONCAT(&deq_head, &spmcq_head);
AZ(pthread_mutex_unlock(&spmcq_lock));
}
if (VSTAILQ_EMPTY(&deq_head))
ptr = NULL;
else
ptr = spmcq.data[spmcq.head++ & spmcq.mask];
else {
ptr = VSTAILQ_FIRST(&deq_head);
VSTAILQ_REMOVE_HEAD(&deq_head, spmcq);
deqs++;
}
AZ(pthread_mutex_unlock(&spmcq_deq_lock));
return ptr;
}
void
SPMCQ_Drain(void)
{
AZ(pthread_mutex_lock(&spmcq_lock));
VSTAILQ_CONCAT(&spmcq_head, &enq_head);
AZ(pthread_mutex_unlock(&spmcq_lock));
}
/*
* should we wake up another worker?
*
......@@ -127,6 +149,12 @@ void
* Q_Len > working * qlen_goal / max_workers
*/
static inline int
spmcq_wrk_len_ratio(int working, int running)
{
return working * qlen_goal / running;
}
bool
SPMCQ_NeedWorker(int running)
{
......@@ -144,30 +172,3 @@ SPMCQ_StopWorker(int running)
return spmcq_len() < spmcq_wrk_len_ratio(running - spmcq_datawaiter - 1,
running);
}
#ifdef TEST_DRIVER
int
main(int argc, char * const *argv)
{
(void) argc;
printf("\nTEST: %s\n", argv[0]);
printf("... test SMPCQ enqueue at UINT_MAX overflow\n");
config.maxdone = 1024;
SPMCQ_Init();
spmcq.head = spmcq.tail = UINT_MAX - 2;
assert(SPMCQ_Enq(NULL));
assert(SPMCQ_Enq(NULL));
assert(SPMCQ_Enq(NULL));
assert(SPMCQ_Enq(NULL));
assert(SPMCQ_Enq(NULL));
assert(SPMCQ_Enq(NULL));
assert(SPMCQ_Enq(NULL));
assert(spmcq_len() == 7);
printf("%s: 1 test run\n", argv[0]);
exit(0);
}
#endif
INCLUDES = -I$(VARNISHSRC)/include -I$(VARNISHSRC) @AMQ_CFLAGS@
TESTS = test_parse test_data test_hash test_mq test_spmcq \
test_spmcq_loop.sh test_spmcq_len test_worker regress.sh
test_spmcq_loop.sh test_worker regress.sh
check_PROGRAMS = test_parse test_data test_hash test_mq test_spmcq \
test_spmcq_len test_worker
test_worker
test_parse_SOURCES = \
minunit.h \
......@@ -70,16 +70,6 @@ test_spmcq_LDADD = \
$(VARNISHSRC)/lib/libvarnish/libvarnish.la \
../spmcq.$(OBJEXT)
test_spmcq_len_SOURCES = \
$(VARNISHSRC)/lib/libvarnish/libvarnish.la \
../spmcq.c \
../trackrdrd.h
test_spmcq_len_LDADD = \
$(VARNISHSRC)/lib/libvarnish/libvarnish.la
test_spmcq_len_CFLAGS = -DTEST_DRIVER
test_worker_SOURCES = \
minunit.h \
test_worker.c \
......
......@@ -50,7 +50,6 @@ int run;
typedef enum {
SUCCESS = 0,
PRODUCER_QFULL,
PRODUCER_BCAST,
CONSUMER_MUTEX,
CONSUMER_WAIT,
......@@ -65,7 +64,7 @@ typedef struct {
int tests_run = 0;
static char errmsg[BUFSIZ];
static unsigned xids[TABLE_SIZE];
static dataentry entries[TABLE_SIZE];
static prod_con_data_t proddata;
static prod_con_data_t condata[NCON];
......@@ -83,12 +82,9 @@ static void
unsigned xid = (unsigned int) lrand48();
for (int i = 0; i < (1 << DEF_MAXOPEN_SCALE); i++) {
xids[i] = xid;
entries[i].xid = xid;
debug_print("Producer: enqueue %d (xid = %u)\n", ++enqs, xid);
if (!SPMCQ_Enq(&xids[i])) {
proddata.fail = PRODUCER_QFULL;
pthread_exit(&proddata);
}
SPMCQ_Enq(&entries[i]);
debug_print("%s\n", "Producer: broadcast");
if (pthread_cond_broadcast(&spmcq_datawaiter_cond) != 0) {
proddata.fail = PRODUCER_BCAST;
......@@ -114,13 +110,13 @@ static void
prod_con_data_t *pcdata = &condata[id-1];
pcdata->sum = 0;
pcdata->fail = SUCCESS;
unsigned *xid;
dataentry *entry;
while (run) {
/* run may be stale at this point */
debug_print("Consumer %d: attempt dequeue\n", id);
xid = (unsigned *) SPMCQ_Deq();
if (xid == NULL) {
entry = SPMCQ_Deq();
if (entry == NULL) {
/* grab the CV lock, which also constitutes an implicit memory
barrier */
debug_print("Consumer %d: mutex\n", id);
......@@ -141,16 +137,17 @@ static void
break;
}
} else {
/* xid != NULL */
/* entry != NULL */
debug_print("Consumer %d: dequeue %d (xid = %u)\n", id, ++deqs,
*xid);
pcdata->sum += *xid;
entry->xid);
pcdata->sum += entry->xid;
}
}
debug_print("Consumer %d: drain queue, run = %d\n", id, run);
while ((xid = (unsigned *) SPMCQ_Deq()) != NULL) {
debug_print("Consumer %d: dequeue %d (xid = %u)\n", id, ++deqs, *xid);
pcdata->sum += *xid;
while ((entry = SPMCQ_Deq()) != NULL) {
debug_print("Consumer %d: dequeue %d (xid = %u)\n", id, ++deqs,
entry->xid);
pcdata->sum += entry->xid;
}
debug_print("Consumer %d: exit\n", id);
pthread_exit((void *) pcdata);
......@@ -185,17 +182,18 @@ static char
static const char
*test_spmcq_enq_deq(void)
{
bool r;
unsigned xid = 1234567890, *xid2;
#define XID 1234567890
dataentry entry1, *entry2;
printf("... testing SPMCQ enqueue and dequeue\n");
r = SPMCQ_Enq(&xid);
mu_assert("SPMCQ_Enq failed", r);
xid2 = SPMCQ_Deq();
sprintf(errmsg, "SMPCQ_Deq: expected %d, got %d", xid, *xid2);
mu_assert(errmsg, xid == *xid2);
entry1.xid = 1234567890;
SPMCQ_Enq(&entry1);
entry2 = SPMCQ_Deq();
mu_assert("SPMCQ_Deq: returned NULL from non-empty queue", entry2 != NULL);
sprintf(errmsg, "SMPCQ_Deq: expected %d, got %d", XID, entry2->xid);
mu_assert(errmsg, XID == entry2->xid);
return NULL;
}
......@@ -232,6 +230,7 @@ static const char
* waiting _after_ we have broadcasted and so miss the event.
*/
MAZ(pthread_mutex_lock(&spmcq_datawaiter_lock));
SPMCQ_Drain();
run = 0;
MAZ(pthread_cond_broadcast(&spmcq_datawaiter_cond));
MAZ(pthread_mutex_unlock(&spmcq_datawaiter_lock));
......@@ -244,9 +243,7 @@ static const char
mu_assert(errmsg, err == 0);
if (prod_data->fail != SUCCESS) {
if (prod_data->fail == PRODUCER_QFULL)
sprintf(errmsg, "Producer: queue full");
else if (prod_data->fail == PRODUCER_BCAST)
if (prod_data->fail == PRODUCER_BCAST)
sprintf(errmsg, "Producer: broadcast failed");
mu_assert(errmsg, prod_data->fail == SUCCESS);
}
......@@ -313,6 +310,7 @@ static const char
* waiting _after_ we have broadcasted and so miss the event.
*/
MAZ(pthread_mutex_lock(&spmcq_datawaiter_lock));
SPMCQ_Drain();
run = 0;
MAZ(pthread_cond_broadcast(&spmcq_datawaiter_cond));
MAZ(pthread_mutex_unlock(&spmcq_datawaiter_lock));
......@@ -324,9 +322,7 @@ static const char
}
if (prod_fail != SUCCESS) {
if (prod_fail == PRODUCER_QFULL)
sprintf(errmsg, "Producer: queue full");
else if (prod_fail == PRODUCER_BCAST)
if (prod_fail == PRODUCER_BCAST)
sprintf(errmsg, "Producer: broadcast failed");
else
sprintf(errmsg, "Producer: unknown error %d", prod_fail);
......
......@@ -9,13 +9,13 @@ echo "... running test_spmcq $N times"
I=1
while [[ $I -le $N ]]
do
# echo -en "Test $N\r"
./test_spmcq > /dev/null
MSG=$(./test_spmcq)
if [ $? -ne 0 ]; then
echo "Test $I FAILED"
echo $MSG
exit 1
fi
((I++))
done
exit 0
\ No newline at end of file
exit 0
......@@ -130,7 +130,7 @@ static const char
sprintf(entry->data, "XID=%d&foo=bar&baz=quux&record=%d", xid, i+1);
entry->end = strlen(entry->data);
entry->state = DATA_DONE;
mu_assert("SPMCQ full", SPMCQ_Enq(entry) == true);
SPMCQ_Enq(entry);
}
WRK_Halt();
......
......@@ -86,66 +86,6 @@ int WRK_Running(void);
void WRK_Halt(void);
void WRK_Shutdown(void);
/* spmcq.c */
/* Single producer multiple consumer bounded FIFO queue */
typedef struct {
unsigned magic;
#define SPMCQ_MAGIC 0xe9a5d0a8
const unsigned mask;
void **data;
volatile unsigned head;
volatile unsigned tail;
} spmcq_t;
spmcq_t spmcq;
int SPMCQ_Init(void);
bool SPMCQ_Enq(void *ptr);
void *SPMCQ_Deq(void);
bool SPMCQ_NeedWorker(int running);
bool SPMCQ_StopWorker(int running);
#define spmcq_wait(what) \
do { \
AZ(pthread_mutex_lock(&spmcq_##what##waiter_lock)); \
spmcq_##what##waiter++; \
AZ(pthread_cond_wait(&spmcq_##what##waiter_cond, \
&spmcq_##what##waiter_lock)); \
spmcq_##what##waiter--; \
AZ(pthread_mutex_unlock(&spmcq_##what##waiter_lock)); \
} while (0)
/*
* the first test is not synced, so we might enter the if body too late or
* unnecessarily
*
* * too late: doesn't matter, will come back next time
* * unnecessarily: we'll find out now
*/
#define spmcq_signal(what) \
do { \
if (spmcq_##what##waiter) { \
AZ(pthread_mutex_lock(&spmcq_##what##waiter_lock)); \
if (spmcq_##what##waiter) \
AZ(pthread_cond_signal(&spmcq_##what##waiter_cond)); \
AZ(pthread_mutex_unlock(&spmcq_##what##waiter_lock)); \
} \
} while (0)
/* Producer waits for this condition when the spmc queue is full.
Consumers signal this condition after dequeue. */
pthread_cond_t spmcq_roomwaiter_cond;
pthread_mutex_t spmcq_roomwaiter_lock;
int spmcq_roomwaiter;
/* Consumers wait for this condition when the spmc queue is empty.
Producer signals this condition after enqueue. */
pthread_cond_t spmcq_datawaiter_cond;
pthread_mutex_t spmcq_datawaiter_lock;
int spmcq_datawaiter;
/* mq.c */
const char *MQ_GlobalInit(void);
const char *MQ_InitConnections(void);
......@@ -169,6 +109,7 @@ struct dataentry_s {
unsigned magic;
#define DATA_MAGIC 0xb41cb1e1
VSTAILQ_ENTRY(dataentry_s) freelist;
VSTAILQ_ENTRY(dataentry_s) spmcq;
data_state_e state;
unsigned xid;
......@@ -187,14 +128,9 @@ VSTAILQ_HEAD(freehead_s, dataentry_s);
struct data_writer_stats_s {
unsigned nodata; /* Not submitted, no data */
unsigned submitted; /* Submitted to worker threads */
unsigned wait_qfull; /* Waits for SPMCQ - should not happen */
unsigned wait_room; /* waits for space in dtbl */
unsigned data_hi; /* max string length of entry->data */
#ifdef REMOVE
unsigned len_overflows;
#endif
unsigned data_overflows;
unsigned data_overflows; /* config.maxdata exceeded */
};
/* stats protected by mutex */
......@@ -235,6 +171,55 @@ void DATA_Return_Freelist(struct freehead_s *returned, unsigned nreturned);
void DATA_Dump1(dataentry *entry, int i);
void DATA_Dump(void);
/* spmcq.c */
int SPMCQ_Init(void);
void SPMCQ_Enq(dataentry *ptr);
dataentry *SPMCQ_Deq(void);
void SPMCQ_Drain(void);
bool SPMCQ_NeedWorker(int running);
bool SPMCQ_StopWorker(int running);
#define spmcq_wait(what) \
do { \
AZ(pthread_mutex_lock(&spmcq_##what##waiter_lock)); \
spmcq_##what##waiter++; \
AZ(pthread_cond_wait(&spmcq_##what##waiter_cond, \
&spmcq_##what##waiter_lock)); \
spmcq_##what##waiter--; \
AZ(pthread_mutex_unlock(&spmcq_##what##waiter_lock)); \
} while (0)
/*
* the first test is not synced, so we might enter the if body too late or
* unnecessarily
*
* * too late: doesn't matter, will come back next time
* * unnecessarily: we'll find out now
*/
#define spmcq_signal(what) \
do { \
if (spmcq_##what##waiter) { \
AZ(pthread_mutex_lock(&spmcq_##what##waiter_lock)); \
if (spmcq_##what##waiter) \
AZ(pthread_cond_signal(&spmcq_##what##waiter_cond)); \
AZ(pthread_mutex_unlock(&spmcq_##what##waiter_lock)); \
} \
} while (0)
/* Producer waits for this condition when the spmc queue is full.
Consumers signal this condition after dequeue. */
pthread_cond_t spmcq_roomwaiter_cond;
pthread_mutex_t spmcq_roomwaiter_lock;
int spmcq_roomwaiter;
/* Consumers wait for this condition when the spmc queue is empty.
Producer signals this condition after enqueue. */
pthread_cond_t spmcq_datawaiter_cond;
pthread_mutex_t spmcq_datawaiter_lock;
int spmcq_datawaiter;
/* trackrdrd.c */
void HASH_Stats(void);
......
......@@ -177,7 +177,7 @@ static void
clientID);
while (run) {
entry = (dataentry *) SPMCQ_Deq();
entry = SPMCQ_Deq();
if (entry != NULL) {
wrk->deqs++;
wrk_send(amq_worker, entry, wrk);
......@@ -206,6 +206,7 @@ static void
*
* also re-check the stop condition under the lock
*/
SPMCQ_Drain();
if (run && ((! entry) || SPMCQ_StopWorker(running))) {
wrk->waits++;
spmcq_datawaiter++;
......@@ -221,7 +222,7 @@ static void
wrk->state = WRK_SHUTTINGDOWN;
/* Prepare to exit, drain the queue */
while ((entry = (dataentry *) SPMCQ_Deq()) != NULL) {
while ((entry = SPMCQ_Deq()) != NULL) {
wrk->deqs++;
wrk_send(amq_worker, entry, wrk);
}
......@@ -335,6 +336,7 @@ WRK_Halt(void)
* waiting _after_ we have broadcasted and so miss the event.
*/
AZ(pthread_mutex_lock(&spmcq_datawaiter_lock));
SPMCQ_Drain();
run = 0;
AZ(pthread_cond_broadcast(&spmcq_datawaiter_cond));
AZ(pthread_mutex_unlock(&spmcq_datawaiter_lock));
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment