Merge branch 'master' of github.com:greearb/lanforge-scripts

This commit is contained in:
Ben Greear
2020-06-04 06:12:57 -07:00
11 changed files with 898 additions and 377 deletions

View File

@@ -1,19 +1,16 @@
#!/usr/bin/env python3
import sys
if sys.version_info[0] != 3:
print("This script requires Python 3")
exit(1)
if 'py-json' not in sys.path:
sys.path.append('py-json')
import traceback
from LANforge import LFUtils
from LANforge.LFUtils import *
from LANforge.lfcli_base import LFCliBase
import create_genlink as genl
debugOn = True
if sys.version_info[0] != 3:
print("This script requires Python 3")
exit(1)
from generic_cx import GenericCx
mgrURL = "http://localhost:8080/"
staName = "sta0"
@@ -23,7 +20,7 @@ staNameUri = "port/1/1/" + staName
class ConnectTest(LFCliBase):
def __init__(self, lfhost, lfport):
super().__init__(lfhost, lfport, True)
super().checkConnect()
super().check_connect()
# compare pre-test values to post-test values
@staticmethod
@@ -37,7 +34,7 @@ class ConnectTest(LFCliBase):
def run(self):
print("See home/lanforge/Documents/connectTestLogs/connectTestLatest for specific values on latest test")
eth1IP = super().jsonGet("port/1/1/eth1")
eth1IP = super().json_get("port/1/1/eth1")
if eth1IP['interface']['ip'] == "0.0.0.0":
print("Warning: Eth1 lacks ip address")
exit(1)
@@ -45,7 +42,7 @@ class ConnectTest(LFCliBase):
# Create stations and turn dhcp on
print("Creating station and turning on dhcp")
response = super().jsonGet(staNameUri)
response = super().json_get(staNameUri)
if response is not None:
if response["interface"] is not None:
print("removing old station")
@@ -65,7 +62,7 @@ class ConnectTest(LFCliBase):
"mac": "xx:xx:xx:xx:*:xx",
"flags": (0x400 + 0x20000 + 0x1000000000) # create admin down
}
super().jsonPost(url, data)
super().json_post(url, data)
time.sleep(0.05)
reqURL = "cli-json/set_port"
data = {
@@ -75,16 +72,16 @@ class ConnectTest(LFCliBase):
"current_flags": (0x1 + 0x80000000),
"interest": (0x2 + 0x4000 + 0x800000) # current, dhcp, down,
}
super().jsonPost(reqURL, data)
super().json_post(reqURL, data)
time.sleep(0.5)
super().jsonPost("cli-json/set_port", portUpRequest(1, staName))
super().json_post("cli-json/set_port", portUpRequest(1, staName))
reqURL = "cli-json/nc_show_ports"
data = {"shelf": 1,
"resource": 1,
"port": staName,
"probe_flags": 1}
super().jsonPost(reqURL, data)
super().json_post(reqURL, data)
time.sleep(0.5)
waitUntilPortsAdminUp(1, mgrURL, [staName])
@@ -92,7 +89,7 @@ class ConnectTest(LFCliBase):
maxTime = 300
ip = "0.0.0.0"
while (ip == "0.0.0.0") and (duration < maxTime):
station_info = super().jsonGet(staNameUri + "?fields=port,ip")
station_info = super().json_get(staNameUri + "?fields=port,ip")
LFUtils.debug_printer.pprint(station_info)
if (station_info is not None) and ("interface" in station_info) and ("ip" in station_info["interface"]):
ip = station_info["interface"]["ip"]
@@ -111,10 +108,10 @@ class ConnectTest(LFCliBase):
print("Creating endpoints and cross connects")
# create cx for tcp and udp
cmd = (
f"./lf_firemod.pl --action create_cx --cx_name testTCP --use_ports {staName},eth1 --use_speeds 360000,150000 --endp_type tcp > /home/lanforge/Documents/connectTestLogs/connectTestLatest.log")
f"./lf_firemod.pl --action create_cx --cx_name testTCP --use_ports {staName},eth1 --use_speeds 360000,150000 --endp_type tcp > ~/Documents/connectTestLogs/connectTestLatest.log")
execWrap(cmd)
cmd = (
f"./lf_firemod.pl --action create_cx --cx_name testUDP --use_ports {staName},eth1 --use_speeds 360000,150000 --endp_type udp >> /home/lanforge/Documents/connectTestLogs/connectTestLatest.log")
f"./lf_firemod.pl --action create_cx --cx_name testUDP --use_ports {staName},eth1 --use_speeds 360000,150000 --endp_type udp >> ~/Documents/connectTestLogs/connectTestLatest.log")
execWrap(cmd)
time.sleep(.05)
@@ -130,7 +127,7 @@ class ConnectTest(LFCliBase):
"url_rate": 600,
"url": "dl http://10.40.0.1/ /dev/null"
}
super().jsonPost(url, data)
super().json_post(url, data)
time.sleep(.05)
# create cx for l4_endp
@@ -141,7 +138,7 @@ class ConnectTest(LFCliBase):
"tx_endp": "l4Test",
"rx_endp": "NA"
}
super().jsonPost(url, data)
super().json_post(url, data)
time.sleep(.05)
# create fileio endpoint
@@ -154,7 +151,7 @@ class ConnectTest(LFCliBase):
"type": "fe_nfs",
"directory": "/mnt/fe-test"
}
super().jsonPost(url, data)
super().json_post(url, data)
time.sleep(.05)
# create fileio cx
@@ -165,10 +162,11 @@ class ConnectTest(LFCliBase):
"tx_endp": "fioTest",
"rx_endp": "NA"
}
super().jsonPost(url, data)
super().json_post(url, data)
time.sleep(.05)
# create generic endpoints
genl = GenericCx(lfclient_host=self.lfjson_host, lfclient_port=self.lfjson_port)
genl.createGenEndp("genTest1", 1, 1, staName, "gen_generic")
genl.createGenEndp("genTest2", 1, 1, staName, "gen_generic")
genl.setFlags("genTest1", "ClearPortOnStart", 1)
@@ -185,7 +183,7 @@ class ConnectTest(LFCliBase):
"tx_endp": "genTest1",
"rx_endp": "genTest2"
}
super().jsonPost(url, data)
super().json_post(url, data)
time.sleep(.05)
# create redirects for wanlink
@@ -196,7 +194,7 @@ class ConnectTest(LFCliBase):
"port": "rdd0",
"peer_ifname": "rdd1"
}
super().jsonPost(url, data)
super().json_post(url, data)
url = "cli-json/add_rdd"
data = {
@@ -205,7 +203,7 @@ class ConnectTest(LFCliBase):
"port": "rdd1",
"peer_ifname": "rdd0"
}
super().jsonPost(url, data)
super().json_post(url, data)
time.sleep(.05)
# reset redirect ports
@@ -215,7 +213,7 @@ class ConnectTest(LFCliBase):
"resource": 1,
"port": "rdd0"
}
super().jsonPost(url, data)
super().json_post(url, data)
url = "cli-json/reset_port"
data = {
@@ -223,7 +221,7 @@ class ConnectTest(LFCliBase):
"resource": 1,
"port": "rdd1"
}
super().jsonPost(url, data)
super().json_post(url, data)
time.sleep(.05)
# create wanlink endpoints
@@ -236,7 +234,7 @@ class ConnectTest(LFCliBase):
"latency": 20,
"max_rate": 1544000
}
super().jsonPost(url, data)
super().json_post(url, data)
url = "cli-json/add_wl_endp"
data = {
@@ -247,7 +245,7 @@ class ConnectTest(LFCliBase):
"latency": 30,
"max_rate": 1544000
}
super().jsonPost(url, data)
super().json_post(url, data)
time.sleep(.05)
# create wanlink cx
@@ -258,41 +256,45 @@ class ConnectTest(LFCliBase):
"tx_endp": "wlan0",
"rx_endp": "wlan1"
}
super().jsonPost(url, data)
super().json_post(url, data)
time.sleep(.5)
cxNames = ["testTCP", "testUDP", "CX_l4Test", "CX_fioTest", "CX_genTest1", "CX_wlan0"]
# get data before running traffic
try:
testTCPA = super().jsonGet("endp/testTCP-A?fields=tx+bytes,rx+bytes")
testTCPATX = testTCPA['endpoint']['tx bytes']
testTCPARX = testTCPA['endpoint']['rx bytes']
get_info = {}
get_info['testTCPA'] = super().json_get("endp/testTCP-A?fields=tx+bytes,rx+bytes")
get_info['testTCPB'] = super().json_get("endp/testTCP-B?fields=tx+bytes,rx+bytes")
get_info['testUDPA'] = super().json_get("endp/testUDP-A?fields=tx+bytes,rx+bytes")
get_info['testUDPB'] = super().json_get("endp/testUDP-B?fields=tx+bytes,rx+bytes")
get_info['l4Test'] = super().json_get("layer4/l4Test?fields=bytes-rd")
get_info['genTest1'] = super().json_get("generic/genTest1?fields=last+results")
get_info['wlan0'] = super().json_get("wl_ep/wlan0")
get_info['wlan1'] = super().json_get("wl_ep/wlan1")
testTCPB = super().jsonGet("endp/testTCP-B?fields=tx+bytes,rx+bytes")
testTCPBTX = testTCPB['endpoint']['tx bytes']
testTCPBRX = testTCPB['endpoint']['rx bytes']
for name in get_info:
if 'endpoint' not in name:
print(get_info[name])
raise ValueError (f"{name} missing endpoint value")
testUDPA = super().jsonGet("endp/testUDP-A?fields=tx+bytes,rx+bytes")
testUDPATX = testUDPA['endpoint']['tx bytes']
testUDPARX = testUDPA['endpoint']['rx bytes']
testTCPATX = get_info['testTCPA']['endpoint']['tx bytes']
testTCPARX = get_info['testTCPA']['endpoint']['rx bytes']
testTCPBTX = get_info['testTCPB']['endpoint']['tx bytes']
testTCPBRX = get_info['testTCPB']['endpoint']['rx bytes']
testUDPB = super().jsonGet("endp/testUDP-B?fields=tx+bytes,rx+bytes")
testUDPBTX = testUDPB['endpoint']['tx bytes']
testUDPBRX = testUDPB['endpoint']['rx bytes']
testUDPATX = get_info['testUDPA']['endpoint']['tx bytes']
testUDPARX = get_info['testUDPA']['endpoint']['rx bytes']
testUDPBTX = get_info['testUDPB']['endpoint']['tx bytes']
testUDPBRX = get_info['testUDPB']['endpoint']['rx bytes']
l4Test = super().jsonGet("layer4/l4Test?fields=bytes-rd")
l4TestBR = l4Test['endpoint']['bytes-rd']
l4TestBR = get_info['l4Test']['endpoint']['bytes-rd']
genTest1LR = get_info['genTest1']['endpoint']['last results']
genTest1 = super().jsonGet("generic/genTest1?fields=last+results")
genTest1LR = genTest1['endpoint']['last results']
wlan0 = super().jsonGet("wl_ep/wlan0")
wlan0TXB = wlan0['endpoint']['tx bytes']
wlan0RXP = wlan0['endpoint']['rx pkts']
wlan1 = super().jsonGet("wl_ep/wlan1")
wlan1TXB = wlan1['endpoint']['tx bytes']
wlan1RXP = wlan1['endpoint']['rx pkts']
wlan0TXB = get_info['wlan0']['endpoint']['tx bytes']
wlan0RXP = get_info['wlan0']['endpoint']['rx pkts']
wlan1TXB = get_info['wlan1']['endpoint']['tx bytes']
wlan1RXP = get_info['wlan1']['endpoint']['rx pkts']
except Exception as e:
print("Something went wrong")
@@ -306,6 +308,7 @@ class ConnectTest(LFCliBase):
"wlan0", "wlan1"]
removeCX(mgrURL, cxNames)
removeEndps(mgrURL, endpNames)
traceback.print_stack()
sys.exit(1)
# start cx traffic
@@ -320,51 +323,51 @@ class ConnectTest(LFCliBase):
# show tx and rx bytes for ports
os.system("echo eth1 >> /home/lanforge/Documents/connectTestLogs/connectTestLatest.log")
os.system("echo eth1 >> ~/Documents/connectTestLogs/connectTestLatest.log")
cmd = (
"./lf_portmod.pl --quiet 1 --manager localhost --port_name eth1 --show_port \"Txb,Rxb\" >> /home/lanforge/Documents/connectTestLogs/connectTestLatest.log")
"./lf_portmod.pl --quiet 1 --manager localhost --port_name eth1 --show_port \"Txb,Rxb\" >> ~/Documents/connectTestLogs/connectTestLatest.log")
execWrap(cmd)
os.system(f"echo {staName} >> /home/lanforge/Documents/connectTestLogs/connectTestLatest.log")
os.system(f"echo {staName} >> ~/Documents/connectTestLogs/connectTestLatest.log")
cmd = (
f"./lf_portmod.pl --quiet 1 --manager localhost --port_name {staName} --show_port \"Txb,Rxb\" >> /home/lanforge/Documents/connectTestLogs/connectTestLatest.log")
f"./lf_portmod.pl --quiet 1 --manager localhost --port_name {staName} --show_port \"Txb,Rxb\" >> ~/Documents/connectTestLogs/connectTestLatest.log")
execWrap(cmd)
# show tx and rx for endpoints PERL
os.system("echo TestTCP-A >> /home/lanforge/Documents/connectTestLogs/connectTestLatest.log")
os.system("echo TestTCP-A >> ~/Documents/connectTestLogs/connectTestLatest.log")
cmd = (
"./lf_firemod.pl --action show_endp --endp_name testTCP-A --endp_vals \"Tx Bytes,Rx Bytes\" >> /home/lanforge/Documents/connectTestLogs/connectTestLatest.log")
"./lf_firemod.pl --action show_endp --endp_name testTCP-A --endp_vals \"Tx Bytes,Rx Bytes\" >> ~/Documents/connectTestLogs/connectTestLatest.log")
execWrap(cmd)
os.system("echo TestTCP-B >> /home/lanforge/Documents/connectTestLogs/connectTestLatest.log")
os.system("echo TestTCP-B >> ~/Documents/connectTestLogs/connectTestLatest.log")
cmd = (
"./lf_firemod.pl --action show_endp --endp_name testTCP-B --endp_vals \"Tx Bytes,Rx Bytes\" >> /home/lanforge/Documents/connectTestLogs/connectTestLatest.log")
"./lf_firemod.pl --action show_endp --endp_name testTCP-B --endp_vals \"Tx Bytes,Rx Bytes\" >> ~/Documents/connectTestLogs/connectTestLatest.log")
execWrap(cmd)
os.system("echo TestUDP-A >> /home/lanforge/Documents/connectTestLogs/connectTestLatest.log")
os.system("echo TestUDP-A >> ~/Documents/connectTestLogs/connectTestLatest.log")
cmd = (
"./lf_firemod.pl --action show_endp --endp_name testUDP-A --endp_vals \"Tx Bytes,Rx Bytes\" >> /home/lanforge/Documents/connectTestLogs/connectTestLatest.log")
"./lf_firemod.pl --action show_endp --endp_name testUDP-A --endp_vals \"Tx Bytes,Rx Bytes\" >> ~/Documents/connectTestLogs/connectTestLatest.log")
execWrap(cmd)
os.system("echo TestUDP-B >> /home/lanforge/Documents/connectTestLogs/connectTestLatest.log")
os.system("echo TestUDP-B >> ~/Documents/connectTestLogs/connectTestLatest.log")
cmd = (
"./lf_firemod.pl --action show_endp --endp_name testUDP-B --endp_vals \"Tx Bytes,Rx Bytes\" >> /home/lanforge/Documents/connectTestLogs/connectTestLatest.log")
"./lf_firemod.pl --action show_endp --endp_name testUDP-B --endp_vals \"Tx Bytes,Rx Bytes\" >> ~/Documents/connectTestLogs/connectTestLatest.log")
execWrap(cmd)
os.system("echo l4Test >> /home/lanforge/Documents/connectTestLogs/connectTestLatest.log")
os.system("echo l4Test >> ~/Documents/connectTestLogs/connectTestLatest.log")
cmd = (
"./lf_firemod.pl --action show_endp --endp_name l4Test --endp_vals Bytes-Read-Total >> /home/lanforge/Documents/connectTestLogs/connectTestLatest.log")
"./lf_firemod.pl --action show_endp --endp_name l4Test --endp_vals Bytes-Read-Total >> ~/Documents/connectTestLogs/connectTestLatest.log")
execWrap(cmd)
os.system("echo fioTest >> /home/lanforge/Documents/connectTestLogs/connectTestLatest.log")
os.system("echo fioTest >> ~/Documents/connectTestLogs/connectTestLatest.log")
cmd = (
"./lf_firemod.pl --action show_endp --endp_name fioTest --endp_vals \"Bytes Written,Bytes Read\" >> /home/lanforge/Documents/connectTestLogs/connectTestLatest.log")
"./lf_firemod.pl --action show_endp --endp_name fioTest --endp_vals \"Bytes Written,Bytes Read\" >> ~/Documents/connectTestLogs/connectTestLatest.log")
execWrap(cmd)
os.system("echo genTest1 >> /home/lanforge/Documents/connectTestLogs/connectTestLatest.log")
os.system("echo genTest1 >> ~/Documents/connectTestLogs/connectTestLatest.log")
cmd = (
"./lf_firemod.pl --action show_endp --endp_name genTest1 >> /home/lanforge/Documents/connectTestLogs/connectTestLatest.log")
"./lf_firemod.pl --action show_endp --endp_name genTest1 >> ~/Documents/connectTestLogs/connectTestLatest.log")
execWrap(cmd)
os.system("echo wlan0 >> /home/lanforge/Documents/connectTestLogs/connectTestLatest.log")
os.system("echo wlan0 >> ~/Documents/connectTestLogs/connectTestLatest.log")
cmd = (
"./lf_firemod.pl --action show_endp --endp_name wlan0 --endp_vals \"Rx Pkts,Tx Bytes,Cur-Backlog,Dump File,Tx3s\" >> /home/lanforge/Documents/connectTestLogs/connectTestLatest.log")
"./lf_firemod.pl --action show_endp --endp_name wlan0 --endp_vals \"Rx Pkts,Tx Bytes,Cur-Backlog,Dump File,Tx3s\" >> ~/Documents/connectTestLogs/connectTestLatest.log")
execWrap(cmd)
os.system("echo wlan1 >> /home/lanforge/Documents/connectTestLogs/connectTestLatest.log")
os.system("echo wlan1 >> ~/Documents/connectTestLogs/connectTestLatest.log")
cmd = (
"./lf_firemod.pl --action show_endp --endp_name wlan1 --endp_vals \"Rx Pkts,Tx Bytes,Cur-Backlog,Dump File,Tx3s\" >> /home/lanforge/Documents/connectTestLogs/connectTestLatest.log")
"./lf_firemod.pl --action show_endp --endp_name wlan1 --endp_vals \"Rx Pkts,Tx Bytes,Cur-Backlog,Dump File,Tx3s\" >> ~/Documents/connectTestLogs/connectTestLatest.log")
execWrap(cmd)
# stop cx traffic
@@ -379,32 +382,33 @@ class ConnectTest(LFCliBase):
# get data for endpoints JSON
print("Collecting Data")
try:
ptestTCPA = super().jsonGet("endp/testTCP-A?fields=tx+bytes,rx+bytes")
ptestTCPA = super().json_get("endp/testTCP-A?fields=tx+bytes,rx+bytes")
ptestTCPATX = ptestTCPA['endpoint']['tx bytes']
ptestTCPARX = ptestTCPA['endpoint']['rx bytes']
ptestTCPB = super().jsonGet("endp/testTCP-B?fields=tx+bytes,rx+bytes")
ptestTCPB = super().json_get("endp/testTCP-B?fields=tx+bytes,rx+bytes")
ptestTCPBTX = ptestTCPB['endpoint']['tx bytes']
ptestTCPBRX = ptestTCPB['endpoint']['rx bytes']
ptestUDPA = super().jsonGet("endp/testUDP-A?fields=tx+bytes,rx+bytes")
ptestUDPA = super().json_get("endp/testUDP-A?fields=tx+bytes,rx+bytes")
ptestUDPATX = ptestUDPA['endpoint']['tx bytes']
ptestUDPARX = ptestUDPA['endpoint']['rx bytes']
ptestUDPB = super().jsonGet("endp/testUDP-B?fields=tx+bytes,rx+bytes")
ptestUDPB = super().json_get("endp/testUDP-B?fields=tx+bytes,rx+bytes")
ptestUDPBTX = ptestUDPB['endpoint']['tx bytes']
ptestUDPBRX = ptestUDPB['endpoint']['rx bytes']
pl4Test = super().jsonGet("layer4/l4Test?fields=bytes-rd")
pl4Test = super().json_get("layer4/l4Test?fields=bytes-rd")
pl4TestBR = pl4Test['endpoint']['bytes-rd']
pgenTest1 = super().jsonGet("generic/genTest1?fields=last+results")
pgenTest1 = super().json_get("generic/genTest1?fields=last+results")
pgenTest1LR = pgenTest1['endpoint']['last results']
pwlan0 = super().jsonGet("wl_ep/wlan0")
pwlan0 = super().json_get("wl_ep/wlan0")
pwlan0TXB = pwlan0['endpoint']['tx bytes']
pwlan0RXP = pwlan0['endpoint']['rx pkts']
pwlan1 = super().jsonGet("wl_ep/wlan1")
pwlan1 = super().json_get("wl_ep/wlan1")
pwlan1TXB = pwlan1['endpoint']['tx bytes']
pwlan1RXP = pwlan1['endpoint']['rx pkts']
except Exception as e:
@@ -417,7 +421,7 @@ class ConnectTest(LFCliBase):
"resource": 1,
"port": staName
}
super().jsonPost(reqURL, data)
super().json_post(reqURL, data)
endpNames = ["testTCP-A", "testTCP-B",
"testUDP-A", "testUDP-B",

View File

@@ -2,34 +2,32 @@
# Define useful common methods -
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
import sys
import os
if sys.version_info[0] != 3:
print("This script requires Python 3")
exit()
import os
import pprint
import json
import time
from time import sleep
from random import seed
seed( int(round(time.time() * 1000)))
seed(int(round(time.time() * 1000)))
from random import randint
from LANforge import LFRequest
debug_printer = pprint.PrettyPrinter(indent=2)
#global base_url # = "http://localhost:8080"
NA = "NA" # used to indicate parameter to skip
NA = "NA" # used to indicate parameter to skip
ADD_STA_FLAGS_DOWN_WPA2 = 68719477760
REPORT_TIMER_MS_FAST = 1500
REPORT_TIMER_MS_SLOW = 3000
class PortEID:
shelf = 1
resource = 1
port_id = 0
port_name = ""
shelf = 1
resource = 1
port_id = 0
port_name = ""
def __init__(self, p_resource=1, p_port_id=0, p_port_name=""):
resource = p_resource
@@ -47,9 +45,12 @@ class PortEID:
resource = json_s['resource']
port_id = json_s['id']
port_name = json_s['name']
# end class PortEID
def staNewDownStaRequest(sta_name, resource_id=1, radio="wiphy0", flags=ADD_STA_FLAGS_DOWN_WPA2, ssid="", passphrase="", debug_on=False):
def staNewDownStaRequest(sta_name, resource_id=1, radio="wiphy0", ssid="", passphrase="",
debug_on=False):
"""
For use with add_sta. If you don't want to generate mac addresses via patterns (xx:xx:xx:xx:81:*)
you can generate octets using random_hex.pop(0)[2:] and gen_mac(parent_radio_mac, octet)
@@ -59,14 +60,14 @@ def staNewDownStaRequest(sta_name, resource_id=1, radio="wiphy0", flags=ADD_STA_
:type sta_name: str
"""
data = {
"shelf":1,
"shelf": 1,
"resource": resource_id,
"radio": radio,
"sta_name": sta_name,
"flags": ADD_STA_FLAGS_DOWN_WPA2, # note flags for add_sta do not match set_port
"ssid": ssid,
"key": passphrase,
"mac": "xx:xx:xx:xx:*:xx", # "NA", #gen_mac(parent_radio_mac, random_hex.pop(0))
"mac": "xx:xx:xx:xx:*:xx", # "NA", #gen_mac(parent_radio_mac, random_hex.pop(0))
"mode": 0,
"rate": "DEFAULT"
}
@@ -87,8 +88,8 @@ def portSetDhcpDownRequest(resource_id, port_name, debug_on=False):
"shelf": 1,
"resource": resource_id,
"port": port_name,
"current_flags": 2147483649, # 0x1 = interface down + 2147483648 use DHCP values
"interest": 75513858, # includes use_current_flags + dhcp + dhcp_rls + ifdown
"current_flags": 2147483649, # 0x1 = interface down + 2147483648 use DHCP values
"interest": 75513858, # includes use_current_flags + dhcp + dhcp_rls + ifdown
"report_timer": REPORT_TIMER_MS_FAST
}
if (debug_on):
@@ -108,8 +109,8 @@ def portDhcpUpRequest(resource_id, port_name, debug_on=False):
"shelf": 1,
"resource": resource_id,
"port": port_name,
"current_flags": 2147483648, # vs 0x1 = interface down + use_dhcp
"interest": 75513858, # includes use_current_flags + dhcp + dhcp_rls + ifdown
"current_flags": 2147483648, # vs 0x1 = interface down + use_dhcp
"interest": 75513858, # includes use_current_flags + dhcp + dhcp_rls + ifdown
"report_timer": REPORT_TIMER_MS_FAST,
}
if (debug_on):
@@ -128,8 +129,8 @@ def portUpRequest(resource_id, port_name, debug_on=False):
"shelf": 1,
"resource": resource_id,
"port": port_name,
"current_flags": 0, # vs 0x1 = interface down
"interest": 8388610, # includes use_current_flags + dhcp + dhcp_rls + ifdown
"current_flags": 0, # vs 0x1 = interface down
"interest": 8388610, # includes use_current_flags + dhcp + dhcp_rls + ifdown
"report_timer": REPORT_TIMER_MS_FAST,
}
print("portUpRequest")
@@ -137,6 +138,7 @@ def portUpRequest(resource_id, port_name, debug_on=False):
debug_printer.pprint(data)
return data
def portDownRequest(resource_id, port_name, debug_on=False):
"""
Does not change the use_dhcp flag
@@ -150,8 +152,8 @@ def portDownRequest(resource_id, port_name, debug_on=False):
"shelf": 1,
"resource": resource_id,
"port": port_name,
"current_flags": 1, # vs 0x0 = interface up
"interest": 8388610, # = current_flags + ifdown
"current_flags": 1, # vs 0x0 = interface up
"interest": 8388610, # = current_flags + ifdown
"report_timer": REPORT_TIMER_MS_FAST,
}
if (debug_on):
@@ -159,8 +161,9 @@ def portDownRequest(resource_id, port_name, debug_on=False):
return data
def generateMac(parent_mac, random_octet):
print("************ random_octet: %s **************"%(random_octet))
def generateMac(parent_mac, random_octet, debug=False):
if debug:
print("************ random_octet: %s **************" % (random_octet))
my_oct = random_octet
if (len(random_octet) == 4):
my_oct = random_octet[2:]
@@ -168,13 +171,14 @@ def generateMac(parent_mac, random_octet):
octets[4] = my_oct
return ":".join(octets)
# this produces a named series similar to "sta000, sta001, sta002...sta0(end_id)"
# the padding_number is added to the start and end numbers and the resulting sum
# has the first digit trimmed, so f(0, 1, 10000) => {0000, 0001}
def portNameSeries(prefix="sta", start_id=0, end_id=1, padding_number=10000):
name_list = []
for i in range((padding_number+start_id), (padding_number+end_id+1)):
sta_name = prefix+str(i)[1:]
for i in range((padding_number + start_id), (padding_number + end_id + 1)):
sta_name = prefix + str(i)[1:]
name_list.append(sta_name)
return name_list
@@ -188,71 +192,82 @@ def generateRandomHex():
random_hex.append(hex(random_dec))
return random_hex
# return reverse map of aliases to port records
def portAliasesInList(json_list):
if len(json_list) < 1:
raise Exception("empty list")
#
# expect nested records, which is an artifact of some ORM
# that other customers expect:
# [
# {
# "1.1.eth0": {
# "alias":"eth0"
# }
# },
# { ... }
def portListToAliasMap(json_list):
reverse_map = {}
if len(json_list) < 1:
return reverse_map
json_interfaces = json_list
if 'interfaces' in json_list:
json_interfaces = json_list['interfaces']
# expect nested records, which is an artifact of some ORM
# that other customers expect:
# [
# {
# "1.1.eth0": {
# "alias":"eth0"
# }
# },
# { ... }
for record in json_interfaces:
if len(record.keys()) < 1:
continue
record_keys = record.keys()
#debug_printer.pprint(record_keys)
k2 = ""
# we expect one key in record keys, but we can't expect [0] to be populated
json_entry = None
for k in record_keys:
k2 = k
if k2.__contains__("Unknown"):
#debug_printer.pprint("skipping: "+k2)
json_entry = record[k]
# skip uninitialized port records
if k2.find("Unknown") >= 0:
continue
port_json = record[k2]
reverse_map[port_json['alias']] = record
#print("resulting map: ")
#debug_printer.pprint(reverse_map)
reverse_map[k2] = json_entry
return reverse_map
def findPortEids(resource_id=1, base_url="http://localhost:8080", port_names=()):
def findPortEids(resource_id=1, base_url="http://localhost:8080", port_names=(), debug=False):
port_eids = []
if len(port_names) < 0:
return []
port_url = "/port/1"
if base_url.endswith('/'):
port_url = port_url[1:]
for port_name in port_names:
url = "/port/1/%s/%s"%(resource_id,port_name)
url = f"{port_url}/{resource_id}/{port_name}"
lf_r = LFRequest.LFRequest(url)
try:
response = lf_r.getAsJson()
response = lf_r.getAsJson(debug)
if response == None:
continue
port_eids.append(PortEID(response))
except:
print("Not found: "+port_name)
return None
print("Not found: " + port_name)
return port_eids
def waitUntilPortsAdminDown(resource_id=1, base_url="http://localhost:8080", port_list=()):
print("waitUntilPortsAdminDown")
up_stations = port_list.copy()
sleep(1)
port_url = "/port/1"
if base_url.endswith('/'):
port_url = port_url[1:]
while len(up_stations) > 0:
up_stations = []
for port_name in port_list:
url = base_url+"/port/1/%s/%s?fields=device,down" % (resource_id, port_name)
url = f"{base_url}{port_url}/{resource_id}/{port_name}?fields=device,down"
lf_r = LFRequest.LFRequest(url)
json_response = lf_r.getAsJson(show_error=False)
if json_response == None:
print("port %s disappeared"%port_name)
print(f"port {port_name} disappeared")
continue
if "interface" in json_response:
json_response = json_response['interface']
@@ -261,18 +276,23 @@ def waitUntilPortsAdminDown(resource_id=1, base_url="http://localhost:8080", por
sleep(1)
return None
def waitUntilPortsAdminUp(resource_id=1, base_url="http://localhost:8080", port_list=()):
print("waitUntilPortsAdminUp")
down_stations = port_list.copy()
sleep(1)
port_url = "/port/1"
if base_url.endswith('/'):
port_url = port_url[1:]
#url = /%s/%s?fields=device,down" % (resource_id, port_name)
while len(down_stations) > 0:
down_stations = []
for port_name in port_list:
url = base_url+"/port/1/%s/%s?fields=device,down" % (resource_id, port_name)
url = f"{base_url}{port_url}/{resource_id}/{port_name}?fields=device,down"
lf_r = LFRequest.LFRequest(url)
json_response = lf_r.getAsJson(show_error=False)
if json_response == None:
print("port %s disappeared"%port_name)
print("port %s disappeared" % port_name)
continue
if "interface" in json_response:
json_response = json_response['interface']
@@ -282,78 +302,103 @@ def waitUntilPortsAdminUp(resource_id=1, base_url="http://localhost:8080", port_
return None
def waitUntilPortsDisappear(resource_id=1, base_url="http://localhost:8080", port_list=()):
print("waitUntilPortsDisappear")
def waitUntilPortsDisappear(resource_id=1, base_url="http://localhost:8080", port_list=(), debug=False):
if (debug):
print("waitUntilPortsDisappear")
url = "/port/1"
if base_url.endswith('/'):
url = url[1:]
found_stations = port_list.copy()
sleep(1)
while len(found_stations) > 0:
found_stations = []
sleep(1)
for port_name in port_list:
sleep(1)
url = base_url+"/port/1/%s/%s" % (resource_id, port_name)
lf_r = LFRequest.LFRequest(url)
json_response = lf_r.getAsJson(show_error=False)
check_url = f"{base_url}{url}/{resource_id}/{port_name}"
if debug:
print("checking:"+check_url)
lf_r = LFRequest.LFRequest(check_url)
json_response = lf_r.getAsJson(show_error=debug)
if (json_response != None):
found_stations.append(port_name)
return None
return
def waitUntilPortsAppear(resource_id=1, base_url="http://localhost:8080", port_list=()):
print("waitUntilPortsAppear")
def waitUntilPortsAppear(resource_id=1, base_url="http://localhost:8080", port_list=(), debug=False):
if debug:
print("waitUntilPortsAppear")
found_stations = []
sleep(2)
port_url = "/port/1"
ncshow_url = "/cli-form/nc_show_ports"
if base_url.endswith('/'):
port_url = port_url[1:]
ncshow_url = ncshow_url[1:]
while len(found_stations) < len(port_list):
found_stations = []
for port_name in port_list:
sleep(1)
url = base_url+"/port/1/%s/%s" % (resource_id, port_name)
url = f"{base_url}{port_url}/{resource_id}/{port_name}"
lf_r = LFRequest.LFRequest(url)
json_response = lf_r.getAsJson(show_error=False)
if (json_response != None):
found_stations.append(port_name)
else:
lf_r = LFRequest.LFRequest(base_url+"/cli-form/nc_show_ports")
lf_r.addPostData({"shelf":1, "resource":resource_id, "port":port_name, "flags":1})
lf_r.formPost()
lf_r = LFRequest.LFRequest(f"{base_url}{ncshow_url}")
lf_r.addPostData({"shelf": 1, "resource": resource_id, "port": port_name, "flags": 1})
lf_r.formPost()
sleep(2)
print("These stations appeared: "+", ".join(found_stations))
return None
if debug:
print("These stations appeared: " + ", ".join(found_stations))
return
def removePort(resource, port_name, baseurl="http://localhost:8080/"):
lf_r = LFRequest.LFRequest(baseurl+"cli-json/rm_vlan")
def removePort(resource, port_name, baseurl="http://localhost:8080/", debug=False):
if debug:
print("removePort")
url = "/cli-json/rm_vlan"
if baseurl.endswith('/'):
url = url[1:]
lf_r = LFRequest.LFRequest(baseurl + url)
lf_r.addPostData({
"shelf": 1,
"resource": resource,
"port": port_name
})
lf_r.jsonPost(False)
lf_r.jsonPost(debug)
def removeCX(mgrURL, cxNames):
for name in cxNames:
#print(f"Removing CX {name}")
data = {
"test_mgr":"all",
"cx_name":name
}
lf_r = LFRequest.LFRequest(mgrURL+"cli-json/rm_cx")
lf_r.addPostData(data)
lf_r.jsonPost()
def removeEndps(mgrURL, endpNames):
for name in endpNames:
#print(f"Removing endp {name}")
data = {
"endp_name":name
}
lf_r = LFRequest.LFRequest(mgrURL+"cli-json/rm_endp")
lf_r.addPostData(data)
lf_r.jsonPost()
def removeCX(baseurl, cxNames, debug=False):
url = "/cli-json/rm_cx"
if baseurl.endswith('/'):
url = url[1:]
for name in cxNames:
data = {
"test_mgr": "all",
"cx_name": name
}
lf_r = LFRequest.LFRequest(baseurl + url)
lf_r.addPostData(data)
lf_r.jsonPost(debug)
def removeEndps(baseurl, endpNames, debug=False):
url = "/cli-json/rm_endp"
if baseurl.endswith('/'):
url = url[1:]
lf_r = LFRequest.LFRequest(baseurl + url)
for name in endpNames:
data = {
"endp_name": name
}
lf_r.addPostData(data)
lf_r.jsonPost(debug)
def execWrap(cmd):
if os.system(cmd) != 0:
print("\nError with " + cmd + ",bye\n")
exit(1)
if os.system(cmd) != 0:
print("\nError with '" + cmd + "', bye\n")
exit(1)
###

View File

@@ -0,0 +1,53 @@
# Flags for the add_sta command
add_sta_flags = {
"wpa_enable" : 0x10, # Enable WPA
"custom_conf" : 0x20, # Use Custom wpa_supplicant config file.
"wep_enable" : 0x200, # Use wpa_supplicant configured for WEP encryption.
"wpa2_enable" : 0x400, # Use wpa_supplicant configured for WPA2 encryption.
"ht40_disable" : 0x800, # Disable HT-40 even if hardware and AP support it.
"scan_ssid" : 0x1000, # Enable SCAN-SSID flag in wpa_supplicant.
"passive_scan" : 0x2000, # Use passive scanning (don't send probe requests).
"disable_sgi" : 0x4000, # Disable SGI (Short Guard Interval).
"lf_sta_migrate" : 0x8000, # OK-To-Migrate (Allow station migration between LANforge radios)
"verbose" : 0x10000, # Verbose-Debug: Increase debug info in wpa-supplicant and hostapd logs.
"80211u_enable" : 0x20000, # Enable 802.11u (Interworking) feature.
"80211u_auto" : 0x40000, # Enable 802.11u (Interworking) Auto-internetworking feature. Always enabled currently.
"80211u_gw" : 0x80000, # AP Provides access to internet (802.11u Interworking)
"80211u_additional" : 0x100000, # AP requires additional step for access (802.11u Interworking)
"80211u_e911" : 0x200000, # AP claims emergency services reachable (802.11u Interworking)
"80211u_e911_unauth" : 0x400000, # AP provides Unauthenticated emergency services (802.11u Interworking)
"hs20_enable" : 0x800000, # Enable Hotspot 2.0 (HS20) feature. Requires WPA-2.
"disable_gdaf" : 0x1000000, # AP: Disable DGAF (used by HotSpot 2.0).
"8021x_radius" : 0x2000000, # Use 802.1x (RADIUS for AP).
"80211r_pmska_cache" : 0x4000000, # Enable oportunistic PMSKA caching for WPA2 (Related to 802.11r).
"disable_ht80" : 0x8000000, # Disable HT80 (for AC chipset NICs only)
"ibss_mode" : 0x20000000, # Station should be in IBSS mode.
"osen_enable" : 0x40000000, # Enable OSEN protocol (OSU Server-only Authentication)
"disable_roam" : 0x80000000, # Disable automatic station roaming based on scan results.
"ht160_enable" : 0x100000000, # Enable HT160 mode.
"disable_fast_reauth" : 0x200000000, # Disable fast_reauth option for virtual stations.
"mesh_mode" : 0x400000000, # Station should be in MESH mode.
"power_save_enable" : 0x800000000, # Station should enable power-save. May not work in all drivers/configurations.
"create_admin_down" : 0x1000000000, # Station should be created admin-down.
"wds-mode" : 0x2000000000, # WDS station (sort of like a lame mesh), not supported on ath10k
"no-supp-op-class-ie" : 0x4000000000, # Do not include supported-oper-class-IE in assoc requests. May work around AP bugs.
"txo-enable" : 0x8000000000, # Enable/disable tx-offloads, typically managed by set_wifi_txo command
"use-wpa3" : 0x10000000000, # Enable WPA-3 (SAE Personal) mode.
}
add_sta_modes = {
"AUTO" : 0, # 802.11g
"802.11a" : 1, # 802.11a
"b" : 2, # 802.11b
"g" : 3, # 802.11g
"abg" : 4, # 802.11abg
"abgn" : 5, # 802.11abgn
"bgn" : 6, # 802.11bgn
"bg" : 7, # 802.11bg
"abgnAC" : 8, # 802.11abgn-AC
"anAC" : 9, # 802.11an-AC
"an" : 10, # 802.11an
"bgnAC" : 11, # 802.11bgn-AC
"abgnAX" : 12, # 802.11abgn-AX, a/b/g/n/AC/AX (dual-band AX) support
"bgnAX" : 13, # 802.11bgn-AX
"anAX" : 14, # 802.11an-AX
}

View File

@@ -1,49 +1,85 @@
#!env /usr/bin/python
# Extend this class to use common set of debug and request features for your script
import pprint
from pprint import pprint
import LANforge.LFUtils
from LANforge.LFUtils import *
import traceback
from traceback import extract_stack
class LFCliBase:
# do not use `super(LFCLiBase,self).__init__(self, host, port, _debugOn)
# that is py2 era syntax and will force self into the host variable, making you
# very confused.
def __init__(self, _lfjson_host, _lfjson_port, _debug=False):
def __init__(self, _lfjson_host, _lfjson_port, _debug=False, _halt_on_error=False):
self.lfjson_host = _lfjson_host
self.lfjson_port = _lfjson_port
self.debugOn = _debug
self.haltOnError = _halt_on_error
self.mgr_url = "http://%s:%s/" % (self.lfjson_host, self.lfjson_port)
def jsonPost(self, _req_url, _data):
lf_r = LFRequest.LFRequest(self.mgr_url + _req_url)
_data['suppress_preexec_cli'] = True
_data['suppress_preexec_method'] = True
lf_r.addPostData(_data)
if (self.debugOn):
LANforge.LFUtils.debug_printer.pprint(_data)
json_response = lf_r.jsonPost(self.debugOn)
# Debugging
# if (json_response != None):
# print("jsonReq: response: ")
# LFUtils.debug_printer.pprint(vars(json_response))
def json_post(self, _req_url, _data):
json_response = None
if self.mgr_url.endswith('/') and _req_url.startswith('/'):
_req_url = _req_url[1:]
try:
lf_r = LFRequest.LFRequest(self.mgr_url + _req_url)
_data['suppress_preexec_cli'] = True
_data['suppress_preexec_method'] = True
lf_r.addPostData(_data)
if (self.debugOn):
LANforge.LFUtils.debug_printer.pprint(_data)
json_response = lf_r.jsonPost(self.debugOn)
except Exception as x:
if self.debugOn or self.haltOnError:
print(f"jsonPost posted to {_req_url}")
pprint(_data)
print(f"Exception {x}:")
traceback.print_exception(Exception, x, x.__traceback__, chain=True)
if self.haltOnError:
exit(1)
return json_response
def jsonGet(self, _req_url):
lf_r = LFRequest.LFRequest(self.mgr_url + _req_url)
json_response = lf_r.getAsJson(self.debugOn)
def json_get(self, _req_url):
json_response = None
if self.mgr_url.endswith('/') and _req_url.startswith('/'):
_req_url = _req_url[1:]
try:
lf_r = LFRequest.LFRequest(self.mgr_url + _req_url)
json_response = lf_r.getAsJson(self.debugOn)
if (json_response is None) and self.debugOn:
raise ValueError(json_response)
except ValueError as ve:
if self.debugOn or self.haltOnError:
print("jsonGet asked for "+_req_url)
print(f"Exception {ve}:")
traceback.print_exception(ValueError, ve, ve.__traceback__, chain=True)
if self.haltOnError:
sys.exit(1)
return json_response
def checkConnect(self):
def error(self, exception):
#print(f"lfcli_base error: {exception}")
pprint.pprint(exception)
traceback.print_exception(Exception, exception, exception.__traceback__, chain=True)
if self.haltOnError:
print("halting on error")
sys.exit(1)
#else:
# print("continuing...")
def check_connect(self):
print(f"Checking for LANforge GUI connection: {self.mgr_url}")
response = self.jsonGet("/")
response = self.json_get("/")
duration = 0
while (response is None) and (duration < 300):
print(f"LANforge GUI connection not found sleeping 5 seconds, tried: {self.mgr_url}")
duration += 2
time.sleep(2)
response = self.jsonGet("")
response = self.json_get("")
if duration >= 300:
print("Could not connect to LANforge GUI")

View File

@@ -0,0 +1,100 @@
set_port_current_flags = {
"if_down": 0x1, # Interface Down
"fixed_10bt_hd": 0x2, # Fixed-10bt-HD (half duplex)
"fixed_10bt_fd": 0x4, # Fixed-10bt-FD
"fixed_100bt_hd": 0x8, # Fixed-100bt-HD
"fixed_100bt_fd": 0x10, # Fixed-100bt-FD
"auto_neg": 0x100, # auto-negotiate
"adv_10bt_hd": 0x100000, # advert-10bt-HD
"adv_10bt_fd": 0x200000, # advert-10bt-FD
"adv_100bt_hd": 0x400000, # advert-100bt-HD
"adv_100bt_fd": 0x800000, # advert-100bt-FD
"adv_flow_ctl": 0x8000000, # advert-flow-control
"promisc": 0x10000000, # PROMISC
"use_dhcp": 0x80000000, # USE-DHCP
"adv_10g_hd": 0x400000000, # advert-10G-HD
"adv_10g_fd": 0x800000000, # advert-10G-FD
"tso_enabled": 0x1000000000, # TSO-Enabled
"lro_enabled": 0x2000000000, # LRO-Enabled
"gro_enabled": 0x4000000000, # GRO-Enabled
"ufo_enabled": 0x8000000000, # UFO-Enabled
"gso_enabled": 0x10000000000, # GSO-Enabled
"use_dhcpv6": 0x20000000000, # USE-DHCPv6
"rxfcs": 0x40000000000, # RXFCS
"no_dhcp_rel": 0x80000000000, # No-DHCP-Release
"staged_ifup": 0x100000000000, # Staged-IFUP
"http_enabled": 0x200000000000, # Enable HTTP (nginx) service for this port.
"ftp_enabled": 0x400000000000, # Enable FTP (vsftpd) service for this port.
"aux_mgt": 0x800000000000, # Enable Auxillary-Management flag for this port.
"no_dhcp_restart": 0x1000000000000, # Disable restart of DHCP on link connect (ie, wifi).
# This should usually be enabled when testing wifi
# roaming so that the wifi station can roam
# without having to re-acquire a DHCP lease each
# time it roams.
"ignore_dhcp": 0x2000000000000, # Don't set DHCP acquired IP on interface,
# instead print CLI text message. May be useful
# in certain wifi-bridging scenarios where external
# traffic-generator cannot directly support DHCP.
"no_ifup_post": 0x4000000000000, # Skip ifup-post script if we can detect that we
# have roamed. Roaming is considered true if
# the IPv4 address has not changed.
"radius_enabled": 0x20000000000000, # Enable RADIUS service (using hostapd as radius server)
"ipsec_client": 0x40000000000000, # Enable client IPSEC xfrm on this port.
"ipsec_concentrator": 0x80000000000000, # Enable concentrator (upstream) IPSEC xfrm on this port.
"service_dns": 0x100000000000000, # Enable DNS (dnsmasq) service on this port.
}
set_port_cmd_flags = {
"reset_transceiver": 0x1, # Reset transciever
"restart_link_neg": 0x2, # Restart link negotiation
"force_MII_probe": 0x4, # Force MII probe
"no_hw_probe": 0x8, # Don't probe hardware
"probe_wifi": 0x10, # Probe WIFI
"new_gw_probe": 0x20, # Force new GW probe
"new_gw_probe_dev": 0x40, # Force new GW probe for ONLY this interface
"from_user": 0x80, # from_user (Required to change Mgt Port config
# (IP, DHCP, etc)
"skip_port_bounce": 0x100, # skip-port-bounce (Don't ifdown/up
# interface if possible.)
"from_dhcp": 0x200, # Settings come from DHCP client.
"abort_if_scripts": 0x400, # Forceably abort all ifup/down scripts on this Port.
"use_pre_ifdown": 0x800, # Call pre-ifdown script before bringing interface down.
}
set_port_interest_flags = {
"command_flags" : 0x1, # apply command flags
"current_flags" : 0x2, # apply current flags
"ip_address" : 0x4, # IP address
"ip_Mask" : 0x8, # IP mask
"ip_gateway" : 0x10, # IP gateway
"mac_address" : 0x20, # MAC address
"supported_flags" : 0x40, # apply supported flags
"link_speed" : 0x80, # Link speed
"mtu" : 0x100, # MTU
"tx_queue_length" : 0x200, # TX Queue Length
"promisc_mode" : 0x400, # PROMISC mode
"interal_use_1" : 0x800, # (INTERNAL USE)
"alias" : 0x1000, # Port alias
"rx_all" : 0x2000, # Rx-ALL
"dhcp" : 0x4000, # including client-id.
"rpt_timer" : 0x8000, # Report Timer
"bridge" : 0x10000, # BRIDGE
"ipv6_addrs" : 0x20000, # IPv6 Address
"bypass" : 0x40000, # Bypass
"gen_offload" : 0x80000, # Generic offload flags, everything but LRO
"cpu_mask" : 0x100000, # CPU Mask, useful for pinning process to CPU core
"lro_offload" : 0x200000, # LRO (Must be disabled when used in Wanlink,
# and probably in routers)
"sta_br_id" : 0x400000, # WiFi Bridge identifier. 0 means no bridging.
"ifdown" : 0x800000, # Down interface
"dhcpv6" : 0x1000000, # Use DHCPv6
"rxfcs" : 0x2000000, # RXFCS
"dhcp_rls" : 0x4000000, # DHCP release
"svc_httpd" : 0x8000000, # Enable/disable HTTP Service for a port
"svc_ftpd" : 0x10000000, # Enable/disable FTP Service for a port
"aux_mgt" : 0x20000000, # Enable/disable Auxillary-Management for a port
"no_dhcp_conn" : 0x40000000, # Enable/disable NO-DHCP-ON-CONNECT flag for a port
"no_apply_dhcp" : 0x80000000, # Enable/disable NO-APPLY-DHCP flag for a port
"skip_ifup_roam" : 0x100000000, # Enable/disable SKIP-IFUP-ON-ROAM flag for a port
}

View File

@@ -1,55 +0,0 @@
#!/usr/bin/env python3
import os
import time
import sys
import json
import pprint
from LANforge import LFRequest
from LANforge import LFUtils
if sys.version_info[0] != 3:
print("This script requires Python 3")
exit()
def jsonReq(mgrURL,reqURL,postData,debug=False):
lf_r = LFRequest.LFRequest(mgrURL + reqURL)
lf_r.addPostData(postData)
if debug:
json_response = lf_r.jsonPost(True)
LFUtils.debug_printer.pprint(json_response)
sys.exit(1)
else:
lf_r.jsonPost()
def createGenEndp(alias, shelf, rsrc, port, type):
mgrURL = "http://localhost:8080/"
reqURL = "cli-json/add_gen_endp"
data = {
"alias":alias,
"shelf":shelf,
"resource":rsrc,
"port":port,
"type":type
}
jsonReq(mgrURL,reqURL,data)
def setFlags(endpName, flagName,val):
mgrURL = "http://localhost:8080/"
reqURL = "cli-json/set_endp_flag"
data = {
"name":endpName,
"flag":flagName,
"val":val
}
jsonReq(mgrURL,reqURL,data)
def setCmd(endpName,cmd):
mgrURL = "http://localhost:8080/"
reqURL = "cli-json/set_gen_cmd"
data = {
"name":endpName,
"command":cmd
}
jsonReq(mgrURL,reqURL,data)

View File

@@ -205,7 +205,7 @@ def main():
json_response = lf_r.getAsJson()
if json_response == None:
raise Exception("no reponse to: "+url)
port_map = LFUtils.portAliasesInList(json_response)
port_map = LFUtils.portListToAliasMap(json_response)
#LFUtils.debug_printer.pprint(port_map)
for sta_name in desired_stations:

39
py-json/generic_cx.py Executable file
View File

@@ -0,0 +1,39 @@
#!/usr/bin/env python3
import sys
if sys.version_info[0] != 3:
print("This script requires Python 3")
exit()
from LANforge.lfcli_base import LFCliBase
class GenericCx(LFCliBase):
def __init__(self, lfclient_host, lfclient_port):
super().__init__(lfclient_host, lfclient_port)
self.lfclient_host = lfclient_host
self.lfclient_port = lfclient_port
def createGenEndp(self, alias, shelf, rsrc, port, type):
data = {
"alias": alias,
"shelf": shelf,
"resource": rsrc,
"port": port,
"type": type
}
super().json_post("cli-json/add_gen_endp", data)
def setFlags(self, endpName, flagName, val):
data = {
"name": endpName,
"flag": flagName,
"val": val
}
super().json_post("cli-json/set_endp_flag", data)
def setCmd(self, endpName, cmd):
data = {
"name": endpName,
"command": cmd
}
super().json_post("cli-json/set_gen_cmd", data)

View File

@@ -1,40 +1,64 @@
#!/usr/bin/env python3
import re
import time
import pprint
from pprint import pprint
from LANforge import LFRequest
from LANforge import LFUtils
from LANforge import set_port
from LANforge import add_sta
from LANforge import lfcli_base
from LANforge.lfcli_base import LFCliBase
class Realm(LFCliBase):
def __init__(self, lfclient_host="localhost", lfclient_port=8080, debug=False):
super().__init__(lfclient_host, lfclient_port, debug, _halt_on_error=True)
self.lfclient_url = f"http://{lfclient_host}:{lfclient_port}"
super().check_connect()
class Realm:
def __init__(self, mgr_url="http://localhost:8080"):
self.mgrURL = mgr_url
# Returns json response from webpage of all layer 3 cross connects
def cx_list(self):
#Returns json response from webpage of all layer 3 cross connects
lf_r = LFRequest.LFRequest(self.mgrURL + "/cx")
response = lf_r.getAsJson(True)
response = super().json_get("/cx")
return response
# Returns map of all stations with port+type == WIFI-STATION
def station_map(self):
response = super().json_get("/port/list?fields=_links,alias,device,port+type")
if (response is None) or ("interfaces" not in response):
pprint(response)
print("station_list: incomplete response, halting")
exit(1)
sta_map = {}
temp_map = LFUtils.portListToAliasMap(response)
for k,v in temp_map.items():
if (v['port type'] == "WIFI-STA"):
sta_map[k] = v;
temp_map.clear()
del temp_map
del response
return sta_map
# Returns list of all stations with port+type == WIFI-STATION
def station_list(self):
#Returns list of all stations with "sta" in their name
sta_list = []
lf_r = LFRequest.LFRequest(self.mgrURL + "/port/list?fields=_links,alias,device,port+type")
response = lf_r.getAsJson(True)
response = super().json_get("/port/list?fields=_links,alias,device,port+type")
if (response is None) or ("interfaces" not in response):
print("station_list: incomplete response:")
pprint(response)
exit(1)
for x in range(len(response['interfaces'])):
for k,v in response['interfaces'][x].items():
if "sta" in v['device']:
if v['port type'] == "WIFI-STA":
sta_list.append(response['interfaces'][x])
del response
return sta_list
# Returns list of all VAPs with "vap" in their name
def vap_list(self):
#Returns list of all VAPs with "vap" in their name
sta_list = []
lf_r = LFRequest.LFRequest(self.mgrURL + "/port/list?fields=_links,alias,device,port+type")
response = lf_r.getAsJson(True)
response = super().json_get("/port/list?fields=_links,alias,device,port+type")
for x in range(len(response['interfaces'])):
for k,v in response['interfaces'][x].items():
if "vap" in v['device']:
@@ -42,58 +66,93 @@ class Realm:
return sta_list
# removes port by eid/eidpn
def removeVlanByEid(self, eid):
if (eid is None) or ("" == eid):
raise ValueError(f"removeVlanByEid wants eid like 1.1.sta0 but given[{eid}]")
hunks = eid.split('.')
#print("- - - - - - - - - - - - - - - - -")
#pprint(hunks)
#pprint(self.lfclient_url)
#print("- - - - - - - - - - - - - - - - -")
if (len(hunks) > 3) or (len(hunks) < 2):
raise ValueError(f"removeVlanByEid wants eid like 1.1.sta0 but given[{eid}]")
elif len(hunks) == 3:
LFUtils.removePort(hunks[1], hunks[2], self.lfclient_url)
else:
LFUtils.removePort(hunks[0], hunks[1], self.lfclient_url)
# Searches for ports that match a given pattern and returns a list of names
def find_ports_like(self, pattern=""):
#Searches for ports that match a given pattern and returns a list of names
device_name_list = []
lf_r = LFRequest.LFRequest(self.mgrURL + "/port/list?fields=_links,alias,device,port+type")
response = lf_r.getAsJson(True)
#print(response)
for x in range(len(response['interfaces'])):
for k,v in response['interfaces'][x].items():
if v['device'] != "NA":
device_name_list.append(v['device'])
response = super().json_get("/port/list?fields=_links,alias,device,port+type")
alias_map = LFUtils.portListToAliasMap(response)
#pprint(alias_map)
prelim_map = {}
matched_map = {}
for name,record in alias_map.items():
try:
#print("- prelim - - - - - - - - - - - - - - - - - - -")
#pprint(record)
if (record["port type"] == "WIFI-STA"):
prelim_map[name] = record
matched_list = []
except Exception as x:
super().error(x)
prefix = ""
for port_name in device_name_list:
try:
if pattern.index("+") > 0:
match = re.search(r"^([^+]+)[+]$", pattern)
if match.group(1):
#print("name:", port_name, " Group 1: ",match.group(1))
prefix = match.group(1)
if port_name.index(prefix) == 0:
matched_list.append(port_name)
try:
if pattern.find("+") > 0:
match = re.search(r"^([^+]+)[+]$", pattern)
if match.group(1):
#print("name:", port_name, " Group 1: ",match.group(1))
prefix = match.group(1)
for port_eid,record in prelim_map.items():
if port_eid.find(prefix) >= 0:
matched_map[port_eid] = record
elif pattern.index("*") > 0:
match = re.search(r"^([^\*]+)[\*]$", pattern)
if match.group(1):
prefix = match.group(1)
#print("group 1: ",prefix)
if port_name.index(prefix) == 0:
matched_list.append(port_name)
elif pattern.find("*") > 0:
match = re.search(r"^([^\*]+)[\*]$", pattern)
if match.group(1):
prefix = match.group(1)
#print("group 1: ",prefix)
for port_eid,record in prelim_map.items():
if port_eid.find(prefix) >= 0:
matched_map[port_eid] = record
elif pattern.index("[") > 0:
match = re.search(r"^([^\[]+)\[(\d+)\.\.(\d+)\]$", pattern)
if match.group(0):
#print("[group1]: ", match.group(1))
prefix = match.group(1)
if port_name.index(prefix):
matched_list.append(port_name) # wrong but better
except ValueError as e:
print(e)
return matched_list
elif pattern.find("[") > 0:
match = re.search(r"^([^\[]+)\[(\d+)\.\.(\d+)\]$", pattern)
if match.group(0):
#print("[group1]: ", match.group(1))
#print("[group2]: ", match.group(2))
#print("[group3]: ", match.group(3))
prefix = match.group(1)
for port_eid,record in prelim_map.items():
if port_eid.find(prefix) >= 0:
port_suf = record["device"][len(prefix):]
if (port_suf >= match.group(2)) and (port_suf <= match.group(3)):
#print(f"{port_name}: suffix[{port_name}] between {match.group(2)}:{match.group(3)}")
matched_map[port_eid] = record
except ValueError as e:
super().error(e)
return matched_map
def new_station_profile(self):
station_prof = StationProfile(self.lfclient_url, debug=self.debugOn)
return station_prof
def new_cx_profile(self):
cx_prof = CXProfile(self.lfclient_url, debug=self.debugOn)
return cx_prof
class CXProfile:
def __init__(self, mgr_url="http://localhost:8080"):
self.mgr_url = mgr_url
def __init__(self, lfclient_host, lfclient_port, debug=False):
self.lfclient_url = f"http://{lfclient_host}:{lfclient_port}/"
self.debug = debug
self.post_data = []
# Adds post data for a cross-connect between eth1 and specified list of ports, appends to array
def add_ports(self, side, endp_type, ports=[]):
#Adds post data for a cross-connect between eth1 and specified list of ports, appends to array
side = side.upper()
endp_side_a = {
"alias":"",
@@ -131,7 +190,7 @@ class CXProfile:
endp_side_b["alias"] = port_name+"CX-B"
endp_side_b["port"] = port_name
lf_r = LFRequest.LFRequest(self.mgr_url + "/cli-json/add_endp")
lf_r = LFRequest.LFRequest(self.lfclient_url + "/cli-json/add_endp")
lf_r.addPostData(endp_side_a)
json_response = lf_r.jsonPost(True)
lf_r.addPostData(endp_side_b)
@@ -149,50 +208,208 @@ class CXProfile:
self.post_data.append(data)
# Creates cross-connect for each port specified in the addPorts function
def create(self, sleep_time=.5):
#Creates cross-connect for each port specified in the addPorts function
for data in self.post_data:
lf_r = LFRequest.LFRequest(self.mgr_url + "/cli-json/add_cx")
lf_r = LFRequest.LFRequest(self.lfclient_url + "/cli-json/add_cx")
lf_r.addPostData(data)
json_response = lf_r.jsonPost(True)
#LFUtils.debug_printer.pprint(json_response)
time.sleep(sleep_time)
# use the station profile to set the combination of features you want on your stations
# once this combination is configured, build the stations with the build(resource, radio, number) call
# build() calls will fail if the station already exists. Please survey and clean your resource
# before calling build()
# survey = Realm.findStations(resource=1)
# Realm.removeStations(survey)
# profile = Realm.newStationProfile()
# profile.set...
# profile.build(resource, radio, 64)
#
class StationProfile:
def __init__(self, mgr_url="localhost:8080", ssid="NA", ssid_pass="NA", security="open", start_id="", mode=0, up=True, dhcp=True):
self.mgrURL = mgr_url
def __init__(self, lfclient_url, ssid="NA", ssid_pass="NA", security="open", prefix="00000", mode=0, up=True, dhcp=True, debug=False):
self.debug = debug
self.lfclient_url = lfclient_url
self.ssid = ssid
self.ssid_pass = ssid_pass
self.mode = mode
self.up = up
self.dhcp = dhcp
self.security = security
self.COMMANDS = ["add_sta", "set_port"]
self.desired_add_sta_flags = ["wpa2_enable", "80211u_enable", "create_admin_down"]
self.desired_add_sta_flags_mask = ["wpa2_enable", "80211u_enable", "create_admin_down"]
self.prefix = prefix
self.add_sta_data = {
"shelf": 1,
"resource": 1,
"radio": None,
"sta_name": None,
"ssid": None,
"key": None,
"mode": 0,
"mac": "xx:xx:xx:xx:*:xx",
"flags": 0, # (0x400 + 0x20000 + 0x1000000000) # create admin down
}
self.desired_set_port_current_flags = ["if_down", "use_dhcp"]
self.desired_set_port_interest_flags = ["current_flags", "dhcp", "ifdown"]
self.set_port_data = {
"shelf": 1,
"resource": 1,
"port": None,
"current_flags": 0,
"interest": 0, #(0x2 + 0x4000 + 0x800000) # current, dhcp, down,
}
def build(self, resource_radio, num_stations):
#Checks for errors in initialization values and creates specified number of stations using init parameters
try:
resource = resource_radio[0: resource_radio.index(".")]
name = resource_radio[resource_radio.index(".") + 1:]
if name.index(".") >= 0:
radio_name = name[name.index(".")+1 : ]
except ValueError as e:
print(e)
def use_wpa2(self, on=False, ssid=None, passwd=None):
if on:
if (ssid is None) or ("" == ssid):
raise ValueError("use_wpa2: WPA2 requires ssid")
if (passwd is None) or ("" == passwd):
raise ValueError("use_wpa2: WPA2 requires passphrase or [BLANK]")
self.set_command_param("add_sta", "ssid", ssid)
self.set_command_param("add_sta", "key", passwd)
else:
self.set_command_flag("add_sta", "flags", "wpa2_enable", 0)
lf_r = LFRequest.LFRequest(self.mgrURL + "/cli-json/add_sta")
def set_command_param(self, command_name, param_name, param_value):
# we have to check what the param name is
if (command_name is None) or (command_name == ""):
return
if (param_name is None) or (param_name == ""):
return
if command_name not in self.COMMANDS:
super().error(f"Command name name [{command_name}] not defined in {self.COMMANDS}")
return
if command_name == "add_sta":
self.add_sta_data[param_name] = param_value
elif command_name == "set_port":
self.set_port_data[param_name] = param_value
def set_command_flag(self, command_name, param_name, value):
# we have to check what the param name is
if (command_name is None) or (command_name == ""):
return
if (param_name is None) or (param_name == ""):
return
if command_name not in self.COMMANDS:
print(f"Command name name [{command_name}] not defined in {self.COMMANDS}")
return
if command_name == "add_sta":
if (param_name not in add_sta.add_sta_flags) and (param_name not in add_sta.add_sta_modes):
print(f"Parameter name [{param_name}] not defined in add_sta.py")
if self.debug:
pprint(add_sta.add_sta_flags)
return
if (value == 1) and (param_name not in self.desired_add_sta_flags):
self.desired_add_sta_flags_flags.append(param_name)
elif value == 0:
self.desired_set_port_flags_flags.remove(param_name)
elif command_name == "set_port":
if (param_name not in set_port.set_port_current_flags) and (param_name not in set_port.set_port_cmd_flags):
print(f"Parameter name [{param_name}] not defined in set_port.py")
if self.debug:
pprint(set_port.set_port_cmd_flags)
pprint(set_port.set_port_current_flags)
pprint(set_port.set_port_interest_flags)
return
if (value == 1) and (param_name not in self.desired_set_port_flags):
self.desired_set_port_flags_flags.append(param_name)
elif value == 0:
self.desired_set_port_flags_flags.remove(param_name)
# use this for hinting station name; stations begin with 'sta', the
# stations created with a prefix '0100' indicate value 10100 + n with
# resulting substring(1,) applied; station 900 becomes 'sta1000'
def set_prefix(self, pref):
self.prefix = pref
def add_named_flags(self, desired_list, command_ref):
if desired_list is None:
raise ValueError("addNamedFlags wants a list of desired flag names")
if len(desired_list) < 1:
print("addNamedFlags: empty desired list")
return 0
if (command_ref is None) or (len(command_ref) < 1):
raise ValueError("addNamedFlags wants a maps of flag values")
result = 0
for name in desired_list:
if (name is None) or (name == ""):
continue
if name not in command_ref:
if self.debug:
pprint(command_ref)
raise ValueError("flag %s not in map" % name )
result += command_ref[name]
return result
def format_sta_name(self, num):
wd = len(self.prefix)
v = int(self.prefix, 10)
if v > 0:
num += v
template = f"sta%0{wd}d"
name = template % num
#if self.debug:
# print(f"XXXXXXXXXXX {name} XXXXXXXXXXXXXXX")
return name
# Checks for errors in initialization values and creates specified number of stations using init parameters
def build(self, resource, radio, num_stations, dry_run=False, debug=False):
# try:
# resource = resource_radio[0: resource_radio.index(".")]
# name = resource_radio[resource_radio.index(".") + 1:]
# if name.index(".") >= 0:
# radio_name = name[name.index(".")+1 : ]
# print(f"Building {num_stations} on radio {resource}.{radio_name}")
# except ValueError as e:
# print(e)
# create stations down, do set_port on them, then set stations up
self.add_sta_data["flags"] = self.add_named_flags(self.desired_add_sta_flags, add_sta.add_sta_flags)
self.add_sta_data["flags_mask"] = self.add_named_flags(self.desired_add_sta_flags_mask, add_sta.add_sta_flags)
self.add_sta_data["radio"] = radio
self.add_sta_data["resource"] = resource
self.set_port_data["current_flags"] = self.add_named_flags(self.desired_set_port_current_flags, set_port.set_port_current_flags)
self.set_port_data["interest"] = self.add_named_flags(self.desired_set_port_interest_flags, set_port.set_port_interest_flags)
add_sta_r = LFRequest.LFRequest(self.lfclient_url + "/cli-json/add_sta")
set_port_r = LFRequest.LFRequest(self.lfclient_url + "/cli-json/set_port")
sta_names = []
for num in range(num_stations):
data = {
"shelf":1,
"resource":resource,
"radio":radio_name,
"sta_name":f"sta{num:05}",
"ssid":self.ssid,
"key":self.ssid_pass,
"mode":1,
"mac":"xx:xx:xx:xx:*:xx",
"flags":
}
lf_r.addPostData(data)
json_response = lf_r.jsonPost(True)
sta_name = self.format_sta_name(num)
self.add_sta_data["sta_name"] = sta_name
self.set_port_data["port"] = sta_name
sta_names.append(sta_name)
if debug:
print(f"- 381 - {sta_name}- - - - - - - - - - - - - - - - - - ")
pprint(self.add_sta_data)
pprint(self.set_port_data)
print("- ~381 - - - - - - - - - - - - - - - - - - - ")
if dry_run:
print("dry run")
continue
add_sta_r.addPostData(self.add_sta_data)
json_response = add_sta_r.jsonPost(debug)
set_port_r.addPostData(self.set_port_data)
json_response = set_port_r.jsonPost(debug)
time.sleep(0.03)
LFUtils.waitUntilPortsAppear(resource, self.lfclient_url, sta_names)
# and set ports up
if dry_run:
return
for sta_name in sta_names:
req = LFUtils.portUpRequest(resource, sta_name, debug_on=False)
set_port_r.addPostData(req)
json_response = set_port_r.jsonPost(debug)
time.sleep(0.03)
print(f"created {num} stations")
#

View File

@@ -1,26 +1,108 @@
#!/usr/bin/env python3
import pprint
import time
from pprint import pprint
import realm
from realm import Realm
import LANforge
from LANforge import LFUtils
localrealm = Realm("localhost", 8080, True)
test = realm.Realm("http://localhost:8080")
print("** Existing Stations **")
try:
sta_list = localrealm.station_list()
print(f"\n{len(sta_list)} Station List:")
print(sta_list)
del sta_list
sta_map = localrealm.station_map()
print(f"\n{len(sta_map)} Station Map:")
print(sta_map)
del sta_map
print("\n Stations like wlan+:")
print(localrealm.find_ports_like("wlan+"))
print("\n Stations like wlan0:")
print(localrealm.find_ports_like("wlan0*"))
print("\n Stations between wlan0..wlan2:")
print(localrealm.find_ports_like("wlan[0..2]"))
except Exception as x:
pprint(x)
exit(1)
sta_list = test.station_list()
cx_list = test.cx_list()
vap_list = test.vap_list()
print("\n** Removing previous stations **")
station_map = localrealm.find_ports_like("sta+")
for eid,record in station_map.items():
pprint(eid)
# a list of these objects is not super useful unless
localrealm.removeVlanByEid(eid)
time.sleep(0.03)
# convert station map to plain list
del_sta_names = []
try:
for eid,value in station_map.items():
#print("jjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjj")
#pprint(eid)
#print("rfind: %d" % )
#print("jjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjj")
tname = eid[eid.rfind('.'):]
del_sta_names.append(tname)
except Exception as x:
localrealm.error(x)
print(f"CXs: {cx_list}\n")
print(f"Stations: {sta_list}\n")
print(f"VAPs: {vap_list}\n")
LFUtils.waitUntilPortsDisappear(resource_id=1, base_url=localrealm.lfclient_url, port_list=del_sta_names, debug=False)
print("** Creating Stations **")
profile = localrealm.new_station_profile()
profile.use_wpa2(True, "jedway-wpa2-x2048-5-1", "jedway-wpa2-x2048-5-1")
profile.set_command_flag("add_sta", "80211u_enable", 1)
profile.set_prefix("0100")
profile.build(1, "wiphy0", 5)
cxTest = realm.CXProfile()
try:
sta_list = localrealm.station_list()
print(f"{len(sta_list)} Stations:")
pprint(sta_list)
print(" Stations like sta+:")
print(localrealm.find_ports_like("wlan+"))
print(" Stations like sta0:")
print(localrealm.find_ports_like("wlan0*"))
print(" Stations between wlan0..wlan2:")
print(localrealm.find_ports_like("wlan[0..2]"))
except Exception as x:
pprint(x)
exit(1)
cxTest.add_ports("A", "lf_udp", test.find_ports_like("sta+"))
cxTest.create()
print(" - - - - TESTING - - - - - -")
exit(0)
print(test.find_ports_like("sta+"))
print("** Existing vAPs **")
try:
vap_list = localrealm.vap_list()
print(f"{len(vap_list)} VAPs:")
pprint(vap_list)
except Exception as x:
localrealm.error(x)
exit(1)
print(test.find_ports_like("sta0*"))
print("** Existing CXs **")
try:
cx_list = localrealm.cx_list()
print(f"{len(cx_list)} CXs:")
pprint(cx_list)
except Exception as x:
localrealm.error(x)
exit(1)
print(test.find_ports_like("sta[0000..0002]"))
print("** Removing previous CXs **")
print("** Creating CXs **")
try:
cxProfile = localrealm.new_cx_profile()
# set attributes of cxProfile
cxProfile.add_ports("A", "lf_udp", localrealm.find_ports_like("sta+"))
cxProfile.create()
except Exception as x:
pprint(x)
exit(1)
#

View File

@@ -67,8 +67,8 @@ class StaConnect(LFCliBase):
print("FAILED: %s did not report traffic: %s" % (name, postVal))
def run(self):
self.checkConnect()
eth1IP = self.jsonGet(self.getUpstreamUrl())
self.check_connect()
eth1IP = self.json_get(self.getUpstreamUrl())
if eth1IP is None:
print("Unable to query "+self.upstream_port+", bye")
sys.exit(1)
@@ -76,7 +76,7 @@ class StaConnect(LFCliBase):
print(f"Warning: {self.getUpstreamUrl()} lacks ip address")
url = self.getStaUrl()
response = super().jsonGet(url)
response = super().json_get(url)
if response is not None:
if response["interface"] is not None:
print("removing old station")
@@ -101,7 +101,7 @@ class StaConnect(LFCliBase):
"flags": flags # verbose, wpa2
}
print("Adding new station %s " % self.sta_name)
super().jsonPost(url, data)
super().json_post(url, data)
reqURL = "cli-json/set_port"
data = {
@@ -112,14 +112,14 @@ class StaConnect(LFCliBase):
"interest": 0x4002 # set dhcp, current flags
}
print("Configuring %s..." % self.sta_name)
super().jsonPost(reqURL, data)
super().json_post(reqURL, data)
reqURL = "cli-json/nc_show_ports"
data = {"shelf": 1,
"resource": self.resource,
"port": self.sta_name,
"probe_flags": 1}
super().jsonPost(reqURL, data)
super().json_post(reqURL, data)
LFUtils.waitUntilPortsAdminUp(self.resource, self.mgr_url, [self.sta_name])
# station_info = self.jsonGet(self.mgr_url, "%s?fields=port,ip,ap" % (self.getStaUrl()))
@@ -130,7 +130,7 @@ class StaConnect(LFCliBase):
while (ip == "0.0.0.0") and (duration < maxTime):
duration += 2
time.sleep(2)
station_info = super().jsonGet(f"{self.getStaUrl()}?fields=port,ip,ap")
station_info = super().json_get(f"{self.getStaUrl()}?fields=port,ip,ap")
# LFUtils.debug_printer.pprint(station_info)
if (station_info is not None) and ("interface" in station_info):
@@ -176,7 +176,7 @@ class StaConnect(LFCliBase):
"ip_port": "-1",
"min_rate": 1000000
}
super().jsonPost(reqURL, data)
super().json_post(reqURL, data)
reqURL = "cli-json/add_endp"
data = {
@@ -188,7 +188,7 @@ class StaConnect(LFCliBase):
"ip_port": "-1",
"min_rate": 1000000
}
super().jsonPost(reqURL, data)
super().json_post(reqURL, data)
# Create CX
reqURL = "cli-json/add_cx"
@@ -198,7 +198,7 @@ class StaConnect(LFCliBase):
"tx_endp": "testUDP-A",
"rx_endp": "testUDP-B",
}
super().jsonPost(reqURL, data)
super().json_post(reqURL, data)
# Create TCP endpoints
reqURL = "cli-json/add_endp"
@@ -211,7 +211,7 @@ class StaConnect(LFCliBase):
"ip_port": "0",
"min_rate": 1000000
}
super().jsonPost(reqURL, data)
super().json_post(reqURL, data)
reqURL = "cli-json/add_endp"
data = {
@@ -223,7 +223,7 @@ class StaConnect(LFCliBase):
"ip_port": "-1",
"min_rate": 1000000
}
super().jsonPost(reqURL, data)
super().json_post(reqURL, data)
# Create CX
reqURL = "cli-json/add_cx"
@@ -233,7 +233,7 @@ class StaConnect(LFCliBase):
"tx_endp": "testTCP-A",
"rx_endp": "testTCP-B",
}
super().jsonPost(reqURL, data)
super().json_post(reqURL, data)
cxNames = ["testTCP", "testUDP"]
endpNames = ["testTCP-A", "testTCP-B",
@@ -248,7 +248,7 @@ class StaConnect(LFCliBase):
"cx_name": cxNames[name],
"cx_state": "RUNNING"
}
super().jsonPost(reqURL, data)
super().json_post(reqURL, data)
# Refresh stats
print("\nRefresh CX stats")
@@ -258,7 +258,7 @@ class StaConnect(LFCliBase):
"test_mgr": "ALL",
"cross_connect": cxNames[name]
}
super().jsonPost(reqURL, data)
super().json_post(reqURL, data)
# print("Sleeping for 15 seconds")
time.sleep(15)
@@ -272,7 +272,7 @@ class StaConnect(LFCliBase):
"cx_name": cxNames[name],
"cx_state": "STOPPED"
}
super().jsonPost(reqURL, data)
super().json_post(reqURL, data)
# Refresh stats
print("\nRefresh CX stats")
@@ -282,7 +282,7 @@ class StaConnect(LFCliBase):
"test_mgr": "ALL",
"cross_connect": cxNames[name]
}
super().jsonPost(reqURL, data)
super().json_post(reqURL, data)
# print("Sleeping for 5 seconds")
time.sleep(5)
@@ -290,19 +290,19 @@ class StaConnect(LFCliBase):
# get data for endpoints JSON
print("Collecting Data")
try:
ptestTCPA = super().jsonGet("endp/testTCP-A?fields=tx+bytes,rx+bytes")
ptestTCPA = super().json_get("endp/testTCP-A?fields=tx+bytes,rx+bytes")
ptestTCPATX = ptestTCPA['endpoint']['tx bytes']
ptestTCPARX = ptestTCPA['endpoint']['rx bytes']
ptestTCPB = super().jsonGet("endp/testTCP-B?fields=tx+bytes,rx+bytes")
ptestTCPB = super().json_get("endp/testTCP-B?fields=tx+bytes,rx+bytes")
ptestTCPBTX = ptestTCPB['endpoint']['tx bytes']
ptestTCPBRX = ptestTCPB['endpoint']['rx bytes']
ptestUDPA = super().jsonGet("endp/testUDP-A?fields=tx+bytes,rx+bytes")
ptestUDPA = super().json_get("endp/testUDP-A?fields=tx+bytes,rx+bytes")
ptestUDPATX = ptestUDPA['endpoint']['tx bytes']
ptestUDPARX = ptestUDPA['endpoint']['rx bytes']
ptestUDPB = super().jsonGet("endp/testUDP-B?fields=tx+bytes,rx+bytes")
ptestUDPB = super().json_get("endp/testUDP-B?fields=tx+bytes,rx+bytes")
ptestUDPBTX = ptestUDPB['endpoint']['tx bytes']
ptestUDPBRX = ptestUDPB['endpoint']['rx bytes']
except Exception as e:
@@ -316,7 +316,7 @@ class StaConnect(LFCliBase):
"port": self.sta_name
}
self.jsonPost(reqURL, data)
self.json_post(reqURL, data)
removeCX(self.mgr_url, cxNames)
removeEndps(self.mgr_url, endpNames)