From 5fe3df60b54df535b03d71782692fd337a6f02c2 Mon Sep 17 00:00:00 2001 From: ZGaetano Date: Thu, 30 Apr 2026 17:38:22 -0400 Subject: [PATCH] =?UTF-8?q?feat:=20add=20AMPP=20placement=20background=20w?= =?UTF-8?q?orker=20=E2=80=94=20polls=20jobs,=20auto-places=20assets=20by?= =?UTF-8?q?=20folder=20ID?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- lib/ampp-placement-worker.js | 190 +++++++++++++++++++++++++++++++++++ 1 file changed, 190 insertions(+) create mode 100644 lib/ampp-placement-worker.js diff --git a/lib/ampp-placement-worker.js b/lib/ampp-placement-worker.js new file mode 100644 index 0000000..0409274 --- /dev/null +++ b/lib/ampp-placement-worker.js @@ -0,0 +1,190 @@ +"use strict"; + +const { placeAssetInFolderById } = require("./ampp-folder-placer"); + +// ================================================================ +// AMPP Placement Worker — lib/ampp-placement-worker.js +// +// Runs inside DragonWind as a background setInterval loop. +// Every POLL_INTERVAL_MS it: +// 1. Finds pending placements in db.pendingPlacements +// 2. Fetches recent completed AMPP ingest jobs +// 3. Matches jobs to placements by filename (broad field search) +// 4. Calls placeAssetInFolderById() with the known folder ID +// 5. Updates placement status in db +// +// Job field matching: +// AMPP job field names vary by version. The worker searches ALL +// string values in the job object for a match against the filename. +// It also logs the full key list of the first matched job so you +// can identify the exact field name and configure ASSET_ID_FIELDS. +// +// Tune ASSET_ID_FIELDS below once you know your AMPP's field names. +// ================================================================ + +// Fields to check for the asset:id — in priority order +const ASSET_ID_FIELDS = [ + "asset:id", + "assetId", + "job:assetId", + "asset:uuid", + "id", +]; + +// How old a pending placement can be before we give up on it (24h) +const EXPIRY_MS = 24 * 60 * 60 * 1000; + +// Default poll interval (overridable via start()) +const DEFAULT_POLL_MS = 30_000; + +class AmppPlacementWorker { + /** + * @param {object} opts + * @param {() => string} opts.getAmppBase — returns current AMPP base URL + * @param {() => Promise} opts.getAmppToken — returns valid Bearer token + * @param {object} opts.db — shared db object (mutated in place) + * @param {() => void} opts.saveData — persists db to disk + */ + constructor({ getAmppBase, getAmppToken, db, saveData }) { + this._getAmppBase = getAmppBase; + this._getAmppToken = getAmppToken; + this._db = db; + this._saveData = saveData; + this._timer = null; + this._busy = false; + } + + start(intervalMs = DEFAULT_POLL_MS) { + if (this._timer) return; // already running + this._timer = setInterval(() => this._tick(), intervalMs); + console.log(`[placement-worker] Started — polling every ${intervalMs / 1000}s`); + // Run one cycle immediately so the first upload doesn't wait a full interval + setImmediate(() => this._tick()); + } + + stop() { + if (this._timer) { clearInterval(this._timer); this._timer = null; } + console.log("[placement-worker] Stopped"); + } + + // ── Main poll cycle ──────────────────────────────────────────────────────── + async _tick() { + if (this._busy) return; // skip overlapping runs + this._busy = true; + try { + await this._processPending(); + } catch (err) { + console.error("[placement-worker] Tick error:", err.message); + } finally { + this._busy = false; + } + } + + async _processPending() { + const pending = (this._db.pendingPlacements || []).filter(p => p.status === "waiting"); + + // Expire stale placements first (before any API calls) + let dirty = false; + for (const p of pending) { + if (Date.now() - new Date(p.createdAt).getTime() > EXPIRY_MS) { + p.status = "expired"; + console.warn(`[placement-worker] Expired: ${p.filename} (created ${p.createdAt})`); + dirty = true; + } + } + if (dirty) this._saveData(this._db); + + const active = pending.filter(p => p.status === "waiting"); + if (!active.length) return; + + console.log(`[placement-worker] ${active.length} pending placement(s) — checking AMPP jobs…`); + + let token, jobs; + try { + token = await this._getAmppToken(); + jobs = await this._fetchRecentJobs(token); + } catch (err) { + console.error("[placement-worker] Could not reach AMPP:", err.message); + return; + } + + console.log(`[placement-worker] ${jobs.length} recent AMPP job(s) fetched`); + + // Log keys of first job to help identify field names (only once per session) + if (jobs.length > 0 && !this._loggedJobKeys) { + console.log("[placement-worker] Sample job keys:", Object.keys(jobs[0])); + console.log("[placement-worker] Sample job (first 800 chars):", JSON.stringify(jobs[0]).slice(0, 800)); + this._loggedJobKeys = true; + } + + let changed = false; + for (const placement of active) { + const job = this._matchJob(jobs, placement.filename); + if (!job) continue; + + const assetId = this._extractAssetId(job); + if (!assetId) { + console.warn(`[placement-worker] Matched job for '${placement.filename}' but could not extract asset:id. Job keys: ${Object.keys(job).join(", ")}`); + continue; + } + + console.log(`[placement-worker] Placing '${placement.filename}' → folder ${placement.amppFolderId} (asset ${assetId})`); + try { + await placeAssetInFolderById(this._getAmppBase(), assetId, placement.amppFolderId, token); + placement.status = "placed"; + placement.assetId = assetId; + placement.placedAt = new Date().toISOString(); + console.log(`[placement-worker] ✓ Placed '${placement.filename}' in folder '${placement.amppFolderName}'`); + } catch (err) { + placement.status = "failed"; + placement.error = err.message; + console.error(`[placement-worker] ✗ Failed to place '${placement.filename}':`, err.message); + } + changed = true; + } + + if (changed) this._saveData(this._db); + } + + // ── Fetch recent AMPP jobs ───────────────────────────────────────────────── + async _fetchRecentJobs(token) { + const base = this._getAmppBase(); + const url = `${base}/api/v1/queue/job/jobs/querypage?skip=0&limit=200&sort=created:dateTime&asc=false`; + const r = await fetch(url, { + method: "POST", + headers: { "Content-Type": "application/json", Authorization: `Bearer ${token}` }, + body: "{}", + signal: AbortSignal.timeout(15000), + }); + if (!r.ok) throw new Error(`AMPP jobs query returned HTTP ${r.status}`); + const data = await r.json(); + return Array.isArray(data) ? data : (data?.items ?? data?.results ?? []); + } + + // ── Match a job to a filename ────────────────────────────────────────────── + // Searches ALL string values in the job object — broad but reliable while + // we don't yet know the exact field name. Once confirmed, you can tighten + // this to specific fields for performance. + _matchJob(jobs, filename) { + const lc = filename.toLowerCase(); + return jobs.find(job => { + return Object.values(job).some(v => { + if (typeof v !== "string") return false; + const vl = v.toLowerCase(); + // Match exact filename or filename at end of a path/key + return vl === lc || vl.endsWith("/" + lc) || vl.endsWith("\\" + lc); + }); + }) ?? null; + } + + // ── Extract asset ID from a job ──────────────────────────────────────────── + _extractAssetId(job) { + for (const field of ASSET_ID_FIELDS) { + const v = job[field]; + if (v && typeof v === "string") return v.trim(); + } + return null; + } +} + +module.exports = AmppPlacementWorker;