import { join } from 'path'; import { unlink, writeFile, readFile } from 'fs/promises'; import { tmpdir } from 'os'; import { query } from '../db/client.js'; import { downloadFromS3, uploadToS3 } from '../s3/client.js'; import { trimSegment, concatSegments } from '../ffmpeg/executor.js'; import { parseEDL } from '../edl/parser.js'; const S3_BUCKET = process.env.S3_BUCKET || 'wild-dragon'; export const conformWorker = async (job) => { const { jobId, edl, projectId, outputFormat } = job.data; const tmpDir = tmpdir(); const edlPath = join(tmpDir, `edl-${job.id}.edl`); const segmentsDir = join(tmpDir, `segments-${job.id}`); const segmentListPath = join(tmpDir, `segments-${job.id}.txt`); const outputPath = join(tmpDir, `output-${job.id}.${outputFormat || 'mov'}`); try { // Write EDL to temp file await writeFile(edlPath, edl, 'utf-8'); // Parse EDL job.updateProgress(5); console.log(`[conform] Parsing EDL for job ${jobId}`); const edits = parseEDL(edl); // Create segments directory await writeFile(segmentsDir, ''); // Process each edit let processedEdits = 0; const concatList = []; for (const edit of edits) { job.updateProgress(Math.min(5 + (processedEdits / edits.length) * 50, 55)); console.log(`[conform] Processing edit ${edit.editNumber}: ${edit.reelName}`); // Look up asset by reel name const assetRes = await query( 'SELECT id, original_s3_key FROM assets WHERE original_filename = $1 LIMIT 1', [edit.reelName] ); if (assetRes.rows.length === 0) { throw new Error(`Asset not found for reel: ${edit.reelName}`); } const { id: assetId, original_s3_key: sourceKey } = assetRes.rows[0]; const segmentInputPath = join(tmpDir, `segment-${edit.editNumber}-input`); const segmentOutputPath = join(tmpDir, `segment-${edit.editNumber}.mov`); // Download source from S3 console.log(`[conform] Downloading segment ${edit.editNumber} from S3`); await downloadFromS3(S3_BUCKET, sourceKey, segmentInputPath); // Trim segment console.log(`[conform] Trimming segment ${edit.editNumber}: ${edit.sourceIn} -> ${edit.sourceOut}`); await trimSegment( segmentInputPath, segmentOutputPath, edit.sourceIn, edit.sourceOut ); concatList.push(segmentOutputPath); // Cleanup input await unlink(segmentInputPath).catch(() => {}); processedEdits++; } // Create concat file job.updateProgress(60); console.log(`[conform] Creating concat list for ${concatList.length} segments`); const concatContent = concatList .map(path => `file '${path}'`) .join('\n'); await writeFile(segmentListPath, concatContent, 'utf-8'); // Concatenate all segments job.updateProgress(70); console.log(`[conform] Concatenating segments for job ${jobId}`); await concatSegments(segmentListPath, outputPath); // Upload final output to S3 job.updateProgress(85); const outputKey = `jobs/${jobId}/output.${outputFormat || 'mov'}`; console.log(`[conform] Uploading final output to ${outputKey}`); await uploadToS3(S3_BUCKET, outputKey, outputPath); // Update job record job.updateProgress(95); console.log(`[conform] Updating job record ${jobId}`); await query( 'UPDATE jobs SET status = $1, result = $2 WHERE id = $3', ['complete', JSON.stringify({ output_key: outputKey }), jobId] ); job.updateProgress(100); console.log(`[conform] Job ${jobId} conform complete`); return { jobId, outputKey }; } catch (error) { console.error(`[conform] Error processing job ${jobId}:`, error); await query( 'UPDATE jobs SET status = $1 WHERE id = $2', ['error', jobId] ); throw error; } finally { // Cleanup try { await Promise.all([ unlink(edlPath).catch(() => {}), unlink(segmentListPath).catch(() => {}), unlink(outputPath).catch(() => {}), ]); } catch (err) { console.error(`[conform] Cleanup error for job ${job.id}:`, err); } } };