deltacast-sdi-recorder/backend/tests/test_recorder.py

441 lines
14 KiB
Python

"""Tests for FFmpeg recorder functionality."""
import pytest
import asyncio
from unittest.mock import Mock, AsyncMock, MagicMock, patch
from backend.app.config import Settings
from backend.app.models import RecorderConfig, CodecType, PortStatus
from backend.app.utils.ffmpeg import FFmpegCommandBuilder
from backend.app.recorders.recorder import PortRecorder, RecorderManager
# Fixtures
@pytest.fixture
def settings():
"""Create test settings."""
return Settings(
ffmpeg_path="/usr/bin/ffmpeg",
recording_dir="/recordings",
deltacast_port_count=4,
srt_enabled=True,
srt_latency=5000,
)
@pytest.fixture
def command_builder(settings):
"""Create FFmpeg command builder."""
return FFmpegCommandBuilder(settings)
@pytest.fixture
def prores_config():
"""Create a ProRes RecorderConfig."""
return RecorderConfig(
port_index=0,
codec=CodecType.PRORES,
bitrate=0,
quality_profile="hq",
recording_path="/recordings/prores_{timestamp}.mxf",
srt_enabled=False,
)
@pytest.fixture
def dnxhd_config():
"""Create a DNxHD RecorderConfig."""
return RecorderConfig(
port_index=1,
codec=CodecType.DNXHD,
bitrate=185,
quality_profile="mq",
recording_path="/recordings/dnxhd_{timestamp}.mxf",
srt_enabled=False,
)
@pytest.fixture
def h264_config():
"""Create an H264 RecorderConfig."""
return RecorderConfig(
port_index=2,
codec=CodecType.H264,
bitrate=50,
quality_profile="lq",
recording_path="/recordings/h264_{timestamp}.mxf",
srt_enabled=False,
)
@pytest.fixture
def uncompressed_config():
"""Create an uncompressed RecorderConfig."""
return RecorderConfig(
port_index=3,
codec=CodecType.UNCOMPRESSED,
bitrate=0,
quality_profile="hq",
recording_path="/recordings/uncompressed_{timestamp}.mxf",
srt_enabled=False,
)
@pytest.fixture
def srt_config():
"""Create a RecorderConfig with SRT enabled."""
return RecorderConfig(
port_index=0,
codec=CodecType.H264,
bitrate=50,
quality_profile="mq",
recording_path="/recordings/srt_test_{timestamp}.mxf",
srt_enabled=True,
srt_destination="srt://localhost:9001",
)
# FFmpegCommandBuilder Tests
class TestFFmpegCommandBuilder:
"""Test suite for FFmpegCommandBuilder."""
def test_build_command_prores(self, command_builder, prores_config):
"""ProRes command includes prores_ks codec and MXF output."""
command = command_builder.build_command(prores_config)
# Check for required components
assert "/usr/bin/ffmpeg" in command
assert "-f" in command
assert "deltacast" in command
assert "-i" in command
assert "deltacast://0" in command
assert "-c:v" in command
assert "prores_ks" in command
assert "-profile:v" in command
assert "3" in command # High quality profile
assert "-f" in command
assert "mxf" in command
def test_build_command_dnxhd(self, command_builder, dnxhd_config):
"""DNxHD command includes dnxhd codec with bitrate."""
command = command_builder.build_command(dnxhd_config)
assert "-c:v" in command
assert "dnxhd" in command
assert "-b:v" in command
assert "185M" in command
def test_build_command_h264(self, command_builder, h264_config):
"""H264 command includes libx264."""
command = command_builder.build_command(h264_config)
assert "-c:v" in command
assert "libx264" in command
assert "-b:v" in command
assert "50M" in command
def test_build_command_uncompressed(self, command_builder, uncompressed_config):
"""Uncompressed uses rawvideo."""
command = command_builder.build_command(uncompressed_config)
assert "-c:v" in command
assert "rawvideo" in command
assert "-pix_fmt" in command
assert "uyvy422" in command
def test_build_command_with_srt(self, command_builder, srt_config):
"""SRT-enabled config appends SRT output."""
command = command_builder.build_command(srt_config)
# Should have SRT output
srt_index = None
for i, arg in enumerate(command):
if arg == "srt://localhost:9001":
srt_index = i
break
assert srt_index is not None
# Check that -f mpegts appears before SRT destination
mpegts_index = command.index("mpegts")
assert mpegts_index < srt_index
def test_build_command_without_srt(self, command_builder, h264_config):
"""SRT-disabled config has no SRT output."""
command = command_builder.build_command(h264_config)
# Should not contain mpegts (which is for SRT output)
assert "mpegts" not in command
def test_deltacast_input_format(self, command_builder, h264_config):
"""Input uses deltacast:// scheme with port index."""
command = command_builder.build_command(h264_config)
assert "deltacast://2" in command
assert "-f" in command
assert "deltacast" in command
def test_timestamp_substitution(self, command_builder, prores_config):
"""Output path substitutes {timestamp} with datetime."""
command = command_builder.build_command(prores_config)
# Find the mxf output file
mxf_index = command.index("mxf") + 1
output_file = command[mxf_index]
# Should not contain {timestamp} anymore
assert "{timestamp}" not in output_file
# Should contain datetime format
assert ".mxf" in output_file
def test_prores_quality_profiles(self, command_builder, settings):
"""ProRes codec maps quality profiles correctly."""
# Test each quality profile
profiles = {
"hq": "3",
"mq": "2",
"lq": "0",
}
for profile_name, expected_value in profiles.items():
config = RecorderConfig(
port_index=0,
codec=CodecType.PRORES,
bitrate=0,
quality_profile=profile_name,
recording_path="/recordings/test.mxf",
)
command = command_builder.build_command(config)
assert expected_value in command
# PortRecorder Tests
class TestPortRecorder:
"""Test suite for PortRecorder."""
def test_initialization(self, settings):
"""PortRecorder initializes with correct state."""
recorder = PortRecorder(0, settings)
assert recorder.port_index == 0
assert recorder._process is None
assert recorder._config is None
assert recorder._start_time is None
assert recorder._frame_count == 0
assert recorder._current_file == ""
def test_get_status_idle(self, settings):
"""Idle recorder returns is_recording=False."""
recorder = PortRecorder(0, settings)
status = recorder.get_status()
assert isinstance(status, PortStatus)
assert status.port_index == 0
assert status.is_recording is False
assert status.frame_count == 0
assert status.fps == 0.0
assert status.uptime_seconds == 0
@pytest.mark.asyncio
async def test_start_recording(self, settings, h264_config):
"""Start sets recording state correctly."""
recorder = PortRecorder(2, settings)
# Mock the subprocess creation
mock_process = AsyncMock()
mock_process.pid = 12345
mock_process.done.return_value = False
mock_process.stdin = AsyncMock()
mock_process.stdin.is_closing.return_value = False
mock_process.stderr = AsyncMock()
with patch(
"asyncio.create_subprocess_exec",
return_value=mock_process
) as mock_create:
await recorder.start(h264_config)
# Verify subprocess was created
mock_create.assert_called_once()
args = mock_create.call_args
assert args[1]["stdin"] == asyncio.subprocess.PIPE
assert args[1]["stderr"] == asyncio.subprocess.PIPE
# Verify recording state
assert recorder._process is not None
assert recorder._config == h264_config
assert recorder._start_time is not None
@pytest.mark.asyncio
async def test_start_already_running(self, settings, h264_config):
"""Starting when already running raises RuntimeError."""
recorder = PortRecorder(2, settings)
recorder._process = Mock() # Simulate already running
with pytest.raises(RuntimeError, match="already recording"):
await recorder.start(h264_config)
@pytest.mark.asyncio
async def test_stop_recording(self, settings, h264_config):
"""Stop cleans up process correctly."""
recorder = PortRecorder(2, settings)
# Mock the subprocess with proper async mocks
mock_stdin = MagicMock()
mock_stdin.is_closing = Mock(return_value=False)
mock_stdin.write = Mock()
mock_stdin.drain = AsyncMock()
mock_process = MagicMock()
mock_process.pid = 12345
mock_process.done = Mock(return_value=False)
mock_process.stdin = mock_stdin
mock_process.wait = AsyncMock(return_value=None)
mock_process.terminate = Mock()
mock_process.kill = Mock()
mock_process.stderr = AsyncMock()
recorder._process = mock_process
recorder._start_time = 1000.0 # Mock start time
# Create a mock task that's done
mock_task = MagicMock()
mock_task.done = Mock(return_value=True)
recorder._monitor_task = mock_task
await recorder.stop()
# Verify stdin quit was attempted
mock_stdin.write.assert_called_once_with(b"q\n")
# Verify process was set to None
assert recorder._process is None
assert recorder._start_time is None
@pytest.mark.asyncio
async def test_stop_when_idle(self, settings):
"""Stopping an idle recorder is a no-op."""
recorder = PortRecorder(0, settings)
# Should not raise
await recorder.stop()
assert recorder._process is None
# RecorderManager Tests
class TestRecorderManager:
"""Test suite for RecorderManager."""
def test_initialize(self, settings):
"""Manager creates correct number of recorders."""
manager = RecorderManager(settings)
manager.initialize()
assert len(manager._recorders) == 4
assert all(isinstance(r, PortRecorder) for r in manager._recorders.values())
assert all(manager._recorders[i].port_index == i for i in range(4))
def test_get_status_idle(self, settings):
"""Get status for idle port returns idle status."""
manager = RecorderManager(settings)
manager.initialize()
status = manager.get_status(0)
assert isinstance(status, PortStatus)
assert status.port_index == 0
assert status.is_recording is False
def test_get_all_status(self, settings):
"""Get all status returns list for each port."""
manager = RecorderManager(settings)
manager.initialize()
statuses = manager.get_all_status()
assert len(statuses) == 4
assert all(isinstance(s, PortStatus) for s in statuses)
assert [s.port_index for s in statuses] == [0, 1, 2, 3]
@pytest.mark.asyncio
async def test_start_recording(self, settings, h264_config):
"""Start recording on a valid port."""
manager = RecorderManager(settings)
manager.initialize()
# Mock subprocess creation with proper setup
mock_stdin = AsyncMock()
mock_stdin.is_closing.return_value = False
mock_stdin.drain = AsyncMock()
mock_process = AsyncMock()
mock_process.pid = 12345
mock_process.done.return_value = False
mock_process.stdin = mock_stdin
mock_process.stderr = AsyncMock()
with patch("asyncio.create_subprocess_exec", return_value=mock_process):
await manager.start_recording(2, h264_config)
# Verify recording started by checking the recorder has a process
assert manager._recorders[2]._process is not None
@pytest.mark.asyncio
async def test_start_recording_invalid_port(self, settings, h264_config):
"""Starting on invalid port raises ValueError."""
manager = RecorderManager(settings)
manager.initialize()
with pytest.raises(ValueError, match="Invalid port index"):
await manager.start_recording(99, h264_config)
@pytest.mark.asyncio
async def test_stop_recording(self, settings, h264_config):
"""Stop recording on a port."""
manager = RecorderManager(settings)
manager.initialize()
# First start recording
mock_stdin = MagicMock()
mock_stdin.is_closing = Mock(return_value=False)
mock_stdin.write = Mock()
mock_stdin.drain = AsyncMock()
mock_process = MagicMock()
mock_process.pid = 12345
mock_process.done = Mock(return_value=False)
mock_process.stdin = mock_stdin
mock_process.wait = AsyncMock(return_value=None)
mock_process.stderr = AsyncMock()
# Create a mock task that's done
mock_task = MagicMock()
mock_task.done = Mock(return_value=True)
with patch("asyncio.create_subprocess_exec", return_value=mock_process):
await manager.start_recording(2, h264_config)
# Replace monitor task with completed mock
manager._recorders[2]._monitor_task = mock_task
# Then stop
await manager.stop_recording(2)
# Verify recording stopped
status = manager.get_status(2)
assert status.is_recording is False
@pytest.mark.asyncio
async def test_stop_recording_invalid_port(self, settings):
"""Stopping on invalid port raises ValueError."""
manager = RecorderManager(settings)
manager.initialize()
with pytest.raises(ValueError, match="Invalid port index"):
await manager.stop_recording(99)
def test_get_status_invalid_port(self, settings):
"""Getting status for invalid port raises ValueError."""
manager = RecorderManager(settings)
manager.initialize()
with pytest.raises(ValueError, match="Invalid port index"):
manager.get_status(99)