diff --git a/lf_libs/lf_tests.py b/lf_libs/lf_tests.py index 28ace8ea..d546a38c 100644 --- a/lf_libs/lf_tests.py +++ b/lf_libs/lf_tests.py @@ -302,8 +302,102 @@ class lf_tests(lf_libs): def multiband_performance_test(self): pass - def multi_psk_test(self): - pass + def multi_psk_test(self, band="twog", mpsk_data=None, ssid="'OpenWifi'", bssid="['BLANK']", passkey="OpenWifi", + encryption="wpa", vlan_id=None, mode="BRIDGE", num_sta=1, dut_data=None): + if mpsk_data is None: + mpsk_data={100: {"num_stations": num_sta, "passkey": "OpenWifi1"}, 200: {"num_stations": num_sta, "passkey": "OpenWifi2"}} + if dut_data is None: + dut_data = self.dut_data + # setup_data = self.setup_interfaces(ssid="OpenWifi", bssid="", passkey="OpenWifi", encryption="wpa", + # vlan_id=list(mpsk_data.keys()), mode="VLAN", band="twog", num_sta=1) + logging.info("Creating VLAN's") + # create VLAN's + self.add_vlan(vlan_ids=list(mpsk_data.keys())) + # query vlan and fetch Ip Address + port_data=self.json_get(_req_url="/port?fields=alias,port+type,ip,mac")['interfaces'] + # Fail if Vlan don't have IP + for i in port_data: + for item in i: + if i[item]['port type'] == '802.1Q VLAN' and i[item]['ip'] == '0.0.0.0': + logging.error(f"VLAN Interface - {i[item]['alias']} do not have IP") + pytest.fail("VLAN don't have IP") + break + else: + pass + + #create stations + for key in range(len(list(mpsk_data.keys()))): + passkey = mpsk_data[key]["passkey"] + self.client_connect(ssid=ssid, passkey=passkey, security=encryption, mode=mode, band=band, + vlan_id=[vlan_id], num_sta=num_sta, scan_ssid=True, + station_data=["4way time (us)", "channel", "cx time (us)", "dhcp (ms)", "ip", "signal"], + allure_attach=True) + + # check Pass/Fail + res_data=self.json_get(_req_url='port?fields=alias,port+type,ip,mac', )['interfaces'] + table_heads=["station name", "configured vlan-id", "expected IP Range", "allocated IP", "mac address", + 'pass/fail'] + table_data=[] + temp={'sta100': '', 'sta200': '', 'sta00': ''} + for i in res_data: + for item in i: + if i[item]['port type'] == 'Ethernet' and i[item]['alias'] == self.upstream_port: + if mode == 'NAT': + temp.update({'sta00': '192.168.1.1'}) + else: + temp.update({'sta00': i[item]['ip']}) + if i[item]['port type'] == '802.1Q VLAN' and i[item]['alias'] == str(self.upstream_port + ".100"): + temp.update({'sta100': i[item]['ip']}) + elif i[item]['port type'] == '802.1Q VLAN' and i[item]['alias'] == str(self.upstream_port + ".200"): + temp.update({'sta200': i[item]['ip']}) + for i in res_data: + for item in i: + if i[item]['port type'] == 'WIFI-STA' and i[item]['alias'] == "sta100": + exp1=temp['sta100'].split('.') + ip1=i[item]['ip'].split('.') + if exp1[0] == ip1[0] and exp1[1] == ip1[1]: + pf='PASS' + else: + pf='FAIL' + result1='Fail' + table_data.append( + [i[item]['alias'], '100', f'{exp1[0]}.{exp1[1]}.X.X', i[item]['ip'], i[item]['mac'], + f'{pf}']) + elif i[item]['port type'] == 'WIFI-STA' and i[item]['alias'] == 'sta200': + exp2=temp['sta200'].split('.') + ip2=i[item]['ip'].split('.') + if exp2[0] == ip2[0] and exp2[1] == ip2[1]: + pf='PASS' + else: + pf='FAIL' + result1='Fail' + table_data.append( + [i[item]['alias'], '200', f'{exp2[0]}.{exp2[1]}.X.X', i[item]['ip'], i[item]['mac'], f'{pf}']) + elif i[item]['port type'] == 'WIFI-STA' and i[item]['alias'] == 'sta00': + exp3=temp['sta00'].split('.') + ip2=i[item]['ip'].split('.') + if mode == "BRIDGE": + if exp3[0] == ip2[0] and exp3[1] == ip2[1]: + pf='PASS' + else: + pf='FAIL' + result1='Fail' + table_data.append([i[item]['alias'], 'WAN upstream', f'{exp3[0]}.{exp3[1]}.X.X', i[item]['ip'], + i[item]['mac'], f'{pf}']) + elif mode == "NAT": + if exp3[0] == '192' and exp3[1] == '168': + pf='PASS' + else: + pf='FAIL' + result1='Fail' + table_data.append( + [i[item]['alias'], 'LAN upstream', f'192.168.X.X', i[item]['ip'], i[item]['mac'], f'{pf}']) + print(table_data) + # attach test data in a table to allure + report_obj=Report() + table_info=report_obj.table2(table=table_data, headers=table_heads) + allure.attach(name="Test Validation Check", body=table_info) + logging.info(str("\n") + str(table_info)) def client_connect(self, ssid="[BLANK]", passkey="[BLANK]", security="wpa2", mode="BRIDGE", band="twog", vlan_id=[None], num_sta=None, scan_ssid=True, sta_mode=0, @@ -318,8 +412,8 @@ class lf_tests(lf_libs): all_identifier_list.append(dut["identifier"]) print(all_identifier_list) if identifier not in all_identifier_list: - logging.error("Identifier is missinhg") - pytest.fail("Identifier is missinhg") + logging.error("Identifier is missing") + pytest.fail("Identifier is missing") data = self.setup_interfaces(ssid=ssid, passkey=passkey, encryption=security, band=band, vlan_id=vlan_id[0], mode=mode, num_sta=num_sta) @@ -867,7 +961,7 @@ if __name__ == '__main__': "testbed": "basic", "scenario": "dhcp-bridge", "details": { - "manager_ip": "10.28.3.12", + "manager_ip": "10.28.3.34", "http_port": 8080, "ssh_port": 22, "setup": {"method": "build", "DB": "Test_Scenario_Automation"}, @@ -897,3 +991,60 @@ if __name__ == '__main__': obj = lf_tests(lf_data=dict(basic_04["traffic_generator"]), dut_data=list(basic_04["device_under_tests"]), log_level=logging.DEBUG, run_lf=False) + + # obj.add_stations() + # obj.add_stations(band="5G") + # obj.chamber_view(raw_lines="custom") + # dut = {'0000c1018812': {"ssid_data": { + # 0: {"ssid": 'TestSSID-2G', "encryption": 'wpa2', "password": 'OpenWifi', "band": '2G', + # "bssid": '00:00:C1:01:88:15'}, + # 1: {"ssid": 'TestSSID-5G', "encryption": 'wpa2', "password": 'OpenWifi', "band": '5G', + # "bssid": '00:00:C1:01:88:14'}}, "radio_data": {'2G': [1, 40, 2422], '5G': [36, 80, 5210], '6G': None}}} + # obj.wifi_capacity(instance_name="test_client_wpa2_BRIDGE_udp_bi", mode="BRIDGE", + # vlan_id=[100], + # download_rate="1Gbps", batch_size="1,5,10,20,40,64,128,256", + # influx_tags="Jitu", + # upload_rate="1Gbps", protocol="UDP-IPv4", duration="60000", + # move_to_influx=False, dut_data=dut, ssid_name="OpenWifi", + # num_stations={"2G": 10, "5G": 10}) + # A =obj.setup_interfaces(band="fiveg", vlan_id=100, mode="NAT-WAN", num_sta=1) + # print(A) + # obj.setup_relevent_profiles() + # obj.client_connect(ssid="OpenWifi", passkey="OpenWifi", security="wpa2", mode="BRIDGE", band="twog", + # vlan_id=[None], num_sta=65, scan_ssid=True, + # station_data=["4way time (us)", "channel", "cx time (us)", "dhcp (ms)", "ip", "signal"], + # allure_attach=True) + obj.multi_psk_test(band="twog", mpsk_data=None, ssid="['BLANK']", bssid="['BLANK']", passkey="['BLANK']", + encryption="wpa2", vlan_id=None, mode=None, num_sta=1) + # obj.add_vlan(vlan_iFds=[100]) + # obj.create_dhcp_external()obj.add_vlan(vlan_ids=[100, 200, 300, 400, 500, 600]) + # obj.get_cx_data() + # obj.chamber_view() + # dut = {'0000c1018812': [['ssid_open_2g_nat', 'open', 'something', '2G', '6A:21:5F:DA:45:6F'], + # {'2G': [6, 40, 2437], '5G': None, '6G': None}]} + + # passes, result = obj.client_connectivity_test(ssid="ssid_open_2g_nat", passkey="something", security="open", + # extra_securities=[], + # num_sta=1, mode="NAT-WAN", dut_data=dut, + # band="twog") + # print(passes == "PASS", result) + # # obj.start_sniffer(radio_channel=1, radio="wiphy7", test_name="sniff_radio", duration=30) + # print("started") + # time.sleep(30) + # obj.stop_sniffer() + # lf_report.pull_reports(hostname="10.28.3.28", port=22, username="lanforge", + # password="lanforge", + # report_location="/home/lanforge/" + "sniff_radio.pcap", + # report_dir=".") + # def start_sniffer(self, radio_channel=None, radio=None, test_name="sniff_radio", duration=60): + # + # obj.get_cx_data() + # obj.chamber_view() + # obj.client_connectivity_test(ssid="wpa2_5g", passkey="something", security="wpa2", extra_securities=[], + # num_sta=1, mode="BRIDGE", vlan_id=1, + # # band="fiveg", ssid_channel=36) + # obj.chamber_view() + # obj.setup_relevent_profiles() + # obj.add_vlan(vlan_ids=[100, 200, 300]) + # # obj.chamber_view() + # obj.setup_relevent_profiles()