mirror of
https://github.com/Telecominfraproject/wlan-testing.git
synced 2025-11-01 19:37:54 +00:00
added additional timeout and retries on rest API Calls
Signed-off-by: shivamcandela <shivam.thakur@candelatech.com>
This commit is contained in:
@@ -9,6 +9,7 @@ Currently Having Methods:
|
||||
5. Get current Firmware
|
||||
|
||||
"""
|
||||
import json
|
||||
|
||||
import paramiko
|
||||
from scp import SCPClient
|
||||
@@ -18,9 +19,12 @@ import allure
|
||||
|
||||
class APNOS:
|
||||
|
||||
def __init__(self, credentials=None, pwd=os.getcwd()):
|
||||
def __init__(self, credentials=None, pwd=os.getcwd(), sdk="1.x"):
|
||||
allure.attach(name="APNOS LIbrary: ", body=str(credentials))
|
||||
self.serial = credentials['serial']
|
||||
self.owrt_args = "--prompt root@OpenAp -s serial --log stdout --user root --passwd openwifi"
|
||||
if sdk == "2.x":
|
||||
self.owrt_args = "--prompt root@" + self.serial + " -s serial --log stdout --user root --passwd openwifi"
|
||||
if credentials is None:
|
||||
print("No credentials Given")
|
||||
exit()
|
||||
@@ -34,23 +38,21 @@ class APNOS:
|
||||
client = self.ssh_cli_connect()
|
||||
cmd = '[ -f ~/cicd-git/ ] && echo "True" || echo "False"'
|
||||
stdin, stdout, stderr = client.exec_command(cmd)
|
||||
print(stdout.read(), stderr.read())
|
||||
print(type(str(stdout.read()).__contains__("False\n")), str(stdout.read()).__contains__("False\n"))
|
||||
if str(stdout.read()).__contains__("False"):
|
||||
output = str(stdout.read())
|
||||
print(output)
|
||||
if output.__contains__("False"):
|
||||
cmd = 'mkdir ~/cicd-git/'
|
||||
stdin, stdout, stderr = client.exec_command(cmd)
|
||||
print("lol", stdout.read(), stderr.read())
|
||||
cmd = '[ -f ~/cicd-git/openwrt_ctl.py ] && echo "True" || echo "False"'
|
||||
stdin, stdout, stderr = client.exec_command(cmd)
|
||||
print(stdout.read(), stderr.read())
|
||||
if str(stdout.read()).__contains__("False"):
|
||||
output = str(stdout.read())
|
||||
if output.__contains__("False"):
|
||||
print("Copying openwrt_ctl serial control Script...")
|
||||
with SCPClient(client.get_transport()) as scp:
|
||||
scp.put(pwd + 'openwrt_ctl.py', '~/cicd-git/openwrt_ctl.py') # Copy my_file.txt to the server
|
||||
scp.put(pwd + '/openwrt_ctl.py', '~/cicd-git/openwrt_ctl.py') # Copy my_file.txt to the server
|
||||
cmd = '[ -f ~/cicd-git/openwrt_ctl.py ] && echo "True" || echo "False"'
|
||||
stdin, stdout, stderr = client.exec_command(cmd)
|
||||
var = str(stdout.read())
|
||||
print(var, stderr.read())
|
||||
if var.__contains__("True"):
|
||||
allure.attach(name="openwrt_ctl Setup", body=str(var))
|
||||
print("APNOS Serial Setup OK")
|
||||
@@ -150,10 +152,36 @@ class APNOS:
|
||||
info.append(":".join(mac_info_list).replace("'", ""))
|
||||
if ssid[0].split(":")[0] == "b'security":
|
||||
security = ssid[0].split(":")[1].split(",")[2].replace("]", "").replace('"', "").replace("'", "")
|
||||
info.append(security)
|
||||
print(ssid[0].split(":")[1])
|
||||
if security != "OPEN":
|
||||
security_key = ssid[0].split(":")[1].split(",")[4].replace('"', "").replace("]", "")
|
||||
if security == "WPA-PSK":
|
||||
if ssid[0].split(":")[1].split(",")[6].__contains__("1"):
|
||||
info.append("WPA")
|
||||
security_key = ssid[0].split(":")[1].split(",")[4].replace('"', "").replace("]", "")
|
||||
if ssid[0].split(":")[1].split(",")[6].__contains__("2"):
|
||||
info.append("WPA2")
|
||||
security_key = ssid[0].split(":")[1].split(",")[4].replace('"', "").replace("]", "")
|
||||
if ssid[0].split(":")[1].split(",")[6].__contains__("mixed"):
|
||||
info.append("WPA | WPA2")
|
||||
security_key = ssid[0].split(":")[1].split(",")[4].replace('"', "").replace("]", "")
|
||||
if security == "WPA-SAE":
|
||||
if ssid[0].split(":")[1].split(",")[6].__contains__("3"):
|
||||
info.append("WPA3_PERSONAL")
|
||||
security_key = ssid[0].split(":")[1].split(",")[4].replace('"', "").replace("]", "")
|
||||
if ssid[0].split(":")[1].split(",")[6].__contains__("mixed"):
|
||||
info.append("WPA3_PERSONAL")
|
||||
security_key = ssid[0].split(":")[1].split(",")[4].replace('"', "").replace("]", "")
|
||||
if security == "WPA-EAP":
|
||||
info.append("EAP-TTLS")
|
||||
security_key = ssid[0].split(":")[1].split(",")[4].replace('"', "").replace("]", "")
|
||||
if security == "WPA3-EAP":
|
||||
info.append("EAP-TTLS")
|
||||
security_key = ssid[0].split(":")[1].split(",")[4].replace('"', "").replace("]", "")
|
||||
else:
|
||||
security_key = ssid[0].split(":")[1].split(",")[4].replace('"', "").replace("]", "")
|
||||
info.append(security_key)
|
||||
else:
|
||||
info.append("OPEN")
|
||||
if ssid[0].split(":")[0] == "b'ssid":
|
||||
info.append(ssid[0].split(":")[1].replace("'", ""))
|
||||
ssid_info_list.append(info)
|
||||
@@ -256,7 +284,27 @@ class APNOS:
|
||||
allure.attach(name="get_redirector ", body=redirector)
|
||||
return redirector
|
||||
|
||||
def run_generic_cmd(self):
|
||||
def run_generic_command(self, cmd=""):
|
||||
try:
|
||||
client = self.ssh_cli_connect()
|
||||
cmd = cmd
|
||||
if self.mode:
|
||||
cmd = f"cd ~/cicd-git/ && ./openwrt_ctl.py {self.owrt_args} -t {self.tty} --action " \
|
||||
f"cmd --value \"{cmd}\" "
|
||||
stdin, stdout, stderr = client.exec_command(cmd)
|
||||
output = stdout.read()
|
||||
print(output, stderr.read())
|
||||
status = output.decode('utf-8').splitlines()
|
||||
allure.attach(name="get_redirector output ", body=str(stderr))
|
||||
client.close()
|
||||
except Exception as e:
|
||||
print(e)
|
||||
allure.attach(name="get_redirector - Exception ", body=str(e))
|
||||
status = "Error"
|
||||
allure.attach(name="get_redirector ", body=status)
|
||||
return status
|
||||
|
||||
def get_ucentral_status(self):
|
||||
try:
|
||||
client = self.ssh_cli_connect()
|
||||
cmd = "ubus call ucentral status"
|
||||
@@ -265,17 +313,115 @@ class APNOS:
|
||||
f"cmd --value \"{cmd}\" "
|
||||
stdin, stdout, stderr = client.exec_command(cmd)
|
||||
output = stdout.read()
|
||||
output = output.decode('utf-8').splitlines()
|
||||
allure.attach(name="get_serial_number output ", body=str(stderr))
|
||||
print(output)
|
||||
# serial = output[1].replace(" ", "").split("|")[1]
|
||||
# print(output, stderr.read())
|
||||
connected = False
|
||||
if "connected" in output.decode('utf-8').splitlines()[2]:
|
||||
connected = True
|
||||
# connected = output.decode('utf-8').splitlines()[2]
|
||||
latest = output.decode('utf-8').splitlines()[3].split(":")[1].replace(" ", "").replace(",", "")
|
||||
active = output.decode('utf-8').splitlines()[4].split(":")[1].replace(" ", "").replace(",", "")
|
||||
client.close()
|
||||
allure.attach(name="ubus call ucentral status ", body=str(connected) + "\n" + latest + "\n" + active)
|
||||
except Exception as e:
|
||||
print(e)
|
||||
allure.attach(name="Exception ", body=str(e))
|
||||
connected, latest, active = "Error", "Error", "Error"
|
||||
allure.attach(name="ubus call ucentral status ", body=str(connected) + "\n" + latest + "\n" + active)
|
||||
return connected, latest, active
|
||||
|
||||
def get_uc_latest_config(self):
|
||||
try:
|
||||
connected, latest, active = self.get_ucentral_status()
|
||||
client = self.ssh_cli_connect()
|
||||
cmd = "cat /etc/ucentral/ucentral.cfg." + latest
|
||||
if self.mode:
|
||||
cmd = f"cd ~/cicd-git/ && ./openwrt_ctl.py {self.owrt_args} -t {self.tty} --action " \
|
||||
f"cmd --value \"{cmd}\" "
|
||||
stdin, stdout, stderr = client.exec_command(cmd)
|
||||
output = stdout.read().decode('utf-8').splitlines()[1]
|
||||
json_output = json.loads(output)#, sort_keys=True)
|
||||
print(type(json_output))
|
||||
client.close()
|
||||
except Exception as e:
|
||||
json_output = False
|
||||
print(e)
|
||||
return json_output
|
||||
|
||||
def get_uc_active_config(self):
|
||||
try:
|
||||
connected, latest, active = self.get_ucentral_status()
|
||||
client = self.ssh_cli_connect()
|
||||
cmd = "cat /etc/ucentral/ucentral.cfg." + active
|
||||
if self.mode:
|
||||
cmd = f"cd ~/cicd-git/ && ./openwrt_ctl.py {self.owrt_args} -t {self.tty} --action " \
|
||||
f"cmd --value \"{cmd}\" "
|
||||
stdin, stdout, stderr = client.exec_command(cmd)
|
||||
output = stdout.read().decode('utf-8').splitlines()[1]
|
||||
json_output = json.loads(output)#, sort_keys=True)
|
||||
print(json_output)
|
||||
client.close()
|
||||
except Exception as e:
|
||||
json_output = False
|
||||
print(e)
|
||||
return json_output
|
||||
|
||||
def logread(self):
|
||||
try:
|
||||
client = self.ssh_cli_connect()
|
||||
cmd = "logread"
|
||||
if self.mode:
|
||||
cmd = f"cd ~/cicd-git/ && ./openwrt_ctl.py {self.owrt_args} -t {self.tty} --action " \
|
||||
f"cmd --value \"{cmd}\" "
|
||||
stdin, stdout, stderr = client.exec_command(cmd)
|
||||
output = stdout.read()
|
||||
status = output.decode('utf-8').splitlines()
|
||||
logread = status
|
||||
logs = ""
|
||||
for i in logread:
|
||||
logs = logs + i + "\n"
|
||||
client.close()
|
||||
except Exception as e:
|
||||
print(e)
|
||||
allure.attach(name="get_serial_number - Exception ", body=str(e))
|
||||
serial = "Error"
|
||||
allure.attach(name="get_serial_number ", body=str(serial))
|
||||
return serial
|
||||
logs = ""
|
||||
return logs
|
||||
|
||||
def get_vifc(self):
|
||||
client = self.ssh_cli_connect()
|
||||
cmd = "vifC"
|
||||
if self.mode:
|
||||
cmd = f"cd ~/cicd-git/ && ./openwrt_ctl.py {self.owrt_args} -t {self.tty} --action " \
|
||||
f"cmd --value \"{cmd}\" "
|
||||
stdin, stdout, stderr = client.exec_command(cmd)
|
||||
output = stdout.read()
|
||||
client.close()
|
||||
allure.attach(name="vif state Output Msg: ", body=str(output))
|
||||
allure.attach(name="vif state Err Msg: ", body=str(stderr))
|
||||
return output
|
||||
|
||||
def get_vifs(self):
|
||||
client = self.ssh_cli_connect()
|
||||
cmd = "vifS"
|
||||
if self.mode:
|
||||
cmd = f"cd ~/cicd-git/ && ./openwrt_ctl.py {self.owrt_args} -t {self.tty} --action " \
|
||||
f"cmd --value \"{cmd}\" "
|
||||
stdin, stdout, stderr = client.exec_command(cmd)
|
||||
output = stdout.read()
|
||||
client.close()
|
||||
allure.attach(name="vif state Output Msg: ", body=str(output))
|
||||
allure.attach(name="vif state Err Msg: ", body=str(stderr))
|
||||
return output
|
||||
|
||||
def get_vlan(self):
|
||||
stdout = self.get_vifs()
|
||||
vlan_list = []
|
||||
for i in stdout.splitlines():
|
||||
vlan = str(i.strip()).replace("|", ".").split(".")
|
||||
try:
|
||||
if not vlan[0].find("b'vlan_id"):
|
||||
vlan_list.append(vlan[1].strip())
|
||||
except:
|
||||
pass
|
||||
return vlan_list
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
@@ -289,17 +435,17 @@ if __name__ == '__main__':
|
||||
|
||||
}
|
||||
abc = {
|
||||
'model': 'eap102',
|
||||
'mode': 'wifi6',
|
||||
'serial': '903cb39d6918',
|
||||
'jumphost': True,
|
||||
'ip': "localhost", # 10.28.3.103
|
||||
'username': "lanforge",
|
||||
'password': "pumpkin77",
|
||||
'port': 8863, # 22
|
||||
'jumphost_tty': '/dev/ttyAP2',
|
||||
'version': "https://tip.jfrog.io/artifactory/tip-wlan-ap-firmware/uCentral/edgecore_eap102/20210625-edgecore_eap102-uCentral-trunk-4225122-upgrade.bin"
|
||||
}
|
||||
var = APNOS(credentials=abc)
|
||||
r = var.run_generic_cmd()
|
||||
print(r)
|
||||
'model': 'eap102',
|
||||
'mode': 'wifi6',
|
||||
'serial': '903cb39d6918',
|
||||
'jumphost': True,
|
||||
'ip': "localhost", # 10.28.3.103
|
||||
'username': "lanforge",
|
||||
'password': "pumpkin77",
|
||||
'port': 8806, # 22
|
||||
'jumphost_tty': '/dev/ttyAP2',
|
||||
'version': "https://tip.jfrog.io/artifactory/tip-wlan-ap-firmware/uCentral/edgecore_eap102/20210625-edgecore_eap102-uCentral-trunk-4225122-upgrade.bin"
|
||||
}
|
||||
var = APNOS(credentials=abc, sdk="2.x")
|
||||
r = var.get_uc_latest_config()
|
||||
print(r)
|
||||
|
||||
@@ -6,12 +6,15 @@
|
||||
import json
|
||||
import ssl
|
||||
import sys
|
||||
import time
|
||||
from urllib.parse import urlparse
|
||||
import pytest
|
||||
import allure
|
||||
import requests
|
||||
from pathlib import Path
|
||||
|
||||
from requests.adapters import HTTPAdapter
|
||||
|
||||
|
||||
class ConfigureController:
|
||||
|
||||
@@ -20,32 +23,39 @@ class ConfigureController:
|
||||
self.password = controller_data["password"]
|
||||
self.host = urlparse(controller_data["url"])
|
||||
self.access_token = ""
|
||||
self.session = requests.Session()
|
||||
|
||||
self.login_resp = self.login()
|
||||
|
||||
def build_uri(self, path):
|
||||
new_uri = 'https://%s:%d/api/v1/%s' % (self.host.hostname, self.host.port, path)
|
||||
print(new_uri)
|
||||
return new_uri
|
||||
|
||||
def login(self):
|
||||
|
||||
uri = self.build_uri("oauth2")
|
||||
self.session.mount(uri, HTTPAdapter(max_retries=15))
|
||||
payload = json.dumps({"userId": self.username, "password": self.password})
|
||||
resp = requests.post(uri, data=payload, verify="")
|
||||
resp = self.session.post(uri, data=payload, verify=False, timeout=100)
|
||||
self.check_response("POST", resp, "", payload, uri)
|
||||
token = resp.json()
|
||||
self.access_token = token["access_token"]
|
||||
print(resp)
|
||||
self.session.headers.update({'Authorization': self.access_token})
|
||||
return resp
|
||||
|
||||
def logout(self):
|
||||
global access_token
|
||||
uri = self.build_uri('oauth2/%s' % self.access_token)
|
||||
resp = requests.delete(uri, headers=self.make_headers(), verify=False)
|
||||
resp = self.session.delete(uri, headers=self.make_headers(), verify=False, timeout=100)
|
||||
self.check_response("DELETE", resp, self.make_headers(), "", uri)
|
||||
print('Logged out:', resp.status_code)
|
||||
return resp
|
||||
|
||||
def make_headers(self):
|
||||
headers = {'Authorization': 'Bearer %s' % self.access_token}
|
||||
headers = {'Authorization': 'Bearer %s' % self.access_token,
|
||||
"Connection": "keep-alive",
|
||||
"Keep-Alive": "timeout=10, max=1000"
|
||||
}
|
||||
return headers
|
||||
|
||||
def check_response(self, cmd, response, headers, data_str, url):
|
||||
@@ -74,16 +84,18 @@ class UController(ConfigureController):
|
||||
|
||||
def get_devices(self):
|
||||
uri = self.build_uri("devices/")
|
||||
resp = requests.get(uri, headers=self.make_headers(), verify=False)
|
||||
resp = self.session.get(uri, headers=self.make_headers(), verify=False, timeout=100)
|
||||
self.check_response("GET", resp, self.make_headers(), "", uri)
|
||||
devices = resp.json()
|
||||
#resp.close()()
|
||||
return devices
|
||||
|
||||
def get_device_by_serial_number(self, serial_number=None):
|
||||
uri = self.build_uri("device/" + serial_number)
|
||||
resp = requests.get(uri, headers=self.make_headers(), verify=False)
|
||||
resp = self.session.get(uri, headers=self.make_headers(), verify=False, timeout=100)
|
||||
self.check_response("GET", resp, self.make_headers(), "", uri)
|
||||
device = resp.json()
|
||||
#resp.close()()
|
||||
return device
|
||||
|
||||
def get_device_uuid(self, serial_number):
|
||||
@@ -266,18 +278,17 @@ class UProfileUtility:
|
||||
pytest.exit("invalid Operating Mode")
|
||||
|
||||
def push_config(self, serial_number):
|
||||
payload = {}
|
||||
payload["configuration"] = self.base_profile_config
|
||||
payload['serialNumber'] = serial_number
|
||||
payload['UUID'] = 0
|
||||
print(payload)
|
||||
payload = {"configuration": self.base_profile_config, 'serialNumber': serial_number, 'UUID': 0}
|
||||
|
||||
uri = self.sdk_client.build_uri("device/" + serial_number + "/configure")
|
||||
basic_cfg_str = json.dumps(payload)
|
||||
resp = requests.post(uri, data=basic_cfg_str, headers=self.sdk_client.make_headers(), verify=False)
|
||||
resp = self.sdk_client.session.post(uri, data=basic_cfg_str, headers=self.sdk_client.make_headers(),
|
||||
verify=False, timeout=100)
|
||||
self.sdk_client.check_response("POST", resp, self.sdk_client.make_headers(), basic_cfg_str, uri)
|
||||
print(resp.url)
|
||||
#resp.close()()
|
||||
print(resp)
|
||||
|
||||
|
||||
# UCENTRAL_BASE_CFG = {
|
||||
# "uuid": 1,
|
||||
# "radios": [
|
||||
@@ -427,9 +438,11 @@ controller = {
|
||||
# "radius": False
|
||||
# }
|
||||
# obj = UController(controller_data=controller)
|
||||
# # # # # print(obj.get_devices())
|
||||
# # # # # print(obj.get_device_uuid(serial_number="903cb3944873"))
|
||||
# # # # # obj.get_device_uuid(serial_number="c4411ef53f23")
|
||||
# time.sleep(10)
|
||||
# obj.logout()
|
||||
# # # # # # # print(obj.get_devices())
|
||||
# # # # # # # print(obj.get_device_uuid(serial_number="903cb3944873"))
|
||||
# # # # # # # obj.get_device_uuid(serial_number="c4411ef53f23")
|
||||
# profile_client = UProfileUtility(sdk_client=obj)
|
||||
# profile_client.set_radio_config()
|
||||
# profile_client.set_mode(mode="VLAN")
|
||||
@@ -437,10 +450,12 @@ controller = {
|
||||
# profile_client.add_ssid(ssid_data=ssid_data)
|
||||
# ssid_data = {"ssid_name": "ssid_wpa_test_4_vlan", "vlan": 100, "appliedRadios": ["2G", "5G"], "security_key": "something", "security": "none"}
|
||||
# profile_client.add_ssid(ssid_data=ssid_data)
|
||||
# # # print(profile_client.base_profile_config)
|
||||
# # # # print(profile_client.base_profile_config)
|
||||
# profile_client.push_config(serial_number="903cb39d6918")
|
||||
# print(profile_client.base_profile_config)
|
||||
# equipments = obj.get_devices()
|
||||
# b = json.loads(str(profile_client.base_profile_config).replace(" ", "").replace("'", '"'))#, json.dumps(b, sort_keys=True)
|
||||
# print(b["interfaces"])#, b["interfaces"])
|
||||
# # print(profile_client.base_profile_config)
|
||||
# # equipments = obj.get_devices()
|
||||
# print(equipments)
|
||||
# for i in equipments:
|
||||
# for j in equipments[i]:
|
||||
|
||||
@@ -215,7 +215,7 @@ CONFIGURATION = {
|
||||
'ip': "localhost", # 10.28.3.103
|
||||
'username': "lanforge",
|
||||
'password': "pumpkin77",
|
||||
'port': 8863, # 22
|
||||
'port': 8806, # 22
|
||||
'jumphost_tty': '/dev/ttyAP2',
|
||||
'version': "https://tip.jfrog.io/artifactory/tip-wlan-ap-firmware/uCentral/edgecore_eap102/20210625-edgecore_eap102-uCentral-trunk-4225122-upgrade.bin"
|
||||
}
|
||||
@@ -224,8 +224,8 @@ CONFIGURATION = {
|
||||
"name": "lanforge",
|
||||
"details": {
|
||||
"ip": "localhost", # 10.28.3.30
|
||||
"port": 8862, # 8080
|
||||
"ssh_port": 8864,
|
||||
"port": 8802, # 8080
|
||||
"ssh_port": 8804,
|
||||
"2.4G-Radio": ["wiphy4"],
|
||||
"5G-Radio": ["wiphy5"],
|
||||
"AX-Radio": ["wiphy0", "wiphy1", "wiphy2", "wiphy3"],
|
||||
|
||||
@@ -250,8 +250,12 @@ def setup_controller(request, get_configuration, test_access_point):
|
||||
|
||||
def teardown_ucontroller():
|
||||
print("\nTest session Completed")
|
||||
allure.attach(body=str(get_configuration["controller"]), name="Controller Teardown: ")
|
||||
sdk_client.logout()
|
||||
allure.attach(body=str(get_configuration["controller"]), name="Controller Teardown: ")
|
||||
try:
|
||||
sdk_client.logout()
|
||||
except Exception as e:
|
||||
print(e)
|
||||
|
||||
request.addfinalizer(teardown_ucontroller)
|
||||
|
||||
@@ -269,6 +273,7 @@ def setup_controller(request, get_configuration, test_access_point):
|
||||
print(e)
|
||||
allure.attach(body=str(e), name="Controller Instantiation Failed: ")
|
||||
sdk_client = False
|
||||
pytest.exit("unable to communicate to Controller" + str(e))
|
||||
yield sdk_client
|
||||
|
||||
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
|
||||
@@ -55,11 +56,12 @@ def setup_vlan():
|
||||
|
||||
@allure.feature("CLIENT CONNECTIVITY SETUP")
|
||||
@pytest.fixture(scope="class")
|
||||
def setup_profiles(request, setup_controller, testbed, setup_vlan, get_equipment_id, get_uuid,
|
||||
def setup_profiles(request, setup_controller, testbed, setup_vlan, get_equipment_id,
|
||||
instantiate_profile, get_markers, create_lanforge_chamberview_dut, lf_tools,
|
||||
get_security_flags, get_configuration, radius_info, get_apnos):
|
||||
if request.config.getoption("--ucentral"):
|
||||
instantiate_profile = instantiate_profile(sdk_client=setup_controller)
|
||||
instantiate_profile_obj = instantiate_profile(sdk_client=setup_controller)
|
||||
print(1, instantiate_profile_obj.sdk_client)
|
||||
vlan_id, mode = 0, 0
|
||||
parameter = dict(request.param)
|
||||
print(parameter)
|
||||
@@ -69,18 +71,18 @@ def setup_profiles(request, setup_controller, testbed, setup_vlan, get_equipment
|
||||
print("Invalid Mode: ", parameter['mode'])
|
||||
allure.attach(body=parameter['mode'], name="Invalid Mode: ")
|
||||
yield test_cases
|
||||
instantiate_profile.set_radio_config()
|
||||
instantiate_profile_obj.set_radio_config()
|
||||
if parameter['mode'] == "NAT":
|
||||
mode = "NAT"
|
||||
instantiate_profile.set_mode(mode=mode)
|
||||
instantiate_profile_obj.set_mode(mode=mode)
|
||||
vlan_id = 1
|
||||
if parameter['mode'] == "BRIDGE":
|
||||
mode = "BRIDGE"
|
||||
instantiate_profile.set_mode(mode=mode)
|
||||
instantiate_profile_obj.set_mode(mode=mode)
|
||||
vlan_id = 1
|
||||
if parameter['mode'] == "VLAN":
|
||||
mode = "VLAN"
|
||||
instantiate_profile.set_mode(mode=mode)
|
||||
instantiate_profile_obj.set_mode(mode=mode)
|
||||
vlan_id = setup_vlan
|
||||
profile_data["ssid"] = {}
|
||||
for i in parameter["ssid_modes"]:
|
||||
@@ -102,7 +104,7 @@ def setup_profiles(request, setup_controller, testbed, setup_vlan, get_equipment
|
||||
j["appliedRadios"] = list(set(j["appliedRadios"]))
|
||||
j['security'] = 'none'
|
||||
lf_dut_data.append(j)
|
||||
creates_profile = instantiate_profile.add_ssid(ssid_data=j)
|
||||
creates_profile = instantiate_profile_obj.add_ssid(ssid_data=j)
|
||||
test_cases["wpa_2g"] = True
|
||||
allure.attach(body=str(creates_profile),
|
||||
name="SSID Profile Created")
|
||||
@@ -123,7 +125,7 @@ def setup_profiles(request, setup_controller, testbed, setup_vlan, get_equipment
|
||||
j["appliedRadios"] = list(set(j["appliedRadios"]))
|
||||
j['security'] = 'psk'
|
||||
lf_dut_data.append(j)
|
||||
creates_profile = instantiate_profile.add_ssid(ssid_data=j)
|
||||
creates_profile = instantiate_profile_obj.add_ssid(ssid_data=j)
|
||||
test_cases["wpa_2g"] = True
|
||||
allure.attach(body=str(creates_profile),
|
||||
name="SSID Profile Created")
|
||||
@@ -145,7 +147,7 @@ def setup_profiles(request, setup_controller, testbed, setup_vlan, get_equipment
|
||||
j['security'] = 'psk2'
|
||||
lf_dut_data.append(j)
|
||||
print("shivam: ", j)
|
||||
creates_profile = instantiate_profile.add_ssid(ssid_data=j)
|
||||
creates_profile = instantiate_profile_obj.add_ssid(ssid_data=j)
|
||||
test_cases["wpa_2g"] = True
|
||||
allure.attach(body=str(creates_profile),
|
||||
name="SSID Profile Created")
|
||||
@@ -154,9 +156,54 @@ def setup_profiles(request, setup_controller, testbed, setup_vlan, get_equipment
|
||||
test_cases["wpa2_personal"] = False
|
||||
allure.attach(body=str(e),
|
||||
name="SSID Profile Creation Failed")
|
||||
instantiate_profile.push_config(serial_number=get_equipment_id[0])
|
||||
yield True
|
||||
ap_ssh = get_apnos(get_configuration['access_point'][0], pwd="../libs/apnos/", sdk="2.x")
|
||||
connected, latest, active = ap_ssh.get_ucentral_status()
|
||||
print(2, instantiate_profile_obj.sdk_client)
|
||||
if connected == False:
|
||||
pytest.exit("AP is disconnected")
|
||||
instantiate_profile_obj.push_config(serial_number=get_equipment_id[0])
|
||||
config = json.loads(str(instantiate_profile_obj.base_profile_config).replace(" ", "").replace("'", '"'))
|
||||
config["uuid"] = 0
|
||||
ap_config_latest = ap_ssh.get_uc_latest_config()
|
||||
ap_config_latest["uuid"] = 0
|
||||
x = 1
|
||||
while ap_config_latest != config:
|
||||
time.sleep(5)
|
||||
x += 1
|
||||
ap_config_latest = ap_ssh.get_uc_latest_config()
|
||||
ap_config_latest["uuid"] = 0
|
||||
if x == 19:
|
||||
break
|
||||
if x < 19:
|
||||
print("Config properly applied into AP", config)
|
||||
ap_config_latest = ap_ssh.get_uc_latest_config()
|
||||
ap_config_latest["uuid"] = 0
|
||||
|
||||
ap_config_active = ap_ssh.get_uc_active_config()
|
||||
ap_config_active["uuid"] = 0
|
||||
x = 1
|
||||
while ap_config_active != ap_config_latest:
|
||||
time.sleep(5)
|
||||
x += 1
|
||||
ap_config_latest = ap_ssh.get_uc_latest_config()
|
||||
ap_config_latest["uuid"] = 0
|
||||
|
||||
ap_config_active = ap_ssh.get_uc_active_config()
|
||||
ap_config_active["uuid"] = 0
|
||||
if x == 19:
|
||||
break
|
||||
allure_body = "AP config status: \n" + \
|
||||
"Active Config: " + str(ap_ssh.get_uc_active_config()) + "\n" \
|
||||
"Latest Config: ", str(
|
||||
ap_ssh.get_uc_latest_config()) + "\n" \
|
||||
"Applied Config: ", str(config)
|
||||
if x < 19:
|
||||
print("AP is Broadcasting Applied Config")
|
||||
allure.attach(name="Config Info", body="AP is Broadcasting Applied Config: " + str(allure_body))
|
||||
else:
|
||||
print("AP is Not Broadcasting Applied Config")
|
||||
allure.attach(name="Config Info", body="AP is Not Broadcasting Applied Config: " + str(allure_body))
|
||||
yield True
|
||||
else:
|
||||
instantiate_profile = instantiate_profile(sdk_client=setup_controller)
|
||||
vlan_id, mode = 0, 0
|
||||
@@ -755,10 +802,10 @@ def get_vif_state(get_apnos, get_configuration, request):
|
||||
"""UCentral Fixtures"""
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def get_uuid(request, setup_controller, get_equipment_id):
|
||||
if request.config.getoption("--ucentral"):
|
||||
UUID = setup_controller.get_device_uuid(serial_number=get_equipment_id[0])
|
||||
yield UUID
|
||||
else:
|
||||
yield False
|
||||
# @pytest.fixture(scope="session")
|
||||
# def get_uuid(request, setup_controller, get_equipment_id):
|
||||
# # if request.config.getoption("--ucentral"):
|
||||
# # UUID = 1 #setup_controller.get_device_uuid(serial_number=get_equipment_id[0])
|
||||
# # yield UUID
|
||||
# # else:
|
||||
# yield False
|
||||
|
||||
@@ -13,7 +13,7 @@ pytestmark = [pytest.mark.client_connectivity, pytest.mark.nat, pytest.mark.gene
|
||||
setup_params_general = {
|
||||
"mode": "NAT",
|
||||
"ssid_modes": {
|
||||
"open": [{"ssid_name": "ssid_open_2g_nat", "appliedRadios": ["is2dot4GHz"], "security_key": "something"},
|
||||
"open": [{"ssid_name": "ssid_shivam", "appliedRadios": ["is2dot4GHz"], "security_key": "something"},
|
||||
{"ssid_name": "ssid_open_5g_nat", "appliedRadios": ["is5GHzU", "is5GHz", "is5GHzL"],
|
||||
"security_key": "something"}],
|
||||
"wpa": [{"ssid_name": "ssid_wpa_2g_nat", "appliedRadios": ["is2dot4GHz"], "security_key": "something"},
|
||||
@@ -53,8 +53,10 @@ class TestNATModeConnectivitySuiteA(object):
|
||||
"""Client Connectivity open ssid 2.4G
|
||||
pytest -m "client_connectivity and nat and general and open and twog"
|
||||
"""
|
||||
global setup_params_general
|
||||
profile_data = setup_params_general["ssid_modes"]["open"][0]
|
||||
ssid_name = profile_data["ssid_name"]
|
||||
print(ssid_name)
|
||||
security_key = "[BLANK]"
|
||||
security = "open"
|
||||
mode = "NAT"
|
||||
@@ -589,62 +591,3 @@ class TestBridgeModeConnectivitySuiteB(object):
|
||||
# passes))
|
||||
# assert passes
|
||||
|
||||
|
||||
setup_params_general = {
|
||||
"mode": "NAT",
|
||||
"ssid_modes": {
|
||||
"open": [{"ssid_name": "ssid_open_2g", "appliedRadios": ["is2dot4GHz"], "security_key": "something"},
|
||||
{"ssid_name": "ssid_open_5g", "appliedRadios": ["is5GHzU", "is5GHz", "is5GHzL"],
|
||||
"security_key": "something"}],
|
||||
"wpa": [{"ssid_name": "ssid_wpa_2g", "appliedRadios": ["is2dot4GHz"], "security_key": "something"},
|
||||
{"ssid_name": "ssid_wpa_5g", "appliedRadios": ["is5GHzU", "is5GHz", "is5GHzL"],
|
||||
"security_key": "something"}],
|
||||
"wpa2_personal": [
|
||||
{"ssid_name": "ssid_wpa2_2g", "appliedRadios": ["is2dot4GHz"], "security_key": "something"},
|
||||
{"ssid_name": "ssid_wpa2_5g", "appliedRadios": ["is5GHzU", "is5GHz", "is5GHzL"],
|
||||
"security_key": "something"}]},
|
||||
"rf": {},
|
||||
"radius": False
|
||||
}
|
||||
|
||||
|
||||
@pytest.mark.suiteX
|
||||
@allure.feature("NAT MODE CLIENT CONNECTIVITY")
|
||||
@pytest.mark.parametrize(
|
||||
'setup_profiles',
|
||||
[setup_params_general],
|
||||
indirect=True,
|
||||
scope="class"
|
||||
)
|
||||
@pytest.mark.usefixtures("setup_profiles")
|
||||
class TestSomthing(object):
|
||||
|
||||
@pytest.mark.open
|
||||
@pytest.mark.twog
|
||||
def test_open_2g(self):
|
||||
assert True
|
||||
|
||||
@pytest.mark.open
|
||||
@pytest.mark.fiveg
|
||||
def test_open_5g(self):
|
||||
assert True
|
||||
|
||||
@pytest.mark.wpa
|
||||
@pytest.mark.twog
|
||||
def test_wpa_2g(self):
|
||||
assert True
|
||||
|
||||
@pytest.mark.wpa
|
||||
@pytest.mark.fiveg
|
||||
def test_wpa_5g(self):
|
||||
assert True
|
||||
|
||||
@pytest.mark.wpa2_personal
|
||||
@pytest.mark.twog
|
||||
def test_wpa2_personal_2g(self):
|
||||
assert True
|
||||
|
||||
@pytest.mark.wpa2_personal
|
||||
@pytest.mark.fiveg
|
||||
def test_wpa2_personal_5g(self):
|
||||
assert True
|
||||
|
||||
Reference in New Issue
Block a user