import { join } from 'path'; import { unlink } from 'fs/promises' import { tmpdir } from 'os'; import { Queue } from 'bullmq'; import { query } from '../db/client.js'; import { downloadFromS3, uploadToS3 } from '../s3/client.js'; import { transcodeVideo } from '../ffmpeg/executor.js'; const S3_BUCKET = process.env.S3_BUCKET || 'wild-dragon'; // Dispatch thumbnail job once proxy is ready — same Redis connection as the worker const parseRedisUrl = (url) => { const parsed = new URL(url); return { host: parsed.hostname, port: parseInt(parsed.port, 10) }; }; const thumbnailQueue = new Queue('thumbnail', { connection: parseRedisUrl(process.env.REDIS_URL || 'redis://queue:6379'), }); export const proxyWorker = async (job) => { const { assetId, inputKey, outputKey } = job.data; const tmpDir = tmpdir(); const inputPath = join(tmpDir, `proxy-input-${job.id}.mov`); const outputPath = join(tmpDir, `proxy-output-${job.id}.mp4`); try { // Download original from S3 await job.updateProgress(10); console.log(`[proxy] Downloading ${inputKey} for asset ${assetId}`); await downloadFromS3(S3_BUCKET, inputKey, inputPath); // Transcode to H.264 proxy await job.updateProgress(30); console.log(`[proxy] Transcoding asset ${assetId}`); await transcodeVideo(inputPath, outputPath, { videoCodec: 'libx264', videoPreset: 'fast', videoBitrate: '10M', audioCodec: 'aac', audioBitrate: '192k', }); // Upload proxy to S3 await job.updateProgress(70); console.log(`[proxy] Uploading to ${outputKey}`); await uploadToS3(S3_BUCKET, outputKey, outputPath); // Update asset record — proxy ready, status stays processing until thumbnail done await job.updateProgress(90); await query( `UPDATE assets SET proxy_s3_key = $1, updated_at = NOW() WHERE id = $2`, [outputKey, assetId] ); // Now proxy exists in S3 — safe to queue thumbnail generation const thumbnailKey = `thumbnails/${assetId}.jpg`; await thumbnailQueue.add('generate', { assetId, proxyKey: outputKey, outputKey: thumbnailKey, }); console.log(`[proxy] Asset ${assetId} proxy complete, thumbnail job queued`); await job.updateProgress(100); return { assetId, outputKey }; } catch (error) { console.error(`[proxy] Error processing asset ${assetId}:`, error); await query( `UPDATE assets SET status = 'error', updated_at = NOW() WHERE id = $1`, [assetId] ); throw error; } finally { await Promise.all([ unlink(inputPath).catch(() => {}), unlink(outputPath).catch(() => {}), ]); } };