fix(player): stitch S3 ranges around RustFS empty-body bug (#143)
RustFS returns empty bodies for ranged GETs whose start offset is past
~5.9 MB on single-file proxy MP4s. HEAD reports correct size, full GET
(`bytes=0-`) works, but `bytes=8179166-` comes back 206 + correct
Content-Range header with zero bytes. Confirmed via direct S3 probe
against broadcastmgmt.cloud/dragonmam (see scratch tests).
Workaround in mam-api `GET /api/v1/assets/:id/video` until the proxy
worker emits HLS (planned v1.2.1):
- HEAD the object first to learn total size (also gives ETag /
Last-Modified for conditional requests).
- No-Range / unparseable-Range / pre-EOF requests \u2192 plain pipe.
- Parsed `bytes=N-M` requests below RUSTFS_RANGE_SAFE_START
(default 5_500_000) \u2192 direct ranged GET, RustFS handles fine.
- Anything reaching into the broken zone \u2192 stream from offset 0,
drop bytes below start, stop at end. Memory stays flat; extra
bandwidth = (end+1 - requested-size) per seek.
- Genuinely out-of-range \u2192 416 with Cache-Control: no-store so the
browser doesn't poison its cache.
Also stashes (not yet wired up) the HLS pieces we'll need for the
follow-up: `segmentToHls` ffmpeg helper + `uploadDirectoryToS3`
worker s3 helper. Harmless additions; not referenced by any code path
yet.
Confirmed against the affected asset (a72aaa03-...): bytes=0-100k +
50% +100k native pass-through; 70% +100k and near-EOF previously hung
the browser, now stream correctly via the stitched path.
Refs #143.
This commit is contained in:
parent
04ce096e67
commit
a86c1c72f9
3 changed files with 209 additions and 66 deletions
|
|
@ -575,6 +575,66 @@ router.get('/:id/live-path', async (req, res, next) => {
|
||||||
// - ETag + Last-Modified for conditional requests (304 on repeat visits)
|
// - ETag + Last-Modified for conditional requests (304 on repeat visits)
|
||||||
// - Cache-Control: private, max-age=3600 so the browser caches segments
|
// - Cache-Control: private, max-age=3600 so the browser caches segments
|
||||||
// and doesn't re-fetch them on every seek within a session
|
// and doesn't re-fetch them on every seek within a session
|
||||||
|
// Issue #143 — RustFS returns empty bodies for ranged GETs whose start offset
|
||||||
|
// is past ~5.9 MB on single-file proxy MP4s. Confirmed via direct S3 probe:
|
||||||
|
// HEAD reports correct size, full GET (`bytes=0-`) works perfectly, but
|
||||||
|
// `bytes=8179166-` returns 206 + the right Content-Range header and a zero-
|
||||||
|
// byte body. A streaming GET from 0 reads cleanly *through* the broken zone.
|
||||||
|
//
|
||||||
|
// Workaround until the proxy worker emits HLS (planned v1.2.1): stream the
|
||||||
|
// proxy from offset 0, skip bytes the client didn't ask for, stop after the
|
||||||
|
// requested end. Browser sees a normal 206 + Content-Range. Mem stays flat;
|
||||||
|
// extra RustFS-to-mam-api bandwidth = (end+1 - actual-range) per seek.
|
||||||
|
//
|
||||||
|
// Small head-of-file ranges below RUSTFS_RANGE_SAFE_START are handled by a
|
||||||
|
// direct ranged GET — saves the streaming-from-0 cost on the common case of
|
||||||
|
// initial moov + first-segment fetch.
|
||||||
|
|
||||||
|
async function* stitchedS3Stream(key, startByte, endByte) {
|
||||||
|
// Yields buffers covering exactly [startByte, endByte] inclusive.
|
||||||
|
//
|
||||||
|
// RustFS only mis-serves a ranged GET when the *start* offset of the
|
||||||
|
// request is past ~5.8 MB. So we pull the object in 4 MB windows whose
|
||||||
|
// START offsets always stay below the broken threshold:
|
||||||
|
// - We anchor every chunk's start at a multiple of RUSTFS_SAFE_CHUNK
|
||||||
|
// (0, 4 MB, 8 MB, …).
|
||||||
|
// - Wait — that puts later starts past the threshold.
|
||||||
|
// Instead: skip directly to the chunk containing `startByte`, but request
|
||||||
|
// it as `bytes=anchorStart-end` where anchorStart < threshold. Since the
|
||||||
|
// bug only bites when the *request start* offset is large, we never issue
|
||||||
|
// a single GET whose Range start is past the broken zone — we instead
|
||||||
|
// exploit that a low-offset GET that *continues past* the threshold reads
|
||||||
|
// cleanly (confirmed by the bytes=0- full-GET probe).
|
||||||
|
//
|
||||||
|
// Practically: one GET from 0 that streams up through endByte, dropping
|
||||||
|
// the bytes below startByte as they arrive. Memory stays flat; we pay
|
||||||
|
// (endByte+1) bytes of RustFS-to-mam-api bandwidth per request.
|
||||||
|
const res = await s3Client.send(new GetObjectCommand({
|
||||||
|
Bucket: getS3Bucket(),
|
||||||
|
Key: key,
|
||||||
|
Range: `bytes=0-${endByte}`,
|
||||||
|
}));
|
||||||
|
|
||||||
|
let consumed = 0; // bytes seen so far from S3
|
||||||
|
let totalEmitted = 0;
|
||||||
|
for await (const buf of res.Body) {
|
||||||
|
const bufStart = consumed; // file offset of buf[0]
|
||||||
|
const bufEnd = consumed + buf.length - 1;
|
||||||
|
consumed += buf.length;
|
||||||
|
if (bufEnd < startByte) continue; // entirely before window
|
||||||
|
const sliceFrom = Math.max(0, startByte - bufStart);
|
||||||
|
const sliceTo = Math.min(buf.length, endByte - bufStart + 1);
|
||||||
|
if (sliceTo > sliceFrom) {
|
||||||
|
yield buf.subarray(sliceFrom, sliceTo);
|
||||||
|
totalEmitted += sliceTo - sliceFrom;
|
||||||
|
}
|
||||||
|
if (bufEnd >= endByte) break;
|
||||||
|
}
|
||||||
|
if (totalEmitted === 0) {
|
||||||
|
throw new Error(`RustFS returned empty body for ${key} bytes=0-${endByte}`);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
router.get('/:id/video', async (req, res, next) => {
|
router.get('/:id/video', async (req, res, next) => {
|
||||||
try {
|
try {
|
||||||
const { id } = req.params;
|
const { id } = req.params;
|
||||||
|
|
@ -586,38 +646,52 @@ router.get('/:id/video', async (req, res, next) => {
|
||||||
const key = a.proxy_s3_key || (origIsVideo ? a.original_s3_key : null);
|
const key = a.proxy_s3_key || (origIsVideo ? a.original_s3_key : null);
|
||||||
if (!key) return res.status(404).json({ error: 'No browser-playable source' });
|
if (!key) return res.status(404).json({ error: 'No browser-playable source' });
|
||||||
|
|
||||||
// Issue #143 — seeking to the very end of a clip stalled the player.
|
// HEAD the object to learn the true size.
|
||||||
// Two contributing causes:
|
|
||||||
// 1) The previous 416 path re-fetched the full object with GetObject to
|
|
||||||
// learn the size (HEAD would do, and not transfer the bytes).
|
|
||||||
// 2) 416 responses inherited the same `private, max-age=3600` cache
|
|
||||||
// directive as 206, so once the browser hit "out of range" near EOF
|
|
||||||
// it stayed cached for the rest of the session and the player never
|
|
||||||
// retried.
|
|
||||||
// Now: HEAD the object first to learn the true size, clamp the client's
|
|
||||||
// Range to a valid window, and return uncached 416 only for truly
|
|
||||||
// unsatisfiable requests.
|
|
||||||
let totalSize = 0;
|
let totalSize = 0;
|
||||||
|
let etag, lastModified;
|
||||||
try {
|
try {
|
||||||
const head = await s3Client.send(new HeadObjectCommand({ Bucket: getS3Bucket(), Key: key }));
|
const head = await s3Client.send(new HeadObjectCommand({ Bucket: getS3Bucket(), Key: key }));
|
||||||
totalSize = head.ContentLength || 0;
|
totalSize = head.ContentLength || 0;
|
||||||
|
etag = head.ETag;
|
||||||
|
lastModified = head.LastModified;
|
||||||
} catch (_) {
|
} catch (_) {
|
||||||
// If HEAD fails we still try the GET — keeps behaviour for backends
|
// HEAD failed — fall back to a plain GET (no range).
|
||||||
// that disallow HEAD.
|
|
||||||
}
|
}
|
||||||
|
|
||||||
const rangeHeader = req.headers.range;
|
const rangeHeader = req.headers.range;
|
||||||
let clampedRange = rangeHeader;
|
|
||||||
if (rangeHeader && totalSize > 0) {
|
// No Range header → stream the whole object. RustFS handles `bytes=0-`
|
||||||
// bytes=START-END or bytes=START- (we ignore multi-range; not used by HTML5 video).
|
// and "no Range" fine; this is the fast path.
|
||||||
|
if (!rangeHeader || totalSize === 0) {
|
||||||
|
const s3Res = await s3Client.send(new GetObjectCommand({ Bucket: getS3Bucket(), Key: key }));
|
||||||
|
const headers = {
|
||||||
|
'Content-Type': 'video/mp4',
|
||||||
|
'Accept-Ranges': 'bytes',
|
||||||
|
'Cache-Control': 'private, max-age=3600',
|
||||||
|
};
|
||||||
|
if (s3Res.ContentLength) headers['Content-Length'] = String(s3Res.ContentLength);
|
||||||
|
if (s3Res.ETag) headers['ETag'] = s3Res.ETag;
|
||||||
|
if (s3Res.LastModified) headers['Last-Modified'] = s3Res.LastModified.toUTCString();
|
||||||
|
res.writeHead(200, headers);
|
||||||
|
s3Res.Body.pipe(res);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Parse `bytes=START-END` / `bytes=START-`. Ignore multi-range.
|
||||||
const m = /^bytes=(\d+)-(\d*)$/.exec(rangeHeader.trim());
|
const m = /^bytes=(\d+)-(\d*)$/.exec(rangeHeader.trim());
|
||||||
if (m) {
|
if (!m) {
|
||||||
|
// Unparseable Range — fall back to full body, browser will cope.
|
||||||
|
const s3Res = await s3Client.send(new GetObjectCommand({ Bucket: getS3Bucket(), Key: key }));
|
||||||
|
res.writeHead(200, { 'Content-Type': 'video/mp4', 'Accept-Ranges': 'bytes' });
|
||||||
|
s3Res.Body.pipe(res);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
let start = parseInt(m[1], 10);
|
let start = parseInt(m[1], 10);
|
||||||
let end = m[2] === '' ? totalSize - 1 : parseInt(m[2], 10);
|
let end = m[2] === '' ? totalSize - 1 : parseInt(m[2], 10);
|
||||||
if (!Number.isFinite(start) || start < 0) start = 0;
|
if (!Number.isFinite(start) || start < 0) start = 0;
|
||||||
if (!Number.isFinite(end) || end >= totalSize) end = totalSize - 1;
|
if (!Number.isFinite(end) || end >= totalSize) end = totalSize - 1;
|
||||||
if (start >= totalSize) {
|
if (start >= totalSize) {
|
||||||
// Genuinely past EOF — return a clean, uncached 416.
|
|
||||||
res.writeHead(416, {
|
res.writeHead(416, {
|
||||||
'Content-Type': 'text/plain',
|
'Content-Type': 'text/plain',
|
||||||
'Content-Length': '0',
|
'Content-Length': '0',
|
||||||
|
|
@ -628,43 +702,51 @@ router.get('/:id/video', async (req, res, next) => {
|
||||||
return res.end();
|
return res.end();
|
||||||
}
|
}
|
||||||
if (start > end) start = end;
|
if (start > end) start = end;
|
||||||
clampedRange = `bytes=${start}-${end}`;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
const params = { Bucket: getS3Bucket(), Key: key };
|
const contentLength = end - start + 1;
|
||||||
if (clampedRange) params.Range = clampedRange;
|
|
||||||
|
|
||||||
let s3Res;
|
|
||||||
try {
|
|
||||||
s3Res = await s3Client.send(new GetObjectCommand(params));
|
|
||||||
} catch (err) {
|
|
||||||
// Defensive: even with clamping, some backends still throw InvalidRange.
|
|
||||||
if (err.Code === 'InvalidRange' || err.$metadata?.httpStatusCode === 416) {
|
|
||||||
res.writeHead(416, {
|
|
||||||
'Content-Type': 'text/plain',
|
|
||||||
'Content-Length': '0',
|
|
||||||
'Content-Range': `bytes */${totalSize}`,
|
|
||||||
'Accept-Ranges': 'bytes',
|
|
||||||
'Cache-Control': 'no-store',
|
|
||||||
});
|
|
||||||
return res.end();
|
|
||||||
}
|
|
||||||
throw err;
|
|
||||||
}
|
|
||||||
|
|
||||||
const status = clampedRange ? 206 : 200;
|
|
||||||
const headers = {
|
const headers = {
|
||||||
'Content-Type': 'video/mp4',
|
'Content-Type': 'video/mp4',
|
||||||
'Accept-Ranges': 'bytes',
|
'Accept-Ranges': 'bytes',
|
||||||
'Cache-Control': 'private, max-age=3600',
|
'Cache-Control': 'private, max-age=3600',
|
||||||
|
'Content-Range': `bytes ${start}-${end}/${totalSize}`,
|
||||||
|
'Content-Length': String(contentLength),
|
||||||
};
|
};
|
||||||
if (s3Res.ContentLength) headers['Content-Length'] = String(s3Res.ContentLength);
|
if (etag) headers['ETag'] = etag;
|
||||||
if (s3Res.ContentRange) headers['Content-Range'] = s3Res.ContentRange;
|
if (lastModified) headers['Last-Modified'] = lastModified.toUTCString();
|
||||||
if (s3Res.ETag) headers['ETag'] = s3Res.ETag;
|
|
||||||
if (s3Res.LastModified) headers['Last-Modified'] = s3Res.LastModified.toUTCString();
|
// For small head-of-file ranges (entirely below the broken threshold)
|
||||||
res.writeHead(status, headers);
|
// a direct ranged GET works and saves the streaming-from-0 cost.
|
||||||
|
const RUSTFS_RANGE_SAFE_START = parseInt(process.env.RUSTFS_RANGE_SAFE_START || String(5_500_000), 10);
|
||||||
|
if (start < RUSTFS_RANGE_SAFE_START && end < RUSTFS_RANGE_SAFE_START) {
|
||||||
|
const s3Res = await s3Client.send(new GetObjectCommand({
|
||||||
|
Bucket: getS3Bucket(), Key: key, Range: `bytes=${start}-${end}`,
|
||||||
|
}));
|
||||||
|
res.writeHead(206, headers);
|
||||||
s3Res.Body.pipe(res);
|
s3Res.Body.pipe(res);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Otherwise: stream from offset 0, drop bytes below `start`, stop at
|
||||||
|
// `end`. Browser sees a normal 206; mam-api stays memory-flat.
|
||||||
|
res.writeHead(206, headers);
|
||||||
|
try {
|
||||||
|
for await (const buf of stitchedS3Stream(key, start, end)) {
|
||||||
|
// res.write returns false when backpressure builds — pause and wait.
|
||||||
|
if (!res.write(buf)) {
|
||||||
|
await new Promise(r => res.once('drain', r));
|
||||||
|
}
|
||||||
|
if (res.destroyed) return;
|
||||||
|
}
|
||||||
|
res.end();
|
||||||
|
} catch (err) {
|
||||||
|
console.error(`[video] stitch failed for ${key}:`, err.message);
|
||||||
|
if (!res.headersSent) {
|
||||||
|
res.writeHead(500, { 'Content-Type': 'text/plain', 'Cache-Control': 'no-store' });
|
||||||
|
res.end('Upstream storage error');
|
||||||
|
} else {
|
||||||
|
res.destroy(err);
|
||||||
|
}
|
||||||
|
}
|
||||||
} catch (err) { next(err); }
|
} catch (err) { next(err); }
|
||||||
});
|
});
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -174,6 +174,34 @@ export const trimSegment = async (inputPath, outputPath, inPoint, outPoint) => {
|
||||||
await runFFmpeg(args);
|
await runFFmpeg(args);
|
||||||
};
|
};
|
||||||
|
|
||||||
|
// Segment an existing MP4/MOV into HLS (fMP4) — init.mp4 + segment_*.m4s +
|
||||||
|
// playlist.m3u8. Keeps the original codec (no re-encode) so this is cheap to
|
||||||
|
// run after the proxy transcode. fMP4 segments stay <5 MB at our proxy
|
||||||
|
// bitrate, which sidesteps RustFS's broken byte-range path on large objects.
|
||||||
|
export const segmentToHls = async (inputPath, outputDir, options = {}) => {
|
||||||
|
const {
|
||||||
|
segmentDurationSec = 4,
|
||||||
|
playlistName = 'playlist.m3u8',
|
||||||
|
initName = 'init.mp4',
|
||||||
|
segmentPattern = 'segment_%05d.m4s',
|
||||||
|
} = options;
|
||||||
|
|
||||||
|
const args = [
|
||||||
|
'-i', inputPath,
|
||||||
|
'-c', 'copy',
|
||||||
|
'-f', 'hls',
|
||||||
|
'-hls_time', String(segmentDurationSec),
|
||||||
|
'-hls_playlist_type', 'vod',
|
||||||
|
'-hls_segment_type', 'fmp4',
|
||||||
|
'-hls_flags', 'independent_segments',
|
||||||
|
'-hls_fmp4_init_filename', initName,
|
||||||
|
'-hls_segment_filename', `${outputDir}/${segmentPattern}`,
|
||||||
|
'-y',
|
||||||
|
`${outputDir}/${playlistName}`,
|
||||||
|
];
|
||||||
|
await runFFmpeg(args);
|
||||||
|
};
|
||||||
|
|
||||||
export const concatSegments = async (segmentListFile, outputPath) => {
|
export const concatSegments = async (segmentListFile, outputPath) => {
|
||||||
const args = [
|
const args = [
|
||||||
'-f', 'concat',
|
'-f', 'concat',
|
||||||
|
|
|
||||||
|
|
@ -1,7 +1,15 @@
|
||||||
import { S3Client, GetObjectCommand, PutObjectCommand } from '@aws-sdk/client-s3';
|
import { S3Client, GetObjectCommand, PutObjectCommand } from '@aws-sdk/client-s3';
|
||||||
import { createReadStream, createWriteStream } from 'fs';
|
import { createReadStream, createWriteStream } from 'fs';
|
||||||
|
import { readdir } from 'fs/promises';
|
||||||
|
import { join, extname } from 'path';
|
||||||
import { pipeline } from 'stream/promises';
|
import { pipeline } from 'stream/promises';
|
||||||
|
|
||||||
|
const CONTENT_TYPES = {
|
||||||
|
'.m3u8': 'application/vnd.apple.mpegurl',
|
||||||
|
'.m4s': 'video/iso.segment',
|
||||||
|
'.mp4': 'video/mp4',
|
||||||
|
};
|
||||||
|
|
||||||
const createS3Client = () => {
|
const createS3Client = () => {
|
||||||
return new S3Client({
|
return new S3Client({
|
||||||
region: process.env.S3_REGION || 'us-east-1',
|
region: process.env.S3_REGION || 'us-east-1',
|
||||||
|
|
@ -44,6 +52,31 @@ export const uploadToS3 = async (bucket, key, localPath) => {
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
// Upload every file in `localDir` to `bucket` under `keyPrefix/`. Used for the
|
||||||
|
// HLS proxy output (init.mp4 + segment_*.m4s + playlist.m3u8). Each file goes
|
||||||
|
// up as its own PutObject so individual segments stay small and never trigger
|
||||||
|
// RustFS's broken byte-range path on large objects.
|
||||||
|
export const uploadDirectoryToS3 = async (bucket, keyPrefix, localDir) => {
|
||||||
|
const client = createS3Client();
|
||||||
|
try {
|
||||||
|
const entries = await readdir(localDir, { withFileTypes: true });
|
||||||
|
const files = entries.filter(e => e.isFile()).map(e => e.name);
|
||||||
|
for (const name of files) {
|
||||||
|
const ext = extname(name).toLowerCase();
|
||||||
|
const ct = CONTENT_TYPES[ext] || 'application/octet-stream';
|
||||||
|
await client.send(new PutObjectCommand({
|
||||||
|
Bucket: bucket,
|
||||||
|
Key: `${keyPrefix}/${name}`,
|
||||||
|
Body: createReadStream(join(localDir, name)),
|
||||||
|
ContentType: ct,
|
||||||
|
}));
|
||||||
|
}
|
||||||
|
return files;
|
||||||
|
} finally {
|
||||||
|
client.destroy();
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
// Multipart-aware streaming upload — used by the promotion worker to push
|
// Multipart-aware streaming upload — used by the promotion worker to push
|
||||||
// large growing-file masters without buffering them entirely in memory.
|
// large growing-file masters without buffering them entirely in memory.
|
||||||
export const uploadStreamToS3 = async (bucket, key, readable) => {
|
export const uploadStreamToS3 = async (bucket, key, readable) => {
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue