diff --git a/backend/app/recorders/recorder.py b/backend/app/recorders/recorder.py index e2b93a2..e5df2a6 100644 --- a/backend/app/recorders/recorder.py +++ b/backend/app/recorders/recorder.py @@ -4,9 +4,9 @@ import asyncio import re import time import logging -from backend.app.config import Settings -from backend.app.models import RecorderConfig, PortStatus, CodecType -from backend.app.utils.ffmpeg import FFmpegCommandBuilder +from ..config import Settings +from ..models import RecorderConfig, PortStatus, CodecType +from ..utils.ffmpeg import FFmpegCommandBuilder logger = logging.getLogger(__name__) @@ -15,12 +15,6 @@ class PortRecorder: """Manages FFmpeg recording subprocess for a single Deltacast port.""" def __init__(self, port_index: int, settings: Settings): - """Initialize a port recorder. - - Args: - port_index: 0-based port index - settings: Application settings - """ self.port_index = port_index self.settings = settings self._process: asyncio.subprocess.Process | None = None @@ -32,14 +26,6 @@ class PortRecorder: self._command_builder = FFmpegCommandBuilder(settings) async def start(self, config: RecorderConfig) -> None: - """Start FFmpeg recording subprocess. - - Args: - config: RecorderConfig with codec and output settings - - Raises: - RuntimeError: If recorder is already running - """ if self._process is not None: raise RuntimeError(f"Port {self.port_index} is already recording") @@ -48,33 +34,19 @@ class PortRecorder: self._frame_count = 0 self._start_time = time.time() - # Build FFmpeg command command = self._command_builder.build_command(config) logger.info(f"Port {self.port_index}: Starting FFmpeg: {' '.join(command)}") - # Spawn subprocess self._process = await asyncio.create_subprocess_exec( *command, stdin=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) - # Start monitoring task self._monitor_task = asyncio.create_task(self._monitor_process()) logger.info(f"Port {self.port_index}: Recording started (PID: {self._process.pid})") async def stop(self) -> None: - """Gracefully stop FFmpeg: stdin quit -> SIGTERM -> SIGKILL. - - The subprocess is terminated using escalating methods: - 1. Send 'q\n' to stdin (graceful quit) - 2. Wait up to 3 seconds for process to exit - 3. Send SIGTERM - 4. Wait up to 2 seconds for process to exit - 5. Send SIGKILL - - If already stopped, this is a no-op. - """ if self._process is None: logger.info(f"Port {self.port_index}: Already stopped") return @@ -82,37 +54,28 @@ class PortRecorder: logger.info(f"Port {self.port_index}: Stopping recording (PID: {self._process.pid})") try: - # Step 1: Try graceful quit via stdin if self._process.stdin and not self._process.stdin.is_closing(): self._process.stdin.write(b"q\n") await self._process.stdin.drain() - logger.debug(f"Port {self.port_index}: Sent quit command to stdin") - # Step 2: Wait up to 3 seconds for graceful shutdown try: await asyncio.wait_for(self._process.wait(), timeout=3.0) - logger.info(f"Port {self.port_index}: Process exited gracefully") self._process = None return except asyncio.TimeoutError: logger.warning(f"Port {self.port_index}: Graceful quit timeout, sending SIGTERM") - # Step 3: Send SIGTERM self._process.terminate() - # Step 4: Wait up to 2 seconds for SIGTERM to work try: await asyncio.wait_for(self._process.wait(), timeout=2.0) - logger.info(f"Port {self.port_index}: Process exited after SIGTERM") self._process = None return except asyncio.TimeoutError: logger.warning(f"Port {self.port_index}: SIGTERM timeout, sending SIGKILL") - # Step 5: Send SIGKILL self._process.kill() await self._process.wait() - logger.info(f"Port {self.port_index}: Process killed") except Exception as e: logger.error(f"Port {self.port_index}: Error stopping process: {e}") @@ -127,27 +90,17 @@ class PortRecorder: self._start_time = None def get_status(self) -> PortStatus: - """Get current status of this port. - - Returns: - PortStatus with current recording metrics - """ is_recording = self._process is not None and not self._process.done() - # Calculate uptime in seconds uptime_seconds = 0 if self._start_time is not None: uptime_seconds = int(time.time() - self._start_time) - # Calculate FPS fps = 0.0 if uptime_seconds > 0: fps = self._frame_count / uptime_seconds - # Calculate bitrate (would need real-time data from FFmpeg in production) - # For now, return 0 as this would require parsing FFmpeg stats bitrate_mbps = 0.0 - codec = self._config.codec if self._config else CodecType.H264 return PortStatus( @@ -162,13 +115,6 @@ class PortRecorder: ) async def _monitor_process(self) -> None: - """Parse FFmpeg stderr for frame=N updates. - - Reads FFmpeg stderr line by line and extracts frame count using regex. - Updates self._frame_count as data is received. - - Continues until the process exits or the task is cancelled. - """ if self._process is None or self._process.stderr is None: return @@ -178,21 +124,14 @@ class PortRecorder: line_str = line.decode("utf-8", errors="ignore").strip() if not line_str: continue - - # Parse frame count from FFmpeg stderr - # Format: "frame= 123 fps=..." match = re.search(r"frame=\s*(\d+)", line_str) if match: self._frame_count = int(match.group(1)) - logger.debug( - f"Port {self.port_index}: Frame count = {self._frame_count}" - ) - except Exception as e: logger.error(f"Port {self.port_index}: Error parsing stderr: {e}") except asyncio.CancelledError: - logger.debug(f"Port {self.port_index}: Monitoring task cancelled") + pass except Exception as e: logger.error(f"Port {self.port_index}: Monitoring process error: {e}") @@ -201,85 +140,29 @@ class RecorderManager: """Manages recording on all Deltacast ports.""" def __init__(self, settings: Settings): - """Initialize the recorder manager. - - Args: - settings: Application settings - """ self.settings = settings self._recorders: dict[int, PortRecorder] = {} def initialize(self) -> None: - """Create PortRecorder for each port (0 to DELTACAST_PORT_COUNT-1).""" for port_index in range(self.settings.deltacast_port_count): self._recorders[port_index] = PortRecorder(port_index, self.settings) - logger.info( - f"Initialized {self.settings.deltacast_port_count} port recorders" - ) + logger.info(f"Initialized {self.settings.deltacast_port_count} port recorders") async def start_recording(self, port_index: int, config: RecorderConfig) -> None: - """Start recording on given port. - - Args: - port_index: 0-based port index - config: RecorderConfig with codec and output settings - - Raises: - ValueError: If port_index is invalid - RuntimeError: If port is already recording - """ if port_index not in self._recorders: - raise ValueError( - f"Invalid port index {port_index}. " - f"Valid range: 0-{self.settings.deltacast_port_count - 1}" - ) - - recorder = self._recorders[port_index] - await recorder.start(config) + raise ValueError(f"Invalid port index {port_index}.") + await self._recorders[port_index].start(config) async def stop_recording(self, port_index: int) -> None: - """Stop recording on given port. - - Args: - port_index: 0-based port index - - Raises: - ValueError: If port_index is invalid - """ if port_index not in self._recorders: - raise ValueError( - f"Invalid port index {port_index}. " - f"Valid range: 0-{self.settings.deltacast_port_count - 1}" - ) - - recorder = self._recorders[port_index] - await recorder.stop() + raise ValueError(f"Invalid port index {port_index}.") + await self._recorders[port_index].stop() def get_status(self, port_index: int) -> PortStatus: - """Get status for a single port. - - Args: - port_index: 0-based port index - - Returns: - PortStatus with current metrics - - Raises: - ValueError: If port_index is invalid - """ if port_index not in self._recorders: - raise ValueError( - f"Invalid port index {port_index}. " - f"Valid range: 0-{self.settings.deltacast_port_count - 1}" - ) - + raise ValueError(f"Invalid port index {port_index}.") return self._recorders[port_index].get_status() def get_all_status(self) -> list[PortStatus]: - """Get status for all ports. - - Returns: - List of PortStatus objects for each port - """ return [self._recorders[i].get_status() for i in range(self.settings.deltacast_port_count)]