Commit 684467d9 authored by Poul-Henning Kamp's avatar Poul-Henning Kamp

Move all the acceptor related stuff out of cache_pool.c and

into cache_acceptor.c where it belongs.
parent eff28c6d
......@@ -704,9 +704,9 @@ struct sess {
/* cache_acceptor.c */
void VCA_Init(void);
void VCA_Shutdown(void);
int VCA_Accept(struct listen_sock *ls, struct wrk_accept *wa);
const char *VCA_SetupSess(struct worker *w, struct sess *sp);
void VCA_FailSess(struct worker *w);
void VCA_New_SessPool(struct pool *pp, struct sesspool *sp);
/* cache_backend_cfg.c */
void VBE_InitCfg(void);
......@@ -987,10 +987,12 @@ const char *sess_close_2str(enum sess_close sc, int want_desc);
/* cache_pool.c */
void Pool_Init(void);
void Pool_Accept(void);
void Pool_Work_Thread(struct pool *, struct worker *w);
int Pool_Task(struct pool *pp, struct pool_task *task, enum pool_how how);
int Pool_Task_Arg(struct worker *, task_func_t *,
const void *arg, size_t arg_len);
void Pool_Sumstat(struct worker *w);
int Pool_TrySumstat(struct worker *wrk);
void Pool_PurgeStat(unsigned nobj);
#define V1L_IsReleased(w) ((w)->v1l == NULL)
......
......@@ -29,9 +29,6 @@
* This source file has the various trickery surrounding the accept/listen
* sockets.
*
* The actual acceptance is done from cache_pool.c, by calling
* into VCA_Accept() in this file.
*
* Once the session is allocated we move into it with a call to
* VCA_SetupSess().
*
......@@ -56,9 +53,17 @@
#include "vtim.h"
static pthread_t VCA_thread;
static int hack_ready;
static double vca_pace = 0.0;
static struct lock pace_mtx;
static unsigned pool_accepting;
struct poolsock {
unsigned magic;
#define POOLSOCK_MAGIC 0x1b0a2d38
struct listen_sock *lsock;
struct pool_task task;
struct sesspool *sesspool;
};
/*--------------------------------------------------------------------
* TCP options we want to control
......@@ -268,49 +273,6 @@ vca_pace_good(void)
Lck_Unlock(&pace_mtx);
}
/*--------------------------------------------------------------------
* Accept on a listen socket, and handle error returns.
*
* Called from a worker thread from a pool
*/
int
VCA_Accept(struct listen_sock *ls, struct wrk_accept *wa)
{
int i;
CHECK_OBJ_NOTNULL(ls, LISTEN_SOCK_MAGIC);
vca_pace_check();
while(!hack_ready)
(void)usleep(100*1000);
wa->acceptaddrlen = sizeof wa->acceptaddr;
do {
i = accept(ls->sock, (void*)&wa->acceptaddr,
&wa->acceptaddrlen);
} while (i < 0 && errno == EAGAIN);
if (i < 0) {
switch (errno) {
case ECONNABORTED:
break;
case EMFILE:
VSL(SLT_Debug, ls->sock, "Too many open files");
vca_pace_bad();
break;
default:
VSL(SLT_Debug, ls->sock, "Accept failed: %s",
strerror(errno));
vca_pace_bad();
break;
}
}
wa->acceptlsock = ls;
wa->acceptsock = i;
return (i);
}
/*--------------------------------------------------------------------
* Fail a session
*
......@@ -365,6 +327,103 @@ VCA_SetupSess(struct worker *wrk, struct sess *sp)
return (retval);
}
/*--------------------------------------------------------------------
* Nobody is accepting on this socket, so we do.
*
* As long as we can stick the accepted connection to another thread
* we do so, otherwise we put the socket back on the "BACK" queue
* and handle the new connection ourselves.
*
* We store data about the accept in reserved workspace on the reserved
* worker workspace. SES_pool_accept_task() knows about this.
*/
static void __match_proto__(task_func_t)
vca_accept_task(struct worker *wrk, void *arg)
{
struct wrk_accept wa;
struct poolsock *ps;
struct listen_sock *ls;
int i;
CHECK_OBJ_NOTNULL(wrk, WORKER_MAGIC);
CAST_OBJ_NOTNULL(ps, arg, POOLSOCK_MAGIC);
ls = ps->lsock;
CHECK_OBJ_NOTNULL(ls, LISTEN_SOCK_MAGIC);
/* Delay until we are ready (flag is set when all
* initialization has finished) */
while (!pool_accepting)
VTIM_sleep(.1);
while (1) {
INIT_OBJ(&wa, WRK_ACCEPT_MAGIC);
wa.sesspool = ps->sesspool;
wa.acceptlsock = ls;
assert(ls->sock > 0); // We know where stdin is
vca_pace_check();
wa.acceptaddrlen = sizeof wa.acceptaddr;
do {
i = accept(ls->sock, (void*)&wa.acceptaddr,
&wa.acceptaddrlen);
} while (i < 0 && errno == EAGAIN);
if (i < 0) {
switch (errno) {
case ECONNABORTED:
break;
case EMFILE:
VSL(SLT_Debug, ls->sock, "Too many open files");
vca_pace_bad();
break;
default:
VSL(SLT_Debug, ls->sock, "Accept failed: %s",
strerror(errno));
vca_pace_bad();
break;
}
wrk->stats->sess_fail++;
(void)Pool_TrySumstat(wrk);
continue;
}
wa.acceptsock = i;
if (!Pool_Task_Arg(wrk, SES_pool_accept_task, &wa, sizeof wa)) {
AZ(Pool_Task(wrk->pool, &ps->task, POOL_QUEUE_BACK));
return;
}
/*
* We were able to hand off, so release this threads VCL
* reference (if any) so we don't hold on to discarded VCLs.
*/
if (wrk->vcl != NULL)
VCL_Rel(&wrk->vcl);
}
}
void
VCA_New_SessPool(struct pool *pp, struct sesspool *sp)
{
struct listen_sock *ls;
struct poolsock *ps;
VTAILQ_FOREACH(ls, &heritage.socks, list) {
assert(ls->sock > 0); // We know where stdin is
ALLOC_OBJ(ps, POOLSOCK_MAGIC);
AN(ps);
ps->lsock = ls;
ps->task.func = vca_accept_task;
ps->task.priv = ps;
ps->sesspool = sp;
AZ(Pool_Task(pp, &ps->task, POOL_QUEUE_BACK));
}
}
/*--------------------------------------------------------------------*/
static void *
......@@ -380,8 +439,7 @@ vca_acct(void *arg)
(void)vca_tcp_opt_init();
VTAILQ_FOREACH(ls, &heritage.socks, list) {
if (ls->sock < 0)
continue;
assert (ls->sock > 0); // We know where stdin is
AZ(listen(ls->sock, cache_param->listen_depth));
vca_tcp_opt_set(ls->sock, 1);
if (cache_param->accept_filter) {
......@@ -393,18 +451,15 @@ vca_acct(void *arg)
}
}
hack_ready = 1;
pool_accepting = 1;
need_test = 1;
t0 = VTIM_real();
while (1) {
(void)sleep(1);
if (vca_tcp_opt_init()) {
VTAILQ_FOREACH(ls, &heritage.socks, list) {
if (ls->sock < 0)
continue;
VTAILQ_FOREACH(ls, &heritage.socks, list)
vca_tcp_opt_set(ls->sock, 1);
}
}
now = VTIM_real();
VSC_C_main->uptime = (uint64_t)(now - t0);
......@@ -412,7 +467,6 @@ vca_acct(void *arg)
NEEDLESS_RETURN(NULL);
}
/*--------------------------------------------------------------------*/
static void
......@@ -444,7 +498,7 @@ ccf_listen_address(struct cli *cli, const char * const *av, void *priv)
* a race where varnishtest::client would attempt to connect(2)
* before listen(2) has been called.
*/
while(!hack_ready)
while(!pool_accepting)
(void)usleep(100*1000);
VTAILQ_FOREACH(ls, &heritage.socks, list) {
......
......@@ -245,8 +245,6 @@ child_main(void)
if (FEATURE(FEATURE_WAIT_SILO))
SMP_Ready();
Pool_Accept();
CLI_Run();
BAN_Shutdown();
......
......@@ -38,20 +38,11 @@
#include <stdlib.h>
#include "cache.h"
#include "common/heritage.h"
#include "vtim.h"
VTAILQ_HEAD(taskhead, pool_task);
struct poolsock {
unsigned magic;
#define POOLSOCK_MAGIC 0x1b0a2d38
struct listen_sock *lsock;
struct pool_task task;
struct sesspool *sesspool;
};
/* Number of work requests queued in excess of worker threads available */
struct pool {
......@@ -78,7 +69,6 @@ struct pool {
static struct lock pool_mtx;
static pthread_t thr_pool_herder;
static unsigned pool_accepting = 0;
static struct lock wstat_mtx;
......@@ -110,7 +100,7 @@ Pool_Sumstat(struct worker *wrk)
memset(wrk->stats, 0, sizeof *wrk->stats);
}
static int
int
Pool_TrySumstat(struct worker *wrk)
{
if (Lck_Trylock(&wstat_mtx))
......@@ -185,8 +175,9 @@ pool_getidleworker(struct pool *pp)
* Return one if another thread was scheduled, otherwise zero.
*/
static int
Pool_Task_Arg(struct worker *wrk, const void *arg, size_t arg_len)
int
Pool_Task_Arg(struct worker *wrk, task_func_t *func,
const void *arg, size_t arg_len)
{
struct pool *pp;
struct worker *wrk2;
......@@ -212,67 +203,13 @@ Pool_Task_Arg(struct worker *wrk, const void *arg, size_t arg_len)
assert(arg_len == WS_Reserve(wrk2->aws, arg_len));
memcpy(wrk2->aws->f, arg, arg_len);
wrk2->task.func = SES_pool_accept_task;
wrk2->task.func = func;
wrk2->task.priv = wrk2->aws->f;
if (retval)
AZ(pthread_cond_signal(&wrk2->cond));
return (retval);
}
/*--------------------------------------------------------------------
* Nobody is accepting on this socket, so we do.
*
* As long as we can stick the accepted connection to another thread
* we do so, otherwise we put the socket back on the "BACK" queue
* and handle the new connection ourselves.
*
* We store data about the accept in reserved workspace on the reserved
* worker workspace. SES_pool_accept_task() knows about this.
*/
static void __match_proto__(task_func_t)
pool_accept(struct worker *wrk, void *arg)
{
struct wrk_accept wa;
struct poolsock *ps;
CHECK_OBJ_NOTNULL(wrk, WORKER_MAGIC);
CAST_OBJ_NOTNULL(ps, arg, POOLSOCK_MAGIC);
CHECK_OBJ_NOTNULL(ps->lsock, LISTEN_SOCK_MAGIC);
/* Delay until we are ready (flag is set when all
* initialization has finished) */
while (!pool_accepting)
VTIM_sleep(.1);
while (1) {
INIT_OBJ(&wa, WRK_ACCEPT_MAGIC);
wa.sesspool = ps->sesspool;
assert(ps->lsock->sock > 0); // We know where stdin is
if (VCA_Accept(ps->lsock, &wa) < 0) {
wrk->stats->sess_fail++;
/* We're going to pace in vca anyway... */
(void)Pool_TrySumstat(wrk);
continue;
}
if (!Pool_Task_Arg(wrk, &wa, sizeof wa)) {
AZ(Pool_Task(wrk->pool, &ps->task, POOL_QUEUE_BACK));
return;
}
/*
* We were able to hand off, so release this threads VCL
* reference (if any) so we don't hold on to discarded VCLs.
*/
if (wrk->vcl != NULL)
VCL_Rel(&wrk->vcl);
}
}
/*--------------------------------------------------------------------
* Enter a new task to be done
*/
......@@ -596,8 +533,6 @@ static struct pool *
pool_mkpool(unsigned pool_no)
{
struct pool *pp;
struct listen_sock *ls;
struct poolsock *ps;
ALLOC_OBJ(pp, POOL_MAGIC);
if (pp == NULL)
......@@ -617,17 +552,6 @@ pool_mkpool(unsigned pool_no)
pp->sesspool = SES_NewPool(pp, pool_no);
AN(pp->sesspool);
VTAILQ_FOREACH(ls, &heritage.socks, list) {
assert(ls->sock > 0); // We know where stdin is
ALLOC_OBJ(ps, POOLSOCK_MAGIC);
AN(ps);
ps->lsock = ls;
ps->task.func = pool_accept;
ps->task.priv = ps;
ps->sesspool = pp->sesspool;
AZ(Pool_Task(pp, &ps->task, POOL_QUEUE_BACK));
}
return (pp);
}
......@@ -672,14 +596,6 @@ pool_poolherder(void *priv)
/*--------------------------------------------------------------------*/
void
Pool_Accept(void)
{
ASSERT_CLI();
pool_accepting = 1;
}
void
Pool_Init(void)
{
......
......@@ -505,6 +505,8 @@ SES_NewPool(struct pool *wp, unsigned pool_no)
pp->mpl_sess = MPL_New(nb, &cache_param->sess_pool,
&cache_param->workspace_session);
pp->http1_waiter = Wait_New(ses_handle, &cache_param->timeout_idle);
VCA_New_SessPool(wp, pp);
return (pp);
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment