Commit 4a8dcf74 authored by Geoff Simmons's avatar Geoff Simmons

trackrdrd: - fixed bug in thread exit after reconnect failure

	- restored hash entry check against fd, but not if stale (-1)
	- removed membar after client bit not set (didn't help)
	- intial data send fail (before reconnect) is a warning (not alert)
parent d27e4edd
...@@ -175,7 +175,7 @@ entry_assert_failure(const char *func, const char *file, int line, ...@@ -175,7 +175,7 @@ entry_assert_failure(const char *func, const char *file, int line,
/*--------------------------------------------------------------------*/ /*--------------------------------------------------------------------*/
static inline void static inline void
check_entry(hashentry *he, unsigned xid) check_entry(hashentry *he, unsigned xid, unsigned fd)
{ {
dataentry *de; dataentry *de;
CHECK_OBJ_NOTNULL(he, HASH_MAGIC); CHECK_OBJ_NOTNULL(he, HASH_MAGIC);
...@@ -186,6 +186,8 @@ check_entry(hashentry *he, unsigned xid) ...@@ -186,6 +186,8 @@ check_entry(hashentry *he, unsigned xid)
entry_assert(he, de != NULL); entry_assert(he, de != NULL);
entry_assert(he, de->magic == DATA_MAGIC); entry_assert(he, de->magic == DATA_MAGIC);
entry_assert(he, de->xid == xid); entry_assert(he, de->xid == xid);
if (fd != (unsigned int) -1)
entry_assert(he, de->tid == fd);
} }
/*--------------------------------------------------------------------*/ /*--------------------------------------------------------------------*/
...@@ -696,12 +698,13 @@ OSL_Track(void *priv, enum VSL_tag_e tag, unsigned fd, unsigned len, ...@@ -696,12 +698,13 @@ OSL_Track(void *priv, enum VSL_tag_e tag, unsigned fd, unsigned len,
} }
/* spec != 'c' */ /* spec != 'c' */
if ((spec & VSL_S_CLIENT) == 0) { if ((spec & VSL_S_CLIENT) == 0)
LOG_Log(LOG_WARNING, "%s: Client bit ('c') not set [%.*s]", LOG_Log(LOG_WARNING, "%s: Client bit ('c') not set [%.*s]",
VSL_tags[tag], len, ptr); VSL_tags[tag], len, ptr);
/* This may signal that data are not fresh */
VRMB(); if (fd == (unsigned int) -1)
} LOG_Log(LOG_WARNING, "%s: File descriptor not set [%.*s]",
VSL_tags[tag], len, ptr);
switch (tag) { switch (tag) {
case SLT_ReqStart: case SLT_ReqStart:
...@@ -749,15 +752,8 @@ OSL_Track(void *priv, enum VSL_tag_e tag, unsigned fd, unsigned len, ...@@ -749,15 +752,8 @@ OSL_Track(void *priv, enum VSL_tag_e tag, unsigned fd, unsigned len,
break; break;
} }
check_entry(he, xid); check_entry(he, xid, fd);
de = he->de; de = he->de;
if (de->tid != fd) {
LOG_Log(LOG_ERR, "%s: fd mismatch, was %u, saw %u (XID=%u), "
"data DISCARDED [%.*s]",
VSL_tags[tag], de->tid, fd, xid, datalen, data);
htbl.drop_vcl_log++;
}
append(de, tag, xid, data, datalen); append(de, tag, xid, data, datalen);
de->hasdata = true; de->hasdata = true;
break; break;
...@@ -784,14 +780,8 @@ OSL_Track(void *priv, enum VSL_tag_e tag, unsigned fd, unsigned len, ...@@ -784,14 +780,8 @@ OSL_Track(void *priv, enum VSL_tag_e tag, unsigned fd, unsigned len,
} }
break; break;
} }
check_entry(he, xid); check_entry(he, xid, fd);
de = he->de; de = he->de;
if (de->tid != fd) {
LOG_Log(LOG_ERR, "%s: fd mismatch, was %u, saw %u (XID=%u), "
"data DISCARDED [%.*s]",
VSL_tags[tag], de->tid, fd, xid, datalen, data);
htbl.drop_reqend++;
}
sprintf(reqend_str, "%s=%u.%09lu", REQEND_T_VAR, sprintf(reqend_str, "%s=%u.%09lu", REQEND_T_VAR,
(unsigned) reqend_t.tv_sec, reqend_t.tv_nsec); (unsigned) reqend_t.tv_sec, reqend_t.tv_nsec);
...@@ -824,6 +814,9 @@ OSL_Track(void *priv, enum VSL_tag_e tag, unsigned fd, unsigned len, ...@@ -824,6 +814,9 @@ OSL_Track(void *priv, enum VSL_tag_e tag, unsigned fd, unsigned len,
} }
pptr = ptr; pptr = ptr;
if (WRK_Exited() > 0)
return 1;
return 0; return 0;
} }
...@@ -985,6 +978,8 @@ CHILD_Main(struct VSM_data *vd, int endless, int readconfig) ...@@ -985,6 +978,8 @@ CHILD_Main(struct VSM_data *vd, int endless, int readconfig)
while (VSL_Dispatch(vd, OSL_Track, NULL) > 0) while (VSL_Dispatch(vd, OSL_Track, NULL) > 0)
if (term || !endless) if (term || !endless)
break; break;
else if (WRK_Exited() > 0)
WRK_Restart();
else { else {
LOG_Log0(LOG_WARNING, "Log read interrupted, continuing"); LOG_Log0(LOG_WARNING, "Log read interrupted, continuing");
continue; continue;
......
...@@ -82,8 +82,10 @@ void PRIV_Sandbox(void); ...@@ -82,8 +82,10 @@ void PRIV_Sandbox(void);
*/ */
int WRK_Init(void); int WRK_Init(void);
void WRK_Start(void); void WRK_Start(void);
void WRK_Restart(void);
void WRK_Stats(void); void WRK_Stats(void);
int WRK_Running(void); int WRK_Running(void);
int WRK_Exited(void);
void WRK_Halt(void); void WRK_Halt(void);
void WRK_Shutdown(void); void WRK_Shutdown(void);
......
...@@ -42,7 +42,7 @@ ...@@ -42,7 +42,7 @@
#define VERSION_LEN 64 #define VERSION_LEN 64
#define CLIENT_ID_LEN 80 #define CLIENT_ID_LEN 80
static int running = 0; static int running = 0, exited = 0;
typedef enum { typedef enum {
WRK_NOTSTARTED = 0, WRK_NOTSTARTED = 0,
...@@ -126,11 +126,12 @@ wrk_send(void **amq_worker, dataentry *entry, worker_data_t *wrk) ...@@ -126,11 +126,12 @@ wrk_send(void **amq_worker, dataentry *entry, worker_data_t *wrk)
/* XXX: report entry->incomplete to backend ? */ /* XXX: report entry->incomplete to backend ? */
err = MQ_Send(*amq_worker, entry->data, entry->end); err = MQ_Send(*amq_worker, entry->data, entry->end);
if (err != NULL) { if (err != NULL) {
LOG_Log(LOG_ALERT, "Worker %d: Failed to send data: %s", wrk->id, err); LOG_Log(LOG_WARNING, "Worker %d: Failed to send data: %s",
wrk->id, err);
LOG_Log(LOG_INFO, "Worker %d: Reconnecting", wrk->id); LOG_Log(LOG_INFO, "Worker %d: Reconnecting", wrk->id);
err = MQ_Reconnect(amq_worker); err = MQ_Reconnect(amq_worker);
if (err != NULL) { if (err != NULL) {
amq_worker = NULL; *amq_worker = NULL;
LOG_Log(LOG_ALERT, "Worker %d: Reconnect failed (%s)", wrk->id, LOG_Log(LOG_ALERT, "Worker %d: Reconnect failed (%s)", wrk->id,
err); err);
LOG_Log(LOG_ERR, "Worker %d: Data DISCARDED [%.*s]", wrk->id, LOG_Log(LOG_ERR, "Worker %d: Data DISCARDED [%.*s]", wrk->id,
...@@ -189,6 +190,7 @@ static void ...@@ -189,6 +190,7 @@ static void
wrk->id, err); wrk->id, err);
wrk->status = EXIT_FAILURE; wrk->status = EXIT_FAILURE;
wrk->state = WRK_EXITED; wrk->state = WRK_EXITED;
exited++;
pthread_exit((void *) wrk); pthread_exit((void *) wrk);
} }
...@@ -248,7 +250,9 @@ static void ...@@ -248,7 +250,9 @@ static void
wrk->state = WRK_SHUTTINGDOWN; wrk->state = WRK_SHUTTINGDOWN;
if (amq_worker != NULL) { if (amq_worker == NULL)
wrk->status = EXIT_FAILURE;
else {
/* Prepare to exit, drain the queue */ /* Prepare to exit, drain the queue */
while ((entry = SPMCQ_Deq()) != NULL) { while ((entry = SPMCQ_Deq()) != NULL) {
wrk->deqs++; wrk->deqs++;
...@@ -263,11 +267,10 @@ static void ...@@ -263,11 +267,10 @@ static void
wrk->status = EXIT_FAILURE; wrk->status = EXIT_FAILURE;
} }
} }
else
wrk->status = EXIT_FAILURE;
AZ(pthread_mutex_lock(&running_lock)); AZ(pthread_mutex_lock(&running_lock));
running--; running--;
exited++;
AZ(pthread_mutex_unlock(&running_lock)); AZ(pthread_mutex_unlock(&running_lock));
LOG_Log(LOG_INFO, "Worker %d: exiting", wrk->id); LOG_Log(LOG_INFO, "Worker %d: exiting", wrk->id);
wrk->state = WRK_EXITED; wrk->state = WRK_EXITED;
...@@ -340,6 +343,25 @@ WRK_Start(void) ...@@ -340,6 +343,25 @@ WRK_Start(void)
thread_data[i].wrk_data)); thread_data[i].wrk_data));
} }
void
WRK_Restart(void)
{
worker_data_t *wrk;
for (int i = 0; i < config.nworkers; i++) {
CHECK_OBJ_NOTNULL(thread_data[i].wrk_data, WORKER_DATA_MAGIC);
wrk = thread_data[i].wrk_data;
if (wrk->state == WRK_EXITED) {
wrk->deqs = wrk->waits = wrk->sends = wrk->fails = wrk->reconnects
= 0;
wrk->state = WRK_NOTSTARTED;
AZ(pthread_create(&thread_data[i].worker, NULL, wrk_main,
thread_data[i].wrk_data));
exited--;
}
}
}
void void
WRK_Stats(void) WRK_Stats(void)
{ {
...@@ -362,6 +384,12 @@ WRK_Running(void) ...@@ -362,6 +384,12 @@ WRK_Running(void)
return running; return running;
} }
int
WRK_Exited(void)
{
return exited;
}
void void
WRK_Halt(void) WRK_Halt(void)
{ {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment