deltacast-sdi-recorder/backend/app/utils/ffmpeg.py

83 lines
3.5 KiB
Python

"""FFmpeg command builder for Deltacast SDI recording."""
import os
from datetime import datetime
from ..config import Settings
from ..models import RecorderConfig, CodecType
def _deltacast_device_exists(port_index: int) -> bool:
return os.path.exists(f"/dev/deltacast{port_index}")
class FFmpegCommandBuilder:
"""Builds FFmpeg commands for Deltacast SDI recording."""
def __init__(self, settings: Settings):
self.settings = settings
def build_command(self, config: RecorderConfig) -> list[str]:
command = [self.settings.ffmpeg_path]
command.extend(self._get_input_args(config.port_index))
command.extend(self._get_codec_args(config))
command.extend(self._get_output_args(config))
# Multi-destination SRT outputs
if config.srt_enabled:
dests = [d for d in config.srt_destinations if d.enabled and d.url]
if not dests and config.srt_destination:
# Legacy single destination fallback
dests_urls = [config.srt_destination]
else:
dests_urls = [d.url for d in dests]
for url in dests_urls:
command.extend(["-f", "mpegts", url])
return command
def _get_input_args(self, port_index: int) -> list[str]:
if _deltacast_device_exists(port_index):
return ["-f", "deltacast", "-i", f"deltacast://{port_index}"]
else:
# No physical device — use lavfi test source so recording still works for testing
return [
"-f", "lavfi",
"-i", (
f"testsrc2=size=1920x1080:rate=30,"
f"drawtext=text='SDI PORT {port_index} — TEST MODE':"
f"fontsize=48:fontcolor=white:x=(w-text_w)/2:y=(h-text_h)/2,"
f"drawtext=text='%{{localtime\\:%H\\:%M\\:%S}}':"
f"fontsize=32:fontcolor=yellow:x=10:y=10"
),
"-f", "lavfi", "-i", "sine=frequency=1000:sample_rate=48000",
]
def _get_codec_args(self, config: RecorderConfig) -> list[str]:
args = []
# Normalise bitrate — strip trailing M if present for numeric use
bitrate = str(config.bitrate or "185M")
bitrate_val = bitrate.rstrip("Mm")
if config.codec == CodecType.PRORES:
args.extend(["-c:v", "prores_ks"])
profile_map = {"hq": "3", "mq": "2", "lq": "0"}
args.extend(["-profile:v", profile_map.get(config.quality_profile, "3")])
args.extend(["-c:a", "pcm_s16le"])
elif config.codec == CodecType.DNXHD:
args.extend(["-c:v", "dnxhd", "-b:v", f"{bitrate_val}M"])
args.extend(["-c:a", "pcm_s16le"])
elif config.codec == CodecType.UNCOMPRESSED:
args.extend(["-c:v", "rawvideo", "-pix_fmt", "uyvy422"])
args.extend(["-c:a", "pcm_s16le"])
elif config.codec == CodecType.H264:
args.extend(["-c:v", "libx264", "-preset", "fast", "-b:v", f"{bitrate_val}M"])
args.extend(["-c:a", "aac", "-b:a", "192k"])
return args
def _get_output_args(self, config: RecorderConfig) -> list[str]:
output_path = config.recording_path
if "{timestamp}" in output_path:
output_path = output_path.replace("{timestamp}", datetime.now().strftime("%Y%m%d_%H%M%S"))
if "{port_index}" in output_path:
output_path = output_path.replace("{port_index}", str(config.port_index))
return [output_path]