mirror of
https://github.com/Telecominfraproject/wlan-lanforge-scripts.git
synced 2025-11-03 12:18:00 +00:00
Added exception handling and station cleanup
This commit is contained in:
155
connectTest.py
155
connectTest.py
@@ -13,7 +13,7 @@ from LANforge import LFRequest
|
||||
from LANforge import LFUtils
|
||||
import create_genlink as genl
|
||||
|
||||
|
||||
debugOn = True
|
||||
|
||||
if sys.version_info[0] != 3:
|
||||
print("This script requires Python 3")
|
||||
@@ -39,15 +39,15 @@ def jsonReq(mgrURL, reqURL, data, debug=False):
|
||||
lf_r.addPostData(data)
|
||||
|
||||
if debug:
|
||||
json_response = lf_r.jsonPost(True)
|
||||
json_response = lf_r.jsonPost(debug)
|
||||
LFUtils.debug_printer.pprint(json_response)
|
||||
sys.exit(1)
|
||||
else:
|
||||
lf_r.jsonPost()
|
||||
lf_r.jsonPost(debug)
|
||||
|
||||
def getJsonInfo(mgrURL, reqURL, name):
|
||||
lf_r = LFRequest.LFRequest(mgrURL + reqURL)
|
||||
json_response = lf_r.getAsJson()
|
||||
json_response = lf_r.getAsJson(debugOn)
|
||||
return json_response
|
||||
#print(name)
|
||||
#j_printer = pprint.PrettyPrinter(indent=2)
|
||||
@@ -79,13 +79,12 @@ print("Creating endpoints and cross connects")
|
||||
|
||||
url = "cli-json/add_sta"
|
||||
data = {
|
||||
data = {
|
||||
"shelf":1,
|
||||
"resource":1,
|
||||
"radio":"wiphy0",
|
||||
"sta_name":"sta00000",
|
||||
"ssid":"jedway-wpa2-x64-3-1",
|
||||
"key"::"jedway-wpa2-x64-3-1",
|
||||
"key":"jedway-wpa2-x64-3-1",
|
||||
"mode":1,
|
||||
"mac":"xx:xx:xx:xx:*:xx",
|
||||
"flags":0x400
|
||||
@@ -255,42 +254,62 @@ data = {
|
||||
jsonReq(mgrURL,url,data)
|
||||
time.sleep(.5)
|
||||
|
||||
|
||||
cxNames = ["testTCP","testUDP", "CX_l4Test", "CX_fioTest", "CX_genTest1", "CX_wlTest1"]
|
||||
|
||||
#get data before running traffic
|
||||
testTCPA = getJsonInfo(mgrURL, "endp/testTCP-A?fields=tx+bytes,rx+bytes", "testTCP-A")
|
||||
testTCPATX = testTCPA['endpoint']['tx bytes']
|
||||
testTCPARX = testTCPA['endpoint']['rx bytes']
|
||||
try:
|
||||
testTCPA = getJsonInfo(mgrURL, "endp/testTCP-A?fields=tx+bytes,rx+bytes", "testTCP-A")
|
||||
testTCPATX = testTCPA['endpoint']['tx bytes']
|
||||
testTCPARX = testTCPA['endpoint']['rx bytes']
|
||||
|
||||
testTCPB = getJsonInfo(mgrURL, "endp/testTCP-B?fields=tx+bytes,rx+bytes", "testTCP-B")
|
||||
testTCPBTX = testTCPB['endpoint']['tx bytes']
|
||||
testTCPBRX = testTCPB['endpoint']['rx bytes']
|
||||
testTCPB = getJsonInfo(mgrURL, "endp/testTCP-B?fields=tx+bytes,rx+bytes", "testTCP-B")
|
||||
testTCPBTX = testTCPB['endpoint']['tx bytes']
|
||||
testTCPBRX = testTCPB['endpoint']['rx bytes']
|
||||
|
||||
testUDPA = getJsonInfo(mgrURL, "endp/testUDP-A?fields=tx+bytes,rx+bytes", "testUDP-A")
|
||||
testUDPATX = testUDPA['endpoint']['tx bytes']
|
||||
testUDPARX = testUDPA['endpoint']['rx bytes']
|
||||
testUDPA = getJsonInfo(mgrURL, "endp/testUDP-A?fields=tx+bytes,rx+bytes", "testUDP-A")
|
||||
testUDPATX = testUDPA['endpoint']['tx bytes']
|
||||
testUDPARX = testUDPA['endpoint']['rx bytes']
|
||||
|
||||
testUDPB = getJsonInfo(mgrURL, "endp/testUDP-B?fields=tx+bytes,rx+bytes", "testUDP-B")
|
||||
testUDPBTX = testUDPB['endpoint']['tx bytes']
|
||||
testUDPBRX = testUDPB['endpoint']['rx bytes']
|
||||
testUDPB = getJsonInfo(mgrURL, "endp/testUDP-B?fields=tx+bytes,rx+bytes", "testUDP-B")
|
||||
testUDPBTX = testUDPB['endpoint']['tx bytes']
|
||||
testUDPBRX = testUDPB['endpoint']['rx bytes']
|
||||
|
||||
l4Test = getJsonInfo(mgrURL, "layer4/l4Test?fields=bytes-rd", "l4Test")
|
||||
l4TestBR = l4Test['endpoint']['bytes-rd']
|
||||
l4Test = getJsonInfo(mgrURL, "layer4/l4Test?fields=bytes-rd", "l4Test")
|
||||
l4TestBR = l4Test['endpoint']['bytes-rd']
|
||||
|
||||
genTest1 = getJsonInfo(mgrURL, "generic/genTest1?fields=last+results", "genTest1")
|
||||
genTest1LR = genTest1['endpoint']['last results']
|
||||
genTest1 = getJsonInfo(mgrURL, "generic/genTest1?fields=last+results", "genTest1")
|
||||
genTest1LR = genTest1['endpoint']['last results']
|
||||
|
||||
wlTest1 = getJsonInfo(mgrURL,"wl_ep/wlTest1","wlTest1")
|
||||
wlTest1TXB = wlTest1['endpoint']['tx bytes']
|
||||
wlTest1RXP = wlTest1['endpoint']['rx pkts']
|
||||
wlTest2 = getJsonInfo(mgrURL,"wl_ep/wlTest2","wlTest2")
|
||||
wlTest2TXB = wlTest2['endpoint']['tx bytes']
|
||||
wlTest2RXP = wlTest2['endpoint']['rx pkts']
|
||||
wlTest1 = getJsonInfo(mgrURL,"wl_ep/wlTest1","wlTest1")
|
||||
wlTest1TXB = wlTest1['endpoint']['tx bytes']
|
||||
wlTest1RXP = wlTest1['endpoint']['rx pkts']
|
||||
wlTest2 = getJsonInfo(mgrURL,"wl_ep/wlTest2","wlTest2")
|
||||
wlTest2TXB = wlTest2['endpoint']['tx bytes']
|
||||
wlTest2RXP = wlTest2['endpoint']['rx pkts']
|
||||
except Exception as e:
|
||||
print("Something went wrong")
|
||||
print(e)
|
||||
print("Cleaning up...")
|
||||
reqURL = "cli-json/rm_vlan"
|
||||
data = {
|
||||
"shelf":1,
|
||||
"resource":1,
|
||||
"port":"sta00000"
|
||||
}
|
||||
|
||||
jsonReq(mgrURL, reqURL, data)
|
||||
|
||||
endpNames = ["testTCP-A", "testTCP-B",
|
||||
"testUDP-A", "testUDP-B",
|
||||
"l4Test", "fioTest",
|
||||
"genTest1", "genTest2",
|
||||
"wlTest1","wlTest2"]
|
||||
removeCX(mgrURL, cxNames)
|
||||
removeEndps(mgrURL, endpNames)
|
||||
sys.exit(1)
|
||||
|
||||
#start cx traffic
|
||||
print("\nStarting CX Traffic")
|
||||
cxNames = ["testTCP","testUDP", "CX_l4Test", "CX_fioTest", "CX_genTest1", "CX_wlTest1"]
|
||||
for name in range(len(cxNames)):
|
||||
cmd = (f"./lf_firemod.pl --mgr localhost --quiet yes --action do_cmd --cmd \"set_cx_state default_tm {cxNames[name]} RUNNING\" >> /tmp/connectTest.log")
|
||||
execWrap(cmd)
|
||||
@@ -349,34 +368,56 @@ time.sleep(15)
|
||||
|
||||
#get data for endpoints JSON
|
||||
print("Collecting Data")
|
||||
ptestTCPA = getJsonInfo(mgrURL, "endp/testTCP-A?fields=tx+bytes,rx+bytes", "testTCP-A")
|
||||
ptestTCPATX = ptestTCPA['endpoint']['tx bytes']
|
||||
ptestTCPARX = ptestTCPA['endpoint']['rx bytes']
|
||||
try:
|
||||
ptestTCPA = getJsonInfo(mgrURL, "endp/testTCP-A?fields=tx+bytes,rx+bytes", "testTCP-A")
|
||||
ptestTCPATX = ptestTCPA['endpoint']['tx bytes']
|
||||
ptestTCPARX = ptestTCPA['endpoint']['rx bytes']
|
||||
|
||||
ptestTCPB = getJsonInfo(mgrURL, "endp/testTCP-B?fields=tx+bytes,rx+bytes", "testTCP-B")
|
||||
ptestTCPBTX = ptestTCPB['endpoint']['tx bytes']
|
||||
ptestTCPBRX = ptestTCPB['endpoint']['rx bytes']
|
||||
ptestTCPB = getJsonInfo(mgrURL, "endp/testTCP-B?fields=tx+bytes,rx+bytes", "testTCP-B")
|
||||
ptestTCPBTX = ptestTCPB['endpoint']['tx bytes']
|
||||
ptestTCPBRX = ptestTCPB['endpoint']['rx bytes']
|
||||
|
||||
ptestUDPA = getJsonInfo(mgrURL, "endp/testUDP-A?fields=tx+bytes,rx+bytes", "testUDP-A")
|
||||
ptestUDPATX = ptestUDPA['endpoint']['tx bytes']
|
||||
ptestUDPARX = ptestUDPA['endpoint']['rx bytes']
|
||||
ptestUDPA = getJsonInfo(mgrURL, "endp/testUDP-A?fields=tx+bytes,rx+bytes", "testUDP-A")
|
||||
ptestUDPATX = ptestUDPA['endpoint']['tx bytes']
|
||||
ptestUDPARX = ptestUDPA['endpoint']['rx bytes']
|
||||
|
||||
ptestUDPB = getJsonInfo(mgrURL, "endp/testUDP-B?fields=tx+bytes,rx+bytes", "testUDP-B")
|
||||
ptestUDPBTX = ptestUDPB['endpoint']['tx bytes']
|
||||
ptestUDPBRX = ptestUDPB['endpoint']['rx bytes']
|
||||
ptestUDPB = getJsonInfo(mgrURL, "endp/testUDP-B?fields=tx+bytes,rx+bytes", "testUDP-B")
|
||||
ptestUDPBTX = ptestUDPB['endpoint']['tx bytes']
|
||||
ptestUDPBRX = ptestUDPB['endpoint']['rx bytes']
|
||||
|
||||
pl4Test = getJsonInfo(mgrURL, "layer4/l4Test?fields=bytes-rd", "l4Test")
|
||||
pl4TestBR = pl4Test['endpoint']['bytes-rd']
|
||||
pl4Test = getJsonInfo(mgrURL, "layer4/l4Test?fields=bytes-rd", "l4Test")
|
||||
pl4TestBR = pl4Test['endpoint']['bytes-rd']
|
||||
|
||||
pgenTest1 = getJsonInfo(mgrURL, "generic/genTest1?fields=last+results", "genTest1")
|
||||
pgenTest1LR = pgenTest1['endpoint']['last results']
|
||||
pgenTest1 = getJsonInfo(mgrURL, "generic/genTest1?fields=last+results", "genTest1")
|
||||
pgenTest1LR = pgenTest1['endpoint']['last results']
|
||||
|
||||
pwlTest1 = getJsonInfo(mgrURL,"wl_ep/wlTest1","wlTest1")
|
||||
pwlTest1TXB = pwlTest1['endpoint']['tx bytes']
|
||||
pwlTest1RXP = pwlTest1['endpoint']['rx pkts']
|
||||
pwlTest2 = getJsonInfo(mgrURL,"wl_ep/wlTest2","wlTest2")
|
||||
pwlTest2TXB = pwlTest2['endpoint']['tx bytes']
|
||||
pwlTest2RXP = pwlTest2['endpoint']['rx pkts']
|
||||
pwlTest1 = getJsonInfo(mgrURL,"wl_ep/wlTest1","wlTest1")
|
||||
pwlTest1TXB = pwlTest1['endpoint']['tx bytes']
|
||||
pwlTest1RXP = pwlTest1['endpoint']['rx pkts']
|
||||
pwlTest2 = getJsonInfo(mgrURL,"wl_ep/wlTest2","wlTest2")
|
||||
pwlTest2TXB = pwlTest2['endpoint']['tx bytes']
|
||||
pwlTest2RXP = pwlTest2['endpoint']['rx pkts']
|
||||
except Exception as e:
|
||||
print("Something went wrong")
|
||||
print(e)
|
||||
print("Cleaning up...")
|
||||
reqURL = "cli-json/rm_vlan"
|
||||
data = {
|
||||
"shelf":1,
|
||||
"resource":1,
|
||||
"port":"sta00000"
|
||||
}
|
||||
|
||||
jsonReq(mgrURL, reqURL, data)
|
||||
|
||||
endpNames = ["testTCP-A", "testTCP-B",
|
||||
"testUDP-A", "testUDP-B",
|
||||
"l4Test", "fioTest",
|
||||
"genTest1", "genTest2",
|
||||
"wlTest1","wlTest2"]
|
||||
removeCX(mgrURL, cxNames)
|
||||
removeEndps(mgrURL, endpNames)
|
||||
sys.exit(1)
|
||||
|
||||
#print("Sleeping for 5 seconds")
|
||||
time.sleep(5)
|
||||
@@ -421,6 +462,16 @@ print("\n")
|
||||
|
||||
#remove all endpoints and cxs
|
||||
print("Cleaning up...")
|
||||
|
||||
reqURL = "cli-json/rm_vlan"
|
||||
data = {
|
||||
"shelf":1,
|
||||
"resource":1,
|
||||
"port":"sta00000"
|
||||
}
|
||||
|
||||
jsonReq(mgrURL, reqURL, data)
|
||||
|
||||
endpNames = ["testTCP-A", "testTCP-B",
|
||||
"testUDP-A", "testUDP-B",
|
||||
"l4Test", "fioTest",
|
||||
|
||||
Reference in New Issue
Block a user