Build a Distributed Task Scheduler Using RabbitMQ and Redis by Sridhar Subramaniam | October, 2022

Delay job execution using RabbitMQ deadLetterExchange

Are you interested in creating a task scheduler using RabbitMQ?

You may wonder why anyone would build a task scheduler on RabbitMQ, given that it is a message broker and has no obvious reason to behave as a scheduler.

Well, a year ago, I was working on a hobby project where I wanted to use RabbitMQ, and I needed to execute a piece of code after a certain amount of time. We can call it delayed task execution.

This is where I got curious, wondering if it’s possible to delay message consumption. In other words, suppose a message is sent to the queue at time x and I want to consume it at time x+y, where y is configurable per message/task.

I was able to achieve this by using deadLetterExchange, a RabbitMQ feature. With it, I could specify how long a task or message should wait before it is consumed.
But that is not yet a task scheduler; it is just delayed task execution. A task scheduler must be able to schedule and cancel tasks. This is where Redis comes into play to make it a task scheduler.

I hope the above helps in setting the context. Let’s dive in.

This post does not introduce RabbitMQ or Redis. It is assumed that you have at least a basic understanding of both. The sample project discussed here is written in NodeJS. Knowledge of NodeJS would be helpful, but it is not required.

RabbitMQ delivers messages to the corresponding queue as soon as they become available. There is no direct way to tell RabbitMQ to hold a message somewhere and deliver it to the intended queue after time x.
Let us see how we can achieve this using deadLetterExchange.

Dead Letter Exchange

Messages from a queue can be dead-lettered (i.e., republished to an exchange) when any of the following events occur:

  • The message is negatively acknowledged by a consumer.
  • The message expires due to its per-message TTL.
  • The message is dropped because its queue exceeded a length limit.

We are going to use the second option (i.e., per-message TTL) to trigger the deadLetterExchange. Let’s look at it in detail.

TTL stands for Time To Live.

Let’s set a TTL on each of our messages and send them to a queue. Let’s call it the intermediate queue. When declaring the intermediate queue, we set an option so that when messages in this queue expire, they are delivered to another specified exchange. Let’s call it the final exchange.

The message TTL is the time by which we want to delay the task. The final exchange is where our consumers receive messages from, through a queue bound to it.

So once the message TTL elapses, our consumer receives the message.

Now the question is: how do we let the message expire? This is easy. We are sending the message to the intermediate queue, right? Let’s not let anyone consume from it. If a message in the intermediate queue is not consumed within its TTL, it is dead-lettered and delivered to our consumer, who is happily consuming messages from the queue of the final exchange.

By setting the TTL to the delay we need and sending the message to an intermediate queue that has no consumer, we get delayed task execution on RabbitMQ.

Below is the UML diagram for delayed task execution using a RabbitMQ dead letter exchange.

RabbitMQ delayed task execution using dead letter exchange
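
To make the timing concrete, here is a toy in-memory model of the intermediate queue. It is not RabbitMQ code, and the names IntermediateQueueModel, publish, and deadLetterExpired are made up for this sketch. It also mirrors one detail from the RabbitMQ TTL documentation that is worth knowing: with per-message TTL, an expired message is only dead-lettered once it reaches the head of the queue, so a short-TTL message queued behind a long-TTL one waits for it.

```typescript
// Toy in-memory model of a consumer-less intermediate queue.
// This is not RabbitMQ code; it only mimics when messages would be forwarded.
interface HeldMessage {
  body: string;
  expiresAt: number; // absolute time (ms) at which the per-message TTL runs out
}

class IntermediateQueueModel {
  private readonly held: HeldMessage[] = [];

  // Publishing at time `now` with a TTL is what creates the delay.
  publish(body: string, ttlMs: number, now: number): void {
    this.held.push({ body, expiresAt: now + ttlMs });
  }

  // Returns the bodies that would be dead-lettered to the final exchange at
  // time `now`. Only the head of the queue is examined on each step.
  deadLetterExpired(now: number): string[] {
    const forwarded: string[] = [];
    for (;;) {
      const head = this.held[0];
      if (head === undefined || head.expiresAt > now) break;
      this.held.shift();
      forwarded.push(head.body);
    }
    return forwarded;
  }
}

const model = new IntermediateQueueModel();
model.publish("send-sms", 5000, 0); // head of the queue, 5 s TTL
model.publish("send-greetings", 1000, 0); // 1 s TTL, but queued behind the head
const atOneSecond = model.deadLetterExpired(1000);
const atFiveSeconds = model.deadLetterExpired(5000);
```

In this model nothing is forwarded after one second, and both messages are forwarded together after five. If tasks of one type can carry very different delays, this ordering effect is something to keep in mind.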

But delayed task execution is not a task scheduler, right? Let us see how we can create a task scheduler that can both schedule and cancel tasks using RabbitMQ and Redis.

Role of Redis

Redis is used to track the validity of a scheduled task. Using this, we can set any scheduled task as invalid if necessary, and when the time comes to execute the task, we can check whether the task is valid to be executed.

Since we use Redis only for storing task validity information, it can be replaced with any other data store.

Architecture

Task scheduler architecture

The entities we will use to build this task scheduler are as follows.

  • Task → Task definition.
  • Producer → Base class that provides the option to send delayed messages.
  • Consumer → Base class for consuming messages from the final queue.
  • TaskRepository → Repository class to check task validity status.
  • Task Scheduler → Class that provides the option to schedule and invalidate tasks.
  • Task Consumer → Class that consumes tasks from the queue of a specified task type.

Task

class Task {
  public taskId: string; // unique ID of the task
  public taskType: string; // queue name
  public ttlInSeconds: number; // delay before the task executes
  public payload: string; // JSON string describing the task
}

  • taskId → Unique ID to identify this task.
  • taskType → The type of task. It is used as a queue name so that both the producer and consumer can infer the name from the task type.
  • ttlInSeconds → Time delay after which we want this task to execute.
  • payload → A JSON string describing the task; taskType comes in handy for parsing this JSON according to the task type.

taskType plays an important role. It acts as a queue name, so when a task scheduler wants to schedule a task, it can use taskType as a queue name, and anyone who wants to consume a particular task type can consume it from that queue.

The following are some examples of task types: send-sms, send-greetings, send-offer-notification, etc.
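
As an illustration of how taskType can drive payload parsing, here is a small sketch. The payload shapes (phoneNumber, text, customerName) and the parsePayload function are hypothetical; the article does not define what each task type carries.

```typescript
// Hypothetical payload shapes for two of the example task types.
interface SmsPayload {
  phoneNumber: string;
  text: string;
}
interface GreetingPayload {
  customerName: string;
}

type ParsedTask =
  | { taskType: "send-sms"; data: SmsPayload }
  | { taskType: "send-greetings"; data: GreetingPayload };

// Parses the JSON payload string according to the task type.
function parsePayload(taskType: string, payload: string): ParsedTask {
  const raw: unknown = JSON.parse(payload);
  if (typeof raw !== "object" || raw === null) {
    throw new Error("payload must be a JSON object");
  }
  const fields = raw as Record<string, unknown>;
  switch (taskType) {
    case "send-sms": {
      const phoneNumber = fields["phoneNumber"];
      const text = fields["text"];
      if (typeof phoneNumber !== "string" || typeof text !== "string") {
        throw new Error("invalid send-sms payload");
      }
      return { taskType: "send-sms", data: { phoneNumber, text } };
    }
    case "send-greetings": {
      const customerName = fields["customerName"];
      if (typeof customerName !== "string") {
        throw new Error("invalid send-greetings payload");
      }
      return { taskType: "send-greetings", data: { customerName } };
    }
    default:
      throw new Error(`unknown task type: ${taskType}`);
  }
}
```

Validating the fields at this boundary keeps malformed tasks from reaching the code that executes them.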

Producer

producer.ts

As we discussed earlier, we need two queues to achieve this delayed task execution.

  1. Intermediate queue → Holds the task until its TTL expires, then forwards it to the final exchange.
  2. Final queue → Bound to the final exchange. Our task consumer listens to this queue.

Both the queue names and the associated exchange name are derived from taskType, so the scheduler and the consumer agree on them without any further coordination.

From the above code, we can see that the intermediate queue is declared with the final exchange as its dead letter exchange.

this.assertQueue(
  INTERMEDIATE_QUEUE, { deadLetterExchange: FINAL_EXCHANGE }
);

Once the task’s TTL has elapsed, the task is forwarded to the final exchange, and the final exchange routes it to the final queue.

sendToQueue(INTERMEDIATE_QUEUE, Buffer.from(data), {
  expiration: delayInMills,
});
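
Putting the two fragments together, a minimal producer sketch could look like the following. It is written against a small ChannelLike interface whose method names and option keys mirror the amqplib channel API, so it runs without a broker. The naming scheme in namesFor, the fanout exchange type, and the sendDelayed function are assumptions for illustration, not the project's actual code.

```typescript
// Structural subset of a channel. Method names and option keys mirror the
// amqplib channel API; the interface itself is defined only for this sketch.
interface ChannelLike {
  assertExchange(exchange: string, type: string, options?: { durable?: boolean }): Promise<unknown>;
  assertQueue(queue: string, options?: { durable?: boolean; deadLetterExchange?: string }): Promise<unknown>;
  bindQueue(queue: string, source: string, pattern: string): Promise<unknown>;
  sendToQueue(queue: string, content: Buffer, options?: { expiration?: string | number }): boolean;
}

// Hypothetical naming scheme: every name is derived from the task type.
function namesFor(taskType: string) {
  return {
    intermediateQueue: `${taskType}.intermediate.queue`,
    finalExchange: `${taskType}.final.exchange`,
    finalQueue: `${taskType}.final.queue`,
  };
}

async function sendDelayed(
  channel: ChannelLike,
  taskType: string,
  data: string,
  delayInMills: number,
): Promise<boolean> {
  const names = namesFor(taskType);

  // Final side. A fanout exchange (an assumption here) ignores routing keys,
  // so a dead-lettered message reaches the final queue whatever key it carries.
  await channel.assertExchange(names.finalExchange, "fanout", { durable: true });
  await channel.assertQueue(names.finalQueue, { durable: true });
  await channel.bindQueue(names.finalQueue, names.finalExchange, "");

  // Intermediate side. Nobody consumes from this queue, so every message
  // stays until its TTL expires and is then dead-lettered to the final exchange.
  await channel.assertQueue(names.intermediateQueue, {
    durable: true,
    deadLetterExchange: names.finalExchange,
  });

  // The per-message TTL is given in milliseconds.
  return channel.sendToQueue(names.intermediateQueue, Buffer.from(data), {
    expiration: String(delayInMills),
  });
}
```

Declaring both sides from the producer means a task can be scheduled before any consumer has started, because the final queue already exists to receive it.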

Consumer

A consumer consumes messages from a specific queue (i.e., the queue named after the taskType).

Task Repository

It provides options to create, delete and validate tasks.
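
A minimal sketch of such a repository is shown below. It depends only on a small KeyValueStore interface whose method names follow the Redis SET, DEL, and EXISTS commands, so an in-memory stand-in can replace Redis here. Apart from isTaskValid, the names (createTask, deleteTask, the task: key prefix, InMemoryStore) are assumptions for illustration.

```typescript
// Minimal key-value operations the repository needs. The method names follow
// the Redis SET, DEL and EXISTS commands; this is not a specific client's API.
interface KeyValueStore {
  set(key: string, value: string): Promise<unknown>;
  del(key: string): Promise<number>;
  exists(key: string): Promise<number>;
}

class TaskRepository {
  private readonly store: KeyValueStore;

  constructor(store: KeyValueStore) {
    this.store = store;
  }

  // Hypothetical key format.
  private key(taskId: string): string {
    return `task:${taskId}`;
  }

  // A task is valid for as long as its key exists.
  async createTask(taskId: string): Promise<void> {
    await this.store.set(this.key(taskId), "valid");
  }

  // Deleting the key is what invalidates the task.
  async deleteTask(taskId: string): Promise<void> {
    await this.store.del(this.key(taskId));
  }

  async isTaskValid(taskId: string): Promise<boolean> {
    return (await this.store.exists(this.key(taskId))) === 1;
  }
}

// In-memory stand-in for Redis, useful for trying the repository locally.
class InMemoryStore implements KeyValueStore {
  private readonly data = new Map<string, string>();

  async set(key: string, value: string): Promise<string> {
    this.data.set(key, value);
    return "OK";
  }

  async del(key: string): Promise<number> {
    return this.data.delete(key) ? 1 : 0;
  }

  async exists(key: string): Promise<number> {
    return this.data.has(key) ? 1 : 0;
  }
}
```

Because the repository only needs these three operations, swapping Redis for another data store means providing a different KeyValueStore implementation.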

Task Scheduler

The Task Scheduler creates a task entry in Redis when scheduling a task.
When we no longer want the task to be executed, the entry can be deleted to mark the task as invalid. scheduleTask returns a taskId, which can be used to invalidate the task if necessary.
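
The scheduling flow can be sketched as follows. The two dependencies are expressed as small interfaces so the example runs on its own. Apart from scheduleTask, the names (TaskStore, DelayedSender, invalidateTask, the injected newId generator, and the message layout) are assumptions, not the project's actual API.

```typescript
// Hypothetical dependencies of the scheduler.
interface TaskStore {
  createTask(taskId: string): Promise<void>;
  deleteTask(taskId: string): Promise<void>;
}
interface DelayedSender {
  sendDelayed(taskType: string, data: string, delayInMills: number): Promise<unknown>;
}

class TaskScheduler {
  private readonly store: TaskStore;
  private readonly sender: DelayedSender;
  private readonly newId: () => string;

  constructor(store: TaskStore, sender: DelayedSender, newId: () => string) {
    this.store = store;
    this.sender = sender;
    this.newId = newId;
  }

  // Records the task as valid, publishes it with a delay, and returns its ID.
  async scheduleTask(taskType: string, ttlInSeconds: number, payload: string): Promise<string> {
    const taskId = this.newId();
    // Create the entry first so the consumer never sees a task without one.
    await this.store.createTask(taskId);
    const message = JSON.stringify({ taskId, taskType, ttlInSeconds, payload });
    await this.sender.sendDelayed(taskType, message, ttlInSeconds * 1000);
    return taskId;
  }

  // The message stays in RabbitMQ; only the validity entry is removed.
  async invalidateTask(taskId: string): Promise<void> {
    await this.store.deleteTask(taskId);
  }
}
```

Note that invalidation never touches the queue. The delayed message is still delivered later, and it is up to the consumer to discard it.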

Task Consumer

The Task Consumer listens for a specific taskType and executes a task only if it is still valid.
Even after we invalidate a task by deleting it, the message is still delivered to the consumer at the scheduled time. It is therefore the Task Consumer’s responsibility to check the validity of the task before executing it, using the isTaskValid function from the Task Repository.
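
The validity check can be sketched as a small handler. Only isTaskValid comes from the project as described above. The handleTask function, the TaskMessage layout, and the returned status strings are assumptions for illustration.

```typescript
// Hypothetical shape of the message body received from the final queue.
interface TaskMessage {
  taskId: string;
  taskType: string;
  payload: string;
}

interface ValidityChecker {
  isTaskValid(taskId: string): Promise<boolean>;
}

// Executes the task only if it has not been invalidated in the meantime.
async function handleTask(
  rawMessage: string,
  repository: ValidityChecker,
  execute: (task: TaskMessage) => Promise<void>,
): Promise<"executed" | "skipped"> {
  const task = JSON.parse(rawMessage) as TaskMessage;

  // The message always arrives at the scheduled time, even for a cancelled
  // task, so the check has to happen here on the consumer side.
  if (!(await repository.isTaskValid(task.taskId))) {
    return "skipped";
  }

  await execute(task);
  return "executed";
}
```

In a real consumer the message would be acknowledged in both cases, since a skipped task has been handled as far as the queue is concerned.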

The demo consists of two consumers and two producers.
Treat consumers 1 and 2 as email services, each a microservice responsible for sending greetings and offer notifications to customers.
greeting and offer-notification are two different task types, produced by the Greeter-Service and Offer-Notification-Service producers respectively.

The demo shows that Email-Service-1 (i.e., consumer 1) processes both greeting and offer-notification tasks, whereas Email-Service-2 (i.e., consumer 2) processes only offer-notification tasks.

For brevity, task invalidation is not shown in the demo, but it is explained in the project readme file.

I tried my best to animate and highlight the log messages to show the event flow between the scheduler and the consumer. Each log message includes a timestamp, which can be used to confirm that the consumer receives the task at the expected scheduled time. I have highlighted the timestamp on both the scheduler and the consumer whenever a consumer receives a task.

demo
