Workers
Background job processing using BullMQ with Redis. Jobs are defined as classes, automatically discovered, and executed by a worker runner with full dependency injection.
Setup
Enable the worker system in createApp():
const app = createApp({
config: AppConfig,
db: AppDB,
enableWorker: true
});This registers the job runner, queue registry, and CLI commands. In development, the runner auto-starts. In production, it's controlled via the ENABLE_JOB_RUNNER environment variable.
Defining Jobs
import { BaseJob, WorkerJob } from '@zyno-io/dk-server-foundation';
@WorkerJob()
class SendEmailJob extends BaseJob<{ to: string; subject: string; body: string }> {
async handle(data) {
await emailProvider.send(data.to, data.subject, data.body);
}
}With Return Values
@WorkerJob()
class ProcessImageJob extends BaseJob<{ url: string }, { width: number; height: number }> {
async handle(data) {
const result = await processImage(data.url);
return { width: result.width, height: result.height };
}
}Cron Jobs
@WorkerJob()
class DailyCleanupJob extends BaseJob {
static CRON_SCHEDULE = '0 0 * * *'; // Midnight daily
async handle() {
await cleanupExpiredSessions();
}
}Custom Queue
@WorkerJob()
class HighPriorityJob extends BaseJob<{ data: string }> {
static QUEUE_NAME = 'high-priority';
async handle(data) {
// Processed by a separate queue
}
}BaseJob<I, O>
Abstract base class for all jobs.
| Static Property | Type | Default | Description |
|---|---|---|---|
QUEUE_NAME | string | 'default' | BullMQ queue name |
CRON_SCHEDULE | string | null | null | Cron expression for repeatable jobs |
| Method | Description |
|---|---|
handle(data: I): Promise<O> | Job execution logic. Receives the queued data and returns the result. |
Queueing Jobs
import { WorkerService } from '@zyno-io/dk-server-foundation';
class OrderService {
constructor(private workerSvc: WorkerService) {}
async createOrder(order: Order) {
// ... create order ...
// Queue email notification
await this.workerSvc.queueJob(SendEmailJob, {
to: order.email,
subject: 'Order Confirmation',
body: `Your order #${order.id} has been placed.`
});
}
}Queue Options
await workerSvc.queueJob(SendEmailJob, data, {
delay: 5000, // Delay execution by 5 seconds
priority: 1, // Lower number = higher priority
attempts: 3, // Retry up to 3 times on failure
backoff: {
// Backoff strategy for retries
type: 'exponential',
delay: 1000
},
jobId: 'unique-id' // Deduplicate by job ID
});Jest Environment
In Jest, jobs are not queued -- queueJob() is a no-op. This prevents background job side effects during testing.
Worker Runner
The WorkerRunnerService discovers all @WorkerJob() decorated classes, registers cron schedules, and processes jobs with full Deepkit dependency injection.
Starting Manually
# Via CLI
node app.js worker:startAuto-Start (Development)
In development, the runner starts automatically if ENABLE_JOB_RUNNER is not explicitly set to false.
Job Recorder
The WorkerRecorderService monitors BullMQ queue events and logs job lifecycle to the _jobs database table. It tracks:
- Job added, active, completed, failed
- Execution duration
- Error messages for failed jobs
The _jobs table is created automatically if it doesn't exist.
Leader Election
The recorder uses LeaderService (Redis-based leader election) so that when multiple runner instances are deployed, only one of them acts as the recorder at any given time. If the current recorder goes down, another runner automatically takes over recording duties. This eliminates the need for a separate observer process.
Queue Registry
The WorkerQueueRegistry manages BullMQ queue instances as singletons:
import { WorkerQueueRegistry } from '@zyno-io/dk-server-foundation';
const queue = WorkerQueueRegistry.getQueue('default');
const defaultQueue = WorkerQueueRegistry.getDefaultQueue();
// Cleanup on shutdown
await WorkerQueueRegistry.closeQueues();Configuration
| Variable | Type | Default | Description |
|---|---|---|---|
BULL_REDIS_HOST | string | — | Redis host for BullMQ |
BULL_REDIS_PORT | number | — | Redis port for BullMQ |
BULL_REDIS_PREFIX | string | — | Redis key prefix |
BULL_QUEUE | string | default | Default queue name |
ENABLE_JOB_RUNNER | boolean | true (dev) | Enable job runner |
Falls back to default REDIS_* settings if BULL_REDIS_* is not set.