diff --git a/src/TeamsISO.App/MainWindow.xaml b/src/TeamsISO.App/MainWindow.xaml index 9f8fd2c..4c3c4d6 100644 --- a/src/TeamsISO.App/MainWindow.xaml +++ b/src/TeamsISO.App/MainWindow.xaml @@ -276,9 +276,34 @@ - + + + + + + + + + + + + + + + + + + + + + diff --git a/src/TeamsISO.App/ViewModels/MainViewModel.cs b/src/TeamsISO.App/ViewModels/MainViewModel.cs index 79ecb51..2d2043e 100644 --- a/src/TeamsISO.App/ViewModels/MainViewModel.cs +++ b/src/TeamsISO.App/ViewModels/MainViewModel.cs @@ -18,6 +18,7 @@ public sealed class MainViewModel : ObservableObject, IDisposable private readonly Dispatcher _dispatcher; private readonly IDisposable _participantsSub; private readonly IDisposable _alertsSub; + private readonly DispatcherTimer _statsTimer; private readonly Dictionary _byId = new(); private string _statusText = "Starting…"; @@ -49,6 +50,33 @@ public sealed class MainViewModel : ObservableObject, IDisposable { AlertBanner.Current = alert; }); + + // 1 Hz stats poll — pull live frame counters from each running pipeline and + // push them onto the per-participant view models. Cheap (just reads volatile + // fields on the engine side) and runs on the UI dispatcher so SetField is safe. + _statsTimer = new DispatcherTimer(DispatcherPriority.Background, _dispatcher) + { + Interval = TimeSpan.FromSeconds(1), + }; + _statsTimer.Tick += OnStatsTick; + _statsTimer.Start(); + } + + private void OnStatsTick(object? sender, EventArgs e) + { + foreach (var vm in Participants) + { + try + { + var stats = _controller.GetStats(vm.Id); + vm.UpdateStats(stats); + } + catch + { + // Stats are advisory; never let a transient read failure + // tear down the timer or surface an error to the user. + } + } } public async Task InitializeAsync(CancellationToken cancellationToken) @@ -99,6 +127,8 @@ public sealed class MainViewModel : ObservableObject, IDisposable public void Dispose() { + _statsTimer.Stop(); + _statsTimer.Tick -= OnStatsTick; _participantsSub.Dispose(); _alertsSub.Dispose(); } diff --git a/src/TeamsISO.App/ViewModels/ParticipantViewModel.cs b/src/TeamsISO.App/ViewModels/ParticipantViewModel.cs index 1d41a0d..cc0f388 100644 --- a/src/TeamsISO.App/ViewModels/ParticipantViewModel.cs +++ b/src/TeamsISO.App/ViewModels/ParticipantViewModel.cs @@ -1,5 +1,6 @@ using TeamsISO.Engine.Controller; using TeamsISO.Engine.Domain; +using IsoHealthStats = TeamsISO.Engine.Domain.IsoHealthStats; namespace TeamsISO.App.ViewModels; @@ -35,6 +36,29 @@ public sealed class ParticipantViewModel : ObservableObject set => SetField(ref _isEnabled, value); } + private long _framesIn; + private long _framesOut; + private string _incomingResolution = "—"; + + /// Number of frames the receiver has captured so far. + public long FramesIn { get => _framesIn; private set => SetField(ref _framesIn, value); } + + /// Number of frames the sender has emitted so far. + public long FramesOut { get => _framesOut; private set => SetField(ref _framesOut, value); } + + /// Source resolution as "WxH", or em-dash when no frames have been seen yet. + public string IncomingResolution { get => _incomingResolution; private set => SetField(ref _incomingResolution, value); } + + /// Updates the live stats display from a controller-side snapshot. + public void UpdateStats(IsoHealthStats stats) + { + FramesIn = stats.FramesIn; + FramesOut = stats.FramesOut; + IncomingResolution = stats.IncomingWidth > 0 && stats.IncomingHeight > 0 + ? $"{stats.IncomingWidth}×{stats.IncomingHeight}" + : "—"; + } + public bool IsProcessing { get => _isProcessing; diff --git a/src/TeamsISO.Engine/Controller/IsoController.cs b/src/TeamsISO.Engine/Controller/IsoController.cs index c530599..fdda5ec 100644 --- a/src/TeamsISO.Engine/Controller/IsoController.cs +++ b/src/TeamsISO.Engine/Controller/IsoController.cs @@ -97,12 +97,15 @@ public sealed class IsoController : IIsoController public IsoHealthStats GetStats(Guid participantId) { + IsoPipeline? pipeline; lock (_gate) { - return _pipelines.TryGetValue(participantId, out var pipeline) - ? IsoHealthStats.Empty // production wires pipeline.Stats; Phase B-1 leaves this stub - : IsoHealthStats.Empty; + if (!_pipelines.TryGetValue(participantId, out pipeline)) + return IsoHealthStats.Empty; } + // GetStats() is thread-safe and fast; pull outside the gate so a slow stats + // read doesn't serialize the controller's other operations. + return pipeline.GetStats(); } public async Task EnableIsoAsync(Guid participantId, string? customName, CancellationToken cancellationToken) diff --git a/src/TeamsISO.Engine/Pipeline/IsoPipeline.cs b/src/TeamsISO.Engine/Pipeline/IsoPipeline.cs index cb7f385..1f549c1 100644 --- a/src/TeamsISO.Engine/Pipeline/IsoPipeline.cs +++ b/src/TeamsISO.Engine/Pipeline/IsoPipeline.cs @@ -11,7 +11,7 @@ namespace TeamsISO.Engine.Pipeline; /// public sealed class IsoPipeline : IAsyncDisposable { - private readonly Func _runInner; + private Func _runInner; private readonly ExponentialBackoff _backoff; private readonly Func _delay; private readonly ILogger _logger; @@ -20,10 +20,44 @@ public sealed class IsoPipeline : IAsyncDisposable private Task? _supervisorTask; private int _consecutiveFailures; + // Refs to the currently-live receiver and sender, set by the inner loop on each + // restart. Reads via Volatile.Read are safe from any thread (UI's stats poll). + private NdiReceiver? _liveReceiver; + private NdiSender? _liveSender; + private RawFrame? _lastReceivedFrame; + private DateTimeOffset? _lastReceivedAt; + public Guid ParticipantId { get; } public IsoState State { get; private set; } = IsoState.Idle; public int ConsecutiveFailures => _consecutiveFailures; + /// + /// Snapshot of the pipeline's current health. Safe to call from any thread; values + /// are inherently a moment-in-time view and may change immediately. Returns + /// when no inner pipeline is currently + /// running (e.g. between supervisor restarts or after final failure). + /// + public Domain.IsoHealthStats GetStats() + { + var receiver = Volatile.Read(ref _liveReceiver); + var sender = Volatile.Read(ref _liveSender); + var lastFrame = Volatile.Read(ref _lastReceivedFrame); + var lastAt = _lastReceivedAt; + + if (receiver is null || sender is null) + return Domain.IsoHealthStats.Empty; + + return new Domain.IsoHealthStats( + FramesIn: receiver.FramesCaptured, + FramesOut: sender.FramesSent, + FramesDropped: 0, // FrameProcessor currently doesn't surface drops; wire later + FramesDuplicated: 0, // same — last-frame re-emits aren't counted yet + LastFrameAt: lastAt, + IncomingFps: 0, // running rate computation is a follow-up + IncomingWidth: lastFrame?.Width ?? 0, + IncomingHeight: lastFrame?.Height ?? 0); + } + /// /// Test ctor. The caller supplies the inner runner directly so failures and lifetimes /// can be controlled from a unit test. @@ -54,10 +88,32 @@ public sealed class IsoPipeline : IAsyncDisposable Func delay, ILoggerFactory loggerFactory) : this(config.ParticipantId, - ct => RunInnerPipelineAsync(config, interop, scaler, frameClock, loggerFactory, ct), + // The inner-runner closure captures `this` so the receiver/sender + // wired by RunInnerPipelineAsync can be published to instance fields + // for stats reads. + default(Func)!, backoff, delay, - loggerFactory) { } + loggerFactory) + { + _runInner = ct => RunInnerPipelineAsync( + config, interop, scaler, frameClock, loggerFactory, ct, + onLive: (recv, send) => + { + Volatile.Write(ref _liveReceiver, recv); + Volatile.Write(ref _liveSender, send); + }, + onClear: () => + { + Volatile.Write(ref _liveReceiver, null); + Volatile.Write(ref _liveSender, null); + }, + onFrame: frame => + { + Volatile.Write(ref _lastReceivedFrame, frame); + _lastReceivedAt = DateTimeOffset.UtcNow; + }); + } /// Starts the supervisor. Returns immediately; pipeline runs in the background. public Task StartAsync() @@ -139,6 +195,12 @@ public sealed class IsoPipeline : IAsyncDisposable /// /// Default inner pipeline: spins up receiver → processor → sender on bounded channels /// and awaits all three. Throws if any of them throws. + /// + /// The optional / / + /// callbacks let the outer publish references to the live + /// receiver and sender (so it can read counters from any thread for health stats) + /// and observe the most recent received frame (so source resolution / last-seen-at + /// can be surfaced in the UI). All three are no-ops by default. /// private static async Task RunInnerPipelineAsync( IsoPipelineConfig config, @@ -146,7 +208,10 @@ public sealed class IsoPipeline : IAsyncDisposable IFrameScaler scaler, IFrameClock frameClock, ILoggerFactory loggerFactory, - CancellationToken ct) + CancellationToken ct, + Action? onLive = null, + Action? onClear = null, + Action? onFrame = null) { var rawChannel = Channel.CreateBounded(new BoundedChannelOptions(config.RawChannelCapacity) { @@ -161,12 +226,20 @@ public sealed class IsoPipeline : IAsyncDisposable SingleWriter = true, }); + // Tap the raw frames as they flow into the channel so the host can show "last + // frame at" / source resolution without us re-implementing a probe. + var rawWriter = onFrame is null + ? rawChannel.Writer + : new TappedChannelWriter(rawChannel.Writer, onFrame); + using var receiver = new NdiReceiver( - interop, config.SourceName, rawChannel.Writer, loggerFactory.CreateLogger()); + interop, config.SourceName, rawWriter, loggerFactory.CreateLogger()); using var sender = new NdiSender( interop, config.OutputName, processedChannel.Reader, loggerFactory.CreateLogger(), config.OutputGroups); + onLive?.Invoke(receiver, sender); + var processor = new FrameProcessor( config.Settings, scaler, new SolidFrameRenderer(), frameClock, rawChannel.Reader, processedChannel.Writer, @@ -184,9 +257,30 @@ public sealed class IsoPipeline : IAsyncDisposable { rawChannel.Writer.TryComplete(); processedChannel.Writer.TryComplete(); + onClear?.Invoke(); } } + /// + /// Channel-writer wrapper that fires a callback on every successful write but + /// otherwise behaves identically to the inner writer. Used to tap the raw-frame + /// stream for stats without entangling the receiver with the stats API. + /// + private sealed class TappedChannelWriter : ChannelWriter + { + private readonly ChannelWriter _inner; + private readonly Action _onWrite; + public TappedChannelWriter(ChannelWriter inner, Action onWrite) { _inner = inner; _onWrite = onWrite; } + public override bool TryWrite(T item) + { + if (_inner.TryWrite(item)) { _onWrite(item); return true; } + return false; + } + public override ValueTask WaitToWriteAsync(CancellationToken ct = default) + => _inner.WaitToWriteAsync(ct); + public override bool TryComplete(Exception? error = null) => _inner.TryComplete(error); + } + private static async Task ProcessorLoopAsync(FrameProcessor processor, IFrameClock clock, CancellationToken ct) { while (!ct.IsCancellationRequested)