import 'dotenv/config';
import { Worker, Queue } from 'bullmq';
import { proxyWorker, thumbnailQueue as proxyThumbnailQueue } from './workers/proxy.js';
import { thumbnailWorker } from './workers/thumbnail.js';
import { filmstripWorker } from './workers/filmstrip.js';
import { conformWorker } from './workers/conform.js';
import { youtubeImportWorker, proxyQueue as youtubeProxyQueue } from './workers/youtube-import.js';
import { trimWorker } from './workers/trimWorker.js';
import { startPromotionWorker } from './workers/promotion.js';

// Used when the Redis URL carries no explicit port (matches the port in the
// built-in fallback URL below).
const DEFAULT_REDIS_PORT = 6379;

/**
 * Percent-decodes a credential taken from a parsed URL.
 *
 * @param {string} value - Credential as exposed by `URL` (percent-encoded).
 * @returns {string} The decoded credential, or `value` unchanged when it is
 *   not valid percent-encoding (e.g. contains a bare `%`).
 */
const decodeCredential = (value) => {
  try {
    return decodeURIComponent(value);
  } catch {
    return value;
  }
};

/**
 * Converts a Redis connection URL into a `{ host, port, password }` object
 * that is passed to BullMQ as the `connection` option.
 *
 * @param {string} url - Redis URL, e.g. `redis://:secret@localhost:6379`.
 * @returns {{ host: string, port: number, password: (string|undefined) }}
 * @throws {TypeError} If `url` cannot be parsed by `new URL()`.
 */
const parseRedisUrl = (url) => {
  const parsed = new URL(url);

  // `URL#port` is the empty string when the URL omits the port, and
  // parseInt('') is NaN — fall back to the default Redis port instead.
  const port = parsed.port === '' ? DEFAULT_REDIS_PORT : Number.parseInt(parsed.port, 10);

  return {
    host: parsed.hostname,
    port,
    // `URL#password` stays percent-encoded (e.g. '=' is exposed as '%3D'),
    // so decode it before handing it to the Redis client.
    password: parsed.password ? decodeCredential(parsed.password) : undefined,
  };
};

const redisOptions = parseRedisUrl(process.env.REDIS_URL || 'redis://localhost:6379');

/**
 * Creates a BullMQ Worker for `queueName` with the service-wide stall/lock
 * defaults and console logging for its lifecycle events.
 *
 * @param {string} queueName - Name of the queue to consume.
 * @param {Function} handler - Job processor passed straight to `Worker`.
 * @param {object} [overrides={}] - Worker options merged over the defaults
 *   (later keys win), e.g. `{ concurrency: 4 }`.
 * @returns {Worker} The started worker instance.
 */
const createWorker = (queueName, handler, overrides = {}) => {
  const worker = new Worker(queueName, handler, {
    connection: redisOptions,
    // Stall detection: if a worker dies mid-job, BullMQ moves it back to wait
    // after stalledInterval. Without this a crashed run sits in active forever.
    stalledInterval: 30000,
    maxStalledCount: 1,
    lockDuration: 60000,
    lockRenewTime: 15000,
    ...overrides,
  });

  worker.on('completed', (job) => {
    console.log(`[${queueName}] Job ${job.id} completed`);
  });

  worker.on('failed', (job, err) => {
    // BullMQ may emit `failed` without a job object, so never dereference
    // `job` unguarded here — a throw inside this listener would mask the
    // original failure.
    console.error(`[${queueName}] Job ${job?.id ?? '(unknown)'} failed:`, err?.message ?? err);
  });

  worker.on('stalled', (jobId) => {
    console.warn(`[${queueName}] Job ${jobId} stalled — reclaimed`);
  });

  worker.on('progress', (job, progress) => {
    console.log(`[${queueName}] Job ${job.id} progress:`, progress);
  });

  // Without an 'error' listener, worker-level errors (e.g. Redis connection
  // problems) are emitted as unhandled 'error' events.
  worker.on('error', (err) => {
    console.error(`[${queueName}] Worker error:`, err?.message ?? err);
  });

  return worker;
};

// Per-queue concurrency. Defaults to 1, which serialises every job in a
// queue — meaning a single stalled job blocks every other one. We want
// thumbnails (cheap, parallel-safe) to run several at a time so a slow
// outlier doesn't back the rest of the catalog up.
// Proxy + conform are heavier (ffmpeg transcode) so we keep them lower to
// avoid thrashing the box; tune via env if a node has more headroom.

/**
 * Reads a per-queue concurrency setting from the environment.
 *
 * @param {string} envName - Environment variable to read.
 * @param {number} fallback - Value used when the variable is unset, empty,
 *   or not a positive integer.
 * @returns {number} A positive integer concurrency.
 */
const readConcurrency = (envName, fallback) => {
  const raw = process.env[envName];
  if (raw === undefined || raw === '') {
    return fallback;
  }

  const value = Number.parseInt(raw, 10);
  if (Number.isInteger(value) && value >= 1) {
    return value;
  }

  // A typo such as PROXY_CONCURRENCY=two would otherwise reach the Worker
  // options as NaN; fall back loudly instead.
  console.warn(`Ignoring invalid ${envName}=${JSON.stringify(raw)}; using ${fallback}`);
  return fallback;
};

const PROXY_CONCURRENCY = readConcurrency('PROXY_CONCURRENCY', 2);
const THUMBNAIL_CONCURRENCY = readConcurrency('THUMBNAIL_CONCURRENCY', 4);
const FILMSTRIP_CONCURRENCY = readConcurrency('FILMSTRIP_CONCURRENCY', 2);
const CONFORM_CONCURRENCY = readConcurrency('CONFORM_CONCURRENCY', 1);
const TRIM_CONCURRENCY = readConcurrency('TRIM_CONCURRENCY', 4);
// Not env-tunable; kept as a constant so the worker config and the startup
// log below cannot drift apart.
const IMPORT_CONCURRENCY = 1;

const workers = [
  createWorker('proxy', proxyWorker, { concurrency: PROXY_CONCURRENCY }),
  createWorker('thumbnail', thumbnailWorker, { concurrency: THUMBNAIL_CONCURRENCY }),
  createWorker('filmstrip', filmstripWorker, { concurrency: FILMSTRIP_CONCURRENCY }),
  createWorker('conform', conformWorker, { concurrency: CONFORM_CONCURRENCY }),
  createWorker('trim', trimWorker, { concurrency: TRIM_CONCURRENCY }),
  // Import jobs get a 10-minute lock (renewed every minute) instead of the
  // 60-second default applied to the other workers.
  createWorker('import', youtubeImportWorker, {
    concurrency: IMPORT_CONCURRENCY,
    lockDuration: 10 * 60 * 1000,
    lockRenewTime: 60000,
  }),
];

// Filmstrip queue singleton — used by thumbnail worker to enqueue filmstrip jobs
export const filmstripQueue = new Queue('filmstrip', { connection: redisOptions });

console.log(`Concurrency: proxy=${PROXY_CONCURRENCY} thumbnail=${THUMBNAIL_CONCURRENCY} filmstrip=${FILMSTRIP_CONCURRENCY} conform=${CONFORM_CONCURRENCY} trim=${TRIM_CONCURRENCY} import=${IMPORT_CONCURRENCY}`);

// BUG FIX #4: startPromotionWorker() now returns a shutdown function that
// clears the poll intervals and closes the promotion proxyQueue singleton.
const stopPromotionWorker = startPromotionWorker();

console.log('Wild Dragon Worker Service started');
console.log(`Redis: ${redisOptions.host}:${redisOptions.port}`);
console.log('Active queues: proxy, thumbnail, filmstrip, conform, trim, import');
console.log('Background scans: promotion (growing-files → S3)');

// Guards against a second SIGTERM re-entering shutdown while the first one is
// still waiting for active jobs to finish.
let isShuttingDown = false;

/**
 * Graceful shutdown on SIGTERM.
 *
 * BUG FIX #4 + #10: Close all BullMQ Workers AND all Queue client instances.
 * Workers process jobs; Queues dispatch them. Both hold open Redis
 * connections that keep the event loop alive after workers are closed.
 *
 * Every close step is awaited with Promise.allSettled so one failing step
 * neither aborts the others nor leaves an unhandled rejection that would skip
 * the final process.exit(). Failures are logged; queue closes stay
 * best-effort and do not affect the exit code.
 */
process.on('SIGTERM', async () => {
  if (isShuttingDown) {
    return;
  }
  isShuttingDown = true;

  console.log('SIGTERM received, shutting down...');

  const steps = [
    // Close all Worker instances (stops accepting new jobs, waits for active)
    ...workers.map((w) => ({
      label: `worker "${w.name}"`,
      run: () => w.close(),
      bestEffort: false,
    })),
    // BUG FIX #7: Close the Queue singletons from worker modules.
    // proxyThumbnailQueue: thumbnailQueue in proxy.js (dispatches thumbnail jobs)
    // youtubeProxyQueue: proxyQueue in youtube-import.js (dispatches proxy jobs)
    { label: 'thumbnail queue (proxy.js)', run: () => proxyThumbnailQueue.close(), bestEffort: true },
    { label: 'proxy queue (youtube-import.js)', run: () => youtubeProxyQueue.close(), bestEffort: true },
    { label: 'filmstrip queue', run: () => filmstripQueue.close(), bestEffort: true },
    // BUG FIX #4: Stop the promotion worker intervals and close its proxyQueue
    { label: 'promotion worker', run: () => stopPromotionWorker(), bestEffort: false },
  ];

  // The async wrapper turns a synchronous throw from any step into a
  // rejection, so it is reported like every other failure.
  const outcomes = await Promise.allSettled(steps.map(async ({ run }) => run()));

  let exitCode = 0;
  outcomes.forEach((outcome, index) => {
    if (outcome.status !== 'rejected') {
      return;
    }
    const { label, bestEffort } = steps[index];
    console.error(`Failed to close ${label}:`, outcome.reason?.message ?? outcome.reason);
    if (!bestEffort) {
      exitCode = 1;
    }
  });

  console.log(exitCode === 0 ? 'All workers and queues closed' : 'Shutdown finished with errors');
  process.exit(exitCode);
});