From d2c67bf94450e69d819684705f1c805c05468480 Mon Sep 17 00:00:00 2001 From: Nikita Yadav Date: Mon, 8 Feb 2021 15:14:10 +0530 Subject: [PATCH] main dfs code --- py-scripts/new_test.py | 612 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 612 insertions(+) create mode 100644 py-scripts/new_test.py diff --git a/py-scripts/new_test.py b/py-scripts/new_test.py new file mode 100644 index 00000000..7b0f82a3 --- /dev/null +++ b/py-scripts/new_test.py @@ -0,0 +1,612 @@ +""" This the main file from where dfs test run """ + +import os +import signal +import paramiko +import time +import threading +import subprocess +from station_layer3 import STATION +import argparse +from threading import Thread +from itertools import islice +import datetime +from datetime import datetime + +import pexpect +import sys +from itertools import islice +from matplotlib import pyplot as plt +import numpy as np + + +# import pdfkit + +class DFS_TEST: + def __init__(self, ip, user, pswd, host, ssid, passwd, security, radio): + self.ip = ip + self.user = user + self.pswd = pswd + self.host = host + self.ssid = ssid + self.passwd = passwd + self.security = security + self.radio = radio + + def set_channel_in_ap_at_(self, ip, user, pswd, channel): + self.ip = ip + self.user = user + self.pswd = pswd + channel = str(channel) + ssh = paramiko.SSHClient() # creating shh client object we use this object to connect to router + ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) # automatically adds the missing host key + ssh.connect(ip, port=22, username=user, password=pswd) + command = "conf_set system:wlanSettings:wlanSettingTable:wlan1:channel " + channel + stdin, stdout, stderr = ssh.exec_command(str(command)) + output = stdout.readlines() + # print('\n'.join(output)) + time.sleep(10) + + def monitor_station_(self, host, ssid, passwd, security, radio, channel): + self.host = host + self.ssid = ssid + self.passwd = passwd + self.security = security + self.radio = radio + + obj1 = STATION(lfclient_host=host, lfclient_port=8080, ssid=ssid, paswd=passwd, security=security, radio=radio) + + var = obj1.json_get("/port/1/1/sta0000?fields=channel") + var_1 = (var['interface']['channel']) + if var_1 == channel: + print("channel at" + channel) + return var_1 + else: + print("wait untill channel assigned") + timeout = time.time() + 60 * 10 + + while var_1 != channel: + var = obj1.json_get("/port/1/1/sta0000?fields=channel") + var_1 = var['interface']['channel'] + time.sleep(1) + if time.time() > timeout: + var = obj1.json_get("/port/1/1/sta0000?fields=channel") + var_1 = var['interface']['channel'] + break + + return var_1 + + def create_station_cx(self, host, ssid, passwd, security, radio): + self.host = host + self.ssid = ssid + self.passwd = passwd + self.security = security + self.radio = radio + + cmd = "python3 station_layer3.py -hst localhost -s " + self.ssid + " -pwd " + self.passwd + " -sec " + self.security + " -rad " + self.radio + os.chdir('/home/lanforge/lanforge-scripts/py-scripts') + os.system(cmd) + + def monitor_untill_channel_assigned(self, host, ssid, passwd, security, radio): + self.host = host + self.ssid = ssid + self.passwd = passwd + self.security = security + self.radio = radio + + obj1 = STATION(lfclient_host=host, lfclient_port=8080, ssid=ssid, paswd=passwd, security=security, radio=radio) + + var = obj1.json_get("/port/1/1/sta0000?fields=channel") + var_1 = (var['interface']['channel']) + + timeout = time.time() + 60 * 15 # 15min + while var_1 == "-1": + var = obj1.json_get("/port/1/1/sta0000?fields=channel") + var_1 = var['interface']['channel'] + time.sleep(1) + if time.time() > timeout: + break + + return var_1 + + def generate_radar_at_ch(self, width_, interval_, count_, frequency): + current_time = datetime.now() + print("Current date and time : ") + current_time = current_time.strftime("%b %d %H:%M:%S") + print("time stamp of radar send", current_time) + # print("width", width_) + + os.chdir('/usr/lib64/python2.7/site-packages/') + frequency = str(frequency) + width_ = str(width_) + interval_ = str(interval_) + count_ = str(count_) + command_hackRF = "sudo python lf_hackrf.py --pulse_width " + width_ + " --pulse_interval " + interval_ + " --pulse_count " + count_ + " --sweep_time 1000 --freq " + frequency + command_hackRF = str(command_hackRF) + ch = pexpect.spawn(command_hackRF) + time.sleep(5) + ch.expect('>>>') + ch.sendline('s') + ch.expect('>>>') + ch.sendline('q') + time.sleep(1) + + return current_time + + def client_connect_time(self, host, ssid, passwd, security, radio, channel): + self.host = host + self.ssid = ssid + self.passwd = passwd + self.security = security + self.radio = radio + obj1 = STATION(lfclient_host=host, lfclient_port=8080, ssid=ssid, paswd=passwd, security=security, radio=radio) + var = obj1.json_get("/port/1/1/sta0000?fields=channel") + var_1 = (var['interface']['channel']) + + timeout_ = time.time() + 60 * 2 # 2min + while var_1 != "-1" : + var = obj1.json_get("/port/1/1/sta0000?fields=channel") + var_1 = var['interface']['channel'] + time.sleep(1) + if time.time() > timeout_: + break + if var_1 == "-1" : + current_time = datetime.now() + print("Current date and time : ") + disconnect_time = current_time.strftime("%b %d %H:%M:%S") + print("station disconnection time", disconnect_time) + + timeout = time.time() + 60 * 15 # 15min + while var_1 == "-1": + var = obj1.json_get("/port/1/1/sta0000?fields=channel") + var_1 = var['interface']['channel'] + time.sleep(1) + if time.time() > timeout: + break + current_time1 = datetime.now() + print("Current date and time : ") + connect_time = current_time1.strftime("%b %d %H:%M:%S") + print("station connection time", connect_time) + + """var_2 = obj1.json_get("/cx/L3Teststa0000-0") + # print(type(var)) + var_1 = (var_2['L3Teststa0000-0']['state']) + timeout = time.time() + 60 * 2 # 15min + while var_1 != "Run": + var = obj1.json_get("/cx/L3Teststa0000-0") + # print(type(var)) + var_1 = (var['L3Teststa0000-0']['state']) + time.sleep(1) + if time.time() > timeout: + break + current_time_ = datetime.now() + print("Current date and time : ") + traffic_time1 = current_time_.strftime("%H:%M:%S") + print("time at which traffic started", traffic_time1)""" + s1 = disconnect_time + s2 = connect_time # for example + FMT = '%b %d %H:%M:%S' + c_time = datetime.strptime(s2, FMT) - datetime.strptime(s1, FMT) + print("connection time ", c_time) + # print("detection time for radar", tdelta) + # detection_time_lst.append(tdelta) + + lst = str(c_time).split(":") + seconds = int(lst[0]) * 3600 + int(lst[1]) * 60 + int(lst[2]) + c_time = seconds + print("connection time ", c_time) + else: + disconnect_time = "00:00:00" + print("station disconnection time", disconnect_time) + connect_time = "00:00:00" + print("time at which traffic started", connect_time) + c_time = 0 + print("connection time ", c_time) + + + + + return c_time + + def channel_after_radar(self, host, ssid, passwd, security, radio): + self.host = host + self.ssid = ssid + self.passwd = passwd + self.security = security + self.radio = radio + obj1 = STATION(lfclient_host=host, lfclient_port=8080, ssid=ssid, paswd=passwd, security=security, radio=radio) + var = obj1.json_get("/port/1/1/sta0000?fields=channel") + var_1 = (var['interface']['channel']) + return var_1 + + def monitor_cx(self, host, ssid, passwd, security, radio): + self.host = host + self.ssid = ssid + self.passwd = passwd + self.security = security + self.radio = radio + + obj1 = STATION(lfclient_host=host, lfclient_port=8080, ssid=ssid, paswd=passwd, security=security, radio=radio) + var = obj1.json_get("/cx/L3Teststa0000-0") + # print(type(var)) + var_1 = (var['L3Teststa0000-0']['state']) + timeout = time.time() + 60 * 2 # 15min + while var_1 != "Run": + var = obj1.json_get("/cx/L3Teststa0000-0") + # print(type(var)) + var_1 = (var['L3Teststa0000-0']['state']) + time.sleep(1) + if time.time() > timeout: + break + # print(var_1) + return var_1 + + def monitor_untill_connection_time(self, host, ssid, passwd, security, radio): + self.host = host + self.ssid = ssid + self.passwd = passwd + self.security = security + self.radio = radio + + obj1 = STATION(lfclient_host=host, lfclient_port=8080, ssid=ssid, paswd=passwd, security=security, radio=radio) + var = obj1.json_get("/port/1/1/sta0000?fields=cx%20time%20(us)") + var_1 = (var['interface']['cx time (us)']) + return var_1 + + def check_Radar_time(self, ip, user, pswd): + self.ip = ip + self.user = user + self.pswd = pswd + + ssh = paramiko.SSHClient() # creating shh client object we use this object to connect to router + ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) # automatically adds the missing host key + ssh.connect(ip, port=22, username=user, password=pswd) + stdin, stdout, stderr = ssh.exec_command('cat /tmp/log/messages | grep Radar') + output = stdout.readlines() + # print('\n'.join(output)) + time.sleep(1) + return output + + def monitor_channel_available_time(self, ip, user, pswd): + self.ip = ip + self.user = user + self.pswd = pswd + + ssh = paramiko.SSHClient() # creating shh client object we use this object to connect to router + ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) # automatically adds the missing host key + ssh.connect(ip, port=22, username=user, password=pswd) + stdin, stdout, stderr = ssh.exec_command('iwlist channel ') + output = stdout.readlines() + # print('\n'.join(output)) + time.sleep(1) + return output + +def run(): + parser = argparse.ArgumentParser(description="Netgear AP DFS Test Script") + parser.add_argument('-i', '--ip', type=str, help='AP ip') + parser.add_argument('-u', '--user', type=str, help='credentials login/username') + parser.add_argument('-p', '--pswd', type=str, help='credential password') + parser.add_argument('-hst', '--host', type=str, help='host name') + parser.add_argument('-s', '--ssid', type=str, help='ssid for client') + parser.add_argument('-pwd', '--passwd', type=str, help='password to connect to ssid') + parser.add_argument('-sec', '--security', type=str, help='security') + parser.add_argument('-rad', '--radio', type=str, help='radio at which client will be connected') + parser.add_argument('-n', '--name', type=str, help='Type Name of AP on which test is performed') + #add(connect_client- true or false) + #add(run-traffic - t/false) + #check_only radar + # + args = parser.parse_args() + + if (args.name is not None): + AP_name = args.name + + if (args.ssid is not None): + ssid = args.ssid + time.sleep(1800) + dfs = DFS_TEST(args.ip, args.user, args.pswd, args.host, args.ssid, args.passwd, args.security, args.radio) + + # To create a stations + dfs.create_station_cx(args.host, args.ssid, args.passwd, args.security, args.radio) + fcc_types = ["FCCO", "FCC1", "FCC2", "FCC3", "FCC4", "FCC5", "ETSI1", "ETSI2", "ETSI3", "ETSI4", "ETSI5", "ETSI6"] + # , "FCC1", "FCC2", "FCC3", "FCC4", "FCC5", "ETSI1", "ETSI2", "ETSI3", "ETSI4", "ETSI5", "ETSI6" + result_data = dict.fromkeys(fcc_types) + # set channel + width_ = "" + interval_ = "" + count_ = "" + print(result_data) + for fcc in fcc_types: + # 1, 2, 3, 4, 5, 11, 12, 13, 14, 15, 16 + # "FCCO" , "FCC1" , "FCC2" , "FCC3", "FCC4", "FCC5", "ETSI1", "ETSI2", "ETSI3", "ETSI4", "ETSI5", "ETSI6" + if fcc == "FCCO": + width = "1" + interval = "1428" + count = "18" + width_ = width + print("width", width_) + interval_ = interval + count_ = count + elif fcc == "FCC1": + width = "1" + interval = "1163" + count = "18" + width_ = width + interval_ = interval + count_ = count + elif fcc == "FCC2": + width = "2" + interval = "208" + count = "28" + width_ = width + interval_ = interval + count_ = count + elif fcc == "FCC3": + width = "7" + interval = "365" + count = "16" + width_ = width + interval_ = interval + count_ = count + elif fcc == "FCC4": + width = "16" + interval = "271" + count = "12" + width_ = width + interval_ = interval + count_ = count + elif fcc == "FCC5": + width = "70" + interval = "1975" + count = "3" + width_ = width + interval_ = interval + count_ = count + elif fcc == "ETSI1": + width = "5" + interval = "342" + count = "10" + width_ = width + interval_ = interval + count_ = count + elif fcc == "ETSI2": + width = "2" + interval = "1271" + count = "15" + width_ = width + interval_ = interval + count_ = count + elif fcc == "ETSI3": + width = "15" + interval = "3280" + count = "25" + width_ = width + interval_ = interval + count_ = count + elif fcc == "ETSI4": + width = "24" + interval = "2477" + count = "20" + width_ = width + interval_ = interval + count_ = count + elif fcc == "ETSI5": + width = "1" + interval = "356" + count = "10" + width_ = width + interval_ = interval + count_ = count + elif fcc == "ETSI6": + width = "2" + interval = "1091" + count = "15" + width_ = width + interval_ = interval + count_ = count + + switched_ch_lst = [] + detection_time_lst = [] + connection_time_lst = [] + radar_lst = [] + + channel_values = [52, 56, 60, 64, 100,104, 108, 112, 116, 120, 124 ,128, 132, 136, 140] + + result_data[fcc] = dict.fromkeys(["switched_ch_lst", "detection_time_lst", "radar_lst", "connection_time_lst"]) + + result_data[fcc]["switched_ch_lst"] = [] + result_data[fcc]["detection_time_lst"] = [] + result_data[fcc]["radar_lst"] = [] + result_data[fcc]["connection_time_lst"] = [] + + print("under", result_data) + + + for channel_s in channel_values: + """#, 56, 60, 64, 100,104, 108, 112, 116, 120, 124 ,128, 132, 136, 140]:""" + print("checking for FCC0") + + print("set channel to ", channel_s) + + dfs.set_channel_in_ap_at_(args.ip, args.user, args.pswd, channel_s) + + print("channel set checking....") + monitoring_station = dfs.monitor_station_(args.host, args.ssid, args.passwd, args.security, args.radio, + channel=str(channel_s)) + monitoring_station = str(monitoring_station) + print(("channel set to ", monitoring_station)) + + channel_s = str(channel_s) + # compare if ch= 52 == set 52 + if monitoring_station == channel_s: + print("station allocated to " + monitoring_station) + print("now generate radar on " + channel_s) + + if channel_s == "52": + frequency = "5260000" + elif channel_s == "56": + frequency = "5280000" + elif channel_s == "60": + frequency = "5300000" + elif channel_s == "64": + frequency = "5320000" + elif channel_s == "100": + frequency = "5500000" + elif channel_s == "104": + frequency = "5520000" + elif channel_s == "108": + frequency = "5540000" + elif channel_s == "112": + frequency = "5560000" + elif channel_s == "116": + frequency = "5580000" + elif channel_s == "120": + frequency = "5600000" + elif channel_s == "124": + frequency = "5620000" + elif channel_s == "128": + frequency = "5640000" + elif channel_s == "132": + frequency = "5660000" + elif channel_s == "136": + frequency = "5680000" + elif channel_s == "140": + frequency = "5700000" + else: + print("Invalid") + continue + + # now radar generate + generating_radar = dfs.generate_radar_at_ch(width_, interval_, count_, frequency) + + client_connection_time = dfs.client_connect_time(args.host, args.ssid, args.passwd, args.security, + args.radio, channel_s) + print("client connection time ", client_connection_time) + connection_time_lst.append(client_connection_time) + + time.sleep(2) + ap_logs = dfs.check_Radar_time(args.ip, args.user, args.pswd) + + # print(type(ap_logs)) + N = 2 + res = list(islice(reversed(ap_logs), 0, N)) + res.reverse() + if (str(res).__contains__("Radar detected on channel " + channel_s)): + data = "YES" + radar_lst.append(data) + print("yes") + if data == "YES": + + print("checking channel assigned...") + + detecting_radar_channel = dfs.channel_after_radar(args.host, args.ssid, args.passwd, + args.security, + args.radio) + # print("after radar channel is at ", detecting_radar_channel) + + if detecting_radar_channel == "-1" or detecting_radar_channel == "AUTO" or detecting_radar_channel == channel_s: + print("TEST Fail") + print("client is at AUTO Channel") + pass_fail = "FAIL" + ch_af_radar = "AUTO" + print("after radar channel is at ", ch_af_radar) + switched_ch_lst.append(ch_af_radar) + + print(pass_fail) + continue + else: + print("Test Pass") + pass_fail = "PASS" + + ch_af_radar = detecting_radar_channel + print("after radar channel is at ", ch_af_radar) + switched_ch_lst.append(ch_af_radar) + + ap_logs_check = dfs.check_Radar_time(args.ip, args.user, args.pswd) + + print(type(ap_logs_check)) + N = 2 + res = list(islice(reversed(ap_logs_check), 0, N)) + res.reverse() + for i in res: + if (str(res).__contains__("Radar detected on channel " + channel_s)): + print("YES") + x = res.index(i) + print("raadar time at index", x) + r_time = " " + for i in res[x][4:19]: + r_time = r_time + i + r_time = r_time.strip() + print("radar detected time", r_time) + + # calculation + s1 = generating_radar + s2 = r_time # for example + FMT = '%b %d %H:%M:%S' + tdelta = datetime.strptime(s2, FMT) - datetime.strptime(s1, FMT) + # print("detection time for radar", tdelta) + # detection_time_lst.append(tdelta) + + lst = str(tdelta).split(":") + seconds = int(lst[0]) * 3600 + int(lst[1]) * 60 + int(lst[2]) + d_time = seconds + print("detection time ", d_time) + detection_time_lst.append(d_time) + + else: + data = "NO" + pass_fail = "FAIL" + radar_lst.append(data) + d_time = "0" + print("detection time ", d_time) + detection_time_lst.append(0) + switched_ch_lst.append(monitoring_station) + print("no") + # radar detected(yes/no) + else: + monitoring_station = str(monitoring_station) + radar_lst.append("NO") + switched_ch_lst.append(monitoring_station) + detection_time_lst.append(0) + connection_time_lst.append(0) + print(("channel set to ", monitoring_station)) + + print("switched channel list ", switched_ch_lst) + print("radar detection list", radar_lst) + print("detection time list", detection_time_lst) + print("connection time list", connection_time_lst) + + result_data[fcc]["switched_ch_lst"] = switched_ch_lst + result_data[fcc]["radar_lst"] = radar_lst + result_data[fcc]["detection_time_lst"] = detection_time_lst + result_data[fcc]["connection_time_lst"] = connection_time_lst + print("result ", result_data) + time.sleep(1800) + + + print("final result", result_data) + + # # Try changing values for the same + now = datetime.now() + + generate_report(result_data, + now, + args.ap_name, + args.ssid, + num_client=1, + graph_path="/home/lanforge/html-reports/dfs") + # pip install pdfkit + # sudo apt-get install wkhtmltopdf + # pdfkit.from_file('/home/lanforge/lanforge-scripts/py-scripts/demo.html', '/home/lanforge/lanforge-scripts/py-scripts/out.pdf') + + print("Test Finished") + + + + + + +if __name__ == '__main__': + run() + +