Realm Monitor

This commit is contained in:
Matthew Stidham
2020-12-21 16:10:11 -08:00
parent b3d45ffe1d
commit 669cb65777
2 changed files with 138 additions and 71 deletions

View File

@@ -17,6 +17,7 @@ from LANforge.add_monitor import *
import os
import datetime
import base64
import xlsxwriter
def wpa_ent_list():
return [
@@ -1016,8 +1017,10 @@ class L3CXProfile(LFCliBase):
def monitor(self,duration_sec=60,
interval_sec=1,
col_names=None,
show=True,
report_file=None):
created_cx=None,
show=False,
report_file=None,
excel=None):
if (duration_sec is None) or (duration_sec <= 1):
raise ValueError("L3CXProfile::monitor wants duration_sec > 1 second")
if (interval_sec is None) or (interval_sec < 1):
@@ -1026,30 +1029,79 @@ class L3CXProfile(LFCliBase):
raise ValueError("L3CXProfile::monitor wants duration_sec > interval_sec")
if col_names is None:
raise ValueError("L3CXProfile::monitor wants a list of column names to monitor")
endps = ",".join(self.created_cx.keys())
time_results = {}
#Step 1, get a list of Layer 3 columns
lfcli=LFCliBase('localhost',8080)
if created_cx == None: #No user defined endpoints
try:
print('Loading Layer 3 Connections')
endps = ','.join([[*x.keys()][0] for x in lfcli.json_get('endp')['endpoint']])
except:
print('No layer 3 connections found')
else: #User defined Layer 3 columns
try:
print('Loading user defined Layer 3 Connections')
endps=created_cx
except:
print('Please format your col_names variable like the following:')
#Step 2, column names
fields=",".join(col_names)
report_fh = None
print('fields')
print(fields)
#Step 3, create report file
if (report_file is not None) and (report_file != ""):
report_fh = open(report_file, "w")
report_fh = open(report_file, "w+")
else:
pass
#report_fh = open(report_file, "w")
#Step 4, monitor columns
start_time = datetime.datetime.now()
end_time = start_time + datetime.timedelta(seconds=duration_sec)
while datetime.datetime.now() < end_time_d:
response = self.json_get("/endp/%s?fields=%s" % (endps, fields), debug_=self.debug)
if "endpoint" not in response:
pprint.pprint(response)
raise ValueError("no endpoint?")
value_map = {}
if show:
print("Show stuff here")
if datetime.datetime.now() > end_time_d:
print('endpoints')
print(endps)
value_map = dict()
while datetime.datetime.now() < end_time:
response = lfcli.json_get("/endp/%s?fields=%s" % (endps, fields), debug_=self.debug)
# lfcli.json_get("/endp/VTsta0000-0-B,VTsta0001-1")
if "endpoint" not in response:
print(response)
raise ValueError("no endpoint?")
if show:
print(response)
value_map[datetime.datetime.now()]=response
if datetime.datetime.now() > end_time:
break;
time.sleep(interval_sec)
if report_fh is not None:
report_fh.close()
#print(value_map)
#Step 5, close and save
endpoints=[x['endpoint'] for x in value_map.values()]
endpoints2=[]
for y in range(0,len(endpoints)):
for x in range(0,len(endpoints[0])):
endpoints2.append([*[*endpoints[y][x].values()][0].values()])
timestamps=[]
for timestamp in [*value_map.keys()]:
timestamps.extend([str(timestamp)]*4)
for point in range(0,len(endpoints2)):
endpoints2[point].insert(0,timestamps[point])
workbook = xlsxwriter.Workbook(report_file)
worksheet = workbook.add_worksheet()
print(col_names)
print(type(col_names))
header_row=col_names
header_row.insert(0,'Timestamp')
print(header_row)
for col_num,data in enumerate(header_row):
worksheet.write(0,col_num,data)
row_num = 1
for x in endpoints2:
for col_num, data in enumerate(x):
worksheet.write(row_num,col_num,str(data))
row_num+=1
workbook.close()
def refresh_cx(self):

View File

@@ -16,7 +16,7 @@ from LANforge import LFUtils
import realm
import time
import datetime
from realm import TestGroupProfile
class IPV4VariableTime(LFCliBase):
def __init__(self,
@@ -28,7 +28,17 @@ class IPV4VariableTime(LFCliBase):
_debug_on=False,
_exit_on_error=False,
_exit_on_fail=False):
super().__init__(host, port, _debug=_debug_on, _halt_on_error=_exit_on_error, _exit_on_fail=_exit_on_fail)
super().__init__(host, port,
_local_realm = realm.Realm(lfclient_host=host,
lfclient_port=port,
debug_=_debug_on,
halt_on_error_=_exit_on_error),
_debug=_debug_on,
_halt_on_error=_exit_on_error,
_exit_on_fail=_exit_on_fail),
self.l3cxprofile = realm.L3CXProfile(lfclient_host=host,
lfclient_port=port,
local_realm=self.local_realm)
self.upstream = upstream
self.host = host
self.port = port
@@ -43,10 +53,8 @@ class IPV4VariableTime(LFCliBase):
self.debug = _debug_on
self.name_prefix = name_prefix
self.test_duration = test_duration
self.local_realm = realm.Realm(lfclient_host=self.host, lfclient_port=self.port)
self.station_profile = self.local_realm.new_station_profile()
self.cx_profile = self.local_realm.new_l3_cx_profile()
self.station_profile.lfclient_url = self.lfclient_url
self.station_profile.ssid = self.ssid
self.station_profile.ssid_pass = self.password
@@ -59,6 +67,7 @@ class IPV4VariableTime(LFCliBase):
self.station_profile.mode = mode
if self.ap is not None:
self.station_profile.set_command_param("add_sta", "ap",self.ap)
#self.station_list= LFUtils.portNameSeries(prefix_="sta", start_id_=0, end_id_=2, padding_number_=10000, radio='wiphy0') #Make radio a user defined variable from terminal.
self.cx_profile.host = self.host
@@ -168,7 +177,6 @@ class IPV4VariableTime(LFCliBase):
self.cx_profile.create(endp_type="lf_udp", side_a=self.station_profile.station_names, side_b=self.upstream, sleep_time=0)
self._pass("PASS: Station build finished")
def main():
parser = LFCliBase.create_basic_argparse(
prog='test_ipv4_variable_time.py',
@@ -216,7 +224,7 @@ Generic command layout:
parser.add_argument('--a_min', help='--a_min bps rate minimum for side_a', default=256000)
parser.add_argument('--b_min', help='--b_min bps rate minimum for side_b', default=256000)
parser.add_argument('--test_duration', help='--test_duration sets the duration of the test', default="2m")
required.add_argument('--security', help='WiFi Security protocol: < open | wep | wpa | wpa2 | wpa3 >', required=True)
#required.add_argument('--security', help='WiFi Security protocol: < open | wep | wpa | wpa2 | wpa3 >', required=True)
optional.add_argument('--mode',help='Used to force mode of stations')
optional.add_argument('--ap',help='Used to force a connection to a particular AP')
args = parser.parse_args()
@@ -247,6 +255,12 @@ Generic command layout:
print(ip_var_test.get_fail_message())
ip_var_test.exit_fail()
ip_var_test.start(False, False)
print('ip_var_cx_names')
print(ip_var_test.cx_profile.get_cx_names())
ip_var_test.l3cxprofile.monitor(col_names=['Name','Tx Rate','Rx Rate','Tx PDUs','Rx PDUs'],
report_file='/home/lanforge/report-data/'+str(datetime.datetime.now())+'test_ipv4_variable_time.json',
duration_sec=10)
ip_var_test.stop()
if not ip_var_test.passes():
print(ip_var_test.get_fail_message())
@@ -256,6 +270,7 @@ Generic command layout:
if ip_var_test.passes():
ip_var_test.exit_success()
IPV4VariableTime.cx_profile.stop_cx()
if __name__ == "__main__":
main()