How to Integrate Kafka into Microservices (with Examples)

Updated: January 31, 2024 By: Guest Contributor

Introduction

Apache Kafka is a high-throughput, distributed messaging system that is widely used in microservices architectures for event streaming. Integrating Kafka into your microservices lets them communicate asynchronously and process data in real time. This tutorial guides you through integrating Kafka into a microservice environment, progressing from basic to advanced examples.

Moving from monolithic architectures to microservices has led to the adoption of message brokers for communication between services. Kafka is particularly popular due to its scalability, fault tolerance, and performance. Before diving into Kafka integration, it’s important to understand its key components like producers, consumers, topics, and the Kafka broker itself.

Getting Started with Kafka

Start ZooKeeper and the Kafka broker, then create a test topic to work with:

$ bin/zookeeper-server-start.sh config/zookeeper.properties
$ bin/kafka-server-start.sh config/server.properties
$ bin/kafka-topics.sh --create --topic test --bootstrap-server localhost:9092 --replication-factor 1 --partitions 4
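
If you want to confirm the topic was created, you can describe it to check its partitions and replication:

$ bin/kafka-topics.sh --describe --topic test --bootstrap-server localhost:9092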

Once you have a running instance of Kafka, you can start integrating it with your microservices.

Basic Producer and Consumer

To begin, we’ll write a simple producer and consumer in Java using the Kafka client library. Include the necessary dependency in your project’s build file:

// For Maven
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.0.0</version>
</dependency>
// For Gradle
implementation 'org.apache.kafka:kafka-clients:3.0.0'

Producer Example

We’ll write a producer that sends messages to the ‘test’ topic that we created earlier. Here’s the sample code:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class SimpleProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", 
            "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", 
            "org.apache.kafka.common.serialization.StringSerializer");

        // Send ten keyed String messages to the 'test' topic
        Producer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i < 10; i++) {
            producer.send(new ProducerRecord<String, String>("test", 
                Integer.toString(i), "Message " + i));
        }

        producer.close();
    }
}
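
Note that send() is asynchronous: it returns as soon as the record is buffered. If your service needs to react to delivery success or failure, you can pass a callback to send(). A minimal sketch, reusing the producer above with a placeholder key and value:

producer.send(new ProducerRecord<>("test", "0", "Message 0"),
    (metadata, exception) -> {
        if (exception != null) {
            // Delivery failed; log or retry as appropriate for your service
            exception.printStackTrace();
        } else {
            // Delivery succeeded; metadata carries the partition and offset
            System.out.printf("Sent to partition %d at offset %d%n",
                metadata.partition(), metadata.offset());
        }
    });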

Consumer Example

Next, we’ll create a simple consumer that reads messages from our ‘test’ topic:

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class SimpleConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        Consumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("test"));

        // Poll indefinitely and print each record as it arrives
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s\n", 
                    record.offset(), record.key(), record.value());
            }
        }
    }
}

This consumer will continue to read messages as they arrive on the topic and print them to the console.
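
In a real microservice you will usually want to stop this loop cleanly rather than killing the process mid-poll. One common pattern, sketched below on the assumption that the poll loop runs on its own thread, is to call wakeup() from a shutdown hook and close the consumer when the blocked poll() throws WakeupException:

// Sketch: wakeup() is the only thread-safe KafkaConsumer method,
// so it can be called from a shutdown hook while another thread is polling.
Runtime.getRuntime().addShutdownHook(new Thread(consumer::wakeup));

try {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        records.forEach(record -> System.out.println(record.value()));
    }
} catch (org.apache.kafka.common.errors.WakeupException e) {
    // Expected during shutdown: wakeup() aborts the blocked poll()
} finally {
    consumer.close(); // commits offsets (with auto-commit enabled) and releases resources
}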

Stream Processing with Kafka Streams

The Kafka Streams API enables real-time stream processing within your microservices. First, add the necessary Kafka Streams dependency:

// For Maven
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>3.0.0</version>
</dependency>
// For Gradle
implementation 'org.apache.kafka:kafka-streams:3.0.0'

Now, let’s create a stream processor that reads from our ‘test’ topic, processes the data, and outputs to a new topic:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;

import java.util.Properties;

public class StreamProcessor {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-test");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, 
            Serdes.String().getClass().getName());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, 
            Serdes.String().getClass().getName());

        // Read from 'test', prefix each value, and write the result to 'processed-test'
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> source = builder.stream("test");
        source.mapValues(value -> "Processed: " + value)
              .to("processed-test");

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();

        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}

The processor prepends ‘Processed: ’ to each message value and sends the result to the ‘processed-test’ topic.
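
Depending on the broker’s auto-create settings, the output topic may not exist yet, so it is safest to create ‘processed-test’ up front, just like the input topic:

$ bin/kafka-topics.sh --create --topic processed-test --bootstrap-server localhost:9092 --replication-factor 1 --partitions 4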

Advanced Integration with Spring Cloud Stream

For advanced use cases, you can integrate Kafka with Spring Cloud Stream, which provides a higher-level abstraction for Kafka integration. First, add the Spring Cloud Stream Binder for Kafka to your project:

// For Maven
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-kafka</artifactId>
    <version>3.1.2</version>
</dependency>
// For Gradle
implementation 'org.springframework.cloud:spring-cloud-stream-binder-kafka:3.1.2'

Next, configure the application to use the Kafka binder and define the input and output bindings (for example, in application.yml):

spring:
  cloud:
    stream:
      kafka:
        binder:
          brokers: localhost:9092
      bindings:
        input:
          destination: test
        output:
          destination: processed-test

Create a service bean that listens to the input channel and sends processed data to the output channel:

import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.messaging.handler.annotation.SendTo;

@EnableBinding(Processor.class)
public class StreamProcessingService {
    // Consume from the input binding, transform the payload, and publish to the output binding
    @StreamListener(Processor.INPUT)
    @SendTo(Processor.OUTPUT)
    public Message<String> transformMessage(String message) {
        return MessageBuilder.withPayload("Processed: " + message).build();
    }
}

This configuration uses Spring Cloud Stream’s annotation-based programming model to read from and write to Kafka topics.
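
Note that recent Spring Cloud Stream releases deprecate the annotation-based model shown above (@EnableBinding, @StreamListener) in favor of the functional programming model. A minimal sketch of the same processor as a function bean follows; it assumes the generated bindings transformMessage-in-0 and transformMessage-out-0 are mapped to the ‘test’ and ‘processed-test’ topics in your configuration:

import java.util.function.Function;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class FunctionalStreamProcessor {
    // Bound by Spring Cloud Stream via configuration such as (assumed):
    //   spring.cloud.stream.bindings.transformMessage-in-0.destination: test
    //   spring.cloud.stream.bindings.transformMessage-out-0.destination: processed-test
    @Bean
    public Function<String, String> transformMessage() {
        return message -> "Processed: " + message;
    }
}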

Conclusion

In this tutorial, we’ve explored how to integrate Kafka into microservices, starting from basic producer and consumer examples, advancing to stream processing with Kafka Streams, and finally leveraging the Spring Cloud Stream library for a more seamless integration. These examples serve as a foundation for adopting Kafka in a microservices environment to build scalable and real-time data processing systems.