Error Handling in Kafka Consumer
In my previous blog, I explained how to tackle unexpected situations in the Kafka Producer. Please refer to: Error Handling in Kafka Producer. This blog explains how to design error handling in the Kafka Consumer in order to build a consistent streaming integration.
Recap for Kafka Consumer
The Kafka consumer is an API that allows you to fetch data from Kafka. The consumer reads data from a Kafka topic in order to process the messages and push them to a sink system. Please see how the consumer reads data from the messaging layer:
As you see, there are three components used in the flow:
- Messaging Layer (Kafka Broker)
- Kafka Consumer
- Sink system
The following code shows how to consume data from Kafka:
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "test");
props.setProperty("enable.auto.commit", "true");   // offsets are committed automatically
props.setProperty("auto.commit.interval.ms", "1000");
props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
while (true) {
    // Fetch a batch of records and process each one
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records)
        System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
Ref : KafkaConsumer
Let’s deep dive into how to tackle unexpected situations in the Kafka Consumer.
ERROR HANDLING IN CONSUMER
Error handling is a very important topic when it comes to data accuracy. Let’s list all the problems you can face while consuming data from the Kafka broker:
- Invalid (bad) data is received from Kafka topic
- Kafka broker is not available while consuming your data
- Kafka Consumer could be interrupted
- Kafka Topic couldn’t be found
- Any exception can happen during data processing
- Sink system is not available
For each item, I will deep dive into the problem and explain how you can design your implementation.
Invalid (bad) data is received from Kafka topic
The data arrives in a format or with content different from what the consumer expects. In that case, the recommendation is to skip the record, but it is better to also log it to a specific file so it can be reported afterwards, as sketched below.
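As a minimal sketch, reusing the consumer configured in the snippet above: validateAndTransform(), insertIntoDb() and log are hypothetical placeholders for your own processing, sink and logging logic, not part of the Kafka API.

// Sketch: skip invalid records, but log them so they can be reported afterwards.
// validateAndTransform(), insertIntoDb() and log are hypothetical placeholders.
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        try {
            String payload = validateAndTransform(record.value()); // throws if format/content is invalid
            insertIntoDb(payload);
        } catch (Exception e) {
            // Skip the bad record, but keep a trace of it (file, dead-letter topic, ...)
            log.warn("Skipping invalid record: topic={}, offset={}, value={}",
                    record.topic(), record.offset(), record.value(), e);
        }
    }
}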
Consumer Interrupted
The consumer fetches data from the Kafka broker and holds it in memory while processing. If the consumer is interrupted, the data that only lives in consumer memory can be lost. If the consumer then skips those records and does not re-send the same data to the sink system, there will be a data inconsistency.
In order to track exactly which part of the data has been delivered to the sink (e.g., a database), you can control the Kafka consumer’s “enable.auto.commit” setting.
Kafka provides a commit API to control which offsets are marked as processed. When you set “enable.auto.commit” to false, the consumer commits the offsets only after the block has been successfully processed.
Steps in the workflow
- Number 1: Data is fetched from the Kafka topic by the consumer
- Number 2: Data is processed and sent to the database to be inserted
- Number 3: Data is successfully inserted and the flow seems to be working as expected
- Number 4: The consumer commits the offset and fetches the next event block
The following code disables the auto-commit feature and commits the offset manually:
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "false"); // disable auto-commit; offsets are committed manually below
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));

final int minBatchSize = 200;
List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        buffer.add(record);
    }
    if (buffer.size() >= minBatchSize) {
        insertIntoDb(buffer);  // 1) write the batch to the sink first
        consumer.commitSync(); // 2) commit the offsets only after the insert succeeded
        buffer.clear();
    }
}
Ref : KafkaConsumer (Manual Offset Control)
Kafka broker is not available while consuming your data
The consumer tries to consume data from the Kafka topic, but the connection to the broker cannot be established because the broker is not available. In that case, the consumer should retry consuming the data at certain time intervals.
Another important point is that your monitoring system should send an alert so that the Kafka broker can be brought back. That way, the DevOps team can restart the Kafka broker or find the root cause to keep it available.
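A minimal sketch of this retry-and-alert idea, reusing the consumer from above: listTopics(Duration) throws a TimeoutException when the cluster metadata cannot be fetched within the timeout, and sendAlert() is a hypothetical hook into your monitoring system.

import java.time.Duration;
import org.apache.kafka.common.errors.TimeoutException;

// Sketch: block until the broker is reachable again, alerting after repeated failures.
// sendAlert() is a hypothetical hook into your monitoring/alerting system.
void waitForBroker(KafkaConsumer<String, String> consumer) throws InterruptedException {
    int attempts = 0;
    while (true) {
        try {
            consumer.listTopics(Duration.ofSeconds(5)); // fails if the metadata cannot be fetched in time
            return; // broker is available again, resume consuming
        } catch (TimeoutException e) {
            attempts++;
            if (attempts % 10 == 0) {
                sendAlert("Kafka broker is not reachable after " + attempts + " attempts");
            }
            Thread.sleep(10_000); // retry within some time interval
        }
    }
}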
Kafka Topic couldn’t be found
Kafka records are stored in topics. If the topic is not found, the consumer gets an “Unknown topic or partition” error. The use case is very similar to the previous item, so you can take a similar action in terms of retrying and sending an alert.
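A rough sketch under the same assumptions (hypothetical sendAlert(), consumer from above): check whether the topic is present in the cluster metadata before subscribing, and retry and alert while it is missing.

// Sketch: subscribe only once the topic exists; retry and alert while it is missing.
// sendAlert() is a hypothetical hook into your monitoring/alerting system.
void subscribeWhenTopicExists(KafkaConsumer<String, String> consumer, String topic)
        throws InterruptedException {
    while (!consumer.listTopics(Duration.ofSeconds(5)).containsKey(topic)) {
        sendAlert("Kafka topic '" + topic + "' could not be found");
        Thread.sleep(10_000); // retry within some time interval
    }
    consumer.subscribe(Arrays.asList(topic));
}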
Sink system is not available
When the consumer fetches data from Kafka, the data is processed based on your logic and the output is sent to the sink system. The sink could be any system that accepts data, such as a database, an API, an IoT platform, a queue, etc. What happens if the consumer cannot access the sink system?
Ideally, the consumer should pause fetching, since there is no value in processing data it cannot deliver, and keep trying to reach the sink system. Once the sink system is available again, the consumer can resume fetching and process the data to be sent to the sink system.
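The consumer API supports this directly through pause() and resume(). A minimal sketch, assuming a hypothetical isSinkAvailable() health check and the insertIntoDb() call from the earlier example:

// Sketch: pause fetching while the sink is down, resume once it is healthy again.
// isSinkAvailable() and insertIntoDb() are hypothetical placeholders.
while (true) {
    if (!isSinkAvailable()) {
        // Stop fetching new records; keep calling poll() so the consumer stays in the group
        consumer.pause(consumer.assignment());
    } else if (!consumer.paused().isEmpty()) {
        consumer.resume(consumer.paused()); // sink is back, start fetching again
    }
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        insertIntoDb(record.value());
    }
    consumer.commitSync(); // commit only what has actually been delivered to the sink
}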
Conclusion
As you see, different issues in the production environment can reduce data quality. If the data is critical, they can also cause loss of money or other legal issues. Hence, depending on the importance of the data, you have to build a proper streaming infrastructure with error handling methods in place. Another important point to note is that some analytical data is not critical and can be omitted without processing it. As a starting point, you should try to understand your data!