Adds Ingest → YouTube. UI takes a URL + project, API enqueues a BullMQ
"import" job, worker shells out to yt-dlp, lands the MP4 in S3 at the
same originals/{assetId}/... path uploads use, then hands off to the
existing proxy queue. Imported assets share one lifecycle with uploads
from that point on.
Worker container picks up yt-dlp + python3 (apk on alpine, apt on the
GPU variant). The new 'import' queue is registered in jobs.js so it
appears in the Jobs SSE stream and retry/delete work for free.
Spec: docs/superpowers/specs/2026-05-23-youtube-importer-design.md
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
76 lines
2.4 KiB
JavaScript
76 lines
2.4 KiB
JavaScript
import 'dotenv/config';
|
|
import { Worker } from 'bullmq';
|
|
import { proxyWorker } from './workers/proxy.js';
|
|
import { thumbnailWorker } from './workers/thumbnail.js';
|
|
import { conformWorker } from './workers/conform.js';
|
|
import { youtubeImportWorker } from './workers/youtube-import.js';
|
|
import { startPromotionWorker } from './workers/promotion.js';
|
|
|
|
/**
 * Convert a Redis connection URL into the host/port/password options object
 * that is passed to BullMQ as `connection`.
 *
 * @param {string} url - Redis URL, e.g. `redis://:secret@redis.internal:6379`.
 * @returns {{host: string, port: number, password: (string|undefined)}}
 *   Connection options; `password` is `undefined` when the URL carries none.
 * @throws {TypeError} When `url` cannot be parsed by the `URL` constructor.
 */
const parseRedisUrl = (url) => {
  const parsed = new URL(url);

  // `URL#port` is the empty string when the URL omits a port, and
  // parseInt('') is NaN. Fall back to the standard Redis port in that case.
  const port = parsed.port === '' ? 6379 : Number.parseInt(parsed.port, 10);

  // `URL#password` is returned percent-encoded (e.g. '@' stays '%40'), so it
  // must be decoded before being sent to Redis as the AUTH secret.
  let password;
  if (parsed.password) {
    try {
      password = decodeURIComponent(parsed.password);
    } catch (err) {
      // Not valid percent-encoding (e.g. a bare '%'): use the text verbatim.
      password = parsed.password;
    }
  }

  return {
    host: parsed.hostname,
    port,
    password,
  };
};
|
|
|
|
// Shared Redis connection options. `||` (not `??`) is deliberate: an empty
// REDIS_URL also falls back to the local default.
const redisOptions = parseRedisUrl(
  process.env.REDIS_URL || 'redis://localhost:6379',
);
|
|
|
|
/**
 * Create a BullMQ worker for one queue with the service-wide defaults and
 * console logging attached to its lifecycle events.
 *
 * @param {string} queueName - Name of the queue to consume; also used as the
 *   log prefix.
 * @param {Function} handler - Job processor passed straight to `Worker`.
 * @param {object} [overrides={}] - Worker options spread after the defaults,
 *   so any key given here replaces the default of the same name.
 * @returns {Worker} The started worker.
 */
const createWorker = (queueName, handler, overrides = {}) => {
  const worker = new Worker(queueName, handler, {
    connection: redisOptions,
    // Stall detection: if a worker dies mid-job, BullMQ moves it back to wait
    // after stalledInterval. Without this a crashed run sits in active forever.
    stalledInterval: 30000,
    maxStalledCount: 1,
    lockDuration: 60000,
    lockRenewTime: 15000,
    ...overrides,
  });

  worker.on('completed', (job) => {
    console.log(`[${queueName}] Job ${job.id} completed`);
  });

  worker.on('failed', (job, err) => {
    // BullMQ may emit `failed` without a job object; dereferencing `job.id`
    // unguarded would then throw inside the listener itself.
    const jobId = job ? job.id : 'unknown';
    console.error(`[${queueName}] Job ${jobId} failed:`, err.message);
  });

  worker.on('stalled', (jobId) => {
    console.warn(`[${queueName}] Job ${jobId} stalled — reclaimed`);
  });

  worker.on('progress', (job, progress) => {
    console.log(`[${queueName}] Job ${job.id} progress:`, progress);
  });

  // Worker-level errors (e.g. lost Redis connection) arrive on 'error'. An
  // EventEmitter with no 'error' listener throws when one is emitted, so
  // always attach one.
  worker.on('error', (err) => {
    console.error(`[${queueName}] Worker error:`, err);
  });

  return worker;
};
|
|
|
|
// One worker per queue, created in the order listed. Entries without
// `overrides` run with the defaults from createWorker.
const workers = [
  { queue: 'proxy', handler: proxyWorker },
  { queue: 'thumbnail', handler: thumbnailWorker },
  { queue: 'conform', handler: conformWorker },
  {
    // YouTube imports: keep concurrency at 1 so we don't burn through rate
    // limits when several jobs land back-to-back. Lock window is longer than
    // the default because a long video download can run for minutes.
    queue: 'import',
    handler: youtubeImportWorker,
    overrides: {
      concurrency: 1,
      lockDuration: 10 * 60 * 1000,
      lockRenewTime: 60000,
    },
  },
].map(({ queue, handler, overrides }) => createWorker(queue, handler, overrides));
|
|
|
|
// Start the promotion worker, then print the startup banner.
startPromotionWorker();

[
  'Wild Dragon Worker Service started',
  `Redis: ${redisOptions.host}:${redisOptions.port}`,
  'Active queues: proxy, thumbnail, conform, import',
  'Background scans: promotion (growing-files → S3)',
].forEach((line) => {
  console.log(line);
});
|
|
|
|
// Graceful shutdown: close every queue worker before exiting. Each close() is
// caught individually so one failure neither leaves an unhandled rejection nor
// stops us waiting for the remaining workers; the exit code reports whether
// every worker closed cleanly.
process.on('SIGTERM', async () => {
  console.log('SIGTERM received, shutting down...');

  let exitCode = 0;
  await Promise.all(
    workers.map((w) =>
      w.close().catch((err) => {
        exitCode = 1;
        console.error('Worker failed to close cleanly:', err);
      }),
    ),
  );

  process.exit(exitCode);
});
|