How to build a simple Kafka Streams application in Java

Updated: January 31, 2024 By: Guest Contributor

Introduction

Apache Kafka is a popular distributed streaming platform designed to handle high volumes of data. Kafka Streams is a client library for building applications and microservices that process and analyze data stored in Kafka. In this article, we walk through the steps to create a simple Kafka Streams application in Java, from setting up your development environment to writing and running the application, with code examples along the way.

Setting Up the Development Environment

To build a Kafka Streams application in Java, you need the following prerequisites:

  • Java Development Kit (JDK) 8 or higher
  • Apache Kafka binaries
  • Maven or Gradle for building the project

Ensure JAVA_HOME is set and that Kafka can run on your machine. For the purposes of this guide, we will use Maven to build our project.

Creating the Maven Project

First, generate a Maven project. The archetype creates the kafka-streams-app directory for you, so there is no need to create it beforehand:

mvn archetype:generate -DgroupId=com.example -DartifactId=kafka-streams-app -DarchetypeArtifactId=maven-archetype-quickstart -DinteractiveMode=false

Then move into the generated project directory:

cd kafka-streams-app

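Open the generated pom.xml file and add the Kafka Streams dependency. If the file already contains a <dependencies> section, add only the <dependency> element to it rather than creating a second section: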

<dependencies>
  <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>YOUR_KAFKA_VERSION</version>
  </dependency>
</dependencies>

Basic Kafka Streams Application

Create a new Java file named StreamsApplication.java in src/main/java/com/example. This is where you will write your streams application logic.

package com.example;

import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;

public class StreamsApplication {

  public static void main(String[] args) {
    // Basic configuration: application id, broker address, and default serdes.
    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-streams-app");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

    // Define the topology: copy every record from source-topic to destination-topic.
    StreamsBuilder builder = new StreamsBuilder();
    KStream<String, String> source = builder.stream("source-topic");
    source.to("destination-topic");

    final Topology topology = builder.build();
    final KafkaStreams streams = new KafkaStreams(topology, props);

    // Close the streams client cleanly when the JVM shuts down (for example, on Ctrl+C).
    Runtime.getRuntime().addShutdownHook(new Thread(streams::close));

    // Start processing; the stream threads keep the application running.
    streams.start();
  }
}

This simple application reads messages from source-topic and writes them to destination-topic without any transformation. Before building, replace YOUR_KAFKA_VERSION in pom.xml with the version of Kafka you are using.
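The settings in the example above are hard-coded. As a minimal sketch, assuming you would rather take the broker address from the environment, the same four settings could be assembled in a small helper. The class name StreamsSettings and the KAFKA_BOOTSTRAP_SERVERS variable are hypothetical names chosen for illustration. The helper uses the literal configuration keys behind the StreamsConfig constants and the serde class name as a string, so it compiles with the JDK alone.

```java
import java.util.Map;
import java.util.Properties;

// Hypothetical helper; the class name and the KAFKA_BOOTSTRAP_SERVERS
// environment variable are illustrative and not part of Kafka.
final class StreamsSettings {

  // Builds the same four settings as the main example. The keys are the
  // string values behind the StreamsConfig constants used there.
  static Properties build(Map<String, String> env) {
    Properties props = new Properties();
    props.put("application.id", "kafka-streams-app");
    // Fall back to the local broker address used in this article.
    props.put("bootstrap.servers",
        env.getOrDefault("KAFKA_BOOTSTRAP_SERVERS", "localhost:9092"));
    // Class-typed settings can also be given as a fully qualified class name.
    String stringSerde = "org.apache.kafka.common.serialization.Serdes$StringSerde";
    props.put("default.key.serde", stringSerde);
    props.put("default.value.serde", stringSerde);
    return props;
  }

  public static void main(String[] args) {
    // Print the effective settings for the current environment.
    Properties props = build(System.getenv());
    props.forEach((key, value) -> System.out.println(key + "=" + value));
  }
}
```

In the application, `StreamsSettings.build(System.getenv())` would take the place of the hand-built `props` object. Passing the environment in as a map keeps the helper easy to exercise without touching real environment variables.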

Filtering Stream Data

Now, let’s add a filter operation to our stream so that only messages meeting a certain criterion pass through. For example, to pass only messages with even-length values, add the following lines to main before the call to builder.build():

KStream<String, String> filteredStream = source.filter((key, value) -> value != null && value.length() % 2 == 0);

filteredStream.to("filtered-topic");

We have now created a stream that drops messages whose values have an odd length. The messages that pass the filter are written to filtered-topic.
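Because the filter condition is plain Java, it can be pulled out into a named method and checked without a running broker. The following is a minimal sketch of that idea; the class and method names are hypothetical, and the null check is an added assumption that records with a null value should be dropped rather than cause a NullPointerException.

```java
// Hypothetical helper class; the names are illustrative and not part of Kafka Streams.
final class EvenLengthFilter {

  // The even-length rule from the filter step, guarded against null values.
  // The key is unused but kept so the method has a (key, value) -> boolean shape.
  static boolean hasEvenLengthValue(String key, String value) {
    return value != null && value.length() % 2 == 0;
  }

  public static void main(String[] args) {
    System.out.println(hasEvenLengthValue("k1", "ab"));  // prints "true"
    System.out.println(hasEvenLengthValue("k2", "abc")); // prints "false"
    System.out.println(hasEvenLengthValue("k3", null));  // prints "false"
  }
}
```

With this in place, the filter step could be written as `source.filter(EvenLengthFilter::hasEvenLengthValue)`, since the method takes a key and a value and returns a boolean, which is the shape the filter lambda has.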

Conclusion

In this tutorial, we learned how to create a basic Kafka Streams application in Java, from setting up the development environment to filtering a stream. Kafka Streams provides powerful capabilities for building robust streaming applications that process data at scale, and we’ve merely scratched the surface.