Commit 4c88bc96 authored by Poul-Henning Kamp's avatar Poul-Henning Kamp

Add yet another thread with an event engine to monitor idle backend

connections and clean them out if the backend closes.


git-svn-id: http://www.varnish-cache.org/svn/trunk@158 d4fa192b-c00b-0410-8231-f00ffab90ce4
parent 861ff13d
/* /*
* $Id$ * $Id$
*
* Manage backend connections.
*
* For each backend ip number we maintain a shadow backend structure so
* that backend connections can be reused across VCL changes.
*
* For each IP we maintain a list of busy and free backend connections,
* and free connections are monitored to detect if the backend closes
* the connection.
*
* We recycle backend connections in most recently used order to minimize
* the number of open connections to any one backend.
*
* XXX:
* I'm not happy about recycling always going through the monitor thread
* but not doing so is slightly more tricky: A connection might be reused
* several times before the monitor thread got around to it, and it would
* have to double check if it had already armed the event for that connection.
* Hopefully this is nowhere close to a performance issue, but if it is,
* it can be optimized at the expense of more complex code.
*/ */
#include <sys/types.h> #include <sys/types.h>
...@@ -15,24 +35,27 @@ ...@@ -15,24 +35,27 @@
#include <string.h> #include <string.h>
#include <unistd.h> #include <unistd.h>
#include <sys/select.h> #include <sys/select.h>
#include <sys/filio.h>
#include <event.h>
#include "libvarnish.h" #include "libvarnish.h"
#include "shmlog.h" #include "shmlog.h"
#include "vcl_lang.h" #include "vcl_lang.h"
#include "cache.h" #include "cache.h"
/* /* A backend connection */
* The internal backend structure for managing connection pools per
* backend. We need to shadow the backend stucture from the VCL
* in order let connections live across VCL switches.
*/
struct vbe_conn { struct vbe_conn {
TAILQ_ENTRY(vbe_conn) list; TAILQ_ENTRY(vbe_conn) list;
struct vbe *vbe; struct vbe *vbe;
int fd; int fd;
struct event ev;
int inuse;
}; };
/* A backend IP */
struct vbe { struct vbe {
unsigned ip; unsigned ip;
TAILQ_ENTRY(vbe) list; TAILQ_ENTRY(vbe) list;
...@@ -45,6 +68,10 @@ static TAILQ_HEAD(,vbe) vbe_head = TAILQ_HEAD_INITIALIZER(vbe_head); ...@@ -45,6 +68,10 @@ static TAILQ_HEAD(,vbe) vbe_head = TAILQ_HEAD_INITIALIZER(vbe_head);
static pthread_mutex_t vbemtx; static pthread_mutex_t vbemtx;
static pthread_t vbe_thread;
static struct event_base *vbe_evb;
static int vbe_pipe[2];
/*-------------------------------------------------------------------- /*--------------------------------------------------------------------
* XXX: we should not call getaddrinfo() every time, we should cache * XXX: we should not call getaddrinfo() every time, we should cache
* and apply round-robin with blacklisting of entries that do not respond * and apply round-robin with blacklisting of entries that do not respond
...@@ -89,36 +116,89 @@ connect_to_backend(struct vbe_conn *vc, struct backend *bp) ...@@ -89,36 +116,89 @@ connect_to_backend(struct vbe_conn *vc, struct backend *bp)
return; return;
} }
/*--------------------------------------------------------------------*/ /*--------------------------------------------------------------------
* When backend connections have been used, they are passed to us through
* the vbe_pipe. If fd == -1 it has been closed and will be reclaimed,
* otherwise arm an event to monitor if the backend closes and recycle.
*/
int static void
tst_fd(int fd) vbe_rdp(int fd, short event __unused, void *arg __unused)
{ {
fd_set r,w,e; struct vbe_conn *vc;
int i; int i;
struct timeval tv;
char c; i = read(fd, &vc, sizeof vc);
assert(i == sizeof vc);
FD_ZERO(&r); AZ(pthread_mutex_lock(&vbemtx));
FD_ZERO(&w); TAILQ_REMOVE(&vc->vbe->bconn, vc, list);
FD_ZERO(&e); if (vc->fd < 0) {
FD_SET(fd, &r); free(vc);
FD_SET(fd, &w); } else {
FD_SET(fd, &e); vc->inuse = 0;
tv.tv_sec = 0; event_add(&vc->ev, NULL);
tv.tv_usec = 0; TAILQ_INSERT_HEAD(&vc->vbe->fconn, vc, list);
i = select(fd + 1, &r, &w, &e, &tv);
printf("tst_fd fd %d i %d flag %d/%d/%d\n",
fd, i, FD_ISSET(fd, &r), FD_ISSET(fd, &w), FD_ISSET(fd, &e));
if (FD_ISSET(fd, &r)) {
i = read(fd, &c, 1);
if (i == 0)
return (1);
} }
return (0); AZ(pthread_mutex_unlock(&vbemtx));
} }
/*--------------------------------------------------------------------*/ /*--------------------------------------------------------------------
* A backend connection became ready. This can happen if it was reused
* in which case we unarm the event and get out of the way, or if the
* backend closed the connection in which case we clean up.
*/
static void
vbe_rdf(int fd, short event __unused, void *arg)
{
struct vbe_conn *vc;
int j;
vc = arg;
AZ(pthread_mutex_lock(&vbemtx));
if (vc->inuse) {
event_del(&vc->ev);
AZ(pthread_mutex_unlock(&vbemtx));
return;
}
AZ(ioctl(vc->fd, FIONREAD, &j));
VSL(SLT_BackendClose, vc->fd, "Remote (%d chars)", j);
TAILQ_REMOVE(&vc->vbe->fconn, vc, list);
AZ(pthread_mutex_unlock(&vbemtx));
event_del(&vc->ev);
close(vc->fd);
free(vc);
}
/* Backend monitoring thread -----------------------------------------*/
static void *
vbe_main(void *priv __unused)
{
struct event pev;
vbe_evb = event_init();
assert(vbe_evb != NULL);
AZ(pipe(vbe_pipe));
memset(&pev, 0, sizeof pev);
event_set(&pev, vbe_pipe[0], EV_READ | EV_PERSIST, vbe_rdp, NULL);
event_base_set(vbe_evb, &pev);
event_add(&pev, NULL);
event_base_loop(vbe_evb, 0);
assert(__LINE__ == 0);
return (NULL);
}
/* Get a backend connection ------------------------------------------
*
* First locate the backend shadow, if necessary by creating one.
* If there are free connections, use the first, otherwise build a
* new connection.
*/
int int
VBE_GetFd(struct backend *bp, void **ptr) VBE_GetFd(struct backend *bp, void **ptr)
...@@ -126,7 +206,6 @@ VBE_GetFd(struct backend *bp, void **ptr) ...@@ -126,7 +206,6 @@ VBE_GetFd(struct backend *bp, void **ptr)
struct vbe *vp; struct vbe *vp;
struct vbe_conn *vc; struct vbe_conn *vc;
again:
AZ(pthread_mutex_lock(&vbemtx)); AZ(pthread_mutex_lock(&vbemtx));
vp = bp->vbe; vp = bp->vbe;
if (vp == NULL) { if (vp == NULL) {
...@@ -146,55 +225,54 @@ again: ...@@ -146,55 +225,54 @@ again:
/* XXX: check nconn vs backend->maxcon */ /* XXX: check nconn vs backend->maxcon */
vc = TAILQ_FIRST(&vp->fconn); vc = TAILQ_FIRST(&vp->fconn);
if (vc != NULL) { if (vc != NULL) {
vc->inuse = 1;
TAILQ_REMOVE(&vp->fconn, vc, list); TAILQ_REMOVE(&vp->fconn, vc, list);
TAILQ_INSERT_TAIL(&vp->bconn, vc, list); TAILQ_INSERT_TAIL(&vp->bconn, vc, list);
AZ(pthread_mutex_unlock(&vbemtx)); AZ(pthread_mutex_unlock(&vbemtx));
if (tst_fd(vc->fd)) {
VBE_ClosedFd(vc);
goto again;
}
} else { } else {
vc = calloc(sizeof *vc, 1); vc = calloc(sizeof *vc, 1);
assert(vc != NULL); assert(vc != NULL);
vc->vbe = vp; vc->vbe = vp;
vc->fd = -1; vc->fd = -1;
vc->inuse = 1;
TAILQ_INSERT_TAIL(&vp->bconn, vc, list); TAILQ_INSERT_TAIL(&vp->bconn, vc, list);
AZ(pthread_mutex_unlock(&vbemtx)); AZ(pthread_mutex_unlock(&vbemtx));
connect_to_backend(vc, bp); connect_to_backend(vc, bp);
event_set(&vc->ev, vc->fd, EV_READ | EV_PERSIST, vbe_rdf, vc);
event_base_set(vbe_evb, &vc->ev);
} }
*ptr = vc; *ptr = vc;
return (vc->fd); return (vc->fd);
} }
/*--------------------------------------------------------------------*/ /* Close a connection ------------------------------------------------*/
void void
VBE_ClosedFd(void *ptr) VBE_ClosedFd(void *ptr)
{ {
struct vbe_conn *vc; struct vbe_conn *vc;
int i;
vc = ptr; vc = ptr;
VSL(SLT_BackendClose, vc->fd, ""); VSL(SLT_BackendClose, vc->fd, "");
close(vc->fd); close(vc->fd);
AZ(pthread_mutex_lock(&vbemtx)); vc->fd = -1;
TAILQ_REMOVE(&vc->vbe->bconn, vc, list); i = write(vbe_pipe[1], &vc, sizeof vc);
AZ(pthread_mutex_unlock(&vbemtx)); assert(i == sizeof vc);
free(vc);
} }
/*--------------------------------------------------------------------*/ /* Recycle a connection ----------------------------------------------*/
void void
VBE_RecycleFd(void *ptr) VBE_RecycleFd(void *ptr)
{ {
struct vbe_conn *vc; struct vbe_conn *vc;
int i;
vc = ptr; vc = ptr;
VSL(SLT_BackendReuse, vc->fd, ""); VSL(SLT_BackendReuse, vc->fd, "");
AZ(pthread_mutex_lock(&vbemtx)); i = write(vbe_pipe[1], &vc, sizeof vc);
TAILQ_REMOVE(&vc->vbe->bconn, vc, list); assert(i == sizeof vc);
TAILQ_INSERT_HEAD(&vc->vbe->fconn, vc, list);
AZ(pthread_mutex_unlock(&vbemtx));
} }
/*--------------------------------------------------------------------*/ /*--------------------------------------------------------------------*/
...@@ -204,4 +282,5 @@ VBE_Init(void) ...@@ -204,4 +282,5 @@ VBE_Init(void)
{ {
AZ(pthread_mutex_init(&vbemtx, NULL)); AZ(pthread_mutex_init(&vbemtx, NULL));
AZ(pthread_create(&vbe_thread, NULL, vbe_main, NULL));
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment