#!/usr/bin/env python3 import os import sys import argparse import time import logging import datetime import importlib # from itertools import combinations # to generate pair combinations for attenuators logger = logging.getLogger(__name__) if sys.version_info[0] != 3: logger.critical("This script requires Python 3") exit(1) sys.path.append(os.path.join(os.path.abspath(__file__ + "../../../"))) realm = importlib.import_module("py-json.realm") LFUtils = importlib.import_module("py-json.LANforge.LFUtils") sta_connect = importlib.import_module("py-scripts.sta_connect2") Realm = realm.Realm LFReport = importlib.import_module("py-scripts.lf_report") lf_report = LFReport.lf_report LFGraph = importlib.import_module("py-scripts.lf_graph") lf_bar_graph_horizontal = LFGraph.lf_bar_graph_horizontal lf_logger_config = importlib.import_module("py-scripts.lf_logger_config") class Roam(Realm): def __init__(self, lanforge_ip='localhost', port=8080, sniff_radio='1.1.wiphy0', station_radio='1.1.wiphy0', band='5G', # ap1_bssid=None, # ap2_bssid=None, # attenuator1=None, # attenuator2=None, attenuators=[], step=100, max_attenuation=950, upstream='1.1.eth1', ssid=None, security=None, password=None, num_sta=None, station_flag=None, option=None, eap_method='TLS', eap_identity='testuser', eap_password='testpasswd', pairwise_cipher='NA', groupwise_cipher='NA', private_key='/home/lanforge/client.p12', pk_passwd='lanforge', ca_cert='/home/lanforge/ca.pem', identity=None, ttls_pass=None, sta_type=None, iteration_based=True, duration=None, wait_time=30, sniff_duration=300, channel='AUTO', frequency=-1, iterations=None, softroam=False, ieee80211w=1, real_devices=True ): super().__init__(lanforge_ip, port) self.lanforge_ip = lanforge_ip self.port = port self.upstream = upstream # self.ap1_bssid = ap1_bssid # self.ap2_bssid = ap2_bssid # self.attenuator1 = attenuator1 # self.attenuator2 = attenuator2 self.attenuators = attenuators self.step = step self.max_attenuation = max_attenuation self.ssid = ssid self.security = security self.password = password self.num_sta = num_sta self.station_flag = station_flag self.option = option self.identity = identity self.ttls_pass = ttls_pass self.sta_type = sta_type self.iteration_based = iteration_based self.duration = duration self.wait_time = wait_time self.channel = channel self.frequency = frequency self.iterations = iterations self.soft_roam = softroam self.real_devices = real_devices self.sniff_radio = sniff_radio self.sniff_duration = sniff_duration self.station_radio = station_radio self.band = band self.ca_cert = ca_cert self.private_key = private_key self.pk_passwd = pk_passwd self.eap_method = eap_method self.eap_identity = eap_identity self.eap_password = eap_password self.ieee80211w = False self.pairwise_cipher = pairwise_cipher self.groupwise_cipher = groupwise_cipher # reporting variable self.roam_data = {} self.bssid_based_totals = {} if self.soft_roam: if len(self.attenuators) == 1: logging.error( 'Cannot perform roaming with only one attenuator. Please provide atleast two attenuators.') exit(1) # self.attenuator_combinations = list(combinations(self.attenuators, 2)) # generating 2 pair combinations # for the given attenuators self.attenuator_combinations = [] attenuators = self.attenuators + [self.attenuators[0]] for atten_index in range(len(attenuators) - 1): self.attenuator_combinations.append( (attenuators[atten_index], attenuators[atten_index + 1])) logging.info('Test will be performed on the APs with the following attenuator combinations {}'.format( self.attenuator_combinations)) all_attenuators = self.atten_list() if all_attenuators is None or all_attenuators == []: logging.error('There are no attenuators in the given LANforge {}. Exiting the test.'.format( self.lanforge_ip)) exit(1) else: for atten_serial in all_attenuators: atten_serial_name, atten_values = list(atten_serial.keys())[ 0], list(atten_serial.values())[0] if atten_serial_name not in self.attenuators: if atten_values['state'] != 'Phantom': logging.info('Attenuator {} is not in the test attenuators list. Setting the attenuation ' 'value to max.'.format( atten_serial_name)) self.set_atten(atten_serial_name, self.max_attenuation) self.attenuator_increments = list( range(0, self.max_attenuation + 1, self.step)) if self.max_attenuation not in self.attenuator_increments: self.attenuator_increments.append(self.max_attenuation) self.attenuator_decrements = list( range(self.max_attenuation, -1, -self.step)) if 0 not in self.attenuator_decrements: self.attenuator_decrements.append(0) # self.sniff_radio_resource, self.sniff_radio_shelf, self.sniff_radio_port, _ = self.name_to_eid( # self.sniff_radio) # # self.monitor = self.new_wifi_monitor_profile( # resource_=self.sniff_radio_resource, up_=False) # self.create_monitor() self.staConnect = sta_connect.StaConnect2(host=self.lanforge_ip, port=self.port, outfile="sta_connect2.csv") self.cx_profile = self.new_l3_cx_profile() self.cx_profile.host = self.lanforge_ip self.cx_profile.port = self.port self.cx_profile.name_prefix = 'ROAM-' self.cx_profile.side_a_min_bps = '1000000' self.cx_profile.side_a_max_bps = '1000000' self.cx_profile.side_b_min_bps = '1000000' self.cx_profile.side_b_max_bps = '1000000' def create_cx(self): self.cx_profile.create(endp_type='lf_udp', side_a=self.station_list, side_b=self.upstream) def start_cx(self): self.cx_profile.start_cx() def stop_cx(self): for cx_name in self.cx_profile.created_cx.keys(): logging.info(cx_name) self.stop_cx(cx_name) def set_attenuators(self, atten1, atten2): logging.info('Setting attenuation to {} for attenuator {}'.format( 0, atten1)) self.set_atten(atten1, 0) logging.info( 'Setting active attenuator as {}'.format(atten1)) self.active_attenuator = atten1 logging.info( 'Setting passive attenuator as {}'.format(atten2)) self.passive_attenuator = atten2 logging.info('Setting attenuation to {} for attenuator {}'.format( self.max_attenuation, atten2)) self.set_atten(atten2, self.max_attenuation) for atten in self.attenuators: if atten not in [atten1, atten2]: logging.info( 'Setting unused attenuator {} value to maximum attenuation.'.format(atten)) self.set_atten(atten, self.max_attenuation) def get_port_data(self, station, field): shelf, resource, port = station.split('.') data = self.json_get( '/port/{}/{}/{}?fields={}'.format(shelf, resource, port, field)) if data is not None and 'interface' in data.keys() and data['interface'] is not None: return data['interface'][field] else: logging.warning( 'Station {} not found. Removing it from test.'.format(station)) return None def cleanup(self): self.monitor.cleanup(desired_ports=['sniffer0']) def create_monitor(self): self.cleanup() self.monitor.create(resource_=self.sniff_radio_resource, radio_=self.sniff_radio_port, channel=self.channel, frequency=self.frequency, name_='sniffer0') def start_sniff(self, capname='roam_test.pcap'): self.monitor.admin_up() self.monitor.start_sniff( capname=capname, duration_sec=self.sniff_duration) def stop_sniff(self): self.monitor.admin_down() def get_bssids(self): bssids = [] removable_stations = [] for station in self.station_list: bssid = self.get_port_data(station, 'ap') if bssid is not None: bssids.append(bssid) else: removable_stations.append(station) for station in removable_stations: self.station_list.remove(station) return bssids # get existing stations list def get_station_list(self): sta = self.staConnect.station_list() if sta == "no response": return "no response" sta_list = [] for i in sta: for j in i: sta_list.append(j) return sta_list def create_clients(self, start_id=0, sta_prefix='sta'): station_profile = self.new_station_profile() if self.station_flag is not None: _flags = self.station_flag.split(',') for flags in _flags: logger.info(f"Selected Flags: '{flags}'") station_profile.set_command_flag("add_sta", flags, 1) radio = self.station_radio sta_list = self.get_station_list() # print("Available list of stations on lanforge-GUI :", sta_list) logging.info(str(sta_list)) if not sta_list: # print("No stations are available on lanforge-GUI") logging.info("No stations are available on lanforge-GUI") else: station_profile.cleanup(sta_list, delay=1) self.wait_until_ports_disappear(sta_list=sta_list, debug_=True) # print("Creating stations.") logging.info("Creating stations.") station_list = LFUtils.portNameSeries(prefix_=sta_prefix, start_id_=start_id, end_id_=self.num_sta - 1, padding_number_=10000, radio=radio) if self.sta_type == "normal": station_profile.set_command_flag("add_sta", "power_save_enable", 1) if not self.soft_roam: station_profile.set_command_flag("add_sta", "disable_roam", 1) if self.soft_roam: # print("Soft roam true") logging.info("Soft roam true") if self.option == "otds": # print("OTDS present") station_profile.set_command_flag( "add_sta", "ft-roam-over-ds", 1) if self.sta_type == "11r-sae-802.1x": dut_passwd = "[BLANK]" station_profile.use_security(self.security, self.ssid, self.password) station_profile.set_number_template("00") station_profile.set_command_flag("add_sta", "create_admin_down", 1) station_profile.set_command_param("set_port", "report_timer", 1500) # connect station to particular bssid # self.station_profile.set_command_param("add_sta", "ap", self.bssid[0]) station_profile.set_command_flag("set_port", "rpt_timer", 1) if self.sta_type == "11r": station_profile.set_command_flag("add_sta", "80211u_enable", 0) station_profile.set_command_flag("add_sta", "8021x_radius", 1) if not self.soft_roam: # station_profile.ssid_pass = self.security_key station_profile.set_command_flag("add_sta", "disable_roam", 1) if self.soft_roam: # print("Soft roam true") logging.info("Soft roam true") if self.option == "otds": # print("OTDS present") station_profile.set_command_flag( "add_sta", "ft-roam-over-ds", 1) station_profile.set_command_flag("add_sta", "power_save_enable", 1) station_profile.set_wifi_extra(key_mgmt="FT-PSK ", pairwise="", group="", psk="", eap="", identity="", passwd="", pin="", phase1="NA", phase2="NA", pac_file="NA", private_key="NA", pk_password="NA", hessid="00:00:00:00:00:01", realm="localhost.localdomain", client_cert="NA", imsi="NA", milenage="NA", domain="localhost.localdomain", roaming_consortium="NA", venue_group="NA", network_type="NA", ipaddr_type_avail="NA", network_auth_type="NA", anqp_3gpp_cell_net="NA") if self.sta_type == "11r-sae": station_profile.set_command_flag("add_sta", "ieee80211w", 2) station_profile.set_command_flag("add_sta", "80211u_enable", 0) station_profile.set_command_flag("add_sta", "8021x_radius", 1) # station_profile.set_command_flag("add_sta", "disable_roam", 1) if not self.soft_roam: station_profile.set_command_flag("add_sta", "disable_roam", 1) if self.soft_roam: if self.option == "otds": station_profile.set_command_flag( "add_sta", "ft-roam-over-ds", 1) station_profile.set_command_flag("add_sta", "power_save_enable", 1) station_profile.set_wifi_extra(key_mgmt="FT-SAE ", pairwise="", group="", psk="", eap="", identity="", passwd="", pin="", phase1="NA", phase2="NA", pac_file="NA", private_key="NA", pk_password="NA", hessid="00:00:00:00:00:01", realm="localhost.localdomain", client_cert="NA", imsi="NA", milenage="NA", domain="localhost.localdomain", roaming_consortium="NA", venue_group="NA", network_type="NA", ipaddr_type_avail="NA", network_auth_type="NA", anqp_3gpp_cell_net="NA") if self.sta_type == "11r-sae-802.1x": station_profile.set_command_flag("set_port", "rpt_timer", 1) station_profile.set_command_flag("add_sta", "ieee80211w", 2) station_profile.set_command_flag("add_sta", "80211u_enable", 0) station_profile.set_command_flag("add_sta", "8021x_radius", 1) if not self.soft_roam: station_profile.set_command_flag("add_sta", "disable_roam", 1) if self.soft_roam: if self.option == "otds": station_profile.set_command_flag( "add_sta", "ft-roam-over-ds", 1) # station_profile.set_command_flag("add_sta", "disable_roam", 1) station_profile.set_command_flag("add_sta", "power_save_enable", 1) # station_profile.set_command_flag("add_sta", "ap", "68:7d:b4:5f:5c:3f") station_profile.set_wifi_extra(key_mgmt="FT-EAP ", pairwise="[BLANK]", group="[BLANK]", psk="[BLANK]", eap=self.eap_method, identity=self.eap_identity, passwd=self.eap_password, pin="", phase1="NA", phase2="NA", pac_file="NA", private_key=self.private_key, pk_password=self.pk_passwd, hessid="00:00:00:00:00:01", realm="localhost.localdomain", client_cert="NA", imsi="NA", milenage="NA", domain="localhost.localdomain", roaming_consortium="NA", venue_group="NA", network_type="NA", ipaddr_type_avail="NA", network_auth_type="NA", anqp_3gpp_cell_net="NA") if self.sta_type == "11r-eap": # wpa2 enterprise station_profile.set_command_flag("set_port", "rpt_timer", 1) # station_profile.set_command_param("add_sta", "ieee80211w", 2) station_profile.set_command_flag("add_sta", "80211u_enable", 0) station_profile.set_command_flag("add_sta", "8021x_radius", 1) if not self.soft_roam: station_profile.set_command_flag("add_sta", "disable_roam", 1) if self.soft_roam: if self.option == "otds": station_profile.set_command_flag("add_sta", "ft-roam-over-ds", 1) # station_profile.set_command_flag("add_sta", "disable_roam", 1) station_profile.set_command_flag("add_sta", "power_save_enable", 1) # station_profile.set_command_flag("add_sta", "ap", "68:7d:b4:5f:5c:3f") station_profile.set_wifi_extra(key_mgmt="FT-EAP ", pairwise=self.pairwise_cipher, group=self.groupwise_cipher, eap=self.eap_method, identity=self.eap_identity, passwd=self.eap_password, ca_cert=self.ca_cert, private_key=self.private_key, pk_password=self.pk_passwd) if self.sta_type == "11r-eap-sha384": # wpa3 enterprise station_profile.set_command_flag("set_port", "rpt_timer", 1) station_profile.set_command_flag("add_sta", "80211u_enable", 0) station_profile.set_command_flag("add_sta", "8021x_radius", 1) if not self.soft_roam: station_profile.set_command_flag("add_sta", "disable_roam", 1) if self.soft_roam: if self.option == "otds": station_profile.set_command_flag("add_sta", "ft-roam-over-ds", 1) station_profile.set_command_flag("add_sta", "power_save_enable", 1) station_profile.set_wifi_extra(key_mgmt="FT-EAP-SHA384 ", pairwise=self.pairwise_cipher, group=self.groupwise_cipher, eap=self.eap_method, identity=self.eap_identity, passwd=self.eap_password, ca_cert=self.ca_cert, private_key=self.private_key, pk_password=self.pk_passwd) # enabling ieee80211w flag if self.ieee80211w: station_profile.set_command_param("add_sta", "ieee80211w", self.ieee80211w) station_profile.create(radio=radio, sta_names_=station_list) # print("Waiting for ports to appear") logging.info("Waiting for ports to appear") self.wait_until_ports_appear(sta_list=station_list) if self.soft_roam: for sta_name in station_list: sta = sta_name.split(".")[2] # TODO: Use name_to_eid # wpa_cmd = "roam " + str(checker2) bgscan = { "shelf": 1, # TODO: Do not hard-code resource, get it from radio eid I think. "resource": 1, "port": str(sta), "type": 'NA', "text": 'bgscan="simple:30:-65:300"' } # print(bgscan) logging.info(str(bgscan)) self.json_post("/cli-json/set_wifi_custom", bgscan) # time.sleep(2) station_profile.admin_up() # print("Waiting for ports to admin up") logging.info("Waiting for ports to admin up") if self.wait_for_ip(station_list): # print("All stations got IPs") logging.info("All stations got IPs") self.station_list = station_list # exit() return True else: # print("Stations failed to get IPs") logging.info("Stations failed to get IPs") return False def soft_roam_test(self): for atten_set in self.attenuator_combinations: self.roam_data[atten_set] = {} for current_iteration in range(1, self.iterations + 1): logging.info( 'Initiating iteration {}'.format(current_iteration)) for atten_set in self.attenuator_combinations: current_iteration_roam_data = {} self.roam_data[atten_set][current_iteration] = current_iteration_roam_data # for displaying purpose print( '========================================================================') print( 'Roaming test started on the attenuator combination {} - {}'.format(atten_set[0], atten_set[1])) print( '========================================================================') atten1, atten2 = atten_set self.set_attenuators(atten1=atten1, atten2=atten2) if (self.iteration_based): logging.info( 'Performing Roaming Test for {} iterations.'.format(self.iterations)) before_iteration_bssid_data = self.get_bssids() # logging.info( # 'Starting sniffer with roam_test_{}.pcap'.format(current_iteration)) # self.start_sniff( # capname='roam_test_{}.pcap'.format(current_iteration)) for attenuator_change_index in range(len(self.attenuator_increments)): logging.info('Setting the attenuation to {} for attenuator {}'.format( self.attenuator_increments[attenuator_change_index], self.active_attenuator)) self.set_atten( self.active_attenuator, self.attenuator_increments[attenuator_change_index]) logging.info('Setting the attenuation to {} for attenuator {}'.format( self.attenuator_decrements[attenuator_change_index], self.passive_attenuator)) self.set_atten( self.passive_attenuator, self.attenuator_decrements[attenuator_change_index]) logging.info( 'Waiting for {} seconds before monitoring the stations'.format(self.wait_time)) time.sleep(self.wait_time) logging.info('Monitoring the stations') current_step_bssid_data = self.get_bssids() for bssid_index in range(len(current_step_bssid_data)): if (self.station_list[bssid_index] not in current_iteration_roam_data.keys()): if (before_iteration_bssid_data[bssid_index] != current_step_bssid_data[bssid_index]): current_iteration_roam_data[self.station_list[bssid_index]] = { 'BSSID before iteration': before_iteration_bssid_data[bssid_index], 'BSSID after iteration': current_step_bssid_data[bssid_index], 'Signal Strength': self.get_port_data(self.station_list[bssid_index], 'signal') } if (current_step_bssid_data[bssid_index] in self.bssid_based_totals): self.bssid_based_totals[current_step_bssid_data[bssid_index]] += 1 else: self.bssid_based_totals[current_step_bssid_data[bssid_index]] = 1 logging.info( 'Iteration {} complete'.format(current_iteration)) logging.info('{}'.format(current_iteration_roam_data)) logging.info('{}'.format(self.roam_data)) # self.roam_data[atten_set].update({ # current_iteration: current_iteration_roam_data # }) self.roam_data[atten_set][current_iteration] = current_iteration_roam_data # self.roam_data[current_iteration] = current_iteration_roam_data # logging.info('Stopping sniffer') # self.stop_sniff() self.active_attenuator, self.passive_attenuator = self.passive_attenuator, self.active_attenuator else: logging.info( 'Duration based roaming test is still under development.') logging.info('Stopping sniffer') self.stop_sniff() logging.info(self.roam_data) def generate_report(self, result_json=None, result_dir='Roam_Test_Report', report_path=''): if result_json is not None: self.roam_data = result_json total_attempted_roams = len( self.station_list) * self.iterations * len(self.attenuator_combinations) # total_successful_roams = sum([len(station) # for station in self.roam_data.values()]) total_successful_roams = 0 for atten_set in self.attenuator_combinations: for iteration_values in self.roam_data[atten_set].values(): total_successful_roams += len(iteration_values) total_failed_roams = total_attempted_roams - total_successful_roams bssid_based_totals = self.bssid_based_totals station_based_roam_count = {} for combination_data in self.roam_data.values(): for station_data in combination_data.values(): if (list(station_data.values()) != []): station, station_values = list(station_data.keys())[ 0], list(station_data.values())[0] # calculating station based roam count if (station in station_based_roam_count): station_based_roam_count[station] += 1 else: station_based_roam_count[station] = 1 # calculating bssid based roam count # if(station_values['BSSID after iteration'] in bssid_based_totals): # bssid_based_totals[station_values['BSSID after iteration']] += 1 # else: # bssid_based_totals[station_values['BSSID after iteration']] = 1 else: logging.info( 'No roams in between {}'.format(combination_data)) # print(bssid_based_totals) # print(station_based_roam_count) # total_auth_failed_roams = 0 # calculating roam stats logging.info('Generating Report') report = lf_report(_output_pdf='roam_test.pdf', _output_html='roam_test.html', _results_dir_name=result_dir, _path=report_path) report_path = report.get_path() report_path_date_time = report.get_path_date_time() logging.info('path: {}'.format(report_path)) logging.info('path_date_time: {}'.format(report_path_date_time)) # setting report title report.set_title('Roam Test Report') report.build_banner() # test setup info test_setup_info = { 'SSID': [self.ssid if self.ssid else 'TEST CONFIGURED'][0], 'Security': [self.security if self.ssid else 'TEST CONFIGURED'][0], 'Station Radio': [self.station_radio if self.station_radio else 'TEST CONFIGURED'][0], 'Sniffer Radio': [self.sniff_radio if self.sniff_radio else 'TEST CONFIGURED'][0], 'Station Type': self.sta_type, 'Iterations': self.iterations, 'No of Devices': len(self.station_list), # 'No of Devices': '{} (V:{}, A:{}, W:{}, L:{}, M:{})'.format(len(self.sta_list), len(self.sta_list) - len(self.real_sta_list), self.android, self.windows, self.linux, self.mac), } report.test_setup_table( test_setup_data=test_setup_info, value='Test Setup Information') # objective and description report.set_obj_html(_obj_title='Objective', _obj='''The Candela Roam test uses the forced roam method to create and roam hundreds of WiFi stations between two or more APs with the same SSID or the same channel of different channels. The user can run thousands of roams over long durations and the test measures roaming delay for each roam, station connection times, network down time, packet loss etc.. The user can run this test using different security methods and compare the roaming performance. The expected behavior is the roaming delay should be 50msecs or less for all various kinds of fast roaming methods to avoid any form of service interruption to real-time delay sensitive applications. ''') report.build_objective() # Migration Totals report.set_table_title( 'Total Roams attempted vs Successful vs Failed') report.build_table_title() # graph for above total_roams_graph = lf_bar_graph_horizontal( _data_set=[[total_attempted_roams], [total_successful_roams], [total_failed_roams]], _xaxis_name='Roam Count', _yaxis_name='Wireless Clients', _label=[ 'Attempted Roams', 'Successful Roams', 'Failed Roams'], _graph_image_name='Total Roams attempted vs Successful vs Failed', _yaxis_label=['Stations'], _yaxis_categories=['Stations'], _yaxis_step=1, _yticks_font=8, _graph_title='Total Roams attempted vs Successful vs Failed', _title_size=16, _color=['orange', 'darkgreen', 'red'], _color_edge=['black'], _bar_height=0.15, _legend_loc="best", _legend_box=(1.0, 1.0), _dpi=96, _show_bar_value=False, _enable_csv=True, _color_name=['orange', 'darkgreen', 'red']) total_roams_graph_png = total_roams_graph.build_bar_graph_horizontal() logging.info('graph name {}'.format(total_roams_graph_png)) report.set_graph_image(total_roams_graph_png) # need to move the graph image to the results directory report.move_graph_image() report.set_csv_filename(total_roams_graph_png) report.move_csv_file() report.build_graph() # bssid based roam count report.set_table_title( 'BSSID based Successful vs Failed') report.build_table_title() # graph for above bssid_based_total_attempted_roams = [ total_attempted_roams // 2] * len(list(bssid_based_totals.values())) bssid_based_failed_roams = [bssid_based_total_attempted_roams[roam] - list( bssid_based_totals.values())[roam] for roam in range(len(bssid_based_totals.values()))] # print(bssid_based_total_attempted_roams) # print(bssid_based_failed_roams) # print(bssid_based_totals.values()) # print(bssid_based_totals.keys()) bssid_based_graph = lf_bar_graph_horizontal(_data_set=[list(bssid_based_totals.values())], _xaxis_name='Roam Count', _yaxis_name='Wireless Clients', _label=['Roams'], _graph_image_name='BSSID based Successful vs Failed', _yaxis_label=list( bssid_based_totals.keys()), _yaxis_categories=list( bssid_based_totals.keys()), _yaxis_step=1, _yticks_font=8, _graph_title='BSSID based Successful vs Failed', _title_size=16, _color=['darkgreen', 'darkgreen', 'red'], _color_edge=['black'], _bar_height=0.15, _legend_loc="best", _legend_box=(1.0, 1.0), _dpi=96, _show_bar_value=False, _enable_csv=True, _color_name=['darkgreen', 'darkgreen', 'red']) bssid_based_graph_png = bssid_based_graph.build_bar_graph_horizontal() logging.info('graph name {}'.format(bssid_based_graph_png)) report.set_graph_image(bssid_based_graph_png) # need to move the graph image to the results directory report.move_graph_image() report.set_csv_filename(bssid_based_graph_png) report.move_csv_file() report.build_graph() # station based roam count report.set_table_title( 'Station based Successful vs Failed') report.build_table_title() # graph for above station_based_total_attempted_roams = [ total_attempted_roams // len(self.station_list)] * len( self.station_list) station_based_failed_roams = [] # for station_index in range(len(station_based_roam_count)): # station_based_failed_roams.append(station_based_total_attempted_roams[station_index] - station_based_roam_count[station_index]) for station in station_based_roam_count: station_based_failed_roams.append( (total_attempted_roams // len(self.station_list)) - station_based_roam_count[station]) # print(station_based_total_attempted_roams) # print(station_based_failed_roams) # print(station_based_roam_count.values()) # print(station_based_roam_count.keys()) station_based_graph = lf_bar_graph_horizontal( _data_set=[station_based_total_attempted_roams, list(station_based_roam_count.values()), station_based_failed_roams], _xaxis_name='Roam Count', _yaxis_name='Wireless Clients', _label=[ 'Total', 'Successful', 'Failed'], _graph_image_name='Station based Successful vs Failed', _yaxis_label=list( station_based_roam_count.keys()), _yaxis_categories=list( station_based_roam_count.keys()), _yaxis_step=1, _yticks_font=8, _graph_title='Station based Successful vs Failed', _title_size=16, _color=[ 'orange', 'darkgreen', 'red'], _color_edge=['black'], _bar_height=0.15, _legend_loc="best", _legend_box=(1.0, 1.0), _dpi=96, _show_bar_value=False, _enable_csv=True, _color_name=['orange', 'darkgreen', 'red']) station_based_graph_png = station_based_graph.build_bar_graph_horizontal() logging.info('graph name {}'.format(station_based_graph_png)) report.set_graph_image(station_based_graph_png) # need to move the graph image to the results directory report.move_graph_image() report.set_csv_filename(station_based_graph_png) report.move_csv_file() report.build_graph() # closing report.build_custom() report.build_footer() report.write_html() report.write_pdf() def main(): help_summary = ''' ''' parser = argparse.ArgumentParser( prog='roam_test.py', ) required = parser.add_argument_group('Required Arguments') # required.add_argument('--ap1_bssid', # help='BSSID of Access Point 1', # required=True) # required.add_argument('--ap2_bssid', # help='BSSID of Access Point 2', # required=True) # required.add_argument('--attenuator1', # help='Serial number of attenuator near AP1', # required=True) # required.add_argument('--attenuator2', # help='Serial number of attenuator near AP2', # required=True) required.add_argument('--ssid', help='SSID of the APs', required=False) required.add_argument('--security', help='Encryption type for the SSID', required=False) required.add_argument('--password', help='Key/Password for the SSID', required=False) required.add_argument('--sta_radio', help='Station Radio', default='1.1.wiphy0', required=False) required.add_argument('--band', help='eg. --band "2G", "5G" or "6G"', default="5G") required.add_argument('--num_sta', help='Number of Stations', type=int, default=1, required=False) required.add_argument('--option', help='eg. --option "ota', type=str, default="ota", required=False) required.add_argument('--identity', help='Radius server identity', type=str, default="testuser", required=False) required.add_argument('--ttls_pass', help='Radius Server passwd', type=str, default="testpasswd", required=False) required.add_argument('--sta_type', type=str, help="provide the type of" " client you want to create i.e 11r,11r-sae," " 11r-sae-802.1x or simple as none", default="11r") optional = parser.add_argument_group('Optional Arguments') optional.add_argument('--mgr', help='LANforge IP', default='localhost') optional.add_argument('--port', help='LANforge port', type=int, default=8080) optional.add_argument('--upstream', help='Upstream Port', default='1.1.eth1') optional.add_argument('--step', help='Attenuation increment/decrement step size', type=int, default=10) optional.add_argument('--max_attenuation', help='Maximum attenuation value (dBm) for the attenuators', type=int, default=95) # optional.add_argument('--iteration_based', # help='Enable this flag to run the roam test based on iterations rather than duration', # action='store_true') optional.add_argument('--attenuators', nargs='+', help='Attenuator serials', required=True) optional.add_argument('--iterations', help='Number of iterations to perform roam test', type=int, default=2) # optional.add_argument('--duration', # help='Roam test time (seconds)', # type=int, # default=2) optional.add_argument('--wait_time', help='Waiting time (seconds) between iterations', type=int, default=30) optional.add_argument('--channel', help='Channel', type=str, default='AUTO') optional.add_argument('--frequency', help='Frequency', type=int, default=-1) # optional.add_argument('--hardroam', # help='Enable this flag to perform hardroam', # action='store_true') # optional.add_argument('--real', # help='Enable this flag to perform test on real devices', # action='store_true') optional.add_argument('--station_list', help='List of stations to perform roam test (comma seperated)') optional.add_argument('--station_flag', help='station flags to add. eg: --station_flag use-bss-transition', required=False, default=None) optional.add_argument('--sniff_radio', help='Sniffer Radio', default='1.1.wiphy0') optional.add_argument('--sniff_duration', help='Sniff duration', type=int, default=300) parser.add_argument('--help_summary', help='Show summary of what this script does', default=None, action="store_true") # logging configuration: parser.add_argument('--log_level', default=None, help='Set logging level: debug | info | warning | error | critical') parser.add_argument("--lf_logger_config_json", help="--lf_logger_config_json , json configuration of logger") args = parser.parse_args() # help summary if (args.help_summary): print(help_summary) exit(0) # set the logger level to debug logger_config = lf_logger_config.lf_logger_config() if args.log_level: logger_config.set_level(level=args.log_level) if args.lf_logger_config_json: # logger_config.lf_logger_config_json = "lf_logger_config.json" logger_config.lf_logger_config_json = args.lf_logger_config_json logger_config.load_lf_logger_config() if (args.station_list is not None): stations = args.station_list.split(',') roam_test = Roam( lanforge_ip=args.mgr, port=args.port, sniff_radio=args.sniff_radio, # ap1_bssid=args.ap1_bssid, # ap2_bssid=args.ap2_bssid, # attenuator1=args.attenuator1, # attenuator2=args.attenuator2, attenuators=args.attenuators, step=args.step, max_attenuation=args.max_attenuation, sniff_duration=args.sniff_duration, upstream=args.upstream, # ssid=args.ssid, # security=args.security, # password=args.password, wait_time=args.wait_time, channel=args.channel, frequency=args.frequency, iterations=args.iterations ) roam_test.station_list = stations logging.info('Selected stations\t{}'.format(stations)) else: roam_test = Roam( lanforge_ip=args.mgr, port=args.port, sniff_radio=args.sniff_radio, station_radio=args.sta_radio, band=args.band, # ap1_bssid=args.ap1_bssid, # ap2_bssid=args.ap2_bssid, # attenuator1=args.attenuator1, # attenuator2=args.attenuator2, attenuators=args.attenuators, step=args.step, max_attenuation=args.max_attenuation, sniff_duration=args.sniff_duration, upstream=args.upstream, ssid=args.ssid, security=args.security, password=args.password, num_sta=args.num_sta, station_flag=args.station_flag, option=args.option, identity=args.identity, ttls_pass=args.ttls_pass, sta_type=args.sta_type, wait_time=args.wait_time, channel=args.channel, frequency=args.frequency, iterations=args.iterations ) logging.info( 'Starting sniffer with roam_test.pcap') roam_test.start_sniff( capname='roam_test.pcap') roam_test.create_clients() roam_test.create_cx() roam_test.start_cx() if (roam_test.soft_roam): logging.info('Initiating soft roam test') roam_test.soft_roam_test() # roam_test.stop_cx() roam_test.generate_report() if __name__ == '__main__': main()