import { join } from 'path'; import { unlink, writeFile, mkdir } from 'fs/promises'; import { tmpdir } from 'os'; import { query } from '../db/client.js'; import { downloadFromS3, uploadToS3 } from '../s3/client.js'; import { trimSegment, concatSegments } from '../ffmpeg/executor.js'; import { parseEDL } from '../edl/parser.js'; const S3_BUCKET = process.env.S3_BUCKET || 'wild-dragon'; export const conformWorker = async (job) => { const { jobId, edl, projectId, outputFormat } = job.data; const tmpDir = tmpdir(); const edlPath = join(tmpDir, `edl-${job.id}.edl`); const segmentsDir = join(tmpDir, `segments-${job.id}`); const segmentListPath = join(tmpDir, `segments-${job.id}.txt`); const outputPath = join(tmpDir, `output-${job.id}.${outputFormat || 'mov'}`); try { // Write EDL to temp file await writeFile(edlPath, edl, 'utf-8'); // Parse EDL await job.updateProgress(5); console.log(`[conform] Parsing EDL for job ${jobId}`); const edits = parseEDL(edl); // Create temp directory for segment files await mkdir(segmentsDir, { recursive: true }); let processedEdits = 0; const concatList = []; for (const edit of edits) { await job.updateProgress(Math.min(5 + (processedEdits / edits.length) * 50, 55)); console.log(`[conform] Processing edit ${edit.editNumber}: ${edit.reelName}`); // Look up asset by filename (the reel name in the EDL matches the clip filename) const assetRes = await query( 'SELECT id, original_s3_key FROM assets WHERE filename = $1 LIMIT 1', [edit.reelName] ); if (assetRes.rows.length === 0) { throw new Error(`Asset not found for reel: ${edit.reelName}`); } const { id: assetId, original_s3_key: sourceKey } = assetRes.rows[0]; const segmentInputPath = join(segmentsDir, `segment-${edit.editNumber}-src`); const segmentOutputPath = join(segmentsDir, `segment-${edit.editNumber}.mov`); // Download source clip from S3 console.log(`[conform] Downloading segment ${edit.editNumber} from S3 (${sourceKey})`); await downloadFromS3(S3_BUCKET, sourceKey, segmentInputPath); // Trim to EDL in/out points console.log(`[conform] Trimming ${edit.editNumber}: ${edit.sourceIn} → ${edit.sourceOut}`); await trimSegment(segmentInputPath, segmentOutputPath, edit.sourceIn, edit.sourceOut); concatList.push(segmentOutputPath); // Remove the (large) source download immediately to conserve disk await unlink(segmentInputPath).catch(() => {}); processedEdits++; } // Write ffmpeg concat file await job.updateProgress(60); console.log(`[conform] Writing concat list for ${concatList.length} segments`); const concatContent = concatList.map(p => `file '${p}'`).join('\n'); await writeFile(segmentListPath, concatContent, 'utf-8'); // Concatenate await job.updateProgress(70); console.log(`[conform] Concatenating segments for job ${jobId}`); await concatSegments(segmentListPath, outputPath); // Upload to S3 await job.updateProgress(85); const outputKey = `jobs/${jobId}/output.${outputFormat || 'mov'}`; console.log(`[conform] Uploading output to ${outputKey}`); await uploadToS3(S3_BUCKET, outputKey, outputPath); // Mark job complete await job.updateProgress(95); await query( `UPDATE jobs SET status = 'complete', result = $1, updated_at = NOW() WHERE id = $2`, [JSON.stringify({ output_key: outputKey }), jobId] ); await job.updateProgress(100); console.log(`[conform] Job ${jobId} complete`); return { jobId, outputKey }; } catch (error) { console.error(`[conform] Error in job ${jobId}:`, error); await query( `UPDATE jobs SET status = 'failed', error = $1, updated_at = NOW() WHERE id = $2`, [error.message, jobId] ); throw error; } finally { // Best-effort cleanup of temp files await Promise.all([ unlink(edlPath).catch(() => {}), unlink(segmentListPath).catch(() => {}), unlink(outputPath).catch(() => {}), ]); } };