Add backend/tests/test_api.py
This commit is contained in:
parent
604fdbb7d0
commit
67fdd91285
1 changed files with 373 additions and 0 deletions
373
backend/tests/test_api.py
Normal file
373
backend/tests/test_api.py
Normal file
|
|
@ -0,0 +1,373 @@
|
|||
"""Tests for FastAPI REST API endpoints."""
|
||||
|
||||
import pytest
|
||||
from fastapi import FastAPI
|
||||
from fastapi.testclient import TestClient
|
||||
from datetime import datetime
|
||||
from unittest.mock import MagicMock, AsyncMock, patch
|
||||
|
||||
from backend.app.models import RecorderConfig, PortStatus, SCTE35Marker, CodecType
|
||||
from backend.app.recorders.recorder import RecorderManager
|
||||
from backend.app.recorders.scte35 import SCTE35Manager
|
||||
from backend.app.api.routes import router
|
||||
from backend.app.api import routes as routes_module
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def app():
|
||||
"""Create test FastAPI app with mocked managers."""
|
||||
test_app = FastAPI()
|
||||
test_app.include_router(router)
|
||||
return test_app
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def client(app):
|
||||
"""Create TestClient for the app."""
|
||||
return TestClient(app)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def mock_recorder_manager():
|
||||
"""Create a mock RecorderManager."""
|
||||
manager = MagicMock(spec=RecorderManager)
|
||||
|
||||
# Mock default status for all ports
|
||||
port_statuses = [
|
||||
PortStatus(
|
||||
port_index=i,
|
||||
is_recording=False,
|
||||
frame_count=0,
|
||||
fps=0.0,
|
||||
bitrate_mbps=0.0,
|
||||
uptime_seconds=0,
|
||||
current_file="/recordings/port_{}.ts".format(i),
|
||||
codec=CodecType.H264,
|
||||
)
|
||||
for i in range(4)
|
||||
]
|
||||
|
||||
manager.get_all_status.return_value = port_statuses
|
||||
manager.get_status.return_value = port_statuses[0]
|
||||
manager.start_recording = AsyncMock()
|
||||
manager.stop_recording = AsyncMock()
|
||||
|
||||
return manager
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def mock_scte35_manager():
|
||||
"""Create a mock SCTE35Manager."""
|
||||
manager = MagicMock(spec=SCTE35Manager)
|
||||
manager.inject_marker = AsyncMock()
|
||||
manager.get_marker_history.return_value = []
|
||||
return manager
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def inject_managers(mock_recorder_manager, mock_scte35_manager):
|
||||
"""Inject mocked managers into routes module."""
|
||||
routes_module.recorder_manager = mock_recorder_manager
|
||||
routes_module.scte35_manager = mock_scte35_manager
|
||||
yield
|
||||
# Clean up
|
||||
routes_module.recorder_manager = None
|
||||
routes_module.scte35_manager = None
|
||||
|
||||
|
||||
def test_health_endpoint_returns_ok(client):
|
||||
"""GET /api/health returns 200 with status=ok"""
|
||||
response = client.get("/api/health")
|
||||
assert response.status_code == 200
|
||||
data = response.json()
|
||||
assert data["status"] == "ok"
|
||||
assert "ports" in data
|
||||
assert "recording_count" in data
|
||||
|
||||
|
||||
def test_get_all_ports_returns_list(client, mock_recorder_manager):
|
||||
"""GET /api/ports returns list of port statuses"""
|
||||
response = client.get("/api/ports")
|
||||
assert response.status_code == 200
|
||||
data = response.json()
|
||||
assert isinstance(data, list)
|
||||
assert len(data) == 4
|
||||
assert all("port_index" in port for port in data)
|
||||
assert all("is_recording" in port for port in data)
|
||||
|
||||
|
||||
def test_get_port_by_index(client, mock_recorder_manager):
|
||||
"""GET /api/ports/0 returns single port status"""
|
||||
response = client.get("/api/ports/0")
|
||||
assert response.status_code == 200
|
||||
data = response.json()
|
||||
assert data["port_index"] == 0
|
||||
assert "is_recording" in data
|
||||
|
||||
|
||||
def test_get_port_not_found(client, mock_recorder_manager):
|
||||
"""GET /api/ports/999 returns 404"""
|
||||
mock_recorder_manager.get_status.side_effect = ValueError("Invalid port")
|
||||
response = client.get("/api/ports/999")
|
||||
assert response.status_code == 404
|
||||
|
||||
|
||||
def test_start_recording(client, mock_recorder_manager):
|
||||
"""POST /api/ports/0/start returns 200 and starts recording"""
|
||||
config = {
|
||||
"port_index": 0,
|
||||
"codec": "h264",
|
||||
"bitrate": 50,
|
||||
"quality_profile": "high",
|
||||
"recording_path": "/recordings/test.ts",
|
||||
"srt_enabled": False,
|
||||
}
|
||||
|
||||
response = client.post("/api/ports/0/start", json=config)
|
||||
assert response.status_code == 200
|
||||
data = response.json()
|
||||
assert data["message"] == "Recording started"
|
||||
assert data["port"] == 0
|
||||
|
||||
# Verify start_recording was called
|
||||
mock_recorder_manager.start_recording.assert_called_once()
|
||||
|
||||
|
||||
def test_start_recording_already_running(client, mock_recorder_manager):
|
||||
"""POST /api/ports/0/start returns 400 if already recording"""
|
||||
mock_recorder_manager.start_recording.side_effect = RuntimeError("Already recording")
|
||||
|
||||
config = {
|
||||
"port_index": 0,
|
||||
"codec": "h264",
|
||||
"bitrate": 50,
|
||||
"quality_profile": "high",
|
||||
"recording_path": "/recordings/test.ts",
|
||||
"srt_enabled": False,
|
||||
}
|
||||
|
||||
response = client.post("/api/ports/0/start", json=config)
|
||||
assert response.status_code == 400
|
||||
|
||||
|
||||
def test_start_recording_invalid_port(client, mock_recorder_manager):
|
||||
"""POST /api/ports/999/start returns 404"""
|
||||
mock_recorder_manager.start_recording.side_effect = ValueError("Invalid port")
|
||||
|
||||
config = {
|
||||
"port_index": 999,
|
||||
"codec": "h264",
|
||||
"bitrate": 50,
|
||||
"quality_profile": "high",
|
||||
"recording_path": "/recordings/test.ts",
|
||||
"srt_enabled": False,
|
||||
}
|
||||
|
||||
response = client.post("/api/ports/999/start", json=config)
|
||||
assert response.status_code == 404
|
||||
|
||||
|
||||
def test_stop_recording(client, mock_recorder_manager):
|
||||
"""POST /api/ports/0/stop returns 200 and stops recording"""
|
||||
response = client.post("/api/ports/0/stop")
|
||||
assert response.status_code == 200
|
||||
data = response.json()
|
||||
assert data["message"] == "Recording stopped"
|
||||
assert data["port"] == 0
|
||||
|
||||
# Verify stop_recording was called
|
||||
mock_recorder_manager.stop_recording.assert_called_once()
|
||||
|
||||
|
||||
def test_stop_recording_invalid_port(client, mock_recorder_manager):
|
||||
"""POST /api/ports/999/stop returns 404"""
|
||||
mock_recorder_manager.stop_recording.side_effect = ValueError("Invalid port")
|
||||
response = client.post("/api/ports/999/stop")
|
||||
assert response.status_code == 404
|
||||
|
||||
|
||||
def test_inject_scte35_marker(client, mock_scte35_manager):
|
||||
"""POST /api/scte35/inject returns SCTE35Marker"""
|
||||
marker_response = SCTE35Marker(
|
||||
event_id="12345",
|
||||
duration_seconds=30,
|
||||
out_of_network=True,
|
||||
splice_immediate=False,
|
||||
timestamp=datetime.utcnow(),
|
||||
webhook_url=None,
|
||||
)
|
||||
mock_scte35_manager.inject_marker.return_value = marker_response
|
||||
|
||||
payload = {
|
||||
"event_id": 12345,
|
||||
"duration_seconds": 30.0,
|
||||
"webhook_url": None,
|
||||
"out_of_network": True,
|
||||
"splice_immediate": False,
|
||||
}
|
||||
|
||||
response = client.post("/api/scte35/inject", json=payload)
|
||||
assert response.status_code == 200
|
||||
data = response.json()
|
||||
assert data["event_id"] == "12345"
|
||||
assert data["duration_seconds"] == 30
|
||||
|
||||
# Verify inject_marker was called
|
||||
mock_scte35_manager.inject_marker.assert_called_once()
|
||||
|
||||
|
||||
def test_inject_scte35_with_invalid_event_id(client, mock_scte35_manager):
|
||||
"""POST /api/scte35/inject returns 400 for invalid event_id"""
|
||||
mock_scte35_manager.inject_marker.side_effect = ValueError("Invalid event_id")
|
||||
|
||||
payload = {
|
||||
"event_id": -1,
|
||||
"duration_seconds": 30.0,
|
||||
"webhook_url": None,
|
||||
"out_of_network": True,
|
||||
"splice_immediate": False,
|
||||
}
|
||||
|
||||
response = client.post("/api/scte35/inject", json=payload)
|
||||
assert response.status_code == 400
|
||||
|
||||
|
||||
def test_get_scte35_history(client, mock_scte35_manager):
|
||||
"""GET /api/scte35/history returns list of markers"""
|
||||
marker = SCTE35Marker(
|
||||
event_id="12345",
|
||||
duration_seconds=30,
|
||||
out_of_network=True,
|
||||
splice_immediate=False,
|
||||
timestamp=datetime.utcnow(),
|
||||
webhook_url=None,
|
||||
)
|
||||
mock_scte35_manager.get_marker_history.return_value = [marker]
|
||||
|
||||
response = client.get("/api/scte35/history")
|
||||
assert response.status_code == 200
|
||||
data = response.json()
|
||||
assert isinstance(data, list)
|
||||
assert len(data) == 1
|
||||
assert data[0]["event_id"] == "12345"
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# WebSocket Manager Tests
|
||||
# ============================================================================
|
||||
|
||||
from fastapi import WebSocket
|
||||
from backend.app.api.websocket import ConnectionManager
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_websocket_connection_manager_init():
|
||||
"""ConnectionManager initializes with empty connections list"""
|
||||
manager = ConnectionManager()
|
||||
assert manager.active_connections == []
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_websocket_connect():
|
||||
"""ConnectionManager can connect a WebSocket"""
|
||||
manager = ConnectionManager()
|
||||
|
||||
# Mock WebSocket
|
||||
ws = AsyncMock(spec=WebSocket)
|
||||
ws.accept = AsyncMock()
|
||||
|
||||
await manager.connect(ws)
|
||||
assert ws in manager.active_connections
|
||||
ws.accept.assert_called_once()
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_websocket_disconnect():
|
||||
"""ConnectionManager can disconnect a WebSocket"""
|
||||
manager = ConnectionManager()
|
||||
|
||||
ws = AsyncMock(spec=WebSocket)
|
||||
ws.accept = AsyncMock()
|
||||
|
||||
await manager.connect(ws)
|
||||
assert ws in manager.active_connections
|
||||
|
||||
manager.disconnect(ws)
|
||||
assert ws not in manager.active_connections
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_websocket_broadcast():
|
||||
"""ConnectionManager broadcasts to all connections"""
|
||||
manager = ConnectionManager()
|
||||
|
||||
ws1 = AsyncMock(spec=WebSocket)
|
||||
ws2 = AsyncMock(spec=WebSocket)
|
||||
ws1.accept = AsyncMock()
|
||||
ws2.accept = AsyncMock()
|
||||
ws1.send_json = AsyncMock()
|
||||
ws2.send_json = AsyncMock()
|
||||
|
||||
await manager.connect(ws1)
|
||||
await manager.connect(ws2)
|
||||
|
||||
data = {"status": "ok", "ports": 4}
|
||||
await manager.broadcast(data)
|
||||
|
||||
ws1.send_json.assert_called_once_with(data)
|
||||
ws2.send_json.assert_called_once_with(data)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_websocket_broadcast_removes_dead_connections():
|
||||
"""ConnectionManager removes connections that fail to send"""
|
||||
manager = ConnectionManager()
|
||||
|
||||
ws_good = AsyncMock(spec=WebSocket)
|
||||
ws_bad = AsyncMock(spec=WebSocket)
|
||||
ws_good.accept = AsyncMock()
|
||||
ws_bad.accept = AsyncMock()
|
||||
ws_good.send_json = AsyncMock()
|
||||
ws_bad.send_json = AsyncMock(side_effect=Exception("Connection closed"))
|
||||
|
||||
await manager.connect(ws_good)
|
||||
await manager.connect(ws_bad)
|
||||
|
||||
assert len(manager.active_connections) == 2
|
||||
|
||||
data = {"status": "ok"}
|
||||
await manager.broadcast(data)
|
||||
|
||||
# Dead connection should be removed
|
||||
assert len(manager.active_connections) == 1
|
||||
assert ws_good in manager.active_connections
|
||||
assert ws_bad not in manager.active_connections
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_websocket_send_personal_message():
|
||||
"""ConnectionManager can send personal message"""
|
||||
manager = ConnectionManager()
|
||||
|
||||
ws = AsyncMock(spec=WebSocket)
|
||||
ws.send_text = AsyncMock()
|
||||
|
||||
await manager.send_personal_message("Hello", ws)
|
||||
ws.send_text.assert_called_once_with("Hello")
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_websocket_send_personal_message_removes_bad_connection():
|
||||
"""ConnectionManager removes connection on failed personal message"""
|
||||
manager = ConnectionManager()
|
||||
|
||||
ws = AsyncMock(spec=WebSocket)
|
||||
ws.send_text = AsyncMock(side_effect=Exception("Connection closed"))
|
||||
|
||||
await manager.connect(ws)
|
||||
assert ws in manager.active_connections
|
||||
|
||||
await manager.send_personal_message("Hello", ws)
|
||||
|
||||
# Should be removed after failed send
|
||||
assert ws not in manager.active_connections
|
||||
Loading…
Reference in a new issue