Add backend/tests/test_main.py
This commit is contained in:
parent
9dd83b2f9a
commit
9c75b84836
1 changed files with 286 additions and 0 deletions
286
backend/tests/test_main.py
Normal file
286
backend/tests/test_main.py
Normal file
|
|
@ -0,0 +1,286 @@
|
|||
"""Integration tests for the main FastAPI application."""
|
||||
|
||||
import asyncio
|
||||
import json
|
||||
import pytest
|
||||
from unittest.mock import MagicMock, AsyncMock, patch, Mock
|
||||
from fastapi.testclient import TestClient
|
||||
from datetime import datetime
|
||||
|
||||
# Import after mocking
|
||||
from app.models import PortStatus, CodecType
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def mock_recorder_manager():
|
||||
"""Create a mock RecorderManager."""
|
||||
manager = MagicMock()
|
||||
|
||||
# Mock get_all_status to return sample port statuses (synchronous)
|
||||
mock_status = PortStatus(
|
||||
port_index=0,
|
||||
is_recording=False,
|
||||
frame_count=0,
|
||||
fps=29.97,
|
||||
bitrate_mbps=50.0,
|
||||
uptime_seconds=0,
|
||||
current_file="/recordings/port0.mov",
|
||||
codec=CodecType.PRORES
|
||||
)
|
||||
manager.get_all_status.return_value = [mock_status]
|
||||
manager.stop_recording = AsyncMock()
|
||||
|
||||
return manager
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def mock_scte35_manager():
|
||||
"""Create a mock SCTE35Manager."""
|
||||
manager = MagicMock()
|
||||
return manager
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def mock_hls_manager():
|
||||
"""Create a mock HLSPreviewManager."""
|
||||
manager = MagicMock()
|
||||
manager.stop_preview = AsyncMock()
|
||||
return manager
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def mock_settings():
|
||||
"""Create mock Settings."""
|
||||
settings = MagicMock()
|
||||
settings.deltacast_port_count = 4
|
||||
settings.ffmpeg_path = "/usr/bin/ffmpeg"
|
||||
settings.recording_dir = "/recordings"
|
||||
return settings
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def test_app(mock_recorder_manager, mock_scte35_manager, mock_hls_manager, mock_settings):
|
||||
"""Create a test FastAPI app with mocked managers."""
|
||||
|
||||
# Patch the lifespan and create app without running startup
|
||||
with patch('app.main.RecorderManager', return_value=mock_recorder_manager), \
|
||||
patch('app.main.SCTE35Manager', return_value=mock_scte35_manager), \
|
||||
patch('app.main.HLSPreviewManager', return_value=mock_hls_manager), \
|
||||
patch('app.main.settings', mock_settings):
|
||||
|
||||
from app.main import app
|
||||
|
||||
# Manually inject the mocked managers for routes
|
||||
from app.api import routes as routes_module
|
||||
routes_module.recorder_manager = mock_recorder_manager
|
||||
routes_module.scte35_manager = mock_scte35_manager
|
||||
|
||||
return app
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def client(test_app):
|
||||
"""Create a TestClient for the test app."""
|
||||
return TestClient(test_app)
|
||||
|
||||
|
||||
class TestAppInitialization:
|
||||
"""Tests for app initialization and configuration."""
|
||||
|
||||
def test_app_has_health_endpoint(self, client):
|
||||
"""App responds to GET /api/health"""
|
||||
response = client.get("/api/health")
|
||||
assert response.status_code == 200
|
||||
data = response.json()
|
||||
assert data["status"] == "ok"
|
||||
assert "ports" in data
|
||||
assert "recording_count" in data
|
||||
|
||||
def test_app_has_ports_endpoint(self, client):
|
||||
"""App responds to GET /api/ports"""
|
||||
response = client.get("/api/ports")
|
||||
assert response.status_code == 200
|
||||
data = response.json()
|
||||
assert isinstance(data, list)
|
||||
assert len(data) > 0
|
||||
assert "port_index" in data[0]
|
||||
|
||||
def test_cors_middleware_enabled(self, test_app):
|
||||
"""CORS middleware is configured in the app"""
|
||||
# Check that CORSMiddleware is in the middleware stack
|
||||
from fastapi.middleware.cors import CORSMiddleware
|
||||
|
||||
has_cors = any(
|
||||
isinstance(middleware.cls, type)
|
||||
and issubclass(middleware.cls, CORSMiddleware)
|
||||
for middleware in test_app.user_middleware
|
||||
)
|
||||
assert has_cors
|
||||
|
||||
def test_hls_endpoint_404_for_missing_file(self, client):
|
||||
"""GET /hls/nonexistent.m3u8 returns 404"""
|
||||
response = client.get("/hls/nonexistent.m3u8")
|
||||
assert response.status_code == 404
|
||||
assert "not found" in response.json()["detail"].lower()
|
||||
|
||||
|
||||
class TestBroadcastPortStatus:
|
||||
"""Tests for broadcast_port_status background task."""
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_broadcast_port_status_sends_data(self, mock_recorder_manager, mock_settings):
|
||||
"""broadcast_port_status sends correct data structure"""
|
||||
|
||||
# Mock connection manager to capture broadcast
|
||||
broadcast_called = asyncio.Event()
|
||||
broadcast_data = {}
|
||||
|
||||
async def mock_broadcast(data):
|
||||
broadcast_data.update(data)
|
||||
broadcast_called.set()
|
||||
|
||||
# Create the task
|
||||
from app.main import broadcast_port_status
|
||||
|
||||
with patch('app.main.recorder_manager', mock_recorder_manager), \
|
||||
patch('app.main.connection_manager') as mock_conn_manager:
|
||||
|
||||
mock_conn_manager.broadcast = mock_broadcast
|
||||
|
||||
# Run broadcast once
|
||||
task = asyncio.create_task(broadcast_port_status())
|
||||
|
||||
# Wait for broadcast
|
||||
try:
|
||||
await asyncio.wait_for(broadcast_called.wait(), timeout=2.0)
|
||||
except asyncio.TimeoutError:
|
||||
pytest.fail("Broadcast did not complete within timeout")
|
||||
finally:
|
||||
task.cancel()
|
||||
try:
|
||||
await task
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
|
||||
# Verify broadcast data structure
|
||||
assert "type" in broadcast_data
|
||||
assert broadcast_data["type"] == "port_status"
|
||||
assert "ports" in broadcast_data
|
||||
assert isinstance(broadcast_data["ports"], list)
|
||||
if broadcast_data["ports"]:
|
||||
assert "port_index" in broadcast_data["ports"][0]
|
||||
|
||||
|
||||
class TestWebSocketIntegration:
|
||||
"""Tests for WebSocket functionality."""
|
||||
|
||||
def test_websocket_endpoint_exists(self, client):
|
||||
"""WebSocket endpoint is defined in app routes"""
|
||||
from app.main import app
|
||||
|
||||
ws_routes = [
|
||||
route for route in app.routes
|
||||
if hasattr(route, 'path') and route.path == "/ws"
|
||||
]
|
||||
assert len(ws_routes) > 0
|
||||
|
||||
def test_connection_manager_initialization(self):
|
||||
"""ConnectionManager is initialized with empty connections"""
|
||||
from app.api.websocket import ConnectionManager
|
||||
|
||||
manager = ConnectionManager()
|
||||
assert manager.active_connections == []
|
||||
|
||||
|
||||
class TestHLSEndpoint:
|
||||
"""Tests for HLS file serving endpoint."""
|
||||
|
||||
def test_hls_endpoint_returns_404_for_missing_file(self, client):
|
||||
"""HLS endpoint returns 404 for non-existent file"""
|
||||
response = client.get("/hls/missing-segment.ts")
|
||||
assert response.status_code == 404
|
||||
|
||||
def test_hls_endpoint_handles_various_filenames(self, client):
|
||||
"""HLS endpoint correctly handles various filename patterns"""
|
||||
test_files = [
|
||||
"stream.m3u8",
|
||||
"segment-001.ts",
|
||||
"playlist_index.m3u8",
|
||||
"../../../etc/passwd", # Path traversal attempt
|
||||
]
|
||||
|
||||
for filename in test_files:
|
||||
response = client.get(f"/hls/{filename}")
|
||||
# Should all return 404 since files don't exist
|
||||
assert response.status_code == 404
|
||||
|
||||
|
||||
class TestAPIRouter:
|
||||
"""Tests to verify API router is properly included."""
|
||||
|
||||
def test_api_router_is_included(self, client):
|
||||
"""API router is included in app"""
|
||||
response = client.get("/api/health")
|
||||
assert response.status_code == 200
|
||||
|
||||
def test_api_prefix_is_correct(self, client):
|
||||
"""API routes have /api prefix"""
|
||||
response = client.get("/api/health")
|
||||
assert response.status_code == 200
|
||||
|
||||
# Routes without /api should not work
|
||||
response = client.get("/health")
|
||||
assert response.status_code == 404
|
||||
|
||||
|
||||
class TestErrorHandling:
|
||||
"""Tests for error handling in the application."""
|
||||
|
||||
def test_404_for_undefined_route(self, client):
|
||||
"""Undefined routes return 404"""
|
||||
response = client.get("/api/nonexistent")
|
||||
assert response.status_code == 404
|
||||
|
||||
def test_404_error_is_dict(self, client):
|
||||
"""404 errors return JSON response"""
|
||||
response = client.get("/api/nonexistent")
|
||||
assert isinstance(response.json(), dict)
|
||||
assert "detail" in response.json()
|
||||
|
||||
|
||||
class TestAppMetadata:
|
||||
"""Tests for app metadata and OpenAPI documentation."""
|
||||
|
||||
def test_app_has_title(self, client):
|
||||
"""App has correct title"""
|
||||
from app.main import app
|
||||
assert app.title == "Deltacast SDI Recorder"
|
||||
|
||||
def test_app_has_version(self, client):
|
||||
"""App has version information"""
|
||||
from app.main import app
|
||||
assert app.version == "1.0.0"
|
||||
|
||||
def test_openapi_schema_available(self, client):
|
||||
"""OpenAPI schema is available"""
|
||||
response = client.get("/openapi.json")
|
||||
assert response.status_code == 200
|
||||
schema = response.json()
|
||||
assert "openapi" in schema
|
||||
assert "paths" in schema
|
||||
|
||||
|
||||
class TestManagerInjection:
|
||||
"""Tests for proper manager injection into routes module."""
|
||||
|
||||
def test_managers_injected_into_routes(self, mock_recorder_manager, mock_scte35_manager):
|
||||
"""Managers are correctly injected into routes module"""
|
||||
from app.api import routes as routes_module
|
||||
|
||||
# Managers should be set by the fixture
|
||||
assert routes_module.recorder_manager is not None
|
||||
assert routes_module.scte35_manager is not None
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
pytest.main([__file__, "-v"])
|
||||
Loading…
Reference in a new issue