From 0891ed0588948ca1fcd7d190fc189ff676d99f13 Mon Sep 17 00:00:00 2001 From: stone-w4tch3r <100294019+stone-w4tch3r@users.noreply.github.com> Date: Tue, 20 Aug 2024 22:14:37 +0500 Subject: [PATCH] scoped logging --- README.md | 3 ++- core/api_server.py | 46 ++++++++++++++++++++++++++++++++----- core/certificate_manager.py | 18 +++++---------- shared/logger.py | 33 ++++++++++++++++++++++---- 4 files changed, 77 insertions(+), 23 deletions(-) diff --git a/README.md b/README.md index 9110837..c502aea 100644 --- a/README.md +++ b/README.md @@ -169,4 +169,5 @@ project_root/ --- # TODO -- [ ] Adjust class diagram after finalizing the project \ No newline at end of file +- [ ] Adjust class diagram after finalizing the project +- [ ] Prevent simultaneous actions calls \ No newline at end of file diff --git a/core/api_server.py b/core/api_server.py index 32a5d1a..a87b254 100644 --- a/core/api_server.py +++ b/core/api_server.py @@ -1,15 +1,38 @@ -from flask import Flask, request, jsonify, Response -from typing import Dict, List, Union, Tuple -from uuid import uuid4 +import uuid +from contextlib import contextmanager + +from flask import Flask, request, jsonify, Response, g + from certificate_manager import CertificateManager -from shared.logger import Logger, LogSeverity -from shared.models import CommandInfo +from shared.logger import Logger + + +@contextmanager +def _logging_scope(): + """ + Context manager for setting up a unique trace ID for each request in a Flask application. + + This function ensures that each request has a unique trace ID, which can be used for logging. + It generates a unique UUID and assigns it to the Flask `g` (global) object as `trace_id`. + + Usage: + with _logging_scope(): + # Code within this block can access g.trace_id + pass + # After this block, the finally part of _logging_scope is executed + """ + g.trace_id = str(uuid.uuid4()) + try: + yield + finally: + pass class APIServer: - _app = Flask(__name__) def __init__(self, cert_manager: CertificateManager, logger: Logger): + self._app = Flask(__name__) + self.cert_manager = cert_manager self.logger = logger @@ -20,6 +43,17 @@ class APIServer: self._app.add_url_rule('/logs', 'get_logs', self.get_logs, methods=['GET']) self._app.add_url_rule('/logs/single', 'get_log_entry', self.get_log_entry, methods=['GET']) + @self._app.before_request + def set_logging_scope(): + g.logging_scope = _logging_scope() + g.logging_scope.__enter__() + + @self._app.after_request + def clear_logging_scope(response): + if hasattr(g, 'logging_scope'): + g.logging_scope.__exit__(None, None, None) + return response + def run(self): self._app.run(host='0.0.0.0', port=5000) diff --git a/core/certificate_manager.py b/core/certificate_manager.py index 95ba38f..9952386 100644 --- a/core/certificate_manager.py +++ b/core/certificate_manager.py @@ -52,18 +52,16 @@ class CertificateManager: success = exit_code == 0 message = "Certificate generated successfully" if success else "Failed to generate certificate" - trace_id = uuid4() # TODO: use scoped logging - self.logger.log( + entry_id = self.logger.log_scoped( LogSeverity.INFO if success else LogSeverity.ERROR, message, - trace_id, CommandInfo(command, output, exit_code, "GENERATE_CERT") ) return { # TODO: extract to dataclass or namedtuple or typed dict "success": success, "message": message, - "logEntryId": str(trace_id), + "logEntryId": str(entry_id), "certificateId": params['keyName'], "certificateName": params['keyName'], "expirationDate": (datetime.now() + self._parse_duration(params['duration'])).isoformat() # TODO: remove _parse_duration @@ -81,18 +79,16 @@ class CertificateManager: success = exit_code == 0 message = "Certificate renewed successfully" if success else "Failed to renew certificate" - trace_id = uuid4() # TODO: use scoped logging - self.logger.log( + entry_id = self.logger.log_scoped( LogSeverity.INFO if success else LogSeverity.ERROR, message, - trace_id, CommandInfo(command, output, exit_code, "RENEW_CERT") ) return { "success": success, "message": message, - "logEntryId": str(trace_id), + "logEntryId": str(entry_id), "certificateId": cert_id, "newExpirationDate": (datetime.now() + timedelta(seconds=duration)).isoformat() } @@ -109,18 +105,16 @@ class CertificateManager: success = exit_code == 0 message = "Certificate revoked successfully" if success else "Failed to revoke certificate" - trace_id = uuid4() # TODO: use scoped logging - self.logger.log( + entry_id = self.logger.log_scoped( LogSeverity.INFO if success else LogSeverity.ERROR, message, - trace_id, CommandInfo(command, output, exit_code, "REVOKE_CERT") ) return { "success": success, "message": message, - "logEntryId": str(trace_id), + "logEntryId": str(entry_id), "certificateId": cert_id, "revocationDate": datetime.now().isoformat() } diff --git a/shared/logger.py b/shared/logger.py index 070d32f..e83af3a 100644 --- a/shared/logger.py +++ b/shared/logger.py @@ -1,7 +1,10 @@ import logging -from typing import Dict, List, Optional from datetime import datetime +from typing import Dict, List, Optional from uuid import UUID + +from flask import g + from .models import LogEntry, LogSeverity, CommandInfo @@ -10,10 +13,13 @@ class Logger: self.log_file = log_file logging.basicConfig(filename=log_file, level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s') - def log( - self, severity: LogSeverity, message: str, trace_id: UUID, + def log_scoped( + self, + severity: LogSeverity, + message: str, command_info: Optional[CommandInfo] = None - ) -> None: + ) -> int: + trace_id = g.trace_id if hasattr(g, 'trace_id') else "AUTO_SCOPE_BROKEN" log_entry = LogEntry( entry_id=self._get_next_entry_id(), timestamp=datetime.now(), @@ -23,6 +29,25 @@ class Logger: command_info=command_info ) self._write_log_entry(log_entry) + return log_entry.entry_id + + def log_with_trace( + self, + severity: LogSeverity, + message: str, + trace_id: UUID, + command_info: Optional[CommandInfo] = None + ) -> int: + log_entry = LogEntry( + entry_id=self._get_next_entry_id(), + timestamp=datetime.now(), + severity=severity, + message=message, + trace_id=trace_id, + command_info=command_info + ) + self._write_log_entry(log_entry) + return log_entry.entry_id def get_logs(self, filters: Dict) -> List[LogEntry]: # Implementation for retrieving logs based on filters