import { S3Client, GetObjectCommand, PutObjectCommand } from '@aws-sdk/client-s3';
import { createReadStream, createWriteStream } from 'fs';
import { readdir } from 'fs/promises';
import { join, extname } from 'path';
import { pipeline } from 'stream/promises';

// Content-Type sent for each known media file extension. Keys are lower-case
// and include the leading dot, matching what `extname()` returns.
const CONTENT_TYPES = {
  '.m3u8': 'application/vnd.apple.mpegurl',
  '.m4s': 'video/iso.segment',
  '.mp4': 'video/mp4',
};

/**
 * Build an S3 client from the S3_* environment variables.
 *
 * Reads S3_REGION (falls back to 'us-east-1' when unset or empty),
 * S3_ENDPOINT, S3_ACCESS_KEY and S3_SECRET_KEY. Path-style addressing is
 * always forced.
 *
 * Static credentials are attached only when both key variables are set, so a
 * half-configured environment fails here with a clear message instead of
 * handing the SDK a credentials object containing `undefined`. When neither is
 * set, credential resolution is left to the SDK.
 *
 * @returns {S3Client} A new client; the caller is responsible for `destroy()`.
 * @throws {Error} When exactly one of S3_ACCESS_KEY / S3_SECRET_KEY is set.
 */
const createS3Client = () => {
  const accessKeyId = process.env.S3_ACCESS_KEY;
  const secretAccessKey = process.env.S3_SECRET_KEY;

  if (Boolean(accessKeyId) !== Boolean(secretAccessKey)) {
    throw new Error('S3_ACCESS_KEY and S3_SECRET_KEY must be set together');
  }

  const config = {
    region: process.env.S3_REGION || 'us-east-1',
    endpoint: process.env.S3_ENDPOINT,
    forcePathStyle: true,
  };
  if (accessKeyId && secretAccessKey) {
    config.credentials = { accessKeyId, secretAccessKey };
  }

  return new S3Client(config);
};

/**
 * Download a single object from S3 and stream it into a local file.
 *
 * @param {string} bucket - Source bucket name.
 * @param {string} key - Object key inside the bucket.
 * @param {string} localPath - Destination file path; opened with `createWriteStream`.
 * @returns {Promise<void>} Resolves once the pipeline into the file has completed.
 * @throws {Error} When the request fails, the response carries no body, or the
 *   stream pipeline fails.
 */
export const downloadFromS3 = async (bucket, key, localPath) => {
  const client = createS3Client();
  try {
    const { Body: body } = await client.send(
      new GetObjectCommand({ Bucket: bucket, Key: key })
    );
    // Check before opening the destination so a body-less response does not
    // leave an empty file and an unclosed write stream behind.
    if (!body) {
      throw new Error(`S3 GetObject returned no body for ${bucket}/${key}`);
    }
    await pipeline(body, createWriteStream(localPath));
  } finally {
    client.destroy();
  }
};

/**
 * Upload a single local file to S3 with one PutObject request.
 *
 * No ContentType is set on the request.
 *
 * @param {string} bucket - Destination bucket name.
 * @param {string} key - Object key to write.
 * @param {string} localPath - Path of the file to read and upload.
 * @returns {Promise<void>} Resolves when the PutObject request has completed.
 * @throws {Error} When the file cannot be read or the request fails.
 */
export const uploadToS3 = async (bucket, key, localPath) => {
  const client = createS3Client();
  let readStream;
  try {
    readStream = createReadStream(localPath);
    await client.send(
      new PutObjectCommand({
        Bucket: bucket,
        Key: key,
        Body: readStream,
      })
    );
  } finally {
    // If the request fails before the body is fully consumed, the stream would
    // otherwise keep its file descriptor open. destroy() is harmless on a
    // stream that has already ended.
    readStream?.destroy();
    client.destroy();
  }
};

// Upload every file in `localDir` to `bucket` under `keyPrefix/`. Used for the
// HLS proxy output (init.mp4 + segment_*.m4s + playlist.m3u8). Each file goes
// up as its own PutObject so individual segments stay small and never trigger
// RustFS's broken byte-range path on large objects.
/**
 * Upload each regular file directly inside `localDir` as its own object.
 *
 * Sub-directories and other non-file entries are skipped. Files are uploaded
 * one at a time, in the order `readdir` returns them, to the key
 * `${keyPrefix}/${name}`. The Content-Type comes from `CONTENT_TYPES` by
 * lower-cased extension, falling back to 'application/octet-stream'.
 *
 * @param {string} bucket - Destination bucket name.
 * @param {string} keyPrefix - Key prefix; joined to each file name with a single '/'.
 * @param {string} localDir - Directory whose files are uploaded (not recursive).
 * @returns {Promise<string[]>} Names of the files that were uploaded.
 * @throws {Error} When the directory cannot be read or any upload fails; files
 *   uploaded before the failure are not rolled back.
 */
export const uploadDirectoryToS3 = async (bucket, keyPrefix, localDir) => {
  const client = createS3Client();
  try {
    const entries = await readdir(localDir, { withFileTypes: true });
    const files = entries
      .filter((entry) => entry.isFile())
      .map((entry) => entry.name);

    for (const name of files) {
      const contentType =
        CONTENT_TYPES[extname(name).toLowerCase()] || 'application/octet-stream';
      const body = createReadStream(join(localDir, name));
      try {
        await client.send(
          new PutObjectCommand({
            Bucket: bucket,
            Key: `${keyPrefix}/${name}`,
            Body: body,
            ContentType: contentType,
          })
        );
      } finally {
        // A failed request can leave the stream partially read, so release its
        // file descriptor. destroy() is harmless once the stream has ended.
        body.destroy();
      }
    }

    return files;
  } finally {
    client.destroy();
  }
};

// Multipart-aware streaming upload — used by the promotion worker to push
// large growing-file masters without buffering them entirely in memory.
/**
 * Upload a readable stream to S3 via `Upload` from `@aws-sdk/lib-storage`.
 *
 * The library is imported on first use rather than at module load. The upload
 * is configured with 8 MiB parts and a queue size of 4. The `readable` stream
 * is owned by the caller and is not destroyed here.
 *
 * @param {string} bucket - Destination bucket name.
 * @param {string} key - Object key to write.
 * @param {import('stream').Readable} readable - Source of the object body.
 * @returns {Promise<void>} Resolves when the upload has completed.
 * @throws {Error} When the library cannot be loaded or the upload fails.
 */
export const uploadStreamToS3 = async (bucket, key, readable) => {
  const { Upload } = await import('@aws-sdk/lib-storage');
  const client = createS3Client();
  try {
    const upload = new Upload({
      client,
      params: { Bucket: bucket, Key: key, Body: readable },
      queueSize: 4,
      partSize: 8 * 1024 * 1024,
    });
    await upload.done();
  } finally {
    client.destroy();
  }
};