From 17646c11558be96fae0ca17f359307e5f34f97a2 Mon Sep 17 00:00:00 2001 From: ZGaetano Date: Sat, 16 May 2026 17:38:53 -0400 Subject: [PATCH] fix(jobs): read from BullMQ queues instead of empty DB table GET /api/v1/jobs now queries the proxy, thumbnail, and conform BullMQ queues directly and returns normalized job objects with id, type, status, progress, asset_id, timestamps, and error fields. Also adds DELETE /:id to remove completed/failed jobs from the queue, supporting the clearCompleted action in jobs.html. The PostgreSQL jobs table is still used only for conform job creation (POST /conform) to preserve that workflow. --- services/mam-api/src/routes/jobs.js | 157 +++++++++++++++++++--------- 1 file changed, 107 insertions(+), 50 deletions(-) diff --git a/services/mam-api/src/routes/jobs.js b/services/mam-api/src/routes/jobs.js index 236bb7a..eca469e 100644 --- a/services/mam-api/src/routes/jobs.js +++ b/services/mam-api/src/routes/jobs.js @@ -5,10 +5,9 @@ import { Queue } from 'bullmq'; import { v4 as uuidv4 } from 'uuid'; const router = express.Router(); - router.use(requireAuth); -// Initialize BullMQ queue for conform jobs +// ── Redis connection ────────────────────────────────────────────────────────── const parseRedisUrl = (url) => { try { const parsed = new URL(url); @@ -18,66 +17,132 @@ const parseRedisUrl = (url) => { } }; -const conformQueue = new Queue('conform', { - connection: parseRedisUrl(process.env.REDIS_URL || 'redis://queue:6379'), -}); +const redisConn = parseRedisUrl(process.env.REDIS_URL || 'redis://queue:6379'); -// GET / - List jobs +const proxyQueue = new Queue('proxy', { connection: redisConn }); +const thumbnailQueue = new Queue('thumbnail', { connection: redisConn }); +const conformQueue = new Queue('conform', { connection: redisConn }); + +const QUEUES = [ + { queue: proxyQueue, type: 'proxy' }, + { queue: thumbnailQueue, type: 'thumbnail' }, + { queue: conformQueue, type: 'conform' }, +]; + +// BullMQ state → API status mapping +const STATE_MAP = { + waiting: 'waiting', + active: 'active', + completed:'completed', + failed: 'failed', + delayed: 'waiting', + paused: 'waiting', +}; + +function normalizeJob(bullJob, type, apiStatus) { + const isCompleted = apiStatus === 'completed'; + const isFailed = apiStatus === 'failed'; + return { + id: `${type}:${bullJob.id}`, + type, + status: apiStatus, + progress: typeof bullJob.progress === 'number' ? bullJob.progress : 0, + asset_id: bullJob.data?.assetId || null, + asset_name: bullJob.data?.assetName || null, + created_at: bullJob.timestamp ? new Date(bullJob.timestamp).toISOString() : null, + started_at: bullJob.processedOn ? new Date(bullJob.processedOn).toISOString() : null, + completed_at: isCompleted && bullJob.finishedOn ? new Date(bullJob.finishedOn).toISOString() : null, + failed_at: isFailed && bullJob.finishedOn ? new Date(bullJob.finishedOn).toISOString() : null, + error: bullJob.failedReason || null, + metadata: bullJob.data || {}, + }; +} + +async function getAllBullMQJobs() { + const results = []; + for (const { queue, type } of QUEUES) { + for (const [bullState, apiStatus] of Object.entries(STATE_MAP)) { + try { + const jobs = await queue.getJobs([bullState], 0, 200); + for (const job of jobs) { + results.push(normalizeJob(job, type, apiStatus)); + } + } catch { + // queue may be empty or unavailable for this state – skip + } + } + } + return results; +} + +// ── GET / - List jobs (BullMQ queues) ──────────────────────────────────────── router.get('/', async (req, res, next) => { try { const { type, status, asset_id } = req.query; + let jobs = await getAllBullMQJobs(); - let query = 'SELECT * FROM jobs WHERE 1=1'; - const params = []; - let paramCount = 1; + if (type) jobs = jobs.filter(j => j.type === type); + if (status) jobs = jobs.filter(j => j.status === status); + if (asset_id) jobs = jobs.filter(j => j.asset_id === asset_id); - if (type) { - query += ` AND type = $${paramCount++}`; - params.push(type); - } - - if (status) { - query += ` AND status = $${paramCount++}`; - params.push(status); - } - - if (asset_id) { - query += ` AND asset_id = $${paramCount++}`; - params.push(asset_id); - } - - query += ' ORDER BY created_at DESC'; - - const result = await pool.query(query, params); - res.json(result.rows); + jobs.sort((a, b) => new Date(b.created_at || 0) - new Date(a.created_at || 0)); + res.json(jobs); } catch (err) { next(err); } }); -// GET /:id - Single job with progress +// ── GET /:id - Single job ───────────────────────────────────────────────────── router.get('/:id', async (req, res, next) => { try { const { id } = req.params; + // id format: "type:bullId" e.g. "proxy:1" + const colonIdx = id.indexOf(':'); + const qType = colonIdx > -1 ? id.slice(0, colonIdx) : null; + const bullId = colonIdx > -1 ? id.slice(colonIdx + 1) : id; - const result = await pool.query( - 'SELECT * FROM jobs WHERE id = $1', - [id] - ); - - if (result.rows.length === 0) { - return res.status(404).json({ error: 'Job not found' }); + for (const { queue, type } of QUEUES) { + if (qType && type !== qType) continue; + try { + const job = await queue.getJob(bullId); + if (job) { + const state = await job.getState(); + const apiStatus = STATE_MAP[state] || state; + return res.json(normalizeJob(job, type, apiStatus)); + } + } catch { /* try next queue */ } } - - const job = result.rows[0]; - - res.json(job); + res.status(404).json({ error: 'Job not found' }); } catch (err) { next(err); } }); -// POST /conform - Submit conform job +// ── DELETE /:id - Remove a job ──────────────────────────────────────────────── +router.delete('/:id', async (req, res, next) => { + try { + const { id } = req.params; + const colonIdx = id.indexOf(':'); + const qType = colonIdx > -1 ? id.slice(0, colonIdx) : null; + const bullId = colonIdx > -1 ? id.slice(colonIdx + 1) : id; + + for (const { queue, type } of QUEUES) { + if (qType && type !== qType) continue; + try { + const job = await queue.getJob(bullId); + if (job) { + await job.remove(); + return res.json({ success: true }); + } + } catch { /* try next queue */ } + } + res.status(404).json({ error: 'Job not found' }); + } catch (err) { + next(err); + } +}); + +// ── POST /conform - Submit a conform job ────────────────────────────────────── router.post('/conform', async (req, res, next) => { try { const { edl, project_id, output_format } = req.body; @@ -90,23 +155,15 @@ router.post('/conform', async (req, res, next) => { const jobId = uuidv4(); - // Create job record in database const result = await pool.query( `INSERT INTO jobs (id, type, status, project_id, metadata, created_at, updated_at) VALUES ($1, $2, $3, $4, $5, NOW(), NOW()) RETURNING *`, - [ - jobId, - 'conform', - 'pending', - project_id, - JSON.stringify({ edl, output_format }), - ] + [jobId, 'conform', 'pending', project_id, JSON.stringify({ edl, output_format })] ); const job = result.rows[0]; - // Add to BullMQ queue with camelCase keys matching the worker's destructuring await conformQueue.add('conform-task', { jobId, edl,