Commit fd2a01ad authored by Geoff Simmons's avatar Geoff Simmons

trackrdrd: always clean up private messaging implementation objects

           by calling MQ_WorkerShutdown() after resend/reconnect
           failures (may have been leaking threads and connections)
parent 762e8407
...@@ -96,10 +96,10 @@ ...@@ -96,10 +96,10 @@
* then MQ_Send() is attempted again with the same data. * then MQ_Send() is attempted again with the same data.
* - If either MQ_Reconnect() fails, or the resend after a successful call * - If either MQ_Reconnect() fails, or the resend after a successful call
* to MQ_Reconnect() fails, then the private worker object is discarded * to MQ_Reconnect() fails, then the private worker object is discarded
* (set to `NULL`), and the worker thread stops (without calling * (with a call to MQ_WorkerShutdown()), and the worker thread
* MQ_WorkerShutdown()). The tracking reader may attempt to start a new * stops. The tracking reader may attempt to start a new thread in its
* thread in its place, in which case a new private worker object for * place, in which case a new private worker object for the messaging
* the messaging implementation is initialized. * implementation is initialized.
*/ */
/** /**
......
...@@ -134,7 +134,7 @@ wrk_send(void **amq_worker, dataentry *entry, worker_data_t *wrk) ...@@ -134,7 +134,7 @@ wrk_send(void **amq_worker, dataentry *entry, worker_data_t *wrk)
LOG_Log(LOG_INFO, "Worker %d: Reconnecting", wrk->id); LOG_Log(LOG_INFO, "Worker %d: Reconnecting", wrk->id);
err = mqf.reconnect(amq_worker); err = mqf.reconnect(amq_worker);
if (err != NULL) { if (err != NULL) {
*amq_worker = NULL; wrk->status = EXIT_FAILURE;
LOG_Log(LOG_ALERT, "Worker %d: Reconnect failed (%s)", wrk->id, LOG_Log(LOG_ALERT, "Worker %d: Reconnect failed (%s)", wrk->id,
err); err);
LOG_Log(LOG_ERR, "Worker %d: Data DISCARDED [%.*s]", wrk->id, LOG_Log(LOG_ERR, "Worker %d: Data DISCARDED [%.*s]", wrk->id,
...@@ -147,7 +147,7 @@ wrk_send(void **amq_worker, dataentry *entry, worker_data_t *wrk) ...@@ -147,7 +147,7 @@ wrk_send(void **amq_worker, dataentry *entry, worker_data_t *wrk)
err = mqf.send(*amq_worker, entry->data, entry->end); err = mqf.send(*amq_worker, entry->data, entry->end);
if (err != NULL) { if (err != NULL) {
wrk->fails++; wrk->fails++;
*amq_worker = NULL; wrk->status = EXIT_FAILURE;
LOG_Log(LOG_ALERT, LOG_Log(LOG_ALERT,
"Worker %d: Failed to send data after reconnect: %s", "Worker %d: Failed to send data after reconnect: %s",
wrk->id, err); wrk->id, err);
...@@ -215,7 +215,7 @@ static void ...@@ -215,7 +215,7 @@ static void
wrk->deqs++; wrk->deqs++;
wrk_send(&amq_worker, entry, wrk); wrk_send(&amq_worker, entry, wrk);
if (amq_worker == NULL) if (wrk->status == EXIT_FAILURE)
break; break;
continue; continue;
} }
...@@ -252,24 +252,22 @@ static void ...@@ -252,24 +252,22 @@ static void
wrk->state = WRK_SHUTTINGDOWN; wrk->state = WRK_SHUTTINGDOWN;
if (amq_worker == NULL) if (wrk->status != EXIT_FAILURE) {
wrk->status = EXIT_FAILURE;
else {
/* Prepare to exit, drain the queue */ /* Prepare to exit, drain the queue */
while ((entry = SPMCQ_Deq()) != NULL) { while ((entry = SPMCQ_Deq()) != NULL) {
wrk->deqs++; wrk->deqs++;
wrk_send(&amq_worker, entry, wrk); wrk_send(&amq_worker, entry, wrk);
} }
wrk->status = EXIT_SUCCESS; wrk->status = EXIT_SUCCESS;
err = mqf.worker_shutdown(&amq_worker);
if (err != NULL) {
LOG_Log(LOG_ALERT, "Worker %d: MQ worker shutdown failed: %s",
wrk->id, err);
wrk->status = EXIT_FAILURE;
}
} }
err = mqf.worker_shutdown(&amq_worker);
if (err != NULL) {
LOG_Log(LOG_ALERT, "Worker %d: MQ worker shutdown failed: %s",
wrk->id, err);
wrk->status = EXIT_FAILURE;
}
AZ(pthread_mutex_lock(&running_lock)); AZ(pthread_mutex_lock(&running_lock));
running--; running--;
exited++; exited++;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment