mirror of
https://github.com/Telecominfraproject/wlan-lanforge-scripts.git
synced 2025-11-03 20:27:54 +00:00
Added client connect methods
Signed-off-by: jitendracandela <jitendra.kushavah@candelatech.com>
This commit is contained in:
@@ -7,6 +7,7 @@ import sys
|
|||||||
import time
|
import time
|
||||||
import string
|
import string
|
||||||
import random
|
import random
|
||||||
|
from itertools import islice
|
||||||
import paramiko
|
import paramiko
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
|
|
||||||
@@ -309,24 +310,39 @@ class lf_tests(lf_libs):
|
|||||||
pass
|
pass
|
||||||
|
|
||||||
def client_connect(self, ssid="[BLANK]", passkey="[BLANK]", security="wpa2", mode="BRIDGE", band="twog",
|
def client_connect(self, ssid="[BLANK]", passkey="[BLANK]", security="wpa2", mode="BRIDGE", band="twog",
|
||||||
vlan_id=100, num_sta=None, scan_ssid=True,
|
vlan_id=[None], num_sta=None, scan_ssid=True, sta_mode=0,
|
||||||
station_data=["4way time (us)", "channel", "cx time (us)", "dhcp (ms)", "ip", "signal"],
|
station_data=["4way time (us)", "channel", "cx time (us)", "dhcp (ms)", "ip", "signal"],
|
||||||
allure_attach=True):
|
allure_attach=True, identifier=None, allure_name="station data"):
|
||||||
data = self.setup_interfaces(band=band, vlan_id=vlan_id, mode=mode, num_sta=num_sta)
|
if identifier is None:
|
||||||
logging.info("Setup interface data" + str(data))
|
identifier = self.dut_data[0]["identifier"]
|
||||||
if self.run_lf:
|
logging.info("Identifier: " + str(identifier))
|
||||||
ssid = data["ssid"]
|
else:
|
||||||
passkey = data["passkey"]
|
all_identifier_list = []
|
||||||
security = data["security"]
|
for dut in self.dut_data:
|
||||||
|
all_identifier_list.append(self.dut_data[dut]["indentifier"])
|
||||||
|
print(all_identifier_list)
|
||||||
|
if identifier not in all_identifier_list:
|
||||||
|
logging.error("Identifier is missinhg")
|
||||||
|
pytest.fail("Identifier is missinhg")
|
||||||
|
|
||||||
|
data = self.setup_interfaces(ssid=ssid, passkey=passkey, encryption=security,
|
||||||
|
band=band, vlan_id=vlan_id[0], mode=mode, num_sta=num_sta)
|
||||||
|
|
||||||
|
logging.info("Setup interface data:\n" + json.dumps(str(data), indent=2))
|
||||||
|
allure.attach(name="Interface Info: \n", body=json.dumps(str(data), indent=2),
|
||||||
|
attachment_type=allure.attachment_type.JSON)
|
||||||
|
if data == {}:
|
||||||
|
pytest.skip("Skipping This Test")
|
||||||
client_connect_obj = []
|
client_connect_obj = []
|
||||||
station_data_all = {}
|
station_data_all = {}
|
||||||
for radio in data["radios"]:
|
for radio in data[identifier]["station_data"]:
|
||||||
client_connect = CreateStation(_host=self.manager_ip, _port=self.manager_http_port,
|
client_connect = CreateStation(_host=self.manager_ip, _port=self.manager_http_port,
|
||||||
_sta_list=data["radios"][radio], _password=passkey, _ssid=ssid,
|
_sta_list=data[identifier]["station_data"][radio], _password=data[identifier]["passkey"],
|
||||||
_security=security)
|
_ssid=data[identifier]["ssid"],
|
||||||
client_connect.station_profile.sta_mode = 0
|
_security=data[identifier]["encryption"])
|
||||||
client_connect.upstream_resource = data["upstream_port"].split(".")[1]
|
client_connect.station_profile.sta_mode = sta_mode
|
||||||
client_connect.upstream_port = data["upstream_port"].split(".")[2]
|
client_connect.upstream_resource = data[identifier]["upstream_resource"]
|
||||||
|
client_connect.upstream_port = data[identifier]["upstream"]
|
||||||
client_connect.radio = radio
|
client_connect.radio = radio
|
||||||
logging.info("scan ssid radio: " + str(client_connect.radio))
|
logging.info("scan ssid radio: " + str(client_connect.radio))
|
||||||
if scan_ssid:
|
if scan_ssid:
|
||||||
@@ -357,7 +373,7 @@ class lf_tests(lf_libs):
|
|||||||
pass_fail_sta.append("Pass")
|
pass_fail_sta.append("Pass")
|
||||||
sta_table_dict["Pass/Fail"] = pass_fail_sta
|
sta_table_dict["Pass/Fail"] = pass_fail_sta
|
||||||
if allure_attach:
|
if allure_attach:
|
||||||
self.attach_table_allure(data=sta_table_dict, allure_name="station data")
|
self.attach_table_allure(data=sta_table_dict, allure_name=allure_name)
|
||||||
|
|
||||||
logging.info("pass_fail result: " + str(pass_fail))
|
logging.info("pass_fail result: " + str(pass_fail))
|
||||||
if False in pass_fail:
|
if False in pass_fail:
|
||||||
@@ -367,6 +383,27 @@ class lf_tests(lf_libs):
|
|||||||
logging.info("ALL Stations got IP's")
|
logging.info("ALL Stations got IP's")
|
||||||
return station_data_all
|
return station_data_all
|
||||||
|
|
||||||
|
# def dfs_test(self, ssid=None, security=None, passkey=None, mode=None,
|
||||||
|
# band=None, num_sta=1, vlan_id=[None], dut_data={}):
|
||||||
|
# """DFS test"""
|
||||||
|
# if len(dut_data) == 0:
|
||||||
|
# logging.error("DUT data is empty")
|
||||||
|
# pytest.fail("DUT data is empty")
|
||||||
|
# for identifier in dut_data:
|
||||||
|
# station_data = self.client_connect(ssid=ssid, security=security, passkey=passkey, mode=mode,
|
||||||
|
# band=band, num_sta=num_sta, vlan_id=vlan_id,
|
||||||
|
# allure_name="Data before simulate radar", identifier=identifier)
|
||||||
|
# station_list = list(station_data.kyes())
|
||||||
|
# pass_fail = []
|
||||||
|
# logging.info("AP channel: " + str(ap_channel))
|
||||||
|
#
|
||||||
|
# #run the simulate radar command
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
def add_stations(self, band="2G", num_stations=9, ssid_name="", dut_data={}, identifier=None):
|
def add_stations(self, band="2G", num_stations=9, ssid_name="", dut_data={}, identifier=None):
|
||||||
|
|
||||||
dut_name = []
|
dut_name = []
|
||||||
@@ -566,8 +603,6 @@ class lf_tests(lf_libs):
|
|||||||
instance_name = ''.join(random.choices(string.ascii_uppercase + string.digits, k=12))
|
instance_name = ''.join(random.choices(string.ascii_uppercase + string.digits, k=12))
|
||||||
if mode == "BRIDGE":
|
if mode == "BRIDGE":
|
||||||
ret = self.get_wan_upstream_ports()
|
ret = self.get_wan_upstream_ports()
|
||||||
print("ret", ret)
|
|
||||||
print("dut", dut)
|
|
||||||
upstream_port = ret[identifier]
|
upstream_port = ret[identifier]
|
||||||
|
|
||||||
if mode == "NAT-WAN":
|
if mode == "NAT-WAN":
|
||||||
@@ -584,8 +619,8 @@ class lf_tests(lf_libs):
|
|||||||
else:
|
else:
|
||||||
self.add_vlan(vlan_ids=[vlan_id])
|
self.add_vlan(vlan_ids=[vlan_id])
|
||||||
ret = self.get_wan_upstream_ports()
|
ret = self.get_wan_upstream_ports()
|
||||||
upstream_data = ret[identifier] + "." + str(vlan_id)
|
upstream_port = ret[identifier] + "." + str(vlan_id)
|
||||||
logging.info("Upstream data: ", upstream_port)
|
logging.info("Upstream data: " + str(upstream_port))
|
||||||
sets = [["DUT_NAME", dut]]
|
sets = [["DUT_NAME", dut]]
|
||||||
'''SINGLE WIFI CAPACITY using lf_wifi_capacity.py'''
|
'''SINGLE WIFI CAPACITY using lf_wifi_capacity.py'''
|
||||||
for band_ in num_stations:
|
for band_ in num_stations:
|
||||||
@@ -637,7 +672,7 @@ class lf_tests(lf_libs):
|
|||||||
if move_to_influx:
|
if move_to_influx:
|
||||||
try:
|
try:
|
||||||
report_name = "../reports/" + \
|
report_name = "../reports/" + \
|
||||||
wificapacity_obj.report_name[0]['LAST']["response"].split(":::")[1].split("/")[-1]
|
wificapacity_obj.report_name[0]['LAST']["response"].split(":::")[1].split("/")[-1] + "/"
|
||||||
influx = CSVtoInflux(influx_host=self.influx_params["influx_host"],
|
influx = CSVtoInflux(influx_host=self.influx_params["influx_host"],
|
||||||
influx_port=self.influx_params["influx_port"],
|
influx_port=self.influx_params["influx_port"],
|
||||||
influx_org=self.influx_params["influx_org"],
|
influx_org=self.influx_params["influx_org"],
|
||||||
@@ -649,7 +684,7 @@ class lf_tests(lf_libs):
|
|||||||
except Exception as e:
|
except Exception as e:
|
||||||
print(e)
|
print(e)
|
||||||
pass
|
pass
|
||||||
report_name = wificapacity_obj.report_name[0]['LAST']["response"].split(":::")[1].split("/")[-1]
|
report_name = wificapacity_obj.report_name[0]['LAST']["response"].split(":::")[1].split("/")[-1] + "/"
|
||||||
|
|
||||||
self.attach_report_graphs(report_name=report_name)
|
self.attach_report_graphs(report_name=report_name)
|
||||||
self.attach_report_kpi(report_name=report_name)
|
self.attach_report_kpi(report_name=report_name)
|
||||||
@@ -906,29 +941,31 @@ if __name__ == '__main__':
|
|||||||
}
|
}
|
||||||
|
|
||||||
obj = lf_tests(lf_data=dict(basic_04["traffic_generator"]), dut_data=list(basic_04["device_under_tests"]),
|
obj = lf_tests(lf_data=dict(basic_04["traffic_generator"]), dut_data=list(basic_04["device_under_tests"]),
|
||||||
log_level=logging.DEBUG, run_lf=True)
|
log_level=logging.DEBUG, run_lf=False)
|
||||||
# obj.add_stations()
|
# obj.add_stations()
|
||||||
# obj.add_stations(band="5G")
|
# obj.add_stations(band="5G")
|
||||||
# obj.chamber_view(raw_lines="custom")
|
# obj.chamber_view(raw_lines="custom")
|
||||||
dut = {'0000c1018812': {"ssid_data": {
|
# dut = {'0000c1018812': {"ssid_data": {
|
||||||
0: {"ssid": 'TestSSID-2G', "encryption": 'wpa2', "password": 'OpenWifi', "band": '2G',
|
# 0: {"ssid": 'TestSSID-2G', "encryption": 'wpa2', "password": 'OpenWifi', "band": '2G',
|
||||||
"bssid": '00:00:C1:01:88:15'},
|
# "bssid": '00:00:C1:01:88:15'},
|
||||||
1: {"ssid": 'TestSSID-5G', "encryption": 'wpa2', "password": 'OpenWifi', "band": '5G',
|
# 1: {"ssid": 'TestSSID-5G', "encryption": 'wpa2', "password": 'OpenWifi', "band": '5G',
|
||||||
"bssid": '00:00:C1:01:88:14'}}, "radio_data": {'2G': [1, 40, 2422], '5G': [36, 80, 5210], '6G': None}}}
|
# "bssid": '00:00:C1:01:88:14'}}, "radio_data": {'2G': [1, 40, 2422], '5G': [36, 80, 5210], '6G': None}}}
|
||||||
obj.wifi_capacity(instance_name="test_client_wpa2_BRIDGE_udp_bi", mode="BRIDGE",
|
# obj.wifi_capacity(instance_name="test_client_wpa2_BRIDGE_udp_bi", mode="BRIDGE",
|
||||||
vlan_id=[100],
|
# vlan_id=[100],
|
||||||
download_rate="1Gbps", batch_size="1,5,10,20,40,64,128,256",
|
# download_rate="1Gbps", batch_size="1,5,10,20,40,64,128,256",
|
||||||
influx_tags="Jitu",
|
# influx_tags="Jitu",
|
||||||
upload_rate="1Gbps", protocol="UDP-IPv4", duration="60000",
|
# upload_rate="1Gbps", protocol="UDP-IPv4", duration="60000",
|
||||||
move_to_influx=False, dut_data=dut, ssid_name="OpenWifi",
|
# move_to_influx=False, dut_data=dut, ssid_name="OpenWifi",
|
||||||
num_stations={"2G": 10, "5G": 10})
|
# num_stations={"2G": 10, "5G": 10})
|
||||||
# A =obj.setup_interfaces(band="fiveg", vlan_id=100, mode="NAT-WAN", num_sta=1)
|
# A =obj.setup_interfaces(band="fiveg", vlan_id=100, mode="NAT-WAN", num_sta=1)
|
||||||
# print(A)
|
# print(A)
|
||||||
# obj.setup_relevent_profiles()
|
# obj.setup_relevent_profiles()
|
||||||
# obj.Client_Connect(ssid="OpenWifi", passkey="OpenWifi", security="wpa2", mode="BRIDGE", band="twog",
|
# obj.client_connect(ssid="OpenWifi", passkey="OpenWifi", security="wpa2", mode="BRIDGE", band="twog",
|
||||||
# vlan_id=100, num_sta=5, scan_ssid=True,
|
# vlan_id=[None], num_sta=65, scan_ssid=True,
|
||||||
# station_data=["4way time (us)", "channel", "cx time (us)", "dhcp (ms)", "ip", "signal"],
|
# station_data=["4way time (us)", "channel", "cx time (us)", "dhcp (ms)", "ip", "signal"],
|
||||||
# allure_attach=True)
|
# allure_attach=True)
|
||||||
|
obj.dfs_test(ssid="OpenWifi", security="wpa2", passkey="OpenWifi", mode="BRIDGE",
|
||||||
|
band="fiveg", num_sta=1)
|
||||||
# obj.add_vlan(vlan_iFds=[100])
|
# obj.add_vlan(vlan_iFds=[100])
|
||||||
# obj.create_dhcp_external()obj.add_vlan(vlan_ids=[100, 200, 300, 400, 500, 600])
|
# obj.create_dhcp_external()obj.add_vlan(vlan_ids=[100, 200, 300, 400, 500, 600])
|
||||||
# obj.get_cx_data()
|
# obj.get_cx_data()
|
||||||
|
|||||||
Reference in New Issue
Block a user