feat(assets): SMB tag + always-available S3 migrate for growing masters

- Growing-file masters (.mxf) are tagged smb on create (while live) and on
  pending-migration; the tag swaps to s3 once promoted.
- Migrate-to-S3 (promote) now accepts assets stuck in live (sidecar post-stop
  call never landed) in addition to pending_migration, guarded to .mxf SMB
  masters only.
- Promotion queue added to the Jobs tab QUEUES so SMB->S3 migrations are
  visible/trackable like other jobs.
- Library: SMB badge shows alongside LIVE for growing masters; Move to S3 shown
  for any SMB-origin asset (live-stuck or pending_migration).

Verified: stuck-live 5.4GB master migrated SMB->S3, job tracked in Jobs tab,
tags swapped smb->s3.
This commit is contained in:
OpenCode 2026-06-05 03:37:59 +00:00
parent 105d04729a
commit 5c07b4e8b1
4 changed files with 63 additions and 17 deletions

View file

@ -203,13 +203,18 @@ router.post('/', async (req, res, next) => {
id = uuidv4(); id = uuidv4();
const mediaType = (sourceType === 'audio') ? 'audio' : 'video'; const mediaType = (sourceType === 'audio') ? 'audio' : 'video';
const assetStatus = status || 'processing'; const assetStatus = status || 'processing';
// Growing-file masters land on the SMB share as .mxf — tag them 'smb' from
// the moment they're created (while still 'live') so the library shows the
// SMB origin and can always offer the S3 migrate action.
const isGrowingSmb = !!(hiresKey && /\.mxf$/i.test(hiresKey));
const initialTags = isGrowingSmb ? ['smb'] : [];
const ins = await pool.query( const ins = await pool.query(
`INSERT INTO assets ( `INSERT INTO assets (
id, project_id, bin_id, id, project_id, bin_id,
filename, display_name, filename, display_name,
status, media_type, status, media_type,
original_s3_key, proxy_s3_key, original_s3_key, proxy_s3_key,
duration_ms, duration_ms, tags,
created_at, updated_at created_at, updated_at
) )
VALUES ( VALUES (
@ -217,7 +222,7 @@ router.post('/', async (req, res, next) => {
$4, $4, $4, $4,
$10, $9, $10, $9,
$5, $6, $5, $6,
$7, $7, $11,
COALESCE($8::timestamptz, NOW()), NOW() COALESCE($8::timestamptz, NOW()), NOW()
) )
RETURNING *`, RETURNING *`,
@ -229,6 +234,7 @@ router.post('/', async (req, res, next) => {
capturedAt || null, capturedAt || null,
mediaType, mediaType,
assetStatus, assetStatus,
initialTags,
] ]
); );
asset = ins.rows[0]; asset = ins.rows[0];
@ -533,41 +539,57 @@ router.post('/:id/pending-migration', requireAssetEdit, async (req, res, next) =
`UPDATE assets `UPDATE assets
SET status = 'pending_migration', SET status = 'pending_migration',
duration_ms = COALESCE($2, duration_ms), duration_ms = COALESCE($2, duration_ms),
-- Tag the growing master as living on SMB (in addition to its live
-- origin) so the library can show it + offer the S3 migrate action.
tags = (
SELECT ARRAY(SELECT DISTINCT unnest(COALESCE(tags, '{}'::text[]) || ARRAY['smb']))
),
updated_at = NOW() updated_at = NOW()
WHERE id = $1 WHERE id = $1
RETURNING *`, RETURNING *`,
[id, durationMs] [id, durationMs]
); );
console.log(`[assets] set pending-migration status for asset ${id}`); console.log(`[assets] set pending-migration status (+smb tag) for asset ${id}`);
res.json(upd.rows[0]); res.json(upd.rows[0]);
} catch (err) { next(err); } } catch (err) { next(err); }
}); });
// POST /:id/promote // POST /:id/promote
// Promotes an asset from 'pending_migration' (SMB) to S3. // Promotes a growing-file / SMB master to S3.
// Enqueues a 'promotion' job in BullMQ to handle the S3 upload and metadata updates. // Normally an asset is 'pending_migration' (flipped by the sidecar on a clean
// growing-file stop). But a growing recording can get STUCK in 'live' if the
// sidecar's post-stop /pending-migration call never lands (crash, network).
// Operators must always be able to migrate those too, so we accept BOTH
// 'pending_migration' and 'live' here. Enqueues a 'promotion' BullMQ job (which
// shows in the Jobs tab) to handle the SMB→S3 upload + metadata updates.
router.post('/:id/promote', requireAssetEdit, async (req, res, next) => { router.post('/:id/promote', requireAssetEdit, async (req, res, next) => {
try { try {
const { id } = req.params; const { id } = req.params;
const check = await pool.query(`SELECT status FROM assets WHERE id = $1`, [id]); const check = await pool.query(`SELECT status, original_s3_key FROM assets WHERE id = $1`, [id]);
if (check.rows.length === 0) return res.status(404).json({ error: 'Asset not found' }); if (check.rows.length === 0) return res.status(404).json({ error: 'Asset not found' });
const { status } = check.rows[0]; const { status, original_s3_key } = check.rows[0];
if (status !== 'pending_migration') { const MIGRATABLE = new Set(['pending_migration', 'live']);
return res.status(400).json({ error: `Asset status is "${status}" — only "pending_migration" assets can be promoted` }); if (!MIGRATABLE.has(status)) {
return res.status(400).json({ error: `Asset status is "${status}" — only growing-file (SMB) assets in "pending_migration" or "live" can be migrated to S3` });
}
// Guard: only growing-file masters live on SMB. A non-growing 'live' asset
// (still recording, or a normal upload) has no SMB master to migrate.
if (status === 'live' && !(original_s3_key && /\.mxf$/i.test(original_s3_key))) {
return res.status(400).json({ error: 'This live asset is not a finished growing-file master on SMB.' });
} }
// Update status to 'processing' so it is locked // Lock it: 'processing' while the promotion job runs.
await pool.query( await pool.query(
`UPDATE assets SET status = 'processing', updated_at = NOW() WHERE id = $1`, `UPDATE assets SET status = 'processing', updated_at = NOW() WHERE id = $1`,
[id] [id]
); );
// Queue the promotion job in BullMQ // Queue the promotion job in BullMQ — listed in the Jobs tab (type 'promotion').
await promotionQueue.add('promote', { assetId: id }); await promotionQueue.add('promote', { assetId: id });
console.log(`[assets] queued promotion for asset ${id}`); console.log(`[assets] queued promotion (SMB→S3) for asset ${id} (was ${status})`);
res.json({ ok: true, status: 'processing' }); res.json({ ok: true, status: 'processing' });
} catch (err) { next(err); } } catch (err) { next(err); }

View file

@ -29,6 +29,7 @@ const conformQueue = new Queue('conform', { connection: redisConn })
const importQueue = new Queue('import', { connection: redisConn }); const importQueue = new Queue('import', { connection: redisConn });
const trimQueue = new Queue('trim', { connection: redisConn }); const trimQueue = new Queue('trim', { connection: redisConn });
const playoutStageQueue = new Queue('playout-stage', { connection: redisConn }); const playoutStageQueue = new Queue('playout-stage', { connection: redisConn });
const promotionQueue = new Queue('promotion', { connection: redisConn });
const QUEUES = [ const QUEUES = [
{ queue: proxyQueue, type: 'proxy' }, { queue: proxyQueue, type: 'proxy' },
@ -38,6 +39,8 @@ const QUEUES = [
{ queue: importQueue, type: 'import' }, { queue: importQueue, type: 'import' },
{ queue: trimQueue, type: 'trim' }, { queue: trimQueue, type: 'trim' },
{ queue: playoutStageQueue, type: 'playout-stage' }, { queue: playoutStageQueue, type: 'playout-stage' },
// SMB→S3 migration of growing-file masters. Shows the migrate action in Jobs.
{ queue: promotionQueue, type: 'promotion' },
]; ];
// BullMQ state → API status mapping // BullMQ state → API status mapping

View file

@ -643,9 +643,18 @@ function AssetContextMenu({ asset, x, y, bins, onClose, onChanged, onOpen, onRen
{asset.original_s3_key && onDownload && ( {asset.original_s3_key && onDownload && (
<button onClick={function() { onDownload(asset); }}><Icon name="download" size={11} />Download original</button> <button onClick={function() { onDownload(asset); }}><Icon name="download" size={11} />Download original</button>
)} )}
{asset.status === 'pending_migration' && ( {(function() {
<button onClick={promoteToS3}><Icon name="upload" size={11} />Move to S3</button> // A growing-file master lives on the SMB share. Offer "Move to S3" for
)} // any such asset both the normal 'pending_migration' state AND a
// recording that got stuck in 'live' (its post-stop migrate never fired).
const onSmb = (asset.tags || []).indexOf('smb') !== -1
|| /\.mxf$/i.test(asset.original_s3_key || '');
const migratable = asset.status === 'pending_migration'
|| (asset.status === 'live' && onSmb);
return migratable
? <button onClick={promoteToS3}><Icon name="upload" size={11} />Move to S3</button>
: null;
})()}
<div className="ctx-divider" /> <div className="ctx-divider" />
{(bins && bins.length > 0) ? ( {(bins && bins.length > 0) ? (
<> <>
@ -758,7 +767,11 @@ function AssetCard({ asset, onOpen, onContextMenu, onDownload, onDragStart, drag
{asset.status === 'live' && <span className="badge live">LIVE</span>} {asset.status === 'live' && <span className="badge live">LIVE</span>}
{asset.status === 'processing' && <span className="badge warning">Processing</span>} {asset.status === 'processing' && <span className="badge warning">Processing</span>}
{asset.status === 'error' && <span className="badge danger">Error</span>} {asset.status === 'error' && <span className="badge danger">Error</span>}
{asset.status === 'pending_migration' && <span className="badge warning" style={{ background: '#e8821c', color: '#fff' }}>SMB</span>} {/* SMB badge for any growing-file master on the share shown ALONGSIDE
LIVE while it records, and on its own while pending migration. */}
{(((asset.tags || []).indexOf('smb') !== -1 || /\.mxf$/i.test(asset.original_s3_key || ''))
&& (asset.status === 'live' || asset.status === 'pending_migration')) &&
<span className="badge warning" style={{ background: '#e8821c', color: '#fff' }}>SMB</span>}
</div> </div>
{/* Hi-res download trigger: only shown when the asset has an {/* Hi-res download trigger: only shown when the asset has an
original_s3_key (everything queued through ingest / conform). original_s3_key (everything queued through ingest / conform).

View file

@ -99,12 +99,20 @@ export const promotionWorker = async (job) => {
console.log(`[promotion] promoting asset ${assetId}: uploading ${localPath} (${st.size} bytes) -> s3://${S3_BUCKET}/${s3Key}`); console.log(`[promotion] promoting asset ${assetId}: uploading ${localPath} (${st.size} bytes) -> s3://${S3_BUCKET}/${s3Key}`);
await uploadStreamToS3(S3_BUCKET, s3Key, createReadStream(localPath)); await uploadStreamToS3(S3_BUCKET, s3Key, createReadStream(localPath));
// 4. Update asset status to ready (with correct S3 key and size) // 4. Update asset status to ready (with correct S3 key and size).
// Swap the 'smb' origin tag for 's3' now the master lives in S3.
await query( await query(
`UPDATE assets `UPDATE assets
SET original_s3_key = $1, SET original_s3_key = $1,
file_size = $2, file_size = $2,
status = 'ready', status = 'ready',
tags = (
SELECT ARRAY(
SELECT DISTINCT unnest(
array_remove(COALESCE(tags, '{}'::text[]), 'smb') || ARRAY['s3']
)
)
),
updated_at = NOW() updated_at = NOW()
WHERE id = $3`, WHERE id = $3`,
[s3Key, st.size, assetId] [s3Key, st.size, assetId]