From 562881f0db53c01c6d980086ebc20c4963736592 Mon Sep 17 00:00:00 2001 From: Zac Date: Sun, 17 May 2026 19:10:08 -0400 Subject: [PATCH] fix(jobs): stall detection + manual kill button so 5h-stuck actives can't happen A thumbnail job from earlier stayed 'active' for 6+ hours: worker was restarted at 70% progress, BullMQ left it in the active set, and there was no stall reaper because the worker was created with only the default options. Worker now passes stalledInterval: 30000, lockDuration: 60000, lockRenewTime: 15000, maxStalledCount: 1 to the Worker constructor. If a run dies, its lock expires within lockDuration (60s) of the last renewal, and the next stalled check (run every 30s) moves the job back to waiting and a 'stalled' event is logged, so a dead run is reclaimed within roughly 90s. While the run is alive the lock is renewed every 15s mid-job. Jobs UI gains a 'Kill' button per row next to Details. Calls DELETE /api/v1/jobs/:id which already removes the job from Redis. Use it on any row that looks stuck. --- services/web-ui/public/jobs.html | 17 ++++++++++++++--- services/worker/src/index.js | 15 +++++++++++++-- 2 files changed, 27 insertions(+), 5 deletions(-) diff --git a/services/web-ui/public/jobs.html b/services/web-ui/public/jobs.html index f87c205..d52592f 100644 --- a/services/web-ui/public/jobs.html +++ b/services/web-ui/public/jobs.html @@ -656,14 +656,25 @@ function renderRow(job) { ${dur} - + + `; return tr; } +async function killJob(jobId, ev) { + ev.stopPropagation(); + if (!confirm('Remove this job from the queue? 
If a worker is still processing it, the run is abandoned.')) return; + try { + const r = await fetch('/api/v1/jobs/' + encodeURIComponent(jobId), { method: 'DELETE', credentials: 'include' }); + if (r.ok) { toast('Job removed', 'success'); fetchJobs(); } + else { const d = await r.json().catch(()=>({})); toast('Remove failed: ' + (d.error || r.statusText), 'error'); } + } catch (err) { + toast('Remove failed: ' + err.message, 'error'); + } +} + function statusBadge(status) { const map = { active: 'Active', diff --git a/services/worker/src/index.js b/services/worker/src/index.js index 5c238d7..53163cb 100644 --- a/services/worker/src/index.js +++ b/services/worker/src/index.js @@ -16,7 +16,15 @@ const parseRedisUrl = (url) => { const redisOptions = parseRedisUrl(process.env.REDIS_URL || 'redis://localhost:6379'); const createWorker = (queueName, handler) => { - const worker = new Worker(queueName, handler, { connection: redisOptions }); + const worker = new Worker(queueName, handler, { + connection: redisOptions, + // Stall detection: if a worker dies mid-job, BullMQ moves it back to wait + // after stalledInterval. Without this a crashed run sits in active forever. + stalledInterval: 30000, + maxStalledCount: 1, + lockDuration: 60000, + lockRenewTime: 15000, + }); worker.on('completed', (job) => { console.log(`[${queueName}] Job ${job.id} completed`); @@ -26,7 +34,10 @@ const createWorker = (queueName, handler) => { console.error(`[${queueName}] Job ${job.id} failed:`, err.message); }); - // job.progress is a property (the value set by updateProgress), not a function + worker.on('stalled', (jobId) => { + console.warn(`[${queueName}] Job ${jobId} stalled — reclaimed`); + }); + worker.on('progress', (job, progress) => { console.log(`[${queueName}] Job ${job.id} progress:`, progress); });