mirror of
https://github.com/Telecominfraproject/wlan-lanforge-scripts.git
synced 2025-11-01 03:07:56 +00:00
Merge branch 'master' of https://github.com/greearb/lanforge-scripts
This commit is contained in:
387
py-scripts/lf_dut_sta_vap_test.py
Normal file
387
py-scripts/lf_dut_sta_vap_test.py
Normal file
@@ -0,0 +1,387 @@
|
||||
#!/usr/bin/env python3
|
||||
'''
|
||||
This Scrip has two classes :
|
||||
1. LoadScenario : It will load the existing saved scenario to the Lanforge (Here used for Loading Bridged VAP)
|
||||
2. CreateSTA_CX : It will create stations and L3 Cross connects and start them
|
||||
3. Login_DUT : This class is specifically used to test the Linux based DUT that has SSH Server. It is used to read the CPU Core temperature during testing
|
||||
In this example, Another Lanforge is used as DUT
|
||||
It also have a function : GenerateReport that generates the report in xlsx format as well as it plots the Graph of throughput over time with temperature
|
||||
|
||||
|
||||
Example
|
||||
.\lf_dut_sta_vap_test.py --lf_host 192.168.200.15 --dut_host 192.168.200.18 --dut_radio wiphy1 --lf_radio wiphy1 --num_sta 1 --sta_id 1 --lf_ssid lanforge_ap --dut_ssid lexusap --security open --dut_upstream eth2 --lf_upstream eth1 --protocol lf_udp --min_bps 1000 --max_bps 10000 --time 1
|
||||
This Script is intended to automate the testing of DUT That has stations as well as AP.
|
||||
To automate the simultaenous testing and check the DUT Temperature
|
||||
'''
|
||||
|
||||
import sys
|
||||
if sys.version_info[0] != 3:
|
||||
print("This script requires Python 3")
|
||||
exit(1)
|
||||
if 'py-json' not in sys.path:
|
||||
sys.path.append('../py-json')
|
||||
|
||||
import pexpect
|
||||
import argparse
|
||||
import time
|
||||
from LANforge import LFUtils
|
||||
from LANforge import lfcli_base
|
||||
from LANforge.lfcli_base import LFCliBase
|
||||
from LANforge.LFUtils import *
|
||||
import realm
|
||||
from realm import VRProfile
|
||||
from realm import Realm
|
||||
import logging
|
||||
import paramiko as pm
|
||||
from paramiko.ssh_exception import NoValidConnectionsError as exception
|
||||
import xlsxwriter
|
||||
from bokeh.io import output_file, show
|
||||
from bokeh.plotting import figure
|
||||
from bokeh.models import LinearAxis, Range1d
|
||||
from bokeh.models import HoverTool
|
||||
|
||||
|
||||
|
||||
|
||||
class Login_DUT:
|
||||
|
||||
def __init__(self, threadID, name, HOST):
|
||||
self.threadID = threadID
|
||||
self.name = name
|
||||
self.host=HOST
|
||||
self.USERNAME = "lanforge"
|
||||
self.PASSWORD = "lanforge"
|
||||
self.CLIENT= pm.SSHClient()
|
||||
self.LF1= self.Connect()
|
||||
self.data_core1=[]
|
||||
self.data_core2=[]
|
||||
if self.CLIENT == 0:
|
||||
exit()
|
||||
print("Connected to " +HOST+" DUT to Measure the Core Temperature")
|
||||
def run(self):
|
||||
stdin, stdout, stderr= self.CLIENT.exec_command("sensors")
|
||||
out_lines = stdout.readlines()
|
||||
err_lines = stderr.readlines()
|
||||
print(out_lines[len(out_lines)-3],out_lines[len(out_lines)-2])
|
||||
self.data_core1.append(out_lines[len(out_lines)-3])
|
||||
self.data_core2.append(out_lines[len(out_lines)-2])
|
||||
|
||||
def Connect(self):
|
||||
self.CLIENT.load_system_host_keys()
|
||||
self.CLIENT.set_missing_host_key_policy(pm.AutoAddPolicy())
|
||||
try:
|
||||
self.CLIENT.connect(self.host, username=self.USERNAME, password=self.PASSWORD,timeout=10)
|
||||
return None
|
||||
except exception as error:
|
||||
self.CLIENT = 0;
|
||||
return None
|
||||
|
||||
|
||||
# Class to Load a Scenario that has been Created in Chamber View saved under DB/[Database_Name]
|
||||
class LoadScenario(LFCliBase):
|
||||
def __init__(self, host, port, db_name, security_debug_on=False, _exit_on_error=False,_exit_on_fail=False):
|
||||
super().__init__(host, port, _debug=security_debug_on, _halt_on_error=_exit_on_error, _exit_on_fail=_exit_on_fail)
|
||||
self.host = host
|
||||
self.port = port
|
||||
self.json_post("/cli-json/load", { "name": db_name, "action": 'overwrite' })
|
||||
print("Scenario Loaded...")
|
||||
time.sleep(2)
|
||||
|
||||
|
||||
class CreateSTA_CX(LFCliBase):
|
||||
|
||||
def __init__(self, host, port, radio, num_sta, sta_id, ssid, security, password, upstream, protocol, min_bps, max_bps, security_debug_on=True, _exit_on_error=True,_exit_on_fail=True):
|
||||
super().__init__(host, port, _debug=security_debug_on, _halt_on_error=_exit_on_error, _exit_on_fail=_exit_on_fail)
|
||||
|
||||
self.host = host
|
||||
self.port = port
|
||||
self.radio = radio
|
||||
|
||||
self.num_sta = num_sta
|
||||
self.sta_id = sta_id
|
||||
|
||||
self.ssid = ssid
|
||||
|
||||
self.security = security
|
||||
self.password = password
|
||||
|
||||
self.upstream = upstream
|
||||
self.protocol = protocol
|
||||
|
||||
self.min_bps =min_bps
|
||||
self.max_bps =max_bps
|
||||
|
||||
#Creating a Realm Object
|
||||
self.local_realm = Realm(lfclient_host=host, lfclient_port=port)
|
||||
|
||||
#Creating Profile Objects
|
||||
self.station_profile = self.local_realm.new_station_profile()
|
||||
self.cx_profile = self.local_realm.new_l3_cx_profile()
|
||||
|
||||
#Setting CX Name
|
||||
self.cx_profile.name_prefix_="Connection"
|
||||
self.cx_names = []
|
||||
self.sta_list = []
|
||||
self.endp=[]
|
||||
for i in range(sta_id,sta_id+num_sta):
|
||||
self.sta_list.append("sta00")
|
||||
|
||||
#portDhcpUpRequest
|
||||
upstream_dhcp = LFRequest.LFRequest("http://"+str(host)+":"+str(port)+"/"+"/cli-form/set_port")
|
||||
upstream_dhcp.addPostData( LFUtils.portSetDhcpDownRequest(1, upstream))
|
||||
upstream_dhcp.formPost()
|
||||
time.sleep(2)
|
||||
upstream_dhcp.addPostData( LFUtils.portUpRequest(1, upstream))
|
||||
upstream_dhcp.formPost()
|
||||
print(upstream + "Set to DHCP For Cross Connects")
|
||||
|
||||
|
||||
def build(self):
|
||||
|
||||
#Creating Stations of Given Profile Settings
|
||||
self.station_profile.use_security(self.security, self.ssid, passwd=self.password)
|
||||
self.station_profile.create(self.radio,num_stations=self.num_sta, sta_names_=self.sta_list)
|
||||
self.station_profile.admin_up()
|
||||
#Wait for a while
|
||||
time.sleep(15)
|
||||
|
||||
#Setting up the Parameters for CX
|
||||
self.cx_profile.side_a_min_bps = self.min_bps
|
||||
self.cx_profile.side_b_min_bps = self.min_bps
|
||||
self.cx_profile.side_a_max_bps = self.max_bps
|
||||
self.cx_profile.side_b_max_bps = self.max_bps
|
||||
|
||||
self.cx_profile.side_a_min_pdu = 'Auto'
|
||||
self.cx_profile.side_b_min_pdu = 'Auto'
|
||||
self.cx_profile.report_timer = 1000
|
||||
self.cx_profile.side_a_min_pkt='Same'
|
||||
self.cx_profile.side_a_max_pkt='Same'
|
||||
|
||||
#Create Connections of Given Parameters
|
||||
self.cx_profile.create(self.protocol, side_a="1.1."+self.upstream, side_b=list(self.local_realm.find_ports_like("sta0+")))
|
||||
time.sleep(15)
|
||||
|
||||
|
||||
for i in self.cx_profile.get_cx_names():
|
||||
self.cx_names.append(i)
|
||||
for j in self.cx_names:
|
||||
x=self.local_realm.json_get("/cx/"+j)
|
||||
self.endp.append(x.get(j).get('endpoints')[1])
|
||||
|
||||
return 0
|
||||
|
||||
|
||||
def start(self):
|
||||
#self.station_profile.admin_up()
|
||||
|
||||
self.cx_profile.start_cx()
|
||||
time.sleep(5)
|
||||
return 0
|
||||
|
||||
def stop(self):
|
||||
self.cx_profile.stop_cx()
|
||||
time.sleep(5)
|
||||
self.lf_stations.admin_down()
|
||||
time.sleep(5)
|
||||
return 0
|
||||
|
||||
def cleanup(self):
|
||||
self.local_realm.cleanup_cxe_prefix(self.cx_profile.name_prefix)
|
||||
station_map = self.local_realm.find_ports_like("sta+")
|
||||
for eid,record in station_map.items():
|
||||
self.local_realm.remove_vlan_by_eid(eid)
|
||||
time.sleep(0.03)
|
||||
del_sta_names = []
|
||||
try:
|
||||
for eid,value in station_map.items():
|
||||
tname = eid[eid.rfind('.'):]
|
||||
del_sta_names.append(tname)
|
||||
except Exception as x:
|
||||
self.local_realm.error(x)
|
||||
try:
|
||||
LFUtils.waitUntilPortsDisappear(base_url=self.local_realm.lfclient_url, port_list=del_sta_names, debug=True)
|
||||
print("Ports Successfully Cleaned up")
|
||||
return 0
|
||||
except:
|
||||
print("Ports Successfully Cleaned up")
|
||||
time.sleep(5)
|
||||
return 0
|
||||
|
||||
def GenerateReport(throughput, core1_temp, core2_temp):
|
||||
workbook = xlsxwriter.Workbook('demo.xlsx')
|
||||
worksheet = workbook.add_worksheet()
|
||||
worksheet.write('A1', 'THROUGHPUT OVER TIME ')
|
||||
worksheet.write('B1', 'CORE 0 TEMP')
|
||||
worksheet.write('C1', 'CORE 1 TEMP')
|
||||
core1=[]
|
||||
core2=[]
|
||||
j=2
|
||||
for i in throughput:
|
||||
worksheet.write('A'+str(j), str(i/1000000)+" MBPS")
|
||||
j=j+1
|
||||
j=2
|
||||
for i in core1_temp:
|
||||
core1.append(int(str(i).split(':')[1].split('(')[0].split('.')[0].split('+')[1]))
|
||||
worksheet.write('B'+str(j),str(i).split(':')[1].split('(')[0] )
|
||||
j=j+1
|
||||
j=2
|
||||
for i in core2_temp:
|
||||
core2.append(int(str(i).split(':')[1].split('(')[0].split('.')[0].split('+')[1]))
|
||||
worksheet.write('C'+str(j), str(i).split(':')[1].split('(')[0])
|
||||
j=j+1
|
||||
workbook.close()
|
||||
|
||||
x=[]
|
||||
for i in range(0,5*len(throughput)):
|
||||
x.append(i)
|
||||
|
||||
'''
|
||||
y = throughput
|
||||
|
||||
TOOLTIPS = [
|
||||
("index", "$index"),
|
||||
("(Throughput,Time)", "($y, $x)"),
|
||||
|
||||
]
|
||||
|
||||
s1 = figure(y_range = (0,15000),tooltips = TOOLTIPS)
|
||||
|
||||
s1.line(y, x, color='#f45666')
|
||||
s1.circle(y, x, color='pink')
|
||||
|
||||
|
||||
s1.extra_y_ranges = {"NumStations":}
|
||||
|
||||
s1.add_layout(LinearAxis(y_range_name="NumStations"), 'right')
|
||||
s1.line(core1,x, y_range_name='NumStations', color='blue')
|
||||
s1.circle(core1, x, y_range_name='NumStations', color='black')
|
||||
|
||||
s1.line(core2,x, y_range_name='NumStations', color='blue')
|
||||
s1.circle(core2, x, y_range_name='NumStations', color='black')
|
||||
show(s1)
|
||||
'''
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
def main():
|
||||
|
||||
parser = argparse.ArgumentParser(description="Test Scenario of DUT Temperature measurement along with simultaneous throughput")
|
||||
|
||||
parser.add_argument("-m", "--lf_host", type=str, help="Enter the address of LF which will test the DUT")
|
||||
parser.add_argument("-d", "--dut_host", type=str, help="Enter the address of LF Which is to be dut")
|
||||
|
||||
parser.add_argument("-lr", "--lf_radio", type=str, help="Enter the radio on which you want to create a station/s on (Lanforge Side)")
|
||||
parser.add_argument("-dr", "--dut_radio", type=str, help="Enter the radio on which you want to create a station/s on (DUT Side)")
|
||||
|
||||
parser.add_argument("-n", "--num_sta", type=int, help="Enter the Number of Stations You want to create")
|
||||
|
||||
parser.add_argument("-st", "--sta_id", type=int, help="Enter Station id [for sta001, enter 1]")
|
||||
|
||||
parser.add_argument("-ls", "--lf_ssid", type=str, help="Enter the ssid, with which you want to associate your stations (Enter the SSID of VAP in Lanforge)")
|
||||
parser.add_argument("-ds", "--dut_ssid", type=str, help="Enter the ssid, with which you want to associate your stations (Enter the SSID of VAP in DUT)")
|
||||
parser.add_argument("-sec", "--security", type=str, help="Enter the security type [open, wep, wpa, wpa2]")
|
||||
parser.add_argument("-p", "--password", type=str, help="Enter the password if security is not open")
|
||||
parser.add_argument("-lu", "--lf_upstream", type=str, help="Enter the upstream ethernet port")
|
||||
parser.add_argument("-du", "--dut_upstream", type=str, help="Enter the upstream ethernet port")
|
||||
parser.add_argument("-pr", "--protocol", type=str, help="Enter the protocol on which you want to run your connections [lf_udp, lf_tcp]")
|
||||
parser.add_argument("-minb", "--min_bps", type=str, help="Enter the Minimum Rate")
|
||||
parser.add_argument("-maxb", "--max_bps", type=str, help="Enter the Maximum Rate")
|
||||
parser.add_argument("-t", "--time", type=int, help="Enter the Time for which you want to run test (In Minutes)")
|
||||
|
||||
args = None
|
||||
|
||||
try:
|
||||
args = parser.parse_args()
|
||||
if (args.lf_host is not None):
|
||||
lf_host = args.lf_host
|
||||
if (args.dut_host is not None):
|
||||
dut_host = args.dut_host
|
||||
if (args.lf_radio is not None):
|
||||
lf_radio = args.lf_radio
|
||||
if (args.dut_radio is not None):
|
||||
dut_radio = args.dut_radio
|
||||
if (args.num_sta is not None):
|
||||
num_sta = args.num_sta
|
||||
if (args.sta_id is not None):
|
||||
sta_id = args.sta_id
|
||||
if (args.dut_ssid is not None):
|
||||
dut_ssid = args.dut_ssid
|
||||
if (args.lf_ssid is not None):
|
||||
lf_ssid = args.lf_ssid
|
||||
if (args.security is not None):
|
||||
security = args.security
|
||||
if (args.password is not None):
|
||||
password = args.password
|
||||
if (args.password is None):
|
||||
password = "[Blank]"
|
||||
if (args.lf_upstream is not None):
|
||||
lf_upstream = args.lf_upstream
|
||||
if (args.dut_upstream is not None):
|
||||
dut_upstream = args.dut_upstream
|
||||
if (args.protocol is not None):
|
||||
protocol = args.protocol
|
||||
if (args.min_bps is not None):
|
||||
min_bps = int(args.min_bps)*1000000
|
||||
if (args.max_bps is not None and args.max_bps is not "same"):
|
||||
max_bps = int(args.max_bps)*1000000
|
||||
if (args.max_bps is not None and args.max_bps is "same"):
|
||||
max_bps = args.min_bps
|
||||
if (args.time is not None):
|
||||
tme = (args.time * 60)/5
|
||||
except Exception as e:
|
||||
logging.exception(e)
|
||||
|
||||
exit(2)
|
||||
|
||||
DB_Lanforge_1 = "AUTO_CV-2020-09-18-07-41-19"
|
||||
#Loading the Scenario on Lanforge_1 (Here Considered as DUT) [Created VAP With SSID 'lexusap' on wiphy0 with eth1 as backhaul]
|
||||
Scenario_1 = LoadScenario(dut_host,8080,DB_Lanforge_1)
|
||||
|
||||
DB_Lanforge_2 = "AUTO_CV-2020-09-18-08-00-36"
|
||||
#Loading the Scenario on Lanforge_2 (Here Considered as LANFORGE Test) [Created VAP With SSID 'lanforge_ap' on wiphy0 with eth2 as backhaul]
|
||||
Scenario_2 = LoadScenario(lf_host,8080,DB_Lanforge_2)
|
||||
|
||||
#Create Station and cross connects on Lanforge_1 that connects on VAP on Lanforge_2
|
||||
DUT = CreateSTA_CX(dut_host,8080,dut_radio,num_sta, sta_id, lf_ssid, security, password, dut_upstream, protocol, min_bps, max_bps)
|
||||
#DUT.cleanup()
|
||||
DUT.build()
|
||||
DUT.start()
|
||||
|
||||
#Create Station and cross connects on Lanforge_2 that connects on VAP on Lanforge_1 (lexus_ap)
|
||||
LF = CreateSTA_CX(lf_host,8080,lf_radio,num_sta, sta_id, dut_ssid, security, password, lf_upstream, protocol, min_bps, max_bps)
|
||||
#LF.cleanup()
|
||||
LF.build()
|
||||
LF.start()
|
||||
|
||||
print("Collecting Throughput Values...")
|
||||
a = Login_DUT(1, "Thread-1", dut_host)
|
||||
|
||||
time.sleep(10)
|
||||
|
||||
csv_data =[]
|
||||
for i in range(0,int(tme)):
|
||||
temp=0
|
||||
for j in LF.endp:
|
||||
x=LF.local_realm.json_get("/endp/"+j).get('endpoint').get('rx rate')
|
||||
temp=temp+x
|
||||
for j in DUT.endp:
|
||||
y=DUT.local_realm.json_get("/endp/"+j).get('endpoint').get('rx rate')
|
||||
temp=temp+y
|
||||
csv_data.append(temp)
|
||||
a.run()
|
||||
print(temp)
|
||||
time.sleep(5)
|
||||
print(csv_data)
|
||||
DUT.cleanup()
|
||||
LF.cleanup()
|
||||
GenerateReport(csv_data,a.data_core1,a.data_core2)
|
||||
|
||||
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
|
||||
Reference in New Issue
Block a user