lf_check.py : placed in sandbox for checking cusomter site

Signed-off-by: smileyrekiere <smileyrekiere@gmail.com>
This commit is contained in:
smileyrekiere
2021-04-30 16:53:35 -06:00
committed by Chuck SmileyRekiere
parent 5d053c8194
commit 1b60b9f605

View File

@@ -0,0 +1,167 @@
#!/usr/bin/python3
'''
NAME:
lf_check.py
PURPOSE:
Script is used to run a series of tests to verifiy realm changes.
EXAMPLE:
NOTES:
'''
import sys
if sys.version_info[0] != 3:
print("This script requires Python3")
exit()
import logging
import time
from time import sleep
import argparse
import pexpect
import serial
import json
from json import load
from pprint import *
# see https://stackoverflow.com/a/13306095/11014343
class FileAdapter(object):
def __init__(self, logger):
self.logger = logger
def write(self, data):
# NOTE: data can be a partial line, multiple lines
data = data.strip() # ignore leading/trailing whitespace
if data: # non-blank
self.logger.info(data)
def flush(self):
pass # leave it to logging to flush properly
class lf_check():
# Functions in this section are/can be overridden by descendants
def readConfigContents(self, config_file):
success = True
if 'TEST_CONFIG' in config_file.sections():
section = config_file['TEST_CONFIG']
try:
lf_globals.test_list = json.loads(section.get(lf_testlist,lf_gloabs.test_list))
print("test list retrieved")
print("test list: {}".format(lf_globals.test_list))
except:
print("no test list")
def read_ap_stats(self):
# 5ghz: wl -i wl1 bs_data 2.4ghz# wl -i wl0 bs_data
stats_5ghz = "wl -i wl1 bs_data"
stats_24ghz = "w1 -i wl0 bs_data"
ap_data = ""
ap_stats = []
command = stats_5ghz
'''if band == "5ghz":
command = stats_5ghz
else:
command = stats_24ghz'''
'''try:
# configure the serial interface
ser = serial.Serial(self.args.tty, int(self.args.baud), timeout=5)
egg = SerialSpawn(ser)
egg.sendline(str(command))
egg.expect([pexpect.TIMEOUT], timeout=2) # do not detete line, waits for output
ap_data = egg.before.decode('utf-8','ignore')
except:
print("WARNING unable to read AP")
'''
'''ap_stats = "\
\
" '''
ap_stats.append("root@Docsis-Gateway:~# wl -i wl1 bs_data")
ap_stats.append("Station Address PHY Mbps Data Mbps Air Use Data Use Retries bw mcs Nss ofdma mu-mimo")
ap_stats.append("50:E0:85:87:AA:19 1016.6 48.9 6.5% 24.4% 16.6% 80 9.7 2 0.0% 0.0%")
ap_stats.append("50:E0:85:84:7A:E7 880.9 52.2 7.7% 26.1% 20.0% 80 8.5 2 0.0% 0.0%")
ap_stats.append("50:E0:85:89:5D:00 840.0 47.6 6.4% 23.8% 2.3% 80 8.0 2 0.0% 0.0%")
ap_stats.append("50:E0:85:87:5B:F4 960.7 51.5 5.9% 25.7% 0.0% 80 9 2 0.0% 0.0%")
ap_stats.append("(overall) - 200.2 26.5% - -")
# TODO: Read real stats, comment out the example above.
return ap_stats
'''root@Docsis-Gateway:~# wl -i wl1 bs_data
Station Address PHY Mbps Data Mbps Air Use Data Use Retries bw mcs Nss ofdma mu-mimo
50:E0:85:87:AA:19 1064.5 52.8 6.0% 25.0% 1.5% 80 10.0 2 0.0% 0.0%
50:E0:85:84:7A:E7 927.1 53.6 7.0% 25.4% 5.7% 80 8.8 2 0.0% 0.0%
50:E0:85:89:5D:00 857.5 51.8 6.8% 24.6% 0.8% 80 8 2 0.0% 0.0%
50:E0:85:87:5B:F4 1071.7 52.8 6.0% 25.0% 1.3% 80 10 2 0.0% 0.0%
(overall) - 210.9 25.8% - -'''
'''I have some un-tested code that is starting point for querying Comcast AP in the l3_longevity script.
When still needs doing: query the data from the AP, and test that my parsing and CSV logic is working,
also add cmd-line arg to enable this or not. Would you have time to work on this and coordinate test time on
customer's system to test against their AP? Access to AP is probably ssh, possibly serial or telnet.
Firas @ Comcast can help clarify that.'''
def parse_ap_stats(self):
# Query AP for its stats. Result for /ax bcm APs looks something like this:
ap_stats = self.read_ap_stats()
ap_stats_rows = [] # Array of Arrays
for line in ap_stats:
stats_row = line.split()
ap_stats_rows.append(stats_row)
# - is this needed ?m = re.search((r'(\S+)\s+(\S+)\s+(Data Mbps)\s+(Air Use)+'ap_stats[0]
# Query all of our ports
#port_eids = self.gather_port_eids()
#for eid_name in port_eids:
# eid = self.name_to_eid(eid_name)
# url = "/port/%s/%s/%s"%(eid[0], eid[1], eid[2])
# response = self.json_get(url)
# if (response is None) or ("interface" not in response):
# print("query-port: %s: incomplete response:"%(url))
# pprint(response)
# else:
# note changed the indent
#p = response['interface']
mac = ["50:E0:85:87:AA:19","50:E0:85:84:7A:E7","50:E0:85:89:5D:00","50:E0:85:87:5B:F4"]
#mac = "50:E0:85:87:AA:19"
#mac = "50:E0:85:84:7A:E7"
#mac = "50:E0:85:89:5D:00"
#mac = "50:E0:85:87:5B:F4"
ap_row = []
i = 0
for row in ap_stats_rows:
if row[0] in mac:
#if row[0].lower == mac.lower():
ap_row = row
print("ap_row: {}".format(ap_row))
# p is map of key/values for this port
#print("ap_row : {}".format(ap_row))
#pprint(ap_row)
# Find latency, jitter for connections using this port.
#latency, jitter, tput = self.get_endp_stats_for_port(p["port"], endps)
# ap_stats_col_titles = ['Station Address','PHY Mbps','Data Mbps','Air Use','Data Use','Retries','bw','mcs','Nss','ofdma','mu-mimo'
# self.write_port_csv(len(temp_stations_list), ul, dl, ul_pdu_str, dl_pdu_str, atten_val, eid_name, p,
# latency, jitter, tput, ap_row, ap_stats_col_titles
def main():
check = lf_check()
check.parse_ap_stats()
if __name__ == '__main__':
main()