Add backend/tests/test_recorder.py
This commit is contained in:
parent
3c85d14d19
commit
52157add93
1 changed files with 441 additions and 0 deletions
441
backend/tests/test_recorder.py
Normal file
441
backend/tests/test_recorder.py
Normal file
|
|
@ -0,0 +1,441 @@
|
|||
"""Tests for FFmpeg recorder functionality."""
|
||||
|
||||
import pytest
|
||||
import asyncio
|
||||
from unittest.mock import Mock, AsyncMock, MagicMock, patch
|
||||
from backend.app.config import Settings
|
||||
from backend.app.models import RecorderConfig, CodecType, PortStatus
|
||||
from backend.app.utils.ffmpeg import FFmpegCommandBuilder
|
||||
from backend.app.recorders.recorder import PortRecorder, RecorderManager
|
||||
|
||||
|
||||
# Fixtures
|
||||
@pytest.fixture
|
||||
def settings():
|
||||
"""Create test settings."""
|
||||
return Settings(
|
||||
ffmpeg_path="/usr/bin/ffmpeg",
|
||||
recording_dir="/recordings",
|
||||
deltacast_port_count=4,
|
||||
srt_enabled=True,
|
||||
srt_latency=5000,
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def command_builder(settings):
|
||||
"""Create FFmpeg command builder."""
|
||||
return FFmpegCommandBuilder(settings)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def prores_config():
|
||||
"""Create a ProRes RecorderConfig."""
|
||||
return RecorderConfig(
|
||||
port_index=0,
|
||||
codec=CodecType.PRORES,
|
||||
bitrate=0,
|
||||
quality_profile="hq",
|
||||
recording_path="/recordings/prores_{timestamp}.mxf",
|
||||
srt_enabled=False,
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def dnxhd_config():
|
||||
"""Create a DNxHD RecorderConfig."""
|
||||
return RecorderConfig(
|
||||
port_index=1,
|
||||
codec=CodecType.DNXHD,
|
||||
bitrate=185,
|
||||
quality_profile="mq",
|
||||
recording_path="/recordings/dnxhd_{timestamp}.mxf",
|
||||
srt_enabled=False,
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def h264_config():
|
||||
"""Create an H264 RecorderConfig."""
|
||||
return RecorderConfig(
|
||||
port_index=2,
|
||||
codec=CodecType.H264,
|
||||
bitrate=50,
|
||||
quality_profile="lq",
|
||||
recording_path="/recordings/h264_{timestamp}.mxf",
|
||||
srt_enabled=False,
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def uncompressed_config():
|
||||
"""Create an uncompressed RecorderConfig."""
|
||||
return RecorderConfig(
|
||||
port_index=3,
|
||||
codec=CodecType.UNCOMPRESSED,
|
||||
bitrate=0,
|
||||
quality_profile="hq",
|
||||
recording_path="/recordings/uncompressed_{timestamp}.mxf",
|
||||
srt_enabled=False,
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def srt_config():
|
||||
"""Create a RecorderConfig with SRT enabled."""
|
||||
return RecorderConfig(
|
||||
port_index=0,
|
||||
codec=CodecType.H264,
|
||||
bitrate=50,
|
||||
quality_profile="mq",
|
||||
recording_path="/recordings/srt_test_{timestamp}.mxf",
|
||||
srt_enabled=True,
|
||||
srt_destination="srt://localhost:9001",
|
||||
)
|
||||
|
||||
|
||||
# FFmpegCommandBuilder Tests
|
||||
|
||||
class TestFFmpegCommandBuilder:
|
||||
"""Test suite for FFmpegCommandBuilder."""
|
||||
|
||||
def test_build_command_prores(self, command_builder, prores_config):
|
||||
"""ProRes command includes prores_ks codec and MXF output."""
|
||||
command = command_builder.build_command(prores_config)
|
||||
|
||||
# Check for required components
|
||||
assert "/usr/bin/ffmpeg" in command
|
||||
assert "-f" in command
|
||||
assert "deltacast" in command
|
||||
assert "-i" in command
|
||||
assert "deltacast://0" in command
|
||||
assert "-c:v" in command
|
||||
assert "prores_ks" in command
|
||||
assert "-profile:v" in command
|
||||
assert "3" in command # High quality profile
|
||||
assert "-f" in command
|
||||
assert "mxf" in command
|
||||
|
||||
def test_build_command_dnxhd(self, command_builder, dnxhd_config):
|
||||
"""DNxHD command includes dnxhd codec with bitrate."""
|
||||
command = command_builder.build_command(dnxhd_config)
|
||||
|
||||
assert "-c:v" in command
|
||||
assert "dnxhd" in command
|
||||
assert "-b:v" in command
|
||||
assert "185M" in command
|
||||
|
||||
def test_build_command_h264(self, command_builder, h264_config):
|
||||
"""H264 command includes libx264."""
|
||||
command = command_builder.build_command(h264_config)
|
||||
|
||||
assert "-c:v" in command
|
||||
assert "libx264" in command
|
||||
assert "-b:v" in command
|
||||
assert "50M" in command
|
||||
|
||||
def test_build_command_uncompressed(self, command_builder, uncompressed_config):
|
||||
"""Uncompressed uses rawvideo."""
|
||||
command = command_builder.build_command(uncompressed_config)
|
||||
|
||||
assert "-c:v" in command
|
||||
assert "rawvideo" in command
|
||||
assert "-pix_fmt" in command
|
||||
assert "uyvy422" in command
|
||||
|
||||
def test_build_command_with_srt(self, command_builder, srt_config):
|
||||
"""SRT-enabled config appends SRT output."""
|
||||
command = command_builder.build_command(srt_config)
|
||||
|
||||
# Should have SRT output
|
||||
srt_index = None
|
||||
for i, arg in enumerate(command):
|
||||
if arg == "srt://localhost:9001":
|
||||
srt_index = i
|
||||
break
|
||||
|
||||
assert srt_index is not None
|
||||
# Check that -f mpegts appears before SRT destination
|
||||
mpegts_index = command.index("mpegts")
|
||||
assert mpegts_index < srt_index
|
||||
|
||||
def test_build_command_without_srt(self, command_builder, h264_config):
|
||||
"""SRT-disabled config has no SRT output."""
|
||||
command = command_builder.build_command(h264_config)
|
||||
|
||||
# Should not contain mpegts (which is for SRT output)
|
||||
assert "mpegts" not in command
|
||||
|
||||
def test_deltacast_input_format(self, command_builder, h264_config):
|
||||
"""Input uses deltacast:// scheme with port index."""
|
||||
command = command_builder.build_command(h264_config)
|
||||
|
||||
assert "deltacast://2" in command
|
||||
assert "-f" in command
|
||||
assert "deltacast" in command
|
||||
|
||||
def test_timestamp_substitution(self, command_builder, prores_config):
|
||||
"""Output path substitutes {timestamp} with datetime."""
|
||||
command = command_builder.build_command(prores_config)
|
||||
|
||||
# Find the mxf output file
|
||||
mxf_index = command.index("mxf") + 1
|
||||
output_file = command[mxf_index]
|
||||
|
||||
# Should not contain {timestamp} anymore
|
||||
assert "{timestamp}" not in output_file
|
||||
# Should contain datetime format
|
||||
assert ".mxf" in output_file
|
||||
|
||||
def test_prores_quality_profiles(self, command_builder, settings):
|
||||
"""ProRes codec maps quality profiles correctly."""
|
||||
# Test each quality profile
|
||||
profiles = {
|
||||
"hq": "3",
|
||||
"mq": "2",
|
||||
"lq": "0",
|
||||
}
|
||||
|
||||
for profile_name, expected_value in profiles.items():
|
||||
config = RecorderConfig(
|
||||
port_index=0,
|
||||
codec=CodecType.PRORES,
|
||||
bitrate=0,
|
||||
quality_profile=profile_name,
|
||||
recording_path="/recordings/test.mxf",
|
||||
)
|
||||
command = command_builder.build_command(config)
|
||||
assert expected_value in command
|
||||
|
||||
|
||||
# PortRecorder Tests
|
||||
|
||||
class TestPortRecorder:
|
||||
"""Test suite for PortRecorder."""
|
||||
|
||||
def test_initialization(self, settings):
|
||||
"""PortRecorder initializes with correct state."""
|
||||
recorder = PortRecorder(0, settings)
|
||||
|
||||
assert recorder.port_index == 0
|
||||
assert recorder._process is None
|
||||
assert recorder._config is None
|
||||
assert recorder._start_time is None
|
||||
assert recorder._frame_count == 0
|
||||
assert recorder._current_file == ""
|
||||
|
||||
def test_get_status_idle(self, settings):
|
||||
"""Idle recorder returns is_recording=False."""
|
||||
recorder = PortRecorder(0, settings)
|
||||
status = recorder.get_status()
|
||||
|
||||
assert isinstance(status, PortStatus)
|
||||
assert status.port_index == 0
|
||||
assert status.is_recording is False
|
||||
assert status.frame_count == 0
|
||||
assert status.fps == 0.0
|
||||
assert status.uptime_seconds == 0
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_start_recording(self, settings, h264_config):
|
||||
"""Start sets recording state correctly."""
|
||||
recorder = PortRecorder(2, settings)
|
||||
|
||||
# Mock the subprocess creation
|
||||
mock_process = AsyncMock()
|
||||
mock_process.pid = 12345
|
||||
mock_process.done.return_value = False
|
||||
mock_process.stdin = AsyncMock()
|
||||
mock_process.stdin.is_closing.return_value = False
|
||||
mock_process.stderr = AsyncMock()
|
||||
|
||||
with patch(
|
||||
"asyncio.create_subprocess_exec",
|
||||
return_value=mock_process
|
||||
) as mock_create:
|
||||
await recorder.start(h264_config)
|
||||
|
||||
# Verify subprocess was created
|
||||
mock_create.assert_called_once()
|
||||
args = mock_create.call_args
|
||||
assert args[1]["stdin"] == asyncio.subprocess.PIPE
|
||||
assert args[1]["stderr"] == asyncio.subprocess.PIPE
|
||||
|
||||
# Verify recording state
|
||||
assert recorder._process is not None
|
||||
assert recorder._config == h264_config
|
||||
assert recorder._start_time is not None
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_start_already_running(self, settings, h264_config):
|
||||
"""Starting when already running raises RuntimeError."""
|
||||
recorder = PortRecorder(2, settings)
|
||||
recorder._process = Mock() # Simulate already running
|
||||
|
||||
with pytest.raises(RuntimeError, match="already recording"):
|
||||
await recorder.start(h264_config)
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_stop_recording(self, settings, h264_config):
|
||||
"""Stop cleans up process correctly."""
|
||||
recorder = PortRecorder(2, settings)
|
||||
|
||||
# Mock the subprocess with proper async mocks
|
||||
mock_stdin = MagicMock()
|
||||
mock_stdin.is_closing = Mock(return_value=False)
|
||||
mock_stdin.write = Mock()
|
||||
mock_stdin.drain = AsyncMock()
|
||||
|
||||
mock_process = MagicMock()
|
||||
mock_process.pid = 12345
|
||||
mock_process.done = Mock(return_value=False)
|
||||
mock_process.stdin = mock_stdin
|
||||
mock_process.wait = AsyncMock(return_value=None)
|
||||
mock_process.terminate = Mock()
|
||||
mock_process.kill = Mock()
|
||||
mock_process.stderr = AsyncMock()
|
||||
|
||||
recorder._process = mock_process
|
||||
recorder._start_time = 1000.0 # Mock start time
|
||||
|
||||
# Create a mock task that's done
|
||||
mock_task = MagicMock()
|
||||
mock_task.done = Mock(return_value=True)
|
||||
recorder._monitor_task = mock_task
|
||||
|
||||
await recorder.stop()
|
||||
|
||||
# Verify stdin quit was attempted
|
||||
mock_stdin.write.assert_called_once_with(b"q\n")
|
||||
# Verify process was set to None
|
||||
assert recorder._process is None
|
||||
assert recorder._start_time is None
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_stop_when_idle(self, settings):
|
||||
"""Stopping an idle recorder is a no-op."""
|
||||
recorder = PortRecorder(0, settings)
|
||||
# Should not raise
|
||||
await recorder.stop()
|
||||
assert recorder._process is None
|
||||
|
||||
|
||||
# RecorderManager Tests
|
||||
|
||||
class TestRecorderManager:
|
||||
"""Test suite for RecorderManager."""
|
||||
|
||||
def test_initialize(self, settings):
|
||||
"""Manager creates correct number of recorders."""
|
||||
manager = RecorderManager(settings)
|
||||
manager.initialize()
|
||||
|
||||
assert len(manager._recorders) == 4
|
||||
assert all(isinstance(r, PortRecorder) for r in manager._recorders.values())
|
||||
assert all(manager._recorders[i].port_index == i for i in range(4))
|
||||
|
||||
def test_get_status_idle(self, settings):
|
||||
"""Get status for idle port returns idle status."""
|
||||
manager = RecorderManager(settings)
|
||||
manager.initialize()
|
||||
|
||||
status = manager.get_status(0)
|
||||
|
||||
assert isinstance(status, PortStatus)
|
||||
assert status.port_index == 0
|
||||
assert status.is_recording is False
|
||||
|
||||
def test_get_all_status(self, settings):
|
||||
"""Get all status returns list for each port."""
|
||||
manager = RecorderManager(settings)
|
||||
manager.initialize()
|
||||
|
||||
statuses = manager.get_all_status()
|
||||
|
||||
assert len(statuses) == 4
|
||||
assert all(isinstance(s, PortStatus) for s in statuses)
|
||||
assert [s.port_index for s in statuses] == [0, 1, 2, 3]
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_start_recording(self, settings, h264_config):
|
||||
"""Start recording on a valid port."""
|
||||
manager = RecorderManager(settings)
|
||||
manager.initialize()
|
||||
|
||||
# Mock subprocess creation with proper setup
|
||||
mock_stdin = AsyncMock()
|
||||
mock_stdin.is_closing.return_value = False
|
||||
mock_stdin.drain = AsyncMock()
|
||||
|
||||
mock_process = AsyncMock()
|
||||
mock_process.pid = 12345
|
||||
mock_process.done.return_value = False
|
||||
mock_process.stdin = mock_stdin
|
||||
mock_process.stderr = AsyncMock()
|
||||
|
||||
with patch("asyncio.create_subprocess_exec", return_value=mock_process):
|
||||
await manager.start_recording(2, h264_config)
|
||||
|
||||
# Verify recording started by checking the recorder has a process
|
||||
assert manager._recorders[2]._process is not None
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_start_recording_invalid_port(self, settings, h264_config):
|
||||
"""Starting on invalid port raises ValueError."""
|
||||
manager = RecorderManager(settings)
|
||||
manager.initialize()
|
||||
|
||||
with pytest.raises(ValueError, match="Invalid port index"):
|
||||
await manager.start_recording(99, h264_config)
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_stop_recording(self, settings, h264_config):
|
||||
"""Stop recording on a port."""
|
||||
manager = RecorderManager(settings)
|
||||
manager.initialize()
|
||||
|
||||
# First start recording
|
||||
mock_stdin = MagicMock()
|
||||
mock_stdin.is_closing = Mock(return_value=False)
|
||||
mock_stdin.write = Mock()
|
||||
mock_stdin.drain = AsyncMock()
|
||||
|
||||
mock_process = MagicMock()
|
||||
mock_process.pid = 12345
|
||||
mock_process.done = Mock(return_value=False)
|
||||
mock_process.stdin = mock_stdin
|
||||
mock_process.wait = AsyncMock(return_value=None)
|
||||
mock_process.stderr = AsyncMock()
|
||||
|
||||
# Create a mock task that's done
|
||||
mock_task = MagicMock()
|
||||
mock_task.done = Mock(return_value=True)
|
||||
|
||||
with patch("asyncio.create_subprocess_exec", return_value=mock_process):
|
||||
await manager.start_recording(2, h264_config)
|
||||
# Replace monitor task with completed mock
|
||||
manager._recorders[2]._monitor_task = mock_task
|
||||
|
||||
# Then stop
|
||||
await manager.stop_recording(2)
|
||||
|
||||
# Verify recording stopped
|
||||
status = manager.get_status(2)
|
||||
assert status.is_recording is False
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_stop_recording_invalid_port(self, settings):
|
||||
"""Stopping on invalid port raises ValueError."""
|
||||
manager = RecorderManager(settings)
|
||||
manager.initialize()
|
||||
|
||||
with pytest.raises(ValueError, match="Invalid port index"):
|
||||
await manager.stop_recording(99)
|
||||
|
||||
def test_get_status_invalid_port(self, settings):
|
||||
"""Getting status for invalid port raises ValueError."""
|
||||
manager = RecorderManager(settings)
|
||||
manager.initialize()
|
||||
|
||||
with pytest.raises(ValueError, match="Invalid port index"):
|
||||
manager.get_status(99)
|
||||
Loading…
Reference in a new issue