diff --git a/lf_libs/__pycache__/lf_libs.cpython-39.pyc b/lf_libs/__pycache__/lf_libs.cpython-39.pyc new file mode 100644 index 00000000..6205d401 Binary files /dev/null and b/lf_libs/__pycache__/lf_libs.cpython-39.pyc differ diff --git a/lf_libs/lf_libs.py b/lf_libs/lf_libs.py index 69cfcb89..07937a1d 100644 --- a/lf_libs/lf_libs.py +++ b/lf_libs/lf_libs.py @@ -154,6 +154,7 @@ class lf_libs: logging.basicConfig(format='%(asctime)s - %(message)s', level=log_level) lf_data = dict(lf_data) self.dut_data = dut_data + self.dut_data = dut_data self.run_lf = run_lf self.dut_idx_mapping = {} # try: @@ -162,7 +163,6 @@ class lf_libs: self.scenario = lf_data.get("scenario") self.setup_lf_data() self.setup_relevent_profiles() - # self.load_scenario() self.setup_metadata() if self.scenario == "dhcp-bridge": logging.info("Scenario name: " + str(self.scenario)) @@ -174,9 +174,8 @@ class lf_libs: self.chamber_view(raw_lines="default") self.temp_raw_lines = self.default_scenario_raw_lines.copy() self.setup_dut() - # except Exception as e: - logging.error("lf_data has bad values: " + str(lf_data)) + # logging.error("lf_data has bad values: " + str(lf_data)) # logging.error(e) """ @@ -1241,6 +1240,43 @@ class lf_libs: allure.attach.file(source=path, name=file_name, attachment_type="CSV") return os.path.exists(path) + def get_supplicant_logs(self, radio="1.1.wiphy0", attach_allure=True): + try: + resource = radio.split(".")[1] + radio = radio.split(".")[2] + ip = self.get_manager_from_resource(resource=int(resource)) + if ip is not None: + supplicant = "/home/lanforge/wifi/wpa_supplicant_log_" + radio + ".txt" + obj = SCP_File(ip=ip, port=22, username="root", password="lanforge", + remote_path=supplicant, + local_path=".") + obj.pull_file() + if attach_allure: + allure.attach.file(source="wpa_supplicant_log_" + radio + ".txt", + name="wpa_supplicant_log - " + str(radio)) + except Exception as e: + logging.error("get_supplicant_logs() - Error in getting supplicant Logs: " + str(e)) + + def get_resources(self): + try: + d = self.json_get("/port/?fields=alias,ip") + resource_data = dict() + for i in d["interfaces"]: + if str(list(dict(i).keys())[0]).__contains__("eth0"): + resource_data[str(list(dict(i).keys())[0]).split(".")[1]] = i[str(list(dict(i).keys())[0])]["ip"] + logging.info("Resource ID and Management port Mapping: " + str(resource_data)) + except Exception as e: + logging.error(str(e)) + return resource_data + + def get_manager_from_resource(self, resource=1): + resource_data = self.get_resources() + try: + ip = resource_data[str(resource)] + except Exception as e: + logging.error("Resource is Unavailable when reading manager: " + str(e)) + ip = None + return ip class Report: diff --git a/lf_libs/lf_tests.py b/lf_libs/lf_tests.py index d8239527..5c0965de 100644 --- a/lf_libs/lf_tests.py +++ b/lf_libs/lf_tests.py @@ -1,5 +1,6 @@ import csv import importlib +import json import logging import os import sys @@ -59,10 +60,17 @@ class lf_tests(lf_libs): security="open", extra_securities=[], sta_mode=0, num_sta=1, mode="BRIDGE", vlan_id=[None], band="twog", allure_attach=True, runtime_secs=40): + + logging.info("DUT Data:\n" + json.dumps(str(dut_data), indent=2)) + allure.attach(name="DUT Data:\n", body=json.dumps(str(dut_data), indent=2), + attachment_type=allure.attachment_type.JSON) + data = self.setup_interfaces(ssid=ssid, bssid=bssid, passkey=passkey, encryption=security, band=band, vlan_id=vlan_id[0], mode=mode, num_sta=num_sta) - logging.info("Setup interface data" + str(data)) + logging.info("Setup interface data:\n" + json.dumps(str(data), indent=2)) + allure.attach(name="Interface Info: \n", body=json.dumps(str(data), indent=2), + attachment_type=allure.attachment_type.JSON) if data == {}: pytest.skip("Skipping This Test") sta_connect_obj = [] @@ -100,14 +108,13 @@ class lf_tests(lf_libs): logging.info("ssid scan data : " + str(result)) if not result: # Sniffer required - # print("sniff radio", data["sniff_radio"].split(".")[2]) for duts in self.dut_data: identifier = duts["identifier"] if dut_data.keys().__contains__(identifier): if band == "twog": - if dict(dut_data.get(identifier)[-1]).keys().__contains__("2G") and \ - dict(dut_data.get(identifier)[-1])["2G"] is not None: - channel = dict(dut_data.get(identifier)[-1])["2G"][0] + if dict(dut_data.get(identifier)["radio_data"]).keys().__contains__("2G") and \ + dict(dut_data.get(identifier)["radio_data"])["2G"] is not None: + channel = dict(dut_data.get(identifier)["radio_data"])["2G"]["channel"] if data[dut]["sniff_radio_2g"] is not None: self.start_sniffer(radio_channel=channel, radio=data[dut]["sniff_radio_2g"].split(".")[2], @@ -115,9 +122,9 @@ class lf_tests(lf_libs): time.sleep(10) self.stop_sniffer() elif band == "fiveg": - if dict(dut_data.get(identifier)[-1]).keys().__contains__("5G") and \ - dict(dut_data.get(identifier)[-1])["5G"] is not None: - channel = dict(dut_data.get(identifier)[-1])["5G"][0] + if dict(dut_data.get(identifier)["radio_data"]).keys().__contains__("5G") and \ + dict(dut_data.get(identifier)["radio_data"])["5G"] is not None: + channel = dict(dut_data.get(identifier)["radio_data"])["5G"]["channel"] if data[dut]["sniff_radio_5g"] is not None: self.start_sniffer(radio_channel=channel, radio=data[dut]["sniff_radio_5g"].split(".")[2], @@ -125,9 +132,9 @@ class lf_tests(lf_libs): time.sleep(10) self.stop_sniffer() elif band == "sixg": - if dict(dut_data.get(identifier)[-1]).keys().__contains__("6G") and \ - dict(dut_data.get(identifier)[-1])["6G"] is not None: - channel = dict(dut_data.get(identifier)[-1])["6G"][0] + if dict(dut_data.get(identifier)["radio_data"]).keys().__contains__("6G") and \ + dict(dut_data.get(identifier)["radio_data"])["6G"] is not None: + channel = dict(dut_data.get(identifier)["radio_data"])["6G"]["channel"] if data[dut]["sniff_radio_6g"] is not None: self.start_sniffer(radio_channel=channel, radio=data[dut]["sniff_radio_6g"].split(".")[2], @@ -142,9 +149,9 @@ class lf_tests(lf_libs): identifier = dut_["identifier"] if dut_data.keys().__contains__(identifier): if band == "twog": - if dict(dut_data.get(identifier)[-1]).keys().__contains__("2G") and \ - dict(dut_data.get(identifier)[-1])["2G"] is not None: - channel = dict(dut_data.get(identifier)[-1])["2G"][0] + if dict(dut_data.get(identifier)["radio_data"]).keys().__contains__("2G") and \ + dict(dut_data.get(identifier)["radio_data"])["2G"] is not None: + channel = dict(dut_data.get(identifier)["radio_data"])["2G"]["channel"] self.start_sniffer(radio_channel=channel, radio=data[dut]["sniff_radio_2g"].split(".")[2], duration=runtime_secs) logging.info("started-sniffer") @@ -155,9 +162,9 @@ class lf_tests(lf_libs): logging.info("stopping-sniffer") self.stop_sniffer() elif band == "fiveg": - if dict(dut_data.get(identifier)[-1]).keys().__contains__("5G") and \ - dict(dut_data.get(identifier)[-1])["5G"] is not None: - channel = dict(dut_data.get(identifier)[-1])["5G"][0] + if dict(dut_data.get(identifier)["radio_data"]).keys().__contains__("5G") and \ + dict(dut_data.get(identifier)["radio_data"])["5G"] is not None: + channel = dict(dut_data.get(identifier)["radio_data"])["5G"]["channel"] self.start_sniffer(radio_channel=channel, radio=data[dut]["sniff_radio_5g"].split(".")[2], duration=runtime_secs) for obj in sta_connect_obj: @@ -166,9 +173,9 @@ class lf_tests(lf_libs): time.sleep(runtime_secs) self.stop_sniffer() elif band == "sixg": - if dict(dut_data.get(identifier)[-1]).keys().__contains__("6G") and \ - dict(dut_data.get(identifier)[-1])["6G"] is not None: - channel = dict(dut_data.get(identifier)[-1])["6G"][0] + if dict(dut_data.get(identifier)["radio_data"]).keys().__contains__("6G") and \ + dict(dut_data.get(identifier)["radio_data"])["6G"] is not None: + channel = dict(dut_data.get(identifier)["radio_data"])["6G"]["channel"] self.start_sniffer(radio_channel=channel, radio=data[dut]["sniff_radio_6g"].split(".")[2], duration=runtime_secs) for obj in sta_connect_obj: @@ -178,9 +185,8 @@ class lf_tests(lf_libs): self.stop_sniffer() else: for obj in sta_connect_obj: - print(obj) obj.start() - print("napping %f sec" % runtime_secs) + logging.info("napping %f sec" % runtime_secs) time.sleep(runtime_secs) pass_fail_result = [] for obj in sta_connect_obj: @@ -266,7 +272,12 @@ class lf_tests(lf_libs): else: logging.info("client connection to" + str(obj.dut_ssid) + "unsuccessful. Test Failed") result = "FAIL" - + for obj in sta_connect_obj: + try: + print("1." + str(obj.resource) + "." + str(obj.radio)) + self.get_supplicant_logs(radio=str(obj.radio)) + except Exception as e: + logging.error("client_cpnnectivity_tests() -- Error in getting Supplicant Logs:" + str(e)) result = "PASS" description = "" for i in pass_fail_result: @@ -324,7 +335,6 @@ class lf_tests(lf_libs): for obj in client_connect_obj: obj.build() result = obj.wait_for_ip(station_list=obj.sta_list, timeout_sec=50) - # print(self.client_connect.wait_for_ip(station_name)) pass_fail.append(result) station_data_ = self.get_station_data(sta_name=obj.sta_list, rows=station_data, allure_attach=False) @@ -841,11 +851,11 @@ if __name__ == '__main__': "password": "OpenWifi%123" }, "device_under_tests": [{ - "model": "edgecore_ecw5211", - "supported_bands": ["2G", "5G"], + "model": "cig_wf196", + "supported_bands": ["2G", "5G", "6G"], "supported_modes": ["BRIDGE", "NAT", "VLAN"], - "wan_port": "1.1.eth2", - "lan_port": "1.1.eth1", + "wan_port": "1.3.eth2", + "lan_port": None, "ssid": { "2g-ssid": "OpenWifi", "5g-ssid": "OpenWifi", @@ -853,15 +863,15 @@ if __name__ == '__main__': "2g-password": "OpenWifi", "5g-password": "OpenWifi", "6g-password": "OpenWifi", - "2g-encryption": "OPEN", - "5g-encryption": "OPEN", - "6g-encryption": "OPEN", + "2g-encryption": "WPA2", + "5g-encryption": "WPA2", + "6g-encryption": "WPA3", "2g-bssid": "68:7d:b4:5f:5c:31", "5g-bssid": "68:7d:b4:5f:5c:3c", "6g-bssid": "68:7d:b4:5f:5c:38" }, - "mode": "wifi5", - "identifier": "68215fda456d", + "mode": "wifi6e", + "identifier": "824f816011e4", "method": "serial", "host_ip": "localhost", "host_username": "lanforge", @@ -880,7 +890,7 @@ if __name__ == '__main__': "ssh_port": 22, "setup": {"method": "build", "DB": "Test_Scenario_Automation"}, "wan_ports": { - "1.1.eth2": {"addressing": "dhcp-server", "subnet": "172.16.0.1/16", "dhcp": { + "1.3.eth2": {"addressing": "dhcp-server", "subnet": "172.16.0.1/16", "dhcp": { "lease-first": 10, "lease-count": 10000, "lease-time": "6h" @@ -888,9 +898,7 @@ if __name__ == '__main__': } }, "lan_ports": { - "1.1.eth1": { - "addressing": "dynamic" - } + }, "uplink_nat_ports": { "1.1.eth3": { @@ -921,7 +929,7 @@ if __name__ == '__main__': # vlan_id=100, num_sta=5, scan_ssid=True, # station_data=["4way time (us)", "channel", "cx time (us)", "dhcp (ms)", "ip", "signal"], # allure_attach=True) - # obj.add_vlan(vlan_ids=[100]) + # obj.add_vlan(vlan_iFds=[100]) # obj.create_dhcp_external()obj.add_vlan(vlan_ids=[100, 200, 300, 400, 500, 600]) # obj.get_cx_data() # obj.chamber_view() diff --git a/lf_libs/scan_ssid.csv b/lf_libs/scan_ssid.csv new file mode 100644 index 00000000..b761d1f4 --- /dev/null +++ b/lf_libs/scan_ssid.csv @@ -0,0 +1,4 @@ +,age,auth,beacon,bss,channel,entity id,frequency,info,signal,ssid +0,4365,WPA2,100,00:06:ae:6d:f0:fe,1,1.1.8,2412,2x2 MCS 0-11 AX,-39.0,dual_band +1,4096,Open,100,68:21:5f:da:45:6f,6,1.1.8,2437,2x2 MIMO,-41.0,ssid_open_2g_nat +2,35835829,WPA2,100,6a:21:5f:da:45:6f,6,1.1.8,2437,2x2 MIMO,-42.0,OpenWifi diff --git a/lf_libs/sh-results.csv b/lf_libs/sh-results.csv new file mode 100644 index 00000000..8a6722e5 --- /dev/null +++ b/lf_libs/sh-results.csv @@ -0,0 +1 @@ +Time epoch,Time,Station-Count,UL-Min-Requested,UL-Max-Requested,DL-Min-Requested,DL-Max-Requested,UL-Min-PDU,UL-Max-PDU,DL-Min-PDU,DL-Max-PDU,UDP-Upload-bps,TCP-Upload-bps,UDP-Download-bps,TCP-Download-bps,Total-UDP/TCP-Upload-bps,Total-UDP/TCP-Download-bps,Total-UDP/TCP-UL/DL-bps