mirror of
https://github.com/Telecominfraproject/wlan-lanforge-scripts.git
synced 2025-10-31 18:58:01 +00:00
Merge branch 'master' of https://github.com/greearb/lanforge-scripts
This commit is contained in:
135
py-json/realm.py
135
py-json/realm.py
@@ -1,55 +1,51 @@
|
|||||||
#!/usr/bin/env python3
|
#!/usr/bin/env python3
|
||||||
import os
|
import re
|
||||||
import sys
|
|
||||||
import time
|
import time
|
||||||
sys.path.append('py-json')
|
|
||||||
import json
|
|
||||||
import pprint
|
|
||||||
from LANforge import LFRequest
|
from LANforge import LFRequest
|
||||||
from LANforge import LFUtils
|
from LANforge import LFUtils
|
||||||
import re
|
|
||||||
|
|
||||||
|
|
||||||
class Realm:
|
class Realm:
|
||||||
|
|
||||||
def __init__(self, mgrURL="http://localhost:8080"):
|
def __init__(self, mgr_url="http://localhost:8080"):
|
||||||
self.mgrURL = mgrURL
|
self.mgrURL = mgr_url
|
||||||
|
|
||||||
def cxList(self):
|
def cx_list(self):
|
||||||
#Returns json response from webpage of all layer 3 cross connects
|
#Returns json response from webpage of all layer 3 cross connects
|
||||||
lf_r = LFRequest.LFRequest(self.mgrURL + "/cx")
|
lf_r = LFRequest.LFRequest(self.mgrURL + "/cx")
|
||||||
response = lf_r.getAsJson(True)
|
response = lf_r.getAsJson(True)
|
||||||
return response
|
return response
|
||||||
|
|
||||||
def stationList(self):
|
def station_list(self):
|
||||||
#Returns list of all stations with "sta" in their name
|
#Returns list of all stations with "sta" in their name
|
||||||
list = []
|
sta_list = []
|
||||||
lf_r = LFRequest.LFRequest(self.mgrURL + "/port/list?fields=_links,alias,device,port+type")
|
lf_r = LFRequest.LFRequest(self.mgrURL + "/port/list?fields=_links,alias,device,port+type")
|
||||||
response = lf_r.getAsJson(True)
|
response = lf_r.getAsJson(True)
|
||||||
for x in range(len(response['interfaces'])):
|
for x in range(len(response['interfaces'])):
|
||||||
for k,v in response['interfaces'][x].items():
|
for k,v in response['interfaces'][x].items():
|
||||||
if "sta" in v['device']:
|
if "sta" in v['device']:
|
||||||
list.append(response['interfaces'][x])
|
sta_list.append(response['interfaces'][x])
|
||||||
|
|
||||||
return list
|
return sta_list
|
||||||
|
|
||||||
def vapList(self):
|
def vap_list(self):
|
||||||
#Returns list of all VAPs with "vap" in their name
|
#Returns list of all VAPs with "vap" in their name
|
||||||
list = []
|
sta_list = []
|
||||||
lf_r = LFRequest.LFRequest(self.mgrURL + "/port/list?fields=_links,alias,device,port+type")
|
lf_r = LFRequest.LFRequest(self.mgrURL + "/port/list?fields=_links,alias,device,port+type")
|
||||||
response = lf_r.getAsJson(True)
|
response = lf_r.getAsJson(True)
|
||||||
|
|
||||||
for x in range(len(response['interfaces'])):
|
for x in range(len(response['interfaces'])):
|
||||||
for k,v in response['interfaces'][x].items():
|
for k,v in response['interfaces'][x].items():
|
||||||
if "vap" in v['device']:
|
if "vap" in v['device']:
|
||||||
list.append(response['interfaces'][x])
|
sta_list.append(response['interfaces'][x])
|
||||||
|
|
||||||
return list
|
return sta_list
|
||||||
|
|
||||||
|
|
||||||
def findPortsLike(self, pattern=""):
|
def find_ports_like(self, pattern=""):
|
||||||
#Searches for ports that match a given pattern and returns a list of names
|
#Searches for ports that match a given pattern and returns a list of names
|
||||||
list = []
|
device_name_list = []
|
||||||
# alias is possible but device is gauranteed
|
# alias is possible but device is gauranteed
|
||||||
lf_r = LFRequest.LFRequest(self.mgrURL + "/port/list?fields=_links,alias,device,port+type")
|
lf_r = LFRequest.LFRequest(self.mgrURL + "/port/list?fields=_links,alias,device,port+type")
|
||||||
response = lf_r.getAsJson(True)
|
response = lf_r.getAsJson(True)
|
||||||
@@ -57,144 +53,143 @@ class Realm:
|
|||||||
for x in range(len(response['interfaces'])):
|
for x in range(len(response['interfaces'])):
|
||||||
for k,v in response['interfaces'][x].items():
|
for k,v in response['interfaces'][x].items():
|
||||||
if v['device'] != "NA":
|
if v['device'] != "NA":
|
||||||
list.append(v['device'])
|
device_name_list.append(v['device'])
|
||||||
|
|
||||||
matchedList = []
|
matched_list = []
|
||||||
|
|
||||||
prefix = ""
|
prefix = ""
|
||||||
for portname in list:
|
for port_name in device_name_list:
|
||||||
try:
|
try:
|
||||||
if (pattern.index("+") > 0):
|
if pattern.index("+") > 0:
|
||||||
match = re.search(r"^([^+]+)[+]$", pattern)
|
match = re.search(r"^([^+]+)[+]$", pattern)
|
||||||
if match.group(1):
|
if match.group(1):
|
||||||
#print("name:", portname, " Group 1: ",match.group(1))
|
#print("name:", portname, " Group 1: ",match.group(1))
|
||||||
prefix = match.group(1)
|
prefix = match.group(1)
|
||||||
if (portname.index(prefix) == 0):
|
if port_name.index(prefix) == 0:
|
||||||
matchedList.append(portname)
|
matched_list.append(port_name)
|
||||||
|
|
||||||
elif (pattern.index("*") > 0):
|
elif pattern.index("*") > 0:
|
||||||
match = re.search(r"^([^\*]+)[\*]$", pattern)
|
match = re.search(r"^([^\*]+)[\*]$", pattern)
|
||||||
if match.group(1):
|
if match.group(1):
|
||||||
prefix = match.group(1)
|
prefix = match.group(1)
|
||||||
#print("group 1: ",prefix)
|
#print("group 1: ",prefix)
|
||||||
if (portname.index(prefix) == 0):
|
if port_name.index(prefix) == 0:
|
||||||
matchedList.append(portname)
|
matched_list.append(port_name)
|
||||||
|
|
||||||
elif (pattern.index("[") > 0):
|
elif pattern.index("[") > 0:
|
||||||
match = re.search(r"^([^\[]+)\[(\d+)\.\.(\d+)\]$", pattern)
|
match = re.search(r"^([^\[]+)\[(\d+)\.\.(\d+)\]$", pattern)
|
||||||
if match.group(0):
|
if match.group(0):
|
||||||
#print("[group1]: ", match.group(1))
|
#print("[group1]: ", match.group(1))
|
||||||
prefix = match.group(1)
|
prefix = match.group(1)
|
||||||
if (portname.index(prefix)):
|
if (port_name.index(prefix)):
|
||||||
matchedList.append(portname) # wrong but better
|
matched_list.append(port_name) # wrong but better
|
||||||
except ValueError as e:
|
except ValueError as e:
|
||||||
print(e)
|
print(e)
|
||||||
return matchedList
|
return matched_list
|
||||||
|
|
||||||
class CXProfile:
|
class CXProfile:
|
||||||
|
|
||||||
def __init__(self, mgrURL="http://localhost:8080"):
|
def __init__(self, mgr_url="http://localhost:8080"):
|
||||||
self.mgrURL = mgrURL
|
self.mgr_url = mgr_url
|
||||||
self.postData = []
|
self.post_data = []
|
||||||
|
|
||||||
def addPorts(self, side, endpType, ports=[]):
|
def add_ports(self, side, endp_type, ports=[]):
|
||||||
#Adds post data for a cross-connect between eth1 and specified list of ports, appends to array
|
#Adds post data for a cross-connect between eth1 and specified list of ports, appends to array
|
||||||
side = side.upper()
|
side = side.upper()
|
||||||
endpSideA = {
|
endp_side_a = {
|
||||||
"alias":"",
|
"alias":"",
|
||||||
"shelf":1,
|
"shelf":1,
|
||||||
"resource":1,
|
"resource":1,
|
||||||
"port":"",
|
"port":"",
|
||||||
"type":endpType,
|
"type":endp_type,
|
||||||
"min_rate":0,
|
"min_rate":0,
|
||||||
"max_rate":0,
|
"max_rate":0,
|
||||||
"min_pkt":-1,
|
"min_pkt":-1,
|
||||||
"max_pkt":0
|
"max_pkt":0
|
||||||
}
|
}
|
||||||
|
|
||||||
endpSideB = {
|
endp_side_b = {
|
||||||
"alias":"",
|
"alias":"",
|
||||||
"shelf":1,
|
"shelf":1,
|
||||||
"resource":1,
|
"resource":1,
|
||||||
"port":"",
|
"port":"",
|
||||||
"type":endpType,
|
"type":endp_type,
|
||||||
"min_rate":0,
|
"min_rate":0,
|
||||||
"max_rate":0,
|
"max_rate":0,
|
||||||
"min_pkt":-1,
|
"min_pkt":-1,
|
||||||
"max_pkt":0
|
"max_pkt":0
|
||||||
}
|
}
|
||||||
|
|
||||||
for portName in ports:
|
for port_name in ports:
|
||||||
if side == "A":
|
if side == "A":
|
||||||
endpSideA["alias"] = portName+"CX-A"
|
endp_side_a["alias"] = port_name+"CX-A"
|
||||||
endpSideA["port"] = portName
|
endp_side_a["port"] = port_name
|
||||||
endpSideB["alias"] = portName+"CX-B"
|
endp_side_b["alias"] = port_name+"CX-B"
|
||||||
endpSideB["port"] = "eth1"
|
endp_side_b["port"] = "eth1"
|
||||||
elif side == "B":
|
elif side == "B":
|
||||||
endpSideA["alias"] = portName+"CX-A"
|
endp_side_a["alias"] = port_name+"CX-A"
|
||||||
endpSideA["port"] = "eth1"
|
endp_side_a["port"] = "eth1"
|
||||||
endpSideB["alias"] = portName+"CX-B"
|
endp_side_b["alias"] = port_name+"CX-B"
|
||||||
endpSideB["port"] = portName
|
endp_side_b["port"] = port_name
|
||||||
|
|
||||||
lf_r = LFRequest.LFRequest(self.mgrURL + "/cli-json/add_endp")
|
lf_r = LFRequest.LFRequest(self.mgr_url + "/cli-json/add_endp")
|
||||||
lf_r.addPostData(endpSideA)
|
lf_r.addPostData(endp_side_a)
|
||||||
json_response = lf_r.jsonPost(True)
|
json_response = lf_r.jsonPost(True)
|
||||||
lf_r.addPostData(endpSideB)
|
lf_r.addPostData(endp_side_b)
|
||||||
json_response = lf_r.jsonPost(True)
|
json_response = lf_r.jsonPost(True)
|
||||||
#LFUtils.debug_printer.pprint(json_response)
|
#LFUtils.debug_printer.pprint(json_response)
|
||||||
time.sleep(.5)
|
time.sleep(.5)
|
||||||
|
|
||||||
|
|
||||||
data = {
|
data = {
|
||||||
"alias":portName+"CX",
|
"alias":port_name+"CX",
|
||||||
"test_mgr":"default_tm",
|
"test_mgr":"default_tm",
|
||||||
"tx_endp":portName + "CX-A",
|
"tx_endp":port_name + "CX-A",
|
||||||
"rx_endp":portName + "CX-B"
|
"rx_endp":port_name + "CX-B"
|
||||||
}
|
}
|
||||||
|
|
||||||
self.postData.append(data)
|
self.post_data.append(data)
|
||||||
|
|
||||||
def create(self, sleepTime=.5):
|
def create(self, sleep_time=.5):
|
||||||
#Creates cross-connect for each port specified in the addPorts function
|
#Creates cross-connect for each port specified in the addPorts function
|
||||||
for data in self.postData:
|
for data in self.post_data:
|
||||||
lf_r = LFRequest.LFRequest(self.mgrURL + "/cli-json/add_cx")
|
lf_r = LFRequest.LFRequest(self.mgr_url + "/cli-json/add_cx")
|
||||||
lf_r.addPostData(data)
|
lf_r.addPostData(data)
|
||||||
json_response = lf_r.jsonPost(True)
|
json_response = lf_r.jsonPost(True)
|
||||||
#LFUtils.debug_printer.pprint(json_response)
|
#LFUtils.debug_printer.pprint(json_response)
|
||||||
time.sleep(sleepTime)
|
time.sleep(sleep_time)
|
||||||
|
|
||||||
|
|
||||||
class StationProfile:
|
class StationProfile:
|
||||||
|
|
||||||
def __init__(self, mgrURL="localhost:8080", ssid="NA", ssidPass="NA", security="open", startID="", mode=0, up=True, dhcp=True):
|
def __init__(self, mgr_url="localhost:8080", ssid="NA", ssid_pass="NA", security="open", start_id="", mode=0, up=True, dhcp=True):
|
||||||
self.mgrURL = mgrURL
|
self.mgrURL = mgr_url
|
||||||
self.ssid = ssid
|
self.ssid = ssid
|
||||||
self.ssidPass = ssidPass
|
self.ssid_pass = ssid_pass
|
||||||
self.mode = mode
|
self.mode = mode
|
||||||
self.up = up
|
self.up = up
|
||||||
self.dhcp = dhcp
|
self.dhcp = dhcp
|
||||||
self.security = security
|
self.security = security
|
||||||
|
|
||||||
def build(self, resourceRadio, numStations):
|
def build(self, resource_radio, num_stations):
|
||||||
#Checks for errors in initialization values and creates specified number of stations using init parameters
|
#Checks for errors in initialization values and creates specified number of stations using init parameters
|
||||||
try:
|
try:
|
||||||
resource = port_name[0 : resourceRadio.index(".")]
|
resource = port_name[0: resource_radio.index(".")]
|
||||||
name = port_name[resourceRadio.index(".")+1 : ]
|
name = port_name[resource_radio.index(".") + 1:]
|
||||||
if (name.index(".") >= 0):
|
if name.index(".") >= 0:
|
||||||
name = name[name.index(".")+1 : ]
|
name = name[name.index(".")+1 : ]
|
||||||
except ValueError as e:
|
except ValueError as e:
|
||||||
print(e)
|
print(e)
|
||||||
|
|
||||||
lf_r = LFRequest.LFRequest(self.mgrURL + "/cli-json/add_sta")
|
lf_r = LFRequest.LFRequest(self.mgrURL + "/cli-json/add_sta")
|
||||||
flags = 0
|
for num in range(num_stations):
|
||||||
for num in range(numStations):
|
|
||||||
data = {
|
data = {
|
||||||
"shelf":1,
|
"shelf":1,
|
||||||
"resource":1,
|
"resource":1,
|
||||||
"radio":radio,
|
"radio":radio,
|
||||||
"sta_name":f"sta{num:05}",
|
"sta_name":f"sta{num:05}",
|
||||||
"ssid":self.ssid,
|
"ssid":self.ssid,
|
||||||
"key":self.ssidPass,
|
"key":self.ssid_pass,
|
||||||
"mode":1,
|
"mode":1,
|
||||||
"mac":"xx:xx:xx:xx:*:xx",
|
"mac":"xx:xx:xx:xx:*:xx",
|
||||||
"flags":
|
"flags":
|
||||||
|
|||||||
@@ -1,26 +1,26 @@
|
|||||||
#!/usr/bin/env python3
|
#!/usr/bin/env python3
|
||||||
|
|
||||||
import Realm
|
import realm
|
||||||
|
|
||||||
|
|
||||||
test = Realm.Realm("http://localhost:8080")
|
test = realm.Realm("http://localhost:8080")
|
||||||
|
|
||||||
staList = test.stationList()
|
sta_list = test.station_list()
|
||||||
cxList = test.cxList()
|
cx_list = test.cx_list()
|
||||||
vapList = test.vapList()
|
vap_list = test.vap_list()
|
||||||
|
|
||||||
|
|
||||||
print(f"CXs: {cxList}\n")
|
print(f"CXs: {cx_list}\n")
|
||||||
print(f"Stations: {staList}\n")
|
print(f"Stations: {sta_list}\n")
|
||||||
print(f"VAPs: {vapList}\n")
|
print(f"VAPs: {vap_list}\n")
|
||||||
|
|
||||||
cxTest = Realm.CXProfile()
|
cxTest = realm.CXProfile()
|
||||||
|
|
||||||
cxTest.addPorts("A", "lf_udp", test.findPortsLike("sta+"))
|
cxTest.add_ports("A", "lf_udp", test.find_ports_like("sta+"))
|
||||||
cxTest.create()
|
cxTest.create()
|
||||||
|
|
||||||
print(test.findPortsLike("sta+"))
|
print(test.find_ports_like("sta+"))
|
||||||
|
|
||||||
print(test.findPortsLike("sta0*"))
|
print(test.find_ports_like("sta0*"))
|
||||||
|
|
||||||
print(test.findPortsLike("sta[0000..0002]"))
|
print(test.find_ports_like("sta[0000..0002]"))
|
||||||
|
|||||||
Reference in New Issue
Block a user