deltacast-sdi-recorder/backend/app/api/routes.py

278 lines
11 KiB
Python

"""FastAPI REST API routes for Deltacast SDI recorder control."""
import logging
from fastapi import APIRouter, HTTPException, Depends
from typing import Any, Optional
from pydantic import BaseModel, Field
from ..models import RecorderConfig, PortStatus, SCTE35Marker
from ..recorders.recorder import RecorderManager
from ..recorders.scte35 import SCTE35Manager
from ..utils.hls import HLSPreviewManager
logger = logging.getLogger(__name__)
# ============================================================================
# Pydantic Models for Request Validation
# ============================================================================
class SCTE35InjectionRequest(BaseModel):
"""Request model for SCTE35 marker injection endpoint."""
event_id: int = Field(..., description="Unique event identifier")
duration_seconds: float = Field(..., gt=0, description="Duration in seconds")
webhook_url: Optional[str] = Field(None, description="Optional webhook URL")
out_of_network: bool = Field(True, description="Out of network indicator")
splice_immediate: bool = Field(False, description="Splice immediate flag")
srt_destination_url: Optional[str] = Field(None, description="Target specific SRT destination (None = all)")
router = APIRouter(prefix="/api")
# These will be set by main.py during startup
recorder_manager: RecorderManager | None = None
scte35_manager: SCTE35Manager | None = None
hls_manager: HLSPreviewManager | None = None
def get_recorder_manager() -> RecorderManager:
if recorder_manager is None:
raise HTTPException(status_code=503, detail="Recorder not initialized")
return recorder_manager
def get_scte35_manager() -> SCTE35Manager:
if scte35_manager is None:
raise HTTPException(status_code=503, detail="SCTE35 manager not initialized")
return scte35_manager
def get_hls_manager() -> HLSPreviewManager:
if hls_manager is None:
raise HTTPException(status_code=503, detail="HLS manager not initialized")
return hls_manager
# ============================================================================
# Health and Status Endpoints
# ============================================================================
@router.get("/health")
async def health(manager: RecorderManager = Depends(get_recorder_manager)) -> dict[str, Any]:
try:
all_status = manager.get_all_status()
recording_count = sum(1 for status in all_status if status.is_recording)
return {
"status": "ok",
"ports": len(all_status),
"recording_count": recording_count,
}
except Exception as e:
logger.error(f"Health check error: {e}")
raise HTTPException(status_code=500, detail="Internal server error")
# ============================================================================
# Port Status Endpoints
# ============================================================================
@router.get("/ports", response_model=list[PortStatus])
async def list_ports(manager: RecorderManager = Depends(get_recorder_manager)) -> list[PortStatus]:
try:
return manager.get_all_status()
except Exception as e:
logger.error(f"Error listing ports: {e}")
raise HTTPException(status_code=500, detail="Internal server error")
@router.get("/ports/{port_index}", response_model=PortStatus)
async def get_port(
port_index: int,
manager: RecorderManager = Depends(get_recorder_manager),
) -> PortStatus:
try:
return manager.get_status(port_index)
except ValueError as e:
raise HTTPException(status_code=404, detail=f"Port {port_index} not found")
except Exception as e:
logger.error(f"Error getting port {port_index}: {e}")
raise HTTPException(status_code=500, detail="Internal server error")
# ============================================================================
# Recording Control Endpoints
# ============================================================================
@router.post("/ports/{port_index}/start")
async def start_recording(
port_index: int,
config: RecorderConfig,
manager: RecorderManager = Depends(get_recorder_manager),
) -> dict[str, Any]:
try:
await manager.start_recording(port_index, config)
# Auto-start HLS preview if enabled in config
if config.preview_enabled and hls_manager is not None:
try:
await hls_manager.start_preview(port_index)
except Exception as e:
logger.warning(f"HLS preview failed to start for port {port_index}: {e}")
logger.info(f"Started recording on port {port_index}")
return {"message": "Recording started", "port": port_index}
except ValueError as e:
raise HTTPException(status_code=404, detail=f"Port {port_index} not found")
except RuntimeError as e:
raise HTTPException(status_code=400, detail=str(e))
except Exception as e:
logger.error(f"Error starting recording on port {port_index}: {e}")
raise HTTPException(status_code=500, detail="Internal server error")
@router.post("/ports/{port_index}/stop")
async def stop_recording(
port_index: int,
manager: RecorderManager = Depends(get_recorder_manager),
) -> dict[str, Any]:
try:
await manager.stop_recording(port_index)
# Stop HLS preview when recording stops
if hls_manager is not None:
try:
await hls_manager.stop_preview(port_index)
except Exception as e:
logger.warning(f"HLS preview failed to stop for port {port_index}: {e}")
logger.info(f"Stopped recording on port {port_index}")
return {"message": "Recording stopped", "port": port_index}
except ValueError as e:
raise HTTPException(status_code=404, detail=f"Port {port_index} not found")
except Exception as e:
logger.error(f"Error stopping recording on port {port_index}: {e}")
raise HTTPException(status_code=500, detail="Internal server error")
# ============================================================================
# HLS Preview Endpoints
# ============================================================================
@router.post("/ports/{port_index}/preview/start")
async def start_preview(
port_index: int,
manager: HLSPreviewManager = Depends(get_hls_manager),
) -> dict[str, Any]:
"""Manually start HLS preview for a port (independent of recording)."""
try:
await manager.start_preview(port_index)
playlist_url = f"/hls/port_{port_index}.m3u8"
return {"message": "HLS preview started", "port": port_index, "playlist_url": playlist_url}
except Exception as e:
logger.error(f"Error starting HLS preview on port {port_index}: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.post("/ports/{port_index}/preview/stop")
async def stop_preview(
port_index: int,
manager: HLSPreviewManager = Depends(get_hls_manager),
) -> dict[str, Any]:
"""Manually stop HLS preview for a port."""
try:
await manager.stop_preview(port_index)
return {"message": "HLS preview stopped", "port": port_index}
except Exception as e:
logger.error(f"Error stopping HLS preview on port {port_index}: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.get("/ports/{port_index}/preview/status")
async def preview_status(
port_index: int,
manager: HLSPreviewManager = Depends(get_hls_manager),
) -> dict[str, Any]:
"""Get HLS preview status for a port."""
active = manager.is_previewing(port_index)
return {
"port": port_index,
"active": active,
"playlist_url": f"/hls/port_{port_index}.m3u8" if active else None,
}
# ============================================================================
# SCTE35 Ad Break Endpoints
# ============================================================================
@router.post("/scte35/inject", response_model=SCTE35Marker)
async def inject_scte35_marker(
request: SCTE35InjectionRequest,
manager: SCTE35Manager = Depends(get_scte35_manager),
) -> SCTE35Marker:
try:
marker = await manager.inject_marker(
event_id=request.event_id,
duration_seconds=request.duration_seconds,
webhook_url=request.webhook_url,
out_of_network=request.out_of_network,
splice_immediate=request.splice_immediate,
port_index=None,
srt_destination_url=request.srt_destination_url,
)
logger.info(f"Injected global SCTE35 marker event_id={request.event_id}")
return marker
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
except Exception as e:
logger.error(f"Error injecting SCTE35 marker: {e}")
raise HTTPException(status_code=500, detail="Internal server error")
@router.post("/ports/{port_index}/scte35/inject", response_model=SCTE35Marker)
async def inject_scte35_marker_for_port(
port_index: int,
request: SCTE35InjectionRequest,
manager: SCTE35Manager = Depends(get_scte35_manager),
) -> SCTE35Marker:
try:
marker = await manager.inject_marker(
event_id=request.event_id,
duration_seconds=request.duration_seconds,
webhook_url=request.webhook_url,
out_of_network=request.out_of_network,
splice_immediate=request.splice_immediate,
port_index=port_index,
srt_destination_url=request.srt_destination_url,
)
logger.info(f"Injected SCTE35 marker event_id={request.event_id} on port {port_index}")
return marker
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
except Exception as e:
logger.error(f"Error injecting SCTE35 marker on port {port_index}: {e}")
raise HTTPException(status_code=500, detail="Internal server error")
@router.get("/scte35/history", response_model=list[SCTE35Marker])
async def get_scte35_history(
manager: SCTE35Manager = Depends(get_scte35_manager),
) -> list[SCTE35Marker]:
try:
return manager.get_marker_history()
except Exception as e:
logger.error(f"Error retrieving SCTE35 history: {e}")
raise HTTPException(status_code=500, detail="Internal server error")
@router.get("/ports/{port_index}/scte35/history", response_model=list[SCTE35Marker])
async def get_scte35_history_for_port(
port_index: int,
manager: SCTE35Manager = Depends(get_scte35_manager),
) -> list[SCTE35Marker]:
try:
return [m for m in manager.get_marker_history() if m.port_index == port_index]
except Exception as e:
logger.error(f"Error retrieving SCTE35 history for port {port_index}: {e}")
raise HTTPException(status_code=500, detail="Internal server error")