Added eap connectivity test

Signed-off-by: jitendracandela <jitendra.kushavah@candelatech.com>
This commit is contained in:
jitendracandela
2022-09-08 17:51:24 +05:30
parent e11e933fda
commit c9fb312750

View File

@@ -49,6 +49,8 @@ csvtoinflux = importlib.import_module("py-scripts.csv_to_influx")
CSVtoInflux = csvtoinflux.CSVtoInflux
lf_dataplane_test = importlib.import_module("py-scripts.lf_dataplane_test")
DataplaneTest = lf_dataplane_test.DataplaneTest
ttlstest = importlib.import_module("py-scripts.test_ipv4_ttls")
TTLSTest = ttlstest.TTLSTest
class lf_tests(lf_libs):
@@ -78,6 +80,7 @@ class lf_tests(lf_libs):
attachment_type=allure.attachment_type.JSON)
if data == {}:
pytest.skip("Skipping This Test")
# list of multiple sta_connect objects
sta_connect_obj = []
for dut in data:
for radio in data[dut]["station_data"]:
@@ -293,8 +296,250 @@ class lf_tests(lf_libs):
return result, description
def enterprise_client_connectivity_test(self):
pass
def enterprise_client_connectivity_test(self, ssid="[BLANK]", passkey="[BLANK]", bssid="[BLANK]", dut_data={},
security="open", extra_securities=[], sta_mode=0, key_mgmt="WPA-EAP",
pairwise="NA", group="NA", wpa_psk="DEFAULT", ttls_passwd="nolastart",
ieee80211w=1, wep_key="NA", ca_cert="NA", eap="TTLS", identity="nolaradius",
d_vlan=False, cleanup=True,
num_sta=1, mode="BRIDGE", vlan_id=[None], band="twog",
allure_attach=True, runtime_secs=40):
logging.info("DUT Data:\n" + json.dumps(str(dut_data), indent=2))
allure.attach(name="DUT Data:\n", body=json.dumps(str(dut_data), indent=2),
attachment_type=allure.attachment_type.JSON)
data = self.setup_interfaces(ssid=ssid, bssid=bssid, passkey=passkey, encryption=security,
band=band, vlan_id=vlan_id[0], mode=mode, num_sta=num_sta)
logging.info("Setup interface data:\n" + json.dumps(str(data), indent=2))
allure.attach(name="Interface Info: \n", body=json.dumps(str(data), indent=2),
attachment_type=allure.attachment_type.JSON)
if data == {}:
pytest.skip("Skipping This Test")
# list of multiple eap_connect objects
eap_connect_objs = []
for dut in data:
for radio in data[dut]["station_data"]:
obj_eap_connect = TTLSTest(host=self.manager_ip, port=self.manager_http_port,
sta_list=data[dut]["station_data"][radio], vap=False)
obj_eap_connect.station_profile.sta_mode = sta_mode
obj_eap_connect.upstream_resource = data[dut]["upstream_resource"]
obj_eap_connect.l3_cx_obj_udp.upstream_resource = data[dut]["upstream_resource"]
obj_eap_connect.l3_cx_obj_tcp.upstream_resource = data[dut]["upstream_resource"]
obj_eap_connect.l3_cx_obj_udp.upstream = data[dut]["upstream"]
obj_eap_connect.l3_cx_obj_tcp.upstream = data[dut]["upstream"]
self.enable_verbose_debug(radio=radio, enable=True)
obj_eap_connect.radio = radio
obj_eap_connect.admin_down(radio)
obj_eap_connect.admin_up(radio)
# changed to auto channel
self.set_radio_channel(radio=radio, channel="AUTO")
logging.info("scan ssid radio: " + str(radio.split(".")[2]))
result = self.scan_ssid(radio=radio, ssid=ssid)
logging.info("ssid scan data : " + str(result))
if not result:
# Sniffer required
for duts in self.dut_data:
identifier = duts["identifier"]
if dut_data.keys().__contains__(identifier):
if band == "twog":
if dict(dut_data.get(identifier)["radio_data"]).keys().__contains__("2G") and \
dict(dut_data.get(identifier)["radio_data"])["2G"] is not None:
channel = dict(dut_data.get(identifier)["radio_data"])["2G"]["channel"]
if data[dut]["sniff_radio_2g"] is not None:
self.start_sniffer(radio_channel=channel,
radio=data[dut]["sniff_radio_2g"].split(".")[2],
duration=10)
time.sleep(10)
self.stop_sniffer()
elif band == "fiveg":
if dict(dut_data.get(identifier)["radio_data"]).keys().__contains__("5G") and \
dict(dut_data.get(identifier)["radio_data"])["5G"] is not None:
channel = dict(dut_data.get(identifier)["radio_data"])["5G"]["channel"]
if data[dut]["sniff_radio_5g"] is not None:
self.start_sniffer(radio_channel=channel,
radio=data[dut]["sniff_radio_5g"].split(".")[2],
duration=10)
time.sleep(10)
self.stop_sniffer()
elif band == "sixg":
if dict(dut_data.get(identifier)["radio_data"]).keys().__contains__("6G") and \
dict(dut_data.get(identifier)["radio_data"])["6G"] is not None:
channel = dict(dut_data.get(identifier)["radio_data"])["6G"]["channel"]
if data[dut]["sniff_radio_6g"] is not None:
self.start_sniffer(radio_channel=channel,
radio=data[dut]["sniff_radio_6g"].split(".")[2],
duration=10)
time.sleep(10)
self.stop_sniffer()
if not result:
pytest.fail("SSID is not Available in Scan Result")
if eap == "TTLS":
obj_eap_connect.ieee80211w = ieee80211w
obj_eap_connect.key_mgmt = key_mgmt
obj_eap_connect.station_profile.set_command_flag("add_sta", "80211u_enable", 0)
obj_eap_connect.identity = identity
obj_eap_connect.ttls_passwd = ttls_passwd
obj_eap_connect.pairwise = pairwise
obj_eap_connect.group = group
if eap == "TLS":
obj_eap_connect.key_mgmt = key_mgmt
obj_eap_connect.station_profile.set_command_flag("add_sta", "80211u_enable", 0)
obj_eap_connect.eap = eap
obj_eap_connect.identity = "user"
obj_eap_connect.ttls_passwd = "password"
obj_eap_connect.private_key = "/home/lanforge/client.p12"
obj_eap_connect.ca_cert = "/home/lanforge/ca.pem"
obj_eap_connect.pk_passwd = "whatever"
obj_eap_connect.ieee80211w = 1
obj_eap_connect.ssid = data[dut]["ssid"]
obj_eap_connect.password = data[dut]["passkey"]
obj_eap_connect.security = data[dut]["encryption"]
obj_eap_connect.sta_list = data[dut]["station_data"][radio]
obj_eap_connect.build(extra_securities=extra_securities)
eap_connect_objs.append(obj_eap_connect)
for dut_ in self.dut_data:
identifier = dut_["identifier"]
if dut_data.keys().__contains__(identifier):
if band == "twog":
if dict(dut_data.get(identifier)["radio_data"]).keys().__contains__("2G") and \
dict(dut_data.get(identifier)["radio_data"])["2G"] is not None:
channel = dict(dut_data.get(identifier)["radio_data"])["2G"]["channel"]
self.start_sniffer(radio_channel=channel, radio=data[dut]["sniff_radio_2g"].split(".")[2],
duration=runtime_secs)
logging.info("started-sniffer")
for obj in eap_connect_objs:
obj.start(obj.sta_list, True, True, wait_time=1)
logging.info("napping %f sec" % runtime_secs)
time.sleep(runtime_secs)
self.stop_sniffer()
elif band == "fiveg":
if dict(dut_data.get(identifier)["radio_data"]).keys().__contains__("5G") and \
dict(dut_data.get(identifier)["radio_data"])["5G"] is not None:
channel = dict(dut_data.get(identifier)["radio_data"])["5G"]["channel"]
self.start_sniffer(radio_channel=channel, radio=data[dut]["sniff_radio_5g"].split(".")[2],
duration=runtime_secs)
for obj in eap_connect_objs:
obj.start(obj.sta_list, True, True, wait_time=1)
logging.info("napping %f sec" % runtime_secs)
time.sleep(runtime_secs)
self.stop_sniffer()
elif band == "sixg":
if dict(dut_data.get(identifier)["radio_data"]).keys().__contains__("6G") and \
dict(dut_data.get(identifier)["radio_data"])["6G"] is not None:
channel = dict(dut_data.get(identifier)["radio_data"])["6G"]["channel"]
self.start_sniffer(radio_channel=channel, radio=data[dut]["sniff_radio_6g"].split(".")[2],
duration=runtime_secs)
for obj in eap_connect_objs:
obj.start(obj.sta_list, True, True, wait_time=1)
logging.info("napping %f sec" % runtime_secs)
time.sleep(runtime_secs)
self.stop_sniffer()
else:
for obj in eap_connect_objs:
obj.start(obj.sta_list, True, True, wait_time=1)
logging.info("napping %f sec" % runtime_secs)
time.sleep(runtime_secs)
pass_fail_result = []
for obj in eap_connect_objs:
sta_rows = ["4way time (us)", "channel", "cx time (us)", "dhcp (ms)", "ip", "signal"]
station_data = self.get_station_data(sta_name=obj.station_names, rows=sta_rows,
allure_attach=False)
sta_table_dict = {}
sta_table_dict["station name"] = list(station_data.keys())
for i in sta_rows:
temp_list = []
for j in obj.station_names:
temp_list.append(station_data[j][i])
sta_table_dict[i] = temp_list
# pass fail
pass_fail_sta = []
for i in sta_table_dict["ip"]:
if i == "0.0.0.0":
pass_fail_sta.append("Fail")
else:
pass_fail_sta.append("Pass")
sta_table_dict["Pass/Fail"] = pass_fail_sta
if allure_attach:
self.attach_table_allure(data=sta_table_dict, allure_name="station data")
obj.stop()
cx_name = list(obj.l3_udp_profile.get_cx_names()) + list(
obj.l3_tcp_profile.get_cx_names())
cx_row = ["type", "bps rx a", "bps rx b"]
cx_data = self.get_cx_data(cx_name=cx_name, cx_data=cx_row, allure_attach=False)
cx_table_dict = {}
upstream = []
for i in range(len(obj.station_names)):
upstream.append(data[dut]["upstream_port"])
cx_table_dict["Upstream"] = upstream
cx_table_dict["Downstream"] = obj.station_names
cx_tcp_ul = []
cx_tcp_dl = []
cx_udp_ul = []
cx_udp_dl = []
for sta in obj.station_names:
for i in cx_data:
if sta.split(".")[2] in i:
if cx_data[i]["type"] == "LF/UDP":
cx_udp_dl.append(cx_data[i]["bps rx a"])
cx_udp_ul.append(cx_data[i]["bps rx b"])
elif cx_data[i]["type"] == "LF/TCP":
cx_tcp_dl.append(cx_data[i]["bps rx a"])
cx_tcp_ul.append(cx_data[i]["bps rx b"])
cx_table_dict["TCP DL"] = cx_tcp_dl
cx_table_dict["TCP UL"] = cx_tcp_ul
cx_table_dict["UDP DL"] = cx_udp_dl
cx_table_dict["UDP UL"] = cx_udp_ul
pass_fail_cx = []
for i, j, k, l in zip(cx_tcp_dl, cx_tcp_ul, cx_udp_dl, cx_udp_ul):
if i == 0 or j == 0 or k == 0 or l == 0:
pass_fail_cx.append("Fail")
else:
pass_fail_cx.append("Pass")
cx_table_dict["Pass/Fail"] = pass_fail_cx
if allure_attach:
self.attach_table_allure(data=cx_table_dict, allure_name="cx data")
obj.cleanup()
result = "PASS"
description = "Unknown error"
count = 0
temp_dict = {}
if "Fail" in pass_fail_sta:
count = count + 1
result = "FAIL"
description = "Station did not get an ip"
temp_dict[result] = description
pass_fail_result.append(temp_dict)
if count == 0:
if "Fail" in pass_fail_cx:
result = "FAIL"
description = "did not report traffic"
temp_dict[result] = description
pass_fail_result.append(temp_dict)
if obj.passes():
logging.info("client connection to" + str(obj.dut_ssid) + "successful. Test Passed")
result = "PASS"
temp_dict[result] = ""
pass_fail_result.append(temp_dict)
else:
logging.info("client connection to" + str(obj.dut_ssid) + "unsuccessful. Test Failed")
result = "FAIL"
for obj in eap_connect_objs:
try:
print("1." + str(obj.resource) + "." + str(obj.radio))
self.get_supplicant_logs(radio=str(obj.radio))
except Exception as e:
logging.error("client_cpnnectivity_tests() -- Error in getting Supplicant Logs:" + str(e))
result = "PASS"
description = ""
for i in pass_fail_result:
if list(i.keys())[0] == "FAIL":
result = "FAIL"
description = i["FAIL"]
break
return result, description
def rate_vs_range_test(self):
pass
@@ -308,7 +553,7 @@ class lf_tests(lf_libs):
def client_connect(self, ssid="[BLANK]", passkey="[BLANK]", security="wpa2", mode="BRIDGE", band="twog",
vlan_id=[None], num_sta=None, scan_ssid=True, sta_mode=0,
station_data=["4way time (us)", "channel", "cx time (us)", "dhcp (ms)", "ip", "signal"],
allure_attach=True, identifier=None, allure_name="station data"):
allure_attach=True, identifier=None, allure_name="station data", client_type=None):
if identifier is None:
identifier = self.dut_data[0]["identifier"]
logging.info("Identifier: " + str(identifier))
@@ -740,7 +985,7 @@ class lf_tests(lf_libs):
move_to_influx=False,
station_data=["4way time (us)", "channel", "cx time (us)", "dhcp (ms)", "ip",
"signal"],
allure_attach=True, allure_name="station data"):
allure_attach=True, allure_name="station data", client_type=None):
instance_name = ''.join(random.choices(string.ascii_uppercase + string.digits, k=12))
dataplane_obj_list = []
for dut in self.dut_data:
@@ -749,7 +994,7 @@ class lf_tests(lf_libs):
vlan_id=vlan_id, num_sta=num_sta, scan_ssid=scan_ssid, sta_mode=sta_mode,
station_data=station_data,
allure_attach=allure_attach, identifier=identifier,
allure_name=allure_name)
allure_name=allure_name, client_type=client_type)
if mode == "BRIDGE":
ret = self.get_wan_upstream_ports()