Commit e429f8f6 authored by Geoff Simmons's avatar Geoff Simmons

tarckrdrd: - bugfix/tweak worker thread restarts

	- "< max threads running" is a warning, not alert
parent 4a8dcf74
...@@ -693,7 +693,7 @@ OSL_Track(void *priv, enum VSL_tag_e tag, unsigned fd, unsigned len, ...@@ -693,7 +693,7 @@ OSL_Track(void *priv, enum VSL_tag_e tag, unsigned fd, unsigned len,
if (wrk_running < config.nworkers) { if (wrk_running < config.nworkers) {
wrk_running = WRK_Running(); wrk_running = WRK_Running();
if (wrk_running < config.nworkers) if (wrk_running < config.nworkers)
LOG_Log(LOG_ALERT, "%d of %d workers running", wrk_running, LOG_Log(LOG_WARNING, "%d of %d workers running", wrk_running,
config.nworkers); config.nworkers);
} }
...@@ -978,8 +978,13 @@ CHILD_Main(struct VSM_data *vd, int endless, int readconfig) ...@@ -978,8 +978,13 @@ CHILD_Main(struct VSM_data *vd, int endless, int readconfig)
while (VSL_Dispatch(vd, OSL_Track, NULL) > 0) while (VSL_Dispatch(vd, OSL_Track, NULL) > 0)
if (term || !endless) if (term || !endless)
break; break;
else if (WRK_Exited() > 0) else if (WRK_Exited() > 0) {
WRK_Restart(); if ((errnum = WRK_Restart()) != 0) {
LOG_Log(LOG_ALERT, "Cannot restart worker threads, giving up "
"(%s)", strerror(errnum));
break;
}
}
else { else {
LOG_Log0(LOG_WARNING, "Log read interrupted, continuing"); LOG_Log0(LOG_WARNING, "Log read interrupted, continuing");
continue; continue;
......
...@@ -82,7 +82,7 @@ void PRIV_Sandbox(void); ...@@ -82,7 +82,7 @@ void PRIV_Sandbox(void);
*/ */
int WRK_Init(void); int WRK_Init(void);
void WRK_Start(void); void WRK_Start(void);
void WRK_Restart(void); int WRK_Restart(void);
void WRK_Stats(void); void WRK_Stats(void);
int WRK_Running(void); int WRK_Running(void);
int WRK_Exited(void); int WRK_Exited(void);
......
...@@ -80,6 +80,7 @@ struct worker_data_s { ...@@ -80,6 +80,7 @@ struct worker_data_s {
unsigned sends; unsigned sends;
unsigned fails; unsigned fails;
unsigned reconnects; unsigned reconnects;
unsigned restarts;
}; };
typedef struct worker_data_s worker_data_t; typedef struct worker_data_s worker_data_t;
...@@ -315,7 +316,8 @@ WRK_Init(void) ...@@ -315,7 +316,8 @@ WRK_Init(void)
worker_data_t *wrk = thread_data[i].wrk_data; worker_data_t *wrk = thread_data[i].wrk_data;
wrk->magic = WORKER_DATA_MAGIC; wrk->magic = WORKER_DATA_MAGIC;
wrk->id = i + 1; wrk->id = i + 1;
wrk->deqs = wrk->waits = wrk->sends = wrk->fails = wrk->reconnects = 0; wrk->deqs = wrk->waits = wrk->sends = wrk->fails = wrk->reconnects
= wrk->restarts = 0;
wrk->state = WRK_NOTSTARTED; wrk->state = WRK_NOTSTARTED;
} }
...@@ -343,7 +345,7 @@ WRK_Start(void) ...@@ -343,7 +345,7 @@ WRK_Start(void)
thread_data[i].wrk_data)); thread_data[i].wrk_data));
} }
void int
WRK_Restart(void) WRK_Restart(void)
{ {
worker_data_t *wrk; worker_data_t *wrk;
...@@ -352,14 +354,24 @@ WRK_Restart(void) ...@@ -352,14 +354,24 @@ WRK_Restart(void)
CHECK_OBJ_NOTNULL(thread_data[i].wrk_data, WORKER_DATA_MAGIC); CHECK_OBJ_NOTNULL(thread_data[i].wrk_data, WORKER_DATA_MAGIC);
wrk = thread_data[i].wrk_data; wrk = thread_data[i].wrk_data;
if (wrk->state == WRK_EXITED) { if (wrk->state == WRK_EXITED) {
exited--;
AZ(pthread_detach(thread_data[i].worker));
wrk->deqs = wrk->waits = wrk->sends = wrk->fails = wrk->reconnects wrk->deqs = wrk->waits = wrk->sends = wrk->fails = wrk->reconnects
= 0; = 0;
wrk->restarts++;
wrk->state = WRK_NOTSTARTED; wrk->state = WRK_NOTSTARTED;
AZ(pthread_create(&thread_data[i].worker, NULL, wrk_main, if (pthread_create(&thread_data[i].worker, NULL, wrk_main, wrk)
thread_data[i].wrk_data)); != 0) {
exited--; /* EAGAIN means we've hit a system limit trying to restart
threads, so it's time to give up. Any other errno is a
programming error.
*/
assert(errno == EAGAIN);
return errno;
}
} }
} }
return 0;
} }
void void
...@@ -372,9 +384,10 @@ WRK_Stats(void) ...@@ -372,9 +384,10 @@ WRK_Stats(void)
for (int i = 0; i < config.nworkers; i++) { for (int i = 0; i < config.nworkers; i++) {
wrk = thread_data[i].wrk_data; wrk = thread_data[i].wrk_data;
LOG_Log(LOG_INFO, LOG_Log(LOG_INFO,
"Worker %d (%s): seen=%d waits=%d sent=%d reconnects=%d failed=%d", "Worker %d (%s): seen=%u waits=%u sent=%u reconnects=%u "
"restarts=%u failed=%d",
wrk->id, statename[wrk->state], wrk->deqs, wrk->waits, wrk->sends, wrk->id, statename[wrk->state], wrk->deqs, wrk->waits, wrk->sends,
wrk->reconnects, wrk->fails); wrk->reconnects, wrk->restarts, wrk->fails);
} }
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment