// Promotion worker — scans the growing-files SMB landing zone for stable // captures (no mtime change for N seconds) and uploads them to S3, flipping // the matching asset's status from 'growing' to 'ready' so the Premiere // panel can relink to the proxy/hi-res URLs. // // Why a poll loop and not chokidar: NFS/SMB mounts don't reliably surface // inotify events through the kernel; mtime polling is the boring-but-works // answer for fairness across all storage backends. import { readdir, stat, unlink } from 'node:fs/promises'; import { join, relative, basename } from 'node:path'; import { createReadStream } from 'node:fs'; import { Queue } from 'bullmq'; import { query } from '../db/client.js'; import { uploadStreamToS3 } from '../s3/client.js'; const GROWING_PATH = process.env.GROWING_PATH || '/growing'; const S3_BUCKET = process.env.S3_BUCKET || 'wild-dragon'; const POLL_MS = 5000; let inflight = new Set(); let idleThresholdMs = 8000; // BUG FIX #3: Create a single module-level proxyQueue instead of spinning up // a new Queue connection (and closing it) on every promoted file. Repeatedly // creating and closing BullMQ Queues hammers Redis with connect/disconnect // cycles and leaks connections when the close() call races with the add(). const parseRedisUrl = (url) => { const parsed = new URL(url); return { host: parsed.hostname, port: parseInt(parsed.port, 10) || 6379 }; }; const proxyQueue = new Queue('proxy', { connection: parseRedisUrl(process.env.REDIS_URL || 'redis://queue:6379'), }); // Interval handles — stored so startPromotionWorker() can expose a shutdown // function for the SIGTERM handler in index.js (BUG FIX #4). let scanInterval = null; let thresholdInterval = null; async function loadIdleThreshold() { try { const r = await query( `SELECT value FROM settings WHERE key = 'growing_promote_after_seconds'` ); const sec = parseInt(r.rows[0]?.value, 10); if (sec > 0) idleThresholdMs = sec * 1000; } catch (_) { /* table not migrated yet — keep default */ } } async function* walk(dir) { let entries = []; try { entries = await readdir(dir, { withFileTypes: true }); } catch (_) { return; } for (const e of entries) { const full = join(dir, e.name); if (e.isDirectory()) yield* walk(full); else if (e.isFile()) yield full; } } async function promote(filePath) { if (inflight.has(filePath)) return; inflight.add(filePath); try { // Reconstruct the S3 key from the relative path under GROWING_PATH. // Capture writes `${GROWING_PATH}/${projectId}/${clipName}.${ext}`, which // mirrors `projects/${projectId}/masters/${clipName}.${ext}` in S3. const rel = relative(GROWING_PATH, filePath); // /. const [projectId, fileName] = rel.split('/', 2); if (!projectId || !fileName) return; const s3Key = `projects/${projectId}/masters/${fileName}`; // Find the matching live asset by display_name = clipName. const clipName = basename(fileName, '.' + fileName.split('.').pop()); const r = await query( `SELECT id, status FROM assets WHERE project_id = $1 AND display_name = $2 ORDER BY created_at DESC LIMIT 1`, [projectId, clipName] ); if (r.rows.length === 0) { console.warn(`[promotion] no asset row for ${rel} — skipping`); return; } const asset = r.rows[0]; const st = await stat(filePath); console.log(`[promotion] uploading ${rel} (${st.size} bytes) -> s3://${S3_BUCKET}/${s3Key}`); await uploadStreamToS3(S3_BUCKET, s3Key, createReadStream(filePath)); await query( `UPDATE assets SET original_s3_key = $1, file_size = $2, status = 'ready', updated_at = NOW() WHERE id = $3`, [s3Key, st.size, asset.id] ); // BUG FIX #3 (cont): Use the module-level proxyQueue singleton instead of // creating a new Queue + closing it inside every promote() call. await proxyQueue.add('generate', { assetId: asset.id, inputKey: s3Key, outputKey: `proxies/${asset.id}.mp4`, }); await unlink(filePath).catch(err => { console.warn(`[promotion] could not unlink ${rel}: ${err.message}`); }); console.log(`[promotion] asset ${asset.id} promoted, proxy queued, local file removed`); } catch (err) { console.error('[promotion] failed for', filePath, err); } finally { inflight.delete(filePath); } } async function scan() { const now = Date.now(); for await (const file of walk(GROWING_PATH)) { if (inflight.has(file)) continue; let st; try { st = await stat(file); } catch (_) { continue; } if (now - st.mtimeMs >= idleThresholdMs && st.size > 0) { // BUG FIX #3 (cont): await promote() so that unhandled promise rejections // don't escape the scan loop. promote() already catches internally, so // this is belt-and-suspenders against future changes. await promote(file); } } } // BUG FIX #4: Return a shutdown function so index.js can clear the intervals // and close the queue connection during SIGTERM. export function startPromotionWorker() { loadIdleThreshold(); thresholdInterval = setInterval(loadIdleThreshold, 60_000); scanInterval = setInterval(scan, POLL_MS); console.log(`[promotion] watching ${GROWING_PATH} (idle threshold ${idleThresholdMs}ms)`); return async function stopPromotionWorker() { if (scanInterval) clearInterval(scanInterval); if (thresholdInterval) clearInterval(thresholdInterval); await proxyQueue.close().catch(() => {}); console.log('[promotion] worker stopped'); }; }