diff --git a/backend/app/recorders/scte35.py b/backend/app/recorders/scte35.py new file mode 100644 index 0000000..de3c479 --- /dev/null +++ b/backend/app/recorders/scte35.py @@ -0,0 +1,221 @@ +import struct +import binascii +import asyncio +import aiohttp +import logging +from datetime import datetime +from typing import Optional +from ..models import SCTE35Marker + +logger = logging.getLogger(__name__) + + +def _encode_33bit_pts(value: int) -> bytes: + """Encode 33-bit PTS value into 5 bytes per MPEG-2 spec.""" + byte1 = ((value >> 30) & 0x07) << 1 | 0x01 # top 3 bits + marker + byte2 = (value >> 22) & 0xFF + byte3 = ((value >> 15) & 0x7F) << 1 | 0x01 # 7 bits + marker + byte4 = (value >> 7) & 0xFF + byte5 = (value & 0x7F) << 1 | 0x01 # bottom 7 bits + marker + return struct.pack('>BBBBB', byte1, byte2, byte3, byte4, byte5) + + +class SCTE35Manager: + """Manage SCTE35 splice insert cues for broadcast recording.""" + + def __init__(self): + """Initialize the SCTE35 manager with empty history.""" + self._marker_history: list[SCTE35Marker] = [] + + def encode_splice_insert( + self, + event_id: int, + duration_seconds: float, + pts_time: int = 0, + out_of_network: bool = True, + splice_immediate: bool = False + ) -> bytes: + """ + Encode a SCTE35 splice_insert command as binary bytes. + Returns the binary SCTE35 cue. + + Args: + event_id: Unique identifier for the splice event + duration_seconds: Duration of the splice in seconds + pts_time: PTS time in seconds (90kHz clock) + out_of_network: Whether this is an out-of-network splice + splice_immediate: Whether splice should happen immediately + + Returns: + Binary SCTE35 cue bytes + """ + # Validate inputs + if not (0 <= event_id <= 0xFFFFFFFF): + raise ValueError(f"event_id must be a 32-bit unsigned integer (0-4294967295), got {event_id}") + if duration_seconds < 0: + raise ValueError(f"duration_seconds must be non-negative, got {duration_seconds}") + + # Build the splice_insert command + flags = 0x00 + if out_of_network: + flags |= 0x80 # out_of_network_indicator (bit 7) + flags |= 0x40 # program_splice_flag (bit 6) + + if duration_seconds > 0: + flags |= 0x20 # duration_flag (bit 5) + + if splice_immediate: + flags |= 0x10 # splice_immediate_flag (bit 4) + + # Build splice_insert command: event_id (4 bytes) + cancel_indicator (1 byte) + splice_cmd = struct.pack('>I', event_id) + splice_cmd += struct.pack('>B', 0x00) # splice_event_cancel_indicator = 0 + splice_cmd += struct.pack('>B', flags) + + # If not immediate, encode PTS time + if not splice_immediate: + pts_90k = pts_time * 90000 + pts_bytes = _encode_33bit_pts(pts_90k) + splice_cmd += pts_bytes + + # If duration specified, encode duration + if duration_seconds > 0: + dur_90k = int(duration_seconds * 90000) + # Auto return flag (bit 7) = 1, 33-bit duration + duration_byte = 0xFE # 1111 1110 (auto_return=1) + splice_cmd += struct.pack('>B', duration_byte) + dur_bytes = _encode_33bit_pts(dur_90k) + splice_cmd += dur_bytes + + # Unique program ID, avail num, avails expected + splice_cmd += struct.pack('>H', 0x0001) # unique_program_id + splice_cmd += struct.pack('>B', 0x00) # avail_num + splice_cmd += struct.pack('>B', 0x00) # avails_expected + + # Calculate splice command length + splice_cmd_length = len(splice_cmd) + + # Build SCTE35 section header + # Table ID: 0xFC + section = struct.pack('>B', 0xFC) + + # Section Syntax Indicator (1 bit) + Private Indicator (1 bit) + Reserved (2 bits) = 0x30 + section += struct.pack('>B', 0x30) + + # The remainder is: protocol_version (1) + encrypted_packet (1) + pts_adj (5) + + # cw_index (1) + tier (2) + splice_cmd_length (2) + splice_cmd + + # unique_program_id (2) + avail_num (1) + avails_expected (1) + + # descriptor_loop_length (2) + + # Build the payload + payload = struct.pack('>B', 0x00) # protocol_version + payload += struct.pack('>B', 0x00) # encrypted_packet + payload += struct.pack('>5s', b'\x00\x00\x00\x00\x00') # PTS adjustment (5 bytes) + payload += struct.pack('>B', 0x00) # CW index + # Tier (reserved=0xFF, reserved=0xF0) + payload += struct.pack('>H', 0xFFF0) + payload += struct.pack('>H', splice_cmd_length) # Splice command length + payload += splice_cmd + payload += struct.pack('>H', 0x0000) # descriptor_loop_length + + # Section length (length of everything after the 2-byte section_length field) + section_length = len(payload) + section += struct.pack('>H', section_length) + section += payload + + # Calculate and append CRC32 + crc = binascii.crc32(section) & 0xFFFFFFFF + section += struct.pack('>I', crc) + + return section + + async def inject_marker( + self, + event_id: int, + duration_seconds: float, + webhook_url: Optional[str] = None, + out_of_network: bool = True, + splice_immediate: bool = False + ) -> SCTE35Marker: + """ + Create SCTE35 marker, trigger webhook if configured, add to history. + + Args: + event_id: Unique identifier for the splice event + duration_seconds: Duration of the ad break in seconds + webhook_url: Optional webhook URL to POST marker data to + out_of_network: Whether this is an out-of-network splice + splice_immediate: Whether splice should happen immediately + + Returns: + The created SCTE35Marker object + """ + # Generate binary cue + cue_bytes = self.encode_splice_insert( + event_id=event_id, + duration_seconds=duration_seconds, + out_of_network=out_of_network, + splice_immediate=splice_immediate + ) + + # Create marker object - event_id must be a string per model + marker = SCTE35Marker( + event_id=str(event_id), + duration_seconds=int(duration_seconds), + out_of_network=out_of_network, + splice_immediate=splice_immediate, + timestamp=datetime.utcnow(), + webhook_url=webhook_url + ) + + # Add to history + self._marker_history.append(marker) + + # Trigger webhook if configured + if webhook_url: + try: + await self._trigger_webhook(marker, cue_bytes) + except Exception as e: + # Log but don't fail - marker is already added to history + logger.error(f"Webhook trigger failed for event_id {event_id}: {e}") + + return marker + + async def _trigger_webhook(self, marker: SCTE35Marker, cue_bytes: bytes) -> None: + """ + POST JSON payload to webhook_url with marker metadata. + + Args: + marker: SCTE35Marker object with metadata + cue_bytes: Binary SCTE35 cue bytes + """ + if not marker.webhook_url: + return + + payload = { + "event_id": marker.event_id, + "duration_seconds": marker.duration_seconds, + "out_of_network": marker.out_of_network, + "splice_immediate": marker.splice_immediate, + "timestamp": marker.timestamp.isoformat(), + "cue_hex": cue_bytes.hex() + } + + async with aiohttp.ClientSession() as session: + async with session.post(marker.webhook_url, json=payload) as response: + # Just log the status + if response.status != 200: + logger.warning(f"Webhook returned status {response.status}") + + def get_marker_history(self) -> list[SCTE35Marker]: + """ + Return all markers in history as a copy. + + Returns: + List of SCTE35Marker objects + """ + return list(self._marker_history) + + def clear_history(self) -> None: + """Clear the marker history.""" + self._marker_history.clear()