diff --git a/services/worker/src/workers/promotion.js b/services/worker/src/workers/promotion.js index b50497d..d5a5429 100644 --- a/services/worker/src/workers/promotion.js +++ b/services/worker/src/workers/promotion.js @@ -6,9 +6,10 @@ // Why a poll loop and not chokidar: NFS/SMB mounts don't reliably surface // inotify events through the kernel; mtime polling is the boring-but-works // answer for fairness across all storage backends. -import { readdir, stat, unlink, readFile } from 'node:fs/promises'; +import { readdir, stat, unlink } from 'node:fs/promises'; import { join, relative, basename } from 'node:path'; import { createReadStream } from 'node:fs'; +import { Queue } from 'bullmq'; import { query } from '../db/client.js'; import { uploadStreamToS3 } from '../s3/client.js'; @@ -19,6 +20,24 @@ const POLL_MS = 5000; let inflight = new Set(); let idleThresholdMs = 8000; +// BUG FIX #3: Create a single module-level proxyQueue instead of spinning up +// a new Queue connection (and closing it) on every promoted file. Repeatedly +// creating and closing BullMQ Queues hammers Redis with connect/disconnect +// cycles and leaks connections when the close() call races with the add(). +const parseRedisUrl = (url) => { + const parsed = new URL(url); + return { host: parsed.hostname, port: parseInt(parsed.port, 10) || 6379 }; +}; + +const proxyQueue = new Queue('proxy', { + connection: parseRedisUrl(process.env.REDIS_URL || 'redis://queue:6379'), +}); + +// Interval handles — stored so startPromotionWorker() can expose a shutdown +// function for the SIGTERM handler in index.js (BUG FIX #4). +let scanInterval = null; +let thresholdInterval = null; + async function loadIdleThreshold() { try { const r = await query( @@ -80,19 +99,13 @@ async function promote(filePath) { [s3Key, st.size, asset.id] ); - // Queue the proxy job so the editor gets a browser-playable proxy and - // the panel's "relink to hi-res" path becomes available. - const { Queue } = await import('bullmq'); - const { hostname, port } = new URL(process.env.REDIS_URL || 'redis://queue:6379'); - const proxyQueue = new Queue('proxy', { - connection: { host: hostname, port: parseInt(port, 10) || 6379 }, - }); + // BUG FIX #3 (cont): Use the module-level proxyQueue singleton instead of + // creating a new Queue + closing it inside every promote() call. await proxyQueue.add('generate', { assetId: asset.id, inputKey: s3Key, outputKey: `proxies/${asset.id}.mp4`, }); - await proxyQueue.close(); await unlink(filePath).catch(err => { console.warn(`[promotion] could not unlink ${rel}: ${err.message}`); @@ -112,14 +125,26 @@ async function scan() { let st; try { st = await stat(file); } catch (_) { continue; } if (now - st.mtimeMs >= idleThresholdMs && st.size > 0) { - promote(file); + // BUG FIX #3 (cont): await promote() so that unhandled promise rejections + // don't escape the scan loop. promote() already catches internally, so + // this is belt-and-suspenders against future changes. + await promote(file); } } } +// BUG FIX #4: Return a shutdown function so index.js can clear the intervals +// and close the queue connection during SIGTERM. export function startPromotionWorker() { loadIdleThreshold(); - setInterval(loadIdleThreshold, 60_000); - setInterval(scan, POLL_MS); + thresholdInterval = setInterval(loadIdleThreshold, 60_000); + scanInterval = setInterval(scan, POLL_MS); console.log(`[promotion] watching ${GROWING_PATH} (idle threshold ${idleThresholdMs}ms)`); + + return async function stopPromotionWorker() { + if (scanInterval) clearInterval(scanInterval); + if (thresholdInterval) clearInterval(thresholdInterval); + await proxyQueue.close().catch(() => {}); + console.log('[promotion] worker stopped'); + }; }