Commit ae46a67d authored by Poul-Henning Kamp's avatar Poul-Henning Kamp

Bite the bullet and write an alternate acceptor which uses kqueue

directly instead of libevent.

Degeneralize the header reading code in cache_http.c which seems to
be cleaner anyway.

An #ifdef at the top of cache_acceptor.c selects which implementation
you want: libevent or kqueue.



git-svn-id: http://www.varnish-cache.org/svn/trunk/varnish-cache@597 d4fa192b-c00b-0410-8231-f00ffab90ce4
parent b2c44715
......@@ -46,8 +46,6 @@ enum step {
* RSN: struct worker and struct session will have one of these embedded.
*/
typedef void http_callback_f(void *, int bad);
struct http_hdr {
char *b;
char *e;
......@@ -56,9 +54,6 @@ struct http_hdr {
struct http {
unsigned magic;
#define HTTP_MAGIC 0x6428b5c9
struct event ev;
http_callback_f *callback;
void *arg;
char *s; /* (S)tart of buffer */
char *t; /* start of (T)railing data */
......@@ -230,6 +225,7 @@ struct sess {
int id;
unsigned xid;
struct event ev;
struct worker *wrk;
unsigned sockaddrlen;
......@@ -346,7 +342,9 @@ int http_GetStatus(struct http *hp);
int http_HdrIs(struct http *hp, const char *hdr, const char *val);
int http_GetTail(struct http *hp, unsigned len, char **b, char **e);
int http_Read(struct http *hp, int fd, void *b, unsigned len);
void http_RecvHeadEv(struct http *hp, int fd, struct event_base *eb, http_callback_f *func, void *arg);
void http_RecvPrep(struct http *hp);
int http_RecvPrepAgain(struct http *hp);
int http_RecvSome(int fd, struct http *hp);
int http_RecvHead(struct http *hp, int fd);
int http_DissectRequest(struct http *sp, int fd);
int http_DissectResponse(struct http *sp, int fd);
......
......@@ -6,6 +6,9 @@
* write the session pointer to a pipe which the event engine monitors.
*/
#define ACCEPTOR_USE_KQUEUE
#undef ACCEPTOR_USE_LIBEVENT
#include <stdio.h>
#include <errno.h>
#include <string.h>
......@@ -16,14 +19,73 @@
#include <sys/types.h>
#include <sys/socket.h>
#include <netdb.h>
#include "config.h"
#include "libvarnish.h"
#include "heritage.h"
#include "shmlog.h"
#include "cache.h"
static pthread_t vca_thread;
static unsigned xids;
static struct sess *
vca_accept_sess(int fd)
{
socklen_t l;
struct sockaddr addr[2]; /* XXX: IPv6 hack */
struct sess *sp;
int i;
struct linger linger;
VSL_stats->client_conn++;
l = sizeof addr;
i = accept(fd, addr, &l);
if (i < 0) {
VSL(SLT_Debug, fd, "Accept failed errno=%d", errno);
/* XXX: stats ? */
return (NULL);
}
sp = SES_New(addr, l);
assert(sp != NULL); /* XXX handle */
sp->fd = i;
sp->id = i;
#ifdef SO_NOSIGPIPE /* XXX Linux */
i = 1;
AZ(setsockopt(sp->fd, SOL_SOCKET, SO_NOSIGPIPE, &i, sizeof i));
#endif
#ifdef SO_LINGER /* XXX Linux*/
linger.l_onoff = 0;
linger.l_linger = 0;
AZ(setsockopt(sp->fd, SOL_SOCKET, SO_LINGER, &linger, sizeof linger));
#endif
TCP_name(addr, l, sp->addr, sizeof sp->addr, sp->port, sizeof sp->port);
VSL(SLT_SessionOpen, sp->fd, "%s %s", sp->addr, sp->port);
return (sp);
}
static void
vca_handover(struct sess *sp, int bad)
{
if (bad) {
vca_close_session(sp,
bad == 1 ? "overflow" : "no request");
vca_return_session(sp);
return;
}
sp->step = STP_RECV;
VSL_stats->client_req++;
sp->xid = xids++;
VSL(SLT_XID, sp->fd, "%u", sp->xid);
WRK_QueueSession(sp);
}
#ifdef ACCEPTOR_USE_LIBEVENT
static struct event_base *evb;
static struct event pipe_e;
static int pipes[2];
......@@ -31,12 +93,11 @@ static int pipes[2];
static struct event tick_e;
static struct timeval tick_rate;
static pthread_t vca_thread;
static unsigned xids;
static struct event accept_e[2 * HERITAGE_NSOCKS];
static TAILQ_HEAD(,sess) sesshead = TAILQ_HEAD_INITIALIZER(sesshead);
/*--------------------------------------------------------------------*/
static void
vca_tick(int a, short b, void *c)
{
......@@ -59,25 +120,33 @@ vca_tick(int a, short b, void *c)
}
static void
vca_callback(void *arg, int bad)
vca_rcvhd_f(int fd, short event, void *arg)
{
struct sess *sp;
int i;
(void)event;
CAST_OBJ_NOTNULL(sp, arg, SESS_MAGIC);
TAILQ_REMOVE(&sesshead, sp, list);
if (bad) {
if (bad == 1)
vca_close_session(sp, "overflow");
else
vca_close_session(sp, "no request");
vca_return_session(sp);
i = http_RecvSome(fd, sp->http);
if (i < 0)
return;
}
sp->step = STP_RECV;
VSL_stats->client_req++;
sp->xid = xids++;
VSL(SLT_XID, sp->fd, "%u", sp->xid);
WRK_QueueSession(sp);
event_del(&sp->ev);
TAILQ_REMOVE(&sesshead, sp, list);
vca_handover(sp, i);
}
static void
vca_rcvhdev(struct sess *sp)
{
CHECK_OBJ_NOTNULL(sp, SESS_MAGIC);
clock_gettime(CLOCK_MONOTONIC, &sp->t_idle);
TAILQ_INSERT_TAIL(&sesshead, sp, list);
event_set(&sp->ev, sp->fd, EV_READ | EV_PERSIST, vca_rcvhd_f, sp);
AZ(event_base_set(evb, &sp->ev));
AZ(event_add(&sp->ev, NULL)); /* XXX: timeout */
}
static void
......@@ -90,53 +159,27 @@ pipe_f(int fd, short event, void *arg)
(void)arg;
i = read(fd, &sp, sizeof sp);
assert(i == sizeof sp);
clock_gettime(CLOCK_MONOTONIC, &sp->t_idle);
TAILQ_INSERT_TAIL(&sesshead, sp, list);
http_RecvHeadEv(sp->http, sp->fd, evb, vca_callback, sp);
if (http_RecvPrepAgain(sp->http)) {
vca_handover(sp, 0);
return;
}
vca_rcvhdev(sp);
}
static void
accept_f(int fd, short event, void *arg)
{
socklen_t l;
struct sockaddr addr[2]; /* XXX: IPv6 hack */
struct sess *sp;
int i;
struct linger linger;
(void)event;
(void)arg;
VSL_stats->client_conn++;
l = sizeof addr;
i = accept(fd, addr, &l);
if (i < 0) {
VSL(SLT_Debug, fd, "Accept failed errno=%d", errno);
/* XXX: stats ? */
sp = vca_accept_sess(fd);
if (sp == NULL)
return;
}
sp = SES_New(addr, l);
assert(sp != NULL); /* XXX handle */
sp->fd = i;
sp->id = i;
#ifdef SO_NOSIGPIPE /* XXX Linux */
i = 1;
AZ(setsockopt(sp->fd, SOL_SOCKET, SO_NOSIGPIPE, &i, sizeof i));
#endif
#ifdef SO_LINGER /* XXX Linux*/
linger.l_onoff = 0;
linger.l_linger = 0;
AZ(setsockopt(sp->fd, SOL_SOCKET, SO_LINGER, &linger, sizeof linger));
#endif
TCP_name(addr, l, sp->addr, sizeof sp->addr, sp->port, sizeof sp->port);
VSL(SLT_SessionOpen, sp->fd, "%s %s", sp->addr, sp->port);
clock_gettime(CLOCK_MONOTONIC, &sp->t_idle);
TAILQ_INSERT_TAIL(&sesshead, sp, list);
http_RecvHeadEv(sp->http, sp->fd, evb, vca_callback, sp);
http_RecvPrep(sp->http);
vca_rcvhdev(sp);
}
static void *
......@@ -147,6 +190,8 @@ vca_main(void *arg)
(void)arg;
tick_rate.tv_sec = 1;
tick_rate.tv_usec = 0;
AZ(pipe(pipes));
evb = event_init();
assert(evb != NULL);
......@@ -187,13 +232,114 @@ vca_main(void *arg)
/*--------------------------------------------------------------------*/
void
vca_close_session(struct sess *sp, const char *why)
vca_return_session(struct sess *sp)
{
VSL(SLT_SessionClose, sp->fd, why);
if (sp->fd >= 0)
AZ(close(sp->fd));
sp->fd = -1;
if (sp->fd < 0) {
SES_Delete(sp);
return;
}
VSL(SLT_SessionReuse, sp->fd, "%s %s", sp->addr, sp->port);
assert(sizeof sp == write(pipes[1], &sp, sizeof sp));
}
#endif /* ACCEPTOR_USE_LIBEVENT */
#ifdef ACCEPTOR_USE_KQUEUE
#include <sys/event.h>
static int kq = -1;
static void
vca_kq_sess(struct sess *sp, int arm)
{
struct kevent ke[2];
assert(arm == EV_ADD || arm == EV_DELETE);
CHECK_OBJ_NOTNULL(sp, SESS_MAGIC);
memset(ke, 0, sizeof ke);
EV_SET(&ke[0], sp->fd, EVFILT_READ, arm, 0, 0, sp);
EV_SET(&ke[1], sp->fd, EVFILT_TIMER, arm , 0, 5000, sp);
AZ(kevent(kq, ke, 2, NULL, 0, NULL));
}
static void
accept_f(int fd)
{
struct sess *sp;
sp = vca_accept_sess(fd);
if (sp == NULL)
return;
clock_gettime(CLOCK_MONOTONIC, &sp->t_idle);
http_RecvPrep(sp->http);
vca_kq_sess(sp, EV_ADD);
}
static void *
vca_main(void *arg)
{
unsigned u;
struct kevent ke;
int i;
struct sess *sp;
(void)arg;
kq = kqueue();
assert(kq >= 0);
for (u = 0; u < HERITAGE_NSOCKS; u++) {
if (heritage.sock_local[u] >= 0) {
memset(&ke, 0, sizeof ke);
EV_SET(&ke, heritage.sock_local[u],
EVFILT_READ, EV_ADD, 0, 0, accept_f);
AZ(kevent(kq, &ke, 1, NULL, 0, NULL));
}
if (heritage.sock_remote[u] >= 0) {
memset(&ke, 0, sizeof ke);
EV_SET(&ke, heritage.sock_remote[u],
EVFILT_READ, EV_ADD, 0, 0, accept_f);
AZ(kevent(kq, &ke, 1, NULL, 0, NULL));
}
}
while (1) {
i = kevent(kq, NULL, 0, &ke, 1, NULL);
assert(i == 1);
#if 0
printf("i = %d\n", i);
printf("ke.ident = %ju\n", (uintmax_t)ke.ident);
printf("ke.filter = %u\n", ke.filter);
printf("ke.flags = %u\n", ke.flags);
printf("ke.fflags = %u\n", ke.fflags);
printf("ke.data = %jd\n", (intmax_t)ke.data);
printf("ke.udata = %p\n", ke.udata);
#endif
if (ke.udata == accept_f) {
accept_f(ke.ident);
continue;
}
CAST_OBJ_NOTNULL(sp, ke.udata, SESS_MAGIC);
if (ke.filter == EVFILT_READ) {
i = http_RecvSome(sp->fd, sp->http);
if (i == -1)
continue;
vca_kq_sess(sp, EV_DELETE);
vca_handover(sp, i);
continue;
}
if (ke.filter == EVFILT_TIMER) {
vca_kq_sess(sp, EV_DELETE);
vca_close_session(sp, "timeout");
vca_return_session(sp);
continue;
}
INCOMPL();
}
INCOMPL();
}
/*--------------------------------------------------------------------*/
......@@ -202,12 +348,29 @@ void
vca_return_session(struct sess *sp)
{
if (sp->fd >= 0) {
VSL(SLT_SessionReuse, sp->fd, "%s %s", sp->addr, sp->port);
assert(sizeof sp == write(pipes[1], &sp, sizeof sp));
} else {
if (sp->fd < 0) {
SES_Delete(sp);
return;
}
VSL(SLT_SessionReuse, sp->fd, "%s %s", sp->addr, sp->port);
if (http_RecvPrepAgain(sp->http))
vca_handover(sp, 0);
else
vca_kq_sess(sp, EV_ADD);
}
#endif /* ACCEPTOR_USE_KQUEUE */
/*--------------------------------------------------------------------*/
void
vca_close_session(struct sess *sp, const char *why)
{
VSL(SLT_SessionClose, sp->fd, why);
if (sp->fd >= 0)
AZ(close(sp->fd));
sp->fd = -1;
}
/*--------------------------------------------------------------------*/
......@@ -216,8 +379,6 @@ void
VCA_Init(void)
{
tick_rate.tv_sec = 1;
tick_rate.tv_usec = 0;
AZ(pthread_create(&vca_thread, NULL, vca_main, NULL));
srandomdev();
xids = random();
......
......@@ -436,14 +436,15 @@ http_header_complete(struct http *hp)
if (++p > hp->v)
return (0);
hp->t = p;
assert(hp->t > hp->s);
assert(hp->t <= hp->v);
return (1);
}
/*--------------------------------------------------------------------*/
static void
http_preprecv(struct http *hp)
void
http_RecvPrep(struct http *hp)
{
unsigned l;
......@@ -462,10 +463,19 @@ http_preprecv(struct http *hp)
}
}
int
http_RecvPrepAgain(struct http *hp)
{
http_RecvPrep(hp);
if (hp->v == hp->s)
return (0);
return (http_header_complete(hp));
}
/*--------------------------------------------------------------------*/
static int
http_read_hdr(int fd, struct http *hp)
int
http_RecvSome(int fd, struct http *hp)
{
unsigned l;
int i;
......@@ -507,55 +517,15 @@ http_read_hdr(int fd, struct http *hp)
/*--------------------------------------------------------------------*/
static void
http_read_f(int fd, short event, void *arg)
{
struct http *hp;
int i;
(void)event;
CAST_OBJ_NOTNULL(hp, arg, HTTP_MAGIC);
i = http_read_hdr(fd, hp);
if (i < 0)
return;
event_del(&hp->ev);
if (hp->callback != NULL)
hp->callback(hp->arg, i);
}
void
http_RecvHeadEv(struct http *hp, int fd, struct event_base *eb, http_callback_f *func, void *arg)
{
CHECK_OBJ_NOTNULL(hp, HTTP_MAGIC);
assert(func != NULL);
http_preprecv(hp);
if (hp->v != hp->s && http_header_complete(hp)) {
func(arg, 0);
return;
}
hp->callback = func;
hp->arg = arg;
event_set(&hp->ev, fd, EV_READ | EV_PERSIST, http_read_f, hp);
AZ(event_base_set(eb, &hp->ev));
AZ(event_add(&hp->ev, NULL)); /* XXX: timeout */
return;
}
/*--------------------------------------------------------------------*/
int
http_RecvHead(struct http *hp, int fd)
{
int i;
CHECK_OBJ_NOTNULL(hp, HTTP_MAGIC);
http_preprecv(hp);
http_RecvPrep(hp);
do
i = http_read_hdr(fd, hp);
i = http_RecvSome(fd, hp);
while (i == -1);
return (i);
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment