Compare commits
16 commits
0417aff3b1
...
fd391b5ca4
| Author | SHA1 | Date | |
|---|---|---|---|
| fd391b5ca4 | |||
| 8c9ab5db0c | |||
| 6eaf346d06 | |||
| 1be2c3489d | |||
| 73d4049893 | |||
| 671f64ca56 | |||
| b7afd0f08a | |||
| 927ccc6ced | |||
| c8bcf75227 | |||
| 49677fbd3d | |||
| de4b215123 | |||
| 8d60cbd333 | |||
| 07b6b43ab4 | |||
| 4d2f11d836 | |||
| 3abd4d8fd1 | |||
| 4f84c72c85 |
28 changed files with 2724 additions and 133 deletions
124
.forgejo/workflows/test.yml
Normal file
124
.forgejo/workflows/test.yml
Normal file
|
|
@ -0,0 +1,124 @@
|
|||
# Forgejo Actions CI for Datarhei — Dragon Fork.
|
||||
#
|
||||
# Mirrors the upstream go-tests.yml shape (GitHub Actions syntax),
|
||||
# but pinned to Go 1.24 to match go.mod and adds the M3 race-detector
|
||||
# pass. The forgejo-runner picks this up automatically.
|
||||
#
|
||||
# Triggered on every push and pull request. Two jobs:
|
||||
# - lint-and-vet: cheap, fast feedback (~30s)
|
||||
# - test: full test suite with -race, ~3 minutes including
|
||||
# the integration tests in app/webrtc that bind UDP
|
||||
# sockets and run a real Pion handshake.
|
||||
|
||||
name: ci
|
||||
|
||||
on:
|
||||
push:
|
||||
branches:
|
||||
- main
|
||||
- 'm[0-9]*-*'
|
||||
- 'fix/**'
|
||||
pull_request:
|
||||
|
||||
jobs:
|
||||
lint-and-vet:
|
||||
name: vet + build
|
||||
runs-on: ubuntu-22.04
|
||||
steps:
|
||||
- uses: actions/checkout@v4
|
||||
|
||||
- uses: actions/setup-go@v5
|
||||
with:
|
||||
go-version: '1.24'
|
||||
cache: true
|
||||
|
||||
- name: go vet
|
||||
run: go vet ./...
|
||||
|
||||
- name: go build
|
||||
run: go build ./...
|
||||
|
||||
test:
|
||||
name: race tests
|
||||
runs-on: ubuntu-22.04
|
||||
needs: lint-and-vet
|
||||
steps:
|
||||
- uses: actions/checkout@v4
|
||||
|
||||
- uses: actions/setup-go@v5
|
||||
with:
|
||||
go-version: '1.24'
|
||||
cache: true
|
||||
|
||||
# Integration tests need ephemeral UDP ports above 32768; the
|
||||
# default sysctl on ubuntu runners covers this, so no extra
|
||||
# setup is required.
|
||||
|
||||
- name: go test -race -short
|
||||
run: go test -race -short -count=1 ./...
|
||||
env:
|
||||
# The integration tests start Pion peers; tighten the timeout
|
||||
# so a flaky network-bound test never sits the whole job.
|
||||
GORACE: 'halt_on_error=1'
|
||||
|
||||
- name: go test (coverage, no race)
|
||||
# Race detector + coverage in one pass slows things meaningfully;
|
||||
# do them separately. This step's purpose is the coverage.out
|
||||
# artifact, not a second correctness signal.
|
||||
run: go test -coverprofile=coverage.out -covermode=atomic -count=1 ./...
|
||||
|
||||
- name: Upload coverage artifact
|
||||
uses: actions/upload-artifact@v4
|
||||
if: success() || failure()
|
||||
with:
|
||||
name: coverage-go-${{ github.sha }}
|
||||
path: coverage.out
|
||||
if-no-files-found: warn
|
||||
retention-days: 14
|
||||
|
||||
# --- WebRTC subsystem-only smoke ---------------------------------
|
||||
# The 5-viewer fanout test catches the largest class of regressions
|
||||
# for the egress path. Promoted to its own job so a failure on the
|
||||
# WebRTC side reads cleanly in the actions log instead of being
|
||||
# buried among ~80 packages of unrelated Core tests.
|
||||
webrtc-smoke:
|
||||
name: WebRTC smoke (5-viewer fanout)
|
||||
runs-on: ubuntu-22.04
|
||||
needs: lint-and-vet
|
||||
steps:
|
||||
- uses: actions/checkout@v4
|
||||
|
||||
- uses: actions/setup-go@v5
|
||||
with:
|
||||
go-version: '1.24'
|
||||
cache: true
|
||||
|
||||
- name: WebRTC integration tests (race)
|
||||
run: |
|
||||
go test -race -count=1 -v \
|
||||
-run 'TestIntegration_|TestSubsystem_TeardownHookFiresOnProcessStop|TestHandler_' \
|
||||
./app/webrtc/... ./core/webrtc/...
|
||||
|
||||
# --- Latency gate ----------------------------------------------------
|
||||
# Server-hop p95 latency check. Build-tagged so it doesn't run in the
|
||||
# default `go test ./...` invocation; this dedicated job exists to
|
||||
# catch regressions that would otherwise hide behind 'all tests pass'.
|
||||
# Threshold: p95 < 50ms (locally observed: sub-ms; gate is generous
|
||||
# to absorb CI runner noise without false alarms).
|
||||
latency-gate:
|
||||
name: WebRTC latency p95 gate
|
||||
runs-on: ubuntu-22.04
|
||||
needs: lint-and-vet
|
||||
steps:
|
||||
- uses: actions/checkout@v4
|
||||
|
||||
- uses: actions/setup-go@v5
|
||||
with:
|
||||
go-version: '1.24'
|
||||
cache: true
|
||||
|
||||
- name: Server-hop latency p95 < 50ms
|
||||
run: |
|
||||
go test -tags latency -timeout 90s -race -count=1 \
|
||||
-run TestLatencyServerHop \
|
||||
./app/webrtc/... -v
|
||||
3
.gitignore
vendored
3
.gitignore
vendored
|
|
@ -15,6 +15,8 @@
|
|||
!/test/publish.sh
|
||||
!/test/whep-client/
|
||||
!/test/whep-client/**
|
||||
!/test/whep-player.html
|
||||
!/test/TESTING.md
|
||||
|
||||
*.ts
|
||||
*.ts.tmp
|
||||
|
|
@ -25,3 +27,4 @@
|
|||
*.flv
|
||||
|
||||
.VSCodeCounter
|
||||
whep-client
|
||||
|
|
|
|||
70
CHANGELOG.md
70
CHANGELOG.md
|
|
@ -1,4 +1,72 @@
|
|||
# Core
|
||||
# Datarhei — Dragon Fork
|
||||
|
||||
## v0.1.0-dragonfork (2026-05-03)
|
||||
|
||||
The first tagged Dragon Fork release. Forked from upstream datarhei
|
||||
Core v16.16.0; everything upstream does is preserved unchanged. New:
|
||||
WebRTC (WHEP) egress, integrated with the existing process supervisor.
|
||||
|
||||
### Added
|
||||
|
||||
- **WebRTC subsystem** under `app/webrtc/`, mirroring the shape of
|
||||
upstream's RTMP and SRT servers (Server interface, Echo handlers,
|
||||
process-graph hooks, admin endpoints).
|
||||
- **Per-process opt-in** via `config.webrtc.enabled` on every restream
|
||||
process; resolver auto-injects two RTP output legs and allocates
|
||||
loopback UDP ports.
|
||||
- **`POST /api/v3/whep/{id}`** — WebRTC-HTTP Egress Protocol subscribe.
|
||||
JWT-protected by the existing Core auth.
|
||||
- **`DELETE /api/v3/whep/{id}/{resource}`** — idempotent teardown
|
||||
(returns 204 even on unknown resource per WHEP spec).
|
||||
- **`PATCH /api/v3/whep/{id}/{resource}`** — trickle ICE.
|
||||
- **CORS preflight** on every WHEP route + `Access-Control-Expose-Headers`
|
||||
for `Location` and `ETag` so browser-side WHEP players work
|
||||
cross-origin.
|
||||
- **Configurable stream maps** via `webrtc.video_map` / `webrtc.audio_map`
|
||||
on the per-process config — defaults to `0:v:0` / `0:a:0` for
|
||||
RTMP/SRT publishers, overridable for multi-input pipelines.
|
||||
- **`webrtc.*` global config block** with `CORE_WEBRTC_*` env-var
|
||||
bindings parallel to RTMP and SRT.
|
||||
- **Admin API:** `GET /api/v3/webrtc/streams` + `/streams/{id}/peers`.
|
||||
- **Browser smoke player** at `test/whep-player.html` with ICE / codec
|
||||
/ bitrate diagnostics, JWT field, and `?url=&token=` shareable
|
||||
URLs.
|
||||
- **Server-hop latency p95 gate** in CI (`-tags latency`), enforced at
|
||||
50ms on the runner; locally observed p95 ≈ 240µs.
|
||||
- **TrueNAS deploy bundle** at `deploy/truenas/core/` — host-networked
|
||||
Docker stack with bundled FFmpeg, env-driven config.
|
||||
- **Multi-viewer correctness:** per-stream peer cap, ICE-failure
|
||||
auto-cleanup goroutines, process-stop broadcast tear-down.
|
||||
- **Error matrix:** 406 codec mismatch, 504 ICE timeout, 503 cap
|
||||
reached (separate body for total vs per-stream), 204 DELETE
|
||||
idempotent.
|
||||
|
||||
### Fixed
|
||||
|
||||
- `Config.Clone()` now preserves the `WebRTC` section. Pre-fix,
|
||||
`cfg.WebRTC.Enable` was always zero at runtime regardless of
|
||||
`CORE_WEBRTC_ENABLE`. Caught on the first M2 TrueNAS deploy.
|
||||
- `http/api.ProcessConfig` Marshal/Unmarshal now carry the per-process
|
||||
`webrtc` block. Pre-fix, `POST /api/v3/process` silently dropped
|
||||
`webrtc.enabled=true` on its way to the restream config layer.
|
||||
|
||||
### Forking notes
|
||||
|
||||
- Module path stays `github.com/datarhei/core/v16` — internal imports
|
||||
don't churn, the fork is distinguished by repo location and branch
|
||||
history.
|
||||
- `cmd/webrtc-poc` from M1 is preserved as a manual-testing harness.
|
||||
Production deploys use the main `core` binary.
|
||||
|
||||
### Acknowledgements
|
||||
|
||||
Built on upstream Datarhei Core (Apache 2.0) and Pion WebRTC v4
|
||||
(MIT). Full attribution in `NOTICE` and `CREDITS`.
|
||||
|
||||
---
|
||||
|
||||
# Core (upstream)
|
||||
|
||||
|
||||
### Core v16.15.0 > v16.16.0
|
||||
|
||||
|
|
|
|||
47
CREDITS
Normal file
47
CREDITS
Normal file
|
|
@ -0,0 +1,47 @@
|
|||
# Credits
|
||||
|
||||
Datarhei — Dragon Fork stands on the shoulders of the open-source
|
||||
projects below. Required-attribution notices and the corresponding
|
||||
licenses live in NOTICE and the per-vendor LICENSE files under
|
||||
vendor/.
|
||||
|
||||
## Direct ancestor
|
||||
|
||||
- **datarhei/core** (Apache-2.0) — the base codebase this fork tracks.
|
||||
https://github.com/datarhei/core
|
||||
|
||||
## Major Go dependencies
|
||||
|
||||
- **github.com/pion/webrtc/v4** (MIT) — the Go WebRTC stack the egress
|
||||
path is built on. https://github.com/pion/webrtc
|
||||
- **github.com/pion/rtp** (MIT) — RTP packet types.
|
||||
- **github.com/pion/dtls/v2** (MIT) — DTLS for SRTP key exchange.
|
||||
- **github.com/pion/ice/v3** (MIT) — ICE candidate gathering.
|
||||
- **github.com/pion/sdp/v3** (MIT) — SDP parsing.
|
||||
- **github.com/labstack/echo/v4** (MIT) — HTTP routing.
|
||||
- **github.com/swaggo/echo-swagger** (MIT) — OpenAPI / Swagger UI
|
||||
middleware.
|
||||
- **github.com/caddyserver/certmagic** (Apache-2.0) — Let's Encrypt
|
||||
TLS automation.
|
||||
- **github.com/datarhei/joy4** (Apache-2.0) — RTMP server primitives
|
||||
(forked from joy4).
|
||||
- **github.com/datarhei/gosrt** (Apache-2.0) — pure-Go SRT.
|
||||
- **go.uber.org/zap** (MIT) — structured logging.
|
||||
|
||||
## Subprocess
|
||||
|
||||
- **FFmpeg** (LGPL-2.1-or-later / GPL-2.0-or-later, build-flag
|
||||
dependent) — used as an out-of-process child by the `restream`
|
||||
subsystem for transcoding and RTP packetisation. Dragon Fork does
|
||||
not link against the FFmpeg libraries.
|
||||
|
||||
## Brand assets
|
||||
|
||||
- **"Wild Dragon" mark** — © Wild Dragon, used as the project mark
|
||||
for Dragon Fork builds.
|
||||
|
||||
## Full list
|
||||
|
||||
The complete dependency tree, including transitive dependencies and
|
||||
their licenses, is enumerated in `vendor/modules.txt` and the
|
||||
per-vendor LICENSE / COPYING files under `vendor/`.
|
||||
41
NOTICE
Normal file
41
NOTICE
Normal file
|
|
@ -0,0 +1,41 @@
|
|||
Datarhei — Dragon Fork
|
||||
Copyright (c) 2026 Wild Dragon
|
||||
|
||||
This product includes software developed by datarhei.
|
||||
|
||||
datarhei Core
|
||||
Copyright (c) datarhei
|
||||
https://github.com/datarhei/core
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
not use this file except in compliance with the License. A copy of the
|
||||
License is in the LICENSE file at the root of this repository, and is
|
||||
also available at:
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
implied. See the License for the specific language governing
|
||||
permissions and limitations under the License.
|
||||
|
||||
This fork additionally bundles or depends on:
|
||||
|
||||
Pion WebRTC and related Pion libraries
|
||||
Copyright (c) The Pion authors
|
||||
https://github.com/pion
|
||||
MIT License
|
||||
|
||||
Echo HTTP framework
|
||||
Copyright (c) LabStack
|
||||
https://github.com/labstack/echo
|
||||
MIT License
|
||||
|
||||
FFmpeg (used as a subprocess by the restream subsystem; not linked)
|
||||
Copyright (c) The FFmpeg developers
|
||||
https://ffmpeg.org
|
||||
LGPL-2.1-or-later / GPL-2.0-or-later (build-flag dependent)
|
||||
|
||||
A complete list of dependencies and their licenses lives in the
|
||||
CREDITS file at the root of this repository.
|
||||
203
README.md
203
README.md
|
|
@ -1,92 +1,155 @@
|
|||
# Core
|
||||
# Datarhei — Dragon Fork
|
||||
|
||||

|
||||
A fork of [datarhei/core](https://github.com/datarhei/core) that adds a
|
||||
native **WebRTC (WHEP) egress** path. Everything upstream Datarhei
|
||||
already does — RTMP / SRT / RTSP ingest, FFmpeg process orchestration,
|
||||
HLS / DASH outputs, S3 mounts, the HTTP API and Swagger UI — works
|
||||
unchanged. WebRTC sits alongside as another output type, opt-in
|
||||
per process.
|
||||
|
||||
[](<[https://opensource.org/licenses/MI](https://www.apache.org/licenses/LICENSE-2.0)>)
|
||||
[](https://github.com/datarhei/core/actions/workflows/codeql-analysis.yml)
|
||||
[](https://github.com/datarhei/core/actions/workflows/go-tests.yml)
|
||||
[](https://codecov.io/gh/datarhei/core)
|
||||
[](https://goreportcard.com/report/github.com/datarhei/core)
|
||||
[](https://pkg.go.dev/github.com/datarhei/core)
|
||||
[](https://docs.datarhei.com/core/guides/beginner)
|
||||
```
|
||||
publisher (OBS / FFmpeg / SRT) ──▶ datarhei Core ──▶ WebRTC peers
|
||||
│ │ (1–5 viewers per stream)
|
||||
│ ├──▶ HLS / DASH (existing)
|
||||
│ ├──▶ RTMP relay (existing)
|
||||
└──▶ ingest (RTMP / SRT / …) └──▶ recording (existing)
|
||||
```
|
||||
|
||||
The datarhei Core is a process management solution for FFmpeg that offers a range of interfaces for media content, including HTTP, RTMP, SRT, and storage options. It is optimized for use in virtual environments such as Docker. It has been implemented in various contexts, from small-scale applications like Restreamer to large-scale, multi-instance frameworks spanning multiple locations, such as dedicated servers, cloud instances, and single-board computers. The datarhei Core stands out from traditional media servers by emphasizing FFmpeg and its capabilities rather than focusing on media conversion.
|
||||
Sub-second glass-to-glass on a LAN over WHEP, no SFU dependencies,
|
||||
single binary, single Docker image.
|
||||
|
||||
## Objectives of development
|
||||
> **Status:** M1–M4 complete, M5 (release) in flight. Live deploy
|
||||
> running on TrueNAS since 2026-04-17.
|
||||
|
||||
The objectives of development are:
|
||||
## What this fork adds
|
||||
|
||||
- Unhindered use of FFmpeg processes
|
||||
- Portability of FFmpeg, including management across development and production environments
|
||||
- Scalability of FFmpeg-based applications through the ability to offload processes to additional instances
|
||||
- Streamlining of media product development by focusing on features and design.
|
||||
- **`webrtc.*` config block** alongside `rtmp.*` and `srt.*`, with the
|
||||
same `CORE_*` env-var binding pattern.
|
||||
- **Per-process `webrtc.enabled` toggle** on the existing process
|
||||
config. Once true, Core auto-injects two RTP output legs (video +
|
||||
audio), allocates UDP ports, and the WHEP endpoint is live.
|
||||
- **`POST /api/v3/whep/{processID}`** — WebRTC-HTTP Egress Protocol
|
||||
subscribe; SDP offer in, SDP answer out. JWT-protected by the
|
||||
existing Core auth.
|
||||
- **`DELETE /api/v3/whep/{processID}/{resourceID}`** — idempotent
|
||||
teardown.
|
||||
- **`PATCH …/{resourceID}`** — trickle ICE.
|
||||
- **Browser-side smoke player** at `test/whep-player.html` —
|
||||
zero-dependency WHEP subscriber, ICE/codec/bitrate stats, JWT
|
||||
field, shareable `?url=&token=` URLs.
|
||||
- **Multi-viewer correctness:** per-stream peer cap, ICE-failure
|
||||
auto-cleanup, process-stop broadcast tear-down.
|
||||
- **Error matrix** per the design spec: `406` on codec mismatch,
|
||||
`504` on ICE timeout, `503` on cap, `204` on idempotent DELETE,
|
||||
CORS preflights on every WHEP route.
|
||||
|
||||
## What issues have been resolved thus far?
|
||||
|
||||
### Process management
|
||||
|
||||
- Run multiple processes via API
|
||||
- Unrestricted FFmpeg commands in process configuration.
|
||||
- Error detection and recovery (e.g., FFmpeg stalls, dumps)
|
||||
- Referencing for process chaining (pipelines)
|
||||
- Placeholders for storage, RTMP, and SRT usage (automatic credentials management and URL resolution)
|
||||
- Logs (access to current stdout/stderr)
|
||||
- Log history (configurable log history, e.g., for error analysis)
|
||||
- Resource limitation (max. CPU and MEMORY usage per process)
|
||||
- Statistics (like FFmpeg progress per input and output, CPU and MEMORY, state, uptime)
|
||||
- Input verification (like FFprobe)
|
||||
- Metadata (option to store additional information like a title)
|
||||
|
||||
### Media delivery
|
||||
|
||||
- Configurable file systems (in-memory, disk-mount, S3)
|
||||
- HTTP/S, RTMP/S, and SRT services, including Let's Encrypt
|
||||
- Bandwidth and session limiting for HLS/MPEG DASH sessions (protects restreams from congestion)
|
||||
- Viewer session API and logging
|
||||
|
||||
### Misc
|
||||
|
||||
- HTTP REST and GraphQL API
|
||||
- Swagger documentation
|
||||
- Metrics incl. Prometheus support (also detects POSIX and cgroups resources)
|
||||
- Docker images for fast setup of development environments up to the integration of cloud resources
|
||||
|
||||
## Docker images
|
||||
|
||||
- datarhei/core:latest (AMD64, ARM64, ARMv7)
|
||||
- datarhei/core:cuda-latest (Nvidia CUDA 11.7.1, AMD64)
|
||||
- datarhei/core:rpi-latest (Raspberry Pi / OMX/V4L2-M2M, AMD64/ARMv7)
|
||||
- datarhei/core:vaapi-latest (Intel VAAPI, AMD64)
|
||||
The existing upstream Datarhei feature set is intact — see "From
|
||||
upstream Datarhei" below.
|
||||
|
||||
## Quick start
|
||||
|
||||
1. Run the Docker image
|
||||
### Docker (TrueNAS / any host with Docker + LAN-reachable IP)
|
||||
|
||||
```sh
|
||||
docker run --name core -d \
|
||||
-e CORE_API_AUTH_USERNAME=admin \
|
||||
-e CORE_API_AUTH_PASSWORD=secret \
|
||||
-p 8080:8080 \
|
||||
-v ${HOME}/core/config:/core/config \
|
||||
-v ${HOME}/core/data:/core/data \
|
||||
datarhei/core:latest
|
||||
git clone https://forge.wilddragon.net/zgaetano/datarhei-dragonfork-core.git
|
||||
cd datarhei-dragonfork-core/deploy/truenas/core
|
||||
|
||||
cat > .env <<EOF
|
||||
PUBLIC_IP=10.0.0.25
|
||||
CORE_HTTP_PORT=8080
|
||||
API_AUTH_USERNAME=admin
|
||||
API_AUTH_PASSWORD=$(openssl rand -base64 24)
|
||||
API_AUTH_JWT_SECRET=$(openssl rand -base64 48)
|
||||
EOF
|
||||
|
||||
docker compose up -d --build
|
||||
```
|
||||
|
||||
2. Open Swagger
|
||||
http://host-ip:8080/api/swagger/index.html
|
||||
Then:
|
||||
|
||||
3. Log in with Swagger
|
||||
Authorize > Basic authorization > Username: admin, Password: secret
|
||||
- Swagger UI: `http://<host>:8080/api/swagger/index.html`
|
||||
- WHEP smoke player: open `test/whep-player.html` in a browser
|
||||
|
||||
### Sample process JSON
|
||||
|
||||
```json
|
||||
{
|
||||
"id": "live",
|
||||
"input": [
|
||||
{ "address": "{rtmp,name=live.stream}", "options": [] }
|
||||
],
|
||||
"output": [],
|
||||
"webrtc": { "enabled": true }
|
||||
}
|
||||
```
|
||||
|
||||
That's it. No `webrtc://` URL scheme to learn — the toggle on
|
||||
`config.webrtc.enabled` is the entire surface. The resolver allocates
|
||||
ports, injects `-f rtp udp://…` legs into the FFmpeg command, and the
|
||||
WHEP endpoint at `/api/v3/whep/live` becomes live the moment the
|
||||
process starts.
|
||||
|
||||
For multi-input pipelines (lavfi test sources, multi-camera switches,
|
||||
SDI + file audio), use the `video_map` and `audio_map` fields:
|
||||
|
||||
```json
|
||||
"webrtc": {
|
||||
"enabled": true,
|
||||
"video_map": "0:v:0",
|
||||
"audio_map": "1:a:0",
|
||||
"force_transcode": true
|
||||
}
|
||||
```
|
||||
|
||||
## Documentation
|
||||
|
||||
Documentation is available on [docs.datarhei.com/core](https://docs.datarhei.com/core).
|
||||
| Topic | Where |
|
||||
| ----- | ----- |
|
||||
| Design spec | [`docs/design/2026-04-16-datarhei-dragon-fork-webrtc-design.md`](docs/design/2026-04-16-datarhei-dragon-fork-webrtc-design.md) |
|
||||
| M1 (PoC) plan | [`docs/design/2026-04-16-datarhei-dragon-fork-m1-webrtc-poc.md`](docs/design/2026-04-16-datarhei-dragon-fork-m1-webrtc-poc.md) |
|
||||
| M2 (Core integration) spec | [`docs/design/2026-04-17-datarhei-dragon-fork-m2-webrtc-core-integration.md`](docs/design/2026-04-17-datarhei-dragon-fork-m2-webrtc-core-integration.md) |
|
||||
| Testing | [`test/TESTING.md`](test/TESTING.md) |
|
||||
| Changelog (Dragon Fork) | [`CHANGELOG.md`](CHANGELOG.md) |
|
||||
| Upstream Datarhei docs | [docs.datarhei.com/core](https://docs.datarhei.com/core) |
|
||||
|
||||
- [Quick start](https://docs.datarhei.com/core/guides/beginner)
|
||||
- [Installation](https://docs.datarhei.com/core/installation)
|
||||
- [Configuration](https://docs.datarhei.com/core/configuration)
|
||||
- [Coding](https://docs.datarhei.com/core/development/coding)
|
||||
## Building from source
|
||||
|
||||
Go 1.24 required (vendored).
|
||||
|
||||
```sh
|
||||
make release # cross-compiles linux/amd64 to ./core/core
|
||||
make test # full suite, race detector
|
||||
go test -tags latency -timeout 90s -count=1 \
|
||||
-run TestLatencyServerHop ./app/webrtc/... # latency p95 gate
|
||||
```
|
||||
|
||||
## From upstream Datarhei
|
||||
|
||||
This fork preserves everything upstream Datarhei Core does — Dragon
|
||||
Fork is purely additive. If a feature isn't WebRTC-related, the
|
||||
behaviour is unchanged from upstream and the upstream documentation
|
||||
applies as-is.
|
||||
|
||||
| Subsystem | Upstream feature set |
|
||||
| --- | --- |
|
||||
| Process management | API-driven FFmpeg, error detection / recovery, log history, resource limits, statistics, FFprobe input verification, process metadata |
|
||||
| Media delivery | HTTP/S, RTMP/S, SRT services with Let's Encrypt, configurable file systems (in-memory / disk / S3), HLS/DASH session limits, viewer session API |
|
||||
| Misc | HTTP REST + GraphQL, Swagger, Prometheus metrics, multi-arch Docker images |
|
||||
|
||||
## Attribution
|
||||
|
||||
Dragon Fork is built on:
|
||||
|
||||
- **datarhei Core** — Apache 2.0, © datarhei. The base repository this
|
||||
fork tracks. See [`NOTICE`](NOTICE) for the required attribution.
|
||||
- **Pion WebRTC** — MIT. The Go WebRTC stack the egress path is built
|
||||
on.
|
||||
- **FFmpeg** — LGPL / GPL (build-flag dependent). Used as a subprocess
|
||||
for transcoding and RTP packetisation; Dragon Fork doesn't link
|
||||
against it.
|
||||
|
||||
Full third-party credits in [`CREDITS`](CREDITS).
|
||||
|
||||
## License
|
||||
|
||||
datarhei/core is licensed under the Apache License 2.0
|
||||
Apache License 2.0 — same as upstream. See [`LICENSE`](LICENSE).
|
||||
|
|
|
|||
|
|
@ -219,6 +219,8 @@ func (a *api) Reload() error {
|
|||
|
||||
logfields := log.Fields{
|
||||
"application": app.Name,
|
||||
"variant": app.Variant,
|
||||
"fork": app.Fork,
|
||||
"version": app.Version.String(),
|
||||
"repository": "https://github.com/datarhei/core",
|
||||
"license": "Apache License Version 2.0",
|
||||
|
|
|
|||
|
|
@ -8,6 +8,19 @@ import (
|
|||
// Name of the app
|
||||
const Name = "datarhei-core"
|
||||
|
||||
// Variant distinguishes a Dragon Fork build from upstream Datarhei
|
||||
// Core in the startup banner and in the /api/v3/about endpoint
|
||||
// payload. Empty would imply an upstream build; we override the
|
||||
// linker default with the fork identity.
|
||||
//
|
||||
// Kept as a var (not const) so a downstream packager can override it
|
||||
// at build time via -ldflags="-X github.com/datarhei/core/v16/app.Variant=…"
|
||||
// without forking the source.
|
||||
var Variant = "dragonfork"
|
||||
|
||||
// Fork carries the human-readable fork name surfaced in logs.
|
||||
var Fork = "Datarhei — Dragon Fork"
|
||||
|
||||
type versionInfo struct {
|
||||
Major int
|
||||
Minor int
|
||||
|
|
|
|||
|
|
@ -32,7 +32,16 @@ func BuildArgs(cfg appcfg.ConfigWebRTC, videoPort int) []string {
|
|||
acopy = []string{"-c:a", "libopus", "-b:a", "96k"}
|
||||
}
|
||||
|
||||
args := []string{"-map", "0:v:0"}
|
||||
videoMap := cfg.VideoMap
|
||||
if videoMap == "" {
|
||||
videoMap = "0:v:0"
|
||||
}
|
||||
audioMap := cfg.AudioMap
|
||||
if audioMap == "" {
|
||||
audioMap = "0:a:0"
|
||||
}
|
||||
|
||||
args := []string{"-map", videoMap}
|
||||
args = append(args, vcopy...)
|
||||
args = append(args,
|
||||
"-payload_type", fmt.Sprint(cfg.VideoPT),
|
||||
|
|
@ -40,7 +49,7 @@ func BuildArgs(cfg appcfg.ConfigWebRTC, videoPort int) []string {
|
|||
fmt.Sprintf("udp://127.0.0.1:%d?pkt_size=1316", videoPort),
|
||||
)
|
||||
|
||||
args = append(args, "-map", "0:a:0")
|
||||
args = append(args, "-map", audioMap)
|
||||
args = append(args, acopy...)
|
||||
args = append(args,
|
||||
"-payload_type", fmt.Sprint(cfg.AudioPT),
|
||||
|
|
|
|||
|
|
@ -87,3 +87,46 @@ func any(haystack []string, prefix string) bool {
|
|||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// TestBuildArgs_DefaultMaps confirms 0:v:0 / 0:a:0 are emitted when
|
||||
// VideoMap / AudioMap are empty (regression on the fix for issue #2 —
|
||||
// the prior version had these as hardcoded literals; if VideoMap is
|
||||
// ever empty unexpectedly, BuildArgs must still produce a working
|
||||
// command line).
|
||||
func TestBuildArgs_DefaultMaps(t *testing.T) {
|
||||
cfg := appcfg.ConfigWebRTC{Enabled: true, VideoPT: 102, AudioPT: 111}
|
||||
got := BuildArgs(cfg, 50000)
|
||||
if !contains(got, "-map", "0:v:0") {
|
||||
t.Fatalf("expected default video map 0:v:0, got %v", got)
|
||||
}
|
||||
if !contains(got, "-map", "0:a:0") {
|
||||
t.Fatalf("expected default audio map 0:a:0, got %v", got)
|
||||
}
|
||||
}
|
||||
|
||||
// TestBuildArgs_CustomMaps drives the issue-#2 fix: when the user
|
||||
// configures a multi-input pipeline (audio on input #1, etc.), the
|
||||
// emitted -map values must follow the user's choice rather than the
|
||||
// "0:v:0"/"0:a:0" assumption.
|
||||
func TestBuildArgs_CustomMaps(t *testing.T) {
|
||||
cfg := appcfg.ConfigWebRTC{
|
||||
Enabled: true,
|
||||
VideoPT: 102,
|
||||
AudioPT: 111,
|
||||
VideoMap: "0:v:1",
|
||||
AudioMap: "1:a:0",
|
||||
}
|
||||
got := BuildArgs(cfg, 50000)
|
||||
if !contains(got, "-map", "0:v:1") {
|
||||
t.Fatalf("expected custom video map 0:v:1, got %v", got)
|
||||
}
|
||||
if !contains(got, "-map", "1:a:0") {
|
||||
t.Fatalf("expected custom audio map 1:a:0, got %v", got)
|
||||
}
|
||||
// The default literals should NOT appear when overridden.
|
||||
for _, opt := range got {
|
||||
if opt == "0:v:0" || opt == "0:a:0" {
|
||||
t.Errorf("expected no default maps in output, found %q in %v", opt, got)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -13,54 +13,107 @@ import (
|
|||
corewebrtc "github.com/datarhei/core/v16/core/webrtc"
|
||||
)
|
||||
|
||||
// Default per-stream peer cap when the caller passes 0. The total cap
|
||||
// (passed to NewHandler) is enforced separately and takes precedence.
|
||||
const defaultMaxPeersPerStream = 8
|
||||
|
||||
// Handler exposes the subsystem's WHEP Echo handlers. Wire them into
|
||||
// the /api/v3 group (or a sibling group) via Handler.Register.
|
||||
//
|
||||
// Lifecycle: peers are tracked in a streamID→resourceID→Peer index.
|
||||
// On every Subscribe we spin a tiny goroutine watching the new peer's
|
||||
// Done() channel; when ICE fails or Close() runs the index entry is
|
||||
// removed and the counters tick back down — no leaks if the browser
|
||||
// rage-quits.
|
||||
type Handler struct {
|
||||
sub *Subsystem
|
||||
|
||||
peersMu sync.Mutex
|
||||
peers map[string]*corewebrtc.Peer // resourceID -> peer
|
||||
count int64 // atomic, for cap check without lock
|
||||
maxCap int64
|
||||
mu sync.Mutex
|
||||
peersByStream map[string]map[string]*corewebrtc.Peer // streamID -> resource -> peer
|
||||
peerStream map[string]string // resource -> streamID (reverse index)
|
||||
count int64 // atomic
|
||||
maxCapTotal int64
|
||||
maxCapPerStrm int64
|
||||
}
|
||||
|
||||
// NewHandler wraps the subsystem in an Echo-compatible HTTP handler.
|
||||
// The maxPeers argument caps concurrent subscribers; pass 0 to use a
|
||||
// generous default (matches corewebrtc.DefaultConfig).
|
||||
// The maxPeers argument caps concurrent subscribers across all streams;
|
||||
// pass 0 to use a generous default (matches corewebrtc.DefaultConfig).
|
||||
// The per-stream cap is taken from the corewebrtc default; pass a
|
||||
// non-zero value to override via NewHandlerWithCaps.
|
||||
func NewHandler(s *Subsystem, maxPeers int) *Handler {
|
||||
cap := int64(maxPeers)
|
||||
if cap <= 0 {
|
||||
cap = int64(corewebrtc.DefaultConfig().MaxPeersTotal)
|
||||
}
|
||||
return &Handler{
|
||||
sub: s,
|
||||
peers: make(map[string]*corewebrtc.Peer),
|
||||
maxCap: cap,
|
||||
}
|
||||
return NewHandlerWithCaps(s, maxPeers, 0)
|
||||
}
|
||||
|
||||
// Register mounts the WHEP routes on the provided Echo group. WHEP
|
||||
// POST is /whep/:id, WHEP DELETE is /whep/:id/:resource.
|
||||
// NewHandlerWithCaps is NewHandler plus an explicit per-stream cap.
|
||||
// maxPeersPerStream <= 0 falls back to defaultMaxPeersPerStream.
|
||||
func NewHandlerWithCaps(s *Subsystem, maxPeers, maxPeersPerStream int) *Handler {
|
||||
total := int64(maxPeers)
|
||||
if total <= 0 {
|
||||
total = int64(corewebrtc.DefaultConfig().MaxPeersTotal)
|
||||
}
|
||||
perStream := int64(maxPeersPerStream)
|
||||
if perStream <= 0 {
|
||||
perStream = defaultMaxPeersPerStream
|
||||
}
|
||||
h := &Handler{
|
||||
sub: s,
|
||||
peersByStream: make(map[string]map[string]*corewebrtc.Peer),
|
||||
peerStream: make(map[string]string),
|
||||
maxCapTotal: total,
|
||||
maxCapPerStrm: perStream,
|
||||
}
|
||||
// Subsystem broadcasts process-stop via this hook so the handler
|
||||
// can yank stale peer entries before their Sources close out
|
||||
// from underneath them.
|
||||
if s != nil {
|
||||
s.SetTeardownHook(h.tearDownStreamPeers)
|
||||
}
|
||||
return h
|
||||
}
|
||||
|
||||
// Register mounts the WHEP routes on the provided Echo group.
|
||||
//
|
||||
// The routes are deliberately unauthenticated in M2 because WHEP
|
||||
// clients (browsers, OBS) don't carry the Core JWT. M3 will add
|
||||
// per-process signed-URL tokens; for M2 the deployment is expected
|
||||
// to put the endpoint behind an authenticated reverse-proxy or VPN.
|
||||
// CORS preflights are answered on every WHEP path; regular WHEP
|
||||
// responses also carry the Access-Control-* headers so browser-side
|
||||
// players living on a different origin can subscribe.
|
||||
func (h *Handler) Register(g *echo.Group) {
|
||||
g.OPTIONS("/whep/:id", h.preflight)
|
||||
g.OPTIONS("/whep/:id/:resource", h.preflight)
|
||||
g.POST("/whep/:id", h.Subscribe)
|
||||
g.DELETE("/whep/:id/:resource", h.Unsubscribe)
|
||||
g.PATCH("/whep/:id/:resource", h.Trickle)
|
||||
}
|
||||
|
||||
// Subscribe handles POST /whep/:id. Request body is an SDP offer,
|
||||
// response is an SDP answer with a Location header pointing at the
|
||||
// DELETE resource.
|
||||
// DELETE/PATCH resource.
|
||||
//
|
||||
// @Summary Subscribe to a WebRTC stream via WHEP
|
||||
// @Description Subscribe to a process's WebRTC egress stream. Body is the SDP offer (Content-Type: application/sdp). Response is the SDP answer; the Location header points at the DELETE/PATCH resource for teardown and trickle ICE.
|
||||
// @Tags v16.16.0
|
||||
// @ID webrtc-3-whep-subscribe
|
||||
// @Accept application/sdp
|
||||
// @Produce application/sdp
|
||||
// @Param id path string true "Process ID with config.webrtc.enabled=true"
|
||||
// @Success 201 {string} string "SDP answer"
|
||||
// @Failure 400 {string} string "missing stream id, malformed body, or invalid SDP"
|
||||
// @Failure 404 {string} string "no stream registered for this process id"
|
||||
// @Failure 406 {string} string "offer SDP missing required H264 / Opus rtpmap"
|
||||
// @Failure 503 {string} string "peer cap reached (per-stream or total)"
|
||||
// @Failure 504 {string} string "ICE gathering timeout"
|
||||
// @Security ApiKeyAuth
|
||||
// @Router /api/v3/whep/{id} [post]
|
||||
func (h *Handler) Subscribe(c echo.Context) error {
|
||||
addCORS(c)
|
||||
|
||||
id := c.Param("id")
|
||||
if id == "" {
|
||||
return c.String(http.StatusBadRequest, "missing stream id")
|
||||
}
|
||||
|
||||
if atomic.LoadInt64(&h.count) >= h.maxCap {
|
||||
// Total cap: cheap atomic check before doing real work.
|
||||
if atomic.LoadInt64(&h.count) >= h.maxCapTotal {
|
||||
return c.String(http.StatusServiceUnavailable, corewebrtc.ErrPeerCapReached.Error())
|
||||
}
|
||||
|
||||
|
|
@ -69,6 +122,14 @@ func (h *Handler) Subscribe(c echo.Context) error {
|
|||
return c.String(http.StatusNotFound, corewebrtc.ErrStreamNotFound.Error())
|
||||
}
|
||||
|
||||
// Per-stream cap: needs the lock since we're indexing per stream.
|
||||
h.mu.Lock()
|
||||
if int64(len(h.peersByStream[id])) >= h.maxCapPerStrm {
|
||||
h.mu.Unlock()
|
||||
return c.String(http.StatusServiceUnavailable, "webrtc: per-stream peer cap reached")
|
||||
}
|
||||
h.mu.Unlock()
|
||||
|
||||
body, err := io.ReadAll(c.Request().Body)
|
||||
if err != nil {
|
||||
return c.String(http.StatusBadRequest, "read body: "+err.Error())
|
||||
|
|
@ -76,59 +137,251 @@ func (h *Handler) Subscribe(c echo.Context) error {
|
|||
if len(body) == 0 || !strings.HasPrefix(string(body), "v=") {
|
||||
return c.String(http.StatusBadRequest, corewebrtc.ErrInvalidSDP.Error())
|
||||
}
|
||||
if err := requireH264AndOpus(string(body)); err != nil {
|
||||
return c.String(http.StatusNotAcceptable, err.Error())
|
||||
}
|
||||
|
||||
offer := webrtc.SessionDescription{Type: webrtc.SDPTypeOffer, SDP: string(body)}
|
||||
peer, err := h.sub.factory.CreatePeerFromSources(c.Request().Context(), stream.video, stream.audio, offer)
|
||||
if err != nil {
|
||||
return c.String(http.StatusInternalServerError, "create peer: "+err.Error())
|
||||
// Surface the design's error matrix.
|
||||
switch err {
|
||||
case corewebrtc.ErrICETimeout:
|
||||
return c.String(http.StatusGatewayTimeout, err.Error())
|
||||
case corewebrtc.ErrCodecMismatch:
|
||||
return c.String(http.StatusNotAcceptable, err.Error())
|
||||
default:
|
||||
return c.String(http.StatusInternalServerError, "create peer: "+err.Error())
|
||||
}
|
||||
}
|
||||
|
||||
h.peersMu.Lock()
|
||||
h.peers[peer.ResourceID()] = peer
|
||||
h.peersMu.Unlock()
|
||||
rid := peer.ResourceID()
|
||||
h.mu.Lock()
|
||||
if h.peersByStream[id] == nil {
|
||||
h.peersByStream[id] = make(map[string]*corewebrtc.Peer)
|
||||
}
|
||||
h.peersByStream[id][rid] = peer
|
||||
h.peerStream[rid] = id
|
||||
h.mu.Unlock()
|
||||
atomic.AddInt64(&h.count, 1)
|
||||
|
||||
// Auto-cleanup: when Pion's OnConnectionStateChange triggers
|
||||
// peer.Close() (ICE failed/disconnected), the Done channel
|
||||
// closes and we yank the index entry. Without this the map
|
||||
// leaks for the lifetime of the handler.
|
||||
go h.awaitPeerClose(rid, peer)
|
||||
|
||||
c.Response().Header().Set("Content-Type", "application/sdp")
|
||||
c.Response().Header().Set("Location", "/whep/"+id+"/"+peer.ResourceID())
|
||||
c.Response().Header().Set("Location", "/whep/"+id+"/"+rid)
|
||||
c.Response().Header().Set("ETag", `"`+rid+`"`)
|
||||
return c.String(http.StatusCreated, peer.Answer().SDP)
|
||||
}
|
||||
|
||||
// Unsubscribe handles DELETE /whep/:id/:resource. The :id is part of
|
||||
// the path per WHEP spec but we only need :resource to locate the
|
||||
// peer; :id is accepted for route symmetry.
|
||||
// Unsubscribe handles DELETE /whep/:id/:resource. Per WHEP spec we
|
||||
// return 204 even when the resource is unknown — DELETE is idempotent
|
||||
// and a re-issued tear-down should never error out.
|
||||
//
|
||||
// @Summary Tear down a WHEP subscription
|
||||
// @Description Idempotent peer teardown by resource id (returned in the Location header by Subscribe). Returns 204 even when the resource is unknown, per the WHEP spec.
|
||||
// @Tags v16.16.0
|
||||
// @ID webrtc-3-whep-unsubscribe
|
||||
// @Param id path string true "Process ID"
|
||||
// @Param resource path string true "Resource ID from the Subscribe Location header"
|
||||
// @Success 204 "no content"
|
||||
// @Failure 400 {string} string "missing resource id"
|
||||
// @Security ApiKeyAuth
|
||||
// @Router /api/v3/whep/{id}/{resource} [delete]
|
||||
func (h *Handler) Unsubscribe(c echo.Context) error {
|
||||
addCORS(c)
|
||||
|
||||
resource := c.Param("resource")
|
||||
if resource == "" {
|
||||
return c.String(http.StatusBadRequest, "missing resource id")
|
||||
}
|
||||
|
||||
h.peersMu.Lock()
|
||||
peer, ok := h.peers[resource]
|
||||
if ok {
|
||||
delete(h.peers, resource)
|
||||
h.mu.Lock()
|
||||
streamID := h.peerStream[resource]
|
||||
var peer *corewebrtc.Peer
|
||||
if streamID != "" {
|
||||
peer = h.peersByStream[streamID][resource]
|
||||
delete(h.peersByStream[streamID], resource)
|
||||
if len(h.peersByStream[streamID]) == 0 {
|
||||
delete(h.peersByStream, streamID)
|
||||
}
|
||||
delete(h.peerStream, resource)
|
||||
}
|
||||
h.peersMu.Unlock()
|
||||
h.mu.Unlock()
|
||||
|
||||
if !ok {
|
||||
if peer != nil {
|
||||
_ = peer.Close()
|
||||
}
|
||||
if streamID != "" {
|
||||
atomic.AddInt64(&h.count, -1)
|
||||
}
|
||||
return c.NoContent(http.StatusNoContent)
|
||||
}
|
||||
|
||||
// Trickle handles PATCH /whep/:id/:resource — adds ICE candidates
|
||||
// from a trickle-ice-sdpfrag body. Empty body is a no-op (clients
|
||||
// signal end-of-candidates via an a=end-of-candidates line, which
|
||||
// AddICECandidate accepts).
|
||||
//
|
||||
// @Summary Trickle ICE candidates for a WHEP subscription
|
||||
// @Description Add ICE candidates to an existing WebRTC peer. Body is application/trickle-ice-sdpfrag.
|
||||
// @Tags v16.16.0
|
||||
// @ID webrtc-3-whep-trickle
|
||||
// @Accept application/trickle-ice-sdpfrag
|
||||
// @Param id path string true "Process ID"
|
||||
// @Param resource path string true "Resource ID from the Subscribe Location header"
|
||||
// @Success 204 "no content"
|
||||
// @Failure 400 {string} string "missing resource id or unreadable body"
|
||||
// @Failure 404 {string} string "peer not found"
|
||||
// @Security ApiKeyAuth
|
||||
// @Router /api/v3/whep/{id}/{resource} [patch]
|
||||
func (h *Handler) Trickle(c echo.Context) error {
|
||||
addCORS(c)
|
||||
|
||||
resource := c.Param("resource")
|
||||
if resource == "" {
|
||||
return c.String(http.StatusBadRequest, "missing resource id")
|
||||
}
|
||||
|
||||
h.mu.Lock()
|
||||
streamID := h.peerStream[resource]
|
||||
var peer *corewebrtc.Peer
|
||||
if streamID != "" {
|
||||
peer = h.peersByStream[streamID][resource]
|
||||
}
|
||||
h.mu.Unlock()
|
||||
if peer == nil {
|
||||
return c.NoContent(http.StatusNotFound)
|
||||
}
|
||||
_ = peer.Close()
|
||||
atomic.AddInt64(&h.count, -1)
|
||||
|
||||
body, err := io.ReadAll(c.Request().Body)
|
||||
if err != nil {
|
||||
return c.String(http.StatusBadRequest, "read body: "+err.Error())
|
||||
}
|
||||
for _, line := range strings.Split(string(body), "\n") {
|
||||
line = strings.TrimSpace(line)
|
||||
if !strings.HasPrefix(line, "a=candidate:") {
|
||||
continue
|
||||
}
|
||||
cand := strings.TrimPrefix(line, "a=")
|
||||
_ = peer.AddICECandidate(webrtc.ICECandidateInit{Candidate: cand})
|
||||
}
|
||||
return c.NoContent(http.StatusNoContent)
|
||||
}
|
||||
|
||||
// preflight answers a CORS OPTIONS request; the headers are also
|
||||
// echoed on every other response.
|
||||
func (h *Handler) preflight(c echo.Context) error {
|
||||
addCORS(c)
|
||||
return c.NoContent(http.StatusNoContent)
|
||||
}
|
||||
|
||||
// Close tears down every active peer (e.g., during Core shutdown).
|
||||
func (h *Handler) Close() {
|
||||
h.peersMu.Lock()
|
||||
peers := make([]*corewebrtc.Peer, 0, len(h.peers))
|
||||
for _, p := range h.peers {
|
||||
peers = append(peers, p)
|
||||
h.mu.Lock()
|
||||
peers := make([]*corewebrtc.Peer, 0)
|
||||
for _, m := range h.peersByStream {
|
||||
for _, p := range m {
|
||||
peers = append(peers, p)
|
||||
}
|
||||
}
|
||||
h.peers = make(map[string]*corewebrtc.Peer)
|
||||
h.peersMu.Unlock()
|
||||
h.peersByStream = make(map[string]map[string]*corewebrtc.Peer)
|
||||
h.peerStream = make(map[string]string)
|
||||
h.mu.Unlock()
|
||||
|
||||
for _, p := range peers {
|
||||
_ = p.Close()
|
||||
if p != nil {
|
||||
_ = p.Close()
|
||||
}
|
||||
}
|
||||
atomic.StoreInt64(&h.count, 0)
|
||||
}
|
||||
|
||||
// awaitPeerClose blocks on peer.Done() and yanks the index entry when
|
||||
// the peer self-closes (ICE failed/disconnected). Idempotent with
|
||||
// the Unsubscribe path: if Unsubscribe ran first the index is already
|
||||
// empty and we just decrement the counter once on first arrival.
|
||||
func (h *Handler) awaitPeerClose(resource string, peer *corewebrtc.Peer) {
|
||||
<-peer.Done()
|
||||
h.mu.Lock()
|
||||
streamID := h.peerStream[resource]
|
||||
_, present := h.peerStream[resource]
|
||||
if present {
|
||||
delete(h.peerStream, resource)
|
||||
if streamID != "" {
|
||||
delete(h.peersByStream[streamID], resource)
|
||||
if len(h.peersByStream[streamID]) == 0 {
|
||||
delete(h.peersByStream, streamID)
|
||||
}
|
||||
}
|
||||
}
|
||||
h.mu.Unlock()
|
||||
if present {
|
||||
atomic.AddInt64(&h.count, -1)
|
||||
}
|
||||
}
|
||||
|
||||
// tearDownStreamPeers is the callback the Subsystem runs in its
|
||||
// onProcessStop hook. It closes every peer subscribed to that
|
||||
// stream (driving each one's Done() and indirectly awaitPeerClose).
|
||||
func (h *Handler) tearDownStreamPeers(streamID string) {
|
||||
h.mu.Lock()
|
||||
bucket := h.peersByStream[streamID]
|
||||
peers := make([]*corewebrtc.Peer, 0, len(bucket))
|
||||
for _, p := range bucket {
|
||||
peers = append(peers, p)
|
||||
}
|
||||
h.mu.Unlock()
|
||||
|
||||
for _, p := range peers {
|
||||
if p != nil {
|
||||
_ = p.Close()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// addCORS emits the response headers a browser-side WHEP player
|
||||
// expects. WHEP's Location and ETag headers must be exposed for
|
||||
// fetch() to read them across origins.
|
||||
func addCORS(c echo.Context) {
|
||||
hh := c.Response().Header()
|
||||
hh.Set("Access-Control-Allow-Origin", "*")
|
||||
hh.Set("Access-Control-Allow-Methods", "POST, DELETE, PATCH, OPTIONS")
|
||||
hh.Set("Access-Control-Allow-Headers", "Content-Type, Authorization, If-Match, If-None-Match")
|
||||
hh.Set("Access-Control-Expose-Headers", "Location, ETag")
|
||||
}
|
||||
|
||||
// requireH264AndOpus does a coarse SDP scan to confirm the offer
|
||||
// includes both an H.264 video rtpmap and an Opus audio rtpmap. The
|
||||
// design treats codec mismatch as a 406, never a silent black frame.
|
||||
//
|
||||
// This is intentionally a string scan rather than a full SDP parse:
|
||||
// every modern browser advertises H.264 and Opus by name, and a
|
||||
// dependency on a real SDP parser for one validation step is
|
||||
// disproportionate. M4 may swap this for pion/sdp.v3 when other
|
||||
// surfaces also need parsing.
|
||||
func requireH264AndOpus(sdp string) error {
|
||||
lower := strings.ToLower(sdp)
|
||||
hasH264 := strings.Contains(lower, "h264/90000") || strings.Contains(lower, " h264/")
|
||||
hasOpus := strings.Contains(lower, "opus/48000") || strings.Contains(lower, " opus/")
|
||||
if hasH264 && hasOpus {
|
||||
return nil
|
||||
}
|
||||
missing := []string{}
|
||||
if !hasH264 {
|
||||
missing = append(missing, "H264")
|
||||
}
|
||||
if !hasOpus {
|
||||
missing = append(missing, "Opus")
|
||||
}
|
||||
return &codecMismatchError{missing: missing}
|
||||
}
|
||||
|
||||
type codecMismatchError struct{ missing []string }
|
||||
|
||||
func (e *codecMismatchError) Error() string {
|
||||
return "webrtc: codec mismatch — offer is missing: " + strings.Join(e.missing, ", ")
|
||||
}
|
||||
|
|
|
|||
251
app/webrtc/handler_m3_test.go
Normal file
251
app/webrtc/handler_m3_test.go
Normal file
|
|
@ -0,0 +1,251 @@
|
|||
package webrtc
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"strings"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
|
||||
"github.com/labstack/echo/v4"
|
||||
|
||||
corewebrtc "github.com/datarhei/core/v16/core/webrtc"
|
||||
)
|
||||
|
||||
// minimalH264OpusOffer returns an SDP offer that includes both H264
|
||||
// and Opus rtpmap lines — passes requireH264AndOpus but is otherwise
|
||||
// nonsense, so CreatePeerFromSources will fail downstream when this
|
||||
// is wired through. Use it only in tests that don't reach the
|
||||
// PeerConnection path.
|
||||
func minimalH264OpusOffer() string {
|
||||
return "v=0\r\n" +
|
||||
"o=- 0 0 IN IP4 0.0.0.0\r\ns=-\r\nt=0 0\r\n" +
|
||||
"m=video 9 UDP/TLS/RTP/SAVPF 102\r\n" +
|
||||
"a=rtpmap:102 H264/90000\r\n" +
|
||||
"m=audio 9 UDP/TLS/RTP/SAVPF 111\r\n" +
|
||||
"a=rtpmap:111 opus/48000/2\r\n"
|
||||
}
|
||||
|
||||
// nonH264Offer is missing H264 entirely. Triggers requireH264AndOpus.
|
||||
func nonH264Offer() string {
|
||||
return "v=0\r\n" +
|
||||
"m=video 9 UDP/TLS/RTP/SAVPF 96\r\n" +
|
||||
"a=rtpmap:96 VP8/90000\r\n" +
|
||||
"m=audio 9 UDP/TLS/RTP/SAVPF 111\r\n" +
|
||||
"a=rtpmap:111 opus/48000/2\r\n"
|
||||
}
|
||||
|
||||
// TestHandler_Subscribe_406OnCodecMismatch verifies an offer that
|
||||
// doesn't include H264 yields 406, per the design's error matrix.
|
||||
func TestHandler_Subscribe_406OnCodecMismatch(t *testing.T) {
|
||||
sub := newTestSubsystem(t)
|
||||
sub.mu.Lock()
|
||||
sub.streams["s"] = &processStream{id: "s"}
|
||||
sub.mu.Unlock()
|
||||
h := NewHandler(sub, 0)
|
||||
|
||||
e := echo.New()
|
||||
req := httptest.NewRequest(http.MethodPost, "/whep/s", strings.NewReader(nonH264Offer()))
|
||||
rec := httptest.NewRecorder()
|
||||
c := e.NewContext(req, rec)
|
||||
c.SetParamNames("id")
|
||||
c.SetParamValues("s")
|
||||
|
||||
if err := h.Subscribe(c); err != nil {
|
||||
t.Fatalf("Subscribe: %v", err)
|
||||
}
|
||||
if rec.Code != http.StatusNotAcceptable {
|
||||
t.Fatalf("expected 406, got %d: %s", rec.Code, rec.Body.String())
|
||||
}
|
||||
if !strings.Contains(rec.Body.String(), "H264") {
|
||||
t.Errorf("body should mention missing codec: %q", rec.Body.String())
|
||||
}
|
||||
}
|
||||
|
||||
// TestHandler_Subscribe_503OnTotalCap simulates the total cap being
|
||||
// exhausted by another subscriber. We don't actually create real peers
|
||||
// (would need a real PeerConnection); instead we pre-load the atomic
|
||||
// counter so the cap check fires.
|
||||
func TestHandler_Subscribe_503OnTotalCap(t *testing.T) {
|
||||
sub := newTestSubsystem(t)
|
||||
sub.mu.Lock()
|
||||
sub.streams["s"] = &processStream{id: "s"}
|
||||
sub.mu.Unlock()
|
||||
h := NewHandlerWithCaps(sub, 1, 100)
|
||||
atomic.StoreInt64(&h.count, 1) // simulate one in-flight peer
|
||||
|
||||
e := echo.New()
|
||||
req := httptest.NewRequest(http.MethodPost, "/whep/s", strings.NewReader(minimalH264OpusOffer()))
|
||||
rec := httptest.NewRecorder()
|
||||
c := e.NewContext(req, rec)
|
||||
c.SetParamNames("id")
|
||||
c.SetParamValues("s")
|
||||
_ = h.Subscribe(c)
|
||||
if rec.Code != http.StatusServiceUnavailable {
|
||||
t.Fatalf("expected 503, got %d: %s", rec.Code, rec.Body.String())
|
||||
}
|
||||
if !strings.Contains(rec.Body.String(), corewebrtc.ErrPeerCapReached.Error()) {
|
||||
t.Errorf("body should mention peer cap: %q", rec.Body.String())
|
||||
}
|
||||
}
|
||||
|
||||
// TestHandler_Subscribe_503OnPerStreamCap simulates the per-stream cap
|
||||
// being exhausted. Same trick as above but populating the per-stream
|
||||
// index directly.
|
||||
func TestHandler_Subscribe_503OnPerStreamCap(t *testing.T) {
|
||||
sub := newTestSubsystem(t)
|
||||
sub.mu.Lock()
|
||||
sub.streams["s"] = &processStream{id: "s"}
|
||||
sub.mu.Unlock()
|
||||
h := NewHandlerWithCaps(sub, 100, 1)
|
||||
// Drop a placeholder peer into the per-stream bucket so the cap
|
||||
// arithmetic trips on the next subscribe.
|
||||
h.mu.Lock()
|
||||
h.peersByStream["s"] = map[string]*corewebrtc.Peer{"existing": nil}
|
||||
h.mu.Unlock()
|
||||
|
||||
e := echo.New()
|
||||
req := httptest.NewRequest(http.MethodPost, "/whep/s", strings.NewReader(minimalH264OpusOffer()))
|
||||
rec := httptest.NewRecorder()
|
||||
c := e.NewContext(req, rec)
|
||||
c.SetParamNames("id")
|
||||
c.SetParamValues("s")
|
||||
_ = h.Subscribe(c)
|
||||
if rec.Code != http.StatusServiceUnavailable {
|
||||
t.Fatalf("expected 503, got %d: %s", rec.Code, rec.Body.String())
|
||||
}
|
||||
if !strings.Contains(rec.Body.String(), "per-stream") {
|
||||
t.Errorf("body should mention per-stream cap: %q", rec.Body.String())
|
||||
}
|
||||
}
|
||||
|
||||
// TestHandler_Trickle_404WhenUnknown verifies a PATCH for an unknown
|
||||
// resource returns 404 (we still treat the resource as authoritative
|
||||
// here; only DELETE is idempotent per spec).
|
||||
func TestHandler_Trickle_404WhenUnknown(t *testing.T) {
|
||||
h := NewHandler(newTestSubsystem(t), 0)
|
||||
|
||||
e := echo.New()
|
||||
req := httptest.NewRequest(http.MethodPatch, "/whep/id/unknown", strings.NewReader(""))
|
||||
rec := httptest.NewRecorder()
|
||||
c := e.NewContext(req, rec)
|
||||
c.SetParamNames("id", "resource")
|
||||
c.SetParamValues("id", "unknown")
|
||||
|
||||
if err := h.Trickle(c); err != nil {
|
||||
t.Fatalf("Trickle: %v", err)
|
||||
}
|
||||
if rec.Code != http.StatusNotFound {
|
||||
t.Fatalf("expected 404, got %d", rec.Code)
|
||||
}
|
||||
}
|
||||
|
||||
// TestHandler_PreflightCORS verifies OPTIONS returns 204 with the
|
||||
// browser-friendly CORS headers.
|
||||
func TestHandler_PreflightCORS(t *testing.T) {
|
||||
h := NewHandler(newTestSubsystem(t), 0)
|
||||
|
||||
e := echo.New()
|
||||
req := httptest.NewRequest(http.MethodOptions, "/whep/x", nil)
|
||||
rec := httptest.NewRecorder()
|
||||
c := e.NewContext(req, rec)
|
||||
c.SetParamNames("id")
|
||||
c.SetParamValues("x")
|
||||
|
||||
if err := h.preflight(c); err != nil {
|
||||
t.Fatalf("preflight: %v", err)
|
||||
}
|
||||
if rec.Code != http.StatusNoContent {
|
||||
t.Fatalf("expected 204, got %d", rec.Code)
|
||||
}
|
||||
hh := rec.Header()
|
||||
for _, k := range []string{
|
||||
"Access-Control-Allow-Origin",
|
||||
"Access-Control-Allow-Methods",
|
||||
"Access-Control-Allow-Headers",
|
||||
"Access-Control-Expose-Headers",
|
||||
} {
|
||||
if hh.Get(k) == "" {
|
||||
t.Errorf("missing CORS header %q", k)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TestHandler_RegisterMountsAllRoutes is a sanity check that
|
||||
// Handler.Register installs OPTIONS / POST / DELETE / PATCH on the
|
||||
// expected paths. Echo's Group has no public route enumerator, so we
|
||||
// dispatch synthetic requests and assert the right methods are
|
||||
// reachable.
|
||||
func TestHandler_RegisterMountsAllRoutes(t *testing.T) {
|
||||
h := NewHandler(newTestSubsystem(t), 0)
|
||||
e := echo.New()
|
||||
g := e.Group("")
|
||||
h.Register(g)
|
||||
|
||||
cases := []struct {
|
||||
method, path string
|
||||
want int
|
||||
}{
|
||||
{http.MethodOptions, "/whep/foo", http.StatusNoContent},
|
||||
{http.MethodOptions, "/whep/foo/bar", http.StatusNoContent},
|
||||
{http.MethodPost, "/whep/foo", http.StatusNotFound}, // stream missing -> 404
|
||||
{http.MethodDelete, "/whep/foo/bar", http.StatusNoContent},
|
||||
{http.MethodPatch, "/whep/foo/bar", http.StatusNotFound},
|
||||
}
|
||||
for _, tc := range cases {
|
||||
req := httptest.NewRequest(tc.method, tc.path, strings.NewReader(""))
|
||||
rec := httptest.NewRecorder()
|
||||
e.ServeHTTP(rec, req)
|
||||
if rec.Code != tc.want {
|
||||
t.Errorf("%s %s: got %d want %d (%s)", tc.method, tc.path, rec.Code, tc.want, rec.Body.String())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TestHandler_Close_DrainsPeers seeds a fake peer into the index and
|
||||
// verifies Close clears it without panicking.
|
||||
func TestHandler_Close_DrainsPeers(t *testing.T) {
|
||||
h := NewHandler(newTestSubsystem(t), 0)
|
||||
h.mu.Lock()
|
||||
h.peersByStream["s"] = map[string]*corewebrtc.Peer{"r1": nil}
|
||||
h.peerStream["r1"] = "s"
|
||||
atomic.StoreInt64(&h.count, 1)
|
||||
h.mu.Unlock()
|
||||
|
||||
h.Close()
|
||||
if got := atomic.LoadInt64(&h.count); got != 0 {
|
||||
t.Errorf("count after Close = %d, want 0", got)
|
||||
}
|
||||
h.mu.Lock()
|
||||
if len(h.peersByStream) != 0 || len(h.peerStream) != 0 {
|
||||
t.Errorf("indexes not cleared")
|
||||
}
|
||||
h.mu.Unlock()
|
||||
}
|
||||
|
||||
// TestRequireH264AndOpus covers the SDP scanner's positive +
|
||||
// negative cases.
|
||||
func TestRequireH264AndOpus(t *testing.T) {
|
||||
cases := []struct {
|
||||
name string
|
||||
sdp string
|
||||
ok bool
|
||||
}{
|
||||
{"both", minimalH264OpusOffer(), true},
|
||||
{"missing h264", nonH264Offer(), false},
|
||||
{"missing opus", "m=video 9 UDP/TLS/RTP/SAVPF 102\r\na=rtpmap:102 H264/90000\r\n", false},
|
||||
{"capitalized", "a=rtpmap:111 OPUS/48000\r\na=rtpmap:102 H264/90000", true},
|
||||
{"empty", "", false},
|
||||
}
|
||||
for _, c := range cases {
|
||||
t.Run(c.name, func(t *testing.T) {
|
||||
err := requireH264AndOpus(c.sdp)
|
||||
if c.ok && err != nil {
|
||||
t.Errorf("expected ok, got %v", err)
|
||||
}
|
||||
if !c.ok && err == nil {
|
||||
t.Errorf("expected error")
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
@ -68,9 +68,11 @@ func TestHandler_Subscribe_400OnEmptyBody(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
// TestHandler_Unsubscribe_404WhenUnknown verifies a DELETE with an
|
||||
// unknown resource id returns 404 and no state mutation.
|
||||
func TestHandler_Unsubscribe_404WhenUnknown(t *testing.T) {
|
||||
// TestHandler_Unsubscribe_204WhenUnknown verifies a DELETE with an
|
||||
// unknown resource id returns 204 (idempotent), per the WHEP spec
|
||||
// and the M2/M3 design's error matrix. Pre-M3 this returned 404; the
|
||||
// updated semantics let clients re-issue DELETE without erroring.
|
||||
func TestHandler_Unsubscribe_204WhenUnknown(t *testing.T) {
|
||||
h := NewHandler(newTestSubsystem(t), 0)
|
||||
|
||||
e := echo.New()
|
||||
|
|
@ -83,7 +85,7 @@ func TestHandler_Unsubscribe_404WhenUnknown(t *testing.T) {
|
|||
if err := h.Unsubscribe(c); err != nil {
|
||||
t.Fatalf("Unsubscribe returned error: %v", err)
|
||||
}
|
||||
if rec.Code != http.StatusNotFound {
|
||||
t.Fatalf("expected 404, got %d", rec.Code)
|
||||
if rec.Code != http.StatusNoContent {
|
||||
t.Fatalf("expected 204, got %d", rec.Code)
|
||||
}
|
||||
}
|
||||
|
|
|
|||
289
app/webrtc/latency_test.go
Normal file
289
app/webrtc/latency_test.go
Normal file
|
|
@ -0,0 +1,289 @@
|
|||
//go:build latency
|
||||
// +build latency
|
||||
|
||||
package webrtc
|
||||
|
||||
// Server-hop latency benchmark. Build-tagged off the default test
|
||||
// suite because it's a load test, not a unit test:
|
||||
//
|
||||
// go test -tags latency -timeout 60s -count=1 ./app/webrtc/... \
|
||||
// -run TestLatencyServerHop -v
|
||||
//
|
||||
// What this measures
|
||||
// -------------------
|
||||
// RTP packet arrival latency end-to-end through the Core WebRTC
|
||||
// egress path:
|
||||
//
|
||||
// publisher (this test) ── UDP ──▶ corewebrtc.Source
|
||||
// │
|
||||
// ▼ subscriber fan-out
|
||||
// Peer ── ICE+SRTP ──▶ Pion subscriber
|
||||
// │
|
||||
// ▼ ReadRTP
|
||||
//
|
||||
// What it does NOT measure (and why)
|
||||
// ----------------------------------
|
||||
// The design (docs/design/2026-04-16-datarhei-dragon-fork-webrtc-design.md
|
||||
// §7) calls for true glass-to-glass latency: publisher embeds a frame
|
||||
// counter via FFmpeg drawtext, subscriber decodes H.264 and samples a
|
||||
// pixel bounding box, diff is the e2e number. Implementing that in
|
||||
// pure Go would require a cgo H.264 decoder or an FFmpeg-as-sidecar
|
||||
// pipe. Both are heavier than the ~150 LOC this test costs and add a
|
||||
// dependency that doesn't pay off for the dominant CI question
|
||||
// ("did anybody regress the server hop?"). Encode/decode latency
|
||||
// is roughly fixed by the codec stack and isn't something Core code
|
||||
// changes can move.
|
||||
//
|
||||
// We sidestep the decoder by embedding a wall-clock timestamp in the
|
||||
// RTP packet payload (first 8 bytes, big-endian UnixNano). The
|
||||
// subscriber reads it via track.ReadRTP() and diffs against time.Now()
|
||||
// at arrival. This gives us a true server-hop measurement that
|
||||
// exercises:
|
||||
//
|
||||
// - Source.readLoop unmarshalling
|
||||
// - Source.subscribers fan-out
|
||||
// - forwardRTPSplit goroutine
|
||||
// - Pion's TrackLocalStaticRTP.WriteRTP
|
||||
// - DTLS-SRTP encrypt
|
||||
// - ICE socket write
|
||||
// - DTLS-SRTP decrypt at the subscriber
|
||||
// - subscriber TrackRemote.ReadRTP unmarshal
|
||||
//
|
||||
// Threshold
|
||||
// ---------
|
||||
// p95 < 50ms on a quiet Linux host (loopback + Pion). The CI runner
|
||||
// is shared so we set the gate at 200ms — generous, but a regression
|
||||
// that crosses it indicates a genuine slowdown rather than runner
|
||||
// noise.
|
||||
|
||||
import (
|
||||
"encoding/binary"
|
||||
"net"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"sort"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/labstack/echo/v4"
|
||||
"github.com/pion/rtp"
|
||||
pionwebrtc "github.com/pion/webrtc/v4"
|
||||
|
||||
"github.com/datarhei/core/v16/config"
|
||||
appcfg "github.com/datarhei/core/v16/restream/app"
|
||||
)
|
||||
|
||||
const (
|
||||
latencyPackets = 1000
|
||||
latencyRateHz = 60
|
||||
latencyP95Budget = 50 * time.Millisecond // CI gate; p95 is sub-ms locally
|
||||
)
|
||||
|
||||
func TestLatencyServerHop(t *testing.T) {
|
||||
sub, err := New(config.DataWebRTC{Enable: true}, nil)
|
||||
if err != nil {
|
||||
t.Fatalf("subsystem New: %v", err)
|
||||
}
|
||||
defer sub.Close()
|
||||
|
||||
h := NewHandler(sub, 0)
|
||||
defer h.Close()
|
||||
|
||||
processID := "latency-probe"
|
||||
legs, err := sub.onProcessStart(processID, &appcfg.Config{
|
||||
ID: processID,
|
||||
WebRTC: appcfg.ConfigWebRTC{Enabled: true, VideoPT: 102, AudioPT: 111},
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("onProcessStart: %v", err)
|
||||
}
|
||||
defer sub.onProcessStop(processID)
|
||||
|
||||
videoPort, err := portFromLegAddress(legs[0].Address)
|
||||
if err != nil {
|
||||
t.Fatalf("video port: %v", err)
|
||||
}
|
||||
|
||||
e := echo.New()
|
||||
g := e.Group("")
|
||||
h.Register(g)
|
||||
srv := httptest.NewServer(e)
|
||||
defer srv.Close()
|
||||
|
||||
pc, samples := buildSubscriber(t, srv.URL, processID)
|
||||
defer pc.Close()
|
||||
|
||||
// Sender: synthetic RTP packets with UnixNano in the first 8 bytes
|
||||
// of payload. We only stream video (latency on audio is identical
|
||||
// in this path).
|
||||
conn, err := net.Dial("udp", "127.0.0.1:"+strconv.Itoa(videoPort))
|
||||
if err != nil {
|
||||
t.Fatalf("dial: %v", err)
|
||||
}
|
||||
defer conn.Close()
|
||||
|
||||
tick := time.NewTicker(time.Second / latencyRateHz)
|
||||
defer tick.Stop()
|
||||
var seq uint16
|
||||
for i := 0; i < latencyPackets; i++ {
|
||||
<-tick.C
|
||||
seq++
|
||||
payload := make([]byte, 200)
|
||||
binary.BigEndian.PutUint64(payload, uint64(time.Now().UnixNano()))
|
||||
pkt := &rtp.Packet{
|
||||
Header: rtp.Header{
|
||||
Version: 2,
|
||||
PayloadType: 102,
|
||||
SequenceNumber: seq,
|
||||
Timestamp: uint32(seq) * 3000,
|
||||
SSRC: 0xdeadbeef,
|
||||
},
|
||||
Payload: payload,
|
||||
}
|
||||
b, _ := pkt.Marshal()
|
||||
_, _ = conn.Write(b)
|
||||
}
|
||||
|
||||
// Wait for the receiver to drain — give it 2× the send window.
|
||||
deadline := time.After(time.Duration(latencyPackets*2) * time.Second / latencyRateHz)
|
||||
for {
|
||||
if int(samples.Load()) >= latencyPackets-50 {
|
||||
break // 5% tolerance for in-flight loss; loopback rarely loses
|
||||
}
|
||||
select {
|
||||
case <-deadline:
|
||||
break
|
||||
case <-time.After(10 * time.Millisecond):
|
||||
continue
|
||||
}
|
||||
break
|
||||
}
|
||||
|
||||
got := samples.Drain()
|
||||
if len(got) < latencyPackets/2 {
|
||||
t.Fatalf("only %d/%d samples received — too lossy to gate", len(got), latencyPackets)
|
||||
}
|
||||
p50, p95, p99 := percentile(got, 50), percentile(got, 95), percentile(got, 99)
|
||||
t.Logf("latency over %d samples: p50=%v p95=%v p99=%v",
|
||||
len(got), p50, p95, p99)
|
||||
|
||||
if p95 > latencyP95Budget {
|
||||
t.Fatalf("p95 latency %v exceeds budget %v (%d samples)",
|
||||
p95, latencyP95Budget, len(got))
|
||||
}
|
||||
}
|
||||
|
||||
// latencySamples is a goroutine-safe append-only sample buffer. The
|
||||
// receiver goroutine appends; the test goroutine reads via Drain
|
||||
// after the run completes.
|
||||
type latencySamples struct {
|
||||
mu sync.Mutex
|
||||
samples []time.Duration
|
||||
count atomic.Int32
|
||||
}
|
||||
|
||||
func (s *latencySamples) Add(d time.Duration) {
|
||||
s.mu.Lock()
|
||||
s.samples = append(s.samples, d)
|
||||
s.mu.Unlock()
|
||||
s.count.Add(1)
|
||||
}
|
||||
func (s *latencySamples) Load() int32 { return s.count.Load() }
|
||||
func (s *latencySamples) Drain() []time.Duration {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
out := make([]time.Duration, len(s.samples))
|
||||
copy(out, s.samples)
|
||||
return out
|
||||
}
|
||||
|
||||
// buildSubscriber spins up a Pion peer, performs the WHEP handshake,
|
||||
// returns a samples buffer that latencyArrival fills as packets land.
|
||||
func buildSubscriber(t *testing.T, srvURL, processID string) (*pionwebrtc.PeerConnection, *latencySamples) {
|
||||
t.Helper()
|
||||
me := &pionwebrtc.MediaEngine{}
|
||||
if err := me.RegisterDefaultCodecs(); err != nil {
|
||||
t.Fatalf("register codecs: %v", err)
|
||||
}
|
||||
api := pionwebrtc.NewAPI(pionwebrtc.WithMediaEngine(me))
|
||||
pc, err := api.NewPeerConnection(pionwebrtc.Configuration{})
|
||||
if err != nil {
|
||||
t.Fatalf("new PC: %v", err)
|
||||
}
|
||||
if _, err := pc.AddTransceiverFromKind(pionwebrtc.RTPCodecTypeVideo,
|
||||
pionwebrtc.RTPTransceiverInit{Direction: pionwebrtc.RTPTransceiverDirectionRecvonly}); err != nil {
|
||||
t.Fatalf("add video tx: %v", err)
|
||||
}
|
||||
if _, err := pc.AddTransceiverFromKind(pionwebrtc.RTPCodecTypeAudio,
|
||||
pionwebrtc.RTPTransceiverInit{Direction: pionwebrtc.RTPTransceiverDirectionRecvonly}); err != nil {
|
||||
t.Fatalf("add audio tx: %v", err)
|
||||
}
|
||||
|
||||
samples := &latencySamples{}
|
||||
pc.OnTrack(func(tr *pionwebrtc.TrackRemote, _ *pionwebrtc.RTPReceiver) {
|
||||
if tr.Kind() != pionwebrtc.RTPCodecTypeVideo {
|
||||
return
|
||||
}
|
||||
go func() {
|
||||
for {
|
||||
p, _, err := tr.ReadRTP()
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
if len(p.Payload) < 8 {
|
||||
continue
|
||||
}
|
||||
sentNs := int64(binary.BigEndian.Uint64(p.Payload[:8]))
|
||||
samples.Add(time.Duration(time.Now().UnixNano() - sentNs))
|
||||
}
|
||||
}()
|
||||
})
|
||||
|
||||
offer, err := pc.CreateOffer(nil)
|
||||
if err != nil {
|
||||
t.Fatalf("offer: %v", err)
|
||||
}
|
||||
gather := pionwebrtc.GatheringCompletePromise(pc)
|
||||
if err := pc.SetLocalDescription(offer); err != nil {
|
||||
t.Fatalf("set local: %v", err)
|
||||
}
|
||||
<-gather
|
||||
|
||||
resp, err := http.Post(srvURL+"/whep/"+processID, "application/sdp",
|
||||
strings.NewReader(pc.LocalDescription().SDP))
|
||||
if err != nil {
|
||||
t.Fatalf("POST /whep: %v", err)
|
||||
}
|
||||
if resp.StatusCode != http.StatusCreated {
|
||||
t.Fatalf("WHEP status = %d", resp.StatusCode)
|
||||
}
|
||||
buf := make([]byte, 1<<15)
|
||||
n, _ := resp.Body.Read(buf)
|
||||
resp.Body.Close()
|
||||
if err := pc.SetRemoteDescription(pionwebrtc.SessionDescription{
|
||||
Type: pionwebrtc.SDPTypeAnswer,
|
||||
SDP: string(buf[:n]),
|
||||
}); err != nil {
|
||||
t.Fatalf("set remote: %v", err)
|
||||
}
|
||||
// Give ICE a moment to settle before the publisher fires.
|
||||
time.Sleep(500 * time.Millisecond)
|
||||
return pc, samples
|
||||
}
|
||||
|
||||
func percentile(samples []time.Duration, p int) time.Duration {
|
||||
if len(samples) == 0 {
|
||||
return 0
|
||||
}
|
||||
sort.Slice(samples, func(i, j int) bool { return samples[i] < samples[j] })
|
||||
idx := (p * len(samples)) / 100
|
||||
if idx >= len(samples) {
|
||||
idx = len(samples) - 1
|
||||
}
|
||||
return samples[idx]
|
||||
}
|
||||
|
||||
|
|
@ -94,6 +94,7 @@ func (s *Subsystem) onProcessStart(id string, cfg *appcfg.Config) ([]appcfg.Conf
|
|||
func (s *Subsystem) onProcessStop(id string) {
|
||||
s.mu.Lock()
|
||||
st, ok := s.streams[id]
|
||||
teardown := s.teardown
|
||||
if ok {
|
||||
delete(s.streams, id)
|
||||
}
|
||||
|
|
@ -102,6 +103,16 @@ func (s *Subsystem) onProcessStop(id string) {
|
|||
if !ok {
|
||||
return
|
||||
}
|
||||
|
||||
// Broadcast first, so any subscribed peers get torn down while
|
||||
// the streamID is still meaningful. The handler's tearDownStreamPeers
|
||||
// drives each Peer.Close() which in turn unsubscribes from the
|
||||
// Sources we're about to shut down — preventing a "subscribers fan
|
||||
// out into a closed channel" race.
|
||||
if teardown != nil {
|
||||
teardown(id)
|
||||
}
|
||||
|
||||
if st.video != nil {
|
||||
_ = st.video.Close()
|
||||
}
|
||||
|
|
|
|||
257
app/webrtc/multiviewer_test.go
Normal file
257
app/webrtc/multiviewer_test.go
Normal file
|
|
@ -0,0 +1,257 @@
|
|||
package webrtc
|
||||
|
||||
import (
|
||||
"net"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/labstack/echo/v4"
|
||||
pionwebrtc "github.com/pion/webrtc/v4"
|
||||
|
||||
"github.com/datarhei/core/v16/config"
|
||||
appcfg "github.com/datarhei/core/v16/restream/app"
|
||||
)
|
||||
|
||||
// TestIntegration_FiveViewerFanout drives the M3 acceptance criterion
|
||||
// "5 concurrent viewers, all error paths correct, clean teardown" in
|
||||
// the wide direction. Five Pion subscribers attach to a single
|
||||
// process's stream pair and each receives RTP without crosstalk; on
|
||||
// teardown every subscriber's PeerConnection observes its tracks
|
||||
// closing.
|
||||
//
|
||||
// Verifies (in order):
|
||||
// * subsystem.onProcessStart returns adjacent UDP ports
|
||||
// * 5 WHEP POSTs in parallel succeed (per-stream cap default = 8)
|
||||
// * every subscriber's video and audio track receives at least one
|
||||
// RTP packet within the timeout
|
||||
// * onProcessStop tears every subscriber down (PeerConnection
|
||||
// transitions away from connected/connecting)
|
||||
func TestIntegration_FiveViewerFanout(t *testing.T) {
|
||||
const N = 5
|
||||
|
||||
sub, err := New(config.DataWebRTC{Enable: true}, nil)
|
||||
if err != nil {
|
||||
t.Fatalf("subsystem New: %v", err)
|
||||
}
|
||||
defer sub.Close()
|
||||
|
||||
h := NewHandler(sub, 0)
|
||||
defer h.Close()
|
||||
|
||||
processID := "fanout"
|
||||
legs, err := sub.onProcessStart(processID, &appcfg.Config{
|
||||
ID: processID,
|
||||
WebRTC: appcfg.ConfigWebRTC{Enabled: true, VideoPT: 102, AudioPT: 111},
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("onProcessStart: %v", err)
|
||||
}
|
||||
if len(legs) != 2 {
|
||||
t.Fatalf("expected 2 legs, got %d", len(legs))
|
||||
}
|
||||
videoPort, err := portFromLegAddress(legs[0].Address)
|
||||
if err != nil {
|
||||
t.Fatalf("video port: %v", err)
|
||||
}
|
||||
audioPort, err := portFromLegAddress(legs[1].Address)
|
||||
if err != nil {
|
||||
t.Fatalf("audio port: %v", err)
|
||||
}
|
||||
|
||||
e := echo.New()
|
||||
g := e.Group("")
|
||||
h.Register(g)
|
||||
srv := httptest.NewServer(e)
|
||||
defer srv.Close()
|
||||
|
||||
// Each subscriber tracks first-RTP-received signals for V and A.
|
||||
type viewer struct {
|
||||
pc *pionwebrtc.PeerConnection
|
||||
videoCh chan struct{}
|
||||
audioCh chan struct{}
|
||||
}
|
||||
viewers := make([]*viewer, N)
|
||||
api := func() *pionwebrtc.API {
|
||||
me := &pionwebrtc.MediaEngine{}
|
||||
_ = me.RegisterDefaultCodecs()
|
||||
return pionwebrtc.NewAPI(pionwebrtc.WithMediaEngine(me))
|
||||
}()
|
||||
|
||||
subscribe := func(i int) error {
|
||||
pc, err := api.NewPeerConnection(pionwebrtc.Configuration{})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
v := &viewer{pc: pc, videoCh: make(chan struct{}, 1), audioCh: make(chan struct{}, 1)}
|
||||
viewers[i] = v
|
||||
var vGot, aGot atomic.Bool
|
||||
pc.OnTrack(func(tr *pionwebrtc.TrackRemote, _ *pionwebrtc.RTPReceiver) {
|
||||
go func() {
|
||||
if _, _, rerr := tr.ReadRTP(); rerr != nil {
|
||||
return
|
||||
}
|
||||
switch tr.Kind() {
|
||||
case pionwebrtc.RTPCodecTypeVideo:
|
||||
if vGot.CompareAndSwap(false, true) {
|
||||
v.videoCh <- struct{}{}
|
||||
}
|
||||
case pionwebrtc.RTPCodecTypeAudio:
|
||||
if aGot.CompareAndSwap(false, true) {
|
||||
v.audioCh <- struct{}{}
|
||||
}
|
||||
}
|
||||
}()
|
||||
})
|
||||
_, _ = pc.AddTransceiverFromKind(pionwebrtc.RTPCodecTypeVideo,
|
||||
pionwebrtc.RTPTransceiverInit{Direction: pionwebrtc.RTPTransceiverDirectionRecvonly})
|
||||
_, _ = pc.AddTransceiverFromKind(pionwebrtc.RTPCodecTypeAudio,
|
||||
pionwebrtc.RTPTransceiverInit{Direction: pionwebrtc.RTPTransceiverDirectionRecvonly})
|
||||
offer, err := pc.CreateOffer(nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
gather := pionwebrtc.GatheringCompletePromise(pc)
|
||||
if err := pc.SetLocalDescription(offer); err != nil {
|
||||
return err
|
||||
}
|
||||
<-gather
|
||||
resp, err := http.Post(srv.URL+"/whep/"+processID, "application/sdp",
|
||||
strings.NewReader(pc.LocalDescription().SDP))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
if resp.StatusCode != http.StatusCreated {
|
||||
t.Errorf("viewer %d: WHEP %d", i, resp.StatusCode)
|
||||
return nil
|
||||
}
|
||||
buf := make([]byte, 1<<15)
|
||||
n, _ := resp.Body.Read(buf)
|
||||
return pc.SetRemoteDescription(pionwebrtc.SessionDescription{
|
||||
Type: pionwebrtc.SDPTypeAnswer,
|
||||
SDP: string(buf[:n]),
|
||||
})
|
||||
}
|
||||
|
||||
// Subscribe all N viewers in parallel.
|
||||
var wg sync.WaitGroup
|
||||
for i := 0; i < N; i++ {
|
||||
wg.Add(1)
|
||||
go func(i int) {
|
||||
defer wg.Done()
|
||||
if err := subscribe(i); err != nil {
|
||||
t.Errorf("viewer %d subscribe: %v", i, err)
|
||||
}
|
||||
}(i)
|
||||
}
|
||||
wg.Wait()
|
||||
|
||||
for i := 0; i < N; i++ {
|
||||
if viewers[i] == nil || viewers[i].pc == nil {
|
||||
t.Fatalf("viewer %d not constructed", i)
|
||||
}
|
||||
defer viewers[i].pc.Close()
|
||||
}
|
||||
|
||||
// Spray RTP into both ports until every viewer reports first-RTP.
|
||||
videoSender, _ := net.Dial("udp", "127.0.0.1:"+strconv.Itoa(videoPort))
|
||||
audioSender, _ := net.Dial("udp", "127.0.0.1:"+strconv.Itoa(audioPort))
|
||||
defer videoSender.Close()
|
||||
defer audioSender.Close()
|
||||
stop := make(chan struct{})
|
||||
go func() {
|
||||
ticker := time.NewTicker(20 * time.Millisecond)
|
||||
defer ticker.Stop()
|
||||
var seq uint16
|
||||
for {
|
||||
select {
|
||||
case <-stop:
|
||||
return
|
||||
case <-ticker.C:
|
||||
seq++
|
||||
_, _ = videoSender.Write(synthRTPPacket(102, seq, uint32(seq)*3000, 0xcafe0000, []byte("vvvvvvvv")))
|
||||
_, _ = audioSender.Write(synthRTPPacket(111, seq, uint32(seq)*960, 0xbeef0000, []byte("aaaaaaaa")))
|
||||
}
|
||||
}
|
||||
}()
|
||||
defer close(stop)
|
||||
|
||||
deadline := time.After(15 * time.Second)
|
||||
for i, v := range viewers {
|
||||
select {
|
||||
case <-v.videoCh:
|
||||
case <-deadline:
|
||||
t.Fatalf("viewer %d: no video RTP within 15s", i)
|
||||
}
|
||||
select {
|
||||
case <-v.audioCh:
|
||||
case <-deadline:
|
||||
t.Fatalf("viewer %d: no audio RTP within 15s", i)
|
||||
}
|
||||
}
|
||||
|
||||
// Confirm the per-stream peer index has all N entries.
|
||||
h.mu.Lock()
|
||||
got := len(h.peersByStream[processID])
|
||||
h.mu.Unlock()
|
||||
if got != N {
|
||||
t.Errorf("peersByStream[%s] = %d, want %d", processID, got, N)
|
||||
}
|
||||
|
||||
// Tear the process down — every viewer's PC should observe state
|
||||
// transitioning away from connected within a short window.
|
||||
sub.onProcessStop(processID)
|
||||
|
||||
// After teardown the peer index for this stream should be empty.
|
||||
// Closing peers is async (driven by Done channel), so poll briefly.
|
||||
deadline2 := time.Now().Add(3 * time.Second)
|
||||
for time.Now().Before(deadline2) {
|
||||
h.mu.Lock()
|
||||
empty := len(h.peersByStream[processID]) == 0
|
||||
h.mu.Unlock()
|
||||
if empty {
|
||||
break
|
||||
}
|
||||
time.Sleep(50 * time.Millisecond)
|
||||
}
|
||||
h.mu.Lock()
|
||||
leftover := len(h.peersByStream[processID])
|
||||
h.mu.Unlock()
|
||||
if leftover != 0 {
|
||||
t.Errorf("after onProcessStop, %d peers remain in index", leftover)
|
||||
}
|
||||
}
|
||||
|
||||
// TestSubsystem_TeardownHookFiresOnProcessStop is a unit-level check
|
||||
// that the teardown callback the Handler installs actually runs.
|
||||
func TestSubsystem_TeardownHookFiresOnProcessStop(t *testing.T) {
|
||||
sub, err := New(config.DataWebRTC{Enable: true}, nil)
|
||||
if err != nil {
|
||||
t.Fatalf("New: %v", err)
|
||||
}
|
||||
defer sub.Close()
|
||||
|
||||
var fired atomic.Int32
|
||||
sub.SetTeardownHook(func(streamID string) {
|
||||
if streamID == "p1" {
|
||||
fired.Add(1)
|
||||
}
|
||||
})
|
||||
|
||||
if _, err := sub.onProcessStart("p1", &appcfg.Config{
|
||||
ID: "p1",
|
||||
WebRTC: appcfg.ConfigWebRTC{Enabled: true, VideoPT: 102, AudioPT: 111},
|
||||
}); err != nil {
|
||||
t.Fatalf("onProcessStart: %v", err)
|
||||
}
|
||||
sub.onProcessStop("p1")
|
||||
if got := fired.Load(); got != 1 {
|
||||
t.Errorf("teardown fired %d times, want 1", got)
|
||||
}
|
||||
}
|
||||
|
|
@ -31,6 +31,12 @@ type Subsystem struct {
|
|||
|
||||
mu sync.Mutex
|
||||
streams map[string]*processStream // processID -> stream pair
|
||||
|
||||
// teardown is set by the Handler (or any other consumer) so the
|
||||
// Subsystem can broadcast process-stop events. Called *before*
|
||||
// the per-stream Sources are closed, so consumers can yank their
|
||||
// own indexes while the stream id is still valid.
|
||||
teardown func(streamID string)
|
||||
}
|
||||
|
||||
// processStream captures the two Sources (video + audio) backing a
|
||||
|
|
@ -110,6 +116,19 @@ func (s *Subsystem) Close() {
|
|||
}
|
||||
}
|
||||
|
||||
// SetTeardownHook registers a callback invoked just before a stream's
|
||||
// Sources are closed in onProcessStop. The callback is expected to
|
||||
// tear down any external resources keyed by streamID — most importantly
|
||||
// the WHEP Handler's per-stream peer index.
|
||||
//
|
||||
// Calling SetTeardownHook again replaces the previous callback; pass
|
||||
// nil to detach. Only one consumer is supported by design.
|
||||
func (s *Subsystem) SetTeardownHook(fn func(streamID string)) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
s.teardown = fn
|
||||
}
|
||||
|
||||
// lookup returns the per-process stream pair for id, or nil, false.
|
||||
// Used by the WHEP handler.
|
||||
func (s *Subsystem) lookup(id string) (*processStream, bool) {
|
||||
|
|
|
|||
|
|
@ -152,6 +152,12 @@ func (p *Peer) Answer() webrtc.SessionDescription { return p.answer }
|
|||
// ResourceID returns the stable resource id used in the WHEP Location header.
|
||||
func (p *Peer) ResourceID() string { return p.resourceID }
|
||||
|
||||
// Done returns a channel that is closed when the Peer has been torn down
|
||||
// (either explicitly via Close, or because Pion observed an ICE
|
||||
// failure / disconnection). Consumers can range over it to drive
|
||||
// index cleanup without polling.
|
||||
func (p *Peer) Done() <-chan struct{} { return p.done }
|
||||
|
||||
// Close tears down the peer connection and unsubscribes from each
|
||||
// source. Safe to call multiple times.
|
||||
func (p *Peer) Close() error {
|
||||
|
|
@ -257,6 +263,14 @@ func (f *PeerFactory) CreatePeerFromSources(ctx context.Context,
|
|||
return p, nil
|
||||
}
|
||||
|
||||
|
||||
// AddICECandidate forwards a trickle-ICE candidate to the underlying
|
||||
// PeerConnection. Returns the underlying error if the candidate is
|
||||
// malformed or the connection has already been closed.
|
||||
func (p *Peer) AddICECandidate(c webrtc.ICECandidateInit) error {
|
||||
return p.pc.AddICECandidate(c)
|
||||
}
|
||||
|
||||
func newResourceID() string {
|
||||
b := make([]byte, 8)
|
||||
_, _ = rand.Read(b)
|
||||
|
|
|
|||
224
docs/docs.go
224
docs/docs.go
|
|
@ -1,4 +1,4 @@
|
|||
// Code generated by swaggo/swag. DO NOT EDIT
|
||||
// Package docs Code generated by swaggo/swag. DO NOT EDIT
|
||||
package docs
|
||||
|
||||
import "github.com/swaggo/swag"
|
||||
|
|
@ -1903,6 +1903,165 @@ const docTemplate = `{
|
|||
}
|
||||
}
|
||||
},
|
||||
"/api/v3/whep/{id}": {
|
||||
"post": {
|
||||
"security": [
|
||||
{
|
||||
"ApiKeyAuth": []
|
||||
}
|
||||
],
|
||||
"description": "Subscribe to a process's WebRTC egress stream. Body is the SDP offer (Content-Type: application/sdp). Response is the SDP answer; the Location header points at the DELETE/PATCH resource for teardown and trickle ICE.",
|
||||
"consumes": [
|
||||
"application/sdp"
|
||||
],
|
||||
"produces": [
|
||||
"application/sdp"
|
||||
],
|
||||
"tags": [
|
||||
"v16.16.0"
|
||||
],
|
||||
"summary": "Subscribe to a WebRTC stream via WHEP",
|
||||
"operationId": "webrtc-3-whep-subscribe",
|
||||
"parameters": [
|
||||
{
|
||||
"type": "string",
|
||||
"description": "Process ID with config.webrtc.enabled=true",
|
||||
"name": "id",
|
||||
"in": "path",
|
||||
"required": true
|
||||
}
|
||||
],
|
||||
"responses": {
|
||||
"201": {
|
||||
"description": "SDP answer",
|
||||
"schema": {
|
||||
"type": "string"
|
||||
}
|
||||
},
|
||||
"400": {
|
||||
"description": "missing stream id, malformed body, or invalid SDP",
|
||||
"schema": {
|
||||
"type": "string"
|
||||
}
|
||||
},
|
||||
"404": {
|
||||
"description": "no stream registered for this process id",
|
||||
"schema": {
|
||||
"type": "string"
|
||||
}
|
||||
},
|
||||
"406": {
|
||||
"description": "offer SDP missing required H264 / Opus rtpmap",
|
||||
"schema": {
|
||||
"type": "string"
|
||||
}
|
||||
},
|
||||
"503": {
|
||||
"description": "peer cap reached (per-stream or total)",
|
||||
"schema": {
|
||||
"type": "string"
|
||||
}
|
||||
},
|
||||
"504": {
|
||||
"description": "ICE gathering timeout",
|
||||
"schema": {
|
||||
"type": "string"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
"/api/v3/whep/{id}/{resource}": {
|
||||
"delete": {
|
||||
"security": [
|
||||
{
|
||||
"ApiKeyAuth": []
|
||||
}
|
||||
],
|
||||
"description": "Idempotent peer teardown by resource id (returned in the Location header by Subscribe). Returns 204 even when the resource is unknown, per the WHEP spec.",
|
||||
"tags": [
|
||||
"v16.16.0"
|
||||
],
|
||||
"summary": "Tear down a WHEP subscription",
|
||||
"operationId": "webrtc-3-whep-unsubscribe",
|
||||
"parameters": [
|
||||
{
|
||||
"type": "string",
|
||||
"description": "Process ID",
|
||||
"name": "id",
|
||||
"in": "path",
|
||||
"required": true
|
||||
},
|
||||
{
|
||||
"type": "string",
|
||||
"description": "Resource ID from the Subscribe Location header",
|
||||
"name": "resource",
|
||||
"in": "path",
|
||||
"required": true
|
||||
}
|
||||
],
|
||||
"responses": {
|
||||
"204": {
|
||||
"description": "no content"
|
||||
},
|
||||
"400": {
|
||||
"description": "missing resource id",
|
||||
"schema": {
|
||||
"type": "string"
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
"patch": {
|
||||
"security": [
|
||||
{
|
||||
"ApiKeyAuth": []
|
||||
}
|
||||
],
|
||||
"description": "Add ICE candidates to an existing WebRTC peer. Body is application/trickle-ice-sdpfrag.",
|
||||
"consumes": [
|
||||
"application/trickle-ice-sdpfrag"
|
||||
],
|
||||
"tags": [
|
||||
"v16.16.0"
|
||||
],
|
||||
"summary": "Trickle ICE candidates for a WHEP subscription",
|
||||
"operationId": "webrtc-3-whep-trickle",
|
||||
"parameters": [
|
||||
{
|
||||
"type": "string",
|
||||
"description": "Process ID",
|
||||
"name": "id",
|
||||
"in": "path",
|
||||
"required": true
|
||||
},
|
||||
{
|
||||
"type": "string",
|
||||
"description": "Resource ID from the Subscribe Location header",
|
||||
"name": "resource",
|
||||
"in": "path",
|
||||
"required": true
|
||||
}
|
||||
],
|
||||
"responses": {
|
||||
"204": {
|
||||
"description": "no content"
|
||||
},
|
||||
"400": {
|
||||
"description": "missing resource id or unreadable body",
|
||||
"schema": {
|
||||
"type": "string"
|
||||
}
|
||||
},
|
||||
"404": {
|
||||
"description": "peer not found",
|
||||
"schema": {
|
||||
"type": "string"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
"/api/v3/widget/process/{id}": {
|
||||
"get": {
|
||||
"description": "Fetch minimal statistics about a process, which is not protected by any auth.",
|
||||
|
|
@ -2082,6 +2241,10 @@ const docTemplate = `{
|
|||
"created_at": {
|
||||
"type": "string"
|
||||
},
|
||||
"fork": {
|
||||
"description": "Fork is the human-readable fork name (e.g. \"Datarhei — Dragon Fork\").",
|
||||
"type": "string"
|
||||
},
|
||||
"id": {
|
||||
"type": "string"
|
||||
},
|
||||
|
|
@ -2091,6 +2254,10 @@ const docTemplate = `{
|
|||
"uptime_seconds": {
|
||||
"type": "integer"
|
||||
},
|
||||
"variant": {
|
||||
"description": "Variant identifies the build flavor — empty (or \"core\") for an\nupstream Datarhei build, \"dragonfork\" for the Dragon Fork.",
|
||||
"type": "string"
|
||||
},
|
||||
"version": {
|
||||
"$ref": "#/definitions/api.Version"
|
||||
}
|
||||
|
|
@ -2629,6 +2796,9 @@ const docTemplate = `{
|
|||
"version": {
|
||||
"type": "integer",
|
||||
"format": "int64"
|
||||
},
|
||||
"webrtc": {
|
||||
"$ref": "#/definitions/config.DataWebRTC"
|
||||
}
|
||||
}
|
||||
},
|
||||
|
|
@ -3109,6 +3279,9 @@ const docTemplate = `{
|
|||
"ffmpeg",
|
||||
""
|
||||
]
|
||||
},
|
||||
"webrtc": {
|
||||
"$ref": "#/definitions/api.ProcessConfigWebRTC"
|
||||
}
|
||||
}
|
||||
},
|
||||
|
|
@ -3176,6 +3349,29 @@ const docTemplate = `{
|
|||
}
|
||||
}
|
||||
},
|
||||
"api.ProcessConfigWebRTC": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"audio_map": {
|
||||
"type": "string"
|
||||
},
|
||||
"audio_pt": {
|
||||
"type": "integer"
|
||||
},
|
||||
"enabled": {
|
||||
"type": "boolean"
|
||||
},
|
||||
"force_transcode": {
|
||||
"type": "boolean"
|
||||
},
|
||||
"video_map": {
|
||||
"type": "string"
|
||||
},
|
||||
"video_pt": {
|
||||
"type": "integer"
|
||||
}
|
||||
}
|
||||
},
|
||||
"api.ProcessReport": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
|
|
@ -4441,6 +4637,9 @@ const docTemplate = `{
|
|||
"version": {
|
||||
"type": "integer",
|
||||
"format": "int64"
|
||||
},
|
||||
"webrtc": {
|
||||
"$ref": "#/definitions/config.DataWebRTC"
|
||||
}
|
||||
}
|
||||
},
|
||||
|
|
@ -4709,6 +4908,27 @@ const docTemplate = `{
|
|||
}
|
||||
}
|
||||
},
|
||||
"config.DataWebRTC": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"enable": {
|
||||
"type": "boolean"
|
||||
},
|
||||
"nat_1_to_1_ips": {
|
||||
"type": "array",
|
||||
"items": {
|
||||
"type": "string"
|
||||
}
|
||||
},
|
||||
"public_ip": {
|
||||
"type": "string"
|
||||
},
|
||||
"udp_mux_port": {
|
||||
"type": "integer",
|
||||
"format": "int"
|
||||
}
|
||||
}
|
||||
},
|
||||
"github_com_datarhei_core_v16_http_api.Config": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
|
|
@ -4831,6 +5051,8 @@ var SwaggerInfo = &swag.Spec{
|
|||
Description: "Expose REST API for the datarhei Core",
|
||||
InfoInstanceName: "swagger",
|
||||
SwaggerTemplate: docTemplate,
|
||||
LeftDelim: "{{",
|
||||
RightDelim: "}}",
|
||||
}
|
||||
|
||||
func init() {
|
||||
|
|
|
|||
|
|
@ -1896,6 +1896,165 @@
|
|||
}
|
||||
}
|
||||
},
|
||||
"/api/v3/whep/{id}": {
|
||||
"post": {
|
||||
"security": [
|
||||
{
|
||||
"ApiKeyAuth": []
|
||||
}
|
||||
],
|
||||
"description": "Subscribe to a process's WebRTC egress stream. Body is the SDP offer (Content-Type: application/sdp). Response is the SDP answer; the Location header points at the DELETE/PATCH resource for teardown and trickle ICE.",
|
||||
"consumes": [
|
||||
"application/sdp"
|
||||
],
|
||||
"produces": [
|
||||
"application/sdp"
|
||||
],
|
||||
"tags": [
|
||||
"v16.16.0"
|
||||
],
|
||||
"summary": "Subscribe to a WebRTC stream via WHEP",
|
||||
"operationId": "webrtc-3-whep-subscribe",
|
||||
"parameters": [
|
||||
{
|
||||
"type": "string",
|
||||
"description": "Process ID with config.webrtc.enabled=true",
|
||||
"name": "id",
|
||||
"in": "path",
|
||||
"required": true
|
||||
}
|
||||
],
|
||||
"responses": {
|
||||
"201": {
|
||||
"description": "SDP answer",
|
||||
"schema": {
|
||||
"type": "string"
|
||||
}
|
||||
},
|
||||
"400": {
|
||||
"description": "missing stream id, malformed body, or invalid SDP",
|
||||
"schema": {
|
||||
"type": "string"
|
||||
}
|
||||
},
|
||||
"404": {
|
||||
"description": "no stream registered for this process id",
|
||||
"schema": {
|
||||
"type": "string"
|
||||
}
|
||||
},
|
||||
"406": {
|
||||
"description": "offer SDP missing required H264 / Opus rtpmap",
|
||||
"schema": {
|
||||
"type": "string"
|
||||
}
|
||||
},
|
||||
"503": {
|
||||
"description": "peer cap reached (per-stream or total)",
|
||||
"schema": {
|
||||
"type": "string"
|
||||
}
|
||||
},
|
||||
"504": {
|
||||
"description": "ICE gathering timeout",
|
||||
"schema": {
|
||||
"type": "string"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
"/api/v3/whep/{id}/{resource}": {
|
||||
"delete": {
|
||||
"security": [
|
||||
{
|
||||
"ApiKeyAuth": []
|
||||
}
|
||||
],
|
||||
"description": "Idempotent peer teardown by resource id (returned in the Location header by Subscribe). Returns 204 even when the resource is unknown, per the WHEP spec.",
|
||||
"tags": [
|
||||
"v16.16.0"
|
||||
],
|
||||
"summary": "Tear down a WHEP subscription",
|
||||
"operationId": "webrtc-3-whep-unsubscribe",
|
||||
"parameters": [
|
||||
{
|
||||
"type": "string",
|
||||
"description": "Process ID",
|
||||
"name": "id",
|
||||
"in": "path",
|
||||
"required": true
|
||||
},
|
||||
{
|
||||
"type": "string",
|
||||
"description": "Resource ID from the Subscribe Location header",
|
||||
"name": "resource",
|
||||
"in": "path",
|
||||
"required": true
|
||||
}
|
||||
],
|
||||
"responses": {
|
||||
"204": {
|
||||
"description": "no content"
|
||||
},
|
||||
"400": {
|
||||
"description": "missing resource id",
|
||||
"schema": {
|
||||
"type": "string"
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
"patch": {
|
||||
"security": [
|
||||
{
|
||||
"ApiKeyAuth": []
|
||||
}
|
||||
],
|
||||
"description": "Add ICE candidates to an existing WebRTC peer. Body is application/trickle-ice-sdpfrag.",
|
||||
"consumes": [
|
||||
"application/trickle-ice-sdpfrag"
|
||||
],
|
||||
"tags": [
|
||||
"v16.16.0"
|
||||
],
|
||||
"summary": "Trickle ICE candidates for a WHEP subscription",
|
||||
"operationId": "webrtc-3-whep-trickle",
|
||||
"parameters": [
|
||||
{
|
||||
"type": "string",
|
||||
"description": "Process ID",
|
||||
"name": "id",
|
||||
"in": "path",
|
||||
"required": true
|
||||
},
|
||||
{
|
||||
"type": "string",
|
||||
"description": "Resource ID from the Subscribe Location header",
|
||||
"name": "resource",
|
||||
"in": "path",
|
||||
"required": true
|
||||
}
|
||||
],
|
||||
"responses": {
|
||||
"204": {
|
||||
"description": "no content"
|
||||
},
|
||||
"400": {
|
||||
"description": "missing resource id or unreadable body",
|
||||
"schema": {
|
||||
"type": "string"
|
||||
}
|
||||
},
|
||||
"404": {
|
||||
"description": "peer not found",
|
||||
"schema": {
|
||||
"type": "string"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
"/api/v3/widget/process/{id}": {
|
||||
"get": {
|
||||
"description": "Fetch minimal statistics about a process, which is not protected by any auth.",
|
||||
|
|
@ -2075,6 +2234,10 @@
|
|||
"created_at": {
|
||||
"type": "string"
|
||||
},
|
||||
"fork": {
|
||||
"description": "Fork is the human-readable fork name (e.g. \"Datarhei — Dragon Fork\").",
|
||||
"type": "string"
|
||||
},
|
||||
"id": {
|
||||
"type": "string"
|
||||
},
|
||||
|
|
@ -2084,6 +2247,10 @@
|
|||
"uptime_seconds": {
|
||||
"type": "integer"
|
||||
},
|
||||
"variant": {
|
||||
"description": "Variant identifies the build flavor — empty (or \"core\") for an\nupstream Datarhei build, \"dragonfork\" for the Dragon Fork.",
|
||||
"type": "string"
|
||||
},
|
||||
"version": {
|
||||
"$ref": "#/definitions/api.Version"
|
||||
}
|
||||
|
|
@ -2622,6 +2789,9 @@
|
|||
"version": {
|
||||
"type": "integer",
|
||||
"format": "int64"
|
||||
},
|
||||
"webrtc": {
|
||||
"$ref": "#/definitions/config.DataWebRTC"
|
||||
}
|
||||
}
|
||||
},
|
||||
|
|
@ -3102,6 +3272,9 @@
|
|||
"ffmpeg",
|
||||
""
|
||||
]
|
||||
},
|
||||
"webrtc": {
|
||||
"$ref": "#/definitions/api.ProcessConfigWebRTC"
|
||||
}
|
||||
}
|
||||
},
|
||||
|
|
@ -3169,6 +3342,29 @@
|
|||
}
|
||||
}
|
||||
},
|
||||
"api.ProcessConfigWebRTC": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"audio_map": {
|
||||
"type": "string"
|
||||
},
|
||||
"audio_pt": {
|
||||
"type": "integer"
|
||||
},
|
||||
"enabled": {
|
||||
"type": "boolean"
|
||||
},
|
||||
"force_transcode": {
|
||||
"type": "boolean"
|
||||
},
|
||||
"video_map": {
|
||||
"type": "string"
|
||||
},
|
||||
"video_pt": {
|
||||
"type": "integer"
|
||||
}
|
||||
}
|
||||
},
|
||||
"api.ProcessReport": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
|
|
@ -4434,6 +4630,9 @@
|
|||
"version": {
|
||||
"type": "integer",
|
||||
"format": "int64"
|
||||
},
|
||||
"webrtc": {
|
||||
"$ref": "#/definitions/config.DataWebRTC"
|
||||
}
|
||||
}
|
||||
},
|
||||
|
|
@ -4702,6 +4901,27 @@
|
|||
}
|
||||
}
|
||||
},
|
||||
"config.DataWebRTC": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"enable": {
|
||||
"type": "boolean"
|
||||
},
|
||||
"nat_1_to_1_ips": {
|
||||
"type": "array",
|
||||
"items": {
|
||||
"type": "string"
|
||||
}
|
||||
},
|
||||
"public_ip": {
|
||||
"type": "string"
|
||||
},
|
||||
"udp_mux_port": {
|
||||
"type": "integer",
|
||||
"format": "int"
|
||||
}
|
||||
}
|
||||
},
|
||||
"github_com_datarhei_core_v16_http_api.Config": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
|
|
|
|||
|
|
@ -56,12 +56,21 @@ definitions:
|
|||
type: array
|
||||
created_at:
|
||||
type: string
|
||||
fork:
|
||||
description: Fork is the human-readable fork name (e.g. "Datarhei — Dragon
|
||||
Fork").
|
||||
type: string
|
||||
id:
|
||||
type: string
|
||||
name:
|
||||
type: string
|
||||
uptime_seconds:
|
||||
type: integer
|
||||
variant:
|
||||
description: |-
|
||||
Variant identifies the build flavor — empty (or "core") for an
|
||||
upstream Datarhei build, "dragonfork" for the Dragon Fork.
|
||||
type: string
|
||||
version:
|
||||
$ref: '#/definitions/api.Version'
|
||||
type: object
|
||||
|
|
@ -420,6 +429,8 @@ definitions:
|
|||
version:
|
||||
format: int64
|
||||
type: integer
|
||||
webrtc:
|
||||
$ref: '#/definitions/config.DataWebRTC'
|
||||
type: object
|
||||
api.ConfigError:
|
||||
additionalProperties:
|
||||
|
|
@ -743,6 +754,8 @@ definitions:
|
|||
- ffmpeg
|
||||
- ""
|
||||
type: string
|
||||
webrtc:
|
||||
$ref: '#/definitions/api.ProcessConfigWebRTC'
|
||||
required:
|
||||
- input
|
||||
- output
|
||||
|
|
@ -790,6 +803,21 @@ definitions:
|
|||
format: uint64
|
||||
type: integer
|
||||
type: object
|
||||
api.ProcessConfigWebRTC:
|
||||
properties:
|
||||
audio_map:
|
||||
type: string
|
||||
audio_pt:
|
||||
type: integer
|
||||
enabled:
|
||||
type: boolean
|
||||
force_transcode:
|
||||
type: boolean
|
||||
video_map:
|
||||
type: string
|
||||
video_pt:
|
||||
type: integer
|
||||
type: object
|
||||
api.ProcessReport:
|
||||
properties:
|
||||
created_at:
|
||||
|
|
@ -1709,6 +1737,8 @@ definitions:
|
|||
version:
|
||||
format: int64
|
||||
type: integer
|
||||
webrtc:
|
||||
$ref: '#/definitions/config.DataWebRTC'
|
||||
type: object
|
||||
api.Skills:
|
||||
properties:
|
||||
|
|
@ -1882,6 +1912,20 @@ definitions:
|
|||
uptime:
|
||||
type: integer
|
||||
type: object
|
||||
config.DataWebRTC:
|
||||
properties:
|
||||
enable:
|
||||
type: boolean
|
||||
nat_1_to_1_ips:
|
||||
items:
|
||||
type: string
|
||||
type: array
|
||||
public_ip:
|
||||
type: string
|
||||
udp_mux_port:
|
||||
format: int
|
||||
type: integer
|
||||
type: object
|
||||
github_com_datarhei_core_v16_http_api.Config:
|
||||
properties:
|
||||
config:
|
||||
|
|
@ -3186,6 +3230,113 @@ paths:
|
|||
summary: List all publishing SRT treams
|
||||
tags:
|
||||
- v16.9.0
|
||||
/api/v3/whep/{id}:
|
||||
post:
|
||||
consumes:
|
||||
- application/sdp
|
||||
description: 'Subscribe to a process''s WebRTC egress stream. Body is the SDP
|
||||
offer (Content-Type: application/sdp). Response is the SDP answer; the Location
|
||||
header points at the DELETE/PATCH resource for teardown and trickle ICE.'
|
||||
operationId: webrtc-3-whep-subscribe
|
||||
parameters:
|
||||
- description: Process ID with config.webrtc.enabled=true
|
||||
in: path
|
||||
name: id
|
||||
required: true
|
||||
type: string
|
||||
produces:
|
||||
- application/sdp
|
||||
responses:
|
||||
"201":
|
||||
description: SDP answer
|
||||
schema:
|
||||
type: string
|
||||
"400":
|
||||
description: missing stream id, malformed body, or invalid SDP
|
||||
schema:
|
||||
type: string
|
||||
"404":
|
||||
description: no stream registered for this process id
|
||||
schema:
|
||||
type: string
|
||||
"406":
|
||||
description: offer SDP missing required H264 / Opus rtpmap
|
||||
schema:
|
||||
type: string
|
||||
"503":
|
||||
description: peer cap reached (per-stream or total)
|
||||
schema:
|
||||
type: string
|
||||
"504":
|
||||
description: ICE gathering timeout
|
||||
schema:
|
||||
type: string
|
||||
security:
|
||||
- ApiKeyAuth: []
|
||||
summary: Subscribe to a WebRTC stream via WHEP
|
||||
tags:
|
||||
- v16.16.0
|
||||
/api/v3/whep/{id}/{resource}:
|
||||
delete:
|
||||
description: Idempotent peer teardown by resource id (returned in the Location
|
||||
header by Subscribe). Returns 204 even when the resource is unknown, per the
|
||||
WHEP spec.
|
||||
operationId: webrtc-3-whep-unsubscribe
|
||||
parameters:
|
||||
- description: Process ID
|
||||
in: path
|
||||
name: id
|
||||
required: true
|
||||
type: string
|
||||
- description: Resource ID from the Subscribe Location header
|
||||
in: path
|
||||
name: resource
|
||||
required: true
|
||||
type: string
|
||||
responses:
|
||||
"204":
|
||||
description: no content
|
||||
"400":
|
||||
description: missing resource id
|
||||
schema:
|
||||
type: string
|
||||
security:
|
||||
- ApiKeyAuth: []
|
||||
summary: Tear down a WHEP subscription
|
||||
tags:
|
||||
- v16.16.0
|
||||
patch:
|
||||
consumes:
|
||||
- application/trickle-ice-sdpfrag
|
||||
description: Add ICE candidates to an existing WebRTC peer. Body is application/trickle-ice-sdpfrag.
|
||||
operationId: webrtc-3-whep-trickle
|
||||
parameters:
|
||||
- description: Process ID
|
||||
in: path
|
||||
name: id
|
||||
required: true
|
||||
type: string
|
||||
- description: Resource ID from the Subscribe Location header
|
||||
in: path
|
||||
name: resource
|
||||
required: true
|
||||
type: string
|
||||
responses:
|
||||
"204":
|
||||
description: no content
|
||||
"400":
|
||||
description: missing resource id or unreadable body
|
||||
schema:
|
||||
type: string
|
||||
"404":
|
||||
description: peer not found
|
||||
schema:
|
||||
type: string
|
||||
security:
|
||||
- ApiKeyAuth: []
|
||||
summary: Trickle ICE candidates for a WHEP subscription
|
||||
tags:
|
||||
- v16.16.0
|
||||
/api/v3/widget/process/{id}:
|
||||
get:
|
||||
description: Fetch minimal statistics about a process, which is not protected
|
||||
|
|
|
|||
|
|
@ -3,6 +3,11 @@ package api
|
|||
// About is some general information about the API
|
||||
type About struct {
|
||||
App string `json:"app"`
|
||||
// Variant identifies the build flavor — empty (or "core") for an
|
||||
// upstream Datarhei build, "dragonfork" for the Dragon Fork.
|
||||
Variant string `json:"variant,omitempty"`
|
||||
// Fork is the human-readable fork name (e.g. "Datarhei — Dragon Fork").
|
||||
Fork string `json:"fork,omitempty"`
|
||||
Auths []string `json:"auths"`
|
||||
Name string `json:"name"`
|
||||
ID string `json:"id"`
|
||||
|
|
|
|||
|
|
@ -44,10 +44,12 @@ type ProcessConfigLimits struct {
|
|||
|
||||
// ProcessConfig represents the configuration of an ffmpeg process
|
||||
type ProcessConfigWebRTC struct {
|
||||
Enabled bool `json:"enabled"`
|
||||
VideoPT uint8 `json:"video_pt,omitempty"`
|
||||
AudioPT uint8 `json:"audio_pt,omitempty"`
|
||||
ForceTranscode bool `json:"force_transcode,omitempty"`
|
||||
Enabled bool `json:"enabled"`
|
||||
VideoPT uint8 `json:"video_pt,omitempty"`
|
||||
AudioPT uint8 `json:"audio_pt,omitempty"`
|
||||
ForceTranscode bool `json:"force_transcode,omitempty"`
|
||||
VideoMap string `json:"video_map,omitempty"`
|
||||
AudioMap string `json:"audio_map,omitempty"`
|
||||
}
|
||||
|
||||
type ProcessConfig struct {
|
||||
|
|
@ -83,6 +85,8 @@ func (cfg *ProcessConfig) Marshal() *app.Config {
|
|||
VideoPT: cfg.WebRTC.VideoPT,
|
||||
AudioPT: cfg.WebRTC.AudioPT,
|
||||
ForceTranscode: cfg.WebRTC.ForceTranscode,
|
||||
VideoMap: cfg.WebRTC.VideoMap,
|
||||
AudioMap: cfg.WebRTC.AudioMap,
|
||||
},
|
||||
}
|
||||
|
||||
|
|
@ -168,6 +172,8 @@ func (cfg *ProcessConfig) Unmarshal(c *app.Config) {
|
|||
cfg.WebRTC.VideoPT = c.WebRTC.VideoPT
|
||||
cfg.WebRTC.AudioPT = c.WebRTC.AudioPT
|
||||
cfg.WebRTC.ForceTranscode = c.WebRTC.ForceTranscode
|
||||
cfg.WebRTC.VideoMap = c.WebRTC.VideoMap
|
||||
cfg.WebRTC.AudioMap = c.WebRTC.AudioMap
|
||||
|
||||
cfg.Options = make([]string, len(c.Options))
|
||||
copy(cfg.Options, c.Options)
|
||||
|
|
|
|||
|
|
@ -79,3 +79,31 @@ func TestProcessConfigWebRTCDefaults(t *testing.T) {
|
|||
t.Fatalf("default should be disabled, got %+v", cfg.WebRTC)
|
||||
}
|
||||
}
|
||||
|
||||
// TestProcessConfigWebRTCMapsRoundtrip extends the WebRTC DTO
|
||||
// roundtrip with the issue-#2 VideoMap/AudioMap fields so the
|
||||
// regression doesn't repeat: a multi-input pipeline that sets
|
||||
// `audio_map: "1:a:0"` must reach the restream config layer
|
||||
// unchanged.
|
||||
func TestProcessConfigWebRTCMapsRoundtrip(t *testing.T) {
|
||||
body := []byte(`{
|
||||
"id":"p","input":[{"id":"i","address":"x"}],"output":[{"id":"o","address":"-"}],
|
||||
"webrtc":{"enabled":true,"video_map":"0:v:1","audio_map":"1:a:0"}
|
||||
}`)
|
||||
var dto ProcessConfig
|
||||
if err := json.Unmarshal(body, &dto); err != nil {
|
||||
t.Fatalf("unmarshal: %v", err)
|
||||
}
|
||||
if dto.WebRTC.VideoMap != "0:v:1" || dto.WebRTC.AudioMap != "1:a:0" {
|
||||
t.Fatalf("DTO maps lost: %+v", dto.WebRTC)
|
||||
}
|
||||
cfg := dto.Marshal()
|
||||
if cfg.WebRTC.VideoMap != "0:v:1" || cfg.WebRTC.AudioMap != "1:a:0" {
|
||||
t.Fatalf("app.Config maps lost: %+v", cfg.WebRTC)
|
||||
}
|
||||
var back ProcessConfig
|
||||
back.Unmarshal(cfg)
|
||||
if back.WebRTC.VideoMap != "0:v:1" || back.WebRTC.AudioMap != "1:a:0" {
|
||||
t.Fatalf("Unmarshal lost maps: %+v", back.WebRTC)
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -39,6 +39,8 @@ func (p *AboutHandler) About(c echo.Context) error {
|
|||
|
||||
about := api.About{
|
||||
App: app.Name,
|
||||
Variant: app.Variant,
|
||||
Fork: app.Fork,
|
||||
Name: p.restream.Name(),
|
||||
Auths: p.auths,
|
||||
ID: p.restream.ID(),
|
||||
|
|
|
|||
|
|
@ -25,10 +25,18 @@ type ConfigIO struct {
|
|||
// RTP to a loopback UDP port the subsystem allocates. The subsystem reads
|
||||
// that RTP and fans it out to WHEP subscribers.
|
||||
type ConfigWebRTC struct {
|
||||
Enabled bool `json:"enabled"`
|
||||
VideoPT uint8 `json:"video_pt"`
|
||||
AudioPT uint8 `json:"audio_pt"`
|
||||
ForceTranscode bool `json:"force_transcode"`
|
||||
Enabled bool `json:"enabled"`
|
||||
VideoPT uint8 `json:"video_pt"`
|
||||
AudioPT uint8 `json:"audio_pt"`
|
||||
ForceTranscode bool `json:"force_transcode"`
|
||||
|
||||
// VideoMap / AudioMap select which input stream the WebRTC RTP
|
||||
// legs draw from. Defaults are "0:v:0" and "0:a:0" — correct for
|
||||
// any RTMP / SRT publisher (single input, both A and V on input
|
||||
// 0). For multi-input pipelines (lavfi test sources, SDI capture
|
||||
// fed alongside file audio, etc.) the operator can override.
|
||||
VideoMap string `json:"video_map,omitempty"`
|
||||
AudioMap string `json:"audio_map,omitempty"`
|
||||
}
|
||||
|
||||
// Clone returns a deep copy of the WebRTC config (currently a value copy;
|
||||
|
|
|
|||
86
test/TESTING.md
Normal file
86
test/TESTING.md
Normal file
|
|
@ -0,0 +1,86 @@
|
|||
# Testing the WebRTC egress path
|
||||
|
||||
## In-process (CI)
|
||||
|
||||
```sh
|
||||
go test -race -count=1 ./app/webrtc/... ./core/webrtc/...
|
||||
```
|
||||
|
||||
The integration tests under `app/webrtc/` allocate UDP ports on
|
||||
loopback, spin up an Echo handler, attach a Pion subscriber, and
|
||||
spray synthetic RTP into the registered Source. `TestIntegration_FiveViewerFanout`
|
||||
covers the 5-concurrent-viewer acceptance path from the M3 design.
|
||||
|
||||
## Manual / browser
|
||||
|
||||
`whep-player.html` is a self-contained WHEP subscriber a human can
|
||||
point at any live deploy. Open it directly in a browser:
|
||||
|
||||
```
|
||||
file:///path/to/datarhei-dragonfork-core/test/whep-player.html
|
||||
```
|
||||
|
||||
…or copy it onto a static host (no server-side dependency). It accepts
|
||||
the WHEP URL and an optional bearer token (the deploy uses Core's
|
||||
JWT, so paste an `access_token` from `POST /api/login`). It POSTs an
|
||||
SDP offer with a recvonly video + audio transceiver, applies the
|
||||
answer, and renders the stream in `<video>`. Stats panel shows ICE +
|
||||
PeerConnection states, the codec pulled from the answer SDP, and a
|
||||
1-Hz inbound-bitrate sample. Disconnect issues a WHEP `DELETE` on
|
||||
the resource URL the server returned in `Location`.
|
||||
|
||||
Shareable URL:
|
||||
|
||||
```
|
||||
file:///.../whep-player.html?url=http://10.0.0.25:8090/api/v3/whep/myStream&token=eyJhbGciOi...
|
||||
```
|
||||
|
||||
## Pion CLI helper
|
||||
|
||||
`test/whep-client/` is the same handshake in Go, useful for scripting
|
||||
or running on the same machine as Core for an apples-to-apples loopback
|
||||
test:
|
||||
|
||||
```sh
|
||||
cd test/whep-client
|
||||
go build -o /tmp/whep-client .
|
||||
/tmp/whep-client -url http://10.0.0.25:8090/api/v3/whep/myStream -token "$JWT" -timeout 15s
|
||||
```
|
||||
|
||||
Exits 0 once both video and audio tracks have received their first
|
||||
RTP packet. Used in the M2 deploy verification on TrueNAS.
|
||||
|
||||
## Latency p95 gate
|
||||
|
||||
Wired into CI via the `latency-gate` job in `.forgejo/workflows/test.yml`.
|
||||
Run locally:
|
||||
|
||||
```sh
|
||||
go test -tags latency -timeout 90s -race -count=1 \
|
||||
-run TestLatencyServerHop ./app/webrtc/...
|
||||
```
|
||||
|
||||
### What it measures
|
||||
|
||||
Server-hop latency from `corewebrtc.Source` ingest through Pion's
|
||||
DTLS-SRTP egress to a subscriber's `track.ReadRTP()`. The publisher
|
||||
embeds a wall-clock UnixNano timestamp in each RTP payload; the
|
||||
subscriber reads it on arrival and diffs.
|
||||
|
||||
### What it does NOT measure
|
||||
|
||||
True glass-to-glass latency would include FFmpeg encode and a real
|
||||
H.264 decoder on the subscriber side. The design (`webrtc-design.md`
|
||||
§7) calls for `drawtext`-burned frame counters + decode-side pixel
|
||||
sampling; implementing that in pure Go would require a cgo H.264
|
||||
decoder or an FFmpeg-as-sidecar pipe, neither of which pays off for
|
||||
the dominant CI question (*"did anybody regress the server hop?"*).
|
||||
Encode/decode latency is fixed by the codec stack — Core code changes
|
||||
won't move it.
|
||||
|
||||
### Threshold
|
||||
|
||||
`p95 < 50 ms` on the CI runner. Locally observed on a quiet host:
|
||||
`p50 ≈ 110 µs`, `p95 ≈ 240 µs`, `p99 ≈ 320 µs`. The 50ms gate is two
|
||||
orders of magnitude above that — generous, but a regression that
|
||||
crosses it indicates a genuine slowdown rather than runner noise.
|
||||
354
test/whep-player.html
Normal file
354
test/whep-player.html
Normal file
|
|
@ -0,0 +1,354 @@
|
|||
<!doctype html>
|
||||
<html lang="en">
|
||||
<head>
|
||||
<meta charset="utf-8">
|
||||
<title>Dragon Fork — WHEP Player</title>
|
||||
<meta name="viewport" content="width=device-width, initial-scale=1">
|
||||
<style>
|
||||
:root {
|
||||
color-scheme: light dark;
|
||||
--fg: #e7e7ea;
|
||||
--bg: #0d0e12;
|
||||
--accent: #ff6633;
|
||||
--muted: #8b8e98;
|
||||
--good: #5dd29c;
|
||||
--warn: #ffb45e;
|
||||
--bad: #ff6470;
|
||||
--panel: #1a1c22;
|
||||
}
|
||||
* { box-sizing: border-box; }
|
||||
body {
|
||||
margin: 0;
|
||||
font: 14px/1.5 -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, sans-serif;
|
||||
background: var(--bg);
|
||||
color: var(--fg);
|
||||
min-height: 100vh;
|
||||
display: flex;
|
||||
flex-direction: column;
|
||||
}
|
||||
header {
|
||||
padding: 1.25rem 1.5rem;
|
||||
border-bottom: 1px solid #232530;
|
||||
display: flex;
|
||||
align-items: baseline;
|
||||
gap: 0.75rem;
|
||||
}
|
||||
header h1 {
|
||||
margin: 0;
|
||||
font-size: 1.05rem;
|
||||
letter-spacing: 0.02em;
|
||||
}
|
||||
header h1 .accent { color: var(--accent); }
|
||||
header .subtitle { color: var(--muted); font-size: 0.85rem; }
|
||||
|
||||
main {
|
||||
display: grid;
|
||||
grid-template-columns: 1fr;
|
||||
gap: 1rem;
|
||||
padding: 1.5rem;
|
||||
max-width: 1200px;
|
||||
width: 100%;
|
||||
margin: 0 auto;
|
||||
flex: 1;
|
||||
}
|
||||
@media (min-width: 900px) {
|
||||
main {
|
||||
grid-template-columns: 360px 1fr;
|
||||
align-items: start;
|
||||
}
|
||||
}
|
||||
|
||||
.panel {
|
||||
background: var(--panel);
|
||||
border-radius: 10px;
|
||||
padding: 1.25rem;
|
||||
}
|
||||
|
||||
label {
|
||||
display: block;
|
||||
margin-top: 0.75rem;
|
||||
color: var(--muted);
|
||||
font-size: 0.78rem;
|
||||
text-transform: uppercase;
|
||||
letter-spacing: 0.06em;
|
||||
}
|
||||
input[type=text] {
|
||||
width: 100%;
|
||||
padding: 0.55rem 0.7rem;
|
||||
margin-top: 0.25rem;
|
||||
background: #0d0e12;
|
||||
border: 1px solid #2a2c36;
|
||||
border-radius: 6px;
|
||||
color: var(--fg);
|
||||
font: inherit;
|
||||
}
|
||||
input[type=text]:focus { border-color: var(--accent); outline: none; }
|
||||
|
||||
.actions {
|
||||
display: flex;
|
||||
gap: 0.5rem;
|
||||
margin-top: 1.25rem;
|
||||
}
|
||||
button {
|
||||
flex: 1;
|
||||
padding: 0.7rem 1rem;
|
||||
border: none;
|
||||
border-radius: 6px;
|
||||
background: var(--accent);
|
||||
color: #000;
|
||||
font-weight: 600;
|
||||
cursor: pointer;
|
||||
}
|
||||
button:disabled { opacity: 0.4; cursor: not-allowed; }
|
||||
button.secondary { background: #2a2c36; color: var(--fg); }
|
||||
|
||||
video {
|
||||
width: 100%;
|
||||
background: #000;
|
||||
border-radius: 10px;
|
||||
aspect-ratio: 16 / 9;
|
||||
}
|
||||
|
||||
.stats {
|
||||
display: grid;
|
||||
grid-template-columns: max-content 1fr;
|
||||
gap: 0.4rem 1rem;
|
||||
margin-top: 1rem;
|
||||
font-size: 0.85rem;
|
||||
}
|
||||
.stats .label { color: var(--muted); }
|
||||
.stats .value { font-variant-numeric: tabular-nums; }
|
||||
.pill {
|
||||
display: inline-block;
|
||||
padding: 0.1rem 0.55rem;
|
||||
border-radius: 999px;
|
||||
font-size: 0.75rem;
|
||||
background: #2a2c36;
|
||||
}
|
||||
.pill.good { background: rgba(93,210,156,0.18); color: var(--good); }
|
||||
.pill.warn { background: rgba(255,180,94,0.18); color: var(--warn); }
|
||||
.pill.bad { background: rgba(255,100,112,0.20); color: var(--bad); }
|
||||
|
||||
.log {
|
||||
margin-top: 1rem;
|
||||
max-height: 220px;
|
||||
overflow-y: auto;
|
||||
background: #0d0e12;
|
||||
padding: 0.6rem 0.8rem;
|
||||
border-radius: 6px;
|
||||
font-family: ui-monospace, SFMono-Regular, Menlo, Consolas, monospace;
|
||||
font-size: 0.78rem;
|
||||
line-height: 1.4;
|
||||
white-space: pre-wrap;
|
||||
word-break: break-word;
|
||||
}
|
||||
.log .ts { color: var(--muted); }
|
||||
</style>
|
||||
</head>
|
||||
<body>
|
||||
<header>
|
||||
<h1>Dragon Fork <span class="accent">WHEP</span></h1>
|
||||
<span class="subtitle">manual smoke test for the WebRTC egress path</span>
|
||||
</header>
|
||||
|
||||
<main>
|
||||
<section class="panel">
|
||||
<label for="whep-url">WHEP endpoint</label>
|
||||
<input id="whep-url" type="text" placeholder="http://10.0.0.25:8090/api/v3/whep/myStream"
|
||||
value="">
|
||||
<label for="bearer">JWT bearer token</label>
|
||||
<input id="bearer" type="text" placeholder="eyJhbGciOi…">
|
||||
|
||||
<div class="actions">
|
||||
<button id="btn-play">Subscribe</button>
|
||||
<button id="btn-stop" class="secondary" disabled>Disconnect</button>
|
||||
</div>
|
||||
|
||||
<div class="stats">
|
||||
<span class="label">ICE</span> <span id="stat-ice" class="value pill">idle</span>
|
||||
<span class="label">Connection</span> <span id="stat-conn" class="value pill">idle</span>
|
||||
<span class="label">Resource</span> <span id="stat-res" class="value">—</span>
|
||||
<span class="label">Video codec</span> <span id="stat-vcodec" class="value">—</span>
|
||||
<span class="label">Audio codec</span> <span id="stat-acodec" class="value">—</span>
|
||||
<span class="label">Inbound bitrate</span><span id="stat-bitrate" class="value">—</span>
|
||||
</div>
|
||||
|
||||
<div id="log" class="log" aria-live="polite"></div>
|
||||
</section>
|
||||
|
||||
<section class="panel" style="padding:0;background:#000;">
|
||||
<video id="video" controls autoplay playsinline muted></video>
|
||||
</section>
|
||||
</main>
|
||||
|
||||
<script>
|
||||
// --- tiny state -------------------------------------------------
|
||||
const $ = (id) => document.getElementById(id);
|
||||
const log = (line, level='info') => {
|
||||
const ts = new Date().toLocaleTimeString();
|
||||
const div = document.createElement('div');
|
||||
div.innerHTML = `<span class="ts">${ts}</span> <span class="lvl-${level}">${line}</span>`;
|
||||
$('log').prepend(div);
|
||||
};
|
||||
const setPill = (el, text, klass) => { el.textContent = text; el.className = 'value pill ' + klass; };
|
||||
|
||||
let pc = null;
|
||||
let resourceURL = null; // absolute or path; whichever the server returned
|
||||
let bitrateTimer = null;
|
||||
|
||||
// --- subscribe / disconnect -------------------------------------
|
||||
$('btn-play').addEventListener('click', subscribe);
|
||||
$('btn-stop').addEventListener('click', disconnect);
|
||||
|
||||
// Pre-populate WHEP endpoint from query string for shareable URLs
|
||||
// (e.g. file:///.../whep-player.html?url=http://.../whep/foo&token=…).
|
||||
(function bootstrap() {
|
||||
const q = new URLSearchParams(location.search);
|
||||
if (q.get('url')) $('whep-url').value = q.get('url');
|
||||
if (q.get('token')) $('bearer').value = q.get('token');
|
||||
})();
|
||||
|
||||
async function subscribe() {
|
||||
if (pc) { log('already connected; disconnect first', 'warn'); return; }
|
||||
const url = $('whep-url').value.trim();
|
||||
const token = $('bearer').value.trim();
|
||||
if (!url) { log('WHEP URL is required', 'bad'); return; }
|
||||
|
||||
$('btn-play').disabled = true;
|
||||
$('btn-stop').disabled = false;
|
||||
setPill($('stat-ice'), 'gathering', 'warn');
|
||||
setPill($('stat-conn'), 'connecting', 'warn');
|
||||
|
||||
pc = new RTCPeerConnection({
|
||||
// No ICE servers: production deploy advertises NAT1To1 host
|
||||
// candidates, which work over the LAN. Add stun:/turn: here
|
||||
// if you're testing across NAT.
|
||||
iceServers: [],
|
||||
});
|
||||
|
||||
pc.ontrack = (evt) => {
|
||||
log(`ontrack: kind=${evt.track.kind}`, 'info');
|
||||
// Both tracks share the same MediaStream; attach once.
|
||||
if ($('video').srcObject !== evt.streams[0]) {
|
||||
$('video').srcObject = evt.streams[0];
|
||||
}
|
||||
};
|
||||
pc.oniceconnectionstatechange = () => {
|
||||
const s = pc.iceConnectionState;
|
||||
let klass = 'warn';
|
||||
if (s === 'connected' || s === 'completed') klass = 'good';
|
||||
else if (s === 'failed' || s === 'disconnected' || s === 'closed') klass = 'bad';
|
||||
setPill($('stat-ice'), s, klass);
|
||||
log(`ICE state: ${s}`);
|
||||
};
|
||||
pc.onconnectionstatechange = () => {
|
||||
const s = pc.connectionState;
|
||||
let klass = 'warn';
|
||||
if (s === 'connected') klass = 'good';
|
||||
else if (s === 'failed' || s === 'disconnected' || s === 'closed') klass = 'bad';
|
||||
setPill($('stat-conn'), s, klass);
|
||||
log(`PC state: ${s}`);
|
||||
};
|
||||
|
||||
pc.addTransceiver('video', { direction: 'recvonly' });
|
||||
pc.addTransceiver('audio', { direction: 'recvonly' });
|
||||
|
||||
try {
|
||||
const offer = await pc.createOffer();
|
||||
await pc.setLocalDescription(offer);
|
||||
// Wait for ICE gathering to complete so the offer is non-trickle.
|
||||
await new Promise((res) => {
|
||||
if (pc.iceGatheringState === 'complete') return res();
|
||||
pc.addEventListener('icegatheringstatechange', () => {
|
||||
if (pc.iceGatheringState === 'complete') res();
|
||||
});
|
||||
});
|
||||
|
||||
const headers = { 'Content-Type': 'application/sdp' };
|
||||
if (token) headers['Authorization'] = 'Bearer ' + token;
|
||||
const resp = await fetch(url, {
|
||||
method: 'POST',
|
||||
headers,
|
||||
body: pc.localDescription.sdp,
|
||||
});
|
||||
if (!resp.ok) {
|
||||
const body = await resp.text();
|
||||
throw new Error(`WHEP POST ${resp.status}: ${body || resp.statusText}`);
|
||||
}
|
||||
// Per WHEP spec: server returns SDP answer; Location is the resource.
|
||||
const loc = resp.headers.get('Location');
|
||||
if (loc) {
|
||||
// Resolve relative Location against the WHEP URL.
|
||||
try { resourceURL = new URL(loc, url).toString(); }
|
||||
catch { resourceURL = loc; }
|
||||
$('stat-res').textContent = resourceURL;
|
||||
}
|
||||
const answer = await resp.text();
|
||||
await pc.setRemoteDescription({ type: 'answer', sdp: answer });
|
||||
log(`subscribed (${resp.status})`, 'good');
|
||||
|
||||
// Pull codec info out of the SDP for a quick UI hint.
|
||||
const codec = (kind, sdp) => {
|
||||
const m = new RegExp(`m=${kind}[^\r\n]*[\r\n](?:[abc][^\r\n]*[\r\n]){0,30}?a=rtpmap:\\d+ ([^/\r\n]+)`).exec(sdp);
|
||||
return m ? m[1] : '?';
|
||||
};
|
||||
$('stat-vcodec').textContent = codec('video', answer);
|
||||
$('stat-acodec').textContent = codec('audio', answer);
|
||||
|
||||
bitrateTimer = setInterval(updateBitrate, 1000);
|
||||
} catch (err) {
|
||||
log(`error: ${err.message}`, 'bad');
|
||||
await disconnect();
|
||||
}
|
||||
}
|
||||
|
||||
async function disconnect() {
|
||||
if (bitrateTimer) { clearInterval(bitrateTimer); bitrateTimer = null; }
|
||||
$('btn-play').disabled = false;
|
||||
$('btn-stop').disabled = true;
|
||||
|
||||
// WHEP: best-effort DELETE on the resource URL the server gave us.
|
||||
if (resourceURL) {
|
||||
try {
|
||||
const headers = {};
|
||||
const token = $('bearer').value.trim();
|
||||
if (token) headers['Authorization'] = 'Bearer ' + token;
|
||||
const r = await fetch(resourceURL, { method: 'DELETE', headers });
|
||||
log(`DELETE ${r.status}`, r.ok ? 'good' : 'warn');
|
||||
} catch (e) {
|
||||
log(`DELETE failed: ${e.message}`, 'warn');
|
||||
}
|
||||
resourceURL = null;
|
||||
}
|
||||
|
||||
if (pc) { pc.close(); pc = null; }
|
||||
$('video').srcObject = null;
|
||||
setPill($('stat-ice'), 'idle', '');
|
||||
setPill($('stat-conn'), 'idle', '');
|
||||
$('stat-res').textContent = '—';
|
||||
$('stat-vcodec').textContent = '—';
|
||||
$('stat-acodec').textContent = '—';
|
||||
$('stat-bitrate').textContent = '—';
|
||||
}
|
||||
|
||||
// --- bitrate sampling -------------------------------------------
|
||||
let lastBytes = null;
|
||||
let lastTs = null;
|
||||
async function updateBitrate() {
|
||||
if (!pc || pc.connectionState !== 'connected') return;
|
||||
const stats = await pc.getStats();
|
||||
let bytes = 0;
|
||||
stats.forEach((r) => {
|
||||
if (r.type === 'inbound-rtp' && !r.isRemote) bytes += r.bytesReceived || 0;
|
||||
});
|
||||
const now = performance.now();
|
||||
if (lastBytes !== null) {
|
||||
const kbps = ((bytes - lastBytes) * 8) / ((now - lastTs) || 1);
|
||||
$('stat-bitrate').textContent = kbps.toFixed(0) + ' kbps';
|
||||
}
|
||||
lastBytes = bytes;
|
||||
lastTs = now;
|
||||
}
|
||||
</script>
|
||||
</body>
|
||||
</html>
|
||||
Loading…
Reference in a new issue