Commit 70b07983 authored by Geoff Simmons's avatar Geoff Simmons

add the config parameter reader.timeout -- the reader may wait for

a limited time for data buffers to become free
parent 6ae529cd
...@@ -128,6 +128,16 @@ conf_getDouble(const char *rval, double *d) ...@@ -128,6 +128,16 @@ conf_getDouble(const char *rval, double *d)
return(0); \ return(0); \
} }
#define confDouble(name,fld) \
if (strcmp(lval, name) == 0) { \
double d; \
int err = conf_getDouble(rval, &d); \
if (err != 0) \
return err; \
config.fld = d; \
return(0); \
}
int int
CONF_Add(const char *lval, const char *rval) CONF_Add(const char *lval, const char *rval)
{ {
...@@ -150,6 +160,9 @@ CONF_Add(const char *lval, const char *rval) ...@@ -150,6 +160,9 @@ CONF_Add(const char *lval, const char *rval)
confUnsigned("output.bufsiz", output_bufsiz); confUnsigned("output.bufsiz", output_bufsiz);
confUnsigned("append", append); confUnsigned("append", append);
confDouble("output.timeout", output_timeout);
confDouble("reader.timeout", reader_timeout);
if (strcmp(lval, "syslog.facility") == 0) { if (strcmp(lval, "syslog.facility") == 0) {
if ((ret = conf_getFacility(rval)) < 0) if ((ret = conf_getFacility(rval)) < 0)
return EINVAL; return EINVAL;
...@@ -160,13 +173,6 @@ CONF_Add(const char *lval, const char *rval) ...@@ -160,13 +173,6 @@ CONF_Add(const char *lval, const char *rval)
return(0); return(0);
} }
if (strcmp(lval, "output.timeout") == 0) {
int err = conf_getDouble(rval, &config.output_timeout);
if (err != 0)
return err;
return(0);
}
return EINVAL; return EINVAL;
} }
...@@ -226,6 +232,7 @@ CONF_Init(void) ...@@ -226,6 +232,7 @@ CONF_Init(void)
config.append = 0; config.append = 0;
config.output_timeout = 0.; config.output_timeout = 0.;
config.reader_timeout = 0.;
} }
static int static int
...@@ -323,6 +330,7 @@ CONF_Dump(void) ...@@ -323,6 +330,7 @@ CONF_Dump(void)
EMPTY(config.output_file) ? "stdout" : config.output_file); EMPTY(config.output_file) ? "stdout" : config.output_file);
confdump("append = %u", config.append); confdump("append = %u", config.append);
confdump("output.timeout = %f", config.output_timeout); confdump("output.timeout = %f", config.output_timeout);
confdump("reader.timeout = %f", config.reader_timeout);
confdump("cformat = %s", VSB_data(config.cformat)); confdump("cformat = %s", VSB_data(config.cformat));
confdump("bformat = %s", VSB_data(config.bformat)); confdump("bformat = %s", VSB_data(config.bformat));
confdump("rformat = %s", VSB_data(config.rformat)); confdump("rformat = %s", VSB_data(config.rformat));
......
...@@ -50,6 +50,12 @@ RDR_Depleted(void) ...@@ -50,6 +50,12 @@ RDR_Depleted(void)
return 0; return 0;
} }
int
RDR_Waiting(void)
{
return 0;
}
static char static char
*test_timeout(void) *test_timeout(void)
{ {
......
...@@ -92,10 +92,10 @@ const char *version = PACKAGE_TARNAME "-" PACKAGE_VERSION " revision " \ ...@@ -92,10 +92,10 @@ const char *version = PACKAGE_TARNAME "-" PACKAGE_VERSION " revision " \
VCS_Version " branch " VCS_Branch; VCS_Version " branch " VCS_Branch;
static unsigned len_hi = 0, closed = 0, overrun = 0, ioerr = 0, reacquire = 0, static unsigned len_hi = 0, closed = 0, overrun = 0, ioerr = 0, reacquire = 0,
tx_thresh, rec_thresh, chunk_thresh; tx_thresh, rec_thresh, chunk_thresh, waiting = 0;
static unsigned long seen = 0, submitted = 0, len_overflows = 0, no_free_tx = 0, static unsigned long seen = 0, submitted = 0, len_overflows = 0, no_free_tx = 0,
no_free_rec = 0, no_free_chunk = 0, eol = 0; no_free_rec = 0, no_free_chunk = 0, eol = 0, waits = 0;
/* Hack, because we cannot have #ifdef in the macro definition SIGDISP */ /* Hack, because we cannot have #ifdef in the macro definition SIGDISP */
#define _UNDEFINED(SIG) ((#SIG)[0] == 0) #define _UNDEFINED(SIG) ((#SIG)[0] == 0)
...@@ -127,19 +127,22 @@ static unsigned rdr_tx_free = 0; ...@@ -127,19 +127,22 @@ static unsigned rdr_tx_free = 0;
static int tx_type_log[VSL_t__MAX], debug = 0; static int tx_type_log[VSL_t__MAX], debug = 0;
static char tx_type_name[VSL_t__MAX]; static char tx_type_name[VSL_t__MAX];
static const char *statename[] = { "running", "waiting" };
static double idle_pause = MAX_IDLE_PAUSE; static double idle_pause = MAX_IDLE_PAUSE;
void void
RDR_Stats(void) RDR_Stats(void)
{ {
LOG_Log(LOG_INFO, "Reader: seen=%lu submitted=%lu free_tx=%u free_rec=%u " LOG_Log(LOG_INFO, "Reader (%s): seen=%lu submitted=%lu free_tx=%u "
"free_chunk=%u no_free_tx=%lu no_free_rec=%lu no_free_chunk=%lu " "free_rec=%u free_chunk=%u no_free_tx=%lu no_free_rec=%lu "
"len_hi=%u len_overflows=%lu eol=%lu idle_pause=%.06f closed=%u " "no_free_chunk=%lu len_hi=%u len_overflows=%lu eol=%lu "
"overrun=%u ioerr=%u reacquire=%u", "idle_pause=%.06f waits=%lu closed=%u overrun=%u ioerr=%u "
seen, submitted, rdr_tx_free, rdr_rec_free, rdr_chunk_free, "reacquire=%u",
no_free_tx, no_free_rec, no_free_chunk, len_hi, len_overflows, statename[waiting], seen, submitted, rdr_tx_free, rdr_rec_free,
eol, idle_pause, closed, overrun, ioerr, reacquire); rdr_chunk_free, no_free_tx, no_free_rec, no_free_chunk, len_hi,
len_overflows, eol, idle_pause, waits, closed, overrun, ioerr,
reacquire);
} }
int int
...@@ -149,6 +152,12 @@ RDR_Depleted(void) ...@@ -149,6 +152,12 @@ RDR_Depleted(void)
|| (rdr_chunk_free < chunk_thresh); || (rdr_chunk_free < chunk_thresh);
} }
int
RDR_Waiting(void)
{
return waiting;
}
static inline void static inline void
signal_spscq_ready(void) signal_spscq_ready(void)
{ {
...@@ -159,6 +168,25 @@ signal_spscq_ready(void) ...@@ -159,6 +168,25 @@ signal_spscq_ready(void)
} }
} }
static void
data_wait(void)
{
assert(config.reader_timeout > 0.);
if (!WRT_Waiting()) {
struct timespec ts;
int ret;
AZ(pthread_mutex_lock(&data_ready_lock));
waits++;
waiting = 1;
ts = VTIM_timespec(VTIM_real() + config.reader_timeout);
ret = pthread_cond_timedwait(&data_ready_cond, &data_ready_lock, &ts);
assert(ret == 0 || ret == ETIMEDOUT);
waiting = 0;
AZ(pthread_mutex_unlock(&data_ready_lock));
}
}
static inline chunk_t static inline chunk_t
*take_chunk(void) *take_chunk(void)
{ {
...@@ -167,8 +195,14 @@ static inline chunk_t ...@@ -167,8 +195,14 @@ static inline chunk_t
if (VSTAILQ_EMPTY(&rdr_chunk_freelist)) { if (VSTAILQ_EMPTY(&rdr_chunk_freelist)) {
signal_spscq_ready(); signal_spscq_ready();
rdr_chunk_free = DATA_Take_Freechunk(&rdr_chunk_freelist); rdr_chunk_free = DATA_Take_Freechunk(&rdr_chunk_freelist);
if (VSTAILQ_EMPTY(&rdr_chunk_freelist)) if (VSTAILQ_EMPTY(&rdr_chunk_freelist)) {
return NULL; if (config.reader_timeout <= 0.)
return NULL;
data_wait();
rdr_chunk_free = DATA_Take_Freechunk(&rdr_chunk_freelist);
if (VSTAILQ_EMPTY(&rdr_chunk_freelist))
return NULL;
}
if (debug) if (debug)
LOG_Log(LOG_DEBUG, "Reader: took %u free chunks", rdr_chunk_free); LOG_Log(LOG_DEBUG, "Reader: took %u free chunks", rdr_chunk_free);
} }
...@@ -187,8 +221,14 @@ static inline rec_t ...@@ -187,8 +221,14 @@ static inline rec_t
if (VSTAILQ_EMPTY(&rdr_rec_freelist)) { if (VSTAILQ_EMPTY(&rdr_rec_freelist)) {
signal_spscq_ready(); signal_spscq_ready();
rdr_rec_free = DATA_Take_Freerec(&rdr_rec_freelist); rdr_rec_free = DATA_Take_Freerec(&rdr_rec_freelist);
if (VSTAILQ_EMPTY(&rdr_rec_freelist)) if (VSTAILQ_EMPTY(&rdr_rec_freelist)) {
return NULL; if (config.reader_timeout <= 0.)
return NULL;
data_wait();
rdr_rec_free = DATA_Take_Freerec(&rdr_rec_freelist);
if (VSTAILQ_EMPTY(&rdr_rec_freelist))
return NULL;
}
if (debug) if (debug)
LOG_Log(LOG_DEBUG, "Reader: took %u free records", rdr_rec_free); LOG_Log(LOG_DEBUG, "Reader: took %u free records", rdr_rec_free);
} }
...@@ -207,8 +247,14 @@ static inline tx_t ...@@ -207,8 +247,14 @@ static inline tx_t
if (VSTAILQ_EMPTY(&rdr_tx_freelist)) { if (VSTAILQ_EMPTY(&rdr_tx_freelist)) {
signal_spscq_ready(); signal_spscq_ready();
rdr_tx_free = DATA_Take_Freetx(&rdr_tx_freelist); rdr_tx_free = DATA_Take_Freetx(&rdr_tx_freelist);
if (VSTAILQ_EMPTY(&rdr_tx_freelist)) if (VSTAILQ_EMPTY(&rdr_tx_freelist)) {
return NULL; if (config.reader_timeout <= 0.)
return NULL;
data_wait();
rdr_tx_free = DATA_Take_Freetx(&rdr_tx_freelist);
if (VSTAILQ_EMPTY(&rdr_tx_freelist))
return NULL;
}
if (debug) if (debug)
LOG_Log(LOG_DEBUG, "Reader: took %u free tx", rdr_tx_free); LOG_Log(LOG_DEBUG, "Reader: took %u free tx", rdr_tx_free);
} }
...@@ -800,6 +846,8 @@ main(int argc, char *argv[]) ...@@ -800,6 +846,8 @@ main(int argc, char *argv[])
AZ(pthread_cond_init(&spscq_ready_cond, NULL)); AZ(pthread_cond_init(&spscq_ready_cond, NULL));
AZ(pthread_mutex_init(&spscq_ready_lock, NULL)); AZ(pthread_mutex_init(&spscq_ready_lock, NULL));
AZ(pthread_cond_init(&data_ready_cond, NULL));
AZ(pthread_mutex_init(&data_ready_lock, NULL));
if (config.monitor_interval > 0) if (config.monitor_interval > 0)
MON_Start(); MON_Start();
...@@ -960,6 +1008,8 @@ main(int argc, char *argv[]) ...@@ -960,6 +1008,8 @@ main(int argc, char *argv[])
FMT_Fini(); FMT_Fini();
AZ(pthread_cond_destroy(&spscq_ready_cond)); AZ(pthread_cond_destroy(&spscq_ready_cond));
AZ(pthread_mutex_destroy(&spscq_ready_lock)); AZ(pthread_mutex_destroy(&spscq_ready_lock));
AZ(pthread_cond_destroy(&data_ready_cond));
AZ(pthread_mutex_destroy(&data_ready_lock));
if (pfh != NULL) { if (pfh != NULL) {
errno = 0; errno = 0;
if (VPF_Remove(pfh) != 0) if (VPF_Remove(pfh) != 0)
......
...@@ -164,12 +164,18 @@ include_t *hdr_include_tbl[MAX_VSL_TAG]; ...@@ -164,12 +164,18 @@ include_t *hdr_include_tbl[MAX_VSL_TAG];
pthread_cond_t spscq_ready_cond; pthread_cond_t spscq_ready_cond;
pthread_mutex_t spscq_ready_lock; pthread_mutex_t spscq_ready_lock;
/* Reader waits for this condition if any of the freelists are exhausted.
Writer signals the condition when it returns freelists. */
pthread_cond_t data_ready_cond;
pthread_mutex_t data_ready_lock;
struct config { struct config {
char log_file[PATH_MAX + 1]; char log_file[PATH_MAX + 1];
char output_file[PATH_MAX + 1]; char output_file[PATH_MAX + 1];
unsigned append; unsigned append;
double output_timeout; double output_timeout;
double reader_timeout;
/* VSL 'r' argument */ /* VSL 'r' argument */
char varnish_bindump[PATH_MAX + 1]; char varnish_bindump[PATH_MAX + 1];
...@@ -199,6 +205,7 @@ struct config { ...@@ -199,6 +205,7 @@ struct config {
/* varnishevent.c */ /* varnishevent.c */
void RDR_Stats(void); void RDR_Stats(void);
int RDR_Depleted(void); int RDR_Depleted(void);
int RDR_Waiting(void);
/* config.c */ /* config.c */
void CONF_Init(void); void CONF_Init(void);
......
...@@ -136,11 +136,23 @@ open_log(void) ...@@ -136,11 +136,23 @@ open_log(void)
return 0; return 0;
} }
static void
wrt_signal_data_ready(void)
{
if (RDR_Waiting()) {
AZ(pthread_mutex_lock(&data_ready_lock));
if (RDR_Waiting())
AZ(pthread_cond_signal(&data_ready_cond));
AZ(pthread_mutex_unlock(&data_ready_lock));
}
}
static inline void static inline void
wrt_return_freelist(void) wrt_return_freelist(void)
{ {
if (wrt_nfree_chunks > 0) { if (wrt_nfree_chunks > 0) {
DATA_Return_Freechunk(&wrt_freechunks, wrt_nfree_chunks); DATA_Return_Freechunk(&wrt_freechunks, wrt_nfree_chunks);
wrt_signal_data_ready();
LOG_Log(LOG_DEBUG, "Writer: returned %u chunks to free list", LOG_Log(LOG_DEBUG, "Writer: returned %u chunks to free list",
wrt_nfree_chunks); wrt_nfree_chunks);
wrt_nfree_chunks = 0; wrt_nfree_chunks = 0;
...@@ -148,6 +160,7 @@ wrt_return_freelist(void) ...@@ -148,6 +160,7 @@ wrt_return_freelist(void)
} }
if (wrt_nfree_recs > 0) { if (wrt_nfree_recs > 0) {
DATA_Return_Freerec(&wrt_freerecs, wrt_nfree_recs); DATA_Return_Freerec(&wrt_freerecs, wrt_nfree_recs);
wrt_signal_data_ready();
LOG_Log(LOG_DEBUG, "Writer: returned %u records to free list", LOG_Log(LOG_DEBUG, "Writer: returned %u records to free list",
wrt_nfree_recs); wrt_nfree_recs);
wrt_nfree_recs = 0; wrt_nfree_recs = 0;
...@@ -155,6 +168,7 @@ wrt_return_freelist(void) ...@@ -155,6 +168,7 @@ wrt_return_freelist(void)
} }
if (wrt_nfree_tx > 0) { if (wrt_nfree_tx > 0) {
DATA_Return_Freetx(&wrt_freetx, wrt_nfree_tx); DATA_Return_Freetx(&wrt_freetx, wrt_nfree_tx);
wrt_signal_data_ready();
LOG_Log(LOG_DEBUG, "Writer: returned %u tx to free list", wrt_nfree_tx); LOG_Log(LOG_DEBUG, "Writer: returned %u tx to free list", wrt_nfree_tx);
wrt_nfree_tx = 0; wrt_nfree_tx = 0;
assert(VSTAILQ_EMPTY(&wrt_freetx)); assert(VSTAILQ_EMPTY(&wrt_freetx));
...@@ -250,7 +264,7 @@ wrt_write(tx_t *tx) ...@@ -250,7 +264,7 @@ wrt_write(tx_t *tx)
assert(tx->state == TX_FREE); assert(tx->state == TX_FREE);
if (RDR_Depleted() || wrt_nfree_tx > tx_thresh if (RDR_Waiting() || RDR_Depleted() || wrt_nfree_tx > tx_thresh
|| wrt_nfree_recs > rec_thresh || wrt_nfree_chunks > chunk_thresh) || wrt_nfree_recs > rec_thresh || wrt_nfree_chunks > chunk_thresh)
wrt_return_freelist(); wrt_return_freelist();
} }
...@@ -296,7 +310,7 @@ static void ...@@ -296,7 +310,7 @@ static void
/* /*
* run is guaranteed to be fresh after the lock * run is guaranteed to be fresh after the lock
*/ */
if (run) { if (run && !RDR_Waiting()) {
waits++; waits++;
AZ(pthread_cond_wait(&spscq_ready_cond, &spscq_ready_lock)); AZ(pthread_cond_wait(&spscq_ready_cond, &spscq_ready_lock));
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment