Error Handling in Kafka Consumer

Serkan SAKINMAZ
5 min read · Oct 21, 2021

In my previous blog, I explained how to tackle unexpected situations in the Kafka Producer; please refer to Error Handling in Kafka Producer. This blog explains how to design error handling in the Kafka Consumer in order to build a consistent streaming integration.

Recap for Kafka Consumer

The Kafka consumer is an API that allows you to fetch data from Kafka. The consumer reads data from a Kafka topic, processes the messages, and pushes them to a sink system. Please see how the consumer reads data from the messaging layer:

As you can see, there are three components used in the flow:

  • Messaging layer (Kafka broker)
  • Kafka Consumer
  • Sink system

The following code shows how to consume data from Kafka:

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "test");
props.setProperty("enable.auto.commit", "true");
props.setProperty("auto.commit.interval.ms", "1000");
props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records)
        System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}

Ref : KafkaConsumer

Let’s take a deep dive into how to tackle unexpected situations in the Kafka Consumer.

ERROR HANDLING IN CONSUMER

Error handling is a very important topic when it comes to data accuracy. Let’s list all the problems you can face while consuming data from the Kafka broker:

  • Invalid (bad) data is received from Kafka topic
  • Kafka broker is not available while consuming your data
  • Kafka Consumer could be interrupted
  • Kafka Topic couldn’t be found
  • Any exception can happen during data processing
  • Sink system is not available

For every item, I will dive into the problem and explain how you can design your implementation.

Invalid (bad) data is received from Kafka topic

The data arrives in a format or with content different from what the consumer expects. In that case, the recommendation is to skip the record, but it is better to also log it to a dedicated file so it can be reported afterwards.
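A minimal sketch of this approach could look like the following. Note that parsePayload(...), pushToSink(...), the MyEvent type, and the badRecordLog logger are hypothetical placeholders standing in for your own validation logic, sink write, and bad-record report file:

try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
    consumer.subscribe(Arrays.asList("foo", "bar"));
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records) {
            try {
                // parsePayload(...) stands in for your own parsing/validation logic
                MyEvent event = parsePayload(record.value());
                pushToSink(event);
            } catch (Exception e) {
                // Skip the invalid record, but log it so it can be reported afterwards
                badRecordLog.warn("Skipping invalid record topic={} offset={} value={}",
                        record.topic(), record.offset(), record.value(), e);
            }
        }
    }
}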

Consumer Interrupted

The consumer fetches data from the Kafka broker and holds it in memory while processing. If the consumer is interrupted, the data that is still in its memory is lost. If the consumer then skips those records and does not re-send the same data to the sink system, you end up with a data inconsistency.

In order to track which part of the data has already been sent to the sink (DB), you can use the Kafka consumer’s “enable.auto.commit” setting.

Kafka provides a commit API to control which offsets are marked as processed. When you set “enable.auto.commit” to false, the Kafka consumer commits the offset only after the block has been successfully processed.

Steps in the workflow

  • Number 1: Data is fetched from the Kafka topic by the consumer
  • Number 2: Data is processed and sent to the database to be inserted
  • Number 3: Data is successfully inserted and the flow works as expected
  • Number 4: The consumer commits the offset and fetches another event block

The following code disables the auto-commit feature and commits the offset manually:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "false"); // offsets are committed manually below
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
final int minBatchSize = 200;
List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        buffer.add(record);
    }
    if (buffer.size() >= minBatchSize) {
        insertIntoDb(buffer);   // write the batch to the sink (database)
        consumer.commitSync();  // commit the offsets only after a successful insert
        buffer.clear();
    }
}

Ref : KafkaConsumer (Manual Offset Control)

Kafka broker is not available while consuming your data

The consumer tries to consume data from the Kafka topic, but the connection cannot be established because the broker is not available. In that case, the consumer should retry consuming the data at some time interval.

Another important point is that your monitoring system should send an alert so that the Kafka broker can be made available again. The DevOps team can then restart the Kafka broker or find the root cause to keep it available.
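Note that the Java consumer client already retries broker connections internally; what you control is how aggressively it does so. A minimal sketch of the relevant settings (the values below are only illustrative, not recommendations):

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "test");
// Wait 1 second before the first reconnect attempt to an unavailable broker
props.setProperty("reconnect.backoff.ms", "1000");
// Back off up to 30 seconds between reconnect attempts
props.setProperty("reconnect.backoff.max.ms", "30000");
// Wait 1 second before retrying a failed request
props.setProperty("retry.backoff.ms", "1000");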

Kafka Topic couldn’t be found

Kafka records are stored in topics. If the topic is not found, the consumer gets an “Unknown topic or partition” error. The case is very similar to the previous item, so you can take a similar action in terms of retrying and sending an alert.
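For instance, a startup check with the AdminClient can detect a missing topic early and trigger an alert instead of silently retrying. A minimal sketch, where sendAlert(...) is a hypothetical hook into your monitoring system:

import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;

static void checkTopicExists(String topic) throws Exception {
    Properties adminProps = new Properties();
    adminProps.setProperty("bootstrap.servers", "localhost:9092");
    try (AdminClient admin = AdminClient.create(adminProps)) {
        // listTopics() returns the topics currently known to the cluster
        if (!admin.listTopics().names().get().contains(topic)) {
            sendAlert("Kafka topic '" + topic + "' not found"); // hypothetical alert hook
        }
    }
}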

Sink system is not available

When the consumer fetches data from Kafka, it processes it based on its logic and sends the output to a sink system. The sink can be any system that accepts the data, such as a database, an API, an IoT platform, a queue, etc. What happens if the consumer can’t access the sink system?

Ideally, the consumer should pause fetching, since there is no value in processing data it cannot deliver, and keep trying to reach the sink system. Once the sink system is available again, the consumer can resume fetching and process the data to be sent to the sink.
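A minimal sketch of that pause/resume pattern, using the consumer’s pause(), resume(), assignment(), and paused() methods. Here sinkIsAvailable() and sendToSink(...) are hypothetical helpers standing in for your own sink health check and write logic:

while (true) {
    if (!sinkIsAvailable()) {
        // Stop fetching: paused partitions return no records from poll(),
        // but poll() must still be called so the consumer stays in its group
        consumer.pause(consumer.assignment());
    } else if (!consumer.paused().isEmpty()) {
        // Sink is back: resume fetching from the paused partitions
        consumer.resume(consumer.paused());
    }

    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        sendToSink(record.value());
    }
    if (!records.isEmpty()) {
        // Commit only after the sink has accepted the whole batch
        consumer.commitSync();
    }
}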

Conclusion

As you can see, there are different issues in a production environment that reduce data quality. If the data is critical, they can also cause loss of money or other legal issues. Hence, depending on the importance of the data, you have to build a proper streaming infrastructure with error handling in place. Another important point is that some analytical data is not critical and can be omitted without processing it. As a starting point, you should try to understand your data!
