adddition of l3cx_profile2, testbase, vt_2

Signed-off-by: Dipti <dipti.dhond@candelatech.com>
This commit is contained in:
Dipti
2021-03-02 20:12:44 -08:00
parent 209b88f682
commit 016c05b708
4 changed files with 845 additions and 3 deletions

470
py-json/l3_cxprofile2.py Normal file
View File

@@ -0,0 +1,470 @@
#!/usr/bin/env python3
import re
import time
import pprint
from pprint import pprint
from LANforge import LFRequest
from LANforge import LFUtils
from LANforge import set_port
from LANforge import add_sta
from LANforge import add_dut
from LANforge import lfcli_base
from LANforge import add_vap
from LANforge.lfcli_base import LFCliBase
#from lfdata import LFDataCollection
from LANforge import add_monitor
from LANforge.add_monitor import *
#from LANforge.base_profile import BaseProfile
import os
import datetime
import base64
import csv
class L3CXProfile2(LFCliBase):
def __init__(self,
lfclient_host,
lfclient_port,
local_realm,
side_a_min_bps=None,
side_b_min_bps=None,
side_a_max_bps=0,
side_b_max_bps=0,
side_a_min_pdu=-1,
side_b_min_pdu=-1,
side_a_max_pdu=0,
side_b_max_pdu=0,
report_timer_=3000,
name_prefix_="Unset",
number_template_="00000",
debug_=False):
"""
:param lfclient_host:
:param lfclient_port:
:param local_realm:
:param side_a_min_bps:
:param side_b_min_bps:
:param side_a_max_bps:
:param side_b_max_bps:
:param side_a_min_pdu:
:param side_b_min_pdu:
:param side_a_max_pdu:
:param side_b_max_pdu:
:param name_prefix_: prefix string for connection
:param number_template_: how many zeros wide we padd, possibly a starting integer with left padding
:param debug_:
"""
super().__init__(local_realm=local_realm,
debug=debug_)
self.side_a_min_pdu = side_a_min_pdu
self.side_b_min_pdu = side_b_min_pdu
self.side_a_max_pdu = side_a_max_pdu
self.side_b_max_pdu = side_b_max_pdu
self.side_a_min_bps = side_a_min_bps
self.side_b_min_bps = side_b_min_bps
self.side_a_max_bps = side_a_max_bps
self.side_b_max_bps = side_b_max_bps
self.report_timer = report_timer_
self.created_cx = {}
self.created_endp = {}
self.name_prefix = name_prefix_
self.number_template = number_template_
def get_cx_names(self):
return self.created_cx.keys()
def get_cx_report(self):
self.data = {}
for cx_name in self.get_cx_names():
self.data[cx_name] = self.json_get("/cx/" + cx_name).get(cx_name)
return self.data
def instantiate_file(self, file_name, file_format):
pass
############################################ transfer into lfcriteria.py
#get current rx values
def __get_rx_values(self):
cx_list = self.json_get("endp?fields=name,rx+bytes")
if self.debug:
print(self.created_cx.values())
print("==============\n", cx_list, "\n==============")
cx_rx_map = {}
for cx_name in cx_list['endpoint']:
if cx_name != 'uri' and cx_name != 'handler':
for item, value in cx_name.items():
for value_name, value_rx in value.items():
if value_name == 'rx bytes' and item in self.created_cx.values():
cx_rx_map[item] = value_rx
return cx_rx_map
#compare vals
def __compare_vals(self, old_list, new_list):
passes = 0
expected_passes = 0
if len(old_list) == len(new_list):
for item, value in old_list.items():
expected_passes += 1
if new_list[item] > old_list[item]:
passes += 1
if passes == expected_passes:
return True
else:
return False
else:
return False
############################################ transfer into lfcriteria.py
def refresh_cx(self):
for cx_name in self.created_cx.keys():
self.json_post("/cli-json/show_cxe", {
"test_mgr": "ALL",
"cross_connect": cx_name
}, debug_=self.debug)
print(".", end='')
def start_cx(self):
print("Starting CXs...")
for cx_name in self.created_cx.keys():
if self.debug:
print("cx-name: %s" % (cx_name))
self.json_post("/cli-json/set_cx_state", {
"test_mgr": "default_tm",
"cx_name": cx_name,
"cx_state": "RUNNING"
}, debug_=self.debug)
if self.debug:
print(".", end='')
if self.debug:
print("")
def stop_cx(self):
print("Stopping CXs...")
for cx_name in self.created_cx.keys():
self.local_realm.stop_cx(cx_name)
print(".", end='')
print("")
def cleanup_prefix(self):
self.local_realm.cleanup_cxe_prefix(self.name_prefix)
def cleanup(self):
print("Cleaning up cxs and endpoints")
if len(self.created_cx) != 0:
for cx_name in self.created_cx.keys():
if self.debug:
print("Cleaning cx: %s"%(cx_name))
self.local_realm.rm_cx(cx_name)
for side in range(len(self.created_cx[cx_name])):
ename = self.created_cx[cx_name][side]
if self.debug:
print("Cleaning endpoint: %s"%(ename))
self.local_realm.rm_endp(self.created_cx[cx_name][side])
def create(self, endp_type, side_a, side_b, sleep_time=0.03, suppress_related_commands=None, debug_=False,
tos=None):
if self.debug:
debug_ = True
cx_post_data = []
timer_post_data = []
these_endp = []
these_cx = []
# print(self.side_a_min_rate, self.side_a_max_rate)
# print(self.side_b_min_rate, self.side_b_max_rate)
if (self.side_a_min_bps is None) \
or (self.side_a_max_bps is None) \
or (self.side_b_min_bps is None) \
or (self.side_b_max_bps is None):
raise ValueError(
"side_a_min_bps, side_a_max_bps, side_b_min_bps, and side_b_max_bps must all be set to a value")
if type(side_a) == list and type(side_b) != list:
side_b_info = self.local_realm.name_to_eid(side_b)
side_b_shelf = side_b_info[0]
side_b_resource = side_b_info[1]
for port_name in side_a:
side_a_info = self.local_realm.name_to_eid(port_name,debug=debug_)
side_a_shelf = side_a_info[0]
side_a_resource = side_a_info[1]
if port_name.find('.') < 0:
port_name = "%d.%s" % (side_a_info[1], port_name)
cx_name = "%s%s-%i" % (self.name_prefix, side_a_info[2], len(self.created_cx))
endp_a_name = cx_name + "-A"
endp_b_name = cx_name + "-B"
self.created_cx[cx_name] = [endp_a_name, endp_b_name]
self.created_endp[endp_a_name] = endp_a_name
self.created_endp[endp_b_name] = endp_b_name
these_cx.append(cx_name)
these_endp.append(endp_a_name)
these_endp.append(endp_b_name)
endp_side_a = {
"alias": endp_a_name,
"shelf": side_a_shelf,
"resource": side_a_resource,
"port": side_a_info[2],
"type": endp_type,
"min_rate": self.side_a_min_bps,
"max_rate": self.side_a_max_bps,
"min_pkt": self.side_a_min_pdu,
"max_pkt": self.side_a_max_pdu,
"ip_port": -1
}
endp_side_b = {
"alias": endp_b_name,
"shelf": side_b_shelf,
"resource": side_b_resource,
"port": side_b_info[2],
"type": endp_type,
"min_rate": self.side_b_min_bps,
"max_rate": self.side_b_max_bps,
"min_pkt": self.side_b_min_pdu,
"max_pkt": self.side_b_max_pdu,
"ip_port": -1
}
url = "/cli-json/add_endp"
self.local_realm.json_post(url, endp_side_a, debug_=debug_, suppress_related_commands_=suppress_related_commands)
self.local_realm.json_post(url, endp_side_b, debug_=debug_, suppress_related_commands_=suppress_related_commands)
#print("napping %f sec"%sleep_time)
time.sleep(sleep_time)
url = "cli-json/set_endp_flag"
data = {
"name": endp_a_name,
"flag": "AutoHelper",
"val": 1
}
self.local_realm.json_post(url, data, debug_=debug_, suppress_related_commands_=suppress_related_commands)
data["name"] = endp_b_name
self.local_realm.json_post(url, data, debug_=debug_, suppress_related_commands_=suppress_related_commands)
if (endp_type == "lf_udp") or (endp_type == "udp") or (endp_type == "lf_udp6") or (endp_type == "udp6"):
data["name"] = endp_a_name
data["flag"] = "UseAutoNAT"
self.local_realm.json_post(url, data, debug_=debug_, suppress_related_commands_=suppress_related_commands)
data["name"] = endp_b_name
self.local_realm.json_post(url, data, debug_=debug_, suppress_related_commands_=suppress_related_commands)
if tos != None:
self.local_realm.set_endp_tos(endp_a_name, tos)
self.local_realm.set_endp_tos(endp_b_name, tos)
data = {
"alias": cx_name,
"test_mgr": "default_tm",
"tx_endp": endp_a_name,
"rx_endp": endp_b_name,
}
# pprint(data)
cx_post_data.append(data)
timer_post_data.append({
"test_mgr": "default_tm",
"cx_name": cx_name,
"milliseconds": self.report_timer
})
elif type(side_b) == list and type(side_a) != list:
side_a_info = self.local_realm.name_to_eid(side_a,debug=debug_)
side_a_shelf = side_a_info[0]
side_a_resource = side_a_info[1]
# side_a_name = side_a_info[2]
for port_name in side_b:
print(side_b)
side_b_info = self.local_realm.name_to_eid(port_name,debug=debug_)
side_b_shelf = side_b_info[0]
side_b_resource = side_b_info[1]
side_b_name = side_b_info[2]
cx_name = "%s%s-%i" % (self.name_prefix, port_name, len(self.created_cx))
endp_a_name = cx_name + "-A"
endp_b_name = cx_name + "-B"
self.created_cx[cx_name] = [endp_a_name, endp_b_name]
self.created_endp[endp_a_name] = endp_a_name
self.created_endp[endp_b_name] = endp_b_name
these_cx.append(cx_name)
these_endp.append(endp_a_name)
these_endp.append(endp_b_name)
endp_side_a = {
"alias": endp_a_name,
"shelf": side_a_shelf,
"resource": side_a_resource,
"port": side_a_info[2],
"type": endp_type,
"min_rate": self.side_a_min_bps,
"max_rate": self.side_a_max_bps,
"min_pkt": self.side_a_min_pdu,
"max_pkt": self.side_a_max_pdu,
"ip_port": -1
}
endp_side_b = {
"alias": endp_b_name,
"shelf": side_b_shelf,
"resource": side_b_resource,
"port": side_b_info[2],
"type": endp_type,
"min_rate": self.side_b_min_bps,
"max_rate": self.side_b_max_bps,
"min_pkt": self.side_b_min_pdu,
"max_pkt": self.side_b_max_pdu,
"ip_port": -1
}
url = "/cli-json/add_endp"
self.local_realm.json_post(url, endp_side_a, debug_=debug_, suppress_related_commands_=suppress_related_commands)
self.local_realm.json_post(url, endp_side_b, debug_=debug_, suppress_related_commands_=suppress_related_commands)
#print("napping %f sec" %sleep_time )
time.sleep(sleep_time)
url = "cli-json/set_endp_flag"
data = {
"name": endp_a_name,
"flag": "autohelper",
"val": 1
}
self.local_realm.json_post(url, data, debug_=debug_, suppress_related_commands_=suppress_related_commands)
url = "cli-json/set_endp_flag"
data = {
"name": endp_b_name,
"flag": "autohelper",
"val": 1
}
self.local_realm.json_post(url, data, debug_=debug_, suppress_related_commands_=suppress_related_commands)
#print("CXNAME451: %s" % cx_name)
data = {
"alias": cx_name,
"test_mgr": "default_tm",
"tx_endp": endp_a_name,
"rx_endp": endp_b_name,
}
cx_post_data.append(data)
timer_post_data.append({
"test_mgr": "default_tm",
"cx_name": cx_name,
"milliseconds": self.report_timer
})
else:
raise ValueError(
"side_a or side_b must be of type list but not both: side_a is type %s side_b is type %s" % (
type(side_a), type(side_b)))
print("wait_until_endps_appear these_endp: {} debug_ {}".format(these_endp,debug_))
self.local_realm.wait_until_endps_appear(these_endp, debug=debug_)
for data in cx_post_data:
url = "/cli-json/add_cx"
self.local_realm.json_post(url, data, debug_=debug_, suppress_related_commands_=suppress_related_commands)
time.sleep(0.01)
self.local_realm.wait_until_cxs_appear(these_cx, debug=debug_)
def to_string(self):
pprint(self)
# temp transfer of functions from test script to class
def build(self):
self.create(endp_type="lf_udp", side_a=self.station_profile.station_names, side_b=self.upstream,
sleep_time=0)
def start(self):
self.start_cx()
def stop(self):
self.stop_cx()
#to do : have the variables saved in l3cx profile, upon creation of profile , and called)
def monitor_record(self,
duration_sec=60,
monitor_interval_ms=1,
sta_list=None,
layer3_cols=None,
port_mgr_cols=None,
created_cx=None,
report_file=None,
output_format=None,
script_name=None,
arguments=None,
compared_report=None,
debug=False):
try:
duration_sec = self.parse_time(duration_sec).seconds
except:
if (duration_sec is None) or (duration_sec <= 1):
raise ValueError("L3CXProfile::monitor wants duration_sec > 1 second")
if (duration_sec <= monitor_interval_ms):
raise ValueError("L3CXProfile::monitor wants duration_sec > monitor_interval")
if report_file == None:
raise ValueError("Monitor requires an output file to be defined")
if created_cx == None:
raise ValueError("Monitor needs a list of Layer 3 connections")
if (monitor_interval_ms is None) or (monitor_interval_ms < 1):
raise ValueError("L3CXProfile::monitor wants monitor_interval >= 1 second")
if layer3_cols is None:
raise ValueError("L3CXProfile::monitor wants a list of column names to monitor")
if output_format is not None:
if output_format.lower() != report_file.split('.')[-1]:
raise ValueError('Filename %s has an extension that does not match output format %s .' % (report_file, output_format))
else:
output_format = report_file.split('.')[-1]
#default save to csv first
if report_file.split('.')[-1] != 'csv':
report_file = report_file.replace(str(output_format),'csv',1)
print("Saving rolling data into..." + str(report_file))
#add layer3 cols to header row
layer3_cols=[self.replace_special_char(x) for x in layer3_cols]
layer3_fields = ",".join(layer3_cols)
default_cols=['Timestamp','Timestamp milliseconds epoch','Duration elapsed']
default_cols.extend(layer3_cols)
header_row=default_cols
#add port mgr columns to header row
if port_mgr_cols is not None:
port_mgr_cols=[self.replace_special_char(x) for x in port_mgr_cols]
port_mgr_cols_labelled =[]
for col_name in port_mgr_cols:
port_mgr_cols_labelled.append("port mgr - " + col_name)
header_row.extend(port_mgr_cols_labelled)
#add sys info to header row
systeminfo = self.json_get('/')
header_row.extend([str("LANforge GUI Build: " + systeminfo['VersionInfo']['BuildVersion']), str("Script Name: " + script_name), str("Argument input: " + str(arguments))])
#cut "sta" off all "sta_names"
sta_list_edit=[]
if sta_list is not None:
for sta in sta_list:
sta_list_edit.append(sta[4:])
sta_list=",".join(sta_list_edit)
#instantiate csv file here, add specified column headers
csvfile=open(str(report_file),'w')
csvwriter = csv.writer(csvfile,delimiter=",")
csvwriter.writerow(header_row)
#wait 10 seconds to get IPs
time.sleep(10)
start_time = datetime.datetime.now()
end_time = start_time + datetime.timedelta(seconds=duration_sec)
#create lf data object
lf_data_collection= LFDataCollection(local_realm=self.local_realm,debug=self.debug)
while datetime.datetime.now() < end_time:
csvwriter.writerow(lf_data_collection.monitor_interval(start_time_=start_time,sta_list_=sta_list_edit, created_cx_=created_cx, layer3_fields_=layer3_fields,port_mgr_fields_=",".join(port_mgr_cols)))
time.sleep(monitor_interval_ms)
csvfile.close()
def pre_cleanup(self):
self.cleanup_prefix()

View File

@@ -14,9 +14,6 @@ from LANforge.lfcli_base import LFCliBase
#from generic_cx import GenericCx #from generic_cx import GenericCx
from LANforge import add_monitor from LANforge import add_monitor
from LANforge.add_monitor import * from LANforge.add_monitor import *
#Profile Imports
from l3_cxprofile import L3CXProfile
from l3_cxprofile2 import L3CXProfile2
import os import os
import datetime import datetime
import base64 import base64
@@ -25,6 +22,10 @@ import pandas as pd
import requests import requests
import ast import ast
import csv import csv
#Profile Imports
from l3_cxprofile import L3CXProfile
from l3_cxprofile2 import L3CXProfile2

70
py-json/test_base.py Normal file
View File

@@ -0,0 +1,70 @@
#!/usr/bin/env python3
import lfdata
import lfreporting
class TestBase:
def __init__(self):
self.profiles = list()
def pre_clean_up(self):
if self.profiles:
for profile in self.profiles:
profile.precleanup()
def clean_up(self):
if self.profiles:
for profile in self.profiles:
profile.cleanup()
def start(self):
if self.profiles:
for profile in self.profiles:
profile.start()
def stop(self):
if self.profiles:
for profile in self.profiles:
profile.stop()
def build(self):
# - create station profile
# - create 2 criteria [ex: not down, continually_receiving] object (for ex)
# - station_profile.add_criteria([not_down, continually_receiving, etc_3])
# design - inversion of control
if self.profiles:
for profile in self.profiles:
profile.build()
def passes(self):
if self.profiles:
for profile in self.profiles:
profile.check_passes()
def run_duration(self, monitor_enabled= False):
#here check if monitor is enabled or not, then run loop accordingly
self.check_for_halt()
if self.profiles:
if monitor_enabled:
for profile in self.profiles:
profile.monitor_record() #check for halt in monitor record?
for profile in self.profiles:
profile.grade()
if self.exit_on_fail:
if self.fails():
self.exit_fail()
self.check_for_quit()
def report(self, enabled= False):
#here check if monitor is enabled or not, then run loop accordingly with lfreporting
pass
def begin(self):
self.pre_clean_up()
self.build()
self.start()
self.run_duration()
self.stop()
self.report()
self.clean_up()

