diff --git a/lf_libs/lf_tests.py b/lf_libs/lf_tests.py index 47314bb2..00fe99ab 100644 --- a/lf_libs/lf_tests.py +++ b/lf_libs/lf_tests.py @@ -55,7 +55,7 @@ class lf_tests(lf_libs): # self.skip_pcap = skip_pcap #self.wan_upstream = list(self.wan_ports.keys()) # self.lan_upstream = - self.staConnect = StaConnect2(self.manager_ip, self.manager_http_port, outfile="shivam") + self.staConnect = StaConnect2(self.manager_ip, self.manager_http_port, outfile="shivam", _cleanup_on_exit=False) def setup_interfaces(self, band=None, vlan_id=None, mode=None, num_sta=None): if band is None: @@ -96,8 +96,8 @@ class lf_tests(lf_libs): passkey = i["ssid"]["2g-password"] security = i["ssid"]["2g-encryption"].lower() sta_prefix = self.twog_prefix - # checking station compitality of lanforge + print(num_sta, self.max_2g_stations) if int(num_sta) > int(self.max_2g_stations): logging.error("Can't create %s stations on lanforge" % num_sta) pytest.exit() @@ -141,12 +141,12 @@ class lf_tests(lf_libs): # checking station compitality of lanforge if int(num_sta) > int(self.max_5g_stations): logging.error("Can't create %s stations on lanforge" % num_sta) - pytest.exit() + pytest.skip("Can't create %s stations on lanforge" % num_sta) # checking atleast one 5g radio is available or not elif len(self.wave2_5g_radios) == 0 and len(self.wave1_radios) and len(self.ax210_radios) == 0 and len( self.ax200_radios) == 0 and len(self.mtk_radios) == 0: logging.error("fiveg radio is not available") - pytest.exit() + pytest.skip("fiveg radio is not available") dict_all_radios_5g = {"wave2_5g_radios": self.wave2_5g_radios, "wave1_radios": self.wave1_radios, "mtk_radios": self.mtk_radios, @@ -170,6 +170,42 @@ class lf_tests(lf_libs): stations = stations - max_station diff = max_station - stations + if band == "sixg": + if self.run_lf: + for i in self.dut_data: + ssid = i["ssid"]["6g-ssid"] + passkey = i["ssid"]["6g-password"] + security = i["ssid"]["6g-encryption"].lower() + + sta_prefix = self.sixg_prefix + # checking station compitality of lanforge + if int(num_sta) > int(self.max_6g_stations): + logging.error("Can't create %s stations on lanforge" % num_sta) + pytest.exit() + # checking atleast one 6g radio is available or not + elif len(self.ax210_radios) == 0: + logging.error("sixg radio is not available") + pytest.exit() + + dict_all_radios_6g = {"ax210_radios": self.ax210_radios} + + # radio and station selection + stations = num_sta + for j in dict_all_radios_6g: + max_station = max_station_per_radio[j] + if stations > 0: + if len(dict_all_radios_6g[j]) > 0: + diff = max_station - stations + for i in dict_all_radios_6g[j]: + if diff >= 0: + radio_data[i] = stations + stations = 0 + break + elif diff < 0: + radio_data[i] = max_station + stations = stations - max_station + diff = max_station - stations + sniff_radio = self.setup_sniffer(band=band, station_radio_data=radio_data) data_dict = {} if self.run_lf: @@ -213,7 +249,7 @@ class lf_tests(lf_libs): logging.info("Existing layer3 and endp are not available") else: list(map(lambda i: self.staConnect.rm_cx(cx_name=i), exist_l3)) - list(map(lambda cx_name: [self.rm_endp(ename=i) for i in [f"{cx_name}-A", f"{cx_name}-B"]], exist_l3)) + list(map(lambda cx_name: [self.staConnect.rm_endp(ename=i) for i in [f"{cx_name}-A", f"{cx_name}-B"]], exist_l3)) except Exception as e: logging.error(e) @@ -260,8 +296,7 @@ class lf_tests(lf_libs): return sniff_radio def client_connectivity_test(self, ssid="[BLANK]", passkey="[BLANK]", security="open", extra_securities=[], - station_name=[], mode="BRIDGE", vlan_id=1, band="twog", ssid_channel=None, - outfile="shivam"): + station_name=[], mode="BRIDGE", vlan_id=1, band="twog", ssid_channel=None, allure_attch=True): # self.staConnect = StaConnect2(self.manager_ip, self.manager_http_port, debug_=self.debug) # setup_interfaces() interface selection return radio name along no of station on each radio, upstream port # @@ -281,7 +316,7 @@ class lf_tests(lf_libs): length_to_split = list(data["radios"].values()) print(length_to_split) sta_list = iter(station_name) - # station list of per radio station list + # station list of per radio sta_list_ = [list(islice(sta_list, elem)) for elem in length_to_split] # Checking station lists according to radios @@ -300,14 +335,14 @@ class lf_tests(lf_libs): #changed to auto channel self.set_radio_channel(radio=radio, channel="AUTO") print("scan ssid radio", radio.split(".")[2]) - result = self.scan_ssid(radio=radio.split(".")[2]) + result = self.scan_ssid(radio=radio.split(".")[2], ssid=ssid, ssid_channel=ssid_channel) print("ssid scan data :- ", result) if not result and ssid_channel: #Sniffer required - # print("sniff radio", data["sniff_radio"].split(".")[2]) - # self.start_sniffer(radio_channel=ssid_channel, radio=data["sniff_radio"].split(".")[2], duration=30) - # time.sleep(30) - # self.stop_sniffer() + print("sniff radio", data["sniff_radio"].split(".")[2]) + self.start_sniffer(radio_channel=ssid_channel, radio=data["sniff_radio"].split(".")[2], duration=30) + time.sleep(30) + self.stop_sniffer() print("ssid not available in scan result") return "FAIL", "ssid not available in scan result" self.staConnect.resource = radio.split(".")[1] @@ -325,15 +360,6 @@ class lf_tests(lf_libs): data_table = "" dict_table = {} self.staConnect.setup(extra_securities=extra_securities) - # for sta_name in self.staConnect.station_names: - # try: - # sta_url = self.staConnect.get_station_url(sta_name.split(".")[2]) - # station_info = self.staConnect.json_get(sta_url) - # dict_data = station_info["interface"] - # dict_table[""] = list(dict_data.keys()) - # dict_table["Before"] = list(dict_data.values()) - # except Exception as e: - # print(e) if ssid_channel: pass #Need to start sniffer @@ -342,64 +368,89 @@ class lf_tests(lf_libs): self.staConnect.start() print("napping %f sec" % self.staConnect.runtime_secs) time.sleep(self.staConnect.runtime_secs) - self.get_station_data() - self.get_layer3_data() + print(self.staConnect.station_names) + sta_rows = ["4way time (us)", "channel", "cx time (us)", "dhcp (ms)", "ip", "signal"] + station_data = self.get_station_data(sta_name=self.staConnect.station_names, rows=sta_rows, allure_attach=False) + sta_table_dict = {} + sta_table_dict["station name"] = list(station_data.keys()) + for i in sta_rows: + temp_list = [] + for j in self.staConnect.station_names: + temp_list.append(station_data[j][i]) + sta_table_dict[i] = temp_list + #pass fail + pass_fail_sta = [] + for i in sta_table_dict["ip"]: + if i == "0.0.0.0": + pass_fail_sta.append("Fail") + else: + pass_fail_sta.append("Pass") + sta_table_dict["Pass/Fail"] = pass_fail_sta + if allure_attch: + self.attach_table_allure(data=sta_table_dict, allure_name="station data") self.staConnect.stop() - run_results = self.staConnect.get_result_list() - if not self.staConnect.passes(): - for result in run_results: - print("test result: " + result) - pytest.exit("Test Failed: Debug True") + cx_name = list(self.staConnect.l3_udp_profile.get_cx_names()) + list(self.staConnect.l3_tcp_profile.get_cx_names()) + cx_row = ["type", "bps rx a", "bps rx b"] + print(cx_name) + print(self.staConnect.get_result_list()) + print(self.staConnect.l3_udp_profile.get_cx_names()) + cx_data = self.get_cx_data(cx_name=cx_name, cx_data=cx_row, allure_attach=False) + print(cx_data) + cx_table_dict = {} + upstream = [] + for i in range(len(self.staConnect.station_names)): + upstream.append(data["upstream_port"]) + cx_table_dict["Upstream"] = upstream + cx_table_dict["Downstream"] = self.staConnect.station_names + cx_tcp_ul = [] + cx_tcp_dl = [] + cx_udp_ul = [] + cx_udp_dl = [] + for sta in self.staConnect.station_names: + for i in cx_data: + if sta.split(".")[2] in i: + if cx_data[i]["type"] == "LF/UDP": + cx_udp_dl.append(cx_data[i]["bps rx a"]) + cx_udp_ul.append(cx_data[i]["bps rx b"]) + elif cx_data[i]["type"] == "LF/TCP": + cx_tcp_dl.append(cx_data[i]["bps rx a"]) + cx_tcp_ul.append(cx_data[i]["bps rx b"]) + cx_table_dict["TCP DL"] = cx_tcp_dl + cx_table_dict["TCP UL"] = cx_tcp_ul + cx_table_dict["UDP DL"] = cx_udp_dl + cx_table_dict["UDP UL"] = cx_udp_ul + pass_fail_cx = [] + for i, j, k, l in zip(cx_tcp_dl, cx_tcp_ul, cx_udp_dl, cx_udp_ul): + if i == 0 or j == 0 or k == 0 or l == 0: + pass_fail_cx.append("Fail") + else: + pass_fail_cx.append("Pass") + cx_table_dict["Pass/Fail"] = pass_fail_cx + if allure_attch: + self.attach_table_allure(data=cx_table_dict, allure_name="cx data") self.staConnect.cleanup() - # try: - # supplicant = "/home/lanforge/wifi/wpa_supplicant_log_" + radio.split(".")[2] + ".txt" - # obj = SCP_File(ip=self.manager_ip, port=self.manager_ssh_port, username="root", password="lanforge", - # remote_path=supplicant, - # local_path=".") - # obj.pull_file() - # allure.attach.file(source="wpa_supplicant_log_" + radio.split(".")[2] + ".txt", - # name="supplicant_log") - # except Exception as e: - # print(e) - # - # for result in run_results: - # print("test result: " + result) - # result = "PASS" - # description = "Unknown error" - # dict_table = {} - # print("Client Connectivity :", self.staConnect.passes) - # endp_data = [] - # for i in self.staConnect.resulting_endpoints: - # endp_data.append(self.staConnect.resulting_endpoints[i]["endpoint"]) - # dict_table["key"] = [i for s in [d.keys() for d in endp_data] for i in s] - # dict_table["value"] = [i for s in [d.values() for d in endp_data] for i in s] - # data_table = report_obj.table2(table=dict_table, headers='keys') - # allure.attach(name="cx_data", body=data_table) - # for i in range(len(run_results)): - # if i == 0: - # if "FAILED" in run_results[i]: - # result = "FAIL" - # description = "Station did not get an ip" - # break - # else: - # if "FAILED" in run_results[i]: - # result = "FAIL" - # description = "did not report traffic" - # - # if self.staConnect.passes(): - # print("client connection to", self.staConnect.dut_ssid, "successful. Test Passed") - # result = "PASS" - # else: - # print("client connection to", self.staConnect.dut_ssid, "unsuccessful. Test Failed") - # result = "FAIL" - # time.sleep(3) + result = "PASS" + description = "Unknown error" + count = 0 + if "Fail" in pass_fail_sta: + count = count + 1 + result = "FAIL" + description = "Station did not get an ip" + if count == 0: + if "Fail" in pass_fail_cx: + result = "FAIL" + description = "did not report traffic" + if self.staConnect.passes(): + print("client connection to", self.staConnect.dut_ssid, "successful. Test Passed") + result = "PASS" + else: + print("client connection to", self.staConnect.dut_ssid, "unsuccessful. Test Failed") + result = "FAIL" + if ssid_channel: # need to stop sniffer - # if not self.skip_pcap: - # self.stop_sniffer() pass - self.set_radio_channel(radio=radio, channel="AUTO") - # return result, description + return result, description def enterprise_client_connectivity_test(self): pass @@ -419,7 +470,7 @@ class lf_tests(lf_libs): def multi_psk_test(self): pass - def scan_ssid(self, radio="", retry=1, allure_attach=True, scan_time=15, ssid=None): + def scan_ssid(self, radio="", retry=1, allure_attach=True, scan_time=15, ssid=None, ssid_channel=None): '''This method for scan ssid data''' count = 0 for i in range(retry + 1): @@ -516,53 +567,71 @@ class lf_tests(lf_libs): except Exception as e: print(e) - def get_station_data(self, rows=["ip", "signal"], sta_name=["1.1.sta0000", "1.1.sta0001"], allure_attach=True): + def get_station_data(self, rows=[], sta_name=[], allure_attach=True): """ Attach station data to allure e.g. rows = ["ip", "signal"] , sta_names = ["1.1.wlan0000", "1.1.wlan0001"] """ # dict for station data sta_dict = {} - for sta in sta_name: - sta_url = "port/" + str(sta.split(".")[0]) + "/" + str(sta.split(".")[1]) + "/" + str(sta.split(".")[2]) - station_info = self.staConnect.json_get(sta_url) - dict_data = station_info["interface"] - print("dict_data", dict_data) - temp_dict = {} - for i in rows: - temp_dict[i] = dict_data[i] - sta_dict[sta] = temp_dict - print(sta_dict) + try: + for sta in sta_name: + sta_url = "port/" + str(sta.split(".")[0]) + "/" + str(sta.split(".")[1]) + "/" + str(sta.split(".")[2]) + station_info = self.staConnect.json_get(sta_url) + dict_data = station_info["interface"] + temp_dict = {} + for i in rows: + temp_dict[i] = dict_data[i] + sta_dict[sta] = temp_dict + except Exception as e: + logging.error(e) + logging.info("station info: " + str(sta_dict)) # Creating dict for allure table - table_dict = {} - table_dict["station name"] = list(sta_dict.keys()) + station_table_dict = {} + station_table_dict["station name"] = list(sta_dict.keys()) for i in rows: temp_list = [] for j in sta_name: temp_list.append(sta_dict[j][i]) - table_dict[i] = temp_list - #pass fail - pass_fail = [] - for i in table_dict["ip"]: - if i == "0.0.0.0": - pass_fail.append("Fail") - else: - pass_fail.append("Pass") - table_dict["Pass/Fail"] = pass_fail + station_table_dict[i] = temp_list if allure_attach: - try: - report_obj = Report() - data_table = report_obj.table2(table=table_dict, headers='keys') - print(data_table) - allure.attach(name="station data", body=data_table) - except Exception as e: - logging.error(e) + self.attach_table_allure(data=station_table_dict, allure_name="station data") return sta_dict - def get_cx_data(self, sta_name=[], cx_data=[]): + def get_cx_data(self, cx_name=[], cx_data=[], allure_attach=True): """Attach cx data to allure""" + url = "cx/all" + #cx_data.append("type") + dict_cx_data = {} + cx_json_data = self.json_get(url) + print(cx_json_data) + try: + for sta_ in cx_name: + temp_dict = {} + for i in cx_data: + temp_dict[i] = cx_json_data[sta_][i] + dict_cx_data[sta_] = temp_dict + print(dict_cx_data) + except Exception as e: + logging.error(e) + print(dict_cx_data) + cx_table_dict = {} + cx_table_dict["cx name"] = list(dict_cx_data.keys()) + for i in cx_data: + temp_list = [] + for j in cx_name: + temp_list.append(dict_cx_data[j][i]) + print(i) + if i == "bps rx a": + cx_table_dict["Download"] = temp_list + elif i == "bps rx b": + cx_table_dict["Upload"] = temp_list + elif i == "type": + cx_table_dict["cx type"] = temp_list + if allure_attach: + self.attach_table_allure(data=cx_table_dict, allure_name="cx data") + return dict_cx_data - pass def get_station_list(self, num_sta=1, band="twog"): """Create station list""" @@ -577,6 +646,15 @@ class lf_tests(lf_libs): else: logging.error("band is wrong") + def attach_table_allure(self, data=None, allure_name=None): + """Attach table to allure.data should be dict.""" + try: + report_obj = Report() + data_table = report_obj.table2(table=data, headers='keys') + print(data_table) + allure.attach(name=allure_name, body=data_table) + except Exception as e: + logging.error(e) @@ -595,13 +673,13 @@ if __name__ == '__main__': "device_under_tests": [{ "model": "edgecore_eap101", "supported_bands": ["2G", "5G"], - "wan_port": "1.1.eth1", + "wan_port": "1.1.eth2", "supported_modes": ["BRIDGE", "NAT", "VLAN"], "ssid": { "2g-ssid": "OpenWifi", "5g-ssid": "OpenWifi", "6g-ssid": "candela6ghz", - "2g-password": "OpenWifi", + "2g-password": "OpenWifii", "5g-password": "OpenWifi", "6g-password": "hello123", "2g-encryption": "WPA2", @@ -626,41 +704,38 @@ if __name__ == '__main__': "testbed": "basic", "scenario": "dhcp-bridge", # dhcp-bridge / dhcp-external "details": { - "manager_ip": "10.28.3.34", + "manager_ip": "10.28.3.28", "http_port": 8080, "ssh_port": 22, "setup": {"method": "build", "DB": "Test_Scenario_Automation"}, # method: build/load, - "wan_ports": { - "1.1.eth1": {"addressing": "dhcp-server", "subnet": "172.16.0.1/16", "dhcp": { - "lease-first": 10, - "lease-count": 10000, - "lease-time": "6h" - }}}, - # DB : Default database name # "wan_ports": { - # "1.1.eth2": {"addressing": "dhcp-server", "subnet": "172.16.0.1/16", "dhcp": { - # "lease-first": 10, - # "lease-count": 10000, - # "lease-time": "6h" - # }}}, - # "lan_ports": { - # "1.1.eth1": {"addressing": "dynamic"} # dhcp-server/{"addressing": "dynamic"}/{"addressing": - # # "static", "subnet": "10.28.2.6/16"} - # }, - # "uplink_nat_ports": { - # "1.1.eth3": {"addressing": "static", "subnet": "10.28.2.1/24", "gateway_ip": "10.28.2.1"} - # # dhcp-server/{"addressing": - # # "dynamic"} /{"addressing": "static", "subnet": "10.28.2.6/16"} - # }, + # "1.1.eth1": {"addressing": "dhcp-server", "subnet": "172.16.0.1/16", "dhcp": { + # "lease-first": 10, + # "lease-count": 10000, + # "lease-time": "6h" + # }}}, + # DB : Default database name + "wan_ports": { + "1.1.eth2": {"addressing": "dhcp-server", "subnet": "172.16.0.1/16", "dhcp": { + "lease-first": 10, + "lease-count": 10000, + "lease-time": "6h" + }}}, + "lan_ports": {}, + "uplink_nat_ports": { + "1.1.eth3": {"addressing": "static", "subnet": "10.28.2.1/24", "gateway_ip": "10.28.2.1"} + # dhcp-server/{"addressing": + # "dynamic"} /{"addressing": "static", "subnet": "10.28.2.6/16"} + } } } } obj = lf_tests(lf_data=dict(basic_1["traffic_generator"]), dut_data=list(basic_1["device_under_tests"]), log_level=logging.DEBUG, run_lf=True) - obj.get_station_data() + #obj.get_cx_data() # obj.chamber_view() - # obj.client_connectivity_test(ssid="OpenWifi", passkey="OpenWifi", security="wpa2", extra_securities=[], - # station_name=["1.1.ath10k_2g000"], mode="BRIDGE", vlan_id=1, band="twog", ssid_channel=11) + obj.client_connectivity_test(ssid="OpenWifi", passkey="OpenWifi", security="wpa2", extra_securities=[], + station_name=["1.1.ath10k_2g000", "1.1.ath10k_2g001"], mode="BRIDGE", vlan_id=1, band="twog", ssid_channel=11) # obj.chamber_view() # obj.setup_relevent_profiles()