fix: bulk-fetch jobs by state (no N+1 getState()); add GET /events SSE stream

This commit is contained in:
Zac Gaetano 2026-05-19 23:09:47 -04:00
parent d382c6b559
commit 8a2ef38326

View file

@ -30,14 +30,17 @@ const QUEUES = [
// BullMQ state → API status mapping
const STATE_MAP = {
waiting: 'waiting',
active: 'active',
completed:'completed',
failed: 'failed',
delayed: 'waiting',
paused: 'waiting',
waiting: 'waiting',
active: 'active',
completed: 'completed',
failed: 'failed',
delayed: 'waiting',
paused: 'waiting',
};
// Ordered state buckets used for bulk fetch — avoids N+1 getState() calls.
const STATE_BUCKETS = ['active', 'waiting', 'completed', 'failed', 'delayed', 'paused'];
function normalizeJob(bullJob, type, apiStatus) {
const isCompleted = apiStatus === 'completed';
const isFailed = apiStatus === 'failed';
@ -57,24 +60,56 @@ function normalizeJob(bullJob, type, apiStatus) {
};
}
// Fetch all jobs from all queues in bulk by state bucket (no per-job getState() calls).
async function getAllBullMQJobs() {
const results = [];
const allStates = ['waiting', 'active', 'completed', 'failed', 'delayed', 'paused'];
for (const { queue, type } of QUEUES) {
try {
const jobs = await queue.getJobs(allStates, 0, 200);
for (const job of jobs) {
const state = await job.getState();
const apiStatus = STATE_MAP[state] || state;
results.push(normalizeJob(job, type, apiStatus));
for (const bucket of STATE_BUCKETS) {
try {
const apiStatus = STATE_MAP[bucket] || bucket;
// getJobs([state], start, end) returns at most 200 per bucket
const jobs = await queue.getJobs([bucket], 0, 200);
for (const job of jobs) {
results.push(normalizeJob(job, type, apiStatus));
}
} catch {
// queue or bucket unavailable — skip
}
} catch {
// queue may be unavailable skip
}
}
return results;
}
// ── GET /events Server-Sent Events stream of live job updates ───────────────
//
// Must be declared BEFORE GET /:id so the literal path "events" isn't treated
// as a job-id parameter.
//
router.get('/events', async (req, res) => {
res.setHeader('Content-Type', 'text/event-stream');
res.setHeader('Cache-Control', 'no-cache');
res.setHeader('Connection', 'keep-alive');
res.setHeader('X-Accel-Buffering', 'no'); // disable nginx proxy buffering
res.flushHeaders();
let closed = false;
req.on('close', () => { closed = true; });
const push = async () => {
if (closed) return;
try {
const jobs = await getAllBullMQJobs();
if (!closed) res.write(`data: ${JSON.stringify({ type: 'jobs', jobs })}\n\n`);
} catch (err) {
if (!closed) res.write(`data: ${JSON.stringify({ type: 'error', message: err.message })}\n\n`);
}
if (!closed) setTimeout(push, 2000);
};
// Send initial snapshot immediately, then every 2 s
await push();
});
// ── GET / - List jobs (BullMQ queues) ────────────────────────────────────────
router.get('/', async (req, res, next) => {
try {