Non-blocking Failed Message Handling in Kafka

Non-blocking Failed Message Handling in Kafka with @RetryableTopic in a Spring Boot Application


When we process messages from Kafka topics, errors can occur. For example, a consumer's downstream services or other infrastructure may be down. We want to make sure we don’t lose any data, so failed messages need to be handled deliberately.

By default, Spring Kafka retries a failed message in place, immediately and a fixed number of times, before giving up. This is not always what we want: some errors are fatal and will fail on every retry, so we should not reprocess them at all.

Instead of relying on the default behavior, we can use the @RetryableTopic annotation to configure a more robust strategy for handling failed messages. For example, we can send failed messages to a dead letter topic (DLT), limit the number of retries, define timeouts, exclude fatal exceptions from reprocessing, and so on.

In this tutorial, I will show you how to apply @RetryableTopic in a Spring Boot application in a few easy steps.

Let’s get started!

First, let’s understand the difference between blocking and non-blocking message retries. Say you have configured a @Bean in your Kafka configuration that tries to reprocess failed messages several times, as in the following snippet:

FixedBackoff Strategy for Kafka Listener Factory Bean

The consumer retries the failed message in place, and the main topic is blocked while it does so. Here we have a FixedBackOff strategy with 3 attempts and a 5-second interval between them.

If all retries fail, the message is sent to the dead letter topic (DLT). Until then, the messages behind it are not processed.

This can be dangerous, especially if the retry interval is very long.
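To put a number on that cost, here is a small plain-Java model of blocking retries on a single partition. It is not Spring Kafka code: it assumes the 3 attempts are three retries after the first delivery, spaced 5 seconds apart (as with FixedBackOff(5000L, 3L)), and that successful processing takes no time. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of BLOCKING retries on one partition (plain Java, no Spring).
// Assumes 3 retries after the first delivery, 5 s apart, like FixedBackOff(5000L, 3L),
// and that successful processing takes no time.
final class BlockingRetrySimulation {

    static final long INTERVAL_MS = 5_000; // wait between retries
    static final int MAX_RETRIES = 3;      // retries after the first delivery

    /**
     * Returns, for each record in partition order, the simulated time (ms) at which
     * the consumer is done with it. A record that always fails keeps the consumer
     * busy for MAX_RETRIES * INTERVAL_MS before it is handed to the DLT, and every
     * record behind it has to wait.
     */
    static List<Long> completionTimes(List<Boolean> alwaysFails) {
        List<Long> done = new ArrayList<>();
        long clock = 0;
        for (boolean poison : alwaysFails) {
            if (poison) {
                clock += MAX_RETRIES * INTERVAL_MS; // consumer waits between retries
            }
            done.add(clock);
        }
        return done;
    }

    public static void main(String[] args) {
        // Record 0 is healthy, record 1 always fails, record 2 is healthy.
        System.out.println(completionTimes(List.of(false, true, false))); // [0, 15000, 15000]
    }
}
```

Record 2 is perfectly healthy, yet it is only processed 15 seconds after it arrives; two failing records in a row double that wait.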

Here’s how @RetryableTopic improves the situation:

  • The main topic is not blocked, and other messages can be processed.
  • Failed messages are sent to retry topics with a back-off timestamp.
  • If processing fails again, the message is passed to the next retry topic.
  • If processing fails for all retry topics, the message is forwarded to the DLT.
  • DLT messages can be retried by sending them back to the first retry topic.
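The flow above can be traced in plain Java. This sketch is not Spring Kafka code: it only lists the topics a single record visits, assuming retry topics named with an index suffix (my-topic-retry-0, my-topic-retry-1, ...) and a DLT named my-topic-dlt. The exact names Spring Kafka generates depend on the configured suffixing strategy, and the class and method names here are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntPredicate;

// Hypothetical model of NON-BLOCKING retries (plain Java, no Spring).
// A failed record leaves the main topic and hops through retry topics,
// so the main topic keeps flowing. Topic names are an assumption.
final class NonBlockingRetryRoute {

    /**
     * Returns the topics a record passes through, in order.
     *
     * @param mainTopic   the original topic, e.g. "my-topic"
     * @param retryTopics how many retry topics exist
     * @param succeedsOn  given the 0-based attempt number, true if processing succeeds
     */
    static List<String> route(String mainTopic, int retryTopics, IntPredicate succeedsOn) {
        List<String> path = new ArrayList<>();
        path.add(mainTopic);
        for (int attempt = 0; attempt <= retryTopics; attempt++) {
            if (succeedsOn.test(attempt)) {
                return path; // processed successfully, no more hops
            }
            if (attempt < retryTopics) {
                path.add(mainTopic + "-retry-" + attempt); // republish to the next retry topic
            }
        }
        path.add(mainTopic + "-dlt"); // every attempt failed: forward to the DLT
        return path;
    }

    public static void main(String[] args) {
        // A record that always fails, with 4 retry topics.
        System.out.println(route("my-topic", 4, attempt -> false));
        // A record that succeeds on its second attempt (in the first retry topic).
        System.out.println(route("my-topic", 4, attempt -> attempt == 1));
    }
}
```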

We will produce a message through an HTTP GET request and consume it with a Kafka listener that uses the @RetryableTopic annotation.

I have created a skeleton Spring Boot project for this demo via https://start.spring.io/.

Add Dependencies

I am using Maven as the build tool. We need the following dependencies in pom.xml:

project dependencies
  • spring-boot-starter-web lets us build a web application.
  • spring-kafka is used for Kafka operations.
  • lombok eliminates boilerplate code.

Create Infrastructure

To run Kafka locally, let’s create a docker-compose.yml file:

Docker Compose file for Kafka
  • We have two services: zookeeper, which is needed by Kafka, and kafka itself.

Configure Kafka Properties

Let’s configure application.yml:

application.yml properties
  • The topic we listen to is called my-topic.
  • The Kafka consumer group is named my-group.
  • We configure the Kafka producer and consumer properties for serialization and deserialization.
  • For the Kafka bootstrap server we use port 29092, which we defined in the docker-compose.yml file.
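For reference, the same settings can be written as plain Kafka client properties, roughly what Spring Boot builds from application.yml. This is a sketch under assumptions: the host name localhost and the String serializers and deserializers are guesses, since the original application.yml is not reproduced here, and the class name is hypothetical. The property keys themselves are standard Kafka client configuration names.

```java
import java.util.Properties;

// Hypothetical plain-Kafka equivalent of the application.yml settings above.
// Assumed: broker reachable at localhost:29092 (the port from docker-compose.yml)
// and String keys/values.
final class KafkaClientSettings {

    static final String BOOTSTRAP_SERVERS = "localhost:29092";
    static final String TOPIC = "my-topic";    // topic the listener subscribes to
    static final String GROUP_ID = "my-group"; // consumer group

    static Properties consumer() {
        Properties p = new Properties();
        p.setProperty("bootstrap.servers", BOOTSTRAP_SERVERS);
        p.setProperty("group.id", GROUP_ID);
        p.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        p.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return p;
    }

    static Properties producer() {
        Properties p = new Properties();
        p.setProperty("bootstrap.servers", BOOTSTRAP_SERVERS);
        p.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        p.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return p;
    }

    public static void main(String[] args) {
        System.out.println("consumer: " + consumer());
        System.out.println("producer: " + producer());
    }
}
```

With kafka-clients on the classpath, these Properties objects can be passed to the KafkaConsumer and KafkaProducer constructors; in the Spring Boot app, application.yml takes care of this for us.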

Add a Kafka Listener

Let’s create a Kafka listener:

  • We use the @Component annotation to register the bean in the Spring Boot app.
  • The handleMessage() method defines our Kafka listener and is annotated with @RetryableTopic.
  • When all retry attempts are exhausted, the message is forwarded to the handleDlt() method, marked with the @DltHandler annotation. By default, the DLT is named my-topic-dlt.
  • Note that the throw new RuntimeException("Test exception") line is there only for testing purposes: it makes every message fail.

Here are some of the @RetryableTopic properties we use:

  • The attempts property defines the total number of processing attempts, including the original delivery. In this case, we get 4 retries plus 1 attempt on the original topic.
  • The app auto-creates the retry topics, suffixing each topic name with the retry index value, for example my-topic-retry-1. The naming strategy is set by the topicSuffixingStrategy property.
  • The backoff property makes the app wait 1 second before the first retry. With a multiplier of 2.0, the second retry happens after 2 seconds, the third after 4 seconds, and so on.
  • The exclude property lists exceptions that should not be retried; a message that fails with one of them goes straight to the DLT. It is good practice to exclude fatal exceptions such as DeserializationException, which would fail again on every retry. For a complete list of non-recoverable failures, check the documentation.
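The effect of attempts, backoff and exclude can be checked with a few lines of plain Java. This is a hypothetical model, not Spring Kafka’s implementation: it assumes attempts is 5 (matching the 4 retries plus 1 described above), computes the delay before each retry as delay * multiplier^i, ignores any maximum-delay cap, and treats an exception as non-retryable if it is an instance of an excluded class. IllegalArgumentException stands in for a fatal exception such as DeserializationException so that the example needs no Spring dependency.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical model of the @RetryableTopic settings described above (no Spring).
final class RetryPlan {

    /**
     * Delay (ms) before each retry. "attempts" counts the original delivery,
     * so there are attempts - 1 retries: delay, delay*multiplier, delay*multiplier^2, ...
     * Any maximum-delay cap is ignored here.
     */
    static List<Long> retryDelays(int attempts, long delayMs, double multiplier) {
        List<Long> delays = new ArrayList<>();
        double current = delayMs;
        for (int retry = 0; retry < attempts - 1; retry++) {
            delays.add(Math.round(current));
            current *= multiplier;
        }
        return delays;
    }

    /** An exception is retried unless it is an instance of an excluded class (subclasses included). */
    static boolean isRetryable(Throwable error, Set<Class<? extends Throwable>> excluded) {
        return excluded.stream().noneMatch(type -> type.isInstance(error));
    }

    public static void main(String[] args) {
        // 4 retries plus the original attempt, 1 s initial delay, multiplier 2.0
        System.out.println(retryDelays(5, 1_000, 2.0)); // [1000, 2000, 4000, 8000]

        // Stand-in "fatal" exception type (assumption, see lead-in).
        Set<Class<? extends Throwable>> fatal = Set.of(IllegalArgumentException.class);
        System.out.println(isRetryable(new RuntimeException("Test exception"), fatal));   // true
        System.out.println(isRetryable(new NumberFormatException("bad payload"), fatal)); // false
    }
}
```

NumberFormatException is a subclass of IllegalArgumentException, so excluding the parent also excludes it.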

@RetryableTopic offers other powerful options; see the documentation for the full list.

Add a REST Controller

For simplicity, let’s add a RestController to produce a message to the topic:

RestController to generate message on Kafka topic
  • The produceMessage() method sends a message to our Kafka topic.
  • kafkaTemplate is auto-configured by Spring Boot. If we need a custom implementation, we can define our own bean.

Build the Main App

The main app looks like this:

main application

Test the Application

Now, it’s test time!

1. Start by running the local infrastructure:
docker-compose up

2. Run the main class, KafkaErrorHandlingApplication.java.

3. Send a test message through the controller. For example:

GET http://localhost:8090/produce/hello

In the app console, you should see the listener receive the message. Since we throw a RuntimeException, the message is forwarded to the retry topics and finally to the DLT.
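If you prefer to send the test request from code rather than a browser or curl, the JDK’s built-in java.net.http.HttpClient (Java 11+) is enough. This is a small sketch: it assumes the app is running on port 8090 as in step 3, and the class and method names are hypothetical.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Sends the test message from step 3 using only the JDK.
// Assumes the demo app is listening on localhost:8090.
final class SendTestMessage {

    /** Builds GET <baseUrl>/produce/<message>; the message is used as-is, so keep it URL-safe. */
    static HttpRequest produceRequest(String baseUrl, String message) {
        return HttpRequest.newBuilder(URI.create(baseUrl + "/produce/" + message))
                .GET()
                .build();
    }

    public static void main(String[] args) throws IOException, InterruptedException {
        HttpClient client = HttpClient.newHttpClient();
        HttpResponse<String> response = client.send(
                produceRequest("http://localhost:8090", "hello"),
                HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode() + " " + response.body());
    }
}
```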

The log looks like this:

Spring Boot app console log

Excellent! Check the timestamps in the screenshot: the retry attempts happen as configured by the back-off policy.

The DLT also works as expected:

Message handled by DLT

In this tutorial, you learned how to use the @RetryableTopic annotation to implement non-blocking failed message handling, and we looked at some of its configuration properties.

This annotation gives us a robust solution for error handling. However, keep in mind that, according to the official docs, it currently comes with some limitations:

By using this strategy you lose Kafka’s ordering guarantee for that topic.

At this time this functionality does not support class level @KafkaListener Annotation.
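To see why ordering is lost, consider three records produced to one partition in the order m1, m2, m3, where m1 fails once and succeeds when it is re-consumed from the first retry topic after a 1-second back-off. The plain-Java model below (hypothetical names, no Spring) assumes each record takes 1 ms to process in the main topic and returns the order in which records are successfully processed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model of how non-blocking retries reorder records (no Spring).
final class OrderingLoss {

    /**
     * @param records      record ids in the order they were produced to the partition
     * @param retryDelayMs for records that fail once: back-off before they are retried
     * @return record ids in the order they are successfully processed
     */
    static List<String> processingOrder(List<String> records, Map<String, Long> retryDelayMs) {
        TreeMap<Long, List<String>> byFinishTime = new TreeMap<>(); // sorted by finish time
        long clock = 0;
        for (String id : records) {
            clock += 1; // 1 ms per record in the main topic; the main topic never waits
            long finish = clock + retryDelayMs.getOrDefault(id, 0L);
            byFinishTime.computeIfAbsent(finish, t -> new ArrayList<>()).add(id);
        }
        List<String> order = new ArrayList<>();
        byFinishTime.values().forEach(order::addAll);
        return order;
    }

    public static void main(String[] args) {
        System.out.println(processingOrder(List.of("m1", "m2", "m3"), Map.of("m1", 1_000L)));
        // [m2, m3, m1]
    }
}
```

With blocking retries, m2 and m3 would wait behind m1 and the original order would be preserved; that is the trade-off for keeping the main topic flowing.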

You can find the link to the complete source code of this demo in the Reference section below.

I hope you learned something new from this post. If you liked this tutorial, you may also like my other Kafka-related articles:

Thanks for reading, and happy coding!
