Added FMS and GW Test cases as a part of sanity

Signed-off-by: shivamcandela <shivam.thakur@candelatech.com>
This commit is contained in:
shivamcandela
2021-09-19 15:15:31 +05:30
parent 4b1653c89f
commit c807e32a96
8 changed files with 256 additions and 208 deletions

View File

@@ -17,7 +17,6 @@ import random
import paramiko
from scp import SCPClient
import os
import allure
class APNOS:
@@ -50,12 +49,10 @@ class APNOS:
cmd = "kill " + str(a).replace("b'", "")
print(cmd)
stdin, stdout, stderr = client.exec_command(cmd)
print(stdout)
client = self.ssh_cli_connect()
cmd = '[ -f ~/cicd-git/ ] && echo "True" || echo "False"'
stdin, stdout, stderr = client.exec_command(cmd)
output = str(stdout.read())
print(output)
if output.__contains__("False"):
cmd = 'mkdir ~/cicd-git/'
stdin, stdout, stderr = client.exec_command(cmd)
@@ -69,6 +66,7 @@ class APNOS:
cmd = '[ -f ~/cicd-git/openwrt_ctl.py ] && echo "True" || echo "False"'
stdin, stdout, stderr = client.exec_command(cmd)
var = str(stdout.read())
client.close()
if var.__contains__("True"):
print("APNOS Serial Setup OK")
else:
@@ -109,6 +107,7 @@ class APNOS:
client.close()
data = str(data).replace(" ", "").split("\\r\\n")
band_info = []
client.close()
for i in data:
tmp = []
if i.__contains__("AccessPoint"):
@@ -323,13 +322,18 @@ class APNOS:
f"cmd --value \"{cmd}\" "
stdin, stdout, stderr = client.exec_command(cmd)
output = stdout.read()
# print(output, stderr.read())
connected = False
if "connected" in output.decode('utf-8').splitlines()[2]:
connected = True
# connected = output.decode('utf-8').splitlines()[2]
latest = output.decode('utf-8').splitlines()[3].split(":")[1].replace(" ", "").replace(",", "")
active = output.decode('utf-8').splitlines()[4].split(":")[1].replace(" ", "").replace(",", "")
print(output)
if 'latest' not in str(output):
print("ubus call ucentral status: command has invalid output", str(output))
connected, latest, active = "Error", "Error1", "Error2"
return connected, latest, active
else:
connected = False
if "connected" in output.decode('utf-8').splitlines()[2]:
connected = True
# connected = output.decode('utf-8').splitlines()[2]
latest = output.decode('utf-8').splitlines()[3].split(":")[1].replace(" ", "").replace(",", "")
active = output.decode('utf-8').splitlines()[4].split(":")[1].replace(" ", "").replace(",", "")
client.close()
except Exception as e:
print(e)
@@ -467,6 +471,7 @@ class APNOS:
stdin, stdout, stderr = client.exec_command(cmd)
output = stdout.read().replace(b":~# iwinfo", b"").decode('utf-8')
o = output
client.close()
return o
def gettxpower(self):
@@ -489,6 +494,7 @@ class APNOS:
name = output.replace("\t", "").splitlines()
name.remove('')
name.pop(-1)
client.close()
return tx_power, name
def get_logread(self, start_ref="", stop_ref=""):
@@ -574,20 +580,21 @@ class APNOS:
if __name__ == '__main__':
obj = {
'model': 'wf188n',
'mode': 'wifi6',
'serial': '0000c1018812',
'jumphost': True,
'ip': "10.28.3.103",
'username': "lanforge",
'password': "pumpkin77",
'port': 22,
'jumphost_tty': '/dev/ttyAP1',
'version': "https://tip.jfrog.io/artifactory/tip-wlan-ap-firmware/uCentral/cig_wf188/20210729-cig_wf188-v2.0.0-rc2-ec3662e-upgrade.bin"
}
'model': 'ecw5211',
'mode': 'wifi5',
'serial': '68215fda456d',
'jumphost': True,
'ip': "localhost",
'username': "lanforge",
'password': "pumpkin77",
'port': 8733,
'jumphost_tty': "/dev/ttyAP5",
'version': "release-latest"
}
var = APNOS(credentials=obj, sdk="2.x")
a = var.run_generic_command(cmd="wifi status")
print("".join(a))
a, b, c = var.get_ucentral_status()
print(a, b, c)
# S = 9
# instance_name = ''.join(random.choices(string.ascii_uppercase + string.digits, k=S))
# var.run_generic_command(cmd="logger start testcase: " + instance_name)

View File

@@ -233,28 +233,30 @@ class FMSUtils:
return {}
# 2c3becf
def get_firmwares(self, limit="", model="", latestonly="", branch="", commit_id=""):
def get_firmwares(self, limit="10000", model="", latestonly="", branch="", commit_id="", offset="3000"):
deviceType = self.ap_model_lookup(model=model)
params = "limit=" + limit + "&deviceType=" + deviceType + "&latestonly=" + latestonly
response = self.sdk_client.request(service="fms", command="firmwares/", method="GET", params=params, payload="")
params = "limit=" + limit + \
"&deviceType=" + deviceType + \
"&latestonly=" + latestonly + \
"offset=" + offset
command = "firmwares/"
response = self.sdk_client.request(service="fms", command=command, method="GET", params=params, payload="")
allure.attach(name=command + params,
body=str(response.status_code) + "\n" + str(response.json()),
attachment_type=allure.attachment_type.JSON)
if response.status_code == 200:
data = response.json()
newlist = sorted(data['firmwares'], key=itemgetter('created'))
print("finding a bug", len(newlist))
for i in newlist:
print(i['uri'])
print(i['revision'])
# for i in newlist:
# print(i['uri'])
# print(i['revision'])
# print(newlist)
self.sdk_client.logout()
pytest.exit("hey")
return newlist
# print(data)
return "devices"
return "error"
class UProfileUtility:
@@ -540,7 +542,8 @@ if __name__ == '__main__':
}
obj = Controller(controller_data=controller)
fms = FMSUtils(sdk_client=obj)
fms.get_firmwares()
new = fms.get_firmwares(model='cig_wf194c', offset='3')
print(len(new))
# fms.get_device_set()
# model = fms.get_latest_fw(model="eap102")
# print(model)

View File

@@ -83,7 +83,7 @@ def pytest_addoption(parser):
parser.addini("influx_token", "Influx Token", default="TCkdATXAbHmNbn4QyNaj43WpGBYxFrzV")
parser.addini("influx_bucket", "influx bucket", default="tip-cicd")
parser.addini("influx_org", "influx organization", default="tip")
parser.addini("build", "AP Firmware build URL", default="0")
parser.addini(name="firmware", type='string', help="AP Firmware build URL", default="0")
parser.addini("cloud_ctlr", "AP Firmware build URL", default="0")
parser.addini("num_stations", "Number of Stations/Clients for testing")
@@ -224,6 +224,11 @@ def get_configuration(testbed, request):
"""yields the selected testbed information from lab info file (configuration.py)"""
if request.config.getini("cloud_ctlr") != "0":
CONFIGURATION[testbed]["controller"]["url"] = request.config.getini("cloud_ctlr")
if request.config.getini("firmware") != "0":
version = request.config.getini("firmware")
version_list = version.split(",")
for i in range(len(CONFIGURATION[testbed]["access_point"])):
CONFIGURATION[testbed]["access_point"][i]["version"] = version_list[i]
yield CONFIGURATION[testbed]
@@ -258,8 +263,9 @@ def setup_controller(request, get_configuration, test_access_point, add_env_prop
@pytest.fixture(scope="session")
def setup_firmware(fixtures_ver):
def setup_firmware(setup_controller):
""" Fixture to Setup Firmware with the selected sdk """
setup_controller.instantiate_firmware()
yield True
@@ -480,53 +486,23 @@ def get_markers(request, get_security_flags):
yield security_dict
# Will be availabe as a test case
@pytest.fixture(scope="session")
def test_access_point(request, testbed, get_apnos, get_configuration):
def test_access_point(fixtures_ver, request, get_configuration, get_apnos):
"""used to check the manager status of AP, should be used as a setup to verify if ap can reach cloud"""
mgr_status = []
if request.config.getoption("1.x"):
for access_point_info in get_configuration['access_point']:
ap_ssh = get_apnos(access_point_info, sdk="1.x")
status = ap_ssh.get_manager_state()
if "ACTIVE" not in status:
time.sleep(30)
ap_ssh = APNOS(access_point_info)
status = ap_ssh.get_manager_state()
mgr_status.append(status)
else:
# forgit access_point_info in get_configuration['access_point']:
# ap_ssh = get_apnos(access_point_info)
# status = ap_ssh.get_manager_state()
# if "ACTIVE" not in status:
# time.sleep(30)
# ap_ssh = APNOS(access_point_info)
# status = ap_ssh.get_manager_state()
# mgr_status.append(status)
pass
yield mgr_status
status = fixtures_ver.get_ap_cloud_connectivity_status(get_configuration, get_apnos)
def teardown_session():
data = []
data.append(False)
for s in status:
data.append(s[0])
print(data)
if False not in data:
pytest.exit("AP is Not connected to ucentral gw")
allure.attach(name=str(status), body="")
# Not used anymore, needs to depreciate it
@pytest.fixture(scope="session")
def get_lanforge_data(get_configuration):
"""depreciate it"""
lanforge_data = {}
if get_configuration['traffic_generator']['name'] == 'lanforge':
lanforge_data = {
"lanforge_ip": get_configuration['traffic_generator']['details']['ip'],
"lanforge-port-number": get_configuration['traffic_generator']['details']['port'],
"lanforge_2dot4g": get_configuration['traffic_generator']['details']['2.4G-Radio'][0],
"lanforge_5g": get_configuration['traffic_generator']['details']['5G-Radio'][0],
"lanforge_2dot4g_prefix": get_configuration['traffic_generator']['details']['2.4G-Station-Name'],
"lanforge_5g_prefix": get_configuration['traffic_generator']['details']['5G-Station-Name'],
"lanforge_2dot4g_station": get_configuration['traffic_generator']['details']['2.4G-Station-Name'],
"lanforge_5g_station": get_configuration['traffic_generator']['details']['5G-Station-Name'],
"lanforge_bridge_port": get_configuration['traffic_generator']['details']['upstream'],
"lanforge_vlan_port": get_configuration['traffic_generator']['details']['upstream'] + ".100",
"vlan": 100
}
yield lanforge_data
request.addfinalizer(teardown_session)
yield status
@pytest.fixture(scope="session")
@@ -575,14 +551,6 @@ def lf_tools(get_configuration, testbed):
yield obj
@pytest.fixture(scope="session")
def lf_tools(get_configuration, testbed):
""" Create a DUT on LANforge"""
obj = ChamberView(lanforge_data=get_configuration["traffic_generator"]["details"],
testbed=testbed, access_point_data=get_configuration["access_point"])
yield obj
@pytest.fixture(scope="session")
def setup_influx(request, testbed, get_configuration):
""" Setup Influx Parameters: Used in CV Automation"""
@@ -645,8 +613,8 @@ def fixtures_ver(request, get_configuration):
@pytest.fixture(scope="session")
def firmware_upgrade(fixtures_ver, get_apnos, get_configuration):
fixtures_ver.setup_firmware(get_apnos, get_configuration)
yield True
upgrade_status = fixtures_ver.setup_firmware(get_apnos, get_configuration)
yield upgrade_status
"""

View File

@@ -7,11 +7,12 @@ import pytest
import json
import allure
@pytest.mark.uc_sanityw
@allure.feature("SDK REST API")
class TestUcentralGatewayService(object):
"""
"""
@pytest.mark.sdk_restapi
def test_gwservice_listdevices(self, setup_controller):
"""
@@ -24,7 +25,7 @@ class TestUcentralGatewayService(object):
if resp.status_code != 200:
assert False
devices = json.loads(resp.text)
print (devices)
print(devices)
@pytest.mark.sdk_restapi
def test_gwservice_createdevice(self, setup_controller):
@@ -48,7 +49,7 @@ class TestUcentralGatewayService(object):
if resp.status_code != 200:
assert False
devices = json.loads(resp.text)
print (devices)
print(devices)
resp = setup_controller.request("gw", "device/DEADBEEF0011", "GET", None, None)
body = resp.url + "," + str(resp.status_code) + ',' + resp.text
@@ -83,7 +84,7 @@ class TestUcentralGatewayService(object):
if resp.status_code != 200:
assert False
devices = json.loads(resp.text)
print (devices)
print(devices)
payload = {'serialNumber': 'DEADBEEF0011',
'owner': 'pytest'}
@@ -100,8 +101,7 @@ class TestUcentralGatewayService(object):
assert False
device = json.loads(resp.text)
print (device)
print(device)
resp = setup_controller.request("gw", "device/DEADBEEF0011", "DELETE", None, None)
body = resp.url + "," + str(resp.status_code) + ',' + resp.text
@@ -130,12 +130,10 @@ class TestUcentralGatewayService(object):
if resp.status_code != 200:
assert False
devices = json.loads(resp.text)
print (devices)
print(devices)
resp = setup_controller.request("gw", "device/DEADBEEF0011", "DELETE", None, None)
body = resp.url + "," + str(resp.status_code) + ',' + resp.text
allure.attach(name="gw get device", body=body)
if resp.status_code != 200:
assert False

View File

@@ -60,6 +60,18 @@ class Fixtures_1x:
def setup_firmware(self):
pass
def get_ap_cloud_connectivity_status(self, get_configuration, get_apnos):
mgr_status = []
for access_point_info in get_configuration['access_point']:
ap_ssh = get_apnos(access_point_info, sdk="1.x")
status = ap_ssh.get_manager_state()
if "ACTIVE" not in status:
time.sleep(30)
ap_ssh = APNOS(access_point_info)
status = ap_ssh.get_manager_state()
mgr_status.append(status)
return mgr_status
def get_ap_version(self, get_apnos, get_configuration):
# version_list = []
# for access_point_info in get_configuration['access_point']:

View File

@@ -60,8 +60,10 @@ class Fixtures_2x:
def setup_firmware(self, get_apnos, get_configuration, request=""):
# Query AP Firmware
upgrade_status = []
for ap in get_configuration['access_point']:
ap_ssh = get_apnos(ap, pwd="../libs/apnos/", sdk="2.x")
# If specified as URL
try:
response = requests.get(ap['version'])
@@ -78,24 +80,19 @@ class Fixtures_2x:
items = list(range(0, 300))
l = len(items)
self.printProgressBar(0, l, prefix='Progress:', suffix='Complete', length=50)
for i, item in enumerate(items):
# Do stuff...
time.sleep(0.8)
# Update Progress Bar
self.printProgressBar(i + 1, l, prefix='Progress:', suffix='Complete', length=50)
ap_version = ap_ssh.get_ap_version_ucentral()
current_version_commit = str(ap_version).split("/")[1].replace(" ", "").splitlines()[0]
if target_revision_commit in current_version_commit:
upgrade_status.append([ap['serial'], target_revision_commit, current_version_commit])
print("Firmware Upgraded to :", ap_version)
else:
print("firmware upgraded failed: ", target_revision)
upgrade_status.append([ap['serial'],target_revision_commit, current_version_commit])
break
except Exception as e:
print("URL does not exist on Internet")
# else Specified as "branch-commit_id" / "branch-latest"
firmware_url = ""
ap_ssh = get_apnos(ap, pwd="../libs/apnos/", sdk="2.x")
ap_version = ap_ssh.get_ap_version_ucentral()
response = self.fw_client.get_latest_fw(model=ap["model"])
# if the target version specified is "branch-latest"
@@ -122,20 +119,16 @@ class Fixtures_2x:
# if AP is already in target Version then skip upgrade unless force upgrade is specified
if current_version == target_revision:
upgrade_status.append([ap['serial'], target_revision, current_version, 'skip'])
print("Skipping Upgrade! AP is already in target version")
allure.attach(name="Skipping Upgrade because AP is already in the target Version", body="")
allure.attach(name="Skipping Upgrade because AP is already in the target Version",
body="")
break
self.fw_client.upgrade_firmware(serial=ap['serial'], url=str(firmware['uri']))
# wait for 300 seconds after firmware upgrade
items = list(range(0, 300))
l = len(items)
# self.printProgressBar(0, l, prefix='Progress:', suffix='Complete', length=50)
# for i, item in enumerate(items):
# # Do stuff...
# time.sleep(0.8)
# # Update Progress Bar
# self.printProgressBar(i + 1, l, prefix='Progress:', suffix='Complete', length=50)
print("waiting for 300 Sec for Firmware Upgrade")
time.sleep(300)
# check the current AP Revision again
ap_version = ap_ssh.get_ap_version_ucentral()
@@ -145,8 +138,10 @@ class Fixtures_2x:
body="current revision: " + current_version + "\ntarget revision: " + target_revision)
print("current revision: ", current_version, "\ntarget revision: ", target_revision)
if current_version == target_revision:
upgrade_status.append([ap['serial'], target_revision, current_version])
print("firmware upgraded successfully: ", target_revision)
else:
upgrade_status.append([ap['serial'], target_revision, current_version])
print("firmware upgraded failed: ", target_revision)
break
if firmware['image'].split("-")[-2] == ap['version'].split('-')[0]:
@@ -166,20 +161,15 @@ class Fixtures_2x:
# if AP is already in target Version then skip upgrade unless force upgrade is specified
if current_version == target_revision:
upgrade_status.append([ap['serial'], target_revision, current_version, 'skip'])
print("Skipping Upgrade! AP is already in target version")
allure.attach(name="Skipping Upgrade because AP is already in the target Version", body="")
break
self.fw_client.upgrade_firmware(serial=ap['serial'], url=str(firmware['uri']))
# wait for 300 seconds after firmware upgrade
items = list(range(0, 300))
l = len(items)
self.printProgressBar(0, l, prefix='Progress:', suffix='Complete', length=50)
for i, item in enumerate(items):
# Do stuff...
time.sleep(0.8)
# Update Progress Bar
self.printProgressBar(i + 1, l, prefix='Progress:', suffix='Complete', length=50)
print("waiting for 300 Sec for Firmware Upgrade")
time.sleep(300)
# check the current AP Revision again
ap_version = ap_ssh.get_ap_version_ucentral()
@@ -189,19 +179,20 @@ class Fixtures_2x:
body="current revision: " + current_version + "\ntarget revision: " + target_revision)
print("current revision: ", current_version, "\ntarget revision: ", target_revision)
if current_version == target_revision:
upgrade_status.append([ap['serial'], target_revision, current_version])
print("firmware upgraded successfully: ", target_revision)
else:
upgrade_status.append([ap['serial'], target_revision, current_version])
print("firmware upgraded failed: ", target_revision)
break
# if branch-commit is specified
else:
firmware_list = self.fw_client.get_firmwares(model=ap['model'], branch="", commit_id='')
fw_list = []
# getting the list of firmwares in fw_list that has the commit id specified as an input
for firmware in firmware_list:
if firmware['revision'].split("/")[1].replace(" ", "").split('-')[-1] == ap['version'].split('-')[1]:
if firmware['revision'].split("/")[1].replace(" ", "").split('-')[-1] == ap['version'].split('-')[
1]:
fw_list.append(firmware)
# If there is only 1 commit ID in fw_list
@@ -224,6 +215,7 @@ class Fixtures_2x:
# if AP is already in target Version then skip upgrade unless force upgrade is specified
if current_version == target_revision:
upgrade_status.append([ap['serial'], target_revision, current_version, 'skip'])
print("Skipping Upgrade! AP is already in target version")
allure.attach(name="Skipping Upgrade because AP is already in the target Version", body="")
break
@@ -233,14 +225,8 @@ class Fixtures_2x:
self.fw_client.upgrade_firmware(serial=ap['serial'], url=str(url))
# wait for 300 seconds after firmware upgrade
items = list(range(0, 300))
l = len(items)
self.printProgressBar(0, l, prefix='Progress:', suffix='Complete', length=50)
for i, item in enumerate(items):
# Do stuff...
time.sleep(0.8)
# Update Progress Bar
self.printProgressBar(i + 1, l, prefix='Progress:', suffix='Complete', length=50)
print("waiting for 300 Sec for Firmware Upgrade")
time.sleep(300)
# check the current AP Revision again
ap_version = ap_ssh.get_ap_version_ucentral()
@@ -250,8 +236,10 @@ class Fixtures_2x:
body="current revision: " + current_version + "\ntarget revision: " + target_revision)
print("current revision: ", current_version, "\ntarget revision: ", target_revision)
if current_version == target_revision:
upgrade_status.append([ap['serial'], target_revision, current_version])
print("firmware upgraded successfully: ", target_revision)
else:
upgrade_status.append([ap['serial'], target_revision, current_version])
print("firmware upgraded failed: ", target_revision)
break
@@ -283,6 +271,7 @@ class Fixtures_2x:
# if AP is already in target Version then skip upgrade unless force upgrade is specified
if current_version == target_revision:
upgrade_status.append([ap['serial'], target_revision, current_version, 'skip'])
print("Skipping Upgrade! AP is already in target version")
allure.attach(name="Skipping Upgrade because AP is already in the target Version", body="")
break
@@ -290,15 +279,8 @@ class Fixtures_2x:
self.fw_client.upgrade_firmware(serial=ap['serial'], url=str(firmware['uri']))
# wait for 300 seconds after firmware upgrade
# Initial call to print 0% progress
items = list(range(0, 300))
l = len(items)
self.printProgressBar(0, l, prefix='Progress:', suffix='Complete', length=50)
for i, item in enumerate(items):
# Do stuff...
time.sleep(0.8)
# Update Progress Bar
self.printProgressBar(i + 1, l, prefix='Progress:', suffix='Complete', length=50)
print("waiting for 300 Sec for Firmware Upgrade")
time.sleep(300)
# check the current AP Revision again
ap_version = ap_ssh.get_ap_version_ucentral()
@@ -308,13 +290,25 @@ class Fixtures_2x:
body="current revision: " + current_version + "\ntarget revision: " + target_revision)
print("current revision: ", current_version, "\ntarget revision: ", target_revision)
if current_version == target_revision:
upgrade_status.append([target_revision, current_version])
print("firmware upgraded successfully: ", target_revision)
else:
upgrade_status.append([target_revision, current_version])
print("firmware upgraded failed: ", target_revision)
break
return upgrade_status
def get_ap_cloud_connectivity_status(self, get_configuration, get_apnos):
status_data = []
self.ubus_connection = []
for access_point_info in get_configuration['access_point']:
ap_ssh = get_apnos(access_point_info, sdk="2.x")
status = ap_ssh.get_ucentral_status()
print(status)
status_data.append(status)
connectivity_data = ap_ssh.run_generic_command(cmd="ubus call ucentral status")
self.ubus_connection.append(['Serial Number: ' + access_point_info['serial'], connectivity_data])
return status_data
def get_ap_version(self, get_apnos, get_configuration):
version_list = []
@@ -579,7 +573,7 @@ class Fixtures_2x:
pass
ap_ssh.run_generic_command(cmd="logger stop testcase: " + instance_name)
ap_logs = ap_ssh.get_logread(start_ref="start testcase: " + instance_name,
stop_ref="stop testcase: " + instance_name)
stop_ref="stop testcase: " + instance_name)
allure.attach(body=ap_logs, name="AP Log: ")
try:
@@ -620,25 +614,3 @@ class Fixtures_2x:
request.addfinalizer(teardown_session)
return test_cases
# Print iterations progress
def printProgressBar(self, iteration, total, prefix='', suffix='', decimals=1, length=100, fill='', printEnd="\r"):
"""
Call in a loop to create terminal progress bar
@params:
iteration - Required : current iteration (Int)
total - Required : total iterations (Int)
prefix - Optional : prefix string (Str)
suffix - Optional : suffix string (Str)
decimals - Optional : positive number of decimals in percent complete (Int)
length - Optional : character length of bar (Int)
fill - Optional : bar fill character (Str)
printEnd - Optional : end character (e.g. "\r", "\r\n") (Str)
"""
percent = ("{0:." + str(decimals) + "f}").format(100 * (iteration / float(total)))
filledLength = int(length * iteration // total)
bar = fill * filledLength + '-' * (length - filledLength)
print(f'\r{prefix} |{bar}| {percent}% {suffix}', end=printEnd)
# Print New Line on Complete
if iteration == total:
print()

View File

@@ -11,7 +11,7 @@ num_stations=1
sdk-customer-id=2
#fIRMWARE Option
build=0
firmware=0
# Influx Params
influx_host=influx.cicd.lab.wlan.tip.build

View File

@@ -6,16 +6,10 @@ import allure
import pytest
import requests
pytestmark = [pytest.mark.test_resources, pytest.mark.sanity,
pytestmark = [pytest.mark.test_resources, pytest.mark.sanity, pytest.mark.uc_sanity,
pytest.mark.sanity_55]
@pytest.mark.fw
def test_firmware(firmware_upgrade):
assert True
@allure.testcase(name="Test Resources", url="")
class TestResources(object):
"""Test Case Class: Test cases to cover resource Connectivity"""
@@ -28,39 +22,133 @@ class TestResources(object):
login_response_json = setup_controller.login_resp.json()
response_code = setup_controller.login_resp.status_code
allure.attach(name="Login Response Code", body=str(response_code))
allure.attach(name="Login Response JSON", body=str(login_response_json))
# if setup_controller.bearer:
# allure.attach(name="Controller Connectivity Success", body="")
# else:
# allure.attach(name="Controller Connectivity Failed", body="")
# pytest.exit("Controller Not Available")
# assert setup_controller.bearer
allure.attach(name="Login Response JSON",
body=str(login_response_json),
attachment_type=allure.attachment_type.JSON)
assert response_code == 200
@pytest.mark.test_access_points_connectivity
@allure.testcase(name="test_access_points_connectivity", url="")
def test_access_points_connectivity(self, test_access_point):
def test_access_points_connectivity(self, test_access_point, fixtures_ver):
"""Test case to verify Access Points Connectivity"""
flag = True
for i in test_access_point:
if "ACTIVE" not in i:
flag = False
if flag is False:
allure.attach(name="Access Point Connectivity Success", body=str(test_access_point))
pytest.exit("Access Point Manager state is not Active")
else:
allure.attach(name="Access Point Connectivity Failed", body=str(test_access_point))
assert flag
data = []
for status in test_access_point:
data.append(status[0])
allure.attach(name="AP - Cloud connectivity info", body=str(fixtures_ver.ubus_connection))
assert False not in data
@pytest.mark.traffic_generator_connectivity
@allure.testcase(name="test_traffic_generator_connectivity", url="")
def test_traffic_generator_connectivity(self, traffic_generator_connectivity, update_report, test_cases):
def test_traffic_generator_connectivity(self, traffic_generator_connectivity):
"""Test case to verify Traffic Generator Connectivity"""
if traffic_generator_connectivity == "5.4.4":
allure.attach(name="LANforge-", body=str(traffic_generator_connectivity))
else:
pytest.exit("LANforgeGUI-5.4.3 is not available")
allure.attach(name="LANforge version", body=str(traffic_generator_connectivity))
assert traffic_generator_connectivity
@allure.testcase(name="Firmware Management", url="")
@pytest.mark.uc_firmware
class TestFMS(object):
@pytest.mark.get_firmware_list
def test_fms_version_list(self, fixtures_ver, get_configuration):
PASS = []
for ap in get_configuration['access_point']:
# get the latest branch
firmware_list = fixtures_ver.fw_client.get_firmwares(model=ap['model'],
branch="",
commit_id='',
limit='10000',
offset='3000')
firmware_list.reverse()
release_list_data = []
for i in firmware_list:
release_list_data.append(str(i['release']))
allure.attach(name="firmware_list", body=str("\n".join(release_list_data)),
attachment_type=allure.attachment_type.JSON)
try:
response = requests.get(ap['version'])
print("URL is valid and exists on the internet")
allure.attach(name="firmware url: ", body=str(ap['version']))
target_revision_commit = ap['version'].split("-")[-2]
target_revision_branch = ap['version'].split("-")[-3]
flag = True
for i in release_list_data:
if target_revision_commit == i.split('-')[-1] and target_revision_branch == i.split('-')[-2]:
print('target firmware : ' + ap['version'] + " is available in FMS : " + i)
allure.attach(name='target firmware : ' + ap['version'] + " is available in FMS : " + i,
body="")
PASS.append(True)
flag = False
if flag:
print('target firmware : ' + ap['version'] + " is not available in FMS : ")
allure.attach(name='target firmware : ' + ap['version'] + " is not available in FMS : ",
body="")
PASS.append(False)
break
except Exception as e:
pass
if ap['version'].split('-')[1] == "latest":
for firmware in firmware_list:
if ap['version'].split('-')[0] == 'release':
version = firmware['revision'].split("/")[1].replace(" ", "").split('-')[1]
if firmware['revision'].split("/")[1].replace(" ", "").split('-')[1].__contains__('v2.'):
print("Target Firmware: \n", firmware)
allure.attach(name="Target firmware : ", body=str(firmware['release']))
break
if firmware['release'].split("-")[-2] == ap['version'].split('-')[0]:
print("Target Firmware: \n", firmware)
allure.attach(name="Target firmware : ", body=str(firmware['release']))
break
else:
flag = True
for firmware in firmware_list:
if ap['version'].split('-')[0] == 'release':
branch = firmware['revision'].split("/")[1].replace(" ", "").split('-')[1]
commit = ap['version'].split('-')[1]
if branch.__contains__('v2.') and commit == firmware['release'].split('-')[-1]:
print("Target Firmware: \n", firmware)
allure.attach(name="Target firmware : ", body=str(firmware['release']))
PASS.append(True)
flag = False
break
if ap['version'].split('-')[1] == firmware['release'].split('-')[-1] and ap['version'].split('-')[
0] == \
firmware['release'].split('-')[-2]:
print('target firmware : ' + ap['version'] + " is available in FMS : " + firmware['release'])
allure.attach(
name='target firmware : ' + ap['version'] + " is available in FMS : " + firmware['release']
, body="")
PASS.append(True)
flag = False
if flag:
print('target firmware : ' + ap['version'] + " is not available in FMS : ")
allure.attach(name='target firmware : ' + ap['version'] + " is not available in FMS : ",
body="")
PASS.append(False)
assert False not in PASS
@pytest.mark.firmware_upgrade
def test_firmware_upgrade_request(self, firmware_upgrade):
assert True
@pytest.mark.test_firmware_ap
def test_firmware_upgrade_status_AP(self, firmware_upgrade):
allure.attach(name="firmware Upgrade Status:", body="")
assert True
@pytest.mark.test_firmware_gw
def test_firmware_upgrade_status_gateway(self, get_apnos, get_configuration, setup_controller):
status = []
for ap in get_configuration['access_point']:
ap_ssh = get_apnos(ap, pwd="../libs/apnos/", sdk="2.x")
ap_version = ap_ssh.get_ap_version_ucentral()
current_version_ap = str(ap_version).split()
data = setup_controller.get_device_by_serial_number(serial_number=ap['serial'])
allure.attach(name=str(data['firmware']) + str(current_version_ap), body="")
status.append(current_version_ap == data['firmware'].split())
assert False not in status