cicd changes with latest new files, ultiple client connectivity with WPA, WPA2, EAP auth across NAT and bridge mode with vlan

Signed-off-by: Syama <syama.devi@connectus.ai>
This commit is contained in:
Syama
2020-11-20 11:25:40 -05:00
parent 33b8c7ff98
commit a66a0ac2a9
8 changed files with 2488 additions and 0 deletions

View File

@@ -0,0 +1,399 @@
import csv
import sys
import time
import datetime
from datetime import date
import json
import os
import logging
import single_client_throughput
import cloudsdk
from cloudsdk import CloudSDK
import lab_ap_info
cloudSDK_url=os.getenv('CLOUD_SDK_URL')
station = ["tput5000"]
runtime = 10
csv_path=os.getenv('CSV_PATH')
#EAP Credentials
identity=os.getenv('EAP_IDENTITY')
ttls_password=os.getenv('EAP_PWD')
local_dir=os.getenv('TPUT_LOG_DIR')
logger = logging.getLogger('Throughput_Test')
hdlr = logging.FileHandler(local_dir+"/Throughput_Testing.log")
formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s')
hdlr.setFormatter(formatter)
logger.addHandler(hdlr)
logger.setLevel(logging.INFO)
if sys.version_info[0] != 3:
print("This script requires Python 3")
exit(1)
if 'py-json' not in sys.path:
sys.path.append('../../py-json')
def throughput_csv(csv_file, ssid_name, ap_model, mimo, firmware, security, mode, client_tput):
#parse client_tput list returned from single_client_throughput
udp_ds = client_tput[0].partition(": ")[2]
udp_us = client_tput[1].partition(": ")[2]
tcp_ds = client_tput[2].partition(": ")[2]
tcp_us = client_tput[3].partition(": ")[2]
# Find band for CSV ---> This code is not great, it SHOULD get that info from LANForge!
if "5G" in ssid_name:
frequency = "5 GHz"
elif "2dot4G" in ssid_name:
frequency = "2.4 GHz"
else:
frequency = "Unknown"
# Append row to top of CSV file
row = [ap_model, firmware, frequency, mimo, security, mode, udp_ds, udp_us, tcp_ds, tcp_us]
with open(csv_file, 'r') as readFile:
reader = csv.reader(readFile)
lines = list(reader)
lines.insert(1, row)
with open(csv_file, 'w') as writeFile:
writer = csv.writer(writeFile)
writer.writerows(lines)
readFile.close()
writeFile.close()
#Import dictionaries for AP Info
from lab_ap_info import equipment_id_dict
from lab_ap_info import profile_info_dict
from lab_ap_info import ap_models
from lab_ap_info import mimo_2dot4g
from lab_ap_info import mimo_5g
#import json file to determine if throughput should be run for specific AP model
sanity_status = json.load(open("sanity_status.json"))
#bearer = CloudSDK.get_bearer(cloudSDK_url)
#print(bearer)
#create CSV file for test run
today = str(date.today())
csv_file = csv_path+"throughput_test_"+today+".csv"
headers = ['AP Type', 'Firmware','Radio', 'MIMO', 'Security', 'Mode', 'UDP Downstream (Mbps)', 'UDP Upstream (Mbps)', 'TCP Downstream (Mbps)', 'TCP Upstream (Mbps)']
with open(csv_file, "w") as file:
create = csv.writer(file)
create.writerow(headers)
file.close()
ap_firmware_dict = {
"ea8300": '',
"ecw5211": '',
"ecw5410": '',
"ec420": ''
}
logger.info('Start of Throughput Test')
for key in equipment_id_dict:
if sanity_status['sanity_status'][key] == "passed":
logger.info("Running throughput test on " + key)
##Get Bearer Token to make sure its valid (long tests can require re-auth)
bearer = CloudSDK.get_bearer(cloudSDK_url)
###Get Current AP Firmware
customer_id = "2"
equipment_id = equipment_id_dict[key]
ap_fw = CloudSDK.ap_firmware(customer_id, equipment_id, cloudSDK_url, bearer)
fw_model = ap_fw.partition("-")[0]
print("AP MODEL UNDER TEST IS", fw_model)
print('Current AP Firmware:', ap_fw)
##add current FW to dictionary
ap_firmware_dict[fw_model] = ap_fw
###########################################################################
############## Bridge Throughput Testing #################################
###########################################################################
print("Testing for Bridge SSIDs")
logger.info("Starting Brdige SSID tput tests on " + key)
###Set Proper AP Profile for Bridge SSID Tests
test_profile_id = profile_info_dict[fw_model]["profile_id"]
#print(test_profile_id)
ap_profile = CloudSDK.set_ap_profile(equipment_id, test_profile_id, cloudSDK_url, bearer)
### Wait for Profile Push
print('-----------------PROFILE PUSH -------------------')
time.sleep(180)
# 5G WPA2 Enterprise UDP DS/US and TCP DS/US
ap_model = fw_model
firmware = ap_fw
sta_list = station
radio = "wiphy3"
ssid_name = profile_info_dict[fw_model]["fiveG_WPA2-EAP_SSID"]
security = "wpa2"
eap_type = "TTLS"
mode = "Bridge"
mimo = mimo_5g[fw_model]
client_tput = single_client_throughput.eap_tput(sta_list, ssid_name, radio, security, eap_type, identity, ttls_password)
print(fw_model, "5 GHz WPA2-EAP throughput:\n", client_tput)
security = "wpa2-eap"
throughput_csv(csv_file, ssid_name, ap_model, mimo, firmware, security, mode, client_tput)
#5G WPA2 UDP DS/US and TCP DS/US
ap_model = fw_model
firmware = ap_fw
radio = "wiphy3"
ssid_name = profile_info_dict[fw_model]["fiveG_WPA2_SSID"]
ssid_psk = profile_info_dict[fw_model]["fiveG_WPA2_PSK"]
security = "wpa2"
mode = "Bridge"
mimo = mimo_5g[fw_model]
client_tput = single_client_throughput.main(ap_model, firmware, radio, ssid_name, ssid_psk, security, station, runtime)
print(fw_model, "5 GHz WPA2 throughput:\n",client_tput)
security = "wpa2-psk"
throughput_csv(csv_file, ssid_name, ap_model, mimo, firmware, security, mode, client_tput)
# 5G WPA UDP DS/US and TCP DS/US
ap_model = fw_model
firmware = ap_fw
radio = "wiphy3"
ssid_name = profile_info_dict[fw_model]["fiveG_WPA_SSID"]
ssid_psk = profile_info_dict[fw_model]["fiveG_WPA_PSK"]
security = "wpa"
mode = "Bridge"
mimo = mimo_5g[fw_model]
client_tput = single_client_throughput.main(ap_model, firmware, radio, ssid_name, ssid_psk, security, station, runtime)
print(fw_model, "5 GHz WPA throughput:\n",client_tput)
security = "wpa-psk"
throughput_csv(csv_file, ssid_name, ap_model, mimo, firmware, security, mode, client_tput)
# 5G Open UDP DS/US and TCP DS/US
# ap_model = fw_model
# firmware = ap_fw
# radio = "wiphy3"
# ssid_name = profile_info_dict[fw_model]["fiveG_OPEN_SSID"]
# ssid_psk = "BLANK"
# security = "open"
#mode = "Bridge"
#mimo = mimo_5g[fw_model]
# client_tput = single_client_throughput.main(ap_model, firmware, radio, ssid_name, ssid_psk, security, station, runtime)
#print(fw_model, "5 GHz Open throughput:\n",client_tput)
#throughput_csv(csv_file, ssid_name, ap_model, mimo, firmware, security, mode, client_tput)
# 2.4G WPA2 Enterprise UDP DS/US and TCP DS/US
ap_model = fw_model
firmware = ap_fw
sta_list = station
radio = "wiphy0"
ssid_name = profile_info_dict[fw_model]["twofourG_WPA2-EAP_SSID"]
security = "wpa2"
eap_type = "TTLS"
mode = "Bridge"
mimo = mimo_2dot4g[fw_model]
client_tput = single_client_throughput.eap_tput(sta_list, ssid_name, radio, security, eap_type, identity,
ttls_password)
print(fw_model, "2.4 GHz WPA2-EAP throughput:\n", client_tput)
security = "wpa2-eap"
throughput_csv(csv_file, ssid_name, ap_model, mimo, firmware, security, mode, client_tput)
# 2.4G WPA2 UDP DS/US and TCP DS/US
ap_model = fw_model
firmware = ap_fw
radio = "wiphy0"
ssid_name = profile_info_dict[fw_model]["twoFourG_WPA2_SSID"]
ssid_psk = profile_info_dict[fw_model]["twoFourG_WPA2_PSK"]
security = "wpa2"
mode = "Bridge"
mimo = mimo_2dot4g[fw_model]
client_tput = single_client_throughput.main(ap_model, firmware, radio, ssid_name, ssid_psk, security, station, runtime)
print(fw_model, "2.4 GHz WPA2 throughput:\n",client_tput)
security = "wpa2-psk"
throughput_csv(csv_file, ssid_name, ap_model, mimo, firmware, security, mode, client_tput)
# 2.4G WPA UDP DS/US and TCP DS/US
ap_model = fw_model
firmware = ap_fw
radio = "wiphy0"
ssid_name = profile_info_dict[fw_model]["twoFourG_WPA_SSID"]
ssid_psk = profile_info_dict[fw_model]["twoFourG_WPA_PSK"]
security = "wpa"
mode = "Bridge"
mimo = mimo_2dot4g[fw_model]
client_tput = single_client_throughput.main(ap_model, firmware, radio, ssid_name, ssid_psk, security, station, runtime)
print(fw_model, "2.4 GHz WPA throughput:\n",client_tput)
security = "wpa-psk"
throughput_csv(csv_file, ssid_name, ap_model, mimo, firmware, security, mode, client_tput)
# 2.4G Open UDP DS/US and TCP DS/US
#ap_model = fw_model
#firmware = ap_fw
# radio = "wiphy3"
# ssid_name = profile_info_dict[fw_model]["twoFourG_OPEN_SSID"]
# ssid_psk = "BLANK"
# security = "open"
#mode = "Bridge"
#mimo = mimo_2dot4g[fw_model]
#client_tput = single_client_throughput.main(ap_model, firmware, radio, ssid_name, ssid_psk, security, station, runtime, csv_file)
#print(fw_model, "2.4 GHz Open throughput:\n",client_tput)
#throughput_csv(csv_file, ssid_name, ap_model, mimo, firmware, security, mode, client_tput)
###########################################################################
################# NAT Mode Throughput Testing ############################
###########################################################################
print('Testing for NAT SSIDs')
logger.info("Starting NAT SSID tput tests on " + key)
###Set Proper AP Profile for NAT SSID Tests
test_profile_id = profile_info_dict[fw_model + '_nat']["profile_id"]
print(test_profile_id)
ap_profile = CloudSDK.set_ap_profile(equipment_id, test_profile_id, cloudSDK_url, bearer)
### Wait for Profile Push
print('-----------------PROFILE PUSH -------------------')
time.sleep(180)
# 5G WPA2 Enterprise UDP DS/US and TCP DS/US
ap_model = fw_model
firmware = ap_fw
sta_list = station
radio = "wiphy3"
ssid_name = profile_info_dict[fw_model+'_nat']["fiveG_WPA2-EAP_SSID"]
security = "wpa2"
eap_type = "TTLS"
mode = "NAT"
mimo = mimo_5g[fw_model]
client_tput = single_client_throughput.eap_tput(sta_list, ssid_name, radio, security, eap_type, identity,
ttls_password)
print(fw_model, "5 GHz WPA2-EAP NAT throughput:\n", client_tput)
security = "wpa2-eap"
throughput_csv(csv_file, ssid_name, ap_model, mimo, firmware, security, mode, client_tput)
# 5G WPA2 NAT UDP DS/US and TCP DS/US
ap_model = fw_model
firmware = ap_fw
radio = "wiphy3"
ssid_name = profile_info_dict[fw_model+'_nat']["fiveG_WPA2_SSID"]
ssid_psk = profile_info_dict[fw_model+'_nat']["fiveG_WPA2_PSK"]
security = "wpa2"
mode = "NAT"
mimo = mimo_5g[fw_model]
security = "wpa2-psk"
client_tput = single_client_throughput.main(ap_model, firmware, radio, ssid_name, ssid_psk, security, station,
runtime)
print(fw_model, "5 GHz WPA2 NAT throughput:\n", client_tput)
throughput_csv(csv_file, ssid_name, ap_model, mimo, firmware, security, mode, client_tput)
# 5G WPA UDP DS/US and TCP DS/US
ap_model = fw_model
firmware = ap_fw
radio = "wiphy3"
ssid_name = profile_info_dict[fw_model+'_nat']["fiveG_WPA_SSID"]
ssid_psk = profile_info_dict[fw_model+'_nat']["fiveG_WPA_PSK"]
security = "wpa"
mode = "NAT"
mimo = mimo_5g[fw_model]
client_tput = single_client_throughput.main(ap_model, firmware, radio, ssid_name, ssid_psk, security, station,
runtime)
print(fw_model, "5 GHz WPA NAT throughput:\n", client_tput)
security = "wpa-psk"
throughput_csv(csv_file, ssid_name, ap_model, mimo, firmware, security, mode, client_tput)
# 5G Open UDP DS/US and TCP DS/US
# ap_model = fw_model
# firmware = ap_fw
# radio = "wiphy3"
# ssid_name = profile_info_dict[fw_model+'_nat']["fiveG_OPEN_SSID"]
# ssid_psk = "BLANK"
# security = "open"
# mode = "NAT"
#mimo = mimo_5g[fw_model]
# client_tput = single_client_throughput.main(ap_model, firmware, radio, ssid_name, ssid_psk, security, station, runtime)
# print(fw_model, "5 GHz Open NAT throughput:\n",client_tput)
# throughput_csv(csv_file, ssid_name, ap_model, mimo, firmware, security, mode, client_tput)
# 2.4G WPA2 Enterprise UDP DS/US and TCP DS/US
ap_model = fw_model
firmware = ap_fw
sta_list = station
radio = "wiphy0"
ssid_name = profile_info_dict[fw_model+'_nat']["twofourG_WPA2-EAP_SSID"]
security = "wpa2"
eap_type = "TTLS"
mode = "NAT"
mimo = mimo_2dot4g[fw_model]
client_tput = single_client_throughput.eap_tput(sta_list, ssid_name, radio, security, eap_type, identity,
ttls_password)
print(fw_model, "2.4 GHz WPA2-EAP NAT throughput:\n", client_tput)
security = "wpa2-eap"
throughput_csv(csv_file, ssid_name, ap_model, mimo, firmware, security, mode, client_tput)
# 2.4G WPA2 UDP DS/US and TCP DS/US
ap_model = fw_model
firmware = ap_fw
radio = "wiphy0"
ssid_name = profile_info_dict[fw_model+'_nat']["twoFourG_WPA2_SSID"]
ssid_psk = profile_info_dict[fw_model+'_nat']["twoFourG_WPA2_PSK"]
security = "wpa2"
mode = "NAT"
mimo = mimo_2dot4g[fw_model]
client_tput = single_client_throughput.main(ap_model, firmware, radio, ssid_name, ssid_psk, security, station,
runtime)
print(fw_model, "2.4 GHz WPA2 NAT throughput:\n", client_tput)
security = "wpa2-psk"
throughput_csv(csv_file, ssid_name, ap_model, mimo, firmware, security, mode, client_tput)
# 2.4G WPA UDP DS/US and TCP DS/US
ap_model = fw_model
firmware = ap_fw
radio = "wiphy0"
ssid_name = profile_info_dict[fw_model+'_nat']["twoFourG_WPA_SSID"]
ssid_psk = profile_info_dict[fw_model+'_nat']["twoFourG_WPA_PSK"]
security = "wpa"
mode = "NAT"
mimo = mimo_2dot4g[fw_model]
client_tput = single_client_throughput.main(ap_model, firmware, radio, ssid_name, ssid_psk, security, station,
runtime)
print(fw_model, "2.4 GHz WPA NAT throughput:\n", client_tput)
security = "wpa-psk"
throughput_csv(csv_file, ssid_name, ap_model, mimo, firmware, security, mode, client_tput)
# 2.4G Open NAT UDP DS/US and TCP DS/US
# ap_model = fw_model
# firmware = ap_fw
# radio = "wiphy3"
# ssid_name = profile_info_dict[fw_model+'_nat']["twoFourG_OPEN_SSID"]
# ssid_psk = "BLANK"
# security = "open"
# mode = "NAT"
#mimo = mimo_2dot4g[fw_model]
# client_tput = single_client_throughput.main(ap_model, firmware, radio, ssid_name, ssid_psk, security, station, runtime, csv_file)
# print(fw_model, "2.4 GHz Open NAT throughput:\n",client_tput)
# throughput_csv(csv_file, ssid_name, ap_model, mimo, firmware, security, mode, client_tput)
#Indicates throughput has been run for AP model
sanity_status['sanity_status'][key] = "tput run"
logger.info("Trhoughput tests complete on " + key)
elif sanity_status['sanity_status'][key] == "tput run":
print("Throughput test already run on", key)
logger.info("Throughput test already run on "+ key +" for latest AP FW")
else:
print(key,"did not pass Nightly Sanity. Skipping throughput test on this AP Model")
logger.info(key+" did not pass Nightly Sanity. Skipping throughput test.")
#Indicate which AP model has had tput test to external json file
with open('sanity_status.json', 'w') as json_file:
json.dump(sanity_status, json_file)
with open(csv_file, 'r') as readFile:
reader = csv.reader(readFile)
lines = list(reader)
row_count = len(lines)
#print(row_count)
if row_count <= 1:
os.remove(csv_file)
file.close()
else:
print("Saving File")
file.close()
print(" -- Throughput Testing Complete -- ")

View File

@@ -0,0 +1,65 @@
import paramiko
from paramiko import SSHClient
import socket
def ssh_cli_active_fw(ap_ip, username, password):
try:
client = paramiko.SSHClient()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
client.connect(ap_ip, username=username, password=password, timeout=5)
stdin, stdout, stderr = client.exec_command('/usr/opensync/bin/ovsh s AWLAN_Node -c | grep FW_IMAGE_ACTIVE')
version_matrix = str(stdout.read())
version_matrix_split = version_matrix.partition('FW_IMAGE_ACTIVE","')[2]
cli_active_fw = version_matrix_split.partition('"],[')[0]
#print("Active FW is",cli_active_fw)
stdin, stdout, stderr = client.exec_command('/usr/opensync/bin/ovsh s Manager -c | grep status')
status = str(stdout.read())
if "ACTIVE" in status:
#print("AP is in Active state")
state = "active"
elif "BACKOFF" in status:
#print("AP is in Backoff state")
state = "backoff"
else:
#print("AP is not in Active state")
state = "unknown"
cli_info = {
"state": state,
"active_fw": cli_active_fw
}
return(cli_info)
except paramiko.ssh_exception.AuthenticationException:
print("Authentication Error, Check Credentials")
return "ERROR"
except paramiko.SSHException:
print("Cannot SSH to the AP")
return "ERROR"
except socket.timeout:
print("AP Unreachable")
return "ERROR"
def iwinfo_status(ap_ip, username, password):
try:
client = paramiko.SSHClient()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
client.connect(ap_ip, username=username, password=password, timeout=5)
stdin, stdout, stderr = client.exec_command('iwinfo | grep ESSID')
for line in stdout.read().splitlines():
print(line)
except paramiko.ssh_exception.AuthenticationException:
print("Authentication Error, Check Credentials")
return "ERROR"
except paramiko.SSHException:
print("Cannot SSH to the AP")
return "ERROR"
except socket.timeout:
print("AP Unreachable")
return "ERROR"

View File

@@ -0,0 +1,152 @@
#!/usr/bin/python3
import base64
import urllib.request
from bs4 import BeautifulSoup
import ssl
import subprocess, os
from artifactory import ArtifactoryPath
import tarfile
import paramiko
from paramiko import SSHClient
from scp import SCPClient
import os
import pexpect
from pexpect import pxssh
import sys
import paramiko
from scp import SCPClient
import pprint
from pprint import pprint
from os import listdir
import re
import requests
import json
import testrail_api
import logging
import datetime
import time
user=os.getenv('CLOUDSDK_USER')
password=os.getenv('CLOUDSDK_PWD')
###Class for CloudSDK Interaction via RestAPI
class CloudSDK:
def __init__(self):
self.user = user
def get_bearer(cloudSDK_url):
cloud_login_url = cloudSDK_url+"/management/v1/oauth2/token"
payload = '''
{
"userId": "'''+user+'''",
"password": "'''+password+'''"
}
'''
headers = {
'Content-Type': 'application/json'
}
try:
token_response = requests.request("POST", cloud_login_url, headers=headers, data=payload)
except requests.exceptions.RequestException as e:
raise SystemExit("Exiting Script! Cloud not get bearer token for reason:",e)
token_data = token_response.json()
bearer_token = token_data['access_token']
return(bearer_token)
def ap_firmware(customer_id,equipment_id, cloudSDK_url, bearer):
equip_fw_url = cloudSDK_url+"/portal/status/forEquipment?customerId="+customer_id+"&equipmentId="+equipment_id
payload = {}
headers = {
'Authorization': 'Bearer ' + bearer
}
status_response = requests.request("GET", equip_fw_url, headers=headers, data=payload)
status_code = status_response.status_code
if status_code is 200:
status_data = status_response.json()
#print(status_data)
current_ap_fw = status_data[2]['details']['reportedSwVersion']
return current_ap_fw
else:
return("ERROR")
def CloudSDK_images(apModel, cloudSDK_url, bearer):
getFW_url = cloudSDK_url+"/portal/firmware/version/byEquipmentType?equipmentType=AP&modelId=" + apModel
payload = {}
headers = {
'Authorization': 'Bearer ' + bearer
}
response = requests.request("GET", getFW_url, headers=headers, data=payload)
ap_fw_details = response.json()
###return ap_fw_details
fwlist = []
for version in ap_fw_details:
fwlist.append(version.get('versionName'))
return(fwlist)
#fw_versionNames = ap_fw_details[0]['versionName']
#return fw_versionNames
def firwmare_upload(commit, apModel,latest_image,fw_url,cloudSDK_url,bearer):
fw_upload_url = cloudSDK_url+"/portal/firmware/version"
payload = "{\n \"model_type\": \"FirmwareVersion\",\n \"id\": 0,\n \"equipmentType\": \"AP\",\n \"modelId\": \""+apModel+"\",\n \"versionName\": \""+latest_image+"\",\n \"description\": \"\",\n \"filename\": \""+fw_url+"\",\n \"commit\": \""+commit+"\",\n \"validationMethod\": \"MD5_CHECKSUM\",\n \"validationCode\": \"19494befa87eb6bb90a64fd515634263\",\n \"releaseDate\": 1596192028877,\n \"createdTimestamp\": 0,\n \"lastModifiedTimestamp\": 0\n}\n\n"
headers = {
'Content-Type': 'application/json',
'Authorization': 'Bearer ' + bearer
}
response = requests.request("POST", fw_upload_url, headers=headers, data=payload)
#print(response)
upload_result = response.json()
return(upload_result)
def get_firmware_id(latest_ap_image, cloudSDK_url, bearer):
#print(latest_ap_image)
fw_id_url = cloudSDK_url+"/portal/firmware/version/byName?firmwareVersionName="+latest_ap_image
payload = {}
headers = {
'Authorization': 'Bearer ' + bearer
}
response = requests.request("GET", fw_id_url, headers=headers, data=payload)
fw_data = response.json()
latest_fw_id = fw_data['id']
return latest_fw_id
def update_firmware(equipment_id, latest_firmware_id, cloudSDK_url, bearer):
url = cloudSDK_url+"/portal/equipmentGateway/requestFirmwareUpdate?equipmentId="+equipment_id+"&firmwareVersionId="+latest_firmware_id
payload = {}
headers = {
'Authorization': 'Bearer ' + bearer
}
response = requests.request("POST", url, headers=headers, data=payload)
#print(response.text)
return response.json()
def set_ap_profile(equipment_id, test_profile_id, cloudSDK_url, bearer):
###Get AP Info
url = cloudSDK_url+"/portal/equipment?equipmentId="+equipment_id
payload = {}
headers = {
'Authorization': 'Bearer ' + bearer
}
response = requests.request("GET", url, headers=headers, data=payload)
print(response)
###Add Lab Profile ID to Equipment
equipment_info = response.json()
#print(equipment_info)
equipment_info["profileId"] = test_profile_id
#print(equipment_info)
###Update AP Info with Required Profile ID
url = cloudSDK_url+"/portal/equipment"
headers = {
'Content-Type': 'application/json',
'Authorization': 'Bearer ' + bearer
}
response = requests.request("PUT", url, headers=headers, data=json.dumps(equipment_info))
#print(response)

