Radius and EAP Clients are working from sdk_set_profile.py

This commit is contained in:
shivam
2021-02-12 16:14:13 +05:30
parent e8389ca569
commit 4bb2e2a71a
5 changed files with 134 additions and 141 deletions

View File

@@ -37,6 +37,7 @@ from ap_ssh import ssh_cli_active_fw
import lab_ap_info
import ap_ssh
###Class for CloudSDK Interaction via RestAPI
class CloudSDK:
def __init__(self, command_line_args):
@@ -52,7 +53,7 @@ class CloudSDK:
self.bearer = self.get_bearer(self.base_url, self.cloud_type)
def get_bearer(self, cloudSDK_url, cloud_type):
cloud_login_url = cloudSDK_url+"/management/"+cloud_type+"/oauth2/token"
cloud_login_url = cloudSDK_url + "/management/" + cloud_type + "/oauth2/token"
payload = '''
{
"userId": "''' + self.user + '''",
@@ -66,11 +67,11 @@ class CloudSDK:
token_response = requests.request("POST", cloud_login_url, headers=headers, data=payload)
self.check_response("POST", token_response, headers, payload, cloud_login_url)
except requests.exceptions.RequestException as e:
raise SystemExit("Exiting Script! Cloud not get bearer token for reason:",e)
raise SystemExit("Exiting Script! Cloud not get bearer token for reason:", e)
token_data = token_response.json()
bearer_token = token_data['access_token']
bearer = bearer_token
return(bearer_token)
return (bearer_token)
def check_response(self, cmd, response, headers, data_str, url):
if response.status_code >= 500 or self.verbose:
@@ -104,7 +105,7 @@ class CloudSDK:
"date": "N/A",
"commitId": "N/A",
"projectVersion": "N/A"
}
}
if report_data:
report_data['cloud_sdk'][key] = cloudsdk_cluster_info
@@ -123,7 +124,7 @@ class CloudSDK:
report_data['cloud_sdk'][key] = cloudsdk_cluster_info
if (ap_cli_fw != latest_ap_image or force_upgrade == True) and not skip_upgrade:
print('Updating firmware, old: %s new: %s'%(ap_cli_fw, latest_ap_image))
print('Updating firmware, old: %s new: %s' % (ap_cli_fw, latest_ap_image))
do_upgrade = True
if report_data:
report_data['fw_available'][key] = "Yes"
@@ -131,7 +132,6 @@ class CloudSDK:
return do_upgrade
# client is testrail client
def do_upgrade_ap_fw(self, command_line_args, report_data, test_cases, testrail_client, ap_image, cloudModel, model,
jfrog_user, jfrog_pwd, testrails_rid, customer_id, equipment_id, logger):
@@ -177,7 +177,7 @@ class CloudSDK:
commit = ap_image.split("-")[-1]
try:
fw_upload_status = self.firwmare_upload(commit, cloudModel, ap_image, fw_url, cloudSDK_url,
bearer)
bearer)
fw_id = fw_upload_status['id']
print("Upload Complete.", ap_image, "FW ID is", fw_id)
if testrail_client:
@@ -208,7 +208,8 @@ class CloudSDK:
if report_data and test_cases:
report_data['tests'][key][test_cases["upgrade_api"]] = "passed"
if testrail_client:
testrail_client.update_testrail(case_id=test_cases["upgrade_api"], run_id=rid, status_id=1, msg='Upgrade request using API successful')
testrail_client.update_testrail(case_id=test_cases["upgrade_api"], run_id=rid, status_id=1,
msg='Upgrade request using API successful')
if logger:
logger.info('Firmware upgrade API successfully sent')
else:
@@ -216,7 +217,8 @@ class CloudSDK:
# mark upgrade test case as failed with CloudSDK error
if test_cases:
if testrail_client:
testrail_client.update_testrail(case_id=test_cases["upgrade_api"], run_id=rid, status_id=5, msg='Error requesting upgrade via API')
testrail_client.update_testrail(case_id=test_cases["upgrade_api"], run_id=rid, status_id=5,
msg='Error requesting upgrade via API')
if report_data:
report_data['tests'][key][test_cases["upgrade_api"]] = "failed"
if logger:
@@ -227,7 +229,8 @@ class CloudSDK:
# mark upgrade test case as failed with CloudSDK error
if test_cases:
if testrail_client:
testrail_client.update_testrail(case_id=test_cases["upgrade_api"], run_id=rid, status_id=5,msg='Error requesting upgrade via API')
testrail_client.update_testrail(case_id=test_cases["upgrade_api"], run_id=rid, status_id=5,
msg='Error requesting upgrade via API')
if report_data:
report_data['tests'][key][test_cases["upgrade_api"]] = "failed"
if logger:
@@ -241,17 +244,17 @@ class CloudSDK:
if test_cases:
test_id_cloud = test_cases["cloud_fw"]
cloud_ap_fw = self.ap_firmware(customer_id, equipment_id, cloudSDK_url, bearer)
print('Current AP Firmware from CloudSDK: %s requested-image: %s'%(cloud_ap_fw, ap_image))
print('Current AP Firmware from CloudSDK: %s requested-image: %s' % (cloud_ap_fw, ap_image))
if logger:
logger.info('AP Firmware from CloudSDK: ' + cloud_ap_fw)
if cloud_ap_fw == "ERROR":
print("AP FW Could not be read from CloudSDK")
elif cloud_ap_fw == ap_image:
print("CloudSDK status shows upgrade successful!")
sdk_ok = True
break
else:
print("AP FW from CloudSDK status is not latest build. Will try again in 30 seconds.")
@@ -271,7 +274,7 @@ class CloudSDK:
cli_ok = True
break
else:
print("probed api-cli-fw: %s != requested-image: %s"%(ap_cli_fw, ap_image))
print("probed api-cli-fw: %s != requested-image: %s" % (ap_cli_fw, ap_image))
continue
except Exception as ex:
ap_cli_info = "ERROR"
@@ -363,7 +366,7 @@ class CloudSDK:
return False
def ap_firmware(self, customer_id, equipment_id, cloudSDK_url, bearer):
equip_fw_url = cloudSDK_url+"/portal/status/forEquipment?customerId="+customer_id+"&equipmentId="+equipment_id
equip_fw_url = cloudSDK_url + "/portal/status/forEquipment?customerId=" + customer_id + "&equipmentId=" + equipment_id
payload = {}
headers = {
'Authorization': 'Bearer ' + bearer
@@ -373,17 +376,17 @@ class CloudSDK:
status_code = status_response.status_code
if status_code == 200:
status_data = status_response.json()
#print(status_data)
# print(status_data)
current_ap_fw = status_data[2]['details']['reportedSwVersion']
return current_ap_fw
else:
return("ERROR")
return ("ERROR")
def CloudSDK_images(self, apModel, cloudSDK_url, bearer):
if apModel and apModel != "None":
getFW_url = cloudSDK_url+"/portal/firmware/version/byEquipmentType?equipmentType=AP&modelId=" + apModel
getFW_url = cloudSDK_url + "/portal/firmware/version/byEquipmentType?equipmentType=AP&modelId=" + apModel
else:
getFW_url = cloudSDK_url+"/portal/firmware/version/byEquipmentType?equipmentType=AP"
getFW_url = cloudSDK_url + "/portal/firmware/version/byEquipmentType?equipmentType=AP"
payload = {}
headers = {
'Authorization': 'Bearer ' + bearer
@@ -394,14 +397,14 @@ class CloudSDK:
###return ap_fw_details
fwlist = []
for version in ap_fw_details:
fwlist.append(version.get('versionName'))
return(fwlist)
#fw_versionNames = ap_fw_details[0]['versionName']
#return fw_versionNames
fwlist.append(version.get('versionName'))
return (fwlist)
# fw_versionNames = ap_fw_details[0]['versionName']
# return fw_versionNames
def firwmare_upload(self, commit, apModel,latest_image,fw_url,cloudSDK_url,bearer):
fw_upload_url = cloudSDK_url+"/portal/firmware/version"
payload = "{\n \"model_type\": \"FirmwareVersion\",\n \"id\": 0,\n \"equipmentType\": \"AP\",\n \"modelId\": \""+apModel+"\",\n \"versionName\": \""+latest_image+"\",\n \"description\": \"\",\n \"filename\": \""+fw_url+"\",\n \"commit\": \""+commit+"\",\n \"validationMethod\": \"MD5_CHECKSUM\",\n \"validationCode\": \"19494befa87eb6bb90a64fd515634263\",\n \"releaseDate\": 1596192028877,\n \"createdTimestamp\": 0,\n \"lastModifiedTimestamp\": 0\n}\n\n"
def firwmare_upload(self, commit, apModel, latest_image, fw_url, cloudSDK_url, bearer):
fw_upload_url = cloudSDK_url + "/portal/firmware/version"
payload = "{\n \"model_type\": \"FirmwareVersion\",\n \"id\": 0,\n \"equipmentType\": \"AP\",\n \"modelId\": \"" + apModel + "\",\n \"versionName\": \"" + latest_image + "\",\n \"description\": \"\",\n \"filename\": \"" + fw_url + "\",\n \"commit\": \"" + commit + "\",\n \"validationMethod\": \"MD5_CHECKSUM\",\n \"validationCode\": \"19494befa87eb6bb90a64fd515634263\",\n \"releaseDate\": 1596192028877,\n \"createdTimestamp\": 0,\n \"lastModifiedTimestamp\": 0\n}\n\n"
headers = {
'Content-Type': 'application/json',
'Authorization': 'Bearer ' + bearer
@@ -409,13 +412,13 @@ class CloudSDK:
response = requests.request("POST", fw_upload_url, headers=headers, data=payload)
self.check_response("POST", response, headers, payload, fw_upload_url)
#print(response)
# print(response)
upload_result = response.json()
return(upload_result)
return (upload_result)
def get_firmware_id(self, latest_ap_image, cloudSDK_url, bearer):
#print(latest_ap_image)
fw_id_url = cloudSDK_url+"/portal/firmware/version/byName?firmwareVersionName="+latest_ap_image
# print(latest_ap_image)
fw_id_url = cloudSDK_url + "/portal/firmware/version/byName?firmwareVersionName=" + latest_ap_image
payload = {}
headers = {
@@ -438,7 +441,7 @@ class CloudSDK:
rv = []
req = 0
while True:
print("Request %i: in get-paged-url, url: %s"%(req, url))
print("Request %i: in get-paged-url, url: %s" % (req, url))
response = requests.request("GET", url, headers=headers, data=payload)
self.check_response("GET", response, headers, payload, url)
rjson = response.json()
@@ -449,12 +452,12 @@ class CloudSDK:
if rjson['context']['lastPage']:
break
context_str = json.dumps(rjson['context'])
#print("context-str: %s"%(context_str))
# print("context-str: %s"%(context_str))
url = url_base + "&paginationContext=" + urllib.parse.quote(context_str)
req = req + 1
#print("Profile, reading another page, context:")
#print(rjson['context'])
#print("url: %s"%(fw_id_url))
# print("Profile, reading another page, context:")
# print(rjson['context'])
# print("url: %s"%(fw_id_url))
return rv
@@ -464,7 +467,7 @@ class CloudSDK:
'Authorization': 'Bearer ' + bearer
}
print("Get-url, url: %s"%(url))
print("Get-url, url: %s" % (url))
response = requests.request("GET", url, headers=headers, data=payload)
self.check_response("GET", response, headers, payload, url)
return response.json()
@@ -491,7 +494,8 @@ class CloudSDK:
prof_model_type = e['model_type']
prof_type = e['profileType']
prof_name = e['name']
print("looking for profile: %s checking prof-id: %s model-type: %s type: %s name: %s"%(name, prof_id, prof_model_type, prof_type, prof_name))
print("looking for profile: %s checking prof-id: %s model-type: %s type: %s name: %s" % (
name, prof_id, prof_model_type, prof_type, prof_name))
if name == prof_name:
return e
return None
@@ -505,7 +509,7 @@ class CloudSDK:
}
response = requests.request("DELETE", url, headers=headers, data=payload)
self.check_response("DELETE", response, headers, payload, url)
return(response)
return (response)
def delete_equipment(self, cloudSDK_url, bearer, eq_id):
url = cloudSDK_url + '/portal/equipment/?equipmentId=' + eq_id
@@ -516,7 +520,7 @@ class CloudSDK:
}
response = requests.request("DELETE", url, headers=headers, data=payload)
self.check_response("DELETE", response, headers, payload, url)
return(response)
return (response)
def get_customer_locations(self, cloudSDK_url, bearer, customer_id):
url_base = cloudSDK_url + "/portal/location/forCustomer" + "?customerId=" + customer_id
@@ -573,10 +577,10 @@ class CloudSDK:
}
response = requests.request("DELETE", url, headers=headers, data=payload)
self.check_response("DELETE", response, headers, payload, url)
return(response)
return (response)
def update_firmware(self, equipment_id, latest_firmware_id, cloudSDK_url, bearer):
url = cloudSDK_url+"/portal/equipmentGateway/requestFirmwareUpdate?equipmentId="+equipment_id+"&firmwareVersionId="+latest_firmware_id
url = cloudSDK_url + "/portal/equipmentGateway/requestFirmwareUpdate?equipmentId=" + equipment_id + "&firmwareVersionId=" + latest_firmware_id
payload = {}
headers = {
@@ -585,12 +589,12 @@ class CloudSDK:
response = requests.request("POST", url, headers=headers, data=payload)
self.check_response("POST", response, headers, payload, url)
#print(response.text)
# print(response.text)
return response.json()
# This one is not yet tested, coded from spec, could have bugs.
def ap_reboot(self, equipment_id, cloudSDK_url, bearer):
url = cloudSDK_url+"/portal/equipmentGateway/requestApReboot?equipmentId="+equipment_id
url = cloudSDK_url + "/portal/equipmentGateway/requestApReboot?equipmentId=" + equipment_id
payload = {}
headers = {
@@ -599,12 +603,12 @@ class CloudSDK:
response = requests.request("POST", url, headers=headers, data=payload)
self.check_response("POST", response, headers, payload, url)
#print(response.text)
# print(response.text)
return response.json()
# This one is not yet tested, coded from spec, could have bugs.
def ap_switch_sw_bank(self, equipment_id, cloudSDK_url, bearer):
url = cloudSDK_url+"/portal/equipmentGateway/requestApSwitchSoftwareBank?equipmentId="+equipment_id
url = cloudSDK_url + "/portal/equipmentGateway/requestApSwitchSoftwareBank?equipmentId=" + equipment_id
payload = {}
headers = {
@@ -613,12 +617,12 @@ class CloudSDK:
response = requests.request("POST", url, headers=headers, data=payload)
self.check_response("POST", response, headers, payload, url)
#print(response.text)
# print(response.text)
return response.json()
# This one is not yet tested, coded from spec, could have bugs.
def ap_factory_reset(self, equipment_id, cloudSDK_url, bearer):
url = cloudSDK_url+"/portal/equipmentGateway/requestApFactoryReset?equipmentId="+equipment_id
url = cloudSDK_url + "/portal/equipmentGateway/requestApFactoryReset?equipmentId=" + equipment_id
payload = {}
headers = {
@@ -627,12 +631,12 @@ class CloudSDK:
response = requests.request("POST", url, headers=headers, data=payload)
self.check_response("POST", response, headers, payload, url)
#print(response.text)
# print(response.text)
return response.json()
# This one is not yet tested, coded from spec, could have bugs.
def ap_channel_change(self, equipment_id, cloudSDK_url, bearer):
url = cloudSDK_url+"/portal/equipmentGateway/requestChannelChange?equipmentId="+equipment_id
url = cloudSDK_url + "/portal/equipmentGateway/requestChannelChange?equipmentId=" + equipment_id
payload = {}
headers = {
@@ -641,7 +645,7 @@ class CloudSDK:
response = requests.request("POST", url, headers=headers, data=payload)
self.check_response("POST", response, headers, payload, url)
#print(response.text)
# print(response.text)
return response.json()
def set_ap_profile(self, equipment_id, test_profile_id):
@@ -658,7 +662,7 @@ class CloudSDK:
###Add Lab Profile ID to Equipment
equipment_info = response.json()
#print(equipment_info)
# print(equipment_info)
equipment_info["profileId"] = test_profile_id
print(equipment_info)
@@ -674,8 +678,8 @@ class CloudSDK:
print(response)
def get_cloudsdk_version(self, cloudSDK_url, bearer):
#print(latest_ap_image)
url = cloudSDK_url+"/ping"
# print(latest_ap_image)
url = cloudSDK_url + "/ping"
payload = {}
headers = {
@@ -693,14 +697,14 @@ class CloudSDK:
profile["name"] = name
profile["childProfileIds"] = child_profiles
url = cloudSDK_url+"/portal/profile"
url = cloudSDK_url + "/portal/profile"
headers = {
'Content-Type': 'application/json',
'Authorization': 'Bearer ' + bearer
}
data_str = json.dumps(profile)
print("Creating new ap-profile, data: %s"%(data_str))
print("Creating new ap-profile, data: %s" % (data_str))
response = requests.request("POST", url, headers=headers, data=data_str)
self.check_response("POST", response, headers, data_str, url)
ap_profile = response.json()
@@ -720,27 +724,28 @@ class CloudSDK:
profile["name"] = name
profile["childProfileIds"] = child_profiles
url = cloudSDK_url+"/portal/profile"
url = cloudSDK_url + "/portal/profile"
headers = {
'Content-Type': 'application/json',
'Authorization': 'Bearer ' + bearer
}
data_str = json.dumps(profile)
print("Updating ap-profile, data: %s"%(data_str))
print("Updating ap-profile, data: %s" % (data_str))
response = requests.request("PUT", url, headers=headers, data=data_str)
self.check_response("PUT", response, headers, data_str, url)
print(response)
if self.verbose:
p2 = self.get_customer_profile_by_name(cloudSDK_url, bearer, customer_id, name)
print("AP Profile: %s after update:"%(name))
print("AP Profile: %s after update:" % (name))
print(json.dumps(p2, indent=4, sort_keys=True))
return profile['id']
def create_ssid_profile(self, cloudSDK_url, bearer, customer_id, template, name, ssid, passkey, radius, security, mode, vlan, radios):
print("create-ssid-profile, template: %s"%(template))
def create_ssid_profile(self, cloudSDK_url, bearer, customer_id, template, name, ssid, passkey, radius, security,
mode, vlan, radios):
print("create-ssid-profile, template: %s" % (template))
profile = self.get_customer_profile_by_name(cloudSDK_url, bearer, customer_id, template)
profile['name'] = name
@@ -762,7 +767,6 @@ class CloudSDK:
self.check_response("POST", response, headers, data_str, url)
ssid_profile = response.json()
return ssid_profile['id']
def create_or_update_ssid_profile(self, cloudSDK_url, bearer, customer_id, template, name,
ssid, passkey, radius, security, mode, vlan, radios):
@@ -774,7 +778,7 @@ class CloudSDK:
ssid, passkey, radius, security, mode, vlan, radios)
# Update then.
print("Update existing ssid profile, name: %s"%(name))
print("Update existing ssid profile, name: %s" % (name))
profile['name'] = name
profile['details']['ssid'] = ssid
profile['details']['keyStr'] = passkey
@@ -797,7 +801,7 @@ class CloudSDK:
# General usage: get the default profile, modify it accordingly, pass it back to here
# Not tested yet.
def create_rf_profile(self, cloudSDK_url, bearer, customer_id, template, name, new_prof):
print("create-rf-profile, template: %s"%(template))
print("create-rf-profile, template: %s" % (template))
url = cloudSDK_url + "/portal/profile"
headers = {
@@ -809,7 +813,7 @@ class CloudSDK:
self.check_response("POST", response, headers, data_str, url)
ssid_profile = response.json()
return ssid_profile['id']
# Not tested yet.
def create_or_update_rf_profile(self, cloudSDK_url, bearer, customer_id, template, name,
new_prof):
@@ -820,7 +824,7 @@ class CloudSDK:
return self.create_rf_profile(cloudSDK_url, bearer, customer_id, template, name, new_prof)
# Update then.
print("Update existing ssid profile, name: %s"%(name))
print("Update existing ssid profile, name: %s" % (name))
url = cloudSDK_url + "/portal/profile"
headers = {
@@ -834,12 +838,11 @@ class CloudSDK:
def create_radius_profile(self, cloudSDK_url, bearer, customer_id, template, name, subnet_name, subnet, subnet_mask,
region, server_name, server_ip, secret, auth_port):
print("Create-radius-profile called, template: %s"%(template))
print("Create-radius-profile called, template: %s" % (template))
profile = self.get_customer_profile_by_name(cloudSDK_url, bearer, customer_id, template)
profile['name'] = name
subnet_config = profile['details']['subnetConfiguration']
old_subnet_name = list(subnet_config.keys())[0]
subnet_config[subnet_name] = subnet_config.pop(old_subnet_name)
@@ -868,7 +871,7 @@ class CloudSDK:
}
data_str = json.dumps(profile)
print("Sending json to create radius: %s"%(data_str))
print("Sending json to create radius: %s" % (data_str))
response = requests.request("POST", url, headers=headers, data=data_str)
self.check_response("POST", response, headers, data_str, url)
radius_profile = response.json()
@@ -876,51 +879,31 @@ class CloudSDK:
radius_profile_id = radius_profile['id']
return radius_profile_id
def create_or_update_radius_profile(self, cloudSDK_url, bearer, customer_id, template, name, subnet_name, subnet, subnet_mask,
def create_or_update_radius_profile(self, cloudSDK_url, bearer, customer_id, template, name, subnet_name, subnet,
subnet_mask,
region, server_name, server_ip, secret, auth_port):
profile = self.get_customer_profile_by_name(cloudSDK_url, bearer, customer_id, name)
if profile == None:
# create one then
return self.create_radius_profile(cloudSDK_url, bearer, customer_id, template, name, subnet_name, subnet, subnet_mask,
region, server_name, server_ip, secret, auth_port)
print("Found existing radius profile, will update, name: %s"%(name))
with open(template, 'r+') as radius_profile:
profile = json.load(radius_profile)
subnet_config = profile['details']['subnetConfiguration']
old_subnet_name = list(subnet_config.keys())[0]
subnet_config[subnet_name] = subnet_config.pop(old_subnet_name)
profile['details']['subnetConfiguration'][subnet_name]['subnetAddress'] = subnet
profile['details']['subnetConfiguration'][subnet_name]['subnetCidrPrefix'] = subnet_mask
profile['details']['subnetConfiguration'][subnet_name]['subnetName'] = subnet_name
# profile = {"model_type": "Profile", "id": 129, "customerId": "2", "profileType": "radius", "name": "Automation_Radius_Nightly", "details": {"model_type": "RadiusProfile", "primaryRadiusAuthServer": {"model_type": "RadiusServer", "ipAddress": "18.189.25.141", "secret": "testing123", "port": 1812, "timeout": 5}, "secondaryRadiusAuthServer": null, "primaryRadiusAccountingServer": null, "secondaryRadiusAccountingServer": null, "profileType": "radius"}, "createdTimestamp": 1602263176599, "lastModifiedTimestamp": 1611708334061, "childProfileIds": []}
profile['name'] = name
profile['customerId'] = customer_id
profile['details']["primaryRadiusAuthServer"]['ipAddress'] = server_ip
profile['details']["primaryRadiusAuthServer"]['secret'] = secret
profile['details']["primaryRadiusAuthServer"]['port'] = auth_port
region_map = profile['details']['serviceRegionMap']
old_region = list(region_map.keys())[0]
region_map[region] = region_map.pop(old_region)
profile['details']['serviceRegionName'] = region
profile['details']['subnetConfiguration'][subnet_name]['serviceRegionName'] = region
profile['details']['serviceRegionMap'][region]['regionName'] = region
server_map = profile['details']['serviceRegionMap'][region]['serverMap']
old_server_name = list(server_map.keys())[0]
server_map[server_name] = server_map.pop(old_server_name)
profile['details']['serviceRegionMap'][region]['serverMap'][server_name][0]['ipAddress'] = server_ip
profile['details']['serviceRegionMap'][region]['serverMap'][server_name][0]['secret'] = secret
profile['details']['serviceRegionMap'][region]['serverMap'][server_name][0]['authPort'] = auth_port
with open(template, 'w') as radius_profile:
json.dump(profile, radius_profile)
url = cloudSDK_url + "/portal/profile"
headers = {
'Content-Type': 'application/json',
'Authorization': 'Bearer ' + bearer
}
data_str = json.dumps(profile)
response = requests.request("PUT", url, headers=headers, data=data_str)
self.check_response("PUT", response, headers, data_str, url)
# TODO: Commented code below is wrong, obviously in hindsight. But, need to parse response
# and throw exception or something if it is an error code.
#response = requests.request("PUT", url, headers=headers, data=profile)
print(response)
return profile['id']
response = requests.request("POST", url, headers=headers, data=open(template, 'rb'))
radius_profile = response.json()
radius_profile_id = radius_profile['id']
return radius_profile_id
# Library for creating AP Profiles
@@ -958,11 +941,11 @@ class CreateAPProfiles:
"name": "Lab-RADIUS",
"subnet_name": "Lab",
"subnet": "10.10.0.0",
"subnet": "8.189.25.141",
"subnet_mask": 16,
"region": "Toronto",
"server_name": "Lab-RADIUS",
"server_ip": "10.10.10.203",
"server_ip": "18.189.25.141",
"secret": "testing123",
"auth_port": 1812
}
@@ -1139,11 +1122,11 @@ class CreateAPProfiles:
self.ssid_data["2g"]["wpa"]["nat"] = ssid_2g_wpa
self.ssid_data["2g"]["wpa"]["vlan"] = ssid_2g_wpa
def create_radius_profile(self, radius_name, rid, key):
def create_radius_profile(self, radius_name=None, radius_template=None, rid=None, key=None):
### Create RADIUS profile - used for all EAP SSIDs
self.radius_name = radius_name
self.radius_template = "Radius-Profile" # Default radius profile found in cloud-sdk
self.radius_template = radius_template # Default radius profile found in cloud-sdk
self.subnet_name = self.radius_info['subnet_name']
self.subnet = self.radius_info['subnet']
self.subnet_mask = self.radius_info['subnet_mask']
@@ -1166,10 +1149,10 @@ class CreateAPProfiles:
self.server_name, self.server_ip,
self.secret, self.auth_port)
print("radius profile Id is", self.radius_profile)
client.update_testrail(case_id=self.test_cases["radius_profile"], run_id=self.rid, status_id=1,
msg='RADIUS profile created successfully')
self.client.update_testrail(case_id=self.test_cases["radius_profile"], run_id=self.rid, status_id=1,
msg='RADIUS profile created successfully')
self.test_cases["radius_profile"] = "passed"
except:
except Exception as ex:
print(ex)
logging.error(logging.traceback.format_exc())
print("RADIUS Profile Create Error, will skip radius profile.")
@@ -1196,7 +1179,6 @@ class CreateAPProfiles:
print("CreateAPProfile::create_ssid_profile, skip-wpa: ", skip_wpa, " skip-wpa2: ", skip_wpa2, " skip-eap: ",
skip_eap)
if not skip_eap:
# 5G eap
try:
@@ -1220,7 +1202,8 @@ class CreateAPProfiles:
logging.error(logging.traceback.format_exc())
self.fiveG_eap = None
print("5G EAP SSID create failed - " + mode + " mode")
self.client.update_testrail(case_id=self.test_cases["ssid_5g_eap_" + mode], run_id=self.rid, status_id=5,
self.client.update_testrail(case_id=self.test_cases["ssid_5g_eap_" + mode], run_id=self.rid,
status_id=5,
msg='5G EAP SSID create failed - ' + mode + ' mode')
self.test_cases["ssid_5g_eap_" + mode] = "failed"
@@ -1236,7 +1219,8 @@ class CreateAPProfiles:
mode.upper(), 1,
["is2dot4GHz"])
print("2.4G EAP SSID created successfully - " + mode + " mode")
self.client.update_testrail(case_id=self.test_cases["ssid_5g_eap_" + mode], run_id=self.rid, status_id=1,
self.client.update_testrail(case_id=self.test_cases["ssid_5g_eap_" + mode], run_id=self.rid,
status_id=1,
msg='2.4G EAP SSID created successfully - ' + mode + ' mode')
self.test_cases["ssid_5g_eap_" + mode] = "passed"
except Exception as ex:
@@ -1244,11 +1228,11 @@ class CreateAPProfiles:
logging.error(logging.traceback.format_exc())
self.twoFourG_eap = None
print("2.4G EAP SSID create failed - bridge mode")
self.client.update_testrail(case_id=self.test_cases["ssid_5g_eap_" + mode], run_id=self.rid, status_id=5,
self.client.update_testrail(case_id=self.test_cases["ssid_5g_eap_" + mode], run_id=self.rid,
status_id=5,
msg='2.4G EAP SSID create failed - bridge mode')
self.test_cases["ssid_5g_eap_" + mode] = "failed"
if not skip_wpa2:
# 5g wpa2
try:
@@ -1271,7 +1255,8 @@ class CreateAPProfiles:
logging.error(logging.traceback.format_exc())
self.fiveG_wpa2 = None
print("5G WPA2 SSID create failed - " + mode + " mode")
self.client.update_testrail(case_id=self.test_cases["ssid_5g_wpa2_" + mode], run_id=self.rid, status_id=5,
self.client.update_testrail(case_id=self.test_cases["ssid_5g_wpa2_" + mode], run_id=self.rid,
status_id=5,
msg='5G WPA2 SSID create failed - ' + mode + ' mode')
self.test_cases["ssid_5g_wpa2_" + mode] = "failed"
@@ -1280,7 +1265,8 @@ class CreateAPProfiles:
self.twoFourG_wpa2 = self.cloud.create_or_update_ssid_profile(self.command_line_args.sdk_base_url,
self.bearer, self.customer_id,
self.ssid_template,
self.profile_data['2g']['wpa2'][self.mode],
self.profile_data['2g']['wpa2'][
self.mode],
self.ssid_data['2g']['wpa2'][self.mode],
self.psk_data['2g']['wpa2'][self.mode],
"Radius-Accounting-Profile",
@@ -1296,11 +1282,11 @@ class CreateAPProfiles:
logging.error(logging.traceback.format_exc())
self.twoFourG_wpa2 = None
print("2.4G WPA2 SSID create failed - " + mode + " mode")
self.client.update_testrail(case_id=self.test_cases["ssid_2g_wpa2_" + mode], run_id=self.rid, status_id=5,
self.client.update_testrail(case_id=self.test_cases["ssid_2g_wpa2_" + mode], run_id=self.rid,
status_id=5,
msg='2.4G WPA2 SSID create failed - ' + mode + ' mode')
self.test_cases["ssid_2g_wpa2_" + mode] = "failed"
if not skip_wpa:
# 5g wpa
try:
@@ -1324,7 +1310,8 @@ class CreateAPProfiles:
logging.error(logging.traceback.format_exc())
self.fiveG_wpa = None
print("5G WPA SSID create failed - " + mode + " mode")
self.client.update_testrail(case_id=self.test_cases["ssid_5g_wpa_" + mode], run_id=self.rid, status_id=5,
self.client.update_testrail(case_id=self.test_cases["ssid_5g_wpa_" + mode], run_id=self.rid,
status_id=5,
msg='5G WPA SSID create failed - ' + mode + ' mode')
self.test_cases["ssid_5g_wpa_" + mode] = "failed"
@@ -1340,7 +1327,8 @@ class CreateAPProfiles:
mode.upper(), 1,
["is2dot4GHz"])
print("2.4G WPA SSID created successfully - " + mode + " mode")
self.client.update_testrail(case_id=self.test_cases["ssid_2g_wpa_" + mode], run_id=self.rid, status_id=1,
self.client.update_testrail(case_id=self.test_cases["ssid_2g_wpa_" + mode], run_id=self.rid,
status_id=1,
msg='2.4G WPA SSID created successfully - ' + mode + ' mode')
self.test_cases["ssid_2g_wpa_" + mode] = "passed"
except Exception as ex:
@@ -1348,7 +1336,8 @@ class CreateAPProfiles:
logging.error(logging.traceback.format_exc())
self.twoFourG_wpa = None
print("2.4G WPA SSID create failed - " + mode + " mode")
self.client.update_testrail(case_id=self.test_cases["ssid_2g_wpa_" + mode], run_id=self.rid, status_id=5,
self.client.update_testrail(case_id=self.test_cases["ssid_2g_wpa_" + mode], run_id=self.rid,
status_id=5,
msg='2.4G WPA SSID create failed - ' + mode + ' mode')
self.test_cases["ssid_2g_wpa_" + mode] = "failed"
@@ -1408,17 +1397,17 @@ class CreateAPProfiles:
self.child_profiles)
self.test_profile_id = self.create_ap_profile
print("Test Profile ID for Test is:", self.test_profile_id)
self.client.update_testrail(case_id=self.test_cases["ap_"+ self.mode], run_id=self.rid, status_id=1,
self.client.update_testrail(case_id=self.test_cases["ap_" + self.mode], run_id=self.rid, status_id=1,
msg='AP profile for ' + mode + ' tests created successfully')
self.test_cases["ap_"+self.mode] = "passed"
self.test_cases["ap_" + self.mode] = "passed"
except Exception as ex:
print(ex)
logging.error(logging.traceback.format_exc())
create_ap_profile = "error"
print("Error creating AP profile for bridge tests. Will use existing AP profile")
self.client.update_testrail(case_id=self.test_cases["ap_"+self.mode], run_id=self.rid, status_id=5,
self.client.update_testrail(case_id=self.test_cases["ap_" + self.mode], run_id=self.rid, status_id=5,
msg='AP profile for ' + mode + ' tests could not be created using API')
self.test_cases["ap_"+self.mode] = "failed"
self.test_cases["ap_" + self.mode] = "failed"
self.ap_profile = self.cloud.set_ap_profile(eq_id, self.test_profile_id)