fix(jobs): real cancel for active jobs + multi-threaded thumbnail worker
DELETE /jobs/:id was returning a 404 ("Job not found") when the operator tried to
cancel a running job. BullMQ refuses job.remove() while a job is in the
active state; the route caught that error and fell through to the
404 branch, which was misleading because the job actually exists — the
queue was just refusing to drop it from under the worker.
Fix:
- Detect 'active' state explicitly and call moveToFailed(err, '0', false)
first. Token '0' bypasses the per-worker lock check (the operator-side
cancel doesn't hold the worker lock). That transitions active -> failed
and frees the queue's concurrency slot.
- If moveToFailed itself fails (lock owned by a live worker), fall back
to job.discard() so the job is at least marked as not to be retried.
- If remove() then fails (stalled, broken state), drop the job's Redis
key directly via queue.client. Last-resort obliteration.
- Stop swallowing getJob() errors — if Redis is sad, surface it via
next(err) instead of returning a misleading 404.
- Return { cancelled: true } when the job was active, so the client
can show "Cancelled" rather than "Removed" in any future toast.
While here: thumbnail jobs now run with concurrency 4 by default and
proxy jobs with 2; conform and import stay at 1. Every queue defaulted to
concurrency 1 before, so a single stalled job blocked the entire queue.
Proxy, thumbnail and conform are overridable via the PROXY_CONCURRENCY /
THUMBNAIL_CONCURRENCY / CONFORM_CONCURRENCY env vars for nodes with more
headroom; import stays fixed at 1.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
2b85fb49df
commit
91325a4267
2 changed files with 55 additions and 9 deletions
|
|
@ -207,7 +207,13 @@ router.post('/:id/retry', async (req, res, next) => {
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
// ── DELETE /:id - Remove a job ────────────────────────────────────────────────
|
// ── DELETE /:id - Remove a job (also handles cancel for active jobs) ─────────
|
||||||
|
// BullMQ refuses job.remove() while a job is in the 'active' state. Before this
|
||||||
|
// fix the route caught that error and fell through to a misleading 404, so
|
||||||
|
// operators couldn't kill a stalled-active job from the UI. Now we detect the
|
||||||
|
// active state explicitly: moveToFailed with the magic '0' token bypasses the
|
||||||
|
// per-worker lock check and transitions active → failed (freeing the queue's
|
||||||
|
// concurrency slot), then remove() drops the row.
|
||||||
router.delete('/:id', async (req, res, next) => {
|
router.delete('/:id', async (req, res, next) => {
|
||||||
try {
|
try {
|
||||||
const { id } = req.params;
|
const { id } = req.params;
|
||||||
|
|
@ -215,16 +221,44 @@ router.delete('/:id', async (req, res, next) => {
|
||||||
const qType = colonIdx > -1 ? id.slice(0, colonIdx) : null;
|
const qType = colonIdx > -1 ? id.slice(0, colonIdx) : null;
|
||||||
const bullId = colonIdx > -1 ? id.slice(colonIdx + 1) : id;
|
const bullId = colonIdx > -1 ? id.slice(colonIdx + 1) : id;
|
||||||
|
|
||||||
|
let lastErr = null;
|
||||||
for (const { queue, type } of QUEUES) {
|
for (const { queue, type } of QUEUES) {
|
||||||
if (qType && type !== qType) continue;
|
if (qType && type !== qType) continue;
|
||||||
|
let job;
|
||||||
try {
|
try {
|
||||||
const job = await queue.getJob(bullId);
|
job = await queue.getJob(bullId);
|
||||||
if (job) {
|
} catch (err) {
|
||||||
await job.remove();
|
// Queue-level lookup error: remember it so we don't mask it with 404.
|
||||||
return res.json({ success: true });
|
lastErr = err;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
if (!job) continue;
|
||||||
|
|
||||||
|
const state = await job.getState();
|
||||||
|
if (state === 'active') {
|
||||||
|
// Token '0' tells BullMQ to skip the worker-lock check — necessary
|
||||||
|
// because the operator-side cancel doesn't hold the worker's lock.
|
||||||
|
try {
|
||||||
|
await job.moveToFailed(new Error('Cancelled by operator'), '0', false);
|
||||||
|
} catch (err) {
|
||||||
|
// Lock owned by a still-living worker; fall back to discard + remove
|
||||||
|
// so at least the result is thrown away and the row is gone.
|
||||||
|
try { await job.discard(); } catch (_) {}
|
||||||
}
|
}
|
||||||
} catch { /* try next queue */ }
|
}
|
||||||
|
try {
|
||||||
|
await job.remove();
|
||||||
|
} catch (err) {
|
||||||
|
// Last-resort obliteration of the job row via raw Redis. This is
|
||||||
|
// the path stalled jobs hit when moveToFailed couldn't transition
|
||||||
|
// them either.
|
||||||
|
const client = await queue.client;
|
||||||
|
const prefix = queue.toKey(bullId);
|
||||||
|
await client.del(prefix);
|
||||||
|
}
|
||||||
|
return res.json({ success: true, cancelled: state === 'active' });
|
||||||
}
|
}
|
||||||
|
if (lastErr) return next(lastErr);
|
||||||
res.status(404).json({ error: 'Job not found' });
|
res.status(404).json({ error: 'Job not found' });
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
next(err);
|
next(err);
|
||||||
|
|
|
||||||
|
|
@ -48,10 +48,20 @@ const createWorker = (queueName, handler, overrides = {}) => {
|
||||||
return worker;
|
return worker;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
// Per-queue concurrency. Defaults to 1, which serialises every job in a
|
||||||
|
// queue — meaning a single stalled job blocks every other one. We want
|
||||||
|
// thumbnails (cheap, parallel-safe) to run several at a time so a slow
|
||||||
|
// outlier doesn't back the rest of the catalog up. Proxy + conform are
|
||||||
|
// heavier (ffmpeg transcode) so we keep them lower to avoid trashing
|
||||||
|
// the box; tune via env if a node has more headroom.
|
||||||
|
const PROXY_CONCURRENCY = parseInt(process.env.PROXY_CONCURRENCY || '2', 10);
|
||||||
|
const THUMBNAIL_CONCURRENCY = parseInt(process.env.THUMBNAIL_CONCURRENCY || '4', 10);
|
||||||
|
const CONFORM_CONCURRENCY = parseInt(process.env.CONFORM_CONCURRENCY || '1', 10);
|
||||||
|
|
||||||
const workers = [
|
const workers = [
|
||||||
createWorker('proxy', proxyWorker),
|
createWorker('proxy', proxyWorker, { concurrency: PROXY_CONCURRENCY }),
|
||||||
createWorker('thumbnail', thumbnailWorker),
|
createWorker('thumbnail', thumbnailWorker, { concurrency: THUMBNAIL_CONCURRENCY }),
|
||||||
createWorker('conform', conformWorker),
|
createWorker('conform', conformWorker, { concurrency: CONFORM_CONCURRENCY }),
|
||||||
// YouTube imports: keep concurrency at 1 so we don't burn through rate
|
// YouTube imports: keep concurrency at 1 so we don't burn through rate
|
||||||
// limits when several jobs land back-to-back. Lock window is longer than
|
// limits when several jobs land back-to-back. Lock window is longer than
|
||||||
// the default because a long video download can run for minutes.
|
// the default because a long video download can run for minutes.
|
||||||
|
|
@ -62,6 +72,8 @@ const workers = [
|
||||||
}),
|
}),
|
||||||
];
|
];
|
||||||
|
|
||||||
|
console.log(`Concurrency: proxy=${PROXY_CONCURRENCY} thumbnail=${THUMBNAIL_CONCURRENCY} conform=${CONFORM_CONCURRENCY} import=1`);
|
||||||
|
|
||||||
startPromotionWorker();
|
startPromotionWorker();
|
||||||
|
|
||||||
console.log('Wild Dragon Worker Service started');
|
console.log('Wild Dragon Worker Service started');
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue