From 1f342826bd3b9b90acff268c148666c1b0f0f9ac Mon Sep 17 00:00:00 2001 From: ZGaetano Date: Tue, 2 Jun 2026 18:38:56 -0400 Subject: [PATCH] =?UTF-8?q?feat/fix:=20promotion.js=20=E2=80=94=20growing?= =?UTF-8?q?=20migrate=20flow=20+=20deltacast=20cleanup?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- services/worker/src/workers/promotion.js | 55 ++++++++++-------------- 1 file changed, 23 insertions(+), 32 deletions(-) diff --git a/services/worker/src/workers/promotion.js b/services/worker/src/workers/promotion.js index 2f95692..7ce6f4f 100644 --- a/services/worker/src/workers/promotion.js +++ b/services/worker/src/workers/promotion.js @@ -1,11 +1,3 @@ -// Promotion worker — scans the growing-files SMB landing zone for stable -// captures (no mtime change for N seconds) and uploads them to S3, flipping -// the matching asset's status from 'growing' to 'ready' so the Premiere -// panel can relink to the proxy/hi-res URLs. -// -// Why a poll loop and not chokidar: NFS/SMB mounts don't reliably surface -// inotify events through the kernel; mtime polling is the boring-but-works -// answer for fairness across all storage backends. import { readdir, stat, unlink, mkdir, writeFile } from 'node:fs/promises'; import { execFileSync } from 'node:child_process'; import { join, relative, basename } from 'node:path'; @@ -81,10 +73,9 @@ async function ensureGrowingShareMounted() { let inflight = new Set(); let idleThresholdMs = 8000; -// BUG FIX #3: Create a single module-level proxyQueue instead of spinning up -// a new Queue connection (and closing it) on every promoted file. Repeatedly -// creating and closing BullMQ Queues hammers Redis with connect/disconnect -// cycles and leaks connections when the close() call races with the add(). +// Single module-level proxyQueue — avoids creating and closing a new Queue +// connection on every promoted file, which hammers Redis with connect/disconnect +// cycles and leaks connections when close() races with add(). const parseRedisUrl = (url) => { const parsed = new URL(url); return { host: parsed.hostname, port: parseInt(parsed.port, 10) || 6379 }; @@ -95,7 +86,7 @@ const proxyQueue = new Queue('proxy', { }); // Interval handles — stored so startPromotionWorker() can expose a shutdown -// function for the SIGTERM handler in index.js (BUG FIX #4). +// function for the SIGTERM handler in index.js. let scanInterval = null; let thresholdInterval = null; @@ -111,8 +102,12 @@ async function loadIdleThreshold() { async function* walk(dir) { let entries = []; - try { entries = await readdir(dir, { withFileTypes: true }); } - catch (_) { return; } + try { + entries = await readdir(dir, { withFileTypes: true }); + } catch (err) { + console.warn(`[promotion] readdir failed for ${dir}: ${err.message}`); + return; + } for (const e of entries) { const full = join(dir, e.name); if (e.isDirectory()) yield* walk(full); @@ -146,20 +141,16 @@ async function promote(filePath) { } const asset = r.rows[0]; - // CRITICAL: do not promote while the recorder is STILL RECORDING this - // session. The mtime-idle heuristic is unreliable over CIFS (attribute - // caching makes an actively-growing MXF look "stable"), which caused the - // worker to grab a live file mid-record (~15s in), upload it, flip the - // asset to 'ready', and unlink it — "a worker is stealing the file". The - // growing asset's display_name IS the recorder's current_session_id, so - // gate on the recorder's live status: only promote once recording stopped. + // Do not promote while the recorder is STILL RECORDING this session. + // The mtime-idle heuristic is unreliable over CIFS (attribute caching makes + // an actively-growing MXF look "stable"). Gate on the recorder's live + // status: only promote once recording stopped. const recActive = await query( `SELECT 1 FROM recorders WHERE current_session_id = $1 AND status = 'recording' LIMIT 1`, [clipName] ); if (recActive.rows.length > 0) { - // Still recording — leave the growing file in place for the editor. return; } @@ -177,18 +168,21 @@ async function promote(filePath) { [s3Key, st.size, asset.id] ); - // BUG FIX #3 (cont): Use the module-level proxyQueue singleton instead of - // creating a new Queue + closing it inside every promote() call. await proxyQueue.add('generate', { assetId: asset.id, inputKey: s3Key, outputKey: `proxies/${asset.id}.mp4`, }); + console.log(`[promotion] asset ${asset.id} promoted, proxy queued`); + + // Unlink the source file from the SMB share — best-effort cleanup only. + // The file is already safely in S3 and the asset is 'ready'. CIFS/SMB + // can return EIO if the share was remounted or the remote node rebooted + // between upload and unlink; log the error but do not fail the job. await unlink(filePath).catch(err => { - console.warn(`[promotion] could not unlink ${rel}: ${err.message}`); + console.warn(`[promotion] could not unlink ${rel} (best-effort, file already in S3): ${err.message}`); }); - console.log(`[promotion] asset ${asset.id} promoted, proxy queued, local file removed`); } catch (err) { console.error('[promotion] failed for', filePath, err); } finally { @@ -203,16 +197,13 @@ async function scan() { let st; try { st = await stat(file); } catch (_) { continue; } if (now - st.mtimeMs >= idleThresholdMs && st.size > 0) { - // BUG FIX #3 (cont): await promote() so that unhandled promise rejections - // don't escape the scan loop. promote() already catches internally, so - // this is belt-and-suspenders against future changes. await promote(file); } } } -// BUG FIX #4: Return a shutdown function so index.js can clear the intervals -// and close the queue connection during SIGTERM. +// Return a shutdown function so index.js can clear the intervals and close the +// queue connection during SIGTERM. export function startPromotionWorker() { loadIdleThreshold(); // Mount the SMB landing zone before the first scan so we watch the SAME share