Introduction
Apache Kafka is a popular distributed streaming platform designed to handle high volumes of data. Kafka Streams is a client library for building applications and microservices that process and analyze data stored in Kafka. In this article, we walk through the steps to create a simple Kafka Streams application in Java, from setting up the development environment to writing and running the application, with code examples along the way.
Setting Up the Development Environment
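To build a Kafka Streams application in Java, you need the following prerequisites:
- Java Development Kit (JDK) 8 or higher (Kafka 4.0 and later require Java 11 or newer for Kafka Streams, so check the requirements of your Kafka version)
- Apache Kafka binaries, to run a local broker
- Maven or Gradle for building the project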
Ensure JAVA_HOME is set and that Kafka can run on your machine. For the purposes of this guide, we will use Maven to build our project.
Creating the Maven Project
First, create a new directory for your project and initialize a Maven project:
```shell
mkdir kafka-streams-app
cd kafka-streams-app
mvn archetype:generate -DgroupId=com.example -DartifactId=kafka-streams-app -DarchetypeArtifactId=maven-archetype-quickstart -DinteractiveMode=false
```
Open the generated pom.xml file and add the Kafka Streams dependency inside its existing <dependencies> element:
```xml
<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-streams</artifactId>
        <version>YOUR_KAFKA_VERSION</version>
    </dependency>
</dependencies>
```
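Once the dependency is in place, one way to confirm that a local broker is actually reachable is to ask it to describe the cluster using the Admin client from kafka-clients, which kafka-streams brings in transitively. The following is a minimal sketch, assuming a broker at localhost:9092 (the address the streams application uses); the class name BrokerCheck and its adminProps helper are hypothetical.

```java
package com.example;

import java.util.Properties;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.DescribeClusterResult;

public class BrokerCheck {

    // Hypothetical helper: minimal Admin client settings (only the broker address)
    public static Properties adminProps(String bootstrapServers) {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        return props;
    }

    public static void main(String[] args) throws Exception {
        // Assumes a broker listening on localhost:9092
        try (Admin admin = Admin.create(adminProps("localhost:9092"))) {
            DescribeClusterResult cluster = admin.describeCluster();
            // Each get() blocks until the broker answers, or throws a TimeoutException after 10 seconds
            String clusterId = cluster.clusterId().get(10, TimeUnit.SECONDS);
            int brokerCount = cluster.nodes().get(10, TimeUnit.SECONDS).size();
            System.out.println("Connected to cluster " + clusterId + " with " + brokerCount + " broker(s)");
        }
    }
}
```

If no broker is running, the program fails with a java.util.concurrent.TimeoutException after about ten seconds instead of printing the cluster ID.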
Basic Kafka Streams Application
Create a new Java file named StreamsApplication.java in src/main/java/com/example. This is where you will write your streams application logic.
```java
package com.example;

import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;

public class StreamsApplication {
    public static void main(String[] args) {
        // Application ID, broker address, and String serdes as the default for keys and values
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-streams-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        // Topology: read every record from source-topic and write it unchanged to destination-topic
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> source = builder.stream("source-topic");
        source.to("destination-topic");

        final Topology topology = builder.build();
        final KafkaStreams streams = new KafkaStreams(topology, props);

        // Close the streams client cleanly when the JVM shuts down (for example on Ctrl+C)
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));

        streams.start();
    }
}
```
This code is a simple application that reads messages from source-topic and writes them to destination-topic without any transformation. Replace YOUR_KAFKA_VERSION in pom.xml with the version of Kafka you are using.
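Kafka Streams expects its input topic to exist before the application starts, and depending on broker settings the output topics may not be created automatically. Below is a minimal sketch that creates the topics used in this article (including filtered-topic, used in the filtering step) with the Admin client. The single partition and replication factor of 1 are assumptions suited to a one-broker development setup, and the class name CreateTopics is hypothetical.

```java
package com.example;

import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.errors.TopicExistsException;

public class CreateTopics {

    // Assumed sizing for a single local broker: 1 partition, replication factor 1
    public static List<NewTopic> topics() {
        return Arrays.asList(
                new NewTopic("source-topic", 1, (short) 1),
                new NewTopic("destination-topic", 1, (short) 1),
                new NewTopic("filtered-topic", 1, (short) 1));
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (Admin admin = Admin.create(props)) {
            // Wait on each topic's future separately so an existing topic is not treated as a failure
            admin.createTopics(topics()).values().forEach((name, future) -> {
                try {
                    future.get();
                    System.out.println("Created " + name);
                } catch (ExecutionException e) {
                    if (e.getCause() instanceof TopicExistsException) {
                        System.out.println(name + " already exists");
                    } else {
                        throw new RuntimeException(e.getCause());
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
    }
}
```

Run it once before starting StreamsApplication. Rerunning it is harmless, because topics that already exist are reported rather than treated as errors.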
Filtering Stream Data
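Now, let’s add a filter operation to our stream so that only messages meeting certain criteria pass through. For example, to keep only messages whose values have an even length, add the following lines to main() after source is defined and before builder.build() is called: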
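```java
// Keep even-length values; null values (e.g. tombstones) are dropped instead of causing a NullPointerException
KStream<String, String> filteredStream = source.filter((key, value) -> value != null && value.length() % 2 == 0);
filteredStream.to("filtered-topic");
```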
The filtered stream drops every message whose value has an odd length, and the messages that pass the filter are written to filtered-topic.
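To check the filter without a running broker, Kafka provides TopologyTestDriver in the kafka-streams-test-utils artifact, which has to be added as an extra dependency (groupId org.apache.kafka, same version as kafka-streams). The sketch below rebuilds the source, filter, and filtered-topic topology in a hypothetical FilterTopologyCheck class, pipes a few values through it, and returns the values that reach filtered-topic; the helper names buildTopology and run are also hypothetical. If you give the dependency test scope, place the class under src/test/java.

```java
package com.example;

import java.util.Arrays;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.KStream;

public class FilterTopologyCheck {

    // Same filtering logic as above: keep non-null, even-length values
    public static Topology buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> source = builder.stream("source-topic");
        source.filter((key, value) -> value != null && value.length() % 2 == 0)
              .to("filtered-topic");
        return builder.build();
    }

    // Pipes the values through the topology in-process and returns what reaches filtered-topic
    public static List<String> run(List<String> values) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "filter-check");
        // TopologyTestDriver never contacts this address
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        try (TopologyTestDriver driver = new TopologyTestDriver(buildTopology(), props)) {
            TestInputTopic<String, String> input = driver.createInputTopic(
                    "source-topic", new StringSerializer(), new StringSerializer());
            TestOutputTopic<String, String> output = driver.createOutputTopic(
                    "filtered-topic", new StringDeserializer(), new StringDeserializer());
            for (String value : values) {
                input.pipeInput("key", value);
            }
            return output.readValuesToList();
        }
    }

    public static void main(String[] args) {
        // "ab" and "abcd" have even lengths; "abc" is dropped
        System.out.println(run(Arrays.asList("ab", "abc", "abcd")));
    }
}
```

Running main prints [ab, abcd]. Because the test driver processes records synchronously, this kind of check runs in milliseconds and fits naturally into a JUnit test.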
Conclusion
In this tutorial, we built a basic Kafka Streams application in Java, from setting up the development environment to filtering a stream. We have only scratched the surface: Kafka Streams also supports stateful operations such as aggregations, joins, and windowing, which make it a capable tool for processing streaming data at scale.