Commit b887b8ff authored by Olav Kvittem's avatar Olav Kvittem

latest version of qstream-gap-ana

parent 4bd265fb
#!/usr/bin/perl
# use strict;
# use warnings 'all';
# use PDL;
# use PDL::Ops;
# use PDL::Fit::Polynomial;
use Socket;
use Statistics::LineFit;
use Statistics::Basic qw(:all);
use Date::Format;
use Getopt::Long;
use JSON::PP;
#use LWP::Simple;
use DateTime;
# use Math::Int64 qw(uint64 uint64_to_number);
use constant JAN_1970 => 0x83aa7e80;
# use LWP::UserAgent ();
# use LWP::Protocol::https;
# date --date 'jan 1 2000' +%s
$min_tx=946681200;
$max_tx=1893452400; # 2030-01-01
$maxseqreorder=1000; #
$max_small_gap=10; # the max size of a small graph
$max_small_graphs=20;
$max_big_graphs=20;
$late_delay=3; # seconds to doom a packet late
require "newgetopt.pl";
$usage="$0 '[-title text] [-minloss n] [-win n] [-graph file] [max-small-graphs n] [-outdir dir] [-head|-rhead] [-id id] [-names file] [-json file] [-v] [file]...
my $version="0.1.5";
my $min_tx=946681200;
my $max_tx=1893452400; # 2030-01-01
my $maxseqreorder=1000; #
my $max_small_gap=10; # the max size of a small graph
my $max_small_graphs=20;
my $max_big_graphs=20;
my $gap_limit={'big'=>10000, 'small'=>1000, 'tiny'=>100}; # packets
my $max_gaps={'big'=>10000, 'small'=>10000, 'tiny'=>1000}; # count
my $late_delay=3000; # ms to doom a packet late
my %src_adr=(), my %dst_adr=();
my %least_delay; # least delay observed
my %hostname=();
# parms for jitter report
my %n_normal=(); # count of normal packets for modulo
my $min_jit = 10; # jitter in ms
my $min_ddelay = 10;
my $min_slope=0.5;
my $jitter_period=600; # seconds between jitter reports
my %jitter_last=(); # last time jitter reported
my %jitter_values=(); # last jitter values reported
my %mindelay; # minimum delay last slep
my %minseq; # minimum delay last slep
my $usage="$0 '[-title text] [-minloss n] [-win n] [-graph file] [max-small-graphs n] [-outdir dir] [-head|-rhead] [-id id] [-names file] [-json file] [-v] [-version] [-esmond url] [file]...
Analyse gaps in a crude packet log
- output a list of statistical qos parameters as text or json
- make linear regression to see the delay trend around a gap
......@@ -25,19 +57,56 @@ Parameters
-rhead n - output headers so that R can make the headings in the text tables
-max-small-graphs n - limit number(20) of graphs to output from small few packet losses
-slep n - number(1000) of crude lines in the circular buffer
-index Elastic Search index name to use
-owamp url - get json owamp data from pscheduler
-json file - filename to store json event documents (intended for logstash ?)
";
&NGetOpt( 'h', 'help', 'id=s', 'slep=s', 'minloss=s', 'win=s', 'max-small-graphs=s', 'head', 'rhead', 'graph=s', 'outdir=s', 'title=s', 'names=s', 'json=s', 'v') || die "$!" . $usage ."\n";
-jitter secs - emit jitter stats with secs interval
-change_factor - fraction change to cause threshold jitter report (0.3)
-threshold - jitter,ddelay,slope - threshold values for reporting
";
# Command-line option variables; see $usage above for what each flag means.
my ($opt_h, $opt_help, $opt_id, $opt_slep, $opt_minloss, $opt_win, $opt_recover, $opt_max_small_graphs, $opt_head, $opt_rhead, $opt_graph, $opt_outdir, $opt_title, $opt_names, $opt_threshold, $opt_owamp, $opt_json, $opt_jitter, $opt_change_factor, $opt_index, $opt_v, $opt_version);
GetOptions( 'h' => \$opt_h, 'help' => \$opt_help, 'id=s' => \$opt_id, 'slep=s' => \$opt_slep,
            'minloss=s' => \$opt_minloss, 'win=s' => \$opt_win, 'recover=s' => \$opt_recover,
            'max-small-graphs=s' => \$opt_max_small_graphs,
            'head' => \$opt_head, 'rhead' => \$opt_rhead, 'graph=s' => \$opt_graph, 'outdir=s' => \$opt_outdir,
            'title=s' => \$opt_title, 'names=s' => \$opt_names, 'json=s' => \$opt_json, 'index=s' => \$opt_index,
            # BUG FIX: -threshold carries a "jitter,ddelay,slope" value (it is
            # split on commas below), so it must be declared with =s, not as a
            # boolean flag -- otherwise $opt_threshold is just 1.
            'jitter=s' => \$opt_jitter, 'threshold=s' => \$opt_threshold, 'owamp=s' => \$opt_owamp,
            'change_factor=s' => \$opt_change_factor, 'v' => \$opt_v, 'version' => \$opt_version )
    or die $usage;
# &NGetOpt( 'h', 'help', 'id=s', 'slep=s', 'minloss=s', 'win=s', 'max-small-graphs=s', 'head', 'rhead', 'graph=s', 'outdir=s', 'title=s', 'names=s', 'json=s', 'index=s', 'v', 'version') || die "$!" . $usage ."\n";
if ( $opt_h || $opt_help) {
printf "Version $version\n";
printf "$usage\n";
exit(0);
}
if ( $opt_version) {
printf "Version $version\n";
exit(0);
}
if ($opt_owamp){
    require LWP::Simple;    # loaded lazily: only needed for -owamp fetches
    # require perfSONAR_PS::Client::Esmond::ApiConnect;
}
if ($opt_threshold){
    # BUG FIX: the original declared fresh lexicals with 'my' inside this
    # block, so the parsed values went out of scope at the closing brace and
    # the file-level $min_jit/$min_ddelay/$min_slope defaults were never
    # overridden (it also misspelled $min_ddelay as $min_dddelay).
    ( $min_jit, $min_ddelay, $min_slope) = split(',', $opt_threshold);
}
# report a jitter observation when it changes by more than half its threshold
my $jitter_delta={'jit'=> $min_jit/2, 'ddelay'=> $min_ddelay/2, 'slope'=>$min_slope/2};
$jitter_period= $opt_jitter if $opt_jitter;
my $change_factor= $opt_change_factor || 0.3;
my $jitter_factor={'jit'=> $change_factor, 'ddelay'=> $change_factor, 'slope'=>$change_factor};
my @heads= qw/id date time tunix x1 nloss tloss seqloss x2 seqtail overlap x3 h_n h_jit h_ddelay h_delay h_min_d h_slope_10 h_slope_20 h_slope_30 h_slope_40 h_slope_50 x4 t_n t_jit t_ddelay t_delay t_min_d t_slope_10 t_slope_20 t_slope_30 t_slope_40 t_slope_50/;
my @heads= qw/id date time tunix x1 nloss tloss seqloss x2 seqtail overlap x3 h_n h_jit h_ddelay h_delay h_min_d h_slope_10 h_slope_20 h_slope_30 h_slope_40 h_slope_50 x4 t_n t_jit t_ddelay t_delay t_min_d t_slope_10 t_slope_20 t_slope_30 t_slope_40 t_slope_50 dTTL/;
if ($opt_rhead){
my @a=split(" ", $head);
@heads=split(" ", $opt_rhead);
my $h="";
foreach $a (@heads ){
$h.='"'.$a.'", ' }
......@@ -49,17 +118,20 @@ if ($opt_rhead){
}
my %hix=(); # hash on name to index in @heads
foreach $i(0..$#heads){
foreach my $i(0..$#heads){
$hix{$heads[$i]}=$i;
}
my $coder; # json coder
if ( $opt_json){
if ( $opt_json || $opt_owamp){
require JSON::XS;
}
if ( $opt_json){
my $json=$opt_json;
open JSON, ">$json" || die "Could not open $json ; $!";
$coder = JSON::XS->new->ascii->pretty->allow_nonref;
$encoder=$coder->canonical([1]);
open JSON, ">>$json" || die "Could not open $json ; $!";
# $coder = JSON::XS->new->ascii->pretty->allow_nonref;
$coder = JSON::XS->new->ascii->allow_nonref;
my $encoder=$coder->canonical([1]); # use of this ?
}
......@@ -74,38 +146,64 @@ if ( $opt_names){
get_names($opt_names);
}
$maxslep=$opt_slep || 1000 ;
$maxhead=$opt_win || 10; # packets to keep before
$maxtail=$opt_win || 10; # packets to keep after
$min_slopes=5; # slopes to report on text report
$minloss= $opt_minloss || 1;
$minrecover = $opt_recover || 5;
$outdir=$opt_outdir || ".";
$title=$opt_title || 'Delay';
$bv_fmt='^([\d]+)\s+([\d\.\:]+)\s+([\d\.]+)\s+([\d\.]+)'; # BV's condensed format for crude
$id= $opt_id || "ukjent" ;
my $maxslep=$opt_slep || 10000 ;
my $maxhead=$opt_win || 10; # packets to keep before
my $maxtail=$opt_win || 10; # packets to keep after
my $min_slopes=5; # slopes to report on text report
my $minloss= $opt_minloss || 1;
my $minrecover = $opt_recover || 5;
my $outdir=$opt_outdir || ".";
my $title=$opt_title || 'Delay';
my $crude_fmt='^ID=(\d+) SEQ=(\d+) SRC=([\w\:\.]+):\d+ DST=([\w\:\.]+):\d+.*Tx=([\d\.]+) .*Rx=([\d\.]+) .*SIZE=(\d+)';
my $bv_fmt='^([\d]+)\s+([\d\.\:]+)\s+([\d\.]+)\s+([\d\.]+)'; # BV's condensed format for crude
my $exp_num='[\d\.e\-]+'; # 2.32831e-10
my $owamp_fmt='^(\d+)\s+(\d+)\s+($exp_num)\s+(\d+)\s+(\d+)\s+($exp_num)\s+(\d+)';
my $id= $opt_id || "ukjent" ;
my %npackets=(); # keep track of all ids
my $print_line;
my %duration; # seconds per id
my %late_n=();
my $t0; # start time for each source
my %last_tx; # last tx seen
my %t0=();
my (%late_sum, %nsmall_gaps, %nbig_gaps);
if ($opt_owamp){
if ( $opt_owamp =~ /^http.*:\/\// ){
read_owamp_curl($opt_owamp);
} else {
read_owamp_file($opt_owamp);
}
} else {
read_crude();
}
foreach $id ( keys %t0 ) {
$duration{$id}=$last_tx{$id}-$t0{$id};
}
read_crude();
if ( $opt_v ){
foreach $id ( sort keys %dupl){
printf STDERR "%-30s %d duplicates\n", $id, $dupl{$id};
}
# foreach $id ( sort keys %dupl){
# printf STDERR "%-30s %d duplicates\n", $id, $dupl{$id};
# }
foreach $id ( sort keys %reorder){
printf STDERR "%-30s %d reordered (%d ppm)\n", $id, $reorder{$id}, $reorder{$id}*10^6/$npackets{$id};
}
# foreach $id ( sort keys %reorder){
# printf STDERR "%-30s %d reordered (%d ppm)\n", $id, $reorder{$id}, $reorder{$id}*10^6/$npackets{$id};
# }
foreach $id ( sort keys %npackets){
foreach $id ( sort keys %npackets){
my $lates=0;
$lates=$late_sum{$id}/$late_n{$id} if $late_n{$id} > 0;
printf STDERR "%-30s lasted %02d:%02d:%02d ( %d seconds ) and has %d small and %d big gaps and lost %.3f small and %.3f big seconds %d resets %d late %d ppm.\n", $id,
printf STDERR "%-30s lasted %02d:%02d:%02d ( %d seconds ) and has %d small and %d big gaps and lost %.3f small and %.3f big seconds, %d resets, %d late n, %.1fs late, %d duplicates, %d reordered, %d ttl changes and downtime %d ppm.\n", $id,
$duration{$id}/3600, $duration{$id}%3600/60, $duration{$id}%60, $duration{$id},
$nsmall_gaps{$id}, $nbig_gaps{$id},
$small_time{$id}, $big_time{$id}, $resets{$id}, $late_n{$id},
$small_time{$id}, $big_time{$id}, $resets{$id}, $late_n{$id},$lates,
$dupl{$id}, $reorder{$id}, $dttl_count{$id},
10**6 * ($small_time{$id} + $big_time{$id}) / $duration{$id}; # ppm
}
printf STDERR "Big gap limit %d packets.\n", $minloss;
......@@ -116,214 +214,593 @@ if ( $opt_v ){
}
foreach $id (keys %small_tx){
print STDERR "ID $id has Tx to small in $small_tx{$id} packets\n";
print STDERR "ID $id has Tx too small in $small_tx{$id} packets\n";
}
if ($opt_json){
emit_summary_json();
close JSON;
}
close JSON if $opt_json;
exit(0);
################################################################################
################################################################################
# Fetch OWAMP measurement data from an Esmond measurement archive for the
# given time window and accumulate the per-timestamp, per-TTL hop results
# into the hash referenced by $results.  Returns '' on success or the API
# client's error string on failure.
sub GetOwampDataFromEsmond($$$$$) {
    my ($url, $uri, $start_time, $end_time, $results) = @_;

    my $filters = perfSONAR_PS::Client::Esmond::ApiFilters->new();
    $filters->time_start($start_time);
    $filters->time_end($end_time);

    my $client = perfSONAR_PS::Client::Esmond::ApiConnect->new(
        url     => $url,
        filters => $filters
    );

    my $data = $client->get_data($uri);   # the uri from the previous phase
    return $client->error if $client->error;

    # walk every datapoint and every hop within it
    for my $point (@{$data}) {
        for my $hop (@{ $point->val }) {
            if ($hop->{success}) {
                $$results{ $point->ts }{ $hop->{ttl} }{ $hop->{ip} } = {
                    'mtu' => $hop->{mtu},
                    'rtt' => $hop->{rtt},
                };
            }
            elsif (defined $hop->{error_message}) {
                $$results{ $point->ts }{ $hop->{ttl} }{ $hop->{error_message} } = 1;
            }
            else {
                $$results{ $point->ts }{ $hop->{ttl} }{'error'} = 1;
            }
        }
    }
    return('');
}
################################################################################
#name id dns ip
################################################################################
# Emit one "gapsum" JSON summary document per measurement id, aggregating the
# gap/loss/late/jitter statistics collected in the file-level hashes
# (%npackets, %duration, %small_time, %big_time, %jitter_obs, ...).
sub emit_summary_json{
    foreach $id ( sort keys %npackets){
        my $latems=0;
        $latems=$late_sum{$id}/$late_n{$id} if $late_n{$id} > 0;   # mean lateness
        # "* 1.0" / "* 1" force numeric (not string) JSON encoding
        my $json={
            "event_type" => "gapsum",
            "lasted" => sprintf ( "%02d:%02d:%02d", $duration{$id}/3600, $duration{$id}%3600/60, $duration{$id}%60 ),
            "lasted_sec" => sprintf ( "%.3f", $duration{$id} ) * 1.0,
            "small_gaps" => $nsmall_gaps{$id} * 1 || 0,
            "big_gaps" => $nbig_gaps{$id} * 1 || 0,
            "small_time" => sprintf ( "%.3f", $small_time{$id} ) * 1.0,
            "big_time" => sprintf ( "%.3f", $big_time{$id} ) * 1.0,
            "resets" => $resets{$id} * 1 || 0,
            "late" => $late_n{$id} * 1 || 0,
            "late_sec" => sprintf ( "%.3f", $latems/1000 ) * 1.0,
            "duplicates" => $dupl{$id} * 1 || 0,
            "reordered" => $reorder{$id} * 1 || 0,
            "dTTL" => $dttl_count{$id} * 1 || 0,
            "least_delay" => sprintf("%.3f", $least_delay{$id}*1000),
            "down_ppm" => sprintf ( "%.3f", 10**6 * ($small_time{$id} + $big_time{$id}) / $duration{$id} ) * 1.0, # ppm
            "h_jit" => median( $jitter_obs{$id}{"h_jit"} )->query, # ->query to put it in numerical context
            "h_ddelay" => median( $jitter_obs{$id}{"h_ddelay"} )->query,
            "h_min_d" => median( $jitter_obs{$id}{"h_min_d"} )->query,
            "h_delay" => median( $jitter_obs{$id}{"h_delay"} )->query,
            "h_slope_10" => median( $jitter_obs{$id}{"h_slope_10"} )->query,
            "h_jit_sdv" => stddev( $jitter_obs{$id}{"h_jit"} )->query,
            "h_ddelay_sdv" => stddev( $jitter_obs{$id}{"h_ddelay"} )->query,
            "h_min_d_sdv" => stddev( $jitter_obs{$id}{"h_min_d"} )->query,
            "h_delay_sdv" => stddev( $jitter_obs{$id}{"h_delay"} )->query,
            "h_slope_10_sdv" => stddev( $jitter_obs{$id}{"h_slope_10"} )->query
        };
        # note dropped gaps per gap class (big/small/tiny)
        foreach my $gap_type ( keys %$gap_limit ){
            # BUG FIX: the original wrote $json{"dropped_..."} into the
            # unrelated package hash %json, so the dropped-gap counts never
            # reached the emitted document; store into the $json hashref.
            $json->{"dropped_$gap_type"}=$dropped_gaps{$id}{$gap_type};
        }
        emit_json( $json, $id, $ptx{$id} );
    }
}
################################################################################
# Wrap a result hash in a common header (timestamps, from/to addresses and
# names) and append it as one JSON line to the JSON output filehandle.
# $tunix is the unix timestamp (fractional seconds) of the event.
sub emit_json{
    my ($json, $id, $tunix)=@_;
    my $to;
    if ( $hostname{$dst_adr{$id}}){
        $to=$hostname{$dst_adr{$id}}
    } else {
        # fall back to the local hostname and cache it for subsequent calls
        $to=`hostname`; chomp($to);
        $hostname{$dst_adr{$id}}=$to;
    }
    my @gt=gmtime $tunix;
    # ISO-8601 date with millisecond precision, e.g. 2020-01-02T03:04:05.678
    my $datems= sprintf "%s.%03d", strftime("%Y-%m-%dT%T", @gt), int( $tunix * 1000 ) % 1000 ;
    my $head= {
        "\@date" => $datems,
        "timestamp"=> $tunix * 1.0,
        "timestamp_zone" => "GMT",
        "datetime"=> $datems,
        "from"=> $id, "to"=> $to,
        "from_adr" => $src_adr{$id},
        "to_adr" => $dst_adr{$id}};
    my $new={ %$head, %$json }; # merge header and payload into one document
    # BUG FIX: use low-precedence 'or' -- in the original, || bound to "\n"
    # (always true), so a failed print could never trigger the warning.
    print JSON $coder->encode($new ) ."\n" or warn "JSON print failed : $!";
}
################################################################################
################################################################################
# Read the measurement-point list file (whitespace columns: name id dns ip)
# and fill the global %hostname map (ip -> short name).  Lines starting with
# '#' (optionally preceded by whitespace) are skipped.  Dies if the file
# cannot be opened.
# NOTE(review): this block was diff-corrupted in the source (old and new hunk
# lines interleaved); this is the reconstructed newer revision, with a 3-arg
# lexical open instead of the unsafe 2-arg bareword open.
sub get_names {
    my $file=shift;
    if ( open my $names_fh, '<', $file ){
        while(<$names_fh>){
            next if /^\s*#/;                     # skip comment lines
            my ($name, $user, $dns, $ip)=split;
            $hostname{$ip}=$name;
        }
        close $names_fh;
    } else {
        die "Could not open mp-list $file : $!";
    }
}
################################################################################
# Resolve an IPv4 address to a display name: return the cached %hostname
# entry when present, otherwise try reverse DNS and fall back to the literal
# address.  The result is cached in %hostname for subsequent lookups.
sub get_name{
    my $adr=shift;
    # fast path: already resolved
    return $hostname{$adr} if $hostname{$adr};
    my $name;
    if ( $hostname = gethostbyaddr(inet_aton($adr), AF_INET) ){
        $name=$hostname;     # reverse DNS succeeded
    } else {
        $name=$adr;          # no PTR record: keep the raw address
    }
    $hostname{$adr}=$name;
    return $name;
}
################################################################################
################################################################################
# Convert a 64-bit OWAMP timestamp to fractional unix-epoch seconds.
# OWAMP timestamps carry seconds in the high 32 bits (1900 NTP epoch) and a
# binary fraction in the low 32 bits, so dividing by 2^32 gives fractional
# seconds since 1900 and subtracting JAN_1970 shifts to the unix epoch.
sub owptime2datetime {
    my ($owptime) = @_;
    return $owptime / 2 ** 32 - JAN_1970;
}
################################################################################
# Parse a pscheduler owamp result document (JSON text) and feed every raw
# packet record through analyze_packet().  Documents whose state is not
# 'finished' are ignored.
sub read_owamp_json($){
    my $data=shift;
    my $doc= decode_json($data);
    return unless $doc && $doc->{state} eq 'finished';
    my $raw=$doc->{result}{'raw-packets'};
    foreach my $pkt (@$raw ){
        my $tx=owptime2datetime($pkt->{'src-ts'});
        my $rx=owptime2datetime($pkt->{'dst-ts'});
        my $seq=$pkt->{'seq-num'};
        my ($src, $dst)=('anywhere', 'elsewhere');
        # Rebuild a crude-style log line in $_ before analysing the packet
        # (presumably downstream code reads $_ -- preserved as in original).
        $_= sprintf "ID=%s SEQ=%d SRC=%s DST=%s Tx=%.3f Rx=%.3f SIZE=%d HOPLIMIT=%d",
        0, $seq, $src, $dst, $tx, $rx, 64, $pkt->{'ip-ttl'};
        analyze_packet($seq, $src, $dst, $tx, $rx);
    }
}
################################################################################
# Slurp an owamp JSON result document from a local file and process it.
sub read_owamp_file{
    my $file=shift;
    # BUG FIX: the original "open OWAMP, \"<$file\" || die" never died --
    # || bound to the filename string (always true).  Use a lexical handle,
    # 3-arg open and low-precedence 'or'.
    open my $owamp_fh, '<', $file or die 'Could not open ' . $file . ' because ' . $!;
    local $/=undef;                 # slurp the whole file in one read
    my $data=<$owamp_fh>;
    close $owamp_fh;
    read_owamp_json( $data);
}
################################################################################
# Fetch an owamp JSON result over HTTP(S) with curl (-k: skip certificate
# verification, -s: silent) and process it.
sub read_owamp_curl{
    my $url=shift;
    # SECURITY FIX: list-form pipe open runs curl directly, so shell
    # metacharacters in $url are not interpreted (the original backticks
    # interpolated $url into a shell command line).
    open my $curl_fh, '-|', 'curl', '-k', '-s', $url
        or die "Could not run curl for $url : $!";
    local $/=undef;
    my $data=<$curl_fh>;
    close $curl_fh;
    read_owamp_json( $data);
}
################################################################################
# Fetch an owamp JSON result via LWP::Simple (require'd lazily when -owamp is
# given) with TLS host verification disabled, and process it.
sub read_owamp {
    my $url=shift;
    $ENV{'PERL_LWP_SSL_VERIFY_HOSTNAME'} = 0;
    $ENV{PERL_NET_HTTPS_SSL_SOCKET_CLASS} = 'Net::SSL';
    # BUG FIX: LWP::Simple is loaded with require, which does not import, so
    # the original bare get($url) called an undefined subroutine; the call
    # must be fully qualified.
    my $resp=LWP::Simple::get($url);
    if ( $resp){
        read_owamp_json($resp);
    } else {
        printf 'error getting url:' . $url . " code: " . $!;
    }
}
################################################################################
# Fetch an owamp JSON result via LWP::UserAgent with certificate/hostname
# verification disabled, and process the body on HTTP success; on failure the
# response body is printed instead.
# NOTE(review): 'use LWP::UserAgent' is commented out at the top of this file
# and nothing require's it at runtime -- this sub appears unused; confirm the
# module is loaded before calling it, or LWP::UserAgent->new will fail.
sub read_owamp_ua {
my $url=shift;
# disable TLS verification via the LWP environment knobs as well
$ENV{'PERL_LWP_SSL_VERIFY_HOSTNAME'} = 0;
$ENV{PERL_NET_HTTPS_SSL_SOCKET_CLASS} = 'Net::SSL';
my $ua = LWP::UserAgent->new;
$ua->ssl_opts( verify_hostname => 0); # certs don't verify in ps-land
my $resp = $ua->get($url);
if ( $resp->is_success ){
read_owamp_json( $resp->content);
} else {
print $resp->content;
}
}
################################################################################
# Read crude/owamp/BV-format packet log lines from the files on the command
# line (or stdin) and feed each matching line through analyze_packet().
# A "crude version" line restarts sequence-control state for a new file.
# NOTE(review): this block was diff-corrupted in the source (the pre-change
# regex branch was interleaved with the new one, leaving unbalanced braces);
# this is the reconstructed newer revision.
sub read_crude {
    my $tx; # current transmit time
    while(<>){
        if ( $. <= 1 && /^crude version 0.9.0/){
            # die "### Versjon med feil i Rx : $_";
        }
        if ( ( my ( $rudeid, $seq, $src, $dst, $tx, $rx, $size) = /$crude_fmt/ ) ) {
            next if $size < 50; # just a sanity filter for the packets i harstad-mp that has SIZE=4
            analyze_packet($seq, $src, $dst, $tx, $rx);
        } elsif ( /crude version/){ # new file restart sequence control
            undef %pseq, %slep, %gap_slep, %slep_data, %gap_data;
        } elsif ( ( my ($seq, $tx, $ssync, $serr, $rx, $rsync, $rerr, $ttl) = /$owamp_fmt/ )
                  || ( my ($seq, $src, $tx, $rx) = /$bv_fmt/ ) ){
            # NOTE(review): $dst is not set by either pattern here -- confirm
            # analyze_packet tolerates an undefined destination for these
            # formats.
            analyze_packet($seq, $src, $dst, $tx, $rx);
        }
    }
}
################################################################################
# Classify a gap by its packet count against the global $gap_limit thresholds,
# checking classes from largest to smallest ('big' >= 10000, 'small' >= 1000,
# 'tiny' >= 100 packets, per the $gap_limit declaration near the top of the
# file) and returning the first class whose limit the gap reaches.
# NOTE(review): a gap below the 'tiny' limit falls through and returns undef
# -- confirm callers handle an undefined gap type.
sub gap_type {
my $gap=shift;
# $type is a package global (no 'my'), so its final value leaks out of the sub
foreach $type ( 'big', 'small', 'tiny'){
return $type if $gap >= $$gap_limit{$type};
}
}
if ( $opt_id){
# use that
} elsif ($hostname{$src}){
$id=$hostname{$src};
} else {
if ( $hostname = gethostbyaddr(inet_aton($src), AF_INET) ){
$id=$hostname{$src}=$hostname;
} else {
$id=$hostname{$src}=$src;
}
}
sub analyze_packet {
my ($seq, $src, $dst, $tx, $rx ) = @_;
if ( $opt_id){
# use existing # $id=opt_id;
} else {
$id=get_name($src);
}
$src_adr{$id}=$src if ! $src_adr{$id} ;
$dst_adr{$id}=$dst if ! $dst_adr{$id} ;
$dst_name{$id}=get_name($dst);
if ($tx < $min_tx || $tx > $max_tx){
$small_tx{$id}++;
next; #########################
}
if ($tx < $min_tx || $tx > $max_tx){
$small_tx{$id}++;
next; #########################
}
$last_tx{$id}=$tx;
$npackets{$id}++;
my $dt=0;
my $bufferit=1;
if (defined($pseq{$id})){
my $dseq=$seq - $pseq{$id};
$dt=$rx-$tx;
$ids{$id}++;
$least_delay{$id} = $dt if !$least_delay{$id} || $dt < $least_delay{$id};
if ( $tx < $t0{$id}) { # packets from the past
$late_n{$id}++;
$late_sum{$id}+=$dt;
$late_ss{$id}+=$dt*$dt;
$bufferit=0;
} elsif ( $dseq == 1 ){ # normal packet
if ( $ntail_seq{$id} && $ntail_seq{$id} > 0 ){ # is recovering
$ntail_seq{$id}++;
if ( $ntail_seq{$id} > $minrecover && $in_gap{$id} ){
my $gap_type=gap_type($missing);
if ( $n_gaps{$id}{$gap_type}++ <= $$max_gaps{$gap_type} ){
$emit_graph{$id}=1;
$npackets{$id}++;
my $dt=0;
if (defined($pseq{$id})){
my $dseq=$seq - $pseq{$id};
$dt=$rx-$tx;
$ids{$id}++;
if ( $dt > $late_delay ){ # late packet
$late_n{$id}++;
$late_sum{$id}+=$dt;
$late_ss{$id}+=$dt*$dt;
} elsif ( $dseq == 1 ){ # normal packet
if ( $ntail_seq{$id} > 0 ){ # is recovering
$ntail_seq{$id}++;
if ( $ntail_seq{$id} >= $minrecover && $in_gap{$id} ){
$emit_graph{$id}=1;
my $missing= $gap_end_seq{$id} - $head_seq{$id};
if ( $missing <= $max_small_gap ){
if ($n_small_graphs{$id} > $max_small_graphs){
$emit_graph{$id}=0;
} else {
$n_small_graphs{$id}++;
}
} else {
if ( $n_big_graphs{$id} > $max_big_graphs){
$emit_graph{$id}=0;
} else {