How to run a queue of tasks in NestJS

Updated: January 1, 2024 By: Guest Contributor

Introduction

Managing asynchronous tasks efficiently is critical in server-side applications. This guide demonstrates how to implement a task queue in NestJS, ensuring your applications handle workload with resilience and scalability.

Setting Up a Basic Queue

Start by setting up a new NestJS project if you haven’t already. Bull stores its jobs in Redis, so a Redis instance must be reachable as well. Then install the necessary packages:

npm install @nestjs/bull bull

Register the queue in your application module:

import { Module } from '@nestjs/common';
import { BullModule } from '@nestjs/bull';

@Module({
  imports: [
    BullModule.registerQueue({
      name: 'myQueue',
    }),
  ],
})
export class AppModule {}

Create a service that adds tasks to the queue:

import { Injectable } from '@nestjs/common';
import { InjectQueue } from '@nestjs/bull';
import { Queue } from 'bull';

@Injectable()
export class QueueService {
  constructor(@InjectQueue('myQueue') private readonly myQueue: Queue) {}

  async addTask(data: any) {
    await this.myQueue.add(data);
  }
}

Now, let’s define a processor to handle tasks from the queue. Add the class to the providers array of a module so that NestJS instantiates it:

import { Process, Processor } from '@nestjs/bull';
import { Job } from 'bull';

@Processor('myQueue')
export class MyProcessor {
  @Process()
  async handleJob(job: Job) {
    // Your task handling logic here
    console.log(`Processing job ${job.id} with data`, job.data);
  }
}

Advanced Queue Configuration

Tune your queue with additional configuration options. For instance, the concurrency option lets one handler process up to five jobs in parallel:

import { Process, Processor } from '@nestjs/bull';
import { Job } from 'bull';

@Processor('myQueue')
export class MyProcessor {
  @Process({ concurrency: 5 })
  async handleJob(job: Job) {
    // Your task handling logic here
  }
}

Define separate handlers for different job types. A named handler only receives jobs that were added to the queue under the same name:

@Processor('myQueue')
export class MyProcessor {
  @Process('transcode')
  async transcodeJob(job: Job) {
    // Transcoding logic
  }

  @Process('log')
  async logJob(job: Job) {
    // Logging logic
  }
}
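The named handlers above only run when a job is added under a matching name. The following is a minimal sketch of the producing side. MediaJobProducer, NamedJobQueue, and the payload shapes are hypothetical names chosen for illustration; the only assumption about Bull is that its queue exposes add(name, data), which is typed here as a small structural interface so the snippet stands on its own.

```typescript
// Minimal structural type for the part of the queue this sketch uses.
// Bull's Queue.add(name, data) fits this shape.
interface NamedJobQueue {
  add(name: string, data: unknown): Promise<unknown>;
}

// Hypothetical producer. In a NestJS service the queue would be injected
// with @InjectQueue('myQueue') rather than passed in by hand.
class MediaJobProducer {
  private readonly queue: NamedJobQueue;

  constructor(queue: NamedJobQueue) {
    this.queue = queue;
  }

  // Picked up by the handler decorated with @Process('transcode').
  enqueueTranscode(file: string): Promise<unknown> {
    return this.queue.add('transcode', { file });
  }

  // Picked up by the handler decorated with @Process('log').
  enqueueLog(message: string): Promise<unknown> {
    return this.queue.add('log', { message });
  }
}
```

Keeping the job names inside one producer class means a typo in a name cannot silently route jobs to a handler that does not exist.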

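Implement rate limiting and retry strategies. The limiter caps how many jobs are processed per time window, while attempts and backoff (set under defaultJobOptions) control how failed jobs are retried:

BullModule.registerQueue({
  name: 'myQueue',
  // Rate limit: process at most 5 jobs per 1000 ms
  limiter: {
    max: 5,
    duration: 1000,
  },
  // Applied to every job added to this queue unless overridden per job
  defaultJobOptions: {
    attempts: 5, // total tries, including the first one
    backoff: {
      type: 'exponential',
      delay: 1000, // base delay in milliseconds
    },
  },
})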
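To get a feel for what attempts: 5 with an exponential backoff of 1000 ms means in practice, the sketch below computes the wait before each retry. It assumes the formula used by Bull's built-in exponential strategy, (2^attemptsMade - 1) * delay; treat that as an assumption and check it against the Bull version you have installed. The function names are made up for this example.

```typescript
// Delay before the retry that follows `attemptsMade` failed attempts.
// Assumption: mirrors Bull's built-in 'exponential' backoff strategy.
function exponentialBackoffDelay(attemptsMade: number, delay: number): number {
  return Math.round((Math.pow(2, attemptsMade) - 1) * delay);
}

// With `attempts: N` a job runs at most N times, so there are N - 1 retries.
function retrySchedule(attempts: number, delay: number): number[] {
  const delays: number[] = [];
  for (let made = 1; made < attempts; made++) {
    delays.push(exponentialBackoffDelay(made, delay));
  }
  return delays;
}

// Waits (in ms) before retries 1 through 4 for the configuration above.
console.log(retrySchedule(5, 1000));
```

Under that assumption the four retries wait 1, 3, 7, and 15 seconds, so a job that keeps failing is given up on roughly 26 seconds after its first attempt, plus processing time.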

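Consider priority-based task handling. In Bull, a lower number means a higher priority, with 1 being the highest:

async addTask(data: any, priority: number) {
  // The job is named, so a handler decorated with
  // @Process('priorityTask') must exist to pick it up.
  await this.myQueue.add('priorityTask', data, {
    priority, // 1 = highest priority
  });
}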
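Because raw priority numbers are easy to get backwards, it can help to wrap them in named levels. The sketch below is one way to do that. The level names and the values 1, 5, and 10 are arbitrary choices for this example, and processingOrder only models the ordering idea (lower number first, ties in insertion order); it is not how Bull stores jobs internally.

```typescript
// Hypothetical named levels; in Bull, 1 is the highest priority.
const PRIORITY = { high: 1, normal: 5, low: 10 } as const;
type PriorityLevel = keyof typeof PRIORITY;

// Translate a readable level into the number passed as `priority`.
function toBullPriority(level: PriorityLevel): number {
  return PRIORITY[level];
}

// Model of the resulting order: lower numbers first. Array.prototype.sort
// is stable, so jobs with equal priority keep their insertion order.
function processingOrder<T extends { priority: number }>(jobs: T[]): T[] {
  return [...jobs].sort((a, b) => a.priority - b.priority);
}
```

A caller would then write something like addTask(data, toBullPriority('high')) instead of passing a bare number.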

Integrating with Databases

Bull persists jobs in Redis, so queued tasks survive application restarts. Point the queue at your Redis instance to make that persistence explicit:

import { BullModule } from '@nestjs/bull';

BullModule.registerQueue({
  name: 'myQueue',
  // Connection settings for the Redis instance that stores the jobs
  redis: {
    host: 'localhost',
    port: 6379,
  },
})
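Hard-coded connection settings rarely survive the move from a laptop to a server. As a minimal sketch, the host and port could be read from environment variables with the local defaults as a fallback. The variable names REDIS_HOST and REDIS_PORT and the helper redisFromEnv are assumptions made for this example, not something Bull or NestJS defines.

```typescript
interface RedisConnection {
  host: string;
  port: number;
}

// Build the `redis` option from an environment-like object.
// Pass `process.env` in a real application.
function redisFromEnv(env: Record<string, string | undefined>): RedisConnection {
  const host = env['REDIS_HOST'] ?? 'localhost';
  const port = Number(env['REDIS_PORT'] ?? 6379);

  // Fail fast on values such as "abc" or "0" instead of connecting blindly.
  if (!Number.isInteger(port) || port <= 0 || port > 65535) {
    throw new Error(`Invalid REDIS_PORT: ${env['REDIS_PORT']}`);
  }
  return { host, port };
}
```

The result can be passed straight to the redis key of the queue registration, for example redis: redisFromEnv(process.env).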

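Handle task completion and failures with queue event listeners:

import { OnQueueCompleted, OnQueueFailed, Processor } from '@nestjs/bull';
import { Job } from 'bull';

@Processor('myQueue')
export class MyProcessor {
  // Called after a job handler finishes; result is its return value
  @OnQueueCompleted()
  async onCompleted(job: Job, result: any) {
    // Logic to handle a completed job
  }

  // Called when a job handler throws or rejects
  @OnQueueFailed()
  async onFailed(job: Job, error: Error) {
    // Logic to handle a failed job
  }
}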
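When retries are configured, a failure handler usually wants to tell "failed, will be retried" apart from "failed for good". The sketch below shows one common way to make that distinction. It assumes that a Bull job exposes attemptsMade and opts.attempts, that attemptsMade already counts the attempt that just failed, and that a job without an explicit attempts value is tried once; verify these assumptions against your Bull version. FailedJobLike and isFinalFailure are hypothetical names.

```typescript
// The subset of a Bull job this check relies on (assumed field names).
interface FailedJobLike {
  attemptsMade: number;
  opts: { attempts?: number };
}

// True when no retries remain for the job.
function isFinalFailure(job: FailedJobLike): boolean {
  const allowedAttempts = job.opts.attempts ?? 1; // assumed default: one try
  return job.attemptsMade >= allowedAttempts;
}
```

Inside onFailed, a guard such as if (isFinalFailure(job)) could then limit alerting or dead-letter handling to jobs that have exhausted their retries.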

Monitoring and Scaling

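Add a monitoring dashboard such as Bull Board. The original bull-board package has been superseded by the scoped @bull-board packages, which this example uses:

npm install @bull-board/api @bull-board/express

import { NestFactory } from '@nestjs/core';
import { getQueueToken } from '@nestjs/bull';
import { Queue } from 'bull';
import { createBullBoard } from '@bull-board/api';
import { BullAdapter } from '@bull-board/api/bullAdapter';
import { ExpressAdapter } from '@bull-board/express';
import { AppModule } from './app.module'; // adjust to your project layout

async function bootstrap() {
  const app = await NestFactory.create(AppModule);

  // Resolve the queue registered as 'myQueue' from the Nest container
  const myQueue = app.get<Queue>(getQueueToken('myQueue'));

  // Serve the dashboard UI at '/admin/queues'
  const serverAdapter = new ExpressAdapter();
  serverAdapter.setBasePath('/admin/queues');
  createBullBoard({ queues: [new BullAdapter(myQueue)], serverAdapter });
  app.use('/admin/queues', serverAdapter.getRouter());

  await app.listen(3000);
}
bootstrap();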

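To scale horizontally, run several instances of your application (or dedicated worker processes) against the same Redis instance. Bull hands each job to only one worker, so adding instances increases throughput without any change to the producing code, and the remaining instances keep processing if one of them goes down.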

Conclusion

By following the steps above, you now know how to run a queue of tasks in NestJS effectively, harnessing the full power of the framework and Bull for task management. Experiment further to tailor the system to your specific needs, and you’ll be equipped to handle complex asynchronous operations with ease.