Fixed issue in lf_tests/client_isolation and added flags to lf_libs/client_connect_using_radio and station_list param in lf_libs/stop_sniffer for better naming in allure report

Signed-off-by: jitendracandela <jitendra.kushavah@candelatech.com>
This commit is contained in:
jitendracandela
2024-03-18 16:52:49 +05:30
parent 436ecbfc9e
commit 6e13568824
2 changed files with 176 additions and 126 deletions

View File

@@ -913,7 +913,7 @@ class lf_libs:
time.sleep(5)
self.pcap_obj.monitor.start_sniff(capname=self.pcap_name, duration_sec=duration)
def stop_sniffer(self):
def stop_sniffer(self, sta_list=[]):
self.pcap_obj.monitor.admin_down()
time.sleep(2)
self.pcap_obj.cleanup()
@@ -923,7 +923,7 @@ class lf_libs:
report_location="/home/lanforge/" + self.pcap_name,
report_dir=".")
allure.attach.file(source=self.pcap_name,
name="pcap_file", attachment_type=allure.attachment_type.PCAP)
name="pcap_file " + ", ".join(sta_list), attachment_type=allure.attachment_type.PCAP)
logging.info("pcap file name: " + str(self.pcap_name))
except Exception as e:
logging.error(e)
@@ -1695,7 +1695,8 @@ class lf_libs:
def client_connect_using_radio(self, ssid="[BLANK]", passkey="[BLANK]", security="wpa2", mode="BRIDGE", band=None,
vlan_id=[None], radio=None, client_type=0, station_name=[], dut_data=None,
sniff_radio=False, create_vlan=True):
sniff_radio=False, create_vlan=True, attach_port_info=True,
attach_station_data=True):
# pre cleanup
# if pre_cleanup:
# self.pre_cleanup()
@@ -1723,10 +1724,11 @@ class lf_libs:
client_connect.upstream_port = upstream_port
client_connect.upstream_resource = 1
client_connect.radio = radio
# allure attach for port info
port_data = self.json_get(_req_url="port?fields=ip")
port_info = {key: value for d in port_data["interfaces"] for key, value in d.items()}
self.allure_report_table_format(dict_data=port_info, key="Port Names", value="ip", name="Port info")
if attach_port_info is True:
# allure attach for port info
port_data = self.json_get(_req_url="port?fields=ip")
port_info = {key: value for d in port_data["interfaces"] for key, value in d.items()}
self.allure_report_table_format(dict_data=port_info, key="Port Names", value="ip", name="Port info")
if sniff_radio:
for dut_ in self.dut_data:
identifier = dut_["identifier"]
@@ -1745,15 +1747,16 @@ class lf_libs:
client_connect.build()
logging.info("napping 10 sec")
time.sleep(10)
# station data
sta_length = len(station_name)
for i in range(sta_length):
sta_data1 = self.json_get(_req_url="port/1/1/%s" % (station_name[i]))
self.allure_report_table_format(dict_data=sta_data1["interface"], key="STATION DETAILS",
value="VALUE", name="%s info" % (station_name[i]))
if attach_station_data is True:
# station data
sta_length = len(station_name)
for i in range(sta_length):
sta_data1 = self.json_get(_req_url="port/1/1/%s" % (station_name[i]))
self.allure_report_table_format(dict_data=sta_data1["interface"], key="STATION DETAILS",
value="VALUE", name="%s info" % (station_name[i]))
# to stop sniffer
if radio is not None and sniffer_channel is not None:
self.stop_sniffer()
self.stop_sniffer(station_name)
else:
logging.info("missing identifier.")
else:

View File

@@ -2527,126 +2527,173 @@ class lf_tests(lf_libs):
logging.info("Test Failed, due to station has no ip")
return False, "TEST FAILED, due to station has no ip"
def client_isolation(self, ssid1=None, ssid2=None, passkey=None, security=None, mode="BRIDGE", band_2g=None,
band_5g=None, dut_data=None, num_sta=None, side_a_min_rate=None, side_a_max_rate=None,
side_b_min_rate=None,
side_b_max_rate=None, sniff_radio=True):
band = radio_name = station_name = radio_names = station_name1 = station_name2 = station_result = layer3_result = None
if band_5g is None and band_2g is not None:
def client_isolation(self, ssid1=None, ssid2=None, passkey=None, security=None, mode="BRIDGE", band_2g=False,
band_5g=False, dut_data=None, num_sta=None, side_a_min_rate=None, side_a_max_rate=None,
side_b_min_rate=None, side_b_max_rate=None, sniff_radio=True):
copy_num_sta = num_sta
# selecting radio(s) based on the requested bands of the client(s)
dict_all_radios_2g = {"wave2_2g_radios": self.wave2_2g_radios, "wave1_radios": self.wave1_radios,
"mtk_radios": self.mtk_radios, "ax200_radios": self.ax200_radios,
"ax210_radios": self.ax210_radios}
dict_all_radios_5g = {"wave2_5g_radios": self.wave2_5g_radios, "wave1_radios": self.wave1_radios,
"mtk_radios": self.mtk_radios, "ax200_radios": self.ax200_radios,
"ax210_radios": self.ax210_radios}
max_station_per_radio = {"wave2_2g_radios": 64, "wave2_5g_radios": 64, "wave1_radios": 64, "mtk_radios": 19,
"ax200_radios": 1, "ax210_radios": 1}
radio_name_2g = []
radio_name_5g = []
if band_2g is True and band_5g is True: # a 2G and a 5G station
for type_of_radio in dict_all_radios_2g:
if len(dict_all_radios_2g[type_of_radio]) > 0:
radio_name_2g.append(dict_all_radios_2g[type_of_radio][0])
max_station_per_radio[type_of_radio] -= 1
break
for type_of_radio in dict_all_radios_5g:
if len(dict_all_radios_5g[type_of_radio]) > 0 and max_station_per_radio[type_of_radio] > 0:
radio_name_5g.append(dict_all_radios_5g[type_of_radio][0])
break
if len(radio_name_2g) == 0 or len(radio_name_5g) == 0:
logging.info("Looks like the langforge radios can't support creating a 2G and a 5G station, "
"simultaneously.")
pytest.skip("Looks like the langforge radios can't support creating a 2G and a 5G station, "
"simultaneously.")
station_name_2g = "sta_2g"
station_name_5g = "sta_5g"
band = ["twog", "fiveg"]
elif band_2g is True: # only 2g bands but num_sta can be 1 or 2
if self.max_2g_stations < num_sta:
logging.info(f"Looks like the langforge radios can't support creating {num_sta} 2G stations.")
raise ValueError(f"Looks like the langforge radios can't support creating {num_sta} 2G stations.")
band = "twog"
radio_name = self.wave2_2g_radios[0]
station_name = self.twog_prefix
elif band_2g is None and band_5g is not None:
enough_radios = False
for type_of_radio in dict_all_radios_2g:
if len(dict_all_radios_2g[type_of_radio]) > 0:
for i in range(len(dict_all_radios_2g[type_of_radio])):
radio_name_2g.append(dict_all_radios_2g[type_of_radio][i])
if num_sta <= max_station_per_radio[type_of_radio]:
num_sta = 0
enough_radios = True
break
else:
num_sta -= max_station_per_radio[type_of_radio]
if enough_radios:
break
station_name = "sta_2g"
elif band_5g is True: # only 5g bands but num_sta can be 1 or 2
if self.max_5g_stations < num_sta:
logging.info(f"Looks like the langforge radios can't support creating {num_sta} 5G stations.")
raise ValueError(f"Looks like the langforge radios can't support creating {num_sta} 5G stations.")
band = "fiveg"
radio_name = self.wave2_5g_radios[0]
station_name = self.fiveg_prefix
elif band_2g and band_5g is not None:
radio_name1 = self.wave2_2g_radios[0]
radio_name2 = self.wave2_5g_radios[0]
station_name1 = self.twog_prefix
station_name2 = self.fiveg_prefix
band = [band_2g, band_5g]
radio_names = [radio_name1, radio_name2]
else:
logging.info("band is not selected.")
allure.attach(name="Min Tx rate -A", body=f"{side_a_min_rate} bytes")
allure.attach(name="Min Tx rate -B", body=f"{side_b_min_rate} bytes")
# allure.attach(name="Max Tx rate -A", body=f"{side_a_max_rate} bytes")
# allure.attach(name="Max Tx rate -B", body=f"{side_b_max_rate} bytes")
enough_radios = False
for type_of_radio in dict_all_radios_5g:
if len(dict_all_radios_5g[type_of_radio]) > 0:
for i in range(len(dict_all_radios_5g[type_of_radio])):
radio_name_5g.append(dict_all_radios_5g[type_of_radio][i])
if num_sta <= max_station_per_radio[type_of_radio]:
num_sta = 0
enough_radios = True
break
else:
num_sta -= max_station_per_radio[type_of_radio]
if enough_radios:
break
station_name = "sta_5g"
logging.info("Clearing any existing stations and Layer-3 traffics before starting the test...")
self.pre_cleanup() # clear any existing stations and traffic
sta = []
ssids = [ssid1, ssid2]
if num_sta > 1:
if band_2g and band_5g is not None:
for i in range(1):
sta.append(station_name1 + str(i))
sta.append(station_name2 + str(i))
for i in range(num_sta):
station_result = self.client_connect_using_radio(ssid=ssids[i], passkey=passkey,
security=security, mode=mode,
band=band[i], radio=radio_names[i],
station_name=[sta[i]],
dut_data=dut_data,
sniff_radio=True)
layer3_result = self.create_layer3(side_a_min_rate=side_a_min_rate, side_a_max_rate=side_a_max_rate,
side_b_min_rate=side_b_min_rate, side_b_max_rate=side_b_max_rate,
traffic_type="lf_udp", sta_list=[sta[0]], side_b=sta[1])
else:
num_sta = copy_num_sta
sta_got_ip = []
if num_sta > 1: # between 2 stations
if band_2g is True and band_5g is True: # a 2G and a 5G station
sta_got_ip.append(self.client_connect_using_radio(ssid=ssid1, passkey=passkey, security=security,
mode=mode, band="twog", radio=radio_name_2g[0],
station_name=[station_name_2g], dut_data=dut_data,
sniff_radio=sniff_radio, attach_port_info=False,
attach_station_data=False))
sta_got_ip.append(self.client_connect_using_radio(ssid=ssid2, passkey=passkey, security=security,
mode=mode, band="fiveg", radio=radio_name_5g[0],
station_name=[station_name_5g], dut_data=dut_data,
sniff_radio=sniff_radio, attach_port_info=False,
attach_station_data=False))
self.create_layer3(side_a_min_rate=side_a_min_rate, side_a_max_rate=side_a_max_rate,
side_b_min_rate=side_b_min_rate, side_b_max_rate=side_b_max_rate,
traffic_type="lf_udp", sta_list=[station_name_2g], side_b=station_name_5g)
else: # else both are either 2G or 5G stations
ssids = [ssid1, ssid2]
radio_name = radio_name_2g + radio_name_5g
if len(radio_name) == 1:
radio_name.append(radio_name[0])
for i in range(2):
sta.append(station_name + str(i))
station_result = self.client_connect_using_radio(ssid=ssids[i], passkey=passkey, band=band,
security=security,
mode=mode, radio=radio_name, station_name=[sta[i]],
dut_data=dut_data, sniff_radio=sniff_radio)
layer3_result = self.create_layer3(side_a_min_rate=side_a_min_rate, side_a_max_rate=side_a_max_rate,
side_b_min_rate=side_b_min_rate, side_b_max_rate=side_b_max_rate,
traffic_type="lf_udp", sta_list=sta, side_b=sta[1])
elif num_sta == 1:
station_result = self.client_connect_using_radio(ssid=ssid1, passkey=passkey, band=band, security=security,
mode=mode, radio=radio_name, station_name=[station_name],
dut_data=dut_data, sniff_radio=sniff_radio)
layer3_result = self.create_layer3(side_a_min_rate=side_a_min_rate, side_a_max_rate=side_a_max_rate,
side_b_min_rate=side_b_min_rate, side_b_max_rate=side_b_max_rate,
traffic_type="lf_udp", sta_list=[station_name], side_b="")
logging.info("waiting for 20 seconds")
time.sleep(20)
sta.append(station_name + "_" + str(i + 1))
sta_got_ip.append(self.client_connect_using_radio(ssid=ssids[i], passkey=passkey, band=band,
security=security, mode=mode, radio=radio_name[i],
station_name=[sta[i]], dut_data=dut_data,
sniff_radio=sniff_radio, attach_port_info=False,
attach_station_data=False))
self.create_layer3(side_a_min_rate=side_a_min_rate, side_a_max_rate=side_a_max_rate,
side_b_min_rate=side_b_min_rate, side_b_max_rate=side_b_max_rate,
traffic_type="lf_udp", sta_list=[sta[0]], side_b=sta[1])
elif num_sta == 1: # else between a 2G/5G station and uplink port
radio_name = radio_name_2g if band_2g is True else radio_name_5g
sta_got_ip.append(self.client_connect_using_radio(ssid=ssid1, passkey=passkey, band=band, security=security,
mode=mode, radio=radio_name[0], station_name=[station_name],
dut_data=dut_data, sniff_radio=sniff_radio,
attach_port_info=False, attach_station_data=False))
self.create_layer3(side_a_min_rate=side_a_min_rate, side_a_max_rate=side_a_max_rate,
side_b_min_rate=side_b_min_rate, side_b_max_rate=side_b_max_rate,
traffic_type="lf_udp", sta_list=[station_name], side_b="")
if False in sta_got_ip:
self.pre_cleanup()
logging.info("TEST FAILED, due to station has no ip")
return False, "TEST FAILED, due to station has no ip"
logging.info("Running Traffic for 60 seconds...")
time.sleep(60)
logging.info("Getting Layer-3 and Endpoints Data...")
cx_list = self.get_cx_list()
rx_data = self.json_get(_req_url=f"cx/{cx_list[0]}")
rx_drop_a = rx_data[f"{cx_list[0]}"]["rx drop % a"]
rx_drop_b = rx_data[f"{cx_list[0]}"]["rx drop % b"]
bps_rx_a = rx_data[f"{cx_list[0]}"]["bps rx a"]
bps_rx_b = rx_data[f"{cx_list[0]}"]["bps rx b"]
self.allure_report_table_format(dict_data=rx_data[f"{cx_list[0]}"], key="layer3 column names", value="Values",
name="Layer-3 Data")
self.client_disconnect(clear_all_sta=True, clean_l3_traffic=True)
if len(sta) == 0:
up = self.get_wan_upstream_ports()
upstream = list(up.values())
upstream_port = upstream[0]
table_columns = [station_name, upstream_port]
else:
table_columns = [sta[0], sta[1]]
table_data = {"Station Name": table_columns, "bps rx a": [bps_rx_a, bps_rx_b],
"rx drop %": [rx_drop_a, rx_drop_b]}
table = tabulate(table_data, headers='keys', tablefmt='fancy_grid', showindex=True)
print(table)
if not station_result:
allure.attach(name="Test Result", body="TEST FAILED, due to station has no ip")
return False, "TEST FAILED, due to station has no ip"
else:
logging.info("Station creation passed. Successful.")
if layer3_result is None:
logging.info("Layer3 traffic ran.")
if ssid1 == ssid2:
if ((bps_rx_a == 0 and bps_rx_b == 0) and (rx_drop_a == 100 and rx_drop_b == 100)) or (
(bps_rx_a != 0 and bps_rx_b != 0) and (rx_drop_a != 100 and rx_drop_b != 100)):
allure.attach(name="Test Result", body="TEST PASSED" + "\n\n" + str(table))
return True, "TEST PASS"
else:
allure.attach(name="Test Result",
body="TEST FAILED, Stations should not ping each other, when isolation enabled or"
"rx-drop should not be 100% when isolation disabled in same ssid" + "\n\n" + str(
table))
return False, "TEST FAILED, Stations should not ping each other, when isolation enabled or " \
"rx-drop should not be 100% when isolation disabled in same ssid"
elif band_2g and band_5g is not None:
if (bps_rx_a == 0 and bps_rx_b != 0) or (bps_rx_a != 0 and bps_rx_b == 0):
allure.attach(name="Test Result", body="TEST PASSED" + "\n\n" + str(table))
return True, "TEST PASS"
else:
allure.attach(name="Test Result", body="TEST FAILED " + "\n\n" + str(table))
return False, "TEST FAILED, Traffic not ran properly"
else:
if rx_drop_a and rx_drop_b == 100 or 0:
allure.attach(name="Test Result",
body="TEST FAILED, Rx drop should not be 100% or 0%" + "\n\n" + str(table))
return False, "TEST FAILED, Rx drop should not be 100% or 0%"
else:
allure.attach(name="Test Result", body="TEST PASSED" + "\n\n" + str(table))
return True, "TEST PASS"
else:
logging.info("Layer3 not ran properly.")
rx_drop_a = rx_data[cx_list[0]]["rx drop % a"]
rx_drop_b = rx_data[cx_list[0]]["rx drop % b"]
sta = []
for u in self.json_get("/port/?fields=port+type,alias,ssid")['interfaces']:
if (list(u.values())[0]['port type'] in ['WIFI-STA']
and list(u.values())[0]['ssid'] in [ssid1, ssid2]):
sta.append(list(u.keys())[0])
if len(sta) == 1:
sta.append(list(self.get_wan_upstream_ports().values())[0])
sta_rows = ["ssid", "ip", "mode", "channel", "signal", "mac", "parent dev"]
sta_dict = self.get_station_data(sta_name=sta, rows=sta_rows, allure_attach=False)
station_table_dict = {"station name": list(sta_dict.keys()),
"Min/Max Tx rate": [f"{side_a_min_rate} bytes", f"{side_b_min_rate} bytes"],
"rx drop %": [rx_drop_a, rx_drop_b]}
for col in sta_rows:
temp_list = []
for port in sta:
temp_list.append(sta_dict[port][col])
station_table_dict[col] = temp_list
logging.info("Attaching to the allure report...")
self.attach_table_allure(data=station_table_dict, allure_name="Endpoints Data")
self.allure_report_table_format(dict_data=rx_data[cx_list[0]], key="Layer-3 Column", value="Value",
name="Layer-3 Data")
logging.info("Traffic ran, Clearing stations and Layer-3 traffic...")
self.pre_cleanup()
return True, {"drop_a": rx_drop_a, "drop_b": rx_drop_b}
def ax_capacity_test(self, instance_name="", dut_data=None, mode="BRIDGE", download_rate="10Gbps",
upload_rate="0Gbps", dut_mode="", protocol="UDP-IPv4", num_stations={}, vlan_id=None):