Add Enterprise security support for client

Signed-off-by: anil-tegala <anil.tegala@candelatech.com>
This commit is contained in:
anil-tegala
2024-04-19 17:12:34 +05:30
parent ee4255e822
commit ce228f8609

View File

@@ -184,8 +184,18 @@ class HardRoam(Realm):
port="8888",
band_cc="5g",
timeout="10",
identity=None,
ttls_pass=None,
eap_method=None,
eap_identity=None,
eap_password=None,
pairwise_cipher=None,
groupwise_cipher=None,
private_key=None,
pk_passwd=None,
ca_cert=None,
eap_phase1=None,
eap_phase2=None,
# identity=None,
# ttls_pass=None,
log_file=False,
debug=False,
soft_roam=False,
@@ -215,7 +225,7 @@ class HardRoam(Realm):
self.option = option
self.iteration_based = iteration_based
self.duration_based = duration_based
self.local_realm = realm.Realm(lfclient_host=self.lanforge_ip, lfclient_port=self.lanforge_port)
self.local_realm = Realm(lfclient_host=self.lanforge_ip, lfclient_port=self.lanforge_port)
self.staConnect = sta_connect.StaConnect2(host=self.lanforge_ip, port=self.lanforge_port,
outfile="sta_connect2.csv")
self.final_bssid = []
@@ -224,7 +234,7 @@ class HardRoam(Realm):
self.test_duration = None
self.client_list = []
self.dut_name = dut_name
self.pcap_obj = lf_pcap.LfPcap()
self.pcap_obj = lf_pcap.LfPcap(host=self.lanforge_ip, port=self.lanforge_port)
self.lf_csv_obj = lf_csv.lf_csv()
self.traffic_type = traffic_type
self.roam_delay = roaming_delay
@@ -245,8 +255,16 @@ class HardRoam(Realm):
self.cc.pwd = path
self.start_time = None
self.end_time = None
self.identity = identity
self.ttls_pass = ttls_pass
self.eap_method = eap_method
self.eap_identity = eap_identity
self.eap_password = eap_password
self.pairwise_cipher = str(pairwise_cipher)
self.groupwise_cipher = groupwise_cipher
self.private_key = private_key
self.pk_passwd = pk_passwd
self.ca_cert = ca_cert
self.eap_phase1 = eap_phase1
self.eap_phase2 = eap_phase2
self.log_file = log_file
self.debug = debug
self.mac_data = None
@@ -348,7 +366,7 @@ class HardRoam(Realm):
def create_n_clients(self, start_id=0, sta_prefix=None, num_sta=None, dut_ssid=None,
dut_security=None, dut_passwd=None, radio=None):
local_realm = realm.Realm(lfclient_host=self.lanforge_ip, lfclient_port=self.lanforge_port)
local_realm = Realm(lfclient_host=self.lanforge_ip, lfclient_port=self.lanforge_port)
station_profile = local_realm.new_station_profile()
if self.band == "fiveg":
radio = self.fiveg_radios
@@ -470,9 +488,9 @@ class HardRoam(Realm):
ipaddr_type_avail="NA",
network_auth_type="NA",
anqp_3gpp_cell_net="NA")
if self.sta_type == "11r-sae-802.1x":
if self.sta_type == "11r-eap": # wpa2 enterprise
station_profile.set_command_flag("set_port", "rpt_timer", 1)
station_profile.set_command_flag("add_sta", "ieee80211w", 2)
# station_profile.set_command_param("add_sta", "ieee80211w", 2)
station_profile.set_command_flag("add_sta", "80211u_enable", 0)
station_profile.set_command_flag("add_sta", "8021x_radius", 1)
if not self.soft_roam:
@@ -484,30 +502,37 @@ class HardRoam(Realm):
station_profile.set_command_flag("add_sta", "power_save_enable", 1)
# station_profile.set_command_flag("add_sta", "ap", "68:7d:b4:5f:5c:3f")
station_profile.set_wifi_extra(key_mgmt="FT-EAP ",
pairwise="[BLANK]",
group="[BLANK]",
psk="[BLANK]",
eap="TTLS",
identity=self.identity,
passwd=self.ttls_pass,
pin="",
phase1="NA",
phase2="NA",
pac_file="NA",
private_key="NA",
pk_password="NA",
hessid="00:00:00:00:00:01",
realm="localhost.localdomain",
client_cert="NA",
imsi="NA",
milenage="NA",
domain="localhost.localdomain",
roaming_consortium="NA",
venue_group="NA",
network_type="NA",
ipaddr_type_avail="NA",
network_auth_type="NA",
anqp_3gpp_cell_net="NA")
pairwise=self.pairwise_cipher,
group=self.groupwise_cipher,
eap=self.eap_method,
identity=self.eap_identity,
passwd=self.eap_password,
ca_cert=self.ca_cert,
private_key=self.private_key,
pk_password=self.pk_passwd)
if self.sta_type == "11r-eap-sha384": # wpa3 enterprise
station_profile.set_command_flag("set_port", "rpt_timer", 1)
station_profile.set_command_flag("add_sta", "80211u_enable", 0)
station_profile.set_command_flag("add_sta", "8021x_radius", 1)
if not self.soft_roam:
station_profile.set_command_flag("add_sta", "disable_roam", 1)
if self.soft_roam:
if self.option == "otds":
station_profile.set_command_flag("add_sta", "ft-roam-over-ds", 1)
station_profile.set_command_flag("add_sta", "power_save_enable", 1)
station_profile.set_wifi_extra(key_mgmt="FT-EAP-SHA384 ",
pairwise=self.pairwise_cipher,
group=self.groupwise_cipher,
eap=self.eap_method,
identity=self.eap_identity,
passwd=self.eap_password,
ca_cert=self.ca_cert,
private_key=self.private_key,
pk_password=self.pk_passwd)
# enabling ieee80211w flag
if self.ieee80211w:
station_profile.set_command_param("add_sta", "ieee80211w", self.ieee80211w)
station_profile.create(radio=radio, sta_names_=station_list)
print("Waiting for ports to appear")
logging.info("Waiting for ports to appear")
@@ -565,7 +590,7 @@ class HardRoam(Realm):
def create_layer3(self, side_a_min_rate, side_a_max_rate, side_b_min_rate, side_b_max_rate, side_a_min_pdu,
side_b_min_pdu, traffic_type, sta_list):
print("Station List :", sta_list)
logging.info("Station List : ", str(sta_list))
logging.info(f"Station List : {str(sta_list)}")
print(type(sta_list))
print("Upstream port :", self.upstream)
logging.info(str(self.upstream))
@@ -617,7 +642,7 @@ class HardRoam(Realm):
obj.resource = "all"
obj.sta_clean()
obj.cxs_clean()
obj.layer3_endp_clean()
# obj.layer3_endp_clean()
# Get client data from lf
def station_data_query(self, station_name="wlan0", query="channel"):
@@ -639,8 +664,7 @@ class HardRoam(Realm):
def start_sniffer(self, radio_channel=None, radio=None, test_name="sniff_radio", duration=60):
self.pcap_name = test_name + str(datetime.now().strftime("%Y-%m-%d-%H-%M")).replace(':', '-') + ".pcap"
self.pcap_obj_2 = sniff_radio.SniffRadio(lfclient_host=self.lanforge_ip, lfclient_port=self.lanforge_port,
radio=radio, channel=radio_channel, monitor_name="monitor",
channel_bw="20")
radio=radio, channel=radio_channel, monitor_name="monitor")
self.pcap_obj_2.setup(0, 0, 0)
time.sleep(5)
self.pcap_obj_2.monitor.admin_up()
@@ -2176,6 +2200,65 @@ EXAMPLE: For multiple station and multiple iteration
--security_key "something" --duration None --upstream "eth2" --iteration 10 --channel "40" --option "ota"
--dut_name ["AP1","AP2"] --traffic_type "lf_udp" --log_file False --debug False --iteration_based --soft_roam True
NOTES:
* For enterprise authentication
--eap_method <eap_method>
Add this argument to specify the EAP method
example:
TLS, TTLS, PEAP
--pairwise_cipher [BLANK]
Add this argument to specify the type of pairwise cipher
DEFAULT
CCMP
TKIP
NONE
CCMP-TKIP
CCMP-256
GCMP
GCMP-256
CCMP/GCMP-256
--groupwise_cipher [BLANK]
Add this argument to specify the type of groupwise cipher
DEFAULT
CCMP
TKIP
WEP104
WEP40
GTK_NOT_USED
GCMP-256
CCMP-256
GCMP/CCMP-256
ALL
--eap_identity <eap_identity>
Add this argument to specify the username of radius server
--eap_password <eap_password>
Add this argument to specify the password of radius server
--pk_passwd <private_key_passsword>
Add this argument to specify the private key password
Required only for TLS
--ca_cert <path_to_certificate>
Add this argument to specify the certificate path
Required only for TLS
example:
/home/lanforge/ca.pem
--private_key <path_to_private_key>
Add this argument to specify the private key path
Required only for TLS
example:
/home/lanforge/client.p12
===============================================================================
@@ -2205,14 +2288,56 @@ EXAMPLE: For multiple station and multiple iteration
required.add_argument('--duration_based', help='Duration based', default=False, action='store_true')
required.add_argument('--dut_name', help='', default=["AP1", "AP2"]) # ["AP687D.B45C.1D1C", "AP2C57.4152.385C"]
required.add_argument('--traffic_type', help='To chose the traffic type', default="lf_udp")
required.add_argument('--identity', help='Radius server identity', default="testuser")
required.add_argument('--ttls_pass', help='Radius Server passwd', default="testpasswd")
# eap authentication
required.add_argument('--eap_method', help='Enter EAP method e.g: TLS', default=None)
required.add_argument('--eap_identity', help='Radius server identity', default='[BLANK]')
required.add_argument('--eap_password', help='Radius Server password', default='[BLANK]')
required.add_argument('--pairwise_cipher',
help='Pairwise Ciphers\n'
'DEFAULT\n'
'CCMP\n'
'TKIP\n'
'NONE\n'
'CCMP-TKIP\n'
'CCMP-256\n'
'GCMP\n'
'GCMP-256\n'
'CCMP/GCMP-256',
default='[BLANK]')
required.add_argument('--groupwise_cipher', type=str,
help='Groupwise Ciphers\n'
'DEFAULT\n'
'CCMP\n'
'TKIP\n'
'WEP104\n'
'WEP40\n'
'GTK_NOT_USED\n'
'GCMP-256\n'
'CCMP-256\n'
'GCMP/CCMP-256\n'
'ALL',
default='[BLANK]')
required.add_argument('--private_key',
help='Enter private key path e.g: /home/lanforge/client.p12', default='[BLANK]')
required.add_argument('--pk_passwd', help='Enter the private key password', default='[BLANK]')
required.add_argument('--ca_cert', help='Enter path for certificate e.g: /home/lanforge/ca.pem',
default='[BLANK]')
required.add_argument("--eap_phase1", help="EAP Phase 1 (outer authentication, i.e. TLS tunnel) parameters.\n"
"For example, \"peapver=0\" or \"peapver=1 peaplabel=1\".\n"
"Some WPA Enterprise setups may require \"auth=MSCHAPV2\"",
default="[BLANK]")
required.add_argument("--eap_phase2", help="EAP Phase 2 (inner authentication) parameters.\n"
"For example, \"autheap=MSCHAPV2 autheap=MD5\" for EAP-TTLS.",
default="[BLANK]")
required.add_argument('--log_file', help='To get the log file, need to pass the True', default=False)
required.add_argument('--debug', help='To enable/disable debugger, need to pass the True/False', default=False)
required.add_argument('--soft_roam', help='To enable soft rome eg. --soft_rome True', default=False)
required.add_argument('--sta_type', type=str, help="provide the type of"
" client you want to creatE i.e 11r,11r-sae,"
" 11r-sae-802.1x or simple as none", default="11r")
required.add_argument('--sta_type', type=str, help="provide the type of client you want to create "
"i.e 11r, 11r-sae, 11r-eap, 11r-eap-sha384, normal",
default="11r")
required.add_argument('--ieee80211w', help='--ieee80211w <disabled(0),optional(1),required(2)', default=None)
required.add_argument('--multicast', default=False, help="set to true only if we want multicast "
"traffic run along the hard roam process")
@@ -2239,6 +2364,32 @@ EXAMPLE: For multiple station and multiple iteration
print(help_summary)
exit(0)
# validating arguments if EAP method selected
if args.eap_method is not None:
if args.eap_identity == '[BLANK]':
print("--eap_identity required")
exit(1)
elif args.eap_password == '[BLANK]':
print("--eap_password required")
exit(1)
elif args.eap_method == 'TLS':
if args.pk_passwd == '[BLANK]':
print("--pk_passwd required")
exit(1)
elif args.ca_cert == '[BLANK]':
print('--ca_cert required')
exit(1)
elif args.private_key == '[BLANK]':
print('--private_key required')
exit(0)
# if security is wpa3, it is necessary to have a pairwise_cipher, groupwise_cipher values
if '11r-eap-sha384' in args.sta_type:
if args.pairwise_cipher == '[BLANK]':
print('--pairwise_cipher required')
exit(1)
elif args.groupwise_cipher == '[BLANK]':
print('--groupwise_cipher required')
exit(1)
obj = HardRoam(lanforge_ip=args.mgr,
lanforge_port=args.lanforge_port,
lanforge_ssh_port=args.lanforge_ssh_port,
@@ -2272,17 +2423,26 @@ EXAMPLE: For multiple station and multiple iteration
port="8888",
band_cc="5g",
timeout="10",
identity=args.identity,
ttls_pass=args.ttls_pass,
eap_method=args.eap_method,
eap_identity=args.eap_identity,
eap_password=args.eap_password,
pairwise_cipher=args.pairwise_cipher,
groupwise_cipher=args.groupwise_cipher,
private_key=args.private_key,
pk_passwd=args.pk_passwd,
ca_cert=args.ca_cert,
eap_phase1=args.eap_phase1,
eap_phase2=args.eap_phase2,
soft_roam=args.soft_roam,
sta_type=args.sta_type,
ieee80211w=args.ieee80211w,
multicast=args.multicast
)
x = os.getcwd()
print("Current Working Directory :", x)
file = obj.generate_csv()
print("CSV File :", file)
obj.precleanup()
# obj.precleanup()
kernel, message = obj.run(file_n=file)
report_dir_name = obj.generate_report(csv_list=file, kernel_lst=kernel, current_path=str(x) + "/tests")
print(report_dir_name)