feat(worker): capability-routed GPU worker pool + per-node job attribution
WORKER_QUEUES env lets a worker subscribe to a subset of queues. Deploy one GPU-pinned container per card: heavy encodes (proxy/conform/trim) on Tesla P4 (zampp1) + L4 (zampp2) via NVENC; light jobs (thumbnail/filmstrip) on the 2x Quadro P400 (zampp1). BullMQ competing-consumers distribute across nodes. RUN_PROMOTION gates the growing-files scanner to one worker. Each worker stamps WORKER_LABEL onto job data so the Jobs UI Node column shows which node/GPU ran each job. Redis/DB/S3 for the zampp2 worker come from its .env (pointed at zampp1). Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
parent
92b460f503
commit
fdec2e307d
3 changed files with 111 additions and 12 deletions
|
|
@ -100,6 +100,31 @@ services:
|
|||
networks:
|
||||
- wild-dragon-worker
|
||||
|
||||
# worker-l4: HEAVY tier (proxy/conform/trim) on the L4 (NVENC). Talks to
|
||||
# zampp1's Redis/Postgres/S3 over the LAN (.200). No promotion scanner here.
|
||||
worker-l4:
|
||||
build:
|
||||
context: ./services/worker
|
||||
dockerfile: Dockerfile.gpu
|
||||
image: wild-dragon-worker-gpu:latest
|
||||
runtime: nvidia
|
||||
restart: unless-stopped
|
||||
environment:
|
||||
REDIS_URL: ${REDIS_URL}
|
||||
DATABASE_URL: ${DATABASE_URL}
|
||||
S3_ENDPOINT: ${S3_ENDPOINT}
|
||||
S3_BUCKET: ${S3_BUCKET}
|
||||
S3_ACCESS_KEY: ${S3_ACCESS_KEY}
|
||||
S3_SECRET_KEY: ${S3_SECRET_KEY}
|
||||
S3_REGION: ${S3_REGION:-us-east-1}
|
||||
WORKER_QUEUES: proxy,conform,trim
|
||||
PROXY_CONCURRENCY: "3"
|
||||
NVIDIA_VISIBLE_DEVICES: GPU-13acf439-8bf4-a5e0-7804-c1071bca547a
|
||||
WORKER_LABEL: "zampp2 / L4"
|
||||
NVIDIA_DRIVER_CAPABILITIES: video,compute,utility
|
||||
networks:
|
||||
- wild-dragon-worker
|
||||
|
||||
networks:
|
||||
wild-dragon-worker:
|
||||
driver: bridge
|
||||
|
|
|
|||
|
|
@ -95,8 +95,15 @@ services:
|
|||
networks:
|
||||
- wild-dragon
|
||||
|
||||
worker:
|
||||
build: ./services/worker
|
||||
# ── GPU worker pool (capability-routed) ──────────────────────────────
|
||||
# worker-p4: HEAVY tier (proxy/conform/trim) on the Tesla P4 (NVENC).
|
||||
# Also runs the promotion scanner (RUN_PROMOTION) — exactly one worker must run it.
|
||||
worker-p4:
|
||||
build:
|
||||
context: ./services/worker
|
||||
dockerfile: Dockerfile.gpu
|
||||
image: wild-dragon-worker-gpu:latest
|
||||
runtime: nvidia
|
||||
depends_on:
|
||||
- queue
|
||||
- db
|
||||
|
|
@ -109,11 +116,56 @@ services:
|
|||
S3_SECRET_KEY: ${S3_SECRET_KEY}
|
||||
S3_REGION: ${S3_REGION:-us-east-1}
|
||||
GROWING_PATH: /growing
|
||||
WORKER_QUEUES: proxy,conform,trim
|
||||
RUN_PROMOTION: "true"
|
||||
PROXY_CONCURRENCY: "2"
|
||||
NVIDIA_VISIBLE_DEVICES: GPU-79afca3e-2ab2-a6ea-1c44-706c1f0a26d6
|
||||
WORKER_LABEL: "zampp1 / Tesla P4"
|
||||
NVIDIA_DRIVER_CAPABILITIES: video,compute,utility
|
||||
volumes:
|
||||
- /mnt/NVME/MAM/wild-dragon-growing:/growing
|
||||
networks:
|
||||
- wild-dragon
|
||||
|
||||
# worker-p400a/b: LIGHT tier (thumbnail/filmstrip) on the two Quadro P400s.
|
||||
worker-p400a:
|
||||
image: wild-dragon-worker-gpu:latest
|
||||
runtime: nvidia
|
||||
depends_on: [queue, db, worker-p4]
|
||||
environment:
|
||||
REDIS_URL: ${REDIS_URL}
|
||||
DATABASE_URL: ${DATABASE_URL}
|
||||
S3_ENDPOINT: ${S3_ENDPOINT}
|
||||
S3_BUCKET: ${S3_BUCKET}
|
||||
S3_ACCESS_KEY: ${S3_ACCESS_KEY}
|
||||
S3_SECRET_KEY: ${S3_SECRET_KEY}
|
||||
S3_REGION: ${S3_REGION:-us-east-1}
|
||||
WORKER_QUEUES: thumbnail,filmstrip
|
||||
NVIDIA_VISIBLE_DEVICES: GPU-331c53ea-2ed9-0007-e364-c1451775948f
|
||||
WORKER_LABEL: "zampp1 / P400 #1"
|
||||
NVIDIA_DRIVER_CAPABILITIES: video,compute,utility
|
||||
networks:
|
||||
- wild-dragon
|
||||
|
||||
worker-p400b:
|
||||
image: wild-dragon-worker-gpu:latest
|
||||
runtime: nvidia
|
||||
depends_on: [queue, db, worker-p4]
|
||||
environment:
|
||||
REDIS_URL: ${REDIS_URL}
|
||||
DATABASE_URL: ${DATABASE_URL}
|
||||
S3_ENDPOINT: ${S3_ENDPOINT}
|
||||
S3_BUCKET: ${S3_BUCKET}
|
||||
S3_ACCESS_KEY: ${S3_ACCESS_KEY}
|
||||
S3_SECRET_KEY: ${S3_SECRET_KEY}
|
||||
S3_REGION: ${S3_REGION:-us-east-1}
|
||||
WORKER_QUEUES: thumbnail,filmstrip
|
||||
NVIDIA_VISIBLE_DEVICES: GPU-b514a592-9077-44bd-d9e8-9efa0591ef88
|
||||
WORKER_LABEL: "zampp1 / P400 #2"
|
||||
NVIDIA_DRIVER_CAPABILITIES: video,compute,utility
|
||||
networks:
|
||||
- wild-dragon
|
||||
|
||||
web-ui:
|
||||
build: ./services/web-ui
|
||||
ports:
|
||||
|
|
|
|||
|
|
@ -19,8 +19,17 @@ const parseRedisUrl = (url) => {
|
|||
|
||||
const redisOptions = parseRedisUrl(process.env.REDIS_URL || 'redis://localhost:6379');
|
||||
|
||||
// Human-readable node/GPU label stamped onto each job so the Jobs UI can show
|
||||
// which worker ran it (UI reads metadata.node). Set per container via env.
|
||||
const WORKER_LABEL = process.env.WORKER_LABEL || process.env.HOSTNAME || 'worker';
|
||||
|
||||
const createWorker = (queueName, handler, overrides = {}) => {
|
||||
const worker = new Worker(queueName, handler, {
|
||||
// Stamp node attribution into the job data on pickup (persists to Redis).
// The stamp is best-effort: a failure to write it must never prevent the job
// from running, but it is logged so gaps in attribution can be diagnosed
// instead of disappearing silently.
const wrapped = async (job) => {
  try {
    // Spread the existing payload so only the `node` key is added/overwritten.
    await job.updateData({ ...job.data, node: WORKER_LABEL });
  } catch (err) {
    const reason = err instanceof Error ? err.message : String(err);
    console.warn(`[${queueName}] could not stamp node on job ${job.id}: ${reason}`);
  }
  // Always hand the job to the real handler, whether or not the stamp landed.
  return handler(job);
};
|
||||
const worker = new Worker(queueName, wrapped, {
|
||||
connection: redisOptions,
|
||||
// Stall detection: if a worker dies mid-job, BullMQ moves it back to wait
|
||||
// after stalledInterval. Without this a crashed run sits in active forever.
|
||||
|
|
@ -62,18 +71,27 @@ const FILMSTRIP_CONCURRENCY = parseInt(process.env.FILMSTRIP_CONCURRENCY || '2
|
|||
const CONFORM_CONCURRENCY = parseInt(process.env.CONFORM_CONCURRENCY || '1', 10);
|
||||
const TRIM_CONCURRENCY = parseInt(process.env.TRIM_CONCURRENCY || '4', 10);
|
||||
|
||||
// Capability routing: a worker only subscribes to the queues named in
// WORKER_QUEUES (comma-separated). Unset = all queues (back-compat). This lets
// us pin heavy encodes (proxy/conform/trim) to strong GPUs and light jobs
// (thumbnail/filmstrip) to weak ones, each container bound to one GPU.

/**
 * Parse a comma-separated queue list.
 *
 * @param {string | undefined | null} raw - Raw WORKER_QUEUES value.
 * @returns {Set<string> | null} `null` when the value is missing or blank
 *   (meaning "no restriction"); otherwise the set of trimmed, non-empty names.
 *   The set may be empty when the value holds only separators.
 */
const parseWorkerQueues = (raw) => {
  const text = (raw || '').trim();
  if (!text) return null;
  const names = text
    .split(',')
    .map((name) => name.trim())
    .filter(Boolean);
  return new Set(names);
};

const _wq = (process.env.WORKER_QUEUES || '').trim();
const _enabled = parseWorkerQueues(_wq);

// A value such as "," is set but names nothing, so this worker would consume
// no queue at all. Routing is left as-is; the misconfiguration is surfaced.
if (_enabled && _enabled.size === 0) {
  console.warn(`WORKER_QUEUES="${_wq}" names no queues; this worker will not consume any queue`);
}

// True when this worker should subscribe to queue `q`.
const want = (q) => !_enabled || _enabled.has(q);
|
||||
|
||||
const workers = [
|
||||
createWorker('proxy', proxyWorker, { concurrency: PROXY_CONCURRENCY }),
|
||||
createWorker('thumbnail', thumbnailWorker, { concurrency: THUMBNAIL_CONCURRENCY }),
|
||||
createWorker('filmstrip', filmstripWorker, { concurrency: FILMSTRIP_CONCURRENCY }),
|
||||
createWorker('conform', conformWorker, { concurrency: CONFORM_CONCURRENCY }),
|
||||
createWorker('trim', trimWorker, { concurrency: TRIM_CONCURRENCY }),
|
||||
createWorker('import', youtubeImportWorker, {
|
||||
want('proxy') && createWorker('proxy', proxyWorker, { concurrency: PROXY_CONCURRENCY }),
|
||||
want('thumbnail') && createWorker('thumbnail', thumbnailWorker, { concurrency: THUMBNAIL_CONCURRENCY }),
|
||||
want('filmstrip') && createWorker('filmstrip', filmstripWorker, { concurrency: FILMSTRIP_CONCURRENCY }),
|
||||
want('conform') && createWorker('conform', conformWorker, { concurrency: CONFORM_CONCURRENCY }),
|
||||
want('trim') && createWorker('trim', trimWorker, { concurrency: TRIM_CONCURRENCY }),
|
||||
want('import') && createWorker('import', youtubeImportWorker, {
|
||||
concurrency: 1,
|
||||
lockDuration: 10 * 60 * 1000,
|
||||
lockRenewTime: 60000,
|
||||
}),
|
||||
];
|
||||
].filter(Boolean);
|
||||
console.log(`WORKER_QUEUES=${_wq || '(all)'}`);
|
||||
|
||||
// Filmstrip queue singleton — used by thumbnail worker to enqueue filmstrip jobs
|
||||
export const filmstripQueue = new Queue('filmstrip', { connection: redisOptions });
|
||||
|
|
@ -82,7 +100,11 @@ console.log(`Concurrency: proxy=${PROXY_CONCURRENCY} thumbnail=${THUMBNAIL_CONCU
|
|||
|
||||
// BUG FIX #4: startPromotionWorker() now returns a shutdown function that
|
||||
// clears the poll intervals and closes the promotion proxyQueue singleton.
|
||||
const stopPromotionWorker = startPromotionWorker();
|
||||
// Promotion (growing-files -> S3) is a polling SCAN, not a queue consumer.
|
||||
// With multiple worker containers it must run on exactly one, or every node
|
||||
// races the same files. Gate behind RUN_PROMOTION (set true on a single worker).
|
||||
const stopPromotionWorker = (process.env.RUN_PROMOTION === 'true') ? startPromotionWorker() : null;
|
||||
if (process.env.RUN_PROMOTION === 'true') console.log('[promotion] scanner ENABLED on this worker');
|
||||
|
||||
console.log('Wild Dragon Worker Service started');
|
||||
console.log(`Redis: ${redisOptions.host}:${redisOptions.port}`);
|
||||
|
|
@ -107,7 +129,7 @@ process.on('SIGTERM', async () => {
|
|||
youtubeProxyQueue.close().catch(() => {}),
|
||||
filmstripQueue.close().catch(() => {}),
|
||||
// BUG FIX #4: Stop the promotion worker intervals and close its proxyQueue
|
||||
stopPromotionWorker(),
|
||||
stopPromotionWorker ? stopPromotionWorker() : Promise.resolve(),
|
||||
]);
|
||||
|
||||
console.log('All workers and queues closed');
|
||||
|
|
|
|||
Loading…
Reference in a new issue