diff --git a/tests/test_api_server.py b/tests/test_api_server.py index 7d32ffe..16e133d 100644 --- a/tests/test_api_server.py +++ b/tests/test_api_server.py @@ -1,92 +1,160 @@ -import pytest -from fastapi.testclient import TestClient +import unittest from unittest.mock import Mock, patch -from core.api_server import APIServer -from core.certificate_manager import CertificateManager -from shared.logger import Logger -from shared.models import Certificate, CertificateResult, LogEntry, LogSeverity +from fastapi.testclient import TestClient from datetime import datetime, timedelta -import uuid +from uuid import UUID -@pytest.fixture -def mock_cert_manager(): - return Mock(spec=CertificateManager) +from core.api_server import APIServer +from core.certificate_manager import CertificateManager, Certificate, CertificateResult +from shared.logger import Logger +from shared.models import KeyType, LogSeverity, CommandInfo -@pytest.fixture -def mock_logger(): - return Mock(spec=Logger) +class TestAPIServer(unittest.TestCase): + def setUp(self): + self.cert_manager_mock = Mock(spec=CertificateManager) + self.logger_mock = Mock(spec=Logger) + self.api_server = APIServer(self.cert_manager_mock, self.logger_mock, "1.0.0", 8000) + self.client = TestClient(self.api_server._app) -@pytest.fixture -def api_client(mock_cert_manager, mock_logger): - server = APIServer(mock_cert_manager, mock_logger, version="1.0", port=8000) - return TestClient(server._app) + def test_list_certificates_preview(self): + self.cert_manager_mock.preview_list_certificates.return_value = "step-ca list certificates" + response = self.client.get("/certificates?preview=true") + self.assertEqual(response.status_code, 200) + self.assertEqual(response.json(), {"command": "step-ca list certificates"}) -def test_list_certificates_preview(api_client, mock_cert_manager): - mock_cert_manager.preview_list_certificates.return_value = "step-cli list" - response = api_client.get("/certificates?preview=true") - assert response.status_code == 200 - assert response.json() == {"command": "step-cli list"} + def test_list_certificates(self): + mock_certs = [ + Certificate(id="cert1", name="Cert 1", status="valid", expiration_date=datetime.now() + timedelta(days=30)), + Certificate(id="cert2", name="Cert 2", status="expired", expiration_date=datetime.now() - timedelta(days=1)) + ] + self.cert_manager_mock.list_certificates.return_value = mock_certs + response = self.client.get("/certificates?preview=false") + self.assertEqual(response.status_code, 200) + self.assertEqual(len(response.json()), 2) + self.assertEqual(response.json()[0]["id"], "cert1") + self.assertEqual(response.json()[1]["id"], "cert2") -def test_list_certificates(api_client, mock_cert_manager): - mock_certs = [ - Certificate(id="1", name="cert1", status="valid", expiration_date=datetime.now() + timedelta(days=30)), - Certificate(id="2", name="cert2", status="expired", expiration_date=datetime.now() - timedelta(days=1)) - ] - mock_cert_manager.list_certificates.return_value = mock_certs - response = api_client.get("/certificates?preview=false") - assert response.status_code == 200 - assert len(response.json()) == 2 - assert response.json()[0]["id"] == "1" - assert response.json()[1]["id"] == "2" + def test_generate_certificate_preview(self): + self.cert_manager_mock.preview_generate_certificate.return_value = "step-ca certificate test test.crt test.key --key-type rsa --not-after 3600" + response = self.client.post("/certificates/generate?preview=true", json={ + "keyName": "test", + "keyType": "RSA", + "duration": 3600 + }) + self.assertEqual(response.status_code, 200) + self.assertEqual(response.json(), {"command": "step-ca certificate test test.crt test.key --key-type rsa --not-after 3600"}) -def test_generate_certificate_preview(api_client, mock_cert_manager): - mock_cert_manager.preview_generate_certificate.return_value = "step-cli generate" - response = api_client.post("/certificates/generate?preview=true", json={ - "keyName": "test", - "keyType": "EC", - "duration": 3600 - }) - assert response.status_code == 200 - assert response.json() == {"command": "step-cli generate"} + def test_generate_certificate(self): + mock_result = CertificateResult( + success=True, + message="Certificate generated successfully", + log_entry_id=1, + certificate_id="test", + certificate_name="test", + expiration_date=datetime.now() + timedelta(hours=1) + ) + self.cert_manager_mock.generate_certificate.return_value = mock_result + response = self.client.post("/certificates/generate?preview=false", json={ + "keyName": "test", + "keyType": "RSA", + "duration": 3600 + }) + self.assertEqual(response.status_code, 200) + self.assertTrue(response.json()["success"]) + self.assertEqual(response.json()["certificateId"], "test") -def test_generate_certificate(api_client, mock_cert_manager): - mock_result = CertificateResult( - success=True, - message="Certificate generated", - log_entry_id=1, - certificate_id="cert1", - certificate_name="test", - expiration_date=datetime.now() + timedelta(days=30) - ) - mock_cert_manager.generate_certificate.return_value = mock_result - response = api_client.post("/certificates/generate?preview=false", json={ - "keyName": "test", - "keyType": "EC", - "duration": 3600 - }) - assert response.status_code == 200 - assert response.json()["success"] == True - assert response.json()["certificateId"] == "cert1" + def test_renew_certificate_preview(self): + self.cert_manager_mock.preview_renew_certificate.return_value = "step-ca renew test.crt test.key --force --expires-in 3600s" + response = self.client.post("/certificates/renew?certId=test&duration=3600&preview=true") + self.assertEqual(response.status_code, 200) + self.assertEqual(response.json(), {"command": "step-ca renew test.crt test.key --force --expires-in 3600s"}) -def test_get_log_entry(api_client, mock_logger): - mock_log_entry = LogEntry( - entry_id=1, - timestamp=datetime.now(), - severity=LogSeverity.INFO, - message="Test log", - trace_id=uuid.uuid4(), - command_info=None - ) - mock_logger.get_log_entry.return_value = mock_log_entry - response = api_client.get("/logs/single?logId=1") - assert response.status_code == 200 - assert response.json()["entryId"] == 1 - assert response.json()["severity"] == "INFO" + def test_renew_certificate(self): + mock_result = CertificateResult( + success=True, + message="Certificate renewed successfully", + log_entry_id=2, + certificate_id="test", + new_expiration_date=datetime.now() + timedelta(hours=1) + ) + self.cert_manager_mock.renew_certificate.return_value = mock_result + response = self.client.post("/certificates/renew?certId=test&duration=3600&preview=false") + self.assertEqual(response.status_code, 200) + self.assertTrue(response.json()["success"]) + self.assertEqual(response.json()["certificateId"], "test") -def test_get_log_entry_not_found(api_client, mock_logger): - mock_logger.get_log_entry.return_value = None - response = api_client.get("/logs/single?logId=999") - assert response.status_code == 404 - assert response.json()["detail"] == "Log entry not found" + def test_revoke_certificate_preview(self): + self.cert_manager_mock.preview_revoke_certificate.return_value = "step-ca revoke test.crt" + response = self.client.post("/certificates/revoke?certId=test&preview=true") + self.assertEqual(response.status_code, 200) + self.assertEqual(response.json(), {"command": "step-ca revoke test.crt"}) -# Add more tests for other endpoints (renew, revoke, get_logs) following similar patterns + def test_revoke_certificate(self): + mock_result = CertificateResult( + success=True, + message="Certificate revoked successfully", + log_entry_id=3, + certificate_id="test", + revocation_date=datetime.now() + ) + self.cert_manager_mock.revoke_certificate.return_value = mock_result + response = self.client.post("/certificates/revoke?certId=test&preview=false") + self.assertEqual(response.status_code, 200) + self.assertTrue(response.json()["success"]) + self.assertEqual(response.json()["certificateId"], "test") + + def test_get_log_entry(self): + mock_log_entry = Mock( + entry_id=1, + timestamp=datetime.now(), + severity=LogSeverity.INFO, + message="Test log entry", + trace_id=UUID("12345678-1234-5678-1234-567812345678"), + command_info=CommandInfo(command="test command", output="test output", exit_code=0, action="TEST") + ) + self.logger_mock.get_log_entry.return_value = mock_log_entry + response = self.client.get("/logs/single?logId=1") + self.assertEqual(response.status_code, 200) + self.assertEqual(response.json()["entryId"], 1) + self.assertEqual(response.json()["message"], "Test log entry") + + def test_get_log_entry_not_found(self): + self.logger_mock.get_log_entry.return_value = None + response = self.client.get("/logs/single?logId=999") + self.assertEqual(response.status_code, 404) + self.assertEqual(response.json()["detail"], "Log entry not found") + + def test_get_logs(self): + mock_log_entries = [ + Mock( + entry_id=1, + timestamp=datetime.now(), + severity=LogSeverity.INFO, + message="Test log entry 1", + trace_id=UUID("12345678-1234-5678-1234-567812345678"), + command_info=None + ), + Mock( + entry_id=2, + timestamp=datetime.now(), + severity=LogSeverity.ERROR, + message="Test log entry 2", + trace_id=UUID("87654321-4321-8765-4321-876543210987"), + command_info=CommandInfo(command="test command", output="test output", exit_code=1, action="TEST") + ) + ] + self.logger_mock.get_logs.return_value = mock_log_entries + response = self.client.post("/logs", json={ + "traceId": None, + "commandsOnly": False, + "severity": None, + "page": 1, + "pageSize": 10 + }) + self.assertEqual(response.status_code, 200) + self.assertEqual(len(response.json()), 2) + self.assertEqual(response.json()[0]["entryId"], 1) + self.assertEqual(response.json()[1]["entryId"], 2) + +if __name__ == '__main__': + unittest.main()