Commit 5dc4e6c2 authored by Geoff Simmons

trackrdrd: added an assertion failure callback and stats monitor for worker threads
parent 23174001
......@@ -76,6 +76,7 @@ void
tbl.open, tbl.done, 100.0 * ((float) tbl.open + tbl.done) / tbl.len,
tbl.occ_hi, tbl.seen, tbl.submitted, tbl.sent, tbl.failed,
tbl.wait_qfull, tbl.data_hi);
WRK_Stats();
}
LOG_Log0(LOG_INFO, "Monitoring thread exiting");
......
......@@ -262,15 +262,26 @@ OSL_Track(void *priv, enum VSL_tag_e tag, unsigned fd, unsigned len,
struct vpf_fh *pfh = NULL;
static void
stacktrace(int sig)
assert_failure(const char *func, const char *file, int line, const char *cond,
               int err, int xxx)
{
    /*
     * Assertion failure callback (assigned to VAS_Fail in main): log the
     * failed condition together with the function, file and line it came
     * from, log the errno value if a non-zero one was passed in, then
     * abort the process.
     */
    (void) xxx;    /* not used by this handler */

    LOG_Log(LOG_ALERT, "Condition (%s) failed in %s(), %s line %d", cond, func,
            file, line);
    if (err != 0) {
        LOG_Log(LOG_ALERT, "errno = %d (%s)", err, strerror(err));
    }
    abort();
}
static void
stacktrace(void)
{
void *buf[MAX_STACK_DEPTH];
int depth, i;
char **strings;
depth = backtrace (buf, MAX_STACK_DEPTH);
LOG_Log(LOG_ALERT, "Received signal %d (%s), stacktrace follows", sig,
strsignal(sig));
if (depth == 0) {
LOG_Log0(LOG_ERR, "Stacktrace empty");
return;
......@@ -280,8 +291,9 @@ stacktrace(int sig)
LOG_Log0(LOG_ERR, "Cannot retrieve symbols for stacktrace");
return;
}
/* XXX: get symbol names from nm? cf. cache_panic.c/pan_backtrace */
for (i = 0; i < depth; i++)
LOG_Log(LOG_ERR, "%p: %s", buf[i], strings[i]);
LOG_Log(LOG_ERR, "%s", strings[i]);
free(strings);
}
......@@ -303,7 +315,9 @@ terminate(int sig)
static void
stacktrace_abort(int sig)
{
stacktrace(sig);
LOG_Log(LOG_ALERT, "Received signal %d (%s), stacktrace follows", sig,
strsignal(sig));
stacktrace();
AZ(sigaction(SIGABRT, &default_action, NULL));
LOG_Log0(LOG_ALERT, "Aborting");
abort();
......@@ -592,10 +606,13 @@ main(int argc, char * const *argv)
if (LOG_Open(PACKAGE_NAME) != 0) {
exit(EXIT_FAILURE);
}
VAS_Fail = assert_failure;
if (d_flag)
LOG_SetLevel(LOG_DEBUG);
LOG_Log0(LOG_INFO, "initializing");
LOG_Log0(LOG_INFO,
"initializing (v" PACKAGE_VERSION " revision " REVISION ")");
CONF_Dump();
......
......@@ -45,6 +45,7 @@
*/
int WRK_Init(void);
void WRK_Start(void);
/* Log one LOG_INFO line per worker with its seen/waits/sent/failed counters */
void WRK_Stats(void);
void WRK_Halt(void);
void WRK_Shutdown(void);
......
......@@ -44,6 +44,10 @@ typedef struct {
#define WORKER_DATA_MAGIC 0xd8eef137
unsigned id;
unsigned status;
unsigned deqs;
unsigned waits;
unsigned sends;
unsigned fails;
} worker_data_t;
typedef struct {
......@@ -55,7 +59,7 @@ static unsigned run, cleaned = 0;
static thread_data_t *thread_data;
static inline void
wrk_send(void *amq_worker, dataentry *entry, unsigned id)
wrk_send(void *amq_worker, dataentry *entry, worker_data_t *wrk)
{
const char *err;
......@@ -66,13 +70,16 @@ wrk_send(void *amq_worker, dataentry *entry, unsigned id)
err = MQ_Send(amq_worker, entry->data, entry->end);
if (err != NULL) {
/* XXX: error recovery? reconnect? preserve the data? */
LOG_Log(LOG_ALERT, "Worker %d: Failed to send data: %s", id, err);
LOG_Log(LOG_ERR, "Worker %d: Data DISCARDED [%.*s]", id, entry->end,
entry->data);
wrk->fails++;
LOG_Log(LOG_ALERT, "Worker %d: Failed to send data: %s", wrk->id, err);
LOG_Log(LOG_ERR, "Worker %d: Data DISCARDED [%.*s]", wrk->id,
entry->end, entry->data);
MON_StatsUpdate(STATS_FAILED);
}
else
else {
wrk->sends++;
MON_StatsUpdate(STATS_SENT);
}
entry->state = DATA_EMPTY;
/* From Varnish vmb.h -- platform-independent write memory barrier */
VWMB();
......@@ -100,8 +107,8 @@ static void
while (run) {
entry = (dataentry *) SPMCQ_Deq();
if (entry != NULL) {
/* Dequeued a data entry */
wrk_send(amq_worker, entry, wrk->id);
wrk->deqs++;
wrk_send(amq_worker, entry, wrk);
continue;
}
/* Queue is empty, wait until data are available, or quit is
......@@ -110,15 +117,19 @@ static void
barrier */
AZ(pthread_mutex_lock(&spmcq_nonempty_lock));
/* run is guaranteed to be fresh here */
if (run)
if (run) {
wrk->waits++;
AZ(pthread_cond_wait(&spmcq_nonempty_cond,
&spmcq_nonempty_lock));
}
AZ(pthread_mutex_unlock(&spmcq_nonempty_lock));
}
/* Prepare to exit, drain the queue */
while ((entry = (dataentry *) SPMCQ_Deq()) != NULL)
wrk_send(amq_worker, entry, wrk->id);
while ((entry = (dataentry *) SPMCQ_Deq()) != NULL) {
wrk->deqs++;
wrk_send(amq_worker, entry, wrk);
}
wrk->status = EXIT_SUCCESS;
err = MQ_WorkerShutdown(&amq_worker);
......@@ -165,9 +176,11 @@ WRK_Init(void)
i+1, strerror(errno));
return(errno);
}
thread_data[i].wrk_data->magic = WORKER_DATA_MAGIC;
thread_data[i].wrk_data->id = i + 1;
worker_data_t *wrk = thread_data[i].wrk_data;
wrk->magic = WORKER_DATA_MAGIC;
wrk->id = i + 1;
wrk->deqs = wrk->waits = wrk->sends = wrk->fails = 0;
}
AZ(pthread_mutex_init(&spmcq_nonempty_lock, NULL));
......@@ -187,6 +200,20 @@ WRK_Start(void)
thread_data[i].wrk_data));
}
/*
 * Log the per-worker counters, one LOG_INFO line per worker thread.
 *
 * Returns immediately unless the run flag is set. The counters are read
 * here without taking any lock while the worker threads increment them,
 * so the logged values are a best-effort snapshot.
 */
void
WRK_Stats(void)
{
    if (!run)
        return;

    for (int i = 0; i < config.nworkers; i++) {
        const worker_data_t *wrk = thread_data[i].wrk_data;

        /* id and all counters are declared unsigned, so format them with
           %u; %d would print counters above INT_MAX as negative numbers */
        LOG_Log(LOG_INFO, "Worker %u: seen=%u waits=%u sent=%u failed=%u",
                wrk->id, wrk->deqs, wrk->waits, wrk->sends, wrk->fails);
    }
}
void
WRK_Halt(void)
{
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment