From 9f862f905c58993970f3e59c9b32f1132c57a0c2 Mon Sep 17 00:00:00 2001 From: Nikita Yadav Date: Sun, 29 Aug 2021 11:43:44 +0530 Subject: [PATCH] added number of vlan and mode feature calculation --- py-scripts/lf_multipsk.py | 138 +++++++++++++++++++++++++++++++------- 1 file changed, 112 insertions(+), 26 deletions(-) diff --git a/py-scripts/lf_multipsk.py b/py-scripts/lf_multipsk.py index 1f017705..0498a189 100644 --- a/py-scripts/lf_multipsk.py +++ b/py-scripts/lf_multipsk.py @@ -72,8 +72,7 @@ class MultiPsk(Realm): def build(self): station_list = [] for idex, input in enumerate(self.input): - print(input) - + # print(input) if "." in input['upstream']: num = input['upstream'].split('.')[1] sta_name = "1." + str(self.resource) + ".sta" + str(num) @@ -106,7 +105,7 @@ class MultiPsk(Realm): self.cx_profile_udp.report_timer = 1000 self.cx_profile_udp.name_prefix = "udp" port_list = list(self.local_realm.find_ports_like("%s+" % self.sta_prefix)) - print("port list", port_list) + # print("port list", port_list) if (port_list is None) or (len(port_list) < 1): raise ValueError("Unable to find ports named '%s'+" % self.sta_prefix) self.cx_profile_udp.create(endp_type="lf_udp", @@ -132,11 +131,11 @@ class MultiPsk(Realm): def monitor_vlan_ip(self): # this function gives vlan ip dict eg{'eth2.100': '172.17.0.1'} - vlan_ips = {} for i in self.input: + # print(i) if "." in i['upstream']: - print(str(i['upstream']) + " is a vlan upstream port") + # print(str(i['upstream']) + " is a vlan upstream port") print("checking its ip ..") data = self.local_realm.json_get("ports/list?fields=IP") for val in data["interfaces"]: @@ -145,14 +144,15 @@ class MultiPsk(Realm): ip_upstream = val["1." + str(self.resource) + "." + str(i['upstream'])]['ip'] vlan_ips[i['upstream']] = ip_upstream # print(ip_upstream) - print(vlan_ips) - return vlan_ips + # print(vlan_ips) + return vlan_ips + # {'eth2.100': '172.17.0.1', 'eth2.200': '172.18.0.1'} def monitor_non_vlan_ip(self): non_vlan_ips = {} for i in self.input: if "." not in i['upstream']: - print(str(i['upstream']) + " is not an vlan upstream port") + # print(str(i['upstream']) + " is not an vlan upstream port") print("checking its ip ..") data = self.local_realm.json_get("ports/list?fields=IP") for val in data["interfaces"]: @@ -161,18 +161,22 @@ class MultiPsk(Realm): ip_upstream = val["1." + str(self.resource) + "." + str(i['upstream'])]['ip'] non_vlan_ips[i['upstream']] = ip_upstream # print(ip_upstream) - print(non_vlan_ips) - return non_vlan_ips + # print(non_vlan_ips) + return non_vlan_ips def get_sta_ip(self): # this function gives station ip dict eg{'eth2.100': '172.17.0.100'} + # self.input = [{'password': 'lanforge1', 'upstream': 'eth2.100', 'mac': '', 'num_station': 1, 'radio': 'wiphy4'}, {'password': 'lanforge2', 'upstream': 'eth2.200', 'mac': '', 'num_station': 1, 'radio': 'wiphy4'}, {'password': 'lanforge3', 'upstream': 'eth2', 'mac': '', 'num_station': 1, 'radio': 'wiphy0'}] + # port_list = ['1.1.sta200', '1.1.sta00', '1.1.sta100'] station_ip = {} port_list = list(self.local_realm.find_ports_like("%s+" % self.sta_prefix)) - print("port list", port_list) + # print("port list", port_list) + # port list ['1.1.sta200', '1.1.sta00', '1.1.sta100'] for name, id in zip(port_list, self.input): # print(name) # print(type(name)) x = id['upstream'].split('.')[1] + # print(x) if name == "1." + str(self.resource) + ".sta" + str(x): data = self.local_realm.json_get("ports/list?fields=IP") @@ -186,6 +190,51 @@ class MultiPsk(Realm): # print(station_ip) return station_ip + def get_sta_ip_for_more_vlan(self): + input = [{'password': 'lanforge1', 'upstream': 'eth2.100', 'mac': '', 'num_station': 1, 'radio': 'wiphy4'}, + {'password': 'lanforge2', 'upstream': 'eth2.200', 'mac': '', 'num_station': 1, 'radio': 'wiphy4'}, + {'password': 'lanforge3', 'upstream': 'eth2', 'mac': '', 'num_station': 1, 'radio': 'wiphy0'}] + port_list = ['1.1.sta200', '1.1.sta00', '1.1.sta100'] + upstream_list = [] + id_num = [] + station_ip = {} + for i in input: + if "." in i['upstream']: + # print(i['upstream']) + upstream_list.append(i['upstream']) + x = i['upstream'].split('.')[1] + id_num.append(x) + + # print(upstream_list) + # print(id_num) + # print(port_list) + port = [] + + for i in port_list: + # print(i.split(".")[2]) + for num in id_num: + if i.split(".")[2] == "sta" + str(num): + port.append(i) + sorted_port = sorted(port) + + for name, id in zip(sorted_port, self.input): + # print(name) + # print(type(name)) + x = id['upstream'].split('.')[1] + # print(x) + + if name == "1." + str(self.resource) + ".sta" + str(x): + data = self.local_realm.json_get("ports/list?fields=IP") + for i in data["interfaces"]: + # print(i) + for j in i: + if j == name: + sta_ip = i[name]['ip'] + # print(sta_ip) + station_ip[id['upstream']] = sta_ip + # print(station_ip) + return station_ip + def get_non_vlan_sta_ip(self): station_nonvlan_ip = {} x = "" @@ -194,7 +243,7 @@ class MultiPsk(Realm): for id in self.input: if "." not in id['upstream']: x = id['upstream'] - print(x) + # print(x) for name in port_list: if name == "1.1.sta00": data = self.local_realm.json_get("ports/list?fields=IP") @@ -203,26 +252,28 @@ class MultiPsk(Realm): for j in i: if j == name: sta_ip = i[name]['ip'] - print(sta_ip) + # print(sta_ip) station_nonvlan_ip[x] = sta_ip return station_nonvlan_ip def compare_ip(self): vlan_ip = self.monitor_vlan_ip() station_ip = self.get_sta_ip() + # vlan_ip = {'eth2.100': '172.17.0.1', 'eth2.200': '172.18.0.1'} + # station_ip = {'eth2.100': '172.17.0.237', 'eth2.200': '172.18.100.222'} for i, j in zip(vlan_ip, station_ip): - # print(i) if i == j: x = vlan_ip[i].split('.') y = station_ip[j].split('.') - if x[0] == y[0] and x[1] == y[1] and x[2] == y[2]: + if x[0] == y[0] and x[1] == y[1]: print("station got ip from vlan") - return "Pass" - else: - print("station did not got ip from vlan") - return "Fail" + x = "Pass" + else: + print("station did not got ip from vlan") + x = "Fail" + return x - def compare_nonvlan_ip(self): + def compare_nonvlan_ip_nat(self): non_vlan_sta_ip = self.get_non_vlan_sta_ip() # print(non_vlan_sta_ip) for id in self.input: @@ -232,9 +283,27 @@ class MultiPsk(Realm): non_vlan = non_vlan_sta_ip[x].split(".") if non_vlan[0] == "192" and non_vlan[1] == "168": # print("Pass") - return 'Pass' + x = 'Pass' else: - return "Fail" + x = "Fail" + return x + + def compare_nonvlan_ip_bridge(self): + upstream_ip = self.monitor_non_vlan_ip() + non_vlan_sta_ip = self.get_non_vlan_sta_ip() + + for i, j in zip(upstream_ip, non_vlan_sta_ip): + # print(i) + if i == j: + x = upstream_ip[i].split('.') + y = non_vlan_sta_ip[j].split('.') + if x[0] == y[0] and x[1] == y[1]: + print("station got ip from upstream") + result1 = "Pass" + else: + print("station did not got ip from upstream") + result1 = "Fail" + return result1 def postcleanup(self): self.cx_profile_udp.cleanup() @@ -251,18 +320,27 @@ def main(): parser.add_argument('--mgr_port', help='port LANforge GUI HTTP service is running on', default=8080) parser.add_argument('--ssid', help='WiFi SSID for client to associate to') parser.add_argument('--security', help='WiFi Security protocol: {open|wep|wpa2|wpa3', default="wpa2") + parser.add_argument('--mode', help="specify mode of ap eg BRIDGE or NAT", default="BRIDGE") + parser.add_argument('--n_vlan', help="type number of vlan using in test eg 1 or 2", default=1) # parser.add_argument('--input', nargs="+", help="specify list of parameters like passwords,upstream,mac address, number of clients and radio as input, eg password@123,eth2.100,"",1,wiphy0 lanforge@123,eth2.100,"",1,wiphy1") args = parser.parse_args() input_data = [{ - "password": "password@123", + "password": "lanforge1", "upstream": "eth2.100", "mac": "", "num_station": 1, "radio": "wiphy4" }, { - "password": "lanforge@123", + "password": "lanforge2", + "upstream": "eth2.200", + "mac": "", + "num_station": 1, + "radio": "wiphy4" + }, + { + "password": "lanforge", "upstream": "eth2", "mac": "", "num_station": 1, @@ -280,7 +358,11 @@ def main(): multi_obj.start() time.sleep(60) multi_obj.monitor_vlan_ip() - multi_obj.get_sta_ip() + if args.n_vlan == "1": + multi_obj.get_sta_ip() + else: + multi_obj.get_sta_ip_for_more_vlan() + result = multi_obj.compare_ip() print("checking for vlan ips") if result == "Pass": @@ -290,7 +372,10 @@ def main(): print("now checking ip for non vlan port") multi_obj.monitor_non_vlan_ip() multi_obj.get_non_vlan_sta_ip() - result1 = multi_obj.compare_nonvlan_ip() + if args.mode == "BRIDGE": + result1 = multi_obj.compare_nonvlan_ip_bridge() + else: + result1 = multi_obj.compare_nonvlan_ip_nat() if result1 == "Pass": print("Test passed for non vlan ip ") else: @@ -302,3 +387,4 @@ def main(): if __name__ == '__main__': main() +