Root causes found:
1. Scheduler crashing every 15s: the assets table has no error_message column. Fix: remove error_message from the UPDATE in scheduler.js (#66 regression).
2. Clip freezing: the client-side filmstrip seek loop runs on the main thread and seeks the same proxy the player is streaming, so both stall and the clip freezes. Fix: replace the browser seek loop entirely with a server-side FFmpeg worker.
3. No dedicated filmstrip worker: the filmstrip was never pre-built server-side.

Changes:
- services/mam-api/src/db/migrations/018-add-filmstrip-s3-key.sql: add a filmstrip_s3_key TEXT column to the assets table.
- services/worker/src/workers/filmstrip.js (new): BullMQ worker that downloads the proxy, runs the FFmpeg fps filter to extract 28 evenly-spaced JPEG frames, base64-encodes them, uploads the JSON array to S3 at filmstrips/<assetId>.json, and stores the key in the DB.
- services/worker/src/workers/thumbnail.js: queue a filmstrip job automatically after the thumbnail completes.
- services/worker/src/index.js: register the filmstrip worker (concurrency=2), export the filmstripQueue singleton, and close it on SIGTERM.
- services/mam-api/src/routes/assets.js: filmstripQueue added; POST /reprocess?type=filmstrip is now supported; GET /:id/filmstrip returns a signed S3 URL for the JSON frames.
- services/mam-api/src/routes/jobs.js: the filmstrip queue is visible in the Jobs UI.
- services/web-ui/public/screens-asset.jsx: replace the browser seek loop with a fetch of /assets/:id/filmstrip → fetch the S3 JSON → render frames. Zero browser-side video seeking. Right-click and the Files tab re-generate via the API endpoint.
679 lines
29 KiB
JavaScript
679 lines
29 KiB
JavaScript
import express from 'express';
|
|
import { Queue } from 'bullmq';
|
|
import { v4 as uuidv4 } from 'uuid';
|
|
import { promises as fs } from 'node:fs';
|
|
import path from 'node:path';
|
|
import pool from '../db/pool.js';
|
|
import { getSignedUrlForObject, deleteObject, s3Client, getS3Bucket } from '../s3/client.js';
|
|
import { GetObjectCommand } from '@aws-sdk/client-s3';
|
|
import { requireAuth } from '../middleware/auth.js';
|
|
|
|
const router = express.Router();
|
|
|
|
router.use(requireAuth);
|
|
|
|
// BullMQ queue connection (mirrors worker/src/index.js — keep the two in sync).
//
// Turns a Redis URL such as `redis://user:pass@host:6379` into the plain
// connection options object handed to BullMQ.
//   - host: the URL hostname
//   - port: the URL port, or 6379 when the URL does not specify one
//           (previously this produced NaN)
//   - username / password: only present when the URL carries credentials
//           (previously they were dropped)
// Throws a TypeError (from `new URL`) when `url` is not a valid URL.
const parseRedisUrl = (url) => {
  const parsed = new URL(url);

  // `parsed.port` is '' when no port is given; parseInt('') is NaN.
  const port = Number.parseInt(parsed.port, 10);
  const connection = {
    host: parsed.hostname,
    port: Number.isNaN(port) ? 6379 : port,
  };

  // URL userinfo is percent-encoded; decode it before handing it to the client.
  if (parsed.username) connection.username = decodeURIComponent(parsed.username);
  if (parsed.password) connection.password = decodeURIComponent(parsed.password);

  return connection;
};
|
|
|
|
// Build one BullMQ queue handle per job type. Each handle gets its own
// freshly parsed connection options, read from REDIS_URL with a fallback
// to the default compose-network address.
const createQueue = (queueName) =>
  new Queue(queueName, {
    connection: parseRedisUrl(process.env.REDIS_URL || 'redis://queue:6379'),
  });

const proxyQueue = createQueue('proxy');

const thumbnailQueue = createQueue('thumbnail');

const trimQueue = createQueue('trim');

const filmstripQueue = createQueue('filmstrip');
|
|
|
|
// GET / - List assets with filtering
//
// Query params (all optional):
//   project_id, bin_id, status, media_type  exact-match filters
//   search            case-insensitive substring match on display_name,
//                     filename and notes
//   limit, offset     non-negative integers (defaults 50 and 0)
//   include_archived  pass 'true' to include rows whose status is 'archived'
//
// Responds with { assets, total }. `total` comes from a COUNT(*) OVER()
// window column, so it is the filtered row count ignoring LIMIT/OFFSET; it is
// reported as 0 when the requested page itself is empty.
// Responds 400 when limit or offset is not a non-negative integer (these
// values used to reach the database unvalidated).
router.get('/', async (req, res, next) => {
  try {
    const {
      project_id,
      bin_id,
      status,
      search,
      media_type,
      limit = 50,
      offset = 0,
      include_archived,
    } = req.query;

    // Accept only plain decimal digits so '', 'abc', '-1', '1.5' and repeated
    // params (which arrive as arrays) are rejected up front.
    const toNonNegativeInt = (value) => {
      const text = String(value).trim();
      if (!/^\d+$/.test(text)) return null;
      const parsed = Number(text);
      return Number.isSafeInteger(parsed) ? parsed : null;
    };

    const limitNum = toNonNegativeInt(limit);
    const offsetNum = toNonNegativeInt(offset);
    if (limitNum === null || offsetNum === null) {
      return res.status(400).json({ error: 'limit and offset must be non-negative integers' });
    }

    let query = `
      SELECT a.*,
             COUNT(*) OVER() AS full_count
      FROM assets a
      WHERE 1=1
    `;
    const params = [];
    let paramCount = 1;

    // Exclude archived unless explicitly requested — independent of status filter
    if (include_archived !== 'true') {
      query += ` AND a.status <> 'archived'`;
    }

    if (project_id) {
      query += ` AND a.project_id = $${paramCount++}`;
      params.push(project_id);
    }

    if (bin_id) {
      query += ` AND a.bin_id = $${paramCount++}`;
      params.push(bin_id);
    }

    if (status) {
      query += ` AND a.status = $${paramCount++}`;
      params.push(status);
    }

    if (media_type) {
      query += ` AND a.media_type = $${paramCount++}`;
      params.push(media_type);
    }

    if (search) {
      // The same placeholder is reused for all three columns.
      query += ` AND (a.display_name ILIKE $${paramCount} OR a.filename ILIKE $${paramCount} OR a.notes ILIKE $${paramCount})`;
      params.push(`%${search}%`);
      paramCount++;
    }

    query += ` ORDER BY a.created_at DESC`;
    query += ` LIMIT $${paramCount++} OFFSET $${paramCount++}`;
    params.push(limitNum, offsetNum);

    const result = await pool.query(query, params);
    const total = result.rows.length > 0 ? parseInt(result.rows[0].full_count, 10) : 0;

    res.json({
      // Strip the helper window column from every row before returning it.
      assets: result.rows.map(({ full_count, ...rest }) => rest),
      total,
    });
  } catch (err) {
    next(err);
  }
});
|
|
|
|
// POST / - Register a new asset from a completed capture session
//
// Body: projectId and clipName are required; binId, hiresKey, proxyKey,
// duration (seconds), capturedAt, sourceType and needsProxy are optional.
// Responds 201 with the inserted row, 400 on invalid input, and 409 when a
// 'live' asset with the same project and display name already exists.
router.post('/', async (req, res, next) => {
  try {
    const {
      projectId,
      binId,
      clipName,
      hiresKey,
      proxyKey,
      duration,
      capturedAt,
      sourceType, // Bug #64: was ignored — now used to set media_type
      needsProxy, // Bug #64: was ignored — now controls proxy queue logic
    } = req.body;

    if (!(projectId && clipName)) {
      return res.status(400).json({ error: 'projectId and clipName are required' });
    }

    // duration is supplied in seconds; the column stores milliseconds.
    let durationMs = null;
    if (duration !== undefined && duration !== null) {
      const seconds = Number(duration);
      if (!Number.isFinite(seconds)) {
        return res.status(400).json({ error: 'duration must be a finite number (seconds)' });
      }
      durationMs = Math.round(seconds * 1000);
    }

    const liveMatch = await pool.query(
      `SELECT * FROM assets
       WHERE project_id = $1 AND display_name = $2 AND status = 'live'
       ORDER BY created_at DESC LIMIT 1`,
      [projectId, clipName]
    );

    // Bug #63: refuse to overwrite a live asset — return 409 with the existing row
    if (liveMatch.rows.length > 0) {
      return res.status(409).json({
        error: 'A live asset with this name already exists',
        asset: liveMatch.rows[0],
      });
    }

    const id = uuidv4();
    // Bug #64: use sourceType to set media_type (default 'video')
    const mediaType = sourceType === 'audio' ? 'audio' : 'video';
    const inserted = await pool.query(
      `INSERT INTO assets (
         id, project_id, bin_id,
         filename, display_name,
         status, media_type,
         original_s3_key, proxy_s3_key,
         duration_ms,
         created_at, updated_at
       )
       VALUES (
         $1, $2, $3,
         $4, $4,
         'processing', $9,
         $5, $6,
         $7,
         COALESCE($8::timestamptz, NOW()), NOW()
       )
       RETURNING *`,
      [
        id, projectId, binId || null,
        clipName,
        hiresKey || null, proxyKey || null,
        durationMs,
        capturedAt || null,
        mediaType,
      ]
    );
    const asset = inserted.rows[0];

    // Flip the row to 'ready' in the DB and in the object we are about to return.
    const markReady = async () => {
      await pool.query(`UPDATE assets SET status = 'ready', updated_at = NOW() WHERE id = $1`, [id]);
      asset.status = 'ready';
    };

    if (proxyKey && needsProxy === false) {
      // Bug #64: when needsProxy is explicitly false and proxyKey is already set,
      // skip re-queuing a proxy job and mark the asset ready immediately.
      await markReady();
    } else if (proxyKey) {
      // A proxy already exists: only a thumbnail job is queued.
      await thumbnailQueue.add('generate', {
        assetId: id,
        proxyKey,
        outputKey: `thumbnails/${id}.jpg`,
      });
    } else if (asset.original_s3_key) {
      // Only a hi-res source exists: queue proxy generation from it.
      await proxyQueue.add('generate', {
        assetId: id,
        inputKey: asset.original_s3_key,
        outputKey: `proxies/${id}.mp4`,
      });
      console.log(`[assets] queued proxy for ${id} (${asset.display_name})`);
    } else {
      // Neither proxy nor source: nothing to process.
      await markReady();
    }

    res.status(201).json(asset);
  } catch (err) {
    // Unique constraint violation from the partial index (migration 017) — two
    // concurrent captures raced through the SELECT before either INSERT landed.
    const isLiveNameRace = err.code === '23505' && err.constraint === 'idx_assets_live_unique';
    if (isLiveNameRace) {
      return res.status(409).json({ error: 'A live asset with this name already exists (concurrent capture race)' });
    }
    next(err);
  }
});
|
|
|
|
// POST /cleanup-live
//
// Marks every asset that is still 'live' and was created more than
// `max_age_hours` hours ago as 'error'.
// Query param: max_age_hours (integer, default 4, values below 1 are raised
// to 1). A non-numeric value is rejected with 400; it previously became NaN
// and was passed straight into the UPDATE.
// Responds with { cleaned, assets } describing the rows that were updated.
router.post('/cleanup-live', async (req, res, next) => {
  try {
    const requestedHours = Number.parseInt(req.query.max_age_hours || '4', 10);
    if (Number.isNaN(requestedHours)) {
      return res.status(400).json({ error: 'max_age_hours must be an integer' });
    }
    const maxAgeHours = Math.max(1, requestedHours);

    const result = await pool.query(
      `UPDATE assets SET status = 'error', updated_at = NOW()
       WHERE status = 'live' AND created_at < NOW() - ($1 * INTERVAL '1 hour')
       RETURNING id, display_name, project_id, created_at`,
      [maxAgeHours]
    );
    res.json({ cleaned: result.rowCount, assets: result.rows });
  } catch (err) { next(err); }
});
|
|
|
|
// POST /cleanup-live-orphans
//
// Scans LIVE_DIR (default '/live') for sub-directories whose name is a UUID
// and recursively deletes those that have no matching row in `assets`.
// Responds with { reaped, kept, dirs } where `dirs` lists the deleted ids.
// A missing LIVE_DIR is treated as "nothing to do".
router.post('/cleanup-live-orphans', async (_req, res, next) => {
  try {
    const liveRoot = process.env.LIVE_DIR || '/live';
    let entries;
    try {
      entries = await fs.readdir(liveRoot, { withFileTypes: true });
    } catch (err) {
      if (err.code === 'ENOENT') return res.json({ reaped: 0, kept: 0, dirs: [] });
      throw err;
    }

    const uuidRe = /^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$/i;
    const dirIds = entries.filter((e) => e.isDirectory() && uuidRe.test(e.name)).map((e) => e.name);
    if (dirIds.length === 0) return res.json({ reaped: 0, kept: 0, dirs: [] });

    const known = await pool.query('SELECT id FROM assets WHERE id = ANY($1::uuid[])', [dirIds]);

    // The regex above accepts upper-case directory names, and the ::uuid cast
    // matches them against the DB regardless of case. Compare in lower case so
    // a directory named with an upper-case UUID for a known asset is kept
    // rather than deleted.
    const keep = new Set(known.rows.map((r) => String(r.id).toLowerCase()));

    const reaped = [];
    const kept = [];
    for (const id of dirIds) {
      if (keep.has(id.toLowerCase())) { kept.push(id); continue; }
      const fullPath = path.join(liveRoot, id);
      try {
        await fs.rm(fullPath, { recursive: true, force: true });
        reaped.push(id);
        console.log(`[assets] reaped orphan live dir ${fullPath}`);
      } catch (err) {
        // Best effort: one failing directory must not abort the whole sweep.
        console.warn(`[assets] failed to reap ${fullPath}: ${err.message}`);
      }
    }
    res.json({ reaped: reaped.length, kept: kept.length, dirs: reaped });
  } catch (err) { next(err); }
});
|
|
|
|
// GET /:id
// Returns the full asset row, or 404 when no asset has that id.
router.get('/:id', async (req, res, next) => {
  try {
    const { rows } = await pool.query('SELECT * FROM assets WHERE id = $1', [req.params.id]);
    if (rows.length === 0) {
      return res.status(404).json({ error: 'Asset not found' });
    }
    res.json(rows[0]);
  } catch (err) {
    next(err);
  }
});
|
|
|
|
// PATCH /:id
// Updates any of display_name, tags, notes and bin_id that are present in the
// body (a falsy bin_id is stored as NULL) and bumps updated_at.
// Responds with the updated row, 400 when no field was supplied, 404 when the
// asset does not exist.
router.patch('/:id', async (req, res, next) => {
  try {
    const { display_name, tags, notes, bin_id } = req.body;

    // [column, value] pairs; an undefined value means "not supplied".
    const candidates = [
      ['display_name', display_name],
      ['tags', tags],
      ['notes', notes],
      ['bin_id', bin_id === undefined ? undefined : (bin_id || null)],
    ];

    const assignments = [];
    const values = [];
    for (const [column, value] of candidates) {
      if (value === undefined) continue;
      values.push(value);
      assignments.push(`${column} = $${values.length}`);
    }

    if (assignments.length === 0) {
      return res.status(400).json({ error: 'No fields to update' });
    }

    assignments.push(`updated_at = NOW()`);
    values.push(req.params.id);

    // The id is always the last positional parameter.
    const { rows } = await pool.query(
      `UPDATE assets SET ${assignments.join(', ')} WHERE id = $${values.length} RETURNING *`,
      values
    );
    if (rows.length === 0) {
      return res.status(404).json({ error: 'Asset not found' });
    }
    res.json(rows[0]);
  } catch (err) {
    next(err);
  }
});
|
|
|
|
// POST /:id/copy
//
// Duplicates an asset row under a new id. Body (optional): projectId and
// binId override the source's project/bin; an omitted binId keeps the source
// bin, a falsy one clears it.
// Responds 201 with the new row, or 404 when the source asset does not exist.
router.post('/:id/copy', async (req, res, next) => {
  try {
    const { id } = req.params;
    const { binId, projectId } = req.body;
    const r = await pool.query('SELECT * FROM assets WHERE id = $1', [id]);
    if (r.rows.length === 0) return res.status(404).json({ error: 'Asset not found' });
    const src = r.rows[0];
    const newId = uuidv4();

    // Bug #60: null out proxy_s3_key and thumbnail_s3_key on the copy to avoid
    // sharing S3 objects with the source. Set status to 'processing' so the copy
    // gets its own proxy generated. Re-queue proxy generation below if source exists.
    const ins = await pool.query(
      `INSERT INTO assets (
         id, project_id, bin_id, filename, display_name,
         status, media_type, original_s3_key, proxy_s3_key, thumbnail_s3_key,
         codec, resolution, fps, duration_ms, start_tc, file_size, tags, notes,
         created_at, updated_at
       ) VALUES (
         $1,$2,$3,$4,$5,$6,$7,$8,NULL,NULL,$9,$10,$11,$12,$13,$14,$15,$16,NOW(),NOW()
       ) RETURNING *`,
      [
        newId, projectId || src.project_id,
        binId === undefined ? src.bin_id : (binId || null),
        src.filename, src.display_name, 'processing', src.media_type,
        src.original_s3_key,
        src.codec, src.resolution, src.fps, src.duration_ms, src.start_tc,
        src.file_size, src.tags, src.notes,
      ]
    );
    const copy = ins.rows[0];

    // Re-queue proxy generation from original_s3_key so the copy gets its own proxy
    if (copy.original_s3_key) {
      const newProxyKey = `proxies/${newId}.mp4`;
      await proxyQueue.add('generate', {
        assetId: newId, inputKey: copy.original_s3_key, outputKey: newProxyKey,
      });
      // Log the real input and the output; the old message printed the
      // output key after the word "from".
      console.log(`[assets] queued proxy for copy ${newId} from ${copy.original_s3_key} -> ${newProxyKey}`);
    } else {
      // No source to transcode from — mark ready immediately
      await pool.query(`UPDATE assets SET status = 'ready', updated_at = NOW() WHERE id = $1`, [newId]);
      copy.status = 'ready';
    }
    res.status(201).json(copy);
  } catch (err) { next(err); }
});
|
|
|
|
// POST /:id/mark-empty
//
// Flags a recording that produced no frames: the asset is set to 'error' and
// an explanatory line is appended to its notes.
// Responses: 404 unknown asset; 200 { id, skipped: true } when the asset is
// already 'error' or 'ready'; 200 { id } after updating a 'live' or
// 'processing' asset; 409 for any other status.
router.post('/:id/mark-empty', async (req, res, next) => {
  try {
    const { id } = req.params;

    // Bug #66: first check the asset exists and what status it is in
    const lookup = await pool.query(`SELECT id, status FROM assets WHERE id = $1`, [id]);
    if (lookup.rows.length === 0) {
      return res.status(404).json({ error: 'Asset not found' });
    }
    const current = lookup.rows[0].status;

    switch (current) {
      // Already terminal — nothing to do, return 200 with skipped flag
      case 'error':
      case 'ready':
        return res.status(200).json({ id, skipped: true });

      // Allow update for 'live' or 'processing' (race condition on shutdown)
      case 'live':
      case 'processing':
        await pool.query(
          `UPDATE assets
           SET status = 'error',
               notes = COALESCE(notes || E'\\n', '') || 'Recording produced no frames — source never connected.',
               updated_at = NOW()
           WHERE id = $1`,
          [id]
        );
        return res.json({ id });

      // Any other status (e.g. 'archived') is incompatible
      default:
        return res.status(409).json({
          error: `Cannot mark-empty an asset with status '${current}'`,
          status: current,
        });
    }
  } catch (err) {
    next(err);
  }
});
|
|
|
|
// POST /:id/generate-proxy
//
// Queues a proxy job from the asset's hi-res source and sets the asset to
// 'processing'. The existing proxy key is reused as the output when there is
// one; otherwise proxies/<id>.mp4 is used.
// Responds with the updated row, 404 for an unknown asset, 400 when the asset
// has no original_s3_key.
router.post('/:id/generate-proxy', async (req, res, next) => {
  try {
    const { id } = req.params;

    const found = await pool.query('SELECT * FROM assets WHERE id = $1', [id]);
    if (found.rows.length === 0) {
      return res.status(404).json({ error: 'Asset not found' });
    }

    const { original_s3_key: inputKey, proxy_s3_key: existingProxyKey } = found.rows[0];
    if (!inputKey) {
      return res.status(400).json({ error: 'Asset has no hi-res source to proxy from' });
    }

    const outputKey = existingProxyKey || `proxies/${id}.mp4`;
    await proxyQueue.add('generate', { assetId: id, inputKey, outputKey });

    const { rows } = await pool.query(
      `UPDATE assets SET status = 'processing', updated_at = NOW() WHERE id = $1 RETURNING *`,
      [id]
    );
    res.json(rows[0]);
  } catch (err) {
    next(err);
  }
});
|
|
|
|
// POST /backfill-proxies
//
// Finds every 'ready' asset that has a hi-res source but no proxy key, queues
// a proxy job for each (one at a time, newest first) and then flips them all
// to 'processing'. Responds with { queued: <count> }.
router.post('/backfill-proxies', async (_req, res, next) => {
  try {
    const { rows: missingProxy } = await pool.query(
      `SELECT id, original_s3_key FROM assets
       WHERE status = 'ready' AND original_s3_key IS NOT NULL
         AND (proxy_s3_key IS NULL OR proxy_s3_key = '')
       ORDER BY created_at DESC`
    );

    const queuedIds = [];
    for (const { id, original_s3_key: inputKey } of missingProxy) {
      await proxyQueue.add('generate', {
        assetId: id,
        inputKey,
        outputKey: `proxies/${id}.mp4`,
      });
      queuedIds.push(id);
    }

    // Skip the UPDATE entirely when nothing was queued.
    if (queuedIds.length > 0) {
      await pool.query(
        `UPDATE assets SET status = 'processing', updated_at = NOW() WHERE id = ANY($1)`,
        [queuedIds]
      );
    }

    res.json({ queued: missingProxy.length });
  } catch (err) {
    next(err);
  }
});
|
|
|
|
// POST /:id/reprocess?type=proxy|thumbnail|filmstrip
// Force-requeue a processing job regardless of current asset status.
//
// `type` defaults to 'proxy'. A proxy job needs original_s3_key and also sets
// the asset to 'processing'; thumbnail and filmstrip jobs need proxy_s3_key
// and leave the status untouched.
// Responds { queued, assetId }, 400 for a bad type or missing input key,
// 404 for an unknown asset.
router.post('/:id/reprocess', async (req, res, next) => {
  try {
    const { id } = req.params;
    const requested = req.query.type || 'proxy';

    const supported = ['proxy', 'thumbnail', 'filmstrip'];
    if (!supported.includes(requested)) {
      return res.status(400).json({ error: 'type must be "proxy", "thumbnail", or "filmstrip"' });
    }

    const found = await pool.query('SELECT * FROM assets WHERE id = $1', [id]);
    if (found.rows.length === 0) {
      return res.status(404).json({ error: 'Asset not found' });
    }
    const asset = found.rows[0];

    switch (requested) {
      case 'proxy': {
        if (!asset.original_s3_key) {
          return res.status(400).json({ error: 'Asset has no source file' });
        }
        await proxyQueue.add('generate', {
          assetId: id,
          inputKey: asset.original_s3_key,
          outputKey: `proxies/${id}.mp4`,
        });
        await pool.query(`UPDATE assets SET status = 'processing', updated_at = NOW() WHERE id = $1`, [id]);
        return res.json({ queued: 'proxy', assetId: id });
      }

      case 'thumbnail': {
        if (!asset.proxy_s3_key) {
          return res.status(400).json({ error: 'Asset has no proxy' });
        }
        await thumbnailQueue.add('generate', {
          assetId: id,
          proxyKey: asset.proxy_s3_key,
          outputKey: `thumbnails/${id}.jpg`,
        });
        return res.json({ queued: 'thumbnail', assetId: id });
      }

      case 'filmstrip': {
        if (!asset.proxy_s3_key) {
          return res.status(400).json({ error: 'Asset has no proxy — generate proxy first' });
        }
        await filmstripQueue.add('generate', { assetId: id, proxyKey: asset.proxy_s3_key });
        return res.json({ queued: 'filmstrip', assetId: id });
      }
    }
  } catch (err) {
    next(err);
  }
});
|
|
|
|
// GET /:id/filmstrip — returns signed URL to the pre-built filmstrip JSON
//
// Responds { url, ready: true } when the asset has a filmstrip_s3_key,
// { url: null, ready: false } when it does not, and 404 for an unknown asset.
router.get('/:id/filmstrip', async (req, res, next) => {
  try {
    const { rows } = await pool.query(
      'SELECT filmstrip_s3_key FROM assets WHERE id = $1',
      [req.params.id]
    );
    if (rows.length === 0) {
      return res.status(404).json({ error: 'Asset not found' });
    }

    const filmstripKey = rows[0].filmstrip_s3_key;
    if (!filmstripKey) {
      return res.json({ url: null, ready: false });
    }

    const signedUrl = await getSignedUrlForObject(filmstripKey);
    res.json({ url: signedUrl, ready: true });
  } catch (err) {
    next(err);
  }
});
|
|
|
|
// POST /:id/retry
//
// Re-queues proxy generation for an asset that is in 'error' or has no proxy
// yet, and sets it back to 'processing'.
// Responds with the updated row; 404 for an unknown asset; 400 when there is
// no source file or when there is nothing to retry.
router.post('/:id/retry', async (req, res, next) => {
  try {
    const { id } = req.params;

    const found = await pool.query('SELECT * FROM assets WHERE id = $1', [id]);
    if (found.rows.length === 0) {
      return res.status(404).json({ error: 'Asset not found' });
    }
    const asset = found.rows[0];

    if (!asset.original_s3_key) {
      return res.status(400).json({ error: 'Asset has no source file to reprocess' });
    }

    // A healthy asset that already has a proxy has nothing to retry.
    const nothingToRetry = asset.status !== 'error' && Boolean(asset.proxy_s3_key);
    if (nothingToRetry) {
      return res.status(400).json({ error: `Nothing to retry — asset is ${asset.status} and already has a proxy.` });
    }

    await proxyQueue.add('generate', {
      assetId: id,
      inputKey: asset.original_s3_key,
      outputKey: asset.proxy_s3_key || `proxies/${id}.mp4`,
    });

    const { rows } = await pool.query(
      `UPDATE assets SET status = 'processing', updated_at = NOW() WHERE id = $1 RETURNING *`,
      [id]
    );
    res.json(rows[0]);
  } catch (err) {
    next(err);
  }
});
|
|
|
|
// DELETE /:id
//
// Default: soft delete — sets status to 'archived' and returns the row.
// With ?hard=true: removes queued jobs for the asset, deletes its S3 objects
// (proxy, thumbnail, filmstrip, original) and then deletes the row. S3 and
// queue cleanup are best effort; S3 failures are reported in `s3Errors`.
// Responds 404 when the asset does not exist.
router.delete('/:id', async (req, res, next) => {
  try {
    const { id } = req.params;
    const { hard } = req.query;

    if (hard === 'true') {
      const assetResult = await pool.query('SELECT * FROM assets WHERE id = $1', [id]);
      if (assetResult.rows.length === 0) return res.status(404).json({ error: 'Asset not found' });
      const asset = assetResult.rows[0];

      // Remove any pending/waiting BullMQ jobs for this asset before deleting
      // the row — prevents workers from receiving jobs for a non-existent asset.
      // The filmstrip queue is included so a pending filmstrip job is not left
      // behind for a deleted asset.
      for (const queue of [proxyQueue, thumbnailQueue, filmstripQueue]) {
        try {
          const waiting = await queue.getJobs(['waiting', 'delayed', 'prioritized']);
          for (const job of waiting) {
            if (job.data?.assetId === id) await job.remove();
          }
        } catch (e) {
          console.warn(`[assets] BullMQ cleanup failed for asset ${id}:`, e.message);
        }
      }

      // filmstrip_s3_key is deleted too; otherwise the filmstrip JSON would be
      // orphaned in the bucket once the row is gone.
      const s3Errors = [];
      const keys = [
        asset.proxy_s3_key,
        asset.thumbnail_s3_key,
        asset.filmstrip_s3_key,
        asset.original_s3_key,
      ];
      for (const key of keys) {
        if (!key) continue;
        try {
          await deleteObject(key);
        } catch (e) {
          s3Errors.push({ key, error: e.message });
          console.warn(`[assets] s3 delete failed for ${key}:`, e.message);
        }
      }

      await pool.query('DELETE FROM assets WHERE id = $1', [id]);
      res.json({ message: 'Asset deleted permanently', ...(s3Errors.length ? { s3Errors } : {}) });
    } else {
      const result = await pool.query(
        `UPDATE assets SET status = 'archived', updated_at = NOW() WHERE id = $1 RETURNING *`, [id]
      );
      if (result.rows.length === 0) return res.status(404).json({ error: 'Asset not found' });
      res.json(result.rows[0]);
    }
  } catch (err) { next(err); }
});
|
|
|
|
// GET /:id/stream
//
// Tells the player where to load media from:
//   - live asset           → HLS playlist under /live/<id>/
//   - asset with a proxy   → this API's /video endpoint
//   - video-file original  → /video endpoint, flagged source: 'original'
//   - otherwise            → url: null with reason 'no_proxy'
// Responds 404 for an unknown asset.
router.get('/:id/stream', async (req, res, next) => {
  try {
    const { id } = req.params;

    const found = await pool.query('SELECT * FROM assets WHERE id = $1', [id]);
    if (found.rows.length === 0) {
      return res.status(404).json({ error: 'Asset not found' });
    }
    const asset = found.rows[0];

    if (asset.status === 'live') {
      return res.json({ url: `/live/${asset.id}/index.m3u8`, type: 'hls', live: true });
    }

    const videoUrl = `/api/v1/assets/${id}/video`;
    if (asset.proxy_s3_key) {
      return res.json({ url: videoUrl, type: 'mp4' });
    }

    // Fall back to original for any video file so uploaded/YouTube clips
    // show a filmstrip even before the proxy worker finishes (#58)
    const VIDEO_EXTS = ['.mp4', '.mov', '.mxf', '.ts', '.m4v', '.mkv', '.avi', '.webm'];
    const sourceKey = asset.original_s3_key;
    const sourceIsVideo = Boolean(sourceKey)
      && VIDEO_EXTS.some((ext) => sourceKey.toLowerCase().endsWith(ext));
    if (sourceIsVideo) {
      return res.json({ url: videoUrl, type: 'mp4', source: 'original' });
    }

    return res.json({
      url: null,
      type: null,
      reason: 'no_proxy',
      has_source: Boolean(asset.original_s3_key),
    });
  } catch (err) {
    next(err);
  }
});
|
|
|
|
// GET /:id/live-path
//
// Builds the share paths of a still-recording ('live') asset's growing file
// in three notations: smb:// URL, Windows UNC path and POSIX //host/share path.
// The file is assumed to live at <growing_smb_url>/<project_id>/<display_name>.<ext>,
// where ext is the recorder's recording_container (default 'mov').
// Responds 404 for an unknown asset and 409 when the asset is not live,
// growing-files mode is disabled, or no SMB URL is configured.
router.get('/:id/live-path', async (req, res, next) => {
  try {
    const found = await pool.query(
      'SELECT id, project_id, display_name, status FROM assets WHERE id = $1',
      [req.params.id]
    );
    if (found.rows.length === 0) {
      return res.status(404).json({ error: 'Asset not found' });
    }
    const asset = found.rows[0];
    if (asset.status !== 'live') {
      return res.status(409).json({ error: 'Asset is not currently growing', status: asset.status });
    }

    // Collapse the key/value settings rows into a plain lookup object.
    const settingsRows = await pool.query(
      `SELECT key, value FROM settings WHERE key IN ('growing_enabled','growing_smb_url')`
    );
    const cfg = Object.fromEntries(settingsRows.rows.map(({ key, value }) => [key, value]));

    if (cfg.growing_enabled !== 'true') {
      return res.status(409).json({ error: 'Growing-files mode is disabled' });
    }
    if (!cfg.growing_smb_url) {
      return res.status(409).json({ error: 'No SMB URL configured — set growing_smb_url in Settings' });
    }

    const recorder = await pool.query(
      `SELECT recording_container FROM recorders WHERE current_session_id = $1 ORDER BY updated_at DESC LIMIT 1`,
      [asset.id]
    );
    const ext = recorder.rows[0]?.recording_container || 'mov';

    const { project_id: projectId, display_name: displayName } = asset;

    // Trailing slashes are trimmed so the joins below never double up.
    const smbRoot = cfg.growing_smb_url.replace(/\/+$/, '');
    const uncRoot = smbRoot.replace(/^smb:\/\//, '\\\\').replace(/\//g, '\\');
    const posixRoot = smbRoot.replace(/^smb:\/\//, '//');

    res.json({
      smb_url: `${smbRoot}/${projectId}/${displayName}.${ext}`,
      win_path: uncRoot + `\\${projectId}\\${displayName}.${ext}`,
      posix_path: posixRoot + `/${projectId}/${displayName}.${ext}`,
      project_id: projectId,
      display_name: displayName,
      ext,
    });
  } catch (err) {
    next(err);
  }
});
|
|
|
|
// GET /:id/video
//
// Streams the asset's proxy from S3 (or the original when it has a video file
// extension and no proxy exists), forwarding the client's Range header.
// Responds 404 for an unknown asset or when nothing playable exists, and 416
// when S3 rejects the requested range.
router.get('/:id/video', async (req, res, next) => {
  try {
    const { id } = req.params;
    const r = await pool.query('SELECT proxy_s3_key, original_s3_key FROM assets WHERE id = $1', [id]);
    if (r.rows.length === 0) return res.status(404).json({ error: 'Asset not found' });
    const a = r.rows[0];

    const VIDEO_EXTS = ['.mp4', '.mov', '.mxf', '.ts', '.m4v', '.mkv', '.avi', '.webm'];
    const origIsVideo = a.original_s3_key && VIDEO_EXTS.some(ext => a.original_s3_key.toLowerCase().endsWith(ext));
    const key = a.proxy_s3_key || (origIsVideo ? a.original_s3_key : null);
    if (!key) return res.status(404).json({ error: 'No browser-playable source' });

    const params = { Bucket: getS3Bucket(), Key: key };
    const rangeHeader = req.headers.range;
    if (rangeHeader) params.Range = rangeHeader;

    const s3Res = await s3Client.send(new GetObjectCommand(params));

    // Only answer 206 when S3 actually returned a partial body; a 206 without
    // Content-Range is an invalid response.
    const status = rangeHeader && s3Res.ContentRange ? 206 : 200;
    const headers = { 'Content-Type': 'video/mp4', 'Accept-Ranges': 'bytes', 'Cache-Control': 'no-store' };
    if (s3Res.ContentLength) headers['Content-Length'] = String(s3Res.ContentLength);
    if (s3Res.ContentRange) headers['Content-Range'] = s3Res.ContentRange;
    res.writeHead(status, headers);

    const body = s3Res.Body;

    // pipe() does not forward errors. Without a listener, an 'error' event on
    // the S3 stream would be thrown as an uncaught exception. Headers are
    // already sent at this point, so the only option is to drop the connection.
    body.on('error', (streamErr) => {
      console.warn(`[assets] video stream failed for ${id}:`, streamErr.message);
      res.destroy(streamErr);
    });

    // Players abort range requests constantly while seeking; release the S3
    // stream as soon as the client goes away instead of leaving it open.
    res.on('close', () => {
      if (!body.destroyed) body.destroy();
    });

    body.pipe(res);
  } catch (err) {
    // S3 rejects a range that lies outside the object; report that as 416
    // rather than a generic server error.
    const rangeRejected = err?.name === 'InvalidRange' || err?.$metadata?.httpStatusCode === 416;
    if (rangeRejected && !res.headersSent) {
      return res.status(416).json({ error: 'Requested range not satisfiable' });
    }
    next(err);
  }
});
|
|
|
|
// GET /:id/hires
//
// Returns a signed download URL for the asset's original (hi-res) file plus a
// suggested download filename built from the display name and the key's
// extension ('mxf' when the key contains no dot).
// Responds 404 for an unknown asset or when there is no original_s3_key.
router.get('/:id/hires', async (req, res, next) => {
  try {
    const { id } = req.params;

    const { rows } = await pool.query(
      'SELECT original_s3_key, filename, display_name, file_size FROM assets WHERE id = $1',
      [id]
    );
    if (rows.length === 0) {
      return res.status(404).json({ error: 'Asset not found' });
    }

    const record = rows[0];
    const sourceKey = record.original_s3_key;
    if (!sourceKey) {
      return res.status(404).json({ error: 'No hi-res source available' });
    }

    const url = await getSignedUrlForObject(sourceKey);

    // Extension = everything after the last dot in the key, lower-cased.
    const lastDot = sourceKey.lastIndexOf('.');
    const rawExt = lastDot === -1 ? 'mxf' : sourceKey.slice(lastDot + 1);
    const ext = rawExt.toLowerCase();

    // Replace characters outside [A-Za-z0-9_.-] and cap the length at 100.
    const label = record.display_name || record.filename || id;
    const base = label.replace(/[^\w.-]/g, '_').substring(0, 100);

    res.json({
      url,
      filename: `${base}.${ext}`,
      ext,
      file_size: record.file_size || null,
      type: 'hires',
    });
  } catch (err) {
    next(err);
  }
});
|
|
|
|
// GET /:id/thumbnail
//
// Returns { url } with a signed URL for the asset's thumbnail, or issues a
// 302 redirect to that URL when ?redirect=1 is passed.
// Responds 404 for an unknown asset or when no thumbnail key is stored yet.
router.get('/:id/thumbnail', async (req, res, next) => {
  try {
    const { rows } = await pool.query(
      'SELECT thumbnail_s3_key FROM assets WHERE id = $1',
      [req.params.id]
    );
    if (rows.length === 0) {
      return res.status(404).json({ error: 'Asset not found' });
    }

    const thumbnailKey = rows[0].thumbnail_s3_key;
    if (!thumbnailKey) {
      return res.status(404).json({ error: 'Thumbnail not yet available' });
    }

    const url = await getSignedUrlForObject(thumbnailKey);
    const wantsRedirect = req.query.redirect === '1';
    if (wantsRedirect) {
      return res.redirect(302, url);
    }
    res.json({ url });
  } catch (err) {
    next(err);
  }
});
|
|
|
|
// POST /batch-trim
//
// Body: { clips: [{ assetId, filename, sourceInFrames, sourceOutFrames,
//                   timelineInFrames, timelineOutFrames, trackIndex }, ...] }
// Creates one `jobs` row of type 'trim' (24h expiry) and, for every clip, a
// placeholder `temp_segments` row plus a job on the trim queue.
// Responds 201 with { jobId, clips: [{ clipInstanceId, status }] }, or 400
// when the clips array is missing, empty or contains an invalid entry.
router.post('/batch-trim', async (req, res, next) => {
  try {
    const { clips } = req.body;
    if (!Array.isArray(clips) || clips.length === 0) return res.status(400).json({ error: 'clips array is required and must be non-empty' });

    for (const c of clips) {
      // Reject null / non-object entries explicitly; reading properties off
      // them would otherwise throw and surface as a server error.
      if (c === null || typeof c !== 'object' ||
          !c.assetId || !c.filename ||
          !Number.isFinite(Number(c.sourceInFrames)) || !Number.isFinite(Number(c.sourceOutFrames)) ||
          !Number.isFinite(Number(c.timelineInFrames)) || !Number.isFinite(Number(c.timelineOutFrames)) ||
          !Number.isInteger(Number(c.trackIndex)) || Number(c.trackIndex) < 0) {
        return res.status(400).json({ error: 'Each clip must have assetId, filename, sourceInFrames, sourceOutFrames, timelineInFrames, timelineOutFrames, and a non-negative integer trackIndex' });
      }
    }

    const jobId = uuidv4();
    const expiresAt = new Date(Date.now() + 24 * 60 * 60 * 1000);
    await pool.query(
      `INSERT INTO jobs (id, type, status, payload, expires_at) VALUES ($1,$2,$3,$4,$5)`,
      [jobId, 'trim', 'queued', JSON.stringify({ clips }), expiresAt]
    );

    const clipResults = [];
    for (const c of clips) {
      const clipInstanceId = uuidv4();

      // Insert the placeholder row BEFORE queueing the job. Once the job is
      // on the queue a worker may pick it up immediately; presumably it
      // records its result against this row, so the row must already exist
      // (NOTE(review): confirm against the trim worker).
      await pool.query(
        `INSERT INTO temp_segments (job_id, clip_instance_id, asset_id, s3_key, expires_at) VALUES ($1,$2,$3,'',$4)`,
        [jobId, clipInstanceId, c.assetId, expiresAt]
      );

      await trimQueue.add('trim-clip', {
        jobId,
        clipInstanceId,
        assetId: c.assetId,
        filename: c.filename,
        sourceInFrames: c.sourceInFrames,
        sourceOutFrames: c.sourceOutFrames,
        timelineInFrames: c.timelineInFrames,
        timelineOutFrames: c.timelineOutFrames,
        trackIndex: c.trackIndex,
      });

      clipResults.push({ clipInstanceId, status: 'queued' });
    }

    res.status(201).json({ jobId, clips: clipResults });
  } catch (err) { next(err); }
});
|
|
|
|
// GET /trim-status/:jobId
//
// Reports the state of a batch-trim job and each of its clips. A clip counts
// as 'completed' once its temp_segments row has a non-empty s3_key; until
// then it inherits the job's status.
// Responds 404 when the job does not exist or has passed its expiry (expired
// jobs and their segments are deleted on access).
router.get('/trim-status/:jobId', async (req, res, next) => {
  try {
    const { jobId } = req.params;

    const jobLookup = await pool.query('SELECT * FROM jobs WHERE id = $1', [jobId]);
    if (jobLookup.rows.length === 0) {
      return res.status(404).json({ error: 'Trim job not found' });
    }
    const job = jobLookup.rows[0];

    // Auto-expire: delete stale jobs rows (and their temp_segments) past TTL
    const isExpired = job.expires_at && new Date(job.expires_at) < new Date();
    if (isExpired) {
      await pool.query('DELETE FROM temp_segments WHERE job_id = $1', [jobId]);
      await pool.query('DELETE FROM jobs WHERE id = $1', [jobId]);
      return res.status(404).json({ error: 'Trim job expired' });
    }

    const segments = await pool.query(
      `SELECT clip_instance_id, asset_id, s3_key, expires_at FROM temp_segments WHERE job_id = $1 ORDER BY created_at`,
      [jobId]
    );

    const clips = segments.rows.map((segment) => {
      const hasOutput = Boolean(segment.s3_key);
      return {
        clipInstanceId: segment.clip_instance_id,
        assetId: segment.asset_id,
        s3Key: segment.s3_key || null,
        status: hasOutput ? 'completed' : job.status,
        expiresAt: segment.expires_at,
      };
    });

    res.json({ jobId, status: job.status, clips });
  } catch (err) {
    next(err);
  }
});
|
|
|
|
// GET /temp-segment-url/:clipInstanceId
//
// Returns { url, s3Key } with a signed URL for a trimmed segment.
// Responds 404 when the segment row is missing or expired, or when it has no
// s3_key yet.
router.get('/temp-segment-url/:clipInstanceId', async (req, res, next) => {
  try {
    const { rows } = await pool.query(
      'SELECT s3_key FROM temp_segments WHERE clip_instance_id = $1 AND expires_at > NOW()',
      [req.params.clipInstanceId]
    );
    if (rows.length === 0) {
      return res.status(404).json({ error: 'Temp segment not found or expired' });
    }

    const segmentKey = rows[0].s3_key;
    if (!segmentKey) {
      return res.status(404).json({ error: 'Segment not yet processed' });
    }

    const signedUrl = await getSignedUrlForObject(segmentKey);
    res.json({ url: signedUrl, s3Key: segmentKey });
  } catch (err) {
    next(err);
  }
});
|
|
|
|
export default router;
|