diff --git a/backend/tests/test_api.py b/backend/tests/test_api.py new file mode 100644 index 0000000..94ad060 --- /dev/null +++ b/backend/tests/test_api.py @@ -0,0 +1,373 @@ +"""Tests for FastAPI REST API endpoints.""" + +import pytest +from fastapi import FastAPI +from fastapi.testclient import TestClient +from datetime import datetime +from unittest.mock import MagicMock, AsyncMock, patch + +from backend.app.models import RecorderConfig, PortStatus, SCTE35Marker, CodecType +from backend.app.recorders.recorder import RecorderManager +from backend.app.recorders.scte35 import SCTE35Manager +from backend.app.api.routes import router +from backend.app.api import routes as routes_module + + +@pytest.fixture +def app(): + """Create test FastAPI app with mocked managers.""" + test_app = FastAPI() + test_app.include_router(router) + return test_app + + +@pytest.fixture +def client(app): + """Create TestClient for the app.""" + return TestClient(app) + + +@pytest.fixture +def mock_recorder_manager(): + """Create a mock RecorderManager.""" + manager = MagicMock(spec=RecorderManager) + + # Mock default status for all ports + port_statuses = [ + PortStatus( + port_index=i, + is_recording=False, + frame_count=0, + fps=0.0, + bitrate_mbps=0.0, + uptime_seconds=0, + current_file="/recordings/port_{}.ts".format(i), + codec=CodecType.H264, + ) + for i in range(4) + ] + + manager.get_all_status.return_value = port_statuses + manager.get_status.return_value = port_statuses[0] + manager.start_recording = AsyncMock() + manager.stop_recording = AsyncMock() + + return manager + + +@pytest.fixture +def mock_scte35_manager(): + """Create a mock SCTE35Manager.""" + manager = MagicMock(spec=SCTE35Manager) + manager.inject_marker = AsyncMock() + manager.get_marker_history.return_value = [] + return manager + + +@pytest.fixture(autouse=True) +def inject_managers(mock_recorder_manager, mock_scte35_manager): + """Inject mocked managers into routes module.""" + routes_module.recorder_manager = mock_recorder_manager + routes_module.scte35_manager = mock_scte35_manager + yield + # Clean up + routes_module.recorder_manager = None + routes_module.scte35_manager = None + + +def test_health_endpoint_returns_ok(client): + """GET /api/health returns 200 with status=ok""" + response = client.get("/api/health") + assert response.status_code == 200 + data = response.json() + assert data["status"] == "ok" + assert "ports" in data + assert "recording_count" in data + + +def test_get_all_ports_returns_list(client, mock_recorder_manager): + """GET /api/ports returns list of port statuses""" + response = client.get("/api/ports") + assert response.status_code == 200 + data = response.json() + assert isinstance(data, list) + assert len(data) == 4 + assert all("port_index" in port for port in data) + assert all("is_recording" in port for port in data) + + +def test_get_port_by_index(client, mock_recorder_manager): + """GET /api/ports/0 returns single port status""" + response = client.get("/api/ports/0") + assert response.status_code == 200 + data = response.json() + assert data["port_index"] == 0 + assert "is_recording" in data + + +def test_get_port_not_found(client, mock_recorder_manager): + """GET /api/ports/999 returns 404""" + mock_recorder_manager.get_status.side_effect = ValueError("Invalid port") + response = client.get("/api/ports/999") + assert response.status_code == 404 + + +def test_start_recording(client, mock_recorder_manager): + """POST /api/ports/0/start returns 200 and starts recording""" + config = { + "port_index": 0, + "codec": "h264", + "bitrate": 50, + "quality_profile": "high", + "recording_path": "/recordings/test.ts", + "srt_enabled": False, + } + + response = client.post("/api/ports/0/start", json=config) + assert response.status_code == 200 + data = response.json() + assert data["message"] == "Recording started" + assert data["port"] == 0 + + # Verify start_recording was called + mock_recorder_manager.start_recording.assert_called_once() + + +def test_start_recording_already_running(client, mock_recorder_manager): + """POST /api/ports/0/start returns 400 if already recording""" + mock_recorder_manager.start_recording.side_effect = RuntimeError("Already recording") + + config = { + "port_index": 0, + "codec": "h264", + "bitrate": 50, + "quality_profile": "high", + "recording_path": "/recordings/test.ts", + "srt_enabled": False, + } + + response = client.post("/api/ports/0/start", json=config) + assert response.status_code == 400 + + +def test_start_recording_invalid_port(client, mock_recorder_manager): + """POST /api/ports/999/start returns 404""" + mock_recorder_manager.start_recording.side_effect = ValueError("Invalid port") + + config = { + "port_index": 999, + "codec": "h264", + "bitrate": 50, + "quality_profile": "high", + "recording_path": "/recordings/test.ts", + "srt_enabled": False, + } + + response = client.post("/api/ports/999/start", json=config) + assert response.status_code == 404 + + +def test_stop_recording(client, mock_recorder_manager): + """POST /api/ports/0/stop returns 200 and stops recording""" + response = client.post("/api/ports/0/stop") + assert response.status_code == 200 + data = response.json() + assert data["message"] == "Recording stopped" + assert data["port"] == 0 + + # Verify stop_recording was called + mock_recorder_manager.stop_recording.assert_called_once() + + +def test_stop_recording_invalid_port(client, mock_recorder_manager): + """POST /api/ports/999/stop returns 404""" + mock_recorder_manager.stop_recording.side_effect = ValueError("Invalid port") + response = client.post("/api/ports/999/stop") + assert response.status_code == 404 + + +def test_inject_scte35_marker(client, mock_scte35_manager): + """POST /api/scte35/inject returns SCTE35Marker""" + marker_response = SCTE35Marker( + event_id="12345", + duration_seconds=30, + out_of_network=True, + splice_immediate=False, + timestamp=datetime.utcnow(), + webhook_url=None, + ) + mock_scte35_manager.inject_marker.return_value = marker_response + + payload = { + "event_id": 12345, + "duration_seconds": 30.0, + "webhook_url": None, + "out_of_network": True, + "splice_immediate": False, + } + + response = client.post("/api/scte35/inject", json=payload) + assert response.status_code == 200 + data = response.json() + assert data["event_id"] == "12345" + assert data["duration_seconds"] == 30 + + # Verify inject_marker was called + mock_scte35_manager.inject_marker.assert_called_once() + + +def test_inject_scte35_with_invalid_event_id(client, mock_scte35_manager): + """POST /api/scte35/inject returns 400 for invalid event_id""" + mock_scte35_manager.inject_marker.side_effect = ValueError("Invalid event_id") + + payload = { + "event_id": -1, + "duration_seconds": 30.0, + "webhook_url": None, + "out_of_network": True, + "splice_immediate": False, + } + + response = client.post("/api/scte35/inject", json=payload) + assert response.status_code == 400 + + +def test_get_scte35_history(client, mock_scte35_manager): + """GET /api/scte35/history returns list of markers""" + marker = SCTE35Marker( + event_id="12345", + duration_seconds=30, + out_of_network=True, + splice_immediate=False, + timestamp=datetime.utcnow(), + webhook_url=None, + ) + mock_scte35_manager.get_marker_history.return_value = [marker] + + response = client.get("/api/scte35/history") + assert response.status_code == 200 + data = response.json() + assert isinstance(data, list) + assert len(data) == 1 + assert data[0]["event_id"] == "12345" + + +# ============================================================================ +# WebSocket Manager Tests +# ============================================================================ + +from fastapi import WebSocket +from backend.app.api.websocket import ConnectionManager + + +@pytest.mark.asyncio +async def test_websocket_connection_manager_init(): + """ConnectionManager initializes with empty connections list""" + manager = ConnectionManager() + assert manager.active_connections == [] + + +@pytest.mark.asyncio +async def test_websocket_connect(): + """ConnectionManager can connect a WebSocket""" + manager = ConnectionManager() + + # Mock WebSocket + ws = AsyncMock(spec=WebSocket) + ws.accept = AsyncMock() + + await manager.connect(ws) + assert ws in manager.active_connections + ws.accept.assert_called_once() + + +@pytest.mark.asyncio +async def test_websocket_disconnect(): + """ConnectionManager can disconnect a WebSocket""" + manager = ConnectionManager() + + ws = AsyncMock(spec=WebSocket) + ws.accept = AsyncMock() + + await manager.connect(ws) + assert ws in manager.active_connections + + manager.disconnect(ws) + assert ws not in manager.active_connections + + +@pytest.mark.asyncio +async def test_websocket_broadcast(): + """ConnectionManager broadcasts to all connections""" + manager = ConnectionManager() + + ws1 = AsyncMock(spec=WebSocket) + ws2 = AsyncMock(spec=WebSocket) + ws1.accept = AsyncMock() + ws2.accept = AsyncMock() + ws1.send_json = AsyncMock() + ws2.send_json = AsyncMock() + + await manager.connect(ws1) + await manager.connect(ws2) + + data = {"status": "ok", "ports": 4} + await manager.broadcast(data) + + ws1.send_json.assert_called_once_with(data) + ws2.send_json.assert_called_once_with(data) + + +@pytest.mark.asyncio +async def test_websocket_broadcast_removes_dead_connections(): + """ConnectionManager removes connections that fail to send""" + manager = ConnectionManager() + + ws_good = AsyncMock(spec=WebSocket) + ws_bad = AsyncMock(spec=WebSocket) + ws_good.accept = AsyncMock() + ws_bad.accept = AsyncMock() + ws_good.send_json = AsyncMock() + ws_bad.send_json = AsyncMock(side_effect=Exception("Connection closed")) + + await manager.connect(ws_good) + await manager.connect(ws_bad) + + assert len(manager.active_connections) == 2 + + data = {"status": "ok"} + await manager.broadcast(data) + + # Dead connection should be removed + assert len(manager.active_connections) == 1 + assert ws_good in manager.active_connections + assert ws_bad not in manager.active_connections + + +@pytest.mark.asyncio +async def test_websocket_send_personal_message(): + """ConnectionManager can send personal message""" + manager = ConnectionManager() + + ws = AsyncMock(spec=WebSocket) + ws.send_text = AsyncMock() + + await manager.send_personal_message("Hello", ws) + ws.send_text.assert_called_once_with("Hello") + + +@pytest.mark.asyncio +async def test_websocket_send_personal_message_removes_bad_connection(): + """ConnectionManager removes connection on failed personal message""" + manager = ConnectionManager() + + ws = AsyncMock(spec=WebSocket) + ws.send_text = AsyncMock(side_effect=Exception("Connection closed")) + + await manager.connect(ws) + assert ws in manager.active_connections + + await manager.send_personal_message("Hello", ws) + + # Should be removed after failed send + assert ws not in manager.active_connections