Commit eff28c6d authored by Poul-Henning Kamp's avatar Poul-Henning Kamp

Generalize the "special" way we use to schedule accepted sockets

onto a worker thread (falling back to the current (=acceptor) thread
if the pool is empty.
parent 96f5732e
...@@ -322,6 +322,7 @@ struct wrk_accept { ...@@ -322,6 +322,7 @@ struct wrk_accept {
socklen_t acceptaddrlen; socklen_t acceptaddrlen;
int acceptsock; int acceptsock;
struct listen_sock *acceptlsock; struct listen_sock *acceptlsock;
struct sesspool *sesspool;
}; };
/* Worker pool stuff -------------------------------------------------*/ /* Worker pool stuff -------------------------------------------------*/
......
...@@ -49,6 +49,7 @@ struct poolsock { ...@@ -49,6 +49,7 @@ struct poolsock {
#define POOLSOCK_MAGIC 0x1b0a2d38 #define POOLSOCK_MAGIC 0x1b0a2d38
struct listen_sock *lsock; struct listen_sock *lsock;
struct pool_task task; struct pool_task task;
struct sesspool *sesspool;
}; };
/* Number of work requests queued in excess of worker threads available */ /* Number of work requests queued in excess of worker threads available */
...@@ -177,6 +178,47 @@ pool_getidleworker(struct pool *pp) ...@@ -177,6 +178,47 @@ pool_getidleworker(struct pool *pp)
return (wrk); return (wrk);
} }
/*--------------------------------------------------------------------
* Special scheduling: If no thread can be found, the current thread
* will be prepared for rescheduling instead.
* The selected threads workspace is reserved and the argument put there.
* Return one if another thread was scheduled, otherwise zero.
*/
static int
Pool_Task_Arg(struct worker *wrk, const void *arg, size_t arg_len)
{
struct pool *pp;
struct worker *wrk2;
int retval;
CHECK_OBJ_NOTNULL(wrk, WORKER_MAGIC);
AN(arg);
AN(arg_len);
pp = wrk->pool;
CHECK_OBJ_NOTNULL(pp, POOL_MAGIC);
Lck_Lock(&pp->mtx);
wrk2 = pool_getidleworker(pp);
if (wrk2 != NULL) {
VTAILQ_REMOVE(&pp->idle_queue, &wrk2->task, list);
retval = 1;
} else {
wrk2 = wrk;
retval = 0;
}
Lck_Unlock(&pp->mtx);
AZ(wrk2->task.func);
assert(arg_len == WS_Reserve(wrk2->aws, arg_len));
memcpy(wrk2->aws->f, arg, arg_len);
wrk2->task.func = SES_pool_accept_task;
wrk2->task.priv = wrk2->aws->f;
if (retval)
AZ(pthread_cond_signal(&wrk2->cond));
return (retval);
}
/*-------------------------------------------------------------------- /*--------------------------------------------------------------------
* Nobody is accepting on this socket, so we do. * Nobody is accepting on this socket, so we do.
* *
...@@ -191,19 +233,13 @@ pool_getidleworker(struct pool *pp) ...@@ -191,19 +233,13 @@ pool_getidleworker(struct pool *pp)
static void __match_proto__(task_func_t) static void __match_proto__(task_func_t)
pool_accept(struct worker *wrk, void *arg) pool_accept(struct worker *wrk, void *arg)
{ {
struct worker *wrk2; struct wrk_accept wa;
struct wrk_accept *wa, *wa2;
struct pool *pp;
struct poolsock *ps; struct poolsock *ps;
CHECK_OBJ_NOTNULL(wrk, WORKER_MAGIC); CHECK_OBJ_NOTNULL(wrk, WORKER_MAGIC);
pp = wrk->pool;
CHECK_OBJ_NOTNULL(pp, POOL_MAGIC);
CAST_OBJ_NOTNULL(ps, arg, POOLSOCK_MAGIC); CAST_OBJ_NOTNULL(ps, arg, POOLSOCK_MAGIC);
CHECK_OBJ_NOTNULL(ps->lsock, LISTEN_SOCK_MAGIC); CHECK_OBJ_NOTNULL(ps->lsock, LISTEN_SOCK_MAGIC);
assert(sizeof *wa == WS_Reserve(wrk->aws, sizeof *wa));
wa = (void*)wrk->aws->f;
/* Delay until we are ready (flag is set when all /* Delay until we are ready (flag is set when all
* initialization has finished) */ * initialization has finished) */
...@@ -211,36 +247,22 @@ pool_accept(struct worker *wrk, void *arg) ...@@ -211,36 +247,22 @@ pool_accept(struct worker *wrk, void *arg)
VTIM_sleep(.1); VTIM_sleep(.1);
while (1) { while (1) {
INIT_OBJ(wa, WRK_ACCEPT_MAGIC); INIT_OBJ(&wa, WRK_ACCEPT_MAGIC);
wa.sesspool = ps->sesspool;
assert(ps->lsock->sock > 0); // We know where stdin is assert(ps->lsock->sock > 0); // We know where stdin is
if (VCA_Accept(ps->lsock, wa) < 0) { if (VCA_Accept(ps->lsock, &wa) < 0) {
wrk->stats->sess_fail++; wrk->stats->sess_fail++;
/* We're going to pace in vca anyway... */ /* We're going to pace in vca anyway... */
(void)Pool_TrySumstat(wrk); (void)Pool_TrySumstat(wrk);
continue; continue;
} }
Lck_Lock(&pp->mtx); if (!Pool_Task_Arg(wrk, &wa, sizeof wa)) {
wrk2 = pool_getidleworker(pp); AZ(Pool_Task(wrk->pool, &ps->task, POOL_QUEUE_BACK));
if (wrk2 == NULL) {
/* No idle threads, do it ourselves */
Lck_Unlock(&pp->mtx);
AZ(Pool_Task(pp, &ps->task, POOL_QUEUE_BACK));
wrk->task.func = SES_pool_accept_task;
wrk->task.priv = pp->sesspool;
return; return;
} }
VTAILQ_REMOVE(&pp->idle_queue, &wrk2->task, list);
AZ(wrk2->task.func);
assert(sizeof *wa2 == WS_Reserve(wrk2->aws, sizeof *wa2));
wa2 = (void*)wrk2->aws->f;
memcpy(wa2, wa, sizeof *wa);
wrk2->task.func = SES_pool_accept_task;
wrk2->task.priv = pp->sesspool;
Lck_Unlock(&pp->mtx);
AZ(pthread_cond_signal(&wrk2->cond));
/* /*
* We were able to hand off, so release this threads VCL * We were able to hand off, so release this threads VCL
...@@ -589,11 +611,12 @@ pool_mkpool(unsigned pool_no) ...@@ -589,11 +611,12 @@ pool_mkpool(unsigned pool_no)
VTAILQ_INIT(&pp->idle_queue); VTAILQ_INIT(&pp->idle_queue);
VTAILQ_INIT(&pp->front_queue); VTAILQ_INIT(&pp->front_queue);
VTAILQ_INIT(&pp->back_queue); VTAILQ_INIT(&pp->back_queue);
pp->sesspool = SES_NewPool(pp, pool_no);
AN(pp->sesspool);
AZ(pthread_cond_init(&pp->herder_cond, NULL)); AZ(pthread_cond_init(&pp->herder_cond, NULL));
AZ(pthread_create(&pp->herder_thr, NULL, pool_herder, pp)); AZ(pthread_create(&pp->herder_thr, NULL, pool_herder, pp));
pp->sesspool = SES_NewPool(pp, pool_no);
AN(pp->sesspool);
VTAILQ_FOREACH(ls, &heritage.socks, list) { VTAILQ_FOREACH(ls, &heritage.socks, list) {
assert(ls->sock > 0); // We know where stdin is assert(ls->sock > 0); // We know where stdin is
ALLOC_OBJ(ps, POOLSOCK_MAGIC); ALLOC_OBJ(ps, POOLSOCK_MAGIC);
...@@ -601,6 +624,7 @@ pool_mkpool(unsigned pool_no) ...@@ -601,6 +624,7 @@ pool_mkpool(unsigned pool_no)
ps->lsock = ls; ps->lsock = ls;
ps->task.func = pool_accept; ps->task.func = pool_accept;
ps->task.priv = ps; ps->task.priv = ps;
ps->sesspool = pp->sesspool;
AZ(Pool_Task(pp, &ps->task, POOL_QUEUE_BACK)); AZ(Pool_Task(pp, &ps->task, POOL_QUEUE_BACK));
} }
......
...@@ -196,9 +196,11 @@ SES_pool_accept_task(struct worker *wrk, void *arg) ...@@ -196,9 +196,11 @@ SES_pool_accept_task(struct worker *wrk, void *arg)
struct sesspool *pp; struct sesspool *pp;
struct sess *sp; struct sess *sp;
const char *lsockname; const char *lsockname;
struct wrk_accept *wa;
CHECK_OBJ_NOTNULL(wrk, WORKER_MAGIC); CHECK_OBJ_NOTNULL(wrk, WORKER_MAGIC);
CAST_OBJ_NOTNULL(pp, arg, SESSPOOL_MAGIC); CAST_OBJ_NOTNULL(wa, arg, WRK_ACCEPT_MAGIC);
pp = wa->sesspool;
/* Turn accepted socket into a session */ /* Turn accepted socket into a session */
AN(wrk->aws->r); AN(wrk->aws->r);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment