diff --git a/py-scripts/lf_multipsk.py b/py-scripts/lf_multipsk.py index 2eda8d1f..551ad245 100755 --- a/py-scripts/lf_multipsk.py +++ b/py-scripts/lf_multipsk.py @@ -7,13 +7,6 @@ PURPOSE: to test the multipsk feature in access point. Multipsk feature states connecting clients using same ssid but different passwords , here we will create two or 3 passwords with different vlan id on single ssid and try to connect client with different passwords. - NOTE: This script has a lot of hard-coded values, and will only work in a very particular setup - like what is used in TIP labs with APs set up for VLANs and LANforge set up to serve DHCP address - on un-tagged ports as well as tagged 1q vlans. This script may be improved in the future to be more flexible. - - NOTE: This script does not create the .1q vlan interfaces, it assumes they already exist before this script - runs. - DESCRIPTION: The script will follow basic functionality as:- 1- create station on input parameters provided @@ -34,12 +27,11 @@ import os import importlib import argparse import time -import logging - -logger = logging.getLogger(__name__) +import allure +from tabulate import tabulate if sys.version_info[0] != 3: - logger.critical("This script requires Python 3") + print("This script requires Python 3") exit(1) @@ -50,7 +42,6 @@ LFCliBase = lfcli_base.LFCliBase LFUtils = importlib.import_module("py-json.LANforge.LFUtils") realm = importlib.import_module("py-json.realm") Realm = realm.Realm -lf_logger_config = importlib.import_module("py-scripts.lf_logger_config") class MultiPsk(Realm): @@ -65,6 +56,7 @@ class MultiPsk(Realm): num_sta=None, start_id=0, resource=1, + upstream_resource=1, sta_prefix="sta", debug_=False, ): @@ -83,9 +75,12 @@ class MultiPsk(Realm): self.sta_prefix = sta_prefix self.debug = debug_ self.station_profile = self.new_station_profile() + self.upstream_resource = upstream_resource def build(self): station_list = [] + data_table = "" + dict_table = {} for idex, input in enumerate(self.input): # print(input) if "." in input['upstream']: @@ -98,24 +93,45 @@ class MultiPsk(Realm): end_id_=input['num_station'] - 1, padding_number_=100, radio=self.radio) # implementation for non vlan pending **** - logger.info("creating stations: %s" % (station_list)) - self.station_profile.use_security(self.security, self.ssid, self.passwd) + print("creating stations") + self.station_profile.use_security(self.security, self.ssid, str(input['password'])) self.station_profile.set_command_flag("add_sta", "create_admin_down", 1) self.station_profile.set_command_param("set_port", "report_timer", 1500) self.station_profile.set_command_flag("set_port", "rpt_timer", 1) - if self.station_profile.create(radio=self.radio, sta_names_=station_list, debug=self.debug): - self._pass("Station creation succeeded.") - else: - self._fail("Station creation failed.") - return False - + self.station_profile.create(radio=input['radio'], sta_names_=station_list, debug=self.debug) + self.wait_until_ports_appear(sta_list=station_list) + for sta_name in station_list: + if '1.1.' in sta_name: + sta_name = sta_name.strip('1.1.') + try: + cli_base = LFCliBase(_lfjson_host=self.lfclient_host, _lfjson_port=self.lfclient_port) + resp = cli_base.json_get(_req_url=f'port/1/1/{sta_name}') + dict_data = resp['interface'] + dict_table[""] = list(dict_data.keys()) + dict_table["Before"] = list(dict_data.values()) + except Exception as e: + print(e) self.station_profile.admin_up() if self.wait_for_ip(station_list, timeout_sec=120): - self._pass("All stations got IPs") + print("All stations got IPs") else: - self._fail("Stations failed to get IPs") - - logger.info("create udp endp") + print("Stations failed to get IPs") + for sta_name2 in station_list: + if '1.1.' in sta_name2: + sta_name2 = sta_name2.strip('1.1.') + try: + cli_base = LFCliBase(_lfjson_host=self.lfclient_host, _lfjson_port=self.lfclient_port) + resp = cli_base.json_get(_req_url=f'port/1/1/{sta_name2}') + dict_data = resp['interface'] + dict_table["After"] = list(dict_data.values()) + try: + data_table = tabulate(dict_table, headers='keys', tablefmt='fancy_grid') + except Exception as e: + print(e) + allure.attach(name=f'{sta_name2} info', body=data_table) + except Exception as e: + print(e) + print("create udp endp") self.cx_profile_udp = self.new_l3_cx_profile() self.cx_profile_udp.side_a_min_bps = 128000 self.cx_profile_udp.side_b_min_bps = 128000 @@ -127,14 +143,10 @@ class MultiPsk(Realm): # print("port list", port_list) if (port_list is None) or (len(port_list) < 1): raise ValueError("Unable to find ports named '%s'+" % self.sta_prefix) - if self.cx_profile_udp.create(endp_type="lf_udp", - side_a=port_list, - sleep_time=0, - side_b="%d.%s" % (self.resource, input['upstream']), - suppress_related_commands=True): - self._pass("UDP connections created.") - else: - self._fail("UDP connections could not be created.") + self.cx_profile_udp.create(endp_type="lf_udp", + side_a=port_list, + side_b="%d.%s" % (int(self.upstream_resource), input['upstream']), + suppress_related_commands=True) # Create TCP endpoints print("create tcp endp") @@ -143,14 +155,10 @@ class MultiPsk(Realm): self.l3_tcp_profile.side_b_min_bps = 56000 self.l3_tcp_profile.name_prefix = "tcp" self.l3_tcp_profile.report_timer = 1000 - if self.l3_tcp_profile.create(endp_type="lf_tcp", - sleep_time=0, - side_a=list(self.find_ports_like("%s+" % self.sta_prefix)), - side_b="%d.%s" % (self.resource, input['upstream']), - suppress_related_commands=True): - self._pass("TCP connections created.") - else: - self._fail("TCP connections not created.") + self.l3_tcp_profile.create(endp_type="lf_tcp", + side_a=list(self.find_ports_like("%s+" % self.sta_prefix)), + side_b="%d.%s" % (int(self.upstream_resource), input['upstream']), + suppress_related_commands=True) def start(self): self.cx_profile_udp.start_cx() @@ -161,15 +169,14 @@ class MultiPsk(Realm): vlan_ips = {} for i in self.input: # print(i) - # TODO: Properly detect if it is a real vlan interface, don't just match on a period in the name. if "." in i['upstream']: # print(str(i['upstream']) + " is a vlan upstream port") - logger.info("checking VLAN upstream port: %s ip .." %(i['upstream'])) + print("checking its ip ..") data = self.json_get("ports/list?fields=IP") for val in data["interfaces"]: for j in val: - if "1." + str(self.resource) + "." + str(i['upstream']) == j: - ip_upstream = val["1." + str(self.resource) + "." + str(i['upstream'])]['ip'] + if "1." + str(self.upstream_resource) + "." + str(i['upstream']) == j: + ip_upstream = val["1." + str(self.upstream_resource) + "." + str(i['upstream'])]['ip'] vlan_ips[i['upstream']] = ip_upstream # print(ip_upstream) # print(vlan_ips) @@ -181,12 +188,12 @@ class MultiPsk(Realm): for i in self.input: if "." not in i['upstream']: # print(str(i['upstream']) + " is not an vlan upstream port") - logger.info("checking non-vlan ip ..") + print("checking its ip ..") data = self.json_get("ports/list?fields=IP") for val in data["interfaces"]: for j in val: - if "1." + str(self.resource) + "." + str(i['upstream']) == j: - ip_upstream = val["1." + str(self.resource) + "." + str(i['upstream'])]['ip'] + if "1." + str(self.upstream_resource) + "." + str(i['upstream']) == j: + ip_upstream = val["1." + str(self.upstream_resource) + "." + str(i['upstream'])]['ip'] non_vlan_ips[i['upstream']] = ip_upstream # print(ip_upstream) # print(non_vlan_ips) @@ -215,8 +222,6 @@ class MultiPsk(Realm): # print(station_ip) return station_ip - # TODO: Take cmd line arg for the 'eth2', and radios, and I guess port_list as well. Or some - # other way to not hard-code this. def get_sta_ip_for_more_vlan(self): input = [{'password': 'lanforge1', 'upstream': 'eth2.100', 'mac': '', 'num_station': 1, 'radio': 'wiphy4'}, {'password': 'lanforge2', 'upstream': 'eth2.200', 'mac': '', 'num_station': 1, 'radio': 'wiphy4'}, @@ -293,10 +298,10 @@ class MultiPsk(Realm): x = vlan_ip[i].split('.') y = station_ip[j].split('.') if x[0] == y[0] and x[1] == y[1]: - self._pass("station got ip from vlan") + print("station got ip from vlan") return "Pass" else: - self._fail("station did not got ip from vlan") + print("station did not got ip from vlan") return "Fail" def compare_nonvlan_ip_nat(self): @@ -307,30 +312,27 @@ class MultiPsk(Realm): x = id['upstream'] # print(non_vlan_sta_ip[x]) non_vlan = non_vlan_sta_ip[x].split(".") - # TODO: This should not be hard-coded, take as cmd line arg input instead. if non_vlan[0] == "192" and non_vlan[1] == "168": # print("Pass") - self._pass("NON-VLAN IP in NAT setup is as expected: %s" % non_vlan_sta_ip[x]) x = 'Pass' else: - self._fail("NON-VLAN IP in NAT set up is NOT as expected: %s" % non_vlan_sta_ip[x]) x = "Fail" return x def compare_nonvlan_ip_bridge(self): upstream_ip = self.monitor_non_vlan_ip() non_vlan_sta_ip = self.get_non_vlan_sta_ip() - + result1 = "Fail" for i, j in zip(upstream_ip, non_vlan_sta_ip): # print(i) if i == j: x = upstream_ip[i].split('.') y = non_vlan_sta_ip[j].split('.') if x[0] == y[0] and x[1] == y[1]: - self._pass("station got ip: %s from upstream: %s" % (non_vlan_sta_ip[j], upstream_ip[i])) + print("station got ip from upstream") result1 = "Pass" else: - self._fail("station got ip: %s NOT from upstream: %s" % (non_vlan_sta_ip[j], upstream_ip[i])) + print("station did not got ip from upstream") result1 = "Fail" return result1 @@ -338,13 +340,9 @@ class MultiPsk(Realm): self.cx_profile_udp.cleanup() self.l3_tcp_profile.cleanup() self.station_profile.cleanup() - if LFUtils.wait_until_ports_disappear(base_url=self.lfclient_host, port_list=self.station_profile.station_names, - debug=self.debug): - self._pass("cleanup: Successfully deleted ports.") - else: - self._fail("cleanup: Failed to delete ports.") - - logger.info("post-cleanup completed") + LFUtils.wait_until_ports_disappear(base_url=self.lfclient_host, port_list=self.station_profile.station_names, + debug=self.debug) + print("Test Completed") def main(): @@ -356,12 +354,6 @@ def main(): parser.add_argument('--mode', help="Mode for lf_multipsk", default=None) args = parser.parse_args() - logger_config = lf_logger_config.lf_logger_config() - # set the logger level to requested value - logger_config.set_level(level=args.log_level) - logger_config.set_json(json_file=args.lf_logger_config_json) - - # TODO: Allow specifying this data on the command line. input_data = [{ "password": args.passwd, "upstream": "eth2.100", @@ -403,34 +395,34 @@ def main(): multi_obj.build() multi_obj.start() - time.sleep(60) #TODO: Shouldn't need this much sleep, maybe some wait_for_foo method instead? + time.sleep(60) multi_obj.monitor_vlan_ip() if args.n_vlan == "1": multi_obj.get_sta_ip() else: multi_obj.get_sta_ip_for_more_vlan() - logger.info("checking for vlan ips") result = multi_obj.compare_ip() - - logger.info("now checking ip for non vlan port") + print("checking for vlan ips") + if result == "Pass": + print("Test pass") + else: + print("Test Fail") + print("now checking ip for non vlan port") multi_obj.monitor_non_vlan_ip() multi_obj.get_non_vlan_sta_ip() if args.mode == "BRIDGE": result1 = multi_obj.compare_nonvlan_ip_bridge() else: result1 = multi_obj.compare_nonvlan_ip_nat() - - # TODO: Verify connections were able to start and pass traffic. - - logger.info("all result gathered") - logger.info("clean up") + if result1 == "Pass": + print("Test passed for non vlan ip ") + else: + print("Test failed for non vlan ip") + print("all result gathered") + print("clean up") multi_obj.postcleanup() - if multi_obj.passes(): - multi_obj.exit_success() - else: - multi_obj.exit_fail() if __name__ == '__main__': main()