removing dfs script from public repo

This commit is contained in:
Nikita Yadav
2021-02-18 01:45:54 +05:30
parent 9e3d8d958c
commit d1c67870f6

View File

@@ -1,695 +0,0 @@
""" This script is used for DFS Testing on various radar types and user dependent channels after completion of the test it gives a nice pdf test report under folder html_reports/dfs/
it uses various input arguments which can be seen by ./x.py --help
eg - sudo python3 dfs_11.py -i 192.168.200.141 -u root -p Netgear@123xzsawq@! -hst localhost -s TestAP22 -pwd [BLANK] -sec open -rad wiphy0 --name WAC505 --fcctypes FCCO --channel 60
date - 11- feb - 2021
--Nikita Yadav
"""
import os
import signal
import paramiko
import time
import threading
import subprocess
from station_layer3 import STATION
import argparse
from threading import Thread
from itertools import islice
import datetime
from datetime import datetime
import pexpect
import sys
from itertools import islice
from matplotlib import pyplot as plt
import numpy as np
from html_template import *
import logging
# import pdfkit
class DFS_TEST:
def __init__(self, ip, user, pswd, host, ssid, passwd, security, radio):
self.ip = ip
self.user = user
self.pswd = pswd
self.host = host
self.ssid = ssid
self.passwd = passwd
self.security = security
self.radio = radio
def set_channel_in_ap_at_(self, ip, user, pswd, channel):
self.ip = ip
self.user = user
self.pswd = pswd
channel = str(channel)
ssh = paramiko.SSHClient() # creating shh client object we use this object to connect to router
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) # automatically adds the missing host key
ssh.connect(ip, port=22, username=user, password=pswd)
command = "conf_set system:wlanSettings:wlanSettingTable:wlan1:channel " + channel
stdin, stdout, stderr = ssh.exec_command(str(command))
output = stdout.readlines()
# print('\n'.join(output))
time.sleep(10)
def monitor_station_(self, host, ssid, passwd, security, radio, channel):
self.host = host
self.ssid = ssid
self.passwd = passwd
self.security = security
self.radio = radio
obj1 = STATION(lfclient_host=host, lfclient_port=8080, ssid=ssid, paswd=passwd, security=security, radio=radio)
print(channel)
var = obj1.json_get("/port/1/1/sta0000?fields=channel")
var_1 = (var['interface']['channel'])
# print(var_1)
if var_1 == channel:
print("channel at" + channel)
return var_1
else:
print("wait untill channel assigned")
timeout = time.time() + 60 * 15
while var_1 != channel:
var = obj1.json_get("/port/1/1/sta0000?fields=channel")
var_1 = var['interface']['channel']
time.sleep(1)
if time.time() > timeout:
var = obj1.json_get("/port/1/1/sta0000?fields=channel")
var_1 = var['interface']['channel']
break
return var_1
def create_station_cx(self, host, ssid, passwd, security, radio):
self.host = host
self.ssid = ssid
self.passwd = passwd
self.security = security
self.radio = radio
cmd = "python3 station_layer3.py -hst localhost -s " + self.ssid + " -pwd " + self.passwd + " -sec " + self.security + " -rad " + self.radio
os.chdir('/home/lanforge/lanforge-scripts/py-scripts')
os.system(cmd)
def monitor_untill_channel_assigned(self, host, ssid, passwd, security, radio):
self.host = host
self.ssid = ssid
self.passwd = passwd
self.security = security
self.radio = radio
obj1 = STATION(lfclient_host=host, lfclient_port=8080, ssid=ssid, paswd=passwd, security=security, radio=radio)
var = obj1.json_get("/port/1/1/sta0000?fields=channel")
var_1 = (var['interface']['channel'])
timeout = time.time() + 60 * 15 # 15min
while var_1 == "-1":
var = obj1.json_get("/port/1/1/sta0000?fields=channel")
var_1 = var['interface']['channel']
time.sleep(1)
if time.time() > timeout:
break
return var_1
def generate_radar_at_ch(self, width_, interval_, count_, frequency):
current_time = datetime.now()
print("Current date and time : ")
current_time = current_time.strftime("%b %d %H:%M:%S")
print("time stamp of radar send", current_time)
# print("width", width_)
os.chdir('/usr/lib64/python2.7/site-packages/')
frequency = str(frequency)
width_ = str(width_)
interval_ = str(interval_)
count_ = str(count_)
command_hackRF = "sudo python lf_hackrf.py --pulse_width " + width_ + " --pulse_interval " + interval_ + " --pulse_count " + count_ + " --sweep_time 1000 --freq " + frequency
command_hackRF = str(command_hackRF)
ch = pexpect.spawn(command_hackRF)
time.sleep(5)
ch.expect('>>>')
ch.sendline('s')
ch.expect('>>>')
ch.sendline('q')
time.sleep(1)
return current_time
def client_connect_time(self, host, ssid, passwd, security, radio, channel):
self.host = host
self.ssid = ssid
self.passwd = passwd
self.security = security
self.radio = radio
obj1 = STATION(lfclient_host=host, lfclient_port=8080, ssid=ssid, paswd=passwd, security=security, radio=radio)
var = obj1.json_get("/port/1/1/sta0000?fields=channel")
var_1 = (var['interface']['channel'])
timeout_ = time.time() + 60 * 2 # 2min
while var_1 != "-1" :
var = obj1.json_get("/port/1/1/sta0000?fields=channel")
var_1 = var['interface']['channel']
time.sleep(1)
if time.time() > timeout_:
break
if var_1 == "-1" :
current_time = datetime.now()
print("Current date and time : ")
disconnect_time = current_time.strftime("%b %d %H:%M:%S")
print("station disconnection time", disconnect_time)
timeout = time.time() + 60 * 15 # 15min
while var_1 == "-1":
var = obj1.json_get("/port/1/1/sta0000?fields=channel")
var_1 = var['interface']['channel']
time.sleep(1)
if time.time() > timeout:
break
current_time1 = datetime.now()
print("Current date and time : ")
connect_time = current_time1.strftime("%b %d %H:%M:%S")
print("station connection time", connect_time)
"""var_2 = obj1.json_get("/cx/L3Teststa0000-0")
# print(type(var))
var_1 = (var_2['L3Teststa0000-0']['state'])
timeout = time.time() + 60 * 2 # 15min
while var_1 != "Run":
var = obj1.json_get("/cx/L3Teststa0000-0")
# print(type(var))
var_1 = (var['L3Teststa0000-0']['state'])
time.sleep(1)
if time.time() > timeout:
break
current_time_ = datetime.now()
print("Current date and time : ")
traffic_time1 = current_time_.strftime("%H:%M:%S")
print("time at which traffic started", traffic_time1)"""
s1 = disconnect_time
s2 = connect_time # for example
FMT = '%b %d %H:%M:%S'
c_time = datetime.strptime(s2, FMT) - datetime.strptime(s1, FMT)
print("connection time ", c_time)
# print("detection time for radar", tdelta)
# detection_time_lst.append(tdelta)
lst = str(c_time).split(":")
seconds = int(lst[0]) * 3600 + int(lst[1]) * 60 + int(lst[2])
c_time = seconds
print("connection time ", c_time)
else:
disconnect_time = "00:00:00"
print("station disconnection time", disconnect_time)
connect_time = "00:00:00"
print("time at which traffic started", connect_time)
c_time = 0
print("connection time ", c_time)
return c_time
def channel_after_radar(self, host, ssid, passwd, security, radio):
self.host = host
self.ssid = ssid
self.passwd = passwd
self.security = security
self.radio = radio
obj1 = STATION(lfclient_host=host, lfclient_port=8080, ssid=ssid, paswd=passwd, security=security, radio=radio)
var = obj1.json_get("/port/1/1/sta0000?fields=channel")
var_1 = (var['interface']['channel'])
return var_1
def monitor_cx(self, host, ssid, passwd, security, radio):
self.host = host
self.ssid = ssid
self.passwd = passwd
self.security = security
self.radio = radio
obj1 = STATION(lfclient_host=host, lfclient_port=8080, ssid=ssid, paswd=passwd, security=security, radio=radio)
var = obj1.json_get("/cx/L3Teststa0000-0")
# print(type(var))
var_1 = (var['L3Teststa0000-0']['state'])
timeout = time.time() + 60 * 2 # 15min
while var_1 != "Run":
var = obj1.json_get("/cx/L3Teststa0000-0")
# print(type(var))
var_1 = (var['L3Teststa0000-0']['state'])
time.sleep(1)
if time.time() > timeout:
break
# print(var_1)
return var_1
def monitor_untill_connection_time(self, host, ssid, passwd, security, radio):
self.host = host
self.ssid = ssid
self.passwd = passwd
self.security = security
self.radio = radio
obj1 = STATION(lfclient_host=host, lfclient_port=8080, ssid=ssid, paswd=passwd, security=security, radio=radio)
var = obj1.json_get("/port/1/1/sta0000?fields=cx%20time%20(us)")
var_1 = (var['interface']['cx time (us)'])
return var_1
def check_Radar_time(self, ip, user, pswd):
self.ip = ip
self.user = user
self.pswd = pswd
ssh = paramiko.SSHClient() # creating shh client object we use this object to connect to router
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) # automatically adds the missing host key
ssh.connect(ip, port=22, username=user, password=pswd)
stdin, stdout, stderr = ssh.exec_command('cat /tmp/log/messages | grep Radar')
output = stdout.readlines()
# print('\n'.join(output))
time.sleep(1)
return output
def monitor_channel_available_time(self, ip, user, pswd):
self.ip = ip
self.user = user
self.pswd = pswd
ssh = paramiko.SSHClient() # creating shh client object we use this object to connect to router
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) # automatically adds the missing host key
ssh.connect(ip, port=22, username=user, password=pswd)
stdin, stdout, stderr = ssh.exec_command('iwlist channel ')
output = stdout.readlines()
# print('\n'.join(output))
time.sleep(1)
return output
def run():
parser = argparse.ArgumentParser(description="Netgear AP DFS Test Script")
parser.add_argument('-i', '--ip', type=str, help='AP ip')
parser.add_argument('-u', '--user', type=str, help='credentials login/username')
parser.add_argument('-p', '--pswd', type=str, help='credential password')
parser.add_argument('-hst', '--host', type=str, help='host name')
parser.add_argument('-s', '--ssid', type=str, help='ssid for client')
parser.add_argument('-pwd', '--passwd', type=str, help='password to connect to ssid')
parser.add_argument('-sec', '--security', type=str, help='security')
parser.add_argument('-rad', '--radio', type=str, help='radio at which client will be connected')
parser.add_argument('-n', '--name', type=str, help='Type Name of AP on which test is performed')
parser.add_argument( '--fcctypes', nargs="+", default= ["FCC0", "FCC1", "FCC2", "FCC3", "FCC4", "FCC5", "ETSI1", "ETSI2", "ETSI3", "ETSI4", "ETSI5", "ETSI6"], help='types needed to be tested {FCC0/FCC1/FCC2/FCC3/FCC4/FCC5/ETSI1/ETSI2/ETSI3/ETSI4/ETSI5/ETSI6}')
parser.add_argument('--channel', '--channel', nargs="+", default=[52, 56, 60, 64, 100, 104, 108, 112, 116, 120, 124, 128, 132, 136, 140], help='channel options need to be tested {52, 56, 60, 64, 100, 104, 108, 112, 116, 120, 124 ,128, 132, 136, 140}')
parser.add_argument('--radar', type=str, default=True, help="To check only for radar types")
parser.add_argument('--cx', type=str, default=True, help="Test run and check only till client connection time")
parser.add_argument('--traffic', type=str, default=True, help="Test run till traffic and check the traffic time")
'''#add(connect_client- true or false)
#add(run-traffic - t/false)
#check_only radar'''
logging.basicConfig(filename= '/home/lanforge/html-reports/dfs/dfs.log', filemode='w', format='%(asctime)s - %(message)s', datefmt='%d-%b-%y %H:%M:%S')
logging.warning('Test Started')
test_time = datetime.now()
test_time = test_time.strftime("%b %d %H:%M:%S")
print("Test started at ", test_time )
cmd = "Test started at " + str(test_time)
logging.warning(str(cmd))
args = parser.parse_args()
if (args.name is not None):
AP_name = args.name
if (args.ssid is not None):
ssid = args.ssid
#time.sleep(1800)
dfs = DFS_TEST(args.ip, args.user, args.pswd, args.host, args.ssid, args.passwd, args.security, args.radio)
# To create a stations
logging.warning("creating station and generating traffic")
dfs.create_station_cx(args.host, args.ssid, args.passwd, args.security, args.radio)
# "FCC0", "FCC1", "FCC2", "FCC3", "FCC4", "FCC5", "ETSI1", "ETSI2", "ETSI3", "ETSI4", "ETSI5", "ETSI6"
fcc_types = args.fcctypes
result_data = dict.fromkeys(fcc_types)
# set channel
width_ = ""
interval_ = ""
count_ = ""
"""types = {"FCC0": {"width_": "1", "interval_": "1428", "count_": "18"},
"FCC1": {"width_": "1", "interval_": "1163", "count_": "18"},
"FCC2": {"width_": "2", "interval_": "208", "count_": "28"},
"FCC3": {"width_": "7", "interval_": "365", "count_": "16"},
"FCC4": {"width_": "16", "interval_": "271", "count_": "12"},
"FCC5": {"width_": "70", "interval_": "1975", "count_": "3"},
"ETSI1": {"width_": "5", "interval_": "342", "count_": "10"},
"ETSI2": {"width_": "2", "interval_": "1271", "count_": "15"},
"ETSI3": {"width_": "15", "interval_": "3280", "count_": "25"},
"ETSI4": {"width_": "24", "interval_": "2477", "count_": "20"},
"ETSI5": {"width_": "1", "interval_": "356", "count_": "10"},
"ETSI6": {"width_": "2", "interval_": "1091", "count_": "15"}, }"""
for fcc in fcc_types:
if fcc == "FCCO":
width = "1"
interval = "1428"
count = "18"
width_ = width
print("width", width_)
interval_ = interval
count_ = count
elif fcc == "FCC1":
width = "1"
interval = "1163"
count = "18"
width_ = width
interval_ = interval
count_ = count
elif fcc == "FCC2":
width = "2"
interval = "208"
count = "28"
width_ = width
interval_ = interval
count_ = count
elif fcc == "FCC3":
width = "7"
interval = "365"
count = "16"
width_ = width
interval_ = interval
count_ = count
elif fcc == "FCC4":
width = "16"
interval = "271"
count = "12"
width_ = width
interval_ = interval
count_ = count
elif fcc == "FCC5":
width = "70"
interval = "1975"
count = "3"
width_ = width
interval_ = interval
count_ = count
elif fcc == "ETSI1":
width = "5"
interval = "342"
count = "10"
width_ = width
interval_ = interval
count_ = count
elif fcc == "ETSI2":
width = "2"
interval = "1271"
count = "15"
width_ = width
interval_ = interval
count_ = count
elif fcc == "ETSI3":
width = "15"
interval = "3280"
count = "25"
width_ = width
interval_ = interval
count_ = count
elif fcc == "ETSI4":
width = "24"
interval = "2477"
count = "20"
width_ = width
interval_ = interval
count_ = count
elif fcc == "ETSI5":
width = "1"
interval = "356"
count = "10"
width_ = width
interval_ = interval
count_ = count
elif fcc == "ETSI6":
width = "2"
interval = "1091"
count = "15"
width_ = width
interval_ = interval
count_ = count
switched_ch_lst = []
detection_time_lst = []
connection_time_lst = []
radar_lst = []
channel_list = ["52", "56", "60", "64", "100", "104", "108", "112", "116", "120", "124" ,"128", "132", "136", "140"]
channel_values = args.channel
#print(channel_values)
result_data[fcc] = dict.fromkeys(["switched_ch_lst", "detection_time_lst", "radar_lst", "connection_time_lst"])
result_data[fcc]["switched_ch_lst"] = []
result_data[fcc]["detection_time_lst"] = []
result_data[fcc]["radar_lst"] = []
result_data[fcc]["connection_time_lst"] = []
#print("under", result_data)
for channel_s in channel_values:
print("set channel to ", channel_s)
x = "set channel to " + str(channel_s)
logging.warning(str(x))
dfs.set_channel_in_ap_at_(args.ip, args.user, args.pswd, channel_s)
print("channel set checking....")
logging.warning('channel set checking....')
monitoring_station = dfs.monitor_station_(args.host, args.ssid, args.passwd, args.security, args.radio,
channel=str(channel_s))
monitoring_station = str(monitoring_station)
print(("channel set to ", monitoring_station))
y = "channel set to " + str(monitoring_station)
logging.warning(str(y))
channel_s = str(channel_s)
if monitoring_station == channel_s:
print("station allocated to " + monitoring_station)
z = "station allocated to " + str(monitoring_station)
logging.warning(str(z))
print("now generate radar on " + channel_s)
q = "now generate radar on " + str(channel_s)
logging.warning(str(q))
frequency = {"52": "5260000", "56": "5280000", "60": "5300000", "64": "5320000", "100": "5500000",
"104": "5520000", "108": "5540000", "112": "5560000", "116": "5580000", "120": "5600000",
"124": "5620000",
"128": "5640000", "132": "5660000", "136": "5680000", "140": "5700000"}
# now radar generate
generating_radar = dfs.generate_radar_at_ch(width_, interval_, count_, frequency[channel_s])
client_connection_time = dfs.client_connect_time(args.host, args.ssid, args.passwd, args.security,
args.radio, channel_s)
print("client connection time ", client_connection_time)
w = "client connection time " + str(client_connection_time)
logging.warning(str(w))
connection_time_lst.append(client_connection_time)
time.sleep(2)
logging.warning('checking AP logs')
ap_logs = dfs.check_Radar_time(args.ip, args.user, args.pswd)
# print(type(ap_logs))
N = 2
res = list(islice(reversed(ap_logs), 0, N))
res.reverse()
if (str(res).__contains__("Radar detected on channel " + channel_s)):
data = "YES"
radar_lst.append(data)
print("yes")
logging.warning("YES")
if data == "YES":
print("checking channel assigned...")
detecting_radar_channel = dfs.channel_after_radar(args.host, args.ssid, args.passwd,
args.security,
args.radio)
# print("after radar channel is at ", detecting_radar_channel)
if detecting_radar_channel == "-1" or detecting_radar_channel == "AUTO" or detecting_radar_channel == channel_s:
print("TEST Fail")
logging.warning("Test FAIL")
print("client is at AUTO Channel")
logging.warning("client is at AUTO Channel")
pass_fail = "FAIL"
ch_af_radar = "AUTO"
print("after radar channel is at ", ch_af_radar)
x = "after radar channel is at " + str(ch_af_radar)
logging.warning(str(x))
switched_ch_lst.append(ch_af_radar)
print(pass_fail)
continue
else:
print("Test Pass")
logging.warning("TEST PASS ")
pass_fail = "PASS"
ch_af_radar = detecting_radar_channel
print("after radar channel is at ", ch_af_radar)
Y = "after radar channel is at " + str(ch_af_radar)
logging.warning(str(Y))
switched_ch_lst.append(ch_af_radar)
logging.warning("Checking AP logs")
ap_logs_check = dfs.check_Radar_time(args.ip, args.user, args.pswd)
#print(type(ap_logs_check))
N = 2
res = list(islice(reversed(ap_logs_check), 0, N))
res.reverse()
for i in res:
if (str(res).__contains__("Radar detected on channel " + channel_s)):
print("YES")
x = res.index(i)
print("raadar time at index", x)
r = "raadar time at index" + str(x)
logging.warning(str(r))
r_time = " "
for i in res[x][4:19]:
r_time = r_time + i
r_time = r_time.strip()
print("radar detected time", r_time)
r_ = "radar detected time" + str(r_time)
logging.warning(str(r_))
# calculation
s1 = generating_radar
s2 = r_time # for example
FMT = '%b %d %H:%M:%S'
tdelta = datetime.strptime(s2, FMT) - datetime.strptime(s1, FMT)
# print("detection time for radar", tdelta)
# detection_time_lst.append(tdelta)
lst = str(tdelta).split(":")
seconds = int(lst[0]) * 3600 + int(lst[1]) * 60 + int(lst[2])
d_time = seconds
print("detection time ", d_time)
detection_time_lst.append(d_time)
t = "detection time " + str(d_time)
logging.warning(str(t))
else:
data = "NO"
pass_fail = "FAIL"
radar_lst.append(data)
d_time = "0"
print("detection time ", d_time)
x = "detection time " + str(d_time)
logging.warning(str(x))
detection_time_lst.append(0)
switched_ch_lst.append(" - ")
print("no")
# radar detected(yes/no)
else:
#monitoring_station = str(monitoring_station)
radar_lst.append("NO")
switched_ch_lst.append("X")
detection_time_lst.append(0)
connection_time_lst.append(0)
#print(("channel set to ", monitoring_station))
print("switched channel list ", switched_ch_lst)
s = "switched channel list " + str(switched_ch_lst)
logging.warning(str(x))
print("radar detection list", radar_lst)
r = "radar detection list" + str(radar_lst)
logging.warning(str(r))
print("detection time list", detection_time_lst)
d = "detection time list" + str(detection_time_lst)
logging.warning(str(d))
print("connection time list", connection_time_lst)
c = "connection time list" + str(connection_time_lst)
logging.warning(str(c))
result_data[fcc]["switched_ch_lst"] = switched_ch_lst
result_data[fcc]["radar_lst"] = radar_lst
result_data[fcc]["detection_time_lst"] = detection_time_lst
result_data[fcc]["connection_time_lst"] = connection_time_lst
print("result ", result_data)
res = "result " + str(result_data)
logging.warning(str(res))
#time.sleep(1800)
print("final result", result_data)
res = "final result " + str(result_data)
logging.warning(str(res))
print("Test Finished")
test_end = datetime.now()
test_end = test_end.strftime("%b %d %H:%M:%S")
print("Test ended at ", test_end)
s1 = test_time
s2 = test_end # for example
FMT = '%b %d %H:%M:%S'
test_duration = datetime.strptime(s2, FMT) - datetime.strptime(s1, FMT)
print("total test duration ", test_duration)
# # Try changing values for the same
date = str(datetime.now()).split(",")[0].replace(" ", "-").split(".")[0]
test_setup_info = {
"AP Name": args.name,
"SSID": args.ssid,
"Number of Stations": "1",
"Test Duration": test_duration
}
input_setup_info = {
"IP" : args.ip,
"user" : args.user,
"Radartypes" : args.fcctypes,
"Channel" : args.channel,
"Contact" : "support@candelatech.com"
}
generate_report(result_data,
date,
test_setup_info,
input_setup_info,
test_channel = channel_values,
graph_path="/home/lanforge/html-reports/dfs")
graph_path = "/home/lanforge/html-reports/dfs"
if __name__ == '__main__':
run()