mirror of
https://github.com/Telecominfraproject/wlan-lanforge-scripts.git
synced 2025-11-01 11:18:03 +00:00
added sniffer & supplicant logs for client-connect
Signed-off-by: anil-tegala <anil.tegala@candelatech.com>
This commit is contained in:
@@ -582,7 +582,6 @@ class lf_tests(lf_libs):
|
|||||||
|
|
||||||
logging.info("Setup interface data:\n" + json.dumps(str(data), indent=2))
|
logging.info("Setup interface data:\n" + json.dumps(str(data), indent=2))
|
||||||
|
|
||||||
|
|
||||||
# query and fetch vlan Ip Address
|
# query and fetch vlan Ip Address
|
||||||
port_data = self.json_get(_req_url="/port?fields=alias,port+type,ip,mac")['interfaces']
|
port_data = self.json_get(_req_url="/port?fields=alias,port+type,ip,mac")['interfaces']
|
||||||
# Fail if Vlan don't have IP
|
# Fail if Vlan don't have IP
|
||||||
@@ -737,6 +736,33 @@ class lf_tests(lf_libs):
|
|||||||
client_connect_obj = []
|
client_connect_obj = []
|
||||||
station_data_all = {}
|
station_data_all = {}
|
||||||
for radio in data[identifier]["station_data"]:
|
for radio in data[identifier]["station_data"]:
|
||||||
|
if band == "twog":
|
||||||
|
if dict(dut_data.get(identifier)["radio_data"]).keys().__contains__("2G") and \
|
||||||
|
dict(dut_data.get(identifier)["radio_data"])["2G"] is not None:
|
||||||
|
sniffer_channel = dict(dut_data.get(identifier)["radio_data"])["2G"]["channel"]
|
||||||
|
if data[identifier]["sniff_radio_2g"] is not None and sniffer_channel is not None:
|
||||||
|
self.start_sniffer(radio_channel=sniffer_channel, test_name=f'{data[identifier]["station_data"][radio][0]}',
|
||||||
|
radio=data[identifier]["sniff_radio_2g"],
|
||||||
|
duration=120)
|
||||||
|
logging.info("started-sniffer")
|
||||||
|
if band == "fiveg":
|
||||||
|
if dict(dut_data.get(identifier)["radio_data"]).keys().__contains__("5G") and \
|
||||||
|
dict(dut_data.get(identifier)["radio_data"])["5G"] is not None:
|
||||||
|
sniffer_channel = dict(dut_data.get(identifier)["radio_data"])["5G"]["channel"]
|
||||||
|
if data[identifier]["sniff_radio_5g"] is not None and sniffer_channel is not None:
|
||||||
|
self.start_sniffer(radio_channel=sniffer_channel,
|
||||||
|
radio=data[identifier]["sniff_radio_5g"],
|
||||||
|
duration=120)
|
||||||
|
logging.info("started-sniffer")
|
||||||
|
if band == "sixg":
|
||||||
|
if dict(dut_data.get(identifier)["radio_data"]).keys().__contains__("6G") and \
|
||||||
|
dict(dut_data.get(identifier)["radio_data"])["6G"] is not None:
|
||||||
|
sniffer_channel = dict(dut_data.get(identifier)["radio_data"])["6G"]["channel"]
|
||||||
|
if data[identifier]["sniff_radio_6g"] is not None and sniffer_channel is not None:
|
||||||
|
self.start_sniffer(radio_channel=sniffer_channel,
|
||||||
|
radio=data[identifier]["sniff_radio_6g"],
|
||||||
|
duration=120)
|
||||||
|
logging.info("started-sniffer")
|
||||||
client_connect = CreateStation(_host=self.manager_ip, _port=self.manager_http_port,
|
client_connect = CreateStation(_host=self.manager_ip, _port=self.manager_http_port,
|
||||||
_sta_list=data[identifier]["station_data"][radio],
|
_sta_list=data[identifier]["station_data"][radio],
|
||||||
_password=data[identifier]["passkey"],
|
_password=data[identifier]["passkey"],
|
||||||
@@ -777,12 +803,26 @@ class lf_tests(lf_libs):
|
|||||||
if allure_attach:
|
if allure_attach:
|
||||||
self.attach_table_allure(data=sta_table_dict, allure_name=allure_name)
|
self.attach_table_allure(data=sta_table_dict, allure_name=allure_name)
|
||||||
|
|
||||||
|
# stop sniffer if active
|
||||||
|
logging.info(msg=str("Cleaning up sniffer interface If available on PORT Manager"))
|
||||||
|
port_data = self.json_get(_req_url="/port?fields=alias,parent+dev,port+type,ip,mac")['interfaces']
|
||||||
|
# for i in port_data:
|
||||||
|
# for item in i:
|
||||||
|
# if i[item]['port type'] == '802.1Q VLAN' and i[item]['ip'] == '0.0.0.0':
|
||||||
|
# logging.info('VLAN do not have IP')
|
||||||
|
if self.start_sniffer:
|
||||||
|
self.stop_sniffer()
|
||||||
|
|
||||||
logging.info("pass_fail result: " + str(pass_fail))
|
logging.info("pass_fail result: " + str(pass_fail))
|
||||||
if False in pass_fail:
|
if False in pass_fail:
|
||||||
logging.info("Station did not get an ip")
|
logging.info("Station did not get an ip")
|
||||||
|
for radio in data[identifier]["station_data"]:
|
||||||
|
self.get_supplicant_logs(radio=str(radio))
|
||||||
pytest.fail("Station did not get an ip")
|
pytest.fail("Station did not get an ip")
|
||||||
else:
|
else:
|
||||||
logging.info("ALL Stations got IP's")
|
logging.info("ALL Stations got IP's")
|
||||||
|
for radio in data[identifier]["station_data"]:
|
||||||
|
self.get_supplicant_logs(radio=str(radio))
|
||||||
return station_data_all
|
return station_data_all
|
||||||
|
|
||||||
def dfs_test(self, ssid=None, security=None, passkey=None, mode=None,
|
def dfs_test(self, ssid=None, security=None, passkey=None, mode=None,
|
||||||
@@ -1274,7 +1314,7 @@ class lf_tests(lf_libs):
|
|||||||
per_radio_sta = int(num_stations / len(radio))
|
per_radio_sta = int(num_stations / len(radio))
|
||||||
rem = num_stations % len(radio)
|
rem = num_stations % len(radio)
|
||||||
logging.info("Total stations per radio: " + str(per_radio_sta))
|
logging.info("Total stations per radio: " + str(per_radio_sta))
|
||||||
num_stations = lambda rem: per_radio_sta+1 if rem else per_radio_sta
|
num_stations = lambda rem: per_radio_sta + 1 if rem else per_radio_sta
|
||||||
identifier = list(dut_data.keys())[0]
|
identifier = list(dut_data.keys())[0]
|
||||||
allure.attach(name="Definition",
|
allure.attach(name="Definition",
|
||||||
body="Multiple association/disassociation stability test intends to measure stability of Wi-Fi device " \
|
body="Multiple association/disassociation stability test intends to measure stability of Wi-Fi device " \
|
||||||
@@ -1288,8 +1328,8 @@ class lf_tests(lf_libs):
|
|||||||
|
|
||||||
for i in radio:
|
for i in radio:
|
||||||
station_data = ["profile_link " + i.split(".")[0] + "." + i.split(".")[1] +
|
station_data = ["profile_link " + i.split(".")[0] + "." + i.split(".")[1] +
|
||||||
" STA-AUTO " + str(num_stations(rem)) + " 'DUT: " + identifier + " Radio-" +
|
" STA-AUTO " + str(num_stations(rem)) + " 'DUT: " + identifier + " Radio-" +
|
||||||
str(int(idx) + 1) + "'" + " NA " + i.split(".")[2]]
|
str(int(idx) + 1) + "'" + " NA " + i.split(".")[2]]
|
||||||
rem = 0
|
rem = 0
|
||||||
self.temp_raw_lines.append(station_data)
|
self.temp_raw_lines.append(station_data)
|
||||||
logging.debug("Raw Line : " + str(station_data))
|
logging.debug("Raw Line : " + str(station_data))
|
||||||
@@ -1309,10 +1349,12 @@ class lf_tests(lf_libs):
|
|||||||
val = [['ul_rate_sel: Per-Station Upload Rate:']]
|
val = [['ul_rate_sel: Per-Station Upload Rate:']]
|
||||||
thr1 = threading.Thread(target=thread_fun, args=(sta_list[8:16],))
|
thr1 = threading.Thread(target=thread_fun, args=(sta_list[8:16],))
|
||||||
thr1.start()
|
thr1.start()
|
||||||
wct_obj = self.wifi_capacity(instance_name=instance_name, mode=mode, vlan_id=vlan, download_rate=downld_rate,
|
wct_obj = self.wifi_capacity(instance_name=instance_name, mode=mode, vlan_id=vlan,
|
||||||
|
download_rate=downld_rate,
|
||||||
stations=sel_stations, raw_lines=val, batch_size="8", upload_rate=upld_rate,
|
stations=sel_stations, raw_lines=val, batch_size="8", upload_rate=upld_rate,
|
||||||
protocol="UDP-IPv4", duration="120000", create_stations=False, dut_data=dut_data,
|
protocol="UDP-IPv4", duration="120000", create_stations=False,
|
||||||
sort = "interleave",)
|
dut_data=dut_data,
|
||||||
|
sort="interleave", )
|
||||||
|
|
||||||
report_name = wct_obj[0].report_name[0]['LAST']["response"].split(":::")[1].split("/")[-1] + "/"
|
report_name = wct_obj[0].report_name[0]['LAST']["response"].split(":::")[1].split("/")[-1] + "/"
|
||||||
self.attach_report_graphs(report_name=report_name)
|
self.attach_report_graphs(report_name=report_name)
|
||||||
|
|||||||
Reference in New Issue
Block a user