Added run lf for wifi capacity

Signed-off-by: jitendracandela <jitendra.kushavah@candelatech.com>
This commit is contained in:
jitendracandela
2022-09-28 10:50:02 +05:30
parent d66db2904d
commit ad25246c90

View File

@@ -160,8 +160,9 @@ class lf_tests(lf_libs):
dict(dut_data.get(identifier)["radio_data"])["2G"] is not None:
channel = data[dut]["channel"]
if data[dut]["sniff_radio_2g"] is not None and channel is not None:
self.start_sniffer(radio_channel=channel, radio=data[dut]["sniff_radio_2g"].split(".")[2],
duration=runtime_secs)
self.start_sniffer(radio_channel=channel,
radio=data[dut]["sniff_radio_2g"].split(".")[2],
duration=runtime_secs)
logging.info("started-sniffer")
for obj in sta_connect_obj:
obj.start()
@@ -175,8 +176,9 @@ class lf_tests(lf_libs):
dict(dut_data.get(identifier)["radio_data"])["5G"] is not None:
channel = data[dut]["channel"]
if data[dut]["sniff_radio_5g"] is not None and channel is not None:
self.start_sniffer(radio_channel=channel, radio=data[dut]["sniff_radio_5g"].split(".")[2],
duration=runtime_secs)
self.start_sniffer(radio_channel=channel,
radio=data[dut]["sniff_radio_5g"].split(".")[2],
duration=runtime_secs)
for obj in sta_connect_obj:
obj.start()
logging.info("napping %f sec" % runtime_secs)
@@ -188,7 +190,8 @@ class lf_tests(lf_libs):
dict(dut_data.get(identifier)["radio_data"])["6G"] is not None:
channel = data[dut]["channel"]
if data[dut]["sniff_radio_6g"] is not None and channel is not None:
self.start_sniffer(radio_channel=channel, radio=data[dut]["sniff_radio_6g"].split(".")[2],
self.start_sniffer(radio_channel=channel,
radio=data[dut]["sniff_radio_6g"].split(".")[2],
duration=runtime_secs)
for obj in sta_connect_obj:
obj.start()
@@ -414,7 +417,8 @@ class lf_tests(lf_libs):
dict(dut_data.get(identifier)["radio_data"])["2G"] is not None:
channel = dict(dut_data.get(identifier)["radio_data"])["2G"]["channel"]
if data[dut]["sniff_radio_2g"] is not None and channel is not None:
self.start_sniffer(radio_channel=channel, radio=data[dut]["sniff_radio_2g"].split(".")[2],
self.start_sniffer(radio_channel=channel,
radio=data[dut]["sniff_radio_2g"].split(".")[2],
duration=runtime_secs)
logging.info("started-sniffer")
for obj in eap_connect_objs:
@@ -428,7 +432,8 @@ class lf_tests(lf_libs):
dict(dut_data.get(identifier)["radio_data"])["5G"] is not None:
channel = dict(dut_data.get(identifier)["radio_data"])["5G"]["channel"]
if data[dut]["sniff_radio_5g"] is not None and channel is not None:
self.start_sniffer(radio_channel=channel, radio=data[dut]["sniff_radio_5g"].split(".")[2],
self.start_sniffer(radio_channel=channel,
radio=data[dut]["sniff_radio_5g"].split(".")[2],
duration=runtime_secs)
for obj in eap_connect_objs:
obj.start(obj.sta_list, True, True, wait_time=1)
@@ -441,7 +446,8 @@ class lf_tests(lf_libs):
dict(dut_data.get(identifier)["radio_data"])["6G"] is not None:
channel = dict(dut_data.get(identifier)["radio_data"])["6G"]["channel"]
if data[dut]["sniff_radio_6g"] is not None and channel is not None:
self.start_sniffer(radio_channel=channel, radio=data[dut]["sniff_radio_6g"].split(".")[2],
self.start_sniffer(radio_channel=channel,
radio=data[dut]["sniff_radio_6g"].split(".")[2],
duration=runtime_secs)
for obj in eap_connect_objs:
obj.start(obj.sta_list, True, True, wait_time=1)
@@ -513,7 +519,8 @@ class lf_tests(lf_libs):
cx_table_dict["Pass/Fail"] = pass_fail_cx
if allure_attach:
self.attach_table_allure(data=cx_table_dict, allure_name="cx data")
obj.cleanup(obj.sta_list)
if cleanup:
obj.cleanup(obj.sta_list)
result = "PASS"
description = "Unknown error"
count = 0
@@ -823,64 +830,26 @@ class lf_tests(lf_libs):
r_val[dut["identifier"]] = None
idx = None
# updating ssids on all APS
if self.run_lf:
for dut in self.dut_data:
ssid_data = []
if r_val.keys().__contains__(dut["identifier"]):
if dut.keys().__contains__("ssid"):
if band == "2G":
if str(dut["ssid"]["2g-encryption"]).upper() == "OPEN":
ssid_data.append(['ssid_idx=0 ssid=' + dut["ssid"]["2g-ssid"] +
' bssid=' + dut["ssid"]["2g-bssid"]])
print(ssid_data)
else:
ssid_data.append(['ssid_idx=0 ssid=' + dut["ssid"]["2g-ssid"] +
' security=' + str(dut["ssid"]["2g-encryption"]).upper() +
' password=' + dut["ssid"]["2g-password"] +
' bssid=' + dut["ssid"]["2g-bssid"]])
self.update_duts(identifier=dut["identifier"], ssid_data=ssid_data)
if band == "5G":
if str(dut["ssid"]["5g-encryption"]).upper() == "OPEN":
ssid_data.append(['ssid_idx=1 ssid=' + dut["ssid"]["5g-ssid"] +
' bssid=' + dut["ssid"]["5g-bssid"]])
else:
ssid_data.append(['ssid_idx=1 ssid=' + dut["ssid"]["5g-ssid"] +
' security=' + str(dut["ssid"]["5g-encryption"]).upper() +
' password=' + dut["ssid"]["5g-password"] +
' bssid=' + dut["ssid"]["5g-bssid"]])
self.update_duts(identifier=dut["identifier"], ssid_data=ssid_data)
if band == "6G":
if str(dut["ssid"]["6g-encryption"]).upper() == "OPEN":
ssid_data.append(['ssid_idx=2 ssid=' + dut["ssid"]["6g-ssid"] +
' bssid=' + dut["ssid"]["6g-bssid"]])
else:
ssid_data.append(['ssid_idx=2 ssid=' + dut["ssid"]["6g-ssid"] +
' security=' + str(dut["ssid"]["6g-encryption"]).upper() +
' password=' + dut["ssid"]["6g-password"] +
' bssid=' + dut["ssid"]["6g-bssid"]])
self.update_duts(identifier=dut["identifier"], ssid_data=ssid_data)
else:
for dut in self.dut_data:
ssid_data = []
identifier = dut["identifier"]
if r_val.keys().__contains__(identifier):
for idx_ in dut_data[identifier]["ssid_data"]:
for dut in self.dut_data:
ssid_data = []
identifier = dut["identifier"]
if r_val.keys().__contains__(identifier):
for idx_ in dut_data[identifier]["ssid_data"]:
if str(dut_data[identifier]["ssid_data"][idx_]["encryption"]).upper() == "OPEN":
ssid_data.append(
['ssid_idx=' + str(idx_) + ' ssid=' + dut_data[identifier]["ssid_data"][idx_]["ssid"]
+
' bssid=' + str(dut_data[identifier]["ssid_data"][idx_]["bssid"]).upper()])
else:
ssid_data.append(
['ssid_idx=' + str(idx_) + ' ssid=' + dut_data[identifier]["ssid_data"][idx_]["ssid"] +
' security=' + str(dut_data[identifier]["ssid_data"][idx_]["encryption"]).upper() +
' password=' + dut_data[identifier]["ssid_data"][idx_]["password"] +
' bssid=' + str(dut_data[identifier]["ssid_data"][idx_]["bssid"]).upper()])
if str(dut_data[identifier]["ssid_data"][idx_]["encryption"]).upper() == "OPEN":
ssid_data.append(
['ssid_idx=' + str(idx_) + ' ssid=' + dut_data[identifier]["ssid_data"][idx_]["ssid"]
+
' bssid=' + str(dut_data[identifier]["ssid_data"][idx_]["bssid"]).upper()])
else:
ssid_data.append(
['ssid_idx=' + str(idx_) + ' ssid=' + dut_data[identifier]["ssid_data"][idx_]["ssid"] +
' security=' + str(dut_data[identifier]["ssid_data"][idx_]["encryption"]).upper() +
' password=' + dut_data[identifier]["ssid_data"][idx_]["password"] +
' bssid=' + str(dut_data[identifier]["ssid_data"][idx_]["bssid"]).upper()])
if str(dut_data[identifier]["ssid_data"][idx_]["encryption"]).upper() in ["OPEN", "WPA", "WPA2",
"WPA3", "WEP"]:
self.update_duts(identifier=identifier, ssid_data=ssid_data)
if str(dut_data[identifier]["ssid_data"][idx_]["encryption"]).upper() in ["OPEN", "WPA", "WPA2",
"WPA3", "WEP"]:
self.update_duts(identifier=identifier, ssid_data=ssid_data)
dict_all_radios_2g = {"wave2_2g_radios": self.wave2_2g_radios,
"wave1_radios": self.wave1_radios, "mtk_radios": self.mtk_radios,
@@ -898,19 +867,18 @@ class lf_tests(lf_libs):
"ax200_radios": 1, "ax210_radios": 1}
radio_data = {}
sniff_radio = ""
if self.run_lf:
if band == "2G":
idx = 0
if band == "5G":
idx = 1
if band == "6g":
idx = 2
else:
for dut in dut_data:
for idx_ in dut_data[dut]["ssid_data"]:
if band == dut_data[dut]["ssid_data"][idx_]["band"] and ssid_name == \
dut_data[dut]["ssid_data"][idx_]["ssid"]:
idx = idx_
for dut in dut_data:
for idx_ in dut_data[dut]["ssid_data"]:
if band == "2G":
temp_band = "twog"
elif band == "5G":
temp_band = "fiveg"
elif band == "6G":
temp_band = "sixg"
if temp_band == dut_data[dut]["ssid_data"][idx_]["band"] and ssid_name == \
dut_data[dut]["ssid_data"][idx_]["ssid"]:
idx = idx_
if band == "2G":
stations = None
if num_stations != "max":
@@ -984,7 +952,7 @@ class lf_tests(lf_libs):
stations = stations - max_station
diff = max_station - stations
print(radio_data)
logging.info("Radio data: " + str(radio_data))
for radio in radio_data:
if identifier is None:
logging.error("Identifier is None")
@@ -997,14 +965,13 @@ class lf_tests(lf_libs):
def wifi_capacity(self, mode="BRIDGE", vlan_id=100, batch_size="1,5,10,20,40,64,128",
instance_name="wct_instance", download_rate="1Gbps", influx_tags="",
upload_rate="1Gbps", protocol="TCP-IPv4", duration="60000", stations="", create_stations=True,
upload_rate="1Gbps", protocol="TCP-IPv4", duration="60000", stations="", create_stations=False,
sort="interleave", raw_lines=[], move_to_influx=False, dut_data={}, ssid_name=None,
num_stations={}):
num_stations={}, add_stations=True):
wificapacity_obj_list = []
for dut in self.dut_data:
sets = [["DUT_NAME", dut["model"]]]
identifier = dut["identifier"]
print("sets", sets)
instance_name = ''.join(random.choices(string.ascii_uppercase + string.digits, k=12))
if mode == "BRIDGE":
ret = self.get_wan_upstream_ports()
@@ -1027,23 +994,41 @@ class lf_tests(lf_libs):
upstream_port = ret[identifier] + "." + str(vlan_id)
logging.info("Upstream data: " + str(upstream_port))
sets = [["DUT_NAME", dut]]
'''SINGLE WIFI CAPACITY using lf_wifi_capacity.py'''
self.temp_raw_lines = self.default_scenario_raw_lines.copy()
for band_ in num_stations:
if band_ not in ["2G", "5G", "6G"]:
logging.error("Band is missing")
pytest.fail("band is missing")
if not isinstance(num_stations[band_], int) or num_stations[band_] == "max":
logging.error("Number of stations are wrong")
pytest.fail("Number of stations are wrong")
if ssid_name is None:
logging.error("ssid name is missing")
pytest.fail("ssid name is missing")
if add_stations:
'''SINGLE WIFI CAPACITY using lf_wifi_capacity.py'''
self.temp_raw_lines = self.default_scenario_raw_lines.copy()
for band_ in num_stations:
if band_ not in ["2G", "5G", "6G"]:
logging.error("Band is missing")
pytest.fail("band is missing")
self.add_stations(band=band_, num_stations=num_stations[band_], ssid_name=ssid_name, dut_data=dut_data,
identifier=identifier)
self.chamber_view(raw_lines="custom")
if not isinstance(num_stations[band_], int):
if not num_stations[band_] == "max":
logging.error("Number of stations are wrong")
pytest.fail("Number of stations are wrong")
if ssid_name is None:
logging.error("ssid name is missing")
pytest.fail("ssid name is missing")
if self.run_lf:
dut_data = self.run_lf_dut_data()
for i in dut_data:
if mode != dut_data[i]["mode"]:
pytest.skip("Dut is not configured in mode: " + mode)
else:
for j in dut_data[i]["ssid_data"]:
if band_ == "2G":
temp_band = "twog"
elif band_ == "5G":
temp_band = "fiveg"
elif band_ == "6G":
temp_band = "sixg"
if temp_band == dut_data[i]["ssid_data"][j]["band"]:
ssid_name = dut_data[i]["ssid_data"][j]["ssid"]
self.add_stations(band=band_, num_stations=num_stations[band_], ssid_name=ssid_name,
dut_data=dut_data,
identifier=identifier)
self.chamber_view(raw_lines="custom")
wificapacity_obj = WiFiCapacityTest(lfclient_host=self.manager_ip,
lf_port=self.manager_http_port,
ssh_port=self.manager_ssh_port,
@@ -1144,8 +1129,6 @@ class lf_tests(lf_libs):
['traffic_types: UDP;TCP'],
["show_3s: 1"], ["show_ll_graphs: 1"], ["show_log: 1"]]
print("STATION NAME: ", list(station_data.keys())[0])
dataplane_obj = DataplaneTest(lf_host=self.manager_ip,
lf_port=self.manager_http_port,
ssh_port=self.manager_ssh_port,
@@ -1206,48 +1189,41 @@ if __name__ == '__main__':
"wan_port": "1.1.eth3",
"lan_port": None,
"ssid": {
"mode": "BRIDGE",
"ssid_data": {
"0": {
"ssid": "ssid_wpa_2g_br",
"encryption": "wpa",
"password": "something",
"band": "fiveg",
"bssid": "90:3C:B3:6C:43:04"
},
"1": {
"ssid": "ssid_wpa_2g_br",
"encryption": "wpa",
"password": "something",
"band": "twog",
"bssid": "90:3C:B3:6C:43:04"
},
"2": {
"ssid": "ssid_wpa_2g_br",
"encryption": "wpa3",
"password": "something",
"band": "fiveg",
"bssid": "90:3C:B3:6C:43:04"
}
},
"radio_data": {
"2G": {
"channel": 6,
"bandwidth": 20,
"frequency": 2437
},
"5G": {
"channel": None,
"bandwidth": None,
"frequency": None
},
"6G": {
"channel": None,
"bandwidth": None,
"frequency": None
}
}
},
"mode": "BRIDGE",
"ssid_data": {
"0": {
"ssid": "OpenWifi",
"encryption": "wpa2",
"password": "OpenWifi",
"band": "fiveg",
"bssid": "90:3C:B3:6C:43:04"
},
"1": {
"ssid": "OpenWifi",
"encryption": "wpa2",
"password": "OpenWifi",
"band": "twog",
"bssid": "90:3C:B3:6C:43:04"
}
},
"radio_data": {
"2G": {
"channel": 1,
"bandwidth": 20,
"frequency": 2437
},
"5G": {
"channel": 52,
"bandwidth": 20,
"frequency": 5260
},
"6G": {
"channel": None,
"bandwidth": None,
"frequency": None
}
}
},
"mode": "wifi6",
"identifier": "903cb36c4301",
"method": "serial",
@@ -1325,12 +1301,12 @@ if __name__ == '__main__':
# obj.chamber_view()
dut = {'903cb36c4301':
{'ssid_data': {
0: {'ssid': 'ssid_wpa_2g_br', 'encryption': 'wpa', 'password': 'something', 'band': '2G',
'bssid': '90:3C:B3:6C:43:04'}}, 'radio_data': {'2G': {'channel': 6, 'bandwidth': 20, 'frequency': 2437},
'5G': {'channel': None, 'bandwidth': None,
'frequency': None},
'6G': {'channel': None, 'bandwidth': None,
'frequency': None}}}}
0: {'ssid': 'ssid_wpa_2g_br', 'encryption': 'wpa', 'password': 'something', 'band': '2G',
'bssid': '90:3C:B3:6C:43:04'}}, 'radio_data': {'2G': {'channel': 6, 'bandwidth': 20, 'frequency': 2437},
'5G': {'channel': None, 'bandwidth': None,
'frequency': None},
'6G': {'channel': None, 'bandwidth': None,
'frequency': None}}}}
passes, result = obj.client_connectivity_test(ssid="ssid_wpa_2g_br", passkey="something", security="wpa",
extra_securities=[],