diff --git a/backend/app/utils/hls.py b/backend/app/utils/hls.py new file mode 100644 index 0000000..693378b --- /dev/null +++ b/backend/app/utils/hls.py @@ -0,0 +1,206 @@ +"""HLS preview manager for live video preview via HTTP Live Streaming.""" + +import asyncio +import logging +from pathlib import Path +from ..config import Settings + +logger = logging.getLogger(__name__) + + +class HLSPreviewManager: + """ + Manages FFmpeg HLS transcoding processes for video preview. + Creates per-port HLS streams at low bitrate for web browser playback. + """ + + def __init__(self, settings: Settings, hls_dir: str = "/tmp/hls"): + """Initialize HLS preview manager. + + Args: + settings: Application settings with ffmpeg_path + hls_dir: Directory for HLS playlist and segment files + """ + self.settings = settings + self.hls_dir = Path(hls_dir) + self._processes: dict[int, asyncio.subprocess.Process] = {} + + async def start_preview(self, port_index: int) -> None: + """ + Start HLS transcoding for a port. + Creates {hls_dir}/port_{port_index}.m3u8 and segment files. + No-op if already running for this port. + + Args: + port_index: 0-based port index + """ + if port_index in self._processes and self._processes[port_index] is not None: + logger.info(f"Port {port_index}: HLS preview already running") + return + + # Ensure hls_dir exists + self.hls_dir.mkdir(parents=True, exist_ok=True) + + # Build FFmpeg command + command = self._build_hls_command(port_index) + logger.info(f"Port {port_index}: Starting HLS preview: {' '.join(command)}") + + # Spawn subprocess + try: + process = await asyncio.create_subprocess_exec( + *command, + stdin=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + stdout=asyncio.subprocess.PIPE, + ) + self._processes[port_index] = process + logger.info(f"Port {port_index}: HLS preview started (PID: {process.pid})") + except Exception as e: + logger.error(f"Port {port_index}: Failed to start HLS preview: {e}") + raise + + async def stop_preview(self, port_index: int) -> None: + """ + Stop HLS transcoding for a port. + Graceful shutdown: stdin q -> SIGTERM -> SIGKILL. + No-op if not running. + + Args: + port_index: 0-based port index + """ + if port_index not in self._processes or self._processes[port_index] is None: + logger.info(f"Port {port_index}: HLS preview not running") + return + + process = self._processes[port_index] + logger.info(f"Port {port_index}: Stopping HLS preview (PID: {process.pid})") + + try: + # Step 1: Try graceful quit via stdin + if process.stdin and not process.stdin.is_closing(): + try: + process.stdin.write(b"q\n") + await process.stdin.drain() + logger.debug(f"Port {port_index}: Sent quit command to stdin") + except (BrokenPipeError, ConnectionResetError): + logger.debug(f"Port {port_index}: Could not write to stdin (already closed)") + + # Step 2: Wait up to 3 seconds for graceful shutdown + try: + await asyncio.wait_for(process.wait(), timeout=3.0) + logger.info(f"Port {port_index}: HLS process exited gracefully") + self._processes[port_index] = None + return + except asyncio.TimeoutError: + logger.warning(f"Port {port_index}: Graceful quit timeout, sending SIGTERM") + + # Step 3: Send SIGTERM + try: + process.terminate() + except ProcessLookupError: + logger.debug(f"Port {port_index}: Process already terminated") + self._processes[port_index] = None + return + + # Step 4: Wait up to 2 seconds for SIGTERM to work + try: + await asyncio.wait_for(process.wait(), timeout=2.0) + logger.info(f"Port {port_index}: HLS process exited after SIGTERM") + self._processes[port_index] = None + return + except asyncio.TimeoutError: + logger.warning(f"Port {port_index}: SIGTERM timeout, sending SIGKILL") + + # Step 5: Send SIGKILL + try: + process.kill() + await process.wait() + logger.info(f"Port {port_index}: HLS process killed") + except ProcessLookupError: + logger.debug(f"Port {port_index}: Process already killed") + + except Exception as e: + logger.error(f"Port {port_index}: Error stopping HLS preview: {e}") + finally: + self._processes[port_index] = None + + async def stop_all(self) -> None: + """Stop all HLS preview processes.""" + logger.info("Stopping all HLS preview processes") + port_indices = list(self._processes.keys()) + for port_index in port_indices: + await self.stop_preview(port_index) + logger.info("All HLS preview processes stopped") + + def get_playlist_path(self, port_index: int) -> Path: + """Return path to HLS playlist file for given port. + + Args: + port_index: 0-based port index + + Returns: + Path to the .m3u8 playlist file + """ + return self.hls_dir / f"port_{port_index}.m3u8" + + def is_previewing(self, port_index: int) -> bool: + """Return True if HLS preview is running for this port. + + Args: + port_index: 0-based port index + + Returns: + True if preview is active and process is running + """ + if port_index not in self._processes: + return False + process = self._processes[port_index] + return process is not None and not process.done() + + def _build_hls_command(self, port_index: int) -> list[str]: + """ + Build FFmpeg HLS transcoding command. + + Args: + port_index: 0-based port index + + Returns: + List of command arguments for FFmpeg + + Command structure: + ffmpeg -f deltacast -i deltacast://{port_index} + -c:v libx264 -preset ultrafast -tune zerolatency + -b:v 5M -s 1280x720 + -c:a aac -b:a 128k + -f hls + -hls_time 5 + -hls_list_size 3 + -hls_flags delete_segments + {hls_dir}/port_{port_index}.m3u8 + """ + playlist_path = self.get_playlist_path(port_index) + + command = [ + self.settings.ffmpeg_path, + # Input: Deltacast SDI + "-f", "deltacast", + "-i", f"deltacast://{port_index}", + # Video codec: x264 with low latency settings + "-c:v", "libx264", + "-preset", "ultrafast", + "-tune", "zerolatency", + "-b:v", "5M", + "-s", "1280x720", + # Audio codec: AAC + "-c:a", "aac", + "-b:a", "128k", + # HLS output format + "-f", "hls", + "-hls_time", "5", + "-hls_list_size", "3", + "-hls_flags", "delete_segments", + # Output path + str(playlist_path), + ] + + return command