Commit 65fbdd20 authored by Poul-Henning Kamp's avatar Poul-Henning Kamp

More rearranging the deskchairs in waiters

parent 3dc7838f
......@@ -62,10 +62,17 @@ WAIT_Init(waiter_handle_f *func, volatile double *tmo)
AN(waiter);
AN(waiter->name);
AN(waiter->init);
w = waiter->init(func, tmo);
w = calloc(1, sizeof (struct waiter) + waiter->size);
AN(w);
INIT_OBJ(w, WAITER_MAGIC);
w->priv = (void*)(w + 1);
w->impl = waiter;
w->func = func;
w->tmo = tmo;
VTAILQ_INIT(&w->sesshead);
waiter->init(w);
AN(w->impl->pass || w->pfd > 0);
return (w);
}
......
......@@ -55,15 +55,12 @@
struct vwe {
unsigned magic;
#define VWE_MAGIC 0x6bd73424
struct waiter waiter[1];
struct waiter *waiter;
pthread_t epoll_thread;
pthread_t timer_thread;
int epfd;
waiter_handle_f *func;
volatile double *tmo;
int pipes[2];
int timer_pipes[2];
};
......@@ -176,7 +173,7 @@ vwe_thread(void *priv)
continue;
/* check for timeouts */
deadline = now - *vwe->tmo;
deadline = now - *vwe->waiter->tmo;
for (;;) {
sp = VTAILQ_FIRST(&vwe->waiter->sesshead);
if (sp == NULL)
......@@ -210,20 +207,16 @@ vwe_timeout_idle_ticker(void *priv)
/*--------------------------------------------------------------------*/
static struct waiter * __match_proto__(waiter_init_f)
vwe_init(waiter_handle_f *func, volatile double *tmo)
static void __match_proto__(waiter_init_f)
vwe_init(struct waiter *w)
{
struct vwe *vwe;
AN(func);
AN(tmo);
ALLOC_OBJ(vwe, VWE_MAGIC);
AN(vwe);
INIT_OBJ(vwe->waiter, WAITER_MAGIC);
CHECK_OBJ_NOTNULL(w, WAITER_MAGIC);
vwe = w->priv;
INIT_OBJ(vwe, VWE_MAGIC);
vwe->waiter = w;
VTAILQ_INIT(&vwe->waiter->sesshead);
AZ(pipe(vwe->pipes));
AZ(pipe(vwe->timer_pipes));
......@@ -231,14 +224,11 @@ vwe_init(waiter_handle_f *func, volatile double *tmo)
AZ(VFIL_nonblocking(vwe->pipes[1]));
AZ(VFIL_nonblocking(vwe->timer_pipes[0]));
vwe->func = func;
vwe->tmo = tmo;
vwe->waiter->pfd = vwe->pipes[1];
w->pfd = vwe->pipes[1];
AZ(pthread_create(&vwe->timer_thread,
NULL, vwe_timeout_idle_ticker, vwe));
AZ(pthread_create(&vwe->epoll_thread, NULL, vwe_thread, vwe));
return(vwe->waiter);
}
/*--------------------------------------------------------------------*/
......@@ -246,6 +236,7 @@ vwe_init(waiter_handle_f *func, volatile double *tmo)
const struct waiter_impl waiter_epoll = {
.name = "epoll",
.init = vwe_init,
.size = sizeof(struct vwe),
};
#endif /* defined(HAVE_EPOLL_CTL) */
......@@ -52,10 +52,8 @@
struct vwk {
unsigned magic;
#define VWK_MAGIC 0x1cc2acc2
struct waiter waiter[1];
struct waiter *waiter;
waiter_handle_f *func;
volatile double *tmo;
pthread_t thread;
int kq;
struct kevent ki[NKEV];
......@@ -104,7 +102,7 @@ vwk_inject(const struct waiter *w, struct waited *wp)
/*--------------------------------------------------------------------*/
static void
vwk_sess_ev(struct vwk *vwk, const struct kevent *kp, double now)
vwk_sess_ev(const struct vwk *vwk, const struct kevent *kp, double now)
{
struct waited *sp;
......@@ -171,7 +169,7 @@ vwk_thread(void *priv)
* would not know we meant "the old fd of this number".
*/
vwk_kq_flush(vwk);
deadline = now - *vwk->tmo;
deadline = now - *vwk->waiter->tmo;
for (;;) {
sp = VTAILQ_FIRST(&vwk->waiter->sesshead);
if (sp == NULL)
......@@ -186,30 +184,22 @@ vwk_thread(void *priv)
/*--------------------------------------------------------------------*/
static struct waiter * __match_proto__(waiter_init_f)
vwk_init(waiter_handle_f *func, volatile double *tmo)
static void __match_proto__(waiter_init_f)
vwk_init(struct waiter *w)
{
struct vwk *vwk;
AN(func);
AN(tmo);
ALLOC_OBJ(vwk, VWK_MAGIC);
AN(vwk);
CHECK_OBJ_NOTNULL(w, WAITER_MAGIC);
vwk = w->priv;
INIT_OBJ(vwk, VWK_MAGIC);
vwk->waiter = w;
INIT_OBJ(vwk->waiter, WAITER_MAGIC);
VTAILQ_INIT(&vwk->waiter->sesshead);
vwk->waiter->priv = vwk;
EV_SET(&vwk->ki[vwk->nki], 0, EVFILT_TIMER, EV_ADD, 0, 100, NULL);
vwk->nki++;
WAIT_UsePipe(vwk->waiter);
vwk->func = func;
vwk->tmo = tmo;
AZ(pthread_create(&vwk->thread, NULL, vwk_thread, vwk));
return (vwk->waiter);
}
/*--------------------------------------------------------------------*/
......@@ -218,6 +208,7 @@ const struct waiter_impl waiter_kqueue = {
.name = "kqueue",
.init = vwk_init,
.inject = vwk_inject,
.size = sizeof(struct vwk),
};
#endif /* defined(HAVE_KQUEUE) */
......@@ -46,10 +46,8 @@
struct vwp {
unsigned magic;
#define VWP_MAGIC 0x4b2cc735
struct waiter waiter[1];
struct waiter *waiter;
waiter_handle_f *func;
volatile double *tmo;
int pipes[2];
pthread_t poll_thread;
struct pollfd *pollfd;
......@@ -146,7 +144,7 @@ vwp_main(void *priv)
v = poll(vwp->pollfd, vwp->hpoll + 1, 100);
assert(v >= 0);
now = VTIM_real();
deadline = now - *vwp->tmo;
deadline = now - *vwp->waiter->tmo;
v2 = v;
VTAILQ_FOREACH_SAFE(sp, &vwp->waiter->sesshead, list, sp2) {
if (v != 0 && v2 == 0)
......@@ -194,28 +192,23 @@ vwp_main(void *priv)
/*--------------------------------------------------------------------*/
static struct waiter * __match_proto__(waiter_init_f)
vwp_poll_init(waiter_handle_f *func, volatile double *tmo)
static void __match_proto__(waiter_init_f)
vwp_poll_init(struct waiter *w)
{
struct vwp *vwp;
AN(func);
ALLOC_OBJ(vwp, VWP_MAGIC);
AN(vwp);
INIT_OBJ(vwp->waiter, WAITER_MAGIC);
CHECK_OBJ_NOTNULL(w, WAITER_MAGIC);
vwp = w->priv;
INIT_OBJ(vwp, VWP_MAGIC);
vwp->waiter = w;
VTAILQ_INIT(&vwp->waiter->sesshead);
AZ(pipe(vwp->pipes));
vwp->func = func;
vwp->tmo = tmo;
AZ(VFIL_nonblocking(vwp->pipes[1]));
vwp->waiter->pfd = vwp->pipes[1];
vwp_pollspace(vwp, 256);
AZ(pthread_create(&vwp->poll_thread, NULL, vwp_main, vwp));
return (vwp->waiter);
}
/*--------------------------------------------------------------------*/
......@@ -223,4 +216,5 @@ vwp_poll_init(waiter_handle_f *func, volatile double *tmo)
const struct waiter_impl waiter_poll = {
.name = "poll",
.init = vwp_poll_init,
.size = sizeof(struct vwp),
};
......@@ -51,10 +51,8 @@
struct vws {
unsigned magic;
#define VWS_MAGIC 0x0b771473
struct waiter waiter[1];
struct waiter *waiter;
waiter_handle_f *func;
volatile double *tmo;
pthread_t ports_thread;
int dport;
};
......@@ -191,7 +189,7 @@ vws_thread(void *priv)
vws_port_ev(vws, ev + ei, now);
/* check for timeouts */
deadline = now - *vws->tmo;
deadline = now - *vws->waiter->tmo;
/*
* This loop assumes that the oldest sessions are always at the
......@@ -251,22 +249,17 @@ vws_pass(void *priv, struct waited *sp)
/*--------------------------------------------------------------------*/
static struct waiter * __match_proto__(waiter_init_f)
vws_init(waiter_handle_f *func, volatile double *tmo)
static void __match_proto__(waiter_init_f)
vws_init(struct waiter *w)
{
struct vws *vws;
AN(func);
AN(tmo);
ALLOC_OBJ(vws, VWS_MAGIC);
AN(vws);
INIT_OBJ(vws->waiter, WAITER_MAGIC);
CHECK_OBJ_NOTNULL(w, WAITER_MAGIC);
vws = w->priv;
INIT_OBJ(vws, VWS_MAGIC);
vws->waiter = w;
vws->func = func;
vws->tmo = tmo;
VTAILQ_INIT(&vws->waiter->sesshead);
AZ(pthread_create(&vws->ports_thread, NULL, vws_thread, vws));
return (vws->waiter);
}
/*--------------------------------------------------------------------*/
......@@ -274,7 +267,8 @@ vws_init(waiter_handle_f *func, volatile double *tmo)
const struct waiter_impl waiter_ports = {
.name = "ports",
.init = vws_init,
.pass = vws_pass
.pass = vws_pass,
.size = sizeof(struct vws),
};
#endif /* defined(HAVE_PORT_CREATE) */
......@@ -47,7 +47,7 @@ struct waiter {
int pfd;
};
typedef struct waiter* waiter_init_f(waiter_handle_f *, volatile double *);
typedef void waiter_init_f(struct waiter *);
typedef int waiter_pass_f(void *priv, struct waited *);
typedef void waiter_inject_f(const struct waiter *, struct waited *);
......@@ -56,6 +56,7 @@ struct waiter_impl {
waiter_init_f *init;
waiter_pass_f *pass;
waiter_inject_f *inject;
size_t size;
};
/* cache_waiter.c */
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment