Added sniff_radio functionality

Signed-off-by: tarun-candela <tarunkumar.madabathula@candelatech.com>
This commit is contained in:
tarun-candela
2023-01-30 13:12:58 +05:30
parent 042e3d44a0
commit 6f8df7be01

View File

@@ -1691,27 +1691,115 @@ class lf_libs:
if name != None:
allure.attach(name=name, body=str(data_table))
def client_connect_using_radio(self, ssid="[BLANK]", passkey="[BLANK]", security="wpa2", mode="BRIDGE",
vlan_id=100, radio=None, sta_mode=0, station_name=[]):
self.client_connect = CreateStation(_host=self.manager_ip, _port=self.manager_http_port, _mode=sta_mode,
_sta_list=station_name, _password=passkey, _ssid=ssid, _security=security)
def client_connect_using_radio(self, ssid="[BLANK]", passkey="[BLANK]", security="wpa2", mode="BRIDGE", band=None,
vlan_id=100, radio=None, sta_mode=0, station_name=[], num_sta=None,
dut_data=None, sniff_radio=False):
# pre cleanup
# if pre_cleanup:
# self.pre_cleanup()
global data
if dut_data is not None:
data = self.setup_interfaces(ssid=ssid, passkey=passkey, encryption=security, band=band, vlan_id=vlan_id,
mode=mode, num_sta=num_sta, dut_data_=dut_data)
print("dut data :",data)
# self.client_connect.station_profile.sta_mode = sta_mode
self.client_connect.upstream_resource = 1
client_connect = CreateStation(_host=self.manager_ip, _port=self.manager_http_port, _mode=sta_mode,
_sta_list=station_name, _password=passkey, _ssid=ssid, _security=security)
client_connect.upstream_resource = 1
if mode == "BRIDGE":
self.client_connect.upstream_port = self.wan_ports
elif mode == "NAT":
self.client_connect.upstream_port = self.wan_ports
else:
self.client_connect.upstream_port = self.wan_ports + "." + str(vlan_id)
client_connect.upstream_port = self.wan_ports
elif mode == "NAT-WAN":
client_connect.upstream_port = self.wan_ports
elif mode == "VLAN":
client_connect.upstream_port = self.wan_ports + "." + str(vlan_id)
self.client_connect.radio = radio
self.client_connect.build()
# self.client_connect.wait_for_ip(station_name, timeout_sec=100)
# print(self.client_connect.wait_for_ip(station_name))
if self.client_connect.wait_for_ip(station_name, timeout_sec=100):
self.client_connect._pass("ALL Stations got IP's", print_=True)
return self.client_connect
client_connect.radio = radio
if sniff_radio:
for dut in data:
for dut_ in self.dut_data:
identifier = dut_["identifier"]
if dut_data.keys().__contains__(identifier):
if band == "twog":
sta_length = len(station_name)
for i in range(sta_length):
if dict(dut_data.get(identifier)["radio_data"]).keys().__contains__("2G") and \
dict(dut_data.get(identifier)["radio_data"])["2G"] is not None:
sniffer_channel = dict(dut_data.get(identifier)["radio_data"])["2G"]["channel"]
if data[dut]["sniff_radio_2g"] is not None and sniffer_channel is not None:
self.start_sniffer(radio_channel=sniffer_channel,
radio=data[dut]["sniff_radio_2g"],
duration=60)
logging.info("started-sniffer")
client_connect.build()
# station data
sta_data1 = self.json_get(_req_url="port/1/1/%s" % (station_name[i]))
self.allure_report_table_format(dict_data=sta_data1["interface"],
key="STATION DETAILS", value="VALUE",
name="%s info" % (station_name[i]))
logging.info("napping 60 sec")
time.sleep(60)
if data[dut]["sniff_radio_2g"] is not None and sniffer_channel is not None:
self.stop_sniffer()
elif band == "fiveg":
sta_length = len(station_name)
for i in range(sta_length):
if dict(dut_data.get(identifier)["radio_data"]).keys().__contains__("5G") and \
dict(dut_data.get(identifier)["radio_data"])["5G"] is not None:
sniffer_channel = dict(dut_data.get(identifier)["radio_data"])["5G"]["channel"]
if data[dut]["sniff_radio_5g"] is not None and sniffer_channel is not None:
self.start_sniffer(radio_channel=sniffer_channel,
radio=data[dut]["sniff_radio_5g"],
duration=60)
logging.info("started-sniffer")
client_connect.build()
# station data
sta_data1 = self.json_get(_req_url="port/1/1/%s" % (station_name[i]))
self.allure_report_table_format(dict_data=sta_data1["interface"],
key="STATION DETAILS", value="VALUE",
name="%s info" % (station_name[i]))
logging.info("napping 60 sec")
time.sleep(60)
if data[dut]["sniff_radio_5g"] is not None and sniffer_channel is not None:
self.stop_sniffer()
elif band == "sixg":
sta_length = len(station_name)
for i in range(sta_length):
if dict(dut_data.get(identifier)["radio_data"]).keys().__contains__("6G") and \
dict(dut_data.get(identifier)["radio_data"])["6G"] is not None:
sniffer_channel = dict(dut_data.get(identifier)["radio_data"])["6G"]["channel"]
if data[dut]["sniff_radio_6g"] is not None and sniffer_channel is not None:
self.start_sniffer(radio_channel=sniffer_channel,
radio=data[dut]["sniff_radio_6g"],
duration=60)
logging.info("started-sniffer")
client_connect.build()
# station data
sta_data1 = self.json_get(_req_url="port/1/1/%s" % (station_name[i]))
self.allure_report_table_format(dict_data=sta_data1["interface"],
key="STATION DETAILS", value="VALUE",
name="%s info" % (station_name[i]))
logging.info("napping 60 sec")
time.sleep(60)
if data[dut]["sniff_radio_6g"] is not None and sniffer_channel is not None:
self.stop_sniffer()
# fetch supplicant logs from lanforge
try:
self.get_supplicant_logs(radio=str(radio))
except Exception as e:
logging.error(f"Error in getting Supplicant logs: {str(e)}")
else:
client_connect.build()
if client_connect.wait_for_ip(station_name, timeout_sec=100):
client_connect._pass("ALL Stations got IP's", print_=True)
return client_connect
else:
return False