fix: remove premature thumbnail dispatch from upload route (proxy worker now handles it)

This commit is contained in:
Zac Gaetano 2026-05-15 21:26:57 -04:00
parent 6aff3cabc0
commit 4ba898f6a3

View file

@ -14,31 +14,23 @@ import { getAmppConfig, ensureFolderPath } from '../ampp/client.js';
const router = express.Router(); const router = express.Router();
// Setup multer for memory storage
const memoryStorage = multer.memoryStorage(); const memoryStorage = multer.memoryStorage();
const upload = multer({ storage: memoryStorage }); const upload = multer({ storage: memoryStorage });
// Initialize BullMQ queues const parseRedisUrl = (url) => {
const proxyQueue = new Queue('proxy', { const parsed = new URL(url);
connection: { return { host: parsed.hostname, port: parseInt(parsed.port, 10) };
url: process.env.REDIS_URL || 'redis://localhost:6379', };
},
});
const thumbnailQueue = new Queue('thumbnail', { // Only proxy queue needed here — proxy worker dispatches thumbnail once done
connection: { const proxyQueue = new Queue('proxy', {
url: process.env.REDIS_URL || 'redis://localhost:6379', connection: parseRedisUrl(process.env.REDIS_URL || 'redis://queue:6379'),
},
}); });
// --------------------------------------------------------------- // ---------------------------------------------------------------
// AMPP Sync Helpers // AMPP Sync Helpers
// --------------------------------------------------------------- // ---------------------------------------------------------------
/**
* Walk up the bins table to build an ordered array of folder name segments
* from the root ancestor down to the given binId.
*/
async function resolveBinPath(binId) { async function resolveBinPath(binId) {
const segments = []; const segments = [];
let currentId = binId; let currentId = binId;
@ -49,25 +41,21 @@ async function resolveBinPath(binId) {
); );
if (result.rows.length === 0) break; if (result.rows.length === 0) break;
const bin = result.rows[0]; const bin = result.rows[0];
segments.unshift(bin.name); // prepend → final order is root-to-leaf segments.unshift(bin.name);
currentId = bin.parent_id; currentId = bin.parent_id;
} }
return segments; return segments;
} }
/** /**
* Fire-and-forget: ensure the AMPP folder hierarchy exists for this asset's * Fire-and-forget: mirror asset's project/bin path into AMPP folder hierarchy.
* project/bin, then persist the resulting folder:id on the asset record so * Never throws failures are logged but never surface to the caller.
* the AMPP Script Task can look it up and do the final link.
*
* Never throws logs failures but does NOT fail the Dragon-Wind upload.
*/ */
async function syncToAmpp(assetId, projectId, binId) { async function syncToAmpp(assetId, projectId, binId) {
try { try {
const config = await getAmppConfig(); const config = await getAmppConfig();
if (!config) return; // AMPP not configured — skip silently if (!config) return;
// Look up project name
const projResult = await pool.query( const projResult = await pool.query(
'SELECT name FROM projects WHERE id = $1', 'SELECT name FROM projects WHERE id = $1',
[projectId] [projectId]
@ -75,18 +63,15 @@ async function syncToAmpp(assetId, projectId, binId) {
if (projResult.rows.length === 0) return; if (projResult.rows.length === 0) return;
const projectName = projResult.rows[0].name; const projectName = projResult.rows[0].name;
// Build folder path: ProjectName / [BinAncestors...] / BinName
const segments = [projectName]; const segments = [projectName];
if (binId) { if (binId) {
const binSegments = await resolveBinPath(binId); const binSegments = await resolveBinPath(binId);
segments.push(...binSegments); segments.push(...binSegments);
} }
// Create or verify folder hierarchy in AMPP
const folderId = await ensureFolderPath(config, segments); const folderId = await ensureFolderPath(config, segments);
if (!folderId) return; if (!folderId) return;
// Persist AMPP folder ID on asset so Script Task can look it up by filename
await pool.query( await pool.query(
'UPDATE assets SET ampp_folder_id = $1, ampp_synced_at = NOW() WHERE id = $2', 'UPDATE assets SET ampp_folder_id = $1, ampp_synced_at = NOW() WHERE id = $2',
[folderId, assetId] [folderId, assetId]
@ -110,34 +95,25 @@ router.post('/init', async (req, res, next) => {
} }
const assetId = uuidv4(); const assetId = uuidv4();
const s3Key = `originals/${projectId}/${filename}`; const s3Key = `originals/${projectId}/${filename}`;
const tagsArray = tags ? (Array.isArray(tags) ? tags : [tags]) : [];
// Create asset record in database await pool.query(
const assetQuery = ` `INSERT INTO assets (
INSERT INTO assets (
id, project_id, bin_id, filename, display_name, status, id, project_id, bin_id, filename, display_name, status,
media_type, original_s3_key, file_size, tags, created_at, updated_at media_type, original_s3_key, file_size, tags, created_at, updated_at
) )
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, NOW(), NOW()) VALUES ($1,$2,$3,$4,$4,'ingesting',$5,$6,$7,$8,NOW(),NOW())`,
RETURNING * [
`; assetId, projectId, binId || null, filename,
contentType.startsWith('video') ? 'video'
: contentType.startsWith('audio') ? 'audio'
: contentType.startsWith('image') ? 'image' : 'document',
s3Key, fileSize,
tagsArray.length > 0 ? tagsArray : null,
]
);
const tagsArray = tags ? (Array.isArray(tags) ? tags : [tags]) : [];
await pool.query(assetQuery, [
assetId,
projectId,
binId || null,
filename,
filename,
'ingesting',
contentType,
s3Key,
fileSize,
tagsArray.length > 0 ? JSON.stringify(tagsArray) : null,
]);
// Create S3 multipart upload
const multipartUpload = await s3Client.send( const multipartUpload = await s3Client.send(
new CreateMultipartUploadCommand({ new CreateMultipartUploadCommand({
Bucket: S3_BUCKET, Bucket: S3_BUCKET,
@ -167,7 +143,6 @@ router.post('/part', upload.single('file'), async (req, res, next) => {
}); });
} }
// Upload part to S3
const partUpload = await s3Client.send( const partUpload = await s3Client.send(
new UploadPartCommand({ new UploadPartCommand({
Bucket: S3_BUCKET, Bucket: S3_BUCKET,
@ -198,33 +173,25 @@ router.post('/complete', async (req, res, next) => {
}); });
} }
// Prepare parts for completion
const partsList = parts.map((part) => ({
ETag: part.etag,
PartNumber: part.partNumber,
}));
// Complete the multipart upload
await s3Client.send( await s3Client.send(
new CompleteMultipartUploadCommand({ new CompleteMultipartUploadCommand({
Bucket: S3_BUCKET, Bucket: S3_BUCKET,
Key: key, Key: key,
UploadId: uploadId, UploadId: uploadId,
MultipartUpload: { MultipartUpload: {
Parts: partsList, Parts: parts.map(p => ({ ETag: p.etag, PartNumber: p.partNumber })),
}, },
}) })
); );
// Update asset status to processing // Original file in S3 — queue proxy generation
const updateQuery = ` // proxy worker will dispatch thumbnail once proxy is ready
UPDATE assets const result = await pool.query(
SET status = 'processing', updated_at = NOW() `UPDATE assets
WHERE id = $1 SET status = 'processing', updated_at = NOW()
RETURNING * WHERE id = $1 RETURNING *`,
`; [assetId]
);
const result = await pool.query(updateQuery, [assetId]);
if (result.rows.length === 0) { if (result.rows.length === 0) {
return res.status(404).json({ error: 'Asset not found' }); return res.status(404).json({ error: 'Asset not found' });
@ -232,20 +199,13 @@ router.post('/complete', async (req, res, next) => {
const asset = result.rows[0]; const asset = result.rows[0];
// Queue proxy and thumbnail generation
await proxyQueue.add('generate', { await proxyQueue.add('generate', {
assetId, assetId,
inputKey: key, inputKey: key,
outputKey: `proxies/${assetId}.mp4`, outputKey: `proxies/${assetId}.mp4`,
}); });
await thumbnailQueue.add('generate', { // Sync AMPP folder — non-blocking
assetId,
inputKey: `proxies/${assetId}.mp4`,
outputKey: `thumbnails/${assetId}.jpg`,
});
// Sync AMPP folder structure — non-blocking, never fails the upload
syncToAmpp(asset.id, asset.project_id, asset.bin_id); syncToAmpp(asset.id, asset.project_id, asset.bin_id);
res.json(asset); res.json(asset);
@ -265,16 +225,10 @@ router.post('/abort', async (req, res, next) => {
}); });
} }
// Abort S3 multipart upload
await s3Client.send( await s3Client.send(
new AbortMultipartUploadCommand({ new AbortMultipartUploadCommand({ Bucket: S3_BUCKET, Key: key, UploadId: uploadId })
Bucket: S3_BUCKET,
Key: key,
UploadId: uploadId,
})
); );
// Delete asset record
await pool.query('DELETE FROM assets WHERE id = $1', [assetId]); await pool.query('DELETE FROM assets WHERE id = $1', [assetId]);
res.json({ message: 'Upload aborted and asset deleted' }); res.json({ message: 'Upload aborted and asset deleted' });
@ -283,7 +237,7 @@ router.post('/abort', async (req, res, next) => {
} }
}); });
// POST /api/v1/upload/simple - Simple single-file upload for small files // POST /api/v1/upload/simple - Single-file upload for smaller files (<50 MB)
router.post('/simple', upload.single('file'), async (req, res, next) => { router.post('/simple', upload.single('file'), async (req, res, next) => {
try { try {
const { filename, projectId, binId, tags, contentType } = req.body; const { filename, projectId, binId, tags, contentType } = req.body;
@ -294,62 +248,45 @@ router.post('/simple', upload.single('file'), async (req, res, next) => {
}); });
} }
const assetId = uuidv4(); const assetId = uuidv4();
const s3Key = `originals/${projectId}/${filename}`; const s3Key = `originals/${projectId}/${filename}`;
const mimeType = contentType || req.file.mimetype;
// Create asset record
const tagsArray = tags ? (Array.isArray(tags) ? tags : [tags]) : []; const tagsArray = tags ? (Array.isArray(tags) ? tags : [tags]) : [];
const assetQuery = ` await pool.query(
INSERT INTO assets ( `INSERT INTO assets (
id, project_id, bin_id, filename, display_name, status, id, project_id, bin_id, filename, display_name, status,
media_type, original_s3_key, file_size, tags, created_at, updated_at media_type, original_s3_key, file_size, tags, created_at, updated_at
) )
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, NOW(), NOW()) VALUES ($1,$2,$3,$4,$4,'ingesting',$5,$6,$7,$8,NOW(),NOW())`,
RETURNING * [
`; assetId, projectId, binId || null, filename,
mimeType.startsWith('video') ? 'video'
: mimeType.startsWith('audio') ? 'audio'
: mimeType.startsWith('image') ? 'image' : 'document',
s3Key, req.file.size,
tagsArray.length > 0 ? tagsArray : null,
]
);
await pool.query(assetQuery, [ await uploadStream(s3Key, req.file.buffer, mimeType);
assetId,
projectId,
binId || null,
filename,
filename,
'ingesting',
contentType || req.file.mimetype,
s3Key,
req.file.size,
tagsArray.length > 0 ? JSON.stringify(tagsArray) : null,
]);
// Upload to S3 const result = await pool.query(
await uploadStream(s3Key, req.file.buffer, contentType || req.file.mimetype); `UPDATE assets SET status = 'processing', updated_at = NOW()
WHERE id = $1 RETURNING *`,
[assetId]
);
// Update asset status to processing
const updateQuery = `
UPDATE assets
SET status = 'processing', updated_at = NOW()
WHERE id = $1
RETURNING *
`;
const result = await pool.query(updateQuery, [assetId]);
const asset = result.rows[0]; const asset = result.rows[0];
// Queue proxy and thumbnail generation // Queue proxy — proxy worker dispatches thumbnail on completion
await proxyQueue.add('generate', { await proxyQueue.add('generate', {
assetId, assetId,
inputKey: s3Key, inputKey: s3Key,
outputKey: `proxies/${assetId}.mp4`, outputKey: `proxies/${assetId}.mp4`,
}); });
await thumbnailQueue.add('generate', { // Sync AMPP folder — non-blocking
assetId,
inputKey: `proxies/${assetId}.mp4`,
outputKey: `thumbnails/${assetId}.jpg`,
});
// Sync AMPP folder structure — non-blocking, never fails the upload
syncToAmpp(assetId, projectId, binId || null); syncToAmpp(assetId, projectId, binId || null);
res.json(asset); res.json(asset);