Non-blocking Failed Message Handling in Kafka with RetryableTopic in a Spring Boot Application
Sometimes, when we process messages from Kafka topics, errors may occur. For example, consumer services or other infrastructure may be down. We want to make sure we don’t lose any data and try to handle failed messages.
The default Kafka failure-handling behavior is to retry processing the failed messages. This is not always helpful, because some fatal errors cannot be fixed by retrying, and we should not reprocess those messages.
Instead of relying on the default implementation, we can use the @RetryableTopic annotation to configure a more robust strategy for handling failed messages. For example, we can send failed messages to a dead letter topic, limit the number of retries, define timeouts, exclude fatal exceptions from reprocessing, and so on.
In this tutorial, I will show you how to apply @RetryableTopic in a few easy steps in a Spring Boot application.
Let's get started!
First, let's understand the difference between blocking and non-blocking message retries. Let's say you have configured a @Bean in your Kafka configuration that tries to reprocess failed messages multiple times, for example with a FixedBackOff strategy of 3 attempts and a recovery interval of 5 seconds.
With this setup, the consumer keeps reprocessing the failed message in place, and the main topic is blocked while it does so.
If all retries fail, the message is sent to the dead letter topic (DLT). Until then, all other incoming messages are blocked behind the failing one.
This can be dangerous, especially if the retry interval is very long.
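To make the cost of blocking retries concrete, here is a minimal plain-Java sketch, not Spring Kafka code. It assumes a single consumer, zero processing time, and that "3 attempts" means three retries spaced 5 seconds apart. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

public class BlockingRetryDemo {

    /**
     * Simulates one consumer that retries a failing record in place.
     * Returns the virtual time (ms) at which each record is finished,
     * either processed successfully or given up on after all retries.
     */
    public static List<Long> completionTimes(List<Boolean> recordFails,
                                             long intervalMs,
                                             int maxRetries) {
        List<Long> times = new ArrayList<>();
        long clock = 0;
        for (boolean fails : recordFails) {
            if (fails) {
                // Each retry waits for the back-off interval; later records keep waiting.
                clock += intervalMs * maxRetries;
            }
            times.add(clock);
        }
        return times;
    }

    public static void main(String[] args) {
        // The first record always fails, the second one is healthy.
        System.out.println(completionTimes(List.of(true, false), 5000L, 3));
    }
}
```

Under these assumptions, the healthy second record is not finished until 15 seconds have passed, even though nothing is wrong with it.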
Here's how we can rectify the situation using @RetryableTopic:
- The main topic is not blocked, and other messages can be processed.
- Failed messages are sent to retry topics with a back-off timestamp.
- If the failed message cannot be processed, it is passed to the next retry topic.
- If processing fails for all retry topics, the message is forwarded to the DLT.
- DLT messages can be retried by sending them back to the first retry topic.
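The flow above can be sketched in plain Java as the list of topics a record would visit if every attempt fails. This is only an illustration: the -retry-N and -dlt suffixes and the RetryTopicChain class are assumptions made for this sketch, and the actual topic names depend on how the annotation is configured.

```java
import java.util.ArrayList;
import java.util.List;

public class RetryTopicChain {

    /** Topics a record passes through when every delivery attempt fails. */
    public static List<String> path(String mainTopic, int attempts) {
        List<String> topics = new ArrayList<>();
        topics.add(mainTopic);                      // attempt 1 happens on the main topic
        for (int i = 0; i < attempts - 1; i++) {
            topics.add(mainTopic + "-retry-" + i);  // one retry topic per remaining attempt (assumed naming)
        }
        topics.add(mainTopic + "-dlt");             // final stop once attempts are exhausted (assumed naming)
        return topics;
    }

    public static void main(String[] args) {
        System.out.println(path("my-topic", 3));
    }
}
```

Because each hop is a separate topic, the consumer of the main topic can move on to the next record as soon as the failed one has been forwarded.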
We will produce a message using an HTTP GET request and consume it using Kafka. The Kafka listener will use @RetryableTopic.
I have created a skeleton Spring Boot project for this demo via https://start.spring.io/.
I am using Maven as a build tool. We need the following dependencies:
- spring-boot-starter-web enables the creation of a web application.
- spring-kafka is used for Kafka operations.
- lombok eliminates boilerplate code.
To run Kafka locally, let's create the local infrastructure.
- We have two services.
Configure Kafka Properties
- The name of the topic we will listen to.
- The Kafka consumer group name.
- We have configured KafkaConsumerProperties for serialization and deserialization.
- We use port 29092, which we defined for the Kafka bootstrap server.
Add a Kafka Listener
Let’s create a Kafka listener:
- We use the @Component annotation to register the bean in the Spring Boot app.
- We have a handleMessage() method, where we define our Kafka listener and apply @RetryableTopic to it.
- When all retry attempts are exhausted, the message is forwarded to the handleDlt() method, indicated by the @DltHandler annotation. The DLT topic gets a default name derived from the main topic.
- Note that the throw new RuntimeException("Test exception") part is only needed for testing purposes.
Here are some of the key @RetryableTopic properties:
- The attempts property defines how many delivery attempts are made in total. In this case, we would have 4 retries plus 1 attempt on the original topic.
- The app will auto-create retry topics suffixed with an index value, for example my-topic-retry-1. The naming strategy is configurable on the annotation.
- The backoff property instructs the app to retry failed messages after 1 second. We have a multiplier of 2.0, which means that the second attempt will be after 2 seconds, the third after 4 seconds, and so on.
- The exclude property lets us configure which exceptions we don't want to be retried. For example, it is good practice to exclude fatal exceptions, like DeserializationException. For a complete list of unrecoverable failures, check the documentation.
@RetryableTopic offers other powerful options. If you want to see them, go to the documentation.
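As a rough illustration of the back-off settings described above, the following plain-Java sketch computes the delay before each retry for a 1-second initial delay and a 2.0 multiplier. The BackoffSchedule class is hypothetical. It treats attempts as the total number of deliveries (the original plus the retries) and ignores any maximum-delay cap the framework may apply.

```java
import java.util.ArrayList;
import java.util.List;

public class BackoffSchedule {

    /** Delay in milliseconds before each retry, growing by the multiplier every time. */
    public static List<Long> retryDelaysMs(int attempts, long delayMs, double multiplier) {
        List<Long> delays = new ArrayList<>();
        double current = delayMs;
        // The first attempt is the original delivery, so there are attempts - 1 retries.
        for (int retry = 1; retry < attempts; retry++) {
            delays.add(Math.round(current));
            current *= multiplier;
        }
        return delays;
    }

    public static void main(String[] args) {
        // 5 attempts = 1 original delivery + 4 retries
        System.out.println(retryDelaysMs(5, 1000L, 2.0)); // prints [1000, 2000, 4000, 8000]
    }
}
```

With 5 attempts, the four retries would wait 1, 2, 4, and 8 seconds, which matches the doubling pattern described for the multiplier.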
Add a Rest Controller
For simplicity, let's add a RestController to produce a message to a topic:
The produceMessage() method will send a message to our Kafka topic.
kafkaTemplate is auto-configured by default. Of course, if we need a custom implementation, we can configure our own bean.
Build the Main App
The main app looks like this:
Test the Application
Now, it’s test time!
1. Start by running the local infrastructure.
2. Run the main class.
3. Send a test message through the controller.
You should see in the console of the app that the listener receives the message. Since we throw a RuntimeException, the message will be forwarded to the retry topics and finally to the DLT.
The log looks like this:
Excellent! Check the timestamps in the log to see that the retry attempts happen as configured.
Additionally, DLT works as expected:
In this tutorial, you learned how to use the @RetryableTopic annotation to implement non-blocking failed message handling. We have seen some example configuration properties.
This annotation gives us a robust solution for error handling. However, keep in mind that currently, it comes with some limitations as per the official docs:
By using this strategy you lose Kafka’s ordering guarantee for that topic.
At this time this functionality does not support class level @KafkaListener annotations.
You can find the link to the complete source code of this demo in the References section below.
I hope you learned something new from this post. If you liked this tutorial, you may also like my other Kafka-related articles:
Thanks for reading, and happy coding!