Commit 12bf085b authored by Poul-Henning Kamp's avatar Poul-Henning Kamp

Get rid of the single acceptor thread.

Instead of a single thread which accepts all sockets with a poll/accept
sequence, each thread-pool will have a thread accepting on each socket.

If no threads are available, the sockets will not be accepted on.

CAVEATS:

This commit undoubtedly leaves a number of minor issues dangling,
they will be cleaned up as we find them.

Please notice that there are changes to stats counters (some don't
work right now a nd more changes will be coming)

Changing certain acceptor-related params on the fly may not work.

Testing would be very welcome.
parent ea743433
......@@ -305,7 +305,9 @@ struct worker {
/* Accept stuff */
struct sockaddr_storage acceptaddr;
socklen_t acceptaddrlen;
int acceptsock;
struct listen_sock *acceptlsock;
struct wrw wrw;
......@@ -641,7 +643,7 @@ void VCA_Prep(struct sess *sp);
void VCA_Init(void);
void VCA_Shutdown(void);
int VCA_Accept(int sock, socklen_t *slp, struct sockaddr_storage *sap);
extern pthread_t VCA_thread;
void VCA_SetupSess(struct worker *w);
/* cache_backend.c */
void VBE_UseHealth(const struct director *vdi);
......@@ -849,12 +851,13 @@ void WRW_Sendfile(struct worker *w, int fd, off_t off, unsigned len);
/* cache_session.c [SES] */
void SES_Init(void);
struct sess *SES_New(struct sesspool *pp);
struct sess *SES_New(struct worker *wrk, struct sesspool *pp);
struct sess *SES_Alloc(void);
void SES_Close(struct sess *sp, const char *reason);
void SES_Delete(struct sess *sp, const char *reason);
void SES_Charge(struct sess *sp);
struct sesspool *SES_NewPool(void);
/* cache_shmlog.c */
void VSL_Init(void);
......
......@@ -33,6 +33,7 @@
#include <errno.h>
#include <poll.h>
#include <stdlib.h>
#include <stdio.h>
#include <sys/types.h>
#include <sys/socket.h>
......@@ -41,7 +42,7 @@
#include "cli_priv.h"
#include "cache.h"
pthread_t VCA_thread;
static pthread_t VCA_thread;
static struct timeval tv_sndtimeo;
static struct timeval tv_rcvtimeo;
......@@ -178,6 +179,7 @@ vca_pace_check(void)
static void
vca_pace_bad(void)
{
Lck_Lock(&pace_mtx);
vca_pace += params->acceptor_sleep_incr;
if (vca_pace > params->acceptor_sleep_max)
......@@ -202,13 +204,19 @@ vca_pace_good(void)
* Accept on a listen socket, and handle error returns.
*/
static int hack_ready;
int
VCA_Accept(int sock, socklen_t *slp, struct sockaddr_storage *sap)
{
int i;
assert(sock >= 0);
vca_pace_check();
while(!hack_ready)
(void)usleep(100*1000);
*slp = sizeof *sap;
i = accept(sock, (void*)sap, slp);
......@@ -234,45 +242,67 @@ VCA_Accept(int sock, socklen_t *slp, struct sockaddr_storage *sap)
/*--------------------------------------------------------------------*/
void
VCA_SetupSess(struct worker *w)
{
struct sess *sp;
sp = w->sp;
if (sp == NULL) {
AZ(close(w->acceptsock));
w->acceptsock = -1;
VSC_C_main->client_drop++;
/* XXX: 50x Reply ? */
vca_pace_bad();
INCOMPL();
}
CHECK_OBJ_NOTNULL(sp, SESS_MAGIC);
sp->fd = w->acceptsock;
sp->id = w->acceptsock;
w->acceptsock = -1;
sp->t_open = TIM_real();
sp->t_end = sp->t_end;
sp->mylsock = w->acceptlsock;
assert(w->acceptaddrlen <= sp->sockaddrlen);
memcpy(sp->sockaddr, &w->acceptaddr, w->acceptaddrlen);
sp->sockaddrlen = w->acceptaddrlen;
sp->step = STP_FIRST;
vca_pace_good();
w->sp = sp;
w->stats.client_conn++;
}
/*--------------------------------------------------------------------*/
static void *
vca_acct(void *arg)
{
struct sess *sp;
socklen_t l;
struct sockaddr_storage addr_s;
struct sockaddr *addr;
#ifdef SO_RCVTIMEO_WORKS
double sess_timeout = 0;
#endif
#ifdef SO_SNDTIMEO_WORKS
double send_timeout = 0;
#endif
int i;
struct pollfd *pfd;
struct listen_sock *ls;
unsigned u;
double t0, now;
THR_SetName("cache-acceptor");
(void)arg;
/* Set up the poll argument */
pfd = calloc(sizeof *pfd, heritage.nsocks);
AN(pfd);
i = 0;
VTAILQ_FOREACH(ls, &heritage.socks, list) {
if (ls->sock < 0)
continue;
AZ(listen(ls->sock, params->listen_depth));
AZ(setsockopt(ls->sock, SOL_SOCKET, SO_LINGER,
&linger, sizeof linger));
pfd[i].events = POLLIN;
pfd[i++].fd = ls->sock;
}
hack_ready = 1;
need_test = 1;
t0 = TIM_real();
while (1) {
(void)sleep(1);
#ifdef SO_SNDTIMEO_WORKS
if (params->send_timeout != send_timeout) {
need_test = 1;
......@@ -301,45 +331,8 @@ vca_acct(void *arg)
}
}
#endif
i = poll(pfd, heritage.nsocks, 1000);
now = TIM_real();
VSC_C_main->uptime = (uint64_t)(now - t0);
u = 0;
VTAILQ_FOREACH(ls, &heritage.socks, list) {
if (ls->sock < 0)
continue;
if (pfd[u++].revents == 0)
continue;
VSC_C_main->client_conn++;
l = sizeof addr_s;
addr = (void*)&addr_s;
i = VCA_Accept(ls->sock, &l, &addr_s);
if (i < 0)
continue;
sp = SES_New(NULL);
if (sp == NULL) {
AZ(close(i));
VSC_C_main->client_drop++;
vca_pace_bad();
continue;
}
sp->fd = i;
sp->id = i;
sp->t_open = now;
sp->t_end = now;
sp->mylsock = ls;
assert(l < sp->sockaddrlen);
memcpy(sp->sockaddr, addr, l);
sp->sockaddrlen = l;
sp->step = STP_FIRST;
if (Pool_QueueSession(sp)) {
VSC_C_main->client_drop++;
vca_pace_bad();
} else {
vca_pace_good();
}
}
}
NEEDLESS_RETURN(NULL);
}
......
......@@ -369,7 +369,6 @@ cnt_done(struct sess *sp)
if (sp->fd < 0) {
sp->wrk->stats.sess_closed++;
sp->wrk = NULL;
SES_Delete(sp, NULL);
return (1);
}
......
......@@ -509,7 +509,6 @@ hsh_rush(struct objhead *oh)
* We could not schedule the session, leave the
* rest on the busy list.
*/
VSC_C_main->client_drop_late++;
break;
}
}
......
......@@ -113,7 +113,6 @@ child_main(void)
VCL_Init();
HTTP_Init();
SES_Init();
VBE_Init();
VBP_Init();
......
......@@ -66,6 +66,7 @@ struct poolsock {
unsigned magic;
#define POOLSOCK_MAGIC 0x1b0a2d38
VTAILQ_ENTRY(poolsock) list;
struct listen_sock *lsock;
int sock;
};
......@@ -83,6 +84,7 @@ struct pool {
unsigned last_lqueue;
uintmax_t ndrop;
uintmax_t nqueue;
struct sesspool *sesspool;
};
static struct pool **wq;
......@@ -95,11 +97,46 @@ static struct lock herder_mtx;
/*--------------------------------------------------------------------*/
static void
pool_accept(struct pool *pp, struct worker *w, const struct poolsock *ps)
{
struct worker *w2;
CHECK_OBJ_NOTNULL(pp, POOL_MAGIC);
CHECK_OBJ_NOTNULL(w, WORKER_MAGIC);
CHECK_OBJ_NOTNULL(ps, POOLSOCK_MAGIC);
assert(ps->sock >= 0);
Lck_AssertHeld(&pp->mtx);
Lck_Unlock(&pp->mtx);
while (1) {
w->acceptsock =
VCA_Accept(ps->sock, &w->acceptaddrlen, &w->acceptaddr);
if (w->acceptsock == -1)
continue;
w->acceptlsock = ps->lsock;
Lck_Lock(&pp->mtx);
if (VTAILQ_EMPTY(&pp->idle))
return;
w2 = VTAILQ_FIRST(&pp->idle);
VTAILQ_REMOVE(&pp->idle, w2, list);
Lck_Unlock(&pp->mtx);
w2->acceptaddr = w->acceptaddr;
w2->acceptaddrlen = w->acceptaddrlen;
w2->acceptsock = w->acceptsock;
w2->acceptlsock = w->acceptlsock;
AZ(pthread_cond_signal(&w2->cond));
}
}
/*--------------------------------------------------------------------*/
void
Pool_Work_Thread(void *priv, struct worker *w)
{
struct pool *qp;
int stats_clean;
struct poolsock *ps;
CAST_OBJ_NOTNULL(qp, priv, POOL_MAGIC);
w->pool = qp;
......@@ -107,6 +144,9 @@ Pool_Work_Thread(void *priv, struct worker *w)
qp->nthr++;
stats_clean = 1;
while (1) {
Lck_AssertHeld(&qp->mtx);
CHECK_OBJ_NOTNULL(w->bereq, HTTP_MAGIC);
CHECK_OBJ_NOTNULL(w->beresp, HTTP_MAGIC);
CHECK_OBJ_NOTNULL(w->resp, HTTP_MAGIC);
......@@ -117,17 +157,29 @@ Pool_Work_Thread(void *priv, struct worker *w)
if (w->sp != NULL) {
VTAILQ_REMOVE(&qp->queue, w->sp, poollist);
qp->lqueue--;
} else {
} else if (VTAILQ_EMPTY(&qp->socks)) {
if (isnan(w->lastused))
w->lastused = TIM_real();
VTAILQ_INSERT_HEAD(&qp->idle, w, list);
if (!stats_clean)
WRK_SumStat(w);
Lck_CondWait(&w->cond, &qp->mtx);
} else {
ps = VTAILQ_FIRST(&qp->socks);
VTAILQ_REMOVE(&qp->socks, ps, list);
pool_accept(qp, w, ps);
Lck_AssertHeld(&qp->mtx);
VTAILQ_INSERT_TAIL(&qp->socks, ps, list);
}
if (w->sp == NULL)
if (w->sp == NULL && w->acceptsock == -1)
break;
Lck_Unlock(&qp->mtx);
if (w->sp == NULL) {
w->sp = SES_New(w, qp->sesspool);
VCA_SetupSess(w);
}
AN(w->sp);
assert(w->acceptsock == -1);
stats_clean = 0;
w->lastused = NAN;
WS_Reset(w->ws, NULL);
......@@ -221,7 +273,9 @@ Pool_QueueSession(struct sess *sp)
CHECK_OBJ_NOTNULL(sp, SESS_MAGIC);
AZ(sp->wrk);
if (WRK_Queue(sp) == 0)
return (0);
return(0);
VSC_C_main->client_drop_late++;
/*
* Couldn't queue it -- kill it.
......@@ -258,11 +312,16 @@ pool_mkpool(void)
VTAILQ_INIT(&pp->queue);
VTAILQ_INIT(&pp->idle);
VTAILQ_INIT(&pp->socks);
pp->sesspool = SES_NewPool();
AN(pp->sesspool);
VTAILQ_FOREACH(ls, &heritage.socks, list) {
if (ls->sock < 0)
continue;
ALLOC_OBJ(ps, POOLSOCK_MAGIC);
XXXAN(ps);
ps->sock = ls->sock;
ps->lsock = ls;
VTAILQ_INSERT_TAIL(&pp->socks, ps, list);
}
return (pp);
......
......@@ -28,6 +28,9 @@
*
* Session management
*
* This is a little bit of a mixed back, containing both memory management
* and various state-change functions.
*
*/
#include "config.h"
......@@ -39,7 +42,6 @@
#include <sys/socket.h>
#include "cache.h"
#include "cache_backend.h"
#include "cache_waiter.h"
/*--------------------------------------------------------------------*/
......@@ -63,16 +65,12 @@ struct sesspool {
VTAILQ_HEAD(,sessmem) freelist;
struct lock mtx;
unsigned nsess;
unsigned maxsess;
unsigned dly_free_cnt;
};
static struct sesspool *sesspool;
/*--------------------------------------------------------------------*/
static struct lock stat_mtx;
/*--------------------------------------------------------------------*/
/*--------------------------------------------------------------------
* Charge statistics from worker to request and session.
*/
void
SES_Charge(struct sess *sp)
......@@ -112,16 +110,12 @@ ses_sm_alloc(void)
hl = HTTP_estimate(nhttp);
l = sizeof *sm + nws + 2 * hl;
VSC_C_main->g_sessmem_size = l;
p = malloc(l);
if (p == NULL)
return (NULL);
q = p + l;
/* XXX Stats */
Lck_Lock(&stat_mtx);
VSC_C_main->n_sess_mem++;
Lck_Unlock(&stat_mtx);
/* Don't waste time zeroing the workspace */
memset(p, 0, l - nws);
......@@ -179,38 +173,42 @@ ses_setup(struct sessmem *sm)
*/
struct sess *
SES_New(struct sesspool *pp)
SES_New(struct worker *wrk, struct sesspool *pp)
{
struct sessmem *sm;
struct sess *sp;
int do_alloc = 0;
int do_alloc;
if (pp == NULL)
pp = sesspool;
CHECK_OBJ_NOTNULL(pp, SESSPOOL_MAGIC);
do_alloc = 0;
Lck_Lock(&pp->mtx);
sm = VTAILQ_FIRST(&pp->freelist);
if (sm != NULL) {
VTAILQ_REMOVE(&pp->freelist, sm, list);
} else if (pp->nsess < pp->maxsess) {
} else if (pp->nsess < params->max_sess) {
pp->nsess++;
do_alloc = 1;
}
wrk->stats.c_sessmem_free += pp->dly_free_cnt;
pp->dly_free_cnt = 0;
Lck_Unlock(&pp->mtx);
if (do_alloc) {
sm = ses_sm_alloc();
if (sm != NULL) {
wrk->stats.c_sessmem_alloc++;
sm->pool = pp;
ses_setup(sm);
} else {
wrk->stats.c_sessmem_fail++;
}
} else if (sm == NULL) {
wrk->stats.c_sessmem_limit++;
}
if (sm == NULL)
return (NULL);
sp = &sm->sess;
CHECK_OBJ_NOTNULL(sp, SESS_MAGIC);
/* XXX Stats */
VSC_C_main->n_sess++; /* XXX: locking ? */
return (sp);
}
......@@ -251,8 +249,7 @@ SES_Handle(struct sess *sp, int status)
break;
case 1:
sp->step = STP_START;
if (Pool_QueueSession(sp))
VSC_C_main->client_drop_late++;
(void)Pool_QueueSession(sp);
break;
default:
WRONG("Unexpected return from HTC_Rx()");
......@@ -276,16 +273,21 @@ SES_Close(struct sess *sp, const char *reason)
}
/*--------------------------------------------------------------------
* (Close &) Recycle a session. If the workspace has changed, deleted it,
* otherwise wash it, and put it up for adoption.
* (Close &) Free or Recycle a session.
*
* If the workspace has changed, deleted it, otherwise wash it, and put
* it up for adoption.
*
* XXX: We should also check nhttp
*/
void
SES_Delete(struct sess *sp, const char *reason)
{
struct acct *b = &sp->acct_ses;
struct acct *b;
struct sessmem *sm;
static char noaddr[] = "-";
struct worker *wrk;
struct sesspool *pp;
......@@ -294,6 +296,9 @@ SES_Delete(struct sess *sp, const char *reason)
CHECK_OBJ_NOTNULL(sm, SESSMEM_MAGIC);
pp = sm->pool;
CHECK_OBJ_NOTNULL(pp, SESSPOOL_MAGIC);
wrk = sp->wrk;
CHECK_OBJ_ORNULL(wrk, WORKER_MAGIC);
if (reason != NULL)
SES_Close(sp, reason);
......@@ -301,54 +306,55 @@ SES_Delete(struct sess *sp, const char *reason)
AZ(sp->obj);
AZ(sp->vcl);
VSC_C_main->n_sess--; /* XXX: locking ? */
assert(!isnan(b->first));
assert(!isnan(sp->t_end));
if (sp->addr == NULL)
sp->addr = noaddr;
if (sp->port == NULL)
sp->port = noaddr;
b = &sp->acct_ses;
assert(!isnan(b->first));
assert(!isnan(sp->t_end));
VSL(SLT_StatSess, sp->id, "%s %s %.0f %ju %ju %ju %ju %ju %ju %ju",
sp->addr, sp->port, sp->t_end - b->first,
b->sess, b->req, b->pipe, b->pass,
b->fetch, b->hdrbytes, b->bodybytes);
if (sm->workspace != params->sess_workspace) {
Lck_Lock(&stat_mtx);
VSC_C_main->n_sess_mem--;
Lck_Unlock(&stat_mtx);
if (sm->workspace != params->sess_workspace ||
pp->nsess > params->max_sess) {
free(sm);
Lck_Lock(&pp->mtx);
if (wrk != NULL)
wrk->stats.c_sessmem_free++;
else
pp->dly_free_cnt++;
pp->nsess--;
Lck_Unlock(&pp->mtx);
} else {
/* Clean and prepare for reuse */
ses_setup(sm);
Lck_Lock(&pp->mtx);
if (wrk != NULL) {
wrk->stats.c_sessmem_free += pp->dly_free_cnt;
pp->dly_free_cnt = 0;
}
VTAILQ_INSERT_HEAD(&pp->freelist, sm, list);
Lck_Unlock(&pp->mtx);
}
}
/*--------------------------------------------------------------------*/
/*--------------------------------------------------------------------
* Create a new pool to allocate from
*/
static struct sesspool *
SES_NewPool(unsigned maxsess)
struct sesspool *
SES_NewPool(void)
{
struct sesspool *sp;
ALLOC_OBJ(sp, SESSPOOL_MAGIC);
AN(sp);
VTAILQ_INIT(&sp->freelist);
Lck_New(&sp->mtx, lck_sessmem);
sp->maxsess = maxsess;
return (sp);
}
void
SES_Init()
{
sesspool = SES_NewPool(params->max_sess);
Lck_New(&stat_mtx, lck_stat);
}
......@@ -165,6 +165,7 @@ wrk_thread_real(void *priv, unsigned shm_workspace, unsigned sess_workspace,
w->bereq = HTTP_create(http0, nhttp);
w->beresp = HTTP_create(http1, nhttp);
w->resp = HTTP_create(http2, nhttp);
w->acceptsock = -1;
w->wrw.iov = iov;
w->wrw.siov = siov;
w->wrw.ciov = siov;
......
......@@ -36,7 +36,6 @@ LOCK(hsl)
LOCK(hcb)
LOCK(hcl)
LOCK(vcl)
LOCK(stat)
LOCK(sessmem)
LOCK(wstat)
LOCK(herder)
......
......@@ -241,7 +241,6 @@ open_sockets(void)
* closes before we call accept(2) and nobody else are in
* the listen queue to release us.
*/
(void)VTCP_nonblocking(ls->sock);
(void)VTCP_filter_http(ls->sock);
good++;
}
......
......@@ -41,7 +41,7 @@
#ifdef VSC_DO_MAIN
VSC_F(client_conn, uint64_t, 0, 'a', "Client connections accepted")
VSC_F(client_conn, uint64_t, 1, 'a', "Client connections accepted")
VSC_F(client_drop, uint64_t, 0, 'a',
"Connection dropped, no sess/wrk")
VSC_F(client_req, uint64_t, 1, 'a', "Client requests received")
......@@ -72,6 +72,12 @@ VSC_F(fetch_1xx, uint64_t, 1, 'a', "Fetch no body (1xx)")
VSC_F(fetch_204, uint64_t, 1, 'a', "Fetch no body (204)")
VSC_F(fetch_304, uint64_t, 1, 'a', "Fetch no body (304)")
/* Sessmem cache_session.c */
VSC_F(g_sessmem_size, uint64_t, 1, 'i', "Session mem size")
VSC_F(c_sessmem_alloc, uint64_t, 1, 'a', "Session mem allocated")
VSC_F(c_sessmem_free, uint64_t, 1, 'a', "Session mem freed")
VSC_F(c_sessmem_fail, uint64_t, 1, 'a', "Session mem alloc failed")
VSC_F(c_sessmem_limit, uint64_t, 1, 'a', "Session mem alloc limited")
VSC_F(n_sess_mem, uint64_t, 0, 'i', "N struct sess_mem")
VSC_F(n_sess, uint64_t, 0, 'i', "N struct sess")
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment