mirror of
https://github.com/Telecominfraproject/wlan-lanforge-scripts.git
synced 2025-11-03 04:07:52 +00:00
VAPProfile now able to create vaps, added factory method to realm for VAPProfile
This commit is contained in:
273
py-json/realm.py
273
py-json/realm.py
@@ -8,6 +8,7 @@ from LANforge import LFUtils
|
||||
from LANforge import set_port
|
||||
from LANforge import add_sta
|
||||
from LANforge import lfcli_base
|
||||
from LANforge import add_vap
|
||||
from LANforge.lfcli_base import LFCliBase
|
||||
from generic_cx import GenericCx
|
||||
from LANforge import add_monitor
|
||||
@@ -483,9 +484,13 @@ class Realm(LFCliBase):
|
||||
return cx_prof
|
||||
|
||||
def new_generic_cx_profile(self):
|
||||
cx_prof = GenCXProfile(self.lfclient_host, self.lfclient_port,local_realm=self, debug_=self.debug)
|
||||
cx_prof = GenCXProfile(self.lfclient_host, self.lfclient_port, local_realm=self, debug_=self.debug)
|
||||
return cx_prof
|
||||
|
||||
def new_vap_profile(self):
|
||||
vap_prof = VAPProfile(lfclient_url=self.lfclient_url, local_realm=self, debug_=self.debug)
|
||||
return vap_prof
|
||||
|
||||
class MULTICASTProfile(LFCliBase):
|
||||
def __init__(self, lfclient_host, lfclient_port, local_realm,
|
||||
side_a_min_bps=None, side_b_min_bps=None,
|
||||
@@ -1321,39 +1326,267 @@ class WifiMonitor:
|
||||
|
||||
# "sniff_port 1 %s %s NA %s %s.pcap %i"%(r, m, sflags, m, int(dur))
|
||||
|
||||
class VAPProfile:
|
||||
def __init__(self, lfclient_url, local_realm, radio, ssid="NA", ssid_pass="NA", mode=0, debug_=False):
|
||||
class VAPProfile(LFCliBase):
|
||||
def __init__(self, lfclient_url, local_realm, vap_name="", ssid="NA", ssid_pass="NA", mode=0, use_ht160=False, debug_=False):
|
||||
self.debug = debug_
|
||||
self.lfclient_url = lfclient_url
|
||||
self.ssid = ssid
|
||||
self.ssid_pass = ssid_pass
|
||||
self.mode = mode
|
||||
self.local_realm = local_realm
|
||||
self.radio = radio
|
||||
self.created_vaps = []
|
||||
self.vap_name = vap_name
|
||||
self.use_ht160 = use_ht160
|
||||
self.COMMANDS = ["add_vap", "set_port"]
|
||||
self.desired_add_vap_flags = ["wpa2_enable", "80211u_enable", "create_admin_down"]
|
||||
self.desired_add_vap_flags_mask = ["wpa2_enable", "80211u_enable", "create_admin_down"]
|
||||
self.add_vap_data = {
|
||||
"shelf": 1,
|
||||
"resource": 1,
|
||||
"radio": None,
|
||||
"ap_name": None,
|
||||
"flags": 0,
|
||||
"flags_mask": 0,
|
||||
"ssid": None,
|
||||
"key": None,
|
||||
"mac": "xx:xx:xx:xx:*:xx"
|
||||
}
|
||||
|
||||
self.desired_set_port_cmd_flags = []
|
||||
self.desired_set_port_current_flags = ["if_down"]
|
||||
self.desired_set_port_interest_flags = ["current_flags", "ifdown"]
|
||||
self.set_port_data = {
|
||||
"shelf": 1,
|
||||
"resource": 1,
|
||||
"port": None,
|
||||
"current_flags": 0,
|
||||
"interest": 0, # (0x2 + 0x4000 + 0x800000) # current, dhcp, down
|
||||
}
|
||||
|
||||
if self.use_ht160:
|
||||
self.desired_add_vap_flags.append("ht160_enable")
|
||||
self.desired_add_vap_flags_mask.append("ht160_enable")
|
||||
|
||||
def admin_up(self, resource):
|
||||
set_port_r = LFRequest.LFRequest(self.lfclient_url, "/cli-json/set_port", debug_=self.debug)
|
||||
req_json = LFUtils.portUpRequest(resource, None, debug_on=False)
|
||||
for vap_name in self.created_vaps:
|
||||
req_json["port"] = vap_name
|
||||
set_port_r.addPostData(req_json)
|
||||
json_response = set_port_r.jsonPost(self.debug)
|
||||
time.sleep(0.03)
|
||||
req_json["ap_name"] = self.vap_name
|
||||
set_port_r.addPostData(req_json)
|
||||
json_response = set_port_r.jsonPost(self.debug)
|
||||
time.sleep(0.03)
|
||||
|
||||
def admin_down(self, resource):
|
||||
set_port_r = LFRequest.LFRequest(self.lfclient_url, "/cli-json/set_port", debug_=self.debug)
|
||||
req_json = LFUtils.portDownRequest(resource, None, debug_on=False)
|
||||
for vap_name in self.created_vaps:
|
||||
req_json["port"] = vap_name
|
||||
set_port_r.addPostData(req_json)
|
||||
json_response = set_port_r.jsonPost(self.debug)
|
||||
time.sleep(0.03)
|
||||
req_json = LFUtils.port_down_request(resource, None, debug_on=False)
|
||||
req_json["ap_name"] = self.vap_name
|
||||
set_port_r.addPostData(req_json)
|
||||
json_response = set_port_r.jsonPost(self.debug)
|
||||
time.sleep(0.03)
|
||||
|
||||
def create(self):
|
||||
pass
|
||||
def use_security(self, security_type, ssid=None, passwd=None):
|
||||
types = {"wep": "wep_enable", "wpa": "wpa_enable", "wpa2": "wpa2_enable", "wpa3": "use-wpa3", "open": "[BLANK]"}
|
||||
self.add_vap_data["ssid"] = ssid
|
||||
if security_type in types.keys():
|
||||
if (ssid is None) or (ssid == ""):
|
||||
raise ValueError("use_security: %s requires ssid" % security_type)
|
||||
if (passwd is None) or (passwd == ""):
|
||||
raise ValueError("use_security: %s requires passphrase or [BLANK]" % security_type)
|
||||
for name in types.values():
|
||||
if name in self.desired_add_vap_flags and name in self.desired_add_vap_flags_mask:
|
||||
self.desired_add_vap_flags.remove(name)
|
||||
self.desired_add_vap_flags_mask.remove(name)
|
||||
if security_type != "open":
|
||||
self.desired_add_vap_flags.append(types[security_type])
|
||||
self.desired_add_vap_flags_mask.append(types[security_type])
|
||||
else:
|
||||
passwd = "[BLANK]"
|
||||
self.set_command_param("add_vap", "ssid", ssid)
|
||||
self.set_command_param("add_vap", "key", passwd)
|
||||
# unset any other security flag before setting our present flags
|
||||
if security_type == "wpa3":
|
||||
self.set_command_param("add_vap", "ieee80211w", 2)
|
||||
|
||||
def set_command_flag(self, command_name, param_name, value):
|
||||
# we have to check what the param name is
|
||||
if (command_name is None) or (command_name == ""):
|
||||
return
|
||||
if (param_name is None) or (param_name == ""):
|
||||
return
|
||||
if command_name not in self.COMMANDS:
|
||||
print("Command name name [%s] not defined in %s" % (command_name, self.COMMANDS))
|
||||
return
|
||||
if command_name == "add_vap":
|
||||
if (param_name not in add_vap.add_vap_flags):
|
||||
print("Parameter name [%s] not defined in add_vap.py" % param_name)
|
||||
if self.debug:
|
||||
pprint(add_vap.add_vap_flags)
|
||||
return
|
||||
if (value == 1) and (param_name not in self.desired_add_vap_flags):
|
||||
self.desired_add_vap_flags.append(param_name)
|
||||
self.desired_add_vap_flags_mask.append(param_name)
|
||||
elif value == 0:
|
||||
self.desired_add_vap_flags.remove(param_name)
|
||||
self.desired_add_vap_flags_mask.append(param_name)
|
||||
|
||||
elif command_name == "set_port":
|
||||
if (param_name not in set_port.set_port_current_flags) and (param_name not in set_port.set_port_cmd_flags) and (param_name not in set_port.set_port_interest_flags):
|
||||
print("Parameter name [%s] not defined in set_port.py" % param_name)
|
||||
if self.debug:
|
||||
pprint(set_port.set_port_cmd_flags)
|
||||
pprint(set_port.set_port_current_flags)
|
||||
pprint(set_port.set_port_interest_flags)
|
||||
return
|
||||
if param_name in set_port.set_port_cmd_flags:
|
||||
if (value == 1) and (param_name not in self.desired_set_port_cmd_flags):
|
||||
self.desired_set_port_cmd_flags.append(param_name)
|
||||
elif value == 0:
|
||||
self.desired_set_port_cmd_flags.remove(param_name)
|
||||
elif param_name in set_port.set_port_current_flags:
|
||||
if (value == 1) and (param_name not in self.desired_set_port_current_flags):
|
||||
self.desired_set_port_current_flags.append(param_name)
|
||||
elif value == 0:
|
||||
self.desired_set_port_current_flags.remove(param_name)
|
||||
elif param_name in set_port.set_port_interest_flags:
|
||||
if (value == 1) and (param_name not in self.desired_set_port_interest_flags):
|
||||
self.desired_set_port_interest_flags.append(param_name)
|
||||
elif value == 0:
|
||||
self.desired_set_port_interest_flags.remove(param_name)
|
||||
else:
|
||||
raise ValueError("Unknown param name: " + param_name)
|
||||
|
||||
def set_command_param(self, command_name, param_name, param_value):
|
||||
# we have to check what the param name is
|
||||
if (command_name is None) or (command_name == ""):
|
||||
return
|
||||
if (param_name is None) or (param_name == ""):
|
||||
return
|
||||
if command_name not in self.COMMANDS:
|
||||
self.error("Command name name [%s] not defined in %s" % (command_name, self.COMMANDS))
|
||||
return
|
||||
if command_name == "add_vap":
|
||||
self.add_vap_data[param_name] = param_value
|
||||
elif command_name == "set_port":
|
||||
self.set_port_data[param_name] = param_value
|
||||
|
||||
def add_named_flags(self, desired_list, command_ref):
|
||||
if desired_list is None:
|
||||
raise ValueError("addNamedFlags wants a list of desired flag names")
|
||||
if len(desired_list) < 1:
|
||||
print("addNamedFlags: empty desired list")
|
||||
return 0
|
||||
if (command_ref is None) or (len(command_ref) < 1):
|
||||
raise ValueError("addNamedFlags wants a maps of flag values")
|
||||
|
||||
result = 0
|
||||
for name in desired_list:
|
||||
if (name is None) or (name == ""):
|
||||
continue
|
||||
if name not in command_ref:
|
||||
if self.debug:
|
||||
pprint(command_ref)
|
||||
raise ValueError("flag %s not in map" % name)
|
||||
result += command_ref[name]
|
||||
|
||||
return result
|
||||
|
||||
def create(self, resource, radio, up_=None, debug=False, suppress_related_commands_=True):
|
||||
if up_ is not None:
|
||||
self.up = up_
|
||||
if self.up:
|
||||
if "create_admin_down" in self.desired_add_vap_flags:
|
||||
del self.desired_add_vap_flags[self.desired_add_vap_flags.index("create_admin_down")]
|
||||
elif "create_admin_down" not in self.desired_add_vap_flags:
|
||||
self.desired_add_vap_flags.append("create_admin_down")
|
||||
|
||||
# create stations down, do set_port on them, then set stations up
|
||||
self.add_vap_data["flags"] = self.add_named_flags(self.desired_add_vap_flags, add_vap.add_vap_flags)
|
||||
self.add_vap_data["flags_mask"] = self.add_named_flags(self.desired_add_vap_flags_mask, add_vap.add_vap_flags)
|
||||
self.add_vap_data["radio"] = radio
|
||||
|
||||
self.add_vap_data["resource"] = resource
|
||||
self.set_port_data["current_flags"] = self.add_named_flags(self.desired_set_port_current_flags,
|
||||
set_port.set_port_current_flags)
|
||||
self.set_port_data["interest"] = self.add_named_flags(self.desired_set_port_interest_flags,
|
||||
set_port.set_port_interest_flags)
|
||||
# these are unactivated LFRequest objects that we can modify and
|
||||
# re-use inside a loop, reducing the number of object creations
|
||||
add_vap_r = LFRequest.LFRequest(self.lfclient_url + "/cli-json/add_vap")
|
||||
set_port_r = LFRequest.LFRequest(self.lfclient_url + "/cli-json/set_port")
|
||||
|
||||
if suppress_related_commands_:
|
||||
self.add_vap_data["suppress_preexec_cli"] = "yes"
|
||||
self.add_vap_data["suppress_preexec_method"] = 1
|
||||
self.set_port_data["suppress_preexec_cli"] = "yes"
|
||||
self.set_port_data["suppress_preexec_method"] = 1
|
||||
|
||||
# pprint(self.station_names)
|
||||
# exit(1)
|
||||
self.set_port_data["port"] = self.vap_name
|
||||
self.add_vap_data["ap_name"] = self.vap_name
|
||||
add_vap_r.addPostData(self.add_vap_data)
|
||||
if debug:
|
||||
print("- 1502 - %s- - - - - - - - - - - - - - - - - - " % self.vap_name)
|
||||
pprint(self.add_vap_data)
|
||||
pprint(self.set_port_data)
|
||||
pprint(add_vap_r)
|
||||
print("- ~1502 - - - - - - - - - - - - - - - - - - - ")
|
||||
|
||||
json_response = add_vap_r.jsonPost(debug)
|
||||
# time.sleep(0.03)
|
||||
time.sleep(2)
|
||||
set_port_r.addPostData(self.set_port_data)
|
||||
json_response = set_port_r.jsonPost(debug)
|
||||
time.sleep(0.03)
|
||||
|
||||
if (self.up):
|
||||
self.admin_up(1)
|
||||
|
||||
|
||||
def cleanup(self, resource, desired_ports=None, delay=0.03):
|
||||
print("Cleaning up vaps")
|
||||
req_url = "/cli-json/rm_vlan"
|
||||
data = {
|
||||
"shelf": 1,
|
||||
"resource": resource,
|
||||
"port": None
|
||||
}
|
||||
if (desired_ports is not None):
|
||||
if len(desired_ports) < 1:
|
||||
print("No stations requested for cleanup, returning.")
|
||||
return
|
||||
names = ','.join(desired_ports)
|
||||
current_stations = self.local_realm.json_get("/port/1/%s/%s?fields=alias" % (resource, names))
|
||||
if current_stations is None:
|
||||
return
|
||||
if "interfaces" in current_stations:
|
||||
for station in current_stations['interfaces']:
|
||||
for eid, info in station.items():
|
||||
data["port"] = info["alias"]
|
||||
self.local_realm.json_post(req_url, data, debug_=self.debug)
|
||||
time.sleep(delay)
|
||||
|
||||
if "interface" in current_stations:
|
||||
data["port"] = current_stations["interface"]["alias"]
|
||||
self.local_realm.json_post(req_url, data, debug_=self.debug)
|
||||
|
||||
return
|
||||
|
||||
names = ','.join(self.station_names)
|
||||
current_stations = self.local_realm.json_get("/port/1/%s/%s?fields=alias" % (resource, names))
|
||||
if current_stations is None or current_stations['interfaces'] is None:
|
||||
print("No stations to clean up")
|
||||
return
|
||||
|
||||
if "interfaces" in current_stations:
|
||||
for station in current_stations['interfaces']:
|
||||
for eid, info in station.items():
|
||||
data["port"] = info["alias"]
|
||||
self.local_realm.json_post(req_url, data, debug_=self.debug)
|
||||
time.sleep(delay)
|
||||
|
||||
if "interface" in current_stations:
|
||||
data["port"] = current_stations["interface"]["alias"]
|
||||
self.local_realm.json_post(req_url, data, debug_=self.debug)
|
||||
|
||||
|
||||
|
||||
# use the station profile to set the combination of features you want on your stations
|
||||
# once this combination is configured, build the stations with the build(resource, radio, number) call
|
||||
@@ -1408,8 +1641,6 @@ class StationProfile:
|
||||
"interest": 0, # (0x2 + 0x4000 + 0x800000) # current, dhcp, down,
|
||||
}
|
||||
|
||||
# TODO: create use_wpa3()
|
||||
|
||||
def use_security(self, security_type, ssid=None, passwd=None):
|
||||
types = {"wep": "wep_enable", "wpa": "wpa_enable", "wpa2": "wpa2_enable", "wpa3": "use-wpa3", "open": "[BLANK]"}
|
||||
self.add_sta_data["ssid"] = ssid
|
||||
|
||||
Reference in New Issue
Block a user