DragonWind/lib/ampp-placement-worker.js

190 lines
7.4 KiB
JavaScript

"use strict";
const { placeAssetInFolderById } = require("./ampp-folder-placer");
// ================================================================
// AMPP Placement Worker — lib/ampp-placement-worker.js
//
// Runs inside DragonWind as a background setInterval loop.
// Every POLL_INTERVAL_MS it:
// 1. Finds pending placements in db.pendingPlacements
// 2. Fetches recent completed AMPP ingest jobs
// 3. Matches jobs to placements by filename (broad field search)
// 4. Calls placeAssetInFolderById() with the known folder ID
// 5. Updates placement status in db
//
// Job field matching:
// AMPP job field names vary by version. The worker searches ALL
// string values in the job object for a match against the filename.
// It also logs the full key list of the first matched job so you
// can identify the exact field name and configure ASSET_ID_FIELDS.
//
// Tune ASSET_ID_FIELDS below once you know your AMPP's field names.
// ================================================================
// Fields to check for the asset:id — in priority order
const ASSET_ID_FIELDS = [
"asset:id",
"assetId",
"job:assetId",
"asset:uuid",
"id",
];
// How old a pending placement can be before we give up on it (24h)
const EXPIRY_MS = 24 * 60 * 60 * 1000;
// Default poll interval (overridable via start())
const DEFAULT_POLL_MS = 30_000;
class AmppPlacementWorker {
/**
* @param {object} opts
* @param {() => string} opts.getAmppBase — returns current AMPP base URL
* @param {() => Promise<string>} opts.getAmppToken — returns valid Bearer token
* @param {object} opts.db — shared db object (mutated in place)
* @param {() => void} opts.saveData — persists db to disk
*/
constructor({ getAmppBase, getAmppToken, db, saveData }) {
this._getAmppBase = getAmppBase;
this._getAmppToken = getAmppToken;
this._db = db;
this._saveData = saveData;
this._timer = null;
this._busy = false;
}
start(intervalMs = DEFAULT_POLL_MS) {
if (this._timer) return; // already running
this._timer = setInterval(() => this._tick(), intervalMs);
console.log(`[placement-worker] Started — polling every ${intervalMs / 1000}s`);
// Run one cycle immediately so the first upload doesn't wait a full interval
setImmediate(() => this._tick());
}
stop() {
if (this._timer) { clearInterval(this._timer); this._timer = null; }
console.log("[placement-worker] Stopped");
}
// ── Main poll cycle ────────────────────────────────────────────────────────
async _tick() {
if (this._busy) return; // skip overlapping runs
this._busy = true;
try {
await this._processPending();
} catch (err) {
console.error("[placement-worker] Tick error:", err.message);
} finally {
this._busy = false;
}
}
async _processPending() {
const pending = (this._db.pendingPlacements || []).filter(p => p.status === "waiting");
// Expire stale placements first (before any API calls)
let dirty = false;
for (const p of pending) {
if (Date.now() - new Date(p.createdAt).getTime() > EXPIRY_MS) {
p.status = "expired";
console.warn(`[placement-worker] Expired: ${p.filename} (created ${p.createdAt})`);
dirty = true;
}
}
if (dirty) this._saveData(this._db);
const active = pending.filter(p => p.status === "waiting");
if (!active.length) return;
console.log(`[placement-worker] ${active.length} pending placement(s) — checking AMPP jobs…`);
let token, jobs;
try {
token = await this._getAmppToken();
jobs = await this._fetchRecentJobs(token);
} catch (err) {
console.error("[placement-worker] Could not reach AMPP:", err.message);
return;
}
console.log(`[placement-worker] ${jobs.length} recent AMPP job(s) fetched`);
// Log keys of first job to help identify field names (only once per session)
if (jobs.length > 0 && !this._loggedJobKeys) {
console.log("[placement-worker] Sample job keys:", Object.keys(jobs[0]));
console.log("[placement-worker] Sample job (first 800 chars):", JSON.stringify(jobs[0]).slice(0, 800));
this._loggedJobKeys = true;
}
let changed = false;
for (const placement of active) {
const job = this._matchJob(jobs, placement.filename);
if (!job) continue;
const assetId = this._extractAssetId(job);
if (!assetId) {
console.warn(`[placement-worker] Matched job for '${placement.filename}' but could not extract asset:id. Job keys: ${Object.keys(job).join(", ")}`);
continue;
}
console.log(`[placement-worker] Placing '${placement.filename}' → folder ${placement.amppFolderId} (asset ${assetId})`);
try {
await placeAssetInFolderById(this._getAmppBase(), assetId, placement.amppFolderId, token);
placement.status = "placed";
placement.assetId = assetId;
placement.placedAt = new Date().toISOString();
console.log(`[placement-worker] ✓ Placed '${placement.filename}' in folder '${placement.amppFolderName}'`);
} catch (err) {
placement.status = "failed";
placement.error = err.message;
console.error(`[placement-worker] ✗ Failed to place '${placement.filename}':`, err.message);
}
changed = true;
}
if (changed) this._saveData(this._db);
}
// ── Fetch recent AMPP jobs ─────────────────────────────────────────────────
async _fetchRecentJobs(token) {
const base = this._getAmppBase();
const url = `${base}/api/v1/queue/job/jobs/querypage?skip=0&limit=200&sort=created:dateTime&asc=false`;
const r = await fetch(url, {
method: "POST",
headers: { "Content-Type": "application/json", Authorization: `Bearer ${token}` },
body: "{}",
signal: AbortSignal.timeout(15000),
});
if (!r.ok) throw new Error(`AMPP jobs query returned HTTP ${r.status}`);
const data = await r.json();
return Array.isArray(data) ? data : (data?.items ?? data?.results ?? []);
}
// ── Match a job to a filename ──────────────────────────────────────────────
// Searches ALL string values in the job object — broad but reliable while
// we don't yet know the exact field name. Once confirmed, you can tighten
// this to specific fields for performance.
_matchJob(jobs, filename) {
const lc = filename.toLowerCase();
return jobs.find(job => {
return Object.values(job).some(v => {
if (typeof v !== "string") return false;
const vl = v.toLowerCase();
// Match exact filename or filename at end of a path/key
return vl === lc || vl.endsWith("/" + lc) || vl.endsWith("\\" + lc);
});
}) ?? null;
}
// ── Extract asset ID from a job ────────────────────────────────────────────
_extractAssetId(job) {
for (const field of ASSET_ID_FIELDS) {
const v = job[field];
if (v && typeof v === "string") return v.trim();
}
return null;
}
}
module.exports = AmppPlacementWorker;