Sling Academy

Building a Basic Kafka Consumer in Java

Last updated: January 31, 2024

Overview

Apache Kafka is an open-source distributed event-streaming platform, originally developed at LinkedIn and later donated to the Apache Software Foundation. It is widely used for building real-time data pipelines and streaming applications that require high throughput and fault tolerance. This tutorial walks through building a basic Kafka consumer in Java.

Prerequisites

  • Java 8 or above
  • A running Apache Kafka broker (local or remote)
  • Maven for dependency management
  • An IDE such as IntelliJ IDEA or Eclipse

Step-by-Step Instructions

Step 1: Set Up a Maven Project

Create a new Maven project in your favorite IDE and add the following dependency in your pom.xml to include the Kafka clients library:


<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>YOUR_KAFKA_VERSION</version>
</dependency>

Step 2: Configure the Consumer

The first step in creating a Kafka consumer in Java is to define the configuration settings:

import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092"); // Kafka broker(s) to connect to
props.put("group.id", "test"); // consumer group this consumer belongs to
props.put("enable.auto.commit", "true"); // commit offsets automatically
props.put("auto.commit.interval.ms", "1000"); // auto-commit once per second
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

Step 3: Subscribe to Topics

After configuring the consumer, the next step is to subscribe to the topic(s) it should listen to:

import java.util.Arrays;

consumer.subscribe(Arrays.asList("my_topic", "my_other_topic"));

Step 4: Poll Kafka for Data

Now, the consumer can poll data from Kafka:

import java.time.Duration;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.errors.WakeupException;

try {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
        }
    }
} catch (WakeupException e) {
    // Thrown by poll() when consumer.wakeup() is called during shutdown; safe to ignore here
} finally {
    consumer.close();
}

Error Handling

It’s important to handle potential errors such as broker connection failures, deserialization problems, or interruptions. Wrap the polling logic in a try-catch block that catches KafkaException (or its more specific subclasses), and always close the consumer in a finally block so it leaves its consumer group cleanly.
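For transient failures in your own record-processing code, a common companion pattern is a bounded retry with backoff around the processing step. The sketch below is generic stdlib Java, not part of the Kafka API; the class and method names are illustrative:

```java
import java.util.concurrent.Callable;

public class Retry {
    // Runs the task up to maxAttempts times, sleeping backoffMs between
    // attempts; rethrows the last failure if every attempt fails.
    public static <T> T withRetry(Callable<T> task, int maxAttempts, long backoffMs)
            throws Exception {
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return task.call();
            } catch (Exception e) {
                last = e;
                if (attempt < maxAttempts) {
                    Thread.sleep(backoffMs);
                }
            }
        }
        throw last;
    }

    public static void main(String[] args) throws Exception {
        int[] calls = {0};
        // Fails twice, then succeeds -- mimics a transient error.
        String result = withRetry(() -> {
            calls[0]++;
            if (calls[0] < 3) throw new RuntimeException("transient failure");
            return "processed";
        }, 5, 10);
        System.out.println(result + " after " + calls[0] + " attempts");
    }
}
```

Inside the poll loop, you would wrap the per-record processing (not poll() itself) in such a helper, so a flaky downstream call does not crash the consumer.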

Step 5: Graceful Shutdown

To ensure a graceful shutdown, register a shutdown hook. Because KafkaConsumer is not thread-safe, the hook should not call close() from another thread. Instead, call consumer.wakeup(), which makes the current (or next) poll() throw a WakeupException so the polling thread can exit its loop and close the consumer itself:

Runtime.getRuntime().addShutdownHook(new Thread(() -> {
    System.out.println("Stopping consumer...");
    consumer.wakeup(); // poll() throws WakeupException; the polling thread then closes the consumer
}));
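The coordination between the shutdown hook and the polling thread can be sketched with plain stdlib threads. Here the Kafka calls are replaced by stand-ins (noted in comments): an AtomicBoolean plays the role of the wakeup signal, and a CountDownLatch lets the shutdown side wait until close() has finished:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class ShutdownDemo {
    static final AtomicBoolean running = new AtomicBoolean(true);
    static final CountDownLatch closed = new CountDownLatch(1);

    // Poll loop: keeps "polling" until asked to stop, then "closes".
    static void pollLoop() {
        try {
            while (running.get()) {
                Thread.sleep(10); // stand-in for consumer.poll(...)
            }
        } catch (InterruptedException ignored) {
        } finally {
            closed.countDown(); // stand-in for consumer.close()
        }
    }

    // What the shutdown hook does: signal the loop, then wait for the close.
    static boolean shutdown() throws InterruptedException {
        running.set(false); // stand-in for consumer.wakeup()
        return closed.await(5, TimeUnit.SECONDS);
    }

    public static void main(String[] args) throws Exception {
        Thread t = new Thread(ShutdownDemo::pollLoop);
        t.start();
        System.out.println("closed cleanly: " + shutdown());
    }
}
```

In the real consumer, wakeup() plays the role of the signal and joining the polling thread (or awaiting a latch it counts down after close()) ensures the JVM does not exit before offsets are committed and the group is left cleanly.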

Conclusion

This basic consumer setup will allow you to connect to your Kafka cluster and start processing streams of data. For production systems, consider adding more complex error handling and committing strategies, along with monitoring and alerting. Kafka offers a rich set of APIs and settings for fine-tuning your consumers, and mastering these will be key to successfully harnessing the full power of Kafka in your Java applications.

With this guide, you’re now equipped to start building more sophisticated consumers and integrating Apache Kafka into your data processing pipelines, enabling you to manage high-throughput data effectively.


Series: Apache Kafka Tutorials
