Commit b04b83ba authored by Poul-Henning Kamp's avatar Poul-Henning Kamp

Move the accept sockets into the new pool-task API and GC all the

cruft now not needed.
parent ac787244
......@@ -304,14 +304,6 @@ enum pool_how {
/*--------------------------------------------------------------------*/
enum e_do_what {
pool_do_inval = 0,
pool_do_sess,
pool_do_accept,
pool_do_nothing,
pool_do_die,
};
struct worker {
unsigned magic;
#define WORKER_MAGIC 0x6391adcf
......@@ -323,12 +315,8 @@ struct worker {
void *nhashpriv;
struct dstat stats;
/* New Pool stuff */
struct pool_task task;
/* Pool stuff */
enum e_do_what do_what;
double lastused;
struct wrw wrw;
......@@ -938,7 +926,6 @@ int SES_Schedule(struct sess *sp);
void SES_Handle(struct sess *sp, double now);
void SES_GetReq(struct sess *sp);
void SES_ReleaseReq(struct sess *sp);
pool_func_t SES_pool_task;
pool_func_t SES_pool_accept_task;
/* cache_shmlog.c */
......
......@@ -85,8 +85,8 @@ VTAILQ_HEAD(taskhead, pool_task);
struct poolsock {
unsigned magic;
#define POOLSOCK_MAGIC 0x1b0a2d38
VTAILQ_ENTRY(poolsock) list;
struct listen_sock *lsock;
struct pool_task task;
};
/* Number of work requests queued in excess of worker threads available */
......@@ -104,7 +104,6 @@ struct pool {
struct taskhead idle_queue;
struct taskhead front_queue;
struct taskhead back_queue;
VTAILQ_HEAD(, poolsock) socks;
unsigned nthr;
unsigned lqueue;
unsigned last_lqueue;
......@@ -126,6 +125,7 @@ pool_getidleworker(const struct pool *pp, int back)
struct worker *wrk;
CHECK_OBJ_NOTNULL(pp, POOL_MAGIC);
Lck_AssertHeld(&pp->mtx);
if (back)
pt = VTAILQ_LAST(&pp->idle_queue, taskhead);
else
......@@ -141,28 +141,27 @@ pool_getidleworker(const struct pool *pp, int back)
* Nobody is accepting on this socket, so we do.
*
* As long as we can stick the accepted connection to another thread
* we do so, otherwise we return and handle it ourselves.
*
* Notice calling convention: Called locked and returns locked, but
* works lock in the meantime.
* we do so, otherwise we put the socket back on the "BACK" queue
* and handle the new connection ourselves.
*
* We store data about the accept in reserved workspace, it is only used
* for a brief moment and it takes up around 144 bytes.
* We store data about the accept in reserved workspace on the reserved
* worker workspace. SES_pool_accept_task() knows about this.
*/
static int
pool_accept(struct pool *pp, struct worker *wrk, const struct poolsock *ps)
static void
pool_accept(struct worker *wrk, void *arg)
{
struct worker *wrk2;
struct wrk_accept *wa, *wa2;
struct pool *pp;
struct poolsock *ps;
CHECK_OBJ_NOTNULL(pp, POOL_MAGIC);
CHECK_OBJ_NOTNULL(wrk, WORKER_MAGIC);
CHECK_OBJ_NOTNULL(ps, POOLSOCK_MAGIC);
pp = wrk->pool;
CHECK_OBJ_NOTNULL(pp, POOL_MAGIC);
CAST_OBJ_NOTNULL(ps, arg, POOLSOCK_MAGIC);
CHECK_OBJ_NOTNULL(ps->lsock, LISTEN_SOCK_MAGIC);
Lck_AssertHeld(&pp->mtx);
Lck_Unlock(&pp->mtx);
assert(sizeof *wa == WS_Reserve(wrk->ws, sizeof *wa));
wa = (void*)wrk->ws->f;
while (1) {
......@@ -171,8 +170,9 @@ pool_accept(struct pool *pp, struct worker *wrk, const struct poolsock *ps)
if (ps->lsock->sock < 0) {
/* Socket Shutdown */
Lck_Lock(&pp->mtx);
return (-1);
FREE_OBJ(ps);
WS_Release(wrk->ws, 0);
return;
}
if (VCA_Accept(ps->lsock, wa) < 0) {
wrk->stats.sess_fail++;
......@@ -183,8 +183,13 @@ pool_accept(struct pool *pp, struct worker *wrk, const struct poolsock *ps)
Lck_Lock(&pp->mtx);
wrk2 = pool_getidleworker(pp, 0);
if (wrk2 == NULL)
return (0);
if (wrk2 == NULL) {
/* No idle threads, do it ourselves */
Lck_Unlock(&pp->mtx);
AZ(Pool_Task(pp, &ps->task, POOL_QUEUE_BACK));
SES_pool_accept_task(wrk, pp->sesspool);
return;
}
VTAILQ_REMOVE(&pp->idle_queue, &wrk2->task, list);
Lck_Unlock(&pp->mtx);
assert(sizeof *wa2 == WS_Reserve(wrk2->ws, sizeof *wa2));
......@@ -248,7 +253,7 @@ Pool_Task(struct pool *pp, struct pool_task *task, enum pool_how how)
WRONG("Unknown enum pool_how");
}
Lck_Unlock(&pp->mtx);
if (how == POOL_QUEUE_FRONT && !retval)
if (retval)
AZ(pthread_cond_signal(&pp->herder_cond));
return (retval);
}
......@@ -261,18 +266,14 @@ void
Pool_Work_Thread(void *priv, struct worker *wrk)
{
struct pool *pp;
int stats_clean, i;
struct poolsock *ps;
int stats_clean;
struct pool_task *tp;
CAST_OBJ_NOTNULL(pp, priv, POOL_MAGIC);
wrk->pool = pp;
Lck_Lock(&pp->mtx);
stats_clean = 1;
while (1) {
Lck_AssertHeld(&pp->mtx);
wrk->do_what = pool_do_inval;
Lck_Lock(&pp->mtx);
CHECK_OBJ_NOTNULL(wrk, WORKER_MAGIC);
......@@ -282,39 +283,13 @@ Pool_Work_Thread(void *priv, struct worker *wrk)
if (tp != NULL) {
pp->lqueue--;
VTAILQ_REMOVE(&pp->front_queue, tp, list);
}
if (tp == NULL) {
} else {
tp = VTAILQ_FIRST(&pp->back_queue);
if (tp != NULL)
VTAILQ_REMOVE(&pp->back_queue, tp, list);
}
if (tp != NULL) {
Lck_Unlock(&pp->mtx);
AN(tp->func);
assert(wrk->pool == pp);
tp->func(wrk, tp->priv);
stats_clean = WRK_TrySumStat(wrk);
Lck_Lock(&pp->mtx);
continue;
}
if (!VTAILQ_EMPTY(&pp->socks)) {
/* Accept on a socket */
ps = VTAILQ_FIRST(&pp->socks);
VTAILQ_REMOVE(&pp->socks, ps, list);
i = pool_accept(pp, wrk, ps);
Lck_AssertHeld(&pp->mtx);
if (i < 0) {
/* Socket Shutdown */
FREE_OBJ(ps);
WS_Release(wrk->ws, 0);
continue;
}
VTAILQ_INSERT_TAIL(&pp->socks, ps, list);
wrk->do_what = pool_do_accept;
} else {
if (tp == NULL) {
/* Nothing to do: To sleep, perchance to dream ... */
if (isnan(wrk->lastused))
wrk->lastused = VTIM_real();
......@@ -324,45 +299,17 @@ Pool_Work_Thread(void *priv, struct worker *wrk)
if (!stats_clean)
WRK_SumStat(wrk);
(void)Lck_CondWait(&wrk->cond, &pp->mtx, NULL);
tp = &wrk->task;
}
if (wrk->task.func != NULL) {
Lck_Unlock(&pp->mtx);
assert(wrk->pool == pp);
wrk->task.func(wrk, wrk->task.priv);
wrk->task.func = NULL;
wrk->task.priv = NULL;
stats_clean = WRK_TrySumStat(wrk);
Lck_Lock(&pp->mtx);
continue;
}
if (wrk->do_what == pool_do_die)
break;
Lck_Unlock(&pp->mtx);
if (wrk->do_what == pool_do_accept) {
SES_pool_accept_task(wrk, pp->sesspool);
if (wrk->sp == NULL)
wrk->do_what = pool_do_nothing;
else
wrk->do_what = pool_do_sess;
}
if (tp->func == NULL)
break;
if (wrk->do_what == pool_do_sess) {
stats_clean = 0;
assert(wrk->pool == pp);
SES_pool_task(wrk, wrk->sp);
} else if (wrk->do_what == pool_do_nothing) {
/* we already did */
} else {
WRONG("Invalid wrk->do_what");
}
assert(wrk->pool == pp);
tp->func(wrk, tp->priv);
stats_clean = WRK_TrySumStat(wrk);
Lck_Lock(&pp->mtx);
}
Lck_Unlock(&pp->mtx);
wrk->pool = NULL;
}
......@@ -474,11 +421,10 @@ pool_herder(void *priv)
VSC_C_main->sess_dropped += pp->ndropped;
pp->nqueued = pp->ndropped = 0;
wrk = pool_getidleworker(pp, 1);
if (wrk != NULL &&
(wrk->lastused < t_idle ||
if (wrk != NULL && (wrk->lastused < t_idle ||
pp->nthr > cache_param->wthread_max)) {
VTAILQ_REMOVE(&pp->idle_queue, &wrk->task, list);
} else
} else
wrk = NULL;
Lck_Unlock(&pp->mtx);
......@@ -490,7 +436,8 @@ pool_herder(void *priv)
VSC_C_main->threads_destroyed++;
Lck_Unlock(&pool_mtx);
AZ(wrk->sp);
wrk->do_what = pool_do_die;
wrk->task.func = NULL;
wrk->task.priv = NULL;
AZ(pthread_cond_signal(&wrk->cond));
}
}
......@@ -514,7 +461,6 @@ pool_mkpool(unsigned pool_no)
Lck_New(&pp->mtx, lck_wq);
VTAILQ_INIT(&pp->idle_queue);
VTAILQ_INIT(&pp->socks);
VTAILQ_INIT(&pp->front_queue);
VTAILQ_INIT(&pp->back_queue);
pp->sesspool = SES_NewPool(pp, pool_no);
......@@ -526,7 +472,9 @@ pool_mkpool(unsigned pool_no)
ALLOC_OBJ(ps, POOLSOCK_MAGIC);
XXXAN(ps);
ps->lsock = ls;
VTAILQ_INSERT_TAIL(&pp->socks, ps, list);
ps->task.func = pool_accept;
ps->task.priv = ps;
AZ(Pool_Task(pp, &ps->task, POOL_QUEUE_BACK));
}
AZ(pthread_condattr_init(&cv_attr));
......
......@@ -127,40 +127,12 @@ SES_Alloc(void)
return (sp);
}
/*--------------------------------------------------------------------
* The pool-task for a newly accepted session
*/
void
SES_pool_accept_task(struct worker *wrk, void *arg)
{
struct sesspool *pp;
CHECK_OBJ_NOTNULL(wrk, WORKER_MAGIC);
CAST_OBJ_NOTNULL(pp, arg, SESSPOOL_MAGIC);
/* Turn accepted socket into a session */
AZ(wrk->sp);
AN(wrk->ws->r);
wrk->sp = ses_new(pp);
if (wrk->sp == NULL) {
VCA_FailSess(wrk);
return;
}
VCA_SetupSess(wrk);
wrk->sp->step = STP_FIRST;
WS_Release(wrk->ws, 0);
SES_pool_task(wrk, wrk->sp);
}
/*--------------------------------------------------------------------
* The pool-task function for sessions
*/
void
SES_pool_task(struct worker *wrk, void *arg)
static void
ses_pool_task(struct worker *wrk, void *arg)
{
struct sess *sp;
......@@ -191,6 +163,32 @@ SES_pool_task(struct worker *wrk, void *arg)
}
}
/*--------------------------------------------------------------------
* The pool-task for a newly accepted session
*/
void
SES_pool_accept_task(struct worker *wrk, void *arg)
{
struct sesspool *pp;
CHECK_OBJ_NOTNULL(wrk, WORKER_MAGIC);
CAST_OBJ_NOTNULL(pp, arg, SESSPOOL_MAGIC);
/* Turn accepted socket into a session */
AZ(wrk->sp);
AN(wrk->ws->r);
wrk->sp = ses_new(pp);
if (wrk->sp == NULL) {
VCA_FailSess(wrk);
return;
}
VCA_SetupSess(wrk);
wrk->sp->step = STP_FIRST;
WS_Release(wrk->ws, 0);
ses_pool_task(wrk, wrk->sp);
}
/*--------------------------------------------------------------------
* Schedule a session back on a work-thread from its pool
*/
......@@ -207,7 +205,7 @@ SES_Schedule(struct sess *sp)
AN(pp->pool);
AZ(sp->wrk);
sp->task.func = SES_pool_task;
sp->task.func = ses_pool_task;
sp->task.priv = sp;
if (Pool_Task(pp->pool, &sp->task, POOL_QUEUE_FRONT)) {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment