Commit ff6f8f0f authored by Geoff Simmons's avatar Geoff Simmons

trackrdrd: - all data & functions exclusive to VSL reader are now

	static in trackrdrd.c (part of data.c and all of hash.c)
	- replaced the global nworkers with WRK_Running(), since
	nworkers caused too many dependencies (esp. for unit tests)
parent 6c3ae785
......@@ -125,114 +125,6 @@ DATA_Return_Freelist(struct freehead_s *returned, unsigned nreturned)
AZ(pthread_mutex_unlock(&dtbl.freelist_lock));
}
/* ------------------------------------------------------------ */
/* noMT Functions to be used by one thread (the VSL reader) only */
/* ------------------------------------------------------------ */
static struct freehead_s data_noMT_freelist =
VSTAILQ_HEAD_INITIALIZER(data_noMT_freelist);
static pthread_t data_noMT_threadid = 0;
#if defined(WITHOUT_EXPENSIVE_ASSERTS) || defined(WITHOUT_ASSERTS)
#define DATA_noMT_check_thread() do {} while(0)
#else
#define DATA_noMT_check_thread() \
assert(data_noMT_threadid == pthread_self());
#endif
/* the one thread has to register */
void
DATA_noMT_Register(void)
{
AZ(data_noMT_threadid);
data_noMT_threadid = pthread_self();
}
/* efficiently retrieve a single data entry */
dataentry
*DATA_noMT_Get(void)
{
dataentry *data;
DATA_noMT_check_thread();
take:
data = VSTAILQ_FIRST(&data_noMT_freelist);
if (data) {
VSTAILQ_REMOVE_HEAD(&data_noMT_freelist, freelist);
} else {
assert(VSTAILQ_EMPTY(&data_noMT_freelist));
while (dtbl.nfree == 0) {
dtbl.w_stats.wait_room++;
spmcq_wait(room);
}
DATA_Take_Freelist(&data_noMT_freelist);
assert(! VSTAILQ_EMPTY(&data_noMT_freelist));
goto take;
}
assert(data->state == DATA_EMPTY);
return (data);
}
/* return to our own local cache */
static inline void
data_nomt_free(dataentry *de)
{
DATA_noMT_check_thread();
assert(de->state == DATA_EMPTY);
VSTAILQ_INSERT_HEAD(&data_noMT_freelist, de, freelist);
}
void
DATA_noMT_Free(dataentry *de)
{
data_nomt_free(de);
}
void
DATA_noMT_Submit(dataentry *de)
{
DATA_noMT_check_thread();
CHECK_OBJ_NOTNULL(de, DATA_MAGIC);
assert(de->state == DATA_DONE);
LOG_Log(LOG_DEBUG, "submit: data=[%.*s]", de->end, de->data);
if (de->hasdata == false) {
de->state = DATA_EMPTY;
MON_StatsUpdate(STATS_NODATA);
data_nomt_free(de);
return;
}
while (!SPMCQ_Enq((void *) de)) {
dtbl.w_stats.wait_qfull++;
LOG_Log(LOG_ALERT, "%s", "Internal queue full, waiting for dequeue");
spmcq_wait(room);
}
dtbl.w_stats.submitted++;
/* should we wake up another worker? */
if (SPMCQ_NeedWorker())
spmcq_signal(data);
/*
* base case: wake up a worker if all are sleeping
*
* this is an un-synced access to spmcq_data_waiter, but
* if we don't wake them up now, we will next time around
*/
if (nworkers == spmcq_datawaiter)
spmcq_signal(data);
}
void
DATA_Dump1(dataentry *entry, int i)
{
......
......@@ -29,7 +29,7 @@
* SUCH DAMAGE.
*
*/
#if 0
#include <stdlib.h>
#include <syslog.h>
#include <string.h>
......@@ -343,3 +343,4 @@ HASH_Dump(void)
for (int i = 0; i < htbl.len; i++)
HASH_Dump1(&htbl.entry[i], i);
}
#endif
......@@ -43,41 +43,12 @@ static int run;
static void
log_output(void)
{
LOG_Log(LOG_INFO,
"Hash table: "
"len=%u "
"seen=%u "
"drop_reqstart=%u "
"drop_vcl_log=%u "
"drop_reqend=%u "
"expired=%u "
"evacuated=%u "
"open=%u "
"load=%.2f "
"collisions=%u "
"insert_probes=%u "
"find_probes=%u "
"fail=%u "
"occ_hi=%u "
"occ_hi_this=%u ",
htbl.len,
htbl.seen,
htbl.drop_reqstart,
htbl.drop_vcl_log,
htbl.drop_reqend,
htbl.expired,
htbl.evacuated,
htbl.open,
100.0 * htbl.open / htbl.len,
htbl.collisions,
htbl.insert_probes,
htbl.find_probes,
htbl.fail,
htbl.occ_hi,
htbl.occ_hi_this);
htbl.occ_hi_this = 0;
/* Eliminate the dependency of trackrdrd.o for unit tests */
#ifndef TEST_DRIVER
HASH_Stats();
#endif
LOG_Log(LOG_INFO,
"Data table writer: "
"len=%u "
......
......@@ -78,9 +78,9 @@ spmcq_cleanup(void)
}
static inline int
spmcq_wrk_len_ratio(int working)
spmcq_wrk_len_ratio(int working, int running)
{
return working * qlen_goal / nworkers;
return working * qlen_goal / running;
}
int
......@@ -148,19 +148,21 @@ void
*/
bool
SPMCQ_NeedWorker(void)
SPMCQ_NeedWorker(int running)
{
if (nworkers == 0)
if (running == 0)
return false;
return spmcq_len() > spmcq_wrk_len_ratio(nworkers - spmcq_datawaiter);
return spmcq_len() > spmcq_wrk_len_ratio(running - spmcq_datawaiter,
running);
}
bool
SPMCQ_StopWorker(void)
SPMCQ_StopWorker(int running)
{
if (nworkers == 0)
if (running == 0)
return false;
return spmcq_len() < spmcq_wrk_len_ratio(nworkers - spmcq_datawaiter - 1);
return spmcq_len() < spmcq_wrk_len_ratio(running - spmcq_datawaiter - 1,
running);
}
#ifdef TEST_DRIVER
......
......@@ -22,13 +22,7 @@ test_data_SOURCES = \
test_data_LDADD = \
$(VARNISHSRC)/lib/libvarnish/libvarnish.la \
../worker.$(OBJEXT) \
../monitor.$(OBJEXT) \
../spmcq.$(OBJEXT) \
../mq.$(OBJEXT) \
../amq.$(OBJEXT) \
../data.$(OBJEXT) \
@AMQ_LIBS@
../data.$(OBJEXT)
test_mq_SOURCES = \
minunit.h \
......@@ -48,13 +42,7 @@ test_spmcq_SOURCES = \
test_spmcq_LDADD = \
$(VARNISHSRC)/lib/libvarnish/libvarnish.la \
../worker.$(OBJEXT) \
../monitor.$(OBJEXT) \
../mq.$(OBJEXT) \
../amq.$(OBJEXT) \
../data.$(OBJEXT) \
../spmcq.$(OBJEXT) \
@AMQ_LIBS@
../spmcq.$(OBJEXT)
test_spmcq_len_SOURCES = \
$(VARNISHSRC)/lib/libvarnish/libvarnish.la \
......@@ -62,19 +50,14 @@ test_spmcq_len_SOURCES = \
../trackrdrd.h
test_spmcq_len_LDADD = \
$(VARNISHSRC)/lib/libvarnish/libvarnish.la \
../worker.$(OBJEXT) \
../monitor.$(OBJEXT) \
../mq.$(OBJEXT) \
../amq.$(OBJEXT) \
../data.$(OBJEXT) \
@AMQ_LIBS@
$(VARNISHSRC)/lib/libvarnish/libvarnish.la
test_spmcq_len_CFLAGS = -DTEST_DRIVER
test_worker_SOURCES = \
minunit.h \
test_worker.c \
../monitor.c \
../trackrdrd.h
test_worker_LDADD = \
......@@ -85,6 +68,6 @@ test_worker_LDADD = \
../spmcq.$(OBJEXT) \
../amq.$(OBJEXT) \
../data.$(OBJEXT) \
../monitor.$(OBJEXT) \
../hash.$(OBJEXT) \
@AMQ_LIBS@
test_worker_CFLAGS = -DTEST_DRIVER
......@@ -22,7 +22,7 @@ CMD="../trackrdrd -D -f varnish.binlog -l - -d -c test.conf"
# the user running it
CKSUM=$( $CMD | sed -e 's/\(initializing\) \(.*\)/\1/' | sed -e 's/\(Running as\) \([a-zA-Z0-9]*\)$/\1/' | grep -v 'Not running as root' | cksum)
if [ "$CKSUM" != '2982448694 234027' ]; then
if [ "$CKSUM" != '1346049411 234027' ]; then
echo "ERROR: Regression test incorrect cksum: $CKSUM"
exit 1
fi
......
......@@ -74,7 +74,6 @@ static char
mu_assert(errmsg, err == 0);
AZ(LOG_Open("test_worker"));
AZ(HASH_Init());
AZ(DATA_Init());
AZ(SPMCQ_Init());
......@@ -85,7 +84,6 @@ static char
*test_worker_run(void)
{
dataentry *entry;
hashentry *he;
printf("... testing run of %d workers\n", NWORKERS);
......@@ -93,17 +91,14 @@ static char
unsigned xid = (unsigned int) lrand48();
WRK_Start();
DATA_noMT_Register();
for (int i = 0; i < 1024; i++) {
entry = DATA_noMT_Get();
entry = &dtbl.entry[i];
CHECK_OBJ_NOTNULL(entry, DATA_MAGIC);
he = HASH_Insert(++xid, entry, TIM_mono());
CHECK_OBJ_NOTNULL(he, HASH_MAGIC);
entry->xid = xid;
sprintf(entry->data, "XID=%d&foo=bar&baz=quux&record=%d", xid, i+1);
entry->end = strlen(entry->data);
entry->state = DATA_DONE;
HASH_Submit(he);
assert(SPMCQ_Enq(entry));
}
WRK_Halt();
......
......@@ -52,6 +52,7 @@
#include <sys/wait.h>
#include <sys/types.h>
#include <pwd.h>
#include <limits.h>
#ifndef HAVE_EXECINFO_H
#include "compat/execinfo.h"
......@@ -103,31 +104,105 @@ static struct sigaction terminate_action, dump_action, ignore_action,
static char cli_config_filename[BUFSIZ] = "";
static int wrk_running = 0;
/* Local freelist */
static struct freehead_s reader_freelist =
VSTAILQ_HEAD_INITIALIZER(reader_freelist);
typedef enum {
HASH_EMPTY = 0,
/* OPEN when the main thread is filling data, ReqEnd not yet seen. */
HASH_OPEN
/* hashes become HASH_EMPTY for DATA_DONE */
} hash_state_e;
struct hashentry_s {
unsigned magic;
#define HASH_MAGIC 0xf8e12130
/* set in HASH_Insert */
hash_state_e state;
unsigned xid; /* == de->xid */
float insert_time;
VTAILQ_ENTRY(hashentry_s) insert_list;
dataentry *de;
};
typedef struct hashentry_s hashentry;
VTAILQ_HEAD(insert_head_s, hashentry_s);
struct hashtable_s {
unsigned magic;
#define HASHTABLE_MAGIC 0x89ea1d00
unsigned len;
hashentry *entry;
struct insert_head_s insert_head;
/* config */
unsigned max_probes;
float ttl; /* max age for a record */
float mlt; /* min life time */
/* == stats == */
unsigned seen; /* Records (ReqStarts) seen */
/*
* records we have dropped because of no hash, no data
* or no entry
*/
unsigned drop_reqstart;
unsigned drop_vcl_log;
unsigned drop_reqend;
unsigned expired;
unsigned evacuated;
unsigned open;
unsigned collisions;
unsigned insert_probes;
unsigned find_probes;
unsigned fail; /* failed to get record - no space */
unsigned occ_hi; /* Occupancy high water mark */
unsigned occ_hi_this; /* Occupancy high water mark this reporting
interval*/
};
typedef struct hashtable_s hashtable;
static hashtable htbl;
#ifdef WITHOUT_ASSERTS
#define entry_assert(e, cond) do { (void)(e);(void)(cond);} while(0)
#else /* WITH_ASSERTS */
#define entry_assert(e, cond) \
do { \
if (!(cond)) \
entry_assert_failure(__func__, __FILE__, __LINE__, #cond, (e), errno, 0); \
} while (0)
do { \
if (!(cond)) \
entry_assert_failure(__func__, __FILE__, __LINE__, #cond, (e), \
errno, 0); \
} while (0)
static void assert_failure(const char *func, const char *file, int line, const char *cond,
int err, int xxx);
static void assert_failure(const char *func, const char *file, int line,
const char *cond, int err, int xxx);
static void
entry_assert_failure(const char *func, const char *file, int line, const char *cond,
hashentry *he, int err, int xxx)
{
dataentry *de = he->de;
LOG_Log(LOG_ALERT, "Hashentry %p magic %0x state %u xid %u insert_time %f de %p",
(he), (he)->magic, (he)->state, (he)->xid, (he)->insert_time, (he)->de);
if (de)
LOG_Log(LOG_ALERT, "Dataentry %p magic %0x state %u xid %u tid %u end %u",
(de), (de)->magic, (de)->state, (de)->xid, (de)->tid, (de)->end);
else
LOG_Log(LOG_ALERT, "Dataentry %p NULL!", (de));
assert_failure(func, file, line, cond, err, xxx);
entry_assert_failure(const char *func, const char *file, int line,
const char *cond, hashentry *he, int err, int xxx)
{
dataentry *de = he->de;
LOG_Log(LOG_ALERT,
"Hashentry %p magic %0x state %u xid %u insert_time %f de %p",
(he), (he)->magic, (he)->state, (he)->xid, (he)->insert_time, (he)->de);
if (de)
LOG_Log(LOG_ALERT,
"Dataentry %p magic %0x state %u xid %u tid %u end %u",
(de), (de)->magic, (de)->state, (de)->xid, (de)->tid, (de)->end);
else
LOG_Log(LOG_ALERT, "Dataentry %p NULL!", (de));
assert_failure(func, file, line, cond, err, xxx);
}
#endif
......@@ -149,16 +224,16 @@ assert_failure(const char *func, const char *file, int line, const char *cond,
static inline void
check_entry(hashentry *he, unsigned xid, unsigned tid)
{
dataentry *de;
CHECK_OBJ_NOTNULL(he, HASH_MAGIC);
entry_assert(he, he->xid == xid);
entry_assert(he, he->state == HASH_OPEN);
de = he->de;
entry_assert(he, de != NULL);
entry_assert(he, de->magic == DATA_MAGIC);
entry_assert(he, de->xid == xid);
entry_assert(he, de->tid == tid);
dataentry *de;
CHECK_OBJ_NOTNULL(he, HASH_MAGIC);
entry_assert(he, he->xid == xid);
entry_assert(he, he->state == HASH_OPEN);
de = he->de;
entry_assert(he, de != NULL);
entry_assert(he, de->magic == DATA_MAGIC);
entry_assert(he, de->xid == xid);
entry_assert(he, de->tid == tid);
}
static void
......@@ -198,18 +273,417 @@ stacktrace_abort(int sig)
/*--------------------------------------------------------------------*/
/* efficiently retrieve a single data entry */
static inline dataentry
*data_get(void)
{
dataentry *data;
while (VSTAILQ_EMPTY(&reader_freelist)) {
while (dtbl.nfree == 0) {
dtbl.w_stats.wait_room++;
spmcq_wait(room);
}
DATA_Take_Freelist(&reader_freelist);
}
data = VSTAILQ_FIRST(&reader_freelist);
VSTAILQ_REMOVE_HEAD(&reader_freelist, freelist);
assert(data->state == DATA_EMPTY);
return (data);
}
/* return to our own local cache */
static inline void
data_free(dataentry *de)
{
assert(de->state == DATA_EMPTY);
VSTAILQ_INSERT_HEAD(&reader_freelist, de, freelist);
}
static inline void
data_submit(dataentry *de)
{
CHECK_OBJ_NOTNULL(de, DATA_MAGIC);
assert(de->state == DATA_DONE);
LOG_Log(LOG_DEBUG, "submit: data=[%.*s]", de->end, de->data);
if (de->hasdata == false) {
de->state = DATA_EMPTY;
MON_StatsUpdate(STATS_NODATA);
data_free(de);
return;
}
while (!SPMCQ_Enq((void *) de)) {
dtbl.w_stats.wait_qfull++;
LOG_Log(LOG_ALERT, "%s", "Internal queue full, waiting for dequeue");
spmcq_wait(room);
}
dtbl.w_stats.submitted++;
/* should we wake up another worker? */
if (SPMCQ_NeedWorker(wrk_running))
spmcq_signal(data);
/*
* base case: wake up a worker if all are sleeping
*
* this is an un-synced access to spmcq_data_waiter, but
* if we don't wake them up now, we will next time around
*/
if (wrk_running == spmcq_datawaiter)
spmcq_signal(data);
}
/*--------------------------------------------------------------------*/
#define INDEX(u) ((u) & (htbl.len - 1))
/*
* N.B.: Hash functions defined for XIDs, which are declared in Varnish as
* unsigned int, assuming that they are 32 bit.
*/
#if UINT_MAX != UINT32_MAX
#error "Unsigned ints are not 32 bit"
#endif
#define rotr(v,n) (((v) >> (n)) | ((v) << (32 - (n))))
#define USE_JENKMULVEY1
#define h1(k) jenkmulvey1(k)
#define h2(k) wang(k)
#ifdef USE_JENKMULVEY1
/*
* http://home.comcast.net/~bretm/hash/3.html
* Bret Mulvey ascribes this to Bob Jenkins, but I can't find any
* reference to it by Jenkins himself.
*/
static uint32_t
jenkmulvey1(uint32_t n)
{
n += (n << 12);
n ^= (n >> 22);
n += (n << 4);
n ^= (n >> 9);
n += (n << 10);
n ^= (n >> 2);
n += (n << 7);
n ^= (n >> 12);
return(n);
}
#endif
#ifdef USE_JENKMULVEY2
/*
* http://home.comcast.net/~bretm/hash/4.html
* Mulvey's modification of the (alleged) Jenkins algorithm
*/
static uint32_t
jenkmulvey2(uint32_t n)
{
n += (n << 16);
n ^= (n >> 13);
n += (n << 4);
n ^= (n >> 7);
n += (n << 10);
n ^= (n >> 5);
n += (n << 8);
n ^= (n >> 16);
return(n);
}
#endif
/*
* http://www.cris.com/~Ttwang/tech/inthash.htm
*/
static uint32_t
wang(uint32_t n)
{
n = ~n + (n << 15); // n = (n << 15) - n - 1;
n ^= rotr(n,12);
n += (n << 2);
n ^= rotr(n,4);
n = (n + (n << 3)) + (n << 11);
n ^= rotr(n,16);
return n;
}
void
HASH_Stats(void)
{
LOG_Log(LOG_INFO,
"Hash table: "
"len=%u "
"seen=%u "
"drop_reqstart=%u "
"drop_vcl_log=%u "
"drop_reqend=%u "
"expired=%u "
"evacuated=%u "
"open=%u "
"load=%.2f "
"collisions=%u "
"insert_probes=%u "
"find_probes=%u "
"fail=%u "
"occ_hi=%u "
"occ_hi_this=%u ",
htbl.len,
htbl.seen,
htbl.drop_reqstart,
htbl.drop_vcl_log,
htbl.drop_reqend,
htbl.expired,
htbl.evacuated,
htbl.open,
100.0 * htbl.open / htbl.len,
htbl.collisions,
htbl.insert_probes,
htbl.find_probes,
htbl.fail,
htbl.occ_hi,
htbl.occ_hi_this);
htbl.occ_hi_this = 0;
}
static void
hash_cleanup(void)
{
free(htbl.entry);
}
static int
hash_init(void)
{
hashentry *entryptr;
int entries = 1 << config.maxopen_scale;
entryptr = (hashentry *) calloc(entries, sizeof(hashentry));
if (entryptr == NULL)
return(errno);
memset(&htbl, 0, sizeof(hashtable));
htbl.magic = HASHTABLE_MAGIC;
htbl.len = entries;
htbl.entry = entryptr;
VTAILQ_INIT(&htbl.insert_head);
htbl.max_probes = config.hash_max_probes;
htbl.ttl = config.hash_ttl;
htbl.mlt = config.hash_mlt;
/* entries init */
for (int i = 0; i < entries; i++) {
htbl.entry[i].magic = HASH_MAGIC;
htbl.entry[i].state = HASH_EMPTY;
}
atexit(hash_cleanup);
return(0);
}
static inline void
hash_free(hashentry *he)
{
VTAILQ_REMOVE(&htbl.insert_head, he, insert_list);
he->state = HASH_EMPTY;
he->de = NULL;
htbl.open--;
}
static inline void
hash_submit(hashentry *he)
{
dataentry *de = he->de;
assert(he->xid == de->xid);
data_submit(de);
}
static inline void
incomplete(hashentry *he)
{
dataentry *de;
de = he->de;
CHECK_OBJ_NOTNULL(de, DATA_MAGIC);
de->incomplete = true;
MON_StatsUpdate(STATS_DONE);
de->state = DATA_DONE;
}
static void
hash_exp(float limit)
{
hashentry *he;
float p_insert_time = 0.0;
while ((he = VTAILQ_FIRST(&htbl.insert_head))) {
CHECK_OBJ_NOTNULL(he, HASH_MAGIC);
assert(he->state == HASH_OPEN);
if (he->insert_time > limit)
return;
assert(p_insert_time <= he->insert_time);
p_insert_time = he->insert_time;
LOG_Log(LOG_DEBUG, "expire: hash=%u insert_time=%f limit=%f",
he->xid, he->insert_time, limit);
htbl.expired++;
incomplete(he);
hash_submit(he);
hash_free(he);
}
}
static inline void
submit(hashentry *he)
{
CHECK_OBJ_NOTNULL(he, HASH_MAGIC);
assert(he->state == HASH_OPEN);
LOG_Log(LOG_DEBUG, "submit: hash=%u", he->xid);
hash_submit(he);
hash_free(he);
}
/* like Submit, but for recrods in HASH_OPEN */
static void
hash_evacuate(hashentry *he)
{
CHECK_OBJ_NOTNULL(he, HASH_MAGIC);
assert(he->state == HASH_OPEN);
LOG_Log(LOG_DEBUG, "evacuate: hash=%u insert_time=%f",
he->xid, he->insert_time);
htbl.evacuated++;
incomplete(he);
hash_submit(he);
hash_free(he);
}
static hashentry
*hash_insert(const unsigned xid, dataentry *de, const float t)
{
hashentry *he, *oldest;
unsigned probes = 0;
uint32_t h = h1(xid);
const uint32_t h2 = h2(xid);
he = &htbl.entry[INDEX(h)];
if (he->state == HASH_EMPTY)
goto ok;
htbl.collisions++;
oldest = he;
do {
h += h2;
he = &htbl.entry[INDEX(h)];
probes++;
if (he->state == HASH_EMPTY)
goto ok;
if (he->insert_time < oldest->insert_time)
oldest = he;
} while (probes <= htbl.max_probes);
/* none eligable for evacuation */
if ((oldest->insert_time + htbl.mlt) > t) {
htbl.fail++;
htbl.insert_probes += probes;
return (NULL);
}
hash_evacuate(oldest);
he = oldest;
ok:
htbl.insert_probes += probes;
he->state = HASH_OPEN;
he->xid = xid;
he->insert_time = t;
VTAILQ_INSERT_TAIL(&htbl.insert_head, he, insert_list);
he->de = de;
/* stats */
htbl.open++;
if (htbl.open > htbl.occ_hi)
htbl.occ_hi = htbl.open;
if (htbl.open > htbl.occ_hi_this)
htbl.occ_hi_this = htbl.open;
return(he);
}
static hashentry
*hash_find(const unsigned xid)
{
hashentry *he;
unsigned probes = 0;
uint32_t h = h1(xid);
const uint32_t h2 = h2(xid);
he = &htbl.entry[INDEX(h)];
if (he->xid == xid)
return (he);
do {
h += h2;
he = &htbl.entry[INDEX(h)];
probes++;
if (he->xid == xid)
break;
} while (probes <= htbl.max_probes);
htbl.find_probes += probes;
if (probes > htbl.max_probes)
return NULL;
return (he);
}
static void
hash_dump1(hashentry *entry, int i)
{
if (entry->state == HASH_EMPTY)
return;
LOG_Log(LOG_INFO, "Hash entry %d: XID=%d",
i, entry->xid);
DATA_Dump1(entry->de, 0);
assert(entry->xid == entry->de->xid);
}
static void
hash_dump(void)
{
for (int i = 0; i < htbl.len; i++)
hash_dump1(&htbl.entry[i], i);
}
/*--------------------------------------------------------------------*/
static inline dataentry
*insert(unsigned xid, unsigned fd, float tim)
{
dataentry *de = DATA_noMT_Get();
hashentry *he = HASH_Insert(xid, de, tim);
dataentry *de = data_get();
hashentry *he = hash_insert(xid, de, tim);
if (! he) {
LOG_Log(LOG_WARNING, "Insert: Could not insert hash for XID %d",
xid);
DATA_noMT_Free(de);
return (NULL);
LOG_Log(LOG_WARNING, "Insert: Could not insert hash for XID %d",
xid);
data_free(de);
return (NULL);
}
/* he being filled out by Hash_Insert, we need to look after de */
......@@ -259,7 +733,7 @@ append(dataentry *entry, enum VSL_tag_e tag, unsigned xid, char *data,
* consequences:
* - hash lookups become inefficient
* - inserts become more likely to fail
* - before we had HASH_Exp, the hash would become useless
* - before we had hash_Exp, the hash would become useless
* - if the VSL wraps, we will see corrupt data
*
* so if we really cannot create an entry at ReqStart time, we need to thow
......@@ -293,9 +767,15 @@ OSL_Track(void *priv, enum VSL_tag_e tag, unsigned fd, unsigned len,
(void) priv;
(void) bitmap;
if (term && htbl.open == 0)
return 1;
if (wrk_running < config.nworkers) {
wrk_running = WRK_Running();
if (wrk_running < config.nworkers)
LOG_Log(LOG_ALERT, "%d of %d workers running", wrk_running,
config.nworkers);
}
/* spec != 'c' */
if ((spec & VSL_S_CLIENT) == 0)
......@@ -322,7 +802,7 @@ OSL_Track(void *priv, enum VSL_tag_e tag, unsigned fd, unsigned len,
/* configurable ? */
if ((tim - tim_exp_check) > 10) {
HASH_Exp(tim - htbl.ttl);
hash_exp(tim - htbl.ttl);
tim_exp_check = tim;
}
break;
......@@ -338,7 +818,7 @@ OSL_Track(void *priv, enum VSL_tag_e tag, unsigned fd, unsigned len,
LOG_Log(LOG_DEBUG, "%s: XID=%u, data=[%.*s]", VSL_tags[tag],
xid, datalen, data);
he = HASH_Find(xid);
he = hash_find(xid);
if (! he) {
LOG_Log(LOG_WARNING, "%s: XID %d not found",
VSL_tags[tag], xid);
......@@ -364,7 +844,7 @@ OSL_Track(void *priv, enum VSL_tag_e tag, unsigned fd, unsigned len,
xid_spread_sum += (last_end_xid - last_start_xid);
xid_spread_count++;
he = HASH_Find(xid);
he = hash_find(xid);
if (! he) {
LOG_Log(LOG_WARNING, "%s: XID %d not found",
VSL_tags[tag], xid);
......@@ -379,7 +859,7 @@ OSL_Track(void *priv, enum VSL_tag_e tag, unsigned fd, unsigned len,
append(de, tag, xid, reqend_str, strlen(reqend_str));
de->state = DATA_DONE;
MON_StatsUpdate(STATS_DONE);
HASH_Submit(he);
submit(he);
break;
default:
......@@ -420,7 +900,7 @@ static void
dump(int sig)
{
(void) sig;
HASH_Dump();
hash_dump();
}
static void
......@@ -630,7 +1110,7 @@ child_main(struct VSM_data *vd, int endless, int readconfig)
LOG_Log(LOG_ERR, "Cannot init data table: %s", strerror(errno));
exit(EXIT_FAILURE);
}
if (HASH_Init() != 0) {
if (hash_init() != 0) {
LOG_Log(LOG_ERR, "Cannot init hash table: %s", strerror(errno));
exit(EXIT_FAILURE);
}
......@@ -672,24 +1152,27 @@ child_main(struct VSM_data *vd, int endless, int readconfig)
exit(EXIT_FAILURE);
}
/* Start worker threads */
WRK_Start();
nworkers = WRK_Running();
LOG_Log0(LOG_INFO, "Worker threads initialized");
if (nworkers < config.nworkers) {
LOG_Log(LOG_WARNING, "%d of %d worker threads running", nworkers,
config.nworkers);
if (nworkers == 0) {
LOG_Log0(LOG_ALERT, "Worker process shutting down");
exit(EXIT_FAILURE);
if (config.nworkers > 0) {
WRK_Start();
/* XXX: wrk_wait & sleep interval configurable */
int wrk_wait = 0;
while ((wrk_running = WRK_Running()) == 0) {
if (wrk_wait++ > 10) {
LOG_Log0(LOG_ALERT,
"Worker threads not starting, shutting down");
exit(EXIT_FAILURE);
}
TIM_sleep(1);
}
LOG_Log(LOG_INFO, "%d worker threads running", wrk_running);
}
else
LOG_Log0(LOG_INFO, "Worker threads not running");
/* Main loop */
term = 0;
/* XXX: Varnish restart? */
/* XXX: TERM not noticed until request received */
DATA_noMT_Register();
while (VSL_Dispatch(vd, OSL_Track, NULL) > 0)
if (term || !endless)
break;
......
......@@ -76,8 +76,8 @@ spmcq_t spmcq;
int SPMCQ_Init(void);
bool SPMCQ_Enq(void *ptr);
void *SPMCQ_Deq(void);
bool SPMCQ_NeedWorker(void);
bool SPMCQ_StopWorker(void);
bool SPMCQ_NeedWorker(int running);
bool SPMCQ_StopWorker(int running);
#define spmcq_wait(what) \
do { \
......@@ -202,21 +202,14 @@ datatable dtbl;
int DATA_Init(void);
void DATA_Take_Freelist(struct freehead_s *dst);
void DATA_Return_Freelist(struct freehead_s *returned, unsigned nreturned);
/*
* the noMT functions are _not_ MT-safe, so they can only be called
* from the registered thread
*/
void DATA_noMT_Register(void);
dataentry *DATA_noMT_Get(void);
void DATA_noMT_Free(dataentry *de);
void DATA_noMT_Submit(dataentry *de);
void DATA_Dump1(dataentry *entry, int i);
void DATA_Dump(void);
/* hash.c */
/* trackrdrd.c */
void HASH_Stats(void);
#if 0
typedef enum {
HASH_EMPTY = 0,
/* OPEN when the main thread is filling data, ReqEnd not yet seen. */
......@@ -290,7 +283,7 @@ hashentry *HASH_Insert(const unsigned xid, dataentry *de, const float t);
hashentry *HASH_Find(unsigned xid);
void HASH_Dump1(hashentry *entry, int i);
void HASH_Dump(void);
#endif
/* config.c */
#define EMPTY(s) (s[0] == '\0')
......
......@@ -39,7 +39,7 @@
#include "vas.h"
#include "miniobj.h"
int nworkers = 0;
static int running = 0;
typedef enum {
WRK_NOTSTARTED = 0,
......@@ -88,6 +88,8 @@ typedef struct {
static unsigned run, cleaned = 0;
static thread_data_t *thread_data;
static pthread_mutex_t running_lock;
static inline void
wrk_send(void *amq_worker, dataentry *entry, worker_data_t *wrk)
{
......@@ -152,6 +154,9 @@ static void
wrk->wrk_nfree = 0;
wrk->state = WRK_RUNNING;
AZ(pthread_mutex_lock(&running_lock));
running++;
AZ(pthread_mutex_unlock(&running_lock));
while (run) {
entry = (dataentry *) SPMCQ_Deq();
......@@ -160,7 +165,7 @@ static void
wrk_send(amq_worker, entry, wrk);
/* should we go to sleep ? */
if (SPMCQ_StopWorker())
if (SPMCQ_StopWorker(running))
goto sleep;
continue;
......@@ -188,7 +193,7 @@ static void
*
* also re-check the stop condition under the lock
*/
if (run && ((! entry) || SPMCQ_StopWorker())) {
if (run && ((! entry) || SPMCQ_StopWorker(running))) {
wrk->waits++;
spmcq_datawaiter++;
wrk->state = WRK_WAITING;
......@@ -305,22 +310,7 @@ WRK_Stats(void)
int
WRK_Running(void)
{
worker_data_t *wrk;
while (1) {
int initialized = 0, running = 0;
for (int i = 0; i < config.nworkers; i++) {
wrk = thread_data[i].wrk_data;
if (wrk->state > WRK_INITIALIZING)
initialized++;
if (wrk->state == WRK_RUNNING ||
wrk->state == WRK_SHUTTINGDOWN ||
wrk->state == WRK_WAITING)
running++;
}
if (initialized == config.nworkers)
return running;
}
return running;
}
void
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment