Add backend/tests/test_scte35.py
This commit is contained in:
parent
52157add93
commit
9fd23fabb6
1 changed files with 335 additions and 0 deletions
335
backend/tests/test_scte35.py
Normal file
335
backend/tests/test_scte35.py
Normal file
|
|
@ -0,0 +1,335 @@
|
||||||
|
import pytest
|
||||||
|
import struct
|
||||||
|
import asyncio
|
||||||
|
from datetime import datetime
|
||||||
|
from unittest.mock import Mock, AsyncMock, patch, MagicMock
|
||||||
|
from backend.app.models import SCTE35Marker
|
||||||
|
from backend.app.recorders.scte35 import SCTE35Manager
|
||||||
|
|
||||||
|
|
||||||
|
class TestSCTE35Encoding:
|
||||||
|
"""Test SCTE35 binary encoding"""
|
||||||
|
|
||||||
|
def test_encode_splice_insert_returns_bytes(self):
|
||||||
|
"""encode_splice_insert returns bytes"""
|
||||||
|
manager = SCTE35Manager()
|
||||||
|
result = manager.encode_splice_insert(
|
||||||
|
event_id=1,
|
||||||
|
duration_seconds=30
|
||||||
|
)
|
||||||
|
assert isinstance(result, bytes)
|
||||||
|
assert len(result) > 0
|
||||||
|
|
||||||
|
def test_encode_splice_insert_starts_with_table_id(self):
|
||||||
|
"""Binary output starts with 0xFC (SCTE35 table ID)"""
|
||||||
|
manager = SCTE35Manager()
|
||||||
|
result = manager.encode_splice_insert(
|
||||||
|
event_id=1,
|
||||||
|
duration_seconds=30
|
||||||
|
)
|
||||||
|
assert result[0] == 0xFC
|
||||||
|
|
||||||
|
def test_encode_splice_insert_event_id_present(self):
|
||||||
|
"""Event ID is encoded in the binary output"""
|
||||||
|
manager = SCTE35Manager()
|
||||||
|
event_id = 0x12345678
|
||||||
|
result = manager.encode_splice_insert(
|
||||||
|
event_id=event_id,
|
||||||
|
duration_seconds=30
|
||||||
|
)
|
||||||
|
# Event ID should be somewhere in the bytes (after headers)
|
||||||
|
assert len(result) >= 20 # Minimum SCTE35 section length
|
||||||
|
|
||||||
|
def test_encode_splice_insert_duration_flag_set(self):
|
||||||
|
"""Duration > 0 sets duration_flag in flags byte"""
|
||||||
|
manager = SCTE35Manager()
|
||||||
|
result = manager.encode_splice_insert(
|
||||||
|
event_id=1,
|
||||||
|
duration_seconds=30
|
||||||
|
)
|
||||||
|
# Find the flags byte (after event_id in the command)
|
||||||
|
# This is a basic check that encoding succeeded
|
||||||
|
assert len(result) > 0
|
||||||
|
|
||||||
|
def test_encode_splice_insert_no_duration(self):
|
||||||
|
"""Duration = 0 encodes without duration flag"""
|
||||||
|
manager = SCTE35Manager()
|
||||||
|
result = manager.encode_splice_insert(
|
||||||
|
event_id=1,
|
||||||
|
duration_seconds=0
|
||||||
|
)
|
||||||
|
assert isinstance(result, bytes)
|
||||||
|
assert len(result) > 0
|
||||||
|
|
||||||
|
def test_encode_splice_insert_out_of_network_flag(self):
|
||||||
|
"""Out of network flag is set correctly"""
|
||||||
|
manager = SCTE35Manager()
|
||||||
|
result_oon = manager.encode_splice_insert(
|
||||||
|
event_id=1,
|
||||||
|
duration_seconds=30,
|
||||||
|
out_of_network=True
|
||||||
|
)
|
||||||
|
result_in = manager.encode_splice_insert(
|
||||||
|
event_id=1,
|
||||||
|
duration_seconds=30,
|
||||||
|
out_of_network=False
|
||||||
|
)
|
||||||
|
# Both should be bytes, potentially different lengths or content
|
||||||
|
assert isinstance(result_oon, bytes)
|
||||||
|
assert isinstance(result_in, bytes)
|
||||||
|
|
||||||
|
def test_encode_splice_insert_splice_immediate(self):
|
||||||
|
"""splice_immediate flag prevents PTS encoding"""
|
||||||
|
manager = SCTE35Manager()
|
||||||
|
result = manager.encode_splice_insert(
|
||||||
|
event_id=1,
|
||||||
|
duration_seconds=30,
|
||||||
|
splice_immediate=True
|
||||||
|
)
|
||||||
|
assert isinstance(result, bytes)
|
||||||
|
|
||||||
|
def test_encode_splice_insert_first_byte_is_table_id(self):
|
||||||
|
"""Binary output first byte is exactly 0xFC (SCTE35 table ID)"""
|
||||||
|
manager = SCTE35Manager()
|
||||||
|
result = manager.encode_splice_insert(event_id=1, duration_seconds=30.0)
|
||||||
|
assert result[0] == 0xFC
|
||||||
|
|
||||||
|
def test_encode_splice_insert_event_id_bytes(self):
|
||||||
|
"""Event ID is encoded as 4 big-endian bytes somewhere in the output"""
|
||||||
|
manager = SCTE35Manager()
|
||||||
|
result = manager.encode_splice_insert(event_id=0x12345678, duration_seconds=30.0)
|
||||||
|
event_id_bytes = struct.pack('>I', 0x12345678)
|
||||||
|
assert event_id_bytes in result
|
||||||
|
|
||||||
|
def test_encode_splice_insert_out_of_network_different_bytes(self):
|
||||||
|
"""Out of network flag affects the binary output"""
|
||||||
|
manager = SCTE35Manager()
|
||||||
|
result_onn = manager.encode_splice_insert(
|
||||||
|
event_id=1, duration_seconds=30.0, out_of_network=True
|
||||||
|
)
|
||||||
|
result_not_onn = manager.encode_splice_insert(
|
||||||
|
event_id=1, duration_seconds=30.0, out_of_network=False
|
||||||
|
)
|
||||||
|
# The results should differ
|
||||||
|
assert result_onn != result_not_onn
|
||||||
|
|
||||||
|
def test_input_validation_negative_duration(self):
|
||||||
|
"""Negative duration raises ValueError"""
|
||||||
|
manager = SCTE35Manager()
|
||||||
|
with pytest.raises(ValueError, match="non-negative"):
|
||||||
|
manager.encode_splice_insert(event_id=1, duration_seconds=-1.0)
|
||||||
|
|
||||||
|
def test_input_validation_invalid_event_id_too_large(self):
|
||||||
|
"""Event ID out of 32-bit range raises ValueError"""
|
||||||
|
manager = SCTE35Manager()
|
||||||
|
with pytest.raises(ValueError, match="32-bit"):
|
||||||
|
manager.encode_splice_insert(event_id=0xFFFFFFFF + 1, duration_seconds=30.0)
|
||||||
|
|
||||||
|
def test_input_validation_invalid_event_id_negative(self):
|
||||||
|
"""Negative event ID raises ValueError"""
|
||||||
|
manager = SCTE35Manager()
|
||||||
|
with pytest.raises(ValueError, match="32-bit"):
|
||||||
|
manager.encode_splice_insert(event_id=-1, duration_seconds=30.0)
|
||||||
|
|
||||||
|
|
||||||
|
class TestSCTE35Manager:
|
||||||
|
"""Test SCTE35Manager functionality"""
|
||||||
|
|
||||||
|
def test_manager_initialization(self):
|
||||||
|
"""SCTE35Manager initializes with empty history"""
|
||||||
|
manager = SCTE35Manager()
|
||||||
|
assert manager.get_marker_history() == []
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_inject_marker_returns_marker(self):
|
||||||
|
"""inject_marker returns SCTE35Marker with correct fields"""
|
||||||
|
manager = SCTE35Manager()
|
||||||
|
marker = await manager.inject_marker(
|
||||||
|
event_id=1,
|
||||||
|
duration_seconds=30,
|
||||||
|
webhook_url=None,
|
||||||
|
out_of_network=True,
|
||||||
|
splice_immediate=False
|
||||||
|
)
|
||||||
|
assert isinstance(marker, SCTE35Marker)
|
||||||
|
assert marker.event_id == "1" # event_id is stored as string
|
||||||
|
assert marker.duration_seconds == 30
|
||||||
|
assert marker.out_of_network is True
|
||||||
|
assert marker.splice_immediate is False
|
||||||
|
assert isinstance(marker.timestamp, datetime)
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_inject_marker_adds_to_history(self):
|
||||||
|
"""inject_marker adds SCTE35Marker to history"""
|
||||||
|
manager = SCTE35Manager()
|
||||||
|
marker = await manager.inject_marker(
|
||||||
|
event_id=1,
|
||||||
|
duration_seconds=30
|
||||||
|
)
|
||||||
|
history = manager.get_marker_history()
|
||||||
|
assert len(history) == 1
|
||||||
|
assert history[0].event_id == marker.event_id
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_inject_marker_multiple_adds(self):
|
||||||
|
"""Multiple inject_marker calls add to history"""
|
||||||
|
manager = SCTE35Manager()
|
||||||
|
for i in range(3):
|
||||||
|
await manager.inject_marker(
|
||||||
|
event_id=i,
|
||||||
|
duration_seconds=30
|
||||||
|
)
|
||||||
|
history = manager.get_marker_history()
|
||||||
|
assert len(history) == 3
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_inject_marker_no_webhook_when_url_none(self):
|
||||||
|
"""No HTTP request made when webhook_url is None"""
|
||||||
|
manager = SCTE35Manager()
|
||||||
|
with patch('aiohttp.ClientSession.post') as mock_post:
|
||||||
|
marker = await manager.inject_marker(
|
||||||
|
event_id=1,
|
||||||
|
duration_seconds=30,
|
||||||
|
webhook_url=None
|
||||||
|
)
|
||||||
|
# Should not attempt to post
|
||||||
|
assert not mock_post.called
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_inject_marker_triggers_webhook(self):
|
||||||
|
"""HTTP POST is made when webhook_url is provided"""
|
||||||
|
manager = SCTE35Manager()
|
||||||
|
webhook_url = "https://example.com/webhook"
|
||||||
|
|
||||||
|
# Mock the aiohttp session
|
||||||
|
with patch('aiohttp.ClientSession') as mock_session_class:
|
||||||
|
mock_session = AsyncMock()
|
||||||
|
mock_response = AsyncMock()
|
||||||
|
mock_response.status = 200
|
||||||
|
|
||||||
|
# Setup the context managers
|
||||||
|
mock_session.post.return_value = AsyncMock()
|
||||||
|
mock_session.post.return_value.__aenter__ = AsyncMock(return_value=mock_response)
|
||||||
|
mock_session.post.return_value.__aexit__ = AsyncMock(return_value=None)
|
||||||
|
|
||||||
|
mock_session_class.return_value = AsyncMock()
|
||||||
|
mock_session_class.return_value.__aenter__ = AsyncMock(return_value=mock_session)
|
||||||
|
mock_session_class.return_value.__aexit__ = AsyncMock(return_value=None)
|
||||||
|
|
||||||
|
marker = await manager.inject_marker(
|
||||||
|
event_id=1,
|
||||||
|
duration_seconds=30,
|
||||||
|
webhook_url=webhook_url
|
||||||
|
)
|
||||||
|
|
||||||
|
# Verify POST was called
|
||||||
|
mock_session.post.assert_called_once()
|
||||||
|
call_args = mock_session.post.call_args
|
||||||
|
assert webhook_url in call_args[0] or webhook_url == call_args[1].get('url', '')
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_webhook_payload_contains_required_fields(self):
|
||||||
|
"""Webhook JSON payload has event_id, duration_seconds, timestamp, cue_hex"""
|
||||||
|
manager = SCTE35Manager()
|
||||||
|
webhook_url = "https://example.com/webhook"
|
||||||
|
|
||||||
|
with patch('aiohttp.ClientSession') as mock_session_class:
|
||||||
|
mock_session = AsyncMock()
|
||||||
|
mock_response = AsyncMock()
|
||||||
|
mock_response.status = 200
|
||||||
|
|
||||||
|
# Setup the context managers properly
|
||||||
|
mock_session.post.return_value = AsyncMock()
|
||||||
|
mock_session.post.return_value.__aenter__ = AsyncMock(return_value=mock_response)
|
||||||
|
mock_session.post.return_value.__aexit__ = AsyncMock(return_value=None)
|
||||||
|
|
||||||
|
mock_session_class.return_value = AsyncMock()
|
||||||
|
mock_session_class.return_value.__aenter__ = AsyncMock(return_value=mock_session)
|
||||||
|
mock_session_class.return_value.__aexit__ = AsyncMock(return_value=None)
|
||||||
|
|
||||||
|
marker = await manager.inject_marker(
|
||||||
|
event_id=42,
|
||||||
|
duration_seconds=60,
|
||||||
|
webhook_url=webhook_url,
|
||||||
|
out_of_network=True,
|
||||||
|
splice_immediate=False
|
||||||
|
)
|
||||||
|
|
||||||
|
# Extract the json argument from the POST call
|
||||||
|
call_args = mock_session.post.call_args
|
||||||
|
json_data = call_args[1].get('json', {})
|
||||||
|
|
||||||
|
assert json_data.get('event_id') == "42" # event_id is a string
|
||||||
|
assert json_data.get('duration_seconds') == 60
|
||||||
|
assert json_data.get('out_of_network') is True
|
||||||
|
assert json_data.get('splice_immediate') is False
|
||||||
|
assert 'timestamp' in json_data
|
||||||
|
assert 'cue_hex' in json_data
|
||||||
|
|
||||||
|
def test_get_marker_history_returns_list(self):
|
||||||
|
"""get_marker_history returns list of markers"""
|
||||||
|
manager = SCTE35Manager()
|
||||||
|
history = manager.get_marker_history()
|
||||||
|
assert isinstance(history, list)
|
||||||
|
|
||||||
|
def test_get_marker_history_returns_copy(self):
|
||||||
|
"""get_marker_history returns a copy, not reference"""
|
||||||
|
manager = SCTE35Manager()
|
||||||
|
history1 = manager.get_marker_history()
|
||||||
|
history2 = manager.get_marker_history()
|
||||||
|
# Should be equal but different objects
|
||||||
|
assert history1 == history2
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_clear_history(self):
|
||||||
|
"""clear_history empties the history list"""
|
||||||
|
manager = SCTE35Manager()
|
||||||
|
# Add some markers
|
||||||
|
for i in range(3):
|
||||||
|
await manager.inject_marker(
|
||||||
|
event_id=i,
|
||||||
|
duration_seconds=30
|
||||||
|
)
|
||||||
|
assert len(manager.get_marker_history()) == 3
|
||||||
|
|
||||||
|
# Clear
|
||||||
|
manager.clear_history()
|
||||||
|
assert len(manager.get_marker_history()) == 0
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_marker_timestamp_set(self):
|
||||||
|
"""Marker timestamp is set when created"""
|
||||||
|
manager = SCTE35Manager()
|
||||||
|
before = datetime.utcnow()
|
||||||
|
marker = await manager.inject_marker(
|
||||||
|
event_id=1,
|
||||||
|
duration_seconds=30
|
||||||
|
)
|
||||||
|
after = datetime.utcnow()
|
||||||
|
assert before <= marker.timestamp <= after
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_inject_marker_webhook_exception_handling(self):
|
||||||
|
"""inject_marker handles webhook errors gracefully"""
|
||||||
|
manager = SCTE35Manager()
|
||||||
|
webhook_url = "https://example.com/webhook"
|
||||||
|
|
||||||
|
with patch('aiohttp.ClientSession') as mock_session_class:
|
||||||
|
mock_session = AsyncMock()
|
||||||
|
|
||||||
|
# Setup post to raise an exception
|
||||||
|
mock_session.post.side_effect = Exception("Connection failed")
|
||||||
|
|
||||||
|
mock_session_class.return_value = AsyncMock()
|
||||||
|
mock_session_class.return_value.__aenter__ = AsyncMock(return_value=mock_session)
|
||||||
|
mock_session_class.return_value.__aexit__ = AsyncMock(return_value=None)
|
||||||
|
|
||||||
|
# Should not raise, should still add marker to history
|
||||||
|
marker = await manager.inject_marker(
|
||||||
|
event_id=1,
|
||||||
|
duration_seconds=30,
|
||||||
|
webhook_url=webhook_url
|
||||||
|
)
|
||||||
|
|
||||||
|
# Marker should still be added despite webhook failure
|
||||||
|
assert len(manager.get_marker_history()) == 1
|
||||||
Loading…
Reference in a new issue