Introduction
Kafka Streams is a lightweight library designed for building real-time applications and microservices, where the input and output data are stored in Kafka clusters. In this tutorial, we’ll explore the essentials of Kafka Streams and demonstrate how to build stream processing applications using various examples.
Setting Up the Environment
First, make sure Apache Kafka is installed and running on your system. If it isn't, see:
- How to download and install Kafka on Ubuntu
- How to install and configure Apache Kafka on Windows
- How to set up Kafka on Mac
- How to use Kafka with Docker and Docker Compose
For Kafka Streams, you will also need to integrate the Kafka Streams library into your Java application.
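Start the Kafka environment. Open a terminal session and navigate to the Kafka directory. Run the following commands, each in its own terminal, to start the ZooKeeper and Kafka servers. These commands apply to ZooKeeper-based installations; Kafka 4.0 and later run only in KRaft mode and no longer use ZooKeeper.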
bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties
Include Kafka Streams in your project by adding it to your build file. For Maven, add:
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>LATEST_VERSION</version>
</dependency>
Replace LATEST_VERSION with the latest Kafka version.
Basic Kafka Streams Application
Create a simple application that reads from one topic, processes the data, and writes to another topic.
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import java.util.Properties;
public class SimpleStreamsApp {
    public static void main(String[] args) {
        Properties props = new Properties();
        // application.id is used as the consumer group id and as a prefix for internal topics
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "simple-streams-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Default serdes for record keys and values
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        // Topology: read source-topic and write every record unchanged to target-topic
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> source = builder.stream("source-topic");
        source.to("target-topic");
        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
        // Shut the client down cleanly when the JVM exits
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}
This simple application copies data from source-topic to target-topic without any modification. The props object holds the configuration for the Kafka Streams application.
Stateful Operations
Kafka Streams supports stateful operations such as count, aggregate, and reduce. Let's add a simple word count example:
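import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;
import java.util.Arrays;
// Reuses the props from SimpleStreamsApp; give this application its own application.id
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> textLines = builder.stream("text-lines-topic");
KTable<String, Long> wordCounts = textLines
    .flatMapValues(textLine -> Arrays.asList(textLine.toLowerCase().split("\\s+"))) // line -> words
    .groupBy((key, word) -> word) // re-key by word (triggers a repartition)
    .count(); // running count per word, kept in a state store
// Counts are Long values, so override the default String value serde on output
wordCounts.toStream().to("word-counts-topic", Produced.with(Serdes.String(), Serdes.Long()));
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();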
In this code, the input is a stream of text lines. We flatten these into words, group by the word, and then count the occurrences of each word.
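To make the semantics of the word count concrete, here is a small plain-Java model that replays the same steps over an in-memory list and records each update the count table would produce. It is a sketch of the logic, not Kafka Streams code: no Kafka is involved, the class name WordCountModel is hypothetical, and a HashMap stands in for the state store.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical plain-Java model of flatMapValues -> groupBy(word) -> count().
// It does not use Kafka; it only mirrors the per-word running counts.
class WordCountModel {

    /** Returns the "word=count" updates, in order, that the count table would produce. */
    static List<String> updates(List<String> textLines) {
        Map<String, Long> counts = new HashMap<>(); // stands in for the state store
        List<String> emitted = new ArrayList<>();
        for (String line : textLines) {
            // Same tokenization as the topology: lower-case, then split on whitespace
            for (String word : line.toLowerCase().split("\\s+")) {
                long updated = counts.merge(word, 1L, Long::sum);
                emitted.add(word + "=" + updated);
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        System.out.println(updates(List.of("Hello Kafka", "hello streams")));
        // [hello=1, kafka=1, hello=2, streams=1]
    }
}
```

The real KTable does not necessarily forward every intermediate update: with record caching enabled (the default), consecutive updates to the same word may be merged before they reach word-counts-topic. Also, split("\\s+") turns a line with leading whitespace into an empty first token, which both this model and the topology count as a word unless you filter it out.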
Join Operations
Joining streams is another powerful feature of Kafka Streams that allows you to correlate data from multiple sources.
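import java.time.Duration;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.StreamJoined;
KStream<String, String> leftSource = builder.stream("left-topic");
KStream<String, String> rightSource = builder.stream("right-topic");
KStream<String, String> joined = leftSource.join(rightSource,
    (leftValue, rightValue) -> "left=" + leftValue + ", right=" + rightValue, // ValueJoiner
    // Timestamps at most 5 minutes apart (Kafka 3.0+; replaces the deprecated JoinWindows.of).
    // No grace period: late out-of-order records are dropped; ofTimeDifferenceAndGrace tolerates lateness.
    JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(5)),
    StreamJoined.with(
        Serdes.String(),  // key serde
        Serdes.String(),  // left value serde
        Serdes.String())  // right value serde
);
joined.to("joined-topic");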
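This example demonstrates an inner join between two streams with a five-minute window: a left and a right record are combined when they have the same key and their timestamps are at most five minutes apart. Both input topics must be co-partitioned, that is, have the same number of partitions and the same key-to-partition mapping.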
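The matching rule behind the windowed join can also be modeled in plain Java. The sketch below (hypothetical class WindowedJoinModel, no Kafka involved) pairs a left and a right record when the keys are equal and the timestamps differ by at most the window size. It assumes all records are in order and ignores grace periods and the order in which Kafka Streams emits results.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

// Hypothetical plain-Java model of which records an inner windowed
// KStream-KStream join pairs up. It is not Kafka Streams code.
class WindowedJoinModel {

    /** A keyed record with an event timestamp in milliseconds. */
    record Event(String key, String value, long timestampMs) {}

    static List<String> innerJoin(List<Event> left, List<Event> right, Duration window) {
        long windowMs = window.toMillis();
        List<String> joined = new ArrayList<>();
        for (Event l : left) {
            for (Event r : right) {
                boolean sameKey = l.key().equals(r.key());
                // Symmetric window: the right record may be before or after the left one
                boolean withinWindow = Math.abs(l.timestampMs() - r.timestampMs()) <= windowMs;
                if (sameKey && withinWindow) {
                    // Same ValueJoiner output format as the example above
                    joined.add("left=" + l.value() + ", right=" + r.value());
                }
            }
        }
        return joined;
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        List<Event> left = List.of(new Event("user-1", "click", 0));
        List<Event> right = List.of(
                new Event("user-1", "view", 4 * minute),  // 4 minutes apart: joins
                new Event("user-1", "buy", 6 * minute),   // 6 minutes apart: no match
                new Event("user-2", "view", minute));     // different key: no match
        System.out.println(innerJoin(left, right, Duration.ofMinutes(5)));
        // [left=click, right=view]
    }
}
```

With a five-minute window, the view four minutes after the click joins, while the purchase six minutes later and the record for a different key do not.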
Advanced Processing
Let's build a more advanced application that uses the Processor API for custom processing logic:
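import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Named;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
// Uses the processor.api Processor; KStream#process returns a KStream since Kafka 3.3
KStream<String, String> input = builder.stream("input-topic");
KStream<String, String> processed = input.process(() -> new Processor<String, String, String, String>() {
    private ProcessorContext<String, String> context;
    @Override
    public void init(ProcessorContext<String, String> context) {
        this.context = context;
    }
    @Override
    public void process(Record<String, String> record) {
        // Implement custom processing logic here; upper-casing is only a placeholder.
        // A processor may forward zero, one, or many records. No per-record commit()
        // is needed: Kafka Streams commits on its own schedule (commit.interval.ms).
        if (record.value() != null) {
            context.forward(record.withValue(record.value().toUpperCase()));
        }
    }
    @Override
    public void close() {
        // Implement any cleanup code here
    }
}, Named.as("processor-node"));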
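This demonstrates a custom processor, which gives you full control over each record: it can transform a record, drop it, or forward several records downstream, and it can also access state stores and schedule punctuations.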
Error Handling
When working with stream processing, handling errors appropriately is crucial. Kafka Streams lets you configure how exceptions are handled, for example when a record cannot be deserialized or cannot be written back to Kafka.
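import org.apache.kafka.streams.errors.DefaultProductionExceptionHandler;
import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;
props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, LogAndContinueExceptionHandler.class);
props.put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG, DefaultProductionExceptionHandler.class);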
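These settings control how deserialization and production exceptions are handled. With LogAndContinueExceptionHandler, records that cannot be deserialized are logged and skipped; the default, LogAndFailExceptionHandler, stops processing instead. DefaultProductionExceptionHandler is already the default for production errors and does not skip records that cannot be written to Kafka, so a non-retriable send error stops the application.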
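The difference between the two deserialization policies can be sketched without Kafka. In this hypothetical model (class and enum names are illustrative only), Integer.parseInt stands in for a real Deserializer: log-and-continue drops a record it cannot parse and keeps going, while log-and-fail stops at the first bad record, much as the default LogAndFailExceptionHandler would.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical plain-Java model of "log and continue" vs "log and fail".
// Integer.parseInt plays the role of a Deserializer; no Kafka is involved.
class DeserializationPolicyModel {

    enum Policy { LOG_AND_CONTINUE, LOG_AND_FAIL }

    static List<Integer> consume(List<String> rawValues, Policy policy) {
        List<Integer> processed = new ArrayList<>();
        for (String raw : rawValues) {
            try {
                processed.add(Integer.parseInt(raw));
            } catch (NumberFormatException e) {
                System.err.println("Could not deserialize record: " + raw);
                if (policy == Policy.LOG_AND_FAIL) {
                    // Log-and-fail: let the error propagate and stop processing
                    throw new IllegalStateException("Fatal deserialization error", e);
                }
                // Log-and-continue: skip the bad ("poison") record and move on
            }
        }
        return processed;
    }

    public static void main(String[] args) {
        List<String> raw = List.of("1", "oops", "3");
        System.out.println(consume(raw, Policy.LOG_AND_CONTINUE)); // [1, 3]
        try {
            consume(raw, Policy.LOG_AND_FAIL);
        } catch (IllegalStateException e) {
            System.out.println("Stopped: " + e.getMessage());
        }
    }
}
```

Skipping keeps the application alive at the cost of silently losing the bad record (apart from the log line), which is why the default is to fail.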
Scaling Your Application
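Scaling is built into Kafka Streams. When you run multiple instances of your application with the same application.id, Kafka Streams splits the work into tasks based on the input partitions and distributes those tasks across the instances, rebalancing automatically as instances join or leave. Parallelism is therefore capped by the number of input partitions: instances beyond that number receive no active tasks.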
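A simplified model shows why adding instances only helps up to the number of input partitions. The hypothetical TaskSpreadModel below assigns one task per partition round-robin across instances. The real Kafka Streams assignor is more sophisticated (it also weighs stickiness and stateful tasks), so treat the exact placement as illustrative only.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model: one task per input partition, spread round-robin
// over the instances that share an application.id. Not the real assignor.
class TaskSpreadModel {

    /** Returns, for each instance, the partitions (tasks) it owns. */
    static List<List<Integer>> assign(int inputPartitions, int instances) {
        if (instances <= 0) {
            throw new IllegalArgumentException("instances must be positive");
        }
        List<List<Integer>> owned = new ArrayList<>();
        for (int i = 0; i < instances; i++) {
            owned.add(new ArrayList<>());
        }
        for (int partition = 0; partition < inputPartitions; partition++) {
            owned.get(partition % instances).add(partition);
        }
        return owned;
    }

    public static void main(String[] args) {
        System.out.println(assign(6, 4)); // [[0, 4], [1, 5], [2], [3]]
        System.out.println(assign(3, 5)); // [[0], [1], [2], [], []]
    }
}
```

With six partitions, four instances share the work unevenly; with three partitions, the fourth and fifth instances receive no tasks at all.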
Testing
Testing your Kafka Streams application is straightforward with TopologyTestDriver.
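import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.*; // TopologyTestDriver, TestInputTopic, TestOutputTopic, KeyValue
import static org.assertj.core.api.Assertions.assertThat;
TopologyTestDriver testDriver = new TopologyTestDriver(builder.build(), props);
TestInputTopic<String, String> inputTopic = testDriver.createInputTopic("input-topic", new StringSerializer(), new StringSerializer());
TestOutputTopic<String, String> outputTopic = testDriver.createOutputTopic("output-topic", new StringDeserializer(), new StringDeserializer());
inputTopic.pipeInput("key", "value");
KeyValue<String, String> outputRecord = outputTopic.readKeyValue();
assertThat(outputRecord.key).isEqualTo("key");
assertThat(outputRecord.value).isEqualTo("modified-value");
testDriver.close();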
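In the test, we use TopologyTestDriver, which ships in the kafka-streams-test-utils artifact, to pipe input records through our topology without a running broker and then inspect the output. The assertions assume a topology that reads from input-topic, writes to output-topic, and turns the value "value" into "modified-value".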
Conclusion
Kafka Streams is a versatile library for building scalable, high-throughput, and fault-tolerant real-time stream processing applications. By following this guide, you’ve learned the basics and are well on your way to creating sophisticated stream processing applications with Kafka Streams.