mirror of
https://github.com/outbackdingo/step-ca-webui.git
synced 2026-01-27 18:20:22 +00:00
124 lines
4.8 KiB
Python
124 lines
4.8 KiB
Python
from datetime import datetime, timedelta
|
|
from typing import List
|
|
|
|
from core.certificate_manager_interface import ICertificateManager, CertificateResult, Certificate
|
|
from shared.cli_wrapper import CLIWrapper
|
|
from shared.logger import Logger, LogSeverity
|
|
from shared.models import CommandInfo, KeyType
|
|
|
|
|
|
class CertificateManager(ICertificateManager):
|
|
def __init__(self, logger: Logger):
|
|
self._logger = logger
|
|
self._cli_wrapper = CLIWrapper()
|
|
|
|
def preview_list_certificates(self) -> str:
|
|
return _Commands.list_certificates()
|
|
|
|
def list_certificates(self) -> List[Certificate]:
|
|
command = self.preview_list_certificates()
|
|
output, exit_code = self._cli_wrapper.execute_command(command)
|
|
|
|
# Parse the output and create a list of Certificate objects
|
|
certificates = []
|
|
# ... (parsing logic here)
|
|
return certificates
|
|
|
|
def preview_generate_certificate(self, key_name: str, key_type: KeyType, duration: int) -> str:
|
|
command = _Commands.generate_certificate(key_name, key_type, duration)
|
|
return command
|
|
|
|
def generate_certificate(self, key_name: str, key_type: KeyType, duration_in_seconds: int) -> CertificateResult:
|
|
command = self.preview_generate_certificate(key_name, key_type, duration_in_seconds)
|
|
output, exit_code = self._cli_wrapper.execute_command(command)
|
|
|
|
success = exit_code == 0
|
|
message = "Certificate generated successfully" if success else "Failed to generate certificate"
|
|
|
|
entry_id = self._logger.log(
|
|
LogSeverity.INFO if success else LogSeverity.ERROR,
|
|
message,
|
|
CommandInfo(coommand=command, output=output, exit_code=exit_code, action="GENERATE_CERT")
|
|
)
|
|
|
|
return CertificateResult(
|
|
success=success,
|
|
message=message,
|
|
log_entry_id=entry_id,
|
|
certificate_id=key_name,
|
|
certificate_name=key_name,
|
|
expiration_date=(datetime.now() + timedelta(seconds=duration_in_seconds))
|
|
# TODO: parse expiration date from output
|
|
)
|
|
|
|
def preview_renew_certificate(self, cert_id: str, duration: int) -> str:
|
|
cert_id = self._cli_wrapper.sanitize_input(cert_id) # TODO: validate cert_id (what format is it?)
|
|
command = _Commands.renew_certificate(cert_id, duration)
|
|
return command
|
|
|
|
def renew_certificate(self, cert_id: str, duration: int) -> CertificateResult:
|
|
command = self.preview_renew_certificate(cert_id, duration)
|
|
output, exit_code = self._cli_wrapper.execute_command(command)
|
|
|
|
success = exit_code == 0
|
|
message = "Certificate renewed successfully" if success else "Failed to renew certificate"
|
|
|
|
entry_id = self._logger.log(
|
|
LogSeverity.INFO if success else LogSeverity.ERROR,
|
|
message,
|
|
CommandInfo(comand=command, output=output, exit_code=exit_code, action="RENEW_CERT")
|
|
)
|
|
|
|
return CertificateResult(
|
|
success=success,
|
|
message=message,
|
|
log_entry_id=entry_id,
|
|
certificate_id=cert_id,
|
|
new_expiration_date=(datetime.now() + timedelta(seconds=duration))
|
|
)
|
|
|
|
def preview_revoke_certificate(self, cert_id: str) -> str:
|
|
cert_id = self._cli_wrapper.sanitize_input(cert_id) # TODO: validate cert_id (what format is it?)
|
|
command = _Commands.revoke_certificate(cert_id)
|
|
return command
|
|
|
|
def revoke_certificate(self, cert_id: str) -> CertificateResult:
|
|
command = self.preview_revoke_certificate(cert_id)
|
|
output, exit_code = self._cli_wrapper.execute_command(command)
|
|
|
|
success = exit_code == 0
|
|
message = "Certificate revoked successfully" if success else "Failed to revoke certificate"
|
|
|
|
entry_id = self._logger.log(
|
|
LogSeverity.INFO if success else LogSeverity.ERROR,
|
|
message,
|
|
CommandInfo(command=command, output=output, exit_code=exit_code, action="REVOKE_CERT")
|
|
)
|
|
|
|
return CertificateResult(
|
|
success=success,
|
|
message=message,
|
|
log_entry_id=entry_id,
|
|
certificate_id=cert_id,
|
|
revocation_date=datetime.now()
|
|
)
|
|
|
|
|
|
class _Commands:
|
|
@staticmethod
|
|
def list_certificates():
|
|
return "step-ca list certificates"
|
|
|
|
@staticmethod
|
|
def generate_certificate(key_name: str, key_type: KeyType, duration: int):
|
|
key_type = key_type.value
|
|
return f"step-ca certificate {key_name} {key_name}.crt {key_name}.key --key-type {key_type} --not-after {duration}"
|
|
|
|
@staticmethod
|
|
def renew_certificate(cert_id: str, duration: int):
|
|
return f"step-ca renew {cert_id}.crt {cert_id}.key --force --expires-in {duration}s"
|
|
|
|
@staticmethod
|
|
def revoke_certificate(cert_id: str):
|
|
return f"step-ca revoke {cert_id}.crt"
|