Commit 273788ae authored by Poul-Henning Kamp's avatar Poul-Henning Kamp

Move session scheduling to new task api

parent 7fc7fa7d
...@@ -288,7 +288,7 @@ struct wrk_accept { ...@@ -288,7 +288,7 @@ struct wrk_accept {
/* Worker pool stuff -------------------------------------------------*/ /* Worker pool stuff -------------------------------------------------*/
typedef void pool_func_t(struct pool *pp, struct worker *wrk, void *priv); typedef void pool_func_t(struct worker *wrk, void *priv);
struct pool_task { struct pool_task {
VTAILQ_ENTRY(pool_task) list; VTAILQ_ENTRY(pool_task) list;
...@@ -324,8 +324,7 @@ struct worker { ...@@ -324,8 +324,7 @@ struct worker {
struct dstat stats; struct dstat stats;
/* New Pool stuff */ /* New Pool stuff */
pool_func_t *pool_func; struct pool_task task;
void *pool_priv;
/* Pool stuff */ /* Pool stuff */
enum e_do_what do_what; enum e_do_what do_what;
...@@ -914,7 +913,6 @@ void PipeSession(struct sess *sp); ...@@ -914,7 +913,6 @@ void PipeSession(struct sess *sp);
/* cache_pool.c */ /* cache_pool.c */
void Pool_Init(void); void Pool_Init(void);
void Pool_Work_Thread(void *priv, struct worker *w); void Pool_Work_Thread(void *priv, struct worker *w);
int Pool_Schedule(struct pool *pp, struct sess *sp);
int Pool_Task(struct pool *pp, struct pool_task *task, enum pool_how how); int Pool_Task(struct pool *pp, struct pool_task *task, enum pool_how how);
#define WRW_IsReleased(w) ((w)->wrw.wfd == NULL) #define WRW_IsReleased(w) ((w)->wrw.wfd == NULL)
......
...@@ -199,8 +199,8 @@ Pool_Task(struct pool *pp, struct pool_task *task, enum pool_how how) ...@@ -199,8 +199,8 @@ Pool_Task(struct pool *pp, struct pool_task *task, enum pool_how how)
if (wrk != NULL) { if (wrk != NULL) {
VTAILQ_REMOVE(&pp->idle, wrk, list); VTAILQ_REMOVE(&pp->idle, wrk, list);
Lck_Unlock(&pp->mtx); Lck_Unlock(&pp->mtx);
wrk->pool_func = task->func; wrk->task.func = task->func;
wrk->pool_priv = task->priv; wrk->task.priv = task->priv;
AZ(pthread_cond_signal(&wrk->cond)); AZ(pthread_cond_signal(&wrk->cond));
return (0); return (0);
} }
...@@ -210,7 +210,15 @@ Pool_Task(struct pool *pp, struct pool_task *task, enum pool_how how) ...@@ -210,7 +210,15 @@ Pool_Task(struct pool *pp, struct pool_task *task, enum pool_how how)
retval = -1; retval = -1;
break; break;
case POOL_QUEUE_FRONT: case POOL_QUEUE_FRONT:
VTAILQ_INSERT_TAIL(&pp->front_queue, task, list); /* If we have too much in the queue already, refuse. */
if (pp->lqueue > (cache_param->queue_max * pp->nthr) / 100) {
pp->ndropped++;
retval = -1;
} else {
VTAILQ_INSERT_TAIL(&pp->front_queue, task, list);
pp->nqueued++;
pp->lqueue++;
}
break; break;
case POOL_QUEUE_BACK: case POOL_QUEUE_BACK:
VTAILQ_INSERT_TAIL(&pp->back_queue, task, list); VTAILQ_INSERT_TAIL(&pp->back_queue, task, list);
...@@ -219,6 +227,8 @@ Pool_Task(struct pool *pp, struct pool_task *task, enum pool_how how) ...@@ -219,6 +227,8 @@ Pool_Task(struct pool *pp, struct pool_task *task, enum pool_how how)
WRONG("Unknown enum pool_how"); WRONG("Unknown enum pool_how");
} }
Lck_Unlock(&pp->mtx); Lck_Unlock(&pp->mtx);
if (how == POOL_QUEUE_FRONT && !retval)
AZ(pthread_cond_signal(&pp->herder_cond));
return (retval); return (retval);
} }
...@@ -248,8 +258,10 @@ Pool_Work_Thread(void *priv, struct worker *wrk) ...@@ -248,8 +258,10 @@ Pool_Work_Thread(void *priv, struct worker *wrk)
WS_Reset(wrk->ws, NULL); WS_Reset(wrk->ws, NULL);
tp = VTAILQ_FIRST(&pp->front_queue); tp = VTAILQ_FIRST(&pp->front_queue);
if (tp != NULL) if (tp != NULL) {
pp->lqueue--;
VTAILQ_REMOVE(&pp->front_queue, tp, list); VTAILQ_REMOVE(&pp->front_queue, tp, list);
}
if (tp == NULL) { if (tp == NULL) {
tp = VTAILQ_FIRST(&pp->back_queue); tp = VTAILQ_FIRST(&pp->back_queue);
...@@ -260,7 +272,8 @@ Pool_Work_Thread(void *priv, struct worker *wrk) ...@@ -260,7 +272,8 @@ Pool_Work_Thread(void *priv, struct worker *wrk)
if (tp != NULL) { if (tp != NULL) {
Lck_Unlock(&pp->mtx); Lck_Unlock(&pp->mtx);
AN(tp->func); AN(tp->func);
tp->func(pp, wrk, tp->priv); assert(wrk->pool == pp);
tp->func(wrk, tp->priv);
stats_clean = WRK_TrySumStat(wrk); stats_clean = WRK_TrySumStat(wrk);
Lck_Lock(&pp->mtx); Lck_Lock(&pp->mtx);
continue; continue;
...@@ -297,6 +310,17 @@ Pool_Work_Thread(void *priv, struct worker *wrk) ...@@ -297,6 +310,17 @@ Pool_Work_Thread(void *priv, struct worker *wrk)
(void)Lck_CondWait(&wrk->cond, &pp->mtx, NULL); (void)Lck_CondWait(&wrk->cond, &pp->mtx, NULL);
} }
if (wrk->task.func != NULL) {
Lck_Unlock(&pp->mtx);
assert(wrk->pool == pp);
wrk->task.func(wrk, wrk->task.priv);
wrk->task.func = NULL;
wrk->task.priv = NULL;
stats_clean = WRK_TrySumStat(wrk);
Lck_Lock(&pp->mtx);
continue;
}
if (wrk->do_what == pool_do_die) if (wrk->do_what == pool_do_die)
break; break;
...@@ -320,7 +344,8 @@ Pool_Work_Thread(void *priv, struct worker *wrk) ...@@ -320,7 +344,8 @@ Pool_Work_Thread(void *priv, struct worker *wrk)
if (wrk->do_what == pool_do_sess) { if (wrk->do_what == pool_do_sess) {
stats_clean = 0; stats_clean = 0;
SES_pool_task(pp, wrk, wrk->sp); assert(wrk->pool == pp);
SES_pool_task(wrk, wrk->sp);
} else if (wrk->do_what == pool_do_nothing) { } else if (wrk->do_what == pool_do_nothing) {
/* we already did */ /* we already did */
} else { } else {
...@@ -333,56 +358,6 @@ Pool_Work_Thread(void *priv, struct worker *wrk) ...@@ -333,56 +358,6 @@ Pool_Work_Thread(void *priv, struct worker *wrk)
wrk->pool = NULL; wrk->pool = NULL;
} }
/*--------------------------------------------------------------------
* Queue a workrequest if possible.
*
* Return zero if the request was queued, negative if it wasn't.
*/
static int
pool_queue(struct pool *pp, struct sess *sp)
{
struct worker *wrk;
Lck_Lock(&pp->mtx);
/* If there are idle threads, we tickle the first one into action */
wrk = VTAILQ_FIRST(&pp->idle);
if (wrk != NULL) {
VTAILQ_REMOVE(&pp->idle, wrk, list);
Lck_Unlock(&pp->mtx);
wrk->sp = sp;
wrk->do_what = pool_do_sess;
AZ(pthread_cond_signal(&wrk->cond));
return (0);
}
/* If we have too much in the queue already, refuse. */
if (pp->lqueue > (cache_param->queue_max * pp->nthr) / 100) {
pp->ndropped++;
Lck_Unlock(&pp->mtx);
return (-1);
}
VTAILQ_INSERT_TAIL(&pp->queue, sp, list);
pp->nqueued++;
pp->lqueue++;
Lck_Unlock(&pp->mtx);
AZ(pthread_cond_signal(&pp->herder_cond));
return (0);
}
/*--------------------------------------------------------------------*/
int
Pool_Schedule(struct pool *pp, struct sess *sp)
{
CHECK_OBJ_NOTNULL(sp, SESS_MAGIC);
AZ(sp->wrk);
return(pool_queue(pp, sp));
}
/*-------------------------------------------------------------------- /*--------------------------------------------------------------------
* Create another thread, if necessary & possible * Create another thread, if necessary & possible
*/ */
......
...@@ -133,19 +133,20 @@ SES_Alloc(void) ...@@ -133,19 +133,20 @@ SES_Alloc(void)
*/ */
void void
SES_pool_task(struct pool *pp, struct worker *wrk, void *arg) SES_pool_task(struct worker *wrk, void *arg)
{ {
struct sess *sp; struct sess *sp;
AN(pp);
CHECK_OBJ_NOTNULL(wrk, WORKER_MAGIC); CHECK_OBJ_NOTNULL(wrk, WORKER_MAGIC);
CAST_OBJ_NOTNULL(sp, arg, SESS_MAGIC); CAST_OBJ_NOTNULL(sp, arg, SESS_MAGIC);
AZ(wrk->ws->r); AZ(wrk->ws->r);
wrk->lastused = NAN; wrk->lastused = NAN;
THR_SetSession(sp); THR_SetSession(sp);
// AZ(wrk->sp); if (wrk->sp == NULL)
// wrk->sp = sp; wrk->sp = sp;
else
assert(wrk->sp == sp);
AZ(sp->wrk); AZ(sp->wrk);
sp->wrk = wrk; sp->wrk = wrk;
CNT_Session(sp); CNT_Session(sp);
...@@ -178,7 +179,11 @@ SES_Schedule(struct sess *sp) ...@@ -178,7 +179,11 @@ SES_Schedule(struct sess *sp)
CHECK_OBJ_NOTNULL(pp, SESSPOOL_MAGIC); CHECK_OBJ_NOTNULL(pp, SESSPOOL_MAGIC);
AN(pp->pool); AN(pp->pool);
if (Pool_Schedule(pp->pool, sp)) { AZ(sp->wrk);
sp->task.func = SES_pool_task;
sp->task.priv = sp;
if (Pool_Task(pp->pool, &sp->task, POOL_QUEUE_FRONT)) {
VSC_C_main->client_drop_late++; VSC_C_main->client_drop_late++;
sp->t_idle = VTIM_real(); sp->t_idle = VTIM_real();
if (sp->req != NULL && sp->req->vcl != NULL) { if (sp->req != NULL && sp->req->vcl != NULL) {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment