From fdec2e307d89901d8be507898fbce50d641f748c Mon Sep 17 00:00:00 2001 From: Zac Gaetano Date: Fri, 29 May 2026 04:00:10 +0000 Subject: [PATCH] feat(worker): capability-routed GPU worker pool + per-node job attribution WORKER_QUEUES env lets a worker subscribe to a subset of queues. Deploy one GPU-pinned container per card: heavy encodes (proxy/conform/trim) on Tesla P4 (zampp1) + L4 (zampp2) via NVENC; light jobs (thumbnail/filmstrip) on the 2x Quadro P400 (zampp1). BullMQ competing-consumers distribute across nodes. RUN_PROMOTION gates the growing-files scanner to one worker. Each worker stamps WORKER_LABEL onto job data so the Jobs UI Node column shows which node/GPU ran each job. Redis/DB/S3 for the zampp2 worker come from its .env (pointed at zampp1). Co-Authored-By: Claude Opus 4.8 --- docker-compose.worker.yml | 25 ++++++++++++++++ docker-compose.yml | 56 ++++++++++++++++++++++++++++++++++-- services/worker/src/index.js | 42 ++++++++++++++++++++------- 3 files changed, 111 insertions(+), 12 deletions(-) diff --git a/docker-compose.worker.yml b/docker-compose.worker.yml index ebb4222..629516e 100644 --- a/docker-compose.worker.yml +++ b/docker-compose.worker.yml @@ -100,6 +100,31 @@ services: networks: - wild-dragon-worker + # worker-l4: HEAVY tier (proxy/conform/trim) on the L4 (NVENC). Talks to + # zampp1's Redis/Postgres/S3 over the LAN (.200). No promotion scanner here. + worker-l4: + build: + context: ./services/worker + dockerfile: Dockerfile.gpu + image: wild-dragon-worker-gpu:latest + runtime: nvidia + restart: unless-stopped + environment: + REDIS_URL: ${REDIS_URL} + DATABASE_URL: ${DATABASE_URL} + S3_ENDPOINT: ${S3_ENDPOINT} + S3_BUCKET: ${S3_BUCKET} + S3_ACCESS_KEY: ${S3_ACCESS_KEY} + S3_SECRET_KEY: ${S3_SECRET_KEY} + S3_REGION: ${S3_REGION:-us-east-1} + WORKER_QUEUES: proxy,conform,trim + PROXY_CONCURRENCY: "3" + NVIDIA_VISIBLE_DEVICES: GPU-13acf439-8bf4-a5e0-7804-c1071bca547a + WORKER_LABEL: "zampp2 / L4" + NVIDIA_DRIVER_CAPABILITIES: video,compute,utility + networks: + - wild-dragon-worker + networks: wild-dragon-worker: driver: bridge diff --git a/docker-compose.yml b/docker-compose.yml index fcb12b2..5733471 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -95,8 +95,15 @@ services: networks: - wild-dragon - worker: - build: ./services/worker + # ── GPU worker pool (capability-routed) ────────────────────────────── + # worker-p4: HEAVY tier (proxy/conform/trim) on the Tesla P4 (NVENC). + # Also runs the promotion scanner (RUN_PROMOTION) — exactly one worker must. + worker-p4: + build: + context: ./services/worker + dockerfile: Dockerfile.gpu + image: wild-dragon-worker-gpu:latest + runtime: nvidia depends_on: - queue - db @@ -109,11 +116,56 @@ services: S3_SECRET_KEY: ${S3_SECRET_KEY} S3_REGION: ${S3_REGION:-us-east-1} GROWING_PATH: /growing + WORKER_QUEUES: proxy,conform,trim + RUN_PROMOTION: "true" + PROXY_CONCURRENCY: "2" + NVIDIA_VISIBLE_DEVICES: GPU-79afca3e-2ab2-a6ea-1c44-706c1f0a26d6 + WORKER_LABEL: "zampp1 / Tesla P4" + NVIDIA_DRIVER_CAPABILITIES: video,compute,utility volumes: - /mnt/NVME/MAM/wild-dragon-growing:/growing networks: - wild-dragon + # worker-p400a/b: LIGHT tier (thumbnail/filmstrip) on the two Quadro P400s. + worker-p400a: + image: wild-dragon-worker-gpu:latest + runtime: nvidia + depends_on: [queue, db, worker-p4] + environment: + REDIS_URL: ${REDIS_URL} + DATABASE_URL: ${DATABASE_URL} + S3_ENDPOINT: ${S3_ENDPOINT} + S3_BUCKET: ${S3_BUCKET} + S3_ACCESS_KEY: ${S3_ACCESS_KEY} + S3_SECRET_KEY: ${S3_SECRET_KEY} + S3_REGION: ${S3_REGION:-us-east-1} + WORKER_QUEUES: thumbnail,filmstrip + NVIDIA_VISIBLE_DEVICES: GPU-331c53ea-2ed9-0007-e364-c1451775948f + WORKER_LABEL: "zampp1 / P400 #1" + NVIDIA_DRIVER_CAPABILITIES: video,compute,utility + networks: + - wild-dragon + + worker-p400b: + image: wild-dragon-worker-gpu:latest + runtime: nvidia + depends_on: [queue, db, worker-p4] + environment: + REDIS_URL: ${REDIS_URL} + DATABASE_URL: ${DATABASE_URL} + S3_ENDPOINT: ${S3_ENDPOINT} + S3_BUCKET: ${S3_BUCKET} + S3_ACCESS_KEY: ${S3_ACCESS_KEY} + S3_SECRET_KEY: ${S3_SECRET_KEY} + S3_REGION: ${S3_REGION:-us-east-1} + WORKER_QUEUES: thumbnail,filmstrip + NVIDIA_VISIBLE_DEVICES: GPU-b514a592-9077-44bd-d9e8-9efa0591ef88 + WORKER_LABEL: "zampp1 / P400 #2" + NVIDIA_DRIVER_CAPABILITIES: video,compute,utility + networks: + - wild-dragon + web-ui: build: ./services/web-ui ports: diff --git a/services/worker/src/index.js b/services/worker/src/index.js index c6f58f6..23b6cb2 100644 --- a/services/worker/src/index.js +++ b/services/worker/src/index.js @@ -19,8 +19,17 @@ const parseRedisUrl = (url) => { const redisOptions = parseRedisUrl(process.env.REDIS_URL || 'redis://localhost:6379'); +// Human-readable node/GPU label stamped onto each job so the Jobs UI can show +// which worker ran it (UI reads metadata.node). Set per container via env. +const WORKER_LABEL = process.env.WORKER_LABEL || process.env.HOSTNAME || 'worker'; + const createWorker = (queueName, handler, overrides = {}) => { - const worker = new Worker(queueName, handler, { + // Stamp node attribution into the job data on pickup (persists to Redis). + const wrapped = async (job) => { + try { await job.updateData({ ...job.data, node: WORKER_LABEL }); } catch (_) {} + return handler(job); + }; + const worker = new Worker(queueName, wrapped, { connection: redisOptions, // Stall detection: if a worker dies mid-job, BullMQ moves it back to wait // after stalledInterval. Without this a crashed run sits in active forever. @@ -62,18 +71,27 @@ const FILMSTRIP_CONCURRENCY = parseInt(process.env.FILMSTRIP_CONCURRENCY || '2 const CONFORM_CONCURRENCY = parseInt(process.env.CONFORM_CONCURRENCY || '1', 10); const TRIM_CONCURRENCY = parseInt(process.env.TRIM_CONCURRENCY || '4', 10); +// Capability routing: a worker only subscribes to the queues named in +// WORKER_QUEUES (comma-separated). Unset = all queues (back-compat). This lets +// us pin heavy encodes (proxy/conform/trim) to strong GPUs and light jobs +// (thumbnail/filmstrip) to weak ones, each container bound to one GPU. +const _wq = (process.env.WORKER_QUEUES || '').trim(); +const _enabled = _wq ? new Set(_wq.split(',').map(x => x.trim()).filter(Boolean)) : null; +const want = (q) => !_enabled || _enabled.has(q); + const workers = [ - createWorker('proxy', proxyWorker, { concurrency: PROXY_CONCURRENCY }), - createWorker('thumbnail', thumbnailWorker, { concurrency: THUMBNAIL_CONCURRENCY }), - createWorker('filmstrip', filmstripWorker, { concurrency: FILMSTRIP_CONCURRENCY }), - createWorker('conform', conformWorker, { concurrency: CONFORM_CONCURRENCY }), - createWorker('trim', trimWorker, { concurrency: TRIM_CONCURRENCY }), - createWorker('import', youtubeImportWorker, { + want('proxy') && createWorker('proxy', proxyWorker, { concurrency: PROXY_CONCURRENCY }), + want('thumbnail') && createWorker('thumbnail', thumbnailWorker, { concurrency: THUMBNAIL_CONCURRENCY }), + want('filmstrip') && createWorker('filmstrip', filmstripWorker, { concurrency: FILMSTRIP_CONCURRENCY }), + want('conform') && createWorker('conform', conformWorker, { concurrency: CONFORM_CONCURRENCY }), + want('trim') && createWorker('trim', trimWorker, { concurrency: TRIM_CONCURRENCY }), + want('import') && createWorker('import', youtubeImportWorker, { concurrency: 1, lockDuration: 10 * 60 * 1000, lockRenewTime: 60000, }), -]; +].filter(Boolean); +console.log(`WORKER_QUEUES=${_wq || '(all)'}`); // Filmstrip queue singleton — used by thumbnail worker to enqueue filmstrip jobs export const filmstripQueue = new Queue('filmstrip', { connection: redisOptions }); @@ -82,7 +100,11 @@ console.log(`Concurrency: proxy=${PROXY_CONCURRENCY} thumbnail=${THUMBNAIL_CONCU // BUG FIX #4: startPromotionWorker() now returns a shutdown function that // clears the poll intervals and closes the promotion proxyQueue singleton. -const stopPromotionWorker = startPromotionWorker(); +// Promotion (growing-files -> S3) is a polling SCAN, not a queue consumer. +// With multiple worker containers it must run on exactly one, or every node +// races the same files. Gate behind RUN_PROMOTION (set true on a single worker). +const stopPromotionWorker = (process.env.RUN_PROMOTION === 'true') ? startPromotionWorker() : null; +if (process.env.RUN_PROMOTION === 'true') console.log('[promotion] scanner ENABLED on this worker'); console.log('Wild Dragon Worker Service started'); console.log(`Redis: ${redisOptions.host}:${redisOptions.port}`); @@ -107,7 +129,7 @@ process.on('SIGTERM', async () => { youtubeProxyQueue.close().catch(() => {}), filmstripQueue.close().catch(() => {}), // BUG FIX #4: Stop the promotion worker intervals and close its proxyQueue - stopPromotionWorker(), + stopPromotionWorker ? stopPromotionWorker() : Promise.resolve(), ]); console.log('All workers and queues closed');