import importlib import logging import os import sys import time from datetime import datetime import allure import paramiko import pytest from scp import SCPClient from tabulate import tabulate from itertools import islice import csv import pandas as pd sys.path.append(os.path.join(os.path.abspath(__file__ + "../../../"))) lfcli_base = importlib.import_module("py-json.LANforge.lfcli_base") LFCliBase = lfcli_base.LFCliBase realm = importlib.import_module("py-json.realm") cv_test_manager = importlib.import_module("py-json.cv_test_manager") cv_test = cv_test_manager.cv_test lf_cv_base = importlib.import_module("py-json.lf_cv_base") ChamberViewBase = lf_cv_base.ChamberViewBase create_chamberview_dut = importlib.import_module("py-scripts.create_chamberview_dut") DUT = create_chamberview_dut.DUT create_chamberview = importlib.import_module("py-scripts.create_chamberview") CreateChamberview = create_chamberview.CreateChamberview sta_connect2 = importlib.import_module("py-scripts.sta_connect2") StaConnect2 = sta_connect2.StaConnect2 profile_utility = importlib.import_module("py-json.profile_utility") ProfileUtility = profile_utility.ProfileUtility createstation = importlib.import_module("py-scripts.create_station") CreateStation = createstation.CreateStation sniffradio = importlib.import_module("py-scripts.lf_sniff_radio") SniffRadio = sniffradio.SniffRadio stascan = importlib.import_module("py-scripts.sta_scan_test") StaScan = stascan.StaScan cv_test_reports = importlib.import_module("py-json.cv_test_reports") lf_report = cv_test_reports.lanforge_reports attenuator_serial = importlib.import_module("py-scripts.attenuator_serial") Attenuator = attenuator_serial.AttenuatorSerial lf_atten_mod_test = importlib.import_module("py-scripts.lf_atten_mod_test") Attenuator_modify = lf_atten_mod_test.CreateAttenuator class lf_libs: """ "traffic_generator": { "name": "lanforge", "scenario": "", "details": { "manager_ip": "", "http_port": 8080, "ssh_port": 22, "default_setup_DB": "Test_Scenario", "wan_ports": ["1.1.eth2"], "lan_ports": ["1.1.eth1"] } } """ lanforge_data = dict() manager_ip = None testbed = None run_lf = False manager_http_port = None manager_ssh_port = None manager_default_db = None wan_ports = None lan_ports = None uplink_nat_ports = None dut_data = None dut_objects = [] default_scenario_name = None default_scenario_test = None default_scenario_raw_lines = [] temp_raw_lines = [] chamberview_object = None max_possible_stations = None max_2g_stations = None max_5g_stations = None max_6g_stations = None max_ax_stations = None max_ac_stations = None twog_prefix = "ath10k_2g0" fiveg_prefix = "ath10k_5g0" sixg_prefix = "AX210_0" ax_prefix = "AX200_0" pcap_obj = None """ Scenario : dhcp-bridge / dhcp-external dhcp-bridge - wan_ports will act as dhcp server for AP's and it will use uplink_nat_ports for uplink NAT lan_ports will have IP Address from the AP dhcp-external - wan_ports will have IP Address from the dhcp server that will be hosted outside the lanforge lan_ports will have IP Address from the AP In case of VLAN configurations, wan_ports will be tagged to get IP Address from VLANS that are outside LANforge """ scenario = None """ Scenario in chamberview which will be read by read_cv_scenario() and stored here This will be used to add additional stuff on scenario along with this """ cv_scenario = None """ Number of Resources available """ resources = None """ ax radio - supports (2.4G and 5gHz Band) Maximum 1 Station per radio """ ax200_radios = [] """ 6e radio - supports (2.4GHz, 5gHz and 6gHz Band) Maximum 1 Station per radio """ ax210_radios = [] """ ax radio - supports (2.4G and 5gHz Band) Maximum 19 Station per radio """ mtk_radios = [] """ ax radio - supports (2.4G Band) Maximum 64 Station per radio """ wave2_2g_radios = [] """ ax radio - supports (5gHz Band) Maximum 64 Station per radio """ wave2_5g_radios = [] """ ax radio - supports (2.4G and 5gHz Band) Maximum 64 Station per radio """ wave1_radios = [] """lf_tests ax radio - supports (2.4G and 5gHz Band) Maximum 64 Station per radio """ attenuator = [] """ Realm Object can be used to call various methods available """ local_realm = None def __init__(self, lf_data={}, dut_data=[], run_lf=False, log_level=logging.DEBUG): logging.basicConfig(format='%(asctime)s - %(message)s', level=log_level) lf_data = dict(lf_data) self.dut_data = dut_data self.dut_data = dut_data self.run_lf = run_lf self.dut_idx_mapping = {} # try: self.lanforge_data = lf_data.get("details") self.testbed = lf_data.get("testbed") self.scenario = lf_data.get("scenario") self.setup_lf_data() self.setup_relevent_profiles() self.setup_metadata() if self.scenario == "dhcp-bridge": logging.info("Scenario name: " + str(self.scenario)) # creating default raw lines for chamberview self.create_dhcp_bridge() elif self.scenario == "dhcp-external": logging.info("Scenario name: " + str(self.scenario)) self.create_dhcp_external() self.chamber_view(raw_lines="default") self.temp_raw_lines = self.default_scenario_raw_lines.copy() self.setup_dut() # except Exception as e: # logging.error("lf_data has bad values: " + str(lf_data)) # logging.error(e) """ setup_lf_data : used to set object variables that are passed from lab_info.json It also creates object for realm and CreateChamberview class object which can be used further """ def setup_lf_data(self): try: self.manager_ip = self.lanforge_data.get("manager_ip") self.manager_http_port = self.lanforge_data.get("http_port") self.manager_ssh_port = self.lanforge_data.get("ssh_port") self.manager_default_db = self.lanforge_data.get("default_setup_DB") self.wan_ports = self.lanforge_data.get("wan_ports") self.lan_ports = self.lanforge_data.get("lan_ports") self.uplink_nat_ports = self.lanforge_data.get("uplink_nat_ports") self.local_realm = realm.Realm(lfclient_host=self.manager_ip, lfclient_port=self.manager_http_port) self.chamberview_object = CreateChamberview(self.manager_ip, self.manager_http_port) self.default_scenario_raw_lines = [] except Exception as e: logging.error(e) """ setup_dut : It read the dut data and creates the dut with relevent data """ def setup_dut(self): self.dut_objects = [] logging.info("DUT Data: " + str(self.dut_data)) for index in range(0, len(self.dut_data)): dut_obj = DUT(lfmgr=self.manager_ip, port=self.manager_http_port, dut_name=self.dut_data[index]["identifier"], sw_version=self.dut_data[index]["firmware_version"], hw_version=self.dut_data[index]["mode"], model_num=self.dut_data[index]["model"], serial_num=self.dut_data[index]["identifier"]) dut_obj.setup() dut_obj.add_ssids() self.dut_objects.append(dut_obj) logging.info("Creating DUT") def setup_metadata(self): data = self.json_get("/port/all") all_eth_ports = [] for info in data["interfaces"]: if (info[list(info.keys())[0]]["port type"]) == "Ethernet": all_eth_ports.append(list(dict(info).keys())[0]) logging.info("Available Ports: " + str(all_eth_ports)) for port in self.wan_ports: if port not in all_eth_ports: logging.error("LANforge system doesn't contains the expected WAN Port: " + str(port)) continue logging.info("WAN Port is Available on LANforge Port Manager: " + str(port)) for port in self.lan_ports: if port not in all_eth_ports: logging.error("LANforge system doesn't contains the expected LAN Port: " + str(port)) continue logging.info("LAN Port is Available on LANforge Port Manager: " + str(port)) data = self.json_get("/radiostatus/all") all_radios = [] all_radio_eid = [] self.max_possible_stations = 0 self.max_2g_stations = 0 self.max_5g_stations = 0 self.max_6g_stations = 0 self.max_ax_stations = 0 self.max_ac_stations = 0 phantom_radios = [] for info in data: if info == "handler" or info == "uri" or info == "warnings": continue all_radio_eid.append(info) all_radios.append(data[info]) if str(data[info]["phantom"]).__contains__("True"): phantom_radios.append(str(data[info]["entity id"])) logging.error("Radio is in phantom state: " + str(data[info]["entity id"]) + " ,Please Contact: support@candelatech.com") if str(data[info]["driver"]).__contains__("AX210"): self.max_possible_stations += 1 self.max_2g_stations += 1 * int(str(data[info]["max_vifs"])) self.max_5g_stations += 1 * int(str(data[info]["max_vifs"])) self.max_6g_stations += 1 * int(str(data[info]["max_vifs"])) self.max_ax_stations += 1 * int(str(data[info]["max_vifs"])) self.ax210_radios.append(info) if str(data[info]["driver"]).__contains__("AX200"): self.max_possible_stations += 1 * int(str(data[info]["max_vifs"])) self.max_2g_stations += 1 * int(str(data[info]["max_vifs"])) self.max_5g_stations += 1 * int(str(data[info]["max_vifs"])) self.max_ax_stations += 1 * int(str(data[info]["max_vifs"])) self.ax200_radios.append(info) if str(data[info]["driver"]).__contains__("ath10k(988x)"): self.max_possible_stations += 1 * int(str(data[info]["max_vifs"])) self.max_2g_stations += 1 * int(str(data[info]["max_vifs"])) self.max_5g_stations += 1 * int(str(data[info]["max_vifs"])) self.max_ac_stations += 1 * int(str(data[info]["max_vifs"])) self.wave1_radios.append(info) if str(data[info]["driver"]).__contains__("ath10k(9984)"): if str(data[info]["capabilities"]).__contains__("802.11bgn-AC"): self.max_possible_stations += 1 * int(str(data[info]["max_vifs"])) self.max_2g_stations += 1 * int(str(data[info]["max_vifs"])) self.max_ac_stations += 1 * int(str(data[info]["max_vifs"])) self.wave2_2g_radios.append(info) if str(data[info]["capabilities"]).__contains__("802.11an-AC"): self.max_possible_stations += 1 * int(str(data[info]["max_vifs"])) self.max_5g_stations += 1 * int(str(data[info]["max_vifs"])) self.max_ac_stations += 1 * int(str(data[info]["max_vifs"])) self.wave2_5g_radios.append(info) if str(data[info]["driver"]).__contains__("mt7915e"): self.max_possible_stations += 1 * int(str(data[info]["max_vifs"])) self.max_2g_stations += 1 * int(str(data[info]["max_vifs"])) self.max_5g_stations += 1 * int(str(data[info]["max_vifs"])) self.max_ax_stations += 1 * int(str(data[info]["max_vifs"])) self.mtk_radios.append(info) logging.info("Radio Information is Extracted") logging.info("Available Radios: " + str(all_radio_eid) + " - Phantom Radios: " + str(phantom_radios)) logging.info("max_possible_stations: " + str(self.max_possible_stations)) logging.info("max_2g_stations: " + str(self.max_2g_stations)) logging.info("max_5g_stations: " + str(self.max_5g_stations)) logging.info("max_6g_stations: " + str(self.max_6g_stations)) logging.info("max_ax_stations: " + str(self.max_ax_stations)) logging.info("max_ac_stations: " + str(self.max_ac_stations)) def load_scenario(self): self.local_realm.load(self.manager_default_db) def setup_connectivity_port(self, data=None): """setting up ethernet port""" if len(data) == 0: return for eth_port in data: if data[eth_port]["addressing"] == "dhcp-server": return elif data[eth_port]["addressing"] == "static": try: data = { "shelf": eth_port.split(".")[0], "resource": eth_port.split(".")[1], "port": eth_port.split(".")[2], "ip_addr": data[eth_port]["ip"].split("/")[0], "netmask": data[eth_port]["ip_mask"], "gateway": data[eth_port]["gateway_ip"].split("/")[0], "dns_servers": data[eth_port]["dns_servers"], "current_flags": 562949953421312, "interest": 0x401e } self.json_post("/cli-json/set_port", data) time.sleep(1) except Exception as e: logging.error(e) elif data[eth_port]["addressing"] == "dynamic": try: data = { "shelf": eth_port.split(".")[0], "resource": eth_port.split(".")[1], "port": eth_port.split(".")[2], "current_flags": 2147483648, "interest": 16384 } self.json_post("/cli-json/set_port", data) time.sleep(1) except Exception as e: logging.error(e) def create_dhcp_bridge(self): """ create chamber view scenario for DHCP-Bridge""" self.setup_connectivity_port(data=self.wan_ports) self.setup_connectivity_port(data=self.lan_ports) self.setup_connectivity_port(data=self.uplink_nat_ports) for wan_ports, uplink_nat_ports in zip(self.wan_ports, self.uplink_nat_ports): upstream_port = wan_ports upstream_resources = upstream_port.split(".")[0] + "." + upstream_port.split(".")[1] uplink_port = uplink_nat_ports uplink_resources = uplink_port.split(".")[0] + "." + uplink_port.split(".")[1] uplink_subnet = self.uplink_nat_ports[uplink_nat_ports]["ip"] gateway_ip = self.uplink_nat_ports[uplink_nat_ports]["gateway_ip"] dut_obj = DUT(lfmgr=self.manager_ip, port=self.manager_http_port, dut_name="upstream", lan_port=gateway_ip) dut_obj.setup() dut_obj.add_ssids() # dut_obj.show_text_blob(None, None, True) # Show changes on GUI # dut_obj.sync_cv() # time.sleep(2) # dut_obj.sync_cv() self.default_scenario_raw_lines.append(["profile_link " + upstream_resources + " upstream-dhcp 1 NA NA " + upstream_port.split(".")[2] + ",AUTO -1 NA"]) self.default_scenario_raw_lines.append( ["profile_link " + uplink_resources + " uplink-nat 1 'DUT: upstream LAN " + gateway_ip + "' NA " + uplink_port.split(".")[2] + "," + upstream_port.split(".")[2] + " -1 NA"]) return self.default_scenario_raw_lines def create_dhcp_external(self): self.setup_connectivity_port(data=self.wan_ports) self.setup_connectivity_port(data=self.lan_ports) for wan_port in self.wan_ports: upstream_port = wan_port upstream_resources = upstream_port.split(".")[0] + "." + upstream_port.split(".")[1] self.default_scenario_raw_lines.append(["profile_link " + upstream_resources + " upstream 1 NA NA " + upstream_port.split(".")[2] + ",AUTO -1 NA"]) return self.default_scenario_raw_lines def json_get(self, _req_url="/"): cli_base = LFCliBase(_lfjson_host=self.manager_ip, _lfjson_port=self.manager_http_port) json_response = cli_base.json_get(_req_url=_req_url) return json_response def json_post(self, _req_url="/", data=None): cli_base = LFCliBase(_lfjson_host=self.manager_ip, _lfjson_port=self.manager_http_port) json_response = cli_base.json_post(_req_url=_req_url, _data=data) return json_response def read_cv_scenario(self): cv_obj = cv_test(lfclient_host=self.manager_ip, lfclient_port=self.manager_http_port) cv_obj.show_text_blob(type="Last-Built-Scenario") data = self.json_get("/text/Last-Built-Scenario.last_built") data = data['record']['text'].split("\n") for d in data: if "scenario-name" in d: self.default_scenario_name = d.split(":")[1][1:] cv_obj.apply_cv_scenario(self.default_scenario_name) time.sleep(2) cv_obj.show_text_blob(type="Network-Connectivity") data = self.json_get("/text/Network-Connectivity." + str(self.default_scenario_name)) data = data["record"]["text"].split("\n") for d in data: self.temp_raw_lines = self.default_scenario_raw_lines.copy() if "profile_link" in d: self.temp_raw_lines.append([d]) logging.info("Saved default CV Scenario details: " + str(self.temp_raw_lines)) def setup_interfaces(self, ssid="", bssid="", passkey="", encryption="", band=None, vlan_id=None, mode=None, num_sta=None, dut_data_=None): logging.info("dut_data_ in setup_interfaces: " + str(dut_data_)) if dut_data_ is None: pytest.skip("No DUT data received") if band not in ["twog", "fiveg", "sixg"]: pytest.skip("Unsupported Band Selected: " + str(band)) if mode not in ["BRIDGE", "NAT-WAN", "NAT-LAN", "VLAN"]: pytest.skip("Unsupported Mode Selected: " + str(mode)) r_val = dict() for dut in self.dut_data: r_val[dut["identifier"]] = None for i in r_val: r_val[i] = dict.fromkeys(["ssid", "bssid", "passkey", "encryption", "upstream_port", "upstream_resource", "upstream", "station_data", "sniff_radio_2g", "sniff_radio_5g", "sniff_radio_6g"]) if mode == "BRIDGE": ret = self.get_wan_upstream_ports() for dut in r_val: if ret.keys().__contains__(dut) and ret[dut] is not None: upstream_data = (ret[dut]).split(".") r_val[dut]["upstream_port"] = ret[dut] upstream_resource = upstream_data[1] r_val[dut]["upstream_resource"] = upstream_resource upstream_data.pop(0) upstream_data.pop(0) upstream = ".".join(upstream_data) r_val[dut]["upstream"] = upstream else: r_val.pop(dut) if mode == "NAT-WAN": ret = self.get_wan_upstream_ports() for dut in r_val: if ret.keys().__contains__(dut) and ret[dut] is not None: upstream_data = (ret[dut]).split(".") r_val[dut]["upstream_port"] = ret[dut] upstream_resource = upstream_data[1] r_val[dut]["upstream_resource"] = upstream_resource upstream_data.pop(0) upstream_data.pop(0) upstream = ".".join(upstream_data) r_val[dut]["upstream"] = upstream else: r_val.pop(dut) if mode == "NAT-LAN": ret = self.get_lan_upstream_ports() temp = r_val.copy() for dut in temp: if ret.keys().__contains__(dut) and ret[dut] is not None: upstream_data = (ret[dut]).split(".") r_val[dut]["upstream_port"] = ret[dut] upstream_resource = upstream_data[1] r_val[dut]["upstream_resource"] = upstream_resource upstream_data.pop(0) upstream_data.pop(0) upstream = ".".join(upstream_data) r_val[dut]["upstream"] = upstream else: r_val.pop(dut) if mode == "VLAN": if vlan_id is None: logging.error("VLAN ID is Unspecified in the VLAN Case") pytest.skip("VLAN ID is Unspecified in the VLAN Case") else: self.add_vlan(vlan_ids=vlan_id) ret = self.get_wan_upstream_ports() for dut in r_val: if ret.keys().__contains__(dut) and ret[dut] is not None: upstream_data = (ret[dut] + "." + str(vlan_id[0])).split(".") r_val[dut]["upstream_port"] = ret[dut] + "." + str(vlan_id[0]) upstream_resource = upstream_data[1] r_val[dut]["upstream_resource"] = upstream_resource upstream_data.pop(0) upstream_data.pop(0) upstream = ".".join(upstream_data) r_val[dut]["upstream"] = upstream else: r_val.pop(dut) dict_all_radios_2g = {"wave2_2g_radios": self.wave2_2g_radios, "wave1_radios": self.wave1_radios, "mtk_radios": self.mtk_radios, "ax200_radios": self.ax200_radios, "ax210_radios": self.ax210_radios} dict_all_radios_5g = {"wave2_5g_radios": self.wave2_5g_radios, "wave1_radios": self.wave1_radios, "mtk_radios": self.mtk_radios, "ax200_radios": self.ax200_radios, "ax210_radios": self.ax210_radios} dict_all_radios_6g = {"ax210_radios": self.ax210_radios} max_station_per_radio = {"wave2_2g_radios": 64, "wave2_5g_radios": 64, "wave1_radios": 64, "mtk_radios": 19, "ax200_radios": 1, "ax210_radios": 1} radio_data = {} sniff_radio = "" sta_prefix = "" data_dict = {} # deleting existing stations and layer 3 # self.pre_cleanup() data_dict["sniff_radio_2g"] = None data_dict["sniff_radio_5g"] = None data_dict["sniff_radio_6g"] = None if band == "twog": sta_prefix = self.twog_prefix data_dict["sta_prefix"] = sta_prefix # checking station compitality of lanforge if int(num_sta) > int(self.max_2g_stations): logging.error("Can't create %s stations on lanforge" % num_sta) pytest.skip("Can't create %s stations on lanforge" % num_sta) # checking atleast one 2g radio is available or not elif len(self.wave2_2g_radios) == 0 and len(self.wave1_radios) == 0 and len(self.ax210_radios) == 0 and len( self.ax200_radios) == 0 and len(self.mtk_radios) == 0: logging.error("Twog radio is not available") pytest.skip("Twog radio is not available") # radio and station selection stations = num_sta for j in dict_all_radios_2g: max_station = max_station_per_radio[j] if stations > 0: if len(dict_all_radios_2g[j]) > 0: diff = max_station - stations for i in dict_all_radios_2g[j]: if diff >= 0: radio_data[i] = stations stations = 0 break elif diff < 0: radio_data[i] = max_station stations = stations - max_station diff = max_station - stations sniff_radio = self.setup_sniffer(band=band, station_radio_data=radio_data) data_dict["sniff_radio_2g"] = sniff_radio if band == "fiveg": sta_prefix = self.fiveg_prefix data_dict["sta_prefix"] = sta_prefix # checking station compitality of lanforge if int(num_sta) > int(self.max_5g_stations): logging.error("Can't create %s stations on lanforge" % num_sta) pytest.skip("Can't create %s stations on lanforge" % num_sta) # checking atleast one 5g radio is available or not elif len(self.wave2_5g_radios) == 0 and len(self.wave1_radios) == 0 and len(self.ax210_radios) == 0 and len( self.ax200_radios) == 0 and len(self.mtk_radios) == 0: logging.error("fiveg radio is not available") pytest.skip("fiveg radio is not available") # radio and station selection stations = num_sta for j in dict_all_radios_5g: max_station = max_station_per_radio[j] if stations > 0: if len(dict_all_radios_5g[j]) > 0: diff = max_station - stations for i in dict_all_radios_5g[j]: if diff >= 0: radio_data[i] = stations stations = 0 break elif diff < 0: radio_data[i] = max_station stations = stations - max_station diff = max_station - stations # setup sniffer sniff_radio = self.setup_sniffer(band=band, station_radio_data=radio_data) data_dict["sniff_radio_5g"] = sniff_radio if band == "sixg": sta_prefix = self.sixg_prefix data_dict["sta_prefix"] = sta_prefix # checking station compitality of lanforge if int(num_sta) > int(self.max_6g_stations): logging.error("Can't create %s stations on lanforge" % num_sta) pytest.skip("Can't create %s stations on lanforge" % num_sta) # checking atleast one 6g radio is available or not elif len(self.ax210_radios) == 0: logging.error("sixg radio is not available") pytest.skip("sixg radio is not available") # radio and station selection stations = num_sta for j in dict_all_radios_6g: max_station = max_station_per_radio[j] if stations > 0: if len(dict_all_radios_6g[j]) > 0: diff = max_station - stations for i in dict_all_radios_6g[j]: if diff >= 0: radio_data[i] = stations stations = 0 break elif diff < 0: radio_data[i] = max_station stations = stations - max_station diff = max_station - stations sniff_radio = self.setup_sniffer(band=band, station_radio_data=radio_data) data_dict["sniff_radio_6g"] = sniff_radio dict_radio_sta_list = {} # list of per radio station length_to_split = list(radio_data.values()) # station list of per radio sta_list = self.get_station_list(num_sta=num_sta, band=band) sta_list = iter(sta_list) sta_list_ = [list(islice(sta_list, elem)) for elem in length_to_split] # Checking station lists according to radios if len(sta_list_) == len(length_to_split): dict_radio_sta_list = dict(zip(list(radio_data.keys()), sta_list_)) for i in dict_radio_sta_list: temp_list = [] shelf_resource = str(i.split(".")[0] + "." + i.split(".")[1] + ".") for j in dict_radio_sta_list[i]: temp_list.append(shelf_resource + j) dict_radio_sta_list[i] = temp_list data_dict["radios"] = dict_radio_sta_list for i in r_val: r_val[i]["station_data"] = data_dict["radios"] r_val[i]["sta_prefix"] = data_dict["sta_prefix"] r_val[i]["sniff_radio_2g"] = data_dict["sniff_radio_2g"] r_val[i]["sniff_radio_5g"] = data_dict["sniff_radio_5g"] r_val[i]["sniff_radio_6g"] = data_dict["sniff_radio_6g"] #mode encryption and band if band == "twog": temp_band = "2G" elif band == "fiveg": temp_band = "5G" elif band == "sixg": temp_band = "6G" if self.run_lf: ssid_data_run_lf = self.run_lf_dut_data() for dut in self.dut_data: if mode != ssid_data_run_lf[dut["identifier"]]["mode"]: pytest.skip("Dut is not configured in mode: " + mode) #"{'706dec0a8a79': {'ssid': 'ssid_wpa_5g_br', 'bssid': '[BLANK]', 'passkey': 'something', 'encryption': 'wpa', 'upstream_port': '1.1.eth2', # 'upstream_resource': '1', 'upstream': 'eth2', 'station_data': {'1.1.wiphy5': ['1.1.ath10k_5g00']}, # 'sniff_radio_2g': None, 'sniff_radio_5g': '1.1.wiphy0', 'sniff_radio_6g': None, 'sta_prefix': 'ath10k_5g0'}}" else: for i in ssid_data_run_lf[dut["identifier"]]["ssid_data"]: if encryption == ssid_data_run_lf[dut["identifier"]]["ssid_data"][i]["encryption"]: if band == ssid_data_run_lf[dut["identifier"]]["ssid_data"][i]["band"]: r_val[dut["identifier"]]["ssid"] = ssid_data_run_lf[dut["identifier"]]["ssid_data"][i]["ssid"] r_val[dut["identifier"]]["passkey"] = ssid_data_run_lf[dut["identifier"]]["ssid_data"][i]["password"] r_val[dut["identifier"]]["encryption"] = ssid_data_run_lf[dut["identifier"]]["ssid_data"][i]["encryption"] r_val[dut["identifier"]]["bssid"] = ssid_data_run_lf[dut["identifier"]]["ssid_data"][i]["bssid"] r_val[dut["identifier"]]["channel"] = ssid_data_run_lf[dut["identifier"]]["radio_data"][temp_band]["channel"] if r_val[dut["identifier"]]["ssid"] is None: logging.warning("Required combination: " + temp_band + ", " + encryption + " is not available") pytest.skip("Expected ssid configuration is not available in DUT") else: for dut in self.dut_data: ssid_data = [] if r_val.keys().__contains__(dut["identifier"]): r_val[dut["identifier"]]["ssid"] = ssid r_val[dut["identifier"]]["passkey"] = passkey r_val[dut["identifier"]]["encryption"] = encryption r_val[dut["identifier"]]["bssid"] = bssid r_val[dut["identifier"]]["channel"] = dut_data_[dut["identifier"]]["radio_data"][temp_band]["channel"] if str(encryption).upper() == "OPEN": ssid_data.append(['ssid_idx=0 ssid=' + ssid + ' bssid=' + str(bssid).upper()]) else: ssid_data.append(['ssid_idx=0 ssid=' + ssid + ' security=' + str(encryption).upper() + ' password=' + passkey + ' bssid=' + str(bssid).upper()]) if str(encryption).upper() in ["OPEN", "WPA", "WPA2", "WPA3", "WEP"]: self.update_duts(identifier=dut["identifier"], ssid_data=ssid_data) return r_val def update_duts(self, identifier=0, ssid_data=[]): for dut_obj in self.dut_objects: if identifier == dut_obj.dut_name: dut_obj.ssid = ssid_data dut_obj.add_ssids() # SSID data should be in this format # [ # ['ssid_idx=0 ssid=Default-SSID-2g security=WPA|WEP| password=12345678 bssid=90:3c:b3:94:48:58'], # ['ssid_idx=1 ssid=Default-SSID-5gl password=12345678 bssid=90:3c:b3:94:48:59'] # ] def setup_relevent_profiles(self): """ TODO Read all Profiles Create upstream-dhcp and uplink-nat profile if they don't exists Create VLAN Based profiles Create 2 Profiles for vlan vlan profile with dhcp server vlan profile without dhcp server """ profile_utility_obj = ProfileUtility(lfclient_host=self.manager_ip, lfclient_port=self.manager_http_port) # Read all Profiles all_profiles = profile_utility_obj.show_profile() logging.info("Profiles: " + str(all_profiles)) # Create upstream-dhcp and uplink-nat profile if they don't exists # Checking availability of upstream-dhcp profile try: upstream_dhcp = profile_utility_obj.check_profile(profile_name="upstream-dhcp") # Checking availability of uplink-nat profile uplink_nat = profile_utility_obj.check_profile(profile_name="uplink-nat") except Exception as e: upstream_dhcp = True uplink_nat = True pass if upstream_dhcp: logging.info("upstream_dhcp profile: Available") profile_utility_obj.remove_profile(name="upstream-dhcp") profile_utility_obj.add_profile(profile_name="upstream-dhcp", profile_type="upstream", profile_flags="DHCP-SERVER") else: profile_utility_obj.add_profile(profile_name="upstream-dhcp", profile_type="upstream", profile_flags="DHCP-SERVER") if uplink_nat: profile_utility_obj.remove_profile(name="uplink-nat") profile_utility_obj.add_profile(profile_name="uplink-nat", profile_type="uplink", profile_flags="NAT") else: profile_utility_obj.add_profile(profile_name="uplink-nat", profile_type="uplink", profile_flags="NAT") # Create VLAN Based profiles if self.scenario == "dhcp-bridge": try: vlan_dhcp_profile = profile_utility_obj.check_profile(profile_name="vlan_dhcp_profile") except Exception as e: vlan_dhcp_profile = True if vlan_dhcp_profile: profile_utility_obj.remove_profile(name="vlan_dhcp_profile") profile_utility_obj.add_profile(profile_name="vlan_dhcp_profile", profile_type="vlan", profile_flags="DHCP-SERVER") else: profile_utility_obj.add_profile(profile_name="vlan_dhcp_profile", profile_type="vlan", profile_flags="DHCP-SERVER") elif self.scenario == "dhcp-external": try: vlan_profile = profile_utility_obj.check_profile(profile_name="vlan_profile") except Exception as e: vlan_profile = True if vlan_profile: profile_utility_obj.remove_profile(name="vlan_profile") profile_utility_obj.add_profile(profile_name="vlan_profile", profile_type="vlan", profile_flags=None) else: profile_utility_obj.add_profile(profile_name="vlan_profile", profile_type="vlan", profile_flags=None) def pre_cleanup(self): """ deleting existing stations and layer 3 connections """ logging.info("Checking existing stations and layer3 connections...") exist_sta = [] for u in self.json_get("/port/?fields=port+type,alias")['interfaces']: if list(u.values())[0]['port type'] not in ['Ethernet', 'WIFI-Radio', 'NA']: exist_sta.append(list(u.values())[0]['alias']) if len(exist_sta) == 0: logging.info("Existing stations are not available") else: for port_eid in exist_sta: self.local_realm.rm_port(port_eid, check_exists=True) time.sleep(0.3) logging.warning("Deleting existing stations") logging.info("Deleted %s Stations" % exist_sta) # deleting the previously created traffic try: exist_l3 = list(filter(lambda cx_name: cx_name if (cx_name != 'handler' and cx_name != 'uri') else False, self.json_get("/cx/?fields=name"))) if len(exist_l3) == 0 or exist_l3[0] == "empty": logging.info("Existing layer3 and endp are not available") else: list(map(lambda i: self.local_realm.rm_cx(cx_name=i), exist_l3)) list(map(lambda cx_name: [self.local_realm.rm_endp(ename=i) for i in [f"{cx_name}-A", f"{cx_name}-B"]], exist_l3)) except Exception as e: logging.error(e) def nametoresource(self, name=None): """Returns resource number""" if name is not None: resource = name.split(".")[1] return resource else: logging.error("Name is not provided") def get_wan_upstream_ports(self): """finding upstream wan ports""" r_val = dict() for dut in self.dut_data: r_val[dut["identifier"]] = None if dut["wan_port"] is not None: if dut["wan_port"] in self.lanforge_data["wan_ports"].keys(): r_val[dut["identifier"]] = dut["wan_port"] return r_val def get_lan_upstream_ports(self): """finding upstream wan ports""" r_val = dict() for dut in self.dut_data: r_val[dut["identifier"]] = None if dut["lan_port"] is not None: if dut["lan_port"] in self.lanforge_data["lan_ports"].keys(): r_val[dut["identifier"]] = dut["lan_port"] return r_val def setup_sniffer(self, band=None, station_radio_data=None): """Setup sniff radio""" sniff_radio = None if band == "twog": all_radio_2g = self.wave2_2g_radios + self.wave1_radios + self.mtk_radios + self.ax200_radios + self.ax210_radios logging.info("All 2g radios" + str(all_radio_2g)) left_radio = list(set(all_radio_2g) - set(list(station_radio_data.keys()))) if len(left_radio) == 0: sniff_radio = None logging.error("Radios are not available for sniffing") else: sniff_radio = left_radio[0] elif band == "fiveg": all_radio_5g = self.wave2_5g_radios + self.wave1_radios + self.mtk_radios + self.ax200_radios + self.ax210_radios logging.info("All 5g radios" + str(all_radio_5g)) left_radio = list(set(all_radio_5g) - set(list(station_radio_data.keys()))) if len(left_radio) == 0: sniff_radio = None logging.error("Radios are not available for sniffing") else: sniff_radio = left_radio[0] elif band == "sixg": all_radio_6g = self.ax210_radios logging.info("All 6g radios" + str(all_radio_6g)) left_radio = list(set(all_radio_6g) - set(list(station_radio_data.keys()))) if len(left_radio) == 0: sniff_radio = None logging.error("Radios are not available for sniffing") else: sniff_radio = left_radio[0] return sniff_radio def scan_ssid(self, radio="", retry=1, allure_attach=True, scan_time=15, ssid=None, ssid_channel=None): '''This method for scan ssid data''' count = 0 sta_list = [] sta_name = str(radio.split(".")[0]) + "." + str(radio.split(".")[1]) + "." + "sta00100" sta_list.append(sta_name) logging.info("scan station: " + str(sta_list)) for i in range(retry + 1): list_data = [] obj_scan = StaScan(host=self.manager_ip, port=self.manager_http_port, ssid="fake ssid", security="open", password="[BLANK]", radio=radio, sta_list=sta_list, csv_output="scan_ssid.csv", scan_time=scan_time) # obj_scan.pre_cleanup() time1 = datetime.now() first = time.mktime(time1.timetuple()) * 1000 obj_scan.build() obj_scan.start() time2 = datetime.now() second = time.mktime(time2.timetuple()) * 1000 diff = int(second - first) try: with open(obj_scan.csv_output, 'r') as file: reader = csv.reader(file) for row in reader: if row[1] == "age": list_data.append(row) continue elif int(row[1]) < diff: list_data.append(row) except Exception as e: logging.error(e) report_obj = Report() csv_data_table = report_obj.table2(list_data) # allure.attach(name="scan_ssid_data", body=csv_data_table) if allure_attach: if i == 0: allure.attach(name="scan_ssid_data", body=csv_data_table) else: allure.attach(name="scan_ssid_data_retry", body=csv_data_table) obj_scan.cleanup() if self.check_ssid_available_scan_result(scan_ssid_data=list_data, ssid=ssid): count = count + 1 return list_data if count == 0: return False def start_sniffer(self, radio_channel=None, radio=None, test_name="sniff_radio", duration=60): self.pcap_name = test_name + ".pcap" self.pcap_obj = SniffRadio(lfclient_host=self.manager_ip, lfclient_port=self.manager_http_port, radio=radio, channel=radio_channel, monitor_name="moni3a") self.pcap_obj.setup(0, 0, 0) time.sleep(5) self.pcap_obj.monitor.admin_up() time.sleep(5) self.pcap_obj.monitor.start_sniff(capname=self.pcap_name, duration_sec=duration) def stop_sniffer(self): self.pcap_obj.monitor.admin_down() time.sleep(2) self.pcap_obj.cleanup() lf_report.pull_reports(hostname=self.manager_ip, port=self.manager_ssh_port, username="lanforge", password="lanforge", report_location="/home/lanforge/" + self.pcap_name, report_dir=".") allure.attach.file(source=self.pcap_name, name="pcap_file", attachment_type=allure.attachment_type.PCAP) logging.info("pcap file name: " + str(self.pcap_name)) return self.pcap_name def check_ssid_available_scan_result(self, scan_ssid_data=None, ssid=None): """This method will check ssid available or not in scan ssid data""" try: flag = False for i in scan_ssid_data: if ssid in i: flag = True if flag: return True else: return False except Exception as e: logging.error(e) def set_radio_channel(self, radio="1.1.wiphy0", channel="AUTO", country=None, antenna=None): # country_code = US(840) try: radio = radio.split(".") shelf = radio[0] resource = radio[1] radio_ = radio[2] local_realm_obj = realm.Realm(lfclient_host=self.manager_ip, lfclient_port=self.manager_http_port) data = { "shelf": shelf, "resource": resource, "radio": radio_, "mode": "NA", "channel": channel } try: if country: # update the dictionary data["country"] = country if antenna: data["antenna"] = antenna except Exception as e: logging.error(f"{e}\nunable to change lanforge radio country code") local_realm_obj.json_post("/cli-json/set_wifi_radio", _data=data) time.sleep(2) except Exception as e: logging.error(e) def get_station_data(self, rows=[], sta_name=[], allure_attach=True, allure_name="station data"): """ Attach station data to allure e.g. rows = ["ip", "signal"] , sta_names = ["1.1.wlan0000", "1.1.wlan0001"] """ # dict for station data sta_dict = {} try: for sta in sta_name: sta_url = "port/" + str(sta.split(".")[0]) + "/" + str(sta.split(".")[1]) + "/" + str(sta.split(".")[2]) station_info = self.json_get(sta_url) dict_data = station_info["interface"] temp_dict = {} for i in rows: temp_dict[i] = dict_data[i] if i == "channel": if dict_data[i] == "-1": self.local_realm.reset_port(sta) time.sleep(5) sta_url_ = "port/" + str(sta.split(".")[0]) + "/" + str(sta.split(".")[1]) + "/" + str( sta.split(".")[2]) station_info_ = self.json_get(sta_url_) dict_data_ = station_info_["interface"] logging.info("station data after port reset" + str(dict_data_)) temp_dict[i] = dict_data_[i] sta_dict[sta] = temp_dict except Exception as e: logging.error(e) logging.info("station info: " + str(sta_dict)) # Creating dict for allure table station_table_dict = {} station_table_dict["station name"] = list(sta_dict.keys()) for i in rows: temp_list = [] for j in sta_name: temp_list.append(sta_dict[j][i]) station_table_dict[i] = temp_list if allure_attach: self.attach_table_allure(data=station_table_dict, allure_name=allure_name) return sta_dict def get_cx_data(self, cx_name=[], cx_data=[], allure_attach=True): """Attach cx data to allure""" url = "cx/all" # cx_data.append("type") dict_cx_data = {} cx_json_data = self.json_get(url) try: for sta_ in cx_name: temp_dict = {} for i in cx_data: temp_dict[i] = cx_json_data[sta_][i] dict_cx_data[sta_] = temp_dict except Exception as e: logging.error(e) cx_table_dict = {} cx_table_dict["cx name"] = list(dict_cx_data.keys()) for i in cx_data: temp_list = [] for j in cx_name: temp_list.append(dict_cx_data[j][i]) if i == "bps rx a": cx_table_dict["Download"] = temp_list elif i == "bps rx b": cx_table_dict["Upload"] = temp_list elif i == "type": cx_table_dict["cx type"] = temp_list if allure_attach: self.attach_table_allure(data=cx_table_dict, allure_name="cx data") return dict_cx_data def get_station_list(self, num_sta=1, band="twog"): """Create station list""" sta_list = [] for i in range(num_sta): if band == "twog": sta_list.append(self.twog_prefix + str(i)) elif band == "fiveg": sta_list.append(self.fiveg_prefix + str(i)) elif band == "sixg": sta_list.append(self.sixg_prefix + str(i)) else: logging.error("band is wrong") return sta_list def attach_table_allure(self, data=None, allure_name=None): """Attach table to allure.data should be dict.""" try: report_obj = Report() data_table = report_obj.table2(table=data, headers='keys') allure.attach(name=allure_name, body=data_table) logging.info(str("\n") + str(data_table)) except Exception as e: logging.error(e) def create_stations(self): pass def delete_stations(self): pass def modify_station(self): pass def read_stations(self): pass def pull_reports(self): pass def get_wifi_radios(self): pass def modify_wifi_radio(self): pass def load_scenario_db(self): pass def delete_dut(self): pass def read_dut(self): pass def update_dut(self): pass def get_ethernet_ports(self): pass def set_ethernet_port(self): pass def clean_port_manager(self): pass def clean_layer3cx(self): pass def station_data_query(self, station_name=None, query=None): """Station data query. station name e.g 1.1.wlan0000""" if station_name is None or query is None: logging.error("Station name or query is missing") x = station_name.split(".") url = f"/port/{x[0]}/{x[1]}/{x[2]}?fields={query}" logging.info("url: " + str(url)) response = self.json_get(_req_url=url) if (response is None) or ("interface" not in response): logging.info("station_list: incomplete response:") # pprint(response) pytest.skip("station_list: incomplete response:") y = response["interface"][query] return y def get_max_sta(self, radio=""): data = self.json_get("/radiostatus/all") return data[radio]["max_vifs"] def add_vlan(self, vlan_ids=[], build=True): self.temp_raw_lines = self.default_scenario_raw_lines.copy() data = self.json_get("/port/all") flag = 0 profile_name = "" port_list = [] vlan_raws = [] # temp_raw_lines = self.default_scenario_raw_lines.copy() for port in self.wan_ports: for vlans in vlan_ids: for i in data["interfaces"]: if list(i.keys())[0] != port + "." + str(vlans): flag = 1 if flag == 1: for vlans in vlan_ids: if self.scenario == "dhcp-bridge": profile_name = "vlan_dhcp_profile" elif self.scenario == "dhcp-external": profile_name = "vlan_profile" port_list.append(str(port) + "." + str(vlans)) vlan_raws.append(["profile_link " + port + " " + profile_name + " 1 " + port + " NA " + port.split(".")[2] + ",AUTO -1 " + str(vlans)]) self.temp_raw_lines.append(["profile_link " + port + " " + profile_name + " 1 " + port + " NA " + port.split(".")[2] + ",AUTO -1 " + str(vlans)]) if build: self.chamber_view(raw_lines="custom") if self.scenario == "dhcp-external": for port in port_list: data = { "shelf": port.split(".")[0], "resource": port.split(".")[1], "port": port.split(".")[2] + "." + port.split(".")[3], "current_flags": 2147483648, "interest": 16384 } self.json_post("/cli-json/set_port", data) time.sleep(2) time.sleep(5) # query and fetch vlan Ip Address port_data = self.json_get(_req_url="/port?fields=alias,parent+dev,port+type,ip,mac")['interfaces'] logging.info("Port data: " + str(port_data)) vlan_table_data = {} port = [] ip = [] parent_dev = [] not_ip_vlans = [] vlan_ip_fail = False for i in port_data: for item in i: if i[item]['port type'] == '802.1Q VLAN' and i[item]['ip'] == '0.0.0.0': vlan_ip_fail = True logging.error(f"VLAN Interface - {i[item]['alias']} do not have IP") port.append(item) ip.append(i[item]['ip']) not_ip_vlans.append(item) parent_dev.append(i[item]['parent dev']) elif i[item]['port type'] == '802.1Q VLAN' and i[item]['ip'] != '0.0.0.0': port.append(item) ip.append(i[item]['ip']) parent_dev.append(i[item]['parent dev']) # creating dict for vlan table vlan_table_data["Port"] = port vlan_table_data["Parent Dev"] = parent_dev vlan_table_data["ip"] = ip # Attaching vlan table to allure self.attach_table_allure(data=vlan_table_data, allure_name="VLAN Table") if vlan_ip_fail: # Fail if Vlan don't have IP pytest.fail("VLAN do not have IP:-" + str(not_ip_vlans)) return vlan_raws else: return vlan_raws def chamber_view(self, delete_old_scenario=True, raw_lines="default"): """create chamber view. raw_lines values are default | custom""" if delete_old_scenario: self.chamberview_object.clean_cv_scenario(scenario_name=self.scenario) # if self.scenario == "dhcp-bridge": # self.create_dhcp_bridge() # logging.info("Scenario name: " + str(self.scenario)) # elif self.scenario == "dhcp-external": # self.create_dhcp_external() # logging.info("Scenario name: " + str(self.scenario)) if raw_lines.lower() == "default": raw_lines_ = self.default_scenario_raw_lines elif raw_lines.lower() == "custom": raw_lines_ = self.temp_raw_lines else: logging.error("raw lines are wrong. Provide default or custom") self.chamberview_object.setup(create_scenario=self.scenario, raw_line=raw_lines_ ) logging.info("Raw Lines type: " + str(raw_lines)) logging.info("Raw Lines: " + str(raw_lines_)) self.chamberview_object.build(self.scenario) self.chamberview_object.sync_cv() time.sleep(2) self.chamberview_object.show_text_blob(None, None, True) # Show changes on GUI self.chamberview_object.sync_cv() time.sleep(15) return self.chamberview_object, self.scenario def setup_radius_server(self, user=""): """ TODO: setup freeradius server on lanforge and return the radius server data setup the radius server for basic EAP-TLS and EAP-TTLS encryptions Special Radius configurations can be done in later implementations Radius server should be working properly on WAN Interface of AP """ pass def enable_verbose_debug(self, radio=None, enable=True): """Increase debug info in wpa-supplicant and hostapd logs""" # radio e.g 1.1wiphy0 if radio is not None: shelf = radio.split(".")[0] resource = radio.split(".")[1] radio_name = radio.split(".")[2] if enable: flag_value = "0x10000" else: flag_value = "0x00000" data = { "shelf": shelf, "resource": resource, "radio": radio_name, "flags": flag_value } self.json_post("/cli-json/set_wifi_radio", data=data) else: logging.error("Radio name is wrong") def attach_report_graphs(self, report_name=None, pdf_name="WIFI Capacity Test PDF Report"): if report_name[-1] == "/": relevant_path = "../reports/" + report_name entries = os.listdir("../reports/" + report_name) else: relevant_path = "../reports/" + report_name + "/" entries = os.listdir("../reports/" + report_name + '/') # logging.info("GETCWD" + str(os.getcwd())) # logging.info("List report dict" + str(os.listdir("../reports"))) # logging.info("CHeck Report directory" + str(os.path.exists(f"../reports/{report_name}"))) logging.info("Entries" + str(entries)) pdf = False for i in entries: if ".pdf" in i: pdf = i if pdf: allure.attach.file(source=relevant_path + pdf, name=pdf_name) included_extensions = ['png'] file_names = [fn for fn in os.listdir(relevant_path) if any(fn.endswith(ext) for ext in included_extensions)] a = [item for item in file_names if 'print' not in item] a = [item for item in a if 'logo' not in item] a = [item for item in a if 'Logo' not in item] a = [item for item in a if 'candela' not in item] a.sort() for i in a: allure.attach.file(source=relevant_path + i, name=i, attachment_type="image/png", extension=None) def read_csv_individual_station_throughput(self, dir_name, option, individual_station_throughput=True, kpi_csv=False, file_name="/csv-data/data-Combined_bps__60_second_running_average-1.csv", batch_size="0"): try: df = pd.read_csv("../reports/" + str(dir_name) + file_name, sep=r'\t', engine='python') logging.info("csv file opened") except FileNotFoundError: logging.info("csv file does not exist") return False if kpi_csv: count = 0 dict_data = {"Down": {}, "Up": {}, "Both": {}} csv_short_dis = df.loc[:,"short-description"] csv_num_score = df.loc[:,"numeric-score"] for i in range(len(batch_size.split(","))): dict_data["Down"][csv_short_dis[count + 0]] = csv_num_score[count + 0] dict_data["Up"][csv_short_dis[count + 1]] = csv_num_score[count + 1] dict_data["Both"][csv_short_dis[count + 2]] = csv_num_score[count + 2] count += 3 if individual_station_throughput: dict_data = {} if option == "download": csv_sta_names = df.iloc[[0]].values.tolist() csv_throughput_values = df.iloc[[1]].values.tolist() elif option == "upload": csv_sta_names = df.iloc[[0]].values.tolist() csv_throughput_values = df.iloc[[2]].values.tolist() else: print("Provide proper option: download or upload") return sta_list = csv_sta_names[0][0][:-1].replace('"', '').split(",") th_list = list(map(float, csv_throughput_values[0][0].split(","))) for i in range(len(sta_list)): dict_data[sta_list[i]] = th_list[i] return dict_data def attach_report_kpi(self, report_name=None, file_name="kpi_file"): if report_name[-1] == "/": path = "../reports/" + str(report_name) + "kpi.csv" else: path = "../reports/" + str(report_name) + "/kpi.csv" if os.path.exists(path): allure.attach.file(source=path, name=file_name, attachment_type="CSV") return os.path.exists(path) def get_supplicant_logs(self, radio="1.1.wiphy0", attach_allure=True): try: resource = radio.split(".")[1] radio = radio.split(".")[2] ip = self.get_manager_from_resource(resource=int(resource)) if ip is not None: supplicant = "/home/lanforge/wifi/wpa_supplicant_log_" + radio + ".txt" obj = SCP_File(ip=ip, port=22, username="root", password="lanforge", remote_path=supplicant, local_path=".") obj.pull_file() if attach_allure: allure.attach.file(source="wpa_supplicant_log_" + radio + ".txt", name="wpa_supplicant_log - " + str(radio)) except Exception as e: logging.error("get_supplicant_logs() - Error in getting supplicant Logs: " + str(e)) def get_resources(self): try: d = self.json_get("/port/?fields=alias,ip") resource_data = dict() for i in d["interfaces"]: if str(list(dict(i).keys())[0]).__contains__("eth0"): resource_data[str(list(dict(i).keys())[0]).split(".")[1]] = i[str(list(dict(i).keys())[0])]["ip"] logging.info("Resource ID and Management port Mapping: " + str(resource_data)) except Exception as e: logging.error(str(e)) return resource_data def get_manager_from_resource(self, resource=1): resource_data = self.get_resources() try: ip = resource_data[str(resource)] except Exception as e: logging.error("Resource is Unavailable when reading manager: " + str(e)) ip = None return ip def client_disconnect(self, station_name=[], clean_l3_traffic=False, clear_all_sta=False): client_dis = CreateStation(_host=self.manager_ip, _port=self.manager_http_port, _sta_list=station_name, _password="passkey", _ssid="ssid", _security="security") if len(station_name) > 0: client_dis.station_profile.cleanup(station_name) elif clear_all_sta: exist_sta = [] for u in client_dis.json_get("/port/?fields=port+type,alias")['interfaces']: if list(u.values())[0]['port type'] not in ['Ethernet', 'WIFI-Radio', 'NA']: exist_sta.append(list(u.values())[0]['alias']) client_dis.station_profile.cleanup(desired_stations=exist_sta) if clean_l3_traffic: try: exist_l3 = list( filter(lambda cx_name: cx_name if (cx_name != 'handler' and cx_name != 'uri') else False, client_dis.json_get("/cx/?fields=name"))) list(map(lambda i: client_dis.rm_cx(cx_name=i), exist_l3)) list(map(lambda cx_name: [client_dis.rm_endp(ename=i) for i in [f"{cx_name}-A", f"{cx_name}-B"]], exist_l3)) except Exception as e: logging.error(e) return True def run_lf_dut_data(self): """Dut data for running test cases only on lanforge""" ret = {} try: for i in self.dut_data: ret = {i["identifier"]: i["ssid"]} except Exception as e: logging.error(e) return ret def check_band_ap(self, band=None): """"This method will check given band ap is supporting or not""" temp_band = None if band is not None: if band == "twog" or band == "2G": temp_band = "2G" elif band == "fiveg" or band == "5G": temp_band = "5G" elif band == "sixg" or band == "6G": temp_band = "6G" print(self.dut_data) if len(self.dut_data) == 1: for ap in self.dut_data: print(ap) if temp_band not in ap["supported_bands"]: logging.error("AP is not supporting " + temp_band + " Band") pytest.skip("AP is not supporting " + temp_band + " Band") else: pass def lf_sixg_lookup_validation(self, channel=None): """ This method will return ap equaivalent 6g channel for lanforge. channel should be ap 6g channel """ ap_lanforge_6g_channel_dict = { 1: 191, 5: 195, 9: 199, 13: 203, 17: 207, 21: 211, 25: 215, 29: 219, 33: 223, 37: 227, 41: 231, 45: 235, 49: 239, 53: 243, 57: 247, 61: 251, 65: 255, 69: 259, 73: 262, 77: 266, 81: 270, 85: 274, 89: 278, 93: 282, 97: 286, 101: 290, 105: 294, 109: 298, 113: 302, 117: 306, 121: 310, 125: 314, 129: 318, 133: 322, 137: 326, 141: 330, 145: 334, 149: 339, 153: 343, 157: 347, 161: 351, 165: 355, 169: 359, 173: 363, 177: 367, 181: 371, 185: 375, 189: 379, 193: 383, 197: 387, 201: 391, 205: 395, 209: 399, 213: 403, 217: 407, 221: 411, 225: 415, 229: 419, 233: 423 } if channel is not None: channel = int(channel) return ap_lanforge_6g_channel_dict[channel] else: return None def attenuator_serial(self): self.obj = Attenuator( lfclient_host= self.manager_ip, lfclient_port=self.manager_http_port ) val = self.obj.show() return val def attenuator_modify(self, serno, idx, val): atten_obj = Attenuator_modify(self.manager_ip, self.manager_http_port, serno, idx, val) atten_obj.build() def read_csv_individual_station_throughput(self, dir_name, option, individual_station_throughput=True, kpi_csv=False, file_name="/csv-data/data-Combined_bps__60_second_running_average-1.csv", batch_size="0"): try: df = pd.read_csv( "../reports/" + str(dir_name) + file_name, sep=r'\t', engine='python') print("csv file opened") except FileNotFoundError: print("csv file does not exist") return False if kpi_csv: count = 0 dict_data = {"Down": {}, "Up": {}, "Both": {}} csv_short_dis = df.loc[:, "short-description"] csv_num_score = df.loc[:, "numeric-score"] for i in range(len(batch_size.split(","))): dict_data["Down"][csv_short_dis[count + 0]] = csv_num_score[count + 0] dict_data["Up"][csv_short_dis[count + 1]] = csv_num_score[count + 1] dict_data["Both"][csv_short_dis[count + 2]] = csv_num_score[count + 2] count += 3 if individual_station_throughput: dict_data = {} if option == "download": csv_sta_names = df.iloc[[0]].values.tolist() csv_throughput_values = df.iloc[[1]].values.tolist() elif option == "upload": csv_sta_names = df.iloc[[0]].values.tolist() csv_throughput_values = df.iloc[[2]].values.tolist() else: print("Provide proper option: download or upload") return sta_list = csv_sta_names[0][0][:-1].replace('"', '').split(",") th_list = list(map(float, csv_throughput_values[0][0].split(","))) for i in range(len(sta_list)): dict_data[sta_list[i]] = th_list[i] return dict_data def create_layer3(self, side_a_min_rate, side_a_max_rate, side_b_min_rate, side_b_max_rate, traffic_type, sta_list, side_b=""): # checked print(sta_list) print(type(sta_list)) if side_b == "": side_b = self.wan_ports side_b_ = list(side_b.keys()) side_b = side_b_[0] print(side_b) print(type(side_b)) else: print(side_b) print(type(side_b)) local_realm = realm.Realm(lfclient_host=self.manager_ip, lfclient_port=self.manager_http_port) cx_profile = local_realm.new_l3_cx_profile() cx_profile.host = self.manager_ip cx_profile.port = self.manager_http_port layer3_cols = ['name', 'tx bytes', 'rx bytes', 'tx rate', 'rx rate'] cx_profile.side_a_min_bps = side_a_min_rate cx_profile.side_a_max_bps = side_a_max_rate cx_profile.side_b_min_bps = side_b_min_rate cx_profile.side_b_max_bps = side_b_max_rate # create cx_profile.create(endp_type=traffic_type, side_a=sta_list, side_b=side_b, sleep_time=0) cx_profile.start_cx() def l3_cleanup(self): local_realm = realm.Realm(lfclient_host=self.manager_ip, lfclient_port=self.manager_http_port) local_realm.remove_all_cxs(remove_all_endpoints=True) def get_cx_list(self): local_realm = realm.Realm(lfclient_host=self.manager_ip, lfclient_port=self.manager_http_port) layer3_result = local_realm.cx_list() layer3_names = [item["name"] for item in layer3_result.values() if "_links" in item] print(layer3_names) return layer3_names def allure_report_table_format(self, dict_data=None, key=None, value=None, name=None):#, value_on_same_table=True): report_obj = Report() data_table, dict_table = "", {} dict_table[key] = list(dict_data.keys()) dict_table[value] = list(dict_data.values()) try: data_table = report_obj.table2(table=dict_table, headers='keys') except Exception as e: print(e) if name != None: allure.attach(name=name, body=str(data_table)) def client_connect_using_radio(self, ssid="[BLANK]", passkey="[BLANK]", security="wpa2", mode="BRIDGE", vlan_id=100, radio=None, sta_mode=0, station_name=[]): self.client_connect = CreateStation(_host=self.manager_ip, _port=self.manager_http_port, _mode=sta_mode, _sta_list=station_name, _password=passkey, _ssid=ssid, _security=security) # self.client_connect.station_profile.sta_mode = sta_mode self.client_connect.upstream_resource = 1 if mode == "BRIDGE": self.client_connect.upstream_port = self.wan_ports elif mode == "NAT": self.client_connect.upstream_port = self.wan_ports else: self.client_connect.upstream_port = self.wan_ports + "." + str(vlan_id) self.client_connect.radio = radio self.client_connect.build() # self.client_connect.wait_for_ip(station_name, timeout_sec=100) # print(self.client_connect.wait_for_ip(station_name)) if self.client_connect.wait_for_ip(station_name, timeout_sec=100): self.client_connect._pass("ALL Stations got IP's", print_=True) return self.client_connect else: return False class Report: def __init__(self, key1=None, key2=None, val1=None, val2=None): self.key1 = key1 self.key2 = key2 self.val1 = val1 self.val2 = val2 def table1(self): table = {str(self.key1): self.val1, str(self.key2): self.val2} x = tabulate(table, headers="keys", tablefmt="fancy_grid") return x def table2(self, table=None, headers='firstrow', tablefmt='fancy_grid'): self.table = table x = tabulate(self.table, headers=headers, tablefmt=tablefmt) return x class SCP_File: def __init__(self, ip="localhost", port=22, username="lanforge", password="lanforge", remote_path="/home/lanforge/", local_path="."): self.ip = ip self.port = port self.remote_path = remote_path self.local_path = local_path self.username = username self.password = password def pull_file(self): ssh = paramiko.SSHClient() ssh.load_system_host_keys() ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) ssh.connect(hostname=self.ip, username=self.username, password=self.password, port=self.port, allow_agent=False, look_for_keys=False) # ssh.close() with SCPClient(ssh.get_transport()) as scp: scp.get(remote_path=self.remote_path, local_path=self.local_path, recursive=True) scp.close() def save_current_scenario(self): pass