Commit ddf374ae authored by Geoff Simmons's avatar Geoff Simmons

the reader thread's idle pause is periodically adjusted to match

the VSL read rate
parent ce738522
...@@ -87,6 +87,8 @@ ...@@ -87,6 +87,8 @@
#define DISPATCH_FLUSH 12 #define DISPATCH_FLUSH 12
#define DISPATCH_WRK_ABANDONED 13 #define DISPATCH_WRK_ABANDONED 13
#define MAX_IDLE_PAUSE 0.01
const char *version = PACKAGE_TARNAME "-" PACKAGE_VERSION " revision " \ const char *version = PACKAGE_TARNAME "-" PACKAGE_VERSION " revision " \
VCS_Version " branch " VCS_Branch; VCS_Version " branch " VCS_Branch;
...@@ -97,6 +99,8 @@ static unsigned long seen = 0, submitted = 0, len_overflows = 0, no_data = 0, ...@@ -97,6 +99,8 @@ static unsigned long seen = 0, submitted = 0, len_overflows = 0, no_data = 0,
ioerr = 0, reacquire = 0, truncated = 0, key_hi = 0, key_overflows = 0, ioerr = 0, reacquire = 0, truncated = 0, key_hi = 0, key_overflows = 0,
no_free_chunk = 0, eol = 0; no_free_chunk = 0, eol = 0;
static double idle_pause = MAX_IDLE_PAUSE;
static volatile sig_atomic_t flush = 0, term = 0; static volatile sig_atomic_t flush = 0, term = 0;
static struct sigaction terminate_action, dump_action, flush_action; static struct sigaction terminate_action, dump_action, flush_action;
...@@ -114,14 +118,15 @@ void ...@@ -114,14 +118,15 @@ void
RDR_Stats(void) RDR_Stats(void)
{ {
LOG_Log(LOG_INFO, "Reader: seen=%lu submitted=%lu nodata=%lu eol=%lu " LOG_Log(LOG_INFO, "Reader: seen=%lu submitted=%lu nodata=%lu eol=%lu "
"idle_pause=%.09f "
"free_rec=%u free_chunk=%u no_free_rec=%lu no_free_chunk=%lu " "free_rec=%u free_chunk=%u no_free_rec=%lu no_free_chunk=%lu "
"len_hi=%u key_hi=%lu len_overflows=%lu truncated=%lu " "len_hi=%u key_hi=%lu len_overflows=%lu truncated=%lu "
"key_overflows=%lu vcl_log_err=%lu vsl_err=%lu closed=%lu " "key_overflows=%lu vcl_log_err=%lu vsl_err=%lu closed=%lu "
"overrun=%lu ioerr=%lu reacquire=%lu", "overrun=%lu ioerr=%lu reacquire=%lu",
seen, submitted, no_data, eol, rdr_rec_free, rdr_chunk_free, seen, submitted, no_data, eol, idle_pause, rdr_rec_free,
no_free_data, no_free_chunk, len_hi, key_hi, len_overflows, rdr_chunk_free, no_free_data, no_free_chunk, len_hi, key_hi,
truncated, key_overflows, vcl_log_err, vsl_errs, closed, overrun, len_overflows, truncated, key_overflows, vcl_log_err, vsl_errs,
ioerr, reacquire); closed, overrun, ioerr, reacquire);
} }
int int
...@@ -596,6 +601,8 @@ CHILD_Main(int readconfig) ...@@ -596,6 +601,8 @@ CHILD_Main(int readconfig)
struct VSLQ *vslq; struct VSLQ *vslq;
struct VSM_data *vsm = NULL; struct VSM_data *vsm = NULL;
struct VSL_cursor *cursor; struct VSL_cursor *cursor;
unsigned long last_seen = 0;
double last_t;
MON_StatsInit(); MON_StatsInit();
debug = (LOG_GetLevel() == LOG_DEBUG); debug = (LOG_GetLevel() == LOG_DEBUG);
...@@ -788,6 +795,7 @@ CHILD_Main(int readconfig) ...@@ -788,6 +795,7 @@ CHILD_Main(int readconfig)
LOG_Log0(LOG_INFO, "Worker threads not running"); LOG_Log0(LOG_INFO, "Worker threads not running");
/* Main loop */ /* Main loop */
last_t = VTIM_mono();
term = 0; term = 0;
while (!term) { while (!term) {
status = VSLQ_Dispatch(vslq, dispatch, NULL); status = VSLQ_Dispatch(vslq, dispatch, NULL);
...@@ -799,7 +807,18 @@ CHILD_Main(int readconfig) ...@@ -799,7 +807,18 @@ CHILD_Main(int readconfig)
case DISPATCH_EOL: case DISPATCH_EOL:
take_free(); take_free();
eol++; eol++;
VTIM_sleep(config.idle_pause); /* re-adjust idle pause every 1024 seen txn */
if (seen >> 10 > last_seen) {
double t = VTIM_mono();
idle_pause = (t - last_t) / (double) (seen - (last_seen << 10));
last_seen = seen >> 10;
if (idle_pause > MAX_IDLE_PAUSE)
idle_pause = MAX_IDLE_PAUSE;
if (idle_pause < 1e-9)
idle_pause = 1e-9;
last_t = t;
}
VTIM_sleep(idle_pause);
break; break;
case DISPATCH_TERMINATE: case DISPATCH_TERMINATE:
AN(term); AN(term);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment