import express from 'express';
import multer from 'multer';
import fs from 'fs';
import os from 'os';
import path from 'path';
import { Queue } from 'bullmq';
import { v4 as uuidv4 } from 'uuid';
import pool from '../db/pool.js';
import { s3Client, uploadStream, deleteObject, getS3Bucket } from '../s3/client.js';
import {
  CreateMultipartUploadCommand,
  UploadPartCommand,
  CompleteMultipartUploadCommand,
  AbortMultipartUploadCommand,
} from '@aws-sdk/client-s3';
import { getAmppConfig, ensureFolderPath } from '../ampp/client.js';
import { assertProjectAccess, accessibleProjectIds } from '../auth/authz.js';

const router = express.Router();

// Issue #120 — was multer.memoryStorage(): a 500 MB part stayed pinned in
// RAM per concurrent upload, OOM'ing the API under load. Disk storage in
// /tmp (or UPLOAD_TMP_DIR override) keeps the API memory footprint flat
// and is just as fast to stream back out to S3.
const UPLOAD_TMP_DIR = process.env.UPLOAD_TMP_DIR || path.join(os.tmpdir(), 'df-uploads');

// Creating the directory stays best-effort (module load must not crash), but
// the failure is logged: otherwise a bad UPLOAD_TMP_DIR only shows up later as
// an opaque error from multer on the first upload.
try {
  fs.mkdirSync(UPLOAD_TMP_DIR, { recursive: true });
} catch (err) {
  console.warn(`[upload] could not create tmp dir ${UPLOAD_TMP_DIR}: ${err.message}`);
}

// Each incoming file is written to UPLOAD_TMP_DIR under a random name, so the
// client-supplied filename never reaches the local filesystem.
const diskStorage = multer.diskStorage({
  destination: (_req, _file, cb) => cb(null, UPLOAD_TMP_DIR),
  filename: (_req, _file, cb) => cb(null, `part-${uuidv4()}`),
});

// 500 MB cap per uploaded file (one multipart part, or one /simple upload).
const upload = multer({
  storage: diskStorage,
  limits: { fileSize: 500 * 1024 * 1024 },
});

// Best-effort cleanup of an uploaded tmp part. Never throws.
function unlinkPart(p) {
  if (!p) return;
  // Callback form so a failed unlink can never throw or reject into a request
  // handler. ENOENT is expected when a handler's early-return cleanup and its
  // `finally` both unlink the same part, so only other failures are logged.
  fs.unlink(p, (err) => {
    if (err && err.code !== 'ENOENT') {
      console.warn(`[upload] failed to remove tmp part ${p}: ${err.message}`);
    }
  });
}

/**
 * Translate a redis:// or rediss:// URL into connection options for the queue.
 *
 * Always returns `host` and `port` (the port falls back to 6379 when the URL
 * omits it, instead of NaN). `username`, `password`, `db` and `tls` are only
 * added when the URL carries them, so a plain `redis://host:port` URL yields
 * exactly `{ host, port }`.
 *
 * @param {string} url - Redis connection URL.
 * @returns {{host: string, port: number, username?: string, password?: string, db?: number, tls?: object}}
 * @throws {TypeError} if `url` is not a parseable URL.
 */
const parseRedisUrl = (url) => {
  // Credentials arrive percent-encoded; a stray '%' must not crash startup.
  const decode = (value) => {
    try {
      return decodeURIComponent(value);
    } catch {
      return value;
    }
  };

  const parsed = new URL(url);
  const port = parseInt(parsed.port, 10);
  const connection = {
    host: parsed.hostname,
    port: Number.isNaN(port) ? 6379 : port,
  };

  if (parsed.username) connection.username = decode(parsed.username);
  if (parsed.password) connection.password = decode(parsed.password);

  // A path such as "/2" selects the logical database index.
  const db = parseInt(parsed.pathname.slice(1), 10);
  if (!Number.isNaN(db)) connection.db = db;

  // rediss:// means TLS; an empty options object enables it with defaults.
  if (parsed.protocol === 'rediss:') connection.tls = {};

  return connection;
};

// Only proxy queue needed here — proxy worker dispatches thumbnail once done
const proxyQueue = new Queue('proxy', {
  connection: parseRedisUrl(process.env.REDIS_URL || 'redis://queue:6379'),
});

// ---------------------------------------------------------------
// AMPP Sync Helpers
// ---------------------------------------------------------------

/**
 * Walk from a bin up through its `parent_id` chain and return the bin names
 * ordered root-first.
 *
 * The walk stops at the first id that has no row, at a NULL parent, or when an
 * id repeats — a corrupt parent cycle would otherwise loop forever.
 *
 * @param {string} binId - id of the innermost bin.
 * @returns {Promise<string[]>} bin names from outermost to innermost.
 */
async function resolveBinPath(binId) {
  const segments = [];
  const visited = new Set();
  let currentId = binId;

  while (currentId && !visited.has(currentId)) {
    visited.add(currentId);
    const result = await pool.query(
      'SELECT id, name, parent_id FROM bins WHERE id = $1',
      [currentId]
    );
    if (result.rows.length === 0) break;

    const bin = result.rows[0];
    segments.unshift(bin.name);
    currentId = bin.parent_id;
  }

  return segments;
}

/**
 * Issue #77 — mirror asset's project/bin path into AMPP folder hierarchy
 * and track the sync state on the asset row so the scheduler can retry
 * failed rows on a backoff schedule. Still safe to call fire-and-forget
 * from upload endpoints; the caller never sees an exception, but the
 * failure is now persisted instead of swallowed.
 *
 * Outcomes written to the asset row:
 *  - no AMPP config   → ampp_sync_status = 'disabled'
 *  - folder resolved  → ampp_sync_status = 'synced', attempts reset to 0
 *  - any error        → ampp_sync_status = 'failed', attempts + 1, next attempt scheduled
 * If the project row is missing the function returns without touching the asset.
 *
 * @param {string} assetId - asset row to update.
 * @param {string} projectId - project whose name is the first folder segment.
 * @param {string|null} binId - optional bin; its ancestor chain is appended.
 * @returns {Promise<void>}
 */
export async function syncToAmpp(assetId, projectId, binId) {
  try {
    const config = await getAmppConfig();
    if (!config) {
      await pool.query(
        `UPDATE assets
            SET ampp_sync_status = 'disabled',
                ampp_sync_last_error = NULL,
                ampp_sync_next_attempt_at = NULL
          WHERE id = $1`,
        [assetId]
      );
      return;
    }

    const projResult = await pool.query(
      'SELECT name FROM projects WHERE id = $1',
      [projectId]
    );
    if (projResult.rows.length === 0) return;

    const projectName = projResult.rows[0].name;
    const segments = [projectName];
    if (binId) {
      const binSegments = await resolveBinPath(binId);
      segments.push(...binSegments);
    }

    const folderId = await ensureFolderPath(config, segments);
    if (!folderId) throw new Error('ensureFolderPath returned no id');

    await pool.query(
      `UPDATE assets
          SET ampp_folder_id = $1,
              ampp_synced_at = NOW(),
              ampp_sync_status = 'synced',
              ampp_sync_attempts = 0,
              ampp_sync_next_attempt_at = NULL,
              ampp_sync_last_error = NULL
        WHERE id = $2`,
      [folderId, assetId]
    );
    console.log(`[AMPP] asset ${assetId} → folder ${folderId} (${segments.join(' / ')})`);
  } catch (err) {
    // `throw undefined` / `throw null` must not blow up the handler itself.
    const msg = ((err && err.message) || String(err)).slice(0, 500);

    // Persist the failure so the scheduler retries. The delay is linear, not
    // exponential: 2 minutes per attempt (counting this one), capped at 8
    // attempts' worth, i.e. at most 16 minutes.
    try {
      await pool.query(
        `UPDATE assets
            SET ampp_sync_status = 'failed',
                ampp_sync_attempts = ampp_sync_attempts + 1,
                ampp_sync_last_error = $2,
                ampp_sync_next_attempt_at =
                  NOW() + (LEAST(ampp_sync_attempts + 1, 8) * INTERVAL '2 minutes')
          WHERE id = $1`,
        [assetId, msg]
      );
    } catch (persistErr) {
      // Still never rethrown (callers rely on that), but no longer invisible.
      console.error(
        `[AMPP] could not persist sync failure for asset ${assetId}: ${(persistErr && persistErr.message) || persistErr}`
      );
    }
    console.error(`[AMPP] sync failed for asset ${assetId}: ${msg}`);
  }
}

/**
 * Derive a media_type string from a MIME type.
 *
 * Matching is case-insensitive and ignores surrounding whitespace. Anything
 * that is not a string, or does not start with video/audio/image, maps to
 * 'document' rather than throwing (the value comes from the request).
 *
 * @param {string} [mime=''] - MIME type such as "video/mp4".
 * @returns {'video'|'audio'|'image'|'document'}
 */
function mediaTypeFromMime(mime = '') {
  const normalized = typeof mime === 'string' ? mime.trim().toLowerCase() : '';
  if (normalized.startsWith('video')) return 'video';
  if (normalized.startsWith('audio')) return 'audio';
  if (normalized.startsWith('image')) return 'image';
  return 'document';
}

// GET /api/v1/upload - List in-progress uploads (#68).
// Scoped to projects the caller can see — admins are unfiltered; a scoped
// viewer/editor only sees uploads for projects they have access to (no
// enumeration of other projects' in-flight filenames).
router.get('/', async (req, res, next) => {
  try {
    const access = await accessibleProjectIds(req.user);
    let query = `SELECT id, filename, display_name, project_id, bin_id, status, created_at, updated_at
                   FROM assets
                  WHERE status = 'ingesting'`;
    const params = [];
    if (!access.all) {
      // No visible projects: skip the query rather than match on an empty array.
      if (access.ids.size === 0) return res.json([]);
      query += ` AND project_id = ANY($1::uuid[])`;
      params.push([...access.ids]);
    }
    query += ` ORDER BY created_at DESC LIMIT 50`;
    const result = await pool.query(query, params);
    res.json(result.rows);
  } catch (err) {
    next(err);
  }
});

// S3 multipart uploads accept part numbers 1 through 10,000.
const MAX_S3_PART_NUMBER = 10000;

/**
 * Parse a client-supplied part number.
 * @param {*} value - raw value from the request body (string or number).
 * @returns {number|null} the integer part number, or null when it is not an
 *   integer in the range 1..MAX_S3_PART_NUMBER.
 */
function parsePartNumber(value) {
  const n = Number(value);
  return Number.isInteger(n) && n >= 1 && n <= MAX_S3_PART_NUMBER ? n : null;
}

/**
 * Convert the client's parts list into the shape CompleteMultipartUpload
 * expects: `{ ETag, PartNumber }` entries in ascending part-number order.
 * Accepts either casing for both fields, as the original handler did.
 * @param {Array<object>} parts
 * @returns {Array<{ETag: string, PartNumber: number}>|null} null when the list
 *   is empty or any entry lacks an ETag or a valid part number.
 */
function normalizeParts(parts) {
  if (parts.length === 0) return null;
  const normalized = [];
  for (const p of parts) {
    const ETag = p && (p.ETag || p.etag);
    const PartNumber = parsePartNumber(p && (p.partNumber || p.PartNumber));
    if (!ETag || PartNumber === null) return null;
    normalized.push({ ETag, PartNumber });
  }
  return normalized.sort((a, b) => a.PartNumber - b.PartNumber);
}

/**
 * Find the asset row behind an in-flight upload and check the caller may edit
 * its project.
 *
 * /part, /complete and /abort receive `key`/`assetId` straight from the client;
 * without this check any logged-in user could write parts into, finalize, or
 * delete another project's upload. The row is looked up by `assetId` when one
 * is supplied, otherwise by `original_s3_key`; in both cases the stored key
 * must equal the supplied `key`.
 *
 * @param {object} user - req.user.
 * @param {{assetId?: string, key: string}} ref
 * @returns {Promise<{asset?: object, failure?: {status: number, error: string}}>}
 *   `asset` on success, otherwise an HTTP status + message for the response.
 * @throws whatever assertProjectAccess throws on denial (assumed to throw, as
 *   /init relies on — confirm against auth/authz.js).
 */
async function loadUploadAsset(user, { assetId, key }) {
  const columns = 'id, project_id, bin_id, status, original_s3_key';
  const result = assetId
    ? await pool.query(`SELECT ${columns} FROM assets WHERE id = $1`, [assetId])
    : await pool.query(`SELECT ${columns} FROM assets WHERE original_s3_key = $1`, [key]);

  const asset = result.rows[0];
  if (!asset || asset.original_s3_key !== key) {
    return { failure: { status: 404, error: 'Asset not found' } };
  }

  await assertProjectAccess(user, asset.project_id, 'edit');

  if (asset.status !== 'ingesting') {
    return { failure: { status: 409, error: 'Upload is no longer in progress' } };
  }
  return { asset };
}

/**
 * Remove an asset row whose upload never reached S3, so it does not linger in
 * the in-progress list. Only deletes rows still in 'ingesting'. Never throws.
 * @param {string} assetId
 * @returns {Promise<void>}
 */
async function discardIngestingAsset(assetId) {
  try {
    await pool.query(`DELETE FROM assets WHERE id = $1 AND status = 'ingesting'`, [assetId]);
  } catch (cleanupErr) {
    console.error(`[upload] could not remove orphaned asset ${assetId}: ${cleanupErr.message}`);
  }
}

// POST /api/v1/upload/init - Initialize a multipart upload
router.post('/init', async (req, res, next) => {
  try {
    const { filename, fileSize, contentType, projectId, binId, tags } = req.body;
    if (!filename || !fileSize || !contentType || !projectId) {
      return res.status(400).json({
        error: 'Missing required fields: filename, fileSize, contentType, projectId',
      });
    }

    // Uploading creates an asset under a project — require edit on that project.
    // Without this, any logged-in user could write into any project.
    await assertProjectAccess(req.user, projectId, 'edit');

    if (binId) {
      const bin = await pool.query('SELECT project_id FROM bins WHERE id = $1', [binId]);
      if (bin.rows.length === 0) return res.status(400).json({ error: 'binId not found' });
      if (bin.rows[0].project_id !== projectId) {
        return res.status(400).json({ error: 'binId belongs to a different project' });
      }
    }

    const assetId = uuidv4();
    const s3Key = `originals/${assetId}/${filename}`;
    const tagsArray = tags ? (Array.isArray(tags) ? tags : [tags]) : [];

    await pool.query(
      `INSERT INTO assets (
         id, project_id, bin_id, filename, display_name, status, media_type,
         original_s3_key, file_size, tags, created_at, updated_at
       ) VALUES ($1,$2,$3,$4,$4,'ingesting',$5,$6,$7,$8,NOW(),NOW())`,
      [
        assetId,
        projectId,
        binId || null,
        filename,
        mediaTypeFromMime(contentType),
        s3Key,
        fileSize,
        tagsArray.length > 0 ? tagsArray : null,
      ]
    );

    let multipartUpload;
    try {
      multipartUpload = await s3Client.send(
        new CreateMultipartUploadCommand({
          Bucket: getS3Bucket(),
          Key: s3Key,
          ContentType: contentType,
        })
      );
    } catch (err) {
      // The row was inserted first; without this an S3 failure leaves a
      // permanent 'ingesting' asset that no client holds an uploadId for.
      await discardIngestingAsset(assetId);
      throw err;
    }

    res.json({
      assetId,
      uploadId: multipartUpload.UploadId,
      key: s3Key,
    });
  } catch (err) {
    next(err);
  }
});

// POST /api/v1/upload/part - Upload a single part
router.post('/part', upload.single('file'), async (req, res, next) => {
  const tmpPath = req.file && req.file.path;
  try {
    const { uploadId, key, partNumber } = req.body;
    if (!uploadId || !key || !partNumber || !req.file) {
      return res.status(400).json({
        error: 'Missing required fields: uploadId, key, partNumber, and file',
      });
    }

    const partNo = parsePartNumber(partNumber);
    if (partNo === null) {
      return res.status(400).json({
        error: `partNumber must be an integer between 1 and ${MAX_S3_PART_NUMBER}`,
      });
    }

    const { failure } = await loadUploadAsset(req.user, { key });
    if (failure) return res.status(failure.status).json({ error: failure.error });

    // Stream the on-disk part to S3 instead of buffering in RAM (#120).
    const partUpload = await s3Client.send(
      new UploadPartCommand({
        Bucket: getS3Bucket(),
        Key: key,
        PartNumber: partNo,
        UploadId: uploadId,
        Body: fs.createReadStream(tmpPath),
        ContentLength: req.file.size,
      })
    );

    res.json({
      partNumber: partNo,
      etag: partUpload.ETag,
    });
  } catch (err) {
    next(err);
  } finally {
    // Runs on every exit path, including the early 4xx returns above.
    unlinkPart(tmpPath);
  }
});

// POST /api/v1/upload/complete - Complete the multipart upload
router.post('/complete', async (req, res, next) => {
  try {
    const { uploadId, key, assetId, parts } = req.body;
    if (!uploadId || !key || !assetId || !parts || !Array.isArray(parts)) {
      return res.status(400).json({
        error: 'Missing required fields: uploadId, key, assetId, and parts array',
      });
    }

    const s3Parts = normalizeParts(parts);
    if (!s3Parts) {
      return res.status(400).json({
        error: 'parts must be a non-empty array of entries with an etag and a valid partNumber',
      });
    }

    // Verify ownership and state BEFORE finalizing the object in S3, so an
    // unknown or foreign assetId can no longer complete someone's upload.
    const { failure } = await loadUploadAsset(req.user, { assetId, key });
    if (failure) return res.status(failure.status).json({ error: failure.error });

    await s3Client.send(
      new CompleteMultipartUploadCommand({
        Bucket: getS3Bucket(),
        Key: key,
        UploadId: uploadId,
        MultipartUpload: { Parts: s3Parts },
      })
    );

    const result = await pool.query(
      `UPDATE assets SET status = 'processing', updated_at = NOW() WHERE id = $1 RETURNING *`,
      [assetId]
    );
    // Only reachable if the row was deleted between the check and the update.
    if (result.rows.length === 0) {
      return res.status(404).json({ error: 'Asset not found' });
    }

    const asset = result.rows[0];
    await proxyQueue.add('generate', {
      assetId,
      inputKey: key,
      outputKey: `proxies/${assetId}.mp4`,
    });

    // Await AMPP sync to catch errors; failures are logged but non-fatal
    await syncToAmpp(asset.id, asset.project_id, asset.bin_id).catch(err => {
      console.error(`AMPP sync failed for asset ${asset.id}:`, err);
    });

    res.json(asset);
  } catch (err) {
    next(err);
  }
});

// POST /api/v1/upload/abort - Abort upload
router.post('/abort', async (req, res, next) => {
  try {
    const { uploadId, key, assetId } = req.body;
    if (!uploadId || !key || !assetId) {
      return res.status(400).json({
        error: 'Missing required fields: uploadId, key, assetId',
      });
    }

    // The DELETE below used to trust assetId blindly: a caller could abort
    // their own upload while naming another asset's id and delete that row.
    const { failure } = await loadUploadAsset(req.user, { assetId, key });
    if (failure) return res.status(failure.status).json({ error: failure.error });

    await s3Client.send(
      new AbortMultipartUploadCommand({ Bucket: getS3Bucket(), Key: key, UploadId: uploadId })
    );

    // Status guard: never delete an asset that has already left 'ingesting'.
    await pool.query(`DELETE FROM assets WHERE id = $1 AND status = 'ingesting'`, [assetId]);
    res.json({ message: 'Upload aborted and asset deleted' });
  } catch (err) {
    next(err);
  }
});

// POST /api/v1/upload/simple - Single-file upload for smaller files (<50 MB)
router.post('/simple', upload.single('file'), async (req, res, next) => {
  const tmpPath = req.file && req.file.path;
  try {
    const { filename, projectId, binId, tags, contentType } = req.body;
    if (!filename || !projectId || !req.file) {
      return res.status(400).json({
        error: 'Missing required fields: filename, projectId, and file',
      });
    }

    // Same authz gate as /init.
    await assertProjectAccess(req.user, projectId, 'edit');

    if (binId) {
      const bin = await pool.query('SELECT project_id FROM bins WHERE id = $1', [binId]);
      if (bin.rows.length === 0) {
        return res.status(400).json({ error: 'binId not found' });
      }
      if (bin.rows[0].project_id !== projectId) {
        return res.status(400).json({ error: 'binId belongs to a different project' });
      }
    }

    const assetId = uuidv4();
    const s3Key = `originals/${assetId}/${filename}`;
    const mimeType = contentType || req.file.mimetype;
    const tagsArray = tags ? (Array.isArray(tags) ? tags : [tags]) : [];

    await pool.query(
      `INSERT INTO assets (
         id, project_id, bin_id, filename, display_name, status, media_type,
         original_s3_key, file_size, tags, created_at, updated_at
       ) VALUES ($1,$2,$3,$4,$4,'ingesting',$5,$6,$7,$8,NOW(),NOW())`,
      [
        assetId,
        projectId,
        binId || null,
        filename,
        mediaTypeFromMime(mimeType),
        s3Key,
        req.file.size,
        tagsArray.length > 0 ? tagsArray : null,
      ]
    );

    // Stream the on-disk upload directly to S3 (#120).
    try {
      await uploadStream(s3Key, fs.createReadStream(tmpPath), mimeType);
    } catch (err) {
      // Nothing reached S3: drop the row instead of leaving it 'ingesting'.
      await discardIngestingAsset(assetId);
      throw err;
    }

    const result = await pool.query(
      `UPDATE assets SET status = 'processing', updated_at = NOW() WHERE id = $1 RETURNING *`,
      [assetId]
    );
    const asset = result.rows[0];

    await proxyQueue.add('generate', {
      assetId,
      inputKey: s3Key,
      outputKey: `proxies/${assetId}.mp4`,
    });

    // Await AMPP sync to catch errors; failures are logged but non-fatal
    await syncToAmpp(assetId, projectId, binId || null).catch(err => {
      console.error(`AMPP sync failed for asset ${assetId}:`, err);
    });

    res.json(asset);
  } catch (err) {
    next(err);
  } finally {
    // Runs on every exit path, including the early 4xx returns above.
    unlinkPart(tmpPath);
  }
});

export default router;