2026-04-07 21:58:30 -04:00
import express from 'express' ;
2026-05-17 18:39:09 -04:00
import { execSync , spawn } from 'child_process' ;
2026-05-28 19:12:40 -04:00
import { existsSync , readdirSync } from 'node:fs' ;
2026-04-07 21:58:30 -04:00
import captureManager from '../capture-manager.js' ;
2026-05-18 07:57:48 -04:00
import dgram from 'dgram' ;
import net from 'net' ;
function parseUrl ( u ) {
try {
const m = String ( u ) . match ( /^[a-z]+:\/\/([^:\/?#]+)(?::(\d+))?/i ) ;
if ( ! m ) return null ;
return { host : m [ 1 ] , port : parseInt ( m [ 2 ] || '0' , 10 ) } ;
} catch ( _ ) { return null ; }
}
async function checkReachable ( host , port , sourceType ) {
if ( ! port ) return { ok : true } ;
if ( sourceType === 'srt' ) return await udpSendProbe ( host , port ) ;
if ( sourceType === 'rtmp' ) return await tcpConnectProbe ( host , port ) ;
return { ok : true } ;
}
function udpSendProbe ( host , port ) {
return new Promise ( ( resolve ) => {
const sock = dgram . createSocket ( 'udp4' ) ;
let done = false ;
const finish = ( result ) => { if ( done ) return ; done = true ; try { sock . close ( ) ; } catch ( _ ) { } resolve ( result ) ; } ;
sock . on ( 'error' , ( err ) => {
const msg = String ( err && err . message || err ) ;
if ( /EHOSTUNREACH|ENETUNREACH/i . test ( msg ) ) {
finish ( { ok : false , error : 'Host ' + host + ' is unreachable from the capture container (no route). Confirm the IP is correct and the machine is online.' , diagnostic : msg } ) ;
} else if ( /ECONNREFUSED|EPORTUNREACH/i . test ( msg ) ) {
finish ( { ok : false , error : 'Nothing is listening on UDP ' + host + ':' + port + '. In vMix, confirm the SRT output is Started and the port matches.' , diagnostic : msg } ) ;
} else {
finish ( { ok : false , error : 'UDP probe failed: ' + msg , diagnostic : msg } ) ;
}
} ) ;
sock . send ( Buffer . from ( 'Z-AMPP-PROBE' ) , port , host , ( ) => { } ) ;
setTimeout ( ( ) => finish ( { ok : true } ) , 1500 ) ;
} ) ;
}
function tcpConnectProbe ( host , port ) {
return new Promise ( ( resolve ) => {
const sock = new net . Socket ( ) ;
let done = false ;
const finish = ( r ) => { if ( done ) return ; done = true ; try { sock . destroy ( ) ; } catch ( _ ) { } resolve ( r ) ; } ;
sock . setTimeout ( 2500 ) ;
sock . once ( 'connect' , ( ) => finish ( { ok : true } ) ) ;
sock . once ( 'timeout' , ( ) => finish ( { ok : false , error : 'TCP connect to ' + host + ':' + port + ' timed out. Confirm the host is reachable and a TCP listener is running.' } ) ) ;
sock . once ( 'error' , ( err ) => {
const msg = String ( err && err . message || err ) ;
if ( /EHOSTUNREACH|ENETUNREACH/i . test ( msg ) ) finish ( { ok : false , error : 'Host ' + host + ' unreachable (no route).' , diagnostic : msg } ) ;
else if ( /ECONNREFUSED/i . test ( msg ) ) finish ( { ok : false , error : 'Nothing is listening on TCP ' + host + ':' + port + '.' , diagnostic : msg } ) ;
else finish ( { ok : false , error : 'TCP probe failed: ' + msg , diagnostic : msg } ) ;
} ) ;
sock . connect ( port , host ) ;
} ) ;
}
function classifyProbeError ( raw , sourceType ) {
const r = ( raw || '' ) . toLowerCase ( ) ;
if ( sourceType === 'srt' ) {
if ( /connection .* failed: (input\/output|timer expired|connection setup failure)/i . test ( raw ) ) {
return 'SRT handshake failed. In vMix: confirm the External Output is Started, Type=SRT, Mode=Listener, port matches, and any passphrase / stream ID is empty (or copied exactly).' ;
}
}
if ( sourceType === 'rtmp' ) {
if ( /connection refused/i . test ( r ) ) return 'Nothing is listening on RTMP at this address. Start your RTMP source.' ;
if ( /end-of-file|invalid data found/i . test ( r ) ) return 'Got a TCP connection but no RTMP stream. Confirm the source is publishing and the path / stream-key match.' ;
}
return raw ;
}
2026-04-07 21:58:30 -04:00
const router = express . Router ( ) ;
const MAM _API _URL = process . env . MAM _API _URL || 'http://mam-api:3000' ;
/ * *
* GET / devices
* List available DeckLink devices
* /
router . get ( '/devices' , ( req , res ) => {
try {
const devices = [ ] ;
let output = '' ;
try {
2026-05-21 17:17:31 -04:00
output = execSync ( 'ffmpeg -sources decklink 2>&1' , {
2026-04-07 21:58:30 -04:00
encoding : 'utf-8' ,
} ) ;
} catch ( error ) {
// ffmpeg returns non-zero, but stderr is still captured
output = error . stderr ? error . stderr . toString ( ) : error . toString ( ) ;
}
2026-05-28 18:25:56 -04:00
// Parse ffmpeg output for DeckLink device names.
2026-05-28 18:36:06 -04:00
// DeckLink source lines: " 81:76669a80:00000000 [DeckLink Duo (1)] (none)"
2026-04-07 21:58:30 -04:00
const lines = output . split ( '\n' ) ;
let deviceIndex = 0 ;
for ( const line of lines ) {
2026-05-28 18:36:06 -04:00
const match = line . match ( /^\s+[0-9a-f:]+\s+\[([^\]]+)\]/ ) ;
if ( match ) {
2026-04-07 21:58:30 -04:00
devices . push ( {
index : deviceIndex ,
name : match [ 1 ] ,
} ) ;
deviceIndex ++ ;
}
}
res . json ( { devices } ) ;
} catch ( error ) {
console . error ( 'Error listing devices:' , error ) ;
res . status ( 500 ) . json ( { error : 'Failed to list devices' } ) ;
}
} ) ;
2026-05-28 19:12:40 -04:00
/ * *
* GET / devices / deltacast
* List available Deltacast ports .
* Reads / dev / deltacast < N > nodes ; falls back to env DELTACAST _PORT _COUNT
* so nodes without hardware still report their configured port count
* ( test - card mode ) .
* /
router . get ( '/devices/deltacast' , ( req , res ) => {
try {
const devices = [ ] ;
// First: enumerate actual /dev/deltacast* device nodes.
try {
const devEntries = readdirSync ( '/dev' ) . filter ( n => / ^ deltacast \ d + $ / . test ( n ) ) ;
devEntries . sort ( ) ;
for ( const entry of devEntries ) {
const m = entry . match ( /^deltacast(\d+)$/ ) ;
if ( m ) {
devices . push ( {
index : parseInt ( m [ 1 ] , 10 ) ,
name : ` Deltacast Port ${ m [ 1 ] } ` ,
device : ` /dev/ ${ entry } ` ,
present : true ,
} ) ;
}
}
} catch ( _ ) { /* /dev always exists; ignore */ }
// Second: if DELTACAST_PORT_COUNT env is set and larger than what we found,
// fill in the remaining slots as test-card entries (no physical device).
const envCount = parseInt ( process . env . DELTACAST _PORT _COUNT || '0' , 10 ) ;
const found = new Set ( devices . map ( d => d . index ) ) ;
for ( let i = 0 ; i < envCount ; i ++ ) {
if ( ! found . has ( i ) ) {
devices . push ( {
index : i ,
name : ` Deltacast Port ${ i } (test card) ` ,
device : ` /dev/deltacast ${ i } ` ,
present : false ,
} ) ;
}
}
devices . sort ( ( a , b ) => a . index - b . index ) ;
res . json ( { devices } ) ;
} catch ( error ) {
console . error ( 'Error listing Deltacast devices:' , error ) ;
res . status ( 500 ) . json ( { error : 'Failed to list Deltacast devices' } ) ;
}
} ) ;
2026-04-07 21:58:30 -04:00
/ * *
* GET / status
* Get current capture status
* /
router . get ( '/status' , ( req , res ) => {
try {
const status = captureManager . getStatus ( ) ;
res . json ( status ) ;
} catch ( error ) {
console . error ( 'Error getting status:' , error ) ;
res . status ( 500 ) . json ( { error : 'Failed to get status' } ) ;
}
} ) ;
2026-05-17 18:39:09 -04:00
router . post ( '/probe' , async ( req , res ) => {
try {
2026-05-18 07:57:48 -04:00
const { source _type = 'sdi' , source _url , listen = false } = req . body || { } ;
2026-05-17 18:39:09 -04:00
if ( source _type === 'sdi' ) {
try {
2026-05-21 17:17:31 -04:00
const raw = execSync ( 'ffmpeg -hide_banner -sources decklink 2>&1' , { encoding : 'utf-8' , timeout : 5000 } ) ;
2026-05-17 18:39:09 -04:00
const devices = [ ] ;
for ( const line of raw . split ( '\n' ) ) {
2026-05-28 18:36:06 -04:00
const m = line . match ( /^\s+[0-9a-f:]+\s+\[([^\]]+)\]/ ) ;
if ( m ) devices . push ( m [ 1 ] ) ;
2026-05-17 18:39:09 -04:00
}
return res . json ( { ok : true , source _type , devices } ) ;
} catch ( err ) {
const out = ( err . stderr || err . stdout || err . toString ( ) ) . toString ( ) ;
return res . json ( { ok : false , source _type , error : out . slice ( 0 , 800 ) } ) ;
}
}
2026-05-28 19:12:40 -04:00
if ( source _type === 'deltacast' ) {
// Enumerate /dev/deltacast* nodes; report present/absent per index.
try {
const envCount = parseInt ( process . env . DELTACAST _PORT _COUNT || '0' , 10 ) ;
const devEntries = readdirSync ( '/dev' ) . filter ( n => / ^ deltacast \ d + $ / . test ( n ) ) . sort ( ) ;
const found = devEntries . map ( n => {
const m = n . match ( /^deltacast(\d+)$/ ) ;
return { index : parseInt ( m [ 1 ] , 10 ) , device : ` /dev/ ${ n } ` , present : true } ;
} ) ;
const foundIdx = new Set ( found . map ( d => d . index ) ) ;
for ( let i = 0 ; i < envCount ; i ++ ) {
if ( ! foundIdx . has ( i ) ) {
found . push ( { index : i , device : ` /dev/deltacast ${ i } ` , present : false } ) ;
}
}
found . sort ( ( a , b ) => a . index - b . index ) ;
return res . json ( { ok : true , source _type , devices : found } ) ;
} catch ( err ) {
return res . json ( { ok : false , source _type , error : err . message } ) ;
}
}
2026-05-17 18:39:09 -04:00
if ( listen ) {
return res . json ( { ok : false , source _type , error : 'Listener-mode sources cannot be probed standalone. Start the recorder and watch the signal indicator.' } ) ;
}
if ( ! source _url ) return res . status ( 400 ) . json ( { error : 'source_url is required' } ) ;
2026-05-18 07:57:48 -04:00
// Pre-flight: parse host:port and check L3/L4 reachability so we can give
// an actionable error instead of the opaque libsrt "Input/output error".
const parsed = parseUrl ( source _url ) ;
if ( ! parsed ) {
return res . json ( { ok : false , source _type , source _url , error : 'Could not parse host:port from URL.' } ) ;
}
const reach = await checkReachable ( parsed . host , parsed . port , source _type ) ;
if ( ! reach . ok ) {
return res . json ( { ok : false , source _type , source _url , error : reach . error , diagnostic : reach . diagnostic } ) ;
}
2026-05-17 18:39:09 -04:00
let url = source _url ;
if ( source _type === 'srt' && ! /mode=/ . test ( url ) ) {
url += ( url . includes ( '?' ) ? '&' : '?' ) + 'mode=caller' ;
}
const args = [ '-hide_banner' , '-v' , 'error' , '-probesize' , '32M' , '-analyzeduration' , '8M' , '-rw_timeout' , '7000000' , '-i' , url , '-show_streams' , '-show_format' , '-of' , 'json' ] ;
const ff = spawn ( 'ffprobe' , args ) ;
let stdout = '' , stderr = '' ;
ff . stdout . on ( 'data' , ( c ) => { stdout += c ; } ) ;
ff . stderr . on ( 'data' , ( c ) => { stderr += c ; } ) ;
const killer = setTimeout ( ( ) => { try { ff . kill ( 'SIGKILL' ) ; } catch ( _ ) { } } , 10000 ) ;
ff . on ( 'close' , ( code ) => {
clearTimeout ( killer ) ;
if ( code !== 0 ) {
2026-05-18 07:57:48 -04:00
const rawErr = ( stderr || 'ffprobe failed' ) . slice ( 0 , 800 ) ;
const friendly = classifyProbeError ( rawErr , source _type ) ;
return res . json ( { ok : false , source _type , source _url , error : friendly , diagnostic : rawErr } ) ;
2026-05-17 18:39:09 -04:00
}
try {
const parsed = JSON . parse ( stdout ) ;
const streams = ( parsed . streams || [ ] ) . map ( s => ( {
index : s . index , codec _type : s . codec _type , codec _name : s . codec _name ,
width : s . width , height : s . height , pix _fmt : s . pix _fmt ,
r _frame _rate : s . r _frame _rate , avg _frame _rate : s . avg _frame _rate ,
sample _rate : s . sample _rate , channels : s . channels ,
channel _layout : s . channel _layout , bit _rate : s . bit _rate ,
} ) ) ;
return res . json ( { ok : true , source _type , source _url ,
format : { format _name : parsed . format && parsed . format . format _name , duration : parsed . format && parsed . format . duration , bit _rate : parsed . format && parsed . format . bit _rate } ,
streams } ) ;
} catch ( err ) {
return res . json ( { ok : false , source _type , source _url , error : 'Could not parse ffprobe output: ' + err . message } ) ;
}
} ) ;
} catch ( error ) {
console . error ( 'Probe error:' , error ) ;
res . status ( 500 ) . json ( { error : error . message } ) ;
}
} ) ;
2026-04-07 21:58:30 -04:00
/ * *
* POST / start
* Start a new capture session
2026-05-16 08:20:10 -04:00
*
* Body ( SDI ) :
* { project _id , clip _name , device , bin _id ? , source _type ? }
*
* Body ( SRT / RTMP caller ) :
* { project _id , clip _name , source _type , source _url , bin _id ? }
*
* Body ( SRT / RTMP listener ) :
* { project _id , clip _name , source _type , listen : true , listen _port ? , stream _key ? , bin _id ? }
2026-04-07 21:58:30 -04:00
* /
router . post ( '/start' , async ( req , res ) => {
try {
2026-05-16 08:20:10 -04:00
const {
project _id ,
bin _id ,
clip _name ,
device ,
source _type = 'sdi' ,
source _url ,
listen = false ,
listen _port ,
stream _key ,
} = req . body ;
if ( ! project _id || ! clip _name ) {
return res . status ( 400 ) . json ( {
error : 'Missing required fields: project_id, clip_name' ,
} ) ;
}
2026-04-07 21:58:30 -04:00
2026-05-16 08:20:10 -04:00
// Source-specific validation
if ( source _type === 'sdi' ) {
if ( device === undefined || device === null ) {
return res . status ( 400 ) . json ( { error : 'SDI source requires: device' } ) ;
}
} else if ( source _type === 'srt' || source _type === 'rtmp' ) {
if ( ! listen && ! source _url ) {
return res . status ( 400 ) . json ( {
error : ` ${ source _type . toUpperCase ( ) } caller mode requires: source_url ` ,
} ) ;
}
} else {
2026-04-07 21:58:30 -04:00
return res . status ( 400 ) . json ( {
2026-05-16 08:20:10 -04:00
error : ` Unknown source_type: ${ source _type } . Must be sdi, srt, or rtmp ` ,
2026-04-07 21:58:30 -04:00
} ) ;
}
const session = await captureManager . start ( {
projectId : project _id ,
2026-05-16 00:30:25 -04:00
binId : bin _id || null ,
2026-04-07 21:58:30 -04:00
clipName : clip _name ,
device ,
2026-05-16 08:20:10 -04:00
sourceType : source _type ,
sourceUrl : source _url ,
listen ,
listenPort : listen _port ,
streamKey : stream _key ,
2026-04-07 21:58:30 -04:00
} ) ;
res . json ( session ) ;
} catch ( error ) {
console . error ( 'Error starting capture:' , error ) ;
res . status ( 500 ) . json ( { error : error . message } ) ;
}
} ) ;
/ * *
* POST / stop
* Stop the current capture session
* Body : { session _id }
* /
router . post ( '/stop' , async ( req , res ) => {
try {
const { session _id } = req . body ;
if ( ! session _id ) {
return res . status ( 400 ) . json ( { error : 'Missing required field: session_id' } ) ;
}
const completedSession = await captureManager . stop ( session _id ) ;
2026-05-16 08:20:10 -04:00
// Register asset with mam-api.
// If proxyKey is null (SRT/RTMP source), set needsProxy=true so the
// worker generates a proxy from the hires file asynchronously.
2026-04-07 21:58:30 -04:00
try {
const mamResponse = await fetch ( ` ${ MAM _API _URL } /api/v1/assets ` , {
method : 'POST' ,
headers : { 'Content-Type' : 'application/json' } ,
body : JSON . stringify ( {
projectId : completedSession . projectId ,
binId : completedSession . binId ,
clipName : completedSession . clipName ,
2026-05-16 08:20:10 -04:00
sourceType : completedSession . sourceType ,
2026-04-07 21:58:30 -04:00
hiresKey : completedSession . hiresKey ,
proxyKey : completedSession . proxyKey ,
2026-05-16 08:20:10 -04:00
needsProxy : completedSession . proxyKey === null ,
2026-04-07 21:58:30 -04:00
duration : completedSession . duration ,
capturedAt : completedSession . startedAt ,
} ) ,
} ) ;
if ( ! mamResponse . ok ) {
console . warn (
` MAM API registration returned ${ mamResponse . status } : ${ await mamResponse . text ( ) } ` ,
) ;
}
} catch ( mamError ) {
console . warn ( 'Failed to register asset with MAM API:' , mamError . message ) ;
}
res . json ( completedSession ) ;
} catch ( error ) {
console . error ( 'Error stopping capture:' , error ) ;
res . status ( 500 ) . json ( { error : error . message } ) ;
}
} ) ;
export default router ;