diff --git a/services/worker/src/workers/conform.js b/services/worker/src/workers/conform.js index 661de4d..7de74b1 100644 --- a/services/worker/src/workers/conform.js +++ b/services/worker/src/workers/conform.js @@ -1,5 +1,5 @@ import { join } from 'path'; -import { unlink, writeFile, readFile } from 'fs/promises'; +import { unlink, writeFile, mkdir } from 'fs/promises'; import { tmpdir } from 'os'; import { query } from '../db/client.js'; import { downloadFromS3, uploadToS3 } from '../s3/client.js'; @@ -11,36 +11,35 @@ const S3_BUCKET = process.env.S3_BUCKET || 'wild-dragon'; export const conformWorker = async (job) => { const { jobId, edl, projectId, outputFormat } = job.data; - const tmpDir = tmpdir(); - const edlPath = join(tmpDir, `edl-${job.id}.edl`); - const segmentsDir = join(tmpDir, `segments-${job.id}`); + const tmpDir = tmpdir(); + const edlPath = join(tmpDir, `edl-${job.id}.edl`); + const segmentsDir = join(tmpDir, `segments-${job.id}`); const segmentListPath = join(tmpDir, `segments-${job.id}.txt`); - const outputPath = join(tmpDir, `output-${job.id}.${outputFormat || 'mov'}`); + const outputPath = join(tmpDir, `output-${job.id}.${outputFormat || 'mov'}`); try { // Write EDL to temp file await writeFile(edlPath, edl, 'utf-8'); // Parse EDL - job.updateProgress(5); + await job.updateProgress(5); console.log(`[conform] Parsing EDL for job ${jobId}`); const edits = parseEDL(edl); - // Create segments directory - await writeFile(segmentsDir, ''); + // Create temp directory for segment files + await mkdir(segmentsDir, { recursive: true }); - // Process each edit let processedEdits = 0; - const concatList = []; + const concatList = []; for (const edit of edits) { - job.updateProgress(Math.min(5 + (processedEdits / edits.length) * 50, 55)); + await job.updateProgress(Math.min(5 + (processedEdits / edits.length) * 50, 55)); console.log(`[conform] Processing edit ${edit.editNumber}: ${edit.reelName}`); - // Look up asset by reel name + // Look up asset by filename (the reel name in the EDL matches the clip filename) const assetRes = await query( - 'SELECT id, original_s3_key FROM assets WHERE original_filename = $1 LIMIT 1', + 'SELECT id, original_s3_key FROM assets WHERE filename = $1 LIMIT 1', [edit.reelName] ); @@ -49,78 +48,67 @@ export const conformWorker = async (job) => { } const { id: assetId, original_s3_key: sourceKey } = assetRes.rows[0]; - const segmentInputPath = join(tmpDir, `segment-${edit.editNumber}-input`); - const segmentOutputPath = join(tmpDir, `segment-${edit.editNumber}.mov`); + const segmentInputPath = join(segmentsDir, `segment-${edit.editNumber}-src`); + const segmentOutputPath = join(segmentsDir, `segment-${edit.editNumber}.mov`); - // Download source from S3 - console.log(`[conform] Downloading segment ${edit.editNumber} from S3`); + // Download source clip from S3 + console.log(`[conform] Downloading segment ${edit.editNumber} from S3 (${sourceKey})`); await downloadFromS3(S3_BUCKET, sourceKey, segmentInputPath); - // Trim segment - console.log(`[conform] Trimming segment ${edit.editNumber}: ${edit.sourceIn} -> ${edit.sourceOut}`); - await trimSegment( - segmentInputPath, - segmentOutputPath, - edit.sourceIn, - edit.sourceOut - ); + // Trim to EDL in/out points + console.log(`[conform] Trimming ${edit.editNumber}: ${edit.sourceIn} → ${edit.sourceOut}`); + await trimSegment(segmentInputPath, segmentOutputPath, edit.sourceIn, edit.sourceOut); concatList.push(segmentOutputPath); - // Cleanup input + // Remove the (large) source download immediately to conserve disk await unlink(segmentInputPath).catch(() => {}); processedEdits++; } - // Create concat file - job.updateProgress(60); - console.log(`[conform] Creating concat list for ${concatList.length} segments`); - const concatContent = concatList - .map(path => `file '${path}'`) - .join('\n'); + // Write ffmpeg concat file + await job.updateProgress(60); + console.log(`[conform] Writing concat list for ${concatList.length} segments`); + const concatContent = concatList.map(p => `file '${p}'`).join('\n'); await writeFile(segmentListPath, concatContent, 'utf-8'); - // Concatenate all segments - job.updateProgress(70); + // Concatenate + await job.updateProgress(70); console.log(`[conform] Concatenating segments for job ${jobId}`); await concatSegments(segmentListPath, outputPath); - // Upload final output to S3 - job.updateProgress(85); + // Upload to S3 + await job.updateProgress(85); const outputKey = `jobs/${jobId}/output.${outputFormat || 'mov'}`; - console.log(`[conform] Uploading final output to ${outputKey}`); + console.log(`[conform] Uploading output to ${outputKey}`); await uploadToS3(S3_BUCKET, outputKey, outputPath); - // Update job record - job.updateProgress(95); - console.log(`[conform] Updating job record ${jobId}`); + // Mark job complete + await job.updateProgress(95); await query( - 'UPDATE jobs SET status = $1, result = $2 WHERE id = $3', - ['complete', JSON.stringify({ output_key: outputKey }), jobId] + `UPDATE jobs SET status = 'complete', result = $1, updated_at = NOW() WHERE id = $2`, + [JSON.stringify({ output_key: outputKey }), jobId] ); - job.updateProgress(100); - console.log(`[conform] Job ${jobId} conform complete`); - + await job.updateProgress(100); + console.log(`[conform] Job ${jobId} complete`); return { jobId, outputKey }; + } catch (error) { - console.error(`[conform] Error processing job ${jobId}:`, error); + console.error(`[conform] Error in job ${jobId}:`, error); await query( - 'UPDATE jobs SET status = $1 WHERE id = $2', - ['error', jobId] + `UPDATE jobs SET status = 'failed', error = $1, updated_at = NOW() WHERE id = $2`, + [error.message, jobId] ); throw error; + } finally { - // Cleanup - try { - await Promise.all([ - unlink(edlPath).catch(() => {}), - unlink(segmentListPath).catch(() => {}), - unlink(outputPath).catch(() => {}), - ]); - } catch (err) { - console.error(`[conform] Cleanup error for job ${job.id}:`, err); - } + // Best-effort cleanup of temp files + await Promise.all([ + unlink(edlPath).catch(() => {}), + unlink(segmentListPath).catch(() => {}), + unlink(outputPath).catch(() => {}), + ]); } };