import { join } from 'path';
import { tmpdir } from 'os';
import { unlink } from 'fs/promises';
import { query } from '../db/client.js';
import { downloadFromS3, uploadToS3 } from '../s3/client.js';
import { trimSegment } from '../ffmpeg/executor.js';

// Bucket used for both the source download and the trimmed-segment upload.
const S3_BUCKET = process.env.S3_BUCKET || 'wild-dragon';

/**
 * Queue worker that trims one clip out of a source asset.
 *
 * Steps, in order:
 *   1. Look up the asset's `original_s3_key` in the `assets` table.
 *   2. Download the source object to a temp file.
 *   3. Call `trimSegment` with the in/out frame numbers.
 *   4. Upload the result to `temp-segments/<clipInstanceId>.mov`.
 *   5. Upsert the `temp_segments` row for the clip (24 hour expiry).
 *   6. If a parent `jobId` was supplied, try to flip the parent `jobs` row to
 *      'completed' (or to 'failed' when anything above throws).
 *
 * Both temp files are removed in `finally`, whether the job succeeded or not.
 *
 * @param {object} job - Queue job. Reads `job.id`, `job.data` and calls
 *   `job.updateProgress(n)`.
 * @param {*} job.data.clipInstanceId - Clip identifier; used for the S3 key
 *   and as the `temp_segments` conflict key.
 * @param {*} job.data.assetId - Id of the row in `assets` to trim from.
 * @param {*} job.data.sourceInFrames - Passed through to `trimSegment`.
 * @param {*} job.data.sourceOutFrames - Passed through to `trimSegment`.
 * @param {*} [job.data.jobId] - Parent batch-trim job id (from the jobs
 *   table). When falsy, no `jobs` row is touched.
 * @returns {Promise<{clipInstanceId: *, s3Key: string}>} The clip id and the
 *   S3 key the trimmed segment was uploaded to.
 * @throws {Error} `Asset not found: <assetId>` when the asset row is missing;
 *   any other error from the DB, S3, or trim step is logged and rethrown.
 */
export const trimWorker = async (job) => {
  // jobId in job.data is the parent batch-trim job ID (from the jobs table).
  const {
    clipInstanceId,
    assetId,
    sourceInFrames,
    sourceOutFrames,
    jobId,
  } = job.data;
  const bullJobId = job.id;

  // Temp file names are keyed by the queue job id.
  const tmpDir = tmpdir();
  const downloadPath = join(tmpDir, `trim-${bullJobId}-src`);
  const outputPath = join(tmpDir, `trim-${bullJobId}.mov`);

  try {
    const assetRes = await query(
      'SELECT original_s3_key FROM assets WHERE id = $1 LIMIT 1',
      [assetId]
    );
    if (assetRes.rows.length === 0) {
      throw new Error(`Asset not found: ${assetId}`);
    }
    const { original_s3_key: sourceKey } = assetRes.rows[0];
    await job.updateProgress(10);

    console.log(`[trim] Downloading asset ${assetId} from S3 (${sourceKey})`);
    await downloadFromS3(S3_BUCKET, sourceKey, downloadPath);
    await job.updateProgress(40);

    console.log(`[trim] Trimming frames ${sourceInFrames} → ${sourceOutFrames}`);
    await trimSegment(downloadPath, outputPath, sourceInFrames, sourceOutFrames);
    await job.updateProgress(70);

    const s3Key = `temp-segments/${clipInstanceId}.mov`;
    console.log(`[trim] Uploading trimmed segment to ${s3Key}`);
    await uploadToS3(S3_BUCKET, s3Key, outputPath);
    await job.updateProgress(85);

    // Upsert so that re-running the same clip replaces the key and expiry
    // instead of violating the clip_instance_id uniqueness constraint.
    await query(
      `INSERT INTO temp_segments (clip_instance_id, s3_key, expires_at)
       VALUES ($1, $2, NOW() + INTERVAL '24 hours')
       ON CONFLICT (clip_instance_id) DO UPDATE
         SET s3_key = EXCLUDED.s3_key,
             expires_at = EXCLUDED.expires_at`,
      [clipInstanceId, s3Key]
    );

    // BUG FIX #2: Update the parent batch-trim job status to 'completed' once
    // all clips are done. We check whether every clip now has an s3_key; if so
    // we flip the jobs row to 'completed'. Using a single UPDATE avoids a
    // race between concurrent clip workers.
    //
    // NOTE(review): the subquery filters temp_segments by job_id, but the
    // upsert above never writes job_id. This only works if the rows are
    // pre-created with job_id set somewhere else; otherwise NOT EXISTS is
    // trivially true and the first finished clip completes the job — confirm.
    if (jobId) {
      // A failure here is logged but does not fail the clip itself.
      await query(
        `UPDATE jobs
            SET status = 'completed', updated_at = NOW()
          WHERE id = $1
            AND status NOT IN ('failed', 'completed')
            AND NOT EXISTS (
              SELECT 1
                FROM temp_segments
               WHERE job_id = $1
                 AND (s3_key IS NULL OR s3_key = '')
            )`,
        [jobId]
      ).catch((e) => console.error('[trim] Failed to update job status:', e.message));
    }

    await job.updateProgress(100);
    console.log(`[trim] BullMQ job ${bullJobId} complete for clip ${clipInstanceId}`);
    return { clipInstanceId, s3Key };
  } catch (error) {
    console.error(`[trim] Error in BullMQ job ${bullJobId}:`, error);

    // BUG FIX #2 (cont): Mark the parent jobs row as 'failed' so the
    // trim-status endpoint reflects the true state.
    //
    // NOTE(review): this runs on every thrown attempt. If the queue retries
    // the job, a later successful attempt cannot move the row back to
    // 'completed' because the UPDATE above skips rows already 'failed' —
    // confirm this is intended.
    if (jobId) {
      await query(
        `UPDATE jobs SET status = 'failed', updated_at = NOW() WHERE id = $1`,
        [jobId]
      ).catch((e) => console.error('[trim] Failed to mark job failed:', e.message));
    }

    // Rethrow the original error so the queue records the failure.
    throw error;
  } finally {
    // Best-effort cleanup: either file may not exist if an earlier step
    // failed, so unlink errors are deliberately ignored.
    await Promise.all([
      unlink(downloadPath).catch(() => {}),
      unlink(outputPath).catch(() => {}),
    ]);
  }
};