View File

@@ -0,0 +1,301 @@
#!/usr/bin/env python3
"""test_ipv4_variable_time.py will create stations and endpoints to generate and verify layer-3 traffic.
This script will create a variable number of stations each with their own set of cross-connects and endpoints.
It will then create layer 3 traffic over a specified amount of time, testing for increased traffic at regular intervals.
This test will pass if all stations increase traffic over the full test duration.
Use './test_ipv4_variable_time.py --help' to see command line usage and options
Copyright 2021 Candela Technologies Inc
License: Free to distribute and modify. LANforge systems must be licensed.
"""
import sys
import os
if sys.version_info[0] != 3:
print("This script requires Python 3")
exit(1)
if 'py-json' not in sys.path:
sys.path.append(os.path.join(os.path.abspath('..'), 'py-json'))
import argparse
#from LANforge.lfcli_base import LFCliBase
from LANforge import LFUtils
from realm import Realm
from LANforge.test_base import TestBase
import time
import datetime
class IPV4VariableTime(Realm, TestBase):
def __init__(self,
ssid=None,
security=None,
password=None,
sta_list=[],
name_prefix=None,
upstream=None,
radio=None,
host="localhost",
port=8080,
mode=0,
ap=None,
monitor=False,
side_a_min_rate=56, side_a_max_rate=0,
side_b_min_rate=56, side_b_max_rate=0,
number_template="00000", test_duration="5m", use_ht160=False,
_debug_on=False,
_exit_on_error=False,
_exit_on_fail=False):
super().__init__(lfclient_host=host,
lfclient_port=port)
self.add_to
self.upstream = upstream
self.host = host
self.port = port
self.ssid = ssid
self.sta_list = sta_list
self.security = security
self.password = password
self.radio = radio
self.mode = mode
self.ap = ap
self.number_template = number_template
self.debug = _debug_on
# self.json_post("/cli-json/set_resource", {
# "shelf":1,
# "resource":all,
# "max_staged_bringup": 30,
# "max_trying_ifup": 15,
# "max_station_bringup": 6
# })
self.name_prefix = name_prefix
self.test_duration = test_duration
self.station_profile = self.new_station_profile(station_list=sta_list)
self.cx_profile = self.new_l3_cx_profile(ver=2)
#station profile settings
self.station_profile.lfclient_url = self.lfclient_url
self.station_profile.ssid = self.ssid
self.station_profile.ssid_pass = self.password
self.station_profile.security = self.security
self.station_profile.number_template_ = self.number_template
self.station_profile.debug = self.debug
self.station_profile.use_security(self.security, self.ssid, self.password)
self.station_profile.set_number_template(self.number_template)
self.station_profile.set_command_flag("add_sta", "create_admin_down", 1)
self.station_profile.set_command_param("set_port", "report_timer", 1500)
self.station_profile.set_command_flag("set_port", "rpt_timer", 1)
self.station_profile.use_ht160 = use_ht160
if self.station_profile.use_ht160:
self.station_profile.mode = 9
self.station_profile.mode = mode
if self.ap is not None:
self.station_profile.set_command_param("add_sta", "ap", self.ap)
#cx profile settings
self.cx_profile.host = self.host
self.cx_profile.port = self.port
self.cx_profile.name_prefix = self.name_prefix
self.cx_profile.side_a_min_bps = side_a_min_rate
self.cx_profile.side_a_max_bps = side_a_max_rate
self.cx_profile.side_b_min_bps = side_b_min_rate
self.cx_profile.side_b_max_bps = side_b_max_rate
self.profiles.extend([self.station_profile, self.cx_profile])
def main():
optional = []
optional.append({'name': '--mode', 'help': 'Used to force mode of stations'})
optional.append({'name': '--ap', 'help': 'Used to force a connection to a particular AP'})
optional.append({'name': '--output_format', 'help': 'choose either csv or xlsx'})
optional.append({'name': '--report_file', 'help': 'where you want to store results', 'default': None})
optional.append({'name': '--a_min', 'help': '--a_min bps rate minimum for side_a', 'default': 256000})
optional.append({'name': '--b_min', 'help': '--b_min bps rate minimum for side_b', 'default': 256000})
optional.append(
{'name': '--test_duration', 'help': '--test_duration sets the duration of the test', 'default': "2m"})
optional.append({'name': '--layer3_cols', 'help': 'Columns wished to be monitored from layer 3 endpoint tab',
'default': ['name', 'tx bytes', 'rx bytes']})
optional.append({'name': '--port_mgr_cols', 'help': 'Columns wished to be monitored from port manager tab',
'default': ['ap', 'ip', 'parent dev']})
optional.append(
{'name': '--compared_report', 'help': 'report path and file which is wished to be compared with new report',
'default': None})
optional.append({'name': '--monitor_interval',
'help': 'frequency of monitor polls - ex: 250ms, 35s, 2h',
'default': '2s'})
optional.append({'name': '--monitor',
'help': 'whether test data should be recorded and stored in a report'})
parser = Realm.create_basic_argparse(
prog='test_ipv4_variable_time.py',
formatter_class=argparse.RawTextHelpFormatter,
epilog='''\
Create stations to test connection and traffic on VAPs of varying security types (WEP, WPA, WPA2, WPA3, Open)
''',
description='''\
test_ipv4_variable_time.py:
--------------------
Generic command layout:
python3 ./test_ipv4_variable_time.py
--upstream_port eth1
--radio wiphy0
--num_stations 32
--security {open|wep|wpa|wpa2|wpa3}
--mode 1
{"auto" : "0",
"a" : "1",
"b" : "2",
"g" : "3",
"abg" : "4",
"abgn" : "5",
"bgn" : "6",
"bg" : "7",
"abgnAC" : "8",
"anAC" : "9",
"an" : "10",
"bgnAC" : "11",
"abgnAX" : "12",
"bgnAX" : "13"}
--ssid netgear
--password admin123
--test_duration 2m (default)
--monitor_interval_ms
--monitor
--a_min 3000
--b_min 1000
--ap "00:0e:8e:78:e1:76"
--output_format csv
--report_file ~/Documents/results.csv (Example of csv file output - please use another extension for other file formats)
--compared_report ~/Documents/results_prev.csv (Example of csv file retrieval - please use another extension for other file formats) - UNDER CONSTRUCTION
--layer3_cols'name','tx bytes','rx bytes','dropped' (column names from the GUI to print on report - please read below to know what to put here according to preferences)
--port_mgr_cols 'ap','ip' (column names from the GUI to print on report - please read below to know what to put here according to preferences)
--debug
===============================================================================
** FURTHER INFORMATION **
Using the layer3_cols flag:
Currently the output function does not support inputting the columns in layer3_cols the way they are displayed in the GUI. This quirk is under construction. To output
certain columns in the GUI in your final report, please match the according GUI column display to it's counterpart to have the columns correctly displayed in
your report.
GUI Column Display Layer3_cols argument to type in (to print in report)
Name | 'name'
EID | 'eid'
Run | 'run'
Mng | 'mng'
Script | 'script'
Tx Rate | 'tx rate'
Tx Rate (1 min) | 'tx rate (1&nbsp;min)'
Tx Rate (last) | 'tx rate (last)'
Tx Rate LL | 'tx rate ll'
Rx Rate | 'rx rate'
Rx Rate (1 min) | 'rx rate (1&nbsp;min)'
Rx Rate (last) | 'rx rate (last)'
Rx Rate LL | 'rx rate ll'
Rx Drop % | 'rx drop %'
Tx PDUs | 'tx pdus'
Tx Pkts LL | 'tx pkts ll'
PDU/s TX | 'pdu/s tx'
Pps TX LL | 'pps tx ll'
Rx PDUs | 'rx pdus'
Rx Pkts LL | 'pps rx ll'
PDU/s RX | 'pdu/s tx'
Pps RX LL | 'pps rx ll'
Delay | 'delay'
Dropped | 'dropped'
Jitter | 'jitter'
Tx Bytes | 'tx bytes'
Rx Bytes | 'rx bytes'
Replays | 'replays'
TCP Rtx | 'tcp rtx'
Dup Pkts | 'dup pkts'
Rx Dup % | 'rx dup %'
OOO Pkts | 'ooo pkts'
Rx OOO % | 'rx ooo %'
RX Wrong Dev | 'rx wrong dev'
CRC Fail | 'crc fail'
RX BER | 'rx ber'
CX Active | 'cx active'
CX Estab/s | 'cx estab/s'
1st RX | '1st rx'
CX TO | 'cx to'
Pattern | 'pattern'
Min PDU | 'min pdu'
Max PDU | 'max pdu'
Min Rate | 'min rate'
Max Rate | 'max rate'
Send Buf | 'send buf'
Rcv Buf | 'rcv buf'
CWND | 'cwnd'
TCP MSS | 'tcp mss'
Bursty | 'bursty'
A/B | 'a/b'
Elapsed | 'elapsed'
Destination Addr | 'destination addr'
Source Addr | 'source addr'
''',
more_optional=optional)
args = parser.parse_args()
num_sta = 2
if (args.num_stations is not None) and (int(args.num_stations) > 0):
num_sta = int(args.num_stations)
station_list = LFUtils.portNameSeries(prefix_="sta", start_id_=0, end_id_=num_sta - 1, padding_number_=10000,
radio=args.radio)
#transfer below to l3cxprofile2 or base_profile-----------------------#
# try:
# layer3connections = ','.join([[*x.keys()][0] for x in ip_var_test.json_get('endp')['endpoint']])
# except:
# raise ValueError('Try setting the upstream port flag if your device does not have an eth1 port')
# if type(args.layer3_cols) is not list:
# layer3_cols = list(args.layer3_cols.split(","))
# # send col names here to file to reformat
# else:
# layer3_cols = args.layer3_cols
# # send col names here to file to reformat
# if type(args.port_mgr_cols) is not list:
# port_mgr_cols = list(args.port_mgr_cols.split(","))
# # send col names here to file to reformat
# else:
# port_mgr_cols = args.port_mgr_cols
# # send col names here to file to reformat
# if args.debug:
# print("Layer 3 Endp column names are...")
# print(layer3_cols)
# print("Port Manager column names are...")
# print(port_mgr_cols)
ip_var_test = IPV4VariableTime(host=args.mgr,
port=args.mgr_port,
number_template="0000",
sta_list=station_list,
name_prefix="VT",
upstream=args.upstream_port,
ssid=args.ssid,
password=args.passwd,
radio=args.radio,
security=args.security,
test_duration=args.test_duration,
use_ht160=False,
side_a_min_rate=args.a_min,
side_b_min_rate=args.b_min,
mode=args.mode,
ap=args.ap,
_debug_on=args.debug)
ip_var_test.begin()
if __name__ == "__main__":
main()