Commit 6610d523 authored by paalmm's avatar paalmm

updates: fixed MPEG-ts bugs and implemented support for XML,rtmp and tcp

parent ad39ed20
#package RTMP::Client;
use Socket qw(:all);
use Fcntl;
use Time::HiRes qw/ sleep time tv_interval gettimeofday/;
#use FLV::Header;
use 5.008008;
use warnings;
#use Data::AMF; not in use
our $VERSION = '0.03';
require Exporter;
our @ISA = qw(Exporter);
our @EXPORT_OK = qw(rtmp_connect rtmp_play rtmp_call print_hex my_recv get_stream_start get_rtmp_data);
our $rtmp_packet_length = 12;
our $debug_flag = 0;
##RTMP REFs.###
## http://osflash.org/_media/rtmp_spec.jpg
## http://wiki.multimedia.cx/index.php?title=RTMP#Packet_header
## http://osflash.org/documentation/rtmp
## http://search.cpan.org/~chengang/RTMP-Client-0.03/lib/RTMP/Client.pm
#RTMP
my $av_pkt=0;
my $first_av_packet:shared=0;
my $start_time:shared;
my $time_stamp_calc=0;
my $real_last_time_rtmp = 0;
my $real_time_rtmp = 0;
my %rtmp_stats = ();
my %rtmp_jitter_stats = ();
my $calculated_timestamp=0;
my $msgcount = 0;
#some temps
my $recv_bytes = 0;
my $offset = 0;
my $control_counter = 0;
=head1 NAME
RTMP::Client - A Simple RTMP client
=head1 SYNOPSIS
use RTMP::Client qw(rtmp_connect rtmp_play rtmp_call);
print "connect success\n" if rtmp_connect('192.168.1.1', '1935', 'live/23');
rtmp_play('ktv', '-1.0', '-1.0');
rtmp_call('YourFunc', 'YourARGV');
=head1 DESCRIPTION
This is a simple RTMP client without video or audio decode.
It implemented in pure PERL including packing Adobe amf packages.
=head1 METHODS
=head2 rtmp_connect($rtmp_server_address, $server_port, $application_name)
Just like the 'NetConnection.connect()' function in ActionScript, with args are set in different way.
=cut
sub rtmp_connect
{
my ($server_addr, $server_port, $app_name, $swfurl, $pageurl, $flash_versio) = @_;
die 'app_name MUST NOT begin with \'/\' ' if substr($app_name, 0 , 1) eq '/';
$swfurl = "http://streamer.uninett.no/player.swf" unless $swfurl;
$pageurl = "http://streamer.uninett.no/flash2.htm" unless $pageurl;
$flash_version = "LNX 10,0,32,18" unless $flash_version;
my $ipaddress;
&init_stats_rtmp();
if ($server_addr =~ /^[\d\.]+$/ )
{ $ipaddress = inet_aton($server_addr); } # ip addr here
else
{ $ipaddress = gethostbyname($server_addr); } # domain addr here
my $address = sockaddr_in($server_port, $ipaddress);
$start_time = time() if(!$start_time);
socket(SOCK, AF_INET, SOCK_STREAM, IPPROTO_TCP) || die $!;
connect(SOCK, $address) || die $!;
#my $fh_flag = fcntl(SOCK, F_GETFL,0);
#$fh_flag |= O_NONBLOCK;
#my $fh_flag = fcntl(SOCK, F_SETFL, $fh_flag);
die 'handshake fail' unless rtmp_handshake();
my $amf_body = pack_amf_string('connect');
$amf_body.= pack_amf_number('1.0');
$amf_body.= pack_amf_object_start();
$amf_body.= pack_amf_attribute_name('app');
$amf_body.= pack_amf_string($app_name);
$amf_body.= pack_amf_attribute_name('flashVer');
$amf_body.= pack_amf_string($flash_version);
$amf_body.= pack_amf_attribute_name('swfUrl');
$amf_body.= pack_amf_string($swfurl);
$amf_body.= pack_amf_attribute_name('tcUrl');
$amf_body.= pack_amf_string("rtmp://$server_addr/$app_name");
$amf_body.= pack_amf_attribute_name('fpad');
$amf_body.= pack_amf_boolean('false');
$amf_body.= pack_amf_attribute_name('capabilities');
$amf_body.= pack_amf_number('15.0');
$amf_body.= pack_amf_attribute_name('audioCodecs');
$amf_body.= pack_amf_number('3191.0');
$amf_body.= pack_amf_attribute_name('videoCodecs');
$amf_body.= pack_amf_number('252.0');
$amf_body.= pack_amf_attribute_name('videoFunction');
$amf_body.= pack_amf_number('1.0');
$amf_body.= pack_amf_attribute_name('pageUrl');
$amf_body.= pack_amf_string($pageurl);
$amf_body.= pack_amf_attribute_name('objectEncoding');
$amf_body.= pack_amf_number('0.0');
$amf_body.= pack_amf_object_end();
# set object id 3
my $amf = pack_amf_body_to_chunks($amf_body, '00000011', '00000000', '14');
my_send_bin($amf);
for (1..5)
{
my @rtmp_content = my_recv_a_msg();
#analysis_rtmp_msg(@rtmp_content);
}
$amf = pack('H*', '025154620000040500000000002625a0');
my_send_bin($amf);
# user control message -> set buffer length -> set stream 0 buffer to 1000
$amf = pack('H*', '4200000000000a04000300000000000003e8');
my_send_bin($amf);
$amf_body = pack_amf_string('createStream');
$amf_body.= pack_amf_number('2.0');
$amf_body.= pack_amf_null();
$amf = pack_amf_body_to_chunks($amf_body, '00000011', '00000000', '14');
my_send_bin($amf);
$amf = pack('H*', 'c2000300000001000003e8');
my_send_bin($amf);
for (1..1)
{
my @rtmp_content = my_recv_a_msg();
# analysis_rtmp_msg(@rtmp_content);
}
return 1;
}
=head2 rtmp_play($stream_or_file_name, $play_type, $length, $interval_call_hook_function, $hook_function)
Just like the 'NetStream.play()' function in ActionScript, with args are set in different way.
You can use the last two args or not.
=cut
sub rtmp_play
{
my ($stream_name, $play_type, $length, $period, $last,$debug) = @_;
$debug_flag = 1 if $debug;
my $amf_body = pack_amf_string('play');
$amf_body.= pack_amf_number('0.0');
$amf_body.= pack_amf_null();
$amf_body.= pack_amf_string($stream_name);
$amf_body.= pack_amf_number($play_type);
$amf_body.= pack_amf_number($length);
my $amf = pack_amf_body_to_chunks($amf_body, '00001000', '01000000', '14');
#my $amf = pack('H*','0800372400002e110100000000020004706c6179000000000000000000050200074d61696e425f3100c08f40000000000000c08f400000000000');
my_send_bin($amf);
my_recv_nostop($period,$last);
return 0;
}
=head2 rtmp_call($stream_or_file_name, $play_type, $length, $file_path_to_store_the_data_received)
Just like the 'NetStream.call()' function in ActionScript, with args are set in different way.
=cut
sub rtmp_call
{
my ($func_name,$select_option) = @_;
#return unless $select_option =~ /^[A-Z]$/;
my $amf_body = pack_amf_string($func_name);
$amf_body.= pack_amf_number('1.0');
$amf_body.= pack_amf_null();
#$amf.= pack_amf_string($select_option);
$amf_body.= pack_amf_string($select_option);
#my $amf = pack_amf_body_to_chunks($amf_body, '01000011', '01000000', '11');
#print_hex($amf);
my $amf = pack('H*','43010d640000271100');
$amf.= $amf_body;
my_send_bin($amf);
my @rtmp_content = my_recv_a_msg();
my $string = do { use bytes;length $rtmp_content[1]; };
return $string;
}
=head1 EXAMPLES
use RTMP::Client qw(rtmp_connect rtmp_play rtmp_call);
Speed Detection
report download speed every 5 secs.
print "connect success\n" if rtmp_connect('192.168.1.1', '1935', 'live/23');
my $report_time = 5;
rtmp_play('ktv', '-1.0', '-1.0', $report_time, \&speed_detector);
sub speed_detector
{
my $rev_length = shift;
my $speed = $rev_length / 1024 / $report_time;
if ($speed > 3)
{
my $cur_time = strftime("%F_%T", localtime);
print $cur_time, "\t", $speed, "\tKbytes/s\n";
}
else
{
print "too slow !\n";
}
}
Save to File
do things like "rtmpdump".L<http://rtmpdump.mplayerhq.hu/>
print "connect success\n" if rtmp_connect('192.168.1.1', '1935', 'live/23');
my $loop_time = 10;
rtmp_play('ktv', '-1.0', '-1.0', $report_time, \&save_to_file);
sub save_to_file
{
my $rev_length = shift;
my $rev_binary = shift;
open my $fh,">>","/root/rtmp_dump.bin";
binmode $fh;
print $fh $rev_binary;
close $fh;
}
=head1 SOME INTERNAL METHODS
=head2 rtmp_handshake()
No args need. Called in function rtmp_connect().
=cut
sub rtmp_handshake
{
my @c0 = qw(03);
my_send_hex(\@c0);
my @c1 = qw(00 00 00 00); # time - 4 bytes
push @c1, qw(00 00 00 00); # zero - 4 bytes (do not MUST!)
for (1..1528) { push @c1, '99';}
my_send_hex(\@c1);
#print "start handshack...\n";
my $s0 = my_recv(1);
my $s1 = my_recv(1536);
my $s2 = my_recv(1536);
my $c2 = $s1;
my_send_bin($c2);
#print "handshack success.\n";
return 1;
}
=head2 pack_amf_body_to_chunks($string, $object_id, $stream_id, $type)
Output a available binary amf packet.
Works on amf message body, just like add a right amf header before the message body.
=cut
sub pack_amf_body_to_chunks
{
my ($body, $size_and_object_id, $stream_id, $type) = @_;
my $body_length = do { use bytes; length $body; };
my $header = pack('B8', $size_and_object_id); # 2 = the size of the header{here: 12} 6 = object id{here: 3}
$header.= pack('H6', '000000'); # timestamp (big-endian integer) [header size >= 4 bytes]
$header.= substr( pack('N', $body_length), 1); # length of the object body (big-endian integer) [>= 8 bytes]
$header.= pack('H2', $type); # content type [>= 8 bytes] {0x14: Function call, 0x09: Video data, 0x08: Audio data}
$header.= pack('H8', $stream_id); # a stream id (32 bit integer that is little-endian encoded) [only = 12 bytes]
my $need_chunk_num = ( $body_length - ( $body_length % 128 ) ) / 128;
$need_chunk_num++;
my $output = $header;
for (1..$need_chunk_num)
{
my $seek_flag = ($_ - 1) * 128 ;
$output.= substr($body, $seek_flag, 128);
$output.= pack('H2', 'c3');
}
$output = substr( $output, 0, -1);
return $output;
}
=head2 pack_amf_object_start()
=cut
sub pack_amf_object_start
{
my $output = pack('H2', '03');
return $output;
}
=head2 pack_amf_object_end()
=cut
sub pack_amf_object_end
{
my $output = pack('H6', '000009');
return $output;
}
=head2 pack_amf_attribute_name($string)
It packs a attribute_name which less than 65536 bytes or return null.
=cut
sub pack_amf_attribute_name
{
my ($string) = (@_);
my $length = do { use bytes; length $string; };
return '' if $length > 65536;
my $output = pack('n', $length);
$output.= $string;
return $output;
}
=head2 pack_amf_number($double)
Return 9 bytes binary data.
=cut
sub pack_amf_number
{
my ($number) = (@_);
my $output = pack('H2', '00');
$output.= join "", reverse split//, pack('d', $number);
return $output;
}
=head2 pack_amf_boolean($boolean)
Return 2 bytes binary data.
=cut
sub pack_amf_boolean
{
my ($boolean) = (@_);
my $output = pack('H2', '01');
$output.= pack('H2', '00') if $boolean eq 'false';
$output.= pack('H2', '01') if $boolean eq 'true';
return $output;
}
=head2 pack_amf_string($string)
it can pack a string which less than 65536 bytes or it return null.
There will be a long string packer in future.
=cut
sub pack_amf_string
{
my ($string) = (@_);
my $length = do { use bytes; length $string; };
return '' if $length > 65536; #todo: alarm or switch into long_string function.
my $output = pack('H2', '02');
$output.= pack('n', $length);
$output.= $string;
return $output;
}
=head2 pack_amf_boolean($boolean)
Return 1 byte binary data.
=cut
sub pack_amf_null
{
return pack('H2', '05');
}
=head2 my_recv_a_chunk()
Recieve a rtmp chunk.
=cut
sub my_recv_a_chunk
{
# get the first byte include fmt and chunk stream id
my $fmt_and_chunk_id = my_recv(1,3);
$fmt_and_chunk_id = unpack('B8', $fmt_and_chunk_id);
dprint("Chunk fmt: $fmt_and_chunk_id\n");
# get chunk_id
my $chunk_id = substr( $fmt_and_chunk_id, 2, 6);
if ($chunk_id eq '000000')
{
# havn`t test
my $buf = dec(my_recv(1));
$chunk_id = $buf + 64;
}
elsif ($chunk_id eq '000001')
{
# havn`t test
my $buf = dec(my_recv(1));
$chunk_id = $buf + 64;
$buf = dec(my_recv(1));
$chunk_id += $buf * 256;
}
else
{
$chunk_id = oct('0b'.$chunk_id);
}
my $chunk_size = get_rtmp_client_chunk_size();
print "chunk id ", $chunk_id, "\n" if $debug_flag !=0;
# get chunk_header_length
my $fmt = substr( $fmt_and_chunk_id, 0, 2);
if ($fmt eq '00')
{
#Chunks of Type 0 are 11 bytes long. This type MUST be used at the
#start of a chunk stream, and whenever the stream timestamp goes
#backward (e.g., because of a backward seek).
my $chunk_header = my_recv(11);
print('Chunk header: ') if $debug_flag;
print_hex($chunk_header) if $debug_flag;
#the absolute timestamp of the message is sent here.
#If the timestamp is greater than or equal to 16777215
#(hexadecimal 0x00ffffff), this value MUST be 16777215, and the
#‘extended timestamp header’ MUST be present. Otherwise, this value
#SHOULD be the entire timestamp.
#REF adobe specs.
my $chunk_timestamp = dec(substr($chunk_header, 0, 3)); #todo: absolute timestamp for what?
check_if_extended($chunk_timestamp); #checks if time stmap is extended. -PMM
reset_rtmp_timer($chunk_id); #todo: means reset chunk timestamp here?
#Note that this is generally not the same as the length of the chunk
#payload. The chunk payload length is the maximum chunk size for all
#but the last chunk, and the remainder (which may be the entire
#length, for small messages) for the last chunk.
my $chunk_msg_length = dec(substr($chunk_header, 3, 3));
#Needs to be commented -PMM
print "msg len $chunk_msg_length \n" if $debug_flag;
# if ($chunk_msg_length > 1151110 || $chunk_msg_length<=0){
# $chunk_msg_length = dec(substr($chunk_header, 4, 3));
# if($chunk_msg_length > 100000){
# $chunk_msg_length = $chunk_size;
# }
# }
set_rtmp_chunk_msg_length($chunk_id, $chunk_msg_length);
my $chunk_msg_type_id = dec(substr($chunk_header, 6, 1));
$chunk_msg_type_id = -1 unless defined $chunk_msg_type_id;
if ($chunk_msg_type_id !~ /^[1-9][\d]*$/){
&restart_stream_on_error();
}
set_rtmp_chunk_msg_type_id($chunk_id, int($chunk_msg_type_id));
my $chunk_msg_stream_id = dec(substr($chunk_header, 7, 4));
set_rtmp_chunk_msg_stream_id($chunk_id, int($chunk_msg_stream_id));
}
elsif ($fmt eq '01')
{
#Chunks of Type 1 are 7 bytes long. The message stream ID is not
#included; this chunk takes the same stream ID as the preceding chunk.
#Streams with variable-sized messages (for example, many video
#formats) SHOULD use this format for the first chunk of each new
#message after the first.
my $chunk_header = my_recv(7);
print('Chunk header: ') if $debug_flag;
print_hex($chunk_header) if $debug_flag;
my $chunk_timestamp_delta = dec(substr($chunk_header, 0, 3));
$calculated_timestamp += $chunk_timestamp_delta;
check_if_extended($chunk_timestamp_delta); #checks if time stmap is extended. -PMM
my $chunk_msg_length = dec(substr($chunk_header, 3, 3));
print "msg lengt $chunk_msg_length \n" if $debug_flag;
if ($chunk_msg_length > 100000 ){
$chunk_msg_length = $chunk_size ;
}
;
set_rtmp_chunk_msg_length($chunk_id, $chunk_msg_length);
my $chunk_msg_type_id = dec(substr($chunk_header, 6, 1));
$chunk_msg_type_id = -1 unless defined $chunk_msg_type_id;
if ($chunk_msg_type_id !~ /^[1-9][\d]*$/){
&restart_stream_on_error();
}
set_rtmp_chunk_msg_type_id($chunk_id, int($chunk_msg_type_id));
if ($chunk_msg_type_id == 9 || $chunk_msg_type_id == 8){
rtmp_jitter_stats($chunk_timestamp_delta,$chunk_id); #jitter stats -PMM
}
}
elsif ($fmt eq '10')
{
#Chunks of Type 2 are 3 bytes long. Neither the stream ID nor the
#message length is included; this chunk has the same stream ID and
#message length as the preceding chunk. Streams with constant-sized
#messages (for example, some audio and data formats) SHOULD use this
#format for the first chunk of each message after the first.
my $chunk_header = my_recv(3,3);
print('Chunk header: ') if $debug_flag;
print_hex($chunk_header) if $debug_flag;
my $chunk_timestamp_delta = dec(substr($chunk_header, 0, 3)); #for eval the bandwidth, i do not use it.
$calculated_timestamp += $chunk_timestamp_delta;
check_if_extended($chunk_timestamp_delta); #checks if time stmap is extended. -PMM
#RTMP_jitter_stats($chunk_timestamp_delta);
}
elsif ($fmt eq '11')
{
#Chunks of Type 3 have no header. Stream ID, message length and
#timestamp delta are not present; chunks of this type take values from
#the preceding chunk. When a single message is split into chunks, all
#chunks of a message except the first one, SHOULD use this type.
}
my $msg_length = get_rtmp_chunk_msg_length($chunk_id);
my $msg_type_id = get_rtmp_chunk_msg_type_id($chunk_id);
# if this is a Protocol Control Messages
#Protocol control messages SHOULD have message stream ID 0(called as
#control stream) and chunk stream ID 2
#this is bugging. -PMM
if ($chunk_id == 2)
{
dprint('protocol_control_message', 3);
$control_counter++;
#Set Chunk Size
if ($msg_type_id == 1)
{
dprint('Set Chunk Size',6);
my $new_chunk_size = dec(my_recv(4),3);
set_rtmp_client_chunk_size($new_chunk_size);
dprint("new chunk size is $new_chunk_size", 6);
# my $set_chunk_response =
# pack('H2','02')
# . get_rtmp_timer($chunk_id, '3bytes')
# . pack('H6','000004')
# . pack('H2','01')
# . pack('H8','00000000')
# . pack('H8','00000080');
# my_send_bin($set_chunk_response);
}
#Abort Message
elsif ($msg_type_id == 2)
{
dprint('Abort Message',6);
}
# Acknowledgement
elsif ($msg_type_id == 3)
{
dprint('Acknowledgement',6);
}
# User Control Message
elsif ($msg_type_id == 4)
{
dprint('User Control Message',6);
my $event_type = dec(my_recv(2));
if($event_type == 0)
{
dprint('Stream Begin', 9);
my $begin_stream_id = dec(my_recv(4));
dprint("Begin Stream ID is $begin_stream_id", 9);
}
elsif($event_type == 1)
{
dprint('Stream EOF', 9);
}
elsif($event_type == 2)
{
dprint('Stream Dry', 9);
}
elsif($event_type == 3)
{
dprint('Set Buffer Length', 9);
}
elsif($event_type == 4)
{
dprint('StreamIsRecorded', 9);
}
elsif($event_type == 5)
{
dprint('user contorl error recving', 9);
}
elsif($event_type == 6)
{
dprint('PingRequest', 9);
my $ping_timestamp = my_recv(4);
my $ping_response =
pack('H2','02')
. get_rtmp_timer($chunk_id, '3bytes')
. pack('H6','000004')
. pack('H2','07')
. pack('H8','00000000')
. $ping_timestamp;
my_send_bin($ping_response);
}
elsif($event_type == 7)
{
dprint('PingResponse', 9);
}
else
{
my $cleaing_bytes = my_recv(4);
print "user contorl error recving !\n" if $debug_flag !=0;
}
}
# Window Acknowledgement Size
elsif ($msg_type_id == 5)
{
dprint('Window Acknowledgement Size',6);
my $server_window_acknowledgement = dec(my_recv($msg_length));
set_rtmp_peer_window($server_window_acknowledgement); # could not work on this
dprint("peer Window Size is $server_window_acknowledgement now", 6);
}
# Set Peer Bandwidth
elsif ($msg_type_id == 6)
{
dprint('Set Peer Bandwidth',6);
my $buf = my_recv($msg_length);
my $set_window_size = dec(substr($buf, 0, 4));
my $limit_type = dec(substr($buf, 4, 1)); # 0-hard, 1-soft, 2-dynamic
set_rtmp_window($set_window_size, $limit_type);
dprint("peer set my Sindow Size to $set_window_size with limit type $limit_type", 6);
my $rtmp_packet_window_acknowledgement_size =
pack('H2','02')
. get_rtmp_timer($chunk_id, '3bytes')
. pack('H6','000004')
. pack('H2','05')
. pack('H8','00000000')
. substr($buf, 0, 4);
my_send_bin($rtmp_packet_window_acknowledgement_size);
}
return 'rtmp_protocol_control_message';
}
if($msg_type_id == 20)
{
dprint('Command message in AMF0', 3);
}
if($msg_length <= $chunk_size)
{
#dprint("payload 1:$msg_length");
my $rtmp_msg = my_recv($msg_length);
#print_hex($rtmp_msg);
put_rtmp_msg($chunk_id, $rtmp_msg) unless (!defined $rtmp_msg);
return $chunk_id;
}
else
{
our $msg_length_remain = get_rtmp_chunk_msg_length($chunk_id) unless defined $msg_length_remain;
$msg_length_remain = get_rtmp_chunk_msg_length($chunk_id) if $msg_length_remain <= 0;
if ($msg_length_remain <= $chunk_size)
{
#dprint("payload 2:$msg_length_remain");
my $rtmp_msg = my_recv($msg_length_remain);
#print_hex($rtmp_msg);
put_rtmp_msg($chunk_id, $rtmp_msg) unless (!defined $rtmp_msg);
$msg_length_remain = 0;
return $chunk_id;
}
else
{
#dprint("payload 3:$chunk_size");
my $rtmp_msg = my_recv($chunk_size);
#print_hex($rtmp_msg);
put_rtmp_msg($chunk_id, $rtmp_msg);
$msg_length_remain = $msg_length_remain - $chunk_size;
return 'rtmp_msg_recv_piece';
}
}
}
=head2 my_recv_a_msg()
Recieve a rtmp message.
=cut
sub my_recv_a_msg
{
our $msg_counter unless defined $msg_counter;
my $chunk_id = 0;
my $chunk_count = 0;
my $limit_chunk = 9; # this is a limit by me. not by the RTMP
while(1)
{
$chunk_count++;
#print "[", "-" x 5, " $chunk_count ", "-" x 5 ,"]\n";
$chunk_id = my_recv_a_chunk();
my $rtmp_msg_length = get_rtmp_chunk_msg_length($chunk_id);
#die "limit chunk already recv.\n" if $chunk_count == $limit_chunk;
if ($chunk_id =~ /[\d]+/ and $rtmp_msg_length)
{
my $rtmp_msg = get_rtmp_msg($chunk_id);
my $rtmp_msg_type_id = int(get_rtmp_chunk_msg_type_id($chunk_id));
my $rtmp_msg_type_name = '';
if ($rtmp_msg_type_id == 8)
{
$rtmp_msg_type_name = 'Audio';
}
elsif ($rtmp_msg_type_id == 9)
{
$rtmp_msg_type_name = 'Video';
}
elsif ($rtmp_msg_type_id == 20 or $rtmp_msg_type_id == 17)
{
$rtmp_msg_type_name = 'Command';
}
elsif ($rtmp_msg_type_id == 18 or $rtmp_msg_type_id == 15)
{
$rtmp_msg_type_name = 'Data';
}
elsif ($rtmp_msg_type_id == 19 or $rtmp_msg_type_id == 16)
{
$rtmp_msg_type_name = 'SharedObject';
}
elsif ($rtmp_msg_type_id == 22)
{
$rtmp_msg_type_name = 'Aggregate';