feat(jobs): replace polling with SSE EventSource for live job updates

- Drop setTimeout/scheduleRefresh loop in favour of EventSource on
  /api/v1/jobs/events (pushes every 2 s from the server)
- Refresh dot turns green on open, goes grey + "Reconnecting…" on error
  (EventSource auto-reconnects natively)
- Type-filter is now applied client-side against the full SSE payload so
  the dropdown change no longer triggers an HTTP round-trip
- killJob / retryJob / clearCompleted no longer call loadJobs(); the next
  SSE push (≤2 s) reflects the change automatically
This commit is contained in:
Zac Gaetano 2026-05-19 23:17:18 -04:00
parent 16b8530d43
commit 81c771a7be

View file

@ -479,7 +479,7 @@
<div class="spacer"></div> <div class="spacer"></div>
<select class="form-select type-select" id="type-filter" onchange="loadJobs()"> <select class="form-select type-select" id="type-filter" onchange="renderJobs()">
<option value="">All types</option> <option value="">All types</option>
<option value="transcode">Transcode</option> <option value="transcode">Transcode</option>
<option value="proxy">Proxy</option> <option value="proxy">Proxy</option>
@ -490,7 +490,7 @@
<div class="refresh-indicator"> <div class="refresh-indicator">
<div class="refresh-dot" id="refresh-dot"></div> <div class="refresh-dot" id="refresh-dot"></div>
<span id="refresh-label">Live</span> <span id="refresh-label">Connecting…</span>
</div> </div>
</div> </div>
@ -525,9 +525,9 @@
──────────────────────────────────────────────────────── */ ──────────────────────────────────────────────────────── */
const API = '/api/v1'; const API = '/api/v1';
let allJobs = []; let allJobs = [];
let currentFilter = 'all'; let currentFilter = 'all';
let refreshTimer = null; let sseSource = null;
let activeCount = 0; let activeCount = 0;
/* ──────────────────────────────────────────────────────── /* ────────────────────────────────────────────────────────
@ -547,33 +547,46 @@ async function api(path, opts = {}) {
} }
/* ──────────────────────────────────────────────────────── /* ────────────────────────────────────────────────────────
Load jobs SSE live feed
──────────────────────────────────────────────────────── */ ──────────────────────────────────────────────────────── */
async function loadJobs() { function startSSE() {
const type = document.getElementById('type-filter').value; const dot = document.getElementById('refresh-dot');
try { const label = document.getElementById('refresh-label');
let url = '/jobs';
const params = [];
if (type) params.push(`type=${encodeURIComponent(type)}`);
if (params.length) url += '?' + params.join('&');
const data = await api(url); if (sseSource) { sseSource.close(); sseSource = null; }
allJobs = Array.isArray(data) ? data : (data.jobs || []);
updateStats(); sseSource = new EventSource('/api/v1/jobs/events');
updateCounts();
renderJobs(); sseSource.addEventListener('open', () => {
scheduleRefresh(); dot.classList.add('live');
} catch (e) { label.textContent = 'Live';
showError('Failed to load jobs: ' + e.message); });
}
sseSource.addEventListener('message', (ev) => {
try {
const payload = JSON.parse(ev.data);
if (payload.type !== 'jobs') return;
allJobs = payload.jobs;
updateStats();
updateCounts();
renderJobs();
} catch (_) {}
});
sseSource.addEventListener('error', () => {
dot.classList.remove('live');
label.textContent = 'Reconnecting…';
});
} }
/* ────────────────────────────────────────────────────────
Stats + counts
──────────────────────────────────────────────────────── */
function updateStats() { function updateStats() {
const total = allJobs.length; const total = allJobs.length;
const active = allJobs.filter(j => j.status === 'active' || j.status === 'waiting').length; const active = allJobs.filter(j => j.status === 'active' || j.status === 'waiting').length;
const done = allJobs.filter(j => j.status === 'completed').length; const done = allJobs.filter(j => j.status === 'completed').length;
const failed = allJobs.filter(j => j.status === 'failed').length; const failed = allJobs.filter(j => j.status === 'failed').length;
activeCount = active; activeCount = active;
@ -584,11 +597,13 @@ function updateStats() {
} }
function updateCounts() { function updateCounts() {
const typeFilter = document.getElementById('type-filter').value;
const base = typeFilter ? allJobs.filter(j => j.type === typeFilter) : allJobs;
const counts = { const counts = {
all: allJobs.length, all: base.length,
active: allJobs.filter(j => j.status === 'active' || j.status === 'waiting').length, active: base.filter(j => j.status === 'active' || j.status === 'waiting').length,
completed: allJobs.filter(j => j.status === 'completed').length, completed: base.filter(j => j.status === 'completed').length,
failed: allJobs.filter(j => j.status === 'failed').length failed: base.filter(j => j.status === 'failed').length
}; };
for (const [k, v] of Object.entries(counts)) { for (const [k, v] of Object.entries(counts)) {
const el = document.getElementById('cnt-' + k); const el = document.getElementById('cnt-' + k);
@ -600,11 +615,12 @@ function updateCounts() {
Render Render
──────────────────────────────────────────────────────── */ ──────────────────────────────────────────────────────── */
function getFilteredJobs() { function getFilteredJobs() {
if (currentFilter === 'all') return allJobs; const typeFilter = document.getElementById('type-filter').value;
if (currentFilter === 'active') return allJobs.filter(j => j.status === 'active' || j.status === 'waiting'); let jobs = typeFilter ? allJobs.filter(j => j.type === typeFilter) : allJobs;
if (currentFilter === 'completed') return allJobs.filter(j => j.status === 'completed'); if (currentFilter === 'active') return jobs.filter(j => j.status === 'active' || j.status === 'waiting');
if (currentFilter === 'failed') return allJobs.filter(j => j.status === 'failed'); if (currentFilter === 'completed') return jobs.filter(j => j.status === 'completed');
return allJobs; if (currentFilter === 'failed') return jobs.filter(j => j.status === 'failed');
return jobs;
} }
function renderJobs() { function renderJobs() {
@ -706,7 +722,7 @@ async function killJob(jobId, ev) {
if (!confirm('Remove this job from the queue? If a worker is still processing it, the run is abandoned.')) return; if (!confirm('Remove this job from the queue? If a worker is still processing it, the run is abandoned.')) return;
try { try {
const r = await fetch('/api/v1/jobs/' + encodeURIComponent(jobId), { method: 'DELETE', credentials: 'include' }); const r = await fetch('/api/v1/jobs/' + encodeURIComponent(jobId), { method: 'DELETE', credentials: 'include' });
if (r.ok) { toast('Job removed', 'success'); loadJobs(); } if (r.ok) { toast('Job removed', 'success'); }
else { const d = await r.json().catch(()=>({})); toast('Remove failed: ' + (d.error || r.statusText), 'error'); } else { const d = await r.json().catch(()=>({})); toast('Remove failed: ' + (d.error || r.statusText), 'error'); }
} catch (err) { } catch (err) {
toast('Remove failed: ' + err.message, 'error'); toast('Remove failed: ' + err.message, 'error');
@ -718,7 +734,6 @@ async function retryJob(assetId, ev) {
try { try {
await api('/assets/' + encodeURIComponent(assetId) + '/retry', { method: 'POST' }); await api('/assets/' + encodeURIComponent(assetId) + '/retry', { method: 'POST' });
toast('Job re-queued — processing will restart shortly.'); toast('Job re-queued — processing will restart shortly.');
loadJobs();
} catch (e) { } catch (e) {
showError('Retry failed: ' + e.message); showError('Retry failed: ' + e.message);
} }
@ -745,24 +760,6 @@ function setFilter(filter, btn) {
renderJobs(); renderJobs();
} }
/* ────────────────────────────────────────────────────────
Auto-refresh
──────────────────────────────────────────────────────── */
function scheduleRefresh() {
clearTimeout(refreshTimer);
const dot = document.getElementById('refresh-dot');
const label = document.getElementById('refresh-label');
if (activeCount > 0) {
dot.classList.add('live');
label.textContent = 'Live';
refreshTimer = setTimeout(loadJobs, 5000);
} else {
dot.classList.remove('live');
label.textContent = 'Idle';
}
}
/* ──────────────────────────────────────────────────────── /* ────────────────────────────────────────────────────────
Detail panel Detail panel
──────────────────────────────────────────────────────── */ ──────────────────────────────────────────────────────── */
@ -841,8 +838,6 @@ function closeDetail() {
document.getElementById('detail-overlay').classList.remove('open'); document.getElementById('detail-overlay').classList.remove('open');
} }
/* ── formatFileSize + formatDuration from common utils ── */
/* ──────────────────────────────────────────────────────── /* ────────────────────────────────────────────────────────
Clear completed Clear completed
──────────────────────────────────────────────────────── */ ──────────────────────────────────────────────────────── */
@ -853,7 +848,6 @@ async function clearCompleted() {
try { try {
await Promise.all(completed.map(j => api(`/jobs/${j.id}`, { method: 'DELETE' }).catch(() => {}))); await Promise.all(completed.map(j => api(`/jobs/${j.id}`, { method: 'DELETE' }).catch(() => {})));
toast(`Cleared ${completed.length} completed job${completed.length === 1 ? '' : 's'}.`); toast(`Cleared ${completed.length} completed job${completed.length === 1 ? '' : 's'}.`);
loadJobs();
} catch (e) { } catch (e) {
showError('Failed to clear jobs: ' + e.message); showError('Failed to clear jobs: ' + e.message);
} }
@ -900,7 +894,7 @@ function showError(msg) { toast(msg, 'error'); }
/* ──────────────────────────────────────────────────────── /* ────────────────────────────────────────────────────────
Init Init
──────────────────────────────────────────────────────── */ ──────────────────────────────────────────────────────── */
loadJobs(); startSSE();
</script> </script>
<script src="js/topbar-strip.js?v=1"></script> <script src="js/topbar-strip.js?v=1"></script>
<script src="js/auth-guard.js"></script> <script src="js/auth-guard.js"></script>