# deltacast-sdi-recorder/backend/app/recorders/recorder.py
#
# 168 lines
# 6.1 KiB
# Python

"""Recorder management for Deltacast SDI recording ports."""
import asyncio
import re
import time
import logging
from ..config import Settings
from ..models import RecorderConfig, PortStatus, CodecType
from ..utils.ffmpeg import FFmpegCommandBuilder
logger = logging.getLogger(__name__)
class PortRecorder:
    """Manage the FFmpeg recording subprocess for a single Deltacast port.

    An instance owns at most one FFmpeg process at a time. While a process
    is alive, a background task reads its stderr and extracts the ``frame=``
    counter so that :meth:`get_status` can report progress.
    """

    # Seconds to wait after writing "q" to stdin before sending SIGTERM.
    GRACEFUL_QUIT_TIMEOUT: float = 3.0
    # Seconds to wait after SIGTERM before sending SIGKILL.
    TERMINATE_TIMEOUT: float = 2.0

    # stderr is consumed in fixed-size chunks rather than with readline():
    # FFmpeg normally redraws its status line with "\r" (not "\n"), so a
    # newline-based reader would see no complete line for long stretches.
    _STDERR_CHUNK_SIZE = 4096
    # Upper bound on buffered bytes that have not yet seen a line break.
    _MAX_PARTIAL_LINE = 65536
    _LINE_BREAK_RE = re.compile(rb"[\r\n]+")
    _FRAME_RE = re.compile(r"frame=\s*(\d+)")

    def __init__(self, port_index: int, settings: Settings):
        """Create an idle recorder for ``port_index``.

        Args:
            port_index: Index of the port this recorder controls.
            settings: Application settings; also handed to the FFmpeg
                command builder.
        """
        self.port_index = port_index
        self.settings = settings
        self._process: asyncio.subprocess.Process | None = None
        self._config: RecorderConfig | None = None
        # time.monotonic() timestamp of the last successful start.
        self._start_time: float | None = None
        self._frame_count: int = 0
        self._current_file: str = ""
        self._monitor_task: asyncio.Task | None = None
        self._command_builder = FFmpegCommandBuilder(settings)

    async def start(self, config: RecorderConfig) -> None:
        """Spawn FFmpeg for ``config`` and begin monitoring its progress.

        Recorder state is only updated once the subprocess has actually been
        created, so a failed spawn leaves the recorder idle.

        Args:
            config: Recording configuration passed to the command builder.

        Raises:
            RuntimeError: If a recording process is still running on this
                port.

        Any error raised while building or spawning the command propagates
        to the caller.
        """
        if self._process is not None:
            if self._process.returncode is None:
                raise RuntimeError(f"Port {self.port_index} is already recording")
            # The previous FFmpeg exited on its own; drop the stale handle so
            # the port can be reused without an explicit stop().
            await self._release_process()

        command = self._command_builder.build_command(config)
        logger.info(
            "Port %s: Starting FFmpeg: %s", self.port_index, " ".join(command)
        )
        process = await asyncio.create_subprocess_exec(
            *command,
            stdin=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )

        self._process = process
        self._config = config
        self._current_file = config.recording_path
        self._frame_count = 0
        self._start_time = time.monotonic()
        self._monitor_task = asyncio.create_task(self._monitor_process())
        logger.info(
            "Port %s: Recording started (PID: %s)", self.port_index, process.pid
        )

    async def stop(self) -> None:
        """Stop the recording process, escalating until it has exited.

        The sequence is: write ``q`` to stdin and wait
        ``GRACEFUL_QUIT_TIMEOUT`` seconds, then SIGTERM and wait
        ``TERMINATE_TIMEOUT`` seconds, then SIGKILL. Errors during shutdown
        are logged rather than raised, and the recorder always ends up idle.
        Calling this on an idle recorder is a no-op.
        """
        process = self._process
        if process is None:
            logger.info("Port %s: Already stopped", self.port_index)
            return

        logger.info(
            "Port %s: Stopping recording (PID: %s)", self.port_index, process.pid
        )
        try:
            await self._shutdown_process(process)
        except Exception as e:
            logger.error("Port %s: Error stopping process: %s", self.port_index, e)
        finally:
            # Only reset state that still belongs to the process we stopped;
            # a concurrent start() may already have installed a new one.
            if self._process is process:
                await self._release_process()

    def get_status(self) -> PortStatus:
        """Return a snapshot of this port's recording state.

        ``uptime_seconds`` and ``fps`` are only non-zero while the process is
        running. ``fps`` is the average since start (frames / whole seconds).
        ``bitrate_mbps`` is not measured and is always ``0.0``. ``codec``
        falls back to ``CodecType.H264`` when nothing has been started yet.
        """
        is_recording = self._is_running()

        uptime_seconds = 0
        if is_recording and self._start_time is not None:
            uptime_seconds = int(time.monotonic() - self._start_time)

        fps = self._frame_count / uptime_seconds if uptime_seconds > 0 else 0.0
        bitrate_mbps = 0.0
        codec = self._config.codec if self._config else CodecType.H264

        return PortStatus(
            port_index=self.port_index,
            is_recording=is_recording,
            frame_count=self._frame_count,
            fps=fps,
            bitrate_mbps=bitrate_mbps,
            uptime_seconds=uptime_seconds,
            current_file=self._current_file,
            codec=codec,
        )

    def _is_running(self) -> bool:
        """Return True while a process exists and has not exited."""
        # asyncio's Process has no done(); returncode stays None until exit.
        return self._process is not None and self._process.returncode is None

    async def _shutdown_process(self, process: asyncio.subprocess.Process) -> None:
        """Ask ``process`` to quit, then SIGTERM, then SIGKILL as needed."""
        if process.returncode is not None:
            return  # Already exited; signalling it would raise.

        stdin = process.stdin
        if stdin is not None and not stdin.is_closing():
            try:
                stdin.write(b"q\n")
                await stdin.drain()
            except ConnectionError as e:
                # The pipe is gone, so a graceful quit cannot be delivered;
                # fall through to signals instead of abandoning the process.
                logger.warning(
                    "Port %s: Could not send quit command: %s", self.port_index, e
                )
            else:
                if await self._wait_for_exit(process, self.GRACEFUL_QUIT_TIMEOUT):
                    return
                logger.warning(
                    "Port %s: Graceful quit timeout, sending SIGTERM",
                    self.port_index,
                )

        try:
            process.terminate()
        except ProcessLookupError:
            return  # Exited between the check and the signal.
        if await self._wait_for_exit(process, self.TERMINATE_TIMEOUT):
            return

        logger.warning(
            "Port %s: SIGTERM timeout, sending SIGKILL", self.port_index
        )
        try:
            process.kill()
        except ProcessLookupError:
            return
        await process.wait()

    @staticmethod
    async def _wait_for_exit(
        process: asyncio.subprocess.Process, timeout: float
    ) -> bool:
        """Wait up to ``timeout`` seconds; return True if the process exited."""
        try:
            await asyncio.wait_for(process.wait(), timeout=timeout)
        except asyncio.TimeoutError:
            return False
        return True

    async def _release_process(self) -> None:
        """Forget the current process and stop its stderr monitor."""
        # Reset synchronous state before awaiting so nothing written by a
        # later start() can be clobbered once this coroutine resumes.
        self._process = None
        self._start_time = None
        await self._cancel_monitor()

    async def _cancel_monitor(self) -> None:
        """Cancel the monitor task, if any, and wait for it to finish."""
        task, self._monitor_task = self._monitor_task, None
        if task is None or task.done():
            return
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            pass

    async def _monitor_process(self) -> None:
        """Read the process's stderr until EOF, tracking the frame counter.

        Output is split on both ``\\r`` and ``\\n`` so that status lines
        terminated by a carriage return are parsed as they arrive.
        Unexpected errors are logged and end the monitor.
        """
        process = self._process
        if process is None or process.stderr is None:
            return

        stream = process.stderr
        partial = b""
        try:
            while True:
                chunk = await stream.read(self._STDERR_CHUNK_SIZE)
                if not chunk:
                    break
                # Everything before the final separator is complete; the
                # last piece may be a line still being written.
                *lines, partial = self._LINE_BREAK_RE.split(partial + chunk)
                for raw_line in lines:
                    self._handle_stderr_line(raw_line)
                if len(partial) > self._MAX_PARTIAL_LINE:
                    partial = partial[-self._MAX_PARTIAL_LINE:]
            if partial:
                self._handle_stderr_line(partial)
        except Exception as e:
            logger.error(
                "Port %s: Monitoring process error: %s", self.port_index, e
            )

    def _handle_stderr_line(self, raw_line: bytes) -> None:
        """Update the frame counter if ``raw_line`` contains ``frame=N``."""
        text = raw_line.decode("utf-8", errors="ignore").strip()
        if not text:
            return
        match = self._FRAME_RE.search(text)
        if match:
            self._frame_count = int(match.group(1))
class RecorderManager:
    """Manage recording on all Deltacast ports.

    Holds one ``PortRecorder`` per port index, created by
    :meth:`initialize`, and forwards start/stop/status requests to it.
    """

    def __init__(self, settings: Settings):
        """Store ``settings``; recorders are created by :meth:`initialize`."""
        self.settings = settings
        self._recorders: dict[int, PortRecorder] = {}

    def initialize(self) -> None:
        """Create a recorder for every port in ``settings.deltacast_port_count``.

        Safe to call more than once: recorders that already exist are kept,
        so a repeated call never discards one that may own a live process.
        """
        for port_index in range(self.settings.deltacast_port_count):
            if port_index not in self._recorders:
                self._recorders[port_index] = PortRecorder(port_index, self.settings)
        logger.info(
            "Initialized %s port recorders", self.settings.deltacast_port_count
        )

    def _get_recorder(self, port_index: int) -> PortRecorder:
        """Return the recorder for ``port_index`` or raise ``ValueError``."""
        try:
            return self._recorders[port_index]
        except KeyError:
            raise ValueError(f"Invalid port index {port_index}.") from None

    async def start_recording(self, port_index: int, config: RecorderConfig) -> None:
        """Start recording on ``port_index`` with ``config``.

        Raises:
            ValueError: If ``port_index`` has no recorder.
        """
        await self._get_recorder(port_index).start(config)

    async def stop_recording(self, port_index: int) -> None:
        """Stop recording on ``port_index``.

        Raises:
            ValueError: If ``port_index`` has no recorder.
        """
        await self._get_recorder(port_index).stop()

    def get_status(self, port_index: int) -> PortStatus:
        """Return the status of ``port_index``.

        Raises:
            ValueError: If ``port_index`` has no recorder.
        """
        return self._get_recorder(port_index).get_status()

    def get_all_status(self) -> list[PortStatus]:
        """Return the status of every registered port, ordered by port index.

        Iterates the recorders that actually exist, so the result is an
        empty list (not an error) before :meth:`initialize` has run.
        """
        return [
            recorder.get_status()
            for _, recorder in sorted(self._recorders.items())
        ]