Files
wlan-lanforge-scripts/py-scripts/lf_multipsk.py
2021-08-10 23:11:18 +05:30

170 lines
7.3 KiB
Python

"""
NAME: lf_multipsk.py
PURPOSE:
Test the multi-PSK feature of an access point. Multi-PSK lets clients connect to the same SSID using different passwords;
here we create two or three passwords, each with a different VLAN ID, on a single SSID and try to connect clients with each password.
DESCRIPTION:
The script will follow basic functionality as:-
1- create stations from the input parameters provided
2- the input parameters consist of a list of passwords, upstream, MAC address, number of clients and radio
3- create layer3 cx for tcp and udp
4- verify layer3 cx
5- verify whether the IP of each station comes from the respective VLAN ID.
example :-
python3 lf_multipsk.py --mgr 192.168.200.29 --mgr_port 8080 --ssid nikita --security wpa2 --passwd password@123 password@123 password@123 --num_sta 2 --radio wiphy1 --input "password@123","eth1","",2,wiphy1 "password@123","eth1","",2,wiphy0
INCLUDE_IN_README
-Nikita Yadav
Copyright 2021 Candela Technologies Inc
License: Free to distribute and modify. LANforge systems must be licensed.
#in progress
"""
import argparse
import os
import sys
import time
# Refuse to run under anything but Python 3. Use sys.exit rather than the
# `exit` helper, which is only injected by the `site` module and is not
# guaranteed to exist (e.g. `python -S` or frozen builds).
if sys.version_info[0] != 3:
    print("This script requires Python 3")
    sys.exit(1)

# Make the sibling `py-json` directory importable. The membership test must
# compare the same absolute path that is appended; comparing against the bare
# name 'py-json' never matches an absolute entry and would append a duplicate
# every time this code runs.
_py_json_dir = os.path.join(os.path.abspath('..'), 'py-json')
if _py_json_dir not in sys.path:
    sys.path.append(_py_json_dir)
from LANforge.lfcli_base import LFCliBase
from LANforge import LFUtils
import realm
from realm import Realm
class MultiPsk(Realm):
    """Create stations on one SSID using several passwords and run layer-3 traffic.

    Each element of ``_input`` is a comma-separated string with five fields,
    read by position in :meth:`build`:

    ``password,upstream,mac,num_sta,radio``

    * field 0 - passphrase used for the stations of this entry
    * field 1 - upstream port name, used as side B of the connections
    * field 2 - MAC address (parsed but currently unused by this class)
    * field 3 - number of stations to create
    * field 4 - radio the stations are created on

    NOTE(review): this class inherits from ``Realm`` but never calls
    ``super().__init__()``; all LANforge access goes through the separate
    ``self.local_realm`` instance instead. Confirm whether the inheritance
    is intentional.
    """

    def __init__(self,
                 _host=None,
                 _port=None,
                 _ssid=None,
                 _input=None,
                 _security=None,
                 _passwd=None,
                 _radio=None,
                 _num_sta=None,
                 _start_id=0,
                 _resource=1,
                 _sta_prefix="sta",
                 debug_=False,
                 ):
        """Store the test parameters and open a Realm for the given manager.

        :param _host: LANforge manager host name or address.
        :param _port: LANforge manager HTTP port.
        :param _ssid: SSID every station associates to.
        :param _input: list of ``password,upstream,mac,num_sta,radio`` strings.
        :param _security: security mode passed to the station profile.
        :param _passwd: stored only; build() takes passwords from ``_input``.
        :param _radio: stored only; build() takes the radio from ``_input``.
        :param _num_sta: stored only; build() takes the count from ``_input``.
        :param _start_id: first station id handed to the name generator.
        :param _resource: resource number used to qualify the upstream port.
        :param _sta_prefix: name prefix of the created stations.
        :param debug_: debug flag forwarded to the port-disappear wait.
        """
        self.host = _host
        self.port = _port
        self.ssid = _ssid
        self.input = _input
        self.security = _security
        self.passwd = _passwd
        self.radio = _radio
        self.num_sta = _num_sta
        self.start_id = _start_id
        self.resource = _resource
        self.sta_prefix = _sta_prefix
        self.debug = debug_
        self.local_realm = realm.Realm(lfclient_host=self.host, lfclient_port=self.port)
        self.station_profile = self.local_realm.new_station_profile()
        # Populated by build(); kept as None until then so postcleanup() can
        # still remove stations when build() failed before creating them.
        self.cx_profile_udp = None
        self.l3_tcp_profile = None

    def build(self):
        """Create the stations and the UDP/TCP connections for every input entry.

        Raises IndexError when an entry has fewer than five comma-separated
        fields, and ValueError when the station count is not an integer or
        no port matching the station prefix can be found.
        """
        for index, entry in enumerate(self.input):
            fields = entry.split(',')
            # 10, 100, 1000, ... - one distinct padding number per entry.
            # Presumably this keeps station names of different entries apart;
            # confirm against LFUtils.portNameSeries. Computing it (instead of
            # an if/elif ladder for indexes 0..3) also covers a fifth and later
            # entry, which previously reused the value of the fourth.
            padding = 10 ** (index + 1)
            self._create_stations(fields, padding)
            self._create_l3_profiles(upstream=fields[1])

    def _create_stations(self, fields, padding):
        """Create, bring up and wait for the stations described by one entry."""
        print("creating stations")
        radio = fields[4]
        station_list = LFUtils.portNameSeries(prefix_=self.sta_prefix, start_id_=self.start_id,
                                              end_id_=int(fields[3]) - 1, padding_number_=padding,
                                              radio=radio)
        self.station_profile.use_security(self.security, self.ssid, str(fields[0]))
        self.station_profile.set_command_flag("add_sta", "create_admin_down", 1)
        self.station_profile.set_command_param("set_port", "report_timer", 1500)
        self.station_profile.set_command_flag("set_port", "rpt_timer", 1)
        self.station_profile.create(radio=radio, sta_names_=station_list, debug=self.local_realm.debug)
        self.local_realm.wait_until_ports_appear(sta_list=station_list)
        self.station_profile.admin_up()
        # A missing IP is only reported; the test continues regardless.
        if self.local_realm.wait_for_ip(station_list, timeout_sec=60):
            print("All stations got IPs")
        else:
            print("Stations failed to get IPs")

    def _create_l3_profiles(self, upstream):
        """Create one UDP and one TCP profile between the stations and upstream.

        The profiles are stored on ``self`` and therefore replace the ones
        created for a previous input entry.
        """
        side_b = "%d.%s" % (self.resource, upstream)

        print("create udp endp")
        self.cx_profile_udp = self.local_realm.new_l3_cx_profile()
        self.cx_profile_udp.side_a_min_bps = 128000
        self.cx_profile_udp.side_b_min_bps = 128000
        self.cx_profile_udp.side_a_min_pdu = 1200
        self.cx_profile_udp.side_b_min_pdu = 1500
        self.cx_profile_udp.report_timer = 1000
        self.cx_profile_udp.name_prefix = "udp"
        # Every port matching the prefix is used, not only the stations of the
        # current entry.
        port_list = list(self.local_realm.find_ports_like("%s+" % self.sta_prefix))
        print("port list", port_list)
        if not port_list:
            raise ValueError("Unable to find ports named '%s'+" % self.sta_prefix)
        self.cx_profile_udp.create(endp_type="lf_udp",
                                   side_a=port_list,
                                   side_b=side_b,
                                   suppress_related_commands=True)

        # Create TCP endpoints
        print("create tcp endp")
        self.l3_tcp_profile = self.local_realm.new_l3_cx_profile()
        self.l3_tcp_profile.side_a_min_bps = 128000
        self.l3_tcp_profile.side_b_min_bps = 56000
        self.l3_tcp_profile.name_prefix = "tcp"
        self.l3_tcp_profile.report_timer = 1000
        # Reuse the ports found above (as a copy) instead of querying again.
        self.l3_tcp_profile.create(endp_type="lf_tcp",
                                   side_a=list(port_list),
                                   side_b=side_b,
                                   suppress_related_commands=True)

    def start(self):
        """Start the UDP and TCP connections; build() must have run first."""
        self.cx_profile_udp.start_cx()
        self.l3_tcp_profile.start_cx()

    def postcleanup(self):
        """Remove the connections and stations, then wait for the ports to go away."""
        for profile in (self.cx_profile_udp, self.l3_tcp_profile):
            # Profiles stay None when build() never reached them; skip those
            # so the stations below are still cleaned up.
            if profile is not None:
                profile.cleanup()
        self.station_profile.cleanup()
        LFUtils.wait_until_ports_disappear(base_url=self.local_realm.lfclient_url,
                                           port_list=self.station_profile.station_names,
                                           debug=self.debug)
        print("Test Completed")
def main():
    """Parse the command line, then build, run and tear down the multi-PSK test.

    The traffic is left running for 60 seconds between start and cleanup.
    Exits through argparse (status 2) when ``--input`` is missing, because
    the station and connection setup is driven entirely by those entries.
    """
    parser = argparse.ArgumentParser(description="lanforge multipsk Test Script")
    parser.add_argument('--mgr', help='hostname for where LANforge GUI is running', default='localhost')
    # type=int keeps a port given on the command line the same type as the default.
    parser.add_argument('--mgr_port', type=int, help='port LANforge GUI HTTP service is running on', default=8080)
    parser.add_argument('--ssid', help='WiFi SSID for client to associate to')
    parser.add_argument('--security', help='WiFi Security protocol: {open|wep|wpa2|wpa3}', default="wpa2")
    parser.add_argument('--passwd', nargs="+", help='WiFi passphrase/password/key, here using list of passwords for single ssid')
    parser.add_argument('--radio', help='specify WiFi radio')
    parser.add_argument('--num_sta', type=int, help='specify number of stations you want to create', default=1)
    parser.add_argument('--input', nargs="+", required=True,
                        help='one or more comma-separated entries of the form '
                             'password,upstream,mac,num_sta,radio')
    args = parser.parse_args()

    multi_obj = MultiPsk(_host=args.mgr,
                         _port=args.mgr_port,
                         _ssid=args.ssid,
                         _input=args.input,
                         _security=args.security,
                         _passwd=args.passwd,
                         _radio=args.radio,
                         _num_sta=args.num_sta)
    multi_obj.build()
    multi_obj.start()
    # Let the layer-3 traffic run before tearing everything down.
    time.sleep(60)
    multi_obj.postcleanup()


if __name__ == '__main__':
    main()