Commit cbc829fe authored by Poul-Henning Kamp's avatar Poul-Henning Kamp

Scrape back the waiter API, and rewrite the poll waiter to handle

sparse fd-sets efficiently.
parent aeee5b5a
......@@ -374,7 +374,6 @@ struct waited {
unsigned magic;
#define WAITED_MAGIC 0x1743992d
int fd;
VTAILQ_ENTRY(waited) list;
void *ptr;
double idle;
};
......
......@@ -37,7 +37,6 @@
#include "vcli_priv.h"
#include "vrnd.h"
#include "waiter/waiter.h"
#include "hash/hash_slinger.h"
......@@ -213,8 +212,6 @@ child_main(void)
PAN_Init();
VFP_Init();
Waiter_Init();
VCL_Init();
HTTP_Init();
......
......@@ -36,174 +36,17 @@
#include "cache/cache.h"
#include "vfil.h"
#include "vtim.h"
#include "waiter/waiter.h"
#include "waiter/waiter_priv.h"
#define NEV 8192
static VTAILQ_HEAD(, waiter) waiters = VTAILQ_HEAD_INITIALIZER(waiters);
static int nwaiters;
static struct lock wait_mtx;
static pthread_t wait_thr;
static void *
wait_poker_thread(void *arg)
{
struct waiter *w;
double now;
(void)arg;
THR_SetName("Waiter timer");
while (1) {
/* Avoid thundering herds and resonances */
(void)usleep(990013/nwaiters);
now = VTIM_real();
Lck_Lock(&wait_mtx);
w = VTAILQ_FIRST(&waiters);
VTAILQ_REMOVE(&waiters, w, list);
VTAILQ_INSERT_TAIL(&waiters, w, list);
assert(w->pipes[1] >= 0);
if (w->next_idle + *w->tmo < now)
(void)write(w->pipes[1], &w->pipe_w, sizeof w->pipe_w);
Lck_Unlock(&wait_mtx);
}
NEEDLESS_RETURN(NULL);
}
void
Wait_UsePipe(struct waiter *w)
{
CHECK_OBJ_NOTNULL(w, WAITER_MAGIC);
AN(waiter->inject);
AZ(pipe(w->pipes));
AZ(VFIL_nonblocking(w->pipes[0]));
AZ(VFIL_nonblocking(w->pipes[1]));
ALLOC_OBJ(w->pipe_w, WAITED_MAGIC);
w->pipe_w->fd = w->pipes[0];
w->pipe_w->idle = 0;
VTAILQ_INSERT_HEAD(&w->waithead, w->pipe_w, list);
waiter->inject(w, w->pipe_w);
}
int
Wait_Enter(const struct waiter *w, struct waited *wp)
{
ssize_t written;
uintptr_t up;
CHECK_OBJ_NOTNULL(w, WAITER_MAGIC);
CHECK_OBJ_NOTNULL(wp, WAITED_MAGIC);
assert(wp->fd > 0); // stdin never comes here
AZ(w->dismantle);
if (w->impl->pass != NULL)
return (w->impl->pass(w->priv, wp));
assert(w->pipes[1] > 0);
up = (uintptr_t)wp;
written = write(w->pipes[1], &up, sizeof up);
if (written != sizeof up && (errno == EAGAIN || errno == EWOULDBLOCK))
return (-1);
assert (written == sizeof up);
return (0);
}
static void
wait_updidle(struct waiter *w, double now)
{
struct waited *wp;
wp = VTAILQ_FIRST(&w->waithead);
if (wp == NULL)
return;
if (wp == w->pipe_w) {
VTAILQ_REMOVE(&w->waithead, wp, list);
VTAILQ_INSERT_TAIL(&w->waithead, wp, list);
wp->idle = now;
wp = VTAILQ_FIRST(&w->waithead);
}
w->next_idle = wp->idle;
}
void
Wait_Handle(struct waiter *w, struct waited *wp, enum wait_event ev, double now)
{
uintptr_t ss[NEV];
struct waited *wp2;
int i, j, dotimer = 0;
CHECK_OBJ_NOTNULL(w, WAITER_MAGIC);
CHECK_OBJ_ORNULL(wp, WAITED_MAGIC);
if (wp != NULL) {
if (wp == w->pipe_w) {
w->do_pipe = 1;
VTAILQ_REMOVE(&w->waithead, w->pipe_w, list);
wp->idle = now;
VTAILQ_INSERT_TAIL(&w->waithead, w->pipe_w, list);
} else {
if (w->impl->evict != NULL)
w->impl->evict(w, wp);
VTAILQ_REMOVE(&w->waithead, wp, list);
w->func(wp, ev, now);
wait_updidle(w, now);
}
return;
}
AZ(wp);
if (!w->do_pipe)
return;
w->do_pipe = 0;
i = read(w->pipes[0], ss, sizeof ss);
if (i == -1 && errno == EAGAIN)
return;
for (j = 0; i >= sizeof ss[0]; j++, i -= sizeof ss[0]) {
if (ss[j] == 0) {
AN(w->dismantle);
continue;
}
ss[j] &= ~1;
CAST_OBJ_NOTNULL(wp2, (void*)ss[j], WAITED_MAGIC);
if (wp2 == w->pipe_w) {
dotimer = 1;
} else {
assert(wp2->fd >= 0);
VTAILQ_INSERT_TAIL(&w->waithead, wp2, list);
w->impl->inject(w, wp2);
}
}
AZ(i);
wait_updidle(w, now);
if (!dotimer)
return;
VTAILQ_FOREACH_SAFE(wp, &w->waithead, list, wp2) {
if (wp == w->pipe_w)
continue;
if (wp->idle + *w->tmo > now)
break;
if (w->impl->evict != NULL)
w->impl->evict(w, wp);
VTAILQ_REMOVE(&w->waithead, wp, list);
w->func(wp, WAITER_TIMEOUT, now);
}
wait_updidle(w, now);
assert(wp->fd > 0); // stdin never comes here
return (w->impl->enter(w->priv, wp));
}
/**********************************************************************/
......@@ -215,7 +58,7 @@ Waiter_GetName(void)
if (waiter != NULL)
return (waiter->name);
else
return ("no_waiter");
return ("(No Waiter?)");
}
struct waiter *
......@@ -226,6 +69,8 @@ Waiter_New(waiter_handle_f *func, volatile double *tmo)
AN(waiter);
AN(waiter->name);
AN(waiter->init);
AN(waiter->enter);
AN(waiter->fini);
w = calloc(1, sizeof (struct waiter) + waiter->size);
AN(w);
......@@ -234,20 +79,10 @@ Waiter_New(waiter_handle_f *func, volatile double *tmo)
w->impl = waiter;
w->func = func;
w->tmo = tmo;
w->pipes[0] = w->pipes[1] = -1;
VTAILQ_INIT(&w->waithead);
waiter->init(w);
AN(w->impl->pass || w->pipes[1] >= 0);
Lck_Lock(&wait_mtx);
VTAILQ_INSERT_TAIL(&waiters, w, list);
nwaiters++;
/* We assume all waiters either use pipes or don't use pipes */
if (w->pipes[1] >= 0 && nwaiters == 1)
AZ(pthread_create(&wait_thr, NULL, wait_poker_thread, NULL));
Lck_Unlock(&wait_mtx);
return (w);
}
......@@ -255,47 +90,13 @@ void
Waiter_Destroy(struct waiter **wp)
{
struct waiter *w;
struct waited *wx = NULL;
int written;
double now;
AN(wp);
w = *wp;
*wp = NULL;
CHECK_OBJ_NOTNULL(w, WAITER_MAGIC);
Lck_Lock(&wait_mtx);
VTAILQ_REMOVE(&waiters, w, list);
w->dismantle = 1;
Lck_Unlock(&wait_mtx);
if (w->pipes[1] >= 0) {
while (1) {
written = write(w->pipes[1], &wx, sizeof wx);
if (written == sizeof wx)
break;
(void)usleep(10000);
}
}
AN(w->impl->fini);
w->impl->fini(w);
now = VTIM_real();
while (1) {
wx = VTAILQ_FIRST(&w->waithead);
if (wx == NULL)
break;
VTAILQ_REMOVE(&w->waithead, wx, list);
if (wx == w->pipe_w)
FREE_OBJ(wx);
else
w->func(wx, WAITER_CLOSE, now);
}
FREE_OBJ(w);
}
void
Waiter_Init(void)
{
Lck_New(&wait_mtx, lck_misc);
}
......@@ -45,80 +45,88 @@ struct vwp {
#define VWP_MAGIC 0x4b2cc735
struct waiter *waiter;
int pipes[2];
pthread_t thread;
struct pollfd *pollfd;
unsigned npoll;
unsigned hpoll;
struct waited **idx;
size_t npoll;
size_t hpoll;
};
/*--------------------------------------------------------------------*/
/*--------------------------------------------------------------------
* It would make much more sense to not use two large vectors, but
* the poll(2) API forces us to use at least one, so ... KISS.
*/
static void
vwp_pollspace(struct vwp *vwp, unsigned fd)
vwp_extend_pollspace(struct vwp *vwp)
{
struct pollfd *newpollfd = vwp->pollfd;
unsigned newnpoll;
if (fd < vwp->npoll)
return;
newnpoll = vwp->npoll;
if (newnpoll == 0)
newnpoll = 1;
while (fd >= newnpoll)
newnpoll = newnpoll * 2;
VSL(SLT_Debug, 0, "Acceptor poll space increased to %u", newnpoll);
newpollfd = realloc(newpollfd, newnpoll * sizeof *newpollfd);
XXXAN(newpollfd);
memset(newpollfd + vwp->npoll, 0,
(newnpoll - vwp->npoll) * sizeof *newpollfd);
vwp->pollfd = newpollfd;
while (vwp->npoll < newnpoll)
size_t inc = (1<<16);
VSL(SLT_Debug, 0, "Acceptor poll space increased by %zu to %zu",
inc, vwp->npoll + inc);
vwp->pollfd = realloc(vwp->pollfd,
(vwp->npoll + inc) * sizeof(*vwp->pollfd));
AN(vwp->pollfd);
memset(vwp->pollfd + vwp->npoll, 0, inc * sizeof(*vwp->pollfd));
vwp->idx = realloc(vwp->idx, (vwp->npoll + inc) * sizeof(*vwp->idx));
AN(vwp->idx);
memset(vwp->idx + vwp->npoll, 0, inc * sizeof(*vwp->idx));
for (; inc > 0; inc--)
vwp->pollfd[vwp->npoll++].fd = -1;
assert(fd < vwp->npoll);
}
/*--------------------------------------------------------------------*/
static void __match_proto__(waiter_inject_f)
vwp_inject(const struct waiter *w, struct waited *wp)
static void
vwp_add(struct vwp *vwp, struct waited *w)
{
struct vwp *vwp;
int fd;
CAST_OBJ_NOTNULL(vwp, w->priv, VWP_MAGIC);
CHECK_OBJ_NOTNULL(wp, WAITED_MAGIC);
fd = wp->fd;
VSL(SLT_Debug, 0, "POLL Inject %d", fd);
assert(fd >= 0);
vwp_pollspace(vwp, (unsigned)fd);
assert(fd < vwp->npoll);
if (vwp->hpoll < fd)
vwp->hpoll = fd;
assert(vwp->pollfd[fd].fd == -1);
AZ(vwp->pollfd[fd].events);
AZ(vwp->pollfd[fd].revents);
vwp->pollfd[fd].fd = fd;
vwp->pollfd[fd].events = POLLIN;
if (vwp->hpoll == vwp->npoll)
vwp_extend_pollspace(vwp);
assert(vwp->hpoll < vwp->npoll);
assert(vwp->pollfd[vwp->hpoll].fd == -1);
AZ(vwp->idx[vwp->hpoll]);
vwp->pollfd[vwp->hpoll].fd = w->fd;
vwp->pollfd[vwp->hpoll].events = POLLIN;
vwp->idx[vwp->hpoll] = w;
vwp->hpoll++;
}
static void __match_proto__(waiter_evict_f)
vwp_evict(const struct waiter *w, struct waited *wp)
static void
vwp_del(struct vwp *vwp, int n)
{
struct vwp *vwp;
int fd;
vwp->hpoll--;
if (n != vwp->hpoll) {
vwp->pollfd[n] = vwp->pollfd[vwp->hpoll];
vwp->idx[n] = vwp->idx[vwp->hpoll];
}
memset(&vwp->pollfd[vwp->hpoll], 0, sizeof(*vwp->pollfd));
vwp->pollfd[vwp->hpoll].fd = -1;
vwp->idx[vwp->hpoll] = NULL;
}
CAST_OBJ_NOTNULL(vwp, w->priv, VWP_MAGIC);
CHECK_OBJ_NOTNULL(wp, WAITED_MAGIC);
fd = wp->fd;
VSL(SLT_Debug, 0, "POLL Evict %d", fd);
assert(fd >= 0);
assert(fd < vwp->npoll);
vwp_pollspace(vwp, (unsigned)fd);
vwp->pollfd[fd].fd = -1;
vwp->pollfd[fd].events = 0;
/*--------------------------------------------------------------------*/
static void
vwp_dopipe(struct vwp *vwp)
{
struct waited *w[128];
ssize_t ss;
int i;
ss = read(vwp->pipes[0], w, sizeof w);
assert(ss > 0);
i = 0;
while (ss) {
CHECK_OBJ_NOTNULL(w[i], WAITED_MAGIC);
assert(w[i]->fd > 0); // no stdin
vwp_add(vwp, w[i++]);
}
}
/*--------------------------------------------------------------------*/
......@@ -128,50 +136,66 @@ vwp_main(void *priv)
{
int v, v2;
struct vwp *vwp;
struct waited *sp, *sp2;
struct waited *wp;
double now, idle;
int fd;
int i;
CAST_OBJ_NOTNULL(vwp, priv, VWP_MAGIC);
THR_SetName("cache-poll");
while (!vwp->waiter->dismantle) {
while (1) {
// Try to keep the high point as low as possible
assert(vwp->hpoll < vwp->npoll);
while (vwp->hpoll > 0 && vwp->pollfd[vwp->hpoll].fd == -1)
vwp->hpoll--;
// XXX: sleep on ->tmo
v = poll(vwp->pollfd, vwp->hpoll + 1, -1);
assert(v >= 0);
v2 = v;
now = VTIM_real();
idle = now - *vwp->waiter->tmo;
VTAILQ_FOREACH_SAFE(sp, &vwp->waiter->waithead, list, sp2) {
if (v != 0 && v2 == 0)
break;
CHECK_OBJ_NOTNULL(sp, WAITED_MAGIC);
fd = sp->fd;
VSL(SLT_Debug, 0,
"POLL Handle %d %x", fd, vwp->pollfd[fd].revents);
assert(fd >= 0);
assert(fd <= vwp->hpoll);
assert(fd < vwp->npoll);
assert(vwp->pollfd[fd].fd == fd);
if (vwp->pollfd[fd].revents) {
i = 1;
while (v2 > 0 && i < vwp->hpoll) {
wp = vwp->idx[i];
CHECK_OBJ_NOTNULL(wp, WAITED_MAGIC);
if (vwp->pollfd[i].revents != 0) {
v2--;
vwp->pollfd[fd].revents = 0;
Wait_Handle(vwp->waiter, sp, WAITER_ACTION,
now);
} else if (sp->idle <= idle) {
Wait_Handle(vwp->waiter, sp, WAITER_TIMEOUT,
now);
assert(wp->fd > 0);
assert(wp->fd == vwp->pollfd[i].fd);
VSL(SLT_Debug, wp->fd, "POLL Handle %d %x",
wp->fd, vwp->pollfd[i].revents);
vwp_del(vwp, i);
vwp->waiter->func(wp, WAITER_ACTION, now);
} else if (wp->idle <= idle) {
vwp_del(vwp, i);
vwp->waiter->func(wp, WAITER_TIMEOUT, now);
} else {
i++;
}
}
Wait_Handle(vwp->waiter, NULL, WAITER_ACTION, now);
if (vwp->pollfd[0].revents)
vwp_dopipe(vwp);
}
NEEDLESS_RETURN(NULL);
}
/*--------------------------------------------------------------------*/
static int __match_proto__(waiter_enter_f)
vwp_enter(void *priv, struct waited *wp)
{
struct vwp *vwp;
CAST_OBJ_NOTNULL(vwp, priv, VWP_MAGIC);
if (write(vwp->pipes[1], &wp, sizeof wp) != sizeof wp)
return (-1);
return (0);
}
/*--------------------------------------------------------------------*/
static void __match_proto__(waiter_init_f)
vwp_init(struct waiter *w)
{
......@@ -181,9 +205,13 @@ vwp_init(struct waiter *w)
vwp = w->priv;
INIT_OBJ(vwp, VWP_MAGIC);
vwp->waiter = w;
AZ(pipe(vwp->pipes));
// XXX: set write pipe non-blocking
vwp_pollspace(vwp, 256);
Wait_UsePipe(w);
vwp_extend_pollspace(vwp);
vwp->pollfd[0].fd = vwp->pipes[0];
vwp->pollfd[0].events = POLLIN;
vwp->hpoll = 1;
AZ(pthread_create(&vwp->thread, NULL, vwp_main, vwp));
}
......@@ -196,6 +224,9 @@ vwp_fini(struct waiter *w)
void *vp;
CAST_OBJ_NOTNULL(vwp, w->priv, VWP_MAGIC);
vp = NULL;
// XXX: set write pipe blocking
assert(write(vwp->pipes[1], &vp, sizeof vp) == sizeof vp);
AZ(pthread_join(vwp->thread, &vp));
free(vwp->pollfd);
}
......@@ -206,7 +237,6 @@ const struct waiter_impl waiter_poll = {
.name = "poll",
.init = vwp_init,
.fini = vwp_fini,
.inject = vwp_inject,
.evict = vwp_evict,
.enter = vwp_enter,
.size = sizeof(struct vwp),
};
......@@ -61,7 +61,6 @@ int Wait_Enter(const struct waiter *, struct waited *);
struct waiter *Waiter_New(waiter_handle_f *, volatile double *timeout);
void Waiter_Destroy(struct waiter **);
const char *Waiter_GetName(void);
void Waiter_Init(void);
/* mgt_waiter.c */
int Wait_Argument(struct vsb *vsb, const char *arg);
......@@ -33,27 +33,25 @@ struct waited;
struct waiter {
unsigned magic;
#define WAITER_MAGIC 0x17c399db
#define WAITER_MAGIC 0x17c399db
const struct waiter_impl *impl;
VTAILQ_ENTRY(waiter) list;
VTAILQ_HEAD(,waited) waithead;
int dismantle;
waiter_handle_f * func;
int pipes[2];
struct waited *pipe_w;
double next_idle;
int do_pipe;
volatile double *tmo;
VTAILQ_HEAD(,waited) waithead;
void *priv;
};
typedef void waiter_init_f(struct waiter *);
typedef void waiter_fini_f(struct waiter *);
typedef int waiter_pass_f(void *priv, struct waited *);
typedef int waiter_enter_f(void *priv, struct waited *);
typedef void waiter_inject_f(const struct waiter *, struct waited *);
typedef void waiter_evict_f(const struct waiter *, struct waited *);
......@@ -61,16 +59,11 @@ struct waiter_impl {
const char *name;
waiter_init_f *init;
waiter_fini_f *fini;
waiter_pass_f *pass;
waiter_enter_f *enter;
waiter_inject_f *inject;
waiter_evict_f *evict;
size_t size;
};
/* cache_waiter.c */
void Wait_Handle(struct waiter *, struct waited *, enum wait_event, double now);
void Wait_UsePipe(struct waiter *w);
/* mgt_waiter.c */
extern struct waiter_impl const * waiter;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment