"""Recorder management for Deltacast SDI recording ports.""" import asyncio import re import time import logging from backend.app.config import Settings from backend.app.models import RecorderConfig, PortStatus, CodecType from backend.app.utils.ffmpeg import FFmpegCommandBuilder logger = logging.getLogger(__name__) class PortRecorder: """Manages FFmpeg recording subprocess for a single Deltacast port.""" def __init__(self, port_index: int, settings: Settings): """Initialize a port recorder. Args: port_index: 0-based port index settings: Application settings """ self.port_index = port_index self.settings = settings self._process: asyncio.subprocess.Process | None = None self._config: RecorderConfig | None = None self._start_time: float | None = None self._frame_count: int = 0 self._current_file: str = "" self._monitor_task: asyncio.Task | None = None self._command_builder = FFmpegCommandBuilder(settings) async def start(self, config: RecorderConfig) -> None: """Start FFmpeg recording subprocess. Args: config: RecorderConfig with codec and output settings Raises: RuntimeError: If recorder is already running """ if self._process is not None: raise RuntimeError(f"Port {self.port_index} is already recording") self._config = config self._current_file = config.recording_path self._frame_count = 0 self._start_time = time.time() # Build FFmpeg command command = self._command_builder.build_command(config) logger.info(f"Port {self.port_index}: Starting FFmpeg: {' '.join(command)}") # Spawn subprocess self._process = await asyncio.create_subprocess_exec( *command, stdin=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) # Start monitoring task self._monitor_task = asyncio.create_task(self._monitor_process()) logger.info(f"Port {self.port_index}: Recording started (PID: {self._process.pid})") async def stop(self) -> None: """Gracefully stop FFmpeg: stdin quit -> SIGTERM -> SIGKILL. The subprocess is terminated using escalating methods: 1. Send 'q\n' to stdin (graceful quit) 2. Wait up to 3 seconds for process to exit 3. Send SIGTERM 4. Wait up to 2 seconds for process to exit 5. Send SIGKILL If already stopped, this is a no-op. """ if self._process is None: logger.info(f"Port {self.port_index}: Already stopped") return logger.info(f"Port {self.port_index}: Stopping recording (PID: {self._process.pid})") try: # Step 1: Try graceful quit via stdin if self._process.stdin and not self._process.stdin.is_closing(): self._process.stdin.write(b"q\n") await self._process.stdin.drain() logger.debug(f"Port {self.port_index}: Sent quit command to stdin") # Step 2: Wait up to 3 seconds for graceful shutdown try: await asyncio.wait_for(self._process.wait(), timeout=3.0) logger.info(f"Port {self.port_index}: Process exited gracefully") self._process = None return except asyncio.TimeoutError: logger.warning(f"Port {self.port_index}: Graceful quit timeout, sending SIGTERM") # Step 3: Send SIGTERM self._process.terminate() # Step 4: Wait up to 2 seconds for SIGTERM to work try: await asyncio.wait_for(self._process.wait(), timeout=2.0) logger.info(f"Port {self.port_index}: Process exited after SIGTERM") self._process = None return except asyncio.TimeoutError: logger.warning(f"Port {self.port_index}: SIGTERM timeout, sending SIGKILL") # Step 5: Send SIGKILL self._process.kill() await self._process.wait() logger.info(f"Port {self.port_index}: Process killed") except Exception as e: logger.error(f"Port {self.port_index}: Error stopping process: {e}") finally: self._process = None if self._monitor_task and not self._monitor_task.done(): self._monitor_task.cancel() try: await self._monitor_task except asyncio.CancelledError: pass self._start_time = None def get_status(self) -> PortStatus: """Get current status of this port. Returns: PortStatus with current recording metrics """ is_recording = self._process is not None and not self._process.done() # Calculate uptime in seconds uptime_seconds = 0 if self._start_time is not None: uptime_seconds = int(time.time() - self._start_time) # Calculate FPS fps = 0.0 if uptime_seconds > 0: fps = self._frame_count / uptime_seconds # Calculate bitrate (would need real-time data from FFmpeg in production) # For now, return 0 as this would require parsing FFmpeg stats bitrate_mbps = 0.0 codec = self._config.codec if self._config else CodecType.H264 return PortStatus( port_index=self.port_index, is_recording=is_recording, frame_count=self._frame_count, fps=fps, bitrate_mbps=bitrate_mbps, uptime_seconds=uptime_seconds, current_file=self._current_file, codec=codec, ) async def _monitor_process(self) -> None: """Parse FFmpeg stderr for frame=N updates. Reads FFmpeg stderr line by line and extracts frame count using regex. Updates self._frame_count as data is received. Continues until the process exits or the task is cancelled. """ if self._process is None or self._process.stderr is None: return try: async for line in self._process.stderr: try: line_str = line.decode("utf-8", errors="ignore").strip() if not line_str: continue # Parse frame count from FFmpeg stderr # Format: "frame= 123 fps=..." match = re.search(r"frame=\s*(\d+)", line_str) if match: self._frame_count = int(match.group(1)) logger.debug( f"Port {self.port_index}: Frame count = {self._frame_count}" ) except Exception as e: logger.error(f"Port {self.port_index}: Error parsing stderr: {e}") except asyncio.CancelledError: logger.debug(f"Port {self.port_index}: Monitoring task cancelled") except Exception as e: logger.error(f"Port {self.port_index}: Monitoring process error: {e}") class RecorderManager: """Manages recording on all Deltacast ports.""" def __init__(self, settings: Settings): """Initialize the recorder manager. Args: settings: Application settings """ self.settings = settings self._recorders: dict[int, PortRecorder] = {} def initialize(self) -> None: """Create PortRecorder for each port (0 to DELTACAST_PORT_COUNT-1).""" for port_index in range(self.settings.deltacast_port_count): self._recorders[port_index] = PortRecorder(port_index, self.settings) logger.info( f"Initialized {self.settings.deltacast_port_count} port recorders" ) async def start_recording(self, port_index: int, config: RecorderConfig) -> None: """Start recording on given port. Args: port_index: 0-based port index config: RecorderConfig with codec and output settings Raises: ValueError: If port_index is invalid RuntimeError: If port is already recording """ if port_index not in self._recorders: raise ValueError( f"Invalid port index {port_index}. " f"Valid range: 0-{self.settings.deltacast_port_count - 1}" ) recorder = self._recorders[port_index] await recorder.start(config) async def stop_recording(self, port_index: int) -> None: """Stop recording on given port. Args: port_index: 0-based port index Raises: ValueError: If port_index is invalid """ if port_index not in self._recorders: raise ValueError( f"Invalid port index {port_index}. " f"Valid range: 0-{self.settings.deltacast_port_count - 1}" ) recorder = self._recorders[port_index] await recorder.stop() def get_status(self, port_index: int) -> PortStatus: """Get status for a single port. Args: port_index: 0-based port index Returns: PortStatus with current metrics Raises: ValueError: If port_index is invalid """ if port_index not in self._recorders: raise ValueError( f"Invalid port index {port_index}. " f"Valid range: 0-{self.settings.deltacast_port_count - 1}" ) return self._recorders[port_index].get_status() def get_all_status(self) -> list[PortStatus]: """Get status for all ports. Returns: List of PortStatus objects for each port """ return [self._recorders[i].get_status() for i in range(self.settings.deltacast_port_count)]