mirror of
https://github.com/Telecominfraproject/wlan-lanforge-scripts.git
synced 2025-11-01 03:07:56 +00:00
fixed merge conflicts
Signed-off-by: shivamcandela <shivam.thakur@candelatech.com>
This commit is contained in:
547
py-scripts/scripts_deprecated/cicd_TipIntegration.py
Executable file
547
py-scripts/scripts_deprecated/cicd_TipIntegration.py
Executable file
@@ -0,0 +1,547 @@
|
||||
# import os
|
||||
# import sys
|
||||
# import base64
|
||||
# import urllib.request
|
||||
# from bs4 import BeautifulSoup
|
||||
# import ssl
|
||||
# import subprocess
|
||||
# from artifactory import ArtifactoryPath
|
||||
# import tarfile
|
||||
# import paramiko
|
||||
# from paramiko import SSHClient
|
||||
# from scp import SCPClient
|
||||
# import pexpect
|
||||
# from pexpect import pxssh
|
||||
# import paramiko
|
||||
# from scp import SCPClient
|
||||
# import pprint
|
||||
# from pprint import pprint
|
||||
# from os import listdir
|
||||
# import re
|
||||
#
|
||||
# # For finding files
|
||||
# # https://stackoverflow.com/questions/3207219/how-do-i-list-all-files-of-a-directory
|
||||
# import glob
|
||||
#
|
||||
# external_results_dir = / var / tmp / lanforge
|
||||
#
|
||||
# local_dir = os.getenv('LOG_DIR')
|
||||
# print("Local Directory where all files will be copied and logged", local_dir)
|
||||
# cicd_user = os.getenv('CICD_USER')
|
||||
# print("cicd_user = ", cicd_user)
|
||||
# cicd_pw = os.getenv('CICD_PW')
|
||||
# print("cicd pw =", cicd_pw)
|
||||
# ap_pw = os.getenv('AP_PW')
|
||||
# ap_user = os.getenv('AP_USER')
|
||||
# tr_user = os.getenv('TR_USER')
|
||||
# print("Testrail user id = ", tr_user)
|
||||
# tr_pw = os.getenv('TR_PW')
|
||||
# print("Testrail password =", tr_pw)
|
||||
# aws_host = '3.96.56.0'
|
||||
# aws_user = 'ubuntu'
|
||||
#
|
||||
# if sys.version_info[0] != 3:
|
||||
# print("This script requires Python 3")
|
||||
# exit(1)
|
||||
# if 'py-json' not in sys.path:
|
||||
# sys.path.append('../py-json')
|
||||
#
|
||||
# from LANforge.LFUtils import *
|
||||
# # if you lack __init__.py in this directory you will not find sta_connect module#
|
||||
# import sta_connect
|
||||
# import testrail_api
|
||||
# from sta_connect import StaConnect
|
||||
# from testrail_api import APIClient
|
||||
#
|
||||
# client: APIClient = APIClient('https://telecominfraproject.testrail.com')
|
||||
# client.user = tr_user
|
||||
# client.password = tr_pw
|
||||
#
|
||||
# print('Beginning file download with requests')
|
||||
#
|
||||
#
|
||||
# class GetBuild:
|
||||
# def __init__(self):
|
||||
# self.user = cicd_user
|
||||
# self.password = cicd_pw
|
||||
# ssl._create_default_https_context = ssl._create_unverified_context
|
||||
#
|
||||
# def get_latest_image(self, url):
|
||||
#
|
||||
# auth = str(
|
||||
# base64.b64encode(
|
||||
# bytes('%s:%s' % (cicd_user, cicd_pw), 'utf-8')
|
||||
# ),
|
||||
# 'ascii'
|
||||
# ).strip()
|
||||
# headers = {'Authorization': 'Basic ' + auth}
|
||||
#
|
||||
# ''' FIND THE LATEST FILE NAME'''
|
||||
# print(url)
|
||||
# req = urllib.request.Request(url, headers=headers)
|
||||
# response = urllib.request.urlopen(req)
|
||||
# html = response.read()
|
||||
# soup = BeautifulSoup(html, features="html.parser")
|
||||
# last_link = soup.find_all('a', href=True)[-1]
|
||||
# latest_file = last_link['href']
|
||||
#
|
||||
# filepath = local_dir
|
||||
# os.chdir(filepath)
|
||||
# # file_url = url + latest_file
|
||||
#
|
||||
# ''' Download the binary file from Jfrog'''
|
||||
# path = ArtifactoryPath(url, auth=(cicd_user, cicd_pw))
|
||||
# path.touch()
|
||||
# for file in path:
|
||||
# print('File =', file)
|
||||
#
|
||||
# path = ArtifactoryPath(file, auth=(cicd_user, cicd_pw))
|
||||
# print("file to be downloaded :", latest_file)
|
||||
# print("File Path:", file)
|
||||
# with path.open() as des:
|
||||
# with open(latest_file, "wb") as out:
|
||||
# out.write(des.read())
|
||||
# des.close()
|
||||
# print("Extract the tar.gz file and upgrade the AP ")
|
||||
# housing_tgz = tarfile.open(latest_file)
|
||||
# housing_tgz.extractall()
|
||||
# housing_tgz.close()
|
||||
# return "pass"
|
||||
# print("Extract the tar file, and copying the file to Linksys AP directory")
|
||||
# # with open("/Users/syamadevi/Desktop/syama/ea8300/ap_sysupgrade_output.log", "a") as output:
|
||||
# # subprocess.call("scp /Users/syamadevi/Desktop/syama/ea8300/openwrt-ipq40xx-generic-linksys_ea8300-squashfs-sysupgrade.bin root@192.100.1.1:/tmp/openwrt-ipq40xx-generic-linksys_ea8300-squashfs-sysupgrade.bin",shell=True, stdout=output,
|
||||
# # stderr=output)
|
||||
#
|
||||
# print('SSH to Linksys and upgrade the file')
|
||||
#
|
||||
# '''
|
||||
#
|
||||
# ssh = SSHClient()
|
||||
# ssh.load_system_host_keys()
|
||||
# ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
|
||||
# ssh.connect(hostname='192.100.1.1',
|
||||
# port='22',
|
||||
# username='root',
|
||||
# password='Dadun123$',
|
||||
# look_for_keys=False,
|
||||
# pkey='load_key_if_relevant')
|
||||
#
|
||||
# # SCPCLient takes a paramiko transport as its only argument
|
||||
# scp = SCPClient(ssh.get_transport())
|
||||
#
|
||||
# scp.put('test.txt', 'testD.txt')
|
||||
# scp.close()
|
||||
#
|
||||
#
|
||||
#
|
||||
# # client = paramiko.SSHClient()
|
||||
# #client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
|
||||
# #client.connect('192.100.1.1', username='syama', password='Dadun123$')
|
||||
#
|
||||
# stdin, stdout, stderr = ssh.exec_command('sysupgrade /tmp/openwrt-ipq40xx-generic-linksys_ea8300-squashfs-sysupgrade.bin')
|
||||
#
|
||||
# for line in stdout:
|
||||
# print (line.strip('\n'))
|
||||
# client.close()
|
||||
# '''
|
||||
#
|
||||
# def run_opensyncgw_in_docker(self):
|
||||
# # user_password = 'fepv6nj9guCPeEHC'
|
||||
# # my_env = os.environ.copy()
|
||||
# # my_env["userpass"] = user_password
|
||||
# # my_command = 'python --version'
|
||||
# # subprocess.Popen('echo', env=my_env)
|
||||
# with open(local_dir + "docker_jfrog_login.log", "a") as output:
|
||||
# subprocess.call(
|
||||
# "docker login --username" + cicd_user + "--password" + cicd_pw + " https://tip-tip-wlan-cloud-docker-repo.jfrog.io",
|
||||
# shell=True, stdout=output,
|
||||
# stderr=output)
|
||||
# with open(local_dir + "opensyncgw_upgrade.log", "a") as output:
|
||||
# subprocess.call(
|
||||
# "docker pull tip-tip-wlan-cloud-docker-repo.jfrog.io/opensync-gateway-and-mqtt:0.0.1-SNAPSHOT",
|
||||
# shell=True, stdout=output,
|
||||
# stderr=output)
|
||||
# with open(local_dir + "opensyncgw.log", "a") as output:
|
||||
# subprocess.call("docker run --rm -i -p 1883:1883 -p 6640:6640 -p 6643:6643 -p 4043:4043 \
|
||||
# -v ~/mosquitto/data:/mosquitto/data \
|
||||
# -v ~/mosquitto/log:/mosquitto/log \
|
||||
# -v ~/wlan-pki-cert-scripts:/opt/tip-wlan/certs \
|
||||
# -v ~/app/log:/app/logs \
|
||||
# -v ~//app/config:/app/config \
|
||||
# -e OVSDB_CONFIG_FILE='/app/config/config_2_ssids.json' \
|
||||
# tip-tip-wlan-cloud-docker-repo.jfrog.io/opensync-gateway-and-mqtt:0.0.1-SNAPSHOT", shell=True, stdout=output,
|
||||
# stderr=output)
|
||||
# print("opensync Gateway is running")
|
||||
# return "pass"
|
||||
#
|
||||
# def run_opensyncgw_in_aws(self):
|
||||
# try:
|
||||
# s = pxssh.pxssh()
|
||||
#
|
||||
# os.chdir(local_dir)
|
||||
# print("AWS OPENSYNC GW UPGRADE VIA HELM")
|
||||
# print(
|
||||
# 'Helm upgrades the latest image in the GW if a new image is found from jfrog and the AWS gateway is not upto date ')
|
||||
# # makesure the client key file is in the fame directory to login to AWS VM
|
||||
# s.login(aws_host, aws_user, ssh_key='id_key.pem')
|
||||
# s.sendline('kubectl get pods')
|
||||
#
|
||||
# # run a command
|
||||
# s.prompt() # match the prompt
|
||||
# print(s.before) # print everything before the prompt.
|
||||
# s.sendline(
|
||||
# 'helm upgrade tip-wlan wlan-cloud-helm/tip-wlan/ -n default -f wlan-cloud-helm/tip-wlan/resources/environments/dev-amazon.yaml')
|
||||
# s.prompt() # match the prompt
|
||||
# print(s.before) # print everything before the prompt.
|
||||
# s.sendline('kubectl get pods')
|
||||
#
|
||||
# # run a command
|
||||
# s.prompt() # match the prompt
|
||||
# print(s.before) # print everything before the prompt.
|
||||
# s.logout()
|
||||
# return "pass"
|
||||
#
|
||||
# except pxssh.ExceptionPxssh as e:
|
||||
# print("ALERT !!!!!! pxssh failed on login.")
|
||||
# print(e)
|
||||
#
|
||||
#
|
||||
# class openwrt_ap:
|
||||
#
|
||||
# def ap_upgrade(src, user2, host2, tgt, pwd, opts='', timeout=60):
|
||||
# ''' Performs the scp command. Transfers file(s) from local host to remote host '''
|
||||
# print("AP Model getting upgarded is :", apModel)
|
||||
# if apModel == "ecw5410":
|
||||
# ap_firmware = 'openwrt-ipq806x-generic-edgecore_ecw5410-squashfs-nand-sysupgrade.bin'
|
||||
# AP_IP = '10.10.10.207'
|
||||
# else:
|
||||
# if apModel == "ea8300":
|
||||
# ap_firmware = 'openwrt-ipq40xx-generic-linksys_ea8300-squashfs-sysupgrade.bin'
|
||||
# AP_IP = '10.10.10.208'
|
||||
# host2 = AP_IP
|
||||
# src = src + ap_firmware
|
||||
# print("src =", src)
|
||||
# print("AP IP ", AP_IP)
|
||||
# print("AP USER =", ap_user)
|
||||
# print("AP PASSWORD =", ap_pw)
|
||||
# cmd = f'''/bin/bash -c "scp {opts} {src} {user2}@{AP_IP}:{tgt}"'''
|
||||
# print("Executing the following cmd:", cmd, sep='\n')
|
||||
#
|
||||
# tmpFl = '/tmp/scp.log'
|
||||
# fp = open(tmpFl, 'wb')
|
||||
# print(tmpFl)
|
||||
# childP = pexpect.spawn(cmd, timeout=timeout)
|
||||
# try:
|
||||
# childP.sendline(cmd)
|
||||
# childP.expect([f"{user2}@{host2}'s password:"])
|
||||
# childP.sendline(pwd)
|
||||
# childP.logfile = fp
|
||||
# childP.expect(pexpect.EOF)
|
||||
# childP.close()
|
||||
# fp.close()
|
||||
# fp = open(tmpFl, 'r')
|
||||
# stdout = fp.read()
|
||||
# fp.close()
|
||||
#
|
||||
# if childP.exitstatus != 0:
|
||||
# raise Exception(stdout)
|
||||
# except KeyboardInterrupt:
|
||||
# childP.close()
|
||||
# fp.close()
|
||||
# return
|
||||
# print(stdout)
|
||||
#
|
||||
# try:
|
||||
# s = pxssh.pxssh()
|
||||
# s.login(host2, user2, pwd)
|
||||
# # s.sendline('sysupgrade /tmp/openwrt-ipq40xx-generic-linksys_ea8300-squashfs-sysupgrade.bin&')
|
||||
# s.sendline('sysupgrade /tmp/openwrt-ipq806x-generic-edgecore_ecw5410-squashfs-nand-sysupgrade.bin&')
|
||||
# # s.logout()
|
||||
# # s.prompt() # match the prompt
|
||||
# print(s.before) # print everything before the prompt.
|
||||
# sleep(100)
|
||||
# # s.login(host2, user2, pwd)
|
||||
# s.prompt()
|
||||
# # os.system(f"scp {local_dir}/cacert.pem root@10.10.10.207:/usr/plume/certs/ca.pem")
|
||||
# # os.system(f"scp {local_dir}/clientcert.pem root@10.10.10.207:/usr/plume/certs/client.pem")
|
||||
# # os.system(f"scp {local_dir}/clientkey_dec.pem root@10.10.10.207:/usr/plume/certs/client_dec.key")
|
||||
# # s.sendline('service opensync restart')
|
||||
# # s.prompt() # match the prompt
|
||||
# # print(s.before) # print everything before the prompt.
|
||||
# s.logout()
|
||||
# return "pass"
|
||||
# except pxssh.ExceptionPxssh as e:
|
||||
# print("ALERT !!!!!! pxssh failed on login.")
|
||||
# print(e)
|
||||
#
|
||||
# def apCopyCert(src, user2, host2, tgt, pwd, opts='', timeout=60):
|
||||
#
|
||||
# print("Copying the AP Certs")
|
||||
# '''
|
||||
# s = pxssh.pxssh()
|
||||
# print(src, users2,pwd)
|
||||
# s.login(host2, user2, pwd)
|
||||
# s.prompt() # match the prompt
|
||||
# print("Copying ca.pem")
|
||||
# os.system(f"scp {src}/cacert.pem root@10.10.10.207:/usr/plume/certs/ca.pem")
|
||||
# print("Copying the client.pem")
|
||||
# os.system(f"scp {src}/clientcert.pem root@10.10.10.207:/usr/plume/certs/client.pem")
|
||||
# print("Copying the client_dec.key")
|
||||
# os.system(f"scp {src}/clientkey_dec.pem root@10.10.10.207:/usr/plume/certs/client_dec.key")
|
||||
# s.sendline('service opensync restart')
|
||||
# s.prompt() # match the prompt
|
||||
# print(s.before) # print everything before the prompt.
|
||||
# s.logout()
|
||||
# '''
|
||||
# cacert = src + "ca.pem"
|
||||
# clientcert = src + "client.pem"
|
||||
# clientkey = src + "client_dec.key"
|
||||
# tgt = "/usr/plume/certs"
|
||||
# ap_pw
|
||||
#
|
||||
# print("src =", src)
|
||||
# print("AP IP ", host2)
|
||||
# print("AP USER =", ap_user)
|
||||
# print("AP PASSWORD =", ap_pw)
|
||||
# # cmd = f'''/bin/bash -c "scp {opts} {src} {user2}@{AP_IP}:{tgt}"'''
|
||||
# # cmd = f'''/bin/bash -c "scp {opts} {cacert} {user2}@{AP_IP}:{tgt}"'''
|
||||
# # cmd = f'''/bin/bash -c "scp {opts} {clientcert} {user2}@{AP_IP}:{tgt}"'''
|
||||
# cmd = f'''/bin/bash -c "scp {opts} {cacert} {clientcert} {clientkey} {user2}@{host2}:{tgt}"'''
|
||||
# print("Executing the following cmd:", cmd, sep='\n')
|
||||
# tmpFl = '/tmp/cert.log'
|
||||
# fp = open(tmpFl, 'wb')
|
||||
# print(tmpFl)
|
||||
# childP = pexpect.spawn(cmd, timeout=timeout)
|
||||
# try:
|
||||
# childP.sendline(cmd)
|
||||
# childP.expect([f"{user2}@{host2}'s password:"])
|
||||
# childP.sendline(ap_pw)
|
||||
# childP.logfile = fp
|
||||
# childP.expect(pexpect.EOF)
|
||||
# fp.close()
|
||||
# fp = open(tmpFl, 'r')
|
||||
# stdout = fp.read()
|
||||
# fp.close()
|
||||
#
|
||||
# if childP.exitstatus != 0:
|
||||
# # raise Exception(stdout)
|
||||
# print("there is an excess status non 0")
|
||||
# except KeyboardInterrupt:
|
||||
# childP.close()
|
||||
# fp.close()
|
||||
# return
|
||||
# print(stdout)
|
||||
#
|
||||
# def restartGw(src, user2, host2, tgt, pwd, opts='', timeout=60):
|
||||
# print("Restarting opensync GW")
|
||||
# s = pxssh.pxssh()
|
||||
# s.login(host2, user2, pwd)
|
||||
# # s.sendline('sysupgrade /tmp/openwrt-ipq40xx-generic-linksys_ea8300-squashfs-sysupgrade.bin&')
|
||||
# s.sendline('service opensync restart')
|
||||
# # s.logout()
|
||||
# # s.prompt() # match the prompt
|
||||
# print(s.before) # print everything before the prompt.
|
||||
# s.prompt()
|
||||
# s.logout()
|
||||
#
|
||||
#
|
||||
# class RunTest:
|
||||
# def TestCase_938(self, rid):
|
||||
# '''SINGLE CLIENT CONNECTIVITY'''
|
||||
# staConnect = StaConnect("10.10.10.201", 8080, _debugOn=False)
|
||||
# staConnect.sta_mode = 0
|
||||
# staConnect.upstream_resource = 1
|
||||
# staConnect.upstream_port = "eth2"
|
||||
# staConnect.radio = "wiphy1"
|
||||
# staConnect.resource = 1
|
||||
# staConnect.dut_ssid = "autoProvisionedSsid-5u"
|
||||
# # staConnect.dut_passwd = "4C0nnectUS!"
|
||||
# staConnect.dut_passwd = "12345678"
|
||||
# staConnect.dut_security = "wpa2"
|
||||
# staConnect.station_names = ["sta01010"]
|
||||
# staConnect.runtime_secs = 30
|
||||
# staConnect.cleanup_on_exit = True
|
||||
# staConnect.run()
|
||||
# run_results = staConnect.get_result_list()
|
||||
# for result in run_results:
|
||||
# print("test result: " + result)
|
||||
# # result = 'pass'
|
||||
# print("Single Client Connectivity :", staConnect.passes)
|
||||
# if staConnect.passes() == True:
|
||||
# client.update_testrail(case_id=938, run_id=rid, status_id=1,
|
||||
# msg='client Connectivity to 5GHZ Open SSID is Passed ')
|
||||
# else:
|
||||
# client.update_testrail(case_id=938, run_id=rid, status_id=5,
|
||||
# msg='client connectivity to 5GHZ OPEN SSID is Failed')
|
||||
#
|
||||
# def TestCase_941(self, rid):
|
||||
# # MULTI CLIENT CONNECTIVITY
|
||||
# staConnect = StaConnect("10.10.10.201", 8080, _debugOn=False)
|
||||
# staConnect.sta_mode = 0
|
||||
# staConnect.upstream_resource = 1
|
||||
# staConnect.upstream_port = "eth2"
|
||||
# staConnect.radio = "wiphy1"
|
||||
# staConnect.resource = 1
|
||||
# staConnect.dut_ssid = "autoProvisionedSsid-5u"
|
||||
# # staConnect.dut_passwd = "4C0nnectUS!"
|
||||
# staConnect.dut_passwd = "12345678"
|
||||
# staConnect.dut_security = "wpa2"
|
||||
# staConnect.station_names = ["sta0020", 'sta0021', 'sta0022', 'sta0023']
|
||||
# staConnect.runtime_secs = 20
|
||||
# staConnect.cleanup_on_exit = True
|
||||
# staConnect.run()
|
||||
# run_results = staConnect.get_result_list()
|
||||
# for result in run_results:
|
||||
# print("test result: " + result)
|
||||
# if staConnect.passes() == True:
|
||||
# client.update_testrail(case_id=941, run_id=rid, status_id=1,
|
||||
# msg='client Connectivity to 5GHZ Open SSID is Passed ')
|
||||
# else:
|
||||
# client.update_testrail(case_id=941, run_id=rid, status_id=5,
|
||||
# msg='client connectivity to 5GHZ OPEN SSID is Failed')
|
||||
#
|
||||
# # Check for externally run test case results.
|
||||
# def TestCase_LF_External(self, rid):
|
||||
# # https://stackoverflow.com/questions/3207219/how-do-i-list-all-files-of-a-directory
|
||||
# results = glob.glob("%s/*_CICD_RESULTS.txt" % external_results_dir)
|
||||
# for r in results:
|
||||
# rfile = open(r, 'r')
|
||||
# lines = rfile.readlines()
|
||||
#
|
||||
# # File contents looks something like:
|
||||
# # CASE_ID 9999
|
||||
# # RUN_ID 15
|
||||
# # STATUS 1
|
||||
# # MSG Test passed nicely
|
||||
# # MSG Build ID: deadbeef
|
||||
# # MSG Results: http://cicd.telecominfraproject.com
|
||||
#
|
||||
# _case_id = -1
|
||||
# _status_id = 1 # Default to pass
|
||||
# _msg = ""
|
||||
# _rid = rid
|
||||
#
|
||||
# for line in Lines:
|
||||
# m = re.search(r'(\S+) (.*)', line)
|
||||
# k = m.group(0);
|
||||
# v = m.group(1);
|
||||
#
|
||||
# if k == "CASE_ID":
|
||||
# _case_id = v
|
||||
# if k == "RUN_ID":
|
||||
# _rid = v
|
||||
# if k == "STATUS":
|
||||
# _status_id = v
|
||||
# if k == "MSG":
|
||||
# if _msg == "":
|
||||
# _msg == v
|
||||
# else:
|
||||
# _msg += "\n"
|
||||
# _msg += v
|
||||
# if _case_id != -1:
|
||||
# client.update_testrail(case_id=_case_id, run_id=_rid, status_id=_status_id, msg=_msg)
|
||||
# os.unlink(r)
|
||||
#
|
||||
# def TestCase_939(self, rid):
|
||||
# ''' Client Count in MQTT Log'''
|
||||
# try:
|
||||
# print("Counting clients in MQTT")
|
||||
# s = pxssh.pxssh()
|
||||
# # aws_host = os.getenv(AWS_HOST)
|
||||
# # aws_user=os.getenv(AWS_USER)
|
||||
# os.chdir(local_dir)
|
||||
# # makesure the client key file is in the fame directory to login to AWS VM
|
||||
# s.login(aws_host, aws_user, ssh_key='id_key.pem')
|
||||
# s.sendline('kubectl cp tip-wlan-opensync-gw-static-f795d45-ctb5z:/app/logs/mqttData.log mqttData.log')
|
||||
# # run a command
|
||||
# s.prompt() # match the prompt
|
||||
# print(s.before) # print everything before the prompt.
|
||||
# s.sendline()
|
||||
# s.logout()
|
||||
# # return "pass"
|
||||
# print(aws_host, aws_user)
|
||||
# ssh = paramiko.SSHClient()
|
||||
# ssh.load_system_host_keys()
|
||||
# ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
|
||||
# k = paramiko.RSAKey.from_private_key_file('id_key.pem')
|
||||
# ssh.connect(aws_host, username=aws_user, pkey=k)
|
||||
# print("Connected")
|
||||
# scp = SCPClient(ssh.get_transport())
|
||||
# scp.get("mqttData.log")
|
||||
# scp.close()
|
||||
# # Get the client Count
|
||||
# ClientCount = subprocess.getoutput(
|
||||
# 'grep \'{\"nodeID\"\' mqttData.log | grep clientList | tail -1 |cut -d \'=\' -f 3 | json_pp | grep macAddres | grep \'04:F0:21:55\' | tr -d , | sort | uniq | wc -l ')
|
||||
# print("client count =", ClientCount)
|
||||
# if (int(ClientCount) >= 1):
|
||||
# client.update_testrail(case_id=939, run_id=rid, status_id=1,
|
||||
# msg=ClientCount + ' Client/Clients Connected ')
|
||||
# else:
|
||||
# client.update_testrail(case_id=939, run_id=rid, status_id=5,
|
||||
# msg='No Client Connected')
|
||||
# except pxssh.ExceptionPxssh as e:
|
||||
# print("ALERT !!!!!! pxssh failed on login.")
|
||||
# print(e)
|
||||
#
|
||||
#
|
||||
# params = {
|
||||
# 'src': local_dir,
|
||||
# 'user2': ap_user,
|
||||
# 'host2': '10.10.10.207',
|
||||
# 'tgt': '/tmp/',
|
||||
# 'pwd': ap_pw,
|
||||
# 'opts': ''
|
||||
# }
|
||||
# apModel = "ecw5410"
|
||||
#
|
||||
# url = 'https://tip.jfrog.io/artifactory/tip-wlan-ap-firmware/'
|
||||
# url = url + apModel
|
||||
# projId = client.get_project_id(project_name='WLAN')
|
||||
# print("TIP WLAN Project ID Is :", projId)
|
||||
#
|
||||
# rid = client.get_run_id(test_run_name='TIP-DEMO4')
|
||||
# print(rid)
|
||||
# Test: RunTest = RunTest()
|
||||
# Build: GetBuild = GetBuild()
|
||||
# '''
|
||||
# binary_fetch_result = Build.get_latest_image(url)
|
||||
# print("UPDATING TEST RAIL WITH TEST RESULT FOR CASE_ID 940: Download latest openwrt image from Jfrog")
|
||||
#
|
||||
# if binary_fetch_result == 'pass':
|
||||
# client.update_testrail(case_id=940, run_id=rid, status_id=1, msg='latest firmware downloaded')
|
||||
# else:
|
||||
# client.update_testrail(case_id=940, run_id=rid, status_id=5, msg='Firmware Download failed')
|
||||
#
|
||||
# sleep(10)
|
||||
# print("Upgrading AP with latest image downloaded")
|
||||
# ap_upgrade_result = openwrt_ap.ap_upgrade(**params)
|
||||
# sleep(10)
|
||||
# print("UPDATING TEST RAIL WITH TEST RESULT FOR CASE_ID 937")
|
||||
# sleep(10)
|
||||
# if ap_upgrade_result == 'pass':
|
||||
# client.update_testrail(case_id=937, run_id=rid, status_id=1, msg='AP upgraded with latest Firmware')
|
||||
# else:
|
||||
# client.update_testrail(case_id=937, run_id=rid, status_id=5, msg='Firmware upgrade failed in AP ')
|
||||
# print("Upgrading AWS Opensync gateway with latest docker image from Jfrog")
|
||||
# OpensyncGw_UpgResult = Build.run_opensyncgw_in_aws()
|
||||
# if OpensyncGw_UpgResult == 'pass':
|
||||
# client.update_testrail(case_id=936, run_id=rid, status_id=1, msg='Opensync GW upgraded with latest Firmware')
|
||||
# else:
|
||||
# client.update_testrail(case_id=936, run_id=rid, status_id=5, msg='Firmware upgrade failed in Opensync Gateway')
|
||||
# sleep(10)
|
||||
# '''
|
||||
# pprint.pprint(params)
|
||||
# ap_cert_result = openwrt_ap.apCopyCert(**params)
|
||||
# print("Executing TestCase 938: single Client Connectivity test")
|
||||
# openwrt_ap.restartGw(**params)
|
||||
# Test.TestCase_938(rid)
|
||||
#
|
||||
# print("Executing TestCase 941: Multi Client Connectivity test")
|
||||
# Test.TestCase_941(rid)
|
||||
# sleep(100)
|
||||
# print("Executing TestCase 939:Counting The number of Clients Connected from MQTT")
|
||||
# Test.TestCase_939(rid)
|
||||
#
|
||||
#
|
||||
#
|
||||
#
|
||||
203
py-scripts/scripts_deprecated/cicd_testrail.py
Executable file
203
py-scripts/scripts_deprecated/cicd_testrail.py
Executable file
@@ -0,0 +1,203 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
"""TestRail API binding for Python 3.x.
|
||||
|
||||
"""
|
||||
|
||||
import base64
|
||||
import json
|
||||
|
||||
import requests
|
||||
from pprint import pprint
|
||||
import os
|
||||
tr_user = os.getenv('TR_USER')
|
||||
tr_pw=os.getenv('TR_PW')
|
||||
|
||||
|
||||
|
||||
class APIClient:
|
||||
def __init__(self, base_url):
|
||||
self.user = tr_user
|
||||
self.password = tr_pw
|
||||
if not base_url.endswith('/'):
|
||||
base_url += '/'
|
||||
self.__url = base_url + 'index.php?/api/v2/'
|
||||
|
||||
|
||||
def send_get(self, uri, filepath=None):
|
||||
"""Issue a GET request (read) against the API.
|
||||
|
||||
Args:
|
||||
uri: The API method to call including parameters, e.g. get_case/1.
|
||||
filepath: The path and file name for attachment download; used only
|
||||
for 'get_attachment/:attachment_id'.
|
||||
|
||||
Returns:
|
||||
A dict containing the result of the request.
|
||||
"""
|
||||
return self.__send_request('GET', uri, filepath)
|
||||
|
||||
def send_post(self, uri, data):
|
||||
"""Issue a POST request (write) against the API.
|
||||
|
||||
Args:
|
||||
uri: The API method to call, including parameters, e.g. add_case/1.
|
||||
data: The data to submit as part of the request as a dict; strings
|
||||
must be UTF-8 encoded. If adding an attachment, must be the
|
||||
path to the file.
|
||||
|
||||
Returns:
|
||||
A dict containing the result of the request.
|
||||
"""
|
||||
return self.__send_request('POST', uri, data)
|
||||
|
||||
def __send_request(self, method, uri, data):
|
||||
url = self.__url + uri
|
||||
|
||||
auth = str(
|
||||
base64.b64encode(
|
||||
bytes('%s:%s' % (self.user, self.password), 'utf-8')
|
||||
),
|
||||
'ascii'
|
||||
).strip()
|
||||
headers = {'Authorization': 'Basic ' + auth}
|
||||
print("Method =" , method)
|
||||
|
||||
if method == 'POST':
|
||||
if uri[:14] == 'add_attachment': # add_attachment API method
|
||||
files = {'attachment': (open(data, 'rb'))}
|
||||
response = requests.post(url, headers=headers, files=files)
|
||||
files['attachment'].close()
|
||||
else:
|
||||
headers['Content-Type'] = 'application/json'
|
||||
payload = bytes(json.dumps(data), 'utf-8')
|
||||
response = requests.post(url, headers=headers, data=payload)
|
||||
else:
|
||||
headers['Content-Type'] = 'application/json'
|
||||
response = requests.get(url, headers=headers)
|
||||
print("headers = ", headers)
|
||||
print("resonse=", response)
|
||||
print("response code =", response.status_code)
|
||||
|
||||
if response.status_code > 201:
|
||||
|
||||
try:
|
||||
error = response.json()
|
||||
except: # response.content not formatted as JSON
|
||||
error = str(response.content)
|
||||
#raise APIError('TestRail API returned HTTP %s (%s)' % (response.status_code, error))
|
||||
print('TestRail API returned HTTP %s (%s)' % (response.status_code, error))
|
||||
return
|
||||
else:
|
||||
print(uri[:15])
|
||||
if uri[:15] == 'get_attachments': # Expecting file, not JSON
|
||||
try:
|
||||
print('opening file')
|
||||
print (str(response.content))
|
||||
open(data, 'wb').write(response.content)
|
||||
print('opened file')
|
||||
return (data)
|
||||
except:
|
||||
return ("Error saving attachment.")
|
||||
else:
|
||||
|
||||
try:
|
||||
return response.json()
|
||||
except: # Nothing to return
|
||||
return {}
|
||||
def get_project_id(self, project_name):
|
||||
"Get the project ID using project name"
|
||||
project_id = None
|
||||
projects = client.send_get('get_projects')
|
||||
pprint(projects)
|
||||
for project in projects:
|
||||
if project['name']== project_name:
|
||||
project_id = project['id']
|
||||
# project_found_flag=True
|
||||
break
|
||||
print("project Id =",project_id)
|
||||
return project_id
|
||||
|
||||
def get_run_id(self, test_run_name):
|
||||
"Get the run ID using test name and project name"
|
||||
run_id = None
|
||||
project_id = client.get_project_id(project_name='WLAN')
|
||||
|
||||
try:
|
||||
test_runs = client.send_get('get_runs/%s' % (project_id))
|
||||
print("------------TEST RUNS----------")
|
||||
pprint(test_runs)
|
||||
|
||||
except Exception:
|
||||
print
|
||||
'Exception in update_testrail() updating TestRail.'
|
||||
return None
|
||||
else:
|
||||
for test_run in test_runs:
|
||||
if test_run['name'] == test_run_name:
|
||||
run_id = test_run['id']
|
||||
print("run Id in Test Runs=",run_id)
|
||||
break
|
||||
return run_id
|
||||
|
||||
def update_testrail(self, case_id, run_id, status_id, msg):
|
||||
"Update TestRail for a given run_id and case_id"
|
||||
update_flag = False
|
||||
# Get the TestRail client account details
|
||||
# Update the result in TestRail using send_post function.
|
||||
# Parameters for add_result_for_case is the combination of runid and case id.
|
||||
# status_id is 1 for Passed, 2 For Blocked, 4 for Retest and 5 for Failed
|
||||
#status_id = 1 if result_flag else 5
|
||||
|
||||
print("result status is = ", status_id)
|
||||
print("case id=", case_id)
|
||||
print("run id passed to update is ", run_id, case_id)
|
||||
if run_id is not None:
|
||||
try:
|
||||
result = client.send_post(
|
||||
'add_result_for_case/%s/%s' % (run_id, case_id),
|
||||
{'status_id': status_id, 'comment': msg})
|
||||
print("result in post",result)
|
||||
except Exception:
|
||||
print
|
||||
'Exception in update_testrail() updating TestRail.'
|
||||
|
||||
else:
|
||||
print
|
||||
'Updated test result for case: %s in test run: %s with msg:%s' % (case_id, run_id, msg)
|
||||
|
||||
return update_flag
|
||||
|
||||
|
||||
|
||||
client: APIClient = APIClient('https://telecominfraproject.testrail.com')
|
||||
|
||||
'''
|
||||
case = client.send_get('get_case/1')
|
||||
print("---------TEST CASE 1---------")
|
||||
pprint(case)
|
||||
case = client.send_get('get_case/2')
|
||||
print("---------TEST CASE 2---------")
|
||||
pprint(case)
|
||||
print ("----------TEST Project ID----------")
|
||||
proj_id = client.get_project_id(project_name= "WLAN")
|
||||
pprint(proj_id)
|
||||
|
||||
#REST API POSTMAN PROJECT
|
||||
projId = client.get_project_id(project_name= "REST-API-POSTMAN")
|
||||
pprint("REST API POSTMAN PROJECT ID IS :", projId)
|
||||
|
||||
#print("---------TEST RUN ID-----------")
|
||||
#rid = client.get_run_id(test_run_name='Master',project_name='WLAN')
|
||||
rid=client.get_run_id(test_run_name= 'Master-Run3')
|
||||
pprint(rid)
|
||||
|
||||
result: bool= client.update_testrail(case_id = 1, run_id=rid, status_id = 5, msg ='Test Failed')
|
||||
|
||||
#result = client.send_get('get_attachment/:1', '/Users/syamadevi/Desktop/syama/python-test/TestRail/testreport.pdf')
|
||||
#print(result)
|
||||
#project_report= client.send_get("get_reports/:%s" %proj_id)
|
||||
#print(project_report)
|
||||
'''
|
||||
class APIError(Exception):
|
||||
pass
|
||||
400
py-scripts/scripts_deprecated/cicd_testrailAndInfraSetup.py
Executable file
400
py-scripts/scripts_deprecated/cicd_testrailAndInfraSetup.py
Executable file
@@ -0,0 +1,400 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
import base64
|
||||
import urllib.request
|
||||
from bs4 import BeautifulSoup
|
||||
import ssl
|
||||
import subprocess
|
||||
from artifactory import ArtifactoryPath
|
||||
import tarfile
|
||||
import os
|
||||
import pexpect
|
||||
from pexpect import pxssh
|
||||
import sys
|
||||
import paramiko
|
||||
from scp import SCPClient
|
||||
|
||||
local_dir=os.getenv('LOG_DIR')
|
||||
print("Local Directory where all files will be copied and logged", local_dir)
|
||||
cicd_user=os.getenv('CICD_USER')
|
||||
print("cicd_user = ", cicd_user)
|
||||
cicd_pw=os.getenv('CICD_PW')
|
||||
print("cicd pw =",cicd_pw)
|
||||
ap_pw=os.getenv('AP_PW')
|
||||
ap_user=os.getenv('AP_USER')
|
||||
tr_user=os.getenv('TR_USER')
|
||||
print("Testrail user id = ", tr_user)
|
||||
tr_pw=os.getenv('TR_PW')
|
||||
print ("Testrail password =", tr_pw)
|
||||
|
||||
|
||||
if sys.version_info[0] != 3:
|
||||
print("This script requires Python 3")
|
||||
exit(1)
|
||||
if 'py-json' not in sys.path:
|
||||
sys.path.append('../py-json')
|
||||
|
||||
from LANforge.LFUtils import *
|
||||
# if you lack __init__.py in this directory you will not find sta_connect module#
|
||||
import sta_connect
|
||||
import testrail_api
|
||||
from sta_connect import StaConnect
|
||||
from testrail_api import APIClient
|
||||
|
||||
client: APIClient = APIClient('https://telecominfraproject.testrail.com')
|
||||
client.user = tr_user
|
||||
client.password = tr_pw
|
||||
|
||||
|
||||
print('Beginning file download with requests')
|
||||
|
||||
class GetBuild:
|
||||
def __init__(self):
|
||||
self.user = cicd_user
|
||||
self.password = cicd_pw
|
||||
ssl._create_default_https_context = ssl._create_unverified_context
|
||||
|
||||
def get_latest_image(self,url):
|
||||
|
||||
auth = str(
|
||||
base64.b64encode(
|
||||
bytes('%s:%s' % (cicd_user,cicd_pw ), 'utf-8')
|
||||
),
|
||||
'ascii'
|
||||
).strip()
|
||||
headers = {'Authorization': 'Basic ' + auth}
|
||||
|
||||
''' FIND THE LATEST FILE NAME'''
|
||||
print(url)
|
||||
req = urllib.request.Request(url, headers=headers)
|
||||
response = urllib.request.urlopen(req)
|
||||
html = response.read()
|
||||
soup = BeautifulSoup(html, features="html.parser")
|
||||
last_link = soup.find_all('a', href=True)[-1]
|
||||
latest_file=last_link['href']
|
||||
|
||||
filepath = local_dir
|
||||
os.chdir(filepath)
|
||||
#file_url = url + latest_file
|
||||
|
||||
''' Download the binary file from Jfrog'''
|
||||
path = ArtifactoryPath(url,auth=(cicd_user, cicd_pw))
|
||||
path.touch()
|
||||
for file in path:
|
||||
print('File =', file)
|
||||
|
||||
path = ArtifactoryPath(file, auth=(cicd_user, cicd_pw))
|
||||
print("file to be downloaded :" ,latest_file)
|
||||
print("File Path:",file)
|
||||
with path.open() as des:
|
||||
with open(latest_file, "wb") as out:
|
||||
out.write(des.read())
|
||||
des.close()
|
||||
print("Extract the tar.gz file and upgrade the AP ")
|
||||
housing_tgz = tarfile.open(latest_file)
|
||||
housing_tgz.extractall()
|
||||
housing_tgz.close()
|
||||
return "pass"
|
||||
print("Extract the tar file, and copying the file to Linksys AP directory")
|
||||
#with open("/Users/syamadevi/Desktop/syama/ea8300/ap_sysupgrade_output.log", "a") as output:
|
||||
# subprocess.call("scp /Users/syamadevi/Desktop/syama/ea8300/openwrt-ipq40xx-generic-linksys_ea8300-squashfs-sysupgrade.bin root@192.100.1.1:/tmp/openwrt-ipq40xx-generic-linksys_ea8300-squashfs-sysupgrade.bin",shell=True, stdout=output,
|
||||
# stderr=output)
|
||||
|
||||
print('SSH to Linksys and upgrade the file')
|
||||
|
||||
'''
|
||||
|
||||
ssh = SSHClient()
|
||||
ssh.load_system_host_keys()
|
||||
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
|
||||
ssh.connect(hostname=ap_ip,
|
||||
port='22',
|
||||
username=ap_user,
|
||||
password=ap_pw,
|
||||
look_for_keys=False,
|
||||
pkey='load_key_if_relevant')
|
||||
|
||||
# SCPCLient takes a paramiko transport as its only argument
|
||||
scp = SCPClient(ssh.get_transport())
|
||||
|
||||
scp.put('test.txt', 'testD.txt')
|
||||
scp.close()
|
||||
|
||||
|
||||
stdin, stdout, stderr = ssh.exec_command('sysupgrade /tmp/openwrt-ipq40xx-generic-linksys_ea8300-squashfs-sysupgrade.bin')
|
||||
|
||||
for line in stdout:
|
||||
print (line.strip('\n'))
|
||||
client.close()
|
||||
'''
|
||||
|
||||
def run_opensyncgw_in_docker(self):
|
||||
#my_env = os.environ.copy()
|
||||
#my_env["userpass"] = user_password
|
||||
#my_command = 'python --version'
|
||||
#subprocess.Popen('echo', env=my_env)
|
||||
with open(local_dir +"docker_jfrog_login.log", "a") as output:
|
||||
subprocess.call("docker login --username" + cicd_user + "--password" + cicd_pw + " https://tip-tip-wlan-cloud-docker-repo.jfrog.io", shell=True, stdout=output,
|
||||
stderr=output)
|
||||
with open(local_dir +"opensyncgw_upgrade.log", "a") as output:
|
||||
subprocess.call("docker pull tip-tip-wlan-cloud-docker-repo.jfrog.io/opensync-gateway-and-mqtt:0.0.1-SNAPSHOT", shell=True, stdout=output,
|
||||
stderr=output)
|
||||
with open(local_dir+"opensyncgw.log", "a") as output:
|
||||
subprocess.call("docker run --rm -i -p 1883:1883 -p 6640:6640 -p 6643:6643 -p 4043:4043 \
|
||||
-v ~/mosquitto/data:/mosquitto/data \
|
||||
-v ~/mosquitto/log:/mosquitto/log \
|
||||
-v ~/wlan-pki-cert-scripts:/opt/tip-wlan/certs \
|
||||
-v ~/app/log:/app/logs \
|
||||
-v ~//app/config:/app/config \
|
||||
-e OVSDB_CONFIG_FILE='/app/config/config_2_ssids.json' \
|
||||
tip-tip-wlan-cloud-docker-repo.jfrog.io/opensync-gateway-and-mqtt:0.0.1-SNAPSHOT",shell=True, stdout=output,
|
||||
stderr=output)
|
||||
print("opensync Gateway is running")
|
||||
return "pass"
|
||||
|
||||
def run_opensyncgw_in_aws(self):
|
||||
try:
|
||||
s = pxssh.pxssh()
|
||||
|
||||
os.chdir(local_dir)
|
||||
print("AWS OPENSYNC GW UPGRADE VIA HELM")
|
||||
print(
|
||||
'Helm upgrades the latest image in the GW if a new image is found from jfrog and the AWS gateway is not upto date ')
|
||||
# makesure the client key file is in the fame directory to login to AWS VM
|
||||
s.login(aws_host, aws_user, ssh_key='id_key.pem')
|
||||
s.sendline('kubectl get pods')
|
||||
|
||||
# run a command
|
||||
s.prompt() # match the prompt
|
||||
print(s.before) # print everything before the prompt.
|
||||
s.sendline(
|
||||
'helm upgrade tip-wlan wlan-cloud-helm/tip-wlan/ -n default -f wlan-cloud-helm/tip-wlan/resources/environments/dev-amazon.yaml')
|
||||
s.prompt() # match the prompt
|
||||
print(s.before) # print everything before the prompt.
|
||||
s.sendline('kubectl get pods')
|
||||
|
||||
# run a command
|
||||
s.prompt() # match the prompt
|
||||
print(s.before) # print everything before the prompt.
|
||||
s.logout()
|
||||
return "pass"
|
||||
|
||||
except pxssh.ExceptionPxssh as e:
|
||||
print("ALERT !!!!!! pxssh failed on login.")
|
||||
print(e)
|
||||
|
||||
|
||||
class openwrt_linksys:
|
||||
|
||||
def ap_upgrade(src,user2,host2,tgt,pwd,opts='', timeout=60):
|
||||
''' Performs the scp command. Transfers file(s) from local host to remote host '''
|
||||
print("AP Model getting upgarded is :", apModel)
|
||||
if apModel == "ecw5410":
|
||||
ap_firmware = 'openwrt-ipq806x-generic-edgecore_ecw5410-squashfs-nand-sysupgrade.bin'
|
||||
AP_IP = '10.10.10.207'
|
||||
else:
|
||||
if apModel == "ea8300":
|
||||
ap_firmware = 'openwrt-ipq40xx-generic-linksys_ea8300-squashfs-sysupgrade.bin'
|
||||
AP_IP = '10.10.10.208'
|
||||
host2 = AP_IP
|
||||
src = src+ ap_firmware
|
||||
print("src =", src)
|
||||
print("AP IP ", AP_IP)
|
||||
print("AP USER =", ap_user)
|
||||
print("AP PASSWORD =", ap_pw)
|
||||
cmd = f'''/bin/bash -c "scp {opts} {src} {user2}@{AP_IP}:{tgt}"'''
|
||||
print("Executing the following cmd:",cmd,sep='\n')
|
||||
|
||||
tmpFl = '/tmp/scp.log'
|
||||
fp = open(tmpFl,'wb')
|
||||
print(tmpFl)
|
||||
childP = pexpect.spawn(cmd,timeout=timeout)
|
||||
try:
|
||||
childP.sendline(cmd)
|
||||
childP.expect([f"{user2}@{host2}'s password:"])
|
||||
childP.sendline(pwd)
|
||||
childP.logfile = fp
|
||||
childP.expect(pexpect.EOF)
|
||||
childP.close()
|
||||
fp.close()
|
||||
fp = open(tmpFl,'r')
|
||||
stdout = fp.read()
|
||||
fp.close()
|
||||
|
||||
if childP.exitstatus != 0:
|
||||
raise Exception(stdout)
|
||||
except KeyboardInterrupt:
|
||||
childP.close()
|
||||
fp.close()
|
||||
return
|
||||
print(stdout)
|
||||
|
||||
try:
|
||||
s = pxssh.pxssh()
|
||||
s.login(host2, user2, pwd)
|
||||
s.sendline('sysupgrade /tmp/openwrt-ipq40xx-generic-linksys_ea8300-squashfs-sysupgrade.bin')
|
||||
s.prompt() # match the prompt
|
||||
print(s.before) # print everything before the prompt.
|
||||
s.sendline('service opensync restart')
|
||||
s.prompt() # match the prompt
|
||||
print(s.before) # print everything before the prompt.
|
||||
s.logout()
|
||||
return "pass"
|
||||
except pxssh.ExceptionPxssh as e:
|
||||
print("ALERT !!!!!! pxssh failed on login.")
|
||||
print(e)
|
||||
|
||||
|
||||
class RunTest:
|
||||
def TestCase_938(self, rid):
|
||||
'''SINGLE CLIENT CONNECTIVITY'''
|
||||
staConnect = StaConnect("10.10.10.201", 8080, _debugOn=False)
|
||||
staConnect.sta_mode = 0
|
||||
staConnect.upstream_resource = 1
|
||||
staConnect.upstream_port = "eth2"
|
||||
staConnect.radio = "wiphy1"
|
||||
staConnect.resource = 1
|
||||
staConnect.dut_ssid = "autoProvisionedSsid-5u"
|
||||
#staConnect.dut_passwd = "4C0nnectUS!"
|
||||
staConnect.dut_passwd = "[BLANK]"
|
||||
staConnect.dut_security = "open"
|
||||
staConnect.station_names = ["sta01010"]
|
||||
staConnect.runtime_secs = 30
|
||||
staConnect.cleanup_on_exit = False
|
||||
staConnect.run()
|
||||
run_results = staConnect.get_result_list()
|
||||
for result in run_results:
|
||||
print("test result: " + result)
|
||||
#result = 'pass'
|
||||
print("Single Client Connectivity :",staConnect.passes)
|
||||
if staConnect.passes() == True:
|
||||
client.update_testrail(case_id=938, run_id=rid, status_id=1, msg='client Connectivity to 5GHZ Open SSID is Passed ')
|
||||
else:
|
||||
client.update_testrail(case_id=938, run_id=rid, status_id=5, msg='client connectivity to 5GHZ OPEN SSID is Failed')
|
||||
|
||||
def TestCase_941(self, rid):
|
||||
#MULTI CLIENT CONNECTIVITY
|
||||
staConnect = StaConnect("10.10.10.201", 8080, _debugOn=False)
|
||||
staConnect.sta_mode = 0
|
||||
staConnect.upstream_resource = 1
|
||||
staConnect.upstream_port = "eth2"
|
||||
staConnect.radio = "wiphy1"
|
||||
staConnect.resource = 1
|
||||
staConnect.dut_ssid = "autoProvisionedSsid-5u"
|
||||
# staConnect.dut_passwd = "4C0nnectUS!"
|
||||
staConnect.dut_passwd = "[BLANK]"
|
||||
staConnect.dut_security = "open"
|
||||
staConnect.station_names = ["sta0020", 'sta0021', 'sta0022', 'sta0023']
|
||||
staConnect.runtime_secs = 30
|
||||
staConnect.cleanup_on_exit = False
|
||||
staConnect.run()
|
||||
run_results = staConnect.get_result_list()
|
||||
for result in run_results:
|
||||
print("test result: " + result)
|
||||
if staConnect.passes() == True:
|
||||
client.update_testrail(case_id=941, run_id=rid, status_id=1,
|
||||
msg='client Connectivity to 5GHZ Open SSID is Passed ')
|
||||
else:
|
||||
client.update_testrail(case_id=941, run_id=rid, status_id=5,
|
||||
msg='client connectivity to 5GHZ OPEN SSID is Failed')
|
||||
|
||||
def TestCase_939(self, rid):
|
||||
''' Client Count in MQTT Log'''
|
||||
try:
|
||||
print("Counting clients in MQTT")
|
||||
s = pxssh.pxssh()
|
||||
#aws_host = os.getenv(AWS_HOST)
|
||||
#aws_user=os.getenv(AWS_USER)
|
||||
os.chdir(local_dir)
|
||||
# makesure the client key file is in the fame directory to login to AWS VM
|
||||
s.login(aws_host,aws_user,ssh_key='id_key.pem')
|
||||
s.sendline('kubectl cp tip-wlan-opensync-gw-static-f795d45-ctb5z:/app/logs/mqttData.log mqttData.log')
|
||||
# run a command
|
||||
s.prompt() # match the prompt
|
||||
print(s.before) # print everything before the prompt.
|
||||
s.sendline()
|
||||
s.logout()
|
||||
#return "pass"
|
||||
print(aws_host, aws_user)
|
||||
ssh = paramiko.SSHClient()
|
||||
ssh.load_system_host_keys()
|
||||
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
|
||||
k = paramiko.RSAKey.from_private_key_file('id_key.pem')
|
||||
ssh.connect(aws_host, username=aws_user, pkey=k)
|
||||
print("Connected")
|
||||
scp = SCPClient(ssh.get_transport())
|
||||
scp.get("mqttData.log")
|
||||
scp.close()
|
||||
# Get the client Count
|
||||
ClientCount = subprocess.getoutput(
|
||||
'grep \'{\"nodeID\"\' mqttData.log | grep clientList | tail -n 1 |cut -d \'=\' -f 3 |jq --raw-output \'{ nodeID , ncl : [ .clients[]| [ .clientList[] | select ( . | .connected == true ) |.macAddress ] | length ] } | .ncl | add |@text\'')
|
||||
print("client count =", ClientCount)
|
||||
if (int(ClientCount) >= 1):
|
||||
client.update_testrail(case_id=939, run_id=rid, status_id=1,
|
||||
msg=ClientCount + ' Client/Clients Connected ')
|
||||
else:
|
||||
client.update_testrail(case_id=939, run_id=rid, status_id=5,
|
||||
msg='No Client Connected')
|
||||
except pxssh.ExceptionPxssh as e:
|
||||
print("ALERT !!!!!! pxssh failed on login.")
|
||||
print(e)
|
||||
|
||||
|
||||
params = {
|
||||
'src': local_dir,
|
||||
'user2': ap_user,
|
||||
'host2': "x.x.x.x",
|
||||
'tgt': '/tmp/',
|
||||
'pwd': ap_pw,
|
||||
'opts': ''
|
||||
}
|
||||
apModel= "ecw5410"
|
||||
|
||||
|
||||
url = 'https://tip.jfrog.io/artifactory/tip-wlan-ap-firmware/'
|
||||
url = url + apModel
|
||||
projId = client.get_project_id(project_name= 'WLAN')
|
||||
print("TIP WLAN Project ID Is :", projId)
|
||||
|
||||
rid = client.get_run_id(test_run_name= 'TIP-DEMO4')
|
||||
print(rid)
|
||||
Test: RunTest = RunTest()
|
||||
Build: GetBuild = GetBuild()
|
||||
binary_fetch_result = Build.get_latest_image(url)
|
||||
print("UPDATING TEST RAIL WITH TEST RESULT FOR CASE_ID 940: Download latest openwrt image from Jfrog")
|
||||
|
||||
if binary_fetch_result == 'pass':
|
||||
client.update_testrail(case_id=940, run_id=rid, status_id=1, msg='latest firmware downloaded')
|
||||
else:
|
||||
client.update_testrail(case_id=940, run_id=rid, status_id=5, msg='Firmware Download failed')
|
||||
|
||||
sleep(10)
|
||||
print("Upgrading AP with latest image downloaded")
|
||||
ap_upgrade_result = openwrt_linksys.ap_upgrade(**params)
|
||||
sleep(10)
|
||||
print("UPDATING TEST RAIL WITH TEST RESULT FOR CASE_ID 937")
|
||||
sleep(10)
|
||||
if ap_upgrade_result == 'pass':
|
||||
client.update_testrail(case_id=937, run_id=rid, status_id=1, msg='AP upgraded with latest Firmware')
|
||||
else:
|
||||
client.update_testrail(case_id=937, run_id=rid, status_id=5, msg='Firmware upgrade failed in AP ')
|
||||
print("Upgrading AWS Opensync gateway with latest docker image from Jfrog")
|
||||
OpensyncGw_UpgResult = Build.run_opensyncgw_in_aws()
|
||||
if OpensyncGw_UpgResult == 'pass':
|
||||
client.update_testrail(case_id=936, run_id=rid, status_id=1, msg='Opensync GW upgraded with latest Firmware')
|
||||
else:
|
||||
client.update_testrail(case_id=936, run_id=rid, status_id=5, msg='Firmware upgrade failed in Opensync Gateway')
|
||||
sleep(10)
|
||||
|
||||
print("Executing TestCase 938: single Client Connectivity test")
|
||||
Test.TestCase_938(rid)
|
||||
sleep(10)
|
||||
print("Executing TestCase 941: Multi Client Connectivity test")
|
||||
Test.TestCase_941(rid)
|
||||
sleep(10)
|
||||
print("Executing TestCase 939:Counting The number of Clients Connected from MQTT")
|
||||
Test.TestCase_939(rid)
|
||||
|
||||
|
||||
|
||||
|
||||
154
py-scripts/scripts_deprecated/csv_processor.py
Executable file
154
py-scripts/scripts_deprecated/csv_processor.py
Executable file
@@ -0,0 +1,154 @@
|
||||
#!/usr/bin/env python3
|
||||
import sys
|
||||
import os
|
||||
import argparse
|
||||
import pandas as pd
|
||||
|
||||
#https://pandas.pydata.org/pandas-docs/stable/user_guide/visualization.html
|
||||
#https://queirozf.com/entries/pandas-dataframe-plot-examples-with-matplotlib-pyplot
|
||||
|
||||
|
||||
if sys.version_info[0] != 3:
|
||||
print("This script requires Python 3")
|
||||
exit(1)
|
||||
|
||||
|
||||
sys.path.append(os.path.join(os.path.abspath(__file__ + "../../../")))
|
||||
|
||||
class L3CSVParcer():
|
||||
def __init__(self,csv_file):
|
||||
|
||||
|
||||
# left this in for testing
|
||||
'''csv_obj = open(csv_file, 'r')
|
||||
csv_reader = csv.reader(csv_obj)
|
||||
print(csv_reader)
|
||||
|
||||
for row in csv_reader:
|
||||
if row[1] == 'rx':
|
||||
print(row)'''
|
||||
|
||||
include_summary = ['Time epoch','Time','Monitor','least','most','average']
|
||||
self.csv_file = csv_file
|
||||
df_s = pd.read_csv(self.csv_file,header = 0, usecols = lambda column : any(substr in column for substr in include_summary))
|
||||
|
||||
print('{}'.format(csv_file))
|
||||
csv_file_summary = self.csv_file.replace('results_','results_summary_')
|
||||
|
||||
df_s.to_csv(csv_file_summary, index = False, header=True)
|
||||
|
||||
include_raw = ['Time epoch','Time','Monitor','LT','MT']
|
||||
self.csv_file = csv_file
|
||||
df_r = pd.read_csv(self.csv_file,header = 0, usecols = lambda column : any(substr in column for substr in include_raw))
|
||||
|
||||
csv_file_raw = self.csv_file.replace('results_','results_raw_')
|
||||
df_r.to_csv(csv_file_raw, index = False, header=True)
|
||||
|
||||
'''df_rx_delta = df_r.loc[df['Monitor'] == 'rx_delta']
|
||||
|
||||
df_rx_delta.plot(x='Time epoch', y='average_rx_data')
|
||||
plt.show()
|
||||
|
||||
total_cols = len(df.axes[0])
|
||||
|
||||
|
||||
print(df.columns)
|
||||
|
||||
print(df.loc[df['Monitor'] == 'rx_delta'])
|
||||
print(df.loc[df['Monitor'] == 'rx'])
|
||||
|
||||
print(df.loc[df['Monitor'] == 'rx_delta', df.columns != 'Time'])
|
||||
|
||||
|
||||
df_rx_delta = df.loc[df['Monitor'] == 'rx_delta']
|
||||
|
||||
print(df_rx_delta.describe())
|
||||
|
||||
df_rx_delta.plot(x='Time epoch', y='average_rx_data')
|
||||
plt.show()
|
||||
|
||||
df_rx_delta.plot(x='Time', y='average_rx_data')
|
||||
plt.show()
|
||||
|
||||
df_rx_drop_pct = df.loc[df['Monitor'] == 'rx_drop_percent']
|
||||
print(df_rx_drop_pct)
|
||||
df_rx_delta.plot(x='Time epoch', y='rx_drop_percent')
|
||||
plt.show()
|
||||
|
||||
|
||||
df2 = df.filter(regex='LT-s')
|
||||
print(df2)
|
||||
#plt.plot(df2[0], df2[1]
|
||||
#plt.show()
|
||||
|
||||
df2_mean = df2.mean().sort_values(ascending=False)
|
||||
|
||||
print(df2_mean)
|
||||
|
||||
df2_mean_no_outliers = df2_mean[df2_mean(df2_mean.quantile(.10), df2_mean.quantile(.90))]
|
||||
|
||||
print("no outliers")
|
||||
print(df2_mean_no_outliers)
|
||||
|
||||
print("Top 10")
|
||||
print(df2_mean.head(10))
|
||||
print("Bottom 10")
|
||||
print(df2_mean.tail(10))
|
||||
|
||||
print("mean others")
|
||||
|
||||
|
||||
|
||||
# set display format otherwise get scientific notation
|
||||
pd.set_option('display.float_format', lambda x: '%.3f' % x)
|
||||
|
||||
df_mean = df_rx_delta.mean().sort_values()
|
||||
|
||||
#print(df_mean)
|
||||
|
||||
print(df_mean[0])
|
||||
|
||||
|
||||
|
||||
|
||||
#df_uni_cast = [col for col in df_rx_delta if 'LT' in col]
|
||||
#df_LT_rx_delta_mean = df_uni_cast.mean().sort_values()
|
||||
|
||||
#print(df_LT_rx_delta_mean)
|
||||
x = np.linspace(0, 20, 100)
|
||||
plt.plot(x, np.sin(x))
|
||||
plt.show()'''
|
||||
|
||||
|
||||
def main():
|
||||
|
||||
#debug_on = False
|
||||
parser = argparse.ArgumentParser(
|
||||
prog='csv_processor.py',
|
||||
formatter_class=argparse.RawTextHelpFormatter,
|
||||
epilog='''\
|
||||
Useful Information:
|
||||
''',
|
||||
|
||||
description='''quick_test.py:
|
||||
|
||||
''')
|
||||
|
||||
|
||||
parser.add_argument('-i','--infile', help="file of csv data", default='longevity_results_08_14_2020_14_37.csv')
|
||||
parser.add_argument('--debug', help='--debug: Enable debugging',default=True)
|
||||
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
#debug_on = args.debug
|
||||
|
||||
if args.infile:
|
||||
csv_file_name = args.infile
|
||||
|
||||
L3CSVParcer(csv_file_name)
|
||||
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
46
py-scripts/scripts_deprecated/event_break_flood.py
Executable file
46
py-scripts/scripts_deprecated/event_break_flood.py
Executable file
@@ -0,0 +1,46 @@
|
||||
#!/usr/bin/python3
|
||||
|
||||
import threading
|
||||
from event_flood import EventFlood
|
||||
from event_breaker import EventBreaker
|
||||
import importlib
|
||||
import sys
|
||||
import os
|
||||
|
||||
sys.path.append(os.path.join(os.path.abspath(__file__ + "../../../../../")))
|
||||
|
||||
lfcli_base = importlib.import_module("py-json.LANforge.lfcli_base")
|
||||
LFCliBase = lfcli_base.LFCliBase
|
||||
|
||||
|
||||
def event_break(args):
|
||||
event_breaker = EventBreaker(host=args.mgr,
|
||||
port=args.mgr_port,
|
||||
duration='30s')
|
||||
event_breaker.create()
|
||||
event_breaker.run()
|
||||
event_breaker.cleanup()
|
||||
|
||||
|
||||
def event_flooding(args):
|
||||
event_flood = EventFlood(host=args.mgr,
|
||||
port=args.mgr_port,
|
||||
duration='30s',
|
||||
pause_ms=30)
|
||||
event_flood.create()
|
||||
event_flood.run()
|
||||
event_flood.cleanup()
|
||||
|
||||
|
||||
def main():
|
||||
parser = LFCliBase.create_bare_argparse()
|
||||
args = parser.parse_args()
|
||||
flood = threading.Thread(target=event_flooding(args))
|
||||
breaker = threading.Thread(target=event_break(args))
|
||||
|
||||
flood.start()
|
||||
breaker.start()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
129
py-scripts/scripts_deprecated/event_breaker.py
Executable file
129
py-scripts/scripts_deprecated/event_breaker.py
Executable file
@@ -0,0 +1,129 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
This file is intended to expose concurrency
|
||||
problems in the /events/ URL handler by querying events rapidly.
|
||||
Please use concurrently with event_flood.py.
|
||||
"""
|
||||
import sys
|
||||
import os
|
||||
import importlib
|
||||
from datetime import datetime
|
||||
import pprint
|
||||
|
||||
if sys.version_info[0] != 3:
|
||||
print("This script requires Python 3")
|
||||
exit(1)
|
||||
|
||||
sys.path.append(os.path.join(os.path.abspath(__file__ + "../../../../")))
|
||||
|
||||
lfcli_base = importlib.import_module("py-json.LANforge.lfcli_base")
|
||||
LFCliBase = lfcli_base.LFCliBase
|
||||
realm = importlib.import_module("py-json.realm")
|
||||
Realm = realm.Realm
|
||||
|
||||
|
||||
class EventBreaker(Realm):
|
||||
def __init__(self, host, port,
|
||||
duration=None,
|
||||
_debug_on=False,
|
||||
_exit_on_error=False,
|
||||
_exit_on_fail=False):
|
||||
super().__init__(host, port)
|
||||
self.counter = 0
|
||||
self.test_duration = duration
|
||||
if self.test_duration is None:
|
||||
raise ValueError("run wants numeric run_duration_sec")
|
||||
|
||||
def create(self):
|
||||
pass
|
||||
|
||||
def run(self):
|
||||
|
||||
now = datetime.now()
|
||||
end_time = self.parse_time(self.test_duration) + now
|
||||
client_time_ms = 0
|
||||
loop_time_ms = 0
|
||||
num_events = 0
|
||||
while datetime.now() < end_time:
|
||||
bad_events = []
|
||||
start_loop_time_ms = int(self.get_milliseconds(datetime.now()))
|
||||
print('\r♦ ', end='')
|
||||
prev_client_time_ms = client_time_ms
|
||||
response = self.json_get("/events/all")
|
||||
if self.debug:
|
||||
pprint.pprint(response)
|
||||
|
||||
if "events" not in response:
|
||||
pprint.pprint(response)
|
||||
raise AssertionError("no events in response")
|
||||
events = response["events"]
|
||||
prev_num_events = num_events
|
||||
num_events = len(events)
|
||||
if num_events != prev_num_events:
|
||||
print("%s events Δ%s" % (num_events, (num_events - prev_num_events)))
|
||||
if "candela.lanforge.HttpEvents" in response:
|
||||
client_time_ms = float(response["candela.lanforge.HttpEvents"]["duration"])
|
||||
if self.debug:
|
||||
print(" client_time %d" % client_time_ms)
|
||||
|
||||
if abs(prev_client_time_ms - client_time_ms) > 30:
|
||||
print(" client time %d ms Δ%d" % (client_time_ms, (prev_client_time_ms - client_time_ms)),
|
||||
end='')
|
||||
event_index = 0
|
||||
for record in events:
|
||||
|
||||
for k in record.keys():
|
||||
if record[k] is None:
|
||||
print(' ☠no %s☠' % k, end='')
|
||||
continue
|
||||
if self.debug:
|
||||
pprint.pprint(record[k])
|
||||
if "NA" == record[k]["event"] \
|
||||
or "NA" == record[k]["name"] \
|
||||
or "NA" == record[k]["type"] \
|
||||
or "NA" == record[k]["priority"]:
|
||||
bad_events.append(int(k))
|
||||
pprint.pprint(record[k])
|
||||
if self.debug:
|
||||
print(" ☠id[%s]☠" % k, end='')
|
||||
if len(bad_events) > 0:
|
||||
pprint.pprint(events[event_index])
|
||||
print(" ☠id[%s]☠" % bad_events, end='')
|
||||
exit(1)
|
||||
event_index += 1
|
||||
prev_loop_time_ms = loop_time_ms
|
||||
now_ms = int(self.get_milliseconds(datetime.now()))
|
||||
loop_time_ms = now_ms - start_loop_time_ms
|
||||
if (prev_loop_time_ms - loop_time_ms) > 15:
|
||||
print(" loop time %d ms Δ%d "
|
||||
% (loop_time_ms, (prev_loop_time_ms - loop_time_ms)),
|
||||
end='')
|
||||
if (prev_loop_time_ms - loop_time_ms) > 30:
|
||||
print("")
|
||||
|
||||
def cleanup(self):
|
||||
pass
|
||||
|
||||
|
||||
def main():
|
||||
parser = LFCliBase.create_bare_argparse(
|
||||
prog='event_breaker.py',
|
||||
description="""event breaker is meant to be used in conjunction with event flood
|
||||
Please use the event_break_flood.py script""")
|
||||
|
||||
parser.add_argument("--test_duration", help='test duration', default="30s")
|
||||
args = parser.parse_args()
|
||||
|
||||
event_breaker = EventBreaker(host=args.mgr,
|
||||
port=args.mgr_port,
|
||||
duration=args.test_duration,
|
||||
_debug_on=True,
|
||||
_exit_on_error=True,
|
||||
_exit_on_fail=True)
|
||||
event_breaker.create()
|
||||
event_breaker.run()
|
||||
event_breaker.cleanup()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
110
py-scripts/scripts_deprecated/event_flood.py
Executable file
110
py-scripts/scripts_deprecated/event_flood.py
Executable file
@@ -0,0 +1,110 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
This file is intended to expose concurrency
|
||||
problems in the /events/ URL handler by inserting events rapidly.
|
||||
Please concurrently use with event_breaker.py.
|
||||
"""
|
||||
import sys
|
||||
import os
|
||||
import importlib
|
||||
from datetime import datetime
|
||||
from time import sleep
|
||||
import pprint
|
||||
|
||||
if sys.version_info[0] != 3:
|
||||
print("This script requires Python 3")
|
||||
exit(1)
|
||||
|
||||
sys.path.append(os.path.join(os.path.abspath(__file__ + "../../../../")))
|
||||
|
||||
lfcli_base = importlib.import_module("py-json.LANforge.lfcli_base")
|
||||
LFCliBase = lfcli_base.LFCliBase
|
||||
realm = importlib.import_module("py-json.realm")
|
||||
Realm = realm.Realm
|
||||
|
||||
|
||||
class EventFlood(Realm):
|
||||
def __init__(self, host, port,
|
||||
duration=None,
|
||||
pause_ms=None,
|
||||
_debug_on=False,
|
||||
_exit_on_error=False,
|
||||
_exit_on_fail=False):
|
||||
super().__init__(host, port)
|
||||
self.counter = 0
|
||||
self.test_duration = duration
|
||||
self.pause_ms = pause_ms
|
||||
if self.test_duration is None:
|
||||
raise ValueError("run wants numeric run_duration_sec")
|
||||
|
||||
def create(self):
|
||||
pass
|
||||
|
||||
def run(self):
|
||||
last_second_ms = 0
|
||||
start_time = datetime.now()
|
||||
end_time = self.parse_time(self.test_duration) + start_time
|
||||
client_time_ms = 0
|
||||
loop_time_ms = 0
|
||||
|
||||
while datetime.now() < end_time:
|
||||
sleep(self.pause_ms / 1000)
|
||||
start_loop_time_ms = int(self.get_milliseconds(datetime.now()))
|
||||
print('\r♦ ', end='')
|
||||
response_list = []
|
||||
self.json_post("/cli-json/add_event",
|
||||
{
|
||||
"event_id": "new",
|
||||
"details": "event_flood %d" % start_loop_time_ms,
|
||||
"priority": "INFO",
|
||||
"name": "custom"
|
||||
},
|
||||
response_json_list_=response_list)
|
||||
if self.debug:
|
||||
pprint.pprint(response_list)
|
||||
prev_client_time_ms = client_time_ms
|
||||
prev_loop_time_ms = loop_time_ms
|
||||
now = int(self.get_milliseconds(datetime.now()))
|
||||
loop_time_ms = now - start_loop_time_ms
|
||||
|
||||
client_time_ms = response_list[0]["LAST"]["duration"]
|
||||
if client_time_ms != prev_client_time_ms:
|
||||
print(" client %d ms %d" % (client_time_ms,
|
||||
(prev_client_time_ms - client_time_ms)),
|
||||
end='')
|
||||
if loop_time_ms != prev_loop_time_ms:
|
||||
print(" loop %d ms %d " % (loop_time_ms,
|
||||
(prev_loop_time_ms - loop_time_ms)),
|
||||
end='')
|
||||
if (last_second_ms + 1000) < now:
|
||||
last_second_ms = now
|
||||
print("")
|
||||
|
||||
def cleanup(self):
|
||||
pass
|
||||
|
||||
|
||||
def main():
|
||||
parser = LFCliBase.create_bare_argparse(
|
||||
prog='event_breaker.py',
|
||||
description="""event flood is meant to be used in conjunction with event breaker
|
||||
Please use the event_break_flood.py script""")
|
||||
|
||||
parser.add_argument("--test_duration", help='test duration', default="30s")
|
||||
parser.add_argument("--pause_ms", help='interval between submitting events', default="30")
|
||||
args = parser.parse_args()
|
||||
|
||||
event_breaker = EventFlood(host=args.mgr,
|
||||
port=args.mgr_port,
|
||||
duration=args.test_duration,
|
||||
pause_ms=int(args.pause_ms),
|
||||
_debug_on=True,
|
||||
_exit_on_error=True,
|
||||
_exit_on_fail=True)
|
||||
event_breaker.create()
|
||||
event_breaker.run()
|
||||
event_breaker.cleanup()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
220
py-scripts/scripts_deprecated/ghost_profile.py
Executable file
220
py-scripts/scripts_deprecated/ghost_profile.py
Executable file
@@ -0,0 +1,220 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
NAME: ghost_profile.py
|
||||
PURPOSE: modify ghost database from the command line.
|
||||
SETUP: A Ghost installation which the user has admin access to.
|
||||
EXAMPLE: ./ghost_profile.py --article_text_file text.txt --title Test --authors Matthew --ghost_token SECRET_KEY --host 192.168.1.1
|
||||
|
||||
There is a specific class for uploading kpi graphs called kpi_to_ghost.
|
||||
|
||||
EXAMPLE: ./ghost_profile.py --ghost_token TOKEN --ghost_host 192.168.100.147
|
||||
--folders /home/lanforge/html-reports/wifi-capacity-2021-06-04-02-51-07
|
||||
--kpi_to_ghost appl --authors Matthew --title 'wifi capacity 2021 06 04 02 51 07' --server 192.168.93.51
|
||||
--user_pull lanforge --password_pull lanforge --customer candela --testbed heather --test_run test-run-6
|
||||
--user_push matt --password_push PASSWORD
|
||||
|
||||
EXAMPLE 2: ./ghost_profile.py --ghost_token TOKEN
|
||||
--ghost_host 192.168.100.147 --server 192.168.93.51 --customer candela
|
||||
--testbed heather --user_push matt --password_push "amount%coverage;Online" --kpi_to_ghost app
|
||||
--folders /home/lanforge/html-reports/wifi-capacity-2021-06-14-10-42-29 --grafana_token TOKEN
|
||||
--grafana_host 192.168.100.201
|
||||
|
||||
this script uses pyjwt. If you get the issue module 'jwt' has no attribute 'encode', run this: pip3 uninstall jwt pyjwt && pip install pyjwt
|
||||
Matthew Stidham
|
||||
Copyright 2021 Candela Technologies Inc
|
||||
License: Free to distribute and modify. LANforge systems must be licensed.
|
||||
"""
|
||||
import sys
|
||||
import os
|
||||
import importlib
|
||||
import argparse
|
||||
|
||||
if sys.version_info[0] != 3:
|
||||
print("This script requires Python 3")
|
||||
exit(1)
|
||||
|
||||
|
||||
sys.path.append(os.path.join(os.path.abspath(__file__ + "../../../")))
|
||||
|
||||
# from GhostRequest import GhostRequest
|
||||
GhostRequest = importlib.import_module("py-dashboard.GhostRequest")
|
||||
|
||||
|
||||
class UseGhost(GhostRequest):
|
||||
def __init__(self,
|
||||
_ghost_token=None,
|
||||
host="localhost",
|
||||
port=8080,
|
||||
_debug_on=False,
|
||||
_exit_on_fail=False,
|
||||
_ghost_host="localhost",
|
||||
_ghost_port=2368,
|
||||
influx_host=None,
|
||||
influx_port=None,
|
||||
influx_org=None,
|
||||
influx_token=None,
|
||||
influx_bucket=None):
|
||||
super().__init__(_ghost_host,
|
||||
str(_ghost_port),
|
||||
_api_token=_ghost_token,
|
||||
influx_host=influx_host,
|
||||
influx_port=influx_port,
|
||||
influx_org=influx_org,
|
||||
influx_token=influx_token,
|
||||
influx_bucket=influx_bucket,
|
||||
debug_=_debug_on)
|
||||
self.ghost_host = _ghost_host
|
||||
self.ghost_port = _ghost_port
|
||||
self.ghost_token = _ghost_token
|
||||
self.influx_host = influx_host
|
||||
self.influx_port = influx_port
|
||||
self.influx_org = influx_org
|
||||
self.influx_token = influx_token
|
||||
self.influx_bucket = influx_bucket
|
||||
|
||||
def create_post_from_file(self, title, file, tags, authors):
|
||||
text = open(file).read()
|
||||
return self.create_post(title=title, text=text, tags=tags, authors=authors)
|
||||
|
||||
def kpi(self,
|
||||
authors,
|
||||
folders,
|
||||
parent_folder,
|
||||
title,
|
||||
server_pull,
|
||||
ghost_host,
|
||||
port,
|
||||
user_push,
|
||||
password_push,
|
||||
customer,
|
||||
testbed,
|
||||
test_run,
|
||||
grafana_token,
|
||||
grafana_host,
|
||||
grafana_port,
|
||||
datasource,
|
||||
grafana_bucket):
|
||||
target_folders = list()
|
||||
return self.kpi_to_ghost(authors,
|
||||
folders,
|
||||
parent_folder,
|
||||
title,
|
||||
server_pull,
|
||||
ghost_host,
|
||||
port,
|
||||
user_push,
|
||||
password_push,
|
||||
customer,
|
||||
testbed,
|
||||
test_run,
|
||||
target_folders,
|
||||
grafana_token,
|
||||
grafana_host,
|
||||
grafana_port,
|
||||
datasource,
|
||||
grafana_bucket)
|
||||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(
|
||||
prog='ghost_profile.py',
|
||||
formatter_class=argparse.RawTextHelpFormatter,
|
||||
epilog='''Manage Ghost Website''',
|
||||
description='''
|
||||
ghost_profile.py
|
||||
----------------
|
||||
Command example:
|
||||
./ghost_profile.py
|
||||
--ghost_token'''
|
||||
)
|
||||
optional = parser.add_argument_group('optional arguments')
|
||||
optional.add_argument('--ghost_token', default=None)
|
||||
optional.add_argument('--create_post', default=None)
|
||||
optional.add_argument('--article_text_file', default=None)
|
||||
|
||||
optional.add_argument('--ghost_port', help='Ghost port if different from 2368', default=2368)
|
||||
optional.add_argument('--ghost_host', help='Ghost host if different from localhost', default='localhost')
|
||||
optional.add_argument('--article_text')
|
||||
optional.add_argument('--article_tags', action='append')
|
||||
optional.add_argument('--authors', action='append')
|
||||
optional.add_argument('--title', default=None)
|
||||
optional.add_argument('--image', default=None)
|
||||
optional.add_argument('--folder', default=None)
|
||||
optional.add_argument('--custom_post', default=None)
|
||||
optional.add_argument('--kpi_to_ghost', help='Generate a Ghost report from KPI spreadsheets', action="store_true")
|
||||
optional.add_argument('--folders', action='append', default=None)
|
||||
optional.add_argument('--server_pull')
|
||||
optional.add_argument('--port', default=22)
|
||||
optional.add_argument('--user_push')
|
||||
optional.add_argument('--password_push')
|
||||
optional.add_argument('--customer')
|
||||
optional.add_argument('--testbed')
|
||||
optional.add_argument('--test_run', default=None)
|
||||
optional.add_argument('--grafana_token', default=None)
|
||||
optional.add_argument('--grafana_host', default=None)
|
||||
optional.add_argument('--grafana_port', default=3000)
|
||||
optional.add_argument('--parent_folder', default=None)
|
||||
optional.add_argument('--datasource', default='InfluxDB')
|
||||
optional.add_argument('--grafana_bucket', default=None)
|
||||
optional.add_argument('--influx_host')
|
||||
optional.add_argument('--influx_token', help='Username for your Influx database')
|
||||
optional.add_argument('--influx_bucket', help='Password for your Influx database')
|
||||
optional.add_argument('--influx_org', help='Name of your Influx database')
|
||||
optional.add_argument('--influx_port', help='Port where your influx database is located', default=8086)
|
||||
optional.add_argument('--influx_tag', action='append', nargs=2,
|
||||
help='--influx_tag <key> <val> Can add more than one of these.')
|
||||
optional.add_argument('--influx_mgr',
|
||||
help='IP address of the server your Influx database is hosted if different from your LANforge Manager',
|
||||
default=None)
|
||||
optional.add_argument('--debug', help='Enable debugging', default=False, action="store_true")
|
||||
args = parser.parse_args()
|
||||
|
||||
Ghost = UseGhost(_ghost_token=args.ghost_token,
|
||||
_ghost_port=args.ghost_port,
|
||||
_ghost_host=args.ghost_host,
|
||||
influx_host=args.influx_host,
|
||||
influx_port=args.influx_port,
|
||||
influx_org=args.influx_org,
|
||||
influx_token=args.influx_token,
|
||||
influx_bucket=args.influx_bucket,
|
||||
_debug_on=args.debug)
|
||||
|
||||
if args.create_post is not None:
|
||||
Ghost.create_post(args.title, args.article_text, args.article_tags, args.authors)
|
||||
if args.article_text_file is not None:
|
||||
Ghost.create_post_from_file(args.title, args.article_text_file, args.article_tags, args.authors)
|
||||
|
||||
if args.image is not None:
|
||||
Ghost.upload_image(args.image)
|
||||
|
||||
if args.custom_post is not None:
|
||||
if args.folders is not None:
|
||||
Ghost.custom_post(args.folders, args.authors)
|
||||
else:
|
||||
Ghost.custom_post(args.folder, args.authors)
|
||||
else:
|
||||
if args.folder is not None:
|
||||
Ghost.upload_images(args.folder)
|
||||
|
||||
if args.kpi_to_ghost:
|
||||
Ghost.kpi(args.authors,
|
||||
args.folders,
|
||||
args.parent_folder,
|
||||
args.title,
|
||||
args.server_pull,
|
||||
args.ghost_host,
|
||||
args.port,
|
||||
args.user_push,
|
||||
args.password_push,
|
||||
args.customer,
|
||||
args.testbed,
|
||||
args.test_run,
|
||||
args.grafana_token,
|
||||
args.grafana_host,
|
||||
args.grafana_port,
|
||||
args.datasource,
|
||||
args.grafana_bucket)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
12
py-scripts/scripts_deprecated/lf_influx_db.json
Normal file
12
py-scripts/scripts_deprecated/lf_influx_db.json
Normal file
@@ -0,0 +1,12 @@
|
||||
{
|
||||
"influx_host":"192.168.100.201",
|
||||
"influx_port": "8086",
|
||||
"influx_org": "Candela",
|
||||
"influx_token": "-u_Wd-L8o992701QF0c5UmqEp7w7Z7YOMaWLxOMgmHfATJGnQbbmYyNxHBR9PgD6taM_tcxqJl6U8DjU1xINFQ==",
|
||||
"influx_bucket": "ben",
|
||||
"influx_tag": "testbed Ferndale-01"
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
53
py-scripts/scripts_deprecated/vap_stations_example.py
Executable file
53
py-scripts/scripts_deprecated/vap_stations_example.py
Executable file
@@ -0,0 +1,53 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
This is an outdated example. Please see modern py-scripts/test_X example scripts.
|
||||
"""
|
||||
import sys
|
||||
import os
|
||||
import importlib
|
||||
|
||||
if sys.version_info[0] != 3:
|
||||
print("This script requires Python 3")
|
||||
exit(1)
|
||||
|
||||
|
||||
sys.path.append(os.path.join(os.path.abspath(__file__ + "../../../")))
|
||||
|
||||
|
||||
lfcli_base = importlib.import_module("py-json.LANforge.lfcli_base")
|
||||
LFCliBase = lfcli_base.LFCliBase
|
||||
LFUtils = importlib.import_module("py-json.LANforge.LFUtils")
|
||||
# from generic_cx import GenericCx
|
||||
|
||||
mgrURL = "http://localhost:8080/"
|
||||
staName = "sta0"
|
||||
staNameUri = "port/1/1/" + staName
|
||||
|
||||
|
||||
class VapStations(LFCliBase):
|
||||
def __init__(self, lfhost, lfport):
|
||||
super().__init__(lfhost, lfport, _debug=False)
|
||||
super().check_connect()
|
||||
|
||||
def run(self):
|
||||
list_resp = self.json_get("/stations/list")
|
||||
list_map = self.response_list_to_map(list_resp, 'stations')
|
||||
# pprint.pprint(list_map)
|
||||
|
||||
attribs = ["ap", "capabilities", "tx rate", "rx rate", "signal"]
|
||||
for eid,record in list_map.items():
|
||||
# print("mac: %s" % mac)
|
||||
mac = record["station bssid"]
|
||||
station_resp = self.json_get("/stations/%s?fields=capabilities,tx+rate,rx+rate,signal,ap" % mac)
|
||||
print("Station %s:" %mac)
|
||||
#pprint.pprint(station_resp)
|
||||
for attrib in attribs:
|
||||
print(" %s: %s" % (attrib, station_resp["station"][attrib]))
|
||||
|
||||
|
||||
def main():
|
||||
vapsta_test = VapStations("localhost", 8080)
|
||||
vapsta_test.run()
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
Reference in New Issue
Block a user