Commit 0c5c731f authored by Geoff Simmons's avatar Geoff Simmons

trackrdrd: - MQ reconnect and one retry after send failure

	- no connection pooling, always one connection per worker
parent 85688fc8
......@@ -123,7 +123,6 @@ CONF_Add(const char *lval, const char *rval)
confUnsigned("hash.ttl", hash_ttl);
confUnsigned("hash.mlt", hash_mlt);
confUnsigned("nworkers", nworkers);
confUnsigned("mq.pool_size", mq_pool_size);
confUnsigned("restarts", restarts);
confUnsigned("monitor.interval", monitor_interval);
......@@ -242,7 +241,6 @@ CONF_Init(void)
config.mq_uri = (char **) malloc (sizeof(char **));
AN(config.mq_uri);
config.mq_qname[0] = '\0';
config.mq_pool_size = 5;
config.nworkers = 1;
config.restarts = 1;
......@@ -348,7 +346,6 @@ CONF_Dump(void)
LOG_Log0(LOG_DEBUG, "config: mq.uri = ");
confdump("mq.qname = %s", config.mq_qname);
confdump("mq.pool_size = %u", config.mq_pool_size);
confdump("nworkers = %u", config.nworkers);
confdump("restarts = %u", config.restarts);
confdump("user = %s", config.user_name);
......
......@@ -61,6 +61,7 @@ log_output(void)
"open=%u "
"load=%.2f "
"sent=%u "
"reconnects=%u "
"failed=%u "
"occ_hi=%u "
"occ_hi_this=%u ",
......@@ -74,6 +75,7 @@ log_output(void)
dtbl.r_stats.open,
(100.0 * (1.0 * dtbl.r_stats.done + 1.0 * dtbl.r_stats.open) / dtbl.len),
dtbl.r_stats.sent,
dtbl.r_stats.reconnects,
dtbl.r_stats.failed,
dtbl.r_stats.occ_hi,
dtbl.r_stats.occ_hi_this
......@@ -174,6 +176,10 @@ MON_StatsUpdate(stats_update_t update)
dtbl.r_stats.open--;
break;
case STATS_RECONNECT:
dtbl.r_stats.reconnects++;
break;
case STATS_OCCUPANCY:
dtbl.r_stats.open++;
if (dtbl.r_stats.open + dtbl.r_stats.done > dtbl.r_stats.occ_hi)
......
......@@ -40,7 +40,6 @@
#include "vas.h"
static AMQ_Connection **connections;
static unsigned nconnections;
static pthread_mutex_t connection_lock = PTHREAD_MUTEX_INITIALIZER;
static unsigned connection = 0;
......@@ -59,27 +58,24 @@ MQ_InitConnections(void)
if (config.n_mq_uris == 0)
return NULL;
nconnections = config.n_mq_uris * config.mq_pool_size;
connections = (AMQ_Connection **) calloc(sizeof(AMQ_Connection *),
nconnections);
config.nworkers);
if (connections == NULL)
return strerror(errno);
for (int i = 0; i < config.n_mq_uris; i++)
for (int j = 0; j < config.mq_pool_size; j++) {
err = AMQ_ConnectionInit(&conn, config.mq_uri[i]);
if (err != NULL)
return err;
connections[i*config.mq_pool_size + j] = conn;
}
for (int i = 0; i < config.nworkers; i++) {
err = AMQ_ConnectionInit(&conn, config.mq_uri[i % config.n_mq_uris]);
if (err != NULL)
return err;
connections[i] = conn;
}
return NULL;
}
const char *
MQ_WorkerInit(void **priv)
{
AN(nconnections);
AZ(pthread_mutex_lock(&connection_lock));
AMQ_Connection *conn = connections[connection++ % nconnections];
AMQ_Connection *conn = connections[connection++ % config.nworkers];
AZ(pthread_mutex_unlock(&connection_lock));
return AMQ_WorkerInit((AMQ_Worker **) priv, conn, config.mq_qname);
}
......@@ -90,6 +86,22 @@ MQ_Send(void *priv, const char *data, unsigned len)
return AMQ_Send((AMQ_Worker *) priv, data, len);
}
const char *
MQ_Reconnect(void **priv)
{
const char *err;
AMQ_Connection *conn;
err = AMQ_WorkerShutdown((AMQ_Worker **) priv);
if (err != NULL)
return err;
err = AMQ_ConnectionInit(&conn,
config.mq_uri[connection++ % config.n_mq_uris]);
if (err != NULL)
return err;
return AMQ_WorkerInit((AMQ_Worker **) priv, conn, config.mq_qname);
}
const char *
MQ_Version(void *priv, char *version)
{
......
......@@ -40,19 +40,6 @@
#include "vas.h"
#include "vqueue.h"
#if 0
typedef struct {
unsigned magic;
#define SPMCQ_MAGIC 0xe9a5d0a8
const unsigned mask;
void **data;
volatile unsigned head;
volatile unsigned tail;
} spmcq_t;
spmcq_t spmcq;
#endif
static volatile unsigned long enqs = 0, deqs = 0;
static pthread_mutex_t spmcq_lock;
static pthread_mutex_t spmcq_deq_lock;
......
......@@ -22,7 +22,7 @@ CMD="../trackrdrd -D -f varnish.binlog -l - -d -c test.conf"
# the user running it
CKSUM=$( $CMD | sed -e 's/\(initializing\) \(.*\)/\1/' | sed -e 's/\(Running as\) \([a-zA-Z0-9]*\)$/\1/' | grep -v 'Not running as root' | cksum)
if [ "$CKSUM" != '358541331 234058' ]; then
if [ "$CKSUM" != '3013966607 234026' ]; then
echo "ERROR: Regression test incorrect cksum: $CKSUM"
exit 1
fi
......
......@@ -66,6 +66,7 @@ static char
printf("... testing MQ connection initialization\n");
config.n_mq_uris = 1;
config.nworkers = 1;
config.mq_uri = (char **) malloc(sizeof(char **));
AN(config.mq_uri);
config.mq_uri[0] = (char *) malloc(strlen("tcp://localhost:61616") + 1);
......@@ -73,7 +74,6 @@ static char
strcpy(config.mq_uri[0], "tcp://localhost:61616");
strcpy(config.mq_qname, "lhoste/tracking/test");
config.mq_pool_size = 1;
err = MQ_InitConnections();
if (err != NULL && strstr(err, "Connection refused") != NULL) {
printf("Connection refused, ActiveMQ assumed not running\n");
......
......@@ -68,7 +68,6 @@ static char
config.maxdata = 1024;
config.nworkers = NWORKERS;
strcpy(config.mq_qname, "lhoste/tracking/test");
config.mq_pool_size = 2;
config.n_mq_uris = 2;
config.mq_uri = (char **) malloc(2 * sizeof(char**));
......
......@@ -93,6 +93,7 @@ const char *MQ_WorkerInit(void **priv);
const char *MQ_Send(void *priv, const char *data, unsigned len);
const char *MQ_Version(void *priv, char *version);
const char *MQ_ClientID(void *priv, char *clientID);
const char *MQ_Reconnect(void **priv);
const char *MQ_WorkerShutdown(void **priv);
const char *MQ_GlobalShutdown(void);
......@@ -140,8 +141,10 @@ struct data_reader_stats_s {
unsigned open;
unsigned sent; /* Sent successfully to MQ */
unsigned failed; /* MQ send fails */
unsigned reconnects;
unsigned occ_hi; /* Occupancy high water mark */
unsigned occ_hi_this; /* Occupancy high water mark this reporting interval*/
unsigned occ_hi_this; /* Occupancy high water mark
this reporting interval*/
};
struct datatable_s {
......@@ -294,7 +297,6 @@ struct config {
unsigned n_mq_uris;
char **mq_uri;
char mq_qname[BUFSIZ];
unsigned mq_pool_size;
unsigned nworkers;
unsigned restarts;
char user_name[BUFSIZ];
......@@ -334,6 +336,8 @@ typedef enum {
STATS_SENT,
/* Failed to send record to MQ */
STATS_FAILED,
/* Reconnected to MQ */
STATS_RECONNECT,
/* ReqStart seen, finished reading record from SHM log */
STATS_DONE,
/* Update occupancy high water mark */
......
......@@ -79,6 +79,7 @@ struct worker_data_s {
unsigned waits;
unsigned sends;
unsigned fails;
unsigned reconnects;
};
typedef struct worker_data_s worker_data_t;
......@@ -93,8 +94,28 @@ static thread_data_t *thread_data;
static pthread_mutex_t running_lock;
static void
wrk_log_connection(void *amq_worker, unsigned id)
{
const char *err;
char version[VERSION_LEN], clientID[CLIENT_ID_LEN];
err = MQ_Version(amq_worker, version);
if (err != NULL) {
LOG_Log(LOG_ERR, "Worker %d: Failed to get MQ version", id, err);
version[0] = '\0';
}
err = MQ_ClientID(amq_worker, clientID);
if (err != NULL) {
LOG_Log(LOG_ERR, "Worker %d: Failed to get MQ client ID", id, err);
clientID[0] = '\0';
}
LOG_Log(LOG_INFO, "Worker %d: connected (%s, id = %s)", id, version,
clientID);
}
static inline void
wrk_send(void *amq_worker, dataentry *entry, worker_data_t *wrk)
wrk_send(void **amq_worker, dataentry *entry, worker_data_t *wrk)
{
const char *err;
......@@ -103,16 +124,35 @@ wrk_send(void *amq_worker, dataentry *entry, worker_data_t *wrk)
AN(amq_worker);
/* XXX: report entry->incomplete to backend ? */
err = MQ_Send(amq_worker, entry->data, entry->end);
err = MQ_Send(*amq_worker, entry->data, entry->end);
if (err != NULL) {
/* XXX: error recovery? reconnect? preserve the data? */
wrk->fails++;
LOG_Log(LOG_ALERT, "Worker %d: Failed to send data: %s", wrk->id, err);
LOG_Log(LOG_ERR, "Worker %d: Data DISCARDED [%.*s]", wrk->id,
entry->end, entry->data);
MON_StatsUpdate(STATS_FAILED);
LOG_Log(LOG_INFO, "Worker %d: Reconnecting", wrk->id);
err = MQ_Reconnect(amq_worker);
if (err != NULL) {
amq_worker = NULL;
LOG_Log(LOG_ALERT, "Worker %d: Reconnect failed (%s)", wrk->id,
err);
LOG_Log(LOG_ERR, "Worker %d: Data DISCARDED [%.*s]", wrk->id,
entry->end, entry->data);
}
else {
wrk->reconnects++;
wrk_log_connection(*amq_worker, wrk->id);
MON_StatsUpdate(STATS_RECONNECT);
err = MQ_Send(*amq_worker, entry->data, entry->end);
if (err != NULL) {
wrk->fails++;
LOG_Log(LOG_ALERT,
"Worker %d: Failed to send data after reconnect: %s",
wrk->id, err);
LOG_Log(LOG_ERR, "Worker %d: Data DISCARDED [%.*s]", wrk->id,
entry->end, entry->data);
MON_StatsUpdate(STATS_FAILED);
}
}
}
else {
if (err == NULL) {
wrk->sends++;
MON_StatsUpdate(STATS_SENT);
LOG_Log(LOG_DEBUG, "Worker %d: Successfully sent data [%.*s]", wrk->id,
......@@ -138,7 +178,6 @@ static void
void *amq_worker;
dataentry *entry;
const char *err;
char version[VERSION_LEN], clientID[CLIENT_ID_LEN];
LOG_Log(LOG_INFO, "Worker %d: starting", wrk->id);
CHECK_OBJ_NOTNULL(wrk, WORKER_DATA_MAGIC);
......@@ -153,18 +192,7 @@ static void
pthread_exit((void *) wrk);
}
err = MQ_Version(amq_worker, version);
if (err != NULL) {
LOG_Log(LOG_ERR, "Worker %d: Failed to get MQ version", wrk->id, err);
version[0] = '\0';
}
err = MQ_ClientID(amq_worker, clientID);
if (err != NULL) {
LOG_Log(LOG_ERR, "Worker %d: Failed to get MQ client ID", wrk->id, err);
clientID[0] = '\0';
}
wrk_log_connection(amq_worker, wrk->id);
VSTAILQ_INIT(&wrk->wrk_freelist);
wrk->wrk_nfree = 0;
......@@ -173,15 +201,14 @@ static void
running++;
AZ(pthread_mutex_unlock(&running_lock));
LOG_Log(LOG_INFO, "Worker %d: running (%s, id = %s)", wrk->id, version,
clientID);
while (run) {
entry = SPMCQ_Deq();
if (entry != NULL) {
wrk->deqs++;
wrk_send(amq_worker, entry, wrk);
wrk_send(&amq_worker, entry, wrk);
if (amq_worker == NULL)
break;
if (!SPMCQ_StopWorker(running))
continue;
}
......@@ -220,21 +247,28 @@ static void
}
wrk->state = WRK_SHUTTINGDOWN;
/* Prepare to exit, drain the queue */
while ((entry = SPMCQ_Deq()) != NULL) {
wrk->deqs++;
wrk_send(amq_worker, entry, wrk);
}
wrk->status = EXIT_SUCCESS;
err = MQ_WorkerShutdown(&amq_worker);
if (err != NULL) {
LOG_Log(LOG_ALERT, "Worker %d: MQ worker shutdown failed: %s",
wrk->id, err);
wrk->status = EXIT_FAILURE;
if (amq_worker != NULL) {
/* Prepare to exit, drain the queue */
while ((entry = SPMCQ_Deq()) != NULL) {
wrk->deqs++;
wrk_send(&amq_worker, entry, wrk);
}
wrk->status = EXIT_SUCCESS;
err = MQ_WorkerShutdown(&amq_worker);
if (err != NULL) {
LOG_Log(LOG_ALERT, "Worker %d: MQ worker shutdown failed: %s",
wrk->id, err);
wrk->status = EXIT_FAILURE;
}
}
else
wrk->status = EXIT_FAILURE;
AZ(pthread_mutex_lock(&running_lock));
running--;
AZ(pthread_mutex_unlock(&running_lock));
LOG_Log(LOG_INFO, "Worker %d: exiting", wrk->id);
wrk->state = WRK_EXITED;
pthread_exit((void *) wrk);
......@@ -278,7 +312,7 @@ WRK_Init(void)
worker_data_t *wrk = thread_data[i].wrk_data;
wrk->magic = WORKER_DATA_MAGIC;
wrk->id = i + 1;
wrk->deqs = wrk->waits = wrk->sends = wrk->fails = 0;
wrk->deqs = wrk->waits = wrk->sends = wrk->fails = wrk->reconnects = 0;
wrk->state = WRK_NOTSTARTED;
}
......@@ -315,9 +349,10 @@ WRK_Stats(void)
for (int i = 0; i < config.nworkers; i++) {
wrk = thread_data[i].wrk_data;
LOG_Log(LOG_INFO, "Worker %d (%s): seen=%d waits=%d sent=%d failed=%d",
LOG_Log(LOG_INFO,
"Worker %d (%s): seen=%d waits=%d sent=%d reconnects=%d failed=%d",
wrk->id, statename[wrk->state], wrk->deqs, wrk->waits, wrk->sends,
wrk->fails);
wrk->reconnects, wrk->fails);
}
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment