<![CDATA[Taskforce.sh Blog]]>https://blog.taskforce.sh/https://blog.taskforce.sh/favicon.pngTaskforce.sh Bloghttps://blog.taskforce.sh/Ghost 6.22Fri, 13 Mar 2026 22:03:57 GMT60<![CDATA[Implementing a Job Scheduler into a newsletter application]]>A while back, I developed a simple newsletter application to demonstrate telemetry integration in a real-world scenario. If you haven't already seen this post, I highly recommend checking it out. This application will now serve as a foundation for exploring a new feature in BullMQ. Version 5.16.

]]>
https://blog.taskforce.sh/implementing-a-job-scheduler-into-a-newsletter-application/67451040fd1b3a00015fdedcTue, 10 Dec 2024 12:40:13 GMT

A while back, I developed a simple newsletter application to demonstrate telemetry integration in a real-world scenario. If you haven't already seen this post, I highly recommend checking it out. This application will now serve as a foundation for exploring a new feature in BullMQ. Version 5.16.0 introduced the Job Scheduler, designed to replace repeatable jobs. We'll be incorporating this functionality into our newsletter application.

Before diving into the implementation, let's understand why repeatable jobs are deprecated and explore the key differences between the old and new approaches.

Before:

  • You add a repeatable job by providing a “repeat” option to Queue.add
  • You manage repeatable jobs by using removeRepeatable or removeRepeatableByKey

Now:

  • You add a repeatable job by calling Queue.upsertJobScheduler
  • You manage repeatable jobs by using removeJobScheduler or getJobSchedulers

Similarities:

  • Both schedulers add new jobs when the currently active job begins processing.
  • A job will always remain in a delayed state as long as the scheduler continues to produce jobs.
  • Both support custom repeat strategies, which must be configured in both the Worker and the Queue.

You can read more about why the new Job Schedulers API is much more useful than the old one here: https://docs.bullmq.io/guide/job-schedulers
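To make the "upsert" part of the name concrete, here is a minimal in-memory sketch. It is a toy model, not BullMQ's implementation (BullMQ keeps this state in Redis), and the SchedulerRegistry class is hypothetical. It only illustrates the behaviour described above: calling upsert twice with the same ID updates the existing scheduler instead of creating a duplicate, and removal reports whether anything was actually deleted.

```typescript
// Toy model only: BullMQ stores schedulers in Redis, not in a Map.
interface RepeatSettings {
  pattern?: string; // cron expression
  every?: number; // interval in milliseconds
}

class SchedulerRegistry {
  private schedulers = new Map<string, RepeatSettings>();

  // Create the scheduler if the ID is new, otherwise overwrite its settings.
  upsert(id: string, settings: RepeatSettings): void {
    this.schedulers.set(id, settings);
  }

  // Returns true if a scheduler with this ID existed and was removed.
  remove(id: string): boolean {
    return this.schedulers.delete(id);
  }

  get(id: string): RepeatSettings | undefined {
    return this.schedulers.get(id);
  }

  list(): string[] {
    return Array.from(this.schedulers.keys());
  }
}

const registry = new SchedulerRegistry();
registry.upsert('weekly', { pattern: '0 0 12 * * 5' });
registry.upsert('weekly', { every: 60000 }); // same ID: updated, not duplicated
console.log(registry.list());
```

With the old API, repeating the add call with different options could leave more than one repeatable job behind; keying everything by an explicit scheduler ID is what avoids that.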

Equipped with an understanding of the new job scheduler, let's integrate it into our project. Keep in mind that this is a theoretical example for learning purposes: in real life, a newsletter would actually have to be written during the week, otherwise there would be nothing to send to subscribers. Moreover, in a simple use case like this you would most likely need only one job scheduler, since all newsletters are sent on the same day and at the same time each week. In a more practical case, you would either use different cron patterns for different users depending on their time zones, or keep a single job scheduler for all your users.

The implementation will be straightforward, requiring minimal code modifications. Notably, the new job scheduler seamlessly integrates with the OpenTelemetry instrumentation we added earlier. We'll begin by updating the main service:

import { NodemailerInterface } from '../interfaces/nodemailer.interface';
import { Queue } from 'bullmq';
import config from '../config';
import SubscribedUserCrud from '../crud/subscribedUser.crud';
import { BullMQOtel } from 'bullmq-otel';

const { bullmqConfig } = config;

class NewsletterService {
    private queue: Queue;
    private subscribedUserCRUD: typeof SubscribedUserCrud;
    private cronPattern = '0 0 12 * * 5'; // assigned as a value: Fridays at 12:00

    constructor() {
        this.queue = new Queue<NodemailerInterface>(bullmqConfig.queueName, {
            connection: bullmqConfig.connection,
            telemetry: new BullMQOtel('newsletter-tracer'),
        });

        this.subscribedUserCRUD = SubscribedUserCrud;
    }

    async subscribeToNewsletter(email: string) {
        const subscribedUser = await this.subscribedUserCRUD.create(email);
        if (!subscribedUser) {
            return false;
        }

        await this.queue.add('send-simple', {
            from: '[email protected]',
            subject: 'Subscribed to newsletter',
            text: 'You have successfully subscribed to a newsletter',
            to: `${email}`,
        });

        console.log(`Enqueued an email sending`);

        await this.startSendingWeeklyEmails(email);

        return subscribedUser;
    }

    async unsubscribeFromNewsletter(email: string) {
        const removedUser = await this.subscribedUserCRUD.delete(email);
        if (!removedUser) {
            return false;
        }

        await this.queue.add('send-simple', {
            from: '[email protected]',
            subject: 'Unsubscribed from a newsletter',
            text: 'You have successfully unsubscribed from a newsletter',
            to: `${email}`,
        });

        console.log(`Enqueued an email sending`);

        const result = await this.stopSendingWeeklyEmails(email);
        console.log(result ? `scheduler for email: ${email} removed` : `scheduler for email: ${email} not found`);

        return removedUser;
    }

    /* Add this new method */
    private async startSendingWeeklyEmails(email: string) {
        await this.queue.upsertJobScheduler(
            `${email}`,
            { pattern: this.cronPattern },
            {
                name: 'send-weekly-newsletter',
                data: {
                    from: '[email protected]',
                    subject: 'weekly newsletter',
                    text: 'newsletter',
                    to: `${email}`,
                }
            }
        );
    }

    /* Add this new method */
    private async stopSendingWeeklyEmails(email: string) {
        return this.queue.removeJobScheduler(`${email}`);
    }
}

export default new NewsletterService();

src/services/newsletter.service.ts

As you can see, we added two more methods: startSendingWeeklyEmails and stopSendingWeeklyEmails.

In the first method, startSendingWeeklyEmails, we call the upsertJobScheduler function. We use the user's email address as the unique job scheduler ID. Then we define the schedule with the cron pattern '0 0 12 * * 5'. Its six fields are second, minute, hour, day of month, month, and day of week, so emails are sent every Friday at 12:00 (noon). Finally, we provide a job template that will be executed weekly, sending a simple email notification.
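The cron pattern can be read field by field. The sketch below uses a hypothetical splitCron helper (it is not part of BullMQ or of the newsletter application) that labels the fields of a six-field expression with seconds first. As an assumption of this sketch, a five-field expression is treated as having seconds set to 0.

```typescript
// Field names for a six-field cron expression (seconds first).
const FIELD_NAMES = ['second', 'minute', 'hour', 'dayOfMonth', 'month', 'dayOfWeek'] as const;
type FieldName = (typeof FIELD_NAMES)[number];

function splitCron(pattern: string): Record<FieldName, string> {
  const parts = pattern.trim().split(/\s+/);
  // Assumption: a five-field expression omits seconds, so default them to 0.
  if (parts.length === 5) {
    parts.unshift('0');
  }
  if (parts.length !== 6) {
    throw new Error(`Expected 5 or 6 cron fields, got ${parts.length}`);
  }
  const result = {} as Record<FieldName, string>;
  FIELD_NAMES.forEach((name, index) => {
    result[name] = parts[index];
  });
  return result;
}

const weekly = splitCron('0 0 12 * * 5');
// hour is '12' (noon) and dayOfWeek is '5' (Friday)
console.log(weekly);
```

Reading the pattern this way makes it easy to spot mistakes such as confusing hour 12 with midnight, which would be hour 0.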

In the second method, we utilize the removeJobScheduler function to delete the recurring job based on its unique ID. For better insight, we've included a console log to confirm successful deletion.

We then integrated these methods by triggering them after the initial subscription email, providing users with immediate feedback. With that, our implementation is complete, and we can proceed to testing.

Send a POST request to /api/newsletter/subscribe. The console should display a clear indication that the scheduler successfully completed its task: "Completed job repeat:[email protected]:1732577580000 successfully".

Now, let's examine the Jaeger results:

[Screenshots: Jaeger UI]

Let's tweak the code a bit. We're going to change the cron expression so that it is easier to see how telemetry handles it. Change the cron pattern to * * * * *, which sends the newsletter every minute. Let's examine the results. In the menu on the left, set the service to consumer and the operation to add mailbot.send-weekly-newsletter, then hit search:

[Screenshot: Jaeger UI]

This will show us the output of the newsletter scheduler:

[Screenshot: Jaeger UI]

You'll notice a difference in the number of consumer services utilized between the two operations. Let's investigate the reason behind this discrepancy. Select the more recent trace to begin our analysis:

[Screenshot: Jaeger UI]

This trace shows the inner workings of our scheduling mechanism: it contains five distinct operations. One of them originates directly from BullMQ; the remaining four come from Redis, the in-memory data store that BullMQ uses to persist and manage job data. Let's look more closely at the BullMQ operation. Its primary function is to add a scheduled job in the "delayed" state, queuing it for future execution. As discussed earlier, BullMQ's job schedulers keep a single job in the delayed state at all times, so there is always a job ready to be processed when its time comes. This job waits for one minute before it is promoted to the active state. Once active, it is picked up by a worker and processed according to the defined logic.
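For an every-minute pattern, the wait of the delayed job can be modelled by rounding the current time up to the next whole minute. The following is a simplified illustration under that assumption, not BullMQ's actual scheduling code; nextMinuteBoundary and msUntilNextMinute are hypothetical helpers.

```typescript
const MINUTE_MS = 60000;

// Timestamp (in ms) of the next whole minute strictly after nowMs.
function nextMinuteBoundary(nowMs: number): number {
  return (Math.floor(nowMs / MINUTE_MS) + 1) * MINUTE_MS;
}

// How long a job created at nowMs would stay in the delayed state.
function msUntilNextMinute(nowMs: number): number {
  return nextMinuteBoundary(nowMs) - nowMs;
}

// Sample timestamp, 15 seconds before a whole-minute boundary.
const sampleNow = 1732577580000 - 15000;
console.log(nextMinuteBoundary(sampleNow), msUntilNextMinute(sampleNow));
```

A job created exactly on a boundary waits the full minute in this model, which matches the one-minute gap described above.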

[Screenshot: Jaeger UI]

Now, let's compare this to the older trace:

[Screenshot: Jaeger UI]

In this trace, we observe two additional operations, one from Redis and one from BullMQ. As expected, these operations are processed one minute later, aligning with the cron schedule we defined. Let's expand the process mailbot operation to examine its details:

[Screenshot: Jaeger UI]

Notice the new Logs label. Let's expand it to see what it contains:

[Screenshot: Jaeger UI]

This reveals an event generated by the application, indicating that the job has been successfully completed. Since the scheduler always maintains a delayed job, it immediately adds another job with the same configuration as the one we just examined, thus continuing the cycle.

Now, let's unsubscribe from the newsletter and observe the process. Send a POST request to /api/newsletter/unsubscribe. You should see a confirmation message in the console: scheduler for email: [email protected] removed, indicating a successful operation. The same confirmation can be found in Jaeger:

[Screenshot: Jaeger UI]

This concludes our exploration of the newsletter application, which only scratches the surface of what job schedulers offer. These powerful tools provide a wealth of additional functionality. For instance, we could leverage the advanced options available for recurring jobs, such as:

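  • limit - caps the number of times the job is repeated:
queue.upsertJobScheduler(
    `scheduler-id`,
    { every: 10000, limit: 10 }, // a schedule (every or pattern) is still required
    { name: 'job-name', data: jobData }
)
  • immediately - the job scheduler normally adds jobs in the delayed state; with this option the first job runs as soon as possible instead of waiting for the next scheduled time:
queue.upsertJobScheduler(
    `scheduler-id`,
    { pattern: '0 0 12 * * 5', immediately: true },
    { name: 'job-name', data: jobData }
)
  • count - the start value of the repeat iteration count:
queue.upsertJobScheduler(
    `scheduler-id`,
    { every: 10000, count: 2 },
    { name: 'job-name', data: jobData }
)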

In the application we used a cron expression, but we can use the every option instead. This one repeats the job every 10 seconds (the value is in milliseconds):

queue.upsertJobScheduler(
    `scheduler-id`,
    { every: 10000 },
    { name: 'job-name', data: jobData }
)

We can combine it with startDate and endDate so that the job only repeats after the start date and before the end date:

queue.upsertJobScheduler(
    `scheduler-id`,
    {
      every: 10000,
      startDate: new Date('2024-10-15T00:00:00Z'),
      endDate: new Date('2024-10-16T00:00:00Z'),
    },
    { name: 'job-name', data: jobData }
)
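To get a feel for how an interval, a date window, and a limit interact, here is a simplified model. It is not BullMQ's scheduling code: the plannedRuns helper is hypothetical, and it assumes that the first run lands exactly on startDate and that endDate is inclusive.

```typescript
interface WindowedRepeat {
  every: number; // interval in milliseconds
  startDate: Date;
  endDate: Date;
  limit?: number; // optional cap on the number of runs
}

// Lists the run timestamps (in ms) that fall inside the window.
function plannedRuns(opts: WindowedRepeat): number[] {
  if (opts.every <= 0) {
    throw new Error('every must be a positive number of milliseconds');
  }
  const runs: number[] = [];
  const end = opts.endDate.getTime();
  const max = opts.limit ?? Infinity;
  for (let t = opts.startDate.getTime(); t <= end && runs.length < max; t += opts.every) {
    runs.push(t);
  }
  return runs;
}

const windowStart = new Date('2024-10-15T00:00:00Z');
const windowEnd = new Date('2024-10-15T00:00:30Z');

// Every 10 seconds in a 30 second window: runs at 0s, 10s, 20s and 30s.
const unlimited = plannedRuns({ every: 10000, startDate: windowStart, endDate: windowEnd });
// The same window capped at two runs.
const capped = plannedRuns({ every: 10000, startDate: windowStart, endDate: windowEnd, limit: 2 });
console.log(unlimited.length, capped.length);
```

Whichever bound is reached first, the end of the window or the limit, stops the repetition in this model.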

While this blog post covers the essentials of using job schedulers, there's much more to explore. For a comprehensive understanding of their capabilities, please check the official BullMQ documentation for more information.

]]>
<![CDATA[How to integrate BullMQ’s telemetry on a newsletter’s registration application]]>In this tutorial we are going to demonstrate how to use the new telemetry support that's built into BullMQ by creating a simple newsletter's registration application.

In order to show the potential of telemetry, we want an application that is composed of several components that interact with each

]]>
https://blog.taskforce.sh/how-to-integrate-bullmqs-telemetry-on-a-newsletters-subscription-application-2/672b484ce3198f0001a8e850Fri, 08 Nov 2024 10:52:45 GMT

In this tutorial we are going to demonstrate how to use the new telemetry support that's built into BullMQ by creating a simple newsletter's registration application.

In order to show the potential of telemetry, we want an application that is composed of several components that interact with each other. In this case we will have a standard ExpressJS server to handle HTTP requests for registering newsletter subscriptions, a BullMQ queue to handle the registrations, and a PostgreSQL database to store the subscribers and their statuses.

The idea is to be able to follow a request all the way from the incoming HTTP call to the database update.

Why should you read this?

If you're using BullMQ and want to improve your application's monitoring and troubleshooting capabilities, this blog post is for you. The new telemetry functionality and this guide will save you time and effort by providing a streamlined approach to instrumenting BullMQ with OpenTelemetry.

What you will learn:

  • OpenTelemetry basics: Get a quick overview of OpenTelemetry and its core components (tracing, metrics, logs).
  • Introducing the new package: Explore the features and benefits of the new package that simplifies OpenTelemetry integration with BullMQ.
  • Step-by-step guide: Follow a practical tutorial to instrument your BullMQ queues and workers with OpenTelemetry.

Impatient? Jump to the solution:

If you prefer to dive straight into the code, check out the GitHub repository. Each chapter of the following tutorial is a separate branch and it will be linked accordingly.

Project Scaffold

To begin, let's set up our project. Start by creating a package.json file using the command npm init -y.

Next, install the necessary dependencies:

  • tsx
  • typescript
  • dotenv

Now, let's configure TypeScript. Create a tsconfig.json file in your project's root directory, using the recommended settings from tsx.

{
    "compilerOptions": {
        "moduleDetection": "force",
        "module": "Preserve",
        "resolveJsonModule": true,
        "allowJs": true,
        "esModuleInterop": true,
        "isolatedModules": true,
        "experimentalDecorators": true,
        "emitDecoratorMetadata": true
    },
}

tsconfig.json

Next, create a src directory in the project's root directory. Inside the src directory, create an empty file named index.ts.

Express setup

With the project's foundation in place, let's set up the necessary files, including routes, controllers, and services. First, install the required dependencies:

  • @types/express
  • express

Now, let's populate the index.ts file with the following code:

import express from 'express';
import config from './config';
import router from './routes';

const app = express();

const port = config.port || 3000;

app.use(express.json());
app.use(router);

app.listen(port, () => {
  console.log(`Listening ${port}`);
});

src/index.ts

This is the configuration file where we'll store all the essential values for the project:

import dotenv from 'dotenv';

dotenv.config();

export default {
    port: process.env.PORT
};

src/config/index.ts

Router inside routes folder:

import express from 'express';
import newsletterRoutes from './newsletter.route';

const router = express.Router();

router.use('/api/newsletter', newsletterRoutes);

export default router;

src/routes/index.ts

import express from 'express';
import controllers from '../controllers';

const router = express.Router();

const {newsletterController} = controllers;

router.post('/subscribe', newsletterController.subscribeToNewsletter);
router.post('/unsubscribe', newsletterController.unsubcribeFromNewsletter);

export default router;

src/routes/newsletter.route.ts

Controller inside controllers folder:

import newsletterController from './newsletter.controller';

export default {
    newsletterController
};

src/controllers/index.ts

import { Request, Response, NextFunction } from 'express';
import NewsletterService from '../services/newsletter.service';

class NewsletterController {
    constructor() {
        this.subscribeToNewsletter = this.subscribeToNewsletter.bind(this);
        this.unsubcribeFromNewsletter = this.unsubcribeFromNewsletter.bind(this);
    }

    async subscribeToNewsletter(req: Request, res: Response, next: NextFunction) {
        try {
            const subscribedUser = await NewsletterService.subscribeToNewsletter(req.body.email);
            if (!subscribedUser) {
                return res.json({message: 'user already subscribed'});
            }

            return res.json(subscribedUser);
        } catch(err) {
            return next(err);
        }
    }
   
    async unsubcribeFromNewsletter(req: Request, res: Response, next: NextFunction) {
        try {
            const unsubscribedUser = await NewsletterService.unsubcribeFromNewsletter(req.body.email);
            if (!unsubscribedUser) {
                return res.json({message: 'user is not a member of a newsletter'});
            }

            return res.json(unsubscribedUser);
        } catch(err) {
            return next(err);
        }
    }
}

export default new NewsletterController();

src/controllers/newsletter.controller.ts

Service for future logic inside services folder:

class NewsletterService {
    constructor() {}

    async subscribeToNewsletter(email: string) {
        return false;
    }

    async unsubcribeFromNewsletter(email: string) {
        return false;
    }
}

export default new NewsletterService();

src/services/newsletter.service.ts

Next, create an .env file to store the application's port:

PORT=3000

.env

The only remaining step is to add a script to run the application. In your package.json file, within the scripts section, add:

"start": "tsx ./src/index.ts"

package.json

We now have a basic project structure for newsletter subscriptions. This includes a dedicated route, a controller to manage various service outcomes, and an empty service that we'll implement later with the core logic.

Nodemailer setup

To implement the service, we need a way to send emails. We'll use Nodemailer for this purpose, as it's user-friendly and, through the Ethereal service, offers a free testing account where you can inspect outgoing emails in a web UI. Guide for that is here!

Install:

  • @types/nodemailer
  • nodemailer

To connect to Nodemailer, add the following values to your .env file, referencing the guide above for specific instructions:

NODEMAILER_HOST='smtp.ethereal.email'
NODEMAILER_AUTH_USER=''
NODEMAILER_AUTH_PASS=''
NODEMAILER_PORT=587

.env

Load them in the config file:

import dotenv from 'dotenv';

dotenv.config();

export default {
    port: process.env.PORT,
    nodemailerConfig: {
        host: process.env.NODEMAILER_HOST,
        port: parseInt(process.env.NODEMAILER_PORT || '587', 10), // fall back so a missing variable does not become NaN
        auth: {
            user: process.env.NODEMAILER_AUTH_USER,
            pass: process.env.NODEMAILER_AUTH_PASS
        }
    },
};

src/config/index.ts
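Since every value in the .env file arrives as a string (or not at all), numeric settings such as ports are worth parsing defensively. The sketch below shows one way to do that; intFromEnv is a hypothetical helper, not something the project or dotenv provides. It takes the environment as a parameter so it can be exercised without touching the real process.env.

```typescript
type Env = Record<string, string | undefined>;

// Reads an integer setting, falling back when the variable is unset or empty
// and failing loudly when it is set to something that is not a number.
function intFromEnv(env: Env, name: string, fallback: number): number {
  const raw = env[name];
  if (raw === undefined || raw.trim() === '') {
    return fallback;
  }
  const parsed = Number.parseInt(raw, 10);
  if (Number.isNaN(parsed)) {
    throw new Error(`Environment variable ${name} is not an integer: "${raw}"`);
  }
  return parsed;
}

// In the config file this would be called with process.env instead.
const sampleEnv: Env = { NODEMAILER_PORT: '587', PORT: '' };
console.log(intFromEnv(sampleEnv, 'NODEMAILER_PORT', 25));
console.log(intFromEnv(sampleEnv, 'PORT', 3000));
```

Failing at startup on a malformed value is usually easier to debug than a NaN port surfacing later as a connection error.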

With our Nodemailer credentials in place, let's integrate it into our application. Create a new directory named nodemailer within the src directory, and inside it, create an index.ts file:

import nodemailer from 'nodemailer';
import config from '../config';

const { nodemailerConfig } = config;

const transporter = nodemailer.createTransport({
    host: nodemailerConfig.host,
    port: nodemailerConfig.port,
    auth: {
        user: nodemailerConfig.auth.user,
        pass: nodemailerConfig.auth.pass
    }
});

export default transporter;

src/nodemailer/index.ts

This code will enable our service to send emails. Let's update the service accordingly:

import nodemailer from '../nodemailer';

class NewsletterService {
    constructor() {}

    async subscribeToNewsletter(email: string) {
        await nodemailer.sendMail({
            from: '[email protected]',
            subject: 'Subscribed to newsletter',
            text: 'You have successfully subscribed to a newsletter',
            to: `${email}`,
        });
        return true;
    }

    async unsubcribeFromNewsletter(email: string) {
        await nodemailer.sendMail({
            from: '[email protected]',
            subject: 'Unsubscribed from a newsletter',
            text: 'You have successfully unsubscribed from a newsletter',
            to: `${email}`,
        });
        return true;
    }
}

export default new NewsletterService();

src/services/newsletter.service.ts

With these changes, sending a POST request to /api/newsletter/subscribe with an email field, or to /api/newsletter/unsubscribe, will trigger a response and generate an email notification visible in the Nodemailer UI.
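As a sketch of what such a request looks like, the helper below builds the URL, headers, and JSON body for both endpoints. The newsletterRequest function, the base URL (which assumes the server runs locally on the PORT=3000 from the .env file), and the example address are illustrative assumptions; only the routes and the email field come from the code above.

```typescript
interface JsonPost {
  url: string;
  method: 'POST';
  headers: Record<string, string>;
  body: string;
}

// Builds the request for /api/newsletter/subscribe or /api/newsletter/unsubscribe.
function newsletterRequest(
  baseUrl: string,
  action: 'subscribe' | 'unsubscribe',
  email: string
): JsonPost {
  return {
    url: `${baseUrl}/api/newsletter/${action}`,
    method: 'POST',
    // express.json() only parses bodies sent with this content type.
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({ email }),
  };
}

const subscribe = newsletterRequest('http://localhost:3000', 'subscribe', 'reader@example.com');
console.log(subscribe.url, subscribe.body);
// With the server running, the request could be sent with:
//   await fetch(subscribe.url, subscribe);
```

Forgetting the Content-Type header is a common reason for req.body.email being undefined in the controller.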

Postgres Setup

To persist user data, we'll utilize PostgreSQL. Install the following dependencies:

  • pg
  • typeorm
  • @types/pg

Next, create a docker-compose.yaml file in the project root to run the database:

services:
  pg:
    image: postgres:16
    container_name: opentelemetry_bullmq_pg
    ports:
      - '5432:5432'
    environment:
      POSTGRES_USER: "${POSTGRES_USER}"
      POSTGRES_PASSWORD: "${POSTGRES_PASSWORD}"
      POSTGRES_DB: "${POSTGRES_DB}"

docker-compose.yaml

This configuration exposes the database port for external access and retrieves configuration values from the .env file.

POSTGRES_USER=
POSTGRES_PASSWORD=
POSTGRES_DB=
POSTGRES_HOST=127.0.0.1
POSTGRES_PORT=5432

.env

import dotenv from 'dotenv';

dotenv.config();

export default {
    port: process.env.PORT,
    nodemailerConfig: {
        host: process.env.NODEMAILER_HOST,
        port: parseInt(process.env.NODEMAILER_PORT || '587', 10), // fall back so a missing variable does not become NaN
        auth: {
            user: process.env.NODEMAILER_AUTH_USER,
            pass: process.env.NODEMAILER_AUTH_PASS
        }
    },
    postgresConfig: {
        user: process.env.POSTGRES_USER,
        pass: process.env.POSTGRES_PASSWORD,
        db: process.env.POSTGRES_DB,
        host: process.env.POSTGRES_HOST,
        port: parseInt(process.env.POSTGRES_PORT || '5432')
    }
};

src/config/index.ts

Run docker-compose up to ensure everything is working correctly. If the database starts successfully, proceed to create the model, CRUD operations, and setup file.

import { Entity, PrimaryGeneratedColumn, Column } from 'typeorm';

@Entity()
export class SubscribedUser {
    @PrimaryGeneratedColumn('uuid')
    id: string // UUID primary keys are strings, not numbers

    @Column('text')
    email: string
}

src/models/subscribedUser.model.ts

import PGClient from '../db';
import { SubscribedUser } from '../models/subscribedUser.model';

class SubcribedUserCRUD {
    private pgClient: typeof PGClient.manager;
    private model: typeof SubscribedUser = SubscribedUser;

    constructor() {
        this.pgClient = PGClient.manager;
    }

    async create(email: string) {
        const exist = await this.read(email);
        if (!!exist) {
            return false;
        }

        const newSubscribedUser = new this.model();
        newSubscribedUser.email = email;

        return await this.pgClient.save(newSubscribedUser);
    }

    async read(email: string) {
        return await this.pgClient.getRepository(this.model).findOneBy({
            email: email
        });
    }

    async delete(email: string) {
        const subscribedUser = await this.read(email);
        if (!subscribedUser) {
            return false;
        }

        return await this.pgClient.getRepository(this.model).remove(subscribedUser);
    }
}

export default new SubcribedUserCRUD();

src/crud/subscribedUser.crud.ts

import { DataSource } from 'typeorm';
import { SubscribedUser } from '../models/subscribedUser.model';
import config from '../config';

const { postgresConfig } = config;

class PGClient {
    private client: DataSource;

    constructor() {
        this.client = new DataSource({
            type: 'postgres',
            host: postgresConfig.host,
            port: postgresConfig.port,
            username: postgresConfig.user,
            password: postgresConfig.pass,
            database: postgresConfig.db,
            entities: [SubscribedUser],
            synchronize: true,
            logging: false
        });
    }

    async init() {
        try {
            await this.client.initialize();
            console.log('database connected');
        } catch (err) {
            console.log('database connection error: ', err)
        }
       
    }

    async disconnect() {
        await this.client.destroy();
    }

    get clientInstance() {
        return this.client;
    }

    get manager() {
        return this.client.manager;
    }
}

export default new PGClient();

src/db/index.ts

Then update the project's main entry file, index.ts, to initialize the Postgres connection:

import express from 'express';
import config from './config';
import router from './routes';
import pgClient from './db';

const app = express();

const port = config.port || 3000;

(async () => {
  await pgClient.init();
})();

app.use(express.json());
app.use(router);

app.listen(port, () => {
  console.log(`Listening ${port}`);
});

src/index.ts

This code defines a simple model with an email field and CRUD operations to create, read, and delete users for newsletter management. Now, let's update the service to utilize these functionalities:

import nodemailer from '../nodemailer';
import SubscribedUserCrud from '../crud/subscribedUser.crud';

class NewsletterService {
    private subscribedUserCRUD: typeof SubscribedUserCrud;

    constructor() {
        this.subscribedUserCRUD = SubscribedUserCrud;
    }

    async subscribeToNewsletter(email: string) {
        const subscribedUser = await this.subscribedUserCRUD.create(email);
        if (!subscribedUser) {
            return false;
        }

        await nodemailer.sendMail({
            from: '[email protected]',
            subject: 'Subscribed to newsletter',
            text: 'You have successfully subscribed to a newsletter',
            to: `${email}`,
        });

        return subscribedUser;
    }

    async unsubcribeFromNewsletter(email: string) {
        const removedUser = await this.subscribedUserCRUD.delete(email);
        if (!removedUser) {
            return false;
        }

        await nodemailer.sendMail({
            from: '[email protected]',
            subject: 'Unsubscribed from a newsletter',
            text: 'You have successfully unsubscribed from a newsletter',
            to: `${email}`,
        });
       
        return removedUser;
    }
}

export default new NewsletterService();

src/services/newsletter.service.ts

BullMQ setup

Finally, let's integrate BullMQ. Install the necessary dependency:

  • bullmq

Next, add a Redis service to your docker-compose.yaml file:

services:
  pg:
    image: postgres:16
    container_name: opentelemetry_bullmq_pg
    ports:
      - '5432:5432'
    environment:
      POSTGRES_USER: "${POSTGRES_USER}"
      POSTGRES_PASSWORD: "${POSTGRES_PASSWORD}"
      POSTGRES_DB: "${POSTGRES_DB}"


  redis:
    image: redis:latest
    container_name: opentelemetry_bullmq_redis
    ports:
      - '6379:6379'

docker-compose.yaml

import dotenv from 'dotenv';

dotenv.config();

export default {
    port: process.env.PORT,
    nodemailerConfig: {
        host: process.env.NODEMAILER_HOST,
        port: parseInt(process.env.NODEMAILER_PORT || '587', 10), // fall back so a missing variable does not become NaN
        auth: {
            user: process.env.NODEMAILER_AUTH_USER,
            pass: process.env.NODEMAILER_AUTH_PASS
        }
    },
    postgresConfig: {
        user: process.env.POSTGRES_USER,
        pass: process.env.POSTGRES_PASSWORD,
        db: process.env.POSTGRES_DB,
        host: process.env.POSTGRES_HOST,
        port: parseInt(process.env.POSTGRES_PORT || '5432')
    },
    bullmqConfig: {
        concurrency: parseInt(process.env.BULLMQ_QUEUE_CONCURRENCY || '1'),
        queueName: process.env.BULLMQ_QUEUE_NAME || 'mailbot',
        connection: {
          host: process.env.BULLMQ_REDIS_HOST || 'redis',
          port: parseInt(process.env.BULLMQ_REDIS_PORT || '6379'),
        },
    },
};

src/config/index.ts

To integrate queueing into our application, we need to make a few modifications. First, let's create an interface for Nodemailer. Create a new directory named interfaces:

export interface NodemailerInterface {
    from: string;
    to: string;
    subject: string;
    text: string;
}

src/interfaces/nodemailer.interface.ts
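TypeScript interfaces disappear at runtime, so job data read back from Redis is not checked against this shape automatically. A minimal sketch of a runtime guard follows. The isMailPayload function is hypothetical and not part of the project or of BullMQ, and MailPayload simply mirrors the four fields of NodemailerInterface to keep the example self-contained.

```typescript
// Mirrors NodemailerInterface so the sketch runs on its own.
interface MailPayload {
  from: string;
  to: string;
  subject: string;
  text: string;
}

const REQUIRED_FIELDS = ['from', 'to', 'subject', 'text'];

// Type guard: true only if every required field is present and is a string.
function isMailPayload(value: unknown): value is MailPayload {
  if (typeof value !== 'object' || value === null) {
    return false;
  }
  const record = value as Record<string, unknown>;
  return REQUIRED_FIELDS.every((key) => typeof record[key] === 'string');
}

const good: unknown = {
  from: 'newsletter@example.com',
  to: 'reader@example.com',
  subject: 'Subscribed to newsletter',
  text: 'You have successfully subscribed to a newsletter',
};
const bad: unknown = { to: 'reader@example.com' };

console.log(isMailPayload(good), isMailPayload(bad));
```

A processor could call such a guard on job.data and throw when it fails, so that a malformed job is marked as failed instead of reaching the mail transport.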

Next, modify the index.ts file within the nodemailer directory to export a job for processing emails:

import nodemailer from 'nodemailer';
import { Job } from 'bullmq';
import config from '../config';
import { NodemailerInterface } from '../interfaces/nodemailer.interface';

const { nodemailerConfig } = config;

const transporter = nodemailer.createTransport({
    host: nodemailerConfig.host,
    port: nodemailerConfig.port,
    auth: {
        user: nodemailerConfig.auth.user,
        pass: nodemailerConfig.auth.pass
    }
});

export default (job: Job<NodemailerInterface>) => transporter.sendMail(job.data);

src/nodemailer/index.ts

In the same directory, create a worker.ts file with a worker to consume the job. We'll add some helpful events and console logs to verify that everything is functioning as expected:

import { Worker } from 'bullmq';
import config from '../config';
import processor from './';

const { bullmqConfig } = config;

export function initWorker() {
    const worker = new Worker(bullmqConfig.queueName, processor, {
        connection: bullmqConfig.connection,
        concurrency: bullmqConfig.concurrency,
    });

    worker.on('completed', (job) =>
        console.log(`Completed job ${job.id} successfully`)
    );
   
    // job can be undefined in the 'failed' event, so access its id defensively
    worker.on('failed', (job, err) =>
        console.log(`Failed job ${job?.id} with ${err}`)
    );
}

src/nodemailer/worker.ts

Then modify the service to use our queue system:

import { NodemailerInterface } from '../interfaces/nodemailer.interface';
import { Queue } from 'bullmq';
import config from '../config';
import SubscribedUserCrud from '../crud/subscribedUser.crud';

const { bullmqConfig } = config;

class NewsletterService {
    private queue: Queue;
    private subscribedUserCRUD: typeof SubscribedUserCrud;

    constructor() {
        this.queue = new Queue<NodemailerInterface>(bullmqConfig.queueName, {
            connection: bullmqConfig.connection,
        });


        this.subscribedUserCRUD = SubscribedUserCrud;
    }

    async subscribeToNewsletter(email: string) {
        const subscribedUser = await this.subscribedUserCRUD.create(email);
        if (!subscribedUser) {
            return false;
        }

        await this.queue.add('send-simple', {
            from: '[email protected]',
            subject: 'Subscribed to newsletter',
            text: 'You have successfully subscribed to a newsletter',
            to: `${email}`,
        });

        console.log(`Enqueued an email sending`);
        return subscribedUser;
    }

    async unsubcribeFromNewsletter(email: string) {
        const removedUser = await this.subscribedUserCRUD.delete(email);
        if (!removedUser) {
            return false;
        }

        await this.queue.add('send-simple', {
            from: '[email protected]',
            subject: 'Unsubscribed from a newsletter',
            text: 'You have successfully unsubscribed from a newsletter',
            to: `${email}`,
        });

        console.log(`Enqueued an email sending`);
        return removedUser;
    }
}

export default new NewsletterService();

src/services/newsletter.service.ts

Additionally, create a file to initialize the worker:

import { initWorker } from './nodemailer/worker';

initWorker();

console.log('worker listening');

src/worker.ts

To run the worker, add a new script to your package.json file:

"start:worker": "tsx ./src/worker.ts"

package.json

With the queue system integrated, our application now stores jobs in Redis and processes them using the worker. You can run the application using the scripts we defined. First, start the worker:

npm run start:worker

Once the worker is initialized, start the main application:

npm run start

You can then test the application as before.

OpenTelemetry setup

Let's move on to the main part: integrating telemetry. First, install the required packages:

  • @opentelemetry/instrumentation-express
  • @opentelemetry/instrumentation-http
  • @opentelemetry/instrumentation-ioredis
  • @opentelemetry/instrumentation-pg
  • @opentelemetry/sdk-metrics
  • @opentelemetry/sdk-node
  • @opentelemetry/sdk-trace-node
  • bullmq-otel

bullmq-otel is the official library for seamlessly integrating OpenTelemetry with BullMQ. I'll demonstrate how to use it and create a setup file for OpenTelemetry shortly.

To begin, we need to update all instances where queues and workers are initialized, adding a new option to pass the BullMQOtel class:

import { Worker } from 'bullmq';
import config from '../config';
import processor from './';
import { BullMQOtel } from 'bullmq-otel';

const { bullmqConfig } = config;

export function initWorker() {
    const worker = new Worker(bullmqConfig.queueName, processor, {
        connection: bullmqConfig.connection,
        concurrency: bullmqConfig.concurrency,
        telemetry: new BullMQOtel('example-tracer')
    });

    worker.on('completed', (job) =>
        console.log(`Completed job ${job.id} successfully`)
    );
   
    worker.on('failed', (job, err) =>
        // job can be undefined in the 'failed' event, hence the optional chaining
        console.log(`Failed job ${job?.id} with ${err}`)
    );
}

src/nodemailer/worker.ts

import { NodemailerInterface } from '../interfaces/nodemailer.interface';
import { Queue } from 'bullmq';
import config from '../config';
import SubscribedUserCrud from '../crud/subscribedUser.crud';
import { BullMQOtel } from 'bullmq-otel';

const { bullmqConfig } = config;

class NewsletterService {
    private queue: Queue;
    private subscribedUserCRUD: typeof SubscribedUserCrud;

    constructor() {
        this.queue = new Queue<NodemailerInterface>(bullmqConfig.queueName, {
            connection: bullmqConfig.connection,
            telemetry: new BullMQOtel('example-tracer')
        });


        this.subscribedUserCRUD = SubscribedUserCrud;
    }

    async subscribeToNewsletter(email: string) {
        const subscribedUser = await this.subscribedUserCRUD.create(email);
        if (!subscribedUser) {
            return false;
        }

        await this.queue.add('send-simple', {
            from: '[email protected]',
            subject: 'Subscribed to newsletter',
            text: 'You have successfully subscribed to a newsletter',
            to: `${email}`,
        });

        console.log(`Enqueued an email sending`);

        return subscribedUser;
    }

    async unsubcribeFromNewsletter(email: string) {
        const removedUser = await this.subscribedUserCRUD.delete(email);
        if (!removedUser) {
            return false;
        }

        await this.queue.add('send-simple', {
            from: '[email protected]',
            subject: 'Unsubscribed from a newsletter',
            text: 'You have successfully unsubscribed from a newsletter',
            to: `${email}`,
        });

        console.log(`Enqueued an email sending`);

        return removedUser;
    }
}

export default new NewsletterService();

src/services/newsletter.service.ts

Next, let's set up OpenTelemetry. Create a new directory named instrumentation inside the src directory.

Within the instrumentation directory, create two files: instrumentation.producer.ts and instrumentation.consumer.ts. These files will handle instrumentation for producers and consumers, respectively:

import { NodeSDK } from '@opentelemetry/sdk-node';
import { PeriodicExportingMetricReader, ConsoleMetricExporter } from '@opentelemetry/sdk-metrics';
import { ConsoleSpanExporter } from '@opentelemetry/sdk-trace-node';
import { PgInstrumentation } from '@opentelemetry/instrumentation-pg';
import { ExpressInstrumentation } from '@opentelemetry/instrumentation-express';
import { HttpInstrumentation } from '@opentelemetry/instrumentation-http';
import { IORedisInstrumentation } from '@opentelemetry/instrumentation-ioredis';

const sdk = new NodeSDK({
  serviceName: 'producer',
  traceExporter: new ConsoleSpanExporter(),
  metricReader: new PeriodicExportingMetricReader({
    exporter: new ConsoleMetricExporter(),
  }),
  instrumentations: [
    new ExpressInstrumentation(),
    new HttpInstrumentation(),
    new PgInstrumentation(),
    new IORedisInstrumentation()
  ],
});

sdk.start();

src/instrumentation/instrumentation.producer.ts

import { NodeSDK } from '@opentelemetry/sdk-node';
import { PeriodicExportingMetricReader, ConsoleMetricExporter } from '@opentelemetry/sdk-metrics';
import { ConsoleSpanExporter } from '@opentelemetry/sdk-trace-node';
import { PgInstrumentation } from '@opentelemetry/instrumentation-pg';
import { ExpressInstrumentation } from '@opentelemetry/instrumentation-express';
import { HttpInstrumentation } from '@opentelemetry/instrumentation-http';
import { IORedisInstrumentation } from '@opentelemetry/instrumentation-ioredis';

const sdk = new NodeSDK({
  serviceName: 'consumer',
  traceExporter: new ConsoleSpanExporter(),
  metricReader: new PeriodicExportingMetricReader({
    exporter: new ConsoleMetricExporter(),
  }),
  instrumentations: [
    new ExpressInstrumentation(),
    new HttpInstrumentation(),
    new PgInstrumentation(),
    new IORedisInstrumentation()
  ],
});

sdk.start();

src/instrumentation/instrumentation.consumer.ts

This code configures OpenTelemetry with a basic setup. We've assigned a service name for easy identification of trace origins and enabled console output for all telemetry data. Additionally, we've included automatic instrumentation libraries for various parts of our application: HTTP, Express, PostgreSQL, and Redis.

You might wonder how this differs from the bullmq-otel instrumentation. The key distinction is that these libraries utilize monkey patching to observe the application, while bullmq-otel doesn't. Observability in BullMQ is achieved through direct source code integration, providing greater control, flexibility, and maintainability. Rest assured, these libraries work together seamlessly.

To utilize the instrumentation files, update the starting scripts in your package.json file:

    "start": "tsx --import ./src/instrumentation/instrumentation.producer.ts ./src/index.ts",
    "start:worker": "tsx --import ./src/instrumentation/instrumentation.consumer.ts ./src/worker.ts"

package.json

Now, run the application as before. You should see OpenTelemetry spans displayed in the console.

Jaeger setup

While the additional logs provide some insight into BullMQ's internal operations, to fully leverage the power of observability, we need a centralized system for storing and visualizing traces. This will allow us to generate insightful diagrams and understand the precise execution order of different application components. OpenTelemetry's popularity brings a wide array of options, both commercial and open source, for achieving this.

Using the console alone for observability can be overwhelming and difficult to interpret. A more effective approach is to utilize a dedicated tool like Jaeger, specifically designed for trace storage and visualization.

To export spans to Jaeger, we need to install the necessary packages:

  • @opentelemetry/exporter-metrics-otlp-proto
  • @opentelemetry/exporter-trace-otlp-proto

Now, update your docker-compose.yaml file with a new Jaeger service:

services:
  pg:
    image: postgres:16
    container_name: opentelemetry_bullmq_pg
    ports:
      - '5432:5432'
    environment:
      POSTGRES_USER: "${POSTGRES_USER}"
      POSTGRES_PASSWORD: "${POSTGRES_PASSWORD}"
      POSTGRES_DB: "${POSTGRES_DB}"

  redis:
    image: redis:latest
    container_name: opentelemetry_bullmq_redis
    ports:
      - '6379:6379'

  jaeger:
    image: jaegertracing/all-in-one:latest
    container_name: opentelemetry_jaeger_redis
    ports:
      - '16686:16686'
      - '4318:4318'

docker-compose.yaml

This configuration exposes two ports:

  • 4318: The endpoint for exporting traces in protobuf format.
  • 16686: The port for accessing the Jaeger UI.

Note that I'm sending OTLP data over HTTP with protobuf encoding, which requires exposing port 4318. If you're using a different transport, such as gRPC (port 4317 by default), you'll need to expose the appropriate port instead.

Now, let's update the instrumentation files:

import { NodeSDK } from '@opentelemetry/sdk-node';
import { PeriodicExportingMetricReader } from '@opentelemetry/sdk-metrics';
import { PgInstrumentation } from '@opentelemetry/instrumentation-pg';
import { ExpressInstrumentation } from '@opentelemetry/instrumentation-express';
import { HttpInstrumentation } from '@opentelemetry/instrumentation-http';
import { IORedisInstrumentation } from '@opentelemetry/instrumentation-ioredis';
import { OTLPTraceExporter } from '@opentelemetry/exporter-trace-otlp-proto';
import { OTLPMetricExporter } from '@opentelemetry/exporter-metrics-otlp-proto';

const sdk = new NodeSDK({
  serviceName: 'producer',
  traceExporter: new OTLPTraceExporter({
    url: 'http://127.0.0.1:4318/v1/traces'
  }),
  metricReader: new PeriodicExportingMetricReader({
    exporter: new OTLPMetricExporter({
      url: 'http://127.0.0.1:4318/v1/metrics'
    }),
  }),
  instrumentations: [
    new ExpressInstrumentation(),
    new HttpInstrumentation(),
    new PgInstrumentation(),
    new IORedisInstrumentation()
  ],
});

sdk.start();

src/instrumentation/instrumentation.producer.ts

import { NodeSDK } from '@opentelemetry/sdk-node';
import { PeriodicExportingMetricReader } from '@opentelemetry/sdk-metrics';
import { PgInstrumentation } from '@opentelemetry/instrumentation-pg';
import { ExpressInstrumentation } from '@opentelemetry/instrumentation-express';
import { HttpInstrumentation } from '@opentelemetry/instrumentation-http';
import { IORedisInstrumentation } from '@opentelemetry/instrumentation-ioredis';
import { OTLPTraceExporter } from '@opentelemetry/exporter-trace-otlp-proto';
import { OTLPMetricExporter } from '@opentelemetry/exporter-metrics-otlp-proto';

const sdk = new NodeSDK({
  serviceName: 'consumer',
  traceExporter: new OTLPTraceExporter({
    url: 'http://127.0.0.1:4318/v1/traces'
  }),
  metricReader: new PeriodicExportingMetricReader({
    exporter: new OTLPMetricExporter({
      url: 'http://127.0.0.1:4318/v1/metrics'
    }),
  }),
  instrumentations: [
    new ExpressInstrumentation(),
    new HttpInstrumentation(),
    new PgInstrumentation(),
    new IORedisInstrumentation()
  ],
});

sdk.start();

src/instrumentation/instrumentation.consumer.ts

Here, we've replaced the console exporter with OTLP, directing the telemetry data to Jaeger.

Run the starting scripts again and navigate to http://localhost:16686 in your browser.

In the left-hand menu, you'll find an option to view traces from various services. You should see at least two services listed: producer and consumer. Select producer and click Find Traces to view the observed BullMQ operations.

This view displays comprehensive information about the observed application components, starting from the initial HTTP call, through Express and PostgreSQL, to BullMQ and Redis.

The left-hand menu allows you to filter traces by their origin, such as by consumer:

...or by other operations specific to your application:

One of the powerful capabilities of telemetry is the ability to gain insights into various parameters passed within your application. This can be invaluable for debugging and further development:

Furthermore, if any errors occur, they will be displayed as special events within the span. You can expand the Logs section of a span to view detailed information about the error:

It's crucial to remember that spans must be explicitly ended before they are sent to the telemetry backend. This means that if a worker process crashes mid-process, preventing the span from being closed, the entire trace will be lost and won't appear in Jaeger (or any other telemetry system you might be using).

For instance, if a worker dies in our example application, the complete trace, from the initial HTTP call to the Redis operation, will not be saved. This is a critical limitation to be aware of to avoid misinterpreting missing traces.

And there you have it! You've successfully built a simple BullMQ application with OpenTelemetry integration.

]]>
<![CDATA[Building docker images in CIs using secrets]]>I spent more time than I would like to admit trying to solve a problem I thought would be standard in the Docker world: passing a secret to Docker build in a CI  environment (GitHub Actions, in my case).

This post is not about mounting a file with environment

]]>
https://blog.taskforce.sh/building-docker-images-in-ci-using-a-secret/63e2976eab0d1d003dadc74fTue, 07 Feb 2023 19:25:03 GMT

I spent more time than I would like to admit trying to solve a problem I thought would be standard in the Docker world: passing a secret to docker build in a CI environment (GitHub Actions, in my case).

This post is not about mounting a file with environment secrets, as there are already good posts about it (such as this one). Instead, I needed to pass an environment variable as a secret for building my Docker container using GitHub Actions.

The solution turned out to be simple, but with one limitation: you can only use one secret per RUN statement in the Dockerfile.

To build the Docker image, you can pass the secret from an environment variable like this:

docker build --secret id=env,env=MY_SECRET -t my_image .

This will take the environment variable MY_SECRET and allow the Dockerfile to mount it as a file.

For example, if we need to pass an npm token when installing the packages of a Node.js application, we would do the following:

RUN --mount=type=secret,id=env export MY_SECRET=$(cat /run/secrets/env) && \
    yarn

This statement does two things:

  1. It takes the secret registered under the "env" ID in the build command and mounts it as a file, which by default is placed in /run/secrets/<id> (/run/secrets/env in this example).
  2. It assigns the contents of the file to an environment variable that is only available within that RUN instruction (so you will need to mount the secret again wherever else you need it). This way yarn has access to it while installing the Node.js packages.

Using this technique, you can mount several environment variables, but as far as I could figure out, you can only mount one per RUN statement. Maybe there is a way to overcome this limitation, but I did not have time to investigate further.

]]>
<![CDATA[BullMQ 3.0 released]]>We have just released a new major version of BullMQ. And as all major versions it includes some new features but also some breaking changes that we would like to highlight in this post. If you are using Typescript (as we dearly recommend), you will get compiler errors if you

]]>
https://blog.taskforce.sh/bullmq-3-0-release/6357f5139b5135003d61fbfdTue, 25 Oct 2022 20:34:08 GMT

We have just released a new major version of BullMQ. As with all major versions, it includes some new features but also some breaking changes that we would like to highlight in this post. If you are using TypeScript (as we dearly recommend), you will get compiler errors if you are affected by these changes.

Better Rate-Limiter

The rate-limiter provided by the previous versions of BullMQ was functional and did its job. The rate-limiter was based on delayed jobs, so as soon as a queue became rate-limited, all the jobs would be moved to the delay set with a carefully calculated delay so that they would be picked up as soon as the rate-limiter had expired.

This approach worked for many scenarios, and also made it possible to implement a rudimentary rate-limiter based on group ids. However, we were never fully satisfied with this solution. It seemed unnecessary to move jobs from one set to another, which in some degenerate cases would result in very high CPU usage as jobs moved back and forth between the two sets.

Instead, we knew that the optimal solution would be to not perform any work as long as a queue was rate-limited. The problem was with the groups. Since we could not know if there were groups far away in the wait list, we could not just stop processing the queue, as it could be possible that there were jobs in groups that were not rate-limited yet.

In the end, we came to the conclusion that it was better to sacrifice the group-based limiter in favor of a much better and more robust global rate-limiter. The group rate limiter will still be available in a much better form and with more functionality in the Pro version.

Dynamic Rate-Limiter

A highly requested feature has been to be able to dynamically activate rate-limiting in a queue based on some condition that happened during the processing. For example, we can now activate the rate-limiter if an HTTP response included the 429 (Too Many Requests) code:

const rateLimitWorker = new Worker(
  "rate-limit",
  async (job) => {
    const result = await callExternalAPI(job.data);
    if (rateLimited(result)) {
      await rateLimitWorker.rateLimit(result.expireTime);
      throw Worker.RateLimitError();
    }
  }, { connection });
Example of a dynamic rate-limiter.

You can read this blog post with more details on how to implement rate-limiters with BullMQ.

Better typing for Backoff strategies

We made a small change in the way backoff strategies (how the delay is calculated when a job fails and should be retried) are defined. Instead of passing an object with the strategies keyed by strategy name, we have simplified it so you only pass a function callback.

const myWorker = new Worker("test", async () => {}, {
  connection,
  settings: {
    backoffStrategy: (attemptsMade: number) => {
      return attemptsMade * 500;
    },
  },
});
New way to define custom backoff strategies.

Replaced "cron" by the more generic "pattern"

Since the introduction of custom repeat strategies, we have used the option "pattern", which is generic for any kind of strategy. With this new version of BullMQ we have finally removed the "cron" option, so if you are still using it, please just rename it to "pattern" when migrating to version 3.0.

await myQueue.add(
  'submarine',
  { color: 'yellow' },
  {
    repeat: {
      pattern: '* 15 3 * * *',
    },
  },
);
]]>
<![CDATA[Rate-Limit recipes in NodeJS using BullMQ]]>As the communication between microservices increases and becomes more complex, we often have to deal with limitations on how fast we can call internal or external APIs. When the services are distributed and scaled horizontally, we find that limiting the speed while preserving high availability and robustness can become quite

]]>
https://blog.taskforce.sh/rate-limit-recipes-in-nodejs-using-bullmq/6356654c9b5135003d61fa02Mon, 24 Oct 2022 21:14:26 GMT

As the communication between microservices increases and becomes more complex, we often have to deal with limitations on how fast we can call internal or external APIs. When the services are distributed and scaled horizontally, we find that limiting the speed while preserving high availability and robustness can become quite challenging.

In this post, we will describe strategies that are useful when dealing with processes that need to be rate-limited using BullMQ.

What is rate limiting?

When we talk about rate limiting, we often refer to the requirement that a given service not be called too fast or too often. The reason is that, in practice, all services have limitations, and by imposing a rate limit they protect themselves from going down under traffic that they are simply not designed to cope with.

As a consumer of a rate-limited service, we need to comply with the service's constraints to avoid the risk of being banned or rate limited even harder.

A basic rate-limiter

Let's say that we have some service from which we want to consume some data by calling an API. The service specifies a limit on how fast we are allowed to call its API. The limit is usually specified in requests per second. So let's say for the sake of this example that we are only allowed to perform up to 300 requests per second.

Using BullMQ we can achieve this requirement using the following worker:

import { Worker } from "bullmq";

const rateLimitWorker = new Worker(
  "rate-limit",
  async (job) => {
    await callExternalAPI(job.data);
  },
  {
    connection: { host: "my.redis-host.com" },
    limiter: { max: 300, duration: 1000 },
  }
);
A simple rate-limited worker

The worker above will call the API at most 300 times per second. So now we can add as many jobs as we want using the code below, and we will have the guarantee that we will not call the external API more often than allowed.

import { Queue } from "bullmq";

const rateLimitQueue = new Queue("rate-limit", {
  connection: { host: "my.redis-host.com" },
});

await rateLimitQueue.add("api-call", { foo: "bar" });

Since we are using queues, we would like to get some extra robustness in case the call to the external API fails for any reason. We can simply add retry options so that we can handle temporary failures as well. Note that attempts and backoff are job options, so they are set on the queue (or on each job) rather than on the worker:

import { Queue } from "bullmq";

const rateLimitQueue = new Queue("rate-limit", {
  connection: { host: "my.redis-host.com" },
  // Applied to every job added through this queue instance.
  defaultJobOptions: {
    attempts: 5,
    backoff: {
      type: "exponential",
      delay: 1000,
    },
  },
});
Rate-limit with exponential retries.

The worker will now retry if the API call fails. Since we used the "exponential" back-off, it will retry after 1, 2, 4, 8, and 16 seconds. If the job still fails after the last attempt, it will be marked as failed. You can still manually inspect the failed jobs (or retry them manually), either using the Queue API of BullMQ or using a dashboard such as Taskforce.sh.
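The retry schedule described above can be sketched as a small function. This is only an illustration that assumes the delay doubles on every retry, starting from the base delay; exponentialDelays is a hypothetical helper, and the exact formula BullMQ applies may differ between versions.

```typescript
// Assumption: the delay doubles on each retry, starting from the base delay.
// exponentialDelays is a hypothetical helper, not part of BullMQ.
function exponentialDelays(baseDelayMs: number, retries: number): number[] {
  const delays: number[] = [];
  for (let retry = 0; retry < retries; retry++) {
    delays.push(baseDelayMs * 2 ** retry);
  }
  return delays;
}

// Base delay of 1000 ms, as in the backoff options shown earlier.
console.log(exponentialDelays(1000, 5)); // [ 1000, 2000, 4000, 8000, 16000 ]
```

Summing the schedule gives a rough upper bound on how long a job can stay in the retry cycle (31 seconds here, excluding processing time).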

Scaling up

The attentive reader may have discovered an issue in the example above. If the API call took, for example, 1 second to complete, then even though we have a rate-limiter of 300 requests per second, we would only perform 1 request per second, since only one job is processed at any given time.

Thankfully we can easily increase the concurrency of our worker by specifying the concurrency factor. Let's crank it up to 500 in this case, which means that at most 500 calls will be processed in parallel (but no more than 300 per second):

import { Worker } from "bullmq";

const rateLimitWorker = new Worker(
  "rate-limit",
  async (job) => {
    await callExternalAPI(job.data);
  },
  {
    connection: { host: "my.redis-host.com" },
    concurrency: 500,
    limiter: { max: 300, duration: 1000 },
    // Retries (attempts, backoff) are job options; set them when adding jobs.
  }
);
A worker with rate-limit and high concurrency enabled.

When the work to be done is mostly IO, it is efficient to increase the concurrency factor as much as we did in this example. If, on the other hand, the work had been more CPU intensive, like for example transcoding an image from one format to another, then we would certainly not use such a large concurrency value, since it would only add overhead without any gains in performance.

In order to increase performance on a CPU-bound job, it is more effective to increase the number of workers instead, as we get more CPUs to do the actual work.
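The interplay between concurrency and the rate limit for IO-bound jobs can be approximated with some simple arithmetic. The sketch below is a back-of-the-envelope model, not a BullMQ API: effectiveThroughput is a hypothetical helper, and it assumes every job takes the same time and ignores queue overhead.

```typescript
// Rough model: each concurrent slot completes (1000 / jobDurationMs) jobs per
// second, and the rate limiter caps the total. Hypothetical helper.
function effectiveThroughput(
  concurrency: number,
  jobDurationMs: number,
  limitPerSecond: number
): number {
  const unthrottled = concurrency * (1000 / jobDurationMs);
  return Math.min(unthrottled, limitPerSecond);
}

// One job at a time, 1 second per call: the limiter is never reached.
console.log(effectiveThroughput(1, 1000, 300)); // 1

// 500 parallel calls of 1 second each: now the limiter is the bottleneck.
console.log(effectiveThroughput(500, 1000, 300)); // 300
```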

Adding redundancy

So far we have a single worker with a lot of concurrency. The worker will process jobs as fast as 300 requests per second, and retry in the case of failure. This is quite robust, but you often need even more robustness. For instance, the worker may go offline, and in that case the jobs will stop being processed. The queue will grow as new jobs are added to it, but none will be processed until the worker comes back online.

The simplest solution to this problem is to spawn more workers such as the one shown above. You can have as many workers running in parallel on different machines as you want. With the added redundancy, you minimize the risk of not processing your jobs in time. Also, for jobs that require a lot of CPU, having several workers will increase the throughput of your queue.

Smoothing out requests

When defining a rate-limiter as we did in our example above, by specifying a number of requests per second, the workers will always try to process as fast as possible until they get rate limited. This means that the 300 requests may all be executed almost at once, after which the workers will mostly idle for the rest of the second before performing another very fast burst of calls.

This behaviour may not be the most desirable one in many cases. Fortunately, it is quite easy to fix. Instead of defining the duration of the rate-limiter in seconds, we can divide one second by the number of jobs, so that the workers process jobs during the whole second instead of in bursts.

So for example, in our case where we limited to 300 jobs per second, we can instead set the limiter's max value to 1 and the duration to 1000 / 300 ≈ 3.33 ms:

{
  limiter: {
    max: 1,
    duration: 3.33
  }
}
Smoothing out a rate-limiter

With such a rate-limiter, the jobs will now be processed at regular 3.33 ms intervals, instead of being processed in bursts.
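The same conversion can be written as a small helper so the limiter follows from the target rate instead of a hand-computed constant. This is a sketch: smoothLimiter and the LimiterOptions interface are hypothetical names that mirror the max/duration shape used above, and it assumes, as the text does, that a fractional duration in milliseconds is acceptable.

```typescript
// Mirrors the shape of the limiter option used in the examples above.
interface LimiterOptions {
  max: number;
  duration: number; // milliseconds
}

// Turns "N jobs per second" into "1 job every 1000 / N milliseconds".
// Hypothetical helper, not part of BullMQ.
function smoothLimiter(jobsPerSecond: number): LimiterOptions {
  if (jobsPerSecond <= 0) {
    throw new RangeError("jobsPerSecond must be positive");
  }
  return { max: 1, duration: 1000 / jobsPerSecond };
}

const limiterOptions = smoothLimiter(300);
console.log(limiterOptions.max); // 1
console.log(limiterOptions.duration.toFixed(2)); // "3.33"
```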

Dynamic rate-limit

There are occasions when a fixed rate-limit is not enough. For example, some services rate limit you based on more complex rules, and your HTTP call will return a 429 (Too Many Requests) response. In this case the service will tell you how long you need to wait until you are allowed to perform the next call. For example, in the response headers we can find something like this:

Retry-After: 3600
A rate-limited service giving the delay for the next call.
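Since the Retry-After header is expressed either in seconds or as an HTTP date, while a delay in a worker is typically given in milliseconds, a small conversion step is useful. The helper below is a sketch; retryAfterToMs is a hypothetical name, and how you obtain the header value depends on your HTTP client.

```typescript
// Converts a Retry-After header value into a delay in milliseconds.
// The header holds either a number of seconds or an HTTP date.
// Returns undefined when the value cannot be interpreted.
function retryAfterToMs(
  headerValue: string,
  now: Date = new Date()
): number | undefined {
  const trimmed = headerValue.trim();

  // Form 1: delay in whole seconds, e.g. "3600".
  if (/^\d+$/.test(trimmed)) {
    return Number(trimmed) * 1000;
  }

  // Form 2: an HTTP date, e.g. "Wed, 21 Oct 2015 07:28:00 GMT".
  const dateMs = Date.parse(trimmed);
  if (Number.isNaN(dateMs)) {
    return undefined;
  }
  return Math.max(0, dateMs - now.getTime());
}

console.log(retryAfterToMs("3600")); // 3600000
```

The resulting number of milliseconds is the kind of value that can be passed on as the waiting time when signalling that the queue is rate limited.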

BullMQ also supports dynamic rate-limiting, by signalling in the processor that a given queue is rate limited for some specified time:

const rateLimitWorker = new Worker(
  "rate-limit",
  async (job) => {
    const result = await callExternalAPI(job.data);
    if (rateLimited(result)) {
      await rateLimitWorker.rateLimit(1000);
      throw Worker.RateLimitError();
    }
  },
  {
    connection: { host: "my.redis-host.com" },
    concurrency: 500,
    limiter: { max: 300, duration: 1000 },
    // Retries (attempts, backoff) are job options; set them when adding jobs.
  }
);
Dynamic rate-limiter

In this example, we assume that we get some result from the call to the external API that tells us that we should be rate limited. So we call the "rateLimit" method on the worker, specifying in milliseconds how long to wait until we can process a job again.

We then throw a special error that tells the worker that this job has been rate limited, so that it is processed again once the rate limit time has expired.

Groups

If you require even more advanced rate limit functionality, the professional version of BullMQ provides support for groups.

Groups are necessary when you want to provide a separate rate-limiter to a given subset of jobs. For example, you may have a Queue where you put jobs for all the users in your system. If you used a global rate-limiter for the queue, then some users may get to use more of the system resources than others, as all are affected by the same limiter.

Using groups, however, every user could have their own separate rate-limiter, and therefore they would not be affected by other users' high volume of work, as the busy users will be rate limited independently of the less "busy" users.

Furthermore, it is also possible to combine a global rate-limit with the per-group rate-limiter, so even in the case when you have a lot of jobs for many users all at the same time, you can still limit the total overall amount of jobs that get to be processed per unit of time.

const rateLimitWorker = new Worker(
  "rate-limit",
  async (job) => {
    await callExternalAPI(job.data);
  },
  {
    connection: { host: "my.redis-host.com" },
    concurrency: 500,
    limiter: { max: 300, duration: 1000 },
    // Retries (attempts, backoff) are job options; set them when adding jobs.
    groups: {
      limit: {
        max: 20,
        duration: 1000,
      },
    },
  }
);
Rate-limit using groups
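To make the idea of combining a per-group limit with a global limit more concrete, here is a tiny in-memory model. It is purely illustrative and is not how BullMQ Pro implements groups: FixedWindow and GroupedLimiter are hypothetical classes, time is passed in explicitly, and a simple fixed window is assumed.

```typescript
// Illustrative only: a minimal in-memory model, not BullMQ Pro's implementation.
class FixedWindow {
  private windowStart = 0;
  private count = 0;
  private readonly max: number;
  private readonly durationMs: number;

  constructor(max: number, durationMs: number) {
    this.max = max;
    this.durationMs = durationMs;
  }

  // Starts a new window when the current one has expired, then reports
  // whether one more job still fits into it.
  hasCapacity(nowMs: number): boolean {
    if (nowMs - this.windowStart >= this.durationMs) {
      this.windowStart = nowMs;
      this.count = 0;
    }
    return this.count < this.max;
  }

  consume(): void {
    this.count += 1;
  }
}

class GroupedLimiter {
  private readonly groups = new Map<string, FixedWindow>();
  private readonly global: FixedWindow;
  private readonly groupMax: number;
  private readonly durationMs: number;

  constructor(globalMax: number, groupMax: number, durationMs: number) {
    this.global = new FixedWindow(globalMax, durationMs);
    this.groupMax = groupMax;
    this.durationMs = durationMs;
  }

  // A job may run only if both its group and the global window have room.
  tryAcquire(groupId: string, nowMs: number): boolean {
    let group = this.groups.get(groupId);
    if (!group) {
      group = new FixedWindow(this.groupMax, this.durationMs);
      this.groups.set(groupId, group);
    }
    const groupOk = group.hasCapacity(nowMs);
    const globalOk = this.global.hasCapacity(nowMs);
    if (!groupOk || !globalOk) {
      return false;
    }
    group.consume();
    this.global.consume();
    return true;
  }
}

// Global limit of 3 and per-group limit of 2 within each 1000 ms window.
const limiter = new GroupedLimiter(3, 2, 1000);
console.log(limiter.tryAcquire("alice", 0)); // true
console.log(limiter.tryAcquire("alice", 0)); // true
console.log(limiter.tryAcquire("alice", 0)); // false (alice's group is full)
console.log(limiter.tryAcquire("bob", 0));   // true
console.log(limiter.tryAcquire("bob", 0));   // false (global limit reached)
```

The busy user ("alice") is stopped by her own group limit without blocking "bob", while the global limit still bounds the total number of jobs per window.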

For more information about groups check the documentation.

]]>
<![CDATA[Announcing BullMQ 2.0]]>BullMQ has reached version 2.0. Which according to the "SemVer" standard means that we have introduced some breaking changes.

It is always an extra effort to deal with breaking changes, so we try to minimize them as much as we can. There are normally 2 types of

]]>
https://blog.taskforce.sh/bullmq-2-0/632598cdfef1f2003d78b8f5Wed, 21 Sep 2022 09:43:29 GMT

BullMQ has reached version 2.0, which according to the "SemVer" standard means that we have introduced some breaking changes.

It is always an extra effort to deal with breaking changes, so we try to minimize them as much as we can. There are normally 2 types of possible breaking changes in BullMQ:

  • Changes that affect the API, or functional behaviour of the library.
  • Changes that affect the structure of the data stored in Redis.

In this version we only made breaking changes of the first type, so as long as your TypeScript code compiles after upgrading, you should be all set.

Without further ado let's review the changes which will affect your current codebase if you want to upgrade.

QueueScheduler has been removed

The class that took care of delayed and stalled jobs has been removed from the library and is no longer necessary. We have implemented a simpler mechanism for handling delayed jobs (in a separate post I will go through the details of this mechanism), and also moved the stalled jobs checker to the workers themselves.

This change not only makes your codebase simpler, but also consumes fewer connections and leaves one less moving part. Honestly, the QueueScheduler has always been like a stone in my shoe, but we did not have a good solution to get rid of it.

With the development of the groups functionality in BullMQ Pro, however, we discovered a trick that could also be applied to how delayed jobs were handled, so after some redesign we were able to implement it and ultimately get rid of the QueueScheduler altogether.

The "Added" event does not contain data and options anymore

Prior to version 2.0 we were storing the data and opts of the job on the "added" event. This was for most users just a waste of memory and CPU, so we are now only sending the job id, and if you need the data for your specific use case you can just get it from the job.

Compatibility layer with legacy Bull removed

This layer was conceived to be used if you wanted to upgrade from a Bull codebase to BullMQ without needing to use the newer classes. But as far as we are aware, it was not used by anyone (not a single issue or question since BullMQ was released 3 years ago), so we decided to remove it to make the codebase smaller.

Minimum Redis version recommended is now 6.2

We have raised the minimum recommended version of Redis to 6.2. Although version 5.0+ will kind of work, it is highly recommended that you upgrade to at least version 6, since with the removal of the QueueScheduler we now need the timeout argument of the BRPOPLPUSH command to be interpreted as a "double". If you are still on version 5.0, delayed jobs will sometimes be delayed more than they should, as the timeout cannot exactly represent the remaining delay to the next job.
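The effect of an integer-only timeout can be illustrated with a little arithmetic. This sketch is not BullMQ's internal code: blockingTimeoutSeconds is a hypothetical helper, and it assumes that when fractional timeouts are unavailable the remaining delay is rounded up to the next whole second.

```typescript
// Hypothetical helper: converts the remaining delay of the next delayed job
// into a blocking timeout in seconds. Assumes rounding up to whole seconds
// when the server cannot interpret the timeout as a double.
function blockingTimeoutSeconds(
  remainingDelayMs: number,
  fractionalTimeouts: boolean
): number {
  const seconds = remainingDelayMs / 1000;
  return fractionalTimeouts ? seconds : Math.ceil(seconds);
}

// Next delayed job is due in 1250 ms.
console.log(blockingTimeoutSeconds(1250, true));  // 1.25
console.log(blockingTimeoutSeconds(1250, false)); // 2
```

Under these assumptions the job in the second case would be picked up roughly 750 ms later than intended.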

We also made some smaller fixes here and there; you can find the complete changelog here. As always, we will keep fixing issues and adding new features until we reach the next major version of BullMQ.

]]>
<![CDATA[Dividing heavy jobs using BullMQ flows in NodeJS]]>Divide and conquer is a well-known programming technique for attacking complex problems in an easier way. In a distributed environment it can be used to accelerate long processes by dividing them into small chunks of work and then joining the results together.

BullMQ provides a powerful feature called "flows

]]>
https://blog.taskforce.sh/splitting-heavy-jobs-using-bullmq-flows/62a29e5d951b0f003dcac77aWed, 15 Jun 2022 11:54:04 GMT

Divide and conquer is a well-known programming technique for attacking complex problems in an easier way. In a distributed environment it can be used to accelerate long processes by dividing them into small chunks of work and then joining the results together.

BullMQ provides a powerful feature called "flows" that allows a parent job to depend on several children. In this blog post, we are going to show how this feature can be used to divide a long CPU-intensive job (video transcoding), into smaller chunks of work that can be parallelized and processed faster.

All the code for this blog post can be found here. Note that this code is written for a tutorial and is not really production standard.

Video transcoding

Transcoding of videos is a common operation on today's internet, as videos that are uploaded to sites are often not in a suitable format for playback on a webpage. The most common transcoding is performed to produce a standard format (often mp4) with a standard resolution and compression quality.

But transcoding is quite a CPU-demanding operation, and the time to transcode a video is normally proportional to its length, so the process can become quite lengthy for really long videos.

Using the excellent video handling library FFmpeg we can transcode the videos from almost any known format to any other format. FFmpeg also allows us to quickly split a video into parts and also join the parts together. We will use these capabilities in order to implement a faster transcoding service using BullMQ flows.

Flow structure

We will create a flow with 3 levels, where every level is handled by its own queue and a specific worker.

The first queue takes the input file and divides it into chunks. For every chunk it adds a job to the second queue, plus one job to a parent queue that will only be processed once all the chunks have been processed.

Here is a diagram depicting the whole structure:

Dividing heavy jobs using BullMQ flows in NodeJS

File storage

For this tutorial we will, for simplicity, just use local file storage. In a production scenario, however, we should use a distributed storage system such as S3 or similar, so that all workers can access all data regardless of where they run.

Worker Factory

Since we are going to use several queues, we will refactor the worker creation into its own function. This function also creates a queue scheduler so that we get automatic job retries and stalled-job handling if needed (from BullMQ 2.0 onwards the QueueScheduler class no longer exists, so this step only applies to the 1.x versions). We will attach a couple of listeners in order to get some feedback when the jobs complete or fail.

For this application, even though it is very CPU intensive we will not use "sandboxed" processors because the call to FFmpeg is already spawning a new process, so using sandboxed processors would just increase the overhead.

Also note that although in this example we only use 1 worker per queue, in a production system we could have as many workers as we wish in order to process more videos faster.

import { Worker, QueueScheduler, Processor, ConnectionOptions } from "bullmq";

export function createWorker(
  name: string,
  processor: Processor,
  connection: ConnectionOptions,
  concurrency = 1
) {
  const worker = new Worker(name, processor, {
    connection,
    concurrency,
  });

  // The "completed" listener receives the job (and its return value), not an error
  worker.on("completed", (job) => {
    console.log(`Completed job on queue ${name}`);
  });

  worker.on("failed", (job, err) => {
    console.log(`Failed job on queue ${name}`, err);
  });

  const scheduler = new QueueScheduler(name, {
    connection,
  });

  return { worker, scheduler };
}
src/workers/worker.factory.ts

Splitter Queue

The process starts with a queue called "splitter", where a worker takes the video input and splits it into similar-sized parts that can later be transcoded in parallel. Since the whole point of splitting is to accelerate the transcoding process, we need the split itself to be very fast. For this we can use the following FFmpeg flags: "-c copy -map 0 -segment_time 00:00:20 -f segment". They split the file into chunks of around 20 seconds without re-encoding; more information can be found on Stack Overflow. Other chunk sizes may be more optimal, but for this example 20 seconds is good enough.

After the video file has been split we need to parse FFmpeg output to get the filenames of every chunk, and with this information create a BullMQ flow that will process every chunk, and when all chunks are processed a "parent" worker will concatenate the transcoded chunks into one output file.

import { FlowProducer, Job } from "bullmq";
import { resolve } from "path";

export default async function (job: Job<SplitterJob>) {
  const { videoFile } = job.data;

  console.log(
    `Start splitting video ${videoFile} using ${pathToFfmpeg}`,
    job.id,
    job.data
  );

  // Split the video into chunks
  const chunks = await splitVideo(videoFile, job.id!);
  await addChunksToQueue(chunks);
}

async function splitVideo(videoFile: string, jobId: string) {
  // Split the video into chunks
  // around 20 seconds chunks
  const stdout = await ffmpeg(
    resolve(process.cwd(), videoFile),
    "-c copy -map 0 -segment_time 00:00:20 -f segment",
    resolve(process.cwd(), `output/${jobId}-part%03d.mp4`)
  );

  return getChunks(stdout);
}

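function getChunks(s: string) {
  const lines = s.split(/\n|\r/);
  const chunks = lines
    .filter((line) => line.startsWith("[segment @"))
    // The chunk filename is the quoted part of the line; skip lines without one
    .map((line) => line.match(/'(.*)'/)?.[1])
    .filter((file): file is string => typeof file === "string");

  return chunks;
}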

async function addChunksToQueue(chunks: string[]) {
  const flowProducer = new FlowProducer();

  return flowProducer.add({
    name: "transcode",
    queueName: concatQueueName,
    children: chunks.map((chunk) => ({
      name: "transcode-chunk",
      queueName: transcoderQueueName,
      data: { videoFile: chunk } as SplitterJob,
    })),
  });
}
src/workers/splitter.ts
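
To illustrate the parsing step in isolation, here is a hedged sketch that runs the same kind of filter over a made-up log excerpt. The sample lines are an assumption about what the segment muxer prints (a line of the form "[segment @ ...] Opening '...' for writing"); the exact output may differ between FFmpeg versions, so check it against your own logs.

```typescript
// Same parsing idea as getChunks above, with a guard for lines without a quoted path.
function getChunks(s: string): string[] {
  return s
    .split(/\n|\r/)
    .filter((line) => line.startsWith("[segment @"))
    .map((line) => line.match(/'(.*)'/)?.[1])
    .filter((file): file is string => typeof file === "string");
}

// Hypothetical log excerpt; real FFmpeg output contains many more lines.
const sampleLog = [
  "Input #0, mov,mp4,m4a,3gp,3g2,mj2, from 'input.mp4':",
  "[segment @ 0x7f8e4b008200] Opening 'output/42-part000.mp4' for writing",
  "[segment @ 0x7f8e4b008200] Opening 'output/42-part001.mp4' for writing",
  "frame= 1200 fps=0.0 q=-1.0 Lsize=N/A time=00:00:40.00",
].join("\n");

const chunkFiles = getChunks(sampleLog);
console.log(chunkFiles);
```

Only the two "[segment @" lines are kept, so the first line (which also contains a quoted filename) does not end up in the list of chunks.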

Transcoder Queue

In this queue, we add the chunks that are part of some video file. Note that the worker of this queue does not need any other information than the chunk file itself so it can just focus on performing the transcoding of that chunk as efficiently as possible. In fact, this worker is quite simple compared to the splitter worker:

export default async function (job: Job<SplitterJob>) {
  const { videoFile } = job.data;
  console.log(`Start transcoding video ${videoFile}`);

  // Transcode video
  return transcodeVideo(videoFile, job.id!);
}

async function transcodeVideo(videoFile: string, jobId: string) {
  const srcName = basename(videoFile);
  const output = resolve(process.cwd(), `output/transcoded-${srcName}`);

  await ffmpeg(
    resolve(process.cwd(), videoFile),
    "-c:v libx264 -preset slow -crf 20 -c:a aac -b:a 16k -vf scale=320:240",
    output
  );

  return output;
}
src/workers/transcoder.ts

It transcodes the input chunk to an output file and returns the path of that file, which becomes the job's return value.

Concat Queue

The last queue is the parent of the "transcoder" queue, and as such will start processing its job as soon as all the children have completed. In order to concatenate all the files we need to generate a special text file with all the filenames and feed it to FFmpeg:

import { writeFile } from "fs/promises";
import { resolve } from "path";

async function concat(jobId: string, files: string[], output: string) {
  // Write the list of chunk files in the format expected by FFmpeg's concat demuxer
  const listFile = resolve(process.cwd(), `output/${jobId}.txt`);
  await writeFile(listFile, files.map((file) => `file '${file}'`).join("\n"));

  await ffmpeg(null, `-f concat -safe 0 -i ${listFile} -c copy`, output);
  return output;
}
src/workers/concat.ts
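
The snippet above does not show where the list of files comes from. One possibility, sketched here under the assumption that the parent job reads the return values of its children through BullMQ's job.getChildrenValues(), is to sort the returned paths so that the chunks are concatenated in their original order. The ParentJobLike interface and the helper names are hypothetical stand-ins, not code from the tutorial repository.

```typescript
// Structural stand-in for the part of BullMQ's Job used in this sketch.
interface ParentJobLike {
  getChildrenValues(): Promise<Record<string, string>>;
}

/** Sorts chunk paths so that part000 comes before part001, and so on. */
function orderChunkFiles(childrenValues: Record<string, string>): string[] {
  return Object.values(childrenValues).sort((a, b) =>
    a < b ? -1 : a > b ? 1 : 0
  );
}

/** Builds the contents of the list file read by FFmpeg's concat demuxer. */
function buildConcatList(files: string[]): string {
  return files.map((file) => `file '${file}'`).join("\n");
}

/** Collects the transcoded chunk paths from the children of a parent job. */
async function collectChunkFiles(job: ParentJobLike): Promise<string[]> {
  return orderChunkFiles(await job.getChildrenValues());
}

// The keys are made up; only the values (the transcoded file paths) matter here.
const ordered = orderChunkFiles({
  "child-b": "output/transcoded-7-part001.mp4",
  "child-a": "output/transcoded-7-part000.mp4",
});
console.log(buildConcatList(ordered));
```

Sorting by filename works here because the splitter names the chunks with a zero-padded counter (part%03d), so lexicographic order is also chunk order.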

With this last worker, we conclude the whole process. I hope this simple example for videos is useful to illustrate how jobs can be split in order to accelerate the time it takes for lengthy operations. The complete source code can be found here.

]]>
<![CDATA[Do not wait for your jobs to complete]]>A common pattern that arises when working with queues in general and BullMQ in particular, is the one of waiting for a job to complete. The reasoning goes, that if the job is doing something useful we need to wait for it to be complete so that we can get

]]>
https://blog.taskforce.sh/do-not-wait-for-your-jobs-to-complete/629d73c7951b0f003dcac4e8Thu, 09 Jun 2022 09:49:15 GMT

A common pattern that arises when working with queues in general and BullMQ in particular, is the one of waiting for a job to complete. The reasoning goes, that if the job is doing something useful we need to wait for it to be complete so that we can get the result of its work.

So an intuitive way to code this behaviour would be something like this:

const job = await queue.add("my-job", { foo: 'bar' });

// queueEvents is a QueueEvents instance listening on the same queue
const result = await job.waitUntilFinished(queueEvents);

doSomethingWithResult(result);

This is intuitive because it follows a sequence of actions, and it is what you may be used to doing in a single-process, sequential program. However, when using distributed queues in a highly fault-tolerant system, you do not want to solve problems like this.

First of all, the code above does not provide any guarantees that the call to "doSomethingWithResult(result)" will ever be performed successfully, and if it fails, the result of that job will be lost forever.

Secondly, we do not get any guarantees on how long it will take for a given job to complete. The point of the queue in the first place was to offload jobs to workers, so it could very easily be the case that the workers are quite busy and the call to "waitUntilFinished" takes unacceptably long. You could add a timeout, but that would also make us lose that job's result forever.

Finally, it is much less efficient. Imagine that for every job you add to the queue you also create a promise and listeners and wait for them to resolve. If you are adding many jobs per second this will be a slow way to handle them and will most likely become a performance and memory bottleneck.

So "waitUntilFinished" has very limited use. It is mostly used in test code, not in production code, unless the above limitations do not matter for your specific case; most likely you will be better off using a different approach.

How to do it instead?

The solution is to think about completing jobs in a different way. You have to consider that once you add a job to a queue, the "producer" of that job must let go. Now the job will live its own life processed by one of the potentially hundreds of workers running in parallel.

Let's illustrate this different way of thinking with a couple of examples.

HTTP Call that must return the job result

This one is quite a common case where you have for example a POST HTTP call to a service, the service will do some heavy work so you want to offload this job to a queue and when the job completes return this result to the caller.

If these are your requirements, then you will need to actually change them a little bit because waiting for a job to complete inside an HTTP handler does not scale and suffers from all the problems highlighted above.

Instead what you typically do in this case is to break the call into 2 (or more) calls.

The first call will just enqueue the job and return to the caller a Job ID or any other ID that the caller can later use for asking about this particular job status.

app.post("/jobs", json(), async (req, res) => {
  try {
    const job = await queue.add("my-queue", req.body);
    res.status(201).send(job.id);
  } catch(err) {
    res.status(500).send(err.message);
  }
});

The job has been queued and the user gets a response from the HTTP call immediately. With the provided Job ID, the caller can now ask for the job status, something like:

app.get("/jobs/:id/status", async (req, res) => {
  try {
    const job = await queue.getJob(req.params.id);
    if (!job) {
      // Unknown id, or the job has already been removed from the queue
      return res.status(404).end();
    }
    res.status(200).send(await job.getState());
  } catch(err) {
    res.status(500).send(err.message);
  }
});

Long running process

Sometimes you have jobs that need to run for a long time before completing. In this case it is quite common to report progress to some interested parties. The way to do this is by using the job method "updateProgress":

export default async function (job: Job) {
  await job.log("Start processing job");
  
  for(let i=0; i < 100; i++) {
     await processChunk(i);
     await job.updateProgress({ percentage: i, userId: job.data.userId });
  }
}

You can listen to all the progress events for a given queue, and when some progress is detected report to the interested party.

const queueEvents = new QueueEvents("my-queue", { connection });

queueEvents.on("progress", async ({ jobId, data }) => {
  const { userId } = data;
  
  // Notify userId about the progress
  await notifyUser(userId, data);
});

Now, the question of how to practically notify the relevant user with the progress information falls a bit outside the scope of this post. Typically it would be via some WebSocket associated with a certain user id, so you could just send a message to that socket. The important thing to notice here is that notifying the progress is usually not critical, so it is not the end of the world if the user misses a particular progress event because of some temporary network issue.

However, it may be critical to notify the completion of the job to some other system. In this case, a robust pattern is to use another queue only to notify completions. As we will be adding a job to a different queue inside the process function, we get the guarantee that if the job is added successfully, the "message" will be delivered:

export default async function (job: Job) {
  await job.log("Start processing job");
  
  for(let i=0; i < 100; i++) {
     await processChunk(i);
     await job.updateProgress({ percentage: i, userId: job.data.userId });
  }
  await otherQueue.add("completed", { id: job.id, data: job.data });
}

The otherQueue would be handled by the service interested in knowing that the job has been performed. Its worker just processes the message and does something useful with it, such as updating a status in a database.

export default async function (job: Job) {  
  await updateDatabaseWithResult(job.data);
}
]]>
<![CDATA[Typescript with BullMQ sandboxes in NodeJS]]>Writing code using Typescript and BullMQ is quite straightforward most of the time since BullMQ itself is written in Typescript.

However there is a special case that has kept people scratching their heads unnecessarily and that is when you want to use sandboxed workers, i.e. a worker that is

]]>
https://blog.taskforce.sh/using-typescript-with-bullmq/62a082dc951b0f003dcac582Wed, 08 Jun 2022 13:07:55 GMT

Writing code using Typescript and BullMQ is quite straightforward most of the time since BullMQ itself is written in Typescript.

However, there is a special case that has kept people scratching their heads unnecessarily: using sandboxed workers, i.e. workers that run in a separate NodeJS process. In this blog post I would like to offer an example with some scaffolding to easily accomplish this. It is by no means the only possible way to do it, but it is one way that works well in most cases.

Source code

The source code for this blog post is available here. Feel free to fork or copy the repo if you want to use it as a blueprint for new projects. And here is the documentation for sandboxed processors.

ESM or CommonJS

Before writing this post (in mid 2022), I spent a lot of time trying to port a medium-sized project to a pure ESM setup with Typescript. The number of problems I ran into along this path made me reconsider, and I finally found a sweet spot: use ES6 import syntax within Typescript but configure the compiler to generate CommonJS modules. This happens to work quite well, although it would of course be more elegant to use the new ES6 modules all the way. Maybe it will be easier in the future, but for now I recommend this setup when working with Typescript in the NodeJS ecosystem.

Project structure

We will create a typical structure for a Typescript application in NodeJS, that is providing a package.json with scripts for building, running in dev mode as well as running tests. We will also provide a suitable tsconfig.json file.


Inside the src directory we will put all the code for our application, including the workers. Note that for large apps you may want to move the worker code to a separate repository, away from the main application logic.


Worker

For this example we will just have a dummy worker that doesn't do anything:

import { Job } from "bullmq";

/**
 * Dummy worker
 *
 * This worker is responsible for doing something useful.
 *
 */
export default async function (job: Job) {
  await job.log("Start processing job");
  console.log("Doing something useful...", job.id, job.data);
}
src/workers/my-worker.ts

As the worker is going to be used as a sandboxed worker we just implement the "process" function and export it as the default export.

The worker is then instantiated in a different file, in our case index.ts. For this example we will just use concurrency 1, so we leave all the options at their defaults:

const myWorker = new Worker(queueName, `${__dirname}/workers/my-worker.js`, {
  connection,
});
index.ts

Note that we do not specify the original ".ts" file but the compiled ".js" file, which will be built along with the rest of the source code. This may look a bit awkward but it does not have any major drawbacks.

Producer

For this dummy example we also provide a "producer" that adds some jobs to the queue so that the worker can process them. The code is very simple and we can just add a bunch of jobs like this:

import { Queue } from "bullmq";
import { queueName } from "./types";

const connection = {
  host: "localhost",
  port: 6379,
};

const myQueue = new Queue(queueName, { connection });

async function addJobs() {
  console.log("Adding jobs...");
  for (let i = 0; i < 10; i++) {
    await myQueue.add("my-job", { foo: "bar" });
  }
  console.log("Done");
  await myQueue.close();
}

addJobs();
producer.ts

Again, this code is very simple for illustration purposes. In production you will normally add the jobs in parallel instead of awaiting them in sequence as in the example above.
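
As a minimal sketch of what adding in parallel could look like, the helper below starts all the add calls first and then waits for them together. QueueLike is a hypothetical stand-in for the part of BullMQ's Queue that is used, so that the idea can be shown without a Redis connection; addJobsInParallel is not part of the example repo.

```typescript
// Structural stand-in for the part of BullMQ's Queue used in this sketch.
interface QueueLike<T> {
  add(name: string, data: T): Promise<unknown>;
}

/** Adds one job per payload without waiting for each add to finish before starting the next. */
async function addJobsInParallel<T>(
  queue: QueueLike<T>,
  name: string,
  payloads: T[]
): Promise<number> {
  // Start all the add() calls, then wait for all of them at once.
  const results = await Promise.all(
    payloads.map((data) => queue.add(name, data))
  );
  return results.length;
}

// Usage with a fake in-memory queue (for illustration only)
const added: string[] = [];
const fakeQueue: QueueLike<{ foo: string }> = {
  add: async (name, data) => {
    added.push(`${name}:${data.foo}`);
    return undefined;
  },
};

const pending = addJobsInParallel(fakeQueue, "my-job", [
  { foo: "bar" },
  { foo: "baz" },
]);
pending.then((count) => console.log(`Added ${count} jobs`));
```

With a real queue, Queue.addBulk is another option when all the jobs are known upfront, since it sends them in a single call.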

Development

During development it is very convenient to have a watcher configured so that as soon as we modify any source code file the new code is run automatically. For this I like to use the simplest possible tool that gets the job done, and in this case it is tsc-watch. The thing I like most about it is that you run the same code in development as you would in production, so there are no surprises there. The only difference is that in production we just run the code without the watcher.

We just provide the following scripts to package.json:

 "scripts": {
    "start": "node './dist/index.js'",
    "dev": "tsc-watch --onSuccess \"node ./dist/index.js\""
 }
package.json

When you are developing with workers, keep in mind that an application reload will most likely kill your current workers, so jobs may get stalled, and the jobs will eventually end up failing if this happens several times.

Testing

It is easy to write tests in typescript and import the workers in order to write unit or integration tests. For this we will use "mocha" and specifically ts-mocha which works pretty well with Typescript. A simple test would look like this:

import { Job } from "bullmq";
import myWorker from "../src/workers/my-worker";
import { describe, it } from "mocha";

describe("My worker", () => {
  it("should do something", async () => {
    const job = {
      data: {
        foo: "bar",
      },
      async log(msg: string) {
        console.log(msg);
        return 0;
      },
    };

    await myWorker(<Job>job);
  });
});
tests/my-worker.test.ts

And we can add a run script to package.json like this:

  "scripts": {
    "test": "ts-mocha ./tests/**/*.test.ts"
  }
package.json

And run it using yarn:

Typescript with BullMQ sandboxes in NodeJS

Production

For production you will just run the compiled output of index.ts, which is dist/index.js. The complete scripts section in package.json will look like this:

"scripts": {
    "build": "tsc",
    "start": "node './dist/index.js'",
    "dev": "tsc-watch --onSuccess \"node ./dist/index.js\"",
    "test": "ts-mocha ./tests/**/*.test.ts"
  }

Here is the repo with the full example.

]]>
<![CDATA[BullMQ Pro Edition]]>https://blog.taskforce.sh/bullmq-pro-edition/616f740bd95ca400482829c0Wed, 20 Oct 2021 04:10:21 GMT

When the first version of Bull was released in 2012, it simply aimed at solving a use case we had for Castmill: we needed queues for processing simple video and image transcoding jobs. Those were the young days of NodeJS, but there was a promising package called "Kue" that should do all we needed.

After a couple of months using Kue in production we noticed that sometimes the queues just stopped processing entirely. When this happened, restarting the servers was not enough; some Redis "fu" was needed in order to fix the issue before restarting the servers.

This was far from optimal, so I tried to find the issue in the source code of Kue. Unfortunately the design used for processing the jobs was not very robust, since it did not use the recommended Redis RPOPLPUSH command that is necessary in order to guarantee robust queues. My two options were to either dig into the Kue codebase and change it so that it used the more robust pattern, or just write a simple alternative that did what we needed and nothing else.

That's how Bull started, and the rest is kind of history now. Many years and a huge amount of invested time later, the result is two very popular libraries for queue handling in the NodeJS ecosystem: Bull and the newer, Typescript-based BullMQ.

I am quite proud of the feature set we provide for free in the open source versions, and of the fact that many of those features are not available in any other Redis-based queue system, open source or commercial.

We have accumulated quite a large backlog of features that our users have requested over the years. Some of these features are simply too complex for implementing them in our spare time.

Introducing BullMQ Pro

BullMQ Pro is the "professional" edition of BullMQ, where we are investing in these features as well as providing more private support for the companies that require it.

I feel that the Pro edition is the best way forward; in my view it combines the best of the open source and enterprise worlds. If you are a user that relies on Bull/BullMQ for your business, I think it is reassuring to know there is a commercial team behind it to help you out when you most need it, either with that critical new feature that will make your life so much easier or with that bug you hit that needs to be resolved ASAP in order not to impair your product.

BullMQ Pro includes, at the time of writing this post, a couple of new features that I hope you are going to find really useful:

  • Support for observables in the process functions. This enables new use cases, such as state machines, and also provides a clean mechanism for cancelling running processes as well as proper time to live (TTL) functionality.
  • Groups support. This one I think is huge, and it is a feature that I have been dreaming of implementing for a long time. It is finally here. You can assign jobs to an unlimited number of groups and the jobs will be processed in a round-robin fashion, so in practice it works as if you had a "virtual queue" per group. You can read more about groups here.
  • Rate limit per group. Although the standard versions of Bull/BullMQ support rate limiting based on group keys, the new rate limit is implemented as a core feature of the groups, so it is much more robust and efficient. Instead of being built on top of delayed jobs it actually "rate limits" the group, so no CPU is used at all while a group is rate limited.

You can read more about these features in the official documentation.
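
To illustrate what "a virtual queue per group, processed round-robin" means in practice, here is a small in-memory model. It is only a sketch of the ordering idea: BullMQ Pro implements groups inside Redis, and the GroupedJob type and roundRobin function are hypothetical names made up for this example.

```typescript
// Illustrative in-memory model, not how BullMQ Pro is implemented.
type GroupedJob = { groupId: string; name: string };

/** Returns the order in which jobs would be served, taking one job per group in turn. */
function roundRobin(jobs: GroupedJob[]): GroupedJob[] {
  // One FIFO "virtual queue" per group, in order of first appearance.
  const groups = new Map<string, GroupedJob[]>();
  for (const job of jobs) {
    const queue = groups.get(job.groupId) ?? [];
    queue.push(job);
    groups.set(job.groupId, queue);
  }

  const order: GroupedJob[] = [];
  let remaining = jobs.length;
  while (remaining > 0) {
    for (const queue of Array.from(groups.values())) {
      const next = queue.shift();
      if (next) {
        order.push(next);
        remaining--;
      }
    }
  }
  return order;
}

const served = roundRobin([
  { groupId: "a", name: "a1" },
  { groupId: "a", name: "a2" },
  { groupId: "a", name: "a3" },
  { groupId: "b", name: "b1" },
  { groupId: "c", name: "c1" },
]).map((job) => job.name);

console.log(served.join(", ")); // a1, b1, c1, a2, a3
```

Even though group "a" added three jobs before anyone else, groups "b" and "c" are served after the first job of "a" instead of waiting behind all of them.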

Of course this is just the beginning, we have a long backlog of features we will keep implementing on BullMQ Pro. We are keeping the project open so that you can peek and comment on the issues, (github.com/taskforcesh/bullmq-pro-support).

Two new features we are currently working on:

  • Batch processing.
  • Topics.

The future of Bull/BullMQ open source editions

We are fully committed to the continuous development of Bull/BullMQ; however, the development will be focused mainly on improving stability and adding minor features. If you are a user of either of these two libraries and happy with the current set of features, then rest assured that we will keep maintaining them. In fact, better financial support will put us in a much better position to maintain them.

What about the price?

We are aware that BullMQ Pro may not yet have all the bells and whistles, so we are providing a generous license where you can use the library on any projects you want (as long as they are part of the same organization), for an introductory price of 95$/mo (or 995$ yearly).

The subscriptions are handled in your Taskforce.sh account.

Something missing?

If as a company you are lacking something that you wish we could provide and that is not covered by the Pro edition, please let me know. I would gladly discuss it in more depth with you.

]]>
<![CDATA[Implementing a robust and scalable Webhooks/Notification system in NodeJS]]>

(c) Eric Scales. Get it at Etsy

A lot of modern REST APIs need to perform asynchronous operations, instead of unreliably keeping a long living http connection to the server waiting for it to complete, it is common to instead implement a notification system using webhooks. Notifications can be used

]]>
https://blog.taskforce.sh/implementing-a-webhook-system-with-bullmq/603fc8070e9bfe0039b9adabFri, 26 Mar 2021 15:21:30 GMTImplementing a robust and scalable Webhooks/Notification system in NodeJS

(c) Eric Scales. Get it at Etsy

A lot of modern REST APIs need to perform asynchronous operations. Instead of unreliably keeping a long-lived HTTP connection to the server while waiting for the operation to complete, it is common to implement a notification system using webhooks. Notifications can of course be used for many other things; a good example is Stripe's API, which uses webhooks extensively for notifying events such as subscription creation, payments, charges, or anything in between.

In this post I would like to show you how to implement such a webhook system in NodeJS using BullMQ for powering the queues. You can find fully working code for this tutorial here.

The tutorial is written so that the core concepts of a webhook system are explained, while things like security are omitted for simplicity. Please ping me if you would like me to cover security as well in a second post.

Design

We are going to go through a solution for the following use case: we have a server that exposes a POST endpoint to request async tasks. When the user calls this endpoint a job is created and placed into the Tasks Queue. A fleet of workers consumes these jobs, and when each one of these jobs completes, a new job is created and added to the Webhook Queue.

graph LR
  A((User)) -->|POST task| B[(REST Service)]
  B --> |add Task Job| C([Tasks Queue])
  C:::queue --> |consume task job| D(Tasks Workers)
  D --> |add webhook job| E([Webhooks Queue])
  E:::queue
  style A fill:#009bff,stroke:#333,stroke-width:4px
  style B fill:#26faff,stroke:#333,stroke-width:4px
  classDef queue fill:#ff7983,stroke:#333,stroke-width:4px

Another fleet of workers will now try to notify the user's webhooks with the result of the task that has been completed. If the call to the webhook fails we will retry a couple of times using an exponential backoff, and if after a number of attempts we still do not manage to perform the call we will add a new job to the "mailbot queue" to notify the user via email that the webhook is not working properly.

graph LR
  E([Webhooks Queue]):::queue --> |consume webhook job| G(Webhooks Workers)
  G --> |webhook call| H[(Client Service)]
  G --> |add email failure job| F([Email queue])
  F:::queue
  style H fill:#009bff,stroke:#333,stroke-width:4px
  classDef queue fill:#ff7983,stroke:#333,stroke-width:4px
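
To get a feeling for how the retries spread out over time with an exponential backoff, here is a small sketch. It assumes that the wait before retry n is 2^(n-1) times the base delay, which is the formula described in the BullMQ documentation at the time of writing; the exact formula has differed between Bull and BullMQ versions, so treat the numbers as an illustration. The function name is hypothetical.

```typescript
/**
 * Delays (in ms) before each retry, assuming delay(n) = 2^(n-1) * baseDelayMs.
 * This formula is an assumption; verify it against the library version you use.
 */
function exponentialBackoffDelays(baseDelayMs: number, retries: number): number[] {
  const delays: number[] = [];
  for (let retry = 1; retry <= retries; retry++) {
    delays.push(Math.pow(2, retry - 1) * baseDelayMs);
  }
  return delays;
}

const delays = exponentialBackoffDelays(1000, 4);
console.log(delays); // [ 1000, 2000, 4000, 8000 ]
```

With a base delay of one second, four retries already span 15 seconds in total, which gives a temporarily unavailable webhook endpoint some time to recover.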

Implementation

So let's start by creating our server that handles the posted tasks. We will use a simple express server for this:

import express from "express";
import { Queue } from "bullmq";
import config from "./config";

const app = express();

const taskQueue = new Queue("tasks", { connection: config.connection });

app.post("/users/:userId/tasks/:taskType", express.json(), (req, res) => {
  const taskData = req.body;

  console.log(`Received task ${req.params.taskType} to process...`);

  taskQueue
    .add(req.params.taskType, { userId: req.params.userId, taskData })
    .then(
      (job) => {
        res.status(201).end(job.id);
      },
      (err) => {
        res.status(500).end(err.message);
      }
    );
});

console.log(`Start listening to port ${config.port}`);
app.listen(config.port);
index.ts

There is nothing fancy here, just proxying the POST call as a new task job into a queue. Since we return after adding to the queue, the user can rest assured that the task will be performed if she gets a status 201 when calling this endpoint.

The worker code for this queue will perform the actual task and post a new job to the webhooks queue:

import { Worker, Queue } from "bullmq";
import config from "./config";

const webhooksQueue = new Queue("webhooks", { connection: config.connection });

export const taskWorker = new Worker<{ userId: string; taskData: any }>(
  config.taskQueueName,
  async (job) => {
    console.log(`Processing job ${job.id} of type ${job.name}`);

    const result = `Result data from task performed for ${job.name} with ID ${job.id}`;

    return webhooksQueue.add(
      job.name,
      { userId: job.data.userId, result },
      {
        attempts: config.maxAttempts,
        backoff: { type: "exponential", delay: config.backoffDelay },
      }
    );
  },
  { connection: config.connection }
);
task-worker.ts

Since this is just a tutorial we only print a message with console.log, but here you could perform any long-lived operation that you need.

We finalize the job by adding the result to the webhooks queue, keeping the userId so that we can notify the correct user's webhook later. Also note that since we return the promise from the call to "webhooksQueue.add", this job only completes successfully if the job was added correctly to the webhooks queue. The implication of this is robustness: if the task completes, the user will get notified.

Next we implement the worker for the webhook queue:

import { Worker, Queue } from "bullmq";
import got from "got";
import config from "./config";
import { MailJob } from "./mail-job.interface";
import { getUserById } from "./users";

const mailQueue = new Queue<MailJob>(config.mailQueueName, {
  connection: config.connection,
});

export const webhooksWorker = new Worker<{ userId: string; result: string }>(
  config.webhooksQueueName,
  async (job) => {
    const { userId, result } = job.data;
    const user = await getUserById(userId);

    const maxWebhookAttempts = config.maxAttempts - config.maxAttemptsForEmail;

    if (job.attemptsMade < maxWebhookAttempts) {
      console.log(
        `Calling webhook for "${result}", attempt ${job.attemptsMade + 1} of ${maxWebhookAttempts}`
      );
      return got.post(user.webhook, { json: { result } });
    } else {
      console.log(
        `Giving up, lets mail user about webhook not working for "${result}"`
      );
      // Send an email to the user about failing webhooks.
      return mailQueue.add("webhook-failure", {
        mailOpts: {
          from: "[email protected]",
          subject: "Your Webhook is failing",
          text: `We tried to send a notification to your webhook ${user.webhook} ${maxWebhookAttempts} times and it is failing.`,
          to: user.email,
        },
      });
    }
  },
  {
    connection: config.connection,
  }
);
webhook-worker.ts

So here we fetch the data from the given user, where we should get the webhook endpoint as well as an email address. First we want to know if we have already consumed all the attempts available for the webhook call. If we are within limits we just call the endpoint posting the result from the task. If we have exhausted the number of attempts then we shall proceed to send an email instead, in the hope that the user can fix the issue so that future calls will succeed. For the email sending we use our "mailbot", please check this other tutorial for how this is done.
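The way the worker splits its attempts between webhook calls and the fallback email can be isolated into a small pure function, which makes it easier to reason about. This is only a sketch: decideWebhookAction is a hypothetical helper that is not part of the tutorial repo, and the values 10 and 5 are assumed settings for config.maxAttempts and config.maxAttemptsForEmail, chosen so that they match the "attempt 1 of 5" lines in the logs further down.

```typescript
// Hypothetical helper mirroring the branching in the webhooks worker above.
type WebhookAction =
  | { kind: "call-webhook"; attempt: number; of: number }
  | { kind: "send-email" };

function decideWebhookAction(
  attemptsMade: number,
  maxAttempts: number,
  maxAttemptsForEmail: number
): WebhookAction {
  // Attempts not reserved for the email fallback are spent on the webhook.
  const maxWebhookAttempts = maxAttempts - maxAttemptsForEmail;

  if (attemptsMade < maxWebhookAttempts) {
    return {
      kind: "call-webhook",
      attempt: attemptsMade + 1,
      of: maxWebhookAttempts,
    };
  }
  return { kind: "send-email" };
}

// Assumed settings: 10 attempts in total, 5 of them reserved for the email.
console.log(decideWebhookAction(0, 10, 5)); // first webhook call, attempt 1 of 5
console.log(decideWebhookAction(5, 10, 5)); // webhook attempts used up, send the email
```

Keeping the decision separate from the side effects (the HTTP call and the mail job) also makes it straightforward to unit test without Redis.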

This is basically all there is to it, so now let's test if everything works.

Testing

In order to test our webhooks system we are going to need 3 different servers running in parallel:

1) the actual server with the endpoint to post tasks.

2) the client server that is going to receive the webhook calls.

3) the workers that are processing all the jobs.

If you have cloned the tutorial repo, and installed the dependencies with yarn, then you can just run these commands each in a terminal window:

> bullmq-webhook yarn start
yarn run v1.22.5
$ tsc && node dist/index.js
Start listening to port 9000

The above will start the main server.

Now let's start the other two services:

> bullmq-webhook yarn start:test
yarn run v1.22.5
$ tsc && node dist/test-server.js
Test server start listening to port 8080

> bullmq-webhook yarn start:workers
yarn run v1.22.5
$ tsc && node dist/workers
Started workers: webhooks and tasks

Ok, now that we have all the required servers running, we can use curl to post a task to our main server and see what happens:

 > curl -d '{"key1":"value1", "key2":"value2"}' -H "Content-Type: application/json" -X POST http://localhost:9000/users/1/tasks/transcode
 1%

The call was successful and we got the job id back. Now we can see what happened in the other services:

> bullmq-webhook yarn start
yarn run v1.22.5
$ tsc && node dist/index.js
Start listening to port 9000
Received task transcode to process...

> bullmq-webhook yarn start:workers
yarn run v1.22.5
$ tsc && node dist/workers
Started workers: webhooks and tasks
Processing job 1 of type transcode
Calling webhook for "Result data from task performed for transcode with ID 1", attempt 1 of 5

The workers have picked up the task job, added the webhook job and also processed the job.

On the client server we can see that the webhook has been called successfully:

> bullmq-webhook yarn start:test
yarn run v1.22.5
$ tsc && node dist/test-server.js
Test server start listening to port 8080
Received notification with { result: 'Result data from task performed for transcode with ID 1' }

However, let's see what happens if our test server goes down. We just quit the service and try again:

> bullmq-webhook yarn start
yarn run v1.22.5
$ tsc && node dist/index.js
Start listening to port 9000
Received task transcode to process...
Received task transcode to process...

The main server received a new task to process, however since the client server is down, our webhook worker is retrying instead:

> bullmq-webhook yarn start:workers
yarn run v1.22.5
$ tsc && node dist/workers
Started workers: webhooks and tasks
Processing job 1 of type transcode
Calling webhook for "Result data from task performed for transcode with ID 1", attempt 1 of 5
Calling webhook for "Result data from task performed for transcode with ID 2", attempt 1 of 5
Calling webhook for "Result data from task performed for transcode with ID 2", attempt 2 of 5
Calling webhook for "Result data from task performed for transcode with ID 2", attempt 3 of 5
Calling webhook for "Result data from task performed for transcode with ID 2", attempt 4 of 5
Calling webhook for "Result data from task performed for transcode with ID 2", attempt 5 of 5
Giving up, lets mail user about webhook not working for "Result data from task performed for transcode with ID 2"

We can also use Taskforce.sh to examine the queues and jobs. By running the taskforce-connector we can easily see the local queues in the UI:

>  taskforce  -t mytaskforcetoken -n webhook-tutorial
Taskforce Connector v1.12.0 - (c) 2017-2020 Taskforce.sh Inc.
WebSocket: opening connection to Taskforce.sh
WebSocket: opened connection to Taskforce.sh

And now a new connection appears automatically in the Taskforce.sh UI.

We can see that 2 jobs completed in the webhooks queue.

However, as we know, one of them did not manage to call the endpoint, but it still completed since it managed to add a new job to the email queue. If we open the job we can still see the reason why it failed, and the result of adding the job to the mailbot queue.

The mailbot queue now has the job, however since we do not have any workers processing emails it stays in wait status.

As you can see, we get quite good visibility of how the queues are performing using the UI, and if something goes wrong we can manually retry the failed jobs after correcting the underlying cause that made them fail.

And with this we have reached the end of this tutorial. I hope you found it useful. Please stay tuned for more. I am always looking for ideas for future tutorials, so if you have any please do not hesitate to contact me.

]]>
<![CDATA[Obliterate!]]>I never though I would even create a piece of code that "obliterates" anything, but here it is anyway!

I just released a new method for Bull/BullMQ to "obliterate" your queues. The name is chosen so that it is used carefully, since this will indeed

]]>
https://blog.taskforce.sh/obliterate/60536b13fcaf6f003b3efa9cThu, 18 Mar 2021 15:19:21 GMT

I never thought I would even create a piece of code that "obliterates" anything, but here it is anyway!

I just released a new method for Bull/BullMQ to "obliterate" your queues. The name is chosen so that it is used carefully, since this will indeed completely destroy your queue leaving no traces of its existence.

await myQueue.obliterate();

The method is unfortunately not atomic, since removing all the keys that belong to a queue can be a lengthy process depending on how many jobs you have in your queues.

As soon as you call the method the queue will be globally paused. By default it will fail if there are currently any active jobs; this is to prevent possible side effects in workers that would suddenly find that the job they are working on no longer exists.

The method removes keys in chunks of at most 5000 keys, but since it is not atomic, side effects could leave the queue half deleted. For example, if the queue is unpaused by another script while this method is running, it will fail with an exception.
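Since the call can fail, it may be convenient to wrap it. The sketch below assumes the force option of obliterate, which tells it to proceed even when there are active jobs. ObliterableQueue is a hypothetical structural type standing in for a real Queue instance, and tryObliterate is a made-up helper name.

```typescript
// Hypothetical structural type: just the part of a queue this sketch needs.
interface ObliterableQueue {
  obliterate(opts?: { force?: boolean }): Promise<void>;
}

// Tries to obliterate the queue and reports whether it worked instead of throwing.
// With force = false (the default) the call is expected to fail while jobs are active.
async function tryObliterate(
  queue: ObliterableQueue,
  force = false
): Promise<boolean> {
  try {
    await queue.obliterate({ force });
    return true;
  } catch (err) {
    console.error(`Queue was not obliterated: ${(err as Error).message}`);
    return false;
  }
}
```

With a real queue this would be called as tryObliterate(myQueue), and as tryObliterate(myQueue, true) only if you accept that running workers may lose the job they are working on.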

Anyway, you asked for this so here it is, but use with care!

]]>
<![CDATA[Implementing a mail microservice in NodeJS with BullMQ (3/3)]]>https://blog.taskforce.sh/implementing-a-mail-microservice-in-nodejs-with-bullmq-part-3/603a1b53b2211e00392d2b91Fri, 05 Mar 2021 13:56:21 GMTPDFs as Email Attachments

In this last chapter we are going to add a more advanced feature to our mailing service, namely the option to send HTML content as PDF attachments. This will show how, using BullMQ, it is trivial to offload and distribute heavier tasks to a fleet of workers.

The code for this tutorial can be found here: https://github.com/taskforcesh/bullmq-mailbot on the part3 branch. And there is also a plain JS version of the tutorial here: https://github.com/igolskyi/bullmq-mailbot-js.

We will use puppeteer in order to render the HTML into a PDF. Puppeteer is an excellent choice which just works and provides very nice results. You will notice that sending the emails is now much more time consuming, our workers finally need to do some work! However, since we can have as many workers as we want distributed across different machines, we should have no problem scaling a solution like this to handle any workload we throw at it.

We will start by enhancing our "Mail" interface (now renamed to MailJob) so that we can also specify attachments:

export interface MailJob {
  mailOpts: {
    from: string;
    to: string;
    subject: string;
    text?: string;
    html?: string;
    generateTextFromHTML?: boolean;
  };
  htmlAttachments?: {
    name: string;
    html: string;
  }[];
}
mail-job.ts

Attachments are optional and we want to allow any number of them so we accept an array.

Since we are using "puppeteer", we need to install it as a dependency:

yarn add puppeteer

Next we need to enhance our processor so that we generate the PDFs, one for every item in the htmlAttachments array:

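import { Job } from "bullmq";
import puppeteer from "puppeteer";
import { MailJob } from "./mail-job.interface";

// `transporter` is the mail transporter defined elsewhere in this file.
export default async (job: Job<MailJob>) => {
  let attachments;
  if (job.data.htmlAttachments) {
    // Render every HTML attachment to a PDF, one browser instance per attachment.
    attachments = await Promise.all(
      job.data.htmlAttachments.map(async (attachment) => {
        const browser = await puppeteer.launch({
          headless: true,
          args: ["--no-sandbox", "--disable-setuid-sandbox"],
        });
        try {
          const page = await browser.newPage();

          await page.setContent(attachment.html);

          const pdf = await page.pdf({ format: "a4", printBackground: true });

          return { filename: `${attachment.name}.pdf`, content: pdf };
        } finally {
          // Close the browser even if rendering fails, so no browser process is leaked.
          await browser.close();
        }
      })
    );
  }

  return transporter.sendMail({ ...job.data.mailOpts, attachments });
};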
mail.processor.ts

Now we use the "async" keyword for our processor to simplify the code, using "await" for all the asynchronous operations. Note that we launch a new browser for every attachment. An optimization may be to move this outside of the processor itself, however I am not sure this is completely safe if you have concurrency enabled in your workers. I will leave it as an exercise to the reader to determine if this is feasible or not :).
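As a possible starting point for that exercise, the launch can be cached so that every job in a worker process reuses the same browser and only opens (and closes) its own page. This is a sketch under assumptions, not a verified solution: lazyShared is a hypothetical helper that belongs neither to puppeteer nor to BullMQ, and whether a shared browser is safe with your concurrency settings still needs to be checked.

```typescript
// Hypothetical helper: runs `factory` at most once and hands the same promise
// to every caller, even if several jobs ask for the resource at the same time.
function lazyShared<T>(factory: () => Promise<T>): () => Promise<T> {
  let cached: Promise<T> | undefined;

  return () => {
    if (cached === undefined) {
      cached = factory().catch((err) => {
        // Forget a failed launch so that the next caller can try again.
        cached = undefined;
        throw err;
      });
    }
    return cached;
  };
}

// Possible usage with puppeteer (not executed here):
//   const getBrowser = lazyShared(() => puppeteer.launch({ headless: true }));
//   const page = await (await getBrowser()).newPage();
//   ... render the PDF ...
//   await page.close(); // close only the page, the browser stays open
```

Caching the promise rather than the resolved browser is what keeps two concurrent jobs from launching two browsers.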

Regarding the arguments sent to puppeteer:

"--no-sandbox", "--disable-setuid-sandbox"

you must know that these are unsafe arguments if the HTML you send to puppeteer is not trusted. If this is not good enough for your use case, please read here on how to fix it: https://github.com/puppeteer/puppeteer/blob/main/docs/troubleshooting.md#setting-up-chrome-linux-sandbox

With the processor ready we can now write a simple test to check that everything works:

client.enqueue("invoice", {
  mailOpts: {
    from: "[email protected]",
    to: args[0],
    subject: "Your service invoice",
    text: "Please see the attached invoice\n Kind Regards \n Economy team\n",
  },
  htmlAttachments: [
    {
      name: "invoice-0001",
      html: "<html><body><div>This is just a dummy Invoice</div></body></html>",
    },
  ],
});
send-attachment.ts

This will result in an email that includes a PDF with the HTML as content.

And with this post we end the third and last part of this tutorial series!

]]>
<![CDATA[Implementing a mail microservice in NodeJS with BullMQ (2/3)]]>In this second post we are going to show you how to add rate limiting, retries after failure and delay jobs so that emails are sent in a future point in time.

The code for this tutorial is available at https://github.com/taskforcesh/bullmq-mailbot branch part2. And there is

]]>
https://blog.taskforce.sh/implementing-a-mail-microservice-in-nodejs-with-bullmq-part-2/603970b3cee0e70039682192Mon, 01 Mar 2021 10:04:50 GMTIn this second post we are going to show you how to add rate limiting, retries after failure and delayed jobs so that emails are sent at a future point in time.

The code for this tutorial is available at https://github.com/taskforcesh/bullmq-mailbot branch part2. And there is also a plain JS version of the tutorial here: https://github.com/igolskyi/bullmq-mailbot-js.

If you haven't read the first post in this series you should start there: https://blog.taskforce.sh/implementing-mail-microservice-with-bullmq/

Rate limiting

Most services implement some kind of rate limit that you need to honor so that your calls are not restricted or, in some cases, to avoid being banned. With BullMQ you can simply define the maximum rate for processing your jobs, regardless of how many parallel workers you have running.

We build on the previous code by adding a rate limiter to the worker instance:

export const worker = new Worker(
  config.queueName,
  __dirname + "/mail.proccessor.js",
  {
    connection: config.connection,
    concurrency: config.concurrency,
    limiter: config.limiter,
  }
);
mail.worker.ts

We factor out the rate limiter to the config object:

export default {
  queueName: process.env.QUEUE_NAME || "mailbot",
  concurrency: parseInt(process.env.QUEUE_CONCURRENCY || "1"),
  connection: {
    host: process.env.REDIS_HOST,
    port: parseInt(process.env.REDIS_PORT || "6379"),
  },
  region: process.env.AWS_DEFAULT_REGION || "us-west-2",
  limiter: {
    max: parseInt(process.env.MAX_LIMIT || "1"),
    duration: parseInt(process.env.DURATION_LIMIT || "1000"),
  }
};
config.ts

Note that the limiter has 2 options: a max value, which is the maximum number of jobs, and a duration in milliseconds. This means that with the default settings provided above the queue will run at most 1 job every second.

We are not quite ready yet; we also need a special class called QueueScheduler. This class takes care of moving delayed jobs back to the wait status when the time is right. Since the rate limiter will delay the jobs that become limited, we need to have this instance running or the jobs will never be processed at all. We just instantiate it in the same file as where we instantiate the worker:

export const scheduler = new QueueScheduler(config.queueName, {
  connection: config.connection,
});
mail.worker.ts

You can now start the workers with:

$ MAX_LIMIT=1 DURATION_LIMIT=2000 yarn start

And they will now only process 1 job every 2 seconds. To test it you can run:

$ node dist/tests/send-limited.js [email protected]
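The arithmetic behind the limiter settings can be sketched in a few lines. The helper names below are hypothetical and the model is deliberately simple (at most max jobs per window of duration milliseconds, with processing time ignored), so treat the numbers as rough lower bounds rather than what BullMQ guarantees.

```typescript
// Same shape as the `limiter` object in config.ts.
interface LimiterConfig {
  max: number; // maximum number of jobs per window
  duration: number; // window length in milliseconds
}

// Upper bound on throughput across all workers, in jobs per second.
function maxJobsPerSecond(limiter: LimiterConfig): number {
  return (limiter.max * 1000) / limiter.duration;
}

// Earliest time (in ms after the first job starts) at which the last of
// `jobCount` jobs may start under this simplified model.
function earliestStartOfLastJobMs(
  jobCount: number,
  limiter: LimiterConfig
): number {
  if (jobCount <= 0) return 0;
  const windowsNeeded = Math.ceil(jobCount / limiter.max);
  return (windowsNeeded - 1) * limiter.duration;
}

// MAX_LIMIT=1 DURATION_LIMIT=2000, as in the command above.
const limiter: LimiterConfig = { max: 1, duration: 2000 };
console.log(maxJobsPerSecond(limiter)); // 0.5
console.log(earliestStartOfLastJobMs(10, limiter)); // 18000
```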

Retry failed emails

Our processor function is very simple, just a call to transporter.sendMail, however if this call fails unexpectedly the email will not be sent. Instead we want to perform some automatic retries before we give up on that send operation. BullMQ has a flexible retry mechanism that is configured with 2 options: the maximum number of attempts, and which backoff function to use. For this tutorial we will use the exponential backoff, which is a good backoff function for most cases.

One important difference now is that the retry options are not configured on the workers but when adding jobs to the queue, i.e. it is decided by the producer of the jobs, so this allows us to have different retry mechanisms for every job if we wish so.

When writing a module like the one for this tutorial, you would probably divide it into two modules, one for the producer of jobs (adds jobs to the queue) and another for the consumer of the jobs (processes the jobs). For simplicity we will just create a helper class and keep it in the same repository:

import { Queue, QueueOptions, JobsOptions } from "bullmq";
import { Mail } from "./mail.interface";
import config from "./config";

export class MailbotClient {
  private queue: Queue<Mail>;

  constructor(opts: QueueOptions) {
    this.queue = new Queue<Mail>(config.queueName, opts);
  }

  // `jobOpts` carries per-job settings such as attempts, backoff or delay.
  async enqueue(jobName: string, mail: Mail, jobOpts?: JobsOptions) {
    await this.queue.add(jobName, mail, jobOpts);

    console.log(`Enqueued an email sending to ${mail.to}`);
  }

  close() {
    return this.queue.close();
  }
}
mail.client.ts

Of course we could use the Queue class exported by BullMQ directly, but wrapping it in our own class helps in adding some extra type safety and maybe some app specific defaults.

We can now test adding jobs with retry functionality. For example let's allow a maximum of 5 attempts with an exponential backoff starting with a 3 second delay before the first retry:

import { MailbotClient } from "../mail.client";
import config from "../config";

const args = process.argv.slice(2);

const client = new MailbotClient({
  connection: config.connection,
});

client.enqueue(
  "welcome-mail",
  {
    from: "[email protected]",
    to: args[0],
    subject: "Welcome to BullMQ",
    text: "This is a welcome email!",
  },
  { attempts: 5, backoff: { type: "exponential", delay: 3000 } }
);
send-retry.ts

Once a job has failed on all 5 attempts it will not be automatically retried anymore, however it will be kept in the "failed" status, so it can be examined and/or retried manually in the future when the cause for the failure has been resolved.
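To get a feeling for what these settings mean in practice, the waiting times between attempts can be computed by hand. The sketch below assumes the exponential formula given in the current BullMQ documentation, delay * 2^(attemptsMade - 1). That formula is an assumption here, and older releases may compute the delay differently. exponentialBackoffDelays is a hypothetical helper, not a BullMQ function.

```typescript
// Returns the wait (in ms) before each retry for a job that keeps failing.
// `attempts` is the total number of tries, so there are attempts - 1 retries.
function exponentialBackoffDelays(attempts: number, delay: number): number[] {
  const delays: number[] = [];
  for (let attemptsMade = 1; attemptsMade < attempts; attemptsMade++) {
    // Assumed formula: delay * 2^(attemptsMade - 1)
    delays.push(Math.round(Math.pow(2, attemptsMade - 1) * delay));
  }
  return delays;
}

const delays = exponentialBackoffDelays(5, 3000);
console.log(delays); // [ 3000, 6000, 12000, 24000 ]
console.log(delays.reduce((sum, d) => sum + d, 0)); // 45000
```

Under this assumption a mail that keeps failing is given up on roughly 45 seconds after its first attempt, not counting the time the attempts themselves take.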

Delayed emails

It is quite common that we want to send an email after some time has passed since a user performed some operation. For example, maybe we want to send a follow up to a new user one week after the first login. This is very easy to accomplish with our "mailbot" module, we will just enqueue a new email with a one week delay:

import { MailbotClient } from "../mail.client";
import config from "../config";

const args = process.argv.slice(2);

const client = new MailbotClient({
  connection: config.connection,
});

const ONE_MINUTE = 1000 * 60;
const ONE_HOUR = 60 * ONE_MINUTE;
const ONE_DAY = 24 * ONE_HOUR;
const ONE_WEEK = 7 * ONE_DAY;

client.enqueue(
  "We are here to help!",
  {
    from: "[email protected]",
    to: args[0],
    subject: "Your first week with BullMq",
    text: "This is an engagement email!",
  },
  { delay: ONE_WEEK },
);
send-delay.ts

If you instead want to delay the job to a specific point in time just take the difference between now and desired time and use that as the delay:

const delay = new Date("2023-03-25T12:00:00Z").getTime() - Date.now();
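A slightly more defensive version of this calculation could look like the sketch below. delayUntil is a hypothetical helper name, and clamping to zero is a design choice of this sketch so that a target time in the past simply results in a job without delay.

```typescript
// Milliseconds from `now` until `target`, never negative.
function delayUntil(target: Date, now: number = Date.now()): number {
  const delay = target.getTime() - now;
  if (Number.isNaN(delay)) {
    throw new RangeError("target is not a valid date");
  }
  // A target in the past means "send as soon as possible".
  return Math.max(0, delay);
}

// One hour before the target time the delay is 3600000 ms.
const target = new Date("2023-03-25T12:00:00Z");
console.log(delayUntil(target, Date.parse("2023-03-25T11:00:00Z"))); // 3600000
```

The result can be passed as the delay option when enqueuing, in the same way as ONE_WEEK in send-delay.ts.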

Note that in the send-delay.ts example we did not specify any retry options, so in case of failure that particular email will not be retried. Since the retry options will probably be the same for all jobs, we can move them to "defaultJobOptions", so that all jobs will retry but we are still allowed to override the options if we wish. So back to our MailbotClient class:

  constructor(opts: QueueOptions) {
    this.queue = new Queue<Mail>(config.queueName, {
      defaultJobOptions: {
        attempts: 5,
        backoff: { type: "exponential", delay: 3000 },
      },
      ...opts,
    });
  }
mail.client.ts
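One detail of this constructor is worth a closer look: because "...opts" is spread last, a caller that passes its own defaultJobOptions replaces the whole defaults object instead of adjusting single fields. The sketch below shows the difference with plain objects. The interfaces are simplified stand-ins for the BullMQ option types and both function names are hypothetical.

```typescript
// Simplified stand-ins for the BullMQ option types used in the constructor.
interface JobDefaults {
  attempts?: number;
  backoff?: { type: string; delay: number };
}

interface ClientOptions {
  connection?: unknown;
  defaultJobOptions?: JobDefaults;
}

const RETRY_DEFAULTS: JobDefaults = {
  attempts: 5,
  backoff: { type: "exponential", delay: 3000 },
};

// Same shape as the constructor above: the caller's object wins as a whole.
function withShallowDefaults(opts: ClientOptions): ClientOptions {
  return { defaultJobOptions: RETRY_DEFAULTS, ...opts };
}

// Alternative: merge field by field, so callers can override only `attempts`.
function withMergedDefaults(opts: ClientOptions): ClientOptions {
  return {
    ...opts,
    defaultJobOptions: { ...RETRY_DEFAULTS, ...opts.defaultJobOptions },
  };
}

const custom: ClientOptions = { defaultJobOptions: { attempts: 2 } };
console.log(withShallowDefaults(custom).defaultJobOptions); // { attempts: 2 }
console.log(withMergedDefaults(custom).defaultJobOptions); // attempts 2, backoff kept
```

Which behavior is preferable depends on whether callers are expected to fully own their retry policy or only tweak it.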

This is all for this post. In the next post we will show how to add PDF attachments to the emails: https://blog.taskforce.sh/implementing-a-mail-microservice-in-nodejs-with-bullmq-part-3/

Follow me on Twitter to get notified when it's out!

]]>
<![CDATA[BullMQ Redis™ Module]]>

I am excited to announce that I started the development of a Redis Module that implements BullMQ in Zig. You can access it here: https://github.com/taskforcesh/bullmq-redis

I had this idea since Redis modules were announced back in version 5.0, as it provides the means to

]]>
https://blog.taskforce.sh/bullmq-redis-module/6038ffa0dd60d20039dfd4c2Sun, 28 Feb 2021 10:40:16 GMT

I am excited to announce that I started the development of a Redis Module that implements BullMQ in Zig. You can access it here: https://github.com/taskforcesh/bullmq-redis

I have had this idea since Redis modules were announced back in version 5.0, as they provide the means to resolve several limitations in BullMQ that are either very difficult or just impossible to resolve as a layer on top of Redis, and they open the door for other languages to use BullMQ, instead of it being exclusively available for NodeJS.

In this post I would like to present some of the improvements of the module compared to the current BullMQ implementation. They may not be so many, but for me it is a big breakthrough and these are just the tip of the iceberg of what I am planning for future releases.

You may also wonder why Zig instead of plain C, which is the language Redis is actually written in. The reason is pretty simple: I think Zig has enough improvements over C to make the coding more pleasing, simplify memory management and ultimately help in producing more robust software. Plus I thought it was a good excuse to learn something new after all these last years working in the NodeJS ecosystem.

So let's review some of the advantages of using a Redis module instead of Lua scripts as in the current NodeJS implementation.

Blocking Commands

Probably the most important feature that opens a full set of improvements is the fact that you can write blocking commands as complex as you need. When using Redis commands you are limited to a handful of blocking commands, and Lua scripts do not support them. For reliable queues BRPOPLPUSH is a useful command and the one we use in BullMQ in order to implement robust queues. However this command is too simple for the kind of features that are provided by BullMQ, for example, as soon as we start to process a job we need to lock it so that the job is not moved back to wait as stalled. Solving this in Bull requires extra mechanisms in order to avoid hazard conditions. With a module this becomes trivial, we just lock before returning from the blocking command.

Locking may not be so exciting, but let me tell you about delayed jobs. In BullMQ (as well as in older Bull), we need to have a separate connection to Redis that listens to a special delay event. Based on these events we schedule a call to a Lua script that moves all delayed jobs that have “expired” back to the wait list so that they are processed by the workers. This logic for handling delay events requires a special mechanism to move jobs from the delay set back to the wait list. This works with the blocking command that blocks while there are no jobs in the wait list, but as soon as a delayed job is moved back to the wait list, the blocking command will complete, moving the job to the active list, and processing can start. This feature is currently accomplished with the special class “QueueScheduler”.

With the new Redis Module we can get rid of the QueueScheduler class altogether. Now our blocking command “QNEXT” will return the next job to process in the queue, delayed or not, and block until such a job is available.

In BullMQ for node, when it is time to process a delayed job, the job is moved to the head/front of the wait list so that it is executed as soon as possible. But this mechanism is not perfect, if you have many delayed jobs close together the oldest ones will jump the queue, which makes it quite impossible to give any guarantees about job order. Since QNEXT will always return the next job, order is maintained independently of how many delayed jobs you have or how close they are to each other.

What about working with batches of jobs? Here we have the same problem: since the blocking command in Redis can only move one element per call it is not really possible to implement batch support, at least not optimally. With the new module you can just call “QNEXT” with a max batch size and it will return a complete batch of jobs in one call.

Finally, another cool improvement is the code for handling stalled jobs. Like in BullMQ, if a worker works on a job for longer than the lock duration, the worker must either renew the lock or the job will be moved back to the wait list. This process is implemented in Bull using a polling mechanism, which is also implemented in the QueueScheduler class. As mentioned before this class is not needed anymore; now the expiration of the locks is detected automatically using Redis' own key notification system and the jobs are moved back to the wait list when needed. This is done inside the module, no need for external polling anymore.

Performance

The great thing about implementing the queue as a module is that we can maximize the performance. Since all code is compiled with the highly efficient Zig compiler there is no extra overhead to pay for executing Lua scripts, and the code is basically as fast as if the queue was implemented natively in the Redis codebase. In my initial tests I could process 50k jobs/second on my i7 laptop with one single client.

Interoperability

The module is being implemented to be fully compatible with current BullMQ, so regardless of whether you use the NodeJS library or the commands provided by the module you will get the same results. This of course opens the door for using BullMQ from any language/platform as long as it has a Redis client. In time I expect wrappers to be built for the most popular languages that will make the use of the queue more pleasant than just calling the commands manually.

License

I chose the permissive BSD license for this module, so it is free to load on any Redis instances independently of where they are hosted (as long as they support modules).

Future work

Currently my efforts are directed at making the module compatible with all the features that exist in the NodeJS version. For this to be completed, the major thing missing is the rate limiter, which is going to work much more reliably and with better performance than in the current NodeJS version. After that I aim to release version 1.0 and make the module a drop-in replacement for users of the NodeJS library, making your current code just faster and better with minimum effort.

After 1.0 I have planned a lot of really powerful features that are only possible to implement as a module, and that will allow you to solve really complex use cases, but this I will leave for another post.

You are free to start testing the module. The releases are built automatically as new features are added, so just go to the releases page on GitHub and grab your copy.

]]>