Commit 0c0fbf21 authored by Geoff Simmons's avatar Geoff Simmons

Prototype v0.5.7: Forward-porting v0.4.6 (added the -i option for HTTP

timeout, more detail in monitoring output)
parent 89907fe7
...@@ -18,12 +18,13 @@ data sets to the processor ...@@ -18,12 +18,13 @@ data sets to the processor
=head1 SYNOPSIS =head1 SYNOPSIS
$ trackrdr.pl [-c config_file] $ trackrdr.pl [-c config_file]
[-t [MQ|HTTP]] [-u http_url] [-m mq_url] [-q queue] [-t [MQ|HTTP]] [-u http_url] [-i http_timeout]
[-m mq_url] [-q queue]
[[-n varnish_logfile] | [-f varnishlog_outputfile]] [[-n varnish_logfile] | [-f varnishlog_outputfile]]
[-v varnish_prefix] [-r max_restarts] [-d] [-v varnish_prefix] [-r max_restarts] [-d]
[-l logfile] [-p pidfile] [-s status_interval] [-l logfile] [-p pidfile] [-s status_interval]
[-b [stdbuf_path]:[stdbuf_bufsize]] [-b [stdbuf_path]:[stdbuf_bufsize]]
[--help] [--version] [-d] [--help] [--version]
=head1 DESCRIPTION =head1 DESCRIPTION
...@@ -117,6 +118,11 @@ process, or 0 for unlimited, default 0 ...@@ -117,6 +118,11 @@ process, or 0 for unlimited, default 0
Config variable B<debug>. Switches on debug mode, off by default. (In Config variable B<debug>. Switches on debug mode, off by default. (In
a config file, set B<debug> to B<true> or B<false>.) a config file, set B<debug> to B<true> or B<false>.)
=item B<-i http_timeout>
Timeout in seconds for HTTP connections to the processor. Default 5
seconds
=item B<-l logfile> =item B<-l logfile>
Config variable B<log>. Log file for status, warning, debug and error Config variable B<log>. Log file for status, warning, debug and error
...@@ -227,7 +233,7 @@ use Getopt::Std; ...@@ -227,7 +233,7 @@ use Getopt::Std;
use Pod::Usage; use Pod::Usage;
$Getopt::Std::STANDARD_HELP_VERSION = 1; $Getopt::Std::STANDARD_HELP_VERSION = 1;
$main::VERSION = "0.5.6"; $main::VERSION = "0.5.7";
sub HELP_MESSAGE { sub HELP_MESSAGE {
pod2usage(-exit => 0, -verbose => 1); pod2usage(-exit => 0, -verbose => 1);
...@@ -249,6 +255,7 @@ my %config = ( ...@@ -249,6 +255,7 @@ my %config = (
'transport' => '', 'transport' => '',
'mq.url' => 'stomp://127.0.0.1:61613', 'mq.url' => 'stomp://127.0.0.1:61613',
'http.url' => 'http://localhost/ts-processor/httpProcess', 'http.url' => 'http://localhost/ts-processor/httpProcess',
'http.timeout' => 5,
'queue' => 'lhotse/tracking/rdr2proc', 'queue' => 'lhotse/tracking/rdr2proc',
'restarts' => 0, 'restarts' => 0,
'debug' => 0, 'debug' => 0,
...@@ -302,7 +309,7 @@ if (-e defaultConfig) { ...@@ -302,7 +309,7 @@ if (-e defaultConfig) {
} }
my %opts; my %opts;
getopts("dn:v:r:u:f:l:p:s:o:c:m:t:q:b:z:", \%opts); getopts("dn:v:r:u:f:l:p:s:o:c:m:t:q:b:z:i:", \%opts);
if ($opts{c}) { if ($opts{c}) {
if (!-e $opts{c}) { if (!-e $opts{c}) {
...@@ -318,6 +325,7 @@ $config{'processor.log'} = $opts{o} if $opts{o}; ...@@ -318,6 +325,7 @@ $config{'processor.log'} = $opts{o} if $opts{o};
$config{'varnish.prefix'} = $opts{v} if $opts{v}; $config{'varnish.prefix'} = $opts{v} if $opts{v};
$config{'varnish.name'} = $opts{n} if $opts{n}; $config{'varnish.name'} = $opts{n} if $opts{n};
$config{'http.url'} = $opts{u} if $opts{u}; $config{'http.url'} = $opts{u} if $opts{u};
$config{'http.timeout'} = $opts{i} if $opts{i};
$config{'pid.file'} = $opts{p} if $opts{p}; $config{'pid.file'} = $opts{p} if $opts{p};
$config{'monitor.interval'} = $opts{s} if $opts{s}; $config{'monitor.interval'} = $opts{s} if $opts{s};
$config{'mq.url'} = $opts{m} if $opts{m}; $config{'mq.url'} = $opts{m} if $opts{m};
...@@ -469,15 +477,17 @@ our $records :shared; ...@@ -469,15 +477,17 @@ our $records :shared;
our $quit :shared; our $quit :shared;
our $open :shared; our $open :shared;
our $dubious :shared; our $dubious :shared;
our $failures :shared;
our $submitted :shared;
sub statusThread { sub statusThread {
my ($interval) = @_; my $interval = shift;
logg(NOTICE, "Monitor thread starting: tid =", threads->tid()); logg(NOTICE, "Monitor thread starting: tid =", threads->tid());
while(!$quit) { while(!$quit) {
sleep($interval); sleep($interval);
logg(NOTICE, "$records records submitted, $open open, ", logg(NOTICE, "$records records seen, $submitted submitted,",
"$dubious dubious"); "$open open, $failures failed, $dubious dubious");
logflush(); logflush();
} }
logg(NOTICE, "Monitor thread exiting"); logg(NOTICE, "Monitor thread exiting");
...@@ -489,6 +499,7 @@ sub prepHTTP { ...@@ -489,6 +499,7 @@ sub prepHTTP {
$connect->{ua} = new LWP::UserAgent( $connect->{ua} = new LWP::UserAgent(
agent => "Track Reader Prototype $main::VERSION", agent => "Track Reader Prototype $main::VERSION",
conn_cache => LWP::ConnCache->new(), conn_cache => LWP::ConnCache->new(),
timeout => $config{'http.timeout'},
); );
} }
...@@ -505,6 +516,11 @@ sub submitHTTP { ...@@ -505,6 +516,11 @@ sub submitHTTP {
my $resp = $ua->request($req); my $resp = $ua->request($req);
if ($resp->code != RC_NO_CONTENT) { if ($resp->code != RC_NO_CONTENT) {
logg(ERROR, "Processor error: ", $resp->status_line()); logg(ERROR, "Processor error: ", $resp->status_line());
$failures++;
logflush();
}
else {
$submitted++;
} }
if ($PROCLOGFH) { if ($PROCLOGFH) {
print $PROCLOGFH '[', scalar(localtime), "] $data ", $resp->code, "\n"; print $PROCLOGFH '[', scalar(localtime), "] $data ", $resp->code, "\n";
...@@ -529,9 +545,14 @@ sub submitMQ { ...@@ -529,9 +545,14 @@ sub submitMQ {
my $mq = $connect->{mq}; my $mq = $connect->{mq};
my $status = $mq->send(destination => $connect->{queue}, body => $data); my $status = $mq->send(destination => $connect->{queue}, body => $data);
unless (defined $status) { if (defined $status) {
$submitted++;
}
else {
$failures++;
logg(ERROR, "Cannot send message to queue ", $connect->{queue}, logg(ERROR, "Cannot send message to queue ", $connect->{queue},
": $Net::STOMP::Client::Error::Message\n"); ": $Net::STOMP::Client::Error::Message\n");
logflush();
} }
if ($PROCLOGFH) { if ($PROCLOGFH) {
print $PROCLOGFH '[', scalar(localtime), "] $data ", print $PROCLOGFH '[', scalar(localtime), "] $data ",
...@@ -595,9 +616,7 @@ sub run_varnishlog { ...@@ -595,9 +616,7 @@ sub run_varnishlog {
} }
my (%record, %dubious); my (%record, %dubious);
$open = 0; $open = $dubious = $quit = $failures = $submitted = 0;
$dubious = 0;
$quit = 0;
my $monitor = threads->create(\&statusThread, my $monitor = threads->create(\&statusThread,
$config{'monitor.interval'}); $config{'monitor.interval'});
unless (defined $monitor) { unless (defined $monitor) {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment