deltacast-sdi-recorder/backend/tests/test_srt_streamer.py

436 lines
14 KiB
Python

"""Tests for SRT streamer functionality."""
import pytest
import asyncio
from unittest.mock import Mock, AsyncMock, MagicMock, patch
from backend.app.config import Settings
from backend.app.models import RecorderConfig, CodecType
from backend.app.recorders.srt_streamer import SRTStreamer
# Fixtures
@pytest.fixture
def settings():
"""Create test settings."""
return Settings(
ffmpeg_path="/usr/bin/ffmpeg",
recording_dir="/recordings",
deltacast_port_count=4,
srt_enabled=True,
srt_latency=5000,
)
@pytest.fixture
def srt_config():
"""Create a RecorderConfig with SRT enabled."""
return RecorderConfig(
port_index=0,
codec=CodecType.H264,
bitrate=8,
quality_profile="mq",
recording_path="/recordings/test_{timestamp}.mxf",
srt_enabled=True,
srt_destination="srt://192.168.1.10:9001",
)
@pytest.fixture
def srt_config_with_latency():
"""Create a RecorderConfig with SRT and latency param already set."""
return RecorderConfig(
port_index=1,
codec=CodecType.H264,
bitrate=10,
quality_profile="lq",
recording_path="/recordings/test_{timestamp}.mxf",
srt_enabled=True,
srt_destination="srt://192.168.1.20:9002?latency=10000",
)
@pytest.fixture
def srt_disabled_config():
"""Create a RecorderConfig with SRT disabled."""
return RecorderConfig(
port_index=2,
codec=CodecType.H264,
bitrate=8,
quality_profile="mq",
recording_path="/recordings/test_{timestamp}.mxf",
srt_enabled=False,
srt_destination=None,
)
# Initial State Tests
class TestSRTStreamerInitialState:
"""Test suite for SRTStreamer initial state."""
def test_srt_streamer_initial_state(self, settings):
"""is_streaming is False initially."""
streamer = SRTStreamer(0, settings)
assert streamer.is_streaming is False
assert streamer._process is None
assert streamer._config is None
assert streamer.port_index == 0
# Command Building Tests
class TestSRTStreamerCommandBuilder:
"""Test suite for SRT command building."""
def test_build_srt_command_structure(self, settings, srt_config):
"""Command includes deltacast input and mpegts output."""
streamer = SRTStreamer(0, settings)
command = streamer._build_srt_command(srt_config)
# Check for required components
assert "/usr/bin/ffmpeg" in command
assert "-f" in command
assert "deltacast" in command
assert "-i" in command
assert "deltacast://0" in command
assert "mpegts" in command
def test_build_srt_command_includes_destination(self, settings, srt_config):
"""SRT destination URL is in command."""
streamer = SRTStreamer(0, settings)
command = streamer._build_srt_command(srt_config)
# SRT URL should be in command (possibly with latency param added)
assert any("srt://" in arg for arg in command)
assert any("192.168.1.10" in arg for arg in command)
def test_build_srt_command_h264_codec(self, settings, srt_config):
"""H264 codec appears in command."""
streamer = SRTStreamer(0, settings)
command = streamer._build_srt_command(srt_config)
assert "-c:v" in command
assert "libx264" in command
assert "-b:v" in command
assert "8M" in command
def test_build_srt_command_prores_codec(self, settings):
"""ProRes codec appears in command."""
config = RecorderConfig(
port_index=0,
codec=CodecType.PRORES,
bitrate=0,
quality_profile="hq",
recording_path="/recordings/test.mxf",
srt_enabled=True,
srt_destination="srt://localhost:9001",
)
streamer = SRTStreamer(0, settings)
command = streamer._build_srt_command(config)
assert "-c:v" in command
assert "prores_ks" in command
assert "-profile:v" in command
assert "3" in command
def test_build_srt_command_dnxhd_codec(self, settings):
"""DNxHD codec appears in command."""
config = RecorderConfig(
port_index=0,
codec=CodecType.DNXHD,
bitrate=185,
quality_profile="mq",
recording_path="/recordings/test.mxf",
srt_enabled=True,
srt_destination="srt://localhost:9001",
)
streamer = SRTStreamer(0, settings)
command = streamer._build_srt_command(config)
assert "-c:v" in command
assert "dnxhd" in command
assert "-b:v" in command
assert "185M" in command
def test_build_srt_command_uncompressed_codec(self, settings):
"""Uncompressed codec appears in command."""
config = RecorderConfig(
port_index=0,
codec=CodecType.UNCOMPRESSED,
bitrate=0,
quality_profile="hq",
recording_path="/recordings/test.mxf",
srt_enabled=True,
srt_destination="srt://localhost:9001",
)
streamer = SRTStreamer(0, settings)
command = streamer._build_srt_command(config)
assert "-c:v" in command
assert "rawvideo" in command
assert "-pix_fmt" in command
assert "uyvy422" in command
# Latency Parameter Tests
class TestSRTLatencyParameter:
"""Test suite for SRT latency parameter handling."""
def test_add_latency_to_srt_url_without_existing_params(self, settings, srt_config):
"""Latency is added to URL without existing params."""
streamer = SRTStreamer(0, settings)
url = streamer._add_latency_to_srt_url("srt://localhost:9001")
assert "latency=5000" in url
assert "srt://localhost:9001" in url
def test_add_latency_to_srt_url_with_existing_params(self, settings, srt_config_with_latency):
"""Existing latency param is overridden."""
streamer = SRTStreamer(0, settings)
url = streamer._add_latency_to_srt_url("srt://localhost:9001?latency=10000")
assert "latency=5000" in url
assert "10000" not in url
def test_add_latency_to_srt_url_preserves_other_params(self, settings):
"""Other URL parameters are preserved."""
streamer = SRTStreamer(0, settings)
url = streamer._add_latency_to_srt_url("srt://localhost:9001?mode=listener&timeout=5000")
assert "latency=5000" in url
assert "mode=listener" in url
assert "timeout=5000" in url
def test_srt_command_includes_latency(self, settings, srt_config):
"""Built command includes latency parameter."""
streamer = SRTStreamer(0, settings)
command = streamer._build_srt_command(srt_config)
# Find the SRT destination in the command
srt_arg = None
for arg in command:
if arg.startswith("srt://"):
srt_arg = arg
break
assert srt_arg is not None
assert "latency=5000" in srt_arg
# Start Stream Tests
class TestStartStream:
"""Test suite for start_stream method."""
@pytest.mark.asyncio
async def test_start_stream_raises_when_srt_disabled(self, settings, srt_disabled_config):
"""ValueError raised when config.srt_enabled is False."""
streamer = SRTStreamer(0, settings)
with pytest.raises(ValueError, match="SRT is not enabled"):
await streamer.start_stream(srt_disabled_config)
@pytest.mark.asyncio
async def test_start_stream_raises_when_destination_empty(self, settings):
"""ValueError raised when srt_destination is not set."""
config = RecorderConfig(
port_index=0,
codec=CodecType.H264,
bitrate=8,
quality_profile="mq",
recording_path="/recordings/test.mxf",
srt_enabled=True,
srt_destination=None,
)
streamer = SRTStreamer(0, settings)
with pytest.raises(ValueError, match="SRT destination is not set"):
await streamer.start_stream(config)
@pytest.mark.asyncio
async def test_start_stream_raises_when_already_streaming(self, settings, srt_config):
"""RuntimeError raised when already streaming."""
streamer = SRTStreamer(0, settings)
streamer._is_streaming = True
with pytest.raises(RuntimeError, match="already streaming SRT"):
await streamer.start_stream(srt_config)
@pytest.mark.asyncio
async def test_start_stream_sets_streaming_state(self, settings, srt_config):
"""is_streaming becomes True after start_stream (mocked subprocess)."""
streamer = SRTStreamer(0, settings)
# Mock the subprocess creation
mock_process = AsyncMock()
mock_process.pid = 12345
mock_process.stdin = AsyncMock()
mock_process.stderr = AsyncMock()
mock_process.stdout = AsyncMock()
with patch("asyncio.create_subprocess_exec", return_value=mock_process):
await streamer.start_stream(srt_config)
assert streamer.is_streaming is True
assert streamer._process is not None
assert streamer._config == srt_config
@pytest.mark.asyncio
async def test_start_stream_spawns_subprocess(self, settings, srt_config):
"""Subprocess is created with correct arguments."""
streamer = SRTStreamer(0, settings)
mock_process = AsyncMock()
mock_process.pid = 12345
mock_process.stdin = AsyncMock()
mock_process.stderr = AsyncMock()
mock_process.stdout = AsyncMock()
with patch("asyncio.create_subprocess_exec", return_value=mock_process) as mock_create:
await streamer.start_stream(srt_config)
# Verify subprocess was created with stdin and stderr pipes
mock_create.assert_called_once()
args = mock_create.call_args
assert args[1]["stdin"] == asyncio.subprocess.PIPE
assert args[1]["stderr"] == asyncio.subprocess.PIPE
assert args[1]["stdout"] == asyncio.subprocess.PIPE
# Stop Stream Tests
class TestStopStream:
"""Test suite for stop_stream method."""
@pytest.mark.asyncio
async def test_stop_stream_noop_when_not_streaming(self, settings):
"""stop_stream does nothing if not streaming."""
streamer = SRTStreamer(0, settings)
assert streamer.is_streaming is False
# Should not raise
await streamer.stop_stream()
assert streamer.is_streaming is False
@pytest.mark.asyncio
async def test_stop_stream_clears_streaming_state(self, settings, srt_config):
"""is_streaming becomes False after stop_stream."""
streamer = SRTStreamer(0, settings)
# Setup: start the stream
mock_process = AsyncMock()
mock_process.pid = 12345
mock_process.stdin = AsyncMock()
mock_process.stdin.is_closing.return_value = False
mock_process.stderr = AsyncMock()
mock_process.stdout = AsyncMock()
with patch("asyncio.create_subprocess_exec", return_value=mock_process):
await streamer.start_stream(srt_config)
assert streamer.is_streaming is True
# Teardown: stop the stream with graceful shutdown
mock_process.wait = AsyncMock(return_value=None)
await streamer.stop_stream()
assert streamer.is_streaming is False
assert streamer._process is None
@pytest.mark.asyncio
async def test_stop_stream_sends_quit_command(self, settings, srt_config):
"""stop_stream sends 'q' to stdin."""
streamer = SRTStreamer(0, settings)
# Setup mock process
mock_stdin = MagicMock()
mock_stdin.is_closing = Mock(return_value=False)
mock_stdin.write = Mock()
mock_stdin.drain = AsyncMock()
mock_process = MagicMock()
mock_process.pid = 12345
mock_process.stdin = mock_stdin
mock_process.stderr = AsyncMock()
mock_process.stdout = AsyncMock()
mock_process.wait = AsyncMock(return_value=None)
streamer._process = mock_process
streamer._is_streaming = True
await streamer.stop_stream()
# Verify quit command was sent
mock_stdin.write.assert_called_once_with(b"q\n")
@pytest.mark.asyncio
async def test_stop_stream_escalates_to_sigterm(self, settings, srt_config):
"""stop_stream sends SIGTERM if graceful quit fails."""
streamer = SRTStreamer(0, settings)
# Setup mock process that doesn't exit gracefully
mock_stdin = MagicMock()
mock_stdin.is_closing = Mock(return_value=False)
mock_stdin.write = Mock()
mock_stdin.drain = AsyncMock()
mock_process = MagicMock()
mock_process.pid = 12345
mock_process.stdin = mock_stdin
mock_process.stderr = AsyncMock()
mock_process.stdout = AsyncMock()
# First wait times out, second succeeds
mock_process.wait = AsyncMock(side_effect=[
asyncio.TimeoutError(),
None
])
mock_process.terminate = Mock()
streamer._process = mock_process
streamer._is_streaming = True
await streamer.stop_stream()
# Verify SIGTERM was sent
mock_process.terminate.assert_called_once()
assert streamer.is_streaming is False
@pytest.mark.asyncio
async def test_stop_stream_escalates_to_sigkill(self, settings, srt_config):
"""stop_stream sends SIGKILL if SIGTERM fails."""
streamer = SRTStreamer(0, settings)
# Setup mock process that doesn't exit with SIGTERM
mock_stdin = MagicMock()
mock_stdin.is_closing = Mock(return_value=False)
mock_stdin.write = Mock()
mock_stdin.drain = AsyncMock()
mock_process = MagicMock()
mock_process.pid = 12345
mock_process.stdin = mock_stdin
mock_process.stderr = AsyncMock()
mock_process.stdout = AsyncMock()
# Both waits timeout
mock_process.wait = AsyncMock(side_effect=[
asyncio.TimeoutError(),
asyncio.TimeoutError(),
None # Wait succeeds after kill
])
mock_process.terminate = Mock()
mock_process.kill = Mock()
streamer._process = mock_process
streamer._is_streaming = True
await streamer.stop_stream()
# Verify SIGKILL was sent
mock_process.kill.assert_called_once()
assert streamer.is_streaming is False