Commit ee16f13e authored by Geoff Simmons's avatar Geoff Simmons

move some worker thread intialization into WRK_Init() (out of the

thread function), and add some integrity checks
parent 3f7d68b9
...@@ -257,8 +257,8 @@ static void ...@@ -257,8 +257,8 @@ static void
dataentry *entry; dataentry *entry;
const char *err; const char *err;
LOG_Log(LOG_INFO, "Worker %d: starting", wrk->id);
CHECK_OBJ_NOTNULL(wrk, WORKER_DATA_MAGIC); CHECK_OBJ_NOTNULL(wrk, WORKER_DATA_MAGIC);
LOG_Log(LOG_INFO, "Worker %d: starting", wrk->id);
wrk->state = WRK_INITIALIZING; wrk->state = WRK_INITIALIZING;
err = mqf.worker_init(&mq_worker, wrk->id); err = mqf.worker_init(&mq_worker, wrk->id);
...@@ -274,10 +274,6 @@ static void ...@@ -274,10 +274,6 @@ static void
} }
wrk_log_connection(mq_worker, wrk->id); wrk_log_connection(mq_worker, wrk->id);
VSTAILQ_INIT(&wrk->freerec);
wrk->nfree_rec = 0;
VSTAILQ_INIT(&wrk->freechunk);
wrk->nfree_chunk = 0;
wrk->state = WRK_RUNNING; wrk->state = WRK_RUNNING;
AZ(pthread_mutex_lock(&running_lock)); AZ(pthread_mutex_lock(&running_lock));
...@@ -387,6 +383,10 @@ WRK_Init(void) ...@@ -387,6 +383,10 @@ WRK_Init(void)
wrk->sb = (struct vsb *) malloc(sizeof(struct vsb)); wrk->sb = (struct vsb *) malloc(sizeof(struct vsb));
AN(wrk->sb); AN(wrk->sb);
AN(VSB_new(wrk->sb, NULL, config.max_reclen + 1, VSB_FIXEDLEN)); AN(VSB_new(wrk->sb, NULL, config.max_reclen + 1, VSB_FIXEDLEN));
VSTAILQ_INIT(&wrk->freerec);
wrk->nfree_rec = 0;
VSTAILQ_INIT(&wrk->freechunk);
wrk->nfree_chunk = 0;
wrk->id = i + 1; wrk->id = i + 1;
wrk->deqs = wrk->waits = wrk->sends = wrk->fails = wrk->reconnects wrk->deqs = wrk->waits = wrk->sends = wrk->fails = wrk->reconnects
= wrk->restarts = wrk->recoverables = wrk->bytes = 0; = wrk->restarts = wrk->recoverables = wrk->bytes = 0;
...@@ -409,9 +409,11 @@ void ...@@ -409,9 +409,11 @@ void
WRK_Start(void) WRK_Start(void)
{ {
run = 1; run = 1;
for (int i = 0; i < config.nworkers; i++) for (int i = 0; i < config.nworkers; i++) {
CHECK_OBJ_NOTNULL(thread_data[i].wrk_data, WORKER_DATA_MAGIC);
AZ(pthread_create(&thread_data[i].worker, NULL, wrk_main, AZ(pthread_create(&thread_data[i].worker, NULL, wrk_main,
thread_data[i].wrk_data)); thread_data[i].wrk_data));
}
} }
int int
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment