Update OTA roaming testcases and Add enterprise TLS and TTLS testcases

Signed-off-by: bhargavi-ct <bhargavimamidipaka@candelatech.com>
This commit is contained in:
bhargavi-ct
2025-10-15 17:49:01 +05:30
parent a4d3d708d6
commit 98766cfc3a
16 changed files with 1985 additions and 4977 deletions

View File

@@ -1,113 +0,0 @@
{
"uuid": 2,
"radios": [
{
"band": "2G",
"channel": 11,
"channel-mode": "HE",
"channel-width": 40,
"country": "CA"
},
{
"band": "5G",
"channel": 36,
"channel-mode": "HE",
"channel-width": 80,
"country": "CA"
}
],
"interfaces": [
{
"name": "WAN",
"role": "upstream",
"services": [
"lldp"
],
"ethernet": [
{
"select-ports": [
"WAN*"
]
}
],
"ipv4": {
"addressing": "dynamic"
},
"ssids": [
{
"name": "OpenWifi",
"wifi-bands": [
"2G","5G"
],
"bss-mode": "ap",
"encryption": {
"proto": "psk2",
"key": "Openwifi",
"ieee80211w": "optional"
},
"roaming": true,
"services": [
"wifi-steering"
]
}
]
},
{
"name": "LAN",
"role": "downstream",
"services": [
"ssh",
"lldp"
],
"ethernet": [
{
"select-ports": [
"LAN*"
]
}
],
"ipv4": {
"addressing": "static",
"subnet": "192.168.1.1/24",
"dhcp": {
"lease-first": 10,
"lease-count": 100,
"lease-time": "6h"
}
}
}
],
"metrics": {
"statistics": {
"interval": 120,
"types": [
"ssids",
"lldp",
"clients"
]
},
"health": {
"interval": 120
},
"wifi-frames": {
"filters": [
"probe",
"auth"
]
}
},
"services": {
"wifi-steering": {
"mode": "local",
"network": "upstream",
"assoc-steering": true,
"required-snr": -85,
"required-probe-snr": -80,
"required-roam-snr": -80,
"load-kick-threshold": 90
},
"ssh": {
"port": 22
}
}
}

File diff suppressed because it is too large Load Diff

View File

@@ -1,116 +0,0 @@
{
"uuid": 2,
"radios": [
{
"band": "2G",
"channel": 11,
"channel-mode": "HE",
"channel-width": 40,
"country": "CA"
},
{
"band": "5G",
"channel": 36,
"channel-mode": "HE",
"channel-width": 80,
"country": "CA"
}
],
"interfaces": [
{
"name": "WAN",
"role": "upstream",
"services": [
"lldp"
],
"ethernet": [
{
"select-ports": [
"WAN*"
]
}
],
"ipv4": {
"addressing": "dynamic"
},
"ssids": [
{
"name": "OpenWifi",
"wifi-bands": [
"2G","5G"
],
"bss-mode": "ap",
"encryption": {
"proto": "psk2",
"key": "OpenWifi",
"ieee80211w": "optional"
},
"roaming": {
"message-exchange": "ds",
"generate-psk": true
},
"services": [
"wifi-steering"
]
}
]
},
{
"name": "LAN",
"role": "downstream",
"services": [
"ssh",
"lldp"
],
"ethernet": [
{
"select-ports": [
"LAN*"
]
}
],
"ipv4": {
"addressing": "static",
"subnet": "192.168.1.1/24",
"dhcp": {
"lease-first": 10,
"lease-count": 100,
"lease-time": "6h"
}
}
}
],
"metrics": {
"statistics": {
"interval": 120,
"types": [
"ssids",
"lldp",
"clients"
]
},
"health": {
"interval": 120
},
"wifi-frames": {
"filters": [
"probe",
"auth"
]
}
},
"services": {
"wifi-steering": {
"mode": "local",
"network": "upstream",
"assoc-steering": true,
"required-snr": -85,
"required-probe-snr": -80,
"required-roam-snr": -80,
"load-kick-threshold": 90
},
"ssh": {
"port": 22
}
}
}

File diff suppressed because it is too large Load Diff

View File

@@ -1,170 +0,0 @@
# import time
#
# import pytest
# import allure
# # from configuration import CONFIGURATION
#
# pytestmark = [pytest.mark.roam_test, pytest.mark.bridge]
#
# # setup_params_general = {
# # "mode": "BRIDGE",
# # "roam": False,
# # "ssid_modes": {
# # "wpa2_personal": [
# # {"ssid_name": "ssid_wpa2_2g", "appliedRadios": ["2G"], "security_key": "something"},
# # {"ssid_name": "ssid_wpa2_5g", "appliedRadios": ["5G"], "security_key": "something"}]},
# # "rf": {},
# # "radius": False
# # }
# setup_params_general = {
# "mode": "BRIDGE",
# "roam": True,
# "ft+psk": True,
# "ssid_modes": {
# "wpa2_personal": [{"ssid_name": "RoamAP2g", "appliedRadios": ["2G"], "security_key": "something"},
# {"ssid_name": "RoamAP5g", "appliedRadios": ["5G"], "security_key": "something"}],
#
# "wpa3_personal": [{"ssid_name": "RoamAP6g", "appliedRadios": ["6G"], "security_key": "something"}]
# },
# # "roam_type": "fiveg_to_fiveg",
# "rf": {},
# "radius": False
# }
# @allure.suite("Roam Test with attenuator")
# @allure.feature("Roam Test")
# # @pytest.mark.parametrize(
# # 'setup_configuration',
# # [setup_params_general],
# # indirect=True,
# # scope="class"
# # )
# # @pytest.mark.usefixtures("setup_configuration")
# # @allure.step
# # def nested_step_allure(bssid, rssi):
# # pass
#
# class TestBasicRoam(object):
#
# @pytest.mark.roam_5g
# @pytest.mark.bob
# @pytest.mark.wpa2_personal
# def test_basic_roam_5g_to_5g(self, lf_test, lf_reports, station_names_fiveg, lf_tools, run_lf, add_env_properties,
# instantiate_profile, get_controller_logs, get_ap_config_slots, get_lf_logs):
# profile_data = setup_params_general["ssid_modes"]["wpa2_personal"][0]
# ssid_name = profile_data["ssid_name"]
# security_key = profile_data["security_key"]
# security = "wpa2"
# mode = "BRIDGE"
# band = "fiveg"
# vlan = 1
# # lf_test.basic_roam(run_lf=run_lf, get_configuration=get_configuration, lf_tools=lf_tools,
# # lf_reports=lf_reports,
# # instantiate_profile=instantiate_profile,
# # ssid_name=ssid_name, security=security, security_key=security_key,
# # mode=mode, band=band, station_name=station_names_fiveg, vlan=vlan, test="5g")
#
#
# @pytest.mark.multi_roam
# @pytest.mark.wpa2_personal
# @pytest.mark.wpa3_personal
# def test_multi_roam_5g_to_5g_soft_roam_11r(self, lf_test, lf_reports, station_names_fiveg, lf_tools, run_lf, add_env_properties,
# instantiate_profile, get_controller_logs, get_ap_config_slots, get_lf_logs):
# profile_data = setup_params_general["ssid_modes"]["wpa2_personal"][1]
# ssid_name = profile_data["ssid_name"]
# security_key = profile_data["security_key"]
# security = "wpa2"
# mode = "BRIDGE"
# band = "fiveg"
# vlan = 1
# print("starting snifer")
# # lf_test.start_sniffer(radio_channel=36, radio="wiphy2", test_name="roam_11r", duration=3600)
# lf_test.create_n_clients(sta_prefix="wlan", num_sta=2, dut_ssid=ssid_name,
# dut_security=security, dut_passwd=security_key, radio="wiphy1", lf_tools=lf_tools,
# type="11r")
#
# # lf_test.multi_roam(run_lf=run_lf, get_configuration=get_configuration, lf_tools=lf_tools,
# # lf_reports=lf_reports,
# # instantiate_profile=instantiate_profile,
# # ssid_name=ssid_name, security=security, security_key=security_key,
# # mode=mode, band=band, station_name=station_names_fiveg, vlan=vlan, test="5g")
# #
# # print("stop sniff")
# # file_name = lf_test.stop_sniffer()
# # print(file_name)
# # print("wait for logs to be attached")
# # file_name = "roam_11r2022-03-23-00-02.pcap"
# # time.sleep(10)
# # query_auth = lf_test. query_sniff_data(pcap_file=str(file_name), filter="wlan.fc.type_subtype==0x000b")
# # print("query", query_auth)
# # allure.attach(name="authentication", body=str(query_auth))
# # query_asso = lf_test.query_sniff_data(pcap_file=str(file_name), filter="wlan.fc.type_subtype==0x0000")
# # print("query", query_asso)
# # allure.attach(name="authentication", body=str(query_asso))
# # query_reasso_response = lf_test.query_sniff_data(pcap_file=str(file_name), filter="(wlan.fc.type_subtype==3) && (wlan.tag.number==55)")
# # print("query", query_reasso_response)
# # allure.attach(name="authentication", body=str(query_reasso_response))
# # query_4way = lf_test.query_sniff_data(pcap_file=str(file_name), filter="eapol")
# # print("query", query_4way)
# # allure.attach(name="authentication", body=str(query_4way))
# #
#
# @pytest.mark.hard
# @pytest.mark.wpa2_personal
# def test_multi_hard_roam_5g_to_5g(self, lf_test, lf_reports, station_names_fiveg, lf_tools, run_lf, add_env_properties,
# instantiate_profile, get_controller_logs, get_ap_config_slots, get_lf_logs):
# profile_data = setup_params_general["ssid_modes"]["wpa2_personal"][1]
# ssid_name = profile_data["ssid_name"]
# security_key = profile_data["security_key"]
# security = "wpa2"
# mode = "BRIDGE"
# band = "fiveg"
# vlan = 1
# # lf_test.multi_hard_roam(run_lf=run_lf, get_configuration=get_configuration, lf_tools=lf_tools,
# # lf_reports=lf_reports,
# # instantiate_profile=instantiate_profile,
# # ssid_name=ssid_name, security=security, security_key=security_key,
# # mode=mode, band=band, station_name=station_names_fiveg, vlan=vlan, test="5g", iteration=2, num_sta=1)
#
# @pytest.mark.testing
# def test_testing(self, lf_test):
# ret = lf_test.sniff_full_data(pcap_file="roam_11r2022-03-25-13-27.pcap")
# print(ret)
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#

View File

@@ -1,208 +0,0 @@
# import time
#
# import pytest
# import allure
# # from configuration import CONFIGURATION
#
# pytestmark = [pytest.mark.roam_test, pytest.mark.bridge]
#
# setup_params_general = {
# "mode": "BRIDGE",
# "roam": False,
# "ssid_modes": {
# "wpa2_personal": [
# {"ssid_name": "ssid_wpa2_2g", "appliedRadios": ["2G"], "security_key": "something"},
# {"ssid_name": "ssid_wpa2_5g", "appliedRadios": ["5G"], "security_key": "something"}],
# "wpa3_personal": [
# {"ssid_name": "ssid_wpa2_5g", "appliedRadios": ["6G"], "security_key": "something"}
# ]},
# "rf": {},
# "radius": False
# }
# @allure.suite("Roam Test with attenuator")
# @allure.feature("Roam Test")
# # @pytest.mark.parametrize(
# # 'setup_configuration',
# # [setup_params_general],
# # indirect=True,
# # scope="class"
# # )
# # @pytest.mark.usefixtures("setup_configuration")
#
# class TestBasicRoam(object):
#
#
# @pytest.mark.roam_2g
# @pytest.mark.wpa2_personal
# def test_basic_roam_2g(self, lf_test, station_names_twog, lf_tools, run_lf, add_env_properties,
# instantiate_profile, get_controller_logs, get_ap_config_slots, get_lf_logs):
#
# profile_data = setup_params_general["ssid_modes"]["wpa2_personal"][0]
# ssid_name = profile_data["ssid_name"]
# security_key = profile_data["security_key"]
# security = "wpa2"
# mode = "BRIDGE"
# band = "twog"
# vlan = 1
#
# # calling basic roam from lf_test
# # roam =lf_test.basic_roam(run_lf=run_lf, get_configuration=get_configuration, lf_tools=lf_tools, instantiate_profile=instantiate_profile,
# # ssid_name=ssid_name, security=security, security_key=security_key,
# # mode=mode, band=band, station_name=station_names_twog, vlan=vlan, test="2g")
# # if roam:
# # assert True
# # else:
# # assert False
#
#
# @pytest.mark.roam_5g
# @pytest.mark.wpa2_personal
# def test_basic_roam_5g(self, lf_test, station_names_fiveg, lf_tools, run_lf, add_env_properties,
# instantiate_profile, get_controller_logs, get_ap_config_slots, get_lf_logs):
# profile_data = setup_params_general["ssid_modes"]["wpa2_personal"][1]
# ssid_name = profile_data["ssid_name"]
# security_key = profile_data["security_key"]
# security = "wpa2"
# mode = "BRIDGE"
# band = "fiveg"
# vlan = 1
# # roam = lf_test.basic_roam(run_lf=run_lf, get_configuration=get_configuration, lf_tools=lf_tools,
# # instantiate_profile=instantiate_profile,
# # ssid_name=ssid_name, security=security, security_key=security_key,
# # mode=mode, band=band, station_name=station_names_fiveg, vlan=vlan, test="5g")
# # if roam:
# # assert True
# # else:
# # assert False
#
# @pytest.mark.sixg
# @pytest.mark.wpa2_personal
# @pytest.mark.wpa3_personal
# def test_basic_roam_6g(self):
# pass
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
# # @pytest.mark.multi_roam_2g
# # @pytest.mark.wpa2_personl
# # def test_multiple_roam_2g(self, get_configuration, lf_test, station_names_twog, lf_tools, run_lf, add_env_properties,
# # instantiate_profile, get_controller_logs, get_ap_config_slots, get_lf_logs):
# # profile_data = setup_params_general["ssid_modes"]["wpa2_personal"][0]
# # ssid_name = profile_data["ssid_name"]
# # security_key = profile_data["security_key"]
# # security = "wpa2"
# # mode = "BRIDGE"
# # band = "twog"
# # vlan = 1
# # c1_2g_bssid = ""
# # c2_2g_bssid = ""
# # if run_lf:
# # c1_2g_bssid = get_configuration["access_point"][0]["ssid"]["2g-bssid"]
# # allure.attach(name="bssid of ap1", body=c1_2g_bssid)
# # c2_2g_bssid = get_configuration["access_point"][1]["ssid"]["2g-bssid"]
# # allure.attach(name="bssid of ap2", body=c2_2g_bssid)
# # ssid_name = get_configuration["access_point"][0]["ssid"]["2g-ssid"]
# # else:
# # for ap_name in range(len(get_configuration['access_point'])):
# # instantiate_profile_obj = instantiate_profile(controller_data=get_configuration['controller'],
# # timeout="10", ap_data=get_configuration['access_point'], type=ap_name)
# # bssid_2g = instantiate_profile_obj.cal_bssid_2g()
# # if ap_name == 0 :
# # c1_2g_bssid = bssid_2g
# # if ap_name == 1:
# # c2_2g_bssid = bssid_2g
# # print("bssid of c1 ", c1_2g_bssid)
# # allure.attach(name="bssid of ap1", body=c1_2g_bssid)
# # print("bssid of c2", c2_2g_bssid)
# # allure.attach(name="bssid of ap2", body=c2_2g_bssid)
# #
# #
# # ser_no = lf_test.attenuator_serial()
# # print(ser_no[0])
# # ser_1 = ser_no[0].split(".")[2]
# # ser_2 = ser_no[1].split(".")[2]
# # lf_tools.add_stations(band="2G", num_stations=3, dut=lf_tools.dut_name, ssid_name=ssid_name)
# # lf_tools.Chamber_View()
# # sta_list = lf_tools.get_station_list()
# # print("sta_list", sta_list)
# # lf_tools.admin_up_down(sta_list=sta_list, option="up")
# # station = lf_test.wait_for_ip(station=sta_list)
# # station_before = ""
# # station_list = []
# # for i in range(len(sta_list)):
# # station_list.append(sta_list[i].split(".")[2])
# # print(station_list)
# # if station:
# # lf_test.attach_stationdata_to_allure(name="staion info before roam", station_name=sta_list)
# # for i in station_list:
# # bssid = lf_tools.station_data_query(station_name=str(i), query="ap")
# # formated_bssid = bssid.lower()
# # if formated_bssid == c1_2g_bssid:
# # print("station connected to chamber1 ap")
# # station_before = formated_bssid
# # elif formated_bssid == c2_2g_bssid:
# # print("station connected to chamber 2 ap")
# # station_before = formated_bssid
# # # logic to decrease c1 attenuation and increase c2 attenuation
# # for atten_val1, atten_val2 in zip([0, 100, 300, 500, 750, 950], [950, 750, 500, 300, 100, 0]):
# # print(atten_val1)
# # print(atten_val2)
# # for i in range(4):
# # lf_test.attenuator_modify(int(ser_1), i, atten_val1)
# # lf_test.attenuator_modify(int(ser_2), i, atten_val2)
# # time.sleep(10)
# # lf_tools.admin_up_down(sta_list=station_list, option="down")
# # time.sleep(15)
# # lf_tools.admin_up_down(sta_list=station_list, option="up")
# # time.sleep(15)
# # for i in station_list:
# # bssid = lf_tools.station_data_query(station_name=str(i), query="ap")
# # station_after = bssid.lower()
# # if station_after == station_before:
# # continue
# # elif station_after != station_before:
# # print("client performed roam")
# # lf_test.attach_stationdata_to_allure(name="staion info after roam", station_name=i)
# # allure.attach(name="attenuation_data", body="ap1 was at attenuation value " + str(atten_val2) + "ddbm and ap2 was at attenuation value " + str(atten_val1) + "ddbm")
# # break
# #
# # else:
# # allure.attach(name="FAIL", body="stations failed to get ip")
# # assert False
# #
#
#
#
#
#
#
#
#
#
#
#

View File

