Commit 1ea3ed91 authored by Geoff Simmons's avatar Geoff Simmons

use poll() (not select()) to wait for ready output with timeout

on output streams that may block
parent 4eb7a7e2
...@@ -43,7 +43,6 @@ ...@@ -43,7 +43,6 @@
#include "config.h" #include "config.h"
#include "varnishevent.h" #include "varnishevent.h"
#include "vtim.h"
#include "vas.h" #include "vas.h"
#include "vdef.h" #include "vdef.h"
...@@ -162,11 +161,9 @@ CONF_Add(const char *lval, const char *rval) ...@@ -162,11 +161,9 @@ CONF_Add(const char *lval, const char *rval)
} }
if (strcmp(lval, "output.timeout") == 0) { if (strcmp(lval, "output.timeout") == 0) {
double to; int err = conf_getDouble(rval, &config.output_timeout);
int err = conf_getDouble(rval, &to);
if (err != 0) if (err != 0)
return err; return err;
config.output_timeout = VTIM_timeval(to);
return(0); return(0);
} }
...@@ -228,8 +225,7 @@ CONF_Init(void) ...@@ -228,8 +225,7 @@ CONF_Init(void)
config.chunk_size = DEFAULT_CHUNK_SIZE; config.chunk_size = DEFAULT_CHUNK_SIZE;
config.append = 0; config.append = 0;
config.output_timeout.tv_sec = 0; config.output_timeout = 0.;
config.output_timeout.tv_usec = 0;
} }
static int static int
...@@ -326,9 +322,7 @@ CONF_Dump(void) ...@@ -326,9 +322,7 @@ CONF_Dump(void)
confdump("output.file = %s", confdump("output.file = %s",
EMPTY(config.output_file) ? "stdout" : config.output_file); EMPTY(config.output_file) ? "stdout" : config.output_file);
confdump("append = %u", config.append); confdump("append = %u", config.append);
confdump("output.timeout = %f", confdump("output.timeout = %f", config.output_timeout);
config.output_timeout.tv_sec
+ (double) config.output_timeout.tv_usec / 1e-6);
confdump("cformat = %s", VSB_data(config.cformat)); confdump("cformat = %s", VSB_data(config.cformat));
confdump("bformat = %s", VSB_data(config.bformat)); confdump("bformat = %s", VSB_data(config.bformat));
confdump("rformat = %s", VSB_data(config.rformat)); confdump("rformat = %s", VSB_data(config.rformat));
......
...@@ -31,12 +31,14 @@ ...@@ -31,12 +31,14 @@
#include "../varnishevent.h" #include "../varnishevent.h"
#include "../writer.h" #include "../writer.h"
#include "../vtim.h"
#include "minunit.h" #include "minunit.h"
int tests_run = 0; int tests_run = 0;
static char errmsg[BUFSIZ]; static char errmsg[BUFSIZ];
#define THRESHOLD 1000 #define N 1000
#define THRESHOLD 1e-3
void void
RDR_Stats(void) RDR_Stats(void)
...@@ -60,11 +62,10 @@ static char ...@@ -60,11 +62,10 @@ static char
VSB_clear(config.cformat); VSB_clear(config.cformat);
MAZ(FMT_Init(&errmsg[0])); MAZ(FMT_Init(&errmsg[0]));
strcpy(config.log_file, "-"); strcpy(config.log_file, "/dev/null");
MAZ(LOG_Open("test_writer")); MAZ(LOG_Open("test_writer"));
config.output_timeout.tv_sec = 1; config.output_timeout = 1.;
config.output_timeout.tv_usec = 0;
MAZ(WRT_Init()); MAZ(WRT_Init());
...@@ -78,13 +79,14 @@ static char ...@@ -78,13 +79,14 @@ static char
node.rec = NULL; node.rec = NULL;
node.hdrs = NULL; node.hdrs = NULL;
for (int i = 0; i < THRESHOLD; i++) { for (int i = 0; i < N; i++) {
double t = VTIM_mono();
tx.state = TX_SUBMITTED; tx.state = TX_SUBMITTED;
tx.type = VSL_t_req; tx.type = VSL_t_req;
t = VTIM_mono();
wrt_write(&tx); wrt_write(&tx);
MAZ(to.tv_sec); MASSERT(VTIM_mono() - t < THRESHOLD);
MASSERT(1e6 - to.tv_usec < THRESHOLD);
} }
return NULL; return NULL;
} }
......
...@@ -169,7 +169,7 @@ struct config { ...@@ -169,7 +169,7 @@ struct config {
char output_file[PATH_MAX + 1]; char output_file[PATH_MAX + 1];
unsigned append; unsigned append;
struct timeval output_timeout; double output_timeout;
/* VSL 'r' argument */ /* VSL 'r' argument */
char varnish_bindump[PATH_MAX + 1]; char varnish_bindump[PATH_MAX + 1];
......
...@@ -34,8 +34,8 @@ ...@@ -34,8 +34,8 @@
#include <stdlib.h> #include <stdlib.h>
#include <syslog.h> #include <syslog.h>
#include <string.h> #include <string.h>
#include <sys/select.h> #include <sys/stat.h>
#include <sys/time.h> #include <poll.h>
#include <errno.h> #include <errno.h>
#include "varnishevent.h" #include "varnishevent.h"
...@@ -76,9 +76,8 @@ static unsigned wrt_nfree_tx, wrt_nfree_recs, wrt_nfree_chunks; ...@@ -76,9 +76,8 @@ static unsigned wrt_nfree_tx, wrt_nfree_recs, wrt_nfree_chunks;
static struct vsb *os; static struct vsb *os;
static FILE *fo; static FILE *fo;
static int fd; static struct pollfd fds[1];
static fd_set set; static int blocking = 0, timeout = -1;
static struct timeval *timeout = NULL;
static char *obuf = NULL; static char *obuf = NULL;
static pthread_mutex_t reopen_lock = PTHREAD_MUTEX_INITIALIZER; static pthread_mutex_t reopen_lock = PTHREAD_MUTEX_INITIALIZER;
...@@ -101,30 +100,28 @@ static writer_data_t wrt_data; ...@@ -101,30 +100,28 @@ static writer_data_t wrt_data;
static unsigned run, reopen = 0, tx_thresh, rec_thresh, chunk_thresh; static unsigned run, reopen = 0, tx_thresh, rec_thresh, chunk_thresh;
static int
set_fdset(void)
{
errno = 0;
fd = fileno(fo);
if (fd == -1)
return errno;
FD_ZERO(&set);
FD_SET(fd, &set);
return 0;
}
static int static int
open_log(void) open_log(void)
{ {
struct stat st;
if (EMPTY(config.output_file) || strcmp(config.output_file, "-") == 0) if (EMPTY(config.output_file) || strcmp(config.output_file, "-") == 0)
fo = stdout; fo = stdout;
else if ((fo = fopen(config.output_file, config.append ? "a" : "w")) else {
== NULL) if ((fo = fopen(config.output_file, config.append ? "a" : "w"))
return errno; == NULL)
return errno;
if (stat(config.output_file, &st) < 0)
return errno;
if (S_ISDIR(st.st_mode))
return EISDIR;
blocking = !S_ISREG(st.st_mode);
if (blocking) {
fds[0].fd = fileno(fo);
fds[0].events = POLLOUT;
}
}
if (set_fdset() != 0)
return errno;
if (obuf != NULL) if (obuf != NULL)
free(obuf); free(obuf);
obuf = (char *) malloc(config.output_bufsiz); obuf = (char *) malloc(config.output_bufsiz);
...@@ -165,13 +162,15 @@ wrt_return_freelist(void) ...@@ -165,13 +162,15 @@ wrt_return_freelist(void)
void void
wrt_write(tx_t *tx) wrt_write(tx_t *tx)
{ {
int errnum; int ready = 1;
CHECK_OBJ_NOTNULL(tx, TX_MAGIC); CHECK_OBJ_NOTNULL(tx, TX_MAGIC);
assert(tx->state == TX_SUBMITTED); assert(tx->state == TX_SUBMITTED);
AZ(pthread_mutex_lock(&reopen_lock)); AZ(pthread_mutex_lock(&reopen_lock));
if (reopen && fo != stdout) { if (reopen && fo != stdout) {
int errnum;
wrt_return_freelist(); wrt_return_freelist();
if (fflush(fo) != 0) if (fflush(fo) != 0)
LOG_Log(LOG_ERR, "Cannot flush to %s, DATA DISCARDED: %s", LOG_Log(LOG_ERR, "Cannot flush to %s, DATA DISCARDED: %s",
...@@ -190,50 +189,52 @@ wrt_write(tx_t *tx) ...@@ -190,50 +189,52 @@ wrt_write(tx_t *tx)
} }
AZ(pthread_mutex_unlock(&reopen_lock)); AZ(pthread_mutex_unlock(&reopen_lock));
VRMB();
VSB_clear(os); VSB_clear(os);
VRMB();
FMT_Format(tx, os); FMT_Format(tx, os);
VSB_finish(os); VSB_finish(os);
assert(tx->state == TX_WRITTEN); assert(tx->state == TX_WRITTEN);
if (timeout != NULL) if (blocking) {
to = config.output_timeout; int nfds;
if ((errnum = select(fd + 1, NULL, &set, NULL, timeout)) == -1) {
LOG_Log(LOG_ERR, ready = 0;
"Error waiting for ready output %d (%s), DATA DISCARDED: %s", do {
errno, strerror(errno), VSB_data(os)); nfds = poll(fds, 1, timeout);
errors++; if (nfds < 0)
if (set_fdset() != 0) { assert(errno == EAGAIN || errno == EINTR);
LOG_Log(LOG_ALERT, } while (nfds < 0);
"Cannot reset fd set after select() error, exiting: %s", AZ(fds[0].revents & POLLNVAL);
strerror(errno)); if (fds[0].revents & POLLERR) {
exit(EXIT_FAILURE); LOG_Log(LOG_ERR,
"Error waiting for ready output %d (%s), "
"DATA DISCARDED: %s", errno, strerror(errno), VSB_data(os));
errors++;
} }
} else if (nfds == 0) {
else if (errnum == 0) { wrt_return_freelist();
wrt_return_freelist(); LOG_Log(LOG_ERR,
LOG_Log(LOG_ERR, "Timeout waiting for ready output, DATA DISCARDED: %s",
"Timeout waiting for ready output, DATA DISCARDED: %s", VSB_data(os));
VSB_data(os)); timeouts++;
timeouts++; }
if (set_fdset() != 0) { else if (nfds != 1)
LOG_Log(LOG_ALERT, "Cannot reset fd set after timeout, exiting: %s", WRONG("More than one ready file descriptor for output");
strerror(errno)); else {
exit(EXIT_FAILURE); AN(fds[0].revents & POLLOUT);
ready = 1;
} }
} }
else if (errnum != 1) if (ready) {
WRONG("More than one ready file descriptor for output"); if (fprintf(fo, "%s", VSB_data(os)) < 0) {
else if (!FD_ISSET(fd, &set)) LOG_Log(LOG_ERR, "Output error %d (%s), DATA DISCARDED: %s",
WRONG("Wrong file descriptor found ready for output"); errno, strerror(errno), VSB_data(os));
else if (fprintf(fo, "%s", VSB_data(os)) < 0) { errors++;
LOG_Log(LOG_ERR, "Output error %d (%s), DATA DISCARDED: %s", }
errno, strerror(errno), VSB_data(os)); else {
errors++; writes++;
} bytes += VSB_len(os);
else { }
writes++;
bytes += VSB_len(os);
} }
/* clean up */ /* clean up */
...@@ -330,11 +331,8 @@ WRT_Init(void) ...@@ -330,11 +331,8 @@ WRT_Init(void)
/* XXX: fixed size? */ /* XXX: fixed size? */
os = VSB_new_auto(); os = VSB_new_auto();
if (config.output_timeout.tv_sec != 0 if (config.output_timeout != 0.)
|| config.output_timeout.tv_usec != 0) { timeout = config.output_timeout * 1e3;
to = config.output_timeout;
timeout = &to;
}
tx_thresh = config.max_data >> 1; tx_thresh = config.max_data >> 1;
rec_thresh = nrecords >> 1; rec_thresh = nrecords >> 1;
......
...@@ -34,10 +34,4 @@ ...@@ -34,10 +34,4 @@
/* local freelist - return space in chunks */ /* local freelist - return space in chunks */
struct txhead_s wrt_freetx; struct txhead_s wrt_freetx;
/*
* Set to the write timeout, and indicates the time elapsed after the
* select() call.
*/
struct timeval to;
void wrt_write(tx_t *tx); void wrt_write(tx_t *tx);
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment