fix(promotion): singleton proxyQueue; await promote(); return shutdown fn (issue #94 bugs 3, 4)
This commit is contained in:
parent
e289554e44
commit
a6c9529c50
1 changed files with 37 additions and 12 deletions
|
|
@ -6,9 +6,10 @@
|
||||||
// Why a poll loop and not chokidar: NFS/SMB mounts don't reliably surface
|
// Why a poll loop and not chokidar: NFS/SMB mounts don't reliably surface
|
||||||
// inotify events through the kernel; mtime polling is the boring-but-works
|
// inotify events through the kernel; mtime polling is the boring-but-works
|
||||||
// answer for fairness across all storage backends.
|
// answer for fairness across all storage backends.
|
||||||
import { readdir, stat, unlink, readFile } from 'node:fs/promises';
|
import { readdir, stat, unlink } from 'node:fs/promises';
|
||||||
import { join, relative, basename } from 'node:path';
|
import { join, relative, basename } from 'node:path';
|
||||||
import { createReadStream } from 'node:fs';
|
import { createReadStream } from 'node:fs';
|
||||||
|
import { Queue } from 'bullmq';
|
||||||
import { query } from '../db/client.js';
|
import { query } from '../db/client.js';
|
||||||
import { uploadStreamToS3 } from '../s3/client.js';
|
import { uploadStreamToS3 } from '../s3/client.js';
|
||||||
|
|
||||||
|
|
@ -19,6 +20,24 @@ const POLL_MS = 5000;
|
||||||
let inflight = new Set();
|
let inflight = new Set();
|
||||||
let idleThresholdMs = 8000;
|
let idleThresholdMs = 8000;
|
||||||
|
|
||||||
|
// BUG FIX #3: Create a single module-level proxyQueue instead of spinning up
|
||||||
|
// a new Queue connection (and closing it) on every promoted file. Repeatedly
|
||||||
|
// creating and closing BullMQ Queues hammers Redis with connect/disconnect
|
||||||
|
// cycles and leaks connections when the close() call races with the add().
|
||||||
|
const parseRedisUrl = (url) => {
|
||||||
|
const parsed = new URL(url);
|
||||||
|
return { host: parsed.hostname, port: parseInt(parsed.port, 10) || 6379 };
|
||||||
|
};
|
||||||
|
|
||||||
|
const proxyQueue = new Queue('proxy', {
|
||||||
|
connection: parseRedisUrl(process.env.REDIS_URL || 'redis://queue:6379'),
|
||||||
|
});
|
||||||
|
|
||||||
|
// Interval handles — stored so startPromotionWorker() can expose a shutdown
|
||||||
|
// function for the SIGTERM handler in index.js (BUG FIX #4).
|
||||||
|
let scanInterval = null;
|
||||||
|
let thresholdInterval = null;
|
||||||
|
|
||||||
async function loadIdleThreshold() {
|
async function loadIdleThreshold() {
|
||||||
try {
|
try {
|
||||||
const r = await query(
|
const r = await query(
|
||||||
|
|
@ -80,19 +99,13 @@ async function promote(filePath) {
|
||||||
[s3Key, st.size, asset.id]
|
[s3Key, st.size, asset.id]
|
||||||
);
|
);
|
||||||
|
|
||||||
// Queue the proxy job so the editor gets a browser-playable proxy and
|
// BUG FIX #3 (cont): Use the module-level proxyQueue singleton instead of
|
||||||
// the panel's "relink to hi-res" path becomes available.
|
// creating a new Queue + closing it inside every promote() call.
|
||||||
const { Queue } = await import('bullmq');
|
|
||||||
const { hostname, port } = new URL(process.env.REDIS_URL || 'redis://queue:6379');
|
|
||||||
const proxyQueue = new Queue('proxy', {
|
|
||||||
connection: { host: hostname, port: parseInt(port, 10) || 6379 },
|
|
||||||
});
|
|
||||||
await proxyQueue.add('generate', {
|
await proxyQueue.add('generate', {
|
||||||
assetId: asset.id,
|
assetId: asset.id,
|
||||||
inputKey: s3Key,
|
inputKey: s3Key,
|
||||||
outputKey: `proxies/${asset.id}.mp4`,
|
outputKey: `proxies/${asset.id}.mp4`,
|
||||||
});
|
});
|
||||||
await proxyQueue.close();
|
|
||||||
|
|
||||||
await unlink(filePath).catch(err => {
|
await unlink(filePath).catch(err => {
|
||||||
console.warn(`[promotion] could not unlink ${rel}: ${err.message}`);
|
console.warn(`[promotion] could not unlink ${rel}: ${err.message}`);
|
||||||
|
|
@ -112,14 +125,26 @@ async function scan() {
|
||||||
let st;
|
let st;
|
||||||
try { st = await stat(file); } catch (_) { continue; }
|
try { st = await stat(file); } catch (_) { continue; }
|
||||||
if (now - st.mtimeMs >= idleThresholdMs && st.size > 0) {
|
if (now - st.mtimeMs >= idleThresholdMs && st.size > 0) {
|
||||||
promote(file);
|
// BUG FIX #3 (cont): await promote() so that unhandled promise rejections
|
||||||
|
// don't escape the scan loop. promote() already catches internally, so
|
||||||
|
// this is belt-and-suspenders against future changes.
|
||||||
|
await promote(file);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// BUG FIX #4: Return a shutdown function so index.js can clear the intervals
|
||||||
|
// and close the queue connection during SIGTERM.
|
||||||
export function startPromotionWorker() {
|
export function startPromotionWorker() {
|
||||||
loadIdleThreshold();
|
loadIdleThreshold();
|
||||||
setInterval(loadIdleThreshold, 60_000);
|
thresholdInterval = setInterval(loadIdleThreshold, 60_000);
|
||||||
setInterval(scan, POLL_MS);
|
scanInterval = setInterval(scan, POLL_MS);
|
||||||
console.log(`[promotion] watching ${GROWING_PATH} (idle threshold ${idleThresholdMs}ms)`);
|
console.log(`[promotion] watching ${GROWING_PATH} (idle threshold ${idleThresholdMs}ms)`);
|
||||||
|
|
||||||
|
return async function stopPromotionWorker() {
|
||||||
|
if (scanInterval) clearInterval(scanInterval);
|
||||||
|
if (thresholdInterval) clearInterval(thresholdInterval);
|
||||||
|
await proxyQueue.close().catch(() => {});
|
||||||
|
console.log('[promotion] worker stopped');
|
||||||
|
};
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue