Refactored code to fit with PEP8 specifications and work with LFcliBase.

This commit is contained in:
Logan Lipke
2020-06-04 11:23:08 -07:00
parent a5a8dc370d
commit 29c18611da

View File

@@ -2,428 +2,417 @@
import os import os
import sys import sys
import time import time
sys.path.append('py-json') sys.path.append('py-json')
import json
import pprint
import datetime import datetime
from LANforge import LFRequest from LANforge import LFRequest
from LANforge import LFUtils from LANforge import LFUtils
import argparse import argparse
import re import re
import math
import string
import emailHelper import emailHelper
from LANforge.lfcli_base import LFCliBase
debugOn = False
sender = "lanforge@candelatech.com" class StressTester(LFCliBase):
def __init__(self, lfhost, lfport, _sender="lanforge@candelatech.com", _debug_on=False):
def jsonReq(mgrURL, reqURL, data, debug=False): self.sender = _sender
lf_r = LFRequest.LFRequest(mgrURL + reqURL) super().__init__(lfhost, lfport, False)
lf_r.addPostData(data)
def run(self):
if debug: parser = argparse.ArgumentParser(description="Create max stations for each radio")
json_response = lf_r.jsonPost(debug) parser.add_argument("--test_duration", type=str,
LFUtils.debug_printer.pprint(json_response) help="Full duration for the test to run. Should be specified by a number followed by a "
sys.exit(1) "character. d for days, h for hours, m for minutes, s for seconds")
else: parser.add_argument("--test_end_time", type=str,
lf_r.jsonPost(debug) help="Specify a time and date to end the test. Should be formatted as "
"year-month-date_hour:minute. Date should be specified in numbers and time "
"should be 24 "
def execWrap(cmd): "hour format. Ex: 2020-5-14_14:30")
if os.system(cmd) != 0: parser.add_argument("--report_interval", type=str,
print("\nError with " + cmd + ",bye\n") help="How often a report is made. Should be specified by a "
exit(1) "number followed by a character. d for days, h for hours, "
"m for minutes, s for seconds")
def getJsonInfo(mgrURL, reqURL): parser.add_argument("--output_dir", type=str, help="Directory to ouptut to")
lf_r = LFRequest.LFRequest(mgrURL + reqURL) parser.add_argument("--output_prefix", type=str,
json_response = lf_r.getAsJson(debugOn) help="Name of the file. Timestamp and .html will be appended to the end")
return json_response parser.add_argument("--email", type=str, help="Email address of recipient")
args = None
parser = argparse.ArgumentParser(description="Create max stations for each radio") try:
parser.add_argument("--test_duration", type=str, help="Full duration for the test to run. Should be specified by a number followed by a character. d for days, h for hours, m for minutes, s for seconds") args = parser.parse_args()
parser.add_argument("--test_end_time", type=str, help="Specify a time and date to end the test. Should be formatted as year-month-date_hour:minute. Date should be specified in numbers and time should be 24 hour format. Ex: 2020-5-14_14:30") if args.test_duration is not None:
parser.add_argument("--report_interval", type=str, help="How often a report is made. Should be specified by a number followed by a character. d for days, h for hours, m for minutes, s for seconds") pattern = re.compile("^(\d+)([dhms]$)")
parser.add_argument("--output_dir", type=str, help="Directory to ouptut to") td = pattern.match(args.test_duration)
parser.add_argument("--output_prefix", type=str, help="Name of the file. Timestamp and .html will be appended to the end") if td is not None:
parser.add_argument("--email", type=str, help="Email address of recipient") dur_time = int(td.group(1))
dur_measure = str(td.group(2))
now = datetime.datetime.now()
args = None if dur_measure == "d":
try: duration_time = datetime.timedelta(days=dur_time)
args = parser.parse_args() elif dur_measure == "h":
if (args.test_duration is not None): duration_time = datetime.timedelta(hours=dur_time)
pattern = re.compile("^(\d+)([dhms]$)") elif dur_measure == "m":
td = pattern.match(args.test_duration) duration_time = datetime.timedelta(minutes=dur_time)
if td != None: else:
durTime = int(td.group(1)) duration_time = datetime.timedelta(seconds=dur_time)
durMeasure = str(td.group(2)) else:
now = datetime.datetime.now() parser.print_help()
if durMeasure == "d": parser.exit()
durationTime = datetime.timedelta(days = durTime)
elif durMeasure == "h": elif args.test_end_time is not None:
durationTime = datetime.timedelta(hours = durTime) now = datetime.datetime.now()
elif durMeasure == "m": try:
durationTime = datetime.timedelta(minutes = durTime) end_time = datetime.datetime.strptime(args.test_end_time, "%Y-%m-%d_%H:%M")
else: if end_time < now:
durationTime = datetime.timedelta(seconds = durTime) parser.print_help()
else: raise ValueError
parser.print_help() else:
parser.exit() cur_time = datetime.datetime.now()
duration_time = end_time - cur_time
elif (args.test_end_time is not None):
now = datetime.datetime.now() except ValueError as exception:
try: print(exception)
endTime = datetime.datetime.strptime(args.test_end_time,"%Y-%m-%d_%H:%M") parser.print_help()
if endTime < now: parser.exit()
raise ValueError
parser.print_help() else:
parser.exit() parser.print_help()
else: parser.exit()
curTime = datetime.datetime.now()
durationTime = endTime - curTime if args.report_interval is not None:
pattern = re.compile("^(\d+)([dhms])$")
except ValueError as exception: ri = pattern.match(args.report_interval)
print(exception) if ri is not None:
parser.print_help() int_time = int(ri.group(1))
parser.exit() int_measure = str(ri.group(2))
else: if int_measure == "d":
parser.print_help() interval_time = datetime.timedelta(days=int_time)
parser.exit() elif int_measure == "h":
interval_time = datetime.timedelta(hours=int_time)
elif int_measure == "m":
if (args.report_interval is not None): interval_time = datetime.timedelta(minutes=int_time)
pattern = re.compile("^(\d+)([dhms])$") else:
ri = pattern.match(args.report_interval) interval_time = datetime.timedelta(seconds=int_time)
if ri != None: else:
intTime = int(ri.group(1)) parser.print_help()
intMeasure = str(ri.group(2)) parser.exit()
else:
if intMeasure == "d": parser.print_help()
intervalTime = datetime.timedelta(days = intTime) parser.exit()
elif intMeasure == "h":
intervalTime = datetime.timedelta(hours = intTime) if args.output_dir is not None:
elif intMeasure == "m": output_dir = args.output_dir
intervalTime = datetime.timedelta(minutes = intTime) else:
else: parser.print_help()
intervalTime = datetime.timedelta(seconds = intTime) parser.exit()
else:
parser.print_help() if args.output_prefix is not None:
parser.exit() output_prefix = args.output_prefix
else: else:
parser.print_help() parser.print_help()
parser.exit() parser.exit()
if args.email is not None:
if (args.output_dir != None): recipient = args.email
outputDir = args.output_dir else:
else: parser.print_help()
parser.print_help() parser.exit()
parser.exit()
except Exception as e:
if (args.output_prefix != None): parser.print_help()
outputPrefix = args.output_prefix exit(2)
else:
parser.print_help() super().check_connect()
parser.exit()
if (args.email != None): stations = []
recipient = args.email radios = {"wiphy0": 200, # max 200
else: "wiphy1": 200, # max 200
parser.print_help() "wiphy2": 64, # max 64
parser.exit() "wiphy3": 200} # max 200
# radioName:numStations
radio_ssid_map = {"wiphy0": "jedway-wpa2-x2048-4-1",
except Exception as e: "wiphy1": "jedway-wpa2-x2048-5-3",
logging.exception(e) "wiphy2": "jedway-wpa2-x2048-5-1",
usage() "wiphy3": "jedway-wpa2-x2048-4-4"}
exit(2)
ssid_passphrase_map = {"jedway-wpa2-x2048-4-1": "jedway-wpa2-x2048-4-1",
"jedway-wpa2-x2048-5-3": "jedway-wpa2-x2048-5-3",
stations = [] "jedway-wpa2-x2048-5-1": "jedway-wpa2-x2048-5-1",
radios = {"wiphy0":200, #max 200 "jedway-wpa2-x2048-4-4": "jedway-wpa2-x2048-4-4"}
"wiphy1":200, #max 200
"wiphy2":64, #max 64 padding_num = 1000 # uses all but the first number to create names for stations
"wiphy3":200} #max 200
#radioName:numStations # clean up old stations
radio_ssid_map = {"wiphy0":"jedway-wpa2-x2048-4-1", print("Cleaning up old Stations")
"wiphy1":"jedway-wpa2-x2048-5-3",
"wiphy2":"jedway-wpa2-x2048-5-1", for radio, numStations in radios.items():
"wiphy3":"jedway-wpa2-x2048-4-4"} for i in range(0, numStations):
sta_name = "sta" + radio[-1:] + str(padding_num + i)[1:]
ssid_passphrase_map = {"jedway-wpa2-x2048-4-1":"jedway-wpa2-x2048-4-1", if super().json_get("port/1/1/" + sta_name) is not None:
"jedway-wpa2-x2048-5-3":"jedway-wpa2-x2048-5-3", req_url = "cli-json/rm_vlan"
"jedway-wpa2-x2048-5-1":"jedway-wpa2-x2048-5-1",
"jedway-wpa2-x2048-4-4":"jedway-wpa2-x2048-4-4"} data = {
"shelf": 1,
paddingNum = 1000 #uses all but the first number to create names for stations "resource": 1,
"port": sta_name
mgrURL = "http://localhost:8080/" }
#clean up old stations super().json_post(req_url, data)
print("Cleaning up old Stations")
req_url = "cli-json/rm_cx"
for radio, numStations in radios.items():
for i in range(0,numStations): data = {
staName = "sta" + radio[-1:] + str(paddingNum + i)[1:] "test_mgr": "default_tm",
if getJsonInfo(mgrURL, "port/1/1/"+staName) != None: "cx_name": sta_name
reqURL = "cli-json/rm_vlan" }
super().json_post(req_url, data)
data = {
"shelf":1, req_url = "cli-json/rm_endp"
"resource":1,
"port":staName data = {
} "endp_name": sta_name + "-A"
}
jsonReq(mgrURL, reqURL, data) super().json_post(req_url, data)
reqURL = "cli-json/rm_cx" req_url = "cli-json/rm_endp"
data = {
data = { "endp_name": sta_name + "-B"
"test_mgr":"default_tm", }
"cx_name":staName super().json_post(req_url, data)
}
jsonReq(mgrURL, reqURL, data) # create new stations
print("Creating Stations")
reqURL = "cli-json/rm_endp"
req_url = "cli-json/add_sta"
data = { for radio, numStations in radios.items():
"endp_name":staName + "-A" for i in range(0, numStations):
} sta_name = "sta" + radio[-1:] + str(padding_num + i)[1:]
stations.append(sta_name)
jsonReq(mgrURL, reqURL, data) data = {
"shelf": 1,
reqURL = "cli-json/rm_endp" "resource": 1,
data = { "radio": radio,
"endp_name":staName + "-B" "sta_name": sta_name,
} "ssid": radio_ssid_map[radio],
"key": ssid_passphrase_map[radio_ssid_map[radio]],
#create new stations "mode": 1,
print("Creating Stations") "mac": "xx:xx:xx:xx:*:xx",
"flags": 0x400
reqURL = "cli-json/add_sta" }
for radio, numStations in radios.items(): # print("Creating station {}".format(sta_name))
for i in range(0,numStations): super().json_post(req_url, data)
staName = "sta" + radio[-1:] + str(paddingNum + i)[1:]
stations.append(staName) time.sleep(0.5)
data = {
"shelf":1, # LFUtils.portDhcpUpRequest(1, sta_name)
"resource":1,
"radio":radio, time.sleep(10)
"sta_name":staName,
"ssid":radio_ssid_map[radio], # check eth1 for ip
"key":ssid_passphrase_map[radio_ssid_map[radio]], eth1_ip = super().json_get("port/1/1/eth1")
"mode":1, if eth1_ip['interface']['ip'] == "0.0.0.0":
"mac":"xx:xx:xx:xx:*:xx", print("Switching eth1 to dhcp")
"flags":0x400 LFUtils.portDownRequest(1, "eth1")
} time.sleep(1)
#print("Creating station {}".format(staName)) req_url = "cli-json/set_port"
jsonReq(mgrURL, reqURL, data) data = {
"shelf": 1,
time.sleep(0.5) "resource": 1,
"port": "eth1",
#LFUtils.portDhcpUpRequest(1, staName) "current_flags": 0x80000000,
"interest": 0x4002
}
time.sleep(10)
super().json_post(req_url, data)
#check eth1 for ip # LFUtils.portDhcpUpRequest(1,"eth1")
eth1IP = getJsonInfo(mgrURL, "port/1/1/eth1") time.sleep(5)
if eth1IP['interface']['ip'] == "0.0.0.0": LFUtils.portUpRequest(1, "eth1")
print("Switching eth1 to dhcp")
LFUtils.portDownRequest(1,"eth1") time.sleep(10)
time.sleep(1)
reqURL = "cli-json/set_port" # create cross connects
data = { print("Creating cross connects")
"shelf":1, for sta_name in stations:
"resource":1, cmd = (
"port":"eth1", "./lf_firemod.pl --action create_cx --cx_name " + sta_name + " --use_ports eth1," + sta_name + " --use_speeds 2600,2600 --endp_type udp > sst.log")
"current_flags":0x80000000, LFUtils.execWrap(cmd)
"interest":0x4002
} # set stations to dchp up
print("Turning on DHCP for stations")
jsonReq(mgrURL,reqURL,data) for sta_name in stations:
#LFUtils.portDhcpUpRequest(1,"eth1") # print("Setting {} flags".format(sta_name))
time.sleep(5) req_url = "cli-json/set_port"
LFUtils.portUpRequest(1,"eth1") data = {
"shelf": 1,
"resource": 1,
time.sleep(10) "port": sta_name,
"current_flags": 0x80000000,
#create cross connects "interest": 0x4002
print("Creating cross connects") }
for staName in stations:
cmd = ("./lf_firemod.pl --action create_cx --cx_name " + staName + " --use_ports eth1," + staName + " --use_speeds 2600,2600 --endp_type udp > sst.log") super().json_post(req_url, data)
execWrap(cmd) # LFUtils.portDhcpUpRequest(1,sta_name)
#set stations to dchp up time.sleep(15)
print("Turning on DHCP for stations")
for staName in stations: # start traffic through cxs
#print("Setting {} flags".format(staName)) print("Starting CX Traffic")
reqURL = "cli-json/set_port" for name in stations:
data = { cmd = (
"shelf":1, "./lf_firemod.pl --mgr localhost --quiet 0 --action do_cmd --cmd \"set_cx_state default_tm " + name + " RUNNING\" >> sst.log")
"resource":1, LFUtils.execWrap(cmd)
"port":staName,
"current_flags":0x80000000, # create weblog for monitoring stations
"interest":0x4002 cur_time = datetime.datetime.now().strftime("%Y-%m-%d_%H%M")
} web_log = output_dir + output_prefix + "{}.html".format(cur_time)
jsonReq(mgrURL,reqURL,data) try:
#LFUtils.portDhcpUpRequest(1,staName) web_log_file = open(web_log, "w")
except IOError as err:
time.sleep(15) print(err)
print("Please ensure correct permissions have been assigned in target directory")
#start traffic through cxs sys.exit()
print("Starting CX Traffic")
for name in stations: top = """<html>
cmd = ("./lf_firemod.pl --mgr localhost --quiet 0 --action do_cmd --cmd \"set_cx_state default_tm " + name + " RUNNING\" >> sst.log") <head>
execWrap(cmd) <title>Test report</title>
<style>
body, td, p, div, span { font-size: 8pt; }
#create weblog for monitoring stations h1, h2, h3 { text-align: center; font-family: "Century Gothic",Arial,Helvetica,sans;}
curTime = datetime.datetime.now().strftime("%Y-%m-%d_%H%M") </style>
webLog = outputDir + outputPrefix + "{}.html".format(curTime) </head>
<body>
try: <h1>Long test on %s</h1>
f = open(webLog,"w") <p2>Key</p2>
<p1 style="background-color:rgb(0,255,0);">All stations associated and with ip</p1>
except IOError as err: <p1 style="background-color:rgb(255,200,0);">All stations associated and at least one without ip</p1>
print(err) <p1 style="background-color:rgb(255,150,150);">No stations associated and without ip</p1>
print("Please ensure correct permissions have been assigned in target directory") <table>
sys.exit() """ % datetime.date.today()
web_log_file.write(top)
top = """<html> web_log_file.close()
<head>
<title>Test report</title> web_log_file = open(web_log, "a")
<style> web_log_file.write("<tr>\n")
body, td, p, div, span { font-size: 8pt; }
h1, h2, h3 { text-align: center; font-family: "Century Gothic",Arial,Helvetica,sans;} for name in radios:
</style> web_log_file.write("<th>{}</th>\n".format(name))
</head>
<body> web_log_file.write("</tr>\n")
<h1>Long test on %s</h1>
<p2>Key</p2> cur_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M")
<p1 style="background-color:rgb(0,255,0);">All stations associated and with ip</p1> subject = "Station Test Begin Report Notification"
<p1 style="background-color:rgb(255,200,0);">All stations associated and at least one without ip</p1> body = "Report begun at {}\n See {}".format(cur_time, web_log)
<p1 style="background-color:rgb(255,150,150);">No stations associated and without ip</p1> email = emailHelper.writeEmail(body)
<table> emailHelper.sendEmail(email, self.sender, recipient, subject)
""" % datetime.date.today()
print("Logging Info to {}".format(web_log))
f.write(top) cur_time = datetime.datetime.now()
f.close() end_time = cur_time + duration_time
f = open(webLog, "a") while cur_time <= end_time:
f.write("<tr>\n") web_log_file.write("<tr>\n")
for radio, numStations in radios.items():
for name in radios: without_ip = 0
f.write("<th>{}</th>\n".format(name)) dissociated = 0
good = 0
f.write("</tr>\n")
for i in range(0, numStations):
sta_name = "sta" + radio[-1:] + str(padding_num + i)[1:]
curTime = datetime.datetime.now().strftime("%Y-%m-%d %H:%M") sta_status = super().json_get("port/1/1/" + sta_name)
subject = "Station Test Begin Report Notification" # print(sta_name)
body = "Report begun at {}\n See {}".format(curTime, webLog) if sta_status['interface']['ip'] == "0.0.0.0":
email = emailHelper.writeEmail(body) without_ip += 1
emailHelper.sendEmail(email, sender, recipient, subject) if sta_status['interface']['ap'] is None:
dissociated += 1
else:
print("Logging Info to {}".format(webLog)) good += 1
curTime = datetime.datetime.now() if without_ip and not dissociated:
endTime = curTime + durationTime web_log_file.write("<td style=\"background-color:rgb(255,200,0);\">{}/{}</td>\n".format(good,
numStations)) # without IP assigned
while curTime <= (endTime): elif dissociated:
f.write("<tr>\n") web_log_file.write("<td style=\"background-color:rgb(255,150,150);\">{}/{}</td>\n".format(good,
for radio, numStations in radios.items(): numStations)) # dissociated from AP
withoutIP = 0 else:
dissociated = 0 web_log_file.write("<td style=\"background-color:rgb(0,255,0);\">{}/{}</td>\n".format(good,
good = 0 numStations)) # with IP and associated
for i in range(0,numStations): web_log_file.write("<td>{}</td>\n".format(datetime.datetime.now().strftime("%Y-%m-%d %H:%M")))
staName = "sta" + radio[-1:] + str(paddingNum + i)[1:] web_log_file.write("</tr>\n")
staStatus = getJsonInfo(mgrURL, "port/1/1/" + staName)
#print(staName) cur_time = datetime.datetime.now()
if staStatus['interface']['ip'] == "0.0.0.0": int_time = cur_time + interval_time
withoutIP += 1 while cur_time <= int_time:
if staStatus['interface']['ap'] == None: # print(cur_time, int_time)
dissociated += 1 time.sleep(1)
else: cur_time = datetime.datetime.now()
good += 1 # sleep(1)
cur_time = datetime.datetime.now()
if withoutIP and not dissociated:
f.write("<td style=\"background-color:rgb(255,200,0);\">{}/{}</td>\n".format(good,numStations)) #without IP assigned web_log_file.write("</table></body></html>\n")
elif dissociated: web_log_file.close()
f.write("<td style=\"background-color:rgb(255,150,150);\">{}/{}</td>\n".format(good,numStations)) #dissociated from AP
else: cur_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M")
f.write("<td style=\"background-color:rgb(0,255,0);\">{}/{}</td>\n".format(good,numStations)) #with IP and associated subject = "Station Test End Report Notification"
body = "Report finished at {} see {}".format(cur_time, web_log)
f.write("<td>{}</td>\n".format(datetime.datetime.now().strftime("%Y-%m-%d %H:%M"))) email = emailHelper.writeEmail(body)
f.write("</tr>\n") emailHelper.sendEmail(email, self.sender, recipient, subject)
curTime = datetime.datetime.now() print("Stopping CX Traffic")
intTime = curTime + intervalTime for sta_name in stations:
while curTime <= intTime: cmd = (
#print(curTime, intTime) "./lf_firemod.pl --mgr localhost --quiet 0 --action do_cmd --cmd \"set_cx_state default_tm " + sta_name + " STOPPED\" >> sst.log")
time.sleep(1) LFUtils.execWrap(cmd)
curTime = datetime.datetime.now()
#sleep(1) time.sleep(10)
curTime = datetime.datetime.now()
# remove all created stations and cross connects
f.write("</table></body></html>\n")
f.close() print("Cleaning Up...")
for sta_name in stations:
req_url = "cli-json/rm_vlan"
curTime = datetime.datetime.now().strftime("%Y-%m-%d %H:%M")
subject = "Station Test End Report Notification" data = {
body = "Report finished at {} see {}".format(curTime, webLog) "shelf": 1,
email = emailHelper.writeEmail(body) "resource": 1,
emailHelper.sendEmail(email, sender, recipient, subject) "port": sta_name
}
print("Stopping CX Traffic") super().json_post(req_url, data)
for name in stations:
cmd = ("./lf_firemod.pl --mgr localhost --quiet 0 --action do_cmd --cmd \"set_cx_state default_tm " + name + " STOPPED\" >> sst.log") req_url = "cli-json/rm_cx"
execWrap(cmd)
data = {
time.sleep(10) "test_mgr": "default_tm",
"cx_name": sta_name
#remove all created stations and cross connects }
super().json_post(req_url, data)
print("Cleaning Up...")
for staName in stations: req_url = "cli-json/rm_endp"
reqURL = "cli-json/rm_vlan"
data = {
data = { "endp_name": sta_name + "-A"
"shelf":1, }
"resource":1, super().json_post(req_url, data)
"port":staName
} req_url = "cli-json/rm_endp"
data = {
jsonReq(mgrURL, reqURL, data) "endp_name": sta_name + "-B"
}
reqURL = "cli-json/rm_cx"
super().json_post(req_url, data)
data = {
"test_mgr":"default_tm",
"cx_name":staName def main():
} lfjson_host = "localhost"
jsonReq(mgrURL, reqURL, data) lfjson_port = 8080
test = StressTester(lfjson_host, lfjson_port)
reqURL = "cli-json/rm_endp" test.run()
data = {
"endp_name":staName + "-A" if __name__ == "__main__":
} main()
jsonReq(mgrURL, reqURL, data)
reqURL = "cli-json/rm_endp"
data = {
"endp_name":staName + "-B"
}
jsonReq(mgrURL, reqURL, data)