feat(promotion): implement manual growing files promotion via BullMQ queue + pending_migration status + right click Move to S3

This commit is contained in:
Zac Gaetano 2026-06-03 00:38:50 +00:00
parent 62b9a90291
commit a04ef2de3a
8 changed files with 183 additions and 178 deletions

View file

@ -151,14 +151,24 @@ async function gracefulShutdown(signal) {
} }
} }
} else if (completed.growingPath) { } else if (completed.growingPath) {
// Growing-files recorder: the master lives on the SMB share as a .ts, // Growing-files recorder: the master lives on the SMB share. Mark the asset
// NOT in S3 yet. The promotion worker (which watches the same share) // as pending_migration so the UI shows it is on SMB and provides a manual
// uploads it to S3 and enqueues the proxy from the real, finalized key. // right-click option to promote it to S3.
// We must NOT call /finalize here: that sets original_s3_key to a key console.log(`[shutdown] growing capture finalized on share (${completed.growingPath}); flagging pending_migration`);
// that doesn't exist yet and enqueues a proxy that instantly fails with try {
// "unable to open the file on disk." Leave the asset 'live' for the const res = await fetch(`${MAM_API_URL}/api/v1/assets/${liveAssetId}/pending-migration`, {
// promotion worker to flip to 'ready'. method: 'POST',
console.log(`[shutdown] growing capture finalized on share (${completed.growingPath}); leaving promotion worker to upload + proxy`); headers: { 'Content-Type': 'application/json', ...(MAM_API_TOKEN ? { 'Authorization': `Bearer ${MAM_API_TOKEN}` } : {}) },
body: JSON.stringify({ duration: completed.duration }),
});
if (!res.ok) {
console.warn(`[shutdown] mam-api pending-migration returned ${res.status}: ${await res.text()}`);
} else {
console.log('[shutdown] live asset flagged pending_migration with mam-api');
}
} catch (mamErr) {
console.error('[shutdown] failed to flag pending_migration:', mamErr.message);
}
} else if (liveAssetId) { } else if (liveAssetId) {
// Finalise the pre-created live asset by id (avoids POST / 409 collision). // Finalise the pre-created live asset by id (avoids POST / 409 collision).
try { try {

View file

@ -398,27 +398,33 @@ router.post('/stop', async (req, res) => {
const completedSession = await captureManager.stop(session_id); const completedSession = await captureManager.stop(session_id);
// Finalize the pre-created live asset (live -> processing) so the proxy / // Finalize the pre-created live asset.
// thumbnail job chain kicks off. assetId is set when /start created the live // If it was a growing-file session, we call /pending-migration to flip status
// asset; guard in case it wasn't (older callers / failed pre-create). // to 'pending_migration' (on SMB, not S3). Otherwise, we call /finalize to
// kick off the proxy/thumbnail job chain.
if (completedSession.assetId) { if (completedSession.assetId) {
try { try {
const mamResponse = await fetch(`${MAM_API_URL}/api/v1/assets/${completedSession.assetId}/finalize`, { const path = completedSession.growingPath ? 'pending-migration' : 'finalize';
const body = completedSession.growingPath
? { duration: completedSession.duration }
: {
hiresKey: completedSession.hiresKey,
proxyKey: completedSession.proxyKey,
needsProxy: completedSession.proxyKey === null,
duration: completedSession.duration,
capturedAt: completedSession.startedAt,
};
const mamResponse = await fetch(`${MAM_API_URL}/api/v1/assets/${completedSession.assetId}/${path}`, {
method: 'POST', method: 'POST',
headers: { 'Content-Type': 'application/json' }, headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ body: JSON.stringify(body),
hiresKey: completedSession.hiresKey,
proxyKey: completedSession.proxyKey,
needsProxy: completedSession.proxyKey === null,
duration: completedSession.duration,
capturedAt: completedSession.startedAt,
}),
}); });
if (!mamResponse.ok) { if (!mamResponse.ok) {
console.warn(`MAM API finalize returned ${mamResponse.status}: ${await mamResponse.text()}`); console.warn(`MAM API ${path} returned ${mamResponse.status}: ${await mamResponse.text()}`);
} }
} catch (mamError) { } catch (mamError) {
console.warn('Failed to finalize asset with MAM API:', mamError.message); console.warn('Failed to finalize/pending-migrate asset with MAM API:', mamError.message);
} }
} }

View file

@ -0,0 +1,2 @@
-- 2026-06: add 'pending_migration' to asset_status enum for manual SMB-to-S3 promotion
ALTER TYPE asset_status ADD VALUE 'pending_migration';

View file

@ -64,6 +64,10 @@ const hlsQueue = new Queue('hls', {
connection: parseRedisUrl(process.env.REDIS_URL || 'redis://queue:6379'), connection: parseRedisUrl(process.env.REDIS_URL || 'redis://queue:6379'),
}); });
const promotionQueue = new Queue('promotion', {
connection: parseRedisUrl(process.env.REDIS_URL || 'redis://queue:6379'),
});
// GET / - List assets with filtering // GET / - List assets with filtering
router.get('/', async (req, res, next) => { router.get('/', async (req, res, next) => {
try { try {
@ -508,6 +512,67 @@ router.post('/:id/finalize', requireAssetEdit, async (req, res, next) => {
} catch (err) { next(err); } } catch (err) { next(err); }
}); });
// POST /:id/pending-migration
// Capture sidecar calls this on a SUCCESSFUL growing-file recording stop.
// Flips the asset status from 'live' to 'pending_migration' (on SMB, not S3).
router.post('/:id/pending-migration', requireAssetEdit, async (req, res, next) => {
try {
const { id } = req.params;
const { duration } = req.body;
const check = await pool.query(`SELECT status FROM assets WHERE id = $1`, [id]);
if (check.rows.length === 0) return res.status(404).json({ error: 'Asset not found' });
if (check.rows[0].status !== 'live') {
return res.status(200).json({ skipped: true });
}
const durationNum = duration !== undefined && duration !== null ? Number(duration) : null;
const durationMs = (durationNum !== null && Number.isFinite(durationNum)) ? Math.round(durationNum * 1000) : null;
const upd = await pool.query(
`UPDATE assets
SET status = 'pending_migration',
duration_ms = COALESCE($2, duration_ms),
updated_at = NOW()
WHERE id = $1
RETURNING *`,
[id, durationMs]
);
console.log(`[assets] set pending-migration status for asset ${id}`);
res.json(upd.rows[0]);
} catch (err) { next(err); }
});
// POST /:id/promote
// Promotes an asset from 'pending_migration' (SMB) to S3.
// Enqueues a 'promotion' job in BullMQ to handle the S3 upload and metadata updates.
router.post('/:id/promote', requireAssetEdit, async (req, res, next) => {
try {
const { id } = req.params;
const check = await pool.query(`SELECT status FROM assets WHERE id = $1`, [id]);
if (check.rows.length === 0) return res.status(404).json({ error: 'Asset not found' });
const { status } = check.rows[0];
if (status !== 'pending_migration') {
return res.status(400).json({ error: `Asset status is "${status}" — only "pending_migration" assets can be promoted` });
}
// Update status to 'processing' so it is locked
await pool.query(
`UPDATE assets SET status = 'processing', updated_at = NOW() WHERE id = $1`,
[id]
);
// Queue the promotion job in BullMQ
await promotionQueue.add('promote', { assetId: id });
console.log(`[assets] queued promotion for asset ${id}`);
res.json({ ok: true, status: 'processing' });
} catch (err) { next(err); }
});
// POST /:id/live-thumbnail — set the poster thumbnail for a still-live asset. // POST /:id/live-thumbnail — set the poster thumbnail for a still-live asset.
// The capture sidecar extracts the first video frame from the first HLS segment // The capture sidecar extracts the first video frame from the first HLS segment
// (where the segment physically exists) and uploads it to S3, then calls this to // (where the segment physically exists) and uploads it to S3, then calls this to

View file

@ -413,10 +413,10 @@ function Library({ navigate, onOpenAsset, openProject, onClearProject }) {
<input value={search} onChange={function(e) { setSearch(e.target.value); }} placeholder="Filter assets…" /> <input value={search} onChange={function(e) { setSearch(e.target.value); }} placeholder="Filter assets…" />
</div> </div>
<div className="tab-group"> <div className="tab-group">
{['all', 'ready', 'processing', 'live', 'error'].map(function(f) { {['all', 'ready', 'processing', 'live', 'error', 'pending_migration'].map(function(f) {
return ( return (
<button key={f} className={filter === f ? 'active' : ''} onClick={function() { setFilter(f); }}> <button key={f} className={filter === f ? 'active' : ''} onClick={function() { setFilter(f); }}>
{f === 'all' ? 'All' : f[0].toUpperCase() + f.slice(1)} {f === 'all' ? 'All' : f === 'pending_migration' ? 'Pending' : f[0].toUpperCase() + f.slice(1)}
</button> </button>
); );
})} })}
@ -591,6 +591,16 @@ function AssetContextMenu({ asset, x, y, bins, onClose, onChanged, onOpen, onRen
const rename = function() { if (onRename) onRename(asset); else onClose(); }; const rename = function() { if (onRename) onRename(asset); else onClose(); };
const promoteToS3 = function() {
onClose();
window.ZAMPP_API.fetch('/assets/' + asset.id + '/promote', { method: 'POST' })
.then(function() {
if (onChanged) onChanged();
window.alert('Promotion job queued. The file is being uploaded to S3 in the background.');
})
.catch(function(e) { alert('Promotion failed: ' + e.message); });
};
const moveToBin = function(binId) { const moveToBin = function(binId) {
onClose(); onClose();
window.ZAMPP_API.fetch('/assets/' + asset.id, { method: 'PATCH', body: JSON.stringify({ bin_id: binId }) }) window.ZAMPP_API.fetch('/assets/' + asset.id, { method: 'PATCH', body: JSON.stringify({ bin_id: binId }) })
@ -618,6 +628,9 @@ function AssetContextMenu({ asset, x, y, bins, onClose, onChanged, onOpen, onRen
{asset.original_s3_key && onDownload && ( {asset.original_s3_key && onDownload && (
<button onClick={function() { onDownload(asset); }}><Icon name="download" size={11} />Download original</button> <button onClick={function() { onDownload(asset); }}><Icon name="download" size={11} />Download original</button>
)} )}
{asset.status === 'pending_migration' && (
<button onClick={promoteToS3}><Icon name="upload" size={11} />Move to S3</button>
)}
<div className="ctx-divider" /> <div className="ctx-divider" />
{(bins && bins.length > 0) ? ( {(bins && bins.length > 0) ? (
<> <>
@ -722,6 +735,7 @@ function AssetCard({ asset, onOpen, onContextMenu, onDownload, onDragStart, drag
{asset.status === 'live' && <span className="badge live">LIVE</span>} {asset.status === 'live' && <span className="badge live">LIVE</span>}
{asset.status === 'processing' && <span className="badge warning">Processing</span>} {asset.status === 'processing' && <span className="badge warning">Processing</span>}
{asset.status === 'error' && <span className="badge danger">Error</span>} {asset.status === 'error' && <span className="badge danger">Error</span>}
{asset.status === 'pending_migration' && <span className="badge warning" style={{ background: '#e8821c', color: '#fff' }}>SMB</span>}
</div> </div>
{/* Hi-res download trigger: only shown when the asset has an {/* Hi-res download trigger: only shown when the asset has an
original_s3_key (everything queued through ingest / conform). original_s3_key (everything queued through ingest / conform).

View file

@ -45,6 +45,19 @@ function AssetThumb({ asset, size = 'md' }) {
return <LiveThumb assetId={asset.id} aspect={aspect} />; return <LiveThumb assetId={asset.id} aspect={aspect} />;
} }
if (asset.status === 'pending_migration' && !asset.thumbnail_s3_key && !thumbUrl) {
return (
<div className="asset-thumb" style={{ aspectRatio: aspect, position: 'relative', background: 'var(--bg-2)', overflow: 'hidden' }}>
<div style={{ position: 'absolute', inset: 0, display: 'flex', flexDirection: 'column',
alignItems: 'center', justifyContent: 'center', gap: 6,
color: 'var(--text-3)', fontSize: 11 }}>
<Icon name="upload" size={20} style={{ opacity: 0.5 }} />
<span>Awaiting migration</span>
</div>
</div>
);
}
const altText = asset.name ? `Thumbnail for ${asset.name}` : 'Asset thumbnail'; const altText = asset.name ? `Thumbnail for ${asset.name}` : 'Asset thumbnail';
return ( return (
<div className="asset-thumb" style={{ background: 'var(--bg-2)', aspectRatio: aspect, overflow: 'hidden', position: 'relative' }}> <div className="asset-thumb" style={{ background: 'var(--bg-2)', aspectRatio: aspect, overflow: 'hidden', position: 'relative' }}>
@ -232,6 +245,7 @@ function StatusDot({ status }) {
done: { color: 'var(--success)', pulse: false }, done: { color: 'var(--success)', pulse: false },
failed: { color: 'var(--danger)', pulse: false }, failed: { color: 'var(--danger)', pulse: false },
stopped: { color: 'var(--text-4)', pulse: false }, stopped: { color: 'var(--text-4)', pulse: false },
pending_migration: { color: 'var(--warning)', pulse: false },
}; };
const s = map[status] || { color: 'var(--text-3)' }; const s = map[status] || { color: 'var(--text-3)' };
return <span className={'status-dot ' + (s.pulse ? 'pulse' : '')} style={{ background: s.color, boxShadow: '0 0 0 3px ' + s.color + '30' }} />; return <span className={'status-dot ' + (s.pulse ? 'pulse' : '')} style={{ background: s.color, boxShadow: '0 0 0 3px ' + s.color + '30' }} />;

View file

@ -8,7 +8,7 @@ import { youtubeImportWorker, proxyQueue as youtubeProxyQueue } from './workers/
import { trimWorker } from './workers/trimWorker.js'; import { trimWorker } from './workers/trimWorker.js';
import { hlsWorker } from './workers/hls.js'; import { hlsWorker } from './workers/hls.js';
import { playoutStageWorker } from './workers/playout-stage.js'; import { playoutStageWorker } from './workers/playout-stage.js';
import { startPromotionWorker } from './workers/promotion.js'; import { promotionWorker } from './workers/promotion.js';
const parseRedisUrl = (url) => { const parseRedisUrl = (url) => {
const parsed = new URL(url); const parsed = new URL(url);
@ -98,6 +98,8 @@ const workers = [
// playout-stage = S3 → /media volume + EBU R128 loudnorm. CPU/IO-bound; // playout-stage = S3 → /media volume + EBU R128 loudnorm. CPU/IO-bound;
// colocate with workers that already have ffmpeg + the media mount. // colocate with workers that already have ffmpeg + the media mount.
want('playout-stage') && createWorker('playout-stage', playoutStageWorker, { concurrency: 1 }), want('playout-stage') && createWorker('playout-stage', playoutStageWorker, { concurrency: 1 }),
// promotion = manual growing-files promotion (S3 upload + DB update + queue proxy)
want('promotion') && createWorker('promotion', promotionWorker, { concurrency: 1 }),
].filter(Boolean); ].filter(Boolean);
console.log(`WORKER_QUEUES=${_wq || '(all)'}`); console.log(`WORKER_QUEUES=${_wq || '(all)'}`);
@ -106,18 +108,9 @@ export const filmstripQueue = new Queue('filmstrip', { connection: redisOptions
console.log(`Concurrency: proxy=${PROXY_CONCURRENCY} thumbnail=${THUMBNAIL_CONCURRENCY} filmstrip=${FILMSTRIP_CONCURRENCY} conform=${CONFORM_CONCURRENCY} trim=${TRIM_CONCURRENCY} import=1`); console.log(`Concurrency: proxy=${PROXY_CONCURRENCY} thumbnail=${THUMBNAIL_CONCURRENCY} filmstrip=${FILMSTRIP_CONCURRENCY} conform=${CONFORM_CONCURRENCY} trim=${TRIM_CONCURRENCY} import=1`);
// BUG FIX #4: startPromotionWorker() now returns a shutdown function that
// clears the poll intervals and closes the promotion proxyQueue singleton.
// Promotion (growing-files -> S3) is a polling SCAN, not a queue consumer.
// With multiple worker containers it must run on exactly one, or every node
// races the same files. Gate behind RUN_PROMOTION (set true on a single worker).
const stopPromotionWorker = (process.env.RUN_PROMOTION === 'true') ? startPromotionWorker() : null;
if (process.env.RUN_PROMOTION === 'true') console.log('[promotion] scanner ENABLED on this worker');
console.log('Wild Dragon Worker Service started'); console.log('Wild Dragon Worker Service started');
console.log(`Redis: ${redisOptions.host}:${redisOptions.port}`); console.log(`Redis: ${redisOptions.host}:${redisOptions.port}`);
console.log('Active queues: proxy, thumbnail, conform, trim, import'); console.log('Active queues: proxy, thumbnail, conform, trim, import, promotion');
console.log('Background scans: promotion (growing-files → S3)');
process.on('SIGTERM', async () => { process.on('SIGTERM', async () => {
console.log('SIGTERM received, shutting down...'); console.log('SIGTERM received, shutting down...');
@ -136,8 +129,6 @@ process.on('SIGTERM', async () => {
proxyThumbnailQueue.close().catch(() => {}), proxyThumbnailQueue.close().catch(() => {}),
youtubeProxyQueue.close().catch(() => {}), youtubeProxyQueue.close().catch(() => {}),
filmstripQueue.close().catch(() => {}), filmstripQueue.close().catch(() => {}),
// BUG FIX #4: Stop the promotion worker intervals and close its proxyQueue
stopPromotionWorker ? stopPromotionWorker() : Promise.resolve(),
]); ]);
console.log('All workers and queues closed'); console.log('All workers and queues closed');

View file

@ -1,18 +1,15 @@
import { readdir, stat, unlink, mkdir, writeFile } from 'node:fs/promises'; import { readdir, stat, unlink, mkdir, writeFile } from 'node:fs/promises';
import { execFileSync } from 'node:child_process'; import { execFileSync } from 'node:child_process';
import { join, relative, basename } from 'node:path'; import { join, relative, basename } from 'node:path';
import { createReadStream } from 'node:fs'; import { createReadStream, existsSync } from 'node:fs';
import { Queue } from 'bullmq'; import { Queue } from 'bullmq';
import { query } from '../db/client.js'; import { query } from '../db/client.js';
import { uploadStreamToS3 } from '../s3/client.js'; import { uploadStreamToS3 } from '../s3/client.js';
const GROWING_PATH = process.env.GROWING_PATH || '/growing'; const GROWING_PATH = process.env.GROWING_PATH || '/growing';
const S3_BUCKET = process.env.S3_BUCKET || 'wild-dragon'; const S3_BUCKET = process.env.S3_BUCKET || 'wild-dragon';
const POLL_MS = 5000;
const SMB_CREDS_FILE = '/run/promotion-smb-creds'; const SMB_CREDS_FILE = '/run/promotion-smb-creds';
// Normalize a Windows / smb:// share path to the //host/share UNC that
// mount.cifs accepts (mirrors services/capture/src/capture-manager.js).
function toUncShare(raw) { function toUncShare(raw) {
if (!raw) return ''; if (!raw) return '';
let s = String(raw).trim().replace(/\\/g, '/'); let s = String(raw).trim().replace(/\\/g, '/');
@ -26,12 +23,6 @@ function isMounted(path) {
catch { return false; } catch { return false; }
} }
// Mount the growing-files CIFS share at GROWING_PATH so the promotion scanner
// sees the SAME files the capture sidecar writes on the remote node. Without
// this the worker was watching a LOCAL empty /growing and never promoted any
// growing capture — the master never reached S3 and the only proxy that fired
// was the bogus one from capture's finalize call (against a key that doesn't
// exist) → "unable to open the file on disk". Best-effort + idempotent.
async function ensureGrowingShareMounted() { async function ensureGrowingShareMounted() {
const r = await query( const r = await query(
`SELECT key, value FROM settings WHERE key = ANY($1)`, `SELECT key, value FROM settings WHERE key = ANY($1)`,
@ -70,12 +61,6 @@ async function ensureGrowingShareMounted() {
} }
} }
let inflight = new Set();
let idleThresholdMs = 8000;
// Single module-level proxyQueue — avoids creating and closing a new Queue
// connection on every promoted file, which hammers Redis with connect/disconnect
// cycles and leaks connections when close() races with add().
const parseRedisUrl = (url) => { const parseRedisUrl = (url) => {
const parsed = new URL(url); const parsed = new URL(url);
return { host: parsed.hostname, port: parseInt(parsed.port, 10) || 6379 }; return { host: parsed.hostname, port: parseInt(parsed.port, 10) || 6379 };
@ -85,139 +70,57 @@ const proxyQueue = new Queue('proxy', {
connection: parseRedisUrl(process.env.REDIS_URL || 'redis://queue:6379'), connection: parseRedisUrl(process.env.REDIS_URL || 'redis://queue:6379'),
}); });
// Interval handles — stored so startPromotionWorker() can expose a shutdown // BullMQ Worker handler for manual S3 promotion
// function for the SIGTERM handler in index.js. export const promotionWorker = async (job) => {
let scanInterval = null; const { assetId } = job.data;
let thresholdInterval = null;
async function loadIdleThreshold() { // 1. Ensure growing share is mounted
try { await ensureGrowingShareMounted();
const r = await query(
`SELECT value FROM settings WHERE key = 'growing_promote_after_seconds'`
);
const sec = parseInt(r.rows[0]?.value, 10);
if (sec > 0) idleThresholdMs = sec * 1000;
} catch (_) { /* table not migrated yet — keep default */ }
}
async function* walk(dir) { // 2. Fetch asset details
let entries = []; const r = await query(
try { 'SELECT id, filename, project_id, status FROM assets WHERE id = $1',
entries = await readdir(dir, { withFileTypes: true }); [assetId]
} catch (err) { );
console.warn(`[promotion] readdir failed for ${dir}: ${err.message}`); if (r.rows.length === 0) {
return; throw new Error(`Asset ${assetId} not found in database`);
} }
for (const e of entries) { const asset = r.rows[0];
const full = join(dir, e.name);
if (e.isDirectory()) yield* walk(full); // 3. Resolve local path
else if (e.isFile()) yield full; const localPath = `${GROWING_PATH}/${asset.project_id}/${asset.filename}.mxf`;
if (!existsSync(localPath)) {
throw new Error(`Growing file not found at ${localPath}`);
} }
}
async function promote(filePath) { const s3Key = `projects/${asset.project_id}/masters/${asset.filename}.mxf`;
if (inflight.has(filePath)) return;
inflight.add(filePath);
try {
// Reconstruct the S3 key from the relative path under GROWING_PATH.
// Capture writes `${GROWING_PATH}/${projectId}/${clipName}.${ext}`, which
// mirrors `projects/${projectId}/masters/${clipName}.${ext}` in S3.
const rel = relative(GROWING_PATH, filePath); // <projectId>/<clip>.<ext>
const [projectId, fileName] = rel.split('/', 2);
if (!projectId || !fileName) return;
const s3Key = `projects/${projectId}/masters/${fileName}`;
// Find the matching live asset by display_name = clipName. const st = await stat(localPath);
const clipName = basename(fileName, '.' + fileName.split('.').pop()); console.log(`[promotion] promoting asset ${assetId}: uploading ${localPath} (${st.size} bytes) -> s3://${S3_BUCKET}/${s3Key}`);
const r = await query( await uploadStreamToS3(S3_BUCKET, s3Key, createReadStream(localPath));
`SELECT id, status FROM assets
WHERE project_id = $1 AND display_name = $2
ORDER BY created_at DESC LIMIT 1`,
[projectId, clipName]
);
if (r.rows.length === 0) {
console.warn(`[promotion] no asset row for ${rel} — skipping`);
return;
}
const asset = r.rows[0];
// Do not promote while the recorder is STILL RECORDING this session. // 4. Update asset status to ready (with correct S3 key and size)
// The mtime-idle heuristic is unreliable over CIFS (attribute caching makes await query(
// an actively-growing MXF look "stable"). Gate on the recorder's live `UPDATE assets
// status: only promote once recording stopped. SET original_s3_key = $1,
const recActive = await query( file_size = $2,
`SELECT 1 FROM recorders status = 'ready',
WHERE current_session_id = $1 AND status = 'recording' LIMIT 1`, updated_at = NOW()
[clipName] WHERE id = $3`,
); [s3Key, st.size, assetId]
if (recActive.rows.length > 0) { );
return;
}
const st = await stat(filePath); // 5. Queue proxy generation job
console.log(`[promotion] uploading ${rel} (${st.size} bytes) -> s3://${S3_BUCKET}/${s3Key}`); await proxyQueue.add('generate', {
await uploadStreamToS3(S3_BUCKET, s3Key, createReadStream(filePath)); assetId: assetId,
inputKey: s3Key,
outputKey: `proxies/${assetId}.mp4`,
});
await query( console.log(`[promotion] asset ${assetId} promoted, proxy queued`);
`UPDATE assets
SET original_s3_key = $1,
file_size = $2,
status = 'ready',
updated_at = NOW()
WHERE id = $3`,
[s3Key, st.size, asset.id]
);
await proxyQueue.add('generate', { // 6. Clean up local file (best-effort)
assetId: asset.id, await unlink(localPath).catch(err => {
inputKey: s3Key, console.warn(`[promotion] could not unlink ${localPath} (best-effort, file already in S3): ${err.message}`);
outputKey: `proxies/${asset.id}.mp4`, });
}); };
console.log(`[promotion] asset ${asset.id} promoted, proxy queued`);
// Unlink the source file from the SMB share — best-effort cleanup only.
// The file is already safely in S3 and the asset is 'ready'. CIFS/SMB
// can return EIO if the share was remounted or the remote node rebooted
// between upload and unlink; log the error but do not fail the job.
await unlink(filePath).catch(err => {
console.warn(`[promotion] could not unlink ${rel} (best-effort, file already in S3): ${err.message}`);
});
} catch (err) {
console.error('[promotion] failed for', filePath, err);
} finally {
inflight.delete(filePath);
}
}
async function scan() {
const now = Date.now();
for await (const file of walk(GROWING_PATH)) {
if (inflight.has(file)) continue;
let st;
try { st = await stat(file); } catch (_) { continue; }
if (now - st.mtimeMs >= idleThresholdMs && st.size > 0) {
await promote(file);
}
}
}
// Return a shutdown function so index.js can clear the intervals and close the
// queue connection during SIGTERM.
export function startPromotionWorker() {
loadIdleThreshold();
// Mount the SMB landing zone before the first scan so we watch the SAME share
// the capture sidecars write to (best-effort; falls back to local GROWING_PATH).
ensureGrowingShareMounted().catch((e) =>
console.error('[promotion] mount bootstrap failed:', e.message));
thresholdInterval = setInterval(loadIdleThreshold, 60_000);
scanInterval = setInterval(scan, POLL_MS);
console.log(`[promotion] watching ${GROWING_PATH} (idle threshold ${idleThresholdMs}ms)`);
return async function stopPromotionWorker() {
if (scanInterval) clearInterval(scanInterval);
if (thresholdInterval) clearInterval(thresholdInterval);
await proxyQueue.close().catch(() => {});
console.log('[promotion] worker stopped');
};
}