Commit 27bab722 authored by Poul-Henning Kamp's avatar Poul-Henning Kamp

Update and reenable epoll(2) waiter to new reality.

parent d74fd185
......@@ -31,7 +31,6 @@
* write the session pointer to a pipe which the event engine monitors.
*/
#if 0
#include "config.h"
#if defined(HAVE_EPOLL_CTL)
......@@ -56,27 +55,14 @@
struct vwe {
unsigned magic;
#define VWE_MAGIC 0x6bd73424
int epfd;
struct waiter *waiter;
pthread_t thread;
int epfd;
VTAILQ_HEAD(,waited) list;
struct lock mtx;
int die;
};
static void
vwe_inject(const struct waiter *w, struct waited *wp)
{
struct vwe *vwe;
struct epoll_event ev;
CAST_OBJ_NOTNULL(vwe, w->priv, VWE_MAGIC);
CHECK_OBJ_NOTNULL(wp, WAITED_MAGIC);
assert(wp->fd >= 0);
ev.data.ptr = wp;
ev.events = EPOLLIN | EPOLLRDHUP;
AZ(epoll_ctl(vwe->epfd, EPOLL_CTL_ADD, wp->fd, &ev));
}
static void
vwe_eev(struct vwe *vwe, const struct epoll_event *ep, double now)
{
......@@ -84,16 +70,14 @@ vwe_eev(struct vwe *vwe, const struct epoll_event *ep, double now)
AN(ep->data.ptr);
CAST_OBJ_NOTNULL(wp, ep->data.ptr, WAITED_MAGIC);
if (wp != vwe->waiter->pipe_w)
AZ(epoll_ctl(vwe->epfd, EPOLL_CTL_DEL, wp->fd, NULL));
if (ep->events & EPOLLIN) {
Wait_Handle(vwe->waiter, wp, WAITER_ACTION, now);
vwe->waiter->func(wp, WAITER_ACTION, now);
} else if (ep->events & EPOLLERR) {
Wait_Handle(vwe->waiter, wp, WAITER_REMCLOSE, now);
vwe->waiter->func(wp, WAITER_REMCLOSE, now);
} else if (ep->events & EPOLLHUP) {
Wait_Handle(vwe->waiter, wp, WAITER_REMCLOSE, now);
vwe->waiter->func(wp, WAITER_REMCLOSE, now);
} else if (ep->events & EPOLLRDHUP) {
Wait_Handle(vwe->waiter, wp, WAITER_REMCLOSE, now);
vwe->waiter->func(wp, WAITER_REMCLOSE, now);
}
}
......@@ -103,26 +87,77 @@ static void *
vwe_thread(void *priv)
{
struct epoll_event ev[NEEV], *ep;
double now;
struct waited *wp, *wp2;
double now, idle, last_idle;
int i, n;
struct vwe *vwe;
VTAILQ_HEAD(,waited) tlist;
CAST_OBJ_NOTNULL(vwe, priv, VWE_MAGIC);
THR_SetName("cache-epoll");
while (!vwe->waiter->dismantle) {
n = epoll_wait(vwe->epfd, ev, NEEV, -1);
last_idle = 0.0;
while (1) {
i = floor(.3 * 1e3 * *vwe->waiter->tmo);
n = epoll_wait(vwe->epfd, ev, NEEV, i);
if (n < 0 && vwe->die)
break;
assert(n >= 0);
now = VTIM_real();
for (ep = ev, i = 0; i < n; i++, ep++)
for (ep = ev, i = 0; i < n; i++, ep++) {
CAST_OBJ_NOTNULL(wp, ep->data.ptr, WAITED_MAGIC);
AZ(epoll_ctl(vwe->epfd, EPOLL_CTL_DEL, wp->fd, NULL));
Lck_Lock(&vwe->mtx);
VTAILQ_REMOVE(&vwe->list, wp, list);
Lck_Unlock(&vwe->mtx);
vwe_eev(vwe, ep, now);
Wait_Handle(vwe->waiter, NULL, WAITER_ACTION, now);
}
idle = now - *vwe->waiter->tmo;
if (now - last_idle < .3 * *vwe->waiter->tmo)
continue;
last_idle = now;
VTAILQ_INIT(&tlist);
Lck_Lock(&vwe->mtx);
VTAILQ_FOREACH_SAFE(wp, &vwe->list, list, wp2) {
if (wp->idle > idle)
continue;
VTAILQ_REMOVE(&vwe->list, wp, list);
VTAILQ_INSERT_TAIL(&tlist, wp, list);
AZ(epoll_ctl(vwe->epfd, EPOLL_CTL_DEL, wp->fd, NULL));
}
Lck_Unlock(&vwe->mtx);
while(1) {
wp = VTAILQ_FIRST(&tlist);
if (wp == NULL)
break;
VTAILQ_REMOVE(&tlist, wp, list);
vwe->waiter->func(wp, WAITER_TIMEOUT, now);
}
}
return (NULL);
}
/*--------------------------------------------------------------------*/
static int __match_proto__(waiter_enter_f)
vwe_enter(void *priv, struct waited *wp)
{
struct vwe *vwe;
struct epoll_event ee;
CAST_OBJ_NOTNULL(vwe, priv, VWE_MAGIC);
ee.events = EPOLLIN | EPOLLRDHUP;
ee.data.ptr = wp;
Lck_Lock(&vwe->mtx);
VTAILQ_INSERT_TAIL(&vwe->list, wp, list);
AZ(epoll_ctl(vwe->epfd, EPOLL_CTL_ADD, wp->fd, &ee));
Lck_Unlock(&vwe->mtx);
return(0);
}
/*--------------------------------------------------------------------*/
static void __match_proto__(waiter_init_f)
vwe_init(struct waiter *w)
{
......@@ -135,24 +170,39 @@ vwe_init(struct waiter *w)
vwe->epfd = epoll_create(1);
assert(vwe->epfd >= 0);
Wait_UsePipe(w);
VTAILQ_INIT(&vwe->list);
Lck_New(&vwe->mtx, lck_misc);
AZ(pthread_create(&vwe->thread, NULL, vwe_thread, vwe));
}
/*--------------------------------------------------------------------*/
/*--------------------------------------------------------------------
* It is the callers responsibility to trigger all fd's waited on to
* fail somehow.
*/
static void __match_proto__(waiter_fini_f)
vwe_fini(struct waiter *w)
{
struct vwe *vwe;
void *vp;
int i;
CAST_OBJ_NOTNULL(vwe, w->priv, VWE_MAGIC);
Lck_Lock(&vwe->mtx);
while (!VTAILQ_EMPTY(&vwe->list)) {
Lck_Unlock(&vwe->mtx);
(void)usleep(100000);
Lck_Lock(&vwe->mtx);
}
vwe->die = 1;
i = vwe->epfd;
vwe->epfd = -1;
AZ(close(i));
Lck_Unlock(&vwe->mtx);
AZ(pthread_join(vwe->thread, &vp));
AZ(close(vwe->epfd));
Lck_Delete(&vwe->mtx);
}
/*--------------------------------------------------------------------*/
......@@ -161,9 +211,8 @@ const struct waiter_impl waiter_epoll = {
.name = "epoll",
.init = vwe_init,
.fini = vwe_fini,
.inject = vwe_inject,
.enter = vwe_enter,
.size = sizeof(struct vwe),
};
#endif /* defined(HAVE_EPOLL_CTL) */
#endif
......@@ -42,10 +42,10 @@ static const struct waiter_impl *const waiter_impls[] = {
#if defined(HAVE_KQUEUE)
&waiter_kqueue,
#endif
#if 0
#if defined(HAVE_EPOLL_CTL)
&waiter_epoll,
#endif
#if 0
#if defined(HAVE_PORT_CREATE)
&waiter_ports,
#endif
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment