diff --git a/lf_libs/lf_libs.py b/lf_libs/lf_libs.py index a9821fd2..2c90aff3 100644 --- a/lf_libs/lf_libs.py +++ b/lf_libs/lf_libs.py @@ -275,24 +275,24 @@ class lf_libs: def create_dhcp_bridge(self): """ create chamber view scenario for DHCP-Bridge""" - for uplink_nat_ports, wan_ports in zip(self.uplink_nat_ports, self.wan_ports): - upstream_port = uplink_nat_ports + for wan_ports, uplink_nat_ports in zip(self.wan_ports, self.uplink_nat_ports): + upstream_port = wan_ports upstream_resources = upstream_port.split(".")[0] + "." + upstream_port.split(".")[1] - uplink_port = wan_ports + uplink_port = uplink_nat_ports uplink_resources = uplink_port.split(".")[0] + "." + uplink_port.split(".")[1] print(uplink_nat_ports) - upstream_subnet = self.uplink_nat_ports[uplink_nat_ports]["subnet"] - print(upstream_subnet) + uplink_subnet = self.uplink_nat_ports[uplink_nat_ports]["subnet"] + print(uplink_subnet) self.default_scenario_raw_lines.append(["profile_link " + upstream_resources + " upstream-dhcp 1 NA NA " + upstream_port.split(".")[2] + ",AUTO -1 NA"]) self.default_scenario_raw_lines.append( ["profile_link " + uplink_resources + " uplink-nat 1 'DUT: upstream LAN " - + upstream_subnet + + uplink_subnet + "' NA " + uplink_port.split(".")[2] + "," + upstream_port.split(".")[2] + " -1 NA"]) def create_dhcp_external(self): - for uplink_nat_ports in self.uplink_nat_ports: - upstream_port = uplink_nat_ports + for wan_port in self.wan_ports: + upstream_port = wan_port upstream_resources = upstream_port.split(".")[0] + "." + upstream_port.split(".")[1] self.default_scenario_raw_lines.append(["profile_link " + upstream_resources + " upstream 1 NA NA " + upstream_port.split(".")[2] + ",AUTO -1 NA"]) @@ -337,7 +337,6 @@ class lf_libs: profile_utility_obj = ProfileUtility(lfclient_host=self.manager_ip, lfclient_port=self.manager_http_port) # Read all Profiles all_profiles = profile_utility_obj.show_profile() - print(all_profiles) logging.info("Profiles: " + str(all_profiles)) # Create upstream-dhcp and uplink-nat profile if they don't exists diff --git a/lf_libs/lf_tests.py b/lf_libs/lf_tests.py index 49bc90cf..47314bb2 100644 --- a/lf_libs/lf_tests.py +++ b/lf_libs/lf_tests.py @@ -48,14 +48,14 @@ class lf_tests(lf_libs): ax_prefix = "AX200_0" pcap_obj = None - def __init__(self, lf_data={}, dut_data={}, log_level=logging.DEBUG, run_lf=False, influx_params=None, - skip_pcap=False): + def __init__(self, lf_data={}, dut_data={}, log_level=logging.DEBUG, run_lf=False, influx_params=None): super().__init__(lf_data, dut_data, log_level) self.run_lf = run_lf - self.upstream_port = list(self.uplink_nat_ports.keys())[0] - self.skip_pcap = skip_pcap - self.staConnect = StaConnect2(self.manager_ip, self.manager_http_port) - pass + # self.upstream_port = list(self.uplink_nat_ports.keys())[0] + # self.skip_pcap = skip_pcap + #self.wan_upstream = list(self.wan_ports.keys()) + # self.lan_upstream = + self.staConnect = StaConnect2(self.manager_ip, self.manager_http_port, outfile="shivam") def setup_interfaces(self, band=None, vlan_id=None, mode=None, num_sta=None): if band is None: @@ -68,13 +68,13 @@ class lf_tests(lf_libs): logging.error("Number of stations are not available") pytest.exit() if mode == "BRIDGE": - upstream_port = self.upstream_port + upstream_port = self.upstream_port() elif mode == "NAT": - upstream_port = self.upstream_port + upstream_port = self.upstream_port() elif mode == "VLAN": # for vlan mode vlan id should be available if vlan_id is None: - upstream_port = self.upstream_port + str(vlan_id) + upstream_port = self.upstream_port() + str(vlan_id) else: logging.error("Vlan id is not available for vlan") pytest.exit() @@ -91,9 +91,10 @@ class lf_tests(lf_libs): "ax200_radios": 1, "ax210_radios": 1} if band == "twog": if self.run_lf: - ssid = self.dut_data[0]["ssid"]["2g-ssid"] - passkey = self.dut_data[0]["ssid"]["2g-password"] - security = self.dut_data[0]["ssid"]["2g-encryption"].lower() + for i in self.dut_data: + ssid = i["ssid"]["2g-ssid"] + passkey = i["ssid"]["2g-password"] + security = i["ssid"]["2g-encryption"].lower() sta_prefix = self.twog_prefix # checking station compitality of lanforge @@ -128,13 +129,13 @@ class lf_tests(lf_libs): stations = stations - max_station diff = max_station - stations # setup sniffer - if not self.skip_pcap: - sniff_radio = self.setup_sniffer(band=band, station_radio_data=radio_data) + sniff_radio = self.setup_sniffer(band=band, station_radio_data=radio_data) if band == "fiveg": if self.run_lf: - ssid = self.dut_data["ssid"]["5g-ssid"] - passkey = self.dut_data["ssid"]["5g-password"] - security = self.dut_data["ssid"]["5g-encryption"].lower() + for i in self.dut_data: + ssid = i["ssid"]["2g-ssid"] + passkey = i["ssid"]["2g-password"] + security = i["ssid"]["2g-encryption"].lower() sta_prefix = self.fiveg_prefix # checking station compitality of lanforge @@ -169,8 +170,7 @@ class lf_tests(lf_libs): stations = stations - max_station diff = max_station - stations - if not self.skip_pcap: - sniff_radio = self.setup_sniffer(band=band, station_radio_data=radio_data) + sniff_radio = self.setup_sniffer(band=band, station_radio_data=radio_data) data_dict = {} if self.run_lf: data_dict["radios"] = radio_data @@ -179,18 +179,15 @@ class lf_tests(lf_libs): data_dict["passkey"] = passkey data_dict["security"] = security data_dict["sta_prefix"] = sta_prefix - if not self.skip_pcap: - data_dict["sniff_radio"] = sniff_radio + data_dict["sniff_radio"] = sniff_radio return data_dict else: data_dict["radios"] = radio_data data_dict["upstream_port"] = upstream_port data_dict["sta_prefix"] = sta_prefix - if not self.skip_pcap: - data_dict["sniff_radio"] = sniff_radio + data_dict["sniff_radio"] = sniff_radio return data_dict - def pre_cleanup(self): """ deleting existing stations and layer 3 connections """ logging.info("Checking existing stations and layer3 connections...") @@ -220,6 +217,25 @@ class lf_tests(lf_libs): except Exception as e: logging.error(e) + def nametoresource(self, name=None): + """Returns resource number""" + if name is not None: + resource = name.split(".")[1] + return resource + else: + logging.error("Name is not provided") + + def upstream_port(self): + """finding upstream port""" + upstream_port = "" + print(len(self.dut_data)) + for i in self.dut_data: + upstream_port = i["wan_port"] + print(upstream_port) + return upstream_port + + + def setup_sniffer(self, band=None, station_radio_data=None): """Setup sniff radio""" sniff_radio = None @@ -243,10 +259,9 @@ class lf_tests(lf_libs): sniff_radio = left_radio[0] return sniff_radio - - def client_connectivity_test(self, ssid="[BLANK]", passkey="[BLANK]", security="open", extra_securities=[], - station_name=[], mode="BRIDGE", vlan_id=1, band="twog", ssid_channel=None, outfile="shivam"): + station_name=[], mode="BRIDGE", vlan_id=1, band="twog", ssid_channel=None, + outfile="shivam"): # self.staConnect = StaConnect2(self.manager_ip, self.manager_http_port, debug_=self.debug) # setup_interfaces() interface selection return radio name along no of station on each radio, upstream port # @@ -260,7 +275,7 @@ class lf_tests(lf_libs): self.staConnect.sta_mode = 0 self.staConnect.upstream_resource = data["upstream_port"].split(".")[1] self.staConnect.upstream_port = data["upstream_port"].split(".")[2] - #creating dict of radio and station_list + # creating dict of radio and station_list dict_radio_sta_list = {} # list of per radio station length_to_split = list(data["radios"].values()) @@ -268,7 +283,7 @@ class lf_tests(lf_libs): sta_list = iter(station_name) # station list of per radio station list sta_list_ = [list(islice(sta_list, elem)) - for elem in length_to_split] + for elem in length_to_split] # Checking station lists according to radios if len(sta_list_) == len(length_to_split): dict_radio_sta_list = dict(zip(list(data["radios"].keys()), sta_list_)) @@ -278,26 +293,24 @@ class lf_tests(lf_libs): print("dict_radio_sta_list", dict_radio_sta_list) for radio in data["radios"]: self.enable_verbose_debug(radio=radio, enable=False) - exit(1) self.staConnect.radio = radio self.staConnect.admin_down(self.staConnect.radio) self.staConnect.admin_up(self.staConnect.radio) self.staConnect.sta_prefix = data["sta_prefix"] - self.set_radio_channel(radio=radio, channel=ssid_channel) + #changed to auto channel + self.set_radio_channel(radio=radio, channel="AUTO") print("scan ssid radio", radio.split(".")[2]) - self.data_scan_ssid = self.scan_ssid(radio=radio.split(".")[2]) - print("ssid scan data :- ", self.data_scan_ssid) - result = self.check_ssid_available_scan_result(scan_ssid_data=self.data_scan_ssid, ssid=ssid) - print("ssid available:-", result) + result = self.scan_ssid(radio=radio.split(".")[2]) + print("ssid scan data :- ", result) if not result and ssid_channel: - if not self.skip_pcap: - print("sniff radio", data["sniff_radio"].split(".")[2]) - self.start_sniffer(radio_channel=ssid_channel, radio=data["sniff_radio"].split(".")[2], duration=30) - time.sleep(30) - self.stop_sniffer() + #Sniffer required + # print("sniff radio", data["sniff_radio"].split(".")[2]) + # self.start_sniffer(radio_channel=ssid_channel, radio=data["sniff_radio"].split(".")[2], duration=30) + # time.sleep(30) + # self.stop_sniffer() print("ssid not available in scan result") return "FAIL", "ssid not available in scan result" - self.staConnect.resource = 1 + self.staConnect.resource = radio.split(".")[1] self.staConnect.dut_ssid = ssid self.staConnect.dut_passwd = passkey self.staConnect.dut_security = security @@ -312,44 +325,31 @@ class lf_tests(lf_libs): data_table = "" dict_table = {} self.staConnect.setup(extra_securities=extra_securities) - for sta_name in self.staConnect.station_names: - try: - sta_url = self.staConnect.get_station_url(sta_name) - station_info = self.staConnect.json_get(sta_url) - dict_data = station_info["interface"] - dict_table[""] = list(dict_data.keys()) - dict_table["Before"] = list(dict_data.values()) - except Exception as e: - print(e) + # for sta_name in self.staConnect.station_names: + # try: + # sta_url = self.staConnect.get_station_url(sta_name.split(".")[2]) + # station_info = self.staConnect.json_get(sta_url) + # dict_data = station_info["interface"] + # dict_table[""] = list(dict_data.keys()) + # dict_table["Before"] = list(dict_data.values()) + # except Exception as e: + # print(e) if ssid_channel: - if not self.skip_pcap: - print("sniff radio", data["sniff_radio"].split(".")[2]) - self.start_sniffer(radio_channel=ssid_channel, radio=data["sniff_radio"].split(".")[2], duration=30) + pass + #Need to start sniffer + # print("sniff radio", data["sniff_radio"].split(".")[2]) + # self.start_sniffer(radio_channel=ssid_channel, radio=data["sniff_radio"].split(".")[2], duration=30) self.staConnect.start() print("napping %f sec" % self.staConnect.runtime_secs) time.sleep(self.staConnect.runtime_secs) - report_obj = Report() - for sta_name in self.staConnect.station_names: - try: - sta_url = self.staConnect.get_station_url(sta_name) - station_info = self.staConnect.json_get(sta_url) - self.station_ip = station_info["interface"]["ip"] - dict_data = station_info["interface"] - dict_table["After"] = list(dict_data.values()) - try: - data_table = report_obj.table2(table=dict_table, headers='keys') - except Exception as e: - print(e) - allure.attach(name=str(sta_name), body=data_table) - except Exception as e: - print(e) + self.get_station_data() + self.get_layer3_data() self.staConnect.stop() run_results = self.staConnect.get_result_list() if not self.staConnect.passes(): - if self.debug: - for result in run_results: - print("test result: " + result) - pytest.exit("Test Failed: Debug True") + for result in run_results: + print("test result: " + result) + pytest.exit("Test Failed: Debug True") self.staConnect.cleanup() # try: # supplicant = "/home/lanforge/wifi/wpa_supplicant_log_" + radio.split(".")[2] + ".txt" @@ -394,8 +394,10 @@ class lf_tests(lf_libs): # result = "FAIL" # time.sleep(3) if ssid_channel: - if not self.skip_pcap: - self.stop_sniffer() + # need to stop sniffer + # if not self.skip_pcap: + # self.stop_sniffer() + pass self.set_radio_channel(radio=radio, channel="AUTO") # return result, description @@ -417,35 +419,45 @@ class lf_tests(lf_libs): def multi_psk_test(self): pass - def scan_ssid(self, radio=""): + def scan_ssid(self, radio="", retry=1, allure_attach=True, scan_time=15, ssid=None): '''This method for scan ssid data''' - list_data = [] - obj_scan = StaScan(host=self.manager_ip, port=self.manager_http_port, ssid="fake ssid", security="open", - password="[BLANK]", radio=radio, sta_list=["sta00100"], csv_output="scan_ssid.csv") - #obj_scan.pre_cleanup() - time1 = datetime.now() - first = time.mktime(time1.timetuple()) * 1000 - obj_scan.build() - obj_scan.start() - time2 = datetime.now() - second = time.mktime(time2.timetuple()) * 1000 - diff = int(second - first) - try: - with open(obj_scan.csv_output, 'r') as file: - reader = csv.reader(file) - for row in reader: - if row[1] == "age": - list_data.append(row) - continue - elif int(row[1]) < diff: - list_data.append(row) - except Exception as e: - print(e) - report_obj = Report() - csv_data_table = report_obj.table2(list_data) - allure.attach(name="scan_ssid_data", body=csv_data_table) - obj_scan.cleanup() - return list_data + count = 0 + for i in range(retry + 1): + list_data = [] + obj_scan = StaScan(host=self.manager_ip, port=self.manager_http_port, ssid="fake ssid", security="open", + password="[BLANK]", radio=radio, sta_list=["sta00100"], csv_output="scan_ssid.csv", + scan_time=scan_time) + # obj_scan.pre_cleanup() + time1 = datetime.now() + first = time.mktime(time1.timetuple()) * 1000 + obj_scan.build() + obj_scan.start() + time2 = datetime.now() + second = time.mktime(time2.timetuple()) * 1000 + diff = int(second - first) + try: + with open(obj_scan.csv_output, 'r') as file: + reader = csv.reader(file) + for row in reader: + if row[1] == "age": + list_data.append(row) + continue + elif int(row[1]) < diff: + list_data.append(row) + except Exception as e: + print(e) + report_obj = Report() + csv_data_table = report_obj.table2(list_data) + # allure.attach(name="scan_ssid_data", body=csv_data_table) + if allure_attach: + allure.attach(name="scan_ssid_data_" + str(i+1), body=csv_data_table) + obj_scan.cleanup() + if self.check_ssid_available_scan_result(scan_ssid_data=list_data, ssid=ssid): + count = count + 1 + return list_data + if count == 0: + return False + def start_sniffer(self, radio_channel=None, radio=None, test_name="sniff_radio", duration=60): self.pcap_name = test_name + ".pcap" @@ -504,6 +516,73 @@ class lf_tests(lf_libs): except Exception as e: print(e) + def get_station_data(self, rows=["ip", "signal"], sta_name=["1.1.sta0000", "1.1.sta0001"], allure_attach=True): + """ + Attach station data to allure + e.g. rows = ["ip", "signal"] , sta_names = ["1.1.wlan0000", "1.1.wlan0001"] + """ + # dict for station data + sta_dict = {} + for sta in sta_name: + sta_url = "port/" + str(sta.split(".")[0]) + "/" + str(sta.split(".")[1]) + "/" + str(sta.split(".")[2]) + station_info = self.staConnect.json_get(sta_url) + dict_data = station_info["interface"] + print("dict_data", dict_data) + temp_dict = {} + for i in rows: + temp_dict[i] = dict_data[i] + sta_dict[sta] = temp_dict + print(sta_dict) + # Creating dict for allure table + table_dict = {} + table_dict["station name"] = list(sta_dict.keys()) + for i in rows: + temp_list = [] + for j in sta_name: + temp_list.append(sta_dict[j][i]) + table_dict[i] = temp_list + #pass fail + pass_fail = [] + for i in table_dict["ip"]: + if i == "0.0.0.0": + pass_fail.append("Fail") + else: + pass_fail.append("Pass") + table_dict["Pass/Fail"] = pass_fail + if allure_attach: + try: + report_obj = Report() + data_table = report_obj.table2(table=table_dict, headers='keys') + print(data_table) + allure.attach(name="station data", body=data_table) + except Exception as e: + logging.error(e) + return sta_dict + + def get_cx_data(self, sta_name=[], cx_data=[]): + """Attach cx data to allure""" + + pass + + def get_station_list(self, num_sta=1, band="twog"): + """Create station list""" + sta_list = [] + for i in range(num_sta): + if band == "twog": + sta_list.append(self.twog_prefix + str(i)) + elif band == "fiveg": + sta_list.append(self.fiveg_prefix + str(i)) + elif band == "sixg": + sta_list.append(self.sixg_prefix + str(i)) + else: + logging.error("band is wrong") + + + + + + + if __name__ == '__main__': basic_1 = { @@ -516,6 +595,7 @@ if __name__ == '__main__': "device_under_tests": [{ "model": "edgecore_eap101", "supported_bands": ["2G", "5G"], + "wan_port": "1.1.eth1", "supported_modes": ["BRIDGE", "NAT", "VLAN"], "ssid": { "2g-ssid": "OpenWifi", @@ -550,29 +630,37 @@ if __name__ == '__main__': "http_port": 8080, "ssh_port": 22, "setup": {"method": "build", "DB": "Test_Scenario_Automation"}, # method: build/load, - # DB : Default database name "wan_ports": { - "1.1.eth3": {"addressing": "dhcp-server", "subnet": "172.16.0.1/16", "dhcp": { - "lease-first": 10, - "lease-count": 10000, - "lease-time": "6h" - }} - }, - "lan_ports": { - "1.1.eth1": {"addressing": "dynamic"} # dhcp-server/{"addressing": "dynamic"}/{"addressing": - # "static", "subnet": "10.28.2.6/16"} - }, - "uplink_nat_ports": { - "1.1.eth2": {"addressing": "static", "subnet": "10.28.2.1/24", "gateway_ip": "10.28.2.1"} - # dhcp-server/{"addressing": - # "dynamic"} /{"addressing": "static", "subnet": "10.28.2.6/16"} - }, + "1.1.eth1": {"addressing": "dhcp-server", "subnet": "172.16.0.1/16", "dhcp": { + "lease-first": 10, + "lease-count": 10000, + "lease-time": "6h" + }}}, + # DB : Default database name + # "wan_ports": { + # "1.1.eth2": {"addressing": "dhcp-server", "subnet": "172.16.0.1/16", "dhcp": { + # "lease-first": 10, + # "lease-count": 10000, + # "lease-time": "6h" + # }}}, + # "lan_ports": { + # "1.1.eth1": {"addressing": "dynamic"} # dhcp-server/{"addressing": "dynamic"}/{"addressing": + # # "static", "subnet": "10.28.2.6/16"} + # }, + # "uplink_nat_ports": { + # "1.1.eth3": {"addressing": "static", "subnet": "10.28.2.1/24", "gateway_ip": "10.28.2.1"} + # # dhcp-server/{"addressing": + # # "dynamic"} /{"addressing": "static", "subnet": "10.28.2.6/16"} + # }, } } } obj = lf_tests(lf_data=dict(basic_1["traffic_generator"]), dut_data=list(basic_1["device_under_tests"]), log_level=logging.DEBUG, run_lf=True) - obj.client_connectivity_test(ssid="OpenWifi", passkey="OpenWifi", security="wpa2", extra_securities=[], - station_name=["ath10k_2g000"], mode="BRIDGE", vlan_id=1, band="twog", ssid_channel=11) + obj.get_station_data() # obj.chamber_view() + # obj.client_connectivity_test(ssid="OpenWifi", passkey="OpenWifi", security="wpa2", extra_securities=[], + # station_name=["1.1.ath10k_2g000"], mode="BRIDGE", vlan_id=1, band="twog", ssid_channel=11) + # obj.chamber_view() + # obj.setup_relevent_profiles() diff --git a/py-json/profile_utility.py b/py-json/profile_utility.py index aa13be81..b06fe85e 100644 --- a/py-json/profile_utility.py +++ b/py-json/profile_utility.py @@ -62,17 +62,24 @@ class ProfileUtility(Realm): def show_profile(self): """Show All Profiles""" - response = self.json_post("/cli-json/show_profile", {"name": "all"}) - return "" + # response = self.json_post("/cli-json/show_profile", {"name": "all"}) + response = self.json_get("/profile/all") + return response def check_profile(self, profile_name): - return True - + response = self.show_profile() + available_profiles = [] + for i in response["profiles"]: + available_profiles.append(list(i.keys())) + if profile_name.split(" ") in available_profiles: + return True + else: + return False def main(): - obj = ProfileUtility(lfclient_host="10.28.3.32", lfclient_port=8080) - y = obj.show_profile() + obj = ProfileUtility(lfclient_host="192.168.200.101", lfclient_port=8080) + y = obj.check_profile("jk") print(y)