test_l3_scenario_throughut: Make file pip compliant

Signed-off-by: Matthew Stidham <stidmatt@gmail.com>
This commit is contained in:
Matthew Stidham
2021-11-24 13:44:54 -08:00
parent eb6d4a4e70
commit 661e427128

View File

@@ -1,5 +1,5 @@
#!/usr/bin/env python3
'''
"""
This Script Loads the Existing Scenario and Run the Simultaenous Throughput over time and Generate Report and Plot the Graph
This Script has three classes :
1. LoadScenario : It will load the existing saved scenario to the Lanforge (Here used for Loading Bridged VAP)
@@ -23,7 +23,7 @@
This Script is intended to automate the testing of DUT That has stations as well as AP.
To automate the simultaenous testing and check the DUT Temperature
'''
"""
import sys
import os
import importlib
@@ -31,7 +31,6 @@ import argparse
import time
import logging
import paramiko as pmgo
from paramiko.ssh_exception import NoValidConnectionsError as exception
import xlsxwriter
from bokeh.io import show
from bokeh.plotting import figure
@@ -43,7 +42,6 @@ if sys.version_info[0] != 3:
print("This script requires Python 3")
exit(1)
sys.path.append(os.path.join(os.path.abspath(__file__ + "../../../")))
lfcli_base = importlib.import_module("py-json.LANforge.lfcli_base")
@@ -57,104 +55,96 @@ Realm = realm.Realm
class Login_DUT:
def __init__(self, threadID, name, HOST):
self.threadID = threadID
self.name = name
self.host=HOST
self.USERNAME = "lanforge"
self.PASSWORD = "lanforge"
self.CLIENT= pmgo.SSHClient()
self.LF1= self.Connect()
self.data_core1=[]
self.data_core2=[]
if self.CLIENT == 0:
exit()
print("Connected to " +HOST+" DUT to Measure the Core Temperature")
self.threadID = threadID
self.name = name
self.host = HOST
self.USERNAME = "lanforge"
self.PASSWORD = "lanforge"
self.CLIENT = pmgo.SSHClient()
self.data_core1 = []
self.data_core2 = []
if self.CLIENT == 0:
exit()
print("Connected to " + HOST + " DUT to Measure the Core Temperature")
self.Connect()
def run(self):
stdin, stdout, stderr= self.CLIENT.exec_command("sensors")
stdin, stdout, stderr = self.CLIENT.exec_command("sensors")
out_lines = stdout.readlines()
err_lines = stderr.readlines()
print(out_lines[len(out_lines)-3], out_lines[len(out_lines)-2])
self.data_core1.append(out_lines[len(out_lines)-3])
self.data_core2.append(out_lines[len(out_lines)-2])
print(out_lines[len(out_lines) - 3], out_lines[len(out_lines) - 2])
self.data_core1.append(out_lines[len(out_lines) - 3])
self.data_core2.append(out_lines[len(out_lines) - 2])
def Connect(self):
self.CLIENT.load_system_host_keys()
self.CLIENT.set_missing_host_key_policy(pmgo.AutoAddPolicy())
try:
self.CLIENT.connect(self.host, username=self.USERNAME, password=self.PASSWORD,timeout=10)
return None
except exception as error:
self.CLIENT = 0;
return None
self.CLIENT.connect(self.host, username=self.USERNAME, password=self.PASSWORD, timeout=10)
# Class to Load a Scenario that has been Created in Chamber View saved under DB/[Database_Name]
class LoadScenario(LFCliBase):
def __init__(self, host, port, db_name, security_debug_on=False, _exit_on_error=False,_exit_on_fail=False):
def __init__(self, host, port, db_name, security_debug_on=False, _exit_on_error=False, _exit_on_fail=False):
super().__init__(host, port, _debug=security_debug_on, _exit_on_fail=_exit_on_fail)
self.host = host
self.port = port
self.json_post("/cli-json/load", { "name": db_name, "action": 'overwrite' })
print(host+ " : Scenario Loaded...")
self.json_post("/cli-json/load", {"name": db_name, "action": 'overwrite'})
print(host + " : Scenario Loaded...")
time.sleep(2)
# Generates XLSX Report
def GenerateReport(scenario, detail, throughput_sta, throughput_vap, absolute_time, relative_time, core1_temp, core2_temp, duration, name):
def GenerateReport(scenario, detail, throughput_sta, throughput_vap, absolute_time, relative_time, core1_temp,
core2_temp, duration, name):
workbook = xlsxwriter.Workbook(name)
worksheet = workbook.add_worksheet()
worksheet.write('A1',"Scenario Runned: "+scenario+"\n Scenario Details: "+detail)
worksheet.write('A1', "Scenario Runned: " + scenario + "\n Scenario Details: " + detail)
worksheet.write('A2', 'ABSOLUTE TIME')
worksheet.write('B2', 'RELATIVE TIME (ms)')
worksheet.write('C2', 'THROUGHPUT STATION SIDE (Mbps)')
worksheet.write('D2', 'THROUGHPUT VAP SIDE (Mbps)')
worksheet.write('E2', 'CORE 0 TEMP (Degree Celsius)')
worksheet.write('F2', 'CORE 1 TEMP (Degree Celsius)')
core1=[]
core2=[]
core1 = []
core2 = []
j=3
j = 3
for i in absolute_time:
worksheet.write('A'+str(j),i)
j=j+1
worksheet.write('A' + str(j), i)
j = j + 1
j=3
j = 3
for i in relative_time:
worksheet.write('B'+str(j),i)
j=j+1
worksheet.write('B' + str(j), i)
j = j + 1
sta_throu=[]
vap_throu=[]
j=3
sta_throu = []
vap_throu = []
j = 3
for i in throughput_sta:
print(i)
sta_throu.append(i/1000000)
worksheet.write('C'+str(j), str(i/1000000)+" Mbps")
j=j+1
j=3
sta_throu.append(i / 1000000)
worksheet.write('C' + str(j), str(i / 1000000) + " Mbps")
j = j + 1
j = 3
for i in throughput_vap:
print(i)
vap_throu.append(i/1000000)
worksheet.write('D'+str(j), str(i/1000000)+" Mbps")
j=j+1
j=3
vap_throu.append(i / 1000000)
worksheet.write('D' + str(j), str(i / 1000000) + " Mbps")
j = j + 1
j = 3
for i in core1_temp:
core1.append(int(str(i).split(':')[1].split('(')[0].split('.')[0].split('+')[1]))
worksheet.write('E'+str(j),str(i).split(':')[1].split('(')[0] )
j=j+1
j=3
worksheet.write('E' + str(j), str(i).split(':')[1].split('(')[0])
j = j + 1
j = 3
for i in core2_temp:
core2.append(int(str(i).split(':')[1].split('(')[0].split('.')[0].split('+')[1]))
worksheet.write('F'+str(j), str(i).split(':')[1].split('(')[0])
j=j+1
worksheet.write('F' + str(j), str(i).split(':')[1].split('(')[0])
j = j + 1
Time =[]
for i in range(0,int(duration)*5):
Time = []
for i in range(0, int(duration) * 5):
Time.append(i)
plot(sta_throu, vap_throu, core1, core2, Time)
workbook.close()
@@ -162,27 +152,26 @@ def GenerateReport(scenario, detail, throughput_sta, throughput_vap, absolute_ti
# Plotting Function for Parameters
def plot(throughput_sta, throughput_vap, core1_temp, core2_temp, Time):
print(throughput_vap)
s1 = figure(plot_width=1000, plot_height=600)
s1.title.text = "WIFI Throughput vs Temperature Plot"
s1.xaxis.axis_label = "Time "
s1.yaxis.axis_label = "Throughput in Mbps"
s1.line( Time, throughput_sta, color='black', legend_label ="Throughput Over Station Connections ")
#s1.circle(Time, throughput_sta, color='red')
s1.line(Time, throughput_sta, color='black', legend_label="Throughput Over Station Connections ")
# s1.circle(Time, throughput_sta, color='red')
s1.line( Time, throughput_vap, color='blue', legend_label ="Throughput Over VAP ")
#s1.circle(Time, throughput_vap, color='blue')
s1.line(Time, throughput_vap, color='blue', legend_label="Throughput Over VAP ")
# s1.circle(Time, throughput_vap, color='blue')
s1.extra_y_ranges = {"Temperature": Range1d(start=0, end=150)}
s1.add_layout(LinearAxis(y_range_name="Temperature", axis_label="Temperature in Degree Celsius"), 'right')
s1.line(Time, core1_temp, y_range_name='Temperature', color='red', legend_label ="CPU CORE 0 TEMPERATURE ")
#s1.circle(Time, core1_temp, y_range_name='Temperature', color='red')
s1.line(Time, core1_temp, y_range_name='Temperature', color='red', legend_label="CPU CORE 0 TEMPERATURE ")
# s1.circle(Time, core1_temp, y_range_name='Temperature', color='red')
s1.line(Time, core2_temp, y_range_name='Temperature', color='green', legend_label ="CPU CORE 1 TEMPERATURE ")
#s1.circle(Time, core2_temp, y_range_name='Temperature', color='blue')
s1.line(Time, core2_temp, y_range_name='Temperature', color='green', legend_label="CPU CORE 1 TEMPERATURE ")
# s1.circle(Time, core2_temp, y_range_name='Temperature', color='blue')
show(s1)
@@ -193,46 +182,40 @@ class VAP_Measure(LFCliBase):
super().__init__(lfclient_host, lfclient_port)
# Added Standard Function to Fetch L3 CX and VAP Directly
class FindPorts(LFCliBase):
def __init__(self, host, port, security_debug_on=False, _exit_on_error=False,_exit_on_fail=False):
def __init__(self, host, port, security_debug_on=False, _exit_on_error=False, _exit_on_fail=False):
super().__init__(host, port, _debug=security_debug_on, _exit_on_fail=_exit_on_fail)
self.host = host
self.port = port
#Creating a Realm Object
# Creating a Realm Object
self.local_realm = Realm(lfclient_host=host, lfclient_port=port)
def FindExistingCX(self):
return self.local_realm.cx_list()
def FindExistingCX(self):
def FindVAP(self):
return self.local_realm.vap_list()
return self.local_realm.cx_list()
def FindVAP(self):
return self.local_realm.vap_list()
# Utility to Find the Traffic Running on Existing CX and VAP
def PortUtility(host, port, duration, report_name, scenario, detail):
lf_utils = FindPorts(host, port)
# cx data will be having all parameters of L3 Connections available in the Realm. It is needed to get the names of all L3 CX, which is stored in cx_names. It is required so as we can extract the real time data running on that CX
cx_data = lf_utils.FindExistingCX()
#print(cx_data)
# print(cx_data)
# vap_list will have the List of all the vap ports available, This is required to get the VAP names in order to fetch the throughput over that vap
vap_list =lf_utils.FindVAP()
vap_measure_obj=VAP_Measure(host,port)
hostname=socket.gethostbyname(socket.gethostname())
vap_list = lf_utils.FindVAP()
vap_measure_obj = VAP_Measure(host, port)
hostname = socket.gethostbyname(socket.gethostname())
dut_temp_obj = Login_DUT(1, "Thread-1", hostname)
#print(vap_list)
vap_names=[]
# print(vap_list)
vap_names = []
for i in vap_list:
vap_names.append(str(i.keys()).split('.')[2].split('\'')[0])
print(vap_names[0])
@@ -240,95 +223,69 @@ def PortUtility(host, port, duration, report_name, scenario, detail):
cx_names = list(cx_data.keys())
cx_names.remove('handler')
cx_names.remove('uri')
absolute_time=[]
temp_time =[]
absolute_time = []
temp_time = []
Total_Throughput_CX_Side =[]
Total_Throughput_VAP_Side =[]
print(lf_utils.local_realm.json_get("/cx/"+cx_names[0]).get(cx_names[0]).get('state'))
for i in cx_names:
while(lf_utils.local_realm.json_get("/cx/"+cx_names[0]).get(cx_names[0]).get('state') != 'Run'):
Total_Throughput_CX_Side = []
Total_Throughput_VAP_Side = []
print(lf_utils.local_realm.json_get("/cx/" + cx_names[0]).get(cx_names[0]).get('state'))
for _ in cx_names:
while lf_utils.local_realm.json_get("/cx/" + cx_names[0]).get(cx_names[0]).get('state') != 'Run':
continue
offset=int(round(time.time() * 1000))
for i in range(0,int(duration)):
temp=0
for i in cx_names:
temp=temp+int(lf_utils.local_realm.json_get("/cx/"+i).get(i).get('bps rx a'))
#temp=temp+lf_utils.local_realm.json_get("/cx/"+i).get(i).get('bps rx b')
for i in vap_names:
Total_Throughput_VAP_Side.append(int(vap_measure_obj.json_get("/port/1/1/"+str(i)).get('interface').get('bps rx')))
offset = int(round(time.time() * 1000))
for _ in range(0, int(duration)):
temp = 0
for _ in cx_names:
temp = temp + int(lf_utils.local_realm.json_get("/cx/" + i).get(i).get('bps rx a'))
# temp=temp+lf_utils.local_realm.json_get("/cx/"+i).get(i).get('bps rx b')
for _ in vap_names:
Total_Throughput_VAP_Side.append(
int(vap_measure_obj.json_get("/port/1/1/" + str(i)).get('interface').get('bps rx')))
absolute_time.append(datetime.now().strftime("%H:%M:%S"))
temp_time.append(int(round(time.time() * 1000)-offset))
temp_time.append(int(round(time.time() * 1000) - offset))
Total_Throughput_CX_Side.append(temp)
dut_temp_obj.run()
time.sleep(5)
relative_time=[]
relative_time.append(0)
for i in range (0,len(temp_time)-1):
relative_time.append(temp_time[i+1]-temp_time[i])
relative_time = [0]
for i in range(0, len(temp_time) - 1):
relative_time.append(temp_time[i + 1] - temp_time[i])
print(Total_Throughput_CX_Side)
print(Total_Throughput_VAP_Side)
GenerateReport(scenario, detail, Total_Throughput_CX_Side, Total_Throughput_VAP_Side, absolute_time, relative_time, dut_temp_obj.data_core1, dut_temp_obj.data_core2, duration, report_name)
GenerateReport(scenario, detail, Total_Throughput_CX_Side, Total_Throughput_VAP_Side, absolute_time, relative_time,
dut_temp_obj.data_core1, dut_temp_obj.data_core2, duration, report_name)
# main method
def main():
parser = argparse.ArgumentParser(
prog="test_l3_scenario_throughput.py",
formatter_class=argparse.RawTextHelpFormatter,
description="Test Scenario of DUT Temperature measurement along with simultaneous throughput on VAP as well as stations")
parser.add_argument("-m", "--manager", type=str, help="Enter the address of Lanforge Manager (By default localhost)")
parser.add_argument("-sc", "--scenario", type=str, help="Enter the Name of the Scenario you want to load (by Default DFLT)")
parser.add_argument("-m", "--manager", type=str,
help="Enter the address of Lanforge Manager (By default localhost)")
parser.add_argument("-sc", "--scenario", type=str,
help="Enter the Name of the Scenario you want to load (by Default DFLT)")
parser.add_argument("-t", "--duration", type=str, help="Enter the Time for which you want to run test")
parser.add_argument("-o", "--report_name", type=str, help="Enter the Name of the Output file ('Report.xlsx')")
parser.add_argument("-td", "--test_detail", type=str, help="Enter the Test Detail in Quotes ")
parser.add_argument("-o", "--report_name", type=str, help="Enter the Name of the Output file ('Report.xlsx')", default='report.xlsx')
parser.add_argument("-td", "--test_detail", type=str, help="Enter the Test Detail in Quotes ", default='Blank test')
args = None
args = parser.parse_args()
try:
args = parser.parse_args()
# Lanforge Manager IP Address
if (args.manager is None):
manager = "localhost"
if (args.manager is not None):
manager = args.manager
if (args.scenario is not None):
scenario = args.scenario
if (args.report_name is not None):
report_name = args.report_name
if (args.duration is None):
duration = (1 * 60)/5
if (args.report_name is None):
report_name = "report.xlsx"
if (args.test_detail is not None):
test_detail = args.test_detail
if (args.test_detail is None):
test_detail = "Blank test"
except Exception as e:
logging.exception(e)
exit(2)
hostname=socket.gethostbyname(socket.gethostname())
hostname = socket.gethostbyname(socket.gethostname())
# Loading DUT Scenario
Scenario_1 = LoadScenario("192.168.200.18", 8080, "Lexus_Dut")
# Loading LF Scenario
DB_Lanforge_2 = "LF_Device"
Scenario_2 = LoadScenario(manager, 8080, scenario)
#Wait for Sometime
Scenario_2 = LoadScenario(args.manager, 8080, args.scenario)
# Wait for Sometime
time.sleep(10)
duration_sec=Realm.parse_time(args.duration).total_seconds() * 60
duration_sec = Realm.parse_time(args.duration).total_seconds() * 60
# Port Utility function for reading CX and VAP
PortUtility(manager,8080, duration_sec, report_name, scenario, test_detail)
PortUtility(args.manager, 8080, duration_sec, args.report_name, args.scenario, args.test_detail)
if __name__ == '__main__':