dragonflight/services/mam-api/src/routes/assets.js

512 lines
15 KiB
JavaScript
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import express from 'express';
import { Queue } from 'bullmq';
import { v4 as uuidv4 } from 'uuid';
import pool from '../db/pool.js';
import { getSignedUrlForObject, deleteObject, s3Client, S3_BUCKET } from '../s3/client.js';
import { GetObjectCommand } from '@aws-sdk/client-s3';
import { requireAuth } from '../middleware/auth.js';
const router = express.Router();
router.use(requireAuth);
// BullMQ queue connection (mirrors worker/src/index.js)
const parseRedisUrl = (url) => {
const parsed = new URL(url);
return { host: parsed.hostname, port: parseInt(parsed.port, 10) };
};
const proxyQueue = new Queue('proxy', {
connection: parseRedisUrl(process.env.REDIS_URL || 'redis://queue:6379'),
});
const thumbnailQueue = new Queue('thumbnail', {
connection: parseRedisUrl(process.env.REDIS_URL || 'redis://queue:6379'),
});
// GET / - List assets with filtering
router.get('/', async (req, res, next) => {
try {
const {
project_id,
bin_id,
status,
search,
media_type,
limit = 50,
offset = 0,
include_archived,
} = req.query;
let query = `
SELECT a.*,
COUNT(*) OVER() AS full_count
FROM assets a
WHERE 1=1
`;
const params = [];
let paramCount = 1;
// Hide archived rows unless explicitly asked for, or unless the caller is
// already filtering by status=archived.
if (!status && include_archived !== 'true') {
query += ` AND a.status <> 'archived'`;
}
if (project_id) {
query += ` AND a.project_id = $${paramCount++}`;
params.push(project_id);
}
if (bin_id) {
query += ` AND a.bin_id = $${paramCount++}`;
params.push(bin_id);
}
if (status) {
query += ` AND a.status = $${paramCount++}`;
params.push(status);
}
if (media_type) {
query += ` AND a.media_type = $${paramCount++}`;
params.push(media_type);
}
if (search) {
// Search display_name, filename, and notes — filename was previously omitted
query += ` AND (a.display_name ILIKE $${paramCount} OR a.filename ILIKE $${paramCount} OR a.notes ILIKE $${paramCount})`;
params.push(`%${search}%`);
paramCount++;
}
query += ` ORDER BY a.created_at DESC`;
query += ` LIMIT $${paramCount++} OFFSET $${paramCount++}`;
params.push(limit, offset);
const result = await pool.query(query, params);
const total = result.rows.length > 0 ? parseInt(result.rows[0].full_count, 10) : 0;
res.json({
assets: result.rows.map(({ full_count, ...rest }) => rest),
total,
});
} catch (err) {
next(err);
}
});
// POST / - Register a new asset from a completed capture session
//
// Called by the capture service immediately after stop() completes.
// At this point both the HiRes and Proxy files already exist in S3
// (written by the dual FFmpeg stream), so we set status='processing'
// and immediately dispatch a thumbnail job — no proxy_gen needed.
router.post('/', async (req, res, next) => {
try {
const {
projectId,
binId,
clipName,
hiresKey,
proxyKey,
duration, // seconds (integer)
capturedAt, // ISO 8601 string
} = req.body;
if (!projectId || !clipName) {
return res.status(400).json({ error: 'projectId and clipName are required' });
}
const durationMs = duration ? Math.round(duration * 1000) : null;
// Phase 1 growing-files: an asset row may already exist in status='live'
// (pre-created at recorder start so the library shows the recording while
// it is happening). If so we UPDATE that row instead of inserting a new
// one -- otherwise we would have two rows per recording.
const existing = await pool.query(
`SELECT * FROM assets
WHERE project_id = $1 AND display_name = $2 AND status = 'live'
ORDER BY created_at DESC LIMIT 1`,
[projectId, clipName]
);
let id;
let asset;
if (existing.rows.length > 0) {
id = existing.rows[0].id;
const upd = await pool.query(
`UPDATE assets
SET status = 'processing',
original_s3_key = COALESCE($2, original_s3_key),
proxy_s3_key = COALESCE($3, proxy_s3_key),
duration_ms = COALESCE($4, duration_ms),
bin_id = COALESCE(bin_id, $5),
updated_at = NOW()
WHERE id = $1
RETURNING *`,
[id, hiresKey || null, proxyKey || null, durationMs, binId || null]
);
asset = upd.rows[0];
} else {
id = uuidv4();
const ins = await pool.query(
`INSERT INTO assets (
id, project_id, bin_id,
filename, display_name,
status, media_type,
original_s3_key, proxy_s3_key,
duration_ms,
created_at, updated_at
)
VALUES (
$1, $2, $3,
$4, $4,
'processing', 'video',
$5, $6,
$7,
COALESCE($8::timestamptz, NOW()), NOW()
)
RETURNING *`,
[
id, projectId, binId || null,
clipName,
hiresKey || null, proxyKey || null,
durationMs,
capturedAt || null,
]
);
asset = ins.rows[0];
}
const thumbnailKey = `thumbnails/${id}.jpg`;
// Dispatch thumbnail job — proxy already in S3 from capture
if (proxyKey) {
await thumbnailQueue.add('generate', {
assetId: id,
proxyKey,
outputKey: thumbnailKey,
});
} else {
// No proxy yet — mark ready immediately (e.g. audio-only or test mode)
await pool.query(
`UPDATE assets SET status = 'ready', updated_at = NOW() WHERE id = $1`,
[id]
);
asset.status = 'ready';
}
res.status(existing.rows.length > 0 ? 200 : 201).json(asset);
} catch (err) {
next(err);
}
});
// POST /cleanup-live mark stuck 'live' assets as 'error'
//
// Recorder containers that crash without calling the stop callback leave
// assets permanently in 'live' status. This endpoint recovers them.
// Default age threshold: 4 hours. Accepts ?max_age_hours=N override.
router.post('/cleanup-live', async (req, res, next) => {
try {
const maxAgeHours = Math.max(1, parseInt(req.query.max_age_hours || '4', 10));
const result = await pool.query(
`UPDATE assets
SET status = 'error', updated_at = NOW()
WHERE status = 'live'
AND created_at < NOW() - ($1 * INTERVAL '1 hour')
RETURNING id, display_name, project_id, created_at`,
[maxAgeHours]
);
res.json({ cleaned: result.rowCount, assets: result.rows });
} catch (err) {
next(err);
}
});
// GET /:id - Single asset
router.get('/:id', async (req, res, next) => {
try {
const { id } = req.params;
const result = await pool.query('SELECT * FROM assets WHERE id = $1', [id]);
if (result.rows.length === 0) {
return res.status(404).json({ error: 'Asset not found' });
}
res.json(result.rows[0]);
} catch (err) {
next(err);
}
});
// PATCH /:id - Update asset metadata
router.patch('/:id', async (req, res, next) => {
try {
const { id } = req.params;
const { display_name, tags, notes, bin_id } = req.body;
const updates = [];
const params = [];
let paramCount = 1;
if (display_name !== undefined) {
updates.push(`display_name = $${paramCount++}`);
params.push(display_name);
}
if (tags !== undefined) {
updates.push(`tags = $${paramCount++}`);
params.push(tags);
}
if (notes !== undefined) {
updates.push(`notes = $${paramCount++}`);
params.push(notes);
}
if (bin_id !== undefined) {
// Accept null to move the asset back to the project root
updates.push(`bin_id = $${paramCount++}`);
params.push(bin_id || null);
}
if (updates.length === 0) {
return res.status(400).json({ error: 'No fields to update' });
}
updates.push(`updated_at = NOW()`);
params.push(id);
const query = `
UPDATE assets
SET ${updates.join(', ')}
WHERE id = $${paramCount}
RETURNING *
`;
const result = await pool.query(query, params);
if (result.rows.length === 0) {
return res.status(404).json({ error: 'Asset not found' });
}
res.json(result.rows[0]);
} catch (err) {
next(err);
}
});
// POST /:id/copy - Reference-copy an asset into another bin (or project)
//
// Same S3 keys, new asset row. Mirrors filename + metadata. Useful for
// multi-binning a single piece of media without duplicating storage.
router.post('/:id/copy', async (req, res, next) => {
try {
const { id } = req.params;
const { binId, projectId } = req.body;
const r = await pool.query('SELECT * FROM assets WHERE id = $1', [id]);
if (r.rows.length === 0) return res.status(404).json({ error: 'Asset not found' });
const src = r.rows[0];
const newId = uuidv4();
const ins = await pool.query(
`INSERT INTO assets (
id, project_id, bin_id, filename, display_name,
status, media_type, original_s3_key, proxy_s3_key, thumbnail_s3_key,
codec, resolution, fps, duration_ms, start_tc, file_size, tags, notes,
created_at, updated_at
) VALUES (
$1, $2, $3, $4, $5,
$6, $7, $8, $9, $10,
$11, $12, $13, $14, $15, $16, $17, $18,
NOW(), NOW()
) RETURNING *`,
[
newId,
projectId || src.project_id,
binId === undefined ? src.bin_id : (binId || null),
src.filename,
src.display_name,
src.status,
src.media_type,
src.original_s3_key,
src.proxy_s3_key,
src.thumbnail_s3_key,
src.codec,
src.resolution,
src.fps,
src.duration_ms,
src.start_tc,
src.file_size,
src.tags,
src.notes,
]
);
res.status(201).json(ins.rows[0]);
} catch (err) {
next(err);
}
});
// POST /:id/retry - Re-queue proxy generation for an asset stuck in error state
//
// Proxy failures leave assets at status='error' with no recovery path from the
// UI. This endpoint re-dispatches the proxy job so the worker chain
// (proxy → thumbnail) runs again without manual DB edits.
router.post('/:id/retry', async (req, res, next) => {
try {
const { id } = req.params;
const r = await pool.query('SELECT * FROM assets WHERE id = $1', [id]);
if (r.rows.length === 0) return res.status(404).json({ error: 'Asset not found' });
const asset = r.rows[0];
if (asset.status !== 'error') {
return res.status(400).json({ error: `Asset is not in error state (current: ${asset.status})` });
}
if (!asset.original_s3_key) {
return res.status(400).json({ error: 'Asset has no source file to reprocess' });
}
// Re-use the existing proxy key if one was partially written; otherwise
// construct the canonical key so the worker chain writes to the right place.
const proxyKey = asset.proxy_s3_key || `proxies/${id}.mp4`;
await proxyQueue.add('generate', {
assetId: id,
inputKey: asset.original_s3_key,
outputKey: proxyKey,
});
const updated = await pool.query(
`UPDATE assets SET status = 'processing', updated_at = NOW() WHERE id = $1 RETURNING *`,
[id]
);
res.json(updated.rows[0]);
} catch (err) {
next(err);
}
});
// DELETE /:id - Soft or hard delete
router.delete('/:id', async (req, res, next) => {
try {
const { id } = req.params;
const { hard } = req.query;
if (hard === 'true') {
const assetResult = await pool.query(
'SELECT * FROM assets WHERE id = $1',
[id]
);
if (assetResult.rows.length === 0) {
return res.status(404).json({ error: 'Asset not found' });
}
const asset = assetResult.rows[0];
if (asset.proxy_s3_key) await deleteObject(asset.proxy_s3_key);
if (asset.thumbnail_s3_key) await deleteObject(asset.thumbnail_s3_key);
if (asset.original_s3_key) await deleteObject(asset.original_s3_key);
await pool.query('DELETE FROM assets WHERE id = $1', [id]);
res.json({ message: 'Asset deleted permanently' });
} else {
const result = await pool.query(
`UPDATE assets SET status = 'archived', updated_at = NOW()
WHERE id = $1 RETURNING *`,
[id]
);
if (result.rows.length === 0) {
return res.status(404).json({ error: 'Asset not found' });
}
res.json(result.rows[0]);
}
} catch (err) {
next(err);
}
});
// GET /:id/stream - Signed URL for proxy playback
router.get('/:id/stream', async (req, res, next) => {
try {
const { id } = req.params;
const r = await pool.query('SELECT * FROM assets WHERE id = $1', [id]);
if (r.rows.length === 0) return res.status(404).json({ error: 'Asset not found' });
const a = r.rows[0];
if (a.status === 'live') {
return res.json({ url: `/live/${a.id}/index.m3u8`, type: 'hls', live: true });
}
if (a.proxy_s3_key) {
return res.json({ url: `/api/v1/assets/${id}/video`, type: 'mp4' });
}
const orig = a.original_s3_key;
if (orig && orig.toLowerCase().endsWith('.mp4')) {
return res.json({ url: `/api/v1/assets/${id}/video`, type: 'mp4' });
}
return res.json({ url: null, type: null, reason: 'no_proxy' });
} catch (err) {
next(err);
}
});
// GET /:id/video - Stream proxy for browser video playback (bypasses S3 direct)
router.get('/:id/video', async (req, res, next) => {
try {
const { id } = req.params;
const r = await pool.query('SELECT proxy_s3_key, original_s3_key FROM assets WHERE id = $1', [id]);
if (r.rows.length === 0) return res.status(404).json({ error: 'Asset not found' });
const a = r.rows[0];
const key = a.proxy_s3_key || (a.original_s3_key?.toLowerCase().endsWith('.mp4') ? a.original_s3_key : null);
if (!key) return res.status(404).json({ error: 'No browser-playable source' });
const params = { Bucket: S3_BUCKET, Key: key };
const rangeHeader = req.headers.range;
if (rangeHeader) params.Range = rangeHeader;
const s3Res = await s3Client.send(new GetObjectCommand(params));
const status = rangeHeader ? 206 : 200;
const headers = {
'Content-Type': 'video/mp4',
'Accept-Ranges': 'bytes',
'Cache-Control': 'no-store'
};
if (s3Res.ContentLength) headers['Content-Length'] = String(s3Res.ContentLength);
if (s3Res.ContentRange) headers['Content-Range'] = s3Res.ContentRange;
res.writeHead(status, headers);
s3Res.Body.pipe(res);
} catch (err) { next(err); }
});
// GET /:id/thumbnail - Signed URL for thumbnail image
router.get('/:id/thumbnail', async (req, res, next) => {
try {
const { id } = req.params;
const result = await pool.query(
'SELECT thumbnail_s3_key FROM assets WHERE id = $1',
[id]
);
if (result.rows.length === 0) {
return res.status(404).json({ error: 'Asset not found' });
}
const { thumbnail_s3_key } = result.rows[0];
if (!thumbnail_s3_key) {
return res.status(404).json({ error: 'Thumbnail not yet available' });
}
const url = await getSignedUrlForObject(thumbnail_s3_key);
res.json({ url });
} catch (err) {
next(err);
}
});
export default router;