Modified add stations method

Signed-off-by: jitendracandela <jitendra.kushavah@candelatech.com>
This commit is contained in:
jitendracandela
2022-08-24 15:33:12 +05:30
parent 3bce445376
commit e35bd14630

View File

@@ -47,12 +47,14 @@ WiFiCapacityTest = wificapacitytest.WiFiCapacityTest
csvtoinflux = importlib.import_module("py-scripts.csv_to_influx")
CSVtoInflux = csvtoinflux.CSVtoInflux
class lf_tests(lf_libs):
"""
lf_tools is needed in lf_tests to do various operations needed by various tests
"""
def __init__(self, lf_data={}, dut_data={}, log_level=logging.DEBUG, run_lf=False, influx_params=None, local_report_path="../reports/"):
def __init__(self, lf_data={}, dut_data={}, log_level=logging.DEBUG, run_lf=False, influx_params=None,
local_report_path="../reports/"):
super().__init__(lf_data, dut_data, run_lf, log_level)
self.local_report_path = local_report_path
@@ -379,22 +381,11 @@ class lf_tests(lf_libs):
idx = None
# updating ssids on all APS
if self.run_lf:
# idx = idx
# if self.run_lf or self.cc_1:
# if band == "2G":
# idx = 0
# if band == "5G":
# idx = 1
# if band == "6g":
# idx = 2
for dut in self.dut_data:
ssid_data = []
if r_val.keys().__contains__(dut["identifier"]):
print("hi", dut["identifier"])
if dut.keys().__contains__("ssid"):
print("Hii")
if band == "2G":
print("hiiiiii")
if str(dut["ssid"]["2g-encryption"]).upper() == "OPEN":
ssid_data.append(['ssid_idx=0 ssid=' + dut["ssid"]["2g-ssid"] +
' bssid=' + dut["ssid"]["2g-bssid"]])
@@ -404,8 +395,6 @@ class lf_tests(lf_libs):
' security=' + str(dut["ssid"]["2g-encryption"]).upper() +
' password=' + dut["ssid"]["2g-password"] +
' bssid=' + dut["ssid"]["2g-bssid"]])
print("Bye")
print(ssid_data)
self.update_duts(identifier=dut["identifier"], ssid_data=ssid_data)
if band == "5G":
if str(dut["ssid"]["5g-encryption"]).upper() == "OPEN":
@@ -426,7 +415,6 @@ class lf_tests(lf_libs):
' security=' + str(dut["ssid"]["6g-encryption"]).upper() +
' password=' + dut["ssid"]["6g-password"] +
' bssid=' + dut["ssid"]["6g-bssid"]])
print(ssid_data)
self.update_duts(identifier=dut["identifier"], ssid_data=ssid_data)
else:
for dut in self.dut_data:
@@ -436,16 +424,19 @@ class lf_tests(lf_libs):
for idx_ in dut_data[identifier]["ssid_data"]:
if str(dut_data[identifier]["ssid_data"][idx_]["encryption"]).upper() == "OPEN":
ssid_data.append(['ssid_idx=' + str(idx_) + ' ssid=' + dut_data[identifier]["ssid_data"][idx_]["ssid"]
+
' bssid=' + str(dut_data[identifier]["ssid_data"][idx_]["bssid"]).upper()])
ssid_data.append(
['ssid_idx=' + str(idx_) + ' ssid=' + dut_data[identifier]["ssid_data"][idx_]["ssid"]
+
' bssid=' + str(dut_data[identifier]["ssid_data"][idx_]["bssid"]).upper()])
else:
ssid_data.append(['ssid_idx=' + str(idx_) + ' ssid=' + dut_data[identifier]["ssid_data"][idx_]["ssid"] +
' security=' + str(dut_data[identifier]["ssid_data"][idx_]["encryption"]).upper() +
' password=' + dut_data[identifier]["ssid_data"][idx_]["password"] +
' bssid=' + str(dut_data[identifier]["ssid_data"][idx_]["bssid"]).upper()])
ssid_data.append(
['ssid_idx=' + str(idx_) + ' ssid=' + dut_data[identifier]["ssid_data"][idx_]["ssid"] +
' security=' + str(dut_data[identifier]["ssid_data"][idx_]["encryption"]).upper() +
' password=' + dut_data[identifier]["ssid_data"][idx_]["password"] +
' bssid=' + str(dut_data[identifier]["ssid_data"][idx_]["bssid"]).upper()])
if str(dut_data[identifier]["ssid_data"][idx_]["encryption"]).upper() in ["OPEN", "WPA", "WPA2", "WPA3", "WEP"]:
if str(dut_data[identifier]["ssid_data"][idx_]["encryption"]).upper() in ["OPEN", "WPA", "WPA2",
"WPA3", "WEP"]:
self.update_duts(identifier=identifier, ssid_data=ssid_data)
dict_all_radios_2g = {"wave2_2g_radios": self.wave2_2g_radios,
@@ -565,7 +556,8 @@ class lf_tests(lf_libs):
def wifi_capacity(self, mode="BRIDGE", vlan_id=100, batch_size="1,5,10,20,40,64,128",
instance_name="wct_instance", download_rate="1Gbps", influx_tags="",
upload_rate="1Gbps", protocol="TCP-IPv4", duration="60000", stations="", create_stations=True,
sort="interleave", raw_lines=[], move_to_influx=False, dut_data={}, ssid_name=None, num_stations={}):
sort="interleave", raw_lines=[], move_to_influx=False, dut_data={}, ssid_name=None,
num_stations={}):
wificapacity_obj_list = []
for dut in self.dut_data:
sets = [["DUT_NAME", dut["model"]]]
@@ -601,13 +593,14 @@ class lf_tests(lf_libs):
logging.error("Band is missing")
pytest.fail("band is missing")
if not isinstance(num_stations[band_], int) or num_stations[band_] == "max":
if not isinstance(num_stations[band_], int) or num_stations[band_] == "max":
logging.error("Number of stations are wrong")
pytest.fail("Number of stations are wrong")
if ssid_name is None:
logging.error("ssid name is missing")
pytest.fail("ssid name is missing")
self.add_stations(band=band_, num_stations=num_stations[band_], ssid_name=ssid_name, dut_data=dut_data, identifier=identifier)
self.add_stations(band=band_, num_stations=num_stations[band_], ssid_name=ssid_name, dut_data=dut_data,
identifier=identifier)
self.chamber_view(raw_lines="custom")
wificapacity_obj = WiFiCapacityTest(lfclient_host=self.manager_ip,
lf_port=self.manager_http_port,
@@ -663,8 +656,6 @@ class lf_tests(lf_libs):
wificapacity_obj_list.append(wificapacity_obj)
return wificapacity_obj_list
# for dut in dut_name:
# logging.info("Adding Stations:" + band + " band, Number of Stations: " + str(num_stations) +
# " DUT: " + str(dut) + " SSID: " + str(ssid_name) + " idx: " + str(idx))
@@ -842,6 +833,7 @@ class lf_tests(lf_libs):
# self.temp_raw_lines.append(station_data)
# logging.debug("Raw Line : " + str(station_data))
if __name__ == '__main__':
basic_04 = {
"target": "tip_2x",
@@ -885,7 +877,7 @@ if __name__ == '__main__':
"testbed": "basic",
"scenario": "dhcp-bridge",
"details": {
"manager_ip": "10.28.3.6",
"manager_ip": "10.28.3.12",
"http_port": 8080,
"ssh_port": 22,
"setup": {"method": "build", "DB": "Test_Scenario_Automation"},
@@ -915,13 +907,21 @@ if __name__ == '__main__':
obj = lf_tests(lf_data=dict(basic_04["traffic_generator"]), dut_data=list(basic_04["device_under_tests"]),
log_level=logging.DEBUG, run_lf=True)
#obj.add_stations()
#obj.add_stations(band="5G")
#obj.chamber_view(raw_lines="custom")
obj.wifi_capacity(instance_name="test_client_wpa2_BRIDGE_udp_bi", mode="BRIDGE", vlan_id=[100],
download_rate="1Gbps", batch_size="1,5,10,20,40,64,128,256",
influx_tags="wifi-capacity-udp-bidirectional-bridge-wpa2-2.4G-5G",
upload_rate="1Gbps", protocol="UDP-IPv4", duration="60000", move_to_influx=False)
# obj.add_stations()
# obj.add_stations(band="5G")
# obj.chamber_view(raw_lines="custom")
dut = {'0000c1018812': {"ssid_data": {
0: {"ssid": 'TestSSID-2G', "encryption": 'wpa2', "password": 'OpenWifi', "band": '2G',
"bssid": '00:00:C1:01:88:15'},
1: {"ssid": 'TestSSID-5G', "encryption": 'wpa2', "password": 'OpenWifi', "band": '5G',
"bssid": '00:00:C1:01:88:14'}}, "radio_data": {'2G': [1, 40, 2422], '5G': [36, 80, 5210], '6G': None}}}
obj.wifi_capacity(instance_name="test_client_wpa2_BRIDGE_udp_bi", mode="BRIDGE",
vlan_id=[100],
download_rate="1Gbps", batch_size="1,5,10,20,40,64,128,256",
influx_tags="Jitu",
upload_rate="1Gbps", protocol="UDP-IPv4", duration="60000",
move_to_influx=False, dut_data=dut, ssid_name="OpenWifi",
num_stations={"2G": 10, "5G": 10})
# A =obj.setup_interfaces(band="fiveg", vlan_id=100, mode="NAT-WAN", num_sta=1)
# print(A)
# obj.setup_relevent_profiles()