diff --git a/py-scripts/sandbox/smw_lf_rvr_test.py b/py-scripts/sandbox/smw_lf_rvr_test.py new file mode 100755 index 00000000..776e244d --- /dev/null +++ b/py-scripts/sandbox/smw_lf_rvr_test.py @@ -0,0 +1,576 @@ +#!/usr/bin/env python3 + +""" +NAME: rvr_test.py + +PURPOSE: rvr_test.py will measure the performance of stations over a certain distance of the DUT. Distance is emulated + using programmable attenuators and throughput test is run at each distance/RSSI step. + +EXAMPLE: +python3 rvr_test.py --mgr 192.168.200.21 --mgr_port 8080 --upstream eth1 --num_stations 15 --mode 9 --security wpa2 --ssid ct-523 --password ct-523-ps --radio wiphy3 --atten_serno 84 --atten_idx all --atten_val 10,20,30 --test_duration 1m --ap_model WAC505 --traffic 500 + +Use './rvr_test.py --help' to see command line usage and options +Copyright 2021 Candela Technologies Inc +License: Free to distribute and modify. LANforge systems must be licensed. +""" + +import sys +import os +import pandas as pd + +if sys.version_info[0] != 3: + print("This script requires Python 3") + exit(1) + +if 'py-json' not in sys.path: + sys.path.append(os.path.join(os.path.abspath('..'), 'py-json')) + +import argparse +from LANforge import LFUtils +from realm import Realm +from lf_report import lf_report +from lf_graph import lf_bar_graph +import time +from datetime import datetime, timedelta + + +class RvR(Realm): + def __init__(self, ssid=None, security=None, password="", create_sta=True, name_prefix=None, upstream=None, + host="localhost", port=8080, + mode=0, ap_model="", traffic_type="lf_tcp,lf_udp", traffic_direction="bidirectional", + side_a_min_rate=0, side_a_max_rate=0, + sta_names=None, side_b_min_rate=56, side_b_max_rate=0, number_template="00000", test_duration="2m", + sta_list=[1, 1], + serial_number='2222', indices="all", atten_val="0", traffic=500, radio_list=['wiphy0', 'wiphy3'], + _debug_on=False, _exit_on_error=False, _exit_on_fail=False): + super().__init__(lfclient_host=host, + lfclient_port=port), + self.upstream = upstream + self.host = host + self.port = port + self.ssid = ssid + self.security = security + self.password = password + self.radio = radio_list + self.sta_list = sta_list + self.num_stations = sum(self.sta_list) + self.station_names = sta_names + self.create_sta = create_sta + self.mode = mode + self.ap_model = ap_model + self.traffic_type = traffic_type.split(",") + self.traffic_direction = traffic_direction + self.traffic = traffic + self.number_template = number_template + self.debug = _debug_on + self.name_prefix = name_prefix + self.test_duration = test_duration + self.station_profile = self.new_station_profile() + self.station_profile.lfclient_url = self.lfclient_url + self.station_profile.ssid = self.ssid + self.station_profile.ssid_pass = self.password + self.station_profile.security = self.security + self.station_profile.number_template_ = self.number_template + self.station_profile.debug = self.debug + self.station_profile.mode = mode + self.cx_profile = self.new_l3_cx_profile() + self.cx_profile.host = self.host + self.cx_profile.port = self.port + self.cx_profile.name_prefix = self.name_prefix + self.cx_profile.side_a_min_bps = side_a_min_rate // self.num_stations + self.cx_profile.side_a_max_bps = side_a_max_rate + self.cx_profile.side_b_min_bps = side_b_min_rate // self.num_stations + self.cx_profile.side_b_max_bps = side_b_max_rate + self.attenuator_profile = self.new_attenuator_profile() + self.serial_number = serial_number + self.indices = indices.split(",") + self.atten_values = atten_val + + def initialize_attenuator(self): + self.attenuator_profile.atten_serno = self.serial_number + self.attenuator_profile.atten_idx = "all" + self.attenuator_profile.atten_val = '0' + self.attenuator_profile.mode = None + self.attenuator_profile.pulse_width_us5 = None + self.attenuator_profile.pulse_interval_ms = None, + self.attenuator_profile.pulse_count = None, + self.attenuator_profile.pulse_time_ms = None + self.attenuator_profile.create() + # self.attenuator_profile.show() + + def set_attenuation(self, value): + self.attenuator_profile.atten_serno = self.serial_number + self.attenuator_profile.atten_idx = "all" + self.attenuator_profile.atten_val = str(int(value) * 10) + self.attenuator_profile.create() + # self.attenuator_profile.show() + + def start_l3(self): + if len(self.cx_profile.created_cx) > 0: + self.json_post("/cli-json/clear_cx_counters", {"cx_name": 'all'}) + for cx in self.cx_profile.created_cx.keys(): + req_url = "cli-json/set_cx_report_timer" + data = { + "test_mgr": "all", + "cx_name": cx, + "milliseconds": 1000 + } + self.json_post(req_url, data) + time.sleep(5) + self.cx_profile.start_cx() + print("Monitoring CX's & Endpoints for %s seconds" % self.test_duration) + + def stop_l3(self): + self.cx_profile.stop_cx() + # self.station_profile.admin_down() + + def reset_l3(self): + if len(self.cx_profile.created_cx) > 0: + clear_endp = "cli-json/clear_endp_counters" + data = { + "endp_name": "all" + } + self.json_post(clear_endp, data) + clear_cx = "cli-json/clear_cx_counters" + data = { + "cx_name": "all" + } + self.json_post(clear_cx, data) + + def pre_cleanup(self): + self.cx_profile.cleanup_prefix() + if self.create_sta: + for sta in self.station_names: + self.rm_port(sta, check_exists=True) + + def cleanup(self): + self.cx_profile.cleanup() + if self.create_sta: + self.station_profile.cleanup() + LFUtils.wait_until_ports_disappear(base_url=self.lfclient_url, + port_list=self.station_profile.station_names, + debug=self.debug) + + def start_stations(self): + self.station_profile.admin_up() + # check here if upstream port got IP + temp_stations = self.station_profile.station_names.copy() + if self.wait_for_ip(temp_stations): + self._pass("All stations got IPs") + else: + self._fail("Stations failed to get IPs") + self.exit_fail() + self._pass("PASS: Station build finished") + + def build(self): + throughput_dbm = {} + if len(self.traffic_type) == 2: + throughput_dbm = {f"{self.traffic_type[0]}": {}, f"{self.traffic_type[1]}": {}} + elif len(self.traffic_type) == 1: + throughput_dbm = {f"{self.traffic_type[0]}": {}} + upload, download = [], [] + self.station_profile.set_number_template(self.number_template) + self.station_profile.use_security(security_type=self.station_profile.security, + ssid=self.station_profile.ssid, + passwd=self.station_profile.ssid_pass) + print("Creating stations") + self.station_profile.set_command_flag("add_sta", "create_admin_down", 1) + self.station_profile.set_command_param("set_port", "report_timer", 1500) + self.station_profile.set_command_flag("set_port", "rpt_timer", 1) + first, last = 0, self.sta_list[0] + for i in range(len(self.radio)): + if i != 0: + last = last + self.sta_list[i] + print(first, last) + station_names = self.station_names[first:last] + self.station_profile.create(radio=self.radio[i], sta_names_=station_names, debug=self.debug) + first = first + self.sta_list[i] + print(station_names) + self.start_stations() + for traffic in self.traffic_type: + self.cx_profile.create(endp_type=traffic, side_a=self.station_profile.station_names, + side_b=self.upstream, + sleep_time=0) + self.initialize_attenuator() + for val in self.atten_values: + throughput = {'upload': [], 'download': []} + self.set_attenuation(value=val) + self.start_l3() + time.sleep(20) + upload, download = self.monitor() + # self.stop_l3() + self.reset_l3() + throughput['upload'] = upload + throughput['download'] = download + throughput_dbm[''.join(traffic)][f"{val} dB"] = throughput + self.cx_profile.cleanup() + print(throughput_dbm) + return throughput_dbm + + def monitor(self): + throughput, upload, download = {}, [], [] + if (self.test_duration is None) or (int(self.test_duration) <= 1): + raise ValueError("Monitor test duration should be > 1 second") + if self.cx_profile.created_cx is None: + raise ValueError("Monitor needs a list of Layer 3 connections") + # monitor columns + start_time = datetime.now() + end_time = start_time + timedelta(seconds=int(self.test_duration)) + index = -1 + connections = dict.fromkeys(list(self.cx_profile.created_cx.keys()), float(0)) + [(upload.append([]), download.append([])) for i in range(len(self.cx_profile.created_cx))] + while datetime.now() < end_time: + index += 1 + response = list( + self.json_get('/cx/%s?fields=%s' % ( + ','.join(self.cx_profile.created_cx.keys()), ",".join(['bps rx a', 'bps rx b']))).values())[2:] + throughput[index] = list( + map(lambda i: [x for x in i.values()], response)) + time.sleep(1) + # # rx_rate list is calculated + # print("Total rx values are %s", throughput) + for index, key in enumerate(throughput): + for i in range(len(throughput[key])): + upload[i].append(throughput[key][i][0]) + download[i].append(throughput[key][i][1]) + print("Upload values", upload) + print("Download Values", download) + upload_throughput = [float(f"{(sum(i) / 1000000) / len(i): .2f}") for i in upload] + download_throughput = [float(f"{(sum(i) / 1000000) / len(i): .2f}") for i in download] + print("upload: ", upload_throughput) + print("download: ", download_throughput) + return upload_throughput, download_throughput + + def set_report_data(self, data): + res = {} + if data is not None: + res = data + else: + print("No Data found to generate report!") + exit(1) + if self.traffic_type is not None: + if self.traffic_direction == 'upload': + for traffic in self.traffic_type: + for key in res[traffic]: + if 'download' in res[traffic][key]: + res[traffic][key].pop('download') + elif self.traffic_direction == 'download': + for traffic in self.traffic_type: + for key in res[traffic]: + if 'download' in res[traffic][key]: + res[traffic][key].pop('upload') + table_df = {} + num_stations = [] + mode = [] + graph_df = {} + if len(self.traffic_type) == 2: + graph_df = {f"{self.traffic_type[0]}": {}, f"{self.traffic_type[1]}": {}} + elif len(self.traffic_type) == 1: + graph_df = {f"{self.traffic_type[0]}": {}} + # for case in self.traffic_type: + # throughput_df = [] + # for key in res[case]: + # table_df.update({"No of Stations": []}) + # table_df.update({"Mode": []}) + # table_df.update({"Throughput for traffic {}".format(key): []}) + # graph_df.update({case: [throughput_df]}) + # print(throughput) + # table_df.update({"No of Stations": num_stations}) + # table_df.update({"Mode": mode}) + for traffic in self.traffic_type: + dataset, label, color = [], [], [] + direction = "" + if self.traffic_direction == 'upload': + dataset.append([float(f"{sum(res[traffic][i]['upload']):.2f}") for i in res[traffic]]) + label = ['upload'] + color = ['olivedrab'] + direction = "upload" + elif self.traffic_direction == 'download': + dataset.append([float(f"{sum(res[traffic][i]['download']):.2f}") for i in res[traffic]]) + label = ['download'] + color = ['orangered'] + direction = "download" + elif self.traffic_direction == 'bidirectional': + dataset.append([float(f"{sum(res[traffic][i]['upload']):.2f}") for i in res[traffic]]) + dataset.append([float(f"{sum(res[traffic][i]['download']):.2f}") for i in res[traffic]]) + label = ['upload', 'download'] + color = ['olivedrab', 'orangered'] + direction = "upload and download" + graph_df[traffic].update({"dataset": dataset}) + graph_df[traffic].update({"label": label}) + graph_df[traffic].update({"color": color}) + graph_df[traffic].update({"direction": direction}) + # res.update({"throughput_table_df": table_df}) + res.update({"graph_df": graph_df}) + return res + + def generate_report(self, data, test_setup_info, input_setup_info): + res = self.set_report_data(data) + report = lf_report(_output_pdf="rvr_test.pdf", _output_html="rvr_test.html", _results_dir_name="RvR_Test") + report_path = report.get_path() + report_path_date_time = report.get_path_date_time() + print("path: {}".format(report_path)) + print("path_date_time: {}".format(report_path_date_time)) + report.set_title("Rate vs Range") + report.build_banner() + # objective title and description + report.set_obj_html(_obj_title="Objective", + _obj="Through this test we can measure the performance of stations over a certain distance " + "of the DUT, Distance is emulated using programmable attenuators and throughput test " + "is run at each distance/RSSI step") + report.build_objective() + report.test_setup_table(test_setup_data=test_setup_info, value="Device Under Test") + # report.set_table_title( + # "Overall download Throughput for different attenuation") + # report.build_table_title() + # df_throughput = pd.DataFrame(res["throughput_table_df"]) + # report.set_table_dataframe(df_throughput) + # report.build_table() + print(res) + for traffic_type in res["graph_df"]: + report.set_obj_html( + _obj_title="Overall {} throughput for {} clients using {} traffic.".format(res["graph_df"] + [traffic_type]["direction"], + len(self.station_names), + traffic_type), + _obj="The below graph represents overall {} throughput for different attenuation (RSSI) ".format( + res["graph_df"][traffic_type]["direction"])) + report.build_objective() + graph = lf_bar_graph(_data_set=res["graph_df"][traffic_type]["dataset"], + _xaxis_name="Attenuation", + _yaxis_name="Throughput(in Mbps)", + _xaxis_categories=[str(traffic_type) for traffic_type in res[traffic_type].keys()], + _graph_image_name=f"rvr_{traffic_type}_{self.traffic_direction}", + _label=res["graph_df"][traffic_type]["label"], + _color=res["graph_df"][traffic_type]["color"], + _color_edge='grey', + _xaxis_step=1, + _graph_title="Overall throughput vs attenuation", + _title_size=16, + _bar_width=0.15, + _figsize=(18, 6), + _legend_loc="best", + _legend_box=(1.0, 1.0), + _dpi=96, + _show_bar_value=True, + _enable_csv=True) + graph_png = graph.build_bar_graph() + + print("graph name {}".format(graph_png)) + + report.set_graph_image(graph_png) + # need to move the graph image to the results directory + report.move_graph_image() + report.set_csv_filename(graph_png) + report.move_csv_file() + report.build_graph() + self.generate_individual_graphs(report, res) + report.test_setup_table(test_setup_data=input_setup_info, value="Information") + report.build_custom() + report.build_footer() + report.write_html() + report.write_pdf() + + def generate_individual_graphs(self, report, res): + if len(res.keys()) > 0: + if "graph_df" in res: + res.pop("graph_df") + for traffic_type in res: + for attenuation in res[traffic_type]: + for direction in res[traffic_type][attenuation]: + report.set_obj_html( + _obj_title=f"Individual {direction} Throughput for {len(self.station_names)} clients using {traffic_type} traffic over {attenuation} attenuation", + _obj=f"The below graph represents Individual {direction} throughput of all stations when attenuation (RSSI) set to {attenuation}") + report.build_objective() + graph = lf_bar_graph(_data_set=[res[traffic_type][attenuation][direction]], + _xaxis_name="No.of Stations", + _yaxis_name="Throughput(in Mbps)", + _xaxis_categories=[str(i + 1) for i in range(len(self.station_names))], + _graph_image_name=f"rvr_{traffic_type}_{attenuation}_{direction}", + _label=['upload' if direction == 'upload' else + 'download'], + _color=['olivedrab' if direction == 'upload' else 'orangered'], + _color_edge='grey', + _xaxis_step=1, + _graph_title=f"Individual throughput with {attenuation} attenuation", + _title_size=16, + _bar_width=0.15, + _figsize=(18, 6), + _legend_loc="best", + _legend_box=(1.0, 1.0), + _dpi=96, + _show_bar_value=True, + _enable_csv=True) + graph_png = graph.build_bar_graph() + + print("graph name {}".format(graph_png)) + + report.set_graph_image(graph_png) + # need to move the graph image to the results directory + report.move_graph_image() + report.set_csv_filename(graph_png) + report.move_csv_file() + report.build_graph() + + +def main(): + parser = argparse.ArgumentParser(description='''\ + rvr_test.py: + -------------------- + Generic command layout: + ===================================================================== + sudo python3 rvr_test.py --mgr localhost --mgr_port 8080 --upstream eth1 --num_stations 40 + --security wpa2 --ssid NETGEAR73-5G --password fancylotus986 --radio wiphy3 --atten_serno 2222 --atten_idx all + --atten_val 10 --test_duration 1m --ap_model WAX610 --traffic 100''', allow_abbrev=False) + optional = parser.add_argument_group('optional arguments') + required = parser.add_argument_group('required arguments') + optional.add_argument('--mgr', help='hostname for where LANforge GUI is running', default='localhost') + optional.add_argument('--mgr_port', help='port LANforge GUI HTTP service is running on', default=8080) + optional.add_argument('--upstream', help='non-station port that generates traffic: ., ' + 'e.g: 1.eth1', default='eth1') + optional.add_argument('--mode', help='used to force mode of stations', default="0") + required.add_argument('--radio_list', help='radio to use on which clients gets created', default=['wiphy0', 'wiphy3']) + required.add_argument('--sta_list', help='radio to use on which clients gets created', default=[1, 1]) + required.add_argument('--ssid', help="ssid for client association with Access Point", required=True) + required.add_argument('--security', help="security type of ssid, ex: wpa || wpa2 || wpa3 || open", required=True) + required.add_argument('--password', help="password of ssid", required=True) + required.add_argument('--traffic_type', help='provide the traffic Type lf_udp, lf_tcp', default='lf_tcp') + optional.add_argument('--traffic_direction', help='Traffic direction i.e upload or download or bidirectional', + default="bidirectional") + required.add_argument('--traffic', help='traffic to be created for the given number of clients (in Mbps)', + required=True) + required.add_argument('--test_duration', help='sets the duration of the test ex: 2s --> two seconds || 2m ' + '--> two minutes || 2h --> two hours', required=True) + optional.add_argument('--create_sta', help="used to create stations if you do not prefer existing stations", + default=True) + optional.add_argument('--sta_names', + help='used to provide existing station names from the port manager, prefer only if ' + 'create_sta is False', + default="sta0000") + optional.add_argument('--ap_model', help="AP Model Name", default="Test-AP") + # required.add_argument('--num_stations', help='number of stations to create, works only if create_sta is True', + # required=True) + optional.add_argument('-as', '--atten_serno', help='Serial number for requested Attenuator', default='2222') + optional.add_argument('-ai', '--atten_idx', + help='Attenuator index eg. For module 1 = 0,module 2 = 1 --> --atten_idx 0,1', + default='all') + optional.add_argument('-av', '--atten_val', + help='Requested attenuation in dB ex:--> --atten_val 0, 10', default='0') + optional.add_argument('--debug', help="to enable debug", default=False) + args = parser.parse_args() + test_start_time = datetime.now().strftime("%b %d %H:%M:%S") + print("Test started at ", test_start_time) + print(parser.parse_args()) + if args.test_duration.endswith('s') or args.test_duration.endswith('S'): + args.test_duration = abs(int(float(args.test_duration[0:-1]))) + elif args.test_duration.endswith('m') or args.test_duration.endswith('M'): + args.test_duration = abs(int(float(args.test_duration[0:-1]) * 60)) + elif args.test_duration.endswith('h') or args.test_duration.endswith('H'): + args.test_duration = abs(int(float(args.test_duration[0:-1]) * 60 * 60)) + elif args.test_duration.endswith(''): + args.test_duration = abs(int(float(args.test_duration))) + + if not isinstance(args.radio_list, list): + if isinstance(args.radio_list, str): + print(args.radio_list) + args.radio_list = args.radio_list.split(",") + print(args.radio_list) + else: + raise TypeError("radio_list should be a list or a string of radio names separated with ','") + + if not isinstance(args.sta_list, list): + if isinstance(args.sta_list, str): + print(args.sta_list) + args.sta_list = args.sta_list.split(",") + args.sta_list = [int(x) for x in args.sta_list] + print(args.sta_list) + else: + raise TypeError("sta_list should be a list of no. of stations or a string of no. of stations separated " + "with ','") + + if len(args.radio_list) != len(args.sta_list): + raise AttributeError("list of radio names and list of sta_list should be equal in number") + + if args.atten_val: + if args.atten_val.split(',')[0] != '0': + temp = ['0'] + temp.extend(args.atten_val.split(',')) + args.atten_val = temp + else: + args.atten_val = args.atten_val.split(',') + + if args.traffic is not None and int(args.traffic) < 0: + raise ValueError("Traffic should be greater than 0 Mbps") + + side_a, side_b = 25, 25 + if args.traffic_direction == "upload": + side_a = 0 + side_b = abs(int(float(args.traffic) * 1000000)) + elif args.traffic_direction == "download": + side_a = abs(int(float(args.traffic) * 1000000)) + side_b = 0 + elif args.traffic_direction == "bidirectional": + side_a = abs(int(float(args.traffic) * 1000000)) + side_b = abs(int(float(args.traffic) * 1000000)) + + if args.create_sta: + station_list = [] + first, last = 0, args.sta_list[0] + for i in range(len(args.radio_list)): + if i != 0: + last = last + args.sta_list[i] + station_list.extend(LFUtils.portNameSeries(prefix_="sta", start_id_=first, end_id_=abs(last) - 1, padding_number_=10000, radio=args.radio_list[i])) + first = first + args.sta_list[i] + print(station_list) + else: + station_list = args.sta_names.split(",") + + rvr_obj = RvR(host=args.mgr, + port=args.mgr_port, + number_template="0000", + sta_names=station_list, + create_sta=args.create_sta, + sta_list=args.sta_list, + name_prefix="RvR-", + upstream=args.upstream, + radio_list=args.radio_list, + ssid=args.ssid, + password=args.password, + security=args.security, + test_duration=args.test_duration, + traffic=abs(int(args.traffic)), + side_a_min_rate=side_a, + side_b_min_rate=side_b, + mode=args.mode, + ap_model=args.ap_model, + serial_number=args.atten_serno, + indices=args.atten_idx, + atten_val=args.atten_val, + traffic_type=args.traffic_type, + traffic_direction=args.traffic_direction, + _debug_on=args.debug) + + rvr_obj.pre_cleanup() + data = rvr_obj.build() + rvr_obj.cleanup() + + test_end_time = datetime.now().strftime("%b %d %H:%M:%S") + print("Test ended at: ", test_end_time) + + test_setup_info = { + "AP Model": rvr_obj.ap_model, + "Number of Stations": rvr_obj.num_stations, + "SSID": rvr_obj.ssid, + "Intended traffic": f"{rvr_obj.traffic} Mbps", + "Test Duration": datetime.strptime(test_end_time, "%b %d %H:%M:%S") - datetime.strptime( + test_start_time, "%b %d %H:%M:%S") + } + + input_setup_info = { + "contact": "support@candelatech.com" + } + rvr_obj.generate_report(data=data, test_setup_info=test_setup_info, input_setup_info=input_setup_info) + + +if __name__ == "__main__": + main() \ No newline at end of file