286 lines
9.8 KiB
Python
286 lines
9.8 KiB
Python
|
|
"""Recorder management for Deltacast SDI recording ports."""
|
||
|
|
|
||
|
|
import asyncio
|
||
|
|
import re
|
||
|
|
import time
|
||
|
|
import logging
|
||
|
|
from backend.app.config import Settings
|
||
|
|
from backend.app.models import RecorderConfig, PortStatus, CodecType
|
||
|
|
from backend.app.utils.ffmpeg import FFmpegCommandBuilder
|
||
|
|
|
||
|
|
logger = logging.getLogger(__name__)
|
||
|
|
|
||
|
|
|
||
|
|
class PortRecorder:
|
||
|
|
"""Manages FFmpeg recording subprocess for a single Deltacast port."""
|
||
|
|
|
||
|
|
def __init__(self, port_index: int, settings: Settings):
|
||
|
|
"""Initialize a port recorder.
|
||
|
|
|
||
|
|
Args:
|
||
|
|
port_index: 0-based port index
|
||
|
|
settings: Application settings
|
||
|
|
"""
|
||
|
|
self.port_index = port_index
|
||
|
|
self.settings = settings
|
||
|
|
self._process: asyncio.subprocess.Process | None = None
|
||
|
|
self._config: RecorderConfig | None = None
|
||
|
|
self._start_time: float | None = None
|
||
|
|
self._frame_count: int = 0
|
||
|
|
self._current_file: str = ""
|
||
|
|
self._monitor_task: asyncio.Task | None = None
|
||
|
|
self._command_builder = FFmpegCommandBuilder(settings)
|
||
|
|
|
||
|
|
async def start(self, config: RecorderConfig) -> None:
|
||
|
|
"""Start FFmpeg recording subprocess.
|
||
|
|
|
||
|
|
Args:
|
||
|
|
config: RecorderConfig with codec and output settings
|
||
|
|
|
||
|
|
Raises:
|
||
|
|
RuntimeError: If recorder is already running
|
||
|
|
"""
|
||
|
|
if self._process is not None:
|
||
|
|
raise RuntimeError(f"Port {self.port_index} is already recording")
|
||
|
|
|
||
|
|
self._config = config
|
||
|
|
self._current_file = config.recording_path
|
||
|
|
self._frame_count = 0
|
||
|
|
self._start_time = time.time()
|
||
|
|
|
||
|
|
# Build FFmpeg command
|
||
|
|
command = self._command_builder.build_command(config)
|
||
|
|
logger.info(f"Port {self.port_index}: Starting FFmpeg: {' '.join(command)}")
|
||
|
|
|
||
|
|
# Spawn subprocess
|
||
|
|
self._process = await asyncio.create_subprocess_exec(
|
||
|
|
*command,
|
||
|
|
stdin=asyncio.subprocess.PIPE,
|
||
|
|
stderr=asyncio.subprocess.PIPE,
|
||
|
|
)
|
||
|
|
|
||
|
|
# Start monitoring task
|
||
|
|
self._monitor_task = asyncio.create_task(self._monitor_process())
|
||
|
|
logger.info(f"Port {self.port_index}: Recording started (PID: {self._process.pid})")
|
||
|
|
|
||
|
|
async def stop(self) -> None:
|
||
|
|
"""Gracefully stop FFmpeg: stdin quit -> SIGTERM -> SIGKILL.
|
||
|
|
|
||
|
|
The subprocess is terminated using escalating methods:
|
||
|
|
1. Send 'q\n' to stdin (graceful quit)
|
||
|
|
2. Wait up to 3 seconds for process to exit
|
||
|
|
3. Send SIGTERM
|
||
|
|
4. Wait up to 2 seconds for process to exit
|
||
|
|
5. Send SIGKILL
|
||
|
|
|
||
|
|
If already stopped, this is a no-op.
|
||
|
|
"""
|
||
|
|
if self._process is None:
|
||
|
|
logger.info(f"Port {self.port_index}: Already stopped")
|
||
|
|
return
|
||
|
|
|
||
|
|
logger.info(f"Port {self.port_index}: Stopping recording (PID: {self._process.pid})")
|
||
|
|
|
||
|
|
try:
|
||
|
|
# Step 1: Try graceful quit via stdin
|
||
|
|
if self._process.stdin and not self._process.stdin.is_closing():
|
||
|
|
self._process.stdin.write(b"q\n")
|
||
|
|
await self._process.stdin.drain()
|
||
|
|
logger.debug(f"Port {self.port_index}: Sent quit command to stdin")
|
||
|
|
|
||
|
|
# Step 2: Wait up to 3 seconds for graceful shutdown
|
||
|
|
try:
|
||
|
|
await asyncio.wait_for(self._process.wait(), timeout=3.0)
|
||
|
|
logger.info(f"Port {self.port_index}: Process exited gracefully")
|
||
|
|
self._process = None
|
||
|
|
return
|
||
|
|
except asyncio.TimeoutError:
|
||
|
|
logger.warning(f"Port {self.port_index}: Graceful quit timeout, sending SIGTERM")
|
||
|
|
|
||
|
|
# Step 3: Send SIGTERM
|
||
|
|
self._process.terminate()
|
||
|
|
|
||
|
|
# Step 4: Wait up to 2 seconds for SIGTERM to work
|
||
|
|
try:
|
||
|
|
await asyncio.wait_for(self._process.wait(), timeout=2.0)
|
||
|
|
logger.info(f"Port {self.port_index}: Process exited after SIGTERM")
|
||
|
|
self._process = None
|
||
|
|
return
|
||
|
|
except asyncio.TimeoutError:
|
||
|
|
logger.warning(f"Port {self.port_index}: SIGTERM timeout, sending SIGKILL")
|
||
|
|
|
||
|
|
# Step 5: Send SIGKILL
|
||
|
|
self._process.kill()
|
||
|
|
await self._process.wait()
|
||
|
|
logger.info(f"Port {self.port_index}: Process killed")
|
||
|
|
|
||
|
|
except Exception as e:
|
||
|
|
logger.error(f"Port {self.port_index}: Error stopping process: {e}")
|
||
|
|
finally:
|
||
|
|
self._process = None
|
||
|
|
if self._monitor_task and not self._monitor_task.done():
|
||
|
|
self._monitor_task.cancel()
|
||
|
|
try:
|
||
|
|
await self._monitor_task
|
||
|
|
except asyncio.CancelledError:
|
||
|
|
pass
|
||
|
|
self._start_time = None
|
||
|
|
|
||
|
|
def get_status(self) -> PortStatus:
|
||
|
|
"""Get current status of this port.
|
||
|
|
|
||
|
|
Returns:
|
||
|
|
PortStatus with current recording metrics
|
||
|
|
"""
|
||
|
|
is_recording = self._process is not None and not self._process.done()
|
||
|
|
|
||
|
|
# Calculate uptime in seconds
|
||
|
|
uptime_seconds = 0
|
||
|
|
if self._start_time is not None:
|
||
|
|
uptime_seconds = int(time.time() - self._start_time)
|
||
|
|
|
||
|
|
# Calculate FPS
|
||
|
|
fps = 0.0
|
||
|
|
if uptime_seconds > 0:
|
||
|
|
fps = self._frame_count / uptime_seconds
|
||
|
|
|
||
|
|
# Calculate bitrate (would need real-time data from FFmpeg in production)
|
||
|
|
# For now, return 0 as this would require parsing FFmpeg stats
|
||
|
|
bitrate_mbps = 0.0
|
||
|
|
|
||
|
|
codec = self._config.codec if self._config else CodecType.H264
|
||
|
|
|
||
|
|
return PortStatus(
|
||
|
|
port_index=self.port_index,
|
||
|
|
is_recording=is_recording,
|
||
|
|
frame_count=self._frame_count,
|
||
|
|
fps=fps,
|
||
|
|
bitrate_mbps=bitrate_mbps,
|
||
|
|
uptime_seconds=uptime_seconds,
|
||
|
|
current_file=self._current_file,
|
||
|
|
codec=codec,
|
||
|
|
)
|
||
|
|
|
||
|
|
async def _monitor_process(self) -> None:
|
||
|
|
"""Parse FFmpeg stderr for frame=N updates.
|
||
|
|
|
||
|
|
Reads FFmpeg stderr line by line and extracts frame count using regex.
|
||
|
|
Updates self._frame_count as data is received.
|
||
|
|
|
||
|
|
Continues until the process exits or the task is cancelled.
|
||
|
|
"""
|
||
|
|
if self._process is None or self._process.stderr is None:
|
||
|
|
return
|
||
|
|
|
||
|
|
try:
|
||
|
|
async for line in self._process.stderr:
|
||
|
|
try:
|
||
|
|
line_str = line.decode("utf-8", errors="ignore").strip()
|
||
|
|
if not line_str:
|
||
|
|
continue
|
||
|
|
|
||
|
|
# Parse frame count from FFmpeg stderr
|
||
|
|
# Format: "frame= 123 fps=..."
|
||
|
|
match = re.search(r"frame=\s*(\d+)", line_str)
|
||
|
|
if match:
|
||
|
|
self._frame_count = int(match.group(1))
|
||
|
|
logger.debug(
|
||
|
|
f"Port {self.port_index}: Frame count = {self._frame_count}"
|
||
|
|
)
|
||
|
|
|
||
|
|
except Exception as e:
|
||
|
|
logger.error(f"Port {self.port_index}: Error parsing stderr: {e}")
|
||
|
|
|
||
|
|
except asyncio.CancelledError:
|
||
|
|
logger.debug(f"Port {self.port_index}: Monitoring task cancelled")
|
||
|
|
except Exception as e:
|
||
|
|
logger.error(f"Port {self.port_index}: Monitoring process error: {e}")
|
||
|
|
|
||
|
|
|
||
|
|
class RecorderManager:
|
||
|
|
"""Manages recording on all Deltacast ports."""
|
||
|
|
|
||
|
|
def __init__(self, settings: Settings):
|
||
|
|
"""Initialize the recorder manager.
|
||
|
|
|
||
|
|
Args:
|
||
|
|
settings: Application settings
|
||
|
|
"""
|
||
|
|
self.settings = settings
|
||
|
|
self._recorders: dict[int, PortRecorder] = {}
|
||
|
|
|
||
|
|
def initialize(self) -> None:
|
||
|
|
"""Create PortRecorder for each port (0 to DELTACAST_PORT_COUNT-1)."""
|
||
|
|
for port_index in range(self.settings.deltacast_port_count):
|
||
|
|
self._recorders[port_index] = PortRecorder(port_index, self.settings)
|
||
|
|
logger.info(
|
||
|
|
f"Initialized {self.settings.deltacast_port_count} port recorders"
|
||
|
|
)
|
||
|
|
|
||
|
|
async def start_recording(self, port_index: int, config: RecorderConfig) -> None:
|
||
|
|
"""Start recording on given port.
|
||
|
|
|
||
|
|
Args:
|
||
|
|
port_index: 0-based port index
|
||
|
|
config: RecorderConfig with codec and output settings
|
||
|
|
|
||
|
|
Raises:
|
||
|
|
ValueError: If port_index is invalid
|
||
|
|
RuntimeError: If port is already recording
|
||
|
|
"""
|
||
|
|
if port_index not in self._recorders:
|
||
|
|
raise ValueError(
|
||
|
|
f"Invalid port index {port_index}. "
|
||
|
|
f"Valid range: 0-{self.settings.deltacast_port_count - 1}"
|
||
|
|
)
|
||
|
|
|
||
|
|
recorder = self._recorders[port_index]
|
||
|
|
await recorder.start(config)
|
||
|
|
|
||
|
|
async def stop_recording(self, port_index: int) -> None:
|
||
|
|
"""Stop recording on given port.
|
||
|
|
|
||
|
|
Args:
|
||
|
|
port_index: 0-based port index
|
||
|
|
|
||
|
|
Raises:
|
||
|
|
ValueError: If port_index is invalid
|
||
|
|
"""
|
||
|
|
if port_index not in self._recorders:
|
||
|
|
raise ValueError(
|
||
|
|
f"Invalid port index {port_index}. "
|
||
|
|
f"Valid range: 0-{self.settings.deltacast_port_count - 1}"
|
||
|
|
)
|
||
|
|
|
||
|
|
recorder = self._recorders[port_index]
|
||
|
|
await recorder.stop()
|
||
|
|
|
||
|
|
def get_status(self, port_index: int) -> PortStatus:
|
||
|
|
"""Get status for a single port.
|
||
|
|
|
||
|
|
Args:
|
||
|
|
port_index: 0-based port index
|
||
|
|
|
||
|
|
Returns:
|
||
|
|
PortStatus with current metrics
|
||
|
|
|
||
|
|
Raises:
|
||
|
|
ValueError: If port_index is invalid
|
||
|
|
"""
|
||
|
|
if port_index not in self._recorders:
|
||
|
|
raise ValueError(
|
||
|
|
f"Invalid port index {port_index}. "
|
||
|
|
f"Valid range: 0-{self.settings.deltacast_port_count - 1}"
|
||
|
|
)
|
||
|
|
|
||
|
|
return self._recorders[port_index].get_status()
|
||
|
|
|
||
|
|
def get_all_status(self) -> list[PortStatus]:
|
||
|
|
"""Get status for all ports.
|
||
|
|
|
||
|
|
Returns:
|
||
|
|
List of PortStatus objects for each port
|
||
|
|
"""
|
||
|
|
return [self._recorders[i].get_status()
|
||
|
|
for i in range(self.settings.deltacast_port_count)]
|