195 lines
7.7 KiB
JavaScript
195 lines
7.7 KiB
JavaScript
"use strict";
|
|
|
|
const { placeAssetInFolderById, getOrCreateFolderByPath } = require("./ampp-folder-placer");
|
|
|
|
// ================================================================
|
|
// AMPP Placement Worker — lib/ampp-placement-worker.js
|
|
//
|
|
// Runs inside DragonWind as a background setInterval loop.
|
|
// Every POLL_INTERVAL_MS it:
|
|
// 1. Finds pending placements in db.pendingPlacements
|
|
// 2. Fetches recent completed AMPP ingest jobs
|
|
// 3. Matches jobs to placements by filename (broad field search)
|
|
// 4. Calls placeAssetInFolderById() with the known folder ID
|
|
// 5. Updates placement status in db
|
|
//
|
|
// Job field matching:
|
|
// AMPP job field names vary by version. The worker searches ALL
|
|
// string values in the job object for a match against the filename.
|
|
// It also logs the full key list of the first matched job so you
|
|
// can identify the exact field name and configure ASSET_ID_FIELDS.
|
|
//
|
|
// Tune ASSET_ID_FIELDS below once you know your AMPP's field names.
|
|
// ================================================================
|
|
|
|
// Fields to check for the asset:id — in priority order
|
|
const ASSET_ID_FIELDS = [
|
|
"asset:id",
|
|
"assetId",
|
|
"job:assetId",
|
|
"asset:uuid",
|
|
"id",
|
|
];
|
|
|
|
// How old a pending placement can be before we give up on it (24h)
|
|
const EXPIRY_MS = 24 * 60 * 60 * 1000;
|
|
|
|
// Default poll interval (overridable via start())
|
|
const DEFAULT_POLL_MS = 30_000;
|
|
|
|
class AmppPlacementWorker {
|
|
/**
|
|
* @param {object} opts
|
|
* @param {() => string} opts.getAmppBase — returns current AMPP base URL
|
|
* @param {() => Promise<string>} opts.getAmppToken — returns valid Bearer token
|
|
* @param {object} opts.db — shared db object (mutated in place)
|
|
* @param {() => void} opts.saveData — persists db to disk
|
|
*/
|
|
constructor({ getAmppBase, getAmppToken, db, saveData }) {
|
|
this._getAmppBase = getAmppBase;
|
|
this._getAmppToken = getAmppToken;
|
|
this._db = db;
|
|
this._saveData = saveData;
|
|
this._timer = null;
|
|
this._busy = false;
|
|
}
|
|
|
|
start(intervalMs = DEFAULT_POLL_MS) {
|
|
if (this._timer) return; // already running
|
|
this._timer = setInterval(() => this._tick(), intervalMs);
|
|
console.log(`[placement-worker] Started — polling every ${intervalMs / 1000}s`);
|
|
// Run one cycle immediately so the first upload doesn't wait a full interval
|
|
setImmediate(() => this._tick());
|
|
}
|
|
|
|
stop() {
|
|
if (this._timer) { clearInterval(this._timer); this._timer = null; }
|
|
console.log("[placement-worker] Stopped");
|
|
}
|
|
|
|
// ── Main poll cycle ────────────────────────────────────────────────────────
|
|
async _tick() {
|
|
if (this._busy) return; // skip overlapping runs
|
|
this._busy = true;
|
|
try {
|
|
await this._processPending();
|
|
} catch (err) {
|
|
console.error("[placement-worker] Tick error:", err.message);
|
|
} finally {
|
|
this._busy = false;
|
|
}
|
|
}
|
|
|
|
async _processPending() {
|
|
const pending = (this._db.pendingPlacements || []).filter(p => p.status === "waiting");
|
|
|
|
// Expire stale placements first (before any API calls)
|
|
let dirty = false;
|
|
for (const p of pending) {
|
|
if (Date.now() - new Date(p.createdAt).getTime() > EXPIRY_MS) {
|
|
p.status = "expired";
|
|
console.warn(`[placement-worker] Expired: ${p.filename} (created ${p.createdAt})`);
|
|
dirty = true;
|
|
}
|
|
}
|
|
if (dirty) this._saveData(this._db);
|
|
|
|
const active = pending.filter(p => p.status === "waiting");
|
|
if (!active.length) return;
|
|
|
|
console.log(`[placement-worker] ${active.length} pending placement(s) — checking AMPP jobs…`);
|
|
|
|
let token, jobs;
|
|
try {
|
|
token = await this._getAmppToken();
|
|
jobs = await this._fetchRecentJobs(token);
|
|
} catch (err) {
|
|
console.error("[placement-worker] Could not reach AMPP:", err.message);
|
|
return;
|
|
}
|
|
|
|
console.log(`[placement-worker] ${jobs.length} recent AMPP job(s) fetched`);
|
|
|
|
// Log keys of first job to help identify field names (only once per session)
|
|
if (jobs.length > 0 && !this._loggedJobKeys) {
|
|
console.log("[placement-worker] Sample job keys:", Object.keys(jobs[0]));
|
|
console.log("[placement-worker] Sample job (first 800 chars):", JSON.stringify(jobs[0]).slice(0, 800));
|
|
this._loggedJobKeys = true;
|
|
}
|
|
|
|
let changed = false;
|
|
for (const placement of active) {
|
|
const job = this._matchJob(jobs, placement.filename);
|
|
if (!job) continue;
|
|
|
|
const assetId = this._extractAssetId(job);
|
|
if (!assetId) {
|
|
console.warn(`[placement-worker] Matched job for '${placement.filename}' but could not extract asset:id. Job keys: ${Object.keys(job).join(", ")}`);
|
|
continue;
|
|
}
|
|
|
|
let targetFolderId = placement.amppFolderId;
|
|
if (!targetFolderId && placement.amppFolderName) {
|
|
try { targetFolderId = await getOrCreateFolderByPath(this._getAmppBase(), placement.amppFolderName, token); }
|
|
catch(e) { placement.status='failed'; placement.error=e.message; changed=true; continue; }
|
|
}
|
|
console.log(`[placement-worker] Placing '${placement.filename}' → folder ${targetFolderId} (asset ${assetId})`);
|
|
try {
|
|
await placeAssetInFolderById(this._getAmppBase(), assetId, targetFolderId, token);
|
|
placement.status = "placed";
|
|
placement.assetId = assetId;
|
|
placement.placedAt = new Date().toISOString();
|
|
console.log(`[placement-worker] ✓ Placed '${placement.filename}' in folder '${placement.amppFolderName}'`);
|
|
} catch (err) {
|
|
placement.status = "failed";
|
|
placement.error = err.message;
|
|
console.error(`[placement-worker] ✗ Failed to place '${placement.filename}':`, err.message);
|
|
}
|
|
changed = true;
|
|
}
|
|
|
|
if (changed) this._saveData(this._db);
|
|
}
|
|
|
|
// ── Fetch recent AMPP jobs ─────────────────────────────────────────────────
|
|
async _fetchRecentJobs(token) {
|
|
const base = this._getAmppBase();
|
|
const url = `${base}/api/v1/queue/job/jobs/querypage?skip=0&limit=200&sort=created:dateTime&asc=false`;
|
|
const r = await fetch(url, {
|
|
method: "POST",
|
|
headers: { "Content-Type": "application/json", Authorization: `Bearer ${token}` },
|
|
body: "{}",
|
|
signal: AbortSignal.timeout(15000),
|
|
});
|
|
if (!r.ok) throw new Error(`AMPP jobs query returned HTTP ${r.status}`);
|
|
const data = await r.json();
|
|
return Array.isArray(data) ? data : (data?.items ?? data?.results ?? []);
|
|
}
|
|
|
|
// ── Match a job to a filename ──────────────────────────────────────────────
|
|
// Searches ALL string values in the job object — broad but reliable while
|
|
// we don't yet know the exact field name. Once confirmed, you can tighten
|
|
// this to specific fields for performance.
|
|
_matchJob(jobs, filename) {
|
|
const lc = filename.toLowerCase();
|
|
return jobs.find(job => {
|
|
return Object.values(job).some(v => {
|
|
if (typeof v !== "string") return false;
|
|
const vl = v.toLowerCase();
|
|
// Match exact filename or filename at end of a path/key
|
|
return vl === lc || vl.endsWith("/" + lc) || vl.endsWith("\\" + lc);
|
|
});
|
|
}) ?? null;
|
|
}
|
|
|
|
// ── Extract asset ID from a job ────────────────────────────────────────────
|
|
_extractAssetId(job) {
|
|
for (const field of ASSET_ID_FIELDS) {
|
|
const v = job[field];
|
|
if (v && typeof v === "string") return v.trim();
|
|
}
|
|
return null;
|
|
}
|
|
}
|
|
|
|
module.exports = AmppPlacementWorker;
|