Commit b1dd6679 authored by Poul-Henning Kamp's avatar Poul-Henning Kamp

Add a new mode to server instance 'b0', where it acts like "a normal

webserver" and dispatches a thread for each incomming connection.

This is an example use:

	server s0 {
		loop 10 {
			rxreq
			txresp -body "foo1"
		}
		rxreq
		txresp -hdr "Connection: close" -body "foo1"
		expect_close
	} -dispatch

Each connection will spawn a dynamically created s%d instance
starting with s1, s2 ...

Each of these will respond to 11 requests on the accepted connection,
the last response will have "Connection: close" and they will
expect the other end (varnish) to do that.

The main limitation on using this for bulk traffic tests is the
finite and limited size of the vtc_log which varnishtest will
collect (half a megabyte).

Test b00048 is about as much traffic as will fit in the vtc_log.
parent 51f4c23e
varnishtest "Run a lot of transactions through"
server s0 {
loop 10 {
rxreq
txresp -body "foo1"
}
rxreq
txresp -hdr "Connection: close" -body "foo1"
expect_close
} -dispatch
varnish v1 -arg "-p waiter=poll" -vcl+backend {
sub vcl_recv {
return (pass);
}
sub vcl_backend_fetch {
set bereq.backend = s0;
}
} -start
client c1 {
loop 20 {
txreq -url /c1
rxresp
expect resp.status == 200
}
} -start
client c2 {
loop 20 {
txreq -url /c2
rxresp
expect resp.status == 200
}
} -start
client c3 {
loop 20 {
txreq -url /c3
rxresp
expect resp.status == 200
}
} -start
client c1 -wait
client c2 -wait
client c3 -wait
......@@ -643,6 +643,7 @@ exec_file(const char *fn, const char *script, const char *tmpdir,
init_macro();
init_sema();
init_server();
/* Apply extmacro definitions */
VTAILQ_FOREACH(m, &extmacro_list, list)
......
......@@ -71,6 +71,7 @@ extern int iflg;
extern unsigned vtc_maxdur;
void init_sema(void);
void init_server(void);
int http_process(struct vtclog *vl, const char *spec, int sock, int *sfd);
......
......@@ -54,6 +54,7 @@ struct server {
int depth;
int sock;
int fd;
char listen[256];
char aaddr[32];
char aport[32];
......@@ -61,9 +62,84 @@ struct server {
pthread_t tp;
};
static pthread_mutex_t server_mtx;
static VTAILQ_HEAD(, server) servers =
VTAILQ_HEAD_INITIALIZER(servers);
/**********************************************************************
* Allocate and initialize a server
*/
static struct server *
server_new(const char *name)
{
struct server *s;
AN(name);
ALLOC_OBJ(s, SERVER_MAGIC);
AN(s);
REPLACE(s->name, name);
s->vl = vtc_logopen(s->name);
AN(s->vl);
bprintf(s->listen, "%s", "127.0.0.1 0");
s->repeat = 1;
s->depth = 10;
s->sock = -1;
s->fd = -1;
AZ(pthread_mutex_lock(&server_mtx));
VTAILQ_INSERT_TAIL(&servers, s, list);
AZ(pthread_mutex_unlock(&server_mtx));
return (s);
}
/**********************************************************************
* Clean up a server
*/
static void
server_delete(struct server *s)
{
CHECK_OBJ_NOTNULL(s, SERVER_MAGIC);
macro_undef(s->vl, s->name, "addr");
macro_undef(s->vl, s->name, "port");
macro_undef(s->vl, s->name, "sock");
vtc_logclose(s->vl);
free(s->name);
/* XXX: MEMLEAK (?) (VSS ??) */
FREE_OBJ(s);
}
/**********************************************************************
* Server listen
*/
static void
server_listen(struct server *s)
{
const char *err;
CHECK_OBJ_NOTNULL(s, SERVER_MAGIC);
if (s->sock >= 0)
VTCP_close(&s->sock);
s->sock = VTCP_listen_on(s->listen, "0", s->depth, &err);
if (err != NULL)
vtc_log(s->vl, 0,
"Server listen address (%s) cannot be resolved: %s",
s->listen, err);
assert(s->sock > 0);
VTCP_myname(s->sock, s->aaddr, sizeof s->aaddr,
s->aport, sizeof s->aport);
macro_def(s->vl, s->name, "addr", "%s", s->aaddr);
macro_def(s->vl, s->name, "port", "%s", s->aport);
macro_def(s->vl, s->name, "sock", "%s %s", s->aaddr, s->aport);
/* Record the actual port, and reuse it on subsequent starts */
bprintf(s->listen, "%s %s", s->aaddr, s->aport);
}
/**********************************************************************
* Server thread
*/
......@@ -83,7 +159,7 @@ server_thread(void *priv)
vl = vtc_logopen(s->name);
vtc_log(vl, 2, "Started on %s %s", s->aaddr, s->aport);
vtc_log(vl, 2, "Started on %s", s->listen);
for (i = 0; i < s->repeat; i++) {
if (s->repeat > 1)
vtc_log(vl, 3, "Iteration %d", i);
......@@ -104,80 +180,92 @@ server_thread(void *priv)
return (NULL);
}
/**********************************************************************
* Allocate and initialize a server
* Start the server thread
*/
static struct server *
server_new(const char *name)
static void
server_start(struct server *s)
{
struct server *s;
AN(name);
ALLOC_OBJ(s, SERVER_MAGIC);
AN(s);
REPLACE(s->name, name);
s->vl = vtc_logopen(name);
AN(s->vl);
if (*s->name != 's')
vtc_log(s->vl, 0, "Server name must start with 's'");
bprintf(s->listen, "%s", "127.0.0.1 0");
s->repeat = 1;
s->depth = 10;
s->sock = -1;
VTAILQ_INSERT_TAIL(&servers, s, list);
return (s);
CHECK_OBJ_NOTNULL(s, SERVER_MAGIC);
vtc_log(s->vl, 2, "Starting server");
server_listen(s);
vtc_log(s->vl, 1, "Listen on %s", s->listen);
s->run = 1;
AZ(pthread_create(&s->tp, NULL, server_thread, s));
}
/**********************************************************************
* Clean up a server
*/
static void
server_delete(struct server *s)
static void *
server_dispatch_wrk(void *priv)
{
struct server *s;
struct vtclog *vl;
int j, fd;
CHECK_OBJ_NOTNULL(s, SERVER_MAGIC);
macro_undef(s->vl, s->name, "addr");
macro_undef(s->vl, s->name, "port");
macro_undef(s->vl, s->name, "sock");
vtc_logclose(s->vl);
free(s->name);
/* XXX: MEMLEAK (?) (VSS ??) */
FREE_OBJ(s);
CAST_OBJ_NOTNULL(s, priv, SERVER_MAGIC);
assert(s->sock < 0);
vl = vtc_logopen(s->name);
fd = s->fd;
vtc_log(vl, 3, "start with fd %d", fd);
fd = http_process(vl, s->spec, fd, &s->sock);
vtc_log(vl, 3, "shutting fd %d", fd);
j = shutdown(fd, SHUT_WR);
if (!VTCP_Check(j))
vtc_log(vl, 0, "Shutdown failed: %s", strerror(errno));
VTCP_close(&s->fd);
vtc_log(vl, 2, "Ending");
return (NULL);
}
/**********************************************************************
* Start the server thread
*/
static void *
server_dispatch_thread(void *priv)
{
struct server *s, *s2;
int sn = 1, fd;
char snbuf[8];
struct vtclog *vl;
struct sockaddr_storage addr_s;
struct sockaddr *addr;
socklen_t l;
CAST_OBJ_NOTNULL(s, priv, SERVER_MAGIC);
assert(s->sock >= 0);
vl = vtc_logopen(s->name);
vtc_log(vl, 2, "Dispatch started on %s", s->listen);
while (1) {
addr = (void*)&addr_s;
l = sizeof addr_s;
fd = accept(s->sock, addr, &l);
if (fd < 0)
vtc_log(vl, 0, "Accepted failed: %s", strerror(errno));
bprintf(snbuf, "s%d", sn++);
vtc_log(vl, 3, "dispatch fd %d -> %s", fd, snbuf);
s2 = server_new(snbuf);
s2->spec = s->spec;
strcpy(s2->listen, s->listen);
s2->fd = fd;
s2->run = 1;
AZ(pthread_create(&s2->tp, NULL, server_dispatch_wrk, s2));
}
}
static void
server_start(struct server *s)
server_dispatch(struct server *s)
{
const char *err;
CHECK_OBJ_NOTNULL(s, SERVER_MAGIC);
vtc_log(s->vl, 2, "Starting server");
if (s->sock < 0) {
s->sock = VTCP_listen_on(s->listen, "0", s->depth, &err);
if (err != NULL)
vtc_log(s->vl, 0,
"Server listen address (%s) cannot be resolved: %s",
s->listen, err);
assert(s->sock > 0);
VTCP_myname(s->sock, s->aaddr, sizeof s->aaddr,
s->aport, sizeof s->aport);
macro_def(s->vl, s->name, "addr", "%s", s->aaddr);
macro_def(s->vl, s->name, "port", "%s", s->aport);
macro_def(s->vl, s->name, "sock", "%s %s", s->aaddr, s->aport);
/* Record the actual port, and reuse it on subsequent starts */
bprintf(s->listen, "%s %s", s->aaddr, s->aport);
}
vtc_log(s->vl, 1, "Listen on %s", s->listen);
server_listen(s);
vtc_log(s->vl, 2, "Starting dispatch server");
s->run = 1;
AZ(pthread_create(&s->tp, NULL, server_thread, s));
AZ(pthread_create(&s->tp, NULL, server_dispatch_thread, s));
}
/**********************************************************************
......@@ -207,7 +295,7 @@ server_wait(struct server *s)
void *res;
CHECK_OBJ_NOTNULL(s, SERVER_MAGIC);
vtc_log(s->vl, 2, "Waiting for server");
vtc_log(s->vl, 2, "Waiting for server (%d/%d)", s->sock, s->fd);
AZ(pthread_join(s->tp, &res));
if (res != NULL && !vtc_stop)
vtc_log(s->vl, 0, "Server returned \"%p\"",
......@@ -225,11 +313,13 @@ cmd_server_genvcl(struct vsb *vsb)
{
struct server *s;
AZ(pthread_mutex_lock(&server_mtx));
VTAILQ_FOREACH(s, &servers, list) {
VSB_printf(vsb,
"backend %s { .host = \"%s\"; .port = \"%s\"; }\n",
s->name, s->aaddr, s->aport);
}
AZ(pthread_mutex_unlock(&server_mtx));
}
......@@ -240,7 +330,7 @@ cmd_server_genvcl(struct vsb *vsb)
void
cmd_server(CMD_ARGS)
{
struct server *s, *s2;
struct server *s;
(void)priv;
(void)cmd;
......@@ -248,9 +338,15 @@ cmd_server(CMD_ARGS)
if (av == NULL) {
/* Reset and free */
VTAILQ_FOREACH_SAFE(s, &servers, list, s2) {
CHECK_OBJ_NOTNULL(s, SERVER_MAGIC);
VTAILQ_REMOVE(&servers, s, list);
while (1) {
AZ(pthread_mutex_lock(&server_mtx));
s = VTAILQ_FIRST(&servers);
CHECK_OBJ_ORNULL(s, SERVER_MAGIC);
if (s != NULL)
VTAILQ_REMOVE(&servers, s, list);
AZ(pthread_mutex_unlock(&server_mtx));
if (s == NULL)
break;
if (s->run) {
(void)pthread_cancel(s->tp);
server_wait(s);
......@@ -265,9 +361,17 @@ cmd_server(CMD_ARGS)
AZ(strcmp(av[0], "server"));
av++;
if (*av[0] != 's') {
fprintf(stderr, "Server name must start with 's' (is: %s)\n",
av[0]);
exit(1);
}
AZ(pthread_mutex_lock(&server_mtx));
VTAILQ_FOREACH(s, &servers, list)
if (!strcmp(s->name, av[0]))
break;
AZ(pthread_mutex_unlock(&server_mtx));
if (s == NULL)
s = server_new(av[0]);
CHECK_OBJ_NOTNULL(s, SERVER_MAGIC);
......@@ -312,8 +416,23 @@ cmd_server(CMD_ARGS)
server_start(s);
continue;
}
if (!strcmp(*av, "-dispatch")) {
if (strcmp(s->name, "s0")) {
fprintf(stderr,
"server -parallel only works on s0\n");
exit(1);
}
server_dispatch(s);
continue;
}
if (**av == '-')
vtc_log(s->vl, 0, "Unknown server argument: %s", *av);
s->spec = *av;
}
}
void
init_server(void)
{
AZ(pthread_mutex_init(&server_mtx, NULL));
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment