firmware upgrade 2.x :

Input needed can be URL
Input can be <branch>-latest
Input can be <branch>-<commit-id>
branch = release | main/trunk | next | <any other custom branch> eg. CS1

Signed-off-by: shivamcandela <shivam.thakur@candelatech.com>
This commit is contained in:
shivamcandela
2021-09-12 00:51:05 +05:30
parent c68b9ebb33
commit 02ce1c7839
3 changed files with 193 additions and 67 deletions

View File

@@ -207,7 +207,7 @@ CONFIGURATION = {
'password': "pumpkin77",
'port': 22,
'jumphost_tty': '/dev/ttyAP1',
'version': "trunk-d6c5e1f"
'version': "next-latest"
}
],
"traffic_generator": {
@@ -284,7 +284,7 @@ CONFIGURATION = {
'port': 22, # 22
'jumphost_tty': '/dev/ttyAP3',
'version': "latest-next",
'version_branch': "trunk-d6c5e1f"
'version_branch': "https://ucentral-ap-firmware.s3.amazonaws.com/20210908-cig_wf188-v2.1.0-rc3-bcd07e4-upgrade.bin"
}
],
"traffic_generator": {

View File

@@ -6,6 +6,8 @@ import os
import json
import time
import requests
""" Environment Paths """
if "libs" not in sys.path:
sys.path.append(f'../libs')
@@ -55,92 +57,214 @@ class Fixtures_2x:
def setup_firmware(self, get_apnos, get_configuration):
# Query AP Firmware
for ap in get_configuration['access_point']:
# If specified as URL
try:
response = requests.get(ap['version'])
print("URL is valid and exists on the internet")
target_revision_commit = ap['version'].split("-")[-2]
ap_version = ap_ssh.get_ap_version_ucentral()
current_version_commit = str(ap_version).split("/")[1].replace(" ", "").splitlines()[0]
# if AP is already in target Version then skip upgrade unless force upgrade is specified
if target_revision_commit in current_version_commit:
continue
self.fw_client.upgrade_firmware(serial=ap['serial'], url=str(ap['version']))
time.sleep(300)
ap_version = ap_ssh.get_ap_version_ucentral()
current_version_commit = str(ap_version).split("/")[1].replace(" ", "").splitlines()[0]
if target_revision_commit in current_version_commit:
print("Firmware Upgraded to :", ap_version)
except Exception as e:
print("URL does not exist on Internet")
# else Specified as "branch-commit_id" / "branch-latest"
firmware_url = ""
ap_ssh = get_apnos(ap, pwd="../libs/apnos/", sdk="2.x")
ap_version = ap_ssh.get_ap_version_ucentral()
response = self.fw_client.get_latest_fw(model=ap["model"])
# if the target version specified is latest
# if the target version specified is "branch-latest"
if ap['version'].split('-')[1] == "latest":
# get the latest branch
if 'revision' in list(response.keys()):
version_latest = response['revision']
ap_v = ""
latest_v = ""
try:
ap_v = str(ap_version).split("/")[1].replace(" ", "").splitlines()[0]
latest_v = str(version_latest).split("/")[1].replace(" ", "")
print(ap_v,
latest_v)
except Exception as e:
pass
print(ap_v == latest_v)
if ap_v == latest_v:
continue
else:
if 'uri' in list(response.keys()):
firmware_url = response['uri']
self.fw_client.upgrade_firmware(serial=ap['serial'], url=str(firmware_url))
time.sleep(300)
ap_version = ap_ssh.get_ap_version_ucentral()
ap_v = str(ap_version).split("/")[1].replace(" ", "").splitlines()[0]
latest_v = str(version_latest).split("/")[1].replace(" ", "")
if ap_v == latest_v:
print("firmware upgraded to latest: ", latest_v)
continue
# firmware upgrade request
else:
# firmware uri is not available
continue
pass
else:
firmware_list = self.fw_client.get_firmwares(model=ap['model'], branch="", commit_id='')
firmware_list.reverse()
for firmware in firmware_list:
if ap['version'].split('-')[0] == 'release':
if firmware['revision'].split("/")[1].replace(" ", "").split('-')[1].__contains__('v2.'):
print("Target Firmware: \n", firmware)
target_revision = firmware['revision'].split("/")[1].replace(" ", "")
# check the current AP Revision before upgrade
ap_version = ap_ssh.get_ap_version_ucentral()
current_version = str(ap_version).split("/")[1].replace(" ", "").splitlines()[0]
# print and report the firmware versions before upgrade
allure.attach(name="Before Firmware Upgrade Request: ",
body="current revision: " + current_version + "\ntarget revision: " + target_revision)
print("current revision: ", current_version, "\ntarget revision: ", target_revision)
# if AP is already in target Version then skip upgrade unless force upgrade is specified
if current_version == target_revision:
print("Skipping Upgrade! AP is already in target version")
allure.attach(name="Skipping Upgrade because AP is already in the target Version")
break
self.fw_client.upgrade_firmware(serial=ap['serial'], url=str(firmware['uri']))
# wait for 300 seconds after firmware upgrade
time.sleep(300)
# check the current AP Revision again
ap_version = ap_ssh.get_ap_version_ucentral()
current_version = str(ap_version).split("/")[1].replace(" ", "").splitlines()[0]
# print and report the Firmware versions after upgrade
allure.attach(name="After Firmware Upgrade Request: ",
body="current revision: " + current_version + "\ntarget revision: " + target_revision)
print("current revision: ", current_version, "\ntarget revision: ", target_revision)
if current_version == target_revision:
print("firmware upgraded successfully: ", target_revision)
else:
print("firmware upgraded failed: ", target_revision)
break
if firmware['image'].split("-")[-2] == ap['version'].split('-')[0]:
print("Target Firmware: \n", firmware)
target_revision = firmware['revision'].split("/")[1].replace(" ", "")
# check the current AP Revision before upgrade
ap_version = ap_ssh.get_ap_version_ucentral()
current_version = str(ap_version).split("/")[1].replace(" ", "").splitlines()[0]
# print and report the firmware versions before upgrade
allure.attach(name="Before Firmware Upgrade Request: ",
body="current revision: " + current_version + "\ntarget revision: " + target_revision)
print("current revision: ", current_version, "\ntarget revision: ", target_revision)
# if AP is already in target Version then skip upgrade unless force upgrade is specified
if current_version == target_revision:
print("Skipping Upgrade! AP is already in target version")
allure.attach(name="Skipping Upgrade because AP is already in the target Version")
break
self.fw_client.upgrade_firmware(serial=ap['serial'], url=str(firmware['uri']))
# wait for 300 seconds after firmware upgrade
time.sleep(300)
# check the current AP Revision again
ap_version = ap_ssh.get_ap_version_ucentral()
current_version = str(ap_version).split("/")[1].replace(" ", "").splitlines()[0]
# print and report the Firmware versions after upgrade
allure.attach(name="After Firmware Upgrade Request: ",
body="current revision: " + current_version + "\ntarget revision: " + target_revision)
print("current revision: ", current_version, "\ntarget revision: ", target_revision)
if current_version == target_revision:
print("firmware upgraded successfully: ", target_revision)
else:
print("firmware upgraded failed: ", target_revision)
break
# if branch-commit is specified
else:
firmware_list = self.fw_client.get_firmwares(model=ap['model'], branch="", commit_id='')
fw_list = []
# getting the list of firmwares in fw_list that has the commit id specified as an input
for firmware in firmware_list:
if firmware['revision'].split("/")[1].replace(" ", "").split('-')[-1] == ap['version'].split('-')[1]:
fw_list.append(firmware)
# If there is only 1 commit ID in fw_list
if len(fw_list) == 1:
print("Target Firmware: \n", fw_list[0])
url = fw_list[0]['uri']
current_version = str(ap_version).split("/")[1].replace(" ", "").splitlines()[0]
target_revision = fw_list[0]['revision'].split("/")[1].replace(" ", "")
# check the current AP Revision before upgrade
ap_version = ap_ssh.get_ap_version_ucentral()
current_version = str(ap_version).split("/")[1].replace(" ", "").splitlines()[0]
# print and report the firmware versions before upgrade
allure.attach(name="Before Firmware Upgrade Request: ",
body="current revision: " + current_version + "\ntarget revision: " + target_revision)
print("current revision: ", current_version, "\ntarget revision: ", target_revision)
# if AP is already in target Version then skip upgrade unless force upgrade is specified
if current_version == target_revision:
continue
print("Skipping Upgrade! AP is already in target version")
allure.attach(name="Skipping Upgrade because AP is already in the target Version")
break
# upgrade the firmware in another condition
else:
self.fw_client.upgrade_firmware(serial=ap['serial'], url=str(url))
# wait for 300 seconds after firmware upgrade
time.sleep(300)
# check the current AP Revision again
ap_version = ap_ssh.get_ap_version_ucentral()
print(ap_version)
current_version = str(ap_version).split("/")[1].replace(" ", "").splitlines()[0]
print(current_version, target_revision)
# print and report the Firmware versions after upgrade
allure.attach(name="After Firmware Upgrade Request: ",
body="current revision: " + current_version + "\ntarget revision: " + target_revision)
print("current revision: ", current_version, "\ntarget revision: ", target_revision)
if current_version == target_revision:
print("firmware upgraded successfully: ", target_revision)
else:
pass
version_fms = firmware['uri'].split('-')[-3] + "-" + firmware['uri'].split('-')[-2]
print("firmware upgraded failed: ", target_revision)
break
if version_fms == ap['version']:
print(firmware)
# for version in firmware_list:
# if ap['version'] in version['revision']:
# print(version)
# firmware_url = version['uri']
# self.fw_client.upgrade_firmware(serial=ap['serial'], url=str(firmware_url))
# time.sleep(300)
# ap_version = ap_ssh.get_ap_version_ucentral()
# ap_v = str(ap_version).split("/")[1].replace(" ", "").splitlines()[0]
# if ap_v in version['revision']:
# print("firmware upgraded successfully: ", version['revision'])
# for revision_fms in revisions['revisions']:
# print(ap['version'] in revision_fms)
# # print(self.fw_client.get_revisions())
# upgrade to a specified version
# if ap['version'] in response['revision']:
# print("shivam", response['revision'], ap['version'])
# print("shivam", response['revision'], ap['version'])
# "TIP-v2.1.0-rc3-bcd07e4" in response['revision'])
continue
# if there are 1+ firmware images in fw_list then check for branch
else:
target_fw = ""
for firmware in fw_list:
if ap['version'].split('-')[0] == 'release':
if firmware['revision'].split("/")[1].replace(" ", "").split('-')[1].__contains__('v2.'):
target_fw = firmware
break
if firmware['image'].split("-")[-2] == ap['version'].split('-')[0]:
target_fw = firmware
break
firmware = target_fw
print("Target Firmware: \n", firmware)
target_revision = firmware['revision'].split("/")[1].replace(" ", "")
# check the current AP Revision before upgrade
ap_version = ap_ssh.get_ap_version_ucentral()
current_version = str(ap_version).split("/")[1].replace(" ", "").splitlines()[0]
# print and report the firmware versions before upgrade
allure.attach(name="Before Firmware Upgrade Request: ",
body="current revision: " + current_version + "\ntarget revision: " + target_revision)
print("current revision: ", current_version, "\ntarget revision: ", target_revision)
# if AP is already in target Version then skip upgrade unless force upgrade is specified
if current_version == target_revision:
print("Skipping Upgrade! AP is already in target version")
allure.attach(name="Skipping Upgrade because AP is already in the target Version")
break
self.fw_client.upgrade_firmware(serial=ap['serial'], url=str(firmware['uri']))
# wait for 300 seconds after firmware upgrade
time.sleep(300)
# check the current AP Revision again
ap_version = ap_ssh.get_ap_version_ucentral()
current_version = str(ap_version).split("/")[1].replace(" ", "").splitlines()[0]
# print and report the Firmware versions after upgrade
allure.attach(name="After Firmware Upgrade Request: ",
body="current revision: " + current_version + "\ntarget revision: " + target_revision)
print("current revision: ", current_version, "\ntarget revision: ", target_revision)
if current_version == target_revision:
print("firmware upgraded successfully: ", target_revision)
else:
print("firmware upgraded failed: ", target_revision)
break
# Compare with the specified one
# if 'latest'

View File

@@ -4,6 +4,7 @@
import allure
import pytest
import requests
pytestmark = [pytest.mark.test_resources, pytest.mark.sanity,
pytest.mark.sanity_55]
@@ -11,6 +12,7 @@ pytestmark = [pytest.mark.test_resources, pytest.mark.sanity,
@pytest.mark.fw
def test_firmware(firmware_upgrade):
assert True