Frontend / UX / a11y - Sidebar collapse/expand toggle with localStorage persistence (#142) - Settings sections wrap inputs in <form> with Enter-to-submit + native validation; password autocomplete=new-password (#141, #138) - Asset thumbnails get descriptive alt text (#140) - Production deploy now precompiles JSX via esbuild and loads the production React UMD instead of dev builds + in-browser Babel (#139, #122) - Search wrapper gets role=search; global search input gets aria-label, role=combobox, aria-controls/aria-expanded/aria-activedescendant wiring (#137, #135) - Dashboard and Library no longer share the same nav icon (#136) - Sidebar collapses off-canvas with a topbar menu button below 768 px; mobile default is collapsed (#134) - --text-3 bumped to #8B92A0 for WCAG AA contrast on --bg-0 (#133) - Schedule and Library routes were rendering empty inside the .main flex container — switched to flex:1 + min-height:0 (#131, #132, editor + asset detail get the same fix) - Jobs nav badge now polls /jobs?status=active every 10 s and reflects the live count (#130, #113) - aria-label sweep on every icon-only button (#126) - Premiere panel release list moved to window.PREMIERE_RELEASES in data.jsx; Editor + Settings read from the same source (#125) - Typo setPgMclips → setPgmClips (#124) - Stray console.error / console.warn calls gated behind window.DF_LOG.{warn,error} (#123) - Hardcoded /api/v1 paths route through window.ZAMPP_API_PREFIX (#115) - Schedule rows no longer crash on null recorder_id (#117) - EditorKeyboard guards against document.activeElement === null (#116) - Unmount-safe timers for PasswordResetModal, Containers, Editor (#111) - Player seek clamps below totalMs, server-side range clamping + uncached 416 on EOF, client-side EOF-stall watchdog (#143) - Duration badge overlap fix on narrow asset cards (#52) Backend / security / reliability - GET /recorders fixed N+1: single LATERAL JOIN for live_asset_id; Docker inspects bounded to actually-recording rows 
(#121) - Upload disk-storage (multer.diskStorage) streams parts to S3 instead of buffering 500 MB in RAM (#120) - /assets list clamps limit to MAX_LIMIT=500 to prevent OOM (#119) - SDK upload archive listing + post-extract sanitize block zip-slip / tar-slip and symlink escapes (#118) - Migrations track applied state in schema_migrations, run in a transaction, and exit non-zero on failure (#107) - node-agent BMD_COUNT override uses BMD_DEVICE_PREFIX; filesystem detection wins (#109, #127) - GPU_COUNT override now merges with nvidia-smi enrichment (#108) - /cluster/heartbeat requires a node-bound token or admin user; tokens carry bound_hostname (#106) - /recorders/:id/start error responses no longer echo the Docker create payload — env vars stay out of client responses (#105) - /recorders/probe restricts schemes (srt/rtmp/rtsp/udp/rtp), blocks private + loopback hosts for non-admins, denies common service ports (#104) - Scheduler tick guarded by a Postgres advisory lock; pending/running rows claimed via UPDATE...RETURNING + FOR UPDATE SKIP LOCKED to survive multi-node deploys (#103) - UUID validateUuid('id') param middleware on every /:id route (#102) - Error handler scrubs Postgres error messages and 5xx detail (#101) - Graceful SIGTERM/SIGINT shutdown — stops scheduler, drains the HTTP server, ends the pool, 25 s force-exit watchdog (#100) - AMPP sync moved from fire-and-forget to a persisted retry queue (ampp_sync_status / attempts / next_attempt_at + scheduler retry loop with exponential backoff) (#77) Migrations - 019: api_tokens.bound_hostname (#106) - 020: assets.ampp_sync_status + retry bookkeeping (#77) Other - Defer #92 Growing-files per-upload toggle, #80 Audio tab, #57 Dashboard redesign, #56 Editor SPA polish phase 3, #114 S3 migration tool to v1.3
378 lines
11 KiB
JavaScript
378 lines
11 KiB
JavaScript
import express from 'express';
|
|
import multer from 'multer';
|
|
import fs from 'fs';
|
|
import os from 'os';
|
|
import path from 'path';
|
|
import { Queue } from 'bullmq';
|
|
import { v4 as uuidv4 } from 'uuid';
|
|
import pool from '../db/pool.js';
|
|
import { s3Client, uploadStream, deleteObject, getS3Bucket } from '../s3/client.js';
|
|
import {
|
|
CreateMultipartUploadCommand,
|
|
UploadPartCommand,
|
|
CompleteMultipartUploadCommand,
|
|
AbortMultipartUploadCommand,
|
|
} from '@aws-sdk/client-s3';
|
|
import { getAmppConfig, ensureFolderPath } from '../ampp/client.js';
|
|
|
|
const router = express.Router();
|
|
|
|
// Issue #120 — was multer.memoryStorage(): a 500 MB part stayed pinned in
// RAM per concurrent upload, OOM'ing the API under load. Disk storage in
// /tmp (or UPLOAD_TMP_DIR override) keeps the API memory footprint flat
// and is just as fast to stream back out to S3.
const UPLOAD_TMP_DIR = process.env.UPLOAD_TMP_DIR || path.join(os.tmpdir(), 'df-uploads');

// Create the directory eagerly so a misconfiguration shows up at boot. The
// failure is logged rather than thrown: the destination callback below
// retries on every upload, so a transient problem does not kill the API.
try {
  fs.mkdirSync(UPLOAD_TMP_DIR, { recursive: true });
} catch (err) {
  console.error(`[upload] could not create tmp dir ${UPLOAD_TMP_DIR}: ${err.message}`);
}

const diskStorage = multer.diskStorage({
  // Re-ensure the directory per upload: tmp cleaners may remove it while the
  // process is running. A recursive mkdir on an existing directory succeeds
  // without error; a real failure is handed to multer through the callback.
  destination: (_req, _file, cb) => {
    fs.mkdir(UPLOAD_TMP_DIR, { recursive: true }, (err) => cb(err || null, UPLOAD_TMP_DIR));
  },
  // Random names so concurrent parts never collide on disk.
  filename: (_req, _file, cb) => cb(null, `part-${uuidv4()}`),
});

// 500 MB cap per uploaded file (applies to both /part and /simple).
const upload = multer({ storage: diskStorage, limits: { fileSize: 500 * 1024 * 1024 } });
|
|
|
|
// Best-effort cleanup of an uploaded tmp part. Never throws.
//
// `p` is the on-disk path of the part; a falsy value is a no-op so callers
// can pass `req.file && req.file.path` unconditionally. An already-missing
// file (ENOENT) is treated as success; any other failure is logged so tmp
// files that could not be removed do not accumulate unnoticed.
function unlinkPart(p) {
  if (!p) return;
  try {
    fs.unlink(p, (err) => {
      if (err && err.code !== 'ENOENT') {
        console.warn(`[upload] failed to remove tmp part ${p}: ${err.message}`);
      }
    });
  } catch (err) {
    // fs.unlink validates its path argument synchronously and throws on an
    // invalid type; swallow that here to honour the "never throws" contract.
    console.warn(`[upload] failed to remove tmp part: ${err.message}`);
  }
}
|
|
|
|
/**
 * Turn a redis:// or rediss:// URL into a connection-options object.
 *
 * Always returns `host` and `port` (6379 when the URL omits the port).
 * `username`, `password`, `db` and `tls` are added only when the URL
 * carries them, so a plain `redis://host:port` URL yields exactly
 * `{ host, port }`.
 *
 * @param {string} url - Redis connection URL.
 * @returns {{host: string, port: number, username?: string, password?: string, db?: number, tls?: object}}
 * @throws {TypeError} when `url` is not a parseable URL.
 */
const parseRedisUrl = (url) => {
  const parsed = new URL(url);

  // Userinfo is percent-encoded in a URL; fall back to the raw text when the
  // encoding is malformed rather than failing the whole parse.
  const decode = (value) => {
    try {
      return decodeURIComponent(value);
    } catch {
      return value;
    }
  };

  const connection = {
    host: parsed.hostname,
    // parsed.port is '' when absent; parseInt('') would give NaN.
    port: parsed.port ? parseInt(parsed.port, 10) : 6379,
  };
  if (parsed.username) connection.username = decode(parsed.username);
  if (parsed.password) connection.password = decode(parsed.password);

  // A numeric path segment selects the logical database: redis://host/2.
  const dbMatch = /^\/(\d+)$/.exec(parsed.pathname);
  if (dbMatch) connection.db = parseInt(dbMatch[1], 10);

  // rediss:// means TLS; an empty options object enables it with defaults.
  if (parsed.protocol === 'rediss:') connection.tls = {};

  return connection;
};
|
|
|
|
// This router enqueues proxy jobs only; the thumbnail job is dispatched by
// the proxy worker once it finishes, so no second queue is opened here.
const redisUrl = process.env.REDIS_URL || 'redis://queue:6379';
const proxyQueue = new Queue('proxy', { connection: parseRedisUrl(redisUrl) });
|
|
|
|
// ---------------------------------------------------------------
|
|
// AMPP Sync Helpers
|
|
// ---------------------------------------------------------------
|
|
|
|
/**
 * Resolve a bin's ancestry into an ordered list of bin names, outermost
 * ancestor first and the bin itself last.
 *
 * Walks `bins.parent_id` one row at a time. The walk stops when a row has no
 * parent, when an id is not found, or when an id repeats — a parent_id cycle
 * would otherwise keep this loop (and its caller) running forever.
 *
 * @param binId - id of the starting bin; a falsy value yields [].
 * @returns {Promise<string[]>} bin names from root to leaf.
 */
async function resolveBinPath(binId) {
  const segments = [];
  const visited = new Set();
  let currentId = binId;

  while (currentId && !visited.has(currentId)) {
    visited.add(currentId);
    const result = await pool.query(
      'SELECT id, name, parent_id FROM bins WHERE id = $1',
      [currentId]
    );
    if (result.rows.length === 0) break;

    const bin = result.rows[0];
    // Prepend: we discover the path leaf-first but return it root-first.
    segments.unshift(bin.name);
    currentId = bin.parent_id;
  }

  return segments;
}
|
|
|
|
/**
 * Issue #77 — mirror an asset's project/bin path into the AMPP folder
 * hierarchy and record the outcome on the asset row so the scheduler can
 * retry failed rows on a backoff schedule.
 *
 * Outcomes written to `assets`:
 *   - 'disabled' when getAmppConfig() returns nothing;
 *   - 'synced'   with ampp_folder_id / ampp_synced_at on success, resetting
 *                the attempt counter and clearing the last error;
 *   - 'failed'   with the error text (truncated to 500 chars), an
 *                incremented attempt counter and the next attempt time on
 *                any error — including a project row that cannot be found.
 *
 * The retry delay doubles with every recorded failure: 2, 4, 8, ... minutes,
 * capped at 256 minutes.
 *
 * Safe to call fire-and-forget from upload endpoints: every error is caught
 * here, so the returned promise does not reject.
 *
 * @param assetId   - id of the assets row to update.
 * @param projectId - id of the project whose name is the top-level folder.
 * @param binId     - id of the containing bin, or null/undefined for none.
 * @returns {Promise<void>}
 */
export async function syncToAmpp(assetId, projectId, binId) {
  try {
    const config = await getAmppConfig();
    if (!config) {
      await pool.query(
        `UPDATE assets SET ampp_sync_status = 'disabled', ampp_sync_last_error = NULL,
         ampp_sync_next_attempt_at = NULL
         WHERE id = $1`,
        [assetId]
      );
      return;
    }

    const projResult = await pool.query(
      'SELECT name FROM projects WHERE id = $1',
      [projectId]
    );
    if (projResult.rows.length === 0) {
      // Throw instead of returning silently so the catch block below records
      // the row as 'failed'; otherwise the asset keeps a stale sync state.
      throw new Error(`project ${projectId} not found`);
    }
    const projectName = projResult.rows[0].name;

    // Folder path = project name followed by the bin ancestry, root first.
    const segments = [projectName];
    if (binId) {
      const binSegments = await resolveBinPath(binId);
      segments.push(...binSegments);
    }

    const folderId = await ensureFolderPath(config, segments);
    if (!folderId) throw new Error('ensureFolderPath returned no id');

    await pool.query(
      `UPDATE assets
       SET ampp_folder_id = $1,
           ampp_synced_at = NOW(),
           ampp_sync_status = 'synced',
           ampp_sync_attempts = 0,
           ampp_sync_next_attempt_at = NULL,
           ampp_sync_last_error = NULL
       WHERE id = $2`,
      [folderId, assetId]
    );

    console.log(`[AMPP] asset ${assetId} → folder ${folderId} (${segments.join(' / ')})`);
  } catch (err) {
    // Persist the failure with exponential backoff so the scheduler retries.
    // `err` may be any thrown value, so do not assume it is an Error.
    const msg = ((err && err.message) || String(err)).slice(0, 500);
    try {
      // On the right-hand side ampp_sync_attempts is the value BEFORE this
      // update, so the first failure waits 2 min * 2^0. The exponent is
      // capped at 7 (256 minutes).
      await pool.query(
        `UPDATE assets
         SET ampp_sync_status = 'failed',
             ampp_sync_attempts = COALESCE(ampp_sync_attempts, 0) + 1,
             ampp_sync_last_error = $2,
             ampp_sync_next_attempt_at = NOW() + (INTERVAL '2 minutes' * POWER(2, LEAST(COALESCE(ampp_sync_attempts, 0), 7)))
         WHERE id = $1`,
        [assetId, msg]
      );
    } catch (persistErr) {
      // Nothing more can be done here; at least leave a trace in the logs.
      const persistMsg = (persistErr && persistErr.message) || String(persistErr);
      console.error(`[AMPP] could not persist sync failure for asset ${assetId}: ${persistMsg}`);
    }
    console.error(`[AMPP] sync failed for asset ${assetId}: ${msg}`);
  }
}
|
|
|
|
/**
 * Derive a media_type string from a MIME type.
 *
 * MIME types are case-insensitive, so the comparison is done on a
 * lower-cased, trimmed copy. Anything that is not a string (null, numbers,
 * objects from a JSON body) maps to 'document' instead of throwing.
 *
 * @param {string} [mime=''] - MIME type such as 'video/mp4'.
 * @returns {'video'|'audio'|'image'|'document'}
 */
function mediaTypeFromMime(mime = '') {
  if (typeof mime !== 'string') return 'document';

  const normalized = mime.trim().toLowerCase();
  for (const type of ['video', 'audio', 'image']) {
    if (normalized.startsWith(type)) return type;
  }
  return 'document';
}
|
|
|
|
// GET /api/v1/upload - List in-progress uploads (#68)
// Responds with the 50 most recently created assets still in 'ingesting'
// status, newest first. Query errors are forwarded to the error middleware.
router.get('/', async (_req, res, next) => {
  const ingestingSql = `SELECT id, filename, display_name, project_id, bin_id, status, created_at, updated_at
       FROM assets
       WHERE status = 'ingesting'
       ORDER BY created_at DESC
       LIMIT 50`;

  try {
    const { rows } = await pool.query(ingestingSql);
    res.json(rows);
  } catch (err) {
    next(err);
  }
});
|
|
|
|
// POST /api/v1/upload/init - Initialize a multipart upload
//
// Body: { filename, fileSize, contentType, projectId, binId?, tags? }
// Inserts an asset row in 'ingesting' status, opens an S3 multipart upload
// under originals/<assetId>/<filename> and responds with
// { assetId, uploadId, key }.
//
// 400 on missing or mistyped fields. If S3 refuses the multipart upload the
// freshly inserted row is removed again, so a failed init does not leave a
// permanent 'ingesting' asset behind.
router.post('/init', async (req, res, next) => {
  try {
    // req.body is undefined when no body was parsed; treat that as empty.
    const { filename, fileSize, contentType, projectId, binId, tags } = req.body || {};

    if (!filename || !fileSize || !contentType || !projectId) {
      return res.status(400).json({
        error: 'Missing required fields: filename, fileSize, contentType, projectId',
      });
    }

    // Reject wrong types up front: they would otherwise surface as a 500
    // from the database or end up stringified inside the S3 key.
    const size = Number(fileSize);
    if (
      typeof filename !== 'string' ||
      typeof contentType !== 'string' ||
      !Number.isSafeInteger(size) ||
      size <= 0
    ) {
      return res.status(400).json({
        error: 'filename and contentType must be strings and fileSize a positive integer',
      });
    }

    const assetId = uuidv4();
    const s3Key = `originals/${assetId}/${filename}`;
    const tagsArray = tags ? (Array.isArray(tags) ? tags : [tags]) : [];

    await pool.query(
      `INSERT INTO assets (
         id, project_id, bin_id, filename, display_name, status,
         media_type, original_s3_key, file_size, tags, created_at, updated_at
       )
       VALUES ($1,$2,$3,$4,$4,'ingesting',$5,$6,$7,$8,NOW(),NOW())`,
      [
        assetId, projectId, binId || null, filename,
        mediaTypeFromMime(contentType),
        s3Key, size,
        tagsArray.length > 0 ? tagsArray : null,
      ]
    );

    let multipartUpload;
    try {
      multipartUpload = await s3Client.send(
        new CreateMultipartUploadCommand({
          Bucket: getS3Bucket(),
          Key: s3Key,
          ContentType: contentType,
        })
      );
    } catch (s3Err) {
      // Roll back the row inserted above; report the original S3 error.
      try {
        await pool.query('DELETE FROM assets WHERE id = $1', [assetId]);
      } catch (cleanupErr) {
        console.error(`[upload] failed to roll back asset ${assetId}: ${cleanupErr.message}`);
      }
      throw s3Err;
    }

    res.json({
      assetId,
      uploadId: multipartUpload.UploadId,
      key: s3Key,
    });
  } catch (err) {
    next(err);
  }
});
|
|
|
|
// POST /api/v1/upload/part - Upload a single part
//
// Multipart form: fields uploadId, key, partNumber plus the binary `file`.
// multer first writes the part to the tmp directory; this handler streams
// it to S3 and responds with { partNumber, etag }.
//
// 400 on missing fields or a partNumber outside 1..10000 (the range S3
// accepts). The tmp file is removed exactly once, in `finally`, on every
// path.
router.post('/part', upload.single('file'), async (req, res, next) => {
  const tmpPath = req.file && req.file.path;
  let body = null;
  try {
    const { uploadId, key, partNumber } = req.body || {};

    if (!uploadId || !key || !partNumber || !req.file) {
      return res.status(400).json({
        error: 'Missing required fields: uploadId, key, partNumber, and file',
      });
    }

    // Validate before calling S3 so a bad value is a client error (400),
    // not a NaN sent to S3 and reported back as a 500.
    const partNo = Number(partNumber);
    if (!Number.isInteger(partNo) || partNo < 1 || partNo > 10000) {
      return res.status(400).json({
        error: 'partNumber must be an integer between 1 and 10000',
      });
    }

    // Stream the on-disk part to S3 instead of buffering in RAM (#120).
    body = fs.createReadStream(tmpPath);
    const partUpload = await s3Client.send(
      new UploadPartCommand({
        Bucket: getS3Bucket(),
        Key: key,
        PartNumber: partNo,
        UploadId: uploadId,
        Body: body,
        ContentLength: req.file.size,
      })
    );

    res.json({
      partNumber: partNo,
      etag: partUpload.ETag,
    });
  } catch (err) {
    next(err);
  } finally {
    // Release the file descriptor if the upload failed midway; destroy()
    // on an already finished stream is a no-op.
    if (body) body.destroy();
    unlinkPart(tmpPath);
  }
});
|
|
|
|
// POST /api/v1/upload/complete - Complete the multipart upload
//
// Body: { uploadId, key, assetId, parts: [{ partNumber|PartNumber, etag|ETag }] }
// Completes the S3 multipart upload, moves the asset to 'processing',
// enqueues the proxy job, runs the AMPP sync and responds with the asset row.
//
// 400 on missing fields or malformed parts; 404 when no asset has both this
// id and this original_s3_key. That check runs BEFORE S3 is touched so a
// client cannot finalize an upload against an asset it does not belong to.
router.post('/complete', async (req, res, next) => {
  try {
    const { uploadId, key, assetId, parts } = req.body || {};

    if (!uploadId || !key || !assetId || !parts || !Array.isArray(parts)) {
      return res.status(400).json({
        error: 'Missing required fields: uploadId, key, assetId, and parts array',
      });
    }

    // Accept both casings from clients and coerce part numbers to integers.
    const s3Parts = parts.map((p) => ({
      ETag: p && (p.ETag || p.etag),
      PartNumber: Number(p && (p.partNumber || p.PartNumber)),
    }));
    const partsValid =
      s3Parts.length > 0 &&
      s3Parts.every((p) => p.ETag && Number.isInteger(p.PartNumber) && p.PartNumber >= 1);
    if (!partsValid) {
      return res.status(400).json({
        error: 'Each part requires an etag and a positive integer partNumber',
      });
    }
    // S3 rejects the request unless parts are listed in ascending order.
    s3Parts.sort((a, b) => a.PartNumber - b.PartNumber);

    const owner = await pool.query(
      'SELECT id FROM assets WHERE id = $1 AND original_s3_key = $2',
      [assetId, key]
    );
    if (owner.rows.length === 0) {
      return res.status(404).json({ error: 'Asset not found' });
    }

    await s3Client.send(
      new CompleteMultipartUploadCommand({
        Bucket: getS3Bucket(),
        Key: key,
        UploadId: uploadId,
        MultipartUpload: { Parts: s3Parts },
      })
    );

    const result = await pool.query(
      `UPDATE assets
       SET status = 'processing', updated_at = NOW()
       WHERE id = $1 AND original_s3_key = $2 RETURNING *`,
      [assetId, key]
    );

    // The row can vanish between the check above and this update.
    if (result.rows.length === 0) {
      return res.status(404).json({ error: 'Asset not found' });
    }

    const asset = result.rows[0];

    await proxyQueue.add('generate', {
      assetId,
      inputKey: key,
      outputKey: `proxies/${assetId}.mp4`,
    });

    // Await AMPP sync to catch errors; failures are logged but non-fatal
    await syncToAmpp(asset.id, asset.project_id, asset.bin_id).catch(err => {
      console.error(`AMPP sync failed for asset ${asset.id}:`, err);
    });

    res.json(asset);
  } catch (err) {
    next(err);
  }
});
|
|
|
|
// POST /api/v1/upload/abort - Abort upload
//
// Body: { uploadId, key, assetId }
// Aborts the S3 multipart upload and deletes the asset row.
//
// Only an asset that is still 'ingesting' AND whose original_s3_key equals
// the supplied key can be removed. Without that guard, any caller holding
// one valid uploadId could delete an arbitrary asset by id. 400 on missing
// fields, 404 when no such in-progress asset exists.
router.post('/abort', async (req, res, next) => {
  try {
    const { uploadId, key, assetId } = req.body || {};

    if (!uploadId || !key || !assetId) {
      return res.status(400).json({
        error: 'Missing required fields: uploadId, key, assetId',
      });
    }

    const owner = await pool.query(
      `SELECT id FROM assets
       WHERE id = $1 AND original_s3_key = $2 AND status = 'ingesting'`,
      [assetId, key]
    );
    if (owner.rows.length === 0) {
      return res.status(404).json({ error: 'Asset not found' });
    }

    await s3Client.send(
      new AbortMultipartUploadCommand({ Bucket: getS3Bucket(), Key: key, UploadId: uploadId })
    );

    // Repeat the guard in the DELETE itself: the row may have changed state
    // while the S3 call was in flight.
    await pool.query(
      `DELETE FROM assets
       WHERE id = $1 AND original_s3_key = $2 AND status = 'ingesting'`,
      [assetId, key]
    );

    res.json({ message: 'Upload aborted and asset deleted' });
  } catch (err) {
    next(err);
  }
});
|
|
|
|
// POST /api/v1/upload/simple - Single-file upload for smaller files (<50 MB)
//
// Multipart form: fields filename, projectId, binId?, tags?, contentType?
// plus the binary `file`. Inserts the asset row ('ingesting'), streams the
// tmp file to S3, moves the row to 'processing', enqueues the proxy job,
// runs the AMPP sync and responds with the asset row.
//
// 400 on missing fields. If the S3 upload fails the inserted row is removed
// again so it does not stay 'ingesting' forever. The tmp file is removed
// exactly once, in `finally`, on every path.
router.post('/simple', upload.single('file'), async (req, res, next) => {
  const tmpPath = req.file && req.file.path;
  let source = null;
  try {
    const { filename, projectId, binId, tags, contentType } = req.body || {};

    if (!filename || !projectId || !req.file) {
      return res.status(400).json({
        error: 'Missing required fields: filename, projectId, and file',
      });
    }

    const assetId = uuidv4();
    const s3Key = `originals/${assetId}/${filename}`;
    // An explicit contentType field wins over the type multer detected.
    const mimeType = contentType || req.file.mimetype;
    const tagsArray = tags ? (Array.isArray(tags) ? tags : [tags]) : [];

    await pool.query(
      `INSERT INTO assets (
         id, project_id, bin_id, filename, display_name, status,
         media_type, original_s3_key, file_size, tags, created_at, updated_at
       )
       VALUES ($1,$2,$3,$4,$4,'ingesting',$5,$6,$7,$8,NOW(),NOW())`,
      [
        assetId, projectId, binId || null, filename,
        mediaTypeFromMime(mimeType),
        s3Key, req.file.size,
        tagsArray.length > 0 ? tagsArray : null,
      ]
    );

    // Stream the on-disk upload directly to S3 (#120).
    source = fs.createReadStream(tmpPath);
    try {
      await uploadStream(s3Key, source, mimeType);
    } catch (uploadErr) {
      // Roll back the row inserted above; report the original upload error.
      try {
        await pool.query('DELETE FROM assets WHERE id = $1', [assetId]);
      } catch (cleanupErr) {
        console.error(`[upload] failed to roll back asset ${assetId}: ${cleanupErr.message}`);
      }
      throw uploadErr;
    }

    const result = await pool.query(
      `UPDATE assets SET status = 'processing', updated_at = NOW()
       WHERE id = $1 RETURNING *`,
      [assetId]
    );

    const asset = result.rows[0];

    await proxyQueue.add('generate', {
      assetId,
      inputKey: s3Key,
      outputKey: `proxies/${assetId}.mp4`,
    });

    // Await AMPP sync to catch errors; failures are logged but non-fatal
    await syncToAmpp(assetId, projectId, binId || null).catch(err => {
      console.error(`AMPP sync failed for asset ${assetId}:`, err);
    });

    res.json(asset);
  } catch (err) {
    next(err);
  } finally {
    // Release the file descriptor if the upload failed midway; destroy()
    // on an already finished stream is a no-op.
    if (source) source.destroy();
    unlinkPart(tmpPath);
  }
});
|
|
|
|
export default router;
|