diff --git a/services/mam-api/src/db/migrations/025-hls-key.sql b/services/mam-api/src/db/migrations/025-hls-key.sql new file mode 100644 index 0000000..8442178 --- /dev/null +++ b/services/mam-api/src/db/migrations/025-hls-key.sql @@ -0,0 +1,5 @@ +-- HLS VOD playback: per-asset HLS rendition (fMP4) generated by the proxy +-- worker alongside the MP4 proxy. Presence of hls_s3_key means an HLS +-- playlist exists at hls//playlist.m3u8 and /assets/:id/stream +-- should prefer it (type: 'hls') over the MP4 range-stitched /video path. +ALTER TABLE assets ADD COLUMN IF NOT EXISTS hls_s3_key TEXT; diff --git a/services/mam-api/src/routes/assets.js b/services/mam-api/src/routes/assets.js index b7c216a..d31387e 100644 --- a/services/mam-api/src/routes/assets.js +++ b/services/mam-api/src/routes/assets.js @@ -33,6 +33,10 @@ const filmstripQueue = new Queue('filmstrip', { connection: parseRedisUrl(process.env.REDIS_URL || 'redis://queue:6379'), }); +const hlsQueue = new Queue('hls', { + connection: parseRedisUrl(process.env.REDIS_URL || 'redis://queue:6379'), +}); + // GET / - List assets with filtering router.get('/', async (req, res, next) => { try { @@ -477,8 +481,8 @@ router.post('/:id/reprocess', async (req, res, next) => { try { const { id } = req.params; const type = req.query.type || 'proxy'; - if (!['proxy', 'thumbnail', 'filmstrip'].includes(type)) { - return res.status(400).json({ error: 'type must be "proxy", "thumbnail", or "filmstrip"' }); + if (!['proxy', 'thumbnail', 'filmstrip', 'hls'].includes(type)) { + return res.status(400).json({ error: 'type must be "proxy", "thumbnail", "filmstrip", or "hls"' }); } const r = await pool.query('SELECT * FROM assets WHERE id = $1', [id]); if (r.rows.length === 0) return res.status(404).json({ error: 'Asset not found' }); @@ -501,6 +505,12 @@ router.post('/:id/reprocess', async (req, res, next) => { await filmstripQueue.add('generate', { assetId: id, proxyKey: asset.proxy_s3_key }); return res.json({ queued: 'filmstrip', assetId: id }); } + if (type === 'hls') { + // Backfill: remux the existing proxy MP4 into an HLS rendition (no re-encode). + if (!asset.proxy_s3_key) return res.status(400).json({ error: 'Asset has no proxy — generate proxy first' }); + await hlsQueue.add('generate', { assetId: id, proxyKey: asset.proxy_s3_key }); + return res.json({ queued: 'hls', assetId: id }); + } } catch (err) { next(err); } }); @@ -585,6 +595,11 @@ router.get('/:id/stream', async (req, res, next) => { if (r.rows.length === 0) return res.status(404).json({ error: 'Asset not found' }); const a = r.rows[0]; if (a.status === 'live') return res.json({ url: `/live/${a.id}/index.m3u8`, type: 'hls', live: true }); + // Prefer the HLS rendition for recorded assets — whole-file segment GETs + // avoid the RustFS ranged-GET stitching the MP4 /video path has to do. + if (a.hls_s3_key) { + return res.json({ url: `/api/v1/assets/${id}/hls/playlist.m3u8`, type: 'hls', source: 'proxy' }); + } const VIDEO_EXTS = ['.mp4', '.mov', '.mxf', '.ts', '.m4v', '.mkv', '.avi', '.webm']; const key = a.proxy_s3_key || (a.original_s3_key && VIDEO_EXTS.some(ext => a.original_s3_key.toLowerCase().endsWith(ext)) @@ -596,6 +611,42 @@ router.get('/:id/stream', async (req, res, next) => { } catch (err) { next(err); } }); +// GET /:id/hls/:file — serve an HLS rendition file (playlist / init / segment). +// Whole-object passthrough from S3: no Range handling, so this sidesteps the +// RustFS ranged-GET bug entirely (every segment is a small, complete GET). +// :file is strictly validated to prevent path traversal into the bucket. +const HLS_FILE_RE = /^(playlist\.m3u8|init\.mp4|segment_\d+\.m4s)$/; +router.get('/:id/hls/:file', async (req, res, next) => { + try { + const { id, file } = req.params; + if (!HLS_FILE_RE.test(file)) return res.status(400).json({ error: 'Invalid HLS file' }); + + const r = await pool.query('SELECT hls_s3_key FROM assets WHERE id = $1', [id]); + if (r.rows.length === 0) return res.status(404).json({ error: 'Asset not found' }); + const playlistKey = r.rows[0].hls_s3_key; + if (!playlistKey) return res.status(404).json({ error: 'No HLS rendition for this asset' }); + + // Derive the prefix from the stored playlist key (hls//playlist.m3u8) + // and request the specific file under it. + const prefix = playlistKey.replace(/\/[^/]+$/, ''); + const key = `${prefix}/${file}`; + + const isPlaylist = file.endsWith('.m3u8'); + const s3Res = await s3Client.send(new GetObjectCommand({ Bucket: getS3Bucket(), Key: key })); + res.writeHead(200, { + 'Content-Type': isPlaylist ? 'application/vnd.apple.mpegurl' : 'video/mp4', + 'Cache-Control': isPlaylist ? 'no-cache' : 'private, max-age=3600', + ...(s3Res.ContentLength ? { 'Content-Length': String(s3Res.ContentLength) } : {}), + }); + s3Res.Body.pipe(res); + } catch (err) { + if (err && (err.name === 'NoSuchKey' || err.$metadata?.httpStatusCode === 404)) { + return res.status(404).json({ error: 'HLS file not found' }); + } + next(err); + } +}); + // GET /:id/live-path router.get('/:id/live-path', async (req, res, next) => { try { diff --git a/services/worker/src/index.js b/services/worker/src/index.js index 23b6cb2..9152e76 100644 --- a/services/worker/src/index.js +++ b/services/worker/src/index.js @@ -6,6 +6,7 @@ import { filmstripWorker } from './workers/filmstrip.js'; import { conformWorker } from './workers/conform.js'; import { youtubeImportWorker, proxyQueue as youtubeProxyQueue } from './workers/youtube-import.js'; import { trimWorker } from './workers/trimWorker.js'; +import { hlsWorker } from './workers/hls.js'; import { startPromotionWorker } from './workers/promotion.js'; const parseRedisUrl = (url) => { @@ -85,6 +86,9 @@ const workers = [ want('filmstrip') && createWorker('filmstrip', filmstripWorker, { concurrency: FILMSTRIP_CONCURRENCY }), want('conform') && createWorker('conform', conformWorker, { concurrency: CONFORM_CONCURRENCY }), want('trim') && createWorker('trim', trimWorker, { concurrency: TRIM_CONCURRENCY }), + // HLS backfill remux is a light stream-copy. Run it wherever proxy runs so + // existing proxy nodes pick up reprocess?type=hls jobs without an env change. + (want('proxy') || want('hls')) && createWorker('hls', hlsWorker, { concurrency: 2 }), want('import') && createWorker('import', youtubeImportWorker, { concurrency: 1, lockDuration: 10 * 60 * 1000, diff --git a/services/worker/src/workers/hls.js b/services/worker/src/workers/hls.js new file mode 100644 index 0000000..d721148 --- /dev/null +++ b/services/worker/src/workers/hls.js @@ -0,0 +1,73 @@ +import { join } from 'path'; +import { tmpdir } from 'os'; +import { mkdtemp, readdir, rm, unlink } from 'fs/promises'; +import { Queue } from 'bullmq'; +import { query } from '../db/client.js'; +import { downloadFromS3, uploadToS3 } from '../s3/client.js'; +import { segmentToHls } from '../ffmpeg/executor.js'; + +const S3_BUCKET = process.env.S3_BUCKET || 'wild-dragon'; + +const parseRedisUrl = (url) => { + const parsed = new URL(url); + return { host: parsed.hostname, port: parseInt(parsed.port, 10) }; +}; + +// Remux a local, already-browser-compatible MP4 (H.264/AAC, as the proxy +// worker produces) into an fMP4 HLS rendition and upload it to hls//. +// This is a stream COPY (no re-encode) — it costs seconds, not minutes. +// +// HLS is served to the browser as whole-file GETs through mam-api, which +// sidesteps the RustFS ranged-GET bug that the MP4 /video path has to stitch +// around. The MP4 proxy is kept for the Premiere panel + downloads. +// +// Returns the playlist S3 key and sets assets.hls_s3_key. +export async function remuxToHls(localMp4Path, assetId) { + const workDir = await mkdtemp(join(tmpdir(), `hls-${assetId}-`)); + try { + // Produces playlist.m3u8 + init.mp4 + segment_NNNNN.m4s in workDir. + await segmentToHls(localMp4Path, workDir); + + const prefix = `hls/${assetId}`; + const files = await readdir(workDir); + if (!files.includes('playlist.m3u8')) { + throw new Error('segmentToHls produced no playlist.m3u8'); + } + for (const f of files) { + await uploadToS3(S3_BUCKET, `${prefix}/${f}`, join(workDir, f)); + } + + const playlistKey = `${prefix}/playlist.m3u8`; + await query( + 'UPDATE assets SET hls_s3_key = $1, updated_at = NOW() WHERE id = $2', + [playlistKey, assetId] + ); + return playlistKey; + } finally { + await rm(workDir, { recursive: true, force: true }).catch(() => {}); + } +} + +// Backfill worker: remux an EXISTING proxy MP4 into HLS for assets that +// predate the proxy-worker HLS step. Enqueued by POST /assets/:id/reprocess?type=hls. +export const hlsWorker = async (job) => { + const { assetId, proxyKey } = job.data; + if (!proxyKey) throw new Error('hls job requires proxyKey'); + const tmpPath = join(tmpdir(), `hls-src-${job.id}.mp4`); + try { + await job.updateProgress(10); + console.log(`[hls] Downloading proxy ${proxyKey} for asset ${assetId}`); + await downloadFromS3(S3_BUCKET, proxyKey, tmpPath); + await job.updateProgress(40); + const key = await remuxToHls(tmpPath, assetId); + console.log(`[hls] Asset ${assetId} HLS rendition complete → ${key}`); + await job.updateProgress(100); + return { assetId, hlsKey: key }; + } finally { + await unlink(tmpPath).catch(() => {}); + } +}; + +export const hlsQueue = new Queue('hls', { + connection: parseRedisUrl(process.env.REDIS_URL || 'redis://queue:6379'), +}); diff --git a/services/worker/src/workers/proxy.js b/services/worker/src/workers/proxy.js index 6eb377a..de2711f 100644 --- a/services/worker/src/workers/proxy.js +++ b/services/worker/src/workers/proxy.js @@ -5,6 +5,7 @@ import { Queue } from 'bullmq'; import { query } from '../db/client.js'; import { downloadFromS3, uploadToS3 } from '../s3/client.js'; import { transcodeVideo, transcodeImage, getMediaInfo, isHwCodec } from '../ffmpeg/executor.js'; +import { remuxToHls } from './hls.js'; // Read the global proxy-encoder settings from the DB. These are written by // Settings → Proxy encoding in the GUI. Falls back to libx264 defaults if @@ -223,6 +224,19 @@ export const proxyWorker = async (job) => { ] ); + // Generate the HLS rendition from the proxy we just wrote. The file is + // still on local disk, so this is a fast stream-copy remux (no download, + // no re-encode). Best-effort: HLS is the preferred browser playback path, + // but the MP4 /video fallback still works if this fails, so never fail the + // proxy job over it. + await job.updateProgress(80); + try { + const hlsKey = await remuxToHls(outputPath, assetId); + console.log(`[proxy] HLS rendition generated for ${assetId} → ${hlsKey}`); + } catch (hlsErr) { + console.warn(`[proxy] HLS generation failed for ${assetId} (non-fatal): ${hlsErr.message}`); + } + // Now proxy exists in S3 — safe to queue thumbnail generation const thumbnailKey = `thumbnails/${assetId}.jpg`; await thumbnailQueue.add('generate', {