Commit 1d1b593f authored by Geoff Simmons's avatar Geoff Simmons

trackrdrd: do not submit empty data records to worker threads

parent 5384a036
...@@ -34,6 +34,7 @@ ...@@ -34,6 +34,7 @@
#include <string.h> #include <string.h>
#include <stdint.h> #include <stdint.h>
#include <limits.h> #include <limits.h>
#include <stdbool.h>
#include "libvarnish.h" #include "libvarnish.h"
#include "miniobj.h" #include "miniobj.h"
...@@ -144,13 +145,14 @@ DATA_Init(void) ...@@ -144,13 +145,14 @@ DATA_Init(void)
datatable init_tbl = datatable init_tbl =
{ .magic = DATATABLE_MAGIC, .len = entries, .collisions = 0, { .magic = DATATABLE_MAGIC, .len = entries, .collisions = 0,
.insert_probes = 0, .find_probes = 0, .seen = 0, .open = 0, .done = 0, .insert_probes = 0, .find_probes = 0, .seen = 0, .open = 0, .done = 0,
.len_overflows = 0, .data_overflows = 0, .submitted = 0, .occ_hi = 0, .nodata = 0, .len_overflows = 0, .data_overflows = 0, .submitted = 0,
.data_hi = 0, .entry = entryptr, .buf = bufptr }; .occ_hi = 0, .data_hi = 0, .entry = entryptr, .buf = bufptr };
memcpy(&tbl, &init_tbl, sizeof(datatable)); memcpy(&tbl, &init_tbl, sizeof(datatable));
for (int i = 0; i < entries; i++) { for (int i = 0; i < entries; i++) {
tbl.entry[i].magic = DATA_MAGIC; tbl.entry[i].magic = DATA_MAGIC;
tbl.entry[i].state = DATA_EMPTY; tbl.entry[i].state = DATA_EMPTY;
tbl.entry[i].hasdata = false;
tbl.entry[i].data = &tbl.buf[i * bufsize]; tbl.entry[i].data = &tbl.buf[i * bufsize];
} }
atexit(data_Cleanup); atexit(data_Cleanup);
......
...@@ -71,12 +71,13 @@ void ...@@ -71,12 +71,13 @@ void
LOG_Log(LOG_INFO, LOG_Log(LOG_INFO,
"Data table: len=%d collisions=%d insert_probes=%d find_probes=%d " "Data table: len=%d collisions=%d insert_probes=%d find_probes=%d "
"open=%d done=%d load=%.2f len_overflows=%d data_overflows=%d " "open=%d done=%d load=%.2f len_overflows=%d data_overflows=%d "
"occ_hi=%d seen=%d submitted=%d sent=%d failed=%d wait_qfull=%d " "occ_hi=%d seen=%d submitted=%d nodata=%d sent=%d failed=%d "
"data_hi=%d", "wait_qfull=%d data_hi=%d",
tbl.len, tbl.collisions, tbl.insert_probes, tbl.find_probes, tbl.len, tbl.collisions, tbl.insert_probes, tbl.find_probes,
tbl.open, tbl.done, 100.0 * ((float) tbl.open + tbl.done) / tbl.len, tbl.open, tbl.done, 100.0 * ((float) tbl.open + tbl.done) / tbl.len,
tbl.len_overflows, tbl.data_overflows, tbl.occ_hi, tbl.seen, tbl.len_overflows, tbl.data_overflows, tbl.occ_hi, tbl.seen,
tbl.submitted, tbl.sent, tbl.failed, tbl.wait_qfull, tbl.data_hi); tbl.submitted, tbl.nodata, tbl.sent, tbl.failed, tbl.wait_qfull,
tbl.data_hi);
WRK_Stats(); WRK_Stats();
} }
...@@ -123,6 +124,11 @@ MON_StatsUpdate(stats_update_t update) ...@@ -123,6 +124,11 @@ MON_StatsUpdate(stats_update_t update)
if (tbl.open + tbl.done > tbl.occ_hi) if (tbl.open + tbl.done > tbl.occ_hi)
tbl.occ_hi = tbl.open + tbl.done; tbl.occ_hi = tbl.open + tbl.done;
break; break;
case STATS_NODATA:
tbl.nodata++;
tbl.done--;
break;
default: default:
/* Unreachable */ /* Unreachable */
......
...@@ -112,6 +112,12 @@ submit(unsigned xid) ...@@ -112,6 +112,12 @@ submit(unsigned xid)
CHECK_OBJ_NOTNULL(entry, DATA_MAGIC); CHECK_OBJ_NOTNULL(entry, DATA_MAGIC);
assert(entry->state == DATA_DONE); assert(entry->state == DATA_DONE);
LOG_Log(LOG_DEBUG, "submit: data=[%.*s]", entry->end, entry->data); LOG_Log(LOG_DEBUG, "submit: data=[%.*s]", entry->end, entry->data);
if (! entry->hasdata) {
entry->state = DATA_EMPTY;
MON_StatsUpdate(STATS_NODATA);
return;
}
while (!SPMCQ_Enq((void *) entry)) { while (!SPMCQ_Enq((void *) entry)) {
tbl.wait_qfull++; tbl.wait_qfull++;
LOG_Log(LOG_ALERT, "%s", "Internal queue full, waiting for dequeue"); LOG_Log(LOG_ALERT, "%s", "Internal queue full, waiting for dequeue");
...@@ -139,6 +145,7 @@ static inline dataentry ...@@ -139,6 +145,7 @@ static inline dataentry
entry->state = DATA_OPEN; entry->state = DATA_OPEN;
entry->xid = xid; entry->xid = xid;
entry->tid = fd; entry->tid = fd;
entry->hasdata = false;
sprintf(entry->data, "XID=%d", xid); sprintf(entry->data, "XID=%d", xid);
entry->end = strlen(entry->data); entry->end = strlen(entry->data);
if (entry->end > tbl.data_hi) if (entry->end > tbl.data_hi)
...@@ -246,6 +253,7 @@ OSL_Track(void *priv, enum VSL_tag_e tag, unsigned fd, unsigned len, ...@@ -246,6 +253,7 @@ OSL_Track(void *priv, enum VSL_tag_e tag, unsigned fd, unsigned len,
break; break;
append(entry, tag, xid, data, datalen); append(entry, tag, xid, data, datalen);
entry->hasdata = true;
break; break;
case SLT_ReqEnd: case SLT_ReqEnd:
......
...@@ -106,6 +106,7 @@ typedef struct { ...@@ -106,6 +106,7 @@ typedef struct {
unsigned xid; unsigned xid;
unsigned tid; /* 'Thread ID', fd in the callback */ unsigned tid; /* 'Thread ID', fd in the callback */
unsigned end; /* End of string index in data */ unsigned end; /* End of string index in data */
bool hasdata;
char *data; char *data;
} dataentry; } dataentry;
...@@ -121,8 +122,9 @@ typedef struct { ...@@ -121,8 +122,9 @@ typedef struct {
unsigned done; unsigned done;
unsigned len_overflows; unsigned len_overflows;
unsigned data_overflows; unsigned data_overflows;
unsigned submitted; /* Records submitted */ unsigned submitted; /* Submitted to worker threads */
unsigned sent; /* Records sent to MQ */ unsigned nodata; /* Not submitted, no data */
unsigned sent; /* Sent successfully to MQ */
unsigned failed; /* MQ send fails */ unsigned failed; /* MQ send fails */
unsigned wait_qfull; /* Waits for SPMCQ */ unsigned wait_qfull; /* Waits for SPMCQ */
unsigned occ_hi; /* Occupancy high water mark */ unsigned occ_hi; /* Occupancy high water mark */
...@@ -197,6 +199,8 @@ typedef enum { ...@@ -197,6 +199,8 @@ typedef enum {
STATS_DONE, STATS_DONE,
/* Update occupancy high water mark */ /* Update occupancy high water mark */
STATS_OCCUPANCY, STATS_OCCUPANCY,
/* ReqEnd seen, no data in the record */
STATS_NODATA,
} stats_update_t; } stats_update_t;
void *MON_StatusThread(void *arg); void *MON_StatusThread(void *arg);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment