dragonflight/services/mam-api/src/routes/assets.js
Zac Gaetano 32a2d0329e fix(growing+gui): growing file = MXF XDCAM HD422 (Premiere-growable) + GUI fixes
Growing root cause (4th attempt): Premiere doesn't import H.264-in-.ts
("unsupported compression type"); its growing-file support is MXF OP1a.
Prior MXF/DNxHR failed because DNxHR is VBR and never flushes the incremental
index — XDCAM HD422 (mpeg2video, CBR) DOES write index segments into body
partitions mid-record (proven live via SIGKILL: 5 index segments, readable,
no footer). Growing master is now MXF OP1a / XDCAM HD422 4:2:2 CBR + PCM s16le,
operator bitrate as CBR (default 50M). live-path returns .mxf to match.

GUI: bitrate input is now always editable in growing mode (was hidden for
ProRes-selected codecs); codec menu shown disabled-with-explanation under
growing (it had only looked "missing" due to a stale served bundle).

Requires Premiere prefs: Media > "Automatically refresh growing files" ON,
and disable the two XMP-write-on-import options.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-05-31 22:13:01 -04:00

1067 lines
47 KiB
JavaScript

import express from 'express';
import { Queue } from 'bullmq';
import { v4 as uuidv4 } from 'uuid';
import { promises as fs } from 'node:fs';
import path from 'node:path';
import pool from '../db/pool.js';
import { getSignedUrlForObject, deleteObject, s3Client, getS3Bucket } from '../s3/client.js';
import { GetObjectCommand, HeadObjectCommand } from '@aws-sdk/client-s3';
import { validateUuid } from '../middleware/errors.js';
import { assertProjectAccess, accessibleProjectIds } from '../auth/authz.js';
import { requireAdmin } from '../middleware/auth.js';
const router = express.Router();
// Every /:id asset route is scoped to the asset's project. The param handler
// validates the UUID, resolves the owning project_id, and asserts at least
// 'view' access (the baseline for touching an asset at all). Mutating routes
// additionally assert 'edit' via req.assetProjectId. A missing asset is
// answered with a 404 from here.
router.param('id', async (req, res, next) => {
  try {
    // Run the validator with a capturing callback. If it reports failure by
    // calling next(err) instead of writing a response, forward that error
    // rather than silently dropping it and querying with a bad id. Keeping the
    // call inside the try also routes a synchronous throw to next().
    let validationError;
    validateUuid('id')(req, res, (err) => { validationError = err; });
    if (validationError) return next(validationError);
    // The validator already answered the request (e.g. with a 4xx) — stop here.
    if (res.headersSent) return;

    const { rows } = await pool.query('SELECT project_id FROM assets WHERE id = $1', [req.params.id]);
    if (rows.length === 0) return res.status(404).json({ error: 'Asset not found' });
    // Stash the owning project so route-level guards can reuse it.
    req.assetProjectId = rows[0].project_id;
    await assertProjectAccess(req.user, req.assetProjectId, 'view');
    next();
  } catch (err) { next(err); }
});
// Middleware for mutating /:id endpoints. The param handler only guarantees
// 'view'; this raises the bar to 'edit' using the project id it already
// resolved onto req.assetProjectId. Any failure is handed to next(err).
async function requireAssetEdit(req, res, next) {
  try {
    const { user, assetProjectId } = req;
    await assertProjectAccess(user, assetProjectId, 'edit');
    next();
  } catch (accessError) {
    next(accessError);
  }
}
// BullMQ queue connection (mirrors worker/src/index.js)
/**
 * Convert a redis:// URL into a connection options object.
 *
 * @param {string} url - e.g. `redis://queue:6379` or `redis://user:pass@host`.
 * @returns {{host: string, port: number, username?: string, password?: string}}
 *   `port` falls back to 6379 when the URL omits it (previously this produced
 *   NaN). `username` / `password` are included only when present in the URL,
 *   so a credential-less URL yields exactly `{ host, port }` as before.
 * @throws {TypeError} if `url` is not a parseable URL (from the URL constructor).
 */
const parseRedisUrl = (url) => {
  const parsed = new URL(url);
  const port = Number.parseInt(parsed.port, 10);
  return {
    host: parsed.hostname,
    // parsed.port is '' when the URL has no explicit port.
    port: Number.isNaN(port) ? 6379 : port,
    // URL keeps credentials percent-encoded; decode before handing them on.
    ...(parsed.username ? { username: decodeURIComponent(parsed.username) } : {}),
    ...(parsed.password ? { password: decodeURIComponent(parsed.password) } : {}),
  };
};
// One BullMQ Queue handle per job type, created in the order listed. Each
// queue gets its own connection options parsed from REDIS_URL (falling back to
// redis://queue:6379 when the variable is unset or empty).
const [proxyQueue, thumbnailQueue, trimQueue, filmstripQueue, hlsQueue] =
  ['proxy', 'thumbnail', 'trim', 'filmstrip', 'hls'].map(
    (queueName) => new Queue(queueName, {
      connection: parseRedisUrl(process.env.REDIS_URL || 'redis://queue:6379'),
    }),
  );
// GET / - List assets with filtering
//
// Query params: project_id, bin_id, status, media_type, search (substring
// match on display_name / filename / notes), limit (1..500, default 50),
// offset (>= 0, default 0), include_archived ('true' to include archived).
// Responds with { assets, total } where total is the number of rows matching
// the filters, independent of limit/offset.
router.get('/', async (req, res, next) => {
  try {
    const {
      project_id,
      bin_id,
      status,
      search,
      media_type,
      limit: rawLimit = 50,
      offset: rawOffset = 0,
      include_archived,
    } = req.query;
    // Issue #119 — clamp pagination so an attacker (or a buggy client) can't
    // request ?limit=999999999 and OOM the API while it serialises rows.
    const MAX_LIMIT = 500;
    const limit = Math.max(1, Math.min(MAX_LIMIT, parseInt(rawLimit, 10) || 50));
    const offset = Math.max(0, parseInt(rawOffset, 10) || 0);

    // Filters are collected as SQL fragments; bind() appends a value and
    // returns its positional placeholder so numbering can never drift.
    const conditions = [];
    const params = [];
    const bind = (value) => {
      params.push(value);
      return `$${params.length}`;
    };

    // Scope to projects the caller can access (admins are unfiltered). Without
    // this, a granted user would see every asset across every project.
    const access = await accessibleProjectIds(req.user);
    if (!access.all) {
      if (access.ids.size === 0) return res.json({ assets: [], total: 0 });
      conditions.push(`a.project_id = ANY(${bind([...access.ids])}::uuid[])`);
    }
    // Exclude archived unless explicitly requested — independent of status filter
    if (include_archived !== 'true') conditions.push(`a.status <> 'archived'`);
    if (project_id) conditions.push(`a.project_id = ${bind(project_id)}`);
    if (bin_id) conditions.push(`a.bin_id = ${bind(bin_id)}`);
    if (status) conditions.push(`a.status = ${bind(status)}`);
    if (media_type) conditions.push(`a.media_type = ${bind(media_type)}`);
    if (search) {
      const pattern = bind(`%${search}%`);
      conditions.push(
        `(a.display_name ILIKE ${pattern} OR a.filename ILIKE ${pattern} OR a.notes ILIKE ${pattern})`
      );
    }

    const whereSql = conditions.length > 0 ? `WHERE ${conditions.join(' AND ')}` : '';
    // Snapshot the filter-only params before LIMIT/OFFSET are bound; the
    // fallback count query below must receive exactly the params it references.
    const filterParams = [...params];
    const limitRef = bind(limit);
    const offsetRef = bind(offset);

    const result = await pool.query(
      `SELECT a.*,
              COUNT(*) OVER() AS full_count
         FROM assets a
         ${whereSql}
        ORDER BY a.created_at DESC
        LIMIT ${limitRef} OFFSET ${offsetRef}`,
      params
    );

    let total = 0;
    if (result.rows.length > 0) {
      total = parseInt(result.rows[0].full_count, 10);
    } else if (offset > 0) {
      // The window count rides on returned rows, so a page past the end would
      // report total 0 even though matches exist. Count explicitly instead.
      const counted = await pool.query(
        `SELECT COUNT(*) AS full_count FROM assets a ${whereSql}`,
        filterParams
      );
      total = parseInt(counted.rows[0].full_count, 10);
    }

    res.json({
      // full_count is bookkeeping only — strip it from each returned asset.
      assets: result.rows.map(({ full_count, ...rest }) => rest),
      total,
    });
  } catch (err) {
    next(err);
  }
});
// POST / - Register a new asset from a completed capture session
//
// Body: projectId (required), clipName (required), binId, hiresKey, proxyKey,
// duration (seconds), capturedAt, sourceType, needsProxy.
// Responds 201 with the inserted row, 400 on bad input, 409 when a 'live'
// asset with the same name already exists in the project.
router.post('/', async (req, res, next) => {
  try {
    const {
      projectId,
      binId,
      clipName,
      hiresKey,
      proxyKey,
      duration,
      capturedAt,
      sourceType, // Bug #64: was ignored — now used to set media_type
      needsProxy, // Bug #64: was ignored — now controls proxy queue logic
    } = req.body;
    if (!projectId || !clipName) {
      return res.status(400).json({ error: 'projectId and clipName are required' });
    }
    // Registering an asset writes into a project — require edit access there.
    await assertProjectAccess(req.user, projectId, 'edit');

    // binId (if any) must belong to the target project — the same rule PATCH
    // and copy enforce, so an editor in project A cannot file a new asset into
    // project B's bin tree. Compared case-insensitively because projectId is
    // client-supplied text.
    if (binId) {
      const bin = await pool.query('SELECT project_id FROM bins WHERE id = $1', [binId]);
      if (bin.rows.length === 0) return res.status(400).json({ error: 'binId not found' });
      if (String(bin.rows[0].project_id).toLowerCase() !== String(projectId).toLowerCase()) {
        return res.status(400).json({ error: 'binId belongs to a different project' });
      }
    }

    const durationNum = duration !== undefined && duration !== null ? Number(duration) : null;
    if (durationNum !== null && !Number.isFinite(durationNum)) {
      return res.status(400).json({ error: 'duration must be a finite number (seconds)' });
    }
    // Stored in milliseconds; the request carries seconds.
    const durationMs = durationNum !== null ? Math.round(durationNum * 1000) : null;

    const existing = await pool.query(
      `SELECT * FROM assets
       WHERE project_id = $1 AND display_name = $2 AND status = 'live'
       ORDER BY created_at DESC LIMIT 1`,
      [projectId, clipName]
    );
    // Bug #63: refuse to overwrite a live asset — return 409 with the existing row
    if (existing.rows.length > 0) {
      return res.status(409).json({
        error: 'A live asset with this name already exists',
        asset: existing.rows[0],
      });
    }

    const id = uuidv4();
    // Bug #64: use sourceType to set media_type (default 'video')
    const mediaType = (sourceType === 'audio') ? 'audio' : 'video';
    // clipName ($4) is used for both filename and display_name.
    const ins = await pool.query(
      `INSERT INTO assets (
         id, project_id, bin_id,
         filename, display_name,
         status, media_type,
         original_s3_key, proxy_s3_key,
         duration_ms,
         created_at, updated_at
       )
       VALUES (
         $1, $2, $3,
         $4, $4,
         'processing', $9,
         $5, $6,
         $7,
         COALESCE($8::timestamptz, NOW()), NOW()
       )
       RETURNING *`,
      [
        id, projectId, binId || null,
        clipName,
        hiresKey || null, proxyKey || null,
        durationMs,
        capturedAt || null,
        mediaType,
      ]
    );
    const asset = ins.rows[0];

    const thumbnailKey = `thumbnails/${id}.jpg`;
    // Bug #64: when needsProxy is explicitly false and proxyKey is already set,
    // skip re-queuing a proxy job and mark the asset ready immediately.
    if (needsProxy === false && proxyKey) {
      await pool.query(`UPDATE assets SET status = 'ready', updated_at = NOW() WHERE id = $1`, [id]);
      asset.status = 'ready';
    } else if (proxyKey) {
      // A proxy was supplied — only the thumbnail still needs building.
      await thumbnailQueue.add('generate', { assetId: id, proxyKey, outputKey: thumbnailKey });
    } else if (asset.original_s3_key) {
      // Only the hi-res original exists — queue a proxy generated from it.
      const generatedProxyKey = `proxies/${id}.mp4`;
      await proxyQueue.add('generate', {
        assetId: id, inputKey: asset.original_s3_key, outputKey: generatedProxyKey,
      });
      console.log(`[assets] queued proxy for ${id} (${asset.display_name})`);
    } else {
      // No media keys at all — nothing to process.
      await pool.query(`UPDATE assets SET status = 'ready', updated_at = NOW() WHERE id = $1`, [id]);
      asset.status = 'ready';
    }
    res.status(201).json(asset);
  } catch (err) {
    // Unique constraint violation from the partial index (migration 017) — two
    // concurrent captures raced through the SELECT before either INSERT landed.
    if (err.code === '23505' && err.constraint === 'idx_assets_live_unique') {
      return res.status(409).json({ error: 'A live asset with this name already exists (concurrent capture race)' });
    }
    next(err);
  }
});
// POST /cleanup-live — cross-project maintenance, admin only.
// Flips every asset that is still 'live' and older than ?max_age_hours
// (default 4, minimum 1) to 'error', and reports the affected rows.
router.post('/cleanup-live', requireAdmin, async (req, res, next) => {
  try {
    // A missing, empty, or non-numeric value falls back to the default rather
    // than letting NaN reach the SQL interval arithmetic.
    const requestedHours = Number.parseInt(req.query.max_age_hours, 10);
    const maxAgeHours = Number.isNaN(requestedHours) ? 4 : Math.max(1, requestedHours);
    const result = await pool.query(
      `UPDATE assets SET status = 'error', updated_at = NOW()
       WHERE status = 'live' AND created_at < NOW() - ($1 * INTERVAL '1 hour')
       RETURNING id, display_name, project_id, created_at`,
      [maxAgeHours]
    );
    res.json({ cleaned: result.rowCount, assets: result.rows });
  } catch (err) { next(err); }
});
// POST /cleanup-live-orphans — cross-project maintenance, admin only.
// Scans LIVE_DIR (default /live) for UUID-named directories and removes those
// whose name does not match any asset id. Responds with
// { reaped, kept, dirs } where dirs lists the removed directory names.
router.post('/cleanup-live-orphans', requireAdmin, async (_req, res, next) => {
  try {
    const liveRoot = process.env.LIVE_DIR || '/live';
    let entries;
    try {
      entries = await fs.readdir(liveRoot, { withFileTypes: true });
    } catch (err) {
      // No live root at all means there is nothing to clean.
      if (err.code === 'ENOENT') return res.json({ reaped: 0, kept: 0, dirs: [] });
      throw err;
    }
    const uuidRe = /^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$/i;
    const dirIds = entries.filter(e => e.isDirectory() && uuidRe.test(e.name)).map(e => e.name);
    if (dirIds.length === 0) return res.json({ reaped: 0, kept: 0, dirs: [] });
    const known = await pool.query('SELECT id FROM assets WHERE id = ANY($1::uuid[])', [dirIds]);
    // The regex above accepts upper-case hex, so normalise both sides before
    // comparing — otherwise a known asset's upper-case directory would fail the
    // case-sensitive Set lookup and be deleted as an orphan.
    const keep = new Set(known.rows.map(r => String(r.id).toLowerCase()));
    const reaped = [];
    const kept = [];
    for (const id of dirIds) {
      if (keep.has(id.toLowerCase())) { kept.push(id); continue; }
      const fullPath = path.join(liveRoot, id);
      try {
        await fs.rm(fullPath, { recursive: true, force: true });
        reaped.push(id);
        console.log(`[assets] reaped orphan live dir ${fullPath}`);
      } catch (err) {
        // Best-effort: a directory that cannot be removed is logged and skipped.
        console.warn(`[assets] failed to reap ${fullPath}: ${err.message}`);
      }
    }
    res.json({ reaped: reaped.length, kept: kept.length, dirs: reaped });
  } catch (err) { next(err); }
});
// GET /:id — return the full asset row, or 404 if it no longer exists.
router.get('/:id', async (req, res, next) => {
  try {
    const { rows } = await pool.query('SELECT * FROM assets WHERE id = $1', [req.params.id]);
    if (rows.length === 0) {
      res.status(404).json({ error: 'Asset not found' });
      return;
    }
    const [asset] = rows;
    res.json(asset);
  } catch (err) {
    next(err);
  }
});
// PATCH /:id — partial update of display_name, tags, notes and/or bin_id.
// Only fields present in the body (not undefined) are written; updated_at is
// always refreshed. Responds with the updated row.
router.patch('/:id', requireAssetEdit, async (req, res, next) => {
  try {
    const assetId = req.params.id;
    const { display_name, tags, notes, bin_id } = req.body;

    // bin_id must reference a bin in the asset's OWN project — otherwise an
    // editor in project A could stuff their asset into project B's bin tree.
    // Null/empty clears the bin, which is always allowed.
    if (bin_id) {
      const { rows: binRows } = await pool.query('SELECT project_id FROM bins WHERE id = $1', [bin_id]);
      if (binRows.length === 0) {
        return res.status(400).json({ error: 'bin_id not found' });
      }
      if (binRows[0].project_id !== req.assetProjectId) {
        return res.status(400).json({ error: 'bin_id belongs to a different project' });
      }
    }

    // Build the SET clause incrementally; each assignment's placeholder index
    // is simply the position of its value in the parameter list.
    const assignments = [];
    const values = [];
    const assign = (column, value) => {
      values.push(value);
      assignments.push(`${column} = $${values.length}`);
    };
    if (display_name !== undefined) assign('display_name', display_name);
    if (tags !== undefined) assign('tags', tags);
    if (notes !== undefined) assign('notes', notes);
    if (bin_id !== undefined) assign('bin_id', bin_id || null);

    if (assignments.length === 0) {
      return res.status(400).json({ error: 'No fields to update' });
    }
    assignments.push(`updated_at = NOW()`);
    values.push(assetId);

    const { rows } = await pool.query(
      `UPDATE assets SET ${assignments.join(', ')} WHERE id = $${values.length} RETURNING *`, values
    );
    if (rows.length === 0) {
      return res.status(404).json({ error: 'Asset not found' });
    }
    res.json(rows[0]);
  } catch (err) {
    next(err);
  }
});
// POST /:id/copy — duplicate an asset row, optionally into another project/bin.
// Body: projectId (defaults to the source's project), binId (undefined keeps
// the source's bin; null/empty clears it). Responds 201 with the new row.
router.post('/:id/copy', requireAssetEdit, async (req, res, next) => {
  try {
    const { id } = req.params;
    const { binId, projectId } = req.body;
    const r = await pool.query('SELECT * FROM assets WHERE id = $1', [id]);
    if (r.rows.length === 0) return res.status(404).json({ error: 'Asset not found' });
    const src = r.rows[0];
    // Destination project defaults to source's. If the caller overrides it,
    // assert edit on the target — without this, an editor in project A could
    // clone any asset they can see into project B with no grant on B.
    const destProjectId = projectId || src.project_id;
    if (projectId && projectId !== src.project_id) {
      await assertProjectAccess(req.user, destProjectId, 'edit');
    }
    // Destination bin (if any) must belong to the destination project — same
    // class of bug as the PATCH bin_id hole.
    const destBinId = binId === undefined ? src.bin_id : (binId || null);
    if (destBinId) {
      const bin = await pool.query('SELECT project_id FROM bins WHERE id = $1', [destBinId]);
      if (bin.rows.length === 0) return res.status(400).json({ error: 'binId not found' });
      if (bin.rows[0].project_id !== destProjectId) {
        return res.status(400).json({ error: 'binId belongs to a different project than the destination' });
      }
    }
    const newId = uuidv4();
    // Bug #60: null out proxy_s3_key and thumbnail_s3_key on the copy to avoid
    // sharing S3 objects with the source. Set status to 'processing' so the copy
    // gets its own proxy generated. Re-queue proxy generation below if source exists.
    const ins = await pool.query(
      `INSERT INTO assets (
         id, project_id, bin_id, filename, display_name,
         status, media_type, original_s3_key, proxy_s3_key, thumbnail_s3_key,
         codec, resolution, fps, duration_ms, start_tc, file_size, tags, notes,
         created_at, updated_at
       ) VALUES (
         $1,$2,$3,$4,$5,$6,$7,$8,NULL,NULL,$9,$10,$11,$12,$13,$14,$15,$16,NOW(),NOW()
       ) RETURNING *`,
      [
        newId, destProjectId,
        destBinId,
        src.filename, src.display_name, 'processing', src.media_type,
        src.original_s3_key,
        src.codec, src.resolution, src.fps, src.duration_ms, src.start_tc,
        src.file_size, src.tags, src.notes,
      ]
    );
    const copy = ins.rows[0];
    // Re-queue proxy generation from original_s3_key so the copy gets its own proxy
    if (copy.original_s3_key) {
      const newProxyKey = `proxies/${newId}.mp4`;
      await proxyQueue.add('generate', {
        assetId: newId, inputKey: copy.original_s3_key, outputKey: newProxyKey,
      });
      // Log the real input (the original) and the output key; the previous
      // message labelled the output proxy key as the source.
      console.log(`[assets] queued proxy for copy ${newId} from ${copy.original_s3_key} -> ${newProxyKey}`);
    } else {
      // No source to transcode from — mark ready immediately
      await pool.query(`UPDATE assets SET status = 'ready', updated_at = NOW() WHERE id = $1`, [newId]);
      copy.status = 'ready';
    }
    res.status(201).json(copy);
  } catch (err) { next(err); }
});
// POST /:id/mark-empty — flag a recording that produced no frames.
// Outcome depends on the asset's current status:
//   error / ready      -> 200 { id, skipped: true } (nothing to do)
//   live / processing  -> status set to 'error' with a note appended, 200 { id }
//   anything else      -> 409 with the offending status
router.post('/:id/mark-empty', requireAssetEdit, async (req, res, next) => {
  try {
    const { id } = req.params;
    // Bug #66: look the asset up first so the response can reflect its status.
    const { rows } = await pool.query(`SELECT id, status FROM assets WHERE id = $1`, [id]);
    if (rows.length === 0) {
      return res.status(404).json({ error: 'Asset not found' });
    }
    const current = rows[0].status;

    switch (current) {
      case 'error':
      case 'ready':
        // Already terminal — report success without touching the row.
        return res.status(200).json({ id, skipped: true });

      case 'live':
      case 'processing':
        // 'processing' is accepted too (race condition on shutdown).
        await pool.query(
          `UPDATE assets
           SET status = 'error',
               notes = COALESCE(notes || E'\\n', '') || 'Recording produced no frames — source never connected.',
               updated_at = NOW()
           WHERE id = $1`,
          [id]
        );
        return res.json({ id });

      default:
        // Any other status (e.g. 'archived') is incompatible.
        return res.status(409).json({
          error: `Cannot mark-empty an asset with status '${current}'`,
          status: current,
        });
    }
  } catch (err) {
    next(err);
  }
});
// POST /:id/finalize
// Called by the capture sidecar on a SUCCESSFUL recording stop to finalise the
// pre-created 'live' asset (created at recorder start, id passed as ASSET_ID).
// The sidecar used to POST / to create a NEW asset, which collided with the
// existing live row -> 409 -> asset stuck 'live', no jobs. Finalising by id
// moves the row to 'processing', records duration + S3 keys when supplied, and
// starts the proxy -> thumbnail -> filmstrip job chain.
// Body: hiresKey, proxyKey, duration (seconds) — all optional.
router.post('/:id/finalize', requireAssetEdit, async (req, res, next) => {
  try {
    const assetId = req.params.id;
    const { hiresKey, proxyKey, duration } = req.body;

    const lookup = await pool.query(`SELECT * FROM assets WHERE id = $1`, [assetId]);
    if (lookup.rows.length === 0) {
      return res.status(404).json({ error: 'Asset not found' });
    }
    const [existing] = lookup.rows;
    // Already terminal — idempotent no-op (handles shutdown retries).
    if (['ready', 'error'].includes(existing.status)) {
      return res.status(200).json({ ...existing, skipped: true });
    }

    // Seconds -> milliseconds. A missing or non-finite duration becomes null,
    // which leaves the stored duration_ms untouched via COALESCE below.
    let durationMs = null;
    if (duration !== undefined && duration !== null) {
      const seconds = Number(duration);
      if (Number.isFinite(seconds)) durationMs = Math.round(seconds * 1000);
    }

    const updated = await pool.query(
      `UPDATE assets
       SET status = 'processing',
           original_s3_key = COALESCE($2, original_s3_key),
           proxy_s3_key = COALESCE($3, proxy_s3_key),
           duration_ms = COALESCE($4, duration_ms),
           updated_at = NOW()
       WHERE id = $1
       RETURNING *`,
      [assetId, hiresKey || null, proxyKey || null, durationMs]
    );
    const [asset] = updated.rows;

    if (asset.proxy_s3_key) {
      // Proxy already produced by the capture sidecar — just build the
      // thumbnail (which then chains filmstrip). Worker flips status->ready.
      await thumbnailQueue.add('generate', {
        assetId,
        proxyKey: asset.proxy_s3_key,
        outputKey: `thumbnails/${assetId}.jpg`,
      });
      console.log(`[assets] finalize ${assetId}: queued thumbnail (proxy present)`);
    } else if (asset.original_s3_key) {
      // No proxy yet — generate it from the hi-res master. The proxy worker
      // chains thumbnail -> filmstrip on completion.
      await proxyQueue.add('generate', {
        assetId,
        inputKey: asset.original_s3_key,
        outputKey: `proxies/${assetId}.mp4`,
      });
      console.log(`[assets] finalize ${assetId}: queued proxy from master`);
    } else {
      // Neither key is set, so no job is queued — mark the asset ready now.
      await pool.query(`UPDATE assets SET status = 'ready', updated_at = NOW() WHERE id = $1`, [assetId]);
      asset.status = 'ready';
    }

    console.log(`[assets] finalized live asset ${assetId} (${asset.display_name}) -> ${asset.status}`);
    res.json(asset);
  } catch (err) {
    next(err);
  }
});
// POST /:id/generate-proxy — queue proxy generation from the hi-res original
// and flip the asset to 'processing'. Reuses the existing proxy key as the
// output location when one is set. Responds with the updated row.
router.post('/:id/generate-proxy', requireAssetEdit, async (req, res, next) => {
  try {
    const { id } = req.params;
    const { rows } = await pool.query('SELECT * FROM assets WHERE id = $1', [id]);
    if (rows.length === 0) {
      return res.status(404).json({ error: 'Asset not found' });
    }
    const [asset] = rows;
    const inputKey = asset.original_s3_key;
    if (!inputKey) {
      return res.status(400).json({ error: 'Asset has no hi-res source to proxy from' });
    }
    const outputKey = asset.proxy_s3_key || `proxies/${id}.mp4`;
    await proxyQueue.add('generate', { assetId: id, inputKey, outputKey });

    const { rows: updatedRows } = await pool.query(
      `UPDATE assets SET status = 'processing', updated_at = NOW() WHERE id = $1 RETURNING *`, [id]
    );
    res.json(updatedRows[0]);
  } catch (err) {
    next(err);
  }
});
// POST /backfill-proxies — cross-project maintenance, admin only.
// Queues a proxy job for every 'ready' asset that has an original but no
// proxy key, then marks all of them 'processing'. Responds { queued: <count> }.
router.post('/backfill-proxies', requireAdmin, async (_req, res, next) => {
  try {
    const { rows: targets } = await pool.query(
      `SELECT id, original_s3_key FROM assets
       WHERE status = 'ready' AND original_s3_key IS NOT NULL
         AND (proxy_s3_key IS NULL OR proxy_s3_key = '')
       ORDER BY created_at DESC`
    );

    // Jobs are enqueued one at a time, in query order.
    for (const { id: assetId, original_s3_key: inputKey } of targets) {
      await proxyQueue.add('generate', { assetId, inputKey, outputKey: `proxies/${assetId}.mp4` });
    }

    if (targets.length > 0) {
      const ids = targets.map(({ id }) => id);
      await pool.query(
        `UPDATE assets SET status = 'processing', updated_at = NOW() WHERE id = ANY($1)`,
        [ids]
      );
    }
    res.json({ queued: targets.length });
  } catch (err) {
    next(err);
  }
});
// POST /:id/reprocess?type=proxy|thumbnail|filmstrip|hls  (default: proxy)
// Force-requeue a processing job regardless of current asset status. Only the
// proxy variant also resets the asset's status to 'processing'.
router.post('/:id/reprocess', requireAssetEdit, async (req, res, next) => {
  try {
    const assetId = req.params.id;
    const type = req.query.type || 'proxy';
    const allowedTypes = ['proxy', 'thumbnail', 'filmstrip', 'hls'];
    if (!allowedTypes.includes(type)) {
      return res.status(400).json({ error: 'type must be "proxy", "thumbnail", "filmstrip", or "hls"' });
    }

    const found = await pool.query('SELECT * FROM assets WHERE id = $1', [assetId]);
    if (found.rows.length === 0) {
      return res.status(404).json({ error: 'Asset not found' });
    }
    const [asset] = found.rows;

    switch (type) {
      case 'proxy': {
        if (!asset.original_s3_key) {
          return res.status(400).json({ error: 'Asset has no source file' });
        }
        await proxyQueue.add('generate', {
          assetId,
          inputKey: asset.original_s3_key,
          outputKey: `proxies/${assetId}.mp4`,
        });
        await pool.query(`UPDATE assets SET status = 'processing', updated_at = NOW() WHERE id = $1`, [assetId]);
        return res.json({ queued: 'proxy', assetId });
      }
      case 'thumbnail': {
        if (!asset.proxy_s3_key) {
          return res.status(400).json({ error: 'Asset has no proxy' });
        }
        await thumbnailQueue.add('generate', {
          assetId,
          proxyKey: asset.proxy_s3_key,
          outputKey: `thumbnails/${assetId}.jpg`,
        });
        return res.json({ queued: 'thumbnail', assetId });
      }
      case 'filmstrip': {
        if (!asset.proxy_s3_key) {
          return res.status(400).json({ error: 'Asset has no proxy — generate proxy first' });
        }
        await filmstripQueue.add('generate', { assetId, proxyKey: asset.proxy_s3_key });
        return res.json({ queued: 'filmstrip', assetId });
      }
      case 'hls': {
        // Backfill: remux the existing proxy MP4 into an HLS rendition (no re-encode).
        if (!asset.proxy_s3_key) {
          return res.status(400).json({ error: 'Asset has no proxy — generate proxy first' });
        }
        await hlsQueue.add('generate', { assetId, proxyKey: asset.proxy_s3_key });
        return res.json({ queued: 'hls', assetId });
      }
      default:
        // Not reachable: type was validated against allowedTypes above.
    }
  } catch (err) {
    next(err);
  }
});
// GET /:id/filmstrip — signed URL for the pre-built filmstrip JSON.
// Responds { url, ready: true } when a filmstrip key is stored on the asset,
// otherwise { url: null, ready: false }.
router.get('/:id/filmstrip', async (req, res, next) => {
  try {
    const { rows } = await pool.query('SELECT filmstrip_s3_key FROM assets WHERE id = $1', [req.params.id]);
    if (rows.length === 0) {
      return res.status(404).json({ error: 'Asset not found' });
    }
    const filmstripKey = rows[0].filmstrip_s3_key;
    if (!filmstripKey) {
      return res.json({ url: null, ready: false });
    }
    const url = await getSignedUrlForObject(filmstripKey);
    res.json({ url, ready: true });
  } catch (err) {
    next(err);
  }
});
// POST /:id/retry — re-run proxy generation for an asset that is in 'error'
// or has no proxy yet. Requires a source file; responds with the updated row
// after flipping it to 'processing'.
router.post('/:id/retry', requireAssetEdit, async (req, res, next) => {
  try {
    const { id } = req.params;
    const { rows } = await pool.query('SELECT * FROM assets WHERE id = $1', [id]);
    if (rows.length === 0) {
      return res.status(404).json({ error: 'Asset not found' });
    }
    const [asset] = rows;
    if (!asset.original_s3_key) {
      return res.status(400).json({ error: 'Asset has no source file to reprocess' });
    }
    // Nothing to do for a non-errored asset that already has its proxy.
    if (asset.status !== 'error' && asset.proxy_s3_key) {
      return res.status(400).json({ error: `Nothing to retry — asset is ${asset.status} and already has a proxy.` });
    }

    const outputKey = asset.proxy_s3_key || `proxies/${id}.mp4`;
    await proxyQueue.add('generate', { assetId: id, inputKey: asset.original_s3_key, outputKey });

    const { rows: updatedRows } = await pool.query(
      `UPDATE assets SET status = 'processing', updated_at = NOW() WHERE id = $1 RETURNING *`, [id]
    );
    res.json(updatedRows[0]);
  } catch (err) {
    next(err);
  }
});
// DELETE /:id — soft-delete by default (status -> 'archived', row returned).
// With ?hard=true the asset is removed permanently: pending queue jobs for it
// are dropped, its stored S3 objects are deleted best-effort, and the row is
// deleted. S3 failures do not abort the delete; they are reported in the
// response as s3Errors.
router.delete('/:id', requireAssetEdit, async (req, res, next) => {
  try {
    const { id } = req.params;
    const { hard } = req.query;
    if (hard === 'true') {
      const assetResult = await pool.query('SELECT * FROM assets WHERE id = $1', [id]);
      if (assetResult.rows.length === 0) return res.status(404).json({ error: 'Asset not found' });
      const asset = assetResult.rows[0];
      // Remove any pending/waiting BullMQ jobs for this asset before deleting
      // the row — prevents workers from receiving jobs for a non-existent asset.
      // Covers every queue this file enqueues assetId-keyed jobs on, including
      // filmstrip and HLS (previously only proxy and thumbnail were swept).
      for (const queue of [proxyQueue, thumbnailQueue, filmstripQueue, hlsQueue]) {
        try {
          const waiting = await queue.getJobs(['waiting', 'delayed', 'prioritized']);
          for (const job of waiting) {
            if (job.data?.assetId === id) await job.remove();
          }
        } catch (e) {
          // Best-effort: a queue hiccup must not block the delete itself.
          console.warn(`[assets] BullMQ cleanup failed for asset ${id}:`, e.message);
        }
      }
      const s3Errors = [];
      // filmstrip_s3_key is included so the filmstrip object is not orphaned.
      // NOTE(review): the HLS rendition (hls_s3_key and its sibling segment
      // objects) is not removed here — that needs a prefix delete, which the
      // single-key deleteObject helper used below does not provide.
      const keysToDelete = [
        asset.proxy_s3_key,
        asset.thumbnail_s3_key,
        asset.filmstrip_s3_key,
        asset.original_s3_key,
      ];
      for (const key of keysToDelete) {
        if (!key) continue;
        try {
          await deleteObject(key);
        } catch (e) {
          s3Errors.push({ key, error: e.message });
          console.warn(`[assets] s3 delete failed for ${key}:`, e.message);
        }
      }
      await pool.query('DELETE FROM assets WHERE id = $1', [id]);
      res.json({ message: 'Asset deleted permanently', ...(s3Errors.length ? { s3Errors } : {}) });
    } else {
      const result = await pool.query(
        `UPDATE assets SET status = 'archived', updated_at = NOW() WHERE id = $1 RETURNING *`, [id]
      );
      if (result.rows.length === 0) return res.status(404).json({ error: 'Asset not found' });
      res.json(result.rows[0]);
    }
  } catch (err) { next(err); }
});
// GET /:id/stream — tell the client where to fetch playable media from.
// Live assets get the live HLS playlist path. Otherwise the response points at
// the /video passthrough (plus hls_url when an HLS rendition exists), or
// reports { url: null, reason: 'no_proxy' } when nothing playable is stored.
router.get('/:id/stream', async (req, res, next) => {
  try {
    const { id } = req.params;
    const { rows } = await pool.query('SELECT * FROM assets WHERE id = $1', [id]);
    if (rows.length === 0) {
      return res.status(404).json({ error: 'Asset not found' });
    }
    const [a] = rows;

    if (a.status === 'live') {
      return res.json({ url: `/live/${a.id}/index.m3u8`, type: 'hls', live: true });
    }

    const videoUrl = `/api/v1/assets/${id}/video`;
    const source = a.proxy_s3_key ? 'proxy' : 'original';

    // `url` is the directly-downloadable MP4 proxy; `hls_url` is the HLS
    // rendition for in-browser playback (whole-file segment GETs avoid the
    // RustFS ranged-GET stitching the MP4 path needs). The Premiere plugin
    // downloads `url` to a file and imports it, so `url` must NOT be the
    // .m3u8 playlist — Premiere can't import a playlist ("unsupported
    // compression type"). The web player prefers `hls_url` when present.
    if (a.hls_s3_key) {
      return res.json({
        url: videoUrl,
        type: 'mp4',
        source,
        hls_url: `/api/v1/assets/${id}/hls/playlist.m3u8`,
      });
    }

    // Without a proxy, the original is only offered when its key ends in a
    // known video extension.
    const VIDEO_EXTS = ['.mp4', '.mov', '.mxf', '.ts', '.m4v', '.mkv', '.avi', '.webm'];
    const originalIsVideo = Boolean(a.original_s3_key)
      && VIDEO_EXTS.some((ext) => a.original_s3_key.toLowerCase().endsWith(ext));
    const playableKey = a.proxy_s3_key || (originalIsVideo ? a.original_s3_key : null);

    if (playableKey) {
      return res.json({ url: videoUrl, type: 'mp4', source });
    }
    return res.json({ url: null, type: null, reason: 'no_proxy', has_source: !!a.original_s3_key });
  } catch (err) {
    next(err);
  }
});
// GET /:id/hls/:file — serve an HLS rendition file (playlist / init / segment).
// Whole-object passthrough from S3: no Range handling, so this sidesteps the
// RustFS ranged-GET bug entirely (every segment is a small, complete GET).
// :file is strictly validated to prevent path traversal into the bucket.
const HLS_FILE_RE = /^(playlist\.m3u8|init\.mp4|segment_\d+\.m4s)$/;
router.get('/:id/hls/:file', async (req, res, next) => {
  try {
    const { id, file } = req.params;
    if (!HLS_FILE_RE.test(file)) return res.status(400).json({ error: 'Invalid HLS file' });
    const r = await pool.query('SELECT hls_s3_key FROM assets WHERE id = $1', [id]);
    if (r.rows.length === 0) return res.status(404).json({ error: 'Asset not found' });
    const playlistKey = r.rows[0].hls_s3_key;
    if (!playlistKey) return res.status(404).json({ error: 'No HLS rendition for this asset' });
    // Derive the prefix from the stored playlist key (hls/<id>/playlist.m3u8)
    // and request the specific file under it.
    const prefix = playlistKey.replace(/\/[^/]+$/, '');
    const key = `${prefix}/${file}`;
    const isPlaylist = file.endsWith('.m3u8');
    const s3Res = await s3Client.send(new GetObjectCommand({ Bucket: getS3Bucket(), Key: key }));
    res.writeHead(200, {
      'Content-Type': isPlaylist ? 'application/vnd.apple.mpegurl' : 'video/mp4',
      // Playlists are never cached; media files are cacheable for an hour.
      'Cache-Control': isPlaylist ? 'no-cache' : 'private, max-age=3600',
      ...(s3Res.ContentLength ? { 'Content-Length': String(s3Res.ContentLength) } : {}),
    });
    const body = s3Res.Body;
    // pipe() does not forward source errors. Without a listener, an S3 read
    // failure mid-transfer is an unhandled 'error' event; with one, we abort
    // the (already started) response instead.
    body.on('error', (streamErr) => {
      console.warn(`[assets] HLS stream failed for ${key}: ${streamErr.message}`);
      res.destroy(streamErr);
    });
    // If the client goes away early, stop pulling from S3.
    res.on('close', () => {
      if (!res.writableFinished) body.destroy();
    });
    body.pipe(res);
  } catch (err) {
    // Headers already on the wire — a JSON error body is no longer possible.
    if (res.headersSent) return res.destroy();
    if (err && (err.name === 'NoSuchKey' || err.$metadata?.httpStatusCode === 404)) {
      return res.status(404).json({ error: 'HLS file not found' });
    }
    next(err);
  }
});
// GET /:id/live-path — where an editor can open the still-growing master.
// Only valid while the asset is 'live' (409 otherwise). Builds the SMB URL,
// Windows UNC path and POSIX path from the `growing_smb_url` setting plus
// <project_id>/<display_name>.<ext>.
router.get('/:id/live-path', async (req, res, next) => {
  try {
    const { id } = req.params;
    const lookup = await pool.query('SELECT id, project_id, display_name, status FROM assets WHERE id = $1', [id]);
    if (lookup.rows.length === 0) {
      return res.status(404).json({ error: 'Asset not found' });
    }
    const [asset] = lookup.rows;
    if (asset.status !== 'live') {
      return res.status(409).json({ error: 'Asset is not currently growing', status: asset.status });
    }

    // Growing-files mode is now per-recorder (recorders.growing_enabled), so we
    // no longer gate on the removed global `growing_enabled` setting. A
    // status='live' asset already proves a growing recorder is producing this
    // file; we only need the editor-facing SMB URL to build the UNC path.
    const settings = await pool.query(`SELECT key, value FROM settings WHERE key = 'growing_smb_url'`);
    // If several rows come back, the last one wins.
    const smbSetting = settings.rows.length > 0
      ? settings.rows[settings.rows.length - 1].value
      : undefined;
    if (!smbSetting) {
      return res.status(409).json({ error: 'No SMB URL configured — set the editor SMB URL in Settings → Storage' });
    }

    // The growing master is ALWAYS MXF OP1a (XDCAM HD422) on the share, regardless
    // of the recorder's configured finalized container — that is the format
    // Premiere supports for edit-while-record growing files (incremental index
    // segments written into body partitions, readable with no footer). The file
    // on the share is `<clip>.mxf`. Keep this in lock-step with GROWING_EXT in
    // services/capture/src/capture-manager.js.
    const ext = 'mxf';
    const smbRoot = smbSetting.replace(/\/+$/, '');
    const segments = [asset.project_id, `${asset.display_name}.${ext}`];

    // UNC form: smb:// becomes a leading double backslash, every '/' in the
    // root becomes a backslash, then the path segments are appended.
    const winRoot = smbRoot.replace(/^smb:\/\//, '\\\\').replace(/\//g, '\\');
    const winPath = `${winRoot}\\${segments.join('\\')}`;
    const posixPath = `${smbRoot.replace(/^smb:\/\//, '//')}/${segments.join('/')}`;

    res.json({
      smb_url: `${smbRoot}/${segments.join('/')}`,
      win_path: winPath,
      posix_path: posixPath,
      project_id: asset.project_id,
      display_name: asset.display_name,
      ext,
    });
  } catch (err) {
    next(err);
  }
});
// GET /:id/video
// Proxies the S3 object through Node with proper cache headers.
// Direct S3 redirect doesn't work because broadcastmgmt.cloud (RustFS/openresty)
// rejects range requests that include an Origin header on presigned URLs — the
// signature only covers 'host', so adding Origin breaks signature validation.
// Instead we pipe through Node with:
// - ETag + Last-Modified for conditional requests (304 on repeat visits)
// - Cache-Control: private, max-age=3600 so the browser caches segments
// and doesn't re-fetch them on every seek within a session
// Issue #143 — RustFS returns empty bodies for ranged GETs whose start offset
// is past ~5.9 MB on single-file proxy MP4s. Confirmed via direct S3 probe:
// HEAD reports correct size, full GET (`bytes=0-`) works perfectly, but
// `bytes=8179166-` returns 206 + the right Content-Range header and a zero-
// byte body. A streaming GET from 0 reads cleanly *through* the broken zone.
//
// Workaround until the proxy worker emits HLS (planned v1.2.1): stream the
// proxy from offset 0, skip bytes the client didn't ask for, stop after the
// requested end. Browser sees a normal 206 + Content-Range. Mem stays flat;
// extra RustFS-to-mam-api bandwidth = (end+1 - actual-range) per seek.
//
// Small head-of-file ranges below RUSTFS_RANGE_SAFE_START are handled by a
// direct ranged GET — saves the streaming-from-0 cost on the common case of
// initial moov + first-segment fetch.
async function* stitchedS3Stream(key, startByte, endByte) {
  // Async generator yielding buffers that together cover exactly the inclusive
  // byte window [startByte, endByte] of the S3 object `key`.
  //
  // RustFS only mis-serves a ranged GET whose *start* offset is large, while a
  // GET that starts at 0 and merely continues past that point reads cleanly.
  // So a single GET for `bytes=0-endByte` is issued and everything below
  // startByte is discarded as it arrives. Only one upstream chunk is held at a
  // time; the cost is (endByte + 1) bytes of storage-to-API bandwidth.
  //
  // Throws if not a single byte of the window could be emitted.
  const command = new GetObjectCommand({
    Bucket: getS3Bucket(),
    Key: key,
    Range: `bytes=0-${endByte}`,
  });
  const response = await s3Client.send(command);

  let offset = 0;   // file offset of the first byte of the next chunk
  let emitted = 0;  // bytes handed to the consumer so far

  for await (const chunk of response.Body) {
    const chunkStart = offset;
    const chunkLast = chunkStart + chunk.length - 1;
    offset = chunkStart + chunk.length;

    // Chunk lies entirely before the requested window: drop it.
    if (chunkLast < startByte) continue;

    // Clip the chunk to the window (indices are relative to the chunk).
    const from = Math.max(0, startByte - chunkStart);
    const to = Math.min(chunk.length, endByte - chunkStart + 1);
    if (to > from) {
      yield chunk.subarray(from, to);
      emitted += to - from;
    }

    // The last requested byte was inside this chunk: stop reading.
    if (chunkLast >= endByte) break;
  }

  if (emitted === 0) {
    throw new Error(`RustFS returned empty body for ${key} bytes=0-${endByte}`);
  }
}
// Streams the asset's proxy (or, failing that, an original whose key has a
// video extension) through this process. Without a usable Range header the
// whole object is sent as 200; a single `bytes=START-END` range is answered
// with 206, either via a direct ranged GET (range entirely below
// RUSTFS_RANGE_SAFE_START) or by streaming from offset 0 and discarding the
// unwanted prefix.
router.get('/:id/video', async (req, res, next) => {
  try {
    const { id } = req.params;
    const r = await pool.query('SELECT proxy_s3_key, original_s3_key FROM assets WHERE id = $1', [id]);
    if (r.rows.length === 0) return res.status(404).json({ error: 'Asset not found' });
    const a = r.rows[0];
    const VIDEO_EXTS = ['.mp4', '.mov', '.mxf', '.ts', '.m4v', '.mkv', '.avi', '.webm'];
    const origIsVideo = a.original_s3_key && VIDEO_EXTS.some(ext => a.original_s3_key.toLowerCase().endsWith(ext));
    const key = a.proxy_s3_key || (origIsVideo ? a.original_s3_key : null);
    if (!key) return res.status(404).json({ error: 'No browser-playable source' });

    // Pipe an S3 body into the response without leaking either side:
    //  - a source 'error' with no listener would surface as an uncaught
    //    exception, so it is logged and the response is torn down instead;
    //  - pipe() does not destroy the source when the destination closes, so
    //    the source is destroyed explicitly once the response is closed.
    const pipeBody = (body) => {
      body.once('error', (err) => {
        console.error(`[video] S3 stream failed for ${key}:`, err.message);
        res.destroy(err);
      });
      res.once('close', () => {
        if (!body.destroyed) body.destroy();
      });
      body.pipe(res);
    };

    // Resolves on 'drain' OR 'close'. Waiting for 'drain' alone never settles
    // when the client disconnects while the response is back-pressured.
    const drainOrClose = () => new Promise((resolve) => {
      const settle = () => {
        res.off('drain', settle);
        res.off('close', settle);
        resolve();
      };
      res.once('drain', settle);
      res.once('close', settle);
    });

    // HEAD the object to learn the true size.
    let totalSize = 0;
    let etag, lastModified;
    try {
      const head = await s3Client.send(new HeadObjectCommand({ Bucket: getS3Bucket(), Key: key }));
      totalSize = head.ContentLength || 0;
      etag = head.ETag;
      lastModified = head.LastModified;
    } catch (_) {
      // HEAD failed — deliberately best-effort: fall back to a plain GET
      // (no range) below, which will surface any real storage error.
    }

    const rangeHeader = req.headers.range;
    // No Range header (or unknown size) → stream the whole object. RustFS
    // handles `bytes=0-` and "no Range" fine; this is the fast path.
    if (!rangeHeader || totalSize === 0) {
      const s3Res = await s3Client.send(new GetObjectCommand({ Bucket: getS3Bucket(), Key: key }));
      const headers = {
        'Content-Type': 'video/mp4',
        'Accept-Ranges': 'bytes',
        'Cache-Control': 'private, max-age=3600',
      };
      if (s3Res.ContentLength) headers['Content-Length'] = String(s3Res.ContentLength);
      if (s3Res.ETag) headers['ETag'] = s3Res.ETag;
      if (s3Res.LastModified) headers['Last-Modified'] = s3Res.LastModified.toUTCString();
      res.writeHead(200, headers);
      pipeBody(s3Res.Body);
      return;
    }

    // Parse `bytes=START-END` / `bytes=START-`. Ignore multi-range.
    const m = /^bytes=(\d+)-(\d*)$/.exec(rangeHeader.trim());
    if (!m) {
      // Unparseable Range — fall back to full body, browser will cope.
      const s3Res = await s3Client.send(new GetObjectCommand({ Bucket: getS3Bucket(), Key: key }));
      res.writeHead(200, { 'Content-Type': 'video/mp4', 'Accept-Ranges': 'bytes' });
      pipeBody(s3Res.Body);
      return;
    }

    let start = parseInt(m[1], 10);
    let end = m[2] === '' ? totalSize - 1 : parseInt(m[2], 10);
    if (!Number.isFinite(start) || start < 0) start = 0;
    if (!Number.isFinite(end) || end >= totalSize) end = totalSize - 1;
    if (start >= totalSize) {
      res.writeHead(416, {
        'Content-Type': 'text/plain',
        'Content-Length': '0',
        'Content-Range': `bytes */${totalSize}`,
        'Accept-Ranges': 'bytes',
        'Cache-Control': 'no-store',
      });
      return res.end();
    }
    if (start > end) start = end;

    const contentLength = end - start + 1;
    const headers = {
      'Content-Type': 'video/mp4',
      'Accept-Ranges': 'bytes',
      'Cache-Control': 'private, max-age=3600',
      'Content-Range': `bytes ${start}-${end}/${totalSize}`,
      'Content-Length': String(contentLength),
    };
    if (etag) headers['ETag'] = etag;
    if (lastModified) headers['Last-Modified'] = lastModified.toUTCString();

    // For small head-of-file ranges (entirely below the broken threshold)
    // a direct ranged GET works and saves the streaming-from-0 cost.
    const RUSTFS_RANGE_SAFE_START = parseInt(process.env.RUSTFS_RANGE_SAFE_START || String(5_500_000), 10);
    if (start < RUSTFS_RANGE_SAFE_START && end < RUSTFS_RANGE_SAFE_START) {
      const s3Res = await s3Client.send(new GetObjectCommand({
        Bucket: getS3Bucket(), Key: key, Range: `bytes=${start}-${end}`,
      }));
      res.writeHead(206, headers);
      pipeBody(s3Res.Body);
      return;
    }

    // Otherwise: stream from offset 0, drop bytes below `start`, stop at
    // `end`. The 206 header is written only once the first chunk is in hand,
    // so an upstream failure before any data can still be reported as a 500.
    let sent = 0;
    try {
      for await (const buf of stitchedS3Stream(key, start, end)) {
        // Client already gone: returning closes the generator (and with it
        // the upstream S3 read).
        if (res.destroyed) return;
        if (!res.headersSent) res.writeHead(206, headers);
        sent += buf.length;
        // res.write returns false when backpressure builds — pause and wait.
        if (!res.write(buf)) {
          await drainOrClose();
        }
        if (res.destroyed) return;
      }
      // A truncated upstream body must not be ended "cleanly": the declared
      // Content-Length would not match what was actually sent.
      if (sent !== contentLength) {
        throw new Error(`short read: got ${sent} of ${contentLength} bytes`);
      }
      res.end();
    } catch (err) {
      console.error(`[video] stitch failed for ${key}:`, err.message);
      if (!res.headersSent) {
        res.writeHead(500, { 'Content-Type': 'text/plain', 'Cache-Control': 'no-store' });
        res.end('Upstream storage error');
      } else {
        res.destroy(err);
      }
    }
  } catch (err) { next(err); }
});
// GET /:id/hires
// Responds with a signed URL for the asset's original object plus a sanitized
// download filename. 404 when the asset row or its original_s3_key is missing.
router.get('/:id/hires', async (req, res, next) => {
  try {
    const { id } = req.params;
    const r = await pool.query('SELECT original_s3_key, filename, display_name, file_size FROM assets WHERE id = $1', [id]);
    if (r.rows.length === 0) return res.status(404).json({ error: 'Asset not found' });
    const a = r.rows[0];
    if (!a.original_s3_key) return res.status(404).json({ error: 'No hi-res source available' });
    const url = await getSignedUrlForObject(a.original_s3_key);
    // Take the extension from the key's final path segment only. Splitting the
    // whole key on '.' would treat a dot inside a directory name as the start
    // of the extension (e.g. "v1.2/clip" -> "2/clip"). S3 keys always use '/',
    // hence path.posix. Falls back to 'mxf' when there is no extension.
    const ext = (path.posix.extname(a.original_s3_key).slice(1) || 'mxf').toLowerCase();
    // Keep only word characters, dots and dashes; cap the base at 100 chars.
    const base = (a.display_name || a.filename || id).replace(/[^\w.-]/g, '_').substring(0, 100);
    res.json({ url, filename: `${base}.${ext}`, ext, file_size: a.file_size || null, type: 'hires' });
  } catch (err) { next(err); }
});
// GET /:id/thumbnail
// Resolves the asset's thumbnail to a signed URL. With `?redirect=1` the
// client is redirected (302) to that URL; otherwise it is returned as JSON.
router.get('/:id/thumbnail', async (req, res, next) => {
  try {
    const { rows } = await pool.query('SELECT thumbnail_s3_key FROM assets WHERE id = $1', [req.params.id]);
    if (rows.length === 0) {
      return res.status(404).json({ error: 'Asset not found' });
    }

    const thumbKey = rows[0].thumbnail_s3_key;
    if (!thumbKey) {
      return res.status(404).json({ error: 'Thumbnail not yet available' });
    }

    const signedUrl = await getSignedUrlForObject(thumbKey);
    const wantsRedirect = req.query.redirect === '1';
    if (wantsRedirect) {
      res.redirect(302, signedUrl);
    } else {
      res.json({ url: signedUrl });
    }
  } catch (err) {
    next(err);
  }
});
// POST /batch-trim
// Validates a batch of clips, checks 'edit' access on every source asset's
// project, records one 'trim' job row (24 h expiry) and, per clip, a
// placeholder temp_segments row plus one 'trim-clip' queue entry.
// Responds 201 with the job id and one { clipInstanceId, status } per clip.
router.post('/batch-trim', async (req, res, next) => {
  try {
    // A missing body is treated like a missing `clips` field (400, not a crash).
    const { clips } = req.body || {};
    if (!Array.isArray(clips) || clips.length === 0) return res.status(400).json({ error: 'clips array is required and must be non-empty' });

    // A clip must be an object; null / primitive entries are rejected here
    // rather than throwing on property access.
    const isValidClip = (c) =>
      c !== null && typeof c === 'object' &&
      Boolean(c.assetId) && Boolean(c.filename) &&
      Number.isFinite(Number(c.sourceInFrames)) && Number.isFinite(Number(c.sourceOutFrames)) &&
      Number.isFinite(Number(c.timelineInFrames)) && Number.isFinite(Number(c.timelineOutFrames)) &&
      Number.isInteger(Number(c.trackIndex)) && Number(c.trackIndex) >= 0;
    if (!clips.every(isValidClip)) {
      return res.status(400).json({ error: 'Each clip must have assetId, filename, sourceInFrames, sourceOutFrames, timelineInFrames, timelineOutFrames, and a non-negative integer trackIndex' });
    }

    // Authorize every source asset's project (edit) before queuing any work.
    const trimAssetIds = [...new Set(clips.map(c => c.assetId))];
    const owning = await pool.query('SELECT id, project_id FROM assets WHERE id = ANY($1::uuid[])', [trimAssetIds]);
    const projById = new Map(owning.rows.map(r => [r.id, r.project_id]));
    for (const aid of trimAssetIds) {
      const pid = projById.get(aid);
      if (!pid) return res.status(404).json({ error: 'Asset not found: ' + aid });
      await assertProjectAccess(req.user, pid, 'edit');
    }

    const jobId = uuidv4();
    const expiresAt = new Date(Date.now() + 24 * 60 * 60 * 1000);
    await pool.query(
      `INSERT INTO jobs (id, type, status, payload, expires_at) VALUES ($1,$2,$3,$4,$5)`,
      [jobId, 'trim', 'queued', JSON.stringify({ clips }), expiresAt]
    );

    const clipResults = [];
    for (const c of clips) {
      const clipInstanceId = uuidv4();
      // Create the placeholder row BEFORE enqueueing so the row exists by the
      // time any worker picks the job up.
      // NOTE(review): presumably the trim worker fills in s3_key on this row —
      // confirm against the worker implementation.
      await pool.query(`INSERT INTO temp_segments (job_id, clip_instance_id, asset_id, s3_key, expires_at) VALUES ($1,$2,$3,'',$4)`, [jobId, clipInstanceId, c.assetId, expiresAt]);
      await trimQueue.add('trim-clip', { jobId, clipInstanceId, assetId: c.assetId, filename: c.filename, sourceInFrames: c.sourceInFrames, sourceOutFrames: c.sourceOutFrames, timelineInFrames: c.timelineInFrames, timelineOutFrames: c.timelineOutFrames, trackIndex: c.trackIndex });
      clipResults.push({ clipInstanceId, status: 'queued' });
    }
    res.status(201).json({ jobId, clips: clipResults });
  } catch (err) { next(err); }
});
// GET /trim-status/:jobId
// Reports a trim job's status together with the state of each of its clips.
// A job whose expires_at is in the past is deleted (with its temp_segments)
// and reported as 404.
router.get('/trim-status/:jobId', async (req, res, next) => {
  try {
    const { jobId } = req.params;

    const { rows: jobRows } = await pool.query('SELECT * FROM jobs WHERE id = $1', [jobId]);
    if (jobRows.length === 0) {
      return res.status(404).json({ error: 'Trim job not found' });
    }
    const job = jobRows[0];

    // Lazy expiry: segments are removed first, then the job row itself.
    const isExpired = job.expires_at && new Date(job.expires_at) < new Date();
    if (isExpired) {
      await pool.query('DELETE FROM temp_segments WHERE job_id = $1', [jobId]);
      await pool.query('DELETE FROM jobs WHERE id = $1', [jobId]);
      return res.status(404).json({ error: 'Trim job expired' });
    }

    const { rows: segments } = await pool.query(`SELECT clip_instance_id, asset_id, s3_key, expires_at FROM temp_segments WHERE job_id = $1 ORDER BY created_at`, [jobId]);

    // A segment with a non-empty s3_key counts as completed; otherwise the
    // clip reports the job's own status.
    const clips = segments.map((seg) => {
      const hasKey = Boolean(seg.s3_key);
      return {
        clipInstanceId: seg.clip_instance_id,
        assetId: seg.asset_id,
        s3Key: hasKey ? seg.s3_key : null,
        status: hasKey ? 'completed' : job.status,
        expiresAt: seg.expires_at,
      };
    });

    res.json({ jobId, status: job.status, clips });
  } catch (err) {
    next(err);
  }
});
// GET /temp-segment-url/:clipInstanceId
// Returns a signed URL for an unexpired temp segment. 404 when no unexpired
// row matches, or when the row's s3_key is still empty.
router.get('/temp-segment-url/:clipInstanceId', async (req, res, next) => {
  try {
    const lookup = await pool.query('SELECT s3_key FROM temp_segments WHERE clip_instance_id = $1 AND expires_at > NOW()', [req.params.clipInstanceId]);
    if (lookup.rows.length === 0) {
      return res.status(404).json({ error: 'Temp segment not found or expired' });
    }

    const s3Key = lookup.rows[0].s3_key;
    if (!s3Key) {
      return res.status(404).json({ error: 'Segment not yet processed' });
    }

    const url = await getSignedUrlForObject(s3Key);
    res.json({ url, s3Key });
  } catch (err) {
    next(err);
  }
});
// GET /:id/audio — return the asset's audio track metadata (the response carries no signed URL)
// Responds with the asset's audio_metadata as `tracks`. When audio_metadata is
// not a non-empty array the reply is `{ tracks: [], hasAudio: false }`.
router.get('/:id/audio', async (req, res, next) => {
  try {
    const { rows } = await pool.query(
      'SELECT id, media_type, proxy_s3_key, original_s3_key, audio_metadata, duration_ms FROM assets WHERE id = $1',
      [req.params.id]
    );
    if (rows.length === 0) {
      return res.status(404).json({ error: 'Asset not found' });
    }

    const asset = rows[0];
    const tracks = asset.audio_metadata;
    const hasTracks = Array.isArray(tracks) && tracks.length > 0;
    if (!hasTracks) {
      return res.json({ tracks: [], hasAudio: false });
    }

    res.json({
      tracks,
      hasAudio: true,
      durationMs: asset.duration_ms || null,
    });
  } catch (err) {
    next(err);
  }
});
export default router;