Commit 374f355d authored by Geoff Simmons's avatar Geoff Simmons

trackrdrd: implemented MQ sends in worker threads, submits in main

Updated monitoring stats for MQ sends or fails, mutexing stats updates
parent 693abdec
......@@ -41,7 +41,7 @@
#include "libvarnish.h"
static const char *level2name[LOG_DEBUG];
static const char *level2name[LOG_DEBUG+1];
static void
syslog_setlevel(int level)
......
......@@ -36,6 +36,7 @@
#include <pthread.h>
#include "trackrdrd.h"
#include "vas.h"
void
*MON_StatusThread(void *arg)
......@@ -65,9 +66,50 @@ void
}
LOG_Log(LOG_INFO,
"Data table: len=%d collisions=%d insert_probes=%d find_probes=%d "
"open=%d done=%d load=%.2f occ_hi=%d seen=%d submitted=%d data_hi=%d",
"open=%d done=%d load=%.2f occ_hi=%d seen=%d submitted=%d "
"sent=%d failed=%d wait_qfull=%d data_hi=%d",
tbl.len, tbl.collisions, tbl.insert_probes, tbl.find_probes,
tbl.open, tbl.done, 100.0 * ((float) tbl.open + tbl.done) / tbl.len,
tbl.occ_hi, tbl.seen, tbl.submitted, tbl.data_hi);
tbl.occ_hi, tbl.seen, tbl.submitted, tbl.sent, tbl.failed,
tbl.wait_qfull, tbl.data_hi);
}
}
void
MON_StatsInit(void)
{
AZ(pthread_mutex_init(&stats_update_lock, NULL));
}
void
MON_StatsUpdate(stats_update_t update)
{
AZ(pthread_mutex_lock(&stats_update_lock));
switch(update) {
case STATS_SENT:
tbl.sent++;
tbl.done--;
break;
case STATS_FAILED:
tbl.failed++;
tbl.done--;
break;
case STATS_DONE:
tbl.done++;
tbl.open--;
break;
case STATS_OCCUPANCY:
if (tbl.open + tbl.done > tbl.occ_hi)
tbl.occ_hi = tbl.open + tbl.done;
break;
default:
/* Unreachable */
AN(NULL);
}
AZ(pthread_mutex_unlock(&stats_update_lock));
}
......@@ -55,4 +55,5 @@ test_worker_LDADD = \
../spmcq.$(OBJEXT) \
../amq.$(OBJEXT) \
../data.$(OBJEXT) \
../monitor.$(OBJEXT) \
@AMQ_LIBS@
......@@ -69,7 +69,6 @@
/*--------------------------------------------------------------------*/
/* XXX: Temporary, for testing */
static void
submit(unsigned xid)
{
......@@ -79,9 +78,14 @@ submit(unsigned xid)
CHECK_OBJ_NOTNULL(entry, DATA_MAGIC);
assert(entry->state == DATA_DONE);
LOG_Log(LOG_DEBUG, "submit: data=[%.*s]", entry->end, entry->data);
tbl.done--;
/* XXX: Termination */
while (SPMCQ_Enq((void *) entry) == NULL) {
LOG_Log(LOG_ALERT, "%s", "Internal queue full, waiting for dequeue");
AZ(pthread_mutex_lock(&spmcq_nonfull_lock));
AZ(pthread_cond_wait(&spmcq_nonfull_cond, &spmcq_nonempty_lock));
}
AZ(pthread_cond_broadcast(&spmcq_nonempty_cond));
tbl.submitted++;
entry->state = DATA_EMPTY;
}
static void
......@@ -91,6 +95,33 @@ sigusr1(int sig)
signal(sig, sigusr1);
}
static dataentry
*insert(unsigned xid, enum VSL_tag_e tag, unsigned fd)
{
dataentry *entry;
entry = DATA_Insert(xid);
if (entry == NULL) {
LOG_Log(LOG_ALERT,
"%s: Cannot insert data, XID=%d tid=%d DISCARDED",
VSL_tags[tag], xid, fd);
return NULL;
}
CHECK_OBJ(entry, DATA_MAGIC);
entry->state = DATA_OPEN;
entry->xid = xid;
entry->tid = fd;
sprintf(entry->data, "XID=%d", xid);
entry->end = strlen(entry->data);
if (entry->end > tbl.data_hi)
tbl.data_hi = entry->end;
tbl.open++;
MON_StatsUpdate(STATS_OCCUPANCY);
return entry;
}
static int
OSL_Track(void *priv, enum VSL_tag_e tag, unsigned fd, unsigned len,
unsigned spec, const char *ptr, uint64_t bitmap)
......@@ -115,25 +146,7 @@ OSL_Track(void *priv, enum VSL_tag_e tag, unsigned fd, unsigned len,
LOG_Log(LOG_DEBUG, "%s: XID=%d", VSL_tags[tag], xid);
tbl.seen++;
entry = DATA_Insert(xid);
if (entry == NULL) {
LOG_Log(LOG_ALERT,
"%s: Cannot insert data, XID=%d tid=%d DISCARDED",
VSL_tags[tag], xid, fd);
break;
}
CHECK_OBJ(entry, DATA_MAGIC);
entry->state = DATA_OPEN;
entry->xid = xid;
entry->tid = fd;
sprintf(entry->data, "XID=%d", xid);
entry->end = strlen(entry->data);
if (entry->end > tbl.data_hi)
tbl.data_hi = entry->end;
tbl.open++;
if (tbl.open + tbl.done > tbl.occ_hi)
tbl.occ_hi = tbl.open + tbl.done;
(void) insert(xid, tag, fd);
break;
case SLT_VCL_Log:
......@@ -151,10 +164,19 @@ OSL_Track(void *priv, enum VSL_tag_e tag, unsigned fd, unsigned len,
/* assert((hash(XID) exists) && hash(XID).tid == fd
&& !hash(XID).done); */
entry = DATA_Find(xid);
CHECK_OBJ_NOTNULL(entry, DATA_MAGIC);
assert(entry->xid == xid);
assert(entry->tid == fd);
assert(entry->state == DATA_OPEN);
if (entry == NULL) {
LOG_Log(LOG_WARNING, "%s: XID %d not found, attempting insert",
VSL_tags[tag], xid);
entry = insert(xid, tag, fd);
if (entry == NULL)
break;
}
else {
CHECK_OBJ_NOTNULL(entry, DATA_MAGIC);
assert(entry->xid == xid);
assert(entry->tid == fd);
assert(entry->state == DATA_OPEN);
}
/* Data overflow */
/* XXX: Encapsulate (1 << (config.maxdata_scale+10)) */
......@@ -189,8 +211,7 @@ OSL_Track(void *priv, enum VSL_tag_e tag, unsigned fd, unsigned len,
/*hash(XID).done = TRUE;*/
entry->state = DATA_DONE;
tbl.done++;
tbl.open--;
MON_StatsUpdate(STATS_DONE);
submit(xid);
break;
......@@ -214,9 +235,9 @@ usage(int status)
int
main(int argc, char * const *argv)
{
int c, d_flag = 0;
int c, d_flag = 0, err;
const char *P_arg = NULL, *l_arg = NULL, *n_arg = NULL, *f_arg = NULL,
*y_arg = NULL, *c_arg = NULL;
*y_arg = NULL, *c_arg = NULL, *errmsg;
struct vpf_fh *pfh = NULL;
struct VSM_data *vd;
pthread_t monitor;
......@@ -361,14 +382,43 @@ main(int argc, char * const *argv)
else
LOG_Log0(LOG_INFO, "Monitoring thread not running");
errmsg = MQ_GlobalInit();
if (errmsg != NULL) {
LOG_Log(LOG_ERR, "Cannot initialize queueing: %s", errmsg);
exit(EXIT_FAILURE);
}
err = WRK_Init();
if (errmsg != NULL) {
LOG_Log(LOG_ERR, "Cannot prepare worker threads: %s",
strerror(err));
exit(EXIT_FAILURE);
}
if ((err = SPMCQ_Init()) != 0) {
LOG_Log(LOG_ERR, "Cannot initialize internal worker queue: %s",
strerror(err));
exit(EXIT_FAILURE);
}
MON_StatsInit();
/* Start worker threads */
WRK_Start();
/* Main loop */
while (VSL_Dispatch(vd, OSL_Track, NULL) >= 0)
;
WRK_Halt();
WRK_Shutdown();
AZ(MQ_GlobalShutdown());
/* XXX: Parent removes PID */
if (pfh != NULL)
VPF_Remove(pfh);
LOG_Log0(LOG_INFO, "exiting");
LOG_Close();
exit(EXIT_SUCCESS);
}
......@@ -109,6 +109,7 @@ typedef struct {
unsigned submitted; /* Records submitted */
unsigned sent; /* Records sent to MQ */
unsigned failed; /* MQ send fails */
unsigned wait_qfull; /* Waits for SPMCQ */
unsigned occ_hi; /* Occupancy high water mark */
unsigned data_hi; /* Data high water mark */
dataentry *entry;
......@@ -168,7 +169,23 @@ int LOG_Open(const char *progname);
#define LOG_Close() logconf.close()
/* monitor.c */
typedef enum {
/* Record sent successfully to MQ */
STATS_SENT,
/* Failed to send record to MQ */
STATS_FAILED,
/* ReqStart seen, finished reading record from SHM log */
STATS_DONE,
/* Update occupancy high water mark */
STATS_OCCUPANCY,
} stats_update_t;
void *MON_StatusThread(void *arg);
void MON_StatsInit(void);
void MON_StatsUpdate(stats_update_t update);
/* Mutex for multi-threaded stats updates. */
pthread_mutex_t stats_update_lock;
/* parse.c */
int Parse_XID(const char *str, int len, unsigned *xid);
......
......@@ -37,6 +37,7 @@
#include "trackrdrd.h"
#include "vas.h"
#include "miniobj.h"
#include "vmb.h"
typedef struct {
unsigned magic;
......@@ -68,11 +69,14 @@ wrk_send(void *amq_worker, dataentry *entry, unsigned id)
LOG_Log(LOG_ALERT, "Worker %d: Failed to send data: %s", id, err);
LOG_Log(LOG_ERR, "Worker %d: Data DISCARDED [%.*s]", id, entry->end,
entry->data);
tbl.failed++;
MON_StatsUpdate(STATS_FAILED);
}
else
tbl.sent++;
entry->state = DATA_OPEN;
MON_StatsUpdate(STATS_SENT);
entry->state = DATA_EMPTY;
/* From Varnish vmb.h -- platform-independent write memory barrier */
VWMB();
AZ(pthread_cond_signal(&spmcq_nonfull_cond));
}
static void
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment