sta_connect.py: Added kpi.csv generation,

reverted to older stop method,
renamed finish method to prevent conflict
This commit is contained in:
Logan Lipke
2022-05-03 10:31:26 -07:00
committed by shivam
parent 630da2e77f
commit 24171d2f11

View File

@@ -36,6 +36,8 @@ LFUtils = importlib.import_module("py-json.LANforge.LFUtils")
removeCX = LFUtils.removeCX removeCX = LFUtils.removeCX
removeEndps = LFUtils.removeEndps removeEndps = LFUtils.removeEndps
realm = importlib.import_module("py-json.realm") realm = importlib.import_module("py-json.realm")
lf_kpi_csv = importlib.import_module("py-scripts.lf_kpi_csv")
lf_report = importlib.import_module("py-scripts.lf_report")
Realm = realm.Realm Realm = realm.Realm
set_port = importlib.import_module("py-json.LANforge.set_port") set_port = importlib.import_module("py-json.LANforge.set_port")
add_sta = importlib.import_module("py-json.LANforge.add_sta") add_sta = importlib.import_module("py-json.LANforge.add_sta")
@@ -152,7 +154,7 @@ class StaConnect(Realm):
time.sleep(self.runtime_secs) time.sleep(self.runtime_secs)
if not self.stop(): if not self.stop():
return False return False
if not self.finish(): if not self.finish_test():
return False return False
# remove all endpoints and cxs # remove all endpoints and cxs
@@ -191,6 +193,8 @@ class StaConnect(Realm):
self._fail("Warning: %s lacks ip address" % self.get_upstream_url()) self._fail("Warning: %s lacks ip address" % self.get_upstream_url())
return False return False
# TODO: Precleanup
for sta_name in self.station_names: for sta_name in self.station_names:
self.rm_port(sta_name, check_exists=True, debug_=self.debug) self.rm_port(sta_name, check_exists=True, debug_=self.debug)
# TODO: Check return code. # TODO: Check return code.
@@ -441,9 +445,18 @@ class StaConnect(Realm):
return True return True
def stop(self): def stop(self):
self.cx_profile.stop_cx() # self.cx_profile.stop_cx()
print("Stopping CX Traffic")
for cx_name in self.cx_names.keys():
data = {
"test_mgr": "ALL",
"cx_name": cx_name,
"cx_state": "STOPPED"
}
self.json_post("/cli-json/set_cx_state", data)
return True
def finish(self): def finish_test(self):
# Refresh stats # Refresh stats
print("\nRefresh CX stats") print("\nRefresh CX stats")
for cx_name in self.cx_names.keys(): for cx_name in self.cx_names.keys():
@@ -477,12 +490,15 @@ class StaConnect(Realm):
ptest_b_tx = ptest['endpoint']['tx bytes'] ptest_b_tx = ptest['endpoint']['tx bytes']
ptest_b_rx = ptest['endpoint']['rx bytes'] ptest_b_rx = ptest['endpoint']['rx bytes']
# TODO: Return based on compares
self.compare_vals("testTCP-A TX", ptest_a_tx) self.compare_vals("testTCP-A TX", ptest_a_tx)
self.compare_vals("testTCP-A RX", ptest_a_rx) self.compare_vals("testTCP-A RX", ptest_a_rx)
self.compare_vals("testTCP-B TX", ptest_b_tx) self.compare_vals("testTCP-B TX", ptest_b_tx)
self.compare_vals("testTCP-B RX", ptest_b_rx) self.compare_vals("testTCP-B RX", ptest_b_rx)
return True
except Exception as e: except Exception as e:
self.error(e) self.error(e)
@@ -491,6 +507,47 @@ class StaConnect(Realm):
# self.test_results.append("FAILED message will fail") # self.test_results.append("FAILED message will fail")
# print("\n") # print("\n")
def generate_report(self, test_rig, test_tag, dut_hw_version, dut_sw_version, dut_model_num, dut_serial_num, test_id, csv_outfile):
report = lf_report.lf_report(_results_dir_name="sta_connect_test")
kpi_path = report.get_report_path()
print("kpi_path :{kpi_path}".format(kpi_path=kpi_path))
kpi_csv = lf_kpi_csv.lf_kpi_csv(
_kpi_path=kpi_path,
_kpi_test_rig=test_rig,
_kpi_test_tag=test_tag,
_kpi_dut_hw_version=dut_hw_version,
_kpi_dut_sw_version=dut_sw_version,
_kpi_dut_model_num=dut_model_num,
_kpi_dut_serial_num=dut_serial_num,
_kpi_test_id=test_id)
kpi_csv.kpi_dict['Units'] = "Mbps"
for endpoint in self.resulting_endpoints:
# print(endpoint)
kpi_csv.kpi_csv_get_dict_update_time()
kpi_csv.kpi_dict['short-description'] = "{endp_name}".format(
endp_name=self.resulting_endpoints[endpoint]['endpoint']['name'])
kpi_csv.kpi_dict['numeric-score'] = "{endp_tx}".format(
endp_tx=self.resulting_endpoints[endpoint]['endpoint']['tx bytes'])
kpi_csv.kpi_dict['Units'] = "bps"
kpi_csv.kpi_dict['Graph-Group'] = "Endpoint TX bytes"
kpi_csv.kpi_csv_write_dict(kpi_csv.kpi_dict)
kpi_csv.kpi_dict['short-description'] = "{endp_name}".format(
endp_name=self.resulting_endpoints[endpoint]['endpoint']['name'])
kpi_csv.kpi_dict['numeric-score'] = "{endp_rx}".format(
endp_rx=self.resulting_endpoints[endpoint]['endpoint']['rx bytes'])
kpi_csv.kpi_dict['Units'] = "bps"
kpi_csv.kpi_dict['Graph-Group'] = "Endpoint RX bytes"
kpi_csv.kpi_csv_write_dict(kpi_csv.kpi_dict)
if csv_outfile is not None:
current_time = time.strftime("%Y-%m-%d-%H-%M-%S", time.localtime())
csv_outfile = "{}_{}-sta_connect.csv".format(
csv_outfile, current_time)
csv_outfile = report.file_add_path(csv_outfile)
print("csv output file : {}".format(csv_outfile))
def cleanup(self): def cleanup(self):
for sta_name in self.station_names: for sta_name in self.station_names:
self.rm_port(sta_name) self.rm_port(sta_name)
@@ -519,7 +576,21 @@ Example:
parser.add_argument("--sta_mode", type=str, parser.add_argument("--sta_mode", type=str,
help="LANforge station-mode setting (see add_sta LANforge CLI documentation, default is 0 (auto))", default=0) help="LANforge station-mode setting (see add_sta LANforge CLI documentation, default is 0 (auto))", default=0)
parser.add_argument("--dut_bssid", type=str, help="DUT BSSID to which we expect to connect.", default="MyAP") parser.add_argument("--dut_bssid", type=str, help="DUT BSSID to which we expect to connect.", default="MyAP")
parser.add_argument('--test_duration', help='--test_duration sets the duration of the test', default="2m") parser.add_argument('--test_duration', help='--test_duration sets the duration of the test', default="20s")
parser.add_argument("--test_rig", default="", help="test rig for kpi.csv, testbed that the tests are run on")
parser.add_argument("--test_tag", default="",
help="test tag for kpi.csv, test specific information to differentiate the test")
parser.add_argument("--dut_hw_version", default="",
help="dut hw version for kpi.csv, hardware version of the device under test")
parser.add_argument("--dut_sw_version", default="",
help="dut sw version for kpi.csv, software version of the device under test")
parser.add_argument("--dut_model_num", default="",
help="dut model for kpi.csv, model number / name of the device under test")
parser.add_argument("--dut_serial_num", default="",
help="dut serial for kpi.csv, serial number / serial number of the device under test")
parser.add_argument("--test_priority", default="", help="dut model for kpi.csv, test-priority is arbitrary number")
# parser.add_argument("--test_id", default="sta_connect", help="test-id for kpi.csv, script or test name")
parser.add_argument('--csv_outfile', help="--csv_outfile <Output file for csv data>", default="sta_connect_kpi")
args = parser.parse_args() args = parser.parse_args()
monitor_interval = Realm.parse_time(args.test_duration).total_seconds() monitor_interval = Realm.parse_time(args.test_duration).total_seconds()
@@ -530,10 +601,13 @@ Example:
_dut_bssid=args.dut_bssid, _dut_ssid=args.ssid, _dut_security=args.security, _dut_bssid=args.dut_bssid, _dut_ssid=args.ssid, _dut_security=args.security,
_sta_name=["sta0000"]) _sta_name=["sta0000"])
# TODO: check run return code
staConnect.run() staConnect.run()
staConnect.get_result_list() staConnect.get_result_list()
staConnect.generate_report(test_rig=args.test_rig, test_tag=args.test_tag, dut_hw_version=args.dut_hw_version,
dut_sw_version=args.dut_sw_version, dut_model_num=args.dut_model_num,
dut_serial_num=args.dut_serial_num, test_id=args.test_id, csv_outfile=args.csv_outfile)
if not staConnect.passes(): if not staConnect.passes():
print("FAIL: Some tests failed") print("FAIL: Some tests failed")
else: else: