mirror of
				https://github.com/Telecominfraproject/wlan-testing.git
				synced 2025-10-30 18:38:06 +00:00 
			
		
		
		
	Add: 11r OTA 6g-6g wpa3 on same channel test (#925)
Signed-off-by: anil-tegala <anil.tegala@candelatech.com>
This commit is contained in:
		 Anil Kumar Tegala
					Anil Kumar Tegala
				
			
				
					committed by
					
						 GitHub
						GitHub
					
				
			
			
				
	
			
			
			 GitHub
						GitHub
					
				
			
						parent
						
							d1c0b4ec7c
						
					
				
				
					commit
					c67df53655
				
			| @@ -588,6 +588,110 @@ class TestRoamOTA(object): | ||||
|               iteration=1, channel="36", option="ota", dut_name=dut_names, traffic_type="lf_udp", sta_type="11r-sae") | ||||
|         assert pass_fail, message | ||||
|  | ||||
|     @pytest.mark.same_channel | ||||
|     @pytest.mark.wpa3_personal | ||||
|     @pytest.mark.sae | ||||
|     @pytest.mark.sixg | ||||
|     @pytest.mark.roam | ||||
|     def test_roam_6g_to_6g_sc_psk_wpa3(self, get_target_object, get_test_library, get_lab_info, selected_testbed): | ||||
|         """ | ||||
|             Test Roaming between two APs, Same channel, 6G, WPA3 Personal | ||||
|             pytest -m "roam and sixg and same_channel and wpa3_personal" | ||||
|         """ | ||||
|         ap_data = dict() | ||||
|         dut_list = [str(selected_testbed)] | ||||
|         dut_names = list() | ||||
|         bssid_list = list() | ||||
|         testbed_info = get_lab_info.CONFIGURATION | ||||
|         if str(selected_testbed + 'a') in testbed_info: | ||||
|             dut_list.append(str(selected_testbed + 'a')) | ||||
|         logging.info(f"dut list: {dut_list}--") | ||||
|         config_data['radios'] = [ | ||||
|             { | ||||
|                 "band": "6G", | ||||
|                 "channel": 161, | ||||
|                 "channel-mode": "HE", | ||||
|                 "channel-width": 80, | ||||
|                 "country": "US" | ||||
|             } | ||||
|         ] | ||||
|         # change wifi-band and security type to sae | ||||
|         config_data['interfaces'][0]["ssids"][0]["wifi-bands"] = ["6G"] | ||||
|         config_data['interfaces'][0]["ssids"][0]["encryption"]["proto"] = "sae" | ||||
|         if len(dut_list) < 2: | ||||
|             logging.error( | ||||
|                 f"This test need two AP's but number of DUT's available in the selected testbed is {dut_list}") | ||||
|             assert False, f"This test need two AP's but number of DUT's available in the selected testbed is {dut_list}" | ||||
|         for ap in range(len(dut_list)): | ||||
|             serial_number = testbed_info[dut_list[ap]]["device_under_tests"][0]['identifier'] | ||||
|             dut_names.append(testbed_info[dut_list[ap]]["device_under_tests"][0]['model']) | ||||
|             logging.info(config_data) | ||||
|             payload = {"configuration": json.dumps(config_data), "serialNumber": serial_number, "UUID": 2} | ||||
|             uri = get_target_object.controller_library_object.build_uri( | ||||
|                 "device/" + serial_number + "/configure") | ||||
|             logging.info("Sending Command: " + "\n" + str(uri) + "\n" + | ||||
|                          "TimeStamp: " + str(datetime.datetime.utcnow()) + "\n" + | ||||
|                          "Data: " + str(json.dumps(payload, indent=2)) + "\n" + | ||||
|                          "Headers: " + str(get_target_object.controller_library_object.make_headers())) | ||||
|             allure.attach(name=f"Push roam config on {serial_number}: ", body="Sending Command: " + str(uri) + "\n" + | ||||
|                                                                               "TimeStamp: " + str( | ||||
|                 datetime.datetime.utcnow()) + "\n" + | ||||
|                                                                               "Data: " + str(payload) + "\n" + | ||||
|                                                                               "Headers: " + str( | ||||
|                 get_target_object.controller_library_object.make_headers())) | ||||
|             resp = requests.post(uri, data=json.dumps(payload, indent=2), | ||||
|                                  headers=get_target_object.controller_library_object.make_headers(), | ||||
|                                  verify=False, timeout=120) | ||||
|             time.sleep(10) | ||||
|             logging.info(resp.json()) | ||||
|             allure.attach(name=f"Response - {resp.status_code} {resp.reason}", body=str(resp.json())) | ||||
|             if resp.status_code != 200: | ||||
|                 if resp.status_code == 400 and "Device is already executing a command. Please try later." in \ | ||||
|                         resp.json()["ErrorDescription"]: | ||||
|                     time.sleep(30) | ||||
|                     resp = requests.post(uri, data=json.dumps(payload, indent=2), | ||||
|                                          headers=get_target_object.controller_library_object.make_headers(), | ||||
|                                          verify=False, timeout=120) | ||||
|                     time.sleep(10) | ||||
|                     logging.info(resp.json()) | ||||
|                 else: | ||||
|                     assert False, f"push configuration to {serial_number} got failed" | ||||
|             get_target_object.dut_library_object.device_under_tests_data = testbed_info[dut_list[ap]][ | ||||
|                 "device_under_tests"] | ||||
|             ap_iwinfo = get_target_object.dut_library_object.get_iwinfo(attach_allure=False) | ||||
|             if str(ap_iwinfo) != "Error: pop from empty list": | ||||
|                 interfaces = {} | ||||
|                 interface_matches = re.finditer( | ||||
|                     r'wlan\d\s+ESSID:\s+".*?"\s+Access Point:\s+([0-9A-Fa-f:]+).*?Channel:\s+([\d\s]+)', ap_iwinfo, | ||||
|                     re.DOTALL) | ||||
|                 logging.info(str(interface_matches)) | ||||
|                 if interface_matches: | ||||
|                     for match in interface_matches: | ||||
|                         interface_name = f'wlan{match.group(0)[4]}' | ||||
|                         access_point = match.group(1) | ||||
|                         channel = match.group(2).strip() | ||||
|                         interfaces[interface_name] = {'Access Point': access_point, 'Channel': channel} | ||||
|                         ap_data.update({serial_number: {'Access Point': access_point, 'Channel': channel}}) | ||||
|                     logging.info(interfaces) | ||||
|                 else: | ||||
|                     logging.error("Failed to get iwinfo") | ||||
|                     pytest.exit("Failed to get iwinfo") | ||||
|             elif ap_iwinfo == {}: | ||||
|                 assert False, "Empty iwinfo reponse from AP through minicom" | ||||
|             else: | ||||
|                 assert False, "Failed to get iwinfo from minicom" | ||||
|         logging.info(f"AP Data from iwinfo: {ap_data}") | ||||
|         for serial in ap_data: | ||||
|             bssid_list.append(ap_data[serial]['Access Point']) | ||||
|         ssid = config_data['interfaces'][0]["ssids"][0]["name"] | ||||
|         key = config_data['interfaces'][0]["ssids"][0]["encryption"]["key"] | ||||
|         pass_fail, message = get_test_library.roam_test(ap1_bssid=bssid_list[0], ap2_bssid=bssid_list[1], | ||||
|                                                         band="sixg", num_sta=1, security="wpa3", security_key=key, | ||||
|                                                         ssid=ssid, upstream="1.1.eth1", duration=None, | ||||
|                                                         iteration=1, channel="161", option="ota", dut_name=dut_names, | ||||
|                                                         traffic_type="lf_udp", sta_type="11r-sae") | ||||
|         assert pass_fail, message | ||||
|  | ||||
|     @pytest.mark.same_channel | ||||
|     @pytest.mark.wpa2_enterprise | ||||
|     @pytest.mark.twog | ||||
|   | ||||
		Reference in New Issue
	
	Block a user