Commit 493f9757 authored by Poul-Henning Kamp's avatar Poul-Henning Kamp

Convert waiter_poll to the central pipe code

parent 65fbdd20
......@@ -90,6 +90,7 @@ WAIT_UsePipe(struct waiter *w)
ALLOC_OBJ(w->pipe_w, WAITED_MAGIC);
w->pipe_w->fd = w->pipes[0];
w->pipe_w->deadline = 9e99;
VTAILQ_INSERT_HEAD(&w->sesshead, w->pipe_w, list);
waiter->inject(w, w->pipe_w);
}
......
......@@ -197,7 +197,7 @@ vwk_init(struct waiter *w)
EV_SET(&vwk->ki[vwk->nki], 0, EVFILT_TIMER, EV_ADD, 0, 100, NULL);
vwk->nki++;
WAIT_UsePipe(vwk->waiter);
WAIT_UsePipe(w);
AZ(pthread_create(&vwk->thread, NULL, vwk_thread, vwk));
}
......
......@@ -39,16 +39,12 @@
#include "waiter/waiter.h"
#include "waiter/waiter_priv.h"
#include "vtim.h"
#include "vfil.h"
#define NEEV 128
struct vwp {
unsigned magic;
#define VWP_MAGIC 0x4b2cc735
struct waiter *waiter;
int pipes[2];
pthread_t poll_thread;
struct pollfd *pollfd;
unsigned npoll;
......@@ -102,6 +98,15 @@ vwp_poll(struct vwp *vwp, int fd)
vwp->pollfd[fd].events = POLLIN;
}
static void __match_proto__(waiter_inject_f)
vwp_inject(const struct waiter *w, struct waited *wp)
{
struct vwp *vwp;
CAST_OBJ_NOTNULL(vwp, w->priv, VWP_MAGIC);
vwp_poll(vwp, wp->fd);
}
static void
vwp_unpoll(struct vwp *vwp, int fd)
{
......@@ -125,27 +130,22 @@ vwp_main(void *priv)
{
int v, v2;
struct vwp *vwp;
struct waited *ss[NEEV], *sp, *sp2;
struct waited *sp, *sp2;
double now, deadline;
int i, j, fd;
int fd;
CAST_OBJ_NOTNULL(vwp, priv, VWP_MAGIC);
THR_SetName("cache-poll");
vwp_poll(vwp, vwp->pipes[0]);
while (1) {
assert(vwp->hpoll < vwp->npoll);
while (vwp->hpoll > 0 && vwp->pollfd[vwp->hpoll].fd == -1)
vwp->hpoll--;
assert(vwp->pipes[0] <= vwp->hpoll);
assert(vwp->pollfd[vwp->pipes[0]].fd == vwp->pipes[0]);
assert(vwp->pollfd[vwp->pipes[1]].fd == -1);
v = poll(vwp->pollfd, vwp->hpoll + 1, 100);
assert(v >= 0);
v2 = v;
now = VTIM_real();
deadline = now - *vwp->waiter->tmo;
v2 = v;
VTAILQ_FOREACH_SAFE(sp, &vwp->waiter->sesshead, list, sp2) {
if (v != 0 && v2 == 0)
break;
......@@ -158,7 +158,8 @@ vwp_main(void *priv)
if (vwp->pollfd[fd].revents) {
v2--;
vwp->pollfd[fd].revents = 0;
vwp_unpoll(vwp, fd);
if (sp != vwp->waiter->pipe_w)
vwp_unpoll(vwp, fd);
WAIT_handle(vwp->waiter, sp, WAITER_ACTION,
now);
} else if (sp->deadline <= deadline) {
......@@ -167,25 +168,6 @@ vwp_main(void *priv)
now);
}
}
if (v2 && vwp->pollfd[vwp->pipes[0]].revents) {
if (vwp->pollfd[vwp->pipes[0]].revents != POLLIN)
VSL(SLT_Debug, 0, "pipe.revents= 0x%x",
vwp->pollfd[vwp->pipes[0]].revents);
assert(vwp->pollfd[vwp->pipes[0]].revents == POLLIN);
vwp->pollfd[vwp->pipes[0]].revents = 0;
v2--;
i = read(vwp->pipes[0], ss, sizeof ss);
assert(i >= 0);
AZ((unsigned)i % sizeof ss[0]);
for (j = 0; j * sizeof ss[0] < i; j++) {
CHECK_OBJ_NOTNULL(ss[j], WAITED_MAGIC);
assert(ss[j]->fd >= 0);
VTAILQ_INSERT_TAIL(&vwp->waiter->sesshead, ss[j], list);
vwp_poll(vwp, ss[j]->fd);
}
}
AZ(v2);
}
NEEDLESS_RETURN(NULL);
}
......@@ -202,12 +184,8 @@ vwp_poll_init(struct waiter *w)
INIT_OBJ(vwp, VWP_MAGIC);
vwp->waiter = w;
AZ(pipe(vwp->pipes));
AZ(VFIL_nonblocking(vwp->pipes[1]));
vwp->waiter->pfd = vwp->pipes[1];
vwp_pollspace(vwp, 256);
WAIT_UsePipe(w);
AZ(pthread_create(&vwp->poll_thread, NULL, vwp_main, vwp));
}
......@@ -216,5 +194,6 @@ vwp_poll_init(struct waiter *w)
const struct waiter_impl waiter_poll = {
.name = "poll",
.init = vwp_poll_init,
.inject = vwp_inject,
.size = sizeof(struct vwp),
};
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment