From 0da7e94581b7a0a6cadaf6c00e43a4fbadd9b827 Mon Sep 17 00:00:00 2001 From: Logan Lipke Date: Fri, 31 Jul 2020 16:31:27 -0700 Subject: [PATCH] VAPProfile now able to create vaps, added factory method to realm for VAPProfile --- py-json/realm.py | 273 +++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 252 insertions(+), 21 deletions(-) diff --git a/py-json/realm.py b/py-json/realm.py index 26a8a16e..1b00d4b7 100755 --- a/py-json/realm.py +++ b/py-json/realm.py @@ -8,6 +8,7 @@ from LANforge import LFUtils from LANforge import set_port from LANforge import add_sta from LANforge import lfcli_base +from LANforge import add_vap from LANforge.lfcli_base import LFCliBase from generic_cx import GenericCx from LANforge import add_monitor @@ -483,9 +484,13 @@ class Realm(LFCliBase): return cx_prof def new_generic_cx_profile(self): - cx_prof = GenCXProfile(self.lfclient_host, self.lfclient_port,local_realm=self, debug_=self.debug) + cx_prof = GenCXProfile(self.lfclient_host, self.lfclient_port, local_realm=self, debug_=self.debug) return cx_prof + def new_vap_profile(self): + vap_prof = VAPProfile(lfclient_url=self.lfclient_url, local_realm=self, debug_=self.debug) + return vap_prof + class MULTICASTProfile(LFCliBase): def __init__(self, lfclient_host, lfclient_port, local_realm, side_a_min_bps=None, side_b_min_bps=None, @@ -1321,39 +1326,267 @@ class WifiMonitor: # "sniff_port 1 %s %s NA %s %s.pcap %i"%(r, m, sflags, m, int(dur)) -class VAPProfile: - def __init__(self, lfclient_url, local_realm, radio, ssid="NA", ssid_pass="NA", mode=0, debug_=False): +class VAPProfile(LFCliBase): + def __init__(self, lfclient_url, local_realm, vap_name="", ssid="NA", ssid_pass="NA", mode=0, use_ht160=False, debug_=False): self.debug = debug_ self.lfclient_url = lfclient_url self.ssid = ssid self.ssid_pass = ssid_pass self.mode = mode self.local_realm = local_realm - self.radio = radio - self.created_vaps = [] + self.vap_name = vap_name + self.use_ht160 = use_ht160 + self.COMMANDS = ["add_vap", "set_port"] + self.desired_add_vap_flags = ["wpa2_enable", "80211u_enable", "create_admin_down"] + self.desired_add_vap_flags_mask = ["wpa2_enable", "80211u_enable", "create_admin_down"] + self.add_vap_data = { + "shelf": 1, + "resource": 1, + "radio": None, + "ap_name": None, + "flags": 0, + "flags_mask": 0, + "ssid": None, + "key": None, + "mac": "xx:xx:xx:xx:*:xx" + } + + self.desired_set_port_cmd_flags = [] + self.desired_set_port_current_flags = ["if_down"] + self.desired_set_port_interest_flags = ["current_flags", "ifdown"] + self.set_port_data = { + "shelf": 1, + "resource": 1, + "port": None, + "current_flags": 0, + "interest": 0, # (0x2 + 0x4000 + 0x800000) # current, dhcp, down + } + + if self.use_ht160: + self.desired_add_vap_flags.append("ht160_enable") + self.desired_add_vap_flags_mask.append("ht160_enable") def admin_up(self, resource): set_port_r = LFRequest.LFRequest(self.lfclient_url, "/cli-json/set_port", debug_=self.debug) req_json = LFUtils.portUpRequest(resource, None, debug_on=False) - for vap_name in self.created_vaps: - req_json["port"] = vap_name - set_port_r.addPostData(req_json) - json_response = set_port_r.jsonPost(self.debug) - time.sleep(0.03) + req_json["ap_name"] = self.vap_name + set_port_r.addPostData(req_json) + json_response = set_port_r.jsonPost(self.debug) + time.sleep(0.03) def admin_down(self, resource): set_port_r = LFRequest.LFRequest(self.lfclient_url, "/cli-json/set_port", debug_=self.debug) - req_json = LFUtils.portDownRequest(resource, None, debug_on=False) - for vap_name in self.created_vaps: - req_json["port"] = vap_name - set_port_r.addPostData(req_json) - json_response = set_port_r.jsonPost(self.debug) - time.sleep(0.03) + req_json = LFUtils.port_down_request(resource, None, debug_on=False) + req_json["ap_name"] = self.vap_name + set_port_r.addPostData(req_json) + json_response = set_port_r.jsonPost(self.debug) + time.sleep(0.03) - def create(self): - pass + def use_security(self, security_type, ssid=None, passwd=None): + types = {"wep": "wep_enable", "wpa": "wpa_enable", "wpa2": "wpa2_enable", "wpa3": "use-wpa3", "open": "[BLANK]"} + self.add_vap_data["ssid"] = ssid + if security_type in types.keys(): + if (ssid is None) or (ssid == ""): + raise ValueError("use_security: %s requires ssid" % security_type) + if (passwd is None) or (passwd == ""): + raise ValueError("use_security: %s requires passphrase or [BLANK]" % security_type) + for name in types.values(): + if name in self.desired_add_vap_flags and name in self.desired_add_vap_flags_mask: + self.desired_add_vap_flags.remove(name) + self.desired_add_vap_flags_mask.remove(name) + if security_type != "open": + self.desired_add_vap_flags.append(types[security_type]) + self.desired_add_vap_flags_mask.append(types[security_type]) + else: + passwd = "[BLANK]" + self.set_command_param("add_vap", "ssid", ssid) + self.set_command_param("add_vap", "key", passwd) + # unset any other security flag before setting our present flags + if security_type == "wpa3": + self.set_command_param("add_vap", "ieee80211w", 2) + + def set_command_flag(self, command_name, param_name, value): + # we have to check what the param name is + if (command_name is None) or (command_name == ""): + return + if (param_name is None) or (param_name == ""): + return + if command_name not in self.COMMANDS: + print("Command name name [%s] not defined in %s" % (command_name, self.COMMANDS)) + return + if command_name == "add_vap": + if (param_name not in add_vap.add_vap_flags): + print("Parameter name [%s] not defined in add_vap.py" % param_name) + if self.debug: + pprint(add_vap.add_vap_flags) + return + if (value == 1) and (param_name not in self.desired_add_vap_flags): + self.desired_add_vap_flags.append(param_name) + self.desired_add_vap_flags_mask.append(param_name) + elif value == 0: + self.desired_add_vap_flags.remove(param_name) + self.desired_add_vap_flags_mask.append(param_name) + + elif command_name == "set_port": + if (param_name not in set_port.set_port_current_flags) and (param_name not in set_port.set_port_cmd_flags) and (param_name not in set_port.set_port_interest_flags): + print("Parameter name [%s] not defined in set_port.py" % param_name) + if self.debug: + pprint(set_port.set_port_cmd_flags) + pprint(set_port.set_port_current_flags) + pprint(set_port.set_port_interest_flags) + return + if param_name in set_port.set_port_cmd_flags: + if (value == 1) and (param_name not in self.desired_set_port_cmd_flags): + self.desired_set_port_cmd_flags.append(param_name) + elif value == 0: + self.desired_set_port_cmd_flags.remove(param_name) + elif param_name in set_port.set_port_current_flags: + if (value == 1) and (param_name not in self.desired_set_port_current_flags): + self.desired_set_port_current_flags.append(param_name) + elif value == 0: + self.desired_set_port_current_flags.remove(param_name) + elif param_name in set_port.set_port_interest_flags: + if (value == 1) and (param_name not in self.desired_set_port_interest_flags): + self.desired_set_port_interest_flags.append(param_name) + elif value == 0: + self.desired_set_port_interest_flags.remove(param_name) + else: + raise ValueError("Unknown param name: " + param_name) + + def set_command_param(self, command_name, param_name, param_value): + # we have to check what the param name is + if (command_name is None) or (command_name == ""): + return + if (param_name is None) or (param_name == ""): + return + if command_name not in self.COMMANDS: + self.error("Command name name [%s] not defined in %s" % (command_name, self.COMMANDS)) + return + if command_name == "add_vap": + self.add_vap_data[param_name] = param_value + elif command_name == "set_port": + self.set_port_data[param_name] = param_value + + def add_named_flags(self, desired_list, command_ref): + if desired_list is None: + raise ValueError("addNamedFlags wants a list of desired flag names") + if len(desired_list) < 1: + print("addNamedFlags: empty desired list") + return 0 + if (command_ref is None) or (len(command_ref) < 1): + raise ValueError("addNamedFlags wants a maps of flag values") + + result = 0 + for name in desired_list: + if (name is None) or (name == ""): + continue + if name not in command_ref: + if self.debug: + pprint(command_ref) + raise ValueError("flag %s not in map" % name) + result += command_ref[name] + + return result + + def create(self, resource, radio, up_=None, debug=False, suppress_related_commands_=True): + if up_ is not None: + self.up = up_ + if self.up: + if "create_admin_down" in self.desired_add_vap_flags: + del self.desired_add_vap_flags[self.desired_add_vap_flags.index("create_admin_down")] + elif "create_admin_down" not in self.desired_add_vap_flags: + self.desired_add_vap_flags.append("create_admin_down") + + # create stations down, do set_port on them, then set stations up + self.add_vap_data["flags"] = self.add_named_flags(self.desired_add_vap_flags, add_vap.add_vap_flags) + self.add_vap_data["flags_mask"] = self.add_named_flags(self.desired_add_vap_flags_mask, add_vap.add_vap_flags) + self.add_vap_data["radio"] = radio + + self.add_vap_data["resource"] = resource + self.set_port_data["current_flags"] = self.add_named_flags(self.desired_set_port_current_flags, + set_port.set_port_current_flags) + self.set_port_data["interest"] = self.add_named_flags(self.desired_set_port_interest_flags, + set_port.set_port_interest_flags) + # these are unactivated LFRequest objects that we can modify and + # re-use inside a loop, reducing the number of object creations + add_vap_r = LFRequest.LFRequest(self.lfclient_url + "/cli-json/add_vap") + set_port_r = LFRequest.LFRequest(self.lfclient_url + "/cli-json/set_port") + + if suppress_related_commands_: + self.add_vap_data["suppress_preexec_cli"] = "yes" + self.add_vap_data["suppress_preexec_method"] = 1 + self.set_port_data["suppress_preexec_cli"] = "yes" + self.set_port_data["suppress_preexec_method"] = 1 + + # pprint(self.station_names) + # exit(1) + self.set_port_data["port"] = self.vap_name + self.add_vap_data["ap_name"] = self.vap_name + add_vap_r.addPostData(self.add_vap_data) + if debug: + print("- 1502 - %s- - - - - - - - - - - - - - - - - - " % self.vap_name) + pprint(self.add_vap_data) + pprint(self.set_port_data) + pprint(add_vap_r) + print("- ~1502 - - - - - - - - - - - - - - - - - - - ") + + json_response = add_vap_r.jsonPost(debug) + # time.sleep(0.03) + time.sleep(2) + set_port_r.addPostData(self.set_port_data) + json_response = set_port_r.jsonPost(debug) + time.sleep(0.03) + + if (self.up): + self.admin_up(1) + + + def cleanup(self, resource, desired_ports=None, delay=0.03): + print("Cleaning up vaps") + req_url = "/cli-json/rm_vlan" + data = { + "shelf": 1, + "resource": resource, + "port": None + } + if (desired_ports is not None): + if len(desired_ports) < 1: + print("No stations requested for cleanup, returning.") + return + names = ','.join(desired_ports) + current_stations = self.local_realm.json_get("/port/1/%s/%s?fields=alias" % (resource, names)) + if current_stations is None: + return + if "interfaces" in current_stations: + for station in current_stations['interfaces']: + for eid, info in station.items(): + data["port"] = info["alias"] + self.local_realm.json_post(req_url, data, debug_=self.debug) + time.sleep(delay) + + if "interface" in current_stations: + data["port"] = current_stations["interface"]["alias"] + self.local_realm.json_post(req_url, data, debug_=self.debug) + + return + + names = ','.join(self.station_names) + current_stations = self.local_realm.json_get("/port/1/%s/%s?fields=alias" % (resource, names)) + if current_stations is None or current_stations['interfaces'] is None: + print("No stations to clean up") + return + + if "interfaces" in current_stations: + for station in current_stations['interfaces']: + for eid, info in station.items(): + data["port"] = info["alias"] + self.local_realm.json_post(req_url, data, debug_=self.debug) + time.sleep(delay) + + if "interface" in current_stations: + data["port"] = current_stations["interface"]["alias"] + self.local_realm.json_post(req_url, data, debug_=self.debug) - # use the station profile to set the combination of features you want on your stations # once this combination is configured, build the stations with the build(resource, radio, number) call @@ -1408,8 +1641,6 @@ class StationProfile: "interest": 0, # (0x2 + 0x4000 + 0x800000) # current, dhcp, down, } - # TODO: create use_wpa3() - def use_security(self, security_type, ssid=None, passwd=None): types = {"wep": "wep_enable", "wpa": "wpa_enable", "wpa2": "wpa2_enable", "wpa3": "use-wpa3", "open": "[BLANK]"} self.add_sta_data["ssid"] = ssid