mirror of
https://github.com/Telecominfraproject/wlan-lanforge-scripts.git
synced 2025-11-01 03:07:56 +00:00
LFUtils.py: lots of debug additions, whitespace/formatting
This commit is contained in:
@@ -2,22 +2,20 @@
|
|||||||
# Define useful common methods -
|
# Define useful common methods -
|
||||||
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
|
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
|
||||||
import sys
|
import sys
|
||||||
import os
|
|
||||||
if sys.version_info[0] != 3:
|
if sys.version_info[0] != 3:
|
||||||
print("This script requires Python 3")
|
print("This script requires Python 3")
|
||||||
exit()
|
exit()
|
||||||
|
import os
|
||||||
import pprint
|
import pprint
|
||||||
import json
|
|
||||||
import time
|
import time
|
||||||
from time import sleep
|
from time import sleep
|
||||||
from random import seed
|
from random import seed
|
||||||
|
|
||||||
seed(int(round(time.time() * 1000)))
|
seed(int(round(time.time() * 1000)))
|
||||||
from random import randint
|
from random import randint
|
||||||
from LANforge import LFRequest
|
from LANforge import LFRequest
|
||||||
|
|
||||||
debug_printer = pprint.PrettyPrinter(indent=2)
|
debug_printer = pprint.PrettyPrinter(indent=2)
|
||||||
#global base_url # = "http://localhost:8080"
|
|
||||||
|
|
||||||
NA = "NA" # used to indicate parameter to skip
|
NA = "NA" # used to indicate parameter to skip
|
||||||
ADD_STA_FLAGS_DOWN_WPA2 = 68719477760
|
ADD_STA_FLAGS_DOWN_WPA2 = 68719477760
|
||||||
@@ -47,9 +45,12 @@ class PortEID:
|
|||||||
resource = json_s['resource']
|
resource = json_s['resource']
|
||||||
port_id = json_s['id']
|
port_id = json_s['id']
|
||||||
port_name = json_s['name']
|
port_name = json_s['name']
|
||||||
|
|
||||||
|
|
||||||
# end class PortEID
|
# end class PortEID
|
||||||
|
|
||||||
def staNewDownStaRequest(sta_name, resource_id=1, radio="wiphy0", flags=ADD_STA_FLAGS_DOWN_WPA2, ssid="", passphrase="", debug_on=False):
|
def staNewDownStaRequest(sta_name, resource_id=1, radio="wiphy0", ssid="", passphrase="",
|
||||||
|
debug_on=False):
|
||||||
"""
|
"""
|
||||||
For use with add_sta. If you don't want to generate mac addresses via patterns (xx:xx:xx:xx:81:*)
|
For use with add_sta. If you don't want to generate mac addresses via patterns (xx:xx:xx:xx:81:*)
|
||||||
you can generate octets using random_hex.pop(0)[2:] and gen_mac(parent_radio_mac, octet)
|
you can generate octets using random_hex.pop(0)[2:] and gen_mac(parent_radio_mac, octet)
|
||||||
@@ -137,6 +138,7 @@ def portUpRequest(resource_id, port_name, debug_on=False):
|
|||||||
debug_printer.pprint(data)
|
debug_printer.pprint(data)
|
||||||
return data
|
return data
|
||||||
|
|
||||||
|
|
||||||
def portDownRequest(resource_id, port_name, debug_on=False):
|
def portDownRequest(resource_id, port_name, debug_on=False):
|
||||||
"""
|
"""
|
||||||
Does not change the use_dhcp flag
|
Does not change the use_dhcp flag
|
||||||
@@ -159,7 +161,8 @@ def portDownRequest(resource_id, port_name, debug_on=False):
|
|||||||
return data
|
return data
|
||||||
|
|
||||||
|
|
||||||
def generateMac(parent_mac, random_octet):
|
def generateMac(parent_mac, random_octet, debug=False):
|
||||||
|
if debug:
|
||||||
print("************ random_octet: %s **************" % (random_octet))
|
print("************ random_octet: %s **************" % (random_octet))
|
||||||
my_oct = random_octet
|
my_oct = random_octet
|
||||||
if (len(random_octet) == 4):
|
if (len(random_octet) == 4):
|
||||||
@@ -168,6 +171,7 @@ def generateMac(parent_mac, random_octet):
|
|||||||
octets[4] = my_oct
|
octets[4] = my_oct
|
||||||
return ":".join(octets)
|
return ":".join(octets)
|
||||||
|
|
||||||
|
|
||||||
# this produces a named series similar to "sta000, sta001, sta002...sta0(end_id)"
|
# this produces a named series similar to "sta000, sta001, sta002...sta0(end_id)"
|
||||||
# the padding_number is added to the start and end numbers and the resulting sum
|
# the padding_number is added to the start and end numbers and the resulting sum
|
||||||
# has the first digit trimmed, so f(0, 1, 10000) => {0000, 0001}
|
# has the first digit trimmed, so f(0, 1, 10000) => {0000, 0001}
|
||||||
@@ -188,15 +192,9 @@ def generateRandomHex():
|
|||||||
random_hex.append(hex(random_dec))
|
random_hex.append(hex(random_dec))
|
||||||
return random_hex
|
return random_hex
|
||||||
|
|
||||||
# return reverse map of aliases to port records
|
|
||||||
def portAliasesInList(json_list):
|
|
||||||
if len(json_list) < 1:
|
|
||||||
raise Exception("empty list")
|
|
||||||
reverse_map = {}
|
|
||||||
json_interfaces = json_list
|
|
||||||
if 'interfaces' in json_list:
|
|
||||||
json_interfaces = json_list['interfaces']
|
|
||||||
|
|
||||||
|
# return reverse map of aliases to port records
|
||||||
|
#
|
||||||
# expect nested records, which is an artifact of some ORM
|
# expect nested records, which is an artifact of some ORM
|
||||||
# that other customers expect:
|
# that other customers expect:
|
||||||
# [
|
# [
|
||||||
@@ -207,52 +205,69 @@ def portAliasesInList(json_list):
|
|||||||
# },
|
# },
|
||||||
# { ... }
|
# { ... }
|
||||||
|
|
||||||
|
def portListToAliasMap(json_list):
|
||||||
|
reverse_map = {}
|
||||||
|
if len(json_list) < 1:
|
||||||
|
return reverse_map
|
||||||
|
|
||||||
|
json_interfaces = json_list
|
||||||
|
if 'interfaces' in json_list:
|
||||||
|
json_interfaces = json_list['interfaces']
|
||||||
|
|
||||||
for record in json_interfaces:
|
for record in json_interfaces:
|
||||||
if len(record.keys()) < 1:
|
if len(record.keys()) < 1:
|
||||||
continue
|
continue
|
||||||
record_keys = record.keys()
|
record_keys = record.keys()
|
||||||
#debug_printer.pprint(record_keys)
|
|
||||||
k2 = ""
|
k2 = ""
|
||||||
|
# we expect one key in record keys, but we can't expect [0] to be populated
|
||||||
|
json_entry = None
|
||||||
for k in record_keys:
|
for k in record_keys:
|
||||||
k2 = k
|
k2 = k
|
||||||
if k2.__contains__("Unknown"):
|
json_entry = record[k]
|
||||||
#debug_printer.pprint("skipping: "+k2)
|
# skip uninitialized port records
|
||||||
|
if k2.find("Unknown") >= 0:
|
||||||
continue
|
continue
|
||||||
port_json = record[k2]
|
port_json = record[k2]
|
||||||
reverse_map[port_json['alias']] = record
|
reverse_map[k2] = json_entry
|
||||||
#print("resulting map: ")
|
|
||||||
#debug_printer.pprint(reverse_map)
|
|
||||||
return reverse_map
|
return reverse_map
|
||||||
|
|
||||||
|
|
||||||
def findPortEids(resource_id=1, base_url="http://localhost:8080", port_names=()):
|
def findPortEids(resource_id=1, base_url="http://localhost:8080", port_names=(), debug=False):
|
||||||
port_eids = []
|
port_eids = []
|
||||||
if len(port_names) < 0:
|
if len(port_names) < 0:
|
||||||
return []
|
return []
|
||||||
|
port_url = "/port/1"
|
||||||
|
if base_url.endswith('/'):
|
||||||
|
port_url = port_url[1:]
|
||||||
for port_name in port_names:
|
for port_name in port_names:
|
||||||
url = "/port/1/%s/%s"%(resource_id,port_name)
|
url = f"{port_url}/{resource_id}/{port_name}"
|
||||||
lf_r = LFRequest.LFRequest(url)
|
lf_r = LFRequest.LFRequest(url)
|
||||||
try:
|
try:
|
||||||
response = lf_r.getAsJson()
|
response = lf_r.getAsJson(debug)
|
||||||
if response == None:
|
if response == None:
|
||||||
continue
|
continue
|
||||||
port_eids.append(PortEID(response))
|
port_eids.append(PortEID(response))
|
||||||
except:
|
except:
|
||||||
print("Not found: " + port_name)
|
print("Not found: " + port_name)
|
||||||
return None
|
return port_eids
|
||||||
|
|
||||||
|
|
||||||
def waitUntilPortsAdminDown(resource_id=1, base_url="http://localhost:8080", port_list=()):
|
def waitUntilPortsAdminDown(resource_id=1, base_url="http://localhost:8080", port_list=()):
|
||||||
print("waitUntilPortsAdminDown")
|
print("waitUntilPortsAdminDown")
|
||||||
up_stations = port_list.copy()
|
up_stations = port_list.copy()
|
||||||
sleep(1)
|
sleep(1)
|
||||||
|
port_url = "/port/1"
|
||||||
|
if base_url.endswith('/'):
|
||||||
|
port_url = port_url[1:]
|
||||||
while len(up_stations) > 0:
|
while len(up_stations) > 0:
|
||||||
up_stations = []
|
up_stations = []
|
||||||
for port_name in port_list:
|
for port_name in port_list:
|
||||||
url = base_url+"/port/1/%s/%s?fields=device,down" % (resource_id, port_name)
|
url = f"{base_url}{port_url}/{resource_id}/{port_name}?fields=device,down"
|
||||||
lf_r = LFRequest.LFRequest(url)
|
lf_r = LFRequest.LFRequest(url)
|
||||||
json_response = lf_r.getAsJson(show_error=False)
|
json_response = lf_r.getAsJson(show_error=False)
|
||||||
if json_response == None:
|
if json_response == None:
|
||||||
print("port %s disappeared"%port_name)
|
print(f"port {port_name} disappeared")
|
||||||
continue
|
continue
|
||||||
if "interface" in json_response:
|
if "interface" in json_response:
|
||||||
json_response = json_response['interface']
|
json_response = json_response['interface']
|
||||||
@@ -261,14 +276,19 @@ def waitUntilPortsAdminDown(resource_id=1, base_url="http://localhost:8080", por
|
|||||||
sleep(1)
|
sleep(1)
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
|
||||||
def waitUntilPortsAdminUp(resource_id=1, base_url="http://localhost:8080", port_list=()):
|
def waitUntilPortsAdminUp(resource_id=1, base_url="http://localhost:8080", port_list=()):
|
||||||
print("waitUntilPortsAdminUp")
|
print("waitUntilPortsAdminUp")
|
||||||
down_stations = port_list.copy()
|
down_stations = port_list.copy()
|
||||||
sleep(1)
|
sleep(1)
|
||||||
|
port_url = "/port/1"
|
||||||
|
if base_url.endswith('/'):
|
||||||
|
port_url = port_url[1:]
|
||||||
|
#url = /%s/%s?fields=device,down" % (resource_id, port_name)
|
||||||
while len(down_stations) > 0:
|
while len(down_stations) > 0:
|
||||||
down_stations = []
|
down_stations = []
|
||||||
for port_name in port_list:
|
for port_name in port_list:
|
||||||
url = base_url+"/port/1/%s/%s?fields=device,down" % (resource_id, port_name)
|
url = f"{base_url}{port_url}/{resource_id}/{port_name}?fields=device,down"
|
||||||
lf_r = LFRequest.LFRequest(url)
|
lf_r = LFRequest.LFRequest(url)
|
||||||
json_response = lf_r.getAsJson(show_error=False)
|
json_response = lf_r.getAsJson(show_error=False)
|
||||||
if json_response == None:
|
if json_response == None:
|
||||||
@@ -282,78 +302,103 @@ def waitUntilPortsAdminUp(resource_id=1, base_url="http://localhost:8080", port_
|
|||||||
return None
|
return None
|
||||||
|
|
||||||
|
|
||||||
def waitUntilPortsDisappear(resource_id=1, base_url="http://localhost:8080", port_list=()):
|
def waitUntilPortsDisappear(resource_id=1, base_url="http://localhost:8080", port_list=(), debug=False):
|
||||||
|
if (debug):
|
||||||
print("waitUntilPortsDisappear")
|
print("waitUntilPortsDisappear")
|
||||||
|
url = "/port/1"
|
||||||
|
if base_url.endswith('/'):
|
||||||
|
url = url[1:]
|
||||||
found_stations = port_list.copy()
|
found_stations = port_list.copy()
|
||||||
sleep(1)
|
sleep(1)
|
||||||
while len(found_stations) > 0:
|
while len(found_stations) > 0:
|
||||||
found_stations = []
|
found_stations = []
|
||||||
for port_name in port_list:
|
|
||||||
sleep(1)
|
sleep(1)
|
||||||
url = base_url+"/port/1/%s/%s" % (resource_id, port_name)
|
for port_name in port_list:
|
||||||
lf_r = LFRequest.LFRequest(url)
|
check_url = f"{base_url}{url}/{resource_id}/{port_name}"
|
||||||
json_response = lf_r.getAsJson(show_error=False)
|
if debug:
|
||||||
|
print("checking:"+check_url)
|
||||||
|
lf_r = LFRequest.LFRequest(check_url)
|
||||||
|
json_response = lf_r.getAsJson(show_error=debug)
|
||||||
if (json_response != None):
|
if (json_response != None):
|
||||||
found_stations.append(port_name)
|
found_stations.append(port_name)
|
||||||
return None
|
return
|
||||||
|
|
||||||
|
|
||||||
def waitUntilPortsAppear(resource_id=1, base_url="http://localhost:8080", port_list=()):
|
def waitUntilPortsAppear(resource_id=1, base_url="http://localhost:8080", port_list=(), debug=False):
|
||||||
|
if debug:
|
||||||
print("waitUntilPortsAppear")
|
print("waitUntilPortsAppear")
|
||||||
found_stations = []
|
found_stations = []
|
||||||
sleep(2)
|
sleep(2)
|
||||||
|
port_url = "/port/1"
|
||||||
|
ncshow_url = "/cli-form/nc_show_ports"
|
||||||
|
if base_url.endswith('/'):
|
||||||
|
port_url = port_url[1:]
|
||||||
|
ncshow_url = ncshow_url[1:]
|
||||||
|
|
||||||
while len(found_stations) < len(port_list):
|
while len(found_stations) < len(port_list):
|
||||||
found_stations = []
|
found_stations = []
|
||||||
for port_name in port_list:
|
for port_name in port_list:
|
||||||
sleep(1)
|
sleep(1)
|
||||||
url = base_url+"/port/1/%s/%s" % (resource_id, port_name)
|
url = f"{base_url}{port_url}/{resource_id}/{port_name}"
|
||||||
lf_r = LFRequest.LFRequest(url)
|
lf_r = LFRequest.LFRequest(url)
|
||||||
json_response = lf_r.getAsJson(show_error=False)
|
json_response = lf_r.getAsJson(show_error=False)
|
||||||
if (json_response != None):
|
if (json_response != None):
|
||||||
found_stations.append(port_name)
|
found_stations.append(port_name)
|
||||||
else:
|
else:
|
||||||
lf_r = LFRequest.LFRequest(base_url+"/cli-form/nc_show_ports")
|
lf_r = LFRequest.LFRequest(f"{base_url}{ncshow_url}")
|
||||||
lf_r.addPostData({"shelf": 1, "resource": resource_id, "port": port_name, "flags": 1})
|
lf_r.addPostData({"shelf": 1, "resource": resource_id, "port": port_name, "flags": 1})
|
||||||
lf_r.formPost()
|
lf_r.formPost()
|
||||||
sleep(2)
|
sleep(2)
|
||||||
|
if debug:
|
||||||
print("These stations appeared: " + ", ".join(found_stations))
|
print("These stations appeared: " + ", ".join(found_stations))
|
||||||
return None
|
return
|
||||||
|
|
||||||
def removePort(resource, port_name, baseurl="http://localhost:8080/"):
|
|
||||||
lf_r = LFRequest.LFRequest(baseurl+"cli-json/rm_vlan")
|
def removePort(resource, port_name, baseurl="http://localhost:8080/", debug=False):
|
||||||
|
if debug:
|
||||||
|
print("removePort")
|
||||||
|
url = "/cli-json/rm_vlan"
|
||||||
|
if baseurl.endswith('/'):
|
||||||
|
url = url[1:]
|
||||||
|
lf_r = LFRequest.LFRequest(baseurl + url)
|
||||||
lf_r.addPostData({
|
lf_r.addPostData({
|
||||||
"shelf": 1,
|
"shelf": 1,
|
||||||
"resource": resource,
|
"resource": resource,
|
||||||
"port": port_name
|
"port": port_name
|
||||||
})
|
})
|
||||||
lf_r.jsonPost(False)
|
lf_r.jsonPost(debug)
|
||||||
|
|
||||||
def removeCX(mgrURL, cxNames):
|
|
||||||
|
def removeCX(baseurl, cxNames, debug=False):
|
||||||
|
url = "/cli-json/rm_cx"
|
||||||
|
if baseurl.endswith('/'):
|
||||||
|
url = url[1:]
|
||||||
for name in cxNames:
|
for name in cxNames:
|
||||||
#print(f"Removing CX {name}")
|
|
||||||
data = {
|
data = {
|
||||||
"test_mgr": "all",
|
"test_mgr": "all",
|
||||||
"cx_name": name
|
"cx_name": name
|
||||||
}
|
}
|
||||||
lf_r = LFRequest.LFRequest(mgrURL+"cli-json/rm_cx")
|
lf_r = LFRequest.LFRequest(baseurl + url)
|
||||||
lf_r.addPostData(data)
|
lf_r.addPostData(data)
|
||||||
lf_r.jsonPost()
|
lf_r.jsonPost(debug)
|
||||||
|
|
||||||
def removeEndps(mgrURL, endpNames):
|
|
||||||
|
def removeEndps(baseurl, endpNames, debug=False):
|
||||||
|
url = "/cli-json/rm_endp"
|
||||||
|
if baseurl.endswith('/'):
|
||||||
|
url = url[1:]
|
||||||
|
lf_r = LFRequest.LFRequest(baseurl + url)
|
||||||
for name in endpNames:
|
for name in endpNames:
|
||||||
#print(f"Removing endp {name}")
|
|
||||||
data = {
|
data = {
|
||||||
"endp_name": name
|
"endp_name": name
|
||||||
}
|
}
|
||||||
lf_r = LFRequest.LFRequest(mgrURL+"cli-json/rm_endp")
|
|
||||||
lf_r.addPostData(data)
|
lf_r.addPostData(data)
|
||||||
lf_r.jsonPost()
|
lf_r.jsonPost(debug)
|
||||||
|
|
||||||
|
|
||||||
def execWrap(cmd):
|
def execWrap(cmd):
|
||||||
if os.system(cmd) != 0:
|
if os.system(cmd) != 0:
|
||||||
print("\nError with " + cmd + ",bye\n")
|
print("\nError with '" + cmd + "', bye\n")
|
||||||
exit(1)
|
exit(1)
|
||||||
|
|
||||||
|
|
||||||
###
|
###
|
||||||
|
|||||||
Reference in New Issue
Block a user