feat: add POST /jobs/:id/retry endpoint for re-queuing failed BullMQ jobs

This commit is contained in:
Zac Gaetano 2026-05-22 12:18:53 -04:00
parent fea0f2962b
commit ddb4cf0c51

View file

@ -67,7 +67,6 @@ async function getAllBullMQJobs() {
for (const bucket of STATE_BUCKETS) { for (const bucket of STATE_BUCKETS) {
try { try {
const apiStatus = STATE_MAP[bucket] || bucket; const apiStatus = STATE_MAP[bucket] || bucket;
// getJobs([state], start, end) returns at most 200 per bucket
const jobs = await queue.getJobs([bucket], 0, 200); const jobs = await queue.getJobs([bucket], 0, 200);
for (const job of jobs) { for (const job of jobs) {
results.push(normalizeJob(job, type, apiStatus)); results.push(normalizeJob(job, type, apiStatus));
@ -81,15 +80,11 @@ async function getAllBullMQJobs() {
} }
// ── GET /events Server-Sent Events stream of live job updates ─────────────── // ── GET /events Server-Sent Events stream of live job updates ───────────────
//
// Must be declared BEFORE GET /:id so the literal path "events" isn't treated
// as a job-id parameter.
//
router.get('/events', async (req, res) => { router.get('/events', async (req, res) => {
res.setHeader('Content-Type', 'text/event-stream'); res.setHeader('Content-Type', 'text/event-stream');
res.setHeader('Cache-Control', 'no-cache'); res.setHeader('Cache-Control', 'no-cache');
res.setHeader('Connection', 'keep-alive'); res.setHeader('Connection', 'keep-alive');
res.setHeader('X-Accel-Buffering', 'no'); // disable nginx proxy buffering res.setHeader('X-Accel-Buffering', 'no');
res.flushHeaders(); res.flushHeaders();
let closed = false; let closed = false;
@ -106,7 +101,6 @@ router.get('/events', async (req, res) => {
if (!closed) setTimeout(push, 2000); if (!closed) setTimeout(push, 2000);
}; };
// Send initial snapshot immediately, then every 2 s
await push(); await push();
}); });
@ -131,7 +125,6 @@ router.get('/', async (req, res, next) => {
router.get('/:id', async (req, res, next) => { router.get('/:id', async (req, res, next) => {
try { try {
const { id } = req.params; const { id } = req.params;
// id format: "type:bullId" e.g. "proxy:1"
const colonIdx = id.indexOf(':'); const colonIdx = id.indexOf(':');
const qType = colonIdx > -1 ? id.slice(0, colonIdx) : null; const qType = colonIdx > -1 ? id.slice(0, colonIdx) : null;
const bullId = colonIdx > -1 ? id.slice(colonIdx + 1) : id; const bullId = colonIdx > -1 ? id.slice(colonIdx + 1) : id;
@ -153,6 +146,30 @@ router.get('/:id', async (req, res, next) => {
} }
}); });
// ── POST /:id/retry - Retry a failed job ──────────────────────────────────────
router.post('/:id/retry', async (req, res, next) => {
try {
const { id } = req.params;
const colonIdx = id.indexOf(':');
const qType = colonIdx > -1 ? id.slice(0, colonIdx) : null;
const bullId = colonIdx > -1 ? id.slice(colonIdx + 1) : id;
for (const { queue, type } of QUEUES) {
if (qType && type !== qType) continue;
try {
const job = await queue.getJob(bullId);
if (job) {
await job.retry();
return res.json({ id, status: 'queued' });
}
} catch { /* try next queue */ }
}
res.status(404).json({ error: 'Job not found' });
} catch (err) {
next(err);
}
});
// ── DELETE /:id - Remove a job ──────────────────────────────────────────────── // ── DELETE /:id - Remove a job ────────────────────────────────────────────────
router.delete('/:id', async (req, res, next) => { router.delete('/:id', async (req, res, next) => {
try { try {