Commit 99affe3d authored by Geoff Simmons's avatar Geoff Simmons

trackrdrd: - fixed bug in thread exit after reconnect failure

	- restored hash entry check against fd, but not if stale (-1)
	- removed membar after client bit not set (didn't help)
	- initial data send fail (before reconnect) is a warning (not alert)
parent 298e6fbd
......@@ -175,7 +175,7 @@ entry_assert_failure(const char *func, const char *file, int line,
/*--------------------------------------------------------------------*/
static inline void
check_entry(hashentry *he, unsigned xid)
check_entry(hashentry *he, unsigned xid, unsigned fd)
{
dataentry *de;
CHECK_OBJ_NOTNULL(he, HASH_MAGIC);
......@@ -186,6 +186,8 @@ check_entry(hashentry *he, unsigned xid)
entry_assert(he, de != NULL);
entry_assert(he, de->magic == DATA_MAGIC);
entry_assert(he, de->xid == xid);
if (fd != (unsigned int) -1)
entry_assert(he, de->tid == fd);
}
/*--------------------------------------------------------------------*/
......@@ -696,12 +698,13 @@ OSL_Track(void *priv, enum VSL_tag_e tag, unsigned fd, unsigned len,
}
/* spec != 'c' */
if ((spec & VSL_S_CLIENT) == 0) {
if ((spec & VSL_S_CLIENT) == 0)
LOG_Log(LOG_WARNING, "%s: Client bit ('c') not set [%.*s]",
VSL_tags[tag], len, ptr);
/* This may signal that data are not fresh */
VRMB();
}
if (fd == (unsigned int) -1)
LOG_Log(LOG_WARNING, "%s: File descriptor not set [%.*s]",
VSL_tags[tag], len, ptr);
switch (tag) {
case SLT_ReqStart:
......@@ -749,15 +752,8 @@ OSL_Track(void *priv, enum VSL_tag_e tag, unsigned fd, unsigned len,
break;
}
check_entry(he, xid);
check_entry(he, xid, fd);
de = he->de;
if (de->tid != fd) {
LOG_Log(LOG_ERR, "%s: fd mismatch, was %u, saw %u (XID=%u), "
"data DISCARDED [%.*s]",
VSL_tags[tag], de->tid, fd, xid, datalen, data);
htbl.drop_vcl_log++;
}
append(de, tag, xid, data, datalen);
de->hasdata = true;
break;
......@@ -784,14 +780,8 @@ OSL_Track(void *priv, enum VSL_tag_e tag, unsigned fd, unsigned len,
}
break;
}
check_entry(he, xid);
check_entry(he, xid, fd);
de = he->de;
if (de->tid != fd) {
LOG_Log(LOG_ERR, "%s: fd mismatch, was %u, saw %u (XID=%u), "
"data DISCARDED [%.*s]",
VSL_tags[tag], de->tid, fd, xid, datalen, data);
htbl.drop_reqend++;
}
sprintf(reqend_str, "%s=%u.%09lu", REQEND_T_VAR,
(unsigned) reqend_t.tv_sec, reqend_t.tv_nsec);
......@@ -824,6 +814,9 @@ OSL_Track(void *priv, enum VSL_tag_e tag, unsigned fd, unsigned len,
}
pptr = ptr;
if (WRK_Exited() > 0)
return 1;
return 0;
}
......@@ -985,6 +978,8 @@ CHILD_Main(struct VSM_data *vd, int endless, int readconfig)
while (VSL_Dispatch(vd, OSL_Track, NULL) > 0)
if (term || !endless)
break;
else if (WRK_Exited() > 0)
WRK_Restart();
else {
LOG_Log0(LOG_WARNING, "Log read interrupted, continuing");
continue;
......
......@@ -82,8 +82,10 @@ void PRIV_Sandbox(void);
*/
int WRK_Init(void);
void WRK_Start(void);
void WRK_Restart(void);
void WRK_Stats(void);
int WRK_Running(void);
int WRK_Exited(void);
void WRK_Halt(void);
void WRK_Shutdown(void);
......
......@@ -42,7 +42,7 @@
#define VERSION_LEN 64
#define CLIENT_ID_LEN 80
static int running = 0;
static int running = 0, exited = 0;
typedef enum {
WRK_NOTSTARTED = 0,
......@@ -126,11 +126,12 @@ wrk_send(void **amq_worker, dataentry *entry, worker_data_t *wrk)
/* XXX: report entry->incomplete to backend ? */
err = MQ_Send(*amq_worker, entry->data, entry->end);
if (err != NULL) {
LOG_Log(LOG_ALERT, "Worker %d: Failed to send data: %s", wrk->id, err);
LOG_Log(LOG_WARNING, "Worker %d: Failed to send data: %s",
wrk->id, err);
LOG_Log(LOG_INFO, "Worker %d: Reconnecting", wrk->id);
err = MQ_Reconnect(amq_worker);
if (err != NULL) {
amq_worker = NULL;
*amq_worker = NULL;
LOG_Log(LOG_ALERT, "Worker %d: Reconnect failed (%s)", wrk->id,
err);
LOG_Log(LOG_ERR, "Worker %d: Data DISCARDED [%.*s]", wrk->id,
......@@ -189,6 +190,7 @@ static void
wrk->id, err);
wrk->status = EXIT_FAILURE;
wrk->state = WRK_EXITED;
exited++;
pthread_exit((void *) wrk);
}
......@@ -248,7 +250,9 @@ static void
wrk->state = WRK_SHUTTINGDOWN;
if (amq_worker != NULL) {
if (amq_worker == NULL)
wrk->status = EXIT_FAILURE;
else {
/* Prepare to exit, drain the queue */
while ((entry = SPMCQ_Deq()) != NULL) {
wrk->deqs++;
......@@ -263,11 +267,10 @@ static void
wrk->status = EXIT_FAILURE;
}
}
else
wrk->status = EXIT_FAILURE;
AZ(pthread_mutex_lock(&running_lock));
running--;
exited++;
AZ(pthread_mutex_unlock(&running_lock));
LOG_Log(LOG_INFO, "Worker %d: exiting", wrk->id);
wrk->state = WRK_EXITED;
......@@ -340,6 +343,25 @@ WRK_Start(void)
thread_data[i].wrk_data));
}
void
WRK_Restart(void)
{
worker_data_t *wrk;
for (int i = 0; i < config.nworkers; i++) {
CHECK_OBJ_NOTNULL(thread_data[i].wrk_data, WORKER_DATA_MAGIC);
wrk = thread_data[i].wrk_data;
if (wrk->state == WRK_EXITED) {
wrk->deqs = wrk->waits = wrk->sends = wrk->fails = wrk->reconnects
= 0;
wrk->state = WRK_NOTSTARTED;
AZ(pthread_create(&thread_data[i].worker, NULL, wrk_main,
thread_data[i].wrk_data));
exited--;
}
}
}
void
WRK_Stats(void)
{
......@@ -362,6 +384,12 @@ WRK_Running(void)
return running;
}
int
WRK_Exited(void)
{
return exited;
}
void
WRK_Halt(void)
{
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment