import express from 'express'; import pool from '../db/pool.js'; import { requireAuth } from '../middleware/auth.js'; import { Queue } from 'bullmq'; const router = express.Router(); router.use(requireAuth); // ── Redis connection ────────────────────────────────────────────────────────── const parseRedisUrl = (url) => { try { const parsed = new URL(url); return { host: parsed.hostname, port: parseInt(parsed.port, 10) || 6379 }; } catch { return { host: 'localhost', port: 6379 }; } }; const redisConn = parseRedisUrl(process.env.REDIS_URL || 'redis://queue:6379'); const proxyQueue = new Queue('proxy', { connection: redisConn }); const thumbnailQueue = new Queue('thumbnail', { connection: redisConn }); const conformQueue = new Queue('conform', { connection: redisConn }); const importQueue = new Queue('import', { connection: redisConn }); const QUEUES = [ { queue: proxyQueue, type: 'proxy' }, { queue: thumbnailQueue, type: 'thumbnail' }, { queue: conformQueue, type: 'conform' }, { queue: importQueue, type: 'import' }, ]; // BullMQ state → API status mapping const STATE_MAP = { waiting: 'waiting', active: 'active', completed: 'completed', failed: 'failed', delayed: 'waiting', paused: 'waiting', }; // Ordered state buckets used for bulk fetch — avoids N+1 getState() calls. const STATE_BUCKETS = ['active', 'waiting', 'completed', 'failed', 'delayed', 'paused']; function normalizeJob(bullJob, type, apiStatus) { const isCompleted = apiStatus === 'completed'; const isFailed = apiStatus === 'failed'; return { id: `${type}:${bullJob.id}`, type, status: apiStatus, progress: typeof bullJob.progress === 'number' ? bullJob.progress : 0, asset_id: bullJob.data?.assetId || null, asset_name: bullJob.data?.assetName || null, created_at: bullJob.timestamp ? new Date(bullJob.timestamp).toISOString() : null, started_at: bullJob.processedOn ? new Date(bullJob.processedOn).toISOString() : null, completed_at: isCompleted && bullJob.finishedOn ? new Date(bullJob.finishedOn).toISOString() : null, failed_at: isFailed && bullJob.finishedOn ? new Date(bullJob.finishedOn).toISOString() : null, error: bullJob.failedReason || null, metadata: bullJob.data || {}, }; } // Fetch all jobs from all queues in bulk by state bucket (no per-job getState() calls). async function getAllBullMQJobs() { const results = []; for (const { queue, type } of QUEUES) { for (const bucket of STATE_BUCKETS) { try { const apiStatus = STATE_MAP[bucket] || bucket; const jobs = await queue.getJobs([bucket], 0, 200); for (const job of jobs) { results.push(normalizeJob(job, type, apiStatus)); } } catch { // queue or bucket unavailable — skip } } } return results; } // Mutate `jobs` in place to fill in asset_name from the assets table for any // job that has an assetId but no inline assetName in its payload. One bulk // SQL query per refresh — cheap, and means we don't have to remember to pass // assetName at every enqueue site (upload.js, capture stop, scheduler, etc.). async function attachAssetNames(jobs) { const idsNeedingLookup = [...new Set( jobs.filter(j => j.asset_id && !j.asset_name).map(j => j.asset_id) )]; if (idsNeedingLookup.length === 0) return; let rows = []; try { const result = await pool.query( 'SELECT id, display_name, filename FROM assets WHERE id = ANY($1::uuid[])', [idsNeedingLookup] ); rows = result.rows; } catch { // If the lookup fails (DB down, bad UUID in a stale BullMQ payload), keep // serving jobs without names rather than 500-ing the whole list. return; } const byId = new Map(rows.map(r => [r.id, r.display_name || r.filename])); for (const j of jobs) { if (j.asset_id && !j.asset_name) { const name = byId.get(j.asset_id); if (name) j.asset_name = name; } } } // ── GET /events – Server-Sent Events stream of live job updates ─────────────── router.get('/events', async (req, res) => { res.setHeader('Content-Type', 'text/event-stream'); res.setHeader('Cache-Control', 'no-cache'); res.setHeader('Connection', 'keep-alive'); res.setHeader('X-Accel-Buffering', 'no'); res.flushHeaders(); let closed = false; req.on('close', () => { closed = true; }); const push = async () => { if (closed) return; try { const jobs = await getAllBullMQJobs(); await attachAssetNames(jobs); if (!closed) res.write(`data: ${JSON.stringify({ type: 'jobs', jobs })}\n\n`); } catch (err) { if (!closed) res.write(`data: ${JSON.stringify({ type: 'error', message: err.message })}\n\n`); } if (!closed) setTimeout(push, 2000); }; await push(); }); // ── GET / - List jobs (BullMQ queues) ──────────────────────────────────────── router.get('/', async (req, res, next) => { try { const { type, status, asset_id } = req.query; let jobs = await getAllBullMQJobs(); await attachAssetNames(jobs); if (type) jobs = jobs.filter(j => j.type === type); if (status) jobs = jobs.filter(j => j.status === status); if (asset_id) jobs = jobs.filter(j => j.asset_id === asset_id); jobs.sort((a, b) => new Date(b.created_at || 0) - new Date(a.created_at || 0)); res.json(jobs); } catch (err) { next(err); } }); // ── GET /:id - Single job ───────────────────────────────────────────────────── router.get('/:id', async (req, res, next) => { try { const { id } = req.params; const colonIdx = id.indexOf(':'); const qType = colonIdx > -1 ? id.slice(0, colonIdx) : null; const bullId = colonIdx > -1 ? id.slice(colonIdx + 1) : id; for (const { queue, type } of QUEUES) { if (qType && type !== qType) continue; try { const job = await queue.getJob(bullId); if (job) { const state = await job.getState(); const apiStatus = STATE_MAP[state] || state; const normalized = normalizeJob(job, type, apiStatus); await attachAssetNames([normalized]); return res.json(normalized); } } catch { /* try next queue */ } } res.status(404).json({ error: 'Job not found' }); } catch (err) { next(err); } }); // ── POST /:id/retry - Retry a failed job ────────────────────────────────────── router.post('/:id/retry', async (req, res, next) => { try { const { id } = req.params; const colonIdx = id.indexOf(':'); const qType = colonIdx > -1 ? id.slice(0, colonIdx) : null; const bullId = colonIdx > -1 ? id.slice(colonIdx + 1) : id; for (const { queue, type } of QUEUES) { if (qType && type !== qType) continue; try { const job = await queue.getJob(bullId); if (job) { await job.retry(); return res.json({ id, status: 'queued' }); } } catch { /* try next queue */ } } res.status(404).json({ error: 'Job not found' }); } catch (err) { next(err); } }); // ── DELETE /:id - Remove a job ──────────────────────────────────────────────── router.delete('/:id', async (req, res, next) => { try { const { id } = req.params; const colonIdx = id.indexOf(':'); const qType = colonIdx > -1 ? id.slice(0, colonIdx) : null; const bullId = colonIdx > -1 ? id.slice(colonIdx + 1) : id; for (const { queue, type } of QUEUES) { if (qType && type !== qType) continue; try { const job = await queue.getJob(bullId); if (job) { await job.remove(); return res.json({ success: true }); } } catch { /* try next queue */ } } res.status(404).json({ error: 'Job not found' }); } catch (err) { next(err); } }); // ── POST /conform - Submit a conform (EDL export) job ──────────────────────── router.post('/conform', async (req, res, next) => { try { const { edl, project_id, output_format } = req.body; if (!edl || !project_id || !output_format) { return res.status(400).json({ error: 'edl, project_id, and output_format are required', }); } const bullJob = await conformQueue.add('conform-task', { edl, projectId: project_id, outputFormat: output_format, }); res.status(202).json({ id: `conform:${bullJob.id}`, status: 'queued' }); } catch (err) { next(err); } }); export default router;