Commit f94ebe2f authored by Geoff Simmons's avatar Geoff Simmons

v0.5: Added transport via MQ, revamped configuration, config files can be used

parent ce950159
......@@ -17,8 +17,10 @@ data sets to the processor
$ [[-n varnish_logfile] | [-f varnishlog_outputfile]]
[-v varnish_prefix] [-r max_restarts] [-u processor_url] [-d]
$ [-c config_file]
[-t [MQ|HTTP]] [-u http_url] [-m mq_url] [-q queue]
[[-n varnish_logfile] | [-f varnishlog_outputfile]]
[-v varnish_prefix] [-r max_restarts] [-d]
[-l logfile] [-p pidfile] [-s status_interval]
[--help] [--version]
......@@ -26,70 +28,119 @@ data sets to the processor
C<> starts an instance of C<varnishlog>, parses its
output for tags relevant to tracking, and sends complete data records
to the processor via HTTP.
to the processor via HTTP or a message queue.
By default, C<> reads its configuration from
C</etc/trackrdr.conf>. That configuration may be overriden by a file
given with the C<-c> option, and individual values may be specified by
other command line options.
The format of a config file is:
# Test configuration for the varnish log tracking reader
transport = mq
# mq.url = stomp://
varnish.prefix = /usr/local
log = /tmp/trackrdr.log
There is a command line option corresponding to each configuration
variable, so these are all documented in the section L</OPTIONS>.
=head1 OPTIONS
No command line options are required, defaults are described in the
A choice for B<-t/transport> is B<required>. No command line options
are required, defaults are described in the following.
=item B<-t [HTTP|MQ]>
Config variable B<transport>. Whether HTTP or a message queue is used
to submit data to the processor (no default).
=item B<-c config_file>
A configuration file for C<>. If C</etc/trackrdr.conf>
exists, then the configuration is read from there first. Values in the
file given with B<-c> may override values from C</etc/trackrdr.conf>,
and values given with command line options may in turn override values
from any config file.
=item B<-u processor_url>
Config variable B<http.url>. URL of the processor application, to
which data records are submitted if HTTP transport is chosen. Default:
=item B<-m mq_url>
Config variable B<mq.url>. URL used to connect to a message queue
broker, if MQ transport is chosen. Default: C<stomp://>
=item B<-q queue>
Config variable B<queue>. Name of the queue to which messages are
sent, if MQ transport is chosen. Default: C<lhotse/tracking/rdr2proc>
=item B<-n varnish_logfile>
The "varnish name" indicating the mmap'd log file used by C<varnishd>
and C<varnishlog>, used for the C<-n> option to start
C<varnishlog>. By default, C<varnishlog> is started without an C<-n>
option (so the default for C<varnishlog> holds).
Config variable B<>. The "varnish name" indicating the
mmap'd log file used by C<varnishd> and C<varnishlog>, used for the
C<-n> option to start C<varnishlog>. By default, C<varnishlog> is
started without an C<-n> option (so the default for C<varnishlog>
=item B<-f varnishlog_outputfile>
Path of a file created by redirecting standard output of
C<varnishlog>, useful for debugging purposes. The options B<-n> and
B<-f> are mutually exclusive. By default, the default choice for B<-n>
is assumed (read the SHM log at the default location for
Config variable B<varnishlog.dump>. Path of a file created by
redirecting standard output of C<varnishlog>, useful for debugging
purposes. If specified, C<> reads from this file, instead
of reading live C<varnishlog> output. The options B<-n> and B<-f> are
mutually exclusive. By default, the default choice for B<-n> is
assumed (read the SHM log at the default location for C<varnishlog>).
=item B<-v varnish_prefix>
Installation directory for varnish, default: C</var/opt/varnish>
Config variable B<varnish.prefix>. Installation directory for varnish,
default: C</var/opt/varnish>
=item B<-r max_restarts>
Maximum number of restarts for the child process, or 0 for unlimited,
default 0
Config variable B<restarts>. Maximum number of restarts for the child
process, or 0 for unlimited, default 0
=item B<-d>
Switches on debug mode, off by default
=item B<-u processor_url>
URL of the processor application, to which data records are submitted.
Default: C<http://localhost/ts-processor/httpProcess>
Config variable B<debug>. Switches on debug mode, off by default. (In
a config file, set B<debug> to B<true> or B<false>.)
=item B<-l logfile>
Log file for status, warning, debug and error messages. By default,
status and debug messages are written to C<STDOUT>, and warnings and
error messages are written to C<STDERR>.
Config variable B<log>. Log file for status, warning, debug and error
messages. By default, status and debug messages are written to
C<STDOUT>, and warnings and error messages are written to C<STDERR>.
=item B<-p pidfile>
File in which the process ID of the parent process is stored. To stop
the script, it suffices to send a C<TERM> signal to that process
(e.g. with the C<kill> command); the parent process stops all of its
child processes before exiting. Default: C</var/run/>
Config variable B<pid.file>. File in which the process ID of the
parent process is stored. To stop the script, it suffices to send a
C<TERM> signal to that process (e.g. with the C<kill> command); the
parent process stops all of its child processes before
exiting. Default: C</var/run/>
=item B<-s status_interval>
The minimum number of seconds between status output to the log,
reporting interbal statistics (such as completed records read,
currently open records, etc.).
Config variable B<monitor.interval>. The minimum number of seconds
between status output to the log, reporting internal statistics (such
as completed records read, currently open records, etc.). Default: 30
=item B<-o processor_logfile>
Log file to contain the contents of all POST requests to the processor, for debugging purposes. By default no processor log file is written.
Config variable B<processor.log>. Log file to contain the contents of
all POST requests to the processor, for debugging purposes. By default
no processor log file is written.
=item B<--help>
......@@ -152,40 +203,20 @@ use threads::shared;
use LWP::UserAgent;
use LWP::ConnCache;
use Net::STOMP::Client;
use Net::STOMP::Client::Error;
use POSIX qw(setsid);
use FileHandle;
use Getopt::Std;
use Pod::Usage;
$main::VERSION = "0.4";
$main::VERSION = "0.5";
pod2usage(-exit => 0, -verbose => 1);
my %opts;
getopts("dn:v:r:u:f:l:p:s:o:", \%opts);
# 0 to run forever
my $MAX_RESTARTS = $opts{r} || 0;
my $DEBUG = $opts{d} || 0;
my $LOGFILE = $opts{l};
my $VLOGFILE = $opts{f};
my $PROCLOGFILE = $opts{o};
my @SHMTAGS = qw(ReqStart VCL_Log ReqEnd);
my $VARNISH_PRE = $opts{v} || '/var/opt/varnish';
# my $VARNISHLOG_CMD = "/usr/bin/stdbuf -o100M $VARNISH_PRE/bin/varnishlog -i ".join(',', @SHMTAGS);
my $VARNISHLOG_CMD = "$VARNISH_PRE/bin/varnishlog -i ".join(',', @SHMTAGS);
$VARNISHLOG_CMD = "$VARNISHLOG_CMD -n $opts{n}" if $opts{n};
my $PROC_URL = $opts{u} || 'http://localhost/ts-processor/httpProcess';
my $PIDFILE = $opts{p} || '/var/run/';
my $SLEEP = $opts{s} || 30;
# be prepared to start with SMF
use constant {
......@@ -196,29 +227,150 @@ use constant {
use constant (defaultConfig => '/etc/trackrdr.conf');
my %config = (
'transport' => '',
'mq.url' => 'stomp://',
'http.url' => 'http://localhost/ts-processor/httpProcess',
'queue' => 'lhotse/tracking/rdr2proc',
'restarts' => 0,
'debug' => 0,
'log' => '',
'varnishlog.dump' => '',
'processor.log' => '',
'' => '',
'varnish.prefix' => '/var/opt/varnish',
'pid.file' => '/var/run/',
'monitor.interval' => 30,
sub readConfig {
my ($file, $config) = @_;
if (! -r $file) {
die "$file not readable\n";
my $fh = new FileHandle $file;
if (! defined $fh) {
my $err = $!;
die "Cannot open $file: $err\n";
while (<$fh>) {
next unless $_;
my @kv = split /\s*=\s*/;
if ($#kv != 1) {
die "Cannot parse $file line $.: $_\n";
if (!defined($config->{$kv[0]})) {
die "Unknown config param $kv[0] ($file line $.)\n";
$config->{$kv[0]} = $kv[1];
if (-e defaultConfig) {
readConfig(defaultConfig, \%config);
my %opts;
getopts("dn:v:r:u:f:l:p:s:o:c:m:t:q:", \%opts);
if ($opts{c}) {
if (!-e $opts{c}) {
die "$opts{c} not found\n";
readConfig($opts{c}, \%config);
$config{log} = $opts{l} if $opts{l};
$config{'varnishlog.dump'} = $opts{f} if $opts{f};
$config{'processor.log'} = $opts{o} if $opts{o};
$config{'varnish.prefix'} = $opts{v} if $opts{v};
$config{''} = $opts{n} if $opts{n};
$config{'http.url'} = $opts{u} if $opts{u};
$config{'pid.file'} = $opts{p} if $opts{p};
$config{'monitor.interval'} = $opts{s} if $opts{s};
$config{'mq.url'} = $opts{m} if $opts{m};
$config{'transport'} = $opts{t} if $opts{t};
$config{'queue'} = $opts{q} if $opts{q};
if (!$config{transport} or $config{transport} !~ /^(mq|http)$/i) {
die "transport/-t must either MQ or HTTP\n";
unless ($config{lc($config{transport}).'.url'}) {
die "No URL configured for transport $config{transport}\n";
if ($config{transport} =~ /mq/i and not $config{queue}) {
die "No queue configured for transport MQ\n";
if ($config{''} and $config{'varnishlog.dump'}) {
die "Configure either or varnishlog.dump/-f\n";
# 0 to run forever
$config{restarts} = $opts{r} if $opts{r};
if ($config{restarts} !~ /^\d+$/) {
die "restarts/-r must be numeric ($config{restarts})\n";
$config{debug} = $opts{d} if $opts{d};
if ($config{debug}) {
if ($config{debug} !~ /^(true|false|on|off|yes|no|0|1)$/i) {
die "debug/-d must be boolean ($config{debug})\n";
$config{debug} = $config{debug} =~ /^(true|on|yes|1)$/i;
my @SHMTAGS = qw(ReqStart VCL_Log ReqEnd);
= $config{'varnish.prefix'}."/bin/varnishlog -i ".join(',', @SHMTAGS);
$VARNISHLOG_CMD .= '-n '.$config{''} if $config{''};
use constant {
DEBUG => 0,
NOTICE => 1,
WARN => 2,
FATAL => 3,
ERROR => 3,
FATAL => 4,
my @logtag = ("DEBUG", "NOTICE", "WARN", "FATAL");
my @logtag = ("DEBUG", "NOTICE", "WARN", "ERROR", "FATAL");
my $PIDFH = new FileHandle "> $PIDFILE";
my $PIDFH = new FileHandle ">".$config{'pid.file'};
unless (defined $PIDFH) {
my $err = $!;
die "Cannot open pidfile $PIDFILE: $err\n";
die "Cannot open pidfile ".$config{'pid.file'}.": $err\n";
my $LOGFH;
if ($LOGFILE) {
$LOGFH = new FileHandle ">$LOGFILE";
if ($config{log}) {
$LOGFH = new FileHandle ">$config{log}";
unless (defined $LOGFH) {
my $err = $!;
die "Cannot open $LOGFILE: $err\n";
die "Cannot open $config{log}: $err\n";
else {
......@@ -226,12 +378,12 @@ else {
$PROCLOGFH = new FileHandle ">$PROCLOGFILE";
if ($config{'processor.log'}) {
$PROCLOGFH = new FileHandle ">".$config{'processor.log'};
unless (defined $PROCLOGFH) {
my $err = $!;
die "Cannot open $PROCLOGFILE: $err\n";
die "Cannot open ".$config{'processor.log'}.": $err\n";
......@@ -239,7 +391,7 @@ if ($PROCLOGFILE) {
sub logg {
my ($level, @args) = @_;
return if ($level == DEBUG and !$DEBUG);
return if ($level == DEBUG and !$config{debug});
print $LOGFH "[", scalar(localtime), "] $logtag[$level]: @args\n";
......@@ -259,6 +411,12 @@ sub logflush {
$SIG{__WARN__} = sub { logg (WARN, @_); };
$SIG{__DIE__} = sub { logg (FATAL, @_); die "@_\n"; };
if ($config{debug}) {
for (keys(%config)) {
logg(DEBUG, "$_ = $config{$_}");
our %pids;
our $parent_pid;
our $initial_pid = $$;
......@@ -276,13 +434,76 @@ sub statusThread {
logg(NOTICE, "Monitoring thread starting: tid =", threads->tid());
while(!$quit) {
logg(NOTICE, "$records records submitted,",
"$open records open, $dubious records dubious");
logg(NOTICE, "$records records submitted, $open open, ",
"$dubious dubious");
logg(NOTICE, "Monitor thread exiting");
sub prepHTTP {
my $connect = shift;
$connect->{ua} = new LWP::UserAgent(
agent => "Track Reader Prototype $main::VERSION",
conn_cache => LWP::ConnCache->new(),
sub submitHTTP {
my ($connect, $data) = @_;
my $ua = $connect->{ua};
my $resp = $ua->post($connect->{url}, Content => $data);
if ($resp->code != RC_NO_CONTENT) {
logg(ERROR, "Processor error: ", $resp->status_line());
print $PROCLOGFH '[', scalar(localtime), "] $data ", $resp->code, "\n";
sub prepMQ {
my $connect = shift;
$connect->{mq} = Net::STOMP::Client->new(uri => $config{'mq.url'});
no warnings 'once';
$Net::Stomp::Client::Error::Die = 0;
unless (defined $connect->{mq}->connect()) {
die "Cannot connect to ", $config{'mq.url'},
": $Net::STOMP::Client::Error::Message\n";
logg(NOTICE, "Successfully connected to ", $config{'mq.url'});
sub submitMQ {
my ($connect, $data) = @_;
my $mq = $connect->{mq};
my $status = $mq->send(destination => $connect->{queue}, body => $data);
unless (defined $status) {
logg(ERROR, "Cannot send message to queue ", $connect->{queue},
": $Net::STOMP::Client::Error::Message\n");
print $PROCLOGFH '[', scalar(localtime), "] $data ",
defined $status ? "success" : "FAIL", "\n";
our %connect;
if ($config{transport} =~ /mq/i) {
$connect{url} = $config{'mq.url'};
$connect{queue} = $config{'queue'};
$connect{prep} = \&prepMQ;
$connect{submit} = \&submitMQ;
else {
$connect{url} = $config{'http.url'};
$connect{prep} = \&prepHTTP;
$connect{submit} = \&submitHTTP;
sub fork_varnishlog {
my $f = fork;
if (! defined($f)) {
......@@ -299,25 +520,22 @@ sub fork_varnishlog {
sub run_varnishlog {
my $ua = new LWP::UserAgent(
agent => "Track Reader Prototype $main::VERSION",
conn_cache => LWP::ConnCache->new(),
$records = 0;
$quit = 0;
# Prepare MQ or HTTP transport
while (1) {
logg(DEBUG, "varnishlog=$VARNISHLOG_CMD");
my $log;
if ($VLOGFILE) {
logg(DEBUG, "logfile=$VLOGFILE");
unless (open($log, $VLOGFILE)) {
if ($config{'varnishlog.dump'}) {
logg(DEBUG, "logfile=".$config{'varnishlog.dump'});
unless (open($log, $config{'varnishlog.dump'})) {
my $err = $!;
die ("open $VLOGFILE failed: $err\n");
die ("open ".$config{'varnishlog.dump'}." failed: $err\n");
logg(NOTICE, "Starting to read $VLOGFILE");
logg(NOTICE, "Starting to read ".$config{'varnishlog.dump'});
else {
logg(DEBUG, "varnishlog=$VARNISHLOG_CMD");
......@@ -332,11 +550,10 @@ sub run_varnishlog {
my (%record, %dubious_tid);
$open = 0;
$dubious = 0;
#my $laststatus = time();
my $monitor = threads->create(\&statusThread, $SLEEP, $records, $open,
$dubious, $quit);
my $monitor = threads->create(\&statusThread,
unless (defined $monitor) {
logg(WARN, "Monitor thread failed to start");
logg(ERROR, "Monitor thread failed to start");
while(<$log>) {
......@@ -380,15 +597,14 @@ sub run_varnishlog {
my $data = join('&', @{$record{$tid}{data}});
logg(DEBUG, "$records complete records found");
'[', scalar(localtime), "] $data\n";
my $resp = $ua->post($PROC_URL, Content => $data);
if ($resp->code != RC_NO_CONTENT) {
logg(WARN, "Processor error: ",
#if ($PROCLOGFH) {
# print $PROCLOGFH
# '[', scalar(localtime), "] $data\n";
#submitHTTP({url => $config{'http.url'}, ua => $ua},
# $data);
#submitMQ({queue => 'track', mq => $mq}, $data);
&{$connect{submit}}(\%connect, $data);
'DATA: ', join('&', @{$record{$tid}{data}}));
......@@ -419,16 +635,9 @@ sub run_varnishlog {
$open = scalar(keys %record);
$dubious = scalar(keys %dubious_tid);
# if (time() >= $laststatus + $SLEEP) {
# logg(NOTICE, "$records records submitted, ",
# scalar(keys %record), " records open, ",
# scalar(keys %dubious_tid), " records dubious");
# logflush();
# $laststatus = time();
# }
if ($VLOGFILE) {
if ($config{'varnishlog.dump'}) {
kill('TERM', $parent_pid);
logg(NOTICE, "exiting");
......@@ -556,8 +765,8 @@ while(! $term) {
logg(NOTICE, "varnishlog perl reader for pid $pid died: ",
"restart ", ++$restarts);
($restarts > $MAX_RESTARTS)) {
if ($config{restarts} &&
($restarts > $config{restarts})) {
logg(FATAL, "too many restarts: $restarts");
