fix(worker): retry transient S3 aborts + reuse one keep-alive client
Burn test: 5 assets errored during proxy with 'aborted'/'socket hang up' during the master DOWNLOAD. The masters all exist in S3 (262-269MB) — it's the connection-limited RustFS backend dropping streams when 8 jobs hammer it at once. Two fixes: 1. downloadFromS3/uploadToS3 now retry transient failures (aborted, socket hang up, ECONNRESET, timeout, 5xx, throttle) up to 5x with exponential backoff, cleaning the partial file between download attempts. A single mid-stream abort no longer errors the whole asset. 2. Reuse ONE shared S3 client instead of createS3Client()+client.destroy() per call. The per-call destroy tore down the keep-alive agent's sockets every time, so connection pooling never happened and each transfer opened fresh connections — exactly what overwhelmed RustFS. A long-lived client lets the keep-alive pool actually be reused.
This commit is contained in:
parent
80f157968f
commit
3c7cc1a77f
1 changed files with 85 additions and 53 deletions
|
|
@ -21,7 +21,8 @@ const CONTENT_TYPES = {
|
|||
const _httpAgent = new http.Agent({ keepAlive: true, maxSockets: 128, timeout: 600_000 });
|
||||
const _httpsAgent = new https.Agent({ keepAlive: true, maxSockets: 128, timeout: 600_000 });
|
||||
|
||||
const createS3Client = () => {
|
||||
// Build a client. NOTE: callers must NOT destroy() this — see _sharedClient.
|
||||
const buildS3Client = () => {
|
||||
return new S3Client({
|
||||
region: process.env.S3_REGION || 'us-east-1',
|
||||
endpoint: process.env.S3_ENDPOINT,
|
||||
|
|
@ -39,34 +40,73 @@ const createS3Client = () => {
|
|||
});
|
||||
};
|
||||
|
||||
export const downloadFromS3 = async (bucket, key, localPath) => {
|
||||
const client = createS3Client();
|
||||
try {
|
||||
const response = await client.send(
|
||||
new GetObjectCommand({ Bucket: bucket, Key: key })
|
||||
);
|
||||
|
||||
const writeStream = createWriteStream(localPath);
|
||||
await pipeline(response.Body, writeStream);
|
||||
} finally {
|
||||
client.destroy();
|
||||
}
|
||||
// ONE shared client reused across all operations. Previously every call did
|
||||
// createS3Client() then client.destroy() in finally — which tore down the
|
||||
// keep-alive agent's sockets every time, so pooling never happened and each
|
||||
// transfer opened brand-new connections. Under burn load (8 masters at once)
|
||||
// that hammered the connection-limited RustFS backend into aborting streams.
|
||||
// A single long-lived client lets the keep-alive pool actually be reused.
|
||||
let _sharedClient = null;
|
||||
const createS3Client = () => {
|
||||
if (!_sharedClient) _sharedClient = buildS3Client();
|
||||
return _sharedClient;
|
||||
};
|
||||
|
||||
export const uploadToS3 = async (bucket, key, localPath) => {
|
||||
const client = createS3Client();
|
||||
try {
|
||||
const readStream = createReadStream(localPath);
|
||||
await client.send(
|
||||
new PutObjectCommand({
|
||||
Bucket: bucket,
|
||||
Key: key,
|
||||
Body: readStream,
|
||||
})
|
||||
);
|
||||
} finally {
|
||||
client.destroy();
|
||||
// Transient connection failures that warrant a retry. Under burn load (8 masters
|
||||
// uploading + downloading at once) the connection-limited RustFS S3 backend
|
||||
// aborts/hangs up mid-stream — a single failure used to error the whole proxy
|
||||
// job permanently. These are NOT real "file missing" / auth errors.
|
||||
const _isTransientS3 = (err) => {
|
||||
const s = `${err?.name || ''} ${err?.code || ''} ${err?.message || ''}`.toLowerCase();
|
||||
return /aborted|socket hang up|timeout|econnreset|epipe|econnrefused|enotfound|stream|network|503|500|slowdown|throttl/.test(s);
|
||||
};
|
||||
const _sleep = (ms) => new Promise(r => setTimeout(r, ms));
|
||||
|
||||
export const downloadFromS3 = async (bucket, key, localPath, maxAttempts = 5) => {
|
||||
let lastErr = null;
|
||||
for (let attempt = 1; attempt <= maxAttempts; attempt++) {
|
||||
const client = createS3Client();
|
||||
try {
|
||||
const response = await client.send(new GetObjectCommand({ Bucket: bucket, Key: key }));
|
||||
const writeStream = createWriteStream(localPath);
|
||||
await pipeline(response.Body, writeStream);
|
||||
return; // success
|
||||
} catch (err) {
|
||||
lastErr = err;
|
||||
// Clean the partial file before retrying so we don't leave a truncated master.
|
||||
try { await (await import('node:fs/promises')).unlink(localPath); } catch (_) {}
|
||||
if (attempt < maxAttempts && _isTransientS3(err)) {
|
||||
const backoff = Math.min(8000, 400 * 2 ** (attempt - 1)); // 400,800,1600,3200ms
|
||||
console.warn(`[s3] download ${key} attempt ${attempt}/${maxAttempts} failed (${err.name || err.message}); retrying in ${backoff}ms`);
|
||||
await _sleep(backoff);
|
||||
continue;
|
||||
}
|
||||
throw err;
|
||||
}
|
||||
}
|
||||
throw lastErr;
|
||||
};
|
||||
|
||||
export const uploadToS3 = async (bucket, key, localPath, maxAttempts = 5) => {
|
||||
let lastErr = null;
|
||||
for (let attempt = 1; attempt <= maxAttempts; attempt++) {
|
||||
const client = createS3Client();
|
||||
try {
|
||||
const readStream = createReadStream(localPath);
|
||||
await client.send(new PutObjectCommand({ Bucket: bucket, Key: key, Body: readStream }));
|
||||
return; // success
|
||||
} catch (err) {
|
||||
lastErr = err;
|
||||
if (attempt < maxAttempts && _isTransientS3(err)) {
|
||||
const backoff = Math.min(8000, 400 * 2 ** (attempt - 1));
|
||||
console.warn(`[s3] upload ${key} attempt ${attempt}/${maxAttempts} failed (${err.name || err.message}); retrying in ${backoff}ms`);
|
||||
await _sleep(backoff);
|
||||
continue;
|
||||
}
|
||||
throw err;
|
||||
}
|
||||
}
|
||||
throw lastErr;
|
||||
};
|
||||
|
||||
// Upload every file in `localDir` to `bucket` under `keyPrefix/`. Used for the
|
||||
|
|
@ -75,23 +115,19 @@ export const uploadToS3 = async (bucket, key, localPath) => {
|
|||
// RustFS's broken byte-range path on large objects.
|
||||
export const uploadDirectoryToS3 = async (bucket, keyPrefix, localDir) => {
|
||||
const client = createS3Client();
|
||||
try {
|
||||
const entries = await readdir(localDir, { withFileTypes: true });
|
||||
const files = entries.filter(e => e.isFile()).map(e => e.name);
|
||||
for (const name of files) {
|
||||
const ext = extname(name).toLowerCase();
|
||||
const ct = CONTENT_TYPES[ext] || 'application/octet-stream';
|
||||
await client.send(new PutObjectCommand({
|
||||
Bucket: bucket,
|
||||
Key: `${keyPrefix}/${name}`,
|
||||
Body: createReadStream(join(localDir, name)),
|
||||
ContentType: ct,
|
||||
}));
|
||||
}
|
||||
return files;
|
||||
} finally {
|
||||
client.destroy();
|
||||
const entries = await readdir(localDir, { withFileTypes: true });
|
||||
const files = entries.filter(e => e.isFile()).map(e => e.name);
|
||||
for (const name of files) {
|
||||
const ext = extname(name).toLowerCase();
|
||||
const ct = CONTENT_TYPES[ext] || 'application/octet-stream';
|
||||
await client.send(new PutObjectCommand({
|
||||
Bucket: bucket,
|
||||
Key: `${keyPrefix}/${name}`,
|
||||
Body: createReadStream(join(localDir, name)),
|
||||
ContentType: ct,
|
||||
}));
|
||||
}
|
||||
return files;
|
||||
};
|
||||
|
||||
// Multipart-aware streaming upload — used by the promotion worker to push
|
||||
|
|
@ -99,15 +135,11 @@ export const uploadDirectoryToS3 = async (bucket, keyPrefix, localDir) => {
|
|||
export const uploadStreamToS3 = async (bucket, key, readable) => {
|
||||
const { Upload } = await import('@aws-sdk/lib-storage');
|
||||
const client = createS3Client();
|
||||
try {
|
||||
const upload = new Upload({
|
||||
client,
|
||||
params: { Bucket: bucket, Key: key, Body: readable },
|
||||
queueSize: 4,
|
||||
partSize: 8 * 1024 * 1024,
|
||||
});
|
||||
await upload.done();
|
||||
} finally {
|
||||
client.destroy();
|
||||
}
|
||||
const upload = new Upload({
|
||||
client,
|
||||
params: { Bucket: bucket, Key: key, Body: readable },
|
||||
queueSize: 4,
|
||||
partSize: 8 * 1024 * 1024,
|
||||
});
|
||||
await upload.done();
|
||||
};
|
||||
|
|
|
|||
Loading…
Reference in a new issue