extracted CLI commands, improved validation

This commit is contained in:
stone-w4tch3r
2024-08-21 16:58:32 +05:00
parent 11685fef30
commit 7d5c8a6aa2
2 changed files with 37 additions and 52 deletions

View File

@@ -1,7 +1,6 @@
import re
from dataclasses import dataclass, field
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import List, Optional, Union
from typing import List
from shared.cli_wrapper import CLIWrapper
from shared.logger import Logger, LogSeverity
@@ -30,47 +29,35 @@ class Certificate:
class CertificateManager:
def __init__(self, logger: Logger):
self.logger = logger
self.cli_wrapper = CLIWrapper()
self._logger = logger
self._cli_wrapper = CLIWrapper()
# noinspection PyMethodMayBeStatic
def preview_list_certificates(self) -> str:
return "step-ca list certificates"
return _Commands.list_certificates()
def list_certificates(self) -> List[Certificate]:
command = self.preview_list_certificates()
output, exit_code = self.cli_wrapper.execute_command(command)
output, exit_code = self._cli_wrapper.execute_command(command)
# Parse the output and create a list of Certificate objects
certificates = []
# ... (parsing logic here)
return certificates
# noinspection PyMethodMayBeStatic
def preview_generate_certificate(self, key_name: str, key_type: KeyType, duration: int) -> str:
key_name = self.cli_wrapper.sanitize_input(key_name)
key_type = self.cli_wrapper.sanitize_input(key_type.upper())
duration = self.cli_wrapper.sanitize_input(str(duration))
allowed_key_types = [kt.upper for kt in KeyType]
if key_type not in allowed_key_types:
raise ValueError(f"Invalid key type: {key_type}, must be one of {allowed_key_types}")
if not self._is_valid_keyname(key_name):
raise ValueError(f"Invalid key name: {key_name}")
if not self._is_valid_duration(duration):
raise ValueError(f"Invalid duration: {duration}")
# TODO: extract command template into separate entity
command = f"step-ca certificate {key_name} {key_name}.crt {key_name}.key --key-type {key_type} --not-after {duration}"
command = _Commands.generate_certificate(key_name, key_type, duration)
return command
def generate_certificate(self, key_name: str, key_type: KeyType, duration: int) -> CertificateResult:
command = self.preview_generate_certificate(key_name, key_type, duration)
output, exit_code = self.cli_wrapper.execute_command(command)
def generate_certificate(self, key_name: str, key_type: KeyType, duration_in_seconds: int) -> CertificateResult:
command = self.preview_generate_certificate(key_name, key_type, duration_in_seconds)
output, exit_code = self._cli_wrapper.execute_command(command)
success = exit_code == 0
message = "Certificate generated successfully" if success else "Failed to generate certificate"
entry_id = self.logger.log_scoped(
entry_id = self._logger.log_scoped(
LogSeverity.INFO if success else LogSeverity.ERROR,
message,
CommandInfo(command, output, exit_code, "GENERATE_CERT")
@@ -82,22 +69,22 @@ class CertificateManager:
log_entry_id=entry_id,
certificate_id=key_name,
certificate_name=key_name,
expiration_date=(datetime.now() + self._parse_duration(str(duration)))
expiration_date=(datetime.now() + timedelta(seconds=duration_in_seconds)) # TODO: parse expiration date from output
)
def preview_renew_certificate(self, cert_id: str, duration: int) -> str:
cert_id = self.cli_wrapper.sanitize_input(cert_id) # TODO: validate cert_id (what format is it?)
command = f"step-ca renew {cert_id}.crt {cert_id}.key --force --expires-in {duration}s"
cert_id = self._cli_wrapper.sanitize_input(cert_id) # TODO: validate cert_id (what format is it?)
command = _Commands.renew_certificate(cert_id, duration)
return command
def renew_certificate(self, cert_id: str, duration: int) -> CertificateResult:
command = self.preview_renew_certificate(cert_id, duration)
output, exit_code = self.cli_wrapper.execute_command(command)
output, exit_code = self._cli_wrapper.execute_command(command)
success = exit_code == 0
message = "Certificate renewed successfully" if success else "Failed to renew certificate"
entry_id = self.logger.log_scoped(
entry_id = self._logger.log_scoped(
LogSeverity.INFO if success else LogSeverity.ERROR,
message,
CommandInfo(command, output, exit_code, "RENEW_CERT")
@@ -112,18 +99,18 @@ class CertificateManager:
)
def preview_revoke_certificate(self, cert_id: str) -> str:
cert_id = self.cli_wrapper.sanitize_input(cert_id) # TODO: validate cert_id (what format is it?)
command = f"step-ca revoke {cert_id}.crt"
cert_id = self._cli_wrapper.sanitize_input(cert_id) # TODO: validate cert_id (what format is it?)
command = _Commands.revoke_certificate(cert_id)
return command
def revoke_certificate(self, cert_id: str) -> CertificateResult:
command = self.preview_revoke_certificate(cert_id)
output, exit_code = self.cli_wrapper.execute_command(command)
output, exit_code = self._cli_wrapper.execute_command(command)
success = exit_code == 0
message = "Certificate revoked successfully" if success else "Failed to revoke certificate"
entry_id = self.logger.log_scoped(
entry_id = self._logger.log_scoped(
LogSeverity.INFO if success else LogSeverity.ERROR,
message,
CommandInfo(command, output, exit_code, "REVOKE_CERT")
@@ -137,23 +124,21 @@ class CertificateManager:
revocation_date=datetime.now()
)
# TODO: remove
@staticmethod
def _parse_duration(duration: str) -> timedelta:
# Parse the duration string and return a timedelta object
# This is a placeholder and would need to be implemented based on your duration format
raise NotImplementedError
@staticmethod # TODO move to api validation
def _is_valid_keyname(key_name: str) -> bool:
"""
:param key_name: Name of the key, must be alphanumeric with dashes and underscores
"""
return re.match(r"^[a-zA-Z0-9_-]+$", key_name) is not None
class _Commands:
@staticmethod
def list_certificates():
return "step-ca list certificates"
@staticmethod
def _is_valid_duration(duration_str: str) -> bool:
"""
:param duration_str: Duration in seconds, must be a positive integer
"""
return isinstance(duration_str, int) and duration_str > 0
def generate_certificate(key_name: str, key_type: KeyType, duration: int):
key_type = key_type.value
return f"step-ca certificate {key_name} {key_name}.crt {key_name}.key --key-type {key_type} --not-after {duration}"
@staticmethod
def renew_certificate(cert_id: str, duration: int):
return f"step-ca renew {cert_id}.crt {cert_id}.key --force --expires-in {duration}s"
@staticmethod
def revoke_certificate(cert_id: str):
return f"step-ca revoke {cert_id}.crt"

View File

@@ -15,7 +15,7 @@ class Certificate(BaseModel):
class CertificateGenerateRequest(BaseModel):
keyName: str
keyName: str = Field(..., pattern=r"^[a-zA-Z0-9_-]+$", description="Alphanumeric characters, dashes, and underscores only")
keyType: KeyType
duration: int = Field(..., gt=0, description="Duration in seconds")