mirror of
https://github.com/Telecominfraproject/wlan-lanforge-scripts.git
synced 2025-11-01 19:28:00 +00:00
Added upstream_resource and replaced resource with upstream_resource
Signed-off-by: jitendracandela <jitendra.kushavah@candelatech.com>
This commit is contained in:
@@ -7,13 +7,6 @@ PURPOSE:
|
||||
to test the multipsk feature in access point. Multipsk feature states connecting clients using same ssid but different passwords ,
|
||||
here we will create two or 3 passwords with different vlan id on single ssid and try to connect client with different passwords.
|
||||
|
||||
NOTE: This script has a lot of hard-coded values, and will only work in a very particular setup
|
||||
like what is used in TIP labs with APs set up for VLANs and LANforge set up to serve DHCP address
|
||||
on un-tagged ports as well as tagged 1q vlans. This script may be improved in the future to be more flexible.
|
||||
|
||||
NOTE: This script does not create the .1q vlan interfaces, it assumes they already exist before this script
|
||||
runs.
|
||||
|
||||
DESCRIPTION:
|
||||
The script will follow basic functionality as:-
|
||||
1- create station on input parameters provided
|
||||
@@ -34,12 +27,11 @@ import os
|
||||
import importlib
|
||||
import argparse
|
||||
import time
|
||||
import logging
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
import allure
|
||||
from tabulate import tabulate
|
||||
|
||||
if sys.version_info[0] != 3:
|
||||
logger.critical("This script requires Python 3")
|
||||
print("This script requires Python 3")
|
||||
exit(1)
|
||||
|
||||
|
||||
@@ -50,7 +42,6 @@ LFCliBase = lfcli_base.LFCliBase
|
||||
LFUtils = importlib.import_module("py-json.LANforge.LFUtils")
|
||||
realm = importlib.import_module("py-json.realm")
|
||||
Realm = realm.Realm
|
||||
lf_logger_config = importlib.import_module("py-scripts.lf_logger_config")
|
||||
|
||||
|
||||
class MultiPsk(Realm):
|
||||
@@ -65,6 +56,7 @@ class MultiPsk(Realm):
|
||||
num_sta=None,
|
||||
start_id=0,
|
||||
resource=1,
|
||||
upstream_resource=1,
|
||||
sta_prefix="sta",
|
||||
debug_=False,
|
||||
):
|
||||
@@ -83,9 +75,12 @@ class MultiPsk(Realm):
|
||||
self.sta_prefix = sta_prefix
|
||||
self.debug = debug_
|
||||
self.station_profile = self.new_station_profile()
|
||||
self.upstream_resource = upstream_resource
|
||||
|
||||
def build(self):
|
||||
station_list = []
|
||||
data_table = ""
|
||||
dict_table = {}
|
||||
for idex, input in enumerate(self.input):
|
||||
# print(input)
|
||||
if "." in input['upstream']:
|
||||
@@ -98,24 +93,45 @@ class MultiPsk(Realm):
|
||||
end_id_=input['num_station'] - 1, padding_number_=100,
|
||||
radio=self.radio)
|
||||
# implementation for non vlan pending ****
|
||||
logger.info("creating stations: %s" % (station_list))
|
||||
self.station_profile.use_security(self.security, self.ssid, self.passwd)
|
||||
print("creating stations")
|
||||
self.station_profile.use_security(self.security, self.ssid, str(input['password']))
|
||||
self.station_profile.set_command_flag("add_sta", "create_admin_down", 1)
|
||||
self.station_profile.set_command_param("set_port", "report_timer", 1500)
|
||||
self.station_profile.set_command_flag("set_port", "rpt_timer", 1)
|
||||
if self.station_profile.create(radio=self.radio, sta_names_=station_list, debug=self.debug):
|
||||
self._pass("Station creation succeeded.")
|
||||
else:
|
||||
self._fail("Station creation failed.")
|
||||
return False
|
||||
|
||||
self.station_profile.create(radio=input['radio'], sta_names_=station_list, debug=self.debug)
|
||||
self.wait_until_ports_appear(sta_list=station_list)
|
||||
for sta_name in station_list:
|
||||
if '1.1.' in sta_name:
|
||||
sta_name = sta_name.strip('1.1.')
|
||||
try:
|
||||
cli_base = LFCliBase(_lfjson_host=self.lfclient_host, _lfjson_port=self.lfclient_port)
|
||||
resp = cli_base.json_get(_req_url=f'port/1/1/{sta_name}')
|
||||
dict_data = resp['interface']
|
||||
dict_table[""] = list(dict_data.keys())
|
||||
dict_table["Before"] = list(dict_data.values())
|
||||
except Exception as e:
|
||||
print(e)
|
||||
self.station_profile.admin_up()
|
||||
if self.wait_for_ip(station_list, timeout_sec=120):
|
||||
self._pass("All stations got IPs")
|
||||
print("All stations got IPs")
|
||||
else:
|
||||
self._fail("Stations failed to get IPs")
|
||||
|
||||
logger.info("create udp endp")
|
||||
print("Stations failed to get IPs")
|
||||
for sta_name2 in station_list:
|
||||
if '1.1.' in sta_name2:
|
||||
sta_name2 = sta_name2.strip('1.1.')
|
||||
try:
|
||||
cli_base = LFCliBase(_lfjson_host=self.lfclient_host, _lfjson_port=self.lfclient_port)
|
||||
resp = cli_base.json_get(_req_url=f'port/1/1/{sta_name2}')
|
||||
dict_data = resp['interface']
|
||||
dict_table["After"] = list(dict_data.values())
|
||||
try:
|
||||
data_table = tabulate(dict_table, headers='keys', tablefmt='fancy_grid')
|
||||
except Exception as e:
|
||||
print(e)
|
||||
allure.attach(name=f'{sta_name2} info', body=data_table)
|
||||
except Exception as e:
|
||||
print(e)
|
||||
print("create udp endp")
|
||||
self.cx_profile_udp = self.new_l3_cx_profile()
|
||||
self.cx_profile_udp.side_a_min_bps = 128000
|
||||
self.cx_profile_udp.side_b_min_bps = 128000
|
||||
@@ -127,14 +143,10 @@ class MultiPsk(Realm):
|
||||
# print("port list", port_list)
|
||||
if (port_list is None) or (len(port_list) < 1):
|
||||
raise ValueError("Unable to find ports named '%s'+" % self.sta_prefix)
|
||||
if self.cx_profile_udp.create(endp_type="lf_udp",
|
||||
side_a=port_list,
|
||||
sleep_time=0,
|
||||
side_b="%d.%s" % (self.resource, input['upstream']),
|
||||
suppress_related_commands=True):
|
||||
self._pass("UDP connections created.")
|
||||
else:
|
||||
self._fail("UDP connections could not be created.")
|
||||
self.cx_profile_udp.create(endp_type="lf_udp",
|
||||
side_a=port_list,
|
||||
side_b="%d.%s" % (int(self.upstream_resource), input['upstream']),
|
||||
suppress_related_commands=True)
|
||||
|
||||
# Create TCP endpoints
|
||||
print("create tcp endp")
|
||||
@@ -143,14 +155,10 @@ class MultiPsk(Realm):
|
||||
self.l3_tcp_profile.side_b_min_bps = 56000
|
||||
self.l3_tcp_profile.name_prefix = "tcp"
|
||||
self.l3_tcp_profile.report_timer = 1000
|
||||
if self.l3_tcp_profile.create(endp_type="lf_tcp",
|
||||
sleep_time=0,
|
||||
side_a=list(self.find_ports_like("%s+" % self.sta_prefix)),
|
||||
side_b="%d.%s" % (self.resource, input['upstream']),
|
||||
suppress_related_commands=True):
|
||||
self._pass("TCP connections created.")
|
||||
else:
|
||||
self._fail("TCP connections not created.")
|
||||
self.l3_tcp_profile.create(endp_type="lf_tcp",
|
||||
side_a=list(self.find_ports_like("%s+" % self.sta_prefix)),
|
||||
side_b="%d.%s" % (int(self.upstream_resource), input['upstream']),
|
||||
suppress_related_commands=True)
|
||||
|
||||
def start(self):
|
||||
self.cx_profile_udp.start_cx()
|
||||
@@ -161,15 +169,14 @@ class MultiPsk(Realm):
|
||||
vlan_ips = {}
|
||||
for i in self.input:
|
||||
# print(i)
|
||||
# TODO: Properly detect if it is a real vlan interface, don't just match on a period in the name.
|
||||
if "." in i['upstream']:
|
||||
# print(str(i['upstream']) + " is a vlan upstream port")
|
||||
logger.info("checking VLAN upstream port: %s ip .." %(i['upstream']))
|
||||
print("checking its ip ..")
|
||||
data = self.json_get("ports/list?fields=IP")
|
||||
for val in data["interfaces"]:
|
||||
for j in val:
|
||||
if "1." + str(self.resource) + "." + str(i['upstream']) == j:
|
||||
ip_upstream = val["1." + str(self.resource) + "." + str(i['upstream'])]['ip']
|
||||
if "1." + str(self.upstream_resource) + "." + str(i['upstream']) == j:
|
||||
ip_upstream = val["1." + str(self.upstream_resource) + "." + str(i['upstream'])]['ip']
|
||||
vlan_ips[i['upstream']] = ip_upstream
|
||||
# print(ip_upstream)
|
||||
# print(vlan_ips)
|
||||
@@ -181,12 +188,12 @@ class MultiPsk(Realm):
|
||||
for i in self.input:
|
||||
if "." not in i['upstream']:
|
||||
# print(str(i['upstream']) + " is not an vlan upstream port")
|
||||
logger.info("checking non-vlan ip ..")
|
||||
print("checking its ip ..")
|
||||
data = self.json_get("ports/list?fields=IP")
|
||||
for val in data["interfaces"]:
|
||||
for j in val:
|
||||
if "1." + str(self.resource) + "." + str(i['upstream']) == j:
|
||||
ip_upstream = val["1." + str(self.resource) + "." + str(i['upstream'])]['ip']
|
||||
if "1." + str(self.upstream_resource) + "." + str(i['upstream']) == j:
|
||||
ip_upstream = val["1." + str(self.upstream_resource) + "." + str(i['upstream'])]['ip']
|
||||
non_vlan_ips[i['upstream']] = ip_upstream
|
||||
# print(ip_upstream)
|
||||
# print(non_vlan_ips)
|
||||
@@ -215,8 +222,6 @@ class MultiPsk(Realm):
|
||||
# print(station_ip)
|
||||
return station_ip
|
||||
|
||||
# TODO: Take cmd line arg for the 'eth2', and radios, and I guess port_list as well. Or some
|
||||
# other way to not hard-code this.
|
||||
def get_sta_ip_for_more_vlan(self):
|
||||
input = [{'password': 'lanforge1', 'upstream': 'eth2.100', 'mac': '', 'num_station': 1, 'radio': 'wiphy4'},
|
||||
{'password': 'lanforge2', 'upstream': 'eth2.200', 'mac': '', 'num_station': 1, 'radio': 'wiphy4'},
|
||||
@@ -293,10 +298,10 @@ class MultiPsk(Realm):
|
||||
x = vlan_ip[i].split('.')
|
||||
y = station_ip[j].split('.')
|
||||
if x[0] == y[0] and x[1] == y[1]:
|
||||
self._pass("station got ip from vlan")
|
||||
print("station got ip from vlan")
|
||||
return "Pass"
|
||||
else:
|
||||
self._fail("station did not got ip from vlan")
|
||||
print("station did not got ip from vlan")
|
||||
return "Fail"
|
||||
|
||||
def compare_nonvlan_ip_nat(self):
|
||||
@@ -307,30 +312,27 @@ class MultiPsk(Realm):
|
||||
x = id['upstream']
|
||||
# print(non_vlan_sta_ip[x])
|
||||
non_vlan = non_vlan_sta_ip[x].split(".")
|
||||
# TODO: This should not be hard-coded, take as cmd line arg input instead.
|
||||
if non_vlan[0] == "192" and non_vlan[1] == "168":
|
||||
# print("Pass")
|
||||
self._pass("NON-VLAN IP in NAT setup is as expected: %s" % non_vlan_sta_ip[x])
|
||||
x = 'Pass'
|
||||
else:
|
||||
self._fail("NON-VLAN IP in NAT set up is NOT as expected: %s" % non_vlan_sta_ip[x])
|
||||
x = "Fail"
|
||||
return x
|
||||
|
||||
def compare_nonvlan_ip_bridge(self):
|
||||
upstream_ip = self.monitor_non_vlan_ip()
|
||||
non_vlan_sta_ip = self.get_non_vlan_sta_ip()
|
||||
|
||||
result1 = "Fail"
|
||||
for i, j in zip(upstream_ip, non_vlan_sta_ip):
|
||||
# print(i)
|
||||
if i == j:
|
||||
x = upstream_ip[i].split('.')
|
||||
y = non_vlan_sta_ip[j].split('.')
|
||||
if x[0] == y[0] and x[1] == y[1]:
|
||||
self._pass("station got ip: %s from upstream: %s" % (non_vlan_sta_ip[j], upstream_ip[i]))
|
||||
print("station got ip from upstream")
|
||||
result1 = "Pass"
|
||||
else:
|
||||
self._fail("station got ip: %s NOT from upstream: %s" % (non_vlan_sta_ip[j], upstream_ip[i]))
|
||||
print("station did not got ip from upstream")
|
||||
result1 = "Fail"
|
||||
return result1
|
||||
|
||||
@@ -338,13 +340,9 @@ class MultiPsk(Realm):
|
||||
self.cx_profile_udp.cleanup()
|
||||
self.l3_tcp_profile.cleanup()
|
||||
self.station_profile.cleanup()
|
||||
if LFUtils.wait_until_ports_disappear(base_url=self.lfclient_host, port_list=self.station_profile.station_names,
|
||||
debug=self.debug):
|
||||
self._pass("cleanup: Successfully deleted ports.")
|
||||
else:
|
||||
self._fail("cleanup: Failed to delete ports.")
|
||||
|
||||
logger.info("post-cleanup completed")
|
||||
LFUtils.wait_until_ports_disappear(base_url=self.lfclient_host, port_list=self.station_profile.station_names,
|
||||
debug=self.debug)
|
||||
print("Test Completed")
|
||||
|
||||
|
||||
def main():
|
||||
@@ -356,12 +354,6 @@ def main():
|
||||
parser.add_argument('--mode', help="Mode for lf_multipsk", default=None)
|
||||
args = parser.parse_args()
|
||||
|
||||
logger_config = lf_logger_config.lf_logger_config()
|
||||
# set the logger level to requested value
|
||||
logger_config.set_level(level=args.log_level)
|
||||
logger_config.set_json(json_file=args.lf_logger_config_json)
|
||||
|
||||
# TODO: Allow specifying this data on the command line.
|
||||
input_data = [{
|
||||
"password": args.passwd,
|
||||
"upstream": "eth2.100",
|
||||
@@ -403,34 +395,34 @@ def main():
|
||||
|
||||
multi_obj.build()
|
||||
multi_obj.start()
|
||||
time.sleep(60) #TODO: Shouldn't need this much sleep, maybe some wait_for_foo method instead?
|
||||
time.sleep(60)
|
||||
multi_obj.monitor_vlan_ip()
|
||||
if args.n_vlan == "1":
|
||||
multi_obj.get_sta_ip()
|
||||
else:
|
||||
multi_obj.get_sta_ip_for_more_vlan()
|
||||
|
||||
logger.info("checking for vlan ips")
|
||||
result = multi_obj.compare_ip()
|
||||
|
||||
logger.info("now checking ip for non vlan port")
|
||||
print("checking for vlan ips")
|
||||
if result == "Pass":
|
||||
print("Test pass")
|
||||
else:
|
||||
print("Test Fail")
|
||||
print("now checking ip for non vlan port")
|
||||
multi_obj.monitor_non_vlan_ip()
|
||||
multi_obj.get_non_vlan_sta_ip()
|
||||
if args.mode == "BRIDGE":
|
||||
result1 = multi_obj.compare_nonvlan_ip_bridge()
|
||||
else:
|
||||
result1 = multi_obj.compare_nonvlan_ip_nat()
|
||||
|
||||
# TODO: Verify connections were able to start and pass traffic.
|
||||
|
||||
logger.info("all result gathered")
|
||||
logger.info("clean up")
|
||||
if result1 == "Pass":
|
||||
print("Test passed for non vlan ip ")
|
||||
else:
|
||||
print("Test failed for non vlan ip")
|
||||
print("all result gathered")
|
||||
print("clean up")
|
||||
multi_obj.postcleanup()
|
||||
|
||||
if multi_obj.passes():
|
||||
multi_obj.exit_success()
|
||||
else:
|
||||
multi_obj.exit_fail()
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
|
||||
Reference in New Issue
Block a user