diff --git a/src/TeamsISO.Engine/Pipeline/IsoPipeline.cs b/src/TeamsISO.Engine/Pipeline/IsoPipeline.cs
new file mode 100644
index 0000000..511fb21
--- /dev/null
+++ b/src/TeamsISO.Engine/Pipeline/IsoPipeline.cs
@@ -0,0 +1,198 @@
+using System.Threading.Channels;
+using Microsoft.Extensions.Logging;
+using TeamsISO.Engine.Domain;
+using TeamsISO.Engine.Interop;
+
+namespace TeamsISO.Engine.Pipeline;
+
+///
+/// Per-ISO unit. Owns one capture loop, one frame processor, one send loop, and the
+/// supervisor that restarts the inner pipeline with exponential backoff on failure.
+///
+public sealed class IsoPipeline : IAsyncDisposable
+{
+ private readonly Func _runInner;
+ private readonly ExponentialBackoff _backoff;
+ private readonly Func _delay;
+ private readonly ILogger _logger;
+
+ private CancellationTokenSource? _cts;
+ private Task? _supervisorTask;
+ private int _consecutiveFailures;
+
+ public Guid ParticipantId { get; }
+ public IsoState State { get; private set; } = IsoState.Idle;
+ public int ConsecutiveFailures => _consecutiveFailures;
+
+ ///
+ /// Test ctor. The caller supplies the inner runner directly so failures and lifetimes
+ /// can be controlled from a unit test.
+ ///
+ internal IsoPipeline(
+ Guid participantId,
+ Func runInner,
+ ExponentialBackoff backoff,
+ Func delay,
+ ILoggerFactory loggerFactory)
+ {
+ ParticipantId = participantId;
+ _runInner = runInner;
+ _backoff = backoff;
+ _delay = delay;
+ _logger = loggerFactory.CreateLogger();
+ }
+
+ ///
+ /// Production ctor. Builds the inner runner from the engine dependencies.
+ ///
+ public IsoPipeline(
+ IsoPipelineConfig config,
+ INdiInterop interop,
+ IFrameScaler scaler,
+ IFrameClock frameClock,
+ ExponentialBackoff backoff,
+ Func delay,
+ ILoggerFactory loggerFactory)
+ : this(config.ParticipantId,
+ ct => RunInnerPipelineAsync(config, interop, scaler, frameClock, loggerFactory, ct),
+ backoff,
+ delay,
+ loggerFactory) { }
+
+ /// Starts the supervisor. Returns immediately; pipeline runs in the background.
+ public Task StartAsync()
+ {
+ if (_supervisorTask is not null)
+ throw new InvalidOperationException("Pipeline already started.");
+ _cts = new CancellationTokenSource();
+ State = IsoState.Receiving;
+ _supervisorTask = SupervisorLoopAsync(_cts.Token);
+ return Task.CompletedTask;
+ }
+
+ /// Stops the pipeline and awaits supervisor completion.
+ public async Task StopAsync()
+ {
+ if (_cts is null) return;
+ _cts.Cancel();
+ if (_supervisorTask is not null)
+ {
+ try { await _supervisorTask; }
+ catch (OperationCanceledException) { /* expected */ }
+ }
+ State = IsoState.Idle;
+ _cts.Dispose();
+ _cts = null;
+ _supervisorTask = null;
+ }
+
+ public async ValueTask DisposeAsync()
+ {
+ await StopAsync();
+ }
+
+ private async Task SupervisorLoopAsync(CancellationToken ct)
+ {
+ _consecutiveFailures = 0;
+ while (!ct.IsCancellationRequested)
+ {
+ try
+ {
+ await _runInner(ct);
+ // Inner exited normally (typically only on cancellation) — leave the loop.
+ break;
+ }
+ catch (OperationCanceledException)
+ {
+ break;
+ }
+ catch (Exception ex)
+ {
+ _consecutiveFailures++;
+ _logger.LogWarning(ex,
+ "Pipeline {ParticipantId} failed (consecutive failure #{N}).",
+ ParticipantId, _consecutiveFailures);
+
+ if (_backoff.ShouldGiveUp(_consecutiveFailures))
+ {
+ State = IsoState.Error;
+ _logger.LogError("Pipeline {ParticipantId} giving up after {N} consecutive failures.",
+ ParticipantId, _consecutiveFailures);
+ return;
+ }
+
+ var wait = _backoff.NextDelay(_consecutiveFailures);
+ _logger.LogInformation("Pipeline {ParticipantId} retrying in {Delay}.", ParticipantId, wait);
+ try
+ {
+ await _delay(wait, ct);
+ }
+ catch (OperationCanceledException)
+ {
+ break;
+ }
+ State = IsoState.Receiving;
+ }
+ }
+ }
+
+ ///
+ /// Default inner pipeline: spins up receiver → processor → sender on bounded channels
+ /// and awaits all three. Throws if any of them throws.
+ ///
+ private static async Task RunInnerPipelineAsync(
+ IsoPipelineConfig config,
+ INdiInterop interop,
+ IFrameScaler scaler,
+ IFrameClock frameClock,
+ ILoggerFactory loggerFactory,
+ CancellationToken ct)
+ {
+ var rawChannel = Channel.CreateBounded(new BoundedChannelOptions(config.RawChannelCapacity)
+ {
+ FullMode = BoundedChannelFullMode.DropOldest,
+ SingleReader = true,
+ SingleWriter = true,
+ });
+ var processedChannel = Channel.CreateBounded(new BoundedChannelOptions(config.ProcessedChannelCapacity)
+ {
+ FullMode = BoundedChannelFullMode.DropOldest,
+ SingleReader = true,
+ SingleWriter = true,
+ });
+
+ using var receiver = new NdiReceiver(
+ interop, config.SourceName, rawChannel.Writer, loggerFactory.CreateLogger());
+ using var sender = new NdiSender(
+ interop, config.OutputName, processedChannel.Reader, loggerFactory.CreateLogger());
+
+ var processor = new FrameProcessor(
+ config.Settings, scaler, new SolidFrameRenderer(),
+ frameClock, rawChannel.Reader, processedChannel.Writer,
+ config.SlateThreshold, loggerFactory.CreateLogger());
+
+ var receiverTask = receiver.RunAsync(ct);
+ var senderTask = sender.RunAsync(ct);
+ var processorTask = ProcessorLoopAsync(processor, frameClock, ct);
+
+ try
+ {
+ await Task.WhenAll(receiverTask, senderTask, processorTask);
+ }
+ finally
+ {
+ rawChannel.Writer.TryComplete();
+ processedChannel.Writer.TryComplete();
+ }
+ }
+
+ private static async Task ProcessorLoopAsync(FrameProcessor processor, IFrameClock clock, CancellationToken ct)
+ {
+ while (!ct.IsCancellationRequested)
+ {
+ var advanced = await clock.WaitForNextTickAsync(ct);
+ if (!advanced) break;
+ await processor.ProcessOnceAsync(ct);
+ }
+ }
+}
diff --git a/src/TeamsISO.Engine/Pipeline/IsoPipelineConfig.cs b/src/TeamsISO.Engine/Pipeline/IsoPipelineConfig.cs
new file mode 100644
index 0000000..272e57c
--- /dev/null
+++ b/src/TeamsISO.Engine/Pipeline/IsoPipelineConfig.cs
@@ -0,0 +1,23 @@
+using TeamsISO.Engine.Domain;
+
+namespace TeamsISO.Engine.Pipeline;
+
+///
+/// Per-pipeline configuration — identifies the participant, the source it captures,
+/// the output it emits, and the global processing settings to apply.
+///
+public sealed record IsoPipelineConfig(
+ Guid ParticipantId,
+ string SourceName,
+ string OutputName,
+ FrameProcessingSettings Settings)
+{
+ /// Default no-signal threshold per spec §4.
+ public TimeSpan SlateThreshold { get; init; } = TimeSpan.FromSeconds(2.5);
+
+ /// Bounded raw-frame channel capacity (drop-oldest backpressure).
+ public int RawChannelCapacity { get; init; } = 4;
+
+ /// Bounded processed-frame channel capacity.
+ public int ProcessedChannelCapacity { get; init; } = 2;
+}
diff --git a/src/TeamsISO.Engine/TeamsISO.Engine.csproj b/src/TeamsISO.Engine/TeamsISO.Engine.csproj
index d6fa014..310a2dd 100644
--- a/src/TeamsISO.Engine/TeamsISO.Engine.csproj
+++ b/src/TeamsISO.Engine/TeamsISO.Engine.csproj
@@ -1,17 +1,19 @@
-
-
-
- net8.0
- enable
- enable
-
-
+
+
+
+ net8.0
+
+
-
-
-
+
+
+
+
+
+
+
diff --git a/src/tests/TeamsISO.Engine.Tests/Pipeline/IsoPipelineTests.cs b/src/tests/TeamsISO.Engine.Tests/Pipeline/IsoPipelineTests.cs
new file mode 100644
index 0000000..765d844
--- /dev/null
+++ b/src/tests/TeamsISO.Engine.Tests/Pipeline/IsoPipelineTests.cs
@@ -0,0 +1,99 @@
+using Microsoft.Extensions.Logging.Abstractions;
+using TeamsISO.Engine.Domain;
+using TeamsISO.Engine.Pipeline;
+
+namespace TeamsISO.Engine.Tests.Pipeline;
+
+public class IsoPipelineTests
+{
+ private static ExponentialBackoff FastBackoff() =>
+ new(maxAttempts: 5, initial: TimeSpan.FromMilliseconds(1), cap: TimeSpan.FromMilliseconds(10));
+
+ private static Func NoDelay() =>
+ (_, _) => Task.CompletedTask;
+
+ [Fact]
+ public async Task StartAsync_TransitionsIdleToReceiving()
+ {
+ var participant = Guid.NewGuid();
+ var blocker = new TaskCompletionSource();
+ var pipeline = new IsoPipeline(
+ participant,
+ ct => blocker.Task.WaitAsync(ct),
+ FastBackoff(),
+ NoDelay(),
+ NullLoggerFactory.Instance);
+
+ await pipeline.StartAsync();
+
+ pipeline.State.Should().Be(IsoState.Receiving);
+
+ blocker.SetResult();
+ await pipeline.StopAsync();
+ }
+
+ [Fact]
+ public async Task StopAsync_TransitionsBackToIdle()
+ {
+ var blocker = new TaskCompletionSource();
+ var pipeline = new IsoPipeline(
+ Guid.NewGuid(),
+ ct => blocker.Task.WaitAsync(ct),
+ FastBackoff(),
+ NoDelay(),
+ NullLoggerFactory.Instance);
+
+ await pipeline.StartAsync();
+ await pipeline.StopAsync();
+
+ pipeline.State.Should().Be(IsoState.Idle);
+ }
+
+ [Fact]
+ public async Task Supervisor_FailsOnce_ThenRecovers_RemainsReceiving()
+ {
+ int callCount = 0;
+ var done = new TaskCompletionSource();
+ var pipeline = new IsoPipeline(
+ Guid.NewGuid(),
+ ct =>
+ {
+ callCount++;
+ if (callCount == 1) throw new InvalidOperationException("boom");
+ return done.Task.WaitAsync(ct);
+ },
+ FastBackoff(),
+ NoDelay(),
+ NullLoggerFactory.Instance);
+
+ await pipeline.StartAsync();
+ await Task.Delay(50); // let the supervisor cycle through the failure
+ callCount.Should().BeGreaterOrEqualTo(2);
+ pipeline.ConsecutiveFailures.Should().Be(1);
+ pipeline.State.Should().Be(IsoState.Receiving);
+
+ done.SetResult();
+ await pipeline.StopAsync();
+ }
+
+ [Fact]
+ public async Task Supervisor_FailsRepeatedly_TransitionsToError()
+ {
+ var pipeline = new IsoPipeline(
+ Guid.NewGuid(),
+ _ => throw new InvalidOperationException("permanent"),
+ FastBackoff(),
+ NoDelay(),
+ NullLoggerFactory.Instance);
+
+ await pipeline.StartAsync();
+
+ // Wait for supervisor to give up (max 5 attempts × ~1ms delay)
+ var deadline = DateTime.UtcNow.AddSeconds(2);
+ while (pipeline.State != IsoState.Error && DateTime.UtcNow < deadline)
+ await Task.Delay(20);
+
+ pipeline.State.Should().Be(IsoState.Error);
+ pipeline.ConsecutiveFailures.Should().Be(6); // 5 retries allowed, give up on 6th
+ }
+}