From a86c1c72f9010e89a4f077ac2018a15b0e7a5080 Mon Sep 17 00:00:00 2001 From: opencode Date: Wed, 27 May 2026 02:38:42 +0000 Subject: [PATCH] fix(player): stitch S3 ranges around RustFS empty-body bug (#143) RustFS returns empty bodies for ranged GETs whose start offset is past ~5.9 MB on single-file proxy MP4s. HEAD reports correct size, full GET (`bytes=0-`) works, but `bytes=8179166-` comes back 206 + correct Content-Range header with zero bytes. Confirmed via direct S3 probe against broadcastmgmt.cloud/dragonmam (see scratch tests). Workaround in mam-api `GET /api/v1/assets/:id/video` until the proxy worker emits HLS (planned v1.2.1): - HEAD the object first to learn total size (also gives ETag / Last-Modified for conditional requests). - No-Range / unparseable-Range / pre-EOF requests \u2192 plain pipe. - Parsed `bytes=N-M` requests below RUSTFS_RANGE_SAFE_START (default 5_500_000) \u2192 direct ranged GET, RustFS handles fine. - Anything reaching into the broken zone \u2192 stream from offset 0, drop bytes below start, stop at end. Memory stays flat; extra bandwidth = (end+1 - requested-size) per seek. - Genuinely out-of-range \u2192 416 with Cache-Control: no-store so the browser doesn't poison its cache. Also stashes (not yet wired up) the HLS pieces we'll need for the follow-up: `segmentToHls` ffmpeg helper + `uploadDirectoryToS3` worker s3 helper. Harmless additions; not referenced by any code path yet. Confirmed against the affected asset (a72aaa03-...): bytes=0-100k + 50% +100k native pass-through; 70% +100k and near-EOF previously hung the browser, now stream correctly via the stitched path. Refs #143. --- services/mam-api/src/routes/assets.js | 214 +++++++++++++++++-------- services/worker/src/ffmpeg/executor.js | 28 ++++ services/worker/src/s3/client.js | 33 ++++ 3 files changed, 209 insertions(+), 66 deletions(-) diff --git a/services/mam-api/src/routes/assets.js b/services/mam-api/src/routes/assets.js index 6532a8f..e4d2fd2 100644 --- a/services/mam-api/src/routes/assets.js +++ b/services/mam-api/src/routes/assets.js @@ -575,6 +575,66 @@ router.get('/:id/live-path', async (req, res, next) => { // - ETag + Last-Modified for conditional requests (304 on repeat visits) // - Cache-Control: private, max-age=3600 so the browser caches segments // and doesn't re-fetch them on every seek within a session +// Issue #143 — RustFS returns empty bodies for ranged GETs whose start offset +// is past ~5.9 MB on single-file proxy MP4s. Confirmed via direct S3 probe: +// HEAD reports correct size, full GET (`bytes=0-`) works perfectly, but +// `bytes=8179166-` returns 206 + the right Content-Range header and a zero- +// byte body. A streaming GET from 0 reads cleanly *through* the broken zone. +// +// Workaround until the proxy worker emits HLS (planned v1.2.1): stream the +// proxy from offset 0, skip bytes the client didn't ask for, stop after the +// requested end. Browser sees a normal 206 + Content-Range. Mem stays flat; +// extra RustFS-to-mam-api bandwidth = (end+1 - actual-range) per seek. +// +// Small head-of-file ranges below RUSTFS_RANGE_SAFE_START are handled by a +// direct ranged GET — saves the streaming-from-0 cost on the common case of +// initial moov + first-segment fetch. + +async function* stitchedS3Stream(key, startByte, endByte) { + // Yields buffers covering exactly [startByte, endByte] inclusive. + // + // RustFS only mis-serves a ranged GET when the *start* offset of the + // request is past ~5.8 MB. So we pull the object in 4 MB windows whose + // START offsets always stay below the broken threshold: + // - We anchor every chunk's start at a multiple of RUSTFS_SAFE_CHUNK + // (0, 4 MB, 8 MB, …). + // - Wait — that puts later starts past the threshold. + // Instead: skip directly to the chunk containing `startByte`, but request + // it as `bytes=anchorStart-end` where anchorStart < threshold. Since the + // bug only bites when the *request start* offset is large, we never issue + // a single GET whose Range start is past the broken zone — we instead + // exploit that a low-offset GET that *continues past* the threshold reads + // cleanly (confirmed by the bytes=0- full-GET probe). + // + // Practically: one GET from 0 that streams up through endByte, dropping + // the bytes below startByte as they arrive. Memory stays flat; we pay + // (endByte+1) bytes of RustFS-to-mam-api bandwidth per request. + const res = await s3Client.send(new GetObjectCommand({ + Bucket: getS3Bucket(), + Key: key, + Range: `bytes=0-${endByte}`, + })); + + let consumed = 0; // bytes seen so far from S3 + let totalEmitted = 0; + for await (const buf of res.Body) { + const bufStart = consumed; // file offset of buf[0] + const bufEnd = consumed + buf.length - 1; + consumed += buf.length; + if (bufEnd < startByte) continue; // entirely before window + const sliceFrom = Math.max(0, startByte - bufStart); + const sliceTo = Math.min(buf.length, endByte - bufStart + 1); + if (sliceTo > sliceFrom) { + yield buf.subarray(sliceFrom, sliceTo); + totalEmitted += sliceTo - sliceFrom; + } + if (bufEnd >= endByte) break; + } + if (totalEmitted === 0) { + throw new Error(`RustFS returned empty body for ${key} bytes=0-${endByte}`); + } +} + router.get('/:id/video', async (req, res, next) => { try { const { id } = req.params; @@ -586,85 +646,107 @@ router.get('/:id/video', async (req, res, next) => { const key = a.proxy_s3_key || (origIsVideo ? a.original_s3_key : null); if (!key) return res.status(404).json({ error: 'No browser-playable source' }); - // Issue #143 — seeking to the very end of a clip stalled the player. - // Two contributing causes: - // 1) The previous 416 path re-fetched the full object with GetObject to - // learn the size (HEAD would do, and not transfer the bytes). - // 2) 416 responses inherited the same `private, max-age=3600` cache - // directive as 206, so once the browser hit "out of range" near EOF - // it stayed cached for the rest of the session and the player never - // retried. - // Now: HEAD the object first to learn the true size, clamp the client's - // Range to a valid window, and return uncached 416 only for truly - // unsatisfiable requests. + // HEAD the object to learn the true size. let totalSize = 0; + let etag, lastModified; try { const head = await s3Client.send(new HeadObjectCommand({ Bucket: getS3Bucket(), Key: key })); - totalSize = head.ContentLength || 0; + totalSize = head.ContentLength || 0; + etag = head.ETag; + lastModified = head.LastModified; } catch (_) { - // If HEAD fails we still try the GET — keeps behaviour for backends - // that disallow HEAD. + // HEAD failed — fall back to a plain GET (no range). } const rangeHeader = req.headers.range; - let clampedRange = rangeHeader; - if (rangeHeader && totalSize > 0) { - // bytes=START-END or bytes=START- (we ignore multi-range; not used by HTML5 video). - const m = /^bytes=(\d+)-(\d*)$/.exec(rangeHeader.trim()); - if (m) { - let start = parseInt(m[1], 10); - let end = m[2] === '' ? totalSize - 1 : parseInt(m[2], 10); - if (!Number.isFinite(start) || start < 0) start = 0; - if (!Number.isFinite(end) || end >= totalSize) end = totalSize - 1; - if (start >= totalSize) { - // Genuinely past EOF — return a clean, uncached 416. - res.writeHead(416, { - 'Content-Type': 'text/plain', - 'Content-Length': '0', - 'Content-Range': `bytes */${totalSize}`, - 'Accept-Ranges': 'bytes', - 'Cache-Control': 'no-store', - }); - return res.end(); - } - if (start > end) start = end; - clampedRange = `bytes=${start}-${end}`; - } + + // No Range header → stream the whole object. RustFS handles `bytes=0-` + // and "no Range" fine; this is the fast path. + if (!rangeHeader || totalSize === 0) { + const s3Res = await s3Client.send(new GetObjectCommand({ Bucket: getS3Bucket(), Key: key })); + const headers = { + 'Content-Type': 'video/mp4', + 'Accept-Ranges': 'bytes', + 'Cache-Control': 'private, max-age=3600', + }; + if (s3Res.ContentLength) headers['Content-Length'] = String(s3Res.ContentLength); + if (s3Res.ETag) headers['ETag'] = s3Res.ETag; + if (s3Res.LastModified) headers['Last-Modified'] = s3Res.LastModified.toUTCString(); + res.writeHead(200, headers); + s3Res.Body.pipe(res); + return; } - const params = { Bucket: getS3Bucket(), Key: key }; - if (clampedRange) params.Range = clampedRange; - - let s3Res; - try { - s3Res = await s3Client.send(new GetObjectCommand(params)); - } catch (err) { - // Defensive: even with clamping, some backends still throw InvalidRange. - if (err.Code === 'InvalidRange' || err.$metadata?.httpStatusCode === 416) { - res.writeHead(416, { - 'Content-Type': 'text/plain', - 'Content-Length': '0', - 'Content-Range': `bytes */${totalSize}`, - 'Accept-Ranges': 'bytes', - 'Cache-Control': 'no-store', - }); - return res.end(); - } - throw err; + // Parse `bytes=START-END` / `bytes=START-`. Ignore multi-range. + const m = /^bytes=(\d+)-(\d*)$/.exec(rangeHeader.trim()); + if (!m) { + // Unparseable Range — fall back to full body, browser will cope. + const s3Res = await s3Client.send(new GetObjectCommand({ Bucket: getS3Bucket(), Key: key })); + res.writeHead(200, { 'Content-Type': 'video/mp4', 'Accept-Ranges': 'bytes' }); + s3Res.Body.pipe(res); + return; } - const status = clampedRange ? 206 : 200; + let start = parseInt(m[1], 10); + let end = m[2] === '' ? totalSize - 1 : parseInt(m[2], 10); + if (!Number.isFinite(start) || start < 0) start = 0; + if (!Number.isFinite(end) || end >= totalSize) end = totalSize - 1; + if (start >= totalSize) { + res.writeHead(416, { + 'Content-Type': 'text/plain', + 'Content-Length': '0', + 'Content-Range': `bytes */${totalSize}`, + 'Accept-Ranges': 'bytes', + 'Cache-Control': 'no-store', + }); + return res.end(); + } + if (start > end) start = end; + + const contentLength = end - start + 1; const headers = { - 'Content-Type': 'video/mp4', - 'Accept-Ranges': 'bytes', - 'Cache-Control': 'private, max-age=3600', + 'Content-Type': 'video/mp4', + 'Accept-Ranges': 'bytes', + 'Cache-Control': 'private, max-age=3600', + 'Content-Range': `bytes ${start}-${end}/${totalSize}`, + 'Content-Length': String(contentLength), }; - if (s3Res.ContentLength) headers['Content-Length'] = String(s3Res.ContentLength); - if (s3Res.ContentRange) headers['Content-Range'] = s3Res.ContentRange; - if (s3Res.ETag) headers['ETag'] = s3Res.ETag; - if (s3Res.LastModified) headers['Last-Modified'] = s3Res.LastModified.toUTCString(); - res.writeHead(status, headers); - s3Res.Body.pipe(res); + if (etag) headers['ETag'] = etag; + if (lastModified) headers['Last-Modified'] = lastModified.toUTCString(); + + // For small head-of-file ranges (entirely below the broken threshold) + // a direct ranged GET works and saves the streaming-from-0 cost. + const RUSTFS_RANGE_SAFE_START = parseInt(process.env.RUSTFS_RANGE_SAFE_START || String(5_500_000), 10); + if (start < RUSTFS_RANGE_SAFE_START && end < RUSTFS_RANGE_SAFE_START) { + const s3Res = await s3Client.send(new GetObjectCommand({ + Bucket: getS3Bucket(), Key: key, Range: `bytes=${start}-${end}`, + })); + res.writeHead(206, headers); + s3Res.Body.pipe(res); + return; + } + + // Otherwise: stream from offset 0, drop bytes below `start`, stop at + // `end`. Browser sees a normal 206; mam-api stays memory-flat. + res.writeHead(206, headers); + try { + for await (const buf of stitchedS3Stream(key, start, end)) { + // res.write returns false when backpressure builds — pause and wait. + if (!res.write(buf)) { + await new Promise(r => res.once('drain', r)); + } + if (res.destroyed) return; + } + res.end(); + } catch (err) { + console.error(`[video] stitch failed for ${key}:`, err.message); + if (!res.headersSent) { + res.writeHead(500, { 'Content-Type': 'text/plain', 'Cache-Control': 'no-store' }); + res.end('Upstream storage error'); + } else { + res.destroy(err); + } + } } catch (err) { next(err); } }); diff --git a/services/worker/src/ffmpeg/executor.js b/services/worker/src/ffmpeg/executor.js index 8b7ea03..ace835b 100644 --- a/services/worker/src/ffmpeg/executor.js +++ b/services/worker/src/ffmpeg/executor.js @@ -174,6 +174,34 @@ export const trimSegment = async (inputPath, outputPath, inPoint, outPoint) => { await runFFmpeg(args); }; +// Segment an existing MP4/MOV into HLS (fMP4) — init.mp4 + segment_*.m4s + +// playlist.m3u8. Keeps the original codec (no re-encode) so this is cheap to +// run after the proxy transcode. fMP4 segments stay <5 MB at our proxy +// bitrate, which sidesteps RustFS's broken byte-range path on large objects. +export const segmentToHls = async (inputPath, outputDir, options = {}) => { + const { + segmentDurationSec = 4, + playlistName = 'playlist.m3u8', + initName = 'init.mp4', + segmentPattern = 'segment_%05d.m4s', + } = options; + + const args = [ + '-i', inputPath, + '-c', 'copy', + '-f', 'hls', + '-hls_time', String(segmentDurationSec), + '-hls_playlist_type', 'vod', + '-hls_segment_type', 'fmp4', + '-hls_flags', 'independent_segments', + '-hls_fmp4_init_filename', initName, + '-hls_segment_filename', `${outputDir}/${segmentPattern}`, + '-y', + `${outputDir}/${playlistName}`, + ]; + await runFFmpeg(args); +}; + export const concatSegments = async (segmentListFile, outputPath) => { const args = [ '-f', 'concat', diff --git a/services/worker/src/s3/client.js b/services/worker/src/s3/client.js index c537716..ece50c8 100644 --- a/services/worker/src/s3/client.js +++ b/services/worker/src/s3/client.js @@ -1,7 +1,15 @@ import { S3Client, GetObjectCommand, PutObjectCommand } from '@aws-sdk/client-s3'; import { createReadStream, createWriteStream } from 'fs'; +import { readdir } from 'fs/promises'; +import { join, extname } from 'path'; import { pipeline } from 'stream/promises'; +const CONTENT_TYPES = { + '.m3u8': 'application/vnd.apple.mpegurl', + '.m4s': 'video/iso.segment', + '.mp4': 'video/mp4', +}; + const createS3Client = () => { return new S3Client({ region: process.env.S3_REGION || 'us-east-1', @@ -44,6 +52,31 @@ export const uploadToS3 = async (bucket, key, localPath) => { } }; +// Upload every file in `localDir` to `bucket` under `keyPrefix/`. Used for the +// HLS proxy output (init.mp4 + segment_*.m4s + playlist.m3u8). Each file goes +// up as its own PutObject so individual segments stay small and never trigger +// RustFS's broken byte-range path on large objects. +export const uploadDirectoryToS3 = async (bucket, keyPrefix, localDir) => { + const client = createS3Client(); + try { + const entries = await readdir(localDir, { withFileTypes: true }); + const files = entries.filter(e => e.isFile()).map(e => e.name); + for (const name of files) { + const ext = extname(name).toLowerCase(); + const ct = CONTENT_TYPES[ext] || 'application/octet-stream'; + await client.send(new PutObjectCommand({ + Bucket: bucket, + Key: `${keyPrefix}/${name}`, + Body: createReadStream(join(localDir, name)), + ContentType: ct, + })); + } + return files; + } finally { + client.destroy(); + } +}; + // Multipart-aware streaming upload — used by the promotion worker to push // large growing-file masters without buffering them entirely in memory. export const uploadStreamToS3 = async (bucket, key, readable) => {