added vlan modes

Signed-off-by: shivam <shivam.thakur@candelatech.com>
This commit is contained in:
shivam
2022-08-12 14:07:56 +05:30
parent d038c0959b
commit 64f88b2be9
2 changed files with 48 additions and 44 deletions

View File

@@ -320,6 +320,7 @@ class lf_libs:
def create_dhcp_bridge(self):
""" create chamber view scenario for DHCP-Bridge"""
self.setup_connectivity_port(data=self.wan_ports)
self.setup_connectivity_port(data=self.lan_ports)
self.setup_connectivity_port(data=self.uplink_nat_ports)
for wan_ports, uplink_nat_ports in zip(self.wan_ports, self.uplink_nat_ports):
upstream_port = wan_ports
@@ -338,6 +339,7 @@ class lf_libs:
def create_dhcp_external(self):
self.setup_connectivity_port(data=self.wan_ports)
self.setup_connectivity_port(data=self.lan_ports)
for wan_port in self.wan_ports:
upstream_port = wan_port
upstream_resources = upstream_port.split(".")[0] + "." + upstream_port.split(".")[1]
@@ -608,3 +610,4 @@ class SCP_File:
def save_current_scenario(self):
pass

View File

@@ -62,13 +62,13 @@ class lf_tests(lf_libs):
def setup_interfaces(self, band=None, vlan_id=None, mode=None, num_sta=None):
if band is None:
logging.error("Band value is not available.")
pytest.exit()
pytest.exit("Band value is not available.")
if mode is None:
logging.error("mode value is not available")
pytest.exit()
pytest.exit("mode value is not available")
if num_sta is None:
logging.error("Number of stations are not available")
pytest.exit()
pytest.exit("Number of stations are not available")
if mode == "BRIDGE":
upstream_port = self.upstream_port()
elif mode == "NAT-WAN":
@@ -81,10 +81,10 @@ class lf_tests(lf_libs):
upstream_port = self.upstream_port() + str(vlan_id)
else:
logging.error("Vlan id is not available for vlan")
pytest.exit()
pytest.exit("Vlan id is not available for vlan")
else:
logging.error("Mode value is wrong.Value e.g. BRIDGE or NAT or VLAN")
pytest.exit()
pytest.exit("Mode value is wrong.Value e.g. BRIDGE or NAT or VLAN")
radio_data = {}
sta_prefix = ""
sniff_radio = ""
@@ -104,12 +104,12 @@ class lf_tests(lf_libs):
print(num_sta, self.max_2g_stations)
if int(num_sta) > int(self.max_2g_stations):
logging.error("Can't create %s stations on lanforge" % num_sta)
pytest.exit()
pytest.skip("Can't create %s stations on lanforge" % num_sta)
# checking atleast one 2g radio is available or not
elif len(self.wave2_2g_radios) == 0 and len(self.wave1_radios) and len(self.ax210_radios) == 0 and len(
self.ax200_radios) == 0 and len(self.mtk_radios) == 0:
logging.error("Twog radio is not available")
pytest.exit()
pytest.skip("Twog radio is not available")
dict_all_radios_2g = {"wave2_2g_radios": self.wave2_2g_radios,
"wave1_radios": self.wave1_radios, "mtk_radios": self.mtk_radios,
@@ -185,11 +185,11 @@ class lf_tests(lf_libs):
# checking station compitality of lanforge
if int(num_sta) > int(self.max_6g_stations):
logging.error("Can't create %s stations on lanforge" % num_sta)
pytest.exit()
pytest.skip("Can't create %s stations on lanforge" % num_sta)
# checking atleast one 6g radio is available or not
elif len(self.ax210_radios) == 0:
logging.error("sixg radio is not available")
pytest.exit()
pytest.skip("sixg radio is not available")
dict_all_radios_6g = {"ax210_radios": self.ax210_radios}
@@ -305,7 +305,6 @@ class lf_tests(lf_libs):
left_radio = list(set(all_radio_2g) - set(list(station_radio_data.keys())))
if len(left_radio) == 0:
logging.error("Radios are not available for sniffing")
pytest.exit()
else:
sniff_radio = left_radio[0]
elif band == "fiveg":
@@ -314,7 +313,6 @@ class lf_tests(lf_libs):
left_radio = list(set(all_radio_5g) - set(list(station_radio_data.keys())))
if len(left_radio) == 0:
logging.error("Radios are not available for sniffing")
pytest.exit()
else:
sniff_radio = left_radio[0]
return sniff_radio
@@ -326,6 +324,7 @@ class lf_tests(lf_libs):
# self.staConnect = StaConnect2(self.manager_ip, self.manager_http_port, debug_=self.debug)
# setup_interfaces() interface selection return radio name along no of station on each radio, upstream port
#
self.add_vlan(vlan_id=vlan_id)
data = self.setup_interfaces(band=band, vlan_id=vlan_id, mode=mode, num_sta=num_sta)
logging.info("Setup interface data" + str(data))
if self.run_lf:
@@ -359,26 +358,32 @@ class lf_tests(lf_libs):
if dict(dut_data.get(identifier)[-1]).keys().__contains__("2G") and \
dict(dut_data.get(identifier)[-1])["2G"] is not None:
channel = dict(dut_data.get(identifier)[-1])["2G"][0]
self.start_sniffer(radio_channel=channel, radio=data["sniff_radio"].split(".")[2],
duration=10)
time.sleep(10)
self.stop_sniffer()
if data["sniff_radio_2g"] is not None:
self.start_sniffer(radio_channel=channel,
radio=data["sniff_radio_2g"].split(".")[2],
duration=10)
time.sleep(10)
self.stop_sniffer()
elif band == "fiveg":
if dict(dut_data.get(identifier)[-1]).keys().__contains__("5G") and \
dict(dut_data.get(identifier)[-1])["5G"] is not None:
channel = dict(dut_data.get(identifier)[-1])["5G"][0]
self.start_sniffer(radio_channel=channel, radio=data["sniff_radio"].split(".")[2],
duration=10)
time.sleep(10)
self.stop_sniffer()
if data["sniff_radio_5g"] is not None:
self.start_sniffer(radio_channel=channel,
radio=data["sniff_radio_5g"].split(".")[2],
duration=10)
time.sleep(10)
self.stop_sniffer()
elif band == "sixg":
if dict(dut_data.get(identifier)[-1]).keys().__contains__("6G") and \
dict(dut_data.get(identifier)[-1])["6G"] is not None:
channel = dict(dut_data.get(identifier)[-1])["6G"][0]
self.start_sniffer(radio_channel=channel, radio=data["sniff_radio"].split(".")[2],
duration=10)
time.sleep(10)
self.stop_sniffer()
if data["sniff_radio_6g"] is not None:
self.start_sniffer(radio_channel=channel,
radio=data["sniff_radio_6g"].split(".")[2],
duration=10)
time.sleep(10)
self.stop_sniffer()
# print("ssid not available in scan result")
# return "FAIL", "ssid not available in scan result"
@@ -821,7 +826,7 @@ class lf_tests(lf_libs):
if __name__ == '__main__':
basic_1 = {
"target": "tip_2x",
"controller" : {
"controller": {
"url": "https://sec-qa01.cicd.lab.wlan.tip.build:16001",
"username": "tip@ucentral.com",
"password": "OpenWifi%123"
@@ -864,14 +869,6 @@ if __name__ == '__main__':
"http_port": 8080,
"ssh_port": 22,
"setup": {"method": "build", "DB": "Test_Scenario_Automation"}, # method: build/load,
# "wan_ports": {
# "1.1.eth2": {"addressing": "static", "subnet": "10.28.2.1", "gateway_ip": "10.28.2.1",
# "dns_servers": "8.8.8.9", "ip_mask": "255.255.255.0", "dhcp": {
# "lease-first": 10,
# "lease-count": 10000,
# "lease-time": "6h"
# }}},
# # DB : Default database name
"wan_ports": {
"1.1.eth1": {"addressing": "dhcp-server", "subnet": "172.16.0.1/16", "dhcp": {
"lease-first": 10,
@@ -879,20 +876,23 @@ if __name__ == '__main__':
"lease-time": "6h"
}}},
"lan_ports": {},
# "uplink_nat_ports": {
# "1.1.eth3": {"addressing": "static", "subnet": "10.28.2.1/24", "gateway_ip": "10.28.2.1",
# "dns_servers": "8.8.8.8", "ip_mask": "255.255.255.0"}
# # dhcp-server/{"addressing":
# # "dynamic"} /{"addressing": "static", "subnet": "10.28.2.6/16"}
# }
"1.1.eth1": {"addressing": "dynamic"}},
"lan_ports": {},
"uplink_nat_ports": {
# dhcp-server/{"addressing":
# "dynamic"} /{"addressing": "static", "subnet": "10.28.2.6/16"}
}
}
}
obj = lf_tests(lf_data=dict(basic_1["traffic_generator"]), dut_data=list(basic_1["device_under_tests"]),
log_level=logging.DEBUG, run_lf=True)
#obj.create_dhcp_external()
obj.setup_relevent_profiles()
obj.add_vlan(vlan_ids=[100, 200, 300, 400, 500, 600])
# obj.create_dhcp_external()obj.add_vlan(vlan_ids=[100, 200, 300, 400, 500, 600])
# obj.get_cx_data()
# obj.chamber_view()
# c = obj.client_connectivity_test(ssid="OpenWifi", passkey="OpenWifi", security="wpa2", extra_securities=[],
@@ -912,8 +912,9 @@ if __name__ == '__main__':
# obj.chamber_view()
# obj.client_connectivity_test(ssid="wpa2_5g", passkey="something", security="wpa2", extra_securities=[],
# num_sta=1, mode="BRIDGE", vlan_id=1,
# band="fiveg", ssid_channel=36)
obj.chamber_view()
obj.setup_relevent_profiles()
obj.add_vlan(vlan_ids=[100, 200, 300])
# # band="fiveg", ssid_channel=36)
# obj.chamber_view()
# obj.setup_relevent_profiles()
# obj.add_vlan(vlan_ids=[100, 200, 300])
# # obj.chamber_view()
# obj.setup_relevent_profiles()