fix: dispatch thumbnail job after proxy completes instead of racing from upload route
This commit is contained in:
parent
b42199e597
commit
10949bc460
1 changed files with 37 additions and 24 deletions
|
|
@ -1,69 +1,82 @@
|
||||||
import { join } from 'path';
|
import { join } from 'path';
|
||||||
import { unlink } from 'fs/promises';
|
import { unlink } from 'fs/promises'
|
||||||
import { tmpdir } from 'os';
|
import { tmpdir } from 'os';
|
||||||
|
import { Queue } from 'bullmq';
|
||||||
import { query } from '../db/client.js';
|
import { query } from '../db/client.js';
|
||||||
import { downloadFromS3, uploadToS3 } from '../s3/client.js';
|
import { downloadFromS3, uploadToS3 } from '../s3/client.js';
|
||||||
import { transcodeVideo } from '../ffmpeg/executor.js';
|
import { transcodeVideo } from '../ffmpeg/executor.js';
|
||||||
|
|
||||||
const S3_BUCKET = process.env.S3_BUCKET || 'wild-dragon';
|
const S3_BUCKET = process.env.S3_BUCKET || 'wild-dragon';
|
||||||
|
|
||||||
|
// Dispatch thumbnail job once proxy is ready — same Redis connection as the worker
|
||||||
|
const parseRedisUrl = (url) => {
|
||||||
|
const parsed = new URL(url);
|
||||||
|
return { host: parsed.hostname, port: parseInt(parsed.port, 10) };
|
||||||
|
};
|
||||||
|
|
||||||
|
const thumbnailQueue = new Queue('thumbnail', {
|
||||||
|
connection: parseRedisUrl(process.env.REDIS_URL || 'redis://queue:6379'),
|
||||||
|
});
|
||||||
|
|
||||||
export const proxyWorker = async (job) => {
|
export const proxyWorker = async (job) => {
|
||||||
const { assetId, inputKey, outputKey } = job.data;
|
const { assetId, inputKey, outputKey } = job.data;
|
||||||
|
|
||||||
const tmpDir = tmpdir();
|
const tmpDir = tmpdir();
|
||||||
const inputPath = join(tmpDir, `proxy-input-${job.id}.mov`);
|
const inputPath = join(tmpDir, `proxy-input-${job.id}.mov`);
|
||||||
const outputPath = join(tmpDir, `proxy-output-${job.id}.mp4`);
|
const outputPath = join(tmpDir, `proxy-output-${job.id}.mp4`);
|
||||||
|
|
||||||
try {
|
try {
|
||||||
// Download from S3
|
// Download original from S3
|
||||||
job.updateProgress(10);
|
job.updateProgress(10);
|
||||||
console.log(`[proxy] Downloading ${inputKey} for asset ${assetId}`);
|
console.log(`[proxy] Downloading ${inputKey} for asset ${assetId}`);
|
||||||
await downloadFromS3(S3_BUCKET, inputKey, inputPath);
|
await downloadFromS3(S3_BUCKET, inputKey, inputPath);
|
||||||
|
|
||||||
// Transcode
|
// Transcode to H.264 proxy
|
||||||
job.updateProgress(30);
|
job.updateProgress(30);
|
||||||
console.log(`[proxy] Transcoding asset ${assetId}`);
|
console.log(`[proxy] Transcoding asset ${assetId}`);
|
||||||
await transcodeVideo(inputPath, outputPath, {
|
await transcodeVideo(inputPath, outputPath, {
|
||||||
videoCodec: 'libx264',
|
videoCodec: 'libx264',
|
||||||
videoPreset: 'fast',
|
videoPreset: 'fast',
|
||||||
videoBitrate: '10M',
|
videoBitrate: '10M',
|
||||||
audioCodec: 'aac',
|
audioCodec: 'aac',
|
||||||
audioBitrate: '192k',
|
audioBitrate: '192k',
|
||||||
});
|
});
|
||||||
|
|
||||||
// Upload to S3
|
// Upload proxy to S3
|
||||||
job.updateProgress(70);
|
job.updateProgress(70);
|
||||||
console.log(`[proxy] Uploading to ${outputKey}`);
|
console.log(`[proxy] Uploading to ${outputKey}`);
|
||||||
await uploadToS3(S3_BUCKET, outputKey, outputPath);
|
await uploadToS3(S3_BUCKET, outputKey, outputPath);
|
||||||
|
|
||||||
// Update database
|
// Update asset record — proxy ready, status processing until thumbnail done
|
||||||
job.updateProgress(90);
|
job.updateProgress(90);
|
||||||
console.log(`[proxy] Updating asset record for ${assetId}`);
|
|
||||||
await query(
|
await query(
|
||||||
'UPDATE assets SET proxy_s3_key = $1, status = $2 WHERE id = $3',
|
`UPDATE assets SET proxy_s3_key = $1, updated_at = NOW() WHERE id = $2`,
|
||||||
[outputKey, 'ready', assetId]
|
[outputKey, assetId]
|
||||||
);
|
);
|
||||||
|
|
||||||
|
// Now proxy exists in S3 — safe to queue thumbnail generation
|
||||||
|
const thumbnailKey = `thumbnails/${assetId}.jpg`;
|
||||||
|
await thumbnailQueue.add('generate', {
|
||||||
|
assetId,
|
||||||
|
proxyKey: outputKey,
|
||||||
|
outputKey: thumbnailKey,
|
||||||
|
});
|
||||||
|
|
||||||
|
console.log(`[proxy] Asset ${assetId} proxy complete, thumbnail job queued`);
|
||||||
job.updateProgress(100);
|
job.updateProgress(100);
|
||||||
console.log(`[proxy] Asset ${assetId} proxy complete`);
|
|
||||||
|
|
||||||
return { assetId, outputKey };
|
return { assetId, outputKey };
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
console.error(`[proxy] Error processing asset ${assetId}:`, error);
|
console.error(`[proxy] Error processing asset ${assetId}:`, error);
|
||||||
await query(
|
await query(
|
||||||
'UPDATE assets SET status = $1 WHERE id = $2',
|
`UPDATE assets SET status = 'error', updated_at = NOW() WHERE id = $1`,
|
||||||
['error', assetId]
|
[assetId]
|
||||||
);
|
);
|
||||||
throw error;
|
throw error;
|
||||||
} finally {
|
} finally {
|
||||||
// Cleanup
|
await Promise.all([
|
||||||
try {
|
unlink(inputPath).catch(() => {}),
|
||||||
await Promise.all([
|
unlink(outputPath).catch(() => {}),
|
||||||
unlink(inputPath).catch(() => {}),
|
]);
|
||||||
unlink(outputPath).catch(() => {}),
|
|
||||||
]);
|
|
||||||
} catch (err) {
|
|
||||||
console.error(`[proxy] Cleanup error for job ${job.id}:`, err);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue