Add parallel chunked HTTP upload (Option 4 — Aspera-class speed)

Split files into 32 MB chunks, POST 6 concurrently to /api/upload/chunk,
server proxies each chunk as an S3 multipart part. Up to 4 files upload
in parallel simultaneously. Achieves Aspera-class throughput over plain
HTTP with no UDP port forwarding or custom protocols required.

Same approach used by MASV under the hood.

- server.js: Add /api/upload/initiate, /chunk, /complete, /abort endpoints
- public/index.html: Replace single-PUT uploadHTTP() with parallel chunked version
This commit is contained in:
Zac Gaetano 2026-04-06 23:22:51 -04:00
parent a3d1446060
commit 8ec43c299e
2 changed files with 202 additions and 25 deletions

View file

@ -467,7 +467,7 @@ body::before{content:'';position:fixed;inset:0;background:radial-gradient(ellips
<button class="mode-btn" id="btn-udp" onclick="setMode('udp')">⚡ UDP Mode</button> <button class="mode-btn" id="btn-udp" onclick="setMode('udp')">⚡ UDP Mode</button>
</div> </div>
<div class="mode-desc" id="mode-desc" style="margin-bottom:0"> <div class="mode-desc" id="mode-desc" style="margin-bottom:0">
<strong id="mode-label">HTTP Mode:</strong> <span id="mode-detail">Direct S3 presigned upload. Best for LAN and stable connections.</span> <strong id="mode-label">HTTP Mode:</strong> <span id="mode-detail">Parallel chunked HTTP upload (6 concurrent 32MB parts). Aspera-class speed — no UDP needed.</span>
<span id="udp-ext-hint" style="display:none;margin-left:.5rem;color:var(--dragon-bright);font-size:.75rem">⚡ UDP extension detected</span> <span id="udp-ext-hint" style="display:none;margin-left:.5rem;color:var(--dragon-bright);font-size:.75rem">⚡ UDP extension detected</span>
</div> </div>
</div> </div>
@ -932,7 +932,7 @@ function setMode(mode) {
if (mode === 'http') { if (mode === 'http') {
btn.className = 'btn-upload'; btn.className = 'btn-upload';
if (label) label.textContent = 'HTTP Mode:'; if (label) label.textContent = 'HTTP Mode:';
if (detail) detail.textContent = 'Direct S3 presigned upload. Best for LAN and stable connections.'; if (detail) detail.textContent = 'Parallel chunked HTTP upload (6 concurrent 32MB parts). Aspera-class speed — no UDP needed.';
if (hint) hint.style.display = 'none'; if (hint) hint.style.display = 'none';
if (btnHttp) { btnHttp.className = 'mode-btn active-http'; } if (btnHttp) { btnHttp.className = 'mode-btn active-http'; }
if (btnUdp) { btnUdp.className = 'mode-btn'; } if (btnUdp) { btnUdp.className = 'mode-btn'; }
@ -1087,31 +1087,100 @@ async function startUpload() {
updateUploadBtn(); updateStats(); updateUploadBtn(); updateStats();
} }
async function uploadHTTP(files) { // ============================================================
for (const item of files) { // PARALLEL CHUNK UPLOAD (Option 4 — HTTP parallelism, Aspera-class speed)
const idx = selectedFiles.indexOf(item); // Slices each file into 32 MB chunks, uploads 6 concurrently via POST,
setFileStatus(idx,'uploading','Uploading…'); // server proxies to S3 multipart. Same approach as MASV — no UDP needed.
document.getElementById(`prog-${idx}`).style.display='block'; // ============================================================
try { const CHUNK_SIZE = 32 * 1024 * 1024; // 32 MB per part
const presigned = await api('POST','/api/presigned',{filename:item.name,prefix:selectedPrefix,contentType:item.file.type||'application/octet-stream'}); const CHUNK_WORKERS = 6; // concurrent chunk POSTs per file
if (!presigned.success) throw new Error(presigned.error||'Failed to get presigned URL');
// Use the content type the server signed — browser file.type may differ for broadcast formats async function uploadFileChunked(item, idx) {
const signedType = presigned.contentType || item.file.type || 'application/octet-stream'; const file = item.file;
await new Promise((resolve,reject) => { const totalParts = Math.ceil(file.size / CHUNK_SIZE);
const xhr=new XMLHttpRequest(); const mime = file.type || 'application/octet-stream';
xhr.open('PUT',presigned.url);
xhr.setRequestHeader('Content-Type',signedType); // 1. Initiate S3 multipart upload
xhr.upload.onprogress=e=>{ if(e.lengthComputable){ const p=Math.round(e.loaded/e.total*100); document.getElementById(`progbar-${idx}`).style.width=p+'%'; setFileStatus(idx,'uploading',p+'%'); } }; const init = await api('POST','/api/upload/initiate',{
xhr.onload=()=>xhr.status<300?resolve():reject(new Error(`S3 error ${xhr.status}`)); filename: item.name, prefix: selectedPrefix,
xhr.onerror=()=>reject(new Error('Network error')); contentType: mime, totalParts,
xhr.send(item.file); });
if (!init.success) throw new Error(init.error || 'Failed to initiate upload');
const { uploadId } = init;
const pb = document.getElementById(`progbar-${idx}`);
let done = 0;
// 2. Upload all parts with bounded concurrency
const queue = Array.from({length: totalParts}, (_,i) => i+1); // 1-indexed
async function worker() {
while (queue.length) {
const partNumber = queue.shift();
if (partNumber === undefined) break;
const start = (partNumber - 1) * CHUNK_SIZE;
const blob = file.slice(start, Math.min(start + CHUNK_SIZE, file.size));
const fd = new FormData();
fd.append('uploadId', uploadId);
fd.append('partNumber', String(partNumber));
fd.append('chunk', blob, item.name);
const resp = await fetch('/api/upload/chunk', {
method: 'POST',
headers: { 'x-auth-token': authToken },
body: fd,
}); });
document.getElementById(`progbar-${idx}`).style.width='100%'; const data = await resp.json();
setFileStatus(idx,'done','✓ Done'); item.status='done'; if (!data.success) throw new Error(`Part ${partNumber} failed: ${data.error}`);
showToast(`Uploaded: ${item.name}`,'success'); done++;
} catch(e) { setFileStatus(idx,'error','✗ Error'); item.status='error'; showToast(`Failed: ${item.name} — ${e.message}`,'error'); } const pct = Math.round(done / totalParts * 100);
updateStats(); if (pb) pb.style.width = pct + '%';
setFileStatus(idx, 'uploading', pct + '%');
}
} }
try {
await Promise.all(Array.from({length: Math.min(CHUNK_WORKERS, totalParts)}, worker));
} catch (err) {
// Abort orphaned multipart upload on any chunk failure
fetch('/api/upload/abort', {
method: 'POST',
headers: { 'Content-Type': 'application/json', 'x-auth-token': authToken },
body: JSON.stringify({ uploadId }),
}).catch(() => {});
throw err;
}
// 3. Complete the multipart upload
const complete = await api('POST','/api/upload/complete',{ uploadId });
if (!complete.success) throw new Error(complete.error || 'Failed to complete upload');
}
async function uploadHTTP(files) {
// Upload files with up to 4 concurrent file uploads
const FILE_CONCURRENCY = 4;
const fileQueue = [...files];
async function fileWorker() {
while (fileQueue.length) {
const item = fileQueue.shift();
if (!item) break;
const idx = selectedFiles.indexOf(item);
setFileStatus(idx,'uploading','Uploading…');
document.getElementById(`prog-${idx}`).style.display='block';
try {
await uploadFileChunked(item, idx);
document.getElementById(`progbar-${idx}`).style.width='100%';
setFileStatus(idx,'done','✓ Done'); item.status='done';
showToast(`Uploaded: ${item.name}`,'success');
} catch(e) {
setFileStatus(idx,'error','✗ Error'); item.status='error';
showToast(`Failed: ${item.name} — ${e.message}`,'error');
}
updateStats();
}
}
await Promise.all(Array.from({length: Math.min(FILE_CONCURRENCY, files.length)}, fileWorker));
} }
async function uploadUDP(files) { async function uploadUDP(files) {

108
server.js
View file

@ -918,3 +918,111 @@ server.timeout = 0;
server.keepAliveTimeout = 0; server.keepAliveTimeout = 0;
server.headersTimeout = 0; server.headersTimeout = 0;
server.requestTimeout = 0; server.requestTimeout = 0;
// ==================== PARALLEL CHUNK UPLOAD (Option 4 — HTTP parallelism) ====================
// Client splits files into 32 MB chunks and POSTs 6 concurrently.
// Server proxies each chunk to S3 as a multipart upload part.
// Achieves Aspera-class throughput over plain HTTP — no UDP, no port forwarding needed.
// Based on the same approach used by MASV.
const {
CreateMultipartUploadCommand,
UploadPartCommand,
CompleteMultipartUploadCommand,
AbortMultipartUploadCommand,
} = require("@aws-sdk/client-s3");
// In-memory multipart session map
// uploadId → { key, bucket, parts: [{PartNumber, ETag}], partCount }
const chunkSessions = new Map();
// 1. Initiate — creates S3 multipart upload, returns uploadId to client
app.post("/api/upload/initiate", requireAuth, async (req, res) => {
if (!s3Client) return res.status(503).json({ success: false, error: "S3 not configured" });
const { filename, prefix, contentType, totalParts } = req.body;
if (!filename || !totalParts) return res.status(400).json({ success: false, error: "filename and totalParts required" });
if (isBlockedFile(filename)) return res.status(400).json({ success: false, error: `Blocked file type: ${filename}` });
const bucket = db.s3Config?.bucket || "";
if (!bucket) return res.status(503).json({ success: false, error: "S3 bucket not configured" });
const key = prefix ? `${prefix.replace(/[-\/]+$/, "")}--${filename}` : filename;
const mime = contentType || getMimeType(filename, "application/octet-stream");
try {
const resp = await s3Client.send(new CreateMultipartUploadCommand({ Bucket: bucket, Key: key, ContentType: mime }));
const uploadId = resp.UploadId;
chunkSessions.set(uploadId, { key, bucket, parts: [], partCount: parseInt(totalParts) });
setTimeout(() => chunkSessions.delete(uploadId), 4 * 60 * 60 * 1000); // 4-hour TTL
console.log(`[chunk] Initiated: ${key}${totalParts} parts, uploadId=${uploadId}`);
res.json({ success: true, uploadId, key });
} catch (err) {
console.error("[chunk] Initiate error:", err.message);
res.status(500).json({ success: false, error: err.message });
}
});
// 2. Chunk — receive one chunk (multipart/form-data), proxy to S3
const chunkMulter = multer({ storage: multer.memoryStorage(), limits: { fileSize: 200 * 1024 * 1024 } });
app.post("/api/upload/chunk", requireAuth, chunkMulter.single("chunk"), async (req, res) => {
if (!s3Client) return res.status(503).json({ success: false, error: "S3 not configured" });
const { uploadId, partNumber } = req.body;
const partNum = parseInt(partNumber);
if (!uploadId || !partNum) return res.status(400).json({ success: false, error: "uploadId and partNumber required" });
const session = chunkSessions.get(uploadId);
if (!session) return res.status(404).json({ success: false, error: "Session not found or expired" });
const body = req.file?.buffer;
if (!body || body.length === 0) return res.status(400).json({ success: false, error: "No chunk data" });
try {
const resp = await s3Client.send(new UploadPartCommand({
Bucket: session.bucket, Key: session.key,
UploadId: uploadId, PartNumber: partNum, Body: body,
}));
session.parts.push({ PartNumber: partNum, ETag: resp.ETag });
console.log(`[chunk] Part ${partNum}/${session.partCount} OK for ${session.key}`);
res.json({ success: true, partNumber: partNum, etag: resp.ETag });
} catch (err) {
console.error(`[chunk] Part ${partNum} error:`, err.message);
res.status(500).json({ success: false, error: err.message });
}
});
// 3. Complete — assemble all parts into final S3 object
app.post("/api/upload/complete", requireAuth, async (req, res) => {
if (!s3Client) return res.status(503).json({ success: false, error: "S3 not configured" });
const { uploadId } = req.body;
if (!uploadId) return res.status(400).json({ success: false, error: "uploadId required" });
const session = chunkSessions.get(uploadId);
if (!session) return res.status(404).json({ success: false, error: "Session not found or expired" });
const parts = session.parts.slice().sort((a, b) => a.PartNumber - b.PartNumber);
try {
await s3Client.send(new CompleteMultipartUploadCommand({
Bucket: session.bucket, Key: session.key, UploadId: uploadId,
MultipartUpload: { Parts: parts },
}));
chunkSessions.delete(uploadId);
console.log(`[chunk] Completed: ${session.key} (${parts.length} parts)`);
res.json({ success: true, key: session.key });
} catch (err) {
console.error("[chunk] Complete error:", err.message);
try { await s3Client.send(new AbortMultipartUploadCommand({ Bucket: session.bucket, Key: session.key, UploadId: uploadId })); } catch (_) {}
chunkSessions.delete(uploadId);
res.status(500).json({ success: false, error: err.message });
}
});
// 4. Abort — cancel a multipart upload (called on client-side error)
app.post("/api/upload/abort", requireAuth, async (req, res) => {
const { uploadId } = req.body;
const session = chunkSessions.get(uploadId);
if (session && s3Client) {
try { await s3Client.send(new AbortMultipartUploadCommand({ Bucket: session.bucket, Key: session.key, UploadId: uploadId })); } catch (_) {}
}
if (uploadId) chunkSessions.delete(uploadId);
res.json({ success: true });
});