From 9ad88e4df40718e7ea286431344b6e1938f9f40a Mon Sep 17 00:00:00 2001 From: Zac Gaetano Date: Sat, 23 May 2026 16:05:41 -0400 Subject: [PATCH] =?UTF-8?q?feat(ingest):=20YouTube=20importer=20=E2=80=94?= =?UTF-8?q?=20paste=20link,=20asset=20travels=20normal=20pipeline?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds Ingest → YouTube. UI takes a URL + project, API enqueues a BullMQ "import" job, worker shells out to yt-dlp, lands the MP4 in S3 at the same originals/{assetId}/... path uploads use, then hands off to the existing proxy queue. Imported assets share one lifecycle with uploads from that point on. Worker container picks up yt-dlp + python3 (apk on alpine, apt on the GPU variant). The new 'import' queue is registered in jobs.js so it appears in the Jobs SSE stream and retry/delete work for free. Spec: docs/superpowers/specs/2026-05-23-youtube-importer-design.md Co-Authored-By: Claude Opus 4.7 --- .../src/db/migrations/011-youtube-import.sql | 15 ++ services/mam-api/src/index.js | 2 + services/mam-api/src/routes/imports.js | 96 ++++++++ services/mam-api/src/routes/jobs.js | 2 + services/web-ui/public/app.jsx | 2 + services/web-ui/public/screens-ingest.jsx | 194 ++++++++++++++- services/web-ui/public/screens-jobs.jsx | 2 +- services/web-ui/public/shell.jsx | 3 +- services/worker/Dockerfile | 3 +- services/worker/Dockerfile.gpu | 5 +- services/worker/src/index.js | 14 +- services/worker/src/workers/youtube-import.js | 223 ++++++++++++++++++ 12 files changed, 553 insertions(+), 8 deletions(-) create mode 100644 services/mam-api/src/db/migrations/011-youtube-import.sql create mode 100644 services/mam-api/src/routes/imports.js create mode 100644 services/worker/src/workers/youtube-import.js diff --git a/services/mam-api/src/db/migrations/011-youtube-import.sql b/services/mam-api/src/db/migrations/011-youtube-import.sql new file mode 100644 index 0000000..25aebc8 --- /dev/null +++ b/services/mam-api/src/db/migrations/011-youtube-import.sql @@ -0,0 +1,15 @@ +-- 2026-05: YouTube importer — new job type + remember source URL on assets. +-- +-- Job type enum gains 'youtube_import' so the Jobs screen can show imports +-- alongside proxy / thumbnail / conform. Assets gain source_url so an +-- imported asset remembers where it came from (used by the Asset Detail +-- page and, later, dedup checks). + +DO $$ +BEGIN + IF NOT EXISTS (SELECT 1 FROM pg_enum WHERE enumlabel = 'youtube_import' AND enumtypid = 'job_type'::regtype) THEN + ALTER TYPE job_type ADD VALUE 'youtube_import'; + END IF; +END $$; + +ALTER TABLE assets ADD COLUMN IF NOT EXISTS source_url TEXT; diff --git a/services/mam-api/src/index.js b/services/mam-api/src/index.js index bcff8d5..1ba708a 100644 --- a/services/mam-api/src/index.js +++ b/services/mam-api/src/index.js @@ -30,6 +30,7 @@ import sdkRouter from './routes/sdk.js'; import schedulesRouter from './routes/schedules.js'; import metricsRouter from './routes/metrics.js'; import commentsRouter from './routes/comments.js'; +import importsRouter from './routes/imports.js'; import { startSchedulerLoop } from './scheduler.js'; const app = express(); @@ -83,6 +84,7 @@ app.use('/api/v1/sdk', sdkRouter); app.use('/api/v1/schedules', schedulesRouter); app.use('/api/v1/metrics', metricsRouter); app.use('/api/v1/assets/:assetId/comments', commentsRouter); +app.use('/api/v1/imports', importsRouter); // ── Error handler ───────────────────────────────────────────────────────────── app.use(errorHandler); diff --git a/services/mam-api/src/routes/imports.js b/services/mam-api/src/routes/imports.js new file mode 100644 index 0000000..33afd64 --- /dev/null +++ b/services/mam-api/src/routes/imports.js @@ -0,0 +1,96 @@ +// External media imports — currently YouTube only. +// +// The flow mirrors upload.js: create the asset row up front with a placeholder +// filename (the worker fills in the real title once yt-dlp prints metadata), +// then enqueue a BullMQ job. The worker downloads, lands the file in S3 at the +// same originals/{assetId}/... path uploads use, and hands off to the existing +// proxy queue — so an imported asset travels the same lifecycle as any upload. + +import express from 'express'; +import { Queue } from 'bullmq'; +import { v4 as uuidv4 } from 'uuid'; +import pool from '../db/pool.js'; +import { requireAuth } from '../middleware/auth.js'; + +const router = express.Router(); +router.use(requireAuth); + +const parseRedisUrl = (url) => { + try { + const parsed = new URL(url); + return { host: parsed.hostname, port: parseInt(parsed.port, 10) || 6379 }; + } catch { + return { host: 'localhost', port: 6379 }; + } +}; + +const importQueue = new Queue('import', { + connection: parseRedisUrl(process.env.REDIS_URL || 'redis://queue:6379'), +}); + +// Match the same three forms the client UI validates against. Server is the +// authoritative check — never trust the client to have validated. +const YT_PATTERNS = [ + /^https?:\/\/(?:www\.|m\.)?youtube\.com\/watch\?[^ ]*v=[A-Za-z0-9_-]{11}/i, + /^https?:\/\/youtu\.be\/[A-Za-z0-9_-]{11}/i, + /^https?:\/\/(?:www\.)?youtube\.com\/shorts\/[A-Za-z0-9_-]{11}/i, +]; + +function isYouTubeUrl(url) { + return typeof url === 'string' && YT_PATTERNS.some((re) => re.test(url)); +} + +// POST /api/v1/imports/youtube — body { url, projectId, binId? } +router.post('/youtube', async (req, res, next) => { + try { + const { url, projectId, binId } = req.body || {}; + + if (!url || !projectId) { + return res.status(400).json({ error: 'url and projectId are required' }); + } + if (!isYouTubeUrl(url)) { + return res.status(400).json({ error: 'Invalid YouTube URL' }); + } + // A playlist URL has `list=…` — yt-dlp's --no-playlist would still grab + // the single video, but the operator probably meant "import the list" and + // we don't support that yet. Reject so the intent is explicit. + if (/[?&]list=/i.test(url)) { + return res.status(400).json({ error: "Playlists aren't supported yet" }); + } + + const projCheck = await pool.query('SELECT id FROM projects WHERE id = $1', [projectId]); + if (projCheck.rows.length === 0) { + return res.status(404).json({ error: 'Project not found' }); + } + + const assetId = uuidv4(); + + // Placeholder filename/display_name — the worker overwrites both once + // yt-dlp resolves the video title (usually within a second or two). + await pool.query( + `INSERT INTO assets ( + id, project_id, bin_id, filename, display_name, status, + media_type, original_s3_key, source_url, created_at, updated_at + ) + VALUES ($1, $2, $3, $4, $4, 'ingesting', 'video', NULL, $5, NOW(), NOW())`, + [assetId, projectId, binId || null, url, url] + ); + + const bullJob = await importQueue.add('youtube', { + assetId, + url, + // Surface the URL in the Jobs screen until the worker fills in the title. + assetName: url, + }); + + res.status(202).json({ + assetId, + jobId: `import:${bullJob.id}`, + status: 'queued', + }); + } catch (err) { + next(err); + } +}); + +export default router; diff --git a/services/mam-api/src/routes/jobs.js b/services/mam-api/src/routes/jobs.js index 3a5802d..8227006 100644 --- a/services/mam-api/src/routes/jobs.js +++ b/services/mam-api/src/routes/jobs.js @@ -21,11 +21,13 @@ const redisConn = parseRedisUrl(process.env.REDIS_URL || 'redis://queue:6379'); const proxyQueue = new Queue('proxy', { connection: redisConn }); const thumbnailQueue = new Queue('thumbnail', { connection: redisConn }); const conformQueue = new Queue('conform', { connection: redisConn }); +const importQueue = new Queue('import', { connection: redisConn }); const QUEUES = [ { queue: proxyQueue, type: 'proxy' }, { queue: thumbnailQueue, type: 'thumbnail' }, { queue: conformQueue, type: 'conform' }, + { queue: importQueue, type: 'import' }, ]; // BullMQ state → API status mapping diff --git a/services/web-ui/public/app.jsx b/services/web-ui/public/app.jsx index b144017..c98c678 100644 --- a/services/web-ui/public/app.jsx +++ b/services/web-ui/public/app.jsx @@ -41,6 +41,7 @@ function App() { library: ['Library'], projects: ['Projects'], upload: ['Ingest', 'Upload'], recorders: ['Ingest', 'Recorders'], schedule: ['Ingest', 'Schedule'], + youtube: ['Ingest', 'YouTube'], capture: ['Ingest', 'Capture'], monitors: ['Ingest', 'Monitors'], jobs: ['Jobs'], editor: ['Editor'], users: ['Admin', 'Users & Groups'], tokens: ['Admin', 'Tokens'], @@ -74,6 +75,7 @@ function App() { case 'upload': content = ; break; case 'recorders': content = setShowNewRecorder(true)} />; break; case 'schedule': content = ; break; + case 'youtube': content = ; break; case 'capture': content = ; break; case 'monitors': content = ; break; case 'jobs': content = ; break; diff --git a/services/web-ui/public/screens-ingest.jsx b/services/web-ui/public/screens-ingest.jsx index 77c24d0..c58d912 100644 --- a/services/web-ui/public/screens-ingest.jsx +++ b/services/web-ui/public/screens-ingest.jsx @@ -181,6 +181,198 @@ function Upload({ navigate }) { ); } +/* ===== YouTube importer ===== */ +// Accept the same three URL shapes the API validates against. +const _YT_PATTERNS = [ + /^https?:\/\/(?:www\.|m\.)?youtube\.com\/watch\?[^ ]*v=[A-Za-z0-9_-]{11}/i, + /^https?:\/\/youtu\.be\/[A-Za-z0-9_-]{11}/i, + /^https?:\/\/(?:www\.)?youtube\.com\/shorts\/[A-Za-z0-9_-]{11}/i, +]; +function _looksLikeYouTube(s) { + return typeof s === 'string' && _YT_PATTERNS.some(re => re.test(s.trim())); +} + +function YouTubeImport({ navigate }) { + const PROJECTS = window.ZAMPP_DATA?.PROJECTS || []; + const [projectId, setProjectId] = React.useState(PROJECTS[0]?.id || ''); + const [url, setUrl] = React.useState(''); + const [queue, setQueue] = React.useState([]); // { id, url, status, progress, title, error, assetId, jobId } + const [submitting, setSubmitting] = React.useState(false); + + const valid = _looksLikeYouTube(url); + + const updateRow = React.useCallback((id, patch) => { + setQueue(prev => prev.map(r => r.id === id ? { ...r, ...patch } : r)); + }, []); + + // Poll the asset row to pick up the title once yt-dlp resolves it, and the + // proxy job's progress so the queue row reflects the full lifecycle, not + // just the import step. + const pollRow = React.useCallback((row) => { + if (!row.assetId) return; + let stopped = false; + const tick = async () => { + if (stopped) return; + try { + const asset = await window.ZAMPP_API.fetch('/assets/' + row.assetId); + const patch = {}; + if (asset.display_name && asset.display_name !== row.url) patch.title = asset.display_name; + if (asset.status === 'ready') { + patch.status = 'done'; + patch.progress = 100; + } else if (asset.status === 'error') { + patch.status = 'error'; + patch.error = patch.error || 'Import failed — check the Jobs screen for details.'; + } else if (asset.status === 'processing') { + patch.status = 'processing'; + } + if (Object.keys(patch).length) updateRow(row.id, patch); + if (asset.status === 'ready' || asset.status === 'error') return; + } catch { /* ignore */ } + setTimeout(tick, 3000); + }; + tick(); + return () => { stopped = true; }; + }, [updateRow]); + + const submit = React.useCallback(async () => { + if (!valid || !projectId || submitting) return; + setSubmitting(true); + const rowId = Date.now(); + const row = { + id: rowId, + url: url.trim(), + status: 'queued', + progress: 0, + title: '', + error: null, + assetId: null, + jobId: null, + }; + setQueue(prev => [row, ...prev]); + try { + const res = await window.ZAMPP_API.fetch('/imports/youtube', { + method: 'POST', + body: JSON.stringify({ url: row.url, projectId }), + }); + updateRow(rowId, { assetId: res.assetId, jobId: res.jobId, status: 'downloading' }); + pollRow({ ...row, assetId: res.assetId, jobId: res.jobId }); + setUrl(''); + } catch (e) { + updateRow(rowId, { status: 'error', error: e.message || 'Failed to start import' }); + } finally { + setSubmitting(false); + } + }, [valid, projectId, submitting, url, updateRow, pollRow]); + + return ( +
+
+

YouTube

+ Paste a link — we download and import the best available MP4. +
+
+
+
+ + +
+
+ +
+ +
+ setUrl(e.target.value)} + onKeyDown={e => { if (e.key === 'Enter' && valid) submit(); }} + placeholder="https://www.youtube.com/watch?v=… or https://youtu.be/…" + style={{ flex: 1 }} + autoFocus + /> + +
+ {url && !valid && ( +
+ That doesn't look like a YouTube URL. +
+ )} +
+ Only import videos you have rights to use. Private, age-gated, and members-only videos are not supported. +
+
+ + {queue.length > 0 && ( +
+
+ Queue {queue.length} + + +
+
+ {queue.map(r => { + const statusColor = + r.status === 'done' ? 'var(--success)' : + r.status === 'error' ? 'var(--danger)' : 'var(--text-3)'; + return ( +
+ +
+
+ + {r.title || r.url} + +
+ {r.title && ( +
+ {r.url} +
+ )} +
+
+
+ {r.error && ( +
{r.error}
+ )} +
+ + {r.status === 'done' ? '✓ done' + : r.status === 'error' ? '✗ failed' + : r.status === 'processing'? 'processing' + : r.status === 'downloading' ? 'downloading' + : 'queued'} + +
+ ); + })} +
+
+ )} +
+
+ ); +} + /* ===== Live preview (HLS) ==================================== Shared by RecorderRow + MonitorTile. The capture container writes HLS segments to /live/{assetId}/index.m3u8 (see capture-manager.js @@ -1396,4 +1588,4 @@ function NewScheduleModal({ recorders, onClose, onCreated, defaultStart, default ); } -Object.assign(window, { Upload, Recorders, Capture, Monitors, Schedule }); +Object.assign(window, { Upload, Recorders, Capture, Monitors, Schedule, YouTubeImport }); diff --git a/services/web-ui/public/screens-jobs.jsx b/services/web-ui/public/screens-jobs.jsx index 5d53ff8..ebf208f 100644 --- a/services/web-ui/public/screens-jobs.jsx +++ b/services/web-ui/public/screens-jobs.jsx @@ -45,7 +45,7 @@ function Jobs({ navigate }) { const normalizeJob = (j) => { const statusMap = { waiting: 'queued', active: 'running', completed: 'done', failed: 'failed' }; - const kindMap = { proxy: 'Proxy', thumbnail: 'Thumbnail', conform: 'Conform', transcode: 'Transcode' }; + const kindMap = { proxy: 'Proxy', thumbnail: 'Thumbnail', conform: 'Conform', transcode: 'Transcode', import: 'YouTube' }; const meta = j.metadata || {}; return { ...j, diff --git a/services/web-ui/public/shell.jsx b/services/web-ui/public/shell.jsx index 7d3ebaf..a565fec 100644 --- a/services/web-ui/public/shell.jsx +++ b/services/web-ui/public/shell.jsx @@ -9,6 +9,7 @@ const NAV_TREE = [ id: "ingest", label: "Ingest", icon: "upload", group: true, children: [ { id: "upload", label: "Upload", icon: "upload" }, + { id: "youtube", label: "YouTube", icon: "download" }, { id: "recorders", label: "Recorders", icon: "record" }, { id: "schedule", label: "Schedule", icon: "jobs" }, { id: "capture", label: "Capture", icon: "capture" }, @@ -89,7 +90,7 @@ function Sidebar({ active, onNavigate, me }) { }; React.useEffect(() => { - const ingestChildren = ["upload", "recorders", "schedule", "capture", "monitors"]; + const ingestChildren = ["upload", "youtube", "recorders", "schedule", "capture", "monitors"]; if (ingestChildren.includes(active)) { setOpenGroups(prev => prev.has("ingest") ? prev : new Set([...prev, "ingest"])); } diff --git a/services/worker/Dockerfile b/services/worker/Dockerfile index c8c75e7..b6c5733 100644 --- a/services/worker/Dockerfile +++ b/services/worker/Dockerfile @@ -1,5 +1,6 @@ FROM node:20-alpine -RUN apk add --no-cache ffmpeg +# yt-dlp powers the YouTube importer; python3 is its runtime dep. +RUN apk add --no-cache ffmpeg yt-dlp python3 WORKDIR /app COPY package*.json ./ RUN npm install --omit=dev diff --git a/services/worker/Dockerfile.gpu b/services/worker/Dockerfile.gpu index 364f8a7..3dd8466 100644 --- a/services/worker/Dockerfile.gpu +++ b/services/worker/Dockerfile.gpu @@ -8,9 +8,10 @@ FROM nvcr.io/nvidia/cuda:12.3.1-base-ubuntu22.04 -# Install Node.js 20 and ffmpeg (Ubuntu's ffmpeg includes h264_nvenc/hevc_nvenc) +# Install Node.js 20, ffmpeg (Ubuntu's ffmpeg includes h264_nvenc/hevc_nvenc), +# and yt-dlp (+ python3 runtime) for the YouTube importer. RUN apt-get update && apt-get install -y --no-install-recommends \ - curl ca-certificates ffmpeg \ + curl ca-certificates ffmpeg yt-dlp python3 \ && curl -fsSL https://deb.nodesource.com/setup_20.x | bash - \ && apt-get install -y --no-install-recommends nodejs \ && apt-get clean && rm -rf /var/lib/apt/lists/* diff --git a/services/worker/src/index.js b/services/worker/src/index.js index 979b50e..bb56f1d 100644 --- a/services/worker/src/index.js +++ b/services/worker/src/index.js @@ -3,6 +3,7 @@ import { Worker } from 'bullmq'; import { proxyWorker } from './workers/proxy.js'; import { thumbnailWorker } from './workers/thumbnail.js'; import { conformWorker } from './workers/conform.js'; +import { youtubeImportWorker } from './workers/youtube-import.js'; import { startPromotionWorker } from './workers/promotion.js'; const parseRedisUrl = (url) => { @@ -16,7 +17,7 @@ const parseRedisUrl = (url) => { const redisOptions = parseRedisUrl(process.env.REDIS_URL || 'redis://localhost:6379'); -const createWorker = (queueName, handler) => { +const createWorker = (queueName, handler, overrides = {}) => { const worker = new Worker(queueName, handler, { connection: redisOptions, // Stall detection: if a worker dies mid-job, BullMQ moves it back to wait @@ -25,6 +26,7 @@ const createWorker = (queueName, handler) => { maxStalledCount: 1, lockDuration: 60000, lockRenewTime: 15000, + ...overrides, }); worker.on('completed', (job) => { @@ -50,13 +52,21 @@ const workers = [ createWorker('proxy', proxyWorker), createWorker('thumbnail', thumbnailWorker), createWorker('conform', conformWorker), + // YouTube imports: keep concurrency at 1 so we don't burn through rate + // limits when several jobs land back-to-back. Lock window is longer than + // the default because a long video download can run for minutes. + createWorker('import', youtubeImportWorker, { + concurrency: 1, + lockDuration: 10 * 60 * 1000, + lockRenewTime: 60000, + }), ]; startPromotionWorker(); console.log('Wild Dragon Worker Service started'); console.log(`Redis: ${redisOptions.host}:${redisOptions.port}`); -console.log('Active queues: proxy, thumbnail, conform'); +console.log('Active queues: proxy, thumbnail, conform, import'); console.log('Background scans: promotion (growing-files → S3)'); process.on('SIGTERM', async () => { diff --git a/services/worker/src/workers/youtube-import.js b/services/worker/src/workers/youtube-import.js new file mode 100644 index 0000000..12472ca --- /dev/null +++ b/services/worker/src/workers/youtube-import.js @@ -0,0 +1,223 @@ +// YouTube importer worker — shells out to yt-dlp, lands the resulting MP4 in +// S3 at the same originals/{assetId}/.mp4 path uploads use, then hands +// off to the existing proxy queue. From that point an imported asset is +// indistinguishable from an uploaded one. + +import { spawn } from 'node:child_process'; +import { join } from 'node:path'; +import { mkdtemp, rm, stat, readdir } from 'node:fs/promises'; +import { tmpdir } from 'node:os'; +import { Queue } from 'bullmq'; +import { query } from '../db/client.js'; +import { uploadToS3 } from '../s3/client.js'; +import { getMediaInfo } from '../ffmpeg/executor.js'; + +const S3_BUCKET = process.env.S3_BUCKET || 'wild-dragon'; + +const parseRedisUrl = (url) => { + const parsed = new URL(url); + return { host: parsed.hostname, port: parseInt(parsed.port, 10) }; +}; + +// Hand off to the existing proxy queue once the original is in S3. +const proxyQueue = new Queue('proxy', { + connection: parseRedisUrl(process.env.REDIS_URL || 'redis://queue:6379'), +}); + +// Map yt-dlp stderr lines to short, operator-friendly messages. Anything that +// doesn't match here falls back to the raw stderr (truncated). +function friendlyError(stderr) { + const s = stderr || ''; + if (/Private video/i.test(s)) return 'Private video — not supported.'; + if (/Sign in to confirm your age/i.test(s)) return 'Age-restricted video — not supported.'; + if (/members[- ]only/i.test(s)) return 'Members-only video — not supported.'; + if (/Video unavailable/i.test(s)) return 'Video unavailable or removed.'; + if (/not available in your country|geo[- ]?restricted/i.test(s)) return 'Video is geo-blocked from this region.'; + if (/HTTP Error 429/i.test(s)) return 'YouTube rate-limited the importer — try again later.'; + if (/Unable to extract|Unsupported URL/i.test(s)) return 'YouTube changed its API — worker image needs a rebuild.'; + + // Last-resort: the first ERROR: line from stderr, capped. + const m = s.match(/ERROR:\s*([^\n]+)/i); + const raw = (m ? m[1] : s).trim().slice(0, 300); + return raw || 'yt-dlp failed with no error message'; +} + +// Replace anything outside [A-Za-z0-9 ._-] with '-', collapse runs of +// whitespace/dashes, trim, cap to 120 chars. The .mp4 extension is appended +// by the caller. If the result is empty we fall back to the video ID. +function sanitizeTitle(title, videoId) { + if (!title || typeof title !== 'string') return `youtube-${videoId}`; + let out = title + .replace(/[^\w .\-]+/g, '-') + .replace(/[-\s]+/g, ' ') + .trim() + .slice(0, 120); + if (!out) out = `youtube-${videoId}`; + return out; +} + +// Run yt-dlp with progress streaming. Returns the parsed --print-json line. +// Throws if yt-dlp exits non-zero — the caller maps stderr to a friendly msg. +async function runYtDlp({ url, outputTemplate, onProgress }) { + return new Promise((resolve, reject) => { + const args = [ + '--no-playlist', + '--no-warnings', + '--restrict-filenames', + '-f', "bv*[ext=mp4]+ba[ext=m4a]/b[ext=mp4]/b", + '--merge-output-format', 'mp4', + '--print-json', + '--newline', + '-o', outputTemplate, + url, + ]; + + const proc = spawn('yt-dlp', args); + let stdoutBuf = ''; + let stderrBuf = ''; + let lastJsonLine = null; + + proc.stdout.on('data', (chunk) => { + stdoutBuf += chunk.toString(); + let nl; + while ((nl = stdoutBuf.indexOf('\n')) !== -1) { + const line = stdoutBuf.slice(0, nl); + stdoutBuf = stdoutBuf.slice(nl + 1); + + // [download] 42.3% of 53.21MiB at 4.21MiB/s ETA 00:07 + const m = line.match(/\[download\]\s+(\d+(?:\.\d+)?)%/); + if (m && onProgress) onProgress(parseFloat(m[1])); + + // --print-json emits one JSON line at the end of a successful download. + if (line.startsWith('{') && line.endsWith('}')) { + try { lastJsonLine = JSON.parse(line); } catch { /* not the json line */ } + } + } + }); + + proc.stderr.on('data', (chunk) => { stderrBuf += chunk.toString(); }); + + proc.on('error', (err) => reject(new Error(`Failed to spawn yt-dlp: ${err.message}`))); + + proc.on('close', (code) => { + if (code === 0) { + resolve(lastJsonLine || {}); + } else { + const err = new Error(friendlyError(stderrBuf)); + err.stderr = stderrBuf; + err.code = code; + reject(err); + } + }); + }); +} + +export const youtubeImportWorker = async (job) => { + const { assetId, url } = job.data; + + // Each job gets its own temp directory so concurrent jobs (if we ever bump + // concurrency above 1) can't clobber each other's intermediate files. + const workDir = await mkdtemp(join(tmpdir(), `yt-${job.id}-`)); + const outputTemplate = join(workDir, `${assetId}.%(ext)s`); + + try { + console.log(`[youtube] Asset ${assetId}: importing ${url}`); + await job.updateProgress(2); + + // yt-dlp does the work; progress 5..60 covers the download. + const meta = await runYtDlp({ + url, + outputTemplate, + onProgress: async (pct) => { + const mapped = 5 + Math.floor(pct * 0.55); + try { await job.updateProgress(mapped); } catch { /* ignore */ } + }, + }); + + await job.updateProgress(62); + + // Find the resulting MP4 — yt-dlp's --merge-output-format ensures .mp4 + // but we scan the dir defensively in case the format string changes. + const files = await readdir(workDir); + const mp4 = files.find((f) => f.endsWith('.mp4')); + if (!mp4) { + throw new Error(`yt-dlp produced no .mp4 in ${workDir} (got ${files.join(', ') || 'nothing'})`); + } + const localPath = join(workDir, mp4); + + const { size: fileSize } = await stat(localPath); + if (fileSize < 4096) { + throw new Error(`Downloaded file is suspiciously small (${fileSize} bytes) — aborting before upload.`); + } + + // ffprobe the file ourselves — yt-dlp's metadata is sometimes missing or + // wrong (especially fps), and we already trust ffprobe everywhere else. + let mediaInfo = {}; + try { + mediaInfo = await getMediaInfo(localPath); + } catch (err) { + console.warn(`[youtube] getMediaInfo failed for ${assetId}: ${err.message}`); + } + + const videoId = meta.id || mp4.replace(/\..+$/, '').replace(/^.*-/, ''); + const sanitized = sanitizeTitle(meta.title || meta.fulltitle, videoId); + const filename = `${sanitized}.mp4`; + const originalKey = `originals/${assetId}/${filename}`; + + await job.updateProgress(70); + console.log(`[youtube] Uploading ${localPath} → s3://${S3_BUCKET}/${originalKey} (${fileSize} bytes)`); + await uploadToS3(S3_BUCKET, originalKey, localPath); + + await job.updateProgress(90); + + // Backfill the asset row with the real title + S3 key + ffprobe metadata, + // then flip to 'processing' so the rest of the UI treats it like any + // freshly-uploaded asset. + await query( + `UPDATE assets + SET filename = $1, + display_name = $2, + original_s3_key = $3, + codec = COALESCE($4, codec), + resolution = COALESCE($5, resolution), + fps = COALESCE($6, fps), + duration_ms = COALESCE($7, duration_ms), + file_size = COALESCE($8, file_size), + status = 'processing', + updated_at = NOW() + WHERE id = $9`, + [ + filename, + meta.title || meta.fulltitle || filename, + originalKey, + mediaInfo.codec ?? null, + mediaInfo.resolution ?? null, + mediaInfo.fps ?? null, + mediaInfo.durationMs ?? null, + mediaInfo.fileSizeBytes ?? fileSize, + assetId, + ] + ); + + // Hand off to the proxy queue — identical payload shape to upload.js so + // the proxy worker doesn't need to know this came from an import. + await proxyQueue.add('generate', { + assetId, + inputKey: originalKey, + outputKey: `proxies/${assetId}.mp4`, + }); + + console.log(`[youtube] Asset ${assetId} imported (${meta.title || 'untitled'}); proxy job queued`); + await job.updateProgress(100); + return { assetId, originalKey }; + } catch (error) { + console.error(`[youtube] Import failed for asset ${assetId}:`, error.message); + await query( + `UPDATE assets SET status = 'error', updated_at = NOW() WHERE id = $1`, + [assetId] + ); + throw error; + } finally { + await rm(workDir, { recursive: true, force: true }).catch(() => {}); + } +};