Commit edf5144f authored by Poul-Henning Kamp's avatar Poul-Henning Kamp

Add the new pool-task API so Martin can see it.

Presently unused and untested.
parent 29fcad40
......@@ -286,6 +286,22 @@ struct wrk_accept {
struct listen_sock *acceptlsock;
};
/* Worker pool stuff -------------------------------------------------*/
typedef void pool_func_t(struct pool *pp, void *priv);
struct pool_task {
VTAILQ_ENTRY(pool_task) list;
pool_func_t *func;
void *priv;
};
enum pool_how {
POOL_NO_QUEUE,
POOL_QUEUE_FRONT,
POOL_QUEUE_BACK
};
/*--------------------------------------------------------------------*/
enum e_do_what {
......@@ -307,6 +323,10 @@ struct worker {
void *nhashpriv;
struct dstat stats;
/* New Pool stuff */
pool_func_t *pool_func;
void *pool_priv;
/* Pool stuff */
enum e_do_what do_what;
......@@ -648,6 +668,7 @@ struct sess {
struct worker *wrk;
struct req *req;
struct pool_task task;
VTAILQ_ENTRY(sess) list;
/* Session related fields ------------------------------------*/
......@@ -894,6 +915,7 @@ void PipeSession(struct sess *sp);
void Pool_Init(void);
void Pool_Work_Thread(void *priv, struct worker *w);
int Pool_Schedule(struct pool *pp, struct sess *sp);
int Pool_Task(struct pool *pp, struct pool_task *task, enum pool_how how);
#define WRW_IsReleased(w) ((w)->wrw.wfd == NULL)
int WRW_Error(const struct worker *w);
......
......@@ -92,24 +92,26 @@ struct poolsock {
/* Number of work requests queued in excess of worker threads available */
struct pool {
unsigned magic;
#define POOL_MAGIC 0x606658fa
VTAILQ_ENTRY(pool) list;
pthread_cond_t herder_cond;
struct lock herder_mtx;
pthread_t herder_thr;
struct lock mtx;
struct workerhead idle;
VTAILQ_HEAD(, sess) queue;
VTAILQ_HEAD(, poolsock) socks;
unsigned nthr;
unsigned lqueue;
unsigned last_lqueue;
uintmax_t ndropped;
uintmax_t nqueued;
struct sesspool *sesspool;
unsigned magic;
#define POOL_MAGIC 0x606658fa
VTAILQ_ENTRY(pool) list;
pthread_cond_t herder_cond;
struct lock herder_mtx;
pthread_t herder_thr;
struct lock mtx;
struct workerhead idle;
VTAILQ_HEAD(, pool_task) front_queue;
VTAILQ_HEAD(, pool_task) back_queue;
VTAILQ_HEAD(, sess) queue;
VTAILQ_HEAD(, poolsock) socks;
unsigned nthr;
unsigned lqueue;
unsigned last_lqueue;
uintmax_t ndropped;
uintmax_t nqueued;
struct sesspool *sesspool;
};
static struct lock pool_mtx;
......@@ -173,6 +175,51 @@ pool_accept(struct pool *pp, struct worker *wrk, const struct poolsock *ps)
}
}
/*--------------------------------------------------------------------
* Enter a new task to be done
*/
int
Pool_Task(struct pool *pp, struct pool_task *task, enum pool_how how)
{
struct worker *wrk;
int retval = 0;
CHECK_OBJ_NOTNULL(pp, POOL_MAGIC);
Lck_Lock(&pp->mtx);
/*
* The common case first: Take an idle thread, do it.
*/
wrk = VTAILQ_FIRST(&pp->idle);
if (wrk != NULL) {
VTAILQ_REMOVE(&pp->idle, wrk, list);
Lck_Unlock(&pp->mtx);
wrk->pool_func = task->func;
wrk->pool_priv = task->priv;
AZ(pthread_cond_signal(&wrk->cond));
return (0);
}
switch (how) {
case POOL_NO_QUEUE:
retval = -1;
break;
case POOL_QUEUE_FRONT:
VTAILQ_INSERT_TAIL(&pp->front_queue, task, list);
break;
case POOL_QUEUE_BACK:
VTAILQ_INSERT_TAIL(&pp->back_queue, task, list);
break;
default:
WRONG("Unknown enum pool_how");
}
Lck_Unlock(&pp->mtx);
return (retval);
}
/*--------------------------------------------------------------------
* This is the work function for worker threads in the pool.
*/
......@@ -183,6 +230,7 @@ Pool_Work_Thread(void *priv, struct worker *wrk)
struct pool *pp;
int stats_clean, i;
struct poolsock *ps;
struct pool_task *tp;
CAST_OBJ_NOTNULL(pp, priv, POOL_MAGIC);
wrk->pool = pp;
......@@ -197,6 +245,24 @@ Pool_Work_Thread(void *priv, struct worker *wrk)
WS_Reset(wrk->ws, NULL);
tp = VTAILQ_FIRST(&pp->front_queue);
if (tp != NULL)
VTAILQ_REMOVE(&pp->front_queue, tp, list);
if (tp == NULL) {
tp = VTAILQ_FIRST(&pp->back_queue);
if (tp != NULL)
VTAILQ_REMOVE(&pp->back_queue, tp, list);
}
if (tp != NULL) {
Lck_Unlock(&pp->mtx);
tp->func(pp, tp->priv);
stats_clean = WRK_TrySumStat(wrk);
Lck_Lock(&pp->mtx);
continue;
}
wrk->sp = VTAILQ_FIRST(&pp->queue);
if (wrk->sp != NULL) {
/* Process queued requests, if any */
......@@ -483,6 +549,8 @@ pool_mkpool(unsigned pool_no)
VTAILQ_INIT(&pp->queue);
VTAILQ_INIT(&pp->idle);
VTAILQ_INIT(&pp->socks);
VTAILQ_INIT(&pp->front_queue);
VTAILQ_INIT(&pp->back_queue);
pp->sesspool = SES_NewPool(pp, pool_no);
AN(pp->sesspool);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment