Commit a26a06fb authored by Geoff Simmons's avatar Geoff Simmons

trackrdrd: check and monitor state of worker threads, shutdown if

           all of them have exited
parent 4a8c2bf8
...@@ -22,7 +22,7 @@ CMD="../trackrdrd -D -f varnish.binlog -l - -d -c test.conf" ...@@ -22,7 +22,7 @@ CMD="../trackrdrd -D -f varnish.binlog -l - -d -c test.conf"
# the user running it # the user running it
CKSUM=$( $CMD | sed -e 's/\(initializing\) \(.*\)/\1/' | sed -e 's/\(Running as\) \([a-zA-Z0-9]*\)$/\1/' | grep -v 'Not running as root' | cksum) CKSUM=$( $CMD | sed -e 's/\(initializing\) \(.*\)/\1/' | sed -e 's/\(Running as\) \([a-zA-Z0-9]*\)$/\1/' | grep -v 'Not running as root' | cksum)
if [ "$CKSUM" != '718198351 232285' ]; then if [ "$CKSUM" != '1675646904 232318' ]; then
echo "ERROR: Regression test incorrect cksum: $CKSUM" echo "ERROR: Regression test incorrect cksum: $CKSUM"
exit 1 exit 1
fi fi
......
...@@ -492,7 +492,7 @@ vsl_diag(void *priv, const char *fmt, ...) ...@@ -492,7 +492,7 @@ vsl_diag(void *priv, const char *fmt, ...)
static void static void
child_main(struct VSM_data *vd, int endless, int readconfig) child_main(struct VSM_data *vd, int endless, int readconfig)
{ {
int errnum; int errnum, nworkers;
const char *errmsg; const char *errmsg;
pthread_t monitor; pthread_t monitor;
struct passwd *pw; struct passwd *pw;
...@@ -562,7 +562,6 @@ child_main(struct VSM_data *vd, int endless, int readconfig) ...@@ -562,7 +562,6 @@ child_main(struct VSM_data *vd, int endless, int readconfig)
strerror(errnum)); strerror(errnum));
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
} }
if ((errnum = SPMCQ_Init()) != 0) { if ((errnum = SPMCQ_Init()) != 0) {
LOG_Log(LOG_ERR, "Cannot initialize internal worker queue: %s", LOG_Log(LOG_ERR, "Cannot initialize internal worker queue: %s",
strerror(errnum)); strerror(errnum));
...@@ -573,6 +572,16 @@ child_main(struct VSM_data *vd, int endless, int readconfig) ...@@ -573,6 +572,16 @@ child_main(struct VSM_data *vd, int endless, int readconfig)
/* Start worker threads */ /* Start worker threads */
WRK_Start(); WRK_Start();
nworkers = WRK_Running();
LOG_Log0(LOG_INFO, "Worker threads initialized");
if (nworkers < config.nworkers) {
LOG_Log(LOG_WARNING, "%d of %d worker threads running", nworkers,
config.nworkers);
if (nworkers == 0) {
LOG_Log0(LOG_ALERT, "Worker process shutting down");
exit(EXIT_FAILURE);
}
}
/* Main loop */ /* Main loop */
term = 0; term = 0;
......
...@@ -52,6 +52,7 @@ void PRIV_Sandbox(void); ...@@ -52,6 +52,7 @@ void PRIV_Sandbox(void);
int WRK_Init(void); int WRK_Init(void);
void WRK_Start(void); void WRK_Start(void);
void WRK_Stats(void); void WRK_Stats(void);
int WRK_Running(void);
void WRK_Halt(void); void WRK_Halt(void);
void WRK_Shutdown(void); void WRK_Shutdown(void);
......
...@@ -39,11 +39,20 @@ ...@@ -39,11 +39,20 @@
#include "miniobj.h" #include "miniobj.h"
#include "vmb.h" #include "vmb.h"
typedef enum {
WRK_NOTSTARTED = 0,
WRK_INITIALIZING,
WRK_RUNNING,
WRK_SHUTTINGDOWN,
WRK_EXITED
} wrk_state_e;
typedef struct { typedef struct {
unsigned magic; unsigned magic;
#define WORKER_DATA_MAGIC 0xd8eef137 #define WORKER_DATA_MAGIC 0xd8eef137
unsigned id; unsigned id;
unsigned status; unsigned status; /* exit status */
wrk_state_e state;
unsigned deqs; unsigned deqs;
unsigned waits; unsigned waits;
unsigned sends; unsigned sends;
...@@ -57,6 +66,8 @@ typedef struct { ...@@ -57,6 +66,8 @@ typedef struct {
static unsigned run, cleaned = 0; static unsigned run, cleaned = 0;
static thread_data_t *thread_data; static thread_data_t *thread_data;
static const char* statename[5] = { "not started", "initializing", "running",
"shutting down", "exited" };
static inline void static inline void
wrk_send(void *amq_worker, dataentry *entry, worker_data_t *wrk) wrk_send(void *amq_worker, dataentry *entry, worker_data_t *wrk)
...@@ -98,13 +109,18 @@ static void ...@@ -98,13 +109,18 @@ static void
LOG_Log(LOG_INFO, "Worker %d: starting", wrk->id); LOG_Log(LOG_INFO, "Worker %d: starting", wrk->id);
CHECK_OBJ_NOTNULL(wrk, WORKER_DATA_MAGIC); CHECK_OBJ_NOTNULL(wrk, WORKER_DATA_MAGIC);
wrk->state = WRK_INITIALIZING;
err = MQ_WorkerInit(&amq_worker); err = MQ_WorkerInit(&amq_worker);
if (err != NULL) { if (err != NULL) {
LOG_Log(LOG_ALERT, "Worker %d: Cannot initialize queue connection: %s", LOG_Log(LOG_ALERT, "Worker %d: Cannot initialize queue connection: %s",
wrk->id, err); wrk->id, err);
wrk->status = EXIT_FAILURE; wrk->status = EXIT_FAILURE;
wrk->state = WRK_EXITED;
pthread_exit((void *) wrk); pthread_exit((void *) wrk);
} }
wrk->state = WRK_RUNNING;
while (run) { while (run) {
entry = (dataentry *) SPMCQ_Deq(); entry = (dataentry *) SPMCQ_Deq();
...@@ -126,6 +142,8 @@ static void ...@@ -126,6 +142,8 @@ static void
} }
AZ(pthread_mutex_unlock(&spmcq_nonempty_lock)); AZ(pthread_mutex_unlock(&spmcq_nonempty_lock));
} }
wrk->state = WRK_SHUTTINGDOWN;
/* Prepare to exit, drain the queue */ /* Prepare to exit, drain the queue */
while ((entry = (dataentry *) SPMCQ_Deq()) != NULL) { while ((entry = (dataentry *) SPMCQ_Deq()) != NULL) {
...@@ -142,6 +160,7 @@ static void ...@@ -142,6 +160,7 @@ static void
} }
LOG_Log(LOG_INFO, "Worker %d: exiting", wrk->id); LOG_Log(LOG_INFO, "Worker %d: exiting", wrk->id);
wrk->state = WRK_EXITED;
pthread_exit((void *) wrk); pthread_exit((void *) wrk);
} }
...@@ -183,6 +202,7 @@ WRK_Init(void) ...@@ -183,6 +202,7 @@ WRK_Init(void)
wrk->magic = WORKER_DATA_MAGIC; wrk->magic = WORKER_DATA_MAGIC;
wrk->id = i + 1; wrk->id = i + 1;
wrk->deqs = wrk->waits = wrk->sends = wrk->fails = 0; wrk->deqs = wrk->waits = wrk->sends = wrk->fails = 0;
wrk->state = WRK_NOTSTARTED;
} }
AZ(pthread_mutex_init(&spmcq_nonempty_lock, NULL)); AZ(pthread_mutex_init(&spmcq_nonempty_lock, NULL));
...@@ -211,8 +231,28 @@ WRK_Stats(void) ...@@ -211,8 +231,28 @@ WRK_Stats(void)
for (int i = 0; i < config.nworkers; i++) { for (int i = 0; i < config.nworkers; i++) {
wrk = thread_data[i].wrk_data; wrk = thread_data[i].wrk_data;
LOG_Log(LOG_INFO, "Worker %d: seen=%d waits=%d sent=%d failed=%d", LOG_Log(LOG_INFO, "Worker %d (%s): seen=%d waits=%d sent=%d failed=%d",
wrk->id, wrk->deqs, wrk->waits, wrk->sends, wrk->fails); wrk->id, statename[wrk->state], wrk->deqs, wrk->waits, wrk->sends,
wrk->fails);
}
}
int
WRK_Running(void)
{
worker_data_t *wrk;
while (1) {
int initialized = 0, running = 0;
for (int i = 0; i < config.nworkers; i++) {
wrk = thread_data[i].wrk_data;
if (wrk->state > WRK_INITIALIZING)
initialized++;
if (wrk->state == WRK_RUNNING || wrk->state == WRK_SHUTTINGDOWN)
running++;
}
if (initialized == config.nworkers)
return running;
} }
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment