dragon-iso/src/TeamsISO.Engine/Pipeline/IsoPipeline.cs
Zac Gaetano 9cb1cc7b3d
Some checks failed
CI / build-and-test (push) Failing after 29s
fix: review findings on the polish + active-speaker batch
Two real concerns from the code review on ab07297..b266623:

1. ActiveSpeaker removal poisoned the rename-window heuristic. ParticipantTracker.HandleRemoved appends to _recentlyRemoved keyed by MachineName alone; the next Participant Add on the same machine consulted that list with no kind discrimination, so an active-speaker disappearance immediately followed by a participant joining (very common: Teams renames its outputs as participants enter/leave) would cause the new participant to inherit the auto-mix's deterministic v5 GUID. New HandleAutoMixRemoved deliberately skips _recentlyRemoved — the auto-mix row's identity is already stable via the deterministic Id, so re-add restores it without the rename window.

2. IsoPipeline.State writes were not synchronized. Supervisor loop sets State on its own thread; UI thread reads from GetStats. Without volatile semantics, the JIT could cache the field in a register and the UI would stay stuck on Receiving even after Error. The backing field is now an int read/written via Volatile.Read/Volatile.Write, matching the pattern already used for _liveReceiver / _liveSender / _liveProcessor.

Tests: 79/79 (was 78) — added ParticipantTrackerTests.ActiveSpeakerRemove_DoesNotPoisonRenameWindowForLaterParticipant which would have caught (1).
2026-05-09 09:34:16 -04:00

384 lines
15 KiB
C#

using System.Threading.Channels;
using Microsoft.Extensions.Logging;
using TeamsISO.Engine.Domain;
using TeamsISO.Engine.Interop;
namespace TeamsISO.Engine.Pipeline;
/// <summary>
/// Per-ISO unit. Owns one capture loop, one frame processor, one send loop, and the
/// supervisor that restarts the inner pipeline with exponential backoff on failure.
/// </summary>
public sealed class IsoPipeline : IAsyncDisposable
{
    // Set once per instance: by the test ctor directly, or overwritten in the
    // production ctor body (readonly fields may be assigned in any instance ctor).
    private readonly Func<CancellationToken, Task> _runInner;
    private readonly ExponentialBackoff _backoff;
    private readonly Func<TimeSpan, CancellationToken, Task> _delay;
    private readonly ILogger<IsoPipeline> _logger;
    private CancellationTokenSource? _cts;
    private Task? _supervisorTask;

    // Written by the supervisor loop, read from other threads via ConsecutiveFailures;
    // accessed through Volatile/Interlocked for the same visibility reason as _state.
    private int _consecutiveFailures;

    // Refs to the currently-live receiver, sender, and frame processor, set by the
    // inner loop on each restart. Reads via Volatile.Read are safe from any thread
    // (UI's stats poll).
    private NdiReceiver? _liveReceiver;
    private NdiSender? _liveSender;
    private FrameProcessor? _liveProcessor;

    // Last-frame metadata, snapshotted out of the RawFrame on capture so we don't
    // hold a reference to the frame's pixel buffer past its useful life.
    // Width/height are ints, so each individual read is atomic (the pair may briefly
    // mix two frames' values — acceptable for an advisory stats display).
    // The timestamp is kept as raw UTC ticks in a long accessed via Interlocked:
    // a DateTimeOffset? is wider than a machine word and could be torn between the
    // capture thread's write and the UI poll's read. 0 means "no frame seen yet".
    private int _lastWidth;
    private int _lastHeight;
    private long _lastReceivedTicks;

    // Ring buffer of the last 30 incoming-frame timestamps for live fps display.
    // Updated on the receiver's capture thread and read by the UI poll thread;
    // both sides take _frameTimesGate because head/count/slots must be read together.
    private readonly long[] _frameTimes = new long[30];
    private int _frameTimesHead;
    private int _frameTimesCount;
    private readonly object _frameTimesGate = new();

    public Guid ParticipantId { get; }

    // Backing field for State, accessed via Volatile.Read/Write so the supervisor
    // loop's writes are observed promptly by the UI thread's stats poll.
    private int _state = (int)IsoState.Idle;

    public IsoState State
    {
        get => (IsoState)Volatile.Read(ref _state);
        private set => Volatile.Write(ref _state, (int)value);
    }

    /// <summary>
    /// Number of inner-pipeline failures counted by the supervisor since it last
    /// started. Safe to read from any thread.
    /// </summary>
    public int ConsecutiveFailures => Volatile.Read(ref _consecutiveFailures);

    /// <summary>
    /// Snapshot of the pipeline's current health. Safe to call from any thread; values
    /// are inherently a moment-in-time view and may change immediately. Returns
    /// <see cref="Domain.IsoHealthStats.Empty"/> when no inner pipeline is currently
    /// running (e.g. between supervisor restarts or after final failure).
    /// </summary>
    public Domain.IsoHealthStats GetStats()
    {
        var receiver = Volatile.Read(ref _liveReceiver);
        var sender = Volatile.Read(ref _liveSender);
        var processor = Volatile.Read(ref _liveProcessor);
        var w = Volatile.Read(ref _lastWidth);
        var h = Volatile.Read(ref _lastHeight);
        var lastTicks = Interlocked.Read(ref _lastReceivedTicks);

        if (receiver is null || sender is null)
            return Domain.IsoHealthStats.Empty;

        DateTimeOffset? lastAt = lastTicks == 0
            ? null
            : new DateTimeOffset(lastTicks, TimeSpan.Zero);

        // FrameProcessor.Stats already aggregates FramesDropped (older frames dropped
        // by the closest-frame strategy when the input channel had backlog) and
        // FramesDuplicated (last-frame re-emits when no new frame arrived this tick).
        var procStats = processor?.Stats;
        return new Domain.IsoHealthStats(
            FramesIn: receiver.FramesCaptured,
            FramesOut: sender.FramesSent,
            FramesDropped: procStats?.FramesDropped ?? 0,
            FramesDuplicated: procStats?.FramesDuplicated ?? 0,
            LastFrameAt: lastAt,
            IncomingFps: ComputeFps(),
            IncomingWidth: w,
            IncomingHeight: h)
        {
            State = State,
        };
    }

    /// <summary>
    /// Computes a moving-average incoming framerate from the last N frame timestamps.
    /// Rate = (count - 1) / (newest - oldest). Returns 0 if fewer than 2 frames are
    /// recorded or if the window is degenerate (clock skew, all-equal timestamps).
    /// </summary>
    /// <remarks>
    /// The value is derived only from recorded timestamps, so it does not decay when
    /// frames stop arriving; pair it with LastFrameAt to detect a stalled source.
    /// </remarks>
    private double ComputeFps()
    {
        long oldest, newest;
        int count;
        lock (_frameTimesGate)
        {
            count = _frameTimesCount;
            if (count < 2) return 0;
            // Oldest is at the slot AFTER the newest (i.e. at head) when the buffer is
            // full; otherwise the buffer has been filled from index 0.
            var oldestIdx = count < _frameTimes.Length
                ? 0
                : _frameTimesHead;
            var newestIdx = (_frameTimesHead - 1 + _frameTimes.Length) % _frameTimes.Length;
            oldest = _frameTimes[oldestIdx];
            newest = _frameTimes[newestIdx];
        }
        var deltaTicks = newest - oldest;
        if (deltaTicks <= 0) return 0;
        var seconds = deltaTicks / (double)TimeSpan.TicksPerSecond;
        return (count - 1) / seconds;
    }

    /// <summary>Appends one frame-arrival timestamp (UTC ticks) to the fps ring buffer.</summary>
    private void RecordFrameTimestamp(long ticks)
    {
        lock (_frameTimesGate)
        {
            _frameTimes[_frameTimesHead] = ticks;
            _frameTimesHead = (_frameTimesHead + 1) % _frameTimes.Length;
            if (_frameTimesCount < _frameTimes.Length) _frameTimesCount++;
        }
    }

    /// <summary>Empties the fps ring buffer so a restarted pipeline starts a fresh window.</summary>
    private void ResetFrameTimestamps()
    {
        lock (_frameTimesGate)
        {
            _frameTimesHead = 0;
            _frameTimesCount = 0;
        }
    }

    /// <summary>
    /// Test ctor. The caller supplies the inner runner directly so failures and lifetimes
    /// can be controlled from a unit test.
    /// </summary>
    internal IsoPipeline(
        Guid participantId,
        Func<CancellationToken, Task> runInner,
        ExponentialBackoff backoff,
        Func<TimeSpan, CancellationToken, Task> delay,
        ILoggerFactory loggerFactory)
    {
        ParticipantId = participantId;
        _runInner = runInner;
        _backoff = backoff;
        _delay = delay;
        _logger = loggerFactory.CreateLogger<IsoPipeline>();
    }

    /// <summary>
    /// Production ctor. Builds the inner runner from the engine dependencies.
    /// </summary>
    public IsoPipeline(
        IsoPipelineConfig config,
        INdiInterop interop,
        IFrameScaler scaler,
        IFrameClock frameClock,
        ExponentialBackoff backoff,
        Func<TimeSpan, CancellationToken, Task> delay,
        ILoggerFactory loggerFactory)
        : this(config.ParticipantId,
            // The inner-runner closure captures `this` so the receiver/sender
            // wired by RunInnerPipelineAsync can be published to instance fields
            // for stats reads; it is assigned in the ctor body below.
            default(Func<CancellationToken, Task>)!,
            backoff,
            delay,
            loggerFactory)
    {
        _runInner = ct => RunInnerPipelineAsync(
            config, interop, scaler, frameClock, loggerFactory, ct,
            onLive: (recv, send, proc) =>
            {
                Volatile.Write(ref _liveReceiver, recv);
                Volatile.Write(ref _liveSender, send);
                Volatile.Write(ref _liveProcessor, proc);
                ResetFrameTimestamps(); // fresh window on every supervisor restart
            },
            onClear: () =>
            {
                Volatile.Write(ref _liveReceiver, null);
                Volatile.Write(ref _liveSender, null);
                Volatile.Write(ref _liveProcessor, null);
                ResetFrameTimestamps();
            },
            onFrame: frame =>
            {
                // Snapshot dimensions only — don't hold the RawFrame reference past
                // the channel write so the GC can reclaim it on schedule, and so a
                // late stats read can never resurrect a dropped frame's pixel buffer.
                Volatile.Write(ref _lastWidth, frame.Width);
                Volatile.Write(ref _lastHeight, frame.Height);
                var nowTicks = DateTimeOffset.UtcNow.UtcTicks;
                Interlocked.Exchange(ref _lastReceivedTicks, nowTicks);
                RecordFrameTimestamp(nowTicks);
            });
    }

    /// <summary>Starts the supervisor. Returns immediately; pipeline runs in the background.</summary>
    /// <exception cref="InvalidOperationException">The supervisor has already been started.</exception>
    public Task StartAsync()
    {
        if (_supervisorTask is not null)
            throw new InvalidOperationException("Pipeline already started.");
        _cts = new CancellationTokenSource();
        State = IsoState.Receiving;
        _supervisorTask = SupervisorLoopAsync(_cts.Token);
        return Task.CompletedTask;
    }

    /// <summary>Stops the pipeline, awaits supervisor completion, and resets State to Idle.</summary>
    public async Task StopAsync()
    {
        if (_cts is null) return;
        _cts.Cancel();
        if (_supervisorTask is not null)
        {
            try { await _supervisorTask; }
            catch (OperationCanceledException) { /* expected */ }
        }
        State = IsoState.Idle;
        _cts.Dispose();
        _cts = null;
        _supervisorTask = null;
    }

    /// <summary>Equivalent to <see cref="StopAsync"/>.</summary>
    public async ValueTask DisposeAsync()
    {
        await StopAsync();
    }

    /// <summary>
    /// Runs the inner pipeline until <paramref name="ct"/> is cancelled, restarting it
    /// with backoff after each failure, and moving to <see cref="IsoState.Error"/> once
    /// the backoff policy says to give up.
    /// </summary>
    private async Task SupervisorLoopAsync(CancellationToken ct)
    {
        // NOTE(review): the counter is only reset here, not after a healthy run, so
        // failures separated by long healthy periods still accumulate toward
        // ShouldGiveUp — confirm this is intended.
        Volatile.Write(ref _consecutiveFailures, 0);
        while (!ct.IsCancellationRequested)
        {
            try
            {
                await _runInner(ct);
                // Inner exited normally (typically only on cancellation) — leave the loop.
                break;
            }
            catch (OperationCanceledException) when (ct.IsCancellationRequested)
            {
                // Our own stop request: a clean shutdown, not a failure.
                break;
            }
            catch (Exception ex)
            {
                // Everything else — including an OperationCanceledException that did not
                // come from our token (e.g. an internal timeout) — is a failure. Treating
                // a foreign OCE as shutdown would end supervision silently while State
                // still reported Receiving.
                var failures = Interlocked.Increment(ref _consecutiveFailures);
                _logger.LogWarning(ex,
                    "Pipeline {ParticipantId} failed (consecutive failure #{N}).",
                    ParticipantId, failures);
                if (_backoff.ShouldGiveUp(failures))
                {
                    State = IsoState.Error;
                    _logger.LogError("Pipeline {ParticipantId} giving up after {N} consecutive failures.",
                        ParticipantId, failures);
                    return;
                }
                var wait = _backoff.NextDelay(failures);
                _logger.LogInformation("Pipeline {ParticipantId} retrying in {Delay}.", ParticipantId, wait);
                try
                {
                    await _delay(wait, ct);
                }
                catch (OperationCanceledException)
                {
                    break;
                }
                State = IsoState.Receiving;
            }
        }
    }

    /// <summary>
    /// Default inner pipeline: spins up receiver → processor → sender on bounded channels
    /// and awaits all three. As soon as any stage faults (or is cancelled), the remaining
    /// stages are cancelled via a stage-scoped token so the failure propagates to the
    /// supervisor instead of waiting for every stage to finish on its own.
    ///
    /// The optional <paramref name="onLive"/> / <paramref name="onClear"/> / <paramref name="onFrame"/>
    /// callbacks let the outer <see cref="IsoPipeline"/> publish references to the live
    /// receiver and sender (so it can read counters from any thread for health stats)
    /// and observe the most recent received frame (so source resolution / last-seen-at
    /// can be surfaced in the UI). All three are no-ops by default. Once
    /// <paramref name="onLive"/> has been attempted, <paramref name="onClear"/> is always
    /// invoked before this method returns or throws.
    /// </summary>
    private static async Task RunInnerPipelineAsync(
        IsoPipelineConfig config,
        INdiInterop interop,
        IFrameScaler scaler,
        IFrameClock frameClock,
        ILoggerFactory loggerFactory,
        CancellationToken ct,
        Action<NdiReceiver, NdiSender, FrameProcessor>? onLive = null,
        Action? onClear = null,
        Action<RawFrame>? onFrame = null)
    {
        var rawChannel = Channel.CreateBounded<RawFrame>(new BoundedChannelOptions(config.RawChannelCapacity)
        {
            FullMode = BoundedChannelFullMode.DropOldest,
            SingleReader = true,
            SingleWriter = true,
        });
        var processedChannel = Channel.CreateBounded<ProcessedFrame>(new BoundedChannelOptions(config.ProcessedChannelCapacity)
        {
            FullMode = BoundedChannelFullMode.DropOldest,
            SingleReader = true,
            SingleWriter = true,
        });

        // Tap the raw frames as they flow into the channel so the host can show "last
        // frame at" / source resolution without us re-implementing a probe.
        var rawWriter = onFrame is null
            ? rawChannel.Writer
            : new TappedChannelWriter<RawFrame>(rawChannel.Writer, onFrame);

        using var receiver = new NdiReceiver(
            interop, config.SourceName, rawWriter, loggerFactory.CreateLogger<NdiReceiver>());
        using var sender = new NdiSender(
            interop, config.OutputName, processedChannel.Reader, loggerFactory.CreateLogger<NdiSender>(),
            config.OutputGroups);
        var processor = new FrameProcessor(
            config.Settings, scaler, new SolidFrameRenderer(),
            frameClock, rawChannel.Reader, processedChannel.Writer,
            config.SlateThreshold, loggerFactory.CreateLogger<FrameProcessor>());

        // Cancelled by the supervisor's token OR by the first stage that ends abnormally.
        // Without this, a faulted receiver would leave the clock-driven processor and the
        // sender running forever and Task.WhenAll would never surface the fault.
        // Declared after receiver/sender so it is disposed first, after all stages finish.
        using var stageCts = CancellationTokenSource.CreateLinkedTokenSource(ct);
        var stageCt = stageCts.Token;

        try
        {
            onLive?.Invoke(receiver, sender, processor);
            // Stage tasks are created inside the try so onClear still runs if one of
            // the RunAsync calls throws synchronously.
            var receiverTask = CancelSiblingsOnFaultAsync(receiver.RunAsync(stageCt), stageCts);
            var senderTask = CancelSiblingsOnFaultAsync(sender.RunAsync(stageCt), stageCts);
            var processorTask = CancelSiblingsOnFaultAsync(
                ProcessorLoopAsync(processor, frameClock, stageCt), stageCts);
            await Task.WhenAll(receiverTask, senderTask, processorTask);
        }
        finally
        {
            rawChannel.Writer.TryComplete();
            processedChannel.Writer.TryComplete();
            onClear?.Invoke();
        }
    }

    /// <summary>
    /// Awaits one pipeline stage. If it ends with any exception (fault or cancellation),
    /// cancels <paramref name="stageCts"/> so the sibling stages unwind too, then rethrows
    /// the original exception unchanged.
    /// </summary>
    private static async Task CancelSiblingsOnFaultAsync(Task stage, CancellationTokenSource stageCts)
    {
        try
        {
            await stage;
        }
        catch
        {
            stageCts.Cancel();
            throw;
        }
    }

    /// <summary>
    /// Channel-writer wrapper that fires a callback on every successful write but
    /// otherwise behaves identically to the inner writer. Used to tap the raw-frame
    /// stream for stats without entangling the receiver with the stats API.
    /// </summary>
    private sealed class TappedChannelWriter<T> : ChannelWriter<T>
    {
        private readonly ChannelWriter<T> _inner;
        private readonly Action<T> _onWrite;

        public TappedChannelWriter(ChannelWriter<T> inner, Action<T> onWrite) { _inner = inner; _onWrite = onWrite; }

        // The base WriteAsync is implemented in terms of TryWrite/WaitToWriteAsync,
        // so async writes are tapped through this override as well.
        public override bool TryWrite(T item)
        {
            if (_inner.TryWrite(item)) { _onWrite(item); return true; }
            return false;
        }

        public override ValueTask<bool> WaitToWriteAsync(CancellationToken ct = default)
            => _inner.WaitToWriteAsync(ct);

        public override bool TryComplete(Exception? error = null) => _inner.TryComplete(error);
    }

    /// <summary>
    /// Drives the frame processor once per clock tick until cancelled or until the
    /// clock reports it will not advance again.
    /// </summary>
    private static async Task ProcessorLoopAsync(FrameProcessor processor, IFrameClock clock, CancellationToken ct)
    {
        while (!ct.IsCancellationRequested)
        {
            var advanced = await clock.WaitForNextTickAsync(ct);
            if (!advanced) break;
            await processor.ProcessOnceAsync(ct);
        }
    }
}