added the num_sta instead of sta_names

Signed-off-by: shivam <shivam.thakur@candelatech.com>
This commit is contained in:
shivam
2022-08-09 13:40:59 +05:30
parent c92cb45859
commit 1a7acc7b42
2 changed files with 24 additions and 27 deletions

View File

@@ -55,9 +55,9 @@ class lf_tests(lf_libs):
self.run_lf = run_lf
# self.upstream_port = list(self.uplink_nat_ports.keys())[0]
# self.skip_pcap = skip_pcap
#self.wan_upstream = list(self.wan_ports.keys())
# self.wan_upstream = list(self.wan_ports.keys())
# self.lan_upstream =
self.staConnect = StaConnect2(self.manager_ip, self.manager_http_port, outfile="shivam", _cleanup_on_exit=False)
self.staConnect = StaConnect2(self.manager_ip, self.manager_http_port, outfile="shivam", _cleanup_on_exit=False)
def setup_interfaces(self, band=None, vlan_id=None, mode=None, num_sta=None):
if band is None:
@@ -251,7 +251,8 @@ class lf_tests(lf_libs):
logging.info("Existing layer3 and endp are not available")
else:
list(map(lambda i: self.staConnect.rm_cx(cx_name=i), exist_l3))
list(map(lambda cx_name: [self.staConnect.rm_endp(ename=i) for i in [f"{cx_name}-A", f"{cx_name}-B"]], exist_l3))
list(map(lambda cx_name: [self.staConnect.rm_endp(ename=i) for i in [f"{cx_name}-A", f"{cx_name}-B"]],
exist_l3))
except Exception as e:
logging.error(e)
@@ -272,8 +273,6 @@ class lf_tests(lf_libs):
print(upstream_port)
return upstream_port
def setup_sniffer(self, band=None, station_radio_data=None):
"""Setup sniff radio"""
sniff_radio = None
@@ -298,11 +297,13 @@ class lf_tests(lf_libs):
return sniff_radio
def client_connectivity_test(self, ssid="[BLANK]", passkey="[BLANK]", security="open", extra_securities=[],
station_name=[], mode="BRIDGE", vlan_id=1, band="twog", ssid_channel=None, allure_attch=True):
num_sta=1, mode="BRIDGE", vlan_id=1, band="twog", ssid_channel=None,
allure_attch=True):
# self.staConnect = StaConnect2(self.manager_ip, self.manager_http_port, debug_=self.debug)
# setup_interfaces() interface selection return radio name along no of station on each radio, upstream port
#
data = self.setup_interfaces(band=band, vlan_id=vlan_id, mode=mode, num_sta=len(station_name))
data = self.setup_interfaces(band=band, vlan_id=vlan_id, mode=mode, num_sta=num_sta)
sta_list = self.get_station_list(num_sta=num_sta, band=band)
logging.info("Setup interface data" + str(data))
if self.run_lf:
ssid = data["ssid"]
@@ -317,8 +318,8 @@ class lf_tests(lf_libs):
# list of per radio station
length_to_split = list(data["radios"].values())
print(length_to_split)
sta_list = iter(station_name)
# station list of per radio
sta_list = iter(sta_list)
sta_list_ = [list(islice(sta_list, elem))
for elem in length_to_split]
# Checking station lists according to radios
@@ -334,13 +335,13 @@ class lf_tests(lf_libs):
self.staConnect.admin_down(self.staConnect.radio)
self.staConnect.admin_up(self.staConnect.radio)
self.staConnect.sta_prefix = data["sta_prefix"]
#changed to auto channel
# changed to auto channel
self.set_radio_channel(radio=radio, channel="AUTO")
print("scan ssid radio", radio.split(".")[2])
result = self.scan_ssid(radio=radio.split(".")[2], ssid=ssid, ssid_channel=ssid_channel)
print("ssid scan data :- ", result)
if not result and ssid_channel:
#Sniffer required
# Sniffer required
print("sniff radio", data["sniff_radio"].split(".")[2])
self.start_sniffer(radio_channel=ssid_channel, radio=data["sniff_radio"].split(".")[2], duration=30)
time.sleep(30)
@@ -364,7 +365,7 @@ class lf_tests(lf_libs):
self.staConnect.setup(extra_securities=extra_securities)
if ssid_channel:
pass
#Need to start sniffer
# Need to start sniffer
# print("sniff radio", data["sniff_radio"].split(".")[2])
# self.start_sniffer(radio_channel=ssid_channel, radio=data["sniff_radio"].split(".")[2], duration=30)
self.staConnect.start()
@@ -372,7 +373,8 @@ class lf_tests(lf_libs):
time.sleep(self.staConnect.runtime_secs)
print(self.staConnect.station_names)
sta_rows = ["4way time (us)", "channel", "cx time (us)", "dhcp (ms)", "ip", "signal"]
station_data = self.get_station_data(sta_name=self.staConnect.station_names, rows=sta_rows, allure_attach=False)
station_data = self.get_station_data(sta_name=self.staConnect.station_names, rows=sta_rows,
allure_attach=False)
sta_table_dict = {}
sta_table_dict["station name"] = list(station_data.keys())
for i in sta_rows:
@@ -380,7 +382,7 @@ class lf_tests(lf_libs):
for j in self.staConnect.station_names:
temp_list.append(station_data[j][i])
sta_table_dict[i] = temp_list
#pass fail
# pass fail
pass_fail_sta = []
for i in sta_table_dict["ip"]:
if i == "0.0.0.0":
@@ -391,7 +393,8 @@ class lf_tests(lf_libs):
if allure_attch:
self.attach_table_allure(data=sta_table_dict, allure_name="station data")
self.staConnect.stop()
cx_name = list(self.staConnect.l3_udp_profile.get_cx_names()) + list(self.staConnect.l3_tcp_profile.get_cx_names())
cx_name = list(self.staConnect.l3_udp_profile.get_cx_names()) + list(
self.staConnect.l3_tcp_profile.get_cx_names())
cx_row = ["type", "bps rx a", "bps rx b"]
print(cx_name)
print(self.staConnect.get_result_list())
@@ -533,7 +536,7 @@ class lf_tests(lf_libs):
csv_data_table = report_obj.table2(list_data)
# allure.attach(name="scan_ssid_data", body=csv_data_table)
if allure_attach:
allure.attach(name="scan_ssid_data_" + str(i+1), body=csv_data_table)
allure.attach(name="scan_ssid_data_" + str(i + 1), body=csv_data_table)
obj_scan.cleanup()
if self.check_ssid_available_scan_result(scan_ssid_data=list_data, ssid=ssid):
count = count + 1
@@ -541,7 +544,6 @@ class lf_tests(lf_libs):
if count == 0:
return False
def start_sniffer(self, radio_channel=None, radio=None, test_name="sniff_radio", duration=60):
self.pcap_name = test_name + ".pcap"
self.pcap_obj = SniffRadio(lfclient_host=self.manager_ip, lfclient_port=self.manager_http_port, radio=radio,
@@ -633,7 +635,7 @@ class lf_tests(lf_libs):
def get_cx_data(self, cx_name=[], cx_data=[], allure_attach=True):
"""Attach cx data to allure"""
url = "cx/all"
#cx_data.append("type")
# cx_data.append("type")
dict_cx_data = {}
cx_json_data = self.json_get(url)
print(cx_json_data)
@@ -664,7 +666,6 @@ class lf_tests(lf_libs):
self.attach_table_allure(data=cx_table_dict, allure_name="cx data")
return dict_cx_data
def get_station_list(self, num_sta=1, band="twog"):
"""Create station list"""
sta_list = []
@@ -690,11 +691,6 @@ class lf_tests(lf_libs):
logging.error(e)
if __name__ == '__main__':
basic_1 = {
"target": "tip_2x",
@@ -766,9 +762,10 @@ if __name__ == '__main__':
obj = lf_tests(lf_data=dict(basic_1["traffic_generator"]), dut_data=list(basic_1["device_under_tests"]),
log_level=logging.DEBUG, run_lf=True)
#obj.get_cx_data()
# obj.get_cx_data()
# obj.chamber_view()
obj.client_connectivity_test(ssid="OpenWifi", passkey="OpenWifi", security="wpa2", extra_securities=[],
station_name=["1.1.ath10k_2g000", "1.1.ath10k_2g001"], mode="BRIDGE", vlan_id=1, band="twog", ssid_channel=11)
station_name=["1.1.ath10k_2g000", "1.1.ath10k_2g001"], mode="BRIDGE", vlan_id=1,
band="twog", ssid_channel=11)
# obj.chamber_view()
# obj.setup_relevent_profiles()