From 582381a02c215c113bc3b55ea201078cc9f07053 Mon Sep 17 00:00:00 2001 From: Jed Reynolds Date: Tue, 7 Apr 2020 15:12:54 -0700 Subject: [PATCH] l3_video: progress on L4 pattern, unfinished --- l3_video_em.pl | 305 ++++++++++++++++++++++++++++++++++++++++--------- 1 file changed, 249 insertions(+), 56 deletions(-) diff --git a/l3_video_em.pl b/l3_video_em.pl index 7b8dbac3..07ddf4d5 100755 --- a/l3_video_em.pl +++ b/l3_video_em.pl @@ -7,7 +7,7 @@ use Carp; $SIG{ __DIE__ } = sub { Carp::confess( @_ ) }; $SIG{ __WARN__ } = sub { Carp::confess( @_ ) }; use Data::Dumper; - +use POSIX; # Un-buffer output $| = 1; @@ -26,6 +26,7 @@ our $has_usleep = (defined &usleep) ? 1 : 0; my $NA ='NA'; our $resource = 1; +our $upstream_res = 1; our $quiet = "yes"; our $silent = 0; our $endp_name = ""; @@ -38,11 +39,17 @@ our $tx_style = ""; our $cx_name = ""; our $tx_side = "B"; our $min_tx = 0; -our $max_tx = -1; -our $buf_size = -1; +our $max_tx = 920 * 1024 * 1024; # 920Mbps default +our $buf_size = 3 * 1024 * 1024; # 3MB default our $log_cli = "unset"; # do not set to 0, it turns into logfile "./0" our $stream_key = undef; our $quit_when_const = 0; +our $sta = ""; +our $upstream = ""; +our $proto = "udp"; # for constant +our $est_fill_time_sec = 0; +our $last_fill_time_sec = 0; + # https://en.wikipedia.org/wiki/Standard-definition_television # https://www.adobe.com/devnet/adobe-media-server/articles/dynstream_live/popup.html @@ -160,7 +167,12 @@ our $usage = "$0: # modulates a Layer 3 CX to emulate a video server # Expects an existing L3 connection --mgr {hostname | IP} --mgr_port {ip port} - --tx_style { constant | bufferfill } + --tx_style { constant | bufferfill | L4 } + # constant: for variable-br constant streaming, like Skype. UDP or TCP. + # bufferfill: for framebuffer transmission, like YouTube, that monitors and throttles an existing Layer-3 connection + # High cpu load and imprecise technique, but uses a single constant connection. UDP or TCP + # L4: for framebuffer transmission, like YouTube, but using more precise Layer-4 URL fetching pattern. + # This repeats the same curl fetch over and over, creating a new connection every time. More efficient, TCP only. --cx_name {name} --tx_side {A|B} # which side is emulating the server, # default $::tx_side @@ -174,16 +186,24 @@ our $usage = "$0: # modulates a Layer 3 CX to emulate a video server --quiet {0|1|yes|no} # print CLI commands --silent # do not print status output --quit_when_const # quits connection when constant tx detected + --sta {1.1.sta0 or 1.sta0} # use with L4 or constant + --upstream {1.1.eth1 or 1.eth1} # use with L4 or constant; will create HTTP service on port if necessary + --proto {udp|tcp} # use with constant tx style + Example: 1) create the L3 connection: ./lf_firemod.pl --resource 1 --action create_endp bursty-udp-A --speed 0 --endp_type lf_udp --port_name eth1 --report_timer 500 ./lf_firemod.pl --resource 1 --action create_endp bursty-udp-B --speed 0 --endp_type lf_udp --port_name eth2 --report_timer 500 ./lf_firemod.pl --resource 1 --action create_cx --cx_name bursty-udp --cx_endps bursty-udp-A,bursty-udp-B $0 --cx_name bursty-udp --stream 720p --buf_size 8M --max_tx 40M + + 2) Create a Layer-4 connection: + $0 --tx_style L4 --cx_name hunker --stream yt-sdr-1080p30 --buf_size 3M --port 1.sta0000 "; my $show_help = undef; our $debug = 0; + $::stream_key = $resolution; GetOptions ( @@ -193,6 +213,7 @@ GetOptions 'silent+' => \$::silent, 'mgr|m=s' => \$::lfmgr_host, 'mgr_port|p:i' => \$::lfmgr_port, + 'resource|r:i' => \$::resource, 'log_cli:s{0,1}' => \$log_cli, 'tx_style|style:s' => \$::tx_style, 'cx_name|e=s' => \$::cx_name, @@ -203,6 +224,9 @@ GetOptions 'stream_res|stream=s' => \$::stream_key, 'list_streams+' => \$list_streams, 'quit_when_const' => \$::quit_when_const, + 'sta=s' => \$::sta, + 'upstream|up|u=s' => \$::upstream, + 'proto=s' => \$::proto, ) || die($!); @@ -270,7 +294,7 @@ if (defined $log_cli) { # PIPE;POLL;PROF;PWR;QUIT;RTMAX;RTMIN;SEGV;STKFLT;STOP;SYS;TERM;TRAP;TSTP;TTIN;TTOU;UNUSED; # URG;USR1;USR2;VTALRM;WINCH;XCPU;XFSZ;__DIE__;__WARN__ # -# install signal handlers for stopping L3 connections +# install signal handlers for stopping connections $SIG{ABRT} = \&cleanexit; $SIG{HUP} = \&cleanexit; $SIG{INT} = \&cleanexit; @@ -287,24 +311,24 @@ sub cleanexit { if (!(defined $msg) || ("" eq $msg)) { $msg = 'no msg'; } - if ((defined $::cx_name) && ("" ne $::cx_name)) { - if (defined $::utils->telnet) { - print STDERR "\nStopping $::cx_name: $msg\n"; - $::utils->doAsyncCmd($::utils->fmt_cmd("set_cx_state", "all", $::cx_name, "STOPPED")); - } - } - exit 0; + if ((defined $::cx_name) && ("" ne $::cx_name)) { + if (defined $::utils->telnet) { + if ($::stop_cx_on_exit) { + print STDERR "\nStopping $::cx_name: $msg\n"; + $::utils->doAsyncCmd($::utils->fmt_cmd("set_cx_state", "all", $::cx_name, "STOPPED")); + } + else { + print STDERR ("CX '$::cx_name' will keep running.") unless $::silent; + } + } + else { + print STDERR ("No telnet session remains, CX '$::cx_name' will keep running."); + } + } + exit 0; } # ======================================================================== -our $est_fill_time_sec = 0; -our $last_fill_time_sec = 0; - -sub filltime { - my () = @_; - -} - # ======================================================================== sub rxbytes { my ($endp) = @_; @@ -336,8 +360,9 @@ sub rxbytes { sub get_txrx_rate { my ($lf_host, $lf_port, $rez, $cxnam, $rx_sid) = @_; my $rxendp = "${cxnam}-${rx_sid}"; - my $cmd = "lf_firemod.pl --mgr $lf_host --mgr_port $lf_port -r $rez " + my $cmd = "./lf_firemod.pl --mgr $lf_host --mgr_port $lf_port -r $rez " ."--action show_endp --endp_name $rxendp --endp_vals EID"; + print "GET_TXRX: $cmd\n"; my @lines = `$cmd`; chomp(@lines); my @matches = grep {/EID:/} @lines; @@ -379,14 +404,28 @@ sub txbytes { my @lines = split("\n", $::utils->doAsyncCmd("nc_show_endpoints $endp")); #Tx Bytes: Total: 0 Time: 60s Cur: 0 0/s my $bytes = 0; - my @matches = grep {/^\s+Tx Bytes/} @lines; - if (@matches < 1) { - warn "tx-bytes not found for [$endp]\n"; - print join("\n> ", @lines), "\n"; - return 0; + my @matches = grep {/^L4Endp \[/} @lines; + my $is_4 = (@matches > 0)? 1 : 0; + + if ($is_4) { + @matches = grep {/^\s+Bytes Written/} @lines; + if (@matches < 1) { + warn "bytes-written not found for [$endp]\n"; + print join("\n> ", @lines), "\n"; + return 0; + } + ($bytes) = $matches[0] =~ /Bytes Written:\s+Total: (\d+)/; + } + else { + @matches = grep {/^\s+Tx Bytes/} @lines; + if (@matches < 1) { + warn "tx-bytes not found for [$endp]\n"; + print join("\n> ", @lines), "\n"; + return 0; + } + ($bytes) = $matches[0] =~ /Tx Bytes:\s+Total: (\d+)/; } - ($bytes) = $matches[0] =~ /Tx Bytes:\s+Total: (\d+)/; if (!(defined $bytes)) { warn "no tx-bytes match for [$endp]\n"; print "="x72, "\n"; @@ -469,6 +508,62 @@ if ($::min_tx =~ /[kmg]$/i) { } } + +my @hunks = (); +my @lines = (); +my @matches = (); + +if ((defined $::sta) && ("" ne $::sta)) { + if ($::sta =~ /\./) { + @hunks = split(/\./, $::sta); + $::sta = $hunks[-1]; + if (("$::resource" ne $hunks[-2])) { + print "Mismatch between station resource(${hunks[-2]}) and declared resource($::resource), bye.\n"; + exit(1); + } + @lines = split(/\r?\n/, $::utils->doAsyncCmd("nc_show_port 1 $::resource $::sta")); + @matches = grep {/^Shelf: 1,/} @lines; + if (@matches < 1) { + print "Cannot find port $::resource.$::sta, bye\n"; + exit(1); + } + } +} + +my $endp = $::cx_name."-".$::tx_side; # change me if L4 +@hunks = (); +if ($::tx_style =~ /^l(ayer)?[-_]?4$/i ) { + $::tx_style = "L4"; +} +if ($::tx_style =~ /^const(ant)?$/i) { + $::tx_style = "constant"; +} +if (($::tx_style eq "L4") || ($::tx_style eq "constant")) { + if (!(defined $::sta) || ("" eq $::sta)) { + print "L4 and constant connections needs a station, bye\n"; + exit 1; + } + if (!(defined $::upstream) || ("" eq $::upstream)) { + print "L4 and constant connection needs an upstream port, bye\n"; + exit 1; + } + if ($::upstream !~ /[.]/) { + $::upstream_res = $::resource; + } + else { + @hunks = split(/[.]/, $::upstream); + $::upstream = $hunks[-1]; + $::upstream_res = $hunks[-2]; + } + + if ($::sta =~ /\./) { + @hunks = split(/[.]/, $::sta); + $::sta = $hunks[-1]; + die("resource ${hunks[-2]} for station $::sta is not listed resource: $::resource, bye.") + if ($hunks[-2] ne $::resource); + } +} + my $stream_bps = 0; die("Unknown stream key $::stream_key") unless(exists $::avail_stream_res{$::stream_key}); @@ -493,38 +588,22 @@ print "Filling $::stream_key $buf_kB KB buffer est ${est_fill_time_sec}sec, empt die ("Please provide cx_name") unless((defined $::cx_name) && ("" ne $::cx_name)); -# print out choices for now -my @lines = split("\r?\n", $::utils->doAsyncCmd($::utils->fmt_cmd("show_cx", "all", $::cx_name))); -my @matches = grep {/Could not find/} @lines; -die($matches[0]) - unless (@matches == 0); +# check for cx if we're bufferfill +my $cx_exists = 0; +@lines = split("\r?\n", $::utils->doAsyncCmd($::utils->fmt_cmd("show_cx", "all", $::cx_name))); +@matches = grep {/Could not find/} @lines; +$cx_exists = 1 if (@matches == 0); -# avoid a stampede of scripts starting at the same time -my $rand_start_delay = rand(7); - if (! $::debug) { - print "Random start delay: $rand_start_delay...\n"; - $::utils->sleep_sec($rand_start_delay); +if ($::tx_style eq "bufferfill") { + print "Tx_style bufferfill requires your connection already exists, bye.\n"; + exit 1; } -print "Stopping and configuring $::cx_name\n" unless($silent); -$::utils->doCmd($::utils->fmt_cmd("set_cx_state", "all", $::cx_name, "STOPPED")); -my $endp = $::cx_name."-".$::tx_side; -@lines = split("\r?\n", $::utils->doAsyncCmd($::utils->fmt_cmd("nc_show_endp", $endp))); -@matches = grep {/ Shelf: 1, Card: /} @lines; -die ("No matches for show endp $endp") - unless($matches[0]); -my ($res, $port, $type) = $matches[0] =~ /, Card: (\d+)\s+Port: (\d+)\s+Endpoint: \d+ Type: ([^ ]+)\s+/; -my $cmd = $::utils->fmt_cmd("add_endp", $endp, 1, $res, $port, $type, - $NA, # ip_port - $NA, # is_rate_bursty - $::min_tx, # min_rate - $::min_tx # max_rate - ); -$::utils->doAsyncCmd($cmd); -print "Starting $::cx_name\n" unless($silent); -$::utils->doCmd($::utils->fmt_cmd("set_cx_state", "all", $::cx_name, "RUNNING")); -$cmd = $::utils->fmt_cmd("add_endp", $endp, 1, $res, $port, $type, $NA, $NA, $::max_tx, $::max_tx); -$::utils->doAsyncCmd($cmd); +print "Stopping and configuring $::cx_name\n" unless($silent); +if (($::tx_style eq "L4") && ($::cx_name !~ /^CX_/)) { + $::cx_name = "CX_$::cx_name"; +} +$::utils->doCmd($::utils->fmt_cmd("set_cx_state", "all", $::cx_name, "STOPPED")); my @reports = (); my $fill_starts = 1; @@ -537,6 +616,120 @@ my $begin = $starttime_sec; my $last_report_sec = $starttime_sec; my $report_period_sec = 6; my $check_if_stopped = 0; +my $cmd =""; +my $res = 1; +my $port = "Unknown"; +my $type = $::proto; +our $stop_cx_on_exit = 1; + +# ----- ----- ----- ----- ----- ----- ----- ----- ----- ----- ----- ----- ----- +# Layer-4 setup +# ----- ----- ----- ----- ----- ----- ----- ----- ----- ----- ----- ----- ----- +if ($::tx_style eq "L4") { + # check that the upstream port has http enabled + $::stop_cx_on_exit = 0; + my $cmd = "./lf_portmod.pl --mgr $::lfmgr_host --mgr_port $::lfmgr_port --port_name $::upstream --show_port Current"; + my $current = `$cmd`; + if ($current !~ / SVC-HTTPD/ ) { + print "Enabling HTTP on $::upstream...\n"; + #"set_port 1 1 eth1 NA NA NA NA 0 NA NA NA NA 134217730 " # <--- and to turn off + $cmd = + $::utils->doCmd($::utils->fmt_cmd("set_port", 1, $::resource, $::upstream, + "NA", "NA", "NA", "NA", 35184372088832, "NA", "NA", "NA", "NA", 134217730)); + sleep(1); + } + my ($short_cx) = $::cx_name =~ /CX_(\S+)/; + my $tmp_ep1 = $short_cx; + my $tmp_ep2 = "D_$short_cx"; + + $endp = $tmp_ep1; # L4 endpoints are not '-A', '-B' + my $timeout = 2000; # ms + die("Invalid drain time: $drain_time_sec") + if ($drain_time_sec <= 0); + my $url_rate = floor(600 / $drain_time_sec); + + my $short_size = $::buf_size; + while ($short_size > 1024) { + $short_size = floor($short_size / 1024); + } + my $url = "dl http://".$::upstream."/".$short_size."m.bin /dev/null"; + print "URL $url\n"; + sleep 10; + + #$::utils->doCmd($::utils->fmt_cmd( + # "add_l4_endp", $tmp_ep2, 1, $::resource, $::sta, "l4_generic", 0, 0, 0, ' ', ' ')); + #$::utils->doCmd($::utils->fmt_cmd("set_endp_flag", $tmp_ep2, "Unmananaged", 1)); + $::utils->doCmd($::utils->fmt_cmd( + "add_l4_endp", $tmp_ep1, 1, $::resource, $::sta, "l4_generic", 0, $timeout, $url_rate, $url, ' ')); + sleep 1; + $cmd = $::utils->fmt_cmd("add_cx", $::cx_name, "default_tm", $tmp_ep1, "NA"); + print "********** $cmd **********\n"; + $::utils->doAsyncCmd($cmd); +} + +# ----- ----- ----- ----- ----- ----- ----- ----- ----- ----- ----- ----- ----- +# Layer-3 constant setup +# ----- ----- ----- ----- ----- ----- ----- ----- ----- ----- ----- ----- ----- + + + +# ----- ----- ----- ----- ----- ----- ----- ----- ----- ----- ----- ----- ----- +# Layer-3 constant bufferfill +# ----- ----- ----- ----- ----- ----- ----- ----- ----- ----- ----- ----- ----- +#print "Stopping and configuring $::cx_name\n" unless($silent); +#$::utils->doCmd($::utils->fmt_cmd("set_cx_state", "all", $::cx_name, "STOPPED")); + +@lines = split("\r?\n", $::utils->doAsyncCmd($::utils->fmt_cmd("nc_show_endp", $endp))); +@matches = grep {/ Shelf: 1, Card: /} @lines; +# create a L3 connection +if ($::tx_style =~ /constant/) { + $::stop_cx_on_exit = 0; + + die ("No matches for show endp $endp") + unless($matches[0]); + ($res, $port, $type) = $matches[0] =~ /, Card: (\d+)\s+Port: (\d+)\s+Endpoint: \d+ Type: ([^ ]+)\s+/; + if (!(defined $res) || !(defined $port) || !(defined $type)) { + die("Unable to determine endpoint [$endp], bye"); + + } + $cmd = $::utils->fmt_cmd("add_endp", $endp, 1, $res, $port, $type, + $NA, # ip_port + $NA, # is_rate_bursty + $::min_tx, # min_rate + (($::tx_style eq "bufferfill") ? $::min_tx : $::max_tx) # max_rate + ); + print "CMD[$cmd]\n"; + sleep 5; + $::utils->doAsyncCmd($cmd); +} + + + +# +# start CX +# +# avoid a stampede of scripts starting at the same time +my $rand_start_delay = rand(7); + if (! $::debug) { + print "Random start delay: $rand_start_delay...\n"; + $::utils->sleep_sec($rand_start_delay); +} +$cmd = $::utils->fmt_cmd("set_cx_state", "all", $::cx_name, "RUNNING"); +print "Starting $::cx_name: $cmd\n" unless($silent); +$::utils->doCmd($cmd); + +if (!(defined $endp) || !(defined $res) || !(defined $port) || !(defined $type) || !(defined $::max_tx) || !(defined $::min_tx)) { + die("Unable to continue, missing values in: endp($endp) res($res) port($port) type($type) max_tx($::max_tx)"); +} + +if ($::tx_style !~ /bufferfill/) { + print "Done\n"; + cleanexit("Done with setup on $::tx_style $::cx_name\n"); +} + +$cmd = $::utils->fmt_cmd("add_endp", $endp, 1, $res, $port, $type, $NA, $NA, $::max_tx, $::max_tx); +$::utils->doAsyncCmd($cmd); + my $startbytes = txbytes($endp, $check_if_stopped); my @delta_reports = ();