diff --git a/services/mam-api/src/routes/recorders.js b/services/mam-api/src/routes/recorders.js index 017ac83..19e5868 100644 --- a/services/mam-api/src/routes/recorders.js +++ b/services/mam-api/src/routes/recorders.js @@ -1,16 +1,30 @@ import express from 'express'; import http from 'http'; import fs from 'fs'; +import { createReadStream, existsSync } from 'fs'; +import { stat } from 'fs/promises'; import net from 'net'; import dgram from 'dgram'; import pool from '../db/pool.js'; -import { getS3Bucket } from '../s3/client.js'; +import { s3Client, getS3Bucket } from '../s3/client.js'; +import { Upload } from '@aws-sdk/lib-storage'; import { validateUuid } from '../middleware/errors.js'; import { assertProjectAccess, accessibleProjectIds } from '../auth/authz.js'; import { v4 as uuidv4 } from 'uuid'; +import { Queue } from 'bullmq'; const router = express.Router(); +// BullMQ proxy queue — used by the growing-file stop handler to queue proxy +// jobs when the capture container's finalize call races with the S3 upload. +const parseRedisUrl = (url) => { + const parsed = new URL(url); + return { host: parsed.hostname, port: parseInt(parsed.port, 10) || 6379 }; +}; +const proxyQueue = new Queue('proxy', { + connection: parseRedisUrl(process.env.REDIS_URL || 'redis://queue:6379'), +}); + // Every /:id recorder route is scoped to the recorder's project. The param // handler validates the UUID, resolves the owning project_id, and asserts the // 'view' baseline; mutating routes escalate to 'edit' via requireRecorderEdit. @@ -692,6 +706,28 @@ router.post('/:id/stop', requireRecorderEdit, async (req, res, next) => { } } + // ── Growing-files S3 promotion ──────────────────────────────────────────── + // When growing_enabled=true the capture container writes the master file to + // /growing/{projectId}/{clipName}.{ext} (a host bind-mount that the mam-api + // container also has at /growing). The capture container's graceful-shutdown + // handler (triggered by the Docker stop above) calls POST /assets/:id/finalize + // with the expected S3 key, which queues the proxy job — but the file was + // never uploaded to S3, so the proxy worker fails with "unable to open file". + // + // Fix: after the container has exited (ffmpeg is done flushing), upload the + // growing file to the canonical S3 key from here. This is synchronous and + // completes before the HTTP response reaches the client, so the already-queued + // proxy job will find a valid S3 object when the worker dequeues it. + // + // Only applies to LOCAL recorders — remote recorders write to a different + // node's /growing mount which this process cannot access. + if (!isRemote && recorder.growing_enabled === true && recorder.current_session_id) { + await promoteGrowingFileToS3(recorder).catch(err => { + // Non-fatal — log and continue so the stop always succeeds. + console.error('[recorders/stop] growing-file promotion failed (non-fatal):', err.message); + }); + } + const updateResult = await pool.query( `UPDATE recorders SET container_id = NULL, status = $1, updated_at = NOW() @@ -706,6 +742,109 @@ router.post('/:id/stop', requireRecorderEdit, async (req, res, next) => { } }); +/** + * Upload a completed growing-file master from /growing to S3 so the proxy + * worker can find it at the expected original_s3_key. + * + * The capture container writes to: + * /growing/{projectId}/{clipName}.{ext} + * + * The canonical S3 key (set on the asset row at recording start) is: + * projects/{projectId}/masters/{clipName}.{ext} + * + * We look up the live/processing asset to derive both paths, do a multipart + * upload, update the asset's original_s3_key and file_size to match what we + * actually uploaded, then ensure a proxy job exists for it. + */ +async function promoteGrowingFileToS3(recorder) { + const clipName = recorder.current_session_id; + const container = recorder.recording_container || 'mov'; + + // Find the asset that was pre-created at recording start. It could be in + // 'live' (finalize hasn't fired yet) or 'processing' (finalize already ran + // from the container's SIGTERM handler). We need both its id and its + // project_id to reconstruct the growing path. + const assetRes = await pool.query( + `SELECT id, project_id, status, original_s3_key + FROM assets + WHERE display_name = $1 + AND status IN ('live', 'processing', 'error') + ORDER BY created_at DESC + LIMIT 1`, + [clipName] + ); + + if (assetRes.rows.length === 0) { + console.warn(`[recorders/stop] no asset found for clip "${clipName}" — skipping growing-file promotion`); + return; + } + + const asset = assetRes.rows[0]; + const projectId = asset.project_id; + const growingDir = process.env.GROWING_DIR || '/growing'; + const localPath = `${growingDir}/${projectId}/${clipName}.${container}`; + const s3Key = `projects/${projectId}/masters/${clipName}.${container}`; + + if (!existsSync(localPath)) { + console.warn(`[recorders/stop] growing file not found at ${localPath} — nothing to promote (empty recording?)`); + return; + } + + const fileStat = await stat(localPath); + if (fileStat.size === 0) { + console.warn(`[recorders/stop] growing file at ${localPath} is empty — skipping promotion`); + return; + } + + console.log(`[recorders/stop] promoting growing file ${localPath} (${fileStat.size} bytes) → s3://${getS3Bucket()}/${s3Key}`); + + const upload = new Upload({ + client: s3Client, + params: { + Bucket: getS3Bucket(), + Key: s3Key, + Body: createReadStream(localPath), + }, + queueSize: 4, + partSize: 8 * 1024 * 1024, + }); + await upload.done(); + + console.log(`[recorders/stop] S3 upload complete for ${s3Key}`); + + // Ensure the asset row reflects the correct S3 key and file size. The + // capture container's finalize call may have already set original_s3_key to + // this same value (it was pre-set at start), but update file_size which + // finalize doesn't touch. + await pool.query( + `UPDATE assets + SET original_s3_key = $1, + file_size = $2, + updated_at = NOW() + WHERE id = $3`, + [s3Key, fileStat.size, asset.id] + ); + + // If the asset is still 'live' (capture container's finalize hasn't fired or + // failed), flip it to 'processing' and queue the proxy job ourselves so the + // clip doesn't get stuck in the library as "Recording…". + if (asset.status === 'live') { + console.log(`[recorders/stop] finalize not yet called — queueing proxy and flipping asset ${asset.id} to processing`); + await pool.query( + `UPDATE assets SET status = 'processing', updated_at = NOW() WHERE id = $1`, + [asset.id] + ); + await proxyQueue.add('generate', { + assetId: asset.id, + inputKey: s3Key, + outputKey: `proxies/${asset.id}.mp4`, + }); + } + // If status is already 'processing', the capture container's finalize already + // ran and queued the proxy job. The S3 upload we just did ensures the worker + // will find a valid object when it dequeues that job — nothing else to do. +} + // GET /:id/status - Get live status router.get('/:id/status', async (req, res, next) => { try {