"""FFmpeg command builder for Deltacast SDI recording.""" import os from datetime import datetime from ..config import Settings from ..models import RecorderConfig, CodecType def _deltacast_device_exists(port_index: int) -> bool: return os.path.exists(f"/dev/deltacast{port_index}") class FFmpegCommandBuilder: """Builds FFmpeg commands for Deltacast SDI recording.""" def __init__(self, settings: Settings): self.settings = settings def build_command(self, config: RecorderConfig) -> list[str]: command = [self.settings.ffmpeg_path] command.extend(self._get_input_args(config.port_index)) command.extend(self._get_codec_args(config)) command.extend(self._get_output_args(config)) # Multi-destination SRT outputs if config.srt_enabled: dests = [d for d in config.srt_destinations if d.enabled and d.url] if not dests and config.srt_destination: # Legacy single destination fallback dests_urls = [config.srt_destination] else: dests_urls = [d.url for d in dests] for url in dests_urls: command.extend(["-f", "mpegts", url]) return command def _get_input_args(self, port_index: int) -> list[str]: if _deltacast_device_exists(port_index): return ["-f", "deltacast", "-i", f"deltacast://{port_index}"] else: # No physical device — use lavfi test source so recording still works for testing return [ "-f", "lavfi", "-i", ( f"testsrc2=size=1920x1080:rate=30," f"drawtext=text='SDI PORT {port_index} — TEST MODE':" f"fontsize=48:fontcolor=white:x=(w-text_w)/2:y=(h-text_h)/2," f"drawtext=text='%{{localtime\\:%H\\:%M\\:%S}}':" f"fontsize=32:fontcolor=yellow:x=10:y=10" ), "-f", "lavfi", "-i", "sine=frequency=1000:sample_rate=48000", ] def _get_codec_args(self, config: RecorderConfig) -> list[str]: args = [] # Normalise bitrate — strip trailing M if present for numeric use bitrate = str(config.bitrate or "185M") bitrate_val = bitrate.rstrip("Mm") if config.codec == CodecType.PRORES: args.extend(["-c:v", "prores_ks"]) profile_map = {"hq": "3", "mq": "2", "lq": "0"} args.extend(["-profile:v", profile_map.get(config.quality_profile, "3")]) args.extend(["-c:a", "pcm_s16le"]) elif config.codec == CodecType.DNXHD: args.extend(["-c:v", "dnxhd", "-b:v", f"{bitrate_val}M"]) args.extend(["-c:a", "pcm_s16le"]) elif config.codec == CodecType.UNCOMPRESSED: args.extend(["-c:v", "rawvideo", "-pix_fmt", "uyvy422"]) args.extend(["-c:a", "pcm_s16le"]) elif config.codec == CodecType.H264: args.extend(["-c:v", "libx264", "-preset", "fast", "-b:v", f"{bitrate_val}M"]) args.extend(["-c:a", "aac", "-b:a", "192k"]) return args def _get_output_args(self, config: RecorderConfig) -> list[str]: output_path = config.recording_path if "{timestamp}" in output_path: output_path = output_path.replace("{timestamp}", datetime.now().strftime("%Y%m%d_%H%M%S")) if "{port_index}" in output_path: output_path = output_path.replace("{port_index}", str(config.port_index)) return [output_path]