From e289554e442b12b55f3ca31c20014db47f2238a1 Mon Sep 17 00:00:00 2001 From: ZGaetano Date: Tue, 26 May 2026 07:35:28 -0400 Subject: [PATCH] fix(trim): update jobs table status on complete/fail (issue #94 bug 2) --- services/worker/src/workers/trimWorker.js | 44 +++++++++++++++++++---- 1 file changed, 38 insertions(+), 6 deletions(-) diff --git a/services/worker/src/workers/trimWorker.js b/services/worker/src/workers/trimWorker.js index 77178f2..f30640c 100644 --- a/services/worker/src/workers/trimWorker.js +++ b/services/worker/src/workers/trimWorker.js @@ -9,11 +9,13 @@ const S3_BUCKET = process.env.S3_BUCKET || 'wild-dragon'; export const trimWorker = async (job) => { const { clipInstanceId, assetId, sourceInFrames, sourceOutFrames } = job.data; - const jobId = job.id; + // jobId in job.data is the parent batch-trim job ID (from the jobs table) + const { jobId } = job.data; + const bullJobId = job.id; const tmpDir = tmpdir(); - const downloadPath = join(tmpDir, `trim-${jobId}-src`); - const outputPath = join(tmpDir, `trim-${jobId}.mov`); + const downloadPath = join(tmpDir, `trim-${bullJobId}-src`); + const outputPath = join(tmpDir, `trim-${bullJobId}.mov`); try { const assetRes = await query( @@ -43,17 +45,47 @@ export const trimWorker = async (job) => { await job.updateProgress(85); await query( `INSERT INTO temp_segments (clip_instance_id, s3_key, expires_at) - VALUES ($1, $2, NOW() + INTERVAL '24 hours')`, + VALUES ($1, $2, NOW() + INTERVAL '24 hours') + ON CONFLICT (clip_instance_id) DO UPDATE + SET s3_key = EXCLUDED.s3_key, expires_at = EXCLUDED.expires_at`, [clipInstanceId, s3Key] ); + // BUG FIX #2: Update the parent batch-trim job status to 'completed' once + // all clips are done. We check whether every clip now has an s3_key; if so + // we flip the jobs row to 'completed'. Using a single UPDATE avoids a + // race between concurrent clip workers. + if (jobId) { + await query( + `UPDATE jobs + SET status = 'completed', updated_at = NOW() + WHERE id = $1 + AND status NOT IN ('failed', 'completed') + AND NOT EXISTS ( + SELECT 1 FROM temp_segments + WHERE job_id = $1 AND (s3_key IS NULL OR s3_key = '') + )`, + [jobId] + ).catch(e => console.error('[trim] Failed to update job status:', e.message)); + } + await job.updateProgress(100); - console.log(`[trim] Job ${jobId} complete for clip ${clipInstanceId}`); + console.log(`[trim] BullMQ job ${bullJobId} complete for clip ${clipInstanceId}`); return { clipInstanceId, s3Key }; } catch (error) { - console.error(`[trim] Error in job ${jobId}:`, error); + console.error(`[trim] Error in BullMQ job ${bullJobId}:`, error); + + // BUG FIX #2 (cont): Mark the parent jobs row as 'failed' so the + // trim-status endpoint reflects the true state. + if (jobId) { + await query( + `UPDATE jobs SET status = 'failed', updated_at = NOW() WHERE id = $1`, + [jobId] + ).catch(e => console.error('[trim] Failed to mark job failed:', e.message)); + } + throw error; } finally { await Promise.all([