Commit df7a8657 authored by Poul-Henning Kamp's avatar Poul-Henning Kamp

Pass the handling function (SES_Handle) as param to WAIT_Init()

and call back into that from the waiters.
parent 917566c1
......@@ -975,7 +975,6 @@ struct sesspool *SES_NewPool(struct pool *pp, unsigned pool_no);
void SES_DeletePool(struct sesspool *sp);
int SES_ScheduleReq(struct req *);
struct req *SES_GetReq(const struct worker *, struct sess *);
void SES_Handle(struct sess *sp, double now);
void SES_ReleaseReq(struct req *);
pool_func_t SES_pool_accept_task;
......
......@@ -211,7 +211,7 @@ child_main(void)
Lck_New(&vxid_lock, lck_vxid);
WAIT_Init(NULL);
WAIT_Init(SES_Handle);
PAN_Init();
CLI_Init();
VFP_Init();
......
......@@ -245,19 +245,34 @@ SES_ScheduleReq(struct req *req)
* Handle a session (from waiter)
*/
void
SES_Handle(struct sess *sp, double now)
void __match_proto__(waiter_handle_f)
SES_Handle(void *ptr, int fd, enum wait_event ev, double now)
{
struct sess *sp;
struct sesspool *pp;
CHECK_OBJ_NOTNULL(sp, SESS_MAGIC);
pp = sp->sesspool;
CHECK_OBJ_NOTNULL(pp, SESSPOOL_MAGIC);
AN(pp->pool);
sp->task.func = ses_sess_pool_task;
sp->task.priv = sp;
if (Pool_Task(pp->pool, &sp->task, POOL_QUEUE_FRONT))
SES_Delete(sp, SC_OVERLOAD, now);
CAST_OBJ_NOTNULL(sp, ptr, SESS_MAGIC);
(void)fd;
switch (ev) {
case WAITER_TIMEOUT:
SES_Delete(sp, SC_RX_TIMEOUT, now);
break;
case WAITER_REMCLOSE:
SES_Delete(sp, SC_REM_CLOSE, now);
break;
case WAITER_ACTION:
pp = sp->sesspool;
CHECK_OBJ_NOTNULL(pp, SESSPOOL_MAGIC);
AN(pp->pool);
sp->task.func = ses_sess_pool_task;
sp->task.priv = sp;
if (Pool_Task(pp->pool, &sp->task, POOL_QUEUE_FRONT))
SES_Delete(sp, SC_OVERLOAD, now);
break;
default:
WRONG("Wrong event in SES_Handle");
}
}
/*--------------------------------------------------------------------
......
......@@ -59,6 +59,8 @@ struct vwe {
pthread_t timer_thread;
int epfd;
waiter_handle_f *func
VTAILQ_HEAD(,sess) sesshead;
int pipes[2];
int timer_pipes[2];
......@@ -130,16 +132,16 @@ vwe_eev(struct vwe *vwe, const struct epoll_event *ep, double now)
CAST_OBJ_NOTNULL(sp, ep->data.ptr, SESS_MAGIC);
if (ep->events & EPOLLIN || ep->events & EPOLLPRI) {
VTAILQ_REMOVE(&vwe->sesshead, sp, list);
SES_Handle(sp, now);
vwe->func(sp, sp->fd, WAITER_ACTION, now)
} else if (ep->events & EPOLLERR) {
VTAILQ_REMOVE(&vwe->sesshead, sp, list);
SES_Delete(sp, SC_REM_CLOSE, now);
vwe->func(sp, sp->fd, WAITER_REMCLOSE, now);
} else if (ep->events & EPOLLHUP) {
VTAILQ_REMOVE(&vwe->sesshead, sp, list);
SES_Delete(sp, SC_REM_CLOSE, now);
vwe->func(sp, sp->fd, WAITER_REMCLOSE, now);
} else if (ep->events & EPOLLRDHUP) {
VTAILQ_REMOVE(&vwe->sesshead, sp, list);
SES_Delete(sp, SC_REM_CLOSE, now);
vwe->func(sp, sp->fd, WAITER_REMCLOSE, now);
}
}
}
......@@ -192,7 +194,7 @@ vwe_thread(void *priv)
break;
VTAILQ_REMOVE(&vwe->sesshead, sp, list);
// XXX: not yet VTCP_linger(sp->fd, 0);
SES_Delete(sp, SC_RX_TIMEOUT, now);
vwe->func(sp, sp->fd, WAITER_TIMEOUT, now);
}
}
return (NULL);
......@@ -236,7 +238,7 @@ vwe_init(waiter_handle_f *func)
{
struct vwe *vwe;
(void)func;
AN(func);
ALLOC_OBJ(vwe, VWE_MAGIC);
AN(vwe);
VTAILQ_INIT(&vwe->sesshead);
......@@ -247,6 +249,8 @@ vwe_init(waiter_handle_f *func)
AZ(VFIL_nonblocking(vwe->pipes[1]));
AZ(VFIL_nonblocking(vwe->timer_pipes[0]));
vwe->func = func;
AZ(pthread_create(&vwe->timer_thread,
NULL, vwe_timeout_idle_ticker, vwe));
AZ(pthread_create(&vwe->epoll_thread, NULL, vwe_thread, vwe));
......
......@@ -52,6 +52,7 @@
struct vwk {
unsigned magic;
#define VWK_MAGIC 0x1cc2acc2
waiter_handle_f *func;
pthread_t thread;
int pipes[2];
int kq;
......@@ -127,11 +128,11 @@ vwk_sess_ev(struct vwk *vwk, const struct kevent *kp, double now)
if (kp->data > 0) {
VTAILQ_REMOVE(&vwk->sesshead, sp, list);
SES_Handle(sp, now);
vwk->func(sp, sp->fd, WAITER_ACTION, now);
return;
} else if (kp->flags & EV_EOF) {
VTAILQ_REMOVE(&vwk->sesshead, sp, list);
SES_Delete(sp, SC_REM_CLOSE, now);
vwk->func(sp, sp->fd, WAITER_REMCLOSE, now);
return;
} else {
VSL(SLT_Debug, sp->vxid,
......@@ -207,7 +208,7 @@ vwk_thread(void *priv)
break;
VTAILQ_REMOVE(&vwk->sesshead, sp, list);
// XXX: not yet (void)VTCP_linger(sp->fd, 0);
SES_Delete(sp, SC_RX_TIMEOUT, now);
vwk->func(sp, sp->fd, WAITER_TIMEOUT, now);
}
}
NEEDLESS_RETURN(NULL);
......@@ -232,10 +233,12 @@ vwk_init(waiter_handle_f *func)
{
struct vwk *vwk;
(void)func;
AN(func);
ALLOC_OBJ(vwk, VWK_MAGIC);
AN(vwk);
vwk->func = func;
VTAILQ_INIT(&vwk->sesshead);
AZ(pipe(vwk->pipes));
......
......@@ -45,6 +45,7 @@
struct vwp {
unsigned magic;
#define VWP_MAGIC 0x4b2cc735
waiter_handle_f *func;
int pipes[2];
pthread_t poll_thread;
struct pollfd *pollfd;
......@@ -159,12 +160,12 @@ vwp_main(void *priv)
vwp->pollfd[fd].revents = 0;
VTAILQ_REMOVE(&vwp->sesshead, sp, list);
vwp_unpoll(vwp, fd);
SES_Handle(sp, now);
vwp->func(sp, sp->fd, WAITER_ACTION, now);
} else if (sp->t_idle <= deadline) {
VTAILQ_REMOVE(&vwp->sesshead, sp, list);
vwp_unpoll(vwp, fd);
// XXX: not yet (void)VTCP_linger(sp->fd, 0);
SES_Delete(sp, SC_RX_TIMEOUT, now);
vwp->func(sp, sp->fd, WAITER_TIMEOUT, now);
}
}
if (v2 && vwp->pollfd[vwp->pipes[0]].revents) {
......@@ -209,12 +210,14 @@ vwp_poll_init(waiter_handle_f *func)
{
struct vwp *vwp;
(void)func;
AN(func);
ALLOC_OBJ(vwp, VWP_MAGIC);
AN(vwp);
VTAILQ_INIT(&vwp->sesshead);
AZ(pipe(vwp->pipes));
vwp->func = func;
AZ(VFIL_nonblocking(vwp->pipes[1]));
vwp_pollspace(vwp, 256);
......
......@@ -50,6 +50,7 @@
struct vws {
unsigned magic;
#define VWS_MAGIC 0x0b771473
waiter_handle_f *func;
pthread_t ports_thread;
int dport;
VTAILQ_HEAD(,sess) sesshead;
......@@ -86,7 +87,7 @@ vws_port_ev(struct vws *vws, port_event_t *ev, double now) {
if(ev->portev_events & POLLERR) {
vws_del(vws, sp->fd);
VTAILQ_REMOVE(&vws->sesshead, sp, list);
SES_Delete(sp, SC_REM_CLOSE, now);
vws->func(sp, sp->fd, WAITER_REMCLOSE);
return;
}
......@@ -106,8 +107,8 @@ vws_port_ev(struct vws *vws, port_event_t *ev, double now) {
vws_del(vws, sp->fd);
VTAILQ_REMOVE(&vws->sesshead, sp, list);
/* SES_Handle will also handle errors */
SES_Handle(sp, now);
/* also handle errors */
vws->func(sp, sp->fd, WAITER_ACTION);
}
return;
}
......@@ -211,7 +212,7 @@ vws_thread(void *priv)
if(sp->fd != -1) {
vws_del(vws, sp->fd);
}
SES_Delete(sp, SC_RX_TIMEOUT, now);
vws->func(sp, sp->fd, WAITER_TIMEOUT);
}
/*
......@@ -260,9 +261,10 @@ vws_init(waiter_handle_f *func)
{
struct vws *vws;
(void)func
AN(func);
ALLOC_OBJ(vws, VWS_MAGIC);
AN(vws);
vws->func = func;
VTAILQ_INIT(&vws->sesshead);
AZ(pthread_create(&vws->ports_thread, NULL, vws_thread, vws));
return (vws);
......
......@@ -36,7 +36,7 @@ enum wait_event {
WAITER_ACTION
};
typedef void waiter_handle_f(void *ptr, int fd, enum wait_event);
typedef void waiter_handle_f(void *ptr, int fd, enum wait_event, double now);
typedef void* waiter_init_f(waiter_handle_f *);
typedef int waiter_pass_f(void *priv, struct sess *);
......@@ -48,6 +48,8 @@ struct waiter {
waiter_pass_f *pass;
};
waiter_handle_f SES_Handle;
/* cache_waiter.c */
int WAIT_Enter(struct sess *sp);
void WAIT_Init(waiter_handle_f *);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment