feat/fix: promotion.js — growing migrate flow + deltacast cleanup
This commit is contained in:
parent
167d9ad009
commit
1f342826bd
1 changed file with 23 additions and 32 deletions
|
|
@@ -1,11 +1,3 @@
|
||||||
// Promotion worker — scans the growing-files SMB landing zone for stable
|
|
||||||
// captures (no mtime change for N seconds) and uploads them to S3, flipping
|
|
||||||
// the matching asset's status from 'growing' to 'ready' so the Premiere
|
|
||||||
// panel can relink to the proxy/hi-res URLs.
|
|
||||||
//
|
|
||||||
// Why a poll loop and not chokidar: NFS/SMB mounts don't reliably surface
|
|
||||||
// inotify events through the kernel; mtime polling is the boring-but-works
|
|
||||||
// answer for fairness across all storage backends.
|
|
||||||
import { readdir, stat, unlink, mkdir, writeFile } from 'node:fs/promises';
|
import { readdir, stat, unlink, mkdir, writeFile } from 'node:fs/promises';
|
||||||
import { execFileSync } from 'node:child_process';
|
import { execFileSync } from 'node:child_process';
|
||||||
import { join, relative, basename } from 'node:path';
|
import { join, relative, basename } from 'node:path';
|
||||||
|
|
@@ -81,10 +73,9 @@ async function ensureGrowingShareMounted() {
|
||||||
let inflight = new Set();
|
let inflight = new Set();
|
||||||
let idleThresholdMs = 8000;
|
let idleThresholdMs = 8000;
|
||||||
|
|
||||||
// BUG FIX #3: Create a single module-level proxyQueue instead of spinning up
|
// Single module-level proxyQueue — avoids creating and closing a new Queue
|
||||||
// a new Queue connection (and closing it) on every promoted file. Repeatedly
|
// connection on every promoted file, which hammers Redis with connect/disconnect
|
||||||
// creating and closing BullMQ Queues hammers Redis with connect/disconnect
|
// cycles and leaks connections when close() races with add().
|
||||||
// cycles and leaks connections when the close() call races with the add().
|
|
||||||
const parseRedisUrl = (url) => {
|
const parseRedisUrl = (url) => {
|
||||||
const parsed = new URL(url);
|
const parsed = new URL(url);
|
||||||
return { host: parsed.hostname, port: parseInt(parsed.port, 10) || 6379 };
|
return { host: parsed.hostname, port: parseInt(parsed.port, 10) || 6379 };
|
||||||
|
|
@@ -95,7 +86,7 @@ const proxyQueue = new Queue('proxy', {
|
||||||
});
|
});
|
||||||
|
|
||||||
// Interval handles — stored so startPromotionWorker() can expose a shutdown
|
// Interval handles — stored so startPromotionWorker() can expose a shutdown
|
||||||
// function for the SIGTERM handler in index.js (BUG FIX #4).
|
// function for the SIGTERM handler in index.js.
|
||||||
let scanInterval = null;
|
let scanInterval = null;
|
||||||
let thresholdInterval = null;
|
let thresholdInterval = null;
|
||||||
|
|
||||||
|
|
@@ -111,8 +102,12 @@ async function loadIdleThreshold() {
|
||||||
|
|
||||||
async function* walk(dir) {
|
async function* walk(dir) {
|
||||||
let entries = [];
|
let entries = [];
|
||||||
try { entries = await readdir(dir, { withFileTypes: true }); }
|
try {
|
||||||
catch (_) { return; }
|
entries = await readdir(dir, { withFileTypes: true });
|
||||||
|
} catch (err) {
|
||||||
|
console.warn(`[promotion] readdir failed for ${dir}: ${err.message}`);
|
||||||
|
return;
|
||||||
|
}
|
||||||
for (const e of entries) {
|
for (const e of entries) {
|
||||||
const full = join(dir, e.name);
|
const full = join(dir, e.name);
|
||||||
if (e.isDirectory()) yield* walk(full);
|
if (e.isDirectory()) yield* walk(full);
|
||||||
|
|
@@ -146,20 +141,16 @@ async function promote(filePath) {
|
||||||
}
|
}
|
||||||
const asset = r.rows[0];
|
const asset = r.rows[0];
|
||||||
|
|
||||||
// CRITICAL: do not promote while the recorder is STILL RECORDING this
|
// Do not promote while the recorder is STILL RECORDING this session.
|
||||||
// session. The mtime-idle heuristic is unreliable over CIFS (attribute
|
// The mtime-idle heuristic is unreliable over CIFS (attribute caching makes
|
||||||
// caching makes an actively-growing MXF look "stable"), which caused the
|
// an actively-growing MXF look "stable"). Gate on the recorder's live
|
||||||
// worker to grab a live file mid-record (~15s in), upload it, flip the
|
// status: only promote once recording stopped.
|
||||||
// asset to 'ready', and unlink it — "a worker is stealing the file". The
|
|
||||||
// growing asset's display_name IS the recorder's current_session_id, so
|
|
||||||
// gate on the recorder's live status: only promote once recording stopped.
|
|
||||||
const recActive = await query(
|
const recActive = await query(
|
||||||
`SELECT 1 FROM recorders
|
`SELECT 1 FROM recorders
|
||||||
WHERE current_session_id = $1 AND status = 'recording' LIMIT 1`,
|
WHERE current_session_id = $1 AND status = 'recording' LIMIT 1`,
|
||||||
[clipName]
|
[clipName]
|
||||||
);
|
);
|
||||||
if (recActive.rows.length > 0) {
|
if (recActive.rows.length > 0) {
|
||||||
// Still recording — leave the growing file in place for the editor.
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@@ -177,18 +168,21 @@ async function promote(filePath) {
|
||||||
[s3Key, st.size, asset.id]
|
[s3Key, st.size, asset.id]
|
||||||
);
|
);
|
||||||
|
|
||||||
// BUG FIX #3 (cont): Use the module-level proxyQueue singleton instead of
|
|
||||||
// creating a new Queue + closing it inside every promote() call.
|
|
||||||
await proxyQueue.add('generate', {
|
await proxyQueue.add('generate', {
|
||||||
assetId: asset.id,
|
assetId: asset.id,
|
||||||
inputKey: s3Key,
|
inputKey: s3Key,
|
||||||
outputKey: `proxies/${asset.id}.mp4`,
|
outputKey: `proxies/${asset.id}.mp4`,
|
||||||
});
|
});
|
||||||
|
|
||||||
|
console.log(`[promotion] asset ${asset.id} promoted, proxy queued`);
|
||||||
|
|
||||||
|
// Unlink the source file from the SMB share — best-effort cleanup only.
|
||||||
|
// The file is already safely in S3 and the asset is 'ready'. CIFS/SMB
|
||||||
|
// can return EIO if the share was remounted or the remote node rebooted
|
||||||
|
// between upload and unlink; log the error but do not fail the job.
|
||||||
await unlink(filePath).catch(err => {
|
await unlink(filePath).catch(err => {
|
||||||
console.warn(`[promotion] could not unlink ${rel}: ${err.message}`);
|
console.warn(`[promotion] could not unlink ${rel} (best-effort, file already in S3): ${err.message}`);
|
||||||
});
|
});
|
||||||
console.log(`[promotion] asset ${asset.id} promoted, proxy queued, local file removed`);
|
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
console.error('[promotion] failed for', filePath, err);
|
console.error('[promotion] failed for', filePath, err);
|
||||||
} finally {
|
} finally {
|
||||||
|
|
@@ -203,16 +197,13 @@ async function scan() {
|
||||||
let st;
|
let st;
|
||||||
try { st = await stat(file); } catch (_) { continue; }
|
try { st = await stat(file); } catch (_) { continue; }
|
||||||
if (now - st.mtimeMs >= idleThresholdMs && st.size > 0) {
|
if (now - st.mtimeMs >= idleThresholdMs && st.size > 0) {
|
||||||
// BUG FIX #3 (cont): await promote() so that unhandled promise rejections
|
|
||||||
// don't escape the scan loop. promote() already catches internally, so
|
|
||||||
// this is belt-and-suspenders against future changes.
|
|
||||||
await promote(file);
|
await promote(file);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// BUG FIX #4: Return a shutdown function so index.js can clear the intervals
|
// Return a shutdown function so index.js can clear the intervals and close the
|
||||||
// and close the queue connection during SIGTERM.
|
// queue connection during SIGTERM.
|
||||||
export function startPromotionWorker() {
|
export function startPromotionWorker() {
|
||||||
loadIdleThreshold();
|
loadIdleThreshold();
|
||||||
// Mount the SMB landing zone before the first scan so we watch the SAME share
|
// Mount the SMB landing zone before the first scan so we watch the SAME share
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue