mirror of
https://github.com/Telecominfraproject/wlan-lanforge-scripts.git
synced 2025-11-01 03:07:56 +00:00
Added method to set all cx states at once, finished basic outline for testing function
This commit is contained in:
@@ -38,15 +38,34 @@ class IPV4VariableTime(LFCliBase):
|
||||
self.cx_profile = self.local_realm.new_l3_cx_profile()
|
||||
self.test_duration = test_duration
|
||||
|
||||
def __set_all_cx_state(self, state, sleep_time=5):
|
||||
print("Setting CX States to %s" % state)
|
||||
cx_list = list(self.local_realm.cx_list())
|
||||
for cx_name in cx_list:
|
||||
if cx_name != 'handler' or cx_name != 'uri':
|
||||
req_url = "cli-json/set_cx_state"
|
||||
data = {
|
||||
"test_mgr": "default_tm",
|
||||
"cx_name": cx_name,
|
||||
"cx_state": state
|
||||
}
|
||||
|
||||
super().json_post(req_url, data)
|
||||
time.sleep(sleep_time)
|
||||
|
||||
def run_test(self):
|
||||
cur_time = datetime.datetime.now()
|
||||
end_time = self.local_realm.parse_time(self.test_duration) + cur_time
|
||||
while cur_time != end_time:
|
||||
self.__set_all_cx_state("RUNNING")
|
||||
while cur_time < end_time:
|
||||
#print(cur_time, end_time)
|
||||
cur_time = datetime.datetime.now()
|
||||
# Run test
|
||||
time.sleep(1)
|
||||
self.__set_all_cx_state("STOPPED")
|
||||
|
||||
def cleanup(self):
|
||||
print("Cleaning up stations")
|
||||
port_list = self.local_realm.station_list()
|
||||
sta_list = []
|
||||
for item in list(port_list):
|
||||
@@ -64,6 +83,29 @@ class IPV4VariableTime(LFCliBase):
|
||||
# print(data)
|
||||
super().json_post(req_url, data)
|
||||
|
||||
cx_list = list(self.local_realm.cx_list())
|
||||
print("Cleaning up cxs")
|
||||
for cx_name in cx_list:
|
||||
if cx_name != 'handler' or cx_name != 'uri':
|
||||
req_url = "cli-json/rm_cx"
|
||||
data = {
|
||||
"test_mgr": "default_tm",
|
||||
"cx_name": cx_name
|
||||
}
|
||||
super().json_post(req_url, data)
|
||||
|
||||
print("Cleaning up endps")
|
||||
endp_list = super().json_get("/endp")
|
||||
if endp_list is not None:
|
||||
endp_list = list(endp_list['endpoint'])
|
||||
for endp_name in range(len(endp_list)):
|
||||
name = list(endp_list[endp_name])[0]
|
||||
req_url = "cli-json/rm_endp"
|
||||
data = {
|
||||
"endp_name": name
|
||||
}
|
||||
super().json_post(req_url, data)
|
||||
|
||||
def run(self):
|
||||
super().clear_test_results()
|
||||
print("Cleaning up old stations")
|
||||
@@ -88,8 +130,9 @@ def main():
|
||||
lfjson_port = 8080
|
||||
ip_var_test = IPV4VariableTime(lfjson_host, lfjson_port, prefix="00", ssid="jedway-wpa2-x2048-4-4",
|
||||
password="jedway-wpa2-x2048-4-4",
|
||||
security="open", num_stations=1)
|
||||
security="open", num_stations=10, test_duration="1m")
|
||||
ip_var_test.run()
|
||||
ip_var_test.run_test()
|
||||
ip_var_test.cleanup()
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user