Commit 19729051 authored by Poul-Henning Kamp's avatar Poul-Henning Kamp

Change epoll waiter to use common pipe code.

Various other cleanups while here.

There are a couple of central things I do not understand about how
this works, if anybody has epoll(2) clue, please contact me.
parent 6862bb63
......@@ -61,76 +61,44 @@ struct vwe {
pthread_t timer_thread;
int epfd;
int pipes[2];
int timer_pipes[2];
};
static void
vwe_modadd(struct vwe *vwe, int fd, void *data, short arm)
vwe_inject(const struct waiter *w, struct waited *wp)
{
struct vwe *vwe;
/* XXX: EPOLLET (edge triggered) can cause rather Bad Things to
* XXX: happen: If NEEV+1 threads get stuck in write(), all threads
* XXX: will hang. See #644.
*/
assert(fd >= 0);
assert(data == vwe->pipes || data == vwe->timer_pipes);
struct epoll_event ev = {
EPOLLIN | EPOLLPRI , { data }
};
AZ(epoll_ctl(vwe->epfd, arm, fd, &ev));
}
static void
vwe_cond_modadd(struct vwe *vwe, int fd, void *data)
{
struct waited *sp = (struct waited *)data;
CAST_OBJ_NOTNULL(vwe, w->priv, VWE_MAGIC);
CHECK_OBJ_NOTNULL(wp, WAITED_MAGIC);
assert(wp->fd >= 0);
assert(fd >= 0);
CHECK_OBJ_NOTNULL(sp, WAITED_MAGIC);
if (sp->ev.data.ptr)
AZ(epoll_ctl(vwe->epfd, EPOLL_CTL_MOD, fd, &sp->ev));
if (wp->ev.data.ptr)
AZ(epoll_ctl(vwe->epfd, EPOLL_CTL_MOD, wp->fd, &wp->ev));
else {
sp->ev.data.ptr = data;
sp->ev.events = EPOLLIN | EPOLLPRI | EPOLLONESHOT | EPOLLRDHUP;
AZ(epoll_ctl(vwe->epfd, EPOLL_CTL_ADD, fd, &sp->ev));
wp->ev.data.ptr = wp;
wp->ev.events = EPOLLIN | EPOLLPRI | EPOLLRDHUP;
if (wp != w->pipe_w)
wp->ev.events |= EPOLLONESHOT;
AZ(epoll_ctl(vwe->epfd, EPOLL_CTL_ADD, wp->fd, &wp->ev));
}
}
static void
vwe_eev(struct vwe *vwe, const struct epoll_event *ep, double now)
{
struct waited *ss[NEEV], *sp;
int i, j;
struct waited *sp;
AN(ep->data.ptr);
if (ep->data.ptr == vwe->pipes) {
if (ep->events & EPOLLIN || ep->events & EPOLLPRI) {
j = 0;
i = read(vwe->pipes[0], ss, sizeof ss);
if (i == -1 && errno == EAGAIN)
return;
while (i >= sizeof ss[0]) {
CHECK_OBJ_NOTNULL(ss[j], WAITED_MAGIC);
assert(ss[j]->fd >= 0);
VTAILQ_INSERT_TAIL(&vwe->waiter->sesshead, ss[j], list);
vwe_cond_modadd(vwe, ss[j]->fd, ss[j]);
j++;
i -= sizeof ss[0];
}
AZ(i);
}
} else {
CAST_OBJ_NOTNULL(sp, ep->data.ptr, WAITED_MAGIC);
if (ep->events & EPOLLIN || ep->events & EPOLLPRI) {
WAIT_handle(vwe->waiter, sp, WAITER_ACTION, now);
} else if (ep->events & EPOLLERR) {
WAIT_handle(vwe->waiter, sp, WAITER_REMCLOSE, now);
} else if (ep->events & EPOLLHUP) {
WAIT_handle(vwe->waiter, sp, WAITER_REMCLOSE, now);
} else if (ep->events & EPOLLRDHUP) {
WAIT_handle(vwe->waiter, sp, WAITER_REMCLOSE, now);
}
CAST_OBJ_NOTNULL(sp, ep->data.ptr, WAITED_MAGIC);
if (ep->events & EPOLLIN || ep->events & EPOLLPRI) {
WAIT_handle(vwe->waiter, sp, WAITER_ACTION, now);
} else if (ep->events & EPOLLERR) {
WAIT_handle(vwe->waiter, sp, WAITER_REMCLOSE, now);
} else if (ep->events & EPOLLHUP) {
WAIT_handle(vwe->waiter, sp, WAITER_REMCLOSE, now);
} else if (ep->events & EPOLLRDHUP) {
WAIT_handle(vwe->waiter, sp, WAITER_REMCLOSE, now);
}
}
......@@ -140,7 +108,7 @@ static void *
vwe_thread(void *priv)
{
struct epoll_event ev[NEEV], *ep;
struct waited *sp;
struct waited *sp, *sp2;
char junk;
double now, deadline;
int dotimer, i, n;
......@@ -150,12 +118,6 @@ vwe_thread(void *priv)
THR_SetName("cache-epoll");
vwe->epfd = epoll_create(1);
assert(vwe->epfd >= 0);
vwe_modadd(vwe, vwe->pipes[0], vwe->pipes, EPOLL_CTL_ADD);
vwe_modadd(vwe, vwe->timer_pipes[0], vwe->timer_pipes, EPOLL_CTL_ADD);
while (1) {
dotimer = 0;
n = epoll_wait(vwe->epfd, ev, NEEV, -1);
......@@ -174,13 +136,10 @@ vwe_thread(void *priv)
/* check for timeouts */
deadline = now - *vwe->waiter->tmo;
for (;;) {
sp = VTAILQ_FIRST(&vwe->waiter->sesshead);
if (sp == NULL)
break;
if (sp->deadline > deadline)
break;
WAIT_handle(vwe->waiter, sp, WAITER_TIMEOUT, now);
VTAILQ_FOREACH_SAFE(sp, &vwe->waiter->sesshead, list, sp2) {
if (sp->deadline < deadline)
WAIT_handle(vwe->waiter, sp,
WAITER_TIMEOUT, now);
}
}
return (NULL);
......@@ -211,24 +170,27 @@ static void __match_proto__(waiter_init_f)
vwe_init(struct waiter *w)
{
struct vwe *vwe;
struct epoll_event ev;
CHECK_OBJ_NOTNULL(w, WAITER_MAGIC);
vwe = w->priv;
INIT_OBJ(vwe, VWE_MAGIC);
vwe->waiter = w;
AZ(pipe(vwe->pipes));
AZ(pipe(vwe->timer_pipes));
vwe->epfd = epoll_create(1);
assert(vwe->epfd >= 0);
AZ(VFIL_nonblocking(vwe->pipes[0]));
AZ(VFIL_nonblocking(vwe->pipes[1]));
AZ(VFIL_nonblocking(vwe->timer_pipes[0]));
WAIT_UsePipe(w);
w->pfd = vwe->pipes[1];
AZ(pipe(vwe->timer_pipes));
AZ(VFIL_nonblocking(vwe->timer_pipes[0]));
ev.data.ptr = vwe->timer_pipes;
ev.events = EPOLLIN | EPOLLPRI;
AZ(epoll_ctl(vwe->epfd, EPOLL_CTL_ADD, vwe->timer_pipes[0], &ev));
AZ(pthread_create(&vwe->epoll_thread, NULL, vwe_thread, vwe));
AZ(pthread_create(&vwe->timer_thread,
NULL, vwe_timeout_idle_ticker, vwe));
AZ(pthread_create(&vwe->epoll_thread, NULL, vwe_thread, vwe));
}
/*--------------------------------------------------------------------*/
......@@ -236,6 +198,7 @@ vwe_init(struct waiter *w)
const struct waiter_impl waiter_epoll = {
.name = "epoll",
.init = vwe_init,
.inject = vwe_inject,
.size = sizeof(struct vwe),
};
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment