diff --git a/services/mam-api/src/routes/upload.js b/services/mam-api/src/routes/upload.js new file mode 100644 index 0000000..01d3b5e --- /dev/null +++ b/services/mam-api/src/routes/upload.js @@ -0,0 +1,289 @@ +import express from 'express'; +import multer from 'multer'; +import { Queue } from 'bullmq'; +import { v4 as uuidv4 } from 'uuid'; +import pool from '../db/pool.js'; +import { s3Client, uploadStream, deleteObject, S3_BUCKET } from '../s3/client.js'; +import { + CreateMultipartUploadCommand, + UploadPartCommand, + CompleteMultipartUploadCommand, + AbortMultipartUploadCommand, +} from '@aws-sdk/client-s3'; + +const router = express.Router(); + +// Setup multer for memory storage +const memoryStorage = multer.memoryStorage(); +const upload = multer({ storage: memoryStorage }); + +// Initialize BullMQ queues +const proxyQueue = new Queue('proxy', { + connection: { + url: process.env.REDIS_URL || 'redis://localhost:6379', + }, +}); + +const thumbnailQueue = new Queue('thumbnail', { + connection: { + url: process.env.REDIS_URL || 'redis://localhost:6379', + }, +}); + +// POST /api/v1/upload/init - Initialize a multipart upload +router.post('/init', async (req, res, next) => { + try { + const { filename, fileSize, contentType, projectId, binId, tags } = req.body; + + if (!filename || !fileSize || !contentType || !projectId) { + return res.status(400).json({ + error: 'Missing required fields: filename, fileSize, contentType, projectId', + }); + } + + const assetId = uuidv4(); + const s3Key = `originals/${projectId}/${filename}`; + + // Create asset record in database + const assetQuery = ` + INSERT INTO assets ( + id, project_id, bin_id, filename, display_name, status, + media_type, original_s3_key, file_size, tags, created_at, updated_at + ) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, NOW(), NOW()) + RETURNING * + `; + + const tagsArray = tags ? (Array.isArray(tags) ? tags : [tags]) : []; + + await pool.query(assetQuery, [ + assetId, + projectId, + binId || null, + filename, + filename, + 'ingesting', + contentType, + s3Key, + fileSize, + tagsArray.length > 0 ? JSON.stringify(tagsArray) : null, + ]); + + // Create S3 multipart upload + const multipartUpload = await s3Client.send( + new CreateMultipartUploadCommand({ + Bucket: S3_BUCKET, + Key: s3Key, + ContentType: contentType, + }) + ); + + res.json({ + assetId, + uploadId: multipartUpload.UploadId, + key: s3Key, + }); + } catch (err) { + next(err); + } +}); + +// POST /api/v1/upload/part - Upload a single part +router.post('/part', upload.single('file'), async (req, res, next) => { + try { + const { uploadId, key, partNumber } = req.body; + + if (!uploadId || !key || !partNumber || !req.file) { + return res.status(400).json({ + error: 'Missing required fields: uploadId, key, partNumber, and file', + }); + } + + // Upload part to S3 + const partUpload = await s3Client.send( + new UploadPartCommand({ + Bucket: S3_BUCKET, + Key: key, + PartNumber: parseInt(partNumber, 10), + UploadId: uploadId, + Body: req.file.buffer, + }) + ); + + res.json({ + partNumber: parseInt(partNumber, 10), + etag: partUpload.ETag, + }); + } catch (err) { + next(err); + } +}); + +// POST /api/v1/upload/complete - Complete the multipart upload +router.post('/complete', async (req, res, next) => { + try { + const { uploadId, key, assetId, parts } = req.body; + + if (!uploadId || !key || !assetId || !parts || !Array.isArray(parts)) { + return res.status(400).json({ + error: 'Missing required fields: uploadId, key, assetId, and parts array', + }); + } + + // Prepare parts for completion + const partsList = parts.map((part) => ({ + ETag: part.etag, + PartNumber: part.partNumber, + })); + + // Complete the multipart upload + await s3Client.send( + new CompleteMultipartUploadCommand({ + Bucket: S3_BUCKET, + Key: key, + UploadId: uploadId, + MultipartUpload: { + Parts: partsList, + }, + }) + ); + + // Update asset status to processing + const updateQuery = ` + UPDATE assets + SET status = 'processing', updated_at = NOW() + WHERE id = $1 + RETURNING * + `; + + const result = await pool.query(updateQuery, [assetId]); + + if (result.rows.length === 0) { + return res.status(404).json({ error: 'Asset not found' }); + } + + const asset = result.rows[0]; + + // Queue proxy generation job + await proxyQueue.add('generate', { + assetId, + inputKey: key, + outputKey: `proxies/${assetId}.mp4`, + }); + + // Queue thumbnail generation job + await thumbnailQueue.add('generate', { + assetId, + inputKey: `proxies/${assetId}.mp4`, + outputKey: `thumbnails/${assetId}.jpg`, + }); + + res.json(asset); + } catch (err) { + next(err); + } +}); + +// POST /api/v1/upload/abort - Abort upload +router.post('/abort', async (req, res, next) => { + try { + const { uploadId, key, assetId } = req.body; + + if (!uploadId || !key || !assetId) { + return res.status(400).json({ + error: 'Missing required fields: uploadId, key, assetId', + }); + } + + // Abort S3 multipart upload + await s3Client.send( + new AbortMultipartUploadCommand({ + Bucket: S3_BUCKET, + Key: key, + UploadId: uploadId, + }) + ); + + // Delete asset record + await pool.query('DELETE FROM assets WHERE id = $1', [assetId]); + + res.json({ message: 'Upload aborted and asset deleted' }); + } catch (err) { + next(err); + } +}); + +// POST /api/v1/upload/simple - Simple single-file upload for small files +router.post('/simple', upload.single('file'), async (req, res, next) => { + try { + const { filename, projectId, binId, tags, contentType } = req.body; + + if (!filename || !projectId || !req.file) { + return res.status(400).json({ + error: 'Missing required fields: filename, projectId, and file', + }); + } + + const assetId = uuidv4(); + const s3Key = `originals/${projectId}/${filename}`; + + // Create asset record + const tagsArray = tags ? (Array.isArray(tags) ? tags : [tags]) : []; + + const assetQuery = ` + INSERT INTO assets ( + id, project_id, bin_id, filename, display_name, status, + media_type, original_s3_key, file_size, tags, created_at, updated_at + ) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, NOW(), NOW()) + RETURNING * + `; + + await pool.query(assetQuery, [ + assetId, + projectId, + binId || null, + filename, + filename, + 'ingesting', + contentType || req.file.mimetype, + s3Key, + req.file.size, + tagsArray.length > 0 ? JSON.stringify(tagsArray) : null, + ]); + + // Upload to S3 + await uploadStream(s3Key, req.file.buffer, contentType || req.file.mimetype); + + // Update asset status to processing + const updateQuery = ` + UPDATE assets + SET status = 'processing', updated_at = NOW() + WHERE id = $1 + RETURNING * + `; + + const result = await pool.query(updateQuery, [assetId]); + const asset = result.rows[0]; + + // Queue proxy generation job + await proxyQueue.add('generate', { + assetId, + inputKey: s3Key, + outputKey: `proxies/${assetId}.mp4`, + }); + + // Queue thumbnail generation job + await thumbnailQueue.add('generate', { + assetId, + inputKey: `proxies/${assetId}.mp4`, + outputKey: `thumbnails/${assetId}.jpg`, + }); + + res.json(asset); + } catch (err) { + next(err); + } +}); + +export default router;