diff --git a/lf_libs/lf_libs.py b/lf_libs/lf_libs.py index bf4700c8..54071aab 100644 --- a/lf_libs/lf_libs.py +++ b/lf_libs/lf_libs.py @@ -9,6 +9,7 @@ import paramiko import pytest from scp import SCPClient from tabulate import tabulate +from itertools import islice sys.path.append(os.path.join(os.path.abspath(__file__ + "../../../"))) lfcli_base = importlib.import_module("py-json.LANforge.lfcli_base") @@ -64,6 +65,11 @@ class lf_libs: max_6g_stations = None max_ax_stations = None max_ac_stations = None + twog_prefix = "ath10k_2g0" + fiveg_prefix = "ath10k_5g0" + sixg_prefix = "AX210_0" + ax_prefix = "AX200_0" + pcap_obj = None """ Scenario : dhcp-bridge / dhcp-external dhcp-bridge - wan_ports will act as dhcp server for AP's and it will use uplink_nat_ports for uplink NAT @@ -186,9 +192,7 @@ class lf_libs: """ def setup_dut(self): - print(self.dut_data) for index in range(0, len(self.dut_data)): - print(self.testbed) dut_obj = DUT(lfmgr=self.manager_ip, port=self.manager_http_port, dut_name=self.testbed + "-" + str(index), @@ -198,7 +202,7 @@ class lf_libs: serial_num=self.dut_data[index]["identifier"]) dut_obj.setup() dut_obj.add_ssids() - print("Creating DUT") + logging.info("Creating DUT") def setup_metadata(self): data = self.json_get("/port/all") @@ -289,7 +293,6 @@ class lf_libs: if len(data) == 0: return for eth_port in data: - print(eth_port) if data[eth_port]["addressing"] == "dhcp-server": return elif data[eth_port]["addressing"] == "static": @@ -393,6 +396,196 @@ class lf_libs: temp_raw_lines.append([d]) logging.info("Saved default CV Scenario details: " + str(temp_raw_lines)) + def setup_interfaces(self, band=None, vlan_id=None, mode=None, num_sta=None): + if band is None: + logging.error("Band value is not available.") + pytest.exit("Band value is not available.") + if mode is None: + logging.error("mode value is not available") + pytest.exit("mode value is not available") + if num_sta is None: + logging.error("Number of stations are not available") + pytest.exit("Number of stations are not available") + if mode == "BRIDGE": + upstream_port = self.upstream_port() + elif mode == "NAT-WAN": + upstream_port = self.upstream_port() + elif mode == "NAT-LAN": + upstream_port = self.upstream_port() + elif mode == "VLAN": + # for vlan mode vlan id should be available + if vlan_id is not None: + upstream_port = self.upstream_port() + "." + str(vlan_id) + else: + logging.error("Vlan id is not available for vlan") + pytest.exit("Vlan id is not available for vlan") + else: + logging.error("Mode value is wrong.Value e.g. BRIDGE or NAT or VLAN") + pytest.exit("Mode value is wrong.Value e.g. BRIDGE or NAT or VLAN") + radio_data = {} + sta_prefix = "" + sniff_radio = "" + data_dict = {} + # deleting existing stations and layer 3 + self.pre_cleanup() + max_station_per_radio = {"wave2_2g_radios": 64, "wave2_5g_radios": 64, "wave1_radios": 64, "mtk_radios": 19, + "ax200_radios": 1, "ax210_radios": 1} + if band == "twog": + if self.run_lf: + for i in self.dut_data: + ssid = i["ssid"]["2g-ssid"] + passkey = i["ssid"]["2g-password"] + security = i["ssid"]["2g-encryption"].lower() + sta_prefix = self.twog_prefix + # checking station compitality of lanforge + if int(num_sta) > int(self.max_2g_stations): + logging.error("Can't create %s stations on lanforge" % num_sta) + pytest.skip("Can't create %s stations on lanforge" % num_sta) + # checking atleast one 2g radio is available or not + elif len(self.wave2_2g_radios) == 0 and len(self.wave1_radios) and len(self.ax210_radios) == 0 and len( + self.ax200_radios) == 0 and len(self.mtk_radios) == 0: + logging.error("Twog radio is not available") + pytest.skip("Twog radio is not available") + + dict_all_radios_2g = {"wave2_2g_radios": self.wave2_2g_radios, + "wave1_radios": self.wave1_radios, "mtk_radios": self.mtk_radios, + "ax200_radios": self.ax200_radios, + "ax210_radios": self.ax210_radios} + + # radio and station selection + stations = num_sta + for j in dict_all_radios_2g: + max_station = max_station_per_radio[j] + if stations > 0: + if len(dict_all_radios_2g[j]) > 0: + diff = max_station - stations + for i in dict_all_radios_2g[j]: + if diff >= 0: + radio_data[i] = stations + stations = 0 + break + elif diff < 0: + radio_data[i] = max_station + stations = stations - max_station + diff = max_station - stations + # setup sniffer + sniff_radio = self.setup_sniffer(band=band, station_radio_data=radio_data) + data_dict["sniff_radio_2g"] = sniff_radio + if band == "fiveg": + if self.run_lf: + for i in self.dut_data: + ssid = i["ssid"]["2g-ssid"] + passkey = i["ssid"]["2g-password"] + security = i["ssid"]["2g-encryption"].lower() + + sta_prefix = self.fiveg_prefix + # checking station compitality of lanforge + if int(num_sta) > int(self.max_5g_stations): + logging.error("Can't create %s stations on lanforge" % num_sta) + pytest.skip("Can't create %s stations on lanforge" % num_sta) + # checking atleast one 5g radio is available or not + elif len(self.wave2_5g_radios) == 0 and len(self.wave1_radios) and len(self.ax210_radios) == 0 and len( + self.ax200_radios) == 0 and len(self.mtk_radios) == 0: + logging.error("fiveg radio is not available") + pytest.skip("fiveg radio is not available") + + dict_all_radios_5g = {"wave2_5g_radios": self.wave2_5g_radios, + "wave1_radios": self.wave1_radios, "mtk_radios": self.mtk_radios, + "ax200_radios": self.ax200_radios, + "ax210_radios": self.ax210_radios} + + # radio and station selection + stations = num_sta + for j in dict_all_radios_5g: + max_station = max_station_per_radio[j] + if stations > 0: + if len(dict_all_radios_5g[j]) > 0: + diff = max_station - stations + for i in dict_all_radios_5g[j]: + if diff >= 0: + radio_data[i] = stations + stations = 0 + break + elif diff < 0: + radio_data[i] = max_station + stations = stations - max_station + diff = max_station - stations + # setup sniffer + sniff_radio = self.setup_sniffer(band=band, station_radio_data=radio_data) + data_dict["sniff_radio_5g"] = sniff_radio + if band == "sixg": + if self.run_lf: + for i in self.dut_data: + ssid = i["ssid"]["6g-ssid"] + passkey = i["ssid"]["6g-password"] + security = i["ssid"]["6g-encryption"].lower() + + sta_prefix = self.sixg_prefix + # checking station compitality of lanforge + if int(num_sta) > int(self.max_6g_stations): + logging.error("Can't create %s stations on lanforge" % num_sta) + pytest.skip("Can't create %s stations on lanforge" % num_sta) + # checking atleast one 6g radio is available or not + elif len(self.ax210_radios) == 0: + logging.error("sixg radio is not available") + pytest.skip("sixg radio is not available") + + dict_all_radios_6g = {"ax210_radios": self.ax210_radios} + + # radio and station selection + stations = num_sta + for j in dict_all_radios_6g: + max_station = max_station_per_radio[j] + if stations > 0: + if len(dict_all_radios_6g[j]) > 0: + diff = max_station - stations + for i in dict_all_radios_6g[j]: + if diff >= 0: + radio_data[i] = stations + stations = 0 + break + elif diff < 0: + radio_data[i] = max_station + stations = stations - max_station + diff = max_station - stations + + sniff_radio = self.setup_sniffer(band=band, station_radio_data=radio_data) + data_dict["sniff_radio_6g"] = sniff_radio + # creating dict of radio and station_list + dict_radio_sta_list = {} + # list of per radio station + length_to_split = list(radio_data.values()) + # station list of per radio + sta_list = self.get_station_list(num_sta=num_sta, band=band) + sta_list = iter(sta_list) + sta_list_ = [list(islice(sta_list, elem)) + for elem in length_to_split] + # Checking station lists according to radios + if len(sta_list_) == len(length_to_split): + dict_radio_sta_list = dict(zip(list(radio_data.keys()), sta_list_)) + for i in dict_radio_sta_list: + temp_list = [] + shelf_resource = str(i.split(".")[0] + "." + i.split(".")[1] + ".") + for j in dict_radio_sta_list[i]: + temp_list.append(shelf_resource + j) + dict_radio_sta_list[i] = temp_list + + if self.run_lf: + data_dict["radios"] = dict_radio_sta_list + data_dict["upstream_port"] = upstream_port + data_dict["ssid"] = ssid + data_dict["passkey"] = passkey + data_dict["security"] = security + data_dict["sta_prefix"] = sta_prefix + # data_dict["sniff_radio"] = sniff_radio + return data_dict + else: + data_dict["radios"] = dict_radio_sta_list + data_dict["upstream_port"] = upstream_port + data_dict["sta_prefix"] = sta_prefix + # data_dict["sniff_radio"] = sniff_radio + return data_dict + def setup_relevent_profiles(self): """ TODO Read all Profiles @@ -457,6 +650,265 @@ class lf_libs: else: profile_utility_obj.add_profile(profile_name="vlan_profile", profile_type="vlan", profile_flags=None) + def pre_cleanup(self): + """ deleting existing stations and layer 3 connections """ + logging.info("Checking existing stations and layer3 connections...") + exist_sta = [] + for u in self.json_get("/port/?fields=port+type,alias")['interfaces']: + if list(u.values())[0]['port type'] not in ['Ethernet', 'WIFI-Radio', 'NA']: + exist_sta.append(list(u.values())[0]['alias']) + if len(exist_sta) == 0: + logging.info("Existing stations are not available") + else: + for port_eid in exist_sta: + self.staConnect.rm_port(port_eid, check_exists=True) + time.sleep(0.3) + logging.warning("Deleting existing stations") + logging.info("Deleted %s Stations" % exist_sta) + + # deleting the previously created traffic + try: + exist_l3 = list(filter(lambda cx_name: cx_name if (cx_name != 'handler' and cx_name != 'uri') else False, + self.json_get("/cx/?fields=name"))) + if len(exist_l3) == 0 or exist_l3[0] == "empty": + logging.info("Existing layer3 and endp are not available") + else: + list(map(lambda i: self.staConnect.rm_cx(cx_name=i), exist_l3)) + list(map(lambda cx_name: [self.staConnect.rm_endp(ename=i) for i in [f"{cx_name}-A", f"{cx_name}-B"]], + exist_l3)) + except Exception as e: + logging.error(e) + + def nametoresource(self, name=None): + """Returns resource number""" + if name is not None: + resource = name.split(".")[1] + return resource + else: + logging.error("Name is not provided") + + def upstream_port(self): + """finding upstream port""" + upstream_port = "" + for i in self.dut_data: + upstream_port = i["wan_port"] + return upstream_port + + def setup_sniffer(self, band=None, station_radio_data=None): + """Setup sniff radio""" + sniff_radio = None + if band == "twog": + all_radio_2g = self.wave2_2g_radios + self.wave1_radios + self.mtk_radios + self.ax200_radios + self.ax210_radios + logging.info("All 2g radios" + str(all_radio_2g)) + left_radio = list(set(all_radio_2g) - set(list(station_radio_data.keys()))) + if len(left_radio) == 0: + sniff_radio = None + logging.error("Radios are not available for sniffing") + else: + sniff_radio = left_radio[0] + elif band == "fiveg": + all_radio_5g = self.wave2_5g_radios + self.wave1_radios + self.mtk_radios + self.ax200_radios + self.ax210_radios + logging.info("All 5g radios" + str(all_radio_5g)) + left_radio = list(set(all_radio_5g) - set(list(station_radio_data.keys()))) + if len(left_radio) == 0: + sniff_radio = None + logging.error("Radios are not available for sniffing") + else: + sniff_radio = left_radio[0] + elif band == "sixg": + all_radio_6g = self.ax210_radios + logging.info("All 6g radios" + str(all_radio_6g)) + left_radio = list(set(all_radio_6g) - set(list(station_radio_data.keys()))) + if len(left_radio) == 0: + sniff_radio = None + logging.error("Radios are not available for sniffing") + else: + sniff_radio = left_radio[0] + return sniff_radio + + def scan_ssid(self, radio="", retry=1, allure_attach=True, scan_time=15, ssid=None, ssid_channel=None): + '''This method for scan ssid data''' + count = 0 + sta_list = [] + sta_name = str(radio.split(".")[0]) + "." + str(radio.split(".")[1]) + "." + "sta00100" + sta_list.append(sta_name) + logging.info("scan station: " + str(sta_list)) + for i in range(retry + 1): + list_data = [] + obj_scan = StaScan(host=self.manager_ip, port=self.manager_http_port, ssid="fake ssid", security="open", + password="[BLANK]", radio=radio, sta_list=sta_list, csv_output="scan_ssid.csv", + scan_time=scan_time) + # obj_scan.pre_cleanup() + time1 = datetime.now() + first = time.mktime(time1.timetuple()) * 1000 + obj_scan.build() + obj_scan.start() + time2 = datetime.now() + second = time.mktime(time2.timetuple()) * 1000 + diff = int(second - first) + try: + with open(obj_scan.csv_output, 'r') as file: + reader = csv.reader(file) + for row in reader: + if row[1] == "age": + list_data.append(row) + continue + elif int(row[1]) < diff: + list_data.append(row) + except Exception as e: + logging.error(e) + report_obj = Report() + csv_data_table = report_obj.table2(list_data) + # allure.attach(name="scan_ssid_data", body=csv_data_table) + if allure_attach: + allure.attach(name="scan_ssid_data_" + str(i + 1), body=csv_data_table) + obj_scan.cleanup() + if self.check_ssid_available_scan_result(scan_ssid_data=list_data, ssid=ssid): + count = count + 1 + return list_data + if count == 0: + return False + + def start_sniffer(self, radio_channel=None, radio=None, test_name="sniff_radio", duration=60): + self.pcap_name = test_name + ".pcap" + self.pcap_obj = SniffRadio(lfclient_host=self.manager_ip, lfclient_port=self.manager_http_port, radio=radio, + channel=radio_channel, monitor_name="moni3a") + self.pcap_obj.setup(0, 0, 0) + time.sleep(5) + self.pcap_obj.monitor.admin_up() + time.sleep(5) + self.pcap_obj.monitor.start_sniff(capname=self.pcap_name, duration_sec=duration) + + def stop_sniffer(self): + self.pcap_obj.monitor.admin_down() + time.sleep(2) + self.pcap_obj.cleanup() + lf_report.pull_reports(hostname=self.manager_ip, port=self.manager_ssh_port, username="lanforge", + password="lanforge", + report_location="/home/lanforge/" + self.pcap_name, + report_dir=".") + allure.attach.file(source=self.pcap_name, + name="pcap_file", attachment_type=allure.attachment_type.PCAP) + logging.info("pcap file name: " + str(self.pcap_name)) + return self.pcap_name + + def check_ssid_available_scan_result(self, scan_ssid_data=None, ssid=None): + """This method will check ssid available or not in scan ssid data""" + try: + flag = False + for i in scan_ssid_data: + if ssid in i: + flag = True + if flag: + return True + else: + return False + except Exception as e: + logging.error(e) + + def set_radio_channel(self, radio="1.1.wiphy0", channel="AUTO"): + try: + radio = radio.split(".") + shelf = radio[0] + resource = radio[1] + radio_ = radio[2] + local_realm_obj = realm.Realm(lfclient_host=self.manager_ip, lfclient_port=self.manager_http_port) + data = { + "shelf": shelf, + "resource": resource, + "radio": radio_, + "mode": "NA", + "channel": channel + } + local_realm_obj.json_post("/cli-json/set_wifi_radio", _data=data) + time.sleep(2) + except Exception as e: + logging.error(e) + + def get_station_data(self, rows=[], sta_name=[], allure_attach=True): + """ + Attach station data to allure + e.g. rows = ["ip", "signal"] , sta_names = ["1.1.wlan0000", "1.1.wlan0001"] + """ + # dict for station data + sta_dict = {} + try: + for sta in sta_name: + sta_url = "port/" + str(sta.split(".")[0]) + "/" + str(sta.split(".")[1]) + "/" + str(sta.split(".")[2]) + station_info = self.staConnect.json_get(sta_url) + dict_data = station_info["interface"] + temp_dict = {} + for i in rows: + temp_dict[i] = dict_data[i] + sta_dict[sta] = temp_dict + except Exception as e: + logging.error(e) + logging.info("station info: " + str(sta_dict)) + # Creating dict for allure table + station_table_dict = {} + station_table_dict["station name"] = list(sta_dict.keys()) + for i in rows: + temp_list = [] + for j in sta_name: + temp_list.append(sta_dict[j][i]) + station_table_dict[i] = temp_list + if allure_attach: + self.attach_table_allure(data=station_table_dict, allure_name="station data") + return sta_dict + + def get_cx_data(self, cx_name=[], cx_data=[], allure_attach=True): + """Attach cx data to allure""" + url = "cx/all" + # cx_data.append("type") + dict_cx_data = {} + cx_json_data = self.json_get(url) + try: + for sta_ in cx_name: + temp_dict = {} + for i in cx_data: + temp_dict[i] = cx_json_data[sta_][i] + dict_cx_data[sta_] = temp_dict + except Exception as e: + logging.error(e) + cx_table_dict = {} + cx_table_dict["cx name"] = list(dict_cx_data.keys()) + for i in cx_data: + temp_list = [] + for j in cx_name: + temp_list.append(dict_cx_data[j][i]) + if i == "bps rx a": + cx_table_dict["Download"] = temp_list + elif i == "bps rx b": + cx_table_dict["Upload"] = temp_list + elif i == "type": + cx_table_dict["cx type"] = temp_list + if allure_attach: + self.attach_table_allure(data=cx_table_dict, allure_name="cx data") + return dict_cx_data + + def get_station_list(self, num_sta=1, band="twog"): + """Create station list""" + sta_list = [] + for i in range(num_sta): + if band == "twog": + sta_list.append(self.twog_prefix + str(i)) + elif band == "fiveg": + sta_list.append(self.fiveg_prefix + str(i)) + elif band == "sixg": + sta_list.append(self.sixg_prefix + str(i)) + else: + logging.error("band is wrong") + return sta_list + + def attach_table_allure(self, data=None, allure_name=None): + """Attach table to allure.data should be dict.""" + try: + report_obj = Report() + data_table = report_obj.table2(table=data, headers='keys') + allure.attach(name=allure_name, body=data_table) + except Exception as e: + logging.error(e) + def create_stations(self): pass @@ -514,7 +966,6 @@ class lf_libs: for port in self.wan_ports: for vlans in vlan_ids: for i in data["interfaces"]: - print(i) if list(i.keys())[0] != port + "." + str(vlans): flag = 1 if flag == 1: @@ -527,10 +978,8 @@ class lf_libs: temp_raw_lines.append(["profile_link " + port + " " + profile_name + " 1 " + port + " NA " + port.split(".")[2] + ",AUTO -1 " + str(vlans)]) - print(temp_raw_lines) self.chamber_view(raw_lines=temp_raw_lines) if self.scenario == "dhcp-external": - print(port_list) for port in port_list: data = { "shelf": port.split(".")[0], @@ -543,7 +992,6 @@ class lf_libs: time.sleep(2) def chamber_view(self, delete_old_scenario=True, raw_lines=[]): - print(self.chamberview_object) if delete_old_scenario: self.chamberview_object.clean_cv_scenario(scenario_name=self.scenario) # if self.scenario == "dhcp-bridge":