Merge pull request #338 from Telecominfraproject/wifi-4857

Wifi 4857
This commit is contained in:
Shivam Thakur
2022-01-12 13:57:18 +05:30
committed by GitHub
6 changed files with 435 additions and 40 deletions

View File

@@ -384,6 +384,28 @@ class UProfileUtility:
}
self.mode = None
def set_mesh_services(self):
self.base_profile_config["interfaces"][1]["ipv4"]["subnet"] = "192.168.97.1/24"
self.base_profile_config["interfaces"][1]["ipv4"]["dhcp"]["lease-count"] = 100
del self.base_profile_config['metrics']['wifi-frames']
del self.base_profile_config['metrics']['dhcp-snooping']
var = {
"filters": ["probe",
"auth"]
}
self.base_profile_config["metrics"]['wifi-frames'] = var
del self.base_profile_config['services']
var2 = {
"lldp":{
"describe": "uCentral",
"location": "universe"
},
"ssh" : {
"port" : 22
}
}
self.base_profile_config['services'] = var2
def set_express_wifi(self, open_flow=None):
if self.mode == "NAT":
self.base_profile_config["interfaces"][0]["services"] = ["lldp", "ssh"]
@@ -479,13 +501,25 @@ class UProfileUtility:
self.vlan_section["ssids"] = []
self.vlan_ids = []
def set_mode(self, mode):
def set_mode(self, mode, mesh=False):
self.mode = mode
if mode == "NAT":
if mesh:
self.base_profile_config['interfaces'][0]['tunnel'] = {
"proto": "mesh"
}
self.base_profile_config['interfaces'][1]['ssids'] = []
elif mode == "BRIDGE":
if mesh:
self.base_profile_config['interfaces'][0]['tunnel'] = {
"proto": "mesh"
}
self.base_profile_config['interfaces'][0]['ssids'] = []
elif mode == "VLAN":
if mesh:
self.base_profile_config['interfaces'][0]['tunnel'] = {
"proto": "mesh"
}
del self.base_profile_config['interfaces'][1]
self.base_profile_config['interfaces'][0]['ssids'] = []
self.base_profile_config['interfaces'] = []

View File

@@ -158,7 +158,7 @@ class ChamberView:
def Chamber_View(self):
if self.delete_old_scenario:
self.CreateChamberview.clean_cv_scenario(cv_type="Network-Connectivity", scenario_name=self.scenario_name)
self.CreateChamberview.clean_cv_scenario(scenario_name=self.scenario_name)
self.CreateChamberview.setup(create_scenario=self.scenario_name,
raw_line=self.raw_line
)
@@ -219,7 +219,7 @@ class ChamberView:
if num_stations > 64:
num_stations = int(num_stations / len(self.fiveg_radios))
for radio in self.fiveg_radios:
station_data = ["profile_link " + radio.split(".")[0] + "." + radio.split(".")[1] +
station_dataupstream_port_1 = ["profile_link " + radio.split(".")[0] + "." + radio.split(".")[1] +
" STA-AUTO " + str(num_stations) + " 'DUT: " + dut + " Radio-" +
str(int(idx) + 1) + "'" + " NA " + radio.split(".")[2]]
print(station_data)
@@ -378,34 +378,62 @@ class ChamberView:
name=i,
attachment_type="image/png", extension=None)
def create_mesh(self):
upstream_list = []
for data in range(0,len(self.access_point_data)):
def create_mesh_scenario(self):
# upstream_list = []
# for data in range(0,len(self.access_point_data)):
self.CreateDut = DUT(lfmgr=self.lanforge_ip,
port=self.lanforge_port,
dut_name="upstream" + str(data))
dut_name="upstream" ) # + str(data))
self.CreateDut.lan_port = "10.28.2.1/24"
name = "upstream" + str(data)
upstream_list.append(name)
# name = "upstream" + str(data)
# upstream_list.append(name)
self.CreateDut.setup()
data = data + 1
# data = data + 1
self.raw_line = [
["profile_link " + self.upstream_resource_2 + " upstream-dhcp 1 NA NA " + self.upstream_port_2.split(".")[2] + ",AUTO -1 NA"],
["profile_link " + self.uplink_resource_2 + " uplink-nat 1 'DUT: " + str(upstream_list[0]) + " LAN " + self.upstream_subnet + "' NA " + self.uplink_port_2.split(".")[2] + "," + self.upstream_port_2.split(".")[2] + " -1 NA"],
["profile_link " + self.uplink_resource_2 + " uplink-nat 1 'DUT: upstream LAN " + self.upstream_subnet + "' NA " + self.uplink_port_2.split(".")[2] + "," + self.upstream_port_2.split(".")[2] + " -1 NA"],
["profile_link " + self.upstream_resource_3 + " upstream-dhcp 1 NA NA " + self.upstream_port_3.split(".")[2] + ",AUTO -1 NA"],
["profile_link " + self.uplink_resource_3 + " uplink-nat 1 'DUT: " + str(upstream_list[1]) + " LAN " + self.upstream_subnet + "' NA " + self.uplink_port_3.split(".")[2] + "," + self.upstream_port_3.split(".")[2] + " -1 NA"],
["profile_link " + self.uplink_resource_3 + " uplink-nat 1 'DUT: upstream LAN " + self.upstream_subnet + "' NA " + self.uplink_port_3.split(".")[2] + "," + self.upstream_port_3.split(".")[2] + " -1 NA"],
["profile_link " + self.upstream_resource_4 + " upstream-dhcp 1 NA NA " + self.upstream_port_4.split(".")[2] + ",AUTO -1 NA"],
["profile_link " + self.uplink_resource_4 + " uplink-nat 1 'DUT: " + str(upstream_list[2]) + " LAN " + self.upstream_subnet + "' NA " + self.uplink_port_4.split(".")[2] + "," + self.upstream_port_4.split(".")[2] + " -1 NA"]
["profile_link " + self.uplink_resource_4 + " uplink-nat 1 'DUT: upstream LAN " + self.upstream_subnet + "' NA " + self.uplink_port_4.split(".")[2] + "," + self.upstream_port_4.split(".")[2] + " -1 NA"]
]
print(self.raw_line)
mesh = self.Chamber_View()
return mesh
def create_mesh_dut(self):
for ap in self.access_point_data:
# print(ap)
self.dut_name = ap["type"]
def create_mesh_scenario_dhcp_disable(self):
# upstream_list = []
# for data in range(0,len(self.access_point_data)):
self.CreateDut = DUT(lfmgr=self.lanforge_ip,
port=self.lanforge_port,
dut_name="upstream" )
self.CreateDut.lan_port = "10.28.2.1/24"
# name = "upstream" + str(data)
# upstream_list.append(name)
self.CreateDut.setup()
# data = data + 1
self.raw_line = [
["profile_link " + self.upstream_resource_2 + " upstream-dhcp 1 NA NA " + self.upstream_port_2.split(".")[2] + ",AUTO -1 NA"],
["profile_link " + self.uplink_resource_2 + " uplink-nat 1 'DUT: upstream LAN " + self.upstream_subnet + "' NA " + self.uplink_port_2.split(".")[2] + "," + self.upstream_port_2.split(".")[2] + " -1 NA"],
["profile_link " + self.upstream_resource_3 + " upstream 1 NA NA " + self.upstream_port_3.split(".")[2] + ",AUTO -1 NA"],
["profile_link " + self.uplink_resource_3 + " uplink-nat 1 'DUT: upstream LAN " + self.upstream_subnet + "' NA " + self.uplink_port_3.split(".")[2] + "," + self.upstream_port_3.split(".")[2] + " -1 NA"],
["profile_link " + self.upstream_resource_4 + " upstream 1 NA NA " + self.upstream_port_4.split(".")[2] + ",AUTO -1 NA"],
["profile_link " + self.uplink_resource_4 + " uplink-nat 1 'DUT: upstream LAN " + self.upstream_subnet + "' NA " + self.uplink_port_4.split(".")[2] + "," + self.upstream_port_4.split(".")[2] + " -1 NA"]
]
print(self.raw_line)
mesh = self.Chamber_View()
return mesh
def create_mesh_dut(self, ssid_data=None):
print("hi ssid data", ssid_data)
for ap, ssid in zip(self.access_point_data, range(len(ssid_data))):
print("ap", ap)
print(ssid)
print(ssid_data[ssid])
self.dut_name = "tip-" + str(ap["type"])
print(self.dut_name)
self.ap_model = ap["model"]
self.version = ap["version"].split("/")[-1]
@@ -418,6 +446,8 @@ class ChamberView:
serial_num=self.serial
)
self.Create_Dut()
# [['ssid_idx=0 ssid=Default-SSID-2g security=WPA|WEP| password=12345678 bssid=90:3c:b3:94:48:58']]
self.update_ssid(ssid_data=ssid_data[ssid])
def set_radio_antenna(self, req_url, shelf, resources, radio, antenna):
data = {

View File

@@ -247,7 +247,9 @@ def get_configuration(testbed, request):
version = request.config.getini("firmware")
version_list = version.split(",")
for i in range(len(CONFIGURATION[testbed]["access_point"])):
CONFIGURATION[testbed]["access_point"][i]["version"] = version_list[i]
print("i", i)
print(version_list)
CONFIGURATION[testbed]["access_point"][i]["version"] = version_list[0]
yield CONFIGURATION[testbed]

View File

@@ -47,19 +47,31 @@ def create_lanforge_chamberview_dut(lf_tools):
yield dut_name
@pytest.fixture(scope="session")
def setup_mesh(lf_tools):
mesh_obj = lf_tools.create_mesh()
def setup_mesh_scenario(lf_tools):
mesh_obj = lf_tools.create_mesh_scenario()
yield mesh_obj
@pytest.fixture(scope="session")
def create_lanforge_chamberview_dut(lf_tools, skip_lf):
dut_name = ""
if not skip_lf:
dut_object, dut_name = lf_tools.Create_Dut()
return dut_name
@pytest.fixture(scope="session")
def create_mesh_dut(lf_tools):
mesh_dut = lf_tools.create_mesh_dut()
yield True
def create_mesh_dut(lf_tools, skip_lf, ssid_data):
dut_name = ""
if not skip_lf:
mesh_dut_object, dut_name = lf_tools.create_mesh_dut(ssid_data=ssid_data)
yield dut_name
@pytest.fixture(scope="class")
def setup_mesh_profile_fix(fixtures_ver, get_apnos, get_configuration):
ret_var = fixtures_ver.setup_mesh_profile(get_apnos, get_configuration)
def setup_mesh_profile_fix(request, fixtures_ver, get_apnos, get_configuration, setup_controller, instantiate_profile,get_markers, get_equipment_ref,
lf_tools, ):
param = dict(request.param)
ret_var = fixtures_ver.setup_mesh_profile(request, param, get_apnos, get_configuration, setup_controller, instantiate_profile, get_markers, get_equipment_ref,
lf_tools, skip_lf=False, open_flow=None
)
yield ret_var

View File

@@ -0,0 +1,32 @@
import pytest
import allure
pytestmark = [pytest.mark.mesh, pytest.mark.bridge]
setup_params_general = {
"mode": "BRIDGE",
"ssid_modes": {
"wpa2_personal": [
{"ssid_name": "ssid_wpa2_5g", "appliedRadios": ["5G"], "security_key": "something"},
{"ssid_name": "ssid_wpa2_2g", "appliedRadios": ["2G"], "security_key": "something"}
]
},
"mesh": "yes",
"rf": {},
"radius": False
}
@allure.feature("MESH BASIC")
@pytest.mark.parametrize(
'setup_mesh_profile_fix',
[setup_params_general],
indirect=True,
scope="class"
)
# @pytest.mark.usefixtures("setup_profiles")
class TestMesh(object):
@pytest.mark.wpa2_personal
def testmesh(self, setup_mesh_profile_fix):
#setup_mesh_scenario):
pass

View File

@@ -696,10 +696,6 @@ class Fixtures_2x:
try:
iwinfo = ap_ssh.iwinfo()
allure.attach(name="iwinfo: ", body=str(iwinfo))
# tx_power, name = ap_ssh.gettxpower()
# allure.attach(name="interface name: ", body=str(name))
# allure.attach(name="tx power: ", body=str(tx_power))
except:
pass
ap_ssh.run_generic_command(cmd="logger stop testcase: " + instance_name)
@@ -712,6 +708,7 @@ class Fixtures_2x:
try:
ssid_info_sdk = instantiate_profile_obj.get_ssid_info()
ap_wifi_data = ap_ssh.get_iwinfo()
for p in ap_wifi_data:
@@ -756,24 +753,312 @@ class Fixtures_2x:
request.addfinalizer(teardown_session)
return test_cases
# comment
def setup_mesh_profile(self, get_apnos, get_configuration):
def setup_mesh_profile(self, request, param, get_apnos, get_configuration, setup_controller, instantiate_profile,
get_markers, get_equipment_ref, lf_tools, skip_lf=False, open_flow=None):
instantiate_profile_obj = instantiate_profile(sdk_client=setup_controller)
print(1, instantiate_profile_obj.sdk_client)
vlan_id, mode = 0, 0
parameter = dict(param)
print("parameter", parameter)
print(list(parameter["ssid_modes"])[0])
ssid_info_sdk = instantiate_profile_obj.get_ssid_info()
print(ssid_info_sdk)
test_cases = {}
profile_data = {}
var = ""
if parameter['mode'] not in ["BRIDGE", "NAT", "VLAN"]:
print("Invalid Mode: ", parameter['mode'])
return test_cases
instantiate_profile_obj.set_radio_config()
if parameter['mode'] == "NAT":
mode = "NAT"
instantiate_profile_obj.set_mode(mode=mode, mesh=True)
vlan_id = 1
if parameter['mode'] == "BRIDGE":
mode = "BRIDGE"
instantiate_profile_obj.set_mode(mode=mode, mesh=True)
vlan_id = 1
if parameter['mode'] == "VLAN":
mode = "VLAN"
instantiate_profile_obj.set_mode(mode=mode, mesh=True)
profile_data["ssid"] = {}
for i in parameter["ssid_modes"]:
profile_data["ssid"][i] = []
for j in range(len(parameter["ssid_modes"][i])):
data = parameter["ssid_modes"][i][j]
profile_data["ssid"][i].append(data)
lf_dut_data = []
for mode in profile_data['ssid']:
if mode == "open":
for j in profile_data["ssid"][mode]:
if mode in get_markers.keys() and get_markers[mode]:
try:
if j["appliedRadios"].__contains__("2G"):
lf_dut_data.append(j)
if j["appliedRadios"].__contains__("5G"):
lf_dut_data.append(j)
j["appliedRadios"] = list(set(j["appliedRadios"]))
j['security'] = 'none'
creates_profile = instantiate_profile_obj.add_ssid(ssid_data=j)
test_cases["wpa_2g"] = True
except Exception as e:
print(e)
test_cases["wpa_2g"] = False
if mode == "wpa":
for j in profile_data["ssid"][mode]:
if mode in get_markers.keys() and get_markers[mode]:
try:
if j["appliedRadios"].__contains__("2G"):
lf_dut_data.append(j)
if j["appliedRadios"].__contains__("5G"):
lf_dut_data.append(j)
j["appliedRadios"] = list(set(j["appliedRadios"]))
j['security'] = 'psk'
creates_profile = instantiate_profile_obj.add_ssid(ssid_data=j)
test_cases["wpa_2g"] = True
except Exception as e:
print(e)
test_cases["wpa_2g"] = False
if mode == "wpa2_personal":
for j in profile_data["ssid"][mode]:
if mode in get_markers.keys() and get_markers[mode]:
try:
if j["appliedRadios"].__contains__("2G"):
lf_dut_data.append(j)
if j["appliedRadios"].__contains__("5G"):
lf_dut_data.append(j)
j["appliedRadios"] = list(set(j["appliedRadios"]))
j['security'] = 'psk2'
creates_profile = instantiate_profile_obj.add_ssid(ssid_data=j)
test_cases["wpa_2g"] = True
except Exception as e:
print(e)
test_cases["wpa2_personal"] = False
try:
if parameter['mesh']:
print("yes")
instantiate_profile_obj.set_mesh_services()
except Exception as e:
pass
dut_data = []
ssid_data = []
# this will return configuration of your testbed from tests/conftest.py get_configuration fixtures
print("get configuration",get_configuration)
print("get configuration", get_configuration)
print(len(get_configuration['access_point']))
# print(get_configuration['access_point'])
for length in range(0,len(get_configuration['access_point'])):
print(get_configuration['access_point'])
for length in range(0, len(get_configuration['access_point'])):
ap_ssh = get_apnos(credentials=get_configuration['access_point'][length], pwd="../libs/apnos/", sdk="2.x")
connected, latest, active = ap_ssh.get_ucentral_status()
print("connected", connected)
print("latest",latest)
print("latest", latest)
print("active", active)
latest_old = latest
if connected == False:
output = ap_ssh.run_generic_command(cmd="ubus call ucentral status")
allure.attach(name="ubus call ucentral status: ", body=str(output))
pytest.exit("AP is disconnected from UC Gateway")
if latest != active:
allure.attach(name="FAIL : ubus call ucentral status: ",
body="connected: " + str(connected) + "\nlatest: " + str(latest) + "\nactive: " + str(
active))
ap_logs = ap_ssh.logread()
allure.attach(body=ap_logs, name="FAILURE: AP LOgs: ")
pytest.fail("AP is disconnected from UC Gateway")
S = 10
# Add logger command before config push
instance_name = ''.join(random.choices(string.ascii_uppercase + string.digits, k=S))
ap_ssh.run_generic_command(cmd="logger start testcase: " + instance_name)
time_1 = time.time()
# Apply config
print("get equipment id ref ", get_equipment_ref)
print("get equipment id ref [0]", get_equipment_ref[length])
instantiate_profile_obj.push_config(serial_number=get_equipment_ref[length])
config = json.loads(str(instantiate_profile_obj.base_profile_config).replace(" ", "").replace("'", '"'))
config["uuid"] = 0
# Attach the config that is sent from API
allure.attach(name="Config Sent from API: ", body=str(config), attachment_type=allure.attachment_type.JSON)
ap_config_latest = ap_ssh.get_uc_latest_config()
try:
ap_config_latest["uuid"] = 0
except Exception as e:
print(e)
pass
x = 1
# Check if ucentral gw has pushed the config into latest
connected, latest, active = ap_ssh.get_ucentral_status()
while latest_old == latest:
time.sleep(5)
x += 1
print("old config: ", latest_old)
print("latest: ", latest)
connected, latest, active = ap_ssh.get_ucentral_status()
if x == 5:
break
onnected, latest, active = ap_ssh.get_ucentral_status()
if latest == latest_old:
latest_cfg = ap_ssh.run_generic_command(cmd="cat /etc/ucentral/ucentral.cfg." + str(latest))
allure.attach(name="Latest Config Received by AP: ",
body=str(latest_cfg),
attachment_type=allure.attachment_type.JSON)
ap_logs = ap_ssh.get_logread(start_ref="start testcase: " + instance_name,
stop_ref="stop testcase: " + instance_name)
allure.attach(body=ap_logs, name="AP Log: ")
print("Config from ucentral gw is not sent to AP")
else:
print("Config is sent to AP from ucentral gw")
x = 1
latest_cfg = ap_ssh.run_generic_command(cmd="cat /etc/ucentral/ucentral.cfg." + str(latest))
allure.attach(name="Latest Config Received by AP: ",
body=str(latest_cfg),
attachment_type=allure.attachment_type.JSON)
while active != latest:
connected, latest, active = ap_ssh.get_ucentral_status()
time.sleep(20)
x += 1
print("active: ", active)
print("latest: ", latest)
if x == 10:
break
connected, latest, active = ap_ssh.get_ucentral_status()
if latest == active:
print("Config properly Applied on AP")
else:
print("Config is not Applied on AP")
time_2 = time.time()
time_interval = time_2 - time_1
allure.attach(name="Time Took to apply Config: " + str(time_interval), body="")
time.sleep(60)
ap_config_latest = ap_ssh.get_uc_latest_config()
ap_config_active = ap_ssh.get_uc_active_config()
if x < 19:
print("AP is Broadcasting Applied Config")
allure.attach(name="Success : Active Config in AP: ", body=str(ap_config_active))
else:
print("AP is Not Broadcasting Applied Config")
allure.attach(name="Failed to Apply Config : Active Config in AP : ", body=str(ap_config_active))
time.sleep(10)
try:
iwinfo = ap_ssh.iwinfo()
allure.attach(name="iwinfo: ", body=str(iwinfo))
except:
pass
ap_ssh.run_generic_command(cmd="logger stop testcase: " + instance_name)
ap_logs = ap_ssh.get_logread(start_ref="start testcase: " + instance_name,
stop_ref="stop testcase: " + instance_name)
allure.attach(body=ap_logs, name="AP Log: ")
wifi_status = ap_ssh.get_wifi_status()
allure.attach(name="wifi status", body=str(wifi_status))
try:
ssid_info_sdk = instantiate_profile_obj.get_ssid_info()
print("ssid_info_sdk", ssid_info_sdk)
ap_wifi_data = ap_ssh.get_iwinfo()
print("ap_wifi_data", ap_wifi_data)
print(type(ap_wifi_data))
for p in ap_wifi_data:
print(p)
for q in ssid_info_sdk:
if ap_wifi_data[p][0] == q[0] and ap_wifi_data[p][2] == q[3]:
q.append(ap_wifi_data[p][1])
idx_mapping = {}
print(ssid_info_sdk)
dut_data.append(ssid_info_sdk)
except Exception as e:
print(e)
pass
print("ssid_data", ssid_data)
print("dut", dut_data)
dut_ssid_data = []
dut_final_data = []
dut_1 = []
dut_2 = []
dut_3 = []
for dut in range(len(dut_data)):
for interface in range(len(dut_data[dut])):
ssid = ["ssid_idx=" + str(interface) +
" ssid=" + dut_data[dut][interface][0] +
" security=" + dut_data[dut][interface][1].upper() +
" password=" + dut_data[dut][interface][2] +
" bssid=" + dut_data[dut][interface][4].lower()
]
#print(ssid)
dut_ssid_data.append(ssid)
# print("dut ssid data", dut_ssid_data)
dut_1.append(dut_ssid_data[0])
dut_1.append(dut_ssid_data[1])
dut_2.append(dut_ssid_data[2])
dut_2.append(dut_ssid_data[3])
dut_3.append(dut_ssid_data[4])
dut_3.append(dut_ssid_data[5])
dut_final_data.append(dut_1)
dut_final_data.append(dut_2)
dut_final_data.append(dut_3)
print("dut_final_data", dut_final_data)
# dut_final_data = [[['ssid_idx=0 ssid=ssid_wpa2_2g security=WPA2 password=something bssid=34:ef:b6:af:4a:84'], ['ssid_idx=1 ssid=ssid_wpa2_5g security=WPA2 password=something bssid=34:ef:b6:af:4a:7c']], [['ssid_idx=0 ssid=ssid_wpa2_2g security=WPA2 password=something bssid=34:ef:b6:af:49:0d'], ['ssid_idx=1 ssid=ssid_wpa2_5g security=WPA2 password=something bssid=34:ef:b6:af:49:05']], [['ssid_idx=0 ssid=ssid_wpa2_2g security=WPA2 password=something bssid=90:3c:b3:9d:69:36'], ['ssid_idx=1 ssid=ssid_wpa2_5g security=WPA2 password=something bssid=90:3c:b3:9d:69:2e']]]
# dut creation for mesh
# dut_ssid_data = [[['ssid_idx=0 ssid=ssid_wpa2_2g security=WPA2 password=something bssid=34:ef:b6:af:4a:84'], ['ssid_idx=1 ssid=ssid_wpa2_5g security=WPA2 password=something bssid=34:ef:b6:af:4a:7c']], [['ssid_idx=0 ssid=ssid_wpa2_2g security=WPA2 password=something bssid=34:ef:b6:af:49:0d'], ['ssid_idx=1 ssid=ssid_wpa2_5g security=WPA2 password=something bssid=34:ef:b6:af:49:05']], [['ssid_idx=0 ssid=ssid_wpa2_2g security=WPA2 password=something bssid=90:3c:b3:9d:69:36'], ['ssid_idx=1 ssid=ssid_wpa2_5g security=WPA2 password=something bssid=90:3c:b3:9d:69:2e']]]
create_dut = lf_tools.create_mesh_dut(ssid_data=dut_final_data)
#create mesh scenario
mesh_scenario = lf_tools.create_mesh_scenario()
#check for all ap are connected and is pinging
for length in range(0, len(get_configuration['access_point'])):
ap_ssh = get_apnos(credentials=get_configuration['access_point'][length], pwd="../libs/apnos/", sdk="2.x")
print("checking again if all ap's are connected and able to reach internet")
connected, latest, active = ap_ssh.get_ucentral_status()
print("connected", connected)
print("latest", latest)
print("active", active)
latest_old = latest
if connected == False:
output = ap_ssh.run_generic_command(cmd="ubus call ucentral status")
allure.attach(name="ubus call ucentral status: ", body=str(output))
pytest.exit("AP is disconnected from UC Gateway")
if latest != active:
allure.attach(name="FAIL : ubus call ucentral status: ", body="connected: " + str(connected) + "\nlatest: " + str(latest) + "\nactive: " + str(active))
allure.attach(name="FAIL : ubus call ucentral status: ",
body="connected: " + str(connected) + "\nlatest: " + str(latest) + "\nactive: " + str(active))
ap_logs = ap_ssh.logread()
allure.attach(body=ap_logs, name="FAILURE: AP LOgs: ")
pytest.fail("AP is disconnected from UC Gateway")
#create a mesh scenario with dhcp disable option to node-1 and node-2
dhcp_dis = lf_tools.create_mesh_scenario_dhcp_disable()