deltacast-sdi-recorder/backend/app/utils/hls.py

160 lines
5.6 KiB
Python

"""HLS preview manager for live video preview via HTTP Live Streaming."""
import asyncio
import logging
import os
from pathlib import Path
from ..config import Settings
logger = logging.getLogger(__name__)
def _deltacast_device_exists(port_index: int) -> bool:
return os.path.exists(f"/dev/deltacast{port_index}")
def _proc_running(proc) -> bool:
"""Return True if the subprocess is still alive."""
return proc is not None and proc.returncode is None
class HLSPreviewManager:
"""
Manages FFmpeg HLS transcoding processes for video preview.
Falls back to a lavfi test source when no physical deltacast device is present.
"""
def __init__(self, settings: Settings, hls_dir: str = "/tmp/hls"):
self.settings = settings
self.hls_dir = Path(hls_dir)
self._processes: dict[int, asyncio.subprocess.Process] = {}
async def start_preview(self, port_index: int) -> None:
existing = self._processes.get(port_index)
if _proc_running(existing):
logger.info(f"Port {port_index}: HLS preview already running")
return
self.hls_dir.mkdir(parents=True, exist_ok=True)
use_test_src = not _deltacast_device_exists(port_index)
command = self._build_hls_command(port_index, use_test_src=use_test_src)
if use_test_src:
logger.info(f"Port {port_index}: No deltacast device — using lavfi test source for HLS")
else:
logger.info(f"Port {port_index}: Starting HLS preview from deltacast{port_index}")
try:
process = await asyncio.create_subprocess_exec(
*command,
stdin=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
stdout=asyncio.subprocess.PIPE,
)
self._processes[port_index] = process
logger.info(f"Port {port_index}: HLS preview started (PID: {process.pid})")
except Exception as e:
logger.error(f"Port {port_index}: Failed to start HLS preview: {e}")
raise
async def stop_preview(self, port_index: int) -> None:
process = self._processes.get(port_index)
if not _proc_running(process):
return
logger.info(f"Port {port_index}: Stopping HLS preview (PID: {process.pid})")
try:
if process.stdin and not process.stdin.is_closing():
try:
process.stdin.write(b"q\n")
await process.stdin.drain()
except (BrokenPipeError, ConnectionResetError):
pass
try:
await asyncio.wait_for(process.wait(), timeout=3.0)
return
except asyncio.TimeoutError:
pass
try:
process.terminate()
except ProcessLookupError:
return
try:
await asyncio.wait_for(process.wait(), timeout=2.0)
except asyncio.TimeoutError:
try:
process.kill()
await process.wait()
except ProcessLookupError:
pass
except Exception as e:
logger.error(f"Port {port_index}: Error stopping HLS preview: {e}")
finally:
self._processes[port_index] = None
async def stop_all(self) -> None:
for port_index in list(self._processes.keys()):
await self.stop_preview(port_index)
def get_playlist_path(self, port_index: int) -> Path:
return self.hls_dir / f"port_{port_index}.m3u8"
def is_previewing(self, port_index: int) -> bool:
return _proc_running(self._processes.get(port_index))
def _build_hls_command(self, port_index: int, use_test_src: bool = False) -> list[str]:
playlist_path = self.get_playlist_path(port_index)
if use_test_src:
command = [
self.settings.ffmpeg_path,
"-f", "lavfi",
"-i", (
f"testsrc2=size=1280x720:rate=30,"
f"drawtext=text='SDI PORT {port_index} \u2014 NO SIGNAL':"
f"fontsize=36:fontcolor=white:x=(w-text_w)/2:y=(h-text_h)/2,"
f"drawtext=text='%{{localtime\\:%H\\:%M\\:%S}}':"
f"fontsize=24:fontcolor=yellow:x=10:y=10"
),
"-f", "lavfi", "-i", "sine=frequency=1000:sample_rate=48000",
"-c:v", "libx264",
"-preset", "ultrafast",
"-tune", "zerolatency",
"-b:v", "2M",
"-g", "30",
"-c:a", "aac",
"-b:a", "64k",
"-f", "hls",
"-hls_time", "2",
"-hls_list_size", "4",
"-hls_flags", "delete_segments+append_list",
"-hls_segment_filename", str(self.hls_dir / f"port_{port_index}_%03d.ts"),
str(playlist_path),
]
else:
command = [
self.settings.ffmpeg_path,
"-f", "deltacast",
"-i", f"deltacast://{port_index}",
"-c:v", "libx264",
"-preset", "ultrafast",
"-tune", "zerolatency",
"-b:v", "5M",
"-s", "1280x720",
"-c:a", "aac",
"-b:a", "128k",
"-f", "hls",
"-hls_time", "2",
"-hls_list_size", "4",
"-hls_flags", "delete_segments+append_list",
"-hls_segment_filename", str(self.hls_dir / f"port_{port_index}_%03d.ts"),
str(playlist_path),
]
return command