// dragonflight/services/mam-api/src/routes/upload.js
// 362 lines · 9.7 KiB · JavaScript

import express from 'express';
import multer from 'multer';
import { Queue } from 'bullmq';
import { v4 as uuidv4 } from 'uuid';
import pool from '../db/pool.js';
import { s3Client, uploadStream, deleteObject, S3_BUCKET } from '../s3/client.js';
import {
CreateMultipartUploadCommand,
UploadPartCommand,
CompleteMultipartUploadCommand,
AbortMultipartUploadCommand,
} from '@aws-sdk/client-s3';
import { getAmppConfig, ensureFolderPath } from '../ampp/client.js';
// Router that carries every upload endpoint registered in this module.
const router = express.Router();

// Multipart form parsing: uploaded files are kept in memory and exposed to
// the handlers as req.file.buffer.
const memoryStorage = multer.memoryStorage();
const upload = multer({ storage: memoryStorage });

// BullMQ queues for post-upload processing. Both point at the same Redis
// instance, falling back to a local one when REDIS_URL is not set.
const redisUrl = process.env.REDIS_URL || 'redis://localhost:6379';
const createQueue = (queueName) =>
  new Queue(queueName, {
    connection: {
      url: redisUrl,
    },
  });
const proxyQueue = createQueue('proxy');
const thumbnailQueue = createQueue('thumbnail');
// ---------------------------------------------------------------
// AMPP Sync Helpers
// ---------------------------------------------------------------
/**
 * Walk up the bins table to build an ordered array of folder name segments
 * from the root ancestor down to the given binId.
 *
 * One query is issued per level. The walk ends when a row has a falsy
 * parent_id, when an id has no matching row, or when an id is seen a second
 * time (a parent_id cycle). In the cycle case the path collected so far is
 * returned, so corrupt hierarchy data cannot make the loop spin forever.
 *
 * @param {*} binId - id of the leaf bin; a falsy value yields an empty array.
 * @returns {Promise<Array>} the `name` column of each visited bin, ordered
 *   root-to-leaf.
 */
async function resolveBinPath(binId) {
  const leafToRoot = [];
  // Ids are normalised to strings so a numeric column value and a string
  // request value for the same bin are recognised as the same id.
  const visitedIds = new Set();
  let currentId = binId;

  while (currentId) {
    const idKey = String(currentId);
    if (visitedIds.has(idKey)) {
      console.warn(`[AMPP] cycle detected in bins hierarchy at bin ${idKey}; path truncated`);
      break;
    }
    visitedIds.add(idKey);

    const result = await pool.query(
      'SELECT id, name, parent_id FROM bins WHERE id = $1',
      [currentId]
    );
    if (result.rows.length === 0) break;

    const [bin] = result.rows;
    leafToRoot.push(bin.name);
    currentId = bin.parent_id;
  }

  // Collected leaf-first; callers expect root-first.
  return leafToRoot.reverse();
}
/**
 * Best-effort AMPP sync, intended to be called without awaiting.
 *
 * Makes sure the AMPP folder hierarchy for the asset's project/bin exists,
 * then stores the resulting folder id (and a sync timestamp) on the asset
 * row so that it can be looked up later.
 *
 * Errors raised along the way are caught and logged; they do not propagate
 * to the caller, so a failed sync does not fail the upload.
 *
 * @param {*} assetId - id of the asset row to update.
 * @param {*} projectId - id of the project whose name is the path root.
 * @param {*} binId - optional bin id; when falsy the path is just the project.
 * @returns {Promise<void>}
 */
async function syncToAmpp(assetId, projectId, binId) {
  try {
    const amppConfig = await getAmppConfig();
    if (!amppConfig) {
      // AMPP is not configured, so there is nothing to sync.
      return;
    }

    const { rows: projectRows } = await pool.query(
      'SELECT name FROM projects WHERE id = $1',
      [projectId]
    );
    if (projectRows.length === 0) {
      return;
    }
    const [{ name: projectName }] = projectRows;

    // Path layout: ProjectName / [BinAncestors...] / BinName
    const binNames = binId ? await resolveBinPath(binId) : [];
    const folderPath = [projectName, ...binNames];

    // Creates or verifies the hierarchy; a falsy id means nothing to persist.
    const amppFolderId = await ensureFolderPath(amppConfig, folderPath);
    if (!amppFolderId) {
      return;
    }

    await pool.query(
      'UPDATE assets SET ampp_folder_id = $1, ampp_synced_at = NOW() WHERE id = $2',
      [amppFolderId, assetId]
    );
    console.log(`[AMPP] asset ${assetId} → folder ${amppFolderId} (${folderPath.join(' / ')})`);
  } catch (err) {
    console.error(`[AMPP] sync failed for asset ${assetId}:`, err.message);
  }
}
// POST /api/v1/upload/init - Initialize a multipart upload
//
// Body: { filename, fileSize, contentType, projectId, binId?, tags? }
// Creates the asset row (status 'ingesting'), then opens an S3 multipart
// upload under originals/<projectId>/<filename>.
// Responds with { assetId, uploadId, key }; 400 when a required field is
// missing. Any other failure is passed to next().
router.post('/init', async (req, res, next) => {
  try {
    // req.body is undefined when no body parser handled the request; treat
    // that the same as an empty body so the client gets a 400, not a 500.
    const { filename, fileSize, contentType, projectId, binId, tags } = req.body || {};
    if (!filename || !fileSize || !contentType || !projectId) {
      return res.status(400).json({
        error: 'Missing required fields: filename, fileSize, contentType, projectId',
      });
    }

    const assetId = uuidv4();
    const s3Key = `originals/${projectId}/${filename}`;

    // Create asset record in database
    const assetQuery = `
      INSERT INTO assets (
        id, project_id, bin_id, filename, display_name, status,
        media_type, original_s3_key, file_size, tags, created_at, updated_at
      )
      VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, NOW(), NOW())
    `;
    // A single tag value is accepted and wrapped into an array.
    const tagsArray = tags ? (Array.isArray(tags) ? tags : [tags]) : [];
    await pool.query(assetQuery, [
      assetId,
      projectId,
      binId || null,
      filename,
      filename, // display_name starts out equal to the filename
      'ingesting',
      contentType,
      s3Key,
      fileSize,
      tagsArray.length > 0 ? JSON.stringify(tagsArray) : null,
    ]);

    // Create S3 multipart upload. If this fails the client never receives an
    // uploadId, so the row inserted above would be stuck in 'ingesting'
    // forever; remove it before reporting the error.
    let multipartUpload;
    try {
      multipartUpload = await s3Client.send(
        new CreateMultipartUploadCommand({
          Bucket: S3_BUCKET,
          Key: s3Key,
          ContentType: contentType,
        })
      );
    } catch (s3Err) {
      try {
        await pool.query('DELETE FROM assets WHERE id = $1', [assetId]);
      } catch (cleanupErr) {
        // Keep the original S3 error as the one reported to the client.
        console.error(`[upload] failed to remove asset ${assetId} after S3 init failure:`, cleanupErr);
      }
      throw s3Err;
    }

    res.json({
      assetId,
      uploadId: multipartUpload.UploadId,
      key: s3Key,
    });
  } catch (err) {
    next(err);
  }
});
// POST /api/v1/upload/part - Upload a single part
//
// Multipart form: fields { uploadId, key, partNumber } plus the part data in
// the 'file' field (buffered in memory by multer).
// Responds with { partNumber, etag }; 400 when a field is missing or
// partNumber is not a valid S3 part number. Other failures go to next().
router.post('/part', upload.single('file'), async (req, res, next) => {
  try {
    // S3 accepts part numbers 1 through 10,000 inclusive.
    const MAX_PART_NUMBER = 10000;

    // req.body may be undefined when the request was not multipart.
    const { uploadId, key, partNumber } = req.body || {};
    if (!uploadId || !key || !partNumber || !req.file) {
      return res.status(400).json({
        error: 'Missing required fields: uploadId, key, partNumber, and file',
      });
    }

    // Parse once and reject NaN / out-of-range values here instead of
    // forwarding them to S3.
    const parsedPartNumber = Number.parseInt(partNumber, 10);
    if (
      !Number.isInteger(parsedPartNumber) ||
      parsedPartNumber < 1 ||
      parsedPartNumber > MAX_PART_NUMBER
    ) {
      return res.status(400).json({
        error: `partNumber must be an integer between 1 and ${MAX_PART_NUMBER}`,
      });
    }

    // Upload part to S3
    const partUpload = await s3Client.send(
      new UploadPartCommand({
        Bucket: S3_BUCKET,
        Key: key,
        PartNumber: parsedPartNumber,
        UploadId: uploadId,
        Body: req.file.buffer,
      })
    );

    res.json({
      partNumber: parsedPartNumber,
      etag: partUpload.ETag,
    });
  } catch (err) {
    next(err);
  }
});
// POST /api/v1/upload/complete - Complete the multipart upload
//
// Body: { uploadId, key, assetId, parts: [{ etag, partNumber }, ...] }
// Completes the S3 multipart upload, moves the asset to 'processing',
// enqueues proxy and thumbnail jobs and starts the AMPP sync.
// Responds with the updated asset row; 400 for missing/invalid input, 404
// when the asset row does not exist. Other failures go to next().
router.post('/complete', async (req, res, next) => {
  try {
    // req.body is undefined when no body parser handled the request.
    const { uploadId, key, assetId, parts } = req.body || {};
    if (!uploadId || !key || !assetId || !Array.isArray(parts) || parts.length === 0) {
      return res.status(400).json({
        error: 'Missing required fields: uploadId, key, assetId, and parts array',
      });
    }

    // Prepare parts for completion. Part numbers are coerced to numbers so
    // that string values from the client compare and sort numerically.
    const partsList = parts.map((part) => ({
      ETag: part ? part.etag : undefined,
      PartNumber: Number(part ? part.partNumber : NaN),
    }));
    const hasInvalidPart = partsList.some(
      (part) => !part.ETag || !Number.isInteger(part.PartNumber) || part.PartNumber < 1
    );
    if (hasInvalidPart) {
      return res.status(400).json({
        error: 'Each part requires an etag and a positive integer partNumber',
      });
    }
    // S3 requires the part list in ascending PartNumber order; clients that
    // upload parts in parallel may report them out of order.
    partsList.sort((a, b) => a.PartNumber - b.PartNumber);

    // Complete the multipart upload
    await s3Client.send(
      new CompleteMultipartUploadCommand({
        Bucket: S3_BUCKET,
        Key: key,
        UploadId: uploadId,
        MultipartUpload: {
          Parts: partsList,
        },
      })
    );

    // Update asset status to processing
    const updateQuery = `
      UPDATE assets
      SET status = 'processing', updated_at = NOW()
      WHERE id = $1
      RETURNING *
    `;
    const result = await pool.query(updateQuery, [assetId]);
    if (result.rows.length === 0) {
      return res.status(404).json({ error: 'Asset not found' });
    }
    const asset = result.rows[0];

    // Queue proxy and thumbnail generation. The thumbnail job reads the
    // proxy job's output key.
    await proxyQueue.add('generate', {
      assetId,
      inputKey: key,
      outputKey: `proxies/${assetId}.mp4`,
    });
    await thumbnailQueue.add('generate', {
      assetId,
      inputKey: `proxies/${assetId}.mp4`,
      outputKey: `thumbnails/${assetId}.jpg`,
    });

    // Sync AMPP folder structure — deliberately not awaited; syncToAmpp
    // catches and logs its own errors so it cannot fail the upload.
    void syncToAmpp(asset.id, asset.project_id, asset.bin_id);

    res.json(asset);
  } catch (err) {
    next(err);
  }
});
// POST /api/v1/upload/abort - Abort upload
//
// Body: { uploadId, key, assetId }
// Aborts the S3 multipart upload and removes the asset row that was created
// for it. Responds with { message }; 400 when a field is missing. Other
// failures (including an S3 abort failure) go to next().
router.post('/abort', async (req, res, next) => {
  try {
    // req.body is undefined when no body parser handled the request.
    const { uploadId, key, assetId } = req.body || {};
    if (!uploadId || !key || !assetId) {
      return res.status(400).json({
        error: 'Missing required fields: uploadId, key, assetId',
      });
    }

    // Abort S3 multipart upload
    await s3Client.send(
      new AbortMultipartUploadCommand({
        Bucket: S3_BUCKET,
        Key: key,
        UploadId: uploadId,
      })
    );

    // Delete the asset record, but only the one that belongs to this upload:
    // it must reference the aborted key and still be 'ingesting' (the status
    // set at init and replaced by 'processing' on completion). Without this
    // scoping a mismatched assetId would delete an unrelated or finished asset.
    const deleted = await pool.query(
      "DELETE FROM assets WHERE id = $1 AND original_s3_key = $2 AND status = 'ingesting' RETURNING id",
      [assetId, key]
    );

    const message =
      deleted.rows.length > 0
        ? 'Upload aborted and asset deleted'
        : 'Upload aborted';
    res.json({ message });
  } catch (err) {
    next(err);
  }
});
// POST /api/v1/upload/simple - Simple single-file upload for small files
//
// Multipart form: fields { filename, projectId, binId?, tags?, contentType? }
// plus the file data in the 'file' field (buffered in memory by multer).
// Creates the asset row, uploads the buffer to S3, moves the asset to
// 'processing', enqueues proxy and thumbnail jobs and starts the AMPP sync.
// Responds with the updated asset row; 400 when a required field is missing.
// Other failures go to next().
router.post('/simple', upload.single('file'), async (req, res, next) => {
  try {
    // req.body may be undefined when the request was not multipart.
    const { filename, projectId, binId, tags, contentType } = req.body || {};
    if (!filename || !projectId || !req.file) {
      return res.status(400).json({
        error: 'Missing required fields: filename, projectId, and file',
      });
    }

    const assetId = uuidv4();
    const s3Key = `originals/${projectId}/${filename}`;
    // An explicit contentType field wins over the type multer detected.
    const mediaType = contentType || req.file.mimetype;

    // Create asset record. A single tag value is accepted and wrapped into
    // an array.
    const tagsArray = tags ? (Array.isArray(tags) ? tags : [tags]) : [];
    const assetQuery = `
      INSERT INTO assets (
        id, project_id, bin_id, filename, display_name, status,
        media_type, original_s3_key, file_size, tags, created_at, updated_at
      )
      VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, NOW(), NOW())
    `;
    await pool.query(assetQuery, [
      assetId,
      projectId,
      binId || null,
      filename,
      filename, // display_name starts out equal to the filename
      'ingesting',
      mediaType,
      s3Key,
      req.file.size,
      tagsArray.length > 0 ? JSON.stringify(tagsArray) : null,
    ]);

    // Upload to S3. If the upload fails, remove the row inserted above so it
    // is not left behind in 'ingesting' with no object to process. Only the
    // row is removed: the key is derived from the filename, so an object at
    // that key may belong to another asset.
    try {
      await uploadStream(s3Key, req.file.buffer, mediaType);
    } catch (uploadErr) {
      try {
        await pool.query('DELETE FROM assets WHERE id = $1', [assetId]);
      } catch (cleanupErr) {
        // Keep the original upload error as the one reported to the client.
        console.error(`[upload] failed to remove asset ${assetId} after S3 upload failure:`, cleanupErr);
      }
      throw uploadErr;
    }

    // Update asset status to processing
    const updateQuery = `
      UPDATE assets
      SET status = 'processing', updated_at = NOW()
      WHERE id = $1
      RETURNING *
    `;
    const result = await pool.query(updateQuery, [assetId]);
    const asset = result.rows[0];

    // Queue proxy and thumbnail generation. The thumbnail job reads the
    // proxy job's output key.
    await proxyQueue.add('generate', {
      assetId,
      inputKey: s3Key,
      outputKey: `proxies/${assetId}.mp4`,
    });
    await thumbnailQueue.add('generate', {
      assetId,
      inputKey: `proxies/${assetId}.mp4`,
      outputKey: `thumbnails/${assetId}.jpg`,
    });

    // Sync AMPP folder structure — deliberately not awaited; syncToAmpp
    // catches and logs its own errors so it cannot fail the upload.
    void syncToAmpp(assetId, projectId, binId || null);

    res.json(asset);
  } catch (err) {
    next(err);
  }
});
export default router;