From 10949bc460d0a73d1a3a2056b2672de3f334d7e8 Mon Sep 17 00:00:00 2001 From: ZGaetano Date: Fri, 15 May 2026 21:26:16 -0400 Subject: [PATCH] fix: dispatch thumbnail job after proxy completes instead of racing from upload route --- services/worker/src/workers/proxy.js | 61 +++++++++++++++++----------- 1 file changed, 37 insertions(+), 24 deletions(-) diff --git a/services/worker/src/workers/proxy.js b/services/worker/src/workers/proxy.js index 543d39a..25fb144 100644 --- a/services/worker/src/workers/proxy.js +++ b/services/worker/src/workers/proxy.js @@ -1,69 +1,82 @@ import { join } from 'path'; -import { unlink } from 'fs/promises'; +import { unlink } from 'fs/promises' import { tmpdir } from 'os'; +import { Queue } from 'bullmq'; import { query } from '../db/client.js'; import { downloadFromS3, uploadToS3 } from '../s3/client.js'; import { transcodeVideo } from '../ffmpeg/executor.js'; const S3_BUCKET = process.env.S3_BUCKET || 'wild-dragon'; +// Dispatch thumbnail job once proxy is ready — same Redis connection as the worker +const parseRedisUrl = (url) => { + const parsed = new URL(url); + return { host: parsed.hostname, port: parseInt(parsed.port, 10) }; +}; + +const thumbnailQueue = new Queue('thumbnail', { + connection: parseRedisUrl(process.env.REDIS_URL || 'redis://queue:6379'), +}); + export const proxyWorker = async (job) => { const { assetId, inputKey, outputKey } = job.data; const tmpDir = tmpdir(); - const inputPath = join(tmpDir, `proxy-input-${job.id}.mov`); + const inputPath = join(tmpDir, `proxy-input-${job.id}.mov`); const outputPath = join(tmpDir, `proxy-output-${job.id}.mp4`); try { - // Download from S3 + // Download original from S3 job.updateProgress(10); console.log(`[proxy] Downloading ${inputKey} for asset ${assetId}`); await downloadFromS3(S3_BUCKET, inputKey, inputPath); - // Transcode + // Transcode to H.264 proxy job.updateProgress(30); console.log(`[proxy] Transcoding asset ${assetId}`); await transcodeVideo(inputPath, outputPath, { - videoCodec: 'libx264', - videoPreset: 'fast', + videoCodec: 'libx264', + videoPreset: 'fast', videoBitrate: '10M', - audioCodec: 'aac', + audioCodec: 'aac', audioBitrate: '192k', }); - // Upload to S3 + // Upload proxy to S3 job.updateProgress(70); console.log(`[proxy] Uploading to ${outputKey}`); await uploadToS3(S3_BUCKET, outputKey, outputPath); - // Update database + // Update asset record — proxy ready, status processing until thumbnail done job.updateProgress(90); - console.log(`[proxy] Updating asset record for ${assetId}`); await query( - 'UPDATE assets SET proxy_s3_key = $1, status = $2 WHERE id = $3', - [outputKey, 'ready', assetId] + `UPDATE assets SET proxy_s3_key = $1, updated_at = NOW() WHERE id = $2`, + [outputKey, assetId] ); + // Now proxy exists in S3 — safe to queue thumbnail generation + const thumbnailKey = `thumbnails/${assetId}.jpg`; + await thumbnailQueue.add('generate', { + assetId, + proxyKey: outputKey, + outputKey: thumbnailKey, + }); + + console.log(`[proxy] Asset ${assetId} proxy complete, thumbnail job queued`); job.updateProgress(100); - console.log(`[proxy] Asset ${assetId} proxy complete`); return { assetId, outputKey }; } catch (error) { console.error(`[proxy] Error processing asset ${assetId}:`, error); await query( - 'UPDATE assets SET status = $1 WHERE id = $2', - ['error', assetId] + `UPDATE assets SET status = 'error', updated_at = NOW() WHERE id = $1`, + [assetId] ); throw error; } finally { - // Cleanup - try { - await Promise.all([ - unlink(inputPath).catch(() => {}), - unlink(outputPath).catch(() => {}), - ]); - } catch (err) { - console.error(`[proxy] Cleanup error for job ${job.id}:`, err); - } + await Promise.all([ + unlink(inputPath).catch(() => {}), + unlink(outputPath).catch(() => {}), + ]); } };