Commit 98665466 authored by Nils Goroll's avatar Nils Goroll Committed by Geoff Simmons

Various performance and stability improvements, hash/data table separation

major changes
=============

hash/data table
---------------

The hash table is now only used for _OPEN records, and the actual data
is stored in a data table. Upon submit, hash entries are cleared and
data continues to live in the data table until it gets freed by a
worker (or upon submit if it is a NODATA record).

This drastically reduces the hash table load and significantly
increases worst case performance. In particular, the hash table load
is now independend of ActiveMQ backend performance (read: stalls).

Preliminary recommendations fon table sizing:

* hash table: double max_sessions from varnish

  e.g.

  maxopen.scale = 16

  for 64K hash table entries to support >32K sessions
  (savely and efficiently)

* data table: max(req/s) * max(ActiveMQ stall time)

  e.g. to survive 8000 req/s with 60 seconds ActiveMQ stall time,
  the data table should be >240K in size, so

  maxdone.scale = 19

  (= 512K entries) should be on the safe side also to provide
  sufficient buffer for temporary load peaks

hash table performance
----------------------

Previously, the maximum number of probes to the hash table was set to
the hash table size - which resulted in bad insert performance and
even worse lookup performance.

Now that the hash table is freed of _OPEN records, we can remove this
burden and limit the maximum number of probles to a sensible value (10
to start with, configurable as hash_max_probes.

As another consequence, as we don't require 100% capacity on the hash
table, we don't need to run an exhaustive search upon insert. Thus,
probing has been changed from liner to hash (by h2()).

only ever insert on ReqStart - and drop if we can't
---------------------------------------------------

Keeping up with the VSL is essential. Once we fall behind, we are in
real trouble:

- If we miss ReqEnd, we will clobber our hash, with drastic effects:
  - hash lookups become inefficient
  - inserts become more likely to fail
  - before we had HASH_Exp (see below), the hash would become useless

- When the VSL writer overtakes our reader, we will see corrupt data
  and miss _many_ VCL Logs and ReqEnds (as many as can be found in the
  whole VSL), so, again, our hash and data arrays will get clobbered
  with incomplete data (which needs to be cleaned up by HASH_Exp).

The latter point is the most relevant, corrupt records are likely to
trigger assertions.

Thus, keeping up with the VSL needs to be our primary objective. When
the VSL overtakes, we will loose a massive amount auf reconds anyway
(and we won't even know how many). As long as we don't stop Varnish
when we fall behind, we can't avoid loosing records under certain
circumstances anway (for instance, when the backend stalls and the
data table runs full), so we should rather drop early, in a controlled
manner - and without drastic performance penalty.

Under this doctrine, it does not make sense to insert records for
VSL_Log or ReqEnd, so if an xid can't be found for these tags, the
respective events will get dropped (and logged).

performance optimizations
=========================

spmcq reader/writer synchronization
-----------------------------------

Various measures have been implemented to reduce syscall and general
function call overhead for reader/writer synchroniration on the
spmcq. Previously, the writer would issue a pthread_cond_signal to
potentially wake up a reader, irrespective of whether or not a reader
was actually blocking on the CV.

- now, the number of waiting readers (workers) is modified inside a
  lock, but queried first from outside the lock, so if there are no
  readers waiting the CV is not signalled.

- The number of running readers is (attempted to be) kept proportional
  to the queue length for queue lengths between 0 and
  2^qlen_goal.scale to further reduce the number of worker thread
  block/wakeup transitions under low to averade load.

pthread_mutex / ptherad_condvar attributes
------------------------------------------

Attributes are now being used to allow the O/S implementation to
choose more efficient low-level synchronization primitives because we
know that we are using these only within one multi-threaded process.

data table freelist
-------------------

To allow for efficient allocation of new data table entries, a free
list with local caches is maintained:

- The data writer (VSL reader thread) maintains its own freelist and
  serves requests from it without any synchronization overhead.

- Only when the data writer's own freelist is exchausted will it
  access the global freelist (under a lock). It will take the whole
  list at once and resume serving new records from its own cache.

- Workers also maintain their own freelist of entries to be returned
  to the global freelist as long as

  - they are running
  - there are entries on the global list.

  Before a worker thread goes to block on the spmcq condvar, it
  returns all its freelist entries to the global freelist. Also, it
  will always check if the global list is empty and return any entries
  immediately if it is.

stability improvements
======================

record timeouts
---------------

Every hash entry gets added to the insert_list ordered by insertion
time. Not any more often then x seconds (currently hard-coded to x=10,
check only performed when ReqStart is seen), the list is checked for
records which have reached their ttl (configured by hash_ttl, default
120 seconds). These get submitted despite the fact that no ReqEnd has
been seen - under the assumption that no ReqEnd is ever to be expected
after a certain time has passed.

hash evacuation
---------------

If no free entry is found when probing all possible locations for an
insert, the oldest record is evacuated from the hash and submitted to
the backend if its live time has exceeded hash_mlt under the
assumption that it is better to submit records early (which are likely
to carry useful log information already) than throwing away records.

If this behavior is not desired, hash_mtl can be set to hash_ttl.

various code changes
====================

* statistics have been reorganized to seperate out
  - hash
  - data writer/VSL reader
  - data reader/worker (partially shared with writer)
  statistics

* print the native thread ID for workers (to allow to correllation
  with prstat/top output)

* workers have a new state when blocking on the spmcq CV: WRK_WAITING
  / "waiting" in monitor output

* because falling behind with VSL reading (the VSL writer overtaking
  our reader) is so bad, notices are logged whenever the new VSL data
  pointer is less than the previous one, iow the VSL ring buffer
  wraps.

  this is not the same as a detection of the VSL writer overtaking
  (which would require varnishapi changes), but noting information and
  some statistics about VSL wraps can (and did) help analyze track
  down strance issues to VSL overtaking.

config file changes
===================

* The _scale options

  maxopen.scale
  maxdone.scale (new, see below)
  maxdata.scale

  are now being used directly, rather than in addition to a base value
  of 10 as before.

  10 is now the minimum value and an EINVAL error will get thrown
  when lower values are used in the config file.

new config options
==================

see trackrdrd.h for documentation in comments:

* maxdone.scale

  Scale for records in _DONE states, determines size of
  - the data table (which is maxopen + maxdone)
  - the spmcq

* qlen_goal.scale

  Scale for the spmcq queue length goal. All worker threads will be
  used when the queue length corresponding to the scale is reached.

  For shorter queue lengths, the number of worker threads will be
  scaled propotionally.

* hash_max_probes

  Maximum number of probes to the hash.

  Smaller values increase efficiency, but reduce the capacity of the
  hash (more ReqStart records may get lost) - and vice versa for
  higher values.

* hash_ttl

  Maximum time to live for records in the _OPEN state

  Entries which are older than this ttl _may_ get expired from the
  trackrdrd state.

  This should get set to a value significantly longer than your
  maximum session lifetime in Varnish.

* hash_mlt

  Minimum lifetime for entries in HASH_OPEN before they could get
  evacuated.

  Entries are guaranteed to remain in trackrdrd for this duration.
  Once the mlt is reached, they _may_ get expired when trackrdrd needs
  space in the hash.
parent 705bab6d
......@@ -10,6 +10,7 @@ trackrdrd_SOURCES = \
parse.c \
log.c \
config.c \
hash.c \
data.c \
monitor.c \
mq.c \
......
......@@ -101,6 +101,19 @@ conf_getUnsignedInt(const char *rval, unsigned *i)
return(0); \
}
#define confUnsignedMinVal(name,fld,min) \
if (strcmp(lval, name) == 0) { \
unsigned int i; \
int err = conf_getUnsignedInt(rval, &i); \
if (err != 0) \
return err; \
if (i < min) \
return (EINVAL); \
config.fld = i; \
return(0); \
}
int
CONF_Add(const char *lval, const char *rval)
{
......@@ -114,8 +127,13 @@ CONF_Add(const char *lval, const char *rval)
confString("mq.uri", mq_uri);
confString("mq.qname", mq_qname);
confUnsigned("maxopen.scale", maxopen_scale);
confUnsigned("maxdata.scale", maxdata_scale);
confUnsignedMinVal("maxopen.scale", maxopen_scale, MIN_MAXOPEN_SCALE);
confUnsignedMinVal("maxdone.scale", maxdone_scale, MIN_MAXDONE_SCALE);
confUnsignedMinVal("maxdata.scale", maxdata_scale, MIN_MAXDATA_SCALE);
confUnsigned("qlen_goal.scale", qlen_goal_scale);
confUnsigned("hash_max_probes", hash_max_probes);
confUnsigned("hash_ttl", hash_ttl);
confUnsigned("hash_mlt", hash_mlt);
confUnsigned("nworkers", nworkers);
confUnsigned("restarts", restarts);
confUnsigned("monitor.interval", monitor_interval);
......@@ -200,8 +218,14 @@ CONF_Init(void)
config.monitor_interval = 30;
config.monitor_workers = false;
config.processor_log[0] = '\0';
config.maxopen_scale = 0;
config.maxdata_scale = 0;
config.maxopen_scale = MIN_MAXOPEN_SCALE;
config.maxdone_scale = MIN_MAXDONE_SCALE;
config.maxdata_scale = MIN_MAXDATA_SCALE;
config.qlen_goal_scale = DEF_QLEN_GOAL_SCALE;
config.hash_max_probes = DEF_HASH_MAX_PROBES;
config.hash_ttl = DEF_HASH_TTL;
config.hash_mlt = DEF_HASH_MTL;
config.mq_uri[0] = '\0';
config.mq_qname[0] = '\0';
config.nworkers = 1;
......@@ -283,6 +307,12 @@ CONF_Dump(void)
confdump("processor.log = %s", config.processor_log);
confdump("maxopen.scale = %u", config.maxopen_scale);
confdump("maxdata.scale = %u", config.maxdata_scale);
confdump("qlen_goal.scale = %u", config.qlen_goal_scale);
confdump("hash_max_probes", config.hash_max_probes);
confdump("hash_ttl", config.hash_ttl);
confdump("hash_mlt", config.hash_mlt);
confdump("mq.uri = %s", config.mq_uri);
confdump("mq.qname = %s", config.mq_qname);
confdump("nworkers = %u", config.nworkers);
......
......@@ -4,7 +4,8 @@
* All rights reserved
* Use only with permission
*
* Author: Geoffrey Simmons <geoffrey.simmons@uplex.de>
* Authors: Geoffrey Simmons <geoffrey.simmons@uplex.de>
* Nils Goroll <nils.goroll@uplex.de>
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
......@@ -30,97 +31,20 @@
*/
#include <stdlib.h>
#include <syslog.h>
#include <string.h>
#include <stdint.h>
#include <limits.h>
#include <stdbool.h>
#include "libvarnish.h"
#include "miniobj.h"
#include <syslog.h>
#include "trackrdrd.h"
#define MIN_DATA_SCALE 10
#define INDEX(u) ((u) & (tbl.len - 1))
#include "vas.h"
#include "miniobj.h"
static const char *statename[3] = { "EMPTY", "OPEN", "DONE" };
/*
* N.B.: Hash functions defined for XIDs, which are declared in Varnish as
* unsigned int, assuming that they are 32 bit.
*/
#if UINT_MAX != UINT32_MAX
#error "Unsigned ints are not 32 bit"
#endif
#define rotr(v,n) (((v) >> (n)) | ((v) << (32 - (n))))
#define USE_JENKMULVEY1
#define h1(k) jenkmulvey1(k)
#define h2(k) wang(k)
#ifdef USE_JENKMULVEY1
/*
* http://home.comcast.net/~bretm/hash/3.html
* Bret Mulvey ascribes this to Bob Jenkins, but I can't find any
* reference to it by Jenkins himself.
*/
static uint32_t
jenkmulvey1(uint32_t n)
{
n += (n << 12);
n ^= (n >> 22);
n += (n << 4);
n ^= (n >> 9);
n += (n << 10);
n ^= (n >> 2);
n += (n << 7);
n ^= (n >> 12);
return(n);
}
#endif
#ifdef USE_JENKMULVEY2
/*
* http://home.comcast.net/~bretm/hash/4.html
* Mulvey's modification of the (alleged) Jenkins algorithm
*/
static uint32_t
jenkmulvey2(uint32_t n)
{
n += (n << 16);
n ^= (n >> 13);
n += (n << 4);
n ^= (n >> 7);
n += (n << 10);
n ^= (n >> 5);
n += (n << 8);
n ^= (n >> 16);
return(n);
}
#endif
/*
* http://www.cris.com/~Ttwang/tech/inthash.htm
*/
static uint32_t
wang(uint32_t n)
{
n = ~n + (n << 15); // n = (n << 15) - n - 1;
n ^= rotr(n,12);
n += (n << 2);
n ^= rotr(n,4);
n = (n + (n << 3)) + (n << 11);
n ^= rotr(n,16);
return n;
}
static void
data_Cleanup(void)
{
free(tbl.entry);
free(tbl.buf);
free(dtbl.entry);
free(dtbl.buf);
AZ(pthread_mutex_destroy(&dtbl.freelist_lock));
}
int
......@@ -129,8 +53,13 @@ DATA_Init(void)
dataentry *entryptr;
char *bufptr;
int bufsize = 1 << (config.maxdata_scale + MIN_DATA_SCALE);
int entries = 1 << (config.maxopen_scale + MIN_TABLE_SCALE);
int bufsize = 1 << config.maxdata_scale;
/*
* we want enough space to accomodate all open and done records
*
*/
int entries = (1 << config.maxopen_scale) + (1 << config.maxdone_scale);
entryptr = (dataentry *) calloc(entries, sizeof(dataentry));
if (entryptr == NULL)
......@@ -142,69 +71,181 @@ DATA_Init(void)
return(errno);
}
datatable init_tbl =
{ .magic = DATATABLE_MAGIC, .len = entries, .collisions = 0,
.insert_probes = 0, .find_probes = 0, .seen = 0, .open = 0, .done = 0,
.nodata = 0, .len_overflows = 0, .data_overflows = 0, .submitted = 0,
.occ_hi = 0, .data_hi = 0, .entry = entryptr, .buf = bufptr };
memcpy(&tbl, &init_tbl, sizeof(datatable));
memset(&dtbl, 0, sizeof(datatable));
dtbl.magic = DATATABLE_MAGIC;
dtbl.len = entries;
VSTAILQ_INIT(&dtbl.freehead);
AZ(pthread_mutex_init(&dtbl.freelist_lock, &attr_lock));
dtbl.entry = entryptr;
dtbl.buf = bufptr;
dtbl.nfree = 0;
for (int i = 0; i < entries; i++) {
tbl.entry[i].magic = DATA_MAGIC;
tbl.entry[i].state = DATA_EMPTY;
tbl.entry[i].hasdata = false;
tbl.entry[i].data = &tbl.buf[i * bufsize];
dtbl.entry[i].magic = DATA_MAGIC;
dtbl.entry[i].state = DATA_EMPTY;
dtbl.entry[i].hasdata = false;
dtbl.entry[i].data = &dtbl.buf[i * bufsize];
VSTAILQ_INSERT_TAIL(&dtbl.freehead, &dtbl.entry[i], freelist);
dtbl.nfree++;
}
assert(dtbl.nfree == entries);
assert(VSTAILQ_FIRST(&dtbl.freehead));
atexit(data_Cleanup);
return(0);
}
/* XXX: set xid and DATA_OPEN in the entry */
dataentry
*DATA_Insert(unsigned xid)
/*
* take all free entries from the datatable for lockless
* allocation
*/
void
DATA_Take_Freelist(struct freehead_s *dst)
{
uint32_t h = h1(xid);
if (tbl.entry[INDEX(h)].state == DATA_EMPTY)
return(&tbl.entry[INDEX(h)]);
unsigned probes = 0;
tbl.collisions++;
h += h2(xid);
while (++probes <= tbl.len && tbl.entry[INDEX(h)].state != DATA_EMPTY)
h++;
tbl.insert_probes += probes;
if (probes > tbl.len) {
tbl.len_overflows++;
return(NULL);
}
return(&tbl.entry[INDEX(h)]);
AZ(pthread_mutex_lock(&dtbl.freelist_lock));
VSTAILQ_CONCAT(dst, &dtbl.freehead);
dtbl.nfree = 0;
AZ(pthread_mutex_unlock(&dtbl.freelist_lock));
}
/*
* return to dtbl.freehead
*
* returned must be locked by caller, if required
*/
void
DATA_Return_Freelist(struct freehead_s *returned, unsigned nreturned)
{
AZ(pthread_mutex_lock(&dtbl.freelist_lock));
VSTAILQ_CONCAT(&dtbl.freehead, returned);
dtbl.nfree += nreturned;
AZ(pthread_mutex_unlock(&dtbl.freelist_lock));
}
/* ------------------------------------------------------------ */
/* noMT Functions to be used by one thread (the VSL reader) only */
/* ------------------------------------------------------------ */
static struct freehead_s data_noMT_freelist =
VSTAILQ_HEAD_INITIALIZER(data_noMT_freelist);
static pthread_t data_noMT_threadid = 0;
#if defined(WITHOUT_EXPENSIVE_ASSERTS) || defined(WITHOUT_ASSERTS)
#define DATA_noMT_check_thread() do {} while(0)
#else
#define DATA_noMT_check_thread() \
assert(data_noMT_threadid == pthread_self());
#endif
/* the one thread has to register */
void
DATA_noMT_Register(void)
{
AZ(data_noMT_threadid);
data_noMT_threadid = pthread_self();
}
/* efficiently retrieve a single data entry */
dataentry
*DATA_Find(unsigned xid)
*DATA_noMT_Get(void)
{
dataentry *data;
DATA_noMT_check_thread();
take:
data = VSTAILQ_FIRST(&data_noMT_freelist);
if (data) {
VSTAILQ_REMOVE_HEAD(&data_noMT_freelist, freelist);
} else {
assert(VSTAILQ_EMPTY(&data_noMT_freelist));
while (dtbl.nfree == 0) {
dtbl.w_stats.wait_room++;
spmcq_wait(room);
}
DATA_Take_Freelist(&data_noMT_freelist);
assert(! VSTAILQ_EMPTY(&data_noMT_freelist));
goto take;
}
assert(data->state == DATA_EMPTY);
return (data);
}
/* return to our own local cache */
static inline void
data_nomt_free(dataentry *de)
{
uint32_t h = h1(xid);
if (tbl.entry[INDEX(h)].xid == xid)
return &tbl.entry[INDEX(h)];
h += h2(xid);
unsigned probes = 0;
while (++probes <= tbl.len && tbl.entry[INDEX(h)].xid != xid)
h++;
tbl.find_probes += probes;
if (probes > tbl.len)
return NULL;
return &tbl.entry[INDEX(h)];
DATA_noMT_check_thread();
assert(de->state == DATA_EMPTY);
VSTAILQ_INSERT_HEAD(&data_noMT_freelist, de, freelist);
}
void
DATA_Dump(void)
DATA_noMT_Free(dataentry *de)
{
data_nomt_free(de);
}
void
DATA_noMT_Submit(dataentry *de)
{
for (int i = 0; i < tbl.len; i++) {
dataentry entry = tbl.entry[i];
if (entry.state == DATA_EMPTY)
continue;
DATA_noMT_check_thread();
CHECK_OBJ_NOTNULL(de, DATA_MAGIC);
assert(de->state == DATA_DONE);
LOG_Log(LOG_DEBUG, "submit: data=[%.*s]", de->end, de->data);
if (de->hasdata == false) {
de->state = DATA_EMPTY;
MON_StatsUpdate(STATS_NODATA);
data_nomt_free(de);
return;
}
while (!SPMCQ_Enq((void *) de)) {
dtbl.w_stats.wait_qfull++;
LOG_Log(LOG_ALERT, "%s", "Internal queue full, waiting for dequeue");
spmcq_wait(room);
}
dtbl.w_stats.submitted++;
/* should we wake up another worker? */
if (SPMCQ_need_worker(SPMCQ_Len(), (nworkers - spmcq_datawaiter), nworkers, (1 << config.qlen_goal_scale)))
spmcq_signal(data);
/*
* base case: wake up a worker if all are sleeping
*
* this is an un-synced access to spmcq_data_waiter, but
* if we don't wake them up now, we will next time around
*/
if (nworkers == spmcq_datawaiter)
spmcq_signal(data);
}
void
DATA_Dump1(dataentry *entry, int i)
{
if (entry->state == DATA_EMPTY)
return;
LOG_Log(LOG_INFO, "Data entry %d: XID=%d tid=%d state=%s data=[%.*s]",
i, entry.xid, entry.tid, statename[entry.state], entry.end,
entry.data);
}
i, entry->xid, entry->tid, statename[entry->state], entry->end,
entry->data);
}
void
DATA_Dump(void)
{
for (int i = 0; i < dtbl.len; i++)
DATA_Dump1(&dtbl.entry[i], i);
}
/*-
* Copyright (c) 2012 UPLEX Nils Goroll Systemoptimierung
* Copyright (c) 2012 Otto Gmbh & Co KG
* All rights reserved
* Use only with permission
*
* Authors: Geoffrey Simmons <geoffrey.simmons@uplex.de>
* Nils Goroll <nils.goroll@uplex.de>
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL AUTHOR OR CONTRIBUTORS BE LIABLE
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
* OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
* OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
*
*/
#include <stdlib.h>
#include <syslog.h>
#include <string.h>
#include <stdint.h>
#include <limits.h>
#include <stdbool.h>
#include "libvarnish.h"
#include "miniobj.h"
#include "trackrdrd.h"
#define INDEX(u) ((u) & (htbl.len - 1))
#ifdef UNUSED
static const char *statename[2] = { "EMPTY", "OPEN" };
#endif
/*
* N.B.: Hash functions defined for XIDs, which are declared in Varnish as
* unsigned int, assuming that they are 32 bit.
*/
#if UINT_MAX != UINT32_MAX
#error "Unsigned ints are not 32 bit"
#endif
#define rotr(v,n) (((v) >> (n)) | ((v) << (32 - (n))))
#define USE_JENKMULVEY1
#define h1(k) jenkmulvey1(k)
#define h2(k) wang(k)
#ifdef USE_JENKMULVEY1
/*
* http://home.comcast.net/~bretm/hash/3.html
* Bret Mulvey ascribes this to Bob Jenkins, but I can't find any
* reference to it by Jenkins himself.
*/
static uint32_t
jenkmulvey1(uint32_t n)
{
n += (n << 12);
n ^= (n >> 22);
n += (n << 4);
n ^= (n >> 9);
n += (n << 10);
n ^= (n >> 2);
n += (n << 7);
n ^= (n >> 12);
return(n);
}
#endif
#ifdef USE_JENKMULVEY2
/*
* http://home.comcast.net/~bretm/hash/4.html
* Mulvey's modification of the (alleged) Jenkins algorithm
*/
static uint32_t
jenkmulvey2(uint32_t n)
{
n += (n << 16);
n ^= (n >> 13);
n += (n << 4);
n ^= (n >> 7);
n += (n << 10);
n ^= (n >> 5);
n += (n << 8);
n ^= (n >> 16);
return(n);
}
#endif
/*
* http://www.cris.com/~Ttwang/tech/inthash.htm
*/
static uint32_t
wang(uint32_t n)
{
n = ~n + (n << 15); // n = (n << 15) - n - 1;
n ^= rotr(n,12);
n += (n << 2);
n ^= rotr(n,4);
n = (n + (n << 3)) + (n << 11);
n ^= rotr(n,16);
return n;
}
static void
hash_Cleanup(void)
{
free(htbl.entry);
}
/*
* all hash functions must only ever be called by the
* vsl reader thread!
*/
int
HASH_Init(void)
{
hashentry *entryptr;
int entries = 1 << config.maxopen_scale;
entryptr = (hashentry *) calloc(entries, sizeof(hashentry));
if (entryptr == NULL)
return(errno);
memset(&htbl, 0, sizeof(hashtable));
htbl.magic = HASHTABLE_MAGIC;
htbl.len = entries;
htbl.entry = entryptr;
VTAILQ_INIT(&htbl.insert_head);
htbl.max_probes = config.hash_max_probes;
htbl.ttl = config.hash_ttl;
htbl.mlt = config.hash_mlt;
/* entries init */
for (int i = 0; i < entries; i++) {
htbl.entry[i].magic = HASH_MAGIC;
htbl.entry[i].state = HASH_EMPTY;
}
atexit(hash_Cleanup);
return(0);
}
static inline void
hash_free(hashentry *he)
{
VTAILQ_REMOVE(&htbl.insert_head, he, insert_list);
he->state = HASH_EMPTY;
he->de = NULL;
htbl.open--;
}
static inline void
hash_submit(hashentry *he)
{
dataentry *de = he->de;
assert(he->xid == de->xid);
DATA_noMT_Submit(de);
}
static inline void
incomplete(hashentry *he)
{
dataentry *de;
de = he->de;
CHECK_OBJ_NOTNULL(de, DATA_MAGIC);
de->incomplete = true;
MON_StatsUpdate(STATS_DONE);
de->state = DATA_DONE;
}
void
HASH_Exp(float limit)
{
hashentry *he;
float p_insert_time = 0.0;
while ((he = VTAILQ_FIRST(&htbl.insert_head))) {
CHECK_OBJ_NOTNULL(he, HASH_MAGIC);
assert(he->state == HASH_OPEN);
if (he->insert_time > limit)
return;
assert(p_insert_time <= he->insert_time);
p_insert_time = he->insert_time;
LOG_Log(LOG_DEBUG, "expire: hash=%u insert_time=%f limit=%f",
he->xid, he->insert_time, limit);
htbl.expired++;
incomplete(he);
hash_submit(he);
hash_free(he);
}
}
void
HASH_Submit(hashentry *he)
{
CHECK_OBJ_NOTNULL(he, HASH_MAGIC);
assert(he->state == HASH_OPEN);
LOG_Log(LOG_DEBUG, "submit: hash=%u", he->xid);
hash_submit(he);
hash_free(he);
}
/* like Submit, but for recrods in HASH_OPEN */
void
HASH_Evacuate(hashentry *he)
{
CHECK_OBJ_NOTNULL(he, HASH_MAGIC);
assert(he->state == HASH_OPEN);
LOG_Log(LOG_DEBUG, "evacuate: hash=%u insert_time=%f",
he->xid, he->insert_time);
htbl.evacuated++;
incomplete(he);
hash_submit(he);
hash_free(he);
}
hashentry
*HASH_Insert(const unsigned xid, dataentry *de, const float t)
{
hashentry *he, *oldest;
unsigned probes = 0;
uint32_t h = h1(xid);
const uint32_t h2 = h2(xid);
he = &htbl.entry[INDEX(h)];
if (he->state == HASH_EMPTY)
goto ok;
htbl.collisions++;
oldest = he;
do {
h += h2;
he = &htbl.entry[INDEX(h)];
probes++;
if (he->state == HASH_EMPTY)
goto ok;
if (he->insert_time < oldest->insert_time)
oldest = he;
} while (probes <= htbl.max_probes);
/* none eligable for evacuation */
if ((oldest->insert_time + htbl.mlt) > t) {
htbl.fail++;
htbl.insert_probes += probes;
return (NULL);
}
HASH_Evacuate(oldest);
he = oldest;
ok:
htbl.insert_probes += probes;
he->state = HASH_OPEN;
he->xid = xid;
he->insert_time = t;
VTAILQ_INSERT_TAIL(&htbl.insert_head, he, insert_list);
he->de = de;
/* stats */
htbl.open++;
if (htbl.open > htbl.occ_hi)
htbl.occ_hi = htbl.open;
if (htbl.open > htbl.occ_hi_this)
htbl.occ_hi_this = htbl.open;
return(he);
}
hashentry
*HASH_Find(const unsigned xid)
{
hashentry *he;
unsigned probes = 0;
uint32_t h = h1(xid);
const uint32_t h2 = h2(xid);
he = &htbl.entry[INDEX(h)];
if (he->xid == xid)
return (he);
do {
h += h2;
he = &htbl.entry[INDEX(h)];
probes++;
if (he->xid == xid)
break;
} while (probes <= htbl.max_probes);
htbl.find_probes += probes;
if (probes > htbl.max_probes)
return NULL;
return (he);
}
void
HASH_Dump1(hashentry *entry, int i)
{
if (entry->state == HASH_EMPTY)
return;
LOG_Log(LOG_INFO, "Hash entry %d: XID=%d",
i, entry->xid);
DATA_Dump1(entry->de, 0);
assert(entry->xid == entry->de->xid);
}
void
HASH_Dump(void)
{
for (int i = 0; i < htbl.len; i++)
HASH_Dump1(&htbl.entry[i], i);
}
......@@ -44,15 +44,78 @@ static void
log_output(void)
{
LOG_Log(LOG_INFO,
"Data table: len=%u collisions=%u insert_probes=%u find_probes=%u "
"open=%u done=%u load=%.2f len_overflows=%u data_overflows=%u "
"occ_hi=%u seen=%u submitted=%u nodata=%u sent=%u failed=%u "
"wait_qfull=%u data_hi=%u",
tbl.len, tbl.collisions, tbl.insert_probes, tbl.find_probes,
tbl.open, tbl.done, 100.0 * ((float) tbl.open + tbl.done) / tbl.len,
tbl.len_overflows, tbl.data_overflows, tbl.occ_hi, tbl.seen,
tbl.submitted, tbl.nodata, tbl.sent, tbl.failed, tbl.wait_qfull,
tbl.data_hi);
"Hash table: "
"len=%u "
"seen=%u "
"drop_reqstart=%u "
"drop_vcl_log=%u "
"drop_reqend=%u "
"expired=%u "
"evacuated=%u "
"open=%u "
"load=%.2f "
"collisions=%u "
"insert_probes=%u "
"find_probes=%u "
"fail=%u "
"occ_hi=%u "
"occ_hi_this=%u ",
htbl.len,
htbl.seen,
htbl.drop_reqstart,
htbl.drop_vcl_log,
htbl.drop_reqend,
htbl.expired,
htbl.evacuated,
htbl.open,
100.0 * htbl.open / htbl.len,
htbl.collisions,
htbl.insert_probes,
htbl.find_probes,
htbl.fail,
htbl.occ_hi,
htbl.occ_hi_this);
htbl.occ_hi_this = 0;
LOG_Log(LOG_INFO,
"Data table writer: "
"len=%u "
"nodata=%u "
"submitted=%u "
"wait_qfull=%u "
"wait_room=%u "
"data_hi=%u "
"data_overflows=%u ",
dtbl.len,
dtbl.w_stats.nodata,
dtbl.w_stats.submitted,
dtbl.w_stats.wait_qfull,
dtbl.w_stats.wait_room,
dtbl.w_stats.data_hi,
dtbl.w_stats.data_overflows);
LOG_Log(LOG_INFO,
"Data table reader: "
"done=%u "
"open=%u "
"load=%.2f "
"sent=%u "
"failed=%u "
"occ_hi=%u "
"occ_hi_this=%u ",
dtbl.r_stats.done,
dtbl.r_stats.open,
(100.0 * (1.0 * dtbl.r_stats.done + 1.0 * dtbl.r_stats.open) / dtbl.len),
dtbl.r_stats.sent,
dtbl.r_stats.failed,
dtbl.r_stats.occ_hi,
dtbl.r_stats.occ_hi_this
);
/* locking would be overkill */
dtbl.r_stats.occ_hi_this = 0;
if (config.monitor_workers)
WRK_Stats();
}
......@@ -109,49 +172,52 @@ MON_StatusShutdown(pthread_t monitor)
run = 0;
AZ(pthread_cancel(monitor));
AZ(pthread_join(monitor, NULL));
AZ(pthread_mutex_destroy(&dtbl.r_stats.mutex));
}
void
MON_StatsInit(void)
{
AZ(pthread_mutex_init(&stats_update_lock, NULL));
AZ(pthread_mutex_init(&dtbl.r_stats.mutex, &attr_lock));
}
void
MON_StatsUpdate(stats_update_t update)
{
AZ(pthread_mutex_lock(&stats_update_lock));
AZ(pthread_mutex_lock(&dtbl.r_stats.mutex));
switch(update) {
case STATS_SENT:
tbl.sent++;
tbl.done--;
dtbl.r_stats.sent++;
dtbl.r_stats.done--;
break;
case STATS_FAILED:
tbl.failed++;
tbl.done--;
dtbl.r_stats.failed++;
dtbl.r_stats.done--;
break;
case STATS_DONE:
tbl.done++;
tbl.open--;
dtbl.r_stats.done++;
dtbl.r_stats.open--;
break;
case STATS_OCCUPANCY:
tbl.open++;
if (tbl.open + tbl.done > tbl.occ_hi)
tbl.occ_hi = tbl.open + tbl.done;
dtbl.r_stats.open++;
if (dtbl.r_stats.open + dtbl.r_stats.done > dtbl.r_stats.occ_hi)
dtbl.r_stats.occ_hi = dtbl.r_stats.open + dtbl.r_stats.done;
if (dtbl.r_stats.open + dtbl.r_stats.done > dtbl.r_stats.occ_hi_this)
dtbl.r_stats.occ_hi_this = dtbl.r_stats.open + dtbl.r_stats.done;
break;
case STATS_NODATA:
tbl.nodata++;
tbl.done--;
dtbl.w_stats.nodata++;
dtbl.r_stats.done--;
break;
default:
/* Unreachable */
AN(NULL);
}
AZ(pthread_mutex_unlock(&stats_update_lock));
AZ(pthread_mutex_unlock(&dtbl.r_stats.mutex));
}
......@@ -37,6 +37,7 @@
#include "trackrdrd.h"
#include "vas.h"
#include "vmb.h"
static pthread_mutex_t spmcq_deq_lock;
......@@ -48,6 +49,24 @@ spmcq_len(void)
return UINT_MAX - spmcq.head + 1 + spmcq.tail;
}
/*
* this is only approximately correct and could even become negative when values
* get updated while we read them!
*
*/
int SPMCQ_Len(void) {
unsigned l;
do {
l = spmcq_len();
if (l <= spmcq.mask + 1)
break;
VRMB();
} while (1);
return (l);
}
static void
spmcq_cleanup(void)
{
......@@ -60,7 +79,7 @@ SPMCQ_Init(void)
{
void *buf;
size_t n = 1 << (MIN_TABLE_SCALE + config.maxopen_scale);
size_t n = 1 << config.maxdone_scale;
buf = calloc(n, sizeof(void *));
if (buf == NULL)
return(errno);
......@@ -108,7 +127,7 @@ main(int argc, char * const *argv)
printf("\nTEST: %s\n", argv[0]);
printf("... test SMPCQ enqueue at UINT_MAX overflow\n");
config.maxopen_scale = 0;
config.maxopen_scale = 10;
SPMCQ_Init();
spmcq.head = spmcq.tail = UINT_MAX - 2;
......@@ -120,6 +139,7 @@ main(int argc, char * const *argv)
assert(SPMCQ_Enq(NULL));
assert(SPMCQ_Enq(NULL));
assert(spmcq_len() == 7);
assert(SPMCQ_Len() == 7);
printf("%s: 1 test run\n", argv[0]);
exit(0);
......
......@@ -4,7 +4,9 @@
* All rights reserved
* Use only with permission
*
* Author: Geoffrey Simmons <geoffrey.simmons@uplex.de>
* Authors: Geoffrey Simmons <geoffrey.simmons@uplex.de>
* Nils Goroll <nils.goroll@uplex.de>
*
* Portions adopted from varnishlog.c from the Varnish project
* Author: Poul-Henning Kamp <phk@phk.freebsd.dk>
* Copyright (c) 2006 Verdens Gang AS
......@@ -101,68 +103,142 @@ static struct sigaction terminate_action, dump_action, ignore_action,
static char cli_config_filename[BUFSIZ] = "";
#ifdef WITHOUT_ASSERTS
#define entry_assert(e, cond) do { (void)(e);(void)(cond);} while(0)
#else /* WITH_ASSERTS */
#define entry_assert(e, cond) \
do { \
if (!(cond)) \
entry_assert_failure(__func__, __FILE__, __LINE__, #cond, (e), errno, 0); \
} while (0)
static void assert_failure(const char *func, const char *file, int line, const char *cond,
int err, int xxx);
static void
entry_assert_failure(const char *func, const char *file, int line, const char *cond,
hashentry *he, int err, int xxx)
{
dataentry *de = he->de;
LOG_Log(LOG_ALERT, "Hashentry %p magic %0x state %u xid %u insert_time %f de %p",
(he), (he)->magic, (he)->state, (he)->xid, (he)->insert_time, (he)->de);
if (de)
LOG_Log(LOG_ALERT, "Dataentry %p magic %0x state %u xid %u tid %u end %u",
(de), (de)->magic, (de)->state, (de)->xid, (de)->tid, (de)->end);
else
LOG_Log(LOG_ALERT, "Dataentry %p NULL!", (de));
assert_failure(func, file, line, cond, err, xxx);
}
#endif
/*--------------------------------------------------------------------*/
static void
submit(unsigned xid)
assert_failure(const char *func, const char *file, int line, const char *cond,
int err, int xxx)
{
dataentry *entry;
entry = DATA_Find(xid);
CHECK_OBJ_NOTNULL(entry, DATA_MAGIC);
assert(entry->state == DATA_DONE);
LOG_Log(LOG_DEBUG, "submit: data=[%.*s]", entry->end, entry->data);
(void) xxx;
if (! entry->hasdata) {
entry->state = DATA_EMPTY;
MON_StatsUpdate(STATS_NODATA);
return;
LOG_Log(LOG_ALERT, "Condition (%s) failed in %s(), %s line %d",
cond, func, file, line);
if (err)
LOG_Log(LOG_ALERT, "errno = %d (%s)", err, strerror(err));
abort();
}
static inline void
check_entry(hashentry *he, unsigned xid, unsigned tid)
{
dataentry *de;
CHECK_OBJ_NOTNULL(he, HASH_MAGIC);
entry_assert(he, he->xid == xid);
entry_assert(he, he->state == HASH_OPEN);
de = he->de;
entry_assert(he, de != NULL);
entry_assert(he, de->magic == DATA_MAGIC);
entry_assert(he, de->xid == xid);
entry_assert(he, de->tid == tid);
}
static void
stacktrace(void)
{
void *buf[MAX_STACK_DEPTH];
int depth, i;
char **strings;
depth = backtrace (buf, MAX_STACK_DEPTH);
if (depth == 0) {
LOG_Log0(LOG_ERR, "Stacktrace empty");
return;
}
while (!SPMCQ_Enq((void *) entry)) {
tbl.wait_qfull++;
LOG_Log(LOG_ALERT, "%s", "Internal queue full, waiting for dequeue");
AZ(pthread_mutex_lock(&spmcq_nonfull_lock));
AZ(pthread_cond_wait(&spmcq_nonfull_cond, &spmcq_nonfull_lock));
strings = backtrace_symbols(buf, depth);
if (strings == NULL) {
LOG_Log0(LOG_ERR, "Cannot retrieve symbols for stacktrace");
return;
}
AZ(pthread_cond_signal(&spmcq_nonempty_cond));
tbl.submitted++;
/* XXX: get symbol names from nm? cf. cache_panic.c/pan_backtrace */
for (i = 0; i < depth; i++)
LOG_Log(LOG_ERR, "%s", strings[i]);
free(strings);
}
static inline dataentry
*find(unsigned xid, enum VSL_tag_e tag, unsigned fd, const char *ptr,
unsigned len)
static void
stacktrace_abort(int sig)
{
dataentry *entry;
LOG_Log(LOG_ALERT, "Received signal %d (%s), stacktrace follows", sig,
strsignal(sig));
stacktrace();
AZ(sigaction(SIGABRT, &default_action, NULL));
LOG_Log0(LOG_ALERT, "Aborting");
abort();
}
/*--------------------------------------------------------------------*/
entry = DATA_Find(xid);
if (entry == NULL) {
if (!term)
LOG_Log(LOG_WARNING,
"%s: XID %d not found, fd=%d, DISCARDING [%.*s]",
VSL_tags[tag], xid, fd, len, ptr);
return NULL;
}
CHECK_OBJ(entry, DATA_MAGIC);
assert(entry->xid == xid);
assert(entry->tid == fd);
assert(entry->state == DATA_OPEN);
static inline dataentry
*insert(unsigned xid, unsigned fd, float tim)
{
dataentry *de = DATA_noMT_Get();
hashentry *he = HASH_Insert(xid, de, tim);
if (! he) {
LOG_Log(LOG_WARNING, "Insert: Could not insert hash for XID %d",
xid);
DATA_noMT_Free(de);
return (NULL);
}
return entry;
/* he being filled out by Hash_Insert, we need to look after de */
de->xid = xid;
de->state = DATA_OPEN;
de->tid = fd;
de->hasdata = false;
sprintf(de->data, "XID=%d", xid);
de->end = strlen(de->data);
if (de->end > dtbl.w_stats.data_hi)
dtbl.w_stats.data_hi = de->end;
MON_StatsUpdate(STATS_OCCUPANCY);
return (de);
}
static inline void
append(dataentry *entry, enum VSL_tag_e tag, unsigned xid, char *data,
int datalen)
{
CHECK_OBJ_NOTNULL(entry, DATA_MAGIC);
/* Data overflow */
/* XXX: Encapsulate (1 << (config.maxdata_scale+10)) */
if (entry->end + datalen + 1 > (1 << (config.maxdata_scale+10))) {
if (entry->end + datalen + 1 > (1 << (config.maxdata_scale))) {
LOG_Log(LOG_ALERT,
"%s: Data too long, XID=%d, current length=%d, "
"DISCARDING data=[%.*s]", VSL_tags[tag], xid, entry->end,
datalen, data);
tbl.data_overflows++;
dtbl.w_stats.data_overflows++;
return;
}
......@@ -170,25 +246,55 @@ append(dataentry *entry, enum VSL_tag_e tag, unsigned xid, char *data,
entry->end++;
memcpy(&entry->data[entry->end], data, datalen);
entry->end += datalen;
if (entry->end > tbl.data_hi)
tbl.data_hi = entry->end;
if (entry->end > dtbl.w_stats.data_hi)
dtbl.w_stats.data_hi = entry->end;
return;
}
/*
* rules for reading VSL:
*
* Under all circumstances do we need to avoid to fall behind reading the VSL:
* - if we miss ReqEnd, we will clobber our hash, which has a bunch of negative
* consequences:
* - hash lookups become inefficient
* - inserts become more likely to fail
* - before we had HASH_Exp, the hash would become useless
* - if the VSL wraps, we will see corrupt data
*
* so if we really cannot create an entry at ReqStart time, we need to thow
* it away, and process the next log/end records to make room
*
*/
static int
OSL_Track(void *priv, enum VSL_tag_e tag, unsigned fd, unsigned len,
unsigned spec, const char *ptr, uint64_t bitmap)
{
unsigned xid;
dataentry *entry;
hashentry *he;
dataentry *de;
int err, datalen;
char *data, reqend_str[strlen(REQEND_T_VAR)+22];
struct timespec reqend_t;
float tim, tim_exp_check = 0.0;
/* wrap detection statistics */
static const char *pptr = (const char *)UINTPTR_MAX;
static unsigned wrap_start_xid = 0;
static unsigned wrap_end_xid = 0;
static unsigned last_start_xid = 0;
static unsigned last_end_xid = 0;
static unsigned xid_spread_sum = 0;
static unsigned xid_spread_count = 0;
(void) priv;
(void) bitmap;
if (term && tbl.open == 0)
if (term && htbl.open == 0)
return 1;
/* spec != 'c' */
......@@ -198,34 +304,28 @@ OSL_Track(void *priv, enum VSL_tag_e tag, unsigned fd, unsigned len,
switch (tag) {
case SLT_ReqStart:
if (term) return(0);
tbl.seen++;
htbl.seen++;
err = Parse_ReqStart(ptr, len, &xid);
AZ(err);
LOG_Log(LOG_DEBUG, "%s: XID=%d", VSL_tags[tag], xid);
LOG_Log(LOG_DEBUG, "%s: XID=%u", VSL_tags[tag], xid);
entry = DATA_Insert(xid);
if (entry == NULL) {
LOG_Log(LOG_ALERT,
"%s: Cannot insert data, XID=%d tid=%d DISCARDED",
VSL_tags[tag], xid, fd);
break;
}
CHECK_OBJ(entry, DATA_MAGIC);
entry->state = DATA_OPEN;
entry->xid = xid;
entry->tid = fd;
entry->hasdata = false;
sprintf(entry->data, "XID=%d", xid);
entry->end = strlen(entry->data);
if (entry->end > tbl.data_hi)
tbl.data_hi = entry->end;
MON_StatsUpdate(STATS_OCCUPANCY);
break;
if (xid > last_start_xid)
last_start_xid = xid;
tim = TIM_mono();
if (! insert(xid, fd, tim)) {
htbl.drop_reqstart++;
break;
}
/* configurable ? */
if ((tim - tim_exp_check) > 10) {
HASH_Exp(tim - htbl.ttl);
tim_exp_check = tim;
}
break;
case SLT_VCL_Log:
/* Skip VCL_Log entries without the "track " prefix. */
......@@ -235,33 +335,51 @@ OSL_Track(void *priv, enum VSL_tag_e tag, unsigned fd, unsigned len,
err = Parse_VCL_Log(&ptr[TRACKLOG_PREFIX_LEN], len-TRACKLOG_PREFIX_LEN,
&xid, &data, &datalen);
AZ(err);
LOG_Log(LOG_DEBUG, "%s: XID=%d, data=[%.*s]", VSL_tags[tag],
LOG_Log(LOG_DEBUG, "%s: XID=%u, data=[%.*s]", VSL_tags[tag],
xid, datalen, data);
entry = find(xid, tag, fd, ptr, len);
if (entry == NULL)
break;
append(entry, tag, xid, data, datalen);
entry->hasdata = true;
he = HASH_Find(xid);
if (! he) {
LOG_Log(LOG_WARNING, "%s: XID %d not found",
VSL_tags[tag], xid);
htbl.drop_vcl_log++;
break;
}
check_entry(he, xid, fd);
de = he->de;
append(de, tag, xid, data, datalen);
de->hasdata = true;
break;
case SLT_ReqEnd:
err = Parse_ReqEnd(ptr, len, &xid, &reqend_t);
AZ(err);
LOG_Log(LOG_DEBUG, "%s: XID=%d req_endt=%u.%09lu", VSL_tags[tag], xid,
LOG_Log(LOG_DEBUG, "%s: XID=%u req_endt=%u.%09lu", VSL_tags[tag], xid,
(unsigned) reqend_t.tv_sec, reqend_t.tv_nsec);
entry = find(xid, tag, fd, ptr, len);
if (entry == NULL)
break;
if (xid > last_end_xid)
last_end_xid = xid;
xid_spread_sum += (last_end_xid - last_start_xid);
xid_spread_count++;
he = HASH_Find(xid);
if (! he) {
LOG_Log(LOG_WARNING, "%s: XID %d not found",
VSL_tags[tag], xid);
htbl.drop_reqend++;
break;
}
check_entry(he, xid, fd);
de = he->de;
sprintf(reqend_str, "%s=%u.%09lu", REQEND_T_VAR,
(unsigned) reqend_t.tv_sec, reqend_t.tv_nsec);
append(entry, tag, xid, reqend_str, strlen(reqend_str));
entry->state = DATA_DONE;
append(de, tag, xid, reqend_str, strlen(reqend_str));
de->state = DATA_DONE;
MON_StatsUpdate(STATS_DONE);
submit(xid);
HASH_Submit(he);
break;
default:
......@@ -269,69 +387,40 @@ OSL_Track(void *priv, enum VSL_tag_e tag, unsigned fd, unsigned len,
AN(NULL);
return(1);
}
return(0);
}
/*--------------------------------------------------------------------*/
/* Handle for the PID file */
struct vpf_fh *pfh = NULL;
static void
read_default_config(void) {
if (access(DEFAULT_CONFIG, F_OK) == 0) {
if (access(DEFAULT_CONFIG, R_OK) != 0) {
perror(DEFAULT_CONFIG);
exit(EXIT_FAILURE);
}
printf("Reading config from %s\n", DEFAULT_CONFIG);
if (CONF_ReadFile(DEFAULT_CONFIG) != 0)
exit(EXIT_FAILURE);
/*
* log when the vsl ptr wraps, so we can relate lost records, if
* applicable
*/
if (ptr < pptr) {
LOG_Log(LOG_INFO, "VSL wrap at %u", xid);
if (wrap_start_xid) {
LOG_Log(LOG_INFO, "VSL wrap start xid %10u current %10u delta %10d",
wrap_start_xid, last_start_xid, (last_start_xid - wrap_start_xid));
LOG_Log(LOG_INFO, "VSL wrap end xid %10u current %10u delta %10d",
wrap_end_xid, last_end_xid, (last_end_xid - wrap_end_xid));
/* AAARRRGLLL, I confess: yes, I am calculating an average here */
LOG_Log(LOG_INFO, "VSL wrap xid spread is %u - avg xid spread is %f",
(last_start_xid - last_end_xid),
(1.0 * xid_spread_sum / xid_spread_count));
xid_spread_count = xid_spread_sum = 0;
}
wrap_start_xid = last_start_xid;
wrap_end_xid = last_end_xid;
}
}
pptr = ptr;
static void
assert_failure(const char *func, const char *file, int line, const char *cond,
int err, int xxx)
{
(void) xxx;
LOG_Log(LOG_ALERT, "Condition (%s) failed in %s(), %s line %d",
cond, func, file, line);
if (err)
LOG_Log(LOG_ALERT, "errno = %d (%s)", err, strerror(err));
abort();
return(0);
}
static void
stacktrace(void)
{
void *buf[MAX_STACK_DEPTH];
int depth, i;
char **strings;
depth = backtrace (buf, MAX_STACK_DEPTH);
if (depth == 0) {
LOG_Log0(LOG_ERR, "Stacktrace empty");
return;
}
strings = backtrace_symbols(buf, depth);
if (strings == NULL) {
LOG_Log0(LOG_ERR, "Cannot retrieve symbols for stacktrace");
return;
}
/* XXX: get symbol names from nm? cf. cache_panic.c/pan_backtrace */
for (i = 0; i < depth; i++)
LOG_Log(LOG_ERR, "%s", strings[i]);
free(strings);
}
/*--------------------------------------------------------------------*/
static void
dump(int sig)
{
(void) sig;
DATA_Dump();
HASH_Dump();
}
static void
......@@ -348,17 +437,23 @@ restart(int sig)
reload = 1;
}
/* Handle for the PID file */
struct vpf_fh *pfh = NULL;
static void
stacktrace_abort(int sig)
{
LOG_Log(LOG_ALERT, "Received signal %d (%s), stacktrace follows", sig,
strsignal(sig));
stacktrace();
AZ(sigaction(SIGABRT, &default_action, NULL));
LOG_Log0(LOG_ALERT, "Aborting");
abort();
read_default_config(void) {
if (access(DEFAULT_CONFIG, F_OK) == 0) {
if (access(DEFAULT_CONFIG, R_OK) != 0) {
perror(DEFAULT_CONFIG);
exit(EXIT_FAILURE);
}
printf("Reading config from %s\n", DEFAULT_CONFIG);
if (CONF_ReadFile(DEFAULT_CONFIG) != 0)
exit(EXIT_FAILURE);
}
}
static void
parent_shutdown(int status, pid_t child_pid)
{
......@@ -478,14 +573,30 @@ vsl_diag(void *priv, const char *fmt, ...)
va_end(ap);
}
static void
init_pthread_attrs(void)
{
AZ(pthread_mutexattr_init(&attr_lock));
AZ(pthread_condattr_init(&attr_cond));
// important to make mutex/cv efficient
AZ(pthread_mutexattr_setpshared(&attr_lock,
PTHREAD_PROCESS_PRIVATE));
AZ(pthread_condattr_setpshared(&attr_cond,
PTHREAD_PROCESS_PRIVATE));
}
static void
child_main(struct VSM_data *vd, int endless, int readconfig)
{
int errnum, nworkers;
int errnum;
const char *errmsg;
pthread_t monitor;
struct passwd *pw;
init_pthread_attrs();
MON_StatsInit();
LOG_Log0(LOG_INFO, "Worker process starting");
/* XXX: does not re-configure logging. Feature or bug? */
......@@ -519,6 +630,10 @@ child_main(struct VSM_data *vd, int endless, int readconfig)
LOG_Log(LOG_ERR, "Cannot init data table: %s", strerror(errno));
exit(EXIT_FAILURE);
}
if (HASH_Init() != 0) {
LOG_Log(LOG_ERR, "Cannot init hash table: %s", strerror(errno));
exit(EXIT_FAILURE);
}
VSM_Diag(vd, vsl_diag, NULL);
if (VSL_Open(vd, 1))
......@@ -557,8 +672,6 @@ child_main(struct VSM_data *vd, int endless, int readconfig)
exit(EXIT_FAILURE);
}
MON_StatsInit();
/* Start worker threads */
WRK_Start();
nworkers = WRK_Running();
......@@ -576,6 +689,7 @@ child_main(struct VSM_data *vd, int endless, int readconfig)
term = 0;
/* XXX: Varnish restart? */
/* XXX: TERM not noticed until request received */
DATA_noMT_Register();
while (VSL_Dispatch(vd, OSL_Track, NULL) > 0)
if (term || !endless)
break;
......@@ -745,7 +859,15 @@ main(int argc, char * const *argv)
AZ(sigemptyset(&restart_action.sa_mask));
restart_action.sa_flags &= ~SA_RESTART;
/* dont' get proper gdb backtraces with the handler in place */
#ifdef DISABLE_STACKTRACE
do {
void *foo;
foo = stacktrace_abort;
} while (0);
#else
stacktrace_action.sa_handler = stacktrace_abort;
#endif
ignore_action.sa_handler = SIG_IGN;
default_action.sa_handler = SIG_DFL;
......
......@@ -4,7 +4,8 @@
* All rights reserved
* Use only with permission
*
* Author: Geoffrey Simmons <geoffrey.simmons@uplex.de>
* Authors: Geoffrey Simmons <geoffrey.simmons@uplex.de>
* Nils Goroll <nils.goroll@uplex.de>
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
......@@ -34,8 +35,10 @@
#include <pthread.h>
#include <sys/types.h>
#include <time.h>
#include "vqueue.h"
#define MIN_TABLE_SCALE 10
#define MIN(x, y) ((x) < (y) ? (x) : (y))
#define MAX(x, y) ((x) < (y) ? (y) : (x))
/* sandbox.c */
......@@ -73,16 +76,71 @@ spmcq_t spmcq;
int SPMCQ_Init(void);
bool SPMCQ_Enq(void *ptr);
void *SPMCQ_Deq(void);
int SPMCQ_Len(void);
#define spmcq_wait(what) \
do { \
AZ(pthread_mutex_lock(&spmcq_##what##waiter_lock)); \
spmcq_##what##waiter++; \
AZ(pthread_cond_wait(&spmcq_##what##waiter_cond, \
&spmcq_##what##waiter_lock)); \
spmcq_##what##waiter--; \
AZ(pthread_mutex_unlock(&spmcq_##what##waiter_lock)); \
} while (0)
/*
* the first test is not synced, so we might enter the if body too late or
* unnecessarily
*
* * too late: doesn't matter, will come back next time
* * unnecessarily: we'll find out now
*/
#define spmcq_signal(what) \
do { \
if (spmcq_##what##waiter) { \
AZ(pthread_mutex_lock(&spmcq_##what##waiter_lock)); \
if (spmcq_##what##waiter) \
AZ(pthread_cond_signal(&spmcq_##what##waiter_cond)); \
AZ(pthread_mutex_unlock(&spmcq_##what##waiter_lock)); \
} \
} while (0)
/* Producer waits for this condition when the spmc queue is full.
Consumers signal this condition after dequeue. */
pthread_cond_t spmcq_nonfull_cond;
pthread_mutex_t spmcq_nonfull_lock;
pthread_cond_t spmcq_roomwaiter_cond;
pthread_mutex_t spmcq_roomwaiter_lock;
int spmcq_roomwaiter;
/* Consumers wait for this condition when the spmc queue is empty.
Producer signals this condition after enqueue. */
pthread_cond_t spmcq_nonempty_cond;
pthread_mutex_t spmcq_nonempty_lock;
pthread_cond_t spmcq_datawaiter_cond;
pthread_mutex_t spmcq_datawaiter_lock;
int spmcq_datawaiter;
/*
* should we wake up another worker?
*
* M = l / (u x p)
*
* l: arrival rate
* u: service rate
* p: utilization
*
* to get an optimal M, we would need to measure l and u, so to
* simplify, we just try to keep the number of workers proportional to
* the queue length
*
* wake up another worker if queue is sufficiently full
* Q_Len > working * qlen_goal / max_workers
*/
#define SPMCQ_need_worker(qlen, working, max_workers, qlen_goal) \
((qlen) > (working) * (qlen_goal) / max_workers)
/* stop workers when we have one more than we need */
#define SPMCQ_stop_worker(qlen, working, max_workers, qlen_goal) \
((qlen) < ((MAX(working,1)) - 1) * (qlen_goal) / max_workers)
/* mq.c */
const char *MQ_GlobalInit(void);
......@@ -100,48 +158,163 @@ typedef enum {
DATA_DONE
} data_state_e;
typedef struct {
unsigned magic;
struct dataentry_s {
unsigned magic;
#define DATA_MAGIC 0xb41cb1e1
data_state_e state;
unsigned xid;
unsigned tid; /* 'Thread ID', fd in the callback */
unsigned end; /* End of string index in data */
bool hasdata;
char *data;
} dataentry;
VSTAILQ_ENTRY(dataentry_s) freelist;
typedef struct {
unsigned magic;
#define DATATABLE_MAGIC 0xd3ef3bd4
const unsigned len;
unsigned collisions;
unsigned insert_probes;
unsigned find_probes;
unsigned seen; /* Records (ReqStarts) seen */
unsigned open;
unsigned done;
data_state_e state;
unsigned xid;
unsigned tid; /* 'Thread ID', fd in the callback */
unsigned end; /* End of string index in data */
bool hasdata;
bool incomplete; /* expired or evacuated */
char *data;
};
typedef struct dataentry_s dataentry;
VSTAILQ_HEAD(freehead_s, dataentry_s);
/* stats owned by VSL thread */
struct data_writer_stats_s {
unsigned nodata; /* Not submitted, no data */
unsigned submitted; /* Submitted to worker threads */
unsigned wait_qfull; /* Waits for SPMCQ - should not happen */
unsigned wait_room; /* waits for space in dtbl */
unsigned data_hi; /* max string length of entry->data */
#ifdef REMOVE
unsigned len_overflows;
#endif
unsigned data_overflows;
unsigned submitted; /* Submitted to worker threads */
unsigned nodata; /* Not submitted, no data */
};
/* stats protected by mutex */
struct data_reader_stats_s {
pthread_mutex_t mutex;
unsigned done;
unsigned open;
unsigned sent; /* Sent successfully to MQ */
unsigned failed; /* MQ send fails */
unsigned wait_qfull; /* Waits for SPMCQ */
unsigned occ_hi; /* Occupancy high water mark */
unsigned data_hi; /* Data high water mark */
dataentry *entry;
char *buf;
} datatable;
unsigned occ_hi_this; /* Occupancy high water mark this reporting interval*/
};
struct datatable_s {
unsigned magic;
#define DATATABLE_MAGIC 0xd3ef3bd4
unsigned len;
/* protected by freelist_lock */
struct freehead_s freehead;
pthread_mutex_t freelist_lock;
unsigned nfree;
dataentry *entry;
char *buf;
datatable tbl;
struct data_writer_stats_s w_stats;
struct data_reader_stats_s r_stats;
};
typedef struct datatable_s datatable;
datatable dtbl;
/* XXX: inline DATA_Insert and DATA_Find */
int DATA_Init(void);
dataentry *DATA_Insert(unsigned xid);
dataentry *DATA_Find(unsigned xid);
void DATA_Take_Freelist(struct freehead_s *dst);
void DATA_Return_Freelist(struct freehead_s *returned, unsigned nreturned);
/*
* the noMT functions are _not_ MT-safe, so they can only be called
* from the registered thread
*/
void DATA_noMT_Register(void);
dataentry *DATA_noMT_Get(void);
void DATA_noMT_Free(dataentry *de);
void DATA_noMT_Submit(dataentry *de);
void DATA_Dump1(dataentry *entry, int i);
void DATA_Dump(void);
/* hash.c */
typedef enum {
HASH_EMPTY = 0,
/* OPEN when the main thread is filling data, ReqEnd not yet seen. */
HASH_OPEN
/* hashes become HASH_EMPTY for DATA_DONE */
} hash_state_e;
struct hashentry_s {
unsigned magic;
#define HASH_MAGIC 0xf8e12130
/* set in HASH_Insert */
hash_state_e state;
unsigned xid; /* == de->xid */
float insert_time;
VTAILQ_ENTRY(hashentry_s) insert_list;
dataentry *de;
};
typedef struct hashentry_s hashentry;
VTAILQ_HEAD(insert_head_s, hashentry_s);
struct hashtable_s {
unsigned magic;
#define HASHTABLE_MAGIC 0x89ea1d00
unsigned len;
hashentry *entry;
struct insert_head_s insert_head;
/* config */
unsigned max_probes;
float ttl; /* max age for a record */
float mlt; /* min life time */
/* == stats == */
unsigned seen; /* Records (ReqStarts) seen */
/*
* records we have dropped because of no hash, no data
* or no entry
*/
unsigned drop_reqstart;
unsigned drop_vcl_log;
unsigned drop_reqend;
unsigned expired;
unsigned evacuated;
unsigned open;
unsigned collisions;
unsigned insert_probes;
unsigned find_probes;
unsigned fail; /* failed to get record - no space */
unsigned occ_hi; /* Occupancy high water mark */
unsigned occ_hi_this; /* Occupancy high water mark this reporting interval*/
};
typedef struct hashtable_s hashtable;
hashtable htbl;
int HASH_Init(void);
void HASH_Exp(float limit);
void HASH_Submit(hashentry *he);
void HASH_Evacuate(hashentry *he);
hashentry *HASH_Insert(const unsigned xid, dataentry *de, const float t);
hashentry *HASH_Find(unsigned xid);
void HASH_Dump1(hashentry *entry, int i);
void HASH_Dump(void);
/* config.c */
#define EMPTY(s) (s[0] == '\0')
......@@ -155,8 +328,53 @@ struct config {
unsigned monitor_interval;
bool monitor_workers;
char processor_log[BUFSIZ];
unsigned maxopen_scale;
unsigned maxdata_scale;
/* scale: unit is log(2,n), iow scale is taken to the power of 2 */
unsigned maxopen_scale; /* max number of records in *_OPEN state */
#define MIN_MAXOPEN_SCALE 10
unsigned maxdone_scale; /* max number of records in *_DONE state */
#define MIN_MAXDONE_SCALE 10
unsigned maxdata_scale; /* scale for char data buffer */
#define MIN_MAXDATA_SCALE 10
/*
* scale of queue-length goal:
*
* we scale te number of running workers dynamically propotionally to the
* queue length.
*
* this scale (log(2,n)) specifies the queue length at which all workers
* should be running
*/
unsigned qlen_goal_scale;
#define DEF_QLEN_GOAL_SCALE 10
/* max number of probes for insert/lookup */
unsigned hash_max_probes;
#define DEF_HASH_MAX_PROBES 10
/*
* hash_ttl: max ttl for entries in HASH_OPEN
*
* entries which are older than this ttl _may_ get expired from the
* trackrdrd state.
*
* set to a value significantly longer than your maximum session lifetime in
* varnish.
*/
unsigned hash_ttl;
#define DEF_HASH_TTL 120
/*
* hash_mlt: min lifetime for entries in HASH_OPEN before they could get evacuated
*
* entries are guaranteed to remain in trackrdrd for this duration.
* once the mlt is reached, they _may_ get expired if trackrdrd needs space
* in the hash
*/
unsigned hash_mlt;
#define DEF_HASH_MTL 5
char mq_uri[BUFSIZ];
char mq_qname[BUFSIZ];
unsigned nworkers;
......@@ -210,9 +428,6 @@ void MON_StatusShutdown(pthread_t monitor);
void MON_StatsInit(void);
void MON_StatsUpdate(stats_update_t update);
/* Mutex for multi-threaded stats updates. */
pthread_mutex_t stats_update_lock;
/* parse.c */
int Parse_XID(const char *str, int len, unsigned *xid);
int Parse_ReqStart(const char *ptr, int len, unsigned *xid);
......@@ -221,3 +436,9 @@ int Parse_ReqEnd(const char *ptr, unsigned len, unsigned *xid,
int Parse_VCL_Log(const char *ptr, int len, unsigned *xid,
char **data, int *datalen);
/* generic init attributes */
pthread_mutexattr_t attr_lock;
pthread_condattr_t attr_cond;
/* globals */
extern int nworkers;
......@@ -4,7 +4,8 @@
* All rights reserved
* Use only with permission
*
* Author: Geoffrey Simmons <geoffrey.simmons@uplex.de>
* Authors: Geoffrey Simmons <geoffrey.simmons@uplex.de>
* Nils Goroll <nils.goroll@uplex.de>
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
......@@ -37,27 +38,47 @@
#include "trackrdrd.h"
#include "vas.h"
#include "miniobj.h"
#include "vmb.h"
int nworkers = 0;
typedef enum {
WRK_NOTSTARTED = 0,
WRK_INITIALIZING,
WRK_RUNNING,
WRK_WAITING,
WRK_SHUTTINGDOWN,
WRK_EXITED
WRK_EXITED,
WRK_STATE_E_LIMIT
} wrk_state_e;
typedef struct {
static const char* statename[WRK_STATE_E_LIMIT] = {
[WRK_NOTSTARTED] = "not started",
[WRK_INITIALIZING] = "initializing",
[WRK_RUNNING] = "running",
[WRK_WAITING] = "waiting",
[WRK_SHUTTINGDOWN] = "shutting down",
[WRK_EXITED] = "exited"};
struct worker_data_s {
unsigned magic;
#define WORKER_DATA_MAGIC 0xd8eef137
unsigned id;
pthread_t tid;
unsigned status; /* exit status */
wrk_state_e state;
/* per-worker freelist - return space in chunks */
struct freehead_s wrk_freelist;
unsigned wrk_nfree;
/* stats */
unsigned deqs;
unsigned waits;
unsigned sends;
unsigned fails;
} worker_data_t;
};
typedef struct worker_data_s worker_data_t;
typedef struct {
pthread_t worker;
......@@ -66,8 +87,6 @@ typedef struct {
static unsigned run, cleaned = 0;
static thread_data_t *thread_data;
static const char* statename[5] = { "not started", "initializing", "running",
"shutting down", "exited" };
static inline void
wrk_send(void *amq_worker, dataentry *entry, worker_data_t *wrk)
......@@ -78,6 +97,7 @@ wrk_send(void *amq_worker, dataentry *entry, worker_data_t *wrk)
assert(entry->state == DATA_DONE);
AN(amq_worker);
/* XXX: report entry->incomplete to backend ? */
err = MQ_Send(amq_worker, entry->data, entry->end);
if (err != NULL) {
/* XXX: error recovery? reconnect? preserve the data? */
......@@ -94,9 +114,16 @@ wrk_send(void *amq_worker, dataentry *entry, worker_data_t *wrk)
entry->end, entry->data);
}
entry->state = DATA_EMPTY;
/* From Varnish vmb.h -- platform-independent write memory barrier */
VWMB();
AZ(pthread_cond_signal(&spmcq_nonfull_cond));
VSTAILQ_INSERT_TAIL(&wrk->wrk_freelist, entry, freelist);
wrk->wrk_nfree++;
if (dtbl.nfree == 0) {
DATA_Return_Freelist(&wrk->wrk_freelist, wrk->wrk_nfree);
wrk->wrk_nfree = 0;
assert(VSTAILQ_EMPTY(&wrk->wrk_freelist));
}
spmcq_signal(room);
}
static void
......@@ -110,7 +137,8 @@ static void
LOG_Log(LOG_INFO, "Worker %d: starting", wrk->id);
CHECK_OBJ_NOTNULL(wrk, WORKER_DATA_MAGIC);
wrk->state = WRK_INITIALIZING;
wrk->tid = pthread_self();
err = MQ_WorkerInit(&amq_worker);
if (err != NULL) {
LOG_Log(LOG_ALERT, "Worker %d: Cannot initialize queue connection: %s",
......@@ -120,6 +148,9 @@ static void
pthread_exit((void *) wrk);
}
VSTAILQ_INIT(&wrk->wrk_freelist);
wrk->wrk_nfree = 0;
wrk->state = WRK_RUNNING;
while (run) {
......@@ -127,20 +158,49 @@ static void
if (entry != NULL) {
wrk->deqs++;
wrk_send(amq_worker, entry, wrk);
continue;
/* should we go to sleep ? */
if (SPMCQ_stop_worker(SPMCQ_Len(), (nworkers - spmcq_datawaiter), nworkers, (1 << config.qlen_goal_scale)))
goto sleep;
continue;
}
/* Queue is empty, wait until data are available, or quit is
signaled.
Grab the CV lock, which also constitutes an implicit memory
barrier */
AZ(pthread_mutex_lock(&spmcq_nonempty_lock));
/* run is guaranteed to be fresh here */
if (run) {
wrk->waits++;
AZ(pthread_cond_wait(&spmcq_nonempty_cond,
&spmcq_nonempty_lock));
sleep:
/* return space before sleeping */
if (wrk->wrk_nfree > 0) {
DATA_Return_Freelist(&wrk->wrk_freelist, wrk->wrk_nfree);
wrk->wrk_nfree = 0;
}
/*
* Queue is empty or we should backoff
*
* wait until data are available, or quit is signaled.
*
* Grab the CV lock, which also constitutes an implicit memory
* barrier
*/
AZ(pthread_mutex_lock(&spmcq_datawaiter_lock));
/*
* run is guaranteed to be fresh here
*
* also re-check the stop condition under the lock
*/
if (run &&
((! entry) ||
SPMCQ_stop_worker(SPMCQ_Len(), (nworkers - spmcq_datawaiter),
nworkers, (1 << config.qlen_goal_scale)))) {
wrk->waits++;
spmcq_datawaiter++;
wrk->state = WRK_WAITING;
AZ(pthread_cond_wait(&spmcq_datawaiter_cond,
&spmcq_datawaiter_lock));
spmcq_datawaiter--;
wrk->state = WRK_RUNNING;
}
AZ(pthread_mutex_unlock(&spmcq_nonempty_lock));
AZ(pthread_mutex_unlock(&spmcq_datawaiter_lock));
}
wrk->state = WRK_SHUTTINGDOWN;
......@@ -171,10 +231,10 @@ static void wrk_cleanup(void)
for (int i = 0; i < config.nworkers; i++)
free(thread_data[i].wrk_data);
free(thread_data);
AZ(pthread_mutex_destroy(&spmcq_nonempty_lock));
AZ(pthread_cond_destroy(&spmcq_nonempty_cond));
AZ(pthread_mutex_destroy(&spmcq_nonfull_lock));
AZ(pthread_cond_destroy(&spmcq_nonfull_cond));
AZ(pthread_mutex_destroy(&spmcq_datawaiter_lock));
AZ(pthread_cond_destroy(&spmcq_datawaiter_cond));
AZ(pthread_mutex_destroy(&spmcq_roomwaiter_lock));
AZ(pthread_cond_destroy(&spmcq_roomwaiter_cond));
cleaned = 1;
}
......@@ -183,6 +243,7 @@ WRK_Init(void)
{
thread_data
= (thread_data_t *) malloc(config.nworkers * sizeof(thread_data_t));
if (thread_data == NULL) {
LOG_Log(LOG_ALERT, "Cannot allocate thread data: %s", strerror(errno));
return(errno);
......@@ -204,11 +265,18 @@ WRK_Init(void)
wrk->deqs = wrk->waits = wrk->sends = wrk->fails = 0;
wrk->state = WRK_NOTSTARTED;
}
AZ(pthread_mutex_init(&spmcq_nonempty_lock, NULL));
AZ(pthread_cond_init(&spmcq_nonempty_cond, NULL));
AZ(pthread_mutex_init(&spmcq_nonfull_lock, NULL));
AZ(pthread_cond_init(&spmcq_nonfull_cond, NULL));
spmcq_datawaiter = 0;
AZ(pthread_mutex_init(&spmcq_datawaiter_lock, &attr_lock));
AZ(pthread_cond_init(&spmcq_datawaiter_cond, &attr_cond));
spmcq_roomwaiter = 0;
AZ(pthread_mutex_init(&spmcq_roomwaiter_lock, &attr_lock));
AZ(pthread_cond_init(&spmcq_roomwaiter_cond, &attr_cond));
AZ(pthread_mutexattr_destroy(&attr_lock));
AZ(pthread_condattr_destroy(&attr_cond));
atexit(wrk_cleanup);
return 0;
}
......@@ -231,8 +299,8 @@ WRK_Stats(void)
for (int i = 0; i < config.nworkers; i++) {
wrk = thread_data[i].wrk_data;
LOG_Log(LOG_INFO, "Worker %d (%s): seen=%d waits=%d sent=%d failed=%d",
wrk->id, statename[wrk->state], wrk->deqs, wrk->waits, wrk->sends,
LOG_Log(LOG_INFO, "Worker %d tid %u (%s): seen=%d waits=%d sent=%d failed=%d",
wrk->id, wrk->tid, statename[wrk->state], wrk->deqs, wrk->waits, wrk->sends,
wrk->fails);
}
}
......@@ -248,7 +316,9 @@ WRK_Running(void)
wrk = thread_data[i].wrk_data;
if (wrk->state > WRK_INITIALIZING)
initialized++;
if (wrk->state == WRK_RUNNING || wrk->state == WRK_SHUTTINGDOWN)
if (wrk->state == WRK_RUNNING ||
wrk->state == WRK_SHUTTINGDOWN ||
wrk->state == WRK_WAITING)
running++;
}
if (initialized == config.nworkers)
......@@ -260,14 +330,14 @@ void
WRK_Halt(void)
{
/*
* must only modify run under spmcq_nonempty_lock to ensure that
* must only modify run under spmcq_datawaiter_lock to ensure that
* we signal all waiting consumers (otherwise a consumer could go
* waiting _after_ we have broadcasted and so miss the event.
*/
AZ(pthread_mutex_lock(&spmcq_nonempty_lock));
AZ(pthread_mutex_lock(&spmcq_datawaiter_lock));
run = 0;
AZ(pthread_cond_broadcast(&spmcq_nonempty_cond));
AZ(pthread_mutex_unlock(&spmcq_nonempty_lock));
AZ(pthread_cond_broadcast(&spmcq_datawaiter_cond));
AZ(pthread_mutex_unlock(&spmcq_datawaiter_lock));
for(int i = 0; i < config.nworkers; i++) {
AZ(pthread_join(thread_data[i].worker,
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment