test_l3_longevity.py : steps to make more generic, created ./sandbox/test_l3_longevity_cisco.py as a temporary holding spot.

This commit is contained in:
Chuck SmileyRekiere
2021-05-06 11:37:34 -06:00
parent 5ec3bbb245
commit 43943f6327
2 changed files with 170 additions and 185 deletions

View File

@@ -0,0 +1,168 @@
#!/usr/bin/python3
'''
NAME:
lf_check.py
PURPOSE:
Script is used to run a series of tests to verifiy realm changes.
EXAMPLE:
NOTES:
'''
import sys
if sys.version_info[0] != 3:
print("This script requires Python3")
exit()
import logging
import time
from time import sleep
import argparse
import pexpect
import serial
from pexpect_serial import SerialSpawn
import json
from json import load
from pprint import *
# see https://stackoverflow.com/a/13306095/11014343
class FileAdapter(object):
def __init__(self, logger):
self.logger = logger
def write(self, data):
# NOTE: data can be a partial line, multiple lines
data = data.strip() # ignore leading/trailing whitespace
if data: # non-blank
self.logger.info(data)
def flush(self):
pass # leave it to logging to flush properly
class lf_check():
# Functions in this section are/can be overridden by descendants
def readConfigContents(self, config_file):
success = True
if 'TEST_CONFIG' in config_file.sections():
section = config_file['TEST_CONFIG']
try:
lf_globals.test_list = json.loads(section.get(lf_testlist,lf_gloabs.test_list))
print("test list retrieved")
print("test list: {}".format(lf_globals.test_list))
except:
print("no test list")
def read_ap_stats(self):
# 5ghz: wl -i wl1 bs_data 2.4ghz# wl -i wl0 bs_data
stats_5ghz = "wl -i wl1 bs_data"
stats_24ghz = "w1 -i wl0 bs_data"
ap_data = ""
ap_stats = []
command = stats_5ghz
'''if band == "5ghz":
command = stats_5ghz
else:
command = stats_24ghz'''
#try:
# configure the serial interface
ser = serial.Serial("/dev/ttyUSB0", int(115200), timeout=5)
egg = SerialSpawn(ser)
egg.sendline(str(command))
egg.expect([pexpect.TIMEOUT], timeout=2) # do not detete line, waits for output
ap_data = egg.before.decode('utf-8','ignore')
#except:
# print("WARNING unable to read AP")
'''ap_stats = "\
\
" '''
ap_stats.append("root@Docsis-Gateway:~# wl -i wl1 bs_data")
ap_stats.append("Station Address PHY Mbps Data Mbps Air Use Data Use Retries bw mcs Nss ofdma mu-mimo")
ap_stats.append("50:E0:85:87:AA:19 1016.6 48.9 6.5% 24.4% 16.6% 80 9.7 2 0.0% 0.0%")
ap_stats.append("50:E0:85:84:7A:E7 880.9 52.2 7.7% 26.1% 20.0% 80 8.5 2 0.0% 0.0%")
ap_stats.append("50:E0:85:89:5D:00 840.0 47.6 6.4% 23.8% 2.3% 80 8.0 2 0.0% 0.0%")
ap_stats.append("50:E0:85:87:5B:F4 960.7 51.5 5.9% 25.7% 0.0% 80 9 2 0.0% 0.0%")
ap_stats.append("(overall) - 200.2 26.5% - -")
# TODO: Read real stats, comment out the example above.
return ap_stats
'''root@Docsis-Gateway:~# wl -i wl1 bs_data
Station Address PHY Mbps Data Mbps Air Use Data Use Retries bw mcs Nss ofdma mu-mimo
50:E0:85:87:AA:19 1064.5 52.8 6.0% 25.0% 1.5% 80 10.0 2 0.0% 0.0%
50:E0:85:84:7A:E7 927.1 53.6 7.0% 25.4% 5.7% 80 8.8 2 0.0% 0.0%
50:E0:85:89:5D:00 857.5 51.8 6.8% 24.6% 0.8% 80 8 2 0.0% 0.0%
50:E0:85:87:5B:F4 1071.7 52.8 6.0% 25.0% 1.3% 80 10 2 0.0% 0.0%
(overall) - 210.9 25.8% - -'''
'''I have some un-tested code that is starting point for querying Comcast AP in the l3_longevity script.
When still needs doing: query the data from the AP, and test that my parsing and CSV logic is working,
also add cmd-line arg to enable this or not. Would you have time to work on this and coordinate test time on
customer's system to test against their AP? Access to AP is probably ssh, possibly serial or telnet.
Firas @ Comcast can help clarify that.'''
def parse_ap_stats(self):
# Query AP for its stats. Result for /ax bcm APs looks something like this:
ap_stats = self.read_ap_stats()
ap_stats_rows = [] # Array of Arrays
for line in ap_stats:
stats_row = line.split()
ap_stats_rows.append(stats_row)
# - is this needed ?m = re.search((r'(\S+)\s+(\S+)\s+(Data Mbps)\s+(Air Use)+'ap_stats[0]
# Query all of our ports
#port_eids = self.gather_port_eids()
#for eid_name in port_eids:
# eid = self.name_to_eid(eid_name)
# url = "/port/%s/%s/%s"%(eid[0], eid[1], eid[2])
# response = self.json_get(url)
# if (response is None) or ("interface" not in response):
# print("query-port: %s: incomplete response:"%(url))
# pprint(response)
# else:
# note changed the indent
#p = response['interface']
mac = ["50:E0:85:87:AA:19","50:E0:85:84:7A:E7","50:E0:85:89:5D:00","50:E0:85:87:5B:F4"]
#mac = "50:E0:85:87:AA:19"
#mac = "50:E0:85:84:7A:E7"
#mac = "50:E0:85:89:5D:00"
#mac = "50:E0:85:87:5B:F4"
ap_row = []
i = 0
for row in ap_stats_rows:
if row[0] in mac:
#if row[0].lower == mac.lower():
ap_row = row
print("ap_row: {}".format(ap_row))
# p is map of key/values for this port
#print("ap_row : {}".format(ap_row))
#pprint(ap_row)
# Find latency, jitter for connections using this port.
#latency, jitter, tput = self.get_endp_stats_for_port(p["port"], endps)
# ap_stats_col_titles = ['Station Address','PHY Mbps','Data Mbps','Air Use','Data Use','Retries','bw','mcs','Nss','ofdma','mu-mimo'
# self.write_port_csv(len(temp_stations_list), ul, dl, ul_pdu_str, dl_pdu_str, atten_val, eid_name, p,
# latency, jitter, tput, ap_row, ap_stats_col_titles
def main():
check = lf_check()
check.parse_ap_stats()
if __name__ == '__main__':
main()

View File

@@ -48,7 +48,6 @@ import time
import datetime
import subprocess
import csv
import random
# This class handles running the test and generating reports.
class L3VariableTime(Realm):
@@ -426,130 +425,6 @@ class L3VariableTime(Realm):
print("new-list:",new_list)
return False
# Verify communication to cisco controller is as expected.
# Can add support for different controllers by editing this
# or creating similar methods for different controllers.
def verify_controller(self):
if self.args == None:
return
if self.args.cisco_ctlr == None:
return
try:
print("scheme {} ctlr {} user {} passwd {} AP {} series {} band {} action {}".format(self.args.cisco_scheme,self.args.cisco_ctlr,self.args.cisco_user,
self.args.cisco_passwd, self.args.cisco_ap, self.args.cisco_series, self.args.cisco_band,"summary"))
ctl_output = subprocess.run(["../wifi_ctl_9800_3504.py", "--scheme", self.args.cisco_scheme, "-d", self.args.cisco_ctlr, "-u",
self.args.cisco_user, "-p", self.args.cisco_passwd,
"-a", self.args.cisco_ap,"--series", self.args.cisco_series,"--action", "summary"], capture_output=True)
pss = ctl_output.stdout.decode('utf-8', 'ignore')
print(pss)
except subprocess.CalledProcessError as process_error:
print("Controller unable to commicate to AP or unable to communicate to controller error code: {} output {}"
.format(process_error.returncode, process_error.output))
time.sleep(1)
exit(1)
# Find our station count
searchap = False
for line in pss.splitlines():
if (line.startswith("---------")):
searchap = True
continue
#TODO need to test with 9800 series to chelck the values
if (searchap):
pat = "%s\s+\S+\s+\S+\s+\S+\s+\S+.* \S+\s+\S+\s+(\S+)\s+\["%(self.args.cisco_ap)
#print("AP line: %s"%(line))
m = re.search(pat, line)
if (m != None):
sta_count = m.group(1)
print("AP line: %s"%(line))
print("sta-count: %s"%(sta_count))
if (int(sta_count) != int(self.total_stas)):
print("WARNING: Cisco Controller reported %s stations, should be %s"%(sta_count, self.total_stas))
if self.args.cap_ctl_out:
pss = ctl_output.stdout.decode('utf-8', 'ignore')
print(pss)
#advanced (showes summary)
#./wifi_ctl_9800_3504.py --scheme ssh -d 172.19.36.168 -p <controller_pw> --port 23 -a "9120-Chamber-1" --band a --action advanced --series 9800
def controller_show_ap_channel(self):
advanced = subprocess.run(["../wifi_ctl_9800_3504.py", "--scheme", self.args.cisco_scheme, "-d", self.args.cisco_ctlr, "-u",
self.args.cisco_user, "-p", self.args.cisco_passwd,
"-a", self.args.cisco_ap,"--series", self.args.cisco_series, "--action", "ap_channel"], capture_output=True)
pss = advanced.stdout.decode('utf-8', 'ignore')
print(pss)
if self.args.cisco_series == "9800":
for line in pss.splitlines():
search_str = self.args.cisco_ap
print("line {}".format(line))
element_list = line.lstrip().split()
print("element_list {}".format(element_list))
if (line.lstrip().startswith(search_str)):
print("line {}".format(line))
element_list = line.lstrip().split()
print("element_list {}".format(element_list))
# AP Name (0) mac (1) slot (2) Admin State [enable/disable] (3) Oper State [Up/Down] (4) Width (5) Txpwr (6,7) channel (8) mode (9)
print("ap: {} slof {} channel {} chan_width {}".format(element_list[0],element_list[2],element_list[8],element_list[5]))
if (str(self.args.cisco_channel) in str(element_list[8])) and (str(self.args.cisco_chan_width) in str(element_list[5])):
print("ap {} configuration successful: channel {} in expected {} chan_width {} in expected {}"
.format(element_list[0],self.args.cisco_channel,element_list[8],self.args.cisco_chan_width,element_list[5]))
else:
print("WARNING ap {} configuration: channel {} in expected {} chan_width {} in expected {}"
.format(element_list[0],self.args.cisco_channel,element_list[8],self.args.cisco_chan_width,element_list[5]))
break
else:
print("checking for 802.11{}".format(self.args.cisco_band))
for line in pss.splitlines():
#print("line {}".format(line))
search_str = "802.11{}".format(self.args.cisco_band)
if (line.lstrip().startswith(search_str)):
print("line {}".format(line))
element_list = line.lstrip().split()
print("element_list {}".format(element_list))
print("ap: {} channel {} chan_width {}".format(self.args.cisco_ap,element_list[4],element_list[5]))
if (str(self.args.cisco_channel) in str(element_list[4])) and (str(self.args.cisco_chan_width) in str(element_list[5])):
print("ap configuration successful: channel {} in expected {} chan_width {} in expected {}"
.format(self.args.cisco_channel,element_list[4],self.args.cisco_chan_width,element_list[5]))
else:
print("AP WARNING: channel {} expected {} chan_width {} expected {}"
.format(element_list[4],self.cisco_channel,element_list[5],self.args.cisco_chan_width))
break
print("configure ap {} channel {} chan_width {}".format(self.args.cisco_ap,self.args.cisco_channel,self.args.cisco_chan_width))
# Verify channel and channel width.
# This script supports resetting ports, allowing one to test AP/controller under data load
# while bouncing wifi stations. Check here to see if we should reset ports.
def reset_port_check(self):
for station_profile in self.station_profiles:
if station_profile.reset_port_extra_data['reset_port_enable']:
if station_profile.reset_port_extra_data['reset_port_timer_started'] == False:
print("reset_port_time_min: {}".format(station_profile.reset_port_extra_data['reset_port_time_min']))
print("reset_port_time_max: {}".format(station_profile.reset_port_extra_data['reset_port_time_max']))
station_profile.reset_port_extra_data['seconds_till_reset'] = \
random.randint(station_profile.reset_port_extra_data['reset_port_time_min'],\
station_profile.reset_port_extra_data['reset_port_time_max'])
station_profile.reset_port_extra_data['reset_port_timer_started'] = True
print("on radio {} seconds_till_reset {}".format(station_profile.add_sta_data['radio'],station_profile.reset_port_extra_data['seconds_till_reset']))
else:
station_profile.reset_port_extra_data['seconds_till_reset'] = station_profile.reset_port_extra_data['seconds_till_reset'] - 1
if self.debug: print("radio: {} countdown seconds_till_reset {}".format(station_profile.add_sta_data['radio'] ,station_profile.reset_port_extra_data['seconds_till_reset']))
if ((station_profile.reset_port_extra_data['seconds_till_reset'] <= 0)):
station_profile.reset_port_extra_data['reset_port_timer_started'] = False
port_to_reset = random.randint(0,len(station_profile.station_names)-1)
print("reset on radio {} station: {}".format(station_profile.add_sta_data['radio'],station_profile.station_names[port_to_reset]))
self.reset_port(station_profile.station_names[port_to_reset])
# Cleanup any older config that a previous run of this test may have created.
def pre_cleanup(self):
@@ -723,7 +598,6 @@ class L3VariableTime(Realm):
for atten_idx in self.attenuators:
self.set_atten(atten_idx, atten_val)
self.verify_controller()
print("Starting multicast traffic (if any configured)")
self.multicast_profile.start_mc(debug_=self.debug)
self.multicast_profile.refresh_mc(debug_=self.debug)
@@ -750,9 +624,9 @@ class L3VariableTime(Realm):
#interval_time = cur_time + datetime.timedelta(seconds=5)
interval_time = cur_time + datetime.timedelta(seconds=self.polling_interval_seconds)
#print("polling_interval_seconds {}".format(self.polling_interval_seconds))
while cur_time < interval_time:
cur_time = datetime.datetime.now()
self.reset_port_check()
time.sleep(1)
self.epoch_time = int(time.time())
@@ -1044,13 +918,6 @@ python .\\test_l3_longevity.py --test_duration <duration> --endp_type <traffic t
--radio "radio==<radio> stations==<number stations> ssid==<ssid> ssid_pw==<ssid password> security==<security type: wpa2, open, wpa3>" --debug
Multiple radios may be entered with individual --radio switches
generic command with controller setting channel and channel width test duration 5 min
python3 test_l3_longevity.py --cisco_ctlr <IP> --cisco_dfs True/False --mgr <Lanforge IP>
--cisco_channel <channel> --cisco_chan_width <20,40,80,120> --endp_type 'lf_udp lf_tcp mc_udp' --upstream_port <1.ethX>
--radio "radio==<radio 0 > stations==<number stations> ssid==<ssid> ssid_pw==<ssid password> security==<wpa2 , open>"
--radio "radio==<radio 1 > stations==<number stations> ssid==<ssid> ssid_pw==<ssid password> security==<wpa2 , open>"
--test_duration 5m
# UDP bi-directional test, no use of controller.
/test_l3_longevity.py --mgr localhost --endp_type 'lf_udp lf_tcp' --upstream_port 1.1.eth1 \
--radio "radio==1.1.wiphy0 stations==10 ssid==ASUS_70 ssid_pw==[BLANK] security==open" \
@@ -1077,16 +944,6 @@ BK, BE, VI, VO: Optional wifi related Tos Settings. Or, use your preferred num
#################################
#Command switches
#################################
--cisco_ctlr <IP of Cisco Controller>',default=None
--cisco_user <User-name for Cisco Controller>',default="admin"
--cisco_passwd <Password for Cisco Controller>',default="Cisco123
--cisco_prompt <Prompt for Cisco Controller>',default="(Cisco Controller) >
--cisco_ap <Cisco AP in question>',default="APA453.0E7B.CF9C"
--cisco_dfs <True/False>',default=False
--cisco_channel <channel>',default=None , no change
--cisco_chan_width <20 40 80 160>',default="20",choices=["20","40","80","160"]
--cisco_band <a | b | abgn>',default="a",choices=["a", "b", "abgn"]
--mgr <hostname for where LANforge GUI is running>',default='localhost'
-d / --test_duration <how long to run> example --time 5d (5 days) default: 3m options: number followed by d, h, m or s',default='3m'
@@ -1113,49 +970,9 @@ python3 .\\test_l3_longevity.py --test_duration 4m --endp_type \"lf_tcp lf_udp m
--radio "radio==wiphy0 stations==32 ssid==candelaTech-wpa2-x2048-4-1 ssid_pw==candelaTech-wpa2-x2048-4-1 security==wpa2"
--radio "radio==wiphy1 stations==64 ssid==candelaTech-wpa2-x2048-5-3 ssid_pw==candelaTech-wpa2-x2048-5-3 security==wpa2"
Example #2 using cisco controller
1. cisco controller at 192.168.100.112
2. cisco dfs True
3. cisco channel 52
4. cisco channel width 20
5. traffic 'lf_udp lf_tcp mc_udp'
6. upstream port eth3
7. radio #0 wiphy0 stations 3 ssid test_candela ssid_pw [BLANK] secruity Open
8. radio #1 wiphy1 stations 16 ssid test_candela ssid_pw [BLANK] security Open
9. lanforge manager at 192.168.100.178
10. duration 5m
Command:
python3 test_l3_longevity.py --cisco_ctlr 192.168.100.112 --cisco_dfs True --mgr 192.168.100.178
--cisco_channel 52 --cisco_chan_width 20 --endp_type 'lf_udp lf_tcp mc_udp' --upstream_port 1.eth3
--radio "radio==1.wiphy0 stations==3 ssid==test_candela ssid_pw==[BLANK] security==open"
--radio "radio==1.wiphy1 stations==16 ssid==test_candela ssid_pw==[BLANK] security==open"
--test_duration 5m
''')
parser.add_argument('--cisco_ctlr', help='--cisco_ctlr <IP of Cisco Controller>',default=None)
parser.add_argument('--cisco_user', help='--cisco_user <User-name for Cisco Controller>',default="admin")
parser.add_argument('--cisco_passwd', help='--cisco_passwd <Password for Cisco Controller>',default="Cisco123")
parser.add_argument('--cisco_prompt', help='--cisco_prompt <Prompt for Cisco Controller>',default="\(Cisco Controller\) >")
parser.add_argument('--cisco_ap', help='--cisco_ap <Cisco AP in question>',default="APA453.0E7B.CF9C")
parser.add_argument('--cisco_dfs', help='--cisco_dfs <True/False>',default=False)
parser.add_argument('--cisco_channel', help='--cisco_channel <channel>',default=None)
parser.add_argument('--cisco_chan_width', help='--cisco_chan_width <20 40 80 160>',default="20",choices=["20","40","80","160"])
parser.add_argument('--cisco_band', help='--cisco_band <a | b | abgn>',default="a",choices=["a", "b", "abgn"])
parser.add_argument('--cisco_series', help='--cisco_series <9800 | 3504>',default="3504",choices=["9800","3504"])
parser.add_argument('--cisco_scheme', help='--cisco_scheme (serial|telnet|ssh): connect via serial, ssh or telnet',default="ssh",choices=["serial","telnet","ssh"])
parser.add_argument('--cisco_wlan', help='--cisco_wlan <wlan name> default: NA, NA means no change',default="NA")
parser.add_argument('--cisco_wlanID', help='--cisco_wlanID <wlanID> default: NA , NA means not change',default="NA")
parser.add_argument('--cisco_tx_power', help='--cisco_tx_power <1 | 2 | 3 | 4 | 5 | 6 | 7 | 8> 1 is highest power default NA NA means no change',default="NA"
,choices=["1","2","3","4","5","6","7","8","NA"])
parser.add_argument('--tty', help='--tty \"/dev/ttyUSB2\" the serial interface to the AP')
parser.add_argument('--baud', help='--baud \"9600\" baud rate for the serial interface',default="9600")
parser.add_argument('--amount_ports_to_reset', help='--amount_ports_to_reset \"<min amount ports> <max amount ports>\" ', default=None)
@@ -1196,7 +1013,7 @@ python3 test_l3_longevity.py --cisco_ctlr 192.168.100.112 --cisco_dfs True --mgr
influx_add_parser_args(parser)
parser.add_argument("--cap_ctl_out", help="--cap_ctl_out, switch the cisco controller output will be captured", action='store_true')
parser.add_argument("--cap_ctl_out", help="--cap_ctl_out, switch the controller output will be captured", action='store_true')
parser.add_argument("--wait", help="--wait <time> , time to wait at the end of the test", default='0')
parser.add_argument("--show_least_most_csv", help="Should we show the least/most csv column data in reports?", action='store_true', default=False)