"""Tests for FFmpeg recorder functionality."""
|
|
|
|
import pytest
|
|
import asyncio
|
|
from unittest.mock import Mock, AsyncMock, MagicMock, patch
|
|
from backend.app.config import Settings
|
|
from backend.app.models import RecorderConfig, CodecType, PortStatus
|
|
from backend.app.utils.ffmpeg import FFmpegCommandBuilder
|
|
from backend.app.recorders.recorder import PortRecorder, RecorderManager
|
|
|
|
|
|
# Fixtures
|
|
@pytest.fixture
def settings():
    """Build the Settings object shared by the tests in this module.

    Describes a four-port Deltacast setup with SRT enabled.
    """
    overrides = {
        "ffmpeg_path": "/usr/bin/ffmpeg",
        "recording_dir": "/recordings",
        "deltacast_port_count": 4,
        "srt_enabled": True,
        "srt_latency": 5000,
    }
    return Settings(**overrides)
|
|
|
|
|
|
@pytest.fixture
def command_builder(settings):
    """Return an FFmpegCommandBuilder constructed from the ``settings`` fixture."""
    builder = FFmpegCommandBuilder(settings)
    return builder
|
|
|
|
|
|
@pytest.fixture
def prores_config():
    """Return a RecorderConfig for port 0 using ProRes at the "hq" profile."""
    fields = {
        "port_index": 0,
        "codec": CodecType.PRORES,
        "bitrate": 0,
        "quality_profile": "hq",
        "recording_path": "/recordings/prores_{timestamp}.mxf",
        "srt_enabled": False,
    }
    return RecorderConfig(**fields)
|
|
|
|
|
|
@pytest.fixture
def dnxhd_config():
    """Return a RecorderConfig for port 1 using DNxHD with bitrate 185."""
    fields = {
        "port_index": 1,
        "codec": CodecType.DNXHD,
        "bitrate": 185,
        "quality_profile": "mq",
        "recording_path": "/recordings/dnxhd_{timestamp}.mxf",
        "srt_enabled": False,
    }
    return RecorderConfig(**fields)
|
|
|
|
|
|
@pytest.fixture
def h264_config():
    """Return a RecorderConfig for port 2 using H264 with bitrate 50, SRT off."""
    fields = {
        "port_index": 2,
        "codec": CodecType.H264,
        "bitrate": 50,
        "quality_profile": "lq",
        "recording_path": "/recordings/h264_{timestamp}.mxf",
        "srt_enabled": False,
    }
    return RecorderConfig(**fields)
|
|
|
|
|
|
@pytest.fixture
def uncompressed_config():
    """Return a RecorderConfig for port 3 using the uncompressed codec."""
    fields = {
        "port_index": 3,
        "codec": CodecType.UNCOMPRESSED,
        "bitrate": 0,
        "quality_profile": "hq",
        "recording_path": "/recordings/uncompressed_{timestamp}.mxf",
        "srt_enabled": False,
    }
    return RecorderConfig(**fields)
|
|
|
|
|
|
@pytest.fixture
def srt_config():
    """Return an H264 RecorderConfig for port 0 that also streams over SRT."""
    fields = {
        "port_index": 0,
        "codec": CodecType.H264,
        "bitrate": 50,
        "quality_profile": "mq",
        "recording_path": "/recordings/srt_test_{timestamp}.mxf",
        "srt_enabled": True,
        "srt_destination": "srt://localhost:9001",
    }
    return RecorderConfig(**fields)
|
|
|
|
|
|
# FFmpegCommandBuilder Tests
|
|
|
|
class TestFFmpegCommandBuilder:
    """Test suite for FFmpegCommandBuilder.

    The tests treat the result of ``build_command`` as a flat argv-style
    list of strings, so flags and their values are checked as list elements.
    """

    @staticmethod
    def _value_after(command, flag):
        """Return the argument immediately following the first ``flag``.

        Raises:
            ValueError: if ``flag`` is not present in ``command``.
        """
        return command[command.index(flag) + 1]

    def test_build_command_prores(self, command_builder, prores_config):
        """ProRes command includes prores_ks codec and MXF output."""
        command = command_builder.build_command(prores_config)

        # Executable and capture input.
        assert "/usr/bin/ffmpeg" in command
        assert "-f" in command
        assert "deltacast" in command
        assert "-i" in command
        assert "deltacast://0" in command

        # Video codec and quality profile. The profile is checked as the
        # value of -profile:v: a bare `"3" in command` would also pass if
        # any unrelated argument happened to be "3".
        assert "-c:v" in command
        assert "prores_ks" in command
        assert "-profile:v" in command
        assert self._value_after(command, "-profile:v") == "3"

        # Output container: "mxf" must be the value of a -f flag.
        assert "mxf" in command
        assert command[command.index("mxf") - 1] == "-f"

    def test_build_command_dnxhd(self, command_builder, dnxhd_config):
        """DNxHD command includes dnxhd codec with bitrate."""
        command = command_builder.build_command(dnxhd_config)

        assert "-c:v" in command
        assert "dnxhd" in command
        assert "-b:v" in command
        assert "185M" in command

    def test_build_command_h264(self, command_builder, h264_config):
        """H264 command includes libx264."""
        command = command_builder.build_command(h264_config)

        assert "-c:v" in command
        assert "libx264" in command
        assert "-b:v" in command
        assert "50M" in command

    def test_build_command_uncompressed(self, command_builder, uncompressed_config):
        """Uncompressed uses rawvideo."""
        command = command_builder.build_command(uncompressed_config)

        assert "-c:v" in command
        assert "rawvideo" in command
        assert "-pix_fmt" in command
        assert "uyvy422" in command

    def test_build_command_with_srt(self, command_builder, srt_config):
        """SRT-enabled config appends SRT output."""
        command = command_builder.build_command(srt_config)

        destination = "srt://localhost:9001"
        assert destination in command
        # The mpegts format name must appear before the SRT destination.
        assert command.index("mpegts") < command.index(destination)

    def test_build_command_without_srt(self, command_builder, h264_config):
        """SRT-disabled config has no SRT output."""
        command = command_builder.build_command(h264_config)

        # mpegts is only expected for the SRT output.
        assert "mpegts" not in command

    def test_deltacast_input_format(self, command_builder, h264_config):
        """Input uses deltacast:// scheme with port index."""
        command = command_builder.build_command(h264_config)

        assert "deltacast://2" in command
        assert "-f" in command
        assert "deltacast" in command

    def test_timestamp_substitution(self, command_builder, prores_config):
        """Output path no longer contains the {timestamp} placeholder."""
        command = command_builder.build_command(prores_config)

        # The output file is the argument right after the "mxf" format name.
        output_file = self._value_after(command, "mxf")

        assert "{timestamp}" not in output_file
        # The container extension survives the substitution.
        assert ".mxf" in output_file

    @pytest.mark.parametrize(
        ("profile_name", "expected_value"),
        [("hq", "3"), ("mq", "2"), ("lq", "0")],
    )
    def test_prores_quality_profiles(
        self, command_builder, profile_name, expected_value
    ):
        """ProRes codec maps quality profiles correctly."""
        config = RecorderConfig(
            port_index=0,
            codec=CodecType.PRORES,
            bitrate=0,
            quality_profile=profile_name,
            recording_path="/recordings/test.mxf",
        )
        command = command_builder.build_command(config)

        # Compare the value of -profile:v itself; single digits such as "0"
        # could otherwise be matched by an unrelated argument.
        assert self._value_after(command, "-profile:v") == expected_value
|
|
|
|
|
|
# PortRecorder Tests
|
|
|
|
class TestPortRecorder:
    """Test suite for PortRecorder."""

    def test_initialization(self, settings):
        """PortRecorder initializes with correct state."""
        recorder = PortRecorder(0, settings)

        assert recorder.port_index == 0
        assert recorder._process is None
        assert recorder._config is None
        assert recorder._start_time is None
        assert recorder._frame_count == 0
        assert recorder._current_file == ""

    def test_get_status_idle(self, settings):
        """Idle recorder returns is_recording=False."""
        recorder = PortRecorder(0, settings)
        status = recorder.get_status()

        assert isinstance(status, PortStatus)
        assert status.port_index == 0
        assert status.is_recording is False
        assert status.frame_count == 0
        assert status.fps == 0.0
        assert status.uptime_seconds == 0

    @pytest.mark.asyncio
    async def test_start_recording(self, settings, h264_config):
        """Start sets recording state correctly."""
        recorder = PortRecorder(2, settings)

        # Fake subprocess. Members meant to be called synchronously are
        # plain Mocks: a child of an AsyncMock is itself an AsyncMock, so
        # calling it returns a coroutine (always truthy) rather than the
        # configured False.
        mock_process = AsyncMock()
        mock_process.pid = 12345
        mock_process.done = Mock(return_value=False)
        mock_process.stdin = AsyncMock()
        mock_process.stdin.is_closing = Mock(return_value=False)
        mock_process.stderr = AsyncMock()

        with patch(
            "asyncio.create_subprocess_exec",
            return_value=mock_process,
        ) as mock_create:
            await recorder.start(h264_config)

            # Verify the subprocess was spawned once with piped stdin/stderr.
            mock_create.assert_called_once()
            spawn_kwargs = mock_create.call_args.kwargs
            assert spawn_kwargs["stdin"] == asyncio.subprocess.PIPE
            assert spawn_kwargs["stderr"] == asyncio.subprocess.PIPE

            # Verify recording state.
            assert recorder._process is not None
            assert recorder._config == h264_config
            assert recorder._start_time is not None

    @pytest.mark.asyncio
    async def test_start_already_running(self, settings, h264_config):
        """Starting when already running raises RuntimeError."""
        recorder = PortRecorder(2, settings)
        recorder._process = Mock()  # Simulate already running

        with pytest.raises(RuntimeError, match="already recording"):
            await recorder.start(h264_config)

    @pytest.mark.asyncio
    async def test_stop_recording(self, settings):
        """Stop cleans up process correctly."""
        recorder = PortRecorder(2, settings)

        # stdin: synchronous write/is_closing, awaitable drain.
        mock_stdin = MagicMock()
        mock_stdin.is_closing = Mock(return_value=False)
        mock_stdin.write = Mock()
        mock_stdin.drain = AsyncMock()

        # Process: only wait() is awaitable; the rest are synchronous.
        mock_process = MagicMock()
        mock_process.pid = 12345
        mock_process.done = Mock(return_value=False)
        mock_process.stdin = mock_stdin
        mock_process.wait = AsyncMock(return_value=None)
        mock_process.terminate = Mock()
        mock_process.kill = Mock()
        mock_process.stderr = AsyncMock()

        recorder._process = mock_process
        recorder._start_time = 1000.0  # Mock start time

        # Stand-in monitor task that reports itself as already finished.
        mock_task = MagicMock()
        mock_task.done = Mock(return_value=True)
        recorder._monitor_task = mock_task

        await recorder.stop()

        # Verify the quit command was written to stdin.
        mock_stdin.write.assert_called_once_with(b"q\n")
        # Verify the recorder state was reset.
        assert recorder._process is None
        assert recorder._start_time is None

    @pytest.mark.asyncio
    async def test_stop_when_idle(self, settings):
        """Stopping an idle recorder is a no-op."""
        recorder = PortRecorder(0, settings)
        # Should not raise
        await recorder.stop()
        assert recorder._process is None
|
|
|
|
|
|
# RecorderManager Tests
|
|
|
|
class TestRecorderManager:
    """Test suite for RecorderManager."""

    @staticmethod
    def _initialized_manager(settings):
        """Return a RecorderManager on which ``initialize()`` has been called."""
        manager = RecorderManager(settings)
        manager.initialize()
        return manager

    def test_initialize(self, settings):
        """Manager creates correct number of recorders."""
        manager = self._initialized_manager(settings)

        assert len(manager._recorders) == 4
        assert all(isinstance(r, PortRecorder) for r in manager._recorders.values())
        assert all(manager._recorders[i].port_index == i for i in range(4))

    def test_get_status_idle(self, settings):
        """Get status for idle port returns idle status."""
        manager = self._initialized_manager(settings)

        status = manager.get_status(0)

        assert isinstance(status, PortStatus)
        assert status.port_index == 0
        assert status.is_recording is False

    def test_get_all_status(self, settings):
        """Get all status returns list for each port."""
        manager = self._initialized_manager(settings)

        statuses = manager.get_all_status()

        assert len(statuses) == 4
        assert all(isinstance(s, PortStatus) for s in statuses)
        assert [s.port_index for s in statuses] == [0, 1, 2, 3]

    @pytest.mark.asyncio
    async def test_start_recording(self, settings, h264_config):
        """Start recording on a valid port."""
        manager = self._initialized_manager(settings)

        # Fake subprocess. Members meant to be called synchronously are
        # plain Mocks: a child of an AsyncMock is itself an AsyncMock, so
        # calling it returns a coroutine (always truthy) rather than the
        # configured False.
        mock_stdin = AsyncMock()
        mock_stdin.is_closing = Mock(return_value=False)
        mock_stdin.drain = AsyncMock()

        mock_process = AsyncMock()
        mock_process.pid = 12345
        mock_process.done = Mock(return_value=False)
        mock_process.stdin = mock_stdin
        mock_process.stderr = AsyncMock()

        with patch("asyncio.create_subprocess_exec", return_value=mock_process):
            await manager.start_recording(2, h264_config)

        # Verify recording started by checking the recorder has a process.
        assert manager._recorders[2]._process is not None

    @pytest.mark.asyncio
    async def test_start_recording_invalid_port(self, settings, h264_config):
        """Starting on invalid port raises ValueError."""
        manager = self._initialized_manager(settings)

        with pytest.raises(ValueError, match="Invalid port index"):
            await manager.start_recording(99, h264_config)

    @pytest.mark.asyncio
    async def test_stop_recording(self, settings, h264_config):
        """Stop recording on a port."""
        manager = self._initialized_manager(settings)

        # stdin: synchronous write/is_closing, awaitable drain.
        mock_stdin = MagicMock()
        mock_stdin.is_closing = Mock(return_value=False)
        mock_stdin.write = Mock()
        mock_stdin.drain = AsyncMock()

        # Process: only wait() is awaitable.
        mock_process = MagicMock()
        mock_process.pid = 12345
        mock_process.done = Mock(return_value=False)
        mock_process.stdin = mock_stdin
        mock_process.wait = AsyncMock(return_value=None)
        mock_process.stderr = AsyncMock()

        # Stand-in monitor task that reports itself as already finished.
        mock_task = MagicMock()
        mock_task.done = Mock(return_value=True)

        # First start recording against the fake subprocess.
        with patch("asyncio.create_subprocess_exec", return_value=mock_process):
            await manager.start_recording(2, h264_config)
            # Replace monitor task with completed mock
            manager._recorders[2]._monitor_task = mock_task

        # Then stop
        await manager.stop_recording(2)

        # Verify recording stopped
        status = manager.get_status(2)
        assert status.is_recording is False

    @pytest.mark.asyncio
    async def test_stop_recording_invalid_port(self, settings):
        """Stopping on invalid port raises ValueError."""
        manager = self._initialized_manager(settings)

        with pytest.raises(ValueError, match="Invalid port index"):
            await manager.stop_recording(99)

    def test_get_status_invalid_port(self, settings):
        """Getting status for invalid port raises ValueError."""
        manager = self._initialized_manager(settings)

        with pytest.raises(ValueError, match="Invalid port index"):
            manager.get_status(99)
|