import { join } from 'path';
import { tmpdir } from 'os';
import { mkdtemp, readdir, rm, unlink } from 'fs/promises';
import { Queue } from 'bullmq';
import { query } from '../db/client.js';
import { downloadFromS3, uploadToS3 } from '../s3/client.js';
import { segmentToHls } from '../ffmpeg/executor.js';

const S3_BUCKET = process.env.S3_BUCKET || 'wild-dragon';

/**
 * Turn a Redis connection URL into a connection options object.
 *
 * @param {string} url - Redis URL, e.g. `redis://queue:6379` or
 *   `redis://user:secret@queue`.
 * @returns {{host: string, port: number, username?: string, password?: string}}
 *   `host` and `port` are always present; `username` / `password` are only
 *   set when the URL carries them.
 * @throws {TypeError} If `url` cannot be parsed by `new URL()`.
 */
const parseRedisUrl = (url) => {
  const defaultPort = 6379;
  const parsed = new URL(url);
  const connection = {
    host: parsed.hostname,
    // `parsed.port` is '' when the URL omits the port; parseInt('') would be
    // NaN, so fall back to the standard Redis port instead.
    port: parsed.port ? Number.parseInt(parsed.port, 10) : defaultPort,
  };
  // URL keeps credentials percent-encoded; decode them before handing over.
  if (parsed.username) {
    connection.username = decodeURIComponent(parsed.username);
  }
  if (parsed.password) {
    connection.password = decodeURIComponent(parsed.password);
  }
  return connection;
};

// Remux a local, already-browser-compatible MP4 (H.264/AAC, as the proxy
// worker produces) into an fMP4 HLS rendition and upload it to
// `hls/${assetId}/`.
// This is a stream COPY (no re-encode) — it costs seconds, not minutes.
//
// HLS is served to the browser as whole-file GETs through mam-api, which
// sidesteps the RustFS ranged-GET bug that the MP4 /video path has to stitch
// around. The MP4 proxy is kept for the Premiere panel + downloads.
/**
 * @param {string} localMp4Path - Path of the source MP4 on the local disk.
 * @param {string|number} assetId - Asset id; used for the S3 prefix and the
 *   `assets` row that gets its `hls_s3_key` set.
 * @returns {Promise<string>} The S3 key of the uploaded playlist.
 * @throws {Error} If segmenting produced no `playlist.m3u8`; errors from
 *   segmenting, uploading or the database update propagate unchanged.
 */
export async function remuxToHls(localMp4Path, assetId) {
  const playlistName = 'playlist.m3u8';
  const workDir = await mkdtemp(join(tmpdir(), `hls-${assetId}-`));
  try {
    // Produces playlist.m3u8 + init.mp4 + segment_NNNNN.m4s in workDir.
    await segmentToHls(localMp4Path, workDir);

    const prefix = `hls/${assetId}`;
    const files = await readdir(workDir);
    if (!files.includes(playlistName)) {
      throw new Error('segmentToHls produced no playlist.m3u8');
    }

    // Upload the init segment and media segments first and the playlist
    // last, so a playlist never becomes visible in S3 before every file it
    // references, even if an upload fails part-way through.
    const mediaFiles = files.filter((f) => f !== playlistName);
    for (const f of [...mediaFiles, playlistName]) {
      await uploadToS3(S3_BUCKET, `${prefix}/${f}`, join(workDir, f));
    }

    const playlistKey = `${prefix}/${playlistName}`;
    await query(
      'UPDATE assets SET hls_s3_key = $1, updated_at = NOW() WHERE id = $2',
      [playlistKey, assetId]
    );
    return playlistKey;
  } finally {
    // Best-effort cleanup: never let a failed delete mask the real outcome,
    // but leave a trace so leaked temp directories can be noticed.
    await rm(workDir, { recursive: true, force: true }).catch((err) => {
      console.warn(`[hls] Failed to remove temp dir ${workDir}: ${err.message}`);
    });
  }
}

// Backfill worker: remux an EXISTING proxy MP4 into HLS for assets that
// predate the proxy-worker HLS step. Enqueued by POST /assets/:id/reprocess?type=hls.
/**
 * Job handler that builds the HLS rendition for one asset from its proxy MP4.
 *
 * Downloads `proxyKey` from the bucket to a temp file, runs `remuxToHls` on
 * it, reports progress on the job (10 → 40 → 100) and always removes the
 * temp file afterwards.
 *
 * @param {object} job - Queue job; reads `job.id`, `job.data.assetId`,
 *   `job.data.proxyKey` and calls `job.updateProgress()`.
 * @returns {Promise<{assetId: *, hlsKey: string}>} The asset id and the
 *   playlist key returned by `remuxToHls`.
 * @throws {Error} If `assetId` or `proxyKey` is missing from the job data;
 *   download / remux errors propagate unchanged.
 */
export const hlsWorker = async (job) => {
  const { assetId, proxyKey } = job.data;
  // Without an asset id the rendition would be uploaded under
  // `hls/undefined/` and no row would be updated, so reject the job up front.
  if (assetId == null || assetId === '') {
    throw new Error('hls job requires assetId');
  }
  if (!proxyKey) {
    throw new Error('hls job requires proxyKey');
  }

  const tmpPath = join(tmpdir(), `hls-src-${job.id}.mp4`);
  try {
    await job.updateProgress(10);
    console.log(`[hls] Downloading proxy ${proxyKey} for asset ${assetId}`);
    await downloadFromS3(S3_BUCKET, proxyKey, tmpPath);
    await job.updateProgress(40);

    const key = await remuxToHls(tmpPath, assetId);
    console.log(`[hls] Asset ${assetId} HLS rendition complete → ${key}`);
    await job.updateProgress(100);
    return { assetId, hlsKey: key };
  } finally {
    // Best-effort cleanup. The file legitimately may not exist (download
    // failed before writing), so only report unexpected failures.
    await unlink(tmpPath).catch((err) => {
      if (err.code !== 'ENOENT') {
        console.warn(`[hls] Failed to remove temp file ${tmpPath}: ${err.message}`);
      }
    });
  }
};

// Queue handle for the 'hls' queue; connection comes from REDIS_URL, falling
// back to the default in-cluster Redis address.
export const hlsQueue = new Queue('hls', {
  connection: parseRedisUrl(process.env.REDIS_URL || 'redis://queue:6379'),
});