From 114c44fdb92449c666a54652dce631debedcb8b3 Mon Sep 17 00:00:00 2001 From: Zac Gaetano Date: Tue, 14 Apr 2026 09:21:09 -0400 Subject: [PATCH] Add backend/app/recorders/recorder.py --- backend/app/recorders/recorder.py | 285 ++++++++++++++++++++++++++++++ 1 file changed, 285 insertions(+) create mode 100644 backend/app/recorders/recorder.py diff --git a/backend/app/recorders/recorder.py b/backend/app/recorders/recorder.py new file mode 100644 index 0000000..e2b93a2 --- /dev/null +++ b/backend/app/recorders/recorder.py @@ -0,0 +1,285 @@ +"""Recorder management for Deltacast SDI recording ports.""" + +import asyncio +import re +import time +import logging +from backend.app.config import Settings +from backend.app.models import RecorderConfig, PortStatus, CodecType +from backend.app.utils.ffmpeg import FFmpegCommandBuilder + +logger = logging.getLogger(__name__) + + +class PortRecorder: + """Manages FFmpeg recording subprocess for a single Deltacast port.""" + + def __init__(self, port_index: int, settings: Settings): + """Initialize a port recorder. + + Args: + port_index: 0-based port index + settings: Application settings + """ + self.port_index = port_index + self.settings = settings + self._process: asyncio.subprocess.Process | None = None + self._config: RecorderConfig | None = None + self._start_time: float | None = None + self._frame_count: int = 0 + self._current_file: str = "" + self._monitor_task: asyncio.Task | None = None + self._command_builder = FFmpegCommandBuilder(settings) + + async def start(self, config: RecorderConfig) -> None: + """Start FFmpeg recording subprocess. + + Args: + config: RecorderConfig with codec and output settings + + Raises: + RuntimeError: If recorder is already running + """ + if self._process is not None: + raise RuntimeError(f"Port {self.port_index} is already recording") + + self._config = config + self._current_file = config.recording_path + self._frame_count = 0 + self._start_time = time.time() + + # Build FFmpeg command + command = self._command_builder.build_command(config) + logger.info(f"Port {self.port_index}: Starting FFmpeg: {' '.join(command)}") + + # Spawn subprocess + self._process = await asyncio.create_subprocess_exec( + *command, + stdin=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + ) + + # Start monitoring task + self._monitor_task = asyncio.create_task(self._monitor_process()) + logger.info(f"Port {self.port_index}: Recording started (PID: {self._process.pid})") + + async def stop(self) -> None: + """Gracefully stop FFmpeg: stdin quit -> SIGTERM -> SIGKILL. + + The subprocess is terminated using escalating methods: + 1. Send 'q\n' to stdin (graceful quit) + 2. Wait up to 3 seconds for process to exit + 3. Send SIGTERM + 4. Wait up to 2 seconds for process to exit + 5. Send SIGKILL + + If already stopped, this is a no-op. + """ + if self._process is None: + logger.info(f"Port {self.port_index}: Already stopped") + return + + logger.info(f"Port {self.port_index}: Stopping recording (PID: {self._process.pid})") + + try: + # Step 1: Try graceful quit via stdin + if self._process.stdin and not self._process.stdin.is_closing(): + self._process.stdin.write(b"q\n") + await self._process.stdin.drain() + logger.debug(f"Port {self.port_index}: Sent quit command to stdin") + + # Step 2: Wait up to 3 seconds for graceful shutdown + try: + await asyncio.wait_for(self._process.wait(), timeout=3.0) + logger.info(f"Port {self.port_index}: Process exited gracefully") + self._process = None + return + except asyncio.TimeoutError: + logger.warning(f"Port {self.port_index}: Graceful quit timeout, sending SIGTERM") + + # Step 3: Send SIGTERM + self._process.terminate() + + # Step 4: Wait up to 2 seconds for SIGTERM to work + try: + await asyncio.wait_for(self._process.wait(), timeout=2.0) + logger.info(f"Port {self.port_index}: Process exited after SIGTERM") + self._process = None + return + except asyncio.TimeoutError: + logger.warning(f"Port {self.port_index}: SIGTERM timeout, sending SIGKILL") + + # Step 5: Send SIGKILL + self._process.kill() + await self._process.wait() + logger.info(f"Port {self.port_index}: Process killed") + + except Exception as e: + logger.error(f"Port {self.port_index}: Error stopping process: {e}") + finally: + self._process = None + if self._monitor_task and not self._monitor_task.done(): + self._monitor_task.cancel() + try: + await self._monitor_task + except asyncio.CancelledError: + pass + self._start_time = None + + def get_status(self) -> PortStatus: + """Get current status of this port. + + Returns: + PortStatus with current recording metrics + """ + is_recording = self._process is not None and not self._process.done() + + # Calculate uptime in seconds + uptime_seconds = 0 + if self._start_time is not None: + uptime_seconds = int(time.time() - self._start_time) + + # Calculate FPS + fps = 0.0 + if uptime_seconds > 0: + fps = self._frame_count / uptime_seconds + + # Calculate bitrate (would need real-time data from FFmpeg in production) + # For now, return 0 as this would require parsing FFmpeg stats + bitrate_mbps = 0.0 + + codec = self._config.codec if self._config else CodecType.H264 + + return PortStatus( + port_index=self.port_index, + is_recording=is_recording, + frame_count=self._frame_count, + fps=fps, + bitrate_mbps=bitrate_mbps, + uptime_seconds=uptime_seconds, + current_file=self._current_file, + codec=codec, + ) + + async def _monitor_process(self) -> None: + """Parse FFmpeg stderr for frame=N updates. + + Reads FFmpeg stderr line by line and extracts frame count using regex. + Updates self._frame_count as data is received. + + Continues until the process exits or the task is cancelled. + """ + if self._process is None or self._process.stderr is None: + return + + try: + async for line in self._process.stderr: + try: + line_str = line.decode("utf-8", errors="ignore").strip() + if not line_str: + continue + + # Parse frame count from FFmpeg stderr + # Format: "frame= 123 fps=..." + match = re.search(r"frame=\s*(\d+)", line_str) + if match: + self._frame_count = int(match.group(1)) + logger.debug( + f"Port {self.port_index}: Frame count = {self._frame_count}" + ) + + except Exception as e: + logger.error(f"Port {self.port_index}: Error parsing stderr: {e}") + + except asyncio.CancelledError: + logger.debug(f"Port {self.port_index}: Monitoring task cancelled") + except Exception as e: + logger.error(f"Port {self.port_index}: Monitoring process error: {e}") + + +class RecorderManager: + """Manages recording on all Deltacast ports.""" + + def __init__(self, settings: Settings): + """Initialize the recorder manager. + + Args: + settings: Application settings + """ + self.settings = settings + self._recorders: dict[int, PortRecorder] = {} + + def initialize(self) -> None: + """Create PortRecorder for each port (0 to DELTACAST_PORT_COUNT-1).""" + for port_index in range(self.settings.deltacast_port_count): + self._recorders[port_index] = PortRecorder(port_index, self.settings) + logger.info( + f"Initialized {self.settings.deltacast_port_count} port recorders" + ) + + async def start_recording(self, port_index: int, config: RecorderConfig) -> None: + """Start recording on given port. + + Args: + port_index: 0-based port index + config: RecorderConfig with codec and output settings + + Raises: + ValueError: If port_index is invalid + RuntimeError: If port is already recording + """ + if port_index not in self._recorders: + raise ValueError( + f"Invalid port index {port_index}. " + f"Valid range: 0-{self.settings.deltacast_port_count - 1}" + ) + + recorder = self._recorders[port_index] + await recorder.start(config) + + async def stop_recording(self, port_index: int) -> None: + """Stop recording on given port. + + Args: + port_index: 0-based port index + + Raises: + ValueError: If port_index is invalid + """ + if port_index not in self._recorders: + raise ValueError( + f"Invalid port index {port_index}. " + f"Valid range: 0-{self.settings.deltacast_port_count - 1}" + ) + + recorder = self._recorders[port_index] + await recorder.stop() + + def get_status(self, port_index: int) -> PortStatus: + """Get status for a single port. + + Args: + port_index: 0-based port index + + Returns: + PortStatus with current metrics + + Raises: + ValueError: If port_index is invalid + """ + if port_index not in self._recorders: + raise ValueError( + f"Invalid port index {port_index}. " + f"Valid range: 0-{self.settings.deltacast_port_count - 1}" + ) + + return self._recorders[port_index].get_status() + + def get_all_status(self) -> list[PortStatus]: + """Get status for all ports. + + Returns: + List of PortStatus objects for each port + """ + return [self._recorders[i].get_status() + for i in range(self.settings.deltacast_port_count)]