mirror of
				https://github.com/Telecominfraproject/wlan-lanforge-scripts.git
				synced 2025-10-30 18:27:53 +00:00 
			
		
		
		
	Merge remote-tracking branch 'origin/master'
This commit is contained in:
		
							
								
								
									
										206
									
								
								Realm.py
									
									
									
									
									
								
							
							
						
						
									
										206
									
								
								Realm.py
									
									
									
									
									
								
							| @@ -1,206 +0,0 @@ | ||||
| #!/usr/bin/env python3 | ||||
| import os | ||||
| import sys | ||||
| import time | ||||
| sys.path.append('py-json') | ||||
| import json | ||||
| import pprint | ||||
| from LANforge import LFRequest | ||||
| from LANforge import LFUtils | ||||
| import re | ||||
|  | ||||
| def jsonPost(mgrURL, reqURL, data, debug=False): | ||||
|    lf_r = LFRequest.LFRequest(mgrURL + reqURL) | ||||
|    lf_r.addPostData(data) | ||||
|    json_response = lf_r.jsonPost(debug) | ||||
|    LFUtils.debug_printer.pprint(json_response) | ||||
|    sys.exit(1) | ||||
|  | ||||
|  | ||||
| class Realm: | ||||
|  | ||||
|    def __init__(self, mgrURL="http://localhost:8080"): | ||||
|       self.mgrURL = mgrURL | ||||
|  | ||||
|    def cxList(self): | ||||
|       #Returns json response from webpage of all layer 3 cross connects | ||||
|       lf_r = LFRequest.LFRequest(self.mgrURL + "/cx") | ||||
|       response = lf_r.getAsJson(True) | ||||
|       return response | ||||
|  | ||||
|    def stationList(self): | ||||
|       #Returns list of all stations with "sta" in their name | ||||
|       list = [] | ||||
|       lf_r = LFRequest.LFRequest(self.mgrURL + "/port/list?fields=_links,alias,device,port+type") | ||||
|       response = lf_r.getAsJson(True) | ||||
|       for x in range(len(response['interfaces'])): | ||||
|          for k,v in response['interfaces'][x].items(): | ||||
|             if "sta" in v['device']: | ||||
|                list.append(response['interfaces'][x]) | ||||
|  | ||||
|       return list | ||||
|  | ||||
|    def vapList(self): | ||||
|       #Returns list of all VAPs with "vap" in their name | ||||
|       list = [] | ||||
|       lf_r = LFRequest.LFRequest(self.mgrURL + "/port/list?fields=_links,alias,device,port+type") | ||||
|       response = lf_r.getAsJson(True) | ||||
|  | ||||
|       for x in range(len(response['interfaces'])): | ||||
|          for k,v in response['interfaces'][x].items(): | ||||
|             if "vap" in v['device']: | ||||
|                list.append(response['interfaces'][x]) | ||||
|  | ||||
|       return list | ||||
|  | ||||
|  | ||||
|    def findPortsLike(self, pattern=""): | ||||
|       #Searches for ports that match a given pattern and returns a list of names | ||||
|       list = [] | ||||
|       # alias is possible but device is gauranteed | ||||
|       lf_r = LFRequest.LFRequest(self.mgrURL + "/port/list?fields=_links,alias,device,port+type") | ||||
|       response = lf_r.getAsJson(True) | ||||
|       #print(response) | ||||
|       for x in range(len(response['interfaces'])): | ||||
|          for k,v in response['interfaces'][x].items(): | ||||
|             if v['device'] != "NA": | ||||
|                list.append(v['device']) | ||||
|  | ||||
|       matchedList = [] | ||||
|  | ||||
|       prefix = "" | ||||
|       for portname in list: | ||||
|          try: | ||||
|             if (pattern.index("+") > 0): | ||||
|  | ||||
|                   match = re.search(r"^([^+]+)[+]$", pattern) | ||||
|                   if match.group(1): | ||||
|                      #print("name:", portname, " Group 1: ",match.group(1)) | ||||
|                      prefix = match.group(1) | ||||
|                      if (portname.index(prefix) == 0): | ||||
|                         matchedList.append(portname) | ||||
|  | ||||
|             elif (pattern.index("*") > 0): | ||||
|                   match = re.search(r"^([^\*]+)[\*]$", pattern) | ||||
|                   if match.group(1): | ||||
|                      prefix = match.group(1) | ||||
|                      #print("group 1: ",prefix) | ||||
|                      if (portname.index(prefix) == 0): | ||||
|                         matchedList.append(portname) | ||||
|  | ||||
|             elif (pattern.index("[") > 0): | ||||
|                   match = re.search(r"^([^\[]+)\[(\d+)\.\.(\d+)\]$", pattern) | ||||
|                   if match.group(0): | ||||
|                      #print("[group1]: ", match.group(1)) | ||||
|                      prefix = match.group(1) | ||||
|                      if (portname.index(prefix)): | ||||
|                         matchedList.append(portname) # wrong but better | ||||
|          except ValueError as e: | ||||
|             print(e) | ||||
|       return matchedList | ||||
|  | ||||
| class CXProfile: | ||||
|  | ||||
|    def __init__(self, mgrURL="http://localhost:8080"): | ||||
|       self.mgrURL = mgrURL | ||||
|       self.postData = [] | ||||
|  | ||||
|    def addPorts(self, side, endpType, ports=[]): | ||||
|    #Adds post data for a cross-connect between eth1 and specified list of ports, appends to array | ||||
|       side = side.upper() | ||||
|       endpSideA = { | ||||
|       "alias":"", | ||||
|       "shelf":1, | ||||
|       "resource":1, | ||||
|       "port":"", | ||||
|       "type":endpType, | ||||
|       "min_rate":0, | ||||
|       "max_rate":0, | ||||
|       "min_pkt":-1, | ||||
|       "max_pkt":0 | ||||
|        } | ||||
|  | ||||
|       endpSideB = { | ||||
|       "alias":"", | ||||
|       "shelf":1, | ||||
|       "resource":1, | ||||
|       "port":"", | ||||
|       "type":endpType, | ||||
|       "min_rate":0, | ||||
|       "max_rate":0, | ||||
|       "min_pkt":-1, | ||||
|       "max_pkt":0 | ||||
|       } | ||||
|  | ||||
|       for portName in ports: | ||||
|          if side == "A": | ||||
|             endpSideA["alias"] = portName+"CX-A" | ||||
|             endpSideA["port"] = portName | ||||
|             endpSideB["alias"] = portName+"CX-B" | ||||
|             endpSideB["port"] = "eth1" | ||||
|          elif side == "B": | ||||
|             endpSideA["alias"] = portName+"CX-A" | ||||
|             endpSideA["port"] = "eth1" | ||||
|             endpSideB["alias"] = portName+"CX-B" | ||||
|             endpSideB["port"] = portName | ||||
|  | ||||
|          lf_r = LFRequest.LFRequest(self.mgrURL + "/cli-json/add_endp") | ||||
|          lf_r.addPostData(endpSideA) | ||||
|          json_response = lf_r.jsonPost(True) | ||||
|          lf_r.addPostData(endpSideB) | ||||
|          json_response = lf_r.jsonPost(True) | ||||
|          #LFUtils.debug_printer.pprint(json_response) | ||||
|          time.sleep(.5) | ||||
|  | ||||
|  | ||||
|          data = { | ||||
|          "alias":portName+"CX", | ||||
|          "test_mgr":"default_tm", | ||||
|          "tx_endp":portName + "CX-A", | ||||
|          "rx_endp":portName + "CX-B" | ||||
|          } | ||||
|  | ||||
|          self.postData.append(data) | ||||
|  | ||||
|    def create(self, sleepTime=.5): | ||||
|    #Creates cross-connect for each port specified in the addPorts function | ||||
|       for data in self.postData: | ||||
|          lf_r = LFRequest.LFRequest(self.mgrURL + "/cli-json/add_cx") | ||||
|          lf_r.addPostData(data) | ||||
|          json_response = lf_r.jsonPost(True) | ||||
|          LFUtils.debug_printer.pprint(json_response) | ||||
|          time.sleep(sleepTime) | ||||
|  | ||||
|  | ||||
| class StationProfile: | ||||
|  | ||||
|    def __init__(self, ssid="NA", ssidPass="NA", security="open", startID="", mode=0, up=True, dhcp=True): | ||||
|       self.ssid = ssid | ||||
|       self.ssidPass = ssidPass | ||||
|       self.mode = mode | ||||
|       self.up = up | ||||
|       self.dhcp = dhcp | ||||
|       self.security = security | ||||
|     | ||||
|    def build(self, resourceRadio, numStations): | ||||
|    #Checks for errors in initialization values and creates specified number of stations using init parameters | ||||
|    try: | ||||
|       resource = port_name[0 : resourceRadio.index(".")] | ||||
|       name = port_name[resourceRadio.index(".")+1 : ] | ||||
|          if (name.index(".") >= 0): | ||||
|             name = name[name.index(".")+1 : ] | ||||
|    except ValueError as e: | ||||
|       print(e) | ||||
|  | ||||
|    for num in range(numStations):  | ||||
|       data = { | ||||
|       "shelf":1, | ||||
|       "resource":1, | ||||
|       "radio":radio, | ||||
|       "sta_name":f"sta{num:05}", | ||||
|       "ssid":self.ssid, | ||||
|       "key":self.ssidPass, | ||||
|       "mode":1, | ||||
|       "mac":"xx:xx:xx:xx:*:xx", | ||||
|       "flags": | ||||
|       } | ||||
							
								
								
									
										26
									
								
								RealmTest.py
									
									
									
									
									
								
							
							
						
						
									
										26
									
								
								RealmTest.py
									
									
									
									
									
								
							| @@ -1,26 +0,0 @@ | ||||
| #!/usr/bin/env python3 | ||||
|  | ||||
| import Realm | ||||
|  | ||||
|  | ||||
| test = Realm.Realm("http://localhost:8080") | ||||
|  | ||||
| staList = test.stationList() | ||||
| cxList = test.cxList() | ||||
| vapList = test.vapList() | ||||
|  | ||||
|  | ||||
| print(f"CXs: {cxList}\n") | ||||
| print(f"Stations: {staList}\n") | ||||
| print(f"VAPs: {vapList}\n") | ||||
|  | ||||
| cxTest = Realm.CXProfile() | ||||
|  | ||||
| cxTest.addPorts("A", "lf_udp", test.findPortsLike("sta+")) | ||||
| cxTest.create() | ||||
|  | ||||
| print(test.findPortsLike("sta+")) | ||||
|  | ||||
| print(test.findPortsLike("sta0*")) | ||||
|  | ||||
| print(test.findPortsLike("sta[0000..0002]")) | ||||
| @@ -51,7 +51,7 @@ dirFiles = [] | ||||
|  | ||||
| for file in dir: | ||||
| 	if ver in file: | ||||
| 		fileTime = datetime.datetime.strptime(time.ctime(os.stat(file).st_ctime), "%a %B %d %H:%M:%S %Y") # Fri May  8 08:31:43 2020 | ||||
| 		fileTime = datetime.datetime.strptime(time.ctime(os.stat(file).st_ctime), "%a %b %d %H:%M:%S %Y") # Fri May  8 08:31:43 2020 | ||||
| 		dirFiles.append({'filename':file[25:], 'timestamp':fileTime}) | ||||
|  | ||||
| if len(dirFiles) == 0: | ||||
|   | ||||
		Reference in New Issue
	
	Block a user
	 Logan Lipke
					Logan Lipke