mirror of
https://github.com/Telecominfraproject/wlan-lanforge-scripts.git
synced 2025-11-02 19:58:03 +00:00
Create dut_to_grafana and fixing dependencies
Signed-off-by: Matthew Stidham <stidmatt@gmail.com>
This commit is contained in:
@@ -933,7 +933,7 @@ class Realm(LFCliBase):
|
||||
|
||||
def new_dut_profile(self, ver = 1):
|
||||
if ver == 1:
|
||||
dut_profile = dut_profile.DUTProfile(self.lfclient_host, self.lfclient_port, local_realm=self, debug_=self.debug)
|
||||
dut_profile = DUTProfile(self.lfclient_host, self.lfclient_port, local_realm=self, debug_=self.debug)
|
||||
# elif ver == 2:
|
||||
# import dut_profile2
|
||||
# dut_profile = dut_profile2.DUTProfile2(self.lfclient_host, self.lfclient_port, local_realm=self, debug_=self.debug)
|
||||
|
||||
@@ -188,7 +188,7 @@ def main():
|
||||
""")
|
||||
parser.add_argument("-m", "--lfmgr", type=str,
|
||||
help="address of the LANforge GUI machine (localhost is default)")
|
||||
parser.add_argument("-o", "--port", type=int,
|
||||
parser.add_argument("-o", "--port", type=int, default=8080,
|
||||
help="IP Port the LANforge GUI is listening on (8080 is default)")
|
||||
parser.add_argument("-cs", "--create_scenario", "--create_lf_scenario", type=str,
|
||||
help="name of scenario to be created")
|
||||
|
||||
@@ -46,12 +46,10 @@ How to Run this:
|
||||
Output : DUT will be created in Chamber View
|
||||
"""
|
||||
|
||||
|
||||
import sys
|
||||
import os
|
||||
import argparse
|
||||
import time
|
||||
import re
|
||||
|
||||
if sys.version_info[0] != 3:
|
||||
print("This script requires Python 3")
|
||||
@@ -63,6 +61,7 @@ if 'py-json' not in sys.path:
|
||||
from cv_dut_profile import cv_dut as dut
|
||||
from cv_test_manager import cv_test as cvtest
|
||||
|
||||
|
||||
class DUT(dut):
|
||||
def __init__(self,
|
||||
lfmgr="localhost",
|
||||
@@ -88,72 +87,47 @@ class DUT(dut):
|
||||
self.create_dut(dut_name=self.dut_name)
|
||||
|
||||
def add_ssids(self):
|
||||
flags = dict()
|
||||
flags['wep'] = 0x8
|
||||
flags['wpa'] = 0x10
|
||||
flags['wpa2'] = 0x20
|
||||
flags['wpa3'] = 0x100
|
||||
flags['11r'] = 0x200
|
||||
flags['eap-ttls'] = 0x400
|
||||
flags['eap-peap'] = 0x800
|
||||
if self.ssid:
|
||||
for j in range(len(self.ssid)):
|
||||
self.ssid[j] = self.ssid[j][0].split(' ')
|
||||
for k in range(len(self.ssid[j])):
|
||||
self.ssid[j][k] = self.ssid[j][k].split('=')
|
||||
d = dict()
|
||||
for item in self.ssid[j]:
|
||||
d[item[0].lower()] = item[1]
|
||||
self.ssid[j] = d
|
||||
self.ssid[j]['flag'] = []
|
||||
self.ssid[j].keys
|
||||
|
||||
for i in range(len(self.ssid)):
|
||||
if 'security' in self.ssid[j].keys():
|
||||
self.ssid[j]['security'] = self.ssid[j]['security'].split('|')
|
||||
for security in self.ssid[j]['security']:
|
||||
try:
|
||||
self.ssid[j]['flag'].append(flags[security.lower()])
|
||||
except:
|
||||
pass
|
||||
|
||||
if " " in self.ssid[i][0]:
|
||||
self.ssid[i][0] = (re.split(' ', self.ssid[i][0]))
|
||||
elif "," in self.ssid[i][0]:
|
||||
self.ssid[i][0] = (re.split(',', self.ssid[i][0]))
|
||||
elif ", " in self.ssid[i][0]:
|
||||
self.ssid[i][0] = (re.split(',', self.ssid[i][0]))
|
||||
elif " ," in self.ssid[i][0]:
|
||||
self.ssid[i][0] = (re.split(',', self.ssid[i][0]))
|
||||
else:
|
||||
print("Wrong arguments entered !")
|
||||
exit(1)
|
||||
|
||||
ssid_idx = 0
|
||||
ssid = "[BLANK]"
|
||||
passwd = "[BLANK]"
|
||||
bssid = "00:00:00:00:00:00"
|
||||
flag = 0x0
|
||||
|
||||
for j in range(len(self.ssid[i][0])):
|
||||
self.ssid[i][0][j] = self.ssid[i][0][j].split("=")
|
||||
for k in range(len(self.ssid[i][0][j])):
|
||||
name = self.ssid[i][0][j][k]
|
||||
if str(name) == "SSID" or str(name) == "ssid" or str(name) == "s":
|
||||
ssid = self.ssid[i][0][j][k + 1]
|
||||
elif str(name) == "PASSWORD" or str(name) == "password" or str(name) == "pass":
|
||||
passwd = self.ssid[i][0][j][k + 1]
|
||||
elif str(name) == "ssid_idx" or str(name) == "no" or str(name) == "N":
|
||||
ssid_idx = self.ssid[i][0][j][k + 1]
|
||||
elif str(name) == "security" or str(name) == "sec":
|
||||
if self.ssid[i][0][j][k + 1]:
|
||||
all_flags = self.ssid[i][0][j][k + 1].split("|")
|
||||
for flags in all_flags:
|
||||
if flags == "WEP" or flags == "wep":
|
||||
flag += 0x8
|
||||
if flags == "WPA" or flags == "wpa":
|
||||
flag += 0x10
|
||||
if flags == "WPA2" or flags == "wpa2":
|
||||
flag += 0x20
|
||||
if flags == "WPA3" or flags == "wpa3":
|
||||
flag += 0x100
|
||||
if flags == "11r":
|
||||
flag += 0x200
|
||||
if flags == "EAP-TTLS":
|
||||
flag += 0x400
|
||||
if flags == "EAP-PEAP":
|
||||
flag += 0x800
|
||||
elif str(name) == "BSSID" or str(name) == "bssid" or str(name) == "B":
|
||||
bssid = self.ssid[i][0][j][k + 1]
|
||||
else:
|
||||
continue
|
||||
if 'bssid' not in self.ssid[j].keys():
|
||||
self.ssid[j]['bssid'] = '00:00:00:00:00:00'
|
||||
|
||||
self.add_ssid(dut_name=self.dut_name,
|
||||
ssid_idx=ssid_idx,
|
||||
ssid=ssid,
|
||||
passwd=passwd,
|
||||
bssid=bssid,
|
||||
ssid_flags=flag,
|
||||
ssid_idx=self.ssid[j]['ssid_idx'],
|
||||
ssid=self.ssid[j]['ssid'],
|
||||
passwd=self.ssid[j]['password'],
|
||||
bssid=self.ssid[j]['bssid'],
|
||||
ssid_flags=self.ssid[j]['flag'],
|
||||
ssid_flags_mask=0xFFFFFFFF
|
||||
)
|
||||
|
||||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(
|
||||
description="""
|
||||
@@ -170,7 +144,6 @@ def main():
|
||||
parser.add_argument("-s", "--ssid", action='append', nargs=1,
|
||||
help="SSID", default=[])
|
||||
|
||||
|
||||
args = parser.parse_args()
|
||||
new_dut = DUT(args.lfmgr,
|
||||
args.port,
|
||||
@@ -185,5 +158,6 @@ def main():
|
||||
time.sleep(2)
|
||||
new_dut.cv_test.sync_cv()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
|
||||
@@ -19,7 +19,7 @@
|
||||
|
||||
Example
|
||||
.\Lexus_Final.py --lf_host 192.168.200.15 --dut_host 192.168.200.18 --dut_radio wiphy1 --lf_radio wiphy1 --num_sta 1 --sta_id 1 --lf_ssid lanforge_ap --dut_ssid lexusap --security open --dut_upstream eth2 --lf_upstream eth1 --protocol lf_udp --min_bps 1000 --max_bps 10000 --time 1
|
||||
This Script is intended to automate the testing of DUT That has stations as well as AP.
|
||||
This Script is intended to automate the testing of DUT that has stations as well as AP.
|
||||
To automate the simultaenous testing and check the DUT Temperature
|
||||
'''
|
||||
|
||||
|
||||
@@ -95,7 +95,7 @@ def main():
|
||||
description='''{file}
|
||||
--------------------
|
||||
Generic command layout:
|
||||
python ./{file} --dut [DUT name] # update existing DUT record
|
||||
python {file} --dut [DUT name] # update existing DUT record
|
||||
--entry [key,value] # update/add entry by specifying key and value
|
||||
--flag [flag,0|1] # toggle a flag on 1 or off 0
|
||||
--notes "all lines of text"
|
||||
@@ -117,7 +117,7 @@ python3 {file} --mgr 192.168.100.24 --update Pathfinder \
|
||||
'''.format(file=__file__, params=param_string, flags=flags_string),
|
||||
epilog="See",
|
||||
)
|
||||
parser.add_argument("-d", "--dut", type=str, help="name of DUT record")
|
||||
parser.add_argument("--dut", type=str, help="name of DUT record")
|
||||
parser.add_argument("-p", "--param", type=str, action="append", help="name,value pair to set parameter")
|
||||
parser.add_argument("-f", "--flag", type=str, action="append", help="name,1/0/True/False pair to turn parameter on or off")
|
||||
parser.add_argument("-n", "--notes", type=str, action="append", help="replace lines of notes in the record")
|
||||
|
||||
@@ -1,5 +1,7 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
'''
|
||||
This script loads and builds a Chamber View Scenario, runs WiFi Capacity Test, and posts the results to Grafana
|
||||
'''
|
||||
import sys
|
||||
import os
|
||||
import argparse
|
||||
@@ -12,11 +14,7 @@ if 'py-json' not in sys.path:
|
||||
sys.path.append(os.path.join(os.path.abspath('..'), 'py-json'))
|
||||
sys.path.append(os.path.join(os.path.abspath('..'), 'py-dashboard'))
|
||||
|
||||
from LANforge.lfcli_base import LFCliBase
|
||||
from LANforge import LFUtils
|
||||
import json
|
||||
from csv_to_grafana import CSVtoInflux
|
||||
from create_l3 import CreateL3
|
||||
|
||||
from lf_wifi_capacity_test import WiFiCapacityTest
|
||||
from cv_test_manager import *
|
||||
|
||||
@@ -36,7 +34,8 @@ def main():
|
||||
--influx_token
|
||||
--influx_bucket
|
||||
--target_csv
|
||||
--panel_name'''
|
||||
--panel_name
|
||||
--dut_name'''
|
||||
)
|
||||
|
||||
cv_add_base_parser(parser) # see cv_test_manager.py
|
||||
@@ -55,28 +54,13 @@ def main():
|
||||
help="Select requested upload rate. Kbps, Mbps, Gbps units supported. Default is 10Mbps")
|
||||
parser.add_argument("--sort", type=str, default="interleave",
|
||||
help="Select station sorting behaviour: none | interleave | linear Default is interleave.")
|
||||
parser.add_argument("-s", "--stations", type=str, default="",
|
||||
help="If specified, these stations will be used. If not specified, all available stations will be selected. Example: 1.1.sta001,1.1.wlan0,...")
|
||||
parser.add_argument("-cs", "--create_stations", default=False, action='store_true',
|
||||
help="create stations in lanforge (by default: False)")
|
||||
parser.add_argument('--a_min', help='--a_min bps rate minimum for side_a', default=256000)
|
||||
parser.add_argument('--b_min', help='--b_min bps rate minimum for side_b', default=256000)
|
||||
parser.add_argument('--number_template', help='Start the station numbering with a particular number. Default is 0000',
|
||||
default=0000)
|
||||
parser.add_argument('--mode', help='Used to force mode of stations')
|
||||
parser.add_argument('--ap', help='Used to force a connection to a particular AP')
|
||||
parser.add_argument("-radio", "--radio", default="wiphy0",
|
||||
help="create stations in lanforge at this radio (by default: wiphy0)")
|
||||
parser.add_argument("-ssid", "--ssid", default="",
|
||||
help="ssid name")
|
||||
parser.add_argument("-security", "--security", default="open",
|
||||
help="ssid Security type")
|
||||
parser.add_argument("-passwd", "--passwd", default="[BLANK]",
|
||||
help="ssid Password")
|
||||
parser.add_argument("--num_stations", default=2)
|
||||
parser.add_argument("--mgr_port", default=8080)
|
||||
parser.add_argument("--upstream_port", default="1.1.eth1")
|
||||
parser.add_argument("--debug", default=False)
|
||||
parser.add_argument("--scenario", help="", default=None)
|
||||
args = parser.parse_args()
|
||||
|
||||
@@ -100,12 +84,6 @@ def main():
|
||||
download_rate=args.download_rate,
|
||||
upload_rate=args.upload_rate,
|
||||
sort=args.sort,
|
||||
stations=args.stations,
|
||||
create_stations=args.create_stations,
|
||||
radio=args.radio,
|
||||
ssid=args.ssid,
|
||||
security=args.security,
|
||||
paswd=args.passwd,
|
||||
enables=args.enable,
|
||||
disables=args.disable,
|
||||
raw_lines=args.raw_line,
|
||||
|
||||
Reference in New Issue
Block a user