deltacast-sdi-recorder/backend/app/recorders/srt_streamer.py

118 lines
4.5 KiB
Python

"""SRT streaming for Deltacast SDI input."""
import asyncio
import logging
from urllib.parse import urlparse, parse_qs, urlencode, urlunparse
from ..config import Settings
from ..models import RecorderConfig, CodecType
logger = logging.getLogger(__name__)
class SRTStreamer:
"""Independent SRT output manager for streaming without recording."""
def __init__(self, port_index: int, settings: Settings):
self.port_index = port_index
self.settings = settings
self._process: asyncio.subprocess.Process | None = None
self._config: RecorderConfig | None = None
self._is_streaming: bool = False
async def start_stream(self, config: RecorderConfig) -> None:
if not config.srt_enabled:
raise ValueError("SRT is not enabled in config")
if not config.srt_destination:
raise ValueError("SRT destination is not set in config")
if self._is_streaming:
raise RuntimeError(f"Port {self.port_index} is already streaming SRT")
self._config = config
command = self._build_srt_command(config)
logger.info(f"Port {self.port_index}: Starting SRT stream: {' '.join(command)}")
self._process = await asyncio.create_subprocess_exec(
*command,
stdin=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
stdout=asyncio.subprocess.PIPE,
)
self._is_streaming = True
logger.info(f"Port {self.port_index}: SRT stream started (PID: {self._process.pid})")
async def stop_stream(self) -> None:
if not self._is_streaming:
return
try:
if self._process and self._process.stdin and not self._process.stdin.is_closing():
self._process.stdin.write(b"q\n")
await self._process.stdin.drain()
try:
await asyncio.wait_for(self._process.wait(), timeout=3.0)
self._process = None
self._is_streaming = False
return
except asyncio.TimeoutError:
pass
if self._process:
try:
self._process.terminate()
except ProcessLookupError:
pass
try:
await asyncio.wait_for(self._process.wait(), timeout=2.0)
self._process = None
self._is_streaming = False
return
except asyncio.TimeoutError:
pass
if self._process:
try:
self._process.kill()
await self._process.wait()
except ProcessLookupError:
pass
except Exception as e:
logger.error(f"Port {self.port_index}: Error stopping SRT stream: {e}")
finally:
self._process = None
self._is_streaming = False
@property
def is_streaming(self) -> bool:
return self._is_streaming
def _build_srt_command(self, config: RecorderConfig) -> list[str]:
command = [self.settings.ffmpeg_path]
command.extend(["-f", "deltacast", "-i", f"deltacast://{self.port_index}"])
command.extend(self._get_codec_args(config))
srt_destination = self._add_latency_to_srt_url(config.srt_destination)
command.extend(["-f", "mpegts", srt_destination])
return command
def _get_codec_args(self, config: RecorderConfig) -> list[str]:
args = []
if config.codec == CodecType.PRORES:
args.extend(["-c:v", "prores_ks"])
profile_map = {"hq": "3", "mq": "2", "lq": "0"}
args.extend(["-profile:v", profile_map.get(config.quality_profile, "3")])
elif config.codec == CodecType.DNXHD:
args.extend(["-c:v", "dnxhd", "-b:v", f"{config.bitrate}M" if config.bitrate else "185M"])
elif config.codec == CodecType.UNCOMPRESSED:
args.extend(["-c:v", "rawvideo", "-pix_fmt", "uyvy422"])
elif config.codec == CodecType.H264:
args.extend(["-c:v", "libx264", "-b:v", f"{config.bitrate}M" if config.bitrate else "8M"])
return args
def _add_latency_to_srt_url(self, srt_url: str) -> str:
parsed = urlparse(srt_url)
params = parse_qs(parsed.query, keep_blank_values=True)
params["latency"] = [str(self.settings.srt_latency)]
new_query = urlencode(params, doseq=True)
return urlunparse((parsed.scheme, parsed.netloc, parsed.path, parsed.params, new_query, parsed.fragment))