Commit d5434ba3 authored by Olav Kvittem's avatar Olav Kvittem
Browse files

spread crude input on multiple output with threads

parent 8edd4e5a
Pipeline #40238 passed with stages
in 1 minute and 4 seconds
#!/usr/bin/perl
# spread file input to parallel commands on a measurement point
# using threads and queueing
my $debug=1;
use threads;
use Thread::Queue;
use FileHandle;
use File::Basename;
use Getopt::Long;
my (@opt_cmd, $opt_v, $opt_h);
my $usage="Usage\n$0 --cmd <command> --cmd ...
-v - print summary stats at end
$0 index output-directory [data-file]..\n
";
GetOptions('cmd=s'=> \@opt_cmd, 'v'=>\$opt_v, 'h'=>\$opt_h) or die "$usage : $!" ;
die $usage if $opt_h;
my @cmds=();
if ( $#opt_cmd >= 0 ){ # externally spec'd commands
@cmds=@opt_cmd;
} else { # builtin commands
if ( $#ARGV < 1 ){
printf "Usage: $0 index output-directory [data-file]..\n";
exit 1;
}
my $index=shift;
my $lager=shift;
my $pipe_size=2**20; # 1MB
my $bin = dirname(__FILE__);
my $root = "$bin/..";
my $ext=`date +%T`;
chomp $ext;
@cmds = (
"nice perl $bin/qstream-gap-ana -v -minloss 5 -head -win 50 -jitter 600 -names $root/etc/mp-address.txt -rtp 5" .
" -json $lager/gap-ana-$ext.json" .
" >> $lager/gap-ana-$ext.txt 2>> $lager/gap-log-$ext.txt",
"gzip >> $lager/crude-$ext.gz" );
# only done at central site - mostly for debug purposes :
# "nice perl $bin/qstream-gap-list >> $lager/gap-list-$ext.txt";
$opt_v=1;
}
$SIG{INT} = 'stop_threads';
my @queues=();
my @writers=();
foreach $i ( 0.. $#cmds ){
my $q= new Thread::Queue;
push( @queues, $q );
push( @writers, threads->create('writer', $cmds[$i], $q) );
}
while(<>){
foreach $q ( @queues){
$q->enqueue($_);
}
}
foreach $q ( @queues){
$q->enqueue("end of civilization");
while ( $q->pending()>0 ) { sleep 1; }
}
# stop_threads();
gather_results();
exit 0;
#================================================================================
sub writer{
sub slutt{
close $fh || die "Could not close filehandle : $!\n$cmd";
return $n;
}
$SIG{INT} = 'slutt';
$SIG{HUP} = 'slutt';
my ($cmd, $q)=@_;
my $n=0;
my $fh = IO::File->new();
$fh->open( "|$cmd" or die "Could not open cmd : $! :\n$cmd");
$0=$cmd;
while(1){
my $l=$q->dequeue;
last if $l =~ /^end of civilization$/;
my $lth = syswrite($fh, $l ) || die sprintf "could not write to fh %s : $! \n$cmd" ;
if ( $lth != length $l ) { die sprintf "Invalid length $lth : buffer is %d", length $l; }
$n++;
}
close $fh || die "Could not close filehandle : $!\n$cmd";
# return sprintf "%8d %s", $n, substr($cmd, 0, 20);
return $n;
}
sub stop_threads{
foreach $t (0..$#writers){
$writers[$t]->kill('SIGHUP');
}
}
sub gather_results(){
my @lines = sprintf "%8d input\n", $.;
foreach $t (0..$#writers){
push ( @lines, sprintf "%8d %s\n", $writers[$t]->join(), substr( $cmds[$t], 0, 80) );
}
print @lines if $opt_v;
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment