#!/usr/bin/env python3 """ NAME: test_l4.py PURPOSE: test_l4.py will create stations and endpoints to generate and verify layer-4 traffic This script will monitor the urls/s, bytes-rd, or bytes-wr attribute of the endpoints. These attributes can be tested over FTP using a --ftp flag. If the monitored value does not continually increase, this test will not pass. This script replaces the functionality of test_ipv4_l4.py, test_ipv4_l4_ftp_upload.py, test_ipv4_l4_ftp_urls_per_ten.py, test_ipv4_l4_ftp_wifi.py, test_ipv4_l4_urls_per_ten.py, test_ipv4_l4_urls_per_ten.py, test_ipv4_l4_wifi.py EXAMPLE (urls/s): ./test_l4.py --mgr localhost --upstream_port eth1 --radio wiphy0 --num_stations 3 --security {open|wep|wpa|wpa2|wpa3} --ssid --passwd --test_duration 1m --url "dl http://192.168.1.101 /dev/null" --requests_per_ten 600 --test_type 'urls' --csv_outfile test_l4.csv --test_rig Test-Lab --test_tag L4 --dut_hw_version Linux --dut_model_num 1 --dut_sw_version 5.4.5 --dut_serial_num 1234 --test_id "L4 data" EXAMPLE (bytes-rd): ./test_l4.py --mgr localhost --upstream_port eth1 --radio wiphy0 --num_stations 3 --security {open|wep|wpa|wpa2|wpa3} --ssid --passwd --test_duration 2m --url "dl http://192.168.1.101 /dev/null" --requests_per_ten 600 --test_type bytes-rd --csv_outfile test_l4.csv --test_rig Test-Lab --test_tag L4 --dut_hw_version Linux --dut_model_num 1 --dut_sw_version 5.4.5 --dut_serial_num 1234 --test_id "L4 data" EXAMPLE (ftp urls/s): ./test_l4.py --mgr localhost --upstream_port eth1 --radio wiphy0 --num_stations 3 --security {open|wep|wpa|wpa2|wpa3} --ssid --passwd --test_duration 1m --url "ul ftp://lanforge:lanforge@192.168.1.101/large-file.bin /home/lanforge/large-file.bin" --requests_per_ten 600 --test_type 'urls' --csv_outfile test_l4.csv --test_rig Test-Lab --test_tag L4 --dut_hw_version Linux --dut_model_num 1 --dut_sw_version 5.4.5 --dut_serial_num 1234 --test_id "L4 data" EXAMPLE (ftp bytes-wr): ./test_l4.py --mgr localhost --upstream_port eth1 --radio wiphy0 --num_stations 3 --security {open|wep|wpa|wpa2|wpa3} --ssid --passwd --test_duration 1m --url "ul ftp://lanforge:lanforge@192.168.1.101/large-file.bin /home/lanforge/large-file.bin" --requests_per_ten 600 --test_type bytes-wr --csv_outfile test_l4.csv --test_rig Test-Lab --test_tag L4 --dut_hw_version Linux --dut_model_num 1 --dut_sw_version 5.4.5 --dut_serial_num 1234 --test_id "L4 data" EXAMPLE (ftp bytes-rd): ./test_l4.py --mgr localhost --upstream_port eth1 --radio wiphy0 --num_stations 3 --security {open|wep|wpa|wpa2|wpa3} --ssid --passwd --test_duration 1m --url "dl ftp://192.168.1.101 /dev/null" --requests_per_ten 600 --test_type bytes-rd --csv_outfile test_l4.csv --test_rig Test-Lab --test_tag L4 --dut_hw_version Linux --dut_model_num 1 --dut_sw_version 5.4.5 --dut_serial_num 1234 --test_id "L4 data" Use './test_l4.py --help' to see command line usage and options Copyright 2021 Candela Technologies Inc License: Free to distribute and modify. LANforge systems must be licensed. """ import sys import os import csv import importlib import time import argparse import datetime import logging if sys.version_info[0] != 3: print("This script requires Python 3") exit(1) sys.path.append(os.path.join(os.path.abspath(__file__ + "../../../"))) LFUtils = importlib.import_module("py-json.LANforge.LFUtils") realm = importlib.import_module("py-json.realm") Realm = realm.Realm TestGroupProfile = realm.TestGroupProfile port_utils = importlib.import_module("py-json.port_utils") PortUtils = port_utils.PortUtils lf_kpi_csv = importlib.import_module("py-scripts.lf_kpi_csv") lf_report = importlib.import_module("py-scripts.lf_report") lf_graph = importlib.import_module("py-scripts.lf_graph") logger = logging.getLogger(__name__) lf_logger_config = importlib.import_module("py-scripts.lf_logger_config") class IPV4L4(Realm): def __init__(self, host="localhost", port=8080, ssid=None, security=None, password=None, url=None, ftp_user=None, ftp_passwd=None, requests_per_ten=None, station_list=None, test_duration="2m", ap=None, outfile=None, kpi_csv=None, mode=0, target_requests_per_ten=60, number_template="00000", num_tests=1, radio="wiphy0", _debug_on=False, upstream_port="eth1", ftp=False, source=None, dest=None, test_type=None, _exit_on_error=False, _exit_on_fail=False): super().__init__(lfclient_host=host, lfclient_port=port, debug_=_debug_on) self.host = host self.port = port self.radio = radio self.upstream_port = upstream_port self.ssid = ssid self.security = security self.password = password self.url = url self.mode = mode self.ap = ap self.outfile = outfile self.kpi_csv = kpi_csv self.epoch_time = int(time.time()) self.debug = _debug_on self.requests_per_ten = int(requests_per_ten) self.number_template = number_template self.test_duration = test_duration self.sta_list = station_list self.num_tests = int(num_tests) self.target_requests_per_ten = int(target_requests_per_ten) self.station_profile = self.new_station_profile() self.cx_profile = self.new_l4_cx_profile() self.port_util = PortUtils(self) self.station_profile.lfclient_url = self.lfclient_url self.station_profile.ssid = self.ssid self.station_profile.ssid_pass = self.password self.station_profile.security = self.security self.station_profile.number_template_ = self.number_template self.station_profile.mode = self.mode self.test_type = test_type self.ftp_user = ftp_user self.ftp_passwd = ftp_passwd self.source = source self.dest = dest if self.ap is not None: self.station_profile.set_command_param("add_sta", "ap", self.ap) self.cx_profile.url = self.url self.cx_profile.test_type = self.test_type self.cx_profile.requests_per_ten = self.requests_per_ten self.cx_profile.target_requests_per_ten = self.target_requests_per_ten if self.outfile is not None: results = self.outfile[:-4] results = results + "-results.csv" self.csv_results_file = open(results, "w") self.csv_results_writer = csv.writer(self.csv_results_file, delimiter=",") self.ftp = ftp if self.ftp and 'ftp://' not in self.url: logger.info("WARNING! FTP test chosen, but ftp:// not present in url!") test_types = {'urls', 'bytes-wr', 'bytes-rd'} if self.test_type not in test_types: raise ValueError( "Unknown test type: %s\nValid test types are urls, bytes-rd, or bytes-wr" % self.test_type) self.report = lf_report.lf_report(_results_dir_name="test_l4", _output_html="ftp_test.html", _output_pdf="ftp_test.pdf") def get_csv_name(self): logger.info("self.csv_results_file {}".format(self.csv_results_file.name)) return self.csv_results_file.name # Common code to generate timestamp for CSV files. def time_stamp(self): return time.strftime('%m_%d_%Y_%H_%M_%S', time.localtime(self.epoch_time)) # Query all endpoints to generate rx and other stats, returned # as an array of objects. def get_rx_values(self): endp_list = self.json_get("/layer4/all") # logger.info("endp_list: {endp_list}".format(endp_list=endp_list)) endp_rx_drop_map = {} endp_rx_map = {} our_endps = {} endps = [] total_bytes_rd = 0 total_bytes_wr = 0 total_rx_rate = 0 total_tx_rate = 0 urls_seconds = 0 total_urls = 0 ''' for e in self.cx_profile.created_endp.keys(): our_endps[e] = e print("our_endps {our_endps}".format(our_endps=our_endps)) ''' for endp_name in endp_list['endpoint']: if endp_name != 'uri' and endp_name != 'handler': for item, endp_value in endp_name.items(): # if item in our_endps: if True: endps.append(endp_value) logger.debug("endpoint: {item} value:\n".format(item=item)) logger.debug(endp_value) # print("item {item}".format(item=item)) for value_name, value in endp_value.items(): if value_name == 'bytes-rd': endp_rx_map[item] = value total_bytes_rd += int(value) if value_name == 'bytes-wr': endp_rx_map[item] = value total_bytes_wr += int(value) if value_name == 'rx rate': endp_rx_map[item] = value total_rx_rate += int(value) if value_name == 'tx rate': endp_rx_map[item] = value total_tx_rate += int(value) if value_name == 'urls/s': endp_rx_map[item] = value urls_seconds += float(value) if value_name == 'total-urls': endp_rx_map[item] = value total_urls += int(value) # logger.debug("total-dl: ", total_dl, " total-ul: ", total_ul, "\n") return endp_rx_map, endps, total_bytes_rd, total_bytes_wr, total_rx_rate, total_tx_rate, urls_seconds, total_urls def build(self): # Build stations self.station_profile.use_security(self.security, self.ssid, self.password) logger.info("Creating stations") self.station_profile.set_command_flag("add_sta", "create_admin_down", 1) self.station_profile.set_command_param("set_port", "report_timer", 1500) self.station_profile.set_command_flag("set_port", "rpt_timer", 1) self.station_profile.create(radio=self.radio, sta_names_=self.sta_list, debug=self.debug) self._pass("PASS: Station build finished") temp_url = self.url.split(" ") if temp_url[0] == 'ul' or temp_url[0] == 'dl': if len(temp_url) == 2: if self.url.startswith("ul") and self.source not in self.url: self.cx_profile.url += " " + self.source elif self.url.startswith("dl") and self.dest not in self.url: self.cx_profile.url += " " + self.dest else: raise ValueError("ul or dl required in url to indicate direction") if self.ftp: if self.ftp_user is not None and self.ftp_passwd is not None: if ("%s:%s" % (self.ftp_user, self.ftp_passwd)) not in self.url: temp_url = self.url.split("//") temp_url = ("//%s:%s@" % (self.ftp_user, self.ftp_passwd)).join(temp_url) self.cx_profile.url = temp_url self.cx_profile.create(ports=self.station_profile.station_names, sleep_time=.5, debug_=self.debug, suppress_related_commands_=True) else: self.cx_profile.create(ports=self.station_profile.station_names, sleep_time=.5, debug_=self.debug, suppress_related_commands_=None) def start(self, print_pass=False, print_fail=False): if self.ftp: self.port_util.set_ftp(port_name=self.name_to_eid(self.upstream_port)[2], resource=1, on=True) temp_stas = self.sta_list.copy() self.station_profile.admin_up() if self.wait_for_ip(temp_stas): self._pass("All stations got IPs", print_pass) else: self._fail("Stations failed to get IPs", print_fail) exit(1) self.csv_add_column_headers() self.cx_profile.start_cx() logger.info("Starting test") def stop(self): self.cx_profile.stop_cx() if self.ftp: self.port_util.set_ftp(port_name=self.name_to_eid(self.upstream_port)[2], resource=1, on=False) self.station_profile.admin_down() def cleanup(self, sta_list): self.cx_profile.cleanup() self.station_profile.cleanup(sta_list) LFUtils.wait_until_ports_disappear(base_url=self.lfclient_url, port_list=sta_list, debug=self.debug) # builds test data into kpi.csv report def record_kpi_csv( self, station_list, total_test, total_pass, total_bytes_rd, total_bytes_wr, total_rx_rate, total_tx_rate, urls_second, total_urls): sta_count = len(station_list) total_seconds = round(urls_second, 2) # logger.info(total_seconds) # logic for Subtest-Pass & Subtest-Fail columns subpass_bytes_rd = 0 subpass_bytes_wr = 0 subpass_rx_rate = 0 subpass_tx_rate = 0 subpass_urls = 0 subfail_bytes_rd = 1 subfail_bytes_wr = 1 subfail_rx_rate = 1 subfail_tx_rate = 1 subfail_urls = 1 if total_bytes_rd > 0: subpass_bytes_rd = 1 subfail_bytes_rd = 0 if total_bytes_wr > 0: subpass_bytes_wr = 1 subfail_bytes_wr = 0 if total_rx_rate > 0: subpass_rx_rate = 1 subfail_rx_rate = 0 if total_tx_rate > 0: subpass_tx_rate = 1 subfail_tx_rate = 0 if urls_second > 0: subpass_urls = 1 subfail_urls = 0 # logic for pass/fail column # total_test & total_pass values from lfcli_base.py if total_test == total_pass: pass_fail = "PASS" else: pass_fail = "FAIL" results_dict = self.kpi_csv.kpi_csv_get_dict_update_time() # kpi data for combined station totals if self.url.startswith('dl'): # kpi data for Total Bytes-RD results_dict['Graph-Group'] = "L4 Total Bytes-RD" results_dict['pass/fail'] = pass_fail results_dict['Subtest-Pass'] = subpass_bytes_rd results_dict['Subtest-Fail'] = subfail_bytes_rd results_dict['short-description'] = "Total Bytes-RD" results_dict['numeric-score'] = "{}".format(total_bytes_rd) results_dict['Units'] = "bytes-rd" self.kpi_csv.kpi_csv_write_dict(results_dict) # kpi data for RX Rate results_dict['Graph-Group'] = "L4 Total RX Rate" results_dict['pass/fail'] = pass_fail results_dict['Subtest-Pass'] = subpass_rx_rate results_dict['Subtest-Fail'] = subfail_rx_rate results_dict['short-description'] = "{sta_count} Stations Total RX Rate".format(sta_count=sta_count) results_dict['numeric-score'] = "{}".format(total_rx_rate) results_dict['Units'] = "bps" self.kpi_csv.kpi_csv_write_dict(results_dict) if self.url.startswith('ul'): # kpi data for Bytes-WR results_dict['Graph-Group'] = "L4 Total Bytes-WR" results_dict['pass/fail'] = pass_fail results_dict['Subtest-Pass'] = subpass_bytes_wr results_dict['Subtest-Fail'] = subfail_bytes_wr results_dict['short-description'] = "Total Bytes-WR" results_dict['numeric-score'] = "{}".format(total_bytes_wr) results_dict['Units'] = "bytes-wr" self.kpi_csv.kpi_csv_write_dict(results_dict) # kpi data for TX Rate results_dict['Graph-Group'] = "L4 Total TX Rate" results_dict['pass/fail'] = pass_fail results_dict['Subtest-Pass'] = subpass_tx_rate results_dict['Subtest-Fail'] = subfail_tx_rate results_dict['short-description'] = "{sta_count} Stations Total TX Rate".format(sta_count=sta_count) results_dict['numeric-score'] = "{}".format(total_tx_rate) results_dict['Units'] = "bps" self.kpi_csv.kpi_csv_write_dict(results_dict) # kpi data for URLs/s results_dict['Graph-Group'] = "Average URLs per Second" results_dict['pass/fail'] = pass_fail results_dict['Subtest-Pass'] = subpass_urls results_dict['Subtest-Fail'] = subfail_urls results_dict['short-description'] = "Average URLs per Second" results_dict['numeric-score'] = "{}".format(total_seconds) results_dict['Units'] = "urls/s" self.kpi_csv.kpi_csv_write_dict(results_dict) # kpi data for Total URLs results_dict['Graph-Group'] = "Total URLs" results_dict['pass/fail'] = pass_fail results_dict['Subtest-Pass'] = subpass_urls results_dict['Subtest-Fail'] = subfail_urls results_dict['short-description'] = "Total URLs" results_dict['numeric-score'] = "{}".format(total_urls) results_dict['Units'] = "total-urls" self.kpi_csv.kpi_csv_write_dict(results_dict) # record results for .html & .pdf reports def record_results( self, sta_count, bytes_rd, bytes_wr, rx_rate, tx_rate, urls_second, total_urls): tags = dict() tags['station-count'] = sta_count # tags['attenuation'] = atten tags["script"] = 'test_l4' # now = str(datetime.datetime.utcnow().isoformat()) if self.csv_results_file: row = [self.epoch_time, self.time_stamp(), sta_count, bytes_rd, bytes_wr, rx_rate, tx_rate, urls_second, total_urls ] self.csv_results_writer.writerow(row) self.csv_results_file.flush() def csv_generate_results_column_headers(self): csv_rx_headers = [ 'Time epoch', 'Time', 'Station-Count', 'Bytes-RD', 'Bytes-WR', 'RX Rate', 'TX Rate', 'URLs/s', 'Total URLs', ] return csv_rx_headers # Write initial headers to csv file. def csv_add_column_headers(self): logger.info("self.csv_results_file: {csv_results_file}".format(csv_results_file=self.csv_results_file)) if self.csv_results_file is not None: self.csv_results_writer.writerow( self.csv_generate_results_column_headers()) self.csv_results_file.flush() def main(): parser = Realm.create_basic_argparse( prog='test_l4', formatter_class=argparse.RawTextHelpFormatter, epilog='''\ This script will monitor the urls/s, bytes-rd, or bytes-wr attribute of the endpoints. ''', description='''\ --------------------------- Layer-4 Test Script - test_l4.py --------------------------- Summary: This script will create stations and endpoints to generate and verify layer-4 traffic by monitoring the urls/s, bytes-rd, or bytes-wr attribute of the endpoints. --------------------------- Generic command example: ./test_l4.py --mgr --upstream_port eth1 --radio wiphy0 --num_stations 3 --security wpa2 --ssid --passwd --test_duration 2m --url "ul http:// /dev/null" --requests_per_ten 600 --test_type bytes-wr --debug --------------------------- ''') parser.add_argument('--requests_per_ten', help='--requests_per_ten number of request per ten minutes', default=600) parser.add_argument('--num_tests', help='--num_tests number of tests to run. Each test runs 10 minutes', default=1) parser.add_argument('--url', help='--url specifies upload/download, (Access Point IP), and dest', default="dl http://10.40.0.1 /dev/null") parser.add_argument('--test_duration', help='duration of test', default="2m") parser.add_argument('--target_per_ten', help='--target_per_ten target number of request per ten minutes. test will check for 90 percent this value', default=600) parser.add_argument('--mode', help='Used to force mode of stations') parser.add_argument('--ap', help='Used to force a connection to a particular AP') parser.add_argument('--report_file', help='where you want to store results') parser.add_argument('--output_format', help='choose csv or xlsx') # update once other forms are completed parser.add_argument('--ftp', help='Use ftp for the test', action='store_true') parser.add_argument('--test_type', help='Choose type of test to run {urls, bytes-rd, bytes-wr}', default='bytes-rd') parser.add_argument('--ftp_user', help='--ftp_user sets the username to be used for ftp', default=None) parser.add_argument('--ftp_passwd', help='--ftp_user sets the password to be used for ftp', default=None) parser.add_argument('--dest', help='--dest specifies the destination for the file, should be used when downloading', default="/dev/null") parser.add_argument('--source', help='--source specifies the source of the file, should be used when uploading', default="/var/www/html/data_slug_4K.bin") parser.add_argument('--local_lf_report_dir', help='--local_lf_report_dir override the report path, primary use when running test in test suite', default="") # kpi_csv arguments parser.add_argument( "--test_rig", default="", help="test rig for kpi.csv, testbed that the tests are run on") parser.add_argument( "--test_tag", default="", help="test tag for kpi.csv, test specific information to differenciate the test") parser.add_argument( "--dut_hw_version", default="", help="dut hw version for kpi.csv, hardware version of the device under test") parser.add_argument( "--dut_sw_version", default="", help="dut sw version for kpi.csv, software version of the device under test") parser.add_argument( "--dut_model_num", default="", help="dut model for kpi.csv, model number / name of the device under test") parser.add_argument( "--dut_serial_num", default="", help="dut serial for kpi.csv, serial number / serial number of the device under test") parser.add_argument( "--test_priority", default="", help="dut model for kpi.csv, test-priority is arbitrary number") parser.add_argument( '--csv_outfile', help="--csv_outfile ", default="") args = parser.parse_args() # set up logger logger_config = lf_logger_config.lf_logger_config() if args.lf_logger_config_json: # logger_config.lf_logger_config_json = "lf_logger_config.json" logger_config.lf_logger_config_json = args.lf_logger_config_json logger_config.load_lf_logger_config() # for kpi.csv generation local_lf_report_dir = args.local_lf_report_dir test_rig = args.test_rig test_tag = args.test_tag dut_hw_version = args.dut_hw_version dut_sw_version = args.dut_sw_version dut_model_num = args.dut_model_num dut_serial_num = args.dut_serial_num # test_priority = args.test_priority # this may need to be set per test test_id = args.test_id if local_lf_report_dir != "": report = lf_report.lf_report( _path=local_lf_report_dir, _results_dir_name="test_l4", _output_html="test_l4.html", _output_pdf="test_l4.pdf") else: report = lf_report.lf_report( _results_dir_name="test_l4", _output_html="test_l4.html", _output_pdf="test_l4.pdf") kpi_path = report.get_report_path() # kpi_filename = "kpi.csv" logger.info("kpi_path :{kpi_path}".format(kpi_path=kpi_path)) kpi_csv = lf_kpi_csv.lf_kpi_csv( _kpi_path=kpi_path, _kpi_test_rig=test_rig, _kpi_test_tag=test_tag, _kpi_dut_hw_version=dut_hw_version, _kpi_dut_sw_version=dut_sw_version, _kpi_dut_model_num=dut_model_num, _kpi_dut_serial_num=dut_serial_num, _kpi_test_id=test_id) if args.csv_outfile is not None: current_time = time.strftime("%Y-%m-%d-%H-%M-%S", time.localtime()) csv_outfile = "{}_{}-test_l4.csv".format( args.csv_outfile, current_time) csv_outfile = report.file_add_path(csv_outfile) logger.info("csv output file : {}".format(csv_outfile)) num_sta = 2 if (args.num_stations is not None) and (int(args.num_stations) > 0): num_stations_converted = int(args.num_stations) num_sta = num_stations_converted if args.report_file is None: if args.output_format in ['csv', 'json', 'html', 'hdf', 'stata', 'pickle', 'pdf', 'parquet', 'png', 'df', 'xlsx']: output_form = args.output_format.lower() else: logger.info("Defaulting data file output type to Excel") output_form = 'xlsx' else: if args.output_format is None: output_form = str(args.report_file).split('.')[-1] else: output_form = args.output_format # Create directory if args.report_file is None: if os.path.isdir('/home/lanforge/report-data'): homedir = str(datetime.datetime.now().strftime("%Y-%m-%d-%H-%M")).replace(':', '-') + 'test_l4' path = os.path.join('/home/lanforge/report-data/', homedir) logger.info(path) os.mkdir(path) else: path = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) logger.info('Saving file to local directory') if args.output_format in ['csv', 'json', 'html', 'hdf', 'stata', 'pickle', 'pdf', 'png', 'df', 'parquet', 'xlsx']: rpt_file = path + '/data.' + args.output_format logger.info(rpt_file) else: logger.info('Defaulting data file output type to Excel') rpt_file = path + '/data.xlsx' logger.info(rpt_file) else: rpt_file = args.report_file logger.info(rpt_file) station_list = LFUtils.portNameSeries(prefix_="sta", start_id_=0, end_id_=num_sta - 1, padding_number_=10000, radio=args.radio) ip_test = IPV4L4(host=args.mgr, port=args.mgr_port, ssid=args.ssid, password=args.passwd, radio=args.radio, upstream_port=args.upstream_port, security=args.security, station_list=station_list, url=args.url, mode=args.mode, ap=args.ap, outfile=args.csv_outfile, kpi_csv=kpi_csv, ftp=args.ftp, ftp_user=args.ftp_user, ftp_passwd=args.ftp_passwd, source=args.source, dest=args.dest, test_type=args.test_type, _debug_on=args.debug, test_duration=args.test_duration, num_tests=args.num_tests, target_requests_per_ten=args.target_per_ten, requests_per_ten=args.requests_per_ten) ip_test.cleanup(station_list) ip_test.build() ip_test.start() l4_cx_results = {} layer4traffic = ','.join([[*x.keys()][0] for x in ip_test.json_get('layer4')['endpoint']]) ip_test.cx_profile.monitor(col_names=['name', 'bytes-rd', 'urls/s', 'bytes-wr'], report_file=rpt_file, duration_sec=args.test_duration, created_cx=layer4traffic, output_format=output_form, script_name='test_l4', arguments=args, debug=args.debug) temp_stations_list = [] temp_stations_list.extend(ip_test.station_profile.station_names.copy()) logger.info("temp_stations_list: {temp_stations_list}".format(temp_stations_list=temp_stations_list)) total_test = len(ip_test.get_result_list()) total_pass = len(ip_test.get_passed_result_list()) endp_rx_map, endps, total_bytes_rd, total_bytes_wr, total_rx_rate, total_tx_rate, urls_second, total_urls = ip_test.get_rx_values() #endp_rx_map, endp_rx_drop_map, endps, bytes_rd, bytes_wr, rx_rate, tcp_ul, tx_rate, urls_sec, total_urls, total_ul_ll = ip_test.get_rx_values() ip_test.record_kpi_csv(temp_stations_list, total_test, total_pass, total_bytes_rd, total_bytes_wr, total_rx_rate, total_tx_rate, urls_second, total_urls) ip_test.record_results(len(temp_stations_list), total_bytes_rd, total_bytes_wr, total_rx_rate, total_tx_rate, urls_second, total_urls) # ip_test.record_results(len(temp_stations_list), bytes_rd, bytes_wr, rx_rate, tx_rate, urls_sec, total_urls) # Reporting Results (.pdf & .html) csv_results_file = ip_test.get_csv_name() logger.info("csv_results_file: %s", csv_results_file) # csv_results_file = kpi_path + "/" + kpi_filename report.set_title("L4 Test") report.build_banner() report.set_table_title("L4 Test Key Performance Indexes") report.build_table_title() report.set_table_dataframe_from_csv(csv_results_file) report.build_table() report.write_html_with_timestamp() report.write_index_html() # report.write_pdf(_page_size = 'A3', _orientation='Landscape') # report.write_pdf_with_timestamp(_page_size='A4', _orientation='Portrait') report.write_pdf_with_timestamp(_page_size='A4', _orientation='Landscape') is_passing = ip_test.passes() ip_test.stop() # cleanup stations: if not args.no_cleanup: # time.sleep(15) ip_test.cleanup(station_list) if not is_passing: logger.info(ip_test.get_fail_message()) ip_test.exit_fail() if is_passing: logger.info("Full test passed") ip_test.exit_success() if __name__ == "__main__": main()