"use strict";

/**
 * Dragon Wind UDP Relay Server
 *
 * Relays file data from a client (raw UDP, or HTTP for clients such as Chrome
 * extensions that cannot send UDP) into an S3 multipart upload.
 *
 * Architecture:
 *   Client --[UDP data | HTTP chunk POST]--> Relay --[S3 multipart upload]--> S3
 *   Client <--[UDP ACK/NACK datagram]------- Relay
 *
 * Chunks are reassembled by chunk index, so datagrams may arrive in any order.
 * Each UDP chunk is answered with a 21-byte ACK (status 0) or NACK (status 1)
 * datagram sent back to the sender's address.
 *
 * Ports:
 *   TCP 3001 — Control / HTTP API (health, session management, chunk fallback)
 *   UDP 5000 — Data transfer
 *
 * Environment:
 *   PORT          TCP control port       (default 3001)
 *   UDP_PORT      UDP data port          (default 5000)
 *   MAX_SESSIONS  Max concurrent sessions (default 50)
 */

const express = require("express");
const dgram = require("dgram");
const crypto = require("crypto");
const {
  S3Client,
  CreateMultipartUploadCommand,
  UploadPartCommand,
  CompleteMultipartUploadCommand,
  AbortMultipartUploadCommand,
} = require("@aws-sdk/client-s3");
const { NodeHttpHandler } = require("@smithy/node-http-handler");
const https = require("https");
const http = require("http");

/**
 * Reads a non-negative integer from the environment.
 *
 * @param {string} name - Environment variable name.
 * @param {number} fallback - Value used when the variable is unset or not a
 *   non-negative integer.
 * @returns {number}
 */
function envInt(name, fallback) {
  const parsed = Number.parseInt(process.env[name] || "", 10);
  return Number.isInteger(parsed) && parsed >= 0 ? parsed : fallback;
}

const TCP_PORT = envInt("PORT", 3001);
const UDP_PORT = envInt("UDP_PORT", 5000);
const MAX_SESSIONS = envInt("MAX_SESSIONS", 50);

// NOTE(review): a 64 KiB payload plus the 20-byte packet header is larger than
// the maximum IPv4 UDP payload (65,507 bytes), so a full-size chunk cannot be
// sent as a single datagram. The value is left unchanged because clients
// receive it from POST /session — confirm the intended chunk size.
const CHUNK_SIZE = 64 * 1024; // 64KB chunks
const PART_SIZE = 5 * 1024 * 1024; // 5MB S3 multipart parts
const SESSION_TTL = 2 * 60 * 60 * 1000; // 2 hours
// How long a completed session stays queryable before it is dropped.
const COMPLETED_TTL = 10 * 60 * 1000; // 10 minutes

// UDP packet header: 16-byte session id + uint32 BE chunk index.
const UDP_SESSION_ID_BYTES = 16;
const UDP_HEADER_BYTES = UDP_SESSION_ID_BYTES + 4;
const UDP_STATUS_OK = 0;
const UDP_STATUS_ERROR = 1;

// ==================== STATE ====================

const sessions = new Map(); // sessionId → RelaySession

/**
 * Counts sessions that are still pending or receiving data. Completed sessions
 * that are only being retained for status polling are not counted.
 *
 * @returns {number}
 */
function countActiveSessions() {
  let active = 0;
  for (const session of sessions.values()) {
    if (session.status === "pending" || session.status === "receiving") {
      active++;
    }
  }
  return active;
}

// ==================== S3 CLIENT ====================

/**
 * Builds an S3 client for one session from client-supplied configuration.
 *
 * @param {object} cfg
 * @param {string} cfg.endpoint - S3 endpoint URL.
 * @param {string} [cfg.region] - Defaults to "us-east-1".
 * @param {string} cfg.accessKeyId
 * @param {string} cfg.secretAccessKey
 * @param {boolean} [cfg.rejectUnauthorized] - Pass `true` to verify the
 *   endpoint's TLS certificate.
 * @returns {S3Client}
 */
function makeS3(cfg) {
  const isHttps = (cfg.endpoint || "").startsWith("https");
  // NOTE(review): TLS certificate verification is OFF unless the caller opts in
  // with `rejectUnauthorized: true`. The insecure default is preserved for
  // backward compatibility (presumably for self-signed S3-compatible
  // endpoints); consider making verification the default.
  const agent = isHttps
    ? new https.Agent({
        keepAlive: true,
        maxSockets: 10,
        rejectUnauthorized: cfg.rejectUnauthorized === true,
      })
    : new http.Agent({ keepAlive: true, maxSockets: 10 });

  return new S3Client({
    endpoint: cfg.endpoint,
    region: cfg.region || "us-east-1",
    credentials: {
      accessKeyId: cfg.accessKeyId,
      secretAccessKey: cfg.secretAccessKey,
    },
    forcePathStyle: true,
    requestHandler: new NodeHttpHandler({
      connectionTimeout: 15000,
      socketTimeout: 600000,
      ...(isHttps ? { httpsAgent: agent } : { httpAgent: agent }),
    }),
  });
}

// ==================== SESSION CLASS ====================

/**
 * State for one relayed file: reassembles chunks in index order and streams
 * them to S3 as multipart-upload parts.
 *
 * Status values: "pending" → "receiving" → "completed", or "aborted" /
 * "timeout" once the session has been torn down.
 */
class RelaySession {
  /**
   * @param {object} opts
   * @param {string} opts.sessionId
   * @param {string} opts.filename
   * @param {number} opts.size - Total file size in bytes.
   * @param {string} opts.key - S3 object key.
   * @param {string} opts.bucket - S3 bucket.
   * @param {object} opts.s3Cfg - Passed to makeS3().
   */
  constructor({ sessionId, filename, size, key, bucket, s3Cfg }) {
    this.sessionId = sessionId;
    this.filename = filename;
    this.size = size;
    this.key = key;
    this.bucket = bucket;
    this.s3 = makeS3(s3Cfg);

    this.status = "pending";
    this.receivedBytes = 0;
    this.expectedChunks = Math.ceil(size / CHUNK_SIZE);

    // Indices of every chunk accepted so far (used for de-duplication).
    this.receivedChunks = new Set();
    // Payloads that arrived ahead of a missing earlier chunk: index → Buffer.
    this.pendingChunks = new Map();
    // Index of the next chunk to append to the current part.
    this.nextChunkIndex = 0;

    this.uploadId = null;
    this.parts = [];
    // In-order payloads waiting to be uploaded as the next part.
    this.partPieces = [];
    this.partBytes = 0;
    this.partNumber = 1;

    this.createdAt = Date.now();
    this.clientAddr = null;
    this.clientPort = null;

    this._initPromise = null;
    this._completePromise = null;
    // Tail of the serialized part-upload queue.
    this._flushChain = Promise.resolve();

    // Expire idle sessions, aborting the S3 upload so parts are not orphaned.
    this._timeout = setTimeout(() => {
      this.abort("timeout").catch((err) => {
        console.error(`[${this.sessionId.slice(0, 8)}] Timeout cleanup failed:`, err.message);
      });
    }, SESSION_TTL);
  }

  /**
   * Starts the S3 multipart upload. Concurrent callers share one request; a
   * failed attempt may be retried by calling again.
   *
   * @returns {Promise<void>}
   */
  initMultipart() {
    if (!this._initPromise) {
      this._initPromise = this._createUpload().catch((err) => {
        this._initPromise = null;
        throw err;
      });
    }
    return this._initPromise;
  }

  async _createUpload() {
    const cmd = new CreateMultipartUploadCommand({ Bucket: this.bucket, Key: this.key });
    const r = await this.s3.send(cmd);
    this.uploadId = r.UploadId;

    if (this.status !== "pending") {
      // The session was torn down while the request was in flight.
      await this._abortUpload();
      throw new Error(`Session ${this.status} during multipart initiation`);
    }

    this.status = "receiving";
    console.log(`[${this.sessionId.slice(0, 8)}] Multipart initiated: ${this.key} (${this.uploadId})`);
  }

  /**
   * @returns {boolean} True once every expected chunk index has been accepted.
   */
  isFullyReceived() {
    return this.receivedChunks.size === this.expectedChunks;
  }

  /**
   * Accepts one chunk. Duplicates are ignored. Resolves after any part upload
   * triggered by this chunk has finished.
   *
   * @param {number} chunkIndex - Zero-based chunk index.
   * @param {Buffer} data - Chunk payload.
   * @returns {Promise<void>}
   * @throws {RangeError} If chunkIndex is outside 0..expectedChunks-1.
   * @throws {TypeError} If data is not a non-empty Buffer.
   * @throws {Error} If the session is not in the "receiving" state, or the
   *   part upload fails.
   */
  async receiveChunk(chunkIndex, data) {
    if (!Number.isInteger(chunkIndex) || chunkIndex < 0 || chunkIndex >= this.expectedChunks) {
      throw new RangeError(`Chunk index ${chunkIndex} out of range (expected 0..${this.expectedChunks - 1})`);
    }
    if (this.receivedChunks.has(chunkIndex)) return; // duplicate
    if (this.status !== "receiving") {
      throw new Error(`Session is ${this.status}; not accepting chunks`);
    }
    if (!Buffer.isBuffer(data) || data.length === 0) {
      throw new TypeError("Chunk payload must be a non-empty Buffer");
    }

    // All bookkeeping below is synchronous, so concurrent handlers cannot
    // interleave inside it.
    this.receivedChunks.add(chunkIndex);
    this.receivedBytes += data.length;
    this.pendingChunks.set(chunkIndex, data);

    // Move every chunk that is now contiguous with the data already queued
    // into the current part, preserving file order.
    while (this.pendingChunks.has(this.nextChunkIndex)) {
      const piece = this.pendingChunks.get(this.nextChunkIndex);
      this.pendingChunks.delete(this.nextChunkIndex);
      this.partPieces.push(piece);
      this.partBytes += piece.length;
      this.nextChunkIndex++;
    }

    if (this.partBytes >= PART_SIZE || this.isFullyReceived()) {
      await this._flushPart(false);
    }
  }

  /**
   * Queues an upload of the buffered data as the next part. Uploads run one at
   * a time so part numbers stay unique and ordered.
   *
   * @param {boolean} [force=true] - When false, the upload is skipped if, by
   *   the time it runs, the buffer is below PART_SIZE and chunks are still
   *   outstanding (avoids an undersized non-final part).
   * @returns {Promise<void>}
   */
  _flushPart(force = true) {
    const run = () => this._uploadBufferedPart(force);
    // Run regardless of whether the previous upload succeeded.
    this._flushChain = this._flushChain.then(run, run);
    return this._flushChain;
  }

  async _uploadBufferedPart(force) {
    if (this.partBytes === 0) return;
    if (!force && this.partBytes < PART_SIZE && !this.isFullyReceived()) return;

    const body = this.partPieces.length === 1
      ? this.partPieces[0]
      : Buffer.concat(this.partPieces, this.partBytes);
    const partNumber = this.partNumber;
    this.partPieces = [];
    this.partBytes = 0;

    try {
      const cmd = new UploadPartCommand({
        Bucket: this.bucket,
        Key: this.key,
        UploadId: this.uploadId,
        PartNumber: partNumber,
        Body: body,
        ContentLength: body.length,
      });
      const r = await this.s3.send(cmd);
      this.parts.push({ PartNumber: partNumber, ETag: r.ETag });
      this.partNumber = partNumber + 1;
      console.log(`[${this.sessionId.slice(0, 8)}] Part ${partNumber} uploaded (${body.length} bytes)`);
    } catch (err) {
      // Put the data back in front of anything queued meanwhile so the next
      // flush retries it under the same part number instead of losing it.
      this.partPieces.unshift(body);
      this.partBytes += body.length;
      throw err;
    }
  }

  /**
   * Uploads any remaining buffered data and completes the multipart upload.
   * Concurrent and repeated calls share one completion; a failed attempt may
   * be retried.
   *
   * @returns {Promise<void>}
   * @throws {Error} If the upload was never initiated, the session is not
   *   receiving, out-of-order chunks are still waiting on a missing chunk, or
   *   an S3 request fails.
   */
  complete() {
    if (!this._completePromise) {
      this._completePromise = this._finishUpload().catch((err) => {
        this._completePromise = null;
        throw err;
      });
    }
    return this._completePromise;
  }

  async _finishUpload() {
    if (!this.uploadId) {
      throw new Error("Multipart upload has not been initiated");
    }
    if (this.status !== "receiving") {
      throw new Error(`Session is ${this.status}; cannot complete`);
    }
    if (this.pendingChunks.size > 0) {
      // Completing now would silently drop the out-of-order data.
      throw new Error(`Cannot complete: chunk ${this.nextChunkIndex} has not been received`);
    }

    await this._flushPart(true); // flush remainder

    const cmd = new CompleteMultipartUploadCommand({
      Bucket: this.bucket,
      Key: this.key,
      UploadId: this.uploadId,
      MultipartUpload: { Parts: this.parts },
    });
    await this.s3.send(cmd);

    this.status = "completed";
    // Keep the session queryable for a while, then drop it so finished
    // sessions do not accumulate in the map.
    clearTimeout(this._timeout);
    this._timeout = setTimeout(() => sessions.delete(this.sessionId), COMPLETED_TTL);
    console.log(`[${this.sessionId.slice(0, 8)}] ✅ Upload complete: ${this.key}`);
  }

  /**
   * Tears the session down and aborts the S3 multipart upload if one is open.
   * S3 abort failures are logged, not thrown.
   *
   * @param {string} [reason="aborted"] - Final status recorded on the session.
   * @returns {Promise<void>}
   */
  async abort(reason = "aborted") {
    const hasOpenUpload = Boolean(this.uploadId) && this.status !== "completed";
    this._cleanup(reason);
    if (!hasOpenUpload) return;

    // Let an in-flight part upload settle first; its error (if any) has
    // already been reported to whoever requested that flush.
    await this._flushChain.catch(() => {});
    await this._abortUpload();
  }

  async _abortUpload() {
    try {
      await this.s3.send(new AbortMultipartUploadCommand({
        Bucket: this.bucket,
        Key: this.key,
        UploadId: this.uploadId,
      }));
    } catch (err) {
      console.warn(`[${this.sessionId.slice(0, 8)}] Abort of multipart upload failed:`, err.message);
    }
  }

  /**
   * Removes the session from the registry, records the final status and
   * releases buffered data.
   *
   * @param {string} reason - Final status, e.g. "aborted" or "timeout".
   */
  _cleanup(reason) {
    clearTimeout(this._timeout);
    this.status = reason;
    sessions.delete(this.sessionId);
    this.pendingChunks.clear();
    this.partPieces = [];
    this.partBytes = 0;
    console.log(`[${this.sessionId.slice(0, 8)}] Session ${reason}`);
  }

  /**
   * @returns {object} JSON-serializable progress snapshot.
   */
  progress() {
    return {
      sessionId: this.sessionId,
      filename: this.filename,
      size: this.size,
      receivedBytes: this.receivedBytes,
      expectedChunks: this.expectedChunks,
      receivedChunks: this.receivedChunks.size,
      percent: this.size > 0 ? Math.round((this.receivedBytes / this.size) * 100) : 0,
      status: this.status,
      parts: this.parts.length,
    };
  }
}

/**
 * Shared chunk pipeline for the UDP and HTTP paths: initiates the multipart
 * upload on first use, stores the chunk, and completes the upload once every
 * chunk has been received.
 *
 * @param {RelaySession} session
 * @param {number} chunkIndex
 * @param {Buffer} data
 * @returns {Promise<void>}
 */
async function ingestChunk(session, chunkIndex, data) {
  if (session.status === "pending") {
    await session.initMultipart();
  }
  await session.receiveChunk(chunkIndex, data);

  // Auto-complete when all chunks received
  if (session.isFullyReceived() && session.status === "receiving") {
    await session.complete();
  }
}

// ==================== UDP SERVER ====================

const udpServer = dgram.createSocket("udp4");

/**
 * Sends an ACK/NACK datagram.
 * Format: [0..15] sessionId  [16..19] chunkIndex (uint32 BE)  [20] status
 *
 * @param {Buffer} sessionIdBuf - 16-byte binary session id.
 * @param {number} chunkIndex
 * @param {number} status - UDP_STATUS_OK or UDP_STATUS_ERROR.
 * @param {{address: string, port: number}} rinfo - Destination.
 */
function sendUdpReply(sessionIdBuf, chunkIndex, status, rinfo) {
  const reply = Buffer.alloc(UDP_HEADER_BYTES + 1);
  sessionIdBuf.copy(reply, 0);
  reply.writeUInt32BE(chunkIndex, UDP_SESSION_ID_BYTES);
  reply.writeUInt8(status, UDP_HEADER_BYTES);
  udpServer.send(reply, rinfo.port, rinfo.address, (err) => {
    if (err) {
      console.warn(`[UDP] Failed to send reply to ${rinfo.address}:${rinfo.port}:`, err.message);
    }
  });
}

/*
 * UDP Packet Format (binary):
 *   [0..15]  sessionId (16 bytes, hex → binary)
 *   [16..19] chunkIndex (uint32 BE)
 *   [20..]   data payload
 */
udpServer.on("message", async (msg, rinfo) => {
  if (msg.length <= UDP_HEADER_BYTES) return; // too short: header without payload

  const sessionIdBuf = msg.subarray(0, UDP_SESSION_ID_BYTES);
  const sessionId = sessionIdBuf.toString("hex");
  const chunkIndex = msg.readUInt32BE(UDP_SESSION_ID_BYTES);
  const data = msg.subarray(UDP_HEADER_BYTES);

  const session = sessions.get(sessionId);
  if (!session) {
    console.warn(`[UDP] Unknown session: ${sessionId.slice(0, 8)}`);
    return;
  }

  // Remember the first address seen for this session.
  if (!session.clientAddr) {
    session.clientAddr = rinfo.address;
    session.clientPort = rinfo.port;
  }

  try {
    await ingestChunk(session, chunkIndex, data);
    sendUdpReply(sessionIdBuf, chunkIndex, UDP_STATUS_OK, rinfo);
  } catch (err) {
    console.error(`[UDP] Error for session ${sessionId.slice(0, 8)}:`, err.message);
    sendUdpReply(sessionIdBuf, chunkIndex, UDP_STATUS_ERROR, rinfo);
  }
});

udpServer.on("error", (err) => console.error("[UDP] Server error:", err));

udpServer.bind(UDP_PORT, "0.0.0.0", () => {
  console.log(`🌪️  UDP relay listening on port ${UDP_PORT}`);
});

// ==================== HTTP CONTROL API ====================

const app = express();

// CORS — browsers (Chrome extension) send chunks via HTTP fallback
app.use((req, res, next) => {
  res.setHeader("Access-Control-Allow-Origin", "*");
  res.setHeader("Access-Control-Allow-Methods", "GET,POST,PUT,DELETE,OPTIONS");
  res.setHeader("Access-Control-Allow-Headers", "Content-Type,Authorization");
  if (req.method === "OPTIONS") return res.sendStatus(204);
  next();
});

app.use(express.json());
app.use(express.raw({ type: "application/octet-stream", limit: "100mb" }));

// Health check
app.get("/health", (req, res) => {
  res.json({
    status: "ok",
    udpPort: UDP_PORT,
    activeSessions: countActiveSessions(),
    maxSessions: MAX_SESSIONS,
    uptime: process.uptime(),
  });
});

// Create session
app.post("/session", (req, res) => {
  const { sessionId, filename, size, key, bucket, s3Config } = req.body || {};

  if (!sessionId || !filename || !size || !key || !bucket || !s3Config) {
    return res.status(400).json({ error: "Missing required fields: sessionId, filename, size, key, bucket, s3Config" });
  }
  if (!s3Config.endpoint || !s3Config.accessKeyId || !s3Config.secretAccessKey) {
    return res.status(400).json({ error: "s3Config must include endpoint, accessKeyId, secretAccessKey" });
  }

  // Numeric strings are still accepted for size, as before.
  const sizeBytes = Number(size);
  if (
    typeof sessionId !== "string" ||
    typeof key !== "string" ||
    typeof bucket !== "string" ||
    !Number.isSafeInteger(sizeBytes) ||
    sizeBytes <= 0
  ) {
    return res.status(400).json({ error: "sessionId, key and bucket must be strings; size must be a positive integer" });
  }

  if (countActiveSessions() >= MAX_SESSIONS) {
    return res.status(503).json({ error: "Max concurrent sessions reached" });
  }
  if (sessions.has(sessionId)) {
    return res.status(409).json({ error: "Session already exists" });
  }

  const session = new RelaySession({ sessionId, filename, size: sizeBytes, key, bucket, s3Cfg: s3Config });
  sessions.set(sessionId, session);

  console.log(`[HTTP] Session created: ${sessionId.slice(0, 8)} — ${filename} (${sizeBytes} bytes)`);
  res.json({
    success: true,
    sessionId,
    udpPort: UDP_PORT,
    chunkSize: CHUNK_SIZE,
    expectedChunks: session.expectedChunks,
  });
});

// HTTP chunk fallback — Chrome extensions can't send raw UDP, so they POST chunks over HTTP
app.post("/session/:id/chunk/:index", async (req, res) => {
  const session = sessions.get(req.params.id);
  if (!session) return res.status(404).json({ error: "Session not found" });

  const chunkIndex = /^\d+$/.test(req.params.index) ? Number.parseInt(req.params.index, 10) : NaN;
  if (!Number.isSafeInteger(chunkIndex) || chunkIndex >= session.expectedChunks) {
    return res.status(400).json({ error: "Invalid chunk index" });
  }
  // express.raw only yields a Buffer for application/octet-stream bodies.
  if (!Buffer.isBuffer(req.body) || req.body.length === 0) {
    return res.status(400).json({ error: "Chunk body must be non-empty application/octet-stream data" });
  }

  try {
    await ingestChunk(session, chunkIndex, req.body);
    res.json({
      success: true,
      chunkIndex,
      receivedChunks: session.receivedChunks.size,
      expectedChunks: session.expectedChunks,
      percent: session.progress().percent,
    });
  } catch (err) {
    console.error(`[HTTP] Chunk ${chunkIndex} error for ${req.params.id.slice(0, 8)}:`, err.message);
    res.status(500).json({ error: err.message });
  }
});

// Get session progress
app.get("/session/:id", (req, res) => {
  const session = sessions.get(req.params.id);
  if (!session) return res.status(404).json({ error: "Session not found" });
  res.json(session.progress());
});

// Complete session manually
app.post("/session/:id/complete", async (req, res) => {
  const session = sessions.get(req.params.id);
  if (!session) return res.status(404).json({ error: "Session not found" });
  try {
    await session.complete();
    res.json({ success: true, status: "completed" });
  } catch (err) {
    res.status(500).json({ error: err.message });
  }
});

// Abort session
app.delete("/session/:id", async (req, res) => {
  const session = sessions.get(req.params.id);
  if (!session) return res.status(404).json({ error: "Session not found" });
  try {
    await session.abort();
    res.json({ success: true, status: "aborted" });
  } catch (err) {
    res.status(500).json({ error: err.message });
  }
});

// List all sessions
app.get("/sessions", (req, res) => {
  res.json({ sessions: Array.from(sessions.values()).map((s) => s.progress()) });
});

app.listen(TCP_PORT, "0.0.0.0", () => {
  console.log(`🌪️  Dragon Wind Relay control API on port ${TCP_PORT}`);
  console.log(`   UDP data port: ${UDP_PORT}`);
  console.log(`   Max sessions:  ${MAX_SESSIONS}`);
});

process.on("unhandledRejection", (r) => console.error("Unhandled:", r));
// NOTE(review): the process keeps running after an uncaught exception; this
// matches the existing behavior but leaves state potentially inconsistent.
process.on("uncaughtException", (e) => console.error("Uncaught:", e));