The POST /conform route was inserting into the jobs table with non-existent
columns (project_id, metadata) and an invalid enum value ('pending'). Since
GET /jobs reads entirely from BullMQ, the DB insert was both incorrect and
redundant. Now we just enqueue the BullMQ job and return its ID.
168 lines
5.8 KiB
JavaScript
168 lines
5.8 KiB
JavaScript
import express from 'express';
|
||
import pool from '../db/pool.js';
|
||
import { requireAuth } from '../middleware/auth.js';
|
||
import { Queue } from 'bullmq';
|
||
|
||
const router = express.Router();
|
||
router.use(requireAuth);
|
||
|
||
// ── Redis connection ──────────────────────────────────────────────────────────
|
||
const parseRedisUrl = (url) => {
|
||
try {
|
||
const parsed = new URL(url);
|
||
return { host: parsed.hostname, port: parseInt(parsed.port, 10) || 6379 };
|
||
} catch {
|
||
return { host: 'localhost', port: 6379 };
|
||
}
|
||
};
|
||
|
||
const redisConn = parseRedisUrl(process.env.REDIS_URL || 'redis://queue:6379');
|
||
|
||
const proxyQueue = new Queue('proxy', { connection: redisConn });
|
||
const thumbnailQueue = new Queue('thumbnail', { connection: redisConn });
|
||
const conformQueue = new Queue('conform', { connection: redisConn });
|
||
|
||
const QUEUES = [
|
||
{ queue: proxyQueue, type: 'proxy' },
|
||
{ queue: thumbnailQueue, type: 'thumbnail' },
|
||
{ queue: conformQueue, type: 'conform' },
|
||
];
|
||
|
||
// BullMQ state → API status mapping
|
||
const STATE_MAP = {
|
||
waiting: 'waiting',
|
||
active: 'active',
|
||
completed:'completed',
|
||
failed: 'failed',
|
||
delayed: 'waiting',
|
||
paused: 'waiting',
|
||
};
|
||
|
||
function normalizeJob(bullJob, type, apiStatus) {
|
||
const isCompleted = apiStatus === 'completed';
|
||
const isFailed = apiStatus === 'failed';
|
||
return {
|
||
id: `${type}:${bullJob.id}`,
|
||
type,
|
||
status: apiStatus,
|
||
progress: typeof bullJob.progress === 'number' ? bullJob.progress : 0,
|
||
asset_id: bullJob.data?.assetId || null,
|
||
asset_name: bullJob.data?.assetName || null,
|
||
created_at: bullJob.timestamp ? new Date(bullJob.timestamp).toISOString() : null,
|
||
started_at: bullJob.processedOn ? new Date(bullJob.processedOn).toISOString() : null,
|
||
completed_at: isCompleted && bullJob.finishedOn ? new Date(bullJob.finishedOn).toISOString() : null,
|
||
failed_at: isFailed && bullJob.finishedOn ? new Date(bullJob.finishedOn).toISOString() : null,
|
||
error: bullJob.failedReason || null,
|
||
metadata: bullJob.data || {},
|
||
};
|
||
}
|
||
|
||
async function getAllBullMQJobs() {
|
||
const results = [];
|
||
const allStates = ['waiting', 'active', 'completed', 'failed', 'delayed', 'paused'];
|
||
for (const { queue, type } of QUEUES) {
|
||
try {
|
||
const jobs = await queue.getJobs(allStates, 0, 200);
|
||
for (const job of jobs) {
|
||
const state = await job.getState();
|
||
const apiStatus = STATE_MAP[state] || state;
|
||
results.push(normalizeJob(job, type, apiStatus));
|
||
}
|
||
} catch {
|
||
// queue may be unavailable – skip
|
||
}
|
||
}
|
||
return results;
|
||
}
|
||
|
||
// ── GET / - List jobs (BullMQ queues) ────────────────────────────────────────
|
||
router.get('/', async (req, res, next) => {
|
||
try {
|
||
const { type, status, asset_id } = req.query;
|
||
let jobs = await getAllBullMQJobs();
|
||
|
||
if (type) jobs = jobs.filter(j => j.type === type);
|
||
if (status) jobs = jobs.filter(j => j.status === status);
|
||
if (asset_id) jobs = jobs.filter(j => j.asset_id === asset_id);
|
||
|
||
jobs.sort((a, b) => new Date(b.created_at || 0) - new Date(a.created_at || 0));
|
||
res.json(jobs);
|
||
} catch (err) {
|
||
next(err);
|
||
}
|
||
});
|
||
|
||
// ── GET /:id - Single job ─────────────────────────────────────────────────────
|
||
router.get('/:id', async (req, res, next) => {
|
||
try {
|
||
const { id } = req.params;
|
||
// id format: "type:bullId" e.g. "proxy:1"
|
||
const colonIdx = id.indexOf(':');
|
||
const qType = colonIdx > -1 ? id.slice(0, colonIdx) : null;
|
||
const bullId = colonIdx > -1 ? id.slice(colonIdx + 1) : id;
|
||
|
||
for (const { queue, type } of QUEUES) {
|
||
if (qType && type !== qType) continue;
|
||
try {
|
||
const job = await queue.getJob(bullId);
|
||
if (job) {
|
||
const state = await job.getState();
|
||
const apiStatus = STATE_MAP[state] || state;
|
||
return res.json(normalizeJob(job, type, apiStatus));
|
||
}
|
||
} catch { /* try next queue */ }
|
||
}
|
||
res.status(404).json({ error: 'Job not found' });
|
||
} catch (err) {
|
||
next(err);
|
||
}
|
||
});
|
||
|
||
// ── DELETE /:id - Remove a job ────────────────────────────────────────────────
|
||
router.delete('/:id', async (req, res, next) => {
|
||
try {
|
||
const { id } = req.params;
|
||
const colonIdx = id.indexOf(':');
|
||
const qType = colonIdx > -1 ? id.slice(0, colonIdx) : null;
|
||
const bullId = colonIdx > -1 ? id.slice(colonIdx + 1) : id;
|
||
|
||
for (const { queue, type } of QUEUES) {
|
||
if (qType && type !== qType) continue;
|
||
try {
|
||
const job = await queue.getJob(bullId);
|
||
if (job) {
|
||
await job.remove();
|
||
return res.json({ success: true });
|
||
}
|
||
} catch { /* try next queue */ }
|
||
}
|
||
res.status(404).json({ error: 'Job not found' });
|
||
} catch (err) {
|
||
next(err);
|
||
}
|
||
});
|
||
|
||
// ── POST /conform - Submit a conform (EDL export) job ────────────────────────
|
||
router.post('/conform', async (req, res, next) => {
|
||
try {
|
||
const { edl, project_id, output_format } = req.body;
|
||
|
||
if (!edl || !project_id || !output_format) {
|
||
return res.status(400).json({
|
||
error: 'edl, project_id, and output_format are required',
|
||
});
|
||
}
|
||
|
||
const bullJob = await conformQueue.add('conform-task', {
|
||
edl,
|
||
projectId: project_id,
|
||
outputFormat: output_format,
|
||
});
|
||
|
||
res.status(202).json({ id: `conform:${bullJob.id}`, status: 'queued' });
|
||
} catch (err) {
|
||
next(err);
|
||
}
|
||
});
|
||
|
||
export default router;
|