From c807e32a96a8d15fcceb139ea2f240eea2cb7d12 Mon Sep 17 00:00:00 2001 From: shivamcandela Date: Sun, 19 Sep 2021 15:15:31 +0530 Subject: [PATCH] Added FMS and GW Test cases as a part of sanity Signed-off-by: shivamcandela --- libs/apnos/apnos.py | 55 ++++--- libs/controller/controller_2x/controller.py | 29 ++-- tests/conftest.py | 78 +++------ .../ucentral_gateway/test_gatewayservice.py | 16 +- tests/fixtures_1x.py | 12 ++ tests/fixtures_2x.py | 118 +++++--------- tests/pytest.ini | 2 +- tests/test_connectivity.py | 154 ++++++++++++++---- 8 files changed, 256 insertions(+), 208 deletions(-) diff --git a/libs/apnos/apnos.py b/libs/apnos/apnos.py index 6b401553f..d015c0c3f 100644 --- a/libs/apnos/apnos.py +++ b/libs/apnos/apnos.py @@ -17,7 +17,6 @@ import random import paramiko from scp import SCPClient import os -import allure class APNOS: @@ -50,12 +49,10 @@ class APNOS: cmd = "kill " + str(a).replace("b'", "") print(cmd) stdin, stdout, stderr = client.exec_command(cmd) - print(stdout) - client = self.ssh_cli_connect() cmd = '[ -f ~/cicd-git/ ] && echo "True" || echo "False"' stdin, stdout, stderr = client.exec_command(cmd) output = str(stdout.read()) - print(output) + if output.__contains__("False"): cmd = 'mkdir ~/cicd-git/' stdin, stdout, stderr = client.exec_command(cmd) @@ -69,6 +66,7 @@ class APNOS: cmd = '[ -f ~/cicd-git/openwrt_ctl.py ] && echo "True" || echo "False"' stdin, stdout, stderr = client.exec_command(cmd) var = str(stdout.read()) + client.close() if var.__contains__("True"): print("APNOS Serial Setup OK") else: @@ -109,6 +107,7 @@ class APNOS: client.close() data = str(data).replace(" ", "").split("\\r\\n") band_info = [] + client.close() for i in data: tmp = [] if i.__contains__("AccessPoint"): @@ -323,13 +322,18 @@ class APNOS: f"cmd --value \"{cmd}\" " stdin, stdout, stderr = client.exec_command(cmd) output = stdout.read() - # print(output, stderr.read()) - connected = False - if "connected" in output.decode('utf-8').splitlines()[2]: - connected = True - # connected = output.decode('utf-8').splitlines()[2] - latest = output.decode('utf-8').splitlines()[3].split(":")[1].replace(" ", "").replace(",", "") - active = output.decode('utf-8').splitlines()[4].split(":")[1].replace(" ", "").replace(",", "") + print(output) + if 'latest' not in str(output): + print("ubus call ucentral status: command has invalid output", str(output)) + connected, latest, active = "Error", "Error1", "Error2" + return connected, latest, active + else: + connected = False + if "connected" in output.decode('utf-8').splitlines()[2]: + connected = True + # connected = output.decode('utf-8').splitlines()[2] + latest = output.decode('utf-8').splitlines()[3].split(":")[1].replace(" ", "").replace(",", "") + active = output.decode('utf-8').splitlines()[4].split(":")[1].replace(" ", "").replace(",", "") client.close() except Exception as e: print(e) @@ -467,6 +471,7 @@ class APNOS: stdin, stdout, stderr = client.exec_command(cmd) output = stdout.read().replace(b":~# iwinfo", b"").decode('utf-8') o = output + client.close() return o def gettxpower(self): @@ -489,6 +494,7 @@ class APNOS: name = output.replace("\t", "").splitlines() name.remove('') name.pop(-1) + client.close() return tx_power, name def get_logread(self, start_ref="", stop_ref=""): @@ -574,20 +580,21 @@ class APNOS: if __name__ == '__main__': obj = { - 'model': 'wf188n', - 'mode': 'wifi6', - 'serial': '0000c1018812', - 'jumphost': True, - 'ip': "10.28.3.103", - 'username': "lanforge", - 'password': "pumpkin77", - 'port': 22, - 'jumphost_tty': '/dev/ttyAP1', - 'version': "https://tip.jfrog.io/artifactory/tip-wlan-ap-firmware/uCentral/cig_wf188/20210729-cig_wf188-v2.0.0-rc2-ec3662e-upgrade.bin" - } + 'model': 'ecw5211', + 'mode': 'wifi5', + 'serial': '68215fda456d', + 'jumphost': True, + 'ip': "localhost", + 'username': "lanforge", + 'password': "pumpkin77", + 'port': 8733, + 'jumphost_tty': "/dev/ttyAP5", + 'version': "release-latest" + } var = APNOS(credentials=obj, sdk="2.x") - a = var.run_generic_command(cmd="wifi status") - print("".join(a)) + a, b, c = var.get_ucentral_status() + print(a, b, c) + # S = 9 # instance_name = ''.join(random.choices(string.ascii_uppercase + string.digits, k=S)) # var.run_generic_command(cmd="logger start testcase: " + instance_name) diff --git a/libs/controller/controller_2x/controller.py b/libs/controller/controller_2x/controller.py index c1c7fafc6..3da043992 100644 --- a/libs/controller/controller_2x/controller.py +++ b/libs/controller/controller_2x/controller.py @@ -233,28 +233,30 @@ class FMSUtils: return {} # 2c3becf - def get_firmwares(self, limit="", model="", latestonly="", branch="", commit_id=""): + def get_firmwares(self, limit="10000", model="", latestonly="", branch="", commit_id="", offset="3000"): deviceType = self.ap_model_lookup(model=model) - params = "limit=" + limit + "&deviceType=" + deviceType + "&latestonly=" + latestonly - - response = self.sdk_client.request(service="fms", command="firmwares/", method="GET", params=params, payload="") - + params = "limit=" + limit + \ + "&deviceType=" + deviceType + \ + "&latestonly=" + latestonly + \ + "offset=" + offset + command = "firmwares/" + response = self.sdk_client.request(service="fms", command=command, method="GET", params=params, payload="") + allure.attach(name=command + params, + body=str(response.status_code) + "\n" + str(response.json()), + attachment_type=allure.attachment_type.JSON) if response.status_code == 200: data = response.json() newlist = sorted(data['firmwares'], key=itemgetter('created')) - print("finding a bug", len(newlist)) - for i in newlist: - print(i['uri']) - print(i['revision']) + # for i in newlist: + # print(i['uri']) + # print(i['revision']) # print(newlist) - self.sdk_client.logout() - pytest.exit("hey") return newlist # print(data) - return "devices" + return "error" class UProfileUtility: @@ -540,7 +542,8 @@ if __name__ == '__main__': } obj = Controller(controller_data=controller) fms = FMSUtils(sdk_client=obj) - fms.get_firmwares() + new = fms.get_firmwares(model='cig_wf194c', offset='3') + print(len(new)) # fms.get_device_set() # model = fms.get_latest_fw(model="eap102") # print(model) diff --git a/tests/conftest.py b/tests/conftest.py index 3a13fc02a..d71821aff 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -83,7 +83,7 @@ def pytest_addoption(parser): parser.addini("influx_token", "Influx Token", default="TCkdATXAbHmNbn4QyNaj43WpGBYxFrzV") parser.addini("influx_bucket", "influx bucket", default="tip-cicd") parser.addini("influx_org", "influx organization", default="tip") - parser.addini("build", "AP Firmware build URL", default="0") + parser.addini(name="firmware", type='string', help="AP Firmware build URL", default="0") parser.addini("cloud_ctlr", "AP Firmware build URL", default="0") parser.addini("num_stations", "Number of Stations/Clients for testing") @@ -224,6 +224,11 @@ def get_configuration(testbed, request): """yields the selected testbed information from lab info file (configuration.py)""" if request.config.getini("cloud_ctlr") != "0": CONFIGURATION[testbed]["controller"]["url"] = request.config.getini("cloud_ctlr") + if request.config.getini("firmware") != "0": + version = request.config.getini("firmware") + version_list = version.split(",") + for i in range(len(CONFIGURATION[testbed]["access_point"])): + CONFIGURATION[testbed]["access_point"][i]["version"] = version_list[i] yield CONFIGURATION[testbed] @@ -258,8 +263,9 @@ def setup_controller(request, get_configuration, test_access_point, add_env_prop @pytest.fixture(scope="session") -def setup_firmware(fixtures_ver): +def setup_firmware(setup_controller): """ Fixture to Setup Firmware with the selected sdk """ + setup_controller.instantiate_firmware() yield True @@ -480,53 +486,23 @@ def get_markers(request, get_security_flags): yield security_dict -# Will be availabe as a test case @pytest.fixture(scope="session") -def test_access_point(request, testbed, get_apnos, get_configuration): +def test_access_point(fixtures_ver, request, get_configuration, get_apnos): """used to check the manager status of AP, should be used as a setup to verify if ap can reach cloud""" - mgr_status = [] - if request.config.getoption("1.x"): - for access_point_info in get_configuration['access_point']: - ap_ssh = get_apnos(access_point_info, sdk="1.x") - status = ap_ssh.get_manager_state() - if "ACTIVE" not in status: - time.sleep(30) - ap_ssh = APNOS(access_point_info) - status = ap_ssh.get_manager_state() - mgr_status.append(status) - else: - # forgit access_point_info in get_configuration['access_point']: - # ap_ssh = get_apnos(access_point_info) - # status = ap_ssh.get_manager_state() - # if "ACTIVE" not in status: - # time.sleep(30) - # ap_ssh = APNOS(access_point_info) - # status = ap_ssh.get_manager_state() - # mgr_status.append(status) - pass - yield mgr_status + status = fixtures_ver.get_ap_cloud_connectivity_status(get_configuration, get_apnos) + def teardown_session(): + data = [] + data.append(False) + for s in status: + data.append(s[0]) + print(data) + if False not in data: + pytest.exit("AP is Not connected to ucentral gw") + allure.attach(name=str(status), body="") -# Not used anymore, needs to depreciate it -@pytest.fixture(scope="session") -def get_lanforge_data(get_configuration): - """depreciate it""" - lanforge_data = {} - if get_configuration['traffic_generator']['name'] == 'lanforge': - lanforge_data = { - "lanforge_ip": get_configuration['traffic_generator']['details']['ip'], - "lanforge-port-number": get_configuration['traffic_generator']['details']['port'], - "lanforge_2dot4g": get_configuration['traffic_generator']['details']['2.4G-Radio'][0], - "lanforge_5g": get_configuration['traffic_generator']['details']['5G-Radio'][0], - "lanforge_2dot4g_prefix": get_configuration['traffic_generator']['details']['2.4G-Station-Name'], - "lanforge_5g_prefix": get_configuration['traffic_generator']['details']['5G-Station-Name'], - "lanforge_2dot4g_station": get_configuration['traffic_generator']['details']['2.4G-Station-Name'], - "lanforge_5g_station": get_configuration['traffic_generator']['details']['5G-Station-Name'], - "lanforge_bridge_port": get_configuration['traffic_generator']['details']['upstream'], - "lanforge_vlan_port": get_configuration['traffic_generator']['details']['upstream'] + ".100", - "vlan": 100 - } - yield lanforge_data + request.addfinalizer(teardown_session) + yield status @pytest.fixture(scope="session") @@ -575,14 +551,6 @@ def lf_tools(get_configuration, testbed): yield obj -@pytest.fixture(scope="session") -def lf_tools(get_configuration, testbed): - """ Create a DUT on LANforge""" - obj = ChamberView(lanforge_data=get_configuration["traffic_generator"]["details"], - testbed=testbed, access_point_data=get_configuration["access_point"]) - yield obj - - @pytest.fixture(scope="session") def setup_influx(request, testbed, get_configuration): """ Setup Influx Parameters: Used in CV Automation""" @@ -645,8 +613,8 @@ def fixtures_ver(request, get_configuration): @pytest.fixture(scope="session") def firmware_upgrade(fixtures_ver, get_apnos, get_configuration): - fixtures_ver.setup_firmware(get_apnos, get_configuration) - yield True + upgrade_status = fixtures_ver.setup_firmware(get_apnos, get_configuration) + yield upgrade_status """ diff --git a/tests/controller_tests/ucentral_gateway/test_gatewayservice.py b/tests/controller_tests/ucentral_gateway/test_gatewayservice.py index b54c4bf96..016631f66 100644 --- a/tests/controller_tests/ucentral_gateway/test_gatewayservice.py +++ b/tests/controller_tests/ucentral_gateway/test_gatewayservice.py @@ -7,11 +7,12 @@ import pytest import json import allure +@pytest.mark.uc_sanityw @allure.feature("SDK REST API") - class TestUcentralGatewayService(object): """ """ + @pytest.mark.sdk_restapi def test_gwservice_listdevices(self, setup_controller): """ @@ -24,7 +25,7 @@ class TestUcentralGatewayService(object): if resp.status_code != 200: assert False devices = json.loads(resp.text) - print (devices) + print(devices) @pytest.mark.sdk_restapi def test_gwservice_createdevice(self, setup_controller): @@ -48,7 +49,7 @@ class TestUcentralGatewayService(object): if resp.status_code != 200: assert False devices = json.loads(resp.text) - print (devices) + print(devices) resp = setup_controller.request("gw", "device/DEADBEEF0011", "GET", None, None) body = resp.url + "," + str(resp.status_code) + ',' + resp.text @@ -83,7 +84,7 @@ class TestUcentralGatewayService(object): if resp.status_code != 200: assert False devices = json.loads(resp.text) - print (devices) + print(devices) payload = {'serialNumber': 'DEADBEEF0011', 'owner': 'pytest'} @@ -100,8 +101,7 @@ class TestUcentralGatewayService(object): assert False device = json.loads(resp.text) - print (device) - + print(device) resp = setup_controller.request("gw", "device/DEADBEEF0011", "DELETE", None, None) body = resp.url + "," + str(resp.status_code) + ',' + resp.text @@ -130,12 +130,10 @@ class TestUcentralGatewayService(object): if resp.status_code != 200: assert False devices = json.loads(resp.text) - print (devices) - + print(devices) resp = setup_controller.request("gw", "device/DEADBEEF0011", "DELETE", None, None) body = resp.url + "," + str(resp.status_code) + ',' + resp.text allure.attach(name="gw get device", body=body) if resp.status_code != 200: assert False - diff --git a/tests/fixtures_1x.py b/tests/fixtures_1x.py index d61b8aa14..5c6e1f250 100644 --- a/tests/fixtures_1x.py +++ b/tests/fixtures_1x.py @@ -60,6 +60,18 @@ class Fixtures_1x: def setup_firmware(self): pass + def get_ap_cloud_connectivity_status(self, get_configuration, get_apnos): + mgr_status = [] + for access_point_info in get_configuration['access_point']: + ap_ssh = get_apnos(access_point_info, sdk="1.x") + status = ap_ssh.get_manager_state() + if "ACTIVE" not in status: + time.sleep(30) + ap_ssh = APNOS(access_point_info) + status = ap_ssh.get_manager_state() + mgr_status.append(status) + return mgr_status + def get_ap_version(self, get_apnos, get_configuration): # version_list = [] # for access_point_info in get_configuration['access_point']: diff --git a/tests/fixtures_2x.py b/tests/fixtures_2x.py index 9abd311ed..22ac7fc9d 100644 --- a/tests/fixtures_2x.py +++ b/tests/fixtures_2x.py @@ -60,8 +60,10 @@ class Fixtures_2x: def setup_firmware(self, get_apnos, get_configuration, request=""): # Query AP Firmware - + upgrade_status = [] for ap in get_configuration['access_point']: + + ap_ssh = get_apnos(ap, pwd="../libs/apnos/", sdk="2.x") # If specified as URL try: response = requests.get(ap['version']) @@ -78,24 +80,19 @@ class Fixtures_2x: items = list(range(0, 300)) l = len(items) - self.printProgressBar(0, l, prefix='Progress:', suffix='Complete', length=50) - for i, item in enumerate(items): - # Do stuff... - time.sleep(0.8) - # Update Progress Bar - self.printProgressBar(i + 1, l, prefix='Progress:', suffix='Complete', length=50) - ap_version = ap_ssh.get_ap_version_ucentral() current_version_commit = str(ap_version).split("/")[1].replace(" ", "").splitlines()[0] if target_revision_commit in current_version_commit: + upgrade_status.append([ap['serial'], target_revision_commit, current_version_commit]) print("Firmware Upgraded to :", ap_version) + else: + print("firmware upgraded failed: ", target_revision) + upgrade_status.append([ap['serial'],target_revision_commit, current_version_commit]) + break except Exception as e: print("URL does not exist on Internet") - - # else Specified as "branch-commit_id" / "branch-latest" firmware_url = "" - ap_ssh = get_apnos(ap, pwd="../libs/apnos/", sdk="2.x") ap_version = ap_ssh.get_ap_version_ucentral() response = self.fw_client.get_latest_fw(model=ap["model"]) # if the target version specified is "branch-latest" @@ -122,20 +119,16 @@ class Fixtures_2x: # if AP is already in target Version then skip upgrade unless force upgrade is specified if current_version == target_revision: + upgrade_status.append([ap['serial'], target_revision, current_version, 'skip']) print("Skipping Upgrade! AP is already in target version") - allure.attach(name="Skipping Upgrade because AP is already in the target Version", body="") + allure.attach(name="Skipping Upgrade because AP is already in the target Version", + body="") break self.fw_client.upgrade_firmware(serial=ap['serial'], url=str(firmware['uri'])) # wait for 300 seconds after firmware upgrade - items = list(range(0, 300)) - l = len(items) - # self.printProgressBar(0, l, prefix='Progress:', suffix='Complete', length=50) - # for i, item in enumerate(items): - # # Do stuff... - # time.sleep(0.8) - # # Update Progress Bar - # self.printProgressBar(i + 1, l, prefix='Progress:', suffix='Complete', length=50) + print("waiting for 300 Sec for Firmware Upgrade") + time.sleep(300) # check the current AP Revision again ap_version = ap_ssh.get_ap_version_ucentral() @@ -145,8 +138,10 @@ class Fixtures_2x: body="current revision: " + current_version + "\ntarget revision: " + target_revision) print("current revision: ", current_version, "\ntarget revision: ", target_revision) if current_version == target_revision: + upgrade_status.append([ap['serial'], target_revision, current_version]) print("firmware upgraded successfully: ", target_revision) else: + upgrade_status.append([ap['serial'], target_revision, current_version]) print("firmware upgraded failed: ", target_revision) break if firmware['image'].split("-")[-2] == ap['version'].split('-')[0]: @@ -166,20 +161,15 @@ class Fixtures_2x: # if AP is already in target Version then skip upgrade unless force upgrade is specified if current_version == target_revision: + upgrade_status.append([ap['serial'], target_revision, current_version, 'skip']) print("Skipping Upgrade! AP is already in target version") allure.attach(name="Skipping Upgrade because AP is already in the target Version", body="") break self.fw_client.upgrade_firmware(serial=ap['serial'], url=str(firmware['uri'])) # wait for 300 seconds after firmware upgrade - items = list(range(0, 300)) - l = len(items) - self.printProgressBar(0, l, prefix='Progress:', suffix='Complete', length=50) - for i, item in enumerate(items): - # Do stuff... - time.sleep(0.8) - # Update Progress Bar - self.printProgressBar(i + 1, l, prefix='Progress:', suffix='Complete', length=50) + print("waiting for 300 Sec for Firmware Upgrade") + time.sleep(300) # check the current AP Revision again ap_version = ap_ssh.get_ap_version_ucentral() @@ -189,19 +179,20 @@ class Fixtures_2x: body="current revision: " + current_version + "\ntarget revision: " + target_revision) print("current revision: ", current_version, "\ntarget revision: ", target_revision) if current_version == target_revision: + upgrade_status.append([ap['serial'], target_revision, current_version]) print("firmware upgraded successfully: ", target_revision) else: + upgrade_status.append([ap['serial'], target_revision, current_version]) print("firmware upgraded failed: ", target_revision) break - - # if branch-commit is specified else: firmware_list = self.fw_client.get_firmwares(model=ap['model'], branch="", commit_id='') fw_list = [] # getting the list of firmwares in fw_list that has the commit id specified as an input for firmware in firmware_list: - if firmware['revision'].split("/")[1].replace(" ", "").split('-')[-1] == ap['version'].split('-')[1]: + if firmware['revision'].split("/")[1].replace(" ", "").split('-')[-1] == ap['version'].split('-')[ + 1]: fw_list.append(firmware) # If there is only 1 commit ID in fw_list @@ -224,6 +215,7 @@ class Fixtures_2x: # if AP is already in target Version then skip upgrade unless force upgrade is specified if current_version == target_revision: + upgrade_status.append([ap['serial'], target_revision, current_version, 'skip']) print("Skipping Upgrade! AP is already in target version") allure.attach(name="Skipping Upgrade because AP is already in the target Version", body="") break @@ -233,14 +225,8 @@ class Fixtures_2x: self.fw_client.upgrade_firmware(serial=ap['serial'], url=str(url)) # wait for 300 seconds after firmware upgrade - items = list(range(0, 300)) - l = len(items) - self.printProgressBar(0, l, prefix='Progress:', suffix='Complete', length=50) - for i, item in enumerate(items): - # Do stuff... - time.sleep(0.8) - # Update Progress Bar - self.printProgressBar(i + 1, l, prefix='Progress:', suffix='Complete', length=50) + print("waiting for 300 Sec for Firmware Upgrade") + time.sleep(300) # check the current AP Revision again ap_version = ap_ssh.get_ap_version_ucentral() @@ -250,8 +236,10 @@ class Fixtures_2x: body="current revision: " + current_version + "\ntarget revision: " + target_revision) print("current revision: ", current_version, "\ntarget revision: ", target_revision) if current_version == target_revision: + upgrade_status.append([ap['serial'], target_revision, current_version]) print("firmware upgraded successfully: ", target_revision) else: + upgrade_status.append([ap['serial'], target_revision, current_version]) print("firmware upgraded failed: ", target_revision) break @@ -283,6 +271,7 @@ class Fixtures_2x: # if AP is already in target Version then skip upgrade unless force upgrade is specified if current_version == target_revision: + upgrade_status.append([ap['serial'], target_revision, current_version, 'skip']) print("Skipping Upgrade! AP is already in target version") allure.attach(name="Skipping Upgrade because AP is already in the target Version", body="") break @@ -290,15 +279,8 @@ class Fixtures_2x: self.fw_client.upgrade_firmware(serial=ap['serial'], url=str(firmware['uri'])) # wait for 300 seconds after firmware upgrade - # Initial call to print 0% progress - items = list(range(0, 300)) - l = len(items) - self.printProgressBar(0, l, prefix='Progress:', suffix='Complete', length=50) - for i, item in enumerate(items): - # Do stuff... - time.sleep(0.8) - # Update Progress Bar - self.printProgressBar(i + 1, l, prefix='Progress:', suffix='Complete', length=50) + print("waiting for 300 Sec for Firmware Upgrade") + time.sleep(300) # check the current AP Revision again ap_version = ap_ssh.get_ap_version_ucentral() @@ -308,13 +290,25 @@ class Fixtures_2x: body="current revision: " + current_version + "\ntarget revision: " + target_revision) print("current revision: ", current_version, "\ntarget revision: ", target_revision) if current_version == target_revision: + upgrade_status.append([target_revision, current_version]) print("firmware upgraded successfully: ", target_revision) else: + upgrade_status.append([target_revision, current_version]) print("firmware upgraded failed: ", target_revision) break + return upgrade_status - - + def get_ap_cloud_connectivity_status(self, get_configuration, get_apnos): + status_data = [] + self.ubus_connection = [] + for access_point_info in get_configuration['access_point']: + ap_ssh = get_apnos(access_point_info, sdk="2.x") + status = ap_ssh.get_ucentral_status() + print(status) + status_data.append(status) + connectivity_data = ap_ssh.run_generic_command(cmd="ubus call ucentral status") + self.ubus_connection.append(['Serial Number: ' + access_point_info['serial'], connectivity_data]) + return status_data def get_ap_version(self, get_apnos, get_configuration): version_list = [] @@ -579,7 +573,7 @@ class Fixtures_2x: pass ap_ssh.run_generic_command(cmd="logger stop testcase: " + instance_name) ap_logs = ap_ssh.get_logread(start_ref="start testcase: " + instance_name, - stop_ref="stop testcase: " + instance_name) + stop_ref="stop testcase: " + instance_name) allure.attach(body=ap_logs, name="AP Log: ") try: @@ -620,25 +614,3 @@ class Fixtures_2x: request.addfinalizer(teardown_session) return test_cases - - # Print iterations progress - def printProgressBar(self, iteration, total, prefix='', suffix='', decimals=1, length=100, fill='█', printEnd="\r"): - """ - Call in a loop to create terminal progress bar - @params: - iteration - Required : current iteration (Int) - total - Required : total iterations (Int) - prefix - Optional : prefix string (Str) - suffix - Optional : suffix string (Str) - decimals - Optional : positive number of decimals in percent complete (Int) - length - Optional : character length of bar (Int) - fill - Optional : bar fill character (Str) - printEnd - Optional : end character (e.g. "\r", "\r\n") (Str) - """ - percent = ("{0:." + str(decimals) + "f}").format(100 * (iteration / float(total))) - filledLength = int(length * iteration // total) - bar = fill * filledLength + '-' * (length - filledLength) - print(f'\r{prefix} |{bar}| {percent}% {suffix}', end=printEnd) - # Print New Line on Complete - if iteration == total: - print() \ No newline at end of file diff --git a/tests/pytest.ini b/tests/pytest.ini index 3dbe1fd9d..06ee9d19c 100644 --- a/tests/pytest.ini +++ b/tests/pytest.ini @@ -11,7 +11,7 @@ num_stations=1 sdk-customer-id=2 #fIRMWARE Option -build=0 +firmware=0 # Influx Params influx_host=influx.cicd.lab.wlan.tip.build diff --git a/tests/test_connectivity.py b/tests/test_connectivity.py index 0af753f37..a68d727d2 100644 --- a/tests/test_connectivity.py +++ b/tests/test_connectivity.py @@ -6,16 +6,10 @@ import allure import pytest import requests -pytestmark = [pytest.mark.test_resources, pytest.mark.sanity, +pytestmark = [pytest.mark.test_resources, pytest.mark.sanity, pytest.mark.uc_sanity, pytest.mark.sanity_55] -@pytest.mark.fw -def test_firmware(firmware_upgrade): - - assert True - - @allure.testcase(name="Test Resources", url="") class TestResources(object): """Test Case Class: Test cases to cover resource Connectivity""" @@ -28,39 +22,133 @@ class TestResources(object): login_response_json = setup_controller.login_resp.json() response_code = setup_controller.login_resp.status_code allure.attach(name="Login Response Code", body=str(response_code)) - allure.attach(name="Login Response JSON", body=str(login_response_json)) - # if setup_controller.bearer: - # allure.attach(name="Controller Connectivity Success", body="") - # else: - # allure.attach(name="Controller Connectivity Failed", body="") - # pytest.exit("Controller Not Available") - # assert setup_controller.bearer + allure.attach(name="Login Response JSON", + body=str(login_response_json), + attachment_type=allure.attachment_type.JSON) assert response_code == 200 @pytest.mark.test_access_points_connectivity @allure.testcase(name="test_access_points_connectivity", url="") - def test_access_points_connectivity(self, test_access_point): + def test_access_points_connectivity(self, test_access_point, fixtures_ver): """Test case to verify Access Points Connectivity""" - flag = True - for i in test_access_point: - if "ACTIVE" not in i: - flag = False - if flag is False: - allure.attach(name="Access Point Connectivity Success", body=str(test_access_point)) - pytest.exit("Access Point Manager state is not Active") - else: - allure.attach(name="Access Point Connectivity Failed", body=str(test_access_point)) - - assert flag + data = [] + for status in test_access_point: + data.append(status[0]) + allure.attach(name="AP - Cloud connectivity info", body=str(fixtures_ver.ubus_connection)) + assert False not in data @pytest.mark.traffic_generator_connectivity @allure.testcase(name="test_traffic_generator_connectivity", url="") - def test_traffic_generator_connectivity(self, traffic_generator_connectivity, update_report, test_cases): + def test_traffic_generator_connectivity(self, traffic_generator_connectivity): """Test case to verify Traffic Generator Connectivity""" - if traffic_generator_connectivity == "5.4.4": - allure.attach(name="LANforge-", body=str(traffic_generator_connectivity)) - - else: - pytest.exit("LANforgeGUI-5.4.3 is not available") - + allure.attach(name="LANforge version", body=str(traffic_generator_connectivity)) assert traffic_generator_connectivity + + +@allure.testcase(name="Firmware Management", url="") +@pytest.mark.uc_firmware +class TestFMS(object): + + @pytest.mark.get_firmware_list + def test_fms_version_list(self, fixtures_ver, get_configuration): + PASS = [] + for ap in get_configuration['access_point']: + # get the latest branch + firmware_list = fixtures_ver.fw_client.get_firmwares(model=ap['model'], + branch="", + commit_id='', + limit='10000', + offset='3000') + firmware_list.reverse() + release_list_data = [] + for i in firmware_list: + release_list_data.append(str(i['release'])) + allure.attach(name="firmware_list", body=str("\n".join(release_list_data)), + attachment_type=allure.attachment_type.JSON) + try: + response = requests.get(ap['version']) + print("URL is valid and exists on the internet") + allure.attach(name="firmware url: ", body=str(ap['version'])) + target_revision_commit = ap['version'].split("-")[-2] + target_revision_branch = ap['version'].split("-")[-3] + flag = True + for i in release_list_data: + if target_revision_commit == i.split('-')[-1] and target_revision_branch == i.split('-')[-2]: + print('target firmware : ' + ap['version'] + " is available in FMS : " + i) + allure.attach(name='target firmware : ' + ap['version'] + " is available in FMS : " + i, + body="") + PASS.append(True) + flag = False + + if flag: + print('target firmware : ' + ap['version'] + " is not available in FMS : ") + allure.attach(name='target firmware : ' + ap['version'] + " is not available in FMS : ", + body="") + PASS.append(False) + break + except Exception as e: + pass + + if ap['version'].split('-')[1] == "latest": + + for firmware in firmware_list: + if ap['version'].split('-')[0] == 'release': + version = firmware['revision'].split("/")[1].replace(" ", "").split('-')[1] + if firmware['revision'].split("/")[1].replace(" ", "").split('-')[1].__contains__('v2.'): + print("Target Firmware: \n", firmware) + allure.attach(name="Target firmware : ", body=str(firmware['release'])) + break + + if firmware['release'].split("-")[-2] == ap['version'].split('-')[0]: + print("Target Firmware: \n", firmware) + allure.attach(name="Target firmware : ", body=str(firmware['release'])) + break + else: + flag = True + for firmware in firmware_list: + if ap['version'].split('-')[0] == 'release': + branch = firmware['revision'].split("/")[1].replace(" ", "").split('-')[1] + commit = ap['version'].split('-')[1] + if branch.__contains__('v2.') and commit == firmware['release'].split('-')[-1]: + print("Target Firmware: \n", firmware) + allure.attach(name="Target firmware : ", body=str(firmware['release'])) + PASS.append(True) + flag = False + break + if ap['version'].split('-')[1] == firmware['release'].split('-')[-1] and ap['version'].split('-')[ + 0] == \ + firmware['release'].split('-')[-2]: + print('target firmware : ' + ap['version'] + " is available in FMS : " + firmware['release']) + allure.attach( + name='target firmware : ' + ap['version'] + " is available in FMS : " + firmware['release'] + , body="") + PASS.append(True) + flag = False + + if flag: + print('target firmware : ' + ap['version'] + " is not available in FMS : ") + allure.attach(name='target firmware : ' + ap['version'] + " is not available in FMS : ", + body="") + PASS.append(False) + assert False not in PASS + + @pytest.mark.firmware_upgrade + def test_firmware_upgrade_request(self, firmware_upgrade): + assert True + + @pytest.mark.test_firmware_ap + def test_firmware_upgrade_status_AP(self, firmware_upgrade): + allure.attach(name="firmware Upgrade Status:", body="") + assert True + + @pytest.mark.test_firmware_gw + def test_firmware_upgrade_status_gateway(self, get_apnos, get_configuration, setup_controller): + status = [] + for ap in get_configuration['access_point']: + ap_ssh = get_apnos(ap, pwd="../libs/apnos/", sdk="2.x") + ap_version = ap_ssh.get_ap_version_ucentral() + current_version_ap = str(ap_version).split() + data = setup_controller.get_device_by_serial_number(serial_number=ap['serial']) + allure.attach(name=str(data['firmware']) + str(current_version_ap), body="") + status.append(current_version_ap == data['firmware'].split()) + assert False not in status