Commit 0bb806bd authored by Olav Kvittem's avatar Olav Kvittem
Browse files

added crude-spread-select

parent 962f9dd6
Pipeline #46136 passed with stages
in 2 minutes and 44 seconds
......@@ -41,15 +41,11 @@ else
sleep 3 # allow pipe to be created
fi
if ! pgrep -u $USER -f "gzip .*crude.pipe" >/dev/null; then
gzip -c $PIPE >> $logdir/crude-`date +%T`.gz &
fi
if ! pgrep -u $USER -f "bin/crude-ana-gz" > /dev/null; then
# make sure that qstream-gapa-ana deos not run already
if ! pgrep -u $USER -f "bin/qstream-gap-ana " > /dev/null; then
$root/bin/crude-ana-gz &
fi
if ! pgrep -u $USER -f "bin/crude-spread-select" > /dev/null; then
# make sure that qstream-gapa-ana deos not run already
if ! pgrep -u $USER -f "bin/crude-spread-select" > /dev/null; then
$root/bin/crude-spread-select $index $logdir $PIPE &
fi
fi
if ! pgrep -u $USER -f "bin/trace.sh " >/dev/null; then
......
#!/usr/bin/perl
# spread file input to parallel commands on a measurement point
# using internal buffers/debug
my $debug=0;
use threads;
# use Thread::Queue;
use FileHandle;
use File::Basename;
use IO::Select;
use Getopt::Long;
use Time::HiRes qw(usleep);
my $pipe_size=2**20 - 1; # 1MB
my (@opt_cmd, $opt_v, $opt_h, $opt_debug);
my $usage="Usage\n$0 --cmd <command> --cmd ...
-pipe_size - buffering output to commands ($pipe_size)
-v - print summary stats at end
$0 index output-directory [data-file]..\n
";
GetOptions('cmd=s'=> \@opt_cmd, 'pipe_size=s'=>\$pipe_size, 'debug=s' => \$opt_debug, 'v'=>\$opt_v, 'h'=>\$opt_h) or die "$usage : $!" ;
die $usage if $opt_h;
my @cmds=();
if ( $#opt_cmd >= 0 ){ # externally spec'd commands
@cmds=@opt_cmd;
} else { # builtin commands
if ( $#ARGV < 1 ){
printf "Usage: $0 index output-directory [data-file]..\n";
exit 1;
}
my $index=shift;
my $lager=shift;
my $bin = dirname($0);
my $root = "$bin/..";
my $ext=`date +%T`;
chomp $ext;
@cmds = (
"perl $bin/qstream-gap-ana -v -minloss 5 -head -win 50 -jitter 600 -addresses $root/etc/mp-address.txt -rtp 5" .
" -json $lager/gap-ana-$ext.json" .
" >> $lager/gap-ana-$ext.txt 2>> $lager/gap-log-$ext.txt",
"gzip >> $lager/crude-$ext.gz" );
# only done at central site - mostly for debug purposes :
# "nice perl $bin/qstream-gap-list >> $lager/gap-list-$ext.txt";
}
if ( ! $DB::{single} ){ # drop when in debugger
$SIG{INT} = 'final_summary';
$SIG{TERM} = 'final_summary';
}
my @queues=();
my @fh=();
my @blocked=();
my $n_slept=();
my $t_slept=();
my @would_block=();
my @partial_write=();
my @normal_write=();
my $line_count=0;
foreach $i ( 0.. $#cmds ){
$fh[$i] = IO::File->new();
$fh[$i]->open( "|$cmds[$i]" or die "Could not open cmd : $! :\n$cmd");
$fh[$i]->blocking(0);
$queues[$i]=();
set_pipe_size( $fh[$i], $pipe_size); # 1M
}
push @ARGV, "-" if $#ARGV < 0; # default stdin
foreach $in_file ( @ARGV){
my $in_fh;
if ( $in_file eq "-"){
$in_fh = STDIN;
} else {
open $in_fh, "<", $in_file || die "Could not open $in_file : $!\n" ;
}
my $select = IO::Select->new( $in_fh );
my $remaining_output=0;
my $wait_s=0.01; # 10 ms
my $line;
my $is_pipe= -p $in_fh;
my $eof=0;
$!=0;
until( $eof == 1 and $remaining_output <= 0){
# empty input queue
my $continue = 1;
while ( !$eof and $continue and my @ready_FHs = $select->can_read( $wait_s)) {
foreach my $FH (@ready_FHs) { # only one
if ( $! != 0 ) {
die "Error reading $in_file : $!";
}
sysread $FH, $line, 1000;
if ( length $line == 0 ){
$eof=1;
} else {
$line_count++;
foreach $i ( 0.. $#cmds ){
push ( @{$queues[$i]}, $line);
}
}
}
if ( ! $is_pipe || $eof == 1) { # not pipe - don't read the whole disk file
$continue=0;
}
}
$qmax[$i]=$#{$queues[$i]} if ! $qmax[$i] || $qmax[$i] < $#{$queues[$i]};
# try to empty internal queues
foreach $i ( 0.. $#cmds ){
while ( $#{$queues[$i]} >= 0 ){
if ( $blocked[$i] < 0 ){ # never
printf "blocked index %d for %d\n", $i, $blocked[$i] if $opt_debug && $blocked[$i] > 1;
usleep(100 * $blocked[$i]); # additive increase in sleep_time
$n_slept[$i] ++;
$t_slept[$i] += $blocked[$i];
}
my $line = shift(@{$queues[$i]});
my $rv = syswrite( $fh[$i], $line, length $line);
if (!defined($rv) && $!{EAGAIN}) { # would block
unshift @{$queues[$i]}, $line;
$would_block[$i]++;
$blocked[$i]++;
break;
} elsif ($rv != length $line) { # incomplete write
unshift @{$queues[$i]}, substr( $line, $rv );
$partial_write[$i]++;
$blocked[$i]++;
break;
} else { # successfully wrote
$normal_write[$i]++;
$blocked[$i]=0;
}
}
}
$remaining_output=0;
foreach $i ( 0.. $#cmds ){
$remaining_output += $#{$queues[$i]} + 1;
}
$!=0; # reset errors before can_read
} # of input cycle
} # of input files
summary();
exit (0);
#================================================================================
sub summary{
if ( $opt_v){
printf "summary: lines in : %d\n", $line_count;
printf "%15s %15s %15s %15s %15s %15s\n", qw /would_block partial_write normal_write n_slept t_slept qmax/;
foreach $i(0..$#cmds){
printf "%15d %15d %15d %15d %15d %15d %s\n", $would_block[$i], $partial_write[$i], $normal_write[$i],
$n_slept[$i], $t_slept[$id], $qmax[$i], $cmds[$i];
}
}
}
sub final_summary{
summary();
exit(1);
}
sub set_pipe_size{
my $fh = shift;
my $pipe_size = shift;
if ($debug and -p $fh ) {
printf "fh %d is pipe\n", $fh;
printf "pipe size was %d\n", fcntl($fh, Fcntl::F_GETPIPE_SZ, 0);
}
my $new = fcntl( $fh, Fcntl::F_SETPIPE_SZ, int($pipe_size))
|| warn "### could not set pipe size $pipe_size for $cmd: $!\n";
warn sprintf "### set-pipe-size for $cmd failed: new size $new, got: %d",
fcntl($fh, Fcntl::F_GETPIPE_SZ, 0)
if $new < $pipe_size;
return $fh;
}
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment