diff --git a/py-json/LANforge/lfcli_base.py b/py-json/LANforge/lfcli_base.py index 68c89c06..40935c6d 100644 --- a/py-json/LANforge/lfcli_base.py +++ b/py-json/LANforge/lfcli_base.py @@ -108,6 +108,9 @@ class LFCliBase: fail_messages = self.get_failed_result_list() return "\n".join(fail_messages) + def get_all_message(self): + return "\n".join(self.test_results) + def passes(self): pass_counter = 0 fail_counter = 0 diff --git a/py-scripts/cicd_testrail.py b/py-scripts/cicd_testrail.py new file mode 100644 index 00000000..1cc2f2a9 --- /dev/null +++ b/py-scripts/cicd_testrail.py @@ -0,0 +1,201 @@ +"""TestRail API binding for Python 3.x. + +""" + +import base64 +import json + +import requests +from pprint import pprint +import os +tr_user = os.getenv('TR_USER') +tr_pw=os.getenv('TR_PW') + + + +class APIClient: + def __init__(self, base_url): + self.user = tr_user + self.password = tr_pw + if not base_url.endswith('/'): + base_url += '/' + self.__url = base_url + 'index.php?/api/v2/' + + + def send_get(self, uri, filepath=None): + """Issue a GET request (read) against the API. + + Args: + uri: The API method to call including parameters, e.g. get_case/1. + filepath: The path and file name for attachment download; used only + for 'get_attachment/:attachment_id'. + + Returns: + A dict containing the result of the request. + """ + return self.__send_request('GET', uri, filepath) + + def send_post(self, uri, data): + """Issue a POST request (write) against the API. + + Args: + uri: The API method to call, including parameters, e.g. add_case/1. + data: The data to submit as part of the request as a dict; strings + must be UTF-8 encoded. If adding an attachment, must be the + path to the file. + + Returns: + A dict containing the result of the request. + """ + return self.__send_request('POST', uri, data) + + def __send_request(self, method, uri, data): + url = self.__url + uri + + auth = str( + base64.b64encode( + bytes('%s:%s' % (self.user, self.password), 'utf-8') + ), + 'ascii' + ).strip() + headers = {'Authorization': 'Basic ' + auth} + print("Method =" , method) + + if method == 'POST': + if uri[:14] == 'add_attachment': # add_attachment API method + files = {'attachment': (open(data, 'rb'))} + response = requests.post(url, headers=headers, files=files) + files['attachment'].close() + else: + headers['Content-Type'] = 'application/json' + payload = bytes(json.dumps(data), 'utf-8') + response = requests.post(url, headers=headers, data=payload) + else: + headers['Content-Type'] = 'application/json' + response = requests.get(url, headers=headers) + print("headers = ", headers) + print("resonse=", response) + print("response code =", response.status_code) + + if response.status_code > 201: + + try: + error = response.json() + except: # response.content not formatted as JSON + error = str(response.content) + #raise APIError('TestRail API returned HTTP %s (%s)' % (response.status_code, error)) + print('TestRail API returned HTTP %s (%s)' % (response.status_code, error)) + return + else: + print(uri[:15]) + if uri[:15] == 'get_attachments': # Expecting file, not JSON + try: + print('opening file') + print (str(response.content)) + open(data, 'wb').write(response.content) + print('opened file') + return (data) + except: + return ("Error saving attachment.") + else: + + try: + return response.json() + except: # Nothing to return + return {} + def get_project_id(self, project_name): + "Get the project ID using project name" + project_id = None + projects = client.send_get('get_projects') + pprint(projects) + for project in projects: + if project['name']== project_name: + project_id = project['id'] + # project_found_flag=True + break + print("project Id =",project_id) + return project_id + + def get_run_id(self, test_run_name): + "Get the run ID using test name and project name" + run_id = None + project_id = client.get_project_id(project_name='WLAN') + + try: + test_runs = client.send_get('get_runs/%s' % (project_id)) + print("------------TEST RUNS----------") + pprint(test_runs) + + except Exception: + print + 'Exception in update_testrail() updating TestRail.' + return None + else: + for test_run in test_runs: + if test_run['name'] == test_run_name: + run_id = test_run['id'] + print("run Id in Test Runs=",run_id) + break + return run_id + + def update_testrail(self, case_id, run_id, status_id, msg): + "Update TestRail for a given run_id and case_id" + update_flag = False + # Get the TestRail client account details + # Update the result in TestRail using send_post function. + # Parameters for add_result_for_case is the combination of runid and case id. + # status_id is 1 for Passed, 2 For Blocked, 4 for Retest and 5 for Failed + #status_id = 1 if result_flag is True else 5 + + print("result status is = ", status_id) + print("case id=", case_id) + print("run id passed to update is ", run_id, case_id) + if run_id is not None: + try: + result = client.send_post( + 'add_result_for_case/%s/%s' % (run_id, case_id), + {'status_id': status_id, 'comment': msg}) + print("result in post",result) + except Exception: + print + 'Exception in update_testrail() updating TestRail.' + + else: + print + 'Updated test result for case: %s in test run: %s with msg:%s' % (case_id, run_id, msg) + + return update_flag + + + +client: APIClient = APIClient('https://telecominfraproject.testrail.com') + +''' +case = client.send_get('get_case/1') +print("---------TEST CASE 1---------") +pprint(case) +case = client.send_get('get_case/2') +print("---------TEST CASE 2---------") +pprint(case) +print ("----------TEST Project ID----------") +proj_id = client.get_project_id(project_name= "WLAN") +pprint(proj_id) + +#REST API POSTMAN PROJECT +projId = client.get_project_id(project_name= "REST-API-POSTMAN") +pprint("REST API POSTMAN PROJECT ID IS :", projId) + +#print("---------TEST RUN ID-----------") +#rid = client.get_run_id(test_run_name='Master',project_name='WLAN') +rid=client.get_run_id(test_run_name= 'Master-Run3') +pprint(rid) + +result: bool= client.update_testrail(case_id = 1, run_id=rid, status_id = 5, msg ='Test Failed') + +#result = client.send_get('get_attachment/:1', '/Users/syamadevi/Desktop/syama/python-test/TestRail/testreport.pdf') +#print(result) +#project_report= client.send_get("get_reports/:%s" %proj_id) +#print(project_report) +''' +class APIError(Exception): + pass diff --git a/py-scripts/cicd_testrailAndInfraSetup.py b/py-scripts/cicd_testrailAndInfraSetup.py new file mode 100644 index 00000000..f950446b --- /dev/null +++ b/py-scripts/cicd_testrailAndInfraSetup.py @@ -0,0 +1,402 @@ + +import base64 +import urllib.request +from bs4 import BeautifulSoup +import ssl +import subprocess, os +from artifactory import ArtifactoryPath +import tarfile +import paramiko +from paramiko import SSHClient +from scp import SCPClient +import os +import pexpect +from pexpect import pxssh +import sys +import paramiko +from scp import SCPClient + +local_dir=os.getenv('LOG_DIR') +print("Local Directory where all files will be copied and logged", local_dir) +cicd_user=os.getenv('CICD_USER') +print("cicd_user = ", cicd_user) +cicd_pw=os.getenv('CICD_PW') +print("cicd pw =",cicd_pw) +ap_pw=os.getenv('AP_PW') +ap_user=os.getenv('AP_USER') +tr_user=os.getenv('TR_USER') +print("Testrail user id = ", tr_user) +tr_pw=os.getenv('TR_PW') +print ("Testrail password =", tr_pw) + + +if sys.version_info[0] != 3: + print("This script requires Python 3") + exit(1) +if 'py-json' not in sys.path: + sys.path.append('../py-json') + +from LANforge.LFUtils import * +# if you lack __init__.py in this directory you will not find sta_connect module# +import sta_connect +import testrail_api +from sta_connect import StaConnect +from testrail_api import APIClient + +client: APIClient = APIClient('https://telecominfraproject.testrail.com') +client.user = tr_user +client.password = tr_pw + + +print('Beginning file download with requests') + +class GetBuild: + def __init__(self): + self.user = cicd_user + self.password = cicd_pw + ssl._create_default_https_context = ssl._create_unverified_context + + def get_latest_image(self,url): + + auth = str( + base64.b64encode( + bytes('%s:%s' % (cicd_user,cicd_pw ), 'utf-8') + ), + 'ascii' + ).strip() + headers = {'Authorization': 'Basic ' + auth} + + ''' FIND THE LATEST FILE NAME''' + print(url) + req = urllib.request.Request(url, headers=headers) + response = urllib.request.urlopen(req) + html = response.read() + soup = BeautifulSoup(html, features="html.parser") + last_link = soup.find_all('a', href=True)[-1] + latest_file=last_link['href'] + + filepath = local_dir + os.chdir(filepath) + #file_url = url + latest_file + + ''' Download the binary file from Jfrog''' + path = ArtifactoryPath(url,auth=(cicd_user, cicd_pw)) + path.touch() + for file in path: + print('File =', file) + + path = ArtifactoryPath(file, auth=(cicd_user, cicd_pw)) + print("file to be downloaded :" ,latest_file) + print("File Path:",file) + with path.open() as des: + with open(latest_file, "wb") as out: + out.write(des.read()) + des.close() + print("Extract the tar.gz file and upgrade the AP ") + housing_tgz = tarfile.open(latest_file) + housing_tgz.extractall() + housing_tgz.close() + return "pass" + print("Extract the tar file, and copying the file to Linksys AP directory") + #with open("/Users/syamadevi/Desktop/syama/ea8300/ap_sysupgrade_output.log", "a") as output: + # subprocess.call("scp /Users/syamadevi/Desktop/syama/ea8300/openwrt-ipq40xx-generic-linksys_ea8300-squashfs-sysupgrade.bin root@192.100.1.1:/tmp/openwrt-ipq40xx-generic-linksys_ea8300-squashfs-sysupgrade.bin",shell=True, stdout=output, + # stderr=output) + + print('SSH to Linksys and upgrade the file') + + ''' + + ssh = SSHClient() + ssh.load_system_host_keys() + ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) + ssh.connect(hostname=ap_ip, + port='22', + username=ap_user, + password=ap_pw, + look_for_keys=False, + pkey='load_key_if_relevant') + + # SCPCLient takes a paramiko transport as its only argument + scp = SCPClient(ssh.get_transport()) + + scp.put('test.txt', 'testD.txt') + scp.close() + + + stdin, stdout, stderr = ssh.exec_command('sysupgrade /tmp/openwrt-ipq40xx-generic-linksys_ea8300-squashfs-sysupgrade.bin') + + for line in stdout: + print (line.strip('\n')) + client.close() + ''' + + def run_opensyncgw_in_docker(self): + #my_env = os.environ.copy() + #my_env["userpass"] = user_password + #my_command = 'python --version' + #subprocess.Popen('echo', env=my_env) + with open(local_dir +"docker_jfrog_login.log", "a") as output: + subprocess.call("docker login --username" + cicd_user + "--password" + cicd_pw + " https://tip-tip-wlan-cloud-docker-repo.jfrog.io", shell=True, stdout=output, + stderr=output) + with open(local_dir +"opensyncgw_upgrade.log", "a") as output: + subprocess.call("docker pull tip-tip-wlan-cloud-docker-repo.jfrog.io/opensync-gateway-and-mqtt:0.0.1-SNAPSHOT", shell=True, stdout=output, + stderr=output) + with open(local_dir+"opensyncgw.log", "a") as output: + subprocess.call("docker run --rm -i -p 1883:1883 -p 6640:6640 -p 6643:6643 -p 4043:4043 \ + -v ~/mosquitto/data:/mosquitto/data \ + -v ~/mosquitto/log:/mosquitto/log \ + -v ~/wlan-pki-cert-scripts:/opt/tip-wlan/certs \ + -v ~/app/log:/app/logs \ + -v ~//app/config:/app/config \ + -e OVSDB_CONFIG_FILE='/app/config/config_2_ssids.json' \ + tip-tip-wlan-cloud-docker-repo.jfrog.io/opensync-gateway-and-mqtt:0.0.1-SNAPSHOT",shell=True, stdout=output, + stderr=output) + print("opensync Gateway is running") + return "pass" + + def run_opensyncgw_in_aws(self): + try: + s = pxssh.pxssh() + + os.chdir(local_dir) + print("AWS OPENSYNC GW UPGRADE VIA HELM") + print( + 'Helm upgrades the latest image in the GW if a new image is found from jfrog and the AWS gateway is not upto date ') + # makesure the client key file is in the fame directory to login to AWS VM + s.login(aws_host, aws_user, ssh_key='id_key.pem') + s.sendline('kubectl get pods') + + # run a command + s.prompt() # match the prompt + print(s.before) # print everything before the prompt. + s.sendline( + 'helm upgrade tip-wlan wlan-cloud-helm/tip-wlan/ -n default -f wlan-cloud-helm/tip-wlan/resources/environments/dev-amazon.yaml') + s.prompt() # match the prompt + print(s.before) # print everything before the prompt. + s.sendline('kubectl get pods') + + # run a command + s.prompt() # match the prompt + print(s.before) # print everything before the prompt. + s.logout() + return "pass" + + except pxssh.ExceptionPxssh as e: + print("ALERT !!!!!! pxssh failed on login.") + print(e) + + +class openwrt_linksys: + + def ap_upgrade(src,user2,host2,tgt,pwd,opts='', timeout=60): + ''' Performs the scp command. Transfers file(s) from local host to remote host ''' + print("AP Model getting upgarded is :", apModel) + if apModel == "ecw5410": + ap_firmware = 'openwrt-ipq806x-generic-edgecore_ecw5410-squashfs-nand-sysupgrade.bin' + AP_IP = '10.10.10.207' + else: + if apModel == "ea8300": + ap_firmware = 'openwrt-ipq40xx-generic-linksys_ea8300-squashfs-sysupgrade.bin' + AP_IP = '10.10.10.208' + host2 = AP_IP + src = src+ ap_firmware + print("src =", src) + print("AP IP ", AP_IP) + print("AP USER =", ap_user) + print("AP PASSWORD =", ap_pw) + cmd = f'''/bin/bash -c "scp {opts} {src} {user2}@{AP_IP}:{tgt}"''' + print("Executing the following cmd:",cmd,sep='\n') + + tmpFl = '/tmp/scp.log' + fp = open(tmpFl,'wb') + print(tmpFl) + childP = pexpect.spawn(cmd,timeout=timeout) + try: + childP.sendline(cmd) + childP.expect([f"{user2}@{host2}'s password:"]) + childP.sendline(pwd) + childP.logfile = fp + childP.expect(pexpect.EOF) + childP.close() + fp.close() + fp = open(tmpFl,'r') + stdout = fp.read() + fp.close() + + if childP.exitstatus != 0: + raise Exception(stdout) + except KeyboardInterrupt: + childP.close() + fp.close() + return + print(stdout) + + try: + s = pxssh.pxssh() + s.login(host2, user2, pwd) + s.sendline('sysupgrade /tmp/openwrt-ipq40xx-generic-linksys_ea8300-squashfs-sysupgrade.bin') + s.prompt() # match the prompt + print(s.before) # print everything before the prompt. + s.sendline('service opensync restart') + s.prompt() # match the prompt + print(s.before) # print everything before the prompt. + s.logout() + return "pass" + except pxssh.ExceptionPxssh as e: + print("ALERT !!!!!! pxssh failed on login.") + print(e) + + +class RunTest: + def TestCase_938(self, rid): + '''SINGLE CLIENT CONNECTIVITY''' + staConnect = StaConnect("10.10.10.201", 8080, _debugOn=False) + staConnect.sta_mode = 0 + staConnect.upstream_resource = 1 + staConnect.upstream_port = "eth2" + staConnect.radio = "wiphy1" + staConnect.resource = 1 + staConnect.dut_ssid = "autoProvisionedSsid-5u" + #staConnect.dut_passwd = "4C0nnectUS!" + staConnect.dut_passwd = "[BLANK]" + staConnect.dut_security = "open" + staConnect.station_names = ["sta01010"] + staConnect.runtime_secs = 30 + staConnect.cleanup_on_exit = False + staConnect.run() + run_results = staConnect.get_result_list() + for result in run_results: + print("test result: " + result) + #result = 'pass' + print("Single Client Connectivity :",staConnect.passes) + if staConnect.passes() == True: + client.update_testrail(case_id=938, run_id=rid, status_id=1, msg='client Connectivity to 5GHZ Open SSID is Passed ') + else: + client.update_testrail(case_id=938, run_id=rid, status_id=5, msg='client connectivity to 5GHZ OPEN SSID is Failed') + + def TestCase_941(self, rid): + #MULTI CLIENT CONNECTIVITY + staConnect = StaConnect("10.10.10.201", 8080, _debugOn=False) + staConnect.sta_mode = 0 + staConnect.upstream_resource = 1 + staConnect.upstream_port = "eth2" + staConnect.radio = "wiphy1" + staConnect.resource = 1 + staConnect.dut_ssid = "autoProvisionedSsid-5u" + # staConnect.dut_passwd = "4C0nnectUS!" + staConnect.dut_passwd = "[BLANK]" + staConnect.dut_security = "open" + staConnect.station_names = ["sta0020", 'sta0021', 'sta0022', 'sta0023'] + staConnect.runtime_secs = 30 + staConnect.cleanup_on_exit = False + staConnect.run() + run_results = staConnect.get_result_list() + for result in run_results: + print("test result: " + result) + if staConnect.passes() == True: + client.update_testrail(case_id=941, run_id=rid, status_id=1, + msg='client Connectivity to 5GHZ Open SSID is Passed ') + else: + client.update_testrail(case_id=941, run_id=rid, status_id=5, + msg='client connectivity to 5GHZ OPEN SSID is Failed') + + def TestCase_939(self, rid): + ''' Client Count in MQTT Log''' + try: + print("Counting clients in MQTT") + s = pxssh.pxssh() + #aws_host = os.getenv(AWS_HOST) + #aws_user=os.getenv(AWS_USER) + os.chdir(local_dir) + # makesure the client key file is in the fame directory to login to AWS VM + s.login(aws_host,aws_user,ssh_key='id_key.pem') + s.sendline('kubectl cp tip-wlan-opensync-gw-static-f795d45-ctb5z:/app/logs/mqttData.log mqttData.log') + # run a command + s.prompt() # match the prompt + print(s.before) # print everything before the prompt. + s.sendline() + s.logout() + #return "pass" + print(aws_host, aws_user) + ssh = paramiko.SSHClient() + ssh.load_system_host_keys() + ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) + k = paramiko.RSAKey.from_private_key_file('id_key.pem') + ssh.connect(aws_host, username=aws_user, pkey=k) + print("Connected") + scp = SCPClient(ssh.get_transport()) + scp.get("mqttData.log") + scp.close() + # Get the client Count + ClientCount = subprocess.getoutput( + 'grep \'{\"nodeID\"\' mqttData.log | grep clientList | tail -n 1 |cut -d \'=\' -f 3 |jq --raw-output \'{ nodeID , ncl : [ .clients[]| [ .clientList[] | select ( . | .connected == true ) |.macAddress ] | length ] } | .ncl | add |@text\'') + print("client count =", ClientCount) + if (int(ClientCount) >= 1): + client.update_testrail(case_id=939, run_id=rid, status_id=1, + msg=ClientCount + ' Client/Clients Connected ') + else: + client.update_testrail(case_id=939, run_id=rid, status_id=5, + msg='No Client Connected') + except pxssh.ExceptionPxssh as e: + print("ALERT !!!!!! pxssh failed on login.") + print(e) + + +params = { + 'src': local_dir, + 'user2': ap_user, + 'host2': "x.x.x.x", + 'tgt': '/tmp/', + 'pwd': ap_pw, + 'opts': '' +} +apModel= "ecw5410" + + +url = 'https://tip.jfrog.io/artifactory/tip-wlan-ap-firmware/' +url = url + apModel +projId = client.get_project_id(project_name= 'WLAN') +print("TIP WLAN Project ID Is :", projId) + +rid = client.get_run_id(test_run_name= 'TIP-DEMO4') +print(rid) +Test: RunTest = RunTest() +Build: GetBuild = GetBuild() +binary_fetch_result = Build.get_latest_image(url) +print("UPDATING TEST RAIL WITH TEST RESULT FOR CASE_ID 940: Download latest openwrt image from Jfrog") + +if binary_fetch_result == 'pass': + client.update_testrail(case_id=940, run_id=rid, status_id=1, msg='latest firmware downloaded') +else: + client.update_testrail(case_id=940, run_id=rid, status_id=5, msg='Firmware Download failed') + +sleep(10) +print("Upgrading AP with latest image downloaded") +ap_upgrade_result = openwrt_linksys.ap_upgrade(**params) +sleep(10) +print("UPDATING TEST RAIL WITH TEST RESULT FOR CASE_ID 937") +sleep(10) +if ap_upgrade_result == 'pass': + client.update_testrail(case_id=937, run_id=rid, status_id=1, msg='AP upgraded with latest Firmware') +else: + client.update_testrail(case_id=937, run_id=rid, status_id=5, msg='Firmware upgrade failed in AP ') +print("Upgrading AWS Opensync gateway with latest docker image from Jfrog") +OpensyncGw_UpgResult = Build.run_opensyncgw_in_aws() +if OpensyncGw_UpgResult == 'pass': + client.update_testrail(case_id=936, run_id=rid, status_id=1, msg='Opensync GW upgraded with latest Firmware') +else: + client.update_testrail(case_id=936, run_id=rid, status_id=5, msg='Firmware upgrade failed in Opensync Gateway') +sleep(10) + +print("Executing TestCase 938: single Client Connectivity test") +Test.TestCase_938(rid) +sleep(10) +print("Executing TestCase 941: Multi Client Connectivity test") +Test.TestCase_941(rid) +sleep(10) +print("Executing TestCase 939:Counting The number of Clients Connected from MQTT") +Test.TestCase_939(rid) + + + + diff --git a/py-scripts/sta_connect.py b/py-scripts/sta_connect.py index a5c51a1a..af194693 100755 --- a/py-scripts/sta_connect.py +++ b/py-scripts/sta_connect.py @@ -26,7 +26,7 @@ from realm import Realm OPEN="open" WEP="wep" WPA="wpa" -WPA2="wpa" +WPA2="wpa2" MODE_AUTO=0 class StaConnect(LFCliBase): @@ -476,7 +476,7 @@ Example: lfjson_port = args.port staConnect = StaConnect(lfjson_host, lfjson_port) - + staConnect.station_names = [ "sta0000" ] if args.user is not None: staConnect.user = args.user if args.passwd is not None: @@ -499,8 +499,17 @@ Example: staConnect.dut_ssid = args.dut_ssid staConnect.run() + run_results = staConnect.get_result_list() + + is_passing = staConnect.passes() + if is_passing == False: + print("FAIL: Some tests failed") + else: + print("PASS: All tests pass") + + print(staConnect.get_all_message()) # - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -