import express from 'express';
import { Queue } from 'bullmq';
import { v4 as uuidv4 } from 'uuid';
import pool from '../db/pool.js';
import { getSignedUrlForObject, deleteObject, s3Client, getS3Bucket } from '../s3/client.js';
import { GetObjectCommand } from '@aws-sdk/client-s3';
import { requireAuth } from '../middleware/auth.js';

const router = express.Router();
router.use(requireAuth);

const DEFAULT_REDIS_URL = 'redis://queue:6379';
const DEFAULT_REDIS_PORT = 6379;
const DEFAULT_PAGE_SIZE = 50;
const DEFAULT_LIVE_MAX_AGE_HOURS = 4;

/**
 * Convert a Redis URL into the connection options handed to BullMQ.
 * (Original note: mirrors worker/src/index.js.)
 *
 * @param {string} url - Redis URL, e.g. `redis://user:pass@host:6379`.
 * @returns {{host: string, port: number, username?: string, password?: string}}
 *   Connection options. `port` falls back to 6379 when the URL omits it
 *   (the URL parser yields an empty string, which would otherwise become NaN);
 *   credentials are included only when present in the URL.
 * @throws {TypeError} If `url` cannot be parsed by `URL`.
 */
const parseRedisUrl = (url) => {
  const parsed = new URL(url);
  const port = Number.parseInt(parsed.port, 10);
  const connection = {
    host: parsed.hostname,
    port: Number.isNaN(port) ? DEFAULT_REDIS_PORT : port,
  };
  // URL keeps credentials percent-encoded; decode before passing them on.
  if (parsed.username) connection.username = decodeURIComponent(parsed.username);
  if (parsed.password) connection.password = decodeURIComponent(parsed.password);
  return connection;
};

/**
 * Parse a query-string value as a non-negative integer.
 *
 * @param {unknown} value - Raw value (string, array, undefined, ...).
 * @param {number} fallback - Returned when `value` is missing, non-numeric or negative.
 * @returns {number} The parsed integer or `fallback`.
 */
const toNonNegativeInt = (value, fallback) => {
  const parsed = Number.parseInt(value, 10);
  return Number.isInteger(parsed) && parsed >= 0 ? parsed : fallback;
};

/**
 * Check whether any asset row other than `assetId` references `key` in one of
 * its S3 key columns. POST /:id/copy duplicates the source row's keys, so a
 * key may be referenced by several rows.
 *
 * @param {string} assetId - Id of the asset being deleted.
 * @param {string} key - S3 object key to look for.
 * @returns {Promise<boolean>} True when another row still references the key.
 */
const isKeySharedWithOtherAsset = async (assetId, key) => {
  const result = await pool.query(
    `SELECT 1 FROM assets
     WHERE id <> $1
       AND (original_s3_key = $2 OR proxy_s3_key = $2 OR thumbnail_s3_key = $2)
     LIMIT 1`,
    [assetId, key]
  );
  return result.rows.length > 0;
};

// BullMQ queue connection (mirrors worker/src/index.js)
const redisConnection = parseRedisUrl(process.env.REDIS_URL || DEFAULT_REDIS_URL);
const proxyQueue = new Queue('proxy', { connection: { ...redisConnection } });
const thumbnailQueue = new Queue('thumbnail', { connection: { ...redisConnection } });

// GET / - List assets with filtering.
// Query: project_id, bin_id, status, media_type, search, limit (default 50),
// offset (default 0), include_archived ('true' to include archived rows when
// no explicit status filter is given). Responds with { assets, total }.
router.get('/', async (req, res, next) => {
  try {
    const {
      project_id,
      bin_id,
      status,
      search,
      media_type,
      limit,
      offset,
      include_archived,
    } = req.query;

    // Invalid or negative paging values fall back to the defaults instead of
    // reaching the database and failing there.
    const pageSize = toNonNegativeInt(limit, DEFAULT_PAGE_SIZE);
    const pageOffset = toNonNegativeInt(offset, 0);

    const conditions = [];
    const filterParams = [];
    // Registers a bind value and returns its positional placeholder.
    const bind = (value) => {
      filterParams.push(value);
      return `$${filterParams.length}`;
    };

    if (!status && include_archived !== 'true') {
      conditions.push(`a.status <> 'archived'`);
    }
    if (project_id) conditions.push(`a.project_id = ${bind(project_id)}`);
    if (bin_id) conditions.push(`a.bin_id = ${bind(bin_id)}`);
    if (status) conditions.push(`a.status = ${bind(status)}`);
    if (media_type) conditions.push(`a.media_type = ${bind(media_type)}`);
    if (search) {
      const pattern = bind(`%${search}%`);
      conditions.push(
        `(a.display_name ILIKE ${pattern} OR a.filename ILIKE ${pattern} OR a.notes ILIKE ${pattern})`
      );
    }

    const whereClause = conditions.length > 0 ? `WHERE ${conditions.join(' AND ')}` : '';
    const limitPlaceholder = `$${filterParams.length + 1}`;
    const offsetPlaceholder = `$${filterParams.length + 2}`;

    const result = await pool.query(
      `SELECT a.*, COUNT(*) OVER() AS full_count
       FROM assets a
       ${whereClause}
       ORDER BY a.created_at DESC
       LIMIT ${limitPlaceholder} OFFSET ${offsetPlaceholder}`,
      [...filterParams, pageSize, pageOffset]
    );

    let total = 0;
    if (result.rows.length > 0) {
      total = Number.parseInt(result.rows[0].full_count, 10);
    } else if (pageOffset > 0 || pageSize === 0) {
      // The window-function count only exists on returned rows; an empty page
      // past the end (or limit=0) needs a separate count to report the total.
      const countResult = await pool.query(
        `SELECT COUNT(*) AS full_count FROM assets a ${whereClause}`,
        filterParams
      );
      total = Number.parseInt(countResult.rows[0].full_count, 10);
    }

    res.json({
      assets: result.rows.map(({ full_count, ...rest }) => rest),
      total,
    });
  } catch (err) {
    next(err);
  }
});

// POST / - Register a new asset from a completed capture session.
// Body: projectId, clipName (both required), binId, hiresKey, proxyKey,
// duration (multiplied by 1000 to fill duration_ms), capturedAt.
// If a 'live' asset with the same project and display name exists it is
// updated (200); otherwise a new row is inserted (201).
router.post('/', async (req, res, next) => {
  try {
    const {
      projectId,
      binId,
      clipName,
      hiresKey,
      proxyKey,
      duration,
      capturedAt,
    } = req.body;

    if (!projectId || !clipName) {
      return res.status(400).json({ error: 'projectId and clipName are required' });
    }

    // A falsy duration (missing, 0, '') is stored as NULL, as before; a
    // non-numeric one is rejected rather than sent to SQL as NaN.
    let durationMs = null;
    if (duration) {
      const seconds = Number(duration);
      if (!Number.isFinite(seconds)) {
        return res.status(400).json({ error: 'duration must be a number' });
      }
      durationMs = Math.round(seconds * 1000);
    }

    const existing = await pool.query(
      `SELECT * FROM assets
       WHERE project_id = $1 AND display_name = $2 AND status = 'live'
       ORDER BY created_at DESC
       LIMIT 1`,
      [projectId, clipName]
    );
    const isUpdate = existing.rows.length > 0;

    let id;
    let asset;
    if (isUpdate) {
      id = existing.rows[0].id;
      // COALESCE keeps stored values when the request omits a field; bin_id
      // is only filled in when the row has none yet.
      const upd = await pool.query(
        `UPDATE assets SET
           status = 'processing',
           original_s3_key = COALESCE($2, original_s3_key),
           proxy_s3_key = COALESCE($3, proxy_s3_key),
           duration_ms = COALESCE($4, duration_ms),
           bin_id = COALESCE(bin_id, $5),
           updated_at = NOW()
         WHERE id = $1
         RETURNING *`,
        [id, hiresKey || null, proxyKey || null, durationMs, binId || null]
      );
      asset = upd.rows[0];
    } else {
      id = uuidv4();
      // $4 (clipName) populates both filename and display_name.
      const ins = await pool.query(
        `INSERT INTO assets (
           id, project_id, bin_id, filename, display_name, status, media_type,
           original_s3_key, proxy_s3_key, duration_ms, created_at, updated_at
         ) VALUES (
           $1, $2, $3, $4, $4, 'processing', 'video',
           $5, $6, $7, COALESCE($8::timestamptz, NOW()), NOW()
         )
         RETURNING *`,
        [
          id,
          projectId,
          binId || null,
          clipName,
          hiresKey || null,
          proxyKey || null,
          durationMs,
          capturedAt || null,
        ]
      );
      asset = ins.rows[0];
    }

    if (proxyKey) {
      // With a proxy available, enqueue thumbnail generation; the row stays
      // in 'processing'.
      await thumbnailQueue.add('generate', {
        assetId: id,
        proxyKey,
        outputKey: `thumbnails/${id}.jpg`,
      });
    } else {
      // No proxy means no job is enqueued here, so mark the asset ready now.
      await pool.query(
        `UPDATE assets SET status = 'ready', updated_at = NOW() WHERE id = $1`,
        [id]
      );
      asset.status = 'ready';
    }

    res.status(isUpdate ? 200 : 201).json(asset);
  } catch (err) {
    next(err);
  }
});

// POST /cleanup-live - Mark 'live' assets older than max_age_hours
// (query param, default 4, minimum 1) as 'error'.
router.post('/cleanup-live', async (req, res, next) => {
  try {
    const requestedHours = Number.parseInt(req.query.max_age_hours, 10);
    // Math.max(1, NaN) is NaN, so a missing or non-numeric value must fall
    // back to the default explicitly.
    const maxAgeHours = Number.isNaN(requestedHours)
      ? DEFAULT_LIVE_MAX_AGE_HOURS
      : Math.max(1, requestedHours);

    const result = await pool.query(
      `UPDATE assets
       SET status = 'error', updated_at = NOW()
       WHERE status = 'live' AND created_at < NOW() - ($1 * INTERVAL '1 hour')
       RETURNING id, display_name, project_id, created_at`,
      [maxAgeHours]
    );
    res.json({ cleaned: result.rowCount, assets: result.rows });
  } catch (err) {
    next(err);
  }
});

// GET /:id - Fetch a single asset row.
router.get('/:id', async (req, res, next) => {
  try {
    const { id } = req.params;
    const result = await pool.query('SELECT * FROM assets WHERE id = $1', [id]);
    if (result.rows.length === 0) {
      return res.status(404).json({ error: 'Asset not found' });
    }
    res.json(result.rows[0]);
  } catch (err) {
    next(err);
  }
});

// PATCH /:id - Update display_name, tags, notes and/or bin_id.
// Only fields present in the body are written; a falsy bin_id clears the bin.
router.patch('/:id', async (req, res, next) => {
  try {
    const { id } = req.params;
    const { display_name, tags, notes, bin_id } = req.body;

    const updates = [];
    const params = [];
    const setColumn = (column, value) => {
      params.push(value);
      updates.push(`${column} = $${params.length}`);
    };

    if (display_name !== undefined) setColumn('display_name', display_name);
    if (tags !== undefined) setColumn('tags', tags);
    if (notes !== undefined) setColumn('notes', notes);
    if (bin_id !== undefined) setColumn('bin_id', bin_id || null);

    if (updates.length === 0) {
      return res.status(400).json({ error: 'No fields to update' });
    }

    updates.push(`updated_at = NOW()`);
    params.push(id);

    const result = await pool.query(
      `UPDATE assets SET ${updates.join(', ')} WHERE id = $${params.length} RETURNING *`,
      params
    );
    if (result.rows.length === 0) {
      return res.status(404).json({ error: 'Asset not found' });
    }
    res.json(result.rows[0]);
  } catch (err) {
    next(err);
  }
});

// POST /:id/copy - Insert a new row duplicating the source asset's metadata
// and S3 keys (no S3 objects are copied). Body may override projectId and
// binId; an explicit falsy binId places the copy in no bin.
router.post('/:id/copy', async (req, res, next) => {
  try {
    const { id } = req.params;
    const { binId, projectId } = req.body;

    const found = await pool.query('SELECT * FROM assets WHERE id = $1', [id]);
    if (found.rows.length === 0) {
      return res.status(404).json({ error: 'Asset not found' });
    }
    const src = found.rows[0];

    const targetBinId = binId === undefined ? src.bin_id : (binId || null);
    const ins = await pool.query(
      `INSERT INTO assets (
         id, project_id, bin_id, filename, display_name, status, media_type,
         original_s3_key, proxy_s3_key, thumbnail_s3_key,
         codec, resolution, fps, duration_ms, start_tc, file_size,
         tags, notes, created_at, updated_at
       ) VALUES (
         $1, $2, $3, $4, $5, $6, $7, $8, $9, $10,
         $11, $12, $13, $14, $15, $16, $17, $18, NOW(), NOW()
       )
       RETURNING *`,
      [
        uuidv4(),
        projectId || src.project_id,
        targetBinId,
        src.filename,
        src.display_name,
        src.status,
        src.media_type,
        src.original_s3_key,
        src.proxy_s3_key,
        src.thumbnail_s3_key,
        src.codec,
        src.resolution,
        src.fps,
        src.duration_ms,
        src.start_tc,
        src.file_size,
        src.tags,
        src.notes,
      ]
    );
    res.status(201).json(ins.rows[0]);
  } catch (err) {
    next(err);
  }
});

// POST /:id/retry - Re-enqueue proxy generation for an asset in 'error'
// state and move it back to 'processing'.
router.post('/:id/retry', async (req, res, next) => {
  try {
    const { id } = req.params;

    const found = await pool.query('SELECT * FROM assets WHERE id = $1', [id]);
    if (found.rows.length === 0) {
      return res.status(404).json({ error: 'Asset not found' });
    }
    const asset = found.rows[0];

    if (asset.status !== 'error') {
      return res
        .status(400)
        .json({ error: `Asset is not in error state (current: ${asset.status})` });
    }
    if (!asset.original_s3_key) {
      return res.status(400).json({ error: 'Asset has no source file to reprocess' });
    }

    // Reuse the recorded proxy key when there is one, otherwise use the
    // proxies/<id>.mp4 location.
    const proxyKey = asset.proxy_s3_key || `proxies/${id}.mp4`;
    await proxyQueue.add('generate', {
      assetId: id,
      inputKey: asset.original_s3_key,
      outputKey: proxyKey,
    });

    const updated = await pool.query(
      `UPDATE assets SET status = 'processing', updated_at = NOW() WHERE id = $1 RETURNING *`,
      [id]
    );
    res.json(updated.rows[0]);
  } catch (err) {
    next(err);
  }
});

// DELETE /:id - Archive the asset, or with ?hard=true delete the row and its
// S3 objects.
router.delete('/:id', async (req, res, next) => {
  try {
    const { id } = req.params;
    const { hard } = req.query;

    if (hard !== 'true') {
      const result = await pool.query(
        `UPDATE assets SET status = 'archived', updated_at = NOW() WHERE id = $1 RETURNING *`,
        [id]
      );
      if (result.rows.length === 0) {
        return res.status(404).json({ error: 'Asset not found' });
      }
      return res.json(result.rows[0]);
    }

    const assetResult = await pool.query('SELECT * FROM assets WHERE id = $1', [id]);
    if (assetResult.rows.length === 0) {
      return res.status(404).json({ error: 'Asset not found' });
    }
    const asset = assetResult.rows[0];

    // Rows created by POST /:id/copy share S3 keys with their source, so an
    // object is only removed when no other row still references it.
    // NOTE(review): the check and the delete are not atomic; a copy made
    // concurrently with a hard delete could still lose its objects.
    const keys = new Set(
      [asset.proxy_s3_key, asset.thumbnail_s3_key, asset.original_s3_key].filter(Boolean)
    );
    for (const key of keys) {
      if (!(await isKeySharedWithOtherAsset(id, key))) {
        await deleteObject(key);
      }
    }

    await pool.query('DELETE FROM assets WHERE id = $1', [id]);
    res.json({ message: 'Asset deleted permanently' });
  } catch (err) {
    next(err);
  }
});

// GET /:id/stream - Tell the client which playback URL to use:
// HLS for 'live' assets, the /video endpoint when a proxy or an .mp4 original
// exists, otherwise { url: null, reason: 'no_proxy' }.
router.get('/:id/stream', async (req, res, next) => {
  try {
    const { id } = req.params;

    const found = await pool.query('SELECT * FROM assets WHERE id = $1', [id]);
    if (found.rows.length === 0) {
      return res.status(404).json({ error: 'Asset not found' });
    }
    const asset = found.rows[0];

    if (asset.status === 'live') {
      return res.json({ url: `/live/${asset.id}/index.m3u8`, type: 'hls', live: true });
    }

    const originalIsMp4 = Boolean(asset.original_s3_key?.toLowerCase().endsWith('.mp4'));
    if (asset.proxy_s3_key || originalIsMp4) {
      return res.json({ url: `/api/v1/assets/${id}/video`, type: 'mp4' });
    }

    return res.json({ url: null, type: null, reason: 'no_proxy' });
  } catch (err) {
    next(err);
  }
});

// GET /:id/video - Stream the proxy (or an .mp4 original) from S3 through
// this server, forwarding the client's Range header.
router.get('/:id/video', async (req, res, next) => {
  try {
    const { id } = req.params;

    const found = await pool.query(
      'SELECT proxy_s3_key, original_s3_key FROM assets WHERE id = $1',
      [id]
    );
    if (found.rows.length === 0) {
      return res.status(404).json({ error: 'Asset not found' });
    }
    const asset = found.rows[0];

    const originalIsMp4 = asset.original_s3_key?.toLowerCase().endsWith('.mp4');
    const key = asset.proxy_s3_key || (originalIsMp4 ? asset.original_s3_key : null);
    if (!key) {
      return res.status(404).json({ error: 'No browser-playable source' });
    }

    const params = { Bucket: getS3Bucket(), Key: key };
    const rangeHeader = req.headers.range;
    if (rangeHeader) params.Range = rangeHeader;

    const s3Res = await s3Client.send(new GetObjectCommand(params));

    // Answer 206 only when S3 actually returned a partial body; a 206 without
    // Content-Range would be malformed.
    const isPartial = Boolean(s3Res.ContentRange);
    const headers = {
      'Content-Type': 'video/mp4',
      'Accept-Ranges': 'bytes',
      'Cache-Control': 'no-store',
    };
    if (s3Res.ContentLength != null) headers['Content-Length'] = String(s3Res.ContentLength);
    if (isPartial) headers['Content-Range'] = s3Res.ContentRange;

    const body = s3Res.Body;
    // pipe() does not forward source errors; without a listener a mid-stream
    // S3 failure would be an unhandled 'error' event.
    body.on('error', (streamErr) => {
      if (res.headersSent) {
        // Too late for an error response; abort the connection instead.
        console.error(`Asset ${id}: S3 stream failed mid-response`, streamErr);
        res.destroy();
      } else {
        next(streamErr);
      }
    });
    // Stop reading from S3 when the client goes away.
    res.on('close', () => {
      if (!body.destroyed) body.destroy();
    });

    res.writeHead(isPartial ? 206 : 200, headers);
    body.pipe(res);
  } catch (err) {
    next(err);
  }
});

// GET /:id/hires - Return a signed URL for the original file plus a
// sanitized download filename.
router.get('/:id/hires', async (req, res, next) => {
  try {
    const { id } = req.params;

    const found = await pool.query(
      'SELECT original_s3_key, filename, display_name, file_size FROM assets WHERE id = $1',
      [id]
    );
    if (found.rows.length === 0) {
      return res.status(404).json({ error: 'Asset not found' });
    }
    const asset = found.rows[0];
    if (!asset.original_s3_key) {
      return res.status(404).json({ error: 'No hi-res source available' });
    }

    const url = await getSignedUrlForObject(asset.original_s3_key);

    // Take the extension from the last path segment only, so a dot in a
    // directory name is not mistaken for one; default to 'mxf' when absent.
    const objectName = asset.original_s3_key.split('/').pop();
    const dotIndex = objectName.lastIndexOf('.');
    const hasExtension = dotIndex !== -1 && dotIndex < objectName.length - 1;
    const ext = (hasExtension ? objectName.slice(dotIndex + 1) : 'mxf').toLowerCase();

    const base = (asset.display_name || asset.filename || id)
      .replace(/[^\w.-]/g, '_')
      .substring(0, 100);

    res.json({
      url,
      filename: `${base}.${ext}`,
      ext,
      file_size: asset.file_size || null,
      type: 'hires',
    });
  } catch (err) {
    next(err);
  }
});

// GET /:id/thumbnail - Return a signed thumbnail URL as JSON, or redirect to
// it when ?redirect=1.
router.get('/:id/thumbnail', async (req, res, next) => {
  try {
    const { id } = req.params;

    const result = await pool.query('SELECT thumbnail_s3_key FROM assets WHERE id = $1', [id]);
    if (result.rows.length === 0) {
      return res.status(404).json({ error: 'Asset not found' });
    }

    const { thumbnail_s3_key } = result.rows[0];
    if (!thumbnail_s3_key) {
      return res.status(404).json({ error: 'Thumbnail not yet available' });
    }

    const url = await getSignedUrlForObject(thumbnail_s3_key);
    if (req.query.redirect === '1') {
      return res.redirect(302, url);
    }
    res.json({ url });
  } catch (err) {
    next(err);
  }
});

export default router;