feat(growing): auto-promotion scanner + hours-based delay setting
The growing_promote_after_seconds setting was stored but NEVER read — no scanner existed, so growing clips only left the SMB share on a manual right-click 'Move to S3'. This adds the missing automation: - promotion-scanner.js: every 60s, finds pending_migration assets idle (updated_at) longer than settings.growing_promote_after_seconds and enqueues a promotion job. Idempotent (status guard + stable jobId) so it's safe on every promotion worker. 12h default fallback. - worker/index.js: starts the scanner on promotion-capable workers. - Settings UI: the delay field is now 'Auto-promote to S3 after (hours)' (converts hours<->seconds; storage stays seconds). Notes the manual Library right-click 'Move to S3' option too. Manual promotion (right-click Move to S3) and the safe HLS-segment live thumbnail were already implemented and working.
This commit is contained in:
parent
0c405ae7d4
commit
727bdaae80
3 changed files with 128 additions and 3 deletions
|
|
@ -2722,6 +2722,8 @@ function GrowingSettingsCard() {
|
||||||
growing_smb_mount: cfg.growing_smb_mount,
|
growing_smb_mount: cfg.growing_smb_mount,
|
||||||
growing_smb_username: cfg.growing_smb_username,
|
growing_smb_username: cfg.growing_smb_username,
|
||||||
growing_smb_vers: cfg.growing_smb_vers,
|
growing_smb_vers: cfg.growing_smb_vers,
|
||||||
|
// UI edits the delay in HOURS; storage stays in seconds (the auto-promotion
|
||||||
|
// scanner reads growing_promote_after_seconds). Convert hours → seconds.
|
||||||
growing_promote_after_seconds: cfg.growing_promote_after_seconds,
|
growing_promote_after_seconds: cfg.growing_promote_after_seconds,
|
||||||
};
|
};
|
||||||
if (clearPwd) body.growing_smb_password_clear = true;
|
if (clearPwd) body.growing_smb_password_clear = true;
|
||||||
|
|
@ -2775,8 +2777,22 @@ function GrowingSettingsCard() {
|
||||||
<SField label="SMB share URL (for editors)">
|
<SField label="SMB share URL (for editors)">
|
||||||
<input className="field-input mono" value={cfg.growing_smb_url || ''} onChange={e => set('growing_smb_url', e.target.value)} placeholder="smb://10.0.0.25/mam-growing" />
|
<input className="field-input mono" value={cfg.growing_smb_url || ''} onChange={e => set('growing_smb_url', e.target.value)} placeholder="smb://10.0.0.25/mam-growing" />
|
||||||
</SField>
|
</SField>
|
||||||
<SField label="Promote-to-S3 idle threshold (seconds)">
|
<SField label="Auto-promote to S3 after (hours)">
|
||||||
<input className="field-input mono" type="number" value={cfg.growing_promote_after_seconds || ''} onChange={e => set('growing_promote_after_seconds', e.target.value)} placeholder="8" />
|
<input className="field-input mono" type="number" min="0" step="0.25"
|
||||||
|
value={(() => {
|
||||||
|
const secs = parseFloat(cfg.growing_promote_after_seconds);
|
||||||
|
return Number.isFinite(secs) ? +(secs / 3600).toFixed(2).replace(/\.?0+$/, '') : '';
|
||||||
|
})()}
|
||||||
|
onChange={e => {
|
||||||
|
const hours = parseFloat(e.target.value);
|
||||||
|
set('growing_promote_after_seconds', Number.isFinite(hours) ? String(Math.round(hours * 3600)) : '');
|
||||||
|
}}
|
||||||
|
placeholder="12" />
|
||||||
|
<div style={{ fontSize: 11, color: 'var(--text-3)', marginTop: 4 }}>
|
||||||
|
Growing clips left on the SMB share are uploaded to S3 automatically once they've
|
||||||
|
been idle this long. Set 0 to promote almost immediately. You can also right-click any
|
||||||
|
asset in the Library → "Move to S3" to promote it on demand.
|
||||||
|
</div>
|
||||||
</SField>
|
</SField>
|
||||||
<SettingsMsg msg={msg} />
|
<SettingsMsg msg={msg} />
|
||||||
<div style={{ display: 'flex', gap: 6, marginTop: 8 }}>
|
<div style={{ display: 'flex', gap: 6, marginTop: 8 }}>
|
||||||
|
|
|
||||||
|
|
@ -9,6 +9,7 @@ import { trimWorker } from './workers/trimWorker.js';
|
||||||
import { hlsWorker } from './workers/hls.js';
|
import { hlsWorker } from './workers/hls.js';
|
||||||
import { playoutStageWorker } from './workers/playout-stage.js';
|
import { playoutStageWorker } from './workers/playout-stage.js';
|
||||||
import { promotionWorker } from './workers/promotion.js';
|
import { promotionWorker } from './workers/promotion.js';
|
||||||
|
import { startPromotionScanner } from './workers/promotion-scanner.js';
|
||||||
|
|
||||||
const parseRedisUrl = (url) => {
|
const parseRedisUrl = (url) => {
|
||||||
const parsed = new URL(url);
|
const parsed = new URL(url);
|
||||||
|
|
@ -98,11 +99,22 @@ const workers = [
|
||||||
// playout-stage = S3 → /media volume + EBU R128 loudnorm. CPU/IO-bound;
|
// playout-stage = S3 → /media volume + EBU R128 loudnorm. CPU/IO-bound;
|
||||||
// colocate with workers that already have ffmpeg + the media mount.
|
// colocate with workers that already have ffmpeg + the media mount.
|
||||||
want('playout-stage') && createWorker('playout-stage', playoutStageWorker, { concurrency: 1 }),
|
want('playout-stage') && createWorker('playout-stage', playoutStageWorker, { concurrency: 1 }),
|
||||||
// promotion = manual growing-files promotion (S3 upload + DB update + queue proxy)
|
// promotion = growing-files promotion (S3 upload + DB update + queue proxy).
|
||||||
|
// Triggered manually via POST /assets/:id/promote AND automatically by the
|
||||||
|
// promotion scanner below once a pending_migration asset has been idle for
|
||||||
|
// settings.growing_promote_after_seconds.
|
||||||
want('promotion') && createWorker('promotion', promotionWorker, { concurrency: 1 }),
|
want('promotion') && createWorker('promotion', promotionWorker, { concurrency: 1 }),
|
||||||
].filter(Boolean);
|
].filter(Boolean);
|
||||||
console.log(`WORKER_QUEUES=${_wq || '(all)'}`);
|
console.log(`WORKER_QUEUES=${_wq || '(all)'}`);
|
||||||
|
|
||||||
|
// Auto-promotion scanner — only on promotion-capable workers, and only ONE
|
||||||
|
// instance is needed cluster-wide, but the scan is idempotent (status guard +
|
||||||
|
// stable jobId) so running it on every promotion worker is safe.
|
||||||
|
let _promotionScanner = null;
|
||||||
|
if (want('promotion')) {
|
||||||
|
_promotionScanner = startPromotionScanner(redisOptions);
|
||||||
|
}
|
||||||
|
|
||||||
// Filmstrip queue singleton — used by thumbnail worker to enqueue filmstrip jobs
|
// Filmstrip queue singleton — used by thumbnail worker to enqueue filmstrip jobs
|
||||||
export const filmstripQueue = new Queue('filmstrip', { connection: redisOptions });
|
export const filmstripQueue = new Queue('filmstrip', { connection: redisOptions });
|
||||||
|
|
||||||
|
|
|
||||||
97
services/worker/src/workers/promotion-scanner.js
Normal file
97
services/worker/src/workers/promotion-scanner.js
Normal file
|
|
@ -0,0 +1,97 @@
|
||||||
|
// Auto-promotion scanner.
|
||||||
|
//
|
||||||
|
// Growing-files recordings finish on the SMB share with status='pending_migration'.
|
||||||
|
// Promotion (SMB → S3 upload + proxy) is otherwise only triggered manually via
|
||||||
|
// POST /assets/:id/promote. This scanner closes that gap: on a fixed interval it
|
||||||
|
// finds pending_migration assets that have been idle longer than the operator-
|
||||||
|
// configured delay (settings.growing_promote_after_seconds) and enqueues a
|
||||||
|
// promotion job for each — so growing clips land in S3 automatically once the
|
||||||
|
// editor is done with the live file, without anyone clicking anything.
|
||||||
|
//
|
||||||
|
// "Idle" = assets.updated_at older than the delay. Capture stamps updated_at
|
||||||
|
// when it flips the asset to pending_migration on record stop, so the delay is
|
||||||
|
// measured from when the file stopped growing.
|
||||||
|
//
|
||||||
|
// Safe to run on every worker container: the UPDATE ... WHERE status =
|
||||||
|
// 'pending_migration' guard + BullMQ jobId dedupe (jobId = 'promote:<assetId>')
|
||||||
|
// makes double-enqueue from multiple scanners idempotent.
|
||||||
|
|
||||||
|
import { Queue } from 'bullmq';
|
||||||
|
import { query } from '../db/client.js';
|
||||||
|
|
||||||
|
const DEFAULT_DELAY_SECONDS = 43200; // 12h fallback if the setting is unset/invalid
|
||||||
|
const SCAN_INTERVAL_MS = parseInt(process.env.PROMOTION_SCAN_INTERVAL_MS || '60000', 10);
|
||||||
|
|
||||||
|
async function getPromoteDelaySeconds() {
|
||||||
|
try {
|
||||||
|
const r = await query(
|
||||||
|
`SELECT value FROM settings WHERE key = 'growing_promote_after_seconds'`
|
||||||
|
);
|
||||||
|
if (r.rows.length === 0) return DEFAULT_DELAY_SECONDS;
|
||||||
|
const n = parseInt(r.rows[0].value, 10);
|
||||||
|
return Number.isFinite(n) && n >= 0 ? n : DEFAULT_DELAY_SECONDS;
|
||||||
|
} catch (err) {
|
||||||
|
console.warn('[promotion-scanner] could not read delay setting:', err.message);
|
||||||
|
return DEFAULT_DELAY_SECONDS;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
export function startPromotionScanner(redisOptions) {
|
||||||
|
const promotionQueue = new Queue('promotion', { connection: redisOptions });
|
||||||
|
|
||||||
|
const scanOnce = async () => {
|
||||||
|
try {
|
||||||
|
const delaySeconds = await getPromoteDelaySeconds();
|
||||||
|
|
||||||
|
// Find pending_migration assets idle longer than the delay. EXTRACT(EPOCH …)
|
||||||
|
// gives the age in seconds; compare against the configured threshold.
|
||||||
|
const r = await query(
|
||||||
|
`SELECT id, filename
|
||||||
|
FROM assets
|
||||||
|
WHERE status = 'pending_migration'
|
||||||
|
AND EXTRACT(EPOCH FROM (NOW() - updated_at)) >= $1
|
||||||
|
ORDER BY updated_at ASC
|
||||||
|
LIMIT 25`,
|
||||||
|
[delaySeconds]
|
||||||
|
);
|
||||||
|
|
||||||
|
if (r.rows.length === 0) return;
|
||||||
|
|
||||||
|
for (const asset of r.rows) {
|
||||||
|
// Flip to 'processing' first so a second scan tick won't re-pick it, and
|
||||||
|
// dedupe the job by a stable jobId so concurrent scanners coalesce.
|
||||||
|
const upd = await query(
|
||||||
|
`UPDATE assets SET status = 'processing', updated_at = NOW()
|
||||||
|
WHERE id = $1 AND status = 'pending_migration'
|
||||||
|
RETURNING id`,
|
||||||
|
[asset.id]
|
||||||
|
);
|
||||||
|
if (upd.rows.length === 0) continue; // another scanner/operator beat us to it
|
||||||
|
|
||||||
|
await promotionQueue.add(
|
||||||
|
'promote',
|
||||||
|
{ assetId: asset.id },
|
||||||
|
{ jobId: `promote:${asset.id}`, removeOnComplete: true, removeOnFail: 50 }
|
||||||
|
);
|
||||||
|
console.log(
|
||||||
|
`[promotion-scanner] auto-promoting ${asset.filename} (${asset.id}) — idle ≥ ${delaySeconds}s`
|
||||||
|
);
|
||||||
|
}
|
||||||
|
} catch (err) {
|
||||||
|
console.error('[promotion-scanner] scan failed:', err.message);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
// Kick off and then run on an interval. Unref so it never keeps the process
|
||||||
|
// alive on its own during shutdown.
|
||||||
|
const timer = setInterval(scanOnce, SCAN_INTERVAL_MS);
|
||||||
|
timer.unref?.();
|
||||||
|
// First scan shortly after boot (not instantly — let DB/redis settle).
|
||||||
|
setTimeout(scanOnce, 5000).unref?.();
|
||||||
|
|
||||||
|
console.log(
|
||||||
|
`[promotion-scanner] started — interval ${SCAN_INTERVAL_MS}ms (delay from settings.growing_promote_after_seconds)`
|
||||||
|
);
|
||||||
|
|
||||||
|
return { promotionQueue, stop: () => clearInterval(timer) };
|
||||||
|
}
|
||||||
Loading…
Reference in a new issue