main dfs code

This commit is contained in:
Nikita Yadav
2021-02-08 15:14:10 +05:30
parent 3bd2ca193c
commit d2c67bf944

612
py-scripts/new_test.py Normal file
View File

@@ -0,0 +1,612 @@
""" This the main file from where dfs test run """
import os
import signal
import paramiko
import time
import threading
import subprocess
from station_layer3 import STATION
import argparse
from threading import Thread
from itertools import islice
import datetime
from datetime import datetime
import pexpect
import sys
from itertools import islice
from matplotlib import pyplot as plt
import numpy as np
# import pdfkit
class DFS_TEST:
def __init__(self, ip, user, pswd, host, ssid, passwd, security, radio):
self.ip = ip
self.user = user
self.pswd = pswd
self.host = host
self.ssid = ssid
self.passwd = passwd
self.security = security
self.radio = radio
def set_channel_in_ap_at_(self, ip, user, pswd, channel):
self.ip = ip
self.user = user
self.pswd = pswd
channel = str(channel)
ssh = paramiko.SSHClient() # creating shh client object we use this object to connect to router
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) # automatically adds the missing host key
ssh.connect(ip, port=22, username=user, password=pswd)
command = "conf_set system:wlanSettings:wlanSettingTable:wlan1:channel " + channel
stdin, stdout, stderr = ssh.exec_command(str(command))
output = stdout.readlines()
# print('\n'.join(output))
time.sleep(10)
def monitor_station_(self, host, ssid, passwd, security, radio, channel):
self.host = host
self.ssid = ssid
self.passwd = passwd
self.security = security
self.radio = radio
obj1 = STATION(lfclient_host=host, lfclient_port=8080, ssid=ssid, paswd=passwd, security=security, radio=radio)
var = obj1.json_get("/port/1/1/sta0000?fields=channel")
var_1 = (var['interface']['channel'])
if var_1 == channel:
print("channel at" + channel)
return var_1
else:
print("wait untill channel assigned")
timeout = time.time() + 60 * 10
while var_1 != channel:
var = obj1.json_get("/port/1/1/sta0000?fields=channel")
var_1 = var['interface']['channel']
time.sleep(1)
if time.time() > timeout:
var = obj1.json_get("/port/1/1/sta0000?fields=channel")
var_1 = var['interface']['channel']
break
return var_1
def create_station_cx(self, host, ssid, passwd, security, radio):
self.host = host
self.ssid = ssid
self.passwd = passwd
self.security = security
self.radio = radio
cmd = "python3 station_layer3.py -hst localhost -s " + self.ssid + " -pwd " + self.passwd + " -sec " + self.security + " -rad " + self.radio
os.chdir('/home/lanforge/lanforge-scripts/py-scripts')
os.system(cmd)
def monitor_untill_channel_assigned(self, host, ssid, passwd, security, radio):
self.host = host
self.ssid = ssid
self.passwd = passwd
self.security = security
self.radio = radio
obj1 = STATION(lfclient_host=host, lfclient_port=8080, ssid=ssid, paswd=passwd, security=security, radio=radio)
var = obj1.json_get("/port/1/1/sta0000?fields=channel")
var_1 = (var['interface']['channel'])
timeout = time.time() + 60 * 15 # 15min
while var_1 == "-1":
var = obj1.json_get("/port/1/1/sta0000?fields=channel")
var_1 = var['interface']['channel']
time.sleep(1)
if time.time() > timeout:
break
return var_1
def generate_radar_at_ch(self, width_, interval_, count_, frequency):
current_time = datetime.now()
print("Current date and time : ")
current_time = current_time.strftime("%b %d %H:%M:%S")
print("time stamp of radar send", current_time)
# print("width", width_)
os.chdir('/usr/lib64/python2.7/site-packages/')
frequency = str(frequency)
width_ = str(width_)
interval_ = str(interval_)
count_ = str(count_)
command_hackRF = "sudo python lf_hackrf.py --pulse_width " + width_ + " --pulse_interval " + interval_ + " --pulse_count " + count_ + " --sweep_time 1000 --freq " + frequency
command_hackRF = str(command_hackRF)
ch = pexpect.spawn(command_hackRF)
time.sleep(5)
ch.expect('>>>')
ch.sendline('s')
ch.expect('>>>')
ch.sendline('q')
time.sleep(1)
return current_time
def client_connect_time(self, host, ssid, passwd, security, radio, channel):
self.host = host
self.ssid = ssid
self.passwd = passwd
self.security = security
self.radio = radio
obj1 = STATION(lfclient_host=host, lfclient_port=8080, ssid=ssid, paswd=passwd, security=security, radio=radio)
var = obj1.json_get("/port/1/1/sta0000?fields=channel")
var_1 = (var['interface']['channel'])
timeout_ = time.time() + 60 * 2 # 2min
while var_1 != "-1" :
var = obj1.json_get("/port/1/1/sta0000?fields=channel")
var_1 = var['interface']['channel']
time.sleep(1)
if time.time() > timeout_:
break
if var_1 == "-1" :
current_time = datetime.now()
print("Current date and time : ")
disconnect_time = current_time.strftime("%b %d %H:%M:%S")
print("station disconnection time", disconnect_time)
timeout = time.time() + 60 * 15 # 15min
while var_1 == "-1":
var = obj1.json_get("/port/1/1/sta0000?fields=channel")
var_1 = var['interface']['channel']
time.sleep(1)
if time.time() > timeout:
break
current_time1 = datetime.now()
print("Current date and time : ")
connect_time = current_time1.strftime("%b %d %H:%M:%S")
print("station connection time", connect_time)
"""var_2 = obj1.json_get("/cx/L3Teststa0000-0")
# print(type(var))
var_1 = (var_2['L3Teststa0000-0']['state'])
timeout = time.time() + 60 * 2 # 15min
while var_1 != "Run":
var = obj1.json_get("/cx/L3Teststa0000-0")
# print(type(var))
var_1 = (var['L3Teststa0000-0']['state'])
time.sleep(1)
if time.time() > timeout:
break
current_time_ = datetime.now()
print("Current date and time : ")
traffic_time1 = current_time_.strftime("%H:%M:%S")
print("time at which traffic started", traffic_time1)"""
s1 = disconnect_time
s2 = connect_time # for example
FMT = '%b %d %H:%M:%S'
c_time = datetime.strptime(s2, FMT) - datetime.strptime(s1, FMT)
print("connection time ", c_time)
# print("detection time for radar", tdelta)
# detection_time_lst.append(tdelta)
lst = str(c_time).split(":")
seconds = int(lst[0]) * 3600 + int(lst[1]) * 60 + int(lst[2])
c_time = seconds
print("connection time ", c_time)
else:
disconnect_time = "00:00:00"
print("station disconnection time", disconnect_time)
connect_time = "00:00:00"
print("time at which traffic started", connect_time)
c_time = 0
print("connection time ", c_time)
return c_time
def channel_after_radar(self, host, ssid, passwd, security, radio):
self.host = host
self.ssid = ssid
self.passwd = passwd
self.security = security
self.radio = radio
obj1 = STATION(lfclient_host=host, lfclient_port=8080, ssid=ssid, paswd=passwd, security=security, radio=radio)
var = obj1.json_get("/port/1/1/sta0000?fields=channel")
var_1 = (var['interface']['channel'])
return var_1
def monitor_cx(self, host, ssid, passwd, security, radio):
self.host = host
self.ssid = ssid
self.passwd = passwd
self.security = security
self.radio = radio
obj1 = STATION(lfclient_host=host, lfclient_port=8080, ssid=ssid, paswd=passwd, security=security, radio=radio)
var = obj1.json_get("/cx/L3Teststa0000-0")
# print(type(var))
var_1 = (var['L3Teststa0000-0']['state'])
timeout = time.time() + 60 * 2 # 15min
while var_1 != "Run":
var = obj1.json_get("/cx/L3Teststa0000-0")
# print(type(var))
var_1 = (var['L3Teststa0000-0']['state'])
time.sleep(1)
if time.time() > timeout:
break
# print(var_1)
return var_1
def monitor_untill_connection_time(self, host, ssid, passwd, security, radio):
self.host = host
self.ssid = ssid
self.passwd = passwd
self.security = security
self.radio = radio
obj1 = STATION(lfclient_host=host, lfclient_port=8080, ssid=ssid, paswd=passwd, security=security, radio=radio)
var = obj1.json_get("/port/1/1/sta0000?fields=cx%20time%20(us)")
var_1 = (var['interface']['cx time (us)'])
return var_1
def check_Radar_time(self, ip, user, pswd):
self.ip = ip
self.user = user
self.pswd = pswd
ssh = paramiko.SSHClient() # creating shh client object we use this object to connect to router
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) # automatically adds the missing host key
ssh.connect(ip, port=22, username=user, password=pswd)
stdin, stdout, stderr = ssh.exec_command('cat /tmp/log/messages | grep Radar')
output = stdout.readlines()
# print('\n'.join(output))
time.sleep(1)
return output
def monitor_channel_available_time(self, ip, user, pswd):
self.ip = ip
self.user = user
self.pswd = pswd
ssh = paramiko.SSHClient() # creating shh client object we use this object to connect to router
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) # automatically adds the missing host key
ssh.connect(ip, port=22, username=user, password=pswd)
stdin, stdout, stderr = ssh.exec_command('iwlist channel ')
output = stdout.readlines()
# print('\n'.join(output))
time.sleep(1)
return output
def run():
parser = argparse.ArgumentParser(description="Netgear AP DFS Test Script")
parser.add_argument('-i', '--ip', type=str, help='AP ip')
parser.add_argument('-u', '--user', type=str, help='credentials login/username')
parser.add_argument('-p', '--pswd', type=str, help='credential password')
parser.add_argument('-hst', '--host', type=str, help='host name')
parser.add_argument('-s', '--ssid', type=str, help='ssid for client')
parser.add_argument('-pwd', '--passwd', type=str, help='password to connect to ssid')
parser.add_argument('-sec', '--security', type=str, help='security')
parser.add_argument('-rad', '--radio', type=str, help='radio at which client will be connected')
parser.add_argument('-n', '--name', type=str, help='Type Name of AP on which test is performed')
#add(connect_client- true or false)
#add(run-traffic - t/false)
#check_only radar
#
args = parser.parse_args()
if (args.name is not None):
AP_name = args.name
if (args.ssid is not None):
ssid = args.ssid
time.sleep(1800)
dfs = DFS_TEST(args.ip, args.user, args.pswd, args.host, args.ssid, args.passwd, args.security, args.radio)
# To create a stations
dfs.create_station_cx(args.host, args.ssid, args.passwd, args.security, args.radio)
fcc_types = ["FCCO", "FCC1", "FCC2", "FCC3", "FCC4", "FCC5", "ETSI1", "ETSI2", "ETSI3", "ETSI4", "ETSI5", "ETSI6"]
# , "FCC1", "FCC2", "FCC3", "FCC4", "FCC5", "ETSI1", "ETSI2", "ETSI3", "ETSI4", "ETSI5", "ETSI6"
result_data = dict.fromkeys(fcc_types)
# set channel
width_ = ""
interval_ = ""
count_ = ""
print(result_data)
for fcc in fcc_types:
# 1, 2, 3, 4, 5, 11, 12, 13, 14, 15, 16
# "FCCO" , "FCC1" , "FCC2" , "FCC3", "FCC4", "FCC5", "ETSI1", "ETSI2", "ETSI3", "ETSI4", "ETSI5", "ETSI6"
if fcc == "FCCO":
width = "1"
interval = "1428"
count = "18"
width_ = width
print("width", width_)
interval_ = interval
count_ = count
elif fcc == "FCC1":
width = "1"
interval = "1163"
count = "18"
width_ = width
interval_ = interval
count_ = count
elif fcc == "FCC2":
width = "2"
interval = "208"
count = "28"
width_ = width
interval_ = interval
count_ = count
elif fcc == "FCC3":
width = "7"
interval = "365"
count = "16"
width_ = width
interval_ = interval
count_ = count
elif fcc == "FCC4":
width = "16"
interval = "271"
count = "12"
width_ = width
interval_ = interval
count_ = count
elif fcc == "FCC5":
width = "70"
interval = "1975"
count = "3"
width_ = width
interval_ = interval
count_ = count
elif fcc == "ETSI1":
width = "5"
interval = "342"
count = "10"
width_ = width
interval_ = interval
count_ = count
elif fcc == "ETSI2":
width = "2"
interval = "1271"
count = "15"
width_ = width
interval_ = interval
count_ = count
elif fcc == "ETSI3":
width = "15"
interval = "3280"
count = "25"
width_ = width
interval_ = interval
count_ = count
elif fcc == "ETSI4":
width = "24"
interval = "2477"
count = "20"
width_ = width
interval_ = interval
count_ = count
elif fcc == "ETSI5":
width = "1"
interval = "356"
count = "10"
width_ = width
interval_ = interval
count_ = count
elif fcc == "ETSI6":
width = "2"
interval = "1091"
count = "15"
width_ = width
interval_ = interval
count_ = count
switched_ch_lst = []
detection_time_lst = []
connection_time_lst = []
radar_lst = []
channel_values = [52, 56, 60, 64, 100,104, 108, 112, 116, 120, 124 ,128, 132, 136, 140]
result_data[fcc] = dict.fromkeys(["switched_ch_lst", "detection_time_lst", "radar_lst", "connection_time_lst"])
result_data[fcc]["switched_ch_lst"] = []
result_data[fcc]["detection_time_lst"] = []
result_data[fcc]["radar_lst"] = []
result_data[fcc]["connection_time_lst"] = []
print("under", result_data)
for channel_s in channel_values:
"""#, 56, 60, 64, 100,104, 108, 112, 116, 120, 124 ,128, 132, 136, 140]:"""
print("checking for FCC0")
print("set channel to ", channel_s)
dfs.set_channel_in_ap_at_(args.ip, args.user, args.pswd, channel_s)
print("channel set checking....")
monitoring_station = dfs.monitor_station_(args.host, args.ssid, args.passwd, args.security, args.radio,
channel=str(channel_s))
monitoring_station = str(monitoring_station)
print(("channel set to ", monitoring_station))
channel_s = str(channel_s)
# compare if ch= 52 == set 52
if monitoring_station == channel_s:
print("station allocated to " + monitoring_station)
print("now generate radar on " + channel_s)
if channel_s == "52":
frequency = "5260000"
elif channel_s == "56":
frequency = "5280000"
elif channel_s == "60":
frequency = "5300000"
elif channel_s == "64":
frequency = "5320000"
elif channel_s == "100":
frequency = "5500000"
elif channel_s == "104":
frequency = "5520000"
elif channel_s == "108":
frequency = "5540000"
elif channel_s == "112":
frequency = "5560000"
elif channel_s == "116":
frequency = "5580000"
elif channel_s == "120":
frequency = "5600000"
elif channel_s == "124":
frequency = "5620000"
elif channel_s == "128":
frequency = "5640000"
elif channel_s == "132":
frequency = "5660000"
elif channel_s == "136":
frequency = "5680000"
elif channel_s == "140":
frequency = "5700000"
else:
print("Invalid")
continue
# now radar generate
generating_radar = dfs.generate_radar_at_ch(width_, interval_, count_, frequency)
client_connection_time = dfs.client_connect_time(args.host, args.ssid, args.passwd, args.security,
args.radio, channel_s)
print("client connection time ", client_connection_time)
connection_time_lst.append(client_connection_time)
time.sleep(2)
ap_logs = dfs.check_Radar_time(args.ip, args.user, args.pswd)
# print(type(ap_logs))
N = 2
res = list(islice(reversed(ap_logs), 0, N))
res.reverse()
if (str(res).__contains__("Radar detected on channel " + channel_s)):
data = "YES"
radar_lst.append(data)
print("yes")
if data == "YES":
print("checking channel assigned...")
detecting_radar_channel = dfs.channel_after_radar(args.host, args.ssid, args.passwd,
args.security,
args.radio)
# print("after radar channel is at ", detecting_radar_channel)
if detecting_radar_channel == "-1" or detecting_radar_channel == "AUTO" or detecting_radar_channel == channel_s:
print("TEST Fail")
print("client is at AUTO Channel")
pass_fail = "FAIL"
ch_af_radar = "AUTO"
print("after radar channel is at ", ch_af_radar)
switched_ch_lst.append(ch_af_radar)
print(pass_fail)
continue
else:
print("Test Pass")
pass_fail = "PASS"
ch_af_radar = detecting_radar_channel
print("after radar channel is at ", ch_af_radar)
switched_ch_lst.append(ch_af_radar)
ap_logs_check = dfs.check_Radar_time(args.ip, args.user, args.pswd)
print(type(ap_logs_check))
N = 2
res = list(islice(reversed(ap_logs_check), 0, N))
res.reverse()
for i in res:
if (str(res).__contains__("Radar detected on channel " + channel_s)):
print("YES")
x = res.index(i)
print("raadar time at index", x)
r_time = " "
for i in res[x][4:19]:
r_time = r_time + i
r_time = r_time.strip()
print("radar detected time", r_time)
# calculation
s1 = generating_radar
s2 = r_time # for example
FMT = '%b %d %H:%M:%S'
tdelta = datetime.strptime(s2, FMT) - datetime.strptime(s1, FMT)
# print("detection time for radar", tdelta)
# detection_time_lst.append(tdelta)
lst = str(tdelta).split(":")
seconds = int(lst[0]) * 3600 + int(lst[1]) * 60 + int(lst[2])
d_time = seconds
print("detection time ", d_time)
detection_time_lst.append(d_time)
else:
data = "NO"
pass_fail = "FAIL"
radar_lst.append(data)
d_time = "0"
print("detection time ", d_time)
detection_time_lst.append(0)
switched_ch_lst.append(monitoring_station)
print("no")
# radar detected(yes/no)
else:
monitoring_station = str(monitoring_station)
radar_lst.append("NO")
switched_ch_lst.append(monitoring_station)
detection_time_lst.append(0)
connection_time_lst.append(0)
print(("channel set to ", monitoring_station))
print("switched channel list ", switched_ch_lst)
print("radar detection list", radar_lst)
print("detection time list", detection_time_lst)
print("connection time list", connection_time_lst)
result_data[fcc]["switched_ch_lst"] = switched_ch_lst
result_data[fcc]["radar_lst"] = radar_lst
result_data[fcc]["detection_time_lst"] = detection_time_lst
result_data[fcc]["connection_time_lst"] = connection_time_lst
print("result ", result_data)
time.sleep(1800)
print("final result", result_data)
# # Try changing values for the same
now = datetime.now()
generate_report(result_data,
now,
args.ap_name,
args.ssid,
num_client=1,
graph_path="/home/lanforge/html-reports/dfs")
# pip install pdfkit
# sudo apt-get install wkhtmltopdf
# pdfkit.from_file('/home/lanforge/lanforge-scripts/py-scripts/demo.html', '/home/lanforge/lanforge-scripts/py-scripts/out.pdf')
print("Test Finished")
if __name__ == '__main__':
run()