import express from 'express';
import http from 'http';
import fs from 'fs';
import net from 'net';
import dgram from 'dgram';
import pool from '../db/pool.js';
import { getS3Bucket } from '../s3/client.js';
import { validateUuid } from '../middleware/errors.js';
import { v4 as uuidv4 } from 'uuid';

const router = express.Router();
router.param('id', (req, res, next) => validateUuid('id')(req, res, next));

// Base port for on-demand SDI sidecar containers on remote worker nodes.
// Device index 0 → 7438, index 1 → 7439, etc.
const SIDECAR_PORT_BASE = 7438;

/**
 * Issue a request against the local Docker Engine API over the unix socket.
 *
 * @param {string} method - HTTP verb ('GET', 'POST', 'DELETE', ...).
 * @param {string} path - API path beginning with '/', appended to the
 *   pinned '/v1.43' version prefix.
 * @param {?object} [body=null] - Optional payload, sent JSON-encoded.
 * @returns {Promise<{status: number, data: (object|string)}>} Resolves with
 *   the HTTP status and the parsed JSON body; `{}` for an empty body, or the
 *   raw text when the body is not valid JSON. Non-2xx statuses still resolve.
 *   Rejects on socket/stream errors and after 10s of socket inactivity.
 */
function dockerApi(method, path, body = null) {
  return new Promise((resolve, reject) => {
    const options = {
      socketPath: '/var/run/docker.sock',
      path: `/v1.43${path}`,
      method,
      headers: { 'Content-Type': 'application/json' },
    };

    const req = http.request(options, (res) => {
      // Decode as a stream: concatenating raw Buffers via `+=` would mangle
      // any multi-byte UTF-8 character that straddles a chunk boundary.
      res.setEncoding('utf8');
      let data = '';
      res.on('data', (chunk) => {
        data += chunk;
      });
      // A response that dies mid-body must settle the promise too.
      res.on('error', reject);
      res.on('end', () => {
        try {
          resolve({ status: res.statusCode, data: data ? JSON.parse(data) : {} });
        } catch {
          // Not JSON (some endpoints answer with plain text) — hand back raw text.
          resolve({ status: res.statusCode, data });
        }
      });
    });

    req.on('error', reject);

    // Add 10-second timeout to prevent indefinite hangs if Docker daemon is unresponsive.
    // destroy(err) surfaces through the 'error' handler above and rejects.
    req.setTimeout(10000, () => {
      req.destroy(new Error('Docker API timeout after 10s'));
    });

    if (body) req.write(JSON.stringify(body));
    req.end();
  });
}

// Look up the cluster node for a recorder and decide if it is remote.
// Returns { remote: false } when the node is local or unset;
// { remote: true, apiUrl, ip } when it is a different host.
/**
 * @param {?string} nodeId - `cluster_nodes.id` of the recorder's node, if any.
 * @returns {Promise<{remote: boolean, apiUrl?: string, ip?: string}>}
 *   `{ remote: false }` when nodeId is falsy, the node row is missing, the
 *   node has no api_url, or its hostname equals NODE_HOSTNAME.
 */
async function resolveNodeTarget(nodeId) {
  if (!nodeId) return { remote: false };
  const r = await pool.query(
    'SELECT hostname, ip_address, api_url FROM cluster_nodes WHERE id = $1',
    [nodeId]
  );
  if (r.rows.length === 0) return { remote: false };
  const node = r.rows[0];
  const localHostname = process.env.NODE_HOSTNAME || '';
  if (!node.api_url || node.hostname === localHostname) return { remote: false };
  return { remote: true, apiUrl: node.api_url, ip: node.ip_address };
}

/**
 * Helper function to generate clip name with timestamp.
 *
 * @param {?string} recorderName - Recorder display name; may be empty.
 * @returns {string} `<safeName>_<YYYYMMDD>_<HHMMSS>` using the server's
 *   local time.
 */
function generateClipName(recorderName) {
  const now = new Date();
  const year = now.getFullYear();
  const month = String(now.getMonth() + 1).padStart(2, '0');
  const day = String(now.getDate()).padStart(2, '0');
  const hours = String(now.getHours()).padStart(2, '0');
  const minutes = String(now.getMinutes()).padStart(2, '0');
  const seconds = String(now.getSeconds()).padStart(2, '0');
  // Strip filesystem-hostile characters out of the recorder name (spaces
  // become underscores, anything outside [A-Za-z0-9._-] is dropped) so the
  // clipName flows cleanly through S3 keys, SMB paths, and ffmpeg args.
  const safe = String(recorderName || 'rec')
    .replace(/\s+/g, '_')
    .replace(/[^A-Za-z0-9._-]/g, '')
    .slice(0, 40) || 'rec';
  return `${safe}_${year}${month}${day}_${hours}${minutes}${seconds}`;
}

/**
 * Sanitize an operator-provided clip name so it's safe as both an S3 key
 * segment and an SMB/POSIX filename. Allow letters, digits, dot, dash,
 * underscore, and spaces; collapse runs of whitespace; cap at 80 chars.
 *
 * @param {*} raw - Untrusted value from the request body.
 * @returns {?string} The cleaned name, or null when `raw` is not a string
 *   or nothing survives sanitizing.
 */
function sanitizeClipName(raw) {
  if (typeof raw !== 'string') return null;
  const cleaned = raw
    .replace(/[^A-Za-z0-9._\- ]+/g, '')
    .replace(/\s+/g, ' ')
    .trim()
    .slice(0, 80)
    // The 80-char cut can land right after a word separator; trim again so
    // the name never ends in a space.
    .trimEnd();
  return cleaned.length > 0 ? cleaned : null;
}

/**
 * Build Docker PortBindings and ExposedPorts for listener-mode recorders.
 *
 * @param {string} sourceType - Recorder source type; only 'srt' (UDP,
 *   default port 9000) and 'rtmp' (TCP, default port 1935) publish a port.
 * @param {?object} sourceConfig - Recorder source_config; a port is
 *   published only when `mode === 'listener'`.
 * @returns {{portBindings: object, exposedPorts: object}} Both objects are
 *   empty when no port needs publishing.
 */
function buildPortConfig(sourceType, sourceConfig) {
  const portBindings = {};
  const exposedPorts = {};
  if (sourceConfig && sourceConfig.mode === 'listener') {
    if (sourceType === 'srt') {
      const port = String(sourceConfig.listen_port || 9000);
      const proto = `${port}/udp`;
      portBindings[proto] = [{ HostPort: port }];
      exposedPorts[proto] = {};
    } else if (sourceType === 'rtmp') {
      const port = String(sourceConfig.listen_port || 1935);
      const proto = `${port}/tcp`;
      portBindings[proto] = [{ HostPort: port }];
      exposedPorts[proto] = {};
    }
  }
  return { portBindings, exposedPorts };
}

// Whitelist of recorder columns the API accepts on POST/PATCH. Keeping it
// explicit prevents accidental writes to status / container_id / timestamps.
const RECORDER_FIELDS = [
  'name',
  'source_type',
  'source_config',
  'recording_codec',
  'recording_resolution',
  'recording_video_bitrate',
  'recording_framerate',
  'recording_audio_codec',
  'recording_audio_bitrate',
  'recording_audio_channels',
  'recording_container',
  'proxy_enabled',
  'proxy_codec',
  'proxy_resolution',
  'proxy_video_bitrate',
  'proxy_framerate',
  'proxy_audio_codec',
  'proxy_audio_bitrate',
  'proxy_audio_channels',
  'proxy_container',
  'project_id',
  'node_id',
  'device_index',
];

/**
 * Copy only whitelisted recorder columns out of a request body.
 *
 * @param {*} body - Parsed request body; may be undefined when the client
 *   sent no JSON.
 * @returns {object} New object holding every RECORDER_FIELDS key whose value
 *   is not `undefined` (explicit `null` is kept). Empty for a non-object body.
 */
function pickRecorderFields(body) {
  const out = {};
  // A missing or non-object body should produce a 400 from the caller's
  // validation, not a TypeError here.
  if (body === null || typeof body !== 'object') return out;
  for (const k of RECORDER_FIELDS) {
    if (body[k] !== undefined) out[k] = body[k];
  }
  return out;
}

// GET / - List all recorders
//
// Issue #121 — previous version fired N PG queries + N Docker inspects per
// list call. Now we resolve `live_asset_id` for every recording row in a
// single LATERAL JOIN, and the Docker `started_at` lookups are bounded by
// the number of currently-recording rows (typically <10) and run in
// parallel with a per-call timeout from `dockerApi`.
router.get('/', async (req, res, next) => {
  try {
    // NOTE(review): the join matches assets on the recorder's own project_id,
    // while /start may insert the live asset under a per-take projectId —
    // confirm live_asset_id resolves for overridden takes.
    const result = await pool.query(`
      SELECT r.*, la.live_asset_id
      FROM recorders r
      LEFT JOIN LATERAL (
        SELECT a.id AS live_asset_id
        FROM assets a
        WHERE r.status = 'recording'
          AND a.project_id = r.project_id
          AND a.display_name = r.current_session_id
          AND a.status = 'live'
        ORDER BY a.created_at DESC
        LIMIT 1
      ) la ON TRUE
      ORDER BY r.created_at DESC
    `);
    const rows = result.rows;

    // Only inspect containers for recorders that actually claim to be recording.
    const inspectable = rows.filter(r => r.status === 'recording' && r.container_id);
    await Promise.all(inspectable.map(async (r) => {
      try {
        const insp = await dockerApi('GET', `/containers/${r.container_id}/json`);
        if (insp.status === 200 && insp.data && insp.data.State) {
          r.started_at = insp.data.State.StartedAt;
        }
      } catch (_) { /* leave started_at undefined */ }
    }));

    res.json(rows);
  } catch (err) {
    next(err);
  }
});

// POST / - Create a new recorder
router.post('/', async (req, res, next) => {
  try {
    const fields = pickRecorderFields(req.body);

    if (!fields.name || !fields.source_type) {
      return res
        .status(400)
        .json({ error: 'Name and source_type are required' });
    }

    // Defaults — written on insert so the DB row is always self-contained.
    const defaults = {
      source_config: {},
      recording_codec: 'hevc_nvenc',
      recording_resolution: 'native',
      recording_audio_codec: 'pcm_s24le',
      recording_audio_channels: 2,
      recording_container: 'mov',
      proxy_enabled: true,
      proxy_codec: 'h264',
      proxy_resolution: '1920x1080',
      proxy_video_bitrate: '2M',
      proxy_audio_codec: 'aac',
      proxy_audio_bitrate: '128k',
      proxy_audio_channels: 2,
      proxy_container: 'mp4',
    };

    const row = { id: uuidv4(), status: 'stopped', ...defaults, ...fields };

    // Build INSERT dynamically so adding columns later means one place to update.
    // Column names come only from the literals above and the RECORDER_FIELDS
    // whitelist, never from raw client keys.
    const cols = Object.keys(row);
    const placeholders = cols.map((_, i) => `$${i + 1}`).join(', ');
    const values = cols.map(k => {
      const v = row[k];
      if (k === 'source_config') return v && typeof v === 'object' ? v : {};
      return v;
    });

    const result = await pool.query(
      `INSERT INTO recorders (${cols.join(', ')}, created_at, updated_at)
       VALUES (${placeholders}, NOW(), NOW())
       RETURNING *`,
      values
    );

    res.status(201).json(result.rows[0]);
  } catch (err) {
    next(err);
  }
});

// GET /:id - Get single recorder
router.get('/:id', async (req, res, next) => {
  try {
    const { id } = req.params;
    const result = await pool.query(
      'SELECT * FROM recorders WHERE id = $1',
      [id]
    );

    if (result.rows.length === 0) {
      return res.status(404).json({ error: 'Recorder not found' });
    }

    res.json(result.rows[0]);
  } catch (err) {
    next(err);
  }
});

// PATCH /:id - Edit recorder settings
// Blocked while recorder is actively recording to prevent config drift.
router.patch('/:id', async (req, res, next) => {
  try {
    const { id } = req.params;

    const recorderResult = await pool.query(
      'SELECT * FROM recorders WHERE id = $1',
      [id]
    );

    if (recorderResult.rows.length === 0) {
      return res.status(404).json({ error: 'Recorder not found' });
    }

    const recorder = recorderResult.rows[0];

    if (recorder.status === 'recording') {
      return res.status(409).json({
        error: 'Cannot edit a recorder while it is recording — stop it first'
      });
    }

    const fields = pickRecorderFields(req.body);
    const cols = Object.keys(fields);
    if (cols.length === 0) {
      return res.status(400).json({ error: 'No fields to update' });
    }

    const setClause = cols.map((k, i) => `${k} = $${i + 1}`).join(', ');
    const params = cols.map(k => fields[k]);
    params.push(id);

    const result = await pool.query(
      `UPDATE recorders SET ${setClause}, updated_at = NOW()
       WHERE id = $${params.length}
       RETURNING *`,
      params
    );

    res.json(result.rows[0]);
  } catch (err) {
    next(err);
  }
});

// Best-effort removal of the asset row that /start pre-creates, used when the
// capture container never came up so the library does not show a phantom
// "live" recording. Never throws.
async function discardLiveAsset(assetId) {
  try {
    await pool.query(
      `DELETE FROM assets WHERE id = $1 AND status = 'live'`,
      [assetId]
    );
  } catch (e) {
    console.warn('[recorders] could not remove orphaned live asset:', e.message);
  }
}

// Best-effort removal of a container that was created but failed to start,
// so failed starts do not pile up dead containers. Never throws.
async function removeUnstartedContainer(containerId) {
  try {
    await dockerApi('DELETE', `/containers/${containerId}?force=true`);
  } catch (e) {
    console.warn('[recorders] could not remove unstarted container:', e.message);
  }
}

// POST /:id/start - Start recording
//
// Optional body: { clipName, projectId }. Pre-creates a status='live' asset,
// then spawns the capture container — locally via the Docker socket or on a
// remote node via its agent — and marks the recorder as recording.
router.post('/:id/start', async (req, res, next) => {
  try {
    const { id } = req.params;

    const recorderResult = await pool.query(
      'SELECT * FROM recorders WHERE id = $1',
      [id]
    );

    if (recorderResult.rows.length === 0) {
      return res.status(404).json({ error: 'Recorder not found' });
    }

    const recorder = recorderResult.rows[0];

    if (recorder.status === 'recording') {
      return res.status(400).json({ error: 'Recorder is already recording' });
    }

    const s3Endpoint = process.env.S3_ENDPOINT;
    const s3Bucket = getS3Bucket(); // Use live config, not stale env snapshot (#61)
    const s3AccessKey = process.env.S3_ACCESS_KEY;
    const s3SecretKey = process.env.S3_SECRET_KEY;
    const mamApiUrl = process.env.MAM_API_URL || 'http://mam-api:3000';
    const externalMamApiUrl = `http://${process.env.NODE_IP || '172.18.91.200'}:${process.env.PORT_MAM_API || 47432}`;
    const dockerNetwork = process.env.DOCKER_NETWORK || 'wild-dragon_wild-dragon';

    // Growing-files mode is a global setting (settings table). When on, the
    // capture container writes the master to its /growing/ mount instead of
    // streaming it to S3 — Premiere can mount the SMB share and edit it live.
    const growingRow = await pool.query(
      `SELECT value FROM settings WHERE key = 'growing_enabled'`
    );
    const growingEnabled = growingRow.rows[0]?.value === 'true' || growingRow.rows[0]?.value === true;

    // Operator-supplied clip name wins over the auto-timestamped fallback.
    // The Recorders UI passes this on the start request when the user types
    // something into the "Clip name" field; otherwise it's blank and we
    // generate a timestamped name from the recorder name as before.
    const customClipName = sanitizeClipName(req.body && req.body.clipName);
    const clipName = customClipName || generateClipName(recorder.name);

    // Per-take project override: the Recorders UI can pass projectId on the
    // start request to send clips to a different project than the recorder's
    // default. Falls back to the recorder's configured project_id.
    const takeProjectId = (req.body && req.body.projectId && typeof req.body.projectId === 'string')
      ? req.body.projectId
      : recorder.project_id;

    // live-asset: create the asset row right now (status='live') so the
    // library shows the recording while it is happening.
    const assetIdLive = uuidv4();
    try {
      const ext = recorder.recording_container || 'mov';
      await pool.query(
        `INSERT INTO assets (
           id, project_id, bin_id, filename, display_name, status, media_type,
           original_s3_key, created_at, updated_at
         ) VALUES ($1, $2, NULL, $3, $3, 'live', 'video', $4, NOW(), NOW())`,
        [assetIdLive, takeProjectId, clipName, `projects/${takeProjectId}/masters/${clipName}.${ext}`]
      );
    } catch (e) {
      // Deliberately best-effort: recording proceeds without the live row.
      console.warn('[recorders] could not pre-create live asset:', e.message);
    }

    const sourceConfig = recorder.source_config || {};
    const isListener = sourceConfig.mode === 'listener';
    const sourceType = recorder.source_type;
    const deviceIndex = recorder.device_index ?? sourceConfig.device ?? 0;

    // Build container env — all codec controls flow through here.
    const env = [
      `S3_ENDPOINT=${s3Endpoint}`,
      `S3_BUCKET=${s3Bucket}`,
      `S3_ACCESS_KEY=${s3AccessKey}`,
      `S3_SECRET_KEY=${s3SecretKey}`,
      `S3_REGION=${process.env.S3_REGION || 'us-east-1'}`,
      `MAM_API_URL=${mamApiUrl}`,
      `RECORDER_ID=${id}`,
      `SOURCE_TYPE=${sourceType}`,
      `SOURCE_CONFIG=${JSON.stringify(sourceConfig)}`,
      `DEVICE_INDEX=${deviceIndex}`,
      // Recording codec controls
      `RECORDING_CODEC=${recorder.recording_codec || 'prores_hq'}`,
      `RECORDING_RESOLUTION=${recorder.recording_resolution || 'native'}`,
      `RECORDING_VIDEO_BITRATE=${recorder.recording_video_bitrate || ''}`,
      `RECORDING_FRAMERATE=${recorder.recording_framerate || ''}`,
      `RECORDING_AUDIO_CODEC=${recorder.recording_audio_codec || 'pcm_s24le'}`,
      `RECORDING_AUDIO_BITRATE=${recorder.recording_audio_bitrate || ''}`,
      `RECORDING_AUDIO_CHANNELS=${recorder.recording_audio_channels ?? 2}`,
      `RECORDING_CONTAINER=${recorder.recording_container || 'mov'}`,
      // Proxy codec controls
      `PROXY_ENABLED=${recorder.proxy_enabled !== false ? 'true' : 'false'}`,
      `PROXY_CODEC=${recorder.proxy_codec || 'h264'}`,
      `PROXY_RESOLUTION=${recorder.proxy_resolution || '1920x1080'}`,
      `PROXY_VIDEO_BITRATE=${recorder.proxy_video_bitrate || '2M'}`,
      `PROXY_FRAMERATE=${recorder.proxy_framerate || ''}`,
      `PROXY_AUDIO_CODEC=${recorder.proxy_audio_codec || 'aac'}`,
      `PROXY_AUDIO_BITRATE=${recorder.proxy_audio_bitrate || '128k'}`,
      `PROXY_AUDIO_CHANNELS=${recorder.proxy_audio_channels ?? 2}`,
      `PROXY_CONTAINER=${recorder.proxy_container || 'mp4'}`,
      `PROJECT_ID=${takeProjectId}`,
      `CLIP_NAME=${clipName}`,
      `ASSET_ID=${assetIdLive}`,
      `MAM_API_TOKEN=${process.env.CAPTURE_TOKEN || ''}`,
      `GROWING_ENABLED=${growingEnabled ? 'true' : 'false'}`,
      `GROWING_PATH=/growing`,
    ];

    // Deltacast: pass port count so the capture container can enumerate
    // test-card slots even without physical /dev/deltacast* nodes.
    if (sourceType === 'deltacast') {
      const dcCount = process.env.DELTACAST_PORT_COUNT || sourceConfig.port_count || '';
      if (dcCount) env.push(`DELTACAST_PORT_COUNT=${dcCount}`);
    }

    if (sourceType === 'srt' || sourceType === 'rtmp') {
      env.push(`LISTEN=${isListener ? '1' : '0'}`);
      if (isListener) {
        env.push(`LISTEN_PORT=${sourceConfig.listen_port || (sourceType === 'srt' ? 9000 : 1935)}`);
        if (sourceType === 'rtmp' && sourceConfig.stream_key) {
          env.push(`STREAM_KEY=${sourceConfig.stream_key}`);
        }
      } else if (sourceConfig.url) {
        env.push(`SOURCE_URL=${sourceConfig.url}`);
      }
    }

    // GPU-accelerated codecs require the NVIDIA container runtime on the node.
    // hevc_nvenc / h264_nvenc are the only two we currently support; extend
    // this list if av1_nvenc or others are added later.
    const GPU_CODECS = ['hevc_nvenc', 'h264_nvenc'];
    const useGpu = GPU_CODECS.includes(recorder.recording_codec);

    // Determine whether to spawn locally or via a remote node-agent.
    const { remote: isRemote, apiUrl: targetNodeApiUrl } = await resolveNodeTarget(recorder.node_id);

    // For remote sidecars, the capture container runs on the worker host network and cannot
    // resolve the Docker-internal mam-api hostname — replace with the external URL.
    if (isRemote) {
      const idx = env.findIndex(e => e.startsWith('MAM_API_URL='));
      if (idx !== -1) env[idx] = `MAM_API_URL=${externalMamApiUrl}`;
    }

    let containerId;

    if (isRemote) {
      // Remote node: delegate container lifecycle to that node's agent.
      // Coerce the index: a string from JSON config ("1") would otherwise
      // concatenate into "74381" instead of adding up to 7439.
      const capturePort = SIDECAR_PORT_BASE + (Number(deviceIndex) || 0);
      const sidecarRes = await fetch(`${targetNodeApiUrl}/sidecar/start`, {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify({
          image: 'wild-dragon-capture:latest',
          env,
          capturePort,
          sourceType,
          useGpu
        }),
        signal: AbortSignal.timeout(15000),
      });
      if (!sidecarRes.ok) {
        // #105 — never proxy the remote node's raw response back to the
        // browser; it could contain echoed env vars on bad-request paths.
        const details = await sidecarRes.json().catch(() => ({}));
        console.error('[recorders] remote sidecar start failed:', JSON.stringify(details));
        // Nothing is recording, so drop the pre-created live asset.
        await discardLiveAsset(assetIdLive);
        return res.status(502).json({
          error: 'Remote node failed to start sidecar',
          details: (details && details.message) || 'see server logs',
        });
      }
      const sidecarData = await sidecarRes.json();
      containerId = sidecarData.containerId;
    } else {
      // Local spawn via Docker socket.
      const { portBindings, exposedPorts } = buildPortConfig(sourceType, sourceConfig);
      const alias = `recorder-${id}`;

      const hostBinds = ['/mnt/NVME/MAM/wild-dragon-live:/live'];
      if (sourceType === 'sdi') hostBinds.push('/dev/blackmagic:/dev/blackmagic');
      if (sourceType === 'deltacast') {
        // Bind each /dev/deltacast* device node the host has into the container.
        // The capture service falls back to test-card if none are present.
        try {
          const dcEntries = fs.readdirSync('/dev').filter(n => /^deltacast\d+$/.test(n));
          for (const d of dcEntries) hostBinds.push(`/dev/${d}:/dev/${d}`);
        } catch (_) { /* no /dev/deltacast* nodes on this host */ }
      }
      if (growingEnabled) hostBinds.push('/mnt/NVME/MAM/wild-dragon-growing:/growing');

      const localEnv = [...env];
      if (useGpu) {
        localEnv.push('NVIDIA_VISIBLE_DEVICES=all');
        localEnv.push('NVIDIA_DRIVER_CAPABILITIES=video,compute,utility');
      }

      const localHostConfig = {
        Privileged: true,
        NetworkMode: dockerNetwork,
        PortBindings: Object.keys(portBindings).length > 0 ? portBindings : undefined,
        Binds: hostBinds,
        ...(useGpu && {
          Runtime: 'nvidia',
          DeviceRequests: [{ Driver: 'nvidia', Count: -1, Capabilities: [['gpu']] }],
        }),
      };

      const containerConfig = {
        Image: 'wild-dragon-capture:latest',
        Env: localEnv,
        ExposedPorts: Object.keys(exposedPorts).length > 0 ? exposedPorts : undefined,
        HostConfig: localHostConfig,
        NetworkingConfig: {
          EndpointsConfig: {
            [dockerNetwork]: { Aliases: [alias] },
          },
        },
        Hostname: alias,
      };

      const createRes = await dockerApi('POST', '/containers/create', containerConfig);

      if (createRes.status !== 201) {
        // Issue #105 — log the full Docker error server-side, but never echo
        // the create payload (which contains S3_ACCESS_KEY / STREAM_KEY in
        // Env) back to the client. Send a short, generic message.
        console.error('[recorders] container create failed:', JSON.stringify(createRes.data));
        await discardLiveAsset(assetIdLive);
        return res.status(500).json({
          error: 'Failed to create container',
          details: (createRes.data && createRes.data.message) || 'see server logs',
        });
      }

      containerId = createRes.data.Id;

      const startRes = await dockerApi('POST', `/containers/${containerId}/start`);

      if (startRes.status !== 204) {
        console.error('[recorders] container start failed:', JSON.stringify(startRes.data));
        // The recorder row never learns this container id, so nothing else
        // would ever clean it up.
        await removeUnstartedContainer(containerId);
        await discardLiveAsset(assetIdLive);
        return res.status(500).json({
          error: 'Failed to start container',
          details: (startRes.data && startRes.data.message) || 'see server logs',
        });
      }
    }

    const updateResult = await pool.query(
      `UPDATE recorders
       SET container_id = $1, status = $2, current_session_id = $3, updated_at = NOW()
       WHERE id = $4
       RETURNING *`,
      [containerId, 'recording', clipName, id]
    );

    res.json(updateResult.rows[0]);
  } catch (err) {
    next(err);
  }
});

// POST /:id/stop - Stop recording
router.post('/:id/stop', async (req, res, next) => {
  try {
    const { id } = req.params;

    const recorderResult = await pool.query(
      'SELECT * FROM recorders WHERE id = $1',
      [id]
    );

    if (recorderResult.rows.length === 0) {
      return res.status(404).json({ error: 'Recorder not found' });
    }

    const recorder = recorderResult.rows[0];

    if (!recorder.container_id) {
      // No container tracked — reset stuck status gracefully.
      const result = await pool.query(
        `UPDATE recorders
         SET container_id = NULL, status = 'stopped', updated_at = NOW()
         WHERE id = $1
         RETURNING *`,
        [id]
      );
      return res.json(result.rows[0]);
    }

    const { remote: isRemote, apiUrl: targetNodeApiUrl } = await resolveNodeTarget(recorder.node_id);

    if (isRemote) {
      const stopRes = await fetch(`${targetNodeApiUrl}/sidecar/${recorder.container_id}`, {
        method: 'DELETE',
        signal: AbortSignal.timeout(15000),
      });
      // 404 means the sidecar is already gone, which is the desired end state.
      if (!stopRes.ok && stopRes.status !== 404) {
        return res.status(502).json({ error: 'Remote node failed to stop sidecar' });
      }
    } else {
      const stopRes = await dockerApi(
        'POST',
        `/containers/${recorder.container_id}/stop`
      );

      // 204 = stopped, 304 = already stopped, 404 = container gone — all acceptable.
      if (stopRes.status !== 204 && stopRes.status !== 304 && stopRes.status !== 404) {
        return res.status(500).json({
          error: 'Failed to stop container',
          details: stopRes.data,
        });
      }

      // Only attempt remove if the container existed (not 404).
      if (stopRes.status !== 404) {
        const removeRes = await dockerApi(
          'DELETE',
          `/containers/${recorder.container_id}`
        );

        if (removeRes.status !== 204 && removeRes.status !== 404) {
          return res.status(500).json({
            error: 'Failed to remove container',
            details: removeRes.data,
          });
        }
      }
    }

    const updateResult = await pool.query(
      `UPDATE recorders
       SET container_id = NULL, status = $1, updated_at = NOW()
       WHERE id = $2
       RETURNING *`,
      ['stopped', id]
    );

    res.json(updateResult.rows[0]);
  } catch (err) {
    next(err);
  }
});

// GET /:id/status - Get live status
//
// Combines container state (Docker inspect locally, node-agent remotely) with
// the capture service's own status report when that is reachable.
router.get('/:id/status', async (req, res, next) => {
  try {
    const { id } = req.params;

    const recorderResult = await pool.query(
      'SELECT * FROM recorders WHERE id = $1',
      [id]
    );

    if (recorderResult.rows.length === 0) {
      return res.status(404).json({ error: 'Recorder not found' });
    }

    const recorder = recorderResult.rows[0];

    if (!recorder.container_id) {
      return res.json({
        status: recorder.status,
        duration: 0,
        containerId: null,
      });
    }

    const { remote: isRemote, apiUrl: targetNodeApiUrl } = await resolveNodeTarget(recorder.node_id);

    let isRunning = false;
    let duration = 0;
    let live = null;

    if (isRemote) {
      try {
        const statusRes = await fetch(`${targetNodeApiUrl}/sidecar/${recorder.container_id}/status`, {
          signal: AbortSignal.timeout(4000),
        });
        if (statusRes.ok) {
          const data = await statusRes.json();
          isRunning = data.running;
          if (data.startedAt) {
            duration = Math.floor((Date.now() - new Date(data.startedAt).getTime()) / 1000);
          }
          live = data.live;
        }
      } catch (_) { /* node unreachable — reported as not running */ }
    } else {
      const inspectRes = await dockerApi(
        'GET',
        `/containers/${recorder.container_id}/json`
      );

      // Treat a 200 without a State object (e.g. an unparseable body) as
      // unknown as well, instead of throwing on `.State.Running`.
      const state = inspectRes.status === 200 ? inspectRes.data?.State : null;
      if (!state) {
        return res.json({
          status: 'unknown',
          duration: 0,
          containerId: recorder.container_id,
        });
      }

      isRunning = state.Running;
      duration = Math.floor((Date.now() - new Date(state.StartedAt).getTime()) / 1000);

      // The capture container is reachable by its network alias (set in /start).
      try {
        const captureRes = await fetch(`http://recorder-${id}:3001/capture/status`, {
          signal: AbortSignal.timeout(2000)
        });
        if (captureRes.ok) live = await captureRes.json();
      } catch (_) { /* not ready yet */ }
    }

    // Container state gives only a coarse guess; the capture service's own
    // report, when present, is authoritative and flips signalKnown.
    let signal = isRunning ? 'receiving' : 'stopped';
    let signalKnown = false;
    if (live && live.signal) {
      signal = live.signal;
      signalKnown = true;
    }

    res.json({
      status: isRunning ? 'recording' : 'stopped',
      duration,
      containerId: recorder.container_id,
      signal,
      signalKnown,
      framesReceived: live ? live.framesReceived : null,
      currentFps: live ? live.currentFps : null,
      lastFrameAt: live ? live.lastFrameAt : null,
      lastError: live ? live.lastError : null,
    });
  } catch (err) {
    next(err);
  }
});

// DELETE /:id - Delete recorder
// Container teardown is best-effort: the row is deleted even if it fails.
router.delete('/:id', async (req, res, next) => {
  try {
    const { id } = req.params;

    const recorderResult = await pool.query(
      'SELECT * FROM recorders WHERE id = $1',
      [id]
    );

    if (recorderResult.rows.length === 0) {
      return res.status(404).json({ error: 'Recorder not found' });
    }

    const recorder = recorderResult.rows[0];

    if (recorder.container_id) {
      const { remote: isRemote, apiUrl: targetNodeApiUrl } = await resolveNodeTarget(recorder.node_id);
      try {
        if (isRemote) {
          await fetch(`${targetNodeApiUrl}/sidecar/${recorder.container_id}`, {
            method: 'DELETE',
            signal: AbortSignal.timeout(10000),
          });
        } else {
          await dockerApi('POST', `/containers/${recorder.container_id}/stop`);
          await dockerApi('DELETE', `/containers/${recorder.container_id}`);
        }
      } catch (err) {
        console.error('Error stopping container during delete:', err);
      }
    }

    const deleteResult = await pool.query(
      'DELETE FROM recorders WHERE id = $1 RETURNING *',
      [id]
    );

    res.json({ message: 'Recorder deleted', recorder: deleteResult.rows[0] });
  } catch (err) {
    next(err);
  }
});

// Issue #104 — limit probe targets so an authed user can't scan the cluster's
// internal services (Docker socket, DB, metadata endpoints).
const ALLOWED_PROBE_SCHEMES = new Set(['srt', 'rtmp', 'rtmps', 'rtsp', 'udp', 'rtp']);
const BLOCKED_PROBE_PORTS = new Set([22, 25, 53, 80, 443, 5432, 6379, 9000, 9100, 9229]);

/**
 * Decide from the host string alone whether a probe target is loopback,
 * private, or link-local. No DNS resolution is performed, so a public name
 * that resolves to an internal address is not caught here.
 *
 * @param {?string} host - Hostname or IP literal, as produced by
 *   `URL#hostname` (IPv6 literals may be wrapped in square brackets).
 * @returns {boolean} true when the host is empty or clearly internal.
 */
function isPrivateOrLoopback(host) {
  if (!host) return true;
  // URL#hostname keeps the brackets around IPv6 literals ("[::1]"); strip
  // them, otherwise none of the IPv6 checks below can ever match.
  const h = host.toLowerCase().replace(/^\[|\]$/g, '');
  if (!h) return true;
  if (h === 'localhost' || h.endsWith('.local') || h.endsWith('.internal')) return true;

  if (h.includes(':')) {
    // IPv6 literal. Keeping these tests inside this branch stops public DNS
    // names that merely start with "fc"/"fd" from being treated as private.
    if (h === '::' || h === '::1') return true;        // unspecified / loopback
    if (/^fe[89ab][0-9a-f]:/.test(h)) return true;     // fe80::/10 link-local
    if (/^f[cd][0-9a-f]{2}:/.test(h)) return true;     // fc00::/7 unique-local
    // IPv4-mapped addresses: the URL parser serialises them in hex
    // ("::ffff:7f00:1"); also accept the dotted form for direct callers.
    const mappedHex = /^::ffff:([0-9a-f]{1,4}):([0-9a-f]{1,4})$/.exec(h);
    if (mappedHex) {
      const hi = parseInt(mappedHex[1], 16);
      const lo = parseInt(mappedHex[2], 16);
      return isPrivateOrLoopback(`${hi >> 8}.${hi & 255}.${lo >> 8}.${lo & 255}`);
    }
    const mappedDotted = /^::ffff:(\d{1,3}(?:\.\d{1,3}){3})$/.exec(h);
    if (mappedDotted) return isPrivateOrLoopback(mappedDotted[1]);
    return false;
  }

  // Hostname lookups happen later by the socket; here we just bail on the
  // obvious cases. IPv4 private ranges + AWS metadata IP.
  if (/^127\./.test(h)) return true;
  if (/^10\./.test(h)) return true;
  if (/^192\.168\./.test(h)) return true;
  if (/^172\.(1[6-9]|2[0-9]|3[01])\./.test(h)) return true;
  if (/^169\.254\./.test(h)) return true; // link-local / AWS metadata
  // 100.64.0.0/10 (second octet 64–127)
  if (/^100\.6[4-9]\./.test(h) || /^100\.[7-9]\d\./.test(h) || /^100\.1[0-1]\d\./.test(h) || /^100\.12[0-7]\./.test(h)) return true;
  if (/^0\./.test(h)) return true;
  return false;
}

// Everyone counts as admin when auth is disabled (AUTH_ENABLED !== 'true').
function isAdmin(req) {
  if (process.env.AUTH_ENABLED !== 'true') return true;
  return req.user?.role === 'admin';
}

// POST /probe - Probe a source URL for reachability.
// Tries the capture service first; falls back to basic TCP/UDP connectivity
// check when capture is not running.
router.post('/probe', async (req, res) => {
  const { source_type, url } = req.body || {};

  // Validate URL up-front so we don't even let the capture service see junk.
  let parsed = null;
  if (url) {
    try { parsed = new URL(url); } catch { return res.status(400).json({ error: 'Invalid URL' }); }
    const proto = (parsed.protocol || '').replace(':', '').toLowerCase();
    if (!ALLOWED_PROBE_SCHEMES.has(proto)) {
      return res.status(400).json({ error: `Scheme "${proto}" is not permitted for probe (#104)` });
    }
    // Non-admin users can only probe public hostnames. Admins may probe LAN.
    if (!isAdmin(req) && isPrivateOrLoopback(parsed.hostname)) {
      return res.status(403).json({ error: 'Probe target must be a public host (#104)' });
    }
  }

  // Try the capture service first (5s timeout)
  // NOTE(review): BLOCKED_PROBE_PORTS is enforced only on the fallback path
  // below; requests forwarded here are not port-filtered — confirm intended.
  try {
    const r = await fetch('http://capture:3001/capture/probe', {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify(req.body || {}),
      signal: AbortSignal.timeout(5000),
    });
    const data = await r.json().catch(() => ({}));
    return res.status(r.status).json(data);
  } catch (_) {
    // capture service not running — fall through to basic connectivity probe
  }

  if (!parsed) {
    return res.json({
      reachable: false,
      mode: 'basic',
      note: 'Capture service offline. Provide a URL for connectivity check.',
    });
  }

  const host = parsed.hostname;
  const proto = (parsed.protocol || '').replace(':', '').toLowerCase();
  const isUdp = proto === 'srt' || source_type === 'srt';
  const port = parseInt(parsed.port, 10) || (isUdp ? 9000 : 1935);
  if (BLOCKED_PROBE_PORTS.has(port) && !isAdmin(req)) {
    return res.status(403).json({ error: `Port ${port} is not permitted for probe (#104)` });
  }

  const reachable = await (isUdp ? probeUdp(host, port) : probeTcp(host, port));
  return res.json({
    reachable,
    mode: 'basic',
    note: `Capture service offline · ${isUdp ? 'UDP' : 'TCP'} connectivity check only`,
    ...(reachable
      ? { source: `${host}:${port}` }
      : { error: `${host}:${port} did not respond` }
    ),
  });
});

// Resolve true if a TCP connection to host:port is established within 4s,
// false on error or timeout. Never rejects.
function probeTcp(host, port) {
  return new Promise((resolve) => {
    const sock = new net.Socket();
    let done = false;
    const finish = (ok) => {
      if (!done) {
        done = true;
        sock.destroy();
        resolve(ok);
      }
    };
    sock.setTimeout(4000);
    sock.connect(port, host, () => finish(true));
    sock.on('error', () => finish(false));
    sock.on('timeout', () => finish(false));
  });
}

// Heuristic UDP probe: send one 16-byte datagram and resolve true when no
// socket error shows up within 2.5s of the send completing. Resolves false on
// a send/socket error or when the 5s overall cap fires first. Never rejects.
function probeUdp(host, port) {
  return new Promise((resolve) => {
    const sock = dgram.createSocket('udp4');
    let done = false;
    let quietTimer = null;
    let capTimer = null;
    const finish = (ok) => {
      if (done) return;
      done = true;
      // Disarm whichever timer did not fire so nothing lingers after settling.
      clearTimeout(quietTimer);
      clearTimeout(capTimer);
      try { sock.close(); } catch (_) {}
      resolve(ok);
    };
    // ICMP port-unreachable will fire sock.on('error') within ~100ms if nothing is listening
    // NOTE(review): verify this holds for an unconnected dgram socket on the
    // target platform; if it does not, this probe can only ever time out to true.
    sock.on('error', () => finish(false));
    sock.send(Buffer.alloc(16, 0), 0, 16, port, host, (err) => {
      if (err) return finish(false);
      if (done) return;
      // No ICMP error after 2.5s → assume something is listening
      quietTimer = setTimeout(() => finish(true), 2500);
    });
    capTimer = setTimeout(() => finish(false), 5000);
  });
}

// GET /:id/live/* — reverse-proxy the live HLS preview from the recorder's node.
// Remote recorders: segments live on the worker node, served by its node-agent
// (/live/...). Local recorders: served from this host's /live mount. Browser
// media requests carry the session cookie (same-origin) so auth passes.
router.get('/:id/live/:rest(*)', async (req, res, next) => {
  try {
    const { id } = req.params;
    const rest = req.params.rest;
    // Reject anything that could climb out of the /live root.
    if (!rest || rest.includes('..')) return res.status(400).end();

    const rec = await pool.query('SELECT node_id FROM recorders WHERE id = $1', [id]);
    if (rec.rows.length === 0) return res.status(404).json({ error: 'Recorder not found' });

    const ct = rest.endsWith('.m3u8') ? 'application/vnd.apple.mpegurl'
      : rest.endsWith('.ts') ? 'video/mp2t'
      : 'application/octet-stream';
    res.set('Cache-Control', 'no-cache');
    res.set('Content-Type', ct);

    const target = await resolveNodeTarget(rec.rows[0].node_id);
    if (!target.remote) {
      return fs.readFile('/live/' + rest, (err, data) => {
        if (err) return res.status(404).end();
        res.end(data);
      });
    }

    const base = String(target.apiUrl).replace(/\/$/, '');
    // Bound the upstream call like every other node-agent fetch in this file,
    // so a hung worker cannot pin the request open; a timeout yields 502.
    const upstream = await fetch(`${base}/live/${rest}`, {
      signal: AbortSignal.timeout(10000),
    }).catch(() => null);
    if (!upstream || !upstream.ok) return res.status(upstream ? upstream.status : 502).end();
    res.end(Buffer.from(await upstream.arrayBuffer()));
  } catch (err) {
    next(err);
  }
});

export default router;