Revert "Update py_scripts to be in line with pip standards"

This reverts commit 37df48594e.
This commit is contained in:
Chuck SmileyRekiere
2021-09-09 05:32:05 -06:00
parent 5328154e21
commit 743b5f56c9
31 changed files with 235 additions and 260 deletions

View File

@@ -27,42 +27,47 @@ import sys
if sys.version_info[0] != 3:
print("This script requires Python 3")
exit(1)
if 'py_json' not in sys.path:
sys.path.append('../py_json')
if 'py-json' not in sys.path:
sys.path.append('../py-json')
import argparse
import time
from LANforge import LFUtils
from LANforge import lfcli_base
from LANforge.lfcli_base import LFCliBase
from LANforge.LFUtils import *
import realm
from realm import Realm
import logging
import paramiko as pm
from paramiko.ssh_exception import NoValidConnectionsError as exception
import xlsxwriter
from bokeh.io import show
from bokeh.io import output_file, show
from bokeh.plotting import figure
from bokeh.models import LinearAxis, Range1d
from bokeh.models import HoverTool
from bokeh.layouts import row
# Specifically for Measuring CPU Core Temperatures
class Login_DUT:
def __init__(self, threadID, name, HOST):
self.threadID = threadID
self.name = name
self.host=HOST
self.USERNAME = "lanforge"
self.PASSWORD = "lanforge"
self.CLIENT= pm.SSHClient()
self.LF1= self.Connect()
self.data_core1=[]
self.data_core2=[]
if self.CLIENT == 0:
exit()
print("Connected to " +HOST+" DUT to Measure the Core Temperature")
self.threadID = threadID
self.name = name
self.host=HOST
self.USERNAME = "lanforge"
self.PASSWORD = "lanforge"
self.CLIENT= pm.SSHClient()
self.LF1= self.Connect()
self.data_core1=[]
self.data_core2=[]
if self.CLIENT == 0:
exit()
print("Connected to " +HOST+" DUT to Measure the Core Temperature")
def run(self):
stdin, stdout, stderr= self.CLIENT.exec_command("sensors")
out_lines = stdout.readlines()
@@ -70,14 +75,14 @@ class Login_DUT:
print(out_lines[len(out_lines)-3], out_lines[len(out_lines)-2])
self.data_core1.append(out_lines[len(out_lines)-3])
self.data_core2.append(out_lines[len(out_lines)-2])
def Connect(self):
self.CLIENT.load_system_host_keys()
self.CLIENT.set_missing_host_key_policy(pm.AutoAddPolicy())
try:
self.CLIENT.connect(self.host, username=self.USERNAME, password=self.PASSWORD,timeout=10)
return None
return None
except exception as error:
self.CLIENT = 0;
return None
@@ -95,14 +100,14 @@ class LoadScenario(LFCliBase):
# Class to create stations and run L3 Cross connects and run them for given time. It also stores the endpoint names for measuring throughput
class CreateSTA_CX(LFCliBase):
def __init__(self, host, port, radio, num_sta, sta_id, ssid, security, password, upstream, protocol, min_bps, max_bps, security_debug_on=True, _exit_on_error=True, _exit_on_fail=True):
super().__init__(host, port, _debug=security_debug_on, _exit_on_fail=_exit_on_fail)
self.host = host
self.port = port
self.radio = radio
self.num_sta = num_sta
self.sta_id = sta_id
@@ -131,7 +136,7 @@ class CreateSTA_CX(LFCliBase):
self.endp=[]
for i in range(sta_id,sta_id+num_sta):
self.sta_list.append("sta00")
#portDhcpUpRequest
'''
upstream_dhcp = LFRequest.LFRequest("http://"+str(host)+":"+str(port)+"/"+"/cli-form/set_port")
@@ -151,36 +156,36 @@ class CreateSTA_CX(LFCliBase):
self.station_profile.admin_up()
#Wait for a while
time.sleep(15)
#Setting up the Parameters for CX
self.cx_profile.side_a_min_bps = self.min_bps
self.cx_profile.side_b_min_bps = self.min_bps
self.cx_profile.side_a_max_bps = self.max_bps
self.cx_profile.side_b_max_bps = self.max_bps
self.cx_profile.side_a_min_pdu = 'Auto'
self.cx_profile.side_b_min_pdu = 'Auto'
self.cx_profile.report_timer = 1000
self.cx_profile.side_a_min_pkt='Same'
self.cx_profile.side_a_max_pkt='Same'
#Create Connections of Given Parameters
self.cx_profile.create(self.protocol, side_a="1.1."+self.upstream, side_b=list(self.local_realm.find_ports_like("sta0+")))
time.sleep(15)
# Getting all the Endpoint Names for measuring Throughput Later
for i in self.cx_profile.get_cx_names():
self.cx_names.append(i)
self.cx_names.append(i)
for j in self.cx_names:
x=self.local_realm.json_get("/cx/"+j)
self.endp.append(x.get(j).get('endpoints')[1])
#print(self.endp)
return 0
def start(self):
#self.station_profile.admin_up()
self.cx_profile.start_cx()
time.sleep(5)
return 0
@@ -195,7 +200,7 @@ class CreateSTA_CX(LFCliBase):
def cleanup(self):
# Removing Connections
self.local_realm.cleanup_cxe_prefix(self.cx_profile.name_prefix)
vap = self.local_realm.find_ports_like("vap+")
bridges = self.local_realm.find_ports_like("br+")
station_map = self.local_realm.find_ports_like("sta+")
@@ -207,7 +212,7 @@ class CreateSTA_CX(LFCliBase):
for eid,record in vap.items():
self.local_realm.remove_vlan_by_eid(eid)
time.sleep(0.03)
#Removing stations
for eid,record in station_map.items():
self.local_realm.remove_vlan_by_eid(eid)
@@ -223,13 +228,13 @@ class CreateSTA_CX(LFCliBase):
LFUtils.waitUntilPortsDisappear(base_url=self.local_realm.lfclient_url, port_list=del_sta_names, debug=True)
print("Ports Successfully Cleaned up")
return 0
except:
except:
print("Ports Successfully Cleaned up")
time.sleep(5)
return 0
# Generates XLSX Report
# Generates XLSX Report
def GenerateReport(throughput_sta, throughput_vap, core1_temp, core2_temp, duration,name):
workbook = xlsxwriter.Workbook(name)
worksheet = workbook.add_worksheet()
@@ -272,7 +277,7 @@ def GenerateReport(throughput_sta, throughput_vap, core1_temp, core2_temp, durat
# Plotting Function for Parameters
def plot(throughput_sta, throughput_vap, core1_temp, core2_temp, Time):
s1 = figure()
s1.title.text = "WIFI Throughput vs Temperature Plot"
s1.xaxis.axis_label = "Time in Seconds"
@@ -283,10 +288,10 @@ def plot(throughput_sta, throughput_vap, core1_temp, core2_temp, Time):
s1.line( Time, throughput_vap, color='blue')
#s1.circle(Time, throughput_vap, color='blue')
s1.extra_y_ranges = {"Temperature": Range1d(start=0, end=150)}
s1.add_layout(LinearAxis(y_range_name="Temperature", axis_label="Temperature in Degree Celsius"), 'right')
s1.line(Time, core1_temp, y_range_name='Temperature', color='red')
#s1.circle(Time, core1_temp, y_range_name='Temperature', color='red')
@@ -295,7 +300,7 @@ def plot(throughput_sta, throughput_vap, core1_temp, core2_temp, Time):
show(s1)
# Creates the Instance for LFCliBase
class VAP_Measure(LFCliBase):
def __init__(self, lfclient_host, lfclient_port):
@@ -307,7 +312,7 @@ class VAP_Measure(LFCliBase):
def main():
parser = argparse.ArgumentParser(description="Test Scenario of DUT Temperature measurement along with simultaneous throughput on VAP as well as stations")
parser.add_argument("-m", "--manager", type=str, help="Enter the address of Lanforge Manager (By default localhost)")
parser.add_argument("-sc", "--scenario", type=str, help="Enter the Name of the Scenario you want to load (by Default DFLT)")
parser.add_argument("-r", "--radio", type=str, help="Enter the radio on which you want to create a station/s on ")
@@ -323,112 +328,112 @@ def main():
parser.add_argument("-t", "--duration", type=int, help="Enter the Time for which you want to run test (In Minutes)")
parser.add_argument("-o", "--report_name", type=str, help="Enter the Name of the Output file ('Report.xlsx')")
args = None
try:
args = parser.parse_args()
# Lanforge Manager IP Address
if (args.manager is None):
manager = "localhost"
if (args.manager != None):
manager = args.manager
args = parser.parse_args()
# Lanforge Manager IP Address
if (args.manager is None):
manager = "localhost"
if (args.manager is not None):
manager = args.manager
# Scenario Name
if (args.scenario is not None):
scenario = args.scenario
# Radio Name
if (args.radio is not None):
radio = args.radio
# Number of Stations
if (args.num_sta is None):
num_sta = 0
if (args.num_sta is not None):
num_sta = args.num_sta
# Station ID
if (args.sta_id is None):
sta_id = '0'
if (args.sta_id is not None):
sta_id = args.sta_id
# SSID
if (args.ssid is not None):
ssid = args.ssid
if (args.ssid is not None):
ssid = args.ssid
# Scenario Name
if (args.scenario != None):
scenario = args.scenario
# Radio Name
if (args.radio != None):
radio = args.radio
# Security (Open by Default)
if (args.security is None):
security = 'open'
if (args.security is not None):
security = args.security
# Number of Stations
if (args.num_sta is None):
num_sta = 0
if (args.num_sta != None):
num_sta = args.num_sta
# Password (if Security is not Open)
if (args.password is not None):
password = args.password
if (args.password is 'open'):
password = "[Blank]"
if (args.password is None):
password = "[Blank]"
# Upstream Port (By default br0000)
if (args.upstream is None):
upstream = 'br0000'
if (args.upstream is not None):
upstream = args.upstream
# Protocol (By Default lf_udp)
if (args.protocol is not None):
protocol = args.protocol
if (args.protocol is None):
protocol = 'lf_udp'
#Min BPS
if (args.min_mbps is not None):
min_bps = int(args.min_mbps)*1000000
if (args.min_mbps is None):
min_bps = int(1000)*1000000
if (args.max_mbps is None ):
max_bps = int(1000)*1000000
# Station ID
if (args.sta_id is None):
sta_id = '0'
if (args.sta_id != None):
sta_id = args.sta_id
# SSID
if (args.ssid != None):
ssid = args.ssid
if (args.ssid != None):
ssid = args.ssid
# Security (Open by Default)
if (args.security is None):
security = 'open'
if (args.security != None):
security = args.security
# Password (if Security is not Open)
if (args.password != None):
password = args.password
if (args.password == 'open'):
password = "[Blank]"
if (args.password is None):
password = "[Blank]"
# Upstream Port (By default br0000)
if (args.upstream is None):
upstream = 'br0000'
if (args.upstream != None):
upstream = args.upstream
# Protocol (By Default lf_udp)
if (args.protocol != None):
protocol = args.protocol
if (args.protocol is None):
protocol = 'lf_udp'
#Min BPS
if (args.min_mbps != None):
min_bps = int(args.min_mbps)*1000000
if (args.min_mbps is None):
min_bps = int(1000)*1000000
if (args.max_mbps is None ):
max_bps = int(1000)*1000000
if (args.min_mbps != None):
min_bps = int(args.min_mbps)*1000000
if (args.max_mbps != None and args.max_mbps != "same"):
max_bps = int(args.max_mbps)*1000000
if (args.max_mbps != None and args.max_mbps == "same"):
max_bps = args.min_mbps
if (args.duration != None):
duration = (args.duration * 60)/5
if (args.report_name != None):
report_name = args.report_name
if (args.duration is None):
duration = (1 * 60)/5
if (args.report_name is None):
report_name = "report.xlsx"
if (args.min_mbps is not None):
min_bps = int(args.min_mbps)*1000000
if (args.max_mbps is not None and args.max_mbps is not "same"):
max_bps = int(args.max_mbps)*1000000
if (args.max_mbps is not None and args.max_mbps is "same"):
max_bps = args.min_mbps
if (args.duration is not None):
duration = (args.duration * 60)/5
if (args.report_name is not None):
report_name = args.report_name
if (args.duration is None):
duration = (1 * 60)/5
if (args.report_name is None):
report_name = "report.xlsx"
except Exception as e:
logging.exception(e)
exit(2)
logging.exception(e)
exit(2)
# Start DUT
#Loading the Scenario on Lanforge_1 (Here Considered as DUT) [Created VAP With SSID 'lexusap' on wiphy0 with eth1 as backhaul]
Scenario_1 = LoadScenario("192.168.200.18", 8080, "Lexus_DUT")
dut_traffic_profile = CreateSTA_CX("192.168.200.18", 8080, "wiphy1", 1, 0, 'lanforge_ap', 'open', password, 'br0000', 'lf_udp', min_bps, max_bps)
dut_traffic_profile.build()
print("DUT All Set... Lets setup Lanforge")
#Loading the Scenario on Lanforge_2 (Here Considered as LANFORGE Test) [Created VAP With SSID 'lanforge_ap' on wiphy0 with eth2 as backhaul]
DB_Lanforge_2 = "LANforge_TEST"
Scenario_2 = LoadScenario(manager, 8080, scenario)
lf_traffic_profile = CreateSTA_CX(manager, 8080, radio, num_sta, sta_id, ssid, security, password, upstream, protocol, min_bps, max_bps)
lf_traffic_profile.build()
@@ -451,7 +456,7 @@ def main():
#List for Storing the Total Throughput
throughput_sta =[]
throughput_vap =[]
# This loop will get the Data from All the endpoints and sum up to give total Throughput over time
for i in range(0,int(duration)):
temp=0
@@ -468,9 +473,9 @@ def main():
dut_traffic_profile.cleanup()
lf_traffic_profile.cleanup()
GenerateReport(throughput_sta, throughput_vap, dut_temp_obj.data_core1, dut_temp_obj.data_core2, duration, report_name)
if __name__ == '__main__':
main()