Commit cbdb4874 authored by Poul-Henning Kamp's avatar Poul-Henning Kamp

More shuffling deskchairs in waiters.

parent 777bbffc
......@@ -60,6 +60,8 @@ WAIT_Init(waiter_handle_f *func, volatile double *tmo)
AN(waiter->init);
w = waiter->init(func, tmo);
w->impl = waiter;
w->func = func;
w->tmo = tmo;
AN(w->impl->pass || w->pfd > 0);
return (w);
}
......@@ -83,3 +85,13 @@ WAIT_Enter(const struct waiter *w, struct waited *wp)
assert (written == sizeof wp);
return (0);
}
void
WAIT_handle(struct waiter *w, struct waited *wp, enum wait_event ev, double now)
{
CHECK_OBJ_NOTNULL(w, WAITER_MAGIC);
CHECK_OBJ_NOTNULL(wp, WAITED_MAGIC);
VTAILQ_REMOVE(&w->sesshead, wp, list);
w->func(wp, ev, now);
}
......@@ -64,7 +64,6 @@ struct vwe {
waiter_handle_f *func;
volatile double *tmo;
VTAILQ_HEAD(,waited) sesshead;
int pipes[2];
int timer_pipes[2];
};
......@@ -117,7 +116,7 @@ vwe_eev(struct vwe *vwe, const struct epoll_event *ep, double now)
while (i >= sizeof ss[0]) {
CHECK_OBJ_NOTNULL(ss[j], WAITED_MAGIC);
assert(ss[j]->fd >= 0);
VTAILQ_INSERT_TAIL(&vwe->sesshead, ss[j], list);
VTAILQ_INSERT_TAIL(&vwe->waiter->sesshead, ss[j], list);
vwe_cond_modadd(vwe, ss[j]->fd, ss[j]);
j++;
i -= sizeof ss[0];
......@@ -127,17 +126,13 @@ vwe_eev(struct vwe *vwe, const struct epoll_event *ep, double now)
} else {
CAST_OBJ_NOTNULL(sp, ep->data.ptr, WAITED_MAGIC);
if (ep->events & EPOLLIN || ep->events & EPOLLPRI) {
VTAILQ_REMOVE(&vwe->sesshead, sp, list);
vwe->func(sp, WAITER_ACTION, now);
WAIT_handle(vwe->waiter, sp, WAITER_ACTION, now);
} else if (ep->events & EPOLLERR) {
VTAILQ_REMOVE(&vwe->sesshead, sp, list);
vwe->func(sp, WAITER_REMCLOSE, now);
WAIT_handle(vwe->waiter, sp, WAITER_REMCLOSE, now);
} else if (ep->events & EPOLLHUP) {
VTAILQ_REMOVE(&vwe->sesshead, sp, list);
vwe->func(sp, WAITER_REMCLOSE, now);
WAIT_handle(vwe->waiter, sp, WAITER_REMCLOSE, now);
} else if (ep->events & EPOLLRDHUP) {
VTAILQ_REMOVE(&vwe->sesshead, sp, list);
vwe->func(sp, WAITER_REMCLOSE, now);
WAIT_handle(vwe->waiter, sp, WAITER_REMCLOSE, now);
}
}
}
......@@ -183,13 +178,12 @@ vwe_thread(void *priv)
/* check for timeouts */
deadline = now - *vwe->tmo;
for (;;) {
sp = VTAILQ_FIRST(&vwe->sesshead);
sp = VTAILQ_FIRST(&vwe->waiter->sesshead);
if (sp == NULL)
break;
if (sp->deadline > deadline)
break;
VTAILQ_REMOVE(&vwe->sesshead, sp, list);
vwe->func(sp, WAITER_TIMEOUT, now);
WAIT_handle(vwe->waiter, sp, WAITER_TIMEOUT, now);
}
}
return (NULL);
......@@ -229,7 +223,7 @@ vwe_init(waiter_handle_f *func, volatile double *tmo)
INIT_OBJ(vwe->waiter, WAITER_MAGIC);
VTAILQ_INIT(&vwe->sesshead);
VTAILQ_INIT(&vwe->waiter->sesshead);
AZ(pipe(vwe->pipes));
AZ(pipe(vwe->timer_pipes));
......
......@@ -62,7 +62,6 @@ struct vwk {
int kq;
struct kevent ki[NKEV];
unsigned nki;
VTAILQ_HEAD(,waited) sesshead;
};
/*--------------------------------------------------------------------*/
......@@ -107,7 +106,7 @@ vwk_pipe_ev(struct vwk *vwk, const struct kevent *kp)
while (i >= sizeof ss[0]) {
CHECK_OBJ_NOTNULL(ss[j], WAITED_MAGIC);
assert(ss[j]->fd >= 0);
VTAILQ_INSERT_TAIL(&vwk->sesshead, ss[j], list);
VTAILQ_INSERT_TAIL(&vwk->waiter->sesshead, ss[j], list);
vwk_kq_sess(vwk, ss[j], EV_ADD | EV_ONESHOT);
j++;
i -= sizeof ss[0];
......@@ -127,12 +126,10 @@ vwk_sess_ev(struct vwk *vwk, const struct kevent *kp, double now)
CAST_OBJ_NOTNULL(sp, kp->udata, WAITED_MAGIC);
if (kp->data > 0) {
VTAILQ_REMOVE(&vwk->sesshead, sp, list);
vwk->func(sp, WAITER_ACTION, now);
WAIT_handle(vwk->waiter, sp, WAITER_ACTION, now);
return;
} else if (kp->flags & EV_EOF) {
VTAILQ_REMOVE(&vwk->sesshead, sp, list);
vwk->func(sp, WAITER_REMCLOSE, now);
WAIT_handle(vwk->waiter, sp, WAITER_REMCLOSE, now);
return;
} else {
WRONG("unknown kqueue state");
......@@ -198,14 +195,12 @@ vwk_thread(void *priv)
vwk_kq_flush(vwk);
deadline = now - *vwk->tmo;
for (;;) {
sp = VTAILQ_FIRST(&vwk->sesshead);
sp = VTAILQ_FIRST(&vwk->waiter->sesshead);
if (sp == NULL)
break;
if (sp->deadline > deadline)
break;
VTAILQ_REMOVE(&vwk->sesshead, sp, list);
// XXX: not yet (void)VTCP_linger(sp->fd, 0);
vwk->func(sp, WAITER_TIMEOUT, now);
WAIT_handle(vwk->waiter, sp, WAITER_TIMEOUT, now);
}
}
NEEDLESS_RETURN(NULL);
......@@ -228,7 +223,7 @@ vwk_init(waiter_handle_f *func, volatile double *tmo)
vwk->func = func;
vwk->tmo = tmo;
VTAILQ_INIT(&vwk->sesshead);
VTAILQ_INIT(&vwk->waiter->sesshead);
AZ(pipe(vwk->pipes));
AZ(VFIL_nonblocking(vwk->pipes[0]));
......
......@@ -55,8 +55,6 @@ struct vwp {
struct pollfd *pollfd;
unsigned npoll;
unsigned hpoll;
VTAILQ_HEAD(,waited) sesshead;
};
/*--------------------------------------------------------------------*/
......@@ -150,7 +148,7 @@ vwp_main(void *priv)
now = VTIM_real();
deadline = now - *vwp->tmo;
v2 = v;
VTAILQ_FOREACH_SAFE(sp, &vwp->sesshead, list, sp2) {
VTAILQ_FOREACH_SAFE(sp, &vwp->waiter->sesshead, list, sp2) {
if (v != 0 && v2 == 0)
break;
CHECK_OBJ_NOTNULL(sp, WAITED_MAGIC);
......@@ -162,14 +160,13 @@ vwp_main(void *priv)
if (vwp->pollfd[fd].revents) {
v2--;
vwp->pollfd[fd].revents = 0;
VTAILQ_REMOVE(&vwp->sesshead, sp, list);
vwp_unpoll(vwp, fd);
vwp->func(sp, WAITER_ACTION, now);
WAIT_handle(vwp->waiter, sp, WAITER_ACTION,
now);
} else if (sp->deadline <= deadline) {
VTAILQ_REMOVE(&vwp->sesshead, sp, list);
vwp_unpoll(vwp, fd);
// XXX: not yet (void)VTCP_linger(sp->fd, 0);
vwp->func(sp, WAITER_TIMEOUT, now);
WAIT_handle(vwp->waiter, sp, WAITER_TIMEOUT,
now);
}
}
if (v2 && vwp->pollfd[vwp->pipes[0]].revents) {
......@@ -186,7 +183,7 @@ vwp_main(void *priv)
for (j = 0; j * sizeof ss[0] < i; j++) {
CHECK_OBJ_NOTNULL(ss[j], WAITED_MAGIC);
assert(ss[j]->fd >= 0);
VTAILQ_INSERT_TAIL(&vwp->sesshead, ss[j], list);
VTAILQ_INSERT_TAIL(&vwp->waiter->sesshead, ss[j], list);
vwp_poll(vwp, ss[j]->fd);
}
}
......@@ -207,7 +204,7 @@ vwp_poll_init(waiter_handle_f *func, volatile double *tmo)
AN(vwp);
INIT_OBJ(vwp->waiter, WAITER_MAGIC);
VTAILQ_INIT(&vwp->sesshead);
VTAILQ_INIT(&vwp->waiter->sesshead);
AZ(pipe(vwp->pipes));
vwp->func = func;
......
......@@ -57,7 +57,6 @@ struct vws {
volatile double *tmo;
pthread_t ports_thread;
int dport;
VTAILQ_HEAD(,waited) sesshead;
};
static inline void
......@@ -82,7 +81,7 @@ vws_port_ev(struct vws *vws, port_event_t *ev, double now) {
if(ev->portev_source == PORT_SOURCE_USER) {
CAST_OBJ_NOTNULL(sp, ev->portev_user, WAITED_MAGIC);
assert(sp->fd >= 0);
VTAILQ_INSERT_TAIL(&vws->sesshead, sp, list);
VTAILQ_INSERT_TAIL(&vws->waiter->sesshead, sp, list);
vws_add(vws, sp->fd, sp);
} else {
assert(ev->portev_source == PORT_SOURCE_FD);
......@@ -90,8 +89,7 @@ vws_port_ev(struct vws *vws, port_event_t *ev, double now) {
assert(sp->fd >= 0);
if(ev->portev_events & POLLERR) {
vws_del(vws, sp->fd);
VTAILQ_REMOVE(&vws->sesshead, sp, list);
vws->func(sp, WAITER_REMCLOSE, now);
WAIT_handle(vws->waiter, sp, WAITER_REMCLOSE, now);
return;
}
......@@ -109,10 +107,7 @@ vws_port_ev(struct vws *vws, port_event_t *ev, double now) {
* threadID=129476&tstart=0
*/
vws_del(vws, sp->fd);
VTAILQ_REMOVE(&vws->sesshead, sp, list);
/* also handle errors */
vws->func(sp, WAITER_ACTION, now);
WAIT_handle(vws->waiter, sp, WAITER_ACTION, now);
}
return;
}
......@@ -206,17 +201,14 @@ vws_thread(void *priv)
*/
for (;;) {
sp = VTAILQ_FIRST(&vws->sesshead);
sp = VTAILQ_FIRST(&vws->waiter->sesshead);
if (sp == NULL)
break;
if (sp->deadline > deadline) {
break;
}
VTAILQ_REMOVE(&vws->sesshead, sp, list);
if(sp->fd != -1) {
vws_del(vws, sp->fd);
}
vws->func(sp, WAITER_TIMEOUT, now);
vws_del(vws, sp->fd);
WAIT_handle(vws->waiter, sp, WAITER_TIMEOUT, now);
}
/*
......@@ -272,7 +264,7 @@ vws_init(waiter_handle_f *func, volatile double *tmo)
vws->func = func;
vws->tmo = tmo;
VTAILQ_INIT(&vws->sesshead);
VTAILQ_INIT(&vws->waiter->sesshead);
AZ(pthread_create(&vws->ports_thread, NULL, vws_thread, vws));
return (vws->waiter);
}
......
......@@ -35,6 +35,11 @@ struct waiter {
unsigned magic;
#define WAITER_MAGIC 0x17c399db
const struct waiter_impl *impl;
waiter_handle_f * func;
volatile double *tmo;
VTAILQ_HEAD(,waited) sesshead;
void *priv;
int pfd;
};
......@@ -48,6 +53,9 @@ struct waiter_impl {
waiter_pass_f *pass;
};
/* cache_waiter.c */
void WAIT_handle(struct waiter *, struct waited *, enum wait_event, double now);
/* mgt_waiter.c */
extern struct waiter_impl const * waiter;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment