Getting Started with Kafka Streams: A Practical Guide (with Examples)

Updated: January 31, 2024 By: Guest Contributor

Introduction

Kafka Streams is a lightweight library designed for building real-time applications and microservices, where the input and output data are stored in Kafka clusters. In this tutorial, we’ll explore the essentials of Kafka Streams and demonstrate how to build stream processing applications using various examples.

Setting Up the Environment

First, make sure you have Apache Kafka installed and running on your system.

For Kafka Streams, you will also need to integrate the Kafka Streams library into your Java application.

Start the Kafka environment. Open a terminal session and navigate to the Kafka directory. Run the following commands to start the Zookeeper and Kafka servers:

bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties
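
The examples in this guide read from and write to a few topics (source-topic, target-topic, and so on). Assuming a local single-broker setup, you can create them up front with the topic tool, for example:

bin/kafka-topics.sh --create --topic source-topic --bootstrap-server localhost:9092
bin/kafka-topics.sh --create --topic target-topic --bootstrap-server localhost:9092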

Include Kafka Streams in your project by adding it to your build file. For Maven, add:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>LATEST_VERSION</version>
</dependency>

Replace LATEST_VERSION with the latest Kafka version.

Basic Kafka Streams Application

Create a simple application that reads from one topic, processes the data, and writes to another topic.

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import java.util.Properties;

public class SimpleStreamsApp {
    public static void main(String[] args) {
        // Configuration: every Streams app needs an application id and the broker address;
        // the default serdes define how record keys and values are (de)serialized.
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "simple-streams-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        // Topology: read from source-topic and write each record unchanged to target-topic
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> source = builder.stream("source-topic");
        source.to("target-topic");

        // Start the application and close it cleanly on JVM shutdown
        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();

        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}

This simple application transfers data from source-topic to target-topic without any modification. The props object holds the configuration for the Kafka Streams application.
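
To actually process the records instead of just forwarding them, you can insert DSL operations between the source and the sink. As a minimal sketch, replacing the source.to("target-topic") line above, this upper-cases every value before writing it out:

source.mapValues(value -> value.toUpperCase())
      .to("target-topic");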

Stateful Operations

Kafka Streams supports stateful operations such as count, aggregate, and reduce. Let’s add a simple word count example:

StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> textLines = builder.stream("text-lines-topic");

KTable<String, Long> wordCounts = textLines
        .flatMapValues(textLine -> Arrays.asList(textLine.toLowerCase().split("\\s+")))
        .groupBy((key, word) -> word)
        .count();

wordCounts.toStream().to("word-counts-topic", Produced.with(Serdes.String(), Serdes.Long()));
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();

In this code, the input is a stream of text lines. We flatten these into words, group by the word, and then count the occurrences of each word.
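
count is only one of the stateful operations; reduce and aggregate follow the same pattern on a grouped stream. As a sketch building on the same textLines stream (the output topic name here is illustrative), this reduce concatenates all lines seen for a given key:

KTable<String, String> concatenated = textLines
        .groupByKey()
        .reduce((aggValue, newValue) -> aggValue + " " + newValue);

concatenated.toStream().to("concatenated-lines-topic", Produced.with(Serdes.String(), Serdes.String()));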

Join Operations

Joining streams is another powerful feature of Kafka Streams that allows you to correlate data from multiple sources.

KStream<String, String> leftSource = builder.stream("left-topic");
KStream<String, String> rightSource = builder.stream("right-topic");

KStream<String, String> joined = leftSource.join(rightSource,
    (leftValue, rightValue) -> "left=" + leftValue + ", right=" + rightValue,
    JoinWindows.of(Duration.ofMinutes(5)),
    StreamJoined.with(
        Serdes.String(),
        Serdes.String(),
        Serdes.String())
);
joined.to("joined-topic");

This example demonstrates a simple inner join between two streams with a five-minute window.
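
Besides stream-stream joins, a KStream can also be joined against a KTable, which is handy for enriching events with reference data. A sketch, assuming hypothetical user-profiles and enriched-topic topics:

KTable<String, String> profiles = builder.table("user-profiles");

KStream<String, String> enriched = leftSource.join(profiles,
    (event, profile) -> event + " [profile=" + profile + "]");
enriched.to("enriched-topic");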

Advanced Processing

Let’s build a more advanced application that uses the Processor API for custom processing logic:

builder.stream("input-topic").process(() -> new Processor<String, String>() {
    private ProcessorContext context;

    @Override
    public void init(ProcessorContext context) {
        this.context = context;
    }

    @Override
    public void process(String key, String value) {
        // Implement custom processing logic here
        context.forward(key, new CustomValue(value));
        context.commit();
    }

    @Override
    public void close() {
        // Implement any cleanup code here
    }
}, "processor-node");

This demonstrates the use of a custom processor, which gives you full, record-by-record control over the processing logic.
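
A common reason to drop down to the Processor API is to attach a state store to the processor. A sketch (the store name my-store is illustrative): register the store on the builder, pass its name to process(), and look it up in init():

// Stores, StoreBuilder, and KeyValueStore live in org.apache.kafka.streams.state
StoreBuilder<KeyValueStore<String, String>> storeBuilder =
        Stores.keyValueStoreBuilder(
                Stores.persistentKeyValueStore("my-store"),
                Serdes.String(),
                Serdes.String());
builder.addStateStore(storeBuilder);

// Connect the store by name: .process(supplier, Named.as("processor-node"), "my-store")
// Inside init(): KeyValueStore<String, String> store = context.getStateStore("my-store");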

Error Handling

When working with stream processing, handling errors appropriately is crucial. Kafka Streams provides ways to handle exceptions during processing.

props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, LogAndContinueExceptionHandler.class);
props.put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG, DefaultProductionExceptionHandler.class);

These configurations control how deserialization and production errors are handled: LogAndContinueExceptionHandler logs the problem and skips the offending record, while DefaultProductionExceptionHandler fails the application when a record cannot be written to Kafka.
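
If the built-in handlers do not fit, you can implement your own. Below is a minimal sketch of a custom deserialization handler (the class name is illustrative) that logs the bad record and keeps processing; register it under the same DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG key shown above:

import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
import org.apache.kafka.streams.processor.ProcessorContext;

public class LogAndSkipHandler implements DeserializationExceptionHandler {

    @Override
    public DeserializationHandlerResponse handle(ProcessorContext context,
                                                 ConsumerRecord<byte[], byte[]> record,
                                                 Exception exception) {
        // Log the failure and skip the record instead of stopping the application
        System.err.println("Skipping corrupt record from " + record.topic()
                + " partition " + record.partition() + ": " + exception.getMessage());
        return DeserializationHandlerResponse.CONTINUE;
    }

    @Override
    public void configure(Map<String, ?> configs) {
        // No extra configuration needed for this sketch
    }
}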

Scaling Your Application

Scaling is inherent to Kafka Streams. By running multiple instances of your application with the same application.id, Kafka Streams will distribute the input topic partitions, and therefore the processing work, across the instances.

Testing

Testing your Kafka Streams application is straightforward with the TopologyTestDriver.
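
TopologyTestDriver ships in the separate kafka-streams-test-utils artifact, so add it as a test dependency first (matching the earlier Maven block):

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams-test-utils</artifactId>
    <version>LATEST_VERSION</version>
    <scope>test</scope>
</dependency>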

TopologyTestDriver testDriver = new TopologyTestDriver(builder.build(), props);

// TestInputTopic and TestOutputTopic replace the older ConsumerRecordFactory API
TestInputTopic<String, String> inputTopic =
        testDriver.createInputTopic("input-topic", new StringSerializer(), new StringSerializer());
TestOutputTopic<String, String> outputTopic =
        testDriver.createOutputTopic("output-topic", new StringDeserializer(), new StringDeserializer());

inputTopic.pipeInput("key", "value");
TestRecord<String, String> outputRecord = outputTopic.readRecord();
assertThat(outputRecord.key()).isEqualTo("key");
assertThat(outputRecord.value()).isEqualTo("modified-value");
testDriver.close();

In the test, we use TopologyTestDriver to pipe input records into our topology through a TestInputTopic and then read the results back from a TestOutputTopic.

Conclusion

Kafka Streams is a versatile library for building scalable, high-throughput, and fault-tolerant real-time stream processing applications. By following this guide, you’ve learned the basics and are well on your way to creating sophisticated stream processing applications with Kafka Streams.