mirror of
				https://github.com/Telecominfraproject/wlan-lanforge-scripts.git
				synced 2025-11-03 20:27:54 +00:00 
			
		
		
		
	modified input data passing style and vlan ip comparision done
This commit is contained in:
		@@ -9,12 +9,12 @@ PURPOSE:
 | 
			
		||||
DESCRIPTION:
 | 
			
		||||
            The script will follow basic functionality as:-
 | 
			
		||||
            1- create station on input parameters provided
 | 
			
		||||
            2- the input parameters consist of list of passwords,upstream,mac address, number of clients and radio as input
 | 
			
		||||
            2- the input parameters consist of dictionary of passwords,upstream,mac address, number of clients and radio as input
 | 
			
		||||
            3- will create layer3 cx for tcp and udp
 | 
			
		||||
            3- verify layer3 cx
 | 
			
		||||
            4- verify the ip for each station is comming from respective vlan id or not.
 | 
			
		||||
example :-
 | 
			
		||||
        python3 lf_multipsk.py --mgr 192.168.200.29 --mgr_port 8080 --ssid nikita --security wpa2 --passwd password@123 password@123 password@123  --num_sta 2   --radio wiphy1 --input "password@123","eth1","",2,wiphy1   "password@123","eth1","",2,wiphy0
 | 
			
		||||
         python3 lf_multipsk.py --mgr localhost  --mgr_port 8802  --ssid "MDU Wi-Fi"  --security wpa2
 | 
			
		||||
 | 
			
		||||
INCLUDE_IN_README
 | 
			
		||||
    -Nikita Yadav
 | 
			
		||||
@@ -70,26 +70,24 @@ class MultiPsk(Realm):
 | 
			
		||||
        self.station_profile = self.local_realm.new_station_profile()
 | 
			
		||||
 | 
			
		||||
    def build(self):
 | 
			
		||||
 | 
			
		||||
        station_list = []
 | 
			
		||||
        for idex, input in enumerate(self.input):
 | 
			
		||||
            input_list = input.split(',')
 | 
			
		||||
            if idex == 0:
 | 
			
		||||
                number = 10
 | 
			
		||||
            elif idex == 1:
 | 
			
		||||
                number = 100
 | 
			
		||||
            elif idex == 2:
 | 
			
		||||
                number = 1000
 | 
			
		||||
            elif idex == 3:
 | 
			
		||||
                number = 10000
 | 
			
		||||
            # print(input)
 | 
			
		||||
            if "." in input['upstream']:
 | 
			
		||||
                num = input['upstream'].split('.')[1]
 | 
			
		||||
                sta_name = "1." + str(self.resource) + ".sta" + str(num)
 | 
			
		||||
                # print(sta_name)
 | 
			
		||||
            else:
 | 
			
		||||
                number = "10"
 | 
			
		||||
                # implementation for non vlan pending ****
 | 
			
		||||
            print("creating stations")
 | 
			
		||||
            station_list = LFUtils.portNameSeries(prefix_=self.sta_prefix, start_id_=self.start_id,
 | 
			
		||||
                                                  end_id_=int(input_list[3]) - 1, padding_number_=number,
 | 
			
		||||
                                                  radio=input_list[4])
 | 
			
		||||
            self.station_profile.use_security(self.security, self.ssid, str(input_list[0]))
 | 
			
		||||
            # print("hi",sta_name)
 | 
			
		||||
            station_list.append(sta_name)
 | 
			
		||||
            self.station_profile.use_security(self.security, self.ssid, str(input['password']))
 | 
			
		||||
            self.station_profile.set_command_flag("add_sta", "create_admin_down", 1)
 | 
			
		||||
            self.station_profile.set_command_param("set_port", "report_timer", 1500)
 | 
			
		||||
            self.station_profile.set_command_flag("set_port", "rpt_timer", 1)
 | 
			
		||||
            self.station_profile.create(radio=input_list[4], sta_names_=station_list, debug=self.local_realm.debug)
 | 
			
		||||
            self.station_profile.create(radio=input['radio'], sta_names_=station_list, debug=self.local_realm.debug)
 | 
			
		||||
            self.local_realm.wait_until_ports_appear(sta_list=station_list)
 | 
			
		||||
            self.station_profile.admin_up()
 | 
			
		||||
            if self.local_realm.wait_for_ip(station_list, timeout_sec=60):
 | 
			
		||||
@@ -111,7 +109,7 @@ class MultiPsk(Realm):
 | 
			
		||||
                raise ValueError("Unable to find ports named '%s'+" % self.sta_prefix)
 | 
			
		||||
            self.cx_profile_udp.create(endp_type="lf_udp",
 | 
			
		||||
                                       side_a=port_list,
 | 
			
		||||
                                       side_b="%d.%s" % (self.resource, input_list[1]),
 | 
			
		||||
                                       side_b="%d.%s" % (self.resource, input['upstream']),
 | 
			
		||||
                                       suppress_related_commands=True)
 | 
			
		||||
 | 
			
		||||
            # Create TCP endpoints
 | 
			
		||||
@@ -123,7 +121,7 @@ class MultiPsk(Realm):
 | 
			
		||||
            self.l3_tcp_profile.report_timer = 1000
 | 
			
		||||
            self.l3_tcp_profile.create(endp_type="lf_tcp",
 | 
			
		||||
                                       side_a=list(self.local_realm.find_ports_like("%s+" % self.sta_prefix)),
 | 
			
		||||
                                       side_b="%d.%s" % (self.resource, input_list[1]),
 | 
			
		||||
                                       side_b="%d.%s" % (self.resource, input['upstream']),
 | 
			
		||||
                                       suppress_related_commands=True)
 | 
			
		||||
 | 
			
		||||
    def start(self):
 | 
			
		||||
@@ -131,34 +129,57 @@ class MultiPsk(Realm):
 | 
			
		||||
        self.l3_tcp_profile.start_cx()
 | 
			
		||||
 | 
			
		||||
    def monitor_vlan_ip(self):
 | 
			
		||||
        # this function gives vlan ip dict eg{'eth2.100': '172.17.0.1'}
 | 
			
		||||
 | 
			
		||||
        vlan_ips = {}
 | 
			
		||||
        for i in self.input:
 | 
			
		||||
            if "." in i['upstream']:
 | 
			
		||||
                print(str(i['upstream']) + " is a vlan upstream port")
 | 
			
		||||
                print("checking its ip ..")
 | 
			
		||||
                data = self.local_realm.json_get("ports/list?fields=IP")
 | 
			
		||||
        # print(data)
 | 
			
		||||
        for i in data["interfaces"]:
 | 
			
		||||
            for j in i:
 | 
			
		||||
                if "1.1.eth2.100" == j:
 | 
			
		||||
                    ip_upstream = i["1.1.eth2.100"]['ip']
 | 
			
		||||
                for val in data["interfaces"]:
 | 
			
		||||
                    for j in val:
 | 
			
		||||
                        if "1." + str(self.resource) + "." + str(i['upstream']) == j:
 | 
			
		||||
                            ip_upstream = val["1." + str(self.resource) + "." + str(i['upstream'])]['ip']
 | 
			
		||||
                            vlan_ips[i['upstream']] = ip_upstream
 | 
			
		||||
                            # print(ip_upstream)
 | 
			
		||||
        return ip_upstream
 | 
			
		||||
                            print(vlan_ips)
 | 
			
		||||
                            return vlan_ips
 | 
			
		||||
            else:
 | 
			
		||||
                print(str(i['upstream']) + " is not an vlan upstream port")
 | 
			
		||||
                return 0
 | 
			
		||||
 | 
			
		||||
    def get_sta_ip(self):
 | 
			
		||||
        # this function gives station ip dict eg{'eth2.100': '172.17.0.100'}
 | 
			
		||||
        station_ip = {}
 | 
			
		||||
        port_list = list(self.local_realm.find_ports_like("%s+" % self.sta_prefix))
 | 
			
		||||
        # print("port list", port_list)
 | 
			
		||||
        print("port list", port_list)
 | 
			
		||||
        for name, id in zip(port_list, self.input):
 | 
			
		||||
            print(name)
 | 
			
		||||
            print(type(name))
 | 
			
		||||
            x = id['upstream'].split('.')[1]
 | 
			
		||||
 | 
			
		||||
            if name == "1." + str(self.resource) + ".sta" + str(x):
 | 
			
		||||
                data = self.local_realm.json_get("ports/list?fields=IP")
 | 
			
		||||
        # print(data)
 | 
			
		||||
                for i in data["interfaces"]:
 | 
			
		||||
                    # print(i)
 | 
			
		||||
                    for j in i:
 | 
			
		||||
                if j == "1.1.sta0":
 | 
			
		||||
                    sta_ip = i["1.1.sta0"]['ip']
 | 
			
		||||
        print(sta_ip)
 | 
			
		||||
        return sta_ip
 | 
			
		||||
                        if j == name:
 | 
			
		||||
                            sta_ip = i[name]['ip']
 | 
			
		||||
                            # print(sta_ip)
 | 
			
		||||
                            station_ip[id['upstream']] = sta_ip
 | 
			
		||||
            print(station_ip)
 | 
			
		||||
            return station_ip
 | 
			
		||||
 | 
			
		||||
    def compare_ip(self):
 | 
			
		||||
        vlan_ip = self.monitor_vlan_ip()
 | 
			
		||||
        station_ip = self.get_sta_ip()
 | 
			
		||||
        vlan_ip_ = vlan_ip.split('.')
 | 
			
		||||
        station_ip_ = station_ip.split('.')
 | 
			
		||||
        if vlan_ip_[0] == station_ip_[0] and vlan_ip_[1] == station_ip_[1] and vlan_ip_[2] == station_ip_[2]:
 | 
			
		||||
        for i, j in zip(vlan_ip, station_ip):
 | 
			
		||||
            # print(i)
 | 
			
		||||
            if i == j:
 | 
			
		||||
                x = vlan_ip[i].split('.')
 | 
			
		||||
                y = station_ip[j].split('.')
 | 
			
		||||
                if x[0] == y[0] and x[1] == y[1] and x[2] == y[2]:
 | 
			
		||||
                    print("station got ip from vlan")
 | 
			
		||||
                    return "Pass"
 | 
			
		||||
        else:
 | 
			
		||||
@@ -180,13 +201,24 @@ def main():
 | 
			
		||||
    parser.add_argument('--mgr_port', help='port LANforge GUI HTTP service is running on', default=8080)
 | 
			
		||||
    parser.add_argument('--ssid', help='WiFi SSID for client to associate to')
 | 
			
		||||
    parser.add_argument('--security', help='WiFi Security protocol: {open|wep|wpa2|wpa3', default="wpa2")
 | 
			
		||||
    parser.add_argument('--input', nargs="+", help="specify list of parameters like passwords,upstream,mac address, number of clients and radio as input, eg password@123,eth2.100,"",1,wiphy0  lanforge@123,eth2.100,"",1,wiphy1")
 | 
			
		||||
    #parser.add_argument('--input', nargs="+", help="specify list of parameters like passwords,upstream,mac address, number of clients and radio as input, eg password@123,eth2.100,"",1,wiphy0  lanforge@123,eth2.100,"",1,wiphy1")
 | 
			
		||||
    args = parser.parse_args()
 | 
			
		||||
 | 
			
		||||
    input_data = [{
 | 
			
		||||
        "password": "password@123",
 | 
			
		||||
        "upstream": "eth2.100",
 | 
			
		||||
        "mac": "",
 | 
			
		||||
        "num_station": 1,
 | 
			
		||||
        "radio": "wiphy4"
 | 
			
		||||
        },
 | 
			
		||||
 | 
			
		||||
        ]
 | 
			
		||||
    multi_obj = MultiPsk(host=args.mgr,
 | 
			
		||||
                         port=args.mgr_port,
 | 
			
		||||
                         ssid=args.ssid,
 | 
			
		||||
                         input=args.input,
 | 
			
		||||
                         input=input_data,
 | 
			
		||||
                         security=args.security)
 | 
			
		||||
 | 
			
		||||
    multi_obj.build()
 | 
			
		||||
    multi_obj.start()
 | 
			
		||||
    time.sleep(60)
 | 
			
		||||
@@ -197,7 +229,6 @@ def main():
 | 
			
		||||
        print("Test pass")
 | 
			
		||||
    else:
 | 
			
		||||
        print("Test Fail")
 | 
			
		||||
 | 
			
		||||
    multi_obj.postcleanup()
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user