From 6bc8d9edbe30a0e725a0eef0ebe726d24c02ad5f Mon Sep 17 00:00:00 2001 From: yadavnikita <75775450+yadavnikita@users.noreply.github.com> Date: Wed, 8 Jun 2022 17:45:15 +0530 Subject: [PATCH] Wifi 8295 (#533) * adding dependencies bash file Signed-off-by: Nikita Yadav * added cc.1 option and fixture version Signed-off-by: Nikita Yadav * added controller 3x files Signed-off-by: Nikita Yadav * calling controller logic Signed-off-by: Nikita Yadav * added more functions Signed-off-by: Nikita Yadav * added cc.1 functionality to different fixtures Signed-off-by: Nikita Yadav * added cc.1 fixture to setup_profile Signed-off-by: Nikita Yadav * cc.1 fixture Signed-off-by: Nikita Yadav * lanforge scripts Signed-off-by: Nikita Yadav * test function Signed-off-by: Nikita Yadav * added realtive path and more function to controller access Signed-off-by: Nikita Yadav * testcase Signed-off-by: Nikita Yadav * added wpa2 functionality Signed-off-by: Nikita Yadav * get slot id and ssid name Signed-off-by: Nikita Yadav * made change to add_env_properties Signed-off-by: Nikita Yadav * added fixture line in setup_profiles Signed-off-by: Nikita Yadav * added fixture line in setup_profiles Signed-off-by: Nikita Yadav * added accurate way of creating wlan Signed-off-by: Nikita Yadav * added controller log option Signed-off-by: Nikita Yadav * added 2g options Signed-off-by: Nikita Yadav * added cc option to add tations Signed-off-by: Nikita Yadav * added cc option to add tations Signed-off-by: Nikita Yadav * added cc option Signed-off-by: Nikita Yadav * added cc option Signed-off-by: Nikita Yadav * adding lanforge changes Signed-off-by: Nikita Yadav * create non mesh dut Signed-off-by: Nikita Yadav * cal bssid 2g and 5g method Signed-off-by: Nikita Yadav * edited get ap config slot and controller log Signed-off-by: Nikita Yadav * adddd tst2g and test5g testcase Signed-off-by: Nikita Yadav * added modification to fixture3x Signed-off-by: Nikita Yadav * testbed info Signed-off-by: Nikita Yadav * added everything Signed-off-by: Nikita Yadav * remove laforge scripts Signed-off-by: Nikita Yadav * sync lanforge-scripts Signed-off-by: Nikita Yadav * comment out multi test Signed-off-by: Nikita Yadav * changed class name Signed-off-by: Nikita Yadav * remove roam directory Signed-off-by: Nikita Yadav * optimized code by calling basic_roam function from lftest Signed-off-by: Nikita Yadav * checking bssids after creating wlan Signed-off-by: Nikita Yadav * added time calculation, rssi and cx-time reporting Signed-off-by: Nikita Yadav * created layer3 Signed-off-by: Nikita Yadav * changed station name Signed-off-by: Nikita Yadav * added rx packets Signed-off-by: Nikita Yadav * increased wait time Signed-off-by: Nikita Yadav * chaged logic for calculating 5g and 2g bssid also added 11r_log method Signed-off-by: Nikita Yadav * aadded create_n_client,json_get,create_l3x,get_cx,get_l3_values,attachfile,basic_roam,multi_roam methods Signed-off-by: Nikita Yadav * adde lf_report() fixture Signed-off-by: Nikita Yadav * added test basic roam 5g Signed-off-by: Nikita Yadav * added roam setup Signed-off-by: Nikita Yadav * added report library Signed-off-by: Nikita Yadav * added multi_roam Signed-off-by: Nikita Yadav * added dtim function Signed-off-by: Nikita Yadav * added start sniffer stop sniff and query data Signed-off-by: Nikita Yadav * made it generic to support dtim and 11r Signed-off-by: Nikita Yadav * added logic code for hard roam and query sniff Signed-off-by: Nikita Yadav * added multi hard roam Signed-off-by: Nikita Yadav * uncommenting Signed-off-by: Nikita Yadav * added cisco-lab-02 Signed-off-by: Nikita Yadav * added ft over ds config Signed-off-by: Nikita Yadav * added ftotd_psk method Signed-off-by: Nikita Yadav * added something Signed-off-by: Nikita Yadav * added classsification of tests Signed-off-by: Nikita Yadav * modified multi_hard_roam Signed-off-by: Nikita Yadav * iteration, client and roaming delay created Signed-off-by: Nikita Yadav * hard roam 5g method Signed-off-by: Nikita Yadav * hard roam 2g method Signed-off-by: Nikita Yadav * added 0db0 ap info to testbed-02 Signed-off-by: Nikita Yadav * added 6e option and commentes while loop Signed-off-by: Nikita Yadav * enable_ft_sae Signed-off-by: Nikita Yadav * duration for roam Signed-off-by: Nikita Yadav * duration in 5g function Signed-off-by: Nikita Yadav * sae option Signed-off-by: Nikita Yadav * otd 5g test case Signed-off-by: Nikita Yadav * ota 6g testcase Signed-off-by: Nikita Yadav * Fixed the path for import Signed-off-by: shivam * controller.py: enable_ft_dot1x_wpa3, set_channel_width, set_channel Signed-off-by: Nikita Yadav * dot1x 6g testcase Signed-off-by: Nikita Yadav * updated lab_info Signed-off-by: Nikita Yadav * updated channel width and channel Signed-off-by: Nikita Yadav * updated hard roam Signed-off-by: Nikita Yadav * 2g 5g and 6g testcases in one file Signed-off-by: Nikita Yadav * string var in duration Signed-off-by: Nikita Yadav * 802.1x in multi hard roam Signed-off-by: Nikita Yadav * removing 5g testcase Signed-off-by: Nikita Yadav * removing 6g testcase Signed-off-by: Nikita Yadav * report.py: table using pandas Signed-off-by: Nikita Yadav * lf_test.py: added function of hard roam Signed-off-by: Nikita Yadav * hard roam using lanforge test function testcase Signed-off-by: Nikita Yadav * add dut name to hard roam function Signed-off-by: Nikita Yadav * add dut name to hard roam 5g Signed-off-by: Nikita Yadav * controller.py: enable_ft_dot1x_sha256_wpa3 Signed-off-by: Nikita Yadav * lf_test.py: added pass fail Signed-off-by: Nikita Yadav * lf_test.py: twog radio added to hard roam class Signed-off-by: Nikita Yadav * test_roam_ota.py: 2g, 5g, 6g testcase modified Signed-off-by: Nikita Yadav * fixtures_3x.py: ft-dot1x-sha256 feature added Signed-off-by: Nikita Yadav * lab_info.json: lab info updtaed Signed-off-by: Nikita Yadav * reports: remove report directory Signed-off-by: Nikita Yadav * controller.py:show_wireless_client_sum Signed-off-by: Nikita Yadav * lf_hard_roam: modified Signed-off-by: Nikita Yadav * disble enable all bands fixtures Signed-off-by: Nikita Yadav * lf_report to cc_1 Signed-off-by: Nikita Yadav * ota test case Signed-off-by: Nikita Yadav * mdification Signed-off-by: Nikita Yadav * lf_tests: identity and pass Signed-off-by: Nikita Yadav * conftest: varg Signed-off-by: Nikita Yadav * testcase correction Signed-off-by: Nikita Yadav * lab file Signed-off-by: Nikita Yadav * controller.py: enable_ft_dot1x_sha256_wpa3 enable radius Signed-off-by: Nikita Yadav * radius info Signed-off-by: Nikita Yadav * deleted all unused functions Signed-off-by: Nikita Yadav * modification to tetcase Signed-off-by: Nikita Yadav * radius data missing Signed-off-by: Nikita Yadav * remove example.py Signed-off-by: Nikita Yadav * remove lanforge reports Signed-off-by: Nikita Yadav * updated lab json Signed-off-by: Nikita Yadav * remove lanforge_log_0.txt Signed-off-by: Nikita Yadav * remove lanforge_log_1.txt Signed-off-by: Nikita Yadav * remove report.py Signed-off-by: Nikita Yadav * use pull_report Signed-off-by: Nikita Yadav * make it as master * remove cc dp Signed-off-by: Nikita Yadav * remove unwanted thing Signed-off-by: Nikita Yadav * remove unwanted things Signed-off-by: Nikita Yadav * Added pdfkit,matplotlib Signed-off-by: jitendracandela * Resolved the key error Signed-off-by: jitendracandela Co-authored-by: shivam Co-authored-by: jitendracandela --- dependencies.bash | 9 + libs/controller/controller_3x/__init__.py | 0 libs/controller/controller_3x/controller.py | 741 ++++++++++++++++++ libs/lanforge/lf_tests.py | 368 +++++++++ libs/lanforge/lf_tools.py | 34 +- requirements.txt | 4 +- tests/conftest.py | 178 ++++- tests/e2e/advanced/conftest.py | 19 +- .../roam_test/hard_roam/OTA/__init__.py | 0 .../roam_test/hard_roam/OTA/test_roam_ota.py | 320 ++++++++ .../roam_test/hard_roam/OTD/__init__.py | 0 .../hard_roam/OTD/test_5g_roam_otd.py | 76 ++ .../advanced/roam_test/hard_roam/__init__.py | 0 .../roam_by_attenuation/test_bridge _roam.py | 232 ------ .../test_bridge_5g_to_5g.py | 170 ++++ .../test_bridge_single_client.py | 208 +++++ tests/e2e/basic/conftest.py | 11 +- tests/fixtures_3x.py | 400 ++++++++++ 18 files changed, 2507 insertions(+), 263 deletions(-) create mode 100755 dependencies.bash create mode 100644 libs/controller/controller_3x/__init__.py create mode 100644 libs/controller/controller_3x/controller.py create mode 100644 tests/e2e/advanced/roam_test/hard_roam/OTA/__init__.py create mode 100644 tests/e2e/advanced/roam_test/hard_roam/OTA/test_roam_ota.py create mode 100644 tests/e2e/advanced/roam_test/hard_roam/OTD/__init__.py create mode 100644 tests/e2e/advanced/roam_test/hard_roam/OTD/test_5g_roam_otd.py create mode 100644 tests/e2e/advanced/roam_test/hard_roam/__init__.py delete mode 100644 tests/e2e/advanced/roam_test/roam_by_attenuation/test_bridge _roam.py create mode 100644 tests/e2e/advanced/roam_test/roam_by_attenuation/test_bridge_5g_to_5g.py create mode 100644 tests/e2e/advanced/roam_test/roam_by_attenuation/test_bridge_single_client.py create mode 100644 tests/fixtures_3x.py diff --git a/dependencies.bash b/dependencies.bash new file mode 100755 index 000000000..c057d4208 --- /dev/null +++ b/dependencies.bash @@ -0,0 +1,9 @@ +#!/bin/bash + + +if [ -d ../lanforge-scripts ] +then + rm -fr lanforge/lanforge-scripts + + cp -a ../lanforge-scripts lanforge/lanforge-scripts +fi \ No newline at end of file diff --git a/libs/controller/controller_3x/__init__.py b/libs/controller/controller_3x/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/libs/controller/controller_3x/controller.py b/libs/controller/controller_3x/controller.py new file mode 100644 index 000000000..4a483a109 --- /dev/null +++ b/libs/controller/controller_3x/controller.py @@ -0,0 +1,741 @@ +import sys +if sys.version_info[0] != 3: + print("This script requires Python 3") + exit() + +import logging +import importlib +import subprocess +import os + +sys.path.append(os.path.join(os.path.abspath("../../../lanforge/lanforge-scripts/"))) +logger = logging.getLogger(__name__) +lf_logger_config = importlib.import_module("py-scripts.lf_logger_config") + +class CController: + def __init__(self, controller_data, ap_data=None, timeout=None, ssid_data=None, type=None): + self.controller_data = controller_data + self.ap_data = ap_data + self.type = type + print("type", type) + print(self.ap_data) + self.ip = self.controller_data["ip"] + self.user = self.controller_data["username"] + self.password = self.controller_data["password"] + self.port = self.controller_data["ssh_port"] + self.type = self.controller_data["series"] + self.prompt = self.controller_data["prompt"] + # for i in range(controller_data["ap_name"]): + # self.ap_name = self.controller_data["ap_name"][i] + # for band in range(controller_data["band"]): + # self.band = self.controller_data["band"][i] + # self.ap_name = self.controller_data["ap_name"][0]["ap_name"] # hard coded + self.ap_name = None + if self.ap_name == None: + self.ap_name = self.ap_data[0]['ap_name'] + if type == 0: + print("yes") + self.ap_name = self.ap_data[0]['ap_name'] + print(self.ap_data[0]['ap_name']) + if type == 1: + self.ap_name = self.ap_data[1]['ap_name'] + print(self.ap_data[1]['ap_name']) + + print("ap_name ", self.ap_name) + self.band = self.controller_data["band"][0] + + self.scheme = self.controller_data["scheme"] + self.timeout = timeout + self.ssid_data= ssid_data + print("ssid data", self.ssid_data) + + series = importlib.import_module("cc_module_9800_3504") + self.cc = series.create_controller_series_object( + scheme=self.scheme, + dest=self.ip, + user=self.user, + passwd=self.password, + prompt=self.prompt, + series=self.type, + ap=self.ap_name, + port=self.port, + band=self.band, + timeout=self.timeout) + self.cc.bandwidth = None + if ssid_data is None: + self.cc.wlan = None + self.cc.wlanID = None + self.cc.wlanSSID = None + self.cc.security_key = None + else: + for i in range(len(ssid_data)): + print(i) + if ssid_data[i]["appliedRadios"] == ["2G"]: + self.cc.wlan = ssid_data[i]['ssid_name'] + self.cc.wlanID = "1" + self.cc.wlanSSID = ssid_data[i]['ssid_name'] + self.cc.security_key = ssid_data[i]['security_key'] + print("ss", self.cc.wlan) + elif ssid_data[i]["appliedRadios"] == ["5G"]: + self.cc.wlan = ssid_data[i]['ssid_name'] + self.cc.wlanID = "2" # hard coded + self.cc.wlanSSID = ssid_data[i]['ssid_name'] + self.cc.security_key = ssid_data[i]['security_key'] + print("ss", self.cc.wlan) + elif ssid_data[i]["appliedRadios"] == ["6G"]: + self.cc.wlan = ssid_data[i]['ssid_name'] + self.cc.wlanID = "3" + self.cc.wlanSSID = ssid_data[i]['ssid_name'] + self.cc.security_key = ssid_data[i]['security_key'] + + self.cc.wlanpw = None + if type == 0: + self.cc.tag_policy = self.ap_data[0]['tag_policy'] + self.cc.policy_profile = self.ap_data[0]['policy_profile'] + if type == 1: + self.cc.tag_policy = self.ap_data[1]['tag_policy'] + self.cc.policy_profile = self.ap_data[1]['policy_profile'] + self.cc.tx_power = None + self.cc.channel = None + self.cc.bandwidth = None + self.cc.action = None + self.cc.value = None + self.cc.command = [] + self.cc.command_extend = [] + self.cc.pwd = "../lanforge/lanforge-scripts" + + + def no_logging_console(self): + log = self.cc.no_logging_console() + print(log) + return log + + def line_console(self): + line = self.cc.line_console_0() + return line + + def show_shutdown_5ghz_ap(self): + self.cc.ap_band_slot = "2" + fiveghz = self.cc.show_ap_dot11_5gz_shutdown() + print(fiveghz) + return fiveghz + + def show_shutdown_2ghz_ap(self): + fiveghz = self.cc.show_ap_dot11_24gz_shutdown() + return fiveghz + + def show_shutdown_6ghz_ap(self): + self.cc.ap_band_slot = "3" + sixg = self.cc.show_ap_dot11_6gz_shutdown() + print(sixg) + return sixg + + + def disable_wlan(self, wlan): + self.cc.wlan = wlan + print(wlan) + print("disable wlan") + print("wlan", wlan) + wlan1 = self.cc.wlan_shutdown() + return wlan1 + + def ap_5ghz_shutdown(self): + print("shutdown 5ghz network") + shut = self.cc.ap_dot11_5ghz_shutdown() + return shut + + def ap_2ghz_shutdown(self): + print("shutdown 2ghz network") + shut = self.cc.ap_dot11_24ghz_shutdown() + return shut + + def ap_6ghz_shutdown(self): + print("shut down 6ghz network") + shut = self.cc.ap_dot11_6ghz_shutdown() + return shut + + def no_ap_5ghz_shutdown(self): + print("no shutdown 5ghz network") + shut = self.cc.config_no_ap_dot11_5ghz_shutdown() + return shut + + def no_ap_2ghz_shutdown(self): + print("shutdown 2ghz network") + shut = self.cc.config_no_ap_dot11_24ghz_shutdown() + return shut + + def no_ap_6ghz_shutdown(self): + print("shut down 6ghz network") + shut = self.cc.config_no_ap_dot11_6ghz_shutdown + return shut + + def enable_all_bands(self): + print("enable all bands") + self.no_ap_5ghz_shutdown() + self.no_ap_2ghz_shutdown() + self.no_ap_6ghz_shutdown() + + def get_ssids(self): + print("show ssid's present") + wlan_summary = self.cc.show_wlan_summary() + print(wlan_summary) + return wlan_summary + + def delete_wlan(self, ssid): + print("delete wlan") + self.cc.wlan = ssid + wlan = self.cc.config_no_wlan() + return wlan + + def create_wlan_wpa2(self,id, wlan, wlanssid, key): + print("create a new wpa2 wlan") + self.cc.wlan = wlan + self.cc.wlanID = id + self.cc.wlanSSID = wlanssid + self.cc.security_key = key + ssid = self.cc.config_wlan_wpa2() + return ssid + + def create_wlan_wpa3(self,id, wlan, wlanssid, key): + self.cc.wlan = wlan + self.cc.wlanID = id + self.cc.wlanSSID = wlanssid + self.cc.security_key = key + ssid = self.cc.config_wlan_wpa3() + return ssid + + def config_wireless_tag_policy_and_policy_profile(self, wlan): + self.cc.wlan = wlan + policy = self.cc.config_wireless_tag_policy_and_policy_profile() + return policy + + def enable_wlan(self, wlan): + self.cc.wlan = wlan + enable = self.cc.config_enable_wlan_send_no_shutdown() + return enable + + def enable_5ghz_netwrk(self, id, wlan, wlanssid, key): + self.cc.wlan = wlan + self.cc.wlanID = id + self.cc.wlanSSID = wlanssid + self.cc.security_key = key + en_net = self.cc.config_no_ap_dot11_5ghz_shutdown() + return en_net + + def enable_2ghz_netwrk(self, id, wlan, wlanssid, key): + self.cc.wlan = wlan + self.cc.wlanID = id + self.cc.wlanSSID = wlanssid + self.cc.security_key = key + en_net = self.cc.config_no_ap_dot11_24ghz_shutdown() + return en_net + + def enable_6ghz_netwrk(self, id, wlan, wlanssid, key): + self.cc.wlan = wlan + self.cc.wlanID = id + self.cc.wlanSSID = wlanssid + self.cc.security_key = key + en_net = self.cc.config_no_ap_dot11_6ghz_shutdown() + return en_net + + def enable_ap_5ghz(self): + ap = self.cc.config_ap_no_dot11_5ghz_shutdown() + return ap + + def enable_ap_2ghz(self): + ap = self.cc.config_ap_no_dot11_24ghz_shutdown() + return ap + + def enable_ap_6ghz(self): + ap = self.cc.config_ap_no_dot11_6ghz_shutdown() + return ap + + + def show_5ghz_summary(self): + sum= self.cc.show_ap_dot11_5gz_summary() + return sum + + def show_2ghz_summary(self): + sum= self.cc.show_ap_dot11_24gz_summary() + return sum + + def show_6ghz_summary(self): + sum= self.cc.show_ap_dot11_6gz_summary() + return sum + + def check_admin_state_2ghz(self, ap_name): + summ = self.show_2ghz_summary() + print(sum) + ele_list = [y for y in (x.strip() for x in summ.splitlines()) if y] + print("ele_list", ele_list) + indices = [i for i, s in enumerate(ele_list) if str(ap_name) in s] + print("indices", indices) + y = ele_list[indices[3]] + list_ = [] + list_.append(y) + z = list_[0].split(" ") + state = None + if "Down" in z: + print("yes") + state = "Down" + if "Up" in z: + print("ap is up") + state = "Up" + return state + + def check_admin_state_5ghz(self, ap_name): + summ = self.show_5ghz_summary() + print(summ) + ele_list = [y for y in (x.strip() for x in summ.splitlines()) if y] + print(ele_list) + indices = [i for i, s in enumerate(ele_list) if str(ap_name) in s] + print(indices) + y = ele_list[indices[3]] + list_ = [] + list_.append(y) + z = list_[0].split(" ") + state = None + if "Down" in z: + print("yes") + state = "Down" + if "Up" in z: + print("ap is up") + state = "Up" + return state + + def check_admin_state_6ghz(self, ap_name): + summ = self.show_6ghz_summary() + print(sum) + ele_list = [y for y in (x.strip() for x in summ.splitlines()) if y] + print("ele_list", ele_list) + indices = [i for i, s in enumerate(ele_list) if str(ap_name) in s] + print("indices", indices) + # print(ele_list[]) + y = ele_list[indices[3]] + list_ = [] + list_.append(y) + z = list_[0].split(" ") + state = None + if "Down" in z: + print("yes") + state = "Down" + if "Up" in z: + print("ap is up") + state = "Up" + return state + + def create_wlan_open(self): + open = self.cc.config_wlan_open() + return open + + def get_number_of_wlan_present(self): + wlan_summary = self.cc.show_wlan_summary() + # value = wlan_summary.decode("utf-8") + ele_list = [y for y in (x.strip() for x in wlan_summary.splitlines()) if y] + indices = [i for i, s in enumerate(ele_list) if 'Number of WLANs' in s] + number = ele_list[22][17:18].strip() + print(number, "ssid's are present") + return number + # do some formatting here and return actual data + + + + def calculate_data(self, place): + wlan_number = self.get_number_of_wlan_present() + print(wlan_number) + for number in range(len(wlan_number)): + pass + wlan_sumry = self.get_ssids() + ele_list = [y for y in (x.strip() for x in wlan_sumry.splitlines()) if y] + indices = [i for i, s in enumerate(ele_list) if 'Profile Name' in s] + # print(indices) + data = indices[1] + data2 = data + 1 + data3 = data + 2 + data4 = data + 3 + data5 = data + 4 + acc_data = ele_list[int(data)] + acc_data2 = ele_list[int(data2)] + acc_data3 = ele_list[int(data3)] + acc_data4 = ele_list[int(data4)] + acc_data5 = ele_list[int(data5)] + print("data 4 ",acc_data4) + print("data 5",acc_data5) + ident_list = [] + if acc_data == 'ID Profile Name SSID Status Security': + if acc_data2 == "----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------": + id_list = acc_data3.split() + print(id_list) + if id_list[0] == "1": + ident_list.append(id_list[int(place)]) + else: + ident_list.append("0") + id_list2 = acc_data4.split() + print(id_list2) + if id_list2[0] == "2": + ident_list.append(id_list2[int(place)]) + else: + ident_list.append("0") + id_list3 = acc_data5.split() + print("hi",id_list3) + if id_list3[0] == "3": + ident_list.append(id_list3[int(place)]) + print("ident_list 1", ident_list) + elif id_list2[0] == "3": + ident_list.append(id_list2[int(place)]) + print("ident_list 2", ident_list) + elif id_list[0] == "3": + ident_list.append(id_list[int(place)]) + print("ident_list 3", ident_list) + + else: + print("ident_list", ident_list) + ident_list.append("0") + else: + print("There is no Profile name") + # print(ident_list) + return ident_list + + def get_ap_bssid_2g(self): + bssid_2g = self.cc.show_ap_bssid_24ghz() + return bssid_2g + + def get_ap_bssid_5g(self): + bssid_5g = self.cc.show_ap_bssid_5ghz() + return bssid_5g + + def get_ap_bssid_6g(self): + bssid_6g = self.cc.show_ap_bssid_6ghz() + return bssid_6g + + def cal_bssid_2g(self): + wlan_sumry = self.get_ap_bssid_2g() + print("wlan_sumry", wlan_sumry) + ele_list = [y for y in (x.strip() for x in wlan_sumry.splitlines()) if y] + indices = [i for i, s in enumerate(ele_list) if 'BSSID' in s] + data = indices[1] + data2 = data + 1 + data3 = data + 2 + data4 = data + 3 + data5 = data + 4 + data6 = data + 4 + acc_data = ele_list[int(data)] + acc_data2 = ele_list[int(data2)] + acc_data3 = ele_list[int(data3)] + acc_data4 = ele_list[int(data4)] + acc_data5 = ele_list[int(data5)] + acc_data6 = ele_list[int(data6)] + print(acc_data3) + print(acc_data4) + print(acc_data5) + print(acc_data6) + id_list = acc_data3.split() + id_list1 = acc_data4.split() + id_list2 = acc_data5.split() + #print(id_list) + print(id_list, id_list1, id_list2) + wlan_id_list = [] + wlan_bssid = [] + if acc_data == "WLAN ID BSSID": + if acc_data2 == "-------------------------": + if id_list[0] == "1": + wlan_id_list.append(id_list) + wlan_bssid.append(id_list[1]) + elif id_list1[0] == "1": + wlan_id_list.append(id_list1) + wlan_bssid.append(id_list1[1]) + elif id_list2[0] == "1": + wlan_id_list.append(id_list2) + wlan_bssid.append(id_list2[1]) + else: + print("no wlan on slot 1 present") + y = wlan_bssid[0].replace(".", '') + bssid = ':'.join(a + b for a, b in zip(y[::2], y[1::2])) + return bssid + + def cal_bssid_5g(self): + wlan_sumry = self.get_ap_bssid_5g() + ele_list = [y for y in (x.strip() for x in wlan_sumry.splitlines()) if y] + indices = [i for i, s in enumerate(ele_list) if 'BSSID' in s] + data = indices[1] + data2 = data + 1 + data3 = data + 2 + data4 = data + 3 + data5 = data + 4 + data6 = data + 5 + acc_data = ele_list[int(data)] + acc_data2 = ele_list[int(data2)] + acc_data3 = ele_list[int(data3)] + acc_data4 = ele_list[int(data4)] + acc_data5 = ele_list[int(data5)] + acc_data6 = ele_list[int(data6)] + id_list = acc_data3.split() + id_list1 = acc_data4.split() + id_list2 = acc_data5.split() + id_list3 = acc_data6.split() + wlan_id_list = [] + wlan_bssid = [] + if acc_data == "WLAN ID BSSID": + if acc_data2 == "-------------------------": + + # print(id_list) + if id_list[0] == "2": + wlan_id_list.append(id_list) + wlan_bssid.append(id_list[1]) + elif id_list1[0] == "2": + wlan_id_list.append(id_list1) + wlan_bssid.append(id_list1[1]) + elif id_list2[0] == "2": + wlan_id_list.append(id_list2) + wlan_bssid.append(id_list2[1]) + elif id_list3[0] == "2": + wlan_id_list.append(id_list3) + wlan_bssid.append(id_list3[1]) + else: + print("no wlan on slot 2 present") + y = wlan_bssid[0].replace(".", '') + bssid = ':'.join(a + b for a, b in zip(y[::2], y[1::2])) + return bssid + + def cal_bssid_6g(self): + wlan_sumry = self.get_ap_bssid_6g() + print(wlan_sumry) + ele_list = [y for y in (x.strip() for x in wlan_sumry.splitlines()) if y] + indices = [i for i, s in enumerate(ele_list) if 'BSSID' in s] + data = indices[1] + data2 = data + 1 + data3 = data + 2 + data4 = data + 3 + data5 = data + 4 + data6 = data + 5 + acc_data = ele_list[int(data)] + acc_data2 = ele_list[int(data2)] + acc_data3 = ele_list[int(data3)] + acc_data4 = ele_list[int(data4)] + acc_data5 = ele_list[int(data5)] + acc_data6 = ele_list[int(data6)] + id_list = acc_data3.split() + id_list1 = acc_data4.split() + id_list2 = acc_data5.split() + id_list3 = acc_data6.split() + wlan_id_list = [] + wlan_bssid = [] + print(id_list, id_list1 , id_list2) + if acc_data == "WLAN ID BSSID": + if acc_data2 == "-------------------------": + + print(id_list) + if id_list[0] == "3": + print("yes") + wlan_id_list.append(id_list) + wlan_bssid.append(id_list[1]) + elif id_list1[0] == "3": + wlan_id_list.append(id_list1) + wlan_bssid.append(id_list1[1]) + elif id_list2[0] == "3": + wlan_id_list.append(id_list2) + wlan_bssid.append(id_list2[1]) + elif id_list3[0] == "3": + wlan_id_list.append(id_list3) + wlan_bssid.append(id_list3[1]) + else: + print("no wlan on slot 3 present") + y = wlan_bssid[0].replace(".", '') + bssid = ':'.join(a + b for a, b in zip(y[::2], y[1::2])) + return bssid + + + def get_slot_id_wlan(self): + id = self.calculate_data(place=0) + return id + + def get_ssid_name_on_id(self): + ssid = self.calculate_data(place=1) + return ssid + + + def show_ap_summary(self): + summary = self.cc.show_ap_summary() + return summary + + def show_ap_config_slots(self): + slot = self.cc.show_ap_config_slots() + return slot + + # gives info of ap wlan bssid and state + def show_ap_wlan_summary(self): + w_sum = self.cc.show_ap_wlan_summary() + print(w_sum) + return w_sum + + def show_11r_log(self): + show = self.cc.show_11r_logs() + print(show) + return show + + def enable_ft_psk(self, ssid, key): + self.cc.wlan = ssid + self.cc.security_key = key + en = self.cc.enable_ft_psk_cc() + return en + + def enable_ftotd_psk(self, ssid, key): + # ft over the ds + self.cc.wlan = ssid + self.cc.security_key = key + en = self.cc.enable_ftotd_psk_cc() + print(en) + return en + + def enable_ft_sae(self, ssid, key): + self.cc.wlan = ssid + self.cc.security_key = key + en = self.cc.enable_ft_sae_cc() + return en + + def set_dtim_5ghz(self, wlan, value): + self.cc.wlan = wlan + self.value = value + dtim = self.cc.config_dtim_dot11_5ghz() + return dtim + + def set_channel(self, band=None, channel=None, slot=None): + self.cc.channel = channel + self.cc.ap_band_slot = slot + if band == "6g": + channel = self.cc.config_dot11_6ghz_channel() + if band == "5g": + channel = self.cc.config_dot11_5ghz_channel() + if band == "2g": + channel = self.cc.config_dot11_24ghz_channel() + return channel + + def set_channel_width(self, band=None, width=None, slot=None): + bdwth = None + self.cc.bandwidth = width + self.cc.ap_band_slot = slot + if band == "6g": + bdwth = self.cc.config_dot11_6ghz_channel_width() + if band == "5g": + bdwth = self.cc.config_dot11_5ghz_channel_width() + if band == "2g": + bdwth = self.cc.config_dot11_24ghz_channel_width() + return bdwth + + def enable_ft_dot1x_wpa3(self, ssid): + self.cc.wlan = ssid + en = self.cc.enable_ft_dot1x_wpa3_cc() + print(en) + return en + + def enable_ft_dot1x_sha256_wpa3(self, ssid, radius): + self.cc.wlan = ssid + self.cc.value = radius + en = self.cc.enable_ft_dot1x_sha256_wpa3_cc(radius=radius) + print(en) + return en + + def show_wireless_client_sum(self): + en = self.cc.show_wireless_client_sum_cc() + return en + + def get_mc_address(self): + wlan_sumry = self.show_wireless_client_sum() + print(wlan_sumry) + ele_list = [y for y in (x.strip() for x in wlan_sumry.splitlines()) if y] + print(ele_list) + indices = [i for i, s in enumerate(ele_list) if 'MAC Address' in s] + data = indices[1] + data2 = data + 1 + data3 = data + 2 + data4 = data + 3 + # ele_list[data] + y = ele_list[data3] + print(y) + list_ = [] + list_.append(y) + z = list_[0].split(" ") + print(z[0]) + return z[0] + + def show_wireless_client_detail(self): + mac = self.get_mc_address() + detail = self.cc.show_wireless_client_mac_details(mac=mac) + return detail + + +if __name__ == '__main__': + controller = { + "url": "https://172.16.0.2", + "ip": "localhost", + "username": "admin", + "password": "Cisco123", + "ssh_port": "8888", + "series": "9800", + "prompt": "WLC2", + "band": ["5g"], + "scheme": "ssh" + } + access_point = [ + { + "ap_name": "AP687D.B45C.1D1C", + "chamber": "C1", + "model": "cisco9136i", + "mode": "wifi6", + "serial": "FOC25322JQP", + "tag_policy": "RM204-TB2-AP1", + "policy_profile": "default-policy-profile", + "ssid": { + "2g-ssid": "candela2ghz", + "5g-ssid": "open-wlan", + "6g-ssid": "candela6ghz", + "2g-password": "hello123", + "5g-password": "[BLANK]", + "6g-password": "hello123", + "2g-encryption": "WPA2", + "5g-encryption": "open", + "6g-encryption": "WPA3", + "2g-bssid": "68:7d:b4:5f:5c:31 ", + "5g-bssid": "68:7d:b4:5f:5c:3c", + "6g-bssid": "68:7d:b4:5f:5c:38" + }, + + "ip": "192.168.100.109", + "username": "lanforge", + "password": "lanforge", + "port": 22, + "jumphost_tty": "/dev/ttyAP1", + "version": "17.7.1.11" + }] + obj = CController(controller_data=controller, ap_data=access_point, timeout="10", ssid_data=None) + obj.get_ap_bssid_2g() + # x = obj.get_all_ssids_from_controller() + # print(x) + # obj.no_logging_console() + # obj.line_console() + # obj.delete_wlan() + # obj.no_logging_console() + # obj.get_ssids() + # obj.delete_wlan() + # obj.create_wlan_open() + # obj.get_ssids() + # obj.get_number_of_wlan_present() + + +# if __name__ == '__main__': +# logger_config = lf_logger_config.lf_logger_config() +# series = importlib.import_module("cc_module_9800_3504") +# cc = series.create_controller_series_object( +# scheme="ssh", +# dest="localhost", +# user="admin", +# passwd="xyz", +# prompt="WLC2", +# series="9800", +# ap="AP2C57.4152.385C", +# port="8888", +# band="5g", +# timeout="10") +# cc.show_ap_config_slots() +# cc.show_wlan_summary() +# diff --git a/libs/lanforge/lf_tests.py b/libs/lanforge/lf_tests.py index a671f3cbd..48a156a24 100644 --- a/libs/lanforge/lf_tests.py +++ b/libs/lanforge/lf_tests.py @@ -5,6 +5,9 @@ import datetime import sys import os +import time +import datetime +from datetime import datetime import threading import allure @@ -27,6 +30,7 @@ sys.path.append(f"../lanforge/lanforge-scripts/py-scripts/tip-cicd-sanity") sys.path.append(f'../libs') sys.path.append(f'../tools') sys.path.append(f'../libs/lanforge/') +sys.path.append(os.path.join(os.path.abspath("../../../lanforge/lanforge-scripts/"))) from sta_connect2 import StaConnect2 import time import string @@ -35,6 +39,7 @@ import csv from datetime import datetime from pull_report import Report from scp_util import SCP_File +import pyshark as ps S = 12 # from eap_connect import EAPConnect @@ -65,6 +70,21 @@ realm = importlib.import_module("py-json.realm") cv_test_reports = importlib.import_module("py-json.cv_test_reports") lf_report = cv_test_reports.lanforge_reports Realm = realm.Realm +from LANforge import LFUtils +from lf_cleanup import lf_clean +from wifi_monitor_profile import WifiMonitor +from sta_scan_test import StaScan +from lf_sniff_radio import SniffRadio +cv_test_reports = importlib.import_module("py-json.cv_test_reports") +lf_report = cv_test_reports.lanforge_reports +from lf_pcap import LfPcap +from lf_hard_roam_test import HardRoam +from lf_csv import lf_csv + + +@allure.step +def nested_step_allure(bssid, rssi): + pass @@ -105,6 +125,7 @@ class RunTest: self.fiveg_radios = configuration_data['traffic_generator']['details']["5G-Radio"] self.ax_radios = configuration_data['traffic_generator']['details']["AX-Radio"] self.upstream_port = configuration_data['traffic_generator']['details']["upstream"].split(".")[2] + self.upstream = configuration_data['traffic_generator']['details']["upstream"] self.upstream_resource = configuration_data['traffic_generator']['details']["upstream"].split(".")[1] self.twog_prefix = configuration_data['traffic_generator']['details']["2.4G-Station-Name"] self.fiveg_prefix = configuration_data['traffic_generator']['details']["5G-Station-Name"] @@ -1192,6 +1213,353 @@ class RunTest: self.Client_disconnect(station_name=station_name) return atten_serial_radio + def create_n_clients(self, start_id=0, sta_prefix=None, num_sta=None, dut_ssid=None, + dut_security=None, dut_passwd=None, band=None, radio=None, lf_tools=None, type=None): + + local_realm = realm.Realm(lfclient_host=self.lanforge_ip, lfclient_port=self.lanforge_port) + station_profile = local_realm.new_station_profile() + if band == "fiveg": + radio = self.fiveg_radios[0] + if band == "twog": + radio = self.twog_radios[0] + if band == "sixg": + radio = self.ax_radios[0] + + # pre clean + sta_list = lf_tools.get_station_list() + print(sta_list) + if not sta_list: + print("no stations on lanforge") + else: + station_profile.cleanup(sta_list, delay=1) + LFUtils.wait_until_ports_disappear(base_url=local_realm.lfclient_url, + port_list=sta_list, + debug=True) + time.sleep(2) + print("pre cleanup done") + + station_list = LFUtils.portNameSeries(prefix_=sta_prefix, start_id_=start_id, + end_id_=num_sta - 1, padding_number_=10000, + radio=radio) + + if type == "11r-sae-802.1x": + dut_passwd = "[BLANK]" + station_profile.use_security(dut_security, dut_ssid, dut_passwd) + station_profile.set_number_template("00") + + station_profile.set_command_flag("add_sta", "create_admin_down", 1) + + station_profile.set_command_param("set_port", "report_timer", 1500) + + # connect station to particular bssid + # self.station_profile.set_command_param("add_sta", "ap", self.bssid[0]) + + station_profile.set_command_flag("set_port", "rpt_timer", 1) + if type == "11r": + station_profile.set_command_flag("add_sta", "80211u_enable", 0) + station_profile.set_command_flag("add_sta", "8021x_radius", 1) + station_profile.set_command_flag("add_sta", "disable_roam", 1) + station_profile.set_wifi_extra(key_mgmt="FT-PSK ", + pairwise="", + group="", + psk="", + eap="", + identity="", + passwd="", + pin="" + ) + if type == "11r-sae": + station_profile.set_command_flag("add_sta", "ieee80211w", 2) + station_profile.set_command_flag("add_sta", "80211u_enable", 0) + station_profile.set_command_flag("add_sta", "8021x_radius", 1) + station_profile.set_command_flag("add_sta", "disable_roam", 1) + station_profile.set_wifi_extra(key_mgmt="FT-SAE ", + pairwise="", + group="", + psk="", + eap="", + identity="", + passwd="", + pin="" + ) + + if type == "11r-sae-802.1x": + station_profile.set_command_flag("set_port", "rpt_timer", 1) + station_profile.set_command_flag("add_sta", "ieee80211w", 2) + station_profile.set_command_flag("add_sta", "80211u_enable", 0) + station_profile.set_command_flag("add_sta", "8021x_radius", 1) + station_profile.set_command_flag("add_sta", "disable_roam", 1) + # station_profile.set_command_flag("add_sta", "ap", "68:7d:b4:5f:5c:3f") + station_profile.set_wifi_extra(key_mgmt="FT-EAP ", + pairwise="[BLANK]", + group="[BLANK]", + psk="[BLANK]", + eap="TTLS", + identity="testuser", + passwd="testpasswd", + pin="" + ) + station_profile.create(radio=radio, sta_names_=station_list) + local_realm.wait_until_ports_appear(sta_list=station_list) + station_profile.admin_up() + if local_realm.wait_for_ip(station_list): + print("All stations got IPs") + return True + else: + print("Stations failed to get IPs") + return False + + def json_get(self, _req_url="/"): + cli_base = LFCliBase(_lfjson_host=self.lanforge_ip, _lfjson_port=self.lanforge_port, ) + json_response = cli_base.json_get(_req_url=_req_url) + return json_response + + def create_layer3(self, side_a_min_rate, side_a_max_rate, side_b_min_rate, side_b_max_rate, + traffic_type, sta_list,): + # checked + print(sta_list) + print(type(sta_list)) + print(self.upstream) + local_realm = realm.Realm(lfclient_host=self.lanforge_ip, lfclient_port=self.lanforge_port) + cx_profile = local_realm.new_l3_cx_profile() + cx_profile.host = self.lanforge_ip + cx_profile.port = self.lanforge_port + layer3_cols = ['name', 'tx bytes', 'rx bytes', 'tx rate', 'rx rate'] + cx_profile.side_a_min_bps = side_a_min_rate + cx_profile.side_a_max_bps = side_a_max_rate + cx_profile.side_b_min_bps = side_b_min_rate + cx_profile.side_b_max_bps = side_b_max_rate + + # create + cx_profile.create(endp_type=traffic_type, side_a=sta_list, + side_b=self.upstream, + sleep_time=0) + cx_profile.start_cx() + + def get_cx_list(self): + local_realm = realm.Realm(lfclient_host=self.lanforge_ip, lfclient_port=self.lanforge_port) + layer3_result = local_realm.cx_list() + layer3_names = [item["name"] for item in layer3_result.values() if "_links" in item] + print(layer3_names) + return layer3_names + + + def get_layer3_values(self, cx_name=None, query=None): + url = f"/cx/{cx_name}" + response = self.json_get(_req_url=url) + result = response[str(cx_name)][str(query)] + return result + + + def crete_file_attach(self, bssid,rssi, rx_rate, rx_bytes, rx_packets, cx_time, filename, name ): + try: + filename = filename + with open(filename, 'w') as f: + lines = ["bssid: " + str(bssid), 'rssi: ' + str(rssi), "rx rate: " + str(rx_rate), + "rx packets: " + str(rx_packets), "rx bytes: " + str(rx_bytes), "connection time: " + str(cx_time)] + for line in lines: + f.write(line) + f.write('\n') + allure.attach.file(name=str(name), source=str(filename)) + except FileNotFoundError: + print("The 'docs' directory does not exist") + + + def hard_roam(self, run_lf, instantiate_profile, lf_tools, lf_reports, get_configuration, test=None, band=None, num_sta=1, + security=None, security_key=None, iteration=1, ssid_name=None, + roaming_delay=None, option=None, channel=36, duration=None, duration_based=False, + iteration_based=True, dut_name=None, identity=None, ttls_pass=None): + allure.attach(name="Test Procedure", body="This test consists of creating a multiple client which will be " \ + " connected to the nearest ap, here the test automation will " \ + "do hard roam based on forced roam method" \ + "check if client performed roam by monitoring client bssid, for n number of iterions") + allure.attach(name="Pass Fail Criteria", + body="Test is said to be pass if it satisfy 4 criterias as - 1 after roam bssid should change , 2- reassociation respone should be present, 3- auth request should be present, 4- roaming time should be less then 50 ms " \ + "other wise the test is said to be failed ") + + # attaching 11r log before tart of test + instantiate_profile_obj = instantiate_profile(controller_data=get_configuration['controller'], + timeout="10", + ap_data=get_configuration['access_point'], + type=0) + log = instantiate_profile_obj.show_11r_log() + + # get bssid from ap for 2g and 5g + radio = "" + c1_bssid = "" + c2_bssid = "" + if test == "2g": + c1_2g_bssid = "" + c2_2g_bssid = "" + if run_lf: + c1_2g_bssid = get_configuration["access_point"][0]["ssid"]["2g-bssid"] + allure.attach(name="bssid of ap1", body=c1_2g_bssid) + c2_2g_bssid = get_configuration["access_point"][1]["ssid"]["2g-bssid"] + allure.attach(name="bssid of ap2", body=c2_2g_bssid) + + else: + # instantiate controller class and check bssid's for each ap in testbed + for ap_name in range(len(get_configuration['access_point'])): + instantiate_profile_obj = instantiate_profile(controller_data=get_configuration['controller'], + timeout="10", + ap_data=get_configuration['access_point'], + type=ap_name) + bssid_2g = instantiate_profile_obj.cal_bssid_2g() + if ap_name == 0: + c1_2g_bssid = bssid_2g + if ap_name == 1: + c2_2g_bssid = bssid_2g + c1_bssid = c1_2g_bssid + c2_bssid = c2_2g_bssid + elif test == "5g": + c1_5g_bssid = "" + c2_5g_bssid = "" + if run_lf: + c1_5g_bssid = get_configuration["access_point"][0]["ssid"]["5g-bssid"] + allure.attach(name="bssid of ap1", body=c1_5g_bssid) + c2_5g_bssid = get_configuration["access_point"][1]["ssid"]["5g-bssid"] + allure.attach(name="bssid of ap2", body=c2_5g_bssid) + else: + for ap_name in range(len(get_configuration['access_point'])): + instantiate_profile_obj = instantiate_profile(controller_data=get_configuration['controller'], + timeout="10", + ap_data=get_configuration['access_point'], + type=ap_name) + bssid_5g = instantiate_profile_obj.cal_bssid_5g() + if ap_name == 0: + c1_5g_bssid = bssid_5g + if ap_name == 1: + c2_5g_bssid = bssid_5g + c1_bssid = c1_5g_bssid + c2_bssid = c2_5g_bssid + + elif test == "6g": + c1_6g_bssid = "" + c2_6g_bssid = "" + if run_lf: + c1_6g_bssid = get_configuration["access_point"][0]["ssid"]["6g-bssid"] + allure.attach(name="bssid of ap1", body=c1_6g_bssid) + c2_6g_bssid = get_configuration["access_point"][1]["ssid"]["6g-bssid"] + allure.attach(name="bssid of ap2", body=c2_6g_bssid) + else: + for ap_name in range(len(get_configuration['access_point'])): + instantiate_profile_obj = instantiate_profile(controller_data=get_configuration['controller'], + timeout="10", + ap_data=get_configuration['access_point'], + type=ap_name) + bssid_6g = instantiate_profile_obj.cal_bssid_6g() + if ap_name == 0: + c1_6g_bssid = bssid_6g + if ap_name == 1: + c2_6g_bssid = bssid_6g + c1_bssid = c1_6g_bssid + c2_bssid = c2_6g_bssid + + print("bssid of c1 ", c1_bssid) + allure.attach(name="bssid of ap1", body=c1_bssid) + print("bssid of c2", c2_bssid) + allure.attach(name="bssid of ap2", body=c2_bssid) + allure.attach(name="11r logs before roam test", body=str(log)) + fiveg_radio, sixg_radio, twog_radio, sniff_radio = None, None, None, None + supplicant_radio = None + if band == "twog": + twog_radio = self.twog_radios[0] + supplicant_radio = twog_radio.split(".")[2] + radio_ = self.ax_radios[0] + sniff_radio = radio_.split(".")[2] + if band == "fiveg": + fiveg_radio = self.fiveg_radios[0] + supplicant_radio = fiveg_radio.split(".")[2] + radio_ = self.ax_radios[0] + sniff_radio = radio_.split(".")[2] + if band == "sixg": + sixg_radio = self.ax_radios[1] + supplicant_radio = sixg_radio.split(".")[2] + radio_ = self.ax_radios[2] + sniff_radio = radio_.split(".")[2] + obj = HardRoam(lanforge_ip=self.lanforge_ip, + lanforge_port=self.lanforge_port, + lanforge_ssh_port = 22, + c1_bssid=c1_bssid, + c2_bssid=c2_bssid, + fiveg_radio=fiveg_radio, + twog_radio=twog_radio, + sixg_radio=sixg_radio, + band=band, + sniff_radio=sniff_radio, + num_sta=num_sta, + security=security, + security_key=security_key, + ssid=ssid_name, + upstream=self.upstream, + duration=duration, + iteration=iteration, + channel=channel, + option=option, + duration_based=duration_based, + iteration_based=iteration_based, + dut_name = dut_name, + traffic_type="lf_udp", + path="../lanforge/lanforge-scripts", + scheme="ssh", + dest="localhost", + user="admin", + passwd="Cisco123", + prompt="WLC2", + series_cc="9800", + ap="AP687D.B45C.1D1C", + port="8888", + band_cc="5g", + timeout="40", + identity = identity, + ttls_pass = ttls_pass + ) + x = os.getcwd() + print(x) + file = obj.generate_csv() + kernel, message = obj.run(file_n=file) + allure.attach(name="message", body=str(message)) + # file = ["test_client_0.csv"] + report_dir_name = obj.generate_report(csv_list=file, kernel_lst=kernel, current_path=str(x) + "/tests") + print(report_dir_name) + lf_csv_obj = lf_csv() + for i, y in zip(file, range(len(file))): + data = lf_csv_obj.read_csv_row(file_name=str(x) + "/" + str(report_dir_name) + "/csv_data/" + str(i)) + tab = lf_reports.table2(table=data) + allure.attach(name="client " + str(y) + " table", body=str(tab)) + # report_dir_name = "2022-04-30-18-51-07_Hard Roam Test" + relevant_path = report_dir_name + "/" + entries = os.listdir(report_dir_name + "/") + pdf = None + for i in entries: + if ".pdf" in i: + pdf = i + allure.attach.file(source=relevant_path + pdf, name="hard_roam_report") + instantiate_profile_obj = instantiate_profile(controller_data=get_configuration['controller'], + timeout="10", + ap_data=get_configuration['access_point'], + type=0) + z = instantiate_profile_obj.show_wireless_client_detail() + allure.attach(name="wireless client details", body=str(z)) + log = instantiate_profile_obj.show_11r_log() + allure.attach(name="11r logs after roam test", body=str(log)) + allure.attach(name="test_result_folder", body=str(report_dir_name)) + try: + supplicant = "/home/lanforge/wifi/wpa_supplicant_log_" + supplicant_radio + ".txt" + obj = SCP_File(ip=self.lanforge_ip, port=self.lanforge_ssh_port, username="root", password="lanforge", + remote_path=supplicant, + local_path=relevant_path) + obj.pull_file() + # obj.ssh_connect(command="journalctl --since '1 hour ago' > kernel_log.txt") + # kernel_log = "/home/lanforge/kernel_log.txt" + # obj1 = SCP_File(ip=self.lanforge_ip, port=self.lanforge_ssh_port, username="root", password="lanforge", + # remote_path=kernel_log, + # local_path=relevant_path) + # obj1.pull_file() + allure.attach.file(source=relevant_path + "/wpa_supplicant_log_" + supplicant_radio + ".txt", + name="supplicant_log") + except Exception as e: + print(e) def set_radio_country_channel(self,_radio="wiphy0",_channel=0,_country_num=840,): # 840 - US data = { diff --git a/libs/lanforge/lf_tools.py b/libs/lanforge/lf_tools.py index fad14d0d7..4a190a97d 100755 --- a/libs/lanforge/lf_tools.py +++ b/libs/lanforge/lf_tools.py @@ -42,13 +42,13 @@ LOGGER.addHandler(stdout_handler) class ChamberView: - - def __init__(self, lanforge_data=None, access_point_data=None, run_lf=False, debug=True, testbed=None, ap_version=None): + def __init__(self, lanforge_data=None, access_point_data=None, run_lf=False, debug=True, testbed=None, cc_1=False, ap_version=None): print("lanforge data", lanforge_data) print("access point data", access_point_data) self.access_point_data = access_point_data self.access_point_data = access_point_data self.run_lf = run_lf + self.cc_1 = cc_1 print("testbed", testbed) if "type" in lanforge_data.keys(): if lanforge_data["type"] == "Non-mesh": @@ -83,6 +83,7 @@ class ChamberView: self.delete_old_scenario = True if access_point_data: print(len(access_point_data)) + for ap in range(len(access_point_data)): print(access_point_data[ap]) self.dut_name = access_point_data[ap]["model"] @@ -327,11 +328,13 @@ class ChamberView: LOGGER.warning("0 Stations") return idx = idx - if self.run_lf: + if self.run_lf or self.cc_1: if band == "2G": idx = 0 if band == "5G": idx = 1 + + for i in self.dut_idx_mapping: if self.dut_idx_mapping[i][0] == ssid_name and self.dut_idx_mapping[i][3] == band: idx = i @@ -706,6 +709,31 @@ class ChamberView: # [['ssid_idx=0 ssid=Default-SSID-2g security=WPA|WEP| password=12345678 bssid=90:3c:b3:94:48:58']] self.update_ssid(ssid_data=ssid_data[ssid]) + + def create_non_meh_dut(self, ssid_data=[]): + # print("hi") + for ap, ssid in zip(self.access_point_data, range(len(ssid_data))): + print("ap", ap) + print(ssid_data[ssid]) + self.dut_name = ap["model"] + print(self.dut_name) + self.ap_model = ap["model"] + self.version = ap["version"].split("/")[-1] + self.serial = ap["serial"] + self.CreateDut = DUT(lfmgr=self.lanforge_ip, + port=self.lanforge_port, + dut_name=self.dut_name, + sw_version=self.version, + hw_version=self.ap_model, + model_num=self.ap_model, + serial_num=self.serial + ) + self.Create_Dut() + # [['ssid_idx=0 ssid=Default-SSID-2g security=WPA|WEP| password=12345678 bssid=90:3c:b3:94:48:58']] + self.update_ssid(ssid_data=ssid_data[ssid]) + + + def set_radio_antenna(self, req_url, shelf, resources, radio, antenna): data = { "shelf": shelf, diff --git a/requirements.txt b/requirements.txt index e20d9b005..9acadf260 100644 --- a/requirements.txt +++ b/requirements.txt @@ -13,4 +13,6 @@ scp selenium tip-wlan-cloud xlsxwriter -tabulate \ No newline at end of file +tabulate +pdfkit +matplotlib \ No newline at end of file diff --git a/tests/conftest.py b/tests/conftest.py index e27883c98..f0165c39b 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -49,6 +49,7 @@ from lanforge.scp_util import SCP_File from testrails.testrail_api import APIClient from testrails.reporting import Reporting from lf_tools import ChamberView +from libs.lanforge.pull_report import Report from os import path from typing import Any, Callable, Optional @@ -57,6 +58,8 @@ from pytest import fixture from fixtures_1x import Fixtures_1x from fixtures_2x import Fixtures_2x +from fixtures_3x import Fixtures_3x +from controller.controller_3x.controller import CController ALLURE_ENVIRONMENT_PROPERTIES_FILE = 'environment.properties' ALLUREDIR_OPTION = '--alluredir' @@ -148,6 +151,33 @@ def pytest_addoption(parser): default=False, help="skip cloud controller and AP, run only lanforge tests on a ssid preconfigured" ) + parser.addoption( + "--cc.1", + action="store_true", + default=False, + help="Option to run Test Cases on cc version 1" + + ) + parser.addoption( + "--roaming_delay", + default=1, + help="Roaming delay interval" + ) + parser.addoption( + "--iteration", + default=1, + help="Roaming iterations" + ) + parser.addoption( + "--duration", + default=1, + help="Roaming duration in minutes" + ) + parser.addoption( + "--client", + default=1, + help="Number of clients to be created" + ) parser.addoption( "--skip-pcap", action="store_true", @@ -160,7 +190,6 @@ def pytest_addoption(parser): help="Device Model which is needed to test" ) - # Perfecto Parameters parser.addini("perfectoURL", "Cloud URL") parser.addini("securityToken", "Security Token") @@ -207,6 +236,7 @@ def test_cases(): def testbed(request): """yields the testbed option selection""" var = request.config.getoption("--testbed") + allure.attach(name="testbed name", body=var) yield var @pytest.fixture(scope="session") @@ -227,12 +257,47 @@ def run_lf(request): var = request.config.getoption("--run-lf") yield var +@pytest.fixture(scope="session") +def cc_1(request): + """yields the --cc.1 option for skipping configuration on AP and using Cloud controller of available framework""" + var = request.config.getoption("--cc.1") + yield var + +@pytest.fixture(scope="session") +def roaming_delay(request): + """yields the --roaming_delay option """ + var = request.config.getoption("--roaming_delay") + allure.attach(name="roaming delay provided in seconds", body=str(var)) + yield var + +@pytest.fixture(scope="session") +def iteration(request): + """yields the --iteration option for a test to provide how frequenty roam should happen """ + var = request.config.getoption("--iteration") + allure.attach(name="iteration", body=var) + yield var + +@pytest.fixture(scope="session") +def duration(request): + """yields the --duration option for a test to provide how long roam should happen """ + var = request.config.getoption("--duration") + allure.attach(name="duration in minutes", body=str(var)) + yield var + +@pytest.fixture(scope="session") +def client(request): + """yields the --client option for getting user specified client number""" + var = request.config.getoption("--client") + allure.attach(name="number of clients", body=var) + yield var + @pytest.fixture(scope="session") def skip_pcap(request): """yields the --skip-pcap option for skipping the packet capture for sanity""" var = request.config.getoption("--skip-pcap") yield var + @pytest.fixture(scope="session") def should_upgrade_firmware(request): """yields the --force-upgrade option for firmware upgrade selection""" @@ -313,10 +378,11 @@ def get_equipment_ref(request, setup_controller, testbed, get_configuration): @pytest.fixture(scope="session") -def get_sdk_version(fixtures_ver, run_lf): +def get_sdk_version(fixtures_ver, run_lf, cc_1): version = "" - if not run_lf: + if not run_lf and not cc_1: version = fixtures_ver.get_sdk_version() + print(version) yield version @@ -326,9 +392,12 @@ def get_uci_show(fixtures_ver, get_apnos, get_configuration): yield uci_show @pytest.fixture(scope="session") -def get_ap_version(fixtures_ver, get_apnos, get_configuration): - ap_version = fixtures_ver.get_ap_version(get_apnos, get_configuration) - yield ap_version +def get_ap_version(fixtures_ver, get_apnos, get_configuration, cc_1): + if not cc_1: + ap_version = fixtures_ver.get_ap_version(get_apnos, get_configuration) + yield ap_version + else: + yield True @pytest.fixture(scope="session") def skip_lf(request): @@ -345,7 +414,8 @@ def get_openflow(): def setup_controller(request, get_configuration, add_env_properties, fixtures_ver): """sets up the controller connection and yields the sdk_client object""" sdk_client = fixtures_ver.controller_obj - request.addfinalizer(fixtures_ver.disconnect) + if not cc_1: + request.addfinalizer(fixtures_ver.disconnect) yield sdk_client # Prov Controller Fixture @@ -645,16 +715,21 @@ def create_lanforge_chamberview_dut(lf_tools, skip_lf, run_lf): @pytest.fixture(scope="session") -def lf_tools(get_configuration, testbed, skip_lf, run_lf, get_ap_version): +def lf_tools(get_configuration, testbed, skip_lf, run_lf, get_ap_version, cc_1): """ Create a DUT on LANforge""" if not skip_lf: obj = ChamberView(lanforge_data=get_configuration["traffic_generator"]["details"], - testbed=testbed, run_lf=run_lf, - access_point_data=get_configuration["access_point"], ap_version=get_ap_version) + testbed=testbed, run_lf=run_lf, access_point_data=get_configuration["access_point"], cc_1=cc_1, ap_version=get_ap_version) else: obj = False yield obj +@pytest.fixture(scope="session") +def lf_reports(): + obj = Report() + yield obj + + @pytest.fixture(scope="session") def lf_test(get_configuration, setup_influx, request, skip_lf, run_lf, skip_pcap): @@ -711,16 +786,37 @@ def add_allure_environment_property(request: SubRequest) -> Optional[Callable]: @fixture(scope='session') def add_env_properties(get_configuration, get_sdk_version, get_apnos, fixtures_ver, add_allure_environment_property: Callable) -> None: - add_allure_environment_property('Access-Point-Model', get_configuration["access_point"][0]["model"]) - add_allure_environment_property('SDK-Version', get_sdk_version) + if cc_1: + for i in range(len(get_configuration["access_point"])): + add_allure_environment_property(str('Access-Point-Model'+ str(i+1)), get_configuration["access_point"][i]["model"]) + else: + add_allure_environment_property('Access-Point-Model', get_configuration["access_point"][0]["model"]) + add_allure_environment_property('SDK-Version', get_sdk_version) try: - add_allure_environment_property('Access-Point-Firmware-Version', + if not cc_1: + add_allure_environment_property('Access-Point-Firmware-Version', fixtures_ver.get_ap_version(get_apnos, get_configuration)[0].split("\n")[1]) except Exception as e: print(e) pass - add_allure_environment_property('Cloud-Controller-SDK-URL', get_configuration["controller"]["url"]) - add_allure_environment_property('AP-Serial-Number', get_configuration["access_point"][0]["serial"] + "\n") + if cc_1: + listkey = list(get_configuration["controller"].keys()) + if "version" in listkey: + add_allure_environment_property('Cloud-Controller-SDK-URL', get_configuration["controller"]["url"]) + add_allure_environment_property('Controller-Version', get_configuration["controller"]["version"]) + for i in range(len(get_configuration["access_point"])): + add_allure_environment_property(str('AP-Name-' + str(i + 1)), + get_configuration["access_point"][i]["ap_name"]) + for i in range(len(get_configuration["access_point"])): + add_allure_environment_property(str('AP-Serial-Number-' + str(i + 1)), + get_configuration["access_point"][i]["serial"]) + add_allure_environment_property('LANforge-Chipset-Info', + get_configuration["traffic_generator"]["details"]["Chip-set-info"]) + else: + pass + else: + add_allure_environment_property('Cloud-Controller-SDK-URL', get_configuration["controller"]["url"]) + add_allure_environment_property('AP-Serial-Number', get_configuration["access_point"][0]["serial"] + "\n") @fixture(scope="session") @@ -735,13 +831,16 @@ def add_firmware_property_after_upgrade(add_allure_environment_property, fixture @pytest.fixture(scope="session") -def fixtures_ver(request, get_configuration, run_lf): - if request.config.getoption("1.x") is False: +def fixtures_ver(request, get_configuration, run_lf, cc_1): + if request.config.getoption("1.x") is False and request.config.getoption("cc.1") is False: print("2.x") obj = Fixtures_2x(configuration=get_configuration, run_lf=run_lf) if request.config.getoption("1.x"): print("1.x") obj = Fixtures_1x(configuration=get_configuration) + if request.config.getoption("cc.1"): + print(" fixture version cc.1") + obj = Fixtures_3x(configuration=get_configuration, run_lf=run_lf, cc_1=cc_1) yield obj @@ -816,6 +915,21 @@ def get_apnos_logs(get_apnos, get_configuration): yield all_logs +@pytest.fixture(scope="function") +def get_controller_logs(get_configuration, ): + obj = CController(controller_data=get_configuration['controller'], ap_data=get_configuration['access_point']) + summary = obj.show_ap_summary() + print(summary) + allure.attach(name='show ap summary', body=str(summary)) + + +@pytest.fixture(scope="function") +def get_ap_config_slots(get_configuration): + obj = CController(controller_data=get_configuration['controller'], ap_data=get_configuration['access_point']) + slot = obj.show_ap_config_slots() + # print(slot) + allure.attach(name="ap_slots", body=str(slot)) + @pytest.fixture(scope="session") def get_apnos_max_clients(get_apnos, get_configuration): all_logs = [] @@ -859,4 +973,32 @@ def get_ap_channel(get_apnos, get_configuration): print(e) pass print(all_data) - yield all_data \ No newline at end of file + yield all_data + +@pytest.fixture(scope="function") +def disable_band5ghz(get_configuration): + obj = CController(controller_data=get_configuration['controller'], ap_data=get_configuration['access_point']) + shut= obj.ap_5ghz_shutdown() + print(shut) + +@pytest.fixture(scope="function") +def disable_band2ghz(get_configuration): + obj = CController(controller_data=get_configuration['controller'], ap_data=get_configuration['access_point']) + shut = obj.ap_2ghz_shutdown() + print(shut) + +@pytest.fixture(scope="function") +def disable_band6ghz(get_configuration): + obj = CController(controller_data=get_configuration['controller'], ap_data=get_configuration['access_point']) + shut = obj.ap_6ghz_shutdown() + print(shut) + +@pytest.fixture(scope="function") +def enable_all_bands(get_configuration): + obj = CController(controller_data=get_configuration['controller'], ap_data=get_configuration['access_point']) + obj.no_ap_5ghz_shutdown() + obj.no_ap_2ghz_shutdown() + obj.no_ap_6ghz_shutdown() + + + diff --git a/tests/e2e/advanced/conftest.py b/tests/e2e/advanced/conftest.py index 12935d065..6a2a5207e 100644 --- a/tests/e2e/advanced/conftest.py +++ b/tests/e2e/advanced/conftest.py @@ -11,6 +11,7 @@ if "libs" not in sys.path: from controller.controller_1x.controller import ProfileUtility from controller.controller_2x.controller import UProfileUtility +from controller.controller_3x.controller import CController import time from lanforge.lf_tests import RunTest from lanforge.lf_tools import ChamberView @@ -22,13 +23,12 @@ import allure def instantiate_profile(request): if request.config.getoption("1.x"): yield ProfileUtility + elif request.config.getoption("cc.1"): + yield CController else: yield UProfileUtility - - - @pytest.fixture(scope="session") def create_lanforge_chamberview(lf_tools): scenario_object, scenario_name = lf_tools.Chamber_View() @@ -46,7 +46,8 @@ def setup_vlan(): @pytest.fixture(scope="class") def setup_profiles(request, setup_controller, testbed, get_equipment_ref, fixtures_ver, instantiate_profile, get_markers, create_lanforge_chamberview_dut, lf_tools, - get_security_flags, get_configuration, radius_info, get_apnos, radius_accounting_info, run_lf): + get_security_flags, get_configuration, radius_info, get_apnos, radius_accounting_info, + run_lf, cc_1, lf_reports): lf_tools.reset_scenario() param = dict(request.param) @@ -70,9 +71,13 @@ def setup_profiles(request, setup_controller, testbed, get_equipment_ref, fixtur vlan_list.pop(i) if request.param["mode"] == "VLAN": lf_tools.add_vlan(vlan_ids=vlan_list) - - # call this, if 1.x - return_var = fixtures_ver.setup_profiles(request, param, setup_controller, testbed, get_equipment_ref, + print("fixture version ", fixtures_ver) + if cc_1: + return_var = fixtures_ver.setup_profiles(request, param, run_lf, instantiate_profile, get_configuration, + get_markers, + lf_tools, lf_reports) + else: + return_var = fixtures_ver.setup_profiles(request, param, setup_controller, testbed, get_equipment_ref, instantiate_profile, get_markers, create_lanforge_chamberview_dut, lf_tools, get_security_flags, get_configuration, radius_info, get_apnos, diff --git a/tests/e2e/advanced/roam_test/hard_roam/OTA/__init__.py b/tests/e2e/advanced/roam_test/hard_roam/OTA/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/e2e/advanced/roam_test/hard_roam/OTA/test_roam_ota.py b/tests/e2e/advanced/roam_test/hard_roam/OTA/test_roam_ota.py new file mode 100644 index 000000000..863c8972f --- /dev/null +++ b/tests/e2e/advanced/roam_test/hard_roam/OTA/test_roam_ota.py @@ -0,0 +1,320 @@ +import pytest +import allure +import time + +pytestmark = [pytest.mark.roam_test, pytest.mark.bridge, pytest.mark.roam_ota] + +setup_params_general = { + "mode": "BRIDGE", + "roam": True, + "ft+psk": True, + "ft-otd": False, + "ft-dot1x": False, + "ft-dot1x_sha256": True, + "ssid_modes": { + "wpa2_personal": [{"ssid_name": "RoamAP2g", "appliedRadios": ["2G"], "security_key": "something"}, + {"ssid_name": "RoamAP5g", "appliedRadios": ["5G"], "security_key": "something"}], + + "wpa3_personal": [{"ssid_name": "RoamAP6g", "appliedRadios": ["6G"], "security_key": "something"}] + }, + "rf": {}, + "radius": False +} +@allure.suite("Hard Roam over the air") +@allure.feature("Roam Test") +@pytest.mark.parametrize( + 'setup_profiles', + [setup_params_general], + indirect=True, + scope="class" +) +@pytest.mark.usefixtures("setup_profiles") + + +class TestRoamOTA(object): + + @pytest.mark.hard_roam_5g_ota + @pytest.mark.wpa2_personal + @pytest.mark.wpa3_personal + def test_multi_hard_roam_5g_to_5g_ft_psk_wpa2(self, get_configuration, lf_test, lf_reports, lf_tools, + run_lf, add_env_properties, + instantiate_profile, get_controller_logs, get_ap_config_slots, + get_lf_logs, + roaming_delay, iteration, client, duration): + + instantiate_profile_obj = instantiate_profile(controller_data=get_configuration['controller'], + timeout="10", + ap_data=get_configuration['access_point'], + type=0) + print("shut down 2g and 6g band") + instantiate_profile_obj.ap_2ghz_shutdown() + instantiate_profile_obj.ap_6ghz_shutdown() + print("enable only 5g") + instantiate_profile_obj.no_ap_5ghz_shutdown() + + + profile_data = setup_params_general["ssid_modes"]["wpa2_personal"][1] + ssid_name = profile_data["ssid_name"] + security_key = profile_data["security_key"] + security = "wpa2" + mode = "BRIDGE" + band = "fiveg" + vlan = 1 + print("disable wlan ") + instantiate_profile_obj.disable_wlan(wlan=setup_params_general["ssid_modes"]["wpa2_personal"][0]["ssid_name"]) + instantiate_profile_obj.disable_wlan(wlan=setup_params_general["ssid_modes"]["wpa3_personal"][0]["ssid_name"]) + dut_name = [] + for i in range(len(get_configuration["access_point"])): + dut_name.append(get_configuration["access_point"][i]["ap_name"]) + + print("dut names", dut_name) + # check channel + + lf_test.create_n_clients(sta_prefix="wlan1", num_sta=1, dut_ssid=ssid_name, + dut_security=security, dut_passwd=security_key, band="fiveg", + lf_tools=lf_tools, type="11r") + sta_list = lf_tools.get_station_list() + print(sta_list) + val = lf_test.wait_for_ip(station=sta_list) + ch = "" + if val: + for sta_name in sta_list: + sta = sta_name.split(".")[2] + time.sleep(5) + ch = lf_tools.station_data_query(station_name=str(sta), query="channel") + print(ch) + lf_test.Client_disconnect(station_name=sta_list) + + else: + pytest.exit("station failed to get ip") + assert False + + lf_test.hard_roam(run_lf=run_lf, get_configuration=get_configuration, lf_tools=lf_tools, + instantiate_profile=instantiate_profile, lf_reports=lf_reports, + ssid_name=ssid_name, security=security, security_key=security_key, + band=band, test="5g", + iteration=int(iteration), num_sta=int(client), roaming_delay=roaming_delay, + option="ota", channel=ch, duration=duration, duration_based=False, + iteration_based=True, dut_name=dut_name) + + @pytest.mark.hard_roam_2g_ota + @pytest.mark.wpa2_personal + @pytest.mark.wpa3_personal + def test_multi_hard_roam_2g_to_2g_ft_psk_wpa2(self, get_configuration, lf_test, lf_reports, lf_tools, + run_lf, add_env_properties, + instantiate_profile, get_controller_logs, get_ap_config_slots, + get_lf_logs, + roaming_delay, iteration, client, duration): + instantiate_profile_obj = instantiate_profile(controller_data=get_configuration['controller'], + timeout="10", + ap_data=get_configuration['access_point'], + type=0) + print("shut down 5g and 6g band") + instantiate_profile_obj.ap_5ghz_shutdown() + instantiate_profile_obj.ap_6ghz_shutdown() + print("enable only 2g") + instantiate_profile_obj.no_ap_2ghz_shutdown() + profile_data = setup_params_general["ssid_modes"]["wpa2_personal"][0] + ssid_name = profile_data["ssid_name"] + security_key = profile_data["security_key"] + security = "wpa2" + mode = "BRIDGE" + band = "twog" + vlan = 1 + print("disable wlan ") + instantiate_profile_obj.disable_wlan(wlan=setup_params_general["ssid_modes"]["wpa2_personal"][1]["ssid_name"]) + instantiate_profile_obj.disable_wlan(wlan=setup_params_general["ssid_modes"]["wpa3_personal"][0]["ssid_name"]) + dut_name = [] + for i in range(len(get_configuration["access_point"])): + dut_name.append(get_configuration["access_point"][i]["ap_name"]) + + print("dut names", dut_name) + # check channel + + lf_test.create_n_clients(sta_prefix="wlan", num_sta=1, dut_ssid=ssid_name, + dut_security=security, dut_passwd=security_key, band="twog", + lf_tools=lf_tools, type="11r") + sta_list = lf_tools.get_station_list() + print(sta_list) + val = lf_test.wait_for_ip(station=sta_list) + ch = "" + if val: + for sta_name in sta_list: + sta = sta_name.split(".")[2] + time.sleep(5) + ch = lf_tools.station_data_query(station_name=str(sta), query="channel") + print(ch) + lf_test.Client_disconnect(station_name=sta_list) + + else: + pytest.exit("station failed to get ip") + assert False + + lf_test.hard_roam(run_lf=run_lf, get_configuration=get_configuration, lf_tools=lf_tools, + instantiate_profile=instantiate_profile, lf_reports=lf_reports, + ssid_name=ssid_name, security=security, security_key=security_key, + band=band, test="2g", + iteration=int(iteration), num_sta=int(client), roaming_delay=roaming_delay, + option="ota", channel=ch, duration=duration, duration_based=False, + iteration_based=True, dut_name=dut_name) + + @pytest.mark.hard_roam_6g_to_6g_dot1x_sha256 + @pytest.mark.wpa2_personal + @pytest.mark.wpa3_personal + def test_multi_hard_roam_6g_to_6g_802dot1x_sha256_wpa3(self, get_configuration, lf_test, lf_reports, lf_tools, + run_lf, add_env_properties, + instantiate_profile, get_controller_logs, + get_ap_config_slots, + get_lf_logs, + roaming_delay, iteration, client, duration, radius_info): + ttls_passwd = radius_info["password"] + identity = radius_info['user'] + instantiate_profile_obj = instantiate_profile(controller_data=get_configuration['controller'], + timeout="10", + ap_data=get_configuration['access_point'], + type=0) + print("shut down 2g band") + instantiate_profile_obj.ap_2ghz_shutdown() + print("enable only 5g and 6g") + instantiate_profile_obj.no_ap_5ghz_shutdown() + instantiate_profile_obj.no_ap_6ghz_shutdown() + + profile_data = setup_params_general["ssid_modes"]["wpa3_personal"][0] + ssid_name = profile_data["ssid_name"] + security_key = profile_data["security_key"] + security = "wpa3" + mode = "BRIDGE" + band = "sixg" + vlan = 1 + print("disable wlan ") + instantiate_profile_obj.disable_wlan(wlan=setup_params_general["ssid_modes"]["wpa2_personal"][0]["ssid_name"]) + instantiate_profile_obj.disable_wlan(wlan=setup_params_general["ssid_modes"]["wpa2_personal"][1]["ssid_name"]) + dut_name = [] + for i in range(len(get_configuration["access_point"])): + dut_name.append(get_configuration["access_point"][i]["ap_name"]) + + print("dut names", dut_name) + + # check channel + lf_test.create_n_clients(sta_prefix="wlan1", num_sta=1, dut_ssid=ssid_name, + dut_security=security, dut_passwd=security_key, band="sixg", + lf_tools=lf_tools, type="11r-sae-802.1x") + sta_list = lf_tools.get_station_list() + print(sta_list) + val = lf_test.wait_for_ip(station=sta_list) + ch = "" + if val: + for sta_name in sta_list: + sta = sta_name.split(".")[2] + time.sleep(5) + ch = lf_tools.station_data_query(station_name=str(sta), query="channel") + print(ch) + lf_test.Client_disconnect(station_name=sta_list) + + else: + pytest.exit("station failed to get ip") + assert False + + lf_test.hard_roam(run_lf=run_lf, get_configuration=get_configuration, lf_tools=lf_tools, + lf_reports=lf_reports, + instantiate_profile=instantiate_profile, + ssid_name=ssid_name, security=security, security_key=security_key, + band=band, test="6g", + iteration=int(iteration), num_sta=int(client), roaming_delay=roaming_delay, + option="ota", channel=ch, duration=duration, iteration_based=True, + duration_based=False, dut_name=dut_name, identity=identity, ttls_passwd=ttls_passwd) + + +# setup_params_general_two = { +# "mode": "BRIDGE", +# "roam": True, +# "ft+psk": True, +# "ft-otd": False, +# "ft-dot1x": False, +# "ft-dot1x_sha256": True, +# "ssid_modes": { +# "wpa2_personal": [{"ssid_name": "RoamAP2g", "appliedRadios": ["2G"], "security_key": "something"}, +# {"ssid_name": "RoamAP5g", "appliedRadios": ["5G"], "security_key": "something"}], +# +# "wpa3_personal": [{"ssid_name": "RoamAP6g", "appliedRadios": ["6G"], "security_key": "something"}] +# }, +# "rf": {}, +# "radius": False +# } +# +# @allure.suite("Hard Roam over the air") +# @allure.feature("Roam Test") +# @pytest.mark.parametrize( +# 'setup_profiles', +# [setup_params_general_two], +# indirect=True, +# scope="class" +# ) +# @pytest.mark.usefixtures("setup_profiles") +# class TestRoamOTAdot1xSha256(object): +# +# @pytest.mark.hard_roam_6g_to_6g_dot1x_sha256 +# @pytest.mark.wpa2_personal +# @pytest.mark.wpa3_personal +# def test_multi_hard_roam_6g_to_6g_802dot1x_sha256_wpa3(self, get_configuration, lf_test, lf_reports, lf_tools, +# run_lf, add_env_properties, +# instantiate_profile, get_controller_logs, +# get_ap_config_slots, +# get_lf_logs, +# roaming_delay, iteration, client, duration, radius_info): +# ttls_passwd = radius_info["password"] +# identity = radius_info['user'] +# instantiate_profile_obj = instantiate_profile(controller_data=get_configuration['controller'], +# timeout="10", +# ap_data=get_configuration['access_point'], +# type=0) +# print("shut down 2g band") +# instantiate_profile_obj.ap_2ghz_shutdown() +# print("enable only 5g and 6g") +# instantiate_profile_obj.no_ap_5ghz_shutdown() +# instantiate_profile_obj.no_ap_6ghz_shutdown() +# +# profile_data = setup_params_general["ssid_modes"]["wpa3_personal"][0] +# ssid_name = profile_data["ssid_name"] +# security_key = profile_data["security_key"] +# security = "wpa3" +# mode = "BRIDGE" +# band = "sixg" +# vlan = 1 +# dut_name = [] +# for i in range(len(get_configuration["access_point"])): +# dut_name.append(get_configuration["access_point"][i]["ap_name"]) +# +# print("dut names", dut_name) +# +# # check channel +# lf_test.create_n_clients(sta_prefix="wlan1", num_sta=1, dut_ssid=ssid_name, +# dut_security=security, dut_passwd=security_key, band="sixg", +# lf_tools=lf_tools, type="11r-sae-802.1x") +# sta_list = lf_tools.get_station_list() +# print(sta_list) +# val = lf_test.wait_for_ip(station=sta_list) +# ch = "" +# if val: +# for sta_name in sta_list: +# sta = sta_name.split(".")[2] +# time.sleep(5) +# ch = lf_tools.station_data_query(station_name=str(sta), query="channel") +# print(ch) +# lf_test.Client_disconnect(station_name=sta_list) +# +# else: +# pytest.exit("station failed to get ip") +# assert False +# +# lf_test.hard_roam(run_lf=run_lf, get_configuration=get_configuration, lf_tools=lf_tools, +# lf_reports=lf_reports, +# instantiate_profile=instantiate_profile, +# ssid_name=ssid_name, security=security, security_key=security_key, +# band=band, test="6g", +# iteration=int(iteration), num_sta=int(client), roaming_delay=roaming_delay, +# option="ota", channel=ch, duration=duration, iteration_based=True, +# duration_based=False, dut_name=dut_name, identity=identity, ttls_passwd=ttls_passwd) +# +# +# diff --git a/tests/e2e/advanced/roam_test/hard_roam/OTD/__init__.py b/tests/e2e/advanced/roam_test/hard_roam/OTD/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/e2e/advanced/roam_test/hard_roam/OTD/test_5g_roam_otd.py b/tests/e2e/advanced/roam_test/hard_roam/OTD/test_5g_roam_otd.py new file mode 100644 index 000000000..7cdf4110a --- /dev/null +++ b/tests/e2e/advanced/roam_test/hard_roam/OTD/test_5g_roam_otd.py @@ -0,0 +1,76 @@ +import pytest +import allure +import time + +pytestmark = [pytest.mark.roam_test, pytest.mark.bridge] + +setup_params_general = { + "mode": "BRIDGE", + "roam": True, + "ft+psk": True, + "ft-otd": False, + "ssid_modes": { + "wpa2_personal": [{"ssid_name": "RoamAP2g", "appliedRadios": ["2G"], "security_key": "something"}, + {"ssid_name": "RoamAP5g", "appliedRadios": ["5G"], "security_key": "something"}], + + "wpa3_personal": [{"ssid_name": "RoamAP6g", "appliedRadios": ["6G"], "security_key": "something"}] + }, + "rf": {}, + "radius": False +} +@allure.suite("Hard Roam 5g") +@allure.feature("Roam Test") +# @pytest.mark.parametrize( +# 'setup_profiles', +# [setup_params_general], +# indirect=True, +# scope="class" +# ) +# @pytest.mark.usefixtures("setup_profiles") + + +class TestRoamOTDFiveg(object): + + @pytest.mark.roam_5g_otd + @pytest.mark.wpa2_personal + def test_multi_hard_roam_otd_5g_to_5g(self, get_configuration, lf_test, lf_reports, lf_tools, + run_lf, add_env_properties, + instantiate_profile, get_controller_logs, get_ap_config_slots, get_lf_logs, + roaming_delay, iteration, client): + profile_data = setup_params_general["ssid_modes"]["wpa2_personal"][1] + ssid_name = profile_data["ssid_name"] + security_key = profile_data["security_key"] + security = "wpa2" + mode = "BRIDGE" + band = "fiveg" + vlan = 1 + # check channel + + lf_test.create_n_clients(sta_prefix="wlan1", num_sta=1, dut_ssid=ssid_name, + dut_security=security, dut_passwd=security_key, band="fiveg", + lf_tools=lf_tools, type="11r") + sta_list = lf_tools.get_station_list() + print(sta_list) + val = lf_test.wait_for_ip(station=sta_list) + ch = "" + if val: + for sta_name in sta_list: + sta = sta_name.split(".")[2] + time.sleep(5) + ch = lf_tools.station_data_query(station_name=str(sta), query="channel") + print(ch) + lf_test.Client_disconnect(station_name=sta_list) + + else: + pytest.exit("station failed to get ip") + assert False + + lf_test.multi_hard_roam(run_lf=run_lf, get_configuration=get_configuration, lf_tools=lf_tools, + lf_reports=lf_reports, + instantiate_profile=instantiate_profile, + ssid_name=ssid_name, security=security, security_key=security_key, + band=band, test="5g", + iteration=int(iteration), num_sta=int(client), roaming_delay=roaming_delay, + option="otds", channel=ch) + + diff --git a/tests/e2e/advanced/roam_test/hard_roam/__init__.py b/tests/e2e/advanced/roam_test/hard_roam/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/e2e/advanced/roam_test/roam_by_attenuation/test_bridge _roam.py b/tests/e2e/advanced/roam_test/roam_by_attenuation/test_bridge _roam.py deleted file mode 100644 index dd2e52f90..000000000 --- a/tests/e2e/advanced/roam_test/roam_by_attenuation/test_bridge _roam.py +++ /dev/null @@ -1,232 +0,0 @@ -import time - -import pytest -import allure -from configuration import CONFIGURATION - -pytestmark = [pytest.mark.roam_test, pytest.mark.bridge] - -setup_params_general = { - "mode": "BRIDGE", - "ssid_modes": { - "wpa2_personal": [ - {"ssid_name": "ssid_wpa2_2g", "appliedRadios": ["2G"], "security_key": "something"}, - {"ssid_name": "ssid_wpa2_5g", "appliedRadios": ["5G"], "security_key": "something"}]}, - "rf": {}, - "radius": False -} - -class TestRateLimitingWithRadiusBridge(object): - - @pytest.mark.roam_2g - def test_basic_roam_2g(self, get_configuration, lf_test, station_names_twog, lf_tools, run_lf, add_env_properties): - profile_data = setup_params_general["ssid_modes"]["wpa2_personal"][0] - ssid_name = profile_data["ssid_name"] - security_key = profile_data["security_key"] - security = "wpa2" - mode = "BRIDGE" - band = "twog" - vlan = 1 - c1_2g_bssid = "" - c2_2g_bssid = "" - if run_lf: - c1_2g_bssid = get_configuration["access_point"][0]["ssid"]["2g-bssid"] - allure.attach(name="bssid of ap1", body=c1_2g_bssid) - c2_2g_bssid = get_configuration["access_point"][1]["ssid"]["2g-bssid"] - allure.attach(name="bssid of ap2", body=c2_2g_bssid) - - ser_no = lf_test.attenuator_serial() - print(ser_no[0]) - ser_1 = ser_no[0].split(".")[2] - ser_2 = ser_no[1].split(".")[2] - # put attenuation to zero in all attenuator's - # for i in range(4): - # lf_test.attenuator_modify(ser_1, i, 0) - # lf_test.attenuator_modify(ser_2, i, 0) - - # # create station - station = lf_test.Client_Connect(ssid=ssid_name, security=security, passkey=security_key, mode=mode, band=band, - station_name=station_names_twog, vlan_id=vlan) - if station : - lf_test.attach_stationdata_to_allure(name="staion info before roam", station_name=station_names_twog) - bssid = lf_tools.station_data_query(station_name=str(station_names_twog[0]), query="ap") - formated_bssid = bssid.lower() - station_before = "" - if formated_bssid == c1_2g_bssid: - print("station connected to chamber1 ap") - station_before = formated_bssid - elif formated_bssid == c2_2g_bssid: - print("station connected to chamber 2 ap") - station_before = formated_bssid - # logic to decrease c1 attenuation and increase c2 attenuation - for atten_val1, atten_val2 in zip([0, 100, 300, 500, 750, 950],[950, 750, 500,300, 100, 0]): - print(atten_val1) - print(atten_val2) - for i in range(4): - lf_test.attenuator_modify(int(ser_1), i, atten_val1) - lf_test.attenuator_modify(int(ser_2), i, atten_val2) - time.sleep(10) - lf_tools.admin_up_down(sta_list=station_names_twog, option="down") - time.sleep(15) - lf_tools.admin_up_down(sta_list=station_names_twog,option="up") - time.sleep(15) - bssid = lf_tools.station_data_query(station_name=str(station_names_twog[0]), query="ap") - station_after = bssid.lower() - if station_after == station_before: - continue - elif station_after != station_before: - print("client performed roam") - lf_test.attach_stationdata_to_allure(name="staion info after roam", - station_name=station_names_twog) - allure.attach(name="attenuation_data", body="ap1 was at attenuation value " + str( - atten_val2) + "ddbm and ap2 was at attenuation value " + str(atten_val1) + "ddbm") - break - lf_test.Client_disconnect(station_name=station_names_twog) - else: - allure.attach(name="FAIL", body="station failed to get ip") - assert False - - @pytest.mark.roam_5g - def test_basic_roam_5g(self, get_configuration, lf_test, station_names_fiveg, lf_tools, run_lf, add_env_properties): - profile_data = setup_params_general["ssid_modes"]["wpa2_personal"][0] - ssid_name = profile_data["ssid_name"] - security_key = profile_data["security_key"] - security = "wpa2" - mode = "BRIDGE" - band = "fiveg" - vlan = 1 - c1_5g_bssid = "" - c2_5g_bssid = "" - if run_lf: - c1_5g_bssid = get_configuration["access_point"][0]["ssid"]["5g-bssid"] - allure.attach(name="bssid of ap1", body=c1_5g_bssid) - c2_5g_bssid = get_configuration["access_point"][1]["ssid"]["5g-bssid"] - allure.attach(name="bssid of ap2", body=c2_5g_bssid) - - ser_no = lf_test.attenuator_serial() - print(ser_no[0]) - ser_1 = ser_no[0].split(".")[2] - ser_2 = ser_no[1].split(".")[2] - # put attenuation to zero in all attenuator's - # for i in range(4): - # lf_test.attenuator_modify(ser_1, i, 0) - # lf_test.attenuator_modify(ser_2, i, 0) - - # # create station - station = lf_test.Client_Connect(ssid=ssid_name, security=security, passkey=security_key, mode=mode, band=band, - station_name=station_names_fiveg, vlan_id=vlan) - if station: - lf_test.attach_stationdata_to_allure(name="staion info before roam", station_name=station_names_fiveg) - bssid = lf_tools.station_data_query(station_name=str(station_names_fiveg[0]), query="ap") - formated_bssid = bssid.lower() - station_before = "" - if formated_bssid == c1_5g_bssid: - print("station connected to chamber1 ap") - station_before = formated_bssid - elif formated_bssid == c2_5g_bssid: - print("station connected to chamber 2 ap") - station_before = formated_bssid - # logic to decrease c1 attenuation and increase c2 attenuation - for atten_val1, atten_val2 in zip([0, 100, 300, 500, 750, 950], [950, 750, 500, 300, 100, 0]): - print(atten_val1) - print(atten_val2) - for i in range(4): - lf_test.attenuator_modify(int(ser_1), i, atten_val1) - lf_test.attenuator_modify(int(ser_2), i, atten_val2) - time.sleep(10) - lf_tools.admin_up_down(sta_list=station_names_fiveg, option="down") - time.sleep(15) - lf_tools.admin_up_down(sta_list=station_names_fiveg, option="up") - time.sleep(15) - bssid = lf_tools.station_data_query(station_name=str(station_names_fiveg[0]), query="ap") - station_after = bssid.lower() - if station_after == station_before: - continue - elif station_after != station_before: - print("client performed roam") - lf_test.attach_stationdata_to_allure(name="staion info after roam", - station_name=station_names_fiveg) - allure.attach(name="attenuation_data", body="ap1 was at attenuation value " + str( - atten_val2) + "ddbm and ap2 was at attenuation value " + str(atten_val1) + "ddbm") - break - lf_test.Client_disconnect(station_name=station_names_fiveg) - else: - allure.attach(name="FAIL", body="station failed to get ip") - assert False - - - @pytest.mark.multi_roam - def test_multiple_roam_2g(self, get_configuration, lf_test, station_names_twog, lf_tools, run_lf, add_env_properties): - c1_2g_bssid = "" - c2_2g_bssid = "" - if run_lf: - c1_2g_bssid = get_configuration["access_point"][0]["ssid"]["2g-bssid"] - allure.attach(name="bssid of ap1", body=c1_2g_bssid) - c2_2g_bssid = get_configuration["access_point"][1]["ssid"]["2g-bssid"] - allure.attach(name="bssid of ap2", body=c2_2g_bssid) - ssid_name = get_configuration["access_point"][0]["ssid"]["2g-ssid"] - - ser_no = lf_test.attenuator_serial() - print(ser_no[0]) - ser_1 = ser_no[0].split(".")[2] - ser_2 = ser_no[1].split(".")[2] - lf_tools.add_stations(band="2G", num_stations=3, dut=lf_tools.dut_name, ssid_name=ssid_name) - lf_tools.Chamber_View() - sta_list = lf_tools.get_station_list() - print("sta_list", sta_list) - lf_tools.admin_up_down(sta_list=sta_list, option="up") - station = lf_test.wait_for_ip(station=sta_list) - station_before = "" - station_list = [] - for i in range(len(sta_list)): - station_list.append(sta_list[i].split(".")[2]) - print(station_list) - if station: - lf_test.attach_stationdata_to_allure(name="staion info before roam", station_name=sta_list) - for i in station_list: - bssid = lf_tools.station_data_query(station_name=str(i), query="ap") - formated_bssid = bssid.lower() - if formated_bssid == c1_2g_bssid: - print("station connected to chamber1 ap") - station_before = formated_bssid - elif formated_bssid == c2_2g_bssid: - print("station connected to chamber 2 ap") - station_before = formated_bssid - # logic to decrease c1 attenuation and increase c2 attenuation - for atten_val1, atten_val2 in zip([0, 100, 300, 500, 750, 950], [950, 750, 500, 300, 100, 0]): - print(atten_val1) - print(atten_val2) - for i in range(4): - lf_test.attenuator_modify(int(ser_1), i, atten_val1) - lf_test.attenuator_modify(int(ser_2), i, atten_val2) - time.sleep(10) - lf_tools.admin_up_down(sta_list=station_list, option="down") - time.sleep(15) - lf_tools.admin_up_down(sta_list=station_list, option="up") - time.sleep(15) - for i in station_list: - bssid = lf_tools.station_data_query(station_name=str(i), query="ap") - station_after = bssid.lower() - if station_after == station_before: - continue - elif station_after != station_before: - print("client performed roam") - lf_test.attach_stationdata_to_allure(name="staion info after roam", station_name=i) - allure.attach(name="attenuation_data", body="ap1 was at attenuation value " + str(atten_val2) + "ddbm and ap2 was at attenuation value " + str(atten_val1) + "ddbm") - break - - else: - allure.attach(name="FAIL", body="stations failed to get ip") - assert False - - - - - - - - - - - - diff --git a/tests/e2e/advanced/roam_test/roam_by_attenuation/test_bridge_5g_to_5g.py b/tests/e2e/advanced/roam_test/roam_by_attenuation/test_bridge_5g_to_5g.py new file mode 100644 index 000000000..7c7711839 --- /dev/null +++ b/tests/e2e/advanced/roam_test/roam_by_attenuation/test_bridge_5g_to_5g.py @@ -0,0 +1,170 @@ +import time + +import pytest +import allure +# from configuration import CONFIGURATION + +pytestmark = [pytest.mark.roam_test, pytest.mark.bridge] + +# setup_params_general = { +# "mode": "BRIDGE", +# "roam": False, +# "ssid_modes": { +# "wpa2_personal": [ +# {"ssid_name": "ssid_wpa2_2g", "appliedRadios": ["2G"], "security_key": "something"}, +# {"ssid_name": "ssid_wpa2_5g", "appliedRadios": ["5G"], "security_key": "something"}]}, +# "rf": {}, +# "radius": False +# } +setup_params_general = { + "mode": "BRIDGE", + "roam": True, + "ft+psk": True, + "ssid_modes": { + "wpa2_personal": [{"ssid_name": "RoamAP2g", "appliedRadios": ["2G"], "security_key": "something"}, + {"ssid_name": "RoamAP5g", "appliedRadios": ["5G"], "security_key": "something"}], + + "wpa3_personal": [{"ssid_name": "RoamAP6g", "appliedRadios": ["6G"], "security_key": "something"}] + }, + # "roam_type": "fiveg_to_fiveg", + "rf": {}, + "radius": False +} +@allure.suite("Roam Test with attenuator") +@allure.feature("Roam Test") +# @pytest.mark.parametrize( +# 'setup_profiles', +# [setup_params_general], +# indirect=True, +# scope="class" +# ) +# @pytest.mark.usefixtures("setup_profiles") +# @allure.step +# def nested_step_allure(bssid, rssi): +# pass + +class TestBasicRoam(object): + + @pytest.mark.roam_5g + @pytest.mark.bob + @pytest.mark.wpa2_personal + def test_basic_roam_5g_to_5g(self, get_configuration, lf_test, lf_reports, station_names_fiveg, lf_tools, run_lf, add_env_properties, + instantiate_profile, get_controller_logs, get_ap_config_slots, get_lf_logs): + profile_data = setup_params_general["ssid_modes"]["wpa2_personal"][0] + ssid_name = profile_data["ssid_name"] + security_key = profile_data["security_key"] + security = "wpa2" + mode = "BRIDGE" + band = "fiveg" + vlan = 1 + lf_test.basic_roam(run_lf=run_lf, get_configuration=get_configuration, lf_tools=lf_tools, + lf_reports=lf_reports, + instantiate_profile=instantiate_profile, + ssid_name=ssid_name, security=security, security_key=security_key, + mode=mode, band=band, station_name=station_names_fiveg, vlan=vlan, test="5g") + + + @pytest.mark.multi_roam + @pytest.mark.wpa2_personal + @pytest.mark.wpa3_personal + def test_multi_roam_5g_to_5g_soft_roam_11r(self, get_configuration, lf_test, lf_reports, station_names_fiveg, lf_tools, run_lf, add_env_properties, + instantiate_profile, get_controller_logs, get_ap_config_slots, get_lf_logs): + profile_data = setup_params_general["ssid_modes"]["wpa2_personal"][1] + ssid_name = profile_data["ssid_name"] + security_key = profile_data["security_key"] + security = "wpa2" + mode = "BRIDGE" + band = "fiveg" + vlan = 1 + print("starting snifer") + # lf_test.start_sniffer(radio_channel=36, radio="wiphy2", test_name="roam_11r", duration=3600) + lf_test.create_n_clients(sta_prefix="wlan", num_sta=2, dut_ssid=ssid_name, + dut_security=security, dut_passwd=security_key, radio="wiphy1", lf_tools=lf_tools, + type="11r") + + # lf_test.multi_roam(run_lf=run_lf, get_configuration=get_configuration, lf_tools=lf_tools, + # lf_reports=lf_reports, + # instantiate_profile=instantiate_profile, + # ssid_name=ssid_name, security=security, security_key=security_key, + # mode=mode, band=band, station_name=station_names_fiveg, vlan=vlan, test="5g") + # + # print("stop sniff") + # file_name = lf_test.stop_sniffer() + # print(file_name) + # print("wait for logs to be attached") + # file_name = "roam_11r2022-03-23-00-02.pcap" + # time.sleep(10) + # query_auth = lf_test. query_sniff_data(pcap_file=str(file_name), filter="wlan.fc.type_subtype==0x000b") + # print("query", query_auth) + # allure.attach(name="authentication", body=str(query_auth)) + # query_asso = lf_test.query_sniff_data(pcap_file=str(file_name), filter="wlan.fc.type_subtype==0x0000") + # print("query", query_asso) + # allure.attach(name="authentication", body=str(query_asso)) + # query_reasso_response = lf_test.query_sniff_data(pcap_file=str(file_name), filter="(wlan.fc.type_subtype==3) && (wlan.tag.number==55)") + # print("query", query_reasso_response) + # allure.attach(name="authentication", body=str(query_reasso_response)) + # query_4way = lf_test.query_sniff_data(pcap_file=str(file_name), filter="eapol") + # print("query", query_4way) + # allure.attach(name="authentication", body=str(query_4way)) + # + + @pytest.mark.hard + @pytest.mark.wpa2_personal + def test_multi_hard_roam_5g_to_5g(self, get_configuration, lf_test, lf_reports, station_names_fiveg, lf_tools, run_lf, add_env_properties, + instantiate_profile, get_controller_logs, get_ap_config_slots, get_lf_logs): + profile_data = setup_params_general["ssid_modes"]["wpa2_personal"][1] + ssid_name = profile_data["ssid_name"] + security_key = profile_data["security_key"] + security = "wpa2" + mode = "BRIDGE" + band = "fiveg" + vlan = 1 + lf_test.multi_hard_roam(run_lf=run_lf, get_configuration=get_configuration, lf_tools=lf_tools, + lf_reports=lf_reports, + instantiate_profile=instantiate_profile, + ssid_name=ssid_name, security=security, security_key=security_key, + mode=mode, band=band, station_name=station_names_fiveg, vlan=vlan, test="5g", iteration=2, num_sta=1) + + @pytest.mark.testing + def test_testing(self, lf_test): + ret = lf_test.sniff_full_data(pcap_file="roam_11r2022-03-25-13-27.pcap") + print(ret) + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/tests/e2e/advanced/roam_test/roam_by_attenuation/test_bridge_single_client.py b/tests/e2e/advanced/roam_test/roam_by_attenuation/test_bridge_single_client.py new file mode 100644 index 000000000..f359b6e46 --- /dev/null +++ b/tests/e2e/advanced/roam_test/roam_by_attenuation/test_bridge_single_client.py @@ -0,0 +1,208 @@ +import time + +import pytest +import allure +# from configuration import CONFIGURATION + +pytestmark = [pytest.mark.roam_test, pytest.mark.bridge] + +setup_params_general = { + "mode": "BRIDGE", + "roam": False, + "ssid_modes": { + "wpa2_personal": [ + {"ssid_name": "ssid_wpa2_2g", "appliedRadios": ["2G"], "security_key": "something"}, + {"ssid_name": "ssid_wpa2_5g", "appliedRadios": ["5G"], "security_key": "something"}], + "wpa3_personal": [ + {"ssid_name": "ssid_wpa2_5g", "appliedRadios": ["6G"], "security_key": "something"} + ]}, + "rf": {}, + "radius": False +} +@allure.suite("Roam Test with attenuator") +@allure.feature("Roam Test") +# @pytest.mark.parametrize( +# 'setup_profiles', +# [setup_params_general], +# indirect=True, +# scope="class" +# ) +# @pytest.mark.usefixtures("setup_profiles") + +class TestBasicRoam(object): + + + @pytest.mark.roam_2g + @pytest.mark.wpa2_personal + def test_basic_roam_2g(self, get_configuration, lf_test, station_names_twog, lf_tools, run_lf, add_env_properties, + instantiate_profile, get_controller_logs, get_ap_config_slots, get_lf_logs): + + profile_data = setup_params_general["ssid_modes"]["wpa2_personal"][0] + ssid_name = profile_data["ssid_name"] + security_key = profile_data["security_key"] + security = "wpa2" + mode = "BRIDGE" + band = "twog" + vlan = 1 + + # calling basic roam from lf_test + roam =lf_test.basic_roam(run_lf=run_lf, get_configuration=get_configuration, lf_tools=lf_tools, instantiate_profile=instantiate_profile, + ssid_name=ssid_name, security=security, security_key=security_key, + mode=mode, band=band, station_name=station_names_twog, vlan=vlan, test="2g") + if roam: + assert True + else: + assert False + + + @pytest.mark.roam_5g + @pytest.mark.wpa2_personal + def test_basic_roam_5g(self, get_configuration, lf_test, station_names_fiveg, lf_tools, run_lf, add_env_properties, + instantiate_profile, get_controller_logs, get_ap_config_slots, get_lf_logs): + profile_data = setup_params_general["ssid_modes"]["wpa2_personal"][1] + ssid_name = profile_data["ssid_name"] + security_key = profile_data["security_key"] + security = "wpa2" + mode = "BRIDGE" + band = "fiveg" + vlan = 1 + roam = lf_test.basic_roam(run_lf=run_lf, get_configuration=get_configuration, lf_tools=lf_tools, + instantiate_profile=instantiate_profile, + ssid_name=ssid_name, security=security, security_key=security_key, + mode=mode, band=band, station_name=station_names_fiveg, vlan=vlan, test="5g") + if roam: + assert True + else: + assert False + + @pytest.mark.sixg + @pytest.mark.wpa2_personal + @pytest.mark.wpa3_personal + def test_basic_roam_6g(self): + pass + + + + + + + + + + + + + + + + + + + + + + + + + + + + + # @pytest.mark.multi_roam_2g + # @pytest.mark.wpa2_personl + # def test_multiple_roam_2g(self, get_configuration, lf_test, station_names_twog, lf_tools, run_lf, add_env_properties, + # instantiate_profile, get_controller_logs, get_ap_config_slots, get_lf_logs): + # profile_data = setup_params_general["ssid_modes"]["wpa2_personal"][0] + # ssid_name = profile_data["ssid_name"] + # security_key = profile_data["security_key"] + # security = "wpa2" + # mode = "BRIDGE" + # band = "twog" + # vlan = 1 + # c1_2g_bssid = "" + # c2_2g_bssid = "" + # if run_lf: + # c1_2g_bssid = get_configuration["access_point"][0]["ssid"]["2g-bssid"] + # allure.attach(name="bssid of ap1", body=c1_2g_bssid) + # c2_2g_bssid = get_configuration["access_point"][1]["ssid"]["2g-bssid"] + # allure.attach(name="bssid of ap2", body=c2_2g_bssid) + # ssid_name = get_configuration["access_point"][0]["ssid"]["2g-ssid"] + # else: + # for ap_name in range(len(get_configuration['access_point'])): + # instantiate_profile_obj = instantiate_profile(controller_data=get_configuration['controller'], + # timeout="10", ap_data=get_configuration['access_point'], type=ap_name) + # bssid_2g = instantiate_profile_obj.cal_bssid_2g() + # if ap_name == 0 : + # c1_2g_bssid = bssid_2g + # if ap_name == 1: + # c2_2g_bssid = bssid_2g + # print("bssid of c1 ", c1_2g_bssid) + # allure.attach(name="bssid of ap1", body=c1_2g_bssid) + # print("bssid of c2", c2_2g_bssid) + # allure.attach(name="bssid of ap2", body=c2_2g_bssid) + # + # + # ser_no = lf_test.attenuator_serial() + # print(ser_no[0]) + # ser_1 = ser_no[0].split(".")[2] + # ser_2 = ser_no[1].split(".")[2] + # lf_tools.add_stations(band="2G", num_stations=3, dut=lf_tools.dut_name, ssid_name=ssid_name) + # lf_tools.Chamber_View() + # sta_list = lf_tools.get_station_list() + # print("sta_list", sta_list) + # lf_tools.admin_up_down(sta_list=sta_list, option="up") + # station = lf_test.wait_for_ip(station=sta_list) + # station_before = "" + # station_list = [] + # for i in range(len(sta_list)): + # station_list.append(sta_list[i].split(".")[2]) + # print(station_list) + # if station: + # lf_test.attach_stationdata_to_allure(name="staion info before roam", station_name=sta_list) + # for i in station_list: + # bssid = lf_tools.station_data_query(station_name=str(i), query="ap") + # formated_bssid = bssid.lower() + # if formated_bssid == c1_2g_bssid: + # print("station connected to chamber1 ap") + # station_before = formated_bssid + # elif formated_bssid == c2_2g_bssid: + # print("station connected to chamber 2 ap") + # station_before = formated_bssid + # # logic to decrease c1 attenuation and increase c2 attenuation + # for atten_val1, atten_val2 in zip([0, 100, 300, 500, 750, 950], [950, 750, 500, 300, 100, 0]): + # print(atten_val1) + # print(atten_val2) + # for i in range(4): + # lf_test.attenuator_modify(int(ser_1), i, atten_val1) + # lf_test.attenuator_modify(int(ser_2), i, atten_val2) + # time.sleep(10) + # lf_tools.admin_up_down(sta_list=station_list, option="down") + # time.sleep(15) + # lf_tools.admin_up_down(sta_list=station_list, option="up") + # time.sleep(15) + # for i in station_list: + # bssid = lf_tools.station_data_query(station_name=str(i), query="ap") + # station_after = bssid.lower() + # if station_after == station_before: + # continue + # elif station_after != station_before: + # print("client performed roam") + # lf_test.attach_stationdata_to_allure(name="staion info after roam", station_name=i) + # allure.attach(name="attenuation_data", body="ap1 was at attenuation value " + str(atten_val2) + "ddbm and ap2 was at attenuation value " + str(atten_val1) + "ddbm") + # break + # + # else: + # allure.attach(name="FAIL", body="stations failed to get ip") + # assert False + # + + + + + + + + + + + diff --git a/tests/e2e/basic/conftest.py b/tests/e2e/basic/conftest.py index 731761ea5..ee36f7307 100644 --- a/tests/e2e/basic/conftest.py +++ b/tests/e2e/basic/conftest.py @@ -12,6 +12,7 @@ if "libs" not in sys.path: from controller.controller_1x.controller import ProfileUtility from controller.controller_2x.controller import UProfileUtility +from controller.controller_3x.controller import CController import time from lanforge.lf_tools import ChamberView import pytest @@ -22,6 +23,8 @@ import allure def instantiate_profile(request): if request.config.getoption("1.x"): yield ProfileUtility + elif request.config.getoption("cc.1"): + yield CController else: yield UProfileUtility @@ -43,7 +46,7 @@ def create_lanforge_chamberview_dut(lf_tools, run_lf): @pytest.fixture(scope="class") def setup_profiles(request, setup_controller, testbed, get_equipment_ref, fixtures_ver, reset_scenario_lf, instantiate_profile, get_markers, create_lanforge_chamberview_dut, lf_tools, run_lf, - get_security_flags, get_configuration, radius_info, get_apnos, radius_accounting_info): + get_security_flags, get_configuration, radius_info, get_apnos, radius_accounting_info, cc_1): param = dict(request.param) # VLAN Setup @@ -69,7 +72,11 @@ def setup_profiles(request, setup_controller, testbed, get_equipment_ref, fixtur lf_tools.add_vlan(vlan_ids=vlan_list) # call this, if 1.x - return_var = fixtures_ver.setup_profiles(request, param, setup_controller, testbed, get_equipment_ref, + print("fixture version ", fixtures_ver) + if cc_1: + return_var = fixtures_ver.setup_profiles(request, param, run_lf, instantiate_profile, get_configuration, get_markers, lf_tools) + else: + return_var = fixtures_ver.setup_profiles(request, param, setup_controller, testbed, get_equipment_ref, instantiate_profile, get_markers, create_lanforge_chamberview_dut, lf_tools, get_security_flags, get_configuration, radius_info, get_apnos, diff --git a/tests/fixtures_3x.py b/tests/fixtures_3x.py new file mode 100644 index 000000000..6b58a9ec8 --- /dev/null +++ b/tests/fixtures_3x.py @@ -0,0 +1,400 @@ +import allure +import pytest +import os +import sys +""" Environment Paths """ +if "libs" not in sys.path: + sys.path.append(f'../libs') +for folder in 'py-json', 'py-scripts': + if folder not in sys.path: + sys.path.append(f'../lanforge/lanforge-scripts/{folder}') + +sys.path.append( + os.path.dirname( + os.path.realpath(__file__) + ) +) +sys.path.append(f"../lanforge/lanforge-scripts/py-scripts/tip-cicd-sanity") + +sys.path.append(f'../libs') +sys.path.append(f'../libs/lanforge/') +sys.path.append(f'../lanforge/lanforge-scripts') + +from LANforge.LFUtils import * + +if 'py-json' not in sys.path: + sys.path.append('../py-scripts') +from controller.controller_3x.controller import CController + + +class Fixtures_3x: + + def __init__(self, configuration={}, run_lf=False, cc_1=False): + self.lab_info = configuration + self.run_lf = run_lf + self.cc_1 = cc_1 + # print(self.lab_info) + print("cc.1") + self.controller_obj = "" + + + def setup_profiles(self, request, param, run_lf, instantiate_profile, get_configuration, get_markers, lf_tools, lf_reports): + table1= [] + instantiate_profile_obj = instantiate_profile(controller_data=get_configuration['controller'], + timeout="10", ssid_data=None, + ap_data=self.lab_info['access_point'], type=0) + + instantiate_profile_obj.enable_all_bands() + + + if run_lf: + return 0 + print("check params") + # gives parameter value of setup_params_general + parameter = dict(param) + print("parameter", parameter) + + test_cases = {} + profile_data= {} + var = "" + list_key = list(parameter.keys()) + + if parameter['mode'] not in ["BRIDGE", "NAT", "VLAN"]: + print("Invalid Mode: ", parameter['mode']) + return test_cases + profile_data["ssid"] = {} + lf_dut_data = [] + for i in parameter["ssid_modes"]: + profile_data["ssid"][i] = [] + for j in range(len(parameter["ssid_modes"][i])): + data = parameter["ssid_modes"][i][j] + profile_data["ssid"][i].append(data) + + # profile data will give ssid data like {'ssid': {'wpa2_personal': [{'ssid_name': 'ssid_wpa2_2g', 'appliedRadios': ['2G'], 'security_key': 'something'}, {'ssid_name': 'ssid_wpa2_5g', 'appliedRadios': ['5G'], 'security_key': 'something'}], 'wpa3_personal': [{'ssid_name': 'ssid_wpa2_5g', 'appliedRadios': ['6G'], 'security_key': 'something'}]}} + print("profile data", profile_data) + + + # create wlan + for mode in profile_data['ssid']: + if mode == "wpa2_personal": + for j in profile_data["ssid"][mode]: + if mode in get_markers.keys() and get_markers[mode]: + try: + if j["appliedRadios"].__contains__("2G"): + lf_dut_data.append(j) + if j["appliedRadios"].__contains__("5G"): + lf_dut_data.append(j) + if j["appliedRadios"].__contains__("6G"): + lf_dut_data.append(j) + j["appliedRadios"] = list(set(j["appliedRadios"])) + j['security'] = 'wpa2' + # print("dut data", lf_dut_data) + test_cases["wpa_2g"] = True + except Exception as e: + print(e) + test_cases["wpa2_personal"] = False + if mode == "wpa3_personal": + for j in profile_data["ssid"][mode]: + if mode in get_markers.keys() and get_markers[mode]: + print( profile_data["ssid"][mode]) + print(j) + try: + if j["appliedRadios"].__contains__("2G"): + lf_dut_data.append(j) + if j["appliedRadios"].__contains__("5G"): + lf_dut_data.append(j) + if j["appliedRadios"].__contains__("6G"): + lf_dut_data.append(j) + print(lf_dut_data) + j["appliedRadios"] = list(set(j["appliedRadios"])) + j['security'] = 'wpa3' + test_cases["wpa_2g"] = True + except Exception as e: + print(e) + test_cases["wpa3_personal"] = False + + # lf dut data [{'ssid_name': 'ssid_wpa2_2g', 'appliedRadios': ['2G'], 'security_key': 'something', 'security': 'wpa2'}, {'ssid_name': 'ssid_wpa2_5g', 'appliedRadios': ['5G'], 'security_key': 'something', 'security': 'wpa2'}, {'ssid_name': 'ssid_wpa2_5g', 'appliedRadios': ['6G'], 'security_key': 'something', 'security': 'wpa3'}] + print("lf dut data", lf_dut_data) + allure.attach(name="wlan data passing", body=str(parameter)) + ap = instantiate_profile_obj.show_ap_summary() + print("show ap summ", ap) + allure.attach(name="show ap summary", body=str(ap)) + + print("create 3 wlans on slot1,2 and 3") + for ap_name in range(len(self.lab_info['access_point'])): + print("ap ", ap_name) + # instantiate controller object + + print("set channel and bandwidth") + instantiate_profile_obj = instantiate_profile(controller_data=get_configuration['controller'], + timeout="10", ssid_data=lf_dut_data, + ap_data=self.lab_info['access_point'], type=ap_name) + instantiate_profile_obj.set_channel(band="6g", channel=self.lab_info['access_point'][ap_name]["channel_6g"], + slot=self.lab_info['access_point'][ap_name]["6g_slot"]) + allure.attach(name="set 6g channel on " + str(ap_name + 1) + " ap ", body=str(self.lab_info['access_point'][ap_name]["channel_6g"])) + instantiate_profile_obj.set_channel(band="5g", channel=self.lab_info['access_point'][ap_name]["channel_5g"], + slot=self.lab_info['access_point'][ap_name]["5g_slot"]) + allure.attach(name="set 5g channel on " + str(ap_name + 1) + " ap ", body=str(self.lab_info['access_point'][ap_name]["channel_5g"])) + instantiate_profile_obj.set_channel(band="2g", channel=self.lab_info['access_point'][ap_name]["channel_2g"], + slot=self.lab_info['access_point'][ap_name]["2g_slot"]) + allure.attach(name="set 2g channel on " + str(ap_name + 1) + " ap ", body=str(self.lab_info['access_point'][ap_name]["channel_2g"])) + print(self.lab_info['access_point'][ap_name]["ap_name"]) + check_admin = instantiate_profile_obj.check_admin_state_2ghz(ap_name=self.lab_info['access_point'][ap_name]["ap_name"]) + allure.attach(name="2ghz ap admin state for " + str(ap_name + 1) + " ap", body=str(check_admin)) + + check_admin_5 = instantiate_profile_obj.check_admin_state_5ghz(ap_name=self.lab_info['access_point'][ap_name]["ap_name"]) + allure.attach(name="5ghz ap admin state for " + str(ap_name + 1) + " ap", body=str(check_admin_5)) + + check_admin_6 = instantiate_profile_obj.check_admin_state_6ghz(ap_name=self.lab_info['access_point'][ap_name]["ap_name"]) + allure.attach(name="6ghz ap admin state for " + str(ap_name + 1) + " ap", body=str(check_admin_6)) + + # table1.append(tab) + + + if ap_name == 0: + for band in range(len(lf_dut_data)): + if lf_dut_data[band]["appliedRadios"] == ["2G"]: + instantiate_profile_obj.no_logging_console() + instantiate_profile_obj.line_console() + id_slot = instantiate_profile_obj.get_slot_id_wlan() + ssid_name = instantiate_profile_obj.get_ssid_name_on_id() + if id_slot[0] == "1": + instantiate_profile_obj.show_shutdown_2ghz_ap() + instantiate_profile_obj.disable_wlan(wlan=ssid_name[0]) + instantiate_profile_obj.ap_2ghz_shutdown() + instantiate_profile_obj.get_ssids() + instantiate_profile_obj.delete_wlan(ssid=ssid_name[0]) + instantiate_profile_obj.get_ssids() + instantiate_profile_obj.create_wlan_wpa2(id="1", wlan=lf_dut_data[0]['ssid_name'], + wlanssid=lf_dut_data[0]['ssid_name'], + key=lf_dut_data[0]['security_key']) + else: + print(lf_dut_data[0]['ssid_name']) + instantiate_profile_obj.get_ssids() + instantiate_profile_obj.show_shutdown_2ghz_ap() + instantiate_profile_obj.get_ssids() + instantiate_profile_obj.create_wlan_wpa2(id="1", wlan=lf_dut_data[0]['ssid_name'], + wlanssid=lf_dut_data[0]['ssid_name'], + key=lf_dut_data[0]['security_key']) + instantiate_profile_obj.get_ssids() + + if "roam_type" not in list_key: + # add wlan to single ap + instantiate_profile_obj.config_wireless_tag_policy_and_policy_profile(wlan=lf_dut_data[0]['ssid_name']) + instantiate_profile_obj.enable_wlan( wlan=lf_dut_data[0]['ssid_name']) + instantiate_profile_obj.enable_2ghz_netwrk(id="1", wlan=lf_dut_data[0]['ssid_name'], + wlanssid=lf_dut_data[0]['ssid_name'], + key=lf_dut_data[0]['security_key']) + instantiate_profile_obj.enable_ap_2ghz() + if parameter["ft+psk"] == True: + instantiate_profile_obj.enable_ft_psk(ssid=lf_dut_data[0]['ssid_name'], + key=lf_dut_data[0]['security_key']) + if parameter["ft-otd"] == True: + instantiate_profile_obj.enable_ftotd_psk(ssid=lf_dut_data[0]['ssid_name'], + key=lf_dut_data[0]['security_key']) + + if 'dtim' in list_key: + instantiate_profile_obj.set_dtim_2ghz(wlan=lf_dut_data[0]['ssid_name'], value=parameter["dtim"]) + instantiate_profile_obj.get_ssids() + + if lf_dut_data[band]["appliedRadios"] == ["5G"]: + instantiate_profile_obj.no_logging_console() + instantiate_profile_obj.line_console() + id_slot = instantiate_profile_obj.get_slot_id_wlan() + print(id_slot) + ssid_name = instantiate_profile_obj.get_ssid_name_on_id() + print(ssid_name) + if id_slot[1] == "2": + # if ssid present on slot 2 delete it and create new + instantiate_profile_obj.show_shutdown_5ghz_ap() + instantiate_profile_obj.disable_wlan(wlan=ssid_name[1]) + instantiate_profile_obj.ap_5ghz_shutdown() + instantiate_profile_obj.get_ssids() + instantiate_profile_obj.delete_wlan(ssid=ssid_name[1]) + instantiate_profile_obj.get_ssids() + instantiate_profile_obj.create_wlan_wpa2(id="2", wlan=lf_dut_data[1]['ssid_name'], + wlanssid=lf_dut_data[1]['ssid_name'], + key=lf_dut_data[1]['security_key']) + else: + print(lf_dut_data[1]['ssid_name']) + instantiate_profile_obj.get_ssids() + instantiate_profile_obj.show_shutdown_5ghz_ap() + instantiate_profile_obj.get_ssids() + instantiate_profile_obj.create_wlan_wpa2(id="2", wlan=lf_dut_data[1]['ssid_name'], + wlanssid=lf_dut_data[1]['ssid_name'], + key=lf_dut_data[1]['security_key']) + instantiate_profile_obj.get_ssids() + + if "roam_type" not in list_key: + instantiate_profile_obj.config_wireless_tag_policy_and_policy_profile(wlan=lf_dut_data[1]['ssid_name']) + instantiate_profile_obj.enable_wlan(wlan=lf_dut_data[1]['ssid_name']) + instantiate_profile_obj.enable_5ghz_netwrk(id="2", wlan=lf_dut_data[1]['ssid_name'], + wlanssid=lf_dut_data[1]['ssid_name'], + key=lf_dut_data[1]['security_key']) + instantiate_profile_obj.enable_ap_5ghz() + if parameter["ft+psk"] == True: + instantiate_profile_obj.enable_ft_psk(ssid=lf_dut_data[1]['ssid_name'], + key=lf_dut_data[1]['security_key']) + if parameter["ft-otd"] == True: + instantiate_profile_obj.enable_ftotd_psk(ssid=lf_dut_data[1]['ssid_name'], + key=lf_dut_data[1]['security_key']) + if 'dtim' in list_key: + instantiate_profile_obj.set_dtim_5ghz(wlan=lf_dut_data[1]['ssid_name'], value=parameter["dtim"]) + + instantiate_profile_obj.get_ssids() + if lf_dut_data[band]["appliedRadios"] == ["6G"]: + instantiate_profile_obj.no_logging_console() + instantiate_profile_obj.line_console() + id_slot = instantiate_profile_obj.get_slot_id_wlan() + print(id_slot) + ssid_name = instantiate_profile_obj.get_ssid_name_on_id() + print(ssid_name) + if id_slot[2] == "3": + instantiate_profile_obj.show_shutdown_6ghz_ap() + # instantiate_profile_obj.show_shutdown_5ghz_ap() + # instantiate_profile_obj.show_shutdown_2ghz_ap() + instantiate_profile_obj.disable_wlan(wlan=ssid_name[2]) + instantiate_profile_obj.ap_6ghz_shutdown() + instantiate_profile_obj.get_ssids() + instantiate_profile_obj.delete_wlan(ssid=ssid_name[2]) + instantiate_profile_obj.get_ssids() + instantiate_profile_obj.create_wlan_wpa3(id="3", wlan=lf_dut_data[2]['ssid_name'], + wlanssid=lf_dut_data[2]['ssid_name'], + key=lf_dut_data[2]['security_key']) + else: + instantiate_profile_obj.get_ssids() + instantiate_profile_obj.show_shutdown_6ghz_ap() + instantiate_profile_obj.get_ssids() + instantiate_profile_obj.create_wlan_wpa3(id="3", wlan=lf_dut_data[2]['ssid_name'], + wlanssid=lf_dut_data[2]['ssid_name'], + key=lf_dut_data[2]['security_key']) + instantiate_profile_obj.get_ssids() + + if "roam_type" not in list_key: + instantiate_profile_obj.config_wireless_tag_policy_and_policy_profile( + wlan=lf_dut_data[2]['ssid_name']) + instantiate_profile_obj.enable_wlan( wlan=lf_dut_data[2]['ssid_name']) + instantiate_profile_obj.enable_6ghz_netwrk(id="3", wlan=lf_dut_data[2]['ssid_name'], + wlanssid=lf_dut_data[2]['ssid_name'], + key=lf_dut_data[2]['security_key']) + instantiate_profile_obj.enable_ap_6ghz() + # if parameter["ft+psk"] == True: + # instantiate_profile_obj.enable_ft_sae(ssid=lf_dut_data[2]['ssid_name'], key=lf_dut_data[2]['security_key']) + if parameter["ft-dot1x"] == True: + instantiate_profile_obj.enable_ft_dot1x_wpa3(ssid=lf_dut_data[2]['ssid_name']) + if parameter["ft-dot1x_sha256"] == True: + instantiate_profile_obj.enable_ft_dot1x_sha256_wpa3(ssid=lf_dut_data[2]['ssid_name'], + radius=get_configuration['controller']["radius"]) + instantiate_profile_obj.get_ssids() + + twog_sum = instantiate_profile_obj.show_2ghz_summary() + fiveg_sum = instantiate_profile_obj.show_5ghz_summary() + sixg_sum = instantiate_profile_obj.show_6ghz_summary() + allure.attach(name="ap admin state 2ghz ", body=str(twog_sum)) + allure.attach(name="ap admin state 5ghz ", body=str(fiveg_sum)) + allure.attach(name="ap admin state 6ghz", body=str(sixg_sum)) + sumy = instantiate_profile_obj.get_ssids() + allure.attach(name="wlan summary after creating test wlan", body=str(sumy)) + if "roam_type" in list_key: + print("after creating wlan's assign wlan to respective tag policy") + for ap_name in range(len(self.lab_info['access_point'])): + print("ap ", ap_name) + instantiate_profile_obj = instantiate_profile(controller_data=get_configuration['controller'], timeout="10", + ssid_data=lf_dut_data, ap_data=self.lab_info['access_point'], + type=ap_name) + if parameter["roam_type"] == "fiveg_to_fiveg": + # add 5g wlan to both ap's + instantiate_profile_obj.config_wireless_tag_policy_and_policy_profile(wlan=lf_dut_data[1]['ssid_name']) + if parameter["roam_type"] == "fiveg_to_sixg": + if ap_name == 0: + # add 5g ssid to 5g ap + instantiate_profile_obj.config_wireless_tag_policy_and_policy_profile(wlan=lf_dut_data[1]['ssid_name']) + if ap_name == 1: + # add 6g ssid to 6g ap + instantiate_profile_obj.config_wireless_tag_policy_and_policy_profile(wlan=lf_dut_data[2]['ssid_name']) + + else: + instantiate_profile_obj.config_wireless_tag_policy_and_policy_profile( + wlan=lf_dut_data[0]['ssid_name']) + instantiate_profile_obj.config_wireless_tag_policy_and_policy_profile( + wlan=lf_dut_data[1]['ssid_name']) + instantiate_profile_obj.config_wireless_tag_policy_and_policy_profile( + wlan=lf_dut_data[2]['ssid_name']) + + + + + bssid_list_2g = [] + bssid_list_5g = [] + ssid_data_list = [] + + for ap_name in range(len(self.lab_info['access_point'])): + print("ap ", ap_name) + # instantiate controller object + instantiate_profile_obj = instantiate_profile(controller_data=get_configuration['controller'], timeout="10", + ssid_data=lf_dut_data, ap_data=self.lab_info['access_point'], type=ap_name) + bss2_info = instantiate_profile_obj.get_ap_bssid_2g() + allure.attach(name="wlan 2g bssid info", body=str(bss2_info)) + bss5_info = instantiate_profile_obj.get_ap_bssid_5g() + allure.attach(name="wlan 5g bssid info", body=str(bss5_info)) + bss6_info = instantiate_profile_obj.get_ap_bssid_6g() + allure.attach(name="wlan 6g bssid info", body=str(bss6_info)) + + bssid_2g = instantiate_profile_obj.cal_bssid_2g() + print("bssid 2g", bssid_2g) + lst_2g = bssid_list_2g.append(bssid_2g) + + bssid_5g = instantiate_profile_obj.cal_bssid_5g() + print("bssid 5g ", bssid_5g) + lst_5g = bssid_list_5g.append(bssid_5g) + # print(bssid_5g) + # print(bssid_list_2g) + # print(bssid_list_5g) + ssid_data = [] + try: + idx_mapping = {} + bssid = "" + for interface in range(len(lf_dut_data)): + if interface == 0: + bssid = bssid_2g + if interface == 1: + bssid = bssid_5g + if lf_dut_data[interface]['security'] == "psk2": + lf_dut_data[interface]['security'] = "WPA2" + ssid = ["ssid_idx=" + str(interface) + + " ssid=" + lf_dut_data[interface]['ssid_name'] + + " security=" + lf_dut_data[interface]['security'] + + " password=" + lf_dut_data[interface]['security_key'] + + " bssid=" + bssid + ] + ssid_data.append(ssid) + + except Exception as e: + print(e) + pass + # print("nikita",ssid_data) + ssid_data_list.append(ssid_data) + print("final ssid data", ssid_data_list) + # ssid_data = [[['ssid_idx=0 ssid=ssid_wpa2_2g security=WPA2 password=something bssid=68:7d:b4:5f:5c:30'], + # ['ssid_idx=1 ssid=ssid_wpa2_5g security=WPA2 password=something bssid=68:7d:b4:5f:5c:3e']], + # [['ssid_idx=0 ssid=ssid_wpa2_2g security=WPA2 password=something bssid=14:16:9d:53:58:c0'], + # ['ssid_idx=1 ssid=ssid_wpa2_5g security=WPA2 password=something bssid=14:16:9d:53:58:ce']]] + lf_tools.create_non_meh_dut(ssid_data=ssid_data_list) + table1 = [["show ap summary", "done"], ["Configure wlan", "done"], ["configure channel/width", "done"], + ["ap admin state up", "done"], ["checking admin state of ap", "done"], + ["sniffer capture", "done"], + ["client connect", "don"], ["roam", "done"], ["sniffer verification", "done"], + ["iteration", "done"], + ["table display", "done"], ["check in controller client connected", "done"], + ["Bring down unused band before start of testcase", "done"]] + tab1 = lf_reports.table2(table=table1) + allure.attach(name="skeleton of code", body=str(tab1)) + + + + + + + + +