Signed-off-by: shivamcandela <shivam.thakur@candelatech.com>
This commit is contained in:
shivamcandela
2021-06-20 20:17:58 +05:30
31 changed files with 12323 additions and 325 deletions

View File

@@ -1,14 +1,14 @@
#!/bin/bash
# ----- ----- ----- ----- ----- ----- ----- ----- ----- ----- ----- ----- #
# Check for large files and purge the ones requested #
# #
# The -a switch will automatically purge core files when there #
# is only 5GB of space left on filesystem. #
# #
# To install as a cron-job, add the following line to /etc/crontab: #
# 1 * * * * root /home/lanforge/scripts/check_large_files.sh -a 2>&1 | logger -t check_large_files
# #
# ----- ----- ----- ----- ----- ----- ----- ----- ----- ----- ----- ----- #
# ----- ----- ----- ----- ----- ----- ----- ----- ----- ----- ----- ----- ----- ----- ----- ----- ----- #
# Check for large files and purge the ones requested #
# #
# The -a switch will automatically purge core files when there #
# is only 5GB of space left on filesystem. #
# #
# To install as a cron-job, add the following line to /etc/crontab: #
# 1 * * * * root /home/lanforge/scripts/check_large_files.sh -a 2>&1 | logger -t check_large_files #
# #
# ----- ----- ----- ----- ----- ----- ----- ----- ----- ----- ----- ----- ----- ----- ----- ----- ----- #
# set -x
# set -e
# these are default selections
@@ -284,7 +284,7 @@ kernel_to_relnum() {
}
empty_trash_can() {
set -vux
#set -vux
if [ -x /usr/bin/trash-empty ]; then
for can in "${trash_cans[@]}"; do
if [[ $can = /home* ]]; then
@@ -460,13 +460,17 @@ compress_report_data() {
note "compress report data..."
cd /home/lanforge
# local csvfiles=( $( find /home/lanforge -iname "*.csv" -print0 ))
while read f; do
(( $verbose > 0 )) && echo " compressing $f"
xz -7 "$f"
done < <(find html-reports/ lf_reports/ report-data/ tmp/ -type f \
-a \( -name '*.csv' -o -name '*.pdf' -o -name '*.pdf' -o -name '*.pcap' -o -name '*.pcapng' \) )
local vile_list=(`find html-reports/ lf_reports/ report-data/ tmp/ -type f \
-a \( -name '*.csv' -o -name '*.pdf' -o -name '*.pdf' -o -name '*.pcap' -o -name '*.pcapng' \)`)
counter=1
for f in "${vile_list[@]}"; do
(( $verbose > 0 )) && echo " compressing $f" || echo -n " ${counter}/${#vile_list[@]}"
nice xz -T0 -5 "$f"
(( counter+=1 ))
done
totals[r]=0
cd -
echo ""
}
clean_var_tmp() {
@@ -567,7 +571,7 @@ survey_kernel_files() {
fi
done
if (( $verbose > 0 )) && (( ${#libmod_sort_names[@]} > 0 )); then
if (( ${#libmod_sort_names[@]} > 0 )); then
# debug "Removable libmod dirs: "
while read ser; do
file="${libmod_sort_names[$ser]}"

View File

@@ -1,8 +1,10 @@
#!/usr/bin/env python3
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
# Class holds default settings for json requests to Grafana -
# Class holds default settings for json requests to Ghost -
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
import ast
import os
import sys
if sys.version_info[0] != 3:
@@ -13,25 +15,72 @@ import requests
import jwt
from datetime import datetime as date
import json
import subprocess
from scp import SCPClient
import paramiko
from GrafanaRequest import GrafanaRequest
class CSVReader:
def read_csv(self,
file,
sep=','):
df = open(file).read().split('\n')
rows = list()
for x in df:
if len(x) > 0:
rows.append(x.split(sep))
length = list(range(0, len(df[0])))
columns = dict(zip(df[0], length))
return rows
def get_column(self,
df,
value):
index = df[0].index(value)
values = []
for row in df[1:]:
values.append(row[index])
return values
class GhostRequest:
def __init__(self,
_ghostjson_host,
_ghostjson_port,
_ghost_json_host,
_ghost_json_port,
_api_token=None,
_headers=dict(),
_overwrite='false',
debug_=False,
die_on_error_=False):
self.debug = debug_
self.die_on_error = die_on_error_
self.ghostjson_url = "http://%s:%s/ghost/api/v3" % (_ghostjson_host, _ghostjson_port)
self.ghost_json_host = _ghost_json_host
self.ghost_json_port = _ghost_json_port
self.ghost_json_url = "http://%s:%s/ghost/api/v3" % (_ghost_json_host, _ghost_json_port)
self.data = dict()
self.data['overwrite'] = _overwrite
self.ghostjson_login = self.ghostjson_url + '/admin/session/'
self.ghost_json_login = self.ghost_json_url + '/admin/session/'
self.api_token = _api_token
self.images = list()
self.pdfs = list()
def encode_token(self):
# Split the key into ID and SECRET
key_id, secret = self.api_token.split(':')
# Prepare header and payload
iat = int(date.now().timestamp())
header = {'alg': 'HS256', 'typ': 'JWT', 'kid': key_id}
payload = {
'iat': iat,
'exp': iat + 5 * 60,
'aud': '/v3/admin/'
}
token = jwt.encode(payload, bytes.fromhex(secret), algorithm='HS256', headers=header)
return token
def create_post(self,
title=None,
@@ -39,15 +88,7 @@ class GhostRequest:
tags=None,
authors=None,
status="published"):
ghostjson_url = self.ghostjson_url + '/admin/posts/'
datastore = dict()
datastore['title'] = title
if tags is not None:
datastore['tags'] = tags
if authors is not None:
datastore['authors'] = authors
datastore['html'] = text
datastore['status'] = status
ghost_json_url = self.ghost_json_url + '/admin/posts/?source=html'
post = dict()
posts = list()
datastore = dict()
@@ -59,18 +100,160 @@ class GhostRequest:
headers = dict()
# Split the key into ID and SECRET
id, secret = self.api_token.split(':')
# Prepare header and payload
iat = int(date.now().timestamp())
header = {'alg': 'HS256', 'typ': 'JWT', 'kid': id}
payload = {
'iat': iat,
'exp': iat + 5 * 60,
'aud': '/v3/admin/'
}
token = jwt.encode(payload, bytes.fromhex(secret), algorithm='HS256', headers=header)
token = self.encode_token()
headers['Authorization'] = 'Ghost {}'.format(token)
requests.post(ghostjson_url, json=post, headers=headers)
response = requests.post(ghost_json_url, json=post, headers=headers)
if self.debug:
print(datastore)
print(ghost_json_url)
print('\n')
print(post)
print('\n')
print(headers)
print(response.headers)
def upload_image(self,
image):
print(image)
ghost_json_url = self.ghost_json_url + '/admin/images/upload/'
token = self.encode_token()
bashCommand = "curl -X POST -F 'file=@%s' -H \"Authorization: Ghost %s\" %s" % (image, token, ghost_json_url)
proc = subprocess.Popen(bashCommand, shell=True, stdout=subprocess.PIPE)
output = proc.stdout.read().decode('utf-8')
print(output)
self.images.append(json.loads(output)['images'][0]['url'])
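Note: the multipart upload above shells out to curl; the same request could also be made with the requests library this file already imports, assuming the same /admin/images/upload/ endpoint and response shape. A minimal sketch (hypothetical helper, not part of this commit):
    def upload_image_via_requests(self, image):
        # hypothetical alternative to the curl subprocess above; not in this commit
        ghost_json_url = self.ghost_json_url + '/admin/images/upload/'
        headers = {'Authorization': 'Ghost %s' % self.encode_token()}
        with open(image, 'rb') as fh:
            # requests builds the multipart/form-data body; 'file' matches the curl -F field name
            response = requests.post(ghost_json_url, files={'file': fh}, headers=headers)
        self.images.append(response.json()['images'][0]['url'])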
def upload_images(self,
folder):
for image in os.listdir(folder):
if 'kpi' in image:
if 'png' in image:
self.upload_image(folder + '/' + image)
print('images %s' % self.images)
def custom_post(self,
folder,
authors,
title='custom'):
self.upload_images(folder)
head = '''<p>This is a custom post created via a script</p>'''
for picture in self.images:
head = head + '<img src="%s"></img>' % picture
head = head + '''<p>This is the end of the example</p>'''
self.create_post(title=title,
text=head,
tags='custom',
authors=authors)
def wifi_capacity_to_ghost(self,
authors,
folders,
title=None,
server_pull=None,
ghost_host=None,
port='22',
user_pull='lanforge',
password_pull='lanforge',
user_push=None,
password_push=None,
customer=None,
testbed='Unknown Testbed',
test_run=None,
target_folders=list(),
grafana_dashboard=None,
grafana_token=None,
grafana_host=None,
grafana_port=3000):
text = ''
csvreader = CSVReader()
if test_run is None:
test_run = sorted(folders)[0].split('/')[-1].strip('/')
for folder in folders:
print(folder)
ssh_pull = paramiko.SSHClient()
ssh_pull.set_missing_host_key_policy(paramiko.client.AutoAddPolicy)
ssh_pull.connect(server_pull,
port,
username=user_pull,
password=password_pull,
allow_agent=False,
look_for_keys=False)
scp_pull = SCPClient(ssh_pull.get_transport())
scp_pull.get(folder, recursive=True)
target_folder = str(folder).rstrip('/').split('/')[-1]
target_folders.append(target_folder)
print(target_folder)
try:
target_file = '%s/kpi.csv' % target_folder
print('target file %s' % target_file)
df = csvreader.read_csv(file=target_file, sep='\t')
csv_testbed = csvreader.get_column(df, 'test-rig')[0]
print(csv_testbed)
except:
pass
if len(csv_testbed) > 2:
testbed = csv_testbed
text = text + 'Testbed: %s<br />' % testbed
if testbed == 'Unknown Testbed':
raise UserWarning('Please define your testbed')
print('testbed %s' % testbed)
ssh_push = paramiko.SSHClient()
ssh_push.set_missing_host_key_policy(paramiko.client.AutoAddPolicy)
ssh_push.connect(ghost_host,
port,
username=user_push,
password=password_push,
allow_agent=False,
look_for_keys=False)
scp_push = SCPClient(ssh_push.get_transport())
local_path = '/home/%s/%s/%s/%s' % (user_push, customer, testbed, test_run)
transport = paramiko.Transport((ghost_host, port))
transport.connect(None, user_push, password_push)
sftp = paramiko.sftp_client.SFTPClient.from_transport(transport)
print(local_path)
try:
sftp.mkdir(local_path)
except:
print('folder %s already exists' % local_path)
scp_push.put(target_folder, recursive=True, remote_path=local_path)
files = sftp.listdir(local_path + '/' + target_folder)
# print('Files: %s' % files)
for file in files:
if 'pdf' in file:
url = 'http://%s/%s/%s/%s/%s/%s' % (
ghost_host, customer.strip('/'), testbed, test_run, target_folder, file)
text = text + 'PDF of results: <a href="%s">%s</a><br />' % (url, file)
print(url)
scp_pull.close()
scp_push.close()
self.upload_images(target_folder)
for image in self.images:
if 'kpi-' in image:
if '-print' not in image:
text = text + '<img src="%s"></img>' % image
self.images = []
if grafana_token is not None:
GR = GrafanaRequest(grafana_token,
grafana_host,
grafanajson_port=grafana_port
)
GR.create_snapshot(title=grafana_dashboard)
snapshot = GR.list_snapshots()[-1]
text = text + '<iframe src="%s" width="100%s" height=500></iframe>' % (snapshot['externalUrl'], '%')
now = date.now()
if title is None:
title = "%s %s %s %s:%s report" % (now.day, now.month, now.year, now.hour, now.minute)
if grafana_dashboard is not None:
pass
self.create_post(title=title,
text=text,
tags='custom',
authors=authors)
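Note: a minimal sketch of driving the reworked GhostRequest class on its own, using only the constructor and create_post() arguments visible in this diff (host, port, API token, and author below are placeholder values):
    from GhostRequest import GhostRequest

    ghost = GhostRequest('192.168.100.147', '2368',
                         _api_token='ADMIN_API_KEY_ID:SECRET',
                         debug_=True)
    ghost.create_post(title='Smoke test post',
                      text='<p>Hello from GhostRequest</p>',
                      tags='custom',
                      authors='Matthew')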

View File

@@ -16,10 +16,10 @@ import json
class GrafanaRequest:
def __init__(self,
_grafana_token,
_grafanajson_host,
_grafanajson_port,
grafanajson_port=3000,
_folderID=0,
_api_token=None,
_headers=dict(),
_overwrite='false',
debug_=False,
@@ -27,9 +27,12 @@ class GrafanaRequest:
self.debug = debug_
self.die_on_error = die_on_error_
self.headers = _headers
self.headers['Authorization'] = 'Bearer ' + _api_token
self.headers['Authorization'] = 'Bearer ' + _grafana_token
self.headers['Content-Type'] = 'application/json'
self.grafanajson_url = "http://%s:%s" % (_grafanajson_host, _grafanajson_port)
self.grafanajson_host = _grafanajson_host
self.grafanajson_port = grafanajson_port
self.grafanajson_token = _grafana_token
self.grafanajson_url = "http://%s:%s" % (_grafanajson_host, grafanajson_port)
self.data = dict()
self.data['overwrite'] = _overwrite
@@ -40,13 +43,14 @@ class GrafanaRequest:
pass
def list_dashboards(self):
url = self.grafanajson_url + '/api/search?folderIds=0&query=&starred=false'
return requests.get(url).text
url = self.grafanajson_url + '/api/search'
print(url)
return json.loads(requests.get(url,headers=self.headers).text)
def create_dashboard(self,
dashboard_name=None,
):
self.grafanajson_url = self.grafanajson_url + "/api/dashboards/db"
grafanajson_url = self.grafanajson_url + "/api/dashboards/db"
datastore = dict()
dashboard = dict()
dashboard['id'] = None
@@ -58,37 +62,59 @@ class GrafanaRequest:
datastore['dashboard'] = dashboard
datastore['overwrite'] = False
data = json.dumps(datastore, indent=4)
return requests.post(self.grafanajson_url, headers=self.headers, data=data, verify=False)
return requests.post(grafanajson_url, headers=self.headers, data=data, verify=False)
def delete_dashboard(self,
dashboard_uid=None):
self.grafanajson_url = self.grafanajson_url + "/api/dashboards/uid/" + dashboard_uid
return requests.post(self.grafanajson_url, headers=self.headers, verify=False)
grafanajson_url = self.grafanajson_url + "/api/dashboards/uid/" + dashboard_uid
return requests.post(grafanajson_url, headers=self.headers, verify=False)
def create_dashboard_from_data(self,
json_file=None):
self.grafanajson_url = self.grafanajson_url + '/api/dashboards/db'
grafanajson_url = self.grafanajson_url + '/api/dashboards/db'
datastore = dict()
dashboard = dict(json.loads(open(json_file).read()))
datastore['dashboard'] = dashboard
datastore['overwrite'] = False
data = json.dumps(datastore, indent=4)
#return print(data)
return requests.post(self.grafanajson_url, headers=self.headers, data=data, verify=False)
return requests.post(grafanajson_url, headers=self.headers, data=data, verify=False)
def create_dashboard_from_dict(self,
dictionary=None):
self.grafanajson_url = self.grafanajson_url + '/api/dashboards/db'
grafanajson_url = self.grafanajson_url + '/api/dashboards/db'
datastore = dict()
dashboard = dict(json.loads(dictionary))
datastore['dashboard'] = dashboard
datastore['overwrite'] = False
data = json.dumps(datastore, indent=4)
#return print(data)
return requests.post(self.grafanajson_url, headers=self.headers, data=data, verify=False)
return requests.post(grafanajson_url, headers=self.headers, data=data, verify=False)
def create_custom_dashboard(self,
datastore=None):
data = json.dumps(datastore, indent=4)
return requests.post(self.grafanajson_url, headers=self.headers, data=data, verify=False)
return requests.post(self.grafanajson_url, headers=self.headers, data=data, verify=False)
def create_snapshot(self, title):
grafanajson_url = self.grafanajson_url + '/api/snapshots'
data=self.get_dashboard(title)
data['expires'] = 3600
data['external'] = True
print(data)
return requests.post(grafanajson_url, headers=self.headers, json=data, verify=False).text
def list_snapshots(self):
grafanajson_url = self.grafanajson_url + '/api/dashboard/snapshots'
print(grafanajson_url)
return json.loads(requests.get(grafanajson_url, headers=self.headers, verify=False).text)
def get_dashboard(self, target):
dashboards = self.list_dashboards()
for dashboard in dashboards:
if dashboard['title'] == target:
uid = dashboard['uid']
grafanajson_url = self.grafanajson_url + '/api/dashboards/uid/' + uid
print(grafanajson_url)
return json.loads(requests.get(grafanajson_url, headers=self.headers, verify=False).text)
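Note: a minimal sketch of the snapshot workflow these new methods enable, mirroring how wifi_capacity_to_ghost() embeds the snapshot's externalUrl (token, host, and dashboard title below are placeholder values):
    from GrafanaRequest import GrafanaRequest

    grafana = GrafanaRequest('GRAFANA_API_TOKEN', '192.168.100.201', grafanajson_port=3000)
    grafana.create_snapshot(title='WiFi Capacity')   # snapshots the dashboard with this title
    snapshot = grafana.list_snapshots()[-1]          # last entry, as wifi_capacity_to_ghost() assumes
    print(snapshot['externalUrl'])                   # shareable URL, usable in an <iframe>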

File diff suppressed because it is too large

View File

@@ -88,7 +88,7 @@ class CreateStation(Realm):
def main():
parser = LFCliBase.create_basic_argparse(
parser = LFCliBase.create_basic_argparse( # see create_basic_argparse in ../py-json/LANforge/lfcli_base.py
prog='create_station.py',
formatter_class=argparse.RawTextHelpFormatter,
epilog='''\
@@ -101,6 +101,7 @@ def main():
Command example:
./create_station.py
--radio wiphy0
--start_id 2
--num_stations 3
--security open
--ssid netgear
@@ -108,7 +109,7 @@ Command example:
--debug
''')
required = parser.add_argument_group('required arguments')
# required.add_argument('--security', help='WiFi Security protocol: < open | wep | wpa | wpa2 | wpa3 >', required=True)
required.add_argument('--start_id', help='--start_id <value> default 0', default=0)
args = parser.parse_args()
# if args.debug:
@@ -117,16 +118,22 @@ Command example:
if (args.radio is None):
raise ValueError("--radio required")
start_id = 0
if (args.start_id != 0):
start_id = int(args.start_id)
num_sta = 2
if (args.num_stations is not None) and (int(args.num_stations) > 0):
num_stations_converted = int(args.num_stations)
num_sta = num_stations_converted
station_list = LFUtils.port_name_series(prefix="sta",
start_id=0,
end_id=num_sta - 1,
start_id=start_id,
end_id=start_id + num_sta - 1,
padding_number=10000,
radio=args.radio)
print("station_list {}".format(station_list))
set_txo_data={
"txo_enable": 1,
"txpower": 255,

View File

@@ -6,6 +6,14 @@ PURPOSE: modify ghost database from the command line.
SETUP: A Ghost installation which the user has admin access to.
EXAMPLE: ./ghost_profile.py --article_text_file text.txt --title Test --authors Matthew --ghost_token SECRET_KEY --host 192.168.1.1
There is a specific class for uploading wifi capacity graphs called wifi_capacity.
EXAMPLE: ./ghost_profile.py --ghost_token TOKEN --ghost_host 192.168.100.147
--folders /home/lanforge/html-reports/wifi-capacity-2021-06-04-02-51-07
--wifi_capacity appl --authors Matthew --title 'wifi capacity 2021 06 04 02 51 07' --server 192.168.93.51
--user_pull lanforge --password_pull lanforge --customer candela --testbed heather --test_run test-run-6
--user_push matt --password_push PASSWORD
Matthew Stidham
Copyright 2021 Candela Technologies Inc
License: Free to distribute and modify. LANforge systems must be licensed.
@@ -23,9 +31,9 @@ if 'py-json' not in sys.path:
sys.path.append(os.path.join(os.path.abspath('..'), 'py-dashboard'))
from GhostRequest import GhostRequest
from LANforge.lfcli_base import LFCliBase
class UseGhost(LFCliBase):
class UseGhost:
def __init__(self,
_ghost_token=None,
host="localhost",
@@ -34,11 +42,13 @@ class UseGhost(LFCliBase):
_exit_on_fail=False,
_ghost_host="localhost",
_ghost_port=2368, ):
super().__init__(host, port, _debug=_debug_on, _exit_on_fail=_exit_on_fail)
self.ghost_host = _ghost_host
self.ghost_port = _ghost_port
self.ghost_token = _ghost_token
self.GP = GhostRequest(self.ghost_host, str(self.ghost_port), _api_token=self.ghost_token)
self.GP = GhostRequest(self.ghost_host,
str(self.ghost_port),
_api_token=self.ghost_token,
debug_=_debug_on)
def create_post(self, title, text, tags, authors):
return self.GP.create_post(title=title, text=text, tags=tags, authors=authors)
@@ -47,9 +57,56 @@ class UseGhost(LFCliBase):
text = open(file).read()
return self.GP.create_post(title=title, text=text, tags=tags, authors=authors)
def upload_image(self, image):
return self.GP.upload_image(image)
def upload_images(self, folder):
return self.GP.upload_images(folder)
def custom_post(self, folder, authors):
return self.GP.custom_post(folder, authors)
def wifi_capacity(self,
authors,
folders,
title,
server_pull,
ghost_host,
port,
user_pull,
password_pull,
user_push,
password_push,
customer,
testbed,
test_run,
grafana_dashboard,
grafana_token,
grafana_host,
grafana_port):
target_folders = list()
return self.GP.wifi_capacity_to_ghost(authors,
folders,
title,
server_pull,
ghost_host,
port,
user_pull,
password_pull,
user_push,
password_push,
customer,
testbed,
test_run,
target_folders,
grafana_dashboard,
grafana_token,
grafana_host,
grafana_port)
def main():
parser = LFCliBase.create_basic_argparse(
parser = argparse.ArgumentParser(
prog='ghost_profile.py',
formatter_class=argparse.RawTextHelpFormatter,
epilog='''Manage Ghost Website''',
@@ -71,17 +128,68 @@ def main():
optional.add_argument('--article_tags', action='append')
optional.add_argument('--authors', action='append')
optional.add_argument('--title', default=None)
optional.add_argument('--image', default=None)
optional.add_argument('--folder', default=None)
optional.add_argument('--custom_post', default=None)
optional.add_argument('--wifi_capacity', default=None)
optional.add_argument('--folders', action='append', default=None)
optional.add_argument('--server_pull')
optional.add_argument('--port', default=22)
optional.add_argument('--user_pull', default='lanforge')
optional.add_argument('--password_pull', default='lanforge')
optional.add_argument('--user_push')
optional.add_argument('--password_push')
optional.add_argument('--customer')
optional.add_argument('--testbed')
optional.add_argument('--test_run', default=None)
optional.add_argument('--grafana_dashboard')
optional.add_argument('--grafana_token', default=None)
optional.add_argument('--grafana_host', default=None)
optional.add_argument('--grafana_port', default=3000)
optional.add_argument('--debug')
args = parser.parse_args()
Ghost = UseGhost(_ghost_token=args.ghost_token,
_ghost_port=args.ghost_port,
_ghost_host=args.ghost_host)
_ghost_host=args.ghost_host,
_debug_on=args.debug)
if args.create_post is not None:
Ghost.create_post(args.title, args.article_text, args.article_tags, args.authors)
if args.article_text_file is not None:
Ghost.create_post_from_file(args.title, args.article_text_file, args.article_tags, args.authors)
if args.image is not None:
Ghost.upload_image(args.image)
if args.custom_post is not None:
if args.folders is not None:
Ghost.custom_post(args.folders, args.authors)
else:
Ghost.custom_post(args.folder, args.authors)
else:
if args.folder is not None:
Ghost.upload_images(args.folder)
if args.wifi_capacity is not None:
Ghost.wifi_capacity(args.authors,
args.folders,
args.title,
args.server_pull,
args.ghost_host,
args.port,
args.user_pull,
args.password_pull,
args.user_push,
args.password_push,
args.customer,
args.testbed,
args.test_run,
args.grafana_dashboard,
args.grafana_token,
args.grafana_host,
args.grafana_port)
if __name__ == "__main__":
main()

View File

@@ -24,36 +24,37 @@ import string
import random
class UseGrafana(LFCliBase):
def __init__(self,
_grafana_token,
host="localhost",
_grafana_host="localhost",
port=8080,
_debug_on=False,
_exit_on_fail=False,
_grafana_port=3000):
super().__init__(host, port, _debug=_debug_on, _exit_on_fail=_exit_on_fail)
self.grafana_token = _grafana_token
self.grafana_port = _grafana_port
self.grafana_host = _grafana_host
self.GR = GrafanaRequest(self.grafana_host, str(self.grafana_port), _folderID=0, _api_token=self.grafana_token)
#!/usr/bin/env python3
def create_dashboard(self,
dashboard_name):
return self.GR.create_dashboard(dashboard_name)
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
# Class holds default settings for json requests to Grafana -
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
import sys
def delete_dashboard(self,
dashboard_uid):
return self.GR.delete_dashboard(dashboard_uid)
if sys.version_info[0] != 3:
print("This script requires Python 3")
exit()
def list_dashboards(self):
return self.GR.list_dashboards()
import requests
def create_dashboard_from_data(self,
json_file):
return self.GR.create_dashboard_from_data(json_file=json_file)
import json
#!/usr/bin/env python3
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
# Class holds default settings for json requests to Grafana -
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
import sys
if sys.version_info[0] != 3:
print("This script requires Python 3")
exit()
import requests
import json
class UseGrafana(GrafanaRequest):
def groupby(self, params, grouptype):
dic = dict()
dic['params'] = list()
@@ -301,7 +302,6 @@ class UseGrafana(LFCliBase):
return dict(zip(graph_group, units))
def main():
parser = LFCliBase.create_basic_argparse(
prog='grafana_profile.py',
@@ -353,11 +353,13 @@ def main():
optional.add_argument('--from_date', help='Date you want to start your Grafana dashboard from', default='now-1y')
optional.add_argument('--graph_height', help='Custom height for the graph on grafana dashboard', default=8)
optional.add_argument('--graph_width', help='Custom width for the graph on grafana dashboard', default=12)
optional.add_argument('--create_snapshot', action='store_true')
optional.add_argument('--list_snapshots', action='store_true')
args = parser.parse_args()
Grafana = UseGrafana(args.grafana_token,
args.grafana_port,
args.grafana_host
args.grafana_host,
grafanajson_port=args.grafana_port
)
if args.dashboard_name is not None:
Grafana.create_dashboard(args.dashboard_name)
@@ -386,6 +388,13 @@ def main():
graph_height=args.graph_height,
graph__width=args.graph_width)
if args.create_snapshot:
Grafana.create_snapshot(args.title)
if args.list_snapshots:
Grafana.list_snapshots()
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,20 @@
{
"mgr":"192.168.0.101",
"port":"8080",
"lf_user":"lanforge",
"lf_password":"lanforge",
"instance_name":"dataplane-instance",
"config_name":"test_con",
"upstream":"1.1.eth1",
"dut":"asus_5g",
"duration":"15s",
"station":"1.1.eth2",
"download_speed":"85%",
"upload_speed":"0",
"pull_report": true,
"raw_line": ["pkts: Custom;60;MTU", "cust_pkt_sz: 88 1200", "directions: DUT Transmit", "traffic_types: UDP", "bandw_options: 20", "spatial_streams: 1"]
}
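Note: a condensed sketch of how lf_dataplane_test.py (modified later in this commit) consumes this file when run with --json; each key present in the JSON overrides the matching argparse value, and raw_line is reshaped into a list of lists to match the command-line form (the actual code uses explicit per-key ifs):
    # inside main(), after args = parser.parse_args()
    import json
    with open(args.json, 'r') as json_config:
        json_data = json.load(json_config)
    for key in ("mgr", "port", "lf_user", "lf_password", "instance_name", "config_name",
                "upstream", "dut", "duration", "station", "download_speed", "upload_speed"):
        if key in json_data:
            setattr(args, key, json_data[key])                 # json value takes precedence
    if "raw_line" in json_data:
        args.raw_line = [[x] for x in json_data["raw_line"]]   # flat list -> list of lists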

View File

@@ -215,11 +215,16 @@ class DataplaneTest(cv_test):
def main():
parser = argparse.ArgumentParser("""
parser = argparse.ArgumentParser(description="""
IMPORTANT: Start lanforge with socket 3990 : ./lfclient.bash -cli-socket 3990
lfclient.bash is located in the LANforgeGUI_X.X.X directory
On local or remote system: ./lfclient.bash -cli-socket 3990 -s LF_MGR
On a local system the -s LF_MGR defaults to localhost if not provided
Open this file in an editor and read the top notes for more details.
Example:
./lf_dataplane_test.py --mgr localhost --port 8080 --lf_user lanforge --lf_password lanforge \
--instance_name dataplane-instance --config_name test_con --upstream 1.1.eth2 \
--dut linksys-8450 --duration 15s --station 1.1.sta01500 \
@@ -234,11 +239,52 @@ def main():
--influx_bucket ben \
--influx_tag testbed Ferndale-01
Example 2:
./lf_dataplane_test.py --json <name>.json
see sample json file: lf_dataplane_config.json
Sample <name>.json between using eth1 and eth2
{
"mgr":"192.168.0.101",
"port":"8080",
"lf_user":"lanforge",
"lf_password":"lanforge",
"instance_name":"dataplane-instance",
"config_name":"test_con",
"upstream":"1.1.eth1",
"dut":"asus_5g",
"duration":"15s",
"station":"1.1.eth2",
"download_speed":"85%",
"upload_speed":"0",
"raw_line": ["pkts: Custom;60;MTU", "cust_pkt_sz: 88 1200", "directions: DUT Transmit", "traffic_types: UDP", "bandw_options: 20", "spatial_streams: 1"]
}
Sample <name>.json between using eth1 and station 1.1.sta0002
{
"mgr":"192.168.0.101",
"port":"8080",
"lf_user":"lanforge",
"lf_password":"lanforge",
"instance_name":"dataplane-instance",
"config_name":"test_con",
"upstream":"1.1.eth1",
"dut":"asus_5g",
"duration":"15s",
"station":"1.1.sta0002",
"download_speed":"85%",
"upload_speed":"0",
"raw_line": ["pkts: Custom;60;MTU", "cust_pkt_sz: 88 1200", "directions: DUT Transmit", "traffic_types: UDP", "bandw_options: 20", "spatial_streams: 1"]
}
"""
)
cv_add_base_parser(parser) # see cv_test_manager.py
parser.add_argument('--json', help="--json <config.json> json input file", default="")
parser.add_argument("-u", "--upstream", type=str, default="",
help="Upstream port for wifi capacity test ex. 1.1.eth2")
parser.add_argument("--station", type=str, default="",
@@ -247,7 +293,7 @@ def main():
parser.add_argument("--dut", default="",
help="Specify DUT used by this test, example: linksys-8450")
parser.add_argument("--download_speed", default="",
help="Specify requested download speed. Percentage of theoretical is also supported. Default: 85%")
help="Specify requested download speed. Percentage of theoretical is also supported. Default: 85%%.")
parser.add_argument("--upload_speed", default="",
help="Specify requested upload speed. Percentage of theoretical is also supported. Default: 0")
parser.add_argument("--duration", default="",
@@ -257,6 +303,47 @@ def main():
args = parser.parse_args()
# use json config file
if args.json != "":
try:
with open(args.json, 'r') as json_config:
json_data = json.load(json_config)
except:
print("Error reading {}".format(args.json))
# json configuration takes precedence over the command line
# TODO: see if there is an easier way to check key presence; look at parser args
if "mgr" in json_data:
args.mgr = json_data["mgr"]
if "port" in json_data:
args.port = json_data["port"]
if "lf_user" in json_data:
args.lf_user = json_data["lf_user"]
if "lf_password" in json_data:
args.lf_password = json_data["lf_password"]
if "instance_name" in json_data:
args.instance_name = json_data["instance_name"]
if "config_name" in json_data:
args.config_name = json_data["config_name"]
if "upstream" in json_data:
args.upstream = json_data["upstream"]
if "dut" in json_data:
args.dut = json_data["dut"]
if "duration" in json_data:
args.duration = json_data["duration"]
if "station" in json_data:
args.station = json_data["station"]
if "download_speed" in json_data:
args.download_speed = json_data["download_speed"]
if "upload_speed" in json_data:
args.upload_speed = json_data["upload_speed"]
if "pull_report" in json_data:
args.pull_report = json_data["pull_report"]
if "raw_line" in json_data:
# json_data["raw_line"] is a flat list; convert it to a list of lists to match the command-line raw_line parameters
# https://www.tutorialspoint.com/convert-list-into-list-of-lists-in-python
json_data_tmp = [[x] for x in json_data["raw_line"]]
args.raw_line = json_data_tmp
cv_base_adjust_parser(args)
CV_Test = DataplaneTest(lf_host=args.mgr,

View File

@@ -0,0 +1,334 @@
#!/usr/bin/env python3
"""
Note: To run this script the GUI should be opened with:
path: cd LANforgeGUI_5.4.3 (change 5.4.3 to match your GUI version)
pwd (Output : /home/lanforge/LANforgeGUI_5.4.3)
./lfclient.bash -cli-socket 3990
This script is used to automate running Dataplane tests. You
may need to view a Dataplane test configured through the GUI to understand
the options and how best to input data.
./lf_dataplane_test.py --mgr localhost --port 8080 --lf_user lanforge --lf_password lanforge \
--instance_name dataplane-instance --config_name test_con --upstream 1.1.eth2 \
--dut linksys-8450 --duration 15s --station 1.1.sta01500 \
--download_speed 85% --upload_speed 0 \
--raw_line 'pkts: Custom;60;142;256;512;1024;MTU' \
--raw_line 'cust_pkt_sz: 88 1200' \
--raw_line 'directions: DUT Transmit;DUT Receive' \
--raw_line 'traffic_types: UDP;TCP' \
--test_rig Testbed-01 --pull_report \
--influx_host c7-graphana --influx_port 8086 --influx_org Candela \
--influx_token=-u_Wd-L8o992701QF0c5UmqEp7w7Z7YOMaWLxOMgmHfATJGnQbbmYyNxHBR9PgD6taM_tcxqJl6U8DjU1xINFQ== \
--influx_bucket ben \
--influx_tag testbed Ferndale-01
Note:
--raw_line 'line contents' will add any setting to the test config. This is a
useful way to support any option not specifically enabled by the
command options.
--set modifications will be applied after the other config has happened,
so it can be used to override any other config.
Example of raw text config for Dataplane, to show other possible options:
show_events: 1
show_log: 0
port_sorting: 0
kpi_id: Dataplane Pkt-Size
notes0: ec5211 in bridge mode, wpa2 auth.
bg: 0xE0ECF8
test_rig:
show_scan: 1
auto_helper: 0
skip_2: 0
skip_5: 0
skip_5b: 1
skip_dual: 0
skip_tri: 1
selected_dut: ea8300
duration: 15000
traffic_port: 1.1.157 sta01500
upstream_port: 1.1.2 eth2
path_loss: 10
speed: 85%
speed2: 0Kbps
min_rssi_bound: -150
max_rssi_bound: 0
channels: AUTO
modes: Auto
pkts: Custom;60;142;256;512;1024;MTU
spatial_streams: AUTO
security_options: AUTO
bandw_options: AUTO
traffic_types: UDP;TCP
directions: DUT Transmit;DUT Receive
txo_preamble: OFDM
txo_mcs: 0 CCK, OFDM, HT, VHT
txo_retries: No Retry
txo_sgi: OFF
txo_txpower: 15
attenuator: 0
attenuator2: 0
attenuator_mod: 255
attenuator_mod2: 255
attenuations: 0..+50..950
attenuations2: 0..+50..950
chamber: 0
tt_deg: 0..+45..359
cust_pkt_sz: 88 1200
show_bar_labels: 1
show_prcnt_tput: 0
show_3s: 0
show_ll_graphs: 0
show_gp_graphs: 1
show_1m: 1
pause_iter: 0
outer_loop_atten: 0
show_realtime: 1
operator:
mconn: 1
mpkt: 1000
tos: 0
loop_iterations: 1
"""
import sys
import os
import argparse
import time
import json
from os import path
if sys.version_info[0] != 3:
print("This script requires Python 3")
exit(1)
if 'py-json' not in sys.path:
sys.path.append(os.path.join(os.path.abspath('..'), 'py-json'))
from cv_test_manager import cv_test
from cv_test_manager import *
class DataplaneTest(cv_test):
def __init__(self,
lf_host="localhost",
lf_port=8080,
lf_user="lanforge",
lf_password="lanforge",
ssh_port=22,
local_path="",
instance_name="dpt_instance",
config_name="dpt_config",
upstream="1.1.eth2",
pull_report=False,
load_old_cfg=False,
upload_speed="0",
download_speed="85%",
duration="15s",
station="1.1.sta01500",
dut="NA",
enables=[],
disables=[],
raw_lines=[],
raw_lines_file="",
sets=[],
graph_groups=None,
report_dir=""
):
super().__init__(lfclient_host=lf_host, lfclient_port=lf_port)
self.lf_host = lf_host
self.lf_port = lf_port
self.lf_user = lf_user
self.lf_password = lf_password
self.instance_name = instance_name
self.config_name = config_name
self.dut = dut
self.duration = duration
self.upstream = upstream
self.station = station
self.pull_report = pull_report
self.load_old_cfg = load_old_cfg
self.test_name = "Dataplane"
self.upload_speed = upload_speed
self.download_speed = download_speed
self.enables = enables
self.disables = disables
self.raw_lines = raw_lines
self.raw_lines_file = raw_lines_file
self.sets = sets
self.graph_groups = graph_groups
self.report_dir = report_dir
self.ssh_port = ssh_port
self.local_path = local_path
def setup(self):
# Nothing to do at this time.
return
def run(self):
self.sync_cv()
time.sleep(2)
self.sync_cv()
blob_test = "dataplane-test-latest-"
self.rm_text_blob(self.config_name, blob_test) # To delete old config with same name
self.show_text_blob(None, None, False)
# Test related settings
cfg_options = []
### HERE###
self.apply_cfg_options(cfg_options, self.enables, self.disables, self.raw_lines, self.raw_lines_file)
# cmd line args take precedence and so come last in the cfg array.
if self.upstream != "":
cfg_options.append("upstream_port: " + self.upstream)
if self.station != "":
cfg_options.append("traffic_port: " + self.station)
if self.download_speed != "":
cfg_options.append("speed: " + self.download_speed)
if self.upload_speed != "":
cfg_options.append("speed2: " + self.upload_speed)
if self.duration != "":
cfg_options.append("duration: " + self.duration)
if self.dut != "":
cfg_options.append("selected_dut: " + self.dut)
# We deleted the scenario earlier, now re-build new one line at a time.
self.build_cfg(self.config_name, blob_test, cfg_options)
cv_cmds = []
self.create_and_run_test(self.load_old_cfg, self.test_name, self.instance_name,
self.config_name, self.sets,
self.pull_report, self.lf_host, self.lf_user, self.lf_password,
cv_cmds, ssh_port=self.ssh_port, local_path=self.local_path,
graph_groups_file=self.graph_groups)
self.rm_text_blob(self.config_name, blob_test) # To delete old config with same name
def main():
parser = argparse.ArgumentParser("""
Open this file in an editor and read the top notes for more details.
Example:
./lf_dataplane_test.py --mgr localhost --port 8080 --lf_user lanforge --lf_password lanforge \
--instance_name dataplane-instance --config_name test_con --upstream 1.1.eth2 \
--dut linksys-8450 --duration 15s --station 1.1.sta01500 \
--download_speed 85% --upload_speed 0 \
--raw_line 'pkts: Custom;60;142;256;512;1024;MTU' \
--raw_line 'cust_pkt_sz: 88 1200' \
--raw_line 'directions: DUT Transmit;DUT Receive' \
--raw_line 'traffic_types: UDP;TCP' \
--test_rig Testbed-01 --pull_report \
--influx_host c7-graphana --influx_port 8086 --influx_org Candela \
--influx_token=-u_Wd-L8o992701QF0c5UmqEp7w7Z7YOMaWLxOMgmHfATJGnQbbmYyNxHBR9PgD6taM_tcxqJl6U8DjU1xINFQ== \
--influx_bucket ben \
--influx_tag testbed Ferndale-01
"""
)
cv_add_base_parser(parser) # see cv_test_manager.py
parser.add_argument('--json', help="--json <config.json> json input file", default="")
parser.add_argument("-u", "--upstream", type=str, default="",
help="Upstream port for wifi capacity test ex. 1.1.eth2")
parser.add_argument("--station", type=str, default="",
help="Station to be used in this test, example: 1.1.sta01500")
parser.add_argument("--dut", default="",
help="Specify DUT used by this test, example: linksys-8450")
parser.add_argument("--download_speed", default="",
help="Specify requested download speed. Percentage of theoretical is also supported. Default: 85%")
parser.add_argument("--upload_speed", default="",
help="Specify requested upload speed. Percentage of theoretical is also supported. Default: 0")
parser.add_argument("--duration", default="",
help="Specify duration of each traffic run")
parser.add_argument("--graph_groups", help="File to save graph_groups to", default=None)
parser.add_argument("--report_dir", default="")
args = parser.parse_args()
# TODO
if args.json != "":
try:
with open(args.json, 'r') as json_config:
json_data = json.load(json_config)
except:
print("Error reading {}".format(args.json))
# json configuration takes precedence over the command line
# TODO: see if there is an easier way to check key presence; look at parser args
if "mgr" in json_data:
args.mgr = json_data["mgr"]
if "port" in json_data:
args.port = json_data["port"]
if "lf_user" in json_data:
args.lf_user = json_data["lf_user"]
if "lf_password" in json_data:
args.lf_password = json_data["lf_password"]
if "instance_name" in json_data:
args.instance_name = json_data["instance_name"]
if "config_name" in json_data:
args.config_name = json_data["config_name"]
if "upstream" in json_data:
args.upstream = json_data["upstream"]
if "dut" in json_data:
args.dut = json_data["dut"]
if "duration" in json_data:
args.duration = json_data["duration"]
if "station" in json_data:
args.station = json_data["station"]
if "download_speed" in json_data:
args.download_speed = json_data["download_speed"]
if "upload_speed" in json_data:
args.upload_speed = json_data["upload_speed"]
if "raw_line" in json_data:
# json_data["raw_line"] is a flat list; convert it to a list of lists to match the command-line raw_line parameters
# https://www.tutorialspoint.com/convert-list-into-list-of-lists-in-python
json_data_tmp = [[x] for x in json_data["raw_line"]]
args.raw_line = json_data_tmp
cv_base_adjust_parser(args)
print(args)
#exit(1)
# if json present use json config will override
CV_Test = DataplaneTest(lf_host = args.mgr,
lf_port = args.port,
lf_user = args.lf_user,
lf_password = args.lf_password,
instance_name = args.instance_name,
config_name = args.config_name,
upstream = args.upstream,
pull_report = args.pull_report,
load_old_cfg = args.load_old_cfg,
download_speed = args.download_speed,
upload_speed = args.upload_speed,
duration = args.duration,
dut = args.dut,
station = args.station,
enables = args.enable,
disables = args.disable,
raw_lines = args.raw_line, # this is interesting.
raw_lines_file = args.raw_lines_file,
sets = args.set,
graph_groups = args.graph_groups
)
CV_Test.setup()
CV_Test.run()
CV_Test.check_influx_kpi(args)
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,118 @@
#!/usr/bin/python3
'''
NAME:
lf_read_json.py
PURPOSE:
Test out reading configuration data from a .json style config file
EXAMPLE:
./lf_read_json.py --file <name>.json
NOTES:
TO DO NOTES:
'''
import sys
if sys.version_info[0] != 3:
print("This script requires Python3")
exit()
from time import sleep
import argparse
import json
class lf_read_json():
def __init__(self):
self.timeout = 10
def preprocess_data(self):
pass
def main():
# arguments
parser = argparse.ArgumentParser(
prog='lf_read_json.py',
formatter_class=argparse.RawTextHelpFormatter,
epilog='''\
lf_read_json.py : read json
''',
description='''\
lf_read_json.py
-----------
Summary :
---------
./lf_dataplane_json.py --mgr 192.168.0.101 --port 8080 --lf_user lanforge --lf_password lanforge --instance_name dataplane-instance --config_name test_con --upstream 1.1.eth1 --dut asus_5g --duration 15s --station 1.1.13.sta0002 --download_speed 85% --upload_speed 0 --raw_line 'pkts: Custom;60;MTU' --raw_line 'cust_pkt_sz: 88 1200' --raw_line 'directions: DUT Transmit' --raw_line 'traffic_types: UDP' --raw_line 'bandw_options: 20' --raw_line 'spatial_streams: 1
''')
parser.add_argument('--json', help="--json <config.json> json input file", default="config.json")
args = parser.parse_args()
config_json = args.json
print("config_json {}".format(config_json))
with open(config_json, 'r') as config_file:
config_data = json.load(config_file)
print(config_data)
print("mgr: {}".format(config_data["mgr"]))
#print("raw_line: {}".format(config_data["raw_line"]))
raw = []
raw = config_data["raw_line"]
print(raw)
# raw is a list
raw2 = [[x] for x in raw]
print(raw2)
'''
for r in raw_lines:
cfg_options.append(r[0])
'''
'''./lf_dataplane_json.py --mgr 192.168.0.101 --port 8080 --lf_user lanforge --lf_password lanforge --instance_name dataplane-instance --config_name test_con --upstream 1.1.eth1 --dut asus_5g --duration 15s --station 1.1.13.sta0002 --download_speed 85% --upload_speed 0 --raw_line 'pkts: Custom;60;MTU' --raw_line 'cust_pkt_sz: 88 1200' --raw_line 'directions: DUT Transmit' --raw_line 'traffic_types: UDP' --raw_line 'bandw_options: 20' --raw_line 'spatial_streams: 1'
Namespace(config_name='test_con', disable=[], download_speed='85%', duration='15s', dut='asus_5g', enable=[], graph_groups=None, influx_bucket=None, influx_host=None, influx_org=None, influx_port=8086, influx_tag=[], influx_token=None, instance_name='dataplane-instance', json='', lf_password='lanforge', lf_user='lanforge', load_old_cfg=False, mgr='192.168.0.101', port=8080, pull_report=False,
correct version:
raw_line=[['pkts: Custom;60;MTU'], ['cust_pkt_sz: 88 1200'], ['directions: DUT Transmit'], ['traffic_types: UDP'], ['bandw_options: 20'], ['spatial_streams: 1']], raw_lines_file='', report_dir='', set=[], station='1.1.13.sta0002', test_rig='', upload_speed='0', upstream='1.1.eth1')
'''
''' Incorrect version
raw_line={'pkts': ['Custom', '60', 'MTU'], 'cust_pkt_sz': ['88', '1200'], 'directions': 'DUT Transmit', 'traffic_types': 'UDP', 'bandw_options': '20', 'stpatial_streams': '1'}
'''
'''cfg_options = []
for r in raw:
print(r)
test = '{}:{}'.format(r,raw[r])
cfg_options.append(test)
print(cfg_options)
'''
#dave = []
#for key,val in raw.items(): dave.append(raw.items())
#print(dave)
if "mgr" in config_data:
print("mgr present")
print("END lf_read_json.py")
if __name__ == "__main__":
main()

View File

@@ -4,6 +4,7 @@
NAME: test_ipv4_connection.py
PURPOSE:
This script's functionality has been replaced by test_ip_connection.py; consider this script deprecated
test_ipv4_connection.py will create stations and attempt to connect to an SSID. WPA, WPA2, WPA3, WEP, and Open connection types are supported
Script for creating a variable number of stations and attempting to connect them to an SSID.
@@ -27,7 +28,7 @@ if sys.version_info[0] != 3:
exit(1)
if 'py-json' not in sys.path:
sys.path.append(os.path.join(os.path.abspath('..'), 'py-json'))
sys.path.append(os.path.join(os.path.abspath('../..'), 'py-json'))
import LANforge
from LANforge.lfcli_base import LFCliBase
from LANforge import LFUtils

View File

@@ -26,7 +26,7 @@ if sys.version_info[0] != 3:
exit(1)
if 'py-json' not in sys.path:
sys.path.append('../py-json')
sys.path.append('../../py-json')
import argparse
from LANforge.lfcli_base import LFCliBase

View File

@@ -25,7 +25,7 @@ if sys.version_info[0] != 3:
exit(1)
if 'py-json' not in sys.path:
sys.path.append('../py-json')
sys.path.append('../../py-json')
import argparse
from LANforge.lfcli_base import LFCliBase

View File

@@ -25,7 +25,7 @@ if sys.version_info[0] != 3:
exit(1)
if 'py-json' not in sys.path:
sys.path.append(os.path.join(os.path.abspath('..'), 'py-json'))
sys.path.append(os.path.join(os.path.abspath('../..'), 'py-json'))
import argparse
from LANforge.lfcli_base import LFCliBase

View File

@@ -26,7 +26,7 @@ if sys.version_info[0] != 3:
exit(1)
if 'py-json' not in sys.path:
sys.path.append('../py-json')
sys.path.append('../../py-json')
import argparse
from LANforge.lfcli_base import LFCliBase

View File

@@ -27,7 +27,7 @@ if sys.version_info[0] != 3:
exit(1)
if 'py-json' not in sys.path:
sys.path.append(os.path.join(os.path.abspath('..'), 'py-json'))
sys.path.append(os.path.join(os.path.abspath('../..'), 'py-json'))
import argparse
from LANforge.lfcli_base import LFCliBase

View File

@@ -25,7 +25,7 @@ if sys.version_info[0] != 3:
exit(1)
if 'py-json' not in sys.path:
sys.path.append('../py-json')
sys.path.append('../../py-json')
import argparse
from LANforge.lfcli_base import LFCliBase

View File

@@ -32,7 +32,7 @@ if sys.version_info[0] != 3:
exit(1)
if 'py-json' not in sys.path:
sys.path.append(os.path.join(os.path.abspath('..'), 'py-json'))
sys.path.append(os.path.join(os.path.abspath('../..'), 'py-json'))
import argparse
from LANforge import LFUtils

View File

@@ -4,6 +4,7 @@
NAME: test_ipv6_connection.py
PURPOSE:
This script's functionality has been replaced by test_ip_connection.py; consider this script deprecated
test_ipv6_connection.py will create stations and attempt to connect to an SSID using IPv6. WPA, WPA2, WPA3, WEP, and Open connection types are supported
Script for creating a variable number of stations and attempting to connect them to an SSID using IPv6.
@@ -26,7 +27,7 @@ if sys.version_info[0] != 3:
exit(1)
if 'py-json' not in sys.path:
sys.path.append(os.path.join(os.path.abspath('..'), 'py-json'))
sys.path.append(os.path.join(os.path.abspath('../..'), 'py-json'))
import LANforge
from LANforge.lfcli_base import LFCliBase
from LANforge import LFUtils

View File

@@ -28,7 +28,7 @@ if sys.version_info[0] != 3:
exit(1)
if 'py-json' not in sys.path:
sys.path.append(os.path.join(os.path.abspath('..'), 'py-json'))
sys.path.append(os.path.join(os.path.abspath('../..'), 'py-json'))
import argparse
from LANforge.lfcli_base import LFCliBase

py-scripts/test_ip_connection.py: new executable file (292 lines added)
View File

@@ -0,0 +1,292 @@
#!/usr/bin/env python3
"""
NAME: test_ip_connection.py
This script combines functionality of test_ipv4_connection.py and test_ipv6_connection.py.
test_ipv4_connection.py and test_ipv6_connection.py are located in py-scripts/scripts_deprecated
PURPOSE:
test_ip_connection.py will create stations and attempt to connect to an SSID. WPA, WPA2, WPA3, WEP, and Open connection types are supported
Script for creating a variable number of stations and attempting to connect them to an SSID.
A test will run to verify that stations are associated and get an IP; if both conditions are true, the test will
pass, otherwise the test will fail.
EXAMPLE:
./test_ip_connection.py --upstream_port eth1 --radio wiphy0 --num_stations 3 --security open --ssid netgear --passwd BLANK --debug
./test_ip_connection.py --upstream_port eth1 --ipv6 --radio wiphy0 --num_stations 3 --proxy --security {open|wep|wpa|wpa2|wpa3}
--ssid netgear --passwd admin123 --mode 1 --ap "00:0e:8e:78:e1:76" --test_id --timeout 120 --debug
Use './test_ip_connection.py' --help to see command line usage and options
Copyright 2021 Candela Technologies Inc
License: Free to distribute and modify. LANforge systems must be licensed.
"""
import sys
import os
import argparse
if sys.version_info[0] != 3:
print("This script requires Python 3")
exit(1)
if 'py-json' not in sys.path:
sys.path.append(os.path.join(os.path.abspath('..'), 'py-json'))
import LANforge
from LANforge.lfcli_base import LFCliBase
from LANforge import LFUtils
import realm
import time
import pprint
class ConnectTest(LFCliBase):
def __init__(self,
_ssid=None,
_security=None,
_password=None,
_host=None,
_port=None,
_sta_list=None,
_number_template="00000",
_radio="wiphy0",
_proxy_str=None,
_debug_on=False,
_exit_on_error=False,
_exit_on_fail=False,
_ap=None,
_ipv6=False,
_mode=0,
_num_stations=0,
_timeout=120):
super().__init__(_host,
_port,
_proxy_str=_proxy_str,
_local_realm=realm.Realm(lfclient_host=_host,
lfclient_port=_port,
_exit_on_error=_exit_on_error,
_exit_on_fail=_exit_on_fail,
_proxy_str=_proxy_str,
debug_=_debug_on),
_debug=_debug_on,
_exit_on_fail=_exit_on_fail)
self.host = _host
self.port = _port
self.ssid = _ssid
self.security = _security
self.password = _password
self.sta_list = _sta_list
self.radio = _radio
self.timeout = 120
self.number_template = _number_template
self.debug = _debug_on
self.ap = _ap
self.mode = _mode
self.ipv6 = _ipv6
self.num_stations = _num_stations
self.station_profile = self.local_realm.new_station_profile()
self.station_profile.lfclient_url = self.lfclient_url
self.station_profile.ssid = self.ssid
self.station_profile.ssid_pass = self.password
self.station_profile.security = self.security
self.station_profile.number_template_ = self.number_template
self.station_profile.mode = 0
if self.debug:
print("----- Station List ----- ----- ----- ----- ----- ----- \n")
pprint.pprint(self.sta_list)
print("---- ~Station List ----- ----- ----- ----- ----- ----- \n")
def build(self):
# Build stations
self.station_profile.use_security(self.security, self.ssid, self.password)
self.station_profile.set_number_template(self.number_template)
print("Creating stations")
self.station_profile.set_command_flag("add_sta", "create_admin_down", 1)
self.station_profile.set_command_param("set_port", "report_timer", 1500)
self.station_profile.set_command_flag("set_port", "rpt_timer", 1)
self.station_profile.create(radio=self.radio, sta_names_=self.sta_list, debug=self.debug)
self._pass("PASS: Station build finished")
def start(self, sta_list, print_pass, print_fail):
self.station_profile.admin_up()
associated_map = {}
ip_map = {}
print("Starting test...")
for sec in range(self.timeout):
for sta_name in sta_list:
shelf = self.local_realm.name_to_eid(sta_name)[0]
resource = self.local_realm.name_to_eid(sta_name)[1]
name = self.local_realm.name_to_eid(sta_name)[2]
if self.ipv6:
url = "port/%s/%s/%s?fields=port,alias,ipv6+address,ap" % (shelf, resource, name)
else:
url = "port/%s/%s/%s?fields=port,alias,ip,ap" % (shelf, resource, name)
sta_status = self.json_get(url, debug_=self.debug)
if self.debug:
print(sta_status)
if sta_status is None or sta_status['interface'] is None or sta_status['interface']['ap'] is None:
continue
if (len(sta_status['interface']['ap']) == 17) and (sta_status['interface']['ap'][-3] == ':'):
associated_map[sta_name] = 1
if self.debug:
if self.ipv6:
print("Associated", sta_name, sta_status['interface']['ap'], sta_status['interface']['ipv6 address'])
else:
print("Associated", sta_name, sta_status['interface']['ap'], sta_status['interface']['ip'])
if self.ipv6:
if sta_status['interface']['ipv6 address'] != 'DELETED' and \
not sta_status['interface']['ipv6 address'].startswith('fe80') \
and sta_status['interface']['ipv6 address'] != 'AUTO':
ip_map[sta_name] = 1
if self.debug:
print("IPv6 address:", sta_name, sta_status['interface']['ap'],
sta_status['interface']['ipv6 address'])
else:
if sta_status['interface']['ip'] != '0.0.0.0':
ip_map[sta_name] = 1
if self.debug:
print("IP", sta_name, sta_status['interface']['ap'], sta_status['interface']['ip'])
if (len(sta_list) == len(ip_map)) and (len(sta_list) == len(associated_map)):
break
else:
time.sleep(1)
if self.debug:
print("sta_list", len(sta_list), sta_list)
print("ip_map", len(ip_map), ip_map)
print("associated_map", len(associated_map), associated_map)
if (len(sta_list) == len(ip_map)) and (len(sta_list) == len(associated_map)):
self._pass("PASS: All stations associated with IP", print_pass)
else:
self._fail("FAIL: Not all stations able to associate/get IP", print_fail)
print("sta_list", sta_list)
print("ip_map", ip_map)
print("associated_map", associated_map)
return self.passes()
def stop(self):
# Bring stations down
self.station_profile.admin_down()
def cleanup(self, sta_list):
self.station_profile.cleanup(sta_list, debug_=self.debug)
LFUtils.wait_until_ports_disappear(base_url=self.lfclient_url,
port_list=sta_list,
debug=self.debug)
time.sleep(1)
def main():
parser = LFCliBase.create_basic_argparse(
prog='test_ip_connection.py',
formatter_class=argparse.RawTextHelpFormatter,
epilog='''\
Create stations that attempt to authenticate, associate, and receive IP addresses on the
chosen SSID
''',
description='''\
test_ip_connection.py
--------------------------------------
Generic ipv6 command example:
python3 ./test_ip_connection.py
--upstream_port eth1
--radio wiphy0
--num_stations 3
--ipv6
--proxy
--security {open|wep|wpa|wpa2|wpa3}
--ssid netgear
--passwd admin123
--mode 1
--ap "00:0e:8e:78:e1:76"
--test_id
--timeout 120
--debug
Generic ipv4 command example:
./test_ip_connection.py
--upstream_port eth1
--radio wiphy0
--num_stations 3
--security open
--ssid netgear
--passwd BLANK
--debug''')
required = None
for agroup in parser._action_groups:
if agroup.title == "required arguments":
required = agroup
# if required is not None:
optional = None
for agroup in parser._action_groups:
if agroup.title == "optional arguments":
optional = agroup
if optional is not None:
optional.add_argument("--ipv6", help="Use ipv6 connections instead of ipv4", action="store_true", default=False)
optional.add_argument("--ap", help="Add BSSID of access point to connect to")
optional.add_argument('--mode', help=LFCliBase.Help_Mode)
optional.add_argument('--timeout',
help='--timeout sets the length of time to wait until a connection is successful',
default=30)
args = parser.parse_args()
if (args.radio is None):
raise ValueError("--radio required")
num_sta = 2
if (args.num_stations is not None) and (int(args.num_stations) > 0):
num_stations_converted = int(args.num_stations)
num_sta = num_stations_converted
station_list = LFUtils.port_name_series(prefix="sta",
start_id=0,
end_id=num_sta - 1,
padding_number=10000,
radio=args.radio)
if args.debug:
print("args.proxy: %s" % args.proxy)
ip_test = ConnectTest(_host=args.mgr,
_port=args.mgr_port,
_ssid=args.ssid,
_password=args.passwd,
_security=args.security,
_sta_list=station_list,
_radio=args.radio,
_proxy_str=args.proxy,
_debug_on=args.debug,
_ipv6=args.ipv6,
_ap=args.ap,
_mode=args.mode,
_timeout=args.timeout)
ip_test.cleanup(station_list)
ip_test.build()
if not ip_test.passes():
print(ip_test.get_fail_message())
ip_test.add_event(name="test_ip_connection.py", message=ip_test.get_fail_message())
ip_test.exit_fail()
ip_test.start(station_list, False, False)
ip_test.stop()
if not ip_test.passes():
print(ip_test.get_fail_message())
ip_test.add_event(name="test_ip_connection.py", message=ip_test.get_fail_message())
ip_test.exit_fail()
time.sleep(30)
ip_test.cleanup(station_list)
if ip_test.passes():
ip_test.add_event(name="test_ip_connection.py", message="Full test passed, all stations associated and got IP")
ip_test.exit_success()
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,520 @@
#!/usr/bin/env python3
"""
NAME: test_ip_variable_time.py
PURPOSE:
test_ip_variable_time.py will create stations and endpoints to generate and verify layer-3 traffic over ipv4 or ipv6.
This script replaces the functionality of test_ipv4_variable_time.py and test_ipv6_variable_time.py
This script has two working modes:
Mode 1:
When stations are not available,
this script will create a variable number of stations, each with its own set of cross-connects and endpoints.
It will then create layer 3 traffic over a specified amount of time, testing for increased traffic at regular intervals.
This test will pass if all stations increase traffic over the full test duration.
Mode 2:
When stations are already available, this script will create layer 3 cross-connects and endpoints. It will then
create layer 3 traffic over a specified amount of time, testing for increased traffic at regular intervals.
This test will pass if all stations increase traffic over the full test duration.
Use './test_ip_variable_time.py --help' to see command line usage and options
Copyright 2021 Candela Technologies Inc
License: Free to distribute and modify. LANforge systems must be licensed.
"""
import sys
import os
if sys.version_info[0] != 3:
print("This script requires Python 3")
exit(1)
if 'py-json' not in sys.path:
sys.path.append(os.path.join(os.path.abspath('..'), 'py-json'))
import argparse
from LANforge import LFUtils
from realm import Realm
import time
import datetime
class IPVariableTime(Realm):
def __init__(self,
ssid=None,
security=None,
password=None,
sta_list=[],
create_sta=True,
name_prefix=None,
upstream=None,
radio=None,
host="localhost",
port=8080,
mode=0,
ap=None,
traffic_type=None,
side_a_min_rate=56, side_a_max_rate=0,
side_b_min_rate=56, side_b_max_rate=0,
number_template="00000",
test_duration="5m",
use_ht160=False,
ipv6=False,
_debug_on=False,
_exit_on_error=False,
_exit_on_fail=False):
super().__init__(lfclient_host=host,
lfclient_port=port),
self.upstream = upstream
self.host = host
self.port = port
self.ssid = ssid
self.sta_list = sta_list
self.create_sta = create_sta
self.security = security
self.password = password
self.radio = radio
self.mode = mode
self.ap = ap
self.traffic_type = traffic_type
self.number_template = number_template
self.debug = _debug_on
# self.json_post("/cli-json/set_resource", {
# "shelf":1,
# "resource":all,
# "max_staged_bringup": 30,
# "max_trying_ifup": 15,
# "max_station_bringup": 6
# })
self.name_prefix = name_prefix
self.test_duration = test_duration
self.station_profile = self.new_station_profile()
self.cx_profile = self.new_l3_cx_profile()
self.station_profile.lfclient_url = self.lfclient_url
self.station_profile.ssid = self.ssid
self.station_profile.ssid_pass = self.password
self.station_profile.security = self.security
self.station_profile.number_template_ = self.number_template
self.station_profile.debug = self.debug
self.station_profile.use_ht160 = use_ht160
if self.station_profile.use_ht160:
self.station_profile.mode = 9
self.station_profile.mode = mode
if self.ap is not None:
self.station_profile.set_command_param("add_sta", "ap", self.ap)
self.cx_profile.host = self.host
self.cx_profile.port = self.port
self.ipv6 = ipv6
self.cx_profile.name_prefix = self.name_prefix
self.cx_profile.side_a_min_bps = side_a_min_rate
self.cx_profile.side_a_max_bps = side_a_max_rate
self.cx_profile.side_b_min_bps = side_b_min_rate
self.cx_profile.side_b_max_bps = side_b_max_rate
def start(self, print_pass=False, print_fail=False):
if self.create_sta:
self.station_profile.admin_up()
# to-do- check here if upstream port got IP
temp_stas = self.station_profile.station_names.copy()
if self.wait_for_ip(temp_stas, ipv4=not self.ipv6, ipv6=self.ipv6):
self._pass("All stations got IPs")
else:
self._fail("Stations failed to get IPs")
self.exit_fail()
self.cx_profile.start_cx()
def stop(self):
self.cx_profile.stop_cx()
if self.create_sta:
self.station_profile.admin_down()
def pre_cleanup(self):
self.cx_profile.cleanup_prefix()
if self.create_sta:
for sta in self.sta_list:
self.rm_port(sta, check_exists=True)
def cleanup(self):
self.cx_profile.cleanup()
if self.create_sta:
self.station_profile.cleanup()
LFUtils.wait_until_ports_disappear(base_url=self.lfclient_url, port_list=self.station_profile.station_names,
debug=self.debug)
def build(self):
if self.create_sta:
self.station_profile.use_security(self.security, self.ssid, self.password)
self.station_profile.set_number_template(self.number_template)
print("Creating stations")
self.station_profile.set_command_flag("add_sta", "create_admin_down", 1)
self.station_profile.set_command_param("set_port", "report_timer", 1500)
self.station_profile.set_command_flag("set_port", "rpt_timer", 1)
self.station_profile.create(radio=self.radio, sta_names_=self.sta_list, debug=self.debug)
self._pass("PASS: Station build finished")
self.cx_profile.create(endp_type=self.traffic_type, side_a=self.sta_list,
side_b=self.upstream,
sleep_time=0)
def main():
parser = Realm.create_basic_argparse(
prog='test_ip_variable_time.py',
formatter_class=argparse.RawTextHelpFormatter,
epilog='''\
Create stations to test connection and traffic on VAPs of varying security types (WEP, WPA, WPA2, WPA3, Open)
over ipv4 or ipv6
''',
description='''\
test_ip_variable_time.py:
--------------------
Generic command layout:
python3 ./test_ip_variable_time.py
--upstream_port eth1
--radio wiphy0
--num_stations 32
--security {open|wep|wpa|wpa2|wpa3}
--mode 1
{"auto" : "0",
"a" : "1",
"b" : "2",
"g" : "3",
"abg" : "4",
"abgn" : "5",
"bgn" : "6",
"bg" : "7",
"abgnAC" : "8",
"anAC" : "9",
"an" : "10",
"bgnAC" : "11",
"abgnAX" : "12",
"bgnAX" : "13"}
--ssid netgear
--password admin123
--test_duration 2m (default)
--monitor_interval_ms
--a_min 3000
--b_min 1000
--ap "00:0e:8e:78:e1:76"
--output_format csv
--traffic_type lf_udp
--report_file ~/Documents/results.csv (Example of csv file output - please use another extension for other file formats)
--compared_report ~/Documents/results_prev.csv (Example of csv file retrieval - please use another extension for other file formats) - UNDER CONSTRUCTION
--layer3_cols 'name','tx bytes','rx bytes','dropped' (column names from the GUI to print on report - please read below to know what to put here according to preferences)
--port_mgr_cols 'ap','ip' (column names from the GUI to print on report - please read below to know what to put here according to preferences)
--debug
python3 ./test_ip_variable_time.py
--upstream_port eth1 (upstream Port)
--traffic_type lf_udp (traffic type, lf_udp | lf_tcp)
--test_duration 5m (duration to run traffic 5m --> 5 Minutes)
--create_sta False (False, means it will not create stations and use the sta_names specified below)
--sta_names sta000,sta001,sta002 (used if --create_sta False, comma separated names of stations)
===============================================================================
** FURTHER INFORMATION **
Using the layer3_cols flag:
Currently the output function does not support inputting the columns in layer3_cols the way they are displayed in the GUI. This quirk is under construction. To output
certain GUI columns in your final report, please match each GUI column display name to its counterpart below so that the columns are correctly displayed in
your report.
GUI Column Display Layer3_cols argument to type in (to print in report)
Name | 'name'
EID | 'eid'
Run | 'run'
Mng | 'mng'
Script | 'script'
Tx Rate | 'tx rate'
Tx Rate (1 min) | 'tx rate (1&nbsp;min)'
Tx Rate (last) | 'tx rate (last)'
Tx Rate LL | 'tx rate ll'
Rx Rate | 'rx rate'
Rx Rate (1 min) | 'rx rate (1&nbsp;min)'
Rx Rate (last) | 'rx rate (last)'
Rx Rate LL | 'rx rate ll'
Rx Drop % | 'rx drop %'
Tx PDUs | 'tx pdus'
Tx Pkts LL | 'tx pkts ll'
PDU/s TX | 'pdu/s tx'
Pps TX LL | 'pps tx ll'
Rx PDUs | 'rx pdus'
Rx Pkts LL | 'rx pkts ll'
PDU/s RX | 'pdu/s rx'
Pps RX LL | 'pps rx ll'
Delay | 'delay'
Dropped | 'dropped'
Jitter | 'jitter'
Tx Bytes | 'tx bytes'
Rx Bytes | 'rx bytes'
Replays | 'replays'
TCP Rtx | 'tcp rtx'
Dup Pkts | 'dup pkts'
Rx Dup % | 'rx dup %'
OOO Pkts | 'ooo pkts'
Rx OOO % | 'rx ooo %'
RX Wrong Dev | 'rx wrong dev'
CRC Fail | 'crc fail'
RX BER | 'rx ber'
CX Active | 'cx active'
CX Estab/s | 'cx estab/s'
1st RX | '1st rx'
CX TO | 'cx to'
Pattern | 'pattern'
Min PDU | 'min pdu'
Max PDU | 'max pdu'
Min Rate | 'min rate'
Max Rate | 'max rate'
Send Buf | 'send buf'
Rcv Buf | 'rcv buf'
CWND | 'cwnd'
TCP MSS | 'tcp mss'
Bursty | 'bursty'
A/B | 'a/b'
Elapsed | 'elapsed'
Destination Addr | 'destination addr'
Source Addr | 'source addr'
''')
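# Illustrative note: per the mapping table in the description above, a run such as
#   ./test_ip_variable_time.py ... --layer3_cols 'name','tx bytes','rx bytes','rx drop %' --port_mgr_cols 'ap','ip'
# reports the Name, Tx Bytes, Rx Bytes and Rx Drop % endpoint columns plus the AP and IP port columns.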
parser.add_argument('--mode', help='Used to force mode of stations')
parser.add_argument('--ap', help='Used to force a connection to a particular AP')
parser.add_argument('--traffic_type', help='Select the Traffic Type [lf_udp, lf_tcp, udp, tcp], type will be '
'adjusted automatically between ipv4 and ipv6 based on use of --ipv6 flag'
, required=True)
parser.add_argument('--output_format', help='choose either csv or xlsx')
parser.add_argument('--report_file', help='where you want to store results', default=None)
parser.add_argument('--a_min', help='--a_min bps rate minimum for side_a', default=256000)
parser.add_argument('--b_min', help='--b_min bps rate minimum for side_b', default=256000)
parser.add_argument('--test_duration', help='--test_duration sets the duration of the test', default="2m")
parser.add_argument('--layer3_cols', help='Columns wished to be monitored from layer 3 endpoint tab',
default=['name', 'tx bytes', 'rx bytes', 'tx rate', 'rx rate'])
parser.add_argument('--port_mgr_cols', help='Columns wished to be monitored from port manager tab',
default=['ap', 'ip', 'parent dev'])
parser.add_argument('--compared_report', help='report path and file which is wished to be compared with new report',
default=None)
parser.add_argument('--monitor_interval',
help='how frequently do you want your monitor function to take measurements, e.g. 10s, 35s, 2h',
default='10s')
parser.add_argument('--ipv6', help='Sets the test to use IPv6 traffic instead of IPv4', action='store_true')
parser.add_argument('--influx_token', help='Token for your Influx database')
parser.add_argument('--influx_bucket', help='Bucket for your Influx database')
parser.add_argument('--influx_org', help='Organization for your Influx database')
parser.add_argument('--influx_port', help='Port where your influx database is located', default=8086)
parser.add_argument('--influx_tag', action='append', nargs=2,
help='--influx_tag <key> <val> Can add more than one of these.')
parser.add_argument('--influx_mgr',
help='IP address of the server your Influx database is hosted if different from your LANforge Manager',
default=None)
parser.add_argument('--create_sta', help='Create stations when True; set to False to use existing stations named by --sta_names', default=True)
parser.add_argument('--sta_names', help='Comma separated list of existing station names to use when --create_sta False', default="sta0000")
args = parser.parse_args()
create_sta = True
if args.create_sta == "False":
create_sta = False
num_sta = 2
if (args.num_stations is not None) and (int(args.num_stations) > 0):
num_sta = int(args.num_stations)
# Create directory
# if file path with output file extension is not given...
# check if home/lanforge/report-data exists. if not, save
# in new folder based in current file's directory
if args.report_file is None:
new_file_path = str(datetime.datetime.now().strftime("%Y-%m-%d-%H-h-%M-m-%S-s")).replace(':',
'-') + '_test_ip_variable_time' # create path name
try:
path = os.path.join('/home/lanforge/report-data/', new_file_path)
os.mkdir(path)
except:
curr_dir_path = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
path = os.path.join(curr_dir_path, new_file_path)
os.mkdir(path)
systeminfopath = str(path) + '/systeminfo.txt'
if args.output_format in ['csv', 'json', 'html', 'hdf', 'stata', 'pickle', 'pdf', 'png', 'parquet',
'xlsx']:
report_f = str(path) + '/data.' + args.output_format
output = args.output_format
else:
print(
'Not supporting this report format or cannot find report format provided. Defaulting to csv data file '
'output type, naming it data.csv.')
report_f = str(path) + '/data.csv'
output = 'csv'
else:
systeminfopath = str(args.report_file).split('/')[-1]
report_f = args.report_file
if args.output_format is None:
output = str(args.report_file).split('.')[-1]
else:
output = args.output_format
print("IP Test Report Data: {}".format(report_f))
# Retrieve last data file
compared_rept = None
if args.compared_report:
compared_report_format = args.compared_report.split('.')[-1]
# if compared_report_format not in ['csv', 'json', 'dta', 'pkl','html','xlsx','parquet','h5']:
if compared_report_format != 'csv':
print(ValueError("Cannot process this file type. Please select a different file and re-run script."))
exit(1)
else:
compared_rept = args.compared_report
if create_sta:
station_list = LFUtils.portNameSeries(prefix_="sta", start_id_=0, end_id_=num_sta - 1, padding_number_=10000,
radio=args.radio)
else:
station_list = args.sta_names.split(",")
CX_TYPES = ("tcp", "udp", "lf_tcp", "lf_udp")
if (args.traffic_type is None) or (args.traffic_type not in CX_TYPES):
print("cx_type needs to be lf_tcp, lf_udp, tcp, or udp, bye")
exit(1)
if args.ipv6:
if args.traffic_type == "tcp" or args.traffic_type == "lf_tcp":
args.traffic_type = "lf_tcp6"
if args.traffic_type == "udp" or args.traffic_type == "lf_udp":
args.traffic_type = "lf_udp6"
else:
if args.traffic_type == "tcp":
args.traffic_type = "lf_tcp"
if args.traffic_type == "udp":
args.traffic_type = "lf_udp"
ip_var_test = IPVariableTime(host=args.mgr,
port=args.mgr_port,
number_template="0000",
sta_list=station_list,
create_sta=create_sta,
name_prefix="VT",
upstream=args.upstream_port,
ssid=args.ssid,
password=args.passwd,
radio=args.radio,
security=args.security,
test_duration=args.test_duration,
use_ht160=False,
side_a_min_rate=args.a_min,
side_b_min_rate=args.b_min,
mode=args.mode,
ap=args.ap,
ipv6=args.ipv6,
traffic_type=args.traffic_type,
_debug_on=args.debug)
ip_var_test.pre_cleanup()
ip_var_test.build()
# exit()
if create_sta:
if not ip_var_test.passes():
print(ip_var_test.get_fail_message())
ip_var_test.exit_fail()
try:
layer3connections = ','.join([[*x.keys()][0] for x in ip_var_test.json_get('endp')['endpoint']])
except:
raise ValueError('Try setting the upstream port flag if your device does not have an eth1 port')
if type(args.layer3_cols) is not list:
layer3_cols = list(args.layer3_cols.split(","))
# send col names here to file to reformat
else:
layer3_cols = args.layer3_cols
# send col names here to file to reformat
if type(args.port_mgr_cols) is not list:
port_mgr_cols = list(args.port_mgr_cols.split(","))
# send col names here to file to reformat
else:
port_mgr_cols = args.port_mgr_cols
# send col names here to file to reformat
if args.debug:
print("Layer 3 Endp column names are...")
print(layer3_cols)
print("Port Manager column names are...")
print(port_mgr_cols)
print("Layer 3 Endp column names are...")
print(layer3_cols)
print("Port Manager column names are...")
print(port_mgr_cols)
try:
monitor_interval = Realm.parse_time(args.monitor_interval).total_seconds()
except ValueError as error:
print(str(error))
print(ValueError(
"The time string provided for monitor_interval argument is invalid. Please see supported time stamp increments and inputs for monitor_interval in --help. "))
exit(1)
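# Example time strings (an assumption based on the defaults and help text in this script, not an
# exhaustive list): '10s', '35s', '2m', '5m' and '2h' are all parsed by Realm.parse_time.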
ip_var_test.start(False, False)
# if args.influx_mgr is None:
# manager = args.mgr
# else:
# manager = args.influx_mgr
if args.influx_org is not None:
from influx2 import RecordInflux
grapher = RecordInflux( # _influx_host=manager,
_influx_port=args.influx_port,
_influx_org=args.influx_org,
_influx_token=args.influx_token,
_influx_bucket=args.influx_bucket)
devices = [station.split('.')[-1] for station in station_list]
tags = dict()
tags['script'] = 'test_ip_variable_time'
try:
for k in args.influx_tag:
tags[k[0]] = k[1]
except:
pass
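# Illustrative example (hypothetical tag values): running with
#   --influx_tag testbed lab1 --influx_tag build nightly
# produces tags == {'script': 'test_ip_variable_time', 'testbed': 'lab1', 'build': 'nightly'}.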
grapher.monitor_port_data(longevity=Realm.parse_time(args.test_duration).total_seconds(),
devices=devices,
monitor_interval=Realm.parse_time(args.monitor_interval).total_seconds(),
tags=tags)
ip_var_test.cx_profile.monitor(layer3_cols=layer3_cols,
sta_list=station_list,
# port_mgr_cols=port_mgr_cols,
report_file=report_f,
systeminfopath=systeminfopath,
duration_sec=Realm.parse_time(args.test_duration).total_seconds(),
monitor_interval_ms=monitor_interval,
created_cx=layer3connections,
output_format=output,
compared_report=compared_rept,
script_name='test_ip_variable_time',
arguments=args,
debug=args.debug)
ip_var_test.stop()
if create_sta:
if not ip_var_test.passes():
print(ip_var_test.get_fail_message())
ip_var_test.exit_fail()
LFUtils.wait_until_ports_admin_up(port_list=station_list)
if ip_var_test.passes():
ip_var_test.success()
ip_var_test.cleanup()
print("IP Variable Time Test Report Data: {}".format(report_f))
if __name__ == "__main__":
main()

View File

@@ -934,7 +934,7 @@ python3 .\\test_l3_longevity.py --test_duration 4m --endp_type \"lf_tcp lf_udp m
parser.add_argument('-t', '--endp_type', help='--endp_type <types of traffic> example --endp_type \"lf_udp lf_tcp mc_udp\" Default: lf_udp , options: lf_udp, lf_udp6, lf_tcp, lf_tcp6, mc_udp, mc_udp6',
default='lf_udp', type=valid_endp_types)
parser.add_argument('-u', '--upstream_port', help='--upstream_port <cross connect upstream_port> example: --upstream_port eth1',default='eth1')
parser.add_argument('--downstream_port', help='--downstream_port <cross connect downstream_port> example: --downstream_port eth2',default='eth2')
parser.add_argument('--downstream_port', help='--downstream_port <cross connect downstream_port> example: --downstream_port eth2')
parser.add_argument('-o','--csv_outfile', help="--csv_outfile <Output file for csv data>", default="")
parser.add_argument('--polling_interval', help="--polling_interval <seconds>", default='60s')
@@ -1219,7 +1219,7 @@ python3 .\\test_l3_longevity.py --test_duration 4m --endp_type \"lf_tcp lf_udp m
ap_umsched_file.close()
ap_msched_data = ip_var_test.get_ap_msched()
ap_msched = report.file_add_path("ap_msched.txt")
ap_msched = "{}-{}".format(file_date,"ap_msched.txt")
ap_msched = report.file_add_path(ap_msched)
ap_msched_file = open(ap_msched, "w")
ap_msched_file.write(str(ap_msched_data))

446
py-scripts/test_l4.py Executable file
View File

@@ -0,0 +1,446 @@
#!/usr/bin/env python3
"""
NAME: test_l4.py
PURPOSE:
test_l4.py will create stations and endpoints to generate and verify layer-4 traffic
This script will monitor the urls/s, bytes-rd, or bytes-wr attribute of the endpoints.
These attributes can be tested over FTP using a --ftp flag.
If the monitored value does not continually increase, this test will not pass.
This script replaces the functionality of test_ipv4_l4.py, test_ipv4_l4_ftp_upload.py, test_ipv4_l4_ftp_urls_per_ten.py,
test_ipv4_l4_ftp_wifi.py, test_ipv4_l4_urls_per_ten.py, test_ipv4_l4_wifi.py
EXAMPLE (urls/s):
./test_l4.py --upstream_port eth1 --radio wiphy0 --num_stations 3 --security {open|wep|wpa|wpa2|wpa3}
--ssid netgear --passwd admin123 --requests_per_ten 600 --mode 1 --num_tests 1 --test_type 'urls/s'
--url "dl http://10.40.0.1 /dev/null" --ap "00:0e:8e:78:e1:76" --target_per_ten 600 --output_format csv
--report_file ~/Documents/results.csv --test_duration 2m --debug
EXAMPLE (bytes-wr):
./test_l4.py --upstream_port eth1 --radio wiphy0 --num_stations 3 --security {open|wep|wpa|wpa2|wpa3}
--ssid netgear --passwd admin123 --test_duration 2m --url "ul http://10.40.0.1 /dev/null"
--requests_per_ten 600 --test_type bytes-wr --debug
EXAMPLE (bytes-rd):
./test_l4.py --upstream_port eth1 (optional) --radio wiphy0 (required) --num_stations 3 (optional)
--security {open|wep|wpa|wpa2|wpa3} (required) --ssid netgear (required)
--url "dl http://10.40.0.1 /dev/null" (required) --password admin123 (required)
--test_duration 2m (optional) --test_type bytes-rd --debug (optional)
EXAMPLE (ftp urls/s):
./test_l4.py --upstream_port eth1 --radio wiphy0 --num_stations 3 --security {open|wep|wpa|wpa2|wpa3}
--ssid netgear --passwd admin123 --test_duration 2m --interval 1s --mode 1 --ap "00:0e:8e:78:e1:76"
--requests_per_ten 600 --num_tests 1 --ftp --test_type 'urls/s'
--url "ul ftp://lanforge:lanforge@10.40.0.1/example.txt /home/lanforge/example.txt" --debug
EXAMPLE (ftp bytes-wr):
./test_l4.py --upstream_port eth1 --radio wiphy0 --num_stations 3 --security {open|wep|wpa|wpa2|wpa3}
--ssid netgear --passwd admin123 --test_duration 2m --url "ul ftp://10.40.0.1 /dev/null"
--requests_per_ten 600 --ftp --test_type bytes-wr --debug
EXAMPLE (ftp bytes-rd):
./test_l4.py --upstream_port eth1 (optional) --radio wiphy0 (required) --num_stations 3 (optional)
--security {open|wep|wpa|wpa2|wpa3} (required) --ssid netgear (required)
--url "dl ftp://10.40.0.1 /dev/null" (required) --password admin123 (required)
--test_duration 2m (optional) --ftp --test_type bytes-rd --debug (optional)
Use './test_l4.py --help' to see command line usage and options
Copyright 2021 Candela Technologies Inc
License: Free to distribute and modify. LANforge systems must be licensed.
"""
import sys
import os
if sys.version_info[0] != 3:
print("This script requires Python 3")
exit(1)
if 'py-json' not in sys.path:
sys.path.append(os.path.join(os.path.abspath('..'), 'py-json'))
import argparse
from LANforge.lfcli_base import LFCliBase
from LANforge import LFUtils
import realm
import time
import datetime
from realm import TestGroupProfile
class IPV4L4(LFCliBase):
def __init__(self,
host="localhost",
port=8080,
ssid=None,
security=None,
password=None,
url=None,
ftp_user=None,
ftp_passwd=None,
requests_per_ten=None,
station_list=None,
test_duration="2m",
ap=None,
mode=0,
target_requests_per_ten=60,
number_template="00000",
num_tests=1,
radio="wiphy0",
_debug_on=False,
upstream_port="eth1",
ftp=False,
test_type=None,
_exit_on_error=False,
_exit_on_fail=False):
super().__init__(host, port, _debug=_debug_on, _exit_on_fail=_exit_on_fail)
self.host = host
self.port = port
self.radio = radio
self.upstream_port = upstream_port
self.ssid = ssid
self.security = security
self.password = password
self.url = url
self.mode = mode
self.ap = ap
self.debug = _debug_on
self.requests_per_ten = int(requests_per_ten)
self.number_template = number_template
self.test_duration = test_duration
self.sta_list = station_list
self.num_tests = int(num_tests)
self.target_requests_per_ten = int(target_requests_per_ten)
self.local_realm = realm.Realm(lfclient_host=self.host, lfclient_port=self.port)
self.l4cxprofile = realm.L4CXProfile(lfclient_host=host,
lfclient_port=port, local_realm=self.local_realm)
self.station_profile = self.local_realm.new_station_profile()
self.cx_profile = self.local_realm.new_l4_cx_profile()
self.station_profile.lfclient_url = self.lfclient_url
self.station_profile.ssid = self.ssid
self.station_profile.ssid_pass = self.password
self.station_profile.security = self.security
self.station_profile.number_template_ = self.number_template
self.station_profile.mode = self.mode
self.test_type = test_type
self.ftp_user = ftp_user
self.ftp_passwd = ftp_passwd
if self.ap is not None:
self.station_profile.set_command_param("add_sta", "ap", self.ap)
self.cx_profile.url = self.url
self.cx_profile.requests_per_ten = self.requests_per_ten
self.ftp = ftp
if self.ftp and 'ftp://' not in self.url:
print("WARNING! FTP test chosen, but ftp:// not present in url!")
if self.test_type != 'urls/s' and self.test_type != 'bytes-wr' and self.test_type != 'bytes-rd':
raise ValueError("Unknown test type: %s\nValid test types are urls/s, bytes-rd, or bytes-wr" % self.test_type)
def build(self):
# Build stations
self.station_profile.use_security(self.security, self.ssid, self.password)
print("Creating stations")
self.station_profile.set_command_flag("add_sta", "create_admin_down", 1)
self.station_profile.set_command_param("set_port", "report_timer", 1500)
self.station_profile.set_command_flag("set_port", "rpt_timer", 1)
self.station_profile.create(radio=self.radio, sta_names_=self.sta_list, debug=self.debug)
self._pass("PASS: Station build finished")
if self.ftp:
self.cx_profile.create(ports=self.station_profile.station_names, sleep_time=.5, debug_=self.debug,
suppress_related_commands_=True, ftp=self.ftp,
user=self.ftp_user, passwd=self.ftp_passwd,
source=self.source)
else:
self.cx_profile.create(ports=self.station_profile.station_names, sleep_time=.5, debug_=self.debug,
suppress_related_commands_=None)
def __check_request_rate(self):
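# Pass-criterion sketch (derived from the 0.9 factor below): with the default requests_per_ten=600
# and target_per_ten=600, an endpoint counts as passing when its reported urls/s * 600 >= 540,
# i.e. it sustained at least 90 percent of the requested rate.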
endp_list = self.json_get("layer4/list?fields=urls/s")
expected_passes = 0
passes = 0
if endp_list is not None and endp_list['endpoint'] is not None:
endp_list = endp_list['endpoint']
for item in endp_list:
for name, info in item.items():
if name in self.cx_profile.created_cx.keys():
expected_passes += 1
if info['urls/s'] * self.requests_per_ten >= self.target_requests_per_ten * .9:
passes += 1
return passes == expected_passes
def __compare_vals(self, old_list, new_list):
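# Comparison sketch (hypothetical endpoint names): old {'cx1': 100, 'cx2': 200} versus new
# {'cx1': 150, 'cx2': 201} returns True; if any endpoint stays flat or shrinks, the whole
# comparison returns False.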
passes = 0
expected_passes = 0
if len(old_list) == len(new_list):
for item, value in old_list.items():
expected_passes += 1
if new_list[item] > old_list[item]:
passes += 1
if passes == expected_passes:
return True
else:
return False
else:
return False
def __get_bytes(self):
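# Assumed response shape (inferred from the loop below, not from API documentation):
# json_get("layer4/list?...") returns a dict whose 'endpoint' entry is a list of single-key
# dicts mapping each endpoint name to its field values; 'uri' and 'handler' entries are skipped.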
time.sleep(1)
cx_list = self.json_get("layer4/list?fields=name,%s" % self.test_type, debug_=self.debug)
# print("==============\n", cx_list, "\n==============")
cx_map = {}
for cx_name in cx_list['endpoint']:
if cx_name != 'uri' and cx_name != 'handler':
for item, value in cx_name.items():
for value_name, value_rx in value.items():
if item in self.cx_profile.created_cx.keys() and value_name == self.test_type:
cx_map[item] = value_rx
return cx_map
def start(self, print_pass=False, print_fail=False):
if self.ftp:
self.port_util.set_ftp(port_name=self.local_realm.name_to_eid(self.upstream_port)[2], resource=1, on=True)
temp_stas = self.sta_list.copy()
self.station_profile.admin_up()
if self.local_realm.wait_for_ip(temp_stas):
self._pass("All stations got IPs", print_pass)
else:
self._fail("Stations failed to get IPs", print_fail)
exit(1)
self.cx_profile.start_cx()
print("Starting test")
curr_time = datetime.datetime.now()
if self.test_type != 'urls/s':
old_rx_values = self.__get_bytes()
end_time = self.local_realm.parse_time(self.test_duration) + curr_time
sleep_interval = self.local_realm.parse_time(self.test_duration) // 5
passes = 0
expected_passes = 0
for test in range(self.num_tests):
expected_passes += 1
while curr_time < end_time:
time.sleep(sleep_interval.total_seconds())
curr_time = datetime.datetime.now()
if self.test_type == 'urls/s':
if self.cx_profile.check_errors(self.debug):
if self.__check_request_rate():
passes += 1
else:
self._fail("FAIL: Request rate did not exceed target rate", print_fail)
break
else:
self._fail("FAIL: Errors found getting to %s " % self.url, print_fail)
break
else:
new_rx_values = self.__get_bytes()
expected_passes += 1
if self.__compare_vals(old_rx_values, new_rx_values):
passes += 1
else:
self._fail("FAIL: Not all stations increased traffic", print_fail)
break
old_rx_values = new_rx_values
cur_time = datetime.datetime.now()
if passes == expected_passes:
self._pass("PASS: All tests passes", print_pass)
def stop(self):
self.cx_profile.stop_cx()
if self.ftp:
self.port_util.set_ftp(port_name=self.local_realm.name_to_eid(self.upstream_port)[2], resource=1, on=False)
self.station_profile.admin_down()
def cleanup(self, sta_list):
self.cx_profile.cleanup()
self.station_profile.cleanup(sta_list)
LFUtils.wait_until_ports_disappear(base_url=self.lfclient_url, port_list=sta_list,
debug=self.debug)
def main():
parser = LFCliBase.create_basic_argparse(
prog='test_l4',
formatter_class=argparse.RawTextHelpFormatter,
epilog='''\
This script will monitor the urls/s, bytes-rd, or bytes-wr attribute of the endpoints.
''',
description='''\
test_l4.py:
--------------------
Generic command example:
python3 ./test_l4.py
--upstream_port eth1 \\
--radio wiphy0 \\
--num_stations 3 \\
--security {open|wep|wpa|wpa2|wpa3} \\
--ssid netgear \\
--passwd admin123 \\
--requests_per_ten 600 \\
--mode 1
{"auto" : "0",
"a" : "1",
"b" : "2",
"g" : "3",
"abg" : "4",
"abgn" : "5",
"bgn" : "6",
"bg" : "7",
"abgnAC" : "8",
"anAC" : "9",
"an" : "10",
"bgnAC" : "11",
"abgnAX" : "12",
"bgnAX" : "13"} \\
--num_tests 1 \\
--url "dl http://10.40.0.1 /dev/null" \\
--ap "00:0e:8e:78:e1:76"
--target_per_ten 600 \\
--output_format csv \\
--report_file ~/Documents/results.csv \\
--test_duration 2m \\
--debug
''')
required = None
for agroup in parser._action_groups:
if agroup.title == "required arguments":
required = agroup
# if required is not None:
optional = None
for agroup in parser._action_groups:
if agroup.title == "optional arguments":
optional = agroup
if optional is not None:
optional.add_argument('--requests_per_ten', help='--requests_per_ten number of requests per ten minutes',
default=600)
optional.add_argument('--num_tests', help='--num_tests number of tests to run. Each test runs 10 minutes',
default=1)
optional.add_argument('--url', help='--url specifies upload/download, address, and dest',
default="dl http://10.40.0.1 /dev/null")
optional.add_argument('--test_duration', help='duration of test', default="2m")
optional.add_argument('--target_per_ten',
help='--target_per_ten target number of requests per ten minutes. The test will check for 90 percent of this value',
default=600)
optional.add_argument('--mode', help='Used to force mode of stations')
optional.add_argument('--ap', help='Used to force a connection to a particular AP')
optional.add_argument('--report_file', help='where you want to store results')
optional.add_argument('--output_format', help='choose csv or xlsx') # update once other forms are completed
optional.add_argument('--ftp', help='Use ftp for the test', action='store_true')
optional.add_argument('--test_type', help='Choose type of test to run {urls/s, bytes-rd, bytes-wr}', default='bytes-rd')
optional.add_argument('--ftp_user', help='--ftp_user sets the username to be used for ftp', default="lanforge")
optional.add_argument('--ftp_passwd', help='--ftp_passwd sets the password to be used for ftp', default="lanforge")
args = parser.parse_args()
num_sta = 2
if (args.num_stations is not None) and (int(args.num_stations) > 0):
num_stations_converted = int(args.num_stations)
num_sta = num_stations_converted
if args.report_file is None:
if args.output_format in ['csv', 'json', 'html', 'hdf', 'stata', 'pickle', 'pdf', 'parquet', 'png', 'df',
'xlsx']:
output_form = args.output_format.lower()
print("Defaulting file output placement to /home/lanforge.")
rpt_file = '/home/lanforge/data.' + output_form
else:
print("Defaulting data file output type to Excel")
rpt_file = '/home/lanforge/data.xlsx'
output_form = 'xlsx'
else:
rpt_file = args.report_file
if args.output_format is None:
output_form = str(args.report_file).split('.')[-1]
else:
output_form = args.output_format
# Create directory
if args.report_file is None:
try:
homedir = str(datetime.datetime.now().strftime("%Y-%m-%d-%H-%M")).replace(':',
'-') + 'test_l4'
path = os.path.join('/home/lanforge/report-data/', homedir)
os.mkdir(path)
except:
path = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
print('Saving file to local directory')
else:
pass
if args.report_file is None:
if args.output_format in ['csv', 'json', 'html', 'hdf', 'stata', 'pickle', 'pdf', 'png', 'df', 'parquet',
'xlsx']:
rpt_file = path + '/data.' + args.output_format
output = args.output_format
else:
print('Defaulting data file output type to Excel')
rpt_file = path + '/data.xlsx'
output = 'xlsx'
else:
rpt_file = args.report_file
if args.output_format is None:
output = str(args.report_file).split('.')[-1]
else:
output = args.output_format
station_list = LFUtils.portNameSeries(prefix_="sta", start_id_=0, end_id_=num_sta - 1, padding_number_=10000,
radio=args.radio)
ip_test = IPV4L4(host=args.mgr, port=args.mgr_port,
ssid=args.ssid,
password=args.passwd,
radio=args.radio,
upstream_port=args.upstream_port,
security=args.security,
station_list=station_list,
url=args.url,
mode=args.mode,
ap=args.ap,
ftp=args.ftp,
ftp_user=args.ftp_user,
ftp_passwd=args.ftp_passwd,
test_type=args.test_type,
_debug_on=args.debug,
test_duration=args.test_duration,
num_tests=args.num_tests,
target_requests_per_ten=args.target_per_ten,
requests_per_ten=args.requests_per_ten)
ip_test.cleanup(station_list)
ip_test.build()
ip_test.start()
layer4traffic = None  # ensure the name is defined even if the layer4 endpoint query below fails
try:
layer4traffic = ','.join([[*x.keys()][0] for x in ip_test.local_realm.json_get('layer4')['endpoint']])
except:
pass
ip_test.l4cxprofile.monitor(col_names=['bytes-rd', 'urls/s', 'bytes-wr'],
report_file=rpt_file,
duration_sec=ip_test.local_realm.parse_time(args.test_duration).total_seconds(),
created_cx=layer4traffic,
output_format=output_form,
script_name='test_l4',
arguments=args,
debug=args.debug)
ip_test.stop()
if not ip_test.passes():
print(ip_test.get_fail_message())
exit(1)
time.sleep(30)
ip_test.cleanup(station_list)
if ip_test.passes():
print("Full test passed, all endpoints met or exceeded 90 percent of the target rate")
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,50 @@
{
"test_parameters":{
"test_timeout": 200,
"load_blank_db": false,
"load_factory_default_db": true,
"load_custom_db": false,
"custom_db": "DFLT_ETH1_GEN",
"email_list_production": "chuck.rekiere@candelatech.com",
"host_ip_production": "192.168.95.6",
"email_list_test": "chuck.rekiere@candelatech.com",
"host_ip_test": "192.168.95.6"
},
"test_network":{
"http_test_ip": "10.40.0.10",
"ftp_test_ip": "10.40.0.10",
"test_ip": "192.168.0.104"
},
"test_generic":{
"radio_used": "wiphy1",
"ssid_used": "ct523c-vap",
"ssid_pw_used": "ct523c-vap",
"security_used": "wpa2",
"num_sta": 4,
"col_names": "name,tx_byptes,rx_bytes,dropped",
"upstream_port": "eth1"
},
"radio_dict":{
"RADIO_0_CFG":{"KEY":"RADIO_0_CFG","RADIO":"wiphy0","STATIONS":"4","SSID":"ct523c-vap","PASSWD":"ct523c-vap","SECURITY":"wpa2"},
"RADIO_1_CFG":{"KEY":"RADIO_1_CFG","RADIO":"wiphy1","STATIONS":"4","SSID":"ct523c-vap","PASSWD":"ct523c-vap","SECURITY":"wpa2"}
},
"test_suites":{
"suite_one":{
"create_l3":{"enabled":"TRUE","command":"create_l4.py","args":"--radio RADIO_USED --ssid SSID_USED --passwd SSID_PW_USED --security SECURITY_USED --debug"},
"create_l4":{"enabled":"TRUE","command":"create_l4.py","args":"RADIO_1_CFG --debug"},
"create_l4_2":{"enabled":"TRUE","command":"create_l4.py","args":"--radio wiphy1 --ssid ct523c-vap --passwd ct523c-vap --security wpa2 --debug"}
},
"suite_two":{
"test_l3_longevity":{"enabled":"TRUE","command":"test_l3_longevity.py","args":"--test_duration 15s --polling_interval 5s --upstream_port eth1 --radio 'radio==wiphy1,stations==4,ssid==ct523c-vap,ssid_pw==ct523c-vap,security==wpa2' --endp_type lf_udp --rates_are_totals --side_a_min_bps=20000 --side_b_min_bps=300000000"}
},
"TEST_DICTONARY":{
"create_l3":{"enabled":"TRUE","command":"create_l4.py","args":"--radio RADIO_USED --ssid SSID_USED --passwd SSID_PW_USED --security SECURITY_USED --debug"},
"test_l3_longevity":{"enabled":"TRUE","command":"test_l3_longevity.py","args":"--test_duration 15s --polling_interval 5s --upstream_port eth1 --radio 'radio==wiphy1,stations==4,ssid==ct523c-vap,ssid_pw==ct523c-vap,security==wpa2' --endp_type lf_udp --rates_are_totals --side_a_min_bps=20000 --side_b_min_bps=300000000"}
}
}
}

View File

@@ -5,20 +5,36 @@ NAME:
lf_check.py
PURPOSE:
lf_check.py will run a series of tests based on the test TEST_DICTIONARY listed in lf_check_config.ini.
The lf_check_config.ini file is copied from lf_check_config_template.ini and local configuration is made
to the lf_check_config.ini.
lf_check.py will run tests based on a .ini file or .json file.
The config file may be copied from lf_check_config_template.ini, or it can be generated.
The config file name can be passed in as a configuration parameter.
The json file may be copied from lf_check.json and updated. Currently all of the parameters need to be set to a value.
The --production flag determines the email list for results.
EXAMPLE:
lf_check.py
lf_check.py # this will use the defaults
lf_check.py --ini <unique ini file> --suite <suite to use in .ini file>
lf_check.py --ini <unique ini file> --suite <suite to use in .ini file> --production
lf_check.py --use_json --json <unique json file> --suite <suite to use in .json file>
lf_check.py --use_json --json <unique json file> --production
NOTES:
Before using lf_check.py
1. copy lf_check_config_template.ini to the lf_check_config.ini
2. update lf_check_config.ini to enable (TRUE) tests to be run in the TEST_DICTIONARY , the TEST_DICTIONARY needs to be passed in
Using .ini:
1. copy lf_check_config_template.ini to <file name>.ini, this will avoid .ini being overwritten on git pull
2. update <file name>.ini to enable (TRUE) tests to be run in the test suite, the default suite is the TEST_DICTIONARY
3. update other configuration to specific test bed for example radios
Using .json:
1. copy lf_check.json to <file name>.json, this will avoid .json being overwritten on git pull
2. update <file name>.json to enable (TRUE) tests to be run in the test suite; the default suite is TEST_DICTIONARY
TO DO NOTES:
6/4/2021 : add server (telnet localhost 4001) build info, GUI build shaw, and Kernel version to the output.
6/14/2021 :
1. add server (telnet localhost 4001) build info, GUI build sha, and Kernel version to the output.
2. add unique database prior to each run
'''
import datetime
@@ -28,7 +44,6 @@ if sys.version_info[0] != 3:
print("This script requires Python3")
exit()
import os
import socket
import logging
@@ -40,28 +55,34 @@ import configparser
import subprocess
import csv
import shutil
import os.path
from os import path
# lf_report is from the parent of the current file
dir_path = os.path.dirname(os.path.realpath(__file__))
parent_dir_path = os.path.abspath(os.path.join(dir_path,os.pardir))
sys.path.insert(0, parent_dir_path)
#sys.path.append('../')
from lf_report import lf_report
sys.path.append('/')
CONFIG_FILE = os.getcwd() + '/lf_check_config.ini'
RUN_CONDITION = 'ENABLE'
# setup logging FORMAT
FORMAT = '%(asctime)s %(name)s %(levelname)s: %(message)s'
# lf_check class contains verification configuration and orchestrates the testing.
class lf_check():
def __init__(self,
_use_json,
_config_ini,
_json_data,
_test_suite,
_production,
_csv_results,
_outfile):
self.use_json = _use_json
self.json_data = _json_data
self.config_ini = _config_ini
self.test_suite = _test_suite
self.production_run = _production
self.lf_mgr_ip = ""
self.lf_mgr_port = ""
self.radio_dict = {}
@@ -100,7 +121,6 @@ class lf_check():
self.use_blank_db = "FALSE"
self.use_factory_default_db = "FALSE"
self.use_custom_db = "FALSE"
self.production_run = "FALSE"
self.email_list_production = ""
self.host_ip_production = None
self.email_list_test = ""
@@ -114,7 +134,7 @@ class lf_check():
report_url=report_file.replace('/home/lanforge/', '')
if report_url.startswith('/'):
report_url = report_url[1:]
# Following recommendation
# following recommendation
# NOTE: https://stackoverflow.com/questions/24196932/how-can-i-get-the-ip-address-from-nic-in-python
#command = 'echo "$HOSTNAME mail system works!" | mail -s "Test: $HOSTNAME $(date)" chuck.rekiere@candelatech.com'
hostname = socket.gethostname()
@@ -124,11 +144,9 @@ http://{ip}/{report}
NOTE: for now to see stdout and stderr remove /home/lanforge from path.
""".format(hostname=hostname, ip=ip, report=report_url)
mail_subject = "Regression Test [{hostname}] {date}".format(hostname=hostname,
date=datetime.datetime.now())
mail_subject = "Regression Test [{hostname}] {date}".format(hostname=hostname, date=datetime.datetime.now())
try:
if self.production_run == "TRUE":
if self.production_run == True:
msg = message_txt.format(ip=self.host_ip_production)
command = "echo \"{message}\" | mail -s \"{subject}\" {address}".format(
message=msg,
@@ -189,15 +207,173 @@ NOTE: for now to see stdout and stderr remove /home/lanforge from path.
<br>
"""
# Functions in this section are/can be overridden by descendants
# This code reads the lf_check_config.ini file to populate the test variables
def read_config_contents(self):
self.logger.info("read_config_contents {}".format(CONFIG_FILE))
def read_config(self):
if self.use_json:
self.read_config_json()
else:
self.read_config_ini()
# there is probably a more efficient way to do this in python
# Keeping it obvious for now, may be refactored later
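# A possible refactor sketch (not wired in): the repeated if/else blocks in the readers below
# could validate each required key in a loop, e.g.
#   for key in ("test_timeout", "load_blank_db", "load_factory_default_db", "load_custom_db",
#               "custom_db", "email_list_production", "host_ip_production",
#               "email_list_test", "host_ip_test"):
#       if key not in self.json_data["test_parameters"]:
#           self.logger.info("{} not in test_parameters json".format(key))
#           exit(1)
#       setattr(self, key, self.json_data["test_parameters"][key])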
def read_config_json(self):
#self.logger.info("read_config_json_contents {}".format(self.json_data))
if "test_parameters" in self.json_data:
self.logger.info("json: read test_parameters")
#self.logger.info("test_parameters {}".format(self.json_data["test_parameters"]))
self.read_test_parameters()
else:
self.logger.info("EXITING test_parameters not in json {}".format(self.json_data))
exit(1)
if "test_network" in self.json_data:
self.logger.info("json: read test_network")
#self.logger.info("test_network {}".format(self.json_data["test_network"]))
self.read_test_network()
else:
self.logger.info("EXITING test_network not in json {}".format(self.json_data))
exit(1)
if "test_generic" in self.json_data:
self.logger.info("json: read test_generic")
#self.logger.info("test_generic {}".format(self.json_data["test_generic"]))
self.read_test_generic()
else:
self.logger.info("EXITING test_generic not in json {}".format(self.json_data))
exit(1)
if "radio_dict" in self.json_data:
self.logger.info("json: read radio_dict")
#self.logger.info("radio_dict {}".format(self.json_data["radio_dict"]))
self.radio_dict = self.json_data["radio_dict"]
self.logger.info("self.radio_dict {}".format(self.radio_dict))
else:
self.logger.info("EXITING radio_dict not in json {}".format(self.json_data))
exit(1)
if "test_suites" in self.json_data:
self.logger.info("json: read test_suites looking for: {}".format(self.test_suite))
#self.logger.info("test_suites {}".format(self.json_data["test_suites"]))
if self.test_suite in self.json_data["test_suites"]:
self.test_dict = self.json_data["test_suites"][self.test_suite]
#self.logger.info("self.test_dict {}".format(self.test_dict))
else:
self.logger.info("EXITING test_suite {} Not Present in json test_suites: {}".format(self.test_suite, self.json_data["test_suites"]))
exit(1)
else:
self.logger.info("EXITING test_suites not in json {}".format(self.json_data))
exit(1)
def read_test_parameters(self):
if "test_timeout" in self.json_data["test_parameters"]:
self.test_timeout = self.json_data["test_parameters"]["test_timeout"]
else:
self.logger.info("test_timeout not in test_parameters json")
exit(1)
if "load_blank_db" in self.json_data["test_parameters"]:
self.load_blank_db = self.json_data["test_parameters"]["load_blank_db"]
else:
self.logger.info("load_blank_db not in test_parameters json")
exit(1)
if "load_factory_default_db" in self.json_data["test_parameters"]:
self.load_factory_default_db = self.json_data["test_parameters"]["load_factory_default_db"]
else:
self.logger.info("load_factory_default_db not in test_parameters json")
exit(1)
if "load_custom_db" in self.json_data["test_parameters"]:
self.load_custom_db = self.json_data["test_parameters"]["load_custom_db"]
else:
self.logger.info("load_custom_db not in test_parameters json")
exit(1)
if "custom_db" in self.json_data["test_parameters"]:
self.custom_db = self.json_data["test_parameters"]["custom_db"]
else:
self.logger.info("custom_db not in test_parameters json, if not using custom_db just put in a name")
exit(1)
if "email_list_production" in self.json_data["test_parameters"]:
self.email_list_production = self.json_data["test_parameters"]["email_list_production"]
else:
self.logger.info("email_list_production not in test_parameters json")
exit(1)
if "host_ip_production" in self.json_data["test_parameters"]:
self.host_ip_production = self.json_data["test_parameters"]["host_ip_production"]
else:
self.logger.info("host_ip_production not in test_parameters json")
exit(1)
if "email_list_test" in self.json_data["test_parameters"]:
self.email_list_test = self.json_data["test_parameters"]["email_list_test"]
else:
self.logger.info("email_list_test not in test_parameters json")
exit(1)
if "host_ip_test" in self.json_data["test_parameters"]:
self.host_ip_test = self.json_data["test_parameters"]["host_ip_test"]
else:
self.logger.info("host_ip_test not in test_parameters json")
exit(1)
def read_test_network(self):
if "http_test_ip" in self.json_data["test_network"]:
self.http_test_ip = self.json_data["test_network"]["http_test_ip"]
else:
self.logger.info("http_test_ip not in test_network json")
exit(1)
if "ftp_test_ip" in self.json_data["test_network"]:
self.ftp_test_ip = self.json_data["test_network"]["ftp_test_ip"]
else:
self.logger.info("ftp_test_ip not in test_network json")
exit(1)
if "test_ip" in self.json_data["test_network"]:
self.test_ip = self.json_data["test_network"]["test_ip"]
else:
self.logger.info("test_ip not in test_network json")
exit(1)
def read_test_generic(self):
if "radio_used" in self.json_data["test_generic"]:
self.radio_lf = self.json_data["test_generic"]["radio_used"]
else:
self.logger.info("radio_used not in test_generic json")
exit(1)
if "ssid_used" in self.json_data["test_generic"]:
self.ssid = self.json_data["test_generic"]["ssid_used"]
else:
self.logger.info("ssid_used not in test_generic json")
exit(1)
if "ssid_pw_used" in self.json_data["test_generic"]:
self.ssid_pw = self.json_data["test_generic"]["ssid_pw_used"]
else:
self.logger.info("ssid_pw_used not in test_generic json")
exit(1)
if "security_used" in self.json_data["test_generic"]:
self.security = self.json_data["test_generic"]["security_used"]
else:
self.logger.info("security_used not in test_generic json")
exit(1)
if "num_sta" in self.json_data["test_generic"]:
self.num_sta = self.json_data["test_generic"]["num_sta"]
else:
self.logger.info("num_sta not in test_generic json")
exit(1)
if "col_names" in self.json_data["test_generic"]:
self.col_names = self.json_data["test_generic"]["col_names"]
else:
self.logger.info("col_names not in test_generic json")
exit(1)
if "upstream_port" in self.json_data["test_generic"]:
self.upstream_port = self.json_data["test_generic"]["upstream_port"]
else:
self.logger.info("upstream_port not in test_generic json")
exit(1)
# functions in this section are/can be overridden by descendants
# this code reads the lf_check_config.ini file to populate the test variables
def read_config_ini(self):
#self.logger.info("read_config_ini_contents {}".format(self.config_ini))
config_file = configparser.ConfigParser()
success = True
success = config_file.read(CONFIG_FILE)
self.logger.info("logger worked")
success = config_file.read(self.config_ini)
self.logger.info("config_file.read result {}".format(success))
# LF_MGR parameters not used yet
if 'LF_MGR' in config_file.sections():
section = config_file['LF_MGR']
self.lf_mgr_ip = section['LF_MGR_IP']
@@ -205,6 +381,18 @@ NOTE: for now to see stdout and stderr remove /home/lanforge from path.
self.logger.info("lf_mgr_ip {}".format(self.lf_mgr_ip))
self.logger.info("lf_mgr_port {}".format(self.lf_mgr_port))
if 'TEST_PARAMETERS' in config_file.sections():
section = config_file['TEST_PARAMETERS']
self.test_timeout = section['TEST_TIMEOUT']
self.use_blank_db = section['LOAD_BLANK_DB']
self.use_factory_default_db = section['LOAD_FACTORY_DEFAULT_DB']
self.use_custom_db = section['LOAD_CUSTOM_DB']
self.custom_db = section['CUSTOM_DB']
self.email_list_production = section['EMAIL_LIST_PRODUCTION']
self.host_ip_production = section['HOST_IP_PRODUCTION']
self.email_list_test = section['EMAIL_LIST_TEST']
self.host_ip_test = section['HOST_IP_TEST']
if 'TEST_NETWORK' in config_file.sections():
section = config_file['TEST_NETWORK']
self.http_test_ip = section['HTTP_TEST_IP']
@@ -231,32 +419,22 @@ NOTE: for now to see stdout and stderr remove /home/lanforge from path.
self.upstream_port = section['UPSTREAM_PORT']
self.logger.info("upstream_port {}".format(self.upstream_port))
if 'TEST_PARAMETERS' in config_file.sections():
section = config_file['TEST_PARAMETERS']
self.test_timeout = section['TEST_TIMEOUT']
self.use_blank_db = section['LOAD_BLANK_DB']
self.use_factory_default_db = section['LOAD_FACTORY_DEFAULT_DB']
self.use_custom_db = section['LOAD_CUSTOM_DB']
self.custom_db = section['CUSTOM_DB']
self.production_run = section['PRODUCTION_RUN']
self.email_list_production = section['EMAIL_LIST_PRODUCTION']
self.host_ip_production = section['HOST_IP_PRODUCTION']
self.email_list_test = section['EMAIL_LIST_TEST']
self.host_ip_test = section['HOST_IP_TEST']
if 'RADIO_DICTIONARY' in config_file.sections():
section = config_file['RADIO_DICTIONARY']
self.radio_dict = json.loads(section.get('RADIO_DICT', self.radio_dict))
self.logger.info("self.radio_dict {}".format(self.radio_dict))
if 'TEST_DICTIONARY' in config_file.sections():
section = config_file['TEST_DICTIONARY']
if self.test_suite in config_file.sections():
section = config_file[self.test_suite]
# for json replace the \n and \r they are invalid json characters, allows for multiple line args
try:
self.test_dict = json.loads(section.get('TEST_DICT', self.test_dict).replace('\n',' ').replace('\r',' '))
self.logger.info("TEST_DICTIONARY: {}".format(self.test_dict))
self.logger.info("{}: {}".format(self.test_suite,self.test_dict))
except:
self.logger.info("Excpetion loading TEST_DICTIONARY, is there comma after the last entry? Check syntax")
self.logger.info("Excpetion loading {}, is there comma after the last entry? Check syntax".format(self.test_suite))
else:
self.logger.info("EXITING... NOT FOUND Test Suite with name : {}".format(self.test_suite))
exit(1)
def load_factory_default_db(self):
#self.logger.info("file_wd {}".format(self.scripts_wd))
@@ -273,12 +451,10 @@ NOTE: for now to see stdout and stderr remove /home/lanforge from path.
out, err = process.communicate()
errcode = process.returncode
# Not currently used
# not currently used
def load_blank_db(self):
#self.logger.info("file_wd {}".format(self.scripts_wd))
try:
os.chdir(self.scripts_wd)
#self.logger.info("Current Working Directory {}".format(os.getcwd()))
except:
self.logger.info("failed to change to {}".format(self.scripts_wd))
@@ -287,10 +463,8 @@ NOTE: for now to see stdout and stderr remove /home/lanforge from path.
process = subprocess.Popen((command).split(' '), shell=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True)
def load_custom_db(self,custom_db):
#self.logger.info("file_wd {}".format(self.scripts_wd))
try:
os.chdir(self.scripts_wd)
#self.logger.info("Current Working Directory {}".format(os.getcwd()))
except:
self.logger.info("failed to change to {}".format(self.scripts_wd))
@@ -310,10 +484,10 @@ NOTE: for now to see stdout and stderr remove /home/lanforge from path.
self.logger.info("test: {} skipped".format(test))
# load the default database
elif self.test_dict[test]['enabled'] == "TRUE":
# Make the command replacement a separate method call.
# make the command replacement a separate method call.
# loop through radios
for radio in self.radio_dict:
# Replace RADIO, SSID, PASSWD, SECURITY with actual config values (e.g. RADIO_0_CFG to values)
# replace RADIO, SSID, PASSWD, SECURITY with actual config values (e.g. RADIO_0_CFG to values)
# not "KEY" is just a word to refer to the RADIO define (e.g. RADIO_0_CFG) to get the vlaues
# --num_stations needs to be int not string (no double quotes)
if self.radio_dict[radio]["KEY"] in self.test_dict[test]['args']:
@@ -341,26 +515,35 @@ NOTE: for now to see stdout and stderr remove /home/lanforge from path.
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('COL_NAMES',self.col_names)
if 'UPSTREAM_PORT' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('UPSTREAM_PORT',self.upstream_port)
if self.use_factory_default_db == "TRUE":
self.load_factory_default_db()
sleep(3)
self.logger.info("FACTORY_DFLT loaded between tests with scenario.py --load FACTORY_DFLT")
if self.use_blank_db == "TRUE":
self.load_blank_db()
sleep(1)
self.logger.info("BLANK loaded between tests with scenario.py --load BLANK")
if self.use_custom_db == "TRUE":
try:
self.load_custom_db(self.custom_db)
if 'load_db' in self.test_dict[test]:
self.logger.info("load_db : {}".format(self.test_dict[test]['load_db']))
if str(self.test_dict[test]['load_db']).lower() != "none" and str(self.test_dict[test]['load_db']).lower() != "skip":
try:
self.load_custom_db(self.test_dict[test]['load_db'])
except:
self.logger.info("custom database failed to load check existance and location: {}".format(self.test_dict[test]['load_db']))
else:
self.logger.info("no load_db present in dictionary, load db normally")
if self.use_factory_default_db == "TRUE":
self.load_factory_default_db()
sleep(3)
self.logger.info("FACTORY_DFLT loaded between tests with scenario.py --load FACTORY_DFLT")
if self.use_blank_db == "TRUE":
self.load_blank_db()
sleep(1)
self.logger.info("{} loaded between tests with scenario.py --load {}".format(self.custom_db,self.custom_db))
except:
self.logger.info("custom database failed to load check existance and location")
else:
self.logger.info("no db loaded between tests: {}".format(self.use_custom_db))
sleep(1) # the sleep is to allow for the database to stabilize
self.logger.info("BLANK loaded between tests with scenario.py --load BLANK")
if self.use_custom_db == "TRUE":
try:
self.load_custom_db(self.custom_db)
sleep(1)
self.logger.info("{} loaded between tests with scenario.py --load {}".format(self.custom_db,self.custom_db))
except:
self.logger.info("custom database failed to load check existance and location: {}".format(self.custom_db))
else:
self.logger.info("no db loaded between tests: {}".format(self.use_custom_db))
sleep(1) # DO NOT REMOVE: the sleep is to allow for the database to stabilize
try:
os.chdir(self.scripts_wd)
#self.logger.info("Current Working Directory {}".format(os.getcwd()))
@@ -381,21 +564,22 @@ NOTE: for now to see stdout and stderr remove /home/lanforge from path.
#self.logger.info("stderr_log_txt: {}".format(stderr_log_txt))
stderr_log = open(stderr_log_txt, 'a')
# HERE is where the test is run
print("running {}".format(command))
process = subprocess.Popen((command).split(' '), shell=False, stdout=stdout_log, stderr=stderr_log, universal_newlines=True)
try:
#out, err = process.communicate()
process.wait(timeout=int(self.test_timeout))
except subprocess.TimeoutExpired:
process.terminate()
self.test_result = "TIMEOUT"
process = subprocess.Popen((command).split(' '), shell=False, stdout=stdout_log, stderr=stderr_log, universal_newlines=True)
# if there is a better solution please propose; the TIMEOUT result is different than FAIL
try:
#out, err = process.communicate()
process.wait(timeout=int(self.test_timeout))
except subprocess.TimeoutExpired:
process.terminate()
self.test_result = "TIMEOUT"
#if err:
# self.logger.info("command Test timed out: {}".format(command))
except:
print("No such file or directory with command: {}".format(command))
self.logger.info("No such file or directory with command: {}".format(command))
#self.logger.info(stderr_log_txt)
if(self.test_result != "TIMEOUT"):
stderr_log_size = os.path.getsize(stderr_log_txt)
if stderr_log_size > 0 :
@@ -433,7 +617,6 @@ NOTE: for now to see stdout and stderr remove /home/lanforge from path.
else:
self.logger.info("enable value {} invalid for test: {}, test skipped".format(self.test_dict[test]['enabled'],test))
self.finish_html_results()
def main():
@@ -442,7 +625,7 @@ def main():
prog='lf_check.py',
formatter_class=argparse.RawTextHelpFormatter,
epilog='''\
lf_check.py : for running scripts listed in lf_check_config.ini file
lf_check.py : running scripts listed in <config>.ini or <config>.json
''',
description='''\
lf_check.py
@@ -450,15 +633,54 @@ lf_check.py
Summary :
---------
for running scripts listed in lf_check_config.ini
running scripts listed in <config>.ini or <config>.json
Example :
./lf_check.py --ini lf_check_test.ini --suite suite_one
./lf_check.py --use_json --json lf_check_test.json --suite suite_two
---------
''')
parser.add_argument('--ini', help="--ini <config.ini file> default lf_check_config.ini", default="lf_check_config.ini")
parser.add_argument('--json', help="--json <lf_check_config.json file> ", default="lf_check_config.json")
parser.add_argument('--use_json', help="--use_json ", action='store_true')
parser.add_argument('--suite', help="--suite <suite name> default TEST_DICTIONARY", default="TEST_DICTIONARY")
parser.add_argument('--production', help="--production stores true, sends email results to production email list", action='store_true')
parser.add_argument('--outfile', help="--outfile <Output Generic Name> used as base name for all files generated", default="")
parser.add_argument('--logfile', help="--logfile <logfile Name> logging for output of lf_check.py script", default="lf_check.log")
args = parser.parse_args()
args = parser.parse_args()
# output report.
# load test config file information either <config>.json or <config>.ini
use_json = False
json_data = ""
config_ini = ""
if args.use_json:
use_json = True
try:
print("args.json {}".format(args.json))
with open(args.json, 'r') as json_config:
json_data = json.load(json_config)
except:
print("Error reading {}".format(args.json))
else:
config_ini = os.getcwd() + '/' + args.ini
if os.path.exists(config_ini):
print("TEST CONFIG : {}".format(config_ini))
else:
print("EXITING: NOTFOUND TEST CONFIG : {} ".format(config_ini))
exit(1)
# select test suite
test_suite = args.suite
if args.production:
production = True
print("Email to production list")
else:
production = False
print("Email to email list")
# create report class for reporting
report = lf_report(_results_dir_name="lf_check",
_output_html="lf_check.html",
_output_pdf="lf-check.pdf")
@@ -470,16 +692,21 @@ for running scripts listed in lf_check_config.ini
outfile_path = report.file_add_path(outfile)
# lf_check() class created
check = lf_check(_csv_results = csv_results,
check = lf_check(_use_json = use_json,
_config_ini = config_ini,
_json_data = json_data,
_test_suite = test_suite,
_production = production,
_csv_results = csv_results,
_outfile = outfile_path)
# get the git sha
# get git sha
process = subprocess.Popen(["git", "rev-parse", "HEAD"], stdout=subprocess.PIPE)
(commit_hash, err) = process.communicate()
exit_code = process.wait()
git_sha = commit_hash.decode('utf-8','ignore')
# set up logging
# set up logging
logfile = args.logfile[:-4]
print("logfile: {}".format(logfile))
logfile = "{}-{}.log".format(logfile,current_time)
@@ -493,13 +720,15 @@ for running scripts listed in lf_check_config.ini
logger.addHandler(file_handler)
logger.addHandler(logging.StreamHandler(sys.stdout)) # allows to logging to file and stdout
# logger setup print out sha
logger.info("commit_hash: {}".format(commit_hash))
logger.info("commit_hash2: {}".format(commit_hash.decode('utf-8','ignore')))
check.read_config_contents() # CMR need mode to just print out the test config and not run
# read config and run tests
check.read_config()
check.run_script_test()
# Generate Ouptput reports
# generate output reports
report.set_title("LF Check: lf_check.py")
report.build_banner()
report.start_content_div()
@@ -514,7 +743,6 @@ for running scripts listed in lf_check_config.ini
print("html report: {}".format(html_report))
report.write_pdf_with_timestamp()
report_path = os.path.dirname(html_report)
parent_report_dir = os.path.dirname(report_path)
@@ -523,7 +751,6 @@ for running scripts listed in lf_check_config.ini
# duplicates html_report file up one directory
lf_check_html_report = parent_report_dir + "/{}.html".format(outfile)
#
banner_src_png = report_path + "/banner.png"
banner_dest_png = parent_report_dir + "/banner.png"
CandelaLogo_src_png = report_path + "/CandelaLogo2-90dpi-200x90-trans.png"
@@ -555,6 +782,7 @@ for running scripts listed in lf_check_config.ini
shutil.copyfile(custom_src_css, custom_dest_css)
shutil.copyfile(font_src_woff, font_dest_woff)
# print out locations of results
print("lf_check_latest.html: "+lf_check_latest_html)
print("lf_check_html_report: "+lf_check_html_report)

View File

@@ -23,8 +23,6 @@ LOAD_BLANK_DB = FALSE
LOAD_FACTORY_DEFAULT_DB = TRUE
LOAD_CUSTOM_DB = FALSE
CUSTOM_DB = DFLT_ETH1_GEN
PRODUCTION_RUN = FALSE # determine whom to send emails to
#EMAIL_LIST_PRODUCTION = scripters@candelatech.com
EMAIL_LIST_PRODUCTION = chuck.rekiere@candelatech.com
HOST_IP_PRODUCTION = 192.168.95.6
EMAIL_LIST_TEST = chuck.rekiere@candelatech.com
@@ -52,11 +50,8 @@ UPSTREAM_PORT = eth1
# NOTE: KEY must match ELEMENT of the DICTIONARY (RADIO_1_CFG == "KEY":"RADIO_1_CFG")
[RADIO_DICTIONARY]
RADIO_DICT: {
"RADIO_0_CFG":{"KEY":"RADIO_0_CFG","RADIO":"wiphy0","STATIONS":"4","SSID":"ssid-wpa2","PASSWD":"ssidpw-wpa2","SECURITY":"wpa2"},
"RADIO_1_CFG":{"KEY":"RADIO_1_CFG","RADIO":"wiphy1","STATIONS":"4","SSID":"ct523c-vap","PASSWD":"ct523c-vap","SECURITY":"wpa2"},
"RADIO_2_CFG":{"KEY":"RADIO_2_CFG","RADIO":"wiphy1","STATIONS":"4","SSID":"ssid-wpa","PASSWD":"ssidpw-wpa","SECURITY":"wpa"},
"RADIO_3_CFG":{"KEY":"RADIO_3_CFG","RADIO":"wiphy1","STATIONS":"4","SSID":"ssid-wep","PASSWD":"ssidpw-wep","SECURITY":"wep"},
"RADIO_4_CFG":{"KEY":"RADIO_4_CFG","RADIO":"wiphy1","STATIONS":"4","SSID":"ssid-wpa3","PASSWD":"ssidpw-wpa3","SECURITY":"wpa3"}
"RADIO_0_CFG":{"KEY":"RADIO_0_CFG","RADIO":"wiphy0","STATIONS":"4","SSID":"ct523c-vap","PASSWD":"ct523c-vap","SECURITY":"wpa2"},
"RADIO_1_CFG":{"KEY":"RADIO_1_CFG","RADIO":"wiphy1","STATIONS":"4","SSID":"ct523c-vap","PASSWD":"ct523c-vap","SECURITY":"wpa2"}
}
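# Illustration (not the literal lf_check implementation): when a test entry's args
# reference a RADIO_DICT key, e.g. "args":"RADIO_1_CFG --debug", the key is expected
# to expand to the matching flags from the dictionary above, roughly
#   --radio wiphy1 --ssid ct523c-vap --passwd ct523c-vap --security wpa2 --debug
# The RADIO_USED / SSID_USED / SSID_PW_USED / SECURITY_USED tokens seen in some
# entries below appear to be substituted in a similar way from the test configuration.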
# Not used
@@ -66,123 +61,20 @@ LF_MGR_PORT=8080
# REPORTS are in /home/lanforge/html-reports
# if /home/lanforge/html-reports is not present, reports are stored in the local directory
#[REPORTS]
#REPORT_DIR="/home/lanforge/html-reports"
# TEST_DICTIONARY is used by lf_check; other section names are ignored, so they can be used to save other test lists
# TEST_DICTIONARY_EXAMPLE_1 is an example; it will not run unless the section is renamed to TEST_DICTIONARY
[TEST_DICTIONARY_EXAMPLE_1]
#[TEST_DICTIONARY]
[TEST_DICTIONARY]
TEST_DICT: {
"test_ipv4_l4":{"enabled":"FALSE","command":"test_ipv4_l4.py","args":"--radio wiphy1 --ssid ct523c-vap --passwd ct523c-vap --security wpa2 --num_stations 4 --test_duration 15s --debug"},
"test_ipv4_variable_time2":{"enabled":"TRUE","command":"test_ipv4_variable_time.py","args":"--radio wiphy1 --ssid ct523c-vap --passwd ct523c-vap --security wpa2 --test_duration 15s --output_format excel --layer3_cols name,tx_bytes,rx_bytes,dropped --traffic_type lf_udp --debug"}
"create_l3":{"enabled":"TRUE","command":"create_l4.py","args":"--radio RADIO_USED --ssid SSID_USED --passwd SSID_PW_USED --security SECURITY_USED --debug"},
"create_l4":{"enabled":"TRUE","command":"create_l4.py","args":"RADIO_1_CFG --debug"},
# the name needs to be unique for the dictionary
"create_l4_2":{"enabled":"TRUE","command":"create_l4.py","args":"--radio wiphy1 --ssid ct523c-vap --passwd ct523c-vap --security wpa2 --debug"}
}
# TEST_DICTIONARY is used by lf_check; other section names are ignored, so they can be used to save other test lists
[TEST_DICTIONARY_EXAMPLE_2]
#[TEST_DICTIONARY]
[SUITE]
TEST_DICT: {
"create_l3":{"enabled":"TRUE","command":"create_l3.py","args":"RADIO_1_CFG --debug"},
"create_l4":{"enabled":"TRUE","command":"create_l4.py","args":"RADIO_1_CFG --debug"}
}
# This is an EXAMPLE dictionary of tests that can be run; copy it to TEST_DICTIONARY to test.
# Feature update: pass in the DICTIONARY name to be run
[TEST_DICTIONARY_EXAMPLE_3]
#[TEST_DICTIONARY]
TEST_DICT: {
"example_security_connection0":{"enabled":"TRUE","command":"example_security_connection.py","args":"RADIO_1_CFG --debug"},
"example_security_connection1":{"enabled":"TRUE","command":"example_security_connection.py","args":"RADIO_2_CFG --debug"},
"example_security_connection2":{"enabled":"TRUE","command":"example_security_connection.py","args":"RADIO_3_CFG --debug"},
"example_security_connection3":{"enabled":"TRUE","command":"example_security_connection.py","args":"RADIO_4_CFG --debug"},
"sta_connect2":{"enabled":"TRUE","command":"sta_connect2.py","args":"--dut_ssid ssid-wpa2 --dut_passwd ssidpw-wpa2 --dut_security wpa2"},
"sta_connect_example":{"enabled":"TRUE","command":"sta_connect_example.py"},
"test_generic0":{"enabled":"TRUE","command":"test_generic.py","args":"RADIO_1_CFG --type lfping --dest TEST_IP --debug"},
"test_generic1":{"enabled":"TRUE","command":"test_generic.py","args":"RADIO_1_CFG --type lfping --dest TEST_IP --debug"},
"test_generic2":{"enabled":"TRUE","command":"test_generic.py","args":"RADIO_1_CFG --type lfping --dest TEST_IP --debug"},
"testgroup":{"enabled":"TRUE","command":"testgroup.py","args":"--group_name group1 --add_group --list_groups --debug"},
"test_ipv4_connection":{"enabled":"TRUE","command":"test_ipv4_connection.py","args":"RADIO_1_CFG --debug"},
"test_ipv4_l4_urls_per_ten":{"enabled":"TRUE","command":"test_ipv4_l4_urls_per_ten.py","args":"RADIO_1_CFG --num_tests 1 --requests_per_ten 600 --target_per_ten 600 --debug"},
"test_ipv4_l4_wifi":{"enabled":"TRUE","command":"test_ipv4_l4_wifi.py","args":"RADIO_1_CFG --test_duration 15s --debug"},
"test_ipv4_l4":{"enabled":"TRUE","command":"test_ipv4_l4.py","args":"--radio wiphy1 --ssid ct523c-vap --passwd ct523c-vap --security wpa2 --num_stations 4 --test_duration 15s --debug"},
"test_ipv4_variable_time0":{"enabled":"TRUE","command":"test_ipv4_variable_time.py","args":"RADIO_1_CFG --test_duration 15s --output_format excel --layer3_cols COL_NAMES --traffic_type lf_udp --debug"},
"test_ipv4_variable_time1":{"enabled":"TRUE","command":"test_ipv4_variable_time.py","args":"RADIO_1_CFG --test_duration 15s --output_format csv --layer3_cols COL_NAMES --traffic_type lf_udp --debug"},
"test_ipv4_variable_time2":{"enabled":"TRUE","command":"test_ipv4_variable_time.py","args":"--radio wiphy1 --ssid ct523c-vap --passwd ct523c-vap --security wpa2 --test_duration 15s --output_format excel --layer3_cols name,tx_bytes,rx_bytes,dropped --traffic_type lf_udp --debug"},
"test_ipv4_variable_time3":{"enabled":"TRUE","command":"test_ipv4_variable_time.py","args":"RADIO_1_CFG --test_duration 15s --output_format csv --layer3_cols COL_NAMES --traffic_type lf_udp --debug"},
"create_bridge":{"enabled":"TRUE","command":"create_bridge.py","args":"--radio RADIO_USED --upstream_port UPSTREAM_PORT --target_device sta0000 --debug"},
"create_l3":{"enabled":"TRUE","command":"create_l3.py","args":"RADIO_1_CFG --debug"},
"create_l3":{"enabled":"TRUE","load_db":"none","command":"create_l4.py","args":"--radio RADIO_USED --ssid SSID_USED --passwd SSID_PW_USED --security SECURITY_USED --debug"},
"create_l4":{"enabled":"TRUE","command":"create_l4.py","args":"RADIO_1_CFG --debug"},
"create_station":{"enabled":"TRUE","command":"create_station.py","args":"--radio RADIO_USED --ssid SSID_USED --passwd SSID_PW_USED --security SECURITY_USED --debug"},
"test_fileio":{"enabled":"TRUE","command":"test_fileio.py","args":"--macvlan_parent eth2 --num_ports 3 --use_macvlans --first_mvlan_ip 192.168.92.13 --netmask 255.255.255.0 --gateway 192.168.92.1 --test_duration 30s"},
"test_l3_longevity":{"enabled":"TRUE","command":"test_l3_longevity.py","args":"--test_duration 20s --polling_interval 5s --upstream_port eth1
--radio 'radio==wiphy1,stations==4,ssid==ct523c-vap,ssid_pw==ct523c-vap,security==wpa2'
--radio 'radio==wiphy2,stations==4,ssid==ct523c-vap,ssid_pw==ct523c-vap,security==wpa2'
--radio 'radio==wiphy3,stations==4,ssid==ct523c-vap,ssid_pw==ct523c-vap,security==wpa2'
--endp_type lf_udp --ap_read --ap_test_mode --rates_are_totals --side_a_min_bps=20000 --side_b_min_bps=300000000"},
"test_status_msg":{"enabled":"TRUE","command":"test_status_msg.py","args":"--action run_test"},
"test_wanlink":{"enabled":"TRUE","command":"test_wanlink.py","args":"--debug"},
"sta_connect_example":{"enabled":"TRUE","command":"sta_connect_example.py","args":"RADIO_1_CFG --upstream_port UPSTREAM_PORT"},
"wlan_capacity_calculator1":{"enabled":"TRUE","command":"./wlan_capacity_calculator.py","args":"-sta 11abg -t Voice -p 48 -m 106 -e WEP -q Yes -b 1 2 5.5 11 -pre Long -s N/A -co G.711 -r Yes -c Yes"},
"wlan_capacity_calculator2":{"enabled":"TRUE","command":"./wlan_capacity_calculator.py","args":"-sta 11n -t Voice -d 17 -ch 40 -gu 800 -high 9 -e WEP -q Yes -ip 5 -mc 42 -b 6 9 12 24 -m 1538 -co G.729 -pl Greenfield -cw 15 -r Yes -c Yes"},
"wlan_capacity_calculator3":{"enabled":"TRUE","command":"./wlan_capacity_calculator.py","args":"-sta 11ac -t Voice -d 9 -spa 3 -ch 20 -gu 800 -high 1 -e TKIP -q Yes -ip 3 -mc 0 -b 6 12 24 54 -m 1518 -co Greenfield -cw 15 -rc Yes"}
}
# the name needs to be unique for the dictionary
"create_l4_2":{"enabled":"TRUE","command":"create_l4.py","args":"--radio wiphy1 --ssid ct523c-vap --passwd ct523c-vap --security wpa2 --debug"}
}
# This LISA section is currently used to facilitate testing on LISA
[TEST_DICTIONARY_LISA_SHORT]
#[TEST_DICTIONARY]
TEST_DICT: {
"create_l3":{"enabled":"TRUE","command":"create_l3.py","args":"--radio RADIO_USED --ssid SSID_USED --passwd SSID_PW_USED --security SECURITY_USED --debug"},
"create_l4":{"enabled":"TRUE","command":"create_l4.py","args":"RADIO_1_CFG --debug"},
"create_l4":{"enabled":"TRUE","command":"create_l4.py","args":"--radio wiphy1 --ssid jedway-wpa2-x2048-5-3 --passwd jedway-wpa2-x2048-5-3 --security wpa2 --debug"}
}
#[TEST_DICTIONARY]
[TEST_DICTIONARY_LISA]
TEST_DICT: {
"example_security_connection0":{"enabled":"FALSE","command":"example_security_connection.py","args":"--num_stations 4 --ssid jedway-wpa-1 --passwd jedway-wpa-1 --radio wiphy1 --security wpa --debug"},
"example_security_connection1":{"enabled":"FALSE","command":"example_security_connection.py","args":"--num_stations 4 --ssid jedway-wpa2-x2048-5-3 --passwd jedway-wpa2-x2048-5-3 --radio wiphy1 --security wpa2 --debug"},
"example_security_connection2":{"enabled":"FALSE","command":"example_security_connection.py","args":"--num_stations 4 --ssid jedway-wep-48 --passwd 0123456789 --radio wiphy1 --security wep --debug"},
"example_security_connection2":{"enabled":"FALSE","command":"example_security_connection.py","args":"--num_stations 4 --ssid jedway-wpa3-1 --passwd jedway-wpa3-1 --radio wiphy1 --security wpa3 --debug"},
"sta_connect2":{"enabled":"FALSE","command":"sta_connect2.py","args":"--dut_ssid ssid-wpa2 --dut_passwd ssidpw-wpa2 --dut_security wpa2"},
"sta_connect_example":{"enabled":"FALSE","command":"sta_connect_example.py","args":""},
"test_fileio":{"enabled":"FALSE","command":"test_fileio.py","args":"--macvlan_parent eth2 --num_ports 3 --use_macvlans --first_mvlan_ip 192.168.92.13 --netmask 255.255.255.0 --test_duration 30s --gateway 192.168.92.1"},
"test_generic0":{"enabled":"FALSE","command":"test_generic.py","args":"--radio wiphy1 --ssid jedway-wpa2-x2048-5-3 --passwd jedway-wpa2-x2048-5-3 --security wpa2 --num_stations 4 --type lfping --dest 10.40.0.1 --debug"},
"test_generic1":{"enabled":"FALSE","command":"test_generic.py","args":"--radio wiphy1 --ssid jedway-wpa2-x2048-5-3 --passwd jedway-wpa2-x2048-5-3 --security wpa2 --num_stations 4 --type speedtest --speedtest_min_up 20 --speedtest_min_dl 20 --speedtest_max_ping 150 --security wpa2 --debug"},
"test_generic2":{"enabled":"FALSE","command":"test_generic.py","args":"--radio wiphy1 --ssid jedway-wpa2-x2048-5-3 --passwd jedway-wpa2-x2048-5-3 --security wpa2 --num_stations 4 --type iperf3 --debug"},
"test_generic3":{"enabled":"FALSE","command":"test_generic.py","args":"--radio wiphy1 --ssid jedway-wpa2-x2048-5-3 --passwd jedway-wpa2-x2048-5-3 --security wpa2 --num_stations 4 --type lfcurl --dest 10.40.0.1 --file_output /home/lanforge/Documents/lfcurl_output.txt --debug"},
"testgroup":{"enabled":"FALSE","command":"testgroup.py","args":"--group_name group1 --add_group --list_groups --debug"},
# testgroup_list_groups
# testgroup_list_connections
# testgroup_delete_group
"testgroup5":{"enabled":"TRUE","command":"testgroup.py","args":"--num_stations 4 --ssid lanforge --passwd password --security wpa2 --radio wiphy0 --group_name group0 --add_group"},
"test_ipv4_connection":{"enabled":"TRUE","command":"test_ipv4_connection.py","args":"--radio wiphy1 --num_stations 4 --ssid jedway-wpa2-x2048-5-3 --passwd jedway-wpa2-x2048-5-3 --security wpa2 --debug"},
"test_ipv4_l4_urls_per_ten":{"enabled":"TRUE","command":"test_ipv4_l4_urls_per_ten.py","args":"--radio wiphy1 --num_stations 4 --security wpa2 --ssid jedway-wpa2-x2048-5-3 --passwd jedway-wpa2-x2048-5-3 --num_tests 1 --requests_per_ten 600 --target_per_ten 600 --debug"},
"test_ipv4_l4_wifi":{"enabled":"TRUE","command":"test_ipv4_l4_wifi.py","args":"--radio wiphy1 --num_stations 4 --security wpa2 --ssid jedway-wpa2-x2048-5-3 --passwd jedway-wpa2-x2048-5-3 --test_duration 15s --debug"},
"test_ipv4_l4":{"enabled":"TRUE","command":"test_ipv4_l4.py","args":"--radio wiphy1 --num_stations 4 --security wpa2 --ssid jedway-wpa2-x2048-5-3 --passwd jedway-wpa2-x2048-5-3 --test_duration 15s --debug"},
"test_ipv4_variable_time0":{"enabled":"TRUE","command":"test_ipv4_variable_time.py","args":"--radio wiphy1 --ssid jedway-wpa2-x2048-5-3 --passwd jedway-wpa2-x2048-5-3 --security wpa2 --test_duration 15s --output_format excel --layer3_cols name,tx_bytes,rx_bytes,dropped --traffic_type lf_udp --debug"},
"test_ipv4_variable_time1":{"enabled":"TRUE","command":"test_ipv4_variable_time.py","args":"--radio wiphy1 --ssid jedway-wpa2-x2048-5-3 --passwd jedway-wpa2-x2048-5-3 --security wpa2 --test_duration 15s --output_format csv --layer3_cols name,tx_bytes,rx_bytes,dropped --traffic_type lf_udp --debug"},
"test_ipv4_l4_ftp_upload":{"enabled":"TRUE","command":"test_ipv4_l4_ftp_upload.py","args":"--upstream_port eth1 --radio wiphy1 --num_stations 4 --security wpa2 --ssid jedway-wpa2-x2048-5-3 --passwd jedway-wpa2-x2048-5-3 --test_duration 15s --debug"},
"test_ipv6_connection":{"enabled":"TRUE","command":"test_ipv6_connection.py","args":"--radio wiphy1 --ssid jedway-wpa2-x2048-5-3 --passwd jedway-wpa2-x2048-5-3 --security wpa2 --debug"},
"test_ipv6_variable_time":{"enabled":"TRUE","command":"test_ipv6_variable_time.py","args":"--radio wiphy1 --ssid jedway-wpa2-x2048-5-3 --passwd jedway-wpa2-x2048-5-3 --security wpa2 --test_duration 15s --cx_type tcp6 --debug"},
"test_ipv6_variable_time":{"enabled":"TRUE","command":"test_ipv6_variable_time.py","args":"--radio wiphy1 --ssid jedway-wpa2-x2048-5-3 --passwd jedway-wpa2-x2048-5-3 --security wpa2 --debug"},
"test_l3_longevity":{"enabled":"TRUE","command":"test_l3_longevity.py","args":"--test_duration 15s --polling_interval 5s --upstream_port eth1
--radio 'radio==wiphy0,stations==4,ssid==jedway-wpa2-x2048-5-3,ssid_pw==jedway-wpa2-x2048-5-3,security==wpa2'
--radio 'radio==wiphy2,stations==4,ssid==jedway-wpa2-x2048-5-3,ssid_pw==jedway-wpa2-x2048-5-3,security==wpa2'
--radio 'radio==wiphy3,stations==4,ssid==ct523c-vap,ssid_pw==ct523c-vap,security==wpa2'
--endp_type lf_udp --rates_are_totals --side_a_min_bps=20000 --side_b_min_bps=300000000"},
"test_l3_powersave_traffic":{"enabled":"TRUE","command":"test_l3_powersave_traffic.py","args":"--radio wiphy1 --ssid jedway-wpa2-x2048-5-3 --passwd jedway-wpa2-x2048-5-3 --security wpa2 --debug"},
"test_status_msg":{"enabled":"TRUE","command":"test_status_msg.py","args":"--action run_test"},
"test_wanlink":{"enabled":"TRUE","command":"test_wanlink.py","args":"--debug"},
"create_bridge":{"enabled":"TRUE","command":"create_bridge.py","args":"--radio wiphy1 --upstream_port eth1 --target_device sta0000 --debug"},
"create_l3":{"enabled":"TRUE","command":"create_l3.py","args":"--radio wiphy1 --ssid jedway-wpa2-x2048-5-3 --passwd jedway-wpa2-x2048-5-3 --security wpa2 --debug"},
"create_l4":{"enabled":"TRUE","command":"create_l4.py","args":"--radio wiphy1 --ssid jedway-wpa2-x2048-5-3 --passwd jedway-wpa2-x2048-5-3 --security wpa2 --debug"},
"create_macvlan":{"enabled":"TRUE","command":"create_macvlan.py","args":"--radio wiphy1 --macvlan_parent eth1 --debug"},
"create_station":{"enabled":"TRUE","command":"create_station.py","args":"--radio wiphy1 --ssid jedway-wpa2-x2048-5-3 --passwd jedway-wpa2-x2048-5-3 --security wpa2 --debug"},
"create_vap":{"enabled":"TRUE","command":"create_vap.py","args":"--radio wiphy1 --ssid jedway-wpa2-x2048-5-3 --passwd jedway-wpa2-x2048-5-3 --security wpa2 --debug"},
"create_vr":{"enabled":"TRUE","command":"create_vr.py","args":"--vr_name 2.vr0 --ports 2.br0,2.vap2 --services"},
"create_qvlan":{"enabled":"TRUE","command":"create_qvlan.py","args":"--radio wiphy1 --qvlan_parent eth1"},
"wlan_capacity_calculator1":{"enabled":"TRUE","command":"./wlan_capacity_calculator.py","args":"-sta 11abg -t Voice -p 48 -m 106 -e WEP -q Yes -b 1 2 5.5 11 -pre Long -s N/A -co G.711 -r Yes -c Yes"},
"wlan_capacity_calculator2":{"enabled":"TRUE","command":"./wlan_capacity_calculator.py","args":"-sta 11n -t Voice -d 17 -ch 40 -gu 800 -high 9 -e WEP -q Yes -ip 5 -mc 42 -b 6 9 12 24 -m 1538 -co G.729 -pl Greenfield -cw 15 -r Yes -c Yes"},
"wlan_capacity_calculator3":{"enabled":"TRUE","command":"./wlan_capacity_calculator.py","args":"-sta 11ac -t Voice -d 9 -spa 3 -ch 20 -gu 800 -high 1 -e TKIP -q Yes -ip 3 -mc 0 -b 6 12 24 54 -m 1518 -co Greenfield -cw 15 -rc Yes"}
}
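For orientation, the sketch below shows one way such a TEST_DICT section could be read and its enabled entries launched. It is only an illustration of the data shape, not the actual lf_check.py implementation, and it assumes the dictionary value is a clean Python literal (the example sections above contain inline # comments and duplicate keys that a real reader has to tolerate or strip).

#!/usr/bin/env python3
# Illustrative sketch only; file names and section names are assumptions.
import ast
import shlex
import subprocess
import configparser

config = configparser.ConfigParser()
config.read("lf_check_config.ini")

# TEST_DICT is stored as a dictionary literal under [TEST_DICTIONARY]
test_dict = ast.literal_eval(config["TEST_DICTIONARY"]["TEST_DICT"])

for name, entry in test_dict.items():
    if entry.get("enabled", "FALSE").upper() != "TRUE":
        continue
    # shlex.split keeps quoted arguments such as --radio 'radio==wiphy1,...' together
    cmd = ["python3", entry["command"]] + shlex.split(entry.get("args", ""))
    print("running {}: {}".format(name, " ".join(cmd)))
    subprocess.run(cmd)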

View File

@@ -0,0 +1,21 @@
#!/usr/bin/env python3
'''
NAME: <file name>
PURPOSE:
<what does the script do>
EXAMPLE:
<example of how to run the script command line parameters>
SETUP:
<special setup required to run the script>
NOTES:
<Include setup and other information that would be helpful to the user>
COPYRIGHT:
Copyright 2021 Candela Technologies Inc
INCLUDE_IN_README
'''