From 202a348ca852657d00b40f1c410db6116d5a3aef Mon Sep 17 00:00:00 2001 From: anil-tegala Date: Tue, 6 Dec 2022 09:14:40 +0530 Subject: [PATCH] added sniffer & supplicant logs for client-connect Signed-off-by: anil-tegala --- lf_libs/lf_tests.py | 56 +++++++++++++++++++++++++++++++++++++++------ 1 file changed, 49 insertions(+), 7 deletions(-) diff --git a/lf_libs/lf_tests.py b/lf_libs/lf_tests.py index 6c583299..db8fa594 100644 --- a/lf_libs/lf_tests.py +++ b/lf_libs/lf_tests.py @@ -582,7 +582,6 @@ class lf_tests(lf_libs): logging.info("Setup interface data:\n" + json.dumps(str(data), indent=2)) - # query and fetch vlan Ip Address port_data = self.json_get(_req_url="/port?fields=alias,port+type,ip,mac")['interfaces'] # Fail if Vlan don't have IP @@ -737,6 +736,33 @@ class lf_tests(lf_libs): client_connect_obj = [] station_data_all = {} for radio in data[identifier]["station_data"]: + if band == "twog": + if dict(dut_data.get(identifier)["radio_data"]).keys().__contains__("2G") and \ + dict(dut_data.get(identifier)["radio_data"])["2G"] is not None: + sniffer_channel = dict(dut_data.get(identifier)["radio_data"])["2G"]["channel"] + if data[identifier]["sniff_radio_2g"] is not None and sniffer_channel is not None: + self.start_sniffer(radio_channel=sniffer_channel, test_name=f'{data[identifier]["station_data"][radio][0]}', + radio=data[identifier]["sniff_radio_2g"], + duration=120) + logging.info("started-sniffer") + if band == "fiveg": + if dict(dut_data.get(identifier)["radio_data"]).keys().__contains__("5G") and \ + dict(dut_data.get(identifier)["radio_data"])["5G"] is not None: + sniffer_channel = dict(dut_data.get(identifier)["radio_data"])["5G"]["channel"] + if data[identifier]["sniff_radio_5g"] is not None and sniffer_channel is not None: + self.start_sniffer(radio_channel=sniffer_channel, + radio=data[identifier]["sniff_radio_5g"], + duration=120) + logging.info("started-sniffer") + if band == "sixg": + if dict(dut_data.get(identifier)["radio_data"]).keys().__contains__("6G") and \ + dict(dut_data.get(identifier)["radio_data"])["6G"] is not None: + sniffer_channel = dict(dut_data.get(identifier)["radio_data"])["6G"]["channel"] + if data[identifier]["sniff_radio_6g"] is not None and sniffer_channel is not None: + self.start_sniffer(radio_channel=sniffer_channel, + radio=data[identifier]["sniff_radio_6g"], + duration=120) + logging.info("started-sniffer") client_connect = CreateStation(_host=self.manager_ip, _port=self.manager_http_port, _sta_list=data[identifier]["station_data"][radio], _password=data[identifier]["passkey"], @@ -777,12 +803,26 @@ class lf_tests(lf_libs): if allure_attach: self.attach_table_allure(data=sta_table_dict, allure_name=allure_name) + # stop sniffer if active + logging.info(msg=str("Cleaning up sniffer interface If available on PORT Manager")) + port_data = self.json_get(_req_url="/port?fields=alias,parent+dev,port+type,ip,mac")['interfaces'] + # for i in port_data: + # for item in i: + # if i[item]['port type'] == '802.1Q VLAN' and i[item]['ip'] == '0.0.0.0': + # logging.info('VLAN do not have IP') + if self.start_sniffer: + self.stop_sniffer() + logging.info("pass_fail result: " + str(pass_fail)) if False in pass_fail: logging.info("Station did not get an ip") + for radio in data[identifier]["station_data"]: + self.get_supplicant_logs(radio=str(radio)) pytest.fail("Station did not get an ip") else: logging.info("ALL Stations got IP's") + for radio in data[identifier]["station_data"]: + self.get_supplicant_logs(radio=str(radio)) return station_data_all def dfs_test(self, ssid=None, security=None, passkey=None, mode=None, @@ -1274,7 +1314,7 @@ class lf_tests(lf_libs): per_radio_sta = int(num_stations / len(radio)) rem = num_stations % len(radio) logging.info("Total stations per radio: " + str(per_radio_sta)) - num_stations = lambda rem: per_radio_sta+1 if rem else per_radio_sta + num_stations = lambda rem: per_radio_sta + 1 if rem else per_radio_sta identifier = list(dut_data.keys())[0] allure.attach(name="Definition", body="Multiple association/disassociation stability test intends to measure stability of Wi-Fi device " \ @@ -1288,8 +1328,8 @@ class lf_tests(lf_libs): for i in radio: station_data = ["profile_link " + i.split(".")[0] + "." + i.split(".")[1] + - " STA-AUTO " + str(num_stations(rem)) + " 'DUT: " + identifier + " Radio-" + - str(int(idx) + 1) + "'" + " NA " + i.split(".")[2]] + " STA-AUTO " + str(num_stations(rem)) + " 'DUT: " + identifier + " Radio-" + + str(int(idx) + 1) + "'" + " NA " + i.split(".")[2]] rem = 0 self.temp_raw_lines.append(station_data) logging.debug("Raw Line : " + str(station_data)) @@ -1309,10 +1349,12 @@ class lf_tests(lf_libs): val = [['ul_rate_sel: Per-Station Upload Rate:']] thr1 = threading.Thread(target=thread_fun, args=(sta_list[8:16],)) thr1.start() - wct_obj = self.wifi_capacity(instance_name=instance_name, mode=mode, vlan_id=vlan, download_rate=downld_rate, + wct_obj = self.wifi_capacity(instance_name=instance_name, mode=mode, vlan_id=vlan, + download_rate=downld_rate, stations=sel_stations, raw_lines=val, batch_size="8", upload_rate=upld_rate, - protocol="UDP-IPv4", duration="120000", create_stations=False, dut_data=dut_data, - sort = "interleave",) + protocol="UDP-IPv4", duration="120000", create_stations=False, + dut_data=dut_data, + sort="interleave", ) report_name = wct_obj[0].report_name[0]['LAST']["response"].split(":::")[1].split("/")[-1] + "/" self.attach_report_graphs(report_name=report_name)