Commit 092db7ff authored by Geoff Simmons's avatar Geoff Simmons

trackrdrd code reorg:

	- child process in child.c (including hashing code)
	- common signal handlers in handler.c
	- other code common to parent and child in trackrdrd.h & config.c
parent b855aedf
......@@ -10,7 +10,6 @@ trackrdrd_SOURCES = \
parse.c \
log.c \
config.c \
hash.c \
data.c \
monitor.c \
mq.c \
......@@ -18,7 +17,9 @@ trackrdrd_SOURCES = \
activemq/amq.cpp \
spmcq.c \
worker.c \
sandbox.c
sandbox.c \
child.c \
handler.c
trackrdrd_LDADD = \
$(VARNISHSRC)/lib/libvarnishcompat/libvarnishcompat.la \
......
......@@ -290,6 +290,20 @@ CONF_ReadFile(const char *file) {
return(0);
}
/* XXX: stdout is /dev/null in child process */
int
CONF_ReadDefault(void)
{
if (access(DEFAULT_CONFIG, F_OK) == 0) {
if (access(DEFAULT_CONFIG, R_OK) != 0)
return(errno);
printf("Reading config from %s\n", DEFAULT_CONFIG);
if (CONF_ReadFile(DEFAULT_CONFIG) != 0)
return -1;
}
return 0;
}
#define confdump(str,val) \
LOG_Log(LOG_DEBUG, "config: " str, (val))
......
......@@ -29,21 +29,25 @@
*
*/
PARENT(SIGTERM, terminate_action);
PARENT(SIGINT, terminate_action);
PARENT(SIGHUP, restart_action);
PARENT(SIGUSR1, restart_action);
PARENT(SIGUSR2, ignore_action);
PARENT(SIGHUP, restart_action);
#ifndef DISABLE_STACKTRACE
PARENT(SIGABRT, stacktrace_action);
PARENT(SIGSEGV, stacktrace_action);
PARENT(SIGBUS, stacktrace_action);
#endif
CHILD(SIGTERM, terminate_action);
CHILD(SIGINT, terminate_action);
CHILD(SIGUSR1, dump_action);
CHILD(SIGUSR2, ignore_action);
CHILD(SIGHUP, ignore_action);
#ifndef DISABLE_STACKTRACE
CHILD(SIGABRT, stacktrace_action);
CHILD(SIGSEGV, stacktrace_action);
CHILD(SIGBUS, stacktrace_action);
#endif
......@@ -52,7 +52,6 @@
#include <sys/wait.h>
#include <sys/types.h>
#include <pwd.h>
#include <limits.h>
#ifndef HAVE_EXECINFO_H
#include "compat/execinfo.h"
......@@ -73,16 +72,6 @@
#include "revision.h"
#include "usage.h"
#define TRACK_TAGS "ReqStart,VCL_log,ReqEnd"
#define DEFAULT_CONFIG "/etc/trackrdrd.conf"
/* XXX: should these be configurable ? */
#define TRACKLOG_PREFIX "track "
#define TRACKLOG_PREFIX_LEN (sizeof(TRACKLOG_PREFIX)-1)
#define MAX_STACK_DEPTH 100
#define REQEND_T_VAR "req_endt"
/* Hack, because we cannot have #ifdef in the macro definition SIGDISP */
#define _UNDEFINED(SIG) ((#SIG)[0] == 0)
#define UNDEFINED(SIG) _UNDEFINED(SIG)
......@@ -95,118 +84,18 @@
strerror(errno)); \
} while(0)
static void child_main(struct VSM_data *vd, int endless, int readconfig);
static volatile sig_atomic_t term, reload;
static struct sigaction terminate_action, dump_action, ignore_action,
stacktrace_action, default_action, restart_action;
static char cli_config_filename[BUFSIZ] = "";
static int wrk_running = 0;
/* Local freelist */
static struct freehead_s reader_freelist =
VSTAILQ_HEAD_INITIALIZER(reader_freelist);
typedef enum {
HASH_EMPTY = 0,
/* OPEN when the main thread is filling data, ReqEnd not yet seen. */
HASH_OPEN
/* hashes become HASH_EMPTY for DATA_DONE */
} hash_state_e;
struct hashentry_s {
unsigned magic;
#define HASH_MAGIC 0xf8e12130
/* set in HASH_Insert */
hash_state_e state;
unsigned xid; /* == de->xid */
float insert_time;
VTAILQ_ENTRY(hashentry_s) insert_list;
dataentry *de;
};
typedef struct hashentry_s hashentry;
VTAILQ_HEAD(insert_head_s, hashentry_s);
struct hashtable_s {
unsigned magic;
#define HASHTABLE_MAGIC 0x89ea1d00
unsigned len;
hashentry *entry;
struct insert_head_s insert_head;
/* config */
unsigned max_probes;
float ttl; /* max age for a record */
float mlt; /* min life time */
/* == stats == */
unsigned seen; /* Records (ReqStarts) seen */
/*
* records we have dropped because of no hash, no data
* or no entry
*/
unsigned drop_reqstart;
unsigned drop_vcl_log;
unsigned drop_reqend;
unsigned expired;
unsigned evacuated;
unsigned open;
unsigned collisions;
unsigned insert_probes;
unsigned find_probes;
unsigned fail; /* failed to get record - no space */
unsigned occ_hi; /* Occupancy high water mark */
unsigned occ_hi_this; /* Occupancy high water mark this reporting
interval*/
};
typedef struct hashtable_s hashtable;
static hashtable htbl;
static volatile sig_atomic_t reload;
#ifdef WITHOUT_ASSERTS
#define entry_assert(e, cond) do { (void)(e);(void)(cond);} while(0)
#else /* WITH_ASSERTS */
#define entry_assert(e, cond) \
do { \
if (!(cond)) \
entry_assert_failure(__func__, __FILE__, __LINE__, #cond, (e), \
errno, 0); \
} while (0)
static struct sigaction restart_action;
static void assert_failure(const char *func, const char *file, int line,
const char *cond, int err, int xxx);
/*--------------------------------------------------------------------*/
static void
entry_assert_failure(const char *func, const char *file, int line,
const char *cond, hashentry *he, int err, int xxx)
restart(int sig)
{
dataentry *de = he->de;
LOG_Log(LOG_ALERT,
"Hashentry %p magic %0x state %u xid %u insert_time %f de %p",
(he), (he)->magic, (he)->state, (he)->xid, (he)->insert_time, (he)->de);
if (de)
LOG_Log(LOG_ALERT,
"Dataentry %p magic %0x state %u xid %u tid %u end %u",
(de), (de)->magic, (de)->state, (de)->xid, (de)->tid, (de)->end);
else
LOG_Log(LOG_ALERT, "Dataentry %p NULL!", (de));
assert_failure(func, file, line, cond, err, xxx);
(void) sig;
reload = 1;
}
#endif
/*--------------------------------------------------------------------*/
static void
assert_failure(const char *func, const char *file, int line, const char *cond,
......@@ -221,719 +110,11 @@ assert_failure(const char *func, const char *file, int line, const char *cond,
abort();
}
static inline void
check_entry(hashentry *he, unsigned xid, unsigned tid)
{
dataentry *de;
CHECK_OBJ_NOTNULL(he, HASH_MAGIC);
entry_assert(he, he->xid == xid);
entry_assert(he, he->state == HASH_OPEN);
de = he->de;
entry_assert(he, de != NULL);
entry_assert(he, de->magic == DATA_MAGIC);
entry_assert(he, de->xid == xid);
entry_assert(he, de->tid == tid);
}
static void
stacktrace(void)
{
void *buf[MAX_STACK_DEPTH];
int depth, i;
char **strings;
depth = backtrace (buf, MAX_STACK_DEPTH);
if (depth == 0) {
LOG_Log0(LOG_ERR, "Stacktrace empty");
return;
}
strings = backtrace_symbols(buf, depth);
if (strings == NULL) {
LOG_Log0(LOG_ERR, "Cannot retrieve symbols for stacktrace");
return;
}
/* XXX: get symbol names from nm? cf. cache_panic.c/pan_backtrace */
for (i = 0; i < depth; i++)
LOG_Log(LOG_ERR, "%s", strings[i]);
free(strings);
}
static void
stacktrace_abort(int sig)
{
LOG_Log(LOG_ALERT, "Received signal %d (%s), stacktrace follows", sig,
strsignal(sig));
stacktrace();
AZ(sigaction(SIGABRT, &default_action, NULL));
LOG_Log0(LOG_ALERT, "Aborting");
abort();
}
/*--------------------------------------------------------------------*/
/* efficiently retrieve a single data entry */
static inline dataentry
*data_get(void)
{
dataentry *data;
while (VSTAILQ_EMPTY(&reader_freelist)) {
while (dtbl.nfree == 0) {
dtbl.w_stats.wait_room++;
spmcq_wait(room);
}
DATA_Take_Freelist(&reader_freelist);
}
data = VSTAILQ_FIRST(&reader_freelist);
VSTAILQ_REMOVE_HEAD(&reader_freelist, freelist);
assert(data->state == DATA_EMPTY);
return (data);
}
/* return to our own local cache */
static inline void
data_free(dataentry *de)
{
assert(de->state == DATA_EMPTY);
VSTAILQ_INSERT_HEAD(&reader_freelist, de, freelist);
}
static inline void
data_submit(dataentry *de)
{
CHECK_OBJ_NOTNULL(de, DATA_MAGIC);
assert(de->state == DATA_DONE);
LOG_Log(LOG_DEBUG, "submit: data=[%.*s]", de->end, de->data);
if (de->hasdata == false) {
de->state = DATA_EMPTY;
MON_StatsUpdate(STATS_NODATA);
data_free(de);
return;
}
while (!SPMCQ_Enq((void *) de)) {
dtbl.w_stats.wait_qfull++;
LOG_Log(LOG_ALERT, "%s", "Internal queue full, waiting for dequeue");
spmcq_wait(room);
}
dtbl.w_stats.submitted++;
/* should we wake up another worker? */
if (SPMCQ_NeedWorker(wrk_running))
spmcq_signal(data);
/*
* base case: wake up a worker if all are sleeping
*
* this is an un-synced access to spmcq_data_waiter, but
* if we don't wake them up now, we will next time around
*/
if (wrk_running == spmcq_datawaiter)
spmcq_signal(data);
}
/*--------------------------------------------------------------------*/
#define INDEX(u) ((u) & (htbl.len - 1))
/*
* N.B.: Hash functions defined for XIDs, which are declared in Varnish as
* unsigned int, assuming that they are 32 bit.
*/
#if UINT_MAX != UINT32_MAX
#error "Unsigned ints are not 32 bit"
#endif
#define rotr(v,n) (((v) >> (n)) | ((v) << (32 - (n))))
#define USE_JENKMULVEY1
#define h1(k) jenkmulvey1(k)
#define h2(k) wang(k)
#ifdef USE_JENKMULVEY1
/*
* http://home.comcast.net/~bretm/hash/3.html
* Bret Mulvey ascribes this to Bob Jenkins, but I can't find any
* reference to it by Jenkins himself.
*/
static uint32_t
jenkmulvey1(uint32_t n)
{
n += (n << 12);
n ^= (n >> 22);
n += (n << 4);
n ^= (n >> 9);
n += (n << 10);
n ^= (n >> 2);
n += (n << 7);
n ^= (n >> 12);
return(n);
}
#endif
#ifdef USE_JENKMULVEY2
/*
* http://home.comcast.net/~bretm/hash/4.html
* Mulvey's modification of the (alleged) Jenkins algorithm
*/
static uint32_t
jenkmulvey2(uint32_t n)
{
n += (n << 16);
n ^= (n >> 13);
n += (n << 4);
n ^= (n >> 7);
n += (n << 10);
n ^= (n >> 5);
n += (n << 8);
n ^= (n >> 16);
return(n);
}
#endif
/*
* http://www.cris.com/~Ttwang/tech/inthash.htm
*/
static uint32_t
wang(uint32_t n)
{
n = ~n + (n << 15); // n = (n << 15) - n - 1;
n ^= rotr(n,12);
n += (n << 2);
n ^= rotr(n,4);
n = (n + (n << 3)) + (n << 11);
n ^= rotr(n,16);
return n;
}
void
HASH_Stats(void)
{
LOG_Log(LOG_INFO,
"Hash table: "
"len=%u "
"seen=%u "
"drop_reqstart=%u "
"drop_vcl_log=%u "
"drop_reqend=%u "
"expired=%u "
"evacuated=%u "
"open=%u "
"load=%.2f "
"collisions=%u "
"insert_probes=%u "
"find_probes=%u "
"fail=%u "
"occ_hi=%u "
"occ_hi_this=%u ",
htbl.len,
htbl.seen,
htbl.drop_reqstart,
htbl.drop_vcl_log,
htbl.drop_reqend,
htbl.expired,
htbl.evacuated,
htbl.open,
100.0 * htbl.open / htbl.len,
htbl.collisions,
htbl.insert_probes,
htbl.find_probes,
htbl.fail,
htbl.occ_hi,
htbl.occ_hi_this);
htbl.occ_hi_this = 0;
}
static void
hash_cleanup(void)
{
free(htbl.entry);
}
static int
hash_init(void)
{
hashentry *entryptr;
int entries = 1 << config.maxopen_scale;
entryptr = (hashentry *) calloc(entries, sizeof(hashentry));
if (entryptr == NULL)
return(errno);
memset(&htbl, 0, sizeof(hashtable));
htbl.magic = HASHTABLE_MAGIC;
htbl.len = entries;
htbl.entry = entryptr;
VTAILQ_INIT(&htbl.insert_head);
htbl.max_probes = config.hash_max_probes;
htbl.ttl = config.hash_ttl;
htbl.mlt = config.hash_mlt;
/* entries init */
for (int i = 0; i < entries; i++) {
htbl.entry[i].magic = HASH_MAGIC;
htbl.entry[i].state = HASH_EMPTY;
}
atexit(hash_cleanup);
return(0);
}
static inline void
hash_free(hashentry *he)
{
VTAILQ_REMOVE(&htbl.insert_head, he, insert_list);
he->state = HASH_EMPTY;
he->de = NULL;
htbl.open--;
}
static inline void
hash_submit(hashentry *he)
{
dataentry *de = he->de;
assert(he->xid == de->xid);
data_submit(de);
}
static inline void
incomplete(hashentry *he)
{
dataentry *de;
de = he->de;
CHECK_OBJ_NOTNULL(de, DATA_MAGIC);
de->incomplete = true;
MON_StatsUpdate(STATS_DONE);
de->state = DATA_DONE;
}
static void
hash_exp(float limit)
{
hashentry *he;
float p_insert_time = 0.0;
while ((he = VTAILQ_FIRST(&htbl.insert_head))) {
CHECK_OBJ_NOTNULL(he, HASH_MAGIC);
assert(he->state == HASH_OPEN);
if (he->insert_time > limit)
return;
assert(p_insert_time <= he->insert_time);
p_insert_time = he->insert_time;
LOG_Log(LOG_DEBUG, "expire: hash=%u insert_time=%f limit=%f",
he->xid, he->insert_time, limit);
htbl.expired++;
incomplete(he);
hash_submit(he);
hash_free(he);
}
}
static inline void
submit(hashentry *he)
{
CHECK_OBJ_NOTNULL(he, HASH_MAGIC);
assert(he->state == HASH_OPEN);
LOG_Log(LOG_DEBUG, "submit: hash=%u", he->xid);
hash_submit(he);
hash_free(he);
}
/* like Submit, but for recrods in HASH_OPEN */
static void
hash_evacuate(hashentry *he)
{
CHECK_OBJ_NOTNULL(he, HASH_MAGIC);
assert(he->state == HASH_OPEN);
LOG_Log(LOG_DEBUG, "evacuate: hash=%u insert_time=%f",
he->xid, he->insert_time);
htbl.evacuated++;
incomplete(he);
hash_submit(he);
hash_free(he);
}
static hashentry
*hash_insert(const unsigned xid, dataentry *de, const float t)
{
hashentry *he, *oldest;
unsigned probes = 0;
uint32_t h = h1(xid);
const uint32_t h2 = h2(xid);
he = &htbl.entry[INDEX(h)];
if (he->state == HASH_EMPTY)
goto ok;
htbl.collisions++;
oldest = he;
do {
h += h2;
he = &htbl.entry[INDEX(h)];
probes++;
if (he->state == HASH_EMPTY)
goto ok;
if (he->insert_time < oldest->insert_time)
oldest = he;
} while (probes <= htbl.max_probes);
/* none eligable for evacuation */
if ((oldest->insert_time + htbl.mlt) > t) {
htbl.fail++;
htbl.insert_probes += probes;
return (NULL);
}
hash_evacuate(oldest);
he = oldest;
ok:
htbl.insert_probes += probes;
he->state = HASH_OPEN;
he->xid = xid;
he->insert_time = t;
VTAILQ_INSERT_TAIL(&htbl.insert_head, he, insert_list);
he->de = de;
/* stats */
htbl.open++;
if (htbl.open > htbl.occ_hi)
htbl.occ_hi = htbl.open;
if (htbl.open > htbl.occ_hi_this)
htbl.occ_hi_this = htbl.open;
return(he);
}
static hashentry
*hash_find(const unsigned xid)
{
hashentry *he;
unsigned probes = 0;
uint32_t h = h1(xid);
const uint32_t h2 = h2(xid);
he = &htbl.entry[INDEX(h)];
if (he->xid == xid)
return (he);
do {
h += h2;
he = &htbl.entry[INDEX(h)];
probes++;
if (he->xid == xid)
break;
} while (probes <= htbl.max_probes);
htbl.find_probes += probes;
if (probes > htbl.max_probes)
return NULL;
return (he);
}
static void
hash_dump1(hashentry *entry, int i)
{
if (entry->state == HASH_EMPTY)
return;
LOG_Log(LOG_INFO, "Hash entry %d: XID=%d",
i, entry->xid);
DATA_Dump1(entry->de, 0);
assert(entry->xid == entry->de->xid);
}
static void
hash_dump(void)
{
for (int i = 0; i < htbl.len; i++)
hash_dump1(&htbl.entry[i], i);
}
/*--------------------------------------------------------------------*/
static inline dataentry
*insert(unsigned xid, unsigned fd, float tim)
{
dataentry *de = data_get();
hashentry *he = hash_insert(xid, de, tim);
if (! he) {
LOG_Log(LOG_WARNING, "Insert: Could not insert hash for XID %d",
xid);
data_free(de);
return (NULL);
}
/* he being filled out by Hash_Insert, we need to look after de */
de->xid = xid;
de->state = DATA_OPEN;
de->tid = fd;
de->hasdata = false;
sprintf(de->data, "XID=%d", xid);
de->end = strlen(de->data);
if (de->end > dtbl.w_stats.data_hi)
dtbl.w_stats.data_hi = de->end;
MON_StatsUpdate(STATS_OCCUPANCY);
return (de);
}
static inline void
append(dataentry *entry, enum VSL_tag_e tag, unsigned xid, char *data,
int datalen)
{
CHECK_OBJ_NOTNULL(entry, DATA_MAGIC);
/* Data overflow */
if (entry->end + datalen + 1 > (1 << (config.maxdata_scale))) {
LOG_Log(LOG_ALERT,
"%s: Data too long, XID=%d, current length=%d, "
"DISCARDING data=[%.*s]", VSL_tags[tag], xid, entry->end,
datalen, data);
dtbl.w_stats.data_overflows++;
return;
}
entry->data[entry->end] = '&';
entry->end++;
memcpy(&entry->data[entry->end], data, datalen);
entry->end += datalen;
if (entry->end > dtbl.w_stats.data_hi)
dtbl.w_stats.data_hi = entry->end;
return;
}
/*
* rules for reading VSL:
*
* Under all circumstances do we need to avoid to fall behind reading the VSL:
* - if we miss ReqEnd, we will clobber our hash, which has a bunch of negative
* consequences:
* - hash lookups become inefficient
* - inserts become more likely to fail
* - before we had hash_Exp, the hash would become useless
* - if the VSL wraps, we will see corrupt data
*
* so if we really cannot create an entry at ReqStart time, we need to thow
* it away, and process the next log/end records to make room
*
*/
static int
OSL_Track(void *priv, enum VSL_tag_e tag, unsigned fd, unsigned len,
unsigned spec, const char *ptr, uint64_t bitmap)
{
unsigned xid;
hashentry *he;
dataentry *de;
int err, datalen;
char *data, reqend_str[strlen(REQEND_T_VAR)+22];
struct timespec reqend_t;
float tim, tim_exp_check = 0.0;
/* wrap detection statistics */
static const char *pptr = (const char *)UINTPTR_MAX;
static unsigned wrap_start_xid = 0;
static unsigned wrap_end_xid = 0;
static unsigned last_start_xid = 0;
static unsigned last_end_xid = 0;
static unsigned xid_spread_sum = 0;
static unsigned xid_spread_count = 0;
(void) priv;
(void) bitmap;
if (term && htbl.open == 0)
return 1;
if (wrk_running < config.nworkers) {
wrk_running = WRK_Running();
if (wrk_running < config.nworkers)
LOG_Log(LOG_ALERT, "%d of %d workers running", wrk_running,
config.nworkers);
}
/* spec != 'c' */
if ((spec & VSL_S_CLIENT) == 0)
LOG_Log(LOG_WARNING, "%s: Client bit ('c') not set [%.*s]",
VSL_tags[tag], len, ptr);
switch (tag) {
case SLT_ReqStart:
if (term) return(0);
htbl.seen++;
err = Parse_ReqStart(ptr, len, &xid);
AZ(err);
LOG_Log(LOG_DEBUG, "%s: XID=%u", VSL_tags[tag], xid);
if (xid > last_start_xid)
last_start_xid = xid;
tim = TIM_mono();
if (! insert(xid, fd, tim)) {
htbl.drop_reqstart++;
break;
}
/* configurable ? */
if ((tim - tim_exp_check) > 10) {
hash_exp(tim - htbl.ttl);
tim_exp_check = tim;
}
break;
case SLT_VCL_Log:
/* Skip VCL_Log entries without the "track " prefix. */
if (strncmp(ptr, TRACKLOG_PREFIX, TRACKLOG_PREFIX_LEN) != 0)
break;
err = Parse_VCL_Log(&ptr[TRACKLOG_PREFIX_LEN], len-TRACKLOG_PREFIX_LEN,
&xid, &data, &datalen);
AZ(err);
LOG_Log(LOG_DEBUG, "%s: XID=%u, data=[%.*s]", VSL_tags[tag],
xid, datalen, data);
he = hash_find(xid);
if (! he) {
LOG_Log(LOG_WARNING, "%s: XID %d not found",
VSL_tags[tag], xid);
htbl.drop_vcl_log++;
break;
}
check_entry(he, xid, fd);
de = he->de;
append(de, tag, xid, data, datalen);
de->hasdata = true;
break;
case SLT_ReqEnd:
err = Parse_ReqEnd(ptr, len, &xid, &reqend_t);
AZ(err);
LOG_Log(LOG_DEBUG, "%s: XID=%u req_endt=%u.%09lu", VSL_tags[tag], xid,
(unsigned) reqend_t.tv_sec, reqend_t.tv_nsec);
if (xid > last_end_xid)
last_end_xid = xid;
xid_spread_sum += (last_end_xid - last_start_xid);
xid_spread_count++;
he = hash_find(xid);
if (! he) {
LOG_Log(LOG_WARNING, "%s: XID %d not found",
VSL_tags[tag], xid);
htbl.drop_reqend++;
break;
}
check_entry(he, xid, fd);
de = he->de;
sprintf(reqend_str, "%s=%u.%09lu", REQEND_T_VAR,
(unsigned) reqend_t.tv_sec, reqend_t.tv_nsec);
append(de, tag, xid, reqend_str, strlen(reqend_str));
de->state = DATA_DONE;
MON_StatsUpdate(STATS_DONE);
submit(he);
break;
default:
/* Unreachable */
AN(NULL);
return(1);
}
/*
* log when the vsl ptr wraps, so we can relate lost records, if
* applicable
*/
if (ptr < pptr) {
LOG_Log(LOG_INFO, "VSL wrap at %u", xid);
if (wrap_start_xid) {
LOG_Log(LOG_INFO, "VSL wrap start xid %10u current %10u delta %10d",
wrap_start_xid, last_start_xid, (last_start_xid - wrap_start_xid));
LOG_Log(LOG_INFO, "VSL wrap end xid %10u current %10u delta %10d",
wrap_end_xid, last_end_xid, (last_end_xid - wrap_end_xid));
/* AAARRRGLLL, I confess: yes, I am calculating an average here */
LOG_Log(LOG_INFO, "VSL wrap xid spread is %u - avg xid spread is %f",
(last_start_xid - last_end_xid),
(1.0 * xid_spread_sum / xid_spread_count));
xid_spread_count = xid_spread_sum = 0;
}
wrap_start_xid = last_start_xid;
wrap_end_xid = last_end_xid;
}
pptr = ptr;
return(0);
}
/*--------------------------------------------------------------------*/
static void
dump(int sig)
{
(void) sig;
hash_dump();
}
static void
terminate(int sig)
{
(void) sig;
term = 1;
}
static void
restart(int sig)
{
(void) sig;
reload = 1;
}
/* Handle for the PID file */
struct vpf_fh *pfh = NULL;
static void
read_default_config(void) {
if (access(DEFAULT_CONFIG, F_OK) == 0) {
if (access(DEFAULT_CONFIG, R_OK) != 0) {
perror(DEFAULT_CONFIG);
exit(EXIT_FAILURE);
}
printf("Reading config from %s\n", DEFAULT_CONFIG);
if (CONF_ReadFile(DEFAULT_CONFIG) != 0)
exit(EXIT_FAILURE);
}
}
static void
parent_shutdown(int status, pid_t child_pid)
{
......@@ -973,7 +154,7 @@ child_restart(pid_t child_pid, struct VSM_data *vd, int endless, int readconfig)
parent_shutdown(EXIT_FAILURE, child_pid);
}
else if (child_pid == 0)
child_main(vd, endless, readconfig);
CHILD_Main(vd, endless, readconfig);
return child_pid;
}
......@@ -1040,157 +221,6 @@ parent_main(pid_t child_pid, struct VSM_data *vd, int endless)
}
}
/* Matches typedef VSM_diag_f in include/varnishapi.h
Log error messages from VSL_Open and VSL_Arg */
static void
vsl_diag(void *priv, const char *fmt, ...)
{
(void) priv;
va_list ap;
va_start(ap, fmt);
logconf.log(LOG_ERR, fmt, ap);
va_end(ap);
}
static void
init_pthread_attrs(void)
{
AZ(pthread_mutexattr_init(&attr_lock));
AZ(pthread_condattr_init(&attr_cond));
// important to make mutex/cv efficient
AZ(pthread_mutexattr_setpshared(&attr_lock,
PTHREAD_PROCESS_PRIVATE));
AZ(pthread_condattr_setpshared(&attr_cond,
PTHREAD_PROCESS_PRIVATE));
}
static void
child_main(struct VSM_data *vd, int endless, int readconfig)
{
int errnum;
const char *errmsg;
pthread_t monitor;
struct passwd *pw;
init_pthread_attrs();
MON_StatsInit();
LOG_Log0(LOG_INFO, "Worker process starting");
/* XXX: does not re-configure logging. Feature or bug? */
if (readconfig) {
LOG_Log0(LOG_INFO, "Re-reading config");
CONF_Init();
read_default_config();
if (! EMPTY(cli_config_filename))
LOG_Log(LOG_INFO, "Reading config from %s", cli_config_filename);
/* XXX: CONF_ReadFile prints err messages to stderr */
if (CONF_ReadFile(cli_config_filename) != 0) {
LOG_Log(LOG_ERR, "Error reading config from %s",
cli_config_filename);
exit(EXIT_FAILURE);
}
}
PRIV_Sandbox();
pw = getpwuid(geteuid());
AN(pw);
LOG_Log(LOG_INFO, "Running as %s", pw->pw_name);
/* install signal handlers */
#define CHILD(SIG,disp) SIGDISP(SIG,disp)
#define PARENT(SIG,disp) ((void) 0)
#include "signals.h"
#undef PARENT
#undef CHILD
if (DATA_Init() != 0) {
LOG_Log(LOG_ERR, "Cannot init data table: %s", strerror(errno));
exit(EXIT_FAILURE);
}
if (hash_init() != 0) {
LOG_Log(LOG_ERR, "Cannot init hash table: %s", strerror(errno));
exit(EXIT_FAILURE);
}
VSM_Diag(vd, vsl_diag, NULL);
if (VSL_Open(vd, 1))
exit(EXIT_FAILURE);
/* Only read the VSL tags relevant to tracking */
assert(VSL_Arg(vd, 'i', TRACK_TAGS) > 0);
/* Start the monitor thread */
if (config.monitor_interval > 0.0) {
if (pthread_create(&monitor, NULL, MON_StatusThread,
(void *) &config.monitor_interval) != 0) {
LOG_Log(LOG_ERR, "Cannot start monitoring thread: %s\n",
strerror(errno));
exit(EXIT_FAILURE);
}
}
else
LOG_Log0(LOG_INFO, "Monitoring thread not running");
errmsg = MQ_GlobalInit();
if (errmsg != NULL) {
LOG_Log(LOG_ERR, "Cannot initialize message broker access: %s", errmsg);
exit(EXIT_FAILURE);
}
errnum = WRK_Init();
if (errnum != 0) {
LOG_Log(LOG_ERR, "Cannot prepare worker threads: %s",
strerror(errnum));
exit(EXIT_FAILURE);
}
if ((errnum = SPMCQ_Init()) != 0) {
LOG_Log(LOG_ERR, "Cannot initialize internal worker queue: %s",
strerror(errnum));
exit(EXIT_FAILURE);
}
if (config.nworkers > 0) {
WRK_Start();
/* XXX: wrk_wait & sleep interval configurable */
int wrk_wait = 0;
while ((wrk_running = WRK_Running()) == 0) {
if (wrk_wait++ > 10) {
LOG_Log0(LOG_ALERT,
"Worker threads not starting, shutting down");
exit(EXIT_FAILURE);
}
TIM_sleep(1);
}
LOG_Log(LOG_INFO, "%d worker threads running", wrk_running);
}
else
LOG_Log0(LOG_INFO, "Worker threads not running");
/* Main loop */
term = 0;
/* XXX: Varnish restart? */
/* XXX: TERM not noticed until request received */
while (VSL_Dispatch(vd, OSL_Track, NULL) > 0)
if (term || !endless)
break;
else {
LOG_Log0(LOG_WARNING, "Log read interrupted, continuing");
continue;
}
WRK_Halt();
WRK_Shutdown();
AZ(MQ_GlobalShutdown());
if (config.monitor_interval > 0.0)
MON_StatusShutdown(monitor);
LOG_Log0(LOG_INFO, "Worker process exiting");
LOG_Close();
exit(EXIT_SUCCESS);
}
static void
usage(int status)
{
......@@ -1211,7 +241,13 @@ main(int argc, char * const *argv)
VSL_Setup(vd);
CONF_Init();
read_default_config();
if ((err = CONF_ReadDefault()) != 0) {
if (err != -1)
LOG_Log(LOG_ALERT, "Cannot read %s: %s", DEFAULT_CONFIG,
strerror(err));
exit(EXIT_FAILURE);
}
cli_config_filename[0] = '\0';
while ((c = getopt(argc, argv, "u:P:Vn:hl:df:y:c:D")) != -1) {
switch (c) {
......@@ -1330,27 +366,15 @@ main(int argc, char * const *argv)
if (pfh != NULL)
VPF_Write(pfh);
terminate_action.sa_handler = terminate;
terminate_action.sa_handler = HNDL_Terminate;
AZ(sigemptyset(&terminate_action.sa_mask));
terminate_action.sa_flags &= ~SA_RESTART;
dump_action.sa_handler = dump;
AZ(sigemptyset(&dump_action.sa_mask));
dump_action.sa_flags |= SA_RESTART;
restart_action.sa_handler = restart;
AZ(sigemptyset(&restart_action.sa_mask));
restart_action.sa_flags &= ~SA_RESTART;
/* dont' get proper gdb backtraces with the handler in place */
#ifdef DISABLE_STACKTRACE
do {
void *foo;
foo = stacktrace_abort;
} while (0);
#else
stacktrace_action.sa_handler = stacktrace_abort;
#endif
stacktrace_action.sa_handler = HNDL_Abort;
ignore_action.sa_handler = SIG_IGN;
default_action.sa_handler = SIG_DFL;
......@@ -1362,10 +386,10 @@ main(int argc, char * const *argv)
LOG_Log(LOG_ALERT,
"Cannot fork (%s), running as single process",
strerror(errno));
child_main(vd, endless, 0);
CHILD_Main(vd, endless, 0);
break;
case 0:
child_main(vd, endless, 0);
CHILD_Main(vd, endless, 0);
break;
default:
parent_main(child_pid, vd, endless);
......@@ -1374,6 +398,6 @@ main(int argc, char * const *argv)
}
else {
LOG_Log0(LOG_INFO, "Running as non-demon single process");
child_main(vd, endless, 0);
CHILD_Main(vd, endless, 0);
}
}
......@@ -35,11 +35,36 @@
#include <pthread.h>
#include <sys/types.h>
#include <time.h>
#include <signal.h>
#include "vqueue.h"
#include "varnishapi.h"
#define MIN(x, y) ((x) < (y) ? (x) : (y))
#define MAX(x, y) ((x) < (y) ? (y) : (x))
/* handler.c */
/* Hack, because we cannot have #ifdef in the macro definition SIGDISP */
#define _UNDEFINED(SIG) ((#SIG)[0] == 0)
#define UNDEFINED(SIG) _UNDEFINED(SIG)
#define SIGDISP(SIG, action) \
do { if (UNDEFINED(SIG)) break; \
if (sigaction((SIG), (&action), NULL) != 0) \
LOG_Log(LOG_ALERT, \
"Cannot install handler for " #SIG ": %s", \
strerror(errno)); \
} while(0)
volatile sig_atomic_t term;
struct sigaction terminate_action, ignore_action, stacktrace_action,
default_action;
void HNDL_Abort(int sig);
void HNDL_Terminate(int sig);
/* sandbox.c */
void PRIV_Sandbox(void);
......@@ -63,12 +88,12 @@ void WRK_Shutdown(void);
/* Single producer multiple consumer bounded FIFO queue */
typedef struct {
unsigned magic;
unsigned magic;
#define SPMCQ_MAGIC 0xe9a5d0a8
const unsigned mask;
void **data;
volatile unsigned head;
volatile unsigned tail;
const unsigned mask;
void **data;
volatile unsigned head;
volatile unsigned tail;
} spmcq_t;
spmcq_t spmcq;
......@@ -80,14 +105,14 @@ bool SPMCQ_NeedWorker(int running);
bool SPMCQ_StopWorker(int running);
#define spmcq_wait(what) \
do { \
do { \
AZ(pthread_mutex_lock(&spmcq_##what##waiter_lock)); \
spmcq_##what##waiter++; \
AZ(pthread_cond_wait(&spmcq_##what##waiter_cond, \
&spmcq_##what##waiter_lock)); \
spmcq_##what##waiter--; \
AZ(pthread_mutex_unlock(&spmcq_##what##waiter_lock)); \
} while (0)
spmcq_##what##waiter++; \
AZ(pthread_cond_wait(&spmcq_##what##waiter_cond, \
&spmcq_##what##waiter_lock)); \
spmcq_##what##waiter--; \
AZ(pthread_mutex_unlock(&spmcq_##what##waiter_lock)); \
} while (0)
/*
* the first test is not synced, so we might enter the if body too late or
......@@ -98,14 +123,14 @@ bool SPMCQ_StopWorker(int running);
*/
#define spmcq_signal(what) \
do { \
if (spmcq_##what##waiter) { \
AZ(pthread_mutex_lock(&spmcq_##what##waiter_lock)); \
if (spmcq_##what##waiter) \
AZ(pthread_cond_signal(&spmcq_##what##waiter_cond)); \
AZ(pthread_mutex_unlock(&spmcq_##what##waiter_lock)); \
} \
} while (0)
do { \
if (spmcq_##what##waiter) { \
AZ(pthread_mutex_lock(&spmcq_##what##waiter_lock)); \
if (spmcq_##what##waiter) \
AZ(pthread_cond_signal(&spmcq_##what##waiter_cond)); \
AZ(pthread_mutex_unlock(&spmcq_##what##waiter_lock)); \
} \
} while (0)
/* Producer waits for this condition when the spmc queue is full.
Consumers signal this condition after dequeue. */
......@@ -206,88 +231,17 @@ void DATA_Dump1(dataentry *entry, int i);
void DATA_Dump(void);
/* trackrdrd.c */
void HASH_Stats(void);
#if 0
typedef enum {
HASH_EMPTY = 0,
/* OPEN when the main thread is filling data, ReqEnd not yet seen. */
HASH_OPEN
/* hashes become HASH_EMPTY for DATA_DONE */
} hash_state_e;
struct hashentry_s {
unsigned magic;
#define HASH_MAGIC 0xf8e12130
/* set in HASH_Insert */
hash_state_e state;
unsigned xid; /* == de->xid */
float insert_time;
VTAILQ_ENTRY(hashentry_s) insert_list;
dataentry *de;
};
typedef struct hashentry_s hashentry;
VTAILQ_HEAD(insert_head_s, hashentry_s);
struct hashtable_s {
unsigned magic;
#define HASHTABLE_MAGIC 0x89ea1d00
unsigned len;
hashentry *entry;
struct insert_head_s insert_head;
/* config */
unsigned max_probes;
float ttl; /* max age for a record */
float mlt; /* min life time */
/* == stats == */
unsigned seen; /* Records (ReqStarts) seen */
/*
* records we have dropped because of no hash, no data
* or no entry
*/
unsigned drop_reqstart;
unsigned drop_vcl_log;
unsigned drop_reqend;
unsigned expired;
unsigned evacuated;
unsigned open;
unsigned collisions;
unsigned insert_probes;
unsigned find_probes;
unsigned fail; /* failed to get record - no space */
unsigned occ_hi; /* Occupancy high water mark */
unsigned occ_hi_this; /* Occupancy high water mark this reporting interval*/
};
typedef struct hashtable_s hashtable;
hashtable htbl;
int HASH_Init(void);
void HASH_Exp(float limit);
void HASH_Submit(hashentry *he);
void HASH_Evacuate(hashentry *he);
hashentry *HASH_Insert(const unsigned xid, dataentry *de, const float t);
hashentry *HASH_Find(unsigned xid);
void HASH_Dump1(hashentry *entry, int i);
void HASH_Dump(void);
#endif
/* child.c */
void CHILD_Main(struct VSM_data *vd, int endless, int readconfig);
/* config.c */
#define EMPTY(s) (s[0] == '\0')
#define DEFAULT_CONFIG "/etc/trackrdrd.conf"
char cli_config_filename[BUFSIZ];
struct config {
char pid_file[BUFSIZ];
char varnish_name[BUFSIZ];
......@@ -357,6 +311,7 @@ struct config {
void CONF_Init(void);
int CONF_Add(const char *lval, const char *rval);
int CONF_ReadFile(const char *file);
int CONF_ReadDefault(void);
void CONF_Dump(void);
/* log.c */
......@@ -409,6 +364,3 @@ int Parse_VCL_Log(const char *ptr, int len, unsigned *xid,
/* generic init attributes */
pthread_mutexattr_t attr_lock;
pthread_condattr_t attr_cond;
/* globals */
extern int nworkers;
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment