Phase 2: services/mam-api/src/routes/upload.js
This commit is contained in:
parent
a2c233aed3
commit
6994e2d697
1 changed files with 289 additions and 0 deletions
289
services/mam-api/src/routes/upload.js
Normal file
289
services/mam-api/src/routes/upload.js
Normal file
|
|
@ -0,0 +1,289 @@
|
||||||
|
import express from 'express';
|
||||||
|
import multer from 'multer';
|
||||||
|
import { Queue } from 'bullmq';
|
||||||
|
import { v4 as uuidv4 } from 'uuid';
|
||||||
|
import pool from '../db/pool.js';
|
||||||
|
import { s3Client, uploadStream, deleteObject, S3_BUCKET } from '../s3/client.js';
|
||||||
|
import {
|
||||||
|
CreateMultipartUploadCommand,
|
||||||
|
UploadPartCommand,
|
||||||
|
CompleteMultipartUploadCommand,
|
||||||
|
AbortMultipartUploadCommand,
|
||||||
|
} from '@aws-sdk/client-s3';
|
||||||
|
|
||||||
|
const router = express.Router();
|
||||||
|
|
||||||
|
// Setup multer for memory storage
|
||||||
|
const memoryStorage = multer.memoryStorage();
|
||||||
|
const upload = multer({ storage: memoryStorage });
|
||||||
|
|
||||||
|
// Initialize BullMQ queues
|
||||||
|
const proxyQueue = new Queue('proxy', {
|
||||||
|
connection: {
|
||||||
|
url: process.env.REDIS_URL || 'redis://localhost:6379',
|
||||||
|
},
|
||||||
|
});
|
||||||
|
|
||||||
|
const thumbnailQueue = new Queue('thumbnail', {
|
||||||
|
connection: {
|
||||||
|
url: process.env.REDIS_URL || 'redis://localhost:6379',
|
||||||
|
},
|
||||||
|
});
|
||||||
|
|
||||||
|
// POST /api/v1/upload/init - Initialize a multipart upload
|
||||||
|
router.post('/init', async (req, res, next) => {
|
||||||
|
try {
|
||||||
|
const { filename, fileSize, contentType, projectId, binId, tags } = req.body;
|
||||||
|
|
||||||
|
if (!filename || !fileSize || !contentType || !projectId) {
|
||||||
|
return res.status(400).json({
|
||||||
|
error: 'Missing required fields: filename, fileSize, contentType, projectId',
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
const assetId = uuidv4();
|
||||||
|
const s3Key = `originals/${projectId}/${filename}`;
|
||||||
|
|
||||||
|
// Create asset record in database
|
||||||
|
const assetQuery = `
|
||||||
|
INSERT INTO assets (
|
||||||
|
id, project_id, bin_id, filename, display_name, status,
|
||||||
|
media_type, original_s3_key, file_size, tags, created_at, updated_at
|
||||||
|
)
|
||||||
|
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, NOW(), NOW())
|
||||||
|
RETURNING *
|
||||||
|
`;
|
||||||
|
|
||||||
|
const tagsArray = tags ? (Array.isArray(tags) ? tags : [tags]) : [];
|
||||||
|
|
||||||
|
await pool.query(assetQuery, [
|
||||||
|
assetId,
|
||||||
|
projectId,
|
||||||
|
binId || null,
|
||||||
|
filename,
|
||||||
|
filename,
|
||||||
|
'ingesting',
|
||||||
|
contentType,
|
||||||
|
s3Key,
|
||||||
|
fileSize,
|
||||||
|
tagsArray.length > 0 ? JSON.stringify(tagsArray) : null,
|
||||||
|
]);
|
||||||
|
|
||||||
|
// Create S3 multipart upload
|
||||||
|
const multipartUpload = await s3Client.send(
|
||||||
|
new CreateMultipartUploadCommand({
|
||||||
|
Bucket: S3_BUCKET,
|
||||||
|
Key: s3Key,
|
||||||
|
ContentType: contentType,
|
||||||
|
})
|
||||||
|
);
|
||||||
|
|
||||||
|
res.json({
|
||||||
|
assetId,
|
||||||
|
uploadId: multipartUpload.UploadId,
|
||||||
|
key: s3Key,
|
||||||
|
});
|
||||||
|
} catch (err) {
|
||||||
|
next(err);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
// POST /api/v1/upload/part - Upload a single part
|
||||||
|
router.post('/part', upload.single('file'), async (req, res, next) => {
|
||||||
|
try {
|
||||||
|
const { uploadId, key, partNumber } = req.body;
|
||||||
|
|
||||||
|
if (!uploadId || !key || !partNumber || !req.file) {
|
||||||
|
return res.status(400).json({
|
||||||
|
error: 'Missing required fields: uploadId, key, partNumber, and file',
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
// Upload part to S3
|
||||||
|
const partUpload = await s3Client.send(
|
||||||
|
new UploadPartCommand({
|
||||||
|
Bucket: S3_BUCKET,
|
||||||
|
Key: key,
|
||||||
|
PartNumber: parseInt(partNumber, 10),
|
||||||
|
UploadId: uploadId,
|
||||||
|
Body: req.file.buffer,
|
||||||
|
})
|
||||||
|
);
|
||||||
|
|
||||||
|
res.json({
|
||||||
|
partNumber: parseInt(partNumber, 10),
|
||||||
|
etag: partUpload.ETag,
|
||||||
|
});
|
||||||
|
} catch (err) {
|
||||||
|
next(err);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
// POST /api/v1/upload/complete - Complete the multipart upload
|
||||||
|
router.post('/complete', async (req, res, next) => {
|
||||||
|
try {
|
||||||
|
const { uploadId, key, assetId, parts } = req.body;
|
||||||
|
|
||||||
|
if (!uploadId || !key || !assetId || !parts || !Array.isArray(parts)) {
|
||||||
|
return res.status(400).json({
|
||||||
|
error: 'Missing required fields: uploadId, key, assetId, and parts array',
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
// Prepare parts for completion
|
||||||
|
const partsList = parts.map((part) => ({
|
||||||
|
ETag: part.etag,
|
||||||
|
PartNumber: part.partNumber,
|
||||||
|
}));
|
||||||
|
|
||||||
|
// Complete the multipart upload
|
||||||
|
await s3Client.send(
|
||||||
|
new CompleteMultipartUploadCommand({
|
||||||
|
Bucket: S3_BUCKET,
|
||||||
|
Key: key,
|
||||||
|
UploadId: uploadId,
|
||||||
|
MultipartUpload: {
|
||||||
|
Parts: partsList,
|
||||||
|
},
|
||||||
|
})
|
||||||
|
);
|
||||||
|
|
||||||
|
// Update asset status to processing
|
||||||
|
const updateQuery = `
|
||||||
|
UPDATE assets
|
||||||
|
SET status = 'processing', updated_at = NOW()
|
||||||
|
WHERE id = $1
|
||||||
|
RETURNING *
|
||||||
|
`;
|
||||||
|
|
||||||
|
const result = await pool.query(updateQuery, [assetId]);
|
||||||
|
|
||||||
|
if (result.rows.length === 0) {
|
||||||
|
return res.status(404).json({ error: 'Asset not found' });
|
||||||
|
}
|
||||||
|
|
||||||
|
const asset = result.rows[0];
|
||||||
|
|
||||||
|
// Queue proxy generation job
|
||||||
|
await proxyQueue.add('generate', {
|
||||||
|
assetId,
|
||||||
|
inputKey: key,
|
||||||
|
outputKey: `proxies/${assetId}.mp4`,
|
||||||
|
});
|
||||||
|
|
||||||
|
// Queue thumbnail generation job
|
||||||
|
await thumbnailQueue.add('generate', {
|
||||||
|
assetId,
|
||||||
|
inputKey: `proxies/${assetId}.mp4`,
|
||||||
|
outputKey: `thumbnails/${assetId}.jpg`,
|
||||||
|
});
|
||||||
|
|
||||||
|
res.json(asset);
|
||||||
|
} catch (err) {
|
||||||
|
next(err);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
// POST /api/v1/upload/abort - Abort upload
|
||||||
|
router.post('/abort', async (req, res, next) => {
|
||||||
|
try {
|
||||||
|
const { uploadId, key, assetId } = req.body;
|
||||||
|
|
||||||
|
if (!uploadId || !key || !assetId) {
|
||||||
|
return res.status(400).json({
|
||||||
|
error: 'Missing required fields: uploadId, key, assetId',
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
// Abort S3 multipart upload
|
||||||
|
await s3Client.send(
|
||||||
|
new AbortMultipartUploadCommand({
|
||||||
|
Bucket: S3_BUCKET,
|
||||||
|
Key: key,
|
||||||
|
UploadId: uploadId,
|
||||||
|
})
|
||||||
|
);
|
||||||
|
|
||||||
|
// Delete asset record
|
||||||
|
await pool.query('DELETE FROM assets WHERE id = $1', [assetId]);
|
||||||
|
|
||||||
|
res.json({ message: 'Upload aborted and asset deleted' });
|
||||||
|
} catch (err) {
|
||||||
|
next(err);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
// POST /api/v1/upload/simple - Simple single-file upload for small files
|
||||||
|
router.post('/simple', upload.single('file'), async (req, res, next) => {
|
||||||
|
try {
|
||||||
|
const { filename, projectId, binId, tags, contentType } = req.body;
|
||||||
|
|
||||||
|
if (!filename || !projectId || !req.file) {
|
||||||
|
return res.status(400).json({
|
||||||
|
error: 'Missing required fields: filename, projectId, and file',
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
const assetId = uuidv4();
|
||||||
|
const s3Key = `originals/${projectId}/${filename}`;
|
||||||
|
|
||||||
|
// Create asset record
|
||||||
|
const tagsArray = tags ? (Array.isArray(tags) ? tags : [tags]) : [];
|
||||||
|
|
||||||
|
const assetQuery = `
|
||||||
|
INSERT INTO assets (
|
||||||
|
id, project_id, bin_id, filename, display_name, status,
|
||||||
|
media_type, original_s3_key, file_size, tags, created_at, updated_at
|
||||||
|
)
|
||||||
|
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, NOW(), NOW())
|
||||||
|
RETURNING *
|
||||||
|
`;
|
||||||
|
|
||||||
|
await pool.query(assetQuery, [
|
||||||
|
assetId,
|
||||||
|
projectId,
|
||||||
|
binId || null,
|
||||||
|
filename,
|
||||||
|
filename,
|
||||||
|
'ingesting',
|
||||||
|
contentType || req.file.mimetype,
|
||||||
|
s3Key,
|
||||||
|
req.file.size,
|
||||||
|
tagsArray.length > 0 ? JSON.stringify(tagsArray) : null,
|
||||||
|
]);
|
||||||
|
|
||||||
|
// Upload to S3
|
||||||
|
await uploadStream(s3Key, req.file.buffer, contentType || req.file.mimetype);
|
||||||
|
|
||||||
|
// Update asset status to processing
|
||||||
|
const updateQuery = `
|
||||||
|
UPDATE assets
|
||||||
|
SET status = 'processing', updated_at = NOW()
|
||||||
|
WHERE id = $1
|
||||||
|
RETURNING *
|
||||||
|
`;
|
||||||
|
|
||||||
|
const result = await pool.query(updateQuery, [assetId]);
|
||||||
|
const asset = result.rows[0];
|
||||||
|
|
||||||
|
// Queue proxy generation job
|
||||||
|
await proxyQueue.add('generate', {
|
||||||
|
assetId,
|
||||||
|
inputKey: s3Key,
|
||||||
|
outputKey: `proxies/${assetId}.mp4`,
|
||||||
|
});
|
||||||
|
|
||||||
|
// Queue thumbnail generation job
|
||||||
|
await thumbnailQueue.add('generate', {
|
||||||
|
assetId,
|
||||||
|
inputKey: `proxies/${assetId}.mp4`,
|
||||||
|
outputKey: `thumbnails/${assetId}.jpg`,
|
||||||
|
});
|
||||||
|
|
||||||
|
res.json(asset);
|
||||||
|
} catch (err) {
|
||||||
|
next(err);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
export default router;
|
||||||
Loading…
Reference in a new issue