py-json: simplifies get request and error output, examples work better

This commit is contained in:
Jed Reynolds
2019-10-30 16:16:03 -07:00
parent 3c60c2da99
commit 57878e0412
3 changed files with 188 additions and 97 deletions

View File

@@ -8,74 +8,114 @@ if sys.version_info[0] != 3:
exit()
import urllib.request
from urllib import error
import urllib.parse
import json
import LANforge
from LANforge import LFUtils
class LFRequest:
Default_Base_URL = "http://localhost:8080"
requested_urls = []
requested_url = ""
post_datas = []
default_headers = {
'Accept': 'application/json'}
def __init__(self, urls):
self.requested_urls.append(urls)
def __init__(self, url):
self.requested_url = url
# request first url on stack
def formPost(self):
def formPost(self, show_error=True):
responses = []
urlenc_data = ""
if ((len(self.post_datas) > 0) and (self.post_datas[0] != None)):
urlenc_data = urllib.parse.urlencode(self.post_datas.pop(0)).encode("utf-8")
#print("data looks like:" + str(urlenc_data))
request = urllib.request.Request(url=self.requested_urls.pop(0),
request = urllib.request.Request(url=self.requested_url,
data=urlenc_data,
headers=self.default_headers)
else:
request = urllib.request.Request(url=self.requested_urls.pop(0), headers=self.default_headers)
request = urllib.request.Request(url=self.requested_url, headers=self.default_headers)
print("No data for this jsonPost?")
request.headers['Content-type'] = 'application/x-www-form-urlencoded'
try:
responses.append(urllib.request.urlopen(request))
return responses[0]
except urllib.error.HTTPError:
if (show_error):
print("-------------------------------------------------------------")
print("Request URL:")
LFUtils.debug_printer.pprint(request.get_full_url())
print("Request Content-type:")
LFUtils.debug_printer.pprint(request.get_header('Content-type'))
print("Request Accept:")
LFUtils.debug_printer.pprint(request.get_header('Accept'))
print("Request Data:")
LFUtils.debug_printer.pprint(request.data)
if (len(responses) > 0):
print("-------------------------------------------------------------")
print("Response:")
LFUtils.debug_printer.pprint(responses[0].reason)
print("-------------------------------------------------------------")
def jsonPost(self):
return None
def jsonPost(self, show_error=True):
responses = []
if ((len(self.post_datas) > 0) and (self.post_datas[0] != None)):
request = urllib.request.Request(url=self.requested_urls.pop(0),
request = urllib.request.Request(url=self.requested_url,
data=json.dumps(self.post_datas.pop(0)).encode("utf-8"),
headers=self.default_headers)
else:
request = urllib.request.Request(url=self.requested_urls.pop(0), headers=self.default_headers)
request = urllib.request.Request(url=self.requested_url, headers=self.default_headers)
print("No data for this jsonPost?")
request.headers['Content-type'] = 'application/json'
try:
responses.append(urllib.request.urlopen(request))
return responses[0]
except urllib.error.HTTPError:
if (show_error):
print("-------------------------------------------------------------")
print("Request URL:")
LFUtils.debug_printer.pprint(request.get_full_url())
print("Request Content-type:")
LFUtils.debug_printer.pprint(request.get_header('Content-type'))
print("Request Accept:")
LFUtils.debug_printer.pprint(request.get_header('Accept'))
print("Request Data:")
LFUtils.debug_printer.pprint(request.data)
if (len(responses) > 0):
print("-------------------------------------------------------------")
print("Response:")
LFUtils.debug_printer.pprint(responses[0].reason)
print("-------------------------------------------------------------")
return None
def get(self):
myrequest = urllib.request.Request(url=self.requested_urls.pop(0), headers=self.default_headers)
def get(self, show_error=True):
myrequest = urllib.request.Request(url=self.requested_url, headers=self.default_headers)
myresponses = []
# print(responses[0].__dict__.keys()) is how you would see parts of response
try:
myresponses.append(urllib.request.urlopen(myrequest))
return myresponses[0]
except:
if (show_error):
print("Url: "+myrequest.get_full_url())
print("Error: ", sys.exc_info()[0])
return None
def getAsJson(self):
def getAsJson(self, show_error=True):
responses = []
responses.append(self.get())
responses.append(self.get(show_error))
if (len(responses) < 1):
return None
if ((responses[0] == None) or (responses[0].status != 200)):
print("Item not found")
if (responses[0] == None):
if (show_error):
print("No response from "+self.requested_url)
return None
json_data = json.loads(responses[0].read())
return json_data

View File

@@ -44,27 +44,92 @@ class PortEID:
port_name = json_s['name']
# end class PortEID
# for use with set_port
def portDhcpUpRequest(resource_id, port_name):
def staNewDownStaRequest(sta_name, resource_id=1, radio="wiphy0", flags=ADD_STA_FLAGS_DOWN_WPA2, ssid="", passphrase="", debug_on=False):
"""
For use with add_sta. If you don't want to generate mac addresses via patterns (xx:xx:xx:xx:81:*)
you can generate octets using random_hex.pop(0)[2:] and gen_mac(parent_radio_mac, octet)
See http://localhost:8080/help/add_sta
:param passphrase:
:param ssid:
:type sta_name: str
"""
data = {
"shelf": 1,
"shelf":1,
"resource": resource_id,
"port": port_name,
"current_flags": 2147483648, # 0x1 = interface down + 2147483648 use DHCP values
"interest": 75513859 # includes use_command_flags + use_current_flags + dhcp + dhcp_rls + ifdown
"radio": radio,
"sta_name": sta_name,
"flags": ADD_STA_FLAGS_DOWN_WPA2, # note flags for add_sta do not match set_port
"ssid": ssid,
"key": passphrase,
"mac": "xx:xx:xx:xx:*:xx", # "NA", #gen_mac(parent_radio_mac, random_hex.pop(0))
"mode": 0,
"rate": "DEFAULT"
}
if (debug_on):
debug_printer.pprint(data)
return data
# for use with set_port
def portDownRequest(resource_id, port_name):
def portSetDhcpDownRequest(resource_id, port_name, debug_on=False):
"""
See http://localhost:8080/help/set_port
:param resource_id:
:param port_name:
:return:
"""
data = {
"shelf": 1,
"resource": resource_id,
"port": port_name,
"current_flags": 1, # 0x0 = interface up
"interest": 75513859
"current_flags": 2147483649, # 0x1 = interface down + 2147483648 use DHCP values
"interest": 75513858, # includes use_current_flags + dhcp + dhcp_rls + ifdown
"report_timer": 3000
}
if (debug_on):
debug_printer.pprint(data)
return data
def portDhcpUpRequest(resource_id, port_name, debug_on=False):
"""
See http://localhost:8080/help/set_port
:param resource_id:
:param port_name:
:return:
"""
data = {
"shelf": 1,
"resource": resource_id,
"port": port_name,
"current_flags": 2147483648, # vs 0x1 = interface down + use_dhcp
"interest": 75513858, # includes use_current_flags + dhcp + dhcp_rls + ifdown
"report_timer": 2200,
}
if (debug_on):
debug_printer.pprint(data)
return data
def portDownRequest(resource_id, port_name, debug_on=False):
"""
Does not change the use_dhcp flag
See http://localhost:8080/help/set_port
:param resource_id:
:param port_name:
:return:
"""
data = {
"shelf": 1,
"resource": resource_id,
"port": port_name,
"report_timer": 3000,
"current_flags": 1, # vs 0x0 = interface up
"interest": 8388610 # = current_flags + ifdown
}
if (debug_on):
debug_printer.pprint(data)
return data
@@ -124,11 +189,9 @@ def portAliasesInList(json_list):
k2 = ""
for k in record_keys:
k2 = k
if k2.__contains__("Unknown"):
#debug_printer.pprint("skipping: "+k2)
continue
port_json = record[k2]
reverse_map[port_json['alias']] = record
#print("resulting map: ")
@@ -161,9 +224,8 @@ def waitUntilPortsDisappear(resource_id=1, port_list=()):
for port_name in port_list:
sleep(1)
url = base_url+"/port/1/%s/%s" % (resource_id, port_name)
print("Example 2: checking for station : "+url)
lf_r = LFRequest.LFRequest(url)
json_response = lf_r.getAsJson()
json_response = lf_r.getAsJson(show_error=False)
if (json_response != None):
found_stations.append(port_name)
return None
@@ -177,9 +239,9 @@ def waitUntilPortsAppear(resource_id=1, port_list=()):
for port_name in port_list:
sleep(1)
url = base_url+"/port/1/%s/%s" % (resource_id, port_name)
print("Example 2: checking for station : "+url)
lf_r = LFRequest.LFRequest(url)
json_response = lf_r.getAsJson()
json_response = lf_r.getAsJson(show_error=False)
if (json_response != None):
found_stations.append(port_name)
sleep(1)
return None

View File

@@ -9,11 +9,10 @@ if sys.version_info[0] != 3:
print("This script requires Python 3")
exit()
import json
import pprint
import time
from time import sleep
import pprint
import LANforge
from LANforge import LFRequest
from LANforge import LFUtils
@@ -65,13 +64,13 @@ def main():
# and != {sta0001, sta001, sta01, or sta1}
desired_stations = LFUtils.portNameSeries("sta", start_id, end_id, padding_number)
LFUtils.debug_printer.pprint(desired_stations)
#LFUtils.debug_printer.pprint(desired_stations)
print("Example 1: will create stations %s"%(",".join(desired_stations)))
for sta_name in desired_stations:
url = base_url+"/port/1/%s/%s" % (resource_id, sta_name)
print("Example 1: Checking for station : "+url)
print("Ex 1: Checking for station : "+url)
lf_r = LFRequest.LFRequest(url)
json_response = lf_r.getAsJson()
json_response = lf_r.getAsJson(show_error=False)
if (json_response != None):
found_stations.append(sta_name)
@@ -88,10 +87,10 @@ def main():
LFUtils.waitUntilPortsDisappear(resource_id, found_stations)
print("Example 1: Next we create stations...")
print("Ex 1: Next we create stations...")
#68727874560 was previous flags
for sta_name in desired_stations:
print("Example 1: Next we create station %s"%sta_name)
print("Ex 1: Next we create station %s"%sta_name)
lf_r = LFRequest.LFRequest(base_url+"/cli-form/add_sta")
# flags are a decimal equivalent of a hexadecimal bitfield
# you can submit as either 0x(hex) or (dec)
@@ -113,25 +112,21 @@ def main():
# If you get errors like "X is invalid hex character", this indicates a previous
# rm_vlan call has not removed your station yet: you cannot rewrite mac addresses
# with this call, just create new stations
lf_r.addPostData( {
"shelf":1,
"resource": resource_id,
"radio": radio,
"sta_name": sta_name,
"flags": LFUtils.ADD_STA_FLAGS_DOWN_WPA2, # note flags for add_sta do not match set_port
"ssid": ssid,
"key": passphrase,
"mac": "xx:xx:xx:xx:*:xx", # "NA", #gen_mac(parent_radio_mac, random_hex.pop(0))
"mode": 0,
"rate": "DEFAULT"
})
json_response = lf_r.formPost()
lf_r.addPostData( LFUtils.staNewDownStaRequest(sta_name, resource_id=resource_id, radio=radio, ssid=ssid, passphrase=passphrase))
lf_r.formPost()
sleep(0.05)
LFUtils.waitUntilPortsAppear(resource_id, desired_stations)
for sta_name in desired_stations:
lf_r = LFRequest.LFRequest(base_url+"/cli-form/set_port")
lf_r.addPostData( LFUtils.portSetDhcpDownRequest(resource_id, sta_name))
lf_r.formPost()
sleep(0.05)
# the LANforge API separates STA creation and ethernet port settings
# We need to revisit the stations we create and amend flags to add
# things like DHCP or ip+gateway, admin-{up,down}
sleep(1)
LFUtils.waitUntilPortsAppear(resource_id, desired_stations)
for sta_name in desired_stations:
print("Ex 1: station up %s"%sta_name)
@@ -139,17 +134,19 @@ def main():
data = LFUtils.portDhcpUpRequest(resource_id, sta_name)
lf_r.addPostData(data)
lf_r.jsonPost()
sleep(0.05)
LFUtils.waitUntilPortsAppear(resource_id, desired_stations)
sleep(1)
for sta_name in desired_stations:
print("Ex 1: sta down %s"%sta_name)
lf_r = LFRequest.LFRequest(base_url+"/cli-json/set_port")
data = LFUtils.portDownRequest(resource_id, sta_name)
lf_r.addPostData(data)
lf_r.jsonPost()
# for sta_name in desired_stations:
# print("Ex 1: sta down %s"%sta_name)
# lf_r = LFRequest.LFRequest(base_url+"/cli-json/set_port")
# lf_r.addPostData(LFUtils.portDownRequest(resource_id, sta_name))
# lf_r.jsonPost()
# sleep(0.05)
print("...done with example 1\n\n")
sleep(2)
sleep(4)
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
# Example 2 -
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
@@ -157,8 +154,9 @@ def main():
# and those accept POST in json formatted text
desired_stations = []
found_stations = []
for i in range((padding_number+start_id), (padding_number+end_id)):
desired_stations.append("sta"+str(i)[1:])
start_id = 220
end_id = 222
desired_stations = LFUtils.portNameSeries("sta", start_id, end_id, padding_number)
print("Example 2: using port list to find stations")
sleep(1)
@@ -167,12 +165,11 @@ def main():
json_response = lf_r.getAsJson()
if json_response == None:
raise Exception("no reponse to: "+url)
port_map = LFUtils.portAliasesInList(json_response)
#LFUtils.debug_printer.pprint(port_map)
for sta_name in desired_stations:
print("Ex 2: checking for station : "+sta_name)
#LFUtils.debug_printer.pprint(port_map.keys())
if sta_name in port_map.keys():
print("found station : "+sta_name)
found_stations.append(sta_name)
@@ -185,56 +182,48 @@ def main():
"resource": resource_id,
"port": sta_name
})
lf_r.jsonPost()
lf_r.jsonPost(show_error=False)
sleep(0.05)
LFUtils.waitUntilPortsDisappear(resource_id, found_stations)
for sta_name in desired_stations:
print("Ex 2: create station %s"%sta_name)
lf_r = LFRequest.LFRequest(base_url+"/cli-json/add_sta")
# see notes from example 1 on flags and mac address patterns
#octet = random_hex.pop(0)[2:] is a method
#gen_mac(parent_radio_mac, octet)
lf_r.addPostData( {
"shelf":1,
"resource": resource_id,
"radio": radio,
"sta_name": sta_name,
"flags": LFUtils.ADD_STA_FLAGS_DOWN_WPA2,
"ssid": ssid,
"key": passphrase,
"mac": "xx:xx:xx:xx:*:xx",
"mode": 0,
"rate": "DEFAULT"
})
lf_r.addPostData(LFUtils.staNewDownStaRequest(sta_name, resource_id=resource_id, radio=radio, ssid=ssid, passphrase=passphrase))
lf_r.jsonPost()
sleep(0.05)
LFUtils.waitUntilPortsAppear(resource_id, desired_stations)
# the LANforge API separates STA creation and ethernet port settings
# We need to revisit the stations we create and amend flags to add
# things like DHCP or ip+gateway, admin-{up,down}
for sta_name in desired_stations:
print("Ex 2: create station %s"%sta_name)
print("Ex 2: set port %s"%sta_name)
lf_r = LFRequest.LFRequest(base_url+"/cli-json/set_port")
data = LFUtils.portDhcpUpRequest(resource_id, sta_name)
lf_r.addPostData(data)
lf_r.jsonPost()
sleep(0.05)
print("...done with Example 2")
sleep(1)
print("Example 3: bring ports up and down")
sleep(1)
print("Ex 3: setting ports up...")
desired_stations.insert(0, "sta0200")
desired_stations.insert(1, "sta0201")
desired_stations.insert(2, "sta0202")
wait_for_these = []
for sta_name in desired_stations:
lf_r = LFRequest.LFRequest(base_url+"/port/1/%s/%s?fields=port,device,down"%(resource_id, sta_name))
json_response = lf_r.getAsJson()
if json_response['interface']['down'] is 'true':
data = LFUtils.portUpRequest(resource_id, sta_name)
url = base_url+"/cli-json/set_port"
lf_r = LFRequest.LFRequest(url)
lf_r.addPostData(data)
lf_r.addPostData(LFUtils.portDhcpUpRequest(resource_id, sta_name))
print("setting %s up"%sta_name)
json_response = lf_r.jsonPost()
lf_r.jsonPost()
wait_for_these.append(sta_name)
LFUtils.waitUntilPortsAppear(resource_id, wait_for_these)
exit(0) ######################################################