wpa3 enterprise support in configuring ap

Signed-off-by: shivamcandela <shivam.thakur@candelatech.com>
This commit is contained in:
shivamcandela
2021-09-12 03:00:40 +05:30
parent 02ce1c7839
commit 2d1aff55e6
3 changed files with 105 additions and 20 deletions

View File

@@ -517,8 +517,10 @@ class UProfileUtility:
print(self.base_profile_config)
resp = requests.post(uri, data=basic_cfg_str, headers=self.sdk_client.make_headers(),
verify=False, timeout=100)
print(resp.json())
print(resp.status_code)
self.sdk_client.check_response("POST", resp, self.sdk_client.make_headers(), basic_cfg_str, uri)
print(resp.url)
# print(resp.url)
resp.close()
print(resp)
@@ -539,7 +541,7 @@ if __name__ == '__main__':
profile.set_radio_config()
ssid = {"ssid_name": "ssid_wpa2_2g", "appliedRadios": ["2G", "5G"], "security": "psk", "security_key": "something"}
profile.add_ssid(ssid_data=ssid, radius=False)
profile.push_config(serial_number="903cb39d6918")
profile.push_config(serial_number="0000c1018812")
# print(profile.get_ssid_info())
# # print(obj.get_devices())
obj.logout()

View File

