Commit 939128be authored by Dag Erling Smørgrav's avatar Dag Erling Smørgrav

Reorganize the code a little, and add code to wait for all threads to finish

processing pending messages before we exit.

Note that VSL_Dispatch() will read in log data as fast as it can, so when
working from a log file, varnishreplay will usually read in the entire file
into memory within the first few seconds.


git-svn-id: http://www.varnish-cache.org/svn/trunk/varnish-cache@1623 d4fa192b-c00b-0410-8231-f00ffab90ce4
parent a99655b1
......@@ -32,6 +32,7 @@
#include <errno.h>
#include <fcntl.h>
#include <pthread.h>
#include <stdarg.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
......@@ -42,32 +43,60 @@
#include "varnishapi.h"
#include "vss.h"
static struct thread {
pthread_t thread_id;
struct mailbox *mbox;
} **threads;
struct mailbox {
pthread_mutex_t lock;
pthread_cond_t has_mail;
STAILQ_HEAD(msgq_head, message) messages;
};
#ifndef HAVE_STRNDUP
#include "compat/strndup.h"
#endif
#define freez(x) do { if (x) free(x); x = NULL; } while (0);
static struct vss_addr *addr_info;
static int debug;
/*
* mailbox toolkit
*/
struct message {
enum shmlogtag tag;
size_t len;
char *ptr;
unsigned len;
STAILQ_ENTRY(message) list;
};
static size_t nthreads;
struct mailbox {
pthread_mutex_t lock;
pthread_cond_t has_mail;
int open;
STAILQ_HEAD(msgq_head, message) messages;
};
static struct vss_addr *adr_info;
static int debug;
static void
mailbox_create(struct mailbox *mbox)
{
STAILQ_INIT(&mbox->messages);
pthread_mutex_init(&mbox->lock, NULL);
pthread_cond_init(&mbox->has_mail, NULL);
mbox->open = 1;
}
static void
mailbox_destroy(struct mailbox *mbox)
{
struct message *msg;
while ((msg = STAILQ_FIRST(&mbox->messages))) {
STAILQ_REMOVE_HEAD(&mbox->messages, list);
free(msg);
}
pthread_cond_destroy(&mbox->has_mail);
pthread_mutex_destroy(&mbox->lock);
}
static void
mailbox_put(struct mailbox *mbox, struct message *msg)
{
pthread_mutex_lock(&mbox->lock);
STAILQ_INSERT_TAIL(&mbox->messages, msg, list);
pthread_cond_signal(&mbox->has_mail);
......@@ -78,15 +107,113 @@ static struct message *
mailbox_get(struct mailbox *mbox)
{
struct message *msg;
pthread_mutex_lock(&mbox->lock);
while ((msg = STAILQ_FIRST(&mbox->messages)) == NULL)
while ((msg = STAILQ_FIRST(&mbox->messages)) == NULL && mbox->open)
pthread_cond_wait(&mbox->has_mail, &mbox->lock);
STAILQ_REMOVE_HEAD(&mbox->messages, list);
if (msg != NULL)
STAILQ_REMOVE_HEAD(&mbox->messages, list);
pthread_mutex_unlock(&mbox->lock);
return msg;
}
static void
mailbox_close(struct mailbox *mbox)
{
pthread_mutex_lock(&mbox->lock);
mbox->open = 0;
pthread_cond_signal(&mbox->has_mail);
pthread_mutex_unlock(&mbox->lock);
}
/*
* thread toolkit
*/
struct thread {
pthread_t thread_id;
struct mailbox mbox;
};
static pthread_mutex_t log_mutex = PTHREAD_MUTEX_INITIALIZER;
static void
thread_log(int lvl, const char *fmt, ...)
{
va_list ap;
if (lvl > debug)
return;
pthread_mutex_lock(&log_mutex);
fprintf(stderr, "%08x ", (unsigned int)pthread_self());
va_start(ap, fmt);
vfprintf(stderr, fmt, ap);
va_end(ap);
pthread_mutex_unlock(&log_mutex);
}
static struct thread **threads;
static size_t nthreads;
static struct thread *
thread_get(int fd, void *(*thread_main)(void *))
{
assert(fd != 0);
if (fd >= nthreads) {
struct thread **newthreads = threads;
size_t newnthreads = nthreads;
while (fd >= newnthreads)
newnthreads += newnthreads + 1;
newthreads = realloc(newthreads, newnthreads * sizeof *newthreads);
assert(newthreads != NULL);
memset(newthreads + nthreads, 0,
(newnthreads - nthreads) * sizeof *newthreads);
threads = newthreads;
nthreads = newnthreads;
}
if (threads[fd] == NULL) {
threads[fd] = malloc(sizeof *threads[fd]);
assert(threads[fd] != NULL);
mailbox_create(&threads[fd]->mbox);
if (pthread_create(&threads[fd]->thread_id, NULL,
thread_main, threads[fd]) != 0) {
thread_log(0, "thread creation failed\n");
mailbox_destroy(&threads[fd]->mbox);
freez(threads[fd]);
}
thread_log(1, "thread %08x started\n",
(unsigned int)threads[fd]->thread_id);
}
return (threads[fd]);
}
static void
thread_close(int fd)
{
assert(fd < nthreads);
if (fd == 0) {
for (fd = 1; fd < nthreads; ++fd)
thread_close(fd);
return;
}
if (threads[fd] == NULL)
return;
mailbox_close(&threads[fd]->mbox);
pthread_join(threads[fd]->thread_id, NULL);
thread_log(1, "thread %08x stopped\n",
(unsigned int)threads[fd]->thread_id);
mailbox_destroy(&threads[fd]->mbox);
freez(threads[fd]);
}
/*
* ...
*/
static int
isprefix(const char *str, const char *prefix, const char *end, const char **next)
{
......@@ -159,14 +286,14 @@ init_connection(const char *address)
int i, n;
if (VSS_parse(address, &addr, &port) != 0) {
fprintf(stderr, "Invalid address\n");
thread_log(0, "Invalid address\n");
exit(2);
}
n = VSS_resolve(addr, port, &ta);
free(addr);
free(port);
if (n == 0) {
fprintf(stderr, "Could not connect to server\n");
thread_log(0, "Could not connect to server\n");
exit(2);
}
for (i = 1; i < n; ++i) {
......@@ -200,12 +327,11 @@ read_line(char **line, int sock)
buf = realloc(buf, lbuf);
XXXAN(buf);
}
//fprintf(stderr, "start reading\n");
i = read(sock, buf + nbuf, 1);
if (i <= 0) {
perror("error in reading\n");
thread_log(0, "read(): %s\n", strerror(errno));
free(buf);
exit(1);
return (-1);
}
nbuf += i;
if (nbuf >= 2 && buf[nbuf-2] == '\r' && buf[nbuf-1] == '\n')
......@@ -233,8 +359,8 @@ read_block(int length, int sock)
n = read(sock, buf + nbuf,
(2048 < length - nbuf ? 2048 : length - nbuf));
if (n <= 0) {
perror("failed reading the block\n");
break;
thread_log(0, "failed reading the block\n");
return (-1);
}
nbuf += n;
}
......@@ -282,24 +408,20 @@ receive_response(int sock)
free(line);
}
if (debug)
fprintf(stderr, "status: %d\n", status);
thread_log(1, "status: %d\n", status);
/* Read body */
if (content_length > 0 && !chunked) {
/* Fixed body size, read content_length bytes */
if (debug)
fprintf(stderr, "fixed length\n");
n = read_block(content_length, sock);
if (debug) {
fprintf(stderr, "size of body: %d\n", (int)content_length);
fprintf(stderr, "bytes read: %d\n", n);
}
thread_log(1, "fixed length\n");
thread_log(1, "size of body: %ld\n", content_length);
if ((n = read_block(content_length, sock)) < 0)
return (1);
thread_log(1, "bytes read: %d\n", n);
} else if (chunked) {
/* Chunked encoding, read size and bytes until no more */
if (debug)
fprintf(stderr, "chunked encoding\n");
thread_log(1, "chunked encoding\n");
while (1) {
line_len = read_line(&line, sock);
end = line + line_len;
......@@ -308,10 +430,8 @@ receive_response(int sock)
break;
}
n = read_block(block_len, sock);
if (debug) {
fprintf(stderr, "size of body: %d\n", (int)block_len);
fprintf(stderr, "bytes read: %d\n", n);
}
thread_log(1, "size of body: %d\n", (int)block_len);
thread_log(1, "bytes read: %d\n", n);
free(line);
n = read_line(&line, sock);
free(line);
......@@ -320,29 +440,27 @@ receive_response(int sock)
free(line);
} else if ((content_length <= 0 && !chunked) || req_failed) {
/* No body --> stop reading. */
if (debug)
fprintf(stderr, "no body\n");
thread_log(1, "no body\n");
return (1);
} else {
/* Unhandled case. */
fprintf(stderr, "An error occured\n");
exit(1);
thread_log(0, "An error occured\n");
return (1);
}
if (debug)
fprintf(stderr, "\n");
return close_connection;
}
static void *
pthread_main(void *arg)
replay_thread(void *arg)
{
struct thread *thr = arg;
struct message *msg;
struct thread *th = (struct thread*)arg;
enum shmlogtag tag;
int len;
size_t len;
char *ptr;
const char *end, *next;
char *df_H = NULL; /* %H, Protocol version */
char *df_Host = NULL; /* %{Host}i */
char *df_Uq = NULL; /* %U%q, URL path and query string */
......@@ -351,18 +469,15 @@ pthread_main(void *arg)
int bogus = 0; /* bogus request */
int sock, reopen = 1;
//fprintf(stderr, "thread started\n");
do {
msg = mailbox_get(th->mbox);
while ((msg = mailbox_get(&thr->mbox)) != NULL) {
tag = msg->tag;
len = msg->len;
ptr = msg->ptr;
end = ptr + len;
//fprintf(stderr, "%08x %s(%s)\n", (unsigned int)pthread_self(), VSL_tags[tag], msg->ptr);
thread_log(2, "%s(%s)\n", VSL_tags[tag], msg->ptr);
switch (tag) {
case SLT_RxRequest:
if (df_m != NULL)
......@@ -398,10 +513,10 @@ pthread_main(void *arg)
if (tag != SLT_ReqEnd)
continue;
//fprintf(stderr, "bogus: %d %s\n", bogus, df_m);
if (!bogus) {
if (bogus) {
thread_log(1, "bogus\n");
} else {
/* If the method is supported (GET or HEAD), send the request out
* on the socket. If the socket needs reopening, reopen it first.
* When the request is sent, call the function for receiving
......@@ -409,16 +524,11 @@ pthread_main(void *arg)
*/
if (!(strncmp(df_m, "GET", 3) && strncmp(df_m, "HEAD", 4))) {
if (reopen)
sock = VSS_connect(adr_info);
sock = VSS_connect(addr_info);
reopen = 0;
if (debug) {
fprintf(stderr, "%s ", df_m);
fprintf(stderr, "%s ", df_Uq);
fprintf(stderr, "%s ", df_H);
fprintf(stderr, "\n");
fprintf(stderr, "Host: ");
}
thread_log(1, "%s %s %s\n", df_m, df_Uq, df_H);
write(sock, df_m, strlen(df_m));
write(sock, " ", 1);
write(sock, df_Uq, strlen(df_Uq));
......@@ -432,16 +542,12 @@ pthread_main(void *arg)
write(sock, "Host: ", 6);
if (df_Host) {
if (debug)
fprintf(stderr, df_Host);
thread_log(1, "Host: %s\n", df_Host);
write(sock, df_Host, strlen(df_Host));
}
if (debug)
fprintf(stderr, "\n");
write(sock, "\r\n", 2);
if (df_c) {
if (debug)
fprintf(stderr, "Connection: %s\n", df_c);
thread_log(1, "Connection: %s\n", df_c);
write(sock, "Connection: ", 12);
write(sock, df_c, strlen(df_c));
write(sock, "\r\n", 2);
......@@ -449,7 +555,7 @@ pthread_main(void *arg)
reopen = 0;
}
if (debug)
fprintf(stderr, "\n");
thread_log(0, "\n");
write(sock, "\r\n", 2);
if (!reopen)
reopen = receive_response(sock);
......@@ -459,16 +565,15 @@ pthread_main(void *arg)
}
/* clean up */
#define freez(x) do { if (x) free(x); x = NULL; } while (0);
freez(msg->ptr);
freez(msg);
freez(df_H);
freez(df_Host);
freez(df_Uq);
freez(df_m);
freez(df_c);
#undef freez
bogus = 0;
} while (1);
}
return (0);
}
......@@ -477,50 +582,25 @@ static int
gen_traffic(void *priv, enum shmlogtag tag, unsigned fd,
unsigned len, unsigned spec, const char *ptr)
{
struct thread *thr;
const char *end;
struct message *msg;
int err;
(void)priv;
end = ptr + len;
if (!(spec & VSL_S_CLIENT))
if (fd == 0 || !(spec & VSL_S_CLIENT))
return (0);
//fprintf(stderr, "gen_traffic\n");
if (fd >= nthreads) {
struct thread **newthreads = threads;
size_t newnthreads = nthreads;
while (fd >= newnthreads)
newnthreads += newnthreads + 1;
newthreads = realloc(newthreads, newnthreads * sizeof *newthreads);
assert(newthreads != NULL);
memset(newthreads + nthreads, 0, (newnthreads - nthreads) * sizeof *newthreads);
threads = newthreads;
nthreads = newnthreads;
}
if (threads[fd] == NULL) {
threads[fd] = malloc(sizeof *threads[fd]);
assert(threads[fd] != NULL);
threads[fd]->mbox = malloc(sizeof (struct mailbox));
STAILQ_INIT(&threads[fd]->mbox->messages);
pthread_mutex_init(&threads[fd]->mbox->lock, NULL);
pthread_cond_init(&threads[fd]->mbox->has_mail, NULL);
err = pthread_create(&threads[fd]->thread_id, NULL, pthread_main, threads[fd]);
if (err)
fprintf(stderr, "thread creation failed\n");
fprintf(stderr, "Ok, thread %08x created... %d\n", (unsigned int)threads[fd]->thread_id, err);
}
thread_log(2, "%d %s\n", fd, VSL_tags[tag]);
thr = thread_get(fd, replay_thread);
msg = malloc(sizeof (struct message));
msg->tag = tag;
msg->ptr = strdup(ptr);
msg->len = len;
mailbox_put(threads[fd]->mbox, msg);
//fprintf(stderr, "message put\n");
msg->ptr = strndup(ptr, len);
mailbox_put(&thr->mbox, msg);
return 0;
}
......@@ -536,14 +616,14 @@ send_test_request(char *file, const char *address)
char buf[2];
char last = ' ';
int sock, reopen = 1;
adr_info = init_connection(address);
sock = VSS_connect(adr_info);
addr_info = init_connection(address);
sock = VSS_connect(addr_info);
while (read(fd, buf, 1)) {
write(sock, buf, 1);
fprintf(stderr, "%s", buf);
thread_log(0, "%s", buf);
if (*buf == '\n' && last == '\n'){
fprintf(stderr, "receive\n");
thread_log(0, "receive\n");
reopen = receive_response(sock);
}
last = *buf;
......@@ -581,7 +661,7 @@ main(int argc, char *argv[])
address = optarg;
break;
case 'D':
debug = 1;
++debug;
break;
case 't':
/* This option is for testing only. The test file must contain
......@@ -609,10 +689,10 @@ main(int argc, char *argv[])
if (VSL_OpenLog(vd, NULL))
exit(1);
adr_info = init_connection(address);
addr_info = init_connection(address);
while (VSL_Dispatch(vd, gen_traffic, NULL) == 0)
/* nothing */ ;
thread_close(0);
exit(0);
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment