feat(worker): capability-routed GPU worker pool + per-node job attribution

WORKER_QUEUES env lets a worker subscribe to a subset of queues. Deploy one GPU-pinned container per card: heavy encodes (proxy/conform/trim) on Tesla P4 (zampp1) + L4 (zampp2) via NVENC; light jobs (thumbnail/filmstrip) on the 2x Quadro P400 (zampp1). BullMQ competing-consumers distribute across nodes. RUN_PROMOTION gates the growing-files scanner to one worker. Each worker stamps WORKER_LABEL onto job data so the Jobs UI Node column shows which node/GPU ran each job. Redis/DB/S3 for the zampp2 worker come from its .env (pointed at zampp1).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
Zac Gaetano 2026-05-29 04:00:10 +00:00
parent 92b460f503
commit fdec2e307d
3 changed files with 111 additions and 12 deletions

View file

@ -100,6 +100,31 @@ services:
networks:
- wild-dragon-worker
# worker-l4: HEAVY tier (proxy/conform/trim) on the L4 (NVENC). Talks to
# zampp1's Redis/Postgres/S3 over the LAN (.200). No promotion scanner here.
# Heavy-tier GPU worker: subscribes only to the proxy, conform and trim queues
# (see WORKER_QUEUES below).
worker-l4:
build:
context: ./services/worker
dockerfile: Dockerfile.gpu
image: wild-dragon-worker-gpu:latest
runtime: nvidia
restart: unless-stopped
environment:
# Connection settings are substituted by Compose from the shell environment /
# .env file of the host that runs this compose file.
REDIS_URL: ${REDIS_URL}
DATABASE_URL: ${DATABASE_URL}
S3_ENDPOINT: ${S3_ENDPOINT}
S3_BUCKET: ${S3_BUCKET}
S3_ACCESS_KEY: ${S3_ACCESS_KEY}
S3_SECRET_KEY: ${S3_SECRET_KEY}
S3_REGION: ${S3_REGION:-us-east-1}
# Comma-separated list of queues this worker consumes; the worker trims each
# entry and ignores empty ones.
WORKER_QUEUES: proxy,conform,trim
# Used by the worker as the concurrency of the proxy queue consumer.
PROXY_CONCURRENCY: "3"
# GPU UUID of the single card this container is bound to.
NVIDIA_VISIBLE_DEVICES: GPU-13acf439-8bf4-a5e0-7804-c1071bca547a
# Written into each job's data as `node` when this worker picks the job up.
WORKER_LABEL: "zampp2 / L4"
NVIDIA_DRIVER_CAPABILITIES: video,compute,utility
# RUN_PROMOTION is deliberately not set: the worker starts the promotion
# scanner only when that variable is exactly "true".
networks:
- wild-dragon-worker
networks:
wild-dragon-worker:
driver: bridge

View file

@ -95,8 +95,15 @@ services:
networks:
- wild-dragon
worker:
build: ./services/worker
# ── GPU worker pool (capability-routed) ──────────────────────────────
# worker-p4: HEAVY tier (proxy/conform/trim) on the Tesla P4 (NVENC).
# Also runs the promotion scanner (RUN_PROMOTION) — exactly one worker must.
worker-p4:
build:
context: ./services/worker
dockerfile: Dockerfile.gpu
image: wild-dragon-worker-gpu:latest
runtime: nvidia
depends_on:
- queue
- db
@ -109,11 +116,56 @@ services:
S3_SECRET_KEY: ${S3_SECRET_KEY}
S3_REGION: ${S3_REGION:-us-east-1}
GROWING_PATH: /growing
WORKER_QUEUES: proxy,conform,trim
RUN_PROMOTION: "true"
PROXY_CONCURRENCY: "2"
NVIDIA_VISIBLE_DEVICES: GPU-79afca3e-2ab2-a6ea-1c44-706c1f0a26d6
WORKER_LABEL: "zampp1 / Tesla P4"
NVIDIA_DRIVER_CAPABILITIES: video,compute,utility
volumes:
- /mnt/NVME/MAM/wild-dragon-growing:/growing
networks:
- wild-dragon
# worker-p400a/b: LIGHT tier (thumbnail/filmstrip) on the two Quadro P400s.
# Light-tier worker bound to the first Quadro P400.
# No build section: it reuses the wild-dragon-worker-gpu:latest tag that the
# worker-p4 service declares a build for.
worker-p400a:
image: wild-dragon-worker-gpu:latest
runtime: nvidia
# NOTE(review): worker-p4 is listed here presumably so the shared image exists
# before this service starts — confirm that is the intent.
depends_on: [queue, db, worker-p4]
environment:
REDIS_URL: ${REDIS_URL}
DATABASE_URL: ${DATABASE_URL}
S3_ENDPOINT: ${S3_ENDPOINT}
S3_BUCKET: ${S3_BUCKET}
S3_ACCESS_KEY: ${S3_ACCESS_KEY}
S3_SECRET_KEY: ${S3_SECRET_KEY}
S3_REGION: ${S3_REGION:-us-east-1}
# Only the thumbnail and filmstrip queues are consumed by this worker.
WORKER_QUEUES: thumbnail,filmstrip
# GPU UUID of the single card this container is bound to.
NVIDIA_VISIBLE_DEVICES: GPU-331c53ea-2ed9-0007-e364-c1451775948f
# Written into each job's data as `node` when this worker picks the job up.
WORKER_LABEL: "zampp1 / P400 #1"
NVIDIA_DRIVER_CAPABILITIES: video,compute,utility
# RUN_PROMOTION is not set, so the promotion scanner stays off on this worker.
networks:
- wild-dragon
# Light-tier worker bound to the second Quadro P400.
# No build section: it reuses the wild-dragon-worker-gpu:latest tag that the
# worker-p4 service declares a build for.
worker-p400b:
image: wild-dragon-worker-gpu:latest
runtime: nvidia
# NOTE(review): worker-p4 is listed here presumably so the shared image exists
# before this service starts — confirm that is the intent.
depends_on: [queue, db, worker-p4]
environment:
REDIS_URL: ${REDIS_URL}
DATABASE_URL: ${DATABASE_URL}
S3_ENDPOINT: ${S3_ENDPOINT}
S3_BUCKET: ${S3_BUCKET}
S3_ACCESS_KEY: ${S3_ACCESS_KEY}
S3_SECRET_KEY: ${S3_SECRET_KEY}
S3_REGION: ${S3_REGION:-us-east-1}
# Only the thumbnail and filmstrip queues are consumed by this worker.
WORKER_QUEUES: thumbnail,filmstrip
# GPU UUID of the single card this container is bound to.
NVIDIA_VISIBLE_DEVICES: GPU-b514a592-9077-44bd-d9e8-9efa0591ef88
# Written into each job's data as `node` when this worker picks the job up.
WORKER_LABEL: "zampp1 / P400 #2"
NVIDIA_DRIVER_CAPABILITIES: video,compute,utility
# RUN_PROMOTION is not set, so the promotion scanner stays off on this worker.
networks:
- wild-dragon
web-ui:
build: ./services/web-ui
ports:

View file

@ -19,8 +19,17 @@ const parseRedisUrl = (url) => {
const redisOptions = parseRedisUrl(process.env.REDIS_URL || 'redis://localhost:6379');
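// Human-readable node/GPU label stamped onto each job's data as `node` on
// pickup, so the Jobs UI can show which worker ran it. Set per container via env.
// NOTE(review): the UI is said to read metadata.node, while the stamp is written
// to job.data.node — confirm the API exposes the value under the name the UI expects.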
const WORKER_LABEL = process.env.WORKER_LABEL || process.env.HOSTNAME || 'worker';
const createWorker = (queueName, handler, overrides = {}) => {
const worker = new Worker(queueName, handler, {
// Wrap the queue handler so every job is tagged with this worker's label
// before the real handler runs. job.updateData() writes the merged data back
// to the job (persisted in Redis), so the tag outlives this process.
const wrapped = async (job) => {
  try {
    await job.updateData({ ...job.data, node: WORKER_LABEL });
  } catch (err) {
    // Attribution is best-effort: a failed stamp must never fail the job,
    // but it should show up in the logs instead of vanishing silently.
    console.warn(`[${queueName}] could not stamp node on job ${job.id}: ${err?.message ?? err}`);
  }
  return handler(job);
};
const worker = new Worker(queueName, wrapped, {
connection: redisOptions,
// Stall detection: if a worker dies mid-job, BullMQ moves it back to wait
// after stalledInterval. Without this a crashed run sits in active forever.
@ -62,18 +71,27 @@ const FILMSTRIP_CONCURRENCY = parseInt(process.env.FILMSTRIP_CONCURRENCY || '2
const CONFORM_CONCURRENCY = parseInt(process.env.CONFORM_CONCURRENCY || '1', 10);
const TRIM_CONCURRENCY = parseInt(process.env.TRIM_CONCURRENCY || '4', 10);
// Capability routing: a worker only subscribes to the queues named in
// WORKER_QUEUES (comma-separated). Unset = all queues (back-compat). This lets
// us pin heavy encodes (proxy/conform/trim) to strong GPUs and light jobs
// (thumbnail/filmstrip) to weak ones, each container bound to one GPU.
// Raw WORKER_QUEUES value with surrounding whitespace removed ('' when unset).
const _wq = (process.env.WORKER_QUEUES || '').trim();
// Set of enabled queue names, or null when no filter was configured.
const _enabled = _wq === ''
  ? null
  : new Set(
    _wq
      .split(',')
      .map((name) => name.trim())
      .filter((name) => name.length > 0),
  );
// True when this worker should consume queue `q` (always true without a filter).
const want = (q) => {
  if (_enabled === null) return true;
  return _enabled.has(q);
};
const workers = [
createWorker('proxy', proxyWorker, { concurrency: PROXY_CONCURRENCY }),
createWorker('thumbnail', thumbnailWorker, { concurrency: THUMBNAIL_CONCURRENCY }),
createWorker('filmstrip', filmstripWorker, { concurrency: FILMSTRIP_CONCURRENCY }),
createWorker('conform', conformWorker, { concurrency: CONFORM_CONCURRENCY }),
createWorker('trim', trimWorker, { concurrency: TRIM_CONCURRENCY }),
createWorker('import', youtubeImportWorker, {
want('proxy') && createWorker('proxy', proxyWorker, { concurrency: PROXY_CONCURRENCY }),
want('thumbnail') && createWorker('thumbnail', thumbnailWorker, { concurrency: THUMBNAIL_CONCURRENCY }),
want('filmstrip') && createWorker('filmstrip', filmstripWorker, { concurrency: FILMSTRIP_CONCURRENCY }),
want('conform') && createWorker('conform', conformWorker, { concurrency: CONFORM_CONCURRENCY }),
want('trim') && createWorker('trim', trimWorker, { concurrency: TRIM_CONCURRENCY }),
want('import') && createWorker('import', youtubeImportWorker, {
concurrency: 1,
lockDuration: 10 * 60 * 1000,
lockRenewTime: 60000,
}),
];
].filter(Boolean);
console.log(`WORKER_QUEUES=${_wq || '(all)'}`);
// Filmstrip queue singleton — used by thumbnail worker to enqueue filmstrip jobs
export const filmstripQueue = new Queue('filmstrip', { connection: redisOptions });
@ -82,7 +100,11 @@ console.log(`Concurrency: proxy=${PROXY_CONCURRENCY} thumbnail=${THUMBNAIL_CONCU
// BUG FIX #4: startPromotionWorker() now returns a shutdown function that
// clears the poll intervals and closes the promotion proxyQueue singleton.
const stopPromotionWorker = startPromotionWorker();
// Promotion (growing-files -> S3) is a polling SCAN, not a queue consumer.
// With multiple worker containers it must run on exactly one, or every node
// races the same files. Gate behind RUN_PROMOTION (set true on a single worker).
// Start the promotion scanner only when RUN_PROMOTION is exactly 'true';
// otherwise keep a null stop handle so shutdown can tell nothing was started.
const stopPromotionWorker = process.env.RUN_PROMOTION !== 'true'
  ? null
  : startPromotionWorker();
if (process.env.RUN_PROMOTION === 'true') {
  console.log('[promotion] scanner ENABLED on this worker');
}
console.log('Wild Dragon Worker Service started');
console.log(`Redis: ${redisOptions.host}:${redisOptions.port}`);
@ -107,7 +129,7 @@ process.on('SIGTERM', async () => {
youtubeProxyQueue.close().catch(() => {}),
filmstripQueue.close().catch(() => {}),
// BUG FIX #4: Stop the promotion worker intervals and close its proxyQueue
stopPromotionWorker(),
stopPromotionWorker ? stopPromotionWorker() : Promise.resolve(),
]);
console.log('All workers and queues closed');