From 2f47a3aaba655f3e9d7280354c1de45a9b33b709 Mon Sep 17 00:00:00 2001 From: bealler Date: Mon, 23 Nov 2020 13:01:11 -0500 Subject: [PATCH] Modify Nightly_Sanity.py to use sta_connect2.py script, Remove test_connect.py, add user notes and README file --- py-scripts/tip-cicd-sanity/Nightly_Sanity.py | 137 +++--- py-scripts/tip-cicd-sanity/README.md | 61 +++ py-scripts/tip-cicd-sanity/ap_ssh.py | 6 + py-scripts/tip-cicd-sanity/cloudsdk.py | 7 + py-scripts/tip-cicd-sanity/cluster_version.py | 11 +- py-scripts/tip-cicd-sanity/eap_connect.py | 7 + py-scripts/tip-cicd-sanity/lab_ap_info.py | 10 +- .../reports/report_template.php | 262 +++++++++++ .../single_client_throughput.py | 13 + py-scripts/tip-cicd-sanity/test_connect2.py | 441 ------------------ py-scripts/tip-cicd-sanity/testrail_api.py | 6 + 11 files changed, 448 insertions(+), 513 deletions(-) create mode 100644 py-scripts/tip-cicd-sanity/README.md create mode 100755 py-scripts/tip-cicd-sanity/reports/report_template.php delete mode 100755 py-scripts/tip-cicd-sanity/test_connect2.py diff --git a/py-scripts/tip-cicd-sanity/Nightly_Sanity.py b/py-scripts/tip-cicd-sanity/Nightly_Sanity.py index 9c2576a0..75481b28 100755 --- a/py-scripts/tip-cicd-sanity/Nightly_Sanity.py +++ b/py-scripts/tip-cicd-sanity/Nightly_Sanity.py @@ -43,8 +43,11 @@ if 'py-json' not in sys.path: from LANforge.LFUtils import * # if you lack __init__.py in this directory you will not find sta_connect module# -import test_connect2 -from test_connect2 import TestConnect2 +if 'py-json' not in sys.path: + sys.path.append('../../py-scripts') + +import sta_connect2 +from sta_connect2 import StaConnect2 import testrail_api from testrail_api import APIClient import eap_connect @@ -57,8 +60,6 @@ from ap_ssh import ssh_cli_active_fw from ap_ssh import iwinfo_status import cluster_version -print(os.environ) - ### Set CloudSDK URL ### cloudSDK_url=os.getenv('CLOUD_SDK_URL') ### Directories @@ -127,31 +128,32 @@ class GetBuild: class RunTest: def Single_Client_Connectivity(self, port, radio, ssid_name, ssid_psk, security, station, test_case, rid): '''SINGLE CLIENT CONNECTIVITY using test_connect2.py''' - testConnect = TestConnect2("10.10.10.201", 8080, debug_=False) - testConnect.sta_mode = 0 - testConnect.upstream_resource = 1 - testConnect.upstream_port = port - testConnect.radio = radio - testConnect.resource = 1 - testConnect.dut_ssid = ssid_name - testConnect.dut_passwd = ssid_psk - testConnect.dut_security = security - testConnect.station_names = station - testConnect.runtime_secs = 10 - testConnect.cleanup_on_exit = True - # testConnect.cleanup() - testConnect.setup() - testConnect.start() - print("napping %f sec" % testConnect.runtime_secs) - time.sleep(testConnect.runtime_secs) - testConnect.stop() - testConnect.cleanup() - run_results = testConnect.get_result_list() + staConnect = StaConnect2("10.10.10.201", 8080, debug_=False) + staConnect.sta_mode = 0 + staConnect.upstream_resource = 1 + staConnect.upstream_port = port + staConnect.radio = radio + staConnect.resource = 1 + staConnect.dut_ssid = ssid_name + staConnect.dut_passwd = ssid_psk + staConnect.dut_security = security + staConnect.station_names = station + staConnect.sta_prefix = 'test' + staConnect.runtime_secs = 10 + staConnect.cleanup_on_exit = True + # staConnect.cleanup() + staConnect.setup() + staConnect.start() + print("napping %f sec" % staConnect.runtime_secs) + time.sleep(staConnect.runtime_secs) + staConnect.stop() + staConnect.cleanup() + run_results = staConnect.get_result_list() for result in run_results: print("test result: " + result) # result = 'pass' - print("Single Client Connectivity :", testConnect.passes) - if testConnect.passes() == True: + print("Single Client Connectivity :", staConnect.passes) + if staConnect.passes() == True: print("Single client connection to", ssid_name, "successful. Test Passed") client.update_testrail(case_id=test_case, run_id=rid, status_id=1, msg='Client connectivity passed') logger.info("Client connectivity to " + ssid_name + " Passed") @@ -274,7 +276,7 @@ ap_models = ["ec420","ea8300","ecw5211","ecw5410"] print("Getting CloudSDK version information...") try: cluster_ver = cluster_version.main() - #print(cluster_ver) + print(cluster_ver) print("CloudSDK Version Information:") print("-------------------------------------------") @@ -295,6 +297,48 @@ except: "projectVersion" : "unknown" } +############################################################################ +#################### Create Report ######################################### +############################################################################ + +# Create Report Folder for Today +today = str(date.today()) +try: + os.mkdir(report_path+today) +except OSError: + print ("Creation of the directory %s failed" % report_path) +else: + print ("Successfully created the directory %s " % report_path) + +#Copy report template to folder +try: + copyfile(report_template,report_path+today+'/report.php') +except: + print("No report template created. Report data will still be saved. Continuing with tests...") + +##Create report_data dictionary +tc_results = dict.fromkeys(test_cases, "not run") + +report_data = dict() +report_data['cloud_sdk'] = cloudsdk_cluster_info +report_data["fw_available"] = dict.fromkeys(ap_models,"Unknown") +report_data["fw_under_test"] = dict.fromkeys(ap_models,"N/A") +report_data['pass_percent'] = dict.fromkeys(ap_models,"") +report_data['tests'] = { + "ea8300" : "", + "ecw5211": "", + "ecw5410": "", + "ec420": "" + } +for key in ap_models: + report_data['tests'][key] = dict.fromkeys(test_cases,"not run") + +print(report_data) + +#write to report_data contents to json file so it has something in case of unexpected fail +with open(report_path+today+'/report_data.json', 'w') as report_json_file: + json.dump(report_data, report_json_file) + ###Get Cloud Bearer Token bearer = CloudSDK.get_bearer(cloudSDK_url) @@ -348,45 +392,6 @@ else: logger.info("New loads have been created on CloudSDK") #print("Latest FW List:",ap_latest_dict) -############################################################################ -#################### Create Report ######################################### -############################################################################ - -# Create Report Folder for Today -today = str(date.today()) -try: - os.mkdir(report_path+today) -except OSError: - print ("Creation of the directory %s failed" % report_path) -else: - print ("Successfully created the directory %s " % report_path) - -#Copy report template to folder -copyfile(report_template,report_path+today+'/report.php') - -##Create report_data dictionary -tc_results = dict.fromkeys(test_cases, "not run") - -report_data = dict() -report_data['cloud_sdk'] = cloudsdk_cluster_info -report_data["fw_available"] = dict.fromkeys(ap_models,"Unknown") -report_data["fw_under_test"] = dict.fromkeys(ap_models,"N/A") -report_data['pass_percent'] = dict.fromkeys(ap_models,"") -report_data['tests'] = { - "ea8300" : "", - "ecw5211": "", - "ecw5410": "", - "ec420": "" - } -for key in ap_models: - report_data['tests'][key] = dict.fromkeys(test_cases,"not run") - - -#write to report_data contents to json file so it has something in case of unexpected fail -with open(report_path+today+'/report_data.json', 'w') as report_json_file: - json.dump(report_data, report_json_file) - -#################################################################################### #################################################################################### ############ Update FW and Run Test Cases on Each AP Variant ####################### #################################################################################### diff --git a/py-scripts/tip-cicd-sanity/README.md b/py-scripts/tip-cicd-sanity/README.md new file mode 100644 index 00000000..872aa2ab --- /dev/null +++ b/py-scripts/tip-cicd-sanity/README.md @@ -0,0 +1,61 @@ +# TIP CICD Sanity Scripts +This directory contains scripts and modules designed for wlan sanity testing. + +## Nightly Sanity +This script is used to look for and test new firmware available for the APs. AP equipment IDs and SSID information used in test is stored in the lab_ap_info file + +1. Check current CloudSDK version +2. Find latest dev firmware on TIP jfrog and create instances on CloudSDK (if necessary) +3. Create report_data.json file with information about versions and test case pass/fail +4. For each AP model: + 1. Check current AP firmware *(If AP already running newest firmware, test will skip)* + 2. Create Testrail test run with required test cases included + 3. Upgrade AP via CloudSDK API + 4. Check if AP upgrade and CloudSDK connection successful + 5. Test client connectivity on SSID modes, marking TestRail and report_data.json with pass/fail + 6. Update sanity_status.json with **overall** pass/fail + +## Throughput Test +This script is used to test UDP and TCP throughput on different modes of SSIDs, on multiple AP models. It is designed to run on APs that have successfully passed through Nightly_Sanity test. + +For each AP model: +1) Read sanity_status.json to see if throughput should be run, if yes: + 1) Run throughput tests on SSIDs modes + 2) Record results to CSV file +2) Update sanity_status.json that throughput tests have been run + +## Environment Variables +Environment variables required to run scripts and modules in this directory +#### Nightly_Sanity.py +CLOUD_SDK_URL: CloudSDK URL for API calls +SANITY_LOG_DIR: Logger file directory +SANITY_REPORT_DIR: Report file directory +REPORT_TEMPLATE: Report template path +TR_USER: TestRail Username +TR_PWD: TestRail Password +MILESTONE: TestRail Milestone ID +PROJECT_ID: TestRail Project ID +EAP_IDENTITY: EAP identity for testing +EAP_PWD: EAP password for testing +AP_USER: Username for AP +JFROG_USER: Jfrog username +JFROG_PWD: Jfrog password + +#### cloudsdk.py +CLOUDSDK_USER: CloudSDK username +CLOUDSDK_PWD: CloudSDK password + +#### cluster_version.py +AWS_USER: CloudSDK username +AWS_PWD: CloudSDK password + +#### testrail_api.py +TR_USER tr_user TestRail Username +TR_PWD tr_pw TestRail Password + +#### Throughput_Test.py +CLOUD_SDK_URL: CloudSDK URL for API calls +CSV_PATH: Path for CSV file +EAP_IDENTITY: EAP identity for testing +EAP_PWD: EAP password for testing +TPUT_LOG_DIR: Logger file directory diff --git a/py-scripts/tip-cicd-sanity/ap_ssh.py b/py-scripts/tip-cicd-sanity/ap_ssh.py index 22b59b0d..8366ba3e 100755 --- a/py-scripts/tip-cicd-sanity/ap_ssh.py +++ b/py-scripts/tip-cicd-sanity/ap_ssh.py @@ -1,3 +1,9 @@ +################################################################################## +# Module contains functions to get specific data from AP CLI using SSH +# +# Used by Nightly_Sanity and Throughput_Test ##################################### +################################################################################## + import paramiko from paramiko import SSHClient import socket diff --git a/py-scripts/tip-cicd-sanity/cloudsdk.py b/py-scripts/tip-cicd-sanity/cloudsdk.py index 27d09965..597d27ab 100755 --- a/py-scripts/tip-cicd-sanity/cloudsdk.py +++ b/py-scripts/tip-cicd-sanity/cloudsdk.py @@ -1,5 +1,12 @@ #!/usr/bin/python3 +################################################################################## +# Module contains functions to interact with CloudSDK using APIs +# Start by calling get_bearer to obtain bearer token, then other APIs can be used +# +# Used by Nightly_Sanity and Throughput_Test ##################################### +################################################################################## + import base64 import urllib.request from bs4 import BeautifulSoup diff --git a/py-scripts/tip-cicd-sanity/cluster_version.py b/py-scripts/tip-cicd-sanity/cluster_version.py index 780d3060..baba0da5 100644 --- a/py-scripts/tip-cicd-sanity/cluster_version.py +++ b/py-scripts/tip-cicd-sanity/cluster_version.py @@ -1,3 +1,10 @@ +################################################################################## +# Used to login to AWS and determine current CloudSDK version +# Requires user to install chromedriver and specify path +# +# Used by Nightly_Sanity ######################################################### +################################################################################## + import time from threading import Thread @@ -10,11 +17,13 @@ import subprocess user=os.getenv('AWS_USER') password=os.getenv('AWS_PWD') +chromedriver_dir=os.getenv('CHROMEDRIVER_PATH') +print(chromedriver_dir) def main(): chrome_options = webdriver.ChromeOptions() chrome_options.add_argument("--headless") - driver = webdriver.Chrome(executable_path='/home/netex/chromedriver', options=chrome_options) + driver = webdriver.Chrome(executable_path=chromedriver_dir, options=chrome_options) driver.get("https://telecominfraproject.awsapps.com/start#") time.sleep(10); elem = driver.find_element_by_xpath('//*[@id="awsui-input-0"]') diff --git a/py-scripts/tip-cicd-sanity/eap_connect.py b/py-scripts/tip-cicd-sanity/eap_connect.py index ae19a170..dfd3cebb 100755 --- a/py-scripts/tip-cicd-sanity/eap_connect.py +++ b/py-scripts/tip-cicd-sanity/eap_connect.py @@ -1,5 +1,12 @@ #!/usr/bin/env python3 +######################################################################################################### +# Built to allow connection and test of clients using EAP-TTLS. +# Functions can be called to create a station, create TCP and UDP traffic, run it a short amount of time. +# +# Used by Nightly_Sanity and Throughput_Test ############################################################ +######################################################################################################### + # This will create a station, create TCP and UDP traffic, run it a short amount of time, # and verify whether traffic was sent and received. It also verifies the station connected # to the requested BSSID if bssid is specified as an argument. diff --git a/py-scripts/tip-cicd-sanity/lab_ap_info.py b/py-scripts/tip-cicd-sanity/lab_ap_info.py index 4eac4232..8ce39b16 100755 --- a/py-scripts/tip-cicd-sanity/lab_ap_info.py +++ b/py-scripts/tip-cicd-sanity/lab_ap_info.py @@ -33,10 +33,10 @@ sanity_status = { ##Equipment IDs for Lab APs under test equipment_id_dict = { - "ea8300": "13", - "ecw5410": "12", - "ecw5211": "6", - "ec420": "11" + "ea8300": "19", + "ecw5410": "20", + "ecw5211": "21", + "ec420": "27" } equipment_ip_dict = { @@ -245,4 +245,4 @@ profile_info_dict = { "twoFourG_WPA_PSK": "Connectus123$", "twoFourG_WPA2-EAP_SSID": "ECW5211_2dot4G_WPA2-EAP_VLAN" } -} \ No newline at end of file +} diff --git a/py-scripts/tip-cicd-sanity/reports/report_template.php b/py-scripts/tip-cicd-sanity/reports/report_template.php new file mode 100755 index 00000000..0601f798 --- /dev/null +++ b/py-scripts/tip-cicd-sanity/reports/report_template.php @@ -0,0 +1,262 @@ + + + + +Testing Report + + + + + + + + +
+

CICD Nightly Sanity Report -

+
+ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +

Test Results

+
Test CaseDescriptionEA8300 ResultECW5211 ResultECW5410 ResultEC420 Result
New FW Available
FW Under Test
Test Pass Rate%%%%
2233AP Upgrade Successful
5247CloudSDK Reports Correct FW
5222AP-CloudSDK Connection Active
5214Client connectivity to 2.4 GHz WPA2-EAP - Bridge Mode
2237Client connectivity to 2.4 GHz WPA2 - Bridge Mode
2420Client connectivity to 2.4 GHz WPA - Bridge Mode
5215Client connectivity to 5 GHz WPA2-EAP - Bridge Mode
2236Client connectivity to 5 GHz WPA2 - Bridge Mode
2419Client connectivity to 5 GHz WPA - Bridge Mode
5216Client connectivity to 2.4 GHz WPA2-EAP - NAT Mode
4325Client connectivity to 2.4 GHz WPA2 - NAT Mode
4323Client connectivity to 2.4 GHz WPA - NAT Mode
5217Client connectivity to 5 GHz WPA2-EAP - NAT Mode
4326Client connectivity to 5 GHz WPA2 - NAT Mode
4324Client connectivity to 5 GHz WPA - NAT Mode
5253Client connectivity to 2.4 GHz WPA2-EAP - Custom VLAN
5251Client connectivity to 2.4 GHz WPA2 - Custom VLAN
5252Client connectivity to 2.4 GHz WPA - Custom VLAN
5250Client connectivity to 5 GHz WPA2-EAP - Custom VLAN
5248Client connectivity to 5 GHz WPA2 - Custom VLAN
5249Client connectivity to 5 GHz WPA - Custom VLAN
+ + \ No newline at end of file diff --git a/py-scripts/tip-cicd-sanity/single_client_throughput.py b/py-scripts/tip-cicd-sanity/single_client_throughput.py index 5330461e..53b71167 100755 --- a/py-scripts/tip-cicd-sanity/single_client_throughput.py +++ b/py-scripts/tip-cicd-sanity/single_client_throughput.py @@ -1,5 +1,18 @@ #!/usr/bin/env python3 +#################################################################################### +# Script is based off of LANForge sta_connect2.py +# Script built for max throughput testing on a single client +# The main function of the script creates a station, then tests: +# 1. UDP Downstream (AP to STA) +# 2. UDP Upstream (STA to AP) +# 3. TCP Downstream (AP to STA) +# 4. TCP Upstream (STA to AP) +# The script will clean up the station and connections at the end of the test. +# +# Used by Throughput_Test ########################################################### +#################################################################################### + # Script is based off of sta_connect2.py # Script built for max throughput testing on a single client # The main function of the script creates a station, then tests: diff --git a/py-scripts/tip-cicd-sanity/test_connect2.py b/py-scripts/tip-cicd-sanity/test_connect2.py deleted file mode 100755 index 9bc71dab..00000000 --- a/py-scripts/tip-cicd-sanity/test_connect2.py +++ /dev/null @@ -1,441 +0,0 @@ -#!/usr/bin/env python3 - -# Custom NetExperience version of sta_connect2.py - which changest he script to look for prefix "test" instead of "sta" - -# This will create a station, create TCP and UDP traffic, run it a short amount of time, -# and verify whether traffic was sent and received. It also verifies the station connected -# to the requested BSSID if bssid is specified as an argument. -# The script will clean up the station and connections at the end of the test. - -import sys - -if sys.version_info[0] != 3: - print("This script requires Python 3") - exit(1) - -if 'py-json' not in sys.path: - sys.path.append('../../py-json') - -import argparse -import LANforge -from LANforge import LFUtils -# from LANforge import LFCliBase -from LANforge import lfcli_base -from LANforge.lfcli_base import LFCliBase -from LANforge.LFUtils import * -import realm -from realm import Realm -import pprint - -OPEN="open" -WEP="wep" -WPA="wpa" -WPA2="wpa2" -MODE_AUTO=0 - -class TestConnect2(LFCliBase): - def __init__(self, host, port, _dut_ssid="jedway-open-1", _dut_passwd="NA", _dut_bssid="", - _user="", _passwd="", _sta_mode="0", _radio="wiphy0", - _resource=1, _upstream_resource=1, _upstream_port="eth1", - _sta_name=None, debug_=False, _dut_security=OPEN, _exit_on_error=False, - _cleanup_on_exit=True, _runtime_sec=60, _exit_on_fail=False): - # do not use `super(LFCLiBase,self).__init__(self, host, port, _debugOn) - # that is py2 era syntax and will force self into the host variable, making you - # very confused. - super().__init__(host, port, _debug=debug_, _halt_on_error=_exit_on_error, _exit_on_fail=_exit_on_fail) - self.debug = debug_ - self.dut_security = _dut_security - self.dut_ssid = _dut_ssid - self.dut_passwd = _dut_passwd - self.dut_bssid = _dut_bssid - self.user = _user - self.passwd = _passwd - self.sta_mode = _sta_mode # See add_sta LANforge CLI users guide entry - self.radio = _radio - self.resource = _resource - self.upstream_resource = _upstream_resource - self.upstream_port = _upstream_port - self.runtime_secs = _runtime_sec - self.cleanup_on_exit = _cleanup_on_exit - self.sta_url_map = None # defer construction - self.upstream_url = None # defer construction - self.station_names = [] - if _sta_name is not None: - self.station_names = [ _sta_name ] - # self.localrealm :Realm = Realm(lfclient_host=host, lfclient_port=port) # py > 3.6 - self.localrealm = Realm(lfclient_host=host, lfclient_port=port) # py > 3.6 - self.resulting_stations = {} - self.resulting_endpoints = {} - self.station_profile = None - self.l3_udp_profile = None - self.l3_tcp_profile = None - - # def get_realm(self) -> Realm: # py > 3.6 - def get_realm(self): - return self.localrealm - - def get_station_url(self, sta_name_=None): - if sta_name_ is None: - raise ValueError("get_station_url wants a station name") - if self.sta_url_map is None: - self.sta_url_map = {} - for sta_name in self.station_names: - self.sta_url_map[sta_name] = "port/1/%s/%s" % (self.resource, sta_name) - return self.sta_url_map[sta_name_] - - def get_upstream_url(self): - if self.upstream_url is None: - self.upstream_url = "port/1/%s/%s" % (self.upstream_resource, self.upstream_port) - return self.upstream_url - - # Compare pre-test values to post-test values - def compare_vals(self, name, postVal, print_pass=False, print_fail=True): - # print(f"Comparing {name}") - if postVal > 0: - self._pass("%s %s" % (name, postVal), print_pass) - else: - self._fail("%s did not report traffic: %s" % (name, postVal), print_fail) - - def remove_stations(self): - for name in self.station_names: - LFUtils.removePort(self.resource, name, self.lfclient_url) - - def num_associated(self, bssid): - counter = 0 - # print("there are %d results" % len(self.station_results)) - fields = "_links,port,alias,ip,ap,port+type" - self.station_results = self.localrealm.find_ports_like("test*", fields, debug_=False) - if (self.station_results is None) or (len(self.station_results) < 1): - self.get_failed_result_list() - for eid,record in self.station_results.items(): - #print("-- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- ") - #pprint(eid) - #pprint(record) - if record["ap"] == bssid: - counter += 1 - #print("-- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- ") - return counter - - def clear_test_results(self): - self.resulting_stations = {} - self.resulting_endpoints = {} - super().clear_test_results() - #super(StaConnect, self).clear_test_results().test_results.clear() - - def setup(self): - self.clear_test_results() - self.check_connect() - upstream_json = self.json_get("%s?fields=alias,phantom,down,port,ip" % self.get_upstream_url(), debug_=False) - - if upstream_json is None: - self._fail(message="Unable to query %s, bye" % self.upstream_port, print_=True) - return False - - if upstream_json['interface']['ip'] == "0.0.0.0": - if self.debug: - pprint.pprint(upstream_json) - self._fail("Warning: %s lacks ip address" % self.get_upstream_url(), print_=True) - return False - - # remove old stations - print("Removing old stations") - for sta_name in self.station_names: - sta_url = self.get_station_url(sta_name) - response = self.json_get(sta_url) - if (response is not None) and (response["interface"] is not None): - for sta_name in self.station_names: - LFUtils.removePort(self.resource, sta_name, self.lfclient_url) - LFUtils.wait_until_ports_disappear(self.lfclient_url, self.station_names) - - # Create stations and turn dhcp on - self.station_profile = self.localrealm.new_station_profile() - - if self.dut_security == WPA2: - self.station_profile.use_security(security_type="wpa2", ssid=self.dut_ssid, passwd=self.dut_passwd) - elif self.dut_security == OPEN: - self.station_profile.use_security(security_type="open", ssid=self.dut_ssid, passwd="[BLANK]") - elif self.dut_security == WPA: - self.station_profile.use_security(security_type="wpa", ssid=self.dut_ssid, passwd=self.dut_passwd) - elif self.dut_security == WEP: - self.station_profile.use_security(security_type="wep", ssid=self.dut_ssid, passwd=self.dut_passwd) - self.station_profile.set_command_flag("add_sta", "create_admin_down", 1) - - print("Adding new stations ", end="") - self.station_profile.create(radio=self.radio, sta_names_=self.station_names, up_=False, debug=self.debug, suppress_related_commands_=True) - LFUtils.wait_until_ports_appear(self.lfclient_url, self.station_names, debug=self.debug) - - # Create UDP endpoints - self.l3_udp_profile = self.localrealm.new_l3_cx_profile() - self.l3_udp_profile.side_a_min_bps = 128000 - self.l3_udp_profile.side_b_min_bps = 128000 - self.l3_udp_profile.side_a_min_pdu = 1200 - self.l3_udp_profile.side_b_min_pdu = 1500 - self.l3_udp_profile.report_timer = 1000 - self.l3_udp_profile.name_prefix = "udp" - self.l3_udp_profile.create(endp_type="lf_udp", - side_a=list(self.localrealm.find_ports_like("test+")), - side_b="%d.%s" % (self.resource, self.upstream_port), - suppress_related_commands=True) - - # Create TCP endpoints - self.l3_tcp_profile = self.localrealm.new_l3_cx_profile() - self.l3_tcp_profile.side_a_min_bps = 128000 - self.l3_tcp_profile.side_b_min_bps = 56000 - self.l3_tcp_profile.name_prefix = "tcp" - self.l3_tcp_profile.report_timer = 1000 - self.l3_tcp_profile.create(endp_type="lf_tcp", - side_a=list(self.localrealm.find_ports_like("test+")), - side_b="%d.%s" % (self.resource, self.upstream_port), - suppress_related_commands=True) - - def start(self): - if self.station_profile is None: - self._fail("Incorrect setup") - pprint.pprint(self.station_profile) - if self.station_profile.up is None: - self._fail("Incorrect station profile, missing profile.up") - if self.station_profile.up == False: - print("\nBringing ports up...") - data = {"shelf": 1, - "resource": self.resource, - "port": "ALL", - "probe_flags": 1} - self.json_post("/cli-json/nc_show_ports", data) - self.station_profile.admin_up() - LFUtils.waitUntilPortsAdminUp(self.resource, self.lfclient_url, self.station_names) - - # station_info = self.jsonGet(self.mgr_url, "%s?fields=port,ip,ap" % (self.getStaUrl())) - duration = 0 - maxTime = 90 - ip = "0.0.0.0" - ap = "" - print("Waiting for %s stations to associate to AP: " % len(self.station_names), end="") - connected_stations = {} - while (len(connected_stations.keys()) < len(self.station_names)) and (duration < maxTime): - duration += 3 - time.sleep(3) - print(".", end="") - for sta_name in self.station_names: - sta_url = self.get_station_url(sta_name) - station_info = self.json_get(sta_url + "?fields=port,ip,ap") - - # LFUtils.debug_printer.pprint(station_info) - if (station_info is not None) and ("interface" in station_info): - if "ip" in station_info["interface"]: - ip = station_info["interface"]["ip"] - if "ap" in station_info["interface"]: - ap = station_info["interface"]["ap"] - - if (ap == "Not-Associated") or (ap == ""): - if self.debug: - print(" -%s," % sta_name, end="") - else: - if ip == "0.0.0.0": - if self.debug: - print(" %s (0.0.0.0)" % sta_name, end="") - else: - connected_stations[sta_name] = sta_url - data = { - "shelf":1, - "resource": self.resource, - "port": "ALL", - "probe_flags": 1 - } - self.json_post("/cli-json/nc_show_ports", data) - - for sta_name in self.station_names: - sta_url = self.get_station_url(sta_name) - station_info = self.json_get(sta_url) # + "?fields=port,ip,ap") - if station_info is None: - print("unable to query %s" % sta_url) - self.resulting_stations[sta_url] = station_info - ap = station_info["interface"]["ap"] - ip = station_info["interface"]["ip"] - if (ap != "") and (ap != "Not-Associated"): - print(" %s +AP %s, " % (sta_name, ap), end="") - if self.dut_bssid != "": - if self.dut_bssid.lower() == ap.lower(): - self._pass(sta_name+" connected to BSSID: " + ap) - # self.test_results.append("PASSED: ) - # print("PASSED: Connected to BSSID: "+ap) - else: - self._fail("%s connected to wrong BSSID, requested: %s Actual: %s" % (sta_name, self.dut_bssid, ap)) - else: - self._fail(sta_name+" did not connect to AP") - return False - - if ip == "0.0.0.0": - self._fail("%s did not get an ip. Ending test" % sta_name) - else: - self._pass("%s connected to AP: %s With IP: %s" % (sta_name, ap, ip)) - - if self.passes() == False: - if self.cleanup_on_exit: - print("Cleaning up...") - self.remove_stations() - return False - - # start cx traffic - print("\nStarting CX Traffic") - self.l3_udp_profile.start_cx() - self.l3_tcp_profile.start_cx() - time.sleep(1) - # Refresh stats - self.l3_udp_profile.refresh_cx() - self.l3_tcp_profile.refresh_cx() - - def collect_endp_stats(self, endp_map): - print("Collecting Data") - fields="?fields=name,tx+bytes,rx+bytes" - for (cx_name, endps) in endp_map.items(): - try: - endp_url = "/endp/%s%s" % (endps[0], fields) - endp_json = self.json_get(endp_url) - self.resulting_endpoints[endp_url] = endp_json - ptest_a_tx = endp_json['endpoint']['tx bytes'] - ptest_a_rx = endp_json['endpoint']['rx bytes'] - - #ptest = self.json_get("/endp/%s?fields=tx+bytes,rx+bytes" % cx_names[cx_name]["b"]) - endp_url = "/endp/%s%s" % (endps[1], fields) - endp_json = self.json_get(endp_url) - self.resulting_endpoints[endp_url] = endp_json - - ptest_b_tx = endp_json['endpoint']['tx bytes'] - ptest_b_rx = endp_json['endpoint']['rx bytes'] - - self.compare_vals("testTCP-A TX", ptest_a_tx) - self.compare_vals("testTCP-A RX", ptest_a_rx) - - self.compare_vals("testTCP-B TX", ptest_b_tx) - self.compare_vals("testTCP-B RX", ptest_b_rx) - - except Exception as e: - self.error(e) - - - def stop(self): - # stop cx traffic - print("Stopping CX Traffic") - self.l3_udp_profile.stop_cx() - self.l3_tcp_profile.stop_cx() - - # Refresh stats - print("\nRefresh CX stats") - self.l3_udp_profile.refresh_cx() - self.l3_tcp_profile.refresh_cx() - - print("Sleeping for 5 seconds") - time.sleep(5) - - # get data for endpoints JSON - self.collect_endp_stats(self.l3_udp_profile.created_cx) - self.collect_endp_stats(self.l3_tcp_profile.created_cx) - # print("\n") - - def cleanup(self): - # remove all endpoints and cxs - if self.cleanup_on_exit: - for sta_name in self.station_names: - LFUtils.removePort(self.resource, sta_name, self.lfclient_url) - curr_endp_names = [] - removeCX(self.lfclient_url, self.l3_udp_profile.get_cx_names()) - removeCX(self.lfclient_url, self.l3_tcp_profile.get_cx_names()) - for (cx_name, endp_names) in self.l3_udp_profile.created_cx.items(): - curr_endp_names.append(endp_names[0]) - curr_endp_names.append(endp_names[1]) - for (cx_name, endp_names) in self.l3_tcp_profile.created_cx.items(): - curr_endp_names.append(endp_names[0]) - curr_endp_names.append(endp_names[1]) - removeEndps(self.lfclient_url, curr_endp_names, debug= self.debug) - -# ~class - - -# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -def main(): - lfjson_host = "localhost" - lfjson_port = 8080 - parser = argparse.ArgumentParser( - description="""LANforge Unit Test: Connect Station to AP -Example: -./sta_connect2.py --dest 192.168.100.209 --dut_ssid OpenWrt-2 --dut_bssid 24:F5:A2:08:21:6C -""") - parser.add_argument("-d", "--dest", type=str, help="address of the LANforge GUI machine (localhost is default)") - parser.add_argument("-o", "--port", type=int, help="IP Port the LANforge GUI is listening on (8080 is default)") - parser.add_argument("-u", "--user", type=str, help="TBD: credential login/username") - parser.add_argument("-p", "--passwd", type=str, help="TBD: credential password") - parser.add_argument("--resource", type=str, help="LANforge Station resource ID to use, default is 1") - parser.add_argument("--upstream_resource", type=str, help="LANforge Ethernet port resource ID to use, default is 1") - parser.add_argument("--upstream_port", type=str, help="LANforge Ethernet port name, default is eth2") - parser.add_argument("--radio", type=str, help="LANforge radio to use, default is wiphy0") - parser.add_argument("--sta_mode", type=str, - help="LANforge station-mode setting (see add_sta LANforge CLI documentation, default is 0 (auto))") - parser.add_argument("--dut_ssid", type=str, help="DUT SSID") - parser.add_argument("--dut_security", type=str, help="DUT security: openLF, wpa, wpa2, wpa3") - parser.add_argument("--dut_passwd", type=str, help="DUT PSK password. Do not set for OPEN auth") - parser.add_argument("--dut_bssid", type=str, help="DUT BSSID to which we expect to connect.") - parser.add_argument("--debug", type=str, help="enable debugging") - - args = parser.parse_args() - if args.dest is not None: - lfjson_host = args.dest - if args.port is not None: - lfjson_port = args.port - - on_flags = [ 1, "1", "on", "yes", "true" ] - debug_v = False - if args.debug is not None: - if args.debug in on_flags: - debug_v = True - - staConnect = StaConnect2(lfjson_host, lfjson_port, - debug_=True, - _exit_on_fail=True, - _exit_on_error=False) - staConnect.station_names = [ "sta0000" ] - if args.user is not None: - staConnect.user = args.user - if args.passwd is not None: - staConnect.passwd = args.passwd - if args.sta_mode is not None: - staConnect.sta_mode = args.sta_mode - if args.upstream_resource is not None: - staConnect.upstream_resource = args.upstream_resource - if args.upstream_port is not None: - staConnect.upstream_port = args.upstream_port - if args.radio is not None: - staConnect.radio = args.radio - if args.resource is not None: - staConnect.resource = args.resource - if args.dut_ssid is not None: - staConnect.dut_ssid = args.dut_ssid - if args.dut_passwd is not None: - staConnect.dut_passwd = args.dut_passwd - if args.dut_bssid is not None: - staConnect.dut_bssid = args.dut_bssid - if args.dut_security is not None: - staConnect.dut_security = args.dut_security - - # staConnect.cleanup() - staConnect.setup() - staConnect.start() - print("napping %f sec" % staConnect.runtime_secs) - time.sleep(staConnect.runtime_secs) - staConnect.stop() - run_results = staConnect.get_result_list() - is_passing = staConnect.passes() - if is_passing == False: - print("FAIL: Some tests failed") - else: - print("PASS: All tests pass") - print(staConnect.get_all_message()) - - staConnect.cleanup() - -# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -if __name__ == "__main__": - main() diff --git a/py-scripts/tip-cicd-sanity/testrail_api.py b/py-scripts/tip-cicd-sanity/testrail_api.py index ecabe72b..81c62802 100644 --- a/py-scripts/tip-cicd-sanity/testrail_api.py +++ b/py-scripts/tip-cicd-sanity/testrail_api.py @@ -2,6 +2,12 @@ """ +#################################################################### +# Custom version of testrail_api module +# +# Used by Nightly_Sanity ########################################### +#################################################################### + import base64 import json