HLS VOD playback for browser (supplements MP4 proxy) #170

Open
zgaetano wants to merge 7 commits from feat/hls-vod-playback into main
Showing only changes of commit 1ca295d799 - Show all commits

View file

@ -1,16 +1,30 @@
import express from 'express';
import http from 'http';
import fs from 'fs';
import { createReadStream, existsSync } from 'fs';
import { stat } from 'fs/promises';
import net from 'net';
import dgram from 'dgram';
import pool from '../db/pool.js';
import { getS3Bucket } from '../s3/client.js';
import { s3Client, getS3Bucket } from '../s3/client.js';
import { Upload } from '@aws-sdk/lib-storage';
import { validateUuid } from '../middleware/errors.js';
import { assertProjectAccess, accessibleProjectIds } from '../auth/authz.js';
import { v4 as uuidv4 } from 'uuid';
import { Queue } from 'bullmq';
const router = express.Router();
// BullMQ proxy queue — used by the growing-file stop handler to queue proxy
// jobs when the capture container's finalize call races with the S3 upload.
const parseRedisUrl = (url) => {
const parsed = new URL(url);
return { host: parsed.hostname, port: parseInt(parsed.port, 10) || 6379 };
};
const proxyQueue = new Queue('proxy', {
connection: parseRedisUrl(process.env.REDIS_URL || 'redis://queue:6379'),
});
// Every /:id recorder route is scoped to the recorder's project. The param
// handler validates the UUID, resolves the owning project_id, and asserts the
// 'view' baseline; mutating routes escalate to 'edit' via requireRecorderEdit.
@ -692,6 +706,28 @@ router.post('/:id/stop', requireRecorderEdit, async (req, res, next) => {
}
}
// ── Growing-files S3 promotion ────────────────────────────────────────────
// When growing_enabled=true the capture container writes the master file to
// /growing/{projectId}/{clipName}.{ext} (a host bind-mount that the mam-api
// container also has at /growing). The capture container's graceful-shutdown
// handler (triggered by the Docker stop above) calls POST /assets/:id/finalize
// with the expected S3 key, which queues the proxy job — but the file was
// never uploaded to S3, so the proxy worker fails with "unable to open file".
//
// Fix: after the container has exited (ffmpeg is done flushing), upload the
// growing file to the canonical S3 key from here. This is synchronous and
// completes before the HTTP response reaches the client, so the already-queued
// proxy job will find a valid S3 object when the worker dequeues it.
//
// Only applies to LOCAL recorders — remote recorders write to a different
// node's /growing mount which this process cannot access.
if (!isRemote && recorder.growing_enabled === true && recorder.current_session_id) {
await promoteGrowingFileToS3(recorder).catch(err => {
// Non-fatal — log and continue so the stop always succeeds.
console.error('[recorders/stop] growing-file promotion failed (non-fatal):', err.message);
});
}
const updateResult = await pool.query(
`UPDATE recorders
SET container_id = NULL, status = $1, updated_at = NOW()
@ -706,6 +742,109 @@ router.post('/:id/stop', requireRecorderEdit, async (req, res, next) => {
}
});
/**
* Upload a completed growing-file master from /growing to S3 so the proxy
* worker can find it at the expected original_s3_key.
*
* The capture container writes to:
* /growing/{projectId}/{clipName}.{ext}
*
* The canonical S3 key (set on the asset row at recording start) is:
* projects/{projectId}/masters/{clipName}.{ext}
*
* We look up the live/processing asset to derive both paths, do a multipart
* upload, update the asset's original_s3_key and file_size to match what we
* actually uploaded, then ensure a proxy job exists for it.
*/
async function promoteGrowingFileToS3(recorder) {
const clipName = recorder.current_session_id;
const container = recorder.recording_container || 'mov';
// Find the asset that was pre-created at recording start. It could be in
// 'live' (finalize hasn't fired yet) or 'processing' (finalize already ran
// from the container's SIGTERM handler). We need both its id and its
// project_id to reconstruct the growing path.
const assetRes = await pool.query(
`SELECT id, project_id, status, original_s3_key
FROM assets
WHERE display_name = $1
AND status IN ('live', 'processing', 'error')
ORDER BY created_at DESC
LIMIT 1`,
[clipName]
);
if (assetRes.rows.length === 0) {
console.warn(`[recorders/stop] no asset found for clip "${clipName}" — skipping growing-file promotion`);
return;
}
const asset = assetRes.rows[0];
const projectId = asset.project_id;
const growingDir = process.env.GROWING_DIR || '/growing';
const localPath = `${growingDir}/${projectId}/${clipName}.${container}`;
const s3Key = `projects/${projectId}/masters/${clipName}.${container}`;
if (!existsSync(localPath)) {
console.warn(`[recorders/stop] growing file not found at ${localPath} — nothing to promote (empty recording?)`);
return;
}
const fileStat = await stat(localPath);
if (fileStat.size === 0) {
console.warn(`[recorders/stop] growing file at ${localPath} is empty — skipping promotion`);
return;
}
console.log(`[recorders/stop] promoting growing file ${localPath} (${fileStat.size} bytes) → s3://${getS3Bucket()}/${s3Key}`);
const upload = new Upload({
client: s3Client,
params: {
Bucket: getS3Bucket(),
Key: s3Key,
Body: createReadStream(localPath),
},
queueSize: 4,
partSize: 8 * 1024 * 1024,
});
await upload.done();
console.log(`[recorders/stop] S3 upload complete for ${s3Key}`);
// Ensure the asset row reflects the correct S3 key and file size. The
// capture container's finalize call may have already set original_s3_key to
// this same value (it was pre-set at start), but update file_size which
// finalize doesn't touch.
await pool.query(
`UPDATE assets
SET original_s3_key = $1,
file_size = $2,
updated_at = NOW()
WHERE id = $3`,
[s3Key, fileStat.size, asset.id]
);
// If the asset is still 'live' (capture container's finalize hasn't fired or
// failed), flip it to 'processing' and queue the proxy job ourselves so the
// clip doesn't get stuck in the library as "Recording…".
if (asset.status === 'live') {
console.log(`[recorders/stop] finalize not yet called — queueing proxy and flipping asset ${asset.id} to processing`);
await pool.query(
`UPDATE assets SET status = 'processing', updated_at = NOW() WHERE id = $1`,
[asset.id]
);
await proxyQueue.add('generate', {
assetId: asset.id,
inputKey: s3Key,
outputKey: `proxies/${asset.id}.mp4`,
});
}
// If status is already 'processing', the capture container's finalize already
// ran and queued the proxy job. The S3 upload we just did ensures the worker
// will find a valid object when it dequeues that job — nothing else to do.
}
// GET /:id/status - Get live status
router.get('/:id/status', async (req, res, next) => {
try {