feat: add AMPP placement background worker — polls jobs, auto-places assets by folder ID
This commit is contained in:
parent
cdcb668e74
commit
5fe3df60b5
1 changed files with 190 additions and 0 deletions
190
lib/ampp-placement-worker.js
Normal file
190
lib/ampp-placement-worker.js
Normal file
|
|
@ -0,0 +1,190 @@
|
|||
"use strict";
|
||||
|
||||
const { placeAssetInFolderById } = require("./ampp-folder-placer");
|
||||
|
||||
// ================================================================
|
||||
// AMPP Placement Worker — lib/ampp-placement-worker.js
|
||||
//
|
||||
// Runs inside DragonWind as a background setInterval loop.
|
||||
// Every POLL_INTERVAL_MS it:
|
||||
// 1. Finds pending placements in db.pendingPlacements
|
||||
// 2. Fetches recent completed AMPP ingest jobs
|
||||
// 3. Matches jobs to placements by filename (broad field search)
|
||||
// 4. Calls placeAssetInFolderById() with the known folder ID
|
||||
// 5. Updates placement status in db
|
||||
//
|
||||
// Job field matching:
|
||||
// AMPP job field names vary by version. The worker searches ALL
|
||||
// string values in the job object for a match against the filename.
|
||||
// It also logs the full key list of the first matched job so you
|
||||
// can identify the exact field name and configure ASSET_ID_FIELDS.
|
||||
//
|
||||
// Tune ASSET_ID_FIELDS below once you know your AMPP's field names.
|
||||
// ================================================================
|
||||
|
||||
// Fields to check for the asset:id — in priority order
|
||||
const ASSET_ID_FIELDS = [
|
||||
"asset:id",
|
||||
"assetId",
|
||||
"job:assetId",
|
||||
"asset:uuid",
|
||||
"id",
|
||||
];
|
||||
|
||||
// How old a pending placement can be before we give up on it (24h)
|
||||
const EXPIRY_MS = 24 * 60 * 60 * 1000;
|
||||
|
||||
// Default poll interval (overridable via start())
|
||||
const DEFAULT_POLL_MS = 30_000;
|
||||
|
||||
class AmppPlacementWorker {
|
||||
/**
|
||||
* @param {object} opts
|
||||
* @param {() => string} opts.getAmppBase — returns current AMPP base URL
|
||||
* @param {() => Promise<string>} opts.getAmppToken — returns valid Bearer token
|
||||
* @param {object} opts.db — shared db object (mutated in place)
|
||||
* @param {() => void} opts.saveData — persists db to disk
|
||||
*/
|
||||
constructor({ getAmppBase, getAmppToken, db, saveData }) {
|
||||
this._getAmppBase = getAmppBase;
|
||||
this._getAmppToken = getAmppToken;
|
||||
this._db = db;
|
||||
this._saveData = saveData;
|
||||
this._timer = null;
|
||||
this._busy = false;
|
||||
}
|
||||
|
||||
start(intervalMs = DEFAULT_POLL_MS) {
|
||||
if (this._timer) return; // already running
|
||||
this._timer = setInterval(() => this._tick(), intervalMs);
|
||||
console.log(`[placement-worker] Started — polling every ${intervalMs / 1000}s`);
|
||||
// Run one cycle immediately so the first upload doesn't wait a full interval
|
||||
setImmediate(() => this._tick());
|
||||
}
|
||||
|
||||
stop() {
|
||||
if (this._timer) { clearInterval(this._timer); this._timer = null; }
|
||||
console.log("[placement-worker] Stopped");
|
||||
}
|
||||
|
||||
// ── Main poll cycle ────────────────────────────────────────────────────────
|
||||
async _tick() {
|
||||
if (this._busy) return; // skip overlapping runs
|
||||
this._busy = true;
|
||||
try {
|
||||
await this._processPending();
|
||||
} catch (err) {
|
||||
console.error("[placement-worker] Tick error:", err.message);
|
||||
} finally {
|
||||
this._busy = false;
|
||||
}
|
||||
}
|
||||
|
||||
async _processPending() {
|
||||
const pending = (this._db.pendingPlacements || []).filter(p => p.status === "waiting");
|
||||
|
||||
// Expire stale placements first (before any API calls)
|
||||
let dirty = false;
|
||||
for (const p of pending) {
|
||||
if (Date.now() - new Date(p.createdAt).getTime() > EXPIRY_MS) {
|
||||
p.status = "expired";
|
||||
console.warn(`[placement-worker] Expired: ${p.filename} (created ${p.createdAt})`);
|
||||
dirty = true;
|
||||
}
|
||||
}
|
||||
if (dirty) this._saveData(this._db);
|
||||
|
||||
const active = pending.filter(p => p.status === "waiting");
|
||||
if (!active.length) return;
|
||||
|
||||
console.log(`[placement-worker] ${active.length} pending placement(s) — checking AMPP jobs…`);
|
||||
|
||||
let token, jobs;
|
||||
try {
|
||||
token = await this._getAmppToken();
|
||||
jobs = await this._fetchRecentJobs(token);
|
||||
} catch (err) {
|
||||
console.error("[placement-worker] Could not reach AMPP:", err.message);
|
||||
return;
|
||||
}
|
||||
|
||||
console.log(`[placement-worker] ${jobs.length} recent AMPP job(s) fetched`);
|
||||
|
||||
// Log keys of first job to help identify field names (only once per session)
|
||||
if (jobs.length > 0 && !this._loggedJobKeys) {
|
||||
console.log("[placement-worker] Sample job keys:", Object.keys(jobs[0]));
|
||||
console.log("[placement-worker] Sample job (first 800 chars):", JSON.stringify(jobs[0]).slice(0, 800));
|
||||
this._loggedJobKeys = true;
|
||||
}
|
||||
|
||||
let changed = false;
|
||||
for (const placement of active) {
|
||||
const job = this._matchJob(jobs, placement.filename);
|
||||
if (!job) continue;
|
||||
|
||||
const assetId = this._extractAssetId(job);
|
||||
if (!assetId) {
|
||||
console.warn(`[placement-worker] Matched job for '${placement.filename}' but could not extract asset:id. Job keys: ${Object.keys(job).join(", ")}`);
|
||||
continue;
|
||||
}
|
||||
|
||||
console.log(`[placement-worker] Placing '${placement.filename}' → folder ${placement.amppFolderId} (asset ${assetId})`);
|
||||
try {
|
||||
await placeAssetInFolderById(this._getAmppBase(), assetId, placement.amppFolderId, token);
|
||||
placement.status = "placed";
|
||||
placement.assetId = assetId;
|
||||
placement.placedAt = new Date().toISOString();
|
||||
console.log(`[placement-worker] ✓ Placed '${placement.filename}' in folder '${placement.amppFolderName}'`);
|
||||
} catch (err) {
|
||||
placement.status = "failed";
|
||||
placement.error = err.message;
|
||||
console.error(`[placement-worker] ✗ Failed to place '${placement.filename}':`, err.message);
|
||||
}
|
||||
changed = true;
|
||||
}
|
||||
|
||||
if (changed) this._saveData(this._db);
|
||||
}
|
||||
|
||||
// ── Fetch recent AMPP jobs ─────────────────────────────────────────────────
|
||||
async _fetchRecentJobs(token) {
|
||||
const base = this._getAmppBase();
|
||||
const url = `${base}/api/v1/queue/job/jobs/querypage?skip=0&limit=200&sort=created:dateTime&asc=false`;
|
||||
const r = await fetch(url, {
|
||||
method: "POST",
|
||||
headers: { "Content-Type": "application/json", Authorization: `Bearer ${token}` },
|
||||
body: "{}",
|
||||
signal: AbortSignal.timeout(15000),
|
||||
});
|
||||
if (!r.ok) throw new Error(`AMPP jobs query returned HTTP ${r.status}`);
|
||||
const data = await r.json();
|
||||
return Array.isArray(data) ? data : (data?.items ?? data?.results ?? []);
|
||||
}
|
||||
|
||||
// ── Match a job to a filename ──────────────────────────────────────────────
|
||||
// Searches ALL string values in the job object — broad but reliable while
|
||||
// we don't yet know the exact field name. Once confirmed, you can tighten
|
||||
// this to specific fields for performance.
|
||||
_matchJob(jobs, filename) {
|
||||
const lc = filename.toLowerCase();
|
||||
return jobs.find(job => {
|
||||
return Object.values(job).some(v => {
|
||||
if (typeof v !== "string") return false;
|
||||
const vl = v.toLowerCase();
|
||||
// Match exact filename or filename at end of a path/key
|
||||
return vl === lc || vl.endsWith("/" + lc) || vl.endsWith("\\" + lc);
|
||||
});
|
||||
}) ?? null;
|
||||
}
|
||||
|
||||
// ── Extract asset ID from a job ────────────────────────────────────────────
|
||||
_extractAssetId(job) {
|
||||
for (const field of ASSET_ID_FIELDS) {
|
||||
const v = job[field];
|
||||
if (v && typeof v === "string") return v.trim();
|
||||
}
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
module.exports = AmppPlacementWorker;
|
||||
Loading…
Reference in a new issue