// Promotion worker — scans the growing-files SMB landing zone for stable // captures (no mtime change for N seconds) and uploads them to S3, flipping // the matching asset's status from 'growing' to 'ready' so the Premiere // panel can relink to the proxy/hi-res URLs. // // Why a poll loop and not chokidar: NFS/SMB mounts don't reliably surface // inotify events through the kernel; mtime polling is the boring-but-works // answer for fairness across all storage backends. import { readdir, stat, unlink, readFile } from 'node:fs/promises'; import { join, relative, basename } from 'node:path'; import { createReadStream } from 'node:fs'; import { query } from '../db/client.js'; import { uploadStreamToS3 } from '../s3/client.js'; const GROWING_PATH = process.env.GROWING_PATH || '/growing'; const S3_BUCKET = process.env.S3_BUCKET || 'wild-dragon'; const POLL_MS = 5000; let inflight = new Set(); let idleThresholdMs = 8000; async function loadIdleThreshold() { try { const r = await query( `SELECT value FROM settings WHERE key = 'growing_promote_after_seconds'` ); const sec = parseInt(r.rows[0]?.value, 10); if (sec > 0) idleThresholdMs = sec * 1000; } catch (_) { /* table not migrated yet — keep default */ } } async function* walk(dir) { let entries = []; try { entries = await readdir(dir, { withFileTypes: true }); } catch (_) { return; } for (const e of entries) { const full = join(dir, e.name); if (e.isDirectory()) yield* walk(full); else if (e.isFile()) yield full; } } async function promote(filePath) { if (inflight.has(filePath)) return; inflight.add(filePath); try { // Reconstruct the S3 key from the relative path under GROWING_PATH. // Capture writes `${GROWING_PATH}/${projectId}/${clipName}.${ext}`, which // mirrors `projects/${projectId}/masters/${clipName}.${ext}` in S3. const rel = relative(GROWING_PATH, filePath); // /. const [projectId, fileName] = rel.split('/', 2); if (!projectId || !fileName) return; const s3Key = `projects/${projectId}/masters/${fileName}`; // Find the matching live asset by display_name = clipName. const clipName = basename(fileName, '.' + fileName.split('.').pop()); const r = await query( `SELECT id, status FROM assets WHERE project_id = $1 AND display_name = $2 ORDER BY created_at DESC LIMIT 1`, [projectId, clipName] ); if (r.rows.length === 0) { console.warn(`[promotion] no asset row for ${rel} — skipping`); return; } const asset = r.rows[0]; const st = await stat(filePath); console.log(`[promotion] uploading ${rel} (${st.size} bytes) -> s3://${S3_BUCKET}/${s3Key}`); await uploadStreamToS3(S3_BUCKET, s3Key, createReadStream(filePath)); await query( `UPDATE assets SET original_s3_key = $1, file_size = $2, status = 'ready', updated_at = NOW() WHERE id = $3`, [s3Key, st.size, asset.id] ); // Queue the proxy job so the editor gets a browser-playable proxy and // the panel's "relink to hi-res" path becomes available. const { Queue } = await import('bullmq'); const { hostname, port } = new URL(process.env.REDIS_URL || 'redis://queue:6379'); const proxyQueue = new Queue('proxy', { connection: { host: hostname, port: parseInt(port, 10) || 6379 }, }); await proxyQueue.add('generate', { assetId: asset.id, inputKey: s3Key, outputKey: `proxies/${asset.id}.mp4`, }); await proxyQueue.close(); await unlink(filePath).catch(err => { console.warn(`[promotion] could not unlink ${rel}: ${err.message}`); }); console.log(`[promotion] asset ${asset.id} promoted, proxy queued, local file removed`); } catch (err) { console.error('[promotion] failed for', filePath, err); } finally { inflight.delete(filePath); } } async function scan() { const now = Date.now(); for await (const file of walk(GROWING_PATH)) { if (inflight.has(file)) continue; let st; try { st = await stat(file); } catch (_) { continue; } if (now - st.mtimeMs >= idleThresholdMs && st.size > 0) { promote(file); } } } export function startPromotionWorker() { loadIdleThreshold(); setInterval(loadIdleThreshold, 60_000); setInterval(scan, POLL_MS); console.log(`[promotion] watching ${GROWING_PATH} (idle threshold ${idleThresholdMs}ms)`); }