diff --git a/services/capture/src/index.js b/services/capture/src/index.js index 8a2ae21..2d8e561 100644 --- a/services/capture/src/index.js +++ b/services/capture/src/index.js @@ -151,14 +151,24 @@ async function gracefulShutdown(signal) { } } } else if (completed.growingPath) { - // Growing-files recorder: the master lives on the SMB share as a .ts, - // NOT in S3 yet. The promotion worker (which watches the same share) - // uploads it to S3 and enqueues the proxy from the real, finalized key. - // We must NOT call /finalize here: that sets original_s3_key to a key - // that doesn't exist yet and enqueues a proxy that instantly fails with - // "unable to open the file on disk." Leave the asset 'live' for the - // promotion worker to flip to 'ready'. - console.log(`[shutdown] growing capture finalized on share (${completed.growingPath}); leaving promotion worker to upload + proxy`); + // Growing-files recorder: the master lives on the SMB share. Mark the asset + // as pending_migration so the UI shows it is on SMB and provides a manual + // right-click option to promote it to S3. + console.log(`[shutdown] growing capture finalized on share (${completed.growingPath}); flagging pending_migration`); + try { + const res = await fetch(`${MAM_API_URL}/api/v1/assets/${liveAssetId}/pending-migration`, { + method: 'POST', + headers: { 'Content-Type': 'application/json', ...(MAM_API_TOKEN ? { 'Authorization': `Bearer ${MAM_API_TOKEN}` } : {}) }, + body: JSON.stringify({ duration: completed.duration }), + }); + if (!res.ok) { + console.warn(`[shutdown] mam-api pending-migration returned ${res.status}: ${await res.text()}`); + } else { + console.log('[shutdown] live asset flagged pending_migration with mam-api'); + } + } catch (mamErr) { + console.error('[shutdown] failed to flag pending_migration:', mamErr.message); + } } else if (liveAssetId) { // Finalise the pre-created live asset by id (avoids POST / 409 collision). try { diff --git a/services/capture/src/routes/capture.js b/services/capture/src/routes/capture.js index a299116..3eb73f4 100644 --- a/services/capture/src/routes/capture.js +++ b/services/capture/src/routes/capture.js @@ -398,27 +398,33 @@ router.post('/stop', async (req, res) => { const completedSession = await captureManager.stop(session_id); - // Finalize the pre-created live asset (live -> processing) so the proxy / - // thumbnail job chain kicks off. assetId is set when /start created the live - // asset; guard in case it wasn't (older callers / failed pre-create). + // Finalize the pre-created live asset. + // If it was a growing-file session, we call /pending-migration to flip status + // to 'pending_migration' (on SMB, not S3). Otherwise, we call /finalize to + // kick off the proxy/thumbnail job chain. if (completedSession.assetId) { try { - const mamResponse = await fetch(`${MAM_API_URL}/api/v1/assets/${completedSession.assetId}/finalize`, { + const path = completedSession.growingPath ? 'pending-migration' : 'finalize'; + const body = completedSession.growingPath + ? { duration: completedSession.duration } + : { + hiresKey: completedSession.hiresKey, + proxyKey: completedSession.proxyKey, + needsProxy: completedSession.proxyKey === null, + duration: completedSession.duration, + capturedAt: completedSession.startedAt, + }; + + const mamResponse = await fetch(`${MAM_API_URL}/api/v1/assets/${completedSession.assetId}/${path}`, { method: 'POST', headers: { 'Content-Type': 'application/json' }, - body: JSON.stringify({ - hiresKey: completedSession.hiresKey, - proxyKey: completedSession.proxyKey, - needsProxy: completedSession.proxyKey === null, - duration: completedSession.duration, - capturedAt: completedSession.startedAt, - }), + body: JSON.stringify(body), }); if (!mamResponse.ok) { - console.warn(`MAM API finalize returned ${mamResponse.status}: ${await mamResponse.text()}`); + console.warn(`MAM API ${path} returned ${mamResponse.status}: ${await mamResponse.text()}`); } } catch (mamError) { - console.warn('Failed to finalize asset with MAM API:', mamError.message); + console.warn('Failed to finalize/pending-migrate asset with MAM API:', mamError.message); } } diff --git a/services/mam-api/src/db/migrations/034-add-pending-migration-status.sql b/services/mam-api/src/db/migrations/034-add-pending-migration-status.sql new file mode 100644 index 0000000..9da6f2c --- /dev/null +++ b/services/mam-api/src/db/migrations/034-add-pending-migration-status.sql @@ -0,0 +1,2 @@ +-- 2026-06: add 'pending_migration' to asset_status enum for manual SMB-to-S3 promotion +ALTER TYPE asset_status ADD VALUE 'pending_migration'; diff --git a/services/mam-api/src/routes/assets.js b/services/mam-api/src/routes/assets.js index 73e892a..af83b44 100644 --- a/services/mam-api/src/routes/assets.js +++ b/services/mam-api/src/routes/assets.js @@ -64,6 +64,10 @@ const hlsQueue = new Queue('hls', { connection: parseRedisUrl(process.env.REDIS_URL || 'redis://queue:6379'), }); +const promotionQueue = new Queue('promotion', { + connection: parseRedisUrl(process.env.REDIS_URL || 'redis://queue:6379'), +}); + // GET / - List assets with filtering router.get('/', async (req, res, next) => { try { @@ -508,6 +512,67 @@ router.post('/:id/finalize', requireAssetEdit, async (req, res, next) => { } catch (err) { next(err); } }); +// POST /:id/pending-migration +// Capture sidecar calls this on a SUCCESSFUL growing-file recording stop. +// Flips the asset status from 'live' to 'pending_migration' (on SMB, not S3). +router.post('/:id/pending-migration', requireAssetEdit, async (req, res, next) => { + try { + const { id } = req.params; + const { duration } = req.body; + + const check = await pool.query(`SELECT status FROM assets WHERE id = $1`, [id]); + if (check.rows.length === 0) return res.status(404).json({ error: 'Asset not found' }); + if (check.rows[0].status !== 'live') { + return res.status(200).json({ skipped: true }); + } + + const durationNum = duration !== undefined && duration !== null ? Number(duration) : null; + const durationMs = (durationNum !== null && Number.isFinite(durationNum)) ? Math.round(durationNum * 1000) : null; + + const upd = await pool.query( + `UPDATE assets + SET status = 'pending_migration', + duration_ms = COALESCE($2, duration_ms), + updated_at = NOW() + WHERE id = $1 + RETURNING *`, + [id, durationMs] + ); + + console.log(`[assets] set pending-migration status for asset ${id}`); + res.json(upd.rows[0]); + } catch (err) { next(err); } +}); + +// POST /:id/promote +// Promotes an asset from 'pending_migration' (SMB) to S3. +// Enqueues a 'promotion' job in BullMQ to handle the S3 upload and metadata updates. +router.post('/:id/promote', requireAssetEdit, async (req, res, next) => { + try { + const { id } = req.params; + + const check = await pool.query(`SELECT status FROM assets WHERE id = $1`, [id]); + if (check.rows.length === 0) return res.status(404).json({ error: 'Asset not found' }); + const { status } = check.rows[0]; + + if (status !== 'pending_migration') { + return res.status(400).json({ error: `Asset status is "${status}" — only "pending_migration" assets can be promoted` }); + } + + // Update status to 'processing' so it is locked + await pool.query( + `UPDATE assets SET status = 'processing', updated_at = NOW() WHERE id = $1`, + [id] + ); + + // Queue the promotion job in BullMQ + await promotionQueue.add('promote', { assetId: id }); + console.log(`[assets] queued promotion for asset ${id}`); + + res.json({ ok: true, status: 'processing' }); + } catch (err) { next(err); } +}); + // POST /:id/live-thumbnail — set the poster thumbnail for a still-live asset. // The capture sidecar extracts the first video frame from the first HLS segment // (where the segment physically exists) and uploads it to S3, then calls this to diff --git a/services/web-ui/public/screens-library.jsx b/services/web-ui/public/screens-library.jsx index 1b318d5..f16b67d 100644 --- a/services/web-ui/public/screens-library.jsx +++ b/services/web-ui/public/screens-library.jsx @@ -413,10 +413,10 @@ function Library({ navigate, onOpenAsset, openProject, onClearProject }) {
- {['all', 'ready', 'processing', 'live', 'error'].map(function(f) { + {['all', 'ready', 'processing', 'live', 'error', 'pending_migration'].map(function(f) { return ( ); })} @@ -591,6 +591,16 @@ function AssetContextMenu({ asset, x, y, bins, onClose, onChanged, onOpen, onRen const rename = function() { if (onRename) onRename(asset); else onClose(); }; + const promoteToS3 = function() { + onClose(); + window.ZAMPP_API.fetch('/assets/' + asset.id + '/promote', { method: 'POST' }) + .then(function() { + if (onChanged) onChanged(); + window.alert('Promotion job queued. The file is being uploaded to S3 in the background.'); + }) + .catch(function(e) { alert('Promotion failed: ' + e.message); }); + }; + const moveToBin = function(binId) { onClose(); window.ZAMPP_API.fetch('/assets/' + asset.id, { method: 'PATCH', body: JSON.stringify({ bin_id: binId }) }) @@ -618,6 +628,9 @@ function AssetContextMenu({ asset, x, y, bins, onClose, onChanged, onOpen, onRen {asset.original_s3_key && onDownload && ( )} + {asset.status === 'pending_migration' && ( + + )}
{(bins && bins.length > 0) ? ( <> @@ -722,6 +735,7 @@ function AssetCard({ asset, onOpen, onContextMenu, onDownload, onDragStart, drag {asset.status === 'live' && LIVE} {asset.status === 'processing' && Processing} {asset.status === 'error' && Error} + {asset.status === 'pending_migration' && SMB}
{/* Hi-res download trigger: only shown when the asset has an original_s3_key (everything queued through ingest / conform). diff --git a/services/web-ui/public/visuals.jsx b/services/web-ui/public/visuals.jsx index 5382aa6..7651931 100644 --- a/services/web-ui/public/visuals.jsx +++ b/services/web-ui/public/visuals.jsx @@ -45,6 +45,19 @@ function AssetThumb({ asset, size = 'md' }) { return ; } + if (asset.status === 'pending_migration' && !asset.thumbnail_s3_key && !thumbUrl) { + return ( +
+
+ + Awaiting migration +
+
+ ); + } + const altText = asset.name ? `Thumbnail for ${asset.name}` : 'Asset thumbnail'; return (
@@ -232,6 +245,7 @@ function StatusDot({ status }) { done: { color: 'var(--success)', pulse: false }, failed: { color: 'var(--danger)', pulse: false }, stopped: { color: 'var(--text-4)', pulse: false }, + pending_migration: { color: 'var(--warning)', pulse: false }, }; const s = map[status] || { color: 'var(--text-3)' }; return ; diff --git a/services/worker/src/index.js b/services/worker/src/index.js index 31476a4..ce0c82e 100644 --- a/services/worker/src/index.js +++ b/services/worker/src/index.js @@ -8,7 +8,7 @@ import { youtubeImportWorker, proxyQueue as youtubeProxyQueue } from './workers/ import { trimWorker } from './workers/trimWorker.js'; import { hlsWorker } from './workers/hls.js'; import { playoutStageWorker } from './workers/playout-stage.js'; -import { startPromotionWorker } from './workers/promotion.js'; +import { promotionWorker } from './workers/promotion.js'; const parseRedisUrl = (url) => { const parsed = new URL(url); @@ -98,6 +98,8 @@ const workers = [ // playout-stage = S3 → /media volume + EBU R128 loudnorm. CPU/IO-bound; // colocate with workers that already have ffmpeg + the media mount. want('playout-stage') && createWorker('playout-stage', playoutStageWorker, { concurrency: 1 }), + // promotion = manual growing-files promotion (S3 upload + DB update + queue proxy) + want('promotion') && createWorker('promotion', promotionWorker, { concurrency: 1 }), ].filter(Boolean); console.log(`WORKER_QUEUES=${_wq || '(all)'}`); @@ -106,18 +108,9 @@ export const filmstripQueue = new Queue('filmstrip', { connection: redisOptions console.log(`Concurrency: proxy=${PROXY_CONCURRENCY} thumbnail=${THUMBNAIL_CONCURRENCY} filmstrip=${FILMSTRIP_CONCURRENCY} conform=${CONFORM_CONCURRENCY} trim=${TRIM_CONCURRENCY} import=1`); -// BUG FIX #4: startPromotionWorker() now returns a shutdown function that -// clears the poll intervals and closes the promotion proxyQueue singleton. -// Promotion (growing-files -> S3) is a polling SCAN, not a queue consumer. -// With multiple worker containers it must run on exactly one, or every node -// races the same files. Gate behind RUN_PROMOTION (set true on a single worker). -const stopPromotionWorker = (process.env.RUN_PROMOTION === 'true') ? startPromotionWorker() : null; -if (process.env.RUN_PROMOTION === 'true') console.log('[promotion] scanner ENABLED on this worker'); - console.log('Wild Dragon Worker Service started'); console.log(`Redis: ${redisOptions.host}:${redisOptions.port}`); -console.log('Active queues: proxy, thumbnail, conform, trim, import'); -console.log('Background scans: promotion (growing-files → S3)'); +console.log('Active queues: proxy, thumbnail, conform, trim, import, promotion'); process.on('SIGTERM', async () => { console.log('SIGTERM received, shutting down...'); @@ -136,8 +129,6 @@ process.on('SIGTERM', async () => { proxyThumbnailQueue.close().catch(() => {}), youtubeProxyQueue.close().catch(() => {}), filmstripQueue.close().catch(() => {}), - // BUG FIX #4: Stop the promotion worker intervals and close its proxyQueue - stopPromotionWorker ? stopPromotionWorker() : Promise.resolve(), ]); console.log('All workers and queues closed'); diff --git a/services/worker/src/workers/promotion.js b/services/worker/src/workers/promotion.js index 7ce6f4f..342d410 100644 --- a/services/worker/src/workers/promotion.js +++ b/services/worker/src/workers/promotion.js @@ -1,18 +1,15 @@ import { readdir, stat, unlink, mkdir, writeFile } from 'node:fs/promises'; import { execFileSync } from 'node:child_process'; import { join, relative, basename } from 'node:path'; -import { createReadStream } from 'node:fs'; +import { createReadStream, existsSync } from 'node:fs'; import { Queue } from 'bullmq'; import { query } from '../db/client.js'; import { uploadStreamToS3 } from '../s3/client.js'; const GROWING_PATH = process.env.GROWING_PATH || '/growing'; const S3_BUCKET = process.env.S3_BUCKET || 'wild-dragon'; -const POLL_MS = 5000; const SMB_CREDS_FILE = '/run/promotion-smb-creds'; -// Normalize a Windows / smb:// share path to the //host/share UNC that -// mount.cifs accepts (mirrors services/capture/src/capture-manager.js). function toUncShare(raw) { if (!raw) return ''; let s = String(raw).trim().replace(/\\/g, '/'); @@ -26,12 +23,6 @@ function isMounted(path) { catch { return false; } } -// Mount the growing-files CIFS share at GROWING_PATH so the promotion scanner -// sees the SAME files the capture sidecar writes on the remote node. Without -// this the worker was watching a LOCAL empty /growing and never promoted any -// growing capture — the master never reached S3 and the only proxy that fired -// was the bogus one from capture's finalize call (against a key that doesn't -// exist) → "unable to open the file on disk". Best-effort + idempotent. async function ensureGrowingShareMounted() { const r = await query( `SELECT key, value FROM settings WHERE key = ANY($1)`, @@ -70,12 +61,6 @@ async function ensureGrowingShareMounted() { } } -let inflight = new Set(); -let idleThresholdMs = 8000; - -// Single module-level proxyQueue — avoids creating and closing a new Queue -// connection on every promoted file, which hammers Redis with connect/disconnect -// cycles and leaks connections when close() races with add(). const parseRedisUrl = (url) => { const parsed = new URL(url); return { host: parsed.hostname, port: parseInt(parsed.port, 10) || 6379 }; @@ -85,139 +70,57 @@ const proxyQueue = new Queue('proxy', { connection: parseRedisUrl(process.env.REDIS_URL || 'redis://queue:6379'), }); -// Interval handles — stored so startPromotionWorker() can expose a shutdown -// function for the SIGTERM handler in index.js. -let scanInterval = null; -let thresholdInterval = null; +// BullMQ Worker handler for manual S3 promotion +export const promotionWorker = async (job) => { + const { assetId } = job.data; -async function loadIdleThreshold() { - try { - const r = await query( - `SELECT value FROM settings WHERE key = 'growing_promote_after_seconds'` - ); - const sec = parseInt(r.rows[0]?.value, 10); - if (sec > 0) idleThresholdMs = sec * 1000; - } catch (_) { /* table not migrated yet — keep default */ } -} + // 1. Ensure growing share is mounted + await ensureGrowingShareMounted(); -async function* walk(dir) { - let entries = []; - try { - entries = await readdir(dir, { withFileTypes: true }); - } catch (err) { - console.warn(`[promotion] readdir failed for ${dir}: ${err.message}`); - return; + // 2. Fetch asset details + const r = await query( + 'SELECT id, filename, project_id, status FROM assets WHERE id = $1', + [assetId] + ); + if (r.rows.length === 0) { + throw new Error(`Asset ${assetId} not found in database`); } - for (const e of entries) { - const full = join(dir, e.name); - if (e.isDirectory()) yield* walk(full); - else if (e.isFile()) yield full; + const asset = r.rows[0]; + + // 3. Resolve local path + const localPath = `${GROWING_PATH}/${asset.project_id}/${asset.filename}.mxf`; + if (!existsSync(localPath)) { + throw new Error(`Growing file not found at ${localPath}`); } -} -async function promote(filePath) { - if (inflight.has(filePath)) return; - inflight.add(filePath); - try { - // Reconstruct the S3 key from the relative path under GROWING_PATH. - // Capture writes `${GROWING_PATH}/${projectId}/${clipName}.${ext}`, which - // mirrors `projects/${projectId}/masters/${clipName}.${ext}` in S3. - const rel = relative(GROWING_PATH, filePath); // /. - const [projectId, fileName] = rel.split('/', 2); - if (!projectId || !fileName) return; - const s3Key = `projects/${projectId}/masters/${fileName}`; + const s3Key = `projects/${asset.project_id}/masters/${asset.filename}.mxf`; - // Find the matching live asset by display_name = clipName. - const clipName = basename(fileName, '.' + fileName.split('.').pop()); - const r = await query( - `SELECT id, status FROM assets - WHERE project_id = $1 AND display_name = $2 - ORDER BY created_at DESC LIMIT 1`, - [projectId, clipName] - ); - if (r.rows.length === 0) { - console.warn(`[promotion] no asset row for ${rel} — skipping`); - return; - } - const asset = r.rows[0]; + const st = await stat(localPath); + console.log(`[promotion] promoting asset ${assetId}: uploading ${localPath} (${st.size} bytes) -> s3://${S3_BUCKET}/${s3Key}`); + await uploadStreamToS3(S3_BUCKET, s3Key, createReadStream(localPath)); - // Do not promote while the recorder is STILL RECORDING this session. - // The mtime-idle heuristic is unreliable over CIFS (attribute caching makes - // an actively-growing MXF look "stable"). Gate on the recorder's live - // status: only promote once recording stopped. - const recActive = await query( - `SELECT 1 FROM recorders - WHERE current_session_id = $1 AND status = 'recording' LIMIT 1`, - [clipName] - ); - if (recActive.rows.length > 0) { - return; - } + // 4. Update asset status to ready (with correct S3 key and size) + await query( + `UPDATE assets + SET original_s3_key = $1, + file_size = $2, + status = 'ready', + updated_at = NOW() + WHERE id = $3`, + [s3Key, st.size, assetId] + ); - const st = await stat(filePath); - console.log(`[promotion] uploading ${rel} (${st.size} bytes) -> s3://${S3_BUCKET}/${s3Key}`); - await uploadStreamToS3(S3_BUCKET, s3Key, createReadStream(filePath)); + // 5. Queue proxy generation job + await proxyQueue.add('generate', { + assetId: assetId, + inputKey: s3Key, + outputKey: `proxies/${assetId}.mp4`, + }); - await query( - `UPDATE assets - SET original_s3_key = $1, - file_size = $2, - status = 'ready', - updated_at = NOW() - WHERE id = $3`, - [s3Key, st.size, asset.id] - ); + console.log(`[promotion] asset ${assetId} promoted, proxy queued`); - await proxyQueue.add('generate', { - assetId: asset.id, - inputKey: s3Key, - outputKey: `proxies/${asset.id}.mp4`, - }); - - console.log(`[promotion] asset ${asset.id} promoted, proxy queued`); - - // Unlink the source file from the SMB share — best-effort cleanup only. - // The file is already safely in S3 and the asset is 'ready'. CIFS/SMB - // can return EIO if the share was remounted or the remote node rebooted - // between upload and unlink; log the error but do not fail the job. - await unlink(filePath).catch(err => { - console.warn(`[promotion] could not unlink ${rel} (best-effort, file already in S3): ${err.message}`); - }); - } catch (err) { - console.error('[promotion] failed for', filePath, err); - } finally { - inflight.delete(filePath); - } -} - -async function scan() { - const now = Date.now(); - for await (const file of walk(GROWING_PATH)) { - if (inflight.has(file)) continue; - let st; - try { st = await stat(file); } catch (_) { continue; } - if (now - st.mtimeMs >= idleThresholdMs && st.size > 0) { - await promote(file); - } - } -} - -// Return a shutdown function so index.js can clear the intervals and close the -// queue connection during SIGTERM. -export function startPromotionWorker() { - loadIdleThreshold(); - // Mount the SMB landing zone before the first scan so we watch the SAME share - // the capture sidecars write to (best-effort; falls back to local GROWING_PATH). - ensureGrowingShareMounted().catch((e) => - console.error('[promotion] mount bootstrap failed:', e.message)); - thresholdInterval = setInterval(loadIdleThreshold, 60_000); - scanInterval = setInterval(scan, POLL_MS); - console.log(`[promotion] watching ${GROWING_PATH} (idle threshold ${idleThresholdMs}ms)`); - - return async function stopPromotionWorker() { - if (scanInterval) clearInterval(scanInterval); - if (thresholdInterval) clearInterval(thresholdInterval); - await proxyQueue.close().catch(() => {}); - console.log('[promotion] worker stopped'); - }; -} + // 6. Clean up local file (best-effort) + await unlink(localPath).catch(err => { + console.warn(`[promotion] could not unlink ${localPath} (best-effort, file already in S3): ${err.message}`); + }); +};