@@ -97,15 +97,6 @@
}
},
"services": {
"wifi-steering": {
"mode": "local",
"network": "upstream",
"assoc-steering": true,
"required-snr": -85,
"required-probe-snr": -80,
"required-roam-snr": -80,
"load-kick-threshold": 90
},
"ssh": {
"port": 22
}

View File

@@ -10,7 +10,7 @@ import time
import copy
import requests
pytestmark = [pytest.mark.roam_test, pytest.mark.bridge, pytest.mark.roam_ota, pytest.mark.hard_roam_ota, pytest.mark.ow_regression_lf]
pytestmark = [pytest.mark.roam_test, pytest.mark.bridge, pytest.mark.hard_roam_ota, pytest.mark.ow_regression_lf]
# Get the directory of the current test config file
test_file_dir = os.path.dirname(os.path.abspath(__file__))
@@ -26,32 +26,44 @@ with open(file_path, 'r') as file:
@allure.parent_suite("Roam Test")
@allure.sub_suite("wpa2_enterprise")
@pytest.mark.roam
@pytest.mark.ota
@pytest.mark.wpa2_enterprise
class TestRoamOTA(object):
@pytest.mark.same_channel
@pytest.mark.twog
def test_roam_2g_to_2g_sc_wpa2_eap(self, get_target_object, get_test_library, get_lab_info, selected_testbed,
@pytest.mark.tls
def test_roam_2g_to_2g_sc_wpa2_eap_tls(self, get_target_object, get_test_library, get_lab_info, selected_testbed,
radius_info):
"""
Test Roaming between two APs, Same channel, 2G, WPA2 Enterprise
pytest -m "roam and twog and same_channel and wpa2_enterprise and ota"
pytest -m "hard_roam_ota and twog and same_channel and wpa2_enterprise and tls"
"""
ap_data = dict()
dut_list = [str(selected_testbed)]
dut_names = list()
bssid_list = list()
freqs_ = ""
testbed_info = get_lab_info.CONFIGURATION
config = copy.deepcopy(config_data)
temp_list = list()
for key, val in testbed_info.items():
tb_type, tb_name = selected_testbed.split("-")
if tb_type in key and tb_name[0] in key:
temp_list.append(key)
temp_list.sort()
dut_list = [temp_list[idx] for idx in range(len(temp_list)) if idx <= 1]
logging.info(f"temp_list:{temp_list}")
dut_list = []
idx = temp_list.index(selected_testbed)
dut_list = [temp_list[idx]]
if idx + 1 < len(temp_list):
dut_list.append(temp_list[idx + 1])
logging.info(f"---dut list: {dut_list}---")
if len(dut_list) < 2:
logging.error(
f"This test need two AP's but number of DUT's available in the selected testbed is {dut_list}")
assert False, f"This test need two AP's but number of DUT's available in the selected testbed is {dut_list}"
# dut_list = [temp_list[idx] for idx in range(len(temp_list)) if idx <= 1]
config['interfaces'][0]["ssids"][0]["radius"] = {
"accounting": {
"host": radius_info["ip"],
@@ -65,20 +77,17 @@ class TestRoamOTA(object):
}
}
config['radios'] = [
{"band": "2G", "channel": 11, "channel-mode": "HE", "channel-width": 40, "country": "CA"}]
{"band": "2G", "channel": 11, "channel-mode": "HE", "channel-width": 40, "country": "US"}]
config['interfaces'][0]["ssids"][0]["wifi-bands"] = ["2G"]
if "proto" in config['interfaces'][0]["ssids"][0]["encryption"]:
config['interfaces'][0]["ssids"][0]["encryption"]["proto"] = "wpa2"
if "key" in config['interfaces'][0]["ssids"][0]["encryption"]:
config['interfaces'][0]["ssids"][0]["encryption"].pop("key")
if len(dut_list) < 2:
logging.error(
f"This test need two AP's but number of DUT's available in the selected testbed is {dut_list}")
assert False, f"This test need two AP's but number of DUT's available in the selected testbed is {dut_list}"
for ap in range(len(dut_list)):
serial_number = testbed_info[dut_list[ap]]["device_under_tests"][0]['identifier']
dut_names.append(testbed_info[dut_list[ap]]["device_under_tests"][0]['model'])
logging.info(config)
logging.info(f"config:{config}")
payload = {"configuration": json.dumps(config), "serialNumber": serial_number, "UUID": 2}
uri = get_target_object.controller_library_object.build_uri(
"device/" + serial_number + "/configure")
@@ -86,7 +95,7 @@ class TestRoamOTA(object):
"TimeStamp: " + str(datetime.datetime.utcnow()) + "\n" +
"Data: " + str(json.dumps(payload, indent=2)) + "\n" +
"Headers: " + str(get_target_object.controller_library_object.make_headers()))
allure.attach(name=f"Push roam config on {serial_number}: ", body="Sending Command: " + str(uri) + "\n" +
allure.attach(name=f"Push roam config on AP{ap+1}:{serial_number}: ", body="Sending Command: " + str(uri) + "\n" +
"TimeStamp: " + str(
datetime.datetime.utcnow()) + "\n" +
"Data: " + str(payload) + "\n" +
@@ -115,7 +124,7 @@ class TestRoamOTA(object):
if str(ap_iwinfo) != "Error: pop from empty list":
include_essid = config['interfaces'][0]["ssids"][0]["name"]
re_obj = re.compile(
rf'(wlan\d(?:-\d)?)\s+ESSID: "{re.escape(include_essid)}".*?\s+Access Point:\s+([0-9A-Fa-f:]+).*?Channel:\s+('
rf'([a-zA-Z0-9-]+)\s+ESSID: "{re.escape(include_essid)}".*?\s+Access Point:\s+([0-9A-Fa-f:]+).*?Channel:\s+('
r'\d+)\s+\(([\d.]+) GHz\)',
re.DOTALL
)
@@ -143,11 +152,13 @@ class TestRoamOTA(object):
freqs_ = freqs_ + ap_data[serial]['frequency']
ssid = config['interfaces'][0]["ssids"][0]["name"]
pass_fail, message = True, "Test Passed"
twog_radio = list(get_test_library.get_radio_availabilities(num_stations_2g=1)[0].keys())[0]
logging.info(f"twog_radio from testcase:{twog_radio}")
try:
pass_fail, message = get_test_library.roam_test(ap1_bssid=bssid_list[0], ap2_bssid=bssid_list[1],
scan_freq=freqs_,
scan_freq=freqs_, twog_radio=twog_radio,
band="twog", num_sta=1, security="wpa2", ssid=ssid,
upstream="1.1.eth1", eap_method="TLS",
upstream="eth2", eap_method="TLS",
pairwise_cipher="DEFAULT ",
groupwise_cipher="DEFAULT ",
eap_identity=radius_info["user"],
@@ -169,14 +180,14 @@ class TestRoamOTA(object):
@pytest.mark.same_channel
@pytest.mark.fiveg
@pytest.mark.enterprise
def test_roam_5g_to_5g_sc_wpa2_eap(self, get_target_object, get_test_library, get_lab_info, selected_testbed,
@pytest.mark.tls
def test_roam_5g_to_5g_sc_wpa2_eap_tls(self, get_target_object, get_test_library, get_lab_info, selected_testbed,
radius_info):
"""
Test Roaming between two APs, Same channel, 5G, WPA2 Enterprise
pytest -m "roam and fiveg and same_channel and wpa2_enterprise and ota"
pytest -m "hard_roam_ota and fiveg and same_channel and wpa2_enterprise"
"""
ap_data = dict()
dut_list = [str(selected_testbed)]
dut_names = list()
bssid_list = list()
freqs_ = ""
@@ -188,7 +199,15 @@ class TestRoamOTA(object):
if tb_type in key and tb_name[0] in key:
temp_list.append(key)
temp_list.sort()
dut_list = [temp_list[idx] for idx in range(len(temp_list)) if idx <= 1]
logging.info(f"temp_list:{temp_list}")
dut_list = []
idx = temp_list.index(selected_testbed)
dut_list = [temp_list[idx]]
if idx + 1 < len(temp_list):
dut_list.append(temp_list[idx + 1])
logging.info(f"---dut list: {dut_list}---")
# dut_list = [temp_list[idx] for idx in range(len(temp_list)) if idx <= 1]
config['interfaces'][0]["ssids"][0]["radius"] = {
"accounting": {
"host": radius_info["ip"],
@@ -202,7 +221,7 @@ class TestRoamOTA(object):
}
}
config['radios'] = [
{"band": "5G", "channel": 36, "channel-mode": "HE", "channel-width": 80, "country": "CA"}]
{"band": "5G", "channel": 36, "channel-mode": "HE", "channel-width": 80, "country": "US"}]
config['interfaces'][0]["ssids"][0]["wifi-bands"] = ["5G"]
if "proto" in config['interfaces'][0]["ssids"][0]["encryption"]:
config['interfaces'][0]["ssids"][0]["encryption"]["proto"] = "wpa2"
@@ -215,7 +234,7 @@ class TestRoamOTA(object):
for ap in range(len(dut_list)):
serial_number = testbed_info[dut_list[ap]]["device_under_tests"][0]['identifier']
dut_names.append(testbed_info[dut_list[ap]]["device_under_tests"][0]['model'])
logging.info(config)
logging.info(f"config:{config}")
payload = {"configuration": json.dumps(config), "serialNumber": serial_number, "UUID": 2}
uri = get_target_object.controller_library_object.build_uri(
"device/" + serial_number + "/configure")
@@ -223,7 +242,7 @@ class TestRoamOTA(object):
"TimeStamp: " + str(datetime.datetime.utcnow()) + "\n" +
"Data: " + str(json.dumps(payload, indent=2)) + "\n" +
"Headers: " + str(get_target_object.controller_library_object.make_headers()))
allure.attach(name=f"Push roam config on {serial_number}: ", body="Sending Command: " + str(uri) + "\n" +
allure.attach(name=f"Push roam config on AP{ap+1}: {serial_number}: ", body="Sending Command: " + str(uri) + "\n" +
"TimeStamp: " + str(
datetime.datetime.utcnow()) + "\n" +
"Data: " + str(payload) + "\n" +
@@ -252,7 +271,7 @@ class TestRoamOTA(object):
if str(ap_iwinfo) != "Error: pop from empty list":
include_essid = config['interfaces'][0]["ssids"][0]["name"]
re_obj = re.compile(
rf'(wlan\d(?:-\d)?)\s+ESSID: "{re.escape(include_essid)}".*?\s+Access Point:\s+([0-9A-Fa-f:]+).*?Channel:\s+('
rf'([a-zA-Z0-9-]+)\s+ESSID: "{re.escape(include_essid)}".*?\s+Access Point:\s+([0-9A-Fa-f:]+).*?Channel:\s+('
r'\d+)\s+\(([\d.]+) GHz\)',
re.DOTALL
)
@@ -280,9 +299,11 @@ class TestRoamOTA(object):
freqs_ = freqs_ + ap_data[serial]['frequency']
ssid = config['interfaces'][0]["ssids"][0]["name"]
pass_fail, message = True, "Test Passed"
fiveg_radio = list(get_test_library.get_radio_availabilities(num_stations_5g=1)[0].keys())[0]
logging.info(f"fiveg_radio from testcase:{fiveg_radio}")
try:
pass_fail, message = get_test_library.roam_test(ap1_bssid=bssid_list[0], ap2_bssid=bssid_list[1],
scan_freq=freqs_,
scan_freq=freqs_, fiveg_radio=fiveg_radio,
band="fiveg", num_sta=1, security="wpa2", ssid=ssid,
upstream="1.1.eth1", eap_method="TLS",
pairwise_cipher="DEFAULT ",
@@ -306,14 +327,15 @@ class TestRoamOTA(object):
@pytest.mark.twog
@pytest.mark.fiveg
@pytest.mark.enterprise
def test_roam_5g_and_2g_wpa2_eap(self, get_target_object, get_test_library, get_lab_info, selected_testbed,
@pytest.mark.both
@pytest.mark.tls
def test_roam_5g_to_2g_wpa2_eap_tls(self, get_target_object, get_test_library, get_lab_info, selected_testbed,
radius_info):
"""
Test Roaming between two APs, 2G & 5G, WPA2 Enterprise
pytest -m "roam and fiveg and twog and wpa2_enterprise and ota"
pytest -m "hard_roam_ota and fiveg and twog and wpa2_enterprise"
"""
ap_data = dict()
dut_list = [str(selected_testbed)]
dut_names = list()
bssid_list = list()
freqs_ = ""
@@ -325,7 +347,16 @@ class TestRoamOTA(object):
if tb_type in key and tb_name[0] in key:
temp_list.append(key)
temp_list.sort()
dut_list = [temp_list[idx] for idx in range(len(temp_list)) if idx <= 1]
logging.info(f"temp_list:{temp_list}")
dut_list = []
idx = temp_list.index(selected_testbed)
dut_list = [temp_list[idx]]
if idx + 1 < len(temp_list):
dut_list.append(temp_list[idx + 1])
logging.info(f"---dut list: {dut_list}---")
# dut_list = [temp_list[idx] for idx in range(len(temp_list)) if idx <= 1]
config['interfaces'][0]["ssids"][0]["radius"] = {
"accounting": {
"host": radius_info["ip"],
@@ -339,7 +370,7 @@ class TestRoamOTA(object):
}
}
config['radios'] = [
{"band": "5G", "channel": 36, "channel-mode": "HE", "channel-width": 80, "country": "CA"}]
{"band": "5G", "channel": 36, "channel-mode": "HE", "channel-width": 80, "country": "US"}]
config['interfaces'][0]["ssids"][0]["wifi-bands"] = ["5G"]
if "proto" in config['interfaces'][0]["ssids"][0]["encryption"]:
config['interfaces'][0]["ssids"][0]["encryption"]["proto"] = "wpa2"
@@ -354,9 +385,9 @@ class TestRoamOTA(object):
dut_names.append(testbed_info[dut_list[ap]]["device_under_tests"][0]['model'])
if ap == 1:
config['radios'] = [
{"band": "2G", "channel": 11, "channel-mode": "HE", "channel-width": 20, "country": "CA"}]
{"band": "2G", "channel": 11, "channel-mode": "HE", "channel-width": 20, "country": "US"}]
config['interfaces'][0]["ssids"][0]["wifi-bands"] = ["2G"]
logging.info(config)
logging.info(f"config:{config}")
payload = {"configuration": json.dumps(config), "serialNumber": serial_number, "UUID": 2}
uri = get_target_object.controller_library_object.build_uri(
"device/" + serial_number + "/configure")
@@ -364,7 +395,7 @@ class TestRoamOTA(object):
"TimeStamp: " + str(datetime.datetime.utcnow()) + "\n" +
"Data: " + str(json.dumps(payload, indent=2)) + "\n" +
"Headers: " + str(get_target_object.controller_library_object.make_headers()))
allure.attach(name=f"Push roam config on {serial_number}: ", body="Sending Command: " + str(uri) + "\n" +
allure.attach(name=f"Push roam config on AP{ap+1}:{serial_number}: ", body="Sending Command: " + str(uri) + "\n" +
"TimeStamp: " + str(
datetime.datetime.utcnow()) + "\n" +
"Data: " + str(payload) + "\n" +
@@ -393,7 +424,7 @@ class TestRoamOTA(object):
if str(ap_iwinfo) != "Error: pop from empty list":
include_essid = config['interfaces'][0]["ssids"][0]["name"]
re_obj = re.compile(
rf'(wlan\d(?:-\d)?)\s+ESSID: "{re.escape(include_essid)}".*?\s+Access Point:\s+([0-9A-Fa-f:]+).*?Channel:\s+('
rf'([a-zA-Z0-9-]+)\s+ESSID: "{re.escape(include_essid)}".*?\s+Access Point:\s+([0-9A-Fa-f:]+).*?Channel:\s+('
r'\d+)\s+\(([\d.]+) GHz\)',
re.DOTALL
)
@@ -421,10 +452,12 @@ class TestRoamOTA(object):
freqs_ = freqs_ + ap_data[serial]['frequency']
ssid = config['interfaces'][0]["ssids"][0]["name"]
pass_fail, message = True, "Test Passed"
fiveg_radio = list(get_test_library.get_radio_availabilities(num_stations_5g=1)[0].keys())[0]
logging.info(f"twog_radio from testcase:{fiveg_radio}")
try:
pass_fail, message = get_test_library.roam_test(ap1_bssid=bssid_list[0], ap2_bssid=bssid_list[1],
scan_freq=freqs_,
band="both", num_sta=1, security="wpa2", ssid=ssid,
scan_freq=freqs_,fiveg_radio=fiveg_radio,
band="fiveg,twog", num_sta=1, security="wpa2", ssid=ssid,
upstream="1.1.eth1", eap_method="TLS",
pairwise_cipher="DEFAULT ",
groupwise_cipher="DEFAULT ",
@@ -442,5 +475,455 @@ class TestRoamOTA(object):
get_target_object.dut_library_object.get_dut_logs(print_log=False)
if not pass_fail:
pytest.fail(f"Test failed with the following reasons: \n{message}")
else:
assert True
@pytest.mark.same_channel
@pytest.mark.twog
@pytest.mark.ttls
def test_roam_2g_to_2g_sc_wpa2_eap_ttls(self, get_target_object, get_test_library, get_lab_info, selected_testbed,
radius_info):
"""
Test Roaming between two APs, Same channel, 2G, WPA2 Enterprise
pytest -m "hard_roam_ota and twog and same_channel and wpa2_enterprise and ttls"
"""
ap_data = dict()
dut_names = list()
bssid_list = list()
freqs_ = ""
testbed_info = get_lab_info.CONFIGURATION
config = copy.deepcopy(config_data)
temp_list = list()
for key, val in testbed_info.items():
tb_type, tb_name = selected_testbed.split("-")
if tb_type in key and tb_name[0] in key:
temp_list.append(key)
temp_list.sort()
logging.info(f"temp_list:{temp_list}")
dut_list = []
idx = temp_list.index(selected_testbed)
dut_list = [temp_list[idx]]
if idx + 1 < len(temp_list):
dut_list.append(temp_list[idx + 1])
logging.info(f"---dut list: {dut_list}---")
if len(dut_list) < 2:
logging.error(
f"This test need two AP's but number of DUT's available in the selected testbed is {dut_list}")
assert False, f"This test need two AP's but number of DUT's available in the selected testbed is {dut_list}"
# dut_list = [temp_list[idx] for idx in range(len(temp_list)) if idx <= 1]
config['interfaces'][0]["ssids"][0]["radius"] = {
"accounting": {
"host": radius_info["ip"],
"port": radius_info["port"],
"secret": radius_info["secret"]
},
"authentication": {
"host": radius_info["ip"],
"port": radius_info["port"],
"secret": radius_info["secret"]
}
}
config['radios'] = [
{"band": "2G", "channel": 11, "channel-mode": "HE", "channel-width": 40, "country": "US"}]
config['interfaces'][0]["ssids"][0]["wifi-bands"] = ["2G"]
if "proto" in config['interfaces'][0]["ssids"][0]["encryption"]:
config['interfaces'][0]["ssids"][0]["encryption"]["proto"] = "wpa2"
if "key" in config['interfaces'][0]["ssids"][0]["encryption"]:
config['interfaces'][0]["ssids"][0]["encryption"].pop("key")
for ap in range(len(dut_list)):
serial_number = testbed_info[dut_list[ap]]["device_under_tests"][0]['identifier']
dut_names.append(testbed_info[dut_list[ap]]["device_under_tests"][0]['model'])
logging.info(f"config:{config}")
payload = {"configuration": json.dumps(config), "serialNumber": serial_number, "UUID": 2}
uri = get_target_object.controller_library_object.build_uri(
"device/" + serial_number + "/configure")
logging.info("Sending Command: " + "\n" + str(uri) + "\n" +
"TimeStamp: " + str(datetime.datetime.utcnow()) + "\n" +
"Data: " + str(json.dumps(payload, indent=2)) + "\n" +
"Headers: " + str(get_target_object.controller_library_object.make_headers()))
allure.attach(name=f"Push roam config on AP{ap + 1}:{serial_number}: ",
body="Sending Command: " + str(uri) + "\n" +
"TimeStamp: " + str(
datetime.datetime.utcnow()) + "\n" +
"Data: " + str(payload) + "\n" +
"Headers: " + str(
get_target_object.controller_library_object.make_headers()))
resp = requests.post(uri, data=json.dumps(payload, indent=2),
headers=get_target_object.controller_library_object.make_headers(),
verify=False, timeout=120)
time.sleep(10)
logging.info(resp.json())
allure.attach(name=f"Response - {resp.status_code} {resp.reason}", body=str(resp.json()))
if resp.status_code != 200:
if resp.status_code == 400 and "Device is already executing a command. Please try later." in \
resp.json()["ErrorDescription"]:
time.sleep(30)
resp = requests.post(uri, data=json.dumps(payload, indent=2),
headers=get_target_object.controller_library_object.make_headers(),
verify=False, timeout=120)
time.sleep(10)
logging.info(resp.json())
else:
assert False, f"push configuration to {serial_number} got failed"
get_target_object.dut_library_object.device_under_tests_data = testbed_info[dut_list[ap]][
"device_under_tests"]
ap_iwinfo = get_target_object.dut_library_object.get_iwinfo(attach_allure=True)
if str(ap_iwinfo) != "Error: pop from empty list":
include_essid = config['interfaces'][0]["ssids"][0]["name"]
re_obj = re.compile(
rf'([a-zA-Z0-9-]+)\s+ESSID: "{re.escape(include_essid)}".*?\s+Access Point:\s+([0-9A-Fa-f:]+).*?Channel:\s+('
r'\d+)\s+\(([\d.]+) GHz\)',
re.DOTALL
)
# find all matches
interface_matches = re_obj.finditer(ap_iwinfo)
if interface_matches:
for match in interface_matches:
interface_name = match.group(1)
access_point = match.group(2)
channel = match.group(3)
frequency = match.group(4).replace('.', '')
ap_data.update(
{serial_number: {'Access Point': access_point, 'Channel': channel, 'frequency': frequency}})
logging.info(f"AP Data from iwinfo: {ap_data}")
if ap_data == {}:
logging.error("Failed to get required iwinfo from minicom")
pytest.fail("Failed to get required iwinfo from minicom")
else:
pytest.fail("Failed to get iwinfo from minicom")
for serial in ap_data:
bssid_list.append(ap_data[serial]['Access Point'])
if not ap_data[serial]['frequency'].endswith(","):
freqs_ = freqs_ + ap_data[serial]['frequency'] + ","
else:
freqs_ = freqs_ + ap_data[serial]['frequency']
ssid = config['interfaces'][0]["ssids"][0]["name"]
pass_fail, message = True, "Test Passed"
twog_radio = list(get_test_library.get_radio_availabilities(num_stations_2g=1)[0].keys())[0]
logging.info(f"twog_radio from testcase:{twog_radio}")
try:
pass_fail, message = get_test_library.roam_test(ap1_bssid=bssid_list[0], ap2_bssid=bssid_list[1],
scan_freq=freqs_, twog_radio=twog_radio,
band="twog", num_sta=1, security="wpa2", ssid=ssid,
upstream="1.1.eth1", eap_method="TTLS",
pairwise_cipher="DEFAULT ",
groupwise_cipher="DEFAULT ",
eap_identity=radius_info["user"],
eap_password=radius_info["password"],
pk_passwd=radius_info["pk_password"],
sta_type="11r-eap", private_key="NA", ca_cert="NA",
iteration=1, channel="11", option="ota", dut_name=dut_names,
traffic_type="lf_udp")
except Exception as e:
logging.error(f"Exception in roam test : {e}")
pass_fail, message = False, e
finally:
get_target_object.dut_library_object.get_dut_logs(print_log=False)
if not pass_fail:
pytest.fail(f"Test failed with the following reasons: \n{message}")
else:
assert True
@pytest.mark.same_channel
@pytest.mark.fiveg
@pytest.mark.enterprise
@pytest.mark.ttls
def test_roam_5g_to_5g_sc_wpa2_eap_ttls(self, get_target_object, get_test_library, get_lab_info, selected_testbed,
radius_info):
"""
Test Roaming between two APs, Same channel, 5G, WPA2 Enterprise
pytest -m "hard_roam_ota and fiveg and same_channel and wpa2_enterprise"
"""
ap_data = dict()
dut_names = list()
bssid_list = list()
freqs_ = ""
testbed_info = get_lab_info.CONFIGURATION
config = copy.deepcopy(config_data)
temp_list = list()
for key, val in testbed_info.items():
tb_type, tb_name = selected_testbed.split("-")
if tb_type in key and tb_name[0] in key:
temp_list.append(key)
temp_list.sort()
logging.info(f"temp_list:{temp_list}")
dut_list = []
idx = temp_list.index(selected_testbed)
dut_list = [temp_list[idx]]
if idx + 1 < len(temp_list):
dut_list.append(temp_list[idx + 1])
logging.info(f"---dut list: {dut_list}---")
# dut_list = [temp_list[idx] for idx in range(len(temp_list)) if idx <= 1]
config['interfaces'][0]["ssids"][0]["radius"] = {
"accounting": {
"host": radius_info["ip"],
"port": radius_info["port"],
"secret": radius_info["secret"]
},
"authentication": {
"host": radius_info["ip"],
"port": radius_info["port"],
"secret": radius_info["secret"]
}
}
config['radios'] = [
{"band": "5G", "channel": 36, "channel-mode": "HE", "channel-width": 80, "country": "US"}]
config['interfaces'][0]["ssids"][0]["wifi-bands"] = ["5G"]
if "proto" in config['interfaces'][0]["ssids"][0]["encryption"]:
config['interfaces'][0]["ssids"][0]["encryption"]["proto"] = "wpa2"
if "key" in config['interfaces'][0]["ssids"][0]["encryption"]:
config['interfaces'][0]["ssids"][0]["encryption"].pop("key")
if len(dut_list) < 2:
logging.error(
f"This test need two AP's but number of DUT's available in the selected testbed is {dut_list}")
assert False, f"This test need two AP's but number of DUT's available in the selected testbed is {dut_list}"
for ap in range(len(dut_list)):
serial_number = testbed_info[dut_list[ap]]["device_under_tests"][0]['identifier']
dut_names.append(testbed_info[dut_list[ap]]["device_under_tests"][0]['model'])
logging.info(f"config:{config}")
payload = {"configuration": json.dumps(config), "serialNumber": serial_number, "UUID": 2}
uri = get_target_object.controller_library_object.build_uri(
"device/" + serial_number + "/configure")
logging.info("Sending Command: " + "\n" + str(uri) + "\n" +
"TimeStamp: " + str(datetime.datetime.utcnow()) + "\n" +
"Data: " + str(json.dumps(payload, indent=2)) + "\n" +
"Headers: " + str(get_target_object.controller_library_object.make_headers()))
allure.attach(name=f"Push roam config on AP{ap + 1}: {serial_number}: ",
body="Sending Command: " + str(uri) + "\n" +
"TimeStamp: " + str(
datetime.datetime.utcnow()) + "\n" +
"Data: " + str(payload) + "\n" +
"Headers: " + str(
get_target_object.controller_library_object.make_headers()))
resp = requests.post(uri, data=json.dumps(payload, indent=2),
headers=get_target_object.controller_library_object.make_headers(),
verify=False, timeout=120)
time.sleep(10)
logging.info(resp.json())
allure.attach(name=f"Response - {resp.status_code} {resp.reason}", body=str(resp.json()))
if resp.status_code != 200:
if resp.status_code == 400 and "Device is already executing a command. Please try later." in \
resp.json()["ErrorDescription"]:
time.sleep(30)
resp = requests.post(uri, data=json.dumps(payload, indent=2),
headers=get_target_object.controller_library_object.make_headers(),
verify=False, timeout=120)
time.sleep(10)
logging.info(resp.json())
else:
assert False, f"push configuration to {serial_number} got failed"
get_target_object.dut_library_object.device_under_tests_data = testbed_info[dut_list[ap]][
"device_under_tests"]
ap_iwinfo = get_target_object.dut_library_object.get_iwinfo(attach_allure=True)
if str(ap_iwinfo) != "Error: pop from empty list":
include_essid = config['interfaces'][0]["ssids"][0]["name"]
re_obj = re.compile(
rf'([a-zA-Z0-9-]+)\s+ESSID: "{re.escape(include_essid)}".*?\s+Access Point:\s+([0-9A-Fa-f:]+).*?Channel:\s+('
r'\d+)\s+\(([\d.]+) GHz\)',
re.DOTALL
)
# find all matches
interface_matches = re_obj.finditer(ap_iwinfo)
if interface_matches:
for match in interface_matches:
interface_name = match.group(1)
access_point = match.group(2)
channel = match.group(3)
frequency = match.group(4).replace('.', '')
ap_data.update(
{serial_number: {'Access Point': access_point, 'Channel': channel, 'frequency': frequency}})
logging.info(f"AP Data from iwinfo: {ap_data}")
if ap_data == {}:
logging.error("Failed to get required iwinfo from minicom")
pytest.fail("Failed to get required iwinfo from minicom")
else:
pytest.fail("Failed to get iwinfo from minicom")
for serial in ap_data:
bssid_list.append(ap_data[serial]['Access Point'])
if not ap_data[serial]['frequency'].endswith(","):
freqs_ = freqs_ + ap_data[serial]['frequency'] + ","
else:
freqs_ = freqs_ + ap_data[serial]['frequency']
ssid = config['interfaces'][0]["ssids"][0]["name"]
pass_fail, message = True, "Test Passed"
fiveg_radio = list(get_test_library.get_radio_availabilities(num_stations_5g=1)[0].keys())[0]
logging.info(f"fiveg_radio from testcase:{fiveg_radio}")
try:
pass_fail, message = get_test_library.roam_test(ap1_bssid=bssid_list[0], ap2_bssid=bssid_list[1],
scan_freq=freqs_, fiveg_radio=fiveg_radio,
band="fiveg", num_sta=1, security="wpa2", ssid=ssid,
upstream="1.1.eth1", eap_method="TTLS",
pairwise_cipher="DEFAULT ",
groupwise_cipher="DEFAULT ",
eap_identity=radius_info["user"],
eap_password=radius_info["password"],
pk_passwd=radius_info["pk_password"], sta_type="11r-eap",
iteration=1, channel="11", option="ota", dut_name=dut_names,
traffic_type="lf_udp")
except Exception as e:
logging.error(f"Exception in roam test : {e}")
pass_fail, message = False, e
finally:
get_target_object.dut_library_object.get_dut_logs(print_log=False)
if not pass_fail:
pytest.fail(f"Test failed with the following reasons: \n{message}")
else:
assert True
@pytest.mark.twog
@pytest.mark.fiveg
@pytest.mark.enterprise
@pytest.mark.both
@pytest.mark.ttls
def test_roam_5g_and_2g_wpa2_eap_ttls(self, get_target_object, get_test_library, get_lab_info, selected_testbed,
radius_info):
"""
Test Roaming between two APs, 2G & 5G, WPA2 Enterprise
pytest -m "hard_roam_ota and fiveg and twog and wpa2_enterprise"
"""
ap_data = dict()
dut_names = list()
bssid_list = list()
freqs_ = ""
testbed_info = get_lab_info.CONFIGURATION
config = copy.deepcopy(config_data)
temp_list = list()
for key, val in testbed_info.items():
tb_type, tb_name = selected_testbed.split("-")
if tb_type in key and tb_name[0] in key:
temp_list.append(key)
temp_list.sort()
logging.info(f"temp_list:{temp_list}")
dut_list = []
idx = temp_list.index(selected_testbed)
dut_list = [temp_list[idx]]
if idx + 1 < len(temp_list):
dut_list.append(temp_list[idx + 1])
logging.info(f"---dut list: {dut_list}---")
# dut_list = [temp_list[idx] for idx in range(len(temp_list)) if idx <= 1]
config['interfaces'][0]["ssids"][0]["radius"] = {
"accounting": {
"host": radius_info["ip"],
"port": radius_info["port"],
"secret": radius_info["secret"]
},
"authentication": {
"host": radius_info["ip"],
"port": radius_info["port"],
"secret": radius_info["secret"]
}
}
config['radios'] = [
{"band": "5G", "channel": 36, "channel-mode": "HE", "channel-width": 80, "country": "US"}]
config['interfaces'][0]["ssids"][0]["wifi-bands"] = ["5G"]
if "proto" in config['interfaces'][0]["ssids"][0]["encryption"]:
config['interfaces'][0]["ssids"][0]["encryption"]["proto"] = "wpa2"
if "key" in config['interfaces'][0]["ssids"][0]["encryption"]:
config['interfaces'][0]["ssids"][0]["encryption"].pop("key")
if len(dut_list) < 2:
logging.error(
f"This test need two AP's but number of DUT's available in the selected testbed is {dut_list}")
assert False, f"This test need two AP's but number of DUT's available in the selected testbed is {dut_list}"
for ap in range(len(dut_list)):
serial_number = testbed_info[dut_list[ap]]["device_under_tests"][0]['identifier']
dut_names.append(testbed_info[dut_list[ap]]["device_under_tests"][0]['model'])
if ap == 1:
config['radios'] = [
{"band": "2G", "channel": 11, "channel-mode": "HE", "channel-width": 20, "country": "US"}]
config['interfaces'][0]["ssids"][0]["wifi-bands"] = ["2G"]
logging.info(f"config:{config}")
payload = {"configuration": json.dumps(config), "serialNumber": serial_number, "UUID": 2}
uri = get_target_object.controller_library_object.build_uri(
"device/" + serial_number + "/configure")
logging.info("Sending Command: " + "\n" + str(uri) + "\n" +
"TimeStamp: " + str(datetime.datetime.utcnow()) + "\n" +
"Data: " + str(json.dumps(payload, indent=2)) + "\n" +
"Headers: " + str(get_target_object.controller_library_object.make_headers()))
allure.attach(name=f"Push roam config on AP{ap + 1}:{serial_number}: ",
body="Sending Command: " + str(uri) + "\n" +
"TimeStamp: " + str(
datetime.datetime.utcnow()) + "\n" +
"Data: " + str(payload) + "\n" +
"Headers: " + str(
get_target_object.controller_library_object.make_headers()))
resp = requests.post(uri, data=json.dumps(payload, indent=2),
headers=get_target_object.controller_library_object.make_headers(),
verify=False, timeout=120)
time.sleep(10)
logging.info(resp.json())
allure.attach(name=f"Response - {resp.status_code} {resp.reason}", body=str(resp.json()))
if resp.status_code != 200:
if resp.status_code == 400 and "Device is already executing a command. Please try later." in \
resp.json()["ErrorDescription"]:
time.sleep(30)
resp = requests.post(uri, data=json.dumps(payload, indent=2),
headers=get_target_object.controller_library_object.make_headers(),
verify=False, timeout=120)
time.sleep(10)
logging.info(resp.json())
else:
assert False, f"push configuration to {serial_number} got failed"
get_target_object.dut_library_object.device_under_tests_data = testbed_info[dut_list[ap]][
"device_under_tests"]
ap_iwinfo = get_target_object.dut_library_object.get_iwinfo(attach_allure=True)
if str(ap_iwinfo) != "Error: pop from empty list":
include_essid = config['interfaces'][0]["ssids"][0]["name"]
re_obj = re.compile(
rf'([a-zA-Z0-9-]+)\s+ESSID: "{re.escape(include_essid)}".*?\s+Access Point:\s+([0-9A-Fa-f:]+).*?Channel:\s+('
r'\d+)\s+\(([\d.]+) GHz\)',
re.DOTALL
)
# find all matches
interface_matches = re_obj.finditer(ap_iwinfo)
if interface_matches:
for match in interface_matches:
interface_name = match.group(1)
access_point = match.group(2)
channel = match.group(3)
frequency = match.group(4).replace('.', '')
ap_data.update(
{serial_number: {'Access Point': access_point, 'Channel': channel, 'frequency': frequency}})
logging.info(f"AP Data from iwinfo: {ap_data}")
if ap_data == {}:
logging.error("Failed to get required iwinfo from minicom")
pytest.fail("Failed to get required iwinfo from minicom")
else:
pytest.fail("Failed to get iwinfo from minicom")
for serial in ap_data:
bssid_list.append(ap_data[serial]['Access Point'])
if not ap_data[serial]['frequency'].endswith(","):
freqs_ = freqs_ + ap_data[serial]['frequency'] + ","
else:
freqs_ = freqs_ + ap_data[serial]['frequency']
ssid = config['interfaces'][0]["ssids"][0]["name"]
pass_fail, message = True, "Test Passed"
fiveg_radio = list(get_test_library.get_radio_availabilities(num_stations_5g=1)[0].keys())[0]
logging.info(f"twog_radio from testcase:{fiveg_radio}")
try:
pass_fail, message = get_test_library.roam_test(ap1_bssid=bssid_list[0], ap2_bssid=bssid_list[1],
scan_freq=freqs_, fiveg_radio=fiveg_radio,
band="fiveg,twog", num_sta=1, security="wpa2", ssid=ssid,
upstream="1.1.eth1", eap_method="TTLS",
pairwise_cipher="DEFAULT ",
groupwise_cipher="DEFAULT ",
eap_identity=radius_info["user"],
eap_password=radius_info["password"],
pk_passwd=radius_info["pk_password"], sta_type="11r-eap",
iteration=1, channel="36", option="ota", dut_name=dut_names,
traffic_type="lf_udp")
except Exception as e:
logging.error(f"Exception in roam test : {e}")
pass_fail, message = False, e
finally:
get_target_object.dut_library_object.get_dut_logs(print_log=False)
if not pass_fail:
pytest.fail(f"Test failed with the following reasons: \n{message}")
else:
assert True

View File

@@ -10,7 +10,7 @@ import time
import copy
import requests
pytestmark = [pytest.mark.roam_test, pytest.mark.bridge, pytest.mark.roam_ota, pytest.mark.hard_roam_ota, pytest.mark.ow_regression_lf]
pytestmark = [pytest.mark.roam_test, pytest.mark.bridge, pytest.mark.hard_roam_ota, pytest.mark.ow_regression_lf]
# Get the directory of the current test config file
test_file_dir = os.path.dirname(os.path.abspath(__file__))
@@ -27,18 +27,15 @@ with open(file_path, 'r') as file:
@allure.sub_suite("wpa2_personal")
@pytest.mark.wpa2_personal
@pytest.mark.roam
@pytest.mark.ota
class TestRoamOTA(object):
@pytest.mark.same_channel
@pytest.mark.twog
def test_roam_2g_to_2g_sc_psk_wpa2(self, get_target_object, get_test_library, get_lab_info, selected_testbed):
def test_roam_2g_to_2g_sc_wpa2_psk(self, get_target_object, get_test_library, get_lab_info, selected_testbed):
"""
Test Roaming between two APs, Same channel, 2G, WPA2 Personal
pytest -m "roam and twog and same_channel and wpa2_personal and ota"
pytest -m "hard_roam_ota and twog and same_channel and wpa2_personal"
"""
ap_data = dict()
dut_list = [str(selected_testbed)]
dut_names = list()
bssid_list = list()
freqs_ = ""
@@ -50,7 +47,13 @@ class TestRoamOTA(object):
if tb_type in key and tb_name[0] in key:
temp_list.append(key)
temp_list.sort()
dut_list = [temp_list[idx] for idx in range(len(temp_list)) if idx <= 1]
logging.info(f"temp_list:{temp_list}")
dut_list = []
idx = temp_list.index(selected_testbed)
dut_list = [temp_list[idx]]
if idx + 1 < len(temp_list):
dut_list.append(temp_list[idx + 1])
logging.info(f"---dut list: {dut_list}---")
config['radios'] = [
{"band": "2G", "channel": 11, "channel-mode": "HE", "channel-width": 40, "country": "CA"}]
@@ -62,7 +65,7 @@ class TestRoamOTA(object):
for ap in range(len(dut_list)):
serial_number = testbed_info[dut_list[ap]]["device_under_tests"][0]['identifier']
dut_names.append(testbed_info[dut_list[ap]]["device_under_tests"][0]['model'])
logging.info(config)
logging.info(f"config:{config}")
payload = {"configuration": json.dumps(config), "serialNumber": serial_number, "UUID": 2}
uri = get_target_object.controller_library_object.build_uri(
"device/" + serial_number + "/configure")
@@ -70,7 +73,7 @@ class TestRoamOTA(object):
"TimeStamp: " + str(datetime.datetime.utcnow()) + "\n" +
"Data: " + str(json.dumps(payload, indent=2)) + "\n" +
"Headers: " + str(get_target_object.controller_library_object.make_headers()))
allure.attach(name=f"Push roam config on {serial_number}: ", body="Sending Command: " + str(uri) + "\n" +
allure.attach(name=f"Push roam config on AP{ap+1}:{serial_number}: ", body="Sending Command: " + str(uri) + "\n" +
"TimeStamp: " + str(
datetime.datetime.utcnow()) + "\n" +
"Data: " + str(payload) + "\n" +
@@ -99,7 +102,7 @@ class TestRoamOTA(object):
if str(ap_iwinfo) != "Error: pop from empty list":
include_essid = config['interfaces'][0]["ssids"][0]["name"]
re_obj = re.compile(
rf'(wlan\d(?:-\d)?)\s+ESSID: "{re.escape(include_essid)}".*?\s+Access Point:\s+([0-9A-Fa-f:]+).*?Channel:\s+('
rf'([a-zA-Z0-9-]+)\s+ESSID: "{re.escape(include_essid)}".*?\s+Access Point:\s+([0-9A-Fa-f:]+).*?Channel:\s+('
r'\d+)\s+\(([\d.]+) GHz\)',
re.DOTALL
)
@@ -130,9 +133,11 @@ class TestRoamOTA(object):
ssid = config['interfaces'][0]["ssids"][0]["name"]
key = config['interfaces'][0]["ssids"][0]["encryption"]["key"]
pass_fail, message = True, "Test Passed"
twog_radio = list(get_test_library.get_radio_availabilities(num_stations_2g=1)[0].keys())[0]
logging.info(f"fiveg_radio from testcase:{twog_radio}")
try:
pass_fail, message = get_test_library.roam_test(ap1_bssid=bssid_list[0], ap2_bssid=bssid_list[1],
scan_freq=freqs_,
scan_freq=freqs_,twog_radio=twog_radio,
band="twog", num_sta=1, security="wpa2", security_key=key,
ssid=ssid, upstream="1.1.eth1", duration=None,
iteration=1, channel="11", option="ota", dut_name=dut_names,
@@ -149,13 +154,12 @@ class TestRoamOTA(object):
@pytest.mark.different_channel
@pytest.mark.twog
def test_roam_2g_to_2g_dc_psk_wpa2(self, get_target_object, get_test_library, get_lab_info, selected_testbed):
def test_roam_2g_to_2g_dc_wpa2_psk(self, get_target_object, get_test_library, get_lab_info, selected_testbed):
"""
Test Roaming between two APs, Different channel, 2G, WPA2 Personal
pytest -m "roam and twog and different_channel and wpa2_personal and ota"
pytest -m "hard_roam_ota and twog and different_channel and wpa2_personal"
"""
ap_data = dict()
dut_list = [str(selected_testbed)]
dut_names = list()
bssid_list = list()
freqs_ = ""
@@ -167,7 +171,15 @@ class TestRoamOTA(object):
if tb_type in key and tb_name[0] in key:
temp_list.append(key)
temp_list.sort()
dut_list = [temp_list[idx] for idx in range(len(temp_list)) if idx <= 1]
logging.info(f"temp_list:{temp_list}")
dut_list = []
idx = temp_list.index(selected_testbed)
dut_list = [temp_list[idx]]
if idx + 1 < len(temp_list):
dut_list.append(temp_list[idx + 1])
logging.info(f"---dut list: {dut_list}---")
# dut_list = [temp_list[idx] for idx in range(len(temp_list)) if idx <= 1]
config['radios'] = [
{"band": "2G", "channel": 11, "channel-mode": "HE", "channel-width": 40, "country": "CA"}]
config['interfaces'][0]["ssids"][0]["wifi-bands"] = ["2G"]
@@ -181,7 +193,7 @@ class TestRoamOTA(object):
if ap == 1:
config['radios'] = [
{"band": "2G", "channel": 1, "channel-mode": "HE", "channel-width": 20, "country": "CA"}]
logging.info(config)
logging.info(f"config:{config}")
payload = {"configuration": json.dumps(config), "serialNumber": serial_number, "UUID": 2}
uri = get_target_object.controller_library_object.build_uri(
"device/" + serial_number + "/configure")
@@ -218,7 +230,7 @@ class TestRoamOTA(object):
if str(ap_iwinfo) != "Error: pop from empty list":
include_essid = config['interfaces'][0]["ssids"][0]["name"]
re_obj = re.compile(
rf'(wlan\d(?:-\d)?)\s+ESSID: "{re.escape(include_essid)}".*?\s+Access Point:\s+([0-9A-Fa-f:]+).*?Channel:\s+('
rf'([a-zA-Z0-9-]+)\s+ESSID: "{re.escape(include_essid)}".*?\s+Access Point:\s+([0-9A-Fa-f:]+).*?Channel:\s+('
r'\d+)\s+\(([\d.]+) GHz\)',
re.DOTALL
)
@@ -248,9 +260,11 @@ class TestRoamOTA(object):
freqs_ = freqs_ + ap_data[serial]['frequency']
ssid = config['interfaces'][0]["ssids"][0]["name"]
key = config['interfaces'][0]["ssids"][0]["encryption"]["key"]
twog_radio = list(get_test_library.get_radio_availabilities(num_stations_2g=1)[0].keys())[0]
logging.info(f"fiveg_radio from testcase:{twog_radio}")
try:
pass_fail, message = get_test_library.roam_test(ap1_bssid=bssid_list[0], ap2_bssid=bssid_list[1],
scan_freq=freqs_,
scan_freq=freqs_, twog_radio=twog_radio,
band="twog", num_sta=1, security="wpa2", security_key=key,
ssid=ssid, upstream="1.1.eth1", duration=None,
iteration=1, channel="1", option="ota", dut_name=dut_names,
@@ -267,13 +281,12 @@ class TestRoamOTA(object):
@pytest.mark.same_channel
@pytest.mark.fiveg
def test_roam_5g_to_5g_sc_psk_wpa2(self, get_target_object, get_test_library, get_lab_info, selected_testbed):
def test_roam_5g_to_5g_sc_wpa2_psk(self, get_target_object, get_test_library, get_lab_info, selected_testbed):
"""
Test Roaming between two APs, Same channel, 5G, WPA2 Personal
pytest -m "roam and fiveg and same_channel and wpa2_personal and ota"
pytest -m "hard_roam_ota and fiveg and same_channel and wpa2_personal"
"""
ap_data = dict()
dut_list = [str(selected_testbed)]
dut_names = list()
bssid_list = list()
freqs_ = ""
@@ -285,7 +298,15 @@ class TestRoamOTA(object):
if tb_type in key and tb_name[0] in key:
temp_list.append(key)
temp_list.sort()
dut_list = [temp_list[idx] for idx in range(len(temp_list)) if idx <= 1]
logging.info(f"temp_list:{temp_list}")
dut_list = []
idx = temp_list.index(selected_testbed)
dut_list = [temp_list[idx]]
if idx + 1 < len(temp_list):
dut_list.append(temp_list[idx + 1])
logging.info(f"---dut list: {dut_list}---")
# dut_list = [temp_list[idx] for idx in range(len(temp_list)) if idx <= 1]
config['radios'] = [
{"band": "5G", "channel": 36, "channel-mode": "HE", "channel-width": 80, "country": "CA"}]
config['interfaces'][0]["ssids"][0]["wifi-bands"] = ["5G"]
@@ -296,7 +317,7 @@ class TestRoamOTA(object):
for ap in range(len(dut_list)):
serial_number = testbed_info[dut_list[ap]]["device_under_tests"][0]['identifier']
dut_names.append(testbed_info[dut_list[ap]]["device_under_tests"][0]['model'])
logging.info(config)
logging.info(f"config:{config}")
payload = {"configuration": json.dumps(config), "serialNumber": serial_number, "UUID": 2}
uri = get_target_object.controller_library_object.build_uri(
"device/" + serial_number + "/configure")
@@ -333,7 +354,7 @@ class TestRoamOTA(object):
if str(ap_iwinfo) != "Error: pop from empty list":
include_essid = config['interfaces'][0]["ssids"][0]["name"]
re_obj = re.compile(
rf'(wlan\d(?:-\d)?)\s+ESSID: "{re.escape(include_essid)}".*?\s+Access Point:\s+([0-9A-Fa-f:]+).*?Channel:\s+('
rf'([a-zA-Z0-9-]+)\s+ESSID: "{re.escape(include_essid)}".*?\s+Access Point:\s+([0-9A-Fa-f:]+).*?Channel:\s+('
r'\d+)\s+\(([\d.]+) GHz\)',
re.DOTALL
)
@@ -364,9 +385,11 @@ class TestRoamOTA(object):
ssid = config['interfaces'][0]["ssids"][0]["name"]
key = config['interfaces'][0]["ssids"][0]["encryption"]["key"]
pass_fail, message = True, "Test Passed"
fiveg_radio = list(get_test_library.get_radio_availabilities(num_stations_5g=1)[0].keys())[0]
logging.info(f"fiveg_radio from testcase:{fiveg_radio}")
try:
pass_fail, message = get_test_library.roam_test(ap1_bssid=bssid_list[0], ap2_bssid=bssid_list[1],
scan_freq=freqs_,
scan_freq=freqs_, fiveg_radio=fiveg_radio,
band="fiveg", num_sta=1, security="wpa2", security_key=key,
ssid=ssid, upstream="1.1.eth1", duration=None,
iteration=1, channel="36", option="ota", dut_name=dut_names,
@@ -383,13 +406,12 @@ class TestRoamOTA(object):
@pytest.mark.different_channel
@pytest.mark.fiveg
def test_roam_5g_to_5g_dc_psk_wpa2(self, get_target_object, get_test_library, get_lab_info, selected_testbed):
def test_roam_5g_to_5g_dc_wpa2_psk(self, get_target_object, get_test_library, get_lab_info, selected_testbed):
"""
Test Roaming between two APs, Different channel, 5G, WPA2 Personal
pytest -m "roam and fiveg and different_channel and wpa2_personal and ota"
pytest -m "hard_roam_ota and fiveg and different_channel and wpa2_personal"
"""
ap_data = dict()
dut_list = [str(selected_testbed)]
dut_names = list()
bssid_list = list()
freqs_ = ""
@@ -401,7 +423,14 @@ class TestRoamOTA(object):
if tb_type in key and tb_name[0] in key:
temp_list.append(key)
temp_list.sort()
dut_list = [temp_list[idx] for idx in range(len(temp_list)) if idx <= 1]
logging.info(f"temp_list:{temp_list}")
dut_list = []
idx = temp_list.index(selected_testbed)
dut_list = [temp_list[idx]]
if idx + 1 < len(temp_list):
dut_list.append(temp_list[idx + 1])
logging.info(f"---dut list: {dut_list}---")
config['radios'] = [
{"band": "5G", "channel": 36, "channel-mode": "HE", "channel-width": 80, "country": "CA"}]
config['interfaces'][0]["ssids"][0]["wifi-bands"] = ["5G"]
@@ -415,7 +444,7 @@ class TestRoamOTA(object):
if ap == 1:
config['radios'] = [
{"band": "5G", "channel": 149, "channel-mode": "HE", "channel-width": 80, "country": "CA"}]
logging.info(config)
logging.info(f"config:{config}")
payload = {"configuration": json.dumps(config), "serialNumber": serial_number, "UUID": 2}
uri = get_target_object.controller_library_object.build_uri(
"device/" + serial_number + "/configure")
@@ -452,7 +481,7 @@ class TestRoamOTA(object):
if str(ap_iwinfo) != "Error: pop from empty list":
include_essid = config['interfaces'][0]["ssids"][0]["name"]
re_obj = re.compile(
rf'(wlan\d(?:-\d)?)\s+ESSID: "{re.escape(include_essid)}".*?\s+Access Point:\s+([0-9A-Fa-f:]+).*?Channel:\s+('
rf'([a-zA-Z0-9-]+)\s+ESSID: "{re.escape(include_essid)}".*?\s+Access Point:\s+([0-9A-Fa-f:]+).*?Channel:\s+('
r'\d+)\s+\(([\d.]+) GHz\)',
re.DOTALL
)
@@ -482,9 +511,11 @@ class TestRoamOTA(object):
freqs_ = freqs_ + ap_data[serial]['frequency']
ssid = config['interfaces'][0]["ssids"][0]["name"]
key = config['interfaces'][0]["ssids"][0]["encryption"]["key"]
fiveg_radio = list(get_test_library.get_radio_availabilities(num_stations_5g=1)[0].keys())[0]
logging.info(f"fiveg_radio from testcase:{fiveg_radio}")
try:
pass_fail, message = get_test_library.roam_test(ap1_bssid=bssid_list[0], ap2_bssid=bssid_list[1],
scan_freq=freqs_,
scan_freq=freqs_, fiveg_radio=fiveg_radio,
band="fiveg", num_sta=1, security="wpa2", security_key=key,
ssid=ssid, upstream="1.1.eth1", duration=None,
iteration=1, channel="36", option="ota", dut_name=dut_names,
@@ -501,13 +532,13 @@ class TestRoamOTA(object):
@pytest.mark.fiveg
@pytest.mark.twog
def test_roam_5g_and_2g_wpa2psk(self, get_target_object, get_test_library, get_lab_info, selected_testbed):
@pytest.mark.both
def test_roam_5g_to_2g_wpa2_psk(self, get_target_object, get_test_library, get_lab_info, selected_testbed):
"""
Test Roaming between two APs, 2G & 5G, WPA2 Personal
pytest -m "roam and fiveg and twog and wpa2_personal and ota"
pytest -m "hard_roam_ota and fiveg and twog and wpa2_personal"
"""
ap_data = dict()
dut_list = [str(selected_testbed)]
dut_names = list()
bssid_list = list()
freqs_ = ""
@@ -519,7 +550,13 @@ class TestRoamOTA(object):
if tb_type in key and tb_name[0] in key:
temp_list.append(key)
temp_list.sort()
dut_list = [temp_list[idx] for idx in range(len(temp_list)) if idx <= 1]
logging.info(f"temp_list;;{temp_list}")
dut_list = []
idx = temp_list.index(selected_testbed)
dut_list = [temp_list[idx]]
if idx + 1 < len(temp_list):
dut_list.append(temp_list[idx + 1])
config['radios'] = [
{"band": "5G", "channel": 36, "channel-mode": "HE", "channel-width": 80, "country": "CA"}]
config['interfaces'][0]["ssids"][0]["wifi-bands"] = ["5G"]
@@ -527,6 +564,9 @@ class TestRoamOTA(object):
logging.error(
f"This test need two AP's but number of DUT's available in the selected testbed is {dut_list}")
assert False, f"This test need two AP's but number of DUT's available in the selected testbed is {dut_list}"
logging.info(f"---dut list: {dut_list}---")
for ap in range(len(dut_list)):
serial_number = testbed_info[dut_list[ap]]["device_under_tests"][0]['identifier']
dut_names.append(testbed_info[dut_list[ap]]["device_under_tests"][0]['model'])
@@ -534,7 +574,7 @@ class TestRoamOTA(object):
config['interfaces'][0]["ssids"][0]["wifi-bands"] = ["2G"]
config['radios'] = [
{"band": "2G", "channel": 11, "channel-mode": "HE", "channel-width": 20, "country": "CA"}]
logging.info(config)
logging.info(f"config:{config}")
payload = {"configuration": json.dumps(config), "serialNumber": serial_number, "UUID": 2}
uri = get_target_object.controller_library_object.build_uri(
"device/" + serial_number + "/configure")
@@ -571,7 +611,7 @@ class TestRoamOTA(object):
if str(ap_iwinfo) != "Error: pop from empty list":
include_essid = config['interfaces'][0]["ssids"][0]["name"]
re_obj = re.compile(
rf'(wlan\d(?:-\d)?)\s+ESSID: "{re.escape(include_essid)}".*?\s+Access Point:\s+([0-9A-Fa-f:]+).*?Channel:\s+('
rf'([a-zA-Z0-9-]+)\s+ESSID: "{re.escape(include_essid)}".*?\s+Access Point:\s+([0-9A-Fa-f:]+).*?Channel:\s+('
r'\d+)\s+\(([\d.]+) GHz\)',
re.DOTALL
)
@@ -590,7 +630,7 @@ class TestRoamOTA(object):
logging.error("Failed to get iwinfo")
pytest.exit("Failed to get iwinfo")
elif ap_iwinfo == {}:
pytest.fail("Empty iwinfo reponse from AP through minicom")
pytest.fail("Empty iwinfo response from AP through minicom")
else:
pytest.fail("Failed to get iwinfo from minicom")
for serial in ap_data:
@@ -601,10 +641,12 @@ class TestRoamOTA(object):
freqs_ = freqs_ + ap_data[serial]['frequency']
ssid = config['interfaces'][0]["ssids"][0]["name"]
key = config['interfaces'][0]["ssids"][0]["encryption"]["key"]
fiveg_radio = list(get_test_library.get_radio_availabilities(num_stations_5g=1)[0].keys())[0]
logging.info(f"fiveg_radio from testcase:{fiveg_radio}")
try:
pass_fail, message = get_test_library.roam_test(ap1_bssid=bssid_list[0], ap2_bssid=bssid_list[1],
scan_freq=freqs_,
band="both", num_sta=2, security="wpa2", security_key=key,
scan_freq=freqs_, fiveg_radio=fiveg_radio,
band="fiveg,twog", num_sta=1, security="wpa2", security_key=key,
ssid=ssid, upstream="1.1.eth1", duration=None,
iteration=1, channel="36", option="ota", dut_name=dut_names,
traffic_type="lf_udp", sta_type="11r")

View File

@@ -10,7 +10,7 @@ import time
import copy
import requests
pytestmark = [pytest.mark.roam_test, pytest.mark.bridge, pytest.mark.roam_ota, pytest.mark.hard_roam_ota, pytest.mark.ow_regression_lf]
pytestmark = [pytest.mark.roam_test, pytest.mark.bridge, pytest.mark.hard_roam_ota, pytest.mark.ow_regression_lf]
# Get the directory of the current test config file
test_file_dir = os.path.dirname(os.path.abspath(__file__))
@@ -28,19 +28,17 @@ with open(file_path, 'r') as file:
@allure.sub_suite("wpa3_personal")
@pytest.mark.wpa3_personal
@pytest.mark.roam
@pytest.mark.sae
@pytest.mark.ota
class TestRoamOTA(object):
@pytest.mark.same_channel
@pytest.mark.twog
def test_roam_2g_to_2g_sc_psk_wpa3(self, get_target_object, get_test_library, get_lab_info, selected_testbed):
@pytest.mark.sae
def test_roam_2g_to_2g_sc_wpa3_psk(self, get_target_object, get_test_library, get_lab_info, selected_testbed):
"""
Test Roaming between two APs, Same channel, 2G, WPA3 Personal
pytest -m "roam and twog and same_channel and wpa3_personal and ota"
pytest -m "hard_roam_ota and twog and same_channel and wpa3_personal"
"""
get_test_library.check_band_ap("twog")
ap_data = dict()
dut_list = [str(selected_testbed)]
dut_names = list()
bssid_list = list()
freqs_ = ""
@@ -52,7 +50,14 @@ class TestRoamOTA(object):
if tb_type in key and tb_name[0] in key:
temp_list.append(key)
temp_list.sort()
dut_list = [temp_list[idx] for idx in range(len(temp_list)) if idx <= 1]
logging.info(f"temp_list:{temp_list}")
dut_list = []
idx = temp_list.index(selected_testbed)
dut_list = [temp_list[idx]]
if idx + 1 < len(temp_list):
dut_list.append(temp_list[idx + 1])
logging.info(f"---dut list: {dut_list}---")
config['radios'] = [
{"band": "2G", "channel": 11, "channel-mode": "HE", "channel-width": 40, "country": "CA"}]
# change ssid config data to sae
@@ -65,7 +70,7 @@ class TestRoamOTA(object):
for ap in range(len(dut_list)):
serial_number = testbed_info[dut_list[ap]]["device_under_tests"][0]['identifier']
dut_names.append(testbed_info[dut_list[ap]]["device_under_tests"][0]['model'])
logging.info(config)
logging.info(f"config:{config}")
payload = {"configuration": json.dumps(config), "serialNumber": serial_number, "UUID": 2}
uri = get_target_object.controller_library_object.build_uri(
"device/" + serial_number + "/configure")
@@ -73,7 +78,7 @@ class TestRoamOTA(object):
"TimeStamp: " + str(datetime.datetime.utcnow()) + "\n" +
"Data: " + str(json.dumps(payload, indent=2)) + "\n" +
"Headers: " + str(get_target_object.controller_library_object.make_headers()))
allure.attach(name=f"Push roam config on {serial_number}: ", body="Sending Command: " + str(uri) + "\n" +
allure.attach(name=f"Push roam config on AP{ap+1}:{serial_number}: ", body="Sending Command: " + str(uri) + "\n" +
"TimeStamp: " + str(
datetime.datetime.utcnow()) + "\n" +
"Data: " + str(payload) + "\n" +
@@ -102,7 +107,7 @@ class TestRoamOTA(object):
if str(ap_iwinfo) != "Error: pop from empty list":
include_essid = config['interfaces'][0]["ssids"][0]["name"]
re_obj = re.compile(
rf'(wlan\d(?:-\d)?)\s+ESSID: "{re.escape(include_essid)}".*?\s+Access Point:\s+([0-9A-Fa-f:]+).*?Channel:\s+('
rf'([a-zA-Z0-9-]+)\s+ESSID: "{re.escape(include_essid)}".*?\s+Access Point:\s+([0-9A-Fa-f:]+).*?Channel:\s+('
r'\d+)\s+\(([\d.]+) GHz\)',
re.DOTALL
)
@@ -133,9 +138,11 @@ class TestRoamOTA(object):
ssid = config['interfaces'][0]["ssids"][0]["name"]
key = config['interfaces'][0]["ssids"][0]["encryption"]["key"]
pass_fail, message = True, "Test Passed"
twog_radio = list(get_test_library.get_radio_availabilities(num_stations_2g=1)[0].keys())[0]
logging.info(f"twog_radio from testcase:{twog_radio}")
try:
pass_fail, message = get_test_library.roam_test(ap1_bssid=bssid_list[0], ap2_bssid=bssid_list[1],
scan_freq=freqs_,
scan_freq=freqs_, twog_radio=twog_radio,
band="twog", num_sta=1, security="wpa3", security_key=key,
ssid=ssid, upstream="1.1.eth1", duration=None,
iteration=1, channel="11", option="ota", dut_name=dut_names,
@@ -152,13 +159,13 @@ class TestRoamOTA(object):
@pytest.mark.same_channel
@pytest.mark.fiveg
def test_roam_5g_to_5g_sc_psk_wpa3(self, get_target_object, get_test_library, get_lab_info, selected_testbed):
@pytest.mark.sae
def test_roam_5g_to_5g_sc_wpa3_psk(self, get_target_object, get_test_library, get_lab_info, selected_testbed):
"""
Test Roaming between two APs, Same channel, 5G, WPA3 Personal
pytest -m "roam and fiveg and same_channel and wpa3_personal and ota"
pytest -m "hard_roam_ota and fiveg and same_channel and wpa3_personal"
"""
ap_data = dict()
dut_list = [str(selected_testbed)]
dut_names = list()
bssid_list = list()
freqs_ = ""
@@ -170,7 +177,14 @@ class TestRoamOTA(object):
if tb_type in key and tb_name[0] in key:
temp_list.append(key)
temp_list.sort()
dut_list = [temp_list[idx] for idx in range(len(temp_list)) if idx <= 1]
logging.info(f"temp_list:{temp_list}")
dut_list = []
idx = temp_list.index(selected_testbed)
dut_list = [temp_list[idx]]
if idx + 1 < len(temp_list):
dut_list.append(temp_list[idx + 1])
logging.info(f"---dut list: {dut_list}---")
config['radios'] = [
{"band": "5G", "channel": 36, "channel-mode": "HE", "channel-width": 80, "country": "CA"}]
# change ssid security type to sae
@@ -183,7 +197,7 @@ class TestRoamOTA(object):
for ap in range(len(dut_list)):
serial_number = testbed_info[dut_list[ap]]["device_under_tests"][0]['identifier']
dut_names.append(testbed_info[dut_list[ap]]["device_under_tests"][0]['model'])
logging.info(config)
logging.info(f"config:{config}")
payload = {"configuration": json.dumps(config), "serialNumber": serial_number, "UUID": 2}
uri = get_target_object.controller_library_object.build_uri(
"device/" + serial_number + "/configure")
@@ -191,7 +205,7 @@ class TestRoamOTA(object):
"TimeStamp: " + str(datetime.datetime.utcnow()) + "\n" +
"Data: " + str(json.dumps(payload, indent=2)) + "\n" +
"Headers: " + str(get_target_object.controller_library_object.make_headers()))
allure.attach(name=f"Push roam config on {serial_number}: ", body="Sending Command: " + str(uri) + "\n" +
allure.attach(name=f"Push roam config on AP{ap+1}:{serial_number}: ", body="Sending Command: " + str(uri) + "\n" +
"TimeStamp: " + str(
datetime.datetime.utcnow()) + "\n" +
"Data: " + str(payload) + "\n" +
@@ -220,7 +234,7 @@ class TestRoamOTA(object):
if str(ap_iwinfo) != "Error: pop from empty list":
include_essid = config['interfaces'][0]["ssids"][0]["name"]
re_obj = re.compile(
rf'(wlan\d(?:-\d)?)\s+ESSID: "{re.escape(include_essid)}".*?\s+Access Point:\s+([0-9A-Fa-f:]+).*?Channel:\s+('
rf'([a-zA-Z0-9-]+)\s+ESSID: "{re.escape(include_essid)}".*?\s+Access Point:\s+([0-9A-Fa-f:]+).*?Channel:\s+('
r'\d+)\s+\(([\d.]+) GHz\)',
re.DOTALL
)
@@ -251,9 +265,11 @@ class TestRoamOTA(object):
ssid = config['interfaces'][0]["ssids"][0]["name"]
key = config['interfaces'][0]["ssids"][0]["encryption"]["key"]
pass_fail, message = True, "Test Passed"
fiveg_radio = list(get_test_library.get_radio_availabilities(num_stations_5g=1)[0].keys())[0]
logging.info(f"fiveg_radio from testcase:{fiveg_radio}")
try:
pass_fail, message = get_test_library.roam_test(ap1_bssid=bssid_list[0], ap2_bssid=bssid_list[1],
scan_freq=freqs_,
scan_freq=freqs_, fiveg_radio=fiveg_radio,
band="fiveg", num_sta=1, security="wpa3", security_key=key,
ssid=ssid, upstream="1.1.eth1", duration=None,
iteration=1, channel="11", option="ota", dut_name=dut_names,
@@ -270,13 +286,14 @@ class TestRoamOTA(object):
@pytest.mark.same_channel
@pytest.mark.sixg
def test_roam_6g_to_6g_sc_psk_wpa3(self, get_target_object, get_test_library, get_lab_info, selected_testbed):
@pytest.mark.sae
def test_roam_6g_to_6g_sc_wpa3_psk(self, get_target_object, get_test_library, get_lab_info, selected_testbed):
"""
Test Roaming between two APs, Same channel, 6G, WPA3 Personal
pytest -m "roam and sixg and same_channel and wpa3_personal and ota"
pytest -m "hard_roam_ota and sixg and same_channel and wpa3_personal"
"""
band = "sixg"
ap_data = dict()
dut_list = [str(selected_testbed)]
dut_names = list()
bssid_list = list()
freqs_ = ""
@@ -288,27 +305,45 @@ class TestRoamOTA(object):
if tb_type in key and tb_name[0] in key:
temp_list.append(key)
temp_list.sort()
dut_list = [temp_list[idx] for idx in range(len(temp_list)) if idx <= 1]
config['radios'] = [
{
"band": "6G",
"channel": 161,
"channel-mode": "HE",
"channel-width": 80,
"country": "US"
}
]
# change wifi-band and security type to sae
config['interfaces'][0]["ssids"][0]["wifi-bands"] = ["6G"]
config['interfaces'][0]["ssids"][0]["encryption"]["proto"] = "sae"
if len(dut_list) < 2:
logging.error(
f"This test need two AP's but number of DUT's available in the selected testbed is {dut_list}")
assert False, f"This test need two AP's but number of DUT's available in the selected testbed is {dut_list}"
logging.info(f"temp_list:{temp_list}")
ap_modes = {ap: testbed_info[ap]["device_under_tests"][0]["mode"].lower() for ap in temp_list}
wifi7 = [ap for ap, m in ap_modes.items() if "wifi7" in m]
wifi6e = [ap for ap, m in ap_modes.items() if "wifi6e" in m]
wifi6 = [ap for ap, m in ap_modes.items() if "wifi6" in m and "wifi6e" not in m]
logging.info(f"Available APs by type -> WiFi7: {wifi7}, WiFi6E: {wifi6e}, WiFi6: {wifi6}")
try:
if band == "sixg":
sixg_aps = wifi7 + wifi6e
if len(sixg_aps) < 2:
raise ValueError("Not enough 6GHz-capable APs")
dut_list = sixg_aps[:2]
else:
ap2 = (wifi7 or wifi6e or [None])[0] if "sixg" in band else None
ap1 = (wifi6 or wifi7 or wifi6e or [None])[0] if any(b in band for b in ["twog", "fiveg"]) else None
if not ap1 or (("sixg" in band) and not ap2):
raise ValueError("Required APs not found")
dut_list = [ap for ap in [ap1, ap2] if ap]
except ValueError as e:
logging.warning(f"No available APs satisfy the required band '{band}': {e}")
pytest.skip(f"No available APs satisfy the required band '{band}': {e}")
dut_list = []
logging.info(f"Selected DUTs for band='{band}': {dut_list}")
logging.info(f"---dut list: {dut_list}---")
for ap in range(len(dut_list)):
serial_number = testbed_info[dut_list[ap]]["device_under_tests"][0]['identifier']
dut_names.append(testbed_info[dut_list[ap]]["device_under_tests"][0]['model'])
logging.info(config)
ap_mode = testbed_info[dut_list[ap]]["device_under_tests"][0]['mode']
logging.info(f"ap_mode:{ap_mode}")
channel_mode = "EHT" if "wifi7" in ap_mode else "HE"
config['radios'] = [
{"band": "6G", "channel": 161, "channel-mode": channel_mode, "channel-width": 80, "country": "US"}]
config['interfaces'][0]["ssids"][0]["wifi-bands"] = ["6G"]
config['interfaces'][0]["ssids"][0]["encryption"]["proto"] = "sae"
logging.info(f"config:{config}")
payload = {"configuration": json.dumps(config), "serialNumber": serial_number, "UUID": 2}
uri = get_target_object.controller_library_object.build_uri(
"device/" + serial_number + "/configure")
@@ -316,7 +351,7 @@ class TestRoamOTA(object):
"TimeStamp: " + str(datetime.datetime.utcnow()) + "\n" +
"Data: " + str(json.dumps(payload, indent=2)) + "\n" +
"Headers: " + str(get_target_object.controller_library_object.make_headers()))
allure.attach(name=f"Push roam config on {serial_number}: ", body="Sending Command: " + str(uri) + "\n" +
allure.attach(name=f"Push roam config on AP{ap+1}:{serial_number}: ", body="Sending Command: " + str(uri) + "\n" +
"TimeStamp: " + str(
datetime.datetime.utcnow()) + "\n" +
"Data: " + str(payload) + "\n" +
@@ -345,7 +380,7 @@ class TestRoamOTA(object):
if str(ap_iwinfo) != "Error: pop from empty list":
include_essid = config['interfaces'][0]["ssids"][0]["name"]
re_obj = re.compile(
rf'(wlan\d(?:-\d)?)\s+ESSID: "{re.escape(include_essid)}".*?\s+Access Point:\s+([0-9A-Fa-f:]+).*?Channel:\s+('
rf'([a-zA-Z0-9-]+)\s+ESSID: "{re.escape(include_essid)}".*?\s+Access Point:\s+([0-9A-Fa-f:]+).*?Channel:\s+('
r'\d+)\s+\(([\d.]+) GHz\)',
re.DOTALL
)
@@ -374,9 +409,11 @@ class TestRoamOTA(object):
ssid = config['interfaces'][0]["ssids"][0]["name"]
key = config['interfaces'][0]["ssids"][0]["encryption"]["key"]
pass_fail, message = True, "Test Passed"
sixg_radio = list(get_test_library.get_radio_availabilities(num_stations_6g=1)[0].keys())[0]
logging.info(f"sixg_radio from testcase:{sixg_radio}")
try:
pass_fail, message = get_test_library.roam_test(ap1_bssid=bssid_list[0], ap2_bssid=bssid_list[1],
scan_freq=freqs_,
scan_freq=freqs_, sixg_radio=sixg_radio,
band="sixg", num_sta=1, security="wpa3", security_key=key,
ssid=ssid, upstream="1.1.eth1", duration=None,
iteration=1, channel="161", option="ota",
@@ -394,13 +431,13 @@ class TestRoamOTA(object):
@pytest.mark.fiveg
@pytest.mark.twog
def test_roam_5g_and_2g_wpa3psk(self, get_target_object, get_test_library, get_lab_info, selected_testbed):
@pytest.mark.both
def test_roam_5g_to_2g_wpa3_psk(self, get_target_object, get_test_library, get_lab_info, selected_testbed):
"""
Test Roaming between two APs, 2G & 5G, WPA3 Personal
pytest -m "roam and fiveg and twog and wpa3_personal and ota"
pytest -m "hard_roam_ota and fiveg and twog and wpa3_personal"
"""
ap_data = dict()
dut_list = [str(selected_testbed)]
dut_names = list()
bssid_list = list()
freqs_ = ""
@@ -412,7 +449,14 @@ class TestRoamOTA(object):
if tb_type in key and tb_name[0] in key:
temp_list.append(key)
temp_list.sort()
dut_list = [temp_list[idx] for idx in range(len(temp_list)) if idx <= 1]
logging.info(f"temp_list:{temp_list}")
dut_list = []
idx = temp_list.index(selected_testbed)
dut_list = [temp_list[idx]]
if idx + 1 < len(temp_list):
dut_list.append(temp_list[idx + 1])
logging.info(f"---dut list: {dut_list}---")
config['radios'] = [
{"band": "5G", "channel": 36, "channel-mode": "HE", "channel-width": 80, "country": "CA"}]
config['interfaces'][0]["ssids"][0]["wifi-bands"] = ["5G"]
@@ -429,7 +473,7 @@ class TestRoamOTA(object):
config['radios'] = [
{"band": "2G", "channel": 11, "channel-mode": "HE", "channel-width": 20, "country": "CA"}]
config['interfaces'][0]["ssids"][0]["wifi-bands"] = ["2G"]
logging.info(config)
logging.info(f"config:{config}")
payload = {"configuration": json.dumps(config), "serialNumber": serial_number, "UUID": 2}
uri = get_target_object.controller_library_object.build_uri(
"device/" + serial_number + "/configure")
@@ -437,7 +481,7 @@ class TestRoamOTA(object):
"TimeStamp: " + str(datetime.datetime.utcnow()) + "\n" +
"Data: " + str(json.dumps(payload, indent=2)) + "\n" +
"Headers: " + str(get_target_object.controller_library_object.make_headers()))
allure.attach(name=f"Push roam config on {serial_number}: ", body="Sending Command: " + str(uri) + "\n" +
allure.attach(name=f"Push roam config on AP{ap+1}:{serial_number}: ", body="Sending Command: " + str(uri) + "\n" +
"TimeStamp: " + str(
datetime.datetime.utcnow()) + "\n" +
"Data: " + str(payload) + "\n" +
@@ -466,7 +510,7 @@ class TestRoamOTA(object):
if str(ap_iwinfo) != "Error: pop from empty list":
include_essid = config['interfaces'][0]["ssids"][0]["name"]
re_obj = re.compile(
rf'(wlan\d(?:-\d)?)\s+ESSID: "{re.escape(include_essid)}".*?\s+Access Point:\s+([0-9A-Fa-f:]+).*?Channel:\s+('
rf'([a-zA-Z0-9-]+)\s+ESSID: "{re.escape(include_essid)}".*?\s+Access Point:\s+([0-9A-Fa-f:]+).*?Channel:\s+('
r'\d+)\s+\(([\d.]+) GHz\)',
re.DOTALL
)
@@ -496,13 +540,15 @@ class TestRoamOTA(object):
freqs_ = freqs_ + ap_data[serial]['frequency']
ssid = config['interfaces'][0]["ssids"][0]["name"]
key = config['interfaces'][0]["ssids"][0]["encryption"]["key"]
fiveg_radio = list(get_test_library.get_radio_availabilities(num_stations_5g=1)[0].keys())[0]
logging.info(f"fiveg_radio from testcase:{fiveg_radio}")
try:
pass_fail, message = get_test_library.roam_test(ap1_bssid=bssid_list[0], ap2_bssid=bssid_list[1],
scan_freq=freqs_,
band="both", num_sta=2, security="wpa2", security_key=key,
scan_freq=freqs_, fiveg_radio=fiveg_radio,
band="fiveg,twog", num_sta=1, security="wpa2", security_key=key,
ssid=ssid, upstream="1.1.eth1", duration=None,
iteration=1, channel="36", option="ota", dut_name=dut_names,
traffic_type="lf_udp", sta_type="11r")
traffic_type="lf_udp", sta_type="11r-sae")
except Exception as e:
logging.error(f"Exception in roam test : {e}")
pass_fail, message = False, e
@@ -515,11 +561,13 @@ class TestRoamOTA(object):
@pytest.mark.fiveg
@pytest.mark.sixg
def test_roam_5g_and_6g_wpa3psk(self, get_target_object, get_test_library, get_lab_info, selected_testbed):
@pytest.mark.both
def test_roam_5g_to_6g_wpa3_psk(self, get_target_object, get_test_library, get_lab_info, selected_testbed):
"""
Test Roaming between two APs, 5G & 6G, WPA3 Personal
pytest -m "roam and fiveg and sixg and wpa3_personal and ota"
pytest -m "hard_roam_ota and fiveg and sixg and wpa3_personal"
"""
band = "fiveg,sixg"
ap_data = dict()
dut_list = [str(selected_testbed)]
dut_names = list()
@@ -532,25 +580,51 @@ class TestRoamOTA(object):
tb_type, tb_name = selected_testbed.split("-")
if tb_type in key and tb_name[0] in key:
temp_list.append(key)
logging.info(f"temp_list::{temp_list}")
temp_list.sort()
dut_list = [temp_list[idx] for idx in range(len(temp_list)) if idx <= 1]
logging.info(f"temp_list:{temp_list}")
ap_modes = {ap: testbed_info[ap]["device_under_tests"][0]["mode"].lower() for ap in temp_list}
wifi7 = [ap for ap, m in ap_modes.items() if "wifi7" in m]
wifi6e = [ap for ap, m in ap_modes.items() if "wifi6e" in m]
wifi6 = [ap for ap, m in ap_modes.items() if "wifi6" in m and "wifi6e" not in m]
logging.info(f"Available APs by type -> WiFi7: {wifi7}, WiFi6E: {wifi6e}, WiFi6: {wifi6}")
try:
if band == "sixg":
sixg_aps = wifi7 + wifi6e
if len(sixg_aps) < 2:
raise ValueError("Not enough 6GHz-capable APs")
dut_list = sixg_aps[:2]
else:
ap2 = (wifi7 or wifi6e or [None])[0] if "sixg" in band else None
ap1 = (wifi6 or wifi7 or wifi6e or [None])[0] if any(b in band for b in ["twog", "fiveg"]) else None
if not ap1 or (("sixg" in band) and not ap2):
raise ValueError("Required APs not found")
dut_list = [ap for ap in [ap1, ap2] if ap]
except ValueError as e:
logging.warning(f"No available APs satisfy the required band '{band}': {e}")
dut_list = []
logging.info(f"Selected DUTs for band='{band}': {dut_list}")
logging.info(f"---dut list: {dut_list}---")
# dut_list = [temp_list[idx] for idx in range(len(temp_list)) if idx <= 1]
config['radios'] = [
{"band": "5G", "channel": 36, "channel-mode": "HE", "channel-width": 80, "country": "CA"}]
config['interfaces'][0]["ssids"][0]["wifi-bands"] = ["5G"]
# change ssid config data to sae
config['interfaces'][0]["ssids"][0]["encryption"]["proto"] = "sae"
if len(dut_list) < 2:
logging.error(
f"This test need two AP's but number of DUT's available in the selected testbed is {dut_list}")
assert False, f"This test need two AP's but number of DUT's available in the selected testbed is {dut_list}"
for ap in range(len(dut_list)):
serial_number = testbed_info[dut_list[ap]]["device_under_tests"][0]['identifier']
dut_names.append(testbed_info[dut_list[ap]]["device_under_tests"][0]['model'])
if ap == 1:
ap_mode = testbed_info[dut_list[ap]]["device_under_tests"][0]['mode']
logging.info(f"ap_mode:{ap_mode}")
channel_mode = "EHT" if "wifi7" in ap_mode else "HE"
config['radios'] = [
{"band": "6G", "channel": 161, "channel-mode": "HE", "channel-width": 80, "country": "US"}]
{"band": "6G", "channel": 161, "channel-mode": channel_mode, "channel-width": 80, "country": "US"}]
config['interfaces'][0]["ssids"][0]["wifi-bands"] = ["6G"]
logging.info(config)
logging.info(f"config:{config}")
payload = {"configuration": json.dumps(config), "serialNumber": serial_number, "UUID": 2}
uri = get_target_object.controller_library_object.build_uri(
"device/" + serial_number + "/configure")
@@ -558,7 +632,7 @@ class TestRoamOTA(object):
"TimeStamp: " + str(datetime.datetime.utcnow()) + "\n" +
"Data: " + str(json.dumps(payload, indent=2)) + "\n" +
"Headers: " + str(get_target_object.controller_library_object.make_headers()))
allure.attach(name=f"Push roam config on {serial_number}: ", body="Sending Command: " + str(uri) + "\n" +
allure.attach(name=f"Push roam config on AP{ap+1}:{serial_number}: ", body="Sending Command: " + str(uri) + "\n" +
"TimeStamp: " + str(
datetime.datetime.utcnow()) + "\n" +
"Data: " + str(payload) + "\n" +
@@ -587,7 +661,7 @@ class TestRoamOTA(object):
if str(ap_iwinfo) != "Error: pop from empty list":
include_essid = config['interfaces'][0]["ssids"][0]["name"]
re_obj = re.compile(
rf'(wlan\d(?:-\d)?)\s+ESSID: "{re.escape(include_essid)}".*?\s+Access Point:\s+([0-9A-Fa-f:]+).*?Channel:\s+('
rf'([a-zA-Z0-9-]+)\s+ESSID: "{re.escape(include_essid)}".*?\s+Access Point:\s+([0-9A-Fa-f:]+).*?Channel:\s+('
r'\d+)\s+\(([\d.]+) GHz\)',
re.DOTALL
)
@@ -617,13 +691,15 @@ class TestRoamOTA(object):
freqs_ = freqs_ + ap_data[serial]['frequency']
ssid = config['interfaces'][0]["ssids"][0]["name"]
key = config['interfaces'][0]["ssids"][0]["encryption"]["key"]
sixg_radio = list(get_test_library.get_radio_availabilities(num_stations_6g=1)[0].keys())[0]
logging.info(f"sixg_radio from testcase:{sixg_radio}")
try:
pass_fail, message = get_test_library.roam_test(ap1_bssid=bssid_list[0], ap2_bssid=bssid_list[1],
scan_freq=freqs_,
band="both", num_sta=2, security="wpa3", security_key=key,
scan_freq=freqs_, sixg_radio=sixg_radio,
band="fiveg,sixg", num_sta=1, security="wpa3", security_key=key,
ssid=ssid, upstream="1.1.eth1", duration=None,
iteration=1, channel="36", option="ota", dut_name=dut_names,
traffic_type="lf_udp", sta_type="11r")
traffic_type="lf_udp", sta_type="11r-sae")
except Exception as e:
logging.error(f"Exception in roam test : {e}")
pass_fail, message = False, e
@@ -636,13 +712,14 @@ class TestRoamOTA(object):
@pytest.mark.two
@pytest.mark.sixg
def test_roam_2g_and_6g_wpa3psk(self, get_target_object, get_test_library, get_lab_info, selected_testbed):
@pytest.mark.both
def test_roam_2g_to_6g_wpa3_psk(self, get_target_object, get_test_library, get_lab_info, selected_testbed):
"""
Test Roaming between two APs, 2G & 6G, WPA3 Personal
pytest -m "roam and two and sixg and wpa3_personal and ota"
pytest -m "hard_roam_ota and two and sixg and wpa3_personal"
"""
band = "twog,sixg"
ap_data = dict()
dut_list = [str(selected_testbed)]
dut_names = list()
bssid_list = list()
freqs_ = ""
@@ -654,23 +731,48 @@ class TestRoamOTA(object):
if tb_type in key and tb_name[0] in key:
temp_list.append(key)
temp_list.sort()
dut_list = [temp_list[idx] for idx in range(len(temp_list)) if idx <= 1]
logging.info(f"temp_list:{temp_list}")
ap_modes = {ap: testbed_info[ap]["device_under_tests"][0]["mode"].lower() for ap in temp_list}
wifi7 = [ap for ap, m in ap_modes.items() if "wifi7" in m]
wifi6e = [ap for ap, m in ap_modes.items() if "wifi6e" in m]
wifi6 = [ap for ap, m in ap_modes.items() if "wifi6" in m and "wifi6e" not in m]
logging.info(f"Available APs by type -> WiFi7: {wifi7}, WiFi6E: {wifi6e}, WiFi6: {wifi6}")
try:
if band == "sixg":
sixg_aps = wifi7 + wifi6e
if len(sixg_aps) < 2:
raise ValueError("Not enough 6GHz-capable APs")
dut_list = sixg_aps[:2]
else:
ap2 = (wifi7 or wifi6e or [None])[0] if "sixg" in band else None
ap1 = (wifi6 or wifi7 or wifi6e or [None])[0] if any(b in band for b in ["twog", "fiveg"]) else None
if not ap1 or (("sixg" in band) and not ap2):
raise ValueError("Required APs not found")
dut_list = [ap for ap in [ap1, ap2] if ap]
except ValueError as e:
logging.warning(f"No available APs satisfy the required band '{band}': {e}")
dut_list = []
logging.info(f"Selected DUTs for band='{band}': {dut_list}")
logging.info(f"-----dut_list---:{dut_list}")
config['radios'] = [
{"band": "2G", "channel": 11, "channel-mode": "HE", "channel-width": 20, "country": "CA"}]
config['interfaces'][0]["ssids"][0]["wifi-bands"] = ["2G"]
# change ssid config data to sae
config['interfaces'][0]["ssids"][0]["encryption"]["proto"] = "sae"
if len(dut_list) < 2:
logging.error(
f"This test need two AP's but number of DUT's available in the selected testbed is {dut_list}")
assert False, f"This test need two AP's but number of DUT's available in the selected testbed is {dut_list}"
for ap in range(len(dut_list)):
serial_number = testbed_info[dut_list[ap]]["device_under_tests"][0]['identifier']
dut_names.append(testbed_info[dut_list[ap]]["device_under_tests"][0]['model'])
if ap == 1:
config['radios'] = [{"band": "6G", "channel": 161, "channel-mode": "HE", "channel-width": 80, "country": "US"}]
ap_mode = testbed_info[dut_list[ap]]["device_under_tests"][0]['mode']
logging.info(f"ap_mode:{ap_mode}")
channel_mode = "EHT" if "wifi7" in ap_mode else "HE"
config['radios'] = [
{"band": "6G", "channel": 161, "channel-mode": channel_mode, "channel-width": 80, "country": "US"}]
config['interfaces'][0]["ssids"][0]["wifi-bands"] = ["6G"]
logging.info(config)
logging.info(f"config:{config}")
payload = {"configuration": json.dumps(config), "serialNumber": serial_number, "UUID": 2}
uri = get_target_object.controller_library_object.build_uri(
"device/" + serial_number + "/configure")
@@ -678,7 +780,7 @@ class TestRoamOTA(object):
"TimeStamp: " + str(datetime.datetime.utcnow()) + "\n" +
"Data: " + str(json.dumps(payload, indent=2)) + "\n" +
"Headers: " + str(get_target_object.controller_library_object.make_headers()))
allure.attach(name=f"Push roam config on {serial_number}: ", body="Sending Command: " + str(uri) + "\n" +
allure.attach(name=f"Push roam config on AP{ap+1}:{serial_number}: ", body="Sending Command: " + str(uri) + "\n" +
"TimeStamp: " + str(
datetime.datetime.utcnow()) + "\n" +
"Data: " + str(payload) + "\n" +
@@ -707,7 +809,7 @@ class TestRoamOTA(object):
if str(ap_iwinfo) != "Error: pop from empty list":
include_essid = config['interfaces'][0]["ssids"][0]["name"]
re_obj = re.compile(
rf'(wlan\d(?:-\d)?)\s+ESSID: "{re.escape(include_essid)}".*?\s+Access Point:\s+([0-9A-Fa-f:]+).*?Channel:\s+('
rf'([a-zA-Z0-9-]+)\s+ESSID: "{re.escape(include_essid)}".*?\s+Access Point:\s+([0-9A-Fa-f:]+).*?Channel:\s+('
r'\d+)\s+\(([\d.]+) GHz\)',
re.DOTALL
)
@@ -737,13 +839,15 @@ class TestRoamOTA(object):
freqs_ = freqs_ + ap_data[serial]['frequency']
ssid = config['interfaces'][0]["ssids"][0]["name"]
key = config['interfaces'][0]["ssids"][0]["encryption"]["key"]
sixg_radio = list(get_test_library.get_radio_availabilities(num_stations_6g=1)[0].keys())[0]
logging.info(f"sixg_radio from testcase:{sixg_radio}")
try:
pass_fail, message = get_test_library.roam_test(ap1_bssid=bssid_list[0], ap2_bssid=bssid_list[1],
scan_freq=freqs_,
band="both", num_sta=2, security="wpa3", security_key=key,
scan_freq=freqs_, sixg_radio=sixg_radio,
band="twog,sixg", num_sta=1, security="wpa3", security_key=key,
ssid=ssid, upstream="1.1.eth1", duration=None,
iteration=1, channel="11", option="ota", dut_name=dut_names,
traffic_type="lf_udp", sta_type="11r")
traffic_type="lf_udp", sta_type="11r-sae")
except Exception as e:
logging.error(f"Exception in roam test : {e}")
pass_fail, message = False, e