Add backend/tests/test_srt_streamer.py

This commit is contained in:
Zac Gaetano 2026-04-14 09:21:13 -04:00
parent 9fd23fabb6
commit 3934f23ccd

View file

@ -0,0 +1,436 @@
"""Tests for SRT streamer functionality."""
import pytest
import asyncio
from unittest.mock import Mock, AsyncMock, MagicMock, patch
from backend.app.config import Settings
from backend.app.models import RecorderConfig, CodecType
from backend.app.recorders.srt_streamer import SRTStreamer
# Fixtures
@pytest.fixture
def settings():
"""Create test settings."""
return Settings(
ffmpeg_path="/usr/bin/ffmpeg",
recording_dir="/recordings",
deltacast_port_count=4,
srt_enabled=True,
srt_latency=5000,
)
@pytest.fixture
def srt_config():
"""Create a RecorderConfig with SRT enabled."""
return RecorderConfig(
port_index=0,
codec=CodecType.H264,
bitrate=8,
quality_profile="mq",
recording_path="/recordings/test_{timestamp}.mxf",
srt_enabled=True,
srt_destination="srt://192.168.1.10:9001",
)
@pytest.fixture
def srt_config_with_latency():
"""Create a RecorderConfig with SRT and latency param already set."""
return RecorderConfig(
port_index=1,
codec=CodecType.H264,
bitrate=10,
quality_profile="lq",
recording_path="/recordings/test_{timestamp}.mxf",
srt_enabled=True,
srt_destination="srt://192.168.1.20:9002?latency=10000",
)
@pytest.fixture
def srt_disabled_config():
"""Create a RecorderConfig with SRT disabled."""
return RecorderConfig(
port_index=2,
codec=CodecType.H264,
bitrate=8,
quality_profile="mq",
recording_path="/recordings/test_{timestamp}.mxf",
srt_enabled=False,
srt_destination=None,
)
# Initial State Tests
class TestSRTStreamerInitialState:
"""Test suite for SRTStreamer initial state."""
def test_srt_streamer_initial_state(self, settings):
"""is_streaming is False initially."""
streamer = SRTStreamer(0, settings)
assert streamer.is_streaming is False
assert streamer._process is None
assert streamer._config is None
assert streamer.port_index == 0
# Command Building Tests
class TestSRTStreamerCommandBuilder:
"""Test suite for SRT command building."""
def test_build_srt_command_structure(self, settings, srt_config):
"""Command includes deltacast input and mpegts output."""
streamer = SRTStreamer(0, settings)
command = streamer._build_srt_command(srt_config)
# Check for required components
assert "/usr/bin/ffmpeg" in command
assert "-f" in command
assert "deltacast" in command
assert "-i" in command
assert "deltacast://0" in command
assert "mpegts" in command
def test_build_srt_command_includes_destination(self, settings, srt_config):
"""SRT destination URL is in command."""
streamer = SRTStreamer(0, settings)
command = streamer._build_srt_command(srt_config)
# SRT URL should be in command (possibly with latency param added)
assert any("srt://" in arg for arg in command)
assert any("192.168.1.10" in arg for arg in command)
def test_build_srt_command_h264_codec(self, settings, srt_config):
"""H264 codec appears in command."""
streamer = SRTStreamer(0, settings)
command = streamer._build_srt_command(srt_config)
assert "-c:v" in command
assert "libx264" in command
assert "-b:v" in command
assert "8M" in command
def test_build_srt_command_prores_codec(self, settings):
"""ProRes codec appears in command."""
config = RecorderConfig(
port_index=0,
codec=CodecType.PRORES,
bitrate=0,
quality_profile="hq",
recording_path="/recordings/test.mxf",
srt_enabled=True,
srt_destination="srt://localhost:9001",
)
streamer = SRTStreamer(0, settings)
command = streamer._build_srt_command(config)
assert "-c:v" in command
assert "prores_ks" in command
assert "-profile:v" in command
assert "3" in command
def test_build_srt_command_dnxhd_codec(self, settings):
"""DNxHD codec appears in command."""
config = RecorderConfig(
port_index=0,
codec=CodecType.DNXHD,
bitrate=185,
quality_profile="mq",
recording_path="/recordings/test.mxf",
srt_enabled=True,
srt_destination="srt://localhost:9001",
)
streamer = SRTStreamer(0, settings)
command = streamer._build_srt_command(config)
assert "-c:v" in command
assert "dnxhd" in command
assert "-b:v" in command
assert "185M" in command
def test_build_srt_command_uncompressed_codec(self, settings):
"""Uncompressed codec appears in command."""
config = RecorderConfig(
port_index=0,
codec=CodecType.UNCOMPRESSED,
bitrate=0,
quality_profile="hq",
recording_path="/recordings/test.mxf",
srt_enabled=True,
srt_destination="srt://localhost:9001",
)
streamer = SRTStreamer(0, settings)
command = streamer._build_srt_command(config)
assert "-c:v" in command
assert "rawvideo" in command
assert "-pix_fmt" in command
assert "uyvy422" in command
# Latency Parameter Tests
class TestSRTLatencyParameter:
"""Test suite for SRT latency parameter handling."""
def test_add_latency_to_srt_url_without_existing_params(self, settings, srt_config):
"""Latency is added to URL without existing params."""
streamer = SRTStreamer(0, settings)
url = streamer._add_latency_to_srt_url("srt://localhost:9001")
assert "latency=5000" in url
assert "srt://localhost:9001" in url
def test_add_latency_to_srt_url_with_existing_params(self, settings, srt_config_with_latency):
"""Existing latency param is overridden."""
streamer = SRTStreamer(0, settings)
url = streamer._add_latency_to_srt_url("srt://localhost:9001?latency=10000")
assert "latency=5000" in url
assert "10000" not in url
def test_add_latency_to_srt_url_preserves_other_params(self, settings):
"""Other URL parameters are preserved."""
streamer = SRTStreamer(0, settings)
url = streamer._add_latency_to_srt_url("srt://localhost:9001?mode=listener&timeout=5000")
assert "latency=5000" in url
assert "mode=listener" in url
assert "timeout=5000" in url
def test_srt_command_includes_latency(self, settings, srt_config):
"""Built command includes latency parameter."""
streamer = SRTStreamer(0, settings)
command = streamer._build_srt_command(srt_config)
# Find the SRT destination in the command
srt_arg = None
for arg in command:
if arg.startswith("srt://"):
srt_arg = arg
break
assert srt_arg is not None
assert "latency=5000" in srt_arg
# Start Stream Tests
class TestStartStream:
"""Test suite for start_stream method."""
@pytest.mark.asyncio
async def test_start_stream_raises_when_srt_disabled(self, settings, srt_disabled_config):
"""ValueError raised when config.srt_enabled is False."""
streamer = SRTStreamer(0, settings)
with pytest.raises(ValueError, match="SRT is not enabled"):
await streamer.start_stream(srt_disabled_config)
@pytest.mark.asyncio
async def test_start_stream_raises_when_destination_empty(self, settings):
"""ValueError raised when srt_destination is not set."""
config = RecorderConfig(
port_index=0,
codec=CodecType.H264,
bitrate=8,
quality_profile="mq",
recording_path="/recordings/test.mxf",
srt_enabled=True,
srt_destination=None,
)
streamer = SRTStreamer(0, settings)
with pytest.raises(ValueError, match="SRT destination is not set"):
await streamer.start_stream(config)
@pytest.mark.asyncio
async def test_start_stream_raises_when_already_streaming(self, settings, srt_config):
"""RuntimeError raised when already streaming."""
streamer = SRTStreamer(0, settings)
streamer._is_streaming = True
with pytest.raises(RuntimeError, match="already streaming SRT"):
await streamer.start_stream(srt_config)
@pytest.mark.asyncio
async def test_start_stream_sets_streaming_state(self, settings, srt_config):
"""is_streaming becomes True after start_stream (mocked subprocess)."""
streamer = SRTStreamer(0, settings)
# Mock the subprocess creation
mock_process = AsyncMock()
mock_process.pid = 12345
mock_process.stdin = AsyncMock()
mock_process.stderr = AsyncMock()
mock_process.stdout = AsyncMock()
with patch("asyncio.create_subprocess_exec", return_value=mock_process):
await streamer.start_stream(srt_config)
assert streamer.is_streaming is True
assert streamer._process is not None
assert streamer._config == srt_config
@pytest.mark.asyncio
async def test_start_stream_spawns_subprocess(self, settings, srt_config):
"""Subprocess is created with correct arguments."""
streamer = SRTStreamer(0, settings)
mock_process = AsyncMock()
mock_process.pid = 12345
mock_process.stdin = AsyncMock()
mock_process.stderr = AsyncMock()
mock_process.stdout = AsyncMock()
with patch("asyncio.create_subprocess_exec", return_value=mock_process) as mock_create:
await streamer.start_stream(srt_config)
# Verify subprocess was created with stdin and stderr pipes
mock_create.assert_called_once()
args = mock_create.call_args
assert args[1]["stdin"] == asyncio.subprocess.PIPE
assert args[1]["stderr"] == asyncio.subprocess.PIPE
assert args[1]["stdout"] == asyncio.subprocess.PIPE
# Stop Stream Tests
class TestStopStream:
"""Test suite for stop_stream method."""
@pytest.mark.asyncio
async def test_stop_stream_noop_when_not_streaming(self, settings):
"""stop_stream does nothing if not streaming."""
streamer = SRTStreamer(0, settings)
assert streamer.is_streaming is False
# Should not raise
await streamer.stop_stream()
assert streamer.is_streaming is False
@pytest.mark.asyncio
async def test_stop_stream_clears_streaming_state(self, settings, srt_config):
"""is_streaming becomes False after stop_stream."""
streamer = SRTStreamer(0, settings)
# Setup: start the stream
mock_process = AsyncMock()
mock_process.pid = 12345
mock_process.stdin = AsyncMock()
mock_process.stdin.is_closing.return_value = False
mock_process.stderr = AsyncMock()
mock_process.stdout = AsyncMock()
with patch("asyncio.create_subprocess_exec", return_value=mock_process):
await streamer.start_stream(srt_config)
assert streamer.is_streaming is True
# Teardown: stop the stream with graceful shutdown
mock_process.wait = AsyncMock(return_value=None)
await streamer.stop_stream()
assert streamer.is_streaming is False
assert streamer._process is None
@pytest.mark.asyncio
async def test_stop_stream_sends_quit_command(self, settings, srt_config):
"""stop_stream sends 'q' to stdin."""
streamer = SRTStreamer(0, settings)
# Setup mock process
mock_stdin = MagicMock()
mock_stdin.is_closing = Mock(return_value=False)
mock_stdin.write = Mock()
mock_stdin.drain = AsyncMock()
mock_process = MagicMock()
mock_process.pid = 12345
mock_process.stdin = mock_stdin
mock_process.stderr = AsyncMock()
mock_process.stdout = AsyncMock()
mock_process.wait = AsyncMock(return_value=None)
streamer._process = mock_process
streamer._is_streaming = True
await streamer.stop_stream()
# Verify quit command was sent
mock_stdin.write.assert_called_once_with(b"q\n")
@pytest.mark.asyncio
async def test_stop_stream_escalates_to_sigterm(self, settings, srt_config):
"""stop_stream sends SIGTERM if graceful quit fails."""
streamer = SRTStreamer(0, settings)
# Setup mock process that doesn't exit gracefully
mock_stdin = MagicMock()
mock_stdin.is_closing = Mock(return_value=False)
mock_stdin.write = Mock()
mock_stdin.drain = AsyncMock()
mock_process = MagicMock()
mock_process.pid = 12345
mock_process.stdin = mock_stdin
mock_process.stderr = AsyncMock()
mock_process.stdout = AsyncMock()
# First wait times out, second succeeds
mock_process.wait = AsyncMock(side_effect=[
asyncio.TimeoutError(),
None
])
mock_process.terminate = Mock()
streamer._process = mock_process
streamer._is_streaming = True
await streamer.stop_stream()
# Verify SIGTERM was sent
mock_process.terminate.assert_called_once()
assert streamer.is_streaming is False
@pytest.mark.asyncio
async def test_stop_stream_escalates_to_sigkill(self, settings, srt_config):
"""stop_stream sends SIGKILL if SIGTERM fails."""
streamer = SRTStreamer(0, settings)
# Setup mock process that doesn't exit with SIGTERM
mock_stdin = MagicMock()
mock_stdin.is_closing = Mock(return_value=False)
mock_stdin.write = Mock()
mock_stdin.drain = AsyncMock()
mock_process = MagicMock()
mock_process.pid = 12345
mock_process.stdin = mock_stdin
mock_process.stderr = AsyncMock()
mock_process.stdout = AsyncMock()
# Both waits timeout
mock_process.wait = AsyncMock(side_effect=[
asyncio.TimeoutError(),
asyncio.TimeoutError(),
None # Wait succeeds after kill
])
mock_process.terminate = Mock()
mock_process.kill = Mock()
streamer._process = mock_process
streamer._is_streaming = True
await streamer.stop_stream()
# Verify SIGKILL was sent
mock_process.kill.assert_called_once()
assert streamer.is_streaming is False