diff --git a/lf_libs/__init__.py b/lf_libs/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/lf_libs/lf_libs.py b/lf_libs/lf_libs.py new file mode 100644 index 00000000..e69de29b diff --git a/lf_libs/lf_tests.py b/lf_libs/lf_tests.py new file mode 100644 index 00000000..bf437a3d --- /dev/null +++ b/lf_libs/lf_tests.py @@ -0,0 +1,2127 @@ +######################################################################################################### +# Used by Nightly_Sanity +# This has different types of old_pytest like Single client connectivity, Single_Client_EAP, testrail_retest +######################################################################################################### +import datetime +import sys +import os +import time +import datetime +from datetime import datetime +import threading + +import allure +import pytest +import importlib + +sys.path.append( + os.path.dirname( + os.path.realpath(__file__) + ) +) +if "libs" not in sys.path: + sys.path.append(f'../libs') +for folder in 'py-json', 'py-scripts': + if folder not in sys.path: + sys.path.append(f'../../lanforge/lanforge-scripts/{folder}') + +sys.path.append(f"../lanforge/lanforge-scripts/py-scripts/tip-cicd-sanity") + +sys.path.append(f'../libs') +sys.path.append(f'../tools') +sys.path.append(f'../libs/lanforge/') +sys.path.append(os.path.join(os.path.abspath("../../../lanforge/lanforge-scripts/"))) +from sta_connect2 import StaConnect2 +import time +import string +import random +import csv +from datetime import datetime +from pull_report import Report +from scp_util import SCP_File +import pyshark as ps + +S = 12 +# from eap_connect import EAPConnect +from test_ipv4_ttls import TTLSTest +from lf_wifi_capacity_test import WiFiCapacityTest +from create_station import CreateStation +import lf_ap_auto_test +import lf_dataplane_test +from lf_dataplane_test import DataplaneTest +from lf_rx_sensitivity_test import RxSensitivityTest +from lf_ap_auto_test import ApAutoTest +from csv_to_influx import CSVtoInflux +# from influx import RecordInflux +from lf_multipsk import MultiPsk +from lf_rvr_test import RvrTest +from attenuator_serial import AttenuatorSerial +from lf_atten_mod_test import CreateAttenuator +from lf_mesh_test import MeshTest +from LANforge.lfcli_base import LFCliBase +from lf_tr398_test import TR398Test +from lf_pcap import LfPcap +from sta_scan_test import StaScan +from lf_sniff_radio import SniffRadio + +cv_test_reports = importlib.import_module("py-json.cv_test_reports") +lf_report = cv_test_reports.lanforge_reports +realm = importlib.import_module("py-json.realm") +cv_test_reports = importlib.import_module("py-json.cv_test_reports") +lf_report = cv_test_reports.lanforge_reports +Realm = realm.Realm +from LANforge import LFUtils +from lf_cleanup import lf_clean +from wifi_monitor_profile import WifiMonitor +from sta_scan_test import StaScan +from lf_sniff_radio import SniffRadio +cv_test_reports = importlib.import_module("py-json.cv_test_reports") +lf_report = cv_test_reports.lanforge_reports +from lf_pcap import LfPcap +from lf_hard_roam_test import HardRoam +from lf_csv import lf_csv + + +@allure.step +def nested_step_allure(bssid, rssi): + pass + + + +class RunTest: + + def __init__(self, configuration_data=None, local_report_path="../reports/", influx_params=None, run_lf=False, debug=False, skip_pcap=False): + if "type" in configuration_data['traffic_generator']["details"].keys(): + if configuration_data['traffic_generator']["details"]["type"] == "mesh": + self.lanforge_ip = configuration_data['traffic_generator']['details']["ip"] + self.lanforge_port = configuration_data['traffic_generator']['details']["port"] + self.ssh_port = configuration_data['traffic_generator']['details']["ssh_port"] + self.upstream_port_1 = configuration_data['traffic_generator']['details']["upstream-mobile-sta"] + self.upstream_port_2 = configuration_data['traffic_generator']['details']["upstream-root"] + self.upstream_port_3 = configuration_data['traffic_generator']['details']["upstream-node-1"] + self.upstream_port_4 = configuration_data['traffic_generator']['details']["upstream-node-2"] + self.uplink_port_1 = configuration_data['traffic_generator']['details']["uplink-mobile-sta"] + self.uplink_port_2 = configuration_data['traffic_generator']['details']["uplink-root"] + self.uplink_port_3 = configuration_data['traffic_generator']['details']["uplink--node-1"] + self.uplink_port_4 = configuration_data['traffic_generator']['details']["uplink--node-2"] + self.upstream_resource_1 = self.upstream_port_1.split(".")[0] + "." + self.upstream_port_1.split(".")[1] + self.upstream_resource_2 = self.upstream_port_2.split(".")[0] + "." + self.upstream_port_2.split(".")[1] + self.upstream_resource_3 = self.upstream_port_3.split(".")[0] + "." + self.upstream_port_3.split(".")[1] + self.upstream_resource_4 = self.upstream_port_4.split(".")[0] + "." + self.upstream_port_4.split(".")[1] + self.uplink_resource_1 = self.uplink_port_1.split(".")[0] + "." + self.uplink_port_1.split(".")[1] + self.uplink_resource_2 = self.uplink_port_2.split(".")[0] + "." + self.uplink_port_2.split(".")[1] + self.uplink_resource_3 = self.uplink_port_3.split(".")[0] + "." + self.uplink_port_3.split(".")[1] + self.uplink_resource_4 = self.uplink_port_4.split(".")[0] + "." + self.uplink_port_4.split(".")[1] + self.upstream_subnet = configuration_data['traffic_generator']['details']["upstream_subnet-mobile-sta"] + self.lf_ssh_port = configuration_data['traffic_generator']['details']["ssh_port"] + print("hi", self.lanforge_port) + self.local_report_path = local_report_path + + else: + self.lanforge_ip = configuration_data['traffic_generator']['details']["ip"] + self.lanforge_port = configuration_data['traffic_generator']['details']["port"] + self.lanforge_ssh_port = configuration_data['traffic_generator']['details']["ssh_port"] + self.twog_radios = configuration_data['traffic_generator']['details']["2.4G-Radio"] + self.fiveg_radios = configuration_data['traffic_generator']['details']["5G-Radio"] + self.ax_radios = configuration_data['traffic_generator']['details']["AX-Radio"] + self.upstream_port = configuration_data['traffic_generator']['details']["upstream"].split(".")[2] + self.upstream = configuration_data['traffic_generator']['details']["upstream"] + self.upstream_resource = configuration_data['traffic_generator']['details']["upstream"].split(".")[1] + self.twog_prefix = configuration_data['traffic_generator']['details']["2.4G-Station-Name"] + self.fiveg_prefix = configuration_data['traffic_generator']['details']["5G-Station-Name"] + self.ax_prefix = configuration_data['traffic_generator']['details']["AX-Station-Name"] + self.debug = debug + self.run_lf = run_lf + self.skip_pcap = skip_pcap + if self.run_lf: + self.ssid_data = configuration_data['access_point'][0]['ssid'] + self.lf_ssh_port = configuration_data['traffic_generator']['details']["ssh_port"] + self.staConnect = None + self.dataplane_obj = None + self.rx_sensitivity_obj = None + self.dualbandptest_obj = None + self.msthpt_obj = None + self.cvtest_obj = None + self.pcap_obj = None + self.influx_params = influx_params + # self.influxdb = RecordInflux(_influx_host=influx_params["influx_host"], + # _influx_port=influx_params["influx_port"], + # _influx_org=influx_params["influx_org"], + # _influx_token=influx_params["influx_token"], + # _influx_bucket=influx_params["influx_bucket"]) + self.local_report_path = local_report_path + if not os.path.exists(self.local_report_path): + os.mkdir(self.local_report_path) + + def Client_Connectivity(self, ssid="[BLANK]", passkey="[BLANK]", security="open", extra_securities=[], + station_name=[], mode="BRIDGE", vlan_id=1, band="twog", ssid_channel=None): + """SINGLE CLIENT CONNECTIVITY using test_connect2.py""" + self.staConnect = StaConnect2(self.lanforge_ip, self.lanforge_port, debug_=self.debug) + + self.staConnect.sta_mode = 0 + self.staConnect.upstream_resource = self.upstream_resource + if mode == "BRIDGE": + self.staConnect.upstream_port = self.upstream_port + elif mode == "NAT": + self.staConnect.upstream_port = self.upstream_port + else: + self.staConnect.upstream_port = self.upstream_port + "." + str(vlan_id) + if band == "twog": + if self.run_lf: + ssid = self.ssid_data["2g-ssid"] + passkey = self.ssid_data["2g-password"] + security = self.ssid_data["2g-encryption"].lower() + print(ssid) + self.staConnect.radio = self.twog_radios[0] + self.staConnect.admin_down(self.staConnect.radio) + self.staConnect.admin_up(self.staConnect.radio) + self.staConnect.sta_prefix = self.twog_prefix + if band == "fiveg": + if self.run_lf: + ssid = self.ssid_data["5g-ssid"] + passkey = self.ssid_data["5g-password"] + security = self.ssid_data["5g-encryption"].lower() + self.staConnect.radio = self.fiveg_radios[0] + self.staConnect.reset_port(self.staConnect.radio) + self.staConnect.sta_prefix = self.fiveg_prefix + self.set_radio_channel(radio=self.staConnect.radio, channel=ssid_channel) + print("scan ssid radio", self.staConnect.radio.split(".")[2]) + self.data_scan_ssid = self.scan_ssid(radio=self.staConnect.radio.split(".")[2]) + print("ssid scan data :- ", self.data_scan_ssid) + result = self.check_ssid_available_scan_result(scan_ssid_data=self.data_scan_ssid, ssid=ssid) + print("ssid available:-", result) + if not result and ssid_channel: + if not self.skip_pcap: + print("sniff radio", self.ax_radios[0].split(".")[2]) + self.start_sniffer(radio_channel=ssid_channel, radio=self.ax_radios[0].split(".")[2], duration=30) + time.sleep(30) + self.stop_sniffer() + print("ssid not available in scan result") + return "FAIL", "ssid not available in scan result" + self.staConnect.resource = 1 + self.staConnect.dut_ssid = ssid + self.staConnect.dut_passwd = passkey + self.staConnect.dut_security = security + self.staConnect.station_names = station_name + self.staConnect.runtime_secs = 40 + self.staConnect.bringup_time_sec = 80 + self.staConnect.cleanup_on_exit = True + data_table = "" + dict_table = {} + self.staConnect.setup(extra_securities=extra_securities) + for sta_name in self.staConnect.station_names: + try: + sta_url = self.staConnect.get_station_url(sta_name) + station_info = self.staConnect.json_get(sta_url) + dict_data = station_info["interface"] + dict_table[""] = list(dict_data.keys()) + dict_table["Before"] = list(dict_data.values()) + except Exception as e: + print(e) + if ssid_channel: + if not self.skip_pcap: + print("sniff radio", self.ax_radios[0].split(".")[2]) + self.start_sniffer(radio_channel=ssid_channel, radio=self.ax_radios[0].split(".")[2], duration=30) + self.staConnect.start() + print("napping %f sec" % self.staConnect.runtime_secs) + time.sleep(self.staConnect.runtime_secs) + report_obj = Report() + for sta_name in self.staConnect.station_names: + try: + sta_url = self.staConnect.get_station_url(sta_name) + station_info = self.staConnect.json_get(sta_url) + self.station_ip = station_info["interface"]["ip"] + dict_data = station_info["interface"] + dict_table["After"] = list(dict_data.values()) + try: + data_table = report_obj.table2(table=dict_table, headers='keys') + except Exception as e: + print(e) + allure.attach(name=str(sta_name), body=data_table) + except Exception as e: + print(e) + self.staConnect.stop() + run_results = self.staConnect.get_result_list() + if not self.staConnect.passes(): + if self.debug: + for result in run_results: + print("test result: " + result) + pytest.exit("Test Failed: Debug True") + self.staConnect.cleanup() + try: + supplicant = "/home/lanforge/wifi/wpa_supplicant_log_" + self.staConnect.radio.split(".")[2] + ".txt" + obj = SCP_File(ip=self.lanforge_ip, port=self.lanforge_ssh_port, username="root", password="lanforge", + remote_path=supplicant, + local_path=".") + obj.pull_file() + allure.attach.file(source="wpa_supplicant_log_" + self.staConnect.radio.split(".")[2] + ".txt", + name="supplicant_log") + except Exception as e: + print(e) + + for result in run_results: + print("test result: " + result) + result = "PASS" + description = "Unknown error" + dict_table = {} + print("Client Connectivity :", self.staConnect.passes) + endp_data = [] + for i in self.staConnect.resulting_endpoints: + endp_data.append(self.staConnect.resulting_endpoints[i]["endpoint"]) + dict_table["key"] = [i for s in [d.keys() for d in endp_data] for i in s] + dict_table["value"] = [i for s in [d.values() for d in endp_data] for i in s] + data_table = report_obj.table2(table=dict_table, headers='keys') + allure.attach(name="cx_data", body=data_table) + for i in range(len(run_results)): + if i == 0: + if "FAILED" in run_results[i]: + result = "FAIL" + description = "Station did not get an ip" + break + else: + if "FAILED" in run_results[i]: + result = "FAIL" + description = "did not report traffic" + + if self.staConnect.passes(): + print("client connection to", self.staConnect.dut_ssid, "successful. Test Passed") + result = "PASS" + else: + print("client connection to", self.staConnect.dut_ssid, "unsuccessful. Test Failed") + result = "FAIL" + time.sleep(3) + if ssid_channel: + if not self.skip_pcap: + self.stop_sniffer() + self.set_radio_channel(radio=self.staConnect.radio, channel="AUTO") + return result, description + + def EAP_Connect(self, ssid="[BLANK]", passkey="[BLANK]", security="wpa2", extra_securities=[], + mode="BRIDGE", band="twog", vlan_id=100, + station_name=[], key_mgmt="WPA-EAP", + pairwise="NA", group="NA", wpa_psk="DEFAULT", + ttls_passwd="nolastart", ieee80211w=1, + wep_key="NA", ca_cert="NA", eap="TTLS", identity="nolaradius", d_vlan=False, cleanup=True, + ssid_channel=None): + self.eap_connect = TTLSTest(host=self.lanforge_ip, port=self.lanforge_port, + sta_list=station_name, vap=False, _debug_on=self.debug) + + self.eap_connect.station_profile.sta_mode = 0 + self.eap_connect.upstream_resource = self.upstream_resource + self.eap_connect.l3_cx_obj_udp.upstream_resource = self.upstream_resource + self.eap_connect.l3_cx_obj_tcp.upstream_resource = self.upstream_resource + if mode == "BRIDGE": + self.eap_connect.l3_cx_obj_udp.upstream = self.upstream_port + self.eap_connect.l3_cx_obj_tcp.upstream = self.upstream_port + elif mode == "NAT": + self.eap_connect.l3_cx_obj_udp.upstream = self.upstream_port + self.eap_connect.l3_cx_obj_tcp.upstream = self.upstream_port + else: + self.eap_connect.l3_cx_obj_udp.upstream = self.upstream_port + "." + str(vlan_id) + self.eap_connect.l3_cx_obj_tcp.upstream = self.upstream_port + "." + str(vlan_id) + if band == "twog": + if self.run_lf: + ssid = self.ssid_data["2g-ssid"] + passkey = self.ssid_data["2g-password"] + security = self.ssid_data["2g-encryption"] + self.eap_connect.radio = self.twog_radios[0] + self.eap_connect.admin_down(self.eap_connect.radio) + self.eap_connect.admin_up(self.eap_connect.radio) + # self.eap_connect.sta_prefix = self.twog_prefix + if band == "fiveg": + if self.run_lf: + ssid = self.ssid_data["5g-ssid"] + passkey = self.ssid_data["5g-password"] + security = self.ssid_data["5g-encryption"] + self.eap_connect.radio = self.fiveg_radios[0] + self.eap_connect.admin_down(self.eap_connect.radio) + self.eap_connect.admin_up(self.eap_connect.radio) + # self.eap_connect.sta_prefix = self.fiveg_prefix + # self.eap_connect.resource = 1 + self.set_radio_channel(radio=self.eap_connect.radio, channel=ssid_channel) + print("scan ssid radio", self.eap_connect.radio.split(".")[2]) + self.data_scan_ssid = self.scan_ssid(radio=self.eap_connect.radio.split(".")[2]) + print("ssid scan data :- ", self.data_scan_ssid) + result = self.check_ssid_available_scan_result(scan_ssid_data=self.data_scan_ssid, ssid=ssid) + print("ssid available:-", result) + if not result and ssid_channel: + if not self.skip_pcap: + print("sniff radio", self.ax_radios[0].split(".")[2]) + self.start_sniffer(radio_channel=ssid_channel, radio=self.ax_radios[0].split(".")[2], duration=30) + time.sleep(30) + self.stop_sniffer() + print("ssid not available in scan result") + return "FAIL", "ssid not available in scan result" + if eap == "TTLS": + self.eap_connect.ieee80211w = ieee80211w + self.eap_connect.key_mgmt = key_mgmt + self.eap_connect.station_profile.set_command_flag("add_sta", "80211u_enable", 0) + self.eap_connect.identity = identity + self.eap_connect.ttls_passwd = ttls_passwd + self.eap_connect.pairwise = pairwise + self.eap_connect.group = group + if eap == "TLS": + self.eap_connect.key_mgmt = key_mgmt + self.eap_connect.station_profile.set_command_flag("add_sta", "80211u_enable", 0) + self.eap_connect.eap = eap + self.eap_connect.identity = "user" + self.eap_connect.ttls_passwd = "password" + self.eap_connect.private_key = "/home/lanforge/client.p12" + self.eap_connect.ca_cert = "/home/lanforge/ca.pem" + self.eap_connect.pk_passwd = "whatever" + self.eap_connect.ieee80211w = 1 + + # self.eap_connect.hs20_enable = False + self.eap_connect.ssid = ssid + self.eap_connect.password = passkey + self.eap_connect.security = security + self.eap_connect.sta_list = station_name + self.eap_connect.build(extra_securities=extra_securities) + dict_table = {} + report_obj = Report() + data_table = "" + for sta_name in station_name: + try: + station_info = self.eap_connect.json_get("port/1/1/" + sta_name) + dict_data = station_info["interface"] + dict_table[""] = list(dict_data.keys()) + dict_table["Before"] = list(dict_data.values()) + except Exception as e: + print(e) + if ssid_channel: + if not self.skip_pcap: + print("sniff radio", self.ax_radios[0].split(".")[2]) + self.start_sniffer(radio_channel=ssid_channel, radio=self.ax_radios[0].split(".")[2], duration=30) + self.eap_connect.start(station_name, True, True) + if d_vlan: + self.station_ip = {} + for sta_name in station_name: + # try: + station_data_str = "" + # sta_url = self.eap_connect.get_station_url(sta_name) + # station_info = self.eap_connect.json_get(sta_url) + station_info = self.eap_connect.json_get("port/1/1/" + sta_name) + + if d_vlan: + if "ip" in station_info["interface"].keys(): + self.station_ip[sta_name] = station_info["interface"]["ip"] + dict_data = station_info["interface"] + dict_table["After"] = list(dict_data.values()) + try: + data_table = report_obj.table2(table=dict_table, headers='keys') + except Exception as e: + print(e) + allure.attach(name=str(sta_name), body=data_table) + # except Exception as e: + # print(e) + + self.eap_connect.stop() + run_results = self.eap_connect.get_result_list()[1:] + try: + supplicant = "/home/lanforge/wifi/wpa_supplicant_log_" + self.eap_connect.radio.split(".")[2] + ".txt" + obj = SCP_File(ip=self.lanforge_ip, port=self.lanforge_ssh_port, username="root", password="lanforge", + remote_path=supplicant, + local_path=".") + obj.pull_file() + allure.attach.file(source="wpa_supplicant_log_" + self.eap_connect.radio.split(".")[2] + ".txt", + name="supplicant_log") + except Exception as e: + print(e) + + if not self.eap_connect.passes(): + if self.debug: + #print("test result: " + self.eap_connect.passes()) + pytest.exit("Test Failed: Debug True") + endp_data = [] + result = "PASS" + description = "Unknown error" + dict_table = {} + for i in self.eap_connect.resulting_endpoints: + endp_data.append(self.eap_connect.resulting_endpoints[i]["endpoint"]) + # finding all keys and values + dict_table["key"] = [i for s in [d.keys() for d in endp_data] for i in s] + dict_table["value"] = [i for s in [d.values() for d in endp_data] for i in s] + data_table = report_obj.table2(table=dict_table, headers='keys') + allure.attach(name="cx_data", body=data_table) + for i in range(len(run_results)): + if i == 0: + if "FAILED" in run_results[i]: + result = "FAIL" + description = "Station did not get an ip" + break + else: + if "FAILED" in run_results[i]: + result = "FAIL" + description = "did not report traffic" + if cleanup: + self.eap_connect.cleanup(station_name) + if self.eap_connect.passes(): + result = "PASS" + else: + result = "FAIL" + if ssid_channel: + if not self.skip_pcap: + self.stop_sniffer() + self.set_radio_channel(radio=self.eap_connect.radio, channel="AUTO") + return result, description + + def wifi_capacity(self, mode="BRIDGE", vlan_id=100, batch_size="1,5,10,20,40,64,128", + instance_name="wct_instance", download_rate="1Gbps", influx_tags="", + upload_rate="1Gbps", protocol="TCP-IPv4", duration="60000", stations="", create_stations=True, + sort="interleave", raw_lines=[], sets=[], move_to_influx=False): + instance_name = ''.join(random.choices(string.ascii_uppercase + string.digits, k=S)) + if mode == "BRIDGE": + upstream_port = self.upstream_port + elif mode == "NAT": + upstream_port = self.upstream_port + elif mode == "VLAN": + upstream_port = self.upstream_port + "." + str(vlan_id) + '''SINGLE WIFI CAPACITY using lf_wifi_capacity.py''' + wificapacity_obj = WiFiCapacityTest(lfclient_host=self.lanforge_ip, + lf_port=self.lanforge_port, + ssh_port=self.lf_ssh_port, + lf_user="lanforge", + lf_password="lanforge", + local_lf_report_dir=self.local_report_path, + instance_name=instance_name, + config_name="wifi_config", + upstream="1.1." + upstream_port, + batch_size=batch_size, + loop_iter="1", + protocol=protocol, + duration=duration, + pull_report=True, + load_old_cfg=False, + upload_rate=upload_rate, + download_rate=download_rate, + sort=sort, + stations=stations, + create_stations=create_stations, + radio=None, + security=None, + paswd=None, + ssid=None, + enables=[], + disables=[], + raw_lines=raw_lines, + raw_lines_file="", + test_tag=influx_tags, + sets=sets) + wificapacity_obj.setup() + wificapacity_obj.run() + if move_to_influx: + report_name = "../reports/" + wificapacity_obj.report_name[0]['LAST']["response"].split(":::")[1].split("/")[-1] + influx = CSVtoInflux(influx_host=self.influx_params["influx_host"], + influx_port=self.influx_params["influx_port"], + influx_org=self.influx_params["influx_org"], + influx_token=self.influx_params["influx_token"], + influx_bucket=self.influx_params["influx_bucket"], + path=report_name) + + influx.glob() + return wificapacity_obj + + def Client_Connect(self, ssid="[BLANK]", passkey="[BLANK]", security="wpa2", mode="BRIDGE", band="twog", + vlan_id=100, + station_name=[], scan_ssid=True): + if band == "twog": + if self.run_lf: + ssid = self.ssid_data["2g-ssid"] + passkey = self.ssid_data["2g-password"] + security = self.ssid_data["2g-encryption"].lower() + if band == "fiveg": + if self.run_lf: + ssid = self.ssid_data["5g-ssid"] + passkey = self.ssid_data["5g-password"] + security = self.ssid_data["5g-encryption"].lower() + self.client_connect = CreateStation(_host=self.lanforge_ip, _port=self.lanforge_port, + _sta_list=station_name, _password=passkey, _ssid=ssid, _security=security) + self.client_connect.station_profile.sta_mode = 0 + self.client_connect.upstream_resource = self.upstream_resource + if mode == "BRIDGE": + self.client_connect.upstream_port = self.upstream_port + elif mode == "NAT": + self.client_connect.upstream_port = self.upstream_port + else: + self.client_connect.upstream_port = self.upstream_port + "." + str(vlan_id) + if band == "twog": + self.client_connect.radio = self.twog_radios[0] + # self.client_connect.sta_prefix = self.twog_prefix + if band == "fiveg": + self.client_connect.radio = self.fiveg_radios[0] + if band == "ax": + self.client_connect.radio = self.ax_radios[0] + print("scan ssid radio", self.client_connect.radio.split(".")[2]) + if scan_ssid: + self.data_scan_ssid = self.scan_ssid(radio=self.client_connect.radio.split(".")[2]) + print("ssid scan data :- ", self.data_scan_ssid) + self.client_connect.build() + result = self.client_connect.wait_for_ip(station_list=station_name, timeout_sec=100) + #print(self.client_connect.wait_for_ip(station_name)) + print(result) + if result: + self.client_connect._pass("ALL Stations got IP's", print_=True) + return self.client_connect + else: + return False + + def allure_report_table_format(self, dict_data=None, key=None, value=None, name=None):#, value_on_same_table=True): + # for i in range(len(name)): + report_obj = Report() + data_table, dict_table = "", {} + # if value_on_same_table: + dict_table[key] = list(dict_data.keys()) + # for i in range(len(dict_data)): + # if value_on_same_table == False: + # dict_table[key[i]] = list(dict_data[i].keys()) + dict_table[value] = list(dict_data.values()) + try: + data_table = report_obj.table2(table=dict_table, headers='keys') + except Exception as e: + print(e) + if name != None: + allure.attach(name=name, body=str(data_table)) + + def attach_stationdata_to_allure(self, station_name=[], name=""): + self.sta_url_map = None + for sta_name_ in station_name: + if sta_name_ is None: + raise ValueError("get_station_url wants a station name") + if self.sta_url_map is None: + self.sta_url_map = {} + for sta_name in station_name: + self.sta_url_map[sta_name] = "port/1/%s/%s" % (str(1), sta_name) + print(self.sta_url_map) + + for sta_name in station_name: + try: + station_data_str = "" + # sta_url = self.staConnect.get_station_url(sta_name) + cli_base = LFCliBase(_lfjson_host=self.lanforge_ip, _lfjson_port=self.lanforge_port, ) + station_info = cli_base.json_get(_req_url=self.sta_url_map[sta_name]) + print("station info", station_info) + for i in station_info["interface"]: + try: + station_data_str = station_data_str + i + " : " + str(station_info["interface"][i]) + "\n" + except Exception as e: + print(e) + print("sta name", sta_name) + if name == "": + allure.attach(name=f"{sta_name} info", body=str(station_data_str)) + else: + allure.attach(name=name, body=str(station_data_str)) + except Exception as e: + print(e) + + def Client_Connect_Using_Radio(self, ssid="[BLANK]", passkey="[BLANK]", security="wpa2", mode="BRIDGE", + vlan_id=100, radio=None, sta_mode=0, + station_name=[]): + self.client_connect = CreateStation(_host=self.lanforge_ip, _port=self.lanforge_port, _mode=sta_mode, + _sta_list=station_name, _password=passkey, _ssid=ssid, _security=security) + + # self.client_connect.station_profile.sta_mode = sta_mode + self.client_connect.upstream_resource = 1 + if mode == "BRIDGE": + self.client_connect.upstream_port = self.upstream_port + elif mode == "NAT": + self.client_connect.upstream_port = self.upstream_port + else: + self.client_connect.upstream_port = self.upstream_port + "." + str(vlan_id) + + self.client_connect.radio = radio + self.client_connect.build() + self.client_connect.wait_for_ip(station_name) + print(self.client_connect.wait_for_ip(station_name)) + if self.client_connect.wait_for_ip(station_name): + self.client_connect._pass("ALL Stations got IP's", print_=True) + return self.client_connect + else: + return False + + def wait_for_ip(self, station=[]): + self.local_realm = realm.Realm(lfclient_host=self.lanforge_ip, lfclient_port=self.lanforge_port) + print(station) + if self.local_realm.wait_for_ip(station_list=station): + self.local_realm._pass("ALL Stations got IP's", print_=True) + return True + else: + return False + + def Client_disconnect(self, station_name=[], clean_l3_traffic=False, clear_all_sta = False): + self.client_dis = CreateStation(_host=self.lanforge_ip, _port=self.lanforge_port, + _sta_list=station_name, _password="passkey", _ssid="ssid", _security="security") + if len(station_name) > 0: + self.client_dis.station_profile.cleanup(station_name) + elif clear_all_sta: + exist_sta = [] + for u in self.client_dis.json_get("/port/?fields=port+type,alias")['interfaces']: + if list(u.values())[0]['port type'] not in ['Ethernet', 'WIFI-Radio', 'NA']: + exist_sta.append(list(u.values())[0]['alias']) + self.client_dis.station_profile.cleanup(desired_stations=exist_sta) + if clean_l3_traffic: + try: + exist_l3 = list( + filter(lambda cx_name: cx_name if (cx_name != 'handler' and cx_name != 'uri') else False, + self.client_dis.json_get("/cx/?fields=name"))) + list(map(lambda i: self.client_dis.rm_cx(cx_name=i), exist_l3)) + list(map(lambda cx_name: [self.client_dis.rm_endp(ename=i) for i in [f"{cx_name}-A", f"{cx_name}-B"]], + exist_l3)) + except Exception as e: + pass + return True + + def dataplane(self, station_name=None, mode="BRIDGE", vlan_id=100, download_rate="85%", dut_name="TIP", + upload_rate="0", duration="15s", instance_name="test_demo", raw_lines=None, influx_tags="", + move_to_influx=False): + instance_name = ''.join(random.choices(string.ascii_uppercase + string.digits, k=S)) + + if mode == "BRIDGE": + upstream_port = self.upstream_port + elif mode == "NAT": + + upstream_port = self.upstream_port + elif mode == "VLAN": + # self.upstream_port = self.upstream_port + "." + str(vlan_id) + upstream_port = self.upstream_port + "." + str(vlan_id) + + if raw_lines is None: + raw_lines = [['pkts: 60;142;256;512;1024;MTU;4000'], ['directions: DUT Transmit;DUT Receive'], + ['traffic_types: UDP;TCP'], + ["show_3s: 1"], ["show_ll_graphs: 1"], ["show_log: 1"]] + self.client_connect.upstream_port = upstream_port + elif mode == "VLAN": + self.client_connect.upstream_port = upstream_port + "." + str(vlan_id) + + if raw_lines is None: + raw_lines = [['pkts: 60;142;256;512;1024;MTU;4000'], ['directions: DUT Transmit;DUT Receive'], + ['traffic_types: UDP;TCP'], + ["show_3s: 1"], ["show_ll_graphs: 1"], ["show_log: 1"]] + + self.dataplane_obj = DataplaneTest(lf_host=self.lanforge_ip, + lf_port=self.lanforge_port, + ssh_port=self.lf_ssh_port, + local_lf_report_dir=self.local_report_path, + lf_user="lanforge", + lf_password="lanforge", + instance_name=instance_name, + config_name="dpt_config", + upstream="1.1." + upstream_port, + pull_report=True, + load_old_cfg=False, + download_speed=download_rate, + upload_speed=upload_rate, + duration=duration, + dut=dut_name, + station="1.1." + station_name[0], + test_tag=influx_tags, + raw_lines=raw_lines) + + self.dataplane_obj.setup() + self.dataplane_obj.run() + if move_to_influx: + report_name = "../reports/" + self.dataplane_obj.report_name[0]['LAST']["response"].split(":::")[1].split("/")[-1] + influx = CSVtoInflux(influx_host=self.influx_params["influx_host"], + influx_port=self.influx_params["influx_port"], + influx_org=self.influx_params["influx_org"], + influx_token=self.influx_params["influx_token"], + influx_bucket=self.influx_params["influx_bucket"], + path=report_name) + + influx.glob() + + return self.dataplane_obj + + def dualbandperformancetest(self, ssid_5G="[BLANK]", ssid_2G="[BLANK]", mode="BRIDGE", vlan_id=100, dut_name="TIP", + instance_name="test_demo", dut_5g="", dut_2g="", influx_tags="", move_to_influx=False): + instance_name = ''.join(random.choices(string.ascii_uppercase + string.digits, k=S)) + + if mode == "BRIDGE": + self.upstream_port = self.upstream_port + elif mode == "NAT": + self.upstream_port = self.upstream_port + else: + self.upstream_port = self.upstream_port + "." + str(vlan_id) + + self.dualbandptest_obj = ApAutoTest(lf_host=self.lanforge_ip, + lf_port=self.lanforge_port, + lf_user="lanforge", + lf_password="lanforge", + ssh_port=self.lf_ssh_port, + instance_name=instance_name, + config_name="dbp_config", + upstream="1.1." + self.upstream_port, + pull_report=True, + dut5_0=dut_5g, + dut2_0=dut_2g, + load_old_cfg=False, + local_lf_report_dir=self.local_report_path, + max_stations_2=64, + max_stations_5=64, + max_stations_dual=124, + radio2=[self.twog_radios], + radio5=[self.fiveg_radios], + test_tag=influx_tags, + sets=[['Basic Client Connectivity', '0'], ['Multi Band Performance', '1'], + ['Throughput vs Pkt Size', '0'], ['Capacity', '0'], + ['Skip 2.4Ghz Tests', '1'], + ['Skip 5Ghz Tests', '1'], + ['Stability', '0'], + ['Band-Steering', '0'], ['Multi-Station Throughput vs Pkt Size', '0'], + ['Long-Term', '0']] + ) + self.dualbandptest_obj.setup() + self.dualbandptest_obj.run() + if move_to_influx: + report_name = "../reports/" + self.dualbandptest_obj.report_name[0]['LAST']["response"].split(":::")[1].split("/")[-1] + influx = CSVtoInflux(influx_host=self.influx_params["influx_host"], + influx_port=self.influx_params["influx_port"], + influx_org=self.influx_params["influx_org"], + influx_token=self.influx_params["influx_token"], + influx_bucket=self.influx_params["influx_bucket"], + path=report_name) + + influx.glob() + return self.dualbandptest_obj + + def apstabilitytest(self, ssid_5G="[BLANK]", ssid_2G="[BLANK]", mode="BRIDGE", vlan_id=100, dut_name="TIP", + instance_name="test_demo", dut_5g="", dut_2g=""): + instance_name = ''.join(random.choices(string.ascii_uppercase + string.digits, k=S)) + + if mode == "BRIDGE": + self.upstream_port = self.upstream_port + elif mode == "NAT": + self.upstream_port = self.upstream_port + else: + self.upstream_port = self.upstream_port + "." + str(vlan_id) + + self.apstab_obj = ApAutoTest(lf_host=self.lanforge_ip, + lf_port=self.lanforge_port, + lf_user="lanforge", + lf_password="lanforge", + instance_name=instance_name, + config_name="dbp_config", + upstream="1.1." + self.upstream_port, + pull_report=True, + dut5_0=dut_5g, + dut2_0=dut_2g, + load_old_cfg=False, + local_lf_report_dir=self.local_report_path, + max_stations_2=5, + max_stations_5=5, + max_stations_dual=10, + radio2=[self.twog_radios], + radio5=[self.fiveg_radios], + sets=[['Basic Client Connectivity', '0'], ['Multi Band Performance', '0'], + ['Throughput vs Pkt Size', '0'], ['Capacity', '0'], + ['Stability', '1'], + ['Band-Steering', '0'], ['Multi-Station Throughput vs Pkt Size', '0'], + ['Long-Term', '0']], + raw_lines=[['reset_dur:300'], ['reset_batch_size:2']] + ) + self.apstab_obj.setup() + self.apstab_obj.run() + report_name = self.apstab_obj.report_name[0]['LAST']["response"].split(":::")[1].split("/")[-1] + influx = CSVtoInflux(influx_host=self.influx_params["influx_host"], + influx_port=self.influx_params["influx_port"], + influx_org=self.influx_params["influx_org"], + influx_token=self.influx_params["influx_token"], + influx_bucket=self.influx_params["influx_bucket"], + path=report_name) + + influx.glob() + return self.apstab_obj + + def ratevsrange(self, station_name=None, mode="BRIDGE", vlan_id=100, download_rate="85%", dut_name="TIP", + upload_rate="0", duration="1m", instance_name="test_demo", raw_lines=None): + if mode == "BRIDGE": + self.client_connect.upstream_port = self.upstream_port + elif mode == "NAT": + self.client_connect.upstream_port = self.upstream_port + elif mode == "VLAN": + self.client_connect.upstream_port = self.upstream_port + "." + str(vlan_id) + + self.rvr_obj = RvrTest(lf_host=self.lanforge_ip, + lf_port=self.lanforge_port, + ssh_port=self.lf_ssh_port, + lf_user="lanforge", + local_lf_report_dir=self.local_report_path, + lf_password="lanforge", + instance_name=instance_name, + config_name="rvr_config", + upstream="1.1." + self.upstream_port, + pull_report=True, + load_old_cfg=False, + upload_speed=upload_rate, + download_speed=download_rate, + duration=duration, + station="1.1." + station_name[0], + dut=dut_name, + raw_lines=raw_lines) + self.rvr_obj.setup() + self.rvr_obj.run() + report_name = self.rvr_obj.report_name[0]['LAST']["response"].split(":::")[1].split("/")[-1] + influx = CSVtoInflux(influx_host=self.influx_params["influx_host"], + influx_port=self.influx_params["influx_port"], + influx_org=self.influx_params["influx_org"], + influx_token=self.influx_params["influx_token"], + influx_bucket=self.influx_params["influx_bucket"], + path=report_name) + + influx.glob() + return self.rvr_obj + + def rx_sensitivity(self, station_name=None, mode="BRIDGE", vlan_id=100, download_rate="100%", dut_name="TIP", + upload_rate="0kbps", duration="30s", instance_name="test_demo", raw_lines=None): + if mode == "BRIDGE": + self.client_connect.upstream_port = self.upstream_port + elif mode == "NAT": + self.client_connect.upstream_port = self.upstream_port + else: + self.client_connect.upstream_port = self.upstream_port + "." + str(vlan_id) + if raw_lines is None: + raw_lines = [['txo_preamble: VHT'], + ['txo_mcs: 4 OFDM, HT, VHT;5 OFDM, HT, VHT;6 OFDM, HT, VHT;7 OFDM, HT, VHT'], + ['spatial_streams: 3'], ['bandw_options: 80'], ['txo_sgi: ON'], + ['txo_retries: No Retry'], ["show_3s: 1"], ['txo_txpower: 17'], + ["show_ll_graphs: 1"], ["show_log: 1"]] + + self.rx_sensitivity_obj = RxSensitivityTest(lf_host=self.lanforge_ip, + lf_port=self.lanforge_port, + ssh_port=self.lf_ssh_port, + local_path=self.local_report_path, + lf_user="lanforge", + lf_password="lanforge", + instance_name=instance_name, + config_name="rx_sen_config", + upstream="1.1." + self.upstream_port, + pull_report=True, + load_old_cfg=False, + download_speed=download_rate, + upload_speed=upload_rate, + duration=duration, + dut=dut_name, + station="1.1." + station_name[0], + raw_lines=raw_lines) + self.rx_sensitivity_obj.setup() + self.rx_sensitivity_obj.run() + report_name = self.rx_sensitivity_obj.report_name[0]['LAST']["response"].split(":::")[1].split("/")[-1] + influx = CSVtoInflux(influx_host=self.influx_params["influx_host"], + influx_port=self.influx_params["influx_port"], + influx_org=self.influx_params["influx_org"], + influx_token=self.influx_params["influx_token"], + influx_bucket=self.influx_params["influx_bucket"], + path=report_name) + + influx.glob() + return self.rx_sensitivity_obj + + def multipsk(self, ssid="[BLANK]", security=None, mode=None, key1=None, vlan_id=None, key2=None, band="twog", + station_name=None, n_vlan="1", key3=None): + global result1, sta_name + if mode == "BRIDGE": + self.upstream_port = self.upstream_port + elif mode == "NAT": + self.upstream_port = self.upstream_port + if band == "twog": + radio = self.twog_radios[0] + elif band == "fiveg": + radio = self.fiveg_radios[0] + print("vlan id", vlan_id) + + if n_vlan == "1": + input_data = [{ + "password": key1, + "upstream": str(self.upstream_port) + "." + str(vlan_id[0]), + "mac": "", + "num_station": 1, + "radio": str(radio) + }, + { + "password": key2, + "upstream": str(self.upstream_port), + "mac": "", + "num_station": 1, + "radio": str(radio) + }, + ] + else: + input_data = [{ + "password": key1, + "upstream": str(self.upstream_port) + "." + str(vlan_id[0]), + "mac": "", + "num_station": 1, + "radio": str(radio) + }, + { + "password": key2, + "upstream": str(self.upstream_port) + "." + str(vlan_id[1]), + "mac": "", + "num_station": 1, + "radio": str(radio) + }, + { + "password": key3, + "upstream": str(self.upstream_port), + "mac": "", + "num_station": 1, + "radio": str(radio) + }, + ] + print("station name", station_name) + + self.multi_obj = MultiPsk(host=self.lanforge_ip, + port=self.lanforge_port, + ssid=ssid, + input=input_data, + security=security, + upstream_resource=self.upstream_resource) + self.sta_url_map = None + self.multi_obj.build() + self.multi_obj.start() + time.sleep(60) + self.multi_obj.monitor_vlan_ip() + if n_vlan == "1": + self.multi_obj.get_sta_ip() + else: + self.multi_obj.get_sta_ip_for_more_vlan() + + result = self.multi_obj.compare_ip() + print("checking for vlan ips") + if result == "Pass": + print("Test pass") + else: + print("Test Fail") + print("now checking ip for non vlan port") + self.multi_obj.monitor_non_vlan_ip() + self.multi_obj.get_non_vlan_sta_ip() + print("mode", mode) + if mode == "BRIDGE": + result1 = self.multi_obj.compare_nonvlan_ip_bridge() + elif mode == "NAT": + result1 = self.multi_obj.compare_nonvlan_ip_nat() + # station_name = ['sta100', 'sta200', 'sta00'] + cli_base = LFCliBase(_lfjson_host=self.lanforge_ip, _lfjson_port=self.lanforge_port) + res_data = cli_base.json_get(_req_url='port?fields=alias,port+type,ip,mac',)['interfaces'] + table_heads = ["station name", "configured vlan-id", "expected IP Range", "allocated IP", "mac address", 'pass/fail'] + table_data = [] + temp = {'sta100':'', 'sta200': '', 'sta00': ''} + for i in res_data: + for item in i: + if i[item]['port type'] == 'Ethernet' and i[item]['alias'] == self.upstream_port: + if mode == 'NAT': + temp.update({'sta00': '192.168.1.1'}) + else: + temp.update({'sta00': i[item]['ip']}) + if i[item]['port type'] == '802.1Q VLAN' and i[item]['alias'] == str(self.upstream_port+".100"): + temp.update({'sta100': i[item]['ip']}) + elif i[item]['port type'] == '802.1Q VLAN' and i[item]['alias'] == str(self.upstream_port+".200"): + temp.update({'sta200': i[item]['ip']}) + for i in res_data: + for item in i: + if i[item]['port type'] == 'WIFI-STA' and i[item]['alias'] == "sta100": + exp1 = temp['sta100'].split('.') + ip1 = i[item]['ip'].split('.') + if exp1[0] == ip1[0] and exp1[1] == ip1[1]: + pf = 'PASS' + else: + pf = 'FAIL' + table_data.append([i[item]['alias'], '100', f'{exp1[0]}.{exp1[1]}.X.X', i[item]['ip'], i[item]['mac'], + f'{pf}']) + elif i[item]['port type'] == 'WIFI-STA' and i[item]['alias'] == 'sta200': + exp2 = temp['sta200'].split('.') + ip2 = i[item]['ip'].split('.') + if exp2[0] == ip2[0] and exp2[1] == ip2[1]: + pf = 'PASS' + else: + pf = 'FAIL' + table_data.append([i[item]['alias'], '200', f'{exp2[0]}.{exp2[1]}.X.X', i[item]['ip'], i[item]['mac'], f'{pf}']) + elif i[item]['port type'] == 'WIFI-STA' and i[item]['alias'] == 'sta00': + exp3 = temp['sta00'].split('.') + ip2 = i[item]['ip'].split('.') + if mode == "BRIDGE": + if exp3[0] == ip2[0] and exp3[1] == ip2[1]: + pf = 'PASS' + else: + pf = 'FAIL' + table_data.append([i[item]['alias'], 'WAN upstream', f'{exp3[0]}.{exp3[1]}.X.X', i[item]['ip'], i[item]['mac'], f'{pf}']) + elif mode == "NAT": + if exp3[0] == '192' and exp3[1] == '168': + pf = 'PASS' + else: + pf = 'FAIL' + table_data.append([i[item]['alias'], 'LAN upstream', f'192.168.X.X', i[item]['ip'], i[item]['mac'], f'{pf}']) + print(table_data) + # attach test data in a table to allure + report_obj = Report() + table_info = report_obj.table2(table=table_data, headers=table_heads) + allure.attach(name="Test Results Info", body=table_info) + + if result1 == "Pass": + print("Test passed for non vlan ip ") + else: + print("Test failed for non vlan ip") + print("clean up") + self.multi_obj.postcleanup() + if result == result1: + return True + else: + return False + + def multi_sta_thpt(self, ssid_5G="[BLANK]", ssid_2G="[BLANK]", mode="BRIDGE", vlan_id=100, dut_name="TIP", + raw_line=[], instance_name="test_demo", dut_5g="", dut_2g=""): + + inst_name = instance_name.split('_')[0] + instance_name = ''.join(random.choices(string.ascii_uppercase + string.digits, k=S)) + + if mode == "BRIDGE": + self.upstream_port = self.upstream_port + elif mode == "NAT": + self.upstream_port = self.upstream_port + else: + self.upstream_port = self.upstream_port + "." + str(vlan_id) + + sets = [['Basic Client Connectivity', '0'], ['Multi Band Performance', '0'], + ['Throughput vs Pkt Size', '0'], ['Capacity', '0'], + ['Stability', '0'], + ['Band-Steering', '0'], ['Multi-Station Throughput vs Pkt Size', '1'], + ['Long-Term', '0']] + + if len(self.twog_radios) == 1: + twog_radios = [[self.twog_radios[0]]] + + elif len(self.twog_radios) > 1: + twog_radio = [] + for i in range(0, len(self.twog_radios)): + twog_radio.append([self.twog_radios[i]]) + twog_radios = twog_radio + + if len(self.fiveg_radios) == 1: + fiveg_radios = [[self.fiveg_radios[0]]] + + elif len(self.fiveg_radios) > 1: + fiveg_radio = [] + for i in range(0, len(self.fiveg_radios)): + fiveg_radio.append([self.fiveg_radios[i]]) + fiveg_radios = fiveg_radio + + self.msthpt_obj = ApAutoTest(lf_host=self.lanforge_ip, + lf_port=self.lanforge_port, + ssh_port=self.lf_ssh_port, + + lf_user="lanforge", + lf_password="lanforge", + instance_name=instance_name, + config_name="dbp_config", + upstream="1.1." + self.upstream_port, + pull_report=True, + dut5_0=dut_5g, + dut2_0=dut_2g, + load_old_cfg=False, + local_lf_report_dir=self.local_report_path, + radio2=twog_radios, + radio5=fiveg_radios, + sets=sets, + raw_lines=raw_line + ) + self.msthpt_obj.setup() + self.msthpt_obj.run() + report_name = self.msthpt_obj.report_name[0]['LAST']["response"].split(":::")[1].split("/")[-1] + influx = CSVtoInflux(influx_host=self.influx_params["influx_host"], + influx_port=self.influx_params["influx_port"], + influx_org=self.influx_params["influx_org"], + influx_token=self.influx_params["influx_token"], + influx_bucket=self.influx_params["influx_bucket"], + path=report_name) + + influx.glob() + return self.msthpt_obj + + def attenuator_serial(self): + self.obj = AttenuatorSerial( + lfclient_host=self.lanforge_ip, + lfclient_port=self.lanforge_port + ) + val = self.obj.show() + return val + + def attenuator_modify(self, serno, idx, val): + atten_obj = CreateAttenuator(self.lanforge_ip, self.lanforge_port, serno, idx, val) + atten_obj.build() + + def mesh_test(self, instance_name=None, raw_lines=None, duration="60s"): + self.mesh_obj = MeshTest( + lf_host=self.lanforge_ip, + lf_port=self.lanforge_port, + ssh_port=self.lf_ssh_port, + local_lf_report_dir=self.local_report_path, + lf_user="lanforge", + lf_password="lanforge", + instance_name=instance_name, + duration=duration, + config_name="mesh_config", + upstream="1.2.2 eth2", + upload_speed="85%", + download_speed="85%", + pull_report=True, + load_old_cfg=False, + raw_lines=raw_lines, + ) + self.mesh_obj.setup() + self.mesh_obj.run() + return self.mesh_obj + + def attenuator_serial_2g_radio(self, ssid="[BLANK]", passkey="[BLANK]", security="wpa2", mode="BRIDGE", + vlan_id=100, sta_mode=0, station_name=[], lf_tools_obj=None): + radio = self.twog_radios[0] + # index 0 of atten_serial_radio will ser no of 1st 2g radio and index 1 will ser no of 2nd and 3rd 2g radio + atten_serial_radio = [] + atten_serial = self.attenuator_serial() + self.Client_Connect_Using_Radio(ssid=ssid, passkey=passkey, security=security, mode=mode, + vlan_id=vlan_id, radio=radio, sta_mode=sta_mode, + station_name=station_name) + signal1 = lf_tools_obj.station_data_query(station_name=station_name[0], query="signal") + atten_sr = atten_serial[0].split(".") + for i in range(4): + self.attenuator_modify(int(atten_sr[2]), i, 400) + time.sleep(0.5) + signal2 = lf_tools_obj.station_data_query(station_name=station_name[0], query="signal") + if abs(int(signal2.split(" ")[0])) - abs(int(signal1.split(" ")[0])) >= 5: + atten_serial_radio = atten_serial + else: + atten_serial_radio = atten_serial[::-1] + self.Client_disconnect(station_name=station_name) + return atten_serial_radio + + def attenuator_serial_5g_radio(self, ssid="[BLANK]", passkey="[BLANK]", security="wpa2", mode="BRIDGE", + vlan_id=100, sta_mode=0, station_name=[], lf_tools_obj=None): + radio = self.fiveg_radios[0] + # index 0 of atten_serial_radio will ser no of 1st 5g radio and index 1 will ser no of 2nd and 3rd 5g radio + atten_serial_radio = [] + atten_serial = self.attenuator_serial() + self.Client_Connect_Using_Radio(ssid=ssid, passkey=passkey, security=security, mode=mode, + vlan_id=vlan_id, radio=radio, sta_mode=sta_mode, + station_name=station_name) + signal1 = lf_tools_obj.station_data_query(station_name=station_name[0], query="signal") + atten_sr = atten_serial[0].split(".") + for i in range(4): + self.attenuator_modify(int(atten_sr[2]), i, 400) + time.sleep(0.5) + signal2 = lf_tools_obj.station_data_query(station_name=station_name[0], query="signal") + if abs(int(signal2.split(" ")[0])) - abs(int(signal1.split(" ")[0])) >= 5: + atten_serial_radio = atten_serial + else: + atten_serial_radio = atten_serial[::-1] + self.Client_disconnect(station_name=station_name) + return atten_serial_radio + + def create_n_clients(self, start_id=0, sta_prefix=None, num_sta=None, dut_ssid=None, + dut_security=None, dut_passwd=None, band=None, radio=None, lf_tools=None, type=None): + + local_realm = realm.Realm(lfclient_host=self.lanforge_ip, lfclient_port=self.lanforge_port) + station_profile = local_realm.new_station_profile() + if band == "fiveg": + radio = self.fiveg_radios[0] + if band == "twog": + radio = self.twog_radios[0] + if band == "sixg": + radio = self.ax_radios[0] + + # pre clean + sta_list = lf_tools.get_station_list() + print(sta_list) + if not sta_list: + print("no stations on lanforge") + else: + station_profile.cleanup(sta_list, delay=1) + LFUtils.wait_until_ports_disappear(base_url=local_realm.lfclient_url, + port_list=sta_list, + debug=True) + time.sleep(2) + print("pre cleanup done") + + station_list = LFUtils.portNameSeries(prefix_=sta_prefix, start_id_=start_id, + end_id_=num_sta - 1, padding_number_=10000, + radio=radio) + + if type == "11r-sae-802.1x": + dut_passwd = "[BLANK]" + station_profile.use_security(dut_security, dut_ssid, dut_passwd) + station_profile.set_number_template("00") + + station_profile.set_command_flag("add_sta", "create_admin_down", 1) + + station_profile.set_command_param("set_port", "report_timer", 1500) + + # connect station to particular bssid + # self.station_profile.set_command_param("add_sta", "ap", self.bssid[0]) + + station_profile.set_command_flag("set_port", "rpt_timer", 1) + if type == "11r": + station_profile.set_command_flag("add_sta", "80211u_enable", 0) + station_profile.set_command_flag("add_sta", "8021x_radius", 1) + station_profile.set_command_flag("add_sta", "disable_roam", 1) + station_profile.set_wifi_extra(key_mgmt="FT-PSK ", + pairwise="", + group="", + psk="", + eap="", + identity="", + passwd="", + pin="" + ) + if type == "11r-sae": + station_profile.set_command_flag("add_sta", "ieee80211w", 2) + station_profile.set_command_flag("add_sta", "80211u_enable", 0) + station_profile.set_command_flag("add_sta", "8021x_radius", 1) + station_profile.set_command_flag("add_sta", "disable_roam", 1) + station_profile.set_wifi_extra(key_mgmt="FT-SAE ", + pairwise="", + group="", + psk="", + eap="", + identity="", + passwd="", + pin="" + ) + + if type == "11r-sae-802.1x": + station_profile.set_command_flag("set_port", "rpt_timer", 1) + station_profile.set_command_flag("add_sta", "ieee80211w", 2) + station_profile.set_command_flag("add_sta", "80211u_enable", 0) + station_profile.set_command_flag("add_sta", "8021x_radius", 1) + station_profile.set_command_flag("add_sta", "disable_roam", 1) + # station_profile.set_command_flag("add_sta", "ap", "68:7d:b4:5f:5c:3f") + station_profile.set_wifi_extra(key_mgmt="FT-EAP ", + pairwise="[BLANK]", + group="[BLANK]", + psk="[BLANK]", + eap="TTLS", + identity="testuser", + passwd="testpasswd", + pin="" + ) + station_profile.create(radio=radio, sta_names_=station_list) + local_realm.wait_until_ports_appear(sta_list=station_list) + station_profile.admin_up() + if local_realm.wait_for_ip(station_list): + print("All stations got IPs") + return True + else: + print("Stations failed to get IPs") + return False + + def json_get(self, _req_url="/"): + cli_base = LFCliBase(_lfjson_host=self.lanforge_ip, _lfjson_port=self.lanforge_port, ) + json_response = cli_base.json_get(_req_url=_req_url) + return json_response + + def create_layer3(self, side_a_min_rate, side_a_max_rate, side_b_min_rate, side_b_max_rate, + traffic_type, sta_list,): + # checked + print(sta_list) + print(type(sta_list)) + print(self.upstream) + local_realm = realm.Realm(lfclient_host=self.lanforge_ip, lfclient_port=self.lanforge_port) + cx_profile = local_realm.new_l3_cx_profile() + cx_profile.host = self.lanforge_ip + cx_profile.port = self.lanforge_port + layer3_cols = ['name', 'tx bytes', 'rx bytes', 'tx rate', 'rx rate'] + cx_profile.side_a_min_bps = side_a_min_rate + cx_profile.side_a_max_bps = side_a_max_rate + cx_profile.side_b_min_bps = side_b_min_rate + cx_profile.side_b_max_bps = side_b_max_rate + + # create + cx_profile.create(endp_type=traffic_type, side_a=sta_list, + side_b=self.upstream, + sleep_time=0) + cx_profile.start_cx() + + def get_cx_list(self): + local_realm = realm.Realm(lfclient_host=self.lanforge_ip, lfclient_port=self.lanforge_port) + layer3_result = local_realm.cx_list() + layer3_names = [item["name"] for item in layer3_result.values() if "_links" in item] + print(layer3_names) + return layer3_names + + + def get_layer3_values(self, cx_name=None, query=None): + url = f"/cx/{cx_name}" + response = self.json_get(_req_url=url) + result = response[str(cx_name)][str(query)] + return result + + + def crete_file_attach(self, bssid,rssi, rx_rate, rx_bytes, rx_packets, cx_time, filename, name ): + try: + filename = filename + with open(filename, 'w') as f: + lines = ["bssid: " + str(bssid), 'rssi: ' + str(rssi), "rx rate: " + str(rx_rate), + "rx packets: " + str(rx_packets), "rx bytes: " + str(rx_bytes), "connection time: " + str(cx_time)] + for line in lines: + f.write(line) + f.write('\n') + allure.attach.file(name=str(name), source=str(filename)) + except FileNotFoundError: + print("The 'docs' directory does not exist") + + + def hard_roam(self, run_lf, instantiate_profile, lf_tools, lf_reports, get_configuration, test=None, band=None, num_sta=1, + security=None, security_key=None, iteration=1, ssid_name=None, + roaming_delay=None, option=None, channel=36, duration=None, duration_based=False, + iteration_based=True, dut_name=None, identity=None, ttls_pass=None): + allure.attach(name="Test Procedure", body="This test consists of creating a multiple client which will be " \ + " connected to the nearest ap, here the test automation will " \ + "do hard roam based on forced roam method" \ + "check if client performed roam by monitoring client bssid, for n number of iterions") + allure.attach(name="Pass Fail Criteria", + body="Test is said to be pass if it satisfy 4 criterias as - 1 after roam bssid should change , 2- reassociation respone should be present, 3- auth request should be present, 4- roaming time should be less then 50 ms " \ + "other wise the test is said to be failed ") + + # attaching 11r log before tart of test + instantiate_profile_obj = instantiate_profile(controller_data=get_configuration['controller'], + timeout="10", + ap_data=get_configuration['access_point'], + type=0) + log = instantiate_profile_obj.show_11r_log() + + # get bssid from ap for 2g and 5g + radio = "" + c1_bssid = "" + c2_bssid = "" + if test == "2g": + c1_2g_bssid = "" + c2_2g_bssid = "" + if run_lf: + c1_2g_bssid = get_configuration["access_point"][0]["ssid"]["2g-bssid"] + allure.attach(name="bssid of ap1", body=c1_2g_bssid) + c2_2g_bssid = get_configuration["access_point"][1]["ssid"]["2g-bssid"] + allure.attach(name="bssid of ap2", body=c2_2g_bssid) + + else: + # instantiate controller class and check bssid's for each ap in testbed + for ap_name in range(len(get_configuration['access_point'])): + instantiate_profile_obj = instantiate_profile(controller_data=get_configuration['controller'], + timeout="10", + ap_data=get_configuration['access_point'], + type=ap_name) + bssid_2g = instantiate_profile_obj.cal_bssid_2g() + if ap_name == 0: + c1_2g_bssid = bssid_2g + if ap_name == 1: + c2_2g_bssid = bssid_2g + c1_bssid = c1_2g_bssid + c2_bssid = c2_2g_bssid + elif test == "5g": + c1_5g_bssid = "" + c2_5g_bssid = "" + if run_lf: + c1_5g_bssid = get_configuration["access_point"][0]["ssid"]["5g-bssid"] + allure.attach(name="bssid of ap1", body=c1_5g_bssid) + c2_5g_bssid = get_configuration["access_point"][1]["ssid"]["5g-bssid"] + allure.attach(name="bssid of ap2", body=c2_5g_bssid) + else: + for ap_name in range(len(get_configuration['access_point'])): + instantiate_profile_obj = instantiate_profile(controller_data=get_configuration['controller'], + timeout="10", + ap_data=get_configuration['access_point'], + type=ap_name) + bssid_5g = instantiate_profile_obj.cal_bssid_5g() + if ap_name == 0: + c1_5g_bssid = bssid_5g + if ap_name == 1: + c2_5g_bssid = bssid_5g + c1_bssid = c1_5g_bssid + c2_bssid = c2_5g_bssid + + elif test == "6g": + c1_6g_bssid = "" + c2_6g_bssid = "" + if run_lf: + c1_6g_bssid = get_configuration["access_point"][0]["ssid"]["6g-bssid"] + allure.attach(name="bssid of ap1", body=c1_6g_bssid) + c2_6g_bssid = get_configuration["access_point"][1]["ssid"]["6g-bssid"] + allure.attach(name="bssid of ap2", body=c2_6g_bssid) + else: + for ap_name in range(len(get_configuration['access_point'])): + instantiate_profile_obj = instantiate_profile(controller_data=get_configuration['controller'], + timeout="10", + ap_data=get_configuration['access_point'], + type=ap_name) + bssid_6g = instantiate_profile_obj.cal_bssid_6g() + if ap_name == 0: + c1_6g_bssid = bssid_6g + if ap_name == 1: + c2_6g_bssid = bssid_6g + c1_bssid = c1_6g_bssid + c2_bssid = c2_6g_bssid + + print("bssid of c1 ", c1_bssid) + allure.attach(name="bssid of ap1", body=c1_bssid) + print("bssid of c2", c2_bssid) + allure.attach(name="bssid of ap2", body=c2_bssid) + allure.attach(name="11r logs before roam test", body=str(log)) + fiveg_radio, sixg_radio, twog_radio, sniff_radio = None, None, None, None + supplicant_radio = None + if band == "twog": + twog_radio = self.twog_radios[0] + supplicant_radio = twog_radio.split(".")[2] + radio_ = self.ax_radios[0] + sniff_radio = radio_.split(".")[2] + if band == "fiveg": + fiveg_radio = self.fiveg_radios[0] + supplicant_radio = fiveg_radio.split(".")[2] + radio_ = self.ax_radios[0] + sniff_radio = radio_.split(".")[2] + if band == "sixg": + sixg_radio = self.ax_radios[1] + supplicant_radio = sixg_radio.split(".")[2] + radio_ = self.ax_radios[2] + sniff_radio = radio_.split(".")[2] + obj = HardRoam(lanforge_ip=self.lanforge_ip, + lanforge_port=self.lanforge_port, + lanforge_ssh_port = 22, + c1_bssid=c1_bssid, + c2_bssid=c2_bssid, + fiveg_radio=fiveg_radio, + twog_radio=twog_radio, + sixg_radio=sixg_radio, + band=band, + sniff_radio=sniff_radio, + num_sta=num_sta, + security=security, + security_key=security_key, + ssid=ssid_name, + upstream=self.upstream, + duration=duration, + iteration=iteration, + channel=channel, + option=option, + duration_based=duration_based, + iteration_based=iteration_based, + dut_name = dut_name, + traffic_type="lf_udp", + path="../lanforge/lanforge-scripts", + scheme="ssh", + dest="localhost", + user="admin", + passwd="Cisco123", + prompt="WLC2", + series_cc="9800", + ap="AP687D.B45C.1D1C", + port="8888", + band_cc="5g", + timeout="40", + identity = identity, + ttls_pass = ttls_pass + ) + x = os.getcwd() + print(x) + file = obj.generate_csv() + kernel, message = obj.run(file_n=file) + allure.attach(name="message", body=str(message)) + # file = ["test_client_0.csv"] + report_dir_name = obj.generate_report(csv_list=file, kernel_lst=kernel, current_path=str(x) + "/tests") + print(report_dir_name) + lf_csv_obj = lf_csv() + for i, y in zip(file, range(len(file))): + data = lf_csv_obj.read_csv_row(file_name=str(x) + "/" + str(report_dir_name) + "/csv_data/" + str(i)) + tab = lf_reports.table2(table=data) + allure.attach(name="client " + str(y) + " table", body=str(tab)) + # report_dir_name = "2022-04-30-18-51-07_Hard Roam Test" + relevant_path = report_dir_name + "/" + entries = os.listdir(report_dir_name + "/") + pdf = None + for i in entries: + if ".pdf" in i: + pdf = i + allure.attach.file(source=relevant_path + pdf, name="hard_roam_report") + instantiate_profile_obj = instantiate_profile(controller_data=get_configuration['controller'], + timeout="10", + ap_data=get_configuration['access_point'], + type=0) + z = instantiate_profile_obj.show_wireless_client_detail() + allure.attach(name="wireless client details", body=str(z)) + log = instantiate_profile_obj.show_11r_log() + allure.attach(name="11r logs after roam test", body=str(log)) + allure.attach(name="test_result_folder", body=str(report_dir_name)) + try: + supplicant = "/home/lanforge/wifi/wpa_supplicant_log_" + supplicant_radio + ".txt" + obj = SCP_File(ip=self.lanforge_ip, port=self.lanforge_ssh_port, username="root", password="lanforge", + remote_path=supplicant, + local_path=relevant_path) + obj.pull_file() + # obj.ssh_connect(command="journalctl --since '1 hour ago' > kernel_log.txt") + # kernel_log = "/home/lanforge/kernel_log.txt" + # obj1 = SCP_File(ip=self.lanforge_ip, port=self.lanforge_ssh_port, username="root", password="lanforge", + # remote_path=kernel_log, + # local_path=relevant_path) + # obj1.pull_file() + allure.attach.file(source=relevant_path + "/wpa_supplicant_log_" + supplicant_radio + ".txt", + name="supplicant_log") + except Exception as e: + print(e) + + def set_radio_country_channel(self,_radio="wiphy0",_channel=0,_country_num=840,): # 840 - US + data = { + "shelf": _radio[0], + "resource": _radio[1], + "radio": _radio[2], + "mode": "NA", + "channel": _channel, + "country": _country_num + } + print(f"Lanforge-radio Country changed {_country_num}") + self.local_realm.json_post("/cli-json/set_wifi_radio", _data=data) + + def downlink_mu_mimo(self, radios_2g=[], radios_5g=[], radios_ax=[], dut_name="TIP", dut_5g="", dut_2g="", + mode="BRIDGE", vlan_id=1, skip_2g=True, skip_5g=False): + raw_line = [] + skip_twog = '1' if skip_2g else '0' + skip_fiveg = '1' if skip_5g else '0' + sniff_radio = 'wiphy0' + channel = 149 if skip_2g else 11 + upstream_port = self.upstream_port + + sets = [['Calibrate Attenuators', '0'], ['Receiver Sensitivity', '0'], ['Maximum Connection', '0'], + ['Maximum Throughput', '0'], ['Airtime Fairness', '0'], ['Range Versus Rate', '0'], + ['Spatial Consistency', '0'], + ['Multiple STAs Performance', '0'], ['Multiple Assoc Stability', '0'], ['Downlink MU-MIMO', '1'], + ['AP Coexistence', '0'], ['Long Term Stability', '0'], ['Skip 2.4Ghz Tests', f'{skip_twog}'], + ['Skip 5Ghz Tests', f'{skip_fiveg}'], ['2.4Ghz Channel', 'AUTO'], ['5Ghz Channel', 'AUTO']] + if len(radios_2g) >= 3 and len(radios_5g) >= 3: + for i in range(6): + if i == 0 or i == 2: + raw_line.append([f'radio-{i}: {radios_5g[0] if i == 0 else radios_5g[1]}']) + if i == 1 or i == 3: + raw_line.append([f'radio-{i}: {radios_2g[0] if i == 1 else radios_2g[1]}']) + if i == 4 or i == 5: + raw_line.append([f'radio-{i}: {radios_5g[2] if i == 4 else radios_2g[2]}']) + if len(radios_ax) >= 1: + temp_ax = str(radios_ax[0]).split(" ") + if len(temp_ax) == 2: + sniff_radio = str(temp_ax[1]) + elif skip_2g: + temp = str(radios_5g[0]).split(" ") + if len(temp) == 2: + sniff_radio = str(temp[1]) + elif skip_5g: + temp = str(radios_2g[0]).split(" ") + if len(temp) == 2: + sniff_radio = str(temp[1]) + elif len(radios_2g) >= 2 and len(radios_5g) >= 2 and len(radios_ax) >= 2: + if len(radios_2g) >= 3 and len(radios_5g) >= 3: + for i in range(6): + if i == 0 or i == 2: + raw_line.append([f'radio-{i}: {radios_5g[0] if i == 0 else radios_5g[1]}']) + if i == 1 or i == 3: + raw_line.append([f'radio-{i}: {radios_2g[0] if i == 1 else radios_2g[1]}']) + if i == 4 or i == 5: + raw_line.append([f'radio-{i}: {radios_5g[2] if i == 4 else radios_2g[2]}']) + if len(radios_ax) >= 1: + temp_ax = str(radios_ax[0]).split(" ") + if len(temp_ax) == 2: + sniff_radio = str(temp_ax[1]) + else: + for i in range(6): + if i == 0 or i == 2: + raw_line.append([f'radio-{i}: {radios_5g[0] if i == 0 else radios_5g[1]}']) + if i == 1 or i == 3: + raw_line.append([f'radio-{i}: {radios_2g[0] if i == 1 else radios_2g[1]}']) + if i == 4 or i == 5: + raw_line.append([f'radio-{i}: {radios_ax[0] if i == 4 else radios_ax[1]}']) + if len(radios_ax) >= 3: + temp_ax = str(radios_ax[2]).split(" ") + if len(temp_ax) == 2: + sniff_radio = str(temp_ax[1]) + + if len(raw_line) != 6: + raw_line = [['radio-0: 1.1.5 wiphy1'], ['radio-1: 1.1.4 wiphy0'], ['radio-2: 1.1.7 wiphy3'], + ['radio-3: 1.1.6 wiphy2'], ['radio-4: 1.1.8 wiphy4'], ['radio-5: 1.1.9 wiphy5']] + instance_name = "tr398-instance-{}".format(str(random.randint(0, 100000))) + + if not os.path.exists("mu-mimo-config.txt"): + with open("mu-mimo-config.txt", "wt") as f: + for i in raw_line: + f.write(str(i[0]) + "\n") + f.close() + + if mode == "BRIDGE" or mode == "NAT": + upstream_port = self.upstream_port + else: + upstream_port = self.upstream_port + "." + str(vlan_id) + print("Upstream Port: ", self.upstream_port) + print("radio used to sniff :", sniff_radio) + self.pcap_obj = LfPcap(host=self.lanforge_ip, port=self.lanforge_port) + self.cvtest_obj = TR398Test(lf_host=self.lanforge_ip, + lf_port=self.lanforge_port, + lf_user="lanforge", + lf_password="lanforge", + instance_name=instance_name, + config_name="cv_dflt_cfg", + upstream="1.1." + upstream_port, + pull_report=True, + local_lf_report_dir=self.local_report_path, + load_old_cfg=False, + dut2=dut_2g, + dut5=dut_5g, + raw_lines_file="mu-mimo-config.txt", + enables=[], + disables=[], + raw_lines=[], + sets=sets, + test_rig=dut_name + ) + self.cvtest_obj.result = True + self.cvtest_obj.setup() + t1 = threading.Thread(target=self.cvtest_obj.run) + t1.start() + t2 = threading.Thread(target=self.pcap_obj.sniff_packets, args=(sniff_radio, "mu-mimo", channel, 30)) + if t1.is_alive(): + time.sleep(480) + t2.start() + while t1.is_alive(): + time.sleep(1) + if os.path.exists("mu-mimo-config.txt"): + os.remove("mu-mimo-config.txt") + # pull pcap from lanforge to current directory + if self.pcap_obj.pcap_name is not None: + lf_report.pull_reports(hostname=self.lanforge_ip, port=self.lanforge_ssh_port, username="lanforge", + password="lanforge", report_location="/home/lanforge/" + self.pcap_obj.pcap_name, + report_dir=".") + else: + raise ValueError("pcap_name should not be None") + + table_heads = ["Packet Type", "Capability Check", 'PASS/FAIL'] + table_data = [] + + try: + if os.path.exists(self.pcap_obj.pcap_name): + # check for mu-mimo bearmformee association request + assoc_req = self.pcap_obj.check_beamformee_association_request(pcap_file=self.pcap_obj.pcap_name) + allure.attach(body=assoc_req, name="Check Bearmformee Association Request") + if assoc_req == "MU Beamformee Capable: Supported": + table_data.append(['Association Request', assoc_req, 'PASS']) + else: + table_data.append(['Association Request', assoc_req, 'FAIL']) + self.cvtest_obj.result = 'FAIL' + + #check for mu-mimo bearmformer association response + assoc_res = self.pcap_obj.check_beamformer_association_response(pcap_file=self.pcap_obj.pcap_name) + allure.attach(body=assoc_res, name="Check Bearmformer Association Response") + if assoc_res == "MU Beamformer Capable: Supported": + table_data.append(['Association Response', assoc_res, 'PASS']) + else: + table_data.append(['Association Response', assoc_res, 'FAIL']) + self.cvtest_obj.result = 'FAIL' + + # check for mu-mimo bearmformer in beacon frame + beacon_res = self.pcap_obj.check_beamformer_beacon_frame(pcap_file=self.pcap_obj.pcap_name) + allure.attach(body=beacon_res, name="Check Bearmformer in Beacon Frame") + if beacon_res == "MU Beamformer Capable: Supported": + table_data.append(['Beacon Frame', beacon_res, 'PASS']) + else: + table_data.append(['Beacon Frame', beacon_res, 'FAIL']) + self.cvtest_obj.result = 'FAIL' + + # check for mu-mimo bearmformer in probe response + beacon_res = self.pcap_obj.check_beamformer_probe_response(pcap_file=self.pcap_obj.pcap_name) + allure.attach(body=beacon_res, name="Check Bearmformer in Probe Response") + if beacon_res == "MU Beamformer Capable: Supported": + table_data.append(['Beacon Frame', beacon_res, 'PASS']) + else: + table_data.append(['Beacon Frame', beacon_res, 'FAIL']) + self.cvtest_obj.result = 'FAIL' + + # check for Group ID Management in Packet + group_mgmt = self.pcap_obj.check_group_id_mgmt(pcap_file=self.pcap_obj.pcap_name) + allure.attach(body=group_mgmt, name="Check Group ID Management") + if group_mgmt != "Packet Not Found" or group_mgmt is not None: + table_data.append(['Group ID Management Frame', group_mgmt, 'PASS']) + else: + table_data.append(['Group ID Management Frame', group_mgmt, 'FAIL']) + self.cvtest_obj.result = 'FAIL' + + print(table_data) + # attach test data in a table to allure + report_obj = Report() + table_info = report_obj.table2(table=table_data, headers=table_heads) + allure.attach(name="Test Results Info", body=table_info) + + # attach pcap file to allure + allure.attach.file(source=self.pcap_obj.pcap_name, + name="pcap_file", attachment_type=allure.attachment_type.PCAP) + except FileNotFoundError: + print(f"{self.pcap_obj.pcap_name} doesn't exist") + + report_name = self.cvtest_obj.report_name[0]['LAST']["response"].split(":::")[1].split("/")[-1] + influx = CSVtoInflux(influx_host=self.influx_params["influx_host"], + influx_port=self.influx_params["influx_port"], + influx_org=self.influx_params["influx_org"], + influx_token=self.influx_params["influx_token"], + influx_bucket=self.influx_params["influx_bucket"], + path=report_name) + influx.glob() + return self.cvtest_obj + + def scan_ssid(self, radio=""): + '''This method for scan ssid data''' + list_data = [] + obj_scan = StaScan(host=self.lanforge_ip, port=self.lanforge_port, ssid="fake ssid", security="open", + password="[BLANK]", radio=radio, sta_list=["sta0000"], csv_output="scan_ssid.csv") + obj_scan.pre_cleanup() + time1 = datetime.now() + first = time.mktime(time1.timetuple()) * 1000 + obj_scan.build() + obj_scan.start() + time2 = datetime.now() + second = time.mktime(time2.timetuple()) * 1000 + diff = int(second - first) + try: + with open(obj_scan.csv_output, 'r') as file: + reader = csv.reader(file) + for row in reader: + if row[1] == "age": + list_data.append(row) + continue + elif int(row[1]) < diff: + list_data.append(row) + except Exception as e: + print(e) + report_obj = Report() + csv_data_table = report_obj.table2(list_data) + allure.attach(name="scan_ssid_data", body=csv_data_table) + obj_scan.cleanup() + return list_data + + def start_sniffer(self, radio_channel=None, radio=None, test_name="sniff_radio", duration=60): + self.pcap_name = test_name + ".pcap" + self.pcap_obj = SniffRadio(lfclient_host=self.lanforge_ip, lfclient_port=self.lanforge_port, radio=radio, + channel=radio_channel) + self.pcap_obj.setup(0, 0, 0) + time.sleep(5) + self.pcap_obj.monitor.admin_up() + time.sleep(5) + self.pcap_obj.monitor.start_sniff(capname=self.pcap_name, duration_sec=duration) + + def stop_sniffer(self): + self.pcap_obj.monitor.admin_down() + time.sleep(2) + self.pcap_obj.cleanup() + lf_report.pull_reports(hostname=self.lanforge_ip, port=self.lanforge_ssh_port, username="lanforge", + password="lanforge", + report_location="/home/lanforge/" + self.pcap_name, + report_dir=".") + allure.attach.file(source=self.pcap_name, + name="pcap_file", attachment_type=allure.attachment_type.PCAP) + print("pcap file name : ", self.pcap_name) + return self.pcap_name + + def check_ssid_available_scan_result(self, scan_ssid_data=None, ssid=None): + '''This method will check ssid available or not in scan ssid data''' + try: + flag = False + for i in scan_ssid_data: + if ssid in i: + flag = True + if flag: + return True + else: + return False + except Exception as e: + print(e) + + def country_code_channel_division(self, ssid = "[BLANK]", passkey='[BLANK]', security="wpa2", mode="BRIDGE", + band='2G', station_name=[], vlan_id=100, channel='1', channel_width=20, + country_num=392, country='United States(US)'): + self.local_realm = realm.Realm(lfclient_host=self.lanforge_ip, lfclient_port=self.lanforge_port) + radio = (self.fiveg_radios[0] if band == "fiveg" else self.twog_radios[0]).split('.') + self.set_radio_country_channel(_radio=radio,_country_num=country_num) + station = self.Client_Connect(ssid=ssid, passkey=passkey, security=security, mode=mode, band=band, + station_name=station_name, vlan_id=vlan_id) + if station: + for i in range(10): + station_info = station.json_get(f"/port/1/1/{station_name[0]}") + if station_info['interface']['ip'] == '0.0.0.0': + time.sleep(5) + else: + break + print(f"station {station_name[0]} IP: {station_info['interface']['ip']}\n" + f"connected channel: {station_info['interface']['channel']}\n" + f"and expected channel: {channel}") + allure.attach(name="Definition", + body="Country code channel test intends to verify stability of Wi-Fi device " \ + "where the AP is configured with different countries with different channels.") + allure.attach(name="Procedure", + body=f"This test case definition states that we need to push the basic {mode.lower()} mode config on the AP to " + f"be tested by configuring it with {country} on {channel_width}MHz channel width and " + f"channel {channel}. Create a client on {'5' if band=='fiveg' else '2.4'} GHz radio. Pass/ fail criteria: " + f"The client created on {'5' if band=='fiveg' else '2.4'} GHz radio should get associated to the AP") + allure.attach(name="Details", + body=f"Country code : {country[country.find('(')+1:-1]}\n" + f"Bandwidth : {channel_width}Mhz\n" + f"Channel : {channel}\n") + station_data_str = "" + for i in station_info["interface"]: + try: + station_data_str += i + " : " + str(station_info["interface"][i]) + "\n" + except Exception as e: + print(e) + allure.attach(name=str(station_name[0]), body=str(station_data_str)) + station.station_profile.cleanup() + self.set_radio_country_channel(_radio=radio) + if station_info['interface']['ip'] and station_info['interface']['channel'] == str(channel): + return True + else: + return False + + def ofdma(self, mode="BRIDGE", vlan_id=1, inst_name="ofdma", batch_size='1', rawlines=None, sniffer_channel=0, + sniffer_radio="wiphy0", wct_stations=None, sniffer_duration=60): + + if inst_name == "ofdma": + inst_name = "ofdma-instance-{}".format(str(random.randint(0, 100000))) + + if rawlines is None: + rawlines = [['pdu_sz: 300']] + + self.pcap_obj = LfPcap(host=self.lanforge_ip, port=self.lanforge_port) + sniffer = threading.Thread(target=self.pcap_obj.sniff_packets, + args=(sniffer_radio, "ofdma-test", sniffer_channel, 90)) + sniffer.start() + ofdma_obj = self.wifi_capacity(instance_name=inst_name, mode=mode, vlan_id=vlan_id, download_rate="10 Mbps", + batch_size=batch_size, stations=wct_stations, create_stations=False, + sort="interleave", + upload_rate="0", protocol="TCP-IPv4", duration="60000", raw_lines=rawlines) + ofdma_obj.result = 'PASS' + while sniffer.is_alive(): + time.sleep(1) + report_name = ofdma_obj.report_name[0]['LAST']["response"].split(":::")[1].split("/")[-1] + # pull pcap from lanforge to current directory + if self.pcap_obj.pcap_name is not None: + try: + lf_report.pull_reports(hostname=self.lanforge_ip, port=self.lanforge_ssh_port, username="lanforge", + password="lanforge", report_location="/home/lanforge/" + self.pcap_obj.pcap_name, + report_dir=".") + except Exception as e: + print("error while pulling pcap file from lanforge", e) + else: + raise ValueError("pcap_name should not be None") + + table_heads = ["Packet Type", "Capability Check", 'PASS/FAIL'] + table_data = [] + + # check for HE Capability in Beacon Frame + check_he = self.pcap_obj.check_he_capability_beacon_frame(pcap_file=self.pcap_obj.pcap_name) + allure.attach(body=check_he, name="Check HE Capabilities in Beacon Frame") + if check_he == "HE SU PPDU & HE MU PPDU w 4x HE-LTF & 0.8us GI: Supported": + table_data.append(['Beacon Frame', check_he, 'PASS']) + else: + table_data.append(['Beacon Frame', check_he, 'FAIL']) + ofdma_obj.result = 'FAIL' + + # check for HE Capability in probe request + check_he = self.pcap_obj.check_he_capability_probe_request(pcap_file=self.pcap_obj.pcap_name) + allure.attach(body=check_he, name="Check HE Capabilities in Probe Request") + if check_he == "HE SU PPDU & HE MU PPDU w 4x HE-LTF & 0.8us GI: Supported": + table_data.append(['Probe Request', check_he, 'PASS']) + else: + table_data.append(['Probe Request', check_he, 'FAIL']) + ofdma_obj.result = 'FAIL' + + # check for HE Capability in probe response + check_he = self.pcap_obj.check_he_capability_probe_response(pcap_file=self.pcap_obj.pcap_name) + allure.attach(body=check_he, name="Check HE Capabilities in Probe Request") + if check_he == "HE SU PPDU & HE MU PPDU w 4x HE-LTF & 0.8us GI: Supported": + table_data.append(['Probe Response', check_he, 'PASS']) + else: + table_data.append(['Probe Response', check_he, 'FAIL']) + ofdma_obj.result = 'FAIL' + + # check for HE Capability in Association request + check_he = self.pcap_obj.check_he_capability_association_request(pcap_file=self.pcap_obj.pcap_name) + allure.attach(body=check_he, name="Check HE Capabilities in Association Request") + if check_he == "HE SU PPDU & HE MU PPDU w 4x HE-LTF & 0.8us GI: Supported": + table_data.append(['Association Request', check_he, 'PASS']) + else: + table_data.append(['Association Request', check_he, 'FAIL']) + ofdma_obj.result = 'FAIL' + + # check for HE Capability in Association response + check_he = self.pcap_obj.check_he_capability_association_response(pcap_file=self.pcap_obj.pcap_name) + allure.attach(body=check_he, name="Check HE Capabilities in Association Response") + if check_he == "HE SU PPDU & HE MU PPDU w 4x HE-LTF & 0.8us GI: Supported": + table_data.append(['Association Response', check_he, 'PASS']) + else: + table_data.append(['Association Response', check_he, 'FAIL']) + ofdma_obj.result = 'FAIL' + + # check for Guard Interval + check_he = self.pcap_obj.check_he_guard_interval(pcap_file=self.pcap_obj.pcap_name) + allure.attach(body=check_he, name="Check Guard Interval") + if check_he == "Packet Not Found" or check_he is None: + table_data.append(['Guard Interval', check_he, 'FAIL']) + ofdma_obj.result = 'FAIL' + else: + table_data.append(['Guard Interval', check_he, 'PASS']) + + print(table_data) + # attach test data in a table to allure + report_obj = Report() + table_info = report_obj.table2(table=table_data, headers=table_heads) + allure.attach(name="Test Results Info", body=table_info) + + allure.attach.file(source=self.pcap_obj.pcap_name, + name="pcap_file", attachment_type=allure.attachment_type.PCAP) + influx = CSVtoInflux(influx_host=self.influx_params["influx_host"], + influx_port=self.influx_params["influx_port"], + influx_org=self.influx_params["influx_org"], + influx_token=self.influx_params["influx_token"], + influx_bucket=self.influx_params["influx_bucket"], + path=report_name) + influx.glob() + return ofdma_obj + + def set_radio_channel(self, radio="1.1.wiphy0", channel="AUTO"): + try: + radio = radio.split(".") + shelf = radio[0] + resource = radio[1] + radio_ = radio[2] + print("radio %s channel %s" % (radio, channel)) + local_realm_obj = realm.Realm(lfclient_host=self.lanforge_ip, lfclient_port=self.lanforge_port) + data = { + "shelf": shelf, + "resource": resource, + "radio": radio_, + "mode": "NA", + "channel": channel + } + local_realm_obj.json_post("/cli-json/set_wifi_radio", _data=data) + time.sleep(2) + except Exception as e: + print(e) + + def layer3_traffic(self, ssid_num=8, band="2.4 Ghz", station_name=[]): + self.staConnect = StaConnect2(self.lanforge_ip, self.lanforge_port, debug_=self.debug) + self.staConnect.station_profile = self.staConnect.new_station_profile() + self.staConnect.resource = 1 + self.staConnect.station_names = station_name + self.staConnect.runtime_secs = 40 + self.staConnect.bringup_time_sec = 80 + self.staConnect.cleanup_on_exit = True + self.staConnect.station_profile.up = True + self.staConnect.use_existing_sta = True + allure.attach(name="Definition", + body="Max-SSID test intends to verify stability of Wi-Fi device " \ + "where the AP is configured with max no.of SSIDs with different security modes.") + allure.attach(name="Procedure", + body=f"This test case definition states that we need to push the basic bridge mode config on the " + f"AP to be tested by configuring it with maximum {ssid_num} SSIDs in {band} radio. " + f"Create client on each SSIDs and run Layer-3 traffic. Pass/ fail criteria: " + f"The client created should get associated to the AP") + # Create UDP endpoints + self.staConnect.l3_udp_profile = self.staConnect.new_l3_cx_profile() + self.staConnect.l3_udp_profile.report_timer = 1000 + self.staConnect.l3_udp_profile.name_prefix = "udp" + self.staConnect.cx_profile.name_prefix = "udp" + self.staConnect.pre_cleanup() + self.staConnect.l3_udp_profile.create(endp_type="lf_udp", + side_a=station_name, + side_b="%d.%s" % (self.staConnect.resource, self.upstream_port), + suppress_related_commands=True) + + # Create TCP endpoints + self.staConnect.l3_tcp_profile = self.staConnect.new_l3_cx_profile() + self.staConnect.l3_tcp_profile.report_timer = 1000 + self.staConnect.l3_tcp_profile.name_prefix = "tcp" + self.staConnect.cx_profile.name_prefix = "tcp" + self.staConnect.pre_cleanup() + self.staConnect.l3_tcp_profile.create(endp_type="lf_tcp", + side_a=station_name, + side_b="%d.%s" % (self.staConnect.resource, self.upstream_port), + suppress_related_commands=True) + self.staConnect.start() + print("napping %f sec" % self.staConnect.runtime_secs) + time.sleep(self.staConnect.runtime_secs) + count = 0 + report_obj = Report() + for station_info in self.staConnect.resulting_stations: + data_table, dict_table = "", {} + dict_data = self.staConnect.resulting_stations[station_info]["interface"] + dict_table["Interface"] = list(dict_data.keys()) + dict_table["Value"] = list(dict_data.values()) + try: + data_table = report_obj.table2(table=dict_table, headers='keys') + except Exception as e: + print(e) + allure.attach(name=str(self.staConnect.resulting_stations[station_info]["interface"]['alias']), body=data_table) + data_table = "" + dict_table.clear() + cx = list(self.staConnect.l3_tcp_profile.created_cx.keys())[count] + dict_data = self.staConnect.json_get(f"/cx/{cx}") + dict_table["Cross-connect"] = list(dict_data[cx].keys()) + dict_table["tcp-Value"] = list(dict_data[cx].values()) + try: + data_table = report_obj.table2(table=dict_table, headers='keys') + except Exception as e: + print(e) + cx = list(self.staConnect.l3_udp_profile.created_cx.keys())[count] + dict_data = self.staConnect.json_get(f"/cx/{cx}") + dict_table["udp-Value"] = list(dict_data[cx].values()) + try: + data_table = report_obj.table2(table=dict_table, headers='keys') + except Exception as e: + print(e) + allure.attach(name="cx-" + str(self.staConnect.resulting_stations[station_info]["interface"]['alias']), body=str(data_table)) + count += 1 + self.staConnect.stop() + self.staConnect.cleanup() + + + +if __name__ == '__main__': + influx_host = "influx.cicd.lab.wlan.tip.build" + influx_port = 80 + influx_token = "TCkdATXAbHmNbn4QyNaj43WpGBYxFrzV" + influx_bucket = "tip-cicd" + influx_org = "tip" + influx_params = { + "influx_host": influx_host, + "influx_port": influx_port, + "influx_token": influx_token, + "influx_bucket": influx_bucket, + "influx_org": influx_org, + "influx_tag": ["basic-03", "ec420"], + } + lanforge_data = { + "ip": "10.28.3.6", + "port": 8080, + "ssh_port": 22, + "2.4G-Radio": ["1.1.wiphy4"], + "5G-Radio": ["1.1.wiphy5"], + "AX-Radio": ["1.1.wiphy0", "1.1.wiphy1", "1.1.wiphy2", "1.1.wiphy3"], + "upstream": "1.1.eth2", + "upstream_subnet": "10.28.2.1/24", + "uplink": "1.1.eth3", + "2.4G-Station-Name": "wlan0", + "5G-Station-Name": "wlan1", + "AX-Station-Name": "ax" + } + obj = RunTest(lanforge_data=lanforge_data, debug=False, influx_params=influx_params) + upstream = lanforge_data['upstream'] + # data = obj.staConnect.json_get("/port/all") + obj.eap_connect.admin_down("1.1.wiphy4") + obj.eap_connect.admin_up("1.1.wiphy4") + # print(dict(list(data['interfaces'])).keys()) + # print(obj.staConnect.json_get("/port/" + upstream.split(".")[0] + + # "/" + upstream.split(".")[1] + + # "/" + upstream.split(".")[2] + "/" + "10")) + # print("/port/" + upstream.split(".")[0] + + # "/" + upstream.split(".")[1] + + # "/" + upstream.split(".")[2] + "/" + "100") + # obj.Client_Connect(ssid="ssid_wpa_5g_br", passkey="something", security="wpa", station_name=['sta0000']) + # obj.dataplane(station_name=["sta0000"]) + # a = obj.staConnect.json_get("/events/since/") + # print(a) + # print(obj.eap_connect.json_get("port/1/1/sta0000?fields=ap,ip")) + # obj.EAP_Connect(station_name=["sta0000", "sta0001"], eap="TTLS", ssid="testing_radius") diff --git a/lf_libs/lf_tools.py b/lf_libs/lf_tools.py new file mode 100644 index 00000000..eba45dc3 --- /dev/null +++ b/lf_libs/lf_tools.py @@ -0,0 +1,881 @@ +import re +import sys +import os +import importlib + +import pytest + +sys.path.append( + os.path.dirname( + os.path.realpath(__file__) + ) +) +if "libs" not in sys.path: + sys.path.append(f'../libs') +for folder in 'py-json', 'py-scripts': + if folder not in sys.path: + sys.path.append(f'../../lanforge/lanforge-scripts/{folder}') + +sys.path.append(f"../lanforge/lanforge-scripts/py-scripts/tip-cicd-sanity") + +sys.path.append(f'../libs') +sys.path.append(f'../libs/lanforge/') +import allure +from sta_connect2 import StaConnect2 + +from create_chamberview import CreateChamberview +from create_chamberview_dut import DUT +import time +from LANforge.lfcli_base import LFCliBase +import json +import os +import pandas as pd + +realm = importlib.import_module("py-json.realm") +Realm = realm.Realm +import logging + +LOGGER = logging.getLogger(__name__) +LOGGER.setLevel(logging.DEBUG) +stdout_handler = logging.StreamHandler(sys.stdout) +LOGGER.addHandler(stdout_handler) + + +class ChamberView: + def __init__(self, lanforge_data=None, access_point_data=None, run_lf=False, debug=True, testbed=None, cc_1=False, ap_version=None): + print("lanforge data", lanforge_data) + print("access point data", access_point_data) + self.access_point_data = access_point_data + self.access_point_data = access_point_data + self.run_lf = run_lf + self.cc_1 = cc_1 + print("testbed", testbed) + if "type" in lanforge_data.keys(): + if lanforge_data["type"] == "Non-mesh": + self.lanforge_ip = lanforge_data["ip"] + self.lanforge_port = lanforge_data["port"] + self.ssh_port = lanforge_data["ssh_port"] + self.twog_radios = lanforge_data["2.4G-Radio"] + self.fiveg_radios = lanforge_data["5G-Radio"] + self.ax_radios = lanforge_data["AX-Radio"] + self.upstream_port = lanforge_data["upstream"] + self.uplink_port = lanforge_data["uplink"] + self.upstream_subnet = lanforge_data["upstream_subnet"] + self.upstream_resources = self.upstream_port.split(".")[0] + "." + self.upstream_port.split(".")[1] + self.uplink_resources = self.uplink_port.split(".")[0] + "." + self.uplink_port.split(".")[1] + self.scenario_name = testbed + self.debug = debug + self.exit_on_error = False + self.dut_idx_mapping = {} + self.ssid_list = [] + self.staConnect = StaConnect2(self.lanforge_ip, self.lanforge_port, debug_=self.debug) + self.local_realm = realm.Realm(lfclient_host=self.lanforge_ip, lfclient_port=self.lanforge_port) + self.raw_line = [ + ["profile_link " + self.upstream_resources + " upstream-dhcp 1 NA NA " + + self.upstream_port.split(".") + [2] + ",AUTO -1 NA"], + [ + "profile_link " + self.uplink_resources + " uplink-nat 1 'DUT: upstream LAN " + self.upstream_subnet + + "' NA " + self.uplink_port.split(".")[2] + "," + self.upstream_port.split(".")[2] + " -1 NA"] + ] + self.CreateChamberview = CreateChamberview(self.lanforge_ip, self.lanforge_port) + + self.delete_old_scenario = True + if access_point_data: + print(len(access_point_data)) + + for ap in range(len(access_point_data)): + print(access_point_data[ap]) + self.dut_name = access_point_data[ap]["model"] + print(self.dut_name) + self.ap_model = access_point_data[ap]["model"] + self.version = access_point_data[ap]["version"].split("/")[-1] + self.serial = access_point_data[ap]["serial"] + self.ssid_data = None + if self.run_lf: + self.ssid_data = access_point_data[ap]['ssid'] + print(self.ssid_data) + + self.CreateDut = DUT(lfmgr=self.lanforge_ip, + port=self.lanforge_port, + dut_name=self.dut_name, + sw_version=self.version, + hw_version=self.ap_model, + model_num=self.ap_model, + serial_num=self.serial + ) + self.CreateDut.ssid = [] + if self.ssid_data is not None: + self.twog_ssid = ["ssid_idx=0" + " ssid=" + self.ssid_data["2g-ssid"] + + " security=" + self.ssid_data["2g-encryption"].upper() + + " password=" + self.ssid_data["2g-password"] + + " bssid=" + self.ssid_data["2g-bssid"].lower().replace(" ", "") + ] + + self.fiveg_ssid = ["ssid_idx=1 ssid=" + + self.ssid_data["5g-ssid"] + + " security=" + + self.ssid_data["5g-encryption"].upper() + + " password=" + + self.ssid_data["5g-password"] + + " bssid=" + + self.ssid_data["5g-bssid"].lower().replace(" ", "")] + if "6g-ssid" in self.ssid_data.keys(): + print("yes") + self.sixg_ssid = ["ssid_idx=2 ssid=" + + self.ssid_data["6g-ssid"] + + " security=" + + self.ssid_data["6g-encryption"].upper() + + " password=" + + self.ssid_data["6g-password"] + + " bssid=" + + self.ssid_data["6g-bssid"].lower().replace(" ", "")] + ssid_var = [self.twog_ssid, self.fiveg_ssid, self.sixg_ssid] + else: + ssid_var = [self.twog_ssid, self.fiveg_ssid] + self.CreateDut.ssid = ssid_var + # print(self.CreateDut.ssid) + self.Create_Dut() + + if lanforge_data["type"] == "mesh": + self.lanforge_ip = lanforge_data["ip"] + self.lanforge_port = lanforge_data["port"] + self.ssh_port = lanforge_data["ssh_port"] + self.upstream_port_1 = lanforge_data["upstream-mobile-sta"] + self.upstream_port_2 = lanforge_data["upstream-root"] + self.upstream_port_3 = lanforge_data["upstream-node-1"] + self.upstream_port_4 = lanforge_data["upstream-node-2"] + self.uplink_port_1 = lanforge_data["uplink-mobile-sta"] + self.uplink_port_2 = lanforge_data["uplink-root"] + self.uplink_port_3 = lanforge_data["uplink--node-1"] + self.uplink_port_4 = lanforge_data["uplink--node-2"] + self.upstream_resource_1 = self.upstream_port_1.split(".")[0] + "." + self.upstream_port_1.split(".")[1] + self.upstream_resource_2 = self.upstream_port_2.split(".")[0] + "." + self.upstream_port_2.split(".")[1] + self.upstream_resource_3 = self.upstream_port_3.split(".")[0] + "." + self.upstream_port_3.split(".")[1] + self.upstream_resource_4 = self.upstream_port_4.split(".")[0] + "." + self.upstream_port_4.split(".")[1] + self.uplink_resource_1 = self.uplink_port_1.split(".")[0] + "." + self.uplink_port_1.split(".")[1] + self.uplink_resource_2 = self.uplink_port_2.split(".")[0] + "." + self.uplink_port_2.split(".")[1] + self.uplink_resource_3 = self.uplink_port_3.split(".")[0] + "." + self.uplink_port_3.split(".")[1] + self.uplink_resource_4 = self.uplink_port_4.split(".")[0] + "." + self.uplink_port_4.split(".")[1] + self.upstream_subnet = lanforge_data["upstream_subnet-mobile-sta"] + self.delete_old_scenario = True + self.debug = debug + self.testbed = "mesh" + self.scenario_name = "TIP-" + self.testbed + self.raw_line = [ + ["profile_link " + self.upstream_resource_1 + " upstream-dhcp 1 NA NA " + + self.upstream_port_1.split(".")[2] + ",AUTO -1 NA"], + [ + "profile_link " + self.uplink_resource_1 + " uplink-nat 1 'DUT: upstream LAN " + self.upstream_subnet + "' NA " + + self.uplink_port_1.split(".")[2] + "," + self.upstream_port_1.split(".")[2] + " -1 NA"] + ] + self.CreateChamberview = CreateChamberview(self.lanforge_ip, self.lanforge_port) + else: + self.lanforge_ip = lanforge_data["ip"] + self.lanforge_port = lanforge_data["port"] + self.twog_radios = lanforge_data["2.4G-Radio"] + self.fiveg_radios = lanforge_data["5G-Radio"] + self.ax_radios = lanforge_data["AX-Radio"] + self.upstream_port = lanforge_data["upstream"] + self.twog_prefix = lanforge_data["2.4G-Station-Name"] + self.fiveg_prefix = lanforge_data["5G-Station-Name"] + self.ax_prefix = lanforge_data["AX-Station-Name"] + self.uplink_port = lanforge_data["uplink"] # eth2 + self.upstream_subnet = lanforge_data["upstream_subnet"] + self.testbed = testbed + self.upstream_resources = self.upstream_port.split(".")[0] + "." + self.upstream_port.split(".")[1] + self.uplink_resources = self.uplink_port.split(".")[0] + "." + self.uplink_port.split(".")[1] + self.delete_old_scenario = True + # For chamber view + self.scenario_name = "TIP-" + self.testbed + self.debug = debug + self.exit_on_error = False + self.dut_idx_mapping = {} + self.ssid_list = [] + self.ap_version = ap_version + self.staConnect = StaConnect2(self.lanforge_ip, self.lanforge_port, debug_=self.debug) + self.raw_line = [ + ["profile_link " + self.upstream_resources + " upstream-dhcp 1 NA NA " + self.upstream_port.split(".") + [2] + ",AUTO -1 NA"], + ["profile_link " + self.uplink_resources + " uplink-nat 1 'DUT: upstream LAN " + self.upstream_subnet + + "' NA " + self.uplink_port.split(".")[2] + "," + self.upstream_port.split(".")[2] + " -1 NA"] + ] + + # This is for rawline input | see create_chamberview_dut.py for more details + + self.CreateChamberview = CreateChamberview(self.lanforge_ip, self.lanforge_port) + + if access_point_data: + # for DUT + self.dut_name = testbed + self.ap_model = access_point_data[0]["model"] + self.ap_hw_info = access_point_data[0]["mode"] + try: + self.version = self.ap_version[0].split(" / ")[1].split("\r\n\n")[0] + print("AP version", self.version) + except Exception as e: + print(e) + self.serial = access_point_data[0]["serial"] + self.ssid_data = None + if self.run_lf: + self.ssid_data = access_point_data[0]['ssid'] + self.CreateDut = DUT(lfmgr=self.lanforge_ip, + port=self.lanforge_port, + dut_name=self.testbed, + sw_version=self.version, + hw_version=self.ap_hw_info, + model_num=self.ap_model, + serial_num=self.serial + ) + self.CreateDut.ssid = [] + if self.ssid_data is not None: + # ssid = ['ssid_idx=0 ssid=Default-SSID-2g security=WPA|WEP| password=12345678 bssid=90:3c:b3:94:48:58'] + # ssid = ["ssid_idx=" + str(interface) + + # " ssid=" + ssid_info_sdk[interface][0] + + # " security=" + ssid_info_sdk[interface][1].upper() + + # " password=" + ssid_info_sdk[interface][2] + + # " bssid=" + ssid_info_sdk[interface][4].lower() + # ] + self.twog_ssid = ["ssid_idx=0" + " ssid=" + self.ssid_data["2g-ssid"] + + " security=" + self.ssid_data["2g-encryption"].upper() + + " password=" + self.ssid_data["2g-password"] + + " bssid=" + self.ssid_data["2g-bssid"].lower().replace(" ", "") + ] + + self.fiveg_ssid = ["ssid_idx=1 ssid=" + + self.ssid_data["5g-ssid"] + + " security=" + + self.ssid_data["5g-encryption"].upper() + + " password=" + + self.ssid_data["5g-password"] + + " bssid=" + + self.ssid_data["5g-bssid"].lower().replace(" ", "")] + self.CreateDut.ssid = [self.twog_ssid, self.fiveg_ssid] + # print(self.CreateDut.ssid) + self.Create_Dut() + + def reset_scenario(self): + # self.layer3_cleanup() + # self.Create_Dut() + if not self.run_lf: + self.raw_line = [ + ["profile_link " + self.upstream_resources + " upstream-dhcp 1 NA NA " + self.upstream_port.split(".") + [2] + ",AUTO -1 NA"], + ["profile_link " + self.uplink_resources + " uplink-nat 1 'DUT: upstream LAN " + self.upstream_subnet + + "' NA " + self.uplink_port.split(".")[2] + "," + self.upstream_port.split(".")[2] + " -1 NA"] + ] + print(self.raw_line) + self.Chamber_View() + + def reset_dut(self): + temp = [] + for i in range(0, 8): + temp.append(['ssid_idx=' + str(i) + ' ssid=-']) + print(temp) + self.CreateDut.ssid = temp + self.CreateDut.add_ssids() + + def get_station_list(self): + # realm_obj = self.staConnect.localrealm + sta = self.staConnect.station_list() + sta_list = [] + for i in sta: + for j in i: + sta_list.append(j) + return sta_list + + def admin_up_down(self, sta_list=[], option="up"): + # realm_obj = self.staConnect.localrealm + if option == "up": + for i in sta_list: + self.staConnect.admin_up(i) + time.sleep(0.005) + elif option == "down": + for j in sta_list: + self.staConnect.admin_down(j) + time.sleep(0.005) + time.sleep(2) + + def Chamber_View(self): + if self.delete_old_scenario: + self.CreateChamberview.clean_cv_scenario(scenario_name=self.scenario_name) + self.CreateChamberview.setup(create_scenario=self.scenario_name, + raw_line=self.raw_line + ) + self.CreateChamberview.build(self.scenario_name) + self.CreateChamberview.sync_cv() + time.sleep(2) + self.CreateChamberview.show_text_blob(None, None, True) # Show changes on GUI + self.CreateChamberview.sync_cv() + return self.CreateChamberview, self.scenario_name + + def add_vlan(self, vlan_ids=[]): + data = self.staConnect.json_get("/port/all") + flag = 0 + for vlans in vlan_ids: + for i in data["interfaces"]: + if list(i.keys())[0] != self.upstream_port + "." + str(vlans): + flag = 1 + if flag == 1: + for vlans in vlan_ids: + self.raw_line.append(["profile_link " + self.upstream_resources + " vlan-100 1 " + self.upstream_port + + " NA " + self.upstream_port.split(".")[2] + ",AUTO -1 " + str(vlans)]) + self.Chamber_View() + + def add_stations(self, band="2G", num_stations="max", dut="NA", ssid_name=[], idx=0): + LOGGER.info("Adding Stations:" + band + " band, Number of Stations: " + str(num_stations) + + " DUT: " + str(dut) + " SSID: " + str(ssid_name) + " idx: " + str(idx)) + if num_stations == 0: + LOGGER.warning("0 Stations") + return + idx = idx + if self.run_lf or self.cc_1: + if band == "2G": + idx = 0 + if band == "5G": + idx = 1 + + + for i in self.dut_idx_mapping: + if self.dut_idx_mapping[i][0] == ssid_name and self.dut_idx_mapping[i][3] == band: + idx = i + if band == "2G": + if num_stations != "max": + LOGGER.info("Total 2G Radios Available in Testbed: " + str(len(self.twog_radios))) + total_sta = num_stations + max_possible = 0 + for radio in self.twog_radios: + max_possible = max_possible + int(self.get_max_sta(radio)) + if total_sta <= max_possible: + per_radio_sta = int(total_sta / len(self.twog_radios)) + rem = total_sta % len(self.twog_radios) + else: + total_sta = max_possible + per_radio_sta = int(total_sta / len(self.twog_radios)) + rem = total_sta % len(self.twog_radios) + if rem != 0 and per_radio_sta == 0: + per_radio_sta = rem / len(self.twog_radios) + LOGGER.info("Total stations per radio: " + str(per_radio_sta)) + for radio in self.twog_radios: + max_possible = int(self.get_max_sta(radio)) + if total_sta == 0: + return + num_stations = per_radio_sta + if rem == 0 and num_stations == 0: + return + if max_possible - num_stations >= rem: + num_stations = num_stations + rem + rem = 0 + elif max_possible - rem >= num_stations: + num_stations = num_stations + rem + rem = 0 + elif total_sta <= max_possible: + num_stations = total_sta + if per_radio_sta < 1: + num_stations = 1 + total_sta = total_sta - num_stations + LOGGER.info("Adding " + str(num_stations) + " Stations on " + str(radio)) + station_data = ["profile_link " + radio.split(".")[0] + "." + radio.split(".")[1] + + " STA-AUTO " + str(num_stations) + " 'DUT: " + dut + " Radio-" + + str(int(idx) + 1) + "'" + " NA " + radio.split(".")[2]] + self.raw_line.append(station_data) + LOGGER.debug("Raw Line : " + str(station_data)) + + if num_stations == "max": + LOGGER.info("Total 2G Radios Available in Testbed: " + str(len(self.twog_radios))) + for radio in self.twog_radios: + num_stations = self.get_max_sta(radio) + LOGGER.info("Total stations: " + str(num_stations) + " On Radio: " + str(radio)) + station_data = ["profile_link " + radio.split(".")[0] + "." + radio.split(".")[1] + + " STA-AUTO " + str(num_stations) + " 'DUT: " + dut + " Radio-" + + str(int(idx) + 1) + "'" + " NA " + radio.split(".")[2]] + self.raw_line.append(station_data) + LOGGER.debug("Raw Line : " + str(station_data)) + + if band == "5G": + if num_stations != "max": + LOGGER.info("Total 2G Radios Available in Testbed: " + str(len(self.fiveg_radios))) + total_sta = num_stations + max_possible = 0 + for radio in self.fiveg_radios: + max_possible = max_possible + int(self.get_max_sta(radio)) + if total_sta <= max_possible: + per_radio_sta = int(total_sta / len(self.fiveg_radios)) + rem = total_sta % len(self.fiveg_radios) + else: + total_sta = max_possible + per_radio_sta = int(total_sta / len(self.fiveg_radios)) + rem = total_sta % len(self.fiveg_radios) + if rem != 0 and per_radio_sta == 0: + per_radio_sta = rem / len(self.fiveg_radios) + LOGGER.info("Total stations per radio: " + str(per_radio_sta)) + for radio in self.fiveg_radios: + max_possible = int(self.get_max_sta(radio)) + if total_sta == 0: + return + num_stations = per_radio_sta + if rem == 0 and num_stations == 0: + return + if max_possible - num_stations >= rem: + num_stations = num_stations + rem + rem = 0 + elif max_possible - rem >= num_stations: + num_stations = num_stations + rem + rem = 0 + elif total_sta <= max_possible: + num_stations = total_sta + if per_radio_sta < 1: + num_stations = 1 + total_sta = total_sta - num_stations + LOGGER.info("Adding " + str(num_stations) + " Stations on " + str(radio)) + station_data = ["profile_link " + radio.split(".")[0] + "." + radio.split(".")[1] + + " STA-AUTO " + str(num_stations) + " 'DUT: " + dut + " Radio-" + + str(int(idx) + 1) + "'" + " NA " + radio.split(".")[2]] + self.raw_line.append(station_data) + LOGGER.debug("Raw Line : " + str(station_data)) + + if num_stations == "max": + LOGGER.info("Total 5G Radios Available in Testbed: " + str(len(self.fiveg_radios))) + for radio in self.fiveg_radios: + num_stations = self.get_max_sta(radio) + LOGGER.info("Total stations: " + str(num_stations) + " On Radio: " + str(radio)) + station_data = ["profile_link " + radio.split(".")[0] + "." + radio.split(".")[1] + + " STA-AUTO " + str(num_stations) + " 'DUT: " + dut + " Radio-" + + str(int(idx) + 1) + "'" + " NA " + radio.split(".")[2]] + self.raw_line.append(station_data) + LOGGER.debug("Raw Line : " + str(station_data)) + if band == "ax": + if num_stations != "max": + LOGGER.info("Total 2G Radios Available in Testbed: " + str(len(self.ax_radios))) + total_sta = num_stations + max_possible = 0 + for radio in self.ax_radios: + max_possible = max_possible + int(self.get_max_sta(radio)) + if total_sta <= max_possible: + per_radio_sta = int(total_sta / len(self.ax_radios)) + rem = total_sta % len(self.ax_radios) + else: + total_sta = max_possible + per_radio_sta = int(total_sta / len(self.ax_radios)) + rem = total_sta % len(self.ax_radios) + if rem != 0 and per_radio_sta == 0: + per_radio_sta = rem / len(self.ax_radios) + LOGGER.info("Total stations per radio: " + str(per_radio_sta)) + for radio in self.ax_radios: + max_possible = int(self.get_max_sta(radio)) + if total_sta == 0: + return + num_stations = per_radio_sta + if rem == 0 and num_stations == 0: + return + if max_possible - num_stations >= rem: + num_stations = num_stations + rem + rem = 0 + elif max_possible - rem >= num_stations: + num_stations = num_stations + rem + rem = 0 + elif total_sta <= max_possible: + num_stations = total_sta + if per_radio_sta < 1: + num_stations = 1 + total_sta = total_sta - num_stations + LOGGER.info("Adding " + str(num_stations) + " Stations on " + str(radio)) + station_data = ["profile_link " + radio.split(".")[0] + "." + radio.split(".")[1] + + " STA-AUTO " + str(num_stations) + " 'DUT: " + dut + " Radio-" + + str(int(idx) + 1) + "'" + " NA " + radio.split(".")[2]] + self.raw_line.append(station_data) + LOGGER.debug("Raw Line : " + str(station_data)) + if num_stations == "max": + LOGGER.info("Total AX Radios Available in Testbed: " + str(len(self.ax_radios))) + for radio in self.ax_radios: + num_stations = self.get_max_sta(radio) + LOGGER.info("Total stations: " + str(num_stations) + " On Radio: " + str(radio)) + station_data = ["profile_link " + radio.split(".")[0] + "." + radio.split(".")[1] + + " STA-AUTO " + str(num_stations) + " 'DUT: " + dut + " Radio-" + + str(int(idx) + 1) + "'" + " NA " + radio.split(".")[2]] + self.raw_line.append(station_data) + LOGGER.debug("Raw Line : " + str(station_data)) + + def Create_Dut(self): + self.CreateDut.setup() + self.CreateDut.add_ssids() + self.CreateDut.cv_test.show_text_blob(None, None, True) # Show changes on GUI + self.CreateDut.cv_test.sync_cv() + time.sleep(2) + self.CreateDut.cv_test.show_text_blob(None, None, True) # Show changes on GUI + self.CreateDut.cv_test.sync_cv() + return self.CreateDut, self.dut_name + + def update_ssid(self, ssid_data=[]): + self.CreateDut.ssid = ssid_data + self.CreateDut.add_ssids() + # SSID data should be in this format + # [ + # ['ssid_idx=0 ssid=Default-SSID-2g security=WPA|WEP| password=12345678 bssid=90:3c:b3:94:48:58'], + # ['ssid_idx=1 ssid=Default-SSID-5gl password=12345678 bssid=90:3c:b3:94:48:59'] + # ] + pass + + def json_get(self, _req_url="/"): + cli_base = LFCliBase(_lfjson_host=self.lanforge_ip, _lfjson_port=self.lanforge_port, ) + json_response = cli_base.json_get(_req_url=_req_url) + return json_response + + def json_post(self, req_url, shelf, resources, port, current, intrest): + data = { + "shelf": shelf, + "resource": resources, + "port": port, + "current_flags": current, + "interest": intrest + } + cli_base = LFCliBase(_lfjson_host=self.lanforge_ip, _lfjson_port=self.lanforge_port, ) + return cli_base.json_post(req_url, data) + + def station_data_query(self, station_name="wlan0", query="channel"): + x = self.twog_radios[0].split(".") + url = f"/port/{x[0]}/{x[1]}/{station_name}?fields={query}" + print("url", url) + response = self.json_get(_req_url=url) + print("response: ", response) + if (response is None) or ("interface" not in response): + print("station_list: incomplete response:") + # pprint(response) + exit(1) + y = response["interface"][query] + return y + + def read_kpi_file(self, column_name, dir_name): + if column_name == None: + df = pd.read_csv("../reports/" + str(dir_name) + "/kpi.csv", sep=r'\t', engine='python') + if df.empty == True: + return "empty" + else: + return df + else: + df = pd.read_csv("../reports/" + str(dir_name) + "/kpi.csv", sep=r'\t', usecols=column_name, + engine='python') + if df.empty == True: + return "empty" + else: + result = df[column_name].values.tolist() + return result + + def read_csv_individual_station_throughput(self, dir_name, option, individual_station_throughput=True, kpi_csv=False, + file_name="/csv-data/data-Combined_bps__60_second_running_average-1.csv", + batch_size="0"): + try: + df = pd.read_csv( + "../reports/" + str(dir_name) + file_name, + sep=r'\t', engine='python') + print("csv file opened") + except FileNotFoundError: + print("csv file does not exist") + return False + + if kpi_csv: + count = 0 + dict_data = {"Down": {}, "Up": {}, "Both": {}} + csv_short_dis = df.loc[:,"short-description"] + csv_num_score = df.loc[:,"numeric-score"] + for i in range(len(batch_size.split(","))): + dict_data["Down"][csv_short_dis[count + 0]] = csv_num_score[count + 0] + dict_data["Up"][csv_short_dis[count + 1]] = csv_num_score[count + 1] + dict_data["Both"][csv_short_dis[count + 2]] = csv_num_score[count + 2] + count += 3 + + if individual_station_throughput: + dict_data = {} + if option == "download": + csv_sta_names = df.iloc[[0]].values.tolist() + csv_throughput_values = df.iloc[[1]].values.tolist() + elif option == "upload": + csv_sta_names = df.iloc[[0]].values.tolist() + csv_throughput_values = df.iloc[[2]].values.tolist() + else: + print("Provide proper option: download or upload") + return + + sta_list = csv_sta_names[0][0][:-1].replace('"', '').split(",") + th_list = list(map(float, csv_throughput_values[0][0].split(","))) + for i in range(len(sta_list)): + dict_data[sta_list[i]] = th_list[i] + + return dict_data + + def attach_report_kpi(self, report_name=None, file_name="kpi_file"): + path = "../reports/" + str(report_name) + "/kpi.csv" + if os.path.exists(path): + allure.attach.file(source=path, + name=file_name, attachment_type="CSV") + return os.path.exists(path) + + def attach_report_graphs(self, report_name=None, pdf_name="WIFI Capacity Test PDF Report"): + relevant_path = "../reports/" + report_name + "/" + entries = os.listdir("../reports/" + report_name + '/') + pdf = False + for i in entries: + if ".pdf" in i: + pdf = i + if pdf: + allure.attach.file(source=relevant_path + pdf, + name=pdf_name) + + included_extensions = ['png'] + file_names = [fn for fn in os.listdir(relevant_path) + if any(fn.endswith(ext) for ext in included_extensions)] + + a = [item for item in file_names if 'print' not in item] + a = [item for item in a if 'logo' not in item] + a = [item for item in a if 'Logo' not in item] + a = [item for item in a if 'candela' not in item] + + a.sort() + for i in a: + allure.attach.file(source=relevant_path + i, + name=i, + attachment_type="image/png", extension=None) + + def create_mesh_scenario(self): + # upstream_list = [] + # for data in range(0,len(self.access_point_data)): + self.CreateDut = DUT(lfmgr=self.lanforge_ip, + port=self.lanforge_port, + dut_name="upstream") # + str(data)) + self.CreateDut.lan_port = "10.28.2.1/24" + # name = "upstream" + str(data) + # upstream_list.append(name) + + self.CreateDut.setup() + # data = data + 1 + self.raw_line = [ + ["profile_link " + self.upstream_resource_2 + " upstream-dhcp 1 NA NA " + self.upstream_port_2.split(".")[ + 2] + ",AUTO -1 NA"], + [ + "profile_link " + self.uplink_resource_2 + " uplink-nat 1 'DUT: upstream LAN " + self.upstream_subnet + "' NA " + + self.uplink_port_2.split(".")[2] + "," + self.upstream_port_2.split(".")[2] + " -1 NA"], + ["profile_link " + self.upstream_resource_3 + " upstream-dhcp 1 NA NA " + self.upstream_port_3.split(".")[ + 2] + ",AUTO -1 NA"], + [ + "profile_link " + self.uplink_resource_3 + " uplink-nat 1 'DUT: upstream LAN " + self.upstream_subnet + "' NA " + + self.uplink_port_3.split(".")[2] + "," + self.upstream_port_3.split(".")[2] + " -1 NA"], + ["profile_link " + self.upstream_resource_4 + " upstream-dhcp 1 NA NA " + self.upstream_port_4.split(".")[ + 2] + ",AUTO -1 NA"], + [ + "profile_link " + self.uplink_resource_4 + " uplink-nat 1 'DUT: upstream LAN " + self.upstream_subnet + "' NA " + + self.uplink_port_4.split(".")[2] + "," + self.upstream_port_4.split(".")[2] + " -1 NA"] + ] + print(self.raw_line) + mesh = self.Chamber_View() + return mesh + + def create_mesh_scenario_dhcp_disable(self): + + # upstream_list = [] + # for data in range(0,len(self.access_point_data)): + self.CreateDut = DUT(lfmgr=self.lanforge_ip, + port=self.lanforge_port, + dut_name="upstream") + self.CreateDut.lan_port = "10.28.2.1/24" + # name = "upstream" + str(data) + # upstream_list.append(name) + + self.CreateDut.setup() + # data = data + 1 + self.raw_line = [ + ["profile_link " + self.upstream_resource_2 + " upstream-dhcp 1 NA NA " + self.upstream_port_2.split(".")[ + 2] + ",AUTO -1 NA"], + [ + "profile_link " + self.uplink_resource_2 + " uplink-nat 1 'DUT: upstream LAN " + self.upstream_subnet + "' NA " + + self.uplink_port_2.split(".")[2] + "," + self.upstream_port_2.split(".")[2] + " -1 NA"], + ["profile_link " + self.upstream_resource_3 + " upstream 1 NA NA " + self.upstream_port_3.split(".")[ + 2] + ",AUTO -1 NA"], + [ + "profile_link " + self.uplink_resource_3 + " uplink-nat 1 'DUT: upstream LAN " + self.upstream_subnet + "' NA " + + self.uplink_port_3.split(".")[2] + "," + self.upstream_port_3.split(".")[2] + " -1 NA"], + ["profile_link " + self.upstream_resource_4 + " upstream 1 NA NA " + self.upstream_port_4.split(".")[ + 2] + ",AUTO -1 NA"], + [ + "profile_link " + self.uplink_resource_4 + " uplink-nat 1 'DUT: upstream LAN " + self.upstream_subnet + "' NA " + + self.uplink_port_4.split(".")[2] + "," + self.upstream_port_4.split(".")[2] + " -1 NA"] + ] + print(self.raw_line) + mesh = self.Chamber_View() + return mesh + + def create_mesh_dut(self, ssid_data=None): + print("hi ssid data", ssid_data) + for ap, ssid in zip(self.access_point_data, range(len(ssid_data))): + print("ap", ap) + print(ssid) + print(ssid_data[ssid]) + self.dut_name = "tip-" + str(ap["type"]) + print(self.dut_name) + self.ap_model = ap["model"] + self.version = ap["version"].split("/")[-1] + self.serial = ap["serial"] + self.CreateDut = DUT(lfmgr=self.lanforge_ip, + port=self.lanforge_port, + dut_name=self.dut_name, + hw_version=self.ap_model, + model_num=self.ap_model, + serial_num=self.serial + ) + self.Create_Dut() + # [['ssid_idx=0 ssid=Default-SSID-2g security=WPA|WEP| password=12345678 bssid=90:3c:b3:94:48:58']] + self.update_ssid(ssid_data=ssid_data[ssid]) + + + def create_non_meh_dut(self, ssid_data=[]): + # print("hi") + for ap, ssid in zip(self.access_point_data, range(len(ssid_data))): + print("ap", ap) + print(ssid_data[ssid]) + self.dut_name = ap["model"] + print(self.dut_name) + self.ap_model = ap["model"] + self.version = ap["version"].split("/")[-1] + self.serial = ap["serial"] + self.CreateDut = DUT(lfmgr=self.lanforge_ip, + port=self.lanforge_port, + dut_name=self.dut_name, + sw_version=self.version, + hw_version=self.ap_model, + model_num=self.ap_model, + serial_num=self.serial + ) + self.Create_Dut() + # [['ssid_idx=0 ssid=Default-SSID-2g security=WPA|WEP| password=12345678 bssid=90:3c:b3:94:48:58']] + self.update_ssid(ssid_data=ssid_data[ssid]) + + + + def set_radio_antenna(self, req_url, shelf, resources, radio, antenna): + data = { + "shelf": shelf, + "resource": resources, + "radio": radio, + "antenna": antenna + } + cli_base = LFCliBase(_lfjson_host=self.lanforge_ip, _lfjson_port=self.lanforge_port, ) + return cli_base.json_post(req_url, data) + + def get_max_sta(self, radio=""): + data = self.json_get("/radiostatus/all") + return data[radio]["max_vifs"] + + +def main(): + # # lanforge_data = {'ip': 'localhost', 'port': 8802, 'ssh_port': 8804, '2.4G-Radio': ['1.1.wiphy0', '1.1.wiphy2'], '5G-Radio': ['1.1.wiphy1', '1.1.wiphy3'], 'AX-Radio': ['1.1.wiphy4', '1.1.wiphy5', '1.1.wiphy6', '1.1.wiphy7'], 'upstream': '1.1.eth2', 'upstream_subnet': '10.28.2.1/24', 'uplink': '1.1.eth1', '2.4G-Station-Name': 'sta00', '5G-Station-Name': 'sta10', 'AX-Station-Name': 'ax'} + # lanforge_data = { + # "type": "mesh", + # "ip": "localhost", # 10.28.3.14 + # "port": 8802, # 8080 + # "ssh_port": 8804, + # "2.4G-Radio-mobile-sta": ["1.1.wiphy0", "1.1.wiphy2"], + # "5G-Radio-mobile-sta": ["1.1.wiphy1", "1.1.wiphy3"], + # "AX-Radio-mobile-sta": ["1.1.wiphy4", "1.1.wiphy5", "1.1.wiphy6", "1.1.wiphy7"], + # "upstream-mobile-sta": "1.1.eth2", + # "upstream_subnet-mobile-sta": "10.28.2.1/24", + # "uplink-mobile-sta": "1.1.eth3", + # "2.4G-Radio-root": ["1.2.wiphy0"], + # "5G-Radio-root": ["1.2.wiphy1"], + # "AX-Radio-root": [], + # "upstream-root": "1.2.eth2", + # "upstream_subnet-root": "10.28.2.1/24", + # "uplink-root": "1.2.eth3", + # "2.4G-Radio-node-1": ["1.3.wiphy0"], + # "5G-Radio-node-1": ["1.3.wiphy1"], + # "AX-Radio-node-1": [], + # "upstream-node-1": "1.3.eth2", + # "upstream_subnet-node-1": "10.28.2.1/24", + # "uplink--node-1": "1.3.eth3", + # "2.4G-Radio-node-2": ["1.4.wiphy0"], + # "5G-Radio-node-2": ["1.4.wiphy1"], + # "AX-Radio-node-2": [], + # "upstream-node-2": "1.4.eth2", + # "upstream_subnet-node-2": "10.28.2.1/24", + # "uplink--node-2": "1.4.eth3", + # "2.4G-Station-Name": "wlan0", + # "5G-Station-Name": "wlan0", + # "AX-Station-Name": "ax" + # } + # # ap_data = [{'model': 'wf188n', 'mode': 'wifi6', 'serial': '0000c1018812', 'jumphost': True, 'ip': 'localhost', 'username': 'lanforge', 'password': 'pumpkin77', 'port': 8803, 'jumphost_tty': '/dev/ttyAP1', 'version': 'https://tip.jfrog.io/artifactory/tip-wlan-ap-firmware/uCentral/cig_wf188/20210729-cig_wf188-v2.0.0-rc2-ec3662e-upgrade.bin'}] + # ap_data = [ + # { + # 'type': 'root', + # 'model': 'eap101', + # 'mode': 'wifi6', + # 'serial': '34efb6af4a7a', + # 'jumphost': True, + # 'ip': "localhost", # 10\.28\.3\.101 + # 'username': "lanforge", + # 'password': "pumpkin77", + # 'port': 8803, # 22 + # 'jumphost_tty': '/dev/ttyAP2', + # 'version': "latest" + # }, + # { + # 'type': 'node-1', + # 'model': 'eap101', + # 'mode': 'wifi6', + # 'serial': '34efb6af4903', + # 'jumphost': True, + # 'ip': "localhost", # 10\.28\.3\.101 + # 'username': "lanforge", + # 'password': "pumpkin77", + # 'port': 8803, # 22 + # 'jumphost_tty': '/dev/ttyAP3', + # 'version': "latest" + # }, + # { + # 'type': 'node-2', + # 'model': 'eap102', + # 'mode': 'wifi6', + # 'serial': '34efb6af4a7a', + # 'jumphost': True, + # 'ip': "localhost", # 10\.28\.3\.101 + # 'username': "lanforge", + # 'password': "pumpkin77", + # 'port': 8803, # 22 + # 'jumphost_tty': '/dev/ttyAP4', + # 'version': "https://tip.jfrog.io/artifactory/tip-wlan-ap-firmware/eap101/trunk/eap101-1.1.0.tar.gz" + # } + # ] + # testbed = "mesh" + lanforge_data = { + "ip": "10.28.3.32", + "port": 8080, + "ssh_port": 22, + "2.4G-Radio": ["1.1.wiphy0", "1.1.wiphy2", "1.1.wiphy1"], + "5G-Radio": ["1.1.wiphy1", "1.1.wiphy3"], + "AX-Radio": ["1.1.wiphy4", "1.1.wiphy5", "1.1.wiphy6", "1.1.wiphy7"], + "upstream": "1.1.eth2", + "upstream_subnet": "10.28.2.1/24", + "uplink": "1.1.eth3", + "2.4G-Station-Name": "sta10", + "5G-Station-Name": "sta00", + "AX-Station-Name": "ax" + } + ap_data = [{ + "model": "edgecore_eap101", + "mode": "wifi6", + "serial": "903cb36ae223", + "jumphost": True, + "ip": "10.28.3.103", + "username": "lanforge", + "password": "pumpkin77", + "port": 22, + "jumphost_tty": "/dev/ttyAP3", + "version": "release-latest" + }] + obj = ChamberView(lanforge_data=lanforge_data, access_point_data=ap_data, testbed="basic") + obj.add_stations("2G", "max", obj.dut_name, "abc") + # lf_tools.add_stations(band="2G", num_stations="max", dut=lf_tools.dut_name, ssid_name=ssid_name) + # obj.create_mesh_dut() + + +if __name__ == '__main__': + main()