Commit ac9eb9bc authored by Nils Goroll, committed by Geoff Simmons

Various performance and stability improvements, hash/data table separation

major changes
=============

hash/data table
---------------

The hash table is now only used for _OPEN records, and the actual data
is stored in a data table. Upon submit, hash entries are cleared and
data continues to live in the data table until it gets freed by a
worker (or upon submit if it is a NODATA record).
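
In code, the split boils down to this (a condensed sketch; the actual
declarations live in trackrdrd.h, with field names as used by hash.c
and data.c in this commit):

  typedef struct dataentry dataentry;  /* full payload record */

  typedef struct hashentry {
      unsigned    xid;              /* open XID                    */
      float       insert_time;      /* for TTL/evacuation checks   */
      dataentry  *de;               /* -> entry in the data table  */
  } hashentry;

On submit, only the hash entry is freed; the dataentry travels on
through the spmcq to a worker (or is freed immediately for NODATA
records).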

This drastically reduces the hash table load and significantly
improves worst-case performance. In particular, the hash table load
is now independent of ActiveMQ backend performance (read: stalls).

Preliminary recommendations for table sizing:

* hash table: double the max_sessions setting from Varnish

  e.g.

  maxopen.scale = 16

  for 64K hash table entries to support >32K sessions
  (safely and efficiently)

* data table: max(req/s) * max(ActiveMQ stall time)

  e.g. to survive 8000 req/s with a 60 second ActiveMQ stall, the
  data table needs >480K entries (8000 * 60 = 480K), so

  maxdone.scale = 19

  (= 512K entries) should be on the safe side, leaving some buffer
  for temporary load peaks
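
As a sanity check, the sizes follow directly from the scales (a
minimal standalone sketch, mirroring the 1 << scale computations in
HASH_Init() and DATA_Init()):

  #include <stdio.h>

  int
  main(void)
  {
      unsigned maxopen_scale = 16, maxdone_scale = 19;

      printf("hash entries:     %u\n", 1U << maxopen_scale); /* 65536  */
      printf("done entries:     %u\n", 1U << maxdone_scale); /* 524288 */
      printf("needed for stall: %u\n", 8000U * 60U);         /* 480000 */
      return (0);
  }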

hash table performance
----------------------

Previously, the maximum number of probes to the hash table was set to
the hash table size - which resulted in bad insert performance and
even worse lookup performance.

Now that the hash table is freed of _DONE records, we can remove this
burden and limit the maximum number of probes to a sensible value (10
to start with, configurable as hash_max_probes).

As another consequence, since we don't require 100% capacity of the
hash table, we don't need to run an exhaustive search upon insert.
Thus, probing has been changed from linear to double hashing (stepping
by h2()).
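
A minimal standalone sketch of the new bounded probe sequence (the
real code in hash.c uses jenkmulvey1() as h1() and wang() as h2();
the toy hash functions here are mere stand-ins):

  #include <stdint.h>
  #include <stdio.h>

  #define TBL_LEN    16U                /* power of two, as in htbl */
  #define MAX_PROBES 10U                /* cf. hash_max_probes      */
  #define INDEX(u)   ((u) & (TBL_LEN - 1))

  static uint32_t slot[TBL_LEN];        /* 0 = empty, else an xid   */

  static uint32_t h1(uint32_t k) { return (k * 2654435761U); }
  static uint32_t h2(uint32_t k) { return ((k << 1) | 1U); } /* odd step */

  static int
  insert(uint32_t xid)
  {
      uint32_t h = h1(xid), step = h2(xid);

      for (unsigned probes = 0; probes <= MAX_PROBES; probes++) {
          if (slot[INDEX(h)] == 0) {
              slot[INDEX(h)] = xid;
              return (1);
          }
          h += step;                    /* hashed, not linear, probing */
      }
      return (0);                       /* give up: the caller drops   */
  }

  int
  main(void)
  {
      printf("%d\n", insert(42));       /* 1: inserted within bound */
      return (0);
  }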

only ever insert on ReqStart - and drop if we can't
---------------------------------------------------

Keeping up with the VSL is essential. Once we fall behind, we are in
real trouble:

- If we miss ReqEnd, we will clobber our hash, with drastic effects:
  - hash lookups become inefficient
  - inserts become more likely to fail
  - before we had HASH_Exp (see below), the hash would become useless

- When the VSL writer overtakes our reader, we will see corrupt data
  and miss _many_ VCL Logs and ReqEnds (as many as can be found in the
  whole VSL), so, again, our hash and data arrays will get clobbered
  with incomplete data (which needs to be cleaned up by HASH_Exp).

The latter point is the most critical: corrupt records are likely to
trigger assertions.

Thus, keeping up with the VSL needs to be our primary objective. When
the VSL overtakes, we will lose a massive number of records anyway
(and we won't even know how many). As long as we don't stop Varnish
when we fall behind, we can't avoid losing records under certain
circumstances anyway (for instance, when the backend stalls and the
data table runs full), so we should rather drop early, in a controlled
manner - and without a drastic performance penalty.

Under this doctrine, it does not make sense to insert records for
VCL_Log or ReqEnd, so if an xid can't be found for these tags, the
respective events will get dropped (and logged).
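
A sketch of the resulting dispatch policy (hypothetical helper names,
not actual trackrdrd functions; the real logic lives in the VSL
reader thread):

  typedef enum { REQSTART, VCL_LOG, REQEND } tag_e;

  /* supplied elsewhere: insert/lookup returning 0 on failure */
  int  try_insert(unsigned xid);
  int  try_find(unsigned xid);
  void log_drop(const char *tag, unsigned xid);

  /* only ReqStart may allocate; everything else is find-or-drop */
  void
  dispatch(tag_e tag, unsigned xid)
  {
      switch (tag) {
      case REQSTART:
          if (!try_insert(xid))
              log_drop("ReqStart", xid);  /* drop early, controlled */
          break;
      case VCL_LOG:
      case REQEND:
          if (!try_find(xid))
              log_drop(tag == VCL_LOG ? "VCL_Log" : "ReqEnd", xid);
          /* else: append log data / submit, as before */
          break;
      }
  }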

performance optimizations
=========================

spmcq reader/writer synchronization
-----------------------------------

Various measures have been implemented to reduce syscall and general
function call overhead for reader/writer synchronization on the
spmcq. Previously, the writer would issue a pthread_cond_signal to
potentially wake up a reader, irrespective of whether or not a reader
was actually blocking on the CV.

- now, the number of waiting readers (workers) is modified inside a
  lock, but queried first from outside the lock, so if there are no
  readers waiting, the CV is not signalled (see the sketch after this
  list).

- The number of running readers is kept (on a best-effort basis)
  proportional to the queue length for queue lengths between 0 and
  2^qlen_goal.scale, to further reduce the number of worker thread
  block/wakeup transitions under low to average load.
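
The check-outside-the-lock pattern, condensed (illustrative names;
cf. spmcq_signal()/spmcq_wait() and spmcq_datawaiter in the real
code). The counter is only written under the mutex but read without
it, so the worst case is a deferred, not a lost, wakeup:

  #include <pthread.h>

  static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
  static pthread_cond_t  cond = PTHREAD_COND_INITIALIZER;
  static volatile unsigned waiters = 0;

  void
  reader_wait(void)
  {
      pthread_mutex_lock(&lock);
      waiters++;                  /* modified inside the lock */
      pthread_cond_wait(&cond, &lock);
      waiters--;
      pthread_mutex_unlock(&lock);
  }

  void
  writer_signal(void)
  {
      if (waiters == 0)           /* cheap unlocked check: no  */
          return;                 /* syscall if nobody waits   */
      pthread_mutex_lock(&lock);
      pthread_cond_signal(&cond);
      pthread_mutex_unlock(&lock);
  }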

pthread_mutex / pthread_condvar attributes
------------------------------------------

Attributes are now being used to allow the O/S implementation to
choose more efficient low-level synchronization primitives because we
know that we are using these only within one multi-threaded process.
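
The exact attributes are not shown in this excerpt; a plausible
sketch of what attr_lock/attr_cond (used in DATA_Init()/WRK_Init()
below) set up - PTHREAD_PROCESS_PRIVATE declares process-private
scope explicitly:

  #include <pthread.h>
  #include <assert.h>

  pthread_mutexattr_t attr_lock;
  pthread_condattr_t  attr_cond;

  void
  attrs_init(void)
  {
      assert(pthread_mutexattr_init(&attr_lock) == 0);
      assert(pthread_mutexattr_setpshared(&attr_lock,
          PTHREAD_PROCESS_PRIVATE) == 0);
      assert(pthread_condattr_init(&attr_cond) == 0);
      assert(pthread_condattr_setpshared(&attr_cond,
          PTHREAD_PROCESS_PRIVATE) == 0);
  }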

data table freelist
-------------------

To allow for efficient allocation of new data table entries, a free
list with local caches is maintained:

- The data writer (VSL reader thread) maintains its own freelist and
  serves requests from it without any synchronization overhead.

- Only when the data writer's own freelist is exhausted will it
  access the global freelist (under a lock). It will take the whole
  list at once and resume serving new records from its own cache.

- Workers also maintain their own freelist of entries to be returned
  to the global freelist as long as

  - they are running
  - there are entries on the global list.

  Before a worker thread goes to block on the spmcq condvar, it
  returns all its freelist entries to the global freelist. Also, it
  will always check if the global list is empty and return any entries
  immediately if it is.
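
Condensed, the VSL reader's allocation fast path looks like this
(abridged from DATA_noMT_Get() in data.c; VSTAILQ_* are the Varnish
queue macros):

  dataentry *de = VSTAILQ_FIRST(&data_noMT_freelist);   /* no lock */
  if (de == NULL) {
      /* local cache exhausted: block until the global list has
       * entries, then take the whole list at once */
      while (dtbl.nfree == 0)
          spmcq_wait(room);
      DATA_Take_Freelist(&data_noMT_freelist);
      de = VSTAILQ_FIRST(&data_noMT_freelist);
  }
  VSTAILQ_REMOVE_HEAD(&data_noMT_freelist, freelist);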

stability improvements
======================

record timeouts
---------------

Every hash entry gets added to the insert_list, ordered by insertion
time. No more often than every x seconds (currently hard-coded to
x=10; the check is only performed when a ReqStart is seen), the list
is checked for records which have reached their TTL (configured by
hash_ttl, default 120 seconds). These get submitted despite the fact
that no ReqEnd has been seen - under the assumption that no ReqEnd is
ever to be expected after a certain time has passed.
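
The check itself is cheap because insert_list is ordered (condensed
from HASH_Exp() in hash.c):

  while ((he = VTAILQ_FIRST(&htbl.insert_head)) != NULL) {
      if (he->insert_time > limit)
          return;             /* ordered list: the rest is younger */
      incomplete(he);         /* mark DATA_DONE despite no ReqEnd  */
      hash_submit(he);
      hash_free(he);
  }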

hash evacuation
---------------

If no free entry is found when probing all possible locations for an
insert, the oldest record is evacuated from the hash and submitted to
the backend, provided its lifetime has exceeded hash_mlt - under the
assumption that it is better to submit records early (which are likely
to carry useful log information already) than to throw records away.

If this behavior is not desired, hash_mlt can be set to hash_ttl.

various code changes
====================

* statistics have been reorganized to separate out
  - hash
  - data writer/VSL reader
  - data reader/worker (partially shared with writer)
  statistics

* print the native thread ID for workers (to allow correlation with
  prstat/top output)

* workers have a new state when blocking on the spmcq CV: WRK_WAITING
  / "waiting" in monitor output

* because falling behind with VSL reading (the VSL writer overtaking
  our reader) is so bad, notices are logged whenever the new VSL data
  pointer is less than the previous one - in other words, whenever the
  VSL ring buffer wraps.

  this is not the same as detecting the VSL writer overtaking our
  reader (which would require varnishapi changes), but logging and
  keeping some statistics about VSL wraps can (and did) help track
  down strange issues caused by VSL overtaking.

config file changes
===================

* The _scale options

  maxopen.scale
  maxdone.scale (new, see below)
  maxdata.scale

  are now being used directly, rather than in addition to a base value
  of 10 as before.

  10 is now the minimum value; an EINVAL error is thrown when a
  lower value is used in the config file.
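
For example, a config file fragment using the new direct-scale
semantics (all values shown are at or above the minimum of 10):

  maxopen.scale = 16
  maxdone.scale = 19
  maxdata.scale = 10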

new config options
==================

see trackrdrd.h for documentation in comments:

* maxdone.scale

  Scale for records in the _DONE states; determines the size of
  - the data table (which is maxopen + maxdone)
  - the spmcq

* qlen_goal.scale

  Scale for the spmcq queue length goal. All worker threads will be
  used when the queue length corresponding to the scale is reached.

  For shorter queue lengths, the number of worker threads is scaled
  proportionally.
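
  The predicate itself is not part of this excerpt; its intended
  shape is roughly the following (a hypothetical sketch, not the
  actual SPMCQ_need_worker() code, which takes these arguments):

    /* wake another worker while running readers lag behind the
     * queue-proportional target */
    static inline int
    need_worker(unsigned qlen, unsigned running, unsigned nworkers,
        unsigned qlen_goal)
    {
        return (running < (nworkers * qlen) / qlen_goal);
    }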

* hash_max_probes

  Maximum number of probes to the hash.

  Smaller values increase efficiency, but reduce the capacity of the
  hash (more ReqStart records may get lost) - and vice versa for
  higher values.

* hash_ttl

  Maximum time to live for records in the _OPEN state.

  Entries which are older than this ttl _may_ get expired from the
  trackrdrd state.

  This should get set to a value significantly longer than your
  maximum session lifetime in Varnish.

* hash_mlt

  Minimum lifetime for entries in HASH_OPEN before they can get
  evacuated.

  Entries are guaranteed to remain in trackrdrd for this duration.
  Once the mlt is reached, they _may_ get evacuated when trackrdrd
  needs space in the hash.
parent 9ad71611
......@@ -10,6 +10,7 @@ trackrdrd_SOURCES = \
parse.c \
log.c \
config.c \
hash.c \
data.c \
monitor.c \
mq.c \
......
......@@ -101,6 +101,19 @@ conf_getUnsignedInt(const char *rval, unsigned *i)
return(0); \
}
#define confUnsignedMinVal(name,fld,min) \
if (strcmp(lval, name) == 0) { \
unsigned int i; \
int err = conf_getUnsignedInt(rval, &i); \
if (err != 0) \
return err; \
if (i < min) \
return (EINVAL); \
config.fld = i; \
return(0); \
}
int
CONF_Add(const char *lval, const char *rval)
{
......@@ -114,8 +127,13 @@ CONF_Add(const char *lval, const char *rval)
confString("mq.uri", mq_uri);
confString("mq.qname", mq_qname);
confUnsigned("maxopen.scale", maxopen_scale);
confUnsigned("maxdata.scale", maxdata_scale);
confUnsignedMinVal("maxopen.scale", maxopen_scale, MIN_MAXOPEN_SCALE);
confUnsignedMinVal("maxdone.scale", maxdone_scale, MIN_MAXDONE_SCALE);
confUnsignedMinVal("maxdata.scale", maxdata_scale, MIN_MAXDATA_SCALE);
confUnsigned("qlen_goal.scale", qlen_goal_scale);
confUnsigned("hash_max_probes", hash_max_probes);
confUnsigned("hash_ttl", hash_ttl);
confUnsigned("hash_mlt", hash_mlt);
confUnsigned("nworkers", nworkers);
confUnsigned("restarts", restarts);
confUnsigned("monitor.interval", monitor_interval);
......@@ -200,8 +218,14 @@ CONF_Init(void)
config.monitor_interval = 30;
config.monitor_workers = false;
config.processor_log[0] = '\0';
config.maxopen_scale = 0;
config.maxdata_scale = 0;
config.maxopen_scale = MIN_MAXOPEN_SCALE;
config.maxdone_scale = MIN_MAXDONE_SCALE;
config.maxdata_scale = MIN_MAXDATA_SCALE;
config.qlen_goal_scale = DEF_QLEN_GOAL_SCALE;
config.hash_max_probes = DEF_HASH_MAX_PROBES;
config.hash_ttl = DEF_HASH_TTL;
config.hash_mlt = DEF_HASH_MTL;
config.mq_uri[0] = '\0';
config.mq_qname[0] = '\0';
config.nworkers = 1;
......@@ -283,6 +307,12 @@ CONF_Dump(void)
confdump("processor.log = %s", config.processor_log);
confdump("maxopen.scale = %u", config.maxopen_scale);
confdump("maxdata.scale = %u", config.maxdata_scale);
confdump("qlen_goal.scale = %u", config.qlen_goal_scale);
confdump("hash_max_probes", config.hash_max_probes);
confdump("hash_ttl", config.hash_ttl);
confdump("hash_mlt", config.hash_mlt);
confdump("mq.uri = %s", config.mq_uri);
confdump("mq.qname = %s", config.mq_qname);
confdump("nworkers = %u", config.nworkers);
......
......@@ -4,7 +4,8 @@
* All rights reserved
* Use only with permission
*
* Author: Geoffrey Simmons <geoffrey.simmons@uplex.de>
* Authors: Geoffrey Simmons <geoffrey.simmons@uplex.de>
* Nils Goroll <nils.goroll@uplex.de>
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
......@@ -30,97 +31,20 @@
*/
#include <stdlib.h>
#include <syslog.h>
#include <string.h>
#include <stdint.h>
#include <limits.h>
#include <stdbool.h>
#include "libvarnish.h"
#include "miniobj.h"
#include <syslog.h>
#include "trackrdrd.h"
#define MIN_DATA_SCALE 10
#define INDEX(u) ((u) & (tbl.len - 1))
#include "vas.h"
#include "miniobj.h"
static const char *statename[3] = { "EMPTY", "OPEN", "DONE" };
/*
* N.B.: Hash functions defined for XIDs, which are declared in Varnish as
* unsigned int, assuming that they are 32 bit.
*/
#if UINT_MAX != UINT32_MAX
#error "Unsigned ints are not 32 bit"
#endif
#define rotr(v,n) (((v) >> (n)) | ((v) << (32 - (n))))
#define USE_JENKMULVEY1
#define h1(k) jenkmulvey1(k)
#define h2(k) wang(k)
#ifdef USE_JENKMULVEY1
/*
* http://home.comcast.net/~bretm/hash/3.html
* Bret Mulvey ascribes this to Bob Jenkins, but I can't find any
* reference to it by Jenkins himself.
*/
static uint32_t
jenkmulvey1(uint32_t n)
{
n += (n << 12);
n ^= (n >> 22);
n += (n << 4);
n ^= (n >> 9);
n += (n << 10);
n ^= (n >> 2);
n += (n << 7);
n ^= (n >> 12);
return(n);
}
#endif
#ifdef USE_JENKMULVEY2
/*
* http://home.comcast.net/~bretm/hash/4.html
* Mulvey's modification of the (alleged) Jenkins algorithm
*/
static uint32_t
jenkmulvey2(uint32_t n)
{
n += (n << 16);
n ^= (n >> 13);
n += (n << 4);
n ^= (n >> 7);
n += (n << 10);
n ^= (n >> 5);
n += (n << 8);
n ^= (n >> 16);
return(n);
}
#endif
/*
* http://www.cris.com/~Ttwang/tech/inthash.htm
*/
static uint32_t
wang(uint32_t n)
{
n = ~n + (n << 15); // n = (n << 15) - n - 1;
n ^= rotr(n,12);
n += (n << 2);
n ^= rotr(n,4);
n = (n + (n << 3)) + (n << 11);
n ^= rotr(n,16);
return n;
}
static void
data_Cleanup(void)
{
free(tbl.entry);
free(tbl.buf);
free(dtbl.entry);
free(dtbl.buf);
AZ(pthread_mutex_destroy(&dtbl.freelist_lock));
}
int
......@@ -129,8 +53,13 @@ DATA_Init(void)
dataentry *entryptr;
char *bufptr;
int bufsize = 1 << (config.maxdata_scale + MIN_DATA_SCALE);
int entries = 1 << (config.maxopen_scale + MIN_TABLE_SCALE);
int bufsize = 1 << config.maxdata_scale;
/*
* we want enough space to accommodate all open and done records
*
*/
int entries = (1 << config.maxopen_scale) + (1 << config.maxdone_scale);
entryptr = (dataentry *) calloc(entries, sizeof(dataentry));
if (entryptr == NULL)
......@@ -142,69 +71,181 @@ DATA_Init(void)
return(errno);
}
datatable init_tbl =
{ .magic = DATATABLE_MAGIC, .len = entries, .collisions = 0,
.insert_probes = 0, .find_probes = 0, .seen = 0, .open = 0, .done = 0,
.nodata = 0, .len_overflows = 0, .data_overflows = 0, .submitted = 0,
.occ_hi = 0, .data_hi = 0, .entry = entryptr, .buf = bufptr };
memcpy(&tbl, &init_tbl, sizeof(datatable));
memset(&dtbl, 0, sizeof(datatable));
dtbl.magic = DATATABLE_MAGIC;
dtbl.len = entries;
VSTAILQ_INIT(&dtbl.freehead);
AZ(pthread_mutex_init(&dtbl.freelist_lock, &attr_lock));
dtbl.entry = entryptr;
dtbl.buf = bufptr;
dtbl.nfree = 0;
for (int i = 0; i < entries; i++) {
tbl.entry[i].magic = DATA_MAGIC;
tbl.entry[i].state = DATA_EMPTY;
tbl.entry[i].hasdata = false;
tbl.entry[i].data = &tbl.buf[i * bufsize];
dtbl.entry[i].magic = DATA_MAGIC;
dtbl.entry[i].state = DATA_EMPTY;
dtbl.entry[i].hasdata = false;
dtbl.entry[i].data = &dtbl.buf[i * bufsize];
VSTAILQ_INSERT_TAIL(&dtbl.freehead, &dtbl.entry[i], freelist);
dtbl.nfree++;
}
assert(dtbl.nfree == entries);
assert(VSTAILQ_FIRST(&dtbl.freehead));
atexit(data_Cleanup);
return(0);
}
/* XXX: set xid and DATA_OPEN in the entry */
dataentry
*DATA_Insert(unsigned xid)
/*
* take all free entries from the datatable for lockless
* allocation
*/
void
DATA_Take_Freelist(struct freehead_s *dst)
{
uint32_t h = h1(xid);
if (tbl.entry[INDEX(h)].state == DATA_EMPTY)
return(&tbl.entry[INDEX(h)]);
unsigned probes = 0;
tbl.collisions++;
h += h2(xid);
while (++probes <= tbl.len && tbl.entry[INDEX(h)].state != DATA_EMPTY)
h++;
tbl.insert_probes += probes;
if (probes > tbl.len) {
tbl.len_overflows++;
return(NULL);
}
return(&tbl.entry[INDEX(h)]);
AZ(pthread_mutex_lock(&dtbl.freelist_lock));
VSTAILQ_CONCAT(dst, &dtbl.freehead);
dtbl.nfree = 0;
AZ(pthread_mutex_unlock(&dtbl.freelist_lock));
}
/*
* return to dtbl.freehead
*
* returned must be locked by caller, if required
*/
void
DATA_Return_Freelist(struct freehead_s *returned, unsigned nreturned)
{
AZ(pthread_mutex_lock(&dtbl.freelist_lock));
VSTAILQ_CONCAT(&dtbl.freehead, returned);
dtbl.nfree += nreturned;
AZ(pthread_mutex_unlock(&dtbl.freelist_lock));
}
/* ------------------------------------------------------------ */
/* noMT Functions to be used by one thread (the VSL reader) only */
/* ------------------------------------------------------------ */
static struct freehead_s data_noMT_freelist =
VSTAILQ_HEAD_INITIALIZER(data_noMT_freelist);
static pthread_t data_noMT_threadid = 0;
#if defined(WITHOUT_EXPENSIVE_ASSERTS) || defined(WITHOUT_ASSERTS)
#define DATA_noMT_check_thread() do {} while(0)
#else
#define DATA_noMT_check_thread() \
assert(data_noMT_threadid == pthread_self());
#endif
/* the one thread has to register */
void
DATA_noMT_Register(void)
{
AZ(data_noMT_threadid);
data_noMT_threadid = pthread_self();
}
/* efficiently retrieve a single data entry */
dataentry
*DATA_Find(unsigned xid)
*DATA_noMT_Get(void)
{
dataentry *data;
DATA_noMT_check_thread();
take:
data = VSTAILQ_FIRST(&data_noMT_freelist);
if (data) {
VSTAILQ_REMOVE_HEAD(&data_noMT_freelist, freelist);
} else {
assert(VSTAILQ_EMPTY(&data_noMT_freelist));
while (dtbl.nfree == 0) {
dtbl.w_stats.wait_room++;
spmcq_wait(room);
}
DATA_Take_Freelist(&data_noMT_freelist);
assert(! VSTAILQ_EMPTY(&data_noMT_freelist));
goto take;
}
assert(data->state == DATA_EMPTY);
return (data);
}
/* return to our own local cache */
static inline void
data_nomt_free(dataentry *de)
{
uint32_t h = h1(xid);
if (tbl.entry[INDEX(h)].xid == xid)
return &tbl.entry[INDEX(h)];
h += h2(xid);
unsigned probes = 0;
while (++probes <= tbl.len && tbl.entry[INDEX(h)].xid != xid)
h++;
tbl.find_probes += probes;
if (probes > tbl.len)
return NULL;
return &tbl.entry[INDEX(h)];
DATA_noMT_check_thread();
assert(de->state == DATA_EMPTY);
VSTAILQ_INSERT_HEAD(&data_noMT_freelist, de, freelist);
}
void
DATA_Dump(void)
DATA_noMT_Free(dataentry *de)
{
data_nomt_free(de);
}
void
DATA_noMT_Submit(dataentry *de)
{
for (int i = 0; i < tbl.len; i++) {
dataentry entry = tbl.entry[i];
if (entry.state == DATA_EMPTY)
continue;
DATA_noMT_check_thread();
CHECK_OBJ_NOTNULL(de, DATA_MAGIC);
assert(de->state == DATA_DONE);
LOG_Log(LOG_DEBUG, "submit: data=[%.*s]", de->end, de->data);
if (de->hasdata == false) {
de->state = DATA_EMPTY;
MON_StatsUpdate(STATS_NODATA);
data_nomt_free(de);
return;
}
while (!SPMCQ_Enq((void *) de)) {
dtbl.w_stats.wait_qfull++;
LOG_Log(LOG_ALERT, "%s", "Internal queue full, waiting for dequeue");
spmcq_wait(room);
}
dtbl.w_stats.submitted++;
/* should we wake up another worker? */
if (SPMCQ_need_worker(SPMCQ_Len(), (nworkers - spmcq_datawaiter), nworkers, (1 << config.qlen_goal_scale)))
spmcq_signal(data);
/*
* base case: wake up a worker if all are sleeping
*
* this is an un-synced access to spmcq_datawaiter, but
* if we don't wake them up now, we will next time around
*/
if (nworkers == spmcq_datawaiter)
spmcq_signal(data);
}
void
DATA_Dump1(dataentry *entry, int i)
{
if (entry->state == DATA_EMPTY)
return;
LOG_Log(LOG_INFO, "Data entry %d: XID=%d tid=%d state=%s data=[%.*s]",
i, entry.xid, entry.tid, statename[entry.state], entry.end,
entry.data);
}
i, entry->xid, entry->tid, statename[entry->state], entry->end,
entry->data);
}
void
DATA_Dump(void)
{
for (int i = 0; i < dtbl.len; i++)
DATA_Dump1(&dtbl.entry[i], i);
}
/*-
* Copyright (c) 2012 UPLEX Nils Goroll Systemoptimierung
* Copyright (c) 2012 Otto Gmbh & Co KG
* All rights reserved
* Use only with permission
*
* Authors: Geoffrey Simmons <geoffrey.simmons@uplex.de>
* Nils Goroll <nils.goroll@uplex.de>
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL AUTHOR OR CONTRIBUTORS BE LIABLE
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
* OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
* OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
*
*/
#include <stdlib.h>
#include <syslog.h>
#include <string.h>
#include <stdint.h>
#include <limits.h>
#include <stdbool.h>
#include "libvarnish.h"
#include "miniobj.h"
#include "trackrdrd.h"
#define INDEX(u) ((u) & (htbl.len - 1))
#ifdef UNUSED
static const char *statename[2] = { "EMPTY", "OPEN" };
#endif
/*
* N.B.: Hash functions defined for XIDs, which are declared in Varnish as
* unsigned int, assuming that they are 32 bit.
*/
#if UINT_MAX != UINT32_MAX
#error "Unsigned ints are not 32 bit"
#endif
#define rotr(v,n) (((v) >> (n)) | ((v) << (32 - (n))))
#define USE_JENKMULVEY1
#define h1(k) jenkmulvey1(k)
#define h2(k) wang(k)
#ifdef USE_JENKMULVEY1
/*
* http://home.comcast.net/~bretm/hash/3.html
* Bret Mulvey ascribes this to Bob Jenkins, but I can't find any
* reference to it by Jenkins himself.
*/
static uint32_t
jenkmulvey1(uint32_t n)
{
n += (n << 12);
n ^= (n >> 22);
n += (n << 4);
n ^= (n >> 9);
n += (n << 10);
n ^= (n >> 2);
n += (n << 7);
n ^= (n >> 12);
return(n);
}
#endif
#ifdef USE_JENKMULVEY2
/*
* http://home.comcast.net/~bretm/hash/4.html
* Mulvey's modification of the (alleged) Jenkins algorithm
*/
static uint32_t
jenkmulvey2(uint32_t n)
{
n += (n << 16);
n ^= (n >> 13);
n += (n << 4);
n ^= (n >> 7);
n += (n << 10);
n ^= (n >> 5);
n += (n << 8);
n ^= (n >> 16);
return(n);
}
#endif
/*
* http://www.cris.com/~Ttwang/tech/inthash.htm
*/
static uint32_t
wang(uint32_t n)
{
n = ~n + (n << 15); // n = (n << 15) - n - 1;
n ^= rotr(n,12);
n += (n << 2);
n ^= rotr(n,4);
n = (n + (n << 3)) + (n << 11);
n ^= rotr(n,16);
return n;
}
static void
hash_Cleanup(void)
{
free(htbl.entry);
}
/*
* all hash functions must only ever be called by the
* vsl reader thread!
*/
int
HASH_Init(void)
{
hashentry *entryptr;
int entries = 1 << config.maxopen_scale;
entryptr = (hashentry *) calloc(entries, sizeof(hashentry));
if (entryptr == NULL)
return(errno);
memset(&htbl, 0, sizeof(hashtable));
htbl.magic = HASHTABLE_MAGIC;
htbl.len = entries;
htbl.entry = entryptr;
VTAILQ_INIT(&htbl.insert_head);
htbl.max_probes = config.hash_max_probes;
htbl.ttl = config.hash_ttl;
htbl.mlt = config.hash_mlt;
/* entries init */
for (int i = 0; i < entries; i++) {
htbl.entry[i].magic = HASH_MAGIC;
htbl.entry[i].state = HASH_EMPTY;
}
atexit(hash_Cleanup);
return(0);
}
static inline void
hash_free(hashentry *he)
{
VTAILQ_REMOVE(&htbl.insert_head, he, insert_list);
he->state = HASH_EMPTY;
he->de = NULL;
htbl.open--;
}
static inline void
hash_submit(hashentry *he)
{
dataentry *de = he->de;
assert(he->xid == de->xid);
DATA_noMT_Submit(de);
}
static inline void
incomplete(hashentry *he)
{
dataentry *de;
de = he->de;
CHECK_OBJ_NOTNULL(de, DATA_MAGIC);
de->incomplete = true;
MON_StatsUpdate(STATS_DONE);
de->state = DATA_DONE;
}
void
HASH_Exp(float limit)
{
hashentry *he;
float p_insert_time = 0.0;
while ((he = VTAILQ_FIRST(&htbl.insert_head))) {
CHECK_OBJ_NOTNULL(he, HASH_MAGIC);
assert(he->state == HASH_OPEN);
if (he->insert_time > limit)
return;
assert(p_insert_time <= he->insert_time);
p_insert_time = he->insert_time;
LOG_Log(LOG_DEBUG, "expire: hash=%u insert_time=%f limit=%f",
he->xid, he->insert_time, limit);
htbl.expired++;
incomplete(he);
hash_submit(he);
hash_free(he);
}
}
void
HASH_Submit(hashentry *he)
{
CHECK_OBJ_NOTNULL(he, HASH_MAGIC);
assert(he->state == HASH_OPEN);
LOG_Log(LOG_DEBUG, "submit: hash=%u", he->xid);
hash_submit(he);
hash_free(he);
}
/* like Submit, but for records in HASH_OPEN */
void
HASH_Evacuate(hashentry *he)
{
CHECK_OBJ_NOTNULL(he, HASH_MAGIC);
assert(he->state == HASH_OPEN);
LOG_Log(LOG_DEBUG, "evacuate: hash=%u insert_time=%f",
he->xid, he->insert_time);
htbl.evacuated++;
incomplete(he);
hash_submit(he);
hash_free(he);
}
hashentry
*HASH_Insert(const unsigned xid, dataentry *de, const float t)
{
hashentry *he, *oldest;
unsigned probes = 0;
uint32_t h = h1(xid);
const uint32_t h2 = h2(xid);
he = &htbl.entry[INDEX(h)];
if (he->state == HASH_EMPTY)
goto ok;
htbl.collisions++;
oldest = he;
do {
h += h2;
he = &htbl.entry[INDEX(h)];
probes++;
if (he->state == HASH_EMPTY)
goto ok;
if (he->insert_time < oldest->insert_time)
oldest = he;
} while (probes <= htbl.max_probes);
/* none eligible for evacuation */
if ((oldest->insert_time + htbl.mlt) > t) {
htbl.fail++;
htbl.insert_probes += probes;
return (NULL);
}
HASH_Evacuate(oldest);
he = oldest;
ok:
htbl.insert_probes += probes;
he->state = HASH_OPEN;
he->xid = xid;
he->insert_time = t;
VTAILQ_INSERT_TAIL(&htbl.insert_head, he, insert_list);
he->de = de;
/* stats */
htbl.open++;
if (htbl.open > htbl.occ_hi)
htbl.occ_hi = htbl.open;
if (htbl.open > htbl.occ_hi_this)
htbl.occ_hi_this = htbl.open;
return(he);
}
hashentry
*HASH_Find(const unsigned xid)
{
hashentry *he;
unsigned probes = 0;
uint32_t h = h1(xid);
const uint32_t h2 = h2(xid);
he = &htbl.entry[INDEX(h)];
if (he->xid == xid)
return (he);
do {
h += h2;
he = &htbl.entry[INDEX(h)];
probes++;
if (he->xid == xid)
break;
} while (probes <= htbl.max_probes);
htbl.find_probes += probes;
if (probes > htbl.max_probes)
return NULL;
return (he);
}
void
HASH_Dump1(hashentry *entry, int i)
{
if (entry->state == HASH_EMPTY)
return;
LOG_Log(LOG_INFO, "Hash entry %d: XID=%d",
i, entry->xid);
DATA_Dump1(entry->de, 0);
assert(entry->xid == entry->de->xid);
}
void
HASH_Dump(void)
{
for (int i = 0; i < htbl.len; i++)
HASH_Dump1(&htbl.entry[i], i);
}
......@@ -44,15 +44,78 @@ static void
log_output(void)
{
LOG_Log(LOG_INFO,
"Data table: len=%u collisions=%u insert_probes=%u find_probes=%u "
"open=%u done=%u load=%.2f len_overflows=%u data_overflows=%u "
"occ_hi=%u seen=%u submitted=%u nodata=%u sent=%u failed=%u "
"wait_qfull=%u data_hi=%u",
tbl.len, tbl.collisions, tbl.insert_probes, tbl.find_probes,
tbl.open, tbl.done, 100.0 * ((float) tbl.open + tbl.done) / tbl.len,
tbl.len_overflows, tbl.data_overflows, tbl.occ_hi, tbl.seen,
tbl.submitted, tbl.nodata, tbl.sent, tbl.failed, tbl.wait_qfull,
tbl.data_hi);
"Hash table: "
"len=%u "
"seen=%u "
"drop_reqstart=%u "
"drop_vcl_log=%u "
"drop_reqend=%u "
"expired=%u "
"evacuated=%u "
"open=%u "
"load=%.2f "
"collisions=%u "
"insert_probes=%u "
"find_probes=%u "
"fail=%u "
"occ_hi=%u "
"occ_hi_this=%u ",
htbl.len,
htbl.seen,
htbl.drop_reqstart,
htbl.drop_vcl_log,
htbl.drop_reqend,
htbl.expired,
htbl.evacuated,
htbl.open,
100.0 * htbl.open / htbl.len,
htbl.collisions,
htbl.insert_probes,
htbl.find_probes,
htbl.fail,
htbl.occ_hi,
htbl.occ_hi_this);
htbl.occ_hi_this = 0;
LOG_Log(LOG_INFO,
"Data table writer: "
"len=%u "
"nodata=%u "
"submitted=%u "
"wait_qfull=%u "
"wait_room=%u "
"data_hi=%u "
"data_overflows=%u ",
dtbl.len,
dtbl.w_stats.nodata,
dtbl.w_stats.submitted,
dtbl.w_stats.wait_qfull,
dtbl.w_stats.wait_room,
dtbl.w_stats.data_hi,
dtbl.w_stats.data_overflows);
LOG_Log(LOG_INFO,
"Data table reader: "
"done=%u "
"open=%u "
"load=%.2f "
"sent=%u "
"failed=%u "
"occ_hi=%u "
"occ_hi_this=%u ",
dtbl.r_stats.done,
dtbl.r_stats.open,
(100.0 * (1.0 * dtbl.r_stats.done + 1.0 * dtbl.r_stats.open) / dtbl.len),
dtbl.r_stats.sent,
dtbl.r_stats.failed,
dtbl.r_stats.occ_hi,
dtbl.r_stats.occ_hi_this
);
/* locking would be overkill */
dtbl.r_stats.occ_hi_this = 0;
if (config.monitor_workers)
WRK_Stats();
}
......@@ -109,49 +172,52 @@ MON_StatusShutdown(pthread_t monitor)
run = 0;
AZ(pthread_cancel(monitor));
AZ(pthread_join(monitor, NULL));
AZ(pthread_mutex_destroy(&dtbl.r_stats.mutex));
}
void
MON_StatsInit(void)
{
AZ(pthread_mutex_init(&stats_update_lock, NULL));
AZ(pthread_mutex_init(&dtbl.r_stats.mutex, &attr_lock));
}
void
MON_StatsUpdate(stats_update_t update)
{
AZ(pthread_mutex_lock(&stats_update_lock));
AZ(pthread_mutex_lock(&dtbl.r_stats.mutex));
switch(update) {
case STATS_SENT:
tbl.sent++;
tbl.done--;
dtbl.r_stats.sent++;
dtbl.r_stats.done--;
break;
case STATS_FAILED:
tbl.failed++;
tbl.done--;
dtbl.r_stats.failed++;
dtbl.r_stats.done--;
break;
case STATS_DONE:
tbl.done++;
tbl.open--;
dtbl.r_stats.done++;
dtbl.r_stats.open--;
break;
case STATS_OCCUPANCY:
tbl.open++;
if (tbl.open + tbl.done > tbl.occ_hi)
tbl.occ_hi = tbl.open + tbl.done;
dtbl.r_stats.open++;
if (dtbl.r_stats.open + dtbl.r_stats.done > dtbl.r_stats.occ_hi)
dtbl.r_stats.occ_hi = dtbl.r_stats.open + dtbl.r_stats.done;
if (dtbl.r_stats.open + dtbl.r_stats.done > dtbl.r_stats.occ_hi_this)
dtbl.r_stats.occ_hi_this = dtbl.r_stats.open + dtbl.r_stats.done;
break;
case STATS_NODATA:
tbl.nodata++;
tbl.done--;
dtbl.w_stats.nodata++;
dtbl.r_stats.done--;
break;
default:
/* Unreachable */
AN(NULL);
}
AZ(pthread_mutex_unlock(&stats_update_lock));
AZ(pthread_mutex_unlock(&dtbl.r_stats.mutex));
}
......@@ -37,6 +37,7 @@
#include "trackrdrd.h"
#include "vas.h"
#include "vmb.h"
static pthread_mutex_t spmcq_deq_lock;
......@@ -48,6 +49,24 @@ spmcq_len(void)
return UINT_MAX - spmcq.head + 1 + spmcq.tail;
}
/*
* this is only approximately correct and could even become negative when values
* get updated while we read them!
*
*/
int SPMCQ_Len(void) {
unsigned l;
do {
l = spmcq_len();
if (l <= spmcq.mask + 1)
break;
VRMB();
} while (1);
return (l);
}
static void
spmcq_cleanup(void)
{
......@@ -60,7 +79,7 @@ SPMCQ_Init(void)
{
void *buf;
size_t n = 1 << (MIN_TABLE_SCALE + config.maxopen_scale);
size_t n = 1 << config.maxdone_scale;
buf = calloc(n, sizeof(void *));
if (buf == NULL)
return(errno);
......@@ -108,7 +127,7 @@ main(int argc, char * const *argv)
printf("\nTEST: %s\n", argv[0]);
printf("... test SMPCQ enqueue at UINT_MAX overflow\n");
config.maxopen_scale = 0;
config.maxopen_scale = 10;
SPMCQ_Init();
spmcq.head = spmcq.tail = UINT_MAX - 2;
......@@ -120,6 +139,7 @@ main(int argc, char * const *argv)
assert(SPMCQ_Enq(NULL));
assert(SPMCQ_Enq(NULL));
assert(spmcq_len() == 7);
assert(SPMCQ_Len() == 7);
printf("%s: 1 test run\n", argv[0]);
exit(0);
......
......@@ -4,7 +4,8 @@
* All rights reserved
* Use only with permission
*
* Author: Geoffrey Simmons <geoffrey.simmons@uplex.de>
* Authors: Geoffrey Simmons <geoffrey.simmons@uplex.de>
* Nils Goroll <nils.goroll@uplex.de>
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
......@@ -37,27 +38,47 @@
#include "trackrdrd.h"
#include "vas.h"
#include "miniobj.h"
#include "vmb.h"
int nworkers = 0;
typedef enum {
WRK_NOTSTARTED = 0,
WRK_INITIALIZING,
WRK_RUNNING,
WRK_WAITING,
WRK_SHUTTINGDOWN,
WRK_EXITED
WRK_EXITED,
WRK_STATE_E_LIMIT
} wrk_state_e;
typedef struct {
static const char* statename[WRK_STATE_E_LIMIT] = {
[WRK_NOTSTARTED] = "not started",
[WRK_INITIALIZING] = "initializing",
[WRK_RUNNING] = "running",
[WRK_WAITING] = "waiting",
[WRK_SHUTTINGDOWN] = "shutting down",
[WRK_EXITED] = "exited"};
struct worker_data_s {
unsigned magic;
#define WORKER_DATA_MAGIC 0xd8eef137
unsigned id;
pthread_t tid;
unsigned status; /* exit status */
wrk_state_e state;
/* per-worker freelist - return space in chunks */
struct freehead_s wrk_freelist;
unsigned wrk_nfree;
/* stats */
unsigned deqs;
unsigned waits;
unsigned sends;
unsigned fails;
} worker_data_t;
};
typedef struct worker_data_s worker_data_t;
typedef struct {
pthread_t worker;
......@@ -66,8 +87,6 @@ typedef struct {
static unsigned run, cleaned = 0;
static thread_data_t *thread_data;
static const char* statename[5] = { "not started", "initializing", "running",
"shutting down", "exited" };
static inline void
wrk_send(void *amq_worker, dataentry *entry, worker_data_t *wrk)
......@@ -78,6 +97,7 @@ wrk_send(void *amq_worker, dataentry *entry, worker_data_t *wrk)
assert(entry->state == DATA_DONE);
AN(amq_worker);
/* XXX: report entry->incomplete to backend ? */
err = MQ_Send(amq_worker, entry->data, entry->end);
if (err != NULL) {
/* XXX: error recovery? reconnect? preserve the data? */
......@@ -94,9 +114,16 @@ wrk_send(void *amq_worker, dataentry *entry, worker_data_t *wrk)
entry->end, entry->data);
}
entry->state = DATA_EMPTY;
/* From Varnish vmb.h -- platform-independent write memory barrier */
VWMB();
AZ(pthread_cond_signal(&spmcq_nonfull_cond));
VSTAILQ_INSERT_TAIL(&wrk->wrk_freelist, entry, freelist);
wrk->wrk_nfree++;
if (dtbl.nfree == 0) {
DATA_Return_Freelist(&wrk->wrk_freelist, wrk->wrk_nfree);
wrk->wrk_nfree = 0;
assert(VSTAILQ_EMPTY(&wrk->wrk_freelist));
}
spmcq_signal(room);
}
static void
......@@ -110,7 +137,8 @@ static void
LOG_Log(LOG_INFO, "Worker %d: starting", wrk->id);
CHECK_OBJ_NOTNULL(wrk, WORKER_DATA_MAGIC);
wrk->state = WRK_INITIALIZING;
wrk->tid = pthread_self();
err = MQ_WorkerInit(&amq_worker);
if (err != NULL) {
LOG_Log(LOG_ALERT, "Worker %d: Cannot initialize queue connection: %s",
......@@ -120,6 +148,9 @@ static void
pthread_exit((void *) wrk);
}
VSTAILQ_INIT(&wrk->wrk_freelist);
wrk->wrk_nfree = 0;
wrk->state = WRK_RUNNING;
while (run) {
......@@ -127,20 +158,49 @@ static void
if (entry != NULL) {
wrk->deqs++;
wrk_send(amq_worker, entry, wrk);
continue;
/* should we go to sleep ? */
if (SPMCQ_stop_worker(SPMCQ_Len(), (nworkers - spmcq_datawaiter), nworkers, (1 << config.qlen_goal_scale)))
goto sleep;
continue;
}
/* Queue is empty, wait until data are available, or quit is
signaled.
Grab the CV lock, which also constitutes an implicit memory
barrier */
AZ(pthread_mutex_lock(&spmcq_nonempty_lock));
/* run is guaranteed to be fresh here */
if (run) {
wrk->waits++;
AZ(pthread_cond_wait(&spmcq_nonempty_cond,
&spmcq_nonempty_lock));
sleep:
/* return space before sleeping */
if (wrk->wrk_nfree > 0) {
DATA_Return_Freelist(&wrk->wrk_freelist, wrk->wrk_nfree);
wrk->wrk_nfree = 0;
}
/*
* Queue is empty or we should backoff
*
* wait until data are available, or quit is signaled.
*
* Grab the CV lock, which also constitutes an implicit memory
* barrier
*/
AZ(pthread_mutex_lock(&spmcq_datawaiter_lock));
/*
* run is guaranteed to be fresh here
*
* also re-check the stop condition under the lock
*/
if (run &&
((! entry) ||
SPMCQ_stop_worker(SPMCQ_Len(), (nworkers - spmcq_datawaiter),
nworkers, (1 << config.qlen_goal_scale)))) {
wrk->waits++;
spmcq_datawaiter++;
wrk->state = WRK_WAITING;
AZ(pthread_cond_wait(&spmcq_datawaiter_cond,
&spmcq_datawaiter_lock));
spmcq_datawaiter--;
wrk->state = WRK_RUNNING;
}
AZ(pthread_mutex_unlock(&spmcq_nonempty_lock));
AZ(pthread_mutex_unlock(&spmcq_datawaiter_lock));
}
wrk->state = WRK_SHUTTINGDOWN;
......@@ -171,10 +231,10 @@ static void wrk_cleanup(void)
for (int i = 0; i < config.nworkers; i++)
free(thread_data[i].wrk_data);
free(thread_data);
AZ(pthread_mutex_destroy(&spmcq_nonempty_lock));
AZ(pthread_cond_destroy(&spmcq_nonempty_cond));
AZ(pthread_mutex_destroy(&spmcq_nonfull_lock));
AZ(pthread_cond_destroy(&spmcq_nonfull_cond));
AZ(pthread_mutex_destroy(&spmcq_datawaiter_lock));
AZ(pthread_cond_destroy(&spmcq_datawaiter_cond));
AZ(pthread_mutex_destroy(&spmcq_roomwaiter_lock));
AZ(pthread_cond_destroy(&spmcq_roomwaiter_cond));
cleaned = 1;
}
......@@ -183,6 +243,7 @@ WRK_Init(void)
{
thread_data
= (thread_data_t *) malloc(config.nworkers * sizeof(thread_data_t));
if (thread_data == NULL) {
LOG_Log(LOG_ALERT, "Cannot allocate thread data: %s", strerror(errno));
return(errno);
......@@ -204,11 +265,18 @@ WRK_Init(void)
wrk->deqs = wrk->waits = wrk->sends = wrk->fails = 0;
wrk->state = WRK_NOTSTARTED;
}
AZ(pthread_mutex_init(&spmcq_nonempty_lock, NULL));
AZ(pthread_cond_init(&spmcq_nonempty_cond, NULL));
AZ(pthread_mutex_init(&spmcq_nonfull_lock, NULL));
AZ(pthread_cond_init(&spmcq_nonfull_cond, NULL));
spmcq_datawaiter = 0;
AZ(pthread_mutex_init(&spmcq_datawaiter_lock, &attr_lock));
AZ(pthread_cond_init(&spmcq_datawaiter_cond, &attr_cond));
spmcq_roomwaiter = 0;
AZ(pthread_mutex_init(&spmcq_roomwaiter_lock, &attr_lock));
AZ(pthread_cond_init(&spmcq_roomwaiter_cond, &attr_cond));
AZ(pthread_mutexattr_destroy(&attr_lock));
AZ(pthread_condattr_destroy(&attr_cond));
atexit(wrk_cleanup);
return 0;
}
......@@ -231,8 +299,8 @@ WRK_Stats(void)
for (int i = 0; i < config.nworkers; i++) {
wrk = thread_data[i].wrk_data;
LOG_Log(LOG_INFO, "Worker %d (%s): seen=%d waits=%d sent=%d failed=%d",
wrk->id, statename[wrk->state], wrk->deqs, wrk->waits, wrk->sends,
LOG_Log(LOG_INFO, "Worker %d tid %u (%s): seen=%d waits=%d sent=%d failed=%d",
wrk->id, wrk->tid, statename[wrk->state], wrk->deqs, wrk->waits, wrk->sends,
wrk->fails);
}
}
......@@ -248,7 +316,9 @@ WRK_Running(void)
wrk = thread_data[i].wrk_data;
if (wrk->state > WRK_INITIALIZING)
initialized++;
if (wrk->state == WRK_RUNNING || wrk->state == WRK_SHUTTINGDOWN)
if (wrk->state == WRK_RUNNING ||
wrk->state == WRK_SHUTTINGDOWN ||
wrk->state == WRK_WAITING)
running++;
}
if (initialized == config.nworkers)
......@@ -260,14 +330,14 @@ void
WRK_Halt(void)
{
/*
* must only modify run under spmcq_nonempty_lock to ensure that
* must only modify run under spmcq_datawaiter_lock to ensure that
* we signal all waiting consumers (otherwise a consumer could go
* waiting _after_ we have broadcasted and so miss the event.
*/
AZ(pthread_mutex_lock(&spmcq_nonempty_lock));
AZ(pthread_mutex_lock(&spmcq_datawaiter_lock));
run = 0;
AZ(pthread_cond_broadcast(&spmcq_nonempty_cond));
AZ(pthread_mutex_unlock(&spmcq_nonempty_lock));
AZ(pthread_cond_broadcast(&spmcq_datawaiter_cond));
AZ(pthread_mutex_unlock(&spmcq_datawaiter_lock));
for(int i = 0; i < config.nworkers; i++) {
AZ(pthread_join(thread_data[i].worker,
......