From 3934f23ccd0b3b6e717a1ba0ec1b859d49ba0dcd Mon Sep 17 00:00:00 2001 From: Zac Gaetano Date: Tue, 14 Apr 2026 09:21:13 -0400 Subject: [PATCH] Add backend/tests/test_srt_streamer.py --- backend/tests/test_srt_streamer.py | 436 +++++++++++++++++++++++++++++ 1 file changed, 436 insertions(+) create mode 100644 backend/tests/test_srt_streamer.py diff --git a/backend/tests/test_srt_streamer.py b/backend/tests/test_srt_streamer.py new file mode 100644 index 0000000..6e45712 --- /dev/null +++ b/backend/tests/test_srt_streamer.py @@ -0,0 +1,436 @@ +"""Tests for SRT streamer functionality.""" + +import pytest +import asyncio +from unittest.mock import Mock, AsyncMock, MagicMock, patch +from backend.app.config import Settings +from backend.app.models import RecorderConfig, CodecType +from backend.app.recorders.srt_streamer import SRTStreamer + + +# Fixtures +@pytest.fixture +def settings(): + """Create test settings.""" + return Settings( + ffmpeg_path="/usr/bin/ffmpeg", + recording_dir="/recordings", + deltacast_port_count=4, + srt_enabled=True, + srt_latency=5000, + ) + + +@pytest.fixture +def srt_config(): + """Create a RecorderConfig with SRT enabled.""" + return RecorderConfig( + port_index=0, + codec=CodecType.H264, + bitrate=8, + quality_profile="mq", + recording_path="/recordings/test_{timestamp}.mxf", + srt_enabled=True, + srt_destination="srt://192.168.1.10:9001", + ) + + +@pytest.fixture +def srt_config_with_latency(): + """Create a RecorderConfig with SRT and latency param already set.""" + return RecorderConfig( + port_index=1, + codec=CodecType.H264, + bitrate=10, + quality_profile="lq", + recording_path="/recordings/test_{timestamp}.mxf", + srt_enabled=True, + srt_destination="srt://192.168.1.20:9002?latency=10000", + ) + + +@pytest.fixture +def srt_disabled_config(): + """Create a RecorderConfig with SRT disabled.""" + return RecorderConfig( + port_index=2, + codec=CodecType.H264, + bitrate=8, + quality_profile="mq", + recording_path="/recordings/test_{timestamp}.mxf", + srt_enabled=False, + srt_destination=None, + ) + + +# Initial State Tests + +class TestSRTStreamerInitialState: + """Test suite for SRTStreamer initial state.""" + + def test_srt_streamer_initial_state(self, settings): + """is_streaming is False initially.""" + streamer = SRTStreamer(0, settings) + + assert streamer.is_streaming is False + assert streamer._process is None + assert streamer._config is None + assert streamer.port_index == 0 + + +# Command Building Tests + +class TestSRTStreamerCommandBuilder: + """Test suite for SRT command building.""" + + def test_build_srt_command_structure(self, settings, srt_config): + """Command includes deltacast input and mpegts output.""" + streamer = SRTStreamer(0, settings) + command = streamer._build_srt_command(srt_config) + + # Check for required components + assert "/usr/bin/ffmpeg" in command + assert "-f" in command + assert "deltacast" in command + assert "-i" in command + assert "deltacast://0" in command + assert "mpegts" in command + + def test_build_srt_command_includes_destination(self, settings, srt_config): + """SRT destination URL is in command.""" + streamer = SRTStreamer(0, settings) + command = streamer._build_srt_command(srt_config) + + # SRT URL should be in command (possibly with latency param added) + assert any("srt://" in arg for arg in command) + assert any("192.168.1.10" in arg for arg in command) + + def test_build_srt_command_h264_codec(self, settings, srt_config): + """H264 codec appears in command.""" + streamer = SRTStreamer(0, settings) + command = streamer._build_srt_command(srt_config) + + assert "-c:v" in command + assert "libx264" in command + assert "-b:v" in command + assert "8M" in command + + def test_build_srt_command_prores_codec(self, settings): + """ProRes codec appears in command.""" + config = RecorderConfig( + port_index=0, + codec=CodecType.PRORES, + bitrate=0, + quality_profile="hq", + recording_path="/recordings/test.mxf", + srt_enabled=True, + srt_destination="srt://localhost:9001", + ) + + streamer = SRTStreamer(0, settings) + command = streamer._build_srt_command(config) + + assert "-c:v" in command + assert "prores_ks" in command + assert "-profile:v" in command + assert "3" in command + + def test_build_srt_command_dnxhd_codec(self, settings): + """DNxHD codec appears in command.""" + config = RecorderConfig( + port_index=0, + codec=CodecType.DNXHD, + bitrate=185, + quality_profile="mq", + recording_path="/recordings/test.mxf", + srt_enabled=True, + srt_destination="srt://localhost:9001", + ) + + streamer = SRTStreamer(0, settings) + command = streamer._build_srt_command(config) + + assert "-c:v" in command + assert "dnxhd" in command + assert "-b:v" in command + assert "185M" in command + + def test_build_srt_command_uncompressed_codec(self, settings): + """Uncompressed codec appears in command.""" + config = RecorderConfig( + port_index=0, + codec=CodecType.UNCOMPRESSED, + bitrate=0, + quality_profile="hq", + recording_path="/recordings/test.mxf", + srt_enabled=True, + srt_destination="srt://localhost:9001", + ) + + streamer = SRTStreamer(0, settings) + command = streamer._build_srt_command(config) + + assert "-c:v" in command + assert "rawvideo" in command + assert "-pix_fmt" in command + assert "uyvy422" in command + + +# Latency Parameter Tests + +class TestSRTLatencyParameter: + """Test suite for SRT latency parameter handling.""" + + def test_add_latency_to_srt_url_without_existing_params(self, settings, srt_config): + """Latency is added to URL without existing params.""" + streamer = SRTStreamer(0, settings) + url = streamer._add_latency_to_srt_url("srt://localhost:9001") + + assert "latency=5000" in url + assert "srt://localhost:9001" in url + + def test_add_latency_to_srt_url_with_existing_params(self, settings, srt_config_with_latency): + """Existing latency param is overridden.""" + streamer = SRTStreamer(0, settings) + url = streamer._add_latency_to_srt_url("srt://localhost:9001?latency=10000") + + assert "latency=5000" in url + assert "10000" not in url + + def test_add_latency_to_srt_url_preserves_other_params(self, settings): + """Other URL parameters are preserved.""" + streamer = SRTStreamer(0, settings) + url = streamer._add_latency_to_srt_url("srt://localhost:9001?mode=listener&timeout=5000") + + assert "latency=5000" in url + assert "mode=listener" in url + assert "timeout=5000" in url + + def test_srt_command_includes_latency(self, settings, srt_config): + """Built command includes latency parameter.""" + streamer = SRTStreamer(0, settings) + command = streamer._build_srt_command(srt_config) + + # Find the SRT destination in the command + srt_arg = None + for arg in command: + if arg.startswith("srt://"): + srt_arg = arg + break + + assert srt_arg is not None + assert "latency=5000" in srt_arg + + +# Start Stream Tests + +class TestStartStream: + """Test suite for start_stream method.""" + + @pytest.mark.asyncio + async def test_start_stream_raises_when_srt_disabled(self, settings, srt_disabled_config): + """ValueError raised when config.srt_enabled is False.""" + streamer = SRTStreamer(0, settings) + + with pytest.raises(ValueError, match="SRT is not enabled"): + await streamer.start_stream(srt_disabled_config) + + @pytest.mark.asyncio + async def test_start_stream_raises_when_destination_empty(self, settings): + """ValueError raised when srt_destination is not set.""" + config = RecorderConfig( + port_index=0, + codec=CodecType.H264, + bitrate=8, + quality_profile="mq", + recording_path="/recordings/test.mxf", + srt_enabled=True, + srt_destination=None, + ) + streamer = SRTStreamer(0, settings) + + with pytest.raises(ValueError, match="SRT destination is not set"): + await streamer.start_stream(config) + + @pytest.mark.asyncio + async def test_start_stream_raises_when_already_streaming(self, settings, srt_config): + """RuntimeError raised when already streaming.""" + streamer = SRTStreamer(0, settings) + streamer._is_streaming = True + + with pytest.raises(RuntimeError, match="already streaming SRT"): + await streamer.start_stream(srt_config) + + @pytest.mark.asyncio + async def test_start_stream_sets_streaming_state(self, settings, srt_config): + """is_streaming becomes True after start_stream (mocked subprocess).""" + streamer = SRTStreamer(0, settings) + + # Mock the subprocess creation + mock_process = AsyncMock() + mock_process.pid = 12345 + mock_process.stdin = AsyncMock() + mock_process.stderr = AsyncMock() + mock_process.stdout = AsyncMock() + + with patch("asyncio.create_subprocess_exec", return_value=mock_process): + await streamer.start_stream(srt_config) + + assert streamer.is_streaming is True + assert streamer._process is not None + assert streamer._config == srt_config + + @pytest.mark.asyncio + async def test_start_stream_spawns_subprocess(self, settings, srt_config): + """Subprocess is created with correct arguments.""" + streamer = SRTStreamer(0, settings) + + mock_process = AsyncMock() + mock_process.pid = 12345 + mock_process.stdin = AsyncMock() + mock_process.stderr = AsyncMock() + mock_process.stdout = AsyncMock() + + with patch("asyncio.create_subprocess_exec", return_value=mock_process) as mock_create: + await streamer.start_stream(srt_config) + + # Verify subprocess was created with stdin and stderr pipes + mock_create.assert_called_once() + args = mock_create.call_args + assert args[1]["stdin"] == asyncio.subprocess.PIPE + assert args[1]["stderr"] == asyncio.subprocess.PIPE + assert args[1]["stdout"] == asyncio.subprocess.PIPE + + +# Stop Stream Tests + +class TestStopStream: + """Test suite for stop_stream method.""" + + @pytest.mark.asyncio + async def test_stop_stream_noop_when_not_streaming(self, settings): + """stop_stream does nothing if not streaming.""" + streamer = SRTStreamer(0, settings) + assert streamer.is_streaming is False + + # Should not raise + await streamer.stop_stream() + assert streamer.is_streaming is False + + @pytest.mark.asyncio + async def test_stop_stream_clears_streaming_state(self, settings, srt_config): + """is_streaming becomes False after stop_stream.""" + streamer = SRTStreamer(0, settings) + + # Setup: start the stream + mock_process = AsyncMock() + mock_process.pid = 12345 + mock_process.stdin = AsyncMock() + mock_process.stdin.is_closing.return_value = False + mock_process.stderr = AsyncMock() + mock_process.stdout = AsyncMock() + + with patch("asyncio.create_subprocess_exec", return_value=mock_process): + await streamer.start_stream(srt_config) + assert streamer.is_streaming is True + + # Teardown: stop the stream with graceful shutdown + mock_process.wait = AsyncMock(return_value=None) + await streamer.stop_stream() + + assert streamer.is_streaming is False + assert streamer._process is None + + @pytest.mark.asyncio + async def test_stop_stream_sends_quit_command(self, settings, srt_config): + """stop_stream sends 'q' to stdin.""" + streamer = SRTStreamer(0, settings) + + # Setup mock process + mock_stdin = MagicMock() + mock_stdin.is_closing = Mock(return_value=False) + mock_stdin.write = Mock() + mock_stdin.drain = AsyncMock() + + mock_process = MagicMock() + mock_process.pid = 12345 + mock_process.stdin = mock_stdin + mock_process.stderr = AsyncMock() + mock_process.stdout = AsyncMock() + mock_process.wait = AsyncMock(return_value=None) + + streamer._process = mock_process + streamer._is_streaming = True + + await streamer.stop_stream() + + # Verify quit command was sent + mock_stdin.write.assert_called_once_with(b"q\n") + + @pytest.mark.asyncio + async def test_stop_stream_escalates_to_sigterm(self, settings, srt_config): + """stop_stream sends SIGTERM if graceful quit fails.""" + streamer = SRTStreamer(0, settings) + + # Setup mock process that doesn't exit gracefully + mock_stdin = MagicMock() + mock_stdin.is_closing = Mock(return_value=False) + mock_stdin.write = Mock() + mock_stdin.drain = AsyncMock() + + mock_process = MagicMock() + mock_process.pid = 12345 + mock_process.stdin = mock_stdin + mock_process.stderr = AsyncMock() + mock_process.stdout = AsyncMock() + + # First wait times out, second succeeds + mock_process.wait = AsyncMock(side_effect=[ + asyncio.TimeoutError(), + None + ]) + mock_process.terminate = Mock() + + streamer._process = mock_process + streamer._is_streaming = True + + await streamer.stop_stream() + + # Verify SIGTERM was sent + mock_process.terminate.assert_called_once() + assert streamer.is_streaming is False + + @pytest.mark.asyncio + async def test_stop_stream_escalates_to_sigkill(self, settings, srt_config): + """stop_stream sends SIGKILL if SIGTERM fails.""" + streamer = SRTStreamer(0, settings) + + # Setup mock process that doesn't exit with SIGTERM + mock_stdin = MagicMock() + mock_stdin.is_closing = Mock(return_value=False) + mock_stdin.write = Mock() + mock_stdin.drain = AsyncMock() + + mock_process = MagicMock() + mock_process.pid = 12345 + mock_process.stdin = mock_stdin + mock_process.stderr = AsyncMock() + mock_process.stdout = AsyncMock() + + # Both waits timeout + mock_process.wait = AsyncMock(side_effect=[ + asyncio.TimeoutError(), + asyncio.TimeoutError(), + None # Wait succeeds after kill + ]) + mock_process.terminate = Mock() + mock_process.kill = Mock() + + streamer._process = mock_process + streamer._is_streaming = True + + await streamer.stop_stream() + + # Verify SIGKILL was sent + mock_process.kill.assert_called_once() + assert streamer.is_streaming is False