py-json: working example of creating and destroying stations

This commit is contained in:
Jed Reynolds
2019-10-29 16:09:56 -07:00
parent 1d963fc1b5
commit 9da3b1379d
3 changed files with 298 additions and 116 deletions

View File

@@ -1,5 +1,5 @@
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
# Class holds default settings for json requests -
# Class holds default settings for json requests -
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
import sys
@@ -13,75 +13,73 @@ import json
class LFRequest:
Default_Base_URL = "http://localhost:8080"
requested_urls = []
post_datas = []
default_headers = {
'Accept': 'application/json'}
Default_Base_URL = "http://localhost:8080"
requested_urls = []
post_datas = []
default_headers = {
'Accept': 'application/json'}
def __init__(self, urls):
self.requested_urls.append(urls)
def __init__(self, urls):
self.requested_urls.append(urls)
# request first url on stack
def formPost(self):
responses = []
urlenc_data = ""
if ((len(self.post_datas) > 0) and (self.post_datas[0] != None)):
urlenc_data = urllib.parse.urlencode(self.post_datas.pop(0)).encode("utf-8")
print("data looks like:"+str(urlenc_data))
request = urllib.request.Request(url=self.requested_urls.pop(0),
data=urlenc_data,
headers=self.default_headers)
else:
request = urllib.request.Request(url=self.requested_urls.pop(0), headers=self.default_headers)
print("No data for this jsonPost?")
# request first url on stack
def formPost(self):
responses = []
urlenc_data = ""
if ((len(self.post_datas) > 0) and (self.post_datas[0] != None)):
urlenc_data = urllib.parse.urlencode(self.post_datas.pop(0)).encode("utf-8")
#print("data looks like:" + str(urlenc_data))
request = urllib.request.Request(url=self.requested_urls.pop(0),
data=urlenc_data,
headers=self.default_headers)
else:
request = urllib.request.Request(url=self.requested_urls.pop(0), headers=self.default_headers)
print("No data for this jsonPost?")
request.headers['Content-type'] = 'application/x-www-form-urlencoded'
responses.append(urllib.request.urlopen(request))
return responses[0]
request.headers['Content-type'] = 'application/x-www-form-urlencoded'
responses.append(urllib.request.urlopen(request))
return responses[0]
def jsonPost(self):
responses = []
if ((len(self.post_datas) > 0) and (self.post_datas[0] != None)):
request = urllib.request.Request(url=self.requested_urls.pop(0),
data=self.post_datas.pop(0),
headers=self.default_headers)
else:
request = urllib.request.Request(url=self.requested_urls.pop(0), headers=self.default_headers)
print("No data for this jsonPost?")
def jsonPost(self):
responses = []
if ((len(self.post_datas) > 0) and (self.post_datas[0] != None)):
request = urllib.request.Request(url=self.requested_urls.pop(0),
data=json.dumps(self.post_datas.pop(0)).encode("utf-8"),
headers=self.default_headers)
else:
request = urllib.request.Request(url=self.requested_urls.pop(0), headers=self.default_headers)
print("No data for this jsonPost?")
request.headers['Content-type'] = 'application/json'
responses.append(urllib.request.urlopen(request))
return responses[0]
request.headers['Content-type'] = 'application/json'
responses.append(urllib.request.urlopen(request))
return responses[0]
def get(self):
myrequest = urllib.request.Request(url=self.requested_urls.pop(0), headers=self.default_headers)
myresponses = []
#print(responses[0].__dict__.keys()) is how you would see parts of response
try:
myresponses.append(urllib.request.urlopen(myrequest))
return myresponses[0]
def get(self):
myrequest = urllib.request.Request(url=self.requested_urls.pop(0), headers=self.default_headers)
myresponses = []
# print(responses[0].__dict__.keys()) is how you would see parts of response
try:
myresponses.append(urllib.request.urlopen(myrequest))
return myresponses[0]
except:
print("Error: ", sys.exc_info()[0])
except:
print("Error: ", sys.exc_info()[0])
return None
return None
def getAsJson(self):
responses = []
responses.append(self.get())
if (len(responses) < 1):
return None
if ((responses[0] == None) or (responses[0].status != 200)):
print("Item not found")
return None
def getAsJson(self):
responses = []
responses.append(self.get())
if (len(responses) < 1):
return None
json_data = json.loads(responses[0].read())
return json_data
if ((responses[0] == None) or (responses[0].status != 200)):
print("Item not found")
return None
json_data = json.loads(responses[0].read())
return json_data
def addPostData(self, post_data):
self.post_datas.append(post_data)
def addPostData(self, post_data):
self.post_datas.append(post_data)
# ~LFRequest

148
py-json/LANforge/LFUtils.py Normal file
View File

@@ -0,0 +1,148 @@
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
# Define useful common methods -
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
import sys
if sys.version_info[0] != 3:
print("This script requires Python 3")
exit()
import pprint
import json
import time
from time import sleep
from random import seed
seed( int(round(time.time() * 1000)))
from random import randint
from LANforge import LFRequest
debug_printer = pprint.PrettyPrinter(indent=2)
base_url = "http://localhost:8080"
class PortEID:
shelf: 1
resource: 1
port_id: 0
port_name: ""
def __init__(self, p_resource=1, p_port_id=0, p_port_name=""):
resource = p_resource
port_id = p_port_id
port_name = p_port_name
def __init__(self, json_response):
if json_response == None:
raise Exception("No json input")
json_s = json_response
if json_response['interface'] != None:
json_s = json_response['interface']
debug_printer(json_s)
resource = json_s['resource']
port_id = json_s['id']
port_name = json_s['name']
# end class PortEID
def portNameSeries(prefix="sta", start_id=0, end_id=1, padding_number=1000):
name_list = []
for i in range((padding_number+start_id), (padding_number+end_id)):
sta_name = prefix+str(i)[1:]
name_list.append(sta_name)
return name_list
# generate random hex if you need it for mac addresses
def generateRandomHex():
# generate a few random numbers and convert them into hex:
random_hex = []
for rn in range(0, 254):
random_dec = randint(0, 254)
random_hex.append(hex(random_dec))
return random_hex
# return reverse map of aliases to port records
def portAliasesInList(json_list):
if len(json_list) < 1:
raise Exception("empty list")
reverse_map = {}
json_interfaces = json_list
if 'interfaces' in json_list:
json_interfaces = json_list['interfaces']
# expect nested records, which is an artifact of some ORM
# that other customers expect:
# [
# {
# "1.1.eth0": {
# "alias":"eth0"
# }
# },
# { ... }
for record in json_interfaces:
if len(record.keys()) < 1:
continue
record_keys = record.keys()
#debug_printer.pprint(record_keys)
k2 = ""
for k in record_keys:
k2 = k
if k2.__contains__("Unknown"):
#debug_printer.pprint("skipping: "+k2)
continue
port_json = record[k2]
reverse_map[port_json['alias']] = record
#print("resulting map: ")
#debug_printer.pprint(reverse_map)
return reverse_map
def findPortEids(resource_id=1, port_names=(), base_url="http://localhost:8080"):
port_eids = []
if len(port_names) < 0:
return []
for port_name in port_names:
url = "/port/1/%s/%s"%(resource_id,port_name)
lf_r = LFRequest.LFRequest(url)
try:
response = lf_r.getAsJson()
if response == None:
continue
port_eids.append(PortEID(response))
except:
print("Not found: "+port_name)
return None
def waitUntilPortsDisappear(resource_id=1, port_list=()):
found_stations = port_list.copy()
sleep(1)
while len(found_stations) > 0:
found_stations = []
for port_name in port_list:
sleep(1)
url = base_url+"/port/1/%s/%s" % (resource_id, port_name)
print("Example 2: checking for station : "+url)
lf_r = LFRequest.LFRequest(url)
json_response = lf_r.getAsJson()
if (json_response != None):
found_stations.append(port_name)
return None
###
def waitUntilPortsAppear(resource_id=1, port_list=()):
found_stations = []
sleep(1)
while len(found_stations) < len(port_list):
found_stations = []
for port_name in port_list:
sleep(1)
url = base_url+"/port/1/%s/%s" % (resource_id, port_name)
print("Example 2: checking for station : "+url)
lf_r = LFRequest.LFRequest(url)
json_response = lf_r.getAsJson()
if (json_response != None):
found_stations.append(port_name)
return None

View File

@@ -11,11 +11,11 @@ if sys.version_info[0] != 3:
import json
import pprint
from LANforge import LFRequest
from random import seed
from random import randint
import time
from time import sleep
from LANforge import LFRequest
from LANforge import LFUtils
def gen_mac(parent_mac, random_octet):
print("************ random_octet: %s **************"%(random_octet))
@@ -28,7 +28,6 @@ def gen_mac(parent_mac, random_octet):
def main():
base_url = "http://localhost:8080"
shelf_id = 1 # typicaly assume Shelf 1
resource_id = 1 # typically you're using resource 1 in stand alone realm
radio = "wiphy0"
start_id = 200
@@ -37,50 +36,66 @@ def main():
ssid = "jedway-wpa2-x2048-4-1"
passphrase = "jedway-wpa2-x2048-4-1"
j_printer = pprint.PrettyPrinter(indent=2)
json_post = ""
json_response = ""
lf_r = LFRequest.LFRequest(base_url+"/port/1/1/wiphy0")
wiphy0_json = lf_r.getAsJson()
if (wiphy0_json is None) or (wiphy0_json['interface'] is None):
print("Unable to find radio. Are we connected?")
exit(1)
print("# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -")
print("# radio wiphy0 -")
print("# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -")
j_printer.pprint(wiphy0_json['interface'])
LFUtils.debug_printer.pprint(wiphy0_json['interface']['alias'])
#parent_radio_mac = wiphy0_json['interface']['mac']
print("# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -")
parent_radio_mac = wiphy0_json['interface']['mac']
# generate a few random numbers and convert them into hex:
random_hex = []
seed( int(round(time.time() * 1000)))
for rn in range(0, 254):
random_dec = randint(0, 254)
random_hex.append(hex(random_dec))
found_stations = []
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
# example 1 -
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
# uses URLs /cli-form/rm_vlan, /cli-form/add_sta
# for each of the station names, delete them if they exist
# This section uses URLs /cli-form/rm_vlan, /cli-form/add_sta
# The /cli-form URIs take URL-encoded form posts
#
# For each of the station names, delete them if they exist. It
# takes a few milliseconds to delete them, so after deleting them
# you need to poll until they don't appear.
#
# NOTE: the ID field of the EID is ephemeral, so best stick to
# requesting the station name. The station name can be formatted
# with leading zeros, sta00001 is legal and != {sta0001, sta001, sta01, or sta1}
# with leading zeros, sta00001 is legal
# and != {sta0001, sta001, sta01, or sta1}
for i in range((padding_number+start_id), (padding_number+end_id)):
sta_name = "sta"+str(i)[1:]
url = base_url+"/port/%s/%s/%s" % (shelf_id, resource_id, sta_name)
print("checking for station : "+url)
desired_stations = LFUtils.portNameSeries("sta", start_id, end_id, padding_number)
for sta_name in desired_stations:
url = base_url+"/port/1/%s/%s" % (resource_id, sta_name)
print("Example 1: Checking for station : "+url)
lf_r = LFRequest.LFRequest(url)
json_response = lf_r.getAsJson()
if (json_response != None):
print("I would delete station %s now"%sta_name)
lf_r = LFRequest.LFRequest(base_url+"/cli-form/rm_vlan")
lf_r.addPostData( {
"shelf":1,
"resource": resource_id,
"port": "sta%s"%i
})
json_response = lf_r.formPost()
print(json_response)
found_stations.append(sta_name)
print("Next we create station %s"%sta_name)
for sta_name in found_stations:
print("Ex 1: Deleting station %s ...."%sta_name)
lf_r = LFRequest.LFRequest(base_url+"/cli-form/rm_vlan")
lf_r.addPostData( {
"shelf":1,
"resource": resource_id,
"port": sta_name
})
json_response = lf_r.formPost()
sleep(0.05) # best to give LANforge a few millis between rm_vlan commands
LFUtils.waitUntilPortsDisappear(resource_id, found_stations)
print("Example 1: Next we create stations...")
for sta_name in desired_stations:
print("Example 1: Next we create station %s"%sta_name)
lf_r = LFRequest.LFRequest(base_url+"/cli-form/add_sta")
# flags are a decimal equivalent of a hexadecimal bitfield
# you can submit as either 0x(hex) or (dec)
@@ -98,11 +113,15 @@ def main():
# do not take advantage of address mask matchin in Ath10k hardware, so we developed
# this pattern to randomize a section of octets. XX: keep parent, *: randomize, and
# chars [0-9a-f]: use this digit
#
# If you get errors like "X is invalid hex character", this indicates a previous
# rm_vlan call has not removed your station yet: you cannot rewrite mac addresses
# with this call, just create new stations
lf_r.addPostData( {
"shelf":1,
"resource": resource_id,
"radio": radio,
"sta_name": "sta%s"%i,
"sta_name": sta_name,
"flags":68727874560,
"ssid": ssid,
"key": passphrase,
@@ -112,53 +131,70 @@ def main():
})
json_response = lf_r.formPost()
print(json_response)
print("done")
sleep(1)
LFUtils.waitUntilPortsAppear()
print("...done with example 1\n\n")
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
# example 2 -
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -d
# Example 2 -
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
# uses URLs /cli-json/rm_vlan, /cli-json/add_sta
# and those accept POST in json formatted text
desired_stations = []
found_stations = []
for i in range((padding_number+start_id), (padding_number+end_id)):
sta_name = "sta"+str(i)[1:]
url = base_url+"/port/%s/%s/%s" % (shelf_id, resource_id, sta_name)
print("checking for station : "+url)
lf_r = LFRequest.LFRequest(url)
json_response = lf_r.getAsJson()
if (json_response != None):
print("I would delete station %s now"%sta_name)
lf_r = LFRequest.LFRequest(base_url+"/cli-json/rm_vlan")
lf_r.addPostData( {
desired_stations.append("sta"+str(i)[1:])
print("Example 2: using port list to find stations")
url = base_url+"/port/1/%s/list?fields=alias" % (resource_id)
lf_r = LFRequest.LFRequest(url)
json_response = lf_r.getAsJson()
if json_response == None:
raise Exception("no reponse to: "+url)
port_map = LFUtils.portAliasesInList(json_response)
#LFUtils.debug_printer.pprint(port_map)
for sta_name in desired_stations:
print("Example 2: checking for station : "+sta_name)
#LFUtils.debug_printer.pprint(port_map.keys())
if sta_name in port_map.keys():
print("found station : "+sta_name)
found_stations.append(sta_name)
for sta_name in found_stations:
print("Example 2: delete station %s ..."%sta_name)
lf_r = LFRequest.LFRequest(base_url+"/cli-json/rm_vlan")
lf_r.addPostData({
"shelf":1,
"resource": resource_id,
"port": "sta%s"%i
"port": sta_name
})
json_response = lf_r.jsonPost()
print(json_response)
lf_r.jsonPost()
sleep(0.05)
print("Next we create station %s"%sta_name)
lf_r = LFRequest.LFRequest(base_url+"/cli-form/add_sta")
LFUtils.waitUntilPortsDisappear(resource_id, found_stations)
for sta_name in desired_stations:
print("Example 2: create station %s"%sta_name)
lf_r = LFRequest.LFRequest(base_url+"/cli-json/add_sta")
# see notes from example 1 on flags and mac address patterns
octet = random_hex.pop(0)[2:]
#octet = random_hex.pop(0)[2:] is a method
#gen_mac(parent_radio_mac, octet)
lf_r.addPostData( {
"shelf":1,
"resource": resource_id,
"radio": radio,
"sta_name": "sta%s"%i,
"sta_name": sta_name,
"flags":68727874560,
"ssid": ssid,
"key": passphrase,
"mac": "xx:xx:xx:xx:*:xx", #gen_mac(parent_radio_mac, octet)
"mac": "xx:xx:xx:xx:*:xx",
"mode": 0,
"rate": "DEFAULT"
})
json_response = lf_r.jsonPost()
print(json_response)
print("done")
lf_r.jsonPost()
print("...done with Example 2")
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
if __name__ == "__main__":