Commit 56c353f7 authored by Poul-Henning Kamp's avatar Poul-Henning Kamp

Give session memory pools a back pointer to their (thread) pool.

Schedule sessions from busy-lists and from the waiter via
SES_Schedule(), to Pool_Schedule() in the right pool.
parent 6810e7c3
...@@ -841,9 +841,9 @@ void PipeSession(struct sess *sp); ...@@ -841,9 +841,9 @@ void PipeSession(struct sess *sp);
/* cache_pool.c */ /* cache_pool.c */
void Pool_Init(void); void Pool_Init(void);
int Pool_QueueSession(struct sess *sp);
void Pool_Work_Thread(void *priv, struct worker *w); void Pool_Work_Thread(void *priv, struct worker *w);
void Pool_Wait(struct sess *sp); void Pool_Wait(struct sess *sp);
int Pool_Schedule(struct pool *pp, struct sess *sp);
#define WRW_IsReleased(w) ((w)->wrw.wfd == NULL) #define WRW_IsReleased(w) ((w)->wrw.wfd == NULL)
int WRW_Error(const struct worker *w); int WRW_Error(const struct worker *w);
...@@ -864,8 +864,10 @@ struct sess *SES_Alloc(void); ...@@ -864,8 +864,10 @@ struct sess *SES_Alloc(void);
void SES_Close(struct sess *sp, const char *reason); void SES_Close(struct sess *sp, const char *reason);
void SES_Delete(struct sess *sp, const char *reason); void SES_Delete(struct sess *sp, const char *reason);
void SES_Charge(struct sess *sp); void SES_Charge(struct sess *sp);
struct sesspool *SES_NewPool(void); struct sesspool *SES_NewPool(struct pool *pp);
void SES_DeletePool(struct sesspool *sp, struct worker *wrk); void SES_DeletePool(struct sesspool *sp, struct worker *wrk);
int SES_Schedule(struct sess *sp);
/* cache_shmlog.c */ /* cache_shmlog.c */
void VSL_Init(void); void VSL_Init(void);
......
...@@ -504,7 +504,7 @@ hsh_rush(struct objhead *oh) ...@@ -504,7 +504,7 @@ hsh_rush(struct objhead *oh)
AZ(sp->wrk); AZ(sp->wrk);
VTAILQ_REMOVE(&wl->list, sp, list); VTAILQ_REMOVE(&wl->list, sp, list);
DSL(0x20, SLT_Debug, sp->id, "off waiting list"); DSL(0x20, SLT_Debug, sp->id, "off waiting list");
if (Pool_QueueSession(sp)) { if (SES_Schedule(sp)) {
/* /*
* We could not schedule the session, leave the * We could not schedule the session, leave the
* rest on the busy list. * rest on the busy list.
......
...@@ -261,47 +261,33 @@ Pool_Work_Thread(void *priv, struct worker *w) ...@@ -261,47 +261,33 @@ Pool_Work_Thread(void *priv, struct worker *w)
*/ */
static int static int
WRK_Queue(struct sess *sp) WRK_Queue(struct pool *pp, struct sess *sp)
{ {
struct worker *w; struct worker *w;
struct pool *qp;
static unsigned nq = 0;
unsigned onq;
/* Lck_Lock(&pp->mtx);
* Select which pool we issue to
* XXX: better alg ?
* XXX: per CPU ?
*/
onq = nq + 1;
if (onq >= nwq)
onq = 0;
qp = wq[onq];
nq = onq;
Lck_Lock(&qp->mtx);
/* If there are idle threads, we tickle the first one into action */ /* If there are idle threads, we tickle the first one into action */
w = VTAILQ_FIRST(&qp->idle); w = VTAILQ_FIRST(&pp->idle);
if (w != NULL) { if (w != NULL) {
VTAILQ_REMOVE(&qp->idle, w, list); VTAILQ_REMOVE(&pp->idle, w, list);
Lck_Unlock(&qp->mtx); Lck_Unlock(&pp->mtx);
w->sp = sp; w->sp = sp;
AZ(pthread_cond_signal(&w->cond)); AZ(pthread_cond_signal(&w->cond));
return (0); return (0);
} }
/* If we have too much in the queue already, refuse. */ /* If we have too much in the queue already, refuse. */
if (qp->lqueue > queue_max) { if (pp->lqueue > queue_max) {
qp->ndrop++; pp->ndrop++;
Lck_Unlock(&qp->mtx); Lck_Unlock(&pp->mtx);
return (-1); return (-1);
} }
VTAILQ_INSERT_TAIL(&qp->queue, sp, poollist); VTAILQ_INSERT_TAIL(&pp->queue, sp, poollist);
qp->nqueue++; pp->nqueue++;
qp->lqueue++; pp->lqueue++;
Lck_Unlock(&qp->mtx); Lck_Unlock(&pp->mtx);
AZ(pthread_cond_signal(&herder_cond)); AZ(pthread_cond_signal(&herder_cond));
return (0); return (0);
} }
...@@ -309,11 +295,11 @@ WRK_Queue(struct sess *sp) ...@@ -309,11 +295,11 @@ WRK_Queue(struct sess *sp)
/*--------------------------------------------------------------------*/ /*--------------------------------------------------------------------*/
int int
Pool_QueueSession(struct sess *sp) Pool_Schedule(struct pool *pp, struct sess *sp)
{ {
CHECK_OBJ_NOTNULL(sp, SESS_MAGIC); CHECK_OBJ_NOTNULL(sp, SESS_MAGIC);
AZ(sp->wrk); AZ(sp->wrk);
if (WRK_Queue(sp) == 0) if (WRK_Queue(pp, sp) == 0)
return(0); return(0);
VSC_C_main->client_drop_late++; VSC_C_main->client_drop_late++;
...@@ -332,7 +318,6 @@ Pool_QueueSession(struct sess *sp) ...@@ -332,7 +318,6 @@ Pool_QueueSession(struct sess *sp)
*/ */
VCL_Rel(&sp->vcl); VCL_Rel(&sp->vcl);
} }
SES_Delete(sp, "dropped");
return (1); return (1);
} }
...@@ -353,7 +338,7 @@ pool_mkpool(void) ...@@ -353,7 +338,7 @@ pool_mkpool(void)
VTAILQ_INIT(&pp->queue); VTAILQ_INIT(&pp->queue);
VTAILQ_INIT(&pp->idle); VTAILQ_INIT(&pp->idle);
VTAILQ_INIT(&pp->socks); VTAILQ_INIT(&pp->socks);
pp->sesspool = SES_NewPool(); pp->sesspool = SES_NewPool(pp);
AN(pp->sesspool); AN(pp->sesspool);
VTAILQ_FOREACH(ls, &heritage.socks, list) { VTAILQ_FOREACH(ls, &heritage.socks, list) {
......
...@@ -64,6 +64,7 @@ struct sessmem { ...@@ -64,6 +64,7 @@ struct sessmem {
struct sesspool { struct sesspool {
unsigned magic; unsigned magic;
#define SESSPOOL_MAGIC 0xd916e202 #define SESSPOOL_MAGIC 0xd916e202
struct pool *pool;
VTAILQ_HEAD(,sessmem) freelist; VTAILQ_HEAD(,sessmem) freelist;
struct lock mtx; struct lock mtx;
unsigned nsess; unsigned nsess;
...@@ -236,6 +237,35 @@ SES_Alloc(void) ...@@ -236,6 +237,35 @@ SES_Alloc(void)
return (sp); return (sp);
} }
/*--------------------------------------------------------------------
* Schedule a session back on a work-thread from its pool
*/
int
SES_Schedule(struct sess *sp)
{
struct sessmem *sm;
struct sesspool *pp;
CHECK_OBJ_NOTNULL(sp, SESS_MAGIC);
AZ(sp->wrk);
sm = sp->mem;
CHECK_OBJ_NOTNULL(sm, SESSMEM_MAGIC);
pp = sm->pool;
CHECK_OBJ_NOTNULL(pp, SESSPOOL_MAGIC);
AN(pp->pool);
if (Pool_Schedule(pp->pool, sp)) {
SES_Delete(sp, "dropped");
return (1);
}
return (0);
}
/*-------------------------------------------------------------------- /*--------------------------------------------------------------------
* Handle a session (from waiter) * Handle a session (from waiter)
* *
...@@ -255,7 +285,7 @@ SES_Handle(struct sess *sp, int status) ...@@ -255,7 +285,7 @@ SES_Handle(struct sess *sp, int status)
break; break;
case 1: case 1:
sp->step = STP_START; sp->step = STP_START;
(void)Pool_QueueSession(sp); (void)SES_Schedule(sp);
break; break;
default: default:
WRONG("Unexpected return from HTC_Rx()"); WRONG("Unexpected return from HTC_Rx()");
...@@ -355,12 +385,13 @@ SES_Delete(struct sess *sp, const char *reason) ...@@ -355,12 +385,13 @@ SES_Delete(struct sess *sp, const char *reason)
*/ */
struct sesspool * struct sesspool *
SES_NewPool(void) SES_NewPool(struct pool *pp)
{ {
struct sesspool *sp; struct sesspool *sp;
ALLOC_OBJ(sp, SESSPOOL_MAGIC); ALLOC_OBJ(sp, SESSPOOL_MAGIC);
AN(sp); AN(sp);
sp->pool = pp;
VTAILQ_INIT(&sp->freelist); VTAILQ_INIT(&sp->freelist);
Lck_New(&sp->mtx, lck_sessmem); Lck_New(&sp->mtx, lck_sessmem);
return (sp); return (sp);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment