old lanforge-scripts test

Signed-off-by: shivamcandela <shivam.thakur@candelatech.com>
This commit is contained in:
shivamcandela
2021-06-21 00:20:38 +05:30
parent c777ea7b7f
commit e1474f11cf
38 changed files with 388 additions and 13495 deletions

View File

@@ -1,563 +0,0 @@
#!/usr/bin/python3
'''
NAME:
lf_check.py
PURPOSE:
lf_check.py will run a series of tests based on the test TEST_DICTIONARY listed in lf_check_config.ini.
The lf_check_config.ini file is copied from lf_check_config_template.ini and local configuration is made
to the lf_check_config.ini.
EXAMPLE:
lf_check.py
NOTES:
Before using lf_check.py
1. copy lf_check_config_template.ini to the lf_check_config.ini
2. update lf_check_config.ini to enable (TRUE) tests to be run in the TEST_DICTIONARY , the TEST_DICTIONARY needs to be passed in
'''
import datetime
import pprint
import sys
if sys.version_info[0] != 3:
print("This script requires Python3")
exit()
import os
import socket
import logging
import time
from time import sleep
import argparse
import json
import configparser
import subprocess
import csv
import shutil
import os.path
# lf_report is from the parent of the current file
dir_path = os.path.dirname(os.path.realpath(__file__))
parent_dir_path = os.path.abspath(os.path.join(dir_path,os.pardir))
sys.path.insert(0, parent_dir_path)
#sys.path.append('../')
from lf_report import lf_report
sys.path.append('/')
CONFIG_FILE = os.getcwd() + '/lf_check_config.ini'
RUN_CONDITION = 'ENABLE'
# setup logging FORMAT
FORMAT = '%(asctime)s %(name)s %(levelname)s: %(message)s'
# lf_check class contains verificaiton configuration and ocastrates the testing.
class lf_check():
def __init__(self,
_csv_results,
_outfile):
self.lf_mgr_ip = ""
self.lf_mgr_port = ""
self.radio_dict = {}
self.test_dict = {}
path_parent = os.path.dirname(os.getcwd())
os.chdir(path_parent)
self.scripts_wd = os.getcwd()
self.results = ""
self.outfile = _outfile
self.test_result = "Failure"
self.results_col_titles = ["Test","Command","Result","STDOUT","STDERR"]
self.html_results = ""
self.background_green = "background-color:green"
self.background_red = "background-color:red"
self.background_purple = "background-color:purple"
self.http_test_ip = ""
self.ftp_test_ip = ""
self.test_ip = ""
# section TEST_GENERIC
self.radio_lf = ""
self.ssdi = ""
self.ssid_pw = ""
self.security = ""
self.num_sta = ""
self.col_names = ""
self.upstream_port = ""
self.csv_results = _csv_results
self.csv_results_file = ""
self.csv_results_writer = ""
self.csv_results_column_headers = ""
self.logger = logging.getLogger(__name__)
self.test_timeout = 120
self.use_blank_db = "FALSE"
self.use_factory_default_db = "FALSE"
self.use_custom_db = "FALSE"
self.production_run = "FALSE"
self.email_list_production = ""
self.host_ip_production = None
self.email_list_test = ""
self.host_ip_test = None
# NOT complete : will send the email results
def send_results_email(self, report_file=None):
if (report_file is None):
print( "No report file, not sending email.")
return
report_url=report_file.replace('/home/lanforge/', '')
if report_url.startswith('/'):
report_url = report_url[1:]
# Following recommendation
# NOTE: https://stackoverflow.com/questions/24196932/how-can-i-get-the-ip-address-from-nic-in-python
#command = 'echo "$HOSTNAME mail system works!" | mail -s "Test: $HOSTNAME $(date)" chuck.rekiere@candelatech.com'
hostname = socket.gethostname()
ip = socket.gethostbyname(hostname)
message_txt = """Results from {hostname}:\\n
http://{ip}/{report}\\n
NOTE: for now to see stdout and stderr remove /home/lanforge from path.\\n
""".format(hostname=hostname, ip=ip, report=report_url)
mail_subject = "Regression Test [{hostname}] {date}".format(hostname=hostname,
date=datetime.datetime.now())
try:
if self.production_run == "TRUE":
msg = message_txt.format(ip=self.host_ip_production)
command = "echo \"{message}\" | mail -s \"{subject}\" {address}".format(
message=msg,
subject=mail_subject,
ip=self.host_ip_production,
address=self.email_list_production)
else:
msg = message_txt.format(ip=ip)
command = "echo \"{message}\" | mail -s \"{subject}\" {address}".format(
message=msg,
subject=mail_subject,
ip=ip, #self.host_ip_test,
address=self.email_list_test)
print("running:[{}]".format(command))
process = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True)
# have email on separate timeout
process.wait(timeout=int(self.test_timeout))
except subprocess.TimeoutExpired:
print("send email timed out")
process.terminate()
def get_csv_results(self):
return self.csv_file.name
def start_csv_results(self):
print("self.csv_results")
self.csv_results_file = open(self.csv_results, "w")
self.csv_results_writer = csv.writer(self.csv_results_file, delimiter=",")
self.csv_results_column_headers = ['Test','Command','Result','STDOUT','STDERR']
self.csv_results_writer.writerow(self.csv_results_column_headers)
self.csv_results_file.flush()
def get_html_results(self):
return self.html_results
def start_html_results(self):
self.html_results += """
<table border="1" class="dataframe">
<thead>
<tr style="text-align: left;">
<th>Test</th>
<th>Command</th>
<th>Result</th>
<th>STDOUT</th>
<th>STDERR</th>
</tr>
</thead>
<tbody>
"""
def finish_html_results(self):
self.html_results += """
</tbody>
</table>
<br>
<br>
<br>
"""
# Functions in this section are/can be overridden by descendants
# This code reads the lf_check_config.ini file to populate the test variables
def read_config_contents(self):
self.logger.info("read_config_contents {}".format(CONFIG_FILE))
config_file = configparser.ConfigParser()
success = True
success = config_file.read(CONFIG_FILE)
self.logger.info("logger worked")
if 'LF_MGR' in config_file.sections():
section = config_file['LF_MGR']
self.lf_mgr_ip = section['LF_MGR_IP']
self.lf_mgr_port = section['LF_MGR_PORT']
self.logger.info("lf_mgr_ip {}".format(self.lf_mgr_ip))
self.logger.info("lf_mgr_port {}".format(self.lf_mgr_port))
if 'TEST_NETWORK' in config_file.sections():
section = config_file['TEST_NETWORK']
self.http_test_ip = section['HTTP_TEST_IP']
self.logger.info("http_test_ip {}".format(self.http_test_ip))
self.ftp_test_ip = section['FTP_TEST_IP']
self.logger.info("ftp_test_ip {}".format(self.ftp_test_ip))
self.test_ip = section['TEST_IP']
self.logger.info("test_ip {}".format(self.test_ip))
if 'TEST_GENERIC' in config_file.sections():
section = config_file['TEST_GENERIC']
self.radio_lf = section['RADIO_USED']
self.logger.info("radio_lf {}".format(self.radio_lf))
self.ssid = section['SSID_USED']
self.logger.info("ssid {}".format(self.ssid))
self.ssid_pw = section['SSID_PW_USED']
self.logger.info("ssid_pw {}".format(self.ssid_pw))
self.security = section['SECURITY_USED']
self.logger.info("secruity {}".format(self.security))
self.num_sta = section['NUM_STA']
self.logger.info("num_sta {}".format(self.num_sta))
self.col_names = section['COL_NAMES']
self.logger.info("col_names {}".format(self.col_names))
self.upstream_port = section['UPSTREAM_PORT']
self.logger.info("upstream_port {}".format(self.upstream_port))
if 'TEST_PARAMETERS' in config_file.sections():
section = config_file['TEST_PARAMETERS']
self.test_timeout = section['TEST_TIMEOUT']
self.use_blank_db = section['LOAD_BLANK_DB']
self.use_factory_default_db = section['LOAD_FACTORY_DEFAULT_DB']
self.use_custom_db = section['LOAD_CUSTOM_DB']
self.custom_db = section['CUSTOM_DB']
self.production_run = section['PRODUCTION_RUN']
self.email_list_production = section['EMAIL_LIST_PRODUCTION']
self.host_ip_production = section['HOST_IP_PRODUCTION']
self.email_list_test = section['EMAIL_LIST_TEST']
self.host_ip_test = section['HOST_IP_TEST']
if 'RADIO_DICTIONARY' in config_file.sections():
section = config_file['RADIO_DICTIONARY']
self.radio_dict = json.loads(section.get('RADIO_DICT', self.radio_dict))
self.logger.info("self.radio_dict {}".format(self.radio_dict))
if 'TEST_DICTIONARY' in config_file.sections():
section = config_file['TEST_DICTIONARY']
# for json replace the \n and \r they are invalid json characters, allows for multiple line args
try:
self.test_dict = json.loads(section.get('TEST_DICT', self.test_dict).replace('\n',' ').replace('\r',' '))
self.logger.info("TEST_DICTIONARY: {}".format(self.test_dict))
except:
self.logger.info("Excpetion loading TEST_DICTIONARY, is there comma after the last entry? Check syntax")
def load_factory_default_db(self):
#self.logger.info("file_wd {}".format(self.scripts_wd))
try:
os.chdir(self.scripts_wd)
#self.logger.info("Current Working Directory {}".format(os.getcwd()))
except:
self.logger.info("failed to change to {}".format(self.scripts_wd))
# no spaces after FACTORY_DFLT
command = "./{} {}".format("scenario.py", "--load FACTORY_DFLT")
process = subprocess.Popen((command).split(' '), shell=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True)
# wait for the process to terminate
out, err = process.communicate()
errcode = process.returncode
# Not currently used
def load_blank_db(self):
#self.logger.info("file_wd {}".format(self.scripts_wd))
try:
os.chdir(self.scripts_wd)
#self.logger.info("Current Working Directory {}".format(os.getcwd()))
except:
self.logger.info("failed to change to {}".format(self.scripts_wd))
# no spaces after FACTORY_DFLT
command = "./{} {}".format("scenario.py", "--load BLANK")
process = subprocess.Popen((command).split(' '), shell=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True)
def load_custom_db(self,custom_db):
#self.logger.info("file_wd {}".format(self.scripts_wd))
try:
os.chdir(self.scripts_wd)
#self.logger.info("Current Working Directory {}".format(os.getcwd()))
except:
self.logger.info("failed to change to {}".format(self.scripts_wd))
# no spaces after FACTORY_DFLT
command = "./{} {}".format("scenario.py", "--load {}".format(custom_db))
process = subprocess.Popen((command).split(' '), shell=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True)
# wait for the process to terminate
out, err = process.communicate()
errcode = process.returncode
def run_script_test(self):
self.start_html_results()
self.start_csv_results()
for test in self.test_dict:
if self.test_dict[test]['enabled'] == "FALSE":
self.logger.info("test: {} skipped".format(test))
# load the default database
elif self.test_dict[test]['enabled'] == "TRUE":
# Make the command replace ment a separate method call.
# loop through radios
for radio in self.radio_dict:
# Replace RADIO, SSID, PASSWD, SECURITY with actual config values (e.g. RADIO_0_CFG to values)
# not "KEY" is just a word to refer to the RADIO define (e.g. RADIO_0_CFG) to get the vlaues
# --num_stations needs to be int not string (no double quotes)
if self.radio_dict[radio]["KEY"] in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace(self.radio_dict[radio]["KEY"],'--radio {} --ssid {} --passwd {} --security {} --num_stations {}'
.format(self.radio_dict[radio]['RADIO'],self.radio_dict[radio]['SSID'],self.radio_dict[radio]['PASSWD'],self.radio_dict[radio]['SECURITY'],self.radio_dict[radio]['STATIONS']))
if 'HTTP_TEST_IP' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('HTTP_TEST_IP',self.http_test_ip)
if 'FTP_TEST_IP' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('FTP_TEST_IP',self.ftp_test_ip)
if 'TEST_IP' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('TEST_IP',self.test_ip)
if 'RADIO_USED' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('RADIO_USED',self.radio_lf)
if 'SSID_USED' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('SSID_USED',self.ssid)
if 'SSID_PW_USED' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('SSID_PW_USED',self.ssid_pw)
if 'SECURITY_USED' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('SECURITY_USED',self.security)
if 'NUM_STA' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('NUM_STA',self.num_sta)
if 'COL_NAMES' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('COL_NAMES',self.col_names)
if 'UPSTREAM_PORT' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('UPSTREAM_PORT',self.col_names)
if self.use_factory_default_db == "TRUE":
self.load_factory_default_db()
sleep(3)
self.logger.info("FACTORY_DFLT loaded between tests with scenario.py --load FACTORY_DFLT")
if self.use_blank_db == "TRUE":
self.load_blank_db()
sleep(1)
self.logger.info("BLANK loaded between tests with scenario.py --load BLANK")
if self.use_custom_db == "TRUE":
try:
self.load_custom_db(self.custom_db)
sleep(1)
self.logger.info("{} loaded between tests with scenario.py --load {}".format(self.custom_db,self.custom_db))
except:
self.logger.info("custom database failed to load check existance and location")
else:
self.logger.info("no db loaded between tests: {}".format(self.use_custom_db))
sleep(1) # the sleep is to allow for the database to stablize
try:
os.chdir(self.scripts_wd)
#self.logger.info("Current Working Directory {}".format(os.getcwd()))
except:
self.logger.info("failed to change to {}".format(self.scripts_wd))
cmd_args = "{}".format(self.test_dict[test]['args'])
command = "./{} {}".format(self.test_dict[test]['command'], cmd_args)
self.logger.info("command: {}".format(command))
self.logger.info("cmd_args {}".format(cmd_args))
if self.outfile is not None:
stdout_log_txt = self.outfile
stdout_log_txt = stdout_log_txt + "-{}-stdout.txt".format(test)
#self.logger.info("stdout_log_txt: {}".format(stdout_log_txt))
stdout_log = open(stdout_log_txt, 'a')
stderr_log_txt = self.outfile
stderr_log_txt = stderr_log_txt + "-{}-stderr.txt".format(test)
#self.logger.info("stderr_log_txt: {}".format(stderr_log_txt))
stderr_log = open(stderr_log_txt, 'a')
print("running {}".format(command))
process = subprocess.Popen((command).split(' '), shell=False, stdout=stdout_log, stderr=stderr_log, universal_newlines=True)
try:
#out, err = process.communicate()
process.wait(timeout=int(self.test_timeout))
except subprocess.TimeoutExpired:
process.terminate()
self.test_result = "TIMEOUT"
#if err:
# self.logger.info("command Test timed out: {}".format(command))
#self.logger.info(stderr_log_txt)
if(self.test_result != "TIMEOUT"):
stderr_log_size = os.path.getsize(stderr_log_txt)
if stderr_log_size > 0 :
self.logger.info("File: {} is not empty: {}".format(stderr_log_txt,str(stderr_log_size)))
self.test_result = "Failure"
background = self.background_red
else:
self.logger.info("File: {} is empty: {}".format(stderr_log_txt,str(stderr_log_size)))
self.test_result = "Success"
background = self.background_green
else:
self.logger.info("TIMEOUT FAILURE, Check LANforge Radios")
self.test_result = "Time Out"
background = self.background_purple
self.html_results += """
<tr><td>""" + str(test) + """</td><td class='scriptdetails'>""" + str(command) + """</td>
<td style="""+ str(background) + """>""" + str(self.test_result) + """
<td><a href=""" + str(stdout_log_txt) + """ target=\"_blank\">STDOUT</a></td>"""
if self.test_result == "Failure":
self.html_results += """<td><a href=""" + str(stderr_log_txt) + """ target=\"_blank\">STDERR</a></td>"""
elif self.test_result == "Time Out":
self.html_results += """<td><a href=""" + str(stderr_log_txt) + """ target=\"_blank\">STDERR</a></td>"""
#self.html_results += """<td></td>"""
else:
self.html_results += """<td></td>"""
self.html_results += """</tr>"""
row = [test,command,self.test_result,stdout_log_txt,stderr_log_txt]
self.csv_results_writer.writerow(row)
self.csv_results_file.flush()
#self.logger.info("row: {}".format(row))
self.logger.info("test: {} executed".format(test))
else:
self.logger.info("enable value {} invalid for test: {}, test skipped".format(self.test_dict[test]['enabled'],test))
self.finish_html_results()
def main():
# arguments
parser = argparse.ArgumentParser(
prog='lf_check.py',
formatter_class=argparse.RawTextHelpFormatter,
epilog='''\
lf_check.py : for running scripts listed in lf_check_config.ini file
''',
description='''\
lf_check.py
-----------
Summary :
---------
for running scripts listed in lf_check_config.ini
''')
parser.add_argument('--outfile', help="--outfile <Output Generic Name> used as base name for all files generated", default="")
parser.add_argument('--logfile', help="--logfile <logfile Name> logging for output of lf_check.py script", default="lf_check.log")
args = parser.parse_args()
# output report.
report = lf_report(_results_dir_name="lf_check",
_output_html="lf_check.html",
_output_pdf="lf-check.pdf")
current_time = time.strftime("%Y-%m-%d-%H-%M-%S", time.localtime())
csv_results = "lf_check{}-{}.csv".format(args.outfile,current_time)
csv_results = report.file_add_path(csv_results)
outfile = "lf_check-{}-{}".format(args.outfile,current_time)
outfile_path = report.file_add_path(outfile)
# lf_check() class created
check = lf_check(_csv_results = csv_results,
_outfile = outfile_path)
# get the git sha
process = subprocess.Popen(["git", "rev-parse", "HEAD"], stdout=subprocess.PIPE)
(commit_hash, err) = process.communicate()
exit_code = process.wait()
git_sha = commit_hash.decode('utf-8','ignore')
# set up logging
logfile = args.logfile[:-4]
print("logfile: {}".format(logfile))
logfile = "{}-{}.log".format(logfile,current_time)
logfile = report.file_add_path(logfile)
print("logfile {}".format(logfile))
formatter = logging.Formatter(FORMAT)
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
file_handler = logging.FileHandler(logfile, "w")
file_handler.setFormatter(formatter)
logger.addHandler(file_handler)
logger.addHandler(logging.StreamHandler(sys.stdout)) # allows to logging to file and stdout
logger.info("commit_hash: {}".format(commit_hash))
logger.info("commit_hash2: {}".format(commit_hash.decode('utf-8','ignore')))
check.read_config_contents() # CMR need mode to just print out the test config and not run
check.run_script_test()
# Generate Ouptput reports
report.set_title("LF Check: lf_check.py")
report.build_banner()
report.start_content_div()
report.set_table_title("LF Check Test Results")
report.build_table_title()
report.set_text("git sha: {}".format(git_sha))
report.build_text()
html_results = check.get_html_results()
report.set_custom_html(html_results)
report.build_custom()
html_report = report.write_html_with_timestamp()
print("html report: {}".format(html_report))
report.write_pdf_with_timestamp()
report_path = os.path.dirname(html_report)
parent_report_dir = os.path.dirname(report_path)
# copy results to lastest so someone may see the latest.
lf_check_latest_html = parent_report_dir + "/lf_check_latest.html"
# duplicates html_report file up one directory
lf_check_html_report = parent_report_dir + "/{}.html".format(outfile)
#
banner_src_png = report_path + "/banner.png"
banner_dest_png = parent_report_dir + "/banner.png"
CandelaLogo_src_png = report_path + "/CandelaLogo2-90dpi-200x90-trans.png"
CandelaLogo_dest_png = parent_report_dir + "/CandelaLogo2-90dpi-200x90-trans.png"
report_src_css = report_path + "/report.css"
report_dest_css = parent_report_dir + "/report.css"
custom_src_css = report_path + "/custom.css"
custom_dest_css = parent_report_dir + "/custom.css"
font_src_woff = report_path + "/CenturyGothic.woff"
font_dest_woff = parent_report_dir + "/CenturyGothic.woff"
#pprint.pprint([
# ('banner_src', banner_src_png),
# ('banner_dest', banner_dest_png),
# ('CandelaLogo_src_png', CandelaLogo_src_png),
# ('CandelaLogo_dest_png', CandelaLogo_dest_png),
# ('report_src_css', report_src_css),
# ('custom_src_css', custom_src_css)
#])
# copy one directory above
shutil.copyfile(html_report, lf_check_latest_html)
shutil.copyfile(html_report, lf_check_html_report)
# copy banner and logo
shutil.copyfile(banner_src_png, banner_dest_png)
shutil.copyfile(CandelaLogo_src_png, CandelaLogo_dest_png)
shutil.copyfile(report_src_css, report_dest_css)
shutil.copyfile(custom_src_css, custom_dest_css)
shutil.copyfile(font_src_woff, font_dest_woff)
print("lf_check_latest.html: "+lf_check_latest_html)
print("lf_check_html_report: "+lf_check_html_report)
check.send_results_email(report_file=lf_check_html_report)
if __name__ == '__main__':
main()

View File

@@ -1,229 +0,0 @@
#!/usr/bin/env python3
"""
NAME: test_ipv4_connection.py
PURPOSE:
This scripts functionality has been replaced by test_ip_connection.py, consider this script deprecated
test_ipv4_connection.py will create stations and attempt to connect to an SSID. WPA, WPA2, WPA3, WEP, and Open connection types are supported
Script for creating a variable number of stations and attempting to connect them to an SSID.
A test will run to verify stations are associated and get an IP, if these conditions are both true, the test will
pass, otherwise, the test will fail.
EXAMPLE:
./test_ipv4_connection.py --upstream_port eth1 --radio wiphy0 --num_stations 3 --security open --ssid netgear --passwd BLANK --debug
Use './test_ipv4_connection.py' --help to see command line usage and options
Copyright 2021 Candela Technologies Inc
License: Free to distribute and modify. LANforge systems must be licensed.
"""
import sys
import os
import argparse
if sys.version_info[0] != 3:
print("This script requires Python 3")
exit(1)
if 'py-json' not in sys.path:
sys.path.append(os.path.join(os.path.abspath('../..'), 'py-json'))
import LANforge
from LANforge.lfcli_base import LFCliBase
from LANforge import LFUtils
import realm
import time
import pprint
class IPv4Test(LFCliBase):
def __init__(self,
_ssid=None,
_security=None,
_password=None,
_host=None,
_port=None,
_sta_list=None,
_number_template="00000",
_radio="wiphy0",
_proxy_str=None,
_debug_on=False,
_exit_on_error=False,
_exit_on_fail=False):
super().__init__(_host,
_port,
_proxy_str=_proxy_str,
_local_realm=realm.Realm(lfclient_host=_host,
lfclient_port=_port,
_exit_on_error=_exit_on_error,
_exit_on_fail=_exit_on_fail,
_proxy_str=_proxy_str,
debug_=_debug_on),
_debug=_debug_on,
_exit_on_fail=_exit_on_fail)
self.host = _host
self.port = _port
self.ssid = _ssid
self.security = _security
self.password = _password
self.sta_list = _sta_list
self.radio = _radio
self.timeout = 120
self.number_template = _number_template
self.debug = _debug_on
self.station_profile = self.local_realm.new_station_profile()
self.station_profile.lfclient_url = self.lfclient_url
self.station_profile.ssid = self.ssid
self.station_profile.ssid_pass = self.password,
self.station_profile.security = self.security
self.station_profile.number_template_ = self.number_template
self.station_profile.mode = 0
if self.debug:
print("----- Station List ----- ----- ----- ----- ----- ----- \n")
pprint.pprint(self.sta_list)
print("---- ~Station List ----- ----- ----- ----- ----- ----- \n")
def build(self):
# Build stations
self.station_profile.use_security(self.security, self.ssid, self.password)
self.station_profile.set_number_template(self.number_template)
print("Creating stations")
self.station_profile.set_command_flag("add_sta", "create_admin_down", 1)
self.station_profile.set_command_param("set_port", "report_timer", 1500)
self.station_profile.set_command_flag("set_port", "rpt_timer", 1)
self.station_profile.create(radio=self.radio, sta_names_=self.sta_list, debug=self.debug)
self._pass("PASS: Station build finished")
def start(self, sta_list, print_pass, print_fail):
self.station_profile.admin_up()
associated_map = {}
ip_map = {}
print("Starting test...")
for sec in range(self.timeout):
for sta_name in sta_list:
eidn = self.local_realm.name_to_eid(sta_name)
url = "/port/1/%s/%s" % (eidn[1], eidn[2])
sta_status = self.json_get(url + "?fields=port,alias,ip,ap", debug_=self.debug)
if self.debug:
print(sta_status)
try:
if (sta_status is None) or (sta_status['interface'] is None) or (sta_status['interface']['ap'] is None):
continue
except:
continue
if (len(sta_status['interface']['ap']) == 17) and (sta_status['interface']['ap'][-3] == ':'):
if self.debug:
print("Associated", sta_name, sta_status['interface']['ap'], sta_status['interface']['ip'])
associated_map[sta_name] = 1
if sta_status['interface']['ip'] != '0.0.0.0':
if self.debug:
print("IP", sta_name, sta_status['interface']['ap'], sta_status['interface']['ip'])
ip_map[sta_name] = 1
if (len(sta_list) == len(ip_map)) and (len(sta_list) == len(associated_map)):
break
else:
time.sleep(1)
if self.debug:
print("sta_list", len(sta_list), sta_list)
print("ip_map", len(ip_map), ip_map)
print("associated_map", len(associated_map), associated_map)
if (len(sta_list) == len(ip_map)) and (len(sta_list) == len(associated_map)):
self._pass("PASS: All stations associated with IP", print_pass)
else:
self._fail("FAIL: Not all stations able to associate/get IP", print_fail)
print("sta_list", sta_list)
print("ip_map", ip_map)
print("associated_map", associated_map)
return self.passes()
def stop(self):
# Bring stations down
self.station_profile.admin_down()
def cleanup(self, sta_list):
self.station_profile.cleanup(sta_list, debug_=self.debug)
LFUtils.wait_until_ports_disappear(base_url=self.lfclient_url,
port_list=sta_list,
debug=self.debug)
time.sleep(1)
def main():
parser = LFCliBase.create_basic_argparse(
prog='test_ipv4_connection.py',
formatter_class=argparse.RawTextHelpFormatter,
epilog='''\
Create stations that attempt to authenticate, associate, and receive IP addresses on the
chosen SSID
''',
description='''\
test_ipv4_connection.py
--------------------
Command example:
./test_ipv4_connection.py
--upstream_port eth1
--radio wiphy0
--num_stations 3
--security open
--ssid netgear
--passwd BLANK
--debug
''')
required = parser.add_argument_group('required arguments')
#required.add_argument('--security', help='WiFi Security protocol: < open | wep | wpa | wpa2 | wpa3 >', required=True)
args = parser.parse_args()
#if args.debug:
# pprint.pprint(args)
# time.sleep(5)
if (args.radio is None):
raise ValueError("--radio required")
num_sta = 2
if (args.num_stations is not None) and (int(args.num_stations) > 0):
num_stations_converted = int(args.num_stations)
num_sta = num_stations_converted
station_list = LFUtils.port_name_series(prefix="sta",
start_id=0,
end_id=num_sta-1,
padding_number=10000,
radio=args.radio)
if args.debug:
print("args.proxy: %s" % args.proxy)
ip_test = IPv4Test(_host=args.mgr,
_port=args.mgr_port,
_ssid=args.ssid,
_password=args.passwd,
_security=args.security,
_sta_list=station_list,
_radio=args.radio,
_proxy_str=args.proxy,
_debug_on=args.debug)
ip_test.cleanup(station_list)
ip_test.build()
if not ip_test.passes():
print(ip_test.get_fail_message())
ip_test.add_event(name="test_ipv4_connection.py", message=ip_test.get_fail_message())
ip_test.exit_fail()
ip_test.start(station_list, False, False)
ip_test.stop()
if not ip_test.passes():
print(ip_test.get_fail_message())
ip_test.add_event(name="test_ipv4_connection.py", message=ip_test.get_fail_message())
ip_test.exit_fail()
time.sleep(30)
ip_test.cleanup(station_list)
if ip_test.passes():
ip_test.add_event(name="test_ipv4_connection.py", message="Full test passed, all stations associated and got IP")
ip_test.exit_success()
if __name__ == "__main__":
main()

View File

@@ -1,248 +0,0 @@
#!/usr/bin/env python3
"""
NAME: test_ipv4_l4.py
PURPOSE:
test_ipv4_l4.py will create stations and endpoints to generate and verify layer-4 traffic
This script will monitor the bytes-rd attribute of the endpoints. If the the monitored value does not continually increase, this test will not pass.
EXAMPLE:
./test_ipv4_l4.py --upstream_port eth1 (optional) --radio wiphy0 (required) --num_stations 3 (optional)
--security {open|wep|wpa|wpa2|wpa3} (required) --ssid netgear (required)
--url "dl http://10.40.0.1 /dev/null" (required) --password admin123 (required)
--test_duration 2m (optional) --debug (optional)
Use './test_ipv4_l4.py --help' to see command line usage and options
Copyright 2021 Candela Technologies Inc
License: Free to distribute and modify. LANforge systems must be licensed.
"""
import sys
if sys.version_info[0] != 3:
print("This script requires Python 3")
exit(1)
if 'py-json' not in sys.path:
sys.path.append('../../py-json')
import argparse
from LANforge.lfcli_base import LFCliBase
from LANforge.LFUtils import *
from LANforge import LFUtils
import argparse
import realm
import time
import datetime
class IPV4L4(LFCliBase):
def __init__(self, host, port, ssid, security, password, url,
station_list,
number_template="00000", radio="wiphy0",
test_duration="5m", upstream_port="eth1",
_debug_on=False,
_exit_on_error=False,
_exit_on_fail=False):
super().__init__(host, port, _debug=_debug_on, _exit_on_fail=_exit_on_fail)
self.host = host
self.port = port
self.radio = radio
self.upstream_port = upstream_port
self.ssid = ssid
self.security = security
self.password = password
self.url = url
self.number_template = number_template
self.sta_list = station_list
self.test_duration = test_duration
self.local_realm = realm.Realm(lfclient_host=self.host, lfclient_port=self.port)
self.station_profile = self.local_realm.new_station_profile()
self.cx_profile = self.local_realm.new_l4_cx_profile()
self.station_profile.lfclient_url = self.lfclient_url
self.station_profile.ssid = self.ssid
self.station_profile.ssid_pass = self.password,
self.station_profile.security = self.security
self.station_profile.number_template_ = self.number_template
self.station_profile.mode = 0
self.cx_profile.url = self.url
def __compare_vals(self, old_list, new_list):
passes = 0
expected_passes = 0
if len(old_list) == len(new_list):
for item, value in old_list.items():
expected_passes += 1
if new_list[item] > old_list[item]:
passes += 1
# print(item, old_list[item], new_list[item], passes, expected_passes)
if passes == expected_passes:
return True
else:
return False
else:
return False
def __get_values(self):
time.sleep(1)
cx_list = self.json_get("layer4/list?fields=name,bytes-rd", debug_=self.debug)
# print("==============\n", cx_list, "\n==============")
cx_map = {}
for cx_name in cx_list['endpoint']:
if cx_name != 'uri' and cx_name != 'handler':
for item, value in cx_name.items():
for value_name, value_rx in value.items():
if item in self.cx_profile.created_cx.keys() and value_name == 'bytes-rd':
cx_map[item] = value_rx
return cx_map
def build(self):
# Build stations
self.station_profile.use_security(self.security, self.ssid, self.password)
self.station_profile.set_number_template(self.number_template)
print("Creating stations")
self.station_profile.set_command_flag("add_sta", "create_admin_down", 1)
self.station_profile.set_command_param("set_port", "report_timer", 1500)
self.station_profile.set_command_flag("set_port", "rpt_timer", 1)
self.station_profile.create(radio=self.radio, sta_names_=self.sta_list, debug=self.debug)
self._pass("PASS: Station build finished")
self.cx_profile.create(ports=self.station_profile.station_names, sleep_time=.5, debug_=self.debug,
suppress_related_commands_=None)
def start(self, print_pass=False, print_fail=False):
temp_stas = self.station_profile.station_names.copy()
self.station_profile.admin_up()
if self.local_realm.wait_for_ip(temp_stas):
self._pass("All stations got IPs", print_pass)
else:
self._fail("Stations failed to get IPs", print_fail)
exit(1)
cur_time = datetime.datetime.now()
old_rx_values = self.__get_values()
end_time = self.local_realm.parse_time(self.test_duration) + cur_time
self.cx_profile.start_cx()
passes = 0
expected_passes = 0
print("Starting Test...")
while cur_time < end_time:
interval_time = cur_time + datetime.timedelta(minutes=1)
while cur_time < interval_time:
cur_time = datetime.datetime.now()
time.sleep(1)
new_rx_values = self.__get_values()
# print(old_rx_values, new_rx_values)
# print("\n-----------------------------------")
# print(cur_time, end_time, cur_time + datetime.timedelta(minutes=1))
# print("-----------------------------------\n")
expected_passes += 1
if self.__compare_vals(old_rx_values, new_rx_values):
passes += 1
else:
self._fail("FAIL: Not all stations increased traffic", print_fail)
break
old_rx_values = new_rx_values
cur_time = datetime.datetime.now()
if passes == expected_passes:
self._pass("PASS: All tests passes", print_pass)
def stop(self):
self.cx_profile.stop_cx()
self.station_profile.admin_down()
def cleanup(self, sta_list):
self.cx_profile.cleanup()
self.station_profile.cleanup(sta_list)
LFUtils.wait_until_ports_disappear(base_url=self.lfclient_url, port_list=sta_list,
debug=self.debug)
def main():
lfjson_host = "localhost"
lfjson_port = 8080
parser = LFCliBase.create_basic_argparse(
prog='test_generic.py',
# formatter_class=argparse.RawDescriptionHelpFormatter,
formatter_class=argparse.RawTextHelpFormatter,
epilog='''\
Create layer-4 endpoints and test that the bytes-rd from the chosen URL are increasing over the
duration of the test
''',
description='''\
test_ipv4_l4.py
--------------------
Generic command layout:
python ./test_ipv4_l4.py --upstream_port <port> --radio <radio 0> <stations> <ssid> <ssid password> <security type: wpa2, open, wpa3> --debug
Command Line Example:
python3 ./test_ipv4_l4.py
--upstream_port eth1 (optional)
--radio wiphy0 (required)
--num_stations 3 (optional)
--security {open|wep|wpa|wpa2|wpa3} (required)
--ssid netgear (required)
--url "dl http://10.40.0.1 /dev/null" (required)
--password admin123 (required)
--test_duration 2m (optional)
--debug (optional)
''')
parser.add_argument('--test_duration', help='--test_duration sets the duration of the test', default="5m")
parser.add_argument('--url', help='--url specifies upload/download, address, and dest',
default="dl http://10.40.0.1 /dev/null")
args = parser.parse_args()
num_sta = 2
if (args.num_stations is not None) and (int(args.num_stations) > 0):
num_stations_converted = int(args.num_stations)
num_sta = num_stations_converted
station_list = LFUtils.portNameSeries(prefix_="sta",
start_id_=0,
end_id_=num_sta - 1,
padding_number_=10000,
radio=args.radio)
ip_test = IPV4L4(host=args.mgr, port=args.mgr_port,
ssid=args.ssid,
radio=args.radio,
password=args.passwd,
security=args.security,
station_list=station_list,
url=args.url,
test_duration=args.test_duration,
upstream_port=args.upstream_port,
_debug_on=args.debug)
ip_test.cleanup(station_list)
ip_test.build()
print('Stations built')
if not ip_test.passes():
print(ip_test.get_fail_message())
ip_test.exit_fail()
print('Starting Stations')
ip_test.start(False, False)
print('Stopping Stations')
ip_test.stop()
if not ip_test.passes():
print(ip_test.get_fail_message())
ip_test.exit_fail()
time.sleep(30)
ip_test.cleanup(station_list)
if ip_test.passes():
print("Full test passed, all endpoints had increased bytes-rd throughout test duration")
ip_test.exit_success()
if __name__ == "__main__":
main()

View File

@@ -1,240 +0,0 @@
#!/usr/bin/env python3
"""
NAME: test_ipv4_l4_ftp_upload.py
PURPOSE:
test_ipv4_l4_ftp_upload.py will create stations and endpoints to generate and verify layer-4 upload traffic over an ftp connection
This script will monitor the bytes-wr attribute of the endpoints. If the the monitored value does not continually increase, this test will not pass.
EXAMPLE:
./test_ipv4_l4_ftp_upload.py --upstream_port eth1 --radio wiphy0 --num_stations 3 --security {open|wep|wpa|wpa2|wpa3}
--ssid netgear --passwd admin123 --test_duration 2m --url "ul ftp://10.40.0.1 /dev/null" --requests_per_ten 600
--debug
Use './test_ipv4_l4_ftp_upload.py --help' to see command line usage and options
Copyright 2021 Candela Technologies Inc
License: Free to distribute and modify. LANforge systems must be licensed.
"""
import sys
if sys.version_info[0] != 3:
print("This script requires Python 3")
exit(1)
if 'py-json' not in sys.path:
sys.path.append('../../py-json')
import argparse
from LANforge.lfcli_base import LFCliBase
from LANforge.LFUtils import *
from LANforge import LFUtils
import realm
import time
import datetime
class IPV4L4(LFCliBase):
def __init__(self, host, port, ssid, security, password, url, requests_per_ten, station_list, number_template="00000",
upstream_port="eth1", radio="wiphy0",
test_duration="5m",
_debug_on=False,
_exit_on_error=False,
_exit_on_fail=False):
super().__init__(host, port, _debug=_debug_on, _exit_on_fail=_exit_on_fail)
self.host = host
self.port = port
self.ssid = ssid
self.radio = radio
self.upstream_port = upstream_port
self.security = security
self.password = password
self.url = url
self.requests_per_ten = requests_per_ten
self.number_template = number_template
self.sta_list = station_list
self.test_duration = test_duration
self.local_realm = realm.Realm(lfclient_host=self.host, lfclient_port=self.port)
self.station_profile = self.local_realm.new_station_profile()
self.cx_profile = self.local_realm.new_l4_cx_profile()
self.station_profile.lfclient_url = self.lfclient_url
print("##### station_profile.lfclient_url: {}".format(self.station_profile.lfclient_url))
self.station_profile.ssid = self.ssid
self.station_profile.ssid_pass = self.password,
self.station_profile.security = self.security
self.station_profile.number_template_ = self.number_template
self.station_profile.mode = 0
self.cx_profile.url = self.url
self.cx_profile.requests_per_ten = self.requests_per_ten
self.port_util = realm.PortUtils(self.local_realm)
def __compare_vals(self, old_list, new_list):
passes = 0
expected_passes = 0
if len(old_list) == len(new_list):
for item, value in old_list.items():
expected_passes += 1
if new_list[item] > old_list[item]:
passes += 1
# print(item, old_list[item], new_list[item], passes, expected_passes)
if passes == expected_passes:
return True
else:
return False
else:
return False
def __get_values(self):
time.sleep(1)
cx_list = self.json_get("layer4/list?fields=name,bytes-rd", debug_=self.debug)
# print("==============\n", cx_list, "\n==============")
cx_map = {}
for cx_name in cx_list['endpoint']:
if cx_name != 'uri' and cx_name != 'handler':
for item, value in cx_name.items():
for value_name, value_rx in value.items():
if item in self.cx_profile.created_cx.keys() and value_name == 'bytes-wr':
cx_map[item] = value_rx
return cx_map
def build(self):
# Build stations
self.station_profile.use_security(self.security, self.ssid, self.password)
self.station_profile.set_number_template(self.number_template)
print("Creating stations")
self.station_profile.set_command_flag("add_sta", "create_admin_down", 1)
self.station_profile.set_command_param("set_port", "report_timer", 1500)
self.station_profile.set_command_flag("set_port", "rpt_timer", 1)
self.station_profile.create(radio=self.radio, sta_names_=self.sta_list, debug=self.debug)
self._pass("PASS: Station build finished")
self.cx_profile.create(ports=self.station_profile.station_names, sleep_time=.5, debug_=self.debug, suppress_related_commands_=True)
def start(self, print_pass=False, print_fail=False):
self.port_util.set_ftp(port_name=self.local_realm.name_to_eid(self.upstream_port)[2], resource=1, on=True)
temp_stas = self.sta_list.copy()
self.station_profile.admin_up()
# temp_stas.append(self.local_realm.name_to_eid(self.upstream_port)[2])
if self.local_realm.wait_for_ip(temp_stas):
self._pass("All stations got IPs", print_pass)
else:
self._fail("Stations failed to get IPs", print_fail)
exit(1)
cur_time = datetime.datetime.now()
old_rx_values = self.__get_values()
end_time = self.local_realm.parse_time(self.test_duration) + cur_time
self.cx_profile.start_cx()
passes = 0
expected_passes = 0
print("Starting Test...")
while cur_time < end_time:
interval_time = cur_time + datetime.timedelta(minutes=1)
while cur_time < interval_time:
cur_time = datetime.datetime.now()
time.sleep(1)
new_rx_values = self.__get_values()
# print(old_rx_values, new_rx_values)
# print("\n-----------------------------------")
# print(cur_time, end_time, cur_time + datetime.timedelta(minutes=1))
# print("-----------------------------------\n")
expected_passes += 1
if self.__compare_vals(old_rx_values, new_rx_values):
passes += 1
else:
self._fail("FAIL: Not all stations increased traffic", print_fail)
break
old_rx_values = new_rx_values
cur_time = datetime.datetime.now()
if passes == expected_passes:
self._pass("PASS: All tests passes", print_pass)
def stop(self):
self.port_util.set_ftp(port_name=self.local_realm.name_to_eid(self.upstream_port)[2], resource=1, on=False)
self.cx_profile.stop_cx()
self.station_profile.admin_down()
def cleanup(self, sta_list):
self.cx_profile.cleanup()
self.station_profile.cleanup(sta_list)
LFUtils.wait_until_ports_disappear(base_url=self.lfclient_url, port_list=sta_list,
debug=self.debug)
def main():
lfjson_port = 8080
parser = LFCliBase.create_basic_argparse(
prog='test_ipv4_l4_ftp',
# formatter_class=argparse.RawDescriptionHelpFormatter,
formatter_class=argparse.RawTextHelpFormatter,
epilog='''\
Create layer-4 endpoints to an ftp server and test that the bytes-wr are increasing over the duration
of the test
''',
description='''\
test_ipv4_l4_ftp_upload.py
--------------------
Generic command example:
./test_ipv4_l4_ftp_upload.py --upstream_port eth1 \\
--radio wiphy0 \\
--num_stations 3 \\
--security {open|wep|wpa|wpa2|wpa3} \\
--ssid netgear \\
--passwd admin123 \\
--test_duration 2m \\
--url "ul ftp://10.40.0.1 /dev/null" \\
--requests_per_ten 600 \\
--debug
''')
parser.add_argument('--test_duration', help='--test_duration sets the duration of the test', default="5m")
parser.add_argument('--requests_per_ten', help='--requests_per_ten number of request per ten minutes', default=600)
parser.add_argument('--url', help='--url specifies upload/download, address, and dest',
default="ul ftp://10.40.0.1 /dev/null")
args = parser.parse_args()
num_sta = 2
if (args.num_stations is not None) and (int(args.num_stations) > 0):
num_stations_converted = int(args.num_stations)
num_sta = num_stations_converted
station_list = LFUtils.portNameSeries(prefix_="sta", start_id_=0, end_id_=num_sta-1, padding_number_=10000,
radio=args.radio)
ip_test = IPV4L4(host=args.mgr, port=args.mgr_port,
ssid=args.ssid,
password=args.passwd,
security=args.security,
station_list=station_list,
url=args.url,
test_duration=args.test_duration,
requests_per_ten=args.requests_per_ten,
_debug_on=args.debug,
upstream_port=args.upstream_port)
ip_test.cleanup(station_list)
ip_test.build()
if not ip_test.passes():
print(ip_test.get_fail_message())
exit(1)
ip_test.start(False, False)
ip_test.stop()
if not ip_test.passes():
print(ip_test.get_fail_message())
exit(1)
time.sleep(30)
ip_test.cleanup(station_list)
if ip_test.passes():
print("Full test passed, all endpoints had increased bytes-rd throughout test duration")
if __name__ == "__main__":
main()

View File

@@ -1,251 +0,0 @@
#!/usr/bin/env python3
"""
NAME: test_ipv4_l4_ftp_urls_per_ten.py
PURPOSE:
test_ipv4_l4_ftp_urls_per_ten.py will create stations and endpoints to generate and verify layer-4 traffic over an ftp connection
This script will monitor the urls/s attribute of the endpoints. If the the monitored value does not continually increase, this test will not pass.
EXAMPLE:
./test_ipv4_l4_ftp_urls_per_ten.py --upstream_port eth1 --radio wiphy0 --num_stations 3 --security {open|wep|wpa|wpa2|wpa3}
--ssid netgear --passwd admin123 --test_duration 2m --interval 1s --mode 1 --ap "00:0e:8e:78:e1:76" --requests_per_ten 600
--num_tests 1 --url "ul ftp://lanforge:lanforge@10.40.0.1/example.txt /home/lanforge/example.txt" --debug
Use './test_ipv4_l4_ftp_urls_per_ten.py --help' to see command line usage and options
Copyright 2021 Candela Technologies Inc
License: Free to distribute and modify. LANforge systems must be licensed.
"""
import sys
import os
if sys.version_info[0] != 3:
print("This script requires Python 3")
exit(1)
if 'py-json' not in sys.path:
sys.path.append(os.path.join(os.path.abspath('../..'), 'py-json'))
import argparse
from LANforge.lfcli_base import LFCliBase
from LANforge.LFUtils import *
from LANforge import LFUtils
import realm
import time
import datetime
class IPV4L4(LFCliBase):
def __init__(self, ssid, security, password, url, requests_per_ten, station_list,test_duration="2m",host="localhost", port=8080,
number_template="00000", num_tests=1, radio="wiphy0", mode=0, ap=None,
_debug_on=False, upstream_port="eth1",
_exit_on_error=False,
_exit_on_fail=False):
super().__init__(host,
port,
_debug=_debug_on,
_local_realm = realm.Realm(lfclient_host=host, lfclient_port=port),
_exit_on_fail=_exit_on_fail)
self.host = host
self.port = port
self.radio = radio
self.upstream_port = upstream_port
self.ssid = ssid
self.security = security
self.password = password
self.mode = mode
self.ap=ap
self.url = url
self.requests_per_ten = requests_per_ten
self.number_template = number_template
self.sta_list = station_list
self.num_tests = num_tests
self.target_requests_per_ten = requests_per_ten
self.test_duration = test_duration
self.station_profile = self.local_realm.new_station_profile()
self.cx_profile = self.local_realm.new_l4_cx_profile()
self.station_profile.lfclient_url = self.lfclient_url
self.station_profile.ssid = self.ssid
self.station_profile.ssid_pass = self.password,
self.station_profile.security = self.security
self.station_profile.number_template_ = self.number_template
self.station_profile.mode = self.mode
if self.ap is not None:
self.station_profile.set_command_param("add_sta", "ap",self.ap)
self.cx_profile.url = self.url
print(self.cx_profile.url)
self.cx_profile.requests_per_ten = self.requests_per_ten
self.port_util = realm.PortUtils(self.local_realm)
def __check_request_rate(self):
endp_list = self.json_get("layer4/list?fields=urls/s")
expected_passes = 0
passes = 0
if endp_list is not None and endp_list['endpoint'] is not None:
endp_list = endp_list['endpoint']
for item in endp_list:
for name, info in item.items():
if name in self.cx_profile.created_cx.keys():
expected_passes += 1
if info['urls/s'] * self.requests_per_ten >= self.target_requests_per_ten * .9:
# print(name, info['urls/s'], info['urls/s'] * self.requests_per_ten, self.target_requests_per_ten * .9)
passes += 1
return passes == expected_passes
def build(self):
# Build stations
self.station_profile.use_security(self.security, self.ssid, self.password)
print("Creating stations")
self.station_profile.set_command_flag("add_sta", "create_admin_down", 1)
self.station_profile.set_command_param("set_port", "report_timer", 1500)
self.station_profile.set_command_flag("set_port", "rpt_timer", 1)
self.station_profile.create(radio=self.radio, sta_names_=self.sta_list, debug=self.debug)
self._pass("PASS: Station build finished")
self.cx_profile.create(ports=self.station_profile.station_names, sleep_time=.5, debug_=self.debug, suppress_related_commands_=None)
def start(self, print_pass=False, print_fail=False):
self.port_util.set_ftp(port_name=self.local_realm.name_to_eid(self.upstream_port)[2], resource=1, on=True)
temp_stas = self.sta_list.copy()
self.station_profile.admin_up()
if self.local_realm.wait_for_ip(temp_stas):
self._pass("All stations got IPs", print_pass)
else:
self._fail("Stations failed to get IPs", print_fail)
exit(1)
self.cx_profile.start_cx()
print("Starting test")
curr_time = datetime.datetime.now()
end_time = self.local_realm.parse_time(self.test_duration) + curr_time
sleep_interval = self.local_realm.parse_time(self.test_duration) // 5
passes = 0
expected_passes = 0
for test in range(self.num_tests):
expected_passes += 1
while curr_time < end_time:
time.sleep(sleep_interval.total_seconds())
curr_time = datetime.datetime.now()
if self.cx_profile.check_errors(self.debug):
if self.__check_request_rate():
passes += 1
else:
self._fail("FAIL: Request rate did not exceed 90% target rate", print_fail)
break
else:
self._fail("FAIL: Errors found getting to %s " % self.url, print_fail)
break
if passes == expected_passes:
self._pass("PASS: All tests passes", print_pass)
def stop(self):
self.cx_profile.stop_cx()
self.port_util.set_ftp(port_name=self.local_realm.name_to_eid(self.upstream_port)[2], resource=1, on=False)
self.station_profile.admin_down()
def cleanup(self, sta_list):
self.cx_profile.cleanup()
self.station_profile.cleanup(sta_list)
LFUtils.wait_until_ports_disappear(base_url=self.lfclient_url, port_list=sta_list,
debug=self.debug)
def main():
lfjson_port = 8080
parser = LFCliBase.create_basic_argparse(
prog='test_ipv4_l4_urls_per_ten',
formatter_class=argparse.RawTextHelpFormatter,
epilog='''\
Create layer-4 endpoints to connect to an ftp server and test that urls/s are meeting or exceeding the target rate
''',
description='''\
test_ipv4_l4_ftp_urls_per_ten.py
--------------------
Generic command example:
python3 ./test_ipv4_l4_ftp_urls_per_ten.py --upstream_port eth1 \\
--radio wiphy0 \\
--num_stations 3 \\
--security {open|wep|wpa|wpa2|wpa3} \\
--ssid netgear \\
--passwd admin123 \\
--test_duration 2m \\ {2m | 30s | 3h | 1d ...etc}
--interval 1s \\
--mode 1
{"auto" : "0",
"a" : "1",
"b" : "2",
"g" : "3",
"abg" : "4",
"abgn" : "5",
"bgn" : "6",
"bg" : "7",
"abgnAC" : "8",
"anAC" : "9",
"an" : "10",
"bgnAC" : "11",
"abgnAX" : "12",
"bgnAX" : "13",
--ap "00:0e:8e:78:e1:76"
--requests_per_ten 600 \\
--num_tests 1 \\
--url "ul ftp://lanforge:lanforge@10.40.0.1/example.txt /home/lanforge/example.txt"
--debug
''')
optional = parser.add_argument_group('optional arguments')
required = parser.add_argument_group('required arguments')
required.add_argument('--security', help='WiFi Security protocol: < open | wep | wpa | wpa2 | wpa3 >', required=True)
parser.add_argument('--requests_per_ten', help='--requests_per_ten number of request per ten minutes', default=600)
parser.add_argument('--test_duration', help='--test duration of a single test', default=600)
parser.add_argument('--num_tests', help='--num_tests number of tests to run. Each test runs 10 minutes', default=1)
parser.add_argument('--url', help='--url specifies upload/download, address, and dest',
default="dl http://10.40.0.1 /dev/null")
optional.add_argument('--mode',help='Used to force mode of stations')
optional.add_argument('--ap',help='Used to force a connection to a particular AP')
#parser.add_argument('--target_per_ten', help='--target_per_ten target number of request per ten minutes. test will check for 90 percent of this value',
#default=600)
args = parser.parse_args()
num_sta = 2
if (args.num_stations is not None) and (int(args.num_stations) > 0):
num_stations_converted = int(args.num_stations)
num_sta = num_stations_converted
station_list = LFUtils.portNameSeries(prefix_="sta", start_id_=0, end_id_=num_sta-1, padding_number_=10000,
radio=args.radio)
ip_test = IPV4L4(host=args.mgr, port=args.mgr_port,
ssid=args.ssid,
password=args.passwd,
upstream_port=args.upstream_port,
radio= args.radio,
security=args.security,
station_list=station_list,
url=args.url,
mode=args.mode,
ap=args.ap,
num_tests=args.num_tests,
requests_per_ten=args.requests_per_ten,
test_duration=args.test_duration,
_debug_on=args.debug)
ip_test.cleanup(station_list)
ip_test.build()
ip_test.start()
ip_test.stop()
if not ip_test.passes():
print(ip_test.get_fail_message())
exit(1)
time.sleep(30)
ip_test.cleanup(station_list)
if ip_test.passes():
print("Full test passed, all endpoints met or exceeded 90 percent of the target rate")
if __name__ == "__main__":
main()

View File

@@ -1,254 +0,0 @@
#!/usr/bin/env python3
"""
NAME: test_ipv4_l4_ftp_wifi.py
PURPOSE:
test_ipv4_l4_ftp_wifi.py will create stations and endpoints to generate and verify layer-4 traffic over an ftp connection
This script will monitor the bytes-wr attribute of the endpoints. If the the monitored value does not continually increase, this test will not pass.
EXAMPLE:
./test_ipv4_l4_ftp_wifi.py --upstream_port eth1 --radio wiphy0 --num_stations 3 --security {open|wep|wpa|wpa2|wpa3}
--ssid netgear --passwd admin123 --dest 10.40.0.1 --test_duration 2m --interval 1s --requests_per_ten 600
--dest /var/www/html/data_slug_4K.bin --source /tmp/data_slug_4K.bin --ftp_user lanforge --ftp_passwd lanforge
--debug
Use './test_ipv4_l4_ftp_wifi.py --help' to see command line usage and options
Copyright 2021 Candela Technologies Inc
License: Free to distribute and modify. LANforge systems must be licensed.
"""
import sys
if sys.version_info[0] != 3:
print("This script requires Python 3")
exit(1)
if 'py-json' not in sys.path:
sys.path.append('../../py-json')
import argparse
from LANforge.lfcli_base import LFCliBase
from LANforge.LFUtils import *
from LANforge import LFUtils
import realm
import time
import datetime
class IPV4L4(LFCliBase):
def __init__(self, host, port, ssid, security, password, requests_per_ten, station_list, number_template="00000",
upstream_port="eth1", radio="wiphy0", ftp_user="localhost", ftp_passwd="localhost",
source="", dest="",
test_duration="5m",
_debug_on=False,
_exit_on_error=False,
_exit_on_fail=False):
super().__init__(host, port, _debug=_debug_on, _exit_on_fail=_exit_on_fail)
self.host = host
self.port = port
self.ssid = ssid
self.radio = radio
self.upstream_port = upstream_port
self.security = security
self.password = password
self.requests_per_ten = requests_per_ten
self.number_template = number_template
self.sta_list = station_list
self.test_duration = test_duration
self.local_realm = realm.Realm(lfclient_host=self.host, lfclient_port=self.port)
self.station_profile = self.local_realm.new_station_profile()
self.cx_profile = self.local_realm.new_http_profile()
self.station_profile.lfclient_url = self.lfclient_url
self.ftp_user = ftp_user
self.ftp_passwd = ftp_passwd
self.source = source
self.dest = dest
self.station_profile.ssid = self.ssid
self.station_profile.ssid_pass = self.password,
self.station_profile.security = self.security
self.station_profile.number_template_ = self.number_template
self.station_profile.mode = 0
self.cx_profile.requests_per_ten = self.requests_per_ten
self.port_util = realm.PortUtils(self.local_realm)
def __compare_vals(self, old_list, new_list):
passes = 0
expected_passes = 0
if len(old_list) == len(new_list):
for item, value in old_list.items():
expected_passes += 1
if new_list[item] > old_list[item]:
passes += 1
# print(item, old_list[item], new_list[item], passes, expected_passes)
if passes == expected_passes:
return True
else:
return False
else:
return False
def __get_values(self):
time.sleep(1)
cx_list = self.json_get("layer4/list?fields=name,bytes-wr", debug_=self.debug)
# print("==============\n", cx_list, "\n==============")
cx_map = {}
for cx_name in cx_list['endpoint']:
if cx_name != 'uri' and cx_name != 'handler':
for item, value in cx_name.items():
for value_name, value_rx in value.items():
if item in self.cx_profile.created_cx.keys() and value_name == 'bytes-wr':
cx_map[item] = value_rx
return cx_map
def build(self):
# Build stations
self.station_profile.use_security(self.security, self.ssid, self.password)
self.station_profile.set_number_template(self.number_template)
print("Creating stations")
self.station_profile.set_command_flag("add_sta", "create_admin_down", 1)
self.station_profile.set_command_param("set_port", "report_timer", 1500)
self.station_profile.set_command_flag("set_port", "rpt_timer", 1)
self.station_profile.create(radio=self.radio, sta_names_=self.sta_list, debug=self.debug)
self._pass("PASS: Station build finished")
self.station_profile.admin_up()
if self.local_realm.wait_for_ip(self.sta_list):
self._pass("All stations got IPs")
else:
self._fail("Stations failed to get IPs")
exit(1)
self.cx_profile.direction = "ul"
self.cx_profile.dest = self.dest
self.cx_profile.create(ports=self.station_profile.station_names, sleep_time=.5, debug_=self.debug,
suppress_related_commands_=True, ftp=True, user=self.ftp_user, passwd=self.ftp_passwd,
source=self.source)
def start(self, print_pass=False, print_fail=False):
print("Starting Test...")
self.cx_profile.start_cx()
cur_time = datetime.datetime.now()
old_rx_values = self.__get_values()
end_time = self.local_realm.parse_time(self.test_duration) + cur_time
passes = 0
expected_passes = 0
while cur_time < end_time:
interval_time = cur_time + datetime.timedelta(minutes=1)
while cur_time < interval_time:
cur_time = datetime.datetime.now()
time.sleep(1)
new_rx_values = self.__get_values()
# print(old_rx_values, new_rx_values)
# print("\n-----------------------------------")
# print(cur_time, end_time, cur_time + datetime.timedelta(minutes=1))
# print("-----------------------------------\n")
expected_passes += 1
if self.__compare_vals(old_rx_values, new_rx_values):
passes += 1
else:
self._fail("FAIL: Not all stations increased traffic", print_fail)
break
old_rx_values = new_rx_values
cur_time = datetime.datetime.now()
if passes == expected_passes:
self._pass("PASS: All tests passes", print_pass)
def stop(self):
self.cx_profile.stop_cx()
self.station_profile.admin_down()
def cleanup(self, sta_list):
self.cx_profile.cleanup()
self.station_profile.cleanup(sta_list)
LFUtils.wait_until_ports_disappear(base_url=self.lfclient_url, port_list=sta_list,
debug=self.debug)
def main():
lfjson_port = 8080
parser = LFCliBase.create_basic_argparse(
prog='test_ipv4_l4_ftp_wifi.py',
# formatter_class=argparse.RawDescriptionHelpFormatter,
formatter_class=argparse.RawTextHelpFormatter,
epilog='''\
Create layer-4 endpoints and test that the bytes-wr from the chosen URL are increasing over the
duration of the test
''',
description='''\
test_ipv4_l4_ftp_wifi.py
--------------------
Generic command example:
python3 ./test_ipv4_l4_ftp_wifi.py --upstream_port eth1 \\
--radio wiphy0 \\
--num_stations 3 \\
--security {open|wep|wpa|wpa2|wpa3} \\
--ssid netgear \\
--passwd admin123 \\
--dest 10.40.0.1 \\
--test_duration 2m \\
--interval 1s \\
--requests_per_ten 600 \\
--dest /var/www/html/data_slug_4K.bin \\
--source /tmp/data_slug_4K.bin \\
--ftp_user lanforge \\
--ftp_passwd lanforge \\
--debug
''')
parser.add_argument('--test_duration', help='--test_duration sets the duration of the test', default="5m")
parser.add_argument('--requests_per_ten', help='--requests_per_ten number of request per ten minutes', default=600)
parser.add_argument('--dest', help='--dest specifies the destination for the file', default="/var/www/html/data_slug_4K.bin")
parser.add_argument('--source', help='--source specifies the source of the file',
default="/tmp/data_slug_4K.bin")
parser.add_argument('--ftp_user', help='--ftp_user sets the username to be used for ftp', default="lanforge")
parser.add_argument('--ftp_passwd', help='--ftp_user sets the password to be used for ftp', default="lanforge")
args = parser.parse_args()
num_sta = 2
if (args.num_stations is not None) and (int(args.num_stations) > 0):
num_stations_converted = int(args.num_stations)
num_sta = num_stations_converted
station_list = LFUtils.portNameSeries(prefix_="sta", start_id_=0, end_id_=num_sta-1, padding_number_=10000,
radio=args.radio)
ip_test = IPV4L4(host=args.mgr, port=args.mgr_port,
ssid=args.ssid,
password=args.passwd,
security=args.security,
station_list=station_list,
test_duration=args.test_duration,
requests_per_ten=args.requests_per_ten,
_debug_on=args.debug,
upstream_port=args.upstream_port,
ftp_user=args.ftp_user,
ftp_passwd=args.ftp_passwd,
dest=args.dest,
source=args.source)
ip_test.cleanup(station_list)
ip_test.build()
if not ip_test.passes():
print(ip_test.get_fail_message())
exit(1)
ip_test.start(False, False)
ip_test.stop()
if not ip_test.passes():
print(ip_test.get_fail_message())
exit(1)
time.sleep(30)
ip_test.cleanup(station_list)
if ip_test.passes():
print("Full test passed, all endpoints had increased bytes-wr throughout test duration")
if __name__ == "__main__":
main()

View File

@@ -1,299 +0,0 @@
#!/usr/bin/env python3
"""
NAME: test_ipv4_l4_urls_per_ten.py
PURPOSE:
test_ipv4_l4_urls_per_ten.py will create stations and endpoints to generate and verify layer-4 traffic
This script will monitor the urls/s attribute of the endpoints. If the the monitored value does not continually increase, this test will not pass.
EXAMPLE:
./test_ipv4_l4_urls_per_ten.py --upstream_port eth1 --radio wiphy0 --num_stations 3 --security {open|wep|wpa|wpa2|wpa3}
--ssid netgear --passwd admin123 --requests_per_ten 600 --mode 1 --num_tests 1 --url "dl http://10.40.0.1 /dev/null"
--ap "00:0e:8e:78:e1:76" --target_per_ten 600 --output_format csv --report_file ~/Documents/results.csv --test_duration 2m
--debug
Use './test_ipv4_l4_urls_per_ten.py --help' to see command line usage and options
Copyright 2021 Candela Technologies Inc
License: Free to distribute and modify. LANforge systems must be licensed.
"""
import sys
import os
if sys.version_info[0] != 3:
print("This script requires Python 3")
exit(1)
if 'py-json' not in sys.path:
sys.path.append(os.path.join(os.path.abspath('../..'), 'py-json'))
import argparse
from LANforge.lfcli_base import LFCliBase
from LANforge import LFUtils
import realm
import time
import datetime
from realm import TestGroupProfile
class IPV4L4(LFCliBase):
def __init__(self,
host="localhost",
port=8080,
ssid=None,
security=None,
password=None,
url=None,
requests_per_ten=None,
station_list=None,
test_duration="2m",
ap=None,
mode=0,
target_requests_per_ten=60,
number_template="00000",
num_tests=1,
radio="wiphy0",
_debug_on=False,
upstream_port="eth1",
_exit_on_error=False,
_exit_on_fail=False):
super().__init__(host, port, _debug=_debug_on, _exit_on_fail=_exit_on_fail)
self.host = host
self.port = port
self.radio = radio
self.upstream_port = upstream_port
self.ssid = ssid
self.security = security
self.password = password
self.url = url
self.mode=mode
self.ap=ap
self.debug=_debug_on
self.requests_per_ten = int(requests_per_ten)
self.number_template = number_template
self.test_duration=test_duration
self.sta_list = station_list
self.num_tests = int(num_tests)
self.target_requests_per_ten = int(target_requests_per_ten)
self.local_realm = realm.Realm(lfclient_host=self.host, lfclient_port=self.port)
self.l4cxprofile=realm.L4CXProfile(lfclient_host=host,
lfclient_port=port,local_realm=self.local_realm)
self.station_profile = self.local_realm.new_station_profile()
self.cx_profile = self.local_realm.new_l4_cx_profile()
self.station_profile.lfclient_url = self.lfclient_url
self.station_profile.ssid = self.ssid
self.station_profile.ssid_pass = self.password,
self.station_profile.security = self.security
self.station_profile.number_template_ = self.number_template
self.station_profile.mode = self.mode
if self.ap is not None:
self.station_profile.set_command_param("add_sta", "ap",self.ap)
self.cx_profile.url = self.url
self.cx_profile.requests_per_ten = self.requests_per_ten
def build(self):
# Build stations
self.station_profile.use_security(self.security, self.ssid, self.password)
print("Creating stations")
self.station_profile.set_command_flag("add_sta", "create_admin_down", 1)
self.station_profile.set_command_param("set_port", "report_timer", 1500)
self.station_profile.set_command_flag("set_port", "rpt_timer", 1)
self.station_profile.create(radio=self.radio, sta_names_=self.sta_list, debug=self.debug)
self._pass("PASS: Station build finished")
self.cx_profile.create(ports=self.station_profile.station_names, sleep_time=.5, debug_=self.debug, suppress_related_commands_=None)
def start(self, print_pass=False, print_fail=False):
temp_stas = self.sta_list.copy()
# temp_stas.append(self.local_realm.name_to_eid(self.upstream_port)[2])
self.station_profile.admin_up()
if self.local_realm.wait_for_ip(temp_stas):
self._pass("All stations got IPs", print_pass)
else:
self._fail("Stations failed to get IPs", print_fail)
exit(1)
self.cx_profile.start_cx()
print("Starting test...")
def stop(self):
self.cx_profile.stop_cx()
self.station_profile.admin_down()
def cleanup(self, sta_list):
self.cx_profile.cleanup()
self.station_profile.cleanup(sta_list)
LFUtils.wait_until_ports_disappear(base_url=self.lfclient_url, port_list=sta_list,
debug=self.debug)
def main():
parser = LFCliBase.create_basic_argparse(
prog='test_ipv4_l4_urls_per_ten',
formatter_class=argparse.RawTextHelpFormatter,
epilog='''\
Create layer-4 endpoints to connect to a url and test that urls/s are meeting or exceeding the target rate
''',
description='''\
test_ipv4_l4_urls_per_ten.py:
--------------------
Generic command example:
python3 ./test_ipv4_l4_urls_per_ten.py
--upstream_port eth1 \\
--radio wiphy0 \\
--num_stations 3 \\
--security {open|wep|wpa|wpa2|wpa3} \\
--ssid netgear \\
--passwd admin123 \\
--requests_per_ten 600 \\
--mode 1
{"auto" : "0",
"a" : "1",
"b" : "2",
"g" : "3",
"abg" : "4",
"abgn" : "5",
"bgn" : "6",
"bg" : "7",
"abgnAC" : "8",
"anAC" : "9",
"an" : "10",
"bgnAC" : "11",
"abgnAX" : "12",
"bgnAX" : "13"} \\
--num_tests 1 \\
--url "dl http://10.40.0.1 /dev/null" \\
--ap "00:0e:8e:78:e1:76"
--target_per_ten 600 \\
--output_format csv \\
--report_file ~/Documents/results.csv \\
--test_duration 2m \\
--debug
''')
required = None
for agroup in parser._action_groups:
if agroup.title == "required arguments":
required = agroup
#if required is not None:
optional = None
for agroup in parser._action_groups:
if agroup.title == "optional arguments":
optional = agroup
if optional is not None:
optional.add_argument('--requests_per_ten', help='--requests_per_ten number of request per ten minutes', default=600)
optional.add_argument('--num_tests', help='--num_tests number of tests to run. Each test runs 10 minutes', default=1)
optional.add_argument('--url', help='--url specifies upload/download, address, and dest',default="dl http://10.40.0.1 /dev/null")
optional.add_argument('--test_duration', help='duration of test',default="2m")
optional.add_argument('--target_per_ten', help='--target_per_ten target number of request per ten minutes. test will check for 90 percent this value',default=600)
optional.add_argument('--mode',help='Used to force mode of stations')
optional.add_argument('--ap',help='Used to force a connection to a particular AP')
optional.add_argument('--report_file',help='where you want to store results')
optional.add_argument('--output_format', help='choose csv or xlsx') #update once other forms are completed
args = parser.parse_args()
num_sta = 2
if (args.num_stations is not None) and (int(args.num_stations) > 0):
num_stations_converted = int(args.num_stations)
num_sta = num_stations_converted
if args.report_file is None:
if args.output_format in ['csv','json','html','hdf','stata','pickle','pdf','parquet','png','df','xlsx']:
output_form=args.output_format.lower()
print("Defaulting file output placement to /home/lanforge.")
rpt_file='/home/data.' + output_form
else:
print("Defaulting data file output type to Excel")
rpt_file='/home/lanforge/data.xlsx'
output_form='xlsx'
else:
rpt_file=args.report_file
if args.output_format is None:
output_form=str(args.report_file).split('.')[-1]
else:
output_form=args.output_format
#Create directory
if args.report_file is None:
try:
homedir = str(datetime.datetime.now().strftime("%Y-%m-%d-%H-%M")).replace(':','-')+'test_ipv4_l4_urls_per_ten'
path = os.path.join('/home/lanforge/report-data/',homedir)
os.mkdir(path)
except:
path = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
print('Saving file to local directory')
else:
pass
if args.report_file is None:
if args.output_format in ['csv','json','html','hdf','stata','pickle','pdf','png','df','parquet','xlsx']:
rpt_file=path+'/data.' + args.output_format
output=args.output_format
else:
print('Defaulting data file output type to Excel')
rpt_file=path+'/data.xlsx'
output='xlsx'
else:
rpt_file=args.report_file
if args.output_format is None:
output=str(args.report_file).split('.')[-1]
else:
output=args.output_format
station_list = LFUtils.portNameSeries(prefix_="sta", start_id_=0, end_id_=num_sta-1, padding_number_=10000,
radio=args.radio)
ip_test = IPV4L4(host=args.mgr, port=args.mgr_port,
ssid=args.ssid,
password=args.passwd,
radio=args.radio,
upstream_port=args.upstream_port,
security=args.security,
station_list=station_list,
url=args.url,
mode=args.mode,
ap=args.ap,
_debug_on=args.debug,
test_duration=args.test_duration,
num_tests=args.num_tests,
target_requests_per_ten=args.target_per_ten,
requests_per_ten=args.requests_per_ten)
ip_test.cleanup(station_list)
ip_test.build()
ip_test.start()
try:
layer4traffic=','.join([[*x.keys()][0] for x in ip_test.local_realm.json_get('layer4')['endpoint']])
except:
pass
ip_test.l4cxprofile.monitor(col_names=['bytes-rd', 'urls/s'],
report_file=rpt_file,
duration_sec=ip_test.local_realm.parse_time(args.test_duration).total_seconds(),
created_cx=layer4traffic,
output_format=output_form,
script_name='test_ipv4_l4_urls_per_ten',
arguments=args,
debug=args.debug)
ip_test.stop()
if not ip_test.passes():
print(ip_test.get_fail_message())
exit(1)
time.sleep(30)
ip_test.cleanup(station_list)
if ip_test.passes():
print("Full test passed, all endpoints met or exceeded 90 percent of the target rate")
if __name__ == "__main__":
main()

View File

@@ -1,246 +0,0 @@
#!/usr/bin/env python3
"""
NAME: test_ipv4_l4_wifi.py
PURPOSE:
test_ipv4_l4_wifi.py will create stations and endpoints to generate and verify layer-4 upload traffic
This script will monitor the bytes-rd attribute of the endpoints. If the the monitored value does not continually increase, this test will not pass.
EXAMPLE:
./test_ipv4_l4_wifi.py --upstream_port eth1 --radio wiphy0 --num_stations 3 --security open --ssid netgear
--passwd admin123 --test_duration 2m --requests_per_ten 600 --direction {ul | dl}
--dest /dev/null (or 10.40.0.1) --debug
Use './test_ipv4_l4_wifi.py --help' to see command line usage and options
Copyright 2021 Candela Technologies Inc
License: Free to distribute and modify. LANforge systems must be licensed.
"""
import sys
if sys.version_info[0] != 3:
print("This script requires Python 3")
exit(1)
if 'py-json' not in sys.path:
sys.path.append('../../py-json')
import argparse
from LANforge.lfcli_base import LFCliBase
from LANforge.LFUtils import *
from LANforge import LFUtils
import argparse
import realm
import time
import datetime
class IPV4L4(LFCliBase):
def __init__(self, host, port, ssid, security, password, requests_per_ten, station_list,
number_template="00000", radio="wiphy0", direction="dl", dest="/dev/null",
test_duration="5m", upstream_port="eth1",
_debug_on=False,
_exit_on_error=False,
_exit_on_fail=False):
super().__init__(host, port, _debug=_debug_on, _exit_on_fail=_exit_on_fail)
self.host = host
self.port = port
self.radio = radio
self.upstream_port = upstream_port
self.ssid = ssid
self.direction = direction
self.dest = dest
self.security = security
self.password = password
self.requests_per_ten = requests_per_ten
self.number_template = number_template
self.sta_list = station_list
self.test_duration = test_duration
self.local_realm = realm.Realm(lfclient_host=self.host, lfclient_port=self.port)
self.station_profile = self.local_realm.new_station_profile()
self.cx_profile = self.local_realm.new_http_profile()
self.station_profile.lfclient_url = self.lfclient_url
self.station_profile.ssid = self.ssid
self.station_profile.ssid_pass = self.password,
self.station_profile.security = self.security
self.station_profile.number_template_ = self.number_template
self.station_profile.mode = 0
self.cx_profile.requests_per_ten = self.requests_per_ten
def __compare_vals(self, old_list, new_list):
passes = 0
expected_passes = 0
if len(old_list) == len(new_list):
for item, value in old_list.items():
expected_passes += 1
if new_list[item] > old_list[item]:
passes += 1
if passes == expected_passes:
return True
else:
return False
else:
return False
def __get_values(self):
time.sleep(1)
cx_list = self.json_get("layer4/list?fields=name,bytes-rd", debug_=self.debug)
if self.debug:
print("==============\n", cx_list, "\n==============")
cx_map = {}
for cx_name in cx_list['endpoint']:
if cx_name != 'uri' and cx_name != 'handler':
for item, value in cx_name.items():
for value_name, value_rx in value.items():
if item in self.cx_profile.created_cx.keys() and value_name == 'bytes-rd':
cx_map[item] = value_rx
return cx_map
def build(self):
# Build stations
self.station_profile.use_security(self.security, self.ssid, self.password)
self.station_profile.set_number_template(self.number_template)
print("Creating stations")
self.station_profile.set_command_flag("add_sta", "create_admin_down", 1)
self.station_profile.set_command_param("set_port", "report_timer", 1500)
self.station_profile.set_command_flag("set_port", "rpt_timer", 1)
self.station_profile.create(radio=self.radio, sta_names_=self.sta_list, debug=self.debug)
self._pass("PASS: Station build finished")
self.station_profile.admin_up()
if self.local_realm.wait_for_ip(self.sta_list):
self._pass("All stations got IPs")
else:
self._fail("Stations failed to get IPs")
self.cx_profile.direction = self.direction
self.cx_profile.dest = self.dest
self.cx_profile.create(ports=self.station_profile.station_names, sleep_time=.5, debug_=self.debug,
suppress_related_commands_=None, http=True)
def start(self, print_pass=False, print_fail=False):
cur_time = datetime.datetime.now()
old_rx_values = self.__get_values()
end_time = self.local_realm.parse_time(self.test_duration) + cur_time
self.cx_profile.start_cx()
passes = 0
expected_passes = 0
print("Starting Test...")
while cur_time < end_time:
interval_time = cur_time + datetime.timedelta(minutes=1)
while cur_time < interval_time:
cur_time = datetime.datetime.now()
time.sleep(1)
new_rx_values = self.__get_values()
if self.debug:
print(old_rx_values, new_rx_values)
print("\n-----------------------------------")
print(cur_time, end_time, cur_time + datetime.timedelta(minutes=1))
print("-----------------------------------\n")
expected_passes += 1
if self.__compare_vals(old_rx_values, new_rx_values):
passes += 1
else:
self._fail("FAIL: Not all stations increased traffic", print_fail)
break
old_rx_values = new_rx_values
cur_time = datetime.datetime.now()
if passes == expected_passes:
self._pass("PASS: All tests passes", print_pass)
def stop(self):
self.cx_profile.stop_cx()
for sta_name in self.sta_list:
data = LFUtils.port_down_request(1, self.local_realm.name_to_eid(sta_name)[2])
url = "cli-json/set_port"
self.json_post(url, data)
def cleanup(self, sta_list):
self.cx_profile.cleanup()
self.station_profile.cleanup(sta_list)
LFUtils.wait_until_ports_disappear(base_url=self.lfclient_url, port_list=sta_list,
debug=self.debug)
def main():
lfjson_host = "localhost"
lfjson_port = 8080
parser = LFCliBase.create_basic_argparse(
prog='test_ipv4_l4_wifi.py',
# formatter_class=argparse.RawDescriptionHelpFormatter,
formatter_class=argparse.RawTextHelpFormatter,
epilog='''\
Create layer-4 endpoints and test that the bytes-rd from the chosen URL are increasing over the
duration of the test
''',
description='''\
test_ipv4_l4_wifi.py:
--------------------
Generic command example:
python3 ./test_ipv4_l4_wifi.py --upstream_port eth1 \\
--radio wiphy0 \\
--num_stations 3 \\
--security {open|wep|wpa|wpa2|wpa3} \\
--ssid netgear \\
--passwd admin123 \\
--test_duration 2m \\
--requests_per_ten 600 \\
--direction {ul | dl} \\
--dest /dev/null (or 10.40.0.1)\\
--debug
''')
parser.add_argument('--test_duration', help='--test_duration sets the duration of the test', default="5m")
parser.add_argument('--requests_per_ten', help='--requests_per_ten number of request per ten minutes', default=600)
parser.add_argument('--direction', help='--direction <ul | dl> specifies upload or download', default="dl")
parser.add_argument('--dest', help='--dest specifies the destination (dl) or source (ul) for the file', default="/dev/null")
args = parser.parse_args()
num_sta = 2
if (args.num_stations is not None) and (int(args.num_stations) > 0):
num_stations_converted = int(args.num_stations)
num_sta = num_stations_converted
station_list = LFUtils.portNameSeries(prefix_="sta", start_id_=0, end_id_=num_sta-1, padding_number_=10000,
radio=args.radio)
ip_test = IPV4L4(host=args.mgr, port=args.mgr_port,
ssid=args.ssid,
password=args.passwd,
radio=args.radio,
security=args.security,
station_list=station_list,
direction=args.direction,
dest=args.dest,
test_duration=args.test_duration,
upstream_port=args.upstream_port,
requests_per_ten=args.requests_per_ten,
_debug_on=args.debug)
ip_test.cleanup(station_list)
ip_test.build()
if not ip_test.passes():
print(ip_test.get_fail_message())
ip_test.exit_fail()
ip_test.start(False, False)
ip_test.stop()
if not ip_test.passes():
print(ip_test.get_fail_message())
ip_test.exit_fail()
time.sleep(30)
ip_test.cleanup(station_list)
if ip_test.passes():
("Full test passed, all endpoints had increased bytes-rd throughout test duration")
ip_test.exit_success()
if __name__ == "__main__":
main()

View File

@@ -1,493 +0,0 @@
#!/usr/bin/env python3
"""
NAME: test_ipv4_variable_time.py
PURPOSE:
test_ipv4_variable_time.py will create stations and endpoints to generate and verify layer-3 traffic.
This Script has two working modes:
Mode 1:
When station is not available,
This script will create a variable number of stations each with their own set of cross-connects and endpoints.
It will then create layer 3 traffic over a specified amount of time, testing for increased traffic at regular intervals.
This test will pass if all stations increase traffic over the full test duration.
Mode 2:
When station is already available This script will create layer3 cross-connects and endpoints It will then
create layer 3 traffic over a specified amount of time, testing for increased traffic at regular intervals.
This test will pass if all stations increase traffic over the full test duration.
Use './test_ipv4_variable_time.py --help' to see command line usage and options
Copyright 2021 Candela Technologies Inc
License: Free to distribute and modify. LANforge systems must be licensed.
"""
import sys
import os
if sys.version_info[0] != 3:
print("This script requires Python 3")
exit(1)
if 'py-json' not in sys.path:
sys.path.append(os.path.join(os.path.abspath('../..'), 'py-json'))
import argparse
from LANforge import LFUtils
from realm import Realm
import time
import datetime
class IPV4VariableTime(Realm):
def __init__(self,
ssid=None,
security=None,
password=None,
sta_list=[],
create_sta=True,
name_prefix=None,
upstream=None,
radio=None,
host="localhost",
port=8080,
mode=0,
ap=None,
traffic_type=None,
side_a_min_rate=56, side_a_max_rate=0,
side_b_min_rate=56, side_b_max_rate=0,
number_template="00000",
test_duration="5m",
use_ht160=False,
_debug_on=False,
_exit_on_error=False,
_exit_on_fail=False):
super().__init__(lfclient_host=host,
lfclient_port=port),
self.upstream = upstream
self.host = host
self.port = port
self.ssid = ssid
self.sta_list = sta_list
self.create_sta = create_sta
self.security = security
self.password = password
self.radio = radio
self.mode = mode
self.ap = ap
self.traffic_type = traffic_type
self.number_template = number_template
self.debug = _debug_on
# self.json_post("/cli-json/set_resource", {
# "shelf":1,
# "resource":all,
# "max_staged_bringup": 30,
# "max_trying_ifup": 15,
# "max_station_bringup": 6
# })
self.name_prefix = name_prefix
self.test_duration = test_duration
self.station_profile = self.new_station_profile()
self.cx_profile = self.new_l3_cx_profile()
self.station_profile.lfclient_url = self.lfclient_url
self.station_profile.ssid = self.ssid
self.station_profile.ssid_pass = self.password
self.station_profile.security = self.security
self.station_profile.number_template_ = self.number_template
self.station_profile.debug = self.debug
self.station_profile.use_ht160 = use_ht160
if self.station_profile.use_ht160:
self.station_profile.mode = 9
self.station_profile.mode = mode
if self.ap is not None:
self.station_profile.set_command_param("add_sta", "ap", self.ap)
self.cx_profile.host = self.host
self.cx_profile.port = self.port
self.cx_profile.name_prefix = self.name_prefix
self.cx_profile.side_a_min_bps = side_a_min_rate
self.cx_profile.side_a_max_bps = side_a_max_rate
self.cx_profile.side_b_min_bps = side_b_min_rate
self.cx_profile.side_b_max_bps = side_b_max_rate
def start(self, print_pass=False, print_fail=False):
if self.create_sta:
self.station_profile.admin_up()
# to-do- check here if upstream port got IP
temp_stas = self.station_profile.station_names.copy()
if self.wait_for_ip(temp_stas):
self._pass("All stations got IPs")
else:
self._fail("Stations failed to get IPs")
self.exit_fail()
self.cx_profile.start_cx()
def stop(self):
self.cx_profile.stop_cx()
if self.create_sta:
self.station_profile.admin_down()
def pre_cleanup(self):
self.cx_profile.cleanup_prefix()
if self.create_sta:
for sta in self.sta_list:
self.rm_port(sta, check_exists=True)
def cleanup(self):
self.cx_profile.cleanup()
if self.create_sta:
self.station_profile.cleanup()
LFUtils.wait_until_ports_disappear(base_url=self.lfclient_url, port_list=self.station_profile.station_names,
debug=self.debug)
def build(self):
if self.create_sta:
self.station_profile.use_security(self.security, self.ssid, self.password)
self.station_profile.set_number_template(self.number_template)
print("Creating stations")
self.station_profile.set_command_flag("add_sta", "create_admin_down", 1)
self.station_profile.set_command_param("set_port", "report_timer", 1500)
self.station_profile.set_command_flag("set_port", "rpt_timer", 1)
self.station_profile.create(radio=self.radio, sta_names_=self.sta_list, debug=self.debug)
self._pass("PASS: Station build finished")
self.cx_profile.create(endp_type=self.traffic_type, side_a=self.sta_list,
side_b=self.upstream,
sleep_time=0)
def main():
parser = Realm.create_basic_argparse(
prog='test_ipv4_variable_time.py',
formatter_class=argparse.RawTextHelpFormatter,
epilog='''\
Create stations to test connection and traffic on VAPs of varying security types (WEP, WPA, WPA2, WPA3, Open)
''',
description='''\
test_ipv4_variable_time.py:
--------------------
Generic command layout:
python3 ./test_ipv4_variable_time.py
--upstream_port eth1
--radio wiphy0
--num_stations 32
--security {open|wep|wpa|wpa2|wpa3}
--mode 1
{"auto" : "0",
"a" : "1",
"b" : "2",
"g" : "3",
"abg" : "4",
"abgn" : "5",
"bgn" : "6",
"bg" : "7",
"abgnAC" : "8",
"anAC" : "9",
"an" : "10",
"bgnAC" : "11",
"abgnAX" : "12",
"bgnAX" : "13"}
--ssid netgear
--password admin123
--test_duration 2m (default)
--monitor_interval_ms
--a_min 3000
--b_min 1000
--ap "00:0e:8e:78:e1:76"
--output_format csv
--traffic_type lf_udp
--report_file ~/Documents/results.csv (Example of csv file output - please use another extension for other file formats)
--compared_report ~/Documents/results_prev.csv (Example of csv file retrieval - please use another extension for other file formats) - UNDER CONSTRUCTION
--layer3_cols 'name','tx bytes','rx bytes','dropped' (column names from the GUI to print on report - please read below to know what to put here according to preferences)
--port_mgr_cols 'ap','ip' (column names from the GUI to print on report - please read below to know what to put here according to preferences)
--debug
python3 ./test_ipv4_variable_time.py
--upstream_port eth1 (upstream POrt)
--traffic_type lf_udp (traffic type, lf_udp | lf_tcp)
--test_duration 5m (duration to run traffic 5m --> 5 Minutes)
--create_sta False (False, means it will not create stations and use the sta_names specified below)
--sta_names sta000,sta001,sta002 (used if --create_sta False, comma separated names of stations)
===============================================================================
** FURTHER INFORMATION **
Using the layer3_cols flag:
Currently the output function does not support inputting the columns in layer3_cols the way they are displayed in the GUI. This quirk is under construction. To output
certain columns in the GUI in your final report, please match the according GUI column display to it's counterpart to have the columns correctly displayed in
your report.
GUI Column Display Layer3_cols argument to type in (to print in report)
Name | 'name'
EID | 'eid'
Run | 'run'
Mng | 'mng'
Script | 'script'
Tx Rate | 'tx rate'
Tx Rate (1 min) | 'tx rate (1&nbsp;min)'
Tx Rate (last) | 'tx rate (last)'
Tx Rate LL | 'tx rate ll'
Rx Rate | 'rx rate'
Rx Rate (1 min) | 'rx rate (1&nbsp;min)'
Rx Rate (last) | 'rx rate (last)'
Rx Rate LL | 'rx rate ll'
Rx Drop % | 'rx drop %'
Tx PDUs | 'tx pdus'
Tx Pkts LL | 'tx pkts ll'
PDU/s TX | 'pdu/s tx'
Pps TX LL | 'pps tx ll'
Rx PDUs | 'rx pdus'
Rx Pkts LL | 'pps rx ll'
PDU/s RX | 'pdu/s tx'
Pps RX LL | 'pps rx ll'
Delay | 'delay'
Dropped | 'dropped'
Jitter | 'jitter'
Tx Bytes | 'tx bytes'
Rx Bytes | 'rx bytes'
Replays | 'replays'
TCP Rtx | 'tcp rtx'
Dup Pkts | 'dup pkts'
Rx Dup % | 'rx dup %'
OOO Pkts | 'ooo pkts'
Rx OOO % | 'rx ooo %'
RX Wrong Dev | 'rx wrong dev'
CRC Fail | 'crc fail'
RX BER | 'rx ber'
CX Active | 'cx active'
CX Estab/s | 'cx estab/s'
1st RX | '1st rx'
CX TO | 'cx to'
Pattern | 'pattern'
Min PDU | 'min pdu'
Max PDU | 'max pdu'
Min Rate | 'min rate'
Max Rate | 'max rate'
Send Buf | 'send buf'
Rcv Buf | 'rcv buf'
CWND | 'cwnd'
TCP MSS | 'tcp mss'
Bursty | 'bursty'
A/B | 'a/b'
Elapsed | 'elapsed'
Destination Addr | 'destination addr'
Source Addr | 'source addr'
''')
parser.add_argument('--mode', help='Used to force mode of stations')
parser.add_argument('--ap', help='Used to force a connection to a particular AP')
parser.add_argument('--traffic_type', help='Select the Traffic Type [lf_udp, lf_tcp]', required=True)
parser.add_argument('--output_format', help='choose either csv or xlsx')
parser.add_argument('--report_file', help='where you want to store results', default=None)
parser.add_argument('--a_min', help='--a_min bps rate minimum for side_a', default=256000)
parser.add_argument('--b_min', help='--b_min bps rate minimum for side_b', default=256000)
parser.add_argument('--test_duration', help='--test_duration sets the duration of the test', default="2m")
parser.add_argument('--layer3_cols', help='Columns wished to be monitored from layer 3 endpoint tab',
default=['name', 'tx bytes', 'rx bytes', 'tx rate', 'rx rate'])
parser.add_argument('--port_mgr_cols', help='Columns wished to be monitored from port manager tab',
default=['ap', 'ip', 'parent dev'])
parser.add_argument('--compared_report', help='report path and file which is wished to be compared with new report',
default=None)
parser.add_argument('--monitor_interval',
help='how frequently do you want your monitor function to take measurements; \, 35s, 2h',
default='10s')
parser.add_argument('--influx_token', help='Username for your Influx database')
parser.add_argument('--influx_bucket', help='Password for your Influx database')
parser.add_argument('--influx_org', help='Name of your Influx database')
parser.add_argument('--influx_port', help='Port where your influx database is located', default=8086)
parser.add_argument('--influx_tag', action='append', nargs=2,
help='--influx_tag <key> <val> Can add more than one of these.')
parser.add_argument('--influx_mgr',
help='IP address of the server your Influx database is hosted if different from your LANforge Manager',
default=None)
parser.add_argument('--create_sta', help='Used to force a connection to a particular AP', default=True)
parser.add_argument('--sta_names', help='Used to force a connection to a particular AP', default="sta0000")
args = parser.parse_args()
create_sta = True
if args.create_sta == "False":
create_sta = False
num_sta = 2
if (args.num_stations is not None) and (int(args.num_stations) > 0):
num_sta = int(args.num_stations)
# Create directory
# if file path with output file extension is not given...
# check if home/lanforge/report-data exists. if not, save
# in new folder based in current file's directory
if args.report_file is None:
new_file_path = str(datetime.datetime.now().strftime("%Y-%m-%d-%H-h-%M-m-%S-s")).replace(':',
'-') + '_test_ipv4_variable_time' # create path name
try:
path = os.path.join('/home/lanforge/report-data/', new_file_path)
os.mkdir(path)
except:
curr_dir_path = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
path = os.path.join(curr_dir_path, new_file_path)
os.mkdir(path)
systeminfopath = str(path) + '/systeminfo.txt'
if args.output_format in ['csv', 'json', 'html', 'hdf', 'stata', 'pickle', 'pdf', 'png', 'parquet',
'xlsx']:
report_f = str(path) + '/data.' + args.output_format
output = args.output_format
else:
print(
'Not supporting this report format or cannot find report format provided. Defaulting to csv data file output type, naming it data.csv.')
report_f = str(path) + '/data.csv'
output = 'csv'
else:
systeminfopath = str(args.report_file).split('/')[-1]
report_f = args.report_file
if args.output_format is None:
output = str(args.report_file).split('.')[-1]
else:
output = args.output_format
print("IPv4 Test Report Data: {}".format(report_f))
# Retrieve last data file
compared_rept = None
if args.compared_report:
compared_report_format = args.compared_report.split('.')[-1]
# if compared_report_format not in ['csv', 'json', 'dta', 'pkl','html','xlsx','parquet','h5']:
if compared_report_format != 'csv':
print(ValueError("Cannot process this file type. Please select a different file and re-run script."))
exit(1)
else:
compared_rept = args.compared_report
if create_sta:
station_list = LFUtils.portNameSeries(prefix_="sta", start_id_=0, end_id_=num_sta - 1, padding_number_=10000,
radio=args.radio)
else:
station_list = args.sta_names.split(",")
ip_var_test = IPV4VariableTime(host=args.mgr,
port=args.mgr_port,
number_template="0000",
sta_list=station_list,
create_sta=create_sta,
name_prefix="VT",
upstream=args.upstream_port,
ssid=args.ssid,
password=args.passwd,
radio=args.radio,
security=args.security,
test_duration=args.test_duration,
use_ht160=False,
side_a_min_rate=args.a_min,
side_b_min_rate=args.b_min,
mode=args.mode,
ap=args.ap,
traffic_type=args.traffic_type,
_debug_on=args.debug)
ip_var_test.pre_cleanup()
ip_var_test.build()
# exit()
if create_sta:
if not ip_var_test.passes():
print(ip_var_test.get_fail_message())
ip_var_test.exit_fail()
try:
layer3connections = ','.join([[*x.keys()][0] for x in ip_var_test.json_get('endp')['endpoint']])
except:
raise ValueError('Try setting the upstream port flag if your device does not have an eth1 port')
if type(args.layer3_cols) is not list:
layer3_cols = list(args.layer3_cols.split(","))
# send col names here to file to reformat
else:
layer3_cols = args.layer3_cols
# send col names here to file to reformat
if type(args.port_mgr_cols) is not list:
port_mgr_cols = list(args.port_mgr_cols.split(","))
# send col names here to file to reformat
else:
port_mgr_cols = args.port_mgr_cols
# send col names here to file to reformat
if args.debug:
print("Layer 3 Endp column names are...")
print(layer3_cols)
print("Port Manager column names are...")
print(port_mgr_cols)
print("Layer 3 Endp column names are...")
print(layer3_cols)
print("Port Manager column names are...")
print(port_mgr_cols)
try:
monitor_interval = Realm.parse_time(args.monitor_interval).total_seconds()
except ValueError as error:
print(str(error))
print(ValueError(
"The time string provided for monitor_interval argument is invalid. Please see supported time stamp increments and inputs for monitor_interval in --help. "))
exit(1)
ip_var_test.start(False, False)
# if args.influx_mgr is None:
# manager = args.mgr
# else:
# manager = args.influx_mgr
if args.influx_org is not None:
from influx2 import RecordInflux
grapher = RecordInflux( # _influx_host=manager,
_influx_port=args.influx_port,
_influx_org=args.influx_org,
_influx_token=args.influx_token,
_influx_bucket=args.influx_bucket)
devices = [station.split('.')[-1] for station in station_list]
tags = dict()
tags['script'] = 'test_ipv4_variable_time'
try:
for k in args.influx_tag:
tags[k[0]] = k[1]
except:
pass
grapher.monitor_port_data(longevity=Realm.parse_time(args.test_duration).total_seconds(),
devices=devices,
monitor_interval=Realm.parse_time(args.monitor_interval).total_seconds(),
tags=tags)
ip_var_test.cx_profile.monitor(layer3_cols=layer3_cols,
sta_list=station_list,
# port_mgr_cols=port_mgr_cols,
report_file=report_f,
systeminfopath=systeminfopath,
duration_sec=Realm.parse_time(args.test_duration).total_seconds(),
monitor_interval_ms=monitor_interval,
created_cx=layer3connections,
output_format=output,
compared_report=compared_rept,
script_name='test_ipv4_variable_time',
arguments=args,
debug=args.debug)
ip_var_test.stop()
if create_sta:
if not ip_var_test.passes():
print(ip_var_test.get_fail_message())
ip_var_test.exit_fail()
LFUtils.wait_until_ports_admin_up(port_list=station_list)
if ip_var_test.passes():
ip_var_test.success()
ip_var_test.cleanup()
print("IPv4 Variable Time Test Report Data: {}".format(report_f))
if __name__ == "__main__":
main()

View File

@@ -1,297 +0,0 @@
#!/usr/bin/env python3
"""test_ipv4_variable_time.py will create stations and endpoints to generate and verify layer-3 traffic.
This script will create a variable number of stations each with their own set of cross-connects and endpoints.
It will then create layer 3 traffic over a specified amount of time, testing for increased traffic at regular intervals.
This test will pass if all stations increase traffic over the full test duration.
Use './test_ipv4_variable_time.py --help' to see command line usage and options
Copyright 2021 Candela Technologies Inc
License: Free to distribute and modify. LANforge systems must be licensed.
"""
import sys
import os
if sys.version_info[0] != 3:
print("This script requires Python 3")
exit(1)
if 'py-json' not in sys.path:
sys.path.append(os.path.join(os.path.abspath('..'), 'py-json'))
import argparse
from LANforge import LFUtils
from realm import Realm
from test_base import TestBase
import time
import datetime
class IPV4VariableTime(Realm, TestBase):
def __init__(self,
ssid=None,
security=None,
password=None,
sta_list=[],
name_prefix=None,
upstream=None,
radio=None,
host="localhost",
port=8080,
mode=0,
ap=None,
monitor=False,
side_a_min_rate=56, side_a_max_rate=0,
side_b_min_rate=56, side_b_max_rate=0,
number_template="00000", test_duration="5m", use_ht160=False,
_debug_on=False,
_exit_on_error=False,
_exit_on_fail=False):
super().__init__(lfclient_host=host,
lfclient_port=port)
self.upstream = upstream
self.host = host
self.port = port
self.ssid = ssid
self.sta_list = sta_list
self.security = security
self.password = password
self.radio = radio
self.mode = mode
self.ap = ap
self.number_template = number_template
self.debug = _debug_on
# self.json_post("/cli-json/set_resource", {
# "shelf":1,
# "resource":all,
# "max_staged_bringup": 30,
# "max_trying_ifup": 15,
# "max_station_bringup": 6
# })
self.name_prefix = name_prefix
self.test_duration = test_duration
self.station_profile = self.new_station_profile(ver = 2, station_list = sta_list)
self.cx_profile = self.new_l3_cx_profile(ver = 2)
#station profile settings
self.station_profile.lfclient_url = self.lfclient_url
self.station_profile.ssid = self.ssid
self.station_profile.ssid_pass = self.password
self.station_profile.security = self.security
self.station_profile.number_template_ = self.number_template
self.station_profile.debug = self.debug
self.station_profile.use_security(self.security, self.ssid, self.password)
self.station_profile.set_number_template(self.number_template)
self.station_profile.set_command_flag("add_sta", "create_admin_down", 1)
self.station_profile.set_command_param("set_port", "report_timer", 1500)
self.station_profile.set_command_flag("set_port", "rpt_timer", 1)
self.station_profile.use_ht160 = use_ht160
self.station_profile.mode = mode
if self.ap is not None:
self.station_profile.set_command_param("add_sta", "ap", self.ap)
#cx profile settings
self.cx_profile.host = self.host
self.cx_profile.port = self.port
self.cx_profile.name_prefix = self.name_prefix
self.cx_profile.side_a_min_bps = side_a_min_rate
self.cx_profile.side_a_max_bps = side_a_max_rate
self.cx_profile.side_b_min_bps = side_b_min_rate
self.cx_profile.side_b_max_bps = side_b_max_rate
self.profiles.extend([self.station_profile, self.cx_profile])
def main():
optional = []
optional.append({'name': '--mode', 'help': 'Used to force mode of stations'})
optional.append({'name': '--ap', 'help': 'Used to force a connection to a particular AP'})
optional.append({'name': '--output_format', 'help': 'choose either csv or xlsx'})
optional.append({'name': '--report_file', 'help': 'where you want to store results', 'default': None})
optional.append({'name': '--a_min', 'help': '--a_min bps rate minimum for side_a', 'default': 256000})
optional.append({'name': '--b_min', 'help': '--b_min bps rate minimum for side_b', 'default': 256000})
optional.append(
{'name': '--test_duration', 'help': '--test_duration sets the duration of the test', 'default': "2m"})
optional.append({'name': '--layer3_cols', 'help': 'Columns wished to be monitored from layer 3 endpoint tab',
'default': ['name', 'tx bytes', 'rx bytes']})
optional.append({'name': '--port_mgr_cols', 'help': 'Columns wished to be monitored from port manager tab',
'default': ['ap', 'ip', 'parent dev']})
optional.append(
{'name': '--compared_report', 'help': 'report path and file which is wished to be compared with new report',
'default': None})
optional.append({'name': '--monitor_interval',
'help': 'frequency of monitor polls - ex: 250ms, 35s, 2h',
'default': '2s'})
optional.append({'name': '--monitor',
'help': 'whether test data should be recorded and stored in a report'})
parser = Realm.create_basic_argparse(
prog='test_ipv4_variable_time.py',
formatter_class=argparse.RawTextHelpFormatter,
epilog='''\
Create stations to test connection and traffic on VAPs of varying security types (WEP, WPA, WPA2, WPA3, Open)
''',
description='''\
test_ipv4_variable_time.py:
--------------------
Generic command layout:
python3 ./test_ipv4_variable_time.py
--upstream_port eth1
--radio wiphy0
--num_stations 32
--security {open|wep|wpa|wpa2|wpa3}
--mode 1
{"auto" : "0",
"a" : "1",
"b" : "2",
"g" : "3",
"abg" : "4",
"abgn" : "5",
"bgn" : "6",
"bg" : "7",
"abgnAC" : "8",
"anAC" : "9",
"an" : "10",
"bgnAC" : "11",
"abgnAX" : "12",
"bgnAX" : "13"}
--ssid netgear
--password admin123
--test_duration 2m (default)
--monitor_interval_ms
--monitor
--a_min 3000
--b_min 1000
--ap "00:0e:8e:78:e1:76"
--output_format csv
--report_file ~/Documents/results.csv (Example of csv file output - please use another extension for other file formats)
--compared_report ~/Documents/results_prev.csv (Example of csv file retrieval - please use another extension for other file formats) - UNDER CONSTRUCTION
--layer3_cols'name','tx bytes','rx bytes','dropped' (column names from the GUI to print on report - please read below to know what to put here according to preferences)
--port_mgr_cols 'ap','ip' (column names from the GUI to print on report - please read below to know what to put here according to preferences)
--debug
===============================================================================
** FURTHER INFORMATION **
Using the layer3_cols flag:
Currently the output function does not support inputting the columns in layer3_cols the way they are displayed in the GUI. This quirk is under construction. To output
certain columns in the GUI in your final report, please match the according GUI column display to it's counterpart to have the columns correctly displayed in
your report.
GUI Column Display Layer3_cols argument to type in (to print in report)
Name | 'name'
EID | 'eid'
Run | 'run'
Mng | 'mng'
Script | 'script'
Tx Rate | 'tx rate'
Tx Rate (1 min) | 'tx rate (1&nbsp;min)'
Tx Rate (last) | 'tx rate (last)'
Tx Rate LL | 'tx rate ll'
Rx Rate | 'rx rate'
Rx Rate (1 min) | 'rx rate (1&nbsp;min)'
Rx Rate (last) | 'rx rate (last)'
Rx Rate LL | 'rx rate ll'
Rx Drop % | 'rx drop %'
Tx PDUs | 'tx pdus'
Tx Pkts LL | 'tx pkts ll'
PDU/s TX | 'pdu/s tx'
Pps TX LL | 'pps tx ll'
Rx PDUs | 'rx pdus'
Rx Pkts LL | 'pps rx ll'
PDU/s RX | 'pdu/s tx'
Pps RX LL | 'pps rx ll'
Delay | 'delay'
Dropped | 'dropped'
Jitter | 'jitter'
Tx Bytes | 'tx bytes'
Rx Bytes | 'rx bytes'
Replays | 'replays'
TCP Rtx | 'tcp rtx'
Dup Pkts | 'dup pkts'
Rx Dup % | 'rx dup %'
OOO Pkts | 'ooo pkts'
Rx OOO % | 'rx ooo %'
RX Wrong Dev | 'rx wrong dev'
CRC Fail | 'crc fail'
RX BER | 'rx ber'
CX Active | 'cx active'
CX Estab/s | 'cx estab/s'
1st RX | '1st rx'
CX TO | 'cx to'
Pattern | 'pattern'
Min PDU | 'min pdu'
Max PDU | 'max pdu'
Min Rate | 'min rate'
Max Rate | 'max rate'
Send Buf | 'send buf'
Rcv Buf | 'rcv buf'
CWND | 'cwnd'
TCP MSS | 'tcp mss'
Bursty | 'bursty'
A/B | 'a/b'
Elapsed | 'elapsed'
Destination Addr | 'destination addr'
Source Addr | 'source addr'
''',
more_optional=optional)
args = parser.parse_args()
num_sta = 2
if (args.num_stations is not None) and (int(args.num_stations) > 0):
num_sta = int(args.num_stations)
station_list = LFUtils.portNameSeries(prefix_="sta", start_id_=0, end_id_=num_sta - 1, padding_number_=10000,
radio=args.radio)
#transfer below to l3cxprofile2 or base_profile-----------------------#
# try:
# layer3connections = ','.join([[*x.keys()][0] for x in ip_var_test.json_get('endp')['endpoint']])
# except:
# raise ValueError('Try setting the upstream port flag if your device does not have an eth1 port')
# if type(args.layer3_cols) is not list:
# layer3_cols = list(args.layer3_cols.split(","))
# # send col names here to file to reformat
# else:
# layer3_cols = args.layer3_cols
# # send col names here to file to reformat
# if type(args.port_mgr_cols) is not list:
# port_mgr_cols = list(args.port_mgr_cols.split(","))
# # send col names here to file to reformat
# else:
# port_mgr_cols = args.port_mgr_cols
# # send col names here to file to reformat
# if args.debug:
# print("Layer 3 Endp column names are...")
# print(layer3_cols)
# print("Port Manager column names are...")
# print(port_mgr_cols)
ip_var_test = IPV4VariableTime(host=args.mgr,
port=args.mgr_port,
number_template="0000",
sta_list=station_list,
name_prefix="VT",
upstream=args.upstream_port,
ssid=args.ssid,
password=args.passwd,
radio=args.radio,
security=args.security,
test_duration=args.test_duration,
use_ht160=False,
side_a_min_rate=args.a_min,
side_b_min_rate=args.b_min,
mode=args.mode,
ap=args.ap,
_debug_on=args.debug)
ip_var_test.begin()
if __name__ == "__main__":
main()

View File

@@ -1,225 +0,0 @@
#!/usr/bin/env python3
"""
NAME: test_ipv6_connection.py
PURPOSE:
This scripts functionality has been replaced by test_ip_connection.py, consider this script deprecated
test_ipv6_connection.py will create stations and attempt to connect to an SSID using IPv6. WPA, WPA2, WPA3, WEP, and Open connection types are supported
Script for creating a variable number of stations and attempting to connect them to an SSID using IPv6.
A test will run to verify stations are associated and get an IP, if these conditions are both true, the test will
pass, otherwise, the test will fail.
EXAMPLE:
./test_ipv6_connection.py --upstream_port eth1 --radio wiphy0 --num_stations 3 --proxy --security {open|wep|wpa|wpa2|wpa3}
--ssid netgear --passwd admin123 --mode 1 --ap "00:0e:8e:78:e1:76" --test_id --timeout 120 --debug
Use './test_ipv6_connection.py --help' to see command line usage and options
Copyright 2021 Candela Technologies Inc
License: Free to distribute and modify. LANforge systems must be licensed.
"""
import sys
import os
if sys.version_info[0] != 3:
print("This script requires Python 3")
exit(1)
if 'py-json' not in sys.path:
sys.path.append(os.path.join(os.path.abspath('../..'), 'py-json'))
import LANforge
from LANforge.lfcli_base import LFCliBase
from LANforge import LFUtils
import argparse
import realm
import time
import pprint
class IPv6Test(LFCliBase):
def __init__(self, ssid, security, password,ap=None, mode=0, sta_list=None, num_stations=0, prefix="00000", host="localhost", port=8080,
_debug_on=False, timeout=120, radio="wiphy0",
_exit_on_error=False,
_exit_on_fail=False,
number_template="00"):
super().__init__(host, port, _debug=_debug_on, _exit_on_fail=_exit_on_fail)
self.host = host
self.port = port
self.ssid = ssid
self.radio = radio
self.security = security
self.password = password
self.ap=ap
self.mode=mode
self.num_stations = num_stations
self.sta_list = sta_list
self.timeout = timeout
self.prefix = prefix
self.debug = _debug_on
self.number_template = number_template
self.local_realm = realm.Realm(lfclient_host=self.host, lfclient_port=self.port)
self.station_profile = self.local_realm.new_station_profile()
self.station_profile.lfclient_url = self.lfclient_url
self.station_profile.ssid = self.ssid
self.station_profile.ssid_pass = self.password
if mode is not None:
self.station_profile.mode = mode
self.station_profile.security = self.security
self.station_profile.number_template_ = self.number_template
def build(self):
if self.ap is not None:
self.station_profile.set_command_param("add_sta", "ap", self.ap)
self.station_profile.use_security(self.security, self.ssid, self.password)
self.station_profile.set_number_template(self.prefix)
print("Creating stations")
self.station_profile.set_command_flag("add_sta", "create_admin_down", 1)
self.station_profile.set_command_param("set_port", "report_timer", 1500)
self.station_profile.set_command_flag("set_port", "rpt_timer", 1)
self.station_profile.create(radio=self.radio, sta_names_=self.sta_list, debug=self.debug)
self._pass("PASS: Station build finished")
def start(self, sta_list, print_pass, print_fail):
self.station_profile.admin_up()
associated_map = {}
ip_map = {}
print("Starting test...")
for sec in range(self.timeout):
for sta_name in sta_list:
shelf = self.local_realm.name_to_eid(sta_name)[0]
resource = self.local_realm.name_to_eid(sta_name)[1]
name = self.local_realm.name_to_eid(sta_name)[2]
sta_status = self.json_get("port/%s/%s/%s?fields=port,alias,ipv6+address,ap" % (shelf, resource, name),
debug_=self.debug)
if self.debug:
print(sta_status)
try:
if (sta_status is None) or (sta_status['interface'] is None) or (sta_status['interface']['ap'] is None):
continue
except:
continue
if len(sta_status['interface']['ap']) == 17 and sta_status['interface']['ap'][-3] == ':':
if self.debug:
pprint.pprint(sta_status['interface'])
#print("Associated", sta_name, sta_status['interface']['ap'], sta_status['interface']['ip'])
associated_map[sta_name] = 1
if sta_status['interface']['ipv6 address'] != 'DELETED' and \
not sta_status['interface']['ipv6 address'].startswith('fe80') \
and sta_status['interface']['ipv6 address'] != 'AUTO':
if self.debug:
print("IPv6 address:", sta_name, sta_status['interface']['ap'], sta_status['interface']['ipv6 address'])
ip_map[sta_name] = 1
if (len(sta_list) == len(ip_map)) and (len(sta_list) == len(associated_map)):
break
else:
time.sleep(1)
if self.debug:
print("sta_list", len(sta_list), sta_list)
print("ip_map", len(ip_map), ip_map)
print("associated_map", len(associated_map), associated_map)
if (len(sta_list) == len(ip_map)) and (len(sta_list) == len(associated_map)):
self._pass("PASS: All stations associated with IP", print_pass)
else:
self._fail("FAIL: Not all stations able to associate/get IP", print_fail)
print("sta_list", sta_list)
print("ip_map", ip_map)
print("associated_map", associated_map)
return self.passes()
def stop(self):
self.station_profile.admin_down()
def cleanup(self, sta_list):
self.station_profile.cleanup(sta_list)
LFUtils.wait_until_ports_disappear(base_url=self.lfclient_url, port_list=sta_list,
debug=self.debug)
def main():
parser = LFCliBase.create_basic_argparse(
prog='test_ipv6_connection.py',
# formatter_class=argparse.RawDescriptionHelpFormatter,
formatter_class=argparse.RawTextHelpFormatter,
epilog='''\
Create stations that attempt to authenticate, associate, and receive IPV6 addresses on the
chosen SSID
''',
description='''\
test_ipv6_connection.py:
--------------------------------------------------
Generic command example:
python3 ./test_ipv6_connection.py
--upstream_port eth1
--radio wiphy0
--num_stations 3
--proxy
--security {open|wep|wpa|wpa2|wpa3}
--ssid netgear
--passwd admin123
--mode 1
--ap "00:0e:8e:78:e1:76"
--test_id
--timeout 120
--debug
''')
required = None
for agroup in parser._action_groups:
if agroup.title == "required arguments":
required = agroup
#if required is not None:
optional = None
for agroup in parser._action_groups:
if agroup.title == "optional arguments":
optional = agroup
if optional is not None:
optional.add_argument("--ap", help="Add BSSID of access point to connect to")
optional.add_argument('--mode', help=LFCliBase.Help_Mode)
optional.add_argument('--timeout', help='--timeout sets the length of time to wait until a connection is successful', default=30)
args = parser.parse_args()
num_sta=2
if (args.num_stations is not None) and (int(args.num_stations) > 0):
num_stations_converted = int(args.num_stations)
num_sta = num_stations_converted
station_list = LFUtils.portNameSeries(prefix_="sta", start_id_=0, end_id_=num_sta-1, padding_number_=10000,
radio=args.radio)
ipv6_test = IPv6Test(host=args.mgr, port=args.mgr_port,
ssid=args.ssid,
password=args.passwd,
security=args.security,
ap=args.ap,
mode=args.mode,
sta_list=station_list,
_debug_on=args.debug)
ipv6_test.cleanup(station_list)
ipv6_test.build()
print('Sleeping for 30 seconds...', flush=True)
time.sleep(30)
if not ipv6_test.passes():
print(ipv6_test.get_fail_message())
exit(1)
ipv6_test.start(station_list, False, False)
ipv6_test.stop()
if not ipv6_test.passes():
print(ipv6_test.get_fail_message())
exit(1)
time.sleep(20)
ipv6_test.cleanup(station_list)
if ipv6_test.passes():
print("Full test passed, all stations associated and got IP")
if __name__ == "__main__":
main()

View File

@@ -1,309 +0,0 @@
#!/usr/bin/env python3
"""
NAME: test_ipv6_variable_time.py
PURPOSE:
test_ipv6_variable_time.py will create stations and endpoints to generate and verify layer-3 traffic over IPv6.
This script will create a variable number of stations, using IPv6, each with their own set of cross-connects and endpoints.
It will then create layer 3 traffic over a specified amount of time, testing for increased traffic at regular intervals.
This test will pass if all stations increase traffic over the full test duration.
EXAMPLE:
./test_ipv6_connection.py --upstream_port eth1 --radio wiphy0 --num_stations 3 --security {open|wep|wpa|wpa2|wpa3}
--ssid netgear --passwd admin123 --upstream 10.40.0.1 --test_duration 2m --interval 1s --a_min 256000 --b_min 256000
--debug
Use './test_ipv6_variable_time.py --help' to see command line usage and options
Copyright 2021 Candela Technologies Inc
License: Free to distribute and modify. LANforge systems must be licensed.
"""
import sys
import os
if sys.version_info[0] != 3:
print("This script requires Python 3")
exit(1)
if 'py-json' not in sys.path:
sys.path.append(os.path.join(os.path.abspath('../..'), 'py-json'))
import argparse
from LANforge.lfcli_base import LFCliBase
from LANforge import LFUtils
import realm
import time
import datetime
class IPV6VariableTime(LFCliBase):
def __init__(self,
_host="localhost",
_port=8080,
_ssid=None,
_security=None,
_password=None,
_sta_list=None,
_name_prefix="sta",
_upstream="1.1.eth1",
_radio="wiphy0",
_side_a_min_rate=256000,
_side_a_max_rate=0, # same
_side_b_min_rate=256000,
_side_b_max_rate=0, # same
_number_template="00000",
_test_duration="5m",
_use_ht160=False,
_cx_type=None,
_debug_on=False,
_exit_on_error=False,
_exit_on_fail=False):
super().__init__(_host,
_port,
_local_realm=realm.Realm(lfclient_host=_host,
lfclient_port=_port,
debug_=_debug_on,
_exit_on_error=_exit_on_error,
_exit_on_fail=_exit_on_fail),
_debug=_debug_on,
_exit_on_fail=_exit_on_fail)
self.upstream = _upstream
self.ssid = _ssid
self.sta_list = _sta_list
self.security = _security
self.password = _password
self.radio = _radio
self.number_template = _number_template
self.debug = _debug_on
self.name_prefix = _name_prefix
self.test_duration = _test_duration
self.cx_type = _cx_type
self.station_profile = self.local_realm.new_station_profile()
self.cx_profile = self.local_realm.new_l3_cx_profile()
self.station_profile.lfclient_url = self.lfclient_url
self.station_profile.ssid = self.ssid
self.station_profile.ssid_pass = self.password
self.station_profile.security = self.security
self.station_profile.number_template_ = self.number_template
self.station_profile.debug = self.debug
self.station_profile.use_ht160 = _use_ht160
if self.station_profile.use_ht160:
self.station_profile.mode = 9
self.cx_profile.host = _host
self.cx_profile.port = _port
self.cx_profile.name_prefix = _name_prefix
self.cx_profile.side_a_min_bps = _side_a_min_rate
self.cx_profile.side_a_max_bps = _side_a_max_rate
self.cx_profile.side_b_min_bps = _side_b_min_rate
self.cx_profile.side_b_max_bps = _side_b_max_rate
def __get_rx_values(self):
cx_list = self.json_get("endp?fields=name,rx+bytes", debug_=self.debug)
# print(self.cx_profile.created_cx.values())
# print("==============\n", cx_list, "\n==============")
cx_rx_map = {}
for cx_name in cx_list['endpoint']:
if cx_name != 'uri' and cx_name != 'handler':
for item, value in cx_name.items():
for value_name, value_rx in value.items():
if value_name == 'rx bytes' and item in self.cx_profile.created_cx.values():
cx_rx_map[item] = value_rx
return cx_rx_map
def __compare_vals(self, old_list, new_list):
passes = 0
expected_passes = 0
if len(old_list) == len(new_list):
for item, value in old_list.items():
expected_passes += 1
if new_list[item] > old_list[item]:
passes += 1
# print(item, new_list[item], old_list[item], passes, expected_passes)
if passes == expected_passes:
return True
else:
return False
else:
return False
def start(self, print_pass=False, print_fail=False):
self.station_profile.admin_up()
temp_stas = self.station_profile.station_names.copy()
# temp_stas.append(self.upstream)
if self.local_realm.wait_for_ip(temp_stas, ipv4=False, ipv6=True):
self._pass("All stations got IPs", print_pass)
else:
self._fail("Stations failed to get IPs", print_fail)
exit(1)
cur_time = datetime.datetime.now()
old_cx_rx_values = self.__get_rx_values()
end_time = self.local_realm.parse_time(self.test_duration) + cur_time
self.cx_profile.start_cx()
passes = 0
expected_passes = 0
while cur_time < end_time:
interval_time = cur_time + datetime.timedelta(minutes=1)
while cur_time < interval_time:
cur_time = datetime.datetime.now()
time.sleep(1)
new_cx_rx_values = self.__get_rx_values()
# print(old_cx_rx_values, new_cx_rx_values)
# print("\n-----------------------------------")
# print(cur_time, end_time, cur_time + datetime.timedelta(minutes=1))
# print("-----------------------------------\n")
expected_passes += 1
if self.__compare_vals(old_cx_rx_values, new_cx_rx_values):
passes += 1
else:
self._fail("FAIL: Not all stations increased traffic", print_fail)
break
old_cx_rx_values = new_cx_rx_values
cur_time = datetime.datetime.now()
if passes == expected_passes:
self._pass("PASS: All tests passed", print_pass)
def stop(self):
self.cx_profile.stop_cx()
self.station_profile.admin_down()
def pre_cleanup(self):
self.cx_profile.cleanup_prefix()
for sta in self.sta_list:
self.local_realm.rm_port(sta, check_exists=True, debug_=self.debug)
LFUtils.wait_until_ports_disappear(base_url=self.lfclient_url,
port_list=self.sta_list,
debug=self.debug)
def cleanup(self):
self.cx_profile.cleanup()
self.station_profile.cleanup()
LFUtils.wait_until_ports_disappear(base_url=self.lfclient_url,
port_list=self.station_profile.station_names,
debug=self.debug)
def build(self):
self.station_profile.use_security(self.security, self.ssid, self.password)
self.station_profile.set_number_template(self.number_template)
print("Creating stations")
self.station_profile.set_command_flag("add_sta", "create_admin_down", 1)
self.station_profile.set_command_param("set_port", "report_timer", 1500)
self.station_profile.set_command_flag("set_port", "rpt_timer", 1)
self.station_profile.create(radio=self.radio, sta_names_=self.sta_list, debug=self.debug)
self.cx_profile.create(endp_type=self.cx_type, side_a=self.station_profile.station_names, side_b=self.upstream,
sleep_time=0)
self._pass("PASS: Station build finished")
def main():
parser = LFCliBase.create_basic_argparse(
prog='test_ipv6_variable_time.py',
# formatter_class=argparse.RawDescriptionHelpFormatter,
formatter_class=argparse.RawTextHelpFormatter,
epilog='''\
Create stations to test IPV6 connection and traffic on VAPs of varying security types (WEP, WPA, WPA2, WPA3, Open)
''',
description='''\
test_ipv6_variable_time.py:
--------------------
Generic command example:
./test_ipv6_connection.py --upstream_port eth1 \\
--radio wiphy0 \\
--num_stations 3 \\
--security {open|wep|wpa|wpa2|wpa3} \\
--ssid netgear \\
--passwd admin123 \\
--upstream 10.40.0.1 \\
--test_duration 2m \\
--interval 1s \\
--a_min 256000 \\
--b_min 256000 \\
--debug
''')
required_args=None
for group in parser._action_groups:
if group.title == "required arguments":
required_args=group
break;
if required_args is not None:
required_args.add_argument('--a_min', help='minimum bps rate for side_a', default=256000)
required_args.add_argument('--b_min', help='minimum bps rate for side_b', default=256000)
required_args.add_argument('--cx_type', help='tcp6 or udp6')
required_args.add_argument('--test_duration', help='--test_duration sets the duration of the test', default="5m")
optional_args=None
for group in parser._action_groups:
if group.title == "optional arguments":
optional_args=group
break;
if optional_args is not None:
optional_args.add_argument('--mode', help='Used to force mode of stations')
optional_args.add_argument('--ap', help='Used to force a connection to a particular AP')
optional_args.add_argument("--a_max", help="Maximum side_a bps speed", default=0)
optional_args.add_argument("--b_max", help="Maximum side_b bps speed", default=0)
args = parser.parse_args()
CX_TYPES=("tcp6", "udp6", "lf_tcp6", "lf_udp6")
if (args.cx_type is None) or (args.cx_type not in CX_TYPES):
print("cx_type needs to be lf_tcp6 or lf_udp6, bye")
exit(1)
if args.cx_type == "tcp6":
args.cx_type = "lf_tcp6"
if args.cx_type == "udp6":
args.cx_type = "lf_udp6"
num_sta = 2
if (args.num_stations is not None) and (int(args.num_stations) > 0):
num_stations_converted = int(args.num_stations)
num_sta = num_stations_converted
station_list = LFUtils.portNameSeries(prefix_="sta", start_id_=0, end_id_=num_sta-1, padding_number_=10000,
radio=args.radio)
ip_var_test = IPV6VariableTime(_host=args.mgr,
_port=args.mgr_port,
_number_template="00",
_sta_list=station_list,
_name_prefix="VT",
_upstream=args.upstream_port,
_ssid=args.ssid,
_password=args.passwd,
_radio=args.radio,
_security=args.security,
_test_duration=args.test_duration,
_use_ht160=False,
_side_a_min_rate=args.a_min,
_side_b_min_rate=args.b_min,
_side_a_max_rate=args.a_max,
_side_b_max_rate=args.b_max,
_cx_type=args.cx_type,
_debug_on=args.debug)
ip_var_test.pre_cleanup()
ip_var_test.build()
if not ip_var_test.passes():
print(ip_var_test.get_fail_message())
exit(1)
ip_var_test.start(False, False)
ip_var_test.stop()
if not ip_var_test.passes():
print(ip_var_test.get_fail_message())
exit(1)
time.sleep(30)
ip_var_test.cleanup()
if ip_var_test.passes():
print("Full test passed, all connections increased rx bytes")
if __name__ == "__main__":
main()