From 1ff7ff8d2ba4fe283971472f0bb8d3bf14b64ec4 Mon Sep 17 00:00:00 2001 From: Zac Gaetano Date: Tue, 7 Apr 2026 21:58:19 -0400 Subject: [PATCH] add services/worker/src/workers/conform.js --- services/worker/src/workers/conform.js | 126 +++++++++++++++++++++++++ 1 file changed, 126 insertions(+) create mode 100644 services/worker/src/workers/conform.js diff --git a/services/worker/src/workers/conform.js b/services/worker/src/workers/conform.js new file mode 100644 index 0000000..661de4d --- /dev/null +++ b/services/worker/src/workers/conform.js @@ -0,0 +1,126 @@ +import { join } from 'path'; +import { unlink, writeFile, readFile } from 'fs/promises'; +import { tmpdir } from 'os'; +import { query } from '../db/client.js'; +import { downloadFromS3, uploadToS3 } from '../s3/client.js'; +import { trimSegment, concatSegments } from '../ffmpeg/executor.js'; +import { parseEDL } from '../edl/parser.js'; + +const S3_BUCKET = process.env.S3_BUCKET || 'wild-dragon'; + +export const conformWorker = async (job) => { + const { jobId, edl, projectId, outputFormat } = job.data; + + const tmpDir = tmpdir(); + const edlPath = join(tmpDir, `edl-${job.id}.edl`); + const segmentsDir = join(tmpDir, `segments-${job.id}`); + const segmentListPath = join(tmpDir, `segments-${job.id}.txt`); + const outputPath = join(tmpDir, `output-${job.id}.${outputFormat || 'mov'}`); + + try { + // Write EDL to temp file + await writeFile(edlPath, edl, 'utf-8'); + + // Parse EDL + job.updateProgress(5); + console.log(`[conform] Parsing EDL for job ${jobId}`); + const edits = parseEDL(edl); + + // Create segments directory + await writeFile(segmentsDir, ''); + + // Process each edit + let processedEdits = 0; + const concatList = []; + + for (const edit of edits) { + job.updateProgress(Math.min(5 + (processedEdits / edits.length) * 50, 55)); + + console.log(`[conform] Processing edit ${edit.editNumber}: ${edit.reelName}`); + + // Look up asset by reel name + const assetRes = await query( + 'SELECT id, original_s3_key FROM assets WHERE original_filename = $1 LIMIT 1', + [edit.reelName] + ); + + if (assetRes.rows.length === 0) { + throw new Error(`Asset not found for reel: ${edit.reelName}`); + } + + const { id: assetId, original_s3_key: sourceKey } = assetRes.rows[0]; + const segmentInputPath = join(tmpDir, `segment-${edit.editNumber}-input`); + const segmentOutputPath = join(tmpDir, `segment-${edit.editNumber}.mov`); + + // Download source from S3 + console.log(`[conform] Downloading segment ${edit.editNumber} from S3`); + await downloadFromS3(S3_BUCKET, sourceKey, segmentInputPath); + + // Trim segment + console.log(`[conform] Trimming segment ${edit.editNumber}: ${edit.sourceIn} -> ${edit.sourceOut}`); + await trimSegment( + segmentInputPath, + segmentOutputPath, + edit.sourceIn, + edit.sourceOut + ); + + concatList.push(segmentOutputPath); + + // Cleanup input + await unlink(segmentInputPath).catch(() => {}); + + processedEdits++; + } + + // Create concat file + job.updateProgress(60); + console.log(`[conform] Creating concat list for ${concatList.length} segments`); + const concatContent = concatList + .map(path => `file '${path}'`) + .join('\n'); + await writeFile(segmentListPath, concatContent, 'utf-8'); + + // Concatenate all segments + job.updateProgress(70); + console.log(`[conform] Concatenating segments for job ${jobId}`); + await concatSegments(segmentListPath, outputPath); + + // Upload final output to S3 + job.updateProgress(85); + const outputKey = `jobs/${jobId}/output.${outputFormat || 'mov'}`; + console.log(`[conform] Uploading final output to ${outputKey}`); + await uploadToS3(S3_BUCKET, outputKey, outputPath); + + // Update job record + job.updateProgress(95); + console.log(`[conform] Updating job record ${jobId}`); + await query( + 'UPDATE jobs SET status = $1, result = $2 WHERE id = $3', + ['complete', JSON.stringify({ output_key: outputKey }), jobId] + ); + + job.updateProgress(100); + console.log(`[conform] Job ${jobId} conform complete`); + + return { jobId, outputKey }; + } catch (error) { + console.error(`[conform] Error processing job ${jobId}:`, error); + await query( + 'UPDATE jobs SET status = $1 WHERE id = $2', + ['error', jobId] + ); + throw error; + } finally { + // Cleanup + try { + await Promise.all([ + unlink(edlPath).catch(() => {}), + unlink(segmentListPath).catch(() => {}), + unlink(outputPath).catch(() => {}), + ]); + } catch (err) { + console.error(`[conform] Cleanup error for job ${job.id}:`, err); + } + } +};