Commit 99de8f95 authored by Geoff Simmons's avatar Geoff Simmons

trackrdrd: - no longer using SPMCQ_StopWorker() it was too aggressive

	- more carefully counting the running and exited workers
parent 301c31b5
...@@ -689,12 +689,10 @@ OSL_Track(void *priv, enum VSL_tag_e tag, unsigned fd, unsigned len, ...@@ -689,12 +689,10 @@ OSL_Track(void *priv, enum VSL_tag_e tag, unsigned fd, unsigned len,
if (term && htbl.open == 0) if (term && htbl.open == 0)
return 1; return 1;
if (wrk_running < config.nworkers) { wrk_running = WRK_Running();
wrk_running = WRK_Running(); if (wrk_running < config.nworkers)
if (wrk_running < config.nworkers) LOG_Log(LOG_WARNING, "%d of %d workers running", wrk_running,
LOG_Log(LOG_WARNING, "%d of %d workers running", wrk_running, config.nworkers);
config.nworkers);
}
/* spec != 'c' */ /* spec != 'c' */
if ((spec & VSL_S_CLIENT) == 0) if ((spec & VSL_S_CLIENT) == 0)
......
...@@ -152,12 +152,3 @@ SPMCQ_NeedWorker(int running) ...@@ -152,12 +152,3 @@ SPMCQ_NeedWorker(int running)
return spmcq_len() > spmcq_wrk_len_ratio(running - spmcq_datawaiter, return spmcq_len() > spmcq_wrk_len_ratio(running - spmcq_datawaiter,
running); running);
} }
bool
SPMCQ_StopWorker(int running)
{
if (running == 0)
return false;
return spmcq_len() < spmcq_wrk_len_ratio(running - spmcq_datawaiter - 1,
running);
}
...@@ -184,7 +184,6 @@ void SPMCQ_Enq(dataentry *ptr); ...@@ -184,7 +184,6 @@ void SPMCQ_Enq(dataentry *ptr);
dataentry *SPMCQ_Deq(void); dataentry *SPMCQ_Deq(void);
void SPMCQ_Drain(void); void SPMCQ_Drain(void);
bool SPMCQ_NeedWorker(int running); bool SPMCQ_NeedWorker(int running);
bool SPMCQ_StopWorker(int running);
#define spmcq_wait(what) \ #define spmcq_wait(what) \
do { \ do { \
......
...@@ -191,7 +191,9 @@ static void ...@@ -191,7 +191,9 @@ static void
wrk->id, err); wrk->id, err);
wrk->status = EXIT_FAILURE; wrk->status = EXIT_FAILURE;
wrk->state = WRK_EXITED; wrk->state = WRK_EXITED;
AZ(pthread_mutex_lock(&running_lock));
exited++; exited++;
AZ(pthread_mutex_unlock(&running_lock));
pthread_exit((void *) wrk); pthread_exit((void *) wrk);
} }
...@@ -212,8 +214,7 @@ static void ...@@ -212,8 +214,7 @@ static void
if (amq_worker == NULL) if (amq_worker == NULL)
break; break;
if (!SPMCQ_StopWorker(running)) continue;
continue;
} }
/* return space before sleeping */ /* return space before sleeping */
...@@ -223,9 +224,8 @@ static void ...@@ -223,9 +224,8 @@ static void
} }
/* /*
* Queue is empty or we should backoff * Queue is empty, wait until data are available, or quit is
* * signaled.
* wait until data are available, or quit is signaled.
* *
* Grab the CV lock, which also constitutes an implicit memory * Grab the CV lock, which also constitutes an implicit memory
* barrier * barrier
...@@ -233,11 +233,9 @@ static void ...@@ -233,11 +233,9 @@ static void
AZ(pthread_mutex_lock(&spmcq_datawaiter_lock)); AZ(pthread_mutex_lock(&spmcq_datawaiter_lock));
/* /*
* run is guaranteed to be fresh here * run is guaranteed to be fresh here
*
* also re-check the stop condition under the lock
*/ */
SPMCQ_Drain(); SPMCQ_Drain();
if (run && ((! entry) || SPMCQ_StopWorker(running))) { if (run) {
wrk->waits++; wrk->waits++;
spmcq_datawaiter++; spmcq_datawaiter++;
wrk->state = WRK_WAITING; wrk->state = WRK_WAITING;
...@@ -354,7 +352,9 @@ WRK_Restart(void) ...@@ -354,7 +352,9 @@ WRK_Restart(void)
CHECK_OBJ_NOTNULL(thread_data[i].wrk_data, WORKER_DATA_MAGIC); CHECK_OBJ_NOTNULL(thread_data[i].wrk_data, WORKER_DATA_MAGIC);
wrk = thread_data[i].wrk_data; wrk = thread_data[i].wrk_data;
if (wrk->state == WRK_EXITED) { if (wrk->state == WRK_EXITED) {
AZ(pthread_mutex_lock(&running_lock));
exited--; exited--;
AZ(pthread_mutex_unlock(&running_lock));
AZ(pthread_detach(thread_data[i].worker)); AZ(pthread_detach(thread_data[i].worker));
wrk->deqs = wrk->waits = wrk->sends = wrk->fails = wrk->reconnects wrk->deqs = wrk->waits = wrk->sends = wrk->fails = wrk->reconnects
= 0; = 0;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment