51 lines
2.2 KiB
Python
51 lines
2.2 KiB
Python
"""FFmpeg command builder for Deltacast SDI recording."""
|
|
|
|
from datetime import datetime
|
|
from ..config import Settings
|
|
from ..models import RecorderConfig, CodecType
|
|
|
|
|
|
class FFmpegCommandBuilder:
    """Assemble the FFmpeg argument list used for Deltacast SDI recording.

    The builder only produces the argv list; it does not start a process.
    """

    def __init__(self, settings: Settings):
        """Store the settings object; ``settings.ffmpeg_path`` is read later."""
        self.settings = settings

    def build_command(self, config: RecorderConfig) -> list[str]:
        """Return the full argv list for one recording.

        Order is: executable, input args, codec args, file output args and,
        when ``config.srt_enabled`` is truthy, the SRT output args.
        """
        argv = [
            self.settings.ffmpeg_path,
            *self._get_input_args(config.port_index),
            *self._get_codec_args(config),
            *self._get_output_args(config),
        ]
        if config.srt_enabled:
            argv += self._get_srt_output_args(config)
        return argv

    def _get_input_args(self, port_index: int) -> list[str]:
        """Return the input arguments selecting the given Deltacast port."""
        source_url = f"deltacast://{port_index}"
        return ["-f", "deltacast", "-i", source_url]

    def _get_codec_args(self, config: RecorderConfig) -> list[str]:
        """Return the video codec arguments matching ``config.codec``.

        A codec that matches none of the handled ``CodecType`` members
        yields an empty list.
        """
        codec = config.codec

        if codec == CodecType.PRORES:
            # Unrecognised quality profiles fall back to "3" (the "hq" entry).
            prores_profiles = {"hq": "3", "mq": "2", "lq": "0"}
            level = prores_profiles.get(config.quality_profile, "3")
            return ["-c:v", "prores_ks", "-profile:v", level]

        if codec == CodecType.DNXHD:
            # A falsy bitrate (None, 0, "") selects the 185M default.
            rate = f"{config.bitrate}M" if config.bitrate else "185M"
            return ["-c:v", "dnxhd", "-b:v", rate]

        if codec == CodecType.UNCOMPRESSED:
            return ["-c:v", "rawvideo", "-pix_fmt", "uyvy422"]

        if codec == CodecType.H264:
            # A falsy bitrate (None, 0, "") selects the 50M default.
            rate = f"{config.bitrate}M" if config.bitrate else "50M"
            return ["-c:v", "libx264", "-b:v", rate]

        return []

    def _get_output_args(self, config: RecorderConfig) -> list[str]:
        """Return the MXF output arguments with path placeholders expanded.

        ``{timestamp}`` becomes the current local time formatted as
        ``YYYYMMDD_HHMMSS`` and ``{port_index}`` becomes
        ``str(config.port_index)``. A placeholder's value is only computed
        when that placeholder occurs in ``config.recording_path``.
        """
        # Renderers are lazy so nothing is evaluated for absent placeholders.
        substitutions = (
            ("{timestamp}", lambda: datetime.now().strftime("%Y%m%d_%H%M%S")),
            ("{port_index}", lambda: str(config.port_index)),
        )
        target = config.recording_path
        for token, render in substitutions:
            if token in target:
                target = target.replace(token, render())
        return ["-f", "mxf", target]

    def _get_srt_output_args(self, config: RecorderConfig) -> list[str]:
        """Return the MPEG-TS output arguments for the SRT destination.

        Returns an empty list unless both ``config.srt_enabled`` and
        ``config.srt_destination`` are truthy.
        """
        if config.srt_enabled and config.srt_destination:
            return ["-f", "mpegts", config.srt_destination]
        return []
|