Commit 3c1c9703 authored by Poul-Henning Kamp's avatar Poul-Henning Kamp Committed by Pål Hermunn Johansen

Dramatically simplify VEV

Fixes: #2200
parent 2bc20ee0
...@@ -18,6 +18,6 @@ client c1 { ...@@ -18,6 +18,6 @@ client c1 {
txreq txreq
} -run } -run
delay 5 delay 7
varnish v1 -cliok "panic.show" varnish v1 -cliok "panic.show"
...@@ -41,6 +41,7 @@ struct vev { ...@@ -41,6 +41,7 @@ struct vev {
const char *name; const char *name;
int fd; int fd;
unsigned fd_flags; unsigned fd_flags;
unsigned fd_events;
#define EV_RD POLLIN #define EV_RD POLLIN
#define EV_WR POLLOUT #define EV_WR POLLOUT
#define EV_ERR POLLERR #define EV_ERR POLLERR
...@@ -54,11 +55,9 @@ struct vev { ...@@ -54,11 +55,9 @@ struct vev {
/* priv */ /* priv */
double __when; double __when;
VTAILQ_ENTRY(vev) __list;
unsigned __binheap_idx; unsigned __binheap_idx;
unsigned __privflags; unsigned __privflags;
struct vev_base *__vevb; struct vev_base *__vevb;
int __poll_idx;
}; };
struct vev_base *vev_new_base(void); struct vev_base *vev_new_base(void);
......
...@@ -43,7 +43,6 @@ ...@@ -43,7 +43,6 @@
#include "vas.h" #include "vas.h"
#include "binary_heap.h" #include "binary_heap.h"
#include "vqueue.h"
#include "vev.h" #include "vev.h"
#include "vtim.h" #include "vtim.h"
...@@ -67,13 +66,11 @@ static int vev_nsig; ...@@ -67,13 +66,11 @@ static int vev_nsig;
struct vev_base { struct vev_base {
unsigned magic; unsigned magic;
#define VEV_BASE_MAGIC 0x477bcf3d #define VEV_BASE_MAGIC 0x477bcf3d
VTAILQ_HEAD(,vev) events;
struct pollfd *pfd; struct pollfd *pfd;
struct vev **pev;
unsigned npfd; unsigned npfd;
unsigned lpfd; unsigned lpfd;
struct binheap *binheap; struct binheap *binheap;
unsigned char compact_pfd;
unsigned char disturbed;
unsigned psig; unsigned psig;
pthread_t thread; pthread_t thread;
#ifdef DEBUG_EVENTS #ifdef DEBUG_EVENTS
...@@ -90,6 +87,7 @@ struct vev_base { ...@@ -90,6 +87,7 @@ struct vev_base {
} while (0); } while (0);
#else #else
#define DBG(evb, ...) /* ... */ #define DBG(evb, ...) /* ... */
//#define DBG(evb, ...) fprintf(stderr, __VA_ARGS__);
#endif #endif
/*--------------------------------------------------------------------*/ /*--------------------------------------------------------------------*/
...@@ -102,7 +100,14 @@ vev_bh_update(void *priv, void *a, unsigned u) ...@@ -102,7 +100,14 @@ vev_bh_update(void *priv, void *a, unsigned u)
CAST_OBJ_NOTNULL(evb, priv, VEV_BASE_MAGIC); CAST_OBJ_NOTNULL(evb, priv, VEV_BASE_MAGIC);
CAST_OBJ_NOTNULL(e, a, VEV_MAGIC); CAST_OBJ_NOTNULL(e, a, VEV_MAGIC);
assert(u < evb->lpfd);
e->__binheap_idx = u; e->__binheap_idx = u;
if (u != BINHEAP_NOIDX) {
evb->pev[u] = e;
evb->pfd[u].fd = e->fd;
evb->pfd[u].events =
e->fd_flags & (EV_RD|EV_WR|EV_ERR|EV_HUP);
}
} }
static int __match_proto__(binheap_cmp_t) static int __match_proto__(binheap_cmp_t)
...@@ -123,7 +128,6 @@ static int ...@@ -123,7 +128,6 @@ static int
vev_get_pfd(struct vev_base *evb) vev_get_pfd(struct vev_base *evb)
{ {
unsigned u; unsigned u;
void *p;
if (evb->lpfd + 1 < evb->npfd) if (evb->lpfd + 1 < evb->npfd)
return (0); return (0);
...@@ -134,11 +138,11 @@ vev_get_pfd(struct vev_base *evb) ...@@ -134,11 +138,11 @@ vev_get_pfd(struct vev_base *evb)
u = evb->npfd + 256; u = evb->npfd + 256;
else else
u = evb->npfd * 2; u = evb->npfd * 2;
p = realloc(evb->pfd, sizeof *evb->pfd * u);
if (p == NULL)
return (1);
evb->npfd = u; evb->npfd = u;
evb->pfd = p; evb->pfd = realloc(evb->pfd, sizeof(*evb->pfd) * u);
AN(evb->pfd);
evb->pev = realloc(evb->pev, sizeof(*evb->pev) * u);
AN(evb->pev);
return (0); return (0);
} }
...@@ -190,12 +194,12 @@ vev_new_base(void) ...@@ -190,12 +194,12 @@ vev_new_base(void)
evb = calloc(sizeof *evb, 1); evb = calloc(sizeof *evb, 1);
if (evb == NULL) if (evb == NULL)
return (evb); return (evb);
evb->lpfd = BINHEAP_NOIDX + 1;
if (vev_get_pfd(evb)) { if (vev_get_pfd(evb)) {
free(evb); free(evb);
return (NULL); return (NULL);
} }
evb->magic = VEV_BASE_MAGIC; evb->magic = VEV_BASE_MAGIC;
VTAILQ_INIT(&evb->events);
evb->binheap = binheap_new(evb, vev_bh_cmp, vev_bh_update); evb->binheap = binheap_new(evb, vev_bh_cmp, vev_bh_update);
evb->thread = pthread_self(); evb->thread = pthread_self();
#ifdef DEBUG_EVENTS #ifdef DEBUG_EVENTS
...@@ -248,13 +252,14 @@ vev_add(struct vev_base *evb, struct vev *e) ...@@ -248,13 +252,14 @@ vev_add(struct vev_base *evb, struct vev *e)
assert(evb->thread == pthread_self()); assert(evb->thread == pthread_self());
DBG(evb, "ev_add(%p) fd = %d\n", e, e->fd); DBG(evb, "ev_add(%p) fd = %d\n", e, e->fd);
if (e->sig > 0 && vev_get_sig(e->sig)) if (vev_get_pfd(evb))
return (ENOMEM); return (ENOMEM);
if (e->fd >= 0 && vev_get_pfd(evb)) if (e->sig > 0) {
if (vev_get_sig(e->sig))
return (ENOMEM); return (ENOMEM);
if (e->sig > 0) { assert(e->fd < 0);
es = &vev_sigs[e->sig]; es = &vev_sigs[e->sig];
if (es->vev != NULL) if (es->vev != NULL)
return (EBUSY); return (EBUSY);
...@@ -267,36 +272,19 @@ vev_add(struct vev_base *evb, struct vev *e) ...@@ -267,36 +272,19 @@ vev_add(struct vev_base *evb, struct vev *e)
es = NULL; es = NULL;
} }
if (e->fd >= 0) {
assert(evb->lpfd < evb->npfd);
evb->pfd[evb->lpfd].fd = e->fd;
evb->pfd[evb->lpfd].events =
e->fd_flags & (EV_RD|EV_WR|EV_ERR|EV_HUP);
e->__poll_idx = evb->lpfd;
evb->lpfd++;
DBG(evb, "... pidx = %d lpfd = %d\n",
e->__poll_idx, evb->lpfd);
} else
e->__poll_idx = -1;
e->magic = VEV_MAGIC; /* before binheap_insert() */ e->magic = VEV_MAGIC; /* before binheap_insert() */
if (e->timeout != 0.0) { if (e->timeout != 0.0)
e->__when += VTIM_mono() + e->timeout; e->__when += VTIM_mono() + e->timeout;
else
e->__when = 9e99;
evb->lpfd++;
binheap_insert(evb->binheap, e); binheap_insert(evb->binheap, e);
assert(e->__binheap_idx > 0); assert(e->__binheap_idx != BINHEAP_NOIDX);
DBG(evb, "... bidx = %d\n", e->__binheap_idx);
} else {
e->__when = 0.0;
e->__binheap_idx = 0;
}
e->__vevb = evb; e->__vevb = evb;
e->__privflags = 0; e->__privflags = 0;
if (e->fd < 0)
VTAILQ_INSERT_TAIL(&evb->events, e, __list);
else
VTAILQ_INSERT_HEAD(&evb->events, e, __list);
if (e->sig > 0) { if (e->sig > 0) {
assert(es != NULL); assert(es != NULL);
...@@ -315,24 +303,16 @@ vev_del(struct vev_base *evb, struct vev *e) ...@@ -315,24 +303,16 @@ vev_del(struct vev_base *evb, struct vev *e)
CHECK_OBJ_NOTNULL(evb, VEV_BASE_MAGIC); CHECK_OBJ_NOTNULL(evb, VEV_BASE_MAGIC);
CHECK_OBJ_NOTNULL(e, VEV_MAGIC); CHECK_OBJ_NOTNULL(e, VEV_MAGIC);
DBG(evb, "ev_del(%p) fd = %d\n", e, e->fd); DBG(evb, "ev_del(%p) fd = %d i=%u L=%d\n", e, e->fd, e->__binheap_idx, evb->lpfd);
assert(evb == e->__vevb); assert(evb == e->__vevb);
assert(evb->thread == pthread_self()); assert(evb->thread == pthread_self());
assert(evb->pev[e->__binheap_idx] == e);
if (e->__binheap_idx != 0) assert(e->__binheap_idx != BINHEAP_NOIDX);
e->fd = -1;
binheap_delete(evb->binheap, e->__binheap_idx); binheap_delete(evb->binheap, e->__binheap_idx);
AZ(e->__binheap_idx); assert(e->__binheap_idx == BINHEAP_NOIDX);
if (e->fd >= 0) {
DBG(evb, "... pidx = %d\n", e->__poll_idx);
evb->pfd[e->__poll_idx].fd = -1;
if (e->__poll_idx == evb->lpfd - 1)
evb->lpfd--; evb->lpfd--;
else
evb->compact_pfd++;
e->fd = -1;
DBG(evb, "... lpfd = %d\n", evb->lpfd);
}
if (e->sig > 0) { if (e->sig > 0) {
assert(e->sig < vev_nsig); assert(e->sig < vev_nsig);
...@@ -346,12 +326,8 @@ vev_del(struct vev_base *evb, struct vev *e) ...@@ -346,12 +326,8 @@ vev_del(struct vev_base *evb, struct vev *e)
es->happened = 0; es->happened = 0;
} }
VTAILQ_REMOVE(&evb->events, e, __list);
e->magic = 0; e->magic = 0;
e->__vevb = NULL; e->__vevb = NULL;
evb->disturbed = 1;
} }
/*--------------------------------------------------------------------*/ /*--------------------------------------------------------------------*/
...@@ -371,38 +347,6 @@ vev_schedule(struct vev_base *evb) ...@@ -371,38 +347,6 @@ vev_schedule(struct vev_base *evb)
/*--------------------------------------------------------------------*/ /*--------------------------------------------------------------------*/
static void
vev_compact_pfd(struct vev_base *evb)
{
unsigned u;
struct pollfd *p;
struct vev *ep;
int lfd;
DBG(evb, "compact_pfd() lpfd = %d\n", evb->lpfd);
p = evb->pfd;
for (u = 0; u < evb->lpfd; u++, p++) {
DBG(evb, "...[%d] fd = %d\n", u, p->fd);
if (p->fd >= 0)
continue;
if (u == evb->lpfd - 1)
break;
lfd = evb->pfd[evb->lpfd - 1].fd;
VTAILQ_FOREACH(ep, &evb->events, __list)
if (ep->fd == lfd)
break;
AN(ep);
DBG(evb, "...[%d] move %p pidx %d\n", u, ep, ep->__poll_idx);
*p = evb->pfd[--evb->lpfd];
ep->__poll_idx = u;
}
evb->lpfd = u;
evb->compact_pfd = 0;
DBG(evb, "... lpfd = %d\n", evb->lpfd);
}
/*--------------------------------------------------------------------*/
static int static int
vev_sched_timeout(struct vev_base *evb, struct vev *e, double t) vev_sched_timeout(struct vev_base *evb, struct vev *e, double t)
{ {
...@@ -449,16 +393,20 @@ int ...@@ -449,16 +393,20 @@ int
vev_schedule_one(struct vev_base *evb) vev_schedule_one(struct vev_base *evb)
{ {
double t; double t;
struct vev *e, *e2, *e3; struct vev *e;
int i, j, tmo; int i, j, k, tmo;
struct pollfd *pfd;
CHECK_OBJ_NOTNULL(evb, VEV_BASE_MAGIC); CHECK_OBJ_NOTNULL(evb, VEV_BASE_MAGIC);
assert(evb->thread == pthread_self()); assert(evb->thread == pthread_self());
assert(evb->lpfd < evb->npfd);
if (evb->psig)
return (vev_sched_signal(evb));
e = binheap_root(evb->binheap); e = binheap_root(evb->binheap);
if (e != NULL) { if (e != NULL) {
CHECK_OBJ_NOTNULL(e, VEV_MAGIC); CHECK_OBJ_NOTNULL(e, VEV_MAGIC);
assert(e->__binheap_idx == 1); assert(e->__binheap_idx == BINHEAP_NOIDX + 1);
t = VTIM_mono(); t = VTIM_mono();
if (e->__when <= t) if (e->__when <= t)
return (vev_sched_timeout(evb, e, t)); return (vev_sched_timeout(evb, e, t));
...@@ -468,56 +416,45 @@ vev_schedule_one(struct vev_base *evb) ...@@ -468,56 +416,45 @@ vev_schedule_one(struct vev_base *evb)
} else } else
tmo = INFTIM; tmo = INFTIM;
if (evb->compact_pfd) if (tmo == INFTIM && evb->lpfd == BINHEAP_NOIDX + 1)
vev_compact_pfd(evb);
if (tmo == INFTIM && evb->lpfd == 0)
return (0); return (0);
if (evb->psig) i = poll(evb->pfd + 1, evb->lpfd - 1, tmo);
return (vev_sched_signal(evb));
assert(evb->lpfd < evb->npfd);
i = poll(evb->pfd, evb->lpfd, tmo);
if (i == -1 && errno == EINTR) if (i == -1 && errno == EINTR)
return (vev_sched_signal(evb)); return (vev_sched_signal(evb));
if (i == 0) { if (i == 0) {
assert(e != NULL); assert(e != NULL);
t = VTIM_mono(); t = VTIM_mono();
if (e->__when <= t) if (e->__when <= t)
return (vev_sched_timeout(evb, e, t)); return (vev_sched_timeout(evb, e, t));
} }
evb->disturbed = 0;
VTAILQ_FOREACH_SAFE(e, &evb->events, __list, e2) { k = 0;
if (i == 0) for(j = 1; j < evb->lpfd; j++) {
break; evb->pev[j]->fd_events = evb->pfd[j].revents;
if (e->fd < 0) if (evb->pev[j]->fd_events)
continue; k++;
assert(e->__poll_idx < evb->lpfd); }
pfd = &evb->pfd[e->__poll_idx]; assert(k == i);
assert(pfd->fd == e->fd);
if (!pfd->revents) DBG(evb, "EVENTS %d\n", i);
while (i > 0) {
for(j = BINHEAP_NOIDX + 1; j < evb->lpfd; j++) {
e = evb->pev[j];
if (e->fd_events == 0)
continue; continue;
DBG(evb, "callback(%p) fd = %d what = 0x%x pidx = %d\n", DBG(evb, "EVENT %p j=%d fd=%d ev=0x%x %d\n",
e, e->fd, pfd->revents, e->__poll_idx); e, j, e->fd, e->fd_events, i);
j = e->callback(e, pfd->revents); k = e->callback(e, e->fd_events);
e->fd_events = 0;
i--; i--;
if (evb->disturbed) { if (k) {
VTAILQ_FOREACH(e3, &evb->events, __list) {
if (e3 == e) {
e3 = VTAILQ_NEXT(e, __list);
break;
} else if (e3 == e2)
break;
}
e2 = e3;
evb->disturbed = 0;
}
if (j) {
vev_del(evb, e); vev_del(evb, e);
evb->disturbed = 0;
free(e); free(e);
} }
} }
}
AZ(i); AZ(i);
return (1); return (1);
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment