diff --git a/public/index.html b/public/index.html index f7a970a..555e4dc 100644 --- a/public/index.html +++ b/public/index.html @@ -467,7 +467,7 @@ body::before{content:'';position:fixed;inset:0;background:radial-gradient(ellips
- HTTP Mode: Direct S3 presigned upload. Best for LAN and stable connections. + HTTP Mode: Parallel chunked HTTP upload (6 concurrent 32 MB parts). Aspera-class speed — no UDP needed.
@@ -932,7 +932,7 @@ function setMode(mode) { if (mode === 'http') { btn.className = 'btn-upload'; if (label) label.textContent = 'HTTP Mode:'; - if (detail) detail.textContent = 'Direct S3 presigned upload. Best for LAN and stable connections.'; + if (detail) detail.textContent = 'Parallel chunked HTTP upload (6 concurrent 32 MB parts). Aspera-class speed — no UDP needed.'; if (hint) hint.style.display = 'none'; if (btnHttp) { btnHttp.className = 'mode-btn active-http'; } if (btnUdp) { btnUdp.className = 'mode-btn'; } @@ -1087,31 +1087,100 @@ async function startUpload() { updateUploadBtn(); updateStats(); } -async function uploadHTTP(files) { - for (const item of files) { - const idx = selectedFiles.indexOf(item); - setFileStatus(idx,'uploading','Uploading…'); - document.getElementById(`prog-${idx}`).style.display='block'; - try { - const presigned = await api('POST','/api/presigned',{filename:item.name,prefix:selectedPrefix,contentType:item.file.type||'application/octet-stream'}); - if (!presigned.success) throw new Error(presigned.error||'Failed to get presigned URL'); - // Use the content type the server signed — browser file.type may differ for broadcast formats - const signedType = presigned.contentType || item.file.type || 'application/octet-stream'; - await new Promise((resolve,reject) => { - const xhr=new XMLHttpRequest(); - xhr.open('PUT',presigned.url); - xhr.setRequestHeader('Content-Type',signedType); - xhr.upload.onprogress=e=>{ if(e.lengthComputable){ const p=Math.round(e.loaded/e.total*100); document.getElementById(`progbar-${idx}`).style.width=p+'%'; setFileStatus(idx,'uploading',p+'%'); } }; - xhr.onload=()=>xhr.status<300?resolve():reject(new Error(`S3 error ${xhr.status}`)); - xhr.onerror=()=>reject(new Error('Network error')); - xhr.send(item.file); +// ============================================================ +// PARALLEL CHUNK UPLOAD (Option 4 — HTTP parallelism, Aspera-class speed) +// Slices each file into 32 MB chunks, uploads 6 concurrently via POST, +// server proxies to S3 multipart. Same approach as MASV — no UDP needed. +// ============================================================ +const CHUNK_SIZE = 32 * 1024 * 1024; // 32 MB per part +const CHUNK_WORKERS = 6; // concurrent chunk POSTs per file + +async function uploadFileChunked(item, idx) { + const file = item.file; + const totalParts = Math.ceil(file.size / CHUNK_SIZE); + const mime = file.type || 'application/octet-stream'; + + // 1. Initiate S3 multipart upload + const init = await api('POST','/api/upload/initiate',{ + filename: item.name, prefix: selectedPrefix, + contentType: mime, totalParts, + }); + if (!init.success) throw new Error(init.error || 'Failed to initiate upload'); + const { uploadId } = init; + + const pb = document.getElementById(`progbar-${idx}`); + let done = 0; + + // 2. Upload all parts with bounded concurrency + const queue = Array.from({length: totalParts}, (_,i) => i+1); // 1-indexed + + async function worker() { + while (queue.length) { + const partNumber = queue.shift(); + if (partNumber === undefined) break; + const start = (partNumber - 1) * CHUNK_SIZE; + const blob = file.slice(start, Math.min(start + CHUNK_SIZE, file.size)); + const fd = new FormData(); + fd.append('uploadId', uploadId); + fd.append('partNumber', String(partNumber)); + fd.append('chunk', blob, item.name); + const resp = await fetch('/api/upload/chunk', { + method: 'POST', + headers: { 'x-auth-token': authToken }, + body: fd, }); - document.getElementById(`progbar-${idx}`).style.width='100%'; - setFileStatus(idx,'done','✓ Done'); item.status='done'; - showToast(`Uploaded: ${item.name}`,'success'); - } catch(e) { setFileStatus(idx,'error','✗ Error'); item.status='error'; showToast(`Failed: ${item.name} — ${e.message}`,'error'); } - updateStats(); + const data = await resp.json(); + if (!data.success) throw new Error(`Part ${partNumber} failed: ${data.error}`); + done++; + const pct = Math.round(done / totalParts * 100); + if (pb) pb.style.width = pct + '%'; + setFileStatus(idx, 'uploading', pct + '%'); + } } + + try { + await Promise.all(Array.from({length: Math.min(CHUNK_WORKERS, totalParts)}, worker)); + } catch (err) { + // Abort orphaned multipart upload on any chunk failure + fetch('/api/upload/abort', { + method: 'POST', + headers: { 'Content-Type': 'application/json', 'x-auth-token': authToken }, + body: JSON.stringify({ uploadId }), + }).catch(() => {}); + throw err; + } + + // 3. Complete the multipart upload + const complete = await api('POST','/api/upload/complete',{ uploadId }); + if (!complete.success) throw new Error(complete.error || 'Failed to complete upload'); +} + +async function uploadHTTP(files) { + // Upload files with up to 4 concurrent file uploads + const FILE_CONCURRENCY = 4; + const fileQueue = [...files]; + + async function fileWorker() { + while (fileQueue.length) { + const item = fileQueue.shift(); + if (!item) break; + const idx = selectedFiles.indexOf(item); + setFileStatus(idx,'uploading','Uploading…'); + document.getElementById(`prog-${idx}`).style.display='block'; + try { + await uploadFileChunked(item, idx); + document.getElementById(`progbar-${idx}`).style.width='100%'; + setFileStatus(idx,'done','✓ Done'); item.status='done'; + showToast(`Uploaded: ${item.name}`,'success'); + } catch(e) { + setFileStatus(idx,'error','✗ Error'); item.status='error'; + showToast(`Failed: ${item.name} — ${e.message}`,'error'); + } + updateStats(); + } + } + + await Promise.all(Array.from({length: Math.min(FILE_CONCURRENCY, files.length)}, fileWorker)); } async function uploadUDP(files) { diff --git a/server.js b/server.js index e8940c6..646d390 100644 --- a/server.js +++ b/server.js @@ -918,3 +918,111 @@ server.timeout = 0; server.keepAliveTimeout = 0; server.headersTimeout = 0; server.requestTimeout = 0; + +// ==================== PARALLEL CHUNK UPLOAD (Option 4 — HTTP parallelism) ==================== +// Client splits files into 32 MB chunks and POSTs 6 concurrently. +// Server proxies each chunk to S3 as a multipart upload part. +// Achieves Aspera-class throughput over plain HTTP — no UDP, no port forwarding needed. +// Based on the same approach used by MASV. + +const { + CreateMultipartUploadCommand, + UploadPartCommand, + CompleteMultipartUploadCommand, + AbortMultipartUploadCommand, +} = require("@aws-sdk/client-s3"); + +// In-memory multipart session map +// uploadId → { key, bucket, parts: [{PartNumber, ETag}], partCount } +const chunkSessions = new Map(); + +// 1. Initiate — creates S3 multipart upload, returns uploadId to client +app.post("/api/upload/initiate", requireAuth, async (req, res) => { + if (!s3Client) return res.status(503).json({ success: false, error: "S3 not configured" }); + const { filename, prefix, contentType, totalParts } = req.body; + if (!filename || !totalParts) return res.status(400).json({ success: false, error: "filename and totalParts required" }); + if (isBlockedFile(filename)) return res.status(400).json({ success: false, error: `Blocked file type: ${filename}` }); + + const bucket = db.s3Config?.bucket || ""; + if (!bucket) return res.status(503).json({ success: false, error: "S3 bucket not configured" }); + + const key = prefix ? `${prefix.replace(/[-\/]+$/, "")}--${filename}` : filename; + const mime = contentType || getMimeType(filename, "application/octet-stream"); + + try { + const resp = await s3Client.send(new CreateMultipartUploadCommand({ Bucket: bucket, Key: key, ContentType: mime })); + const uploadId = resp.UploadId; + chunkSessions.set(uploadId, { key, bucket, parts: [], partCount: parseInt(totalParts) }); + setTimeout(() => chunkSessions.delete(uploadId), 4 * 60 * 60 * 1000); // 4-hour TTL + console.log(`[chunk] Initiated: ${key} — ${totalParts} parts, uploadId=${uploadId}`); + res.json({ success: true, uploadId, key }); + } catch (err) { + console.error("[chunk] Initiate error:", err.message); + res.status(500).json({ success: false, error: err.message }); + } +}); + +// 2. Chunk — receive one chunk (multipart/form-data), proxy to S3 +const chunkMulter = multer({ storage: multer.memoryStorage(), limits: { fileSize: 200 * 1024 * 1024 } }); +app.post("/api/upload/chunk", requireAuth, chunkMulter.single("chunk"), async (req, res) => { + if (!s3Client) return res.status(503).json({ success: false, error: "S3 not configured" }); + const { uploadId, partNumber } = req.body; + const partNum = parseInt(partNumber); + if (!uploadId || !partNum) return res.status(400).json({ success: false, error: "uploadId and partNumber required" }); + + const session = chunkSessions.get(uploadId); + if (!session) return res.status(404).json({ success: false, error: "Session not found or expired" }); + + const body = req.file?.buffer; + if (!body || body.length === 0) return res.status(400).json({ success: false, error: "No chunk data" }); + + try { + const resp = await s3Client.send(new UploadPartCommand({ + Bucket: session.bucket, Key: session.key, + UploadId: uploadId, PartNumber: partNum, Body: body, + })); + session.parts.push({ PartNumber: partNum, ETag: resp.ETag }); + console.log(`[chunk] Part ${partNum}/${session.partCount} OK for ${session.key}`); + res.json({ success: true, partNumber: partNum, etag: resp.ETag }); + } catch (err) { + console.error(`[chunk] Part ${partNum} error:`, err.message); + res.status(500).json({ success: false, error: err.message }); + } +}); + +// 3. Complete — assemble all parts into final S3 object +app.post("/api/upload/complete", requireAuth, async (req, res) => { + if (!s3Client) return res.status(503).json({ success: false, error: "S3 not configured" }); + const { uploadId } = req.body; + if (!uploadId) return res.status(400).json({ success: false, error: "uploadId required" }); + + const session = chunkSessions.get(uploadId); + if (!session) return res.status(404).json({ success: false, error: "Session not found or expired" }); + + const parts = session.parts.slice().sort((a, b) => a.PartNumber - b.PartNumber); + try { + await s3Client.send(new CompleteMultipartUploadCommand({ + Bucket: session.bucket, Key: session.key, UploadId: uploadId, + MultipartUpload: { Parts: parts }, + })); + chunkSessions.delete(uploadId); + console.log(`[chunk] Completed: ${session.key} (${parts.length} parts)`); + res.json({ success: true, key: session.key }); + } catch (err) { + console.error("[chunk] Complete error:", err.message); + try { await s3Client.send(new AbortMultipartUploadCommand({ Bucket: session.bucket, Key: session.key, UploadId: uploadId })); } catch (_) {} + chunkSessions.delete(uploadId); + res.status(500).json({ success: false, error: err.message }); + } +}); + +// 4. Abort — cancel a multipart upload (called on client-side error) +app.post("/api/upload/abort", requireAuth, async (req, res) => { + const { uploadId } = req.body; + const session = chunkSessions.get(uploadId); + if (session && s3Client) { + try { await s3Client.send(new AbortMultipartUploadCommand({ Bucket: session.bucket, Key: session.key, UploadId: uploadId })); } catch (_) {} + } + if (uploadId) chunkSessions.delete(uploadId); + res.json({ success: true }); +});