""" file under progress not ffor testing - AP R7800 """ import time import threading import os import paramiko from queue import Queue from cx_time import IPv4Test class DFS_TESTING: def _init_(self): pass def set_dfs_channel_in_ap(self): ssh = paramiko.SSHClient() # creating shh client object we use this object to connect to router ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) # automatically adds the missing host key ssh.connect('192.168.200.190', port=22, username='root', password='Lanforge12345!xzsawq@!') stdin, stdout, stderr = ssh.exec_command('conf_set system:wlanSettings:wlanSettingTable:wlan1:channel 52') output = stdout.readlines() print('\n'.join(output)) time.sleep(1) exit(0) def create_station_on_GUI(self,y1,y2): global var1 self.y1 = y1 self.y2 = y2 cmd = "python3 sta_cx.py --mgr 192.168.200.13 --num_stations 1 --ssid TestAP95 --passwd lanforge --security wpa2 --radio wiphy0" print("Current working directory: {0}".format(os.getcwd())) os.chdir('/home/lanforge/lanforge-scripts/py-scripts') print("Current working directory: {0}".format(os.getcwd())) x = os.popen(cmd).read() print("station created") y1 ='station created' with open("data.txt", "w")as f: f.write(x) f.close() file = open("data.txt", "r") for i in file: if "channel associated is " in i: my_list = list(i.split(" ")) print(my_list[3]) print(type(my_list[3])) var1 = my_list[3] print(var1) var = var1.replace("\n", "") if var == "52" or var == "56" or var == "60" or var == "64" or var == "100" or var == "104" or var == "108" or var == "112" or var == "116" or var == "120" or var == "124" or var == "128" or var == "132" or var == "136" or var == "140": print('Station is on DFS Channel') self.y2 = 'station is on DFS Channel' else: print('Station is on Non DFS channel') self.y2 = 'Station is on Non DFS channel' return (self.y1 , self.y2) ''' ########### HACKRF ####################### ''' def generate_radar_at_ch52(self, r): self.r = r cmd = "sudo python lf_hackrf.py --pulse_width 1 --pulse_interval 1428 --pulse_count 18 --sweep_time 1000 --freq 5260000" # print("Current working directory: {0}".format(os.getcwd())) os.chdir('/usr/lib64/python2.7/site-packages/') #print("Current working directory: {0}".format(os.getcwd())) os.system(cmd) self.r = "Radar detected" return self.r def generate_radar_at_ch56(self,q): self.q =q cmd = "sudo python lf_hackrf.py --pulse_width 1 --pulse_interval 1428 --pulse_count 18 --sweep_time 1000 --freq 5280000" # print("Current working directory: {0}".format(os.getcwd())) os.chdir('/usr/lib64/python2.7/site-packages/') # print("Current working directory: {0}".format(os.getcwd())) os.system(cmd) self.q = "Radar detected" return self.q def generate_radar_at_ch60(self,w): self.w = w cmd = "sudo python lf_hackrf.py --pulse_width 1 --pulse_interval 1428 --pulse_count 18 --sweep_time 1000 --freq 5300000" # print("Current working directory: {0}".format(os.getcwd())) os.chdir('/usr/lib64/python2.7/site-packages/') # print("Current working directory: {0}".format(os.getcwd())) os.system(cmd) self.w = "Radar detected" return self.w def generate_radar_at_ch64(self,e): self.e = e cmd = "sudo python lf_hackrf.py --pulse_width 1 --pulse_interval 1428 --pulse_count 18 --sweep_time 1000 --freq 5320000" # print("Current working directory: {0}".format(os.getcwd())) os.chdir('/usr/lib64/python2.7/site-packages/') # print("Current working directory: {0}".format(os.getcwd())) os.system(cmd) self.e = "Radar detected" return self.e def generate_radar_at_ch100(self,f): self.f = f cmd = "sudo python lf_hackrf.py --pulse_width 1 --pulse_interval 1428 --pulse_count 18 --sweep_time 1000 --freq 5500000" # print("Current working directory: {0}".format(os.getcwd())) os.chdir('/usr/lib64/python2.7/site-packages/') # print("Current working directory: {0}".format(os.getcwd())) os.system(cmd) self.f = "Radar received" return self.f def generate_radar_at_ch104(self,t): self.t = t cmd = "sudo python lf_hackrf.py --pulse_width 1 --pulse_interval 1428 --pulse_count 18 --sweep_time 1000 --freq 5520000" # print("Current working directory: {0}".format(os.getcwd())) os.chdir('/usr/lib64/python2.7/site-packages/') # print("Current working directory: {0}".format(os.getcwd())) os.system(cmd) self.t = "Radar detected" return self.t def generate_radar_at_ch108(self,u): self.u=u cmd = "sudo python lf_hackrf.py --pulse_width 1 --pulse_interval 1428 --pulse_count 18 --sweep_time 1000 --freq 5540000" # print("Current working directory: {0}".format(os.getcwd())) os.chdir('/usr/lib64/python2.7/site-packages/') # print("Current working directory: {0}".format(os.getcwd())) os.system(cmd) self.u = "Radar detected" return self.u def generate_radar_at_ch112(self,i): self.i=i cmd = "sudo python lf_hackrf.py --pulse_width 1 --pulse_interval 1428 --pulse_count 18 --sweep_time 1000 --freq 5560000" # print("Current working directory: {0}".format(os.getcwd())) os.chdir('/usr/lib64/python2.7/site-packages/') # print("Current working directory: {0}".format(os.getcwd())) os.system(cmd) self.i = "Radar detected" return self.i def generate_radar_at_ch116(self,o): self.o =o cmd = "sudo python lf_hackrf.py --pulse_width 1 --pulse_interval 1428 --pulse_count 18 --sweep_time 1000 --freq 5280000" # print("Current working directory: {0}".format(os.getcwd())) os.chdir('/usr/lib64/python2.7/site-packages/') # print("Current working directory: {0}".format(os.getcwd())) os.system(cmd) self.o = "Radar detected" return self.o def generate_radar_at_ch120(self,p): self.p=p cmd = "sudo python lf_hackrf.py --pulse_width 1 --pulse_interval 1428 --pulse_count 18 --sweep_time 1000 --freq 5600000" #print("Current working directory: {0}".format(os.getcwd())) os.chdir('/usr/lib64/python2.7/site-packages/') # print("Current working directory: {0}".format(os.getcwd())) os.system(cmd) self.p = "Radar detected" return self.p def generate_radar_at_ch124(self,a): self.a=a cmd = "sudo python lf_hackrf.py --pulse_width 1 --pulse_interval 1428 --pulse_count 18 --sweep_time 1000 --freq 5620000" # print("Current working directory: {0}".format(os.getcwd())) os.chdir('/usr/lib64/python2.7/site-packages/') # print("Current working directory: {0}".format(os.getcwd())) os.system(cmd) self.a = "Radar detected" return self.a def generate_radar_at_ch128(self,s): self.s=s cmd = "sudo python lf_hackrf.py --pulse_width 1 --pulse_interval 1428 --pulse_count 18 --sweep_time 1000 --freq 5640000" # print("Current working directory: {0}".format(os.getcwd())) os.chdir('/usr/lib64/python2.7/site-packages/') # print("Current working directory: {0}".format(os.getcwd())) os.system(cmd) self.s = "Radar detected" return self.s def generate_radar_at_ch132(self,d): self.d = d cmd = "sudo python lf_hackrf.py --pulse_width 1 --pulse_interval 1428 --pulse_count 18 --sweep_time 1000 --freq 5660000" # print("Current working directory: {0}".format(os.getcwd())) os.chdir('/usr/lib64/python2.7/site-packages/') # print("Current working directory: {0}".format(os.getcwd())) os.system(cmd) self.d = "Radar detected" return self.d def generate_radar_at_ch136(self,h): self.h=h cmd = "sudo python lf_hackrf.py --pulse_width 1 --pulse_interval 1428 --pulse_count 18 --sweep_time 1000 --freq 5680000" # print("Current working directory: {0}".format(os.getcwd())) os.chdir('/usr/lib64/python2.7/site-packages/') # print("Current working directory: {0}".format(os.getcwd())) os.system(cmd) self.h = "Radar detected" return self.h def generate_radar_at_ch140(self,j): self.j = j cmd = "sudo python lf_hackrf.py --pulse_width 1 --pulse_interval 1428 --pulse_count 18 --sweep_time 1000 --freq 5700000" # print("Current working directory: {0}".format(os.getcwd())) os.chdir('/usr/lib64/python2.7/site-packages/') # print("Current working directory: {0}".format(os.getcwd())) os.system(cmd) self.j = "Radar detected" return self.j def hackrf_status_off(self): cmd = "sudo python lf_hackrf.py --pulse_width 1 --pulse_interval 1428 --pulse_count 18 --sweep_time 1000 --freq 5220000" # print("Current working directory: {0}".format(os.getcwd())) os.chdir('/usr/lib64/python2.7/site-packages/') # print("Current working directory: {0}".format(os.getcwd())) os.system(cmd) def monitor_station_channel(self,m): self.m = m obj = IPv4Test(_host="192.168.200.13", _port=8080, _ssid="TestAP95", _password="lanforge", _security="wpa2", _radio="wiphy0") obj.cleanup(obj.sta_list) obj.build() obj.station_profile.admin_up() obj.local_realm.wait_for_ip(obj.sta_list) time.sleep(30) var = obj.json_get("/port/1/1/sta0000?fields=channel") var_1 = var['interface']['channel'] self.m = var_1 return self.m def aps_radio_off(self): pass def aps_not_switch_automatically(self): pass def check_ap_channel_switching_time(self): pass def main(): dfs = DFS_TESTING() que = Queue() ''' algorithm and sequence to be followed ''' print("Hackrf is ON") print("press s --> enter --> q to stop hackrf") dfs.hackrf_status_off() print("Now hackrf is OFF") #set channel on ap //netgear threads_list = [] t1 = threading.Thread(target=lambda q, arg1, arg2: q.put(dfs.create_station_on_GUI(arg1, arg2)), args=(que, "", "")) t1.start() threads_list.append(t1) t1.join() # Check thread's return value global my_var result = que.get() print("hi i reached", result) my_var = result list_1 = list(my_var) print("my list", list_1) if any("station is on DFS Channel" in s for s in list_1): t2 = threading.Thread(target=lambda q, arg1: q.put(dfs.generate_radar_at_ch52(arg1)), args=(que, "")) t2.start() threads_list.append(t2) t2.join() x = que.get() print("result", x) else: print("radar unreachable") t3=threading.Thread(target=lambda q, arg1: q.put(dfs.monitor_station_channel(arg1)), args=(que, "")) t3.start() threads_list.append(t3) t3.join() y = que.get() print("channel after radar is ", y) if y == "52": print("station is on DFS Channel") t4 = threading.Thread(target=lambda q, arg1: q.put(dfs.generate_radar_at_ch52(arg1)), args=(que, "")) t4.start() t4.join() elif y == "56": print("station is on DFS Channel 56") t5 =threading.Thread(target=lambda q, arg1: q.put(dfs.generate_radar_at_ch56(arg1)), args=(que, "")) t5.start() t5.join() elif y == "60": print("station is on DFS Channel 60") t6 =threading.Thread(target=lambda q, arg1: q.put(dfs.generate_radar_at_ch60(arg1)), args=(que, "")) t6.start() t6.join() elif y == "64": print("station is on DFS Channel 64") t7 =threading.Thread(target=lambda q, arg1: q.put(dfs.generate_radar_at_ch64(arg1)), args=(que, "")) t7.start() t7.join() elif y == "100": print("station is on DFS Channel 100") t8 =threading.Thread(target=lambda q, arg1: q.put(dfs.generate_radar_at_ch100(arg1)), args=(que, "")) t8.start() t8.join() elif y == "104": print("station is on DFS Channel 104") t9 =threading.Thread(target=lambda q, arg1: q.put(dfs.generate_radar_at_ch104(arg1)), args=(que, "")) t9.start() t9.join() elif y == "108": print("station is on DFS Channel 108") t10 =threading.Thread(target=lambda q, arg1: q.put(dfs.generate_radar_at_ch108(arg1)), args=(que, "")) t10.start() t10.join() elif y == "112": print("station is on DFS Channel 112") t11 =threading.Thread(target=lambda q, arg1: q.put(dfs.generate_radar_at_ch112(arg1)), args=(que, "")) t11.start() t11.join() elif y == "116": print("station is on DFS Channel 116") t12 =threading.Thread(target=lambda q, arg1: q.put(dfs.generate_radar_at_ch116(arg1)), args=(que, "")) t12.start() t12.join() elif y == "120": print("station is on DFS Channel 120") t13 =threading.Thread(target=lambda q, arg1: q.put(dfs.generate_radar_at_ch120(arg1)), args=(que, "")) t13.start() t13.join() elif y == "124": print("station is on DFS Channel 124") t14 =threading.Thread(target=lambda q, arg1: q.put(dfs.generate_radar_at_ch124(arg1)), args=(que, "")) t14.start() t14.join() elif y == "128": print("station is on DFS Channel 128") t15 =threading.Thread(target=lambda q, arg1: q.put(dfs.generate_radar_at_ch128(arg1)), args=(que, "")) t15.start() t15.join() elif y == "132": print("station is on DFS Channel 132") t16 =threading.Thread(target=lambda q, arg1: q.put(dfs.generate_radar_at_ch132(arg1)), args=(que, "")) t16.start() t16.join() elif y == "136": print("station is on DFS Channel 136") t17 =threading.Thread(target=lambda q, arg1: q.put(dfs.generate_radar_at_ch136(arg1)), args=(que, "")) t17.start() t17.join() elif y == "140": print("station is on DFS Channel 140") t18 =threading.Thread(target=lambda q, arg1: q.put(dfs.generate_radar_at_ch140(arg1)), args=(que, "")) t18.start() t18.join() else: print("station is on NON DFS Channel") """t2 = threading.Thread(target=lambda q, arg1: q.put(dfs.generate_radar_at_ch52(arg1)), args=(que, "")) t2.start() threads_list.append(t2) t2.join()""" # Join all the threads """for t in threads_list: t.join()""" """print("my var", my_var) empty_list = [] list = empty_list.append(my_var) print("list", list)""" '''t2 = threading.Thread(target=dfs.generate_radar_at_ch100()) t2.start() t2.join() print("radar received") t3 = threading.Thread(target=dfs.create_station_on_GUI()) t3.start() t3.join() print("station reassociated")''' '''dfs.hackrf_status_off() dfs.aps_radio_off() dfs.aps_not_switch_automatically() #generate radar and check for all dfs channels dfs.check_ap_channel_switching_time() #after testing turn off hackrf''' if __name__ == '__main__': main()