diff --git a/src/TeamsISO.Engine/Pipeline/IRecorderSink.cs b/src/TeamsISO.Engine/Pipeline/IRecorderSink.cs new file mode 100644 index 0000000..9004f66 --- /dev/null +++ b/src/TeamsISO.Engine/Pipeline/IRecorderSink.cs @@ -0,0 +1,58 @@ +namespace TeamsISO.Engine.Pipeline; + +/// +/// Tap point for per-ISO output recording. Each can be +/// wired with one recorder; when present, every that +/// flows from to is also fed +/// to the recorder for persistence to disk. +/// +/// Lifecycle: +/// 1. is called once when the pipeline starts (or restarts). +/// 2. is called for every processed frame, in order. +/// 3. is called once when the pipeline stops or fails. +/// +/// Implementations must be tolerant of out-of-order calls (Close before Open, +/// double Close, WriteFrame after Close) — the supervisor's restart logic can +/// race in unusual ways. The simplest correct implementation is to track an +/// _isOpen flag and short-circuit when not open. +/// +public interface IRecorderSink : IAsyncDisposable +{ + /// + /// Open the underlying file/encoder and prepare to receive frames. Width/Height + /// match the pipeline's normalized output resolution; FPS is the target framerate + /// (incoming frames may arrive with timing jitter, but the recorder writes at the + /// nominal rate for downstream playback consistency). + /// + /// Used to derive the output filename. + /// Directory under which the recording is created. + /// Frame width in pixels. + /// Frame height in pixels. + /// Nominal framerate. + void Open(string participantDisplayName, string outputDirectory, int width, int height, double fps); + + /// + /// Write one processed frame. Implementations should not block — if encoding is + /// expensive, queue the frame to a worker thread and return promptly. Returning + /// false means the recorder dropped the frame (disk full, queue overflow); the + /// pipeline carries on regardless so a recorder failure never kills the live ISO. + /// + bool WriteFrame(ProcessedFrame frame); + + /// + /// Flush and finalize the output. Idempotent. + /// + void Close(); + + /// True between successful and . + bool IsRecording { get; } + + /// + /// Drop a timestamped marker into the recording. Used by the operator to + /// chapter a recording in real time — "host intro starts here", "guest + /// answer", etc. — so post-production can jump to the right moment without + /// scrubbing through the raw stream. The label is free-form; an empty + /// label means "unnamed marker." No-op when not recording. + /// + void AddMarker(string label); +} diff --git a/src/TeamsISO.Engine/Pipeline/RawBgraRecorderSink.cs b/src/TeamsISO.Engine/Pipeline/RawBgraRecorderSink.cs new file mode 100644 index 0000000..8140858 --- /dev/null +++ b/src/TeamsISO.Engine/Pipeline/RawBgraRecorderSink.cs @@ -0,0 +1,256 @@ +using System.IO; +using System.Text.Json; +using System.Threading.Channels; +using Microsoft.Extensions.Logging; + +namespace TeamsISO.Engine.Pipeline; + +/// +/// Concrete that writes the processed BGRA stream to +/// disk along with a sidecar JSON manifest and an ffmpeg.cmd conversion +/// script. We deliberately do NOT depend on Media Foundation, libx264, or any +/// other native encoder for v1: the engine stays self-contained, and operators +/// who want H.264 .mkv output can run the generated ffmpeg.cmd after the +/// recording finishes (FFmpeg on PATH is a common operator install). +/// +/// Files produced under <outputDir>/<sanitized-display-name>/: +/// +/// video.bgra — raw concatenated frames at width*height*4 bytes each. +/// manifest.json — width, height, fps, format, started/ended timestamps, +/// frame count. Lets a post-processor reconstruct timing without parsing the .bgra. +/// convert.cmd — one-liner that pipes the .bgra into ffmpeg to produce +/// a final output.mkv at H.264. Operators just double-click. +/// +/// +/// Disk pressure: BGRA at 1080p60 is ~500 MB/s, at 720p30 it's ~88 MB/s. A 30-min +/// recording at the default 720p30 takes ~150 GB. Operators should record at the +/// lowest acceptable resolution / framerate, or enable recording only on the +/// participants they intend to keep. A future MediaFoundationRecorderSink +/// would compress in-process and reduce this 10× — see _NEXT.md. +/// +/// Threading: writes are serialized through a single bounded channel so the +/// pipeline's processor thread never blocks on disk I/O. If the disk can't keep +/// up, frames are dropped (and the manifest's drop counter increments) so the +/// live ISO output is never delayed. +/// +public sealed class RawBgraRecorderSink : IRecorderSink +{ + private readonly ILogger? _logger; + private readonly Channel _queue; + private CancellationTokenSource? _writerCts; + private Task? _writerTask; + private FileStream? _videoStream; + private string? _recordingDir; + private int _width; + private int _height; + private double _fps; + private long _framesWritten; + private long _framesDropped; + private DateTimeOffset _startedAt; + private string _displayName = string.Empty; + + // Operator-dropped markers, recorded in wall-clock order. We accumulate them + // in memory during the recording and write to manifest.json on Close. Lock + // is required because AddMarker can be called from the UI thread while the + // writer task drains video frames in the background. + private readonly List _markers = new(); + private readonly object _markersGate = new(); + private sealed record MarkerEntry(double OffsetMs, string Label); + + public bool IsRecording { get; private set; } + + public RawBgraRecorderSink(ILogger? logger = null) + { + _logger = logger; + // Bounded queue with DropOldest: if the disk falls behind, lose the + // oldest frames and keep recording — better than blocking the pipeline. + // 240 frames ≈ 4 seconds @ 60 fps; gives us a buffer for transient + // disk hiccups without unbounded RAM growth on a stuck volume. + _queue = Channel.CreateBounded(new BoundedChannelOptions(240) + { + FullMode = BoundedChannelFullMode.DropOldest, + SingleReader = true, + SingleWriter = false, // pipeline thread + close() can both write + }); + } + + public void Open(string participantDisplayName, string outputDirectory, int width, int height, double fps) + { + if (IsRecording) return; + + _width = width; + _height = height; + _fps = fps; + _startedAt = DateTimeOffset.Now; + _framesWritten = 0; + _framesDropped = 0; + _displayName = participantDisplayName; + + var safeName = SanitizeForFileName(participantDisplayName); + _recordingDir = Path.Combine(outputDirectory, safeName); + Directory.CreateDirectory(_recordingDir); + + var videoPath = Path.Combine(_recordingDir, "video.bgra"); + // Pre-allocate via FileStream with sequential-scan hint; the writer thread + // appends. Buffer size tuned to one full frame so writes are aligned and + // we don't fragment on common allocators. + var bufferSize = Math.Max(width * height * 4, 64 * 1024); + _videoStream = new FileStream( + videoPath, + FileMode.Create, FileAccess.Write, FileShare.Read, + bufferSize: bufferSize, + FileOptions.SequentialScan); + + _writerCts = new CancellationTokenSource(); + _writerTask = Task.Run(() => WriterLoopAsync(_writerCts.Token)); + + IsRecording = true; + _logger?.LogInformation( + "Recorder open: {Path} ({W}x{H}@{Fps:F2}fps)", + _recordingDir, width, height, fps); + } + + public bool WriteFrame(ProcessedFrame frame) + { + if (!IsRecording) return false; + if (_queue.Writer.TryWrite(frame)) return true; + Interlocked.Increment(ref _framesDropped); + return false; + } + + public void AddMarker(string label) + { + if (!IsRecording) return; + var offsetMs = (DateTimeOffset.Now - _startedAt).TotalMilliseconds; + lock (_markersGate) + { + _markers.Add(new MarkerEntry(offsetMs, label ?? string.Empty)); + } + } + + public void Close() + { + if (!IsRecording) return; + IsRecording = false; + + // Mark the writer queue complete; the writer task drains remaining frames. + _queue.Writer.TryComplete(); + try { _writerTask?.Wait(TimeSpan.FromSeconds(5)); } + catch { /* defensive: writer task may have already faulted */ } + _writerCts?.Cancel(); + _writerCts?.Dispose(); + _writerCts = null; + _writerTask = null; + + try { _videoStream?.Flush(); _videoStream?.Dispose(); } + catch { /* defensive */ } + _videoStream = null; + + if (_recordingDir is not null) + { + TryWriteManifest(); + TryWriteFfmpegScript(); + } + + _logger?.LogInformation( + "Recorder closed: {Frames} written, {Dropped} dropped to {Path}", + _framesWritten, _framesDropped, _recordingDir); + } + + public async ValueTask DisposeAsync() + { + Close(); + await Task.CompletedTask; + } + + private async Task WriterLoopAsync(CancellationToken ct) + { + try + { + await foreach (var frame in _queue.Reader.ReadAllAsync(ct)) + { + if (_videoStream is null) break; + try + { + // ProcessedFrame.Pixels is a ReadOnlyMemory; FileStream.Write + // accepts ReadOnlySpan so we can write without an extra copy. + _videoStream.Write(frame.Pixels.Span); + Interlocked.Increment(ref _framesWritten); + } + catch (Exception ex) + { + _logger?.LogWarning(ex, "Recorder write failed; will count as dropped."); + Interlocked.Increment(ref _framesDropped); + } + } + } + catch (OperationCanceledException) { /* expected */ } + } + + private void TryWriteManifest() + { + try + { + MarkerEntry[] markersSnapshot; + lock (_markersGate) { markersSnapshot = _markers.ToArray(); } + + var manifest = new + { + schema = "teamsiso-recorder/v1", + participantDisplayName = _displayName, + width = _width, + height = _height, + fps = _fps, + pixelFormat = "BGRA", + bytesPerFrame = _width * _height * 4, + startedAt = _startedAt.ToString("o"), + endedAt = DateTimeOffset.Now.ToString("o"), + framesWritten = Interlocked.Read(ref _framesWritten), + framesDropped = Interlocked.Read(ref _framesDropped), + markers = markersSnapshot.Select(m => new { offsetMs = m.OffsetMs, label = m.Label }).ToArray(), + }; + var json = JsonSerializer.Serialize(manifest, new JsonSerializerOptions { WriteIndented = true }); + File.WriteAllText(Path.Combine(_recordingDir!, "manifest.json"), json); + } + catch (Exception ex) + { + _logger?.LogWarning(ex, "Failed to write recorder manifest."); + } + } + + private void TryWriteFfmpegScript() + { + try + { + // Operators rarely have FFmpeg's exact rawvideo-to-MKV recipe memorized; + // ship a one-liner so they can convert by double-clicking. Quotes around + // the input filename so paths with spaces still work; -y auto-overwrites + // an existing output.mkv if the operator runs the script twice. + var script = + "@echo off\r\n" + + "REM Convert raw BGRA recording to H.264 MKV. Requires FFmpeg on PATH (download from ffmpeg.org).\r\n" + + $"ffmpeg -y -f rawvideo -pix_fmt bgra -s {_width}x{_height} -r {_fps:F2} -i video.bgra " + + "-c:v libx264 -preset medium -crf 18 -pix_fmt yuv420p output.mkv\r\n" + + "if errorlevel 1 (echo FFmpeg failed. Is it installed and on PATH?) else (echo Wrote output.mkv)\r\n" + + "pause\r\n"; + File.WriteAllText(Path.Combine(_recordingDir!, "convert.cmd"), script); + } + catch (Exception ex) + { + _logger?.LogWarning(ex, "Failed to write recorder convert.cmd."); + } + } + + /// + /// Strip characters that would make the display name an invalid Windows + /// filename (or path-traversal vector). Empty or all-stripped names fall + /// back to "participant" so we always have a usable directory name. + /// + private static string SanitizeForFileName(string name) + { + if (string.IsNullOrWhiteSpace(name)) return "participant"; + var invalid = Path.GetInvalidFileNameChars(); + var clean = new string(name.Where(c => !invalid.Contains(c) && c != '.').ToArray()).Trim(); + return string.IsNullOrEmpty(clean) ? "participant" : clean; + } +}