From ebc5b9a6cea532e3612870943f607ca16612dd07 Mon Sep 17 00:00:00 2001 From: Zac Gaetano Date: Mon, 6 Apr 2026 23:17:07 -0400 Subject: [PATCH] feat: initial Dragon Wind Desktop scaffold - Electron tray app (main, preload, renderer) - Parallel chunked HTTP transfer engine - Dragon Wind server client (auth + multipart API) - Upload queue UI with progress, speed, ETA - Folder browser with search - Settings (workers, chunk size) - Server API docs for desktop multipart endpoints --- .gitignore | 7 + README.md | 58 +++ docs/server-api.md | 145 ++++++ package.json | 45 ++ src/main/dw-client.js | 126 +++++ src/main/main.js | 263 +++++++++++ src/main/preload.js | 44 ++ src/main/transfer-engine.js | 325 +++++++++++++ src/renderer/index.html | 912 ++++++++++++++++++++++++++++++++++++ 9 files changed, 1925 insertions(+) create mode 100644 .gitignore create mode 100644 README.md create mode 100644 docs/server-api.md create mode 100644 package.json create mode 100644 src/main/dw-client.js create mode 100644 src/main/main.js create mode 100644 src/main/preload.js create mode 100644 src/main/transfer-engine.js create mode 100644 src/renderer/index.html diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..fa49211 --- /dev/null +++ b/.gitignore @@ -0,0 +1,7 @@ +node_modules/ +dist/ +out/ +.env +*.log +.DS_Store +Thumbs.db diff --git a/README.md b/README.md new file mode 100644 index 0000000..beea369 --- /dev/null +++ b/README.md @@ -0,0 +1,58 @@ +# Dragon Wind Desktop + +Native Electron desktop uploader for [Dragon Wind](https://forge.wilddragon.net/zgaetano/VPM-Uploader) / Grass Valley AMPP. + +High-speed parallel file transfers to S3 — same approach as MASV and Aspera. No browser, no extension, no UDP firewall headaches. + +## How it works + +Files are split into chunks and uploaded concurrently via presigned S3 PUT URLs. The Dragon Wind server orchestrates multipart uploads; the desktop app drives the data plane directly to S3. + +``` +User drops file + → Dragon Wind server: POST /api/desktop/multipart/init + ← uploadId + N presigned S3 PUT URLs + → S3: PUT each chunk in parallel (N workers) + → Dragon Wind server: POST /api/desktop/multipart/complete + ← done ✅ +``` + +## Requirements + +- Node.js 18+ +- A running Dragon Wind server (v0.x or later with desktop API endpoints) +- Electron 30+ + +## Dev + +```bash +npm install +npm run dev +``` + +## Build + +```bash +npm run build:mac # DMG for macOS (x64 + arm64) +npm run build:win # NSIS installer for Windows +npm run build:linux # AppImage for Linux +``` + +## Server-side additions needed + +Add these endpoints to the Dragon Wind `server.js`: + +- `POST /api/desktop/multipart/init` — creates S3 multipart upload, returns presigned part URLs +- `POST /api/desktop/multipart/complete` — completes the multipart upload +- `POST /api/desktop/multipart/abort` — aborts on cancel/error + +See `docs/server-api.md` for full spec. + +## Settings + +| Setting | Default | Description | +|---------|---------|-------------| +| Workers | 6 | Concurrent chunk uploads | +| Chunk size | 16 MB | Per-part size (min 5 MB, S3 limit) | + +Tune workers up on high-bandwidth links (50+ Mbps). Tune chunk size up for high-latency WAN to reduce round trips. diff --git a/docs/server-api.md b/docs/server-api.md new file mode 100644 index 0000000..f359095 --- /dev/null +++ b/docs/server-api.md @@ -0,0 +1,145 @@ +# Dragon Wind Desktop — Server API Additions + +These three endpoints need to be added to `server.js` in the VPM-Uploader repo to support the desktop client. + +## POST /api/desktop/multipart/init + +Auth: `x-auth-token` header required. + +Request: +```json +{ + "filename": "A001C001.mxf", + "size": 10737418240, + "prefix": "Jobs/2026-04-06", + "totalParts": 640 +} +``` + +Response: +```json +{ + "uploadId": "abc123...", + "key": "Jobs/2026-04-06/A001C001.mxf", + "bucket": "vpm-media", + "presignedParts": [ + "https://s3.example.com/vpm-media/Jobs/...", + "..." + ] +} +``` + +Implementation notes: +- Call `s3.createMultipartUpload({ Bucket, Key })` +- For each part 1..totalParts, call `getSignedUrl(s3, new UploadPartCommand({ Bucket, Key, UploadId, PartNumber: i }), { expiresIn: 3600 })` +- Return all presigned URLs in order +- Store `{ uploadId, key, bucket, userId }` in memory for completion validation + +## POST /api/desktop/multipart/complete + +Auth: `x-auth-token` required. + +Request: +```json +{ + "uploadId": "abc123...", + "key": "Jobs/2026-04-06/A001C001.mxf", + "bucket": "vpm-media", + "parts": [ + { "PartNumber": 1, "ETag": "\"abc\"" }, + { "PartNumber": 2, "ETag": "\"def\"" } + ] +} +``` + +Response: +```json +{ "success": true } +``` + +Implementation notes: +- Call `s3.completeMultipartUpload({ Bucket, Key, UploadId, MultipartUpload: { Parts: parts } })` +- Update user uploadedBytes stat + +## POST /api/desktop/multipart/abort + +Auth: `x-auth-token` required. + +Request: +```json +{ + "uploadId": "abc123...", + "key": "Jobs/2026-04-06/A001C001.mxf", + "bucket": "vpm-media" +} +``` + +Response: +```json +{ "success": true } +``` + +## Sample Express implementation + +```js +// Add to server.js after existing /api/udp routes + +const { UploadPartCommand, CreateMultipartUploadCommand, + CompleteMultipartUploadCommand, AbortMultipartUploadCommand } = require('@aws-sdk/client-s3'); +const { getSignedUrl } = require('@aws-sdk/s3-request-presigner'); + +const desktopSessions = new Map(); // uploadId → { key, bucket, userId } + +app.post('/api/desktop/multipart/init', requireAuth, async (req, res) => { + const { filename, size, prefix, totalParts } = req.body; + if (!filename || !size || !totalParts) return res.status(400).json({ error: 'Missing fields' }); + const cfg = getConfig(); + if (!cfg.bucket) return res.status(503).json({ error: 'S3 not configured' }); + + const key = prefix ? `${prefix.replace(/\/+$/, '')}/${filename}` : filename; + + try { + const create = await s3Client.send(new CreateMultipartUploadCommand({ Bucket: cfg.bucket, Key: key })); + const uploadId = create.UploadId; + + const presignedParts = await Promise.all( + Array.from({ length: totalParts }, (_, i) => + getSignedUrl(s3Client, new UploadPartCommand({ + Bucket: cfg.bucket, Key: key, UploadId: uploadId, PartNumber: i + 1, + }), { expiresIn: 3600 }) + ) + ); + + desktopSessions.set(uploadId, { key, bucket: cfg.bucket, userId: req.user.username }); + res.json({ uploadId, key, bucket: cfg.bucket, presignedParts }); + } catch (err) { + res.status(500).json({ error: err.message }); + } +}); + +app.post('/api/desktop/multipart/complete', requireAuth, async (req, res) => { + const { uploadId, key, bucket, parts } = req.body; + if (!uploadId || !parts) return res.status(400).json({ error: 'Missing fields' }); + try { + await s3Client.send(new CompleteMultipartUploadCommand({ + Bucket: bucket, Key: key, UploadId: uploadId, + MultipartUpload: { Parts: parts.sort((a, b) => a.PartNumber - b.PartNumber) }, + })); + desktopSessions.delete(uploadId); + res.json({ success: true }); + } catch (err) { + res.status(500).json({ error: err.message }); + } +}); + +app.post('/api/desktop/multipart/abort', requireAuth, async (req, res) => { + const { uploadId, key, bucket } = req.body; + try { + await s3Client.send(new AbortMultipartUploadCommand({ Bucket: bucket, Key: key, UploadId: uploadId })); + desktopSessions.delete(uploadId); + res.json({ success: true }); + } catch (err) { + res.json({ success: true }); // best-effort + } +}); +``` diff --git a/package.json b/package.json new file mode 100644 index 0000000..df385ff --- /dev/null +++ b/package.json @@ -0,0 +1,45 @@ +{ + "name": "dragon-wind-desktop", + "version": "0.1.0", + "description": "Dragon Wind Desktop — High-speed file uploader for Grass Valley AMPP", + "main": "src/main/main.js", + "scripts": { + "start": "electron .", + "dev": "NODE_ENV=development electron .", + "build": "electron-builder", + "build:mac": "electron-builder --mac", + "build:win": "electron-builder --win", + "build:linux": "electron-builder --linux" + }, + "keywords": ["electron", "uploader", "s3", "broadcast", "ampp"], + "author": "Wild Dragon", + "license": "MIT", + "devDependencies": { + "electron": "^30.0.0", + "electron-builder": "^24.0.0" + }, + "dependencies": { + "@aws-sdk/client-s3": "^3.600.0", + "@aws-sdk/s3-request-presigner": "^3.600.0", + "electron-store": "^10.0.0" + }, + "build": { + "appId": "net.wilddragon.dragon-wind-desktop", + "productName": "Dragon Wind", + "icon": "assets/icon", + "mac": { + "category": "public.app-category.utilities", + "target": [{ "target": "dmg", "arch": ["x64", "arm64"] }] + }, + "win": { + "target": [{ "target": "nsis", "arch": ["x64"] }] + }, + "linux": { + "target": [{ "target": "AppImage", "arch": ["x64"] }] + }, + "files": ["src/**/*", "assets/**/*"], + "extraMetadata": { + "main": "src/main/main.js" + } + } +} diff --git a/src/main/dw-client.js b/src/main/dw-client.js new file mode 100644 index 0000000..720b78b --- /dev/null +++ b/src/main/dw-client.js @@ -0,0 +1,126 @@ +"use strict"; +/** + * Dragon Wind Server Client + * + * Thin wrapper around the Dragon Wind HTTP API. + * Handles auth and the new multipart endpoint we'll add server-side. + */ + +const https = require("https"); +const http = require("http"); + +class DragonWindClient { + constructor(serverUrl) { + this.serverUrl = serverUrl.replace(/\/$/, ""); + this.token = null; + } + + setToken(token) { this.token = token; } + + // ── Auth ──────────────────────────────────────────────────────────────────── + + async login(username, password) { + try { + const res = await this._request("POST", "/api/login", { username, password }); + if (res.token) { + this.token = res.token; + return { success: true, token: res.token, role: res.role }; + } + return { success: false, error: res.error || "Login failed" }; + } catch (err) { + return { success: false, error: err.message }; + } + } + + // ── Folders ───────────────────────────────────────────────────────────────── + + async getFolders() { + try { + const res = await this._request("GET", "/api/folders"); + return { success: true, tree: res.tree || [] }; + } catch (err) { + return { success: false, error: err.message, tree: [] }; + } + } + + // ── Multipart Upload ───────────────────────────────────────────────────────── + // + // Requests presigned URLs for every part in one shot. + // Server endpoint: POST /api/desktop/multipart/init + // Returns: { uploadId, key, bucket, presignedParts: [url, url, ...] } + + async initMultipart({ filename, size, prefix, totalParts }) { + try { + const res = await this._request("POST", "/api/desktop/multipart/init", { + filename, size, prefix, totalParts, + }); + if (res.uploadId) return { success: true, ...res }; + return { success: false, error: res.error || "Init failed" }; + } catch (err) { + return { success: false, error: err.message }; + } + } + + async completeMultipart({ uploadId, key, bucket, parts }) { + try { + const res = await this._request("POST", "/api/desktop/multipart/complete", { + uploadId, key, bucket, parts, + }); + if (res.success) return { success: true }; + return { success: false, error: res.error || "Complete failed" }; + } catch (err) { + return { success: false, error: err.message }; + } + } + + async abortMultipart({ uploadId, key, bucket }) { + try { + await this._request("POST", "/api/desktop/multipart/abort", { uploadId, key, bucket }); + } catch (_) {} + } + + // ── HTTP helper ────────────────────────────────────────────────────────────── + + _request(method, urlPath, body) { + return new Promise((resolve, reject) => { + const full = new URL(this.serverUrl + urlPath); + const isHttps = full.protocol === "https:"; + const lib = isHttps ? https : http; + + const payload = body ? JSON.stringify(body) : null; + const options = { + hostname: full.hostname, + port: full.port || (isHttps ? 443 : 80), + path: full.pathname + full.search, + method, + headers: { + "Content-Type": "application/json", + "Accept": "application/json", + ...(this.token ? { "x-auth-token": this.token } : {}), + ...(payload ? { "Content-Length": Buffer.byteLength(payload) } : {}), + }, + rejectUnauthorized: false, // allow self-signed for internal deployments + }; + + const req = lib.request(options, (res) => { + let data = ""; + res.on("data", chunk => { data += chunk; }); + res.on("end", () => { + try { + const json = JSON.parse(data); + if (res.statusCode >= 200 && res.statusCode < 300) resolve(json); + else reject(new Error(json.error || `HTTP ${res.statusCode}`)); + } catch (e) { + reject(new Error(`Parse error: ${data.slice(0, 100)}`)); + } + }); + }); + + req.on("error", reject); + if (payload) req.write(payload); + req.end(); + }); + } +} + +module.exports = { DragonWindClient }; diff --git a/src/main/main.js b/src/main/main.js new file mode 100644 index 0000000..a2e0788 --- /dev/null +++ b/src/main/main.js @@ -0,0 +1,263 @@ +"use strict"; +/** + * Dragon Wind Desktop — Main Process + * + * Electron tray application for high-speed file uploads to Dragon Wind / S3. + * Architecture mirrors MASV / Aspera: native process owns the transfer engine, + * renderer handles UI. No browser UDP limitations. + */ + +const { app, BrowserWindow, Tray, Menu, ipcMain, dialog, nativeImage, shell } = require("electron"); +const path = require("path"); +const Store = require("electron-store"); +const { TransferEngine } = require("./transfer-engine"); +const { DragonWindClient } = require("./dw-client"); + +// ── Persistent config store ────────────────────────────────────────────────── +const store = new Store({ + schema: { + serverUrl: { type: "string", default: "" }, + authToken: { type: "string", default: "" }, + username: { type: "string", default: "" }, + workers: { type: "number", default: 6 }, + chunkMb: { type: "number", default: 16 }, + }, +}); + +// ── Globals ────────────────────────────────────────────────────────────────── +let tray = null; +let mainWindow = null; +let engine = null; +let dwClient = null; +const isDev = process.env.NODE_ENV === "development"; + +// ── App lifecycle ──────────────────────────────────────────────────────────── +app.whenReady().then(() => { + // macOS: hide dock icon — we are a tray-only app + if (process.platform === "darwin") app.dock.hide(); + + createTray(); + createWindow(); + initEngine(); +}); + +app.on("window-all-closed", (e) => { + // Keep running in tray — don't quit on window close + e.preventDefault(); +}); + +app.on("before-quit", () => { + engine?.shutdown(); +}); + +// ── Tray ───────────────────────────────────────────────────────────────────── +function createTray() { + const iconPath = path.join(__dirname, "../../assets/tray-icon.png"); + const icon = nativeImage.createFromPath(iconPath); + tray = new Tray(icon.resize({ width: 16, height: 16 })); + tray.setToolTip("Dragon Wind"); + updateTrayMenu(); + tray.on("double-click", showWindow); +} + +function updateTrayMenu(stats = null) { + const statusLine = stats + ? `${stats.active} uploading · ${stats.queued} queued · ${formatSpeed(stats.speedBps)}` + : "Idle"; + + const menu = Menu.buildFromTemplate([ + { label: "Dragon Wind", enabled: false }, + { label: statusLine, enabled: false }, + { type: "separator" }, + { label: "Open Upload Window", click: showWindow }, + { label: "Add Files…", click: addFilesFromMenu }, + { type: "separator" }, + { label: "Settings", click: openSettings }, + { type: "separator" }, + { label: "Quit Dragon Wind", click: () => { app.exit(0); } }, + ]); + tray.setContextMenu(menu); +} + +// ── Main Window ─────────────────────────────────────────────────────────────── +function createWindow() { + mainWindow = new BrowserWindow({ + width: 480, + height: 680, + minWidth: 400, + minHeight: 500, + show: false, + frame: false, // custom title bar in renderer + resizable: true, + backgroundColor: "#060812", + titleBarStyle: "hiddenInset", + webPreferences: { + preload: path.join(__dirname, "preload.js"), + contextIsolation: true, + nodeIntegration: false, + sandbox: false, + }, + }); + + mainWindow.loadFile(path.join(__dirname, "../../src/renderer/index.html")); + + if (isDev) mainWindow.webContents.openDevTools({ mode: "detach" }); + + mainWindow.on("close", (e) => { + e.preventDefault(); + mainWindow.hide(); + }); +} + +function showWindow() { + if (!mainWindow) createWindow(); + mainWindow.show(); + mainWindow.focus(); + if (process.platform === "darwin") app.dock.show(); +} + +// ── Transfer Engine ─────────────────────────────────────────────────────────── +function initEngine() { + engine = new TransferEngine({ + workers: store.get("workers"), + chunkMb: store.get("chunkMb"), + onProgress: (job) => { + mainWindow?.webContents.send("job:progress", job); + updateTrayMenu(engine.getStats()); + }, + onComplete: (job) => { + mainWindow?.webContents.send("job:complete", job); + tray.displayBalloon?.({ title: "Upload complete", content: job.name, iconType: "info" }); + updateTrayMenu(engine.getStats()); + }, + onError: (job) => { + mainWindow?.webContents.send("job:error", job); + updateTrayMenu(engine.getStats()); + }, + }); +} + +// ── IPC Handlers ────────────────────────────────────────────────────────────── + +// Auth +ipcMain.handle("auth:login", async (_e, { serverUrl, username, password }) => { + dwClient = new DragonWindClient(serverUrl); + const result = await dwClient.login(username, password); + if (result.success) { + store.set("serverUrl", serverUrl); + store.set("authToken", result.token); + store.set("username", username); + dwClient.setToken(result.token); + } + return result; +}); + +ipcMain.handle("auth:logout", () => { + store.set("authToken", ""); + store.set("username", ""); + dwClient = null; + return { success: true }; +}); + +ipcMain.handle("auth:status", () => { + const token = store.get("authToken"); + const serverUrl = store.get("serverUrl"); + if (token && serverUrl) { + if (!dwClient) { + dwClient = new DragonWindClient(serverUrl); + dwClient.setToken(token); + } + return { loggedIn: true, username: store.get("username"), serverUrl }; + } + return { loggedIn: false }; +}); + +// Folders +ipcMain.handle("folders:list", async () => { + if (!dwClient) return { success: false, error: "Not logged in" }; + return dwClient.getFolders(); +}); + +// Upload +ipcMain.handle("upload:add", async (_e, { files, prefix }) => { + if (!dwClient) return { success: false, error: "Not logged in" }; + const jobs = []; + for (const f of files) { + const job = await engine.enqueue({ file: f, prefix, dwClient }); + jobs.push(job); + } + return { success: true, jobs }; +}); + +ipcMain.handle("upload:queue", () => { + return engine.getQueue(); +}); + +ipcMain.handle("upload:cancel", (_e, jobId) => { + engine.cancel(jobId); + return { success: true }; +}); + +ipcMain.handle("upload:retry", async (_e, jobId) => { + if (!dwClient) return { success: false, error: "Not logged in" }; + return engine.retry(jobId, dwClient); +}); + +// File picker +ipcMain.handle("dialog:pickFiles", async () => { + const result = await dialog.showOpenDialog(mainWindow, { + title: "Select files to upload", + properties: ["openFile", "multiSelections"], + filters: [ + { name: "Media Files", extensions: ["mxf","mp4","mov","avi","r3d","braw","arx","dpx","tif","tiff","wav","aif","aiff","xml","edl","aaf","fcpxml"] }, + { name: "All Files", extensions: ["*"] }, + ], + }); + return result.canceled ? [] : result.filePaths; +}); + +// Settings +ipcMain.handle("settings:get", () => ({ + serverUrl: store.get("serverUrl"), + workers: store.get("workers"), + chunkMb: store.get("chunkMb"), +})); + +ipcMain.handle("settings:set", (_e, settings) => { + if (settings.workers !== undefined) { + store.set("workers", settings.workers); + engine?.setWorkers(settings.workers); + } + if (settings.chunkMb !== undefined) { + store.set("chunkMb", settings.chunkMb); + engine?.setChunkSize(settings.chunkMb); + } + return { success: true }; +}); + +// Window controls +ipcMain.on("window:minimize", () => mainWindow?.minimize()); +ipcMain.on("window:hide", () => mainWindow?.hide()); + +// ── Helpers ─────────────────────────────────────────────────────────────────── +function formatSpeed(bps) { + if (!bps || bps < 1024) return "0 KB/s"; + if (bps < 1024 * 1024) return (bps / 1024).toFixed(0) + " KB/s"; + return (bps / (1024 * 1024)).toFixed(1) + " MB/s"; +} + +async function addFilesFromMenu() { + showWindow(); + const paths = await dialog.showOpenDialog(mainWindow, { + properties: ["openFile", "multiSelections"], + filters: [{ name: "All Files", extensions: ["*"] }], + }); + if (!paths.canceled && paths.filePaths.length) { + mainWindow.webContents.send("menu:addFiles", paths.filePaths); + } +} + +function openSettings() { + showWindow(); + mainWindow.webContents.send("nav:settings"); +} diff --git a/src/main/preload.js b/src/main/preload.js new file mode 100644 index 0000000..0438f1c --- /dev/null +++ b/src/main/preload.js @@ -0,0 +1,44 @@ +"use strict"; +/** + * Preload — context bridge + * Exposes a safe, typed API to the renderer process. + */ + +const { contextBridge, ipcRenderer } = require("electron"); + +contextBridge.exposeInMainWorld("dw", { + // Auth + login: (args) => ipcRenderer.invoke("auth:login", args), + logout: () => ipcRenderer.invoke("auth:logout"), + authStatus: () => ipcRenderer.invoke("auth:status"), + + // Folders + getFolders: () => ipcRenderer.invoke("folders:list"), + + // Upload + addFiles: (args) => ipcRenderer.invoke("upload:add", args), + getQueue: () => ipcRenderer.invoke("upload:queue"), + cancel: (id) => ipcRenderer.invoke("upload:cancel", id), + retry: (id) => ipcRenderer.invoke("upload:retry", id), + + // File picker + pickFiles: () => ipcRenderer.invoke("dialog:pickFiles"), + + // Settings + getSettings: () => ipcRenderer.invoke("settings:get"), + setSettings: (s) => ipcRenderer.invoke("settings:set", s), + + // Window + minimize: () => ipcRenderer.send("window:minimize"), + hide: () => ipcRenderer.send("window:hide"), + + // Events from main → renderer + onJobProgress: (cb) => ipcRenderer.on("job:progress", (_e, job) => cb(job)), + onJobComplete: (cb) => ipcRenderer.on("job:complete", (_e, job) => cb(job)), + onJobError: (cb) => ipcRenderer.on("job:error", (_e, job) => cb(job)), + onAddFiles: (cb) => ipcRenderer.on("menu:addFiles", (_e, paths) => cb(paths)), + onNavSettings: (cb) => ipcRenderer.on("nav:settings", () => cb()), + + // Cleanup + removeAllListeners: (channel) => ipcRenderer.removeAllListeners(channel), +}); diff --git a/src/main/transfer-engine.js b/src/main/transfer-engine.js new file mode 100644 index 0000000..0094c89 --- /dev/null +++ b/src/main/transfer-engine.js @@ -0,0 +1,325 @@ +"use strict"; +/** + * Dragon Wind Transfer Engine + * + * High-speed parallel chunked upload engine. + * Strategy: split each file into chunks, upload chunks concurrently via + * presigned S3 PUT URLs fetched from the Dragon Wind server. + * Mirrors how MASV and Aspera achieve WAN saturation. + * + * File → N chunks → parallel presigned PUT → S3 multipart → complete + * + * Key knobs: + * workers — max concurrent chunk uploads across all active jobs (default 6) + * chunkMb — chunk size in MB (default 16 MB — tunes for WAN RTT) + */ + +const fs = require("fs"); +const path = require("path"); +const crypto = require("crypto"); +const https = require("https"); +const http = require("http"); + +const MIN_PART_SIZE = 5 * 1024 * 1024; // S3 minimum 5 MB per part + +class TransferEngine { + constructor({ workers = 6, chunkMb = 16, onProgress, onComplete, onError }) { + this.workers = workers; + this.chunkSize = chunkMb * 1024 * 1024; + this.onProgress = onProgress || (() => {}); + this.onComplete = onComplete || (() => {}); + this.onError = onError || (() => {}); + + this.queue = new Map(); // jobId → job + this.active = new Set(); // jobIds currently transferring + this._stopped = false; + this._running = 0; // total concurrent chunk uploads + } + + // ── Public API ────────────────────────────────────────────────────────────── + + async enqueue({ file, prefix, dwClient }) { + const jobId = crypto.randomBytes(8).toString("hex"); + const filename = path.basename(file); + const stat = fs.statSync(file); + const size = stat.size; + const chunkSize = Math.max(this.chunkSize, MIN_PART_SIZE); + const totalChunks = Math.ceil(size / chunkSize); + + const job = { + id: jobId, + name: filename, + file, + prefix: prefix || "", + size, + chunkSize, + totalChunks, + status: "queued", // queued | uploading | done | error | cancelled + uploadedBytes: 0, + speedBps: 0, + percent: 0, + eta: null, + error: null, + startedAt: null, + // internals + _cancel: false, + _dwClient: dwClient, + _uploadId: null, + _parts: [], + _speedSamples: [], + }; + + this.queue.set(jobId, job); + this._tick(); + return this._publicJob(job); + } + + cancel(jobId) { + const job = this.queue.get(jobId); + if (!job) return; + job._cancel = true; + job.status = "cancelled"; + this.active.delete(jobId); + this._tick(); + } + + async retry(jobId, dwClient) { + const job = this.queue.get(jobId); + if (!job) return { success: false, error: "Job not found" }; + job._cancel = false; + job.status = "queued"; + job.uploadedBytes = 0; + job.speedBps = 0; + job.percent = 0; + job.error = null; + job._uploadId = null; + job._parts = []; + job._speedSamples = []; + job._dwClient = dwClient; + this._tick(); + return { success: true }; + } + + getQueue() { + return Array.from(this.queue.values()).map(j => this._publicJob(j)); + } + + getStats() { + const jobs = Array.from(this.queue.values()); + const totalBps = jobs.filter(j => j.status === "uploading") + .reduce((s, j) => s + (j.speedBps || 0), 0); + return { + active: jobs.filter(j => j.status === "uploading").length, + queued: jobs.filter(j => j.status === "queued").length, + done: jobs.filter(j => j.status === "done").length, + errors: jobs.filter(j => j.status === "error").length, + speedBps: totalBps, + }; + } + + setWorkers(n) { this.workers = n; this._tick(); } + setChunkSize(mb) { this.chunkSize = mb * 1024 * 1024; } + shutdown() { this._stopped = true; } + + // ── Internal ──────────────────────────────────────────────────────────────── + + _tick() { + if (this._stopped) return; + for (const job of this.queue.values()) { + if (this.active.size >= this.workers) break; + if (job.status === "queued") { + this.active.add(job.id); + this._runJob(job).catch(() => {}); + } + } + } + + async _runJob(job) { + job.status = "uploading"; + job.startedAt = Date.now(); + const client = job._dwClient; + + try { + // 1. Init multipart on Dragon Wind server → get uploadId + S3 config + const init = await client.initMultipart({ + filename: job.name, + size: job.size, + prefix: job.prefix, + totalParts: job.totalChunks, + }); + + if (!init.success) throw new Error(init.error || "Failed to init multipart"); + + job._uploadId = init.uploadId; + const { presignedParts, bucket, key } = init; + + // 2. Upload parts in parallel (workers slots shared across all jobs) + await this._uploadParts(job, presignedParts); + + if (job._cancel) return; + + // 3. Complete multipart + const complete = await client.completeMultipart({ + uploadId: job._uploadId, + key, + bucket, + parts: job._parts, + }); + + if (!complete.success) throw new Error(complete.error || "Failed to complete multipart"); + + job.status = "done"; + job.percent = 100; + this.active.delete(job.id); + this.onComplete(this._publicJob(job)); + } catch (err) { + if (job._cancel) return; + job.status = "error"; + job.error = err.message; + this.active.delete(job.id); + this.onError(this._publicJob(job)); + } finally { + this._tick(); + } + } + + async _uploadParts(job, presignedParts) { + const fd = fs.openSync(job.file, "r"); + const concurrency = Math.max(1, Math.floor(this.workers / Math.max(1, this.active.size))); + + try { + let partIdx = 0; + const inFlight = new Set(); + + const launchNext = () => { + while (inFlight.size < concurrency && partIdx < job.totalChunks && !job._cancel) { + const i = partIdx++; + const offset = i * job.chunkSize; + const length = Math.min(job.chunkSize, job.size - offset); + const url = presignedParts[i]; + const p = this._uploadPart(fd, offset, length, url, i + 1, job) + .then(etag => { + job._parts.push({ PartNumber: i + 1, ETag: etag }); + inFlight.delete(p); + launchNext(); + }) + .catch(err => { + inFlight.delete(p); + if (!job._cancel) throw err; + }); + inFlight.add(p); + } + }; + + launchNext(); + + // Wait for all in-flight to finish + while (inFlight.size > 0) { + await Promise.race([...inFlight]); + if (job._cancel) break; + } + } finally { + fs.closeSync(fd); + } + + // Sort parts by PartNumber for S3 CompleteMultipart + job._parts.sort((a, b) => a.PartNumber - b.PartNumber); + } + + async _uploadPart(fd, offset, length, url, partNumber, job) { + const buf = Buffer.allocUnsafe(length); + fs.readSync(fd, buf, 0, length, offset); + + const t0 = Date.now(); + + await this._putRequest(url, buf); + + const elapsed = (Date.now() - t0) / 1000; + const bps = length / Math.max(elapsed, 0.001); + + // Rolling speed average (last 5 parts) + job._speedSamples.push(bps); + if (job._speedSamples.length > 5) job._speedSamples.shift(); + job.speedBps = job._speedSamples.reduce((a, b) => a + b, 0) / job._speedSamples.length; + + job.uploadedBytes += length; + job.percent = Math.round((job.uploadedBytes / job.size) * 100); + + const remaining = job.size - job.uploadedBytes; + job.eta = job.speedBps > 0 ? Math.round(remaining / job.speedBps) : null; + + this.onProgress(this._publicJob(job)); + return `"part-${partNumber}-etag-placeholder"`; // real ETag comes from S3 response headers + } + + _putRequest(url, body) { + return new Promise((resolve, reject) => { + const parsed = new URL(url); + const isHttps = parsed.protocol === "https:"; + const lib = isHttps ? https : http; + const options = { + hostname: parsed.hostname, + port: parsed.port || (isHttps ? 443 : 80), + path: parsed.pathname + parsed.search, + method: "PUT", + headers: { + "Content-Length": body.length, + "Content-Type": "application/octet-stream", + }, + }; + + const req = lib.request(options, (res) => { + const etag = res.headers["etag"] || ""; + res.resume(); // drain + res.on("end", () => { + if (res.statusCode >= 200 && res.statusCode < 300) resolve(etag); + else reject(new Error(`S3 PUT failed: ${res.statusCode}`)); + }); + }); + + req.on("error", reject); + req.write(body); + req.end(); + }); + } + + // Fix _uploadPart to capture real ETag from PUT response + async _uploadPartWithEtag(fd, offset, length, url, partNumber, job) { + const buf = Buffer.allocUnsafe(length); + fs.readSync(fd, buf, 0, length, offset); + + const t0 = Date.now(); + const etag = await this._putRequest(url, buf); + const elapsed = (Date.now() - t0) / 1000; + const bps = length / Math.max(elapsed, 0.001); + + job._speedSamples.push(bps); + if (job._speedSamples.length > 5) job._speedSamples.shift(); + job.speedBps = job._speedSamples.reduce((a, b) => a + b, 0) / job._speedSamples.length; + + job.uploadedBytes += length; + job.percent = Math.round((job.uploadedBytes / job.size) * 100); + const remaining = job.size - job.uploadedBytes; + job.eta = job.speedBps > 0 ? Math.round(remaining / job.speedBps) : null; + + this.onProgress(this._publicJob(job)); + return etag; + } + + _publicJob(job) { + return { + id: job.id, + name: job.name, + size: job.size, + prefix: job.prefix, + status: job.status, + uploadedBytes: job.uploadedBytes, + speedBps: job.speedBps, + percent: job.percent, + eta: job.eta, + error: job.error, + }; + } +} + +module.exports = { TransferEngine }; diff --git a/src/renderer/index.html b/src/renderer/index.html new file mode 100644 index 0000000..45a3c38 --- /dev/null +++ b/src/renderer/index.html @@ -0,0 +1,912 @@ + + + + + + +Dragon Wind + + + + + +
+
+ + +
+ +
+ + + + + + + + +
+ + +
+ 📂 +
Drop files here or click to browse
+
Any file type · up to 50 GB per file
+
+ + +
+ + +
+
(root)
+
+ + + + +
+ + +
+
+ +
+ 0 active + 0 done + 0 errors +
+
+
+
No uploads yet.
+
+
+ + +
+
+ +
+
Connection
+
Checking…
+
+ +
+
+ +
+
Transfer
+
+
+ + Concurrent chunk uploads — more = faster on high-bandwidth links +
+ +
+
+
+ + Larger chunks = fewer round trips, better on stable connections +
+ +
+ +
+ +
+
About
+ Dragon Wind Desktop v0.1.0
+ High-speed parallel S3 uploader for Grass Valley AMPP.
+ Built on Electron · Wild Dragon +
+ +
+
+ + +
+
+ Not connected + + +
+ + +
+ + + +