@@ -207,7 +207,7 @@ CONFIGURATION = {
'password': "pumpkin77",
'port': 22,
'jumphost_tty': '/dev/ttyAP1',
'version': "next-latest"
'version': "release-latest"
}
],
"traffic_generator": {
@@ -618,7 +618,7 @@ CONFIGURATION = {
RADIUS_SERVER_DATA = {
"ip": "10.10.10.72",
"ip": "10.10.10.180",
"port": 1812,
"secret": "testing123",
"user": "user",
@@ -627,8 +627,8 @@ RADIUS_SERVER_DATA = {
}
RADIUS_ACCOUNTING_DATA = {
"ip": "10.10.10.72",
"port": 1812,
"ip": "10.10.10.180",
"port": 1813,
"secret": "testing123",
"user": "user",
"password": "password",

View File

@@ -55,7 +55,7 @@ class Fixtures_2x:
def disconnect(self):
self.controller_obj.logout()
def setup_firmware(self, get_apnos, get_configuration):
def setup_firmware(self, get_apnos, get_configuration, request=""):
# Query AP Firmware
for ap in get_configuration['access_point']:
@@ -63,6 +63,7 @@ class Fixtures_2x:
try:
response = requests.get(ap['version'])
print("URL is valid and exists on the internet")
allure.attach(name="firmware url: ", body=str(ap['version']))
target_revision_commit = ap['version'].split("-")[-2]
ap_version = ap_ssh.get_ap_version_ucentral()
current_version_commit = str(ap_version).split("/")[1].replace(" ", "").splitlines()[0]
@@ -71,7 +72,16 @@ class Fixtures_2x:
if target_revision_commit in current_version_commit:
continue
self.fw_client.upgrade_firmware(serial=ap['serial'], url=str(ap['version']))
time.sleep(300)
items = list(range(0, 300))
l = len(items)
self.printProgressBar(0, l, prefix='Progress:', suffix='Complete', length=50)
for i, item in enumerate(items):
# Do stuff...
time.sleep(0.8)
# Update Progress Bar
self.printProgressBar(i + 1, l, prefix='Progress:', suffix='Complete', length=50)
ap_version = ap_ssh.get_ap_version_ucentral()
current_version_commit = str(ap_version).split("/")[1].replace(" ", "").splitlines()[0]
if target_revision_commit in current_version_commit:
@@ -95,6 +105,7 @@ class Fixtures_2x:
if ap['version'].split('-')[0] == 'release':
if firmware['revision'].split("/")[1].replace(" ", "").split('-')[1].__contains__('v2.'):
print("Target Firmware: \n", firmware)
allure.attach(name="Target firmware : ", body=str(firmware))
target_revision = firmware['revision'].split("/")[1].replace(" ", "")
# check the current AP Revision before upgrade
@@ -109,12 +120,19 @@ class Fixtures_2x:
# if AP is already in target Version then skip upgrade unless force upgrade is specified
if current_version == target_revision:
print("Skipping Upgrade! AP is already in target version")
allure.attach(name="Skipping Upgrade because AP is already in the target Version")
allure.attach(name="Skipping Upgrade because AP is already in the target Version", body="")
break
self.fw_client.upgrade_firmware(serial=ap['serial'], url=str(firmware['uri']))
# wait for 300 seconds after firmware upgrade
time.sleep(300)
items = list(range(0, 300))
l = len(items)
self.printProgressBar(0, l, prefix='Progress:', suffix='Complete', length=50)
for i, item in enumerate(items):
# Do stuff...
time.sleep(0.8)
# Update Progress Bar
self.printProgressBar(i + 1, l, prefix='Progress:', suffix='Complete', length=50)
# check the current AP Revision again
ap_version = ap_ssh.get_ap_version_ucentral()
@@ -130,6 +148,7 @@ class Fixtures_2x:
break
if firmware['image'].split("-")[-2] == ap['version'].split('-')[0]:
print("Target Firmware: \n", firmware)
allure.attach(name="Target firmware : ", body=str(firmware))
target_revision = firmware['revision'].split("/")[1].replace(" ", "")
@@ -145,12 +164,19 @@ class Fixtures_2x:
# if AP is already in target Version then skip upgrade unless force upgrade is specified
if current_version == target_revision:
print("Skipping Upgrade! AP is already in target version")
allure.attach(name="Skipping Upgrade because AP is already in the target Version")
allure.attach(name="Skipping Upgrade because AP is already in the target Version", body="")
break
self.fw_client.upgrade_firmware(serial=ap['serial'], url=str(firmware['uri']))
# wait for 300 seconds after firmware upgrade
time.sleep(300)
items = list(range(0, 300))
l = len(items)
self.printProgressBar(0, l, prefix='Progress:', suffix='Complete', length=50)
for i, item in enumerate(items):
# Do stuff...
time.sleep(0.8)
# Update Progress Bar
self.printProgressBar(i + 1, l, prefix='Progress:', suffix='Complete', length=50)
# check the current AP Revision again
ap_version = ap_ssh.get_ap_version_ucentral()
@@ -179,6 +205,7 @@ class Fixtures_2x:
if len(fw_list) == 1:
print("Target Firmware: \n", fw_list[0])
allure.attach(name="Target firmware : ", body=str(fw_list[0]))
url = fw_list[0]['uri']
target_revision = fw_list[0]['revision'].split("/")[1].replace(" ", "")
@@ -195,7 +222,7 @@ class Fixtures_2x:
# if AP is already in target Version then skip upgrade unless force upgrade is specified
if current_version == target_revision:
print("Skipping Upgrade! AP is already in target version")
allure.attach(name="Skipping Upgrade because AP is already in the target Version")
allure.attach(name="Skipping Upgrade because AP is already in the target Version", body="")
break
# upgrade the firmware in another condition
@@ -203,7 +230,14 @@ class Fixtures_2x:
self.fw_client.upgrade_firmware(serial=ap['serial'], url=str(url))
# wait for 300 seconds after firmware upgrade
time.sleep(300)
items = list(range(0, 300))
l = len(items)
self.printProgressBar(0, l, prefix='Progress:', suffix='Complete', length=50)
for i, item in enumerate(items):
# Do stuff...
time.sleep(0.8)
# Update Progress Bar
self.printProgressBar(i + 1, l, prefix='Progress:', suffix='Complete', length=50)
# check the current AP Revision again
ap_version = ap_ssh.get_ap_version_ucentral()
@@ -231,6 +265,7 @@ class Fixtures_2x:
break
firmware = target_fw
print("Target Firmware: \n", firmware)
allure.attach(name="Target firmware : ", body=str(firmware))
target_revision = firmware['revision'].split("/")[1].replace(" ", "")
@@ -246,12 +281,21 @@ class Fixtures_2x:
# if AP is already in target Version then skip upgrade unless force upgrade is specified
if current_version == target_revision:
print("Skipping Upgrade! AP is already in target version")
allure.attach(name="Skipping Upgrade because AP is already in the target Version")
allure.attach(name="Skipping Upgrade because AP is already in the target Version", body="")
break
self.fw_client.upgrade_firmware(serial=ap['serial'], url=str(firmware['uri']))
# wait for 300 seconds after firmware upgrade
time.sleep(300)
# Initial call to print 0% progress
items = list(range(0, 300))
l = len(items)
self.printProgressBar(0, l, prefix='Progress:', suffix='Complete', length=50)
for i, item in enumerate(items):
# Do stuff...
time.sleep(0.8)
# Update Progress Bar
self.printProgressBar(i + 1, l, prefix='Progress:', suffix='Complete', length=50)
# check the current AP Revision again
ap_version = ap_ssh.get_ap_version_ucentral()
@@ -266,9 +310,8 @@ class Fixtures_2x:
print("firmware upgraded failed: ", target_revision)
break
# Compare with the specified one
# if 'latest'
pass
def get_ap_version(self, get_apnos, get_configuration):
version_list = []
@@ -287,7 +330,6 @@ class Fixtures_2x:
print(1, instantiate_profile_obj.sdk_client)
vlan_id, mode = 0, 0
parameter = dict(param)
print("hola", parameter)
test_cases = {}
profile_data = {}
@@ -427,6 +469,25 @@ class Fixtures_2x:
except Exception as e:
print(e)
test_cases["wpa2_personal"] = False
if mode == "wpa3_enterprise":
for j in profile_data["ssid"][mode]:
if mode in get_markers.keys() and get_markers[mode]:
try:
if j["appliedRadios"].__contains__("2G"):
lf_dut_data.append(j)
if j["appliedRadios"].__contains__("5G"):
lf_dut_data.append(j)
j["appliedRadios"] = list(set(j["appliedRadios"]))
j['security'] = 'wpa3'
RADIUS_SERVER_DATA = radius_info
RADIUS_ACCOUNTING_DATA = radius_accounting_info
creates_profile = instantiate_profile_obj.add_ssid(ssid_data=j, radius=True,
radius_auth_data=RADIUS_SERVER_DATA,
radius_accounting_data=RADIUS_ACCOUNTING_DATA)
test_cases["wpa3_eap"] = True
except Exception as e:
print(e)
test_cases["wpa3_eap"] = False
ap_ssh = get_apnos(get_configuration['access_point'][0], pwd="../libs/apnos/", sdk="2.x")
connected, latest, active = ap_ssh.get_ucentral_status()
if connected == False:
@@ -560,3 +621,25 @@ class Fixtures_2x:
request.addfinalizer(teardown_session)
return test_cases
# Print iterations progress
def printProgressBar(self, iteration, total, prefix='', suffix='', decimals=1, length=100, fill='', printEnd="\r"):
"""
Call in a loop to create terminal progress bar
@params:
iteration - Required : current iteration (Int)
total - Required : total iterations (Int)
prefix - Optional : prefix string (Str)
suffix - Optional : suffix string (Str)
decimals - Optional : positive number of decimals in percent complete (Int)
length - Optional : character length of bar (Int)
fill - Optional : bar fill character (Str)
printEnd - Optional : end character (e.g. "\r", "\r\n") (Str)
"""
percent = ("{0:." + str(decimals) + "f}").format(100 * (iteration / float(total)))
filledLength = int(length * iteration // total)
bar = fill * filledLength + '-' * (length - filledLength)
print(f'\r{prefix} |{bar}| {percent}% {suffix}', end=printEnd)
# Print New Line on Complete
if iteration == total:
print()