From 4ba898f6a3385afb9ed8bfeb170ae9dbfb3554de Mon Sep 17 00:00:00 2001 From: ZGaetano Date: Fri, 15 May 2026 21:26:57 -0400 Subject: [PATCH] fix: remove premature thumbnail dispatch from upload route (proxy worker now handles it) --- services/mam-api/src/routes/upload.js | 187 +++++++++----------------- 1 file changed, 62 insertions(+), 125 deletions(-) diff --git a/services/mam-api/src/routes/upload.js b/services/mam-api/src/routes/upload.js index 84fbae9..528d7e2 100644 --- a/services/mam-api/src/routes/upload.js +++ b/services/mam-api/src/routes/upload.js @@ -14,31 +14,23 @@ import { getAmppConfig, ensureFolderPath } from '../ampp/client.js'; const router = express.Router(); -// Setup multer for memory storage const memoryStorage = multer.memoryStorage(); const upload = multer({ storage: memoryStorage }); -// Initialize BullMQ queues -const proxyQueue = new Queue('proxy', { - connection: { - url: process.env.REDIS_URL || 'redis://localhost:6379', - }, -}); +const parseRedisUrl = (url) => { + const parsed = new URL(url); + return { host: parsed.hostname, port: parseInt(parsed.port, 10) }; +}; -const thumbnailQueue = new Queue('thumbnail', { - connection: { - url: process.env.REDIS_URL || 'redis://localhost:6379', - }, +// Only proxy queue needed here — proxy worker dispatches thumbnail once done +const proxyQueue = new Queue('proxy', { + connection: parseRedisUrl(process.env.REDIS_URL || 'redis://queue:6379'), }); // --------------------------------------------------------------- // AMPP Sync Helpers // --------------------------------------------------------------- -/** - * Walk up the bins table to build an ordered array of folder name segments - * from the root ancestor down to the given binId. - */ async function resolveBinPath(binId) { const segments = []; let currentId = binId; @@ -49,25 +41,21 @@ async function resolveBinPath(binId) { ); if (result.rows.length === 0) break; const bin = result.rows[0]; - segments.unshift(bin.name); // prepend → final order is root-to-leaf + segments.unshift(bin.name); currentId = bin.parent_id; } return segments; } /** - * Fire-and-forget: ensure the AMPP folder hierarchy exists for this asset's - * project/bin, then persist the resulting folder:id on the asset record so - * the AMPP Script Task can look it up and do the final link. - * - * Never throws — logs failures but does NOT fail the Dragon-Wind upload. + * Fire-and-forget: mirror asset's project/bin path into AMPP folder hierarchy. + * Never throws — failures are logged but never surface to the caller. */ async function syncToAmpp(assetId, projectId, binId) { try { const config = await getAmppConfig(); - if (!config) return; // AMPP not configured — skip silently + if (!config) return; - // Look up project name const projResult = await pool.query( 'SELECT name FROM projects WHERE id = $1', [projectId] @@ -75,18 +63,15 @@ async function syncToAmpp(assetId, projectId, binId) { if (projResult.rows.length === 0) return; const projectName = projResult.rows[0].name; - // Build folder path: ProjectName / [BinAncestors...] / BinName const segments = [projectName]; if (binId) { const binSegments = await resolveBinPath(binId); segments.push(...binSegments); } - // Create or verify folder hierarchy in AMPP const folderId = await ensureFolderPath(config, segments); if (!folderId) return; - // Persist AMPP folder ID on asset so Script Task can look it up by filename await pool.query( 'UPDATE assets SET ampp_folder_id = $1, ampp_synced_at = NOW() WHERE id = $2', [folderId, assetId] @@ -110,34 +95,25 @@ router.post('/init', async (req, res, next) => { } const assetId = uuidv4(); - const s3Key = `originals/${projectId}/${filename}`; + const s3Key = `originals/${projectId}/${filename}`; + const tagsArray = tags ? (Array.isArray(tags) ? tags : [tags]) : []; - // Create asset record in database - const assetQuery = ` - INSERT INTO assets ( + await pool.query( + `INSERT INTO assets ( id, project_id, bin_id, filename, display_name, status, media_type, original_s3_key, file_size, tags, created_at, updated_at ) - VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, NOW(), NOW()) - RETURNING * - `; + VALUES ($1,$2,$3,$4,$4,'ingesting',$5,$6,$7,$8,NOW(),NOW())`, + [ + assetId, projectId, binId || null, filename, + contentType.startsWith('video') ? 'video' + : contentType.startsWith('audio') ? 'audio' + : contentType.startsWith('image') ? 'image' : 'document', + s3Key, fileSize, + tagsArray.length > 0 ? tagsArray : null, + ] + ); - const tagsArray = tags ? (Array.isArray(tags) ? tags : [tags]) : []; - - await pool.query(assetQuery, [ - assetId, - projectId, - binId || null, - filename, - filename, - 'ingesting', - contentType, - s3Key, - fileSize, - tagsArray.length > 0 ? JSON.stringify(tagsArray) : null, - ]); - - // Create S3 multipart upload const multipartUpload = await s3Client.send( new CreateMultipartUploadCommand({ Bucket: S3_BUCKET, @@ -167,7 +143,6 @@ router.post('/part', upload.single('file'), async (req, res, next) => { }); } - // Upload part to S3 const partUpload = await s3Client.send( new UploadPartCommand({ Bucket: S3_BUCKET, @@ -198,33 +173,25 @@ router.post('/complete', async (req, res, next) => { }); } - // Prepare parts for completion - const partsList = parts.map((part) => ({ - ETag: part.etag, - PartNumber: part.partNumber, - })); - - // Complete the multipart upload await s3Client.send( new CompleteMultipartUploadCommand({ Bucket: S3_BUCKET, Key: key, UploadId: uploadId, MultipartUpload: { - Parts: partsList, + Parts: parts.map(p => ({ ETag: p.etag, PartNumber: p.partNumber })), }, }) ); - // Update asset status to processing - const updateQuery = ` - UPDATE assets - SET status = 'processing', updated_at = NOW() - WHERE id = $1 - RETURNING * - `; - - const result = await pool.query(updateQuery, [assetId]); + // Original file in S3 — queue proxy generation + // proxy worker will dispatch thumbnail once proxy is ready + const result = await pool.query( + `UPDATE assets + SET status = 'processing', updated_at = NOW() + WHERE id = $1 RETURNING *`, + [assetId] + ); if (result.rows.length === 0) { return res.status(404).json({ error: 'Asset not found' }); @@ -232,20 +199,13 @@ router.post('/complete', async (req, res, next) => { const asset = result.rows[0]; - // Queue proxy and thumbnail generation await proxyQueue.add('generate', { assetId, - inputKey: key, + inputKey: key, outputKey: `proxies/${assetId}.mp4`, }); - await thumbnailQueue.add('generate', { - assetId, - inputKey: `proxies/${assetId}.mp4`, - outputKey: `thumbnails/${assetId}.jpg`, - }); - - // Sync AMPP folder structure — non-blocking, never fails the upload + // Sync AMPP folder — non-blocking syncToAmpp(asset.id, asset.project_id, asset.bin_id); res.json(asset); @@ -265,16 +225,10 @@ router.post('/abort', async (req, res, next) => { }); } - // Abort S3 multipart upload await s3Client.send( - new AbortMultipartUploadCommand({ - Bucket: S3_BUCKET, - Key: key, - UploadId: uploadId, - }) + new AbortMultipartUploadCommand({ Bucket: S3_BUCKET, Key: key, UploadId: uploadId }) ); - // Delete asset record await pool.query('DELETE FROM assets WHERE id = $1', [assetId]); res.json({ message: 'Upload aborted and asset deleted' }); @@ -283,7 +237,7 @@ router.post('/abort', async (req, res, next) => { } }); -// POST /api/v1/upload/simple - Simple single-file upload for small files +// POST /api/v1/upload/simple - Single-file upload for smaller files (<50 MB) router.post('/simple', upload.single('file'), async (req, res, next) => { try { const { filename, projectId, binId, tags, contentType } = req.body; @@ -294,62 +248,45 @@ router.post('/simple', upload.single('file'), async (req, res, next) => { }); } - const assetId = uuidv4(); - const s3Key = `originals/${projectId}/${filename}`; - - // Create asset record + const assetId = uuidv4(); + const s3Key = `originals/${projectId}/${filename}`; + const mimeType = contentType || req.file.mimetype; const tagsArray = tags ? (Array.isArray(tags) ? tags : [tags]) : []; - const assetQuery = ` - INSERT INTO assets ( + await pool.query( + `INSERT INTO assets ( id, project_id, bin_id, filename, display_name, status, media_type, original_s3_key, file_size, tags, created_at, updated_at ) - VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, NOW(), NOW()) - RETURNING * - `; + VALUES ($1,$2,$3,$4,$4,'ingesting',$5,$6,$7,$8,NOW(),NOW())`, + [ + assetId, projectId, binId || null, filename, + mimeType.startsWith('video') ? 'video' + : mimeType.startsWith('audio') ? 'audio' + : mimeType.startsWith('image') ? 'image' : 'document', + s3Key, req.file.size, + tagsArray.length > 0 ? tagsArray : null, + ] + ); - await pool.query(assetQuery, [ - assetId, - projectId, - binId || null, - filename, - filename, - 'ingesting', - contentType || req.file.mimetype, - s3Key, - req.file.size, - tagsArray.length > 0 ? JSON.stringify(tagsArray) : null, - ]); + await uploadStream(s3Key, req.file.buffer, mimeType); - // Upload to S3 - await uploadStream(s3Key, req.file.buffer, contentType || req.file.mimetype); + const result = await pool.query( + `UPDATE assets SET status = 'processing', updated_at = NOW() + WHERE id = $1 RETURNING *`, + [assetId] + ); - // Update asset status to processing - const updateQuery = ` - UPDATE assets - SET status = 'processing', updated_at = NOW() - WHERE id = $1 - RETURNING * - `; - - const result = await pool.query(updateQuery, [assetId]); const asset = result.rows[0]; - // Queue proxy and thumbnail generation + // Queue proxy — proxy worker dispatches thumbnail on completion await proxyQueue.add('generate', { assetId, - inputKey: s3Key, + inputKey: s3Key, outputKey: `proxies/${assetId}.mp4`, }); - await thumbnailQueue.add('generate', { - assetId, - inputKey: `proxies/${assetId}.mp4`, - outputKey: `thumbnails/${assetId}.jpg`, - }); - - // Sync AMPP folder structure — non-blocking, never fails the upload + // Sync AMPP folder — non-blocking syncToAmpp(assetId, projectId, binId || null); res.json(asset);