// Scheduler tick — every TICK_INTERVAL_MS, scan the recorder_schedules table
// and transition any rows whose window has opened or closed. The actual
// recorder start/stop is delegated to the existing /recorders/:id/start|stop
// routes via an HTTP call to this API's own URL (SELF_URL), so we reuse all of
// the existing container orchestration, growing-files handling, asset row
// creation, etc.
//
// On schedule completion: a 'daily' or 'weekly' recurring schedule is cloned
// forward in 1-day / 7-day steps into a new 'pending' row.
|
||
|
|
|
||
|
|
import pool from './db/pool.js';
|
||
|
|
|
||
|
|
// Tick interval used when SCHEDULER_TICK_MS is unset or unusable.
const DEFAULT_TICK_INTERVAL_MS = 15000;
// Largest delay Node timers accept. A delay above this — like one below 1 or
// NaN — is coerced to 1 ms, which would turn the scheduler into a busy loop.
const MAX_TIMER_DELAY_MS = 2 ** 31 - 1;

/**
 * Turn the raw SCHEDULER_TICK_MS setting into a safe timer delay.
 *
 * @param {string|undefined} raw - Value read from the environment.
 * @returns {number} The parsed integer when it lies in [1, MAX_TIMER_DELAY_MS],
 *   otherwise DEFAULT_TICK_INTERVAL_MS.
 */
function resolveTickIntervalMs(raw) {
  const parsed = Number.parseInt(raw ?? '', 10);
  const isUsable =
    Number.isInteger(parsed) && parsed >= 1 && parsed <= MAX_TIMER_DELAY_MS;
  return isUsable ? parsed : DEFAULT_TICK_INTERVAL_MS;
}

// Delay between scheduler ticks, in milliseconds.
const TICK_INTERVAL_MS = resolveTickIntervalMs(process.env.SCHEDULER_TICK_MS);
// Base URL that callSelf() prefixes to route paths. Defaults to this host's
// loopback address on PORT (or 3000).
const SELF_URL = process.env.MAM_API_SELF_URL || `http://127.0.0.1:${process.env.PORT || 3000}`;

// True while a tick() is in flight, so overlapping timer firings are skipped.
let _tickRunning = false;
// Handle returned by setInterval while the loop is active; null when stopped.
let _interval = null;
|
||
|
|
|
||
|
|
/**
 * Call one of this API's own routes over HTTP.
 *
 * The request carries a JSON content-type header, no body, and is aborted
 * after 30 seconds.
 *
 * @param {string} path - Route path appended verbatim to SELF_URL.
 * @param {string} [method='POST'] - HTTP method.
 * @returns {Promise<any>} The parsed JSON response, or `{}` when the body
 *   cannot be parsed as JSON.
 * @throws {Error} On a non-2xx response; the message includes the status and
 *   the first 200 characters of the response body.
 */
async function callSelf(path, method = 'POST') {
  const url = `${SELF_URL}${path}`;
  const response = await fetch(url, {
    method,
    headers: { 'Content-Type': 'application/json' },
    signal: AbortSignal.timeout(30000),
  });

  if (response.ok) {
    // A successful reply with an empty or non-JSON body is not an error.
    return response.json().catch(() => ({}));
  }

  // Reading the error body is best-effort; fall back to an empty excerpt.
  const body = await response.text().catch(() => '');
  const excerpt = body.slice(0, 200);
  throw new Error(`${method} ${path} → HTTP ${response.status}: ${excerpt}`);
}
|
||
|
|
|
||
|
|
/**
 * Best-effort description of a caught value, which may not be an Error.
 *
 * @param {unknown} err - Whatever was thrown.
 * @returns {string} `err.message` when present, otherwise `String(err)`.
 */
function schedulerErrorText(err) {
  return String(err?.message ?? err);
}

/**
 * Mark a schedule row as failed, storing at most 500 characters of `message`.
 *
 * @param {*} scheduleId - Value of the row's `id` column.
 * @param {string} message - Failure description.
 * @returns {Promise<void>}
 */
async function markScheduleFailed(scheduleId, message) {
  await pool.query(
    `UPDATE recorder_schedules
        SET status = 'failed', error_message = $2, updated_at = NOW()
      WHERE id = $1`,
    [scheduleId, message.slice(0, 500)]
  );
}

/**
 * Phase 1: start every pending schedule whose window is currently open.
 * Rows whose window has already fully elapsed (end_at <= NOW()) are not
 * selected by this query.
 *
 * @returns {Promise<void>}
 */
async function startDueSchedules() {
  const { rows } = await pool.query(
    `SELECT * FROM recorder_schedules
      WHERE status = 'pending' AND start_at <= NOW() AND end_at > NOW()
      ORDER BY start_at ASC`
  );
  for (const s of rows) {
    try {
      const result = await callSelf(`/api/v1/recorders/${s.recorder_id}/start`);
      await pool.query(
        `UPDATE recorder_schedules
            SET status = 'running', last_asset_id = NULL, updated_at = NOW()
          WHERE id = $1`,
        [s.id]
      );
      // `result` can be null when the route answers with a JSON `null`; that
      // must not turn a successful start into a recorded failure.
      console.log(`[scheduler] started schedule "${s.name}" on recorder ${s.recorder_id} (session=${result?.current_session_id || '?'})`);
    } catch (err) {
      const reason = schedulerErrorText(err);
      await markScheduleFailed(s.id, reason);
      console.error(`[scheduler] start failed for schedule ${s.id}: ${reason}`);
    }
  }
}

/**
 * Phase 2: stop every running schedule whose window has closed, mark it
 * completed, and queue the next occurrence of a recurring schedule.
 *
 * @returns {Promise<void>}
 */
async function stopDueSchedules() {
  const { rows } = await pool.query(
    `SELECT * FROM recorder_schedules
      WHERE status = 'running' AND end_at <= NOW()
      ORDER BY end_at ASC`
  );
  for (const s of rows) {
    try {
      await callSelf(`/api/v1/recorders/${s.recorder_id}/stop`);
      await pool.query(
        `UPDATE recorder_schedules SET status = 'completed', updated_at = NOW()
          WHERE id = $1`,
        [s.id]
      );
      console.log(`[scheduler] stopped schedule "${s.name}" on recorder ${s.recorder_id}`);
    } catch (err) {
      // Stop failed — flag as failed but don't keep trying forever.
      const reason = schedulerErrorText(err);
      await markScheduleFailed(s.id, `stop: ${reason}`);
      console.error(`[scheduler] stop failed for schedule ${s.id}: ${reason}`);
      continue;
    }

    // The recorder is stopped and the row is 'completed' at this point. A
    // failure to queue the follow-up is reported on its own instead of
    // relabelling the finished schedule as a failed stop.
    try {
      await enqueueNextOccurrence(s);
    } catch (err) {
      console.error(`[scheduler] could not queue next occurrence of schedule ${s.id}: ${schedulerErrorText(err)}`);
    }
  }
}

/**
 * Phase 3: stop recorders for schedules cancelled within the last 5 minutes
 * while their recorder is in the 'recording' state.
 *
 * NOTE(review): the join only shows that the recorder is recording now, not
 * that this schedule started that recording — confirm that a schedule
 * cancelled before it ever started cannot stop another session.
 *
 * @returns {Promise<void>}
 */
async function stopCancelledSchedules() {
  const { rows } = await pool.query(
    `SELECT s.* FROM recorder_schedules s
       JOIN recorders r ON r.id = s.recorder_id
      WHERE s.status = 'cancelled' AND r.status = 'recording'
        AND s.updated_at > NOW() - INTERVAL '5 minutes'`
  );
  for (const s of rows) {
    try {
      await callSelf(`/api/v1/recorders/${s.recorder_id}/stop`);
      console.log(`[scheduler] cancelled schedule "${s.name}" — stopped recorder ${s.recorder_id}`);
    } catch (err) {
      console.warn(`[scheduler] cancel-stop failed for ${s.id}: ${schedulerErrorText(err)}`);
    }
  }
}

/**
 * Run one scheduler pass: start due schedules, stop expired ones, then stop
 * recorders of recently cancelled schedules.
 *
 * A pass that begins while another is still in flight returns immediately.
 * Errors that escape a phase are logged and never rethrown, so the returned
 * promise always resolves.
 *
 * @returns {Promise<void>}
 */
async function tick() {
  if (_tickRunning) return;
  _tickRunning = true;

  try {
    await startDueSchedules();
    await stopDueSchedules();
    await stopCancelledSchedules();
  } catch (err) {
    console.error('[scheduler] tick error:', err);
  } finally {
    _tickRunning = false;
  }
}
|
||
|
|
|
||
|
|
// Step, in days, for each recurrence value that is cloned forward. Any other
// value (including 'none' and a missing value) is treated as non-recurring.
const RECURRENCE_STEP_DAYS = new Map([
  ['daily', 1],
  ['weekly', 7],
]);
const MS_PER_DAY = 24 * 60 * 60 * 1000;

/**
 * Insert the next 'pending' occurrence of a recurring schedule.
 *
 * The window is moved forward in whole recurrence steps (1 day for 'daily',
 * 7 days for 'weekly') using UTC date arithmetic. It normally advances one
 * step. If that window would already be over — for example the API was down
 * for longer than one period — further steps are added until the window ends
 * in the future, since a pending row whose end_at has passed is never started.
 *
 * Only name, recorder_id, start_at, end_at and recurrence are copied to the
 * new row.
 *
 * @param {object} schedule - Row of recorder_schedules that just completed;
 *   reads `name`, `recorder_id`, `start_at`, `end_at` and `recurrence`.
 * @returns {Promise<void>} Resolves after the insert, or immediately when the
 *   schedule is not recurring.
 * @throws {RangeError} When start_at or end_at is not a valid date.
 */
async function enqueueNextOccurrence(schedule) {
  const stepDays = RECURRENCE_STEP_DAYS.get(schedule.recurrence);
  if (stepDays === undefined) return;

  const start = new Date(schedule.start_at);
  const end = new Date(schedule.end_at);

  // Smallest step count k >= 1 for which end + k * step lies after "now".
  const overdueMs = Date.now() - end.getTime();
  const steps = overdueMs >= 0
    ? Math.floor(overdueMs / (stepDays * MS_PER_DAY)) + 1
    : 1;
  const shiftDays = steps * stepDays;

  start.setUTCDate(start.getUTCDate() + shiftDays);
  end.setUTCDate(end.getUTCDate() + shiftDays);

  await pool.query(
    `INSERT INTO recorder_schedules
       (name, recorder_id, start_at, end_at, recurrence, status)
     VALUES ($1, $2, $3, $4, $5, 'pending')`,
    [schedule.name, schedule.recorder_id, start.toISOString(), end.toISOString(), schedule.recurrence]
  );
  console.log(`[scheduler] queued next "${schedule.name}" → ${start.toISOString()}`);
}
|
||
|
|
|
||
|
|
/**
 * Start the periodic scheduler loop. Calling it while the loop is already
 * running does nothing.
 *
 * Besides the recurring tick every TICK_INTERVAL_MS, one extra tick is
 * scheduled 2 seconds after start-up.
 *
 * @returns {void}
 */
export function startSchedulerLoop() {
  if (_interval) return;
  console.log(`[scheduler] tick loop started (interval=${TICK_INTERVAL_MS}ms)`);

  // Report a rejected tick instead of discarding it silently.
  const runTick = () => {
    tick().catch((err) => console.error('[scheduler] tick rejected:', err));
  };

  const handle = setInterval(runTick, TICK_INTERVAL_MS);
  _interval = handle;

  // Fire once on startup so a window that opened while the API was down
  // doesn't have to wait a full interval. Skipped when the loop was stopped
  // (or replaced) before the timer fires.
  setTimeout(() => {
    if (_interval === handle) runTick();
  }, 2000);
}
|
||
|
|
|
||
|
|
/**
 * Stop the periodic scheduler loop. Does nothing when the loop is not running.
 *
 * @returns {void}
 */
export function stopSchedulerLoop() {
  if (!_interval) return;

  clearInterval(_interval);
  // Reset the handle so startSchedulerLoop() can start a fresh loop.
  _interval = null;
}
|