Commit 644b0b5c authored by Geoff Simmons's avatar Geoff Simmons

trackrdrd: - all data & functions exclusive to VSL reader are now

	static in trackrdrd.c (part of data.c and all of hash.c)
	- replaced the global nworkers with WRK_Running(), since
	nworkers caused too many dependencies (esp. for unit tests)
parent 9814a0b4
......@@ -125,114 +125,6 @@ DATA_Return_Freelist(struct freehead_s *returned, unsigned nreturned)
AZ(pthread_mutex_unlock(&dtbl.freelist_lock));
}
/* ------------------------------------------------------------ */
/* noMT Functions to be used by one thread (the VSL reader) only */
/* ------------------------------------------------------------ */
static struct freehead_s data_noMT_freelist =
VSTAILQ_HEAD_INITIALIZER(data_noMT_freelist);
static pthread_t data_noMT_threadid = 0;
#if defined(WITHOUT_EXPENSIVE_ASSERTS) || defined(WITHOUT_ASSERTS)
#define DATA_noMT_check_thread() do {} while(0)
#else
#define DATA_noMT_check_thread() \
assert(data_noMT_threadid == pthread_self());
#endif
/* the one thread has to register */
void
DATA_noMT_Register(void)
{
AZ(data_noMT_threadid);
data_noMT_threadid = pthread_self();
}
/* efficiently retrieve a single data entry */
dataentry
*DATA_noMT_Get(void)
{
dataentry *data;
DATA_noMT_check_thread();
take:
data = VSTAILQ_FIRST(&data_noMT_freelist);
if (data) {
VSTAILQ_REMOVE_HEAD(&data_noMT_freelist, freelist);
} else {
assert(VSTAILQ_EMPTY(&data_noMT_freelist));
while (dtbl.nfree == 0) {
dtbl.w_stats.wait_room++;
spmcq_wait(room);
}
DATA_Take_Freelist(&data_noMT_freelist);
assert(! VSTAILQ_EMPTY(&data_noMT_freelist));
goto take;
}
assert(data->state == DATA_EMPTY);
return (data);
}
/* return to our own local cache */
static inline void
data_nomt_free(dataentry *de)
{
DATA_noMT_check_thread();
assert(de->state == DATA_EMPTY);
VSTAILQ_INSERT_HEAD(&data_noMT_freelist, de, freelist);
}
void
DATA_noMT_Free(dataentry *de)
{
data_nomt_free(de);
}
void
DATA_noMT_Submit(dataentry *de)
{
DATA_noMT_check_thread();
CHECK_OBJ_NOTNULL(de, DATA_MAGIC);
assert(de->state == DATA_DONE);
LOG_Log(LOG_DEBUG, "submit: data=[%.*s]", de->end, de->data);
if (de->hasdata == false) {
de->state = DATA_EMPTY;
MON_StatsUpdate(STATS_NODATA);
data_nomt_free(de);
return;
}
while (!SPMCQ_Enq((void *) de)) {
dtbl.w_stats.wait_qfull++;
LOG_Log(LOG_ALERT, "%s", "Internal queue full, waiting for dequeue");
spmcq_wait(room);
}
dtbl.w_stats.submitted++;
/* should we wake up another worker? */
if (SPMCQ_NeedWorker())
spmcq_signal(data);
/*
* base case: wake up a worker if all are sleeping
*
* this is an un-synced access to spmcq_data_waiter, but
* if we don't wake them up now, we will next time around
*/
if (nworkers == spmcq_datawaiter)
spmcq_signal(data);
}
void
DATA_Dump1(dataentry *entry, int i)
{
......
......@@ -29,7 +29,7 @@
* SUCH DAMAGE.
*
*/
#if 0
#include <stdlib.h>
#include <syslog.h>
#include <string.h>
......@@ -343,3 +343,4 @@ HASH_Dump(void)
for (int i = 0; i < htbl.len; i++)
HASH_Dump1(&htbl.entry[i], i);
}
#endif
......@@ -43,41 +43,12 @@ static int run;
static void
log_output(void)
{
LOG_Log(LOG_INFO,
"Hash table: "
"len=%u "
"seen=%u "
"drop_reqstart=%u "
"drop_vcl_log=%u "
"drop_reqend=%u "
"expired=%u "
"evacuated=%u "
"open=%u "
"load=%.2f "
"collisions=%u "
"insert_probes=%u "
"find_probes=%u "
"fail=%u "
"occ_hi=%u "
"occ_hi_this=%u ",
htbl.len,
htbl.seen,
htbl.drop_reqstart,
htbl.drop_vcl_log,
htbl.drop_reqend,
htbl.expired,
htbl.evacuated,
htbl.open,
100.0 * htbl.open / htbl.len,
htbl.collisions,
htbl.insert_probes,
htbl.find_probes,
htbl.fail,
htbl.occ_hi,
htbl.occ_hi_this);
htbl.occ_hi_this = 0;
/* Eliminate the dependency of trackrdrd.o for unit tests */
#ifndef TEST_DRIVER
HASH_Stats();
#endif
LOG_Log(LOG_INFO,
"Data table writer: "
"len=%u "
......
......@@ -78,9 +78,9 @@ spmcq_cleanup(void)
}
static inline int
spmcq_wrk_len_ratio(int working)
spmcq_wrk_len_ratio(int working, int running)
{
return working * qlen_goal / nworkers;
return working * qlen_goal / running;
}
int
......@@ -148,19 +148,21 @@ void
*/
bool
SPMCQ_NeedWorker(void)
SPMCQ_NeedWorker(int running)
{
if (nworkers == 0)
if (running == 0)
return false;
return spmcq_len() > spmcq_wrk_len_ratio(nworkers - spmcq_datawaiter);
return spmcq_len() > spmcq_wrk_len_ratio(running - spmcq_datawaiter,
running);
}
bool
SPMCQ_StopWorker(void)
SPMCQ_StopWorker(int running)
{
if (nworkers == 0)
if (running == 0)
return false;
return spmcq_len() < spmcq_wrk_len_ratio(nworkers - spmcq_datawaiter - 1);
return spmcq_len() < spmcq_wrk_len_ratio(running - spmcq_datawaiter - 1,
running);
}
#ifdef TEST_DRIVER
......
......@@ -22,13 +22,7 @@ test_data_SOURCES = \
test_data_LDADD = \
$(VARNISHSRC)/lib/libvarnish/libvarnish.la \
../worker.$(OBJEXT) \
../monitor.$(OBJEXT) \
../spmcq.$(OBJEXT) \
../mq.$(OBJEXT) \
../amq.$(OBJEXT) \
../data.$(OBJEXT) \
@AMQ_LIBS@
../data.$(OBJEXT)
test_mq_SOURCES = \
minunit.h \
......@@ -48,13 +42,7 @@ test_spmcq_SOURCES = \
test_spmcq_LDADD = \
$(VARNISHSRC)/lib/libvarnish/libvarnish.la \
../worker.$(OBJEXT) \
../monitor.$(OBJEXT) \
../mq.$(OBJEXT) \
../amq.$(OBJEXT) \
../data.$(OBJEXT) \
../spmcq.$(OBJEXT) \
@AMQ_LIBS@
../spmcq.$(OBJEXT)
test_spmcq_len_SOURCES = \
$(VARNISHSRC)/lib/libvarnish/libvarnish.la \
......@@ -62,19 +50,14 @@ test_spmcq_len_SOURCES = \
../trackrdrd.h
test_spmcq_len_LDADD = \
$(VARNISHSRC)/lib/libvarnish/libvarnish.la \
../worker.$(OBJEXT) \
../monitor.$(OBJEXT) \
../mq.$(OBJEXT) \
../amq.$(OBJEXT) \
../data.$(OBJEXT) \
@AMQ_LIBS@
$(VARNISHSRC)/lib/libvarnish/libvarnish.la
test_spmcq_len_CFLAGS = -DTEST_DRIVER
test_worker_SOURCES = \
minunit.h \
test_worker.c \
../monitor.c \
../trackrdrd.h
test_worker_LDADD = \
......@@ -85,6 +68,6 @@ test_worker_LDADD = \
../spmcq.$(OBJEXT) \
../amq.$(OBJEXT) \
../data.$(OBJEXT) \
../monitor.$(OBJEXT) \
../hash.$(OBJEXT) \
@AMQ_LIBS@
test_worker_CFLAGS = -DTEST_DRIVER
......@@ -22,7 +22,7 @@ CMD="../trackrdrd -D -f varnish.binlog -l - -d -c test.conf"
# the user running it
CKSUM=$( $CMD | sed -e 's/\(initializing\) \(.*\)/\1/' | sed -e 's/\(Running as\) \([a-zA-Z0-9]*\)$/\1/' | grep -v 'Not running as root' | cksum)
if [ "$CKSUM" != '2982448694 234027' ]; then
if [ "$CKSUM" != '1346049411 234027' ]; then
echo "ERROR: Regression test incorrect cksum: $CKSUM"
exit 1
fi
......
......@@ -74,7 +74,6 @@ static char
mu_assert(errmsg, err == 0);
AZ(LOG_Open("test_worker"));
AZ(HASH_Init());
AZ(DATA_Init());
AZ(SPMCQ_Init());
......@@ -85,7 +84,6 @@ static char
*test_worker_run(void)
{
dataentry *entry;
hashentry *he;
printf("... testing run of %d workers\n", NWORKERS);
......@@ -93,17 +91,14 @@ static char
unsigned xid = (unsigned int) lrand48();
WRK_Start();
DATA_noMT_Register();
for (int i = 0; i < 1024; i++) {
entry = DATA_noMT_Get();
entry = &dtbl.entry[i];
CHECK_OBJ_NOTNULL(entry, DATA_MAGIC);
he = HASH_Insert(++xid, entry, TIM_mono());
CHECK_OBJ_NOTNULL(he, HASH_MAGIC);
entry->xid = xid;
sprintf(entry->data, "XID=%d&foo=bar&baz=quux&record=%d", xid, i+1);
entry->end = strlen(entry->data);
entry->state = DATA_DONE;
HASH_Submit(he);
assert(SPMCQ_Enq(entry));
}
WRK_Halt();
......
This diff is collapsed.
......@@ -76,8 +76,8 @@ spmcq_t spmcq;
int SPMCQ_Init(void);
bool SPMCQ_Enq(void *ptr);
void *SPMCQ_Deq(void);
bool SPMCQ_NeedWorker(void);
bool SPMCQ_StopWorker(void);
bool SPMCQ_NeedWorker(int running);
bool SPMCQ_StopWorker(int running);
#define spmcq_wait(what) \
do { \
......@@ -202,21 +202,14 @@ datatable dtbl;
int DATA_Init(void);
void DATA_Take_Freelist(struct freehead_s *dst);
void DATA_Return_Freelist(struct freehead_s *returned, unsigned nreturned);
/*
* the noMT functions are _not_ MT-safe, so they can only be called
* from the registered thread
*/
void DATA_noMT_Register(void);
dataentry *DATA_noMT_Get(void);
void DATA_noMT_Free(dataentry *de);
void DATA_noMT_Submit(dataentry *de);
void DATA_Dump1(dataentry *entry, int i);
void DATA_Dump(void);
/* hash.c */
/* trackrdrd.c */
void HASH_Stats(void);
#if 0
typedef enum {
HASH_EMPTY = 0,
/* OPEN when the main thread is filling data, ReqEnd not yet seen. */
......@@ -290,7 +283,7 @@ hashentry *HASH_Insert(const unsigned xid, dataentry *de, const float t);
hashentry *HASH_Find(unsigned xid);
void HASH_Dump1(hashentry *entry, int i);
void HASH_Dump(void);
#endif
/* config.c */
#define EMPTY(s) (s[0] == '\0')
......
......@@ -39,7 +39,7 @@
#include "vas.h"
#include "miniobj.h"
int nworkers = 0;
static int running = 0;
typedef enum {
WRK_NOTSTARTED = 0,
......@@ -88,6 +88,8 @@ typedef struct {
static unsigned run, cleaned = 0;
static thread_data_t *thread_data;
static pthread_mutex_t running_lock;
static inline void
wrk_send(void *amq_worker, dataentry *entry, worker_data_t *wrk)
{
......@@ -152,6 +154,9 @@ static void
wrk->wrk_nfree = 0;
wrk->state = WRK_RUNNING;
AZ(pthread_mutex_lock(&running_lock));
running++;
AZ(pthread_mutex_unlock(&running_lock));
while (run) {
entry = (dataentry *) SPMCQ_Deq();
......@@ -160,7 +165,7 @@ static void
wrk_send(amq_worker, entry, wrk);
/* should we go to sleep ? */
if (SPMCQ_StopWorker())
if (SPMCQ_StopWorker(running))
goto sleep;
continue;
......@@ -188,7 +193,7 @@ static void
*
* also re-check the stop condition under the lock
*/
if (run && ((! entry) || SPMCQ_StopWorker())) {
if (run && ((! entry) || SPMCQ_StopWorker(running))) {
wrk->waits++;
spmcq_datawaiter++;
wrk->state = WRK_WAITING;
......@@ -305,22 +310,7 @@ WRK_Stats(void)
int
WRK_Running(void)
{
worker_data_t *wrk;
while (1) {
int initialized = 0, running = 0;
for (int i = 0; i < config.nworkers; i++) {
wrk = thread_data[i].wrk_data;
if (wrk->state > WRK_INITIALIZING)
initialized++;
if (wrk->state == WRK_RUNNING ||
wrk->state == WRK_SHUTTINGDOWN ||
wrk->state == WRK_WAITING)
running++;
}
if (initialized == config.nworkers)
return running;
}
return running;
}
void
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment