diff --git a/services/mam-api/src/routes/jobs.js b/services/mam-api/src/routes/jobs.js index 4677dba..1a83b15 100644 --- a/services/mam-api/src/routes/jobs.js +++ b/services/mam-api/src/routes/jobs.js @@ -30,14 +30,17 @@ const QUEUES = [ // BullMQ state → API status mapping const STATE_MAP = { - waiting: 'waiting', - active: 'active', - completed:'completed', - failed: 'failed', - delayed: 'waiting', - paused: 'waiting', + waiting: 'waiting', + active: 'active', + completed: 'completed', + failed: 'failed', + delayed: 'waiting', + paused: 'waiting', }; +// Ordered state buckets used for bulk fetch — avoids N+1 getState() calls. +const STATE_BUCKETS = ['active', 'waiting', 'completed', 'failed', 'delayed', 'paused']; + function normalizeJob(bullJob, type, apiStatus) { const isCompleted = apiStatus === 'completed'; const isFailed = apiStatus === 'failed'; @@ -57,24 +60,56 @@ function normalizeJob(bullJob, type, apiStatus) { }; } +// Fetch all jobs from all queues in bulk by state bucket (no per-job getState() calls). async function getAllBullMQJobs() { const results = []; - const allStates = ['waiting', 'active', 'completed', 'failed', 'delayed', 'paused']; for (const { queue, type } of QUEUES) { - try { - const jobs = await queue.getJobs(allStates, 0, 200); - for (const job of jobs) { - const state = await job.getState(); - const apiStatus = STATE_MAP[state] || state; - results.push(normalizeJob(job, type, apiStatus)); + for (const bucket of STATE_BUCKETS) { + try { + const apiStatus = STATE_MAP[bucket] || bucket; + // getJobs([state], start, end) returns at most 200 per bucket + const jobs = await queue.getJobs([bucket], 0, 200); + for (const job of jobs) { + results.push(normalizeJob(job, type, apiStatus)); + } + } catch { + // queue or bucket unavailable — skip } - } catch { - // queue may be unavailable – skip } } return results; } +// ── GET /events – Server-Sent Events stream of live job updates ─────────────── +// +// Must be declared BEFORE GET /:id so the literal path "events" isn't treated +// as a job-id parameter. +// +router.get('/events', async (req, res) => { + res.setHeader('Content-Type', 'text/event-stream'); + res.setHeader('Cache-Control', 'no-cache'); + res.setHeader('Connection', 'keep-alive'); + res.setHeader('X-Accel-Buffering', 'no'); // disable nginx proxy buffering + res.flushHeaders(); + + let closed = false; + req.on('close', () => { closed = true; }); + + const push = async () => { + if (closed) return; + try { + const jobs = await getAllBullMQJobs(); + if (!closed) res.write(`data: ${JSON.stringify({ type: 'jobs', jobs })}\n\n`); + } catch (err) { + if (!closed) res.write(`data: ${JSON.stringify({ type: 'error', message: err.message })}\n\n`); + } + if (!closed) setTimeout(push, 2000); + }; + + // Send initial snapshot immediately, then every 2 s + await push(); +}); + // ── GET / - List jobs (BullMQ queues) ──────────────────────────────────────── router.get('/', async (req, res, next) => { try {