[dhcp_server] add config dhcp server range (#17741)

* add range related function and ut
This commit is contained in:
Xichen96
2024-01-17 11:24:57 +08:00
committed by GitHub
parent 00fa56760f
commit a100f15ba2
4 changed files with 217 additions and 15 deletions

View File

@@ -8,7 +8,7 @@ import string
SUPPORT_TYPE = ["binary", "boolean", "ipv4-address", "string", "uint8", "uint16", "uint32"]
def validate_str_type(type, value):
def validate_str_type(type_, value):
"""
To validate whether type is consistent with string value
Args:
@@ -20,27 +20,27 @@ def validate_str_type(type, value):
"""
if not isinstance(value, str):
return False
if type not in SUPPORT_TYPE:
if type_ not in SUPPORT_TYPE:
return False
if type == "string":
if type_ == "string":
return True
if type == "binary":
if type_ == "binary":
if len(value) == 0 or len(value) % 2 != 0:
return False
return all(c in set(string.hexdigits) for c in value)
if type == "boolean":
if type_ == "boolean":
return value in ["true", "false"]
if type == "ipv4-address":
if type_ == "ipv4-address":
try:
if len(value.split(".")) != 4:
return False
return ipaddress.ip_address(value).version == 4
except ValueError:
return False
if type.startswith("uint"):
if type_.startswith("uint"):
if not value.isdigit():
return False
length = int("".join([c for c in type if c.isdigit()]))
length = int("".join([c for c in type_ if c.isdigit()]))
return 0 <= int(value) <= int(pow(2, length)) - 1
return False
@@ -186,6 +186,78 @@ def dhcp_server_ipv4_disable(db, dhcp_interface):
ctx.fail("Failed to disable, dhcp interface {} does not exist".format(dhcp_interface))
@dhcp_server_ipv4.group(cls=clicommon.AliasedGroup, name="range")
def dhcp_server_ipv4_range():
pass
def count_ipv4(start, end):
ip1 = int(ipaddress.IPv4Address(start))
ip2 = int(ipaddress.IPv4Address(end))
return ip2 - ip1 + 1
@dhcp_server_ipv4_range.command(name="add")
@click.argument("range_name", required=True)
@click.argument("ip_start", required=True)
@click.argument("ip_end", required=False)
@clicommon.pass_db
def dhcp_server_ipv4_range_add(db, range_name, ip_start, ip_end):
ctx = click.get_current_context()
if not ip_end:
ip_end = ip_start
if not validate_str_type("ipv4-address", ip_start) or not validate_str_type("ipv4-address", ip_end):
ctx.fail("ip_start or ip_end is not valid ipv4 address")
if count_ipv4(ip_start, ip_end) < 1:
ctx.fail("range value is illegal")
dbconn = db.db
key = "DHCP_SERVER_IPV4_RANGE|" + range_name
if dbconn.exists("CONFIG_DB", key):
ctx.fail("Range {} already exist".format(range_name))
else:
dbconn.hmset("CONFIG_DB", key, {"range": ip_start + "," + ip_end})
@dhcp_server_ipv4_range.command(name="update")
@click.argument("range_name", required=True)
@click.argument("ip_start", required=True)
@click.argument("ip_end", required=False)
@clicommon.pass_db
def dhcp_server_ipv4_range_update(db, range_name, ip_start, ip_end):
ctx = click.get_current_context()
if not ip_end:
ip_end = ip_start
if not validate_str_type("ipv4-address", ip_start) or not validate_str_type("ipv4-address", ip_end):
ctx.fail("ip_start or ip_end is not valid ipv4 address")
if count_ipv4(ip_start, ip_end) < 1:
ctx.fail("range value is illegal")
dbconn = db.db
key = "DHCP_SERVER_IPV4_RANGE|" + range_name
if dbconn.exists("CONFIG_DB", key):
dbconn.set("CONFIG_DB", key, "range", ip_start + "," + ip_end)
else:
ctx.fail("Range {} does not exist, cannot update".format(range_name))
@dhcp_server_ipv4_range.command(name="del")
@click.argument("range_name", required=True)
@click.option("--force", required=False, default=False, is_flag=True)
@clicommon.pass_db
def dhcp_sever_ipv4_range_del(db, range_name, force):
ctx = click.get_current_context()
dbconn = db.db
key = "DHCP_SERVER_IPV4_RANGE|" + range_name
if dbconn.exists("CONFIG_DB", key):
if not force:
for port in dbconn.keys("CONFIG_DB", "DHCP_SERVER_IPV4_PORT*"):
ranges = dbconn.get("CONFIG_DB", port, "ranges")
if ranges and range_name in ranges.split(","):
ctx.fail("Range {} is referenced in {}, cannot delete, add --force to bypass".format(range_name, port))
dbconn.delete("CONFIG_DB", key)
else:
ctx.fail("Range {} does not exist, cannot delete".format(range_name))
def register(cli):
# cli.add_command(dhcp_server)
pass