DragonWind/udp-relay/server.js
Zac Gaetano 641701edf8 feat: Dragon Wind v1.0 — dual-mode broadcast uploader
- Full VPM Uploader feature set (auth, users, folders, AMPP monitor)
- HTTP upload via presigned S3 URLs with XHR progress tracking
- UDP upload mode with relay server (WebRTC DataChannel + HTTP fallback)
- S3 Admin settings with live Test Connection (upload+delete verify)
- UDP Relay Admin settings with health check
- Standalone UDP relay server (Node.js + Docker) with multipart S3 assembly
- Chrome Extension (Manifest v3): popup, background, content script
- Dynamic S3 client — reconfigures on save without restart
- Dark/light theme, full AMPP job monitor
- docker-compose.yml with dragon-wind + udp-relay services

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-05 20:05:34 -04:00

308 lines
10 KiB
JavaScript

"use strict";
/**
* Dragon Wind UDP Relay Server
*
* Acts as a high-speed relay between the Chrome extension (or any UDP client)
* and the S3 destination. The client sends the file as indexed chunks in raw
* UDP datagrams; the relay answers every chunk with a UDP ACK/NACK datagram
* and exposes session management over an HTTP (TCP) control API.
*
* Architecture:
* Client --[UDP data]--> Relay --[S3 multipart upload]--> S3
* Client <--[UDP ACK/NACK]--- Relay
* Client <--[HTTP control API over TCP]--> Relay
*
* Ports:
* TCP 3001 — Control / HTTP API (health, session management)
* UDP 5000 — Data transfer
*
* Environment:
* PORT TCP control port (default 3001)
* UDP_PORT UDP data port (default 5000)
* MAX_SESSIONS Max concurrent sessions (default 50)
*/
const express = require("express");
const dgram = require("dgram");
const crypto = require("crypto");
const { S3Client, CreateMultipartUploadCommand, UploadPartCommand,
CompleteMultipartUploadCommand, AbortMultipartUploadCommand } = require("@aws-sdk/client-s3");
const { NodeHttpHandler } = require("@smithy/node-http-handler");
const https = require("https");
const http = require("http");
/**
 * Reads a non-negative integer setting from the environment.
 *
 * Falls back to `fallback` when the variable is unset, empty, or does not
 * parse to a non-negative integer. Without this guard a typo such as
 * MAX_SESSIONS=abc becomes NaN, and `sessions.size >= NaN` is always false,
 * which silently removes the session limit.
 *
 * @param {string} name - Environment variable name.
 * @param {number} fallback - Value used when the variable is missing/invalid.
 * @returns {number} The parsed integer or the fallback.
 */
function readIntEnv(name, fallback) {
  const raw = process.env[name];
  if (raw === undefined || raw === "") return fallback;
  const parsed = Number.parseInt(raw, 10);
  if (!Number.isInteger(parsed) || parsed < 0) {
    console.warn(`[config] Ignoring invalid ${name}=${JSON.stringify(raw)}; using ${fallback}`);
    return fallback;
  }
  return parsed;
}

const TCP_PORT = readIntEnv("PORT", 3001);
const UDP_PORT = readIntEnv("UDP_PORT", 5000);
const MAX_SESSIONS = readIntEnv("MAX_SESSIONS", 50);
// Payload bytes the client is told to put in each UDP chunk.
// NOTE(review): 64 KiB of payload plus the 20-byte packet header is larger than
// the 65,507-byte maximum UDP/IPv4 datagram payload — confirm the chunk size
// clients actually send before relying on this value.
const CHUNK_SIZE = 64 * 1024; // 64KB UDP chunks
const PART_SIZE = 5 * 1024 * 1024; // 5MB S3 multipart parts
const SESSION_TTL = 2 * 60 * 60 * 1000; // 2 hours
// ==================== STATE ====================
const sessions = new Map(); // sessionId → RelaySession
// ==================== S3 CLIENT ====================
/**
 * Builds an S3 client for one relay session from client-supplied settings.
 *
 * The client always uses path-style addressing and a keep-alive agent capped
 * at 10 sockets, with a 15 s connect timeout and a 10 min socket timeout.
 *
 * @param {object} cfg
 * @param {string} cfg.endpoint - S3 endpoint URL; an "https" prefix selects TLS.
 * @param {string} [cfg.region] - Defaults to "us-east-1".
 * @param {string} cfg.accessKeyId
 * @param {string} cfg.secretAccessKey
 * @param {boolean} [cfg.rejectUnauthorized] - Pass `true` to verify the
 *   endpoint's TLS certificate. Any other value keeps verification off.
 * @returns {S3Client}
 */
function makeS3(cfg) {
  const isHttps = (cfg.endpoint || "").startsWith("https");
  // SECURITY: certificate verification is OFF unless the caller opts in. The
  // lax default is kept only for backward compatibility (it allows endpoints
  // with self-signed certificates) and exposes the S3 credentials to
  // man-in-the-middle attacks; callers should send rejectUnauthorized: true.
  const rejectUnauthorized = cfg.rejectUnauthorized === true;
  const agentOptions = { keepAlive: true, maxSockets: 10 };
  const agent = isHttps
    ? new https.Agent({ ...agentOptions, rejectUnauthorized })
    : new http.Agent(agentOptions);
  return new S3Client({
    endpoint: cfg.endpoint,
    region: cfg.region || "us-east-1",
    credentials: { accessKeyId: cfg.accessKeyId, secretAccessKey: cfg.secretAccessKey },
    forcePathStyle: true,
    requestHandler: new NodeHttpHandler({
      connectionTimeout: 15000,
      socketTimeout: 600000,
      ...(isHttps ? { httpsAgent: agent } : { httpAgent: agent }),
    }),
  });
}
// ==================== SESSION CLASS ====================
// How long a completed session stays visible to the control API (so clients can
// poll the final status) before it is dropped from the session registry.
const COMPLETED_SESSION_RETENTION_MS = 5 * 60 * 1000;

/**
 * One relayed upload: collects indexed chunks and streams them to S3 as a
 * multipart upload.
 *
 * Status values: "pending" → "receiving" → "completing" → "completed", or one
 * of "failed" (a part upload was rejected), "aborted", "timeout".
 *
 * Chunks may arrive in any order and several receiveChunk() calls may be in
 * flight at once (each datagram handler suspends at its own awaits), so:
 *  - payloads are appended to the part buffer strictly in chunkIndex order;
 *  - a part number is reserved synchronously before its upload starts;
 *  - initMultipart() and complete() share a single in-flight promise.
 */
class RelaySession {
  /**
   * @param {object} opts
   * @param {string} opts.sessionId
   * @param {string} opts.filename
   * @param {number} opts.size - Total object size in bytes.
   * @param {string} opts.key - Destination object key.
   * @param {string} opts.bucket - Destination bucket.
   * @param {object} opts.s3Cfg - Settings handed to makeS3().
   */
  constructor({ sessionId, filename, size, key, bucket, s3Cfg }) {
    this.sessionId = sessionId;
    this.filename = filename;
    this.size = size;
    this.key = key;
    this.bucket = bucket;
    this.s3 = makeS3(s3Cfg);
    this.status = "pending";
    this.receivedBytes = 0;
    // chunkIndex → Buffer while the chunk waits for its predecessors, null once
    // its payload has been moved into the part buffer. Keys are kept so that
    // duplicate detection and `chunks.size` keep counting every received chunk.
    this.chunks = new Map();
    this.expectedChunks = Math.ceil(size / CHUNK_SIZE);
    this.uploadId = null;
    this.parts = [];
    this.partBuffer = Buffer.alloc(0);
    this.partNumber = 1;
    this.createdAt = Date.now();
    this.clientAddr = null;
    this.clientPort = null;
    this._nextChunk = 0; // lowest chunk index not yet moved into partBuffer
    this._initPromise = null; // in-flight CreateMultipartUpload, if any
    this._completePromise = null; // in-flight or successful completion
    this._inflight = new Set(); // part uploads currently running
    this._failure = null; // first part-upload error; poisons the session
    // Expiry also aborts the multipart upload so no orphaned parts stay in S3.
    this._timeout = setTimeout(() => {
      this.abort("timeout").catch((err) => {
        console.error(`[${String(this.sessionId).slice(0, 8)}] Timeout cleanup failed:`, err.message);
      });
    }, SESSION_TTL);
  }

  /**
   * Starts the S3 multipart upload. Safe to call repeatedly and concurrently:
   * only one CreateMultipartUpload request is ever in flight, and a failed
   * attempt can be retried by calling again.
   *
   * @returns {Promise<void>}
   * @throws {Error} If the session is no longer pending, or S3 rejects the request.
   */
  async initMultipart() {
    if (this.uploadId) return;
    if (this.status !== "pending") {
      throw new Error(`Session is ${this.status}; cannot initiate multipart upload`);
    }
    if (!this._initPromise) {
      this._initPromise = this._createMultipart().finally(() => {
        this._initPromise = null;
      });
    }
    await this._initPromise;
  }

  // Issues CreateMultipartUpload and moves the session to "receiving".
  async _createMultipart() {
    const cmd = new CreateMultipartUploadCommand({ Bucket: this.bucket, Key: this.key });
    const r = await this.s3.send(cmd);
    if (this.status !== "pending") {
      // The session was aborted/expired while the request was in flight; do not
      // leave the freshly created upload behind.
      await this._abortRemote(r.UploadId);
      throw new Error(`Session ${this.status} before the multipart upload was initiated`);
    }
    this.uploadId = r.UploadId;
    this.status = "receiving";
    console.log(`[${this.sessionId.slice(0, 8)}] Multipart initiated: ${this.key} (${this.uploadId})`);
  }

  /**
   * Records one received chunk. Resolves once the chunk is stored and any part
   * upload it triggered has finished; duplicates resolve immediately.
   *
   * @param {number} chunkIndex - Zero-based position of the chunk in the file.
   * @param {Buffer} data - Chunk payload.
   * @returns {Promise<void>}
   * @throws {RangeError} If chunkIndex is outside 0..expectedChunks-1.
   * @throws {Error} If the session has failed or is closed, or a part upload fails.
   */
  async receiveChunk(chunkIndex, data) {
    if (this._failure) throw this._failure;
    if (!Number.isInteger(chunkIndex) || chunkIndex < 0 || chunkIndex >= this.expectedChunks) {
      throw new RangeError(`Chunk index ${chunkIndex} is outside 0..${this.expectedChunks - 1}`);
    }
    if (this.chunks.has(chunkIndex)) return; // duplicate
    if (!this.uploadId) await this.initMultipart();
    // Re-check after the await: a retransmission may have been stored meanwhile.
    if (this.chunks.has(chunkIndex)) return;
    if (this._failure) throw this._failure;
    if (this.status !== "receiving") {
      throw new Error(`Session is ${this.status}; chunk ${chunkIndex} rejected`);
    }
    this.chunks.set(chunkIndex, data);
    this.receivedBytes += data.length;
    this._drainContiguousChunks();
    // Flush part when buffer hits PART_SIZE or we have all chunks
    if (this.partBuffer.length >= PART_SIZE || this.chunks.size === this.expectedChunks) {
      await this._flushPart();
    }
  }

  // Moves every chunk that directly follows the already-buffered data into the
  // part buffer, so the uploaded bytes follow chunk order, not arrival order.
  _drainContiguousChunks() {
    const ready = [];
    while (this.chunks.has(this._nextChunk)) {
      ready.push(this.chunks.get(this._nextChunk));
      this.chunks.set(this._nextChunk, null); // keep the key, release the payload
      this._nextChunk++;
    }
    if (ready.length > 0) {
      this.partBuffer = Buffer.concat([this.partBuffer, ...ready]);
    }
  }

  // Uploads the buffered bytes as the next part. The buffer is swapped out and
  // the part number reserved before the first await, so overlapping calls can
  // never send the same bytes or reuse a part number.
  async _flushPart() {
    if (this.partBuffer.length === 0) return;
    const body = this.partBuffer;
    this.partBuffer = Buffer.alloc(0);
    const partNumber = this.partNumber++;
    const upload = this._uploadPart(partNumber, body);
    this._inflight.add(upload);
    try {
      await upload;
    } finally {
      this._inflight.delete(upload);
    }
  }

  // Sends one UploadPart request and records its ETag. A rejected upload marks
  // the whole session failed because the part's bytes are no longer buffered.
  async _uploadPart(partNumber, body) {
    const cmd = new UploadPartCommand({
      Bucket: this.bucket, Key: this.key,
      UploadId: this.uploadId,
      PartNumber: partNumber,
      Body: body,
      ContentLength: body.length,
    });
    try {
      const r = await this.s3.send(cmd);
      this.parts.push({ PartNumber: partNumber, ETag: r.ETag });
      console.log(`[${this.sessionId.slice(0, 8)}] Part ${partNumber} uploaded (${body.length} bytes)`);
    } catch (err) {
      this._fail(err);
      throw err;
    }
  }

  /**
   * Finishes the multipart upload. Concurrent and repeated calls share one
   * completion; after a failed attempt the next call retries.
   *
   * @returns {Promise<void>}
   * @throws {Error} If chunks are still missing, the session is not receiving,
   *   a part upload failed, or S3 rejects the completion.
   */
  complete() {
    if (!this._completePromise) {
      this._completePromise = this._finalize().catch((err) => {
        this._completePromise = null; // allow a retry after a failed attempt
        throw err;
      });
    }
    return this._completePromise;
  }

  // Waits for outstanding parts, then sends CompleteMultipartUpload.
  async _finalize() {
    if (this._failure) throw this._failure;
    if (this.status !== "receiving") {
      throw new Error(`Cannot complete: session is ${this.status}`);
    }
    if (this.chunks.size !== this.expectedChunks) {
      // Completing now would publish a truncated object.
      throw new Error(`Cannot complete: received ${this.chunks.size} of ${this.expectedChunks} chunks`);
    }
    this.status = "completing";
    try {
      await this._flushPart(); // flush remainder
      await Promise.allSettled([...this._inflight]);
      if (this._failure) throw this._failure;
      // Parts may finish out of order; send them sorted by part number.
      const orderedParts = [...this.parts].sort((a, b) => a.PartNumber - b.PartNumber);
      const cmd = new CompleteMultipartUploadCommand({
        Bucket: this.bucket, Key: this.key,
        UploadId: this.uploadId,
        MultipartUpload: { Parts: orderedParts },
      });
      await this.s3.send(cmd);
    } catch (err) {
      // _fail() has already switched to "failed" if a part upload broke.
      if (this.status === "completing") this.status = "receiving";
      throw err;
    }
    this.status = "completed";
    clearTimeout(this._timeout);
    // Keep the finished session queryable for a while, then drop it so that
    // completed sessions do not accumulate against MAX_SESSIONS.
    this._timeout = setTimeout(() => {
      if (sessions.get(this.sessionId) === this) sessions.delete(this.sessionId);
    }, COMPLETED_SESSION_RETENTION_MS);
    this._timeout.unref();
    console.log(`[${this.sessionId.slice(0, 8)}] ✅ Upload complete: ${this.key}`);
  }

  /**
   * Stops the session, removes it from the registry and aborts the S3
   * multipart upload. Abort is best-effort: S3 errors are logged, not thrown.
   *
   * @param {string} [reason="aborted"] - Final status to record.
   * @returns {Promise<void>}
   */
  async abort(reason = "aborted") {
    const alreadyCompleted = this.status === "completed";
    const uploadId = this.uploadId;
    this._cleanup(reason); // stop accepting chunks before talking to S3
    if (!uploadId || alreadyCompleted) return;
    // Let running part uploads settle first so none lands after the abort.
    await Promise.allSettled([...this._inflight]);
    await this._abortRemote(uploadId);
  }

  // Best-effort AbortMultipartUpload; failures are logged and never thrown.
  async _abortRemote(uploadId) {
    try {
      await this.s3.send(new AbortMultipartUploadCommand({
        Bucket: this.bucket, Key: this.key, UploadId: uploadId,
      }));
    } catch (err) {
      console.warn(`[${String(this.sessionId).slice(0, 8)}] Abort of multipart upload failed:`, err.message);
    }
  }

  // Remembers the first part-upload error and marks a live session as failed.
  _fail(err) {
    if (this._failure) return;
    this._failure = err;
    if (this.status === "receiving" || this.status === "completing") {
      this.status = "failed";
    }
  }

  // Cancels the timer, records the final status and unregisters the session.
  _cleanup(reason) {
    clearTimeout(this._timeout);
    this.status = reason;
    this.partBuffer = Buffer.alloc(0); // release buffered payload
    if (sessions.get(this.sessionId) === this) sessions.delete(this.sessionId);
    console.log(`[${this.sessionId.slice(0, 8)}] Session ${reason}`);
  }

  /**
   * @returns {object} Snapshot of the transfer state for the control API.
   */
  progress() {
    return {
      sessionId: this.sessionId,
      filename: this.filename,
      size: this.size,
      receivedBytes: this.receivedBytes,
      expectedChunks: this.expectedChunks,
      receivedChunks: this.chunks.size,
      percent: this.size > 0 ? Math.round((this.receivedBytes / this.size) * 100) : 0,
      status: this.status,
      parts: this.parts.length,
    };
  }
}
// ==================== UDP SERVER ====================
const udpServer = dgram.createSocket("udp4");
/*
 * UDP Packet Format (binary):
 * [0..15] sessionId (16 bytes, hex → binary)
 * [16..19] chunkIndex (uint32 BE)
 * [20..] data payload
 *
 * ACK/NACK Format (binary, 21 bytes, sent back to the packet's source):
 * [0..15] sessionId
 * [16..19] chunkIndex (uint32 BE)
 * [20] status (0 = ok, 1 = error)
 */
const UDP_HEADER_BYTES = 20;
const UDP_MIN_PACKET_BYTES = UDP_HEADER_BYTES + 1; // header plus at least one payload byte
const UDP_ACK_BYTES = 21;
const ACK_OK = 0;
const ACK_ERROR = 1;

// Datagram handlers are async, so several packets of one session are processed
// concurrently. These registries make sure the one-off steps (starting and
// completing the multipart upload) are triggered once instead of once per packet.
const initInFlight = new WeakMap(); // session → promise of the running initMultipart()
const completionInFlight = new WeakSet(); // sessions whose auto-complete is running

// Sends the 21-byte acknowledgement for one chunk back to the sender.
function sendAck(sessionIdBuf, chunkIndex, status, rinfo) {
  const ack = Buffer.alloc(UDP_ACK_BYTES);
  sessionIdBuf.copy(ack, 0);
  ack.writeUInt32BE(chunkIndex, 16);
  ack.writeUInt8(status, 20);
  udpServer.send(ack, rinfo.port, rinfo.address);
}

// Starts the multipart upload for a pending session exactly once; packets that
// arrive while the request is running wait on the same promise.
function ensureMultipart(session) {
  if (session.status !== "pending") return Promise.resolve();
  let running = initInFlight.get(session);
  if (!running) {
    running = Promise.resolve(session.initMultipart()).finally(() => {
      initInFlight.delete(session);
    });
    initInFlight.set(session, running);
  }
  return running;
}

udpServer.on("message", async (msg, rinfo) => {
  if (msg.length < UDP_MIN_PACKET_BYTES) return; // too short
  const sessionIdBuf = msg.subarray(0, 16);
  const sessionId = sessionIdBuf.toString("hex");
  const chunkIndex = msg.readUInt32BE(16);
  const data = msg.subarray(UDP_HEADER_BYTES);
  const session = sessions.get(sessionId);
  if (!session) {
    console.warn(`[UDP] Unknown session: ${sessionId.slice(0, 8)}`);
    return;
  }
  // Track client address for ACKs
  if (!session.clientAddr) {
    session.clientAddr = rinfo.address;
    session.clientPort = rinfo.port;
  }
  try {
    await ensureMultipart(session);
    await session.receiveChunk(chunkIndex, data);
    // Send ACK back to client
    sendAck(sessionIdBuf, chunkIndex, ACK_OK, rinfo);
    // Auto-complete when all chunks received
    const allReceived = session.chunks.size === session.expectedChunks;
    if (allReceived && session.status === "receiving" && !completionInFlight.has(session)) {
      completionInFlight.add(session);
      try {
        await session.complete();
      } finally {
        // Cleared even on failure so a retransmitted chunk can retry completion.
        completionInFlight.delete(session);
      }
    }
  } catch (err) {
    console.error(`[UDP] Error for session ${sessionId.slice(0, 8)}:`, err.message);
    // Send NACK
    sendAck(sessionIdBuf, chunkIndex, ACK_ERROR, rinfo);
  }
});
udpServer.on("error", (err) => console.error("[UDP] Server error:", err));
udpServer.bind(UDP_PORT, "0.0.0.0", () => {
  console.log(`🌪️ UDP relay listening on port ${UDP_PORT}`);
});
// ==================== HTTP CONTROL API ====================
const app = express();
app.use(express.json()); // parse JSON request bodies for the routes below

// GET /health — liveness probe reporting the relay's ports, load and uptime.
app.get("/health", (_req, res) => {
  const report = {
    status: "ok",
    udpPort: UDP_PORT,
    activeSessions: sessions.size,
    maxSessions: MAX_SESSIONS,
    uptime: process.uptime(),
  };
  res.json(report);
});
// Create session
// Body: { sessionId, filename, size, key, bucket, s3Config }
// Responds with the UDP port, chunk size and chunk count the client must use.
// The UDP listener looks sessions up by the hex encoding of the 16-byte id in
// each packet header, so any other id shape could never receive data.
const SESSION_ID_PATTERN = /^[0-9a-f]{32}$/;
app.post("/session", (req, res) => {
  const { sessionId, filename, size, key, bucket, s3Config } = req.body || {};
  if (!sessionId || !filename || !size || !key || !bucket || !s3Config) {
    return res.status(400).json({ error: "Missing required fields: sessionId, filename, size, key, bucket, s3Config" });
  }
  if (typeof sessionId !== "string" || !SESSION_ID_PATTERN.test(sessionId)) {
    return res.status(400).json({ error: "sessionId must be 32 lowercase hex characters (16 bytes)" });
  }
  if (![filename, key, bucket].every((value) => typeof value === "string")) {
    return res.status(400).json({ error: "filename, key and bucket must be strings" });
  }
  // Numeric strings are still accepted; anything else would yield a chunk
  // count that can never be reached.
  const totalBytes = Number(size);
  if (!Number.isSafeInteger(totalBytes) || totalBytes <= 0) {
    return res.status(400).json({ error: "size must be a positive integer number of bytes" });
  }
  if (!s3Config.endpoint || !s3Config.accessKeyId || !s3Config.secretAccessKey) {
    return res.status(400).json({ error: "s3Config must include endpoint, accessKeyId, secretAccessKey" });
  }
  if (sessions.size >= MAX_SESSIONS) {
    return res.status(503).json({ error: "Max concurrent sessions reached" });
  }
  if (sessions.has(sessionId)) {
    return res.status(409).json({ error: "Session already exists" });
  }
  const session = new RelaySession({ sessionId, filename, size: totalBytes, key, bucket, s3Cfg: s3Config });
  sessions.set(sessionId, session);
  console.log(`[HTTP] Session created: ${sessionId.slice(0, 8)} — ${filename} (${totalBytes} bytes)`);
  res.json({ success: true, sessionId, udpPort: UDP_PORT, chunkSize: CHUNK_SIZE, expectedChunks: session.expectedChunks });
});
// Get session progress
app.get("/session/:id", (req, res) => {
  const session = sessions.get(req.params.id);
  if (!session) return res.status(404).json({ error: "Session not found" });
  res.json(session.progress());
});
// Complete session manually
app.post("/session/:id/complete", async (req, res) => {
  const session = sessions.get(req.params.id);
  if (!session) return res.status(404).json({ error: "Session not found" });
  try {
    await session.complete();
    res.json({ success: true, status: "completed" });
  } catch (err) {
    res.status(500).json({ error: err.message });
  }
});
// Abort session
app.delete("/session/:id", async (req, res) => {
  const session = sessions.get(req.params.id);
  if (!session) return res.status(404).json({ error: "Session not found" });
  // Same error handling as the complete route: a rejection inside an async
  // handler would otherwise leave the request without a response.
  try {
    await session.abort();
    res.json({ success: true, status: "aborted" });
  } catch (err) {
    res.status(500).json({ error: err.message });
  }
});
// List all sessions
app.get("/sessions", (req, res) => {
  res.json({ sessions: Array.from(sessions.values()).map((s) => s.progress()) });
});
// Prints the startup banner once the control API is accepting connections.
const announceControlApi = () => {
  console.log(`🌪️ Dragon Wind Relay control API on port ${TCP_PORT}`);
  console.log(` UDP data port: ${UDP_PORT}`);
  console.log(` Max sessions: ${MAX_SESSIONS}`);
};
app.listen(TCP_PORT, "0.0.0.0", announceControlApi);

// Process-level safety nets: log stray rejections and exceptions and keep running.
process.on("unhandledRejection", (reason) => console.error("Unhandled:", reason));
process.on("uncaughtException", (error) => console.error("Uncaught:", error));