dragonflight/services/mam-api/src/routes/imports.js
Zac Gaetano 9ad88e4df4 feat(ingest): YouTube importer — paste link, asset travels normal pipeline
Adds Ingest → YouTube. UI takes a URL + project, API enqueues a BullMQ
"import" job, worker shells out to yt-dlp, lands the MP4 in S3 at the
same originals/{assetId}/... path uploads use, then hands off to the
existing proxy queue. Imported assets share one lifecycle with uploads
from that point on.

Worker container picks up yt-dlp + python3 (apk on alpine, apt on the
GPU variant). The new 'import' queue is registered in jobs.js so it
appears in the Jobs SSE stream and retry/delete work for free.

Spec: docs/superpowers/specs/2026-05-23-youtube-importer-design.md

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-23 16:05:41 -04:00

96 lines
3.3 KiB
JavaScript

// External media imports — currently YouTube only.
//
// The flow mirrors upload.js: create the asset row up front with a placeholder
// filename (the worker fills in the real title once yt-dlp prints metadata),
// then enqueue a BullMQ job. The worker downloads, lands the file in S3 at the
// same originals/{assetId}/... path uploads use, and hands off to the existing
// proxy queue — so an imported asset travels the same lifecycle as any upload.
import express from 'express';
import { Queue } from 'bullmq';
import { v4 as uuidv4 } from 'uuid';
import pool from '../db/pool.js';
import { requireAuth } from '../middleware/auth.js';
// Router for the external-media import endpoints. requireAuth is attached
// before any route is registered, so every request handled here must pass
// authentication first.
const router = express.Router();
router.use(requireAuth);
/**
 * Translate a `redis://` / `rediss://` URL into an ioredis-style connection
 * object for BullMQ.
 *
 * Besides host and port, this carries over any username/password, the
 * database index from the path (`/2`), and TLS for the `rediss:` scheme.
 * Previously these were silently dropped, so a password-protected Redis
 * failed auth with no hint why. Optional fields are only set when present,
 * so a bare URL such as `redis://queue:6379` still yields exactly
 * `{ host, port }`.
 *
 * @param {string} url - Connection URL, e.g. `redis://:secret@queue:6379/0`.
 * @returns {{host: string, port: number, username?: string, password?: string,
 *   db?: number, tls?: object}} Connection options. Falls back to
 *   `{ host: 'localhost', port: 6379 }` when `url` cannot be parsed.
 */
const parseRedisUrl = (url) => {
  let parsed;
  try {
    parsed = new URL(url);
  } catch {
    // Unparseable (or missing) value: keep the historical localhost default
    // rather than failing at module load.
    return { host: 'localhost', port: 6379 };
  }

  // URL exposes credentials percent-encoded; a malformed escape must not
  // throw here, so fall back to the raw text.
  const decode = (text) => {
    try {
      return decodeURIComponent(text);
    } catch {
      return text;
    }
  };

  const connection = {
    // WHATWG URL keeps IPv6 literals bracketed ("[::1]"); sockets want them bare.
    host: parsed.hostname.replace(/^\[(.*)\]$/, '$1'),
    port: Number.parseInt(parsed.port, 10) || 6379,
  };
  if (parsed.username) connection.username = decode(parsed.username);
  if (parsed.password) connection.password = decode(parsed.password);

  // "/2" selects database 2; an empty or "/" path leaves the server default.
  const db = Number.parseInt(parsed.pathname.slice(1), 10);
  if (Number.isInteger(db)) connection.db = db;

  // An empty options object is enough to switch the client to TLS.
  if (parsed.protocol === 'rediss:') connection.tls = {};

  return connection;
};
// BullMQ producer for the 'import' queue (the POST handler adds 'youtube'
// jobs to it). REDIS_URL wins when set and non-empty; otherwise the queue
// connects to redis://queue:6379.
const importRedisUrl = process.env.REDIS_URL || 'redis://queue:6379';
const importQueue = new Queue('import', {
  connection: parseRedisUrl(importRedisUrl),
});
// The three URL shapes we accept: youtube.com/watch?v=…, youtu.be/…, and
// youtube.com/shorts/…. The host set mirrors the client UI. The server is the
// authoritative check, though — never trust the client to have validated — so
// it is deliberately stricter:
//   - the 11-char video ID must be followed by a URL delimiter or end of
//     string, so a longer bogus ID can't pass on a prefix match;
//   - on watch URLs `v` must be its own query parameter (`?v=` or `&v=`),
//     not the tail of another one such as `xv=`;
//   - whitespace and control characters are rejected outright, because the
//     URL is stored and handed to the worker verbatim.
const YT_PATTERNS = [
  /^https?:\/\/(?:www\.|m\.)?youtube\.com\/watch\?(?:[^#]*&)?v=[A-Za-z0-9_-]{11}(?=[&#]|$)/i,
  /^https?:\/\/youtu\.be\/[A-Za-z0-9_-]{11}(?=[/?#]|$)/i,
  /^https?:\/\/(?:www\.)?youtube\.com\/shorts\/[A-Za-z0-9_-]{11}(?=[/?#]|$)/i,
];

// Any whitespace (ASCII or Unicode) or ASCII control character.
const UNSAFE_URL_CHARS = /[\s\x00-\x1f\x7f]/;

/**
 * Whether `url` is a single-video YouTube link in one of the accepted shapes.
 *
 * @param {unknown} url - Untrusted value from the request body.
 * @returns {boolean} true only for a string free of whitespace/control
 *   characters that matches one of YT_PATTERNS.
 */
function isYouTubeUrl(url) {
  return (
    typeof url === 'string' &&
    !UNSAFE_URL_CHARS.test(url) &&
    YT_PATTERNS.some((re) => re.test(url))
  );
}
// POST /api/v1/imports/youtube — body { url, projectId, binId? }
//
// Validates the URL and creates the asset row in 'ingesting' status, using the
// URL as a placeholder name. Then it enqueues a 'youtube' job on the import
// queue and responds 202 with { assetId, jobId: 'import:<bullmq id>',
// status: 'queued' }.
//
// Client errors: 400 for missing fields, a non-YouTube URL, or a playlist URL;
// 404 when the project doesn't exist. Anything else (DB or Redis failure) is
// passed to next(err).
router.post('/youtube', async (req, res, next) => {
  try {
    const { url, projectId, binId } = req.body || {};
    if (!url || !projectId) {
      return res.status(400).json({ error: 'url and projectId are required' });
    }
    if (!isYouTubeUrl(url)) {
      return res.status(400).json({ error: 'Invalid YouTube URL' });
    }
    // A playlist URL has `list=…`. yt-dlp's --no-playlist would still grab
    // the single video, but the operator probably meant "import the list",
    // and we don't support that yet. Reject so the intent is explicit.
    if (/[?&]list=/i.test(url)) {
      return res.status(400).json({ error: "Playlists aren't supported yet" });
    }
    const projCheck = await pool.query('SELECT id FROM projects WHERE id = $1', [projectId]);
    if (projCheck.rows.length === 0) {
      return res.status(404).json({ error: 'Project not found' });
    }
    // NOTE(review): binId is not checked against projectId here — confirm the
    // schema or another layer prevents filing an asset into another project's bin.
    const assetId = uuidv4();
    // Placeholder filename/display_name — the worker overwrites both once
    // yt-dlp resolves the video title.
    await pool.query(
      `INSERT INTO assets (
        id, project_id, bin_id, filename, display_name, status,
        media_type, original_s3_key, source_url, created_at, updated_at
      )
      VALUES ($1, $2, $3, $4, $4, 'ingesting', 'video', NULL, $5, NOW(), NOW())`,
      [assetId, projectId, binId || null, url, url]
    );

    let bullJob;
    try {
      bullJob = await importQueue.add('youtube', {
        assetId,
        url,
        // Surface the URL in the Jobs screen until the worker fills in the title.
        assetName: url,
      });
    } catch (enqueueErr) {
      // Without a job, nothing will ever move this row out of 'ingesting'.
      // Remove it so the project doesn't show a permanently stuck asset.
      // Best-effort: a cleanup failure is logged, and the client still gets
      // the original enqueue error.
      try {
        await pool.query('DELETE FROM assets WHERE id = $1', [assetId]);
      } catch (cleanupErr) {
        console.error(`imports: failed to remove orphaned asset ${assetId}`, cleanupErr);
      }
      throw enqueueErr;
    }

    return res.status(202).json({
      assetId,
      jobId: `import:${bullJob.id}`,
      status: 'queued',
    });
  } catch (err) {
    return next(err);
  }
});
// The mount prefix is set by whoever mounts this router; the POST /youtube
// route documents its full path as /api/v1/imports/youtube.
export default router;