Commit 23d37582 authored by Cecilie Fritzvold's avatar Cecilie Fritzvold

parellising varnishreplay. Work in progress.


git-svn-id: http://www.varnish-cache.org/svn/trunk/varnish-cache@1620 d4fa192b-c00b-0410-8231-f00ffab90ce4
parent ee94594a
......@@ -14,4 +14,6 @@ varnishreplay_CFLAGS = -include config.h
varnishreplay_LDADD = \
$(top_builddir)/lib/libvarnish/libvarnish.la \
$(top_builddir)/lib/libcompat/libcompat.a \
$(top_builddir)/lib/libvarnishapi/libvarnishapi.la
$(top_builddir)/lib/libvarnishapi/libvarnishapi.la \
${PTHREAD_LIBS}
......@@ -30,32 +30,63 @@
#include <ctype.h>
#include <errno.h>
#include <fcntl.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <fcntl.h>
#include "libvarnish.h"
#include "queue.h"
#include "varnishapi.h"
#include "vss.h"
static struct request {
char *df_H; /* %H, Protocol version */
char *df_Host; /* %{Host}i */
char *df_Uq; /* %U%q, URL path and query string */
char *df_m; /* %m, Request method*/
char *df_c; /* Connection info (keep-alive, close) */
int bogus; /* bogus request */
} **req;
static size_t nreq;
static struct thread {
pthread_t thread_id;
struct mailbox *mbox;
} **threads;
struct mailbox {
pthread_mutex_t lock;
pthread_cond_t has_mail;
STAILQ_HEAD(msgq_head, message) messages;
};
struct message {
enum shmlogtag tag;
char *ptr;
unsigned len;
STAILQ_ENTRY(message) list;
};
static size_t nthreads;
static struct vss_addr *adr_info;
static int sock;
static int reopen;
static int debug;
static void
mailbox_put(struct mailbox *mbox, struct message *msg)
{
pthread_mutex_lock(&mbox->lock);
STAILQ_INSERT_TAIL(&mbox->messages, msg, list);
pthread_cond_signal(&mbox->has_mail);
pthread_mutex_unlock(&mbox->lock);
}
static struct message *
mailbox_get(struct mailbox *mbox)
{
struct message *msg;
pthread_mutex_lock(&mbox->lock);
while ((msg = STAILQ_FIRST(&mbox->messages)) == NULL)
pthread_cond_wait(&mbox->has_mail, &mbox->lock);
STAILQ_REMOVE_HEAD(&mbox->messages, list);
pthread_mutex_unlock(&mbox->lock);
return msg;
}
static int
isprefix(const char *str, const char *prefix, const char *end, const char **next)
{
......@@ -153,7 +184,7 @@ init_connection(const char *address)
* A line is terminated by \r\n
*/
static int
read_line(char **line)
read_line(char **line, int sock)
{
char *buf;
unsigned nbuf, lbuf;
......@@ -191,7 +222,7 @@ read_line(char **line)
* the number of bytes read.
*/
static int
read_block(int length)
read_block(int length, int sock)
{
char *buf;
int n, nbuf;
......@@ -214,7 +245,7 @@ read_block(int length)
/* Receive the response after sending a request.
*/
static int
receive_response(void)
receive_response(int sock)
{
char *line, *end;
const char *next;
......@@ -229,7 +260,7 @@ receive_response(void)
/* Read header */
while (1) {
line_len = read_line(&line);
line_len = read_line(&line, sock);
end = line + line_len;
if (*line == '\r' && *(line + 1) == '\n') {
......@@ -260,7 +291,7 @@ receive_response(void)
/* Fixed body size, read content_length bytes */
if (debug)
fprintf(stderr, "fixed length\n");
n = read_block(content_length);
n = read_block(content_length, sock);
if (debug) {
fprintf(stderr, "size of body: %d\n", (int)content_length);
fprintf(stderr, "bytes read: %d\n", n);
......@@ -270,22 +301,22 @@ receive_response(void)
if (debug)
fprintf(stderr, "chunked encoding\n");
while (1) {
line_len = read_line(&line);
line_len = read_line(&line, sock);
end = line + line_len;
block_len = strtol(line, &end, 16);
if (block_len == 0) {
break;
}
n = read_block(block_len);
n = read_block(block_len, sock);
if (debug) {
fprintf(stderr, "size of body: %d\n", (int)block_len);
fprintf(stderr, "bytes read: %d\n", n);
}
free(line);
n = read_line(&line);
n = read_line(&line, sock);
free(line);
}
n = read_line(&line);
n = read_line(&line, sock);
free(line);
} else if ((content_length <= 0 && !chunked) || req_failed) {
/* No body --> stop reading. */
......@@ -302,155 +333,198 @@ receive_response(void)
return close_connection;
}
static int
gen_traffic(void *priv, enum shmlogtag tag, unsigned fd,
unsigned len, unsigned spec, const char *ptr)
static void *
pthread_main(void *arg)
{
struct message *msg;
struct thread *th = (struct thread*)arg;
enum shmlogtag tag;
int len;
char *ptr;
const char *end, *next;
FILE *fo;
struct request *rp;
end = ptr + len;
if (!(spec & VSL_S_CLIENT))
return (0);
if (fd >= nreq) {
struct request **newreq = req;
size_t newnreq = nreq;
while (fd >= newnreq)
newnreq += newnreq + 1;
newreq = realloc(newreq, newnreq * sizeof *newreq);
assert(newreq != NULL);
memset(newreq + nreq, 0, (newnreq - nreq) * sizeof *newreq);
req = newreq;
nreq = newnreq;
}
if (req[fd] == NULL) {
req[fd] = calloc(sizeof *req[fd], 1);
assert(req[fd] != NULL);
}
rp = req[fd];
switch (tag) {
case SLT_RxRequest:
if (tag == SLT_RxRequest && (spec & VSL_S_BACKEND))
char *df_H = NULL; /* %H, Protocol version */
char *df_Host = NULL; /* %{Host}i */
char *df_Uq = NULL; /* %U%q, URL path and query string */
char *df_m = NULL; /* %m, Request method*/
char *df_c = NULL; /* Connection info (keep-alive, close) */
int bogus = 0; /* bogus request */
int sock, reopen = 1;
//fprintf(stderr, "thread started\n");
do {
msg = mailbox_get(th->mbox);
tag = msg->tag;
len = msg->len;
ptr = msg->ptr;
end = ptr + len;
//fprintf(stderr, "%08x %s(%s)\n", (unsigned int)pthread_self(), VSL_tags[tag], msg->ptr);
switch (tag) {
case SLT_RxRequest:
if (df_m != NULL)
bogus = 1;
else
df_m = trimline(ptr, end);
break;
if (rp->df_m != NULL)
rp->bogus = 1;
else
rp->df_m = trimline(ptr, end);
break;
case SLT_RxURL:
if (tag == SLT_RxURL && (spec & VSL_S_BACKEND))
case SLT_RxURL:
if (df_Uq != NULL)
bogus = 1;
else
df_Uq = trimline(ptr, end);
break;
if (rp->df_Uq != NULL)
rp->bogus = 1;
else
rp->df_Uq = trimline(ptr, end);
break;
case SLT_RxProtocol:
if (df_H != NULL)
bogus = 1;
else
df_H = trimline(ptr, end);
break;
case SLT_RxProtocol:
if (tag == SLT_RxProtocol && (spec & VSL_S_BACKEND))
case SLT_RxHeader:
if (isprefix(ptr, "host:", end, &next))
df_Host = trimline(next, end);
if (isprefix(ptr, "connection:", end, &next))
df_c = trimline(next, end);
break;
if (rp->df_H != NULL)
rp->bogus = 1;
else
rp->df_H = trimline(ptr, end);
break;
case SLT_RxHeader:
if (isprefix(ptr, "host:", end, &next))
rp->df_Host = trimline(next, end);
if (isprefix(ptr, "connection:", end, &next))
rp->df_c = trimline(next, end);
break;
default:
break;
}
default:
break;
}
if ((spec & VSL_S_CLIENT) && tag != SLT_ReqEnd)
return (0);
if (tag != SLT_ReqEnd)
continue;
//fprintf(stderr, "bogus: %d %s\n", bogus, df_m);
if (!rp->bogus) {
fo = priv;
if (!bogus) {
/* If the method is supported (GET or HEAD), send the request out
* on the socket. If the socket needs reopening, reopen it first.
* When the request is sent, call the function for receiving
* the answer.
*/
if (!(strncmp(df_m, "GET", 3) && strncmp(df_m, "HEAD", 4))) {
if (reopen)
sock = VSS_connect(adr_info);
reopen = 0;
if (debug) {
fprintf(stderr, "%s ", df_m);
fprintf(stderr, "%s ", df_Uq);
fprintf(stderr, "%s ", df_H);
fprintf(stderr, "\n");
fprintf(stderr, "Host: ");
}
write(sock, df_m, strlen(df_m));
write(sock, " ", 1);
write(sock, df_Uq, strlen(df_Uq));
write(sock, " ", 1);
write(sock, df_H, strlen(df_H));
write(sock, " ", 1);
write(sock, "\r\n", 2);
/* If the method is supported (GET or HEAD), send the request out
* on the socket. If the socket needs reopening, reopen it first.
* When the request is sent, call the function for receiving
* the answer.
*/
if (!(strncmp(rp->df_m, "GET", 3) && strncmp(rp->df_m, "HEAD", 4))) {
if (reopen)
sock = VSS_connect(adr_info);
reopen = 0;
if (strncmp(df_H, "HTTP/1.0", 8))
reopen = 1;
if (debug) {
fprintf(fo, "%s ", rp->df_m);
fprintf(fo, "%s ", rp->df_Uq);
fprintf(fo, "%s ", rp->df_H);
fprintf(fo, "\n");
fprintf(fo, "Host: ");
}
write(sock, rp->df_m, strlen(rp->df_m));
write(sock, " ", 1);
write(sock, rp->df_Uq, strlen(rp->df_Uq));
write(sock, " ", 1);
write(sock, rp->df_H, strlen(rp->df_H));
write(sock, " ", 1);
write(sock, "\r\n", 2);
if (strncmp(rp->df_H, "HTTP/1.0", 8))
reopen = 1;
write(sock, "Host: ", 6);
if (rp->df_Host) {
write(sock, "Host: ", 6);
if (df_Host) {
if (debug)
fprintf(stderr, df_Host);
write(sock, df_Host, strlen(df_Host));
}
if (debug)
fprintf(fo, rp->df_Host);
write(sock, rp->df_Host, strlen(rp->df_Host));
}
if (debug)
fprintf(fo, "\n");
write(sock, "\r\n", 2);
if (rp->df_c) {
fprintf(stderr, "\n");
write(sock, "\r\n", 2);
if (df_c) {
if (debug)
fprintf(stderr, "Connection: %s\n", df_c);
write(sock, "Connection: ", 12);
write(sock, df_c, strlen(df_c));
write(sock, "\r\n", 2);
if (isequal(df_c, "keep-alive", df_c + strlen(df_c)))
reopen = 0;
}
if (debug)
fprintf(fo, "Connection: %s\n", rp->df_c);
write(sock, "Connection: ", 12);
write(sock, rp->df_c, strlen(rp->df_c));
fprintf(stderr, "\n");
write(sock, "\r\n", 2);
if (isequal(rp->df_c, "keep-alive", rp->df_c + strlen(rp->df_c)))
reopen = 0;
if (!reopen)
reopen = receive_response(sock);
if (reopen)
close(sock);
}
if (debug)
fprintf(fo, "\n");
write(sock, "\r\n", 2);
if (!reopen)
reopen = receive_response();
if (reopen)
close(sock);
}
}
/* clean up */
/* clean up */
#define freez(x) do { if (x) free(x); x = NULL; } while (0);
freez(rp->df_H);
freez(rp->df_Host);
freez(rp->df_Uq);
freez(rp->df_m);
freez(rp->df_c);
freez(df_H);
freez(df_Host);
freez(df_Uq);
freez(df_m);
freez(df_c);
#undef freez
rp->bogus = 0;
bogus = 0;
} while (1);
return (0);
}
static int
gen_traffic(void *priv, enum shmlogtag tag, unsigned fd,
unsigned len, unsigned spec, const char *ptr)
{
const char *end;
struct message *msg;
int err;
(void)priv;
end = ptr + len;
if (!(spec & VSL_S_CLIENT))
return (0);
//fprintf(stderr, "gen_traffic\n");
if (fd >= nthreads) {
struct thread **newthreads = threads;
size_t newnthreads = nthreads;
while (fd >= newnthreads)
newnthreads += newnthreads + 1;
newthreads = realloc(newthreads, newnthreads * sizeof *newthreads);
assert(newthreads != NULL);
memset(newthreads + nthreads, 0, (newnthreads - nthreads) * sizeof *newthreads);
threads = newthreads;
nthreads = newnthreads;
}
if (threads[fd] == NULL) {
threads[fd] = malloc(sizeof *threads[fd]);
assert(threads[fd] != NULL);
threads[fd]->mbox = malloc(sizeof (struct mailbox));
STAILQ_INIT(&threads[fd]->mbox->messages);
pthread_mutex_init(&threads[fd]->mbox->lock, NULL);
pthread_cond_init(&threads[fd]->mbox->has_mail, NULL);
err = pthread_create(&threads[fd]->thread_id, NULL, pthread_main, threads[fd]);
if (err)
fprintf(stderr, "thread creation failed\n");
fprintf(stderr, "Ok, thread %08x created... %d\n", (unsigned int)threads[fd]->thread_id, err);
}
msg = malloc(sizeof (struct message));
msg->tag = tag;
msg->ptr = strdup(ptr);
msg->len = len;
mailbox_put(threads[fd]->mbox, msg);
//fprintf(stderr, "message put\n");
return 0;
}
/* This function is for testing only, and only sends
* the raw data from the file to the address.
* The receive function is called for each blank line.
......@@ -461,6 +535,8 @@ send_test_request(char *file, const char *address)
int fd = open(file, O_RDONLY);
char buf[2];
char last = ' ';
int sock, reopen = 1;
adr_info = init_connection(address);
sock = VSS_connect(adr_info);
while (read(fd, buf, 1)) {
......@@ -468,7 +544,7 @@ send_test_request(char *file, const char *address)
fprintf(stderr, "%s", buf);
if (*buf == '\n' && last == '\n'){
fprintf(stderr, "receive\n");
reopen = receive_response();
reopen = receive_response(sock);
}
last = *buf;
}
......@@ -491,15 +567,14 @@ main(int argc, char *argv[])
{
int c;
struct VSL_data *vd;
const char *ofn = NULL;
const char *address = NULL;
FILE *of;
char *test_file = NULL;
vd = VSL_New();
debug = 0;
VSL_Arg(vd, 'c', NULL);
while ((c = getopt(argc, argv, "a:Dr:t:")) != -1) {
switch (c) {
case 'a':
......@@ -534,18 +609,10 @@ main(int argc, char *argv[])
if (VSL_OpenLog(vd, NULL))
exit(1);
ofn = "stdout";
of = stdout;
adr_info = init_connection(address);
reopen = 1;
while (VSL_Dispatch(vd, gen_traffic, of) == 0) {
if (fflush(of) != 0) {
perror(ofn);
exit(1);
}
}
while (VSL_Dispatch(vd, gen_traffic, NULL) == 0)
/* nothing */ ;
exit(0);
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment