Building a Basic Kafka Consumer in Java

Updated: January 31, 2024 By: Guest Contributor

Overview

Apache Kafka is an open-source distributed event streaming platform, originally developed at LinkedIn and later donated to the Apache Software Foundation. It is used for building real-time data pipelines and streaming applications, and it supports consumers that sustain high throughput while tolerating failures. This tutorial focuses on building a basic Kafka consumer in Java.

Prerequisites

  • Java 8 or above
  • Apache Kafka setup
  • Maven for dependency management
  • An IDE such as IntelliJ IDEA or Eclipse

Step-by-Step Instructions

Step 1: Set Up a Maven Project

Create a new Maven project in your favorite IDE and add the following dependency in your pom.xml to include the Kafka clients library:


<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>YOUR_KAFKA_VERSION</version>
</dependency>

Step 2: Configure the Consumer

The first step in creating a Kafka consumer in Java is to define the configuration settings:

import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092"); // address of a Kafka broker
props.put("group.id", "test");                    // consumer group this consumer joins
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
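The string keys above are easy to mistype, and a typo is silently ignored at runtime. The client library also exposes every setting as a constant on ConsumerConfig, so an equivalent configuration (broker address and group id are placeholder values) might be built like this:

```java
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ConsumerProps {
    // Same settings as above, expressed with ConsumerConfig constants so a
    // misspelled key fails at compile time instead of being ignored.
    static Properties build() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        return props;
    }
}
```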

Step 3: Subscribe to Topics

After configuring the consumer, the next step is to subscribe to the topic(s) it should listen to:

import java.util.Arrays;

consumer.subscribe(Arrays.asList("my_topic", "my_other_topic"));
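subscribe also accepts a java.util.regex.Pattern, which is handy when topics follow a naming convention: topics matching the pattern are picked up automatically as they appear. A small sketch (the "my_" prefix is just an illustration matching the topic names above):

```java
import java.util.regex.Pattern;

public class TopicPattern {
    // With consumer.subscribe(MY_TOPICS), the consumer would track every
    // topic whose name starts with "my_", including ones created later.
    static final Pattern MY_TOPICS = Pattern.compile("my_.*");

    public static void main(String[] args) {
        System.out.println(MY_TOPICS.matcher("my_topic").matches());    // true
        System.out.println(MY_TOPICS.matcher("other_topic").matches()); // false
    }
}
```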

Step 4: Poll Kafka for Data

Now, the consumer can poll data from Kafka:

import java.time.Duration;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;

try {
    while (true) {
        // poll() blocks for up to 100 ms waiting for new records
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
        }
    }
} finally {
    consumer.close(); // releases resources and leaves the consumer group cleanly
}

Error Handling

It’s important to handle potential errors such as deserialization failures or client-side exceptions, so the consumer can still be closed cleanly when something goes wrong. This can be accomplished by wrapping the polling logic in a try-catch block.
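One way to structure this is to catch WakeupException (thrown by poll() when another thread calls consumer.wakeup(), typically during shutdown) separately from other KafkaExceptions. The method below is an illustrative sketch, not the only valid arrangement:

```java
import java.time.Duration;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.WakeupException;

public class PollLoop {
    // Poll until wakeup() is called elsewhere, treating WakeupException
    // as a normal shutdown signal rather than an error.
    static void run(KafkaConsumer<String, String> consumer) {
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset = %d, value = %s%n", record.offset(), record.value());
                }
            }
        } catch (WakeupException e) {
            // Expected path on shutdown: consumer.wakeup() was called.
        } catch (KafkaException e) {
            // Unexpected client error: log it, then fall through to close().
            System.err.println("Consumer error: " + e.getMessage());
        } finally {
            consumer.close();
        }
    }
}
```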

Step 5: Graceful Shutdown

To ensure a graceful shutdown, use a shutdown hook. Note that KafkaConsumer is not thread-safe, so the hook must not call close() directly from its own thread while the main thread is still polling. Instead, call wakeup(): this makes the blocked poll() throw a WakeupException on the polling thread, which then exits the loop and closes the consumer itself (the finally block above already does this):

Runtime.getRuntime().addShutdownHook(new Thread(() -> {
    System.out.println("Stopping consumer...");
    // wakeup() is the one KafkaConsumer method that is safe to call from
    // another thread; it causes poll() to throw WakeupException
    consumer.wakeup();
}));

Conclusion

This basic consumer setup will allow you to connect to your Kafka cluster and start processing streams of data. For production systems, consider adding more complex error handling and committing strategies, along with monitoring and alerting. Kafka offers a rich set of APIs and settings for fine-tuning your consumers, and mastering these will be key to successfully harnessing the full power of Kafka in your Java applications.
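As one example of the committing strategies mentioned above, a common production pattern is to disable auto-commit and commit offsets only after a batch has been processed, giving at-least-once delivery: a crash replays uncommitted records instead of losing them. A sketch under those assumptions (process() stands in for application-specific work):

```java
import java.time.Duration;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ManualCommit {
    // Turn off the auto-commit behavior configured in Step 2.
    static Properties disableAutoCommit(Properties props) {
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        return props;
    }

    // At-least-once sketch: commit only after the whole batch is handled.
    static void run(KafkaConsumer<String, String> consumer) {
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                process(record); // application-specific work (illustrative)
            }
            if (!records.isEmpty()) {
                consumer.commitSync(); // blocks until offsets are durably committed
            }
        }
    }

    static void process(ConsumerRecord<String, String> record) {
        System.out.println(record.value());
    }
}
```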

With this guide, you’re now equipped to start building more sophisticated consumers and integrating Apache Kafka into your data processing pipelines to manage high-throughput data effectively.