dragonflight/services/mam-api/src/scheduler.js

160 lines
6 KiB
JavaScript
Raw Normal View History

// Scheduler tick — every TICK_INTERVAL_MS scan the recorder_schedules table
// and transition any rows whose window opens or closes. The actual recorder
// start/stop is delegated to the existing /recorders/:id/start|stop routes
// via an in-process HTTP call, so we reuse all of the existing container
// orchestration, growing-files handling, asset row creation, etc.
//
// On schedule completion: a 'daily' or 'weekly' recurring schedule is cloned
// forward by 1 day / 7 days into a new 'pending' row.
import pool from './db/pool.js';
/**
 * Read a strictly positive integer from an environment variable.
 *
 * @param {string} name - Environment variable name.
 * @param {number} fallback - Value returned when the variable is unset, empty,
 *   not an integer, or not greater than zero.
 * @returns {number} The parsed value, or `fallback`.
 */
function readPositiveIntEnv(name, fallback) {
  const parsed = Number.parseInt(process.env[name] ?? '', 10);
  return Number.isInteger(parsed) && parsed > 0 ? parsed : fallback;
}

// Delay between scheduler ticks. Validated because a NaN/zero/negative delay
// handed to setInterval is coerced to ~1 ms, which would turn the scheduler
// into a busy loop against the database.
const TICK_INTERVAL_MS = readPositiveIntEnv('SCHEDULER_TICK_MS', 15000);

// Base URL used for the in-process HTTP calls back into this API.
const SELF_URL = process.env.MAM_API_SELF_URL || `http://127.0.0.1:${process.env.PORT || 3000}`;

// Re-entrancy guard: true while a tick is executing.
let _tickRunning = false;

// Handle of the active setInterval, or null when the loop is stopped.
let _interval = null;
/**
 * Issue an HTTP request against this API's own base URL (SELF_URL).
 *
 * The request carries a JSON Content-Type header, no body, and is aborted
 * after 30 seconds.
 *
 * @param {string} path - Path appended verbatim to SELF_URL.
 * @param {string} [method='POST'] - HTTP method.
 * @returns {Promise<object>} The parsed JSON response, or `{}` when the body
 *   cannot be parsed as JSON.
 * @throws {Error} When the response status is not OK; the message includes the
 *   method, path, status and the first 200 characters of the response body.
 */
async function callSelf(path, method = 'POST') {
  const url = `${SELF_URL}${path}`;
  const requestInit = {
    method,
    headers: { 'Content-Type': 'application/json' },
    signal: AbortSignal.timeout(30000),
  };

  const response = await fetch(url, requestInit);

  if (response.ok) {
    // A success response without a JSON body is treated as an empty object.
    return response.json().catch(() => ({}));
  }

  // Reading the error body is best-effort; fall back to an empty string.
  const bodyText = await response.text().catch(() => '');
  throw new Error(`${method} ${path} → HTTP ${response.status}: ${bodyText.slice(0, 200)}`);
}
/**
 * Extract a printable message from anything that was thrown.
 *
 * @param {unknown} err - The caught value.
 * @returns {string} `err.message` when it is a string, otherwise `String(err)`.
 */
function describeError(err) {
  return typeof err?.message === 'string' ? err.message : String(err);
}

/**
 * Flag a schedule row as 'failed' and store the reason (capped at 500 chars).
 *
 * @param {*} scheduleId - `recorder_schedules.id` of the row to update.
 * @param {string} message - Failure description.
 * @returns {Promise<void>}
 */
async function markScheduleFailed(scheduleId, message) {
  await pool.query(
    `UPDATE recorder_schedules
        SET status = 'failed', error_message = $2, updated_at = NOW()
      WHERE id = $1`,
    [scheduleId, message.slice(0, 500)]
  );
}

/** Step 1: start every pending schedule whose window is currently open. */
async function startDueSchedules() {
  const dueStart = await pool.query(
    `SELECT * FROM recorder_schedules
      WHERE status = 'pending' AND start_at <= NOW() AND end_at > NOW()
      ORDER BY start_at ASC`
  );
  for (const s of dueStart.rows) {
    try {
      const result = await callSelf(`/api/v1/recorders/${s.recorder_id}/start`);
      await pool.query(
        `UPDATE recorder_schedules
            SET status = 'running', last_asset_id = NULL, updated_at = NOW()
          WHERE id = $1`,
        [s.id]
      );
      console.log(`[scheduler] started schedule "${s.name}" on recorder ${s.recorder_id} (session=${result.current_session_id || '?'})`);
    } catch (err) {
      const reason = describeError(err);
      await markScheduleFailed(s.id, reason);
      console.error(`[scheduler] start failed for schedule ${s.id}: ${reason}`);
    }
  }
}

/** Step 2: stop every running schedule whose window has closed. */
async function stopEndedSchedules() {
  const dueStop = await pool.query(
    `SELECT * FROM recorder_schedules
      WHERE status = 'running' AND end_at <= NOW()
      ORDER BY end_at ASC`
  );
  for (const s of dueStop.rows) {
    try {
      await callSelf(`/api/v1/recorders/${s.recorder_id}/stop`);
      await pool.query(
        `UPDATE recorder_schedules SET status = 'completed', updated_at = NOW()
          WHERE id = $1`,
        [s.id]
      );
      console.log(`[scheduler] stopped schedule "${s.name}" on recorder ${s.recorder_id}`);
    } catch (err) {
      // Stop failed — flag as failed but don't keep trying forever.
      const reason = describeError(err);
      await markScheduleFailed(s.id, `stop: ${reason}`);
      console.error(`[scheduler] stop failed for schedule ${s.id}: ${reason}`);
      continue;
    }

    // Queue the follow-up occurrence separately: the stop already succeeded
    // and the row is 'completed', so a failure here must not be reported as a
    // stop failure or flip the finished schedule to 'failed'.
    try {
      await enqueueNextOccurrence(s);
    } catch (err) {
      console.error(`[scheduler] failed to queue next occurrence for schedule ${s.id}: ${describeError(err)}`);
    }
  }
}

/**
 * Step 3: stop recorders that are still 'recording' although their schedule
 * was cancelled within the last five minutes.
 */
async function stopCancelledRecorders() {
  const cancelledRunning = await pool.query(
    `SELECT s.* FROM recorder_schedules s
       JOIN recorders r ON r.id = s.recorder_id
      WHERE s.status = 'cancelled' AND r.status = 'recording'
        AND s.updated_at > NOW() - INTERVAL '5 minutes'`
  );
  for (const s of cancelledRunning.rows) {
    try {
      await callSelf(`/api/v1/recorders/${s.recorder_id}/stop`);
      console.log(`[scheduler] cancelled schedule "${s.name}" — stopped recorder ${s.recorder_id}`);
    } catch (err) {
      console.warn(`[scheduler] cancel-stop failed for ${s.id}: ${describeError(err)}`);
    }
  }
}

/**
 * Step 4: mark stale live assets as 'error' (#66).
 *
 * If a capture container crashes without calling mark-empty/mark-complete,
 * the asset row stays status='live' indefinitely. Rows older than
 * LIVE_ASSET_TIMEOUT_MINUTES (default 120) are timed out.
 */
async function expireStaleLiveAssets() {
  const parsedTimeout = Number.parseInt(process.env.LIVE_ASSET_TIMEOUT_MINUTES || '120', 10);
  // An unparsable or non-positive override would make the interval cast fail
  // (or expire every live asset at once), so fall back to the default.
  const LIVE_TIMEOUT_MINUTES = Number.isInteger(parsedTimeout) && parsedTimeout > 0 ? parsedTimeout : 120;
  const staleResult = await pool.query(
    `UPDATE assets
        SET status = 'error',
            updated_at = NOW()
      WHERE status = 'live'
        AND created_at < NOW() - ($1 || ' minutes')::INTERVAL
      RETURNING id, display_name`,
    [LIVE_TIMEOUT_MINUTES]
  );
  for (const row of staleResult.rows) {
    console.warn(`[scheduler] marked stale live asset as error: ${row.id} (${row.display_name})`);
  }
}

/**
 * Run one scheduler pass.
 *
 * Does nothing if a previous pass is still in flight (`_tickRunning`).
 * Otherwise runs, in order: start due schedules, stop ended schedules, stop
 * recorders of recently cancelled schedules, and expire stale live assets.
 * Any error escaping a step is logged and ends the pass; the function itself
 * does not reject.
 *
 * @returns {Promise<void>}
 */
async function tick() {
  if (_tickRunning) return;
  _tickRunning = true;
  try {
    await startDueSchedules();
    await stopEndedSchedules();
    await stopCancelledRecorders();
    await expireStaleLiveAssets();
  } catch (err) {
    console.error('[scheduler] tick error:', err);
  } finally {
    _tickRunning = false;
  }
}
/**
 * Clone a finished recurring schedule forward into a new 'pending' row.
 *
 * Only 'daily' (+1 day) and 'weekly' (+7 days) recurrences are cloned; any
 * other value (including 'none', null or an unrecognised string) is a no-op.
 * The shift is applied to the UTC day-of-month of both `start_at` and
 * `end_at`, so the window length is preserved.
 *
 * @param {object} schedule - A `recorder_schedules` row; reads `name`,
 *   `recorder_id`, `start_at`, `end_at` and `recurrence`.
 * @returns {Promise<void>}
 */
async function enqueueNextOccurrence(schedule) {
  const offsetDaysByRecurrence = { daily: 1, weekly: 7 };
  if (!Object.hasOwn(offsetDaysByRecurrence, schedule.recurrence)) return;
  const days = offsetDaysByRecurrence[schedule.recurrence];

  const start = new Date(schedule.start_at);
  const end = new Date(schedule.end_at);
  start.setUTCDate(start.getUTCDate() + days);
  end.setUTCDate(end.getUTCDate() + days);

  await pool.query(
    `INSERT INTO recorder_schedules
       (name, recorder_id, start_at, end_at, recurrence, status)
     VALUES ($1, $2, $3, $4, $5, 'pending')`,
    [schedule.name, schedule.recorder_id, start.toISOString(), end.toISOString(), schedule.recurrence]
  );
  console.log(`[scheduler] queued next "${schedule.name}" → ${start.toISOString()}`);
}
// Handle of the one-shot startup tick, or null once it has fired or been
// cancelled. Tracked so stopSchedulerLoop() can cancel it as well.
let _startupTimer = null;

/** Run one tick, logging (rather than silently dropping) any rejection. */
function runTick() {
  tick().catch((err) => {
    console.error('[scheduler] tick error:', err);
  });
}

/**
 * Start the periodic scheduler loop. Calling it while the loop is already
 * running is a no-op.
 *
 * @returns {void}
 */
export function startSchedulerLoop() {
  if (_interval) return;
  console.log(`[scheduler] tick loop started (interval=${TICK_INTERVAL_MS}ms)`);
  // Fire once on startup so a window that opened while the API was down
  // doesn't have to wait a full interval.
  _startupTimer = setTimeout(() => {
    _startupTimer = null;
    runTick();
  }, 2000);
  _interval = setInterval(runTick, TICK_INTERVAL_MS);
}

/**
 * Stop the scheduler loop, including a startup tick that has not fired yet.
 * Safe to call when the loop is not running.
 *
 * @returns {void}
 */
export function stopSchedulerLoop() {
  if (_startupTimer) {
    clearTimeout(_startupTimer);
    _startupTimer = null;
  }
  if (_interval) {
    clearInterval(_interval);
    _interval = null;
  }
}