
How to Integrate Kafka into Microservices (with Examples)

Last updated: January 31, 2024

Introduction

Apache Kafka is a high-throughput, distributed messaging system widely used in microservices architectures for event streaming and asynchronous message handling. Integrating Kafka into microservices lets services communicate asynchronously and process data in real time. This tutorial guides you through integrating Kafka into a microservice environment, progressing from basic to advanced examples.

Moving from monolithic architectures to microservices has led to the adoption of message brokers for communication between services. Kafka is particularly popular due to its scalability, fault tolerance, and performance. Before diving into Kafka integration, it’s important to understand its key components like producers, consumers, topics, and the Kafka broker itself.

Getting Started with Kafka

From your Kafka installation directory, start ZooKeeper and the Kafka broker, then create a test topic to work with:

$ bin/zookeeper-server-start.sh config/zookeeper.properties
$ bin/kafka-server-start.sh config/server.properties
$ bin/kafka-topics.sh --create --topic test --bootstrap-server localhost:9092 --replication-factor 1 --partitions 4

Once you have a running instance of Kafka, you can start integrating it with your microservices.
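As an optional sanity check, you can confirm that the topic was created and the broker is reachable by describing the topic and exchanging a few test messages with the console tools that ship with Kafka (adjust the broker address if yours differs):

$ bin/kafka-topics.sh --describe --topic test --bootstrap-server localhost:9092
$ bin/kafka-console-producer.sh --topic test --bootstrap-server localhost:9092
$ bin/kafka-console-consumer.sh --topic test --from-beginning --bootstrap-server localhost:9092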

Basic Producer and Consumer

To begin, we’ll write a simple producer and consumer in Java using the Kafka client library. Include the necessary dependency in your project’s build file:

// For Maven
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.0.0</version>
</dependency>
// For Gradle
implementation 'org.apache.kafka:kafka-clients:3.0.0'

Producer Example

We’ll write a producer that sends messages to the ‘test’ topic that we created earlier. Here’s the sample code:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class SimpleProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", 
            "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", 
            "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);
        for(int i = 0; i < 10; i++) {
            producer.send(new ProducerRecord<String, String>("test", 
                Integer.toString(i), "Message " + i));
        }

        producer.close();
    }
}
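The send() call above is fire-and-forget: it returns before the broker has acknowledged the record. In a microservice you often want delivery confirmation, and one common approach is to pass a callback to send(). Here is a sketch of how the send line inside the loop above could be adapted (the error handling is only illustrative):

producer.send(new ProducerRecord<>("test", Integer.toString(i), "Message " + i),
    (metadata, exception) -> {
        if (exception != null) {
            // Delivery failed; log it and decide whether to retry or alert
            exception.printStackTrace();
        } else {
            // Delivery succeeded; metadata holds the partition and offset
            System.out.printf("Sent to partition %d, offset %d%n",
                metadata.partition(), metadata.offset());
        }
    });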

Consumer Example

Next, we’ll create a simple consumer that reads messages from our ‘test’ topic:

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class SimpleConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        Consumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("test"));

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s\n", 
                    record.offset(), record.key(), record.value());
            }
        }
    }
}

This consumer will continue to read messages as they arrive on the topic and print them to the console.
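By default the consumer commits offsets automatically in the background, which can re-deliver or skip messages if the service crashes mid-batch. A common refinement is to disable auto-commit and commit only after a batch has been fully processed. A minimal sketch of the changes to the SimpleConsumer above:

// In the configuration block:
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

// At the end of each iteration of the poll loop, once every record
// in the batch has been handled:
consumer.commitSync();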

Stream Processing with Kafka Streams

The Kafka Streams API enables real-time stream processing inside your microservices. First, add the Kafka Streams dependency:

// For Maven
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>3.0.0</version>
</dependency>
// For Gradle
implementation 'org.apache.kafka:kafka-streams:3.0.0'

Now, let’s create a stream processor that reads from our ‘test’ topic, processes the data, and outputs to a new topic:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;

import java.util.Properties;

public class StreamProcessor {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-test");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, 
            Serdes.String().getClass().getName());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, 
            Serdes.String().getClass().getName());

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> source = builder.stream("test");
        source.mapValues(value -> "Processed: " + value)
              .to("processed-test");

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();

        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}

The processor prefixes each message value with ‘Processed: ’ and sends the result to the ‘processed-test’ topic (make sure this topic exists, or that your broker allows automatic topic creation).
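To watch the processor in action, you can tail the output topic with the console consumer (assuming the same local broker as before):

$ bin/kafka-console-consumer.sh --topic processed-test --from-beginning --bootstrap-server localhost:9092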

Advanced Integration with Spring Cloud Stream

For advanced use cases, you can integrate Kafka with Spring Cloud Stream, which provides a higher-level abstraction over the Kafka client APIs. First, add the Spring Cloud Stream binder for Kafka to your project:

// For Maven
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-kafka</artifactId>
    <version>3.1.2</version>
</dependency>
// For Gradle
implementation 'org.springframework.cloud:spring-cloud-stream-binder-kafka:3.1.2'

Next, configure the application (for example, in application.yml) to use the Kafka binder and map the ‘input’ and ‘output’ bindings exposed by the Processor interface to your topics:

spring:
  cloud:
    stream:
      kafka:
        binder:
          brokers: localhost:9092
      bindings:
        input:
          destination: test
        output:
          destination: processed-test

Create a service bean that listens to the input channel and sends processed data to the output channel:

import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.messaging.handler.annotation.SendTo;

@EnableBinding(Processor.class)
public class StreamProcessingService {
    @StreamListener(Processor.INPUT)
    @SendTo(Processor.OUTPUT)
    public Message<String> transformMessage(String message) {
        return MessageBuilder.withPayload("Processed: " + message).build();
    }
}

This service uses Spring Cloud Stream’s annotation-based programming model (@EnableBinding and @StreamListener) to read from and write to Kafka topics. Keep in mind that these annotations are deprecated in Spring Cloud Stream 3.x in favor of the functional programming model.
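If you are on a recent Spring Cloud Stream release, the same processor can also be written with the functional model: you expose a Function bean and let the framework bind it to Kafka. A minimal sketch, assuming the default binding names process-in-0 and process-out-0 that Spring derives from a bean named ‘process’:

import java.util.function.Function;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class FunctionalStreamProcessingConfig {
    // Spring Cloud Stream binds this Function to the destinations configured
    // for the process-in-0 (input) and process-out-0 (output) bindings.
    @Bean
    public Function<String, String> process() {
        return message -> "Processed: " + message;
    }
}

The corresponding bindings in application.yml would point process-in-0 at ‘test’ and process-out-0 at ‘processed-test’.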

Conclusion

In this tutorial, we’ve explored how to integrate Kafka into microservices, starting from basic producer and consumer examples, advancing to stream processing with Kafka Streams, and finally leveraging the Spring Cloud Stream library for a more seamless integration. These examples serve as a foundation for adopting Kafka in a microservices environment to build scalable and real-time data processing systems.

Next Article: How to Design Kafka-Based Event-Driven Microservices (with Examples)

Previous Article: Apache Kafka: Addressing the Challenges of Exactly-Once Semantics

Series: Apache Kafka Tutorials
