Commit 15ec79cd authored by Poul-Henning Kamp's avatar Poul-Henning Kamp

Concentrate the acceptor-setup-session code.

parent 684467d9
......@@ -311,20 +311,6 @@ struct vrt_privs {
VTAILQ_HEAD(,vrt_priv) privs;
};
/*--------------------------------------------------------------------*/
struct wrk_accept {
unsigned magic;
#define WRK_ACCEPT_MAGIC 0x8c4b4d59
/* Accept stuff */
struct sockaddr_storage acceptaddr;
socklen_t acceptaddrlen;
int acceptsock;
struct listen_sock *acceptlsock;
struct sesspool *sesspool;
};
/* Worker pool stuff -------------------------------------------------*/
typedef void task_func_t(struct worker *wrk, void *priv);
......@@ -704,8 +690,6 @@ struct sess {
/* cache_acceptor.c */
void VCA_Init(void);
void VCA_Shutdown(void);
const char *VCA_SetupSess(struct worker *w, struct sess *sp);
void VCA_FailSess(struct worker *w);
void VCA_New_SessPool(struct pool *pp, struct sesspool *sp);
/* cache_backend_cfg.c */
......@@ -1007,6 +991,7 @@ size_t V1L_Write(const struct worker *w, const void *ptr, ssize_t len);
void VRG_dorange(struct req *req, struct busyobj *bo, const char *r);
/* cache_session.c [SES] */
struct sess *SES_New(struct sesspool *);
void SES_Close(struct sess *sp, enum sess_close reason);
void SES_Wait(struct sess *sp);
void SES_Delete(struct sess *sp, enum sess_close reason, double now);
......@@ -1015,8 +1000,8 @@ void SES_DeletePool(struct sesspool *sp);
int SES_ScheduleReq(struct req *);
struct req *SES_GetReq(const struct worker *, struct sess *);
void SES_ReleaseReq(struct req *);
task_func_t SES_pool_accept_task;
void SES_vsl_socket(struct sess *sp, const char *lsockname);
void SES_sess_pool_task(struct worker *wrk, void *arg);
/* cache_shmlog.c */
extern struct VSC_C_main *VSC_C_main;
......
......@@ -29,11 +29,6 @@
* This source file has the various trickery surrounding the accept/listen
* sockets.
*
* Once the session is allocated we move into it with a call to
* VCA_SetupSess().
*
* If we fail to allocate a session we call VCA_FailSess() to clean up
* and initiate pacing.
*/
#include "config.h"
......@@ -57,6 +52,18 @@ static double vca_pace = 0.0;
static struct lock pace_mtx;
static unsigned pool_accepting;
struct wrk_accept {
unsigned magic;
#define WRK_ACCEPT_MAGIC 0x8c4b4d59
/* Accept stuff */
struct sockaddr_storage acceptaddr;
socklen_t acceptaddrlen;
int acceptsock;
struct listen_sock *acceptlsock;
struct sesspool *sesspool;
};
struct poolsock {
unsigned magic;
#define POOLSOCK_MAGIC 0x1b0a2d38
......@@ -274,45 +281,48 @@ vca_pace_good(void)
}
/*--------------------------------------------------------------------
* Fail a session
* The pool-task for a newly accepted session
*
* This happens if we accept the socket, but cannot get a session
* structure.
*
* We consider this a DoS situation (false positive: Extremely popular
* busy objects) and silently close the connection with minimum effort
* and fuzz, rather than try to send an intelligent message back.
* Called from assigned worker thread
*/
void
VCA_FailSess(struct worker *wrk)
static void __match_proto__(task_func_t)
vca_make_session(struct worker *wrk, void *arg)
{
struct sesspool *pp;
struct sess *sp;
const char *lsockname;
struct wrk_accept *wa;
CHECK_OBJ_NOTNULL(wrk, WORKER_MAGIC);
CAST_OBJ_NOTNULL(wa, (void*)wrk->aws->f, WRK_ACCEPT_MAGIC);
AZ(close(wa->acceptsock));
wrk->stats->sess_drop++;
vca_pace_bad();
WS_Release(wrk->aws, 0);
}
CAST_OBJ_NOTNULL(wa, arg, WRK_ACCEPT_MAGIC);
pp = wa->sesspool;
/*--------------------------------------------------------------------
* We have allocated a session, move our info into it.
*/
/* Turn accepted socket into a session */
AN(wrk->aws->r);
sp = SES_New(pp);
if (sp == NULL) {
/*
* We consider this a DoS situation and silently close the
* connection with minimum effort and fuzz, rather than try
* to send an intelligent message back.
*/
AZ(close(wa->acceptsock));
wrk->stats->sess_drop++;
vca_pace_bad();
WS_Release(wrk->aws, 0);
return;
}
CHECK_OBJ_NOTNULL(sp, SESS_MAGIC);
wrk->stats->s_sess++;
const char *
VCA_SetupSess(struct worker *wrk, struct sess *sp)
{
struct wrk_accept *wa;
const char *retval;
sp->t_open = VTIM_real();
sp->t_idle = sp->t_open;
sp->vxid = VXID_Get(wrk, VSL_CLIENTMARKER);
CHECK_OBJ_NOTNULL(wrk, WORKER_MAGIC);
CHECK_OBJ_NOTNULL(sp, SESS_MAGIC);
CAST_OBJ_NOTNULL(wa, (void*)wrk->aws->f, WRK_ACCEPT_MAGIC);
sp->fd = wa->acceptsock;
wa->acceptsock = -1;
retval = wa->acceptlsock->name;
lsockname = wa->acceptlsock->name;
assert(wa->acceptaddrlen <= vsa_suckaddr_len);
AN(VSA_Build(sess_remote_addr(sp), &wa->acceptaddr, wa->acceptaddrlen));
vca_pace_good();
......@@ -324,18 +334,19 @@ VCA_SetupSess(struct worker *wrk, struct sess *sp)
need_test = 0;
}
vca_tcp_opt_set(sp->fd, 0);
return (retval);
SES_vsl_socket(sp, lsockname);
wrk->task.func = SES_sess_pool_task;
wrk->task.priv = sp;
}
/*--------------------------------------------------------------------
* Nobody is accepting on this socket, so we do.
* This function accepts on a single socket for a single session pool.
*
* As long as we can stick the accepted connection to another thread
* we do so, otherwise we put the socket back on the "BACK" queue
* we do so, otherwise we put the socket back on the "BACK" pool
* and handle the new connection ourselves.
*
* We store data about the accept in reserved workspace on the reserved
* worker workspace. SES_pool_accept_task() knows about this.
*/
static void __match_proto__(task_func_t)
......@@ -361,8 +372,6 @@ vca_accept_task(struct worker *wrk, void *arg)
wa.sesspool = ps->sesspool;
wa.acceptlsock = ls;
assert(ls->sock > 0); // We know where stdin is
vca_pace_check();
wa.acceptaddrlen = sizeof wa.acceptaddr;
......@@ -392,7 +401,7 @@ vca_accept_task(struct worker *wrk, void *arg)
wa.acceptsock = i;
if (!Pool_Task_Arg(wrk, SES_pool_accept_task, &wa, sizeof wa)) {
if (!Pool_Task_Arg(wrk, vca_make_session, &wa, sizeof wa)) {
AZ(Pool_Task(wrk->pool, &ps->task, POOL_QUEUE_BACK));
return;
}
......@@ -413,7 +422,6 @@ VCA_New_SessPool(struct pool *pp, struct sesspool *sp)
struct poolsock *ps;
VTAILQ_FOREACH(ls, &heritage.socks, list) {
assert(ls->sock > 0); // We know where stdin is
ALLOC_OBJ(ps, POOLSOCK_MAGIC);
AN(ps);
ps->lsock = ls;
......@@ -451,9 +459,9 @@ vca_acct(void *arg)
}
}
need_test = 1;
pool_accepting = 1;
need_test = 1;
t0 = VTIM_real();
while (1) {
(void)sleep(1);
......@@ -502,8 +510,6 @@ ccf_listen_address(struct cli *cli, const char * const *av, void *priv)
(void)usleep(100*1000);
VTAILQ_FOREACH(ls, &heritage.socks, list) {
if (ls->sock < 0)
continue;
VTCP_myname(ls->sock, h, sizeof h, p, sizeof p);
VCLI_Out(cli, "%s %s\n", h, p);
}
......@@ -534,8 +540,6 @@ VCA_Shutdown(void)
int i;
VTAILQ_FOREACH(ls, &heritage.socks, list) {
if (ls->sock < 0)
continue;
i = ls->sock;
ls->sock = -1;
(void)close(i);
......
......@@ -589,7 +589,6 @@ cli_debug_sizeof(struct cli *cli, const char * const *av, void *priv)
SZOF(struct http_conn);
SZOF(struct acct_req);
SZOF(struct worker);
SZOF(struct wrk_accept);
SZOF(struct storage);
SZOF(struct busyobj);
SZOF(struct object);
......
......@@ -70,8 +70,8 @@ struct sesspool {
* workspace
*/
static struct sess *
ses_new(struct sesspool *pp)
struct sess *
SES_New(struct sesspool *pp)
{
struct sess *sp;
unsigned sz;
......@@ -125,8 +125,8 @@ ses_req_pool_task(struct worker *wrk, void *arg)
* Allocate a request + vxid, call ses_req_pool_task()
*/
static void __match_proto__(task_func_t)
ses_sess_pool_task(struct worker *wrk, void *arg)
void __match_proto__(task_func_t)
SES_sess_pool_task(struct worker *wrk, void *arg)
{
struct req *req;
struct sess *sp;
......@@ -153,8 +153,8 @@ ses_sess_pool_task(struct worker *wrk, void *arg)
*
*/
static void
ses_vsl_socket(struct sess *sp, const char *lsockname)
void
SES_vsl_socket(struct sess *sp, const char *lsockname)
{
struct sockaddr_storage ss;
socklen_t sl;
......@@ -184,44 +184,6 @@ ses_vsl_socket(struct sess *sp, const char *lsockname)
sp->t_open, sp->fd);
}
/*--------------------------------------------------------------------
* The pool-task for a newly accepted session
*
* Called from assigned worker thread
*/
void __match_proto__(task_func_t)
SES_pool_accept_task(struct worker *wrk, void *arg)
{
struct sesspool *pp;
struct sess *sp;
const char *lsockname;
struct wrk_accept *wa;
CHECK_OBJ_NOTNULL(wrk, WORKER_MAGIC);
CAST_OBJ_NOTNULL(wa, arg, WRK_ACCEPT_MAGIC);
pp = wa->sesspool;
/* Turn accepted socket into a session */
AN(wrk->aws->r);
sp = ses_new(pp);
if (sp == NULL) {
VCA_FailSess(wrk);
return;
}
wrk->stats->s_sess++;
sp->t_open = VTIM_real();
sp->t_idle = sp->t_open;
sp->vxid = VXID_Get(wrk, VSL_CLIENTMARKER);
lsockname = VCA_SetupSess(wrk, sp);
ses_vsl_socket(sp, lsockname);
wrk->task.func = ses_sess_pool_task;
wrk->task.priv = sp;
}
/*--------------------------------------------------------------------
* Schedule a request back on a work-thread from its sessions pool
*
......@@ -271,7 +233,7 @@ ses_handle(struct waited *wp, enum wait_event ev, double now)
pp = sp->sesspool;
CHECK_OBJ_NOTNULL(pp, SESSPOOL_MAGIC);
AN(pp->pool);
sp->task.func = ses_sess_pool_task;
sp->task.func = SES_sess_pool_task;
sp->task.priv = sp;
if (Pool_Task(pp->pool, &sp->task, POOL_QUEUE_FRONT))
SES_Delete(sp, SC_OVERLOAD, now);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment