From 52157add93940e4021fe33327d8ebad918db058e Mon Sep 17 00:00:00 2001 From: Zac Gaetano Date: Tue, 14 Apr 2026 09:21:13 -0400 Subject: [PATCH] Add backend/tests/test_recorder.py --- backend/tests/test_recorder.py | 441 +++++++++++++++++++++++++++++++++ 1 file changed, 441 insertions(+) create mode 100644 backend/tests/test_recorder.py diff --git a/backend/tests/test_recorder.py b/backend/tests/test_recorder.py new file mode 100644 index 0000000..d861e6b --- /dev/null +++ b/backend/tests/test_recorder.py @@ -0,0 +1,441 @@ +"""Tests for FFmpeg recorder functionality.""" + +import pytest +import asyncio +from unittest.mock import Mock, AsyncMock, MagicMock, patch +from backend.app.config import Settings +from backend.app.models import RecorderConfig, CodecType, PortStatus +from backend.app.utils.ffmpeg import FFmpegCommandBuilder +from backend.app.recorders.recorder import PortRecorder, RecorderManager + + +# Fixtures +@pytest.fixture +def settings(): + """Create test settings.""" + return Settings( + ffmpeg_path="/usr/bin/ffmpeg", + recording_dir="/recordings", + deltacast_port_count=4, + srt_enabled=True, + srt_latency=5000, + ) + + +@pytest.fixture +def command_builder(settings): + """Create FFmpeg command builder.""" + return FFmpegCommandBuilder(settings) + + +@pytest.fixture +def prores_config(): + """Create a ProRes RecorderConfig.""" + return RecorderConfig( + port_index=0, + codec=CodecType.PRORES, + bitrate=0, + quality_profile="hq", + recording_path="/recordings/prores_{timestamp}.mxf", + srt_enabled=False, + ) + + +@pytest.fixture +def dnxhd_config(): + """Create a DNxHD RecorderConfig.""" + return RecorderConfig( + port_index=1, + codec=CodecType.DNXHD, + bitrate=185, + quality_profile="mq", + recording_path="/recordings/dnxhd_{timestamp}.mxf", + srt_enabled=False, + ) + + +@pytest.fixture +def h264_config(): + """Create an H264 RecorderConfig.""" + return RecorderConfig( + port_index=2, + codec=CodecType.H264, + bitrate=50, + quality_profile="lq", + recording_path="/recordings/h264_{timestamp}.mxf", + srt_enabled=False, + ) + + +@pytest.fixture +def uncompressed_config(): + """Create an uncompressed RecorderConfig.""" + return RecorderConfig( + port_index=3, + codec=CodecType.UNCOMPRESSED, + bitrate=0, + quality_profile="hq", + recording_path="/recordings/uncompressed_{timestamp}.mxf", + srt_enabled=False, + ) + + +@pytest.fixture +def srt_config(): + """Create a RecorderConfig with SRT enabled.""" + return RecorderConfig( + port_index=0, + codec=CodecType.H264, + bitrate=50, + quality_profile="mq", + recording_path="/recordings/srt_test_{timestamp}.mxf", + srt_enabled=True, + srt_destination="srt://localhost:9001", + ) + + +# FFmpegCommandBuilder Tests + +class TestFFmpegCommandBuilder: + """Test suite for FFmpegCommandBuilder.""" + + def test_build_command_prores(self, command_builder, prores_config): + """ProRes command includes prores_ks codec and MXF output.""" + command = command_builder.build_command(prores_config) + + # Check for required components + assert "/usr/bin/ffmpeg" in command + assert "-f" in command + assert "deltacast" in command + assert "-i" in command + assert "deltacast://0" in command + assert "-c:v" in command + assert "prores_ks" in command + assert "-profile:v" in command + assert "3" in command # High quality profile + assert "-f" in command + assert "mxf" in command + + def test_build_command_dnxhd(self, command_builder, dnxhd_config): + """DNxHD command includes dnxhd codec with bitrate.""" + command = command_builder.build_command(dnxhd_config) + + assert "-c:v" in command + assert "dnxhd" in command + assert "-b:v" in command + assert "185M" in command + + def test_build_command_h264(self, command_builder, h264_config): + """H264 command includes libx264.""" + command = command_builder.build_command(h264_config) + + assert "-c:v" in command + assert "libx264" in command + assert "-b:v" in command + assert "50M" in command + + def test_build_command_uncompressed(self, command_builder, uncompressed_config): + """Uncompressed uses rawvideo.""" + command = command_builder.build_command(uncompressed_config) + + assert "-c:v" in command + assert "rawvideo" in command + assert "-pix_fmt" in command + assert "uyvy422" in command + + def test_build_command_with_srt(self, command_builder, srt_config): + """SRT-enabled config appends SRT output.""" + command = command_builder.build_command(srt_config) + + # Should have SRT output + srt_index = None + for i, arg in enumerate(command): + if arg == "srt://localhost:9001": + srt_index = i + break + + assert srt_index is not None + # Check that -f mpegts appears before SRT destination + mpegts_index = command.index("mpegts") + assert mpegts_index < srt_index + + def test_build_command_without_srt(self, command_builder, h264_config): + """SRT-disabled config has no SRT output.""" + command = command_builder.build_command(h264_config) + + # Should not contain mpegts (which is for SRT output) + assert "mpegts" not in command + + def test_deltacast_input_format(self, command_builder, h264_config): + """Input uses deltacast:// scheme with port index.""" + command = command_builder.build_command(h264_config) + + assert "deltacast://2" in command + assert "-f" in command + assert "deltacast" in command + + def test_timestamp_substitution(self, command_builder, prores_config): + """Output path substitutes {timestamp} with datetime.""" + command = command_builder.build_command(prores_config) + + # Find the mxf output file + mxf_index = command.index("mxf") + 1 + output_file = command[mxf_index] + + # Should not contain {timestamp} anymore + assert "{timestamp}" not in output_file + # Should contain datetime format + assert ".mxf" in output_file + + def test_prores_quality_profiles(self, command_builder, settings): + """ProRes codec maps quality profiles correctly.""" + # Test each quality profile + profiles = { + "hq": "3", + "mq": "2", + "lq": "0", + } + + for profile_name, expected_value in profiles.items(): + config = RecorderConfig( + port_index=0, + codec=CodecType.PRORES, + bitrate=0, + quality_profile=profile_name, + recording_path="/recordings/test.mxf", + ) + command = command_builder.build_command(config) + assert expected_value in command + + +# PortRecorder Tests + +class TestPortRecorder: + """Test suite for PortRecorder.""" + + def test_initialization(self, settings): + """PortRecorder initializes with correct state.""" + recorder = PortRecorder(0, settings) + + assert recorder.port_index == 0 + assert recorder._process is None + assert recorder._config is None + assert recorder._start_time is None + assert recorder._frame_count == 0 + assert recorder._current_file == "" + + def test_get_status_idle(self, settings): + """Idle recorder returns is_recording=False.""" + recorder = PortRecorder(0, settings) + status = recorder.get_status() + + assert isinstance(status, PortStatus) + assert status.port_index == 0 + assert status.is_recording is False + assert status.frame_count == 0 + assert status.fps == 0.0 + assert status.uptime_seconds == 0 + + @pytest.mark.asyncio + async def test_start_recording(self, settings, h264_config): + """Start sets recording state correctly.""" + recorder = PortRecorder(2, settings) + + # Mock the subprocess creation + mock_process = AsyncMock() + mock_process.pid = 12345 + mock_process.done.return_value = False + mock_process.stdin = AsyncMock() + mock_process.stdin.is_closing.return_value = False + mock_process.stderr = AsyncMock() + + with patch( + "asyncio.create_subprocess_exec", + return_value=mock_process + ) as mock_create: + await recorder.start(h264_config) + + # Verify subprocess was created + mock_create.assert_called_once() + args = mock_create.call_args + assert args[1]["stdin"] == asyncio.subprocess.PIPE + assert args[1]["stderr"] == asyncio.subprocess.PIPE + + # Verify recording state + assert recorder._process is not None + assert recorder._config == h264_config + assert recorder._start_time is not None + + @pytest.mark.asyncio + async def test_start_already_running(self, settings, h264_config): + """Starting when already running raises RuntimeError.""" + recorder = PortRecorder(2, settings) + recorder._process = Mock() # Simulate already running + + with pytest.raises(RuntimeError, match="already recording"): + await recorder.start(h264_config) + + @pytest.mark.asyncio + async def test_stop_recording(self, settings, h264_config): + """Stop cleans up process correctly.""" + recorder = PortRecorder(2, settings) + + # Mock the subprocess with proper async mocks + mock_stdin = MagicMock() + mock_stdin.is_closing = Mock(return_value=False) + mock_stdin.write = Mock() + mock_stdin.drain = AsyncMock() + + mock_process = MagicMock() + mock_process.pid = 12345 + mock_process.done = Mock(return_value=False) + mock_process.stdin = mock_stdin + mock_process.wait = AsyncMock(return_value=None) + mock_process.terminate = Mock() + mock_process.kill = Mock() + mock_process.stderr = AsyncMock() + + recorder._process = mock_process + recorder._start_time = 1000.0 # Mock start time + + # Create a mock task that's done + mock_task = MagicMock() + mock_task.done = Mock(return_value=True) + recorder._monitor_task = mock_task + + await recorder.stop() + + # Verify stdin quit was attempted + mock_stdin.write.assert_called_once_with(b"q\n") + # Verify process was set to None + assert recorder._process is None + assert recorder._start_time is None + + @pytest.mark.asyncio + async def test_stop_when_idle(self, settings): + """Stopping an idle recorder is a no-op.""" + recorder = PortRecorder(0, settings) + # Should not raise + await recorder.stop() + assert recorder._process is None + + +# RecorderManager Tests + +class TestRecorderManager: + """Test suite for RecorderManager.""" + + def test_initialize(self, settings): + """Manager creates correct number of recorders.""" + manager = RecorderManager(settings) + manager.initialize() + + assert len(manager._recorders) == 4 + assert all(isinstance(r, PortRecorder) for r in manager._recorders.values()) + assert all(manager._recorders[i].port_index == i for i in range(4)) + + def test_get_status_idle(self, settings): + """Get status for idle port returns idle status.""" + manager = RecorderManager(settings) + manager.initialize() + + status = manager.get_status(0) + + assert isinstance(status, PortStatus) + assert status.port_index == 0 + assert status.is_recording is False + + def test_get_all_status(self, settings): + """Get all status returns list for each port.""" + manager = RecorderManager(settings) + manager.initialize() + + statuses = manager.get_all_status() + + assert len(statuses) == 4 + assert all(isinstance(s, PortStatus) for s in statuses) + assert [s.port_index for s in statuses] == [0, 1, 2, 3] + + @pytest.mark.asyncio + async def test_start_recording(self, settings, h264_config): + """Start recording on a valid port.""" + manager = RecorderManager(settings) + manager.initialize() + + # Mock subprocess creation with proper setup + mock_stdin = AsyncMock() + mock_stdin.is_closing.return_value = False + mock_stdin.drain = AsyncMock() + + mock_process = AsyncMock() + mock_process.pid = 12345 + mock_process.done.return_value = False + mock_process.stdin = mock_stdin + mock_process.stderr = AsyncMock() + + with patch("asyncio.create_subprocess_exec", return_value=mock_process): + await manager.start_recording(2, h264_config) + + # Verify recording started by checking the recorder has a process + assert manager._recorders[2]._process is not None + + @pytest.mark.asyncio + async def test_start_recording_invalid_port(self, settings, h264_config): + """Starting on invalid port raises ValueError.""" + manager = RecorderManager(settings) + manager.initialize() + + with pytest.raises(ValueError, match="Invalid port index"): + await manager.start_recording(99, h264_config) + + @pytest.mark.asyncio + async def test_stop_recording(self, settings, h264_config): + """Stop recording on a port.""" + manager = RecorderManager(settings) + manager.initialize() + + # First start recording + mock_stdin = MagicMock() + mock_stdin.is_closing = Mock(return_value=False) + mock_stdin.write = Mock() + mock_stdin.drain = AsyncMock() + + mock_process = MagicMock() + mock_process.pid = 12345 + mock_process.done = Mock(return_value=False) + mock_process.stdin = mock_stdin + mock_process.wait = AsyncMock(return_value=None) + mock_process.stderr = AsyncMock() + + # Create a mock task that's done + mock_task = MagicMock() + mock_task.done = Mock(return_value=True) + + with patch("asyncio.create_subprocess_exec", return_value=mock_process): + await manager.start_recording(2, h264_config) + # Replace monitor task with completed mock + manager._recorders[2]._monitor_task = mock_task + + # Then stop + await manager.stop_recording(2) + + # Verify recording stopped + status = manager.get_status(2) + assert status.is_recording is False + + @pytest.mark.asyncio + async def test_stop_recording_invalid_port(self, settings): + """Stopping on invalid port raises ValueError.""" + manager = RecorderManager(settings) + manager.initialize() + + with pytest.raises(ValueError, match="Invalid port index"): + await manager.stop_recording(99) + + def test_get_status_invalid_port(self, settings): + """Getting status for invalid port raises ValueError.""" + manager = RecorderManager(settings) + manager.initialize() + + with pytest.raises(ValueError, match="Invalid port index"): + manager.get_status(99)