Commit df2c8e58 authored by Geoff Simmons's avatar Geoff Simmons

trackrdrd: - make check now passes

	- Stop/NeedWorker now encapsulated by the SPMCQ interface
	- spmcq_len not exposed by the SPMCQ interface
parent f9d53c4d
...@@ -308,9 +308,9 @@ CONF_Dump(void) ...@@ -308,9 +308,9 @@ CONF_Dump(void)
confdump("maxopen.scale = %u", config.maxopen_scale); confdump("maxopen.scale = %u", config.maxopen_scale);
confdump("maxdata.scale = %u", config.maxdata_scale); confdump("maxdata.scale = %u", config.maxdata_scale);
confdump("qlen_goal.scale = %u", config.qlen_goal_scale); confdump("qlen_goal.scale = %u", config.qlen_goal_scale);
confdump("hash_max_probes", config.hash_max_probes); confdump("hash_max_probes = %u", config.hash_max_probes);
confdump("hash_ttl", config.hash_ttl); confdump("hash_ttl = %u", config.hash_ttl);
confdump("hash_mlt", config.hash_mlt); confdump("hash_mlt = %u", config.hash_mlt);
confdump("mq.uri = %s", config.mq_uri); confdump("mq.uri = %s", config.mq_uri);
......
...@@ -217,7 +217,7 @@ DATA_noMT_Submit(dataentry *de) ...@@ -217,7 +217,7 @@ DATA_noMT_Submit(dataentry *de)
dtbl.w_stats.submitted++; dtbl.w_stats.submitted++;
/* should we wake up another worker? */ /* should we wake up another worker? */
if (SPMCQ_need_worker(SPMCQ_Len(), (nworkers - spmcq_datawaiter), nworkers, (1 << config.qlen_goal_scale))) if (SPMCQ_NeedWorker())
spmcq_signal(data); spmcq_signal(data);
/* /*
......
...@@ -40,6 +40,7 @@ ...@@ -40,6 +40,7 @@
#include "vmb.h" #include "vmb.h"
static pthread_mutex_t spmcq_deq_lock; static pthread_mutex_t spmcq_deq_lock;
static unsigned qlen_goal;
static inline unsigned static inline unsigned
spmcq_len(void) spmcq_len(void)
...@@ -49,6 +50,7 @@ spmcq_len(void) ...@@ -49,6 +50,7 @@ spmcq_len(void)
return UINT_MAX - spmcq.head + 1 + spmcq.tail; return UINT_MAX - spmcq.head + 1 + spmcq.tail;
} }
#if 0
/* /*
* this is only approximately correct and could even become negative when values * this is only approximately correct and could even become negative when values
* get updated while we read them! * get updated while we read them!
...@@ -66,6 +68,7 @@ int SPMCQ_Len(void) { ...@@ -66,6 +68,7 @@ int SPMCQ_Len(void) {
return (l); return (l);
} }
#endif
static void static void
spmcq_cleanup(void) spmcq_cleanup(void)
...@@ -74,6 +77,12 @@ spmcq_cleanup(void) ...@@ -74,6 +77,12 @@ spmcq_cleanup(void)
AZ(pthread_mutex_destroy(&spmcq_deq_lock)); AZ(pthread_mutex_destroy(&spmcq_deq_lock));
} }
static inline int
spmcq_wrk_len_ratio(int working)
{
return working * qlen_goal / nworkers;
}
int int
SPMCQ_Init(void) SPMCQ_Init(void)
{ {
...@@ -91,6 +100,9 @@ SPMCQ_Init(void) ...@@ -91,6 +100,9 @@ SPMCQ_Init(void)
{ .magic = SPMCQ_MAGIC, .mask = n - 1, .data = buf, .head = 0, { .magic = SPMCQ_MAGIC, .mask = n - 1, .data = buf, .head = 0,
.tail = 0 }; .tail = 0 };
memcpy(&spmcq, &q, sizeof(spmcq_t)); memcpy(&spmcq, &q, sizeof(spmcq_t));
qlen_goal = 1 << config.qlen_goal_scale;
atexit(spmcq_cleanup); atexit(spmcq_cleanup);
return(0); return(0);
} }
...@@ -118,6 +130,39 @@ void ...@@ -118,6 +130,39 @@ void
return ptr; return ptr;
} }
/*
* should we wake up another worker?
*
* M = l / (u x p)
*
* l: arrival rate
* u: service rate
* p: utilization
*
* to get an optimal M, we would need to measure l and u, so to
* simplify, we just try to keep the number of workers proportional to
* the queue length
*
* wake up another worker if queue is sufficiently full
* Q_Len > working * qlen_goal / max_workers
*/
bool
SPMCQ_NeedWorker(void)
{
if (nworkers == 0)
return false;
return spmcq_len() > spmcq_wrk_len_ratio(nworkers - spmcq_datawaiter);
}
bool
SPMCQ_StopWorker(void)
{
if (nworkers == 0)
return false;
return spmcq_len() < spmcq_wrk_len_ratio(nworkers - spmcq_datawaiter - 1);
}
#ifdef TEST_DRIVER #ifdef TEST_DRIVER
int int
main(int argc, char * const *argv) main(int argc, char * const *argv)
...@@ -127,7 +172,7 @@ main(int argc, char * const *argv) ...@@ -127,7 +172,7 @@ main(int argc, char * const *argv)
printf("\nTEST: %s\n", argv[0]); printf("\nTEST: %s\n", argv[0]);
printf("... test SMPCQ enqueue at UINT_MAX overflow\n"); printf("... test SMPCQ enqueue at UINT_MAX overflow\n");
config.maxopen_scale = 10; config.maxdone_scale = 10;
SPMCQ_Init(); SPMCQ_Init();
spmcq.head = spmcq.tail = UINT_MAX - 2; spmcq.head = spmcq.tail = UINT_MAX - 2;
...@@ -139,7 +184,6 @@ main(int argc, char * const *argv) ...@@ -139,7 +184,6 @@ main(int argc, char * const *argv)
assert(SPMCQ_Enq(NULL)); assert(SPMCQ_Enq(NULL));
assert(SPMCQ_Enq(NULL)); assert(SPMCQ_Enq(NULL));
assert(spmcq_len() == 7); assert(spmcq_len() == 7);
assert(SPMCQ_Len() == 7);
printf("%s: 1 test run\n", argv[0]); printf("%s: 1 test run\n", argv[0]);
exit(0); exit(0);
......
...@@ -21,7 +21,14 @@ test_data_SOURCES = \ ...@@ -21,7 +21,14 @@ test_data_SOURCES = \
../trackrdrd.h ../trackrdrd.h
test_data_LDADD = \ test_data_LDADD = \
../data.$(OBJEXT) $(VARNISHSRC)/lib/libvarnish/libvarnish.la \
../worker.$(OBJEXT) \
../monitor.$(OBJEXT) \
../spmcq.$(OBJEXT) \
../mq.$(OBJEXT) \
../amq.$(OBJEXT) \
../data.$(OBJEXT) \
@AMQ_LIBS@
test_mq_SOURCES = \ test_mq_SOURCES = \
minunit.h \ minunit.h \
...@@ -41,7 +48,13 @@ test_spmcq_SOURCES = \ ...@@ -41,7 +48,13 @@ test_spmcq_SOURCES = \
test_spmcq_LDADD = \ test_spmcq_LDADD = \
$(VARNISHSRC)/lib/libvarnish/libvarnish.la \ $(VARNISHSRC)/lib/libvarnish/libvarnish.la \
../spmcq.$(OBJEXT) ../worker.$(OBJEXT) \
../monitor.$(OBJEXT) \
../mq.$(OBJEXT) \
../amq.$(OBJEXT) \
../data.$(OBJEXT) \
../spmcq.$(OBJEXT) \
@AMQ_LIBS@
test_spmcq_len_SOURCES = \ test_spmcq_len_SOURCES = \
$(VARNISHSRC)/lib/libvarnish/libvarnish.la \ $(VARNISHSRC)/lib/libvarnish/libvarnish.la \
...@@ -49,7 +62,13 @@ test_spmcq_len_SOURCES = \ ...@@ -49,7 +62,13 @@ test_spmcq_len_SOURCES = \
../trackrdrd.h ../trackrdrd.h
test_spmcq_len_LDADD = \ test_spmcq_len_LDADD = \
$(VARNISHSRC)/lib/libvarnish/libvarnish.la $(VARNISHSRC)/lib/libvarnish/libvarnish.la \
../worker.$(OBJEXT) \
../monitor.$(OBJEXT) \
../mq.$(OBJEXT) \
../amq.$(OBJEXT) \
../data.$(OBJEXT) \
@AMQ_LIBS@
test_spmcq_len_CFLAGS = -DTEST_DRIVER test_spmcq_len_CFLAGS = -DTEST_DRIVER
...@@ -67,4 +86,5 @@ test_worker_LDADD = \ ...@@ -67,4 +86,5 @@ test_worker_LDADD = \
../amq.$(OBJEXT) \ ../amq.$(OBJEXT) \
../data.$(OBJEXT) \ ../data.$(OBJEXT) \
../monitor.$(OBJEXT) \ ../monitor.$(OBJEXT) \
../hash.$(OBJEXT) \
@AMQ_LIBS@ @AMQ_LIBS@
...@@ -22,7 +22,7 @@ CMD="../trackrdrd -D -f varnish.binlog -l - -d -c test.conf" ...@@ -22,7 +22,7 @@ CMD="../trackrdrd -D -f varnish.binlog -l - -d -c test.conf"
# the user running it # the user running it
CKSUM=$( $CMD | sed -e 's/\(initializing\) \(.*\)/\1/' | sed -e 's/\(Running as\) \([a-zA-Z0-9]*\)$/\1/' | grep -v 'Not running as root' | cksum) CKSUM=$( $CMD | sed -e 's/\(initializing\) \(.*\)/\1/' | sed -e 's/\(Running as\) \([a-zA-Z0-9]*\)$/\1/' | grep -v 'Not running as root' | cksum)
if [ "$CKSUM" != '2764833651 232316' ]; then if [ "$CKSUM" != '2982448694 234027' ]; then
echo "ERROR: Regression test incorrect cksum: $CKSUM" echo "ERROR: Regression test incorrect cksum: $CKSUM"
exit 1 exit 1
fi fi
......
# Test configuration for the varnish log tracking reader # Test configuration for the varnish log tracking reader
log.file = /tmp/trackrdrd.log log.file = /tmp/trackrdrd.log
pid.file = trackrdrd.pid pid.file = trackrdrd.pid
maxdata.scale = 2 maxdata.scale = 12
maxopen.scale = 1 maxopen.scale = 11
maxdone.scale = 10
monitor.interval = 0 monitor.interval = 0
nworkers = 0 nworkers = 0
...@@ -38,6 +38,9 @@ ...@@ -38,6 +38,9 @@
int tests_run = 0; int tests_run = 0;
static char errmsg[BUFSIZ]; static char errmsg[BUFSIZ];
static struct freehead_s local_freehead
= VSTAILQ_HEAD_INITIALIZER(local_freehead);
/* N.B.: Always run this test first */ /* N.B.: Always run this test first */
static char static char
*test_data_init(void) *test_data_init(void)
...@@ -46,17 +49,58 @@ static char ...@@ -46,17 +49,58 @@ static char
printf("... testing data table initialization\n"); printf("... testing data table initialization\n");
config.maxopen_scale = 0; config.maxopen_scale = 10;
config.maxdata_scale = 0; config.maxdone_scale = 10;
err = DATA_Init(); err = DATA_Init();
sprintf(errmsg, "DATA_Init: %s", strerror(err)); sprintf(errmsg, "DATA_Init: %s", strerror(err));
mu_assert(errmsg, err == 0); mu_assert(errmsg, err == 0);
sprintf(errmsg, "DATA_Init: expected table length 1024, got %d", tbl.len); sprintf(errmsg, "DATA_Init: expected table length 2048, got %d", dtbl.len);
mu_assert(errmsg, tbl.len == 1024); mu_assert(errmsg, dtbl.len == 2048);
return NULL;
}
static const char
*test_data_take(void)
{
printf("... testing freelist take\n");
DATA_Take_Freelist(&local_freehead);
mu_assert("Local freelist empty after take",
!VSTAILQ_EMPTY(&local_freehead));
sprintf(errmsg, "Global free count non-zero after take (%u)", dtbl.nfree);
mu_assert(errmsg, dtbl.nfree == 0);
mu_assert("Global free list non-empty after take",
VSTAILQ_EMPTY(&dtbl.freehead));
return NULL;
}
static const char
*test_data_return(void)
{
printf("... testing freelist return\n");
DATA_Return_Freelist(&local_freehead, 2048);
mu_assert("Local freelist non-empty after return",
VSTAILQ_EMPTY(&local_freehead));
sprintf(errmsg, "Expected global free count == 2048 after return (%u)",
dtbl.nfree);
mu_assert(errmsg, dtbl.nfree == 2048);
mu_assert("Global free list empty after take",
!VSTAILQ_EMPTY(&dtbl.freehead));
return NULL; return NULL;
} }
#if 0
/* XXX: should be tests for the hash table */
static const char static const char
*test_data_insert(void) *test_data_insert(void)
{ {
...@@ -151,13 +195,18 @@ static const char ...@@ -151,13 +195,18 @@ static const char
return NULL; return NULL;
} }
#endif
static const char static const char
*all_tests(void) *all_tests(void)
{ {
mu_run_test(test_data_init); mu_run_test(test_data_init);
#if 0
mu_run_test(test_data_insert); mu_run_test(test_data_insert);
mu_run_test(test_data_find); mu_run_test(test_data_find);
#endif
mu_run_test(test_data_take);
mu_run_test(test_data_return);
return NULL; return NULL;
} }
......
...@@ -44,6 +44,8 @@ ...@@ -44,6 +44,8 @@
#define NCON 10 #define NCON 10
#define MIN_TABLE_SCALE (MIN_MAXOPEN_SCALE + MIN_MAXDONE_SCALE)
int run; int run;
typedef enum { typedef enum {
...@@ -80,7 +82,7 @@ static void ...@@ -80,7 +82,7 @@ static void
srand48(time(NULL)); srand48(time(NULL));
unsigned xid = (unsigned int) lrand48(); unsigned xid = (unsigned int) lrand48();
for (int i = 0; i < (1 << MIN_TABLE_SCALE); i++) { for (int i = 0; i < (1 << MIN_MAXOPEN_SCALE); i++) {
xids[i] = xid; xids[i] = xid;
debug_print("Producer: enqueue %d (xid = %u)\n", ++enqs, xid); debug_print("Producer: enqueue %d (xid = %u)\n", ++enqs, xid);
if (!SPMCQ_Enq(&xids[i])) { if (!SPMCQ_Enq(&xids[i])) {
...@@ -88,7 +90,7 @@ static void ...@@ -88,7 +90,7 @@ static void
pthread_exit(&proddata); pthread_exit(&proddata);
} }
debug_print("%s\n", "Producer: broadcast"); debug_print("%s\n", "Producer: broadcast");
if (pthread_cond_broadcast(&spmcq_nonempty_cond) != 0) { if (pthread_cond_broadcast(&spmcq_datawaiter_cond) != 0) {
proddata.fail = PRODUCER_BCAST; proddata.fail = PRODUCER_BCAST;
pthread_exit(&proddata); pthread_exit(&proddata);
} }
...@@ -122,17 +124,17 @@ static void ...@@ -122,17 +124,17 @@ static void
/* grab the CV lock, which also constitutes an implicit memory /* grab the CV lock, which also constitutes an implicit memory
barrier */ barrier */
debug_print("Consumer %d: mutex\n", id); debug_print("Consumer %d: mutex\n", id);
if (pthread_mutex_lock(&spmcq_nonempty_lock) != 0) if (pthread_mutex_lock(&spmcq_datawaiter_lock) != 0)
consumer_exit(pcdata, CONSUMER_MUTEX); consumer_exit(pcdata, CONSUMER_MUTEX);
/* run is guaranteed to be fresh here */ /* run is guaranteed to be fresh here */
if (run) { if (run) {
debug_print("Consumer %d: wait, run = %d\n", id, run); debug_print("Consumer %d: wait, run = %d\n", id, run);
if (pthread_cond_wait(&spmcq_nonempty_cond, if (pthread_cond_wait(&spmcq_datawaiter_cond,
&spmcq_nonempty_lock) != 0) &spmcq_datawaiter_lock) != 0)
consumer_exit(pcdata, CONSUMER_WAIT); consumer_exit(pcdata, CONSUMER_WAIT);
} }
debug_print("Consumer %d: unlock\n", id); debug_print("Consumer %d: unlock\n", id);
if (pthread_mutex_unlock(&spmcq_nonempty_lock) != 0) if (pthread_mutex_unlock(&spmcq_datawaiter_lock) != 0)
consumer_exit(pcdata, CONSUMER_MUTEX); consumer_exit(pcdata, CONSUMER_MUTEX);
if (! run) { if (! run) {
debug_print("Consumer %d: quit signaled, run = %d\n", id, run); debug_print("Consumer %d: quit signaled, run = %d\n", id, run);
...@@ -162,16 +164,17 @@ static char ...@@ -162,16 +164,17 @@ static char
printf("... testing SPMCQ initialization\n"); printf("... testing SPMCQ initialization\n");
if (pthread_mutex_init(&spmcq_nonempty_lock, NULL) != 0) { if (pthread_mutex_init(&spmcq_datawaiter_lock, NULL) != 0) {
sprintf(errmsg, "mutex_init failed: %s", strerror(errno)); sprintf(errmsg, "mutex_init failed: %s", strerror(errno));
return(errmsg); return(errmsg);
} }
if (pthread_cond_init(&spmcq_nonempty_cond, NULL) != 0) { if (pthread_cond_init(&spmcq_datawaiter_cond, NULL) != 0) {
sprintf(errmsg, "cond_init failed: %s", strerror(errno)); sprintf(errmsg, "cond_init failed: %s", strerror(errno));
return(errmsg); return(errmsg);
} }
config.maxopen_scale = 0; config.maxopen_scale = MIN_MAXOPEN_SCALE;
config.maxdone_scale = MIN_MAXDONE_SCALE;
err = SPMCQ_Init(); err = SPMCQ_Init();
sprintf(errmsg, "SPMCQ_Init: %s", strerror(err)); sprintf(errmsg, "SPMCQ_Init: %s", strerror(err));
mu_assert(errmsg, err == 0); mu_assert(errmsg, err == 0);
...@@ -224,14 +227,14 @@ static const char ...@@ -224,14 +227,14 @@ static const char
mu_assert(errmsg, err == 0); mu_assert(errmsg, err == 0);
/* /*
* must only modify run under spmcq_nonempty_lock to ensure that * must only modify run under spmcq_datawaiter_lock to ensure that
* we signal all waiting consumers (otherwise a consumer could go * we signal all waiting consumers (otherwise a consumer could go
* waiting _after_ we have broadcasted and so miss the event. * waiting _after_ we have broadcasted and so miss the event.
*/ */
MAZ(pthread_mutex_lock(&spmcq_nonempty_lock)); MAZ(pthread_mutex_lock(&spmcq_datawaiter_lock));
run = 0; run = 0;
MAZ(pthread_cond_broadcast(&spmcq_nonempty_cond)); MAZ(pthread_cond_broadcast(&spmcq_datawaiter_cond));
MAZ(pthread_mutex_unlock(&spmcq_nonempty_lock)); MAZ(pthread_mutex_unlock(&spmcq_datawaiter_lock));
err = pthread_join(con1, (void **) &con1_data); err = pthread_join(con1, (void **) &con1_data);
sprintf(errmsg, "Failed to join consumer 1: %s", strerror(err)); sprintf(errmsg, "Failed to join consumer 1: %s", strerror(err));
...@@ -305,14 +308,14 @@ static const char ...@@ -305,14 +308,14 @@ static const char
prodsum = prod_data->sum; prodsum = prod_data->sum;
/* /*
* must only modify run under spmcq_nonempty_lock to ensure that * must only modify run under spmcq_datawaiter_lock to ensure that
* we signal all waiting consumers (otherwise a consumer could go * we signal all waiting consumers (otherwise a consumer could go
* waiting _after_ we have broadcasted and so miss the event. * waiting _after_ we have broadcasted and so miss the event.
*/ */
MAZ(pthread_mutex_lock(&spmcq_nonempty_lock)); MAZ(pthread_mutex_lock(&spmcq_datawaiter_lock));
run = 0; run = 0;
MAZ(pthread_cond_broadcast(&spmcq_nonempty_cond)); MAZ(pthread_cond_broadcast(&spmcq_datawaiter_cond));
MAZ(pthread_mutex_unlock(&spmcq_nonempty_lock)); MAZ(pthread_mutex_unlock(&spmcq_datawaiter_lock));
for (int i = 0; i < NCON; i++) { for (int i = 0; i < NCON; i++) {
err = pthread_join(con[i], (void **) &con_data[i]); err = pthread_join(con[i], (void **) &con_data[i]);
......
...@@ -36,6 +36,7 @@ ...@@ -36,6 +36,7 @@
#include "../trackrdrd.h" #include "../trackrdrd.h"
#include "vas.h" #include "vas.h"
#include "miniobj.h" #include "miniobj.h"
#include "libvarnish.h"
#define DEBUG 0 #define DEBUG 0
#define debug_print(fmt, ...) \ #define debug_print(fmt, ...) \
...@@ -58,7 +59,8 @@ static char ...@@ -58,7 +59,8 @@ static char
printf("... testing worker initialization\n"); printf("... testing worker initialization\n");
config.maxopen_scale = 0; config.maxopen_scale = 10;
config.maxdone_scale = 10;
config.nworkers = NWORKERS; config.nworkers = NWORKERS;
strcpy(config.mq_uri, "tcp://localhost:61616"); strcpy(config.mq_uri, "tcp://localhost:61616");
strcpy(config.mq_qname, "lhoste/tracking/test"); strcpy(config.mq_qname, "lhoste/tracking/test");
...@@ -72,6 +74,7 @@ static char ...@@ -72,6 +74,7 @@ static char
mu_assert(errmsg, err == 0); mu_assert(errmsg, err == 0);
AZ(LOG_Open("test_worker")); AZ(LOG_Open("test_worker"));
AZ(HASH_Init());
AZ(DATA_Init()); AZ(DATA_Init());
AZ(SPMCQ_Init()); AZ(SPMCQ_Init());
...@@ -82,6 +85,7 @@ static char ...@@ -82,6 +85,7 @@ static char
*test_worker_run(void) *test_worker_run(void)
{ {
dataentry *entry; dataentry *entry;
hashentry *he;
printf("... testing run of %d workers\n", NWORKERS); printf("... testing run of %d workers\n", NWORKERS);
...@@ -89,15 +93,17 @@ static char ...@@ -89,15 +93,17 @@ static char
unsigned xid = (unsigned int) lrand48(); unsigned xid = (unsigned int) lrand48();
WRK_Start(); WRK_Start();
DATA_noMT_Register();
for (int i = 0; i < 1024; i++) { for (int i = 0; i < 1024; i++) {
entry = DATA_Insert(xid); entry = DATA_noMT_Get();
CHECK_OBJ_NOTNULL(entry, DATA_MAGIC); CHECK_OBJ_NOTNULL(entry, DATA_MAGIC);
he = HASH_Insert(++xid, entry, TIM_mono());
CHECK_OBJ_NOTNULL(he, HASH_MAGIC);
entry->xid = xid; entry->xid = xid;
sprintf(entry->data, "XID=%d&foo=bar&baz=quux&record=%d", xid, i+1); sprintf(entry->data, "XID=%d&foo=bar&baz=quux&record=%d", xid, i+1);
entry->end = strlen(entry->data); entry->end = strlen(entry->data);
entry->state = DATA_DONE; entry->state = DATA_DONE;
sprintf(errmsg, "SPMCQ_Enq: queue full"); HASH_Submit(he);
mu_assert(errmsg, SPMCQ_Enq(entry));
} }
WRK_Halt(); WRK_Halt();
......
...@@ -76,7 +76,8 @@ spmcq_t spmcq; ...@@ -76,7 +76,8 @@ spmcq_t spmcq;
int SPMCQ_Init(void); int SPMCQ_Init(void);
bool SPMCQ_Enq(void *ptr); bool SPMCQ_Enq(void *ptr);
void *SPMCQ_Deq(void); void *SPMCQ_Deq(void);
int SPMCQ_Len(void); bool SPMCQ_NeedWorker(void);
bool SPMCQ_StopWorker(void);
#define spmcq_wait(what) \ #define spmcq_wait(what) \
do { \ do { \
...@@ -118,30 +119,6 @@ pthread_cond_t spmcq_datawaiter_cond; ...@@ -118,30 +119,6 @@ pthread_cond_t spmcq_datawaiter_cond;
pthread_mutex_t spmcq_datawaiter_lock; pthread_mutex_t spmcq_datawaiter_lock;
int spmcq_datawaiter; int spmcq_datawaiter;
/*
* should we wake up another worker?
*
* M = l / (u x p)
*
* l: arrival rate
* u: service rate
* p: utilization
*
* to get an optimal M, we would need to measure l and u, so to
* simplify, we just try to keep the number of workers proportional to
* the queue length
*
* wake up another worker if queue is sufficiently full
* Q_Len > working * qlen_goal / max_workers
*/
#define SPMCQ_need_worker(qlen, working, max_workers, qlen_goal) \
((qlen) > (working) * (qlen_goal) / max_workers)
/* stop workers when we have one more than we need */
#define SPMCQ_stop_worker(qlen, working, max_workers, qlen_goal) \
((qlen) < ((MAX(working,1)) - 1) * (qlen_goal) / max_workers)
/* mq.c */ /* mq.c */
const char *MQ_GlobalInit(void); const char *MQ_GlobalInit(void);
const char *MQ_WorkerInit(void **priv); const char *MQ_WorkerInit(void **priv);
......
...@@ -160,7 +160,7 @@ static void ...@@ -160,7 +160,7 @@ static void
wrk_send(amq_worker, entry, wrk); wrk_send(amq_worker, entry, wrk);
/* should we go to sleep ? */ /* should we go to sleep ? */
if (SPMCQ_stop_worker(SPMCQ_Len(), (nworkers - spmcq_datawaiter), nworkers, (1 << config.qlen_goal_scale))) if (SPMCQ_StopWorker())
goto sleep; goto sleep;
continue; continue;
...@@ -188,10 +188,7 @@ static void ...@@ -188,10 +188,7 @@ static void
* *
* also re-check the stop condition under the lock * also re-check the stop condition under the lock
*/ */
if (run && if (run && ((! entry) || SPMCQ_StopWorker())) {
((! entry) ||
SPMCQ_stop_worker(SPMCQ_Len(), (nworkers - spmcq_datawaiter),
nworkers, (1 << config.qlen_goal_scale)))) {
wrk->waits++; wrk->waits++;
spmcq_datawaiter++; spmcq_datawaiter++;
wrk->state = WRK_WAITING; wrk->state = WRK_WAITING;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment