From 91325a426727b6aa3dc0fea5ebb25c0803823b1c Mon Sep 17 00:00:00 2001 From: Zac Gaetano Date: Sat, 23 May 2026 17:23:07 -0400 Subject: [PATCH] fix(jobs): real cancel for active jobs + multi-threaded thumbnail worker MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit DELETE /jobs/:id was throwing "404 not found" when the operator tried to cancel a running job. BullMQ refuses job.remove() while a job is in the active state; the route caught that error and fell through to the 404 branch, which was misleading because the job actually exists — the queue was just refusing to drop it from under the worker. Fix: - Detect 'active' state explicitly and call moveToFailed(err, '0', false) first. Token '0' bypasses the per-worker lock check (the operator-side cancel doesn't hold the worker lock). That transitions active -> failed and frees the queue's concurrency slot. - If moveToFailed itself fails (lock owned by a live worker), fall back to job.discard() so at least the result is thrown away. - If remove() then fails (stalled, broken state), drop the job's Redis key directly via queue.client. Last-resort obliteration. - Stop swallowing getJob() errors — if the Redis lookup fails, surface the error via next(err) instead of returning a misleading 404. - Return { cancelled: true } when the job was active, so the client can show "Cancelled" rather than "Removed" in any future toast. While here: thumbnail jobs now run with concurrency 4 by default and proxy jobs with concurrency 2; conform and import stay at 1. Every queue defaulted to concurrency 1 before, so a single stalled job blocked the entire queue. Proxy, thumbnail and conform are overridable via PROXY_CONCURRENCY / THUMBNAIL_CONCURRENCY / CONFORM_CONCURRENCY env vars for nodes with more headroom. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- services/mam-api/src/routes/jobs.js | 46 +++++++++++++++++++++++++---- services/worker/src/index.js | 18 +++++++++-- 2 files changed, 55 insertions(+), 9 deletions(-) diff --git a/services/mam-api/src/routes/jobs.js b/services/mam-api/src/routes/jobs.js index a2f8cd0..f825e74 100644 --- a/services/mam-api/src/routes/jobs.js +++ b/services/mam-api/src/routes/jobs.js @@ -207,7 +207,13 @@ router.post('/:id/retry', async (req, res, next) => { } }); -// ── DELETE /:id - Remove a job ──────────────────────────────────────────────── +// ── DELETE /:id - Remove a job (also handles cancel for active jobs) ───────── +// BullMQ refuses job.remove() while a job is in the 'active' state. Before this +// fix the route caught that error and fell through to a misleading 404, so +// operators couldn't kill a stalled-active job from the UI. Now we detect the +// active state explicitly: moveToFailed with the magic '0' token bypasses the +// per-worker lock check and transitions active → failed (freeing the queue's +// concurrency slot), then remove() drops the row. router.delete('/:id', async (req, res, next) => { try { const { id } = req.params; @@ -215,16 +221,44 @@ router.delete('/:id', async (req, res, next) => { const qType = colonIdx > -1 ? id.slice(0, colonIdx) : null; const bullId = colonIdx > -1 ? id.slice(colonIdx + 1) : id; + let lastErr = null; for (const { queue, type } of QUEUES) { if (qType && type !== qType) continue; + let job; try { - const job = await queue.getJob(bullId); - if (job) { - await job.remove(); - return res.json({ success: true }); + job = await queue.getJob(bullId); + } catch (err) { + // Queue-level lookup error: remember it so we don't mask it with 404. 
+ lastErr = err; + continue; + } + if (!job) continue; + + const state = await job.getState(); + if (state === 'active') { + // Token '0' tells BullMQ to skip the worker-lock check — necessary + // because the operator-side cancel doesn't hold the worker's lock. + try { + await job.moveToFailed(new Error('Cancelled by operator'), '0', false); + } catch (err) { + // Lock owned by a still-living worker; fall back to discard + remove + // so at least the result is thrown away and the row is gone. + try { await job.discard(); } catch (_) {} } - } catch { /* try next queue */ } + } + try { + await job.remove(); + } catch (err) { + // Last-resort obliteration of the job row via raw Redis. This is + // the path stalled jobs hit when moveToFailed couldn't transition + // them either. + const client = await queue.client; + const prefix = queue.toKey(bullId); + await client.del(prefix); + } + return res.json({ success: true, cancelled: state === 'active' }); } + if (lastErr) return next(lastErr); res.status(404).json({ error: 'Job not found' }); } catch (err) { next(err); diff --git a/services/worker/src/index.js b/services/worker/src/index.js index bb56f1d..7f7746a 100644 --- a/services/worker/src/index.js +++ b/services/worker/src/index.js @@ -48,10 +48,20 @@ const createWorker = (queueName, handler, overrides = {}) => { return worker; }; +// Per-queue concurrency. Defaults to 1, which serialises every job in a +// queue — meaning a single stalled job blocks every other one. We want +// thumbnails (cheap, parallel-safe) to run several at a time so a slow +// outlier doesn't back the rest of the catalog up. Proxy + conform are +// heavier (ffmpeg transcode) so we keep them lower to avoid trashing +// the box; tune via env if a node has more headroom. 
+const PROXY_CONCURRENCY = parseInt(process.env.PROXY_CONCURRENCY || '2', 10); +const THUMBNAIL_CONCURRENCY = parseInt(process.env.THUMBNAIL_CONCURRENCY || '4', 10); +const CONFORM_CONCURRENCY = parseInt(process.env.CONFORM_CONCURRENCY || '1', 10); + const workers = [ - createWorker('proxy', proxyWorker), - createWorker('thumbnail', thumbnailWorker), - createWorker('conform', conformWorker), + createWorker('proxy', proxyWorker, { concurrency: PROXY_CONCURRENCY }), + createWorker('thumbnail', thumbnailWorker, { concurrency: THUMBNAIL_CONCURRENCY }), + createWorker('conform', conformWorker, { concurrency: CONFORM_CONCURRENCY }), // YouTube imports: keep concurrency at 1 so we don't burn through rate // limits when several jobs land back-to-back. Lock window is longer than // the default because a long video download can run for minutes. @@ -62,6 +72,8 @@ const workers = [ }), ]; +console.log(`Concurrency: proxy=${PROXY_CONCURRENCY} thumbnail=${THUMBNAIL_CONCURRENCY} conform=${CONFORM_CONCURRENCY} import=1`); + startPromotionWorker(); console.log('Wild Dragon Worker Service started'); -- 2.45.2