diff --git a/py-json/realm.py b/py-json/realm.py index 88c1f9ca..0f58ae9f 100755 --- a/py-json/realm.py +++ b/py-json/realm.py @@ -17,6 +17,7 @@ from LANforge.add_monitor import * import os import datetime import base64 +import xlsxwriter def wpa_ent_list(): return [ @@ -265,7 +266,7 @@ class Realm(LFCliBase): if endp_name.startswith(prefix): self.rm_endp(endp_name) else: - if self.debug: + if self.debug: print("cleanup_cxe_prefix no endpoints: endp_list{}".format(endp_list) ) def channel_freq(self, channel_=0): @@ -589,20 +590,20 @@ class Realm(LFCliBase): print("Incrementing stations with IP addresses found") num_sta_with_ips+=1 else: - num_sta_with_ips+=1 + num_sta_with_ips+=1 if ipv6: v = response['interface'] if (v['ip'] in waiting_states): if debug: print("Waiting for port %s to get IPv6 Address."%(sta_eid)) - + else: if debug: print("Found IP: %s on port: %s"%(v['ip'], sta_eid)) print("Incrementing stations with IP addresses found") num_sta_with_ips+=1 else: - num_sta_with_ips+=1 + num_sta_with_ips+=1 return num_sta_with_ips @@ -614,7 +615,7 @@ class Realm(LFCliBase): dur_time = int(td.group(1)) dur_measure = str(td.group(2)) if dur_measure == "d": - duration_sec = dur_time * 24 * 60 * 60 + duration_sec = dur_time * 24 * 60 * 60 elif dur_measure == "h": duration_sec = dur_time * 60 * 60 elif dur_measure == "m": @@ -1011,13 +1012,15 @@ class L3CXProfile(LFCliBase): self.data = {} for cx_name in self.get_cx_names(): self.data[cx_name] = self.json_get("/cx/" + cx_name).get(cx_name) - return self.data + return self.data - def monitor(self, duration_sec=60, + def monitor(self,duration_sec=60, interval_sec=1, col_names=None, - show=True, - report_file=None): + created_cx=None, + show=False, + report_file=None, + excel=None): if (duration_sec is None) or (duration_sec <= 1): raise ValueError("L3CXProfile::monitor wants duration_sec > 1 second") if (interval_sec is None) or (interval_sec < 1): @@ -1026,30 +1029,79 @@ class L3CXProfile(LFCliBase): raise ValueError("L3CXProfile::monitor wants duration_sec > interval_sec") if col_names is None: raise ValueError("L3CXProfile::monitor wants a list of column names to monitor") - endps = ",".join(self.created_cx.keys()) - time_results = {} + #Step 1, get a list of Layer 3 columns + lfcli=LFCliBase('localhost',8080) + if created_cx == None: #No user defined endpoints + try: + print('Loading Layer 3 Connections') + endps = ','.join([[*x.keys()][0] for x in lfcli.json_get('endp')['endpoint']]) + except: + print('No layer 3 connections found') + else: #User defined Layer 3 columns + try: + print('Loading user defined Layer 3 Connections') + endps=created_cx + except: + print('Please format your col_names variable like the following:') + #Step 2, column names fields=",".join(col_names) - report_fh = None + print('fields') + print(fields) + #Step 3, create report file if (report_file is not None) and (report_file != ""): - report_fh = open(report_file, "w") - + report_fh = open(report_file, "w+") + else: + pass + #report_fh = open(report_file, "w") + #Step 4, monitor columns start_time = datetime.datetime.now() end_time = start_time + datetime.timedelta(seconds=duration_sec) - while datetime.datetime.now() < end_time_d: - response = self.json_get("/endp/%s?fields=%s" % (endps, fields), debug_=self.debug) - if "endpoint" not in response: - pprint.pprint(response) - raise ValueError("no endpoint?") - value_map = {} - if show: - print("Show stuff here") - if datetime.datetime.now() > end_time_d: + print('endpoints') + print(endps) + value_map = dict() + while datetime.datetime.now() < end_time: + response = lfcli.json_get("/endp/%s?fields=%s" % (endps, fields), debug_=self.debug) + # lfcli.json_get("/endp/VTsta0000-0-B,VTsta0001-1") + if "endpoint" not in response: + print(response) + raise ValueError("no endpoint?") + if show: + print(response) + value_map[datetime.datetime.now()]=response + if datetime.datetime.now() > end_time: break; time.sleep(interval_sec) - if report_fh is not None: - report_fh.close() + #print(value_map) + + #Step 5, close and save + endpoints=[x['endpoint'] for x in value_map.values()] + endpoints2=[] + for y in range(0,len(endpoints)): + for x in range(0,len(endpoints[0])): + endpoints2.append([*[*endpoints[y][x].values()][0].values()]) + timestamps=[] + for timestamp in [*value_map.keys()]: + timestamps.extend([str(timestamp)]*4) + for point in range(0,len(endpoints2)): + endpoints2[point].insert(0,timestamps[point]) + workbook = xlsxwriter.Workbook(report_file) + worksheet = workbook.add_worksheet() + print(col_names) + print(type(col_names)) + header_row=col_names + header_row.insert(0,'Timestamp') + print(header_row) + for col_num,data in enumerate(header_row): + worksheet.write(0,col_num,data) + row_num = 1 + for x in endpoints2: + for col_num, data in enumerate(x): + worksheet.write(row_num,col_num,str(data)) + row_num+=1 + workbook.close() + def refresh_cx(self): @@ -1523,7 +1575,7 @@ class GenCXProfile(LFCliBase): self.local_realm.json_post("/cli-json/nc_show_endpoints", {"endpoint": "all"}) time.sleep(sleep_time) - + for endp_tpl in endp_tpls: gen_name_a = endp_tpl[2] gen_name_b = endp_tpl[3] @@ -1990,7 +2042,7 @@ class VAPProfile(LFCliBase): if self.wifi_extra_data_modified: wifi_extra_r.addPostData(self.wifi_extra_data) json_response = wifi_extra_r.jsonPost(debug) - + port_list = self.local_realm.json_get("port/1/1/list") if port_list is not None: diff --git a/py-scripts/test_ipv4_variable_time.py b/py-scripts/test_ipv4_variable_time.py index 26a08ab3..e223339d 100755 --- a/py-scripts/test_ipv4_variable_time.py +++ b/py-scripts/test_ipv4_variable_time.py @@ -16,7 +16,7 @@ from LANforge import LFUtils import realm import time import datetime - +from realm import TestGroupProfile class IPV4VariableTime(LFCliBase): def __init__(self, @@ -28,7 +28,17 @@ class IPV4VariableTime(LFCliBase): _debug_on=False, _exit_on_error=False, _exit_on_fail=False): - super().__init__(host, port, _debug=_debug_on, _halt_on_error=_exit_on_error, _exit_on_fail=_exit_on_fail) + super().__init__(host, port, + _local_realm = realm.Realm(lfclient_host=host, + lfclient_port=port, + debug_=_debug_on, + halt_on_error_=_exit_on_error), + _debug=_debug_on, + _halt_on_error=_exit_on_error, + _exit_on_fail=_exit_on_fail), + self.l3cxprofile = realm.L3CXProfile(lfclient_host=host, + lfclient_port=port, + local_realm=self.local_realm) self.upstream = upstream self.host = host self.port = port @@ -43,10 +53,8 @@ class IPV4VariableTime(LFCliBase): self.debug = _debug_on self.name_prefix = name_prefix self.test_duration = test_duration - self.local_realm = realm.Realm(lfclient_host=self.host, lfclient_port=self.port) self.station_profile = self.local_realm.new_station_profile() self.cx_profile = self.local_realm.new_l3_cx_profile() - self.station_profile.lfclient_url = self.lfclient_url self.station_profile.ssid = self.ssid self.station_profile.ssid_pass = self.password @@ -58,7 +66,8 @@ class IPV4VariableTime(LFCliBase): self.station_profile.mode = 9 self.station_profile.mode = mode if self.ap is not None: - self.station_profile.set_command_param("add_sta", "ap",self.ap) + self.station_profile.set_command_param("add_sta", "ap",self.ap) + #self.station_list= LFUtils.portNameSeries(prefix_="sta", start_id_=0, end_id_=2, padding_number_=10000, radio='wiphy0') #Make radio a user defined variable from terminal. self.cx_profile.host = self.host @@ -120,7 +129,7 @@ class IPV4VariableTime(LFCliBase): while curr_time < end_time: time.sleep(sleep_interval.total_seconds()) - + new_cx_rx_values = self.__get_rx_values() if self.debug: print(old_cx_rx_values, new_cx_rx_values) @@ -157,7 +166,7 @@ class IPV4VariableTime(LFCliBase): debug=self.debug) def build(self): - + self.station_profile.use_security(self.security, self.ssid, self.password) self.station_profile.set_number_template(self.number_template) print("Creating stations") @@ -168,7 +177,6 @@ class IPV4VariableTime(LFCliBase): self.cx_profile.create(endp_type="lf_udp", side_a=self.station_profile.station_names, side_b=self.upstream, sleep_time=0) self._pass("PASS: Station build finished") - def main(): parser = LFCliBase.create_basic_argparse( prog='test_ipv4_variable_time.py', @@ -178,45 +186,45 @@ def main(): ''', description='''\ -test_ipv4_variable_time.py: --------------------- -Generic command layout: + test_ipv4_variable_time.py: + -------------------- + Generic command layout: - python3 ./test_ipv4_variable_time.py - --upstream_port eth1 - --radio wiphy0 - --num_stations 32 - --security {open|wep|wpa|wpa2|wpa3} \\ - --mode 1 - {"auto" : "0", - "a" : "1", - "b" : "2", - "g" : "3", - "abg" : "4", - "abgn" : "5", - "bgn" : "6", - "bg" : "7", - "abgnAC" : "8", - "anAC" : "9", - "an" : "10", - "bgnAC" : "11", - "abgnAX" : "12", - "bgnAX" : "13", - --ssid netgear - --password admin123 - --test_duration 2m (default) - --a_min 1000 - --b_min 1000 - --ap "00:0e:8e:78:e1:76" - --debug - ''') + python3 ./test_ipv4_variable_time.py + --upstream_port eth1 + --radio wiphy0 + --num_stations 32 + --security {open|wep|wpa|wpa2|wpa3} \\ + --mode 1 + {"auto" : "0", + "a" : "1", + "b" : "2", + "g" : "3", + "abg" : "4", + "abgn" : "5", + "bgn" : "6", + "bg" : "7", + "abgnAC" : "8", + "anAC" : "9", + "an" : "10", + "bgnAC" : "11", + "abgnAX" : "12", + "bgnAX" : "13", + --ssid netgear + --password admin123 + --test_duration 2m (default) + --a_min 1000 + --b_min 1000 + --ap "00:0e:8e:78:e1:76" + --debug + ''') optional = parser.add_argument_group('optional arguments') required = parser.add_argument_group('required arguments') parser.add_argument('--a_min', help='--a_min bps rate minimum for side_a', default=256000) parser.add_argument('--b_min', help='--b_min bps rate minimum for side_b', default=256000) parser.add_argument('--test_duration', help='--test_duration sets the duration of the test', default="2m") - required.add_argument('--security', help='WiFi Security protocol: < open | wep | wpa | wpa2 | wpa3 >', required=True) + #required.add_argument('--security', help='WiFi Security protocol: < open | wep | wpa | wpa2 | wpa3 >', required=True) optional.add_argument('--mode',help='Used to force mode of stations') optional.add_argument('--ap',help='Used to force a connection to a particular AP') args = parser.parse_args() @@ -232,11 +240,11 @@ Generic command layout: ssid=args.ssid, password=args.passwd, radio=args.radio, - security=args.security, - test_duration=args.test_duration, + security=args.security, + test_duration=args.test_duration, use_ht160=False, - side_a_min_rate=args.a_min, - side_b_min_rate=args.b_min, + side_a_min_rate=args.a_min, + side_b_min_rate=args.b_min, mode=args.mode, ap=args.ap, _debug_on=args.debug) @@ -247,6 +255,12 @@ Generic command layout: print(ip_var_test.get_fail_message()) ip_var_test.exit_fail() ip_var_test.start(False, False) + print('ip_var_cx_names') + print(ip_var_test.cx_profile.get_cx_names()) + + ip_var_test.l3cxprofile.monitor(col_names=['Name','Tx Rate','Rx Rate','Tx PDUs','Rx PDUs'], + report_file='/home/lanforge/report-data/'+str(datetime.datetime.now())+'test_ipv4_variable_time.json', + duration_sec=10) ip_var_test.stop() if not ip_var_test.passes(): print(ip_var_test.get_fail_message()) @@ -256,6 +270,7 @@ Generic command layout: if ip_var_test.passes(): ip_var_test.exit_success() + IPV4VariableTime.cx_profile.stop_cx() if __name__ == "__main__": main()