added dut functionality

Signed-off-by: Nikita Yadav <nikita.yadav@candelatech.com>
This commit is contained in:
Nikita Yadav
2021-12-28 17:51:21 +05:30
parent 32ee2d2177
commit d4e689ab55

View File

@@ -822,449 +822,180 @@ class Fixtures_2x:
print(e)
test_cases["wpa2_personal"] = False
dut = []
dut_data = []
ssid_data = []
# this will return configuration of your testbed from tests/conftest.py get_configuration fixtures
print("get configuration", get_configuration)
print(len(get_configuration['access_point']))
# print(get_configuration['access_point'])
for length in range(0, len(get_configuration['access_point'])):
ap_ssh = get_apnos(credentials=get_configuration['access_point'][length], pwd="../libs/apnos/", sdk="2.x")
connected, latest, active = ap_ssh.get_ucentral_status()
print("connected", connected)
print("latest", latest)
print("active", active)
latest_old = latest
if connected == False:
pytest.exit("AP is disconnected from UC Gateway")
if latest != active:
allure.attach(name="FAIL : ubus call ucentral status: ",
body="connected: " + str(connected) + "\nlatest: " + str(latest) + "\nactive: " + str(
active))
ap_logs = ap_ssh.logread()
allure.attach(body=ap_logs, name="FAILURE: AP LOgs: ")
pytest.fail("AP is disconnected from UC Gateway")
S = 10
# Add logger command before config push
instance_name = ''.join(random.choices(string.ascii_uppercase + string.digits, k=S))
ap_ssh.run_generic_command(cmd="logger start testcase: " + instance_name)
time_1 = time.time()
# Apply config
print("get equipment id ref ", get_equipment_ref)
print("get equipment id ref [0]", get_equipment_ref[length])
instantiate_profile_obj.push_config(serial_number=get_equipment_ref[length])
config = json.loads(str(instantiate_profile_obj.base_profile_config).replace(" ", "").replace("'", '"'))
config["uuid"] = 0
# Attach the config that is sent from API
allure.attach(name="Config Sent from API: ", body=str(config), attachment_type=allure.attachment_type.JSON)
ap_config_latest = ap_ssh.get_uc_latest_config()
try:
ap_config_latest["uuid"] = 0
except Exception as e:
print(e)
pass
x = 1
# Check if ucentral gw has pushed the config into latest
connected, latest, active = ap_ssh.get_ucentral_status()
while latest_old == latest:
time.sleep(5)
x += 1
print("old config: ", latest_old)
print("latest: ", latest)
connected, latest, active = ap_ssh.get_ucentral_status()
if x == 5:
break
onnected, latest, active = ap_ssh.get_ucentral_status()
if latest == latest_old:
latest_cfg = ap_ssh.run_generic_command(cmd="cat /etc/ucentral/ucentral.cfg." + str(latest))
allure.attach(name="Latest Config Received by AP: ",
body=str(latest_cfg),
attachment_type=allure.attachment_type.JSON)
ap_logs = ap_ssh.get_logread(start_ref="start testcase: " + instance_name,
stop_ref="stop testcase: " + instance_name)
allure.attach(body=ap_logs, name="AP Log: ")
print("Config from ucentral gw is not sent to AP")
else:
print("Config is sent to AP from ucentral gw")
x = 1
latest_cfg = ap_ssh.run_generic_command(cmd="cat /etc/ucentral/ucentral.cfg." + str(latest))
allure.attach(name="Latest Config Received by AP: ",
body=str(latest_cfg),
attachment_type=allure.attachment_type.JSON)
while active != latest:
connected, latest, active = ap_ssh.get_ucentral_status()
time.sleep(20)
x += 1
print("active: ", active)
print("latest: ", latest)
if x == 10:
break
connected, latest, active = ap_ssh.get_ucentral_status()
if latest == active:
print("Config properly Applied on AP")
else:
print("Config is not Applied on AP")
time_2 = time.time()
time_interval = time_2 - time_1
allure.attach(name="Time Took to apply Config: " + str(time_interval), body="")
time.sleep(60)
ap_config_latest = ap_ssh.get_uc_latest_config()
ap_config_active = ap_ssh.get_uc_active_config()
if x < 19:
print("AP is Broadcasting Applied Config")
allure.attach(name="Success : Active Config in AP: ", body=str(ap_config_active))
else:
print("AP is Not Broadcasting Applied Config")
allure.attach(name="Failed to Apply Config : Active Config in AP : ", body=str(ap_config_active))
time.sleep(10)
try:
iwinfo = ap_ssh.iwinfo()
allure.attach(name="iwinfo: ", body=str(iwinfo))
# tx_power, name = ap_ssh.gettxpower()
# allure.attach(name="interface name: ", body=str(name))
# allure.attach(name="tx power: ", body=str(tx_power))
except:
pass
ap_ssh.run_generic_command(cmd="logger stop testcase: " + instance_name)
ap_logs = ap_ssh.get_logread(start_ref="start testcase: " + instance_name,
stop_ref="stop testcase: " + instance_name)
allure.attach(body=ap_logs, name="AP Log: ")
wifi_status = ap_ssh.get_wifi_status()
allure.attach(name="wifi status", body=str(wifi_status))
try:
ssid_info_sdk = instantiate_profile_obj.get_ssid_info()
print("ssid_info_sdk", ssid_info_sdk)
ap_wifi_data = ap_ssh.get_iwinfo()
print("ap_wifi_data", ap_wifi_data)
print(type(ap_wifi_data))
for p in ap_wifi_data:
print(p)
for q in ssid_info_sdk:
if ap_wifi_data[p][0] == q[0] and ap_wifi_data[p][2] == q[3]:
q.append(ap_wifi_data[p][1])
ssid_data = []
idx_mapping = {}
print(ssid_info_sdk)
for interface in range(len(ssid_info_sdk)):
ssid = ["ssid_idx=" + str(interface) +
" ssid=" + ssid_info_sdk[interface][0] +
" security=" + ssid_info_sdk[interface][1].upper() +
" password=" + ssid_info_sdk[interface][2] +
" bssid=" + ssid_info_sdk[interface][4].lower()
]
print("ssid", ssid)
idx_mapping[str(interface)] = [ssid_info_sdk[interface][0],
ssid_info_sdk[interface][2],
ssid_info_sdk[interface][1],
ssid_info_sdk[interface][3],
ssid_info_sdk[interface][4].lower()
]
print("idx mapping", idx_mapping[str(interface)])
ssid_data.append(ssid)
print("ssid_data", ssid_data)
lf_tools.ssid_list.append(ssid_info_sdk[interface][0])
print("ssid_data", ssid_data)
if not skip_lf:
lf_tools.dut_idx_mapping = idx_mapping
lf_tools.update_ssid(ssid_data=ssid_data)
except Exception as e:
print(e)
pass
# def teardown_session():
# for length in range(0, len(get_configuration['access_point'])):
# ap_ssh = get_apnos(credentials=get_configuration['access_point'][length], pwd="../libs/apnos/", sdk="2.x")
# connected, latest, active = ap_ssh.get_ucentral_status()
# print("connected", connected)
# print("latest", latest)
# print("active", active)
# latest_old = latest
# if connected == False:
# pytest.exit("AP is disconnected from UC Gateway")
# if latest != active:
# allure.attach(name="FAIL : ubus call ucentral status: ",
# body="connected: " + str(connected) + "\nlatest: " + str(latest) + "\nactive: " + str(
# active))
# ap_logs = ap_ssh.logread()
# allure.attach(body=ap_logs, name="FAILURE: AP LOgs: ")
# pytest.fail("AP is disconnected from UC Gateway")
#
# S = 10
#
# # Add logger command before config push
# instance_name = ''.join(random.choices(string.ascii_uppercase + string.digits, k=S))
# ap_ssh.run_generic_command(cmd="logger start testcase: " + instance_name)
#
# time_1 = time.time()
#
# # Apply config
# print("get equipment id ref ", get_equipment_ref)
# print("get equipment id ref [0]", get_equipment_ref[length])
# instantiate_profile_obj.push_config(serial_number=get_equipment_ref[length])
#
# config = json.loads(str(instantiate_profile_obj.base_profile_config).replace(" ", "").replace("'", '"'))
# config["uuid"] = 0
#
# # Attach the config that is sent from API
# allure.attach(name="Config Sent from API: ", body=str(config), attachment_type=allure.attachment_type.JSON)
#
# ap_config_latest = ap_ssh.get_uc_latest_config()
# try:
# ap_config_latest["uuid"] = 0
# except Exception as e:
# print(e)
# pass
# x = 1
#
# # Check if ucentral gw has pushed the config into latest
# connected, latest, active = ap_ssh.get_ucentral_status()
# while latest_old == latest:
# time.sleep(5)
# x += 1
# print("old config: ", latest_old)
# print("latest: ", latest)
# connected, latest, active = ap_ssh.get_ucentral_status()
# if x == 5:
# break
# onnected, latest, active = ap_ssh.get_ucentral_status()
# if latest == latest_old:
# latest_cfg = ap_ssh.run_generic_command(cmd="cat /etc/ucentral/ucentral.cfg." + str(latest))
# allure.attach(name="Latest Config Received by AP: ",
# body=str(latest_cfg),
# attachment_type=allure.attachment_type.JSON)
# ap_logs = ap_ssh.get_logread(start_ref="start testcase: " + instance_name,
# stop_ref="stop testcase: " + instance_name)
# allure.attach(body=ap_logs, name="AP Log: ")
# print("Config from ucentral gw is not sent to AP")
# else:
# print("Config is sent to AP from ucentral gw")
#
# x = 1
# latest_cfg = ap_ssh.run_generic_command(cmd="cat /etc/ucentral/ucentral.cfg." + str(latest))
# allure.attach(name="Latest Config Received by AP: ",
# body=str(latest_cfg),
# attachment_type=allure.attachment_type.JSON)
#
# while active != latest:
# connected, latest, active = ap_ssh.get_ucentral_status()
# time.sleep(20)
# x += 1
# print("active: ", active)
# print("latest: ", latest)
# if x == 10:
# break
#
# connected, latest, active = ap_ssh.get_ucentral_status()
# if latest == active:
# print("Config properly Applied on AP")
# else:
# print("Config is not Applied on AP")
#
# time_2 = time.time()
# time_interval = time_2 - time_1
# allure.attach(name="Time Took to apply Config: " + str(time_interval), body="")
#
# time.sleep(60)
#
# ap_config_latest = ap_ssh.get_uc_latest_config()
# ap_config_active = ap_ssh.get_uc_active_config()
# if x < 19:
# print("AP is Broadcasting Applied Config")
# allure.attach(name="Success : Active Config in AP: ", body=str(ap_config_active))
#
# else:
# print("AP is Not Broadcasting Applied Config")
# allure.attach(name="Failed to Apply Config : Active Config in AP : ", body=str(ap_config_active))
# time.sleep(10)
#
# try:
# iwinfo = ap_ssh.iwinfo()
# allure.attach(name="iwinfo: ", body=str(iwinfo))
# except:
# pass
# ap_ssh.run_generic_command(cmd="logger stop testcase: " + instance_name)
# ap_logs = ap_ssh.get_logread(start_ref="start testcase: " + instance_name,
# stop_ref="stop testcase: " + instance_name)
# allure.attach(body=ap_logs, name="AP Log: ")
#
# wifi_status = ap_ssh.get_wifi_status()
# allure.attach(name="wifi status", body=str(wifi_status))
#
# iwinfo = ap_ssh.iwinfo()
# allure.attach(name="iwinfo: ", body=str(iwinfo))
# try:
# ssid_info_sdk = instantiate_profile_obj.get_ssid_info()
# print("ssid_info_sdk", ssid_info_sdk)
# ap_wifi_data = ap_ssh.get_iwinfo()
# print("ap_wifi_data", ap_wifi_data)
# print(type(ap_wifi_data))
#
# print("\nTeardown")
# for p in ap_wifi_data:
# print(p)
# for q in ssid_info_sdk:
# if ap_wifi_data[p][0] == q[0] and ap_wifi_data[p][2] == q[3]:
# q.append(ap_wifi_data[p][1])
#
#
# idx_mapping = {}
# print(ssid_info_sdk)
# dut_data.append(ssid_info_sdk)
#
# except Exception as e:
# print(e)
# pass
# print("ssid_data", ssid_data)
# print("dut", dut_data)
# dut_ssid_data = []
# dut_final_data = []
# for dut in range(len(dut_data)):
# for interface in range(len(dut_data[dut])):
# ssid = ["ssid_idx=" + str(interface) +
# " ssid=" + dut_data[dut][interface][0] +
# " security=" + dut_data[dut][interface][1].upper() +
# " password=" + dut_data[dut][interface][2] +
# " bssid=" + dut_data[dut][interface][4].lower()
# ]
# print(ssid)
# dut_ssid_data.append(ssid)
# dut_final_data.append(dut_ssid_data)
# print("dut ssid data", dut_ssid_data)
#dut creation for mesh
dut_ssid_data = [[['ssid_idx=0 ssid=ssid_wpa2_2g security=WPA2 password=something bssid=34:ef:b6:af:4a:84'], ['ssid_idx=1 ssid=ssid_wpa2_5g security=WPA2 password=something bssid=34:ef:b6:af:4a:7c']], [['ssid_idx=0 ssid=ssid_wpa2_2g security=WPA2 password=something bssid=34:ef:b6:af:49:0d'], ['ssid_idx=1 ssid=ssid_wpa2_5g security=WPA2 password=something bssid=34:ef:b6:af:49:05']], [['ssid_idx=0 ssid=ssid_wpa2_2g security=WPA2 password=something bssid=90:3c:b3:9d:69:36'], ['ssid_idx=1 ssid=ssid_wpa2_5g security=WPA2 password=something bssid=90:3c:b3:9d:69:2e']]]
create_dut = lf_tools.create_mesh_dut(ssid_data=dut_ssid_data)
#create mesh scenario
mesh_scenario = lf_tools.create_mesh_scenario()
#check for all ap are connected and is pinging
#create a mesh scenario with dhcp disable option to node-1 and node-2
# request.addfinalizer(teardown_session)
# return test_cases
# def setup_mesh_profile(self, request, param, get_apnos, get_configuration, setup_controller, instantiate_profile, get_markers,get_equipment_ref, lf_tools, skip_lf=False, open_flow=None):
# instantiate_profile_obj = instantiate_profile(sdk_client=setup_controller)
# print(1, instantiate_profile_obj.sdk_client)
# vlan_id, mode = 0, 0
# parameter = dict(param)
# print("parameter", parameter)
# print(list(parameter["ssid_modes"])[0])
# ssid_info_sdk = instantiate_profile_obj.get_ssid_info()
# print(ssid_info_sdk)
# test_cases = {}
# profile_data = {}
# var = ""
# if parameter['mode'] not in ["BRIDGE", "NAT", "VLAN"]:
# print("Invalid Mode: ", parameter['mode'])
# return test_cases
# instantiate_profile_obj.set_radio_config()
# if parameter['mode'] == "NAT":
# mode = "NAT"
# instantiate_profile_obj.set_mode(mode=mode)
# vlan_id = 1
# if parameter['mode'] == "BRIDGE":
# mode = "BRIDGE"
# instantiate_profile_obj.set_mode(mode=mode, mesh=True)
# vlan_id = 1
# if parameter['mode'] == "VLAN":
# mode = "VLAN"
# instantiate_profile_obj.set_mode(mode=mode)
#
# profile_data["ssid"] = {}
#
# for i in parameter["ssid_modes"]:
# profile_data["ssid"][i] = []
# for j in range(len(parameter["ssid_modes"][i])):
# data = parameter["ssid_modes"][i][j]
# profile_data["ssid"][i].append(data)
# lf_dut_data = []
#
# for mode in profile_data['ssid']:
# if mode == "open":
# for j in profile_data["ssid"][mode]:
# if mode in get_markers.keys() and get_markers[mode]:
# try:
# if j["appliedRadios"].__contains__("2G"):
# lf_dut_data.append(j)
# if j["appliedRadios"].__contains__("5G"):
# lf_dut_data.append(j)
# j["appliedRadios"] = list(set(j["appliedRadios"]))
# j['security'] = 'none'
# creates_profile = instantiate_profile_obj.add_ssid(ssid_data=j)
# test_cases["wpa_2g"] = True
# except Exception as e:
# print(e)
# test_cases["wpa_2g"] = False
# if mode == "wpa":
# for j in profile_data["ssid"][mode]:
# if mode in get_markers.keys() and get_markers[mode]:
# try:
# if j["appliedRadios"].__contains__("2G"):
# lf_dut_data.append(j)
# if j["appliedRadios"].__contains__("5G"):
# lf_dut_data.append(j)
# j["appliedRadios"] = list(set(j["appliedRadios"]))
# j['security'] = 'psk'
# creates_profile = instantiate_profile_obj.add_ssid(ssid_data=j)
# test_cases["wpa_2g"] = True
# except Exception as e:
# print(e)
# test_cases["wpa_2g"] = False
# if mode == "wpa2_personal":
# for j in profile_data["ssid"][mode]:
#
# if mode in get_markers.keys() and get_markers[mode]:
# try:
# if j["appliedRadios"].__contains__("2G"):
# lf_dut_data.append(j)
# if j["appliedRadios"].__contains__("5G"):
# lf_dut_data.append(j)
# j["appliedRadios"] = list(set(j["appliedRadios"]))
# j['security'] = 'psk2'
# creates_profile = instantiate_profile_obj.add_ssid(ssid_data=j)
# test_cases["wpa_2g"] = True
# except Exception as e:
# print(e)
# test_cases["wpa2_personal"] = False
#
#
#
# # this will return configuration of your testbed from tests/conftest.py get_configuration fixtures
# print("get configuration",get_configuration)
# print(len(get_configuration['access_point']))
# # print(get_configuration['access_point'])
# for length in range(0,len(get_configuration['access_point'])):
# ap_ssh = get_apnos(credentials=get_configuration['access_point'][length], pwd="../libs/apnos/", sdk="2.x")
# connected, latest, active = ap_ssh.get_ucentral_status()
# print("connected", connected)
# print("latest",latest)
# print("active", active)
# latest_old = latest
# if connected == False:
# pytest.exit("AP is disconnected from UC Gateway")
# if latest != active:
# allure.attach(name="FAIL : ubus call ucentral status: ", body="connected: " + str(connected) + "\nlatest: " + str(latest) + "\nactive: " + str(active))
# ap_logs = ap_ssh.logread()
# allure.attach(body=ap_logs, name="FAILURE: AP LOgs: ")
# pytest.fail("AP is disconnected from UC Gateway")
#
# S = 10
#
# # Add logger command before config push
# instance_name = ''.join(random.choices(string.ascii_uppercase + string.digits, k=S))
# ap_ssh.run_generic_command(cmd="logger start testcase: " + instance_name)
#
# time_1 = time.time()
#
# # Apply config
# print("get equipment id ref ", get_equipment_ref)
# print("get equipment id ref [0]", get_equipment_ref[length])
# instantiate_profile_obj.push_config(serial_number=get_equipment_ref[length])
#
# config = json.loads(str(instantiate_profile_obj.base_profile_config).replace(" ", "").replace("'", '"'))
# config["uuid"] = 0
#
# # Attach the config that is sent from API
# allure.attach(name="Config Sent from API: ", body=str(config), attachment_type=allure.attachment_type.JSON)
#
# ap_config_latest = ap_ssh.get_uc_latest_config()
# try:
# ap_config_latest["uuid"] = 0
# except Exception as e:
# print(e)
# pass
# x = 1
#
# # Check if ucentral gw has pushed the config into latest
# connected, latest, active = ap_ssh.get_ucentral_status()
# while latest_old == latest:
# time.sleep(5)
# x += 1
# print("old config: ", latest_old)
# print("latest: ", latest)
# connected, latest, active = ap_ssh.get_ucentral_status()
# if x == 5:
# break
# onnected, latest, active = ap_ssh.get_ucentral_status()
# if latest == latest_old:
# latest_cfg = ap_ssh.run_generic_command(cmd="cat /etc/ucentral/ucentral.cfg." + str(latest))
# allure.attach(name="Latest Config Received by AP: ",
# body=str(latest_cfg),
# attachment_type=allure.attachment_type.JSON)
# ap_logs = ap_ssh.get_logread(start_ref="start testcase: " + instance_name,
# stop_ref="stop testcase: " + instance_name)
# allure.attach(body=ap_logs, name="AP Log: ")
# print("Config from ucentral gw is not sent to AP")
# else:
# print("Config is sent to AP from ucentral gw")
#
# x = 1
# latest_cfg = ap_ssh.run_generic_command(cmd="cat /etc/ucentral/ucentral.cfg." + str(latest))
# allure.attach(name="Latest Config Received by AP: ",
# body=str(latest_cfg),
# attachment_type=allure.attachment_type.JSON)
#
# while active != latest:
# connected, latest, active = ap_ssh.get_ucentral_status()
# time.sleep(20)
# x += 1
# print("active: ", active)
# print("latest: ", latest)
# if x == 10:
# break
#
# connected, latest, active = ap_ssh.get_ucentral_status()
# if latest == active:
# print("Config properly Applied on AP")
# else:
# print("Config is not Applied on AP")
#
# time_2 = time.time()
# time_interval = time_2 - time_1
# allure.attach(name="Time Took to apply Config: " + str(time_interval), body="")
#
# time.sleep(60)
#
# ap_config_latest = ap_ssh.get_uc_latest_config()
# ap_config_active = ap_ssh.get_uc_active_config()
# if x < 19:
# print("AP is Broadcasting Applied Config")
# allure.attach(name="Success : Active Config in AP: ", body=str(ap_config_active))
#
# else:
# print("AP is Not Broadcasting Applied Config")
# allure.attach(name="Failed to Apply Config : Active Config in AP : ", body=str(ap_config_active))
# time.sleep(10)
#
# try:
# iwinfo = ap_ssh.iwinfo()
# allure.attach(name="iwinfo: ", body=str(iwinfo))
#
# # tx_power, name = ap_ssh.gettxpower()
# # allure.attach(name="interface name: ", body=str(name))
# # allure.attach(name="tx power: ", body=str(tx_power))
# except:
# pass
# ap_ssh.run_generic_command(cmd="logger stop testcase: " + instance_name)
# ap_logs = ap_ssh.get_logread(start_ref="start testcase: " + instance_name,
# stop_ref="stop testcase: " + instance_name)
# allure.attach(body=ap_logs, name="AP Log: ")
#
# wifi_status = ap_ssh.get_wifi_status()
# allure.attach(name="wifi status", body=str(wifi_status))
#
# try:
# ssid_info_sdk = instantiate_profile_obj.get_ssid_info()
# print("ssid_info_sdk", ssid_info_sdk)
# ap_wifi_data = ap_ssh.get_iwinfo()
# print("ap_wifi_data", ap_wifi_data)
# print(type(ap_wifi_data))
#
# for p in ap_wifi_data:
# print(p)
# for q in ssid_info_sdk:
# if ap_wifi_data[p][0] == q[0] and ap_wifi_data[p][2] == q[3]:
# q.append(ap_wifi_data[p][1])
#
# ssid_data = []
# idx_mapping = {}
# print(ssid_info_sdk)
# for interface in range(len(ssid_info_sdk)):
# ssid = ["ssid_idx=" + str(interface) +
# " ssid=" + ssid_info_sdk[interface][0] +
# " security=" + ssid_info_sdk[interface][1].upper() +
# " password=" + ssid_info_sdk[interface][2] +
# " bssid=" + ssid_info_sdk[interface][4].lower()
# ]
# print("ssid", ssid)
# idx_mapping[str(interface)] = [ssid_info_sdk[interface][0],
# ssid_info_sdk[interface][2],
# ssid_info_sdk[interface][1],
# ssid_info_sdk[interface][3],
# ssid_info_sdk[interface][4].lower()
# ]
# print("idx mapping", idx_mapping[str(interface)])
# ssid_data.append(ssid)
# print("ssid_data", ssid_data)
# lf_tools.ssid_list.append(ssid_info_sdk[interface][0])
# if not skip_lf:
# lf_tools.dut_idx_mapping = idx_mapping
# lf_tools.update_ssid(ssid_data=ssid_data)
# except Exception as e:
# print(e)
# pass
#
# def teardown_session():
# wifi_status = ap_ssh.get_wifi_status()
# allure.attach(name="wifi status", body=str(wifi_status))
#
# iwinfo = ap_ssh.iwinfo()
# allure.attach(name="iwinfo: ", body=str(iwinfo))
#
# print("\nTeardown")
#
# request.addfinalizer(teardown_session)
# return test_cases
#