Commit 31a66a33 authored by Geoff Simmons's avatar Geoff Simmons

monitor emits stats about bytes sent by worker threads

parent d746fc90
......@@ -411,7 +411,7 @@ dispatch(struct VSL_data *vsl, struct VSL_transaction * const pt[], void *priv)
(unsigned) de->reqend_t.tv_sec, de->reqend_t.tv_usec);
append(de, SLT_Timestamp, de->xid, reqend_str, REQEND_T_LEN - 1);
de->occupied = 1;
MON_StatsUpdate(STATS_OCCUPANCY);
MON_StatsUpdate(STATS_OCCUPANCY, 0);
data_submit(de);
if (term)
......
......@@ -41,13 +41,14 @@
static int run;
static pthread_mutex_t mutex;
static unsigned occ;
static unsigned long sent; /* Sent successfully to MQ */
static unsigned long failed; /* MQ send fails */
static unsigned long reconnects; /* Reconnects to MQ */
static unsigned long restarts; /* Worker thread restarts */
static unsigned occ_hi; /* Occupancy high water mark */
static unsigned occ_hi_this; /* Occupancy high water mark
static unsigned occ = 0;
static unsigned long sent = 0; /* Sent successfully to MQ */
static unsigned long bytes = 0; /* Total bytes successfully sent */
static unsigned long failed = 0; /* MQ send fails */
static unsigned long reconnects = 0; /* Reconnects to MQ */
static unsigned long restarts = 0; /* Worker thread restarts */
static unsigned occ_hi = 0; /* Occupancy high water mark */
static unsigned occ_hi_this = 0;/* Occupancy high water mark
this reporting interval*/
static void
......@@ -75,9 +76,9 @@ log_output(void)
/* XXX: seen, bytes sent */
LOG_Log(LOG_INFO, "Workers: active=%d running=%d waiting=%d running_hi=%d "
"exited=%d abandoned=%u reconnects=%lu restarts=%lu sent=%lu "
"failed=%lu",
"failed=%lu bytes=%lu",
wrk_active, wrk_running, spmcq_datawaiter, wrk_running_hi,
WRK_Exited(), abandoned, reconnects, restarts, sent, failed);
WRK_Exited(), abandoned, reconnects, restarts, sent, failed, bytes);
/* locking would be overkill */
occ_hi_this = 0;
......@@ -153,13 +154,14 @@ MON_StatsInit(void)
}
void
MON_StatsUpdate(stats_update_t update)
MON_StatsUpdate(stats_update_t update, unsigned n)
{
AZ(pthread_mutex_lock(&mutex));
switch(update) {
case STATS_SENT:
sent++;
bytes += n;
occ--;
break;
......
......@@ -53,9 +53,10 @@ static void *mqh;
/* Called from worker.c, but we don't want to pull in all of monitor.c's
dependecies. */
void
MON_StatsUpdate(stats_update_t update)
MON_StatsUpdate(stats_update_t update, unsigned n)
{
(void) update;
(void) n;
}
static void
......
......@@ -312,7 +312,7 @@ void *MON_StatusThread(void *arg);
void MON_Output(void);
void MON_StatusShutdown(pthread_t monitor);
void MON_StatsInit(void);
void MON_StatsUpdate(stats_update_t update);
void MON_StatsUpdate(stats_update_t update, unsigned n);
/* parse.c */
......
......@@ -78,13 +78,14 @@ struct worker_data_s {
unsigned wrk_nfree;
/* stats */
unsigned deqs;
unsigned waits;
unsigned sends;
unsigned fails;
unsigned recoverables;
unsigned reconnects;
unsigned restarts;
unsigned long deqs;
unsigned long waits;
unsigned long sends;
unsigned long bytes;
unsigned long fails;
unsigned long recoverables;
unsigned long reconnects;
unsigned long restarts;
};
typedef struct worker_data_s worker_data_t;
......@@ -137,7 +138,7 @@ wrk_send(void **mq_worker, dataentry *entry, worker_data_t *wrk)
wrk->id, err);
if (errnum > 0) {
wrk->recoverables++;
MON_StatsUpdate(STATS_FAILED);
MON_StatsUpdate(STATS_FAILED, 0);
}
else {
/* Non-recoverable error */
......@@ -149,12 +150,12 @@ wrk_send(void **mq_worker, dataentry *entry, worker_data_t *wrk)
err);
LOG_Log(LOG_ERR, "Worker %d: Data DISCARDED [%.*s]", wrk->id,
entry->end, entry->data);
MON_StatsUpdate(STATS_FAILED);
MON_StatsUpdate(STATS_FAILED, 0);
}
else {
wrk->reconnects++;
wrk_log_connection(*mq_worker, wrk->id);
MON_StatsUpdate(STATS_RECONNECT);
MON_StatsUpdate(STATS_RECONNECT, 0);
errnum = mqf.send(*mq_worker, entry->data, entry->end,
entry->key, entry->keylen, &err);
if (errnum != 0) {
......@@ -162,7 +163,7 @@ wrk_send(void **mq_worker, dataentry *entry, worker_data_t *wrk)
"after reconnect: %s", wrk->id, err);
if (errnum > 0) {
wrk->recoverables++;
MON_StatsUpdate(STATS_FAILED);
MON_StatsUpdate(STATS_FAILED, 0);
}
else {
/* Fail after reconnect, give up */
......@@ -170,7 +171,7 @@ wrk_send(void **mq_worker, dataentry *entry, worker_data_t *wrk)
wrk->status = EXIT_FAILURE;
LOG_Log(LOG_ERR, "Worker %d: Data DISCARDED [%.*s]",
wrk->id, entry->end, entry->data);
MON_StatsUpdate(STATS_FAILED);
MON_StatsUpdate(STATS_FAILED, 0);
}
}
}
......@@ -178,7 +179,8 @@ wrk_send(void **mq_worker, dataentry *entry, worker_data_t *wrk)
}
if (errnum == 0) {
wrk->sends++;
MON_StatsUpdate(STATS_SENT);
wrk->bytes += entry->end;
MON_StatsUpdate(STATS_SENT, entry->end);
LOG_Log(LOG_DEBUG, "Worker %d: Successfully sent data [%.*s]", wrk->id,
entry->end, entry->data);
}
......@@ -335,7 +337,7 @@ WRK_Init(void)
wrk->magic = WORKER_DATA_MAGIC;
wrk->id = i + 1;
wrk->deqs = wrk->waits = wrk->sends = wrk->fails = wrk->reconnects
= wrk->restarts = wrk->recoverables = 0;
= wrk->restarts = wrk->recoverables = wrk->bytes = 0;
wrk->state = WRK_NOTSTARTED;
}
......@@ -384,7 +386,7 @@ WRK_Restart(void)
wrk->deqs = wrk->waits = wrk->sends = wrk->fails = wrk->reconnects
= 0;
wrk->restarts++;
MON_StatsUpdate(STATS_RESTART);
MON_StatsUpdate(STATS_RESTART, 0);
wrk->state = WRK_NOTSTARTED;
if (pthread_create(&thread_data[i].worker, NULL, wrk_main, wrk)
!= 0) {
......@@ -410,11 +412,11 @@ WRK_Stats(void)
for (int i = 0; i < config.nworkers; i++) {
wrk = thread_data[i].wrk_data;
LOG_Log(LOG_INFO,
"Worker %d (%s): seen=%u waits=%u sent=%u reconnects=%u "
"restarts=%u failed_recoverable=%u failed=%u",
"Worker %d (%s): seen=%lu waits=%lu sent=%lu bytes=%lu "
"reconnects=%lu restarts=%lu failed_recoverable=%lu failed=%lu",
wrk->id, statename[wrk->state], wrk->deqs, wrk->waits,
wrk->sends, wrk->reconnects, wrk->restarts, wrk->recoverables,
wrk->fails);
wrk->sends, wrk->bytes, wrk->reconnects, wrk->restarts,
wrk->recoverables, wrk->fails);
}
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment