Commit 6c3ae785 authored by Geoff Simmons's avatar Geoff Simmons

trackrdrd: - make check now passes

	- Stop/NeedWorker now encapsulated by the SPMCQ interface
	- spmcq_len not exposed by the SPMCQ interface
parent ac9eb9bc
......@@ -308,9 +308,9 @@ CONF_Dump(void)
confdump("maxopen.scale = %u", config.maxopen_scale);
confdump("maxdata.scale = %u", config.maxdata_scale);
confdump("qlen_goal.scale = %u", config.qlen_goal_scale);
confdump("hash_max_probes", config.hash_max_probes);
confdump("hash_ttl", config.hash_ttl);
confdump("hash_mlt", config.hash_mlt);
confdump("hash_max_probes = %u", config.hash_max_probes);
confdump("hash_ttl = %u", config.hash_ttl);
confdump("hash_mlt = %u", config.hash_mlt);
confdump("mq.uri = %s", config.mq_uri);
......
......@@ -217,7 +217,7 @@ DATA_noMT_Submit(dataentry *de)
dtbl.w_stats.submitted++;
/* should we wake up another worker? */
if (SPMCQ_need_worker(SPMCQ_Len(), (nworkers - spmcq_datawaiter), nworkers, (1 << config.qlen_goal_scale)))
if (SPMCQ_NeedWorker())
spmcq_signal(data);
/*
......
......@@ -40,6 +40,7 @@
#include "vmb.h"
static pthread_mutex_t spmcq_deq_lock;
static unsigned qlen_goal;
static inline unsigned
spmcq_len(void)
......@@ -49,6 +50,7 @@ spmcq_len(void)
return UINT_MAX - spmcq.head + 1 + spmcq.tail;
}
#if 0
/*
* this is only approximately correct and could even become negative when values
* get updated while we read them!
......@@ -66,6 +68,7 @@ int SPMCQ_Len(void) {
return (l);
}
#endif
static void
spmcq_cleanup(void)
......@@ -74,6 +77,12 @@ spmcq_cleanup(void)
AZ(pthread_mutex_destroy(&spmcq_deq_lock));
}
static inline int
spmcq_wrk_len_ratio(int working)
{
return working * qlen_goal / nworkers;
}
int
SPMCQ_Init(void)
{
......@@ -91,6 +100,9 @@ SPMCQ_Init(void)
{ .magic = SPMCQ_MAGIC, .mask = n - 1, .data = buf, .head = 0,
.tail = 0 };
memcpy(&spmcq, &q, sizeof(spmcq_t));
qlen_goal = 1 << config.qlen_goal_scale;
atexit(spmcq_cleanup);
return(0);
}
......@@ -118,6 +130,39 @@ void
return ptr;
}
/*
* should we wake up another worker?
*
* M = l / (u x p)
*
* l: arrival rate
* u: service rate
* p: utilization
*
* to get an optimal M, we would need to measure l and u, so to
* simplify, we just try to keep the number of workers proportional to
* the queue length
*
* wake up another worker if queue is sufficiently full
* Q_Len > working * qlen_goal / max_workers
*/
bool
SPMCQ_NeedWorker(void)
{
if (nworkers == 0)
return false;
return spmcq_len() > spmcq_wrk_len_ratio(nworkers - spmcq_datawaiter);
}
bool
SPMCQ_StopWorker(void)
{
if (nworkers == 0)
return false;
return spmcq_len() < spmcq_wrk_len_ratio(nworkers - spmcq_datawaiter - 1);
}
#ifdef TEST_DRIVER
int
main(int argc, char * const *argv)
......@@ -127,7 +172,7 @@ main(int argc, char * const *argv)
printf("\nTEST: %s\n", argv[0]);
printf("... test SMPCQ enqueue at UINT_MAX overflow\n");
config.maxopen_scale = 10;
config.maxdone_scale = 10;
SPMCQ_Init();
spmcq.head = spmcq.tail = UINT_MAX - 2;
......@@ -139,7 +184,6 @@ main(int argc, char * const *argv)
assert(SPMCQ_Enq(NULL));
assert(SPMCQ_Enq(NULL));
assert(spmcq_len() == 7);
assert(SPMCQ_Len() == 7);
printf("%s: 1 test run\n", argv[0]);
exit(0);
......
......@@ -21,7 +21,14 @@ test_data_SOURCES = \
../trackrdrd.h
test_data_LDADD = \
../data.$(OBJEXT)
$(VARNISHSRC)/lib/libvarnish/libvarnish.la \
../worker.$(OBJEXT) \
../monitor.$(OBJEXT) \
../spmcq.$(OBJEXT) \
../mq.$(OBJEXT) \
../amq.$(OBJEXT) \
../data.$(OBJEXT) \
@AMQ_LIBS@
test_mq_SOURCES = \
minunit.h \
......@@ -41,7 +48,13 @@ test_spmcq_SOURCES = \
test_spmcq_LDADD = \
$(VARNISHSRC)/lib/libvarnish/libvarnish.la \
../spmcq.$(OBJEXT)
../worker.$(OBJEXT) \
../monitor.$(OBJEXT) \
../mq.$(OBJEXT) \
../amq.$(OBJEXT) \
../data.$(OBJEXT) \
../spmcq.$(OBJEXT) \
@AMQ_LIBS@
test_spmcq_len_SOURCES = \
$(VARNISHSRC)/lib/libvarnish/libvarnish.la \
......@@ -49,7 +62,13 @@ test_spmcq_len_SOURCES = \
../trackrdrd.h
test_spmcq_len_LDADD = \
$(VARNISHSRC)/lib/libvarnish/libvarnish.la
$(VARNISHSRC)/lib/libvarnish/libvarnish.la \
../worker.$(OBJEXT) \
../monitor.$(OBJEXT) \
../mq.$(OBJEXT) \
../amq.$(OBJEXT) \
../data.$(OBJEXT) \
@AMQ_LIBS@
test_spmcq_len_CFLAGS = -DTEST_DRIVER
......@@ -67,4 +86,5 @@ test_worker_LDADD = \
../amq.$(OBJEXT) \
../data.$(OBJEXT) \
../monitor.$(OBJEXT) \
../hash.$(OBJEXT) \
@AMQ_LIBS@
......@@ -22,7 +22,7 @@ CMD="../trackrdrd -D -f varnish.binlog -l - -d -c test.conf"
# the user running it
CKSUM=$( $CMD | sed -e 's/\(initializing\) \(.*\)/\1/' | sed -e 's/\(Running as\) \([a-zA-Z0-9]*\)$/\1/' | grep -v 'Not running as root' | cksum)
if [ "$CKSUM" != '2764833651 232316' ]; then
if [ "$CKSUM" != '2982448694 234027' ]; then
echo "ERROR: Regression test incorrect cksum: $CKSUM"
exit 1
fi
......
# Test configuration for the varnish log tracking reader
log.file = /tmp/trackrdrd.log
pid.file = trackrdrd.pid
maxdata.scale = 2
maxopen.scale = 1
maxdata.scale = 12
maxopen.scale = 11
maxdone.scale = 10
monitor.interval = 0
nworkers = 0
......@@ -38,6 +38,9 @@
int tests_run = 0;
static char errmsg[BUFSIZ];
static struct freehead_s local_freehead
= VSTAILQ_HEAD_INITIALIZER(local_freehead);
/* N.B.: Always run this test first */
static char
*test_data_init(void)
......@@ -46,17 +49,58 @@ static char
printf("... testing data table initialization\n");
config.maxopen_scale = 0;
config.maxdata_scale = 0;
config.maxopen_scale = 10;
config.maxdone_scale = 10;
err = DATA_Init();
sprintf(errmsg, "DATA_Init: %s", strerror(err));
mu_assert(errmsg, err == 0);
sprintf(errmsg, "DATA_Init: expected table length 1024, got %d", tbl.len);
mu_assert(errmsg, tbl.len == 1024);
sprintf(errmsg, "DATA_Init: expected table length 2048, got %d", dtbl.len);
mu_assert(errmsg, dtbl.len == 2048);
return NULL;
}
static const char
*test_data_take(void)
{
printf("... testing freelist take\n");
DATA_Take_Freelist(&local_freehead);
mu_assert("Local freelist empty after take",
!VSTAILQ_EMPTY(&local_freehead));
sprintf(errmsg, "Global free count non-zero after take (%u)", dtbl.nfree);
mu_assert(errmsg, dtbl.nfree == 0);
mu_assert("Global free list non-empty after take",
VSTAILQ_EMPTY(&dtbl.freehead));
return NULL;
}
static const char
*test_data_return(void)
{
printf("... testing freelist return\n");
DATA_Return_Freelist(&local_freehead, 2048);
mu_assert("Local freelist non-empty after return",
VSTAILQ_EMPTY(&local_freehead));
sprintf(errmsg, "Expected global free count == 2048 after return (%u)",
dtbl.nfree);
mu_assert(errmsg, dtbl.nfree == 2048);
mu_assert("Global free list empty after take",
!VSTAILQ_EMPTY(&dtbl.freehead));
return NULL;
}
#if 0
/* XXX: should be tests for the hash table */
static const char
*test_data_insert(void)
{
......@@ -151,13 +195,18 @@ static const char
return NULL;
}
#endif
static const char
*all_tests(void)
{
mu_run_test(test_data_init);
#if 0
mu_run_test(test_data_insert);
mu_run_test(test_data_find);
#endif
mu_run_test(test_data_take);
mu_run_test(test_data_return);
return NULL;
}
......
......@@ -44,6 +44,8 @@
#define NCON 10
#define MIN_TABLE_SCALE (MIN_MAXOPEN_SCALE + MIN_MAXDONE_SCALE)
int run;
typedef enum {
......@@ -80,7 +82,7 @@ static void
srand48(time(NULL));
unsigned xid = (unsigned int) lrand48();
for (int i = 0; i < (1 << MIN_TABLE_SCALE); i++) {
for (int i = 0; i < (1 << MIN_MAXOPEN_SCALE); i++) {
xids[i] = xid;
debug_print("Producer: enqueue %d (xid = %u)\n", ++enqs, xid);
if (!SPMCQ_Enq(&xids[i])) {
......@@ -88,7 +90,7 @@ static void
pthread_exit(&proddata);
}
debug_print("%s\n", "Producer: broadcast");
if (pthread_cond_broadcast(&spmcq_nonempty_cond) != 0) {
if (pthread_cond_broadcast(&spmcq_datawaiter_cond) != 0) {
proddata.fail = PRODUCER_BCAST;
pthread_exit(&proddata);
}
......@@ -122,17 +124,17 @@ static void
/* grab the CV lock, which also constitutes an implicit memory
barrier */
debug_print("Consumer %d: mutex\n", id);
if (pthread_mutex_lock(&spmcq_nonempty_lock) != 0)
if (pthread_mutex_lock(&spmcq_datawaiter_lock) != 0)
consumer_exit(pcdata, CONSUMER_MUTEX);
/* run is guaranteed to be fresh here */
if (run) {
debug_print("Consumer %d: wait, run = %d\n", id, run);
if (pthread_cond_wait(&spmcq_nonempty_cond,
&spmcq_nonempty_lock) != 0)
if (pthread_cond_wait(&spmcq_datawaiter_cond,
&spmcq_datawaiter_lock) != 0)
consumer_exit(pcdata, CONSUMER_WAIT);
}
debug_print("Consumer %d: unlock\n", id);
if (pthread_mutex_unlock(&spmcq_nonempty_lock) != 0)
if (pthread_mutex_unlock(&spmcq_datawaiter_lock) != 0)
consumer_exit(pcdata, CONSUMER_MUTEX);
if (! run) {
debug_print("Consumer %d: quit signaled, run = %d\n", id, run);
......@@ -162,16 +164,17 @@ static char
printf("... testing SPMCQ initialization\n");
if (pthread_mutex_init(&spmcq_nonempty_lock, NULL) != 0) {
if (pthread_mutex_init(&spmcq_datawaiter_lock, NULL) != 0) {
sprintf(errmsg, "mutex_init failed: %s", strerror(errno));
return(errmsg);
}
if (pthread_cond_init(&spmcq_nonempty_cond, NULL) != 0) {
if (pthread_cond_init(&spmcq_datawaiter_cond, NULL) != 0) {
sprintf(errmsg, "cond_init failed: %s", strerror(errno));
return(errmsg);
}
config.maxopen_scale = 0;
config.maxopen_scale = MIN_MAXOPEN_SCALE;
config.maxdone_scale = MIN_MAXDONE_SCALE;
err = SPMCQ_Init();
sprintf(errmsg, "SPMCQ_Init: %s", strerror(err));
mu_assert(errmsg, err == 0);
......@@ -224,14 +227,14 @@ static const char
mu_assert(errmsg, err == 0);
/*
* must only modify run under spmcq_nonempty_lock to ensure that
* must only modify run under spmcq_datawaiter_lock to ensure that
* we signal all waiting consumers (otherwise a consumer could go
* waiting _after_ we have broadcasted and so miss the event.
*/
MAZ(pthread_mutex_lock(&spmcq_nonempty_lock));
MAZ(pthread_mutex_lock(&spmcq_datawaiter_lock));
run = 0;
MAZ(pthread_cond_broadcast(&spmcq_nonempty_cond));
MAZ(pthread_mutex_unlock(&spmcq_nonempty_lock));
MAZ(pthread_cond_broadcast(&spmcq_datawaiter_cond));
MAZ(pthread_mutex_unlock(&spmcq_datawaiter_lock));
err = pthread_join(con1, (void **) &con1_data);
sprintf(errmsg, "Failed to join consumer 1: %s", strerror(err));
......@@ -305,14 +308,14 @@ static const char
prodsum = prod_data->sum;
/*
* must only modify run under spmcq_nonempty_lock to ensure that
* must only modify run under spmcq_datawaiter_lock to ensure that
* we signal all waiting consumers (otherwise a consumer could go
* waiting _after_ we have broadcasted and so miss the event.
*/
MAZ(pthread_mutex_lock(&spmcq_nonempty_lock));
MAZ(pthread_mutex_lock(&spmcq_datawaiter_lock));
run = 0;
MAZ(pthread_cond_broadcast(&spmcq_nonempty_cond));
MAZ(pthread_mutex_unlock(&spmcq_nonempty_lock));
MAZ(pthread_cond_broadcast(&spmcq_datawaiter_cond));
MAZ(pthread_mutex_unlock(&spmcq_datawaiter_lock));
for (int i = 0; i < NCON; i++) {
err = pthread_join(con[i], (void **) &con_data[i]);
......
......@@ -36,6 +36,7 @@
#include "../trackrdrd.h"
#include "vas.h"
#include "miniobj.h"
#include "libvarnish.h"
#define DEBUG 0
#define debug_print(fmt, ...) \
......@@ -58,7 +59,8 @@ static char
printf("... testing worker initialization\n");
config.maxopen_scale = 0;
config.maxopen_scale = 10;
config.maxdone_scale = 10;
config.nworkers = NWORKERS;
strcpy(config.mq_uri, "tcp://localhost:61616");
strcpy(config.mq_qname, "lhoste/tracking/test");
......@@ -72,6 +74,7 @@ static char
mu_assert(errmsg, err == 0);
AZ(LOG_Open("test_worker"));
AZ(HASH_Init());
AZ(DATA_Init());
AZ(SPMCQ_Init());
......@@ -82,6 +85,7 @@ static char
*test_worker_run(void)
{
dataentry *entry;
hashentry *he;
printf("... testing run of %d workers\n", NWORKERS);
......@@ -89,15 +93,17 @@ static char
unsigned xid = (unsigned int) lrand48();
WRK_Start();
DATA_noMT_Register();
for (int i = 0; i < 1024; i++) {
entry = DATA_Insert(xid);
entry = DATA_noMT_Get();
CHECK_OBJ_NOTNULL(entry, DATA_MAGIC);
he = HASH_Insert(++xid, entry, TIM_mono());
CHECK_OBJ_NOTNULL(he, HASH_MAGIC);
entry->xid = xid;
sprintf(entry->data, "XID=%d&foo=bar&baz=quux&record=%d", xid, i+1);
entry->end = strlen(entry->data);
entry->state = DATA_DONE;
sprintf(errmsg, "SPMCQ_Enq: queue full");
mu_assert(errmsg, SPMCQ_Enq(entry));
HASH_Submit(he);
}
WRK_Halt();
......
......@@ -76,7 +76,8 @@ spmcq_t spmcq;
int SPMCQ_Init(void);
bool SPMCQ_Enq(void *ptr);
void *SPMCQ_Deq(void);
int SPMCQ_Len(void);
bool SPMCQ_NeedWorker(void);
bool SPMCQ_StopWorker(void);
#define spmcq_wait(what) \
do { \
......@@ -118,30 +119,6 @@ pthread_cond_t spmcq_datawaiter_cond;
pthread_mutex_t spmcq_datawaiter_lock;
int spmcq_datawaiter;
/*
* should we wake up another worker?
*
* M = l / (u x p)
*
* l: arrival rate
* u: service rate
* p: utilization
*
* to get an optimal M, we would need to measure l and u, so to
* simplify, we just try to keep the number of workers proportional to
* the queue length
*
* wake up another worker if queue is sufficiently full
* Q_Len > working * qlen_goal / max_workers
*/
#define SPMCQ_need_worker(qlen, working, max_workers, qlen_goal) \
((qlen) > (working) * (qlen_goal) / max_workers)
/* stop workers when we have one more than we need */
#define SPMCQ_stop_worker(qlen, working, max_workers, qlen_goal) \
((qlen) < ((MAX(working,1)) - 1) * (qlen_goal) / max_workers)
/* mq.c */
const char *MQ_GlobalInit(void);
const char *MQ_WorkerInit(void **priv);
......
......@@ -160,7 +160,7 @@ static void
wrk_send(amq_worker, entry, wrk);
/* should we go to sleep ? */
if (SPMCQ_stop_worker(SPMCQ_Len(), (nworkers - spmcq_datawaiter), nworkers, (1 << config.qlen_goal_scale)))
if (SPMCQ_StopWorker())
goto sleep;
continue;
......@@ -188,10 +188,7 @@ static void
*
* also re-check the stop condition under the lock
*/
if (run &&
((! entry) ||
SPMCQ_stop_worker(SPMCQ_Len(), (nworkers - spmcq_datawaiter),
nworkers, (1 << config.qlen_goal_scale)))) {
if (run && ((! entry) || SPMCQ_StopWorker())) {
wrk->waits++;
spmcq_datawaiter++;
wrk->state = WRK_WAITING;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment