diff --git a/services/worker/src/workers/proxy.js b/services/worker/src/workers/proxy.js new file mode 100644 index 0000000..543d39a --- /dev/null +++ b/services/worker/src/workers/proxy.js @@ -0,0 +1,69 @@ +import { join } from 'path'; +import { unlink } from 'fs/promises'; +import { tmpdir } from 'os'; +import { query } from '../db/client.js'; +import { downloadFromS3, uploadToS3 } from '../s3/client.js'; +import { transcodeVideo } from '../ffmpeg/executor.js'; + +const S3_BUCKET = process.env.S3_BUCKET || 'wild-dragon'; + +export const proxyWorker = async (job) => { + const { assetId, inputKey, outputKey } = job.data; + + const tmpDir = tmpdir(); + const inputPath = join(tmpDir, `proxy-input-${job.id}.mov`); + const outputPath = join(tmpDir, `proxy-output-${job.id}.mp4`); + + try { + // Download from S3 + job.updateProgress(10); + console.log(`[proxy] Downloading ${inputKey} for asset ${assetId}`); + await downloadFromS3(S3_BUCKET, inputKey, inputPath); + + // Transcode + job.updateProgress(30); + console.log(`[proxy] Transcoding asset ${assetId}`); + await transcodeVideo(inputPath, outputPath, { + videoCodec: 'libx264', + videoPreset: 'fast', + videoBitrate: '10M', + audioCodec: 'aac', + audioBitrate: '192k', + }); + + // Upload to S3 + job.updateProgress(70); + console.log(`[proxy] Uploading to ${outputKey}`); + await uploadToS3(S3_BUCKET, outputKey, outputPath); + + // Update database + job.updateProgress(90); + console.log(`[proxy] Updating asset record for ${assetId}`); + await query( + 'UPDATE assets SET proxy_s3_key = $1, status = $2 WHERE id = $3', + [outputKey, 'ready', assetId] + ); + + job.updateProgress(100); + console.log(`[proxy] Asset ${assetId} proxy complete`); + + return { assetId, outputKey }; + } catch (error) { + console.error(`[proxy] Error processing asset ${assetId}:`, error); + await query( + 'UPDATE assets SET status = $1 WHERE id = $2', + ['error', assetId] + ); + throw error; + } finally { + // Cleanup + try { + await Promise.all([ + unlink(inputPath).catch(() => {}), + unlink(outputPath).catch(() => {}), + ]); + } catch (err) { + console.error(`[proxy] Cleanup error for job ${job.id}:`, err); + } + } +};