diff --git a/py-scripts/tip-cicd-sanity/Throughput_Test.py b/py-scripts/tip-cicd-sanity/Throughput_Test.py new file mode 100755 index 00000000..daf615ea --- /dev/null +++ b/py-scripts/tip-cicd-sanity/Throughput_Test.py @@ -0,0 +1,399 @@ +import csv +import sys +import time +import datetime +from datetime import date +import json +import os +import logging + +import single_client_throughput +import cloudsdk +from cloudsdk import CloudSDK +import lab_ap_info + +cloudSDK_url=os.getenv('CLOUD_SDK_URL') +station = ["tput5000"] +runtime = 10 +csv_path=os.getenv('CSV_PATH') + +#EAP Credentials +identity=os.getenv('EAP_IDENTITY') +ttls_password=os.getenv('EAP_PWD') + +local_dir=os.getenv('TPUT_LOG_DIR') +logger = logging.getLogger('Throughput_Test') +hdlr = logging.FileHandler(local_dir+"/Throughput_Testing.log") +formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s') +hdlr.setFormatter(formatter) +logger.addHandler(hdlr) +logger.setLevel(logging.INFO) + + +if sys.version_info[0] != 3: + print("This script requires Python 3") + exit(1) + +if 'py-json' not in sys.path: + sys.path.append('../../py-json') + +def throughput_csv(csv_file, ssid_name, ap_model, mimo, firmware, security, mode, client_tput): + #parse client_tput list returned from single_client_throughput + udp_ds = client_tput[0].partition(": ")[2] + udp_us = client_tput[1].partition(": ")[2] + tcp_ds = client_tput[2].partition(": ")[2] + tcp_us = client_tput[3].partition(": ")[2] + # Find band for CSV ---> This code is not great, it SHOULD get that info from LANForge! + if "5G" in ssid_name: + frequency = "5 GHz" + elif "2dot4G" in ssid_name: + frequency = "2.4 GHz" + else: + frequency = "Unknown" + # Append row to top of CSV file + row = [ap_model, firmware, frequency, mimo, security, mode, udp_ds, udp_us, tcp_ds, tcp_us] + with open(csv_file, 'r') as readFile: + reader = csv.reader(readFile) + lines = list(reader) + lines.insert(1, row) + with open(csv_file, 'w') as writeFile: + writer = csv.writer(writeFile) + writer.writerows(lines) + readFile.close() + writeFile.close() + +#Import dictionaries for AP Info +from lab_ap_info import equipment_id_dict +from lab_ap_info import profile_info_dict +from lab_ap_info import ap_models +from lab_ap_info import mimo_2dot4g +from lab_ap_info import mimo_5g +#import json file to determine if throughput should be run for specific AP model +sanity_status = json.load(open("sanity_status.json")) + + +#bearer = CloudSDK.get_bearer(cloudSDK_url) +#print(bearer) + +#create CSV file for test run +today = str(date.today()) +csv_file = csv_path+"throughput_test_"+today+".csv" +headers = ['AP Type', 'Firmware','Radio', 'MIMO', 'Security', 'Mode', 'UDP Downstream (Mbps)', 'UDP Upstream (Mbps)', 'TCP Downstream (Mbps)', 'TCP Upstream (Mbps)'] +with open(csv_file, "w") as file: + create = csv.writer(file) + create.writerow(headers) + file.close() + +ap_firmware_dict = { + "ea8300": '', + "ecw5211": '', + "ecw5410": '', + "ec420": '' +} + +logger.info('Start of Throughput Test') + +for key in equipment_id_dict: + if sanity_status['sanity_status'][key] == "passed": + logger.info("Running throughput test on " + key) + ##Get Bearer Token to make sure its valid (long tests can require re-auth) + bearer = CloudSDK.get_bearer(cloudSDK_url) + ###Get Current AP Firmware + customer_id = "2" + equipment_id = equipment_id_dict[key] + ap_fw = CloudSDK.ap_firmware(customer_id, equipment_id, cloudSDK_url, bearer) + fw_model = ap_fw.partition("-")[0] + print("AP MODEL UNDER TEST IS", fw_model) + print('Current AP Firmware:', ap_fw) + ##add current FW to dictionary + ap_firmware_dict[fw_model] = ap_fw + + ########################################################################### + ############## Bridge Throughput Testing ################################# + ########################################################################### + print("Testing for Bridge SSIDs") + logger.info("Starting Brdige SSID tput tests on " + key) + ###Set Proper AP Profile for Bridge SSID Tests + test_profile_id = profile_info_dict[fw_model]["profile_id"] + #print(test_profile_id) + ap_profile = CloudSDK.set_ap_profile(equipment_id, test_profile_id, cloudSDK_url, bearer) + ### Wait for Profile Push + print('-----------------PROFILE PUSH -------------------') + time.sleep(180) + + # 5G WPA2 Enterprise UDP DS/US and TCP DS/US + ap_model = fw_model + firmware = ap_fw + sta_list = station + radio = "wiphy3" + ssid_name = profile_info_dict[fw_model]["fiveG_WPA2-EAP_SSID"] + security = "wpa2" + eap_type = "TTLS" + mode = "Bridge" + mimo = mimo_5g[fw_model] + client_tput = single_client_throughput.eap_tput(sta_list, ssid_name, radio, security, eap_type, identity, ttls_password) + print(fw_model, "5 GHz WPA2-EAP throughput:\n", client_tput) + security = "wpa2-eap" + throughput_csv(csv_file, ssid_name, ap_model, mimo, firmware, security, mode, client_tput) + + #5G WPA2 UDP DS/US and TCP DS/US + ap_model = fw_model + firmware = ap_fw + radio = "wiphy3" + ssid_name = profile_info_dict[fw_model]["fiveG_WPA2_SSID"] + ssid_psk = profile_info_dict[fw_model]["fiveG_WPA2_PSK"] + security = "wpa2" + mode = "Bridge" + mimo = mimo_5g[fw_model] + client_tput = single_client_throughput.main(ap_model, firmware, radio, ssid_name, ssid_psk, security, station, runtime) + print(fw_model, "5 GHz WPA2 throughput:\n",client_tput) + security = "wpa2-psk" + throughput_csv(csv_file, ssid_name, ap_model, mimo, firmware, security, mode, client_tput) + + # 5G WPA UDP DS/US and TCP DS/US + ap_model = fw_model + firmware = ap_fw + radio = "wiphy3" + ssid_name = profile_info_dict[fw_model]["fiveG_WPA_SSID"] + ssid_psk = profile_info_dict[fw_model]["fiveG_WPA_PSK"] + security = "wpa" + mode = "Bridge" + mimo = mimo_5g[fw_model] + client_tput = single_client_throughput.main(ap_model, firmware, radio, ssid_name, ssid_psk, security, station, runtime) + print(fw_model, "5 GHz WPA throughput:\n",client_tput) + security = "wpa-psk" + throughput_csv(csv_file, ssid_name, ap_model, mimo, firmware, security, mode, client_tput) + + # 5G Open UDP DS/US and TCP DS/US + # ap_model = fw_model + # firmware = ap_fw + # radio = "wiphy3" + # ssid_name = profile_info_dict[fw_model]["fiveG_OPEN_SSID"] + # ssid_psk = "BLANK" + # security = "open" + #mode = "Bridge" + #mimo = mimo_5g[fw_model] + # client_tput = single_client_throughput.main(ap_model, firmware, radio, ssid_name, ssid_psk, security, station, runtime) + #print(fw_model, "5 GHz Open throughput:\n",client_tput) + #throughput_csv(csv_file, ssid_name, ap_model, mimo, firmware, security, mode, client_tput) + + # 2.4G WPA2 Enterprise UDP DS/US and TCP DS/US + ap_model = fw_model + firmware = ap_fw + sta_list = station + radio = "wiphy0" + ssid_name = profile_info_dict[fw_model]["twofourG_WPA2-EAP_SSID"] + security = "wpa2" + eap_type = "TTLS" + mode = "Bridge" + mimo = mimo_2dot4g[fw_model] + client_tput = single_client_throughput.eap_tput(sta_list, ssid_name, radio, security, eap_type, identity, + ttls_password) + print(fw_model, "2.4 GHz WPA2-EAP throughput:\n", client_tput) + security = "wpa2-eap" + throughput_csv(csv_file, ssid_name, ap_model, mimo, firmware, security, mode, client_tput) + + # 2.4G WPA2 UDP DS/US and TCP DS/US + ap_model = fw_model + firmware = ap_fw + radio = "wiphy0" + ssid_name = profile_info_dict[fw_model]["twoFourG_WPA2_SSID"] + ssid_psk = profile_info_dict[fw_model]["twoFourG_WPA2_PSK"] + security = "wpa2" + mode = "Bridge" + mimo = mimo_2dot4g[fw_model] + client_tput = single_client_throughput.main(ap_model, firmware, radio, ssid_name, ssid_psk, security, station, runtime) + print(fw_model, "2.4 GHz WPA2 throughput:\n",client_tput) + security = "wpa2-psk" + throughput_csv(csv_file, ssid_name, ap_model, mimo, firmware, security, mode, client_tput) + + # 2.4G WPA UDP DS/US and TCP DS/US + ap_model = fw_model + firmware = ap_fw + radio = "wiphy0" + ssid_name = profile_info_dict[fw_model]["twoFourG_WPA_SSID"] + ssid_psk = profile_info_dict[fw_model]["twoFourG_WPA_PSK"] + security = "wpa" + mode = "Bridge" + mimo = mimo_2dot4g[fw_model] + client_tput = single_client_throughput.main(ap_model, firmware, radio, ssid_name, ssid_psk, security, station, runtime) + print(fw_model, "2.4 GHz WPA throughput:\n",client_tput) + security = "wpa-psk" + throughput_csv(csv_file, ssid_name, ap_model, mimo, firmware, security, mode, client_tput) + + # 2.4G Open UDP DS/US and TCP DS/US + #ap_model = fw_model + #firmware = ap_fw + # radio = "wiphy3" + # ssid_name = profile_info_dict[fw_model]["twoFourG_OPEN_SSID"] + # ssid_psk = "BLANK" + # security = "open" + #mode = "Bridge" + #mimo = mimo_2dot4g[fw_model] + #client_tput = single_client_throughput.main(ap_model, firmware, radio, ssid_name, ssid_psk, security, station, runtime, csv_file) + #print(fw_model, "2.4 GHz Open throughput:\n",client_tput) + #throughput_csv(csv_file, ssid_name, ap_model, mimo, firmware, security, mode, client_tput) + + ########################################################################### + ################# NAT Mode Throughput Testing ############################ + ########################################################################### + print('Testing for NAT SSIDs') + logger.info("Starting NAT SSID tput tests on " + key) + ###Set Proper AP Profile for NAT SSID Tests + test_profile_id = profile_info_dict[fw_model + '_nat']["profile_id"] + print(test_profile_id) + ap_profile = CloudSDK.set_ap_profile(equipment_id, test_profile_id, cloudSDK_url, bearer) + + ### Wait for Profile Push + print('-----------------PROFILE PUSH -------------------') + time.sleep(180) + + # 5G WPA2 Enterprise UDP DS/US and TCP DS/US + ap_model = fw_model + firmware = ap_fw + sta_list = station + radio = "wiphy3" + ssid_name = profile_info_dict[fw_model+'_nat']["fiveG_WPA2-EAP_SSID"] + security = "wpa2" + eap_type = "TTLS" + mode = "NAT" + mimo = mimo_5g[fw_model] + client_tput = single_client_throughput.eap_tput(sta_list, ssid_name, radio, security, eap_type, identity, + ttls_password) + print(fw_model, "5 GHz WPA2-EAP NAT throughput:\n", client_tput) + security = "wpa2-eap" + throughput_csv(csv_file, ssid_name, ap_model, mimo, firmware, security, mode, client_tput) + + # 5G WPA2 NAT UDP DS/US and TCP DS/US + ap_model = fw_model + firmware = ap_fw + radio = "wiphy3" + ssid_name = profile_info_dict[fw_model+'_nat']["fiveG_WPA2_SSID"] + ssid_psk = profile_info_dict[fw_model+'_nat']["fiveG_WPA2_PSK"] + security = "wpa2" + mode = "NAT" + mimo = mimo_5g[fw_model] + security = "wpa2-psk" + client_tput = single_client_throughput.main(ap_model, firmware, radio, ssid_name, ssid_psk, security, station, + runtime) + print(fw_model, "5 GHz WPA2 NAT throughput:\n", client_tput) + throughput_csv(csv_file, ssid_name, ap_model, mimo, firmware, security, mode, client_tput) + + # 5G WPA UDP DS/US and TCP DS/US + ap_model = fw_model + firmware = ap_fw + radio = "wiphy3" + ssid_name = profile_info_dict[fw_model+'_nat']["fiveG_WPA_SSID"] + ssid_psk = profile_info_dict[fw_model+'_nat']["fiveG_WPA_PSK"] + security = "wpa" + mode = "NAT" + mimo = mimo_5g[fw_model] + client_tput = single_client_throughput.main(ap_model, firmware, radio, ssid_name, ssid_psk, security, station, + runtime) + print(fw_model, "5 GHz WPA NAT throughput:\n", client_tput) + security = "wpa-psk" + throughput_csv(csv_file, ssid_name, ap_model, mimo, firmware, security, mode, client_tput) + + # 5G Open UDP DS/US and TCP DS/US + # ap_model = fw_model + # firmware = ap_fw + # radio = "wiphy3" + # ssid_name = profile_info_dict[fw_model+'_nat']["fiveG_OPEN_SSID"] + # ssid_psk = "BLANK" + # security = "open" + # mode = "NAT" + #mimo = mimo_5g[fw_model] + # client_tput = single_client_throughput.main(ap_model, firmware, radio, ssid_name, ssid_psk, security, station, runtime) + # print(fw_model, "5 GHz Open NAT throughput:\n",client_tput) + # throughput_csv(csv_file, ssid_name, ap_model, mimo, firmware, security, mode, client_tput) + + # 2.4G WPA2 Enterprise UDP DS/US and TCP DS/US + ap_model = fw_model + firmware = ap_fw + sta_list = station + radio = "wiphy0" + ssid_name = profile_info_dict[fw_model+'_nat']["twofourG_WPA2-EAP_SSID"] + security = "wpa2" + eap_type = "TTLS" + mode = "NAT" + mimo = mimo_2dot4g[fw_model] + client_tput = single_client_throughput.eap_tput(sta_list, ssid_name, radio, security, eap_type, identity, + ttls_password) + print(fw_model, "2.4 GHz WPA2-EAP NAT throughput:\n", client_tput) + security = "wpa2-eap" + throughput_csv(csv_file, ssid_name, ap_model, mimo, firmware, security, mode, client_tput) + + # 2.4G WPA2 UDP DS/US and TCP DS/US + ap_model = fw_model + firmware = ap_fw + radio = "wiphy0" + ssid_name = profile_info_dict[fw_model+'_nat']["twoFourG_WPA2_SSID"] + ssid_psk = profile_info_dict[fw_model+'_nat']["twoFourG_WPA2_PSK"] + security = "wpa2" + mode = "NAT" + mimo = mimo_2dot4g[fw_model] + client_tput = single_client_throughput.main(ap_model, firmware, radio, ssid_name, ssid_psk, security, station, + runtime) + print(fw_model, "2.4 GHz WPA2 NAT throughput:\n", client_tput) + security = "wpa2-psk" + throughput_csv(csv_file, ssid_name, ap_model, mimo, firmware, security, mode, client_tput) + + # 2.4G WPA UDP DS/US and TCP DS/US + ap_model = fw_model + firmware = ap_fw + radio = "wiphy0" + ssid_name = profile_info_dict[fw_model+'_nat']["twoFourG_WPA_SSID"] + ssid_psk = profile_info_dict[fw_model+'_nat']["twoFourG_WPA_PSK"] + security = "wpa" + mode = "NAT" + mimo = mimo_2dot4g[fw_model] + client_tput = single_client_throughput.main(ap_model, firmware, radio, ssid_name, ssid_psk, security, station, + runtime) + print(fw_model, "2.4 GHz WPA NAT throughput:\n", client_tput) + security = "wpa-psk" + throughput_csv(csv_file, ssid_name, ap_model, mimo, firmware, security, mode, client_tput) + + # 2.4G Open NAT UDP DS/US and TCP DS/US + # ap_model = fw_model + # firmware = ap_fw + # radio = "wiphy3" + # ssid_name = profile_info_dict[fw_model+'_nat']["twoFourG_OPEN_SSID"] + # ssid_psk = "BLANK" + # security = "open" + # mode = "NAT" + #mimo = mimo_2dot4g[fw_model] + # client_tput = single_client_throughput.main(ap_model, firmware, radio, ssid_name, ssid_psk, security, station, runtime, csv_file) + # print(fw_model, "2.4 GHz Open NAT throughput:\n",client_tput) + # throughput_csv(csv_file, ssid_name, ap_model, mimo, firmware, security, mode, client_tput) + + #Indicates throughput has been run for AP model + sanity_status['sanity_status'][key] = "tput run" + logger.info("Trhoughput tests complete on " + key) + + elif sanity_status['sanity_status'][key] == "tput run": + print("Throughput test already run on", key) + logger.info("Throughput test already run on "+ key +" for latest AP FW") + + else: + print(key,"did not pass Nightly Sanity. Skipping throughput test on this AP Model") + logger.info(key+" did not pass Nightly Sanity. Skipping throughput test.") + +#Indicate which AP model has had tput test to external json file +with open('sanity_status.json', 'w') as json_file: + json.dump(sanity_status, json_file) + +with open(csv_file, 'r') as readFile: + reader = csv.reader(readFile) + lines = list(reader) + row_count = len(lines) + #print(row_count) + +if row_count <= 1: + os.remove(csv_file) + file.close() + +else: + print("Saving File") + file.close() + +print(" -- Throughput Testing Complete -- ") diff --git a/py-scripts/tip-cicd-sanity/ap_ssh.py b/py-scripts/tip-cicd-sanity/ap_ssh.py new file mode 100755 index 00000000..22b59b0d --- /dev/null +++ b/py-scripts/tip-cicd-sanity/ap_ssh.py @@ -0,0 +1,65 @@ +import paramiko +from paramiko import SSHClient +import socket + +def ssh_cli_active_fw(ap_ip, username, password): + try: + client = paramiko.SSHClient() + client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) + client.connect(ap_ip, username=username, password=password, timeout=5) + stdin, stdout, stderr = client.exec_command('/usr/opensync/bin/ovsh s AWLAN_Node -c | grep FW_IMAGE_ACTIVE') + + version_matrix = str(stdout.read()) + version_matrix_split = version_matrix.partition('FW_IMAGE_ACTIVE","')[2] + cli_active_fw = version_matrix_split.partition('"],[')[0] + #print("Active FW is",cli_active_fw) + + stdin, stdout, stderr = client.exec_command('/usr/opensync/bin/ovsh s Manager -c | grep status') + status = str(stdout.read()) + + if "ACTIVE" in status: + #print("AP is in Active state") + state = "active" + elif "BACKOFF" in status: + #print("AP is in Backoff state") + state = "backoff" + else: + #print("AP is not in Active state") + state = "unknown" + + cli_info = { + "state": state, + "active_fw": cli_active_fw + } + + return(cli_info) + + except paramiko.ssh_exception.AuthenticationException: + print("Authentication Error, Check Credentials") + return "ERROR" + except paramiko.SSHException: + print("Cannot SSH to the AP") + return "ERROR" + except socket.timeout: + print("AP Unreachable") + return "ERROR" + +def iwinfo_status(ap_ip, username, password): + try: + client = paramiko.SSHClient() + client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) + client.connect(ap_ip, username=username, password=password, timeout=5) + stdin, stdout, stderr = client.exec_command('iwinfo | grep ESSID') + + for line in stdout.read().splitlines(): + print(line) + + except paramiko.ssh_exception.AuthenticationException: + print("Authentication Error, Check Credentials") + return "ERROR" + except paramiko.SSHException: + print("Cannot SSH to the AP") + return "ERROR" + except socket.timeout: + print("AP Unreachable") + return "ERROR" \ No newline at end of file diff --git a/py-scripts/tip-cicd-sanity/cloudsdk.py b/py-scripts/tip-cicd-sanity/cloudsdk.py new file mode 100755 index 00000000..27d09965 --- /dev/null +++ b/py-scripts/tip-cicd-sanity/cloudsdk.py @@ -0,0 +1,152 @@ +#!/usr/bin/python3 + +import base64 +import urllib.request +from bs4 import BeautifulSoup +import ssl +import subprocess, os +from artifactory import ArtifactoryPath +import tarfile +import paramiko +from paramiko import SSHClient +from scp import SCPClient +import os +import pexpect +from pexpect import pxssh +import sys +import paramiko +from scp import SCPClient +import pprint +from pprint import pprint +from os import listdir +import re +import requests +import json +import testrail_api +import logging +import datetime +import time + +user=os.getenv('CLOUDSDK_USER') +password=os.getenv('CLOUDSDK_PWD') + +###Class for CloudSDK Interaction via RestAPI +class CloudSDK: + def __init__(self): + self.user = user + + def get_bearer(cloudSDK_url): + cloud_login_url = cloudSDK_url+"/management/v1/oauth2/token" + payload = ''' + { + "userId": "'''+user+'''", + "password": "'''+password+'''" + } + ''' + headers = { + 'Content-Type': 'application/json' + } + try: + token_response = requests.request("POST", cloud_login_url, headers=headers, data=payload) + except requests.exceptions.RequestException as e: + raise SystemExit("Exiting Script! Cloud not get bearer token for reason:",e) + token_data = token_response.json() + bearer_token = token_data['access_token'] + return(bearer_token) + + def ap_firmware(customer_id,equipment_id, cloudSDK_url, bearer): + equip_fw_url = cloudSDK_url+"/portal/status/forEquipment?customerId="+customer_id+"&equipmentId="+equipment_id + payload = {} + headers = { + 'Authorization': 'Bearer ' + bearer + } + status_response = requests.request("GET", equip_fw_url, headers=headers, data=payload) + status_code = status_response.status_code + if status_code is 200: + status_data = status_response.json() + #print(status_data) + current_ap_fw = status_data[2]['details']['reportedSwVersion'] + return current_ap_fw + else: + return("ERROR") + + def CloudSDK_images(apModel, cloudSDK_url, bearer): + getFW_url = cloudSDK_url+"/portal/firmware/version/byEquipmentType?equipmentType=AP&modelId=" + apModel + payload = {} + headers = { + 'Authorization': 'Bearer ' + bearer + } + response = requests.request("GET", getFW_url, headers=headers, data=payload) + ap_fw_details = response.json() + ###return ap_fw_details + fwlist = [] + for version in ap_fw_details: + fwlist.append(version.get('versionName')) + return(fwlist) + #fw_versionNames = ap_fw_details[0]['versionName'] + #return fw_versionNames + + def firwmare_upload(commit, apModel,latest_image,fw_url,cloudSDK_url,bearer): + fw_upload_url = cloudSDK_url+"/portal/firmware/version" + payload = "{\n \"model_type\": \"FirmwareVersion\",\n \"id\": 0,\n \"equipmentType\": \"AP\",\n \"modelId\": \""+apModel+"\",\n \"versionName\": \""+latest_image+"\",\n \"description\": \"\",\n \"filename\": \""+fw_url+"\",\n \"commit\": \""+commit+"\",\n \"validationMethod\": \"MD5_CHECKSUM\",\n \"validationCode\": \"19494befa87eb6bb90a64fd515634263\",\n \"releaseDate\": 1596192028877,\n \"createdTimestamp\": 0,\n \"lastModifiedTimestamp\": 0\n}\n\n" + headers = { + 'Content-Type': 'application/json', + 'Authorization': 'Bearer ' + bearer + } + + response = requests.request("POST", fw_upload_url, headers=headers, data=payload) + #print(response) + upload_result = response.json() + return(upload_result) + + def get_firmware_id(latest_ap_image, cloudSDK_url, bearer): + #print(latest_ap_image) + fw_id_url = cloudSDK_url+"/portal/firmware/version/byName?firmwareVersionName="+latest_ap_image + + payload = {} + headers = { + 'Authorization': 'Bearer ' + bearer + } + response = requests.request("GET", fw_id_url, headers=headers, data=payload) + fw_data = response.json() + latest_fw_id = fw_data['id'] + return latest_fw_id + + def update_firmware(equipment_id, latest_firmware_id, cloudSDK_url, bearer): + url = cloudSDK_url+"/portal/equipmentGateway/requestFirmwareUpdate?equipmentId="+equipment_id+"&firmwareVersionId="+latest_firmware_id + + payload = {} + headers = { + 'Authorization': 'Bearer ' + bearer + } + + response = requests.request("POST", url, headers=headers, data=payload) + #print(response.text) + return response.json() + + def set_ap_profile(equipment_id, test_profile_id, cloudSDK_url, bearer): + ###Get AP Info + url = cloudSDK_url+"/portal/equipment?equipmentId="+equipment_id + payload = {} + headers = { + 'Authorization': 'Bearer ' + bearer + } + + response = requests.request("GET", url, headers=headers, data=payload) + print(response) + + ###Add Lab Profile ID to Equipment + equipment_info = response.json() + #print(equipment_info) + equipment_info["profileId"] = test_profile_id + #print(equipment_info) + + ###Update AP Info with Required Profile ID + url = cloudSDK_url+"/portal/equipment" + headers = { + 'Content-Type': 'application/json', + 'Authorization': 'Bearer ' + bearer + } + + response = requests.request("PUT", url, headers=headers, data=json.dumps(equipment_info)) + #print(response) diff --git a/py-scripts/tip-cicd-sanity/eap_connect.py b/py-scripts/tip-cicd-sanity/eap_connect.py new file mode 100755 index 00000000..ae19a170 --- /dev/null +++ b/py-scripts/tip-cicd-sanity/eap_connect.py @@ -0,0 +1,365 @@ +#!/usr/bin/env python3 + +# This will create a station, create TCP and UDP traffic, run it a short amount of time, +# and verify whether traffic was sent and received. It also verifies the station connected +# to the requested BSSID if bssid is specified as an argument. +# The script will clean up the station and connections at the end of the test. + +import sys + +if sys.version_info[0] != 3: + print("This script requires Python 3") + exit(1) + +if 'py-json' not in sys.path: + sys.path.append('../../py-json') + +import argparse +import LANforge +from LANforge import LFUtils +# from LANforge import LFCliBase +from LANforge import lfcli_base +from LANforge.lfcli_base import LFCliBase +from LANforge.LFUtils import * +import realm +from realm import Realm +import pprint + +OPEN="open" +WEP="wep" +WPA="wpa" +WPA2="wpa2" +MODE_AUTO=0 + +class EAPConnect(LFCliBase): + def __init__(self, host, port, security=None, ssid=None, sta_list=None, number_template="00000", _debug_on=False, _dut_bssid="", + _exit_on_error=False, _sta_name=None, _resource=1, radio="wiphy0", key_mgmt="WPA-EAP", eap="", identity="", + ttls_passwd="", hessid=None, ttls_realm="", domain="", _exit_on_fail=False, _cleanup_on_exit=True): + super().__init__(host, port, _debug=_debug_on, _halt_on_error=_exit_on_error, _exit_on_fail=_exit_on_fail) + self.host = host + self.port = port + self.ssid = ssid + self.radio = radio + self.security = security + #self.password = password + self.sta_list = sta_list + self.key_mgmt = key_mgmt + self.eap = eap + self.identity = identity + self.ttls_passwd = ttls_passwd + self.ttls_realm = ttls_realm + self.domain = domain + self.hessid = hessid + self.dut_bssid = _dut_bssid + self.timeout = 120 + self.number_template = number_template + self.debug = _debug_on + self.local_realm = realm.Realm(lfclient_host=self.host, lfclient_port=self.port) + self.station_profile = self.local_realm.new_station_profile() + self.station_profile.lfclient_url = self.lfclient_url + self.station_profile.ssid = self.ssid + self.station_profile.security = self.security + self.station_profile.number_template_ = self.number_template + self.station_profile.mode = 0 + #Added to test_ipv4_ttls code + self.upstream_url = None # defer construction + self.sta_url_map = None + self.upstream_resource = None + self.upstream_port = "eth2" + self.station_names = [] + if _sta_name is not None: + self.station_names = [_sta_name] + self.localrealm = Realm(lfclient_host=host, lfclient_port=port) + self.resource = _resource + self.cleanup_on_exit = _cleanup_on_exit + self.resulting_stations = {} + self.resulting_endpoints = {} + self.station_profile = None + self.l3_udp_profile = None + self.l3_tcp_profile = None + + # def get_realm(self) -> Realm: # py > 3.6 + def get_realm(self): + return self.localrealm + + def get_station_url(self, sta_name_=None): + if sta_name_ is None: + raise ValueError("get_station_url wants a station name") + if self.sta_url_map is None: + self.sta_url_map = {} + for sta_name in self.station_names: + self.sta_url_map[sta_name] = "port/1/%s/%s" % (self.resource, sta_name) + return self.sta_url_map[sta_name_] + + def get_upstream_url(self): + if self.upstream_url is None: + self.upstream_url = "port/1/%s/%s" % (self.upstream_resource, self.upstream_port) + return self.upstream_url + + # Compare pre-test values to post-test values + def compare_vals(self, name, postVal, print_pass=False, print_fail=True): + # print(f"Comparing {name}") + if postVal > 0: + self._pass("%s %s" % (name, postVal), print_pass) + else: + self._fail("%s did not report traffic: %s" % (name, postVal), print_fail) + + def remove_stations(self): + for name in self.station_names: + LFUtils.removePort(self.resource, name, self.lfclient_url) + + def num_associated(self, bssid): + counter = 0 + # print("there are %d results" % len(self.station_results)) + fields = "_links,port,alias,ip,ap,port+type" + self.station_results = self.localrealm.find_ports_like("eap*", fields, debug_=False) + if (self.station_results is None) or (len(self.station_results) < 1): + self.get_failed_result_list() + for eid,record in self.station_results.items(): + #print("-- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- ") + #pprint(eid) + #pprint(record) + if record["ap"] == bssid: + counter += 1 + #print("-- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- ") + return counter + + def clear_test_results(self): + self.resulting_stations = {} + self.resulting_endpoints = {} + super().clear_test_results() + #super(StaConnect, self).clear_test_results().test_results.clear() + + def setup(self): + self.clear_test_results() + self.check_connect() + upstream_json = self.json_get("%s?fields=alias,phantom,down,port,ip" % self.get_upstream_url(), debug_=False) + + if upstream_json is None: + self._fail(message="Unable to query %s, bye" % self.upstream_port, print_=True) + return False + + if upstream_json['interface']['ip'] == "0.0.0.0": + if self.debug: + pprint.pprint(upstream_json) + self._fail("Warning: %s lacks ip address" % self.get_upstream_url(), print_=True) + return False + + # remove old stations + print("Removing old stations") + for sta_name in self.station_names: + sta_url = self.get_station_url(sta_name) + response = self.json_get(sta_url) + if (response is not None) and (response["interface"] is not None): + for sta_name in self.station_names: + LFUtils.removePort(self.resource, sta_name, self.lfclient_url) + LFUtils.wait_until_ports_disappear(self.lfclient_url, self.station_names) + + # Create stations and turn dhcp on + self.station_profile = self.localrealm.new_station_profile() + + # Build stations + self.station_profile.use_security(self.security, self.ssid, passwd="[BLANK]") + self.station_profile.set_number_template(self.number_template) + print("Creating stations") + self.station_profile.set_command_flag("add_sta", "create_admin_down", 1) + self.station_profile.set_command_param("set_port", "report_timer", 1500) + self.station_profile.set_command_flag("set_port", "rpt_timer", 1) + self.station_profile.set_wifi_extra(key_mgmt=self.key_mgmt, eap=self.eap, identity=self.identity, + passwd=self.ttls_passwd, + realm=self.ttls_realm, domain=self.domain, + hessid=self.hessid) + self.station_profile.create(radio=self.radio, sta_names_=self.sta_list, debug=self.debug, use_radius=True, hs20_enable=False) + self._pass("PASS: Station build finished") + + # Create UDP endpoints + self.l3_udp_profile = self.localrealm.new_l3_cx_profile() + self.l3_udp_profile.side_a_min_bps = 128000 + self.l3_udp_profile.side_b_min_bps = 128000 + self.l3_udp_profile.side_a_min_pdu = 1200 + self.l3_udp_profile.side_b_min_pdu = 1500 + self.l3_udp_profile.report_timer = 1000 + self.l3_udp_profile.name_prefix = "udp" + self.l3_udp_profile.create(endp_type="lf_udp", + side_a=list(self.localrealm.find_ports_like("eap+")), + side_b="%d.%s" % (self.resource, self.upstream_port), + suppress_related_commands=True) + + # Create TCP endpoints + self.l3_tcp_profile = self.localrealm.new_l3_cx_profile() + self.l3_tcp_profile.side_a_min_bps = 128000 + self.l3_tcp_profile.side_b_min_bps = 56000 + self.l3_tcp_profile.name_prefix = "tcp" + self.l3_tcp_profile.report_timer = 1000 + self.l3_tcp_profile.create(endp_type="lf_tcp", + side_a=list(self.localrealm.find_ports_like("eap+")), + side_b="%d.%s" % (self.resource, self.upstream_port), + suppress_related_commands=True) + + def start(self): + if self.station_profile is None: + self._fail("Incorrect setup") + pprint.pprint(self.station_profile) + if self.station_profile.up is None: + self._fail("Incorrect station profile, missing profile.up") + if self.station_profile.up == False: + print("\nBringing ports up...") + data = {"shelf": 1, + "resource": self.resource, + "port": "ALL", + "probe_flags": 1} + self.json_post("/cli-json/nc_show_ports", data) + self.station_profile.admin_up() + LFUtils.waitUntilPortsAdminUp(self.resource, self.lfclient_url, self.station_names) + + # station_info = self.jsonGet(self.mgr_url, "%s?fields=port,ip,ap" % (self.getStaUrl())) + duration = 0 + maxTime = 60 + ip = "0.0.0.0" + ap = "" + print("Waiting for %s stations to associate to AP: " % len(self.station_names), end="") + connected_stations = {} + while (len(connected_stations.keys()) < len(self.station_names)) and (duration < maxTime): + duration += 3 + time.sleep(3) + print(".", end="") + for sta_name in self.station_names: + sta_url = self.get_station_url(sta_name) + station_info = self.json_get(sta_url + "?fields=port,ip,ap") + + # LFUtils.debug_printer.pprint(station_info) + if (station_info is not None) and ("interface" in station_info): + if "ip" in station_info["interface"]: + ip = station_info["interface"]["ip"] + if "ap" in station_info["interface"]: + ap = station_info["interface"]["ap"] + + if (ap == "Not-Associated") or (ap == ""): + if self.debug: + print(" -%s," % sta_name, end="") + else: + if ip == "0.0.0.0": + if self.debug: + print(" %s (0.0.0.0)" % sta_name, end="") + else: + connected_stations[sta_name] = sta_url + data = { + "shelf":1, + "resource": self.resource, + "port": "ALL", + "probe_flags": 1 + } + self.json_post("/cli-json/nc_show_ports", data) + + for sta_name in self.station_names: + sta_url = self.get_station_url(sta_name) + station_info = self.json_get(sta_url) # + "?fields=port,ip,ap") + if station_info is None: + print("unable to query %s" % sta_url) + self.resulting_stations[sta_url] = station_info + ap = station_info["interface"]["ap"] + ip = station_info["interface"]["ip"] + if (ap != "") and (ap != "Not-Associated"): + print(" %s +AP %s, " % (sta_name, ap), end="") + if self.dut_bssid != "": + if self.dut_bssid.lower() == ap.lower(): + self._pass(sta_name+" connected to BSSID: " + ap) + # self.test_results.append("PASSED: ) + # print("PASSED: Connected to BSSID: "+ap) + else: + self._fail("%s connected to wrong BSSID, requested: %s Actual: %s" % (sta_name, self.dut_bssid, ap)) + else: + self._fail(sta_name+" did not connect to AP") + return False + + if ip == "0.0.0.0": + self._fail("%s did not get an ip. Ending test" % sta_name) + else: + self._pass("%s connected to AP: %s With IP: %s" % (sta_name, ap, ip)) + + if self.passes() == False: + if self.cleanup_on_exit: + print("Cleaning up...") + self.remove_stations() + return False + + # start cx traffic + print("\nStarting CX Traffic") + self.l3_udp_profile.start_cx() + self.l3_tcp_profile.start_cx() + time.sleep(1) + # Refresh stats + self.l3_udp_profile.refresh_cx() + self.l3_tcp_profile.refresh_cx() + + def collect_endp_stats(self, endp_map): + print("Collecting Data") + fields="?fields=name,tx+bytes,rx+bytes" + for (cx_name, endps) in endp_map.items(): + try: + endp_url = "/endp/%s%s" % (endps[0], fields) + endp_json = self.json_get(endp_url) + self.resulting_endpoints[endp_url] = endp_json + ptest_a_tx = endp_json['endpoint']['tx bytes'] + ptest_a_rx = endp_json['endpoint']['rx bytes'] + + #ptest = self.json_get("/endp/%s?fields=tx+bytes,rx+bytes" % cx_names[cx_name]["b"]) + endp_url = "/endp/%s%s" % (endps[1], fields) + endp_json = self.json_get(endp_url) + self.resulting_endpoints[endp_url] = endp_json + + ptest_b_tx = endp_json['endpoint']['tx bytes'] + ptest_b_rx = endp_json['endpoint']['rx bytes'] + + self.compare_vals("testTCP-A TX", ptest_a_tx) + self.compare_vals("testTCP-A RX", ptest_a_rx) + + self.compare_vals("testTCP-B TX", ptest_b_tx) + self.compare_vals("testTCP-B RX", ptest_b_rx) + + except Exception as e: + print("Is this the function having the error?") + self.error(e) + + + def stop(self): + # stop cx traffic + print("Stopping CX Traffic") + self.l3_udp_profile.stop_cx() + self.l3_tcp_profile.stop_cx() + + # Refresh stats + print("\nRefresh CX stats") + self.l3_udp_profile.refresh_cx() + self.l3_tcp_profile.refresh_cx() + + print("Sleeping for 5 seconds") + time.sleep(5) + + # get data for endpoints JSON + self.collect_endp_stats(self.l3_udp_profile.created_cx) + self.collect_endp_stats(self.l3_tcp_profile.created_cx) + # print("\n") + + def cleanup(self): + # remove all endpoints and cxs + if self.cleanup_on_exit: + for sta_name in self.station_names: + LFUtils.removePort(self.resource, sta_name, self.lfclient_url) + curr_endp_names = [] + removeCX(self.lfclient_url, self.l3_udp_profile.get_cx_names()) + removeCX(self.lfclient_url, self.l3_tcp_profile.get_cx_names()) + for (cx_name, endp_names) in self.l3_udp_profile.created_cx.items(): + curr_endp_names.append(endp_names[0]) + curr_endp_names.append(endp_names[1]) + for (cx_name, endp_names) in self.l3_tcp_profile.created_cx.items(): + curr_endp_names.append(endp_names[0]) + curr_endp_names.append(endp_names[1]) + removeEndps(self.lfclient_url, curr_endp_names, debug= self.debug) + +# ~class + + + +if __name__ == "__main__": + main() diff --git a/py-scripts/tip-cicd-sanity/lab_ap_info.py b/py-scripts/tip-cicd-sanity/lab_ap_info.py new file mode 100755 index 00000000..4eac4232 --- /dev/null +++ b/py-scripts/tip-cicd-sanity/lab_ap_info.py @@ -0,0 +1,248 @@ +#!/usr/bin/python3 + +ap_models = ["ec420","ea8300","ecw5211","ecw5410"] + +##AP Models for firmware upload +cloud_sdk_models = { + "ec420": "EC420-G1", + "ea8300": "EA8300-CA", + "ecw5211": "ECW5211", + "ecw5410": "ECW5410" +} + +mimo_5g = { + "ec420": "4x4", + "ea8300": "2x2", + "ecw5211": "2x2", + "ecw5410": "4x4" +} + +mimo_2dot4g = { + "ec420": "2x2", + "ea8300": "2x2", + "ecw5211": "2x2", + "ecw5410": "4x4" +} + +sanity_status = { + "ea8300": "failed", + "ecw5211": 'passed', + "ecw5410": 'failed', + "ec420": 'failed' +} + +##Equipment IDs for Lab APs under test +equipment_id_dict = { + "ea8300": "13", + "ecw5410": "12", + "ecw5211": "6", + "ec420": "11" +} + +equipment_ip_dict = { + "ea8300": "10.10.10.103", + "ecw5410": "10.10.10.105", + "ec420": "10.10.10.104", + "ecw5211": "10.10.10.102" +} + +eqiupment_credentials_dict = { + "ea8300": "openwifi", + "ecw5410": "openwifi", + "ec420": "openwifi", + "ecw5211": "admin123" +} +###Testing AP Profile Information +profile_info_dict = { + "ecw5410": { + "profile_id": "2", + "fiveG_WPA2_SSID": "ECW5410_5G_WPA2", + "fiveG_WPA2_PSK": "Connectus123$", + "fiveG_WPA_SSID": "ECW5410_5G_WPA", + "fiveG_WPA_PSK": "Connectus123$", + "fiveG_OPEN_SSID": "ECW5410_5G_OPEN", + "fiveG_WPA2-EAP_SSID": "ECW5410_5G_WPA2-EAP", + "twoFourG_OPEN_SSID": "ECW5410_2dot4G_OPEN", + "twoFourG_WPA2_SSID": "ECW5410_2dot4G_WPA2", + "twoFourG_WPA2_PSK": "Connectus123$", + "twoFourG_WPA_SSID":"ECW5410_2dot4G_WPA", + "twoFourG_WPA_PSK": "Connectus123$", + "twoFourG_WPA2-EAP_SSID": "ECW5410_2dot4G_WPA2-EAP" + }, + + "ea8300": { + "profile_id": "153", + "fiveG_WPA2_SSID": "EA8300_5G_WPA2", + "fiveG_WPA2_PSK": "Connectus123$", + "fiveG_WPA_SSID": "EA8300_5G_WPA", + "fiveG_WPA_PSK": "Connectus123$", + "fiveG_OPEN_SSID": "EA8300_5G_OPEN", + "fiveG_WPA2-EAP_SSID": "EA8300_5G_WPA2-EAP", + "twoFourG_OPEN_SSID": "EA8300_2dot4G_OPEN", + "twoFourG_WPA2_SSID": "EA8300_2dot4G_WPA2", + "twoFourG_WPA2_PSK": "Connectus123$", + "twoFourG_WPA_SSID":"EA8300_2dot4G_WPA", + "twoFourG_WPA_PSK": "Connectus123$", + "twoFourG_WPA2-EAP_SSID": "EA8300_2dot4G_WPA2-EAP" + }, + + "ec420": { + "profile_id": "20", + "fiveG_WPA2_SSID": "EC420_5G_WPA2", + "fiveG_WPA2_PSK": "Connectus123$", + "fiveG_WPA_SSID": "EC420_5G_WPA", + "fiveG_WPA_PSK": "Connectus123$", + "fiveG_OPEN_SSID": "EC420_5G_OPEN", + "fiveG_WPA2-EAP_SSID": "EC420_5G_WPA2-EAP", + "twoFourG_OPEN_SSID": "EC420_2dot4G_OPEN", + "twoFourG_WPA2_SSID": "EC420_2dot4G_WPA2", + "twoFourG_WPA2_PSK": "Connectus123$", + "twoFourG_WPA_SSID":"EC420_2dot4G_WPA", + "twoFourG_WPA_PSK": "Connectus123$", + "twoFourG_WPA2-EAP_SSID": "EC420_2dot4G_WPA2-EAP" + }, + + "ecw5211": { + "profile_id": "27", + "fiveG_WPA2_SSID": "ECW5211_5G_WPA2", + "fiveG_WPA2_PSK": "Connectus123$", + "fiveG_WPA_SSID": "ECW5211_5G_WPA", + "fiveG_WPA_PSK": "Connectus123$", + "fiveG_OPEN_SSID": "ECW5211_5G_OPEN", + "fiveG_WPA2-EAP_SSID": "ECW5211_5G_WPA2-EAP", + "twoFourG_OPEN_SSID": "ECW5211_2dot4G_OPEN", + "twoFourG_WPA2_SSID": "ECW5211_2dot4G_WPA2", + "twoFourG_WPA2_PSK": "Connectus123$", + "twoFourG_WPA_SSID":"ECW5211_2dot4G_WPA", + "twoFourG_WPA_PSK": "Connectus123$", + "twoFourG_WPA2-EAP_SSID": "ECW5211_2dot4G_WPA2-EAP" + }, + + "ecw5410_nat": { + "profile_id": "68", + "fiveG_WPA2_SSID": "ECW5410_5G_WPA2_NAT", + "fiveG_WPA2_PSK": "Connectus123$", + "fiveG_WPA_SSID": "ECW5410_5G_WPA_NAT", + "fiveG_WPA_PSK": "Connectus123$", + "fiveG_OPEN_SSID": "ECW5410_5G_OPEN_NAT", + "fiveG_WPA2-EAP_SSID": "ECW5410_5G_WPA2-EAP_NAT", + "twoFourG_OPEN_SSID": "ECW5410_2dot4G_OPEN_NAT", + "twoFourG_WPA2_SSID": "ECW5410_2dot4G_WPA2_NAT", + "twoFourG_WPA2_PSK": "Connectus123$", + "twoFourG_WPA_SSID":"ECW5410_2dot4G_WPA_NAT", + "twoFourG_WPA_PSK": "Connectus123$", + "twoFourG_WPA2-EAP_SSID": "ECW5410_2dot4G_WPA2-EAP_NAT" + }, + + "ea8300_nat": { + "profile_id": "67", + "fiveG_WPA2_SSID": "EA8300_5G_WPA2_NAT", + "fiveG_WPA2_PSK": "Connectus123$", + "fiveG_WPA_SSID": "EA8300_5G_WPA_NAT", + "fiveG_WPA_PSK": "Connectus123$", + "fiveG_OPEN_SSID": "EA8300_5G_OPEN_NAT", + "fiveG_WPA2-EAP_SSID": "EA8300_5G_WPA2-EAP_NAT", + "twoFourG_OPEN_SSID": "EA8300_2dot4G_OPEN_NAT", + "twoFourG_WPA2_SSID": "EA8300_2dot4G_WPA2_NAT", + "twoFourG_WPA2_PSK": "Connectus123$", + "twoFourG_WPA_SSID":"EA8300_2dot4G_WPA_NAT", + "twoFourG_WPA_PSK": "Connectus123$", + "twoFourG_WPA2-EAP_SSID": "EA8300_2dot4G_WPA2-EAP_NAT" + }, + + "ec420_nat": { + "profile_id": "70", + "fiveG_WPA2_SSID": "EC420_5G_WPA2_NAT", + "fiveG_WPA2_PSK": "Connectus123$", + "fiveG_WPA_SSID": "EC420_5G_WPA_NAT", + "fiveG_WPA_PSK": "Connectus123$", + "fiveG_OPEN_SSID": "EC420_5G_OPEN_NAT", + "fiveG_WPA2-EAP_SSID": "EC420_5G_WPA2-EAP_NAT", + "twoFourG_OPEN_SSID": "EC420_2dot4G_OPEN_NAT", + "twoFourG_WPA2_SSID": "EC420_2dot4G_WPA2_NAT", + "twoFourG_WPA2_PSK": "Connectus123$", + "twoFourG_WPA_SSID":"EC420_2dot4G_WPA_NAT", + "twoFourG_WPA_PSK": "Connectus123$", + "twoFourG_WPA2-EAP_SSID": "EC420_2dot4G_WPA2-EAP_NAT" + }, + + "ecw5211_nat": { + "profile_id": "69", + "fiveG_WPA2_SSID": "ECW5211_5G_WPA2_NAT", + "fiveG_WPA2_PSK": "Connectus123$", + "fiveG_WPA_SSID": "ECW5211_5G_WPA_NAT", + "fiveG_WPA_PSK": "Connectus123$", + "fiveG_OPEN_SSID": "ECW5211_5G_OPEN_NAT", + "fiveG_WPA2-EAP_SSID": "ECW5211_5G_WPA2-EAP_NAT", + "twoFourG_OPEN_SSID": "ECW5211_2dot4G_OPEN_NAT", + "twoFourG_WPA2_SSID": "ECW5211_2dot4G_WPA2_NAT", + "twoFourG_WPA2_PSK": "Connectus123$", + "twoFourG_WPA_SSID":"ECW5211_2dot4G_WPA_NAT", + "twoFourG_WPA_PSK": "Connectus123$", + "twoFourG_WPA2-EAP_SSID": "ECW5211_2dot4G_WPA2-EAP_NAT" + }, + + "ecw5410_vlan": { + "profile_id": "338", + "fiveG_WPA2_SSID": "ECW5410_5G_WPA2_VLAN", + "fiveG_WPA2_PSK": "Connectus123$", + "fiveG_WPA_SSID": "ECW5410_5G_WPA_VLAN", + "fiveG_WPA_PSK": "Connectus123$", + "fiveG_OPEN_SSID": "ECW5410_5G_OPEN_VLAN", + "fiveG_WPA2-EAP_SSID": "ECW5410_5G_WPA2-EAP_VLAN", + "twoFourG_OPEN_SSID": "ECW5410_2dot4G_OPEN_VLAN", + "twoFourG_WPA2_SSID": "ECW5410_2dot4G_WPA2_VLAN", + "twoFourG_WPA2_PSK": "Connectus123$", + "twoFourG_WPA_SSID":"ECW5410_2dot4G_WPA_VLAN", + "twoFourG_WPA_PSK": "Connectus123$", + "twoFourG_WPA2-EAP_SSID": "ECW5410_2dot4G_WPA2-EAP_VLAN" + }, + + "ea8300_vlan": { + "profile_id": "319", + "fiveG_WPA2_SSID": "EA8300_5G_WPA2_VLAN", + "fiveG_WPA2_PSK": "Connectus123$", + "fiveG_WPA_SSID": "EA8300_5G_WPA_VLAN", + "fiveG_WPA_PSK": "Connectus123$", + "fiveG_OPEN_SSID": "EA8300_5G_OPEN_VLAN", + "fiveG_WPA2-EAP_SSID": "EA8300_5G_WPA2-EAP_VLAN", + "twoFourG_OPEN_SSID": "EA8300_2dot4G_OPEN_VLAN", + "twoFourG_WPA2_SSID": "EA8300_2dot4G_WPA2_VLAN", + "twoFourG_WPA2_PSK": "Connectus123$", + "twoFourG_WPA_SSID":"EA8300_2dot4G_WPA_VLAN", + "twoFourG_WPA_PSK": "Connectus123$", + "twoFourG_WPA2-EAP_SSID": "EA8300_2dot4G_WPA2-EAP_VLAN" + }, + + "ec420_vlan": { + "profile_id": "357", + "fiveG_WPA2_SSID": "EC420_5G_WPA2_VLAN", + "fiveG_WPA2_PSK": "Connectus123$", + "fiveG_WPA_SSID": "EC420_5G_WPA_VLAN", + "fiveG_WPA_PSK": "Connectus123$", + "fiveG_OPEN_SSID": "EC420_5G_OPEN_VLAN", + "fiveG_WPA2-EAP_SSID": "EC420_5G_WPA2-EAP_VLAN", + "twoFourG_OPEN_SSID": "EC420_2dot4G_OPEN_VLAN", + "twoFourG_WPA2_SSID": "EC420_2dot4G_WPA2_VLAN", + "twoFourG_WPA2_PSK": "Connectus123$", + "twoFourG_WPA_SSID":"EC420_2dot4G_WPA_VLAN", + "twoFourG_WPA_PSK": "Connectus123$", + "twoFourG_WPA2-EAP_SSID": "EC420_2dot4G_WPA2-EAP_VLAN" + }, + + "ecw5211_vlan": { + "profile_id": "364", + "fiveG_WPA2_SSID": "ECW5211_5G_WPA2_VLAN", + "fiveG_WPA2_PSK": "Connectus123$", + "fiveG_WPA_SSID": "ECW5211_5G_WPA_VLAN", + "fiveG_WPA_PSK": "Connectus123$", + "fiveG_OPEN_SSID": "ECW5211_5G_OPEN_VLAN", + "fiveG_WPA2-EAP_SSID": "ECW5211_5G_WPA2-EAP_VLAN", + "twoFourG_OPEN_SSID": "ECW5211_2dot4G_OPEN_VLAN", + "twoFourG_WPA2_SSID": "ECW5211_2dot4G_WPA2_VLAN", + "twoFourG_WPA2_PSK": "Connectus123$", + "twoFourG_WPA_SSID":"ECW5211_2dot4G_WPA_VLAN", + "twoFourG_WPA_PSK": "Connectus123$", + "twoFourG_WPA2-EAP_SSID": "ECW5211_2dot4G_WPA2-EAP_VLAN" + } +} \ No newline at end of file diff --git a/py-scripts/tip-cicd-sanity/sanity_status.json b/py-scripts/tip-cicd-sanity/sanity_status.json new file mode 100755 index 00000000..a0d9d4c0 --- /dev/null +++ b/py-scripts/tip-cicd-sanity/sanity_status.json @@ -0,0 +1 @@ +{"sanity_status": {"ea8300": "failed", "ecw5211": "failed", "ecw5410": "failed", "ec420": "failed"}, "sanity_run": {"new_data": "yes"}} \ No newline at end of file diff --git a/py-scripts/tip-cicd-sanity/single_client_throughput.py b/py-scripts/tip-cicd-sanity/single_client_throughput.py new file mode 100755 index 00000000..5330461e --- /dev/null +++ b/py-scripts/tip-cicd-sanity/single_client_throughput.py @@ -0,0 +1,1051 @@ +#!/usr/bin/env python3 + +# Script is based off of sta_connect2.py +# Script built for max throughput testing on a single client +# The main function of the script creates a station, then tests: +# 1. UDP Downstream (AP to STA) +# 2. UDP Upstream (STA to AP) +# 3. TCP Downstream (AP to STA) +# 4. TCP Upstream (STA to AP) +# The script will clean up the station and connections at the end of the test. + +import sys +import csv + +if sys.version_info[0] != 3: + print("This script requires Python 3") + exit(1) + +if 'py-json' not in sys.path: + sys.path.append('../../py-json') + +import argparse +from LANforge import LFUtils +# from LANforge import LFCliBase +from LANforge import lfcli_base +from LANforge.lfcli_base import LFCliBase +from LANforge.LFUtils import * +import realm +from realm import Realm +import pprint + +OPEN="open" +WEP="wep" +WPA="wpa" +WPA2="wpa2" +MODE_AUTO=0 + +class SingleClient(LFCliBase): + def __init__(self, host, port, _dut_ssid="jedway-open-1", _dut_passwd="NA", _dut_bssid="", + _user="", _passwd="", _sta_mode="0", _radio="wiphy0", + _resource=1, _upstream_resource=1, _upstream_port="eth1", + _sta_name=None, debug_=False, _dut_security=OPEN, _exit_on_error=False, + _cleanup_on_exit=True, _runtime_sec=60, _exit_on_fail=False): + # do not use `super(LFCLiBase,self).__init__(self, host, port, _debugOn) + # that is py2 era syntax and will force self into the host variable, making you + # very confused. + super().__init__(host, port, _debug=debug_, _halt_on_error=_exit_on_error, _exit_on_fail=_exit_on_fail) + self.debug = debug_ + self.dut_security = _dut_security + self.dut_ssid = _dut_ssid + self.dut_passwd = _dut_passwd + self.dut_bssid = _dut_bssid + self.user = _user + self.passwd = _passwd + self.sta_mode = _sta_mode # See add_sta LANforge CLI users guide entry + self.radio = _radio + self.resource = _resource + self.upstream_resource = _upstream_resource + self.upstream_port = _upstream_port + self.runtime_secs = _runtime_sec + self.cleanup_on_exit = _cleanup_on_exit + self.sta_url_map = None # defer construction + self.upstream_url = None # defer construction + self.station_names = [] + if _sta_name is not None: + self.station_names = [ _sta_name ] + # self.localrealm :Realm = Realm(lfclient_host=host, lfclient_port=port) # py > 3.6 + self.localrealm = Realm(lfclient_host=host, lfclient_port=port) # py > 3.6 + self.resulting_stations = {} + self.resulting_endpoints = {} + self.station_profile = None + self.l3_udp_profile = None + self.l3_tcp_profile = None + + # def get_realm(self) -> Realm: # py > 3.6 + def get_realm(self): + return self.localrealm + + def get_station_url(self, sta_name_=None): + if sta_name_ is None: + raise ValueError("get_station_url wants a station name") + if self.sta_url_map is None: + self.sta_url_map = {} + for sta_name in self.station_names: + self.sta_url_map[sta_name] = "port/1/%s/%s" % (self.resource, sta_name) + return self.sta_url_map[sta_name_] + + def get_upstream_url(self): + if self.upstream_url is None: + self.upstream_url = "port/1/%s/%s" % (self.upstream_resource, self.upstream_port) + return self.upstream_url + + # Compare pre-test values to post-test values + def compare_vals(self, name, postVal, print_pass=False, print_fail=True): + # print(f"Comparing {name}") + if postVal > 0: + self._pass("%s %s" % (name, postVal), print_pass) + else: + self._fail("%s did not report traffic: %s" % (name, postVal), print_fail) + + def remove_stations(self): + for name in self.station_names: + LFUtils.removePort(self.resource, name, self.lfclient_url) + + def num_associated(self, bssid): + counter = 0 + # print("there are %d results" % len(self.station_results)) + fields = "_links,port,alias,ip,ap,port+type" + self.station_results = self.localrealm.find_ports_like("sta*", fields, debug_=False) + if (self.station_results is None) or (len(self.station_results) < 1): + self.get_failed_result_list() + for eid,record in self.station_results.items(): + #print("-- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- ") + #pprint(eid) + #pprint(record) + if record["ap"] == bssid: + counter += 1 + #print("-- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- ") + return counter + + def clear_test_results(self): + self.resulting_stations = {} + self.resulting_endpoints = {} + super().clear_test_results() + #super(StaConnect, self).clear_test_results().test_results.clear() + + def setup(self): + self.clear_test_results() + self.check_connect() + upstream_json = self.json_get("%s?fields=alias,phantom,down,port,ip" % self.get_upstream_url(), debug_=False) + + if upstream_json is None: + self._fail(message="Unable to query %s, bye" % self.upstream_port, print_=True) + return False + + if upstream_json['interface']['ip'] == "0.0.0.0": + if self.debug: + pprint.pprint(upstream_json) + self._fail("Warning: %s lacks ip address" % self.get_upstream_url(), print_=True) + return False + + # remove old stations + print("Removing old stations") + for sta_name in self.station_names: + sta_url = self.get_station_url(sta_name) + response = self.json_get(sta_url) + if (response is not None) and (response["interface"] is not None): + for sta_name in self.station_names: + LFUtils.removePort(self.resource, sta_name, self.lfclient_url) + LFUtils.wait_until_ports_disappear(self.lfclient_url, self.station_names) + + # Create stations and turn dhcp on + self.station_profile = self.localrealm.new_station_profile() + + if self.dut_security == WPA2: + self.station_profile.use_security(security_type="wpa2", ssid=self.dut_ssid, passwd=self.dut_passwd) + elif self.dut_security == WPA: + self.station_profile.use_security(security_type="wpa", ssid=self.dut_ssid, passwd=self.dut_passwd) + elif self.dut_security == OPEN: + self.station_profile.use_security(security_type="open", ssid=self.dut_ssid, passwd="[BLANK]") + elif self.dut_security == WPA: + self.station_profile.use_security(security_type="wpa", ssid=self.dut_ssid, passwd=self.dut_passwd) + elif self.dut_security == WEP: + self.station_profile.use_security(security_type="wep", ssid=self.dut_ssid, passwd=self.dut_passwd) + self.station_profile.set_command_flag("add_sta", "create_admin_down", 1) + + print("Adding new stations ", end="") + self.station_profile.create(radio=self.radio, sta_names_=self.station_names, up_=False, debug=self.debug, suppress_related_commands_=True) + LFUtils.wait_until_ports_appear(self.lfclient_url, self.station_names, debug=self.debug) + + def start(self): + if self.station_profile is None: + self._fail("Incorrect setup") + pprint.pprint(self.station_profile) + if self.station_profile.up is None: + self._fail("Incorrect station profile, missing profile.up") + if self.station_profile.up == False: + print("\nBringing ports up...") + data = {"shelf": 1, + "resource": self.resource, + "port": "ALL", + "probe_flags": 1} + self.json_post("/cli-json/nc_show_ports", data) + self.station_profile.admin_up() + LFUtils.waitUntilPortsAdminUp(self.resource, self.lfclient_url, self.station_names) + + # station_info = self.jsonGet(self.mgr_url, "%s?fields=port,ip,ap" % (self.getStaUrl())) + duration = 0 + maxTime = 100 + ip = "0.0.0.0" + ap = "" + print("Waiting for %s stations to associate to AP: " % len(self.station_names), end="") + connected_stations = {} + while (len(connected_stations.keys()) < len(self.station_names)) and (duration < maxTime): + duration += 3 + time.sleep(10) + print(".", end="") + for sta_name in self.station_names: + sta_url = self.get_station_url(sta_name) + station_info = self.json_get(sta_url + "?fields=port,ip,ap") + + # LFUtils.debug_printer.pprint(station_info) + if (station_info is not None) and ("interface" in station_info): + if "ip" in station_info["interface"]: + ip = station_info["interface"]["ip"] + if "ap" in station_info["interface"]: + ap = station_info["interface"]["ap"] + + if (ap == "Not-Associated") or (ap == ""): + if self.debug: + print(" -%s," % sta_name, end="") + else: + if ip == "0.0.0.0": + if self.debug: + print(" %s (0.0.0.0)" % sta_name, end="") + else: + connected_stations[sta_name] = sta_url + data = { + "shelf":1, + "resource": self.resource, + "port": "ALL", + "probe_flags": 1 + } + self.json_post("/cli-json/nc_show_ports", data) + + for sta_name in self.station_names: + sta_url = self.get_station_url(sta_name) + station_info = self.json_get(sta_url) # + "?fields=port,ip,ap") + if station_info is None: + print("unable to query %s" % sta_url) + self.resulting_stations[sta_url] = station_info + ap = station_info["interface"]["ap"] + ip = station_info["interface"]["ip"] + if (ap != "") and (ap != "Not-Associated"): + print(" %s +AP %s, " % (sta_name, ap), end="") + if self.dut_bssid != "": + if self.dut_bssid.lower() == ap.lower(): + self._pass(sta_name+" connected to BSSID: " + ap) + # self.test_results.append("PASSED: ) + # print("PASSED: Connected to BSSID: "+ap) + else: + self._fail("%s connected to wrong BSSID, requested: %s Actual: %s" % (sta_name, self.dut_bssid, ap)) + else: + self._fail(sta_name+" did not connect to AP") + return False + + if ip == "0.0.0.0": + self._fail("%s did not get an ip. Ending test" % sta_name) + else: + self._pass("%s connected to AP: %s With IP: %s" % (sta_name, ap, ip)) + + if self.passes() == False: + if self.cleanup_on_exit: + print("Cleaning up...") + self.remove_stations() + return False + + def udp_profile(self, side_a_min_bps, side_b_min_bps, side_a_min_pdu, side_b_min_pdu): + # Create UDP endpoint - Alex's code! + self.l3_udp_tput_profile = self.localrealm.new_l3_cx_profile() + self.l3_udp_tput_profile.side_a_min_bps = side_a_min_bps + self.l3_udp_tput_profile.side_b_min_bps = side_b_min_bps + self.l3_udp_tput_profile.side_a_min_pdu = side_a_min_pdu + self.l3_udp_tput_profile.side_b_min_pdu = side_b_min_pdu + self.l3_udp_tput_profile.report_timer = 1000 + self.l3_udp_tput_profile.name_prefix = "udp" + self.l3_udp_tput_profile.create(endp_type="lf_udp", + side_a=list(self.localrealm.find_ports_like("tput+")), + side_b="%d.%s" % (self.resource, self.upstream_port), + suppress_related_commands=True) + + def tcp_profile(self, side_a_min_bps, side_b_min_bps): + # Create TCP endpoints - original code! + self.l3_tcp_tput_profile = self.localrealm.new_l3_cx_profile() + self.l3_tcp_tput_profile.side_a_min_bps = side_a_min_bps + self.l3_tcp_tput_profile.side_b_min_bps = side_b_min_bps + self.l3_tcp_tput_profile.name_prefix = "tcp" + self.l3_tcp_tput_profile.report_timer = 1000 + self.l3_tcp_tput_profile.create(endp_type="lf_tcp", + side_a=list(self.localrealm.find_ports_like("tput+")), + side_b="%d.%s" % (self.resource, self.upstream_port), + suppress_related_commands=True) + + # Start UDP Downstream Traffic + def udp_throughput(self): + print("\nStarting UDP Traffic") + self.l3_udp_tput_profile.start_cx() + time.sleep(1) + self.l3_udp_tput_profile.refresh_cx() + + def tcp_throughput(self): + print("\nStarting TCP Traffic") + self.l3_tcp_tput_profile.start_cx() + time.sleep(1) + self.l3_tcp_tput_profile.refresh_cx() + + def udp_stop(self): + # stop cx traffic + print("Stopping CX Traffic") + self.l3_udp_tput_profile.stop_cx() + + # Refresh stats + print("\nRefresh CX stats") + self.l3_udp_tput_profile.refresh_cx() + + print("Sleeping for 5 seconds") + time.sleep(5) + + # get data for endpoints JSON + return self.collect_client_stats(self.l3_udp_tput_profile.created_cx) + # print("\n") + + def tcp_stop(self): + # stop cx traffic + print("Stopping CX Traffic") + self.l3_tcp_tput_profile.stop_cx() + + # Refresh stats + print("\nRefresh CX stats") + self.l3_tcp_tput_profile.refresh_cx() + + print("Sleeping for 5 seconds") + time.sleep(5) + + # get data for endpoints JSON + return self.collect_client_stats(self.l3_tcp_tput_profile.created_cx) + # print("\n") + + # New Endpoint code to print TX and RX numbers + def collect_client_stats(self, endp_map): + print("Collecting Data") + fields="?fields=name,tx+bytes,rx+bytes" + for (cx_name, endps) in endp_map.items(): + try: + endp_url = "/endp/%s%s" % (endps[0], fields) + endp_json = self.json_get(endp_url) + self.resulting_endpoints[endp_url] = endp_json + ptest_a_tx = endp_json['endpoint']['tx bytes'] + ptest_a_rx = endp_json['endpoint']['rx bytes'] + + # ptest = self.json_get("/endp/%s?fields=tx+bytes,rx+bytes" % cx_names[cx_name]["b"]) + endp_url = "/endp/%s%s" % (endps[1], fields) + endp_json = self.json_get(endp_url) + self.resulting_endpoints[endp_url] = endp_json + ptest_b_tx = endp_json['endpoint']['tx bytes'] + ptest_b_rx = endp_json['endpoint']['rx bytes'] + + byte_values = [] + byte_values.append("Station TX: " + str(ptest_a_tx)) + byte_values.append("Station RX: " + str(ptest_a_rx)) + byte_values.append("AP TX: " + str(ptest_b_tx)) + byte_values.append("AP RX: " + str(ptest_b_rx)) + + return byte_values + + except Exception as e: + self.error(e) + + def cleanup_udp(self): + # remove all endpoints and cxs + if self.cleanup_on_exit: + for sta_name in self.station_names: + LFUtils.removePort(self.resource, sta_name, self.lfclient_url) + curr_endp_names = [] + removeCX(self.lfclient_url, self.l3_udp_tput_profile.get_cx_names()) + for (cx_name, endp_names) in self.l3_udp_tput_profile.created_cx.items(): + curr_endp_names.append(endp_names[0]) + curr_endp_names.append(endp_names[1]) + removeEndps(self.lfclient_url, curr_endp_names, debug= self.debug) + + def cleanup_tcp(self): + # remove all endpoints and cxs + if self.cleanup_on_exit: + for sta_name in self.station_names: + LFUtils.removePort(self.resource, sta_name, self.lfclient_url) + curr_endp_names = [] + removeCX(self.lfclient_url, self.l3_tcp_tput_profile.get_cx_names()) + for (cx_name, endp_names) in self.l3_tcp_tput_profile.created_cx.items(): + curr_endp_names.append(endp_names[0]) + curr_endp_names.append(endp_names[1]) + removeEndps(self.lfclient_url, curr_endp_names, debug= self.debug) + + def cleanup(self): + # remove all endpoints and cxs + if self.cleanup_on_exit: + for sta_name in self.station_names: + LFUtils.removePort(self.resource, sta_name, self.lfclient_url) + curr_endp_names = [] + removeCX(self.lfclient_url, self.l3_tcp_tput_profile.get_cx_names()) + removeCX(self.lfclient_url, self.l3_udp_tput_profile.get_cx_names()) + for (cx_name, endp_names) in self.l3_tcp_tput_profile.created_cx.items(): + curr_endp_names.append(endp_names[0]) + curr_endp_names.append(endp_names[1]) + for (cx_name, endp_names) in self.l3_udp_tput_profile.created_cx.items(): + curr_endp_names.append(endp_names[0]) + curr_endp_names.append(endp_names[1]) + removeEndps(self.lfclient_url, curr_endp_names, debug=self.debug) + + def udp_unidirectional(self, side_a_min_bps, side_b_min_bps, side_a_min_pdu, side_b_min_pdu, direction, values_line): + self.udp_profile(side_a_min_bps, side_b_min_bps, side_a_min_pdu, side_b_min_pdu) + self.start() + print("Running", direction, "Traffic for %s seconds" % self.runtime_secs) + self.udp_throughput() + print("napping %f sec" % self.runtime_secs) + time.sleep(self.runtime_secs) + values = self.udp_stop() + print(values) + # Get value required for measurement + bytes = values[values_line] + # Get value in Bits and convert to Mbps + bits = (int(bytes.split(": ", 1)[1])) * 8 + mpbs = round((bits / 1000000) / self.runtime_secs, 2) + return mpbs + + def tcp_unidirectional(self, side_a_min_bps, side_b_min_bps, direction, values_line): + self.tcp_profile(side_a_min_bps, side_b_min_bps) + self.start() + print("Running", direction, "Traffic for %s seconds" % self.runtime_secs) + self.tcp_throughput() + print("napping %f sec" % self.runtime_secs) + time.sleep(self.runtime_secs) + values = self.tcp_stop() + print(values) + # Get value required for measurement + bytes = values[values_line] + # Get value in Bits and convert to Mbps + bits = (int(bytes.split(": ", 1)[1])) * 8 + mpbs = round((bits / 1000000) / self.runtime_secs, 2) + return mpbs + + def throughput_csv(csv_file, ssid_name, ap_model, firmware, security, udp_ds, udp_us, tcp_ds, tcp_us): + # Find band for CSV ---> This code is not great, it SHOULD get that info from LANForge! + if "5G" in ssid_name: + frequency = "5 GHz" + + elif "2dot4G" in ssid_name: + frequency = "2.4 GHz" + + else: + frequency = "Unknown" + + # Append row to top of CSV file + row = [ap_model, firmware, frequency, security, udp_ds, udp_us, tcp_ds, tcp_us] + + with open(csv_file, 'r') as readFile: + reader = csv.reader(readFile) + lines = list(reader) + lines.insert(1, row) + + with open(csv_file, 'w') as writeFile: + writer = csv.writer(writeFile) + writer.writerows(lines) + + readFile.close() + writeFile.close() + + +class SingleClientEAP(LFCliBase): + def __init__(self, host, port, security=None, ssid=None, sta_list=None, number_template="00000", _debug_on=False, _dut_bssid="", + _exit_on_error=False, _sta_name=None, _resource=1, radio="wiphy0", key_mgmt="WPA-EAP", eap="", identity="", + ttls_passwd="", hessid=None, ttls_realm="", domain="", _exit_on_fail=False, _cleanup_on_exit=True): + super().__init__(host, port, _debug=_debug_on, _halt_on_error=_exit_on_error, _exit_on_fail=_exit_on_fail) + self.host = host + self.port = port + self.ssid = ssid + self.radio = radio + self.security = security + #self.password = password + self.sta_list = sta_list + self.key_mgmt = key_mgmt + self.eap = eap + self.identity = identity + self.ttls_passwd = ttls_passwd + self.ttls_realm = ttls_realm + self.domain = domain + self.hessid = hessid + self.dut_bssid = _dut_bssid + self.timeout = 120 + self.number_template = number_template + self.debug = _debug_on + self.local_realm = realm.Realm(lfclient_host=self.host, lfclient_port=self.port) + self.station_profile = self.local_realm.new_station_profile() + self.station_profile.lfclient_url = self.lfclient_url + self.station_profile.ssid = self.ssid + self.station_profile.security = self.security + self.station_profile.number_template_ = self.number_template + self.station_profile.mode = 0 + #Added to test_ipv4_ttls code + self.upstream_url = None # defer construction + self.sta_url_map = None + self.upstream_resource = None + self.upstream_port = "eth2" + self.station_names = [] + if _sta_name is not None: + self.station_names = [_sta_name] + self.localrealm = Realm(lfclient_host=host, lfclient_port=port) + self.resource = _resource + self.cleanup_on_exit = _cleanup_on_exit + self.resulting_stations = {} + self.resulting_endpoints = {} + self.station_profile = None + self.l3_udp_profile = None + self.l3_tcp_profile = None + + # def get_realm(self) -> Realm: # py > 3.6 + def get_realm(self): + return self.localrealm + + def get_station_url(self, sta_name_=None): + if sta_name_ is None: + raise ValueError("get_station_url wants a station name") + if self.sta_url_map is None: + self.sta_url_map = {} + for sta_name in self.station_names: + self.sta_url_map[sta_name] = "port/1/%s/%s" % (self.resource, sta_name) + return self.sta_url_map[sta_name_] + + def get_upstream_url(self): + if self.upstream_url is None: + self.upstream_url = "port/1/%s/%s" % (self.upstream_resource, self.upstream_port) + return self.upstream_url + + # Compare pre-test values to post-test values + def compare_vals(self, name, postVal, print_pass=False, print_fail=True): + # print(f"Comparing {name}") + if postVal > 0: + self._pass("%s %s" % (name, postVal), print_pass) + else: + self._fail("%s did not report traffic: %s" % (name, postVal), print_fail) + + def remove_stations(self): + for name in self.station_names: + LFUtils.removePort(self.resource, name, self.lfclient_url) + + def num_associated(self, bssid): + counter = 0 + # print("there are %d results" % len(self.station_results)) + fields = "_links,port,alias,ip,ap,port+type" + self.station_results = self.localrealm.find_ports_like("eap*", fields, debug_=False) + if (self.station_results is None) or (len(self.station_results) < 1): + self.get_failed_result_list() + for eid,record in self.station_results.items(): + #print("-- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- ") + #pprint(eid) + #pprint(record) + if record["ap"] == bssid: + counter += 1 + #print("-- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- ") + return counter + + def clear_test_results(self): + self.resulting_stations = {} + self.resulting_endpoints = {} + super().clear_test_results() + #super(StaConnect, self).clear_test_results().test_results.clear() + + def setup(self): + self.clear_test_results() + self.check_connect() + upstream_json = self.json_get("%s?fields=alias,phantom,down,port,ip" % self.get_upstream_url(), debug_=False) + + if upstream_json is None: + self._fail(message="Unable to query %s, bye" % self.upstream_port, print_=True) + return False + + if upstream_json['interface']['ip'] == "0.0.0.0": + if self.debug: + pprint.pprint(upstream_json) + self._fail("Warning: %s lacks ip address" % self.get_upstream_url(), print_=True) + return False + + # remove old stations + print("Removing old stations") + for sta_name in self.station_names: + sta_url = self.get_station_url(sta_name) + response = self.json_get(sta_url) + if (response is not None) and (response["interface"] is not None): + for sta_name in self.station_names: + LFUtils.removePort(self.resource, sta_name, self.lfclient_url) + LFUtils.wait_until_ports_disappear(self.lfclient_url, self.station_names) + + # Create stations and turn dhcp on + self.station_profile = self.localrealm.new_station_profile() + + # Build stations + self.station_profile.use_security(self.security, self.ssid, passwd="[BLANK]") + self.station_profile.set_number_template(self.number_template) + print("Creating stations") + self.station_profile.set_command_flag("add_sta", "create_admin_down", 1) + self.station_profile.set_command_param("set_port", "report_timer", 1500) + self.station_profile.set_command_flag("set_port", "rpt_timer", 1) + self.station_profile.set_wifi_extra(key_mgmt=self.key_mgmt, eap=self.eap, identity=self.identity, + passwd=self.ttls_passwd, + realm=self.ttls_realm, domain=self.domain, + hessid=self.hessid) + self.station_profile.create(radio=self.radio, sta_names_=self.sta_list, debug=self.debug, use_radius=True, hs20_enable=False) + self._pass("PASS: Station build finished") + + def start(self): + if self.station_profile is None: + self._fail("Incorrect setup") + pprint.pprint(self.station_profile) + if self.station_profile.up is None: + self._fail("Incorrect station profile, missing profile.up") + if self.station_profile.up == False: + print("\nBringing ports up...") + data = {"shelf": 1, + "resource": self.resource, + "port": "ALL", + "probe_flags": 1} + self.json_post("/cli-json/nc_show_ports", data) + self.station_profile.admin_up() + LFUtils.waitUntilPortsAdminUp(self.resource, self.lfclient_url, self.station_names) + + # station_info = self.jsonGet(self.mgr_url, "%s?fields=port,ip,ap" % (self.getStaUrl())) + duration = 0 + maxTime = 100 + ip = "0.0.0.0" + ap = "" + print("Waiting for %s stations to associate to AP: " % len(self.station_names), end="") + connected_stations = {} + while (len(connected_stations.keys()) < len(self.station_names)) and (duration < maxTime): + duration += 3 + time.sleep(10) + print(".", end="") + for sta_name in self.station_names: + sta_url = self.get_station_url(sta_name) + station_info = self.json_get(sta_url + "?fields=port,ip,ap") + + # LFUtils.debug_printer.pprint(station_info) + if (station_info is not None) and ("interface" in station_info): + if "ip" in station_info["interface"]: + ip = station_info["interface"]["ip"] + if "ap" in station_info["interface"]: + ap = station_info["interface"]["ap"] + + if (ap == "Not-Associated") or (ap == ""): + if self.debug: + print(" -%s," % sta_name, end="") + else: + if ip == "0.0.0.0": + if self.debug: + print(" %s (0.0.0.0)" % sta_name, end="") + else: + connected_stations[sta_name] = sta_url + data = { + "shelf":1, + "resource": self.resource, + "port": "ALL", + "probe_flags": 1 + } + self.json_post("/cli-json/nc_show_ports", data) + + for sta_name in self.station_names: + sta_url = self.get_station_url(sta_name) + station_info = self.json_get(sta_url) # + "?fields=port,ip,ap") + if station_info is None: + print("unable to query %s" % sta_url) + self.resulting_stations[sta_url] = station_info + ap = station_info["interface"]["ap"] + ip = station_info["interface"]["ip"] + if (ap != "") and (ap != "Not-Associated"): + print(" %s +AP %s, " % (sta_name, ap), end="") + if self.dut_bssid != "": + if self.dut_bssid.lower() == ap.lower(): + self._pass(sta_name+" connected to BSSID: " + ap) + # self.test_results.append("PASSED: ) + # print("PASSED: Connected to BSSID: "+ap) + else: + self._fail("%s connected to wrong BSSID, requested: %s Actual: %s" % (sta_name, self.dut_bssid, ap)) + else: + self._fail(sta_name+" did not connect to AP") + return False + + if ip == "0.0.0.0": + self._fail("%s did not get an ip. Ending test" % sta_name) + else: + self._pass("%s connected to AP: %s With IP: %s" % (sta_name, ap, ip)) + + if self.passes() == False: + if self.cleanup_on_exit: + print("Cleaning up...") + self.remove_stations() + return False + + def udp_profile(self, side_a_min_bps, side_b_min_bps, side_a_min_pdu, side_b_min_pdu): + # Create UDP endpoint - Alex's code! + self.l3_udp_tput_profile = self.localrealm.new_l3_cx_profile() + self.l3_udp_tput_profile.side_a_min_bps = side_a_min_bps + self.l3_udp_tput_profile.side_b_min_bps = side_b_min_bps + self.l3_udp_tput_profile.side_a_min_pdu = side_a_min_pdu + self.l3_udp_tput_profile.side_b_min_pdu = side_b_min_pdu + self.l3_udp_tput_profile.report_timer = 1000 + self.l3_udp_tput_profile.name_prefix = "udp" + self.l3_udp_tput_profile.create(endp_type="lf_udp", + side_a=list(self.localrealm.find_ports_like("tput+")), + side_b="%d.%s" % (self.resource, self.upstream_port), + suppress_related_commands=True) + + def tcp_profile(self, side_a_min_bps, side_b_min_bps): + # Create TCP endpoints - original code! + self.l3_tcp_tput_profile = self.localrealm.new_l3_cx_profile() + self.l3_tcp_tput_profile.side_a_min_bps = side_a_min_bps + self.l3_tcp_tput_profile.side_b_min_bps = side_b_min_bps + self.l3_tcp_tput_profile.name_prefix = "tcp" + self.l3_tcp_tput_profile.report_timer = 1000 + self.l3_tcp_tput_profile.create(endp_type="lf_tcp", + side_a=list(self.localrealm.find_ports_like("tput+")), + side_b="%d.%s" % (self.resource, self.upstream_port), + suppress_related_commands=True) + + # Start UDP Downstream Traffic + def udp_throughput(self): + print("\nStarting UDP Traffic") + self.l3_udp_tput_profile.start_cx() + time.sleep(1) + self.l3_udp_tput_profile.refresh_cx() + + def tcp_throughput(self): + print("\nStarting TCP Traffic") + self.l3_tcp_tput_profile.start_cx() + time.sleep(1) + self.l3_tcp_tput_profile.refresh_cx() + + def udp_stop(self): + # stop cx traffic + print("Stopping CX Traffic") + self.l3_udp_tput_profile.stop_cx() + + # Refresh stats + print("\nRefresh CX stats") + self.l3_udp_tput_profile.refresh_cx() + + print("Sleeping for 5 seconds") + time.sleep(5) + + # get data for endpoints JSON + return self.collect_client_stats(self.l3_udp_tput_profile.created_cx) + # print("\n") + + def tcp_stop(self): + # stop cx traffic + print("Stopping CX Traffic") + self.l3_tcp_tput_profile.stop_cx() + + # Refresh stats + print("\nRefresh CX stats") + self.l3_tcp_tput_profile.refresh_cx() + + print("Sleeping for 5 seconds") + time.sleep(5) + + # get data for endpoints JSON + return self.collect_client_stats(self.l3_tcp_tput_profile.created_cx) + # print("\n") + + # New Endpoint code to print TX and RX numbers + def collect_client_stats(self, endp_map): + print("Collecting Data") + fields="?fields=name,tx+bytes,rx+bytes" + for (cx_name, endps) in endp_map.items(): + try: + endp_url = "/endp/%s%s" % (endps[0], fields) + endp_json = self.json_get(endp_url) + self.resulting_endpoints[endp_url] = endp_json + ptest_a_tx = endp_json['endpoint']['tx bytes'] + ptest_a_rx = endp_json['endpoint']['rx bytes'] + + # ptest = self.json_get("/endp/%s?fields=tx+bytes,rx+bytes" % cx_names[cx_name]["b"]) + endp_url = "/endp/%s%s" % (endps[1], fields) + endp_json = self.json_get(endp_url) + self.resulting_endpoints[endp_url] = endp_json + ptest_b_tx = endp_json['endpoint']['tx bytes'] + ptest_b_rx = endp_json['endpoint']['rx bytes'] + + byte_values = [] + byte_values.append("Station TX: " + str(ptest_a_tx)) + byte_values.append("Station RX: " + str(ptest_a_rx)) + byte_values.append("AP TX: " + str(ptest_b_tx)) + byte_values.append("AP RX: " + str(ptest_b_rx)) + + return byte_values + + except Exception as e: + self.error(e) + + def cleanup_udp(self): + # remove all endpoints and cxs + if self.cleanup_on_exit: + for sta_name in self.station_names: + LFUtils.removePort(self.resource, sta_name, self.lfclient_url) + curr_endp_names = [] + removeCX(self.lfclient_url, self.l3_udp_tput_profile.get_cx_names()) + for (cx_name, endp_names) in self.l3_udp_tput_profile.created_cx.items(): + curr_endp_names.append(endp_names[0]) + curr_endp_names.append(endp_names[1]) + removeEndps(self.lfclient_url, curr_endp_names, debug= self.debug) + + def cleanup_tcp(self): + # remove all endpoints and cxs + if self.cleanup_on_exit: + for sta_name in self.station_names: + LFUtils.removePort(self.resource, sta_name, self.lfclient_url) + curr_endp_names = [] + removeCX(self.lfclient_url, self.l3_tcp_tput_profile.get_cx_names()) + for (cx_name, endp_names) in self.l3_tcp_tput_profile.created_cx.items(): + curr_endp_names.append(endp_names[0]) + curr_endp_names.append(endp_names[1]) + removeEndps(self.lfclient_url, curr_endp_names, debug= self.debug) + + def cleanup(self): + # remove all endpoints and cxs + if self.cleanup_on_exit: + for sta_name in self.station_names: + LFUtils.removePort(self.resource, sta_name, self.lfclient_url) + curr_endp_names = [] + removeCX(self.lfclient_url, self.l3_tcp_tput_profile.get_cx_names()) + removeCX(self.lfclient_url, self.l3_udp_tput_profile.get_cx_names()) + for (cx_name, endp_names) in self.l3_tcp_tput_profile.created_cx.items(): + curr_endp_names.append(endp_names[0]) + curr_endp_names.append(endp_names[1]) + for (cx_name, endp_names) in self.l3_udp_tput_profile.created_cx.items(): + curr_endp_names.append(endp_names[0]) + curr_endp_names.append(endp_names[1]) + removeEndps(self.lfclient_url, curr_endp_names, debug=self.debug) + + def udp_unidirectional(self, side_a_min_bps, side_b_min_bps, side_a_min_pdu, side_b_min_pdu, direction, values_line): + self.udp_profile(side_a_min_bps, side_b_min_bps, side_a_min_pdu, side_b_min_pdu) + self.start() + print("Running", direction, "Traffic for %s seconds" % self.runtime_secs) + self.udp_throughput() + print("napping %f sec" % self.runtime_secs) + time.sleep(self.runtime_secs) + values = self.udp_stop() + print(values) + # Get value required for measurement + bytes = values[values_line] + # Get value in Bits and convert to Mbps + bits = (int(bytes.split(": ", 1)[1])) * 8 + mpbs = round((bits / 1000000) / self.runtime_secs, 2) + return mpbs + + def tcp_unidirectional(self, side_a_min_bps, side_b_min_bps, direction, values_line): + self.tcp_profile(side_a_min_bps, side_b_min_bps) + self.start() + print("Running", direction, "Traffic for %s seconds" % self.runtime_secs) + self.tcp_throughput() + print("napping %f sec" % self.runtime_secs) + time.sleep(self.runtime_secs) + values = self.tcp_stop() + print(values) + # Get value required for measurement + bytes = values[values_line] + # Get value in Bits and convert to Mbps + bits = (int(bytes.split(": ", 1)[1])) * 8 + mpbs = round((bits / 1000000) / self.runtime_secs, 2) + return mpbs +# ~class + + +# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - +########################## Test Code ################################ + +##Main will perform 4 throughput tests on SSID provided by input and return a list with the values + +def main(ap_model, firmware, radio, ssid_name, ssid_psk, security, station, runtime): + ######## Establish Client Connection ######################### + singleClient = SingleClient("10.10.10.201", 8080, debug_=False) + singleClient.sta_mode = 0 + singleClient.upstream_resource = 1 + singleClient.upstream_port = "eth2" + singleClient.radio = radio + singleClient.resource = 1 + singleClient.dut_ssid = ssid_name + singleClient.dut_passwd = ssid_psk + singleClient.dut_security = security + singleClient.station_names = station + singleClient.runtime_secs = runtime + singleClient.cleanup_on_exit = True + + #Create List for Throughput Data + tput_data = [] + + ####### Setup UDP Profile and Run Traffic Downstream (AP to STA) ####################### + singleClient.setup() + side_a_min_bps = 56000 + side_b_min_bps = 500000000 + side_a_min_pdu = 1200 + side_b_min_pdu = 1500 + direction = "Downstream" + values_line = 1 # 1 = Station Rx + try: + udp_ds = singleClient.udp_unidirectional(side_a_min_bps, side_b_min_bps, side_a_min_pdu, side_b_min_pdu, direction, values_line) + print("UDP Downstream:", udp_ds, "Mbps") + tput_data.append("UDP Downstream: " + str(udp_ds)) + except: + udp_ds = "error" + print("UDP Downstream Test Error") + tput_data.append("UDP Downstream: Error") + + + ####### Setup UDP Profile and Run Traffic Upstream (STA to AP) ####################### + #singleClient.setup() + side_a_min_bps = 500000000 + side_b_min_bps = 0 + side_a_min_pdu = 1200 + side_b_min_pdu = 1500 + direction = "Upstream" + values_line = 3 # 3 = AP Rx + try: + udp_us = singleClient.udp_unidirectional(side_a_min_bps, side_b_min_bps, side_a_min_pdu, side_b_min_pdu, direction, values_line) + print("UDP Upstream:",udp_us,"Mbps") + tput_data.append("UDP Upstream: " + str(udp_us)) + except: + udp_us = "error" + print("UDP Upstream Test Error") + tput_data.append("UDP Upstream: Error") + #Cleanup UDP Endpoints + #singleClient.cleanup_udp() + + + ####### Setup TCP Profile and Run Traffic Downstream (AP to STA) ####################### + #singleClient.setup() + side_a_min_bps = 0 + side_b_min_bps = 500000000 + direction = "Downstream" + values_line = 1 # 1 = Station Rx + try: + tcp_ds = singleClient.tcp_unidirectional(side_a_min_bps, side_b_min_bps, direction, values_line) + print("TCP Downstream:",tcp_ds,"Mbps") + tput_data.append("TCP Downstream: " + str(tcp_ds)) + except: + tcp_ds = "error" + print("TCP Downstream Test Error") + tput_data.append("TCP Downstream: Error") + + ####### Setup TCP Profile and Run Traffic Upstream (STA to AP) ####################### + #singleClient.setup() + side_a_min_bps = 500000000 + side_b_min_bps = 0 + direction = "Upstream" + values_line = 3 # 3 = AP Rx + try: + tcp_us = singleClient.tcp_unidirectional(side_a_min_bps, side_b_min_bps, direction, values_line) + print("TCP Upstream:",tcp_us,"Mbps") + tput_data.append("TCP Upstream: " + str(tcp_us)) + except: + tcp_us = "error" + print("TCP Upstream Test Error") + tput_data.append("TCP Uptream: Error") + + #Cleanup TCP Endpoints + #singleClient.cleanup_tcp() + + #Cleanup Endpoints + singleClient.cleanup() + + return(tput_data) + +def eap_tput(sta_list, ssid_name, radio, security, eap_type, identity, ttls_password): + eap_connect = SingleClientEAP("10.10.10.201", 8080, _debug_on=True) + eap_connect.upstream_resource = 1 + eap_connect.upstream_port = "eth2" + eap_connect.security = security + eap_connect.sta_list = sta_list + eap_connect.station_names = sta_list + eap_connect.ssid = ssid_name + eap_connect.radio = radio + eap_connect.eap = eap_type + eap_connect.identity = identity + eap_connect.ttls_passwd = ttls_password + eap_connect.runtime_secs = 10 + + #Create List for Throughput Data + tput_data = [] + + ####### Setup UDP Profile and Run Traffic Downstream (AP to STA) ####################### + eap_connect.setup() + side_a_min_bps = 56000 + side_b_min_bps = 500000000 + side_a_min_pdu = 1200 + side_b_min_pdu = 1500 + direction = "Downstream" + values_line = 1 # 1 = Station Rx + try: + udp_ds = eap_connect.udp_unidirectional(side_a_min_bps, side_b_min_bps, side_a_min_pdu, side_b_min_pdu, direction, values_line) + print("UDP Downstream:", udp_ds, "Mbps") + tput_data.append("UDP Downstream: " + str(udp_ds)) + except: + udp_ds = "error" + print("UDP Downstream Test Error") + tput_data.append("UDP Downstream: Error") + + + ####### Setup UDP Profile and Run Traffic Upstream (STA to AP) ####################### + #singleClient.setup() + side_a_min_bps = 500000000 + side_b_min_bps = 0 + side_a_min_pdu = 1200 + side_b_min_pdu = 1500 + direction = "Upstream" + values_line = 3 # 3 = AP Rx + try: + udp_us = eap_connect.udp_unidirectional(side_a_min_bps, side_b_min_bps, side_a_min_pdu, side_b_min_pdu, direction, values_line) + print("UDP Upstream:",udp_us,"Mbps") + tput_data.append("UDP Upstream: " + str(udp_us)) + except: + udp_us = "error" + print("UDP Upstream Test Error") + tput_data.append("UDP Upstream: Error") + #Cleanup UDP Endpoints + #singleClient.cleanup_udp() + + + ####### Setup TCP Profile and Run Traffic Downstream (AP to STA) ####################### + #singleClient.setup() + side_a_min_bps = 0 + side_b_min_bps = 500000000 + direction = "Downstream" + values_line = 1 # 1 = Station Rx + try: + tcp_ds = eap_connect.tcp_unidirectional(side_a_min_bps, side_b_min_bps, direction, values_line) + print("TCP Downstream:",tcp_ds,"Mbps") + tput_data.append("TCP Downstream: " + str(tcp_ds)) + except: + tcp_ds = "error" + print("TCP Downstream Test Error") + tput_data.append("TCP Downstream: Error") + + ####### Setup TCP Profile and Run Traffic Upstream (STA to AP) ####################### + #singleClient.setup() + side_a_min_bps = 500000000 + side_b_min_bps = 0 + direction = "Upstream" + values_line = 3 # 3 = AP Rx + try: + tcp_us = eap_connect.tcp_unidirectional(side_a_min_bps, side_b_min_bps, direction, values_line) + print("TCP Upstream:",tcp_us,"Mbps") + tput_data.append("TCP Upstream: " + str(tcp_us)) + except: + tcp_us = "error" + print("TCP Upstream Test Error") + tput_data.append("TCP Uptream: Error") + + #Cleanup TCP Endpoints + #singleClient.cleanup_tcp() + + #Cleanup Endpoints + eap_connect.cleanup() + + return(tput_data) diff --git a/py-scripts/tip-cicd-sanity/testrail_api.py b/py-scripts/tip-cicd-sanity/testrail_api.py new file mode 100644 index 00000000..ecabe72b --- /dev/null +++ b/py-scripts/tip-cicd-sanity/testrail_api.py @@ -0,0 +1,207 @@ +"""TestRail API binding for Python 3.x. + +""" + +import base64 +import json + +import requests +from pprint import pprint +import os +tr_user=os.getenv('TR_USER') +tr_pw=os.getenv('TR_PWD') + +class APIClient: + def __init__(self, base_url): + self.user = tr_user + self.password = tr_pw + if not base_url.endswith('/'): + base_url += '/' + self.__url = base_url + 'index.php?/api/v2/' + + + def send_get(self, uri, filepath=None): + """Issue a GET request (read) against the API. + + Args: + uri: The API method to call including parameters, e.g. get_case/1. + filepath: The path and file name for attachment download; used only + for 'get_attachment/:attachment_id'. + + Returns: + A dict containing the result of the request. + """ + return self.__send_request('GET', uri, filepath) + + def send_post(self, uri, data): + """Issue a POST request (write) against the API. + + Args: + uri: The API method to call, including parameters, e.g. add_case/1. + data: The data to submit as part of the request as a dict; strings + must be UTF-8 encoded. If adding an attachment, must be the + path to the file. + + Returns: + A dict containing the result of the request. + """ + return self.__send_request('POST', uri, data) + + def __send_request(self, method, uri, data): + url = self.__url + uri + + auth = str( + base64.b64encode( + bytes('%s:%s' % (self.user, self.password), 'utf-8') + ), + 'ascii' + ).strip() + headers = {'Authorization': 'Basic ' + auth} + #print("Method =" , method) + + if method == 'POST': + if uri[:14] == 'add_attachment': # add_attachment API method + files = {'attachment': (open(data, 'rb'))} + response = requests.post(url, headers=headers, files=files) + files['attachment'].close() + else: + headers['Content-Type'] = 'application/json' + payload = bytes(json.dumps(data), 'utf-8') + response = requests.post(url, headers=headers, data=payload) + else: + headers['Content-Type'] = 'application/json' + response = requests.get(url, headers=headers) + #print("headers = ", headers) + #print("resonse=", response) + #print("response code =", response.status_code) + + if response.status_code > 201: + + try: + error = response.json() + except: # response.content not formatted as JSON + error = str(response.content) + #raise APIError('TestRail API returned HTTP %s (%s)' % (response.status_code, error)) + print('TestRail API returned HTTP %s (%s)' % (response.status_code, error)) + return + else: + print(uri[:15]) + if uri[:15] == 'get_attachments': # Expecting file, not JSON + try: + print('opening file') + print (str(response.content)) + open(data, 'wb').write(response.content) + print('opened file') + return (data) + except: + return ("Error saving attachment.") + else: + + try: + return response.json() + except: # Nothing to return + return {} + + def get_project_id(self, project_name): + "Get the project ID using project name" + project_id = None + projects = client.send_get('get_projects') + #pprint(projects) + for project in projects: + if project['name']== project_name: + project_id = project['id'] + # project_found_flag=True + break + print("project Id =",project_id) + return project_id + + def get_run_id(self, test_run_name): + "Get the run ID using test name and project name" + run_id = None + project_id = client.get_project_id(project_name='WLAN') + + try: + test_runs = client.send_get('get_runs/%s' % (project_id)) + #print("------------TEST RUNS----------") + #pprint(test_runs) + + except Exception: + print + 'Exception in update_testrail() updating TestRail.' + return None + else: + for test_run in test_runs: + if test_run['name'] == test_run_name: + run_id = test_run['id'] + #print("run Id in Test Runs=",run_id) + break + return run_id + + + def update_testrail(self, case_id, run_id, status_id, msg): + "Update TestRail for a given run_id and case_id" + update_flag = False + # Get the TestRail client account details + # Update the result in TestRail using send_post function. + # Parameters for add_result_for_case is the combination of runid and case id. + # status_id is 1 for Passed, 2 For Blocked, 4 for Retest and 5 for Failed + #status_id = 1 if result_flag is True else 5 + + print("result status Pass/Fail = ", status_id) + print("case id=", case_id) + print("run id passed to update is ", run_id, case_id) + if run_id is not None: + try: + result = client.send_post( + 'add_result_for_case/%s/%s' % (run_id, case_id), + {'status_id': status_id, 'comment': msg}) + print("result in post",result) + except Exception: + print + 'Exception in update_testrail() updating TestRail.' + + else: + print + 'Updated test result for case: %s in test run: %s with msg:%s' % (case_id, run_id, msg) + + return update_flag + + def create_testrun(self, name, case_ids, project_id, milestone_id): + result = client.send_post( + 'add_run/%s' % (project_id), + {'name': name, 'case_ids': case_ids, 'milestone_id': milestone_id, 'include_all': False}) + print("result in post", result) +client: APIClient = APIClient('https://telecominfraproject.testrail.com') +client.user = 'syama.devi@connectus.ai' +client.password = 'Connectus123$' + + +###Old Demo Code +#case = client.send_get('get_case/936') +#print("---------TEST CASE 1---------") +#pprint(case) +#case = client.send_get('get_case/937') +#print("---------TEST CASE 2---------") +#pprint(case) +#print ("----------TEST Project ID----------") +#proj_id = client.get_project_id(project_name= "WLAN") +#pprint(proj_id) + +#REST API POSTMAN PROJECT +#projId = client.get_project_id(project_name= "REST-API-POSTMAN") +#pprint("REST API POSTMAN PROJECT ID IS :", projId) + +#print("---------TEST RUN ID-----------") +#rid = client.get_run_id(test_run_name='Master',project_name='WLAN') +#rid=client.get_run_id(test_run_name= 'Master-Run3') +#pprint(rid) + +#result: bool= client.update_testrail(case_id = 1, run_id=rid, status_id = 5, msg ='Test Failed') + +#result = client.send_get('get_attachment/:1', '/Users/syamadevi/Desktop/syama/python-test/TestRail/testreport.pdf') +#print(result) +#project_report= client.send_get("get_reports/:%s" %proj_id) +#print(project_report) + +class APIError(Exception): + pass