import express from 'express'; import { Queue } from 'bullmq'; import { v4 as uuidv4 } from 'uuid'; import { promises as fs } from 'node:fs'; import path from 'node:path'; import pool from '../db/pool.js'; import { getSignedUrlForObject, deleteObject, s3Client, getS3Bucket } from '../s3/client.js'; import { GetObjectCommand } from '@aws-sdk/client-s3'; import { requireAuth } from '../middleware/auth.js'; const router = express.Router(); router.use(requireAuth); // BullMQ queue connection (mirrors worker/src/index.js) const parseRedisUrl = (url) => { const parsed = new URL(url); return { host: parsed.hostname, port: parseInt(parsed.port, 10) }; }; const proxyQueue = new Queue('proxy', { connection: parseRedisUrl(process.env.REDIS_URL || 'redis://queue:6379'), }); const thumbnailQueue = new Queue('thumbnail', { connection: parseRedisUrl(process.env.REDIS_URL || 'redis://queue:6379'), }); const trimQueue = new Queue('trim', { connection: parseRedisUrl(process.env.REDIS_URL || 'redis://queue:6379'), }); // GET / - List assets with filtering router.get('/', async (req, res, next) => { try { const { project_id, bin_id, status, search, media_type, limit = 50, offset = 0, include_archived, } = req.query; let query = ` SELECT a.*, COUNT(*) OVER() AS full_count FROM assets a WHERE 1=1 `; const params = []; let paramCount = 1; // Exclude archived unless explicitly requested — independent of status filter if (include_archived !== 'true') { query += ` AND a.status <> 'archived'`; } if (project_id) { query += ` AND a.project_id = $${paramCount++}`; params.push(project_id); } if (bin_id) { query += ` AND a.bin_id = $${paramCount++}`; params.push(bin_id); } if (status) { query += ` AND a.status = $${paramCount++}`; params.push(status); } if (media_type) { query += ` AND a.media_type = $${paramCount++}`; params.push(media_type); } if (search) { query += ` AND (a.display_name ILIKE $${paramCount} OR a.filename ILIKE $${paramCount} OR a.notes ILIKE $${paramCount})`; params.push(`%${search}%`); paramCount++; } query += ` ORDER BY a.created_at DESC`; query += ` LIMIT $${paramCount++} OFFSET $${paramCount++}`; params.push(limit, offset); const result = await pool.query(query, params); const total = result.rows.length > 0 ? parseInt(result.rows[0].full_count, 10) : 0; res.json({ assets: result.rows.map(({ full_count, ...rest }) => rest), total, }); } catch (err) { next(err); } }); // POST / - Register a new asset from a completed capture session router.post('/', async (req, res, next) => { try { const { projectId, binId, clipName, hiresKey, proxyKey, duration, capturedAt, } = req.body; if (!projectId || !clipName) { return res.status(400).json({ error: 'projectId and clipName are required' }); } const durationNum = duration !== undefined && duration !== null ? Number(duration) : null; if (durationNum !== null && !Number.isFinite(durationNum)) { return res.status(400).json({ error: 'duration must be a finite number (seconds)' }); } const durationMs = durationNum !== null ? Math.round(durationNum * 1000) : null; const existing = await pool.query( `SELECT * FROM assets WHERE project_id = $1 AND display_name = $2 AND status = 'live' ORDER BY created_at DESC LIMIT 1`, [projectId, clipName] ); let id; let asset; if (existing.rows.length > 0) { id = existing.rows[0].id; const upd = await pool.query( `UPDATE assets SET status = 'processing', original_s3_key = COALESCE($2, original_s3_key), proxy_s3_key = COALESCE($3, proxy_s3_key), duration_ms = COALESCE($4, duration_ms), bin_id = COALESCE(bin_id, $5), updated_at = NOW() WHERE id = $1 RETURNING *`, [id, hiresKey || null, proxyKey || null, durationMs, binId || null] ); asset = upd.rows[0]; } else { id = uuidv4(); const ins = await pool.query( `INSERT INTO assets ( id, project_id, bin_id, filename, display_name, status, media_type, original_s3_key, proxy_s3_key, duration_ms, created_at, updated_at ) VALUES ( $1, $2, $3, $4, $4, 'processing', 'video', $5, $6, $7, COALESCE($8::timestamptz, NOW()), NOW() ) RETURNING *`, [ id, projectId, binId || null, clipName, hiresKey || null, proxyKey || null, durationMs, capturedAt || null, ] ); asset = ins.rows[0]; } const thumbnailKey = `thumbnails/${id}.jpg`; if (proxyKey) { await thumbnailQueue.add('generate', { assetId: id, proxyKey, outputKey: thumbnailKey }); } else if (asset.original_s3_key) { const generatedProxyKey = `proxies/${id}.mp4`; await proxyQueue.add('generate', { assetId: id, inputKey: asset.original_s3_key, outputKey: generatedProxyKey, }); console.log(`[assets] queued proxy for ${id} (${asset.display_name})`); } else { await pool.query(`UPDATE assets SET status = 'ready', updated_at = NOW() WHERE id = $1`, [id]); asset.status = 'ready'; } res.status(existing.rows.length > 0 ? 200 : 201).json(asset); } catch (err) { next(err); } }); // POST /cleanup-live router.post('/cleanup-live', async (req, res, next) => { try { const maxAgeHours = Math.max(1, parseInt(req.query.max_age_hours || '4', 10)); const result = await pool.query( `UPDATE assets SET status = 'error', updated_at = NOW() WHERE status = 'live' AND created_at < NOW() - ($1 * INTERVAL '1 hour') RETURNING id, display_name, project_id, created_at`, [maxAgeHours] ); res.json({ cleaned: result.rowCount, assets: result.rows }); } catch (err) { next(err); } }); // POST /cleanup-live-orphans router.post('/cleanup-live-orphans', async (_req, res, next) => { try { const liveRoot = process.env.LIVE_DIR || '/live'; let entries; try { entries = await fs.readdir(liveRoot, { withFileTypes: true }); } catch (err) { if (err.code === 'ENOENT') return res.json({ reaped: 0, kept: 0, dirs: [] }); throw err; } const uuidRe = /^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$/i; const dirIds = entries.filter(e => e.isDirectory() && uuidRe.test(e.name)).map(e => e.name); if (dirIds.length === 0) return res.json({ reaped: 0, kept: 0, dirs: [] }); const known = await pool.query('SELECT id FROM assets WHERE id = ANY($1::uuid[])', [dirIds]); const keep = new Set(known.rows.map(r => r.id)); const reaped = [], kept = []; for (const id of dirIds) { if (keep.has(id)) { kept.push(id); continue; } const fullPath = path.join(liveRoot, id); try { await fs.rm(fullPath, { recursive: true, force: true }); reaped.push(id); console.log(`[assets] reaped orphan live dir ${fullPath}`); } catch (err) { console.warn(`[assets] failed to reap ${fullPath}: ${err.message}`); } } res.json({ reaped: reaped.length, kept: kept.length, dirs: reaped }); } catch (err) { next(err); } }); // GET /:id router.get('/:id', async (req, res, next) => { try { const { id } = req.params; const result = await pool.query('SELECT * FROM assets WHERE id = $1', [id]); if (result.rows.length === 0) return res.status(404).json({ error: 'Asset not found' }); res.json(result.rows[0]); } catch (err) { next(err); } }); // PATCH /:id router.patch('/:id', async (req, res, next) => { try { const { id } = req.params; const { display_name, tags, notes, bin_id } = req.body; const updates = [], params = []; let paramCount = 1; if (display_name !== undefined) { updates.push(`display_name = $${paramCount++}`); params.push(display_name); } if (tags !== undefined) { updates.push(`tags = $${paramCount++}`); params.push(tags); } if (notes !== undefined) { updates.push(`notes = $${paramCount++}`); params.push(notes); } if (bin_id !== undefined) { updates.push(`bin_id = $${paramCount++}`); params.push(bin_id || null); } if (updates.length === 0) return res.status(400).json({ error: 'No fields to update' }); updates.push(`updated_at = NOW()`); params.push(id); const result = await pool.query( `UPDATE assets SET ${updates.join(', ')} WHERE id = $${paramCount} RETURNING *`, params ); if (result.rows.length === 0) return res.status(404).json({ error: 'Asset not found' }); res.json(result.rows[0]); } catch (err) { next(err); } }); // POST /:id/copy router.post('/:id/copy', async (req, res, next) => { try { const { id } = req.params; const { binId, projectId } = req.body; const r = await pool.query('SELECT * FROM assets WHERE id = $1', [id]); if (r.rows.length === 0) return res.status(404).json({ error: 'Asset not found' }); const src = r.rows[0]; const newId = uuidv4(); const ins = await pool.query( `INSERT INTO assets ( id, project_id, bin_id, filename, display_name, status, media_type, original_s3_key, proxy_s3_key, thumbnail_s3_key, codec, resolution, fps, duration_ms, start_tc, file_size, tags, notes, created_at, updated_at ) VALUES ( $1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14,$15,$16,$17,$18,NOW(),NOW() ) RETURNING *`, [ newId, projectId || src.project_id, binId === undefined ? src.bin_id : (binId || null), src.filename, src.display_name, src.status, src.media_type, src.original_s3_key, src.proxy_s3_key, src.thumbnail_s3_key, src.codec, src.resolution, src.fps, src.duration_ms, src.start_tc, src.file_size, src.tags, src.notes, ] ); res.status(201).json(ins.rows[0]); } catch (err) { next(err); } }); // POST /:id/mark-empty router.post('/:id/mark-empty', async (req, res, next) => { try { const { id } = req.params; const result = await pool.query( `UPDATE assets SET status = 'error', notes = COALESCE(notes || E'\\n', '') || 'Recording produced no frames — source never connected.', updated_at = NOW() WHERE id = $1 AND status = 'live' RETURNING id`, [id] ); if (result.rows.length === 0) return res.status(404).json({ error: 'No matching live asset' }); res.json({ id }); } catch (err) { next(err); } }); // POST /:id/generate-proxy router.post('/:id/generate-proxy', async (req, res, next) => { try { const { id } = req.params; const r = await pool.query('SELECT * FROM assets WHERE id = $1', [id]); if (r.rows.length === 0) return res.status(404).json({ error: 'Asset not found' }); const asset = r.rows[0]; if (!asset.original_s3_key) return res.status(400).json({ error: 'Asset has no hi-res source to proxy from' }); const proxyKey = asset.proxy_s3_key || `proxies/${id}.mp4`; await proxyQueue.add('generate', { assetId: id, inputKey: asset.original_s3_key, outputKey: proxyKey }); const updated = await pool.query( `UPDATE assets SET status = 'processing', updated_at = NOW() WHERE id = $1 RETURNING *`, [id] ); res.json(updated.rows[0]); } catch (err) { next(err); } }); // POST /backfill-proxies router.post('/backfill-proxies', async (_req, res, next) => { try { const targets = await pool.query( `SELECT id, original_s3_key FROM assets WHERE status = 'ready' AND original_s3_key IS NOT NULL AND (proxy_s3_key IS NULL OR proxy_s3_key = '') ORDER BY created_at DESC` ); for (const asset of targets.rows) { const proxyKey = `proxies/${asset.id}.mp4`; await proxyQueue.add('generate', { assetId: asset.id, inputKey: asset.original_s3_key, outputKey: proxyKey }); } if (targets.rows.length > 0) { await pool.query( `UPDATE assets SET status = 'processing', updated_at = NOW() WHERE id = ANY($1)`, [targets.rows.map(r => r.id)] ); } res.json({ queued: targets.rows.length }); } catch (err) { next(err); } }); // POST /:id/retry router.post('/:id/retry', async (req, res, next) => { try { const { id } = req.params; const r = await pool.query('SELECT * FROM assets WHERE id = $1', [id]); if (r.rows.length === 0) return res.status(404).json({ error: 'Asset not found' }); const asset = r.rows[0]; if (!asset.original_s3_key) return res.status(400).json({ error: 'Asset has no source file to reprocess' }); const canRetry = asset.status === 'error' || !asset.proxy_s3_key; if (!canRetry) return res.status(400).json({ error: `Nothing to retry — asset is ${asset.status} and already has a proxy.` }); const proxyKey = asset.proxy_s3_key || `proxies/${id}.mp4`; await proxyQueue.add('generate', { assetId: id, inputKey: asset.original_s3_key, outputKey: proxyKey }); const updated = await pool.query( `UPDATE assets SET status = 'processing', updated_at = NOW() WHERE id = $1 RETURNING *`, [id] ); res.json(updated.rows[0]); } catch (err) { next(err); } }); // DELETE /:id router.delete('/:id', async (req, res, next) => { try { const { id } = req.params; const { hard } = req.query; if (hard === 'true') { const assetResult = await pool.query('SELECT * FROM assets WHERE id = $1', [id]); if (assetResult.rows.length === 0) return res.status(404).json({ error: 'Asset not found' }); const asset = assetResult.rows[0]; const s3Errors = []; for (const key of [asset.proxy_s3_key, asset.thumbnail_s3_key, asset.original_s3_key]) { if (!key) continue; try { await deleteObject(key); } catch (e) { s3Errors.push({ key, error: e.message }); console.warn(`[assets] s3 delete failed for ${key}:`, e.message); } } await pool.query('DELETE FROM assets WHERE id = $1', [id]); res.json({ message: 'Asset deleted permanently', ...(s3Errors.length ? { s3Errors } : {}) }); } else { const result = await pool.query( `UPDATE assets SET status = 'archived', updated_at = NOW() WHERE id = $1 RETURNING *`, [id] ); if (result.rows.length === 0) return res.status(404).json({ error: 'Asset not found' }); res.json(result.rows[0]); } } catch (err) { next(err); } }); // GET /:id/stream router.get('/:id/stream', async (req, res, next) => { try { const { id } = req.params; const r = await pool.query('SELECT * FROM assets WHERE id = $1', [id]); if (r.rows.length === 0) return res.status(404).json({ error: 'Asset not found' }); const a = r.rows[0]; if (a.status === 'live') return res.json({ url: `/live/${a.id}/index.m3u8`, type: 'hls', live: true }); if (a.proxy_s3_key) return res.json({ url: `/api/v1/assets/${id}/video`, type: 'mp4' }); const orig = a.original_s3_key; if (orig && orig.toLowerCase().endsWith('.mp4')) return res.json({ url: `/api/v1/assets/${id}/video`, type: 'mp4' }); return res.json({ url: null, type: null, reason: 'no_proxy', has_source: !!a.original_s3_key }); } catch (err) { next(err); } }); // GET /:id/live-path router.get('/:id/live-path', async (req, res, next) => { try { const { id } = req.params; const a = await pool.query('SELECT id, project_id, display_name, status FROM assets WHERE id = $1', [id]); if (a.rows.length === 0) return res.status(404).json({ error: 'Asset not found' }); const asset = a.rows[0]; if (asset.status !== 'live') return res.status(409).json({ error: 'Asset is not currently growing', status: asset.status }); const s = await pool.query(`SELECT key, value FROM settings WHERE key IN ('growing_enabled','growing_smb_url')`); const cfg = {}; for (const { key, value } of s.rows) cfg[key] = value; if (cfg.growing_enabled !== 'true') return res.status(409).json({ error: 'Growing-files mode is disabled' }); if (!cfg.growing_smb_url) return res.status(409).json({ error: 'No SMB URL configured — set growing_smb_url in Settings' }); const rec = await pool.query( `SELECT recording_container FROM recorders WHERE current_session_id = $1 ORDER BY updated_at DESC LIMIT 1`, [asset.id] ); const ext = rec.rows[0]?.recording_container || 'mov'; const smbRoot = cfg.growing_smb_url.replace(/\/+$/, ''); const winPath = smbRoot.replace(/^smb:\/\//, '\\\\').replace(/\//g, '\\') + `\\${asset.project_id}\\${asset.display_name}.${ext}`; const posix = smbRoot.replace(/^smb:\/\//, '//') + `/${asset.project_id}/${asset.display_name}.${ext}`; res.json({ smb_url: `${smbRoot}/${asset.project_id}/${asset.display_name}.${ext}`, win_path: winPath, posix_path: posix, project_id: asset.project_id, display_name: asset.display_name, ext }); } catch (err) { next(err); } }); // GET /:id/video router.get('/:id/video', async (req, res, next) => { try { const { id } = req.params; const r = await pool.query('SELECT proxy_s3_key, original_s3_key FROM assets WHERE id = $1', [id]); if (r.rows.length === 0) return res.status(404).json({ error: 'Asset not found' }); const a = r.rows[0]; const key = a.proxy_s3_key || (a.original_s3_key?.toLowerCase().endsWith('.mp4') ? a.original_s3_key : null); if (!key) return res.status(404).json({ error: 'No browser-playable source' }); const params = { Bucket: getS3Bucket(), Key: key }; const rangeHeader = req.headers.range; if (rangeHeader) params.Range = rangeHeader; const s3Res = await s3Client.send(new GetObjectCommand(params)); const status = rangeHeader ? 206 : 200; const headers = { 'Content-Type': 'video/mp4', 'Accept-Ranges': 'bytes', 'Cache-Control': 'no-store' }; if (s3Res.ContentLength) headers['Content-Length'] = String(s3Res.ContentLength); if (s3Res.ContentRange) headers['Content-Range'] = s3Res.ContentRange; res.writeHead(status, headers); s3Res.Body.pipe(res); } catch (err) { next(err); } }); // GET /:id/hires router.get('/:id/hires', async (req, res, next) => { try { const { id } = req.params; const r = await pool.query('SELECT original_s3_key, filename, display_name, file_size FROM assets WHERE id = $1', [id]); if (r.rows.length === 0) return res.status(404).json({ error: 'Asset not found' }); const a = r.rows[0]; if (!a.original_s3_key) return res.status(404).json({ error: 'No hi-res source available' }); const url = await getSignedUrlForObject(a.original_s3_key); const parts = a.original_s3_key.split('.'); const ext = (parts.length > 1 ? parts[parts.length - 1] : 'mxf').toLowerCase(); const base = (a.display_name || a.filename || id).replace(/[^\w.-]/g, '_').substring(0, 100); res.json({ url, filename: `${base}.${ext}`, ext, file_size: a.file_size || null, type: 'hires' }); } catch (err) { next(err); } }); // GET /:id/thumbnail router.get('/:id/thumbnail', async (req, res, next) => { try { const { id } = req.params; const result = await pool.query('SELECT thumbnail_s3_key FROM assets WHERE id = $1', [id]); if (result.rows.length === 0) return res.status(404).json({ error: 'Asset not found' }); const { thumbnail_s3_key } = result.rows[0]; if (!thumbnail_s3_key) return res.status(404).json({ error: 'Thumbnail not yet available' }); const url = await getSignedUrlForObject(thumbnail_s3_key); if (req.query.redirect === '1') return res.redirect(302, url); res.json({ url }); } catch (err) { next(err); } }); // POST /batch-trim router.post('/batch-trim', async (req, res, next) => { try { const { clips } = req.body; if (!Array.isArray(clips) || clips.length === 0) return res.status(400).json({ error: 'clips array is required and must be non-empty' }); for (const c of clips) { if (!c.assetId || !c.filename || !Number.isFinite(Number(c.sourceInFrames)) || !Number.isFinite(Number(c.sourceOutFrames)) || !Number.isFinite(Number(c.timelineInFrames)) || !Number.isFinite(Number(c.timelineOutFrames)) || !Number.isInteger(Number(c.trackIndex)) || Number(c.trackIndex) < 0) { return res.status(400).json({ error: 'Each clip must have assetId, filename, sourceInFrames, sourceOutFrames, timelineInFrames, timelineOutFrames, and a non-negative integer trackIndex' }); } } const jobId = uuidv4(); const expiresAt = new Date(Date.now() + 24 * 60 * 60 * 1000); await pool.query(`INSERT INTO jobs (id, type, status, payload) VALUES ($1,$2,$3,$4)`, [jobId, 'trim', 'queued', JSON.stringify({ clips })]); const clipResults = []; for (const c of clips) { const clipInstanceId = uuidv4(); await trimQueue.add('trim-clip', { jobId, clipInstanceId, assetId: c.assetId, filename: c.filename, sourceInFrames: c.sourceInFrames, sourceOutFrames: c.sourceOutFrames, timelineInFrames: c.timelineInFrames, timelineOutFrames: c.timelineOutFrames, trackIndex: c.trackIndex }); await pool.query(`INSERT INTO temp_segments (job_id, clip_instance_id, asset_id, s3_key, expires_at) VALUES ($1,$2,$3,'',$4)`, [jobId, clipInstanceId, c.assetId, expiresAt]); clipResults.push({ clipInstanceId, status: 'queued' }); } res.status(201).json({ jobId, clips: clipResults }); } catch (err) { next(err); } }); // GET /trim-status/:jobId router.get('/trim-status/:jobId', async (req, res, next) => { try { const { jobId } = req.params; const jobResult = await pool.query('SELECT * FROM jobs WHERE id = $1', [jobId]); if (jobResult.rows.length === 0) return res.status(404).json({ error: 'Trim job not found' }); const job = jobResult.rows[0]; const segResult = await pool.query(`SELECT clip_instance_id, asset_id, s3_key, expires_at FROM temp_segments WHERE job_id = $1 ORDER BY created_at`, [jobId]); const clips = segResult.rows.map(row => ({ clipInstanceId: row.clip_instance_id, assetId: row.asset_id, s3Key: row.s3_key || null, status: row.s3_key ? 'completed' : job.status, expiresAt: row.expires_at })); res.json({ jobId, status: job.status, clips }); } catch (err) { next(err); } }); // GET /temp-segment-url/:clipInstanceId router.get('/temp-segment-url/:clipInstanceId', async (req, res, next) => { try { const { clipInstanceId } = req.params; const result = await pool.query('SELECT s3_key FROM temp_segments WHERE clip_instance_id = $1 AND expires_at > NOW()', [clipInstanceId]); if (result.rows.length === 0) return res.status(404).json({ error: 'Temp segment not found or expired' }); const { s3_key } = result.rows[0]; if (!s3_key) return res.status(404).json({ error: 'Segment not yet processed' }); const url = await getSignedUrlForObject(s3_key); res.json({ url, s3Key: s3_key }); } catch (err) { next(err); } }); export default router;