fix(player): stitch S3 ranges around RustFS empty-body bug (#143)
RustFS returns empty bodies for ranged GETs whose start offset is past
~5.9 MB on single-file proxy MP4s. HEAD reports correct size, full GET
(`bytes=0-`) works, but `bytes=8179166-` comes back 206 + correct
Content-Range header with zero bytes. Confirmed via direct S3 probe
against broadcastmgmt.cloud/dragonmam (see scratch tests).
Workaround in mam-api `GET /api/v1/assets/:id/video` until the proxy
worker emits HLS (planned v1.2.1):
- HEAD the object first to learn total size (also gives ETag /
Last-Modified for conditional requests).
- No-Range / unparseable-Range / pre-EOF requests → plain pipe.
- Parsed `bytes=N-M` requests below RUSTFS_RANGE_SAFE_START
(default 5_500_000) → direct ranged GET, RustFS handles fine.
- Anything reaching into the broken zone → stream from offset 0,
drop bytes below start, stop at end. Memory stays flat; extra
bandwidth = (end+1 - requested-size) per seek.
- Genuinely out-of-range → 416 with Cache-Control: no-store so the
browser doesn't poison its cache.
Also stashes (not yet wired up) the HLS pieces we'll need for the
follow-up: `segmentToHls` ffmpeg helper + `uploadDirectoryToS3`
worker s3 helper. Harmless additions; not referenced by any code path
yet.
Confirmed against the affected asset (a72aaa03-...): bytes=0-100k +
50% +100k native pass-through; 70% +100k and near-EOF previously hung
the browser, now stream correctly via the stitched path.
Refs #143.
This commit is contained in:
parent
04ce096e67
commit
a86c1c72f9
3 changed files with 209 additions and 66 deletions
|
|
@ -575,6 +575,66 @@ router.get('/:id/live-path', async (req, res, next) => {
|
|||
// - ETag + Last-Modified for conditional requests (304 on repeat visits)
|
||||
// - Cache-Control: private, max-age=3600 so the browser caches segments
|
||||
// and doesn't re-fetch them on every seek within a session
|
||||
// Issue #143 — RustFS returns empty bodies for ranged GETs whose start offset
|
||||
// is past ~5.9 MB on single-file proxy MP4s. Confirmed via direct S3 probe:
|
||||
// HEAD reports correct size, full GET (`bytes=0-`) works perfectly, but
|
||||
// `bytes=8179166-` returns 206 + the right Content-Range header and a zero-
|
||||
// byte body. A streaming GET from 0 reads cleanly *through* the broken zone.
|
||||
//
|
||||
// Workaround until the proxy worker emits HLS (planned v1.2.1): stream the
|
||||
// proxy from offset 0, skip bytes the client didn't ask for, stop after the
|
||||
// requested end. Browser sees a normal 206 + Content-Range. Mem stays flat;
|
||||
// extra RustFS-to-mam-api bandwidth = (end+1 - actual-range) per seek.
|
||||
//
|
||||
// Small head-of-file ranges below RUSTFS_RANGE_SAFE_START are handled by a
|
||||
// direct ranged GET — saves the streaming-from-0 cost on the common case of
|
||||
// initial moov + first-segment fetch.
|
||||
|
||||
/**
 * Stream the inclusive byte window [startByte, endByte] of an S3 object
 * using a single GET that always starts at offset 0.
 *
 * Why: RustFS (Issue #143) answers ranged GETs whose *start* offset is past
 * roughly 5.9 MB with a 206 and an empty body, while a GET that starts at 0
 * reads cleanly through that zone. So we request `bytes=0-endByte`, discard
 * everything below `startByte` as it arrives, and stop once `endByte` has
 * been passed. Nothing is accumulated, so memory stays flat; the cost is
 * (endByte + 1) bytes of storage-to-API bandwidth per call.
 *
 * @param {string} key        Object key inside the bucket from getS3Bucket().
 * @param {number} startByte  First byte offset to emit (inclusive).
 * @param {number} endByte    Last byte offset to emit (inclusive).
 * @yields Sub-views of the chunks produced by the response body (no copies).
 * @throws {Error} If the body yields no bytes inside the window, or ends
 *   before the whole window has been delivered. Callers have typically
 *   already sent a Content-Length for the full window, so a short read must
 *   surface as an error instead of a silently truncated response.
 */
async function* stitchedS3Stream(key, startByte, endByte) {
  const res = await s3Client.send(new GetObjectCommand({
    Bucket: getS3Bucket(),
    Key: key,
    Range: `bytes=0-${endByte}`,
  }));

  let consumed = 0; // bytes received from storage so far == file offset of the next chunk
  let totalEmitted = 0; // bytes handed to the consumer so far

  for await (const buf of res.Body) {
    const bufStart = consumed; // file offset of buf[0]
    const bufEnd = consumed + buf.length - 1; // file offset of buf's last byte
    consumed += buf.length;

    // Chunk lies entirely before the requested window: drop it.
    if (bufEnd < startByte) continue;

    // Trim the chunk to the part that overlaps [startByte, endByte].
    const sliceFrom = Math.max(0, startByte - bufStart);
    const sliceTo = Math.min(buf.length, endByte - bufStart + 1);
    if (sliceTo > sliceFrom) {
      yield buf.subarray(sliceFrom, sliceTo);
      totalEmitted += sliceTo - sliceFrom;
    }

    // Leaving the loop early closes the body iterator, so nothing past
    // endByte is read.
    if (bufEnd >= endByte) break;
  }

  if (totalEmitted === 0) {
    throw new Error(`RustFS returned empty body for ${key} bytes=0-${endByte}`);
  }

  // A body that stops partway through the window would otherwise look like
  // a successful (but truncated) stream to the consumer.
  const expected = endByte - startByte + 1;
  if (totalEmitted < expected) {
    throw new Error(
      `RustFS returned short body for ${key} bytes=${startByte}-${endByte}: ` +
      `got ${totalEmitted} of ${expected} bytes`,
    );
  }
}
|
||||
|
||||
router.get('/:id/video', async (req, res, next) => {
|
||||
try {
|
||||
const { id } = req.params;
|
||||
|
|
@ -586,85 +646,107 @@ router.get('/:id/video', async (req, res, next) => {
|
|||
const key = a.proxy_s3_key || (origIsVideo ? a.original_s3_key : null);
|
||||
if (!key) return res.status(404).json({ error: 'No browser-playable source' });
|
||||
|
||||
// Issue #143 — seeking to the very end of a clip stalled the player.
|
||||
// Two contributing causes:
|
||||
// 1) The previous 416 path re-fetched the full object with GetObject to
|
||||
// learn the size (HEAD would do, and not transfer the bytes).
|
||||
// 2) 416 responses inherited the same `private, max-age=3600` cache
|
||||
// directive as 206, so once the browser hit "out of range" near EOF
|
||||
// it stayed cached for the rest of the session and the player never
|
||||
// retried.
|
||||
// Now: HEAD the object first to learn the true size, clamp the client's
|
||||
// Range to a valid window, and return uncached 416 only for truly
|
||||
// unsatisfiable requests.
|
||||
// HEAD the object to learn the true size.
|
||||
let totalSize = 0;
|
||||
let etag, lastModified;
|
||||
try {
|
||||
const head = await s3Client.send(new HeadObjectCommand({ Bucket: getS3Bucket(), Key: key }));
|
||||
totalSize = head.ContentLength || 0;
|
||||
totalSize = head.ContentLength || 0;
|
||||
etag = head.ETag;
|
||||
lastModified = head.LastModified;
|
||||
} catch (_) {
|
||||
// If HEAD fails we still try the GET — keeps behaviour for backends
|
||||
// that disallow HEAD.
|
||||
// HEAD failed — fall back to a plain GET (no range).
|
||||
}
|
||||
|
||||
const rangeHeader = req.headers.range;
|
||||
let clampedRange = rangeHeader;
|
||||
if (rangeHeader && totalSize > 0) {
|
||||
// bytes=START-END or bytes=START- (we ignore multi-range; not used by HTML5 video).
|
||||
const m = /^bytes=(\d+)-(\d*)$/.exec(rangeHeader.trim());
|
||||
if (m) {
|
||||
let start = parseInt(m[1], 10);
|
||||
let end = m[2] === '' ? totalSize - 1 : parseInt(m[2], 10);
|
||||
if (!Number.isFinite(start) || start < 0) start = 0;
|
||||
if (!Number.isFinite(end) || end >= totalSize) end = totalSize - 1;
|
||||
if (start >= totalSize) {
|
||||
// Genuinely past EOF — return a clean, uncached 416.
|
||||
res.writeHead(416, {
|
||||
'Content-Type': 'text/plain',
|
||||
'Content-Length': '0',
|
||||
'Content-Range': `bytes */${totalSize}`,
|
||||
'Accept-Ranges': 'bytes',
|
||||
'Cache-Control': 'no-store',
|
||||
});
|
||||
return res.end();
|
||||
}
|
||||
if (start > end) start = end;
|
||||
clampedRange = `bytes=${start}-${end}`;
|
||||
}
|
||||
|
||||
// No Range header → stream the whole object. RustFS handles `bytes=0-`
|
||||
// and "no Range" fine; this is the fast path.
|
||||
if (!rangeHeader || totalSize === 0) {
|
||||
const s3Res = await s3Client.send(new GetObjectCommand({ Bucket: getS3Bucket(), Key: key }));
|
||||
const headers = {
|
||||
'Content-Type': 'video/mp4',
|
||||
'Accept-Ranges': 'bytes',
|
||||
'Cache-Control': 'private, max-age=3600',
|
||||
};
|
||||
if (s3Res.ContentLength) headers['Content-Length'] = String(s3Res.ContentLength);
|
||||
if (s3Res.ETag) headers['ETag'] = s3Res.ETag;
|
||||
if (s3Res.LastModified) headers['Last-Modified'] = s3Res.LastModified.toUTCString();
|
||||
res.writeHead(200, headers);
|
||||
s3Res.Body.pipe(res);
|
||||
return;
|
||||
}
|
||||
|
||||
const params = { Bucket: getS3Bucket(), Key: key };
|
||||
if (clampedRange) params.Range = clampedRange;
|
||||
|
||||
let s3Res;
|
||||
try {
|
||||
s3Res = await s3Client.send(new GetObjectCommand(params));
|
||||
} catch (err) {
|
||||
// Defensive: even with clamping, some backends still throw InvalidRange.
|
||||
if (err.Code === 'InvalidRange' || err.$metadata?.httpStatusCode === 416) {
|
||||
res.writeHead(416, {
|
||||
'Content-Type': 'text/plain',
|
||||
'Content-Length': '0',
|
||||
'Content-Range': `bytes */${totalSize}`,
|
||||
'Accept-Ranges': 'bytes',
|
||||
'Cache-Control': 'no-store',
|
||||
});
|
||||
return res.end();
|
||||
}
|
||||
throw err;
|
||||
// Parse `bytes=START-END` / `bytes=START-`. Ignore multi-range.
|
||||
const m = /^bytes=(\d+)-(\d*)$/.exec(rangeHeader.trim());
|
||||
if (!m) {
|
||||
// Unparseable Range — fall back to full body, browser will cope.
|
||||
const s3Res = await s3Client.send(new GetObjectCommand({ Bucket: getS3Bucket(), Key: key }));
|
||||
res.writeHead(200, { 'Content-Type': 'video/mp4', 'Accept-Ranges': 'bytes' });
|
||||
s3Res.Body.pipe(res);
|
||||
return;
|
||||
}
|
||||
|
||||
const status = clampedRange ? 206 : 200;
|
||||
let start = parseInt(m[1], 10);
|
||||
let end = m[2] === '' ? totalSize - 1 : parseInt(m[2], 10);
|
||||
if (!Number.isFinite(start) || start < 0) start = 0;
|
||||
if (!Number.isFinite(end) || end >= totalSize) end = totalSize - 1;
|
||||
if (start >= totalSize) {
|
||||
res.writeHead(416, {
|
||||
'Content-Type': 'text/plain',
|
||||
'Content-Length': '0',
|
||||
'Content-Range': `bytes */${totalSize}`,
|
||||
'Accept-Ranges': 'bytes',
|
||||
'Cache-Control': 'no-store',
|
||||
});
|
||||
return res.end();
|
||||
}
|
||||
if (start > end) start = end;
|
||||
|
||||
const contentLength = end - start + 1;
|
||||
const headers = {
|
||||
'Content-Type': 'video/mp4',
|
||||
'Accept-Ranges': 'bytes',
|
||||
'Cache-Control': 'private, max-age=3600',
|
||||
'Content-Type': 'video/mp4',
|
||||
'Accept-Ranges': 'bytes',
|
||||
'Cache-Control': 'private, max-age=3600',
|
||||
'Content-Range': `bytes ${start}-${end}/${totalSize}`,
|
||||
'Content-Length': String(contentLength),
|
||||
};
|
||||
if (s3Res.ContentLength) headers['Content-Length'] = String(s3Res.ContentLength);
|
||||
if (s3Res.ContentRange) headers['Content-Range'] = s3Res.ContentRange;
|
||||
if (s3Res.ETag) headers['ETag'] = s3Res.ETag;
|
||||
if (s3Res.LastModified) headers['Last-Modified'] = s3Res.LastModified.toUTCString();
|
||||
res.writeHead(status, headers);
|
||||
s3Res.Body.pipe(res);
|
||||
if (etag) headers['ETag'] = etag;
|
||||
if (lastModified) headers['Last-Modified'] = lastModified.toUTCString();
|
||||
|
||||
// For small head-of-file ranges (entirely below the broken threshold)
|
||||
// a direct ranged GET works and saves the streaming-from-0 cost.
|
||||
const RUSTFS_RANGE_SAFE_START = parseInt(process.env.RUSTFS_RANGE_SAFE_START || String(5_500_000), 10);
|
||||
if (start < RUSTFS_RANGE_SAFE_START && end < RUSTFS_RANGE_SAFE_START) {
|
||||
const s3Res = await s3Client.send(new GetObjectCommand({
|
||||
Bucket: getS3Bucket(), Key: key, Range: `bytes=${start}-${end}`,
|
||||
}));
|
||||
res.writeHead(206, headers);
|
||||
s3Res.Body.pipe(res);
|
||||
return;
|
||||
}
|
||||
|
||||
// Otherwise: stream from offset 0, drop bytes below `start`, stop at
|
||||
// `end`. Browser sees a normal 206; mam-api stays memory-flat.
|
||||
res.writeHead(206, headers);
|
||||
try {
|
||||
for await (const buf of stitchedS3Stream(key, start, end)) {
|
||||
// res.write returns false when backpressure builds — pause and wait.
|
||||
if (!res.write(buf)) {
|
||||
await new Promise(r => res.once('drain', r));
|
||||
}
|
||||
if (res.destroyed) return;
|
||||
}
|
||||
res.end();
|
||||
} catch (err) {
|
||||
console.error(`[video] stitch failed for ${key}:`, err.message);
|
||||
if (!res.headersSent) {
|
||||
res.writeHead(500, { 'Content-Type': 'text/plain', 'Cache-Control': 'no-store' });
|
||||
res.end('Upstream storage error');
|
||||
} else {
|
||||
res.destroy(err);
|
||||
}
|
||||
}
|
||||
} catch (err) { next(err); }
|
||||
});
|
||||
|
||||
|
|
|
|||
|
|
@ -174,6 +174,34 @@ export const trimSegment = async (inputPath, outputPath, inPoint, outPoint) => {
|
|||
await runFFmpeg(args);
|
||||
};
|
||||
|
||||
// Segment an existing MP4/MOV into HLS (fMP4) — init.mp4 + segment_*.m4s +
|
||||
// playlist.m3u8. Keeps the original codec (no re-encode) so this is cheap to
|
||||
// run after the proxy transcode. fMP4 segments stay <5 MB at our proxy
|
||||
// bitrate, which sidesteps RustFS's broken byte-range path on large objects.
|
||||
/**
 * Repackage an existing media file as an HLS VOD rendition with fragmented
 * MP4 segments, copying the streams as-is (`-c copy`, no re-encode).
 *
 * Files written into `outputDir`: the playlist, the segments named by
 * `segmentPattern`, and the fMP4 init file (passed to ffmpeg as a bare file
 * name).
 *
 * @param {string} inputPath  Source media file handed to ffmpeg via `-i`.
 * @param {string} outputDir  Directory that receives playlist and segments.
 * @param {object} [options]
 * @param {number} [options.segmentDurationSec=4]  Value for `-hls_time`.
 * @param {string} [options.playlistName='playlist.m3u8']
 * @param {string} [options.initName='init.mp4']
 * @param {string} [options.segmentPattern='segment_%05d.m4s']
 * @returns {Promise<void>} Settles when `runFFmpeg` settles.
 */
export const segmentToHls = async (inputPath, outputDir, options = {}) => {
  const {
    segmentDurationSec: hlsTime = 4,
    playlistName: playlistFile = 'playlist.m3u8',
    initName: initFile = 'init.mp4',
    segmentPattern: segmentTemplate = 'segment_%05d.m4s',
  } = options;

  // Input and codec handling: stream copy only.
  const inputFlags = ['-i', inputPath, '-c', 'copy'];

  // HLS muxer configuration.
  const muxerFlags = [
    '-f', 'hls',
    '-hls_time', String(hlsTime),
    '-hls_playlist_type', 'vod',
    '-hls_segment_type', 'fmp4',
    '-hls_flags', 'independent_segments',
    '-hls_fmp4_init_filename', initFile,
    '-hls_segment_filename', `${outputDir}/${segmentTemplate}`,
  ];

  // `-y` overwrites any previous output; the playlist path is the output target.
  const outputFlags = ['-y', `${outputDir}/${playlistFile}`];

  await runFFmpeg([...inputFlags, ...muxerFlags, ...outputFlags]);
};
|
||||
|
||||
export const concatSegments = async (segmentListFile, outputPath) => {
|
||||
const args = [
|
||||
'-f', 'concat',
|
||||
|
|
|
|||
|
|
@ -1,7 +1,15 @@
|
|||
import { S3Client, GetObjectCommand, PutObjectCommand } from '@aws-sdk/client-s3';
|
||||
import { createReadStream, createWriteStream } from 'fs';
|
||||
import { readdir } from 'fs/promises';
|
||||
import { join, extname } from 'path';
|
||||
import { pipeline } from 'stream/promises';
|
||||
|
||||
// Content-Type to send per file extension (lower-case, leading dot included).
// Frozen so an accidental write cannot silently change what later uploads
// are labelled as; lookups of unknown extensions still yield `undefined`.
const CONTENT_TYPES = Object.freeze({
  '.m3u8': 'application/vnd.apple.mpegurl',
  '.m4s': 'video/iso.segment',
  '.mp4': 'video/mp4',
});
|
||||
|
||||
const createS3Client = () => {
|
||||
return new S3Client({
|
||||
region: process.env.S3_REGION || 'us-east-1',
|
||||
|
|
@ -44,6 +52,31 @@ export const uploadToS3 = async (bucket, key, localPath) => {
|
|||
}
|
||||
};
|
||||
|
||||
// Upload every file in `localDir` to `bucket` under `keyPrefix/`. Used for the
|
||||
// HLS proxy output (init.mp4 + segment_*.m4s + playlist.m3u8). Each file goes
|
||||
// up as its own PutObject so individual segments stay small and never trigger
|
||||
// RustFS's broken byte-range path on large objects.
|
||||
/**
 * Upload every regular file directly inside `localDir` to `bucket`, one
 * PutObject per file, under the key `${keyPrefix}/<file name>`.
 *
 * Sub-directories and other non-file entries are skipped (no recursion).
 * Uploads run one after another; the first failure rejects and stops the
 * remaining uploads. The S3 client created here is destroyed on every exit
 * path.
 *
 * @param {string} bucket     Destination bucket.
 * @param {string} keyPrefix  Key prefix; joined to each file name with `/`.
 * @param {string} localDir   Local directory to read.
 * @returns {Promise<string[]>} Names of the files that were uploaded, in the
 *   order `readdir` returned them.
 */
export const uploadDirectoryToS3 = async (bucket, keyPrefix, localDir) => {
  const s3 = createS3Client();
  try {
    const dirents = await readdir(localDir, { withFileTypes: true });

    // Collect the names of plain files first; everything else is ignored.
    const fileNames = [];
    for (const dirent of dirents) {
      if (dirent.isFile()) fileNames.push(dirent.name);
    }

    for (const fileName of fileNames) {
      // Content type is chosen by lower-cased extension, with a generic fallback.
      const extension = extname(fileName).toLowerCase();
      const contentType = CONTENT_TYPES[extension] || 'application/octet-stream';
      const putCommand = new PutObjectCommand({
        Bucket: bucket,
        Key: `${keyPrefix}/${fileName}`,
        Body: createReadStream(join(localDir, fileName)),
        ContentType: contentType,
      });
      await s3.send(putCommand);
    }

    return fileNames;
  } finally {
    s3.destroy();
  }
};
|
||||
|
||||
// Multipart-aware streaming upload — used by the promotion worker to push
|
||||
// large growing-file masters without buffering them entirely in memory.
|
||||
export const uploadStreamToS3 = async (bucket, key, readable) => {
|
||||
|
|
|
|||
Loading…
Reference in a new issue