From cf3ec228e24bb4b651446f177b761ce81f008e64 Mon Sep 17 00:00:00 2001 From: stone-w4tch3r <100294019+stone-w4tch3r@users.noreply.github.com> Date: Sun, 18 Aug 2024 14:07:20 +0500 Subject: [PATCH] Core & Shared python code added --- README.md | 30 +++++--- app/log_handler.py | 38 ---------- core/api_server.py | 87 +++++++++++++++++++++ core/certificate_manager.py | 147 ++++++++++++++++++++++++++++++++++++ core/main.py | 23 ++++++ requirements.txt | 1 + shared/cli_wrapper.py | 17 +++++ shared/logger.py | 55 ++++++++++++++ shared/models.py | 30 ++++++++ 9 files changed, 379 insertions(+), 49 deletions(-) delete mode 100644 app/log_handler.py create mode 100644 core/api_server.py create mode 100644 core/certificate_manager.py create mode 100644 core/main.py create mode 100644 shared/cli_wrapper.py create mode 100644 shared/logger.py create mode 100644 shared/models.py diff --git a/README.md b/README.md index 570330f..6f85fdc 100644 --- a/README.md +++ b/README.md @@ -248,15 +248,22 @@ paths: /certificates: get: summary: List all certificates + parameters: + - name: preview + in: query + schema: + type: boolean responses: '200': description: Successful response content: - application/json: + application/json: schema: - type: array - items: - $ref: '#/components/schemas/Certificate' + oneOf: + - type: array + items: + $ref: '#/components/schemas/Certificate' + - $ref: '#/components/schemas/CommandPreview' /certificates/generate: post: @@ -362,13 +369,13 @@ paths: type: array items: type: string - enum: [DEBUG, INFO, WARN, ERROR] + enum: [ DEBUG, INFO, WARN, ERROR ] - name: traceId in: query schema: type: string description: "UUID format" - - name: commands_only + - name: commandsOnly in: query schema: type: boolean @@ -401,7 +408,7 @@ components: type: string status: type: string - enum: [Active, Expired, Revoked] + enum: [ Active, Expired, Revoked ] expirationDate: type: string format: date-time @@ -413,7 +420,7 @@ components: type: string keyType: type: string - enum: [RSA, ECDSA] + enum: [ RSA, ECDSA ] duration: type: string @@ -480,7 +487,7 @@ components: format: date-time severity: type: string - enum: [DEBUG, INFO, WARN, ERROR] + enum: [ DEBUG, INFO, WARN, ERROR ] message: type: string traceId: @@ -502,7 +509,7 @@ components: type: integer ``` -### Class Diagram +### Class Diagram ```puml @startuml @@ -683,4 +690,5 @@ project_root/ --- -# TODO \ No newline at end of file +# TODO +- [ ] Adjust class diagram after finalizing the project \ No newline at end of file diff --git a/app/log_handler.py b/app/log_handler.py deleted file mode 100644 index 7fa0394..0000000 --- a/app/log_handler.py +++ /dev/null @@ -1,38 +0,0 @@ -import json -from datetime import datetime - - -class LogHandler: - def __init__(self, log_file="logs/command_history.json"): - self.log_file = log_file - - def log_command(self, command, output, status): - log_entry = { - "timestamp": datetime.now().isoformat(), - "command": command, - "output": output, - "status": status - } - with open(self.log_file, 'a') as file: - json.dump(log_entry, file) - file.write('\n') - - def get_logs(self, filter_params=None): - logs = [] - with open(self.log_file, 'r') as file: - for line in file: - log = json.loads(line) - if self._matches_filter(log, filter_params): - logs.append(log) - return logs - - def get_command_history(self): - return self.get_logs() - - def _matches_filter(self, log, filter_params): - if not filter_params: - return True - for key, value in filter_params.items(): - if key in log and log[key] != value: - return False - return True diff --git a/core/api_server.py b/core/api_server.py new file mode 100644 index 0000000..32a5d1a --- /dev/null +++ b/core/api_server.py @@ -0,0 +1,87 @@ +from flask import Flask, request, jsonify, Response +from typing import Dict, List, Union, Tuple +from uuid import uuid4 +from certificate_manager import CertificateManager +from shared.logger import Logger, LogSeverity +from shared.models import CommandInfo + + +class APIServer: + _app = Flask(__name__) + + def __init__(self, cert_manager: CertificateManager, logger: Logger): + self.cert_manager = cert_manager + self.logger = logger + + self._app.add_url_rule('/certificates', 'list_certificates', self.list_certificates, methods=['GET']) + self._app.add_url_rule('/certificates/generate', 'generate_certificate', self.generate_certificate, methods=['POST']) + self._app.add_url_rule('/certificates/renew', 'renew_certificate', self.renew_certificate, methods=['POST']) + self._app.add_url_rule('/certificates/revoke', 'revoke_certificate', self.revoke_certificate, methods=['POST']) + self._app.add_url_rule('/logs', 'get_logs', self.get_logs, methods=['GET']) + self._app.add_url_rule('/logs/single', 'get_log_entry', self.get_log_entry, methods=['GET']) + + def run(self): + self._app.run(host='0.0.0.0', port=5000) + + def list_certificates(self) -> Response: + preview = request.args.get('preview', 'false').lower() == 'true' + + if preview: + command = self.cert_manager.preview_list_certificates() + return jsonify({"command": command}) + else: + certificates = self.cert_manager.list_certificates() + return jsonify(certificates) + + def generate_certificate(self) -> Response: + params = request.json # TODO explicit flask params + preview = request.args.get('preview', 'false').lower() == 'true' + + if preview: + command = self.cert_manager.preview_generate_certificate(params) + return jsonify({"command": command}) + else: + result = self.cert_manager.generate_certificate(params) + return jsonify(result) + + def renew_certificate(self) -> Response: + cert_id = request.args.get('certId') + duration = int(request.args.get('duration')) + preview = request.args.get('preview', 'false').lower() == 'true' + + if preview: + command = self.cert_manager.preview_renew_certificate(cert_id, duration) + return jsonify({"command": command}) + else: + result = self.cert_manager.renew_certificate(cert_id, duration) + return jsonify(result) + + def revoke_certificate(self) -> Response: + cert_id = request.args.get('certId') + preview = request.args.get('preview', 'false').lower() == 'true' + + if preview: + command = self.cert_manager.preview_revoke_certificate(cert_id) + return jsonify({"command": command}) + else: + result = self.cert_manager.revoke_certificate(cert_id) + return jsonify(result) + + def get_logs(self) -> Response: + filters = { + 'severity': request.args.getlist('severity'), + 'traceId': request.args.get('traceId'), + 'commands_only': request.args.get('commandsOnly', 'false').lower() == 'true', + 'page': int(request.args.get('page', 1)), + 'pageSize': int(request.args.get('pageSize', 20)) + } + logs = self.logger.get_logs(filters) + return jsonify(logs) + + def get_log_entry(self) -> Response | tuple[Response, int]: + log_id = int(request.args.get('logId')) + log_entry = self.logger.get_log_entry(log_id) + if log_entry: + return jsonify(log_entry) + else: + return jsonify({"error": "Log entry not found"}), 404 diff --git a/core/certificate_manager.py b/core/certificate_manager.py new file mode 100644 index 0000000..95ba38f --- /dev/null +++ b/core/certificate_manager.py @@ -0,0 +1,147 @@ +import re +from typing import List, Dict, Union +from datetime import datetime, timedelta +from uuid import uuid4 +from shared.cli_wrapper import CLIWrapper +from shared.logger import Logger, LogSeverity +from shared.models import CommandInfo + + +class CertificateManager: + def __init__(self, logger: Logger): + self.logger = logger + self.cli_wrapper = CLIWrapper() + + # noinspection PyMethodMayBeStatic + def preview_list_certificates(self) -> str: + return "step-ca list certificates" + + def list_certificates(self) -> List[Dict[str, Union[str, datetime]]]: + command = self.preview_list_certificates() + output, exit_code = self.cli_wrapper.execute_command(command) + + # Parse the output and create a list of certificate dictionaries + certificates = [] + # ... (parsing logic here) + + return certificates + + # TODO: Add type hints for params + def preview_generate_certificate(self, params: Dict[str, str]) -> str: + key_name = self.cli_wrapper.sanitize_input(params['keyName']) + key_type = self.cli_wrapper.sanitize_input(params['keyType']) + duration = self.cli_wrapper.sanitize_input(params['duration']) + + allowed_key_types = ["RSA", "ECDSA", "Ed25519"] + if key_type not in allowed_key_types: + raise ValueError(f"Invalid key type: {key_type}, must be one of {allowed_key_types}") + if not self._is_valid_keyname(key_name): + raise ValueError(f"Invalid key name: {key_name}") + if not self._is_valid_duration(duration): + raise ValueError(f"Invalid duration: {duration}") + + # TODO: extract command template into separate entity + command = f"step-ca certificate {key_name} {key_name}.crt {key_name}.key --key-type {key_type} --not-after {duration}" + return command + + # TODO: Add type hints for params + def generate_certificate(self, params: Dict[str, str]) -> Dict[str, Union[bool, str, datetime]]: + command = self.preview_generate_certificate(params) + output, exit_code = self.cli_wrapper.execute_command(command) + + success = exit_code == 0 + message = "Certificate generated successfully" if success else "Failed to generate certificate" + + trace_id = uuid4() # TODO: use scoped logging + self.logger.log( + LogSeverity.INFO if success else LogSeverity.ERROR, + message, + trace_id, + CommandInfo(command, output, exit_code, "GENERATE_CERT") + ) + + return { # TODO: extract to dataclass or namedtuple or typed dict + "success": success, + "message": message, + "logEntryId": str(trace_id), + "certificateId": params['keyName'], + "certificateName": params['keyName'], + "expirationDate": (datetime.now() + self._parse_duration(params['duration'])).isoformat() # TODO: remove _parse_duration + } + + def preview_renew_certificate(self, cert_id: str, duration: int) -> str: + cert_id = self.cli_wrapper.sanitize_input(cert_id) # TODO: validate cert_id (what format is it?) + command = f"step-ca renew {cert_id}.crt {cert_id}.key --force --expires-in {duration}s" + return command + + def renew_certificate(self, cert_id: str, duration: int) -> Dict[str, Union[bool, str, datetime]]: + command = self.preview_renew_certificate(cert_id, duration) + output, exit_code = self.cli_wrapper.execute_command(command) + + success = exit_code == 0 + message = "Certificate renewed successfully" if success else "Failed to renew certificate" + + trace_id = uuid4() # TODO: use scoped logging + self.logger.log( + LogSeverity.INFO if success else LogSeverity.ERROR, + message, + trace_id, + CommandInfo(command, output, exit_code, "RENEW_CERT") + ) + + return { + "success": success, + "message": message, + "logEntryId": str(trace_id), + "certificateId": cert_id, + "newExpirationDate": (datetime.now() + timedelta(seconds=duration)).isoformat() + } + + def preview_revoke_certificate(self, cert_id: str) -> str: + cert_id = self.cli_wrapper.sanitize_input(cert_id) # TODO: validate cert_id (what format is it?) + command = f"step-ca revoke {cert_id}.crt" + return command + + def revoke_certificate(self, cert_id: str) -> Dict[str, Union[bool, str, datetime]]: + command = self.preview_revoke_certificate(cert_id) + output, exit_code = self.cli_wrapper.execute_command(command) + + success = exit_code == 0 + message = "Certificate revoked successfully" if success else "Failed to revoke certificate" + + trace_id = uuid4() # TODO: use scoped logging + self.logger.log( + LogSeverity.INFO if success else LogSeverity.ERROR, + message, + trace_id, + CommandInfo(command, output, exit_code, "REVOKE_CERT") + ) + + return { + "success": success, + "message": message, + "logEntryId": str(trace_id), + "certificateId": cert_id, + "revocationDate": datetime.now().isoformat() + } + + # TODO: remove + @staticmethod + def _parse_duration(duration: str) -> timedelta: + # Parse the duration string and return a timedelta object + # This is a placeholder and would need to be implemented based on your duration format + raise NotImplementedError + + @staticmethod + def _is_valid_keyname(key_name: str) -> bool: + """ + :param key_name: Name of the key, must be alphanumeric with dashes and underscores + """ + return re.match(r"^[a-zA-Z0-9_-]+$", key_name) is not None + + @staticmethod + def _is_valid_duration(duration_str: str) -> bool: + """ + :param duration_str: Duration in seconds, must be a positive integer + """ + return isinstance(duration_str, int) and duration_str > 0 diff --git a/core/main.py b/core/main.py new file mode 100644 index 0000000..51ab878 --- /dev/null +++ b/core/main.py @@ -0,0 +1,23 @@ +from api_server import APIServer +from certificate_manager import CertificateManager +from shared.logger import Logger + + +class MainApplication: + def __init__(self): + self.logger = Logger("app.log") + self.cert_manager = CertificateManager(self.logger) + self.api_server = APIServer(self.cert_manager, self.logger) + + def initialize(self): # TODO remove + # Perform any necessary initialization + pass + + def run(self): + self.initialize() + self.api_server.run() + + +if __name__ == "__main__": + app = MainApplication() + app.run() diff --git a/requirements.txt b/requirements.txt index e69de29..a32a733 100644 --- a/requirements.txt +++ b/requirements.txt @@ -0,0 +1 @@ +Flask~=3.0.3 \ No newline at end of file diff --git a/shared/cli_wrapper.py b/shared/cli_wrapper.py new file mode 100644 index 0000000..d534d12 --- /dev/null +++ b/shared/cli_wrapper.py @@ -0,0 +1,17 @@ +import subprocess +import shlex +from typing import List, Tuple + + +class CLIWrapper: + @staticmethod + def sanitize_input(input_str: str) -> str: + return shlex.quote(input_str) + + @staticmethod + def execute_command(command: str) -> Tuple[str, int]: + try: + result = subprocess.run(command, shell=True, check=True, text=True, capture_output=True) + return result.stdout, result.returncode + except subprocess.CalledProcessError as e: + return e.stdout, e.returncode diff --git a/shared/logger.py b/shared/logger.py new file mode 100644 index 0000000..070d32f --- /dev/null +++ b/shared/logger.py @@ -0,0 +1,55 @@ +import logging +from typing import Dict, List, Optional +from datetime import datetime +from uuid import UUID +from .models import LogEntry, LogSeverity, CommandInfo + + +class Logger: + def __init__(self, log_file: str): + self.log_file = log_file + logging.basicConfig(filename=log_file, level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s') + + def log( + self, severity: LogSeverity, message: str, trace_id: UUID, + command_info: Optional[CommandInfo] = None + ) -> None: + log_entry = LogEntry( + entry_id=self._get_next_entry_id(), + timestamp=datetime.now(), + severity=severity, + message=message, + trace_id=trace_id, + command_info=command_info + ) + self._write_log_entry(log_entry) + + def get_logs(self, filters: Dict) -> List[LogEntry]: + # Implementation for retrieving logs based on filters + # This is a placeholder and would need to be implemented based on your storage mechanism + raise NotImplementedError + + def get_log_entry(self, log_id: int) -> Optional[LogEntry]: + # Implementation for retrieving a single log entry + # This is a placeholder and would need to be implemented based on your storage mechanism + raise NotImplementedError + + def _write_log_entry(self, log_entry: LogEntry) -> None: + log_message = f"{log_entry.timestamp} - {log_entry.severity.name} - {log_entry.message} - Trace ID: {log_entry.trace_id}" + if log_entry.command_info: + log_message += f" - Command: {log_entry.command_info.command}" + logging.log(self._get_logging_level(log_entry.severity), log_message) + + def _get_next_entry_id(self) -> int: + # Implementation for generating the next entry ID + # This is a placeholder and would need to be implemented based on your storage mechanism + raise NotImplementedError + + @staticmethod + def _get_logging_level(severity: LogSeverity) -> int: # TODO simplify + return { + LogSeverity.DEBUG: logging.DEBUG, + LogSeverity.INFO: logging.INFO, + LogSeverity.WARN: logging.WARNING, + LogSeverity.ERROR: logging.ERROR + }[severity] diff --git a/shared/models.py b/shared/models.py new file mode 100644 index 0000000..3b82b14 --- /dev/null +++ b/shared/models.py @@ -0,0 +1,30 @@ +from dataclasses import dataclass +from enum import Enum +from typing import Optional +from datetime import datetime +from uuid import UUID + + +class LogSeverity(Enum): + DEBUG = "DEBUG" + INFO = "INFO" + WARN = "WARN" + ERROR = "ERROR" + + +@dataclass +class CommandInfo: + command: str + output: str + exit_code: int + action: str + + +@dataclass +class LogEntry: + entry_id: int + timestamp: datetime + severity: LogSeverity + message: str + trace_id: UUID + command_info: Optional[CommandInfo] = None