diff --git a/py-scripts/lf_dfs_netgear.py b/py-scripts/lf_dfs_netgear.py deleted file mode 100644 index a13b558b..00000000 --- a/py-scripts/lf_dfs_netgear.py +++ /dev/null @@ -1,695 +0,0 @@ -""" This script is used for DFS Testing on various radar types and user dependent channels after completion of the test it gives a nice pdf test report under folder html_reports/dfs/ - it uses various input arguments which can be seen by ./x.py --help - eg - sudo python3 dfs_11.py -i 192.168.200.141 -u root -p Netgear@123xzsawq@! -hst localhost -s TestAP22 -pwd [BLANK] -sec open -rad wiphy0 --name WAC505 --fcctypes FCCO --channel 60 - date - 11- feb - 2021 - --Nikita Yadav -""" - -import os -import signal -import paramiko -import time -import threading -import subprocess -from station_layer3 import STATION -import argparse -from threading import Thread -from itertools import islice -import datetime -from datetime import datetime - -import pexpect -import sys -from itertools import islice -from matplotlib import pyplot as plt -import numpy as np -from html_template import * -import logging - - -# import pdfkit - -class DFS_TEST: - def __init__(self, ip, user, pswd, host, ssid, passwd, security, radio): - self.ip = ip - self.user = user - self.pswd = pswd - self.host = host - self.ssid = ssid - self.passwd = passwd - self.security = security - self.radio = radio - - def set_channel_in_ap_at_(self, ip, user, pswd, channel): - self.ip = ip - self.user = user - self.pswd = pswd - channel = str(channel) - ssh = paramiko.SSHClient() # creating shh client object we use this object to connect to router - ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) # automatically adds the missing host key - ssh.connect(ip, port=22, username=user, password=pswd) - command = "conf_set system:wlanSettings:wlanSettingTable:wlan1:channel " + channel - stdin, stdout, stderr = ssh.exec_command(str(command)) - output = stdout.readlines() - # print('\n'.join(output)) - time.sleep(10) - - def monitor_station_(self, host, ssid, passwd, security, radio, channel): - self.host = host - self.ssid = ssid - self.passwd = passwd - self.security = security - self.radio = radio - - obj1 = STATION(lfclient_host=host, lfclient_port=8080, ssid=ssid, paswd=passwd, security=security, radio=radio) - - print(channel) - var = obj1.json_get("/port/1/1/sta0000?fields=channel") - var_1 = (var['interface']['channel']) - # print(var_1) - - if var_1 == channel: - print("channel at" + channel) - return var_1 - else: - print("wait untill channel assigned") - timeout = time.time() + 60 * 15 - - while var_1 != channel: - var = obj1.json_get("/port/1/1/sta0000?fields=channel") - var_1 = var['interface']['channel'] - time.sleep(1) - if time.time() > timeout: - var = obj1.json_get("/port/1/1/sta0000?fields=channel") - var_1 = var['interface']['channel'] - break - - return var_1 - - def create_station_cx(self, host, ssid, passwd, security, radio): - self.host = host - self.ssid = ssid - self.passwd = passwd - self.security = security - self.radio = radio - - cmd = "python3 station_layer3.py -hst localhost -s " + self.ssid + " -pwd " + self.passwd + " -sec " + self.security + " -rad " + self.radio - os.chdir('/home/lanforge/lanforge-scripts/py-scripts') - os.system(cmd) - - def monitor_untill_channel_assigned(self, host, ssid, passwd, security, radio): - self.host = host - self.ssid = ssid - self.passwd = passwd - self.security = security - self.radio = radio - - obj1 = STATION(lfclient_host=host, lfclient_port=8080, ssid=ssid, paswd=passwd, security=security, radio=radio) - - var = obj1.json_get("/port/1/1/sta0000?fields=channel") - var_1 = (var['interface']['channel']) - - timeout = time.time() + 60 * 15 # 15min - while var_1 == "-1": - var = obj1.json_get("/port/1/1/sta0000?fields=channel") - var_1 = var['interface']['channel'] - time.sleep(1) - if time.time() > timeout: - break - - return var_1 - - def generate_radar_at_ch(self, width_, interval_, count_, frequency): - current_time = datetime.now() - print("Current date and time : ") - current_time = current_time.strftime("%b %d %H:%M:%S") - print("time stamp of radar send", current_time) - # print("width", width_) - - os.chdir('/usr/lib64/python2.7/site-packages/') - frequency = str(frequency) - width_ = str(width_) - interval_ = str(interval_) - count_ = str(count_) - command_hackRF = "sudo python lf_hackrf.py --pulse_width " + width_ + " --pulse_interval " + interval_ + " --pulse_count " + count_ + " --sweep_time 1000 --freq " + frequency - command_hackRF = str(command_hackRF) - ch = pexpect.spawn(command_hackRF) - time.sleep(5) - ch.expect('>>>') - ch.sendline('s') - ch.expect('>>>') - ch.sendline('q') - time.sleep(1) - - return current_time - - def client_connect_time(self, host, ssid, passwd, security, radio, channel): - self.host = host - self.ssid = ssid - self.passwd = passwd - self.security = security - self.radio = radio - obj1 = STATION(lfclient_host=host, lfclient_port=8080, ssid=ssid, paswd=passwd, security=security, radio=radio) - var = obj1.json_get("/port/1/1/sta0000?fields=channel") - var_1 = (var['interface']['channel']) - - timeout_ = time.time() + 60 * 2 # 2min - while var_1 != "-1" : - var = obj1.json_get("/port/1/1/sta0000?fields=channel") - var_1 = var['interface']['channel'] - time.sleep(1) - if time.time() > timeout_: - break - if var_1 == "-1" : - current_time = datetime.now() - print("Current date and time : ") - disconnect_time = current_time.strftime("%b %d %H:%M:%S") - print("station disconnection time", disconnect_time) - - timeout = time.time() + 60 * 15 # 15min - while var_1 == "-1": - var = obj1.json_get("/port/1/1/sta0000?fields=channel") - var_1 = var['interface']['channel'] - time.sleep(1) - if time.time() > timeout: - break - current_time1 = datetime.now() - print("Current date and time : ") - connect_time = current_time1.strftime("%b %d %H:%M:%S") - print("station connection time", connect_time) - - """var_2 = obj1.json_get("/cx/L3Teststa0000-0") - # print(type(var)) - var_1 = (var_2['L3Teststa0000-0']['state']) - timeout = time.time() + 60 * 2 # 15min - while var_1 != "Run": - var = obj1.json_get("/cx/L3Teststa0000-0") - # print(type(var)) - var_1 = (var['L3Teststa0000-0']['state']) - time.sleep(1) - if time.time() > timeout: - break - current_time_ = datetime.now() - print("Current date and time : ") - traffic_time1 = current_time_.strftime("%H:%M:%S") - print("time at which traffic started", traffic_time1)""" - s1 = disconnect_time - s2 = connect_time # for example - FMT = '%b %d %H:%M:%S' - c_time = datetime.strptime(s2, FMT) - datetime.strptime(s1, FMT) - print("connection time ", c_time) - # print("detection time for radar", tdelta) - # detection_time_lst.append(tdelta) - - lst = str(c_time).split(":") - seconds = int(lst[0]) * 3600 + int(lst[1]) * 60 + int(lst[2]) - c_time = seconds - print("connection time ", c_time) - else: - disconnect_time = "00:00:00" - print("station disconnection time", disconnect_time) - connect_time = "00:00:00" - print("time at which traffic started", connect_time) - c_time = 0 - print("connection time ", c_time) - - - - - return c_time - - def channel_after_radar(self, host, ssid, passwd, security, radio): - self.host = host - self.ssid = ssid - self.passwd = passwd - self.security = security - self.radio = radio - obj1 = STATION(lfclient_host=host, lfclient_port=8080, ssid=ssid, paswd=passwd, security=security, radio=radio) - var = obj1.json_get("/port/1/1/sta0000?fields=channel") - var_1 = (var['interface']['channel']) - return var_1 - - def monitor_cx(self, host, ssid, passwd, security, radio): - self.host = host - self.ssid = ssid - self.passwd = passwd - self.security = security - self.radio = radio - - obj1 = STATION(lfclient_host=host, lfclient_port=8080, ssid=ssid, paswd=passwd, security=security, radio=radio) - var = obj1.json_get("/cx/L3Teststa0000-0") - # print(type(var)) - var_1 = (var['L3Teststa0000-0']['state']) - timeout = time.time() + 60 * 2 # 15min - while var_1 != "Run": - var = obj1.json_get("/cx/L3Teststa0000-0") - # print(type(var)) - var_1 = (var['L3Teststa0000-0']['state']) - time.sleep(1) - if time.time() > timeout: - break - # print(var_1) - return var_1 - - def monitor_untill_connection_time(self, host, ssid, passwd, security, radio): - self.host = host - self.ssid = ssid - self.passwd = passwd - self.security = security - self.radio = radio - - obj1 = STATION(lfclient_host=host, lfclient_port=8080, ssid=ssid, paswd=passwd, security=security, radio=radio) - var = obj1.json_get("/port/1/1/sta0000?fields=cx%20time%20(us)") - var_1 = (var['interface']['cx time (us)']) - return var_1 - - def check_Radar_time(self, ip, user, pswd): - self.ip = ip - self.user = user - self.pswd = pswd - - ssh = paramiko.SSHClient() # creating shh client object we use this object to connect to router - ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) # automatically adds the missing host key - ssh.connect(ip, port=22, username=user, password=pswd) - stdin, stdout, stderr = ssh.exec_command('cat /tmp/log/messages | grep Radar') - output = stdout.readlines() - # print('\n'.join(output)) - time.sleep(1) - return output - - def monitor_channel_available_time(self, ip, user, pswd): - self.ip = ip - self.user = user - self.pswd = pswd - - ssh = paramiko.SSHClient() # creating shh client object we use this object to connect to router - ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) # automatically adds the missing host key - ssh.connect(ip, port=22, username=user, password=pswd) - stdin, stdout, stderr = ssh.exec_command('iwlist channel ') - output = stdout.readlines() - # print('\n'.join(output)) - time.sleep(1) - return output - -def run(): - parser = argparse.ArgumentParser(description="Netgear AP DFS Test Script") - parser.add_argument('-i', '--ip', type=str, help='AP ip') - parser.add_argument('-u', '--user', type=str, help='credentials login/username') - parser.add_argument('-p', '--pswd', type=str, help='credential password') - parser.add_argument('-hst', '--host', type=str, help='host name') - parser.add_argument('-s', '--ssid', type=str, help='ssid for client') - parser.add_argument('-pwd', '--passwd', type=str, help='password to connect to ssid') - parser.add_argument('-sec', '--security', type=str, help='security') - parser.add_argument('-rad', '--radio', type=str, help='radio at which client will be connected') - parser.add_argument('-n', '--name', type=str, help='Type Name of AP on which test is performed') - parser.add_argument( '--fcctypes', nargs="+", default= ["FCC0", "FCC1", "FCC2", "FCC3", "FCC4", "FCC5", "ETSI1", "ETSI2", "ETSI3", "ETSI4", "ETSI5", "ETSI6"], help='types needed to be tested {FCC0/FCC1/FCC2/FCC3/FCC4/FCC5/ETSI1/ETSI2/ETSI3/ETSI4/ETSI5/ETSI6}') - parser.add_argument('--channel', '--channel', nargs="+", default=[52, 56, 60, 64, 100, 104, 108, 112, 116, 120, 124, 128, 132, 136, 140], help='channel options need to be tested {52, 56, 60, 64, 100, 104, 108, 112, 116, 120, 124 ,128, 132, 136, 140}') - parser.add_argument('--radar', type=str, default=True, help="To check only for radar types") - parser.add_argument('--cx', type=str, default=True, help="Test run and check only till client connection time") - parser.add_argument('--traffic', type=str, default=True, help="Test run till traffic and check the traffic time") - - '''#add(connect_client- true or false) - #add(run-traffic - t/false) - #check_only radar''' - - logging.basicConfig(filename= '/home/lanforge/html-reports/dfs/dfs.log', filemode='w', format='%(asctime)s - %(message)s', datefmt='%d-%b-%y %H:%M:%S') - logging.warning('Test Started') - - test_time = datetime.now() - test_time = test_time.strftime("%b %d %H:%M:%S") - print("Test started at ", test_time ) - cmd = "Test started at " + str(test_time) - logging.warning(str(cmd)) - args = parser.parse_args() - - if (args.name is not None): - AP_name = args.name - - if (args.ssid is not None): - ssid = args.ssid - #time.sleep(1800) - - - dfs = DFS_TEST(args.ip, args.user, args.pswd, args.host, args.ssid, args.passwd, args.security, args.radio) - - # To create a stations - logging.warning("creating station and generating traffic") - dfs.create_station_cx(args.host, args.ssid, args.passwd, args.security, args.radio) - - # "FCC0", "FCC1", "FCC2", "FCC3", "FCC4", "FCC5", "ETSI1", "ETSI2", "ETSI3", "ETSI4", "ETSI5", "ETSI6" - fcc_types = args.fcctypes - result_data = dict.fromkeys(fcc_types) - # set channel - width_ = "" - interval_ = "" - count_ = "" - """types = {"FCC0": {"width_": "1", "interval_": "1428", "count_": "18"}, - "FCC1": {"width_": "1", "interval_": "1163", "count_": "18"}, - "FCC2": {"width_": "2", "interval_": "208", "count_": "28"}, - "FCC3": {"width_": "7", "interval_": "365", "count_": "16"}, - "FCC4": {"width_": "16", "interval_": "271", "count_": "12"}, - "FCC5": {"width_": "70", "interval_": "1975", "count_": "3"}, - "ETSI1": {"width_": "5", "interval_": "342", "count_": "10"}, - "ETSI2": {"width_": "2", "interval_": "1271", "count_": "15"}, - "ETSI3": {"width_": "15", "interval_": "3280", "count_": "25"}, - "ETSI4": {"width_": "24", "interval_": "2477", "count_": "20"}, - "ETSI5": {"width_": "1", "interval_": "356", "count_": "10"}, - "ETSI6": {"width_": "2", "interval_": "1091", "count_": "15"}, }""" - for fcc in fcc_types: - - if fcc == "FCCO": - width = "1" - interval = "1428" - count = "18" - width_ = width - print("width", width_) - interval_ = interval - count_ = count - elif fcc == "FCC1": - width = "1" - interval = "1163" - count = "18" - width_ = width - interval_ = interval - count_ = count - elif fcc == "FCC2": - width = "2" - interval = "208" - count = "28" - width_ = width - interval_ = interval - count_ = count - elif fcc == "FCC3": - width = "7" - interval = "365" - count = "16" - width_ = width - interval_ = interval - count_ = count - elif fcc == "FCC4": - width = "16" - interval = "271" - count = "12" - width_ = width - interval_ = interval - count_ = count - elif fcc == "FCC5": - width = "70" - interval = "1975" - count = "3" - width_ = width - interval_ = interval - count_ = count - elif fcc == "ETSI1": - width = "5" - interval = "342" - count = "10" - width_ = width - interval_ = interval - count_ = count - elif fcc == "ETSI2": - width = "2" - interval = "1271" - count = "15" - width_ = width - interval_ = interval - count_ = count - elif fcc == "ETSI3": - width = "15" - interval = "3280" - count = "25" - width_ = width - interval_ = interval - count_ = count - elif fcc == "ETSI4": - width = "24" - interval = "2477" - count = "20" - width_ = width - interval_ = interval - count_ = count - elif fcc == "ETSI5": - width = "1" - interval = "356" - count = "10" - width_ = width - interval_ = interval - count_ = count - elif fcc == "ETSI6": - width = "2" - interval = "1091" - count = "15" - width_ = width - interval_ = interval - count_ = count - - - switched_ch_lst = [] - detection_time_lst = [] - connection_time_lst = [] - radar_lst = [] - - channel_list = ["52", "56", "60", "64", "100", "104", "108", "112", "116", "120", "124" ,"128", "132", "136", "140"] - channel_values = args.channel - - - #print(channel_values) - - result_data[fcc] = dict.fromkeys(["switched_ch_lst", "detection_time_lst", "radar_lst", "connection_time_lst"]) - - result_data[fcc]["switched_ch_lst"] = [] - result_data[fcc]["detection_time_lst"] = [] - result_data[fcc]["radar_lst"] = [] - result_data[fcc]["connection_time_lst"] = [] - - #print("under", result_data) - - - for channel_s in channel_values: - - print("set channel to ", channel_s) - x = "set channel to " + str(channel_s) - logging.warning(str(x)) - - dfs.set_channel_in_ap_at_(args.ip, args.user, args.pswd, channel_s) - - print("channel set checking....") - logging.warning('channel set checking....') - - monitoring_station = dfs.monitor_station_(args.host, args.ssid, args.passwd, args.security, args.radio, - channel=str(channel_s)) - monitoring_station = str(monitoring_station) - print(("channel set to ", monitoring_station)) - y = "channel set to " + str(monitoring_station) - logging.warning(str(y)) - - channel_s = str(channel_s) - - if monitoring_station == channel_s: - print("station allocated to " + monitoring_station) - z = "station allocated to " + str(monitoring_station) - logging.warning(str(z)) - - print("now generate radar on " + channel_s) - q = "now generate radar on " + str(channel_s) - logging.warning(str(q)) - - frequency = {"52": "5260000", "56": "5280000", "60": "5300000", "64": "5320000", "100": "5500000", - "104": "5520000", "108": "5540000", "112": "5560000", "116": "5580000", "120": "5600000", - "124": "5620000", - "128": "5640000", "132": "5660000", "136": "5680000", "140": "5700000"} - - # now radar generate - generating_radar = dfs.generate_radar_at_ch(width_, interval_, count_, frequency[channel_s]) - - client_connection_time = dfs.client_connect_time(args.host, args.ssid, args.passwd, args.security, - args.radio, channel_s) - print("client connection time ", client_connection_time) - w = "client connection time " + str(client_connection_time) - logging.warning(str(w)) - connection_time_lst.append(client_connection_time) - - time.sleep(2) - logging.warning('checking AP logs') - ap_logs = dfs.check_Radar_time(args.ip, args.user, args.pswd) - - # print(type(ap_logs)) - N = 2 - res = list(islice(reversed(ap_logs), 0, N)) - res.reverse() - if (str(res).__contains__("Radar detected on channel " + channel_s)): - data = "YES" - radar_lst.append(data) - print("yes") - logging.warning("YES") - if data == "YES": - - print("checking channel assigned...") - - - detecting_radar_channel = dfs.channel_after_radar(args.host, args.ssid, args.passwd, - args.security, - args.radio) - # print("after radar channel is at ", detecting_radar_channel) - - if detecting_radar_channel == "-1" or detecting_radar_channel == "AUTO" or detecting_radar_channel == channel_s: - print("TEST Fail") - logging.warning("Test FAIL") - print("client is at AUTO Channel") - logging.warning("client is at AUTO Channel") - pass_fail = "FAIL" - ch_af_radar = "AUTO" - print("after radar channel is at ", ch_af_radar) - x = "after radar channel is at " + str(ch_af_radar) - logging.warning(str(x)) - switched_ch_lst.append(ch_af_radar) - - print(pass_fail) - continue - else: - print("Test Pass") - logging.warning("TEST PASS ") - pass_fail = "PASS" - - ch_af_radar = detecting_radar_channel - print("after radar channel is at ", ch_af_radar) - Y = "after radar channel is at " + str(ch_af_radar) - logging.warning(str(Y)) - switched_ch_lst.append(ch_af_radar) - - logging.warning("Checking AP logs") - ap_logs_check = dfs.check_Radar_time(args.ip, args.user, args.pswd) - - #print(type(ap_logs_check)) - N = 2 - res = list(islice(reversed(ap_logs_check), 0, N)) - res.reverse() - for i in res: - if (str(res).__contains__("Radar detected on channel " + channel_s)): - print("YES") - x = res.index(i) - print("raadar time at index", x) - r = "raadar time at index" + str(x) - logging.warning(str(r)) - r_time = " " - for i in res[x][4:19]: - r_time = r_time + i - r_time = r_time.strip() - print("radar detected time", r_time) - r_ = "radar detected time" + str(r_time) - logging.warning(str(r_)) - - # calculation - s1 = generating_radar - s2 = r_time # for example - FMT = '%b %d %H:%M:%S' - tdelta = datetime.strptime(s2, FMT) - datetime.strptime(s1, FMT) - # print("detection time for radar", tdelta) - # detection_time_lst.append(tdelta) - - lst = str(tdelta).split(":") - seconds = int(lst[0]) * 3600 + int(lst[1]) * 60 + int(lst[2]) - d_time = seconds - print("detection time ", d_time) - detection_time_lst.append(d_time) - t = "detection time " + str(d_time) - logging.warning(str(t)) - - else: - data = "NO" - pass_fail = "FAIL" - radar_lst.append(data) - d_time = "0" - print("detection time ", d_time) - x = "detection time " + str(d_time) - logging.warning(str(x)) - detection_time_lst.append(0) - switched_ch_lst.append(" - ") - print("no") - # radar detected(yes/no) - else: - #monitoring_station = str(monitoring_station) - radar_lst.append("NO") - switched_ch_lst.append("X") - detection_time_lst.append(0) - connection_time_lst.append(0) - #print(("channel set to ", monitoring_station)) - - - print("switched channel list ", switched_ch_lst) - s = "switched channel list " + str(switched_ch_lst) - logging.warning(str(x)) - print("radar detection list", radar_lst) - r = "radar detection list" + str(radar_lst) - logging.warning(str(r)) - print("detection time list", detection_time_lst) - d = "detection time list" + str(detection_time_lst) - logging.warning(str(d)) - print("connection time list", connection_time_lst) - c = "connection time list" + str(connection_time_lst) - logging.warning(str(c)) - - result_data[fcc]["switched_ch_lst"] = switched_ch_lst - result_data[fcc]["radar_lst"] = radar_lst - result_data[fcc]["detection_time_lst"] = detection_time_lst - result_data[fcc]["connection_time_lst"] = connection_time_lst - print("result ", result_data) - res = "result " + str(result_data) - logging.warning(str(res)) - #time.sleep(1800) - - - print("final result", result_data) - res = "final result " + str(result_data) - logging.warning(str(res)) - print("Test Finished") - test_end = datetime.now() - test_end = test_end.strftime("%b %d %H:%M:%S") - print("Test ended at ", test_end) - - s1 = test_time - s2 = test_end # for example - FMT = '%b %d %H:%M:%S' - test_duration = datetime.strptime(s2, FMT) - datetime.strptime(s1, FMT) - print("total test duration ", test_duration) - - # # Try changing values for the same - date = str(datetime.now()).split(",")[0].replace(" ", "-").split(".")[0] - test_setup_info = { - "AP Name": args.name, - "SSID": args.ssid, - "Number of Stations": "1", - "Test Duration": test_duration - } - - input_setup_info = { - "IP" : args.ip, - "user" : args.user, - "Radartypes" : args.fcctypes, - "Channel" : args.channel, - "Contact" : "support@candelatech.com" - } - - generate_report(result_data, - date, - test_setup_info, - input_setup_info, - test_channel = channel_values, - - graph_path="/home/lanforge/html-reports/dfs") - - - graph_path = "/home/lanforge/html-reports/dfs" - - - - - - - - - -if __name__ == '__main__': - run() - -