diff --git a/services/capture/src/capture-manager.js b/services/capture/src/capture-manager.js index 5f1315f..bf57fa5 100644 --- a/services/capture/src/capture-manager.js +++ b/services/capture/src/capture-manager.js @@ -4,6 +4,48 @@ import { dirname } from 'node:path'; import { v4 as uuidv4 } from 'uuid'; import { createUploadStream } from './s3/client.js'; +/** + * Reads the first line from a spawned process's stderr stream. + * Resolves with the parsed JSON object when the first '\n' arrives. + * Rejects if the process exits with a non-zero code before emitting a line, + * or if timeoutMs elapses. + */ +function readFirstStderrLine(proc, timeoutMs = 35_000) { + return new Promise((resolve, reject) => { + let buf = ''; + let settled = false; + const settle = (fn) => { if (settled) return; settled = true; fn(); }; + + const timer = setTimeout(() => { + settle(() => reject(new Error(`deltacast-capture: timed out waiting for format JSON after ${timeoutMs}ms`))); + }, timeoutMs); + + proc.stderr.setEncoding('utf8'); + proc.stderr.on('data', (chunk) => { + buf += chunk; + const nl = buf.indexOf('\n'); + if (nl === -1) return; + const line = buf.slice(0, nl).trim(); + clearTimeout(timer); + try { + const parsed = JSON.parse(line); + if (parsed.error) { + settle(() => reject(new Error(`deltacast-capture: ${parsed.error}`))); + } else { + settle(() => resolve(parsed)); + } + } catch (e) { + settle(() => reject(new Error(`deltacast-capture: invalid JSON on stderr: ${line}`))); + } + }); + + proc.on('exit', (code) => { + clearTimeout(timer); + settle(() => reject(new Error(`deltacast-capture: exited with code ${code} before emitting format JSON`))); + }); + }); +} + const S3_BUCKET = process.env.S3_BUCKET || 'wild-dragon'; // Growing-files mode: writes the master to a local SMB-backed share that the