"use strict"; const { placeAssetInFolderById } = require("./ampp-folder-placer"); // ================================================================ // AMPP Placement Worker — lib/ampp-placement-worker.js // // Runs inside DragonWind as a background setInterval loop. // Every POLL_INTERVAL_MS it: // 1. Finds pending placements in db.pendingPlacements // 2. Fetches recent completed AMPP ingest jobs // 3. Matches jobs to placements by filename (broad field search) // 4. Calls placeAssetInFolderById() with the known folder ID // 5. Updates placement status in db // // Job field matching: // AMPP job field names vary by version. The worker searches ALL // string values in the job object for a match against the filename. // It also logs the full key list of the first matched job so you // can identify the exact field name and configure ASSET_ID_FIELDS. // // Tune ASSET_ID_FIELDS below once you know your AMPP's field names. // ================================================================ // Fields to check for the asset:id — in priority order const ASSET_ID_FIELDS = [ "asset:id", "assetId", "job:assetId", "asset:uuid", "id", ]; // How old a pending placement can be before we give up on it (24h) const EXPIRY_MS = 24 * 60 * 60 * 1000; // Default poll interval (overridable via start()) const DEFAULT_POLL_MS = 30_000; class AmppPlacementWorker { /** * @param {object} opts * @param {() => string} opts.getAmppBase — returns current AMPP base URL * @param {() => Promise} opts.getAmppToken — returns valid Bearer token * @param {object} opts.db — shared db object (mutated in place) * @param {() => void} opts.saveData — persists db to disk */ constructor({ getAmppBase, getAmppToken, db, saveData }) { this._getAmppBase = getAmppBase; this._getAmppToken = getAmppToken; this._db = db; this._saveData = saveData; this._timer = null; this._busy = false; } start(intervalMs = DEFAULT_POLL_MS) { if (this._timer) return; // already running this._timer = setInterval(() => this._tick(), intervalMs); console.log(`[placement-worker] Started — polling every ${intervalMs / 1000}s`); // Run one cycle immediately so the first upload doesn't wait a full interval setImmediate(() => this._tick()); } stop() { if (this._timer) { clearInterval(this._timer); this._timer = null; } console.log("[placement-worker] Stopped"); } // ── Main poll cycle ──────────────────────────────────────────────────────── async _tick() { if (this._busy) return; // skip overlapping runs this._busy = true; try { await this._processPending(); } catch (err) { console.error("[placement-worker] Tick error:", err.message); } finally { this._busy = false; } } async _processPending() { const pending = (this._db.pendingPlacements || []).filter(p => p.status === "waiting"); // Expire stale placements first (before any API calls) let dirty = false; for (const p of pending) { if (Date.now() - new Date(p.createdAt).getTime() > EXPIRY_MS) { p.status = "expired"; console.warn(`[placement-worker] Expired: ${p.filename} (created ${p.createdAt})`); dirty = true; } } if (dirty) this._saveData(this._db); const active = pending.filter(p => p.status === "waiting"); if (!active.length) return; console.log(`[placement-worker] ${active.length} pending placement(s) — checking AMPP jobs…`); let token, jobs; try { token = await this._getAmppToken(); jobs = await this._fetchRecentJobs(token); } catch (err) { console.error("[placement-worker] Could not reach AMPP:", err.message); return; } console.log(`[placement-worker] ${jobs.length} recent AMPP job(s) fetched`); // Log keys of first job to help identify field names (only once per session) if (jobs.length > 0 && !this._loggedJobKeys) { console.log("[placement-worker] Sample job keys:", Object.keys(jobs[0])); console.log("[placement-worker] Sample job (first 800 chars):", JSON.stringify(jobs[0]).slice(0, 800)); this._loggedJobKeys = true; } let changed = false; for (const placement of active) { const job = this._matchJob(jobs, placement.filename); if (!job) continue; const assetId = this._extractAssetId(job); if (!assetId) { console.warn(`[placement-worker] Matched job for '${placement.filename}' but could not extract asset:id. Job keys: ${Object.keys(job).join(", ")}`); continue; } console.log(`[placement-worker] Placing '${placement.filename}' → folder ${placement.amppFolderId} (asset ${assetId})`); try { await placeAssetInFolderById(this._getAmppBase(), assetId, placement.amppFolderId, token); placement.status = "placed"; placement.assetId = assetId; placement.placedAt = new Date().toISOString(); console.log(`[placement-worker] ✓ Placed '${placement.filename}' in folder '${placement.amppFolderName}'`); } catch (err) { placement.status = "failed"; placement.error = err.message; console.error(`[placement-worker] ✗ Failed to place '${placement.filename}':`, err.message); } changed = true; } if (changed) this._saveData(this._db); } // ── Fetch recent AMPP jobs ───────────────────────────────────────────────── async _fetchRecentJobs(token) { const base = this._getAmppBase(); const url = `${base}/api/v1/queue/job/jobs/querypage?skip=0&limit=200&sort=created:dateTime&asc=false`; const r = await fetch(url, { method: "POST", headers: { "Content-Type": "application/json", Authorization: `Bearer ${token}` }, body: "{}", signal: AbortSignal.timeout(15000), }); if (!r.ok) throw new Error(`AMPP jobs query returned HTTP ${r.status}`); const data = await r.json(); return Array.isArray(data) ? data : (data?.items ?? data?.results ?? []); } // ── Match a job to a filename ────────────────────────────────────────────── // Searches ALL string values in the job object — broad but reliable while // we don't yet know the exact field name. Once confirmed, you can tighten // this to specific fields for performance. _matchJob(jobs, filename) { const lc = filename.toLowerCase(); return jobs.find(job => { return Object.values(job).some(v => { if (typeof v !== "string") return false; const vl = v.toLowerCase(); // Match exact filename or filename at end of a path/key return vl === lc || vl.endsWith("/" + lc) || vl.endsWith("\\" + lc); }); }) ?? null; } // ── Extract asset ID from a job ──────────────────────────────────────────── _extractAssetId(job) { for (const field of ASSET_ID_FIELDS) { const v = job[field]; if (v && typeof v === "string") return v.trim(); } return null; } } module.exports = AmppPlacementWorker;