An Introduction to Producer and Consumer in Apache Kafka

Updated: January 31, 2024 By: Guest Contributor Post a comment

Overview

Apache Kafka is a distributed streaming platform that allows you to build real-time streaming data pipelines and applications. At its core, Kafka is based on a publish-subscribe model consisting of producers, topics, consumers, and consumer groups. This tutorial will introduce you to the producer and consumer components within Apache Kafka, explain how they function, and provide you with practical code examples to get started. By the end of this tutorial, you will have a solid foundation to start integrating Kafka into your next application.

Understanding Kafka Basics

Before diving into producers and consumers, let’s clarify a few Kafka concepts:

  • Producer: A producer is an entity that publishes data to one or more Kafka topics.
  • Consumer: A consumer retrieves messages from Kafka topics to which it has subscribed.
  • Topic: A topic is a named feed to which messages are published by producers and from which messages are read by consumers.
  • Consumer Group: A consumer group consists of multiple consumers that jointly consume messages from a topic. Kafka ensures that each partition is only consumed by one consumer in the group.

Setting Up the Environment

To follow along with the code examples, you’ll need to have Apache Kafka and Java installed on your system. Kafka requires a ZooKeeper server for cluster coordination, and by downloading Kafka, you can start a ZooKeeper server with the built-in scripts provided.

Creating a Producer

Let’s start by creating a simple Kafka producer in Java. The producer will send a string message to a topic named MyTopic.

The first step is to define the properties for the Kafka producer:


import java.util.Properties;
import org.apache.kafka.clients.producer.*;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("key.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer",
"org.apache.kafka.common.serialization.StringSerializer");

Producer<String, String> producer = new KafkaProducer<>(props);

Then, send a message to the topic:


producer.send(new ProducerRecord<String, String>("MyTopic", "Hello, Kafka!"));
producer.close();

In this example, we set the bootstrap.servers to point to our Kafka instance. The acks setting determines how many partition replicas must acknowledge the message before the send is considered successful. We also define serializers for both the key and value to convert them into bytes because Kafka stores and transmits messages in a binary format.

Creating a Consumer

Next up is creating a Kafka consumer. Consumers read messages from topics and are very flexible in terms of partition and topic subscription.

Below are the basic steps to create a simple consumer:


import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");

Consumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("MyTopic"));

Then, continuously process messages from the topic:


try {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("%s [%d] offset=%d, key=%s, value=%s%n",
                record.topic(), record.partition(), record.offset(),
                record.key(), record.value());
        }
    }
} finally {
    consumer.close();
}

In this consumer example, we subscribe to MyTopic and continuously poll for newly available records, processing them one by one. To run the code examples, make sure your Kafka environment is correctly configured and the Kafka server is running.

Kafka Security

While the examples don’t include security configurations, Kafka supports SSL and SASL for that purpose. It is essential to secure your Kafka cluster in production environments.

Conclusion

Now that we know how to set up Kafka producers and consumers, you can start integrating them into your applications. Kafka is an exceptionally powerful tool for processing streaming data, and its publish-subscribe model makes it highly scalable and fault-tolerant. Experiment with the examples provided, and soon you’ll be able to design sophisticated data pipelines and real-time applications powered by Kafka.