Commit ef6918d6 authored by Poul-Henning Kamp's avatar Poul-Henning Kamp

Attempt to implement waiter destruction in kqueue and poll waiters.

parent d0341485
......@@ -272,6 +272,8 @@ ses_handle(struct waited *wp, enum wait_event ev, double now)
if (Pool_Task(pp->pool, &sp->task, POOL_QUEUE_FRONT))
SES_Delete(sp, SC_OVERLOAD, now);
break;
case WAITER_CLOSE:
WRONG("Should not see WAITER_CLOSE on client side");
default:
WRONG("Wrong event in ses_handle");
}
......
......@@ -124,13 +124,42 @@ void
Wait_Destroy(struct waiter **wp)
{
struct waiter *w;
struct waited *wx = NULL;
int written;
double now;
AN(wp);
w = *wp;
*wp = NULL;
CHECK_OBJ_NOTNULL(w, WAITER_MAGIC);
Lck_Lock(&wait_mtx);
VTAILQ_REMOVE(&waiters, w, list);
w->dismantle = 1;
Lck_Unlock(&wait_mtx);
if (w->pipes[1] >= 0) {
while (1) {
written = write(w->pipes[1], &wx, sizeof wx);
if (written == sizeof wx)
break;
(void)usleep(10000);
}
}
AN(w->impl->fini);
w->impl->fini(w);
now = VTIM_real();
while (1) {
wx = VTAILQ_FIRST(&w->waithead);
if (wx == NULL)
break;
VTAILQ_REMOVE(&w->waithead, wx, list);
if (wx == w->pipe_w)
FREE_OBJ(wx);
else
w->func(wx, WAITER_CLOSE, now);
}
FREE_OBJ(w);
}
void
......@@ -157,6 +186,7 @@ Wait_Enter(const struct waiter *w, struct waited *wp)
CHECK_OBJ_NOTNULL(w, WAITER_MAGIC);
CHECK_OBJ_NOTNULL(wp, WAITED_MAGIC);
assert(wp->fd >= 0);
AZ(w->dismantle);
if (w->impl->pass != NULL)
return (w->impl->pass(w->priv, wp));
......@@ -218,11 +248,13 @@ Wait_Handle(struct waiter *w, struct waited *wp, enum wait_event ev, double now)
CHECK_OBJ_NOTNULL(ss[j], WAITED_MAGIC);
if (ss[j] == w->pipe_w) {
dotimer = 1;
continue;
} else if (ss[j] == NULL) {
AN(w->dismantle);
} else {
assert(ss[j]->fd >= 0);
VTAILQ_INSERT_TAIL(&w->waithead, ss[j], list);
w->impl->inject(w, ss[j]);
}
assert(ss[j]->fd >= 0);
VTAILQ_INSERT_TAIL(&w->waithead, ss[j], list);
w->impl->inject(w, ss[j]);
}
AZ(i);
......
......@@ -57,7 +57,7 @@ struct vwe {
#define VWE_MAGIC 0x6bd73424
struct waiter *waiter;
pthread_t epoll_thread;
pthread_t thread;
int epfd;
};
......@@ -113,7 +113,7 @@ vwe_thread(void *priv)
THR_SetName("cache-epoll");
while (1) {
while (!vew->waiter->dismantle) {
n = epoll_wait(vwe->epfd, ev, NEEV, -1);
now = VTIM_real();
for (ep = ev, i = 0; i < n; i++, ep++)
......@@ -139,7 +139,21 @@ vwe_init(struct waiter *w)
Wait_UsePipe(w);
AZ(pthread_create(&vwe->epoll_thread, NULL, vwe_thread, vwe));
AZ(pthread_create(&vwe->thread, NULL, vwe_thread, vwe));
}
/*--------------------------------------------------------------------*/
static void __match_proto__(waiter_fini_f)
vwe_fini(struct waiter *w)
{
struct vwe *vwe;
void *vp;
CAST_OBJ_NOTNULL(vwe, w->priv, VWE_MAGIC);
AZ(pthread_join(vwe->thread, &vp));
WRONG("Not Yet Implemented");
}
/*--------------------------------------------------------------------*/
......@@ -147,6 +161,7 @@ vwe_init(struct waiter *w)
const struct waiter_impl waiter_epoll = {
.name = "epoll",
.init = vwe_init,
.fini = vwe_fini,
.inject = vwe_inject,
.size = sizeof(struct vwe),
};
......
......@@ -133,13 +133,10 @@ vwk_thread(void *priv)
CAST_OBJ_NOTNULL(vwk, priv, VWK_MAGIC);
THR_SetName("cache-kqueue");
vwk->kq = kqueue();
assert(vwk->kq >= 0);
vwk_kq_flush(vwk);
vwk->nki = 0;
while (1) {
while (!vwk->waiter->dismantle) {
n = kevent(vwk->kq, vwk->ki, vwk->nki, ke, NKEV, NULL);
assert(n <= NKEV);
if (n == 0) {
......@@ -168,6 +165,9 @@ vwk_init(struct waiter *w)
INIT_OBJ(vwk, VWK_MAGIC);
vwk->waiter = w;
vwk->kq = kqueue();
assert(vwk->kq >= 0);
Wait_UsePipe(w);
AZ(pthread_create(&vwk->thread, NULL, vwk_thread, vwk));
......@@ -175,9 +175,23 @@ vwk_init(struct waiter *w)
/*--------------------------------------------------------------------*/
static void __match_proto__(waiter_fini_f)
vwk_fini(struct waiter *w)
{
struct vwk *vwk;
void *vp;
CAST_OBJ_NOTNULL(vwk, w->priv, VWK_MAGIC);
AZ(pthread_join(vwk->thread, &vp));
AZ(close(vwk->kq));
}
/*--------------------------------------------------------------------*/
const struct waiter_impl waiter_kqueue = {
.name = "kqueue",
.init = vwk_init,
.fini = vwk_fini,
.inject = vwk_inject,
.size = sizeof(struct vwk),
};
......
......@@ -45,7 +45,7 @@ struct vwp {
#define VWP_MAGIC 0x4b2cc735
struct waiter *waiter;
pthread_t poll_thread;
pthread_t thread;
struct pollfd *pollfd;
unsigned npoll;
unsigned hpoll;
......@@ -137,7 +137,7 @@ vwp_main(void *priv)
CAST_OBJ_NOTNULL(vwp, priv, VWP_MAGIC);
THR_SetName("cache-poll");
while (1) {
while (!vwp->waiter->dismantle) {
assert(vwp->hpoll < vwp->npoll);
while (vwp->hpoll > 0 && vwp->pollfd[vwp->hpoll].fd == -1)
vwp->hpoll--;
......@@ -172,7 +172,7 @@ vwp_main(void *priv)
/*--------------------------------------------------------------------*/
static void __match_proto__(waiter_init_f)
vwp_poll_init(struct waiter *w)
vwp_init(struct waiter *w)
{
struct vwp *vwp;
......@@ -183,14 +183,28 @@ vwp_poll_init(struct waiter *w)
vwp_pollspace(vwp, 256);
Wait_UsePipe(w);
AZ(pthread_create(&vwp->poll_thread, NULL, vwp_main, vwp));
AZ(pthread_create(&vwp->thread, NULL, vwp_main, vwp));
}
/*--------------------------------------------------------------------*/
static void __match_proto__(waiter_fini_f)
vwp_fini(struct waiter *w)
{
struct vwp *vwp;
void *vp;
CAST_OBJ_NOTNULL(vwp, w->priv, VWP_MAGIC);
AZ(pthread_join(vwp->thread, &vp));
free(vwp->pollfd);
}
/*--------------------------------------------------------------------*/
const struct waiter_impl waiter_poll = {
.name = "poll",
.init = vwp_poll_init,
.init = vwp_init,
.fini = vwp_fini,
.inject = vwp_inject,
.evict = vwp_evict,
.size = sizeof(struct vwp),
......
......@@ -53,7 +53,7 @@ struct vws {
#define VWS_MAGIC 0x0b771473
struct waiter *waiter;
pthread_t ports_thread;
pthread_t thread;
int dport;
};
......@@ -150,7 +150,7 @@ vws_thread(void *priv)
timeout = &max_ts;
while (1) {
while (!vws->waiter->dismantle) {
port_event_t ev[MAX_EVENTS];
u_int nevents;
int ei, ret;
......@@ -259,7 +259,20 @@ vws_init(struct waiter *w)
INIT_OBJ(vws, VWS_MAGIC);
vws->waiter = w;
AZ(pthread_create(&vws->ports_thread, NULL, vws_thread, vws));
AZ(pthread_create(&vws->thread, NULL, vws_thread, vws));
}
/*--------------------------------------------------------------------*/
static void __match_proto__(waiter_fini_f)
vws_fini(struct waiter *w)
{
struct vws *vwe;
void *vp;
CAST_OBJ_NOTNULL(vws, w->priv, VWS_MAGIC);
AZ(pthread_join(vwp->thread, &vp));
WRONG("Not Yet Implemented");
}
/*--------------------------------------------------------------------*/
......@@ -267,6 +280,7 @@ vws_init(struct waiter *w)
const struct waiter_impl waiter_ports = {
.name = "ports",
.init = vws_init,
.fini = vws_fini,
.pass = vws_pass,
.size = sizeof(struct vws),
};
......
......@@ -48,7 +48,8 @@ struct waiter;
enum wait_event {
WAITER_REMCLOSE,
WAITER_TIMEOUT,
WAITER_ACTION
WAITER_ACTION,
WAITER_CLOSE
};
#define WAITER_DEFAULT "platform dependent"
......
......@@ -36,6 +36,7 @@ struct waiter {
#define WAITER_MAGIC 0x17c399db
const struct waiter_impl *impl;
VTAILQ_ENTRY(waiter) list;
int dismantle;
waiter_handle_f * func;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment