Compare commits

..

No commits in common. "a6d789279c8236d93764bda0445c77307ef0e52c" and "2b85fb49df8d9cf11dfd896a16c92c93db14adf8" have entirely different histories.

2 changed files with 9 additions and 55 deletions

View file

@ -207,13 +207,7 @@ router.post('/:id/retry', async (req, res, next) => {
} }
}); });
// ── DELETE /:id - Remove a job (also handles cancel for active jobs) ───────── // ── DELETE /:id - Remove a job ────────────────────────────────────────────────
// BullMQ refuses job.remove() while a job is in the 'active' state. Before this
// fix the route caught that error and fell through to a misleading 404, so
// operators couldn't kill a stalled-active job from the UI. Now we detect the
// active state explicitly: moveToFailed with the magic '0' token bypasses the
// per-worker lock check and transitions active → failed (freeing the queue's
// concurrency slot), then remove() drops the row.
router.delete('/:id', async (req, res, next) => { router.delete('/:id', async (req, res, next) => {
try { try {
const { id } = req.params; const { id } = req.params;
@ -221,44 +215,16 @@ router.delete('/:id', async (req, res, next) => {
const qType = colonIdx > -1 ? id.slice(0, colonIdx) : null; const qType = colonIdx > -1 ? id.slice(0, colonIdx) : null;
const bullId = colonIdx > -1 ? id.slice(colonIdx + 1) : id; const bullId = colonIdx > -1 ? id.slice(colonIdx + 1) : id;
let lastErr = null;
for (const { queue, type } of QUEUES) { for (const { queue, type } of QUEUES) {
if (qType && type !== qType) continue; if (qType && type !== qType) continue;
let job;
try {
job = await queue.getJob(bullId);
} catch (err) {
// Queue-level lookup error: remember it so we don't mask it with 404.
lastErr = err;
continue;
}
if (!job) continue;
const state = await job.getState();
if (state === 'active') {
// Token '0' tells BullMQ to skip the worker-lock check — necessary
// because the operator-side cancel doesn't hold the worker's lock.
try {
await job.moveToFailed(new Error('Cancelled by operator'), '0', false);
} catch (err) {
// Lock owned by a still-living worker; fall back to discard + remove
// so at least the result is thrown away and the row is gone.
try { await job.discard(); } catch (_) {}
}
}
try { try {
const job = await queue.getJob(bullId);
if (job) {
await job.remove(); await job.remove();
} catch (err) { return res.json({ success: true });
// Last-resort obliteration of the job row via raw Redis. This is
// the path stalled jobs hit when moveToFailed couldn't transition
// them either.
const client = await queue.client;
const prefix = queue.toKey(bullId);
await client.del(prefix);
} }
return res.json({ success: true, cancelled: state === 'active' }); } catch { /* try next queue */ }
} }
if (lastErr) return next(lastErr);
res.status(404).json({ error: 'Job not found' }); res.status(404).json({ error: 'Job not found' });
} catch (err) { } catch (err) {
next(err); next(err);

View file

@ -48,20 +48,10 @@ const createWorker = (queueName, handler, overrides = {}) => {
return worker; return worker;
}; };
// Per-queue concurrency. Defaults to 1, which serialises every job in a
// queue — meaning a single stalled job blocks every other one. We want
// thumbnails (cheap, parallel-safe) to run several at a time so a slow
// outlier doesn't back the rest of the catalog up. Proxy + conform are
// heavier (ffmpeg transcode) so we keep them lower to avoid trashing
// the box; tune via env if a node has more headroom.
const PROXY_CONCURRENCY = parseInt(process.env.PROXY_CONCURRENCY || '2', 10);
const THUMBNAIL_CONCURRENCY = parseInt(process.env.THUMBNAIL_CONCURRENCY || '4', 10);
const CONFORM_CONCURRENCY = parseInt(process.env.CONFORM_CONCURRENCY || '1', 10);
const workers = [ const workers = [
createWorker('proxy', proxyWorker, { concurrency: PROXY_CONCURRENCY }), createWorker('proxy', proxyWorker),
createWorker('thumbnail', thumbnailWorker, { concurrency: THUMBNAIL_CONCURRENCY }), createWorker('thumbnail', thumbnailWorker),
createWorker('conform', conformWorker, { concurrency: CONFORM_CONCURRENCY }), createWorker('conform', conformWorker),
// YouTube imports: keep concurrency at 1 so we don't burn through rate // YouTube imports: keep concurrency at 1 so we don't burn through rate
// limits when several jobs land back-to-back. Lock window is longer than // limits when several jobs land back-to-back. Lock window is longer than
// the default because a long video download can run for minutes. // the default because a long video download can run for minutes.
@ -72,8 +62,6 @@ const workers = [
}), }),
]; ];
console.log(`Concurrency: proxy=${PROXY_CONCURRENCY} thumbnail=${THUMBNAIL_CONCURRENCY} conform=${CONFORM_CONCURRENCY} import=1`);
startPromotionWorker(); startPromotionWorker();
console.log('Wild Dragon Worker Service started'); console.log('Wild Dragon Worker Service started');