Commit 64fbc761 authored by Poul-Henning Kamp's avatar Poul-Henning Kamp

Bring back the kqueue waiter.

parent 385930c3
......@@ -376,6 +376,7 @@ struct waited {
int fd;
void *ptr;
double idle;
VTAILQ_ENTRY(waited) list;
};
/* Stored object -----------------------------------------------------
......
......@@ -26,12 +26,8 @@
* OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
*
* XXX: We need to pass sessions back into the event engine when they are
* reused. Not sure what the most efficient way is for that. For now
* write the session pointer to a pipe which the event engine monitors.
*/
#if 0
#include "config.h"
#if defined(HAVE_KQUEUE)
......@@ -39,6 +35,7 @@
#include <sys/types.h>
#include <sys/event.h>
#include <errno.h>
#include <stdlib.h>
#include "cache/cache.h"
......@@ -47,95 +44,18 @@
#include "waiter/waiter_priv.h"
#include "vtim.h"
#define NKEV 100
#define NKEV 256
struct vwk {
unsigned magic;
#define VWK_MAGIC 0x1cc2acc2
int kq;
struct waiter *waiter;
pthread_t thread;
int kq;
struct kevent ki[NKEV];
unsigned nki;
};
/*--------------------------------------------------------------------*/
static void
vwk_kq_flush(struct vwk *vwk)
{
int i;
if (vwk->nki == 0)
return;
i = kevent(vwk->kq, vwk->ki, vwk->nki, NULL, 0, NULL);
AZ(i);
vwk->nki = 0;
}
static void
vwk_kq_sess(struct vwk *vwk, struct waited *sp, short arm)
{
CHECK_OBJ_NOTNULL(sp, WAITED_MAGIC);
assert(sp->fd >= 0);
EV_SET(&vwk->ki[vwk->nki], sp->fd, EVFILT_READ, arm, 0, 0, sp);
if (++vwk->nki == NKEV)
vwk_kq_flush(vwk);
}
/*--------------------------------------------------------------------*/
static void
vwk_inject(const struct waiter *w, struct waited *wp)
{
struct vwk *vwk;
CAST_OBJ_NOTNULL(vwk, w->priv, VWK_MAGIC);
if (wp == w->pipe_w)
vwk_kq_sess(vwk, wp, EV_ADD);
else
vwk_kq_sess(vwk, wp, EV_ADD | EV_ONESHOT);
}
#if 0
static void
vwk_evict(const struct waiter *w, struct waited *wp)
{
struct vwk *vwk;
CAST_OBJ_NOTNULL(vwk, w->priv, VWK_MAGIC);
vwk_kq_sess(vwk, wp, EV_DELETE);
}
#endif
/*--------------------------------------------------------------------*/
static void
vwk_sess_ev(const struct vwk *vwk, const struct kevent *kp, double now)
{
struct waited *sp;
double idle;
AN(kp->udata);
CAST_OBJ_NOTNULL(sp, kp->udata, WAITED_MAGIC);
idle = now - *vwk->waiter->tmo;
if (sp->idle <= idle) {
Wait_Handle(vwk->waiter, sp, WAITER_TIMEOUT, now);
} else if (kp->flags & EV_EOF) {
Wait_Handle(vwk->waiter, sp, WAITER_REMCLOSE, now);
} else {
if (kp->data == 0)
VSL(SLT_Debug, 0,
"KQR d %ju filter %d data %jd flags 0x%x idle %g",
(uintmax_t)kp->ident, kp->filter,
(intmax_t)kp->data, kp->flags, sp->idle - idle);
Wait_Handle(vwk->waiter, sp, WAITER_ACTION, now);
}
}
VTAILQ_HEAD(,waited) list;
struct lock mtx;
};
/*--------------------------------------------------------------------*/
......@@ -145,30 +65,77 @@ vwk_thread(void *priv)
struct vwk *vwk;
struct kevent ke[NKEV], *kp;
int j, n;
double now;
double now, idle, last_idle;
struct timespec ts;
struct waited *wp, *wp2;
CAST_OBJ_NOTNULL(vwk, priv, VWK_MAGIC);
THR_SetName("cache-kqueue");
vwk_kq_flush(vwk);
vwk->nki = 0;
while (!vwk->waiter->dismantle) {
n = kevent(vwk->kq, vwk->ki, vwk->nki, ke, NKEV, NULL);
last_idle = 0.0;
while (1) {
now = .3 * *vwk->waiter->tmo;
ts.tv_sec = (time_t)floor(now);
ts.tv_nsec = (long)(1e9 * (now - ts.tv_sec));
n = kevent(vwk->kq, NULL, 0, ke, NKEV, &ts);
if (n < 0 && errno == EBADF)
break;
assert(n <= NKEV);
if (n == 0) {
/* This happens on OSX in m00011.vtc */
(void)usleep(10000);
}
vwk->nki = 0;
now = VTIM_real();
idle = now - *vwk->waiter->tmo;
for (kp = ke, j = 0; j < n; j++, kp++) {
assert(kp->filter == EVFILT_READ);
vwk_sess_ev(vwk, kp, now);
CAST_OBJ_NOTNULL(wp, ke[j].udata, WAITED_MAGIC);
Lck_Lock(&vwk->mtx);
VTAILQ_REMOVE(&vwk->list, wp, list);
Lck_Unlock(&vwk->mtx);
if (wp->idle <= idle)
vwk->waiter->func(wp, WAITER_TIMEOUT, now);
else if (kp->flags & EV_EOF)
vwk->waiter->func(wp, WAITER_REMCLOSE, now);
else
vwk->waiter->func(wp, WAITER_ACTION, now);
}
if (now - last_idle > .3 * *vwk->waiter->tmo) {
last_idle = now;
n = 0;
Lck_Lock(&vwk->mtx);
VTAILQ_FOREACH_SAFE(wp, &vwk->list, list, wp2) {
if (wp->idle > idle)
continue;
EV_SET(ke + n, wp->fd,
EVFILT_READ, EV_DELETE, 0, 0, wp);
if (++n == NKEV)
break;
}
if (n > 0)
AZ(kevent(vwk->kq, ke, n, NULL, 0, NULL));
for (j = 0; j < n; j++) {
CAST_OBJ_NOTNULL(wp, ke[j].udata, WAITED_MAGIC);
VTAILQ_REMOVE(&vwk->list, wp, list);
vwk->waiter->func(wp, WAITER_TIMEOUT, now);
}
Lck_Unlock(&vwk->mtx);
}
Wait_Handle(vwk->waiter, NULL, WAITER_ACTION, now);
}
NEEDLESS_RETURN(NULL);
return(NULL);
}
/*--------------------------------------------------------------------*/
static int __match_proto__(waiter_enter_f)
vwk_enter(void *priv, struct waited *wp)
{
struct vwk *vwk;
struct kevent ke;
CAST_OBJ_NOTNULL(vwk, priv, VWK_MAGIC);
EV_SET(&ke, wp->fd, EVFILT_READ, EV_ADD|EV_ONESHOT, 0, 0, wp);
Lck_Lock(&vwk->mtx);
VTAILQ_INSERT_TAIL(&vwk->list, wp, list);
AZ(kevent(vwk->kq, &ke, 1, NULL, 0, NULL));
Lck_Unlock(&vwk->mtx);
return(0);
}
/*--------------------------------------------------------------------*/
......@@ -185,13 +152,16 @@ vwk_init(struct waiter *w)
vwk->kq = kqueue();
assert(vwk->kq >= 0);
Wait_UsePipe(w);
VTAILQ_INIT(&vwk->list);
Lck_New(&vwk->mtx, lck_misc);
AZ(pthread_create(&vwk->thread, NULL, vwk_thread, vwk));
}
/*--------------------------------------------------------------------*/
/*--------------------------------------------------------------------
* It is the callers responsibility to trigger all fd's waited on to
* fail somehow.
*/
static void __match_proto__(waiter_fini_f)
vwk_fini(struct waiter *w)
......@@ -200,8 +170,16 @@ vwk_fini(struct waiter *w)
void *vp;
CAST_OBJ_NOTNULL(vwk, w->priv, VWK_MAGIC);
AZ(pthread_join(vwk->thread, &vp));
Lck_Lock(&vwk->mtx);
while (!VTAILQ_EMPTY(&vwk->list)) {
Lck_Unlock(&vwk->mtx);
(void)usleep(100000);
Lck_Lock(&vwk->mtx);
}
AZ(close(vwk->kq));
vwk->kq = -1;
Lck_Unlock(&vwk->mtx);
AZ(pthread_join(vwk->thread, &vp));
}
/*--------------------------------------------------------------------*/
......@@ -210,9 +188,8 @@ const struct waiter_impl waiter_kqueue = {
.name = "kqueue",
.init = vwk_init,
.fini = vwk_fini,
.inject = vwk_inject,
.enter = vwk_enter,
.size = sizeof(struct vwk),
};
#endif /* defined(HAVE_KQUEUE) */
#endif
......@@ -39,10 +39,10 @@
#include "waiter/waiter_priv.h"
static const struct waiter_impl *const waiter_impls[] = {
#if 0
#if defined(HAVE_KQUEUE)
&waiter_kqueue,
#endif
#if 0
#if defined(HAVE_EPOLL_CTL)
&waiter_epoll,
#endif
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment