mirror of
https://github.com/Telecominfraproject/wlan-lanforge-scripts.git
synced 2025-11-03 04:07:52 +00:00
added number of vlan and mode feature calculation
This commit is contained in:
@@ -72,8 +72,7 @@ class MultiPsk(Realm):
|
|||||||
def build(self):
|
def build(self):
|
||||||
station_list = []
|
station_list = []
|
||||||
for idex, input in enumerate(self.input):
|
for idex, input in enumerate(self.input):
|
||||||
print(input)
|
# print(input)
|
||||||
|
|
||||||
if "." in input['upstream']:
|
if "." in input['upstream']:
|
||||||
num = input['upstream'].split('.')[1]
|
num = input['upstream'].split('.')[1]
|
||||||
sta_name = "1." + str(self.resource) + ".sta" + str(num)
|
sta_name = "1." + str(self.resource) + ".sta" + str(num)
|
||||||
@@ -106,7 +105,7 @@ class MultiPsk(Realm):
|
|||||||
self.cx_profile_udp.report_timer = 1000
|
self.cx_profile_udp.report_timer = 1000
|
||||||
self.cx_profile_udp.name_prefix = "udp"
|
self.cx_profile_udp.name_prefix = "udp"
|
||||||
port_list = list(self.local_realm.find_ports_like("%s+" % self.sta_prefix))
|
port_list = list(self.local_realm.find_ports_like("%s+" % self.sta_prefix))
|
||||||
print("port list", port_list)
|
# print("port list", port_list)
|
||||||
if (port_list is None) or (len(port_list) < 1):
|
if (port_list is None) or (len(port_list) < 1):
|
||||||
raise ValueError("Unable to find ports named '%s'+" % self.sta_prefix)
|
raise ValueError("Unable to find ports named '%s'+" % self.sta_prefix)
|
||||||
self.cx_profile_udp.create(endp_type="lf_udp",
|
self.cx_profile_udp.create(endp_type="lf_udp",
|
||||||
@@ -132,11 +131,11 @@ class MultiPsk(Realm):
|
|||||||
|
|
||||||
def monitor_vlan_ip(self):
|
def monitor_vlan_ip(self):
|
||||||
# this function gives vlan ip dict eg{'eth2.100': '172.17.0.1'}
|
# this function gives vlan ip dict eg{'eth2.100': '172.17.0.1'}
|
||||||
|
|
||||||
vlan_ips = {}
|
vlan_ips = {}
|
||||||
for i in self.input:
|
for i in self.input:
|
||||||
|
# print(i)
|
||||||
if "." in i['upstream']:
|
if "." in i['upstream']:
|
||||||
print(str(i['upstream']) + " is a vlan upstream port")
|
# print(str(i['upstream']) + " is a vlan upstream port")
|
||||||
print("checking its ip ..")
|
print("checking its ip ..")
|
||||||
data = self.local_realm.json_get("ports/list?fields=IP")
|
data = self.local_realm.json_get("ports/list?fields=IP")
|
||||||
for val in data["interfaces"]:
|
for val in data["interfaces"]:
|
||||||
@@ -145,14 +144,15 @@ class MultiPsk(Realm):
|
|||||||
ip_upstream = val["1." + str(self.resource) + "." + str(i['upstream'])]['ip']
|
ip_upstream = val["1." + str(self.resource) + "." + str(i['upstream'])]['ip']
|
||||||
vlan_ips[i['upstream']] = ip_upstream
|
vlan_ips[i['upstream']] = ip_upstream
|
||||||
# print(ip_upstream)
|
# print(ip_upstream)
|
||||||
print(vlan_ips)
|
# print(vlan_ips)
|
||||||
return vlan_ips
|
return vlan_ips
|
||||||
|
# {'eth2.100': '172.17.0.1', 'eth2.200': '172.18.0.1'}
|
||||||
|
|
||||||
def monitor_non_vlan_ip(self):
|
def monitor_non_vlan_ip(self):
|
||||||
non_vlan_ips = {}
|
non_vlan_ips = {}
|
||||||
for i in self.input:
|
for i in self.input:
|
||||||
if "." not in i['upstream']:
|
if "." not in i['upstream']:
|
||||||
print(str(i['upstream']) + " is not an vlan upstream port")
|
# print(str(i['upstream']) + " is not an vlan upstream port")
|
||||||
print("checking its ip ..")
|
print("checking its ip ..")
|
||||||
data = self.local_realm.json_get("ports/list?fields=IP")
|
data = self.local_realm.json_get("ports/list?fields=IP")
|
||||||
for val in data["interfaces"]:
|
for val in data["interfaces"]:
|
||||||
@@ -161,18 +161,67 @@ class MultiPsk(Realm):
|
|||||||
ip_upstream = val["1." + str(self.resource) + "." + str(i['upstream'])]['ip']
|
ip_upstream = val["1." + str(self.resource) + "." + str(i['upstream'])]['ip']
|
||||||
non_vlan_ips[i['upstream']] = ip_upstream
|
non_vlan_ips[i['upstream']] = ip_upstream
|
||||||
# print(ip_upstream)
|
# print(ip_upstream)
|
||||||
print(non_vlan_ips)
|
# print(non_vlan_ips)
|
||||||
return non_vlan_ips
|
return non_vlan_ips
|
||||||
|
|
||||||
def get_sta_ip(self):
|
def get_sta_ip(self):
|
||||||
# this function gives station ip dict eg{'eth2.100': '172.17.0.100'}
|
# this function gives station ip dict eg{'eth2.100': '172.17.0.100'}
|
||||||
|
# self.input = [{'password': 'lanforge1', 'upstream': 'eth2.100', 'mac': '', 'num_station': 1, 'radio': 'wiphy4'}, {'password': 'lanforge2', 'upstream': 'eth2.200', 'mac': '', 'num_station': 1, 'radio': 'wiphy4'}, {'password': 'lanforge3', 'upstream': 'eth2', 'mac': '', 'num_station': 1, 'radio': 'wiphy0'}]
|
||||||
|
# port_list = ['1.1.sta200', '1.1.sta00', '1.1.sta100']
|
||||||
station_ip = {}
|
station_ip = {}
|
||||||
port_list = list(self.local_realm.find_ports_like("%s+" % self.sta_prefix))
|
port_list = list(self.local_realm.find_ports_like("%s+" % self.sta_prefix))
|
||||||
print("port list", port_list)
|
# print("port list", port_list)
|
||||||
|
# port list ['1.1.sta200', '1.1.sta00', '1.1.sta100']
|
||||||
for name, id in zip(port_list, self.input):
|
for name, id in zip(port_list, self.input):
|
||||||
# print(name)
|
# print(name)
|
||||||
# print(type(name))
|
# print(type(name))
|
||||||
x = id['upstream'].split('.')[1]
|
x = id['upstream'].split('.')[1]
|
||||||
|
# print(x)
|
||||||
|
|
||||||
|
if name == "1." + str(self.resource) + ".sta" + str(x):
|
||||||
|
data = self.local_realm.json_get("ports/list?fields=IP")
|
||||||
|
for i in data["interfaces"]:
|
||||||
|
# print(i)
|
||||||
|
for j in i:
|
||||||
|
if j == name:
|
||||||
|
sta_ip = i[name]['ip']
|
||||||
|
# print(sta_ip)
|
||||||
|
station_ip[id['upstream']] = sta_ip
|
||||||
|
# print(station_ip)
|
||||||
|
return station_ip
|
||||||
|
|
||||||
|
def get_sta_ip_for_more_vlan(self):
|
||||||
|
input = [{'password': 'lanforge1', 'upstream': 'eth2.100', 'mac': '', 'num_station': 1, 'radio': 'wiphy4'},
|
||||||
|
{'password': 'lanforge2', 'upstream': 'eth2.200', 'mac': '', 'num_station': 1, 'radio': 'wiphy4'},
|
||||||
|
{'password': 'lanforge3', 'upstream': 'eth2', 'mac': '', 'num_station': 1, 'radio': 'wiphy0'}]
|
||||||
|
port_list = ['1.1.sta200', '1.1.sta00', '1.1.sta100']
|
||||||
|
upstream_list = []
|
||||||
|
id_num = []
|
||||||
|
station_ip = {}
|
||||||
|
for i in input:
|
||||||
|
if "." in i['upstream']:
|
||||||
|
# print(i['upstream'])
|
||||||
|
upstream_list.append(i['upstream'])
|
||||||
|
x = i['upstream'].split('.')[1]
|
||||||
|
id_num.append(x)
|
||||||
|
|
||||||
|
# print(upstream_list)
|
||||||
|
# print(id_num)
|
||||||
|
# print(port_list)
|
||||||
|
port = []
|
||||||
|
|
||||||
|
for i in port_list:
|
||||||
|
# print(i.split(".")[2])
|
||||||
|
for num in id_num:
|
||||||
|
if i.split(".")[2] == "sta" + str(num):
|
||||||
|
port.append(i)
|
||||||
|
sorted_port = sorted(port)
|
||||||
|
|
||||||
|
for name, id in zip(sorted_port, self.input):
|
||||||
|
# print(name)
|
||||||
|
# print(type(name))
|
||||||
|
x = id['upstream'].split('.')[1]
|
||||||
|
# print(x)
|
||||||
|
|
||||||
if name == "1." + str(self.resource) + ".sta" + str(x):
|
if name == "1." + str(self.resource) + ".sta" + str(x):
|
||||||
data = self.local_realm.json_get("ports/list?fields=IP")
|
data = self.local_realm.json_get("ports/list?fields=IP")
|
||||||
@@ -194,7 +243,7 @@ class MultiPsk(Realm):
|
|||||||
for id in self.input:
|
for id in self.input:
|
||||||
if "." not in id['upstream']:
|
if "." not in id['upstream']:
|
||||||
x = id['upstream']
|
x = id['upstream']
|
||||||
print(x)
|
# print(x)
|
||||||
for name in port_list:
|
for name in port_list:
|
||||||
if name == "1.1.sta00":
|
if name == "1.1.sta00":
|
||||||
data = self.local_realm.json_get("ports/list?fields=IP")
|
data = self.local_realm.json_get("ports/list?fields=IP")
|
||||||
@@ -203,26 +252,28 @@ class MultiPsk(Realm):
|
|||||||
for j in i:
|
for j in i:
|
||||||
if j == name:
|
if j == name:
|
||||||
sta_ip = i[name]['ip']
|
sta_ip = i[name]['ip']
|
||||||
print(sta_ip)
|
# print(sta_ip)
|
||||||
station_nonvlan_ip[x] = sta_ip
|
station_nonvlan_ip[x] = sta_ip
|
||||||
return station_nonvlan_ip
|
return station_nonvlan_ip
|
||||||
|
|
||||||
def compare_ip(self):
|
def compare_ip(self):
|
||||||
vlan_ip = self.monitor_vlan_ip()
|
vlan_ip = self.monitor_vlan_ip()
|
||||||
station_ip = self.get_sta_ip()
|
station_ip = self.get_sta_ip()
|
||||||
|
# vlan_ip = {'eth2.100': '172.17.0.1', 'eth2.200': '172.18.0.1'}
|
||||||
|
# station_ip = {'eth2.100': '172.17.0.237', 'eth2.200': '172.18.100.222'}
|
||||||
for i, j in zip(vlan_ip, station_ip):
|
for i, j in zip(vlan_ip, station_ip):
|
||||||
# print(i)
|
|
||||||
if i == j:
|
if i == j:
|
||||||
x = vlan_ip[i].split('.')
|
x = vlan_ip[i].split('.')
|
||||||
y = station_ip[j].split('.')
|
y = station_ip[j].split('.')
|
||||||
if x[0] == y[0] and x[1] == y[1] and x[2] == y[2]:
|
if x[0] == y[0] and x[1] == y[1]:
|
||||||
print("station got ip from vlan")
|
print("station got ip from vlan")
|
||||||
return "Pass"
|
x = "Pass"
|
||||||
else:
|
else:
|
||||||
print("station did not got ip from vlan")
|
print("station did not got ip from vlan")
|
||||||
return "Fail"
|
x = "Fail"
|
||||||
|
return x
|
||||||
|
|
||||||
def compare_nonvlan_ip(self):
|
def compare_nonvlan_ip_nat(self):
|
||||||
non_vlan_sta_ip = self.get_non_vlan_sta_ip()
|
non_vlan_sta_ip = self.get_non_vlan_sta_ip()
|
||||||
# print(non_vlan_sta_ip)
|
# print(non_vlan_sta_ip)
|
||||||
for id in self.input:
|
for id in self.input:
|
||||||
@@ -232,9 +283,27 @@ class MultiPsk(Realm):
|
|||||||
non_vlan = non_vlan_sta_ip[x].split(".")
|
non_vlan = non_vlan_sta_ip[x].split(".")
|
||||||
if non_vlan[0] == "192" and non_vlan[1] == "168":
|
if non_vlan[0] == "192" and non_vlan[1] == "168":
|
||||||
# print("Pass")
|
# print("Pass")
|
||||||
return 'Pass'
|
x = 'Pass'
|
||||||
else:
|
else:
|
||||||
return "Fail"
|
x = "Fail"
|
||||||
|
return x
|
||||||
|
|
||||||
|
def compare_nonvlan_ip_bridge(self):
|
||||||
|
upstream_ip = self.monitor_non_vlan_ip()
|
||||||
|
non_vlan_sta_ip = self.get_non_vlan_sta_ip()
|
||||||
|
|
||||||
|
for i, j in zip(upstream_ip, non_vlan_sta_ip):
|
||||||
|
# print(i)
|
||||||
|
if i == j:
|
||||||
|
x = upstream_ip[i].split('.')
|
||||||
|
y = non_vlan_sta_ip[j].split('.')
|
||||||
|
if x[0] == y[0] and x[1] == y[1]:
|
||||||
|
print("station got ip from upstream")
|
||||||
|
result1 = "Pass"
|
||||||
|
else:
|
||||||
|
print("station did not got ip from upstream")
|
||||||
|
result1 = "Fail"
|
||||||
|
return result1
|
||||||
|
|
||||||
def postcleanup(self):
|
def postcleanup(self):
|
||||||
self.cx_profile_udp.cleanup()
|
self.cx_profile_udp.cleanup()
|
||||||
@@ -251,18 +320,27 @@ def main():
|
|||||||
parser.add_argument('--mgr_port', help='port LANforge GUI HTTP service is running on', default=8080)
|
parser.add_argument('--mgr_port', help='port LANforge GUI HTTP service is running on', default=8080)
|
||||||
parser.add_argument('--ssid', help='WiFi SSID for client to associate to')
|
parser.add_argument('--ssid', help='WiFi SSID for client to associate to')
|
||||||
parser.add_argument('--security', help='WiFi Security protocol: {open|wep|wpa2|wpa3', default="wpa2")
|
parser.add_argument('--security', help='WiFi Security protocol: {open|wep|wpa2|wpa3', default="wpa2")
|
||||||
|
parser.add_argument('--mode', help="specify mode of ap eg BRIDGE or NAT", default="BRIDGE")
|
||||||
|
parser.add_argument('--n_vlan', help="type number of vlan using in test eg 1 or 2", default=1)
|
||||||
# parser.add_argument('--input', nargs="+", help="specify list of parameters like passwords,upstream,mac address, number of clients and radio as input, eg password@123,eth2.100,"",1,wiphy0 lanforge@123,eth2.100,"",1,wiphy1")
|
# parser.add_argument('--input', nargs="+", help="specify list of parameters like passwords,upstream,mac address, number of clients and radio as input, eg password@123,eth2.100,"",1,wiphy0 lanforge@123,eth2.100,"",1,wiphy1")
|
||||||
args = parser.parse_args()
|
args = parser.parse_args()
|
||||||
|
|
||||||
input_data = [{
|
input_data = [{
|
||||||
"password": "password@123",
|
"password": "lanforge1",
|
||||||
"upstream": "eth2.100",
|
"upstream": "eth2.100",
|
||||||
"mac": "",
|
"mac": "",
|
||||||
"num_station": 1,
|
"num_station": 1,
|
||||||
"radio": "wiphy4"
|
"radio": "wiphy4"
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"password": "lanforge@123",
|
"password": "lanforge2",
|
||||||
|
"upstream": "eth2.200",
|
||||||
|
"mac": "",
|
||||||
|
"num_station": 1,
|
||||||
|
"radio": "wiphy4"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"password": "lanforge",
|
||||||
"upstream": "eth2",
|
"upstream": "eth2",
|
||||||
"mac": "",
|
"mac": "",
|
||||||
"num_station": 1,
|
"num_station": 1,
|
||||||
@@ -280,7 +358,11 @@ def main():
|
|||||||
multi_obj.start()
|
multi_obj.start()
|
||||||
time.sleep(60)
|
time.sleep(60)
|
||||||
multi_obj.monitor_vlan_ip()
|
multi_obj.monitor_vlan_ip()
|
||||||
|
if args.n_vlan == "1":
|
||||||
multi_obj.get_sta_ip()
|
multi_obj.get_sta_ip()
|
||||||
|
else:
|
||||||
|
multi_obj.get_sta_ip_for_more_vlan()
|
||||||
|
|
||||||
result = multi_obj.compare_ip()
|
result = multi_obj.compare_ip()
|
||||||
print("checking for vlan ips")
|
print("checking for vlan ips")
|
||||||
if result == "Pass":
|
if result == "Pass":
|
||||||
@@ -290,7 +372,10 @@ def main():
|
|||||||
print("now checking ip for non vlan port")
|
print("now checking ip for non vlan port")
|
||||||
multi_obj.monitor_non_vlan_ip()
|
multi_obj.monitor_non_vlan_ip()
|
||||||
multi_obj.get_non_vlan_sta_ip()
|
multi_obj.get_non_vlan_sta_ip()
|
||||||
result1 = multi_obj.compare_nonvlan_ip()
|
if args.mode == "BRIDGE":
|
||||||
|
result1 = multi_obj.compare_nonvlan_ip_bridge()
|
||||||
|
else:
|
||||||
|
result1 = multi_obj.compare_nonvlan_ip_nat()
|
||||||
if result1 == "Pass":
|
if result1 == "Pass":
|
||||||
print("Test passed for non vlan ip ")
|
print("Test passed for non vlan ip ")
|
||||||
else:
|
else:
|
||||||
@@ -302,3 +387,4 @@ def main():
|
|||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
main()
|
main()
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user