From c3523fdacabb2b09d1a180a8c3559a2b4333d922 Mon Sep 17 00:00:00 2001 From: Zac Gaetano Date: Tue, 14 Apr 2026 09:21:10 -0400 Subject: [PATCH] Add backend/app/recorders/srt_streamer.py --- backend/app/recorders/srt_streamer.py | 239 ++++++++++++++++++++++++++ 1 file changed, 239 insertions(+) create mode 100644 backend/app/recorders/srt_streamer.py diff --git a/backend/app/recorders/srt_streamer.py b/backend/app/recorders/srt_streamer.py new file mode 100644 index 0000000..c870f49 --- /dev/null +++ b/backend/app/recorders/srt_streamer.py @@ -0,0 +1,239 @@ +"""SRT streaming for Deltacast SDI input.""" + +import asyncio +import logging +from urllib.parse import urlparse, parse_qs, urlencode, urlunparse +from backend.app.config import Settings +from backend.app.models import RecorderConfig, CodecType + +logger = logging.getLogger(__name__) + + +class SRTStreamer: + """ + Independent SRT output manager for streaming without recording. + Manages a separate FFmpeg process for SRT-only output. + """ + + def __init__(self, port_index: int, settings: Settings): + """Initialize SRT streamer. + + Args: + port_index: 0-based Deltacast port index + settings: Application settings + """ + self.port_index = port_index + self.settings = settings + self._process: asyncio.subprocess.Process | None = None + self._config: RecorderConfig | None = None + self._is_streaming: bool = False + + async def start_stream(self, config: RecorderConfig) -> None: + """Start SRT-only FFmpeg stream. + + Args: + config: RecorderConfig with codec and SRT settings + + Raises: + ValueError: If SRT not enabled in config or destination not set + RuntimeError: If already streaming + """ + if not config.srt_enabled: + raise ValueError("SRT is not enabled in config") + + if not config.srt_destination: + raise ValueError("SRT destination is not set in config") + + if self._is_streaming: + raise RuntimeError(f"Port {self.port_index} is already streaming SRT") + + self._config = config + + # Build FFmpeg command + command = self._build_srt_command(config) + logger.info(f"Port {self.port_index}: Starting SRT stream: {' '.join(command)}") + + # Spawn subprocess + self._process = await asyncio.create_subprocess_exec( + *command, + stdin=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + stdout=asyncio.subprocess.PIPE, + ) + + self._is_streaming = True + logger.info(f"Port {self.port_index}: SRT stream started (PID: {self._process.pid})") + + async def stop_stream(self) -> None: + """Stop SRT stream gracefully: stdin q -> SIGTERM -> SIGKILL. + + The subprocess is terminated using escalating methods: + 1. Send 'q\n' to stdin (graceful quit) + 2. Wait up to 3 seconds for process to exit + 3. Send SIGTERM + 4. Wait up to 2 seconds for process to exit + 5. Send SIGKILL + + If already stopped, this is a no-op. + """ + if not self._is_streaming: + logger.info(f"Port {self.port_index}: Not currently streaming") + return + + logger.info(f"Port {self.port_index}: Stopping SRT stream (PID: {self._process.pid})") + + try: + # Step 1: Try graceful quit via stdin + if self._process and self._process.stdin and not self._process.stdin.is_closing(): + self._process.stdin.write(b"q\n") + await self._process.stdin.drain() + logger.debug(f"Port {self.port_index}: Sent quit command to stdin") + + # Step 2: Wait up to 3 seconds for graceful shutdown + try: + await asyncio.wait_for(self._process.wait(), timeout=3.0) + logger.info(f"Port {self.port_index}: SRT stream exited gracefully") + self._process = None + self._is_streaming = False + return + except asyncio.TimeoutError: + logger.warning(f"Port {self.port_index}: Graceful quit timeout, sending SIGTERM") + + # Step 3: Send SIGTERM + if self._process: + try: + self._process.terminate() + except ProcessLookupError: + pass # Process already terminated + except Exception as e: + logger.error(f"Port {self.port_index}: Error sending SIGTERM: {e}") + + # Step 4: Wait up to 2 seconds for SIGTERM to work + try: + await asyncio.wait_for(self._process.wait(), timeout=2.0) + logger.info(f"Port {self.port_index}: SRT stream exited after SIGTERM") + self._process = None + self._is_streaming = False + return + except asyncio.TimeoutError: + logger.warning(f"Port {self.port_index}: SIGTERM timeout, sending SIGKILL") + + # Step 5: Send SIGKILL + if self._process: + try: + self._process.kill() + await self._process.wait() + logger.info(f"Port {self.port_index}: SRT stream killed") + except ProcessLookupError: + pass # Process already terminated + except Exception as e: + logger.error(f"Port {self.port_index}: Error sending SIGKILL: {e}") + + except Exception as e: + logger.error(f"Port {self.port_index}: Error stopping SRT stream: {e}") + finally: + self._process = None + self._is_streaming = False + + @property + def is_streaming(self) -> bool: + """Return current streaming state.""" + return self._is_streaming + + def _build_srt_command(self, config: RecorderConfig) -> list[str]: + """Build FFmpeg command for SRT-only output (no file recording). + + Command structure: + ffmpeg -f deltacast -i deltacast://{port_index} [codec_args] -f mpegts {srt_destination} + + Args: + config: RecorderConfig with port, codec, and SRT settings + + Returns: + List of command arguments ready for subprocess execution + """ + command = [self.settings.ffmpeg_path] + + # Add input device arguments + command.extend([ + "-f", "deltacast", + "-i", f"deltacast://{self.port_index}" + ]) + + # Add codec arguments + command.extend(self._get_codec_args(config)) + + # Add SRT output with mpegts format + srt_destination = self._add_latency_to_srt_url(config.srt_destination) + command.extend([ + "-f", "mpegts", + srt_destination + ]) + + return command + + def _get_codec_args(self, config: RecorderConfig) -> list[str]: + """Map CodecType to FFmpeg codec arguments for SRT output. + + Args: + config: RecorderConfig with codec selection + + Returns: + Codec-specific FFmpeg arguments + """ + args = [] + + if config.codec == CodecType.PRORES: + args.extend(["-c:v", "prores_ks"]) + profile_map = { + "hq": "3", + "mq": "2", + "lq": "0", + } + profile = profile_map.get(config.quality_profile, "3") + args.extend(["-profile:v", profile]) + + elif config.codec == CodecType.DNXHD: + args.extend(["-c:v", "dnxhd"]) + bitrate = f"{config.bitrate}M" if config.bitrate else "185M" + args.extend(["-b:v", bitrate]) + + elif config.codec == CodecType.UNCOMPRESSED: + args.extend(["-c:v", "rawvideo", "-pix_fmt", "uyvy422"]) + + elif config.codec == CodecType.H264: + args.extend(["-c:v", "libx264"]) + bitrate = f"{config.bitrate}M" if config.bitrate else "8M" + args.extend(["-b:v", bitrate]) + + return args + + def _add_latency_to_srt_url(self, srt_url: str) -> str: + """Add or override latency parameter in SRT URL. + + Args: + srt_url: SRT destination URL (e.g., srt://host:port or srt://host:port?latency=...) + + Returns: + SRT URL with latency parameter set from settings + """ + parsed = urlparse(srt_url) + params = parse_qs(parsed.query, keep_blank_values=True) + + # Set latency from settings + params["latency"] = [str(self.settings.srt_latency)] + + # Reconstruct query string + new_query = urlencode(params, doseq=True) + + # Reconstruct URL + new_url = urlunparse(( + parsed.scheme, + parsed.netloc, + parsed.path, + parsed.params, + new_query, + parsed.fragment + )) + + return new_url