Commit b9d87969 authored by Poul-Henning Kamp's avatar Poul-Henning Kamp

Centralise the timer/poker function of pipe-based waiters, so that

we can avoid thundering-herd/resonance phenomena when we have multiple
waiters.
parent 1520837f
......@@ -397,7 +397,7 @@ struct waited {
VTAILQ_ENTRY(waited) list;
int fd;
void *ptr;
double deadline;
double idle;
#if defined(HAVE_EPOLL_CTL)
struct epoll_event ev;
#endif
......
......@@ -39,6 +39,7 @@
#include "vcli_priv.h"
#include "vrnd.h"
#include "waiter/waiter.h"
#include "hash/hash_slinger.h"
......@@ -214,6 +215,8 @@ child_main(void)
CLI_Init();
VFP_Init();
Wait_Init();
VCL_Init();
HTTP_Init();
......
......@@ -295,7 +295,7 @@ SES_Wait(struct sess *sp)
sp->waited.magic = WAITED_MAGIC;
sp->waited.fd = sp->fd;
sp->waited.ptr = sp;
sp->waited.deadline = sp->t_idle;
sp->waited.idle = sp->t_idle;
if (Wait_Enter(pp->http1_waiter, &sp->waited)) {
VSC_C_main->sess_pipe_overflow++;
SES_Delete(sp, SC_SESS_PIPE_OVERFLOW, NAN);
......
......@@ -38,12 +38,53 @@
#include "cache/cache.h"
#include "vfil.h"
#include "vtim.h"
#include "waiter/waiter.h"
#include "waiter/waiter_priv.h"
#define NEV 8192
static VTAILQ_HEAD(, waiter) waiters = VTAILQ_HEAD_INITIALIZER(waiters);
static int nwaiters;
static struct lock wait_mtx;
static pthread_t wait_thr;
static void *
wait_poker_thread(void *arg)
{
struct waiter *w;
struct waited *wp;
double now;
(void)arg;
THR_SetName("Waiter timer");
while (1) {
/* Avoid thundering herds and resonances */
(void)usleep(990013/nwaiters);
now = VTIM_real();
Lck_Lock(&wait_mtx);
w = VTAILQ_FIRST(&waiters);
VTAILQ_REMOVE(&waiters, w, list);
VTAILQ_INSERT_TAIL(&waiters, w, list);
assert(w->pipes[1] >= 0);
wp = VTAILQ_FIRST(&w->waithead);
CHECK_OBJ_NOTNULL(wp, WAITED_MAGIC);
if (wp == w->pipe_w) {
VTAILQ_REMOVE(&w->waithead, wp, list);
VTAILQ_INSERT_TAIL(&w->waithead, wp, list);
wp = VTAILQ_FIRST(&w->waithead);
}
if (wp->idle + *w->tmo < now)
(void)write(w->pipes[1], &w->pipe_w, sizeof w->pipe_w);
Lck_Unlock(&wait_mtx);
}
NEEDLESS_RETURN(NULL);
}
const char *
Wait_GetName(void)
{
......@@ -70,10 +111,21 @@ Wait_New(waiter_handle_f *func, volatile double *tmo)
w->impl = waiter;
w->func = func;
w->tmo = tmo;
w->pipes[0] = w->pipes[1] = -1;
VTAILQ_INIT(&w->waithead);
waiter->init(w);
AN(w->impl->pass || w->pfd > 0);
AN(w->impl->pass || w->pipes[1] >= 0);
if (w->pipes[1] >= 0 && VTAILQ_EMPTY(&waiters)) {
/* Start timer poker thread */
AZ(pthread_create(&wait_thr, NULL, wait_poker_thread, NULL));
}
Lck_Lock(&wait_mtx);
VTAILQ_INSERT_TAIL(&waiters, w, list);
nwaiters++;
Lck_Unlock(&wait_mtx);
return (w);
}
......@@ -86,10 +138,9 @@ Wait_UsePipe(struct waiter *w)
AZ(pipe(w->pipes));
AZ(VFIL_nonblocking(w->pipes[0]));
AZ(VFIL_nonblocking(w->pipes[1]));
w->pfd = w->pipes[1];
ALLOC_OBJ(w->pipe_w, WAITED_MAGIC);
w->pipe_w->fd = w->pipes[0];
w->pipe_w->deadline = 9e99;
w->pipe_w->idle = 9e99;
VTAILQ_INSERT_HEAD(&w->waithead, w->pipe_w, list);
waiter->inject(w, w->pipe_w);
}
......@@ -106,9 +157,9 @@ Wait_Enter(const struct waiter *w, struct waited *wp)
if (w->impl->pass != NULL)
return (w->impl->pass(w->priv, wp));
assert(w->pfd >= 0);
assert(w->pipes[1] > 0);
written = write(w->pfd, &wp, sizeof wp);
written = write(w->pipes[1], &wp, sizeof wp);
if (written != sizeof wp && (errno == EAGAIN || errno == EWOULDBLOCK))
return (-1);
assert (written == sizeof wp);
......@@ -118,28 +169,55 @@ Wait_Enter(const struct waiter *w, struct waited *wp)
void
Wait_Handle(struct waiter *w, struct waited *wp, enum wait_event ev, double now)
{
struct waited *ss[NEV];
int i, j;
struct waited *ss[NEV], *wp2;
int i, j, dotimer = 0;
CHECK_OBJ_NOTNULL(w, WAITER_MAGIC);
CHECK_OBJ_NOTNULL(wp, WAITED_MAGIC);
if (wp == w->pipe_w) {
i = read(w->pipes[0], ss, sizeof ss);
if (i == -1 && errno == EAGAIN)
return;
for (j = 0; i >= sizeof ss[0]; j++, i -= sizeof ss[0]) {
CHECK_OBJ_NOTNULL(ss[j], WAITED_MAGIC);
assert(ss[j]->fd >= 0);
VTAILQ_INSERT_TAIL(&w->waithead, ss[j], list);
w->impl->inject(w, ss[j]);
if (wp != w->pipe_w) {
if (w->impl->evict != NULL)
w->impl->evict(w, wp);
VTAILQ_REMOVE(&w->waithead, wp, list);
w->func(wp, ev, now);
return;
}
i = read(w->pipes[0], ss, sizeof ss);
if (i == -1 && errno == EAGAIN)
return;
for (j = 0; i >= sizeof ss[0]; j++, i -= sizeof ss[0]) {
CHECK_OBJ_NOTNULL(ss[j], WAITED_MAGIC);
if (ss[j] == w->pipe_w) {
dotimer = 1;
continue;
}
AZ(i);
assert(ss[j]->fd >= 0);
VTAILQ_INSERT_TAIL(&w->waithead, ss[j], list);
w->impl->inject(w, ss[j]);
}
AZ(i);
if (!dotimer)
return;
VTAILQ_FOREACH_SAFE(wp, &w->waithead, list, wp2) {
if (wp == w->pipe_w)
continue;
if (wp->idle + *w->tmo > now)
break;
if (w->impl->evict != NULL)
w->impl->evict(w, wp);
VTAILQ_REMOVE(&w->waithead, wp, list);
w->func(wp, WAITER_TIMEOUT, now);
}
if (w->impl->evict != NULL)
w->impl->evict(w, wp);
}
void
Wait_Init(void)
{
VTAILQ_REMOVE(&w->waithead, wp, list);
w->func(wp, ev, now);
Lck_New(&wait_mtx, lck_misc);
}
......@@ -110,7 +110,7 @@ vwe_thread(void *priv)
struct epoll_event ev[NEEV], *ep;
struct waited *sp, *sp2;
char junk;
double now, deadline;
double now, idle;
int dotimer, i, n;
struct vwe *vwe;
......@@ -135,9 +135,9 @@ vwe_thread(void *priv)
continue;
/* check for timeouts */
deadline = now - *vwe->waiter->tmo;
idle = now - *vwe->waiter->tmo;
VTAILQ_FOREACH_SAFE(sp, &vwe->waiter->waithead, list, sp2) {
if (sp->deadline < deadline)
if (sp->idle < idle)
Wait_Handle(vwe->waiter, sp,
WAITER_TIMEOUT, now);
}
......
......@@ -127,9 +127,8 @@ vwk_thread(void *priv)
{
struct vwk *vwk;
struct kevent ke[NKEV], *kp;
int j, n, dotimer;
double now, deadline;
struct waited *sp;
int j, n;
double now;
CAST_OBJ_NOTNULL(vwk, priv, VWK_MAGIC);
THR_SetName("cache-kqueue");
......@@ -141,42 +140,17 @@ vwk_thread(void *priv)
vwk->nki = 0;
while (1) {
dotimer = 0;
n = kevent(vwk->kq, vwk->ki, vwk->nki, ke, NKEV, NULL);
now = VTIM_real();
assert(n <= NKEV);
if (n == 0) {
/* This happens on OSX in m00011.vtc */
dotimer = 1;
(void)usleep(10000);
}
vwk->nki = 0;
now = VTIM_real();
for (kp = ke, j = 0; j < n; j++, kp++) {
if (kp->filter == EVFILT_TIMER) {
dotimer = 1;
} else {
assert(kp->filter == EVFILT_READ);
vwk_sess_ev(vwk, kp, now);
}
}
if (!dotimer)
continue;
/*
* Make sure we have no pending changes for the fd's
* we are about to close, in case the accept(2) in the
* other thread creates new fd's betwen our close and
* the kevent(2) at the top of this loop, the kernel
* would not know we meant "the old fd of this number".
*/
vwk_kq_flush(vwk);
deadline = now - *vwk->waiter->tmo;
for (;;) {
sp = VTAILQ_FIRST(&vwk->waiter->waithead);
if (sp == NULL)
break;
if (sp->deadline > deadline)
break;
Wait_Handle(vwk->waiter, sp, WAITER_TIMEOUT, now);
assert(kp->filter == EVFILT_READ);
vwk_sess_ev(vwk, kp, now);
}
}
NEEDLESS_RETURN(NULL);
......@@ -194,9 +168,6 @@ vwk_init(struct waiter *w)
INIT_OBJ(vwk, VWK_MAGIC);
vwk->waiter = w;
EV_SET(&vwk->ki[vwk->nki], 0, EVFILT_TIMER, EV_ADD, 0, 100, NULL);
vwk->nki++;
Wait_UsePipe(w);
AZ(pthread_create(&vwk->thread, NULL, vwk_thread, vwk));
......
......@@ -131,7 +131,7 @@ vwp_main(void *priv)
int v, v2;
struct vwp *vwp;
struct waited *sp, *sp2;
double now, deadline;
double now, idle;
int fd;
CAST_OBJ_NOTNULL(vwp, priv, VWP_MAGIC);
......@@ -141,11 +141,11 @@ vwp_main(void *priv)
assert(vwp->hpoll < vwp->npoll);
while (vwp->hpoll > 0 && vwp->pollfd[vwp->hpoll].fd == -1)
vwp->hpoll--;
v = poll(vwp->pollfd, vwp->hpoll + 1, 100);
v = poll(vwp->pollfd, vwp->hpoll + 1, -1);
assert(v >= 0);
v2 = v;
now = VTIM_real();
deadline = now - *vwp->waiter->tmo;
idle = now - *vwp->waiter->tmo;
VTAILQ_FOREACH_SAFE(sp, &vwp->waiter->waithead, list, sp2) {
if (v != 0 && v2 == 0)
break;
......@@ -160,7 +160,7 @@ vwp_main(void *priv)
vwp->pollfd[fd].revents = 0;
Wait_Handle(vwp->waiter, sp, WAITER_ACTION,
now);
} else if (sp->deadline <= deadline) {
} else if (sp->idle <= idle) {
Wait_Handle(vwp->waiter, sp, WAITER_TIMEOUT,
now);
}
......
......@@ -154,7 +154,7 @@ vws_thread(void *priv)
port_event_t ev[MAX_EVENTS];
u_int nevents;
int ei, ret;
double now, deadline;
double now, idle;
/*
* XXX Do we want to scale this up dynamically to increase
......@@ -189,7 +189,7 @@ vws_thread(void *priv)
vws_port_ev(vws, ev + ei, now);
/* check for timeouts */
deadline = now - *vws->waiter->tmo;
idle = now - *vws->waiter->tmo;
/*
* This loop assumes that the oldest sessions are always at the
......@@ -202,7 +202,7 @@ vws_thread(void *priv)
sp = VTAILQ_FIRST(&vws->waiter->waithead);
if (sp == NULL)
break;
if (sp->deadline > deadline) {
if (sp->idle > idle) {
break;
}
vws_del(vws, sp->fd);
......@@ -214,7 +214,7 @@ vws_thread(void *priv)
*/
if (sp) {
double tmo = (sp->deadline + *vws->waiter->tmo) - now;
double tmo = (sp->idle + *vws->waiter->tmo) - now;
if (tmo < min_t) {
timeout = &min_ts;
......
......@@ -59,6 +59,7 @@ typedef void waiter_handle_f(struct waited *, enum wait_event, double now);
int Wait_Enter(const struct waiter *, struct waited *);
struct waiter *Wait_New(waiter_handle_f *, volatile double *timeout);
const char *Wait_GetName(void);
void Wait_Init(void);
/* mgt_waiter.c */
int Wait_Argument(struct vsb *vsb, const char *arg);
......@@ -35,6 +35,8 @@ struct waiter {
unsigned magic;
#define WAITER_MAGIC 0x17c399db
const struct waiter_impl *impl;
VTAILQ_ENTRY(waiter) list;
waiter_handle_f * func;
int pipes[2];
......@@ -44,7 +46,6 @@ struct waiter {
VTAILQ_HEAD(,waited) waithead;
void *priv;
int pfd;
};
typedef void waiter_init_f(struct waiter *);
......
......@@ -54,4 +54,5 @@ LOCK(busyobj)
LOCK(mempool)
LOCK(vxid)
LOCK(pipestat)
LOCK(misc)
/*lint -restore */
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment