373 lines
11 KiB
Python
373 lines
11 KiB
Python
"""Tests for FastAPI REST API endpoints."""
|
|
|
|
import pytest
|
|
from fastapi import FastAPI
|
|
from fastapi.testclient import TestClient
|
|
from datetime import datetime
|
|
from unittest.mock import MagicMock, AsyncMock, patch
|
|
|
|
from backend.app.models import RecorderConfig, PortStatus, SCTE35Marker, CodecType
|
|
from backend.app.recorders.recorder import RecorderManager
|
|
from backend.app.recorders.scte35 import SCTE35Manager
|
|
from backend.app.api.routes import router
|
|
from backend.app.api import routes as routes_module
|
|
|
|
|
|
@pytest.fixture
def app():
    """Build a fresh FastAPI application that serves only the API router."""
    application = FastAPI()
    application.include_router(router)
    return application
|
|
|
|
|
|
@pytest.fixture
def client(app):
    """Wrap the ``app`` fixture in a ``TestClient`` for issuing requests."""
    test_client = TestClient(app)
    return test_client
|
|
|
|
|
|
@pytest.fixture
def mock_recorder_manager():
    """Provide a ``RecorderManager``-spec'd mock preloaded with four idle ports.

    ``get_all_status`` returns all four statuses, ``get_status`` returns the
    status built for port 0, and ``start_recording`` / ``stop_recording`` are
    replaced with ``AsyncMock`` instances.
    """
    # One non-recording status per port index 0..3.
    idle_ports = []
    for index in range(4):
        idle_ports.append(
            PortStatus(
                port_index=index,
                is_recording=False,
                frame_count=0,
                fps=0.0,
                bitrate_mbps=0.0,
                uptime_seconds=0,
                current_file="/recordings/port_{}.ts".format(index),
                codec=CodecType.H264,
            )
        )

    recorder = MagicMock(spec=RecorderManager)
    recorder.start_recording = AsyncMock()
    recorder.stop_recording = AsyncMock()
    recorder.get_status.return_value = idle_ports[0]
    recorder.get_all_status.return_value = idle_ports
    return recorder
|
|
|
|
|
|
@pytest.fixture
def mock_scte35_manager():
    """Provide an ``SCTE35Manager``-spec'd mock with an empty marker history."""
    scte35 = MagicMock(spec=SCTE35Manager)
    scte35.get_marker_history.return_value = []
    scte35.inject_marker = AsyncMock()
    return scte35
|
|
|
|
|
|
@pytest.fixture(autouse=True)
def inject_managers(mock_recorder_manager, mock_scte35_manager):
    """Swap the mocked managers into the routes module for one test.

    The values previously bound to ``routes_module.recorder_manager`` and
    ``routes_module.scte35_manager`` are remembered and put back on teardown,
    so a test run does not leave the module in a different state than it
    found it. If an attribute did not exist beforehand it is reset to
    ``None``.
    """
    previous_recorder = getattr(routes_module, "recorder_manager", None)
    previous_scte35 = getattr(routes_module, "scte35_manager", None)

    routes_module.recorder_manager = mock_recorder_manager
    routes_module.scte35_manager = mock_scte35_manager
    try:
        yield
    finally:
        # Restore rather than blindly clearing, so pre-existing managers survive.
        routes_module.recorder_manager = previous_recorder
        routes_module.scte35_manager = previous_scte35
|
|
|
|
|
|
def test_health_endpoint_returns_ok(client):
    """GET /api/health answers 200 with status "ok" and the summary keys."""
    response = client.get("/api/health")
    assert response.status_code == 200

    body = response.json()
    assert body["status"] == "ok"
    for required_key in ("ports", "recording_count"):
        assert required_key in body
|
|
|
|
|
|
def test_get_all_ports_returns_list(client, mock_recorder_manager):
    """GET /api/ports answers 200 with a four-element list of port statuses."""
    response = client.get("/api/ports")
    assert response.status_code == 200

    ports = response.json()
    assert isinstance(ports, list)
    assert len(ports) == 4
    # Every entry must expose both the index and the recording flag.
    for entry in ports:
        assert "port_index" in entry
        assert "is_recording" in entry
|
|
|
|
|
|
def test_get_port_by_index(client, mock_recorder_manager):
    """GET /api/ports/0 answers 200 with the status of port 0."""
    response = client.get("/api/ports/0")
    assert response.status_code == 200

    port = response.json()
    assert port["port_index"] == 0
    assert "is_recording" in port
|
|
|
|
|
|
def test_get_port_not_found(client, mock_recorder_manager):
    """GET /api/ports/999 answers 404 when the manager raises ValueError."""
    get_status = mock_recorder_manager.get_status
    get_status.side_effect = ValueError("Invalid port")

    not_found = client.get("/api/ports/999")
    assert not_found.status_code == 404
|
|
|
|
|
|
def test_start_recording(client, mock_recorder_manager):
    """POST /api/ports/0/start answers 200 and invokes start_recording once."""
    response = client.post(
        "/api/ports/0/start",
        json={
            "port_index": 0,
            "codec": "h264",
            "bitrate": 50,
            "quality_profile": "high",
            "recording_path": "/recordings/test.ts",
            "srt_enabled": False,
        },
    )
    assert response.status_code == 200

    body = response.json()
    assert body["message"] == "Recording started"
    assert body["port"] == 0

    # The route must have delegated to the manager exactly once.
    mock_recorder_manager.start_recording.assert_called_once()
|
|
|
|
|
|
def test_start_recording_already_running(client, mock_recorder_manager):
    """POST /api/ports/0/start answers 400 when the manager raises RuntimeError."""
    start = mock_recorder_manager.start_recording
    start.side_effect = RuntimeError("Already recording")

    request_body = {
        "port_index": 0,
        "codec": "h264",
        "bitrate": 50,
        "quality_profile": "high",
        "recording_path": "/recordings/test.ts",
        "srt_enabled": False,
    }
    rejected = client.post("/api/ports/0/start", json=request_body)

    assert rejected.status_code == 400
|
|
|
|
|
|
def test_start_recording_invalid_port(client, mock_recorder_manager):
    """POST /api/ports/999/start answers 404 when the manager raises ValueError."""
    start = mock_recorder_manager.start_recording
    start.side_effect = ValueError("Invalid port")

    request_body = {
        "port_index": 999,
        "codec": "h264",
        "bitrate": 50,
        "quality_profile": "high",
        "recording_path": "/recordings/test.ts",
        "srt_enabled": False,
    }
    not_found = client.post("/api/ports/999/start", json=request_body)

    assert not_found.status_code == 404
|
|
|
|
|
|
def test_stop_recording(client, mock_recorder_manager):
    """POST /api/ports/0/stop answers 200 and invokes stop_recording once."""
    response = client.post("/api/ports/0/stop")
    assert response.status_code == 200

    body = response.json()
    assert body["message"] == "Recording stopped"
    assert body["port"] == 0

    # The route must have delegated to the manager exactly once.
    mock_recorder_manager.stop_recording.assert_called_once()
|
|
|
|
|
|
def test_stop_recording_invalid_port(client, mock_recorder_manager):
    """POST /api/ports/999/stop answers 404 when the manager raises ValueError."""
    stop = mock_recorder_manager.stop_recording
    stop.side_effect = ValueError("Invalid port")

    not_found = client.post("/api/ports/999/stop")
    assert not_found.status_code == 404
|
|
|
|
|
|
def test_inject_scte35_marker(client, mock_scte35_manager):
    """POST /api/scte35/inject answers 200 with the marker the manager returns.

    The mocked ``inject_marker`` yields a prepared ``SCTE35Marker``; the test
    checks the response echoes its ``event_id`` and ``duration_seconds`` and
    that the manager was invoked exactly once.
    """
    # Function-scope import keeps this test self-contained.
    from datetime import timezone

    marker_response = SCTE35Marker(
        event_id="12345",
        duration_seconds=30,
        out_of_network=True,
        splice_immediate=False,
        # datetime.utcnow() is deprecated (Python 3.12+) and naive; use aware UTC.
        timestamp=datetime.now(timezone.utc),
        webhook_url=None,
    )
    mock_scte35_manager.inject_marker.return_value = marker_response

    payload = {
        "event_id": 12345,
        "duration_seconds": 30.0,
        "webhook_url": None,
        "out_of_network": True,
        "splice_immediate": False,
    }

    response = client.post("/api/scte35/inject", json=payload)
    assert response.status_code == 200

    data = response.json()
    assert data["event_id"] == "12345"
    assert data["duration_seconds"] == 30

    # Verify inject_marker was called
    mock_scte35_manager.inject_marker.assert_called_once()
|
|
|
|
|
|
def test_inject_scte35_with_invalid_event_id(client, mock_scte35_manager):
    """POST /api/scte35/inject answers 400 when the manager raises ValueError."""
    inject = mock_scte35_manager.inject_marker
    inject.side_effect = ValueError("Invalid event_id")

    rejected = client.post(
        "/api/scte35/inject",
        json={
            "event_id": -1,
            "duration_seconds": 30.0,
            "webhook_url": None,
            "out_of_network": True,
            "splice_immediate": False,
        },
    )

    assert rejected.status_code == 400
|
|
|
|
|
|
def test_get_scte35_history(client, mock_scte35_manager):
    """GET /api/scte35/history answers 200 with the manager's marker list.

    The mocked ``get_marker_history`` returns a single marker; the test checks
    the response is a one-element list carrying that marker's ``event_id``.
    """
    # Function-scope import keeps this test self-contained.
    from datetime import timezone

    marker = SCTE35Marker(
        event_id="12345",
        duration_seconds=30,
        out_of_network=True,
        splice_immediate=False,
        # datetime.utcnow() is deprecated (Python 3.12+) and naive; use aware UTC.
        timestamp=datetime.now(timezone.utc),
        webhook_url=None,
    )
    mock_scte35_manager.get_marker_history.return_value = [marker]

    response = client.get("/api/scte35/history")
    assert response.status_code == 200

    data = response.json()
    assert isinstance(data, list)
    assert len(data) == 1
    assert data[0]["event_id"] == "12345"
|
|
|
|
|
|
# ============================================================================
|
|
# WebSocket Manager Tests
|
|
# ============================================================================
|
|
|
|
from fastapi import WebSocket
|
|
from backend.app.api.websocket import ConnectionManager
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_websocket_connection_manager_init():
    """A new ConnectionManager starts with an empty ``active_connections`` list."""
    fresh_manager = ConnectionManager()
    connections = fresh_manager.active_connections
    assert connections == []
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_websocket_connect():
    """``connect`` accepts the socket once and tracks it as active."""
    # Stand-in WebSocket with an awaitable ``accept``.
    socket = AsyncMock(spec=WebSocket)
    socket.accept = AsyncMock()

    conn_manager = ConnectionManager()
    await conn_manager.connect(socket)

    assert socket in conn_manager.active_connections
    socket.accept.assert_called_once()
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_websocket_disconnect():
    """``disconnect`` drops a previously connected socket from the active list."""
    socket = AsyncMock(spec=WebSocket)
    socket.accept = AsyncMock()
    conn_manager = ConnectionManager()

    # Precondition: the socket is tracked after connecting.
    await conn_manager.connect(socket)
    assert socket in conn_manager.active_connections

    conn_manager.disconnect(socket)
    assert socket not in conn_manager.active_connections
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_websocket_broadcast():
    """``broadcast`` sends the same JSON payload once to every connected socket."""
    conn_manager = ConnectionManager()

    # Two stand-in sockets with awaitable ``accept`` and ``send_json``.
    sockets = []
    for _ in range(2):
        socket = AsyncMock(spec=WebSocket)
        socket.accept = AsyncMock()
        socket.send_json = AsyncMock()
        sockets.append(socket)

    for socket in sockets:
        await conn_manager.connect(socket)

    message = {"status": "ok", "ports": 4}
    await conn_manager.broadcast(message)

    for socket in sockets:
        socket.send_json.assert_called_once_with(message)
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_websocket_broadcast_removes_dead_connections():
    """``broadcast`` drops a socket whose ``send_json`` raises, keeping the rest."""
    conn_manager = ConnectionManager()

    healthy = AsyncMock(spec=WebSocket)
    healthy.accept = AsyncMock()
    healthy.send_json = AsyncMock()

    # This socket fails on every send.
    broken = AsyncMock(spec=WebSocket)
    broken.accept = AsyncMock()
    broken.send_json = AsyncMock(side_effect=Exception("Connection closed"))

    await conn_manager.connect(healthy)
    await conn_manager.connect(broken)
    assert len(conn_manager.active_connections) == 2

    message = {"status": "ok"}
    await conn_manager.broadcast(message)

    # Only the healthy socket should remain tracked.
    remaining = conn_manager.active_connections
    assert len(remaining) == 1
    assert healthy in remaining
    assert broken not in remaining
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_websocket_send_personal_message():
    """``send_personal_message`` forwards the text to the socket's ``send_text``."""
    socket = AsyncMock(spec=WebSocket)
    socket.send_text = AsyncMock()
    conn_manager = ConnectionManager()

    await conn_manager.send_personal_message("Hello", socket)

    socket.send_text.assert_called_once_with("Hello")
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_websocket_send_personal_message_removes_bad_connection():
    """A socket whose ``send_text`` raises is dropped from the active list."""
    # This socket fails on every text send.
    broken = AsyncMock(spec=WebSocket)
    broken.send_text = AsyncMock(side_effect=Exception("Connection closed"))

    conn_manager = ConnectionManager()
    await conn_manager.connect(broken)
    assert broken in conn_manager.active_connections

    await conn_manager.send_personal_message("Hello", broken)

    # The failed send should have evicted the socket.
    assert broken not in conn_manager.active_connections
|