View File

@@ -0,0 +1,365 @@
#!/usr/bin/env python3
# This will create a station, create TCP and UDP traffic, run it a short amount of time,
# and verify whether traffic was sent and received. It also verifies the station connected
# to the requested BSSID if bssid is specified as an argument.
# The script will clean up the station and connections at the end of the test.
import sys
if sys.version_info[0] != 3:
print("This script requires Python 3")
exit(1)
if 'py-json' not in sys.path:
sys.path.append('../../py-json')
import argparse
import LANforge
from LANforge import LFUtils
# from LANforge import LFCliBase
from LANforge import lfcli_base
from LANforge.lfcli_base import LFCliBase
from LANforge.LFUtils import *
import realm
from realm import Realm
import pprint
OPEN="open"
WEP="wep"
WPA="wpa"
WPA2="wpa2"
MODE_AUTO=0
class EAPConnect(LFCliBase):
def __init__(self, host, port, security=None, ssid=None, sta_list=None, number_template="00000", _debug_on=False, _dut_bssid="",
_exit_on_error=False, _sta_name=None, _resource=1, radio="wiphy0", key_mgmt="WPA-EAP", eap="", identity="",
ttls_passwd="", hessid=None, ttls_realm="", domain="", _exit_on_fail=False, _cleanup_on_exit=True):
super().__init__(host, port, _debug=_debug_on, _halt_on_error=_exit_on_error, _exit_on_fail=_exit_on_fail)
self.host = host
self.port = port
self.ssid = ssid
self.radio = radio
self.security = security
#self.password = password
self.sta_list = sta_list
self.key_mgmt = key_mgmt
self.eap = eap
self.identity = identity
self.ttls_passwd = ttls_passwd
self.ttls_realm = ttls_realm
self.domain = domain
self.hessid = hessid
self.dut_bssid = _dut_bssid
self.timeout = 120
self.number_template = number_template
self.debug = _debug_on
self.local_realm = realm.Realm(lfclient_host=self.host, lfclient_port=self.port)
self.station_profile = self.local_realm.new_station_profile()
self.station_profile.lfclient_url = self.lfclient_url
self.station_profile.ssid = self.ssid
self.station_profile.security = self.security
self.station_profile.number_template_ = self.number_template
self.station_profile.mode = 0
#Added to test_ipv4_ttls code
self.upstream_url = None # defer construction
self.sta_url_map = None
self.upstream_resource = None
self.upstream_port = "eth2"
self.station_names = []
if _sta_name is not None:
self.station_names = [_sta_name]
self.localrealm = Realm(lfclient_host=host, lfclient_port=port)
self.resource = _resource
self.cleanup_on_exit = _cleanup_on_exit
self.resulting_stations = {}
self.resulting_endpoints = {}
self.station_profile = None
self.l3_udp_profile = None
self.l3_tcp_profile = None
# def get_realm(self) -> Realm: # py > 3.6
def get_realm(self):
return self.localrealm
def get_station_url(self, sta_name_=None):
if sta_name_ is None:
raise ValueError("get_station_url wants a station name")
if self.sta_url_map is None:
self.sta_url_map = {}
for sta_name in self.station_names:
self.sta_url_map[sta_name] = "port/1/%s/%s" % (self.resource, sta_name)
return self.sta_url_map[sta_name_]
def get_upstream_url(self):
if self.upstream_url is None:
self.upstream_url = "port/1/%s/%s" % (self.upstream_resource, self.upstream_port)
return self.upstream_url
# Compare pre-test values to post-test values
def compare_vals(self, name, postVal, print_pass=False, print_fail=True):
# print(f"Comparing {name}")
if postVal > 0:
self._pass("%s %s" % (name, postVal), print_pass)
else:
self._fail("%s did not report traffic: %s" % (name, postVal), print_fail)
def remove_stations(self):
for name in self.station_names:
LFUtils.removePort(self.resource, name, self.lfclient_url)
def num_associated(self, bssid):
counter = 0
# print("there are %d results" % len(self.station_results))
fields = "_links,port,alias,ip,ap,port+type"
self.station_results = self.localrealm.find_ports_like("eap*", fields, debug_=False)
if (self.station_results is None) or (len(self.station_results) < 1):
self.get_failed_result_list()
for eid,record in self.station_results.items():
#print("-- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- ")
#pprint(eid)
#pprint(record)
if record["ap"] == bssid:
counter += 1
#print("-- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- ")
return counter
def clear_test_results(self):
self.resulting_stations = {}
self.resulting_endpoints = {}
super().clear_test_results()
#super(StaConnect, self).clear_test_results().test_results.clear()
def setup(self):
self.clear_test_results()
self.check_connect()
upstream_json = self.json_get("%s?fields=alias,phantom,down,port,ip" % self.get_upstream_url(), debug_=False)
if upstream_json is None:
self._fail(message="Unable to query %s, bye" % self.upstream_port, print_=True)
return False
if upstream_json['interface']['ip'] == "0.0.0.0":
if self.debug:
pprint.pprint(upstream_json)
self._fail("Warning: %s lacks ip address" % self.get_upstream_url(), print_=True)
return False
# remove old stations
print("Removing old stations")
for sta_name in self.station_names:
sta_url = self.get_station_url(sta_name)
response = self.json_get(sta_url)
if (response is not None) and (response["interface"] is not None):
for sta_name in self.station_names:
LFUtils.removePort(self.resource, sta_name, self.lfclient_url)
LFUtils.wait_until_ports_disappear(self.lfclient_url, self.station_names)
# Create stations and turn dhcp on
self.station_profile = self.localrealm.new_station_profile()
# Build stations
self.station_profile.use_security(self.security, self.ssid, passwd="[BLANK]")
self.station_profile.set_number_template(self.number_template)
print("Creating stations")
self.station_profile.set_command_flag("add_sta", "create_admin_down", 1)
self.station_profile.set_command_param("set_port", "report_timer", 1500)
self.station_profile.set_command_flag("set_port", "rpt_timer", 1)
self.station_profile.set_wifi_extra(key_mgmt=self.key_mgmt, eap=self.eap, identity=self.identity,
passwd=self.ttls_passwd,
realm=self.ttls_realm, domain=self.domain,
hessid=self.hessid)
self.station_profile.create(radio=self.radio, sta_names_=self.sta_list, debug=self.debug, use_radius=True, hs20_enable=False)
self._pass("PASS: Station build finished")
# Create UDP endpoints
self.l3_udp_profile = self.localrealm.new_l3_cx_profile()
self.l3_udp_profile.side_a_min_bps = 128000
self.l3_udp_profile.side_b_min_bps = 128000
self.l3_udp_profile.side_a_min_pdu = 1200
self.l3_udp_profile.side_b_min_pdu = 1500
self.l3_udp_profile.report_timer = 1000
self.l3_udp_profile.name_prefix = "udp"
self.l3_udp_profile.create(endp_type="lf_udp",
side_a=list(self.localrealm.find_ports_like("eap+")),
side_b="%d.%s" % (self.resource, self.upstream_port),
suppress_related_commands=True)
# Create TCP endpoints
self.l3_tcp_profile = self.localrealm.new_l3_cx_profile()
self.l3_tcp_profile.side_a_min_bps = 128000
self.l3_tcp_profile.side_b_min_bps = 56000
self.l3_tcp_profile.name_prefix = "tcp"
self.l3_tcp_profile.report_timer = 1000
self.l3_tcp_profile.create(endp_type="lf_tcp",
side_a=list(self.localrealm.find_ports_like("eap+")),
side_b="%d.%s" % (self.resource, self.upstream_port),
suppress_related_commands=True)
def start(self):
if self.station_profile is None:
self._fail("Incorrect setup")
pprint.pprint(self.station_profile)
if self.station_profile.up is None:
self._fail("Incorrect station profile, missing profile.up")
if self.station_profile.up == False:
print("\nBringing ports up...")
data = {"shelf": 1,
"resource": self.resource,
"port": "ALL",
"probe_flags": 1}
self.json_post("/cli-json/nc_show_ports", data)
self.station_profile.admin_up()
LFUtils.waitUntilPortsAdminUp(self.resource, self.lfclient_url, self.station_names)
# station_info = self.jsonGet(self.mgr_url, "%s?fields=port,ip,ap" % (self.getStaUrl()))
duration = 0
maxTime = 60
ip = "0.0.0.0"
ap = ""
print("Waiting for %s stations to associate to AP: " % len(self.station_names), end="")
connected_stations = {}
while (len(connected_stations.keys()) < len(self.station_names)) and (duration < maxTime):
duration += 3
time.sleep(3)
print(".", end="")
for sta_name in self.station_names:
sta_url = self.get_station_url(sta_name)
station_info = self.json_get(sta_url + "?fields=port,ip,ap")
# LFUtils.debug_printer.pprint(station_info)
if (station_info is not None) and ("interface" in station_info):
if "ip" in station_info["interface"]:
ip = station_info["interface"]["ip"]
if "ap" in station_info["interface"]:
ap = station_info["interface"]["ap"]
if (ap == "Not-Associated") or (ap == ""):
if self.debug:
print(" -%s," % sta_name, end="")
else:
if ip == "0.0.0.0":
if self.debug:
print(" %s (0.0.0.0)" % sta_name, end="")
else:
connected_stations[sta_name] = sta_url
data = {
"shelf":1,
"resource": self.resource,
"port": "ALL",
"probe_flags": 1
}
self.json_post("/cli-json/nc_show_ports", data)
for sta_name in self.station_names:
sta_url = self.get_station_url(sta_name)
station_info = self.json_get(sta_url) # + "?fields=port,ip,ap")
if station_info is None:
print("unable to query %s" % sta_url)
self.resulting_stations[sta_url] = station_info
ap = station_info["interface"]["ap"]
ip = station_info["interface"]["ip"]
if (ap != "") and (ap != "Not-Associated"):
print(" %s +AP %s, " % (sta_name, ap), end="")
if self.dut_bssid != "":
if self.dut_bssid.lower() == ap.lower():
self._pass(sta_name+" connected to BSSID: " + ap)
# self.test_results.append("PASSED: )
# print("PASSED: Connected to BSSID: "+ap)
else:
self._fail("%s connected to wrong BSSID, requested: %s Actual: %s" % (sta_name, self.dut_bssid, ap))
else:
self._fail(sta_name+" did not connect to AP")
return False
if ip == "0.0.0.0":
self._fail("%s did not get an ip. Ending test" % sta_name)
else:
self._pass("%s connected to AP: %s With IP: %s" % (sta_name, ap, ip))
if self.passes() == False:
if self.cleanup_on_exit:
print("Cleaning up...")
self.remove_stations()
return False
# start cx traffic
print("\nStarting CX Traffic")
self.l3_udp_profile.start_cx()
self.l3_tcp_profile.start_cx()
time.sleep(1)
# Refresh stats
self.l3_udp_profile.refresh_cx()
self.l3_tcp_profile.refresh_cx()
def collect_endp_stats(self, endp_map):
print("Collecting Data")
fields="?fields=name,tx+bytes,rx+bytes"
for (cx_name, endps) in endp_map.items():
try:
endp_url = "/endp/%s%s" % (endps[0], fields)
endp_json = self.json_get(endp_url)
self.resulting_endpoints[endp_url] = endp_json
ptest_a_tx = endp_json['endpoint']['tx bytes']
ptest_a_rx = endp_json['endpoint']['rx bytes']
#ptest = self.json_get("/endp/%s?fields=tx+bytes,rx+bytes" % cx_names[cx_name]["b"])
endp_url = "/endp/%s%s" % (endps[1], fields)
endp_json = self.json_get(endp_url)
self.resulting_endpoints[endp_url] = endp_json
ptest_b_tx = endp_json['endpoint']['tx bytes']
ptest_b_rx = endp_json['endpoint']['rx bytes']
self.compare_vals("testTCP-A TX", ptest_a_tx)
self.compare_vals("testTCP-A RX", ptest_a_rx)
self.compare_vals("testTCP-B TX", ptest_b_tx)
self.compare_vals("testTCP-B RX", ptest_b_rx)
except Exception as e:
print("Is this the function having the error?")
self.error(e)
def stop(self):
# stop cx traffic
print("Stopping CX Traffic")
self.l3_udp_profile.stop_cx()
self.l3_tcp_profile.stop_cx()
# Refresh stats
print("\nRefresh CX stats")
self.l3_udp_profile.refresh_cx()
self.l3_tcp_profile.refresh_cx()
print("Sleeping for 5 seconds")
time.sleep(5)
# get data for endpoints JSON
self.collect_endp_stats(self.l3_udp_profile.created_cx)
self.collect_endp_stats(self.l3_tcp_profile.created_cx)
# print("\n")
def cleanup(self):
# remove all endpoints and cxs
if self.cleanup_on_exit:
for sta_name in self.station_names:
LFUtils.removePort(self.resource, sta_name, self.lfclient_url)
curr_endp_names = []
removeCX(self.lfclient_url, self.l3_udp_profile.get_cx_names())
removeCX(self.lfclient_url, self.l3_tcp_profile.get_cx_names())
for (cx_name, endp_names) in self.l3_udp_profile.created_cx.items():
curr_endp_names.append(endp_names[0])
curr_endp_names.append(endp_names[1])
for (cx_name, endp_names) in self.l3_tcp_profile.created_cx.items():
curr_endp_names.append(endp_names[0])
curr_endp_names.append(endp_names[1])
removeEndps(self.lfclient_url, curr_endp_names, debug= self.debug)
# ~class
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,248 @@
#!/usr/bin/python3
ap_models = ["ec420","ea8300","ecw5211","ecw5410"]
##AP Models for firmware upload
cloud_sdk_models = {
"ec420": "EC420-G1",
"ea8300": "EA8300-CA",
"ecw5211": "ECW5211",
"ecw5410": "ECW5410"
}
mimo_5g = {
"ec420": "4x4",
"ea8300": "2x2",
"ecw5211": "2x2",
"ecw5410": "4x4"
}
mimo_2dot4g = {
"ec420": "2x2",
"ea8300": "2x2",
"ecw5211": "2x2",
"ecw5410": "4x4"
}
sanity_status = {
"ea8300": "failed",
"ecw5211": 'passed',
"ecw5410": 'failed',
"ec420": 'failed'
}
##Equipment IDs for Lab APs under test
equipment_id_dict = {
"ea8300": "13",
"ecw5410": "12",
"ecw5211": "6",
"ec420": "11"
}
equipment_ip_dict = {
"ea8300": "10.10.10.103",
"ecw5410": "10.10.10.105",
"ec420": "10.10.10.104",
"ecw5211": "10.10.10.102"
}
eqiupment_credentials_dict = {
"ea8300": "openwifi",
"ecw5410": "openwifi",
"ec420": "openwifi",
"ecw5211": "admin123"
}
###Testing AP Profile Information
profile_info_dict = {
"ecw5410": {
"profile_id": "2",
"fiveG_WPA2_SSID": "ECW5410_5G_WPA2",
"fiveG_WPA2_PSK": "Connectus123$",
"fiveG_WPA_SSID": "ECW5410_5G_WPA",
"fiveG_WPA_PSK": "Connectus123$",
"fiveG_OPEN_SSID": "ECW5410_5G_OPEN",
"fiveG_WPA2-EAP_SSID": "ECW5410_5G_WPA2-EAP",
"twoFourG_OPEN_SSID": "ECW5410_2dot4G_OPEN",
"twoFourG_WPA2_SSID": "ECW5410_2dot4G_WPA2",
"twoFourG_WPA2_PSK": "Connectus123$",
"twoFourG_WPA_SSID":"ECW5410_2dot4G_WPA",
"twoFourG_WPA_PSK": "Connectus123$",
"twoFourG_WPA2-EAP_SSID": "ECW5410_2dot4G_WPA2-EAP"
},
"ea8300": {
"profile_id": "153",
"fiveG_WPA2_SSID": "EA8300_5G_WPA2",
"fiveG_WPA2_PSK": "Connectus123$",
"fiveG_WPA_SSID": "EA8300_5G_WPA",
"fiveG_WPA_PSK": "Connectus123$",
"fiveG_OPEN_SSID": "EA8300_5G_OPEN",
"fiveG_WPA2-EAP_SSID": "EA8300_5G_WPA2-EAP",
"twoFourG_OPEN_SSID": "EA8300_2dot4G_OPEN",
"twoFourG_WPA2_SSID": "EA8300_2dot4G_WPA2",
"twoFourG_WPA2_PSK": "Connectus123$",
"twoFourG_WPA_SSID":"EA8300_2dot4G_WPA",
"twoFourG_WPA_PSK": "Connectus123$",
"twoFourG_WPA2-EAP_SSID": "EA8300_2dot4G_WPA2-EAP"
},
"ec420": {
"profile_id": "20",
"fiveG_WPA2_SSID": "EC420_5G_WPA2",
"fiveG_WPA2_PSK": "Connectus123$",
"fiveG_WPA_SSID": "EC420_5G_WPA",
"fiveG_WPA_PSK": "Connectus123$",
"fiveG_OPEN_SSID": "EC420_5G_OPEN",
"fiveG_WPA2-EAP_SSID": "EC420_5G_WPA2-EAP",
"twoFourG_OPEN_SSID": "EC420_2dot4G_OPEN",
"twoFourG_WPA2_SSID": "EC420_2dot4G_WPA2",
"twoFourG_WPA2_PSK": "Connectus123$",
"twoFourG_WPA_SSID":"EC420_2dot4G_WPA",
"twoFourG_WPA_PSK": "Connectus123$",
"twoFourG_WPA2-EAP_SSID": "EC420_2dot4G_WPA2-EAP"
},
"ecw5211": {
"profile_id": "27",
"fiveG_WPA2_SSID": "ECW5211_5G_WPA2",
"fiveG_WPA2_PSK": "Connectus123$",
"fiveG_WPA_SSID": "ECW5211_5G_WPA",
"fiveG_WPA_PSK": "Connectus123$",
"fiveG_OPEN_SSID": "ECW5211_5G_OPEN",
"fiveG_WPA2-EAP_SSID": "ECW5211_5G_WPA2-EAP",
"twoFourG_OPEN_SSID": "ECW5211_2dot4G_OPEN",
"twoFourG_WPA2_SSID": "ECW5211_2dot4G_WPA2",
"twoFourG_WPA2_PSK": "Connectus123$",
"twoFourG_WPA_SSID":"ECW5211_2dot4G_WPA",
"twoFourG_WPA_PSK": "Connectus123$",
"twoFourG_WPA2-EAP_SSID": "ECW5211_2dot4G_WPA2-EAP"
},
"ecw5410_nat": {
"profile_id": "68",
"fiveG_WPA2_SSID": "ECW5410_5G_WPA2_NAT",
"fiveG_WPA2_PSK": "Connectus123$",
"fiveG_WPA_SSID": "ECW5410_5G_WPA_NAT",
"fiveG_WPA_PSK": "Connectus123$",
"fiveG_OPEN_SSID": "ECW5410_5G_OPEN_NAT",
"fiveG_WPA2-EAP_SSID": "ECW5410_5G_WPA2-EAP_NAT",
"twoFourG_OPEN_SSID": "ECW5410_2dot4G_OPEN_NAT",
"twoFourG_WPA2_SSID": "ECW5410_2dot4G_WPA2_NAT",
"twoFourG_WPA2_PSK": "Connectus123$",
"twoFourG_WPA_SSID":"ECW5410_2dot4G_WPA_NAT",
"twoFourG_WPA_PSK": "Connectus123$",
"twoFourG_WPA2-EAP_SSID": "ECW5410_2dot4G_WPA2-EAP_NAT"
},
"ea8300_nat": {
"profile_id": "67",
"fiveG_WPA2_SSID": "EA8300_5G_WPA2_NAT",
"fiveG_WPA2_PSK": "Connectus123$",
"fiveG_WPA_SSID": "EA8300_5G_WPA_NAT",
"fiveG_WPA_PSK": "Connectus123$",
"fiveG_OPEN_SSID": "EA8300_5G_OPEN_NAT",
"fiveG_WPA2-EAP_SSID": "EA8300_5G_WPA2-EAP_NAT",
"twoFourG_OPEN_SSID": "EA8300_2dot4G_OPEN_NAT",
"twoFourG_WPA2_SSID": "EA8300_2dot4G_WPA2_NAT",
"twoFourG_WPA2_PSK": "Connectus123$",
"twoFourG_WPA_SSID":"EA8300_2dot4G_WPA_NAT",
"twoFourG_WPA_PSK": "Connectus123$",
"twoFourG_WPA2-EAP_SSID": "EA8300_2dot4G_WPA2-EAP_NAT"
},
"ec420_nat": {
"profile_id": "70",
"fiveG_WPA2_SSID": "EC420_5G_WPA2_NAT",
"fiveG_WPA2_PSK": "Connectus123$",
"fiveG_WPA_SSID": "EC420_5G_WPA_NAT",
"fiveG_WPA_PSK": "Connectus123$",
"fiveG_OPEN_SSID": "EC420_5G_OPEN_NAT",
"fiveG_WPA2-EAP_SSID": "EC420_5G_WPA2-EAP_NAT",
"twoFourG_OPEN_SSID": "EC420_2dot4G_OPEN_NAT",
"twoFourG_WPA2_SSID": "EC420_2dot4G_WPA2_NAT",
"twoFourG_WPA2_PSK": "Connectus123$",
"twoFourG_WPA_SSID":"EC420_2dot4G_WPA_NAT",
"twoFourG_WPA_PSK": "Connectus123$",
"twoFourG_WPA2-EAP_SSID": "EC420_2dot4G_WPA2-EAP_NAT"
},
"ecw5211_nat": {
"profile_id": "69",
"fiveG_WPA2_SSID": "ECW5211_5G_WPA2_NAT",
"fiveG_WPA2_PSK": "Connectus123$",
"fiveG_WPA_SSID": "ECW5211_5G_WPA_NAT",
"fiveG_WPA_PSK": "Connectus123$",
"fiveG_OPEN_SSID": "ECW5211_5G_OPEN_NAT",
"fiveG_WPA2-EAP_SSID": "ECW5211_5G_WPA2-EAP_NAT",
"twoFourG_OPEN_SSID": "ECW5211_2dot4G_OPEN_NAT",
"twoFourG_WPA2_SSID": "ECW5211_2dot4G_WPA2_NAT",
"twoFourG_WPA2_PSK": "Connectus123$",
"twoFourG_WPA_SSID":"ECW5211_2dot4G_WPA_NAT",
"twoFourG_WPA_PSK": "Connectus123$",
"twoFourG_WPA2-EAP_SSID": "ECW5211_2dot4G_WPA2-EAP_NAT"
},
"ecw5410_vlan": {
"profile_id": "338",
"fiveG_WPA2_SSID": "ECW5410_5G_WPA2_VLAN",
"fiveG_WPA2_PSK": "Connectus123$",
"fiveG_WPA_SSID": "ECW5410_5G_WPA_VLAN",
"fiveG_WPA_PSK": "Connectus123$",
"fiveG_OPEN_SSID": "ECW5410_5G_OPEN_VLAN",
"fiveG_WPA2-EAP_SSID": "ECW5410_5G_WPA2-EAP_VLAN",
"twoFourG_OPEN_SSID": "ECW5410_2dot4G_OPEN_VLAN",
"twoFourG_WPA2_SSID": "ECW5410_2dot4G_WPA2_VLAN",
"twoFourG_WPA2_PSK": "Connectus123$",
"twoFourG_WPA_SSID":"ECW5410_2dot4G_WPA_VLAN",
"twoFourG_WPA_PSK": "Connectus123$",
"twoFourG_WPA2-EAP_SSID": "ECW5410_2dot4G_WPA2-EAP_VLAN"
},
"ea8300_vlan": {
"profile_id": "319",
"fiveG_WPA2_SSID": "EA8300_5G_WPA2_VLAN",
"fiveG_WPA2_PSK": "Connectus123$",
"fiveG_WPA_SSID": "EA8300_5G_WPA_VLAN",
"fiveG_WPA_PSK": "Connectus123$",
"fiveG_OPEN_SSID": "EA8300_5G_OPEN_VLAN",
"fiveG_WPA2-EAP_SSID": "EA8300_5G_WPA2-EAP_VLAN",
"twoFourG_OPEN_SSID": "EA8300_2dot4G_OPEN_VLAN",
"twoFourG_WPA2_SSID": "EA8300_2dot4G_WPA2_VLAN",
"twoFourG_WPA2_PSK": "Connectus123$",
"twoFourG_WPA_SSID":"EA8300_2dot4G_WPA_VLAN",
"twoFourG_WPA_PSK": "Connectus123$",
"twoFourG_WPA2-EAP_SSID": "EA8300_2dot4G_WPA2-EAP_VLAN"
},
"ec420_vlan": {
"profile_id": "357",
"fiveG_WPA2_SSID": "EC420_5G_WPA2_VLAN",
"fiveG_WPA2_PSK": "Connectus123$",
"fiveG_WPA_SSID": "EC420_5G_WPA_VLAN",
"fiveG_WPA_PSK": "Connectus123$",
"fiveG_OPEN_SSID": "EC420_5G_OPEN_VLAN",
"fiveG_WPA2-EAP_SSID": "EC420_5G_WPA2-EAP_VLAN",
"twoFourG_OPEN_SSID": "EC420_2dot4G_OPEN_VLAN",
"twoFourG_WPA2_SSID": "EC420_2dot4G_WPA2_VLAN",
"twoFourG_WPA2_PSK": "Connectus123$",
"twoFourG_WPA_SSID":"EC420_2dot4G_WPA_VLAN",
"twoFourG_WPA_PSK": "Connectus123$",
"twoFourG_WPA2-EAP_SSID": "EC420_2dot4G_WPA2-EAP_VLAN"
},
"ecw5211_vlan": {
"profile_id": "364",
"fiveG_WPA2_SSID": "ECW5211_5G_WPA2_VLAN",
"fiveG_WPA2_PSK": "Connectus123$",
"fiveG_WPA_SSID": "ECW5211_5G_WPA_VLAN",
"fiveG_WPA_PSK": "Connectus123$",
"fiveG_OPEN_SSID": "ECW5211_5G_OPEN_VLAN",
"fiveG_WPA2-EAP_SSID": "ECW5211_5G_WPA2-EAP_VLAN",
"twoFourG_OPEN_SSID": "ECW5211_2dot4G_OPEN_VLAN",
"twoFourG_WPA2_SSID": "ECW5211_2dot4G_WPA2_VLAN",
"twoFourG_WPA2_PSK": "Connectus123$",
"twoFourG_WPA_SSID":"ECW5211_2dot4G_WPA_VLAN",
"twoFourG_WPA_PSK": "Connectus123$",
"twoFourG_WPA2-EAP_SSID": "ECW5211_2dot4G_WPA2-EAP_VLAN"
}
}

View File

@@ -0,0 +1 @@
{"sanity_status": {"ea8300": "failed", "ecw5211": "failed", "ecw5410": "failed", "ec420": "failed"}, "sanity_run": {"new_data": "yes"}}

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,207 @@
"""TestRail API binding for Python 3.x.
"""
import base64
import json
import requests
from pprint import pprint
import os
tr_user=os.getenv('TR_USER')
tr_pw=os.getenv('TR_PWD')
class APIClient:
def __init__(self, base_url):
self.user = tr_user
self.password = tr_pw
if not base_url.endswith('/'):
base_url += '/'
self.__url = base_url + 'index.php?/api/v2/'
def send_get(self, uri, filepath=None):
"""Issue a GET request (read) against the API.
Args:
uri: The API method to call including parameters, e.g. get_case/1.
filepath: The path and file name for attachment download; used only
for 'get_attachment/:attachment_id'.
Returns:
A dict containing the result of the request.
"""
return self.__send_request('GET', uri, filepath)
def send_post(self, uri, data):
"""Issue a POST request (write) against the API.
Args:
uri: The API method to call, including parameters, e.g. add_case/1.
data: The data to submit as part of the request as a dict; strings
must be UTF-8 encoded. If adding an attachment, must be the
path to the file.
Returns:
A dict containing the result of the request.
"""
return self.__send_request('POST', uri, data)
def __send_request(self, method, uri, data):
url = self.__url + uri
auth = str(
base64.b64encode(
bytes('%s:%s' % (self.user, self.password), 'utf-8')
),
'ascii'
).strip()
headers = {'Authorization': 'Basic ' + auth}
#print("Method =" , method)
if method == 'POST':
if uri[:14] == 'add_attachment': # add_attachment API method
files = {'attachment': (open(data, 'rb'))}
response = requests.post(url, headers=headers, files=files)
files['attachment'].close()
else:
headers['Content-Type'] = 'application/json'
payload = bytes(json.dumps(data), 'utf-8')
response = requests.post(url, headers=headers, data=payload)
else:
headers['Content-Type'] = 'application/json'
response = requests.get(url, headers=headers)
#print("headers = ", headers)
#print("resonse=", response)
#print("response code =", response.status_code)
if response.status_code > 201:
try:
error = response.json()
except: # response.content not formatted as JSON
error = str(response.content)
#raise APIError('TestRail API returned HTTP %s (%s)' % (response.status_code, error))
print('TestRail API returned HTTP %s (%s)' % (response.status_code, error))
return
else:
print(uri[:15])
if uri[:15] == 'get_attachments': # Expecting file, not JSON
try:
print('opening file')
print (str(response.content))
open(data, 'wb').write(response.content)
print('opened file')
return (data)
except:
return ("Error saving attachment.")
else:
try:
return response.json()
except: # Nothing to return
return {}
def get_project_id(self, project_name):
"Get the project ID using project name"
project_id = None
projects = client.send_get('get_projects')
#pprint(projects)
for project in projects:
if project['name']== project_name:
project_id = project['id']
# project_found_flag=True
break
print("project Id =",project_id)
return project_id
def get_run_id(self, test_run_name):
"Get the run ID using test name and project name"
run_id = None
project_id = client.get_project_id(project_name='WLAN')
try:
test_runs = client.send_get('get_runs/%s' % (project_id))
#print("------------TEST RUNS----------")
#pprint(test_runs)
except Exception:
print
'Exception in update_testrail() updating TestRail.'
return None
else:
for test_run in test_runs:
if test_run['name'] == test_run_name:
run_id = test_run['id']
#print("run Id in Test Runs=",run_id)
break
return run_id
def update_testrail(self, case_id, run_id, status_id, msg):
"Update TestRail for a given run_id and case_id"
update_flag = False
# Get the TestRail client account details
# Update the result in TestRail using send_post function.
# Parameters for add_result_for_case is the combination of runid and case id.
# status_id is 1 for Passed, 2 For Blocked, 4 for Retest and 5 for Failed
#status_id = 1 if result_flag is True else 5
print("result status Pass/Fail = ", status_id)
print("case id=", case_id)
print("run id passed to update is ", run_id, case_id)
if run_id is not None:
try:
result = client.send_post(
'add_result_for_case/%s/%s' % (run_id, case_id),
{'status_id': status_id, 'comment': msg})
print("result in post",result)
except Exception:
print
'Exception in update_testrail() updating TestRail.'
else:
print
'Updated test result for case: %s in test run: %s with msg:%s' % (case_id, run_id, msg)
return update_flag
def create_testrun(self, name, case_ids, project_id, milestone_id):
result = client.send_post(
'add_run/%s' % (project_id),
{'name': name, 'case_ids': case_ids, 'milestone_id': milestone_id, 'include_all': False})
print("result in post", result)
client: APIClient = APIClient('https://telecominfraproject.testrail.com')
client.user = 'syama.devi@connectus.ai'
client.password = 'Connectus123$'
###Old Demo Code
#case = client.send_get('get_case/936')
#print("---------TEST CASE 1---------")
#pprint(case)
#case = client.send_get('get_case/937')
#print("---------TEST CASE 2---------")
#pprint(case)
#print ("----------TEST Project ID----------")
#proj_id = client.get_project_id(project_name= "WLAN")
#pprint(proj_id)
#REST API POSTMAN PROJECT
#projId = client.get_project_id(project_name= "REST-API-POSTMAN")
#pprint("REST API POSTMAN PROJECT ID IS :", projId)
#print("---------TEST RUN ID-----------")
#rid = client.get_run_id(test_run_name='Master',project_name='WLAN')
#rid=client.get_run_id(test_run_name= 'Master-Run3')
#pprint(rid)
#result: bool= client.update_testrail(case_id = 1, run_id=rid, status_id = 5, msg ='Test Failed')
#result = client.send_get('get_attachment/:1', '/Users/syamadevi/Desktop/syama/python-test/TestRail/testreport.pdf')
#print(result)
#project_report= client.send_get("get_reports/:%s" %proj_id)
#print(project_report)
class APIError(Exception):
pass