Added dataplane_throughput_test method

Signed-off-by: jitendracandela <jitendra.kushavah@candelatech.com>
This commit is contained in:
jitendracandela
2022-09-02 12:04:54 +05:30
parent 1ac93e053c
commit 0d1fbf3786

View File

@@ -47,6 +47,8 @@ wificapacitytest = importlib.import_module("py-scripts.lf_wifi_capacity_test")
WiFiCapacityTest = wificapacitytest.WiFiCapacityTest
csvtoinflux = importlib.import_module("py-scripts.csv_to_influx")
CSVtoInflux = csvtoinflux.CSVtoInflux
lf_dataplane_test = importlib.import_module("py-scripts.lf_dataplane_test")
DataplaneTest = lf_dataplane_test.DataplaneTest
class lf_tests(lf_libs):
@@ -297,9 +299,6 @@ class lf_tests(lf_libs):
def wifi_capacity_test(self):
pass
def dataplane_throughput_test(self):
pass
def rate_vs_range_test(self):
pass
@@ -337,7 +336,8 @@ class lf_tests(lf_libs):
station_data_all = {}
for radio in data[identifier]["station_data"]:
client_connect = CreateStation(_host=self.manager_ip, _port=self.manager_http_port,
_sta_list=data[identifier]["station_data"][radio], _password=data[identifier]["passkey"],
_sta_list=data[identifier]["station_data"][radio],
_password=data[identifier]["passkey"],
_ssid=data[identifier]["ssid"],
_security=data[identifier]["encryption"])
client_connect.station_profile.sta_mode = sta_mode
@@ -392,7 +392,8 @@ class lf_tests(lf_libs):
station_data = self.client_connect(ssid=ssid, security=security, passkey=passkey, mode=mode,
band=band, num_sta=num_sta, vlan_id=vlan_id,
allure_name="Station data before simulate radar", identifier=identifier,
station_data=["4way time (us)", "channel", "cx time (us)", "dhcp (ms)", "ip", "signal", "mode"])
station_data=["4way time (us)", "channel", "cx time (us)", "dhcp (ms)",
"ip", "signal", "mode"])
station_list = list(station_data.keys())
table_dict = {}
sta_channel_before_dfs_list = []
@@ -414,8 +415,9 @@ class lf_tests(lf_libs):
else:
logging.error("Station not connected to applied channel")
pytest.fail("Station not connected to applied channel")
self.get_station_data(rows=["4way time (us)", "channel", "cx time (us)", "dhcp (ms)", "ip", "signal", "mode"],
sta_name=station_list, allure_name="Station data after simulate radar")
self.get_station_data(
rows=["4way time (us)", "channel", "cx time (us)", "dhcp (ms)", "ip", "signal", "mode"],
sta_name=station_list, allure_name="Station data after simulate radar")
for i in range(5):
sta_channel_after_dfs = self.station_data_query(station_name=station_list[0], query="channel")
if sta_channel_after_dfs != sta_channel_before_dfs and str(sta_channel_after_dfs) != "-1":
@@ -446,12 +448,6 @@ class lf_tests(lf_libs):
logging.error("5 Ghz channel didn't changed after radar detected")
pytest.fail("5 Ghz channel didn't changed after radar detected")
def add_stations(self, band="2G", num_stations=9, ssid_name="", dut_data={}, identifier=None):
dut_name = []
@@ -553,7 +549,6 @@ class lf_tests(lf_libs):
if band == dut_data[dut]["ssid_data"][idx_]["band"] and ssid_name == \
dut_data[dut]["ssid_data"][idx_]["ssid"]:
idx = idx_
print("*********idx***********", idx)
if band == "2G":
stations = None
if num_stations != "max":
@@ -720,7 +715,8 @@ class lf_tests(lf_libs):
if move_to_influx:
try:
report_name = "../reports/" + \
wificapacity_obj.report_name[0]['LAST']["response"].split(":::")[1].split("/")[-1] + "/"
wificapacity_obj.report_name[0]['LAST']["response"].split(":::")[1].split("/")[
-1] + "/"
influx = CSVtoInflux(influx_host=self.influx_params["influx_host"],
influx_port=self.influx_params["influx_port"],
influx_org=self.influx_params["influx_org"],
@@ -739,182 +735,96 @@ class lf_tests(lf_libs):
wificapacity_obj_list.append(wificapacity_obj)
return wificapacity_obj_list
# for dut in dut_name:
# logging.info("Adding Stations:" + band + " band, Number of Stations: " + str(num_stations) +
# " DUT: " + str(dut) + " SSID: " + str(ssid_name) + " idx: " + str(idx))
# idx = idx
# if self.run_lf or self.cc_1:
# if band == "2G":
# idx = 0
# if band == "5G":
# idx = 1
# if band == "6g":
# idx = 2
#
# for i in self.dut_idx_mapping:
# if self.dut_idx_mapping[i][0] == ssid_name and self.dut_idx_mapping[i][3] == band:
# idx = i
# if band == "2G":
# all_radio_2g = self.wave2_2g_radios + self.wave1_radios + self.mtk_radios + self.ax200_radios + \
# self.ax210_radios
# print("all_2g_rdio", all_radio_2g)
# if num_stations != "max":
# logging.info("Total 2G Radios Available in Testbed: " + str(len(all_radio_2g)))
# total_sta = num_stations
# max_possible = 0
# for radio in all_radio_2g:
# max_possible = max_possible + int(self.get_max_sta(radio))
# if total_sta <= max_possible:
# per_radio_sta = int(total_sta / len(all_radio_2g))
# rem = total_sta % len(all_radio_2g)
# else:
# total_sta = max_possible
# per_radio_sta = int(total_sta / len(all_radio_2g))
# rem = total_sta % len(all_radio_2g)
# if rem != 0 and per_radio_sta == 0:
# per_radio_sta = rem / len(all_radio_2g)
# logging.info("Total stations per radio: " + str(per_radio_sta))
# for radio in all_radio_2g:
# max_possible = int(self.get_max_sta(radio))
# if total_sta == 0:
# return
# num_stations = per_radio_sta
# if rem == 0 and num_stations == 0:
# return
# if max_possible - num_stations >= rem:
# num_stations = num_stations + rem
# rem = 0
# elif max_possible - rem >= num_stations:
# num_stations = num_stations + rem
# rem = 0
# elif total_sta <= max_possible:
# num_stations = total_sta
# if per_radio_sta < 1:
# num_stations = 1
# total_sta = total_sta - num_stations
# logging.info("Adding " + str(num_stations) + " Stations on " + str(radio))
# station_data = ["profile_link " + radio.split(".")[0] + "." + radio.split(".")[1] +
# " STA-AUTO " + str(num_stations) + " 'DUT: " + dut + " Radio-" +
# str(int(idx) + 1) + "'" + " NA " + radio.split(".")[2]]
# self.temp_raw_lines.append(station_data)
# logging.debug("Raw Line : " + str(station_data))
#
# if num_stations == "max":
# logging.info("Total 2G Radios Available in Testbed: " + str(len(all_radio_2g)))
# for radio in all_radio_2g:
# num_stations = self.get_max_sta(radio)
# logging.info("Total stations: " + str(num_stations) + " On Radio: " + str(radio))
# station_data = ["profile_link " + radio.split(".")[0] + "." + radio.split(".")[1] +
# " STA-AUTO " + str(num_stations) + " 'DUT: " + dut + " Radio-" +
# str(int(idx) + 1) + "'" + " NA " + radio.split(".")[2]]
# self.temp_raw_lines.append(station_data)
# logging.debug("Raw Line : " + str(station_data))
#
# if band == "5G":
# all_radio_5g = self.wave2_5g_radios + self.wave1_radios + self.mtk_radios + self.ax200_radios + self.ax210_radios
# if num_stations != "max":
# logging.info("Total 5G Radios Available in Testbed: " + str(len(all_radio_5g)))
# total_sta = num_stations
# max_possible = 0
# for radio in all_radio_5g:
# max_possible = max_possible + int(self.get_max_sta(radio))
# if total_sta <= max_possible:
# per_radio_sta = int(total_sta / len(all_radio_5g))
# rem = total_sta % len(all_radio_5g)
# else:
# total_sta = max_possible
# per_radio_sta = int(total_sta / len(all_radio_5g))
# rem = total_sta % len(all_radio_5g)
# if rem != 0 and per_radio_sta == 0:
# per_radio_sta = rem / len(all_radio_5g)
# logging.info("Total stations per radio: " + str(per_radio_sta))
# for radio in all_radio_5g:
# max_possible = int(self.get_max_sta(radio))
# if total_sta == 0:
# return
# num_stations = per_radio_sta
# if rem == 0 and num_stations == 0:
# return
# if max_possible - num_stations >= rem:
# num_stations = num_stations + rem
# rem = 0
# elif max_possible - rem >= num_stations:
# num_stations = num_stations + rem
# rem = 0
# elif total_sta <= max_possible:
# num_stations = total_sta
# if per_radio_sta < 1:
# num_stations = 1
# total_sta = total_sta - num_stations
# logging.info("Adding " + str(num_stations) + " Stations on " + str(radio))
# station_data = ["profile_link " + radio.split(".")[0] + "." + radio.split(".")[1] +
# " STA-AUTO " + str(num_stations) + " 'DUT: " + dut + " Radio-" +
# str(int(idx) + 1) + "'" + " NA " + radio.split(".")[2]]
# self.temp_raw_lines.append(station_data)
# logging.debug("Raw Line : " + str(station_data))
#
# if num_stations == "max":
# logging.info("Total 5G Radios Available in Testbed: " + str(len(all_radio_5g)))
# for radio in all_radio_5g:
# num_stations = self.get_max_sta(radio)
# logging.info("Total stations: " + str(num_stations) + " On Radio: " + str(radio))
# station_data = ["profile_link " + radio.split(".")[0] + "." + radio.split(".")[1] +
# " STA-AUTO " + str(num_stations) + " 'DUT: " + dut + " Radio-" +
# str(int(idx) + 1) + "'" + " NA " + radio.split(".")[2]]
# self.temp_raw_lines.append(station_data)
# logging.debug("Raw Line : " + str(station_data))
# if band == "6g":
# all_radio_6g = self.ax210_radios
# if num_stations != "max":
# logging.info("Total 6G Radios Available in Testbed: " + str(len(all_radio_6g)))
# total_sta = num_stations
# max_possible = 0
# for radio in all_radio_6g:
# max_possible = max_possible + int(self.get_max_sta(radio))
# if total_sta <= max_possible:
# per_radio_sta = int(total_sta / len(all_radio_6g))
# rem = total_sta % len(all_radio_6g)
# else:
# total_sta = max_possible
# per_radio_sta = int(total_sta / len(all_radio_6g))
# rem = total_sta % len(all_radio_6g)
# if rem != 0 and per_radio_sta == 0:
# per_radio_sta = rem / len(all_radio_6g)
# logging.info("Total stations per radio: " + str(per_radio_sta))
# for radio in all_radio_6g:
# max_possible = int(self.get_max_sta(radio))
# if total_sta == 0:
# return
# num_stations = per_radio_sta
# if rem == 0 and num_stations == 0:
# return
# if max_possible - num_stations >= rem:
# num_stations = num_stations + rem
# rem = 0
# elif max_possible - rem >= num_stations:
# num_stations = num_stations + rem
# rem = 0
# elif total_sta <= max_possible:
# num_stations = total_sta
# if per_radio_sta < 1:
# num_stations = 1
# total_sta = total_sta - num_stations
# logging.info("Adding " + str(num_stations) + " Stations on " + str(radio))
# station_data = ["profile_link " + radio.split(".")[0] + "." + radio.split(".")[1] +
# " STA-AUTO " + str(num_stations) + " 'DUT: " + dut + " Radio-" +
# str(int(idx) + 1) + "'" + " NA " + radio.split(".")[2]]
# self.temp_raw_lines.append(station_data)
# logging.debug("Raw Line : " + str(station_data))
# if num_stations == "max":
# logging.info("Total AX Radios Available in Testbed: " + str(len(all_radio_6g)))
# for radio in all_radio_6g:
# num_stations = self.get_max_sta(radio)
# logging.info("Total stations: " + str(num_stations) + " On Radio: " + str(radio))
# station_data = ["profile_link " + radio.split(".")[0] + "." + radio.split(".")[1] +
# " STA-AUTO " + str(num_stations) + " 'DUT: " + dut + " Radio-" +
# str(int(idx) + 1) + "'" + " NA " + radio.split(".")[2]]
# self.temp_raw_lines.append(station_data)
# logging.debug("Raw Line : " + str(station_data))
def dataplane_throughput_test(self, ssid="[BLANK]", passkey="[BLANK]", security="wpa2", num_sta=1, mode="BRIDGE",
vlan_id=[None],
download_rate="85%", band="twog", scan_ssid=True, sta_mode=0,
upload_rate="0", duration="15s", instance_name="test_demo", raw_lines=None,
influx_tags="",
move_to_influx=False,
station_data=["4way time (us)", "channel", "cx time (us)", "dhcp (ms)", "ip",
"signal"],
allure_attach=True, allure_name="station data"):
instance_name = ''.join(random.choices(string.ascii_uppercase + string.digits, k=12))
dataplane_obj_list = []
for dut in self.dut_data:
identifier = dut["identifier"]
station_data = self.client_connect(ssid=ssid, passkey=passkey, security=security, mode=mode, band=band,
vlan_id=vlan_id, num_sta=num_sta, scan_ssid=scan_ssid, sta_mode=sta_mode,
station_data=station_data,
allure_attach=allure_attach, identifier=identifier,
allure_name=allure_name)
if mode == "BRIDGE":
ret = self.get_wan_upstream_ports()
upstream_port = ret[identifier]
if mode == "NAT-WAN":
ret = self.get_wan_upstream_ports()
upstream_port = ret[identifier]
if mode == "NAT-LAN":
ret = self.get_lan_upstream_ports()
upstream_port = ret[identifier]
if mode == "VLAN":
if vlan_id is None:
logging.error("VLAN ID is Unspecified in the VLAN Case")
pytest.skip("VLAN ID is Unspecified in the VLAN Case")
else:
self.add_vlan(vlan_ids=[vlan_id])
ret = self.get_wan_upstream_ports()
upstream_port = ret[identifier] + "." + str(vlan_id)
logging.info("Upstream data: " + str(upstream_port))
if raw_lines is None:
raw_lines = [['pkts: 142;256;512;1024;MTU;4000'], ['directions: DUT Transmit;DUT Receive'],
['traffic_types: UDP;TCP'],
["show_3s: 1"], ["show_ll_graphs: 1"], ["show_log: 1"]]
print("STATION NAME: ", list(station_data.keys())[0])
dataplane_obj = DataplaneTest(lf_host=self.manager_ip,
lf_port=self.manager_http_port,
ssh_port=self.manager_ssh_port,
local_lf_report_dir=self.local_report_path,
lf_user="lanforge",
lf_password="lanforge",
instance_name=instance_name,
config_name="dpt_config",
upstream=upstream_port,
pull_report=True,
load_old_cfg=False,
download_speed=download_rate,
upload_speed=upload_rate,
duration=duration,
dut=identifier,
station=list(station_data.keys())[0],
test_tag=influx_tags,
raw_lines=raw_lines)
dataplane_obj.setup()
dataplane_obj.run()
if move_to_influx:
report_name = "../reports/" + \
dataplane_obj.report_name[0]['LAST']["response"].split(":::")[1].split("/")[-1] + "/"
try:
influx = CSVtoInflux(influx_host=self.influx_params["influx_host"],
influx_port=self.influx_params["influx_port"],
influx_org=self.influx_params["influx_org"],
influx_token=self.influx_params["influx_token"],
influx_bucket=self.influx_params["influx_bucket"],
path=report_name)
influx.glob()
except Exception as e:
print(e)
pass
report_name = dataplane_obj.report_name[0]['LAST']["response"].split(":::")[1].split("/")[-1] + "/"
self.attach_report_graphs(report_name=report_name, pdf_name="Dataplane Throughput Test - TCP-UDP 5G")
self.attach_report_kpi(report_name=report_name)
logging.info("Test Completed... Cleaning up Stations")
self.client_disconnect(station_name=list(station_data.keys()))
dataplane_obj_list.append(dataplane_obj)
return dataplane_obj_list
if __name__ == '__main__':