From 3246c28be28618f4e1a53f60d79e9517ac7f369f Mon Sep 17 00:00:00 2001 From: shivam Date: Thu, 11 Aug 2022 05:22:41 +0530 Subject: [PATCH] Added sniffer option in client_connectivity_test Signed-off-by: shivam --- lf_libs/lf_tests.py | 156 ++++++++++++++++++++++++++++++++------------ 1 file changed, 115 insertions(+), 41 deletions(-) diff --git a/lf_libs/lf_tests.py b/lf_libs/lf_tests.py index 4779f457..7d832596 100644 --- a/lf_libs/lf_tests.py +++ b/lf_libs/lf_tests.py @@ -71,7 +71,9 @@ class lf_tests(lf_libs): pytest.exit() if mode == "BRIDGE": upstream_port = self.upstream_port() - elif mode == "NAT": + elif mode == "NAT-WAN": + upstream_port = self.upstream_port() + elif mode == "NAT-LAN": upstream_port = self.upstream_port() elif mode == "VLAN": # for vlan mode vlan id should be available @@ -317,7 +319,8 @@ class lf_tests(lf_libs): sniff_radio = left_radio[0] return sniff_radio - def client_connectivity_test(self, ssid="[BLANK]", passkey="[BLANK]", security="open", extra_securities=[], + def client_connectivity_test(self, ssid="[BLANK]", passkey="[BLANK]", dut_data={}, + security="open", extra_securities=[], num_sta=1, mode="BRIDGE", vlan_id=1, band="twog", ssid_channel=None, allure_attch=True, runtime_secs=40): # self.staConnect = StaConnect2(self.manager_ip, self.manager_http_port, debug_=self.debug) @@ -349,9 +352,34 @@ class lf_tests(lf_libs): if not result and ssid_channel: # Sniffer required # print("sniff radio", data["sniff_radio"].split(".")[2]) - # self.start_sniffer(radio_channel=ssid_channel, radio=data["sniff_radio"].split(".")[2], duration=30) - # time.sleep(30) - # self.stop_sniffer() + for dut in self.dut_data: + identifier = dut["identifier"] + if dut_data.keys().__contains__(identifier): + if band == "twog": + if dict(dut_data.get(identifier)[-1]).keys().__contains__("2G") and \ + dict(dut_data.get(identifier)[-1])["2G"] is not None: + channel = dict(dut_data.get(identifier)[-1])["2G"][0] + self.start_sniffer(radio_channel=channel, radio=data["sniff_radio"].split(".")[2], + duration=10) + time.sleep(10) + self.stop_sniffer() + elif band == "fiveg": + if dict(dut_data.get(identifier)[-1]).keys().__contains__("5G") and \ + dict(dut_data.get(identifier)[-1])["5G"] is not None: + channel = dict(dut_data.get(identifier)[-1])["5G"][0] + self.start_sniffer(radio_channel=channel, radio=data["sniff_radio"].split(".")[2], + duration=10) + time.sleep(10) + self.stop_sniffer() + elif band == "sixg": + if dict(dut_data.get(identifier)[-1]).keys().__contains__("6G") and \ + dict(dut_data.get(identifier)[-1])["6G"] is not None: + channel = dict(dut_data.get(identifier)[-1])["6G"][0] + self.start_sniffer(radio_channel=channel, radio=data["sniff_radio"].split(".")[2], + duration=10) + time.sleep(10) + self.stop_sniffer() + # print("ssid not available in scan result") # return "FAIL", "ssid not available in scan result" pass @@ -368,17 +396,55 @@ class lf_tests(lf_libs): obj_sta_connect.side_a_pdu = 1200 obj_sta_connect.side_b_pdu = 1500 obj_sta_connect.setup(extra_securities=extra_securities) + print("after-setup") if ssid_channel: pass # Need to start sniffer # print("sniff radio", data["sniff_radio"].split(".")[2]) # self.start_sniffer(radio_channel=ssid_channel, radio=data["sniff_radio"].split(".")[2], duration=30) sta_connect_obj.append(obj_sta_connect) - for obj in sta_connect_obj: - print(obj) - obj.start() - print("napping %f sec" % runtime_secs) - time.sleep(runtime_secs) + print("after-adding-object") + for dut in self.dut_data: + identifier = dut["identifier"] + if dut_data.keys().__contains__(identifier): + if band == "twog": + if dict(dut_data.get(identifier)[-1]).keys().__contains__("2G") and \ + dict(dut_data.get(identifier)[-1])["2G"] is not None: + channel = dict(dut_data.get(identifier)[-1])["2G"][0] + self.start_sniffer(radio_channel=channel, radio=data["sniff_radio"].split(".")[2], + duration=runtime_secs) + print("started-sniffer") + for obj in sta_connect_obj: + print(obj) + obj.start() + print("napping %f sec" % runtime_secs) + time.sleep(runtime_secs) + print("stopping-sniffer") + self.stop_sniffer() + elif band == "fiveg": + if dict(dut_data.get(identifier)[-1]).keys().__contains__("5G") and \ + dict(dut_data.get(identifier)[-1])["5G"] is not None: + channel = dict(dut_data.get(identifier)[-1])["5G"][0] + self.start_sniffer(radio_channel=channel, radio=data["sniff_radio"].split(".")[2], + duration=runtime_secs) + for obj in sta_connect_obj: + print(obj) + obj.start() + print("napping %f sec" % runtime_secs) + time.sleep(runtime_secs) + self.stop_sniffer() + elif band == "sixg": + if dict(dut_data.get(identifier)[-1]).keys().__contains__("6G") and \ + dict(dut_data.get(identifier)[-1])["6G"] is not None: + channel = dict(dut_data.get(identifier)[-1])["6G"][0] + self.start_sniffer(radio_channel=channel, radio=data["sniff_radio"].split(".")[2], + duration=runtime_secs) + for obj in sta_connect_obj: + print(obj) + obj.start() + print("napping %f sec" % runtime_secs) + time.sleep(runtime_secs) + self.stop_sniffer() pass_fail_result = [] for obj in sta_connect_obj: print(obj.station_names) @@ -448,6 +514,7 @@ class lf_tests(lf_libs): description = "Unknown error" count = 0 temp_dict = {} + print(pass_fail_sta) if "Fail" in pass_fail_sta: count = count + 1 result = "FAIL" @@ -474,12 +541,14 @@ class lf_tests(lf_libs): pass result = "PASS" description = "" + print(pass_fail_sta) for i in pass_fail_result: if i == "FAIL": result = "FAIL" description = pass_fail_result[i] break + print(result) return result, description def enterprise_client_connectivity_test(self): @@ -594,7 +663,7 @@ class lf_tests(lf_libs): self.pcap_obj.monitor.admin_down() time.sleep(2) self.pcap_obj.cleanup() - lf_report.pull_reports(hostname=self.manager_ip, port=self.manager_http_port, username="lanforge", + lf_report.pull_reports(hostname=self.manager_ip, port=self.manager_ssh_port, username="lanforge", password="lanforge", report_location="/home/lanforge/" + self.pcap_name, report_dir=".") @@ -730,78 +799,83 @@ class lf_tests(lf_libs): if __name__ == '__main__': basic_1 = { "target": "tip_2x", - "controller": { + "controller" : { "url": "https://sec-qa01.cicd.lab.wlan.tip.build:16001", "username": "tip@ucentral.com", "password": "OpenWifi%123" }, "device_under_tests": [{ - "model": "edgecore_eap101", + "model": "cig_wf188n", "supported_bands": ["2G", "5G"], - "wan_port": "1.1.eth2", "supported_modes": ["BRIDGE", "NAT", "VLAN"], + "wan_port": "1.1.eth2", "ssid": { "2g-ssid": "OpenWifi", "5g-ssid": "OpenWifi", - "6g-ssid": "candela6ghz", + "6g-ssid": "OpenWifi", "2g-password": "OpenWifi", "5g-password": "OpenWifi", - "6g-password": "hello123", + "6g-password": "OpenWifi", "2g-encryption": "WPA2", - "5g-encryption": "open", + "5g-encryption": "WPA2", "6g-encryption": "WPA3", - "2g-bssid": "68:7d:b4:5f:5c:31 ", + "2g-bssid": "68:7d:b4:5f:5c:31", "5g-bssid": "68:7d:b4:5f:5c:3c", "6g-bssid": "68:7d:b4:5f:5c:38" }, "mode": "wifi6", - "identifier": "c44bd1005b30", - "serial_port": True, - "host_ip": "10.28.3.100", + "identifier": "0000c1018812", + "method": "serial", + "host_ip": "10.28.3.103", "host_username": "lanforge", "host_password": "pumpkin77", "host_ssh_port": 22, - "serial_tty": "/dev/ttyAP8", + "serial_tty": "/dev/ttyAP1", "firmware_version": "next-latest" }], "traffic_generator": { "name": "lanforge", "testbed": "basic", - "scenario": "dhcp-bridge", # dhcp-bridge / dhcp-external + "scenario": "dhcp-bridge", "details": { - "manager_ip": "10.28.3.6", + "manager_ip": "10.28.3.28", "http_port": 8080, "ssh_port": 22, - "setup": {"method": "build", "DB": "Test_Scenario_Automation"}, # method: build/load, - # "wan_ports": { - # "1.1.eth1": {"addressing": "dhcp-server", "subnet": "172.16.0.1/16", "dhcp": { - # "lease-first": 10, - # "lease-count": 10000, - # "lease-time": "6h" - # }}}, - # DB : Default database name + "setup": {"method": "build", "DB": "Test_Scenario_Automation"}, "wan_ports": { "1.1.eth2": {"addressing": "dhcp-server", "subnet": "172.16.0.1/16", "dhcp": { "lease-first": 10, "lease-count": 10000, "lease-time": "6h" - }}}, - "lan_ports": {}, + } + } + }, + "lan_ports": { + + }, "uplink_nat_ports": { - "1.1.eth3": {"addressing": "static", "subnet": "10.28.2.1/24", "gateway_ip": "10.28.2.1"} - # dhcp-server/{"addressing": - # "dynamic"} /{"addressing": "static", "subnet": "10.28.2.6/16"} + "1.1.eth1": {"addressing": "static", "subnet": "10.28.2.16/24", "gateway_ip": "10.28.2.1"} } } } } obj = lf_tests(lf_data=dict(basic_1["traffic_generator"]), dut_data=list(basic_1["device_under_tests"]), - log_level=logging.DEBUG, run_lf=True) + log_level=logging.DEBUG, run_lf=False) + obj.start_sniffer(radio_channel=1, radio="wiphy7", test_name="sniff_radio", duration=30) + print("started") + time.sleep(30) + obj.stop_sniffer() + # lf_report.pull_reports(hostname="10.28.3.28", port=22, username="lanforge", + # password="lanforge", + # report_location="/home/lanforge/" + "sniff_radio.pcap", + # report_dir=".") + # def start_sniffer(self, radio_channel=None, radio=None, test_name="sniff_radio", duration=60): + # # obj.get_cx_data() # obj.chamber_view() - obj.client_connectivity_test(ssid="OpenWifi", passkey="OpenWifi", security="wpa2", extra_securities=[], - num_sta=10, mode="BRIDGE", vlan_id=1, - band="twog", ssid_channel=11) + # obj.client_connectivity_test(ssid="wpa2_5g", passkey="something", security="wpa2", extra_securities=[], + # num_sta=1, mode="BRIDGE", vlan_id=1, + # band="fiveg", ssid_channel=36) # obj.chamber_view() # obj.setup_relevent_profiles()