Root cause: three critical bugs in the UDP upload flow: 1. Main server never registered sessions on the relay — it stored them in its own memory but never called POST /session on the relay, so the relay had no idea about any upload sessions. 2. Relay had no HTTP chunk endpoint — the Chrome extension sends chunks via HTTP POST to /session/:id/chunk/:index, but the relay only had a binary UDP listener. Added the HTTP fallback endpoint. 3. Relay had no CORS headers — browser requests from chrome-extension:// origins were blocked. Added CORS middleware. The flow now works: Browser → POST /api/udp/session (main server) Main server → POST /session (relay, with s3Config) Browser → POST /session/:id/chunk/:n (relay, via public URL) Relay → S3 multipart upload Browser → POST /api/udp/session/:id/complete (main server) Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
351 lines
12 KiB
JavaScript
351 lines
12 KiB
JavaScript
"use strict";
|
|
/**
|
|
* Dragon Wind UDP Relay Server
|
|
*
|
|
* Acts as a high-speed relay between the Chrome extension (or any UDP client)
|
|
* and the S3 destination. Uses Forward Error Correction (FEC) style chunking
|
|
* with acknowledgement over a TCP control channel, and raw UDP for data.
|
|
*
|
|
* Architecture:
|
|
* Client --[UDP data]--> Relay --[S3 multipart upload]--> S3
|
|
* Client <--[TCP ACK]--- Relay
|
|
*
|
|
* Ports:
|
|
* TCP 3001 — Control / HTTP API (health, session management)
|
|
* UDP 5000 — Data transfer
|
|
*
|
|
* Environment:
|
|
* PORT TCP control port (default 3001)
|
|
* UDP_PORT UDP data port (default 5000)
|
|
* MAX_SESSIONS Max concurrent sessions (default 50)
|
|
*/
|
|
|
|
const express = require("express");
|
|
const dgram = require("dgram");
|
|
const crypto = require("crypto");
|
|
const { S3Client, CreateMultipartUploadCommand, UploadPartCommand,
|
|
CompleteMultipartUploadCommand, AbortMultipartUploadCommand } = require("@aws-sdk/client-s3");
|
|
const { NodeHttpHandler } = require("@smithy/node-http-handler");
|
|
const https = require("https");
|
|
const http = require("http");
|
|
|
|
const TCP_PORT = parseInt(process.env.PORT || "3001");
|
|
const UDP_PORT = parseInt(process.env.UDP_PORT || "5000");
|
|
const MAX_SESSIONS = parseInt(process.env.MAX_SESSIONS || "50");
|
|
const CHUNK_SIZE = 64 * 1024; // 64KB UDP chunks
|
|
const PART_SIZE = 5 * 1024 * 1024; // 5MB S3 multipart parts
|
|
const SESSION_TTL = 2 * 60 * 60 * 1000; // 2 hours
|
|
|
|
// ==================== STATE ====================
|
|
const sessions = new Map(); // sessionId → RelaySession
|
|
|
|
// ==================== S3 CLIENT ====================
|
|
function makeS3(cfg) {
|
|
const isHttps = (cfg.endpoint || "").startsWith("https");
|
|
const agent = isHttps
|
|
? new https.Agent({ keepAlive: true, maxSockets: 10, rejectUnauthorized: false })
|
|
: new http.Agent({ keepAlive: true, maxSockets: 10 });
|
|
return new S3Client({
|
|
endpoint: cfg.endpoint,
|
|
region: cfg.region || "us-east-1",
|
|
credentials: { accessKeyId: cfg.accessKeyId, secretAccessKey: cfg.secretAccessKey },
|
|
forcePathStyle: true,
|
|
requestHandler: new NodeHttpHandler({
|
|
connectionTimeout: 15000, socketTimeout: 600000,
|
|
...(isHttps ? { httpsAgent: agent } : { httpAgent: agent }),
|
|
}),
|
|
});
|
|
}
|
|
|
|
// ==================== SESSION CLASS ====================
|
|
class RelaySession {
|
|
constructor({ sessionId, filename, size, key, bucket, s3Cfg }) {
|
|
this.sessionId = sessionId;
|
|
this.filename = filename;
|
|
this.size = size;
|
|
this.key = key;
|
|
this.bucket = bucket;
|
|
this.s3 = makeS3(s3Cfg);
|
|
|
|
this.status = "pending";
|
|
this.receivedBytes = 0;
|
|
this.chunks = new Map(); // chunkIndex → Buffer
|
|
this.expectedChunks = Math.ceil(size / CHUNK_SIZE);
|
|
this.uploadId = null;
|
|
this.parts = [];
|
|
this.partBuffer = Buffer.alloc(0);
|
|
this.partNumber = 1;
|
|
this.createdAt = Date.now();
|
|
this.clientAddr = null;
|
|
this.clientPort = null;
|
|
|
|
// Timeout cleanup
|
|
this._timeout = setTimeout(() => this._cleanup("timeout"), SESSION_TTL);
|
|
}
|
|
|
|
async initMultipart() {
|
|
const cmd = new CreateMultipartUploadCommand({ Bucket: this.bucket, Key: this.key });
|
|
const r = await this.s3.send(cmd);
|
|
this.uploadId = r.UploadId;
|
|
this.status = "receiving";
|
|
console.log(`[${this.sessionId.slice(0, 8)}] Multipart initiated: ${this.key} (${this.uploadId})`);
|
|
}
|
|
|
|
// Called for each received UDP chunk
|
|
async receiveChunk(chunkIndex, data) {
|
|
if (this.chunks.has(chunkIndex)) return; // duplicate
|
|
this.chunks.set(chunkIndex, data);
|
|
this.receivedBytes += data.length;
|
|
|
|
// Append to part buffer
|
|
this.partBuffer = Buffer.concat([this.partBuffer, data]);
|
|
|
|
// Flush part when buffer hits PART_SIZE or we have all chunks
|
|
if (this.partBuffer.length >= PART_SIZE || this.chunks.size === this.expectedChunks) {
|
|
await this._flushPart();
|
|
}
|
|
}
|
|
|
|
async _flushPart() {
|
|
if (this.partBuffer.length === 0) return;
|
|
const body = this.partBuffer;
|
|
this.partBuffer = Buffer.alloc(0);
|
|
const cmd = new UploadPartCommand({
|
|
Bucket: this.bucket, Key: this.key,
|
|
UploadId: this.uploadId,
|
|
PartNumber: this.partNumber,
|
|
Body: body,
|
|
ContentLength: body.length,
|
|
});
|
|
const r = await this.s3.send(cmd);
|
|
this.parts.push({ PartNumber: this.partNumber, ETag: r.ETag });
|
|
console.log(`[${this.sessionId.slice(0, 8)}] Part ${this.partNumber} uploaded (${body.length} bytes)`);
|
|
this.partNumber++;
|
|
}
|
|
|
|
async complete() {
|
|
await this._flushPart(); // flush remainder
|
|
const cmd = new CompleteMultipartUploadCommand({
|
|
Bucket: this.bucket, Key: this.key,
|
|
UploadId: this.uploadId,
|
|
MultipartUpload: { Parts: this.parts },
|
|
});
|
|
await this.s3.send(cmd);
|
|
this.status = "completed";
|
|
clearTimeout(this._timeout);
|
|
console.log(`[${this.sessionId.slice(0, 8)}] ✅ Upload complete: ${this.key}`);
|
|
}
|
|
|
|
async abort() {
|
|
if (this.uploadId) {
|
|
try {
|
|
await this.s3.send(new AbortMultipartUploadCommand({
|
|
Bucket: this.bucket, Key: this.key, UploadId: this.uploadId,
|
|
}));
|
|
} catch (_) {}
|
|
}
|
|
this._cleanup("aborted");
|
|
}
|
|
|
|
_cleanup(reason) {
|
|
clearTimeout(this._timeout);
|
|
this.status = reason;
|
|
sessions.delete(this.sessionId);
|
|
console.log(`[${this.sessionId.slice(0, 8)}] Session ${reason}`);
|
|
}
|
|
|
|
progress() {
|
|
return {
|
|
sessionId: this.sessionId,
|
|
filename: this.filename,
|
|
size: this.size,
|
|
receivedBytes: this.receivedBytes,
|
|
expectedChunks: this.expectedChunks,
|
|
receivedChunks: this.chunks.size,
|
|
percent: this.size > 0 ? Math.round((this.receivedBytes / this.size) * 100) : 0,
|
|
status: this.status,
|
|
parts: this.parts.length,
|
|
};
|
|
}
|
|
}
|
|
|
|
// ==================== UDP SERVER ====================
|
|
const udpServer = dgram.createSocket("udp4");
|
|
|
|
/*
|
|
* UDP Packet Format (binary):
|
|
* [0..15] sessionId (16 bytes, hex → binary)
|
|
* [16..19] chunkIndex (uint32 BE)
|
|
* [20..] data payload
|
|
*/
|
|
udpServer.on("message", async (msg, rinfo) => {
|
|
if (msg.length < 21) return; // too short
|
|
|
|
const sessionIdBuf = msg.slice(0, 16);
|
|
const sessionId = sessionIdBuf.toString("hex");
|
|
const chunkIndex = msg.readUInt32BE(16);
|
|
const data = msg.slice(20);
|
|
|
|
const session = sessions.get(sessionId);
|
|
if (!session) {
|
|
console.warn(`[UDP] Unknown session: ${sessionId.slice(0, 8)}`);
|
|
return;
|
|
}
|
|
|
|
// Track client address for ACKs
|
|
if (!session.clientAddr) {
|
|
session.clientAddr = rinfo.address;
|
|
session.clientPort = rinfo.port;
|
|
}
|
|
|
|
try {
|
|
if (session.status === "pending") {
|
|
await session.initMultipart();
|
|
}
|
|
await session.receiveChunk(chunkIndex, data);
|
|
|
|
// Send ACK back to client
|
|
// ACK format: [0..15] sessionId [16..19] chunkIndex [20] status(0=ok)
|
|
const ack = Buffer.alloc(21);
|
|
sessionIdBuf.copy(ack, 0);
|
|
ack.writeUInt32BE(chunkIndex, 16);
|
|
ack.writeUInt8(0, 20);
|
|
udpServer.send(ack, rinfo.port, rinfo.address);
|
|
|
|
// Auto-complete when all chunks received
|
|
if (session.chunks.size === session.expectedChunks && session.status === "receiving") {
|
|
await session.complete();
|
|
}
|
|
} catch (err) {
|
|
console.error(`[UDP] Error for session ${sessionId.slice(0, 8)}:`, err.message);
|
|
// Send NACK
|
|
const nack = Buffer.alloc(21);
|
|
sessionIdBuf.copy(nack, 0);
|
|
nack.writeUInt32BE(chunkIndex, 16);
|
|
nack.writeUInt8(1, 20); // 1 = error
|
|
udpServer.send(nack, rinfo.port, rinfo.address);
|
|
}
|
|
});
|
|
|
|
udpServer.on("error", (err) => console.error("[UDP] Server error:", err));
|
|
|
|
udpServer.bind(UDP_PORT, "0.0.0.0", () => {
|
|
console.log(`🌪️ UDP relay listening on port ${UDP_PORT}`);
|
|
});
|
|
|
|
// ==================== HTTP CONTROL API ====================
|
|
const app = express();
|
|
|
|
// CORS — browsers (Chrome extension) send chunks via HTTP fallback
|
|
app.use((req, res, next) => {
|
|
res.setHeader("Access-Control-Allow-Origin", "*");
|
|
res.setHeader("Access-Control-Allow-Methods", "GET,POST,PUT,DELETE,OPTIONS");
|
|
res.setHeader("Access-Control-Allow-Headers", "Content-Type,Authorization");
|
|
if (req.method === "OPTIONS") return res.sendStatus(204);
|
|
next();
|
|
});
|
|
|
|
app.use(express.json());
|
|
app.use(express.raw({ type: "application/octet-stream", limit: "100mb" }));
|
|
|
|
// Health check
|
|
app.get("/health", (req, res) => {
|
|
res.json({
|
|
status: "ok",
|
|
udpPort: UDP_PORT,
|
|
activeSessions: sessions.size,
|
|
maxSessions: MAX_SESSIONS,
|
|
uptime: process.uptime(),
|
|
});
|
|
});
|
|
|
|
// Create session
|
|
app.post("/session", (req, res) => {
|
|
const { sessionId, filename, size, key, bucket, s3Config } = req.body;
|
|
if (!sessionId || !filename || !size || !key || !bucket || !s3Config)
|
|
return res.status(400).json({ error: "Missing required fields: sessionId, filename, size, key, bucket, s3Config" });
|
|
if (!s3Config.endpoint || !s3Config.accessKeyId || !s3Config.secretAccessKey)
|
|
return res.status(400).json({ error: "s3Config must include endpoint, accessKeyId, secretAccessKey" });
|
|
if (sessions.size >= MAX_SESSIONS)
|
|
return res.status(503).json({ error: "Max concurrent sessions reached" });
|
|
if (sessions.has(sessionId))
|
|
return res.status(409).json({ error: "Session already exists" });
|
|
|
|
const session = new RelaySession({ sessionId, filename, size, key, bucket, s3Cfg: s3Config });
|
|
sessions.set(sessionId, session);
|
|
console.log(`[HTTP] Session created: ${sessionId.slice(0, 8)} — ${filename} (${size} bytes)`);
|
|
res.json({ success: true, sessionId, udpPort: UDP_PORT, chunkSize: CHUNK_SIZE, expectedChunks: session.expectedChunks });
|
|
});
|
|
|
|
// HTTP chunk fallback — Chrome extensions can't send raw UDP, so they POST chunks over HTTP
|
|
app.post("/session/:id/chunk/:index", async (req, res) => {
|
|
const session = sessions.get(req.params.id);
|
|
if (!session) return res.status(404).json({ error: "Session not found" });
|
|
const chunkIndex = parseInt(req.params.index);
|
|
if (isNaN(chunkIndex)) return res.status(400).json({ error: "Invalid chunk index" });
|
|
|
|
try {
|
|
// Initialize multipart upload on first chunk
|
|
if (session.status === "pending") {
|
|
await session.initMultipart();
|
|
}
|
|
await session.receiveChunk(chunkIndex, req.body);
|
|
|
|
// Auto-complete when all chunks received
|
|
if (session.chunks.size === session.expectedChunks && session.status === "receiving") {
|
|
await session.complete();
|
|
}
|
|
|
|
res.json({
|
|
success: true,
|
|
chunkIndex,
|
|
receivedChunks: session.chunks.size,
|
|
expectedChunks: session.expectedChunks,
|
|
percent: session.progress().percent,
|
|
});
|
|
} catch (err) {
|
|
console.error(`[HTTP] Chunk ${chunkIndex} error for ${req.params.id.slice(0, 8)}:`, err.message);
|
|
res.status(500).json({ error: err.message });
|
|
}
|
|
});
|
|
|
|
// Get session progress
|
|
app.get("/session/:id", (req, res) => {
|
|
const session = sessions.get(req.params.id);
|
|
if (!session) return res.status(404).json({ error: "Session not found" });
|
|
res.json(session.progress());
|
|
});
|
|
|
|
// Complete session manually
|
|
app.post("/session/:id/complete", async (req, res) => {
|
|
const session = sessions.get(req.params.id);
|
|
if (!session) return res.status(404).json({ error: "Session not found" });
|
|
try {
|
|
await session.complete();
|
|
res.json({ success: true, status: "completed" });
|
|
} catch (err) {
|
|
res.status(500).json({ error: err.message });
|
|
}
|
|
});
|
|
|
|
// Abort session
|
|
app.delete("/session/:id", async (req, res) => {
|
|
const session = sessions.get(req.params.id);
|
|
if (!session) return res.status(404).json({ error: "Session not found" });
|
|
await session.abort();
|
|
res.json({ success: true, status: "aborted" });
|
|
});
|
|
|
|
// List all sessions
|
|
app.get("/sessions", (req, res) => {
|
|
res.json({ sessions: Array.from(sessions.values()).map((s) => s.progress()) });
|
|
});
|
|
|
|
app.listen(TCP_PORT, "0.0.0.0", () => {
|
|
console.log(`🌪️ Dragon Wind Relay control API on port ${TCP_PORT}`);
|
|
console.log(` UDP data port: ${UDP_PORT}`);
|
|
console.log(` Max sessions: ${MAX_SESSIONS}`);
|
|
});
|
|
|
|
process.on("unhandledRejection", (r) => console.error("Unhandled:", r));
|
|
process.on("uncaughtException", (e) => console.error("Uncaught:", e));
|