wct initial merge

Signed-off-by: shivamcandela <shivam.thakur@candelatech.com>
This commit is contained in:
shivamcandela
2021-06-01 15:59:23 +05:30
66 changed files with 6930 additions and 4556 deletions

View File

@@ -25,24 +25,33 @@ import time
# from eap_connect import EAPConnect
from test_ipv4_ttls import TTLSTest
from lf_wifi_capacity_test import WiFiCapacityTest
from create_station import CreateStation
import lf_dataplane_test
from lf_dataplane_test import DataplaneTest
class RunTest:
def __init__(self, lanforge_data=None, debug=False):
def __init__(self, lanforge_data=None, local_report_path="../reports/", debug=False):
self.lanforge_ip = lanforge_data["ip"]
self.lanforge_port = lanforge_data["port"]
self.twog_radios = lanforge_data["2.4G-Radio"]
self.fiveg_radios = lanforge_data["5G-Radio"]
self.ax_radios = lanforge_data["AX-Radio"]
self.upstream_port = lanforge_data["upstream"]
self.upstream_port = lanforge_data["upstream"].split(".")[2]
self.twog_prefix = lanforge_data["2.4G-Station-Name"]
self.fiveg_prefix = lanforge_data["5G-Station-Name"]
self.ax_prefix = lanforge_data["AX-Station-Name"]
self.debug = debug
self.lf_ssh_port = lanforge_data["ssh_port"]
self.staConnect = StaConnect2(self.lanforge_ip, self.lanforge_port, debug_=debug)
self.dataplane_obj = None
self.local_report_path = local_report_path
if not os.path.exists(self.local_report_path):
os.mkdir(self.local_report_path)
def Client_Connectivity(self, ssid="[BLANK]", passkey="[BLANK]", security="open", station_name=[],
def Client_Connectivity(self, ssid="[BLANK]", passkey="[BLANK]", security="open", extra_securities=[],
station_name=[],
mode="BRIDGE", vlan_id=1, band="twog"):
'''SINGLE CLIENT CONNECTIVITY using test_connect2.py'''
self.staConnect.sta_mode = 0
@@ -68,7 +77,7 @@ class RunTest:
self.staConnect.bringup_time_sec = 60
self.staConnect.cleanup_on_exit = True
# self.staConnect.cleanup()
self.staConnect.setup()
self.staConnect.setup(extra_securities=extra_securities)
self.staConnect.start()
print("napping %f sec" % self.staConnect.runtime_secs)
time.sleep(self.staConnect.runtime_secs)
@@ -87,8 +96,8 @@ class RunTest:
time.sleep(3)
return self.staConnect.passes(), result
def EAP_Connect(self, ssid="[BLANK]", passkey="[BLANK]", security="wpa2", mode="BRIDGE", band="twog", vlan_id=100,
def EAP_Connect(self, ssid="[BLANK]", passkey="[BLANK]", security="wpa2", extra_securities=[],
mode="BRIDGE", band="twog", vlan_id=100,
station_name=[], key_mgmt="WPA-EAP",
pairwise="NA", group="NA", wpa_psk="DEFAULT",
ttls_passwd="nolastart",
@@ -99,11 +108,14 @@ class RunTest:
self.eap_connect.station_profile.sta_mode = 0
self.eap_connect.upstream_resource = 1
if mode == "BRIDGE":
self.eap_connect.upstream_port = self.upstream_port
self.eap_connect.l3_cx_obj_udp.upstream = self.upstream_port
self.eap_connect.l3_cx_obj_tcp.upstream = self.upstream_port
elif mode == "NAT":
self.eap_connect.upstream_port = self.upstream_port
self.eap_connect.l3_cx_obj_udp.upstream = self.upstream_port
self.eap_connect.l3_cx_obj_tcp.upstream = self.upstream_port
else:
self.eap_connect.upstream_port = self.upstream_port + "." + str(vlan_id)
self.eap_connect.l3_cx_obj_udp.upstream = self.upstream_port + "." + str(vlan_id)
self.eap_connect.l3_cx_obj_tcp.upstream = self.upstream_port + "." + str(vlan_id)
if band == "twog":
self.eap_connect.radio = self.twog_radios[0]
# self.eap_connect.sta_prefix = self.twog_prefix
@@ -128,7 +140,7 @@ class RunTest:
self.eap_connect.password = passkey
self.eap_connect.security = security
self.eap_connect.sta_list = station_name
self.eap_connect.build()
self.eap_connect.build(extra_securities=extra_securities)
self.eap_connect.start(station_name, True, True)
self.eap_connect.stop()
self.eap_connect.cleanup(station_name)
@@ -157,6 +169,76 @@ class RunTest:
result = True
return result
def Client_Connect(self, ssid="[BLANK]", passkey="[BLANK]", security="wpa2", mode="BRIDGE", band="twog",
vlan_id=100,
station_name=[]):
self.client_connect = CreateStation(_host=self.lanforge_ip, _port=self.lanforge_port,
_sta_list=station_name, _password=passkey, _ssid=ssid, _security=security)
self.client_connect.station_profile.sta_mode = 0
self.client_connect.upstream_resource = 1
if mode == "BRIDGE":
self.client_connect.upstream_port = self.upstream_port
elif mode == "NAT":
self.client_connect.upstream_port = self.upstream_port
else:
self.client_connect.upstream_port = self.upstream_port + "." + str(vlan_id)
if band == "twog":
self.client_connect.radio = self.twog_radios[0]
# self.client_connect.sta_prefix = self.twog_prefix
if band == "fiveg":
self.client_connect.radio = self.fiveg_radios[0]
self.client_connect.build()
self.client_connect.wait_for_ip(station_name)
print(self.client_connect.wait_for_ip(station_name))
if self.client_connect.wait_for_ip(station_name):
self.client_connect._pass("ALL Stations got IP's", print_=True)
return self.client_connect
else:
return False
def Client_disconnect(self, station_name=[]):
self.client_dis = CreateStation(_host=self.lanforge_ip, _port=self.lanforge_port,
_sta_list=station_name, _password="passkey", _ssid="ssid", _security="security")
self.client_dis.station_profile.cleanup(station_name)
return True
def dataplane(self, station_name=None, mode="BRIDGE", vlan_id=100, download_rate="85%", dut_name="TIP",
upload_rate="85%", duration="1m", instance_name="test_demo"):
if mode == "BRIDGE":
self.client_connect.upstream_port = self.upstream_port
elif mode == "NAT":
self.client_connect.upstream_port = self.upstream_port
else:
self.client_connect.upstream_port = self.upstream_port + "." + str(vlan_id)
self.dataplane_obj = DataplaneTest(lf_host=self.lanforge_ip,
lf_port=self.lanforge_port,
ssh_port=self.lf_ssh_port,
local_path=self.local_report_path,
lf_user="lanforge",
lf_password="lanforge",
instance_name=instance_name,
config_name="dpt_config",
upstream="1.1." + self.upstream_port,
pull_report=True,
load_old_cfg=False,
download_speed=download_rate,
upload_speed=upload_rate,
duration=duration,
dut=dut_name,
station="1.1." + station_name[0],
raw_lines=['pkts: Custom;60;142;256;512;1024;MTU',
'directions: DUT Transmit;DUT Receive',
'traffic_types: UDP;TCP', "show_3s: 1",
"show_ll_graphs: 1", "show_log: 1"],
)
self.dataplane_obj.setup()
self.dataplane_obj.run()
return self.dataplane_obj
if __name__ == '__main__':
lanforge_data = {
"ip": "192.168.200.81",
@@ -164,11 +246,11 @@ if __name__ == '__main__':
"2.4G-Radio": ["wiphy0"],
"5G-Radio": ["wiphy1"],
"AX-Radio": ["wiphy2"],
"upstream": "eth1",
"upstream": "1.1.eth1",
"2.4G-Station-Name": "wlan0",
"5G-Station-Name": "wlan0",
"AX-Station-Name": "ax",
}
obj = RunTest(lanforge_data=lanforge_data, debug=False)
# print(obj.eap_connect.json_get("port/1/1/sta0000?fields=ap,ip"))
obj.EAP_Connect(station_name=["sta0000", "sta0001"], eap="TTLS", ssid="testing_radius")
# obj.EAP_Connect(station_name=["sta0000", "sta0001"], eap="TTLS", ssid="testing_radius")