diff --git a/libs/controller/controller_2x/controller.py b/libs/controller/controller_2x/controller.py index 50156da39..01115b450 100644 --- a/libs/controller/controller_2x/controller.py +++ b/libs/controller/controller_2x/controller.py @@ -384,6 +384,28 @@ class UProfileUtility: } self.mode = None + def set_mesh_services(self): + self.base_profile_config["interfaces"][1]["ipv4"]["subnet"] = "192.168.97.1/24" + self.base_profile_config["interfaces"][1]["ipv4"]["dhcp"]["lease-count"] = 100 + del self.base_profile_config['metrics']['wifi-frames'] + del self.base_profile_config['metrics']['dhcp-snooping'] + var = { + "filters": ["probe", + "auth"] + } + self.base_profile_config["metrics"]['wifi-frames'] = var + del self.base_profile_config['services'] + var2 = { + "lldp":{ + "describe": "uCentral", + "location": "universe" + }, + "ssh" : { + "port" : 22 + } + } + self.base_profile_config['services'] = var2 + def set_express_wifi(self, open_flow=None): if self.mode == "NAT": self.base_profile_config["interfaces"][0]["services"] = ["lldp", "ssh"] @@ -479,13 +501,25 @@ class UProfileUtility: self.vlan_section["ssids"] = [] self.vlan_ids = [] - def set_mode(self, mode): + def set_mode(self, mode, mesh=False): self.mode = mode if mode == "NAT": + if mesh: + self.base_profile_config['interfaces'][0]['tunnel'] = { + "proto": "mesh" + } self.base_profile_config['interfaces'][1]['ssids'] = [] elif mode == "BRIDGE": + if mesh: + self.base_profile_config['interfaces'][0]['tunnel'] = { + "proto": "mesh" + } self.base_profile_config['interfaces'][0]['ssids'] = [] elif mode == "VLAN": + if mesh: + self.base_profile_config['interfaces'][0]['tunnel'] = { + "proto": "mesh" + } del self.base_profile_config['interfaces'][1] self.base_profile_config['interfaces'][0]['ssids'] = [] self.base_profile_config['interfaces'] = [] diff --git a/libs/lanforge/lf_tools.py b/libs/lanforge/lf_tools.py index c80452002..6f52bc31c 100644 --- a/libs/lanforge/lf_tools.py +++ b/libs/lanforge/lf_tools.py @@ -158,7 +158,7 @@ class ChamberView: def Chamber_View(self): if self.delete_old_scenario: - self.CreateChamberview.clean_cv_scenario(cv_type="Network-Connectivity", scenario_name=self.scenario_name) + self.CreateChamberview.clean_cv_scenario(scenario_name=self.scenario_name) self.CreateChamberview.setup(create_scenario=self.scenario_name, raw_line=self.raw_line ) @@ -219,7 +219,7 @@ class ChamberView: if num_stations > 64: num_stations = int(num_stations / len(self.fiveg_radios)) for radio in self.fiveg_radios: - station_data = ["profile_link " + radio.split(".")[0] + "." + radio.split(".")[1] + + station_dataupstream_port_1 = ["profile_link " + radio.split(".")[0] + "." + radio.split(".")[1] + " STA-AUTO " + str(num_stations) + " 'DUT: " + dut + " Radio-" + str(int(idx) + 1) + "'" + " NA " + radio.split(".")[2]] print(station_data) @@ -378,34 +378,62 @@ class ChamberView: name=i, attachment_type="image/png", extension=None) - def create_mesh(self): - upstream_list = [] - for data in range(0,len(self.access_point_data)): - self.CreateDut = DUT(lfmgr=self.lanforge_ip, - port=self.lanforge_port, - dut_name="upstream" + str(data)) - self.CreateDut.lan_port = "10.28.2.1/24" - name = "upstream" + str(data) - upstream_list.append(name) + def create_mesh_scenario(self): + # upstream_list = [] + # for data in range(0,len(self.access_point_data)): + self.CreateDut = DUT(lfmgr=self.lanforge_ip, + port=self.lanforge_port, + dut_name="upstream" ) # + str(data)) + self.CreateDut.lan_port = "10.28.2.1/24" + # name = "upstream" + str(data) + # upstream_list.append(name) - self.CreateDut.setup() - data = data + 1 + self.CreateDut.setup() + # data = data + 1 self.raw_line = [ ["profile_link " + self.upstream_resource_2 + " upstream-dhcp 1 NA NA " + self.upstream_port_2.split(".")[2] + ",AUTO -1 NA"], - ["profile_link " + self.uplink_resource_2 + " uplink-nat 1 'DUT: " + str(upstream_list[0]) + " LAN " + self.upstream_subnet + "' NA " + self.uplink_port_2.split(".")[2] + "," + self.upstream_port_2.split(".")[2] + " -1 NA"], + ["profile_link " + self.uplink_resource_2 + " uplink-nat 1 'DUT: upstream LAN " + self.upstream_subnet + "' NA " + self.uplink_port_2.split(".")[2] + "," + self.upstream_port_2.split(".")[2] + " -1 NA"], ["profile_link " + self.upstream_resource_3 + " upstream-dhcp 1 NA NA " + self.upstream_port_3.split(".")[2] + ",AUTO -1 NA"], - ["profile_link " + self.uplink_resource_3 + " uplink-nat 1 'DUT: " + str(upstream_list[1]) + " LAN " + self.upstream_subnet + "' NA " + self.uplink_port_3.split(".")[2] + "," + self.upstream_port_3.split(".")[2] + " -1 NA"], + ["profile_link " + self.uplink_resource_3 + " uplink-nat 1 'DUT: upstream LAN " + self.upstream_subnet + "' NA " + self.uplink_port_3.split(".")[2] + "," + self.upstream_port_3.split(".")[2] + " -1 NA"], ["profile_link " + self.upstream_resource_4 + " upstream-dhcp 1 NA NA " + self.upstream_port_4.split(".")[2] + ",AUTO -1 NA"], - ["profile_link " + self.uplink_resource_4 + " uplink-nat 1 'DUT: " + str(upstream_list[2]) + " LAN " + self.upstream_subnet + "' NA " + self.uplink_port_4.split(".")[2] + "," + self.upstream_port_4.split(".")[2] + " -1 NA"] + ["profile_link " + self.uplink_resource_4 + " uplink-nat 1 'DUT: upstream LAN " + self.upstream_subnet + "' NA " + self.uplink_port_4.split(".")[2] + "," + self.upstream_port_4.split(".")[2] + " -1 NA"] ] print(self.raw_line) mesh = self.Chamber_View() return mesh - def create_mesh_dut(self): - for ap in self.access_point_data: - # print(ap) - self.dut_name = ap["type"] + def create_mesh_scenario_dhcp_disable(self): + + # upstream_list = [] + # for data in range(0,len(self.access_point_data)): + self.CreateDut = DUT(lfmgr=self.lanforge_ip, + port=self.lanforge_port, + dut_name="upstream" ) + self.CreateDut.lan_port = "10.28.2.1/24" + # name = "upstream" + str(data) + # upstream_list.append(name) + + self.CreateDut.setup() + # data = data + 1 + self.raw_line = [ + ["profile_link " + self.upstream_resource_2 + " upstream-dhcp 1 NA NA " + self.upstream_port_2.split(".")[2] + ",AUTO -1 NA"], + ["profile_link " + self.uplink_resource_2 + " uplink-nat 1 'DUT: upstream LAN " + self.upstream_subnet + "' NA " + self.uplink_port_2.split(".")[2] + "," + self.upstream_port_2.split(".")[2] + " -1 NA"], + ["profile_link " + self.upstream_resource_3 + " upstream 1 NA NA " + self.upstream_port_3.split(".")[2] + ",AUTO -1 NA"], + ["profile_link " + self.uplink_resource_3 + " uplink-nat 1 'DUT: upstream LAN " + self.upstream_subnet + "' NA " + self.uplink_port_3.split(".")[2] + "," + self.upstream_port_3.split(".")[2] + " -1 NA"], + ["profile_link " + self.upstream_resource_4 + " upstream 1 NA NA " + self.upstream_port_4.split(".")[2] + ",AUTO -1 NA"], + ["profile_link " + self.uplink_resource_4 + " uplink-nat 1 'DUT: upstream LAN " + self.upstream_subnet + "' NA " + self.uplink_port_4.split(".")[2] + "," + self.upstream_port_4.split(".")[2] + " -1 NA"] + ] + print(self.raw_line) + mesh = self.Chamber_View() + return mesh + + def create_mesh_dut(self, ssid_data=None): + print("hi ssid data", ssid_data) + for ap, ssid in zip(self.access_point_data, range(len(ssid_data))): + print("ap", ap) + print(ssid) + print(ssid_data[ssid]) + self.dut_name = "tip-" + str(ap["type"]) print(self.dut_name) self.ap_model = ap["model"] self.version = ap["version"].split("/")[-1] @@ -418,6 +446,8 @@ class ChamberView: serial_num=self.serial ) self.Create_Dut() + # [['ssid_idx=0 ssid=Default-SSID-2g security=WPA|WEP| password=12345678 bssid=90:3c:b3:94:48:58']] + self.update_ssid(ssid_data=ssid_data[ssid]) def set_radio_antenna(self, req_url, shelf, resources, radio, antenna): data = { diff --git a/tests/conftest.py b/tests/conftest.py index 1a4af6daa..3b2e5596f 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -247,7 +247,9 @@ def get_configuration(testbed, request): version = request.config.getini("firmware") version_list = version.split(",") for i in range(len(CONFIGURATION[testbed]["access_point"])): - CONFIGURATION[testbed]["access_point"][i]["version"] = version_list[i] + print("i", i) + print(version_list) + CONFIGURATION[testbed]["access_point"][i]["version"] = version_list[0] yield CONFIGURATION[testbed] diff --git a/tests/e2e/mesh/conftest.py b/tests/e2e/mesh/conftest.py index ac0028dd5..4bbeef321 100644 --- a/tests/e2e/mesh/conftest.py +++ b/tests/e2e/mesh/conftest.py @@ -47,19 +47,31 @@ def create_lanforge_chamberview_dut(lf_tools): yield dut_name @pytest.fixture(scope="session") -def setup_mesh(lf_tools): - mesh_obj = lf_tools.create_mesh() +def setup_mesh_scenario(lf_tools): + mesh_obj = lf_tools.create_mesh_scenario() yield mesh_obj +@pytest.fixture(scope="session") +def create_lanforge_chamberview_dut(lf_tools, skip_lf): + dut_name = "" + if not skip_lf: + dut_object, dut_name = lf_tools.Create_Dut() + return dut_name @pytest.fixture(scope="session") -def create_mesh_dut(lf_tools): - mesh_dut = lf_tools.create_mesh_dut() - yield True +def create_mesh_dut(lf_tools, skip_lf, ssid_data): + dut_name = "" + if not skip_lf: + mesh_dut_object, dut_name = lf_tools.create_mesh_dut(ssid_data=ssid_data) + yield dut_name @pytest.fixture(scope="class") -def setup_mesh_profile_fix(fixtures_ver, get_apnos, get_configuration): - ret_var = fixtures_ver.setup_mesh_profile(get_apnos, get_configuration) +def setup_mesh_profile_fix(request, fixtures_ver, get_apnos, get_configuration, setup_controller, instantiate_profile,get_markers, get_equipment_ref, + lf_tools, ): + param = dict(request.param) + ret_var = fixtures_ver.setup_mesh_profile(request, param, get_apnos, get_configuration, setup_controller, instantiate_profile, get_markers, get_equipment_ref, + lf_tools, skip_lf=False, open_flow=None + ) yield ret_var diff --git a/tests/e2e/mesh/test_mesh.py b/tests/e2e/mesh/test_mesh.py new file mode 100644 index 000000000..15f0bf6ba --- /dev/null +++ b/tests/e2e/mesh/test_mesh.py @@ -0,0 +1,32 @@ +import pytest +import allure + +pytestmark = [pytest.mark.mesh, pytest.mark.bridge] + + +setup_params_general = { + "mode": "BRIDGE", + "ssid_modes": { + "wpa2_personal": [ + {"ssid_name": "ssid_wpa2_5g", "appliedRadios": ["5G"], "security_key": "something"}, + {"ssid_name": "ssid_wpa2_2g", "appliedRadios": ["2G"], "security_key": "something"} + ] + }, + "mesh": "yes", + "rf": {}, + "radius": False +} +@allure.feature("MESH BASIC") +@pytest.mark.parametrize( + 'setup_mesh_profile_fix', + [setup_params_general], + indirect=True, + scope="class" +) +# @pytest.mark.usefixtures("setup_profiles") +class TestMesh(object): + + @pytest.mark.wpa2_personal + def testmesh(self, setup_mesh_profile_fix): + #setup_mesh_scenario): + pass \ No newline at end of file diff --git a/tests/fixtures_2x.py b/tests/fixtures_2x.py index b4cd7af35..5d0d17023 100644 --- a/tests/fixtures_2x.py +++ b/tests/fixtures_2x.py @@ -696,10 +696,6 @@ class Fixtures_2x: try: iwinfo = ap_ssh.iwinfo() allure.attach(name="iwinfo: ", body=str(iwinfo)) - - # tx_power, name = ap_ssh.gettxpower() - # allure.attach(name="interface name: ", body=str(name)) - # allure.attach(name="tx power: ", body=str(tx_power)) except: pass ap_ssh.run_generic_command(cmd="logger stop testcase: " + instance_name) @@ -712,6 +708,7 @@ class Fixtures_2x: try: ssid_info_sdk = instantiate_profile_obj.get_ssid_info() + ap_wifi_data = ap_ssh.get_iwinfo() for p in ap_wifi_data: @@ -756,24 +753,312 @@ class Fixtures_2x: request.addfinalizer(teardown_session) return test_cases - # comment - def setup_mesh_profile(self, get_apnos, get_configuration): + def setup_mesh_profile(self, request, param, get_apnos, get_configuration, setup_controller, instantiate_profile, + get_markers, get_equipment_ref, lf_tools, skip_lf=False, open_flow=None): + + instantiate_profile_obj = instantiate_profile(sdk_client=setup_controller) + print(1, instantiate_profile_obj.sdk_client) + vlan_id, mode = 0, 0 + parameter = dict(param) + print("parameter", parameter) + print(list(parameter["ssid_modes"])[0]) + ssid_info_sdk = instantiate_profile_obj.get_ssid_info() + print(ssid_info_sdk) + test_cases = {} + profile_data = {} + var = "" + if parameter['mode'] not in ["BRIDGE", "NAT", "VLAN"]: + print("Invalid Mode: ", parameter['mode']) + return test_cases + instantiate_profile_obj.set_radio_config() + if parameter['mode'] == "NAT": + mode = "NAT" + instantiate_profile_obj.set_mode(mode=mode, mesh=True) + vlan_id = 1 + if parameter['mode'] == "BRIDGE": + mode = "BRIDGE" + instantiate_profile_obj.set_mode(mode=mode, mesh=True) + vlan_id = 1 + if parameter['mode'] == "VLAN": + mode = "VLAN" + instantiate_profile_obj.set_mode(mode=mode, mesh=True) + + profile_data["ssid"] = {} + + for i in parameter["ssid_modes"]: + profile_data["ssid"][i] = [] + for j in range(len(parameter["ssid_modes"][i])): + data = parameter["ssid_modes"][i][j] + profile_data["ssid"][i].append(data) + lf_dut_data = [] + + for mode in profile_data['ssid']: + if mode == "open": + for j in profile_data["ssid"][mode]: + if mode in get_markers.keys() and get_markers[mode]: + try: + if j["appliedRadios"].__contains__("2G"): + lf_dut_data.append(j) + if j["appliedRadios"].__contains__("5G"): + lf_dut_data.append(j) + j["appliedRadios"] = list(set(j["appliedRadios"])) + j['security'] = 'none' + creates_profile = instantiate_profile_obj.add_ssid(ssid_data=j) + test_cases["wpa_2g"] = True + except Exception as e: + print(e) + test_cases["wpa_2g"] = False + if mode == "wpa": + for j in profile_data["ssid"][mode]: + if mode in get_markers.keys() and get_markers[mode]: + try: + if j["appliedRadios"].__contains__("2G"): + lf_dut_data.append(j) + if j["appliedRadios"].__contains__("5G"): + lf_dut_data.append(j) + j["appliedRadios"] = list(set(j["appliedRadios"])) + j['security'] = 'psk' + creates_profile = instantiate_profile_obj.add_ssid(ssid_data=j) + test_cases["wpa_2g"] = True + except Exception as e: + print(e) + test_cases["wpa_2g"] = False + if mode == "wpa2_personal": + for j in profile_data["ssid"][mode]: + + if mode in get_markers.keys() and get_markers[mode]: + try: + if j["appliedRadios"].__contains__("2G"): + lf_dut_data.append(j) + if j["appliedRadios"].__contains__("5G"): + lf_dut_data.append(j) + j["appliedRadios"] = list(set(j["appliedRadios"])) + j['security'] = 'psk2' + creates_profile = instantiate_profile_obj.add_ssid(ssid_data=j) + test_cases["wpa_2g"] = True + except Exception as e: + print(e) + test_cases["wpa2_personal"] = False + + try: + if parameter['mesh']: + print("yes") + instantiate_profile_obj.set_mesh_services() + except Exception as e: + pass + + + dut_data = [] + ssid_data = [] # this will return configuration of your testbed from tests/conftest.py get_configuration fixtures - print("get configuration",get_configuration) + print("get configuration", get_configuration) print(len(get_configuration['access_point'])) - # print(get_configuration['access_point']) - for length in range(0,len(get_configuration['access_point'])): + print(get_configuration['access_point']) + for length in range(0, len(get_configuration['access_point'])): ap_ssh = get_apnos(credentials=get_configuration['access_point'][length], pwd="../libs/apnos/", sdk="2.x") connected, latest, active = ap_ssh.get_ucentral_status() print("connected", connected) - print("latest",latest) + print("latest", latest) print("active", active) + latest_old = latest if connected == False: + output = ap_ssh.run_generic_command(cmd="ubus call ucentral status") + allure.attach(name="ubus call ucentral status: ", body=str(output)) + pytest.exit("AP is disconnected from UC Gateway") + + if latest != active: + allure.attach(name="FAIL : ubus call ucentral status: ", + body="connected: " + str(connected) + "\nlatest: " + str(latest) + "\nactive: " + str( + active)) + ap_logs = ap_ssh.logread() + allure.attach(body=ap_logs, name="FAILURE: AP LOgs: ") + pytest.fail("AP is disconnected from UC Gateway") + + S = 10 + + # Add logger command before config push + instance_name = ''.join(random.choices(string.ascii_uppercase + string.digits, k=S)) + ap_ssh.run_generic_command(cmd="logger start testcase: " + instance_name) + + time_1 = time.time() + + # Apply config + print("get equipment id ref ", get_equipment_ref) + print("get equipment id ref [0]", get_equipment_ref[length]) + instantiate_profile_obj.push_config(serial_number=get_equipment_ref[length]) + + config = json.loads(str(instantiate_profile_obj.base_profile_config).replace(" ", "").replace("'", '"')) + config["uuid"] = 0 + + # Attach the config that is sent from API + allure.attach(name="Config Sent from API: ", body=str(config), attachment_type=allure.attachment_type.JSON) + + ap_config_latest = ap_ssh.get_uc_latest_config() + try: + ap_config_latest["uuid"] = 0 + except Exception as e: + print(e) + pass + x = 1 + + # Check if ucentral gw has pushed the config into latest + connected, latest, active = ap_ssh.get_ucentral_status() + while latest_old == latest: + time.sleep(5) + x += 1 + print("old config: ", latest_old) + print("latest: ", latest) + connected, latest, active = ap_ssh.get_ucentral_status() + if x == 5: + break + onnected, latest, active = ap_ssh.get_ucentral_status() + if latest == latest_old: + latest_cfg = ap_ssh.run_generic_command(cmd="cat /etc/ucentral/ucentral.cfg." + str(latest)) + allure.attach(name="Latest Config Received by AP: ", + body=str(latest_cfg), + attachment_type=allure.attachment_type.JSON) + ap_logs = ap_ssh.get_logread(start_ref="start testcase: " + instance_name, + stop_ref="stop testcase: " + instance_name) + allure.attach(body=ap_logs, name="AP Log: ") + print("Config from ucentral gw is not sent to AP") + else: + print("Config is sent to AP from ucentral gw") + + x = 1 + latest_cfg = ap_ssh.run_generic_command(cmd="cat /etc/ucentral/ucentral.cfg." + str(latest)) + allure.attach(name="Latest Config Received by AP: ", + body=str(latest_cfg), + attachment_type=allure.attachment_type.JSON) + + while active != latest: + connected, latest, active = ap_ssh.get_ucentral_status() + time.sleep(20) + x += 1 + print("active: ", active) + print("latest: ", latest) + if x == 10: + break + + connected, latest, active = ap_ssh.get_ucentral_status() + if latest == active: + print("Config properly Applied on AP") + else: + print("Config is not Applied on AP") + + time_2 = time.time() + time_interval = time_2 - time_1 + allure.attach(name="Time Took to apply Config: " + str(time_interval), body="") + + time.sleep(60) + + ap_config_latest = ap_ssh.get_uc_latest_config() + ap_config_active = ap_ssh.get_uc_active_config() + if x < 19: + print("AP is Broadcasting Applied Config") + allure.attach(name="Success : Active Config in AP: ", body=str(ap_config_active)) + + else: + print("AP is Not Broadcasting Applied Config") + allure.attach(name="Failed to Apply Config : Active Config in AP : ", body=str(ap_config_active)) + time.sleep(10) + + try: + iwinfo = ap_ssh.iwinfo() + allure.attach(name="iwinfo: ", body=str(iwinfo)) + except: + pass + ap_ssh.run_generic_command(cmd="logger stop testcase: " + instance_name) + ap_logs = ap_ssh.get_logread(start_ref="start testcase: " + instance_name, + stop_ref="stop testcase: " + instance_name) + allure.attach(body=ap_logs, name="AP Log: ") + + wifi_status = ap_ssh.get_wifi_status() + allure.attach(name="wifi status", body=str(wifi_status)) + + try: + ssid_info_sdk = instantiate_profile_obj.get_ssid_info() + print("ssid_info_sdk", ssid_info_sdk) + ap_wifi_data = ap_ssh.get_iwinfo() + print("ap_wifi_data", ap_wifi_data) + print(type(ap_wifi_data)) + + for p in ap_wifi_data: + print(p) + for q in ssid_info_sdk: + if ap_wifi_data[p][0] == q[0] and ap_wifi_data[p][2] == q[3]: + q.append(ap_wifi_data[p][1]) + + + idx_mapping = {} + print(ssid_info_sdk) + dut_data.append(ssid_info_sdk) + + except Exception as e: + print(e) + pass + print("ssid_data", ssid_data) + print("dut", dut_data) + dut_ssid_data = [] + dut_final_data = [] + dut_1 = [] + dut_2 = [] + dut_3 = [] + for dut in range(len(dut_data)): + for interface in range(len(dut_data[dut])): + ssid = ["ssid_idx=" + str(interface) + + " ssid=" + dut_data[dut][interface][0] + + " security=" + dut_data[dut][interface][1].upper() + + " password=" + dut_data[dut][interface][2] + + " bssid=" + dut_data[dut][interface][4].lower() + ] + #print(ssid) + dut_ssid_data.append(ssid) + # print("dut ssid data", dut_ssid_data) + dut_1.append(dut_ssid_data[0]) + dut_1.append(dut_ssid_data[1]) + dut_2.append(dut_ssid_data[2]) + dut_2.append(dut_ssid_data[3]) + dut_3.append(dut_ssid_data[4]) + dut_3.append(dut_ssid_data[5]) + dut_final_data.append(dut_1) + dut_final_data.append(dut_2) + dut_final_data.append(dut_3) + print("dut_final_data", dut_final_data) + # dut_final_data = [[['ssid_idx=0 ssid=ssid_wpa2_2g security=WPA2 password=something bssid=34:ef:b6:af:4a:84'], ['ssid_idx=1 ssid=ssid_wpa2_5g security=WPA2 password=something bssid=34:ef:b6:af:4a:7c']], [['ssid_idx=0 ssid=ssid_wpa2_2g security=WPA2 password=something bssid=34:ef:b6:af:49:0d'], ['ssid_idx=1 ssid=ssid_wpa2_5g security=WPA2 password=something bssid=34:ef:b6:af:49:05']], [['ssid_idx=0 ssid=ssid_wpa2_2g security=WPA2 password=something bssid=90:3c:b3:9d:69:36'], ['ssid_idx=1 ssid=ssid_wpa2_5g security=WPA2 password=something bssid=90:3c:b3:9d:69:2e']]] + + # dut creation for mesh + # dut_ssid_data = [[['ssid_idx=0 ssid=ssid_wpa2_2g security=WPA2 password=something bssid=34:ef:b6:af:4a:84'], ['ssid_idx=1 ssid=ssid_wpa2_5g security=WPA2 password=something bssid=34:ef:b6:af:4a:7c']], [['ssid_idx=0 ssid=ssid_wpa2_2g security=WPA2 password=something bssid=34:ef:b6:af:49:0d'], ['ssid_idx=1 ssid=ssid_wpa2_5g security=WPA2 password=something bssid=34:ef:b6:af:49:05']], [['ssid_idx=0 ssid=ssid_wpa2_2g security=WPA2 password=something bssid=90:3c:b3:9d:69:36'], ['ssid_idx=1 ssid=ssid_wpa2_5g security=WPA2 password=something bssid=90:3c:b3:9d:69:2e']]] + create_dut = lf_tools.create_mesh_dut(ssid_data=dut_final_data) + + #create mesh scenario + mesh_scenario = lf_tools.create_mesh_scenario() + + #check for all ap are connected and is pinging + for length in range(0, len(get_configuration['access_point'])): + ap_ssh = get_apnos(credentials=get_configuration['access_point'][length], pwd="../libs/apnos/", sdk="2.x") + print("checking again if all ap's are connected and able to reach internet") + connected, latest, active = ap_ssh.get_ucentral_status() + print("connected", connected) + print("latest", latest) + print("active", active) + latest_old = latest + if connected == False: + output = ap_ssh.run_generic_command(cmd="ubus call ucentral status") + allure.attach(name="ubus call ucentral status: ", body=str(output)) pytest.exit("AP is disconnected from UC Gateway") if latest != active: - allure.attach(name="FAIL : ubus call ucentral status: ", body="connected: " + str(connected) + "\nlatest: " + str(latest) + "\nactive: " + str(active)) + allure.attach(name="FAIL : ubus call ucentral status: ", + body="connected: " + str(connected) + "\nlatest: " + str(latest) + "\nactive: " + str(active)) ap_logs = ap_ssh.logread() allure.attach(body=ap_logs, name="FAILURE: AP LOgs: ") pytest.fail("AP is disconnected from UC Gateway") + #create a mesh scenario with dhcp disable option to node-1 and node-2 + dhcp_dis = lf_tools.create_mesh_scenario_dhcp_disable() + + + + + +