Merge ../lanforge-scripts

This commit is contained in:
shivamcandela
2021-06-07 14:12:44 +05:30
15 changed files with 1587 additions and 154 deletions

Binary file not shown.

View File

@@ -0,0 +1,11 @@
/* file reserved for customer styling of reports */
/* rename to custom.css to take effect */
.TitleFont {}
.HeaderFont {}
.TableFont {}
.TableBorder {}
.ImgStyle {}
.HeaderStyle {}
.FooterStyle {}
/* eof */

View File

@@ -0,0 +1,299 @@
html, body,div {
margin: 0;
padding:0;
font-size: 14px;
}
h1,h2,h3,h4 {
padding: 0em;
line-height: 1.5;
text-align: left;
color: rgb(42,91,41);
}
@font-face {
font-family: CenturyGothic;
src: url("CenturyGothic.woff"),
url("images/CenturyGothic.woff"),
url("/images/CenturyGothic.woff"),
url("http://www.candelatech.com/images/CenturyGothic.woff");
}
body,h1,h2,h3,h4 {
font-family: CenturyGothic, "Century Gothic", Arial, Helvetica, sans-serif;
}
h1 { font-size: 30px;}
h2 { font-size: 24px;}
h3 { font-size: 18px;}
h4 { font-size: 14px;}
li,pre,tt {
text-align: left;
}
pre {
font-size: 10px;
}
table {
border-collapse: collapse;
background: #e0e0e0;
}
table, td, th {
border: 1px solid gray;
padding 4px;
}
table.noborder, table.noborder td, table.noborder th {
border: 0 none;
}
td {
background: white;
}
td.ar {
text-align: right;
}
th {
color: rgb(42,91,41);
text-align: center;
}
#lf_title {
text-align: center;
background-image: url(candela_swirl_small-72h.png);
background-position: right;
background-repeat: no-repeat;
height: 90px;
}
#new_chart {
display: block;
height: 250px;
min-width: 200px;
width: 80%;
border: 1px solid black;
margin: 14px auto;
padding: 14px;
vertical-align: bottom;
text-align: center;
}
.lf_chart {
margin: 1em;
padding: 5px;
}
#error_types ul {
background: #f0f0f0;
font-size: 12px;
line-height: 1.5;
margin: 1em;
padding: 0.25em inherit 0.25em inherit;
max-height: 8em;
overflow: auto;
}
li {
line-height: 1.5;
}
.contentDiv {
min-width: 800px;
max-width: 8in;
margin: 1em auto;
padding: 0;
}
.ct-point {
stroke-width: 6px;}
.o_el {
display: inline-block;
width: 100px;
height: 230px;
border: none;
margin: 1px 1px 16px 1px;
padding: 10px 10px 0 10px;
background: #eee;
text-align: center;
vertical-align: bottom;
}
.bar_el {
display: block;
background: green;
border: none;
min-height: 1px;
margin: 0 0 5px 0;
padding: 0;
text-align: center;
}
.label_el {
color: black;
display: block;
font-size: 14px;
font-family: Arial,Helvetica,sans-serif,mono;
margin: 1px;
text-align: center;
vertical-align: bottom;
width: inherit;
}
.value_el {
font-family: Arial,Helvetica,sans-serif,mono;
color: black;
display: block;
font-size: 14px;
margin: 0 auto;
padding: none;
border: none;
background: white;
text-align: center;
vertical-align: bottom;
width: auto;
}
.value_el>span {
background: #f0f0f0a0;
border: 1px solid #f0f0f0a0;
border-radius: 5px;
padding: 1px;
min-width: 2em;
}
.error {
color: red;
}
@media only screen {
.hideFromPrint { }
.hideFromScreen { display:none; }
}
@media only print {
.hideFromScreen { }
.hideFromPrint { display:none; }
}
/* these styles will get overridden by custom.css */
#BannerBack {
background-color: #e68b15;
height: 205px;
max-height: 205px;
border: 0 none;
margin: 0;
padding: 0;
top: 0;
left: 0;
width: 100%;
}
#Banner {
background-image:url("banner.png");
background-repeat:no-repeat;
padding: 0;
margin: 0 auto;
min-width: 1000px;
min-height: 205px;
width: 1000px;
height: 205px;
max-width: 1000px;
max-height: 205px;
}
#BannerLogo {
text-align: right;
padding: 25px;
margin: 5px;
width: 200px;
border: none;
}
.TitleFontScreen {
margin-left: auto;
margin-right: auto;
margin-top: 1em;
margin-bottom: 0.2em;
font-size: 50px;
padding-top: 1em;
}
.TitleFontPrint {
line-height: 1;
margin-left: 0px;
margin-right: auto;
margin-top: 0.5em;
margin-bottom: 0.2em;
padding-top: 20px;
padding-left: 20px;
color: darkgreen;
}
.TitleFontPrintSub {
line-height: 1;
margin-left: 0px;
margin-right: auto;
margin-top: 0;
margin-bottom: 0;
/*font-size: 20px; Let 'h3', etc control this */
padding-top: 0px;
padding-left: 20px;
}
.HeaderFont {}
.TableFont {}
.TableBorder {}
.ImgStyle {}
div.Section h1, div.Section h2 {
margin: 0 0 0 0em;
}
div.HeaderStyle h1, div.HeaderStyle h2 {
text-align: left;
margin: 0 0 0 0;
max-width: 8in;
min-width: 800px;
}
div.Section {
padding 5px;
position: relative;
}
div.Section img {
margin: 0;
padding: 0;
position: relative;
top: 50%;
transform: translateY(-50%);
}
div.FooterStyle {
width: 100%;
vertical-align: middle;
border: 0 none;
border-top: 2px solid #2A5B29;
color: #2A5B29;
font-size: 12px;
margin-top: 2em;
}
div.FooterStyle img {
width: auto;
height: auto;
text-align: right;
}
div.FooterStyle span.Gradient {
background: white;
color: #2A5B29;
display: inline-block;
height: 30px;
line-height: 1;
padding-top: 22px;
padding-bottom: 20px;
padding-left: 2em;
vertical-align: middle;
max-width:80%;
float:left;
width:50%;
}
.FooterStyle a, .FooterStyle a:visited {
color: #2A5B29;
font-size: 12px;
line-height: 1;
height: 30px;
margin: 0;
padding: 0;
vertical-align: middle;
}
div.FooterStyle a.LogoImgLink {
display: inline-block;
text-align: right;
float: right;
}
a .LogoImgLink {
}
a.LogoImgLink img {
}
table.dataframe {
margin: 1em;
padding: 0;
}
table.dataframe tr th {
padding: 0.5em;
}

View File

@@ -33,6 +33,7 @@ class CreateStation(Realm):
_proxy_str=None,
_debug_on=False,
_up=True,
_set_txo_data=None,
_exit_on_error=False,
_exit_on_fail=False):
super().__init__(_host,
@@ -48,6 +49,7 @@ class CreateStation(Realm):
self.number_template = _number_template
self.debug = _debug_on
self.up = _up
self.set_txo_data = _set_txo_data
self.station_profile = self.new_station_profile()
self.station_profile.lfclient_url = self.lfclient_url
self.station_profile.ssid = self.ssid
@@ -60,7 +62,6 @@ class CreateStation(Realm):
pprint.pprint(self.sta_list)
print("---- ~Station List ----- ----- ----- ----- ----- ----- \n")
def build(self):
# Build stations
self.station_profile.use_security(self.security, self.ssid, self.password)
@@ -70,6 +71,15 @@ class CreateStation(Realm):
self.station_profile.set_command_flag("add_sta", "create_admin_down", 1)
self.station_profile.set_command_param("set_port", "report_timer", 1500)
self.station_profile.set_command_flag("set_port", "rpt_timer", 1)
if self.set_txo_data is not None:
self.station_profile.set_wifi_txo(txo_ena=self.set_txo_data["txo_enable"],
tx_power=self.set_txo_data["txpower"],
pream=self.set_txo_data["pream"],
mcs=self.set_txo_data["mcs"],
nss=self.set_txo_data["nss"],
bw=self.set_txo_data["bw"],
retries=self.set_txo_data["retries"],
sgi=self.set_txo_data["sgi"], )
self.station_profile.create(radio=self.radio, sta_names_=self.sta_list, debug=self.debug)
if self.up:
self.station_profile.admin_up()
@@ -98,14 +108,14 @@ Command example:
--debug
''')
required = parser.add_argument_group('required arguments')
#required.add_argument('--security', help='WiFi Security protocol: < open | wep | wpa | wpa2 | wpa3 >', required=True)
# required.add_argument('--security', help='WiFi Security protocol: < open | wep | wpa | wpa2 | wpa3 >', required=True)
args = parser.parse_args()
#if args.debug:
# if args.debug:
# pprint.pprint(args)
# time.sleep(5)
if (args.radio is None):
raise ValueError("--radio required")
raise ValueError("--radio required")
num_sta = 2
if (args.num_stations is not None) and (int(args.num_stations) > 0):
@@ -113,20 +123,31 @@ Command example:
num_sta = num_stations_converted
station_list = LFUtils.port_name_series(prefix="sta",
start_id=0,
end_id=num_sta-1,
padding_number=10000,
radio=args.radio)
start_id=0,
end_id=num_sta - 1,
padding_number=10000,
radio=args.radio)
set_txo_data={
"txo_enable": 1,
"txpower": 255,
"pream": 0,
"mcs": 0,
"nss": 0,
"bw": 3,
"retries": 1,
"sgi": 0
}
create_station = CreateStation(_host=args.mgr,
_port=args.mgr_port,
_ssid=args.ssid,
_password=args.passwd,
_security=args.security,
_sta_list=station_list,
_radio=args.radio,
_proxy_str=args.proxy,
_debug_on=args.debug)
_port=args.mgr_port,
_ssid=args.ssid,
_password=args.passwd,
_security=args.security,
_sta_list=station_list,
_radio=args.radio,
_set_txo_data=None,
_proxy_str=args.proxy,
_debug_on=args.debug)
create_station.build()
print('Created %s stations' % num_sta)

87
py-scripts/ghost_profile.py Executable file
View File

@@ -0,0 +1,87 @@
#!/usr/bin/env python3
"""
NAME: ghost_profile.py
PURPOSE: modify ghost database from the command line.
SETUP: A Ghost installation which the user has admin access to.
EXAMPLE: ./ghost_profile.py --article_text_file text.txt --title Test --authors Matthew --ghost_token SECRET_KEY --host 192.168.1.1
Matthew Stidham
Copyright 2021 Candela Technologies Inc
License: Free to distribute and modify. LANforge systems must be licensed.
"""
import sys
import os
import argparse
if sys.version_info[0] != 3:
print("This script requires Python 3")
exit(1)
if 'py-json' not in sys.path:
sys.path.append(os.path.join(os.path.abspath('..'), 'py-json'))
sys.path.append(os.path.join(os.path.abspath('..'), 'py-dashboard'))
from GhostRequest import GhostRequest
from LANforge.lfcli_base import LFCliBase
class UseGhost(LFCliBase):
def __init__(self,
_ghost_token=None,
host="localhost",
port=8080,
_debug_on=False,
_exit_on_fail=False,
_ghost_host="localhost",
_ghost_port=2368, ):
super().__init__(host, port, _debug=_debug_on, _exit_on_fail=_exit_on_fail)
self.ghost_host = _ghost_host
self.ghost_port = _ghost_port
self.ghost_token = _ghost_token
self.GP = GhostRequest(self.ghost_host, str(self.ghost_port), _api_token=self.ghost_token)
def create_post(self, title, text, tags, authors):
return self.GP.create_post(title=title, text=text, tags=tags, authors=authors)
def create_post_from_file(self, title, file, tags, authors):
text = open(file).read()
return self.GP.create_post(title=title, text=text, tags=tags, authors=authors)
def main():
parser = LFCliBase.create_basic_argparse(
prog='ghost_profile.py',
formatter_class=argparse.RawTextHelpFormatter,
epilog='''Manage Ghost Website''',
description='''
ghost_profile.py
----------------
Command example:
./ghost_profile.py
--ghost_token'''
)
optional = parser.add_argument_group('optional arguments')
optional.add_argument('--ghost_token', default=None)
optional.add_argument('--create_post', default=None)
optional.add_argument('--article_text_file', default=None)
optional.add_argument('--ghost_port', help='Ghost port if different from 2368', default=2368)
optional.add_argument('--ghost_host', help='Ghost host if different from localhost', default='localhost')
optional.add_argument('--article_text')
optional.add_argument('--article_tags', action='append')
optional.add_argument('--authors', action='append')
optional.add_argument('--title', default=None)
args = parser.parse_args()
Ghost = UseGhost(_ghost_token=args.ghost_token,
_ghost_port=args.ghost_port,
_ghost_host=args.ghost_host)
if args.create_post is not None:
Ghost.create_post(args.title, args.article_text, args.article_tags, args.authors)
if args.article_text_file is not None:
Ghost.create_post_from_file(args.title, args.article_text_file, args.article_tags, args.authors)
if __name__ == "__main__":
main()

View File

@@ -25,9 +25,10 @@ LICENSE:
INCLUDE_IN_README
'''
import datetime
import os
import shutil
import datetime
import pandas as pd
import pdfkit
@@ -50,7 +51,8 @@ class lf_report():
_results_dir_name = "LANforge_Test_Results",
_output_format = 'html', # pass in on the write functionality, current not used
_dataframe="",
_path_date_time=""): # this is where the final report is placed.
_path_date_time="",
_custom_css='custom-example.css'): # this is where the final report is placed.
#other report paths,
# _path is where the directory with the data time will be created
@@ -89,13 +91,15 @@ class lf_report():
self.logo_directory = "artifacts"
self.logo_file_name = "CandelaLogo2-90dpi-200x90-trans.png" # does this need to be configurable.
self.current_path = os.path.dirname(os.path.abspath(__file__))
self.custom_css = _custom_css
# pass in _date to allow to change after construction
self.set_date_time_directory(_date,_results_dir_name)
self.build_date_time_directory()
self.font_file = "CenturyGothic.woff"
# move the banners and candela images to report path
self.copy_banner()
self.copy_css()
self.copy_logo()
def copy_banner(self):
@@ -103,21 +107,35 @@ class lf_report():
banner_dst_file = str(self.path_date_time)+'/'+ str(self.banner_file_name)
#print("banner src_file: {}".format(banner_src_file))
#print("dst_file: {}".format(banner_dst_file))
shutil.copy(banner_src_file,banner_dst_file)
shutil.copy(banner_src_file, banner_dst_file)
def copy_css(self):
reportcss_src_file = str(self.current_path)+'/'+str(self.banner_directory)+'/report.css'
reportcss_dest_file = str(self.path_date_time)+'/report.css'
customcss_src_file = str(self.current_path)+'/'+str(self.banner_directory)+'/'+str(self.custom_css)
customcss_dest_file = str(self.path_date_time)+'/custom.css'
font_src_file = str(self.current_path)+'/'+str(self.banner_directory)+'/'+str(self.font_file)
font_dest_file = str(self.path_date_time)+'/'+str(self.font_file)
shutil.copy(reportcss_src_file, reportcss_dest_file)
shutil.copy(customcss_src_file, customcss_dest_file)
shutil.copy(font_src_file, font_dest_file)
def copy_logo(self):
logo_src_file = str(self.current_path)+'/'+str(self.logo_directory)+'/'+str(self.logo_file_name)
logo_dst_file = str(self.path_date_time)+'/'+ str(self.logo_file_name)
#print("logo_src_file: {}".format(logo_src_file))
#print("logo_dst_file: {}".format(logo_dst_file))
shutil.copy(logo_src_file,logo_dst_file)
shutil.copy(logo_src_file, logo_dst_file)
def move_graph_image(self,):
graph_src_file = str(self.graph_image)
graph_dst_file = str(self.path_date_time)+'/'+ str(self.graph_image)
print("graph_src_file: {}".format(graph_src_file))
print("graph_dst_file: {}".format(graph_dst_file))
shutil.move(graph_src_file,graph_dst_file)
shutil.move(graph_src_file, graph_dst_file)
def set_path(self,_path):
self.path = _path
@@ -157,6 +175,7 @@ class lf_report():
def set_graph_title(self,_graph_title):
self.graph_title = _graph_title
# The _date is set when class is enstanciated / created so this set_date should be used with caution, used to synchronize results
def set_date(self,_date):
self.date = _date
@@ -176,9 +195,12 @@ class lf_report():
def set_graph_image(self,_graph_image):
self.graph_image = _graph_image
def get_date(self):
return self.date
def get_path(self):
return self.path
# get_path_date_time, get_report_path and need to be the same ()
# get_path_date_time, get_report_path and need to be the same
def get_path_date_time(self):
return self.path_date_time
@@ -248,60 +270,56 @@ class lf_report():
def build_all(self):
self.build_banner()
self.start_content_div()
self.build_table_title()
self.build_table()
self.end_content_div()
def build_banner(self):
self.banner_html = """
<!DOCTYPE html>
<html lang='en'>
<head>
<meta charset='UTF-8'>
<meta name='viewport' content='width=device-width, initial-scale=1' />
<br>
</head>
<title>BANNER </title></head>
<body>
<div class='Section report_banner-1000x205' style='background-image:url("banner.png");background-repeat:no-repeat;padding:0;margin:0;min-width:1000px; min-height:205px;width:1000px; height:205px;max-width:1000px; max-height:205px;'>
<br>
<img align='right' style='padding:25;margin:5;width:200px;' src="CandelaLogo2-90dpi-200x90-trans.png" border='0' />
<div class='HeaderStyle'>
<br>
<h1 class='TitleFontPrint' style='color:darkgreen;'>""" + str(self.title) + """</h1>
<h3 class='TitleFontPrint' style='color:darkgreen;'>""" + str(self.date) + """</h3>
<br>
<br>
<br>
<br>
<br>
</div>
"""
# NOTE: {{ }} are the ESCAPED curly braces
self.banner_html = """<!DOCTYPE html>
<html lang='en'>
<head>
<meta charset='UTF-8'>
<meta name='viewport' content='width=device-width, initial-scale=1' />
<style>
body {{ margin: 0; padding: 0; }}
</style>
<link rel='stylesheet' href='report.css' />
<link rel='stylesheet' href='custom.css' />
<title>{title}</title>
</head>
<body>
<div id='BannerBack'>
<div id='Banner'>
<br/>
<img id='BannerLogo' align='right' src="CandelaLogo2-90dpi-200x90-trans.png" border='0' />
<div class='HeaderStyle'>
<h1 class='TitleFontPrint'>{title}</h1>
<h4 class='TitleFontPrintSub'>{date}</h4>
</div>
</div>
</div>
""".format(
title=self.title,
date=self.date,
)
self.html += self.banner_html
def build_table_title(self):
self.table_title_html = """
<html lang='en'>
<head>
<meta charset='UTF-8'>
<meta name='viewport' content='width=device-width, initial-scale=1' />
<div class='HeaderStyle'>
<h2 class='TitleFontPrint' style='color:darkgreen;'>""" + str(self.table_title) + """</h2>
"""
self.table_title_html = "<h2 class='TitleFontPrint''>{title}</h2>".format(title=self.table_title)
self.html += self.table_title_html
def build_text(self):
self.text_html = """
<html lang='en'>
<head>
<meta charset='UTF-8'>
<meta name='viewport' content='width=device-width, initial-scale=1' />
<div class='HeaderStyle'>
<h3 class='TitleFontPrint' style='color:darkgreen;'>""" + str(self.text) + """</h3>
"""
self.html += self.text_html
def start_content_div(self):
self.html += "\n<div class='contentDiv'>\n"
def build_text(self):
# please do not use 'style=' tags unless you cannot override a class
self.text_html = """
<div class='HeaderStyle'>
<h3 class='TitleFontPrint'>{text}</h3>\n
</div>""".format(text=self.text)
self.html += self.text_html
def build_date_time(self):
self.date_time = str(datetime.datetime.now().strftime("%Y-%m-%d-%H-h-%m-m-%S-s")).replace(':','-')
@@ -325,30 +343,29 @@ class lf_report():
def build_objective(self):
self.obj_html = """
<!-- Test Objective -->
<h3 align='left'>""" + str(self.obj_title) + """</h3>
<p align='left' width='900'>""" + str(self.objective) + """</p>
"""
<!-- Test Objective -->
<h3 align='left'>{title}</h3>
<p align='left' width='900'>{objective}</p>
""".format(title=self.obj_title,
objective=self.objective)
self.html += self.obj_html
def build_graph_title(self):
self.table_graph_html = """
<html lang='en'>
<head>
<meta charset='UTF-8'>
<meta name='viewport' content='width=device-width, initial-scale=1' />
<div class='HeaderStyle'>
<h2 class='TitleFontPrint' style='color:darkgreen;'>""" + str(self.graph_title) + """</h2>
"""
<div class='HeaderStyle'>
<h2 class='TitleFontPrint' style='color:darkgreen;'>{title}</h2>
""".format(title=self.graph_title)
self.html += self.table_graph_html
def build_graph(self):
self.graph_html_obj = """
<img align='center' style='padding:15;margin:5;width:1000px;' src=""" + "%s" % (self.graph_image) + """ border='1' />
<br><br>
"""
<img align='center' style='padding:15px;margin:5px 5px 2em 5px;width:1000px;' src='{image}' border='1' />
""".format(image=self.graph_image)
self.html +=self.graph_html_obj
def end_content_div(self):
self.html += "\n</div><!-- end contentDiv -->\n"
# Unit Test
if __name__ == "__main__":

View File

@@ -0,0 +1,146 @@
#!/usr/bin/python3
'''
NAME:
lf_pdf_search.py
PURPOSE:
lf_pdf_search.py will run a pdf grep looking for specific information in pdf files
"pdfgrep -r --include 'ASA*.pdf' 'ASA End Date'"
EXAMPLE:
lf_pdf_search.py
NOTES:
1. copy lf_pdf_search.py to a directory that has the pdf information
TO DO NOTES:
'''
import datetime
import pprint
import sys
if sys.version_info[0] != 3:
print("This script requires Python3")
exit()
import os
import socket
import logging
import time
from time import sleep
import argparse
import json
import configparser
import subprocess
import csv
import shutil
import os.path
import xlsxwriter
import re
import pandas as pd
class lf_pdf_search():
def __init__(self):
self.renewal_info = ""
self.timeout = 10
self.outfile = "pdf_search"
self.result = ""
self.stdout_log_txt = ""
self.stdout_log = ""
self.stderr_log_txt = ""
self.stderr_log = ""
self.processed_log_txt = ""
self.dataframe = ""
self.pdf_search_csv = ""
def get_data(self):
# o.k. a little over kill here , just save data to file to help debug if something goes wrong
if self.outfile is not None:
self.stdout_log_txt = self.outfile
self.stdout_log_txt = self.stdout_log_txt + "-{}-stdout.txt".format("test")
self.stdout_log = open(self.stdout_log_txt, 'w+')
self.stderr_log_txt = self.outfile
self.stderr_log_txt = self.stderr_log_txt + "-{}-stderr.txt".format("test")
#self.logger.info("stderr_log_txt: {}".format(stderr_log_txt))
self.stderr_log = open(self.stderr_log_txt, 'w+')
print("Names {} {}".format(self.stdout_log.name, self.stderr_log.name))
# have ability to pass in a specific command
command = "pdfgrep -r --include 'ASA*.pdf' 'ASA End Date'"
print("running {}".format(command))
process = subprocess.Popen(['pdfgrep','-r','--include','ASA*.pdf','ASA End Date'], shell=False, stdout=self.stdout_log, stderr=self.stderr_log, universal_newlines=True)
try:
process.wait(timeout=int(self.timeout))
self.result = "SUCCESS"
except subprocess.TimeoutExpired:
process.terminate()
self.result = "TIMEOUT"
self.stdout_log.close()
self.stderr_log.close()
return self.stdout_log_txt
def preprocess_data(self):
pass
# this method uses pandas dataframe - will use for data manipulation,
# the data mainupulation may be done in other manners
def datafile_to_dataframe(self):
# note the error_bad_lines=False will miss one of the lines
delimiter_list = [':']
try:
self.dataframe = pd.read_csv(self.stdout_log_txt, delimiter = [':'])
#self.dataframe = pd.read_csv(self.stdout_log_txt, sep = ':')
except:
print("one of the files may have a SN: in it need to correct ")
self.dataframe = pd.read_csv(self.stdout_log_txt, delimiter = ':', error_bad_lines=False)
#print(self.dataframe)
print("saving data to .csv")
# this removes the extention of .txt
self.pdf_search_csv= self.stdout_log_txt[:-4]
self.pdf_search_csv = self.pdf_search_csv + ".csv"
self.pdf_search_csv = self.dataframe.to_csv(self.pdf_search_csv,mode='w',index=False)
def main():
# arguments
parser = argparse.ArgumentParser(
prog='lf_pdf_search.py',
formatter_class=argparse.RawTextHelpFormatter,
epilog='''\
lf_pdf_search.py : for running scripts listed in lf_check_config.ini file
''',
description='''\
lf_pdf_search.py
-----------
Summary :
---------
show renewas
''')
parser.add_argument('--outfile', help="--outfile <Output Generic Name> used as base name for all files generated", default="")
parser.add_argument('--logfile', help="--logfile <logfile Name> logging for output of lf_pdf_search script", default="lf_pdf_search.log")
args = parser.parse_args()
pdf_search = lf_pdf_search()
output_file = pdf_search.get_data()
pdf_search.datafile_to_dataframe()
print("output file: {}".format(str(output_file)))
print("END lf_pdf_search.py")
if __name__ == "__main__":
main()

View File

@@ -28,7 +28,7 @@
--radio 'radio==wiphy2,stations==1,ssid==TCH-XB7,ssid_pw==comcast123,security==wpa2' \
--radio 'radio==wiphy3,stations==1,ssid==TCH-XB7,ssid_pw==comcast123,security==wpa2' \
--radio 'radio==wiphy4,stations==1,ssid==TCH-XB7,ssid_pw==comcast123,security==wpa2' \
--endp_type lf_udp --ap_read --side_a_min_bps=20000 --side_b_min_bps=400000000 \
--endp_type lf_udp --ap_read --ap_stats --side_a_min_bps=20000 --side_b_min_bps=400000000 \
--attenuators 1.1.<serial number>.1 \
--atten_vals 20,21,40,41
@@ -104,6 +104,7 @@ class L3VariableTime(Realm):
lfclient_port=8080,
debug=False,
influxdb=None,
ap_scheduler_stats=False,
ap_read=False,
ap_port='/dev/ttyUSB0',
ap_baud='115200',
@@ -184,11 +185,14 @@ class L3VariableTime(Realm):
self.cx_profile.side_b_min_bps = side_b_min_rate[0]
self.cx_profile.side_b_max_bps = side_b_max_rate[0]
self.ap_scheduler_stats = ap_scheduler_stats
self.ap_read = ap_read
self.ap_port = ap_port
self.ap_baud = ap_baud
self.ap_cmd = ap_cmd
self.ap_test_mode = ap_test_mode
self.ap_umsched = ""
self.ap_msched = ""
# Lookup key is port-eid name
self.port_csv_files = {}
@@ -237,6 +241,12 @@ class L3VariableTime(Realm):
self.cx_profile.port = self.lfclient_port
self.cx_profile.name_prefix = self.name_prefix
def get_ap_umsched(self):
return self.ap_umsched
def get_ap_msched(self):
return self.ap_msched
def get_kpi_csv(self):
#print("self.csv_kpi_file {}".format(self.csv_kpi_file.name))
return self.csv_kpi_file.name
@@ -423,6 +433,21 @@ class L3VariableTime(Realm):
else:
self._pass("PASS: Stations & CX build finished: created/updated: %s stations and %s connections."%(self.station_count, self.cx_count))
def ap_custom_cmd(self,ap_custom_cmd):
ap_results = ""
try:
# configure the serial interface
ser = serial.Serial(self.ap_port, int(self.ap_baud), timeout=5)
ss = SerialSpawn(ser)
ss.sendline(str(ap_custom_cmd))
ss.expect([pexpect.TIMEOUT], timeout=1) # do not detete line, waits for output
ap_results = ss.before.decode('utf-8','ignore')
print("ap_custom_cmd: {} ap_results {}".format(ap_custom_cmd, ap_results))
except:
print("ap_custom_cmd: {} WARNING unable to read AP ".format(ap_custom_cmd))
return ap_results
def read_ap_stats(self):
# 5ghz: wl -i wl1 bs_data 2.4ghz# wl -i wl0 bs_data
ap_stats = ""
@@ -511,6 +536,9 @@ class L3VariableTime(Realm):
# Update connections with the new rate and pdu size config.
self.build(rebuild=True)
if self.ap_scheduler_stats:
self.ap_custom_cmd('wl -i wl1 dump_clear')
for atten_val in self.atten_vals:
if atten_val != -1:
for atten_idx in self.attenuators:
@@ -540,8 +568,6 @@ class L3VariableTime(Realm):
ap_row = []
ap_stats_col_titles = []
while cur_time < end_time:
#interval_time = cur_time + datetime.timedelta(seconds=5)
interval_time = cur_time + datetime.timedelta(seconds=self.polling_interval_seconds)
@@ -637,6 +663,14 @@ class L3VariableTime(Realm):
# At end of test step, record KPI information.
self.record_kpi(len(temp_stations_list), ul, dl, ul_pdu_str, dl_pdu_str, atten_val, total_dl_bps, total_ul_bps)
# At end of test if requested store upload and download stats
if self.ap_scheduler_stats:
# get the (UL) Upload scheduler statistics
self.ap_umsched += self.ap_custom_cmd('wl -i wl1 dump umsched')
# get the (DL) Download schduler staticstics
self.ap_msched += self.ap_custom_cmd('wl -i wl1 dump msched')
# Stop connections.
self.cx_profile.stop_cx();
self.multicast_profile.stop_mc();
@@ -911,6 +945,8 @@ python3 .\\test_l3_longevity.py --test_duration 4m --endp_type \"lf_tcp lf_udp m
parser.add_argument('--ap_port', help='--ap_port \'/dev/ttyUSB0\'',default='/dev/ttyUSB0')
parser.add_argument('--ap_baud', help='--ap_baud \'115200\'',default='115200')
parser.add_argument('--ap_cmd', help='ap_cmd \'wl -i wl1 bs_data\'', default="wl -i wl1 bs_data")
parser.add_argument('--ap_scheduler_stats', help='--ap_scheduler_stats flag to clear stats run test then dump ul and dl stats to file on ap', action='store_true')
parser.add_argument('--ap_test_mode', help='ap_test_mode flag present use ap canned data', action='store_true')
@@ -949,6 +985,12 @@ python3 .\\test_l3_longevity.py --test_duration 4m --endp_type \"lf_tcp lf_udp m
else:
ap_read = False
if args.ap_scheduler_stats:
ap_scheduler_stats = args.ap_scheduler_stats
else:
ap_scheduler_stats = False
if args.ap_test_mode:
ap_test_mode = args.ap_test_mode
else:
@@ -1126,6 +1168,7 @@ python3 .\\test_l3_longevity.py --test_duration 4m --endp_type \"lf_tcp lf_udp m
lfclient_port=lfjson_port,
debug=debug,
influxdb=influxdb,
ap_scheduler_stats=ap_scheduler_stats,
ap_read=ap_read,
ap_port=ap_port,
ap_baud=ap_baud,
@@ -1163,6 +1206,25 @@ python3 .\\test_l3_longevity.py --test_duration 4m --endp_type \"lf_tcp lf_udp m
#report.write_pdf(_page_size = 'A3', _orientation='Landscape')
report.write_pdf_with_timestamp(_page_size = 'A4', _orientation='Portrait')
# ap scheduler results and write to a file
if ap_scheduler_stats:
print("getting umsched and msched ap data and writing to a file")
file_date = report.get_date()
ap_umsched_data = ip_var_test.get_ap_umsched()
ap_umsched = "{}-{}".format(file_date,"ap_umsched.txt")
ap_umsched = report.file_add_path(ap_umsched)
ap_umsched_file = open(ap_umsched, "w")
ap_umsched_file.write(str(ap_umsched_data))
ap_umsched_file.close()
ap_msched_data = ip_var_test.get_ap_msched()
ap_msched = report.file_add_path("ap_msched.txt")
ap_msched = report.file_add_path(ap_msched)
ap_msched_file = open(ap_msched, "w")
ap_msched_file.write(str(ap_msched_data))
ap_msched_file.close()
#for csv_file in csv_list:
# print("Ouptput reports CSV list value: {}".format(str(csv_file)))

View File

@@ -156,8 +156,8 @@ def main():
lfjson_host = "localhost"
lfjson_port = 8080
# station_list = LFUtils.portNameSeries(prefix_="sta", start_id_=0, end_id_=4, padding_number_=10000)
station_list = ["sta0000", "sta0001"]
ip_powersave_test = L3PowersaveTraffic(lfjson_host, lfjson_port, ssid="jedway-open-149", security="open",
station_list = ["sta0000", "sta0001", "sta0002", "sta0003"]
ip_powersave_test = L3PowersaveTraffic(lfjson_host, lfjson_port, ssid="j-open-36", security="open",
password="[BLANK]", station_list=station_list, side_a_min_rate=2000,
side_b_min_rate=2000, side_a_max_rate=0,
side_b_max_rate=0, prefix="00000", test_duration="30s",

View File

@@ -5,15 +5,24 @@ NAME:
lf_check.py
PURPOSE:
Configuration for lf_check.py , runs various tests
lf_check.py will run a series of tests based on the test TEST_DICTIONARY listed in lf_check_config.ini.
The lf_check_config.ini file is copied from lf_check_config_template.ini and local configuration is made
to the lf_check_config.ini.
EXAMPLE:
lf_check.py
NOTES:
Before using lf_check.py
1. copy lf_check_config_template.ini to the lf_check_config.ini
2. update lf_check_config.ini to enable (TRUE) tests to be run in the TEST_DICTIONARY , the TEST_DICTIONARY needs to be passed in
TO DO NOTES:
6/4/2021 : add server (telnet localhost 4001) build info, GUI build shaw, and Kernel version to the output.
'''
import datetime
import pprint
import sys
if sys.version_info[0] != 3:
print("This script requires Python3")
@@ -21,17 +30,14 @@ if sys.version_info[0] != 3:
import os
import pexpect
import socket
import logging
import time
from time import sleep
import argparse
import json
from json import load
import configparser
from pprint import *
import subprocess
import re
import csv
import shutil
import os.path
@@ -51,6 +57,7 @@ RUN_CONDITION = 'ENABLE'
# setup logging FORMAT
FORMAT = '%(asctime)s %(name)s %(levelname)s: %(message)s'
# lf_check class contains verificaiton configuration and ocastrates the testing.
class lf_check():
def __init__(self,
_csv_results,
@@ -75,6 +82,7 @@ class lf_check():
self.ftp_test_ip = ""
self.test_ip = ""
# section TEST_GENERIC
self.radio_lf = ""
self.ssdi = ""
self.ssid_pw = ""
@@ -88,11 +96,61 @@ class lf_check():
self.csv_results_writer = ""
self.csv_results_column_headers = ""
self.logger = logging.getLogger(__name__)
self.test_timeout = 20
self.test_timeout = 120
self.use_blank_db = "FALSE"
self.use_factory_default_db = "FALSE"
self.use_custom_db = "FALSE"
self.production_run = "FALSE"
self.email_list_production = ""
self.host_ip_production = None
self.email_list_test = ""
self.host_ip_test = None
# NOT complete : will send the email results
def send_results_email(self, report_file=None):
if (report_file is None):
print( "No report file, not sending email.")
return
report_url=report_file.replace('/home/lanforge/', '')
if report_url.startswith('/'):
report_url = report_url[1:]
# Following recommendation
# NOTE: https://stackoverflow.com/questions/24196932/how-can-i-get-the-ip-address-from-nic-in-python
#command = 'echo "$HOSTNAME mail system works!" | mail -s "Test: $HOSTNAME $(date)" chuck.rekiere@candelatech.com'
hostname = socket.gethostname()
ip = socket.gethostbyname(hostname)
message_txt = """Results from {hostname}:
http://{ip}/{report}
NOTE: for now to see stdout and stderr remove /home/lanforge from path.
""".format(hostname=hostname, ip=ip, report=report_url)
mail_subject = "Regression Test [{hostname}] {date}".format(hostname=hostname,
date=datetime.datetime.now())
try:
if self.production_run == "TRUE":
msg = message_txt.format(ip=self.host_ip_production)
command = "echo \"{message}\" | mail -s \"{subject}\" {address}".format(
message=msg,
subject=mail_subject,
ip=self.host_ip_production,
address=self.email_list_production)
else:
msg = message_txt.format(ip=ip)
command = "echo \"{message}\" | mail -s \"{subject}\" {address}".format(
message=msg,
subject=mail_subject,
ip=ip, #self.host_ip_test,
address=self.email_list_test)
print("running:[{}]".format(command))
process = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True)
# have email on separate timeout
process.wait(timeout=int(self.test_timeout))
except subprocess.TimeoutExpired:
print("send email timed out")
process.terminate()
def get_csv_results(self):
return self.csv_file.name
@@ -132,6 +190,7 @@ class lf_check():
"""
# Functions in this section are/can be overridden by descendants
# This code reads the lf_check_config.ini file to populate the test variables
def read_config_contents(self):
self.logger.info("read_config_contents {}".format(CONFIG_FILE))
config_file = configparser.ConfigParser()
@@ -179,6 +238,11 @@ class lf_check():
self.use_factory_default_db = section['LOAD_FACTORY_DEFAULT_DB']
self.use_custom_db = section['LOAD_CUSTOM_DB']
self.custom_db = section['CUSTOM_DB']
self.production_run = section['PRODUCTION_RUN']
self.email_list_production = section['EMAIL_LIST_PRODUCTION']
self.host_ip_production = section['HOST_IP_PRODUCTION']
self.email_list_test = section['EMAIL_LIST_TEST']
self.host_ip_test = section['HOST_IP_TEST']
if 'RADIO_DICTIONARY' in config_file.sections():
section = config_file['RADIO_DICTIONARY']
@@ -188,9 +252,11 @@ class lf_check():
if 'TEST_DICTIONARY' in config_file.sections():
section = config_file['TEST_DICTIONARY']
# for json replace the \n and \r they are invalid json characters, allows for multiple line args
self.test_dict = json.loads(section.get('TEST_DICT', self.test_dict).replace('\n',' ').replace('\r',' '))
#self.logger.info("test_dict {}".format(self.test_dict))
try:
self.test_dict = json.loads(section.get('TEST_DICT', self.test_dict).replace('\n',' ').replace('\r',' '))
self.logger.info("TEST_DICTIONARY: {}".format(self.test_dict))
except:
self.logger.info("Excpetion loading TEST_DICTIONARY, is there comma after the last entry? Check syntax")
def load_factory_default_db(self):
#self.logger.info("file_wd {}".format(self.scripts_wd))
@@ -393,7 +459,9 @@ for running scripts listed in lf_check_config.ini
args = parser.parse_args()
# output report.
report = lf_report(_results_dir_name = "lf_check",_output_html="lf_check.html",_output_pdf="lf-check.pdf")
report = lf_report(_results_dir_name="lf_check",
_output_html="lf_check.html",
_output_pdf="lf-check.pdf")
current_time = time.strftime("%Y-%m-%d-%H-%M-%S", time.localtime())
csv_results = "lf_check{}-{}.csv".format(args.outfile,current_time)
@@ -434,6 +502,7 @@ for running scripts listed in lf_check_config.ini
# Generate Ouptput reports
report.set_title("LF Check: lf_check.py")
report.build_banner()
report.start_content_div()
report.set_table_title("LF Check Test Results")
report.build_table_title()
report.set_text("git sha: {}".format(git_sha))
@@ -445,27 +514,51 @@ for running scripts listed in lf_check_config.ini
print("html report: {}".format(html_report))
report.write_pdf_with_timestamp()
report_path = os.path.dirname(html_report)
parent_report_dir = os.path.dirname(report_path)
# copy results to lastest so someone may see the latest.
lf_check_latest_html = os.path.dirname(os.path.dirname(html_report)) + "/lf_check_latest.html"
lf_check_latest_html = parent_report_dir + "/lf_check_latest.html"
# duplicates html_report file up one directory
lf_check_html_report = os.path.dirname(os.path.dirname(html_report)) + "/{}.html".format(outfile)
lf_check_html_report = parent_report_dir + "/{}.html".format(outfile)
#
banner_src_png = os.path.dirname(html_report)+ "/banner.png"
banner_dest_png = os.path.dirname(os.path.dirname(html_report))+ "/banner.png"
CandelaLogo_src_png = os.path.dirname(html_report) + "/CandelaLogo2-90dpi-200x90-trans.png"
CandelaLogo_dest_png = os.path.dirname(os.path.dirname(html_report)) + "/CandelaLogo2-90dpi-200x90-trans.png"
banner_src_png = report_path + "/banner.png"
banner_dest_png = parent_report_dir + "/banner.png"
CandelaLogo_src_png = report_path + "/CandelaLogo2-90dpi-200x90-trans.png"
CandelaLogo_dest_png = parent_report_dir + "/CandelaLogo2-90dpi-200x90-trans.png"
report_src_css = report_path + "/report.css"
report_dest_css = parent_report_dir + "/report.css"
custom_src_css = report_path + "/custom.css"
custom_dest_css = parent_report_dir + "/custom.css"
font_src_woff = report_path + "/CenturyGothic.woff"
font_dest_woff = parent_report_dir + "/CenturyGothic.woff"
#pprint.pprint([
# ('banner_src', banner_src_png),
# ('banner_dest', banner_dest_png),
# ('CandelaLogo_src_png', CandelaLogo_src_png),
# ('CandelaLogo_dest_png', CandelaLogo_dest_png),
# ('report_src_css', report_src_css),
# ('custom_src_css', custom_src_css)
#])
# copy one directory above
shutil.copyfile(html_report,lf_check_latest_html)
shutil.copyfile(html_report,lf_check_html_report)
shutil.copyfile(html_report, lf_check_latest_html)
shutil.copyfile(html_report, lf_check_html_report)
# copy banner and logo
shutil.copyfile(banner_src_png, banner_dest_png)
shutil.copyfile(CandelaLogo_src_png,CandelaLogo_dest_png)
print("lf_check_latest.html: {}".format(lf_check_latest_html))
print("lf_check_html_report: {}".format(lf_check_html_report))
shutil.copyfile(banner_src_png, banner_dest_png)
shutil.copyfile(CandelaLogo_src_png, CandelaLogo_dest_png)
shutil.copyfile(report_src_css, report_dest_css)
shutil.copyfile(custom_src_css, custom_dest_css)
shutil.copyfile(font_src_woff, font_dest_woff)
print("lf_check_latest.html: "+lf_check_latest_html)
print("lf_check_html_report: "+lf_check_html_report)
check.send_results_email(report_file=lf_check_html_report)
if __name__ == '__main__':
main()

View File

@@ -1,32 +1,43 @@
#
# NAME : lf_check_config_template.ini
#
# PURPOSE : Configuration (lf_check_config.ini) information for running lf_check.py
#
# SETUP: copy lf_check_config_template.ini to lf_check_config.ini and update lf_check_config.ini
#
#
# NAME :
# lf_check_config_template.ini
# PURPOSE :
# The lf_check_config_template.ini is a template to be copied to the test configuration file : lf_check_config.ini which
# is used by the lf_check.py.
# The lf_check_config_template.in is devided into section that are used for test selection, test configuration or configuration of lanforge.
# Test arguments for the test do not need to use the pre-defined values. The command arguments can be entered directly.
# SETUP:
# Copy lf_check_config_template.ini to lf_check_config.ini
# 1. Update the lf_check_config.ini with the tests to be run by setting the enable flag to TRUE
# 2. TEST_DICTIONARY contains the test list: test key, test name, test arguments
# NOTE: each test dictionary key must be unique
# NOTE: { } placement important, will cause parcing errors
# radio configuraiton used below for test arguments
# NOTE: KEY must match ELEMENT of the DICTIONARY (RADIO_1_CFG == "KEY":"RADIO_1_CFG")
[RADIO_DICTIONARY]
RADIO_DICT: {
"RADIO_0_CFG":{"KEY":"RADIO_0_CFG","RADIO":"wiphy0","STATIONS":"4","SSID":"ssid-wpa2","PASSWD":"ssidpw-wpa2","SECURITY":"wpa2"},
"RADIO_1_CFG":{"KEY":"RADIO_1_CFG","RADIO":"wiphy1","STATIONS":"4","SSID":"ct523c-vap","PASSWD":"ct523c-vap","SECURITY":"wpa2"},
"RADIO_2_CFG":{"KEY":"RADIO_2_CFG","RADIO":"wiphy1","STATIONS":"4","SSID":"ssid-wpa","PASSWD":"ssidpw-wpa","SECURITY":"wpa"},
"RADIO_3_CFG":{"KEY":"RADIO_3_CFG","RADIO":"wiphy1","STATIONS":"4","SSID":"ssid-wep","PASSWD":"ssidpw-wep","SECURITY":"wep"},
"RADIO_4_CFG":{"KEY":"RADIO_4_CFG","RADIO":"wiphy1","STATIONS":"4","SSID":"ssid-wpa3","PASSWD":"ssidpw-wpa3","SECURITY":"wpa3"}
}
# NO quotes around parameters in TEST_PARAMETERS section
[TEST_PARAMETERS]
TEST_TIMEOUT = 200
LOAD_BLANK_DB = FALSE
LOAD_FACTORY_DEFAULT_DB = TRUE
LOAD_CUSTOM_DB = FALSE
CUSTOM_DB = DFLT_ETH1_GEN
PRODUCTION_RUN = FALSE # determine whom to send emails to
#EMAIL_LIST_PRODUCTION = scripters@candelatech.com
EMAIL_LIST_PRODUCTION = chuck.rekiere@candelatech.com
HOST_IP_PRODUCTION = 192.168.95.6
EMAIL_LIST_TEST = chuck.rekiere@candelatech.com
HOST_IP_TEST = 192.168.95.6
# Command line arguments: Configures network information used as inputs to test command line
[TEST_NETWORK]
HTTP_TEST_IP = "10.40.0.10"
FTP_TEST_IP = "10.40.0.10"
TEST_IP = "192.168.0.104"
# Command line arguments: LANForge configuration
# Also can use for single lanforge radio configuraiton , the RADIO_DICT may also be used for radio configuration
[TEST_GENERIC]
RADIO_USED = wiphy1
SSID_USED = ct523c-vap
@@ -36,14 +47,17 @@ NUM_STA = 4
COL_NAMES = name,tx_bytes,rx_bytes,dropped
UPSTREAM_PORT = eth1
# NO quotes around test parameters
# Note: please copy lf_check_config_template.ini - values used for lf_check.py script verification
[TEST_PARAMETERS]
TEST_TIMEOUT = 200
LOAD_BLANK_DB = FALSE
LOAD_FACTORY_DEFAULT_DB = TRUE
LOAD_CUSTOM_DB = FALSE
CUSTOM_DB = DFLT_ETH1_GEN
# Command line arguments
# radio configuraiton may also be done in the TEST_GENEERIC arguments
# NOTE: KEY must match ELEMENT of the DICTIONARY (RADIO_1_CFG == "KEY":"RADIO_1_CFG")
[RADIO_DICTIONARY]
RADIO_DICT: {
"RADIO_0_CFG":{"KEY":"RADIO_0_CFG","RADIO":"wiphy0","STATIONS":"4","SSID":"ssid-wpa2","PASSWD":"ssidpw-wpa2","SECURITY":"wpa2"},
"RADIO_1_CFG":{"KEY":"RADIO_1_CFG","RADIO":"wiphy1","STATIONS":"4","SSID":"ct523c-vap","PASSWD":"ct523c-vap","SECURITY":"wpa2"},
"RADIO_2_CFG":{"KEY":"RADIO_2_CFG","RADIO":"wiphy1","STATIONS":"4","SSID":"ssid-wpa","PASSWD":"ssidpw-wpa","SECURITY":"wpa"},
"RADIO_3_CFG":{"KEY":"RADIO_3_CFG","RADIO":"wiphy1","STATIONS":"4","SSID":"ssid-wep","PASSWD":"ssidpw-wep","SECURITY":"wep"},
"RADIO_4_CFG":{"KEY":"RADIO_4_CFG","RADIO":"wiphy1","STATIONS":"4","SSID":"ssid-wpa3","PASSWD":"ssidpw-wpa3","SECURITY":"wpa3"}
}
# Not used
[LF_MGR]
@@ -55,9 +69,10 @@ LF_MGR_PORT=8080
#[REPORTS]
#REPORT_DIR="/home/lanforge/html-reports"
# TEST_DICTIONARY used by lf_check, Other section names will be ignored so can save other test lists
# TEST_DICTIONARY_ENABLE_1 is an example, it will not run unless the name is changed to TEST_DICTIONARY
[TEST_DICTIONARY_EXAMPLE_1]
#[TEST_DICTIONARY]
TEST_DICT: {
"test_ipv4_l4":{"enabled":"FALSE","command":"test_ipv4_l4.py","args":"--radio wiphy1 --ssid ct523c-vap --passwd ct523c-vap --security wpa2 --num_stations 4 --test_duration 15s --debug"},
"test_ipv4_variable_time2":{"enabled":"TRUE","command":"test_ipv4_variable_time.py","args":"--radio wiphy1 --ssid ct523c-vap --passwd ct523c-vap --security wpa2 --test_duration 15s --output_format excel --layer3_cols name,tx_bytes,rx_bytes,dropped --traffic_type lf_udp --debug"}
@@ -65,19 +80,14 @@ TEST_DICT: {
# TEST_DICTIONARY used by lf_check, Other section names will be ignored so can save other test lists
[TEST_DICTIONARY_EXAMPLE_2]
#[TEST_DICTIONARY]
TEST_DICT: {
"create_l3":{"enabled":"TRUE","command":"create_l3.py","args":"RADIO_1_CFG --debug"},
"create_l4":{"enabled":"TRUE","command":"create_l4.py","args":"RADIO_1_CFG --debug"}
}
# This is an EXAMPLE dictionary of tests that can be run, copy to TEST_DICTIONARY to test.
# Feature update pass in the DICTIONARY name to be run
# NOTE: please do not edit this example it is used for lf_check.py script verification
# NOTE: generic tab needs to be enabled to allow for generic tests to pass
# Please enter addtional dictionaries after this one
[TEST_DICTIONARY_EXAMPLE_3]
#[TEST_DICTIONARY]
TEST_DICT: {
@@ -117,14 +127,16 @@ TEST_DICT: {
"wlan_capacity_calculator3":{"enabled":"TRUE","command":"./wlan_capacity_calculator.py","args":"-sta 11ac -t Voice -d 9 -spa 3 -ch 20 -gu 800 -high 1 -e TKIP -q Yes -ip 3 -mc 0 -b 6 12 24 54 -m 1518 -co Greenfield -cw 15 -rc Yes"}
}
# This LISA is used currelty for facilitating getting testing on LISA
[TEST_DICTIONARY_LISA_SHORT]
#[TEST_DICTIONARY]
TEST_DICT: {
"test_ipv4_l4":{"enabled":"FALSE","command":"test_ipv4_l4.py","args":"--radio wiphy1 --ssid jedway-wpa2-x2048-5-3 --passwd jedway-wpa2-x2048-5-3 --security wpa2 --num_stations 4 --test_duration 15s --debug"},
"test_ipv4_variable_time2":{"enabled":"TRUE","command":"test_ipv4_variable_time.py","args":"--radio wiphy1 --ssid jedway-wpa2-x2048-5-3 --passwd jedway-wpa2-x2048-5-3 --security wpa2 --test_duration 15s --output_format excel --layer3_cols name,tx_bytes,rx_bytes,dropped --traffic_type lf_udp --debug"}
"create_l3":{"enabled":"TRUE","command":"create_l3.py","args":"--radio RADIO_USED --ssid SSID_USED --passwd SSID_PW_USED --security SECURITY_USED --debug"},
"create_l4":{"enabled":"TRUE","command":"create_l4.py","args":"RADIO_1_CFG --debug"},
"create_l4":{"enabled":"TRUE","command":"create_l4.py","args":"--radio wiphy1 --ssid jedway-wpa2-x2048-5-3 --passwd jedway-wpa2-x2048-5-3 --security wpa2 --debug"}
}
#[TEST_DICTIONARY]
[TEST_DICTIONARY_LISA]
TEST_DICT: {

563
py-scripts/tools/lf_check_jbr.py Executable file
View File

@@ -0,0 +1,563 @@
#!/usr/bin/python3
'''
NAME:
lf_check.py
PURPOSE:
lf_check.py will run a series of tests based on the test TEST_DICTIONARY listed in lf_check_config.ini.
The lf_check_config.ini file is copied from lf_check_config_template.ini and local configuration is made
to the lf_check_config.ini.
EXAMPLE:
lf_check.py
NOTES:
Before using lf_check.py
1. copy lf_check_config_template.ini to the lf_check_config.ini
2. update lf_check_config.ini to enable (TRUE) tests to be run in the TEST_DICTIONARY , the TEST_DICTIONARY needs to be passed in
'''
import datetime
import pprint
import sys
if sys.version_info[0] != 3:
print("This script requires Python3")
exit()
import os
import socket
import logging
import time
from time import sleep
import argparse
import json
import configparser
import subprocess
import csv
import shutil
import os.path
# lf_report is from the parent of the current file
dir_path = os.path.dirname(os.path.realpath(__file__))
parent_dir_path = os.path.abspath(os.path.join(dir_path,os.pardir))
sys.path.insert(0, parent_dir_path)
#sys.path.append('../')
from lf_report import lf_report
sys.path.append('/')
CONFIG_FILE = os.getcwd() + '/lf_check_config.ini'
RUN_CONDITION = 'ENABLE'
# setup logging FORMAT
FORMAT = '%(asctime)s %(name)s %(levelname)s: %(message)s'
# lf_check class contains verificaiton configuration and ocastrates the testing.
class lf_check():
def __init__(self,
_csv_results,
_outfile):
self.lf_mgr_ip = ""
self.lf_mgr_port = ""
self.radio_dict = {}
self.test_dict = {}
path_parent = os.path.dirname(os.getcwd())
os.chdir(path_parent)
self.scripts_wd = os.getcwd()
self.results = ""
self.outfile = _outfile
self.test_result = "Failure"
self.results_col_titles = ["Test","Command","Result","STDOUT","STDERR"]
self.html_results = ""
self.background_green = "background-color:green"
self.background_red = "background-color:red"
self.background_purple = "background-color:purple"
self.http_test_ip = ""
self.ftp_test_ip = ""
self.test_ip = ""
# section TEST_GENERIC
self.radio_lf = ""
self.ssdi = ""
self.ssid_pw = ""
self.security = ""
self.num_sta = ""
self.col_names = ""
self.upstream_port = ""
self.csv_results = _csv_results
self.csv_results_file = ""
self.csv_results_writer = ""
self.csv_results_column_headers = ""
self.logger = logging.getLogger(__name__)
self.test_timeout = 120
self.use_blank_db = "FALSE"
self.use_factory_default_db = "FALSE"
self.use_custom_db = "FALSE"
self.production_run = "FALSE"
self.email_list_production = ""
self.host_ip_production = None
self.email_list_test = ""
self.host_ip_test = None
# NOT complete : will send the email results
def send_results_email(self, report_file=None):
if (report_file is None):
print( "No report file, not sending email.")
return
report_url=report_file.replace('/home/lanforge/', '')
if report_url.startswith('/'):
report_url = report_url[1:]
# Following recommendation
# NOTE: https://stackoverflow.com/questions/24196932/how-can-i-get-the-ip-address-from-nic-in-python
#command = 'echo "$HOSTNAME mail system works!" | mail -s "Test: $HOSTNAME $(date)" chuck.rekiere@candelatech.com'
hostname = socket.gethostname()
ip = socket.gethostbyname(hostname)
message_txt = """Results from {hostname}:\\n
http://{ip}/{report}\\n
NOTE: for now to see stdout and stderr remove /home/lanforge from path.\\n
""".format(hostname=hostname, ip=ip, report=report_url)
mail_subject = "Regression Test [{hostname}] {date}".format(hostname=hostname,
date=datetime.datetime.now())
try:
if self.production_run == "TRUE":
msg = message_txt.format(ip=self.host_ip_production)
command = "echo \"{message}\" | mail -s \"{subject}\" {address}".format(
message=msg,
subject=mail_subject,
ip=self.host_ip_production,
address=self.email_list_production)
else:
msg = message_txt.format(ip=ip)
command = "echo \"{message}\" | mail -s \"{subject}\" {address}".format(
message=msg,
subject=mail_subject,
ip=ip, #self.host_ip_test,
address=self.email_list_test)
print("running:[{}]".format(command))
process = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True)
# have email on separate timeout
process.wait(timeout=int(self.test_timeout))
except subprocess.TimeoutExpired:
print("send email timed out")
process.terminate()
def get_csv_results(self):
return self.csv_file.name
def start_csv_results(self):
print("self.csv_results")
self.csv_results_file = open(self.csv_results, "w")
self.csv_results_writer = csv.writer(self.csv_results_file, delimiter=",")
self.csv_results_column_headers = ['Test','Command','Result','STDOUT','STDERR']
self.csv_results_writer.writerow(self.csv_results_column_headers)
self.csv_results_file.flush()
def get_html_results(self):
return self.html_results
def start_html_results(self):
self.html_results += """
<table border="1" class="dataframe">
<thead>
<tr style="text-align: left;">
<th>Test</th>
<th>Command</th>
<th>Result</th>
<th>STDOUT</th>
<th>STDERR</th>
</tr>
</thead>
<tbody>
"""
def finish_html_results(self):
self.html_results += """
</tbody>
</table>
<br>
<br>
<br>
"""
# Functions in this section are/can be overridden by descendants
# This code reads the lf_check_config.ini file to populate the test variables
def read_config_contents(self):
self.logger.info("read_config_contents {}".format(CONFIG_FILE))
config_file = configparser.ConfigParser()
success = True
success = config_file.read(CONFIG_FILE)
self.logger.info("logger worked")
if 'LF_MGR' in config_file.sections():
section = config_file['LF_MGR']
self.lf_mgr_ip = section['LF_MGR_IP']
self.lf_mgr_port = section['LF_MGR_PORT']
self.logger.info("lf_mgr_ip {}".format(self.lf_mgr_ip))
self.logger.info("lf_mgr_port {}".format(self.lf_mgr_port))
if 'TEST_NETWORK' in config_file.sections():
section = config_file['TEST_NETWORK']
self.http_test_ip = section['HTTP_TEST_IP']
self.logger.info("http_test_ip {}".format(self.http_test_ip))
self.ftp_test_ip = section['FTP_TEST_IP']
self.logger.info("ftp_test_ip {}".format(self.ftp_test_ip))
self.test_ip = section['TEST_IP']
self.logger.info("test_ip {}".format(self.test_ip))
if 'TEST_GENERIC' in config_file.sections():
section = config_file['TEST_GENERIC']
self.radio_lf = section['RADIO_USED']
self.logger.info("radio_lf {}".format(self.radio_lf))
self.ssid = section['SSID_USED']
self.logger.info("ssid {}".format(self.ssid))
self.ssid_pw = section['SSID_PW_USED']
self.logger.info("ssid_pw {}".format(self.ssid_pw))
self.security = section['SECURITY_USED']
self.logger.info("secruity {}".format(self.security))
self.num_sta = section['NUM_STA']
self.logger.info("num_sta {}".format(self.num_sta))
self.col_names = section['COL_NAMES']
self.logger.info("col_names {}".format(self.col_names))
self.upstream_port = section['UPSTREAM_PORT']
self.logger.info("upstream_port {}".format(self.upstream_port))
if 'TEST_PARAMETERS' in config_file.sections():
section = config_file['TEST_PARAMETERS']
self.test_timeout = section['TEST_TIMEOUT']
self.use_blank_db = section['LOAD_BLANK_DB']
self.use_factory_default_db = section['LOAD_FACTORY_DEFAULT_DB']
self.use_custom_db = section['LOAD_CUSTOM_DB']
self.custom_db = section['CUSTOM_DB']
self.production_run = section['PRODUCTION_RUN']
self.email_list_production = section['EMAIL_LIST_PRODUCTION']
self.host_ip_production = section['HOST_IP_PRODUCTION']
self.email_list_test = section['EMAIL_LIST_TEST']
self.host_ip_test = section['HOST_IP_TEST']
if 'RADIO_DICTIONARY' in config_file.sections():
section = config_file['RADIO_DICTIONARY']
self.radio_dict = json.loads(section.get('RADIO_DICT', self.radio_dict))
self.logger.info("self.radio_dict {}".format(self.radio_dict))
if 'TEST_DICTIONARY' in config_file.sections():
section = config_file['TEST_DICTIONARY']
# for json replace the \n and \r they are invalid json characters, allows for multiple line args
try:
self.test_dict = json.loads(section.get('TEST_DICT', self.test_dict).replace('\n',' ').replace('\r',' '))
self.logger.info("TEST_DICTIONARY: {}".format(self.test_dict))
except:
self.logger.info("Excpetion loading TEST_DICTIONARY, is there comma after the last entry? Check syntax")
def load_factory_default_db(self):
#self.logger.info("file_wd {}".format(self.scripts_wd))
try:
os.chdir(self.scripts_wd)
#self.logger.info("Current Working Directory {}".format(os.getcwd()))
except:
self.logger.info("failed to change to {}".format(self.scripts_wd))
# no spaces after FACTORY_DFLT
command = "./{} {}".format("scenario.py", "--load FACTORY_DFLT")
process = subprocess.Popen((command).split(' '), shell=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True)
# wait for the process to terminate
out, err = process.communicate()
errcode = process.returncode
# Not currently used
def load_blank_db(self):
#self.logger.info("file_wd {}".format(self.scripts_wd))
try:
os.chdir(self.scripts_wd)
#self.logger.info("Current Working Directory {}".format(os.getcwd()))
except:
self.logger.info("failed to change to {}".format(self.scripts_wd))
# no spaces after FACTORY_DFLT
command = "./{} {}".format("scenario.py", "--load BLANK")
process = subprocess.Popen((command).split(' '), shell=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True)
def load_custom_db(self,custom_db):
#self.logger.info("file_wd {}".format(self.scripts_wd))
try:
os.chdir(self.scripts_wd)
#self.logger.info("Current Working Directory {}".format(os.getcwd()))
except:
self.logger.info("failed to change to {}".format(self.scripts_wd))
# no spaces after FACTORY_DFLT
command = "./{} {}".format("scenario.py", "--load {}".format(custom_db))
process = subprocess.Popen((command).split(' '), shell=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True)
# wait for the process to terminate
out, err = process.communicate()
errcode = process.returncode
def run_script_test(self):
self.start_html_results()
self.start_csv_results()
for test in self.test_dict:
if self.test_dict[test]['enabled'] == "FALSE":
self.logger.info("test: {} skipped".format(test))
# load the default database
elif self.test_dict[test]['enabled'] == "TRUE":
# Make the command replace ment a separate method call.
# loop through radios
for radio in self.radio_dict:
# Replace RADIO, SSID, PASSWD, SECURITY with actual config values (e.g. RADIO_0_CFG to values)
# not "KEY" is just a word to refer to the RADIO define (e.g. RADIO_0_CFG) to get the vlaues
# --num_stations needs to be int not string (no double quotes)
if self.radio_dict[radio]["KEY"] in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace(self.radio_dict[radio]["KEY"],'--radio {} --ssid {} --passwd {} --security {} --num_stations {}'
.format(self.radio_dict[radio]['RADIO'],self.radio_dict[radio]['SSID'],self.radio_dict[radio]['PASSWD'],self.radio_dict[radio]['SECURITY'],self.radio_dict[radio]['STATIONS']))
if 'HTTP_TEST_IP' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('HTTP_TEST_IP',self.http_test_ip)
if 'FTP_TEST_IP' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('FTP_TEST_IP',self.ftp_test_ip)
if 'TEST_IP' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('TEST_IP',self.test_ip)
if 'RADIO_USED' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('RADIO_USED',self.radio_lf)
if 'SSID_USED' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('SSID_USED',self.ssid)
if 'SSID_PW_USED' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('SSID_PW_USED',self.ssid_pw)
if 'SECURITY_USED' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('SECURITY_USED',self.security)
if 'NUM_STA' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('NUM_STA',self.num_sta)
if 'COL_NAMES' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('COL_NAMES',self.col_names)
if 'UPSTREAM_PORT' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('UPSTREAM_PORT',self.col_names)
if self.use_factory_default_db == "TRUE":
self.load_factory_default_db()
sleep(3)
self.logger.info("FACTORY_DFLT loaded between tests with scenario.py --load FACTORY_DFLT")
if self.use_blank_db == "TRUE":
self.load_blank_db()
sleep(1)
self.logger.info("BLANK loaded between tests with scenario.py --load BLANK")
if self.use_custom_db == "TRUE":
try:
self.load_custom_db(self.custom_db)
sleep(1)
self.logger.info("{} loaded between tests with scenario.py --load {}".format(self.custom_db,self.custom_db))
except:
self.logger.info("custom database failed to load check existance and location")
else:
self.logger.info("no db loaded between tests: {}".format(self.use_custom_db))
sleep(1) # the sleep is to allow for the database to stablize
try:
os.chdir(self.scripts_wd)
#self.logger.info("Current Working Directory {}".format(os.getcwd()))
except:
self.logger.info("failed to change to {}".format(self.scripts_wd))
cmd_args = "{}".format(self.test_dict[test]['args'])
command = "./{} {}".format(self.test_dict[test]['command'], cmd_args)
self.logger.info("command: {}".format(command))
self.logger.info("cmd_args {}".format(cmd_args))
if self.outfile is not None:
stdout_log_txt = self.outfile
stdout_log_txt = stdout_log_txt + "-{}-stdout.txt".format(test)
#self.logger.info("stdout_log_txt: {}".format(stdout_log_txt))
stdout_log = open(stdout_log_txt, 'a')
stderr_log_txt = self.outfile
stderr_log_txt = stderr_log_txt + "-{}-stderr.txt".format(test)
#self.logger.info("stderr_log_txt: {}".format(stderr_log_txt))
stderr_log = open(stderr_log_txt, 'a')
print("running {}".format(command))
process = subprocess.Popen((command).split(' '), shell=False, stdout=stdout_log, stderr=stderr_log, universal_newlines=True)
try:
#out, err = process.communicate()
process.wait(timeout=int(self.test_timeout))
except subprocess.TimeoutExpired:
process.terminate()
self.test_result = "TIMEOUT"
#if err:
# self.logger.info("command Test timed out: {}".format(command))
#self.logger.info(stderr_log_txt)
if(self.test_result != "TIMEOUT"):
stderr_log_size = os.path.getsize(stderr_log_txt)
if stderr_log_size > 0 :
self.logger.info("File: {} is not empty: {}".format(stderr_log_txt,str(stderr_log_size)))
self.test_result = "Failure"
background = self.background_red
else:
self.logger.info("File: {} is empty: {}".format(stderr_log_txt,str(stderr_log_size)))
self.test_result = "Success"
background = self.background_green
else:
self.logger.info("TIMEOUT FAILURE, Check LANforge Radios")
self.test_result = "Time Out"
background = self.background_purple
self.html_results += """
<tr><td>""" + str(test) + """</td><td class='scriptdetails'>""" + str(command) + """</td>
<td style="""+ str(background) + """>""" + str(self.test_result) + """
<td><a href=""" + str(stdout_log_txt) + """ target=\"_blank\">STDOUT</a></td>"""
if self.test_result == "Failure":
self.html_results += """<td><a href=""" + str(stderr_log_txt) + """ target=\"_blank\">STDERR</a></td>"""
elif self.test_result == "Time Out":
self.html_results += """<td><a href=""" + str(stderr_log_txt) + """ target=\"_blank\">STDERR</a></td>"""
#self.html_results += """<td></td>"""
else:
self.html_results += """<td></td>"""
self.html_results += """</tr>"""
row = [test,command,self.test_result,stdout_log_txt,stderr_log_txt]
self.csv_results_writer.writerow(row)
self.csv_results_file.flush()
#self.logger.info("row: {}".format(row))
self.logger.info("test: {} executed".format(test))
else:
self.logger.info("enable value {} invalid for test: {}, test skipped".format(self.test_dict[test]['enabled'],test))
self.finish_html_results()
def main():
# arguments
parser = argparse.ArgumentParser(
prog='lf_check.py',
formatter_class=argparse.RawTextHelpFormatter,
epilog='''\
lf_check.py : for running scripts listed in lf_check_config.ini file
''',
description='''\
lf_check.py
-----------
Summary :
---------
for running scripts listed in lf_check_config.ini
''')
parser.add_argument('--outfile', help="--outfile <Output Generic Name> used as base name for all files generated", default="")
parser.add_argument('--logfile', help="--logfile <logfile Name> logging for output of lf_check.py script", default="lf_check.log")
args = parser.parse_args()
# output report.
report = lf_report(_results_dir_name="lf_check",
_output_html="lf_check.html",
_output_pdf="lf-check.pdf")
current_time = time.strftime("%Y-%m-%d-%H-%M-%S", time.localtime())
csv_results = "lf_check{}-{}.csv".format(args.outfile,current_time)
csv_results = report.file_add_path(csv_results)
outfile = "lf_check-{}-{}".format(args.outfile,current_time)
outfile_path = report.file_add_path(outfile)
# lf_check() class created
check = lf_check(_csv_results = csv_results,
_outfile = outfile_path)
# get the git sha
process = subprocess.Popen(["git", "rev-parse", "HEAD"], stdout=subprocess.PIPE)
(commit_hash, err) = process.communicate()
exit_code = process.wait()
git_sha = commit_hash.decode('utf-8','ignore')
# set up logging
logfile = args.logfile[:-4]
print("logfile: {}".format(logfile))
logfile = "{}-{}.log".format(logfile,current_time)
logfile = report.file_add_path(logfile)
print("logfile {}".format(logfile))
formatter = logging.Formatter(FORMAT)
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
file_handler = logging.FileHandler(logfile, "w")
file_handler.setFormatter(formatter)
logger.addHandler(file_handler)
logger.addHandler(logging.StreamHandler(sys.stdout)) # allows to logging to file and stdout
logger.info("commit_hash: {}".format(commit_hash))
logger.info("commit_hash2: {}".format(commit_hash.decode('utf-8','ignore')))
check.read_config_contents() # CMR need mode to just print out the test config and not run
check.run_script_test()
# Generate Ouptput reports
report.set_title("LF Check: lf_check.py")
report.build_banner()
report.start_content_div()
report.set_table_title("LF Check Test Results")
report.build_table_title()
report.set_text("git sha: {}".format(git_sha))
report.build_text()
html_results = check.get_html_results()
report.set_custom_html(html_results)
report.build_custom()
html_report = report.write_html_with_timestamp()
print("html report: {}".format(html_report))
report.write_pdf_with_timestamp()
report_path = os.path.dirname(html_report)
parent_report_dir = os.path.dirname(report_path)
# copy results to lastest so someone may see the latest.
lf_check_latest_html = parent_report_dir + "/lf_check_latest.html"
# duplicates html_report file up one directory
lf_check_html_report = parent_report_dir + "/{}.html".format(outfile)
#
banner_src_png = report_path + "/banner.png"
banner_dest_png = parent_report_dir + "/banner.png"
CandelaLogo_src_png = report_path + "/CandelaLogo2-90dpi-200x90-trans.png"
CandelaLogo_dest_png = parent_report_dir + "/CandelaLogo2-90dpi-200x90-trans.png"
report_src_css = report_path + "/report.css"
report_dest_css = parent_report_dir + "/report.css"
custom_src_css = report_path + "/custom.css"
custom_dest_css = parent_report_dir + "/custom.css"
font_src_woff = report_path + "/CenturyGothic.woff"
font_dest_woff = parent_report_dir + "/CenturyGothic.woff"
#pprint.pprint([
# ('banner_src', banner_src_png),
# ('banner_dest', banner_dest_png),
# ('CandelaLogo_src_png', CandelaLogo_src_png),
# ('CandelaLogo_dest_png', CandelaLogo_dest_png),
# ('report_src_css', report_src_css),
# ('custom_src_css', custom_src_css)
#])
# copy one directory above
shutil.copyfile(html_report, lf_check_latest_html)
shutil.copyfile(html_report, lf_check_html_report)
# copy banner and logo
shutil.copyfile(banner_src_png, banner_dest_png)
shutil.copyfile(CandelaLogo_src_png, CandelaLogo_dest_png)
shutil.copyfile(report_src_css, report_dest_css)
shutil.copyfile(custom_src_css, custom_dest_css)
shutil.copyfile(font_src_woff, font_dest_woff)
print("lf_check_latest.html: "+lf_check_latest_html)
print("lf_check_html_report: "+lf_check_html_report)
check.send_results_email(report_file=lf_check_html_report)
if __name__ == '__main__':
main()

View File

@@ -1,7 +1,7 @@
#!/usr/bin/env python3
# INCLUDE_IN_README
'''
NAME: update_dependencies.py
NAME: update_dependencies.py
PURPOSE: Installs python3 script package dependencies
@@ -18,10 +18,10 @@ def main():
packages_installed = []
packages_failed =[]
for package in packages:
command = "pip3 install {} ".format(package)
command = "pip3 install {} >/tmp/pip3-stdout 2>/tmp/pip3-stderr".format(package)
res = subprocess.call(command, shell = True)
if res == 0:
print("Package {} install SUCCESS Returned Value: {} ".format(package, res))
#print("Package {} install SUCCESS Returned Value: {} ".format(package, res))
packages_installed.append(package)
else:
print("Package {} install FAILED Returned Value: {} ".format(package, res))
@@ -30,6 +30,8 @@ def main():
print("Install Complete")
print("Packages Installed Success: {}\n".format(packages_installed))
if not packages_failed:
return
print("Packages Failed (Some scripts may not need these packages): {}".format(packages_failed))
if __name__ == "__main__":