From f8ca760d90cb2048836a07073eab91e11009c248 Mon Sep 17 00:00:00 2001 From: Nikita Yadav Date: Thu, 11 Feb 2021 12:52:24 +0530 Subject: [PATCH] final DFS script netgear --- py-scripts/lf_dfs_netgear.py | 695 +++++++++++++++++++++++++++++++++++ 1 file changed, 695 insertions(+) create mode 100644 py-scripts/lf_dfs_netgear.py diff --git a/py-scripts/lf_dfs_netgear.py b/py-scripts/lf_dfs_netgear.py new file mode 100644 index 00000000..a13b558b --- /dev/null +++ b/py-scripts/lf_dfs_netgear.py @@ -0,0 +1,695 @@ +""" This script is used for DFS Testing on various radar types and user dependent channels after completion of the test it gives a nice pdf test report under folder html_reports/dfs/ + it uses various input arguments which can be seen by ./x.py --help + eg - sudo python3 dfs_11.py -i 192.168.200.141 -u root -p Netgear@123xzsawq@! -hst localhost -s TestAP22 -pwd [BLANK] -sec open -rad wiphy0 --name WAC505 --fcctypes FCCO --channel 60 + date - 11- feb - 2021 + --Nikita Yadav +""" + +import os +import signal +import paramiko +import time +import threading +import subprocess +from station_layer3 import STATION +import argparse +from threading import Thread +from itertools import islice +import datetime +from datetime import datetime + +import pexpect +import sys +from itertools import islice +from matplotlib import pyplot as plt +import numpy as np +from html_template import * +import logging + + +# import pdfkit + +class DFS_TEST: + def __init__(self, ip, user, pswd, host, ssid, passwd, security, radio): + self.ip = ip + self.user = user + self.pswd = pswd + self.host = host + self.ssid = ssid + self.passwd = passwd + self.security = security + self.radio = radio + + def set_channel_in_ap_at_(self, ip, user, pswd, channel): + self.ip = ip + self.user = user + self.pswd = pswd + channel = str(channel) + ssh = paramiko.SSHClient() # creating shh client object we use this object to connect to router + ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) # automatically adds the missing host key + ssh.connect(ip, port=22, username=user, password=pswd) + command = "conf_set system:wlanSettings:wlanSettingTable:wlan1:channel " + channel + stdin, stdout, stderr = ssh.exec_command(str(command)) + output = stdout.readlines() + # print('\n'.join(output)) + time.sleep(10) + + def monitor_station_(self, host, ssid, passwd, security, radio, channel): + self.host = host + self.ssid = ssid + self.passwd = passwd + self.security = security + self.radio = radio + + obj1 = STATION(lfclient_host=host, lfclient_port=8080, ssid=ssid, paswd=passwd, security=security, radio=radio) + + print(channel) + var = obj1.json_get("/port/1/1/sta0000?fields=channel") + var_1 = (var['interface']['channel']) + # print(var_1) + + if var_1 == channel: + print("channel at" + channel) + return var_1 + else: + print("wait untill channel assigned") + timeout = time.time() + 60 * 15 + + while var_1 != channel: + var = obj1.json_get("/port/1/1/sta0000?fields=channel") + var_1 = var['interface']['channel'] + time.sleep(1) + if time.time() > timeout: + var = obj1.json_get("/port/1/1/sta0000?fields=channel") + var_1 = var['interface']['channel'] + break + + return var_1 + + def create_station_cx(self, host, ssid, passwd, security, radio): + self.host = host + self.ssid = ssid + self.passwd = passwd + self.security = security + self.radio = radio + + cmd = "python3 station_layer3.py -hst localhost -s " + self.ssid + " -pwd " + self.passwd + " -sec " + self.security + " -rad " + self.radio + os.chdir('/home/lanforge/lanforge-scripts/py-scripts') + os.system(cmd) + + def monitor_untill_channel_assigned(self, host, ssid, passwd, security, radio): + self.host = host + self.ssid = ssid + self.passwd = passwd + self.security = security + self.radio = radio + + obj1 = STATION(lfclient_host=host, lfclient_port=8080, ssid=ssid, paswd=passwd, security=security, radio=radio) + + var = obj1.json_get("/port/1/1/sta0000?fields=channel") + var_1 = (var['interface']['channel']) + + timeout = time.time() + 60 * 15 # 15min + while var_1 == "-1": + var = obj1.json_get("/port/1/1/sta0000?fields=channel") + var_1 = var['interface']['channel'] + time.sleep(1) + if time.time() > timeout: + break + + return var_1 + + def generate_radar_at_ch(self, width_, interval_, count_, frequency): + current_time = datetime.now() + print("Current date and time : ") + current_time = current_time.strftime("%b %d %H:%M:%S") + print("time stamp of radar send", current_time) + # print("width", width_) + + os.chdir('/usr/lib64/python2.7/site-packages/') + frequency = str(frequency) + width_ = str(width_) + interval_ = str(interval_) + count_ = str(count_) + command_hackRF = "sudo python lf_hackrf.py --pulse_width " + width_ + " --pulse_interval " + interval_ + " --pulse_count " + count_ + " --sweep_time 1000 --freq " + frequency + command_hackRF = str(command_hackRF) + ch = pexpect.spawn(command_hackRF) + time.sleep(5) + ch.expect('>>>') + ch.sendline('s') + ch.expect('>>>') + ch.sendline('q') + time.sleep(1) + + return current_time + + def client_connect_time(self, host, ssid, passwd, security, radio, channel): + self.host = host + self.ssid = ssid + self.passwd = passwd + self.security = security + self.radio = radio + obj1 = STATION(lfclient_host=host, lfclient_port=8080, ssid=ssid, paswd=passwd, security=security, radio=radio) + var = obj1.json_get("/port/1/1/sta0000?fields=channel") + var_1 = (var['interface']['channel']) + + timeout_ = time.time() + 60 * 2 # 2min + while var_1 != "-1" : + var = obj1.json_get("/port/1/1/sta0000?fields=channel") + var_1 = var['interface']['channel'] + time.sleep(1) + if time.time() > timeout_: + break + if var_1 == "-1" : + current_time = datetime.now() + print("Current date and time : ") + disconnect_time = current_time.strftime("%b %d %H:%M:%S") + print("station disconnection time", disconnect_time) + + timeout = time.time() + 60 * 15 # 15min + while var_1 == "-1": + var = obj1.json_get("/port/1/1/sta0000?fields=channel") + var_1 = var['interface']['channel'] + time.sleep(1) + if time.time() > timeout: + break + current_time1 = datetime.now() + print("Current date and time : ") + connect_time = current_time1.strftime("%b %d %H:%M:%S") + print("station connection time", connect_time) + + """var_2 = obj1.json_get("/cx/L3Teststa0000-0") + # print(type(var)) + var_1 = (var_2['L3Teststa0000-0']['state']) + timeout = time.time() + 60 * 2 # 15min + while var_1 != "Run": + var = obj1.json_get("/cx/L3Teststa0000-0") + # print(type(var)) + var_1 = (var['L3Teststa0000-0']['state']) + time.sleep(1) + if time.time() > timeout: + break + current_time_ = datetime.now() + print("Current date and time : ") + traffic_time1 = current_time_.strftime("%H:%M:%S") + print("time at which traffic started", traffic_time1)""" + s1 = disconnect_time + s2 = connect_time # for example + FMT = '%b %d %H:%M:%S' + c_time = datetime.strptime(s2, FMT) - datetime.strptime(s1, FMT) + print("connection time ", c_time) + # print("detection time for radar", tdelta) + # detection_time_lst.append(tdelta) + + lst = str(c_time).split(":") + seconds = int(lst[0]) * 3600 + int(lst[1]) * 60 + int(lst[2]) + c_time = seconds + print("connection time ", c_time) + else: + disconnect_time = "00:00:00" + print("station disconnection time", disconnect_time) + connect_time = "00:00:00" + print("time at which traffic started", connect_time) + c_time = 0 + print("connection time ", c_time) + + + + + return c_time + + def channel_after_radar(self, host, ssid, passwd, security, radio): + self.host = host + self.ssid = ssid + self.passwd = passwd + self.security = security + self.radio = radio + obj1 = STATION(lfclient_host=host, lfclient_port=8080, ssid=ssid, paswd=passwd, security=security, radio=radio) + var = obj1.json_get("/port/1/1/sta0000?fields=channel") + var_1 = (var['interface']['channel']) + return var_1 + + def monitor_cx(self, host, ssid, passwd, security, radio): + self.host = host + self.ssid = ssid + self.passwd = passwd + self.security = security + self.radio = radio + + obj1 = STATION(lfclient_host=host, lfclient_port=8080, ssid=ssid, paswd=passwd, security=security, radio=radio) + var = obj1.json_get("/cx/L3Teststa0000-0") + # print(type(var)) + var_1 = (var['L3Teststa0000-0']['state']) + timeout = time.time() + 60 * 2 # 15min + while var_1 != "Run": + var = obj1.json_get("/cx/L3Teststa0000-0") + # print(type(var)) + var_1 = (var['L3Teststa0000-0']['state']) + time.sleep(1) + if time.time() > timeout: + break + # print(var_1) + return var_1 + + def monitor_untill_connection_time(self, host, ssid, passwd, security, radio): + self.host = host + self.ssid = ssid + self.passwd = passwd + self.security = security + self.radio = radio + + obj1 = STATION(lfclient_host=host, lfclient_port=8080, ssid=ssid, paswd=passwd, security=security, radio=radio) + var = obj1.json_get("/port/1/1/sta0000?fields=cx%20time%20(us)") + var_1 = (var['interface']['cx time (us)']) + return var_1 + + def check_Radar_time(self, ip, user, pswd): + self.ip = ip + self.user = user + self.pswd = pswd + + ssh = paramiko.SSHClient() # creating shh client object we use this object to connect to router + ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) # automatically adds the missing host key + ssh.connect(ip, port=22, username=user, password=pswd) + stdin, stdout, stderr = ssh.exec_command('cat /tmp/log/messages | grep Radar') + output = stdout.readlines() + # print('\n'.join(output)) + time.sleep(1) + return output + + def monitor_channel_available_time(self, ip, user, pswd): + self.ip = ip + self.user = user + self.pswd = pswd + + ssh = paramiko.SSHClient() # creating shh client object we use this object to connect to router + ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) # automatically adds the missing host key + ssh.connect(ip, port=22, username=user, password=pswd) + stdin, stdout, stderr = ssh.exec_command('iwlist channel ') + output = stdout.readlines() + # print('\n'.join(output)) + time.sleep(1) + return output + +def run(): + parser = argparse.ArgumentParser(description="Netgear AP DFS Test Script") + parser.add_argument('-i', '--ip', type=str, help='AP ip') + parser.add_argument('-u', '--user', type=str, help='credentials login/username') + parser.add_argument('-p', '--pswd', type=str, help='credential password') + parser.add_argument('-hst', '--host', type=str, help='host name') + parser.add_argument('-s', '--ssid', type=str, help='ssid for client') + parser.add_argument('-pwd', '--passwd', type=str, help='password to connect to ssid') + parser.add_argument('-sec', '--security', type=str, help='security') + parser.add_argument('-rad', '--radio', type=str, help='radio at which client will be connected') + parser.add_argument('-n', '--name', type=str, help='Type Name of AP on which test is performed') + parser.add_argument( '--fcctypes', nargs="+", default= ["FCC0", "FCC1", "FCC2", "FCC3", "FCC4", "FCC5", "ETSI1", "ETSI2", "ETSI3", "ETSI4", "ETSI5", "ETSI6"], help='types needed to be tested {FCC0/FCC1/FCC2/FCC3/FCC4/FCC5/ETSI1/ETSI2/ETSI3/ETSI4/ETSI5/ETSI6}') + parser.add_argument('--channel', '--channel', nargs="+", default=[52, 56, 60, 64, 100, 104, 108, 112, 116, 120, 124, 128, 132, 136, 140], help='channel options need to be tested {52, 56, 60, 64, 100, 104, 108, 112, 116, 120, 124 ,128, 132, 136, 140}') + parser.add_argument('--radar', type=str, default=True, help="To check only for radar types") + parser.add_argument('--cx', type=str, default=True, help="Test run and check only till client connection time") + parser.add_argument('--traffic', type=str, default=True, help="Test run till traffic and check the traffic time") + + '''#add(connect_client- true or false) + #add(run-traffic - t/false) + #check_only radar''' + + logging.basicConfig(filename= '/home/lanforge/html-reports/dfs/dfs.log', filemode='w', format='%(asctime)s - %(message)s', datefmt='%d-%b-%y %H:%M:%S') + logging.warning('Test Started') + + test_time = datetime.now() + test_time = test_time.strftime("%b %d %H:%M:%S") + print("Test started at ", test_time ) + cmd = "Test started at " + str(test_time) + logging.warning(str(cmd)) + args = parser.parse_args() + + if (args.name is not None): + AP_name = args.name + + if (args.ssid is not None): + ssid = args.ssid + #time.sleep(1800) + + + dfs = DFS_TEST(args.ip, args.user, args.pswd, args.host, args.ssid, args.passwd, args.security, args.radio) + + # To create a stations + logging.warning("creating station and generating traffic") + dfs.create_station_cx(args.host, args.ssid, args.passwd, args.security, args.radio) + + # "FCC0", "FCC1", "FCC2", "FCC3", "FCC4", "FCC5", "ETSI1", "ETSI2", "ETSI3", "ETSI4", "ETSI5", "ETSI6" + fcc_types = args.fcctypes + result_data = dict.fromkeys(fcc_types) + # set channel + width_ = "" + interval_ = "" + count_ = "" + """types = {"FCC0": {"width_": "1", "interval_": "1428", "count_": "18"}, + "FCC1": {"width_": "1", "interval_": "1163", "count_": "18"}, + "FCC2": {"width_": "2", "interval_": "208", "count_": "28"}, + "FCC3": {"width_": "7", "interval_": "365", "count_": "16"}, + "FCC4": {"width_": "16", "interval_": "271", "count_": "12"}, + "FCC5": {"width_": "70", "interval_": "1975", "count_": "3"}, + "ETSI1": {"width_": "5", "interval_": "342", "count_": "10"}, + "ETSI2": {"width_": "2", "interval_": "1271", "count_": "15"}, + "ETSI3": {"width_": "15", "interval_": "3280", "count_": "25"}, + "ETSI4": {"width_": "24", "interval_": "2477", "count_": "20"}, + "ETSI5": {"width_": "1", "interval_": "356", "count_": "10"}, + "ETSI6": {"width_": "2", "interval_": "1091", "count_": "15"}, }""" + for fcc in fcc_types: + + if fcc == "FCCO": + width = "1" + interval = "1428" + count = "18" + width_ = width + print("width", width_) + interval_ = interval + count_ = count + elif fcc == "FCC1": + width = "1" + interval = "1163" + count = "18" + width_ = width + interval_ = interval + count_ = count + elif fcc == "FCC2": + width = "2" + interval = "208" + count = "28" + width_ = width + interval_ = interval + count_ = count + elif fcc == "FCC3": + width = "7" + interval = "365" + count = "16" + width_ = width + interval_ = interval + count_ = count + elif fcc == "FCC4": + width = "16" + interval = "271" + count = "12" + width_ = width + interval_ = interval + count_ = count + elif fcc == "FCC5": + width = "70" + interval = "1975" + count = "3" + width_ = width + interval_ = interval + count_ = count + elif fcc == "ETSI1": + width = "5" + interval = "342" + count = "10" + width_ = width + interval_ = interval + count_ = count + elif fcc == "ETSI2": + width = "2" + interval = "1271" + count = "15" + width_ = width + interval_ = interval + count_ = count + elif fcc == "ETSI3": + width = "15" + interval = "3280" + count = "25" + width_ = width + interval_ = interval + count_ = count + elif fcc == "ETSI4": + width = "24" + interval = "2477" + count = "20" + width_ = width + interval_ = interval + count_ = count + elif fcc == "ETSI5": + width = "1" + interval = "356" + count = "10" + width_ = width + interval_ = interval + count_ = count + elif fcc == "ETSI6": + width = "2" + interval = "1091" + count = "15" + width_ = width + interval_ = interval + count_ = count + + + switched_ch_lst = [] + detection_time_lst = [] + connection_time_lst = [] + radar_lst = [] + + channel_list = ["52", "56", "60", "64", "100", "104", "108", "112", "116", "120", "124" ,"128", "132", "136", "140"] + channel_values = args.channel + + + #print(channel_values) + + result_data[fcc] = dict.fromkeys(["switched_ch_lst", "detection_time_lst", "radar_lst", "connection_time_lst"]) + + result_data[fcc]["switched_ch_lst"] = [] + result_data[fcc]["detection_time_lst"] = [] + result_data[fcc]["radar_lst"] = [] + result_data[fcc]["connection_time_lst"] = [] + + #print("under", result_data) + + + for channel_s in channel_values: + + print("set channel to ", channel_s) + x = "set channel to " + str(channel_s) + logging.warning(str(x)) + + dfs.set_channel_in_ap_at_(args.ip, args.user, args.pswd, channel_s) + + print("channel set checking....") + logging.warning('channel set checking....') + + monitoring_station = dfs.monitor_station_(args.host, args.ssid, args.passwd, args.security, args.radio, + channel=str(channel_s)) + monitoring_station = str(monitoring_station) + print(("channel set to ", monitoring_station)) + y = "channel set to " + str(monitoring_station) + logging.warning(str(y)) + + channel_s = str(channel_s) + + if monitoring_station == channel_s: + print("station allocated to " + monitoring_station) + z = "station allocated to " + str(monitoring_station) + logging.warning(str(z)) + + print("now generate radar on " + channel_s) + q = "now generate radar on " + str(channel_s) + logging.warning(str(q)) + + frequency = {"52": "5260000", "56": "5280000", "60": "5300000", "64": "5320000", "100": "5500000", + "104": "5520000", "108": "5540000", "112": "5560000", "116": "5580000", "120": "5600000", + "124": "5620000", + "128": "5640000", "132": "5660000", "136": "5680000", "140": "5700000"} + + # now radar generate + generating_radar = dfs.generate_radar_at_ch(width_, interval_, count_, frequency[channel_s]) + + client_connection_time = dfs.client_connect_time(args.host, args.ssid, args.passwd, args.security, + args.radio, channel_s) + print("client connection time ", client_connection_time) + w = "client connection time " + str(client_connection_time) + logging.warning(str(w)) + connection_time_lst.append(client_connection_time) + + time.sleep(2) + logging.warning('checking AP logs') + ap_logs = dfs.check_Radar_time(args.ip, args.user, args.pswd) + + # print(type(ap_logs)) + N = 2 + res = list(islice(reversed(ap_logs), 0, N)) + res.reverse() + if (str(res).__contains__("Radar detected on channel " + channel_s)): + data = "YES" + radar_lst.append(data) + print("yes") + logging.warning("YES") + if data == "YES": + + print("checking channel assigned...") + + + detecting_radar_channel = dfs.channel_after_radar(args.host, args.ssid, args.passwd, + args.security, + args.radio) + # print("after radar channel is at ", detecting_radar_channel) + + if detecting_radar_channel == "-1" or detecting_radar_channel == "AUTO" or detecting_radar_channel == channel_s: + print("TEST Fail") + logging.warning("Test FAIL") + print("client is at AUTO Channel") + logging.warning("client is at AUTO Channel") + pass_fail = "FAIL" + ch_af_radar = "AUTO" + print("after radar channel is at ", ch_af_radar) + x = "after radar channel is at " + str(ch_af_radar) + logging.warning(str(x)) + switched_ch_lst.append(ch_af_radar) + + print(pass_fail) + continue + else: + print("Test Pass") + logging.warning("TEST PASS ") + pass_fail = "PASS" + + ch_af_radar = detecting_radar_channel + print("after radar channel is at ", ch_af_radar) + Y = "after radar channel is at " + str(ch_af_radar) + logging.warning(str(Y)) + switched_ch_lst.append(ch_af_radar) + + logging.warning("Checking AP logs") + ap_logs_check = dfs.check_Radar_time(args.ip, args.user, args.pswd) + + #print(type(ap_logs_check)) + N = 2 + res = list(islice(reversed(ap_logs_check), 0, N)) + res.reverse() + for i in res: + if (str(res).__contains__("Radar detected on channel " + channel_s)): + print("YES") + x = res.index(i) + print("raadar time at index", x) + r = "raadar time at index" + str(x) + logging.warning(str(r)) + r_time = " " + for i in res[x][4:19]: + r_time = r_time + i + r_time = r_time.strip() + print("radar detected time", r_time) + r_ = "radar detected time" + str(r_time) + logging.warning(str(r_)) + + # calculation + s1 = generating_radar + s2 = r_time # for example + FMT = '%b %d %H:%M:%S' + tdelta = datetime.strptime(s2, FMT) - datetime.strptime(s1, FMT) + # print("detection time for radar", tdelta) + # detection_time_lst.append(tdelta) + + lst = str(tdelta).split(":") + seconds = int(lst[0]) * 3600 + int(lst[1]) * 60 + int(lst[2]) + d_time = seconds + print("detection time ", d_time) + detection_time_lst.append(d_time) + t = "detection time " + str(d_time) + logging.warning(str(t)) + + else: + data = "NO" + pass_fail = "FAIL" + radar_lst.append(data) + d_time = "0" + print("detection time ", d_time) + x = "detection time " + str(d_time) + logging.warning(str(x)) + detection_time_lst.append(0) + switched_ch_lst.append(" - ") + print("no") + # radar detected(yes/no) + else: + #monitoring_station = str(monitoring_station) + radar_lst.append("NO") + switched_ch_lst.append("X") + detection_time_lst.append(0) + connection_time_lst.append(0) + #print(("channel set to ", monitoring_station)) + + + print("switched channel list ", switched_ch_lst) + s = "switched channel list " + str(switched_ch_lst) + logging.warning(str(x)) + print("radar detection list", radar_lst) + r = "radar detection list" + str(radar_lst) + logging.warning(str(r)) + print("detection time list", detection_time_lst) + d = "detection time list" + str(detection_time_lst) + logging.warning(str(d)) + print("connection time list", connection_time_lst) + c = "connection time list" + str(connection_time_lst) + logging.warning(str(c)) + + result_data[fcc]["switched_ch_lst"] = switched_ch_lst + result_data[fcc]["radar_lst"] = radar_lst + result_data[fcc]["detection_time_lst"] = detection_time_lst + result_data[fcc]["connection_time_lst"] = connection_time_lst + print("result ", result_data) + res = "result " + str(result_data) + logging.warning(str(res)) + #time.sleep(1800) + + + print("final result", result_data) + res = "final result " + str(result_data) + logging.warning(str(res)) + print("Test Finished") + test_end = datetime.now() + test_end = test_end.strftime("%b %d %H:%M:%S") + print("Test ended at ", test_end) + + s1 = test_time + s2 = test_end # for example + FMT = '%b %d %H:%M:%S' + test_duration = datetime.strptime(s2, FMT) - datetime.strptime(s1, FMT) + print("total test duration ", test_duration) + + # # Try changing values for the same + date = str(datetime.now()).split(",")[0].replace(" ", "-").split(".")[0] + test_setup_info = { + "AP Name": args.name, + "SSID": args.ssid, + "Number of Stations": "1", + "Test Duration": test_duration + } + + input_setup_info = { + "IP" : args.ip, + "user" : args.user, + "Radartypes" : args.fcctypes, + "Channel" : args.channel, + "Contact" : "support@candelatech.com" + } + + generate_report(result_data, + date, + test_setup_info, + input_setup_info, + test_channel = channel_values, + + graph_path="/home/lanforge/html-reports/dfs") + + + graph_path = "/home/lanforge/html-reports/dfs" + + + + + + + + + +if __name__ == '__main__': + run() + +