From 3c7cc1a77f9cabf9f81200d36613e94ba8fbb619 Mon Sep 17 00:00:00 2001 From: ZGaetano Date: Thu, 4 Jun 2026 16:56:11 +0000 Subject: [PATCH] fix(worker): retry transient S3 aborts + reuse one keep-alive client MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Burn test: 5 assets errored during proxy with 'aborted'/'socket hang up' during the master DOWNLOAD. The masters all exist in S3 (262-269MB) — it's the connection-limited RustFS backend dropping streams when 8 jobs hammer it at once. Two fixes: 1. downloadFromS3/uploadToS3 now retry transient failures (aborted, socket hang up, ECONNRESET, timeout, 5xx, throttle) up to 5x with exponential backoff, cleaning the partial file between download attempts. A single mid-stream abort no longer errors the whole asset. 2. Reuse ONE shared S3 client instead of createS3Client()+client.destroy() per call. The per-call destroy tore down the keep-alive agent's sockets every time, so connection pooling never happened and each transfer opened fresh connections — exactly what overwhelmed RustFS. A long-lived client lets the keep-alive pool actually be reused. --- services/worker/src/s3/client.js | 138 +++++++++++++++++++------------ 1 file changed, 85 insertions(+), 53 deletions(-) diff --git a/services/worker/src/s3/client.js b/services/worker/src/s3/client.js index 29bfca1..2c893b2 100644 --- a/services/worker/src/s3/client.js +++ b/services/worker/src/s3/client.js @@ -21,7 +21,8 @@ const CONTENT_TYPES = { const _httpAgent = new http.Agent({ keepAlive: true, maxSockets: 128, timeout: 600_000 }); const _httpsAgent = new https.Agent({ keepAlive: true, maxSockets: 128, timeout: 600_000 }); -const createS3Client = () => { +// Build a client. NOTE: callers must NOT destroy() this — see _sharedClient. +const buildS3Client = () => { return new S3Client({ region: process.env.S3_REGION || 'us-east-1', endpoint: process.env.S3_ENDPOINT, @@ -39,34 +40,73 @@ const createS3Client = () => { }); }; -export const downloadFromS3 = async (bucket, key, localPath) => { - const client = createS3Client(); - try { - const response = await client.send( - new GetObjectCommand({ Bucket: bucket, Key: key }) - ); - - const writeStream = createWriteStream(localPath); - await pipeline(response.Body, writeStream); - } finally { - client.destroy(); - } +// ONE shared client reused across all operations. Previously every call did +// createS3Client() then client.destroy() in finally — which tore down the +// keep-alive agent's sockets every time, so pooling never happened and each +// transfer opened brand-new connections. Under burn load (8 masters at once) +// that hammered the connection-limited RustFS backend into aborting streams. +// A single long-lived client lets the keep-alive pool actually be reused. +let _sharedClient = null; +const createS3Client = () => { + if (!_sharedClient) _sharedClient = buildS3Client(); + return _sharedClient; }; -export const uploadToS3 = async (bucket, key, localPath) => { - const client = createS3Client(); - try { - const readStream = createReadStream(localPath); - await client.send( - new PutObjectCommand({ - Bucket: bucket, - Key: key, - Body: readStream, - }) - ); - } finally { - client.destroy(); +// Transient connection failures that warrant a retry. Under burn load (8 masters +// uploading + downloading at once) the connection-limited RustFS S3 backend +// aborts/hangs up mid-stream — a single failure used to error the whole proxy +// job permanently. These are NOT real "file missing" / auth errors. +const _isTransientS3 = (err) => { + const s = `${err?.name || ''} ${err?.code || ''} ${err?.message || ''}`.toLowerCase(); + return /aborted|socket hang up|timeout|econnreset|epipe|econnrefused|enotfound|stream|network|503|500|slowdown|throttl/.test(s); +}; +const _sleep = (ms) => new Promise(r => setTimeout(r, ms)); + +export const downloadFromS3 = async (bucket, key, localPath, maxAttempts = 5) => { + let lastErr = null; + for (let attempt = 1; attempt <= maxAttempts; attempt++) { + const client = createS3Client(); + try { + const response = await client.send(new GetObjectCommand({ Bucket: bucket, Key: key })); + const writeStream = createWriteStream(localPath); + await pipeline(response.Body, writeStream); + return; // success + } catch (err) { + lastErr = err; + // Clean the partial file before retrying so we don't leave a truncated master. + try { await (await import('node:fs/promises')).unlink(localPath); } catch (_) {} + if (attempt < maxAttempts && _isTransientS3(err)) { + const backoff = Math.min(8000, 400 * 2 ** (attempt - 1)); // 400,800,1600,3200ms + console.warn(`[s3] download ${key} attempt ${attempt}/${maxAttempts} failed (${err.name || err.message}); retrying in ${backoff}ms`); + await _sleep(backoff); + continue; + } + throw err; + } } + throw lastErr; +}; + +export const uploadToS3 = async (bucket, key, localPath, maxAttempts = 5) => { + let lastErr = null; + for (let attempt = 1; attempt <= maxAttempts; attempt++) { + const client = createS3Client(); + try { + const readStream = createReadStream(localPath); + await client.send(new PutObjectCommand({ Bucket: bucket, Key: key, Body: readStream })); + return; // success + } catch (err) { + lastErr = err; + if (attempt < maxAttempts && _isTransientS3(err)) { + const backoff = Math.min(8000, 400 * 2 ** (attempt - 1)); + console.warn(`[s3] upload ${key} attempt ${attempt}/${maxAttempts} failed (${err.name || err.message}); retrying in ${backoff}ms`); + await _sleep(backoff); + continue; + } + throw err; + } + } + throw lastErr; }; // Upload every file in `localDir` to `bucket` under `keyPrefix/`. Used for the @@ -75,23 +115,19 @@ export const uploadToS3 = async (bucket, key, localPath) => { // RustFS's broken byte-range path on large objects. export const uploadDirectoryToS3 = async (bucket, keyPrefix, localDir) => { const client = createS3Client(); - try { - const entries = await readdir(localDir, { withFileTypes: true }); - const files = entries.filter(e => e.isFile()).map(e => e.name); - for (const name of files) { - const ext = extname(name).toLowerCase(); - const ct = CONTENT_TYPES[ext] || 'application/octet-stream'; - await client.send(new PutObjectCommand({ - Bucket: bucket, - Key: `${keyPrefix}/${name}`, - Body: createReadStream(join(localDir, name)), - ContentType: ct, - })); - } - return files; - } finally { - client.destroy(); + const entries = await readdir(localDir, { withFileTypes: true }); + const files = entries.filter(e => e.isFile()).map(e => e.name); + for (const name of files) { + const ext = extname(name).toLowerCase(); + const ct = CONTENT_TYPES[ext] || 'application/octet-stream'; + await client.send(new PutObjectCommand({ + Bucket: bucket, + Key: `${keyPrefix}/${name}`, + Body: createReadStream(join(localDir, name)), + ContentType: ct, + })); } + return files; }; // Multipart-aware streaming upload — used by the promotion worker to push @@ -99,15 +135,11 @@ export const uploadDirectoryToS3 = async (bucket, keyPrefix, localDir) => { export const uploadStreamToS3 = async (bucket, key, readable) => { const { Upload } = await import('@aws-sdk/lib-storage'); const client = createS3Client(); - try { - const upload = new Upload({ - client, - params: { Bucket: bucket, Key: key, Body: readable }, - queueSize: 4, - partSize: 8 * 1024 * 1024, - }); - await upload.done(); - } finally { - client.destroy(); - } + const upload = new Upload({ + client, + params: { Bucket: bucket, Key: key, Body: readable }, + queueSize: 4, + partSize: 8 * 1024 * 1024, + }); + await upload.done(); };