Commit a5b04fde authored by Geoff Simmons's avatar Geoff Simmons

trackrdrd: check and monitor state of worker threads, shutdown if

           all of them have exited
parent 18c07e34
......@@ -22,7 +22,7 @@ CMD="../trackrdrd -D -f varnish.binlog -l - -d -c test.conf"
# the user running it
CKSUM=$( $CMD | sed -e 's/\(initializing\) \(.*\)/\1/' | sed -e 's/\(Running as\) \([a-zA-Z0-9]*\)$/\1/' | grep -v 'Not running as root' | cksum)
if [ "$CKSUM" != '718198351 232285' ]; then
if [ "$CKSUM" != '1675646904 232318' ]; then
echo "ERROR: Regression test incorrect cksum: $CKSUM"
exit 1
fi
......
......@@ -492,7 +492,7 @@ vsl_diag(void *priv, const char *fmt, ...)
static void
child_main(struct VSM_data *vd, int endless, int readconfig)
{
int errnum;
int errnum, nworkers;
const char *errmsg;
pthread_t monitor;
struct passwd *pw;
......@@ -562,7 +562,6 @@ child_main(struct VSM_data *vd, int endless, int readconfig)
strerror(errnum));
exit(EXIT_FAILURE);
}
if ((errnum = SPMCQ_Init()) != 0) {
LOG_Log(LOG_ERR, "Cannot initialize internal worker queue: %s",
strerror(errnum));
......@@ -573,6 +572,16 @@ child_main(struct VSM_data *vd, int endless, int readconfig)
/* Start worker threads */
WRK_Start();
nworkers = WRK_Running();
LOG_Log0(LOG_INFO, "Worker threads initialized");
if (nworkers < config.nworkers) {
LOG_Log(LOG_WARNING, "%d of %d worker threads running", nworkers,
config.nworkers);
if (nworkers == 0) {
LOG_Log0(LOG_ALERT, "Worker process shutting down");
exit(EXIT_FAILURE);
}
}
/* Main loop */
term = 0;
......
......@@ -52,6 +52,7 @@ void PRIV_Sandbox(void);
int WRK_Init(void);
void WRK_Start(void);
void WRK_Stats(void);
int WRK_Running(void);
void WRK_Halt(void);
void WRK_Shutdown(void);
......
......@@ -39,11 +39,20 @@
#include "miniobj.h"
#include "vmb.h"
typedef enum {
WRK_NOTSTARTED = 0,
WRK_INITIALIZING,
WRK_RUNNING,
WRK_SHUTTINGDOWN,
WRK_EXITED
} wrk_state_e;
typedef struct {
unsigned magic;
#define WORKER_DATA_MAGIC 0xd8eef137
unsigned id;
unsigned status;
unsigned status; /* exit status */
wrk_state_e state;
unsigned deqs;
unsigned waits;
unsigned sends;
......@@ -57,6 +66,8 @@ typedef struct {
static unsigned run, cleaned = 0;
static thread_data_t *thread_data;
static const char* statename[5] = { "not started", "initializing", "running",
"shutting down", "exited" };
static inline void
wrk_send(void *amq_worker, dataentry *entry, worker_data_t *wrk)
......@@ -98,13 +109,18 @@ static void
LOG_Log(LOG_INFO, "Worker %d: starting", wrk->id);
CHECK_OBJ_NOTNULL(wrk, WORKER_DATA_MAGIC);
wrk->state = WRK_INITIALIZING;
err = MQ_WorkerInit(&amq_worker);
if (err != NULL) {
LOG_Log(LOG_ALERT, "Worker %d: Cannot initialize queue connection: %s",
wrk->id, err);
wrk->status = EXIT_FAILURE;
wrk->state = WRK_EXITED;
pthread_exit((void *) wrk);
}
wrk->state = WRK_RUNNING;
while (run) {
entry = (dataentry *) SPMCQ_Deq();
......@@ -126,6 +142,8 @@ static void
}
AZ(pthread_mutex_unlock(&spmcq_nonempty_lock));
}
wrk->state = WRK_SHUTTINGDOWN;
/* Prepare to exit, drain the queue */
while ((entry = (dataentry *) SPMCQ_Deq()) != NULL) {
......@@ -142,6 +160,7 @@ static void
}
LOG_Log(LOG_INFO, "Worker %d: exiting", wrk->id);
wrk->state = WRK_EXITED;
pthread_exit((void *) wrk);
}
......@@ -183,6 +202,7 @@ WRK_Init(void)
wrk->magic = WORKER_DATA_MAGIC;
wrk->id = i + 1;
wrk->deqs = wrk->waits = wrk->sends = wrk->fails = 0;
wrk->state = WRK_NOTSTARTED;
}
AZ(pthread_mutex_init(&spmcq_nonempty_lock, NULL));
......@@ -211,8 +231,28 @@ WRK_Stats(void)
for (int i = 0; i < config.nworkers; i++) {
wrk = thread_data[i].wrk_data;
LOG_Log(LOG_INFO, "Worker %d: seen=%d waits=%d sent=%d failed=%d",
wrk->id, wrk->deqs, wrk->waits, wrk->sends, wrk->fails);
LOG_Log(LOG_INFO, "Worker %d (%s): seen=%d waits=%d sent=%d failed=%d",
wrk->id, statename[wrk->state], wrk->deqs, wrk->waits, wrk->sends,
wrk->fails);
}
}
int
WRK_Running(void)
{
worker_data_t *wrk;
while (1) {
int initialized = 0, running = 0;
for (int i = 0; i < config.nworkers; i++) {
wrk = thread_data[i].wrk_data;
if (wrk->state > WRK_INITIALIZING)
initialized++;
if (wrk->state == WRK_RUNNING || wrk->state == WRK_SHUTTINGDOWN)
running++;
}
if (initialized == config.nworkers)
return running;
}
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment