mirror of
https://github.com/Telecominfraproject/ols-nos.git
synced 2025-11-02 02:57:45 +00:00
[docker-macsec]: MACsec CLI Plugin (#9390)
#### Why I did it
To provide MACsec config and show CLI for manipulating MACsec
#### How I did it
Add `config macsec` and `show macsec`.
#### How to verify it
This PR includes unittest for MACsec CLI, check Azp status.
- Add MACsec profile
```
admin@sonic:~$ sudo config macsec profile add --help
Usage: config macsec profile add [OPTIONS] <profile_name>
Add MACsec profile
Options:
--priority <priority> For Key server election. In 0-255 range with
0 being the highest priority. [default:
255]
--cipher_suite <cipher_suite> The cipher suite for MACsec. [default: GCM-
AES-128]
--primary_cak <primary_cak> Primary Connectivity Association Key.
[required]
--primary_ckn <primary_cak> Primary CAK Name. [required]
--policy <policy> MACsec policy. INTEGRITY_ONLY: All traffic,
except EAPOL, will be converted to MACsec
packets without encryption. SECURITY: All
traffic, except EAPOL, will be encrypted by
SecY. [default: security]
--enable_replay_protect / --disable_replay_protect
Whether enable replay protect. [default:
False]
--replay_window <enable_replay_protect>
Replay window size that is the number of
packets that could be out of order. This
field works only if ENABLE_REPLAY_PROTECT is
true. [default: 0]
--send_sci / --no_send_sci Send SCI in SecTAG field of MACsec header.
[default: True]
--rekey_period <rekey_period> The period of proactively refresh (Unit
second). [default: 0]
-?, -h, --help Show this message and exit.
```
- Delete MACsec profile
```
admin@sonic:~$ sudo config macsec profile del --help
Usage: config macsec profile del [OPTIONS] <profile_name>
Delete MACsec profile
Options:
-?, -h, --help Show this message and exit.
```
- Enable MACsec on the port
```
admin@sonic:~$ sudo config macsec port add --help
Usage: config macsec port add [OPTIONS] <port_name> <profile_name>
Add MACsec port
Options:
-?, -h, --help Show this message and exit.
```
- Disable MACsec on the port
```
admin@sonic:~$ sudo config macsec port del --help
Usage: config macsec port del [OPTIONS] <port_name>
Delete MACsec port
Options:
-?, -h, --help Show this message and exit.
```
Show MACsec
```
MACsec port(Ethernet0)
--------------------- -----------
cipher_suite GCM-AES-256
enable true
enable_encrypt true
enable_protect true
enable_replay_protect false
replay_window 0
send_sci true
--------------------- -----------
MACsec Egress SC (5254008f4f1c0001)
----------- -
encoding_an 2
----------- -
MACsec Egress SA (1)
------------------------------------- ----------------------------------------------------------------
auth_key 849B69D363E2B0AA154BEBBD7C1D9487
next_pn 1
sak AE8C9BB36EA44B60375E84BC8E778596289E79240FDFA6D7BA33D3518E705A5E
salt 000000000000000000000000
ssci 0
SAI_MACSEC_SA_ATTR_CURRENT_XPN 179
SAI_MACSEC_SA_STAT_OCTETS_ENCRYPTED 0
SAI_MACSEC_SA_STAT_OCTETS_PROTECTED 0
SAI_MACSEC_SA_STAT_OUT_PKTS_ENCRYPTED 0
SAI_MACSEC_SA_STAT_OUT_PKTS_PROTECTED 0
------------------------------------- ----------------------------------------------------------------
MACsec Egress SA (2)
------------------------------------- ----------------------------------------------------------------
auth_key 5A8B8912139551D3678B43DD0F10FFA5
next_pn 1
sak 7F2651140F12C434F782EF9AD7791EE2CFE2BF315A568A48785E35FC803C9DB6
salt 000000000000000000000000
ssci 0
SAI_MACSEC_SA_ATTR_CURRENT_XPN 87185
SAI_MACSEC_SA_STAT_OCTETS_ENCRYPTED 0
SAI_MACSEC_SA_STAT_OCTETS_PROTECTED 0
SAI_MACSEC_SA_STAT_OUT_PKTS_ENCRYPTED 0
SAI_MACSEC_SA_STAT_OUT_PKTS_PROTECTED 0
------------------------------------- ----------------------------------------------------------------
MACsec Ingress SC (525400edac5b0001)
MACsec Ingress SA (1)
--------------------------------------- ----------------------------------------------------------------
active true
auth_key 849B69D363E2B0AA154BEBBD7C1D9487
lowest_acceptable_pn 1
sak AE8C9BB36EA44B60375E84BC8E778596289E79240FDFA6D7BA33D3518E705A5E
salt 000000000000000000000000
ssci 0
SAI_MACSEC_SA_ATTR_CURRENT_XPN 103
SAI_MACSEC_SA_STAT_IN_PKTS_DELAYED 0
SAI_MACSEC_SA_STAT_IN_PKTS_INVALID 0
SAI_MACSEC_SA_STAT_IN_PKTS_LATE 0
SAI_MACSEC_SA_STAT_IN_PKTS_NOT_USING_SA 0
SAI_MACSEC_SA_STAT_IN_PKTS_NOT_VALID 0
SAI_MACSEC_SA_STAT_IN_PKTS_OK 0
SAI_MACSEC_SA_STAT_IN_PKTS_UNCHECKED 0
SAI_MACSEC_SA_STAT_IN_PKTS_UNUSED_SA 0
SAI_MACSEC_SA_STAT_OCTETS_ENCRYPTED 0
SAI_MACSEC_SA_STAT_OCTETS_PROTECTED 0
--------------------------------------- ----------------------------------------------------------------
MACsec Ingress SA (2)
--------------------------------------- ----------------------------------------------------------------
active true
auth_key 5A8B8912139551D3678B43DD0F10FFA5
lowest_acceptable_pn 1
sak 7F2651140F12C434F782EF9AD7791EE2CFE2BF315A568A48785E35FC803C9DB6
salt 000000000000000000000000
ssci 0
SAI_MACSEC_SA_ATTR_CURRENT_XPN 91824
SAI_MACSEC_SA_STAT_IN_PKTS_DELAYED 0
SAI_MACSEC_SA_STAT_IN_PKTS_INVALID 0
SAI_MACSEC_SA_STAT_IN_PKTS_LATE 0
SAI_MACSEC_SA_STAT_IN_PKTS_NOT_USING_SA 0
SAI_MACSEC_SA_STAT_IN_PKTS_NOT_VALID 0
SAI_MACSEC_SA_STAT_IN_PKTS_OK 0
SAI_MACSEC_SA_STAT_IN_PKTS_UNCHECKED 0
SAI_MACSEC_SA_STAT_IN_PKTS_UNUSED_SA 0
SAI_MACSEC_SA_STAT_OCTETS_ENCRYPTED 0
SAI_MACSEC_SA_STAT_OCTETS_PROTECTED 0
--------------------------------------- ----------------------------------------------------------------
MACsec port(Ethernet1)
--------------------- -----------
cipher_suite GCM-AES-256
enable true
enable_encrypt true
enable_protect true
enable_replay_protect false
replay_window 0
send_sci true
--------------------- -----------
MACsec Egress SC (5254008f4f1c0001)
----------- -
encoding_an 1
----------- -
MACsec Egress SA (1)
------------------------------------- ----------------------------------------------------------------
auth_key 35FC8F2C81BCA28A95845A4D2A1EE6EF
next_pn 1
sak 1EC8572B75A840BA6B3833DC550C620D2C65BBDDAD372D27A1DFEB0CD786671B
salt 000000000000000000000000
ssci 0
SAI_MACSEC_SA_ATTR_CURRENT_XPN 4809
SAI_MACSEC_SA_STAT_OCTETS_ENCRYPTED 0
SAI_MACSEC_SA_STAT_OCTETS_PROTECTED 0
SAI_MACSEC_SA_STAT_OUT_PKTS_ENCRYPTED 0
SAI_MACSEC_SA_STAT_OUT_PKTS_PROTECTED 0
------------------------------------- ----------------------------------------------------------------
MACsec Ingress SC (525400edac5b0001)
MACsec Ingress SA (1)
--------------------------------------- ----------------------------------------------------------------
active true
auth_key 35FC8F2C81BCA28A95845A4D2A1EE6EF
lowest_acceptable_pn 1
sak 1EC8572B75A840BA6B3833DC550C620D2C65BBDDAD372D27A1DFEB0CD786671B
salt 000000000000000000000000
ssci 0
SAI_MACSEC_SA_ATTR_CURRENT_XPN 5033
SAI_MACSEC_SA_STAT_IN_PKTS_DELAYED 0
SAI_MACSEC_SA_STAT_IN_PKTS_INVALID 0
SAI_MACSEC_SA_STAT_IN_PKTS_LATE 0
SAI_MACSEC_SA_STAT_IN_PKTS_NOT_USING_SA 0
SAI_MACSEC_SA_STAT_IN_PKTS_NOT_VALID 0
SAI_MACSEC_SA_STAT_IN_PKTS_OK 0
SAI_MACSEC_SA_STAT_IN_PKTS_UNCHECKED 0
SAI_MACSEC_SA_STAT_IN_PKTS_UNUSED_SA 0
SAI_MACSEC_SA_STAT_OCTETS_ENCRYPTED 0
SAI_MACSEC_SA_STAT_OCTETS_PROTECTED 0
--------------------------------------- ----------------------------------------------------------------
```
This commit is contained in:
181
dockers/docker-macsec/cli/config/plugins/macsec.py
Normal file
181
dockers/docker-macsec/cli/config/plugins/macsec.py
Normal file
@@ -0,0 +1,181 @@
|
||||
import click
|
||||
import utilities_common.cli as clicommon
|
||||
|
||||
#
|
||||
# 'macsec' group ('config macsec ...')
|
||||
#
|
||||
@click.group(cls=clicommon.AbbreviationGroup, name='macsec')
|
||||
def macsec():
|
||||
"""MACsec-related configuration tasks"""
|
||||
pass
|
||||
|
||||
|
||||
#
|
||||
# 'port' group ('config macsec port ...')
|
||||
#
|
||||
@macsec.group(cls=clicommon.AbbreviationGroup, name='port')
|
||||
def macsec_port():
|
||||
"""Enable MACsec or disable MACsec on the specified port"""
|
||||
pass
|
||||
|
||||
#
|
||||
# 'add' command ('config macsec port add ...')
|
||||
#
|
||||
@macsec_port.command('add')
|
||||
@click.argument('port', metavar='<port_name>', required=True)
|
||||
@click.argument('profile', metavar='<profile_name>', required=True)
|
||||
@clicommon.pass_db
|
||||
def add_port(db, port, profile):
|
||||
"""
|
||||
Add MACsec port
|
||||
"""
|
||||
ctx = click.get_current_context()
|
||||
|
||||
if clicommon.get_interface_naming_mode() == "alias":
|
||||
alias = port
|
||||
iface_alias_converter = clicommon.InterfaceAliasConverter(db)
|
||||
port = iface_alias_converter.alias_to_name(alias)
|
||||
if port is None:
|
||||
ctx.fail("cannot find port name for alias {}".format(alias))
|
||||
|
||||
profile_entry = db.cfgdb.get_entry('MACSEC_PROFILE', profile)
|
||||
if len(profile_entry) == 0:
|
||||
ctx.fail("profile {} doesn't exist".format(profile))
|
||||
|
||||
db.cfgdb.set_entry("PORT", port, {'macsec': profile})
|
||||
|
||||
|
||||
#
|
||||
# 'del' command ('config macsec port del ...')
|
||||
#
|
||||
@macsec_port.command('del')
|
||||
@click.argument('port', metavar='<port_name>', required=True)
|
||||
@clicommon.pass_db
|
||||
def del_port(db, port):
|
||||
"""
|
||||
Delete MACsec port
|
||||
"""
|
||||
ctx = click.get_current_context()
|
||||
|
||||
if clicommon.get_interface_naming_mode() == "alias":
|
||||
alias = port
|
||||
iface_alias_converter = clicommon.InterfaceAliasConverter(db)
|
||||
port = iface_alias_converter.alias_to_name(alias)
|
||||
if port is None:
|
||||
ctx.fail("cannot find port name for alias {}".format(alias))
|
||||
|
||||
db.cfgdb.set_entry("PORT", port, {'macsec': ""})
|
||||
|
||||
|
||||
#
|
||||
# 'profile' group ('config macsec profile ...')
|
||||
#
|
||||
@macsec.group(cls=clicommon.AbbreviationGroup, name='profile')
|
||||
def macsec_profile():
|
||||
pass
|
||||
|
||||
|
||||
def is_hexstring(hexstring: str):
|
||||
try:
|
||||
int(hexstring, 16)
|
||||
return True
|
||||
except ValueError:
|
||||
return False
|
||||
|
||||
|
||||
#
|
||||
# 'add' command ('config macsec profile add ...')
|
||||
#
|
||||
@macsec_profile.command('add')
|
||||
@click.argument('profile', metavar='<profile_name>', required=True)
|
||||
@click.option('--priority', metavar='<priority>', required=False, default=255, show_default=True, type=click.IntRange(0, 255), help="For Key server election. In 0-255 range with 0 being the highest priority.")
|
||||
@click.option('--cipher_suite', metavar='<cipher_suite>', required=False, default="GCM-AES-128", show_default=True, type=click.Choice(["GCM-AES-128", "GCM-AES-256", "GCM-AES-XPN-128", "GCM-AES-XPN-256"]), help="The cipher suite for MACsec.")
|
||||
@click.option('--primary_cak', metavar='<primary_cak>', required=True, type=str, help="Primary Connectivity Association Key.")
|
||||
@click.option('--primary_ckn', metavar='<primary_cak>', required=True, type=str, help="Primary CAK Name.")
|
||||
@click.option('--policy', metavar='<policy>', required=False, default="security", show_default=True, type=click.Choice(["integrity_only", "security"]), help="MACsec policy. INTEGRITY_ONLY: All traffic, except EAPOL, will be converted to MACsec packets without encryption. SECURITY: All traffic, except EAPOL, will be encrypted by SecY.")
|
||||
@click.option('--enable_replay_protect/--disable_replay_protect', metavar='<replay_protect>', required=False, default=False, show_default=True, is_flag=True, help="Whether enable replay protect.")
|
||||
@click.option('--replay_window', metavar='<enable_replay_protect>', required=False, default=0, show_default=True, type=click.IntRange(0, 2**32), help="Replay window size that is the number of packets that could be out of order. This field works only if ENABLE_REPLAY_PROTECT is true.")
|
||||
@click.option('--send_sci/--no_send_sci', metavar='<send_sci>', required=False, default=True, show_default=True, is_flag=True, help="Send SCI in SecTAG field of MACsec header.")
|
||||
@click.option('--rekey_period', metavar='<rekey_period>', required=False, default=0, show_default=True, type=click.IntRange(min=0), help="The period of proactively refresh (Unit second).")
|
||||
@clicommon.pass_db
|
||||
def add_profile(db, profile, priority, cipher_suite, primary_cak, primary_ckn, policy, enable_replay_protect, replay_window, send_sci, rekey_period):
|
||||
"""
|
||||
Add MACsec profile
|
||||
"""
|
||||
ctx = click.get_current_context()
|
||||
profile_entry = db.cfgdb.get_entry('MACSEC_PROFILE', profile)
|
||||
if not len(profile_entry) == 0:
|
||||
ctx.fail("{} already exists".format(profile))
|
||||
|
||||
profile_table = {}
|
||||
|
||||
profile_table["priority"] = priority
|
||||
|
||||
profile_table["cipher_suite"] = cipher_suite
|
||||
|
||||
if "128" in cipher_suite:
|
||||
if len(primary_cak) != 32:
|
||||
ctx.fail("Expect the length of CAK is 32, but got {}".format(len(primary_cak)))
|
||||
elif "256" in cipher_suite:
|
||||
if len(primary_cak) != 64:
|
||||
ctx.fail("Expect the length of CAK is 64, but got {}".format(len(primary_cak)))
|
||||
if not is_hexstring(primary_cak):
|
||||
ctx.fail("Expect the primary_cak is valid hex string")
|
||||
if not is_hexstring(primary_ckn):
|
||||
ctx.fail("Expect the primary_ckn is valid hex string")
|
||||
profile_table["primary_cak"] = primary_cak
|
||||
profile_table["primary_ckn"] = primary_ckn
|
||||
|
||||
profile_table["policy"] = policy
|
||||
|
||||
if enable_replay_protect and replay_window > 0:
|
||||
profile_table["enable_replay_protect"] = enable_replay_protect
|
||||
profile_table["replay_window"] = replay_window
|
||||
|
||||
profile_table["send_sci"] = send_sci
|
||||
|
||||
if rekey_period > 0:
|
||||
profile_table["rekey_period"] = rekey_period
|
||||
|
||||
for k, v in profile_table.items():
|
||||
if isinstance(v, bool):
|
||||
if v:
|
||||
profile_table[k] = "true"
|
||||
else:
|
||||
profile_table[k] = "false"
|
||||
else:
|
||||
profile_table[k] = str(v)
|
||||
db.cfgdb.set_entry("MACSEC_PROFILE", profile, profile_table)
|
||||
|
||||
|
||||
#
|
||||
# 'del' command ('config macsec profile del ...')
|
||||
#
|
||||
@macsec_profile.command('del')
|
||||
@click.argument('profile', metavar='<profile_name>', required=True)
|
||||
@clicommon.pass_db
|
||||
def del_profile(db, profile):
|
||||
"""
|
||||
Delete MACsec profile
|
||||
"""
|
||||
ctx = click.get_current_context()
|
||||
|
||||
profile_entry = db.cfgdb.get_entry('MACSEC_PROFILE', profile)
|
||||
if len(profile_entry) == 0:
|
||||
ctx.fail("{} doesn't exist".format(profile))
|
||||
|
||||
# Check if the profile is being used by any port
|
||||
for port in db.cfgdb.get_keys('PORT'):
|
||||
attr = db.cfgdb.get_entry('PORT', port)
|
||||
if 'macsec' in attr and attr['macsec'] == profile:
|
||||
ctx.fail("{} is being used by port {}, Please remove the MACsec from the port firstly".format(profile, port))
|
||||
|
||||
db.cfgdb.set_entry("MACSEC_PROFILE", profile, None)
|
||||
|
||||
|
||||
def register(cli):
|
||||
cli.add_command(macsec)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
macsec()
|
||||
217
dockers/docker-macsec/cli/show/plugins/show_macsec.py
Normal file
217
dockers/docker-macsec/cli/show/plugins/show_macsec.py
Normal file
@@ -0,0 +1,217 @@
|
||||
import typing
|
||||
from natsort import natsorted
|
||||
|
||||
import click
|
||||
from tabulate import tabulate
|
||||
|
||||
from swsscommon.swsscommon import SonicV2Connector
|
||||
|
||||
|
||||
DB_CONNECTOR = SonicV2Connector(use_unix_socket_path=False)
|
||||
DB_CONNECTOR.connect(DB_CONNECTOR.APPL_DB)
|
||||
DB_CONNECTOR.connect(DB_CONNECTOR.COUNTERS_DB)
|
||||
|
||||
|
||||
class MACsecAppMeta(object):
|
||||
SEPARATOR = DB_CONNECTOR.get_db_separator(DB_CONNECTOR.APPL_DB)
|
||||
|
||||
def __init__(self, *args) -> None:
|
||||
key = self.__class__.get_appl_table_name() + MACsecAppMeta.SEPARATOR + \
|
||||
MACsecAppMeta.SEPARATOR.join(args)
|
||||
self.meta = DB_CONNECTOR.get_all(
|
||||
DB_CONNECTOR.APPL_DB, key)
|
||||
if len(self.meta) == 0:
|
||||
raise ValueError("No such MACsecAppMeta: {}".format(key))
|
||||
for k, v in self.meta.items():
|
||||
setattr(self, k, v)
|
||||
|
||||
|
||||
class MACsecCounters(object):
|
||||
def __init__(self, *args) -> None:
|
||||
key = ":".join(args)
|
||||
counters_id = DB_CONNECTOR.get(
|
||||
DB_CONNECTOR.COUNTERS_DB, self.__class__.get_counter_table_name(), key)
|
||||
counter_key = "COUNTERS:" + counters_id
|
||||
self.counters = DB_CONNECTOR.get_all(
|
||||
DB_CONNECTOR.COUNTERS_DB, counter_key)
|
||||
|
||||
|
||||
class MACsecSA(MACsecAppMeta, MACsecCounters):
|
||||
def __init__(self, port_name: str, sci: str, an: str) -> None:
|
||||
self.port_name = port_name
|
||||
self.sci = sci
|
||||
self.an = an
|
||||
MACsecAppMeta.__init__(self, port_name, sci, an)
|
||||
MACsecCounters.__init__(self, port_name, sci, an)
|
||||
|
||||
def dump_str(self) -> str:
|
||||
buffer = self.get_header()
|
||||
meta = sorted(self.meta.items(), key=lambda x: x[0])
|
||||
counters = sorted(self.counters.items(), key=lambda x: x[0])
|
||||
buffer += tabulate(meta + counters)
|
||||
buffer = "\n".join(["\t\t" + line for line in buffer.splitlines()])
|
||||
return buffer
|
||||
|
||||
|
||||
class MACsecIngressSA(MACsecSA):
|
||||
def __init__(self, port_name: str, sci: str, an: str) -> None:
|
||||
super(MACsecIngressSA, self).__init__(port_name, sci, an)
|
||||
|
||||
@classmethod
|
||||
def get_appl_table_name(cls) -> str:
|
||||
return "MACSEC_INGRESS_SA_TABLE"
|
||||
|
||||
@classmethod
|
||||
def get_counter_table_name(cls) -> str:
|
||||
return "COUNTERS_MACSEC_SA_RX_NAME_MAP"
|
||||
|
||||
def get_header(self):
|
||||
return "MACsec Ingress SA ({})\n".format(self.an)
|
||||
|
||||
|
||||
class MACsecEgressSA(MACsecSA):
|
||||
def __init__(self, port_name: str, sci: str, an: str) -> None:
|
||||
super(MACsecEgressSA, self).__init__(port_name, sci, an)
|
||||
|
||||
@classmethod
|
||||
def get_appl_table_name(cls) -> str:
|
||||
return "MACSEC_EGRESS_SA_TABLE"
|
||||
|
||||
@classmethod
|
||||
def get_counter_table_name(cls) -> str:
|
||||
return "COUNTERS_MACSEC_SA_TX_NAME_MAP"
|
||||
|
||||
def get_header(self):
|
||||
return "MACsec Egress SA ({})\n".format(self.an)
|
||||
|
||||
|
||||
class MACsecSC(MACsecAppMeta):
|
||||
def __init__(self, port_name: str, sci: str) -> None:
|
||||
self.port_name = port_name
|
||||
self.sci = sci
|
||||
super(MACsecSC, self).__init__(port_name, sci)
|
||||
|
||||
|
||||
class MACsecIngressSC(MACsecSC):
|
||||
def __init__(self, port_name: str, sci: str) -> None:
|
||||
super(MACsecIngressSC, self).__init__(port_name, sci)
|
||||
|
||||
@classmethod
|
||||
def get_appl_table_name(cls) -> str:
|
||||
return "MACSEC_INGRESS_SC_TABLE"
|
||||
|
||||
def dump_str(self) -> str:
|
||||
buffer = self.get_header()
|
||||
buffer = "\n".join(["\t" + line for line in buffer.splitlines()])
|
||||
return buffer
|
||||
|
||||
def get_header(self):
|
||||
return "MACsec Ingress SC ({})\n".format(self.sci)
|
||||
|
||||
|
||||
class MACsecEgressSC(MACsecSC):
|
||||
def __init__(self, port_name: str, sci: str) -> None:
|
||||
super(MACsecEgressSC, self).__init__(port_name, sci)
|
||||
|
||||
@classmethod
|
||||
def get_appl_table_name(cls) -> str:
|
||||
return "MACSEC_EGRESS_SC_TABLE"
|
||||
|
||||
def dump_str(self) -> str:
|
||||
buffer = self.get_header()
|
||||
buffer += tabulate(sorted(self.meta.items(), key=lambda x: x[0]))
|
||||
buffer = "\n".join(["\t" + line for line in buffer.splitlines()])
|
||||
return buffer
|
||||
|
||||
def get_header(self):
|
||||
return "MACsec Egress SC ({})\n".format(self.sci)
|
||||
|
||||
|
||||
class MACsecPort(MACsecAppMeta):
|
||||
def __init__(self, port_name: str) -> None:
|
||||
self.port_name = port_name
|
||||
super(MACsecPort, self).__init__(port_name)
|
||||
|
||||
@classmethod
|
||||
def get_appl_table_name(cls) -> str:
|
||||
return "MACSEC_PORT_TABLE"
|
||||
|
||||
def dump_str(self) -> str:
|
||||
buffer = self.get_header()
|
||||
buffer += tabulate(sorted(self.meta.items(), key=lambda x: x[0]))
|
||||
return buffer
|
||||
|
||||
def get_header(self) -> str:
|
||||
return "MACsec port({})\n".format(self.port_name)
|
||||
|
||||
|
||||
def create_macsec_obj(key: str) -> MACsecAppMeta:
|
||||
attr = key.split(":")
|
||||
try:
|
||||
if attr[0] == MACsecPort.get_appl_table_name():
|
||||
return MACsecPort(attr[1])
|
||||
elif attr[0] == MACsecIngressSC.get_appl_table_name():
|
||||
return MACsecIngressSC(attr[1], attr[2])
|
||||
elif attr[0] == MACsecEgressSC.get_appl_table_name():
|
||||
return MACsecEgressSC(attr[1], attr[2])
|
||||
elif attr[0] == MACsecIngressSA.get_appl_table_name():
|
||||
return MACsecIngressSA(attr[1], attr[2], attr[3])
|
||||
elif attr[0] == MACsecEgressSA.get_appl_table_name():
|
||||
return MACsecEgressSA(attr[1], attr[2], attr[3])
|
||||
raise TypeError("Unknown MACsec object type")
|
||||
except ValueError as e:
|
||||
return None
|
||||
|
||||
def create_macsec_objs(interface_name: str) -> typing.List[MACsecAppMeta]:
|
||||
objs = []
|
||||
objs.append(create_macsec_obj(MACsecPort.get_appl_table_name() + ":" + interface_name))
|
||||
egress_scs = DB_CONNECTOR.keys(DB_CONNECTOR.APPL_DB, MACsecEgressSC.get_appl_table_name() + ":" + interface_name + ":*")
|
||||
for sc_name in natsorted(egress_scs):
|
||||
sc = create_macsec_obj(sc_name)
|
||||
if sc is None:
|
||||
continue
|
||||
objs.append(sc)
|
||||
egress_sas = DB_CONNECTOR.keys(DB_CONNECTOR.APPL_DB, MACsecEgressSA.get_appl_table_name() + ":" + ":".join(sc_name.split(":")[1:]) + ":*")
|
||||
for sa_name in natsorted(egress_sas):
|
||||
sa = create_macsec_obj(sa_name)
|
||||
if sa is None:
|
||||
continue
|
||||
objs.append(sa)
|
||||
ingress_scs = DB_CONNECTOR.keys(DB_CONNECTOR.APPL_DB, MACsecIngressSC.get_appl_table_name() + ":" + interface_name + ":*")
|
||||
for sc_name in natsorted(ingress_scs):
|
||||
sc = create_macsec_obj(sc_name)
|
||||
if sc is None:
|
||||
continue
|
||||
objs.append(sc)
|
||||
ingress_sas = DB_CONNECTOR.keys(DB_CONNECTOR.APPL_DB, MACsecIngressSA.get_appl_table_name() + ":" + ":".join(sc_name.split(":")[1:]) + ":*")
|
||||
for sa_name in natsorted(ingress_sas):
|
||||
sa = create_macsec_obj(sa_name)
|
||||
if sa is None:
|
||||
continue
|
||||
objs.append(sa)
|
||||
return objs
|
||||
|
||||
|
||||
@click.command()
|
||||
@click.argument('interface_name', required=False)
|
||||
def macsec(interface_name):
|
||||
ctx = click.get_current_context()
|
||||
objs = []
|
||||
interface_names = [name.split(":")[1] for name in DB_CONNECTOR.keys(DB_CONNECTOR.APPL_DB, "MACSEC_PORT*")]
|
||||
if interface_name is not None:
|
||||
if interface_name not in interface_names:
|
||||
ctx.fail("Cannot find the port {} in MACsec port lists {}".format(interface_name, interface_names))
|
||||
else:
|
||||
interface_names = [interface_name]
|
||||
for interface_name in natsorted(interface_names):
|
||||
objs += create_macsec_objs(interface_name)
|
||||
for obj in objs:
|
||||
print(obj.dump_str())
|
||||
|
||||
|
||||
def register(cli):
|
||||
cli.add_command(macsec)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
macsec(None)
|
||||
Reference in New Issue
Block a user