diff --git a/lf_libs/lf_tests.py b/lf_libs/lf_tests.py index 717ec567..def8c2d4 100644 --- a/lf_libs/lf_tests.py +++ b/lf_libs/lf_tests.py @@ -7,6 +7,7 @@ import sys import time import string import random +from itertools import islice import paramiko from datetime import datetime @@ -309,30 +310,45 @@ class lf_tests(lf_libs): pass def client_connect(self, ssid="[BLANK]", passkey="[BLANK]", security="wpa2", mode="BRIDGE", band="twog", - vlan_id=100, num_sta=None, scan_ssid=True, + vlan_id=[None], num_sta=None, scan_ssid=True, sta_mode=0, station_data=["4way time (us)", "channel", "cx time (us)", "dhcp (ms)", "ip", "signal"], - allure_attach=True): - data = self.setup_interfaces(band=band, vlan_id=vlan_id, mode=mode, num_sta=num_sta) - logging.info("Setup interface data" + str(data)) - if self.run_lf: - ssid = data["ssid"] - passkey = data["passkey"] - security = data["security"] + allure_attach=True, identifier=None, allure_name="station data"): + if identifier is None: + identifier = self.dut_data[0]["identifier"] + logging.info("Identifier: " + str(identifier)) + else: + all_identifier_list = [] + for dut in self.dut_data: + all_identifier_list.append(self.dut_data[dut]["indentifier"]) + print(all_identifier_list) + if identifier not in all_identifier_list: + logging.error("Identifier is missinhg") + pytest.fail("Identifier is missinhg") + + data = self.setup_interfaces(ssid=ssid, passkey=passkey, encryption=security, + band=band, vlan_id=vlan_id[0], mode=mode, num_sta=num_sta) + + logging.info("Setup interface data:\n" + json.dumps(str(data), indent=2)) + allure.attach(name="Interface Info: \n", body=json.dumps(str(data), indent=2), + attachment_type=allure.attachment_type.JSON) + if data == {}: + pytest.skip("Skipping This Test") client_connect_obj = [] station_data_all = {} - for radio in data["radios"]: + for radio in data[identifier]["station_data"]: client_connect = CreateStation(_host=self.manager_ip, _port=self.manager_http_port, - _sta_list=data["radios"][radio], _password=passkey, _ssid=ssid, - _security=security) - client_connect.station_profile.sta_mode = 0 - client_connect.upstream_resource = data["upstream_port"].split(".")[1] - client_connect.upstream_port = data["upstream_port"].split(".")[2] + _sta_list=data[identifier]["station_data"][radio], _password=data[identifier]["passkey"], + _ssid=data[identifier]["ssid"], + _security=data[identifier]["encryption"]) + client_connect.station_profile.sta_mode = sta_mode + client_connect.upstream_resource = data[identifier]["upstream_resource"] + client_connect.upstream_port = data[identifier]["upstream"] client_connect.radio = radio - logging.info("scan ssid radio: " + str(client_connect.radio)) - if scan_ssid: - self.data_scan_ssid = self.scan_ssid(radio=client_connect.radio, ssid=ssid) - logging.info("ssid scan data: " + str(self.data_scan_ssid)) - client_connect_obj.append(client_connect) + logging.info("scan ssid radio: " + str(client_connect.radio)) + if scan_ssid: + self.data_scan_ssid = self.scan_ssid(radio=client_connect.radio, ssid=ssid) + logging.info("ssid scan data: " + str(self.data_scan_ssid)) + client_connect_obj.append(client_connect) pass_fail = [] for obj in client_connect_obj: obj.build() @@ -357,7 +373,7 @@ class lf_tests(lf_libs): pass_fail_sta.append("Pass") sta_table_dict["Pass/Fail"] = pass_fail_sta if allure_attach: - self.attach_table_allure(data=sta_table_dict, allure_name="station data") + self.attach_table_allure(data=sta_table_dict, allure_name=allure_name) logging.info("pass_fail result: " + str(pass_fail)) if False in pass_fail: @@ -367,6 +383,27 @@ class lf_tests(lf_libs): logging.info("ALL Stations got IP's") return station_data_all + # def dfs_test(self, ssid=None, security=None, passkey=None, mode=None, + # band=None, num_sta=1, vlan_id=[None], dut_data={}): + # """DFS test""" + # if len(dut_data) == 0: + # logging.error("DUT data is empty") + # pytest.fail("DUT data is empty") + # for identifier in dut_data: + # station_data = self.client_connect(ssid=ssid, security=security, passkey=passkey, mode=mode, + # band=band, num_sta=num_sta, vlan_id=vlan_id, + # allure_name="Data before simulate radar", identifier=identifier) + # station_list = list(station_data.kyes()) + # pass_fail = [] + # logging.info("AP channel: " + str(ap_channel)) + # + # #run the simulate radar command + + + + + + def add_stations(self, band="2G", num_stations=9, ssid_name="", dut_data={}, identifier=None): dut_name = [] @@ -566,8 +603,6 @@ class lf_tests(lf_libs): instance_name = ''.join(random.choices(string.ascii_uppercase + string.digits, k=12)) if mode == "BRIDGE": ret = self.get_wan_upstream_ports() - print("ret", ret) - print("dut", dut) upstream_port = ret[identifier] if mode == "NAT-WAN": @@ -584,8 +619,8 @@ class lf_tests(lf_libs): else: self.add_vlan(vlan_ids=[vlan_id]) ret = self.get_wan_upstream_ports() - upstream_data = ret[identifier] + "." + str(vlan_id) - logging.info("Upstream data: ", upstream_port) + upstream_port = ret[identifier] + "." + str(vlan_id) + logging.info("Upstream data: " + str(upstream_port)) sets = [["DUT_NAME", dut]] '''SINGLE WIFI CAPACITY using lf_wifi_capacity.py''' for band_ in num_stations: @@ -637,7 +672,7 @@ class lf_tests(lf_libs): if move_to_influx: try: report_name = "../reports/" + \ - wificapacity_obj.report_name[0]['LAST']["response"].split(":::")[1].split("/")[-1] + wificapacity_obj.report_name[0]['LAST']["response"].split(":::")[1].split("/")[-1] + "/" influx = CSVtoInflux(influx_host=self.influx_params["influx_host"], influx_port=self.influx_params["influx_port"], influx_org=self.influx_params["influx_org"], @@ -649,7 +684,7 @@ class lf_tests(lf_libs): except Exception as e: print(e) pass - report_name = wificapacity_obj.report_name[0]['LAST']["response"].split(":::")[1].split("/")[-1] + report_name = wificapacity_obj.report_name[0]['LAST']["response"].split(":::")[1].split("/")[-1] + "/" self.attach_report_graphs(report_name=report_name) self.attach_report_kpi(report_name=report_name) @@ -906,29 +941,31 @@ if __name__ == '__main__': } obj = lf_tests(lf_data=dict(basic_04["traffic_generator"]), dut_data=list(basic_04["device_under_tests"]), - log_level=logging.DEBUG, run_lf=True) + log_level=logging.DEBUG, run_lf=False) # obj.add_stations() # obj.add_stations(band="5G") # obj.chamber_view(raw_lines="custom") - dut = {'0000c1018812': {"ssid_data": { - 0: {"ssid": 'TestSSID-2G', "encryption": 'wpa2', "password": 'OpenWifi', "band": '2G', - "bssid": '00:00:C1:01:88:15'}, - 1: {"ssid": 'TestSSID-5G', "encryption": 'wpa2', "password": 'OpenWifi', "band": '5G', - "bssid": '00:00:C1:01:88:14'}}, "radio_data": {'2G': [1, 40, 2422], '5G': [36, 80, 5210], '6G': None}}} - obj.wifi_capacity(instance_name="test_client_wpa2_BRIDGE_udp_bi", mode="BRIDGE", - vlan_id=[100], - download_rate="1Gbps", batch_size="1,5,10,20,40,64,128,256", - influx_tags="Jitu", - upload_rate="1Gbps", protocol="UDP-IPv4", duration="60000", - move_to_influx=False, dut_data=dut, ssid_name="OpenWifi", - num_stations={"2G": 10, "5G": 10}) + # dut = {'0000c1018812': {"ssid_data": { + # 0: {"ssid": 'TestSSID-2G', "encryption": 'wpa2', "password": 'OpenWifi', "band": '2G', + # "bssid": '00:00:C1:01:88:15'}, + # 1: {"ssid": 'TestSSID-5G', "encryption": 'wpa2', "password": 'OpenWifi', "band": '5G', + # "bssid": '00:00:C1:01:88:14'}}, "radio_data": {'2G': [1, 40, 2422], '5G': [36, 80, 5210], '6G': None}}} + # obj.wifi_capacity(instance_name="test_client_wpa2_BRIDGE_udp_bi", mode="BRIDGE", + # vlan_id=[100], + # download_rate="1Gbps", batch_size="1,5,10,20,40,64,128,256", + # influx_tags="Jitu", + # upload_rate="1Gbps", protocol="UDP-IPv4", duration="60000", + # move_to_influx=False, dut_data=dut, ssid_name="OpenWifi", + # num_stations={"2G": 10, "5G": 10}) # A =obj.setup_interfaces(band="fiveg", vlan_id=100, mode="NAT-WAN", num_sta=1) # print(A) # obj.setup_relevent_profiles() - # obj.Client_Connect(ssid="OpenWifi", passkey="OpenWifi", security="wpa2", mode="BRIDGE", band="twog", - # vlan_id=100, num_sta=5, scan_ssid=True, + # obj.client_connect(ssid="OpenWifi", passkey="OpenWifi", security="wpa2", mode="BRIDGE", band="twog", + # vlan_id=[None], num_sta=65, scan_ssid=True, # station_data=["4way time (us)", "channel", "cx time (us)", "dhcp (ms)", "ip", "signal"], # allure_attach=True) + obj.dfs_test(ssid="OpenWifi", security="wpa2", passkey="OpenWifi", mode="BRIDGE", + band="fiveg", num_sta=1) # obj.add_vlan(vlan_iFds=[100]) # obj.create_dhcp_external()obj.add_vlan(vlan_ids=[100, 200, 300, 400, 500, 600]) # obj.get_cx_data()