mirror of
https://github.com/Telecominfraproject/wlan-lanforge-scripts.git
synced 2025-11-02 19:58:03 +00:00
Improved client connectivity and added methods for get station data
Signed-off-by: jitendracandela <jitendra.kushavah@candelatech.com>
This commit is contained in:
@@ -275,24 +275,24 @@ class lf_libs:
|
|||||||
|
|
||||||
def create_dhcp_bridge(self):
|
def create_dhcp_bridge(self):
|
||||||
""" create chamber view scenario for DHCP-Bridge"""
|
""" create chamber view scenario for DHCP-Bridge"""
|
||||||
for uplink_nat_ports, wan_ports in zip(self.uplink_nat_ports, self.wan_ports):
|
for wan_ports, uplink_nat_ports in zip(self.wan_ports, self.uplink_nat_ports):
|
||||||
upstream_port = uplink_nat_ports
|
upstream_port = wan_ports
|
||||||
upstream_resources = upstream_port.split(".")[0] + "." + upstream_port.split(".")[1]
|
upstream_resources = upstream_port.split(".")[0] + "." + upstream_port.split(".")[1]
|
||||||
uplink_port = wan_ports
|
uplink_port = uplink_nat_ports
|
||||||
uplink_resources = uplink_port.split(".")[0] + "." + uplink_port.split(".")[1]
|
uplink_resources = uplink_port.split(".")[0] + "." + uplink_port.split(".")[1]
|
||||||
print(uplink_nat_ports)
|
print(uplink_nat_ports)
|
||||||
upstream_subnet = self.uplink_nat_ports[uplink_nat_ports]["subnet"]
|
uplink_subnet = self.uplink_nat_ports[uplink_nat_ports]["subnet"]
|
||||||
print(upstream_subnet)
|
print(uplink_subnet)
|
||||||
self.default_scenario_raw_lines.append(["profile_link " + upstream_resources + " upstream-dhcp 1 NA NA " +
|
self.default_scenario_raw_lines.append(["profile_link " + upstream_resources + " upstream-dhcp 1 NA NA " +
|
||||||
upstream_port.split(".")[2] + ",AUTO -1 NA"])
|
upstream_port.split(".")[2] + ",AUTO -1 NA"])
|
||||||
self.default_scenario_raw_lines.append(
|
self.default_scenario_raw_lines.append(
|
||||||
["profile_link " + uplink_resources + " uplink-nat 1 'DUT: upstream LAN "
|
["profile_link " + uplink_resources + " uplink-nat 1 'DUT: upstream LAN "
|
||||||
+ upstream_subnet
|
+ uplink_subnet
|
||||||
+ "' NA " + uplink_port.split(".")[2] + "," + upstream_port.split(".")[2] + " -1 NA"])
|
+ "' NA " + uplink_port.split(".")[2] + "," + upstream_port.split(".")[2] + " -1 NA"])
|
||||||
|
|
||||||
def create_dhcp_external(self):
|
def create_dhcp_external(self):
|
||||||
for uplink_nat_ports in self.uplink_nat_ports:
|
for wan_port in self.wan_ports:
|
||||||
upstream_port = uplink_nat_ports
|
upstream_port = wan_port
|
||||||
upstream_resources = upstream_port.split(".")[0] + "." + upstream_port.split(".")[1]
|
upstream_resources = upstream_port.split(".")[0] + "." + upstream_port.split(".")[1]
|
||||||
self.default_scenario_raw_lines.append(["profile_link " + upstream_resources + " upstream 1 NA NA " +
|
self.default_scenario_raw_lines.append(["profile_link " + upstream_resources + " upstream 1 NA NA " +
|
||||||
upstream_port.split(".")[2] + ",AUTO -1 NA"])
|
upstream_port.split(".")[2] + ",AUTO -1 NA"])
|
||||||
@@ -337,7 +337,6 @@ class lf_libs:
|
|||||||
profile_utility_obj = ProfileUtility(lfclient_host=self.manager_ip, lfclient_port=self.manager_http_port)
|
profile_utility_obj = ProfileUtility(lfclient_host=self.manager_ip, lfclient_port=self.manager_http_port)
|
||||||
# Read all Profiles
|
# Read all Profiles
|
||||||
all_profiles = profile_utility_obj.show_profile()
|
all_profiles = profile_utility_obj.show_profile()
|
||||||
print(all_profiles)
|
|
||||||
logging.info("Profiles: " + str(all_profiles))
|
logging.info("Profiles: " + str(all_profiles))
|
||||||
|
|
||||||
# Create upstream-dhcp and uplink-nat profile if they don't exists
|
# Create upstream-dhcp and uplink-nat profile if they don't exists
|
||||||
|
|||||||
@@ -48,14 +48,14 @@ class lf_tests(lf_libs):
|
|||||||
ax_prefix = "AX200_0"
|
ax_prefix = "AX200_0"
|
||||||
pcap_obj = None
|
pcap_obj = None
|
||||||
|
|
||||||
def __init__(self, lf_data={}, dut_data={}, log_level=logging.DEBUG, run_lf=False, influx_params=None,
|
def __init__(self, lf_data={}, dut_data={}, log_level=logging.DEBUG, run_lf=False, influx_params=None):
|
||||||
skip_pcap=False):
|
|
||||||
super().__init__(lf_data, dut_data, log_level)
|
super().__init__(lf_data, dut_data, log_level)
|
||||||
self.run_lf = run_lf
|
self.run_lf = run_lf
|
||||||
self.upstream_port = list(self.uplink_nat_ports.keys())[0]
|
# self.upstream_port = list(self.uplink_nat_ports.keys())[0]
|
||||||
self.skip_pcap = skip_pcap
|
# self.skip_pcap = skip_pcap
|
||||||
self.staConnect = StaConnect2(self.manager_ip, self.manager_http_port)
|
#self.wan_upstream = list(self.wan_ports.keys())
|
||||||
pass
|
# self.lan_upstream =
|
||||||
|
self.staConnect = StaConnect2(self.manager_ip, self.manager_http_port, outfile="shivam")
|
||||||
|
|
||||||
def setup_interfaces(self, band=None, vlan_id=None, mode=None, num_sta=None):
|
def setup_interfaces(self, band=None, vlan_id=None, mode=None, num_sta=None):
|
||||||
if band is None:
|
if band is None:
|
||||||
@@ -68,13 +68,13 @@ class lf_tests(lf_libs):
|
|||||||
logging.error("Number of stations are not available")
|
logging.error("Number of stations are not available")
|
||||||
pytest.exit()
|
pytest.exit()
|
||||||
if mode == "BRIDGE":
|
if mode == "BRIDGE":
|
||||||
upstream_port = self.upstream_port
|
upstream_port = self.upstream_port()
|
||||||
elif mode == "NAT":
|
elif mode == "NAT":
|
||||||
upstream_port = self.upstream_port
|
upstream_port = self.upstream_port()
|
||||||
elif mode == "VLAN":
|
elif mode == "VLAN":
|
||||||
# for vlan mode vlan id should be available
|
# for vlan mode vlan id should be available
|
||||||
if vlan_id is None:
|
if vlan_id is None:
|
||||||
upstream_port = self.upstream_port + str(vlan_id)
|
upstream_port = self.upstream_port() + str(vlan_id)
|
||||||
else:
|
else:
|
||||||
logging.error("Vlan id is not available for vlan")
|
logging.error("Vlan id is not available for vlan")
|
||||||
pytest.exit()
|
pytest.exit()
|
||||||
@@ -91,9 +91,10 @@ class lf_tests(lf_libs):
|
|||||||
"ax200_radios": 1, "ax210_radios": 1}
|
"ax200_radios": 1, "ax210_radios": 1}
|
||||||
if band == "twog":
|
if band == "twog":
|
||||||
if self.run_lf:
|
if self.run_lf:
|
||||||
ssid = self.dut_data[0]["ssid"]["2g-ssid"]
|
for i in self.dut_data:
|
||||||
passkey = self.dut_data[0]["ssid"]["2g-password"]
|
ssid = i["ssid"]["2g-ssid"]
|
||||||
security = self.dut_data[0]["ssid"]["2g-encryption"].lower()
|
passkey = i["ssid"]["2g-password"]
|
||||||
|
security = i["ssid"]["2g-encryption"].lower()
|
||||||
sta_prefix = self.twog_prefix
|
sta_prefix = self.twog_prefix
|
||||||
|
|
||||||
# checking station compitality of lanforge
|
# checking station compitality of lanforge
|
||||||
@@ -128,13 +129,13 @@ class lf_tests(lf_libs):
|
|||||||
stations = stations - max_station
|
stations = stations - max_station
|
||||||
diff = max_station - stations
|
diff = max_station - stations
|
||||||
# setup sniffer
|
# setup sniffer
|
||||||
if not self.skip_pcap:
|
sniff_radio = self.setup_sniffer(band=band, station_radio_data=radio_data)
|
||||||
sniff_radio = self.setup_sniffer(band=band, station_radio_data=radio_data)
|
|
||||||
if band == "fiveg":
|
if band == "fiveg":
|
||||||
if self.run_lf:
|
if self.run_lf:
|
||||||
ssid = self.dut_data["ssid"]["5g-ssid"]
|
for i in self.dut_data:
|
||||||
passkey = self.dut_data["ssid"]["5g-password"]
|
ssid = i["ssid"]["2g-ssid"]
|
||||||
security = self.dut_data["ssid"]["5g-encryption"].lower()
|
passkey = i["ssid"]["2g-password"]
|
||||||
|
security = i["ssid"]["2g-encryption"].lower()
|
||||||
|
|
||||||
sta_prefix = self.fiveg_prefix
|
sta_prefix = self.fiveg_prefix
|
||||||
# checking station compitality of lanforge
|
# checking station compitality of lanforge
|
||||||
@@ -169,8 +170,7 @@ class lf_tests(lf_libs):
|
|||||||
stations = stations - max_station
|
stations = stations - max_station
|
||||||
diff = max_station - stations
|
diff = max_station - stations
|
||||||
|
|
||||||
if not self.skip_pcap:
|
sniff_radio = self.setup_sniffer(band=band, station_radio_data=radio_data)
|
||||||
sniff_radio = self.setup_sniffer(band=band, station_radio_data=radio_data)
|
|
||||||
data_dict = {}
|
data_dict = {}
|
||||||
if self.run_lf:
|
if self.run_lf:
|
||||||
data_dict["radios"] = radio_data
|
data_dict["radios"] = radio_data
|
||||||
@@ -179,18 +179,15 @@ class lf_tests(lf_libs):
|
|||||||
data_dict["passkey"] = passkey
|
data_dict["passkey"] = passkey
|
||||||
data_dict["security"] = security
|
data_dict["security"] = security
|
||||||
data_dict["sta_prefix"] = sta_prefix
|
data_dict["sta_prefix"] = sta_prefix
|
||||||
if not self.skip_pcap:
|
data_dict["sniff_radio"] = sniff_radio
|
||||||
data_dict["sniff_radio"] = sniff_radio
|
|
||||||
return data_dict
|
return data_dict
|
||||||
else:
|
else:
|
||||||
data_dict["radios"] = radio_data
|
data_dict["radios"] = radio_data
|
||||||
data_dict["upstream_port"] = upstream_port
|
data_dict["upstream_port"] = upstream_port
|
||||||
data_dict["sta_prefix"] = sta_prefix
|
data_dict["sta_prefix"] = sta_prefix
|
||||||
if not self.skip_pcap:
|
data_dict["sniff_radio"] = sniff_radio
|
||||||
data_dict["sniff_radio"] = sniff_radio
|
|
||||||
return data_dict
|
return data_dict
|
||||||
|
|
||||||
|
|
||||||
def pre_cleanup(self):
|
def pre_cleanup(self):
|
||||||
""" deleting existing stations and layer 3 connections """
|
""" deleting existing stations and layer 3 connections """
|
||||||
logging.info("Checking existing stations and layer3 connections...")
|
logging.info("Checking existing stations and layer3 connections...")
|
||||||
@@ -220,6 +217,25 @@ class lf_tests(lf_libs):
|
|||||||
except Exception as e:
|
except Exception as e:
|
||||||
logging.error(e)
|
logging.error(e)
|
||||||
|
|
||||||
|
def nametoresource(self, name=None):
|
||||||
|
"""Returns resource number"""
|
||||||
|
if name is not None:
|
||||||
|
resource = name.split(".")[1]
|
||||||
|
return resource
|
||||||
|
else:
|
||||||
|
logging.error("Name is not provided")
|
||||||
|
|
||||||
|
def upstream_port(self):
|
||||||
|
"""finding upstream port"""
|
||||||
|
upstream_port = ""
|
||||||
|
print(len(self.dut_data))
|
||||||
|
for i in self.dut_data:
|
||||||
|
upstream_port = i["wan_port"]
|
||||||
|
print(upstream_port)
|
||||||
|
return upstream_port
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
def setup_sniffer(self, band=None, station_radio_data=None):
|
def setup_sniffer(self, band=None, station_radio_data=None):
|
||||||
"""Setup sniff radio"""
|
"""Setup sniff radio"""
|
||||||
sniff_radio = None
|
sniff_radio = None
|
||||||
@@ -243,10 +259,9 @@ class lf_tests(lf_libs):
|
|||||||
sniff_radio = left_radio[0]
|
sniff_radio = left_radio[0]
|
||||||
return sniff_radio
|
return sniff_radio
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
def client_connectivity_test(self, ssid="[BLANK]", passkey="[BLANK]", security="open", extra_securities=[],
|
def client_connectivity_test(self, ssid="[BLANK]", passkey="[BLANK]", security="open", extra_securities=[],
|
||||||
station_name=[], mode="BRIDGE", vlan_id=1, band="twog", ssid_channel=None, outfile="shivam"):
|
station_name=[], mode="BRIDGE", vlan_id=1, band="twog", ssid_channel=None,
|
||||||
|
outfile="shivam"):
|
||||||
# self.staConnect = StaConnect2(self.manager_ip, self.manager_http_port, debug_=self.debug)
|
# self.staConnect = StaConnect2(self.manager_ip, self.manager_http_port, debug_=self.debug)
|
||||||
# setup_interfaces() interface selection return radio name along no of station on each radio, upstream port
|
# setup_interfaces() interface selection return radio name along no of station on each radio, upstream port
|
||||||
#
|
#
|
||||||
@@ -260,7 +275,7 @@ class lf_tests(lf_libs):
|
|||||||
self.staConnect.sta_mode = 0
|
self.staConnect.sta_mode = 0
|
||||||
self.staConnect.upstream_resource = data["upstream_port"].split(".")[1]
|
self.staConnect.upstream_resource = data["upstream_port"].split(".")[1]
|
||||||
self.staConnect.upstream_port = data["upstream_port"].split(".")[2]
|
self.staConnect.upstream_port = data["upstream_port"].split(".")[2]
|
||||||
#creating dict of radio and station_list
|
# creating dict of radio and station_list
|
||||||
dict_radio_sta_list = {}
|
dict_radio_sta_list = {}
|
||||||
# list of per radio station
|
# list of per radio station
|
||||||
length_to_split = list(data["radios"].values())
|
length_to_split = list(data["radios"].values())
|
||||||
@@ -268,7 +283,7 @@ class lf_tests(lf_libs):
|
|||||||
sta_list = iter(station_name)
|
sta_list = iter(station_name)
|
||||||
# station list of per radio station list
|
# station list of per radio station list
|
||||||
sta_list_ = [list(islice(sta_list, elem))
|
sta_list_ = [list(islice(sta_list, elem))
|
||||||
for elem in length_to_split]
|
for elem in length_to_split]
|
||||||
# Checking station lists according to radios
|
# Checking station lists according to radios
|
||||||
if len(sta_list_) == len(length_to_split):
|
if len(sta_list_) == len(length_to_split):
|
||||||
dict_radio_sta_list = dict(zip(list(data["radios"].keys()), sta_list_))
|
dict_radio_sta_list = dict(zip(list(data["radios"].keys()), sta_list_))
|
||||||
@@ -278,26 +293,24 @@ class lf_tests(lf_libs):
|
|||||||
print("dict_radio_sta_list", dict_radio_sta_list)
|
print("dict_radio_sta_list", dict_radio_sta_list)
|
||||||
for radio in data["radios"]:
|
for radio in data["radios"]:
|
||||||
self.enable_verbose_debug(radio=radio, enable=False)
|
self.enable_verbose_debug(radio=radio, enable=False)
|
||||||
exit(1)
|
|
||||||
self.staConnect.radio = radio
|
self.staConnect.radio = radio
|
||||||
self.staConnect.admin_down(self.staConnect.radio)
|
self.staConnect.admin_down(self.staConnect.radio)
|
||||||
self.staConnect.admin_up(self.staConnect.radio)
|
self.staConnect.admin_up(self.staConnect.radio)
|
||||||
self.staConnect.sta_prefix = data["sta_prefix"]
|
self.staConnect.sta_prefix = data["sta_prefix"]
|
||||||
self.set_radio_channel(radio=radio, channel=ssid_channel)
|
#changed to auto channel
|
||||||
|
self.set_radio_channel(radio=radio, channel="AUTO")
|
||||||
print("scan ssid radio", radio.split(".")[2])
|
print("scan ssid radio", radio.split(".")[2])
|
||||||
self.data_scan_ssid = self.scan_ssid(radio=radio.split(".")[2])
|
result = self.scan_ssid(radio=radio.split(".")[2])
|
||||||
print("ssid scan data :- ", self.data_scan_ssid)
|
print("ssid scan data :- ", result)
|
||||||
result = self.check_ssid_available_scan_result(scan_ssid_data=self.data_scan_ssid, ssid=ssid)
|
|
||||||
print("ssid available:-", result)
|
|
||||||
if not result and ssid_channel:
|
if not result and ssid_channel:
|
||||||
if not self.skip_pcap:
|
#Sniffer required
|
||||||
print("sniff radio", data["sniff_radio"].split(".")[2])
|
# print("sniff radio", data["sniff_radio"].split(".")[2])
|
||||||
self.start_sniffer(radio_channel=ssid_channel, radio=data["sniff_radio"].split(".")[2], duration=30)
|
# self.start_sniffer(radio_channel=ssid_channel, radio=data["sniff_radio"].split(".")[2], duration=30)
|
||||||
time.sleep(30)
|
# time.sleep(30)
|
||||||
self.stop_sniffer()
|
# self.stop_sniffer()
|
||||||
print("ssid not available in scan result")
|
print("ssid not available in scan result")
|
||||||
return "FAIL", "ssid not available in scan result"
|
return "FAIL", "ssid not available in scan result"
|
||||||
self.staConnect.resource = 1
|
self.staConnect.resource = radio.split(".")[1]
|
||||||
self.staConnect.dut_ssid = ssid
|
self.staConnect.dut_ssid = ssid
|
||||||
self.staConnect.dut_passwd = passkey
|
self.staConnect.dut_passwd = passkey
|
||||||
self.staConnect.dut_security = security
|
self.staConnect.dut_security = security
|
||||||
@@ -312,44 +325,31 @@ class lf_tests(lf_libs):
|
|||||||
data_table = ""
|
data_table = ""
|
||||||
dict_table = {}
|
dict_table = {}
|
||||||
self.staConnect.setup(extra_securities=extra_securities)
|
self.staConnect.setup(extra_securities=extra_securities)
|
||||||
for sta_name in self.staConnect.station_names:
|
# for sta_name in self.staConnect.station_names:
|
||||||
try:
|
# try:
|
||||||
sta_url = self.staConnect.get_station_url(sta_name)
|
# sta_url = self.staConnect.get_station_url(sta_name.split(".")[2])
|
||||||
station_info = self.staConnect.json_get(sta_url)
|
# station_info = self.staConnect.json_get(sta_url)
|
||||||
dict_data = station_info["interface"]
|
# dict_data = station_info["interface"]
|
||||||
dict_table[""] = list(dict_data.keys())
|
# dict_table[""] = list(dict_data.keys())
|
||||||
dict_table["Before"] = list(dict_data.values())
|
# dict_table["Before"] = list(dict_data.values())
|
||||||
except Exception as e:
|
# except Exception as e:
|
||||||
print(e)
|
# print(e)
|
||||||
if ssid_channel:
|
if ssid_channel:
|
||||||
if not self.skip_pcap:
|
pass
|
||||||
print("sniff radio", data["sniff_radio"].split(".")[2])
|
#Need to start sniffer
|
||||||
self.start_sniffer(radio_channel=ssid_channel, radio=data["sniff_radio"].split(".")[2], duration=30)
|
# print("sniff radio", data["sniff_radio"].split(".")[2])
|
||||||
|
# self.start_sniffer(radio_channel=ssid_channel, radio=data["sniff_radio"].split(".")[2], duration=30)
|
||||||
self.staConnect.start()
|
self.staConnect.start()
|
||||||
print("napping %f sec" % self.staConnect.runtime_secs)
|
print("napping %f sec" % self.staConnect.runtime_secs)
|
||||||
time.sleep(self.staConnect.runtime_secs)
|
time.sleep(self.staConnect.runtime_secs)
|
||||||
report_obj = Report()
|
self.get_station_data()
|
||||||
for sta_name in self.staConnect.station_names:
|
self.get_layer3_data()
|
||||||
try:
|
|
||||||
sta_url = self.staConnect.get_station_url(sta_name)
|
|
||||||
station_info = self.staConnect.json_get(sta_url)
|
|
||||||
self.station_ip = station_info["interface"]["ip"]
|
|
||||||
dict_data = station_info["interface"]
|
|
||||||
dict_table["After"] = list(dict_data.values())
|
|
||||||
try:
|
|
||||||
data_table = report_obj.table2(table=dict_table, headers='keys')
|
|
||||||
except Exception as e:
|
|
||||||
print(e)
|
|
||||||
allure.attach(name=str(sta_name), body=data_table)
|
|
||||||
except Exception as e:
|
|
||||||
print(e)
|
|
||||||
self.staConnect.stop()
|
self.staConnect.stop()
|
||||||
run_results = self.staConnect.get_result_list()
|
run_results = self.staConnect.get_result_list()
|
||||||
if not self.staConnect.passes():
|
if not self.staConnect.passes():
|
||||||
if self.debug:
|
for result in run_results:
|
||||||
for result in run_results:
|
print("test result: " + result)
|
||||||
print("test result: " + result)
|
pytest.exit("Test Failed: Debug True")
|
||||||
pytest.exit("Test Failed: Debug True")
|
|
||||||
self.staConnect.cleanup()
|
self.staConnect.cleanup()
|
||||||
# try:
|
# try:
|
||||||
# supplicant = "/home/lanforge/wifi/wpa_supplicant_log_" + radio.split(".")[2] + ".txt"
|
# supplicant = "/home/lanforge/wifi/wpa_supplicant_log_" + radio.split(".")[2] + ".txt"
|
||||||
@@ -394,8 +394,10 @@ class lf_tests(lf_libs):
|
|||||||
# result = "FAIL"
|
# result = "FAIL"
|
||||||
# time.sleep(3)
|
# time.sleep(3)
|
||||||
if ssid_channel:
|
if ssid_channel:
|
||||||
if not self.skip_pcap:
|
# need to stop sniffer
|
||||||
self.stop_sniffer()
|
# if not self.skip_pcap:
|
||||||
|
# self.stop_sniffer()
|
||||||
|
pass
|
||||||
self.set_radio_channel(radio=radio, channel="AUTO")
|
self.set_radio_channel(radio=radio, channel="AUTO")
|
||||||
# return result, description
|
# return result, description
|
||||||
|
|
||||||
@@ -417,35 +419,45 @@ class lf_tests(lf_libs):
|
|||||||
def multi_psk_test(self):
|
def multi_psk_test(self):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
def scan_ssid(self, radio=""):
|
def scan_ssid(self, radio="", retry=1, allure_attach=True, scan_time=15, ssid=None):
|
||||||
'''This method for scan ssid data'''
|
'''This method for scan ssid data'''
|
||||||
list_data = []
|
count = 0
|
||||||
obj_scan = StaScan(host=self.manager_ip, port=self.manager_http_port, ssid="fake ssid", security="open",
|
for i in range(retry + 1):
|
||||||
password="[BLANK]", radio=radio, sta_list=["sta00100"], csv_output="scan_ssid.csv")
|
list_data = []
|
||||||
#obj_scan.pre_cleanup()
|
obj_scan = StaScan(host=self.manager_ip, port=self.manager_http_port, ssid="fake ssid", security="open",
|
||||||
time1 = datetime.now()
|
password="[BLANK]", radio=radio, sta_list=["sta00100"], csv_output="scan_ssid.csv",
|
||||||
first = time.mktime(time1.timetuple()) * 1000
|
scan_time=scan_time)
|
||||||
obj_scan.build()
|
# obj_scan.pre_cleanup()
|
||||||
obj_scan.start()
|
time1 = datetime.now()
|
||||||
time2 = datetime.now()
|
first = time.mktime(time1.timetuple()) * 1000
|
||||||
second = time.mktime(time2.timetuple()) * 1000
|
obj_scan.build()
|
||||||
diff = int(second - first)
|
obj_scan.start()
|
||||||
try:
|
time2 = datetime.now()
|
||||||
with open(obj_scan.csv_output, 'r') as file:
|
second = time.mktime(time2.timetuple()) * 1000
|
||||||
reader = csv.reader(file)
|
diff = int(second - first)
|
||||||
for row in reader:
|
try:
|
||||||
if row[1] == "age":
|
with open(obj_scan.csv_output, 'r') as file:
|
||||||
list_data.append(row)
|
reader = csv.reader(file)
|
||||||
continue
|
for row in reader:
|
||||||
elif int(row[1]) < diff:
|
if row[1] == "age":
|
||||||
list_data.append(row)
|
list_data.append(row)
|
||||||
except Exception as e:
|
continue
|
||||||
print(e)
|
elif int(row[1]) < diff:
|
||||||
report_obj = Report()
|
list_data.append(row)
|
||||||
csv_data_table = report_obj.table2(list_data)
|
except Exception as e:
|
||||||
allure.attach(name="scan_ssid_data", body=csv_data_table)
|
print(e)
|
||||||
obj_scan.cleanup()
|
report_obj = Report()
|
||||||
return list_data
|
csv_data_table = report_obj.table2(list_data)
|
||||||
|
# allure.attach(name="scan_ssid_data", body=csv_data_table)
|
||||||
|
if allure_attach:
|
||||||
|
allure.attach(name="scan_ssid_data_" + str(i+1), body=csv_data_table)
|
||||||
|
obj_scan.cleanup()
|
||||||
|
if self.check_ssid_available_scan_result(scan_ssid_data=list_data, ssid=ssid):
|
||||||
|
count = count + 1
|
||||||
|
return list_data
|
||||||
|
if count == 0:
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
def start_sniffer(self, radio_channel=None, radio=None, test_name="sniff_radio", duration=60):
|
def start_sniffer(self, radio_channel=None, radio=None, test_name="sniff_radio", duration=60):
|
||||||
self.pcap_name = test_name + ".pcap"
|
self.pcap_name = test_name + ".pcap"
|
||||||
@@ -504,6 +516,73 @@ class lf_tests(lf_libs):
|
|||||||
except Exception as e:
|
except Exception as e:
|
||||||
print(e)
|
print(e)
|
||||||
|
|
||||||
|
def get_station_data(self, rows=["ip", "signal"], sta_name=["1.1.sta0000", "1.1.sta0001"], allure_attach=True):
|
||||||
|
"""
|
||||||
|
Attach station data to allure
|
||||||
|
e.g. rows = ["ip", "signal"] , sta_names = ["1.1.wlan0000", "1.1.wlan0001"]
|
||||||
|
"""
|
||||||
|
# dict for station data
|
||||||
|
sta_dict = {}
|
||||||
|
for sta in sta_name:
|
||||||
|
sta_url = "port/" + str(sta.split(".")[0]) + "/" + str(sta.split(".")[1]) + "/" + str(sta.split(".")[2])
|
||||||
|
station_info = self.staConnect.json_get(sta_url)
|
||||||
|
dict_data = station_info["interface"]
|
||||||
|
print("dict_data", dict_data)
|
||||||
|
temp_dict = {}
|
||||||
|
for i in rows:
|
||||||
|
temp_dict[i] = dict_data[i]
|
||||||
|
sta_dict[sta] = temp_dict
|
||||||
|
print(sta_dict)
|
||||||
|
# Creating dict for allure table
|
||||||
|
table_dict = {}
|
||||||
|
table_dict["station name"] = list(sta_dict.keys())
|
||||||
|
for i in rows:
|
||||||
|
temp_list = []
|
||||||
|
for j in sta_name:
|
||||||
|
temp_list.append(sta_dict[j][i])
|
||||||
|
table_dict[i] = temp_list
|
||||||
|
#pass fail
|
||||||
|
pass_fail = []
|
||||||
|
for i in table_dict["ip"]:
|
||||||
|
if i == "0.0.0.0":
|
||||||
|
pass_fail.append("Fail")
|
||||||
|
else:
|
||||||
|
pass_fail.append("Pass")
|
||||||
|
table_dict["Pass/Fail"] = pass_fail
|
||||||
|
if allure_attach:
|
||||||
|
try:
|
||||||
|
report_obj = Report()
|
||||||
|
data_table = report_obj.table2(table=table_dict, headers='keys')
|
||||||
|
print(data_table)
|
||||||
|
allure.attach(name="station data", body=data_table)
|
||||||
|
except Exception as e:
|
||||||
|
logging.error(e)
|
||||||
|
return sta_dict
|
||||||
|
|
||||||
|
def get_cx_data(self, sta_name=[], cx_data=[]):
|
||||||
|
"""Attach cx data to allure"""
|
||||||
|
|
||||||
|
pass
|
||||||
|
|
||||||
|
def get_station_list(self, num_sta=1, band="twog"):
|
||||||
|
"""Create station list"""
|
||||||
|
sta_list = []
|
||||||
|
for i in range(num_sta):
|
||||||
|
if band == "twog":
|
||||||
|
sta_list.append(self.twog_prefix + str(i))
|
||||||
|
elif band == "fiveg":
|
||||||
|
sta_list.append(self.fiveg_prefix + str(i))
|
||||||
|
elif band == "sixg":
|
||||||
|
sta_list.append(self.sixg_prefix + str(i))
|
||||||
|
else:
|
||||||
|
logging.error("band is wrong")
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
basic_1 = {
|
basic_1 = {
|
||||||
@@ -516,6 +595,7 @@ if __name__ == '__main__':
|
|||||||
"device_under_tests": [{
|
"device_under_tests": [{
|
||||||
"model": "edgecore_eap101",
|
"model": "edgecore_eap101",
|
||||||
"supported_bands": ["2G", "5G"],
|
"supported_bands": ["2G", "5G"],
|
||||||
|
"wan_port": "1.1.eth1",
|
||||||
"supported_modes": ["BRIDGE", "NAT", "VLAN"],
|
"supported_modes": ["BRIDGE", "NAT", "VLAN"],
|
||||||
"ssid": {
|
"ssid": {
|
||||||
"2g-ssid": "OpenWifi",
|
"2g-ssid": "OpenWifi",
|
||||||
@@ -550,29 +630,37 @@ if __name__ == '__main__':
|
|||||||
"http_port": 8080,
|
"http_port": 8080,
|
||||||
"ssh_port": 22,
|
"ssh_port": 22,
|
||||||
"setup": {"method": "build", "DB": "Test_Scenario_Automation"}, # method: build/load,
|
"setup": {"method": "build", "DB": "Test_Scenario_Automation"}, # method: build/load,
|
||||||
# DB : Default database name
|
|
||||||
"wan_ports": {
|
"wan_ports": {
|
||||||
"1.1.eth3": {"addressing": "dhcp-server", "subnet": "172.16.0.1/16", "dhcp": {
|
"1.1.eth1": {"addressing": "dhcp-server", "subnet": "172.16.0.1/16", "dhcp": {
|
||||||
"lease-first": 10,
|
"lease-first": 10,
|
||||||
"lease-count": 10000,
|
"lease-count": 10000,
|
||||||
"lease-time": "6h"
|
"lease-time": "6h"
|
||||||
}}
|
}}},
|
||||||
},
|
# DB : Default database name
|
||||||
"lan_ports": {
|
# "wan_ports": {
|
||||||
"1.1.eth1": {"addressing": "dynamic"} # dhcp-server/{"addressing": "dynamic"}/{"addressing":
|
# "1.1.eth2": {"addressing": "dhcp-server", "subnet": "172.16.0.1/16", "dhcp": {
|
||||||
# "static", "subnet": "10.28.2.6/16"}
|
# "lease-first": 10,
|
||||||
},
|
# "lease-count": 10000,
|
||||||
"uplink_nat_ports": {
|
# "lease-time": "6h"
|
||||||
"1.1.eth2": {"addressing": "static", "subnet": "10.28.2.1/24", "gateway_ip": "10.28.2.1"}
|
# }}},
|
||||||
# dhcp-server/{"addressing":
|
# "lan_ports": {
|
||||||
# "dynamic"} /{"addressing": "static", "subnet": "10.28.2.6/16"}
|
# "1.1.eth1": {"addressing": "dynamic"} # dhcp-server/{"addressing": "dynamic"}/{"addressing":
|
||||||
},
|
# # "static", "subnet": "10.28.2.6/16"}
|
||||||
|
# },
|
||||||
|
# "uplink_nat_ports": {
|
||||||
|
# "1.1.eth3": {"addressing": "static", "subnet": "10.28.2.1/24", "gateway_ip": "10.28.2.1"}
|
||||||
|
# # dhcp-server/{"addressing":
|
||||||
|
# # "dynamic"} /{"addressing": "static", "subnet": "10.28.2.6/16"}
|
||||||
|
# },
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
obj = lf_tests(lf_data=dict(basic_1["traffic_generator"]), dut_data=list(basic_1["device_under_tests"]),
|
obj = lf_tests(lf_data=dict(basic_1["traffic_generator"]), dut_data=list(basic_1["device_under_tests"]),
|
||||||
log_level=logging.DEBUG, run_lf=True)
|
log_level=logging.DEBUG, run_lf=True)
|
||||||
obj.client_connectivity_test(ssid="OpenWifi", passkey="OpenWifi", security="wpa2", extra_securities=[],
|
obj.get_station_data()
|
||||||
station_name=["ath10k_2g000"], mode="BRIDGE", vlan_id=1, band="twog", ssid_channel=11)
|
|
||||||
# obj.chamber_view()
|
# obj.chamber_view()
|
||||||
|
# obj.client_connectivity_test(ssid="OpenWifi", passkey="OpenWifi", security="wpa2", extra_securities=[],
|
||||||
|
# station_name=["1.1.ath10k_2g000"], mode="BRIDGE", vlan_id=1, band="twog", ssid_channel=11)
|
||||||
|
# obj.chamber_view()
|
||||||
|
# obj.setup_relevent_profiles()
|
||||||
|
|||||||
@@ -62,17 +62,24 @@ class ProfileUtility(Realm):
|
|||||||
|
|
||||||
def show_profile(self):
|
def show_profile(self):
|
||||||
"""Show All Profiles"""
|
"""Show All Profiles"""
|
||||||
response = self.json_post("/cli-json/show_profile", {"name": "all"})
|
# response = self.json_post("/cli-json/show_profile", {"name": "all"})
|
||||||
return ""
|
response = self.json_get("/profile/all")
|
||||||
|
return response
|
||||||
|
|
||||||
def check_profile(self, profile_name):
|
def check_profile(self, profile_name):
|
||||||
return True
|
response = self.show_profile()
|
||||||
|
available_profiles = []
|
||||||
|
for i in response["profiles"]:
|
||||||
|
available_profiles.append(list(i.keys()))
|
||||||
|
if profile_name.split(" ") in available_profiles:
|
||||||
|
return True
|
||||||
|
else:
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
def main():
|
def main():
|
||||||
obj = ProfileUtility(lfclient_host="10.28.3.32", lfclient_port=8080)
|
obj = ProfileUtility(lfclient_host="192.168.200.101", lfclient_port=8080)
|
||||||
y = obj.show_profile()
|
y = obj.check_profile("jk")
|
||||||
print(y)
|
print(y)
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user