Commit 05052b58 authored by Geoff Simmons's avatar Geoff Simmons

move some worker thread intialization into WRK_Init() (out of the

thread function), and add some integrity checks
parent d7535d5a
......@@ -257,8 +257,8 @@ static void
dataentry *entry;
const char *err;
LOG_Log(LOG_INFO, "Worker %d: starting", wrk->id);
CHECK_OBJ_NOTNULL(wrk, WORKER_DATA_MAGIC);
LOG_Log(LOG_INFO, "Worker %d: starting", wrk->id);
wrk->state = WRK_INITIALIZING;
err = mqf.worker_init(&mq_worker, wrk->id);
......@@ -274,10 +274,6 @@ static void
}
wrk_log_connection(mq_worker, wrk->id);
VSTAILQ_INIT(&wrk->freerec);
wrk->nfree_rec = 0;
VSTAILQ_INIT(&wrk->freechunk);
wrk->nfree_chunk = 0;
wrk->state = WRK_RUNNING;
AZ(pthread_mutex_lock(&running_lock));
......@@ -387,6 +383,10 @@ WRK_Init(void)
wrk->sb = (struct vsb *) malloc(sizeof(struct vsb));
AN(wrk->sb);
AN(VSB_new(wrk->sb, NULL, config.max_reclen + 1, VSB_FIXEDLEN));
VSTAILQ_INIT(&wrk->freerec);
wrk->nfree_rec = 0;
VSTAILQ_INIT(&wrk->freechunk);
wrk->nfree_chunk = 0;
wrk->id = i + 1;
wrk->deqs = wrk->waits = wrk->sends = wrk->fails = wrk->reconnects
= wrk->restarts = wrk->recoverables = wrk->bytes = 0;
......@@ -409,9 +409,11 @@ void
WRK_Start(void)
{
run = 1;
for (int i = 0; i < config.nworkers; i++)
for (int i = 0; i < config.nworkers; i++) {
CHECK_OBJ_NOTNULL(thread_data[i].wrk_data, WORKER_DATA_MAGIC);
AZ(pthread_create(&thread_data[i].worker, NULL, wrk_main,
thread_data[i].wrk_data));
thread_data[i].wrk_data));
}
}
int
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment