feat(ingest): YouTube importer — paste link, asset travels normal pipeline
Adds Ingest → YouTube. UI takes a URL + project, API enqueues a BullMQ
"import" job, worker shells out to yt-dlp, lands the MP4 in S3 at the
same originals/{assetId}/... path uploads use, then hands off to the
existing proxy queue. Imported assets share one lifecycle with uploads
from that point on.
Worker container picks up yt-dlp + python3 (apk on alpine, apt on the
GPU variant). The new 'import' queue is registered in jobs.js so it
appears in the Jobs SSE stream and retry/delete work for free.
Spec: docs/superpowers/specs/2026-05-23-youtube-importer-design.md
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
parent
7a2710dc9a
commit
9ad88e4df4
12 changed files with 553 additions and 8 deletions
15
services/mam-api/src/db/migrations/011-youtube-import.sql
Normal file
15
services/mam-api/src/db/migrations/011-youtube-import.sql
Normal file
|
|
@ -0,0 +1,15 @@
|
|||
-- 2026-05: YouTube importer — new job type + remember source URL on assets.
|
||||
--
|
||||
-- Job type enum gains 'youtube_import' so the Jobs screen can show imports
|
||||
-- alongside proxy / thumbnail / conform. Assets gain source_url so an
|
||||
-- imported asset remembers where it came from (used by the Asset Detail
|
||||
-- page and, later, dedup checks).
|
||||
|
||||
-- Register the new job type idempotently. ADD VALUE IF NOT EXISTS is the
-- built-in guard for re-runs, so no manual pg_enum lookup or DO block is needed.
ALTER TYPE job_type ADD VALUE IF NOT EXISTS 'youtube_import';
|
||||
|
||||
ALTER TABLE assets ADD COLUMN IF NOT EXISTS source_url TEXT;
|
||||
|
|
@ -30,6 +30,7 @@ import sdkRouter from './routes/sdk.js';
|
|||
import schedulesRouter from './routes/schedules.js';
|
||||
import metricsRouter from './routes/metrics.js';
|
||||
import commentsRouter from './routes/comments.js';
|
||||
import importsRouter from './routes/imports.js';
|
||||
import { startSchedulerLoop } from './scheduler.js';
|
||||
|
||||
const app = express();
|
||||
|
|
@ -83,6 +84,7 @@ app.use('/api/v1/sdk', sdkRouter);
|
|||
app.use('/api/v1/schedules', schedulesRouter);
|
||||
app.use('/api/v1/metrics', metricsRouter);
|
||||
app.use('/api/v1/assets/:assetId/comments', commentsRouter);
|
||||
app.use('/api/v1/imports', importsRouter);
|
||||
|
||||
// ── Error handler ─────────────────────────────────────────────────────────────
|
||||
app.use(errorHandler);
|
||||
|
|
|
|||
96
services/mam-api/src/routes/imports.js
Normal file
96
services/mam-api/src/routes/imports.js
Normal file
|
|
@ -0,0 +1,96 @@
|
|||
// External media imports — currently YouTube only.
|
||||
//
|
||||
// The flow mirrors upload.js: create the asset row up front with a placeholder
|
||||
// filename (the worker fills in the real title once yt-dlp prints metadata),
|
||||
// then enqueue a BullMQ job. The worker downloads, lands the file in S3 at the
|
||||
// same originals/{assetId}/... path uploads use, and hands off to the existing
|
||||
// proxy queue — so an imported asset travels the same lifecycle as any upload.
|
||||
|
||||
import express from 'express';
|
||||
import { Queue } from 'bullmq';
|
||||
import { v4 as uuidv4 } from 'uuid';
|
||||
import pool from '../db/pool.js';
|
||||
import { requireAuth } from '../middleware/auth.js';
|
||||
|
||||
const router = express.Router();
|
||||
router.use(requireAuth);
|
||||
|
||||
/**
 * Reduce a redis:// URL to the { host, port } pair used for the BullMQ
 * connection.
 *
 * @param {string} url - Redis connection URL.
 * @returns {{ host: string, port: number }} Parsed host and port. The port
 *   falls back to 6379 when absent; an unparseable URL yields localhost:6379.
 */
const parseRedisUrl = (url) => {
  let target;
  try {
    target = new URL(url);
  } catch {
    // Not a URL at all — use the conventional local default.
    return { host: 'localhost', port: 6379 };
  }

  const { hostname, port } = target;
  const numericPort = parseInt(port, 10);
  return { host: hostname, port: numericPort || 6379 };
};
|
||||
|
||||
const importQueue = new Queue('import', {
|
||||
connection: parseRedisUrl(process.env.REDIS_URL || 'redis://queue:6379'),
|
||||
});
|
||||
|
||||
// Accepted link shapes: youtube.com/watch?…v=<id>, youtu.be/<id> and
// youtube.com/shorts/<id>, where <id> is an 11-character video ID. These
// mirror what the client UI checks, but this server-side check is the one that
// counts — the client may not have validated anything.
const YT_PATTERNS = [
  /^https?:\/\/(?:www\.|m\.)?youtube\.com\/watch\?[^ ]*v=[A-Za-z0-9_-]{11}/i,
  /^https?:\/\/youtu\.be\/[A-Za-z0-9_-]{11}/i,
  /^https?:\/\/(?:www\.)?youtube\.com\/shorts\/[A-Za-z0-9_-]{11}/i,
];

/**
 * Report whether `url` is a string matching one of the accepted YouTube forms.
 *
 * @param {unknown} url - Candidate value taken from the request body.
 * @returns {boolean} True only for strings that match a pattern.
 */
function isYouTubeUrl(url) {
  if (typeof url !== 'string') return false;
  for (const pattern of YT_PATTERNS) {
    if (pattern.test(url)) return true;
  }
  return false;
}
|
||||
|
||||
// POST /api/v1/imports/youtube — body { url, projectId, binId? }
//
// Validates the link, creates a placeholder asset row, and enqueues a
// 'youtube' job on the import queue.
//   202 → { assetId, jobId: 'import:<bullmq id>', status: 'queued' }
//   400 → missing fields, non-YouTube URL, or a playlist link
//   404 → unknown project
// Any other failure is forwarded to the Express error handler.
router.post('/youtube', async (req, res, next) => {
  try {
    const { url, projectId, binId } = req.body || {};

    if (!url || !projectId) {
      return res.status(400).json({ error: 'url and projectId are required' });
    }
    if (!isYouTubeUrl(url)) {
      return res.status(400).json({ error: 'Invalid YouTube URL' });
    }
    // A playlist URL has `list=…` — yt-dlp's --no-playlist would still grab
    // the single video, but the operator probably meant "import the list" and
    // we don't support that yet. Reject so the intent is explicit.
    if (/[?&]list=/i.test(url)) {
      return res.status(400).json({ error: "Playlists aren't supported yet" });
    }

    const projCheck = await pool.query('SELECT id FROM projects WHERE id = $1', [projectId]);
    if (projCheck.rows.length === 0) {
      return res.status(404).json({ error: 'Project not found' });
    }

    const assetId = uuidv4();

    // Placeholder filename/display_name — the worker overwrites both once
    // yt-dlp resolves the video title (usually within a second or two).
    await pool.query(
      `INSERT INTO assets (
         id, project_id, bin_id, filename, display_name, status,
         media_type, original_s3_key, source_url, created_at, updated_at
       )
       VALUES ($1, $2, $3, $4, $4, 'ingesting', 'video', NULL, $5, NOW(), NOW())`,
      [assetId, projectId, binId || null, url, url]
    );

    let bullJob;
    try {
      bullJob = await importQueue.add('youtube', {
        assetId,
        url,
        // Surface the URL in the Jobs screen until the worker fills in the title.
        assetName: url,
      });
    } catch (enqueueErr) {
      // The row already exists but no job will ever pick it up, so it would
      // sit in 'ingesting' forever. Flag it as failed, then surface the
      // original enqueue error (a failure of this cleanup must not mask it).
      try {
        await pool.query(
          `UPDATE assets SET status = 'error', updated_at = NOW() WHERE id = $1`,
          [assetId]
        );
      } catch (cleanupErr) {
        console.error(`[imports] Could not mark asset ${assetId} as errored:`, cleanupErr.message);
      }
      throw enqueueErr;
    }

    res.status(202).json({
      assetId,
      jobId: `import:${bullJob.id}`,
      status: 'queued',
    });
  } catch (err) {
    next(err);
  }
});
|
||||
|
||||
export default router;
|
||||
|
|
@ -21,11 +21,13 @@ const redisConn = parseRedisUrl(process.env.REDIS_URL || 'redis://queue:6379');
|
|||
const proxyQueue = new Queue('proxy', { connection: redisConn });
|
||||
const thumbnailQueue = new Queue('thumbnail', { connection: redisConn });
|
||||
const conformQueue = new Queue('conform', { connection: redisConn });
|
||||
const importQueue = new Queue('import', { connection: redisConn });
|
||||
|
||||
const QUEUES = [
|
||||
{ queue: proxyQueue, type: 'proxy' },
|
||||
{ queue: thumbnailQueue, type: 'thumbnail' },
|
||||
{ queue: conformQueue, type: 'conform' },
|
||||
{ queue: importQueue, type: 'import' },
|
||||
];
|
||||
|
||||
// BullMQ state → API status mapping
|
||||
|
|
|
|||
|
|
@ -41,6 +41,7 @@ function App() {
|
|||
library: ['Library'], projects: ['Projects'],
|
||||
upload: ['Ingest', 'Upload'], recorders: ['Ingest', 'Recorders'],
|
||||
schedule: ['Ingest', 'Schedule'],
|
||||
youtube: ['Ingest', 'YouTube'],
|
||||
capture: ['Ingest', 'Capture'], monitors: ['Ingest', 'Monitors'],
|
||||
jobs: ['Jobs'], editor: ['Editor'],
|
||||
users: ['Admin', 'Users & Groups'], tokens: ['Admin', 'Tokens'],
|
||||
|
|
@ -74,6 +75,7 @@ function App() {
|
|||
case 'upload': content = <Upload navigate={navigate} />; break;
|
||||
case 'recorders': content = <Recorders navigate={navigate} onNew={() => setShowNewRecorder(true)} />; break;
|
||||
case 'schedule': content = <Schedule navigate={navigate} />; break;
|
||||
case 'youtube': content = <YouTubeImport navigate={navigate} />; break;
|
||||
case 'capture': content = <Capture navigate={navigate} />; break;
|
||||
case 'monitors': content = <Monitors navigate={navigate} />; break;
|
||||
case 'jobs': content = <Jobs navigate={navigate} />; break;
|
||||
|
|
|
|||
|
|
@ -181,6 +181,198 @@ function Upload({ navigate }) {
|
|||
);
|
||||
}
|
||||
|
||||
/* ===== YouTube importer ===== */
|
||||
// Client-side pre-check only: the same three link shapes the API accepts
// (watch?v=, youtu.be/, shorts/), each followed by an 11-character video ID.
const _YT_PATTERNS = [
  /^https?:\/\/(?:www\.|m\.)?youtube\.com\/watch\?[^ ]*v=[A-Za-z0-9_-]{11}/i,
  /^https?:\/\/youtu\.be\/[A-Za-z0-9_-]{11}/i,
  /^https?:\/\/(?:www\.)?youtube\.com\/shorts\/[A-Za-z0-9_-]{11}/i,
];

/**
 * Whether the text in the URL field looks like an importable YouTube link.
 * Surrounding whitespace is ignored, so a pasted link with a stray space or
 * newline still validates.
 *
 * @param {unknown} s - Current input value.
 * @returns {boolean} True when `s` is a string matching one of the patterns.
 */
function _looksLikeYouTube(s) {
  if (typeof s !== 'string') return false;
  const candidate = s.trim();
  for (const pattern of _YT_PATTERNS) {
    if (pattern.test(candidate)) return true;
  }
  return false;
}
|
||||
|
||||
function YouTubeImport({ navigate }) {
|
||||
const PROJECTS = window.ZAMPP_DATA?.PROJECTS || [];
|
||||
const [projectId, setProjectId] = React.useState(PROJECTS[0]?.id || '');
|
||||
const [url, setUrl] = React.useState('');
|
||||
const [queue, setQueue] = React.useState([]); // { id, url, status, progress, title, error, assetId, jobId }
|
||||
const [submitting, setSubmitting] = React.useState(false);
|
||||
|
||||
const valid = _looksLikeYouTube(url);
|
||||
|
||||
const updateRow = React.useCallback((id, patch) => {
|
||||
setQueue(prev => prev.map(r => r.id === id ? { ...r, ...patch } : r));
|
||||
}, []);
|
||||
|
||||
// Poll the asset row to pick up the title once yt-dlp resolves it, and the
|
||||
// proxy job's progress so the queue row reflects the full lifecycle, not
|
||||
// just the import step.
|
||||
const pollRow = React.useCallback((row) => {
|
||||
if (!row.assetId) return;
|
||||
let stopped = false;
|
||||
const tick = async () => {
|
||||
if (stopped) return;
|
||||
try {
|
||||
const asset = await window.ZAMPP_API.fetch('/assets/' + row.assetId);
|
||||
const patch = {};
|
||||
if (asset.display_name && asset.display_name !== row.url) patch.title = asset.display_name;
|
||||
if (asset.status === 'ready') {
|
||||
patch.status = 'done';
|
||||
patch.progress = 100;
|
||||
} else if (asset.status === 'error') {
|
||||
patch.status = 'error';
|
||||
patch.error = patch.error || 'Import failed — check the Jobs screen for details.';
|
||||
} else if (asset.status === 'processing') {
|
||||
patch.status = 'processing';
|
||||
}
|
||||
if (Object.keys(patch).length) updateRow(row.id, patch);
|
||||
if (asset.status === 'ready' || asset.status === 'error') return;
|
||||
} catch { /* ignore */ }
|
||||
setTimeout(tick, 3000);
|
||||
};
|
||||
tick();
|
||||
return () => { stopped = true; };
|
||||
}, [updateRow]);
|
||||
|
||||
const submit = React.useCallback(async () => {
|
||||
if (!valid || !projectId || submitting) return;
|
||||
setSubmitting(true);
|
||||
const rowId = Date.now();
|
||||
const row = {
|
||||
id: rowId,
|
||||
url: url.trim(),
|
||||
status: 'queued',
|
||||
progress: 0,
|
||||
title: '',
|
||||
error: null,
|
||||
assetId: null,
|
||||
jobId: null,
|
||||
};
|
||||
setQueue(prev => [row, ...prev]);
|
||||
try {
|
||||
const res = await window.ZAMPP_API.fetch('/imports/youtube', {
|
||||
method: 'POST',
|
||||
body: JSON.stringify({ url: row.url, projectId }),
|
||||
});
|
||||
updateRow(rowId, { assetId: res.assetId, jobId: res.jobId, status: 'downloading' });
|
||||
pollRow({ ...row, assetId: res.assetId, jobId: res.jobId });
|
||||
setUrl('');
|
||||
} catch (e) {
|
||||
updateRow(rowId, { status: 'error', error: e.message || 'Failed to start import' });
|
||||
} finally {
|
||||
setSubmitting(false);
|
||||
}
|
||||
}, [valid, projectId, submitting, url, updateRow, pollRow]);
|
||||
|
||||
return (
|
||||
<div className="page">
|
||||
<div className="page-header">
|
||||
<h1>YouTube</h1>
|
||||
<span className="subtitle">Paste a link — we download and import the best available MP4.</span>
|
||||
</div>
|
||||
<div className="page-body">
|
||||
<div style={{ display: 'grid', gridTemplateColumns: '1fr 1fr', gap: 12, marginBottom: 16 }}>
|
||||
<div>
|
||||
<label className="field-label">Project</label>
|
||||
<select className="field-input" value={projectId} onChange={e => setProjectId(e.target.value)}
|
||||
style={{ appearance: 'auto' }}>
|
||||
{PROJECTS.length === 0
|
||||
? <option value="">No projects</option>
|
||||
: PROJECTS.map(p => <option key={p.id} value={p.id}>{p.name}</option>)}
|
||||
</select>
|
||||
</div>
|
||||
</div>
|
||||
|
||||
<div className="field" style={{ marginBottom: 8 }}>
|
||||
<label className="field-label">YouTube URL</label>
|
||||
<div style={{ display: 'flex', gap: 8 }}>
|
||||
<input
|
||||
className="field-input mono"
|
||||
value={url}
|
||||
onChange={e => setUrl(e.target.value)}
|
||||
onKeyDown={e => { if (e.key === 'Enter' && valid) submit(); }}
|
||||
placeholder="https://www.youtube.com/watch?v=… or https://youtu.be/…"
|
||||
style={{ flex: 1 }}
|
||||
autoFocus
|
||||
/>
|
||||
<button
|
||||
className="btn primary"
|
||||
onClick={submit}
|
||||
disabled={!valid || !projectId || submitting}
|
||||
title={!valid ? 'Paste a YouTube URL (youtube.com/watch, youtu.be, or shorts)' : ''}
|
||||
>
|
||||
<Icon name="download" />Import
|
||||
</button>
|
||||
</div>
|
||||
{url && !valid && (
|
||||
<div style={{ fontSize: 11.5, color: 'var(--danger)', marginTop: 4 }}>
|
||||
That doesn't look like a YouTube URL.
|
||||
</div>
|
||||
)}
|
||||
<div style={{ fontSize: 11.5, color: 'var(--text-3)', marginTop: 6 }}>
|
||||
Only import videos you have rights to use. Private, age-gated, and members-only videos are not supported.
|
||||
</div>
|
||||
</div>
|
||||
|
||||
{queue.length > 0 && (
|
||||
<div style={{ marginTop: 20 }}>
|
||||
<div style={{ fontSize: 13, fontWeight: 600, marginBottom: 12, display: 'flex', alignItems: 'center', gap: 8 }}>
|
||||
Queue <span className="badge neutral">{queue.length}</span>
|
||||
<span style={{ flex: 1 }} />
|
||||
<button className="btn ghost sm" onClick={() => setQueue(q => q.filter(r => r.status !== 'done' && r.status !== 'error'))}>
|
||||
Clear finished
|
||||
</button>
|
||||
</div>
|
||||
<div className="panel">
|
||||
{queue.map(r => {
|
||||
const statusColor =
|
||||
r.status === 'done' ? 'var(--success)' :
|
||||
r.status === 'error' ? 'var(--danger)' : 'var(--text-3)';
|
||||
return (
|
||||
<div key={r.id} className="upload-row">
|
||||
<Icon name="link" size={16} style={{ color: 'var(--text-3)' }} />
|
||||
<div style={{ flex: 1, minWidth: 0 }}>
|
||||
<div style={{ display: 'flex', gap: 8, alignItems: 'center' }}>
|
||||
<span style={{ fontWeight: 500, fontSize: 12.5, overflow: 'hidden', textOverflow: 'ellipsis', whiteSpace: 'nowrap' }}>
|
||||
{r.title || r.url}
|
||||
</span>
|
||||
</div>
|
||||
{r.title && (
|
||||
<div className="muted mono" style={{ fontSize: 10.5, marginTop: 2, overflow: 'hidden', textOverflow: 'ellipsis', whiteSpace: 'nowrap' }} title={r.url}>
|
||||
{r.url}
|
||||
</div>
|
||||
)}
|
||||
<div style={{ marginTop: 6, height: 4, background: 'var(--bg-3)', borderRadius: 99, overflow: 'hidden' }}>
|
||||
<div style={{
|
||||
width: (r.status === 'done' ? 100 : r.progress) + '%',
|
||||
height: '100%',
|
||||
background: r.status === 'done' ? 'var(--success)' : r.status === 'error' ? 'var(--danger)' : 'var(--accent)',
|
||||
transition: 'width 200ms',
|
||||
}} />
|
||||
</div>
|
||||
{r.error && (
|
||||
<div style={{ marginTop: 3, fontSize: 11, color: 'var(--danger)' }}>{r.error}</div>
|
||||
)}
|
||||
</div>
|
||||
<span className="mono" style={{ fontSize: 11.5, minWidth: 88, textAlign: 'right', color: statusColor }}>
|
||||
{r.status === 'done' ? '✓ done'
|
||||
: r.status === 'error' ? '✗ failed'
|
||||
: r.status === 'processing'? 'processing'
|
||||
: r.status === 'downloading' ? 'downloading'
|
||||
: 'queued'}
|
||||
</span>
|
||||
</div>
|
||||
);
|
||||
})}
|
||||
</div>
|
||||
</div>
|
||||
)}
|
||||
</div>
|
||||
</div>
|
||||
);
|
||||
}
|
||||
|
||||
/* ===== Live preview (HLS) ====================================
|
||||
Shared by RecorderRow + MonitorTile. The capture container writes
|
||||
HLS segments to /live/{assetId}/index.m3u8 (see capture-manager.js
|
||||
|
|
@ -1396,4 +1588,4 @@ function NewScheduleModal({ recorders, onClose, onCreated, defaultStart, default
|
|||
);
|
||||
}
|
||||
|
||||
Object.assign(window, { Upload, Recorders, Capture, Monitors, Schedule });
|
||||
Object.assign(window, { Upload, Recorders, Capture, Monitors, Schedule, YouTubeImport });
|
||||
|
|
|
|||
|
|
@ -45,7 +45,7 @@ function Jobs({ navigate }) {
|
|||
|
||||
const normalizeJob = (j) => {
|
||||
const statusMap = { waiting: 'queued', active: 'running', completed: 'done', failed: 'failed' };
|
||||
const kindMap = { proxy: 'Proxy', thumbnail: 'Thumbnail', conform: 'Conform', transcode: 'Transcode' };
|
||||
const kindMap = { proxy: 'Proxy', thumbnail: 'Thumbnail', conform: 'Conform', transcode: 'Transcode', import: 'YouTube' };
|
||||
const meta = j.metadata || {};
|
||||
return {
|
||||
...j,
|
||||
|
|
|
|||
|
|
@ -9,6 +9,7 @@ const NAV_TREE = [
|
|||
id: "ingest", label: "Ingest", icon: "upload", group: true,
|
||||
children: [
|
||||
{ id: "upload", label: "Upload", icon: "upload" },
|
||||
{ id: "youtube", label: "YouTube", icon: "download" },
|
||||
{ id: "recorders", label: "Recorders", icon: "record" },
|
||||
{ id: "schedule", label: "Schedule", icon: "jobs" },
|
||||
{ id: "capture", label: "Capture", icon: "capture" },
|
||||
|
|
@ -89,7 +90,7 @@ function Sidebar({ active, onNavigate, me }) {
|
|||
};
|
||||
|
||||
React.useEffect(() => {
|
||||
const ingestChildren = ["upload", "recorders", "schedule", "capture", "monitors"];
|
||||
const ingestChildren = ["upload", "youtube", "recorders", "schedule", "capture", "monitors"];
|
||||
if (ingestChildren.includes(active)) {
|
||||
setOpenGroups(prev => prev.has("ingest") ? prev : new Set([...prev, "ingest"]));
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,5 +1,6 @@
|
|||
FROM node:20-alpine
|
||||
RUN apk add --no-cache ffmpeg
|
||||
# yt-dlp powers the YouTube importer; python3 is its runtime dep.
|
||||
RUN apk add --no-cache ffmpeg yt-dlp python3
|
||||
WORKDIR /app
|
||||
COPY package*.json ./
|
||||
RUN npm install --omit=dev
|
||||
|
|
|
|||
|
|
@ -8,9 +8,10 @@
|
|||
|
||||
FROM nvcr.io/nvidia/cuda:12.3.1-base-ubuntu22.04
|
||||
|
||||
# Install Node.js 20 and ffmpeg (Ubuntu's ffmpeg includes h264_nvenc/hevc_nvenc)
|
||||
# Install Node.js 20, ffmpeg (Ubuntu's ffmpeg includes h264_nvenc/hevc_nvenc),
|
||||
# and yt-dlp (+ python3 runtime) for the YouTube importer.
|
||||
RUN apt-get update && apt-get install -y --no-install-recommends \
|
||||
curl ca-certificates ffmpeg \
|
||||
curl ca-certificates ffmpeg yt-dlp python3 \
|
||||
&& curl -fsSL https://deb.nodesource.com/setup_20.x | bash - \
|
||||
&& apt-get install -y --no-install-recommends nodejs \
|
||||
&& apt-get clean && rm -rf /var/lib/apt/lists/*
|
||||
|
|
|
|||
|
|
@ -3,6 +3,7 @@ import { Worker } from 'bullmq';
|
|||
import { proxyWorker } from './workers/proxy.js';
|
||||
import { thumbnailWorker } from './workers/thumbnail.js';
|
||||
import { conformWorker } from './workers/conform.js';
|
||||
import { youtubeImportWorker } from './workers/youtube-import.js';
|
||||
import { startPromotionWorker } from './workers/promotion.js';
|
||||
|
||||
const parseRedisUrl = (url) => {
|
||||
|
|
@ -16,7 +17,7 @@ const parseRedisUrl = (url) => {
|
|||
|
||||
const redisOptions = parseRedisUrl(process.env.REDIS_URL || 'redis://localhost:6379');
|
||||
|
||||
const createWorker = (queueName, handler) => {
|
||||
const createWorker = (queueName, handler, overrides = {}) => {
|
||||
const worker = new Worker(queueName, handler, {
|
||||
connection: redisOptions,
|
||||
// Stall detection: if a worker dies mid-job, BullMQ moves it back to wait
|
||||
|
|
@ -25,6 +26,7 @@ const createWorker = (queueName, handler) => {
|
|||
maxStalledCount: 1,
|
||||
lockDuration: 60000,
|
||||
lockRenewTime: 15000,
|
||||
...overrides,
|
||||
});
|
||||
|
||||
worker.on('completed', (job) => {
|
||||
|
|
@ -50,13 +52,21 @@ const workers = [
|
|||
createWorker('proxy', proxyWorker),
|
||||
createWorker('thumbnail', thumbnailWorker),
|
||||
createWorker('conform', conformWorker),
|
||||
// YouTube imports: keep concurrency at 1 so we don't burn through rate
|
||||
// limits when several jobs land back-to-back. Lock window is longer than
|
||||
// the default because a long video download can run for minutes.
|
||||
createWorker('import', youtubeImportWorker, {
|
||||
concurrency: 1,
|
||||
lockDuration: 10 * 60 * 1000,
|
||||
lockRenewTime: 60000,
|
||||
}),
|
||||
];
|
||||
|
||||
startPromotionWorker();
|
||||
|
||||
console.log('Wild Dragon Worker Service started');
|
||||
console.log(`Redis: ${redisOptions.host}:${redisOptions.port}`);
|
||||
console.log('Active queues: proxy, thumbnail, conform');
|
||||
console.log('Active queues: proxy, thumbnail, conform, import');
|
||||
console.log('Background scans: promotion (growing-files → S3)');
|
||||
|
||||
process.on('SIGTERM', async () => {
|
||||
|
|
|
|||
223
services/worker/src/workers/youtube-import.js
Normal file
223
services/worker/src/workers/youtube-import.js
Normal file
|
|
@ -0,0 +1,223 @@
|
|||
// YouTube importer worker — shells out to yt-dlp, lands the resulting MP4 in
|
||||
// S3 at the same originals/{assetId}/<title>.mp4 path uploads use, then hands
|
||||
// off to the existing proxy queue. From that point an imported asset is
|
||||
// indistinguishable from an uploaded one.
|
||||
|
||||
import { spawn } from 'node:child_process';
|
||||
import { join } from 'node:path';
|
||||
import { mkdtemp, rm, stat, readdir } from 'node:fs/promises';
|
||||
import { tmpdir } from 'node:os';
|
||||
import { Queue } from 'bullmq';
|
||||
import { query } from '../db/client.js';
|
||||
import { uploadToS3 } from '../s3/client.js';
|
||||
import { getMediaInfo } from '../ffmpeg/executor.js';
|
||||
|
||||
const S3_BUCKET = process.env.S3_BUCKET || 'wild-dragon';
|
||||
|
||||
/**
 * Reduce a redis:// URL to the { host, port } pair used for the BullMQ
 * connection.
 *
 * @param {string} url - Redis connection URL.
 * @returns {{ host: string, port: number }} Host and port; the port defaults
 *   to 6379 when the URL does not carry one.
 * @throws {TypeError} If `url` cannot be parsed as a URL.
 */
const parseRedisUrl = (url) => {
  const parsed = new URL(url);
  // `parsed.port` is '' when the URL omits it, which parseInt turns into NaN;
  // fall back to the standard Redis port instead of handing NaN to the client.
  return { host: parsed.hostname, port: parseInt(parsed.port, 10) || 6379 };
};
|
||||
|
||||
// Hand off to the existing proxy queue once the original is in S3.
|
||||
const proxyQueue = new Queue('proxy', {
|
||||
connection: parseRedisUrl(process.env.REDIS_URL || 'redis://queue:6379'),
|
||||
});
|
||||
|
||||
/**
 * Translate yt-dlp stderr output into a short, operator-facing message.
 *
 * Known failure signatures are checked in order and the first hit wins. When
 * none match, the text after the first `ERROR:` marker is used (or the whole
 * stderr if there is no marker), trimmed and capped at 300 characters.
 *
 * @param {string} [stderr] - Captured stderr text; may be empty or missing.
 * @returns {string} A non-empty message.
 */
function friendlyError(stderr) {
  const text = stderr || '';

  const knownFailures = [
    [/Private video/i, 'Private video — not supported.'],
    [/Sign in to confirm your age/i, 'Age-restricted video — not supported.'],
    [/members[- ]only/i, 'Members-only video — not supported.'],
    [/Video unavailable/i, 'Video unavailable or removed.'],
    [/not available in your country|geo[- ]?restricted/i, 'Video is geo-blocked from this region.'],
    [/HTTP Error 429/i, 'YouTube rate-limited the importer — try again later.'],
    [/Unable to extract|Unsupported URL/i, 'YouTube changed its API — worker image needs a rebuild.'],
  ];
  for (const [signature, message] of knownFailures) {
    if (signature.test(text)) return message;
  }

  // Nothing recognised: fall back to the raw error text, kept short.
  const errorLine = text.match(/ERROR:\s*([^\n]+)/i);
  const detail = errorLine ? errorLine[1] : text;
  const clipped = detail.trim().slice(0, 300);
  if (clipped) return clipped;
  return 'yt-dlp failed with no error message';
}
|
||||
|
||||
/**
 * Build a filename stem from a video title.
 *
 * Every run of characters outside [A-Za-z0-9_ .-] becomes '-', then every run
 * of dashes/whitespace collapses to a single space. The result is trimmed and
 * capped at 120 characters. The caller appends the .mp4 extension.
 *
 * @param {string} title - Video title as reported by yt-dlp.
 * @param {string} videoId - Used for the `youtube-<id>` fallback name.
 * @returns {string} The sanitized stem, or `youtube-<videoId>` when the title
 *   is missing, not a string, or sanitizes down to nothing.
 */
function sanitizeTitle(title, videoId) {
  const fallback = `youtube-${videoId}`;
  if (!title || typeof title !== 'string') return fallback;

  const cleaned = title
    .replace(/[^\w .\-]+/g, '-')
    .replace(/[-\s]+/g, ' ')
    .trim()
    .slice(0, 120)
    // The cut can land right after a space; trim again so the stem never ends
    // in whitespace (which would produce "name .mp4").
    .trimEnd();

  return cleaned || fallback;
}
|
||||
|
||||
/**
 * Run yt-dlp for a single video and stream its progress.
 *
 * @param {object} opts
 * @param {string} opts.url - Video URL handed to yt-dlp.
 * @param {string} opts.outputTemplate - yt-dlp `-o` output template.
 * @param {(pct: number) => void} [opts.onProgress] - Called with the download
 *   percentage (0–100) for each `[download] NN.N%` line seen on stdout.
 * @returns {Promise<object>} The parsed --print-json object, or `{}` if yt-dlp
 *   exited 0 without emitting one.
 * @throws {Error} If yt-dlp cannot be spawned, or exits non-zero. In the
 *   latter case the message is the friendlyError() mapping of stderr, with the
 *   raw text on `err.stderr` and the exit code on `err.code`.
 */
async function runYtDlp({ url, outputTemplate, onProgress }) {
  return new Promise((resolve, reject) => {
    const args = [
      '--no-playlist',
      '--no-warnings',
      '--restrict-filenames',
      '-f', "bv*[ext=mp4]+ba[ext=m4a]/b[ext=mp4]/b",
      '--merge-output-format', 'mp4',
      '--print-json',
      '--newline',
      '-o', outputTemplate,
      // End of options: whatever follows is a URL, never a flag.
      '--',
      url,
    ];

    const proc = spawn('yt-dlp', args);
    // Decode as UTF-8 at the stream level so a multibyte character split
    // across two chunks is not corrupted (per-chunk toString() would be).
    proc.stdout.setEncoding('utf8');
    proc.stderr.setEncoding('utf8');

    let stdoutBuf = '';
    let stderrBuf = '';
    let lastJsonLine = null;

    const handleLine = (line) => {
      // [download]  42.3% of 53.21MiB at 4.21MiB/s ETA 00:07
      const m = line.match(/\[download\]\s+(\d+(?:\.\d+)?)%/);
      if (m && onProgress) onProgress(parseFloat(m[1]));

      // --print-json emits one JSON line at the end of a successful download.
      if (line.startsWith('{') && line.endsWith('}')) {
        try { lastJsonLine = JSON.parse(line); } catch { /* not the json line */ }
      }
    };

    proc.stdout.on('data', (chunk) => {
      stdoutBuf += chunk;
      let nl;
      while ((nl = stdoutBuf.indexOf('\n')) !== -1) {
        const line = stdoutBuf.slice(0, nl);
        stdoutBuf = stdoutBuf.slice(nl + 1);
        handleLine(line);
      }
    });

    proc.stderr.on('data', (chunk) => { stderrBuf += chunk; });

    proc.on('error', (err) => reject(new Error(`Failed to spawn yt-dlp: ${err.message}`)));

    proc.on('close', (code) => {
      // Flush a final line that arrived without a trailing newline, so the
      // JSON line is not lost if it is the last thing written.
      if (stdoutBuf) {
        handleLine(stdoutBuf);
        stdoutBuf = '';
      }

      if (code === 0) {
        resolve(lastJsonLine || {});
      } else {
        const err = new Error(friendlyError(stderrBuf));
        err.stderr = stderrBuf;
        err.code = code;
        reject(err);
      }
    });
  });
}
|
||||
|
||||
/**
 * BullMQ handler for the 'import' queue.
 *
 * Downloads the video with yt-dlp into a per-job temp directory, uploads the
 * MP4 to S3 under originals/{assetId}/, backfills the asset row (title, S3
 * key, ffprobe metadata, status 'processing'), and enqueues a proxy job.
 * On any failure the asset is marked 'error' and the original error is
 * rethrown to BullMQ. The temp directory is removed either way.
 *
 * @param {object} job - BullMQ job; `job.data` carries `{ assetId, url }`.
 * @returns {Promise<{ assetId: string, originalKey: string }>}
 */
export const youtubeImportWorker = async (job) => {
  const { assetId, url } = job.data;

  // Each job gets its own temp directory so concurrent jobs (if we ever bump
  // concurrency above 1) can't clobber each other's intermediate files.
  const workDir = await mkdtemp(join(tmpdir(), `yt-${job.id}-`));
  const outputTemplate = join(workDir, `${assetId}.%(ext)s`);

  try {
    console.log(`[youtube] Asset ${assetId}: importing ${url}`);
    await job.updateProgress(2);

    // yt-dlp does the work; progress 5..60 covers the download.
    const meta = await runYtDlp({
      url,
      outputTemplate,
      onProgress: async (pct) => {
        const mapped = 5 + Math.floor(pct * 0.55);
        // Progress reporting is best-effort; never let it fail the import.
        try { await job.updateProgress(mapped); } catch { /* ignore */ }
      },
    });

    await job.updateProgress(62);

    // Find the resulting MP4 — yt-dlp's --merge-output-format ensures .mp4
    // but we scan the dir defensively in case the format string changes.
    const files = await readdir(workDir);
    const mp4 = files.find((f) => f.endsWith('.mp4'));
    if (!mp4) {
      throw new Error(`yt-dlp produced no .mp4 in ${workDir} (got ${files.join(', ') || 'nothing'})`);
    }
    const localPath = join(workDir, mp4);

    const { size: fileSize } = await stat(localPath);
    if (fileSize < 4096) {
      throw new Error(`Downloaded file is suspiciously small (${fileSize} bytes) — aborting before upload.`);
    }

    // ffprobe the file ourselves — yt-dlp's metadata is sometimes missing or
    // wrong (especially fps), and we already trust ffprobe everywhere else.
    let mediaInfo = {};
    try {
      mediaInfo = await getMediaInfo(localPath);
    } catch (err) {
      console.warn(`[youtube] getMediaInfo failed for ${assetId}: ${err.message}`);
    }

    // Without an id from yt-dlp, derive one from the local filename.
    const videoId = meta.id || mp4.replace(/\..+$/, '').replace(/^.*-/, '');
    const sanitized = sanitizeTitle(meta.title || meta.fulltitle, videoId);
    const filename = `${sanitized}.mp4`;
    const originalKey = `originals/${assetId}/${filename}`;

    await job.updateProgress(70);
    console.log(`[youtube] Uploading ${localPath} → s3://${S3_BUCKET}/${originalKey} (${fileSize} bytes)`);
    await uploadToS3(S3_BUCKET, originalKey, localPath);

    await job.updateProgress(90);

    // Backfill the asset row with the real title + S3 key + ffprobe metadata,
    // then flip to 'processing' so the rest of the UI treats it like any
    // freshly-uploaded asset.
    await query(
      `UPDATE assets
         SET filename = $1,
             display_name = $2,
             original_s3_key = $3,
             codec = COALESCE($4, codec),
             resolution = COALESCE($5, resolution),
             fps = COALESCE($6, fps),
             duration_ms = COALESCE($7, duration_ms),
             file_size = COALESCE($8, file_size),
             status = 'processing',
             updated_at = NOW()
       WHERE id = $9`,
      [
        filename,
        meta.title || meta.fulltitle || filename,
        originalKey,
        mediaInfo.codec ?? null,
        mediaInfo.resolution ?? null,
        mediaInfo.fps ?? null,
        mediaInfo.durationMs ?? null,
        mediaInfo.fileSizeBytes ?? fileSize,
        assetId,
      ]
    );

    // Hand off to the proxy queue — identical payload shape to upload.js so
    // the proxy worker doesn't need to know this came from an import.
    await proxyQueue.add('generate', {
      assetId,
      inputKey: originalKey,
      outputKey: `proxies/${assetId}.mp4`,
    });

    console.log(`[youtube] Asset ${assetId} imported (${meta.title || 'untitled'}); proxy job queued`);
    await job.updateProgress(100);
    return { assetId, originalKey };
  } catch (error) {
    console.error(`[youtube] Import failed for asset ${assetId}:`, error.message);
    // Marking the row is secondary: if the database is the thing that broke,
    // a second failure here must not replace the error that explains why the
    // import failed.
    try {
      await query(
        `UPDATE assets SET status = 'error', updated_at = NOW() WHERE id = $1`,
        [assetId]
      );
    } catch (dbError) {
      console.error(`[youtube] Could not mark asset ${assetId} as errored:`, dbError.message);
    }
    throw error;
  } finally {
    await rm(workDir, { recursive: true, force: true }).catch(() => {});
  }
};
|
||||
Loading…
Reference in a new issue