diff --git a/backend/app/recorders/srt_streamer.py b/backend/app/recorders/srt_streamer.py index c870f49..d834eef 100644 --- a/backend/app/recorders/srt_streamer.py +++ b/backend/app/recorders/srt_streamer.py @@ -3,25 +3,16 @@ import asyncio import logging from urllib.parse import urlparse, parse_qs, urlencode, urlunparse -from backend.app.config import Settings -from backend.app.models import RecorderConfig, CodecType +from ..config import Settings +from ..models import RecorderConfig, CodecType logger = logging.getLogger(__name__) class SRTStreamer: - """ - Independent SRT output manager for streaming without recording. - Manages a separate FFmpeg process for SRT-only output. - """ + """Independent SRT output manager for streaming without recording.""" def __init__(self, port_index: int, settings: Settings): - """Initialize SRT streamer. - - Args: - port_index: 0-based Deltacast port index - settings: Application settings - """ self.port_index = port_index self.settings = settings self._process: asyncio.subprocess.Process | None = None @@ -29,105 +20,63 @@ class SRTStreamer: self._is_streaming: bool = False async def start_stream(self, config: RecorderConfig) -> None: - """Start SRT-only FFmpeg stream. - - Args: - config: RecorderConfig with codec and SRT settings - - Raises: - ValueError: If SRT not enabled in config or destination not set - RuntimeError: If already streaming - """ if not config.srt_enabled: raise ValueError("SRT is not enabled in config") - if not config.srt_destination: raise ValueError("SRT destination is not set in config") - if self._is_streaming: raise RuntimeError(f"Port {self.port_index} is already streaming SRT") self._config = config - - # Build FFmpeg command command = self._build_srt_command(config) logger.info(f"Port {self.port_index}: Starting SRT stream: {' '.join(command)}") - # Spawn subprocess self._process = await asyncio.create_subprocess_exec( *command, stdin=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE, ) - self._is_streaming = True logger.info(f"Port {self.port_index}: SRT stream started (PID: {self._process.pid})") async def stop_stream(self) -> None: - """Stop SRT stream gracefully: stdin q -> SIGTERM -> SIGKILL. - - The subprocess is terminated using escalating methods: - 1. Send 'q\n' to stdin (graceful quit) - 2. Wait up to 3 seconds for process to exit - 3. Send SIGTERM - 4. Wait up to 2 seconds for process to exit - 5. Send SIGKILL - - If already stopped, this is a no-op. - """ if not self._is_streaming: - logger.info(f"Port {self.port_index}: Not currently streaming") return - logger.info(f"Port {self.port_index}: Stopping SRT stream (PID: {self._process.pid})") - try: - # Step 1: Try graceful quit via stdin if self._process and self._process.stdin and not self._process.stdin.is_closing(): self._process.stdin.write(b"q\n") await self._process.stdin.drain() - logger.debug(f"Port {self.port_index}: Sent quit command to stdin") - # Step 2: Wait up to 3 seconds for graceful shutdown try: await asyncio.wait_for(self._process.wait(), timeout=3.0) - logger.info(f"Port {self.port_index}: SRT stream exited gracefully") self._process = None self._is_streaming = False return except asyncio.TimeoutError: - logger.warning(f"Port {self.port_index}: Graceful quit timeout, sending SIGTERM") + pass - # Step 3: Send SIGTERM if self._process: try: self._process.terminate() except ProcessLookupError: - pass # Process already terminated - except Exception as e: - logger.error(f"Port {self.port_index}: Error sending SIGTERM: {e}") + pass - # Step 4: Wait up to 2 seconds for SIGTERM to work try: await asyncio.wait_for(self._process.wait(), timeout=2.0) - logger.info(f"Port {self.port_index}: SRT stream exited after SIGTERM") self._process = None self._is_streaming = False return except asyncio.TimeoutError: - logger.warning(f"Port {self.port_index}: SIGTERM timeout, sending SIGKILL") + pass - # Step 5: Send SIGKILL if self._process: try: self._process.kill() await self._process.wait() - logger.info(f"Port {self.port_index}: SRT stream killed") except ProcessLookupError: - pass # Process already terminated - except Exception as e: - logger.error(f"Port {self.port_index}: Error sending SIGKILL: {e}") + pass except Exception as e: logger.error(f"Port {self.port_index}: Error stopping SRT stream: {e}") @@ -137,103 +86,33 @@ class SRTStreamer: @property def is_streaming(self) -> bool: - """Return current streaming state.""" return self._is_streaming def _build_srt_command(self, config: RecorderConfig) -> list[str]: - """Build FFmpeg command for SRT-only output (no file recording). - - Command structure: - ffmpeg -f deltacast -i deltacast://{port_index} [codec_args] -f mpegts {srt_destination} - - Args: - config: RecorderConfig with port, codec, and SRT settings - - Returns: - List of command arguments ready for subprocess execution - """ command = [self.settings.ffmpeg_path] - - # Add input device arguments - command.extend([ - "-f", "deltacast", - "-i", f"deltacast://{self.port_index}" - ]) - - # Add codec arguments + command.extend(["-f", "deltacast", "-i", f"deltacast://{self.port_index}"]) command.extend(self._get_codec_args(config)) - - # Add SRT output with mpegts format srt_destination = self._add_latency_to_srt_url(config.srt_destination) - command.extend([ - "-f", "mpegts", - srt_destination - ]) - + command.extend(["-f", "mpegts", srt_destination]) return command def _get_codec_args(self, config: RecorderConfig) -> list[str]: - """Map CodecType to FFmpeg codec arguments for SRT output. - - Args: - config: RecorderConfig with codec selection - - Returns: - Codec-specific FFmpeg arguments - """ args = [] - if config.codec == CodecType.PRORES: args.extend(["-c:v", "prores_ks"]) - profile_map = { - "hq": "3", - "mq": "2", - "lq": "0", - } - profile = profile_map.get(config.quality_profile, "3") - args.extend(["-profile:v", profile]) - + profile_map = {"hq": "3", "mq": "2", "lq": "0"} + args.extend(["-profile:v", profile_map.get(config.quality_profile, "3")]) elif config.codec == CodecType.DNXHD: - args.extend(["-c:v", "dnxhd"]) - bitrate = f"{config.bitrate}M" if config.bitrate else "185M" - args.extend(["-b:v", bitrate]) - + args.extend(["-c:v", "dnxhd", "-b:v", f"{config.bitrate}M" if config.bitrate else "185M"]) elif config.codec == CodecType.UNCOMPRESSED: args.extend(["-c:v", "rawvideo", "-pix_fmt", "uyvy422"]) - elif config.codec == CodecType.H264: - args.extend(["-c:v", "libx264"]) - bitrate = f"{config.bitrate}M" if config.bitrate else "8M" - args.extend(["-b:v", bitrate]) - + args.extend(["-c:v", "libx264", "-b:v", f"{config.bitrate}M" if config.bitrate else "8M"]) return args def _add_latency_to_srt_url(self, srt_url: str) -> str: - """Add or override latency parameter in SRT URL. - - Args: - srt_url: SRT destination URL (e.g., srt://host:port or srt://host:port?latency=...) - - Returns: - SRT URL with latency parameter set from settings - """ parsed = urlparse(srt_url) params = parse_qs(parsed.query, keep_blank_values=True) - - # Set latency from settings params["latency"] = [str(self.settings.srt_latency)] - - # Reconstruct query string new_query = urlencode(params, doseq=True) - - # Reconstruct URL - new_url = urlunparse(( - parsed.scheme, - parsed.netloc, - parsed.path, - parsed.params, - new_query, - parsed.fragment - )) - - return new_url + return urlunparse((parsed.scheme, parsed.netloc, parsed.path, parsed.params, new_query, parsed.fragment))