From 2d1aff55e6537c00b1562672ba468aad15da508b Mon Sep 17 00:00:00 2001 From: shivamcandela Date: Sun, 12 Sep 2021 03:00:40 +0530 Subject: [PATCH] wpa3 enterprise support in configuring ap Signed-off-by: shivamcandela --- libs/controller/controller_2x/controller.py | 6 +- tests/configuration.py | 8 +- tests/fixtures_2x.py | 111 +++++++++++++++++--- 3 files changed, 105 insertions(+), 20 deletions(-) diff --git a/libs/controller/controller_2x/controller.py b/libs/controller/controller_2x/controller.py index 209049513..e7792dbb4 100644 --- a/libs/controller/controller_2x/controller.py +++ b/libs/controller/controller_2x/controller.py @@ -517,8 +517,10 @@ class UProfileUtility: print(self.base_profile_config) resp = requests.post(uri, data=basic_cfg_str, headers=self.sdk_client.make_headers(), verify=False, timeout=100) + print(resp.json()) + print(resp.status_code) self.sdk_client.check_response("POST", resp, self.sdk_client.make_headers(), basic_cfg_str, uri) - print(resp.url) + # print(resp.url) resp.close() print(resp) @@ -539,7 +541,7 @@ if __name__ == '__main__': profile.set_radio_config() ssid = {"ssid_name": "ssid_wpa2_2g", "appliedRadios": ["2G", "5G"], "security": "psk", "security_key": "something"} profile.add_ssid(ssid_data=ssid, radius=False) - profile.push_config(serial_number="903cb39d6918") + profile.push_config(serial_number="0000c1018812") # print(profile.get_ssid_info()) # # print(obj.get_devices()) obj.logout() diff --git a/tests/configuration.py b/tests/configuration.py index 7d2dda6a6..f6e07f626 100644 --- a/tests/configuration.py +++ b/tests/configuration.py @@ -207,7 +207,7 @@ CONFIGURATION = { 'password': "pumpkin77", 'port': 22, 'jumphost_tty': '/dev/ttyAP1', - 'version': "next-latest" + 'version': "release-latest" } ], "traffic_generator": { @@ -618,7 +618,7 @@ CONFIGURATION = { RADIUS_SERVER_DATA = { - "ip": "10.10.10.72", + "ip": "10.10.10.180", "port": 1812, "secret": "testing123", "user": "user", @@ -627,8 +627,8 @@ RADIUS_SERVER_DATA = { } RADIUS_ACCOUNTING_DATA = { - "ip": "10.10.10.72", - "port": 1812, + "ip": "10.10.10.180", + "port": 1813, "secret": "testing123", "user": "user", "password": "password", diff --git a/tests/fixtures_2x.py b/tests/fixtures_2x.py index cbf3e4893..ab6fb34a3 100644 --- a/tests/fixtures_2x.py +++ b/tests/fixtures_2x.py @@ -55,7 +55,7 @@ class Fixtures_2x: def disconnect(self): self.controller_obj.logout() - def setup_firmware(self, get_apnos, get_configuration): + def setup_firmware(self, get_apnos, get_configuration, request=""): # Query AP Firmware for ap in get_configuration['access_point']: @@ -63,6 +63,7 @@ class Fixtures_2x: try: response = requests.get(ap['version']) print("URL is valid and exists on the internet") + allure.attach(name="firmware url: ", body=str(ap['version'])) target_revision_commit = ap['version'].split("-")[-2] ap_version = ap_ssh.get_ap_version_ucentral() current_version_commit = str(ap_version).split("/")[1].replace(" ", "").splitlines()[0] @@ -71,7 +72,16 @@ class Fixtures_2x: if target_revision_commit in current_version_commit: continue self.fw_client.upgrade_firmware(serial=ap['serial'], url=str(ap['version'])) - time.sleep(300) + + items = list(range(0, 300)) + l = len(items) + self.printProgressBar(0, l, prefix='Progress:', suffix='Complete', length=50) + for i, item in enumerate(items): + # Do stuff... + time.sleep(0.8) + # Update Progress Bar + self.printProgressBar(i + 1, l, prefix='Progress:', suffix='Complete', length=50) + ap_version = ap_ssh.get_ap_version_ucentral() current_version_commit = str(ap_version).split("/")[1].replace(" ", "").splitlines()[0] if target_revision_commit in current_version_commit: @@ -95,6 +105,7 @@ class Fixtures_2x: if ap['version'].split('-')[0] == 'release': if firmware['revision'].split("/")[1].replace(" ", "").split('-')[1].__contains__('v2.'): print("Target Firmware: \n", firmware) + allure.attach(name="Target firmware : ", body=str(firmware)) target_revision = firmware['revision'].split("/")[1].replace(" ", "") # check the current AP Revision before upgrade @@ -109,12 +120,19 @@ class Fixtures_2x: # if AP is already in target Version then skip upgrade unless force upgrade is specified if current_version == target_revision: print("Skipping Upgrade! AP is already in target version") - allure.attach(name="Skipping Upgrade because AP is already in the target Version") + allure.attach(name="Skipping Upgrade because AP is already in the target Version", body="") break self.fw_client.upgrade_firmware(serial=ap['serial'], url=str(firmware['uri'])) # wait for 300 seconds after firmware upgrade - time.sleep(300) + items = list(range(0, 300)) + l = len(items) + self.printProgressBar(0, l, prefix='Progress:', suffix='Complete', length=50) + for i, item in enumerate(items): + # Do stuff... + time.sleep(0.8) + # Update Progress Bar + self.printProgressBar(i + 1, l, prefix='Progress:', suffix='Complete', length=50) # check the current AP Revision again ap_version = ap_ssh.get_ap_version_ucentral() @@ -130,6 +148,7 @@ class Fixtures_2x: break if firmware['image'].split("-")[-2] == ap['version'].split('-')[0]: print("Target Firmware: \n", firmware) + allure.attach(name="Target firmware : ", body=str(firmware)) target_revision = firmware['revision'].split("/")[1].replace(" ", "") @@ -145,12 +164,19 @@ class Fixtures_2x: # if AP is already in target Version then skip upgrade unless force upgrade is specified if current_version == target_revision: print("Skipping Upgrade! AP is already in target version") - allure.attach(name="Skipping Upgrade because AP is already in the target Version") + allure.attach(name="Skipping Upgrade because AP is already in the target Version", body="") break self.fw_client.upgrade_firmware(serial=ap['serial'], url=str(firmware['uri'])) # wait for 300 seconds after firmware upgrade - time.sleep(300) + items = list(range(0, 300)) + l = len(items) + self.printProgressBar(0, l, prefix='Progress:', suffix='Complete', length=50) + for i, item in enumerate(items): + # Do stuff... + time.sleep(0.8) + # Update Progress Bar + self.printProgressBar(i + 1, l, prefix='Progress:', suffix='Complete', length=50) # check the current AP Revision again ap_version = ap_ssh.get_ap_version_ucentral() @@ -179,6 +205,7 @@ class Fixtures_2x: if len(fw_list) == 1: print("Target Firmware: \n", fw_list[0]) + allure.attach(name="Target firmware : ", body=str(fw_list[0])) url = fw_list[0]['uri'] target_revision = fw_list[0]['revision'].split("/")[1].replace(" ", "") @@ -195,7 +222,7 @@ class Fixtures_2x: # if AP is already in target Version then skip upgrade unless force upgrade is specified if current_version == target_revision: print("Skipping Upgrade! AP is already in target version") - allure.attach(name="Skipping Upgrade because AP is already in the target Version") + allure.attach(name="Skipping Upgrade because AP is already in the target Version", body="") break # upgrade the firmware in another condition @@ -203,7 +230,14 @@ class Fixtures_2x: self.fw_client.upgrade_firmware(serial=ap['serial'], url=str(url)) # wait for 300 seconds after firmware upgrade - time.sleep(300) + items = list(range(0, 300)) + l = len(items) + self.printProgressBar(0, l, prefix='Progress:', suffix='Complete', length=50) + for i, item in enumerate(items): + # Do stuff... + time.sleep(0.8) + # Update Progress Bar + self.printProgressBar(i + 1, l, prefix='Progress:', suffix='Complete', length=50) # check the current AP Revision again ap_version = ap_ssh.get_ap_version_ucentral() @@ -231,6 +265,7 @@ class Fixtures_2x: break firmware = target_fw print("Target Firmware: \n", firmware) + allure.attach(name="Target firmware : ", body=str(firmware)) target_revision = firmware['revision'].split("/")[1].replace(" ", "") @@ -246,12 +281,21 @@ class Fixtures_2x: # if AP is already in target Version then skip upgrade unless force upgrade is specified if current_version == target_revision: print("Skipping Upgrade! AP is already in target version") - allure.attach(name="Skipping Upgrade because AP is already in the target Version") + allure.attach(name="Skipping Upgrade because AP is already in the target Version", body="") break self.fw_client.upgrade_firmware(serial=ap['serial'], url=str(firmware['uri'])) # wait for 300 seconds after firmware upgrade - time.sleep(300) + + # Initial call to print 0% progress + items = list(range(0, 300)) + l = len(items) + self.printProgressBar(0, l, prefix='Progress:', suffix='Complete', length=50) + for i, item in enumerate(items): + # Do stuff... + time.sleep(0.8) + # Update Progress Bar + self.printProgressBar(i + 1, l, prefix='Progress:', suffix='Complete', length=50) # check the current AP Revision again ap_version = ap_ssh.get_ap_version_ucentral() @@ -266,9 +310,8 @@ class Fixtures_2x: print("firmware upgraded failed: ", target_revision) break - # Compare with the specified one - # if 'latest' - pass + + def get_ap_version(self, get_apnos, get_configuration): version_list = [] @@ -287,7 +330,6 @@ class Fixtures_2x: print(1, instantiate_profile_obj.sdk_client) vlan_id, mode = 0, 0 parameter = dict(param) - print("hola", parameter) test_cases = {} profile_data = {} @@ -427,6 +469,25 @@ class Fixtures_2x: except Exception as e: print(e) test_cases["wpa2_personal"] = False + if mode == "wpa3_enterprise": + for j in profile_data["ssid"][mode]: + if mode in get_markers.keys() and get_markers[mode]: + try: + if j["appliedRadios"].__contains__("2G"): + lf_dut_data.append(j) + if j["appliedRadios"].__contains__("5G"): + lf_dut_data.append(j) + j["appliedRadios"] = list(set(j["appliedRadios"])) + j['security'] = 'wpa3' + RADIUS_SERVER_DATA = radius_info + RADIUS_ACCOUNTING_DATA = radius_accounting_info + creates_profile = instantiate_profile_obj.add_ssid(ssid_data=j, radius=True, + radius_auth_data=RADIUS_SERVER_DATA, + radius_accounting_data=RADIUS_ACCOUNTING_DATA) + test_cases["wpa3_eap"] = True + except Exception as e: + print(e) + test_cases["wpa3_eap"] = False ap_ssh = get_apnos(get_configuration['access_point'][0], pwd="../libs/apnos/", sdk="2.x") connected, latest, active = ap_ssh.get_ucentral_status() if connected == False: @@ -560,3 +621,25 @@ class Fixtures_2x: request.addfinalizer(teardown_session) return test_cases + + # Print iterations progress + def printProgressBar(self, iteration, total, prefix='', suffix='', decimals=1, length=100, fill='█', printEnd="\r"): + """ + Call in a loop to create terminal progress bar + @params: + iteration - Required : current iteration (Int) + total - Required : total iterations (Int) + prefix - Optional : prefix string (Str) + suffix - Optional : suffix string (Str) + decimals - Optional : positive number of decimals in percent complete (Int) + length - Optional : character length of bar (Int) + fill - Optional : bar fill character (Str) + printEnd - Optional : end character (e.g. "\r", "\r\n") (Str) + """ + percent = ("{0:." + str(decimals) + "f}").format(100 * (iteration / float(total))) + filledLength = int(length * iteration // total) + bar = fill * filledLength + '-' * (length - filledLength) + print(f'\r{prefix} |{bar}| {percent}% {suffix}', end=printEnd) + # Print New Line on Complete + if iteration == total: + print() \ No newline at end of file