Add backend/app/recorders/recorder.py

This commit is contained in:
Zac Gaetano 2026-04-14 09:21:09 -04:00
parent ec76de02bb
commit 114c44fdb9

View file

@ -0,0 +1,285 @@
"""Recorder management for Deltacast SDI recording ports."""
import asyncio
import re
import time
import logging
from backend.app.config import Settings
from backend.app.models import RecorderConfig, PortStatus, CodecType
from backend.app.utils.ffmpeg import FFmpegCommandBuilder
logger = logging.getLogger(__name__)
class PortRecorder:
"""Manages FFmpeg recording subprocess for a single Deltacast port."""
def __init__(self, port_index: int, settings: Settings):
"""Initialize a port recorder.
Args:
port_index: 0-based port index
settings: Application settings
"""
self.port_index = port_index
self.settings = settings
self._process: asyncio.subprocess.Process | None = None
self._config: RecorderConfig | None = None
self._start_time: float | None = None
self._frame_count: int = 0
self._current_file: str = ""
self._monitor_task: asyncio.Task | None = None
self._command_builder = FFmpegCommandBuilder(settings)
async def start(self, config: RecorderConfig) -> None:
"""Start FFmpeg recording subprocess.
Args:
config: RecorderConfig with codec and output settings
Raises:
RuntimeError: If recorder is already running
"""
if self._process is not None:
raise RuntimeError(f"Port {self.port_index} is already recording")
self._config = config
self._current_file = config.recording_path
self._frame_count = 0
self._start_time = time.time()
# Build FFmpeg command
command = self._command_builder.build_command(config)
logger.info(f"Port {self.port_index}: Starting FFmpeg: {' '.join(command)}")
# Spawn subprocess
self._process = await asyncio.create_subprocess_exec(
*command,
stdin=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
# Start monitoring task
self._monitor_task = asyncio.create_task(self._monitor_process())
logger.info(f"Port {self.port_index}: Recording started (PID: {self._process.pid})")
async def stop(self) -> None:
"""Gracefully stop FFmpeg: stdin quit -> SIGTERM -> SIGKILL.
The subprocess is terminated using escalating methods:
1. Send 'q\n' to stdin (graceful quit)
2. Wait up to 3 seconds for process to exit
3. Send SIGTERM
4. Wait up to 2 seconds for process to exit
5. Send SIGKILL
If already stopped, this is a no-op.
"""
if self._process is None:
logger.info(f"Port {self.port_index}: Already stopped")
return
logger.info(f"Port {self.port_index}: Stopping recording (PID: {self._process.pid})")
try:
# Step 1: Try graceful quit via stdin
if self._process.stdin and not self._process.stdin.is_closing():
self._process.stdin.write(b"q\n")
await self._process.stdin.drain()
logger.debug(f"Port {self.port_index}: Sent quit command to stdin")
# Step 2: Wait up to 3 seconds for graceful shutdown
try:
await asyncio.wait_for(self._process.wait(), timeout=3.0)
logger.info(f"Port {self.port_index}: Process exited gracefully")
self._process = None
return
except asyncio.TimeoutError:
logger.warning(f"Port {self.port_index}: Graceful quit timeout, sending SIGTERM")
# Step 3: Send SIGTERM
self._process.terminate()
# Step 4: Wait up to 2 seconds for SIGTERM to work
try:
await asyncio.wait_for(self._process.wait(), timeout=2.0)
logger.info(f"Port {self.port_index}: Process exited after SIGTERM")
self._process = None
return
except asyncio.TimeoutError:
logger.warning(f"Port {self.port_index}: SIGTERM timeout, sending SIGKILL")
# Step 5: Send SIGKILL
self._process.kill()
await self._process.wait()
logger.info(f"Port {self.port_index}: Process killed")
except Exception as e:
logger.error(f"Port {self.port_index}: Error stopping process: {e}")
finally:
self._process = None
if self._monitor_task and not self._monitor_task.done():
self._monitor_task.cancel()
try:
await self._monitor_task
except asyncio.CancelledError:
pass
self._start_time = None
def get_status(self) -> PortStatus:
"""Get current status of this port.
Returns:
PortStatus with current recording metrics
"""
is_recording = self._process is not None and not self._process.done()
# Calculate uptime in seconds
uptime_seconds = 0
if self._start_time is not None:
uptime_seconds = int(time.time() - self._start_time)
# Calculate FPS
fps = 0.0
if uptime_seconds > 0:
fps = self._frame_count / uptime_seconds
# Calculate bitrate (would need real-time data from FFmpeg in production)
# For now, return 0 as this would require parsing FFmpeg stats
bitrate_mbps = 0.0
codec = self._config.codec if self._config else CodecType.H264
return PortStatus(
port_index=self.port_index,
is_recording=is_recording,
frame_count=self._frame_count,
fps=fps,
bitrate_mbps=bitrate_mbps,
uptime_seconds=uptime_seconds,
current_file=self._current_file,
codec=codec,
)
async def _monitor_process(self) -> None:
"""Parse FFmpeg stderr for frame=N updates.
Reads FFmpeg stderr line by line and extracts frame count using regex.
Updates self._frame_count as data is received.
Continues until the process exits or the task is cancelled.
"""
if self._process is None or self._process.stderr is None:
return
try:
async for line in self._process.stderr:
try:
line_str = line.decode("utf-8", errors="ignore").strip()
if not line_str:
continue
# Parse frame count from FFmpeg stderr
# Format: "frame= 123 fps=..."
match = re.search(r"frame=\s*(\d+)", line_str)
if match:
self._frame_count = int(match.group(1))
logger.debug(
f"Port {self.port_index}: Frame count = {self._frame_count}"
)
except Exception as e:
logger.error(f"Port {self.port_index}: Error parsing stderr: {e}")
except asyncio.CancelledError:
logger.debug(f"Port {self.port_index}: Monitoring task cancelled")
except Exception as e:
logger.error(f"Port {self.port_index}: Monitoring process error: {e}")
class RecorderManager:
"""Manages recording on all Deltacast ports."""
def __init__(self, settings: Settings):
"""Initialize the recorder manager.
Args:
settings: Application settings
"""
self.settings = settings
self._recorders: dict[int, PortRecorder] = {}
def initialize(self) -> None:
"""Create PortRecorder for each port (0 to DELTACAST_PORT_COUNT-1)."""
for port_index in range(self.settings.deltacast_port_count):
self._recorders[port_index] = PortRecorder(port_index, self.settings)
logger.info(
f"Initialized {self.settings.deltacast_port_count} port recorders"
)
async def start_recording(self, port_index: int, config: RecorderConfig) -> None:
"""Start recording on given port.
Args:
port_index: 0-based port index
config: RecorderConfig with codec and output settings
Raises:
ValueError: If port_index is invalid
RuntimeError: If port is already recording
"""
if port_index not in self._recorders:
raise ValueError(
f"Invalid port index {port_index}. "
f"Valid range: 0-{self.settings.deltacast_port_count - 1}"
)
recorder = self._recorders[port_index]
await recorder.start(config)
async def stop_recording(self, port_index: int) -> None:
"""Stop recording on given port.
Args:
port_index: 0-based port index
Raises:
ValueError: If port_index is invalid
"""
if port_index not in self._recorders:
raise ValueError(
f"Invalid port index {port_index}. "
f"Valid range: 0-{self.settings.deltacast_port_count - 1}"
)
recorder = self._recorders[port_index]
await recorder.stop()
def get_status(self, port_index: int) -> PortStatus:
"""Get status for a single port.
Args:
port_index: 0-based port index
Returns:
PortStatus with current metrics
Raises:
ValueError: If port_index is invalid
"""
if port_index not in self._recorders:
raise ValueError(
f"Invalid port index {port_index}. "
f"Valid range: 0-{self.settings.deltacast_port_count - 1}"
)
return self._recorders[port_index].get_status()
def get_all_status(self) -> list[PortStatus]:
"""Get status for all ports.
Returns:
List of PortStatus objects for each port
"""
return [self._recorders[i].get_status()
for i in range(self.settings.deltacast_port_count)]