From 624c032d1e9774166ad9fa0a665b9f3063c7c0b5 Mon Sep 17 00:00:00 2001 From: Nikita Yadav Date: Thu, 11 Feb 2021 12:58:51 +0530 Subject: [PATCH] removing duplicate files --- py-scripts/test_netgear_dfs.py | 447 --------------------------------- 1 file changed, 447 deletions(-) delete mode 100644 py-scripts/test_netgear_dfs.py diff --git a/py-scripts/test_netgear_dfs.py b/py-scripts/test_netgear_dfs.py deleted file mode 100644 index e8a99ebf..00000000 --- a/py-scripts/test_netgear_dfs.py +++ /dev/null @@ -1,447 +0,0 @@ -""" file under progress not ffor testing - AP R7800 -""" - - -import time -import threading -import os -import paramiko -from queue import Queue -from cx_time import IPv4Test - - - -class DFS_TESTING: - - - def _init_(self): - pass - - - def set_dfs_channel_in_ap(self): - ssh = paramiko.SSHClient() # creating shh client object we use this object to connect to router - ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) # automatically adds the missing host key - ssh.connect('192.168.200.190', port=22, username='root', password='Lanforge12345!xzsawq@!') - stdin, stdout, stderr = ssh.exec_command('conf_set system:wlanSettings:wlanSettingTable:wlan1:channel 52') - output = stdout.readlines() - print('\n'.join(output)) - time.sleep(1) - exit(0) - - def create_station_on_GUI(self,y1,y2): - global var1 - self.y1 = y1 - self.y2 = y2 - cmd = "python3 sta_cx.py --mgr 192.168.200.13 --num_stations 1 --ssid TestAP95 --passwd lanforge --security wpa2 --radio wiphy0" - print("Current working directory: {0}".format(os.getcwd())) - os.chdir('/home/lanforge/lanforge-scripts/py-scripts') - print("Current working directory: {0}".format(os.getcwd())) - x = os.popen(cmd).read() - print("station created") - y1 ='station created' - - with open("data.txt", "w")as f: - f.write(x) - f.close() - - file = open("data.txt", "r") - for i in file: - if "channel associated is " in i: - my_list = list(i.split(" ")) - print(my_list[3]) - print(type(my_list[3])) - var1 = my_list[3] - - print(var1) - var = var1.replace("\n", "") - - if var == "52" or var == "56" or var == "60" or var == "64" or var == "100" or var == "104" or var == "108" or var == "112" or var == "116" or var == "120" or var == "124" or var == "128" or var == "132" or var == "136" or var == "140": - print('Station is on DFS Channel') - self.y2 = 'station is on DFS Channel' - else: - print('Station is on Non DFS channel') - self.y2 = 'Station is on Non DFS channel' - - return (self.y1 , self.y2) - - - - - ''' ########### HACKRF ####################### ''' - - - def generate_radar_at_ch52(self, r): - self.r = r - cmd = "sudo python lf_hackrf.py --pulse_width 1 --pulse_interval 1428 --pulse_count 18 --sweep_time 1000 --freq 5260000" - # print("Current working directory: {0}".format(os.getcwd())) - os.chdir('/usr/lib64/python2.7/site-packages/') - #print("Current working directory: {0}".format(os.getcwd())) - os.system(cmd) - self.r = "Radar detected" - return self.r - - def generate_radar_at_ch56(self,q): - self.q =q - cmd = "sudo python lf_hackrf.py --pulse_width 1 --pulse_interval 1428 --pulse_count 18 --sweep_time 1000 --freq 5280000" - # print("Current working directory: {0}".format(os.getcwd())) - os.chdir('/usr/lib64/python2.7/site-packages/') - - # print("Current working directory: {0}".format(os.getcwd())) - os.system(cmd) - self.q = "Radar detected" - return self.q - - def generate_radar_at_ch60(self,w): - self.w = w - cmd = "sudo python lf_hackrf.py --pulse_width 1 --pulse_interval 1428 --pulse_count 18 --sweep_time 1000 --freq 5300000" - # print("Current working directory: {0}".format(os.getcwd())) - os.chdir('/usr/lib64/python2.7/site-packages/') - # print("Current working directory: {0}".format(os.getcwd())) - os.system(cmd) - self.w = "Radar detected" - return self.w - - def generate_radar_at_ch64(self,e): - self.e = e - cmd = "sudo python lf_hackrf.py --pulse_width 1 --pulse_interval 1428 --pulse_count 18 --sweep_time 1000 --freq 5320000" - # print("Current working directory: {0}".format(os.getcwd())) - os.chdir('/usr/lib64/python2.7/site-packages/') - # print("Current working directory: {0}".format(os.getcwd())) - os.system(cmd) - self.e = "Radar detected" - return self.e - - def generate_radar_at_ch100(self,f): - self.f = f - cmd = "sudo python lf_hackrf.py --pulse_width 1 --pulse_interval 1428 --pulse_count 18 --sweep_time 1000 --freq 5500000" - # print("Current working directory: {0}".format(os.getcwd())) - os.chdir('/usr/lib64/python2.7/site-packages/') - # print("Current working directory: {0}".format(os.getcwd())) - os.system(cmd) - self.f = "Radar received" - return self.f - - def generate_radar_at_ch104(self,t): - self.t = t - cmd = "sudo python lf_hackrf.py --pulse_width 1 --pulse_interval 1428 --pulse_count 18 --sweep_time 1000 --freq 5520000" - # print("Current working directory: {0}".format(os.getcwd())) - os.chdir('/usr/lib64/python2.7/site-packages/') - # print("Current working directory: {0}".format(os.getcwd())) - os.system(cmd) - self.t = "Radar detected" - return self.t - - def generate_radar_at_ch108(self,u): - self.u=u - cmd = "sudo python lf_hackrf.py --pulse_width 1 --pulse_interval 1428 --pulse_count 18 --sweep_time 1000 --freq 5540000" - # print("Current working directory: {0}".format(os.getcwd())) - os.chdir('/usr/lib64/python2.7/site-packages/') - # print("Current working directory: {0}".format(os.getcwd())) - os.system(cmd) - self.u = "Radar detected" - return self.u - - def generate_radar_at_ch112(self,i): - self.i=i - cmd = "sudo python lf_hackrf.py --pulse_width 1 --pulse_interval 1428 --pulse_count 18 --sweep_time 1000 --freq 5560000" - # print("Current working directory: {0}".format(os.getcwd())) - os.chdir('/usr/lib64/python2.7/site-packages/') - # print("Current working directory: {0}".format(os.getcwd())) - os.system(cmd) - self.i = "Radar detected" - return self.i - - def generate_radar_at_ch116(self,o): - self.o =o - cmd = "sudo python lf_hackrf.py --pulse_width 1 --pulse_interval 1428 --pulse_count 18 --sweep_time 1000 --freq 5280000" - # print("Current working directory: {0}".format(os.getcwd())) - os.chdir('/usr/lib64/python2.7/site-packages/') - # print("Current working directory: {0}".format(os.getcwd())) - os.system(cmd) - self.o = "Radar detected" - return self.o - - def generate_radar_at_ch120(self,p): - self.p=p - cmd = "sudo python lf_hackrf.py --pulse_width 1 --pulse_interval 1428 --pulse_count 18 --sweep_time 1000 --freq 5600000" - #print("Current working directory: {0}".format(os.getcwd())) - os.chdir('/usr/lib64/python2.7/site-packages/') - # print("Current working directory: {0}".format(os.getcwd())) - os.system(cmd) - self.p = "Radar detected" - return self.p - - def generate_radar_at_ch124(self,a): - self.a=a - cmd = "sudo python lf_hackrf.py --pulse_width 1 --pulse_interval 1428 --pulse_count 18 --sweep_time 1000 --freq 5620000" - # print("Current working directory: {0}".format(os.getcwd())) - os.chdir('/usr/lib64/python2.7/site-packages/') - # print("Current working directory: {0}".format(os.getcwd())) - os.system(cmd) - self.a = "Radar detected" - return self.a - - def generate_radar_at_ch128(self,s): - self.s=s - cmd = "sudo python lf_hackrf.py --pulse_width 1 --pulse_interval 1428 --pulse_count 18 --sweep_time 1000 --freq 5640000" - # print("Current working directory: {0}".format(os.getcwd())) - os.chdir('/usr/lib64/python2.7/site-packages/') - # print("Current working directory: {0}".format(os.getcwd())) - os.system(cmd) - self.s = "Radar detected" - return self.s - - def generate_radar_at_ch132(self,d): - self.d = d - cmd = "sudo python lf_hackrf.py --pulse_width 1 --pulse_interval 1428 --pulse_count 18 --sweep_time 1000 --freq 5660000" - # print("Current working directory: {0}".format(os.getcwd())) - os.chdir('/usr/lib64/python2.7/site-packages/') - # print("Current working directory: {0}".format(os.getcwd())) - os.system(cmd) - self.d = "Radar detected" - return self.d - - def generate_radar_at_ch136(self,h): - self.h=h - cmd = "sudo python lf_hackrf.py --pulse_width 1 --pulse_interval 1428 --pulse_count 18 --sweep_time 1000 --freq 5680000" - # print("Current working directory: {0}".format(os.getcwd())) - os.chdir('/usr/lib64/python2.7/site-packages/') - # print("Current working directory: {0}".format(os.getcwd())) - os.system(cmd) - self.h = "Radar detected" - return self.h - - def generate_radar_at_ch140(self,j): - self.j = j - cmd = "sudo python lf_hackrf.py --pulse_width 1 --pulse_interval 1428 --pulse_count 18 --sweep_time 1000 --freq 5700000" - # print("Current working directory: {0}".format(os.getcwd())) - os.chdir('/usr/lib64/python2.7/site-packages/') - # print("Current working directory: {0}".format(os.getcwd())) - os.system(cmd) - self.j = "Radar detected" - return self.j - - def hackrf_status_off(self): - - cmd = "sudo python lf_hackrf.py --pulse_width 1 --pulse_interval 1428 --pulse_count 18 --sweep_time 1000 --freq 5220000" - # print("Current working directory: {0}".format(os.getcwd())) - os.chdir('/usr/lib64/python2.7/site-packages/') - # print("Current working directory: {0}".format(os.getcwd())) - os.system(cmd) - - - def monitor_station_channel(self,m): - self.m = m - obj = IPv4Test(_host="192.168.200.13", - _port=8080, - _ssid="TestAP95", - _password="lanforge", - _security="wpa2", - _radio="wiphy0") - obj.cleanup(obj.sta_list) - obj.build() - obj.station_profile.admin_up() - obj.local_realm.wait_for_ip(obj.sta_list) - time.sleep(30) - var = obj.json_get("/port/1/1/sta0000?fields=channel") - var_1 = var['interface']['channel'] - self.m = var_1 - return self.m - - - def aps_radio_off(self): - pass - - def aps_not_switch_automatically(self): - pass - - def check_ap_channel_switching_time(self): - pass - - - -def main(): - - - - - dfs = DFS_TESTING() - - que = Queue() - - - - ''' algorithm and sequence to be followed ''' - - print("Hackrf is ON") - print("press s --> enter --> q to stop hackrf") - dfs.hackrf_status_off() - print("Now hackrf is OFF") - - #set channel on ap //netgear - - threads_list = [] - t1 = threading.Thread(target=lambda q, arg1, arg2: q.put(dfs.create_station_on_GUI(arg1, arg2)), args=(que, "", "")) - t1.start() - threads_list.append(t1) - t1.join() - - # Check thread's return value - global my_var - - result = que.get() - print("hi i reached", result) - my_var = result - - list_1 = list(my_var) - print("my list", list_1) - - if any("station is on DFS Channel" in s for s in list_1): - t2 = threading.Thread(target=lambda q, arg1: q.put(dfs.generate_radar_at_ch52(arg1)), args=(que, "")) - t2.start() - threads_list.append(t2) - t2.join() - x = que.get() - print("result", x) - else: - print("radar unreachable") - - t3=threading.Thread(target=lambda q, arg1: q.put(dfs.monitor_station_channel(arg1)), args=(que, "")) - t3.start() - threads_list.append(t3) - t3.join() - y = que.get() - print("channel after radar is ", y) - - if y == "52": - print("station is on DFS Channel") - t4 = threading.Thread(target=lambda q, arg1: q.put(dfs.generate_radar_at_ch52(arg1)), args=(que, "")) - t4.start() - t4.join() - elif y == "56": - print("station is on DFS Channel 56") - t5 =threading.Thread(target=lambda q, arg1: q.put(dfs.generate_radar_at_ch56(arg1)), args=(que, "")) - t5.start() - t5.join() - elif y == "60": - print("station is on DFS Channel 60") - t6 =threading.Thread(target=lambda q, arg1: q.put(dfs.generate_radar_at_ch60(arg1)), args=(que, "")) - t6.start() - t6.join() - elif y == "64": - print("station is on DFS Channel 64") - t7 =threading.Thread(target=lambda q, arg1: q.put(dfs.generate_radar_at_ch64(arg1)), args=(que, "")) - t7.start() - t7.join() - elif y == "100": - print("station is on DFS Channel 100") - t8 =threading.Thread(target=lambda q, arg1: q.put(dfs.generate_radar_at_ch100(arg1)), args=(que, "")) - t8.start() - t8.join() - elif y == "104": - print("station is on DFS Channel 104") - t9 =threading.Thread(target=lambda q, arg1: q.put(dfs.generate_radar_at_ch104(arg1)), args=(que, "")) - t9.start() - t9.join() - elif y == "108": - print("station is on DFS Channel 108") - t10 =threading.Thread(target=lambda q, arg1: q.put(dfs.generate_radar_at_ch108(arg1)), args=(que, "")) - t10.start() - t10.join() - elif y == "112": - print("station is on DFS Channel 112") - t11 =threading.Thread(target=lambda q, arg1: q.put(dfs.generate_radar_at_ch112(arg1)), args=(que, "")) - t11.start() - t11.join() - elif y == "116": - print("station is on DFS Channel 116") - t12 =threading.Thread(target=lambda q, arg1: q.put(dfs.generate_radar_at_ch116(arg1)), args=(que, "")) - t12.start() - t12.join() - elif y == "120": - print("station is on DFS Channel 120") - t13 =threading.Thread(target=lambda q, arg1: q.put(dfs.generate_radar_at_ch120(arg1)), args=(que, "")) - t13.start() - t13.join() - elif y == "124": - print("station is on DFS Channel 124") - t14 =threading.Thread(target=lambda q, arg1: q.put(dfs.generate_radar_at_ch124(arg1)), args=(que, "")) - t14.start() - t14.join() - elif y == "128": - print("station is on DFS Channel 128") - t15 =threading.Thread(target=lambda q, arg1: q.put(dfs.generate_radar_at_ch128(arg1)), args=(que, "")) - t15.start() - t15.join() - elif y == "132": - print("station is on DFS Channel 132") - t16 =threading.Thread(target=lambda q, arg1: q.put(dfs.generate_radar_at_ch132(arg1)), args=(que, "")) - t16.start() - t16.join() - elif y == "136": - print("station is on DFS Channel 136") - t17 =threading.Thread(target=lambda q, arg1: q.put(dfs.generate_radar_at_ch136(arg1)), args=(que, "")) - t17.start() - t17.join() - elif y == "140": - print("station is on DFS Channel 140") - t18 =threading.Thread(target=lambda q, arg1: q.put(dfs.generate_radar_at_ch140(arg1)), args=(que, "")) - t18.start() - t18.join() - else: - print("station is on NON DFS Channel") - - - - - - - - - """t2 = threading.Thread(target=lambda q, arg1: q.put(dfs.generate_radar_at_ch52(arg1)), args=(que, "")) - t2.start() - threads_list.append(t2) - t2.join()""" - - - - - # Join all the threads - """for t in threads_list: - t.join()""" - - - - """print("my var", my_var) - empty_list = [] - list = empty_list.append(my_var) - print("list", list)""" - - - '''t2 = threading.Thread(target=dfs.generate_radar_at_ch100()) - t2.start() - t2.join() - print("radar received") - - t3 = threading.Thread(target=dfs.create_station_on_GUI()) - t3.start() - t3.join() - print("station reassociated")''' - - - - - '''dfs.hackrf_status_off() - dfs.aps_radio_off() - dfs.aps_not_switch_automatically() - #generate radar and check for all dfs channels - dfs.check_ap_channel_switching_time() - #after testing turn off hackrf''' - - - - - - -if __name__ == '__main__': - main()