Commit e86f8068 authored by Geoff Simmons's avatar Geoff Simmons

trackrdrd: - all data & functions exclusive to VSL reader are now

	static in trackrdrd.c (part of data.c and all of hash.c)
	- replaced the global nworkers with WRK_Running(), since
	nworkers caused too many dependencies (esp. for unit tests)
parent 6d0bbd38
...@@ -125,114 +125,6 @@ DATA_Return_Freelist(struct freehead_s *returned, unsigned nreturned) ...@@ -125,114 +125,6 @@ DATA_Return_Freelist(struct freehead_s *returned, unsigned nreturned)
AZ(pthread_mutex_unlock(&dtbl.freelist_lock)); AZ(pthread_mutex_unlock(&dtbl.freelist_lock));
} }
/* ------------------------------------------------------------ */
/* noMT Functions to be used by one thread (the VSL reader) only */
/* ------------------------------------------------------------ */
static struct freehead_s data_noMT_freelist =
VSTAILQ_HEAD_INITIALIZER(data_noMT_freelist);
static pthread_t data_noMT_threadid = 0;
#if defined(WITHOUT_EXPENSIVE_ASSERTS) || defined(WITHOUT_ASSERTS)
#define DATA_noMT_check_thread() do {} while(0)
#else
#define DATA_noMT_check_thread() \
assert(data_noMT_threadid == pthread_self());
#endif
/* the one thread has to register */
void
DATA_noMT_Register(void)
{
AZ(data_noMT_threadid);
data_noMT_threadid = pthread_self();
}
/* efficiently retrieve a single data entry */
dataentry
*DATA_noMT_Get(void)
{
dataentry *data;
DATA_noMT_check_thread();
take:
data = VSTAILQ_FIRST(&data_noMT_freelist);
if (data) {
VSTAILQ_REMOVE_HEAD(&data_noMT_freelist, freelist);
} else {
assert(VSTAILQ_EMPTY(&data_noMT_freelist));
while (dtbl.nfree == 0) {
dtbl.w_stats.wait_room++;
spmcq_wait(room);
}
DATA_Take_Freelist(&data_noMT_freelist);
assert(! VSTAILQ_EMPTY(&data_noMT_freelist));
goto take;
}
assert(data->state == DATA_EMPTY);
return (data);
}
/* return to our own local cache */
static inline void
data_nomt_free(dataentry *de)
{
DATA_noMT_check_thread();
assert(de->state == DATA_EMPTY);
VSTAILQ_INSERT_HEAD(&data_noMT_freelist, de, freelist);
}
void
DATA_noMT_Free(dataentry *de)
{
data_nomt_free(de);
}
void
DATA_noMT_Submit(dataentry *de)
{
DATA_noMT_check_thread();
CHECK_OBJ_NOTNULL(de, DATA_MAGIC);
assert(de->state == DATA_DONE);
LOG_Log(LOG_DEBUG, "submit: data=[%.*s]", de->end, de->data);
if (de->hasdata == false) {
de->state = DATA_EMPTY;
MON_StatsUpdate(STATS_NODATA);
data_nomt_free(de);
return;
}
while (!SPMCQ_Enq((void *) de)) {
dtbl.w_stats.wait_qfull++;
LOG_Log(LOG_ALERT, "%s", "Internal queue full, waiting for dequeue");
spmcq_wait(room);
}
dtbl.w_stats.submitted++;
/* should we wake up another worker? */
if (SPMCQ_NeedWorker())
spmcq_signal(data);
/*
* base case: wake up a worker if all are sleeping
*
* this is an un-synced access to spmcq_data_waiter, but
* if we don't wake them up now, we will next time around
*/
if (nworkers == spmcq_datawaiter)
spmcq_signal(data);
}
void void
DATA_Dump1(dataentry *entry, int i) DATA_Dump1(dataentry *entry, int i)
{ {
......
...@@ -29,7 +29,7 @@ ...@@ -29,7 +29,7 @@
* SUCH DAMAGE. * SUCH DAMAGE.
* *
*/ */
#if 0
#include <stdlib.h> #include <stdlib.h>
#include <syslog.h> #include <syslog.h>
#include <string.h> #include <string.h>
...@@ -343,3 +343,4 @@ HASH_Dump(void) ...@@ -343,3 +343,4 @@ HASH_Dump(void)
for (int i = 0; i < htbl.len; i++) for (int i = 0; i < htbl.len; i++)
HASH_Dump1(&htbl.entry[i], i); HASH_Dump1(&htbl.entry[i], i);
} }
#endif
...@@ -43,41 +43,12 @@ static int run; ...@@ -43,41 +43,12 @@ static int run;
static void static void
log_output(void) log_output(void)
{ {
LOG_Log(LOG_INFO,
"Hash table: "
"len=%u "
"seen=%u "
"drop_reqstart=%u "
"drop_vcl_log=%u "
"drop_reqend=%u "
"expired=%u "
"evacuated=%u "
"open=%u "
"load=%.2f "
"collisions=%u "
"insert_probes=%u "
"find_probes=%u "
"fail=%u "
"occ_hi=%u "
"occ_hi_this=%u ",
htbl.len,
htbl.seen,
htbl.drop_reqstart,
htbl.drop_vcl_log,
htbl.drop_reqend,
htbl.expired,
htbl.evacuated,
htbl.open,
100.0 * htbl.open / htbl.len,
htbl.collisions,
htbl.insert_probes,
htbl.find_probes,
htbl.fail,
htbl.occ_hi,
htbl.occ_hi_this);
htbl.occ_hi_this = 0;
/* Eliminate the dependency of trackrdrd.o for unit tests */
#ifndef TEST_DRIVER
HASH_Stats();
#endif
LOG_Log(LOG_INFO, LOG_Log(LOG_INFO,
"Data table writer: " "Data table writer: "
"len=%u " "len=%u "
......
...@@ -78,9 +78,9 @@ spmcq_cleanup(void) ...@@ -78,9 +78,9 @@ spmcq_cleanup(void)
} }
static inline int static inline int
spmcq_wrk_len_ratio(int working) spmcq_wrk_len_ratio(int working, int running)
{ {
return working * qlen_goal / nworkers; return working * qlen_goal / running;
} }
int int
...@@ -148,19 +148,21 @@ void ...@@ -148,19 +148,21 @@ void
*/ */
bool bool
SPMCQ_NeedWorker(void) SPMCQ_NeedWorker(int running)
{ {
if (nworkers == 0) if (running == 0)
return false; return false;
return spmcq_len() > spmcq_wrk_len_ratio(nworkers - spmcq_datawaiter); return spmcq_len() > spmcq_wrk_len_ratio(running - spmcq_datawaiter,
running);
} }
bool bool
SPMCQ_StopWorker(void) SPMCQ_StopWorker(int running)
{ {
if (nworkers == 0) if (running == 0)
return false; return false;
return spmcq_len() < spmcq_wrk_len_ratio(nworkers - spmcq_datawaiter - 1); return spmcq_len() < spmcq_wrk_len_ratio(running - spmcq_datawaiter - 1,
running);
} }
#ifdef TEST_DRIVER #ifdef TEST_DRIVER
......
...@@ -22,13 +22,7 @@ test_data_SOURCES = \ ...@@ -22,13 +22,7 @@ test_data_SOURCES = \
test_data_LDADD = \ test_data_LDADD = \
$(VARNISHSRC)/lib/libvarnish/libvarnish.la \ $(VARNISHSRC)/lib/libvarnish/libvarnish.la \
../worker.$(OBJEXT) \ ../data.$(OBJEXT)
../monitor.$(OBJEXT) \
../spmcq.$(OBJEXT) \
../mq.$(OBJEXT) \
../amq.$(OBJEXT) \
../data.$(OBJEXT) \
@AMQ_LIBS@
test_mq_SOURCES = \ test_mq_SOURCES = \
minunit.h \ minunit.h \
...@@ -48,13 +42,7 @@ test_spmcq_SOURCES = \ ...@@ -48,13 +42,7 @@ test_spmcq_SOURCES = \
test_spmcq_LDADD = \ test_spmcq_LDADD = \
$(VARNISHSRC)/lib/libvarnish/libvarnish.la \ $(VARNISHSRC)/lib/libvarnish/libvarnish.la \
../worker.$(OBJEXT) \ ../spmcq.$(OBJEXT)
../monitor.$(OBJEXT) \
../mq.$(OBJEXT) \
../amq.$(OBJEXT) \
../data.$(OBJEXT) \
../spmcq.$(OBJEXT) \
@AMQ_LIBS@
test_spmcq_len_SOURCES = \ test_spmcq_len_SOURCES = \
$(VARNISHSRC)/lib/libvarnish/libvarnish.la \ $(VARNISHSRC)/lib/libvarnish/libvarnish.la \
...@@ -62,19 +50,14 @@ test_spmcq_len_SOURCES = \ ...@@ -62,19 +50,14 @@ test_spmcq_len_SOURCES = \
../trackrdrd.h ../trackrdrd.h
test_spmcq_len_LDADD = \ test_spmcq_len_LDADD = \
$(VARNISHSRC)/lib/libvarnish/libvarnish.la \ $(VARNISHSRC)/lib/libvarnish/libvarnish.la
../worker.$(OBJEXT) \
../monitor.$(OBJEXT) \
../mq.$(OBJEXT) \
../amq.$(OBJEXT) \
../data.$(OBJEXT) \
@AMQ_LIBS@
test_spmcq_len_CFLAGS = -DTEST_DRIVER test_spmcq_len_CFLAGS = -DTEST_DRIVER
test_worker_SOURCES = \ test_worker_SOURCES = \
minunit.h \ minunit.h \
test_worker.c \ test_worker.c \
../monitor.c \
../trackrdrd.h ../trackrdrd.h
test_worker_LDADD = \ test_worker_LDADD = \
...@@ -85,6 +68,6 @@ test_worker_LDADD = \ ...@@ -85,6 +68,6 @@ test_worker_LDADD = \
../spmcq.$(OBJEXT) \ ../spmcq.$(OBJEXT) \
../amq.$(OBJEXT) \ ../amq.$(OBJEXT) \
../data.$(OBJEXT) \ ../data.$(OBJEXT) \
../monitor.$(OBJEXT) \
../hash.$(OBJEXT) \
@AMQ_LIBS@ @AMQ_LIBS@
test_worker_CFLAGS = -DTEST_DRIVER
...@@ -22,7 +22,7 @@ CMD="../trackrdrd -D -f varnish.binlog -l - -d -c test.conf" ...@@ -22,7 +22,7 @@ CMD="../trackrdrd -D -f varnish.binlog -l - -d -c test.conf"
# the user running it # the user running it
CKSUM=$( $CMD | sed -e 's/\(initializing\) \(.*\)/\1/' | sed -e 's/\(Running as\) \([a-zA-Z0-9]*\)$/\1/' | grep -v 'Not running as root' | cksum) CKSUM=$( $CMD | sed -e 's/\(initializing\) \(.*\)/\1/' | sed -e 's/\(Running as\) \([a-zA-Z0-9]*\)$/\1/' | grep -v 'Not running as root' | cksum)
if [ "$CKSUM" != '2982448694 234027' ]; then if [ "$CKSUM" != '1346049411 234027' ]; then
echo "ERROR: Regression test incorrect cksum: $CKSUM" echo "ERROR: Regression test incorrect cksum: $CKSUM"
exit 1 exit 1
fi fi
......
...@@ -74,7 +74,6 @@ static char ...@@ -74,7 +74,6 @@ static char
mu_assert(errmsg, err == 0); mu_assert(errmsg, err == 0);
AZ(LOG_Open("test_worker")); AZ(LOG_Open("test_worker"));
AZ(HASH_Init());
AZ(DATA_Init()); AZ(DATA_Init());
AZ(SPMCQ_Init()); AZ(SPMCQ_Init());
...@@ -85,7 +84,6 @@ static char ...@@ -85,7 +84,6 @@ static char
*test_worker_run(void) *test_worker_run(void)
{ {
dataentry *entry; dataentry *entry;
hashentry *he;
printf("... testing run of %d workers\n", NWORKERS); printf("... testing run of %d workers\n", NWORKERS);
...@@ -93,17 +91,14 @@ static char ...@@ -93,17 +91,14 @@ static char
unsigned xid = (unsigned int) lrand48(); unsigned xid = (unsigned int) lrand48();
WRK_Start(); WRK_Start();
DATA_noMT_Register();
for (int i = 0; i < 1024; i++) { for (int i = 0; i < 1024; i++) {
entry = DATA_noMT_Get(); entry = &dtbl.entry[i];
CHECK_OBJ_NOTNULL(entry, DATA_MAGIC); CHECK_OBJ_NOTNULL(entry, DATA_MAGIC);
he = HASH_Insert(++xid, entry, TIM_mono());
CHECK_OBJ_NOTNULL(he, HASH_MAGIC);
entry->xid = xid; entry->xid = xid;
sprintf(entry->data, "XID=%d&foo=bar&baz=quux&record=%d", xid, i+1); sprintf(entry->data, "XID=%d&foo=bar&baz=quux&record=%d", xid, i+1);
entry->end = strlen(entry->data); entry->end = strlen(entry->data);
entry->state = DATA_DONE; entry->state = DATA_DONE;
HASH_Submit(he); assert(SPMCQ_Enq(entry));
} }
WRK_Halt(); WRK_Halt();
......
This diff is collapsed.
...@@ -76,8 +76,8 @@ spmcq_t spmcq; ...@@ -76,8 +76,8 @@ spmcq_t spmcq;
int SPMCQ_Init(void); int SPMCQ_Init(void);
bool SPMCQ_Enq(void *ptr); bool SPMCQ_Enq(void *ptr);
void *SPMCQ_Deq(void); void *SPMCQ_Deq(void);
bool SPMCQ_NeedWorker(void); bool SPMCQ_NeedWorker(int running);
bool SPMCQ_StopWorker(void); bool SPMCQ_StopWorker(int running);
#define spmcq_wait(what) \ #define spmcq_wait(what) \
do { \ do { \
...@@ -202,21 +202,14 @@ datatable dtbl; ...@@ -202,21 +202,14 @@ datatable dtbl;
int DATA_Init(void); int DATA_Init(void);
void DATA_Take_Freelist(struct freehead_s *dst); void DATA_Take_Freelist(struct freehead_s *dst);
void DATA_Return_Freelist(struct freehead_s *returned, unsigned nreturned); void DATA_Return_Freelist(struct freehead_s *returned, unsigned nreturned);
/*
* the noMT functions are _not_ MT-safe, so they can only be called
* from the registered thread
*/
void DATA_noMT_Register(void);
dataentry *DATA_noMT_Get(void);
void DATA_noMT_Free(dataentry *de);
void DATA_noMT_Submit(dataentry *de);
void DATA_Dump1(dataentry *entry, int i); void DATA_Dump1(dataentry *entry, int i);
void DATA_Dump(void); void DATA_Dump(void);
/* hash.c */ /* trackrdrd.c */
void HASH_Stats(void);
#if 0
typedef enum { typedef enum {
HASH_EMPTY = 0, HASH_EMPTY = 0,
/* OPEN when the main thread is filling data, ReqEnd not yet seen. */ /* OPEN when the main thread is filling data, ReqEnd not yet seen. */
...@@ -290,7 +283,7 @@ hashentry *HASH_Insert(const unsigned xid, dataentry *de, const float t); ...@@ -290,7 +283,7 @@ hashentry *HASH_Insert(const unsigned xid, dataentry *de, const float t);
hashentry *HASH_Find(unsigned xid); hashentry *HASH_Find(unsigned xid);
void HASH_Dump1(hashentry *entry, int i); void HASH_Dump1(hashentry *entry, int i);
void HASH_Dump(void); void HASH_Dump(void);
#endif
/* config.c */ /* config.c */
#define EMPTY(s) (s[0] == '\0') #define EMPTY(s) (s[0] == '\0')
......
...@@ -39,7 +39,7 @@ ...@@ -39,7 +39,7 @@
#include "vas.h" #include "vas.h"
#include "miniobj.h" #include "miniobj.h"
int nworkers = 0; static int running = 0;
typedef enum { typedef enum {
WRK_NOTSTARTED = 0, WRK_NOTSTARTED = 0,
...@@ -88,6 +88,8 @@ typedef struct { ...@@ -88,6 +88,8 @@ typedef struct {
static unsigned run, cleaned = 0; static unsigned run, cleaned = 0;
static thread_data_t *thread_data; static thread_data_t *thread_data;
static pthread_mutex_t running_lock;
static inline void static inline void
wrk_send(void *amq_worker, dataentry *entry, worker_data_t *wrk) wrk_send(void *amq_worker, dataentry *entry, worker_data_t *wrk)
{ {
...@@ -152,6 +154,9 @@ static void ...@@ -152,6 +154,9 @@ static void
wrk->wrk_nfree = 0; wrk->wrk_nfree = 0;
wrk->state = WRK_RUNNING; wrk->state = WRK_RUNNING;
AZ(pthread_mutex_lock(&running_lock));
running++;
AZ(pthread_mutex_unlock(&running_lock));
while (run) { while (run) {
entry = (dataentry *) SPMCQ_Deq(); entry = (dataentry *) SPMCQ_Deq();
...@@ -160,7 +165,7 @@ static void ...@@ -160,7 +165,7 @@ static void
wrk_send(amq_worker, entry, wrk); wrk_send(amq_worker, entry, wrk);
/* should we go to sleep ? */ /* should we go to sleep ? */
if (SPMCQ_StopWorker()) if (SPMCQ_StopWorker(running))
goto sleep; goto sleep;
continue; continue;
...@@ -188,7 +193,7 @@ static void ...@@ -188,7 +193,7 @@ static void
* *
* also re-check the stop condition under the lock * also re-check the stop condition under the lock
*/ */
if (run && ((! entry) || SPMCQ_StopWorker())) { if (run && ((! entry) || SPMCQ_StopWorker(running))) {
wrk->waits++; wrk->waits++;
spmcq_datawaiter++; spmcq_datawaiter++;
wrk->state = WRK_WAITING; wrk->state = WRK_WAITING;
...@@ -305,22 +310,7 @@ WRK_Stats(void) ...@@ -305,22 +310,7 @@ WRK_Stats(void)
int int
WRK_Running(void) WRK_Running(void)
{ {
worker_data_t *wrk; return running;
while (1) {
int initialized = 0, running = 0;
for (int i = 0; i < config.nworkers; i++) {
wrk = thread_data[i].wrk_data;
if (wrk->state > WRK_INITIALIZING)
initialized++;
if (wrk->state == WRK_RUNNING ||
wrk->state == WRK_SHUTTINGDOWN ||
wrk->state == WRK_WAITING)
running++;
}
if (initialized == config.nworkers)
return running;
}
} }
void void
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment