Files
wlan-lanforge-scripts/py-scripts/test_netgear_dfs.py
2021-01-19 20:25:48 +05:30

448 lines
16 KiB
Python

""" file under progress not ffor testing - AP R7800
"""
import time
import threading
import os
import paramiko
from queue import Queue
from cx_time import IPv4Test
class DFS_TESTING:
    """Helpers for a DFS radar test against an access point (AP R7800).

    The class can:
      * set the AP channel over SSH,
      * create a LANforge station with ``sta_cx.py`` and report whether it
        associated on a DFS channel,
      * run ``lf_hackrf.py`` at the frequency of a given 5 GHz channel,
      * rebuild a station through ``IPv4Test`` and read back its channel.
    """

    # Channels this test treats as DFS channels (kept as strings because the
    # channel is parsed out of text output).
    DFS_CHANNELS = ("52", "56", "60", "64", "100", "104", "108", "112",
                    "116", "120", "124", "128", "132", "136", "140")

    # Directory that holds lf_hackrf.py on the test machine.
    HACKRF_DIR = '/usr/lib64/python2.7/site-packages/'

    # lf_hackrf.py invocation; only --freq changes between channels.
    HACKRF_CMD = ("sudo python lf_hackrf.py --pulse_width 1 --pulse_interval 1428 "
                  "--pulse_count 18 --sweep_time 1000 --freq {freq}")

    # Frequency used by hackrf_status_off().
    HACKRF_IDLE_FREQ = 5220000

    # NOTE(review): hard-coded AP address and credentials. They are kept
    # unchanged so existing behaviour is preserved, but they should be moved
    # out of source control (config file / environment).
    AP_HOST = '192.168.200.190'
    AP_PORT = 22
    AP_USER = 'root'
    AP_PASSWORD = 'Lanforge12345!xzsawq@!'

    def __init__(self):
        """Create the helper; no state is set up front."""

    # The constructor used to be spelled ``_init_`` (single underscores), so
    # Python never treated it as a constructor. The old name is kept as an
    # alias in case anything calls it explicitly.
    _init_ = __init__

    def set_dfs_channel_in_ap(self, channel=52):
        """Set the AP's wlan1 channel over SSH and return the command output.

        :param channel: channel number written to the AP (default 52).
        :return: list of stdout lines produced by ``conf_set`` on the AP.

        The SSH connection is always closed, and the method returns to the
        caller instead of terminating the interpreter with ``exit(0)``.
        """
        ssh = paramiko.SSHClient()
        # NOTE(review): AutoAddPolicy accepts unknown host keys without
        # verification; acceptable only on an isolated test network.
        ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        try:
            ssh.connect(self.AP_HOST, port=self.AP_PORT,
                        username=self.AP_USER, password=self.AP_PASSWORD)
            stdin, stdout, stderr = ssh.exec_command(
                'conf_set system:wlanSettings:wlanSettingTable:wlan1:channel {}'.format(channel))
            output = stdout.readlines()
        finally:
            ssh.close()
        print('\n'.join(output))
        time.sleep(1)
        return output

    @staticmethod
    def _parse_associated_channel(text):
        """Return the channel reported after "channel associated is ", or None.

        If several lines match, the last one wins. ``None`` is returned when
        no line carries a channel.
        """
        marker = "channel associated is "
        channel = None
        for line in text.splitlines():
            if marker in line:
                fields = line.split(marker, 1)[1].split()
                if fields:
                    channel = fields[0]
        return channel

    def create_station_on_GUI(self, y1, y2):
        """Create one station with ``sta_cx.py`` and classify its channel.

        :param y1: initial value for ``self.y1`` (overwritten once the
            station script has run).
        :param y2: initial value for ``self.y2`` (overwritten with the
            channel classification).
        :return: tuple ``(self.y1, self.y2)``.

        Side effects: changes the working directory to the LANforge
        py-scripts directory and writes the script output to ``data.txt``.
        """
        self.y1 = y1
        self.y2 = y2
        cmd = "python3 sta_cx.py --mgr 192.168.200.13 --num_stations 1 --ssid TestAP95 --passwd lanforge --security wpa2 --radio wiphy0"
        print("Current working directory: {0}".format(os.getcwd()))
        os.chdir('/home/lanforge/lanforge-scripts/py-scripts')
        print("Current working directory: {0}".format(os.getcwd()))
        output = os.popen(cmd).read()
        print("station created")
        self.y1 = 'station created'

        # Keep a copy of the raw script output on disk for later inspection.
        with open("data.txt", "w") as f:
            f.write(output)

        channel = self._parse_associated_channel(output)
        print(channel)
        if channel is None:
            # The script did not report a channel; say so instead of failing
            # on an undefined variable.
            print('channel not found in sta_cx.py output')
            self.y2 = 'channel not found'
        elif channel in self.DFS_CHANNELS:
            print('Station is on DFS Channel')
            self.y2 = 'station is on DFS Channel'
        else:
            print('Station is on Non DFS channel')
            self.y2 = 'Station is on Non DFS channel'
        return (self.y1, self.y2)

    # ----------------------------- HACKRF -----------------------------

    @staticmethod
    def _channel_freq(channel):
        """Return the ``--freq`` value for a 5 GHz channel number.

        Uses centre frequency = 5000 MHz + 5 MHz * channel, scaled by 1000,
        e.g. channel 52 -> 5260000.
        # NOTE(review): the unit looks like kHz - confirm against lf_hackrf.py.
        """
        return (5000 + 5 * int(channel)) * 1000

    @classmethod
    def _radar_command(cls, channel):
        """Build the lf_hackrf.py command line for ``channel``."""
        return cls.HACKRF_CMD.format(freq=cls._channel_freq(channel))

    def _run_hackrf(self, cmd):
        """Change into the HackRF script directory and run ``cmd`` in a shell."""
        os.chdir(self.HACKRF_DIR)
        os.system(cmd)

    def _generate_radar(self, channel, attr, status="Radar detected"):
        """Run lf_hackrf.py for ``channel``, store ``status`` on ``self.<attr>``.

        ``status`` is returned unconditionally once the command finishes; it
        is not derived from the command's result.
        """
        self._run_hackrf(self._radar_command(channel))
        setattr(self, attr, status)
        return status

    def generate_radar_at_ch52(self, r):
        """Run lf_hackrf.py at channel 52; ``r`` is unused. Returns the status string."""
        return self._generate_radar(52, "r")

    def generate_radar_at_ch56(self, q):
        """Run lf_hackrf.py at channel 56; ``q`` is unused. Returns the status string."""
        return self._generate_radar(56, "q")

    def generate_radar_at_ch60(self, w):
        """Run lf_hackrf.py at channel 60; ``w`` is unused. Returns the status string."""
        return self._generate_radar(60, "w")

    def generate_radar_at_ch64(self, e):
        """Run lf_hackrf.py at channel 64; ``e`` is unused. Returns the status string."""
        return self._generate_radar(64, "e")

    def generate_radar_at_ch100(self, f):
        """Run lf_hackrf.py at channel 100; ``f`` is unused.

        Returns "Radar received" (not "Radar detected" like its siblings);
        the differing string is kept for backward compatibility.
        """
        return self._generate_radar(100, "f", status="Radar received")

    def generate_radar_at_ch104(self, t):
        """Run lf_hackrf.py at channel 104; ``t`` is unused. Returns the status string."""
        return self._generate_radar(104, "t")

    def generate_radar_at_ch108(self, u):
        """Run lf_hackrf.py at channel 108; ``u`` is unused. Returns the status string."""
        return self._generate_radar(108, "u")

    def generate_radar_at_ch112(self, i):
        """Run lf_hackrf.py at channel 112; ``i`` is unused. Returns the status string."""
        return self._generate_radar(112, "i")

    def generate_radar_at_ch116(self, o):
        """Run lf_hackrf.py at channel 116; ``o`` is unused. Returns the status string.

        The frequency is 5580000; it was previously 5280000, which is the
        value for channel 56.
        """
        return self._generate_radar(116, "o")

    def generate_radar_at_ch120(self, p):
        """Run lf_hackrf.py at channel 120; ``p`` is unused. Returns the status string."""
        return self._generate_radar(120, "p")

    def generate_radar_at_ch124(self, a):
        """Run lf_hackrf.py at channel 124; ``a`` is unused. Returns the status string."""
        return self._generate_radar(124, "a")

    def generate_radar_at_ch128(self, s):
        """Run lf_hackrf.py at channel 128; ``s`` is unused. Returns the status string."""
        return self._generate_radar(128, "s")

    def generate_radar_at_ch132(self, d):
        """Run lf_hackrf.py at channel 132; ``d`` is unused. Returns the status string."""
        return self._generate_radar(132, "d")

    def generate_radar_at_ch136(self, h):
        """Run lf_hackrf.py at channel 136; ``h`` is unused. Returns the status string."""
        return self._generate_radar(136, "h")

    def generate_radar_at_ch140(self, j):
        """Run lf_hackrf.py at channel 140; ``j`` is unused. Returns the status string."""
        return self._generate_radar(140, "j")

    def hackrf_status_off(self):
        """Run lf_hackrf.py once at frequency 5220000 (see HACKRF_IDLE_FREQ)."""
        self._run_hackrf(self.HACKRF_CMD.format(freq=self.HACKRF_IDLE_FREQ))

    def monitor_station_channel(self, m):
        """Rebuild the station through ``IPv4Test`` and return its channel.

        :param m: unused input; ``self.m`` ends up holding the channel.
        :return: value of ``['interface']['channel']`` from the JSON reply for
            port ``/port/1/1/sta0000``.
        """
        self.m = m
        obj = IPv4Test(_host="192.168.200.13",
                       _port=8080,
                       _ssid="TestAP95",
                       _password="lanforge",
                       _security="wpa2",
                       _radio="wiphy0")
        obj.cleanup(obj.sta_list)
        obj.build()
        obj.station_profile.admin_up()
        obj.local_realm.wait_for_ip(obj.sta_list)
        # Fixed wait before querying the channel.
        # NOTE(review): presumably gives the AP time to change channel - confirm.
        time.sleep(30)
        reply = obj.json_get("/port/1/1/sta0000?fields=channel")
        self.m = reply['interface']['channel']
        return self.m

    def aps_radio_off(self):
        """Placeholder; not implemented yet."""
        pass

    def aps_not_switch_automatically(self):
        """Placeholder; not implemented yet."""
        pass

    def check_ap_channel_switching_time(self):
        """Placeholder; not implemented yet."""
        pass
# Channels for which DFS_TESTING provides a generate_radar_at_ch<N> method.
_DFS_CHANNELS = ("52", "56", "60", "64", "100", "104", "108", "112",
                 "116", "120", "124", "128", "132", "136", "140")


def _run_in_thread(target, *args):
    """Run ``target(*args)`` on a worker thread, wait for it, return its result.

    Anything the worker raises is re-raised in the calling thread. Without
    this, a failing worker would put nothing on the queue and the caller
    would block forever in ``Queue.get()``.
    """
    que = Queue()

    def worker():
        try:
            que.put((True, target(*args)))
        except BaseException as exc:  # forwarded to the caller, never swallowed
            que.put((False, exc))

    thread = threading.Thread(target=worker)
    thread.start()
    thread.join()
    succeeded, value = que.get()
    if not succeeded:
        raise value
    return value


def main():
    """Run the DFS test sequence.

    Steps:
      1. Run the HackRF once at its idle frequency.
      2. Create a station and check whether it associated on a DFS channel.
      3. If it did, run the radar command for channel 52.
      4. Read the station's channel again and, if that channel is one of the
         known DFS channels, run the radar command for it.
    """
    dfs = DFS_TESTING()

    # Algorithm and sequence to be followed.
    print("Hackrf is ON")
    print("press s --> enter --> q to stop hackrf")
    dfs.hackrf_status_off()
    print("Now hackrf is OFF")

    # TODO: set channel on ap (netgear)

    result = _run_in_thread(dfs.create_station_on_GUI, "", "")
    print("hi i reached", result)
    list_1 = list(result)
    print("my list", list_1)

    if any("station is on DFS Channel" in s for s in list_1):
        x = _run_in_thread(dfs.generate_radar_at_ch52, "")
        print("result", x)
    else:
        print("radar unreachable")

    y = _run_in_thread(dfs.monitor_station_channel, "")
    print("channel after radar is ", y)

    # Normalise to text so the lookup also works if the channel arrives as
    # an integer rather than a string.
    channel = str(y).strip()
    if channel in _DFS_CHANNELS:
        # Channel 52 has always been reported without its number; keep the
        # exact output.
        if channel == "52":
            print("station is on DFS Channel")
        else:
            print("station is on DFS Channel {}".format(channel))
        _run_in_thread(getattr(dfs, "generate_radar_at_ch" + channel), "")
    else:
        print("station is on NON DFS Channel")
# Run the test sequence only when executed as a script, not on import.
if __name__ == '__main__':
    main()