Commit e805d832 authored by Nils Goroll's avatar Nils Goroll

add a lightweight output buffering implementation

parent 77e6fdc1
......@@ -36,9 +36,13 @@
#include <stdlib.h>
#include <syslog.h>
#include <string.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/ioctl.h>
#include <poll.h>
#include <errno.h>
#include <fcntl.h>
#include <unistd.h>
#include "varnishevent.h"
#include "writer.h"
......@@ -99,9 +103,239 @@ typedef struct writer_data_s {
} writer_data_t;
static writer_data_t wrt_data;
static unsigned run, reopen = 0, tx_thresh, rec_thresh, chunk_thresh;
static inline void wrt_return_freelist(void);
struct outb {
struct pollfd poll;
char *buf, *p, *e;
int fd;
};
static inline size_t
outb_space(const struct outb *b)
{
return (b->e - b->p);
}
static struct outb *
outb_init(int fd, size_t sz)
{
int on = 1;
struct outb *b;
b = malloc(sizeof *b);
if (b == NULL) {
errno = ENOMEM;
return NULL;
}
b->buf = malloc(sz);
if (b->buf == NULL) {
free(b);
errno = ENOMEM;
return NULL;
}
b->p = b->buf;
b->e = b->buf + sz;
b->fd = fd;
b->poll.fd = fd;
b->poll.events = POLLOUT;
(void) ioctl(fd, FIONBIO, &on);
return (b);
}
static struct outb *
outb_open(const char *pathname, int flags, mode_t mode, size_t sz)
{
struct outb *b;
int e, fd;
fd = open(pathname, flags, mode);
if (fd < 0)
return NULL;
b = outb_init(fd, sz);
if (b == NULL) {
e = errno;
(void) close(fd);
errno = e;
}
return (b);
}
static struct outb *
outb_dup(int ifd, size_t sz)
{
struct outb *b;
int e, fd;
fd = dup(ifd);
if (fd < 0)
return NULL;
b = outb_init(fd, sz);
if (b == NULL) {
e = errno;
(void) close(fd);
errno = e;
}
return (b);
}
static ssize_t
_outb_poll(struct outb *b)
{
int i;
double t0;
t0 = VTIM_mono();
do {
i = poll(&b->poll, 1, timeout);
if (i < 0)
assert(errno == EAGAIN || errno == EINTR);
} while (i < 0);
pollt += VTIM_mono() - t0;
AZ(b->poll.revents & POLLNVAL);
if (b->poll.revents & POLLERR)
return (-1);
if (i != 0) {
assert (i == 1);
AN(b->poll.revents & POLLOUT);
}
return (i);
}
static ssize_t
_outb_wr(struct outb *b, const void *buf, size_t count)
{
int i;
ssize_t c;
double t0;
t0 = VTIM_mono();
c = write(b->fd, buf, count);
writet += VTIM_mono() - t0;
if (c >= 0) {
writes++;
bytes += c;
return (c);
}
if (errno != EAGAIN && errno != EWOULDBLOCK) {
LOG_Log(LOG_ERR, "Output error %d (%s), DATA DISCARDED: %s",
errno, strerror(errno), buf);
errors++;
return (-1);
}
assert(errno == EAGAIN || errno == EWOULDBLOCK);
i = _outb_poll(b);
if (i == -1) {
LOG_Log(LOG_ERR,
"Error waiting for ready output %d (%s), "
"DATA DISCARDED: %s", errno, strerror(errno), buf);
errors++;
return (-1);
}
if (i == 0) {
wrt_return_freelist();
LOG_Log(LOG_ERR,
"Timeout waiting for ready output, DATA DISCARDED: %s",
buf);
timeouts++;
return (-1);
}
return (_outb_wr(b, buf, count));
}
static ssize_t
outb_flush(struct outb *b)
{
ssize_t w, t = 0, l = b->p - b->buf;
assert(l >= 0);
for (;;) {
if (l - t == 0)
break;
assert(l > t);
w = _outb_wr(b, b->buf + t, l - t);
if (w < 0) {
t = w;
break;
}
assert (w > 0);
t += w;
}
b->p = b->buf;
return (t);
}
static ssize_t
outb_write(struct outb *b, const char *s, size_t l)
{
ssize_t w, t = 0, spc = outb_space(b);
assert(spc >= 0);
if (spc < l) {
w = outb_flush(b);
if (w < 0)
return (w);
spc = outb_space(b);
}
for (;;) {
w = l - t;
if (w == 0)
break;
if (spc >= w) {
memcpy(b->p, s + t, w);
b->p += w;
break;
}
w = _outb_wr(b, s + t, w);
if (w < 0)
return (w);
assert (w > 0);
t += w;
}
return (t);
}
static int
outb_close(struct outb **bp)
{
struct outb *b = *bp;
int r;
*bp = NULL;
r = outb_flush(b);
if (close(b->fd))
r = -1;
free (b->buf);
free (b);
return (r);
}
static int
open_log(void)
{
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment