fix: replace absolute backend.app imports with relative imports

This commit is contained in:
Zac Gaetano 2026-04-14 09:55:33 -04:00
parent 7f1951a871
commit 979616e43b

View file

@ -3,25 +3,16 @@
import asyncio import asyncio
import logging import logging
from urllib.parse import urlparse, parse_qs, urlencode, urlunparse from urllib.parse import urlparse, parse_qs, urlencode, urlunparse
from backend.app.config import Settings from ..config import Settings
from backend.app.models import RecorderConfig, CodecType from ..models import RecorderConfig, CodecType
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class SRTStreamer: class SRTStreamer:
""" """Independent SRT output manager for streaming without recording."""
Independent SRT output manager for streaming without recording.
Manages a separate FFmpeg process for SRT-only output.
"""
def __init__(self, port_index: int, settings: Settings): def __init__(self, port_index: int, settings: Settings):
"""Initialize SRT streamer.
Args:
port_index: 0-based Deltacast port index
settings: Application settings
"""
self.port_index = port_index self.port_index = port_index
self.settings = settings self.settings = settings
self._process: asyncio.subprocess.Process | None = None self._process: asyncio.subprocess.Process | None = None
@ -29,105 +20,63 @@ class SRTStreamer:
self._is_streaming: bool = False self._is_streaming: bool = False
async def start_stream(self, config: RecorderConfig) -> None: async def start_stream(self, config: RecorderConfig) -> None:
"""Start SRT-only FFmpeg stream.
Args:
config: RecorderConfig with codec and SRT settings
Raises:
ValueError: If SRT not enabled in config or destination not set
RuntimeError: If already streaming
"""
if not config.srt_enabled: if not config.srt_enabled:
raise ValueError("SRT is not enabled in config") raise ValueError("SRT is not enabled in config")
if not config.srt_destination: if not config.srt_destination:
raise ValueError("SRT destination is not set in config") raise ValueError("SRT destination is not set in config")
if self._is_streaming: if self._is_streaming:
raise RuntimeError(f"Port {self.port_index} is already streaming SRT") raise RuntimeError(f"Port {self.port_index} is already streaming SRT")
self._config = config self._config = config
# Build FFmpeg command
command = self._build_srt_command(config) command = self._build_srt_command(config)
logger.info(f"Port {self.port_index}: Starting SRT stream: {' '.join(command)}") logger.info(f"Port {self.port_index}: Starting SRT stream: {' '.join(command)}")
# Spawn subprocess
self._process = await asyncio.create_subprocess_exec( self._process = await asyncio.create_subprocess_exec(
*command, *command,
stdin=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE,
stdout=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE,
) )
self._is_streaming = True self._is_streaming = True
logger.info(f"Port {self.port_index}: SRT stream started (PID: {self._process.pid})") logger.info(f"Port {self.port_index}: SRT stream started (PID: {self._process.pid})")
async def stop_stream(self) -> None: async def stop_stream(self) -> None:
"""Stop SRT stream gracefully: stdin q -> SIGTERM -> SIGKILL.
The subprocess is terminated using escalating methods:
1. Send 'q\n' to stdin (graceful quit)
2. Wait up to 3 seconds for process to exit
3. Send SIGTERM
4. Wait up to 2 seconds for process to exit
5. Send SIGKILL
If already stopped, this is a no-op.
"""
if not self._is_streaming: if not self._is_streaming:
logger.info(f"Port {self.port_index}: Not currently streaming")
return return
logger.info(f"Port {self.port_index}: Stopping SRT stream (PID: {self._process.pid})")
try: try:
# Step 1: Try graceful quit via stdin
if self._process and self._process.stdin and not self._process.stdin.is_closing(): if self._process and self._process.stdin and not self._process.stdin.is_closing():
self._process.stdin.write(b"q\n") self._process.stdin.write(b"q\n")
await self._process.stdin.drain() await self._process.stdin.drain()
logger.debug(f"Port {self.port_index}: Sent quit command to stdin")
# Step 2: Wait up to 3 seconds for graceful shutdown
try: try:
await asyncio.wait_for(self._process.wait(), timeout=3.0) await asyncio.wait_for(self._process.wait(), timeout=3.0)
logger.info(f"Port {self.port_index}: SRT stream exited gracefully")
self._process = None self._process = None
self._is_streaming = False self._is_streaming = False
return return
except asyncio.TimeoutError: except asyncio.TimeoutError:
logger.warning(f"Port {self.port_index}: Graceful quit timeout, sending SIGTERM") pass
# Step 3: Send SIGTERM
if self._process: if self._process:
try: try:
self._process.terminate() self._process.terminate()
except ProcessLookupError: except ProcessLookupError:
pass # Process already terminated pass
except Exception as e:
logger.error(f"Port {self.port_index}: Error sending SIGTERM: {e}")
# Step 4: Wait up to 2 seconds for SIGTERM to work
try: try:
await asyncio.wait_for(self._process.wait(), timeout=2.0) await asyncio.wait_for(self._process.wait(), timeout=2.0)
logger.info(f"Port {self.port_index}: SRT stream exited after SIGTERM")
self._process = None self._process = None
self._is_streaming = False self._is_streaming = False
return return
except asyncio.TimeoutError: except asyncio.TimeoutError:
logger.warning(f"Port {self.port_index}: SIGTERM timeout, sending SIGKILL") pass
# Step 5: Send SIGKILL
if self._process: if self._process:
try: try:
self._process.kill() self._process.kill()
await self._process.wait() await self._process.wait()
logger.info(f"Port {self.port_index}: SRT stream killed")
except ProcessLookupError: except ProcessLookupError:
pass # Process already terminated pass
except Exception as e:
logger.error(f"Port {self.port_index}: Error sending SIGKILL: {e}")
except Exception as e: except Exception as e:
logger.error(f"Port {self.port_index}: Error stopping SRT stream: {e}") logger.error(f"Port {self.port_index}: Error stopping SRT stream: {e}")
@ -137,103 +86,33 @@ class SRTStreamer:
@property @property
def is_streaming(self) -> bool: def is_streaming(self) -> bool:
"""Return current streaming state."""
return self._is_streaming return self._is_streaming
def _build_srt_command(self, config: RecorderConfig) -> list[str]: def _build_srt_command(self, config: RecorderConfig) -> list[str]:
"""Build FFmpeg command for SRT-only output (no file recording).
Command structure:
ffmpeg -f deltacast -i deltacast://{port_index} [codec_args] -f mpegts {srt_destination}
Args:
config: RecorderConfig with port, codec, and SRT settings
Returns:
List of command arguments ready for subprocess execution
"""
command = [self.settings.ffmpeg_path] command = [self.settings.ffmpeg_path]
command.extend(["-f", "deltacast", "-i", f"deltacast://{self.port_index}"])
# Add input device arguments
command.extend([
"-f", "deltacast",
"-i", f"deltacast://{self.port_index}"
])
# Add codec arguments
command.extend(self._get_codec_args(config)) command.extend(self._get_codec_args(config))
# Add SRT output with mpegts format
srt_destination = self._add_latency_to_srt_url(config.srt_destination) srt_destination = self._add_latency_to_srt_url(config.srt_destination)
command.extend([ command.extend(["-f", "mpegts", srt_destination])
"-f", "mpegts",
srt_destination
])
return command return command
def _get_codec_args(self, config: RecorderConfig) -> list[str]: def _get_codec_args(self, config: RecorderConfig) -> list[str]:
"""Map CodecType to FFmpeg codec arguments for SRT output.
Args:
config: RecorderConfig with codec selection
Returns:
Codec-specific FFmpeg arguments
"""
args = [] args = []
if config.codec == CodecType.PRORES: if config.codec == CodecType.PRORES:
args.extend(["-c:v", "prores_ks"]) args.extend(["-c:v", "prores_ks"])
profile_map = { profile_map = {"hq": "3", "mq": "2", "lq": "0"}
"hq": "3", args.extend(["-profile:v", profile_map.get(config.quality_profile, "3")])
"mq": "2",
"lq": "0",
}
profile = profile_map.get(config.quality_profile, "3")
args.extend(["-profile:v", profile])
elif config.codec == CodecType.DNXHD: elif config.codec == CodecType.DNXHD:
args.extend(["-c:v", "dnxhd"]) args.extend(["-c:v", "dnxhd", "-b:v", f"{config.bitrate}M" if config.bitrate else "185M"])
bitrate = f"{config.bitrate}M" if config.bitrate else "185M"
args.extend(["-b:v", bitrate])
elif config.codec == CodecType.UNCOMPRESSED: elif config.codec == CodecType.UNCOMPRESSED:
args.extend(["-c:v", "rawvideo", "-pix_fmt", "uyvy422"]) args.extend(["-c:v", "rawvideo", "-pix_fmt", "uyvy422"])
elif config.codec == CodecType.H264: elif config.codec == CodecType.H264:
args.extend(["-c:v", "libx264"]) args.extend(["-c:v", "libx264", "-b:v", f"{config.bitrate}M" if config.bitrate else "8M"])
bitrate = f"{config.bitrate}M" if config.bitrate else "8M"
args.extend(["-b:v", bitrate])
return args return args
def _add_latency_to_srt_url(self, srt_url: str) -> str: def _add_latency_to_srt_url(self, srt_url: str) -> str:
"""Add or override latency parameter in SRT URL.
Args:
srt_url: SRT destination URL (e.g., srt://host:port or srt://host:port?latency=...)
Returns:
SRT URL with latency parameter set from settings
"""
parsed = urlparse(srt_url) parsed = urlparse(srt_url)
params = parse_qs(parsed.query, keep_blank_values=True) params = parse_qs(parsed.query, keep_blank_values=True)
# Set latency from settings
params["latency"] = [str(self.settings.srt_latency)] params["latency"] = [str(self.settings.srt_latency)]
# Reconstruct query string
new_query = urlencode(params, doseq=True) new_query = urlencode(params, doseq=True)
return urlunparse((parsed.scheme, parsed.netloc, parsed.path, parsed.params, new_query, parsed.fragment))
# Reconstruct URL
new_url = urlunparse((
parsed.scheme,
parsed.netloc,
parsed.path,
parsed.params,
new_query,
parsed.fragment
))
return new_url