dragonflight/services/mam-api/src/routes/jobs.js
ZGaetano 17646c1155 fix(jobs): read from BullMQ queues instead of empty DB table
GET /api/v1/jobs now queries the proxy, thumbnail, and conform BullMQ
queues directly and returns normalized job objects with id, type,
status, progress, asset_id, timestamps, and error fields.

Also adds DELETE /:id to remove completed/failed jobs from the queue,
supporting the clearCompleted action in jobs.html.

The PostgreSQL jobs table is still used only for conform job creation
(POST /conform) to preserve that workflow.
2026-05-16 17:38:53 -04:00

180 lines
6.1 KiB
JavaScript
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import express from 'express';
import pool from '../db/pool.js';
import { requireAuth } from '../middleware/auth.js';
import { Queue } from 'bullmq';
import { v4 as uuidv4 } from 'uuid';
// Router holding every jobs endpoint defined in this file; it is the module's default export.
const router = express.Router();
// Registered before any route, so the requireAuth middleware runs ahead of every handler below.
router.use(requireAuth);
// ── Redis connection ──────────────────────────────────────────────────────────
/**
 * Convert a Redis connection URL into the plain connection options object
 * that is handed to the BullMQ queues.
 *
 * Host and port are always present. Username, password, database index and
 * TLS are added only when the URL carries them, so a bare
 * "redis://host:port" URL still yields exactly `{ host, port }`.
 *
 * @param {string} url - e.g. "redis://queue:6379" or
 *   "rediss://user:secret@redis.internal:6380/2".
 * @returns {{host: string, port: number, username?: string, password?: string, db?: number, tls?: object}}
 *   Connection options; `{ host: 'localhost', port: 6379 }` when the URL
 *   cannot be parsed or has no host.
 */
const parseRedisUrl = (url) => {
  const fallback = { host: 'localhost', port: 6379 };

  // Userinfo is percent-encoded in a URL; keep the raw text if it is not valid encoding.
  const decode = (text) => {
    try {
      return decodeURIComponent(text);
    } catch {
      return text;
    }
  };

  let parsed;
  try {
    parsed = new URL(url);
  } catch {
    return fallback;
  }

  // Scheme-less input such as "queue:6379" parses as scheme "queue:" with an
  // empty hostname; an empty host is not a usable target.
  if (!parsed.hostname) return fallback;

  const options = {
    host: parsed.hostname,
    port: Number.parseInt(parsed.port, 10) || 6379,
  };
  if (parsed.username) options.username = decode(parsed.username);
  if (parsed.password) options.password = decode(parsed.password);

  // A path of the form "/<digits>" selects the database index.
  const dbMatch = /^\/(\d+)$/.exec(parsed.pathname);
  if (dbMatch) options.db = Number(dbMatch[1]);

  // "rediss:" is the TLS variant of the scheme.
  if (parsed.protocol === 'rediss:') options.tls = {};

  return options;
};
// Connection settings come from REDIS_URL, falling back to redis://queue:6379 when it is unset.
const redisConn = parseRedisUrl(process.env.REDIS_URL || 'redis://queue:6379');

// Each queue handle receives its own options object wrapping the shared connection settings.
const openQueue = (name) => new Queue(name, { connection: redisConn });
const proxyQueue = openQueue('proxy');
const thumbnailQueue = openQueue('thumbnail');
const conformQueue = openQueue('conform');

// Queue handles paired with the type label used in job ids and in the `type` filter.
const QUEUES = [
  ['proxy', proxyQueue],
  ['thumbnail', thumbnailQueue],
  ['conform', conformQueue],
].map(([type, queue]) => ({ queue, type }));
// BullMQ state → API status mapping.
// Entry order is significant: the job listing queries the states in this order.
const STATE_MAP = Object.fromEntries([
  // Reported under the same name.
  ...['waiting', 'active', 'completed', 'failed'].map((state) => [state, state]),
  // Folded into the API's "waiting" status.
  ...['delayed', 'paused'].map((state) => [state, 'waiting']),
]);
/**
 * Flatten a BullMQ job into the plain object shape returned by the jobs API.
 *
 * @param {object} bullJob - Job as returned by a queue; the fields read are id,
 *   progress, data, timestamp, processedOn, finishedOn and failedReason.
 * @param {string} type - Queue label; becomes the prefix of the returned id.
 * @param {string} apiStatus - Status string to report for this job.
 * @returns {object} Record with id ("type:bullId"), type, status, progress,
 *   asset_id, asset_name, created_at, started_at, completed_at, failed_at,
 *   error and metadata.
 */
function normalizeJob(bullJob, type, apiStatus) {
  // Epoch milliseconds -> ISO-8601 string; a missing or zero value becomes null.
  const toIso = (ms) => (ms ? new Date(ms).toISOString() : null);
  // finishedOn is reported only under the timestamp that matches the job's status.
  const finishedWhen = (status) => (apiStatus === status ? toIso(bullJob.finishedOn) : null);

  const jobId = `${type}:${bullJob.id}`;
  const { progress, data: payload } = bullJob;

  return {
    id: jobId,
    type,
    status: apiStatus,
    // Non-numeric progress values are reported as 0.
    progress: typeof progress === 'number' ? progress : 0,
    asset_id: payload?.assetId || null,
    asset_name: payload?.assetName || null,
    created_at: toIso(bullJob.timestamp),
    started_at: toIso(bullJob.processedOn),
    completed_at: finishedWhen('completed'),
    failed_at: finishedWhen('failed'),
    error: bullJob.failedReason || null,
    metadata: payload || {},
  };
}
/**
 * Collect jobs from every queue in QUEUES, for every BullMQ state listed in
 * STATE_MAP, and return them in normalized form.
 *
 * The lookups are independent, so they are issued together; the returned array
 * keeps the queue-by-queue, state-by-state order of a sequential walk. Each
 * lookup requests the index range 0..200 of its state.
 *
 * Listing is best-effort: a lookup that fails is logged and contributes
 * whatever it had normalized so far instead of failing the whole call.
 *
 * @returns {Promise<object[]>} Normalized jobs (see normalizeJob).
 */
async function getAllBullMQJobs() {
  const listState = async (queue, type, bullState, apiStatus) => {
    const normalized = [];
    try {
      const jobs = await queue.getJobs([bullState], 0, 200);
      for (const job of jobs) {
        // The listing can contain empty slots for jobs that disappeared while
        // it was being read; skipping them keeps one gap from discarding the
        // rest of this state's jobs.
        if (!job) continue;
        normalized.push(normalizeJob(job, type, apiStatus));
      }
    } catch (err) {
      // Keep the endpoint responsive, but leave a trace instead of hiding the failure.
      console.warn(
        `[jobs] could not list "${bullState}" jobs from the ${type} queue: ${err?.message ?? err}`,
      );
    }
    return normalized;
  };

  const lookups = [];
  for (const { queue, type } of QUEUES) {
    for (const [bullState, apiStatus] of Object.entries(STATE_MAP)) {
      lookups.push(listState(queue, type, bullState, apiStatus));
    }
  }

  // Promise.all preserves the order of `lookups`, so flattening reproduces the sequential order.
  const perState = await Promise.all(lookups);
  return perState.flat();
}
// ── GET / - List jobs (BullMQ queues) ────────────────────────────────────────
// Optional query parameters `type`, `status` and `asset_id` each narrow the list
// by exact match. The response is a JSON array ordered newest-first by created_at.
router.get('/', async (req, res, next) => {
  try {
    const { type, status, asset_id } = req.query;
    // Only filters that were actually supplied (truthy) take part in matching.
    const activeFilters = Object.entries({ type, status, asset_id }).filter(([, wanted]) => wanted);

    const everyJob = await getAllBullMQJobs();
    const matching = everyJob.filter((job) =>
      activeFilters.every(([field, wanted]) => job[field] === wanted),
    );

    // Jobs without a creation time sort as if created at the epoch.
    const createdAtMs = (job) => new Date(job.created_at || 0).getTime();
    matching.sort((left, right) => createdAtMs(right) - createdAtMs(left));

    res.json(matching);
  } catch (err) {
    next(err);
  }
});
// ── GET /:id - Single job ─────────────────────────────────────────────────────
// Responds with the normalized job, 404 when no queue holds it, or forwards the
// error to the error middleware when the lookup itself failed.
router.get('/:id', async (req, res, next) => {
  try {
    const { id } = req.params;
    // id format: "type:bullId" e.g. "proxy:1". Without a type prefix every queue is searched.
    const colonIdx = id.indexOf(':');
    const qType = colonIdx > -1 ? id.slice(0, colonIdx) : null;
    const bullId = colonIdx > -1 ? id.slice(colonIdx + 1) : id;

    // Remember a failed lookup so an unreachable queue is not reported as "Job not found".
    let lookupError = null;

    for (const { queue, type } of QUEUES) {
      if (qType && type !== qType) continue;

      let job;
      try {
        job = await queue.getJob(bullId);
      } catch (err) {
        lookupError = err;
        continue; // still give the remaining queues a chance
      }
      if (!job) continue;

      const state = await job.getState();
      // States without a mapping are passed through unchanged.
      const apiStatus = STATE_MAP[state] || state;
      return res.json(normalizeJob(job, type, apiStatus));
    }

    if (lookupError) return next(lookupError);
    res.status(404).json({ error: 'Job not found' });
  } catch (err) {
    next(err);
  }
});
// ── DELETE /:id - Remove a job ────────────────────────────────────────────────
// Responds { success: true } once the job is removed, 404 when no queue holds it,
// 409 when removal failed because the job is currently active, and forwards any
// other failure to the error middleware.
router.delete('/:id', async (req, res, next) => {
  try {
    const { id } = req.params;
    // id format: "type:bullId" e.g. "proxy:1". Without a type prefix every queue is searched.
    const colonIdx = id.indexOf(':');
    const qType = colonIdx > -1 ? id.slice(0, colonIdx) : null;
    const bullId = colonIdx > -1 ? id.slice(colonIdx + 1) : id;

    // Remember a failed lookup so an unreachable queue is not reported as "Job not found".
    let lookupError = null;

    for (const { queue, type } of QUEUES) {
      if (qType && type !== qType) continue;

      let job;
      try {
        job = await queue.getJob(bullId);
      } catch (err) {
        lookupError = err;
        continue; // still give the remaining queues a chance
      }
      if (!job) continue;

      try {
        await job.remove();
      } catch (removeErr) {
        // The job exists, so "not found" would be wrong. A job that is being
        // processed cannot be removed; tell the client so explicitly.
        if ((await job.getState()) === 'active') {
          return res.status(409).json({ error: 'Job is active and cannot be removed' });
        }
        throw removeErr;
      }
      return res.json({ success: true });
    }

    if (lookupError) return next(lookupError);
    res.status(404).json({ error: 'Job not found' });
  } catch (err) {
    next(err);
  }
});
// ── POST /conform - Submit a conform job ──────────────────────────────────────
// Body: { edl, project_id, output_format }, all required (400 otherwise).
// Inserts a 'pending' row into the jobs table, enqueues a 'conform-task' on the
// conform queue, and responds 201 with the inserted row.
router.post('/conform', async (req, res, next) => {
  try {
    const { edl, project_id, output_format } = req.body;
    if (!edl || !project_id || !output_format) {
      return res.status(400).json({
        error: 'edl, project_id, and output_format are required',
      });
    }

    const jobId = uuidv4();
    const result = await pool.query(
      `INSERT INTO jobs (id, type, status, project_id, metadata, created_at, updated_at)
       VALUES ($1, $2, $3, $4, $5, NOW(), NOW())
       RETURNING *`,
      [jobId, 'conform', 'pending', project_id, JSON.stringify({ edl, output_format })]
    );
    const job = result.rows[0];

    try {
      await conformQueue.add('conform-task', {
        jobId,
        edl,
        projectId: project_id,
        outputFormat: output_format,
      });
    } catch (enqueueErr) {
      // Nothing was enqueued for the row inserted above, so remove it rather
      // than leave a 'pending' record with no matching queue entry.
      try {
        await pool.query('DELETE FROM jobs WHERE id = $1', [jobId]);
      } catch (cleanupErr) {
        console.error(
          `[jobs] could not remove conform job ${jobId} after enqueue failure: ${cleanupErr?.message ?? cleanupErr}`,
        );
      }
      // The enqueue failure is what the client needs to hear about.
      throw enqueueErr;
    }

    res.status(201).json(job);
  } catch (err) {
    next(err);
  }
});
export default router;