dragonflight/services/worker/src/index.js
Zac 562881f0db fix(jobs): stall detection + manual kill button so 5h-stuck actives can't happen
A thumbnail job from earlier stayed 'active' for 6+ hours: worker was restarted at 70% progress, BullMQ left it in the active set, and there was no stall reaper because the worker was created with only the default options.

Worker now passes stalledInterval: 30000, lockDuration: 60000, lockRenewTime: 15000, maxStalledCount: 1 to the Worker constructor. If a run dies, its lock stops being renewed and expires after at most 60s; a subsequent stall check (run every 30s) then moves the job back to waiting and a 'stalled' event is logged. While a job is running normally, its lock is renewed every 15s.

Jobs UI gains a 'Kill' button per row next to Details. Calls DELETE /api/v1/jobs/:id which already removes the job from Redis. Use it on any row that looks stuck.
2026-05-17 19:10:19 -04:00

62 lines
1.8 KiB
JavaScript

import 'dotenv/config';
import { Worker } from 'bullmq';
import { proxyWorker } from './workers/proxy.js';
import { thumbnailWorker } from './workers/thumbnail.js';
import { conformWorker } from './workers/conform.js';
/**
 * Convert a Redis connection URL into a plain connection-options object.
 *
 * @param {string} url - Redis URL, e.g. `redis://:secret@redis.internal:6380`.
 * @returns {{ host: string, port: number, password: (string|undefined) }}
 *   Host name, numeric port (6379 when the URL omits it), and the decoded
 *   password (`undefined` when the URL carries none).
 * @throws {TypeError} If `url` cannot be parsed by the `URL` constructor.
 */
const parseRedisUrl = (url) => {
  const DEFAULT_REDIS_PORT = 6379;

  // `URL` keeps credentials percent-encoded. Decode them so characters such as
  // `@` or `:` in the password are sent as typed; if the value contains a
  // malformed escape, fall back to the raw text instead of throwing.
  const decodeCredential = (value) => {
    try {
      return decodeURIComponent(value);
    } catch {
      return value;
    }
  };

  const parsed = new URL(url);
  return {
    host: parsed.hostname,
    // `parsed.port` is '' when the URL has no explicit port; parsing '' would
    // produce NaN, so use the default Redis port in that case.
    port: parsed.port ? Number.parseInt(parsed.port, 10) : DEFAULT_REDIS_PORT,
    password: parsed.password ? decodeCredential(parsed.password) : undefined,
  };
};
// Connection options shared by every worker. An unset or empty REDIS_URL
// falls back to a Redis instance on localhost:6379.
const redisOptions = parseRedisUrl(
  process.env.REDIS_URL || 'redis://localhost:6379',
);
/**
 * Create a BullMQ worker for one queue and attach logging listeners.
 *
 * @param {string} queueName - Name of the queue to consume; also used as the
 *   `[prefix]` on every log line.
 * @param {Function} handler - Job processor passed straight to the BullMQ
 *   `Worker` constructor.
 * @returns {Worker} The started worker, so the caller can close it on shutdown.
 */
const createWorker = (queueName, handler) => {
  const worker = new Worker(queueName, handler, {
    connection: redisOptions,
    // Stall detection: if a worker dies mid-job, BullMQ moves it back to wait
    // after stalledInterval. Without this a crashed run sits in active forever.
    stalledInterval: 30000,
    maxStalledCount: 1,
    lockDuration: 60000,
    lockRenewTime: 15000,
  });

  worker.on('completed', (job) => {
    console.log(`[${queueName}] Job ${job.id} completed`);
  });

  worker.on('failed', (job, err) => {
    // BullMQ types `job` as possibly undefined for this event; guard so the
    // listener itself cannot throw while reporting a failure.
    const jobId = job ? job.id : 'unknown';
    console.error(`[${queueName}] Job ${jobId} failed:`, err.message);
  });

  worker.on('stalled', (jobId) => {
    console.warn(`[${queueName}] Job ${jobId} stalled — reclaimed`);
  });

  worker.on('progress', (job, progress) => {
    console.log(`[${queueName}] Job ${job.id} progress:`, progress);
  });

  // Worker-level errors (e.g. connection problems) are emitted as 'error'.
  // An EventEmitter with no 'error' listener throws, so always log them here.
  worker.on('error', (err) => {
    console.error(`[${queueName}] Worker error:`, err);
  });

  return worker;
};
// One worker per queue, created in the listed order. Each entry pairs a queue
// name with the handler that processes that queue's jobs.
const workers = [
  ['proxy', proxyWorker],
  ['thumbnail', thumbnailWorker],
  ['conform', conformWorker],
].map(([queueName, handler]) => createWorker(queueName, handler));

// Startup banner: service name, Redis endpoint in use, and the queues served.
for (const line of [
  'Wild Dragon Worker Service started',
  `Redis: ${redisOptions.host}:${redisOptions.port}`,
  'Active queues: proxy, thumbnail, conform',
]) {
  console.log(line);
}
/**
 * Close every worker, then exit the process.
 *
 * Every worker is asked to close even if another one fails to, and close
 * failures are logged instead of surfacing as an unhandled rejection.
 *
 * @param {string} signal - Name of the signal that triggered the shutdown.
 * @returns {Promise<void>} Never resolves in practice: the process exits.
 */
const shutdown = async (signal) => {
  console.log(`${signal} received, shutting down...`);

  // The async wrapper turns a synchronous throw from close() into a rejection,
  // so allSettled still waits for the remaining workers.
  const results = await Promise.allSettled(workers.map(async (w) => w.close()));

  const failures = results.filter((result) => result.status === 'rejected');
  for (const failure of failures) {
    console.error('Worker failed to close cleanly:', failure.reason);
  }

  process.exit(failures.length > 0 ? 1 : 0);
};

// SIGTERM is what the original handler covered; SIGINT (Ctrl+C) gets the same
// graceful path so an interactive stop does not kill the process mid-job.
for (const signal of ['SIGTERM', 'SIGINT']) {
  process.on(signal, () => {
    void shutdown(signal);
  });
}