mirror of
				https://github.com/Telecominfraproject/wlan-lanforge-scripts.git
				synced 2025-11-03 20:27:54 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			919 lines
		
	
	
		
			35 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable File
		
	
	
	
	
			
		
		
	
	
			919 lines
		
	
	
		
			35 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable File
		
	
	
	
	
#!/usr/bin/env python3
 | 
						|
# The Realm Class is inherited by most python tests.  Realm Class inherits from LFCliBase.
 | 
						|
# The Realm Class contains the configurable components for LANforge,
 | 
						|
# For example L3 / L4 cross connects, stations.  Also contains helper methods
 | 
						|
# http://www.candelatech.com/cookbook.php?vol=cli&book=Python_Create_Test_Scripts_With_the_Realm_Class
 | 
						|
 | 
						|
# Written by Candela Technologies Inc.
 | 
						|
#  Updated by:
 | 
						|
 | 
						|
import sys
 | 
						|
import os
 | 
						|
import importlib
 | 
						|
import re
 | 
						|
import time
 | 
						|
from pprint import pprint
 | 
						|
 | 
						|
# ---- ---- ---- ---- LANforge Base Imports ---- ---- ---- ----
 | 
						|
 | 
						|
 | 
						|
sys.path.append(os.path.join(os.path.abspath(__file__ + "../../../")))
 | 
						|
 | 
						|
LANforge = importlib.import_module("py-json.LANforge")
 | 
						|
LFUtils = importlib.import_module("py-json.LANforge.LFUtils")
 | 
						|
lfcli_base = importlib.import_module("py-json.LANforge.lfcli_base")
 | 
						|
LFCliBase = lfcli_base.LFCliBase
 | 
						|
 | 
						|
# ---- ---- ---- ---- Profile Imports ---- ---- ---- ----
 | 
						|
 | 
						|
l3_cxprofile = importlib.import_module("py-json.l3_cxprofile")
 | 
						|
L3CXProfile = l3_cxprofile.L3CXProfile
 | 
						|
l4_cxprofile = importlib.import_module("py-json.l4_cxprofile")
 | 
						|
L4CXProfile = l4_cxprofile.L4CXProfile
 | 
						|
lf_attenmod = importlib.import_module("py-json.lf_attenmod")
 | 
						|
ATTENUATORProfile = lf_attenmod.ATTENUATORProfile
 | 
						|
multicast_profile = importlib.import_module("py-json.multicast_profile")
 | 
						|
MULTICASTProfile = multicast_profile.MULTICASTProfile
 | 
						|
http_profile = importlib.import_module("py-json.http_profile")
 | 
						|
HTTPProfile = http_profile.HTTPProfile
 | 
						|
station_profile = importlib.import_module("py-json.station_profile")
 | 
						|
StationProfile = station_profile.StationProfile
 | 
						|
fio_endp_profile = importlib.import_module("py-json.fio_endp_profile")
 | 
						|
FIOEndpProfile = fio_endp_profile.FIOEndpProfile
 | 
						|
test_group_profile = importlib.import_module("py-json.test_group_profile")
 | 
						|
TestGroupProfile = test_group_profile.TestGroupProfile
 | 
						|
dut_profile = importlib.import_module("py-json.dut_profile")
 | 
						|
DUTProfile = dut_profile.DUTProfile
 | 
						|
vap_profile = importlib.import_module("py-json.vap_profile")
 | 
						|
VAPProfile = vap_profile.VAPProfile
 | 
						|
mac_vlan_profile = importlib.import_module("py-json.mac_vlan_profile")
 | 
						|
MACVLANProfile = mac_vlan_profile.MACVLANProfile
 | 
						|
wifi_monitor_profile = importlib.import_module("py-json.wifi_monitor_profile")
 | 
						|
WifiMonitor = wifi_monitor_profile.WifiMonitor
 | 
						|
gen_cxprofile = importlib.import_module("py-json.gen_cxprofile")
 | 
						|
GenCXProfile = gen_cxprofile.GenCXProfile
 | 
						|
qvlan_profile = importlib.import_module("py-json.qvlan_profile")
 | 
						|
QVLANProfile = qvlan_profile.QVLANProfile
 | 
						|
port_utils = importlib.import_module("py-json.port_utils")
 | 
						|
PortUtils = port_utils.PortUtils
 | 
						|
lfdata = importlib.import_module("py-json.lfdata")
 | 
						|
LFDataCollection = lfdata.LFDataCollection
 | 
						|
 | 
						|
 | 
						|
def wpa_ent_list():
    """Return the list of known key-management option strings.

    A new list object is built on every call.
    """
    return [
        "DEFAULT",
        "NONE",
        "WPA-PSK",
        "FT-PSK",
        "FT-EAP",
        "FT-SAE",
        "FT-EAP-SHA384",
        "WPA-EAP",
        "OSEN",
        "IEEE8021X",
        "WPA-PSK-SHA256",
        "WPA-EAP-SHA256",
        "WPA-PSK WPA-EAP",
        "WPA-PSK-SHA256 WPA-EAP-SHA256",
        # The comma after the next entry used to be missing, which silently
        # concatenated it with "SAE" into one bogus entry.
        "WPA-PSK WPA-EAP WPA-PSK-SHA256 WPA-EAP-SHA256",
        "SAE",
        "WPA-EAP-SUITE-B",
        "WPA-EAP-SUITE-B-192",
        "FILS-SHA256",
        "FILS-SHA384",
        "OWE",
    ]
 | 
						|
 | 
						|
 | 
						|
class Realm(LFCliBase):
 | 
						|
    def __init__(self,
                 lfclient_host="localhost",
                 lfclient_port=8080,
                 debug_=False,
                 _exit_on_error=False,
                 _exit_on_fail=False,
                 _proxy_str=None,
                 _capture_signal_list=None):
        """Initialise the base client, verify connectivity and build channel tables.

        All arguments are forwarded to the LFCliBase constructor. After that
        ``check_connect()`` is called and two lookup tables are created:

        * ``self.chan_to_freq`` -- channel number -> centre frequency in MHz
        * ``self.freq_to_chan`` -- the exact inverse of ``chan_to_freq``
        """
        super().__init__(_lfjson_host=lfclient_host,
                         _lfjson_port=lfclient_port,
                         _debug=debug_,
                         _exit_on_error=_exit_on_error,
                         _exit_on_fail=_exit_on_fail,
                         _proxy_str=_proxy_str,
                         _capture_signal_list=_capture_signal_list)

        self.debug = debug_
        self.check_connect()

        # 2.4 GHz: channels 1-13 sit 5 MHz apart starting at 2412 MHz.
        # range() excludes its stop value, so the stop must be past 2472 for
        # channel 13 to be present (it was previously left out).
        chan_to_freq = dict(enumerate(range(2412, 2473, 5), start=1))
        chan_to_freq.update({
            14: 2484,
            # 5 GHz
            34: 5170, 36: 5180, 38: 5190, 40: 5200,
            42: 5210, 44: 5220, 46: 5230, 48: 5240,
            52: 5260, 56: 5280, 60: 5300, 64: 5320,
            100: 5500, 104: 5520, 108: 5540, 112: 5560,
            116: 5580, 120: 5600, 124: 5620, 128: 5640,
            132: 5660, 136: 5680, 140: 5700, 144: 5720,
            149: 5745, 153: 5765, 157: 5785, 161: 5805,
            165: 5825, 169: 5845, 173: 5865,
            # 4.9Ghz police band
            183: 4915, 184: 4920, 185: 4925,
            187: 4935, 188: 4940, 189: 4945,
            192: 4960, 194: 4970, 196: 4980,
        })
        self.chan_to_freq = chan_to_freq
        # Every frequency above is unique, so the inverse map is well defined.
        self.freq_to_chan = {freq: chan for chan, freq in chan_to_freq.items()}
 | 
						|
 | 
						|
    def wait_until_ports_appear(self, sta_list=None, debug_=False):
        """Delegate to LFUtils.wait_until_ports_appear for the given ports.

        :param sta_list: sized collection of port names/EIDs; when it is None
            or empty a notice is printed and nothing else happens.
        :param debug_: forwarded to LFUtils as ``debug``.
        :return: always None.
        """
        have_ports = (sta_list is not None) and (len(sta_list) >= 1)
        if have_ports:
            LFUtils.wait_until_ports_appear(base_url=self.lfclient_url,
                                            port_list=sta_list,
                                            debug=debug_)
            return
        print("realm.wait_until_ports_appear: no stations provided")
 | 
						|
 | 
						|
    def wait_until_ports_disappear(self, sta_list=None, debug_=False):
        """Delegate to LFUtils.wait_until_ports_disappear for the given ports.

        :param sta_list: sized collection of port names/EIDs; when it is None
            or empty a notice is printed and nothing else happens.
        :param debug_: forwarded to LFUtils as ``debug``.
        :return: always None.
        """
        if (sta_list is None) or (len(sta_list) < 1):
            # The notice previously named wait_until_ports_appear, which sent
            # anyone reading the log to the wrong method.
            print("realm.wait_until_ports_disappear: no stations provided")
            return

        LFUtils.wait_until_ports_disappear(base_url=self.lfclient_url,
                                           port_list=sta_list,
                                           debug=debug_)
 | 
						|
 | 
						|
    def rm_port(self, port_eid, check_exists=True, debug_=None):
 | 
						|
        if port_eid is None:
 | 
						|
            raise ValueError("realm.rm_port: want a port eid like 1.1.eth1")
 | 
						|
        if debug_ is None:
 | 
						|
            debug_ = self.debug
 | 
						|
        req_url = "/cli-json/rm_vlan"
 | 
						|
        eid = self.name_to_eid(port_eid)
 | 
						|
        if check_exists:
 | 
						|
            if not self.port_exists(port_eid, debug=debug_):
 | 
						|
                return False
 | 
						|
 | 
						|
        data = {
 | 
						|
            "shelf": eid[0],
 | 
						|
            "resource": eid[1],
 | 
						|
            "port": eid[2]
 | 
						|
        }
 | 
						|
        self.json_post(req_url, data, debug_=debug_)
 | 
						|
        return True
 | 
						|
 | 
						|
    def port_exists(self, port_eid, debug=None):
 | 
						|
        if debug is None:
 | 
						|
            debug = self.debug
 | 
						|
        eid = self.name_to_eid(port_eid)
 | 
						|
        current_stations = self.json_get("/port/%s/%s/%s?fields=alias" % (eid[0], eid[1], eid[2]),
 | 
						|
                                         debug_=debug)
 | 
						|
        if current_stations:
 | 
						|
            return True
 | 
						|
        return False
 | 
						|
 | 
						|
    def admin_up(self, port_eid):
        """Post a set_port request built by LFUtils.port_up_request.

        :param port_eid: port name or EID; element 1 of the resolved EID is
            used as the resource id and element 2 as the port name.
        """
        shelf_res_port = self.name_to_eid(port_eid)
        up_request = LFUtils.port_up_request(resource_id=shelf_res_port[1],
                                             port_name=shelf_res_port[2])
        self.json_post("/cli-json/set_port", up_request)
 | 
						|
 | 
						|
    def admin_down(self, port_eid):
        """Post a set_port request built by LFUtils.port_down_request.

        :param port_eid: port name or EID; element 1 of the resolved EID is
            used as the resource id and element 2 as the port name.
        """
        shelf_res_port = self.name_to_eid(port_eid)
        down_request = LFUtils.port_down_request(resource_id=shelf_res_port[1],
                                                 port_name=shelf_res_port[2])
        self.json_post("/cli-json/set_port", down_request)
 | 
						|
 | 
						|
    def reset_port(self, port_eid):
        """Post a reset_port request built by LFUtils.port_reset_request.

        :param port_eid: port name or EID; element 1 of the resolved EID is
            used as the resource id and element 2 as the port name.
        """
        shelf_res_port = self.name_to_eid(port_eid)
        reset_request = LFUtils.port_reset_request(resource_id=shelf_res_port[1],
                                                   port_name=shelf_res_port[2])
        # NOTE(review): unlike admin_up/admin_down this URL has no leading
        # slash -- confirm json_post normalises it.
        self.json_post("cli-json/reset_port", reset_request)
 | 
						|
 | 
						|
    def rm_cx(self, cx_name):
 | 
						|
        req_url = "cli-json/rm_cx"
 | 
						|
        data = {
 | 
						|
            "test_mgr": "ALL",
 | 
						|
            "cx_name": cx_name
 | 
						|
        }
 | 
						|
        self.json_post(req_url, data)
 | 
						|
 | 
						|
    def rm_endp(self, ename, debug_=False, suppress_related_commands_=True):
 | 
						|
        req_url = "cli-json/rm_endp"
 | 
						|
        data = {
 | 
						|
            "endp_name": ename
 | 
						|
        }
 | 
						|
        self.json_post(req_url, data, debug_=debug_, suppress_related_commands_=suppress_related_commands_)
 | 
						|
 | 
						|
    def set_endp_tos(self, ename, _tos, debug_=False, suppress_related_commands_=True):
 | 
						|
        req_url = "cli-json/set_endp_tos"
 | 
						|
        tos = _tos
 | 
						|
        # Convert some human readable values to numeric needed by LANforge.
 | 
						|
        if _tos == "BK":
 | 
						|
            tos = "64"
 | 
						|
        if _tos == "BE":
 | 
						|
            tos = "96"
 | 
						|
        if _tos == "VI":
 | 
						|
            tos = "128"
 | 
						|
        if _tos == "VO":
 | 
						|
            tos = "192"
 | 
						|
        data = {
 | 
						|
            "name": ename,
 | 
						|
            "tos": tos
 | 
						|
        }
 | 
						|
        self.json_post(req_url, data, debug_=debug_, suppress_related_commands_=suppress_related_commands_)
 | 
						|
 | 
						|
    def stop_cx(self, cx_name):
 | 
						|
        self.json_post("/cli-json/set_cx_state", {
 | 
						|
            "test_mgr": "ALL",
 | 
						|
            "cx_name": cx_name,
 | 
						|
            "cx_state": "STOPPED"
 | 
						|
        }, debug_=self.debug)
 | 
						|
 | 
						|
    def cleanup_cxe_prefix(self, prefix):
 | 
						|
        cx_list = self.cx_list()
 | 
						|
        if cx_list:
 | 
						|
            for cx_name in cx_list:
 | 
						|
                if cx_name.startswith(prefix):
 | 
						|
                    self.rm_cx(cx_name)
 | 
						|
 | 
						|
        endp_list = self.json_get("/endp/list")
 | 
						|
        if endp_list:
 | 
						|
            if 'endpoint' in endp_list:
 | 
						|
                endp_list = list(endp_list['endpoint'])
 | 
						|
                for idx in range(len(endp_list)):
 | 
						|
                    endp_name = list(endp_list[idx])[0]
 | 
						|
                    if endp_name.startswith(prefix):
 | 
						|
                        self.rm_endp(endp_name)
 | 
						|
            else:
 | 
						|
                if self.debug:
 | 
						|
                    print("cleanup_cxe_prefix no endpoints: endp_list{}".format(endp_list))
 | 
						|
 | 
						|
    def channel_freq(self, channel_=0):
 | 
						|
        return self.chan_to_freq[channel_]
 | 
						|
 | 
						|
    def freq_channel(self, freq_=0):
 | 
						|
        return self.freq_to_chan[freq_]
 | 
						|
 | 
						|
    # Polls /gui-json/cmd with "cv is_built" until the reported response is YES
 | 
						|
    def wait_while_building(self, debug_=False):
 | 
						|
        response_json = []
 | 
						|
        data = {
 | 
						|
            "cmd": "cv is_built"
 | 
						|
        }
 | 
						|
        last_response = "BUSY"
 | 
						|
        dbg_param = ""
 | 
						|
        if debug_:
 | 
						|
            dbg_param = "?__debug=1"
 | 
						|
 | 
						|
        while last_response != "YES":
 | 
						|
            self.json_post("/gui-json/cmd%s" % dbg_param, data, debug_=debug_, response_json_list_=response_json)
 | 
						|
            # LFUtils.debug_printer.pprint(response_json)
 | 
						|
            last_response = response_json[0]["LAST"]["response"]
 | 
						|
            if last_response != "YES":
 | 
						|
                last_response = None
 | 
						|
                response_json = []
 | 
						|
                time.sleep(1)
 | 
						|
            else:
 | 
						|
                return
 | 
						|
        return
 | 
						|
 | 
						|
    # loads a database
 | 
						|
    def load(self, name):
        """Post a load command for the database ``name``, then sleep one second.

        The request uses action "overwrite" and sets both clean_dut and
        clean_chambers to "yes".

        :raises ValueError: if ``name`` is None or the empty string.
        """
        name_missing = (name is None) or (name == "")
        if name_missing:
            raise ValueError(
                "Realm::load: wants a test scenario database name, please find one in the Status tab of the GUI")

        load_request = dict()
        load_request["name"] = name
        load_request["action"] = "overwrite"
        load_request["clean_dut"] = "yes"
        load_request["clean_chambers"] = "yes"
        self.json_post("/cli-json/load", _data=load_request, debug_=self.debug)
        time.sleep(1)
 | 
						|
 | 
						|
    # Returns json response from webpage of all layer 3 cross connects
 | 
						|
    def cx_list(self):
 | 
						|
        response = self.json_get("/cx/list")
 | 
						|
        return response
 | 
						|
 | 
						|
    def waitUntilEndpsAppear(self, these_endp, debug=False):
 | 
						|
        return self.wait_until_endps_appear(these_endp, debug=debug)
 | 
						|
 | 
						|
    def wait_until_endps_appear(self, these_endp, debug=False):
 | 
						|
        wait_more = True
 | 
						|
        count = 0
 | 
						|
        while wait_more:
 | 
						|
            time.sleep(1)
 | 
						|
            wait_more = False
 | 
						|
            endp_list = self.json_get("/endp/list")
 | 
						|
            found_endps = {}
 | 
						|
            if debug:
 | 
						|
                print("Waiting on endpoint endp_list {}".format(endp_list))
 | 
						|
            if endp_list and ("items" not in endp_list):
 | 
						|
                try:
 | 
						|
                    endp_list = list(endp_list['endpoint'])
 | 
						|
                    for idx in range(len(endp_list)):
 | 
						|
                        name = list(endp_list[idx])[0]
 | 
						|
                        found_endps[name] = name
 | 
						|
                except:
 | 
						|
                    print(
 | 
						|
                        "non-fatal exception endp_list = list(endp_list['endpoint'] did not exist, will wait some more")
 | 
						|
 | 
						|
            for req in these_endp:
 | 
						|
                if req not in found_endps:
 | 
						|
                    if debug:
 | 
						|
                        print("Waiting on endpoint: %s" % req)
 | 
						|
                    wait_more = True
 | 
						|
            count += 1
 | 
						|
            if count > 100:
 | 
						|
                break
 | 
						|
 | 
						|
        return not wait_more
 | 
						|
 | 
						|
    def waitUntilCxsAppear(self, these_cx, debug=False):
 | 
						|
        return self.wait_until_cxs_appear(these_cx, debug=debug)
 | 
						|
 | 
						|
    def wait_until_cxs_appear(self, these_cx, debug=False):
 | 
						|
        wait_more = True
 | 
						|
        count = 0
 | 
						|
        while wait_more:
 | 
						|
            time.sleep(1)
 | 
						|
            wait_more = False
 | 
						|
            found_cxs = {}
 | 
						|
            cx_list = self.cx_list()
 | 
						|
            not_cx = ['warnings', 'errors', 'handler', 'uri', 'items']
 | 
						|
            if cx_list:
 | 
						|
                for cx_name in cx_list:
 | 
						|
                    if cx_name in not_cx:
 | 
						|
                        continue
 | 
						|
                    found_cxs[cx_name] = cx_name
 | 
						|
 | 
						|
            for req in these_cx:
 | 
						|
                if req not in found_cxs:
 | 
						|
                    if debug:
 | 
						|
                        print("Waiting on CX: %s" % req)
 | 
						|
                    wait_more = True
 | 
						|
            count += 1
 | 
						|
            if count > 100:
 | 
						|
                break
 | 
						|
 | 
						|
        return not wait_more
 | 
						|
 | 
						|
    # def wait_until_database_loaded(self):
 | 
						|
 | 
						|
    # Returns map of all stations with port+type == WIFI-STA
 | 
						|
    # Key is the EID, value is the map of key/values for the port values.
 | 
						|
    def station_map(self):
        """Return the alias-map entries whose 'port type' is "WIFI-STA".

        The port list is fetched from ``/port/list`` and converted with
        ``LFUtils.portListToAliasMap``; keys and values of the result are
        whatever that helper produced. If the response is None or lacks
        'interfaces', the response is printed and the process exits with
        status 1.
        """
        port_response = super().json_get("/port/list?fields=port,_links,alias,device,port+type")
        response_ok = (port_response is not None) and ("interfaces" in port_response)
        if not response_ok:
            pprint(port_response)
            print("station_list: incomplete response, halting")
            exit(1)

        alias_map = LFUtils.portListToAliasMap(port_response)
        sta_map = {}
        for eid, record in alias_map.items():
            if record['port type'] != "WIFI-STA":
                continue
            sta_map[eid] = record
        # The intermediate map is emptied once the stations are copied out.
        alias_map.clear()
        return sta_map
 | 
						|
 | 
						|
    # Returns list of all stations with port+type == WIFI-STA
 | 
						|
    def station_list(self):
        """Return the ``/port/list`` interface entries of type "WIFI-STA".

        Each element of the response's 'interfaces' list is a mapping; an
        element is appended once for every value in it whose 'port type' is
        "WIFI-STA". If the response is None or lacks 'interfaces', it is
        printed and the process exits with status 1.
        """
        port_response = super().json_get("/port/list?fields=_links,alias,device,port+type")
        response_ok = (port_response is not None) and ("interfaces" in port_response)
        if not response_ok:
            print("station_list: incomplete response:")
            pprint(port_response)
            exit(1)

        stations = []
        for iface in port_response['interfaces']:
            for record in iface.values():
                if record['port type'] == "WIFI-STA":
                    stations.append(iface)
        return stations
 | 
						|
 | 
						|
    # Returns list of all ports
 | 
						|
    def port_list(self):
        """Return the 'interfaces' value of ``/port/list?fields=all``.

        :return: the 'interfaces' entry, or None (after printing the response)
            when the response is None or has no 'interfaces' key.
        """
        port_response = super().json_get("/port/list?fields=all")
        if (port_response is not None) and ("interfaces" in port_response):
            return port_response['interfaces']

        print("port_list: incomplete response:")
        pprint(port_response)
        return None
 | 
						|
 | 
						|
    # Returns list of all VAPs with "vap" in their name
 | 
						|
    def vap_list(self):
        """Return the ``/port/list`` interface entries whose device contains "vap".

        Each element of the response's 'interfaces' list is a mapping; an
        element is appended once for every value in it whose 'device' string
        contains "vap".

        :return: list of matching interface entries; an empty list (after
            printing the response) when the response is None or has no
            'interfaces' key.
        """
        response = super().json_get("/port/list?fields=_links,alias,device,port+type")
        # Without this guard a failed request (None) or a reply without
        # 'interfaces' crashed with TypeError/KeyError; port_list already
        # treats the same condition as non-fatal.
        if (response is None) or ("interfaces" not in response):
            print("vap_list: incomplete response:")
            pprint(response)
            return []

        vaps = []
        for iface in response['interfaces']:
            for record in iface.values():
                if "vap" in record['device']:
                    vaps.append(iface)

        return vaps
 | 
						|
 | 
						|
    # Returns all attenuators
 | 
						|
    def atten_list(self):
        """Return the 'attenuators' entry of the ``/atten/list`` response.

        No check is made for a missing response or missing key.
        """
        atten_response = super().json_get("/atten/list")
        attenuators = atten_response['attenuators']
        return attenuators
 | 
						|
 | 
						|
    # EID is shelf.resource.atten-serno.atten-idx
 | 
						|
    def set_atten(self, eid, atten_ddb):
 | 
						|
        eid_toks = self.name_to_eid(eid, non_port=True)
 | 
						|
        req_url = "cli-json/set_attenuator"
 | 
						|
        data = {
 | 
						|
            "shelf": eid_toks[0],
 | 
						|
            "resource": eid_toks[1],
 | 
						|
            "serno": eid_toks[2],
 | 
						|
            "atten_idx": eid_toks[3],
 | 
						|
            "val": atten_ddb,
 | 
						|
        }
 | 
						|
        self.json_post(req_url, data)
 | 
						|
 | 
						|
    # removes port by eid/eidpn
 | 
						|
    def remove_vlan_by_eid(self, eid):
 | 
						|
        if (eid is None) or (eid == ""):
 | 
						|
            raise ValueError("removeVlanByEid wants eid like 1.1.sta0 but given[%s]" % eid)
 | 
						|
        hunks = self.name_to_eid(eid)
 | 
						|
        # print("- - - - - - - - - - - - - - - - -")
 | 
						|
        # pprint(hunks)
 | 
						|
        # pprint(self.lfclient_url)
 | 
						|
        # print("- - - - - - - - - - - - - - - - -")
 | 
						|
        if (len(hunks) > 3) or (len(hunks) < 2):
 | 
						|
            raise ValueError("removeVlanByEid wants eid like 1.1.sta0 but given[%s]" % eid)
 | 
						|
        elif len(hunks) == 3:
 | 
						|
            LFUtils.removePort(hunks[1], hunks[2], self.lfclient_url)
 | 
						|
        else:
 | 
						|
            LFUtils.removePort(hunks[0], hunks[1], self.lfclient_url)
 | 
						|
 | 
						|
    # Searches for ports that match a given pattern and returns a list of names
 | 
						|
    def find_ports_like(self, pattern="", _fields="_links,alias,device,port+type", resource=0, debug_=False):
        """Return the "WIFI-STA" ports whose EID matches ``pattern``.

        Supported pattern forms (the marker must not be the first character):

        * ``prefix+`` or ``prefix*`` -- every station whose EID contains
          ``prefix``;
        * ``prefix[N..M]`` -- stations whose EID contains ``prefix`` and whose
          device name, after dropping ``len(prefix)`` leading characters, is
          an integer between N and M inclusive.

        A pattern with none of these markers matches nothing. A pattern that
        has a marker but is otherwise malformed is reported via
        ``self.error`` and matches nothing.

        :param pattern: pattern string as described above.
        :param _fields: value of the ``fields`` query parameter.
        :param resource: resource id; 0 queries ``/port/1/list``.
        :param debug_: print intermediate data.
        :return: dict mapping port EID to its alias-map record.
        """
        if resource == 0:
            url = "/port/1/list?fields=%s" % _fields
        else:
            url = "/port/1/%s/list?fields=%s" % (resource, _fields)
        response = self.json_get(url)
        if debug_:
            print("# find_ports_like r:%s, u:%s #" % (resource, url))
            pprint(response)
        alias_map = LFUtils.portListToAliasMap(response, debug_=debug_)
        if debug_:
            pprint(alias_map)

        # First pass: keep only station ports.
        prelim_map = {}
        matched_map = {}
        for name, record in alias_map.items():
            try:
                if debug_:
                    print("- prelim - - - - - - - - - - - - - - - - - - -")
                    pprint(record)
                if record["port type"] == "WIFI-STA":
                    prelim_map[name] = record

            except Exception as x:
                self.error(x)

        bad_pattern = "find_ports_like: unsupported pattern [%s]" % pattern
        try:
            if pattern.find("+") > 0:
                match = re.search(r"^([^+]+)[+]$", pattern)
                # re.search returns None on no match; previously that
                # surfaced as an uncaught AttributeError on match.group().
                if match is None:
                    raise ValueError(bad_pattern)
                prefix = match.group(1)
                for port_eid, record in prelim_map.items():
                    if debug_:
                        print("name:", port_eid, " Group 1: ", prefix)
                    if port_eid.find(prefix) >= 0:
                        matched_map[port_eid] = record

            elif pattern.find("*") > 0:
                match = re.search(r"^([^\*]+)[*]$", pattern)
                if match is None:
                    raise ValueError(bad_pattern)
                prefix = match.group(1)
                if debug_:
                    print("group 1: ", prefix)
                for port_eid, record in prelim_map.items():
                    if port_eid.find(prefix) >= 0:
                        matched_map[port_eid] = record

            elif pattern.find("[") > 0:
                match = re.search(r"^([^\[]+)\[(\d+)\.\.(\d+)\]$", pattern)
                if match is None:
                    raise ValueError(bad_pattern)
                if debug_:
                    print("[group1]: ", match.group(1))
                    print("[group2]: ", match.group(2))
                    print("[group3]: ", match.group(3))
                prefix = match.group(1)
                # Compare numerically: the old string comparison was
                # lexicographic, so "10" fell outside [2..10] and "00051"
                # fell inside [0000..0010].
                low = int(match.group(2))
                high = int(match.group(3))
                for port_eid, record in prelim_map.items():
                    if port_eid.find(prefix) < 0:
                        continue
                    port_suf = record["device"][len(prefix):]
                    try:
                        suffix_num = int(port_suf)
                    except ValueError:
                        # Empty or non-numeric suffix cannot be in the range.
                        continue
                    if low <= suffix_num <= high:
                        matched_map[port_eid] = record
        except ValueError as e:
            self.error(e)

        return matched_map
 | 
						|
 | 
						|
    def name_to_eid(self, eid, debug=False, non_port=False):
        """Resolve ``eid`` to its component form.

        An argument that is exactly a ``list`` or ``tuple`` is returned
        unchanged; anything else is handed to ``LFUtils.name_to_eid``.

        :param eid: name/EID string, or an already split list/tuple.
        :param debug: log the input at debug level through ``self.logg``.
        :param non_port: forwarded to ``LFUtils.name_to_eid``.
        """
        if debug:
            self.logg(level="debug", mesg="name_to_eid: " + str(eid))
        already_split = type(eid) in (list, tuple)
        if not already_split:
            return LFUtils.name_to_eid(eid, non_port=non_port)
        return eid
 | 
						|
 | 
						|
    def wait_for_ip(self, station_list=None, ipv4=True, ipv6=False, timeout_sec=360, debug=False):
        """
        Poll the given ports roughly once per second until each reports the
        requested address family (or families), or until the timeout expires.

        :param station_list: non-empty list of port names / EIDs to query
        :param ipv4: require an IPv4 address, i.e. an 'ip' value that is not
                     "0.0.0.0", "NA" or empty
        :param ipv6: require an IPv6 address, i.e. an 'ipv6 address' value that
                     is not 'DELETED', not 'AUTO' and does not start with 'fe80'
        :param timeout_sec: seconds to keep polling. -1 selects an adaptive
                            timeout: once at least 10 seconds have passed and
                            at least one station has an address, the timeout
                            becomes (elapsed / stations-with-address) * total
                            stations; if more than 60 seconds pass with no
                            station addressed, the timeout is fixed at 60.
        :param debug: print per-port progress messages
        :return: True when the last poll found every port addressed, else False
        :raises ValueError: if neither ipv4 nor ipv6 is requested, or if
                            station_list is None or empty
        """
        if not (ipv4 or ipv6):
            raise ValueError("wait_for_ip: ipv4 and/or ipv6 must be set!")
        if timeout_sec >= 0:
            print("Waiting for ips, timeout: %i..." % timeout_sec)
        else:
            print("Determining wait time based on mean station association time of stations. "
                  "Will not wait more that 60 seconds without single association")
        if (station_list is None) or (len(station_list) < 1):
            raise ValueError("wait_for_ip: expects non-empty list of ports")

        # station -> {'ipv4': addr, 'ipv6': addr}; only its size is used below,
        # to derive the adaptive timeout.
        stas_with_ips = {}
        sec_elapsed = 0
        time_extended = False
        waiting_states = ["0.0.0.0", "NA", ""]
        wait_more = True

        while wait_more and (sec_elapsed <= timeout_sec or timeout_sec == -1):
            wait_more = False

            if not time_extended and timeout_sec == -1:
                # Adaptive mode keeps polling until a real timeout has been derived.
                wait_more = True
                num_with_ips = len(stas_with_ips)
                if sec_elapsed >= 10 and num_with_ips > 0:
                    time_extended = True
                    timeout_sec = int(sec_elapsed / num_with_ips * len(station_list))
                    print("New timeout is %d seconds" % timeout_sec)
                elif sec_elapsed > 60 and num_with_ips == 0:
                    timeout_sec = 60
                    wait_more = False

            for sta_eid in station_list:
                if debug:
                    print("checking sta-eid: %s" % sta_eid)
                eid = self.name_to_eid(sta_eid)

                response = super().json_get("/port/%s/%s/%s?fields=alias,ip,port+type,ipv6+address" %
                                            (eid[0], eid[1], eid[2]))

                if (response is None) or ("interface" not in response):
                    print("station_list: incomplete response:")
                    pprint(response)
                    # Give up on this pass and retry after the sleep below.
                    wait_more = True
                    break

                interface = response['interface']

                if ipv4:
                    ipv4_addr = interface['ip']
                    if ipv4_addr in waiting_states:
                        wait_more = True
                        if debug:
                            print("Waiting for port %s to get IPv4 Address." % sta_eid)
                    else:
                        stas_with_ips.setdefault(sta_eid, {})['ipv4'] = ipv4_addr
                        if debug:
                            print("Found IP: %s on port: %s" % (ipv4_addr, sta_eid))

                if ipv6:
                    ipv6_addr = interface['ipv6 address']
                    if ipv6_addr not in ('DELETED', 'AUTO') and not ipv6_addr.startswith('fe80'):
                        # Record the IPv6 address itself (previously the IPv4
                        # field was stored here), even when the station already
                        # has an IPv4 entry.
                        stas_with_ips.setdefault(sta_eid, {})['ipv6'] = ipv6_addr
                        if debug:
                            print("Found IPv6: %s on port: %s" % (ipv6_addr, sta_eid))
                    else:
                        wait_more = True
                        if debug:
                            print("Waiting for port %s to get IPv6 Address." % sta_eid)

            if wait_more:
                time.sleep(1)
                sec_elapsed += 1

        return not wait_more
 | 
						|
 | 
						|
    def get_curr_num_ips(self, num_sta_with_ips=0, station_list=None, ipv4=True, ipv6=False, debug=False):
        """
        Query each listed port once and count how many currently have an address.

        The count is per requested address family: with both ipv4 and ipv6
        enabled, a port holding both kinds of address adds 2.

        :param num_sta_with_ips: starting value the count is added to
        :param station_list: non-empty list of port names / EIDs to query
        :param ipv4: count ports whose 'ip' is not "0.0.0.0", "NA" or empty
        :param ipv6: count ports whose 'ipv6 address' is not 'DELETED', not
                     'AUTO' and does not start with 'fe80' (same test that
                     wait_for_ip applies)
        :param debug: print the raw responses and per-port progress
        :return: num_sta_with_ips plus the number of addresses found; counting
                 stops at the first incomplete response
        :raises ValueError: if station_list is None or empty
        """
        if debug:
            print("checking number of stations with ips...")
        waiting_states = ["0.0.0.0", "NA", ""]
        if (station_list is None) or (len(station_list) < 1):
            raise ValueError("check for num curr ips expects non-empty list of ports")

        for sta_eid in station_list:
            if debug:
                print("checking sta-eid: %s" % sta_eid)
            eid = self.name_to_eid(sta_eid)
            response = super().json_get("/port/%s/%s/%s?fields=alias,ip,port+type,ipv6+address" %
                                        (eid[0], eid[1], eid[2]))
            if debug:
                pprint(response)
            if (response is None) or ("interface" not in response):
                print("station_list: incomplete response:")
                pprint(response)
                break

            interface = response['interface']

            if ipv4:
                ipv4_addr = interface['ip']
                if ipv4_addr in waiting_states:
                    if debug:
                        print("Waiting for port %s to get IPv4 Address." % sta_eid)
                else:
                    if debug:
                        print("Found IP: %s on port: %s" % (ipv4_addr, sta_eid))
                        print("Incrementing stations with IP addresses found")
                    num_sta_with_ips += 1

            if ipv6:
                # Inspect the IPv6 field; the IPv4 'ip' field says nothing
                # about IPv6 readiness.
                ipv6_addr = interface['ipv6 address']
                if ipv6_addr in ('DELETED', 'AUTO') or ipv6_addr.startswith('fe80'):
                    if debug:
                        print("Waiting for port %s to get IPv6 Address." % sta_eid)
                else:
                    if debug:
                        print("Found IPv6: %s on port: %s" % (ipv6_addr, sta_eid))
                        print("Incrementing stations with IP addresses found")
                    num_sta_with_ips += 1

        return num_sta_with_ips
 | 
						|
 | 
						|
    @staticmethod
 | 
						|
    def duration_time_to_seconds(time_string):
 | 
						|
        if isinstance(time_string, str):
 | 
						|
            pattern = re.compile("^(\d+)([dhms]$)")
 | 
						|
            td = pattern.match(time_string)
 | 
						|
            if td:
 | 
						|
                dur_time = int(td.group(1))
 | 
						|
                dur_measure = str(td.group(2))
 | 
						|
                if dur_measure == "d":
 | 
						|
                    duration_sec = dur_time * 24 * 60 * 60
 | 
						|
                elif dur_measure == "h":
 | 
						|
                    duration_sec = dur_time * 60 * 60
 | 
						|
                elif dur_measure == "m":
 | 
						|
                    duration_sec = dur_time * 60
 | 
						|
                else:
 | 
						|
                    duration_sec = dur_time * 1
 | 
						|
            else:
 | 
						|
                raise ValueError("Unknown value for time_string: %s" % time_string)
 | 
						|
        else:
 | 
						|
            raise ValueError("time_string must be of type str. Type %s provided" % type(time_string))
 | 
						|
        return duration_sec
 | 
						|
 | 
						|
    def remove_all_stations(self, resource):
        """
        Post an rm_vlan request for every port from self.station_list() whose
        name contains "sta".

        :param resource: resource id placed in every rm_vlan request (the shelf
                         is always 1)
        :return: None
        """
        ports = self.station_list()
        if not ports:
            return

        print("Removing all stations")
        # Each entry's first key is taken as the port name; element [2] of the
        # resolved EID is used as the port to remove.
        station_names = []
        for entry in list(ports):
            port_name = list(entry)[0]
            if "sta" in port_name:
                station_names.append(self.name_to_eid(port_name)[2])

        for station in station_names:
            self.json_post("cli-json/rm_vlan", {
                "shelf": 1,
                "resource": resource,
                "port": station
            })
 | 
						|
 | 
						|
    def remove_all_endps(self):
        """
        Fetch /endp/list and post an rm_endp request for every endpoint in it.

        Nothing is posted when the response is empty/None, when it carries an
        "items" or "empty" key, or when it has no "endpoint" key.

        :return: None
        """
        endp_list = self.json_get("/endp/list")
        # Check for a missing response first: membership tests on None raise
        # TypeError.
        if not endp_list:
            return
        if "items" in endp_list or "empty" in endp_list:
            return
        if "endpoint" not in endp_list:
            return

        print("Removing all endps")
        # Each entry's first key is used as the endpoint name.
        # NOTE(review): assumes 'endpoint' is a list of single-key mappings;
        # confirm the shape returned for a single endpoint.
        for entry in list(endp_list['endpoint']):
            name = list(entry)[0]
            self.json_post("cli-json/rm_endp", {
                "endp_name": name
            })
 | 
						|
 | 
						|
    def remove_all_cxs(self, remove_all_endpoints=False):
        """
        Post an rm_cx request (test_mgr "default_tm") for every cross connect
        reported by self.cx_list(), optionally removing all endpoints as well.

        :param remove_all_endpoints: when True, also call remove_all_endps()
                                     and then post nc_show_endpoints for "all"
        :return: None
        """
        # Keys of the cx_list() result that are skipped rather than treated as
        # cross-connect names.
        not_cx = ('warnings', 'errors', 'handler', 'uri', 'items', 'empty')

        # Query once, so the list that is checked is the list that is removed.
        cx_map = self.cx_list()
        if cx_map:
            print("Removing all cxs")
            for cx_name in list(cx_map):
                if cx_name in not_cx:
                    continue
                self.json_post("cli-json/rm_cx", {
                    "test_mgr": "default_tm",
                    "cx_name": cx_name
                })
        else:
            print("no cxs to remove")

        if remove_all_endpoints:
            self.remove_all_endps()
            self.json_post("cli-json/nc_show_endpoints", {
                "endpoint": "all"
            })
 | 
						|
 | 
						|
    def parse_link(self, link):
        """
        Join self.lfclient_url and link; nothing further is done with the result.

        NOTE(review): this looks like an unfinished stub; both locals are
        discarded and the method returns None.

        :param link: path appended to self.lfclient_url
        :return: None
        """
        full_url = self.lfclient_url + link  # computed but never used
        parsed_info = ()  # placeholder, never filled in
 | 
						|
 | 
						|
    def new_station_profile(self):
        """
        Build a StationProfile for this realm's client URL.

        :return: new StationProfile created with local_realm=self, this
                 realm's debug flag and up=False
        """
        profile = StationProfile(self.lfclient_url,
                                 local_realm=self,
                                 debug_=self.debug,
                                 up=False)
        return profile
 | 
						|
 | 
						|
    def new_multicast_profile(self):
        """
        Build a MULTICASTProfile for this realm's client host and port.

        :return: new MULTICASTProfile created with local_realm=self, this
                 realm's debug flag and report_timer_=3000
        """
        profile = MULTICASTProfile(self.lfclient_host,
                                   self.lfclient_port,
                                   local_realm=self,
                                   debug_=self.debug,
                                   report_timer_=3000)
        return profile
 | 
						|
 | 
						|
    def new_wifi_monitor_profile(self, resource_=1, debug_=False, up_=False):
        """
        Build a WifiMonitor for this realm's client URL.

        :param resource_: passed through as resource_
        :param debug_: debugging is enabled if either this flag or the realm's
                       own debug flag is truthy
        :param up_: passed through as up
        :return: new WifiMonitor created with local_realm=self
        """
        url = self.lfclient_url
        effective_debug = self.debug or debug_
        return WifiMonitor(url, local_realm=self, resource_=resource_, up=up_, debug_=effective_debug)
 | 
						|
 | 
						|
    def new_l3_cx_profile(self):
        """
        Build an L3CXProfile for this realm's client host and port.

        :return: new L3CXProfile created with local_realm=self, this realm's
                 debug flag and report_timer_=3000
        """
        profile = L3CXProfile(self.lfclient_host, self.lfclient_port,
                              local_realm=self, debug_=self.debug, report_timer_=3000)
        return profile
 | 
						|
 | 
						|
    def new_l4_cx_profile(self):
        """
        Build an L4CXProfile for this realm's client host and port.

        :return: new L4CXProfile created with local_realm=self and this
                 realm's debug flag
        """
        profile = L4CXProfile(self.lfclient_host,
                              self.lfclient_port,
                              local_realm=self,
                              debug_=self.debug)
        return profile
 | 
						|
 | 
						|
    def new_attenuator_profile(self):
        """
        Build an ATTENUATORProfile for this realm's client host and port.

        No local_realm is passed to this profile, unlike most sibling factories.

        :return: new ATTENUATORProfile created with this realm's debug flag
        """
        profile = ATTENUATORProfile(self.lfclient_host,
                                    self.lfclient_port,
                                    debug_=self.debug)
        return profile
 | 
						|
 | 
						|
    def new_generic_endp_profile(self):
        """
        Build a GenCXProfile for this realm's client host and port.

        :return: new GenCXProfile created with local_realm=self and this
                 realm's debug flag
        """
        profile = GenCXProfile(self.lfclient_host,
                               self.lfclient_port,
                               local_realm=self,
                               debug_=self.debug)
        return profile
 | 
						|
 | 
						|
    def new_generic_cx_profile(self):
        """
        @deprecated
        Builds the same kind of object, with the same arguments, as
        new_generic_endp_profile.

        :return: new GenCXProfile created with local_realm=self and this
                 realm's debug flag
        """
        profile = GenCXProfile(self.lfclient_host,
                               self.lfclient_port,
                               local_realm=self,
                               debug_=self.debug)
        return profile
 | 
						|
 | 
						|
    def new_vap_profile(self):
        """
        Build a VAPProfile for this realm's client host and port.

        :return: new VAPProfile created with local_realm=self and this realm's
                 debug flag
        """
        profile = VAPProfile(lfclient_host=self.lfclient_host,
                             lfclient_port=self.lfclient_port,
                             local_realm=self,
                             debug_=self.debug)
        return profile
 | 
						|
 | 
						|
    # def new_vr_profile(self):
 | 
						|
    # return VRProfile(local_realm=self,
 | 
						|
    # debug=self.debug)
 | 
						|
 | 
						|
    def new_http_profile(self):
        """
        Build an HTTPProfile for this realm's client host and port.

        :return: new HTTPProfile created with local_realm=self and this
                 realm's debug flag
        """
        profile = HTTPProfile(self.lfclient_host,
                              self.lfclient_port,
                              local_realm=self,
                              debug_=self.debug)
        return profile
 | 
						|
 | 
						|
    def new_fio_endp_profile(self):
        """
        Build a FIOEndpProfile for this realm's client host and port.

        :return: new FIOEndpProfile created with local_realm=self and this
                 realm's debug flag
        """
        profile = FIOEndpProfile(self.lfclient_host,
                                 self.lfclient_port,
                                 local_realm=self,
                                 debug_=self.debug)
        return profile
 | 
						|
 | 
						|
    def new_dut_profile(self):
        """
        Build a DUTProfile for this realm's client host and port.

        :return: new DUTProfile created with local_realm=self and this realm's
                 debug flag
        """
        profile = DUTProfile(self.lfclient_host,
                             self.lfclient_port,
                             local_realm=self,
                             debug_=self.debug)
        return profile
 | 
						|
 | 
						|
    def new_mvlan_profile(self):
        """
        Build a MACVLANProfile for this realm's client host and port.

        :return: new MACVLANProfile created with local_realm=self and this
                 realm's debug flag
        """
        profile = MACVLANProfile(self.lfclient_host,
                                 self.lfclient_port,
                                 local_realm=self,
                                 debug_=self.debug)
        return profile
 | 
						|
 | 
						|
    def new_qvlan_profile(self):
        """
        Build a QVLANProfile for this realm's client host and port.

        Uses self.lfclient_host / self.lfclient_port, the same attributes every
        other profile factory on this class reads.
        NOTE(review): this previously read self.host / self.port, which no
        sibling method uses; confirm nothing relied on those being distinct
        values.

        :return: new QVLANProfile created with local_realm=self and this
                 realm's debug flag
        """
        return QVLANProfile(self.lfclient_host,
                            self.lfclient_port,
                            local_realm=self,
                            debug_=self.debug)
 | 
						|
 | 
						|
    def new_test_group_profile(self):
        """
        Build a TestGroupProfile for this realm's client host and port.

        :return: new TestGroupProfile created with local_realm=self and this
                 realm's debug flag
        """
        profile = TestGroupProfile(self.lfclient_host,
                                   self.lfclient_port,
                                   local_realm=self,
                                   debug_=self.debug)
        return profile
 | 
						|
 | 
						|
    def new_lf_data_collection(self):
        """
        Build an LFDataCollection bound to this realm.

        :return: new LFDataCollection created with local_realm=self
        """
        collection = LFDataCollection(local_realm=self)
        return collection
 | 
						|
 | 
						|
 | 
						|
class PacketFilter:
    """Helpers that build tshark argument strings and run tshark on a capture file."""

    @staticmethod
    def get_filter_wlan_assoc_packets(ap_mac, sta_mac):
        """
        Build tshark arguments selecting frames to or from either MAC whose
        wlan.fc.type_subtype is <= 3.

        :param ap_mac: first MAC address substituted into the display filter
        :param sta_mac: second MAC address substituted into the display filter
        :return: argument string (with a trailing space) for run_filter
        """
        output_fields = "-T fields -e wlan.fc.type_subtype -e wlan.addr -e wlan.fc.pwrmgt "
        display_filter = "-Y \"(wlan.addr==%s or wlan.addr==%s) and wlan.fc.type_subtype<=3\" " % (ap_mac, sta_mac)
        return output_fields + display_filter

    @staticmethod
    def get_filter_wlan_null_packets(ap_mac, sta_mac):
        """
        Build tshark arguments selecting frames to or from either MAC whose
        wlan.fc.type_subtype is 44.

        :param ap_mac: first MAC address substituted into the display filter
        :param sta_mac: second MAC address substituted into the display filter
        :return: argument string (with a trailing space) for run_filter
        """
        output_fields = "-T fields -e wlan.fc.type_subtype -e wlan.addr -e wlan.fc.pwrmgt "
        display_filter = "-Y \"(wlan.addr==%s or wlan.addr==%s) and wlan.fc.type_subtype==44\" " % (ap_mac, sta_mac)
        return output_fields + display_filter

    @staticmethod
    def run_filter(pcap_file, file_filter):
        """
        Run tshark on pcap_file with the given arguments and return its output.

        The output is redirected to /tmp/tshark_dump.txt and then read back.

        NOTE(review): the command is assembled as a shell string and always
        writes to the same path, so arguments are subject to shell
        interpretation and concurrent calls overwrite each other's output.

        :param pcap_file: capture file path passed to ``tshark -r``
        :param file_filter: extra tshark arguments, e.g. from get_filter_*
        :return: list of output lines with trailing whitespace removed
        """
        dump_path = "/tmp/tshark_dump.txt"
        os.system("tshark -r %s %s > %s" % (pcap_file, file_filter, dump_path))
        with open(dump_path) as dump_file:
            return [raw_line.rstrip() for raw_line in dump_file]
 |