Kafka: How to read events from a topic (with examples)

Updated: January 30, 2024 By: Guest Contributor

Introduction

Apache Kafka is a popular open-source stream-processing platform developed by the Apache Software Foundation and written in Scala and Java. It is designed to handle data feeds with high throughput and low latency, and it is widely used for building real-time data pipelines and streaming applications. Understanding how to read events from a Kafka topic is essential, since consuming is one of the core operations for processing a continuous stream of data.

In this tutorial, we will explore various ways to read events from a Kafka topic using Kafka consumers, ranging from basic consumption to more advanced configurations.

Prerequisites

Before we dive into reading Kafka events, make sure you have Kafka installed and running on your machine, along with a topic created for testing. We'll be using Java throughout this tutorial, so you should also have a Java development environment set up. If you still need a test topic, the sketch below shows one way to create it programmatically.
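The following is a minimal sketch that creates 'test-topic' with Kafka's AdminClient, assuming a broker listening on localhost:9092; the partition count and replication factor are arbitrary choices for local testing:

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

import java.util.Collections;
import java.util.Properties;

public class CreateTestTopic {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            // One partition and replication factor 1 are enough for a local test
            NewTopic topic = new NewTopic("test-topic", 1, (short) 1);
            admin.createTopics(Collections.singletonList(topic)).all().get();
        }
    }
}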

Setting Up a Basic Kafka Consumer

First, let’s start by setting up a basic consumer to read messages from a Kafka topic: ‘test-topic’.

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class SimpleConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        // More configurations can be set here if needed

        try (Consumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("test-topic"));

            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                records.forEach(record -> {
                    System.out.printf("Received record with key: %s, value: %s, partition: %d, offset: %d%n", record.key(), record.value(), record.partition(), record.offset());
                });
            }
        }
    }
}

This simple example demonstrates how to read events from a Kafka topic using the Kafka Java client. If ‘test-topic’ contains any messages, you will see them printed to the console.
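If the topic is empty, nothing will be printed. As a quick way to generate test data, here is a minimal producer sketch (the keys and values are arbitrary examples):

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class SimpleProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 5; i++) {
                producer.send(new ProducerRecord<>("test-topic", "key-" + i, "value-" + i));
            }
        } // close() flushes any buffered records before returning
    }
}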

Auto-Offset Reset Configuration

It’s important to understand how the consumer handles offsets. By default, consumers commit offsets automatically. Separately, the ‘auto.offset.reset’ property controls where the consumer starts reading when it has no committed offset for a partition (for example, on the group’s first run) or when the committed offset is no longer valid:

props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // Read from the beginning
// or
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); // Read only new messages

‘earliest’ configures the consumer to read from the start of the topic, while ‘latest’ means it will only read new messages produced after the consumer has started. Note that these settings apply only when no valid committed offset exists; otherwise the consumer resumes from the committed position.

Custom Offset Management

For more fine-grained control, you can disable automatic commits and commit offsets yourself. This gives you precise control over when a message counts as processed.

import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); // Disable auto-commit

// Later in your code, after processing a record:
TopicPartition partition = new TopicPartition(record.topic(), record.partition());
// Commit the offset of the next message to be read, hence the +1
consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(record.offset() + 1)));

With manual commits, an offset is recorded only after the message has actually been processed. If the consumer crashes, it resumes from the last committed offset and re-reads at most the uncommitted messages, giving you at-least-once semantics (with possible duplicates) rather than lost data.

Deserializing Complex Types

Up until now, we have assumed that the messages in the Kafka topic are plain strings. In a real-world scenario, you will often work with structured formats such as JSON or Avro. Here is how you can read JSON-serialized data by plugging in a custom deserializer:

props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.example.YourCustomJsonDeserializer");

Implement ‘YourCustomJsonDeserializer’ to handle the conversion logic from the byte stream to your desired object.
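As an illustration, here is a minimal sketch of such a deserializer built on Jackson. The Jackson dependency and the OrderEvent target class are assumptions for this example; substitute your own POJO and JSON library:

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;

public class YourCustomJsonDeserializer implements Deserializer<OrderEvent> {
    private final ObjectMapper mapper = new ObjectMapper();

    @Override
    public OrderEvent deserialize(String topic, byte[] data) {
        if (data == null) {
            return null; // Null payloads (e.g. tombstones) map to null objects
        }
        try {
            // OrderEvent is a hypothetical POJO; replace it with your own type
            return mapper.readValue(data, OrderEvent.class);
        } catch (Exception e) {
            throw new SerializationException("Failed to deserialize JSON from topic " + topic, e);
        }
    }
}

With this in place, the consumer is declared as Consumer<String, OrderEvent> and record.value() returns a deserialized object rather than a raw string.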

Commit Strategies and Error Handling

You can choose different strategies for when to commit offsets. Deciding between synchronous (commitSync()) and asynchronous (commitAsync()) commits is a trade-off: commitSync() blocks and retries until the commit succeeds, while commitAsync() returns immediately for higher throughput but will not retry on failure. You should also implement error handling so that failed consumption is dealt with properly and retries are managed.
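To make the trade-off concrete, here is a sketch of a common pattern: asynchronous commits in the poll loop for throughput, followed by a final synchronous commit on shutdown. The running flag and process(...) method are placeholders for your own control flow and logic:

try {
    while (running) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records) {
            process(record); // Placeholder for your processing logic
        }
        // Non-blocking commit; the callback surfaces failures instead of dropping them silently
        consumer.commitAsync((offsets, exception) -> {
            if (exception != null) {
                System.err.println("Async commit failed for " + offsets + ": " + exception);
            }
        });
    }
} finally {
    try {
        consumer.commitSync(); // Blocking commit so the final position is not lost on shutdown
    } finally {
        consumer.close();
    }
}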

Advanced Use Cases: Partition Assignment and Rebalance Listeners

For more advanced control over the Kafka consumer, you can implement custom partition assignment and rebalance listeners. This gives you finer-grained control over how messages are consumed from the individual partitions of a topic, which is useful for cases such as partition stickiness or stateful processing.

import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;

import java.util.Collection;

consumer.subscribe(Collections.singletonList("test-topic"), new HandleRebalance());

class HandleRebalance implements ConsumerRebalanceListener {
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // Called before the consumer gives up partitions, e.g. commit offsets or flush state here
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // Called after new partitions are assigned, e.g. restore state or seek to offsets here
    }
}

With this custom logic in the rebalance listener, you can take action before and after partition assignments change.
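Alternatively, if you want full manual control rather than group-managed rebalancing, you can assign partitions explicitly and seek to a specific offset. In this sketch the partition number and offset are arbitrary examples:

TopicPartition partition0 = new TopicPartition("test-topic", 0);
consumer.assign(Collections.singletonList(partition0)); // Bypasses group rebalancing entirely
consumer.seek(partition0, 42L); // Start reading partition 0 at offset 42

Note that assign() and subscribe() are mutually exclusive on the same consumer instance.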

Conclusion

Reading events from a Kafka topic is vital for developing streaming applications. This tutorial walked through Kafka consumers from basic consumption to more advanced configuration. Since consuming events is a central feature of Kafka, getting familiar with these concepts will help you build robust and fault-tolerant streaming systems.