"""Tests for FFmpeg recorder functionality."""

import asyncio
from unittest.mock import Mock, AsyncMock, MagicMock, patch

import pytest

from backend.app.config import Settings
from backend.app.models import RecorderConfig, CodecType, PortStatus
from backend.app.utils.ffmpeg import FFmpegCommandBuilder
from backend.app.recorders.recorder import PortRecorder, RecorderManager


# Fixtures


@pytest.fixture
def settings():
    """Provide Settings with four Deltacast ports and SRT switched on."""
    test_settings = Settings(
        ffmpeg_path="/usr/bin/ffmpeg",
        recording_dir="/recordings",
        deltacast_port_count=4,
        srt_enabled=True,
        srt_latency=5000,
    )
    return test_settings


@pytest.fixture
def command_builder(settings):
    """Provide an FFmpegCommandBuilder built from the test settings."""
    builder = FFmpegCommandBuilder(settings)
    return builder


@pytest.fixture
def prores_config():
    """Provide a ProRes config for port 0 using the "hq" profile, SRT off."""
    config = RecorderConfig(
        port_index=0,
        codec=CodecType.PRORES,
        bitrate=0,
        quality_profile="hq",
        recording_path="/recordings/prores_{timestamp}.mxf",
        srt_enabled=False,
    )
    return config


@pytest.fixture
def dnxhd_config():
    """Provide a DNxHD config for port 1 with bitrate 185, SRT off."""
    config = RecorderConfig(
        port_index=1,
        codec=CodecType.DNXHD,
        bitrate=185,
        quality_profile="mq",
        recording_path="/recordings/dnxhd_{timestamp}.mxf",
        srt_enabled=False,
    )
    return config


@pytest.fixture
def h264_config():
    """Provide an H264 config for port 2 with bitrate 50, SRT off."""
    config = RecorderConfig(
        port_index=2,
        codec=CodecType.H264,
        bitrate=50,
        quality_profile="lq",
        recording_path="/recordings/h264_{timestamp}.mxf",
        srt_enabled=False,
    )
    return config


@pytest.fixture
def uncompressed_config():
    """Provide an uncompressed config for port 3, SRT off."""
    config = RecorderConfig(
        port_index=3,
        codec=CodecType.UNCOMPRESSED,
        bitrate=0,
        quality_profile="hq",
        recording_path="/recordings/uncompressed_{timestamp}.mxf",
        srt_enabled=False,
    )
    return config


@pytest.fixture
def srt_config():
    """Provide an H264 config for port 0 that also streams to an SRT URL."""
    config = RecorderConfig(
        port_index=0,
        codec=CodecType.H264,
        bitrate=50,
        quality_profile="mq",
        recording_path="/recordings/srt_test_{timestamp}.mxf",
        srt_enabled=True,
        srt_destination="srt://localhost:9001",
    )
    return config


#
# FFmpegCommandBuilder Tests


class TestFFmpegCommandBuilder:
    """Checks on the argument list produced by FFmpegCommandBuilder."""

    def test_build_command_prores(self, command_builder, prores_config):
        """The ProRes command names the binary, input, prores_ks and MXF."""
        command = command_builder.build_command(prores_config)

        # Check for required components
        expected_args = (
            "/usr/bin/ffmpeg",
            "-f",
            "deltacast",
            "-i",
            "deltacast://0",
            "-c:v",
            "prores_ks",
            "-profile:v",
            "3",  # High quality profile
            "-f",
            "mxf",
        )
        for expected in expected_args:
            assert expected in command

    def test_build_command_dnxhd(self, command_builder, dnxhd_config):
        """The DNxHD command selects dnxhd and passes the bitrate as 185M."""
        command = command_builder.build_command(dnxhd_config)

        for expected in ("-c:v", "dnxhd", "-b:v", "185M"):
            assert expected in command

    def test_build_command_h264(self, command_builder, h264_config):
        """The H264 command selects libx264 and passes the bitrate as 50M."""
        command = command_builder.build_command(h264_config)

        for expected in ("-c:v", "libx264", "-b:v", "50M"):
            assert expected in command

    def test_build_command_uncompressed(self, command_builder, uncompressed_config):
        """The uncompressed command selects rawvideo with uyvy422 pixels."""
        command = command_builder.build_command(uncompressed_config)

        for expected in ("-c:v", "rawvideo", "-pix_fmt", "uyvy422"):
            assert expected in command

    def test_build_command_with_srt(self, command_builder, srt_config):
        """With SRT enabled the destination URL follows the mpegts format."""
        command = command_builder.build_command(srt_config)

        # Should have SRT output
        destination = "srt://localhost:9001"
        assert destination in command
        srt_index = command.index(destination)

        # Check that -f mpegts appears before SRT destination
        assert command.index("mpegts") < srt_index

    def test_build_command_without_srt(self, command_builder, h264_config):
        """With SRT disabled the command has no mpegts output."""
        command = command_builder.build_command(h264_config)

        # Should not contain mpegts (which is for SRT output)
        assert "mpegts" not in command

    def test_deltacast_input_format(self, command_builder, h264_config):
        """The input is addressed as deltacast://<port index>."""
        command = command_builder.build_command(h264_config)

        for expected in ("deltacast://2", "-f", "deltacast"):
            assert expected in command

    def test_timestamp_substitution(self, command_builder, prores_config):
        """The {timestamp} placeholder is gone from the output path."""
        command = command_builder.build_command(prores_config)

        # The output file is the argument right after the "mxf" format name
        output_file = command[command.index("mxf") + 1]

        # Should not contain {timestamp} anymore
        assert "{timestamp}" not in output_file
        # The path must still carry the .mxf text
        assert ".mxf" in output_file

    def test_prores_quality_profiles(self, command_builder, settings):
        """Each ProRes quality profile yields its expected command value."""
        # Test each quality profile
        expected_by_profile = (("hq", "3"), ("mq", "2"), ("lq", "0"))

        for profile_name, expected_value in expected_by_profile:
            config = RecorderConfig(
                port_index=0,
                codec=CodecType.PRORES,
                bitrate=0,
                quality_profile=profile_name,
                recording_path="/recordings/test.mxf",
            )
            assert expected_value in command_builder.build_command(config)


# PortRecorder Tests


class TestPortRecorder:
    """Checks on the state handling of a single PortRecorder."""

    def test_initialization(self, settings):
        """A new PortRecorder keeps its port index and has no active state."""
        recorder = PortRecorder(0, settings)

        assert recorder.port_index == 0
        assert recorder._process is None
        assert recorder._config is None
        assert recorder._start_time is None
        assert recorder._frame_count == 0
        assert recorder._current_file == ""

    def test_get_status_idle(self, settings):
        """An idle recorder reports a zeroed, non-recording PortStatus."""
        status = PortRecorder(0, settings).get_status()

        assert isinstance(status, PortStatus)
        assert status.port_index == 0
        assert status.is_recording is False
        assert status.frame_count == 0
        assert status.fps == 0.0
        assert status.uptime_seconds == 0

    @pytest.mark.asyncio
    async def test_start_recording(self, settings, h264_config):
        """start() spawns the subprocess with pipes and records its state."""
        recorder = PortRecorder(2, settings)

        # Mock the subprocess creation
        fake_stdin = AsyncMock()
        fake_stdin.is_closing.return_value = False
        mock_process = AsyncMock(pid=12345, stdin=fake_stdin, stderr=AsyncMock())
        mock_process.done.return_value = False

        with patch(
            "asyncio.create_subprocess_exec", return_value=mock_process
        ) as mock_create:
            await recorder.start(h264_config)

            # Verify subprocess was created
            mock_create.assert_called_once()
            spawn_kwargs = mock_create.call_args.kwargs
            assert spawn_kwargs["stdin"] == asyncio.subprocess.PIPE
            assert spawn_kwargs["stderr"] == asyncio.subprocess.PIPE

            # Verify recording state
            assert recorder._process is not None
            assert recorder._config == h264_config
            assert recorder._start_time is not None

    @pytest.mark.asyncio
    async def test_start_already_running(self, settings, h264_config):
        """start() raises RuntimeError when a process is already attached."""
        recorder = PortRecorder(2, settings)
        recorder._process = Mock()  # Simulate already running

        with pytest.raises(RuntimeError, match="already recording"):
            await recorder.start(h264_config)

    @pytest.mark.asyncio
    async def test_stop_recording(self, settings, h264_config):
        """stop() writes the quit key to stdin and clears the process state."""
        recorder = PortRecorder(2, settings)

        # Mock the subprocess with proper async mocks
        mock_stdin = MagicMock(
            is_closing=Mock(return_value=False),
            write=Mock(),
            drain=AsyncMock(),
        )
        mock_process = MagicMock(
            pid=12345,
            done=Mock(return_value=False),
            stdin=mock_stdin,
            wait=AsyncMock(return_value=None),
            terminate=Mock(),
            kill=Mock(),
            stderr=AsyncMock(),
        )

        recorder._process = mock_process
        recorder._start_time = 1000.0  # Mock start time

        # Create a mock task that's done
        recorder._monitor_task = MagicMock(done=Mock(return_value=True))

        await recorder.stop()

        # Verify stdin quit was attempted
        mock_stdin.write.assert_called_once_with(b"q\n")

        # Verify process was set to None
        assert recorder._process is None
        assert recorder._start_time is None

    @pytest.mark.asyncio
    async def test_stop_when_idle(self, settings):
        """stop() on a recorder with no process does not raise."""
        recorder = PortRecorder(0, settings)

        # Should not raise
        await recorder.stop()

        assert recorder._process is None


# RecorderManager Tests


class TestRecorderManager:
    """Checks on RecorderManager's per-port bookkeeping and validation."""

    def test_initialize(self, settings):
        """initialize() creates one PortRecorder per configured port."""
        manager = RecorderManager(settings)
        manager.initialize()

        recorders = manager._recorders
        assert len(recorders) == 4
        assert all(isinstance(r, PortRecorder) for r in recorders.values())
        assert all(recorders[i].port_index == i for i in range(4))

    def test_get_status_idle(self, settings):
        """get_status() on an idle port reports it as not recording."""
        manager = RecorderManager(settings)
        manager.initialize()

        status = manager.get_status(0)

        assert isinstance(status, PortStatus)
        assert status.port_index == 0
        assert status.is_recording is False

    def test_get_all_status(self, settings):
        """get_all_status() returns one PortStatus per port, in port order."""
        manager = RecorderManager(settings)
        manager.initialize()

        statuses = manager.get_all_status()

        assert len(statuses) == 4
        assert all(isinstance(s, PortStatus) for s in statuses)
        assert [s.port_index for s in statuses] == [0, 1, 2, 3]

    @pytest.mark.asyncio
    async def test_start_recording(self, settings, h264_config):
        """start_recording() on a valid port attaches a process to it."""
        manager = RecorderManager(settings)
        manager.initialize()

        # Mock subprocess creation with proper setup
        mock_stdin = AsyncMock()
        mock_stdin.is_closing.return_value = False
        mock_stdin.drain = AsyncMock()

        mock_process = AsyncMock(pid=12345, stdin=mock_stdin, stderr=AsyncMock())
        mock_process.done.return_value = False

        with patch("asyncio.create_subprocess_exec", return_value=mock_process):
            await manager.start_recording(2, h264_config)

            # Verify recording started by checking the recorder has a process
            assert manager._recorders[2]._process is not None

    @pytest.mark.asyncio
    async def test_start_recording_invalid_port(self, settings, h264_config):
        """start_recording() rejects an unknown port with ValueError."""
        manager = RecorderManager(settings)
        manager.initialize()

        with pytest.raises(ValueError, match="Invalid port index"):
            await manager.start_recording(99, h264_config)

    @pytest.mark.asyncio
    async def test_stop_recording(self, settings, h264_config):
        """stop_recording() leaves the port reporting is_recording False."""
        manager = RecorderManager(settings)
        manager.initialize()

        # First start recording
        mock_stdin = MagicMock(
            is_closing=Mock(return_value=False),
            write=Mock(),
            drain=AsyncMock(),
        )
        mock_process = MagicMock(
            pid=12345,
            done=Mock(return_value=False),
            stdin=mock_stdin,
            wait=AsyncMock(return_value=None),
            stderr=AsyncMock(),
        )

        # Create a mock task that's done
        mock_task = MagicMock(done=Mock(return_value=True))

        with patch("asyncio.create_subprocess_exec", return_value=mock_process):
            await manager.start_recording(2, h264_config)

            # Replace monitor task with completed mock
            manager._recorders[2]._monitor_task = mock_task

            # Then stop
            await manager.stop_recording(2)

            # Verify recording stopped
            status = manager.get_status(2)
            assert status.is_recording is False

    @pytest.mark.asyncio
    async def test_stop_recording_invalid_port(self, settings):
        """stop_recording() rejects an unknown port with ValueError."""
        manager = RecorderManager(settings)
        manager.initialize()

        with pytest.raises(ValueError, match="Invalid port index"):
            await manager.stop_recording(99)

    def test_get_status_invalid_port(self, settings):
        """get_status() rejects an unknown port with ValueError."""
        manager = RecorderManager(settings)
        manager.initialize()

        with pytest.raises(ValueError, match="Invalid port index"):
            manager.get_status(99)