Browser playback of recorded assets moves to HLS, retiring the MP4 range-stitching path for VOD. The MP4 proxy is kept for the Premiere panel.

- worker/hls.js: remuxToHls() stream-copies the proxy MP4 → fMP4 HLS (playlist.m3u8 + init.mp4 + segment_*.m4s) via the existing segmentToHls, uploads to hls/<id>/, and sets assets.hls_s3_key. hlsWorker backfills from an existing proxy.
- proxy.js: generate HLS inline after the MP4 upload (local file, no re-download, no re-encode); best-effort/non-fatal.
- worker/index.js: register the 'hls' worker wherever 'proxy' runs.
- mam-api: GET /assets/:id/hls/:file serves playlist/init/segments as whole-object GETs (no Range → sidesteps the RustFS bug), with strict filename validation. /stream prefers hls_s3_key (type:'hls'). reprocess?type=hls backfills. Migration 025 adds assets.hls_s3_key.
- Frontend unchanged: the hls.js path already handles type:'hls'.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
141 lines
6.9 KiB
JavaScript
141 lines
6.9 KiB
JavaScript
import 'dotenv/config';
|
|
import { Worker, Queue } from 'bullmq';
|
|
import { proxyWorker, thumbnailQueue as proxyThumbnailQueue } from './workers/proxy.js';
|
|
import { thumbnailWorker } from './workers/thumbnail.js';
|
|
import { filmstripWorker } from './workers/filmstrip.js';
|
|
import { conformWorker } from './workers/conform.js';
|
|
import { youtubeImportWorker, proxyQueue as youtubeProxyQueue } from './workers/youtube-import.js';
|
|
import { trimWorker } from './workers/trimWorker.js';
|
|
import { hlsWorker } from './workers/hls.js';
|
|
import { startPromotionWorker } from './workers/promotion.js';
|
|
|
|
/**
 * Convert a Redis connection URL into the plain options object used as the
 * BullMQ `connection` setting in this file.
 *
 * @param {string} url - e.g. `redis://:secret@redis.internal:6379`.
 * @returns {{host: string, port: number, password: (string|undefined)}}
 *   `port` falls back to 6379 when the URL does not carry one; `password`
 *   is `undefined` when the URL has no password component.
 * @throws {TypeError} If `url` cannot be parsed by the WHATWG URL parser.
 */
const parseRedisUrl = (url) => {
  const parsed = new URL(url);

  // `URL#port` is '' when the URL omits the port, and parseInt('') is NaN.
  // Fall back to the standard Redis port instead of handing NaN to the client.
  const port = parsed.port === '' ? 6379 : parseInt(parsed.port, 10);

  // `URL#password` is returned percent-encoded ("p%40ss" for "p@ss"), so it
  // must be decoded before being used as the actual credential.
  let password;
  if (parsed.password) {
    try {
      password = decodeURIComponent(parsed.password);
    } catch (_) {
      // Not valid percent-encoding (e.g. a literal '%'): use the raw value.
      password = parsed.password;
    }
  }

  return {
    host: parsed.hostname,
    port,
    password,
  };
};
|
|
|
|
// Redis connection settings, taken from REDIS_URL with a localhost fallback.
// Used as the `connection` option for the Workers and Queues built below.
const redisUrl = process.env.REDIS_URL || 'redis://localhost:6379';
const redisOptions = parseRedisUrl(redisUrl);

// Label identifying this node/GPU. It is written into each job's data so the
// Jobs UI can display which worker handled it (the UI reads metadata.node).
// Configure it per container through the environment.
const { WORKER_LABEL: labelFromEnv, HOSTNAME: hostnameFromEnv } = process.env;
const WORKER_LABEL = labelFromEnv || hostnameFromEnv || 'worker';
|
|
|
|
/**
 * Build a BullMQ Worker for `queueName` with this service's shared connection,
 * stall-detection defaults and logging listeners.
 *
 * @param {string} queueName - Queue to consume; also used as the log prefix.
 * @param {(job: object) => Promise<*>} handler - Job processor. It is called
 *   with the job after the node label has been stamped onto the job data.
 * @param {object} [overrides={}] - Worker options spread last, so they win
 *   over the defaults below (e.g. `concurrency`, `lockDuration`).
 * @returns {Worker} The started worker.
 */
const createWorker = (queueName, handler, overrides = {}) => {
  // Stamp node attribution into the job data on pickup (persists to Redis).
  const wrapped = async (job) => {
    try {
      await job.updateData({ ...job.data, node: WORKER_LABEL });
    } catch (err) {
      // Attribution is best-effort: never fail the job over it, but do not
      // hide the failure either.
      console.warn(
        `[${queueName}] Job ${job.id} could not record node label:`,
        err && err.message ? err.message : err,
      );
    }
    return handler(job);
  };

  const worker = new Worker(queueName, wrapped, {
    connection: redisOptions,
    // Stall detection: if a worker dies mid-job, BullMQ moves it back to wait
    // after stalledInterval. Without this a crashed run sits in active forever.
    stalledInterval: 30000,
    maxStalledCount: 1,
    lockDuration: 60000,
    lockRenewTime: 15000,
    ...overrides,
  });

  worker.on('completed', (job) => {
    console.log(`[${queueName}] Job ${job.id} completed`);
  });

  worker.on('failed', (job, err) => {
    // BullMQ can emit 'failed' with an undefined job; reading `.id` on it
    // would throw from inside the listener.
    const jobId = job ? job.id : 'unknown';
    console.error(`[${queueName}] Job ${jobId} failed:`, err.message);
  });

  worker.on('stalled', (jobId) => {
    console.warn(`[${queueName}] Job ${jobId} stalled — reclaimed`);
  });

  worker.on('progress', (job, progress) => {
    console.log(`[${queueName}] Job ${job.id} progress:`, progress);
  });

  // An 'error' event with no listener is thrown by the emitter. Log it so a
  // connection-level error does not surface as an uncaught exception.
  worker.on('error', (err) => {
    console.error(
      `[${queueName}] Worker error:`,
      err && err.message ? err.message : err,
    );
  });

  return worker;
};
|
|
|
|
// Per-queue concurrency. With a concurrency of 1 every job in a queue runs
// one after another, so a single stalled job holds up all the others.
// Thumbnails are cheap and parallel-safe, so several run at once and a slow
// outlier cannot back up the rest of the catalog. Proxy and conform are
// heavier (ffmpeg transcode) and stay lower to avoid trashing the box.
// Each value can be tuned through the environment variable of the same name.
const concurrencyFromEnv = (envName, fallback) =>
  parseInt(process.env[envName] || fallback, 10);

const PROXY_CONCURRENCY = concurrencyFromEnv('PROXY_CONCURRENCY', '2');
const THUMBNAIL_CONCURRENCY = concurrencyFromEnv('THUMBNAIL_CONCURRENCY', '4');
const FILMSTRIP_CONCURRENCY = concurrencyFromEnv('FILMSTRIP_CONCURRENCY', '2');
const CONFORM_CONCURRENCY = concurrencyFromEnv('CONFORM_CONCURRENCY', '1');
const TRIM_CONCURRENCY = concurrencyFromEnv('TRIM_CONCURRENCY', '4');
|
|
|
|
// Capability routing. WORKER_QUEUES is a comma-separated list of the queues
// this worker subscribes to; leaving it unset subscribes to every queue
// (back-compat). That allows heavy encodes (proxy/conform/trim) to be pinned
// to strong GPUs and light jobs (thumbnail/filmstrip) to weak ones, with each
// container bound to one GPU.
const _wq = (process.env.WORKER_QUEUES || '').trim();

// Split the raw list into a set of non-empty, trimmed queue names.
const collectQueueNames = (rawList) => {
  const names = new Set();
  for (const piece of rawList.split(',')) {
    const queue = piece.trim();
    if (queue) names.add(queue);
  }
  return names;
};

// null means "no restriction configured".
const _enabled = _wq === '' ? null : collectQueueNames(_wq);

// True when this worker should subscribe to queue `q`.
const want = (q) => _enabled === null || _enabled.has(q);
|
|
|
|
// One entry per queue this service can consume: whether this node wants it,
// the job handler, and the Worker option overrides passed to createWorker.
const workerSpecs = [
  {
    queue: 'proxy',
    enabled: want('proxy'),
    handler: proxyWorker,
    options: { concurrency: PROXY_CONCURRENCY },
  },
  {
    queue: 'thumbnail',
    enabled: want('thumbnail'),
    handler: thumbnailWorker,
    options: { concurrency: THUMBNAIL_CONCURRENCY },
  },
  {
    queue: 'filmstrip',
    enabled: want('filmstrip'),
    handler: filmstripWorker,
    options: { concurrency: FILMSTRIP_CONCURRENCY },
  },
  {
    queue: 'conform',
    enabled: want('conform'),
    handler: conformWorker,
    options: { concurrency: CONFORM_CONCURRENCY },
  },
  {
    queue: 'trim',
    enabled: want('trim'),
    handler: trimWorker,
    options: { concurrency: TRIM_CONCURRENCY },
  },
  {
    // The HLS backfill remux is a light stream-copy. It is enabled wherever
    // proxy is, so existing proxy nodes pick up reprocess?type=hls jobs
    // without an env change.
    queue: 'hls',
    enabled: want('proxy') || want('hls'),
    handler: hlsWorker,
    options: { concurrency: 2 },
  },
  {
    queue: 'import',
    enabled: want('import'),
    handler: youtubeImportWorker,
    options: {
      concurrency: 1,
      lockDuration: 10 * 60 * 1000,
      lockRenewTime: 60000,
    },
  },
];

// Instantiate workers only for the enabled queues, in the order listed above.
const workers = workerSpecs
  .filter((spec) => spec.enabled)
  .map((spec) => createWorker(spec.queue, spec.handler, spec.options));

console.log(`WORKER_QUEUES=${_wq || '(all)'}`);
|
|
|
|
// Single shared Queue client for 'filmstrip'. It is exported so the thumbnail
// worker can enqueue filmstrip jobs through it.
const filmstripQueueOptions = { connection: redisOptions };
export const filmstripQueue = new Queue('filmstrip', filmstripQueueOptions);

// Report the effective per-queue concurrency at startup.
const concurrencyReport = `Concurrency: proxy=${PROXY_CONCURRENCY} thumbnail=${THUMBNAIL_CONCURRENCY} filmstrip=${FILMSTRIP_CONCURRENCY} conform=${CONFORM_CONCURRENCY} trim=${TRIM_CONCURRENCY} import=1`;
console.log(concurrencyReport);
|
|
|
|
// BUG FIX #4: startPromotionWorker() now returns a shutdown function that
// clears the poll intervals and closes the promotion proxyQueue singleton.
// Promotion (growing-files -> S3) is a polling SCAN, not a queue consumer.
// With multiple worker containers it must run on exactly one, or every node
// races the same files. Gate behind RUN_PROMOTION (set true on a single worker).
const PROMOTION_ENABLED = process.env.RUN_PROMOTION === 'true';
const stopPromotionWorker = PROMOTION_ENABLED ? startPromotionWorker() : null;
if (PROMOTION_ENABLED) console.log('[promotion] scanner ENABLED on this worker');

// Startup banner. Queues and scans are reported from what was actually started
// on this node, so the log stays truthful under WORKER_QUEUES / RUN_PROMOTION
// (the previous fixed list omitted filmstrip and hls and always claimed the
// promotion scan was running).
const activeQueueNames = workers.map((w) => w.name).join(', ') || '(none)';
const backgroundScans = PROMOTION_ENABLED ? 'promotion (growing-files → S3)' : '(none)';

console.log('Wild Dragon Worker Service started');
console.log(`Redis: ${redisOptions.host}:${redisOptions.port}`);
console.log(`Active queues: ${activeQueueNames}`);
console.log(`Background scans: ${backgroundScans}`);
|
|
|
|
// Guards against a second SIGTERM starting a concurrent shutdown.
let shuttingDown = false;

/**
 * Graceful shutdown on SIGTERM.
 *
 * BUG FIX #4 + #10: close all BullMQ Workers AND all Queue client instances.
 * Workers process jobs; Queues dispatch them. Both hold open Redis
 * connections that keep the event loop alive after the workers are closed.
 *
 * Every close is settled independently: one failure is logged and does not
 * prevent the others from completing or the process from exiting.
 * Exit code is 0 when every worker and the promotion scanner stopped cleanly,
 * 1 otherwise. Queue-client close errors are logged but stay non-fatal.
 */
process.on('SIGTERM', async () => {
  if (shuttingDown) return;
  shuttingDown = true;
  console.log('SIGTERM received, shutting down...');

  // Run one close step; resolve to true on success and false (after logging)
  // on failure. Starting from a resolved promise also captures synchronous
  // throws from `step`.
  const settle = (label, step) =>
    Promise.resolve()
      .then(step)
      .then(
        () => true,
        (err) => {
          console.error(
            `[shutdown] ${label} failed to close:`,
            err && err.message ? err.message : err,
          );
          return false;
        },
      );

  const critical = Promise.all([
    // Close all Worker instances (stops accepting new jobs, waits for active)
    ...workers.map((w) => settle(`worker '${w.name}'`, () => w.close())),
    // BUG FIX #4: Stop the promotion worker intervals and close its proxyQueue
    settle('promotion scanner', () => (stopPromotionWorker ? stopPromotionWorker() : undefined)),
  ]);

  const bestEffort = Promise.all([
    // BUG FIX #7: Close the Queue singletons from worker modules.
    // proxyThumbnailQueue: thumbnailQueue in proxy.js (dispatches thumbnail jobs)
    // youtubeProxyQueue: proxyQueue in youtube-import.js (dispatches proxy jobs)
    settle('thumbnail queue client (proxy.js)', () => proxyThumbnailQueue.close()),
    settle('proxy queue client (youtube-import.js)', () => youtubeProxyQueue.close()),
    settle('filmstrip queue client', () => filmstripQueue.close()),
  ]);

  const [criticalResults] = await Promise.all([critical, bestEffort]);
  const clean = criticalResults.every(Boolean);

  console.log(clean ? 'All workers and queues closed' : 'Shutdown finished with errors');
  process.exit(clean ? 0 : 1);
});
|