diff --git a/lf_libs/lf_libs.py b/lf_libs/lf_libs.py index e855b07a..7504f8a6 100644 --- a/lf_libs/lf_libs.py +++ b/lf_libs/lf_libs.py @@ -1691,27 +1691,115 @@ class lf_libs: if name != None: allure.attach(name=name, body=str(data_table)) - def client_connect_using_radio(self, ssid="[BLANK]", passkey="[BLANK]", security="wpa2", mode="BRIDGE", - vlan_id=100, radio=None, sta_mode=0, station_name=[]): - self.client_connect = CreateStation(_host=self.manager_ip, _port=self.manager_http_port, _mode=sta_mode, - _sta_list=station_name, _password=passkey, _ssid=ssid, _security=security) + def client_connect_using_radio(self, ssid="[BLANK]", passkey="[BLANK]", security="wpa2", mode="BRIDGE", band=None, + vlan_id=100, radio=None, sta_mode=0, station_name=[], num_sta=None, + dut_data=None, sniff_radio=False): + # pre cleanup + # if pre_cleanup: + # self.pre_cleanup() + global data + if dut_data is not None: + data = self.setup_interfaces(ssid=ssid, passkey=passkey, encryption=security, band=band, vlan_id=vlan_id, + mode=mode, num_sta=num_sta, dut_data_=dut_data) + print("dut data :",data) - # self.client_connect.station_profile.sta_mode = sta_mode - self.client_connect.upstream_resource = 1 + client_connect = CreateStation(_host=self.manager_ip, _port=self.manager_http_port, _mode=sta_mode, + _sta_list=station_name, _password=passkey, _ssid=ssid, _security=security) + + client_connect.upstream_resource = 1 if mode == "BRIDGE": - self.client_connect.upstream_port = self.wan_ports - elif mode == "NAT": - self.client_connect.upstream_port = self.wan_ports - else: - self.client_connect.upstream_port = self.wan_ports + "." + str(vlan_id) + client_connect.upstream_port = self.wan_ports + elif mode == "NAT-WAN": + client_connect.upstream_port = self.wan_ports + elif mode == "VLAN": + client_connect.upstream_port = self.wan_ports + "." + str(vlan_id) - self.client_connect.radio = radio - self.client_connect.build() - # self.client_connect.wait_for_ip(station_name, timeout_sec=100) - # print(self.client_connect.wait_for_ip(station_name)) - if self.client_connect.wait_for_ip(station_name, timeout_sec=100): - self.client_connect._pass("ALL Stations got IP's", print_=True) - return self.client_connect + client_connect.radio = radio + + if sniff_radio: + for dut in data: + for dut_ in self.dut_data: + identifier = dut_["identifier"] + if dut_data.keys().__contains__(identifier): + if band == "twog": + sta_length = len(station_name) + for i in range(sta_length): + if dict(dut_data.get(identifier)["radio_data"]).keys().__contains__("2G") and \ + dict(dut_data.get(identifier)["radio_data"])["2G"] is not None: + sniffer_channel = dict(dut_data.get(identifier)["radio_data"])["2G"]["channel"] + if data[dut]["sniff_radio_2g"] is not None and sniffer_channel is not None: + self.start_sniffer(radio_channel=sniffer_channel, + radio=data[dut]["sniff_radio_2g"], + duration=60) + logging.info("started-sniffer") + + client_connect.build() + + # station data + sta_data1 = self.json_get(_req_url="port/1/1/%s" % (station_name[i])) + self.allure_report_table_format(dict_data=sta_data1["interface"], + key="STATION DETAILS", value="VALUE", + name="%s info" % (station_name[i])) + logging.info("napping 60 sec") + time.sleep(60) + if data[dut]["sniff_radio_2g"] is not None and sniffer_channel is not None: + self.stop_sniffer() + elif band == "fiveg": + sta_length = len(station_name) + for i in range(sta_length): + if dict(dut_data.get(identifier)["radio_data"]).keys().__contains__("5G") and \ + dict(dut_data.get(identifier)["radio_data"])["5G"] is not None: + sniffer_channel = dict(dut_data.get(identifier)["radio_data"])["5G"]["channel"] + if data[dut]["sniff_radio_5g"] is not None and sniffer_channel is not None: + self.start_sniffer(radio_channel=sniffer_channel, + radio=data[dut]["sniff_radio_5g"], + duration=60) + logging.info("started-sniffer") + + client_connect.build() + + # station data + sta_data1 = self.json_get(_req_url="port/1/1/%s" % (station_name[i])) + self.allure_report_table_format(dict_data=sta_data1["interface"], + key="STATION DETAILS", value="VALUE", + name="%s info" % (station_name[i])) + logging.info("napping 60 sec") + time.sleep(60) + if data[dut]["sniff_radio_5g"] is not None and sniffer_channel is not None: + self.stop_sniffer() + elif band == "sixg": + sta_length = len(station_name) + for i in range(sta_length): + if dict(dut_data.get(identifier)["radio_data"]).keys().__contains__("6G") and \ + dict(dut_data.get(identifier)["radio_data"])["6G"] is not None: + sniffer_channel = dict(dut_data.get(identifier)["radio_data"])["6G"]["channel"] + if data[dut]["sniff_radio_6g"] is not None and sniffer_channel is not None: + self.start_sniffer(radio_channel=sniffer_channel, + radio=data[dut]["sniff_radio_6g"], + duration=60) + logging.info("started-sniffer") + + client_connect.build() + + # station data + sta_data1 = self.json_get(_req_url="port/1/1/%s" % (station_name[i])) + self.allure_report_table_format(dict_data=sta_data1["interface"], + key="STATION DETAILS", value="VALUE", + name="%s info" % (station_name[i])) + logging.info("napping 60 sec") + time.sleep(60) + if data[dut]["sniff_radio_6g"] is not None and sniffer_channel is not None: + self.stop_sniffer() + # fetch supplicant logs from lanforge + try: + self.get_supplicant_logs(radio=str(radio)) + except Exception as e: + logging.error(f"Error in getting Supplicant logs: {str(e)}") + else: + client_connect.build() + if client_connect.wait_for_ip(station_name, timeout_sec=100): + client_connect._pass("ALL Stations got IP's", print_=True) + return client_connect else: return False