How to Implement Windowing in Kafka Streams (with Examples)

Updated: January 30, 2024 By: Guest Contributor

Introduction

Kafka Streams is a client library for building applications and microservices, where the input and output data are stored in Kafka topics. It allows for stateful and stateless processing, and one of its most powerful features is windowing, which enables processing data in time-bound chunks.

In this article, we walk through various windowing concepts in Kafka Streams and how to implement them, providing examples that range from basic to advanced to help illustrate the process.

What is Windowing?

Windowing partitions the input data into windows based on the timestamp of each message. It’s useful for aggregating data over a specified time frame and answering questions like “How many events occurred in the last 15 minutes?”

Types of Windows in Kafka Streams

Kafka Streams offers several types of windows:

  • Tumbling Windows: Non-overlapping, fixed-sized windows.
  • Hopping Windows: Fixed-size windows that overlap and ‘hop’ by a specified interval.
  • Sliding Windows: Fixed-size windows that move with the records themselves; records fall into the same window when their timestamps are within a specified time difference of each other.
  • Session Windows: Dynamic windows that group data by sessions of activity. They are separated by a period of inactivity longer than a specified gap.

Setting Up the Environment

To implement windowing in Kafka Streams, first add the kafka-streams library to your build. With Gradle:

dependencies {
    implementation 'org.apache.kafka:kafka-streams:2.8.0'
}

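You also need a running Kafka cluster with the input and output topics (TextLinesTopic and CountsOutputTopic in the examples below) already created. The examples use the windowing API as it exists in 2.8.0; Kafka Streams 3.0 deprecated methods such as TimeWindows.of and SessionWindows.with in favor of TimeWindows.ofSizeWithNoGrace and SessionWindows.ofInactivityGapWithNoGrace.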

Basic Tumbling Window Example

A tumbling window is the simplest form of windowing. For this example, let’s count how many times each word appears in a stream of text lines, using 1-minute windows.

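import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.state.WindowStore;
public class TumblingWindowExample {
    public static void main(String[] args) {
        // Placeholder settings: change the application id and broker address for your cluster.
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "windowing-example");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> textLines = builder.stream("TextLinesTopic");

        KStream<Windowed<String>, Long> wordCounts = textLines
            .flatMapValues(textLine -> Arrays.asList(textLine.toLowerCase().split("\\W+"))) // line -> words
            .groupBy((key, word) -> word)                      // re-key by word
            .windowedBy(TimeWindows.of(Duration.ofMinutes(1))) // 1-minute tumbling windows
            .count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("counts-store"))
            .toStream();

        // Flatten the Windowed<String> key to "word@windowStartMs" so a plain String serde can write it.
        wordCounts
            .map((windowedKey, count) -> KeyValue.pair(windowedKey.key() + "@" + windowedKey.window().start(), count))
            .to("CountsOutputTopic", Produced.with(Serdes.String(), Serdes.Long()));

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
        streams.start();
    }
}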

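This code writes a count for each word and 1-minute window to CountsOutputTopic. Kafka Streams does not wait for a window to end before emitting: it sends updated counts as records arrive (subject to record caching and the commit interval), so the same word and window can appear several times with increasing counts. If you need exactly one final result per window, look at suppress(Suppressed.untilWindowCloses(...)) combined with an explicit grace(...) period.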
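To see which 1-minute window a record lands in, it helps to know that TimeWindows windows are aligned to the epoch: a record with timestamp t (in milliseconds) belongs to the window starting at t - (t % size), and that window covers [start, start + size), start inclusive and end exclusive. The plain-Java sketch below reproduces only that arithmetic. TumblingWindowSketch and windowStart are hypothetical names, not part of the Kafka Streams API, and the sketch assumes non-negative timestamps.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical helper, not part of Kafka Streams: epoch-aligned tumbling windows.
public class TumblingWindowSketch {

    // Start of the tumbling window that contains timestampMs (assumes timestampMs >= 0).
    static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - (timestampMs % sizeMs);
    }

    public static void main(String[] args) {
        long sizeMs = Duration.ofMinutes(1).toMillis();
        long ts = Instant.parse("2024-01-30T10:15:42Z").toEpochMilli();
        long start = windowStart(ts, sizeMs);
        // The window is [start, start + sizeMs): start inclusive, end exclusive.
        System.out.println(Instant.ofEpochMilli(start) + " to " + Instant.ofEpochMilli(start + sizeMs));
    }
}
```

For the 10:15:42 UTC timestamp, the printed window runs from 10:15:00 to 10:16:00 UTC, so every record in that minute updates the same count.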

Hopping Window Example

A hopping window counts words just as the tumbling window does, but the windows overlap. Let’s count words over 5-minute windows that advance by 1 minute.

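// Reuses textLines and the imports from the tumbling example.
KStream<Windowed<String>, Long> wordCounts = textLines
    .flatMapValues(textLine -> Arrays.asList(textLine.toLowerCase().split("\\W+")))
    .groupBy((key, word) -> word)
    // 5-minute windows; a new window starts every minute.
    .windowedBy(TimeWindows.of(Duration.ofMinutes(5)).advanceBy(Duration.ofMinutes(1)))
    .count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("hopping-counts-store"))
    .toStream();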

With the advanceBy method, you specify the hop (the window size is still specified in the of method).
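With a 5-minute size and a 1-minute advance, each record belongs to five overlapping windows. The sketch below enumerates the hopping windows that contain a given timestamp, following what we understand to be the epoch-aligned arithmetic TimeWindows uses; treat it as an illustration, not the library’s code. HoppingWindowSketch and windowStartsFor are hypothetical names, and the sketch assumes non-negative timestamps and an advance no larger than the window size.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Kafka Streams: epoch-aligned hopping windows.
public class HoppingWindowSketch {

    // Returns the start of every window [start, start + sizeMs) that contains timestampMs.
    // Assumes timestampMs >= 0 and 0 < advanceMs <= sizeMs.
    static List<Long> windowStartsFor(long timestampMs, long sizeMs, long advanceMs) {
        List<Long> starts = new ArrayList<>();
        // Earliest advance-aligned start whose window still reaches timestampMs.
        long start = (Math.max(0, timestampMs - sizeMs + advanceMs) / advanceMs) * advanceMs;
        while (start <= timestampMs) {
            starts.add(start);
            start += advanceMs;
        }
        return starts;
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        // A record at 7m30s with 5-minute windows that advance by 1 minute.
        for (long s : windowStartsFor(7 * minute + 30_000L, 5 * minute, minute)) {
            System.out.println("[" + s / minute + "m, " + (s + 5 * minute) / minute + "m)");
        }
    }
}
```

For the record at 7m30s, this yields the five windows starting at 3, 4, 5, 6, and 7 minutes; the window starting at 2 minutes ends at 7 minutes (exclusive) and is skipped.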

Sliding Window Example

A sliding window is useful for use cases like detecting patterns within the last N minutes for every event. Here is an example of a 2-minute sliding window that counts words:

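// Reuses textLines and the imports from the tumbling example, plus
// org.apache.kafka.streams.kstream.SlidingWindows (added in Kafka Streams 2.7).
KStream<String, Long> wordCounts = textLines
    .flatMapValues(textLine -> Arrays.asList(textLine.toLowerCase().split("\\W+")))
    .groupBy((key, word) -> word)
    // Two records share a window if their timestamps differ by at most 2 minutes; no grace period.
    .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(Duration.ofMinutes(2), Duration.ZERO))
    .count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("sliding-counts-store"))
    .toStream((windowedKey, value) -> windowedKey.key()); // keep the word, drop the window bounds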

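Note that a TimeWindows definition without advanceBy is a tumbling window, not a sliding window. Since version 2.7, Kafka Streams provides a dedicated SlidingWindows class, whose windows are defined by a maximum time difference between record timestamps rather than by fixed, epoch-aligned boundaries.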
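The sketch below is a simplified, plain-Java illustration of what a 2-minute sliding count means for a single word: for each event at time t, it counts the events with timestamps in [t - 2 minutes, t], both bounds inclusive. It is not how SlidingWindows is implemented (the library also tracks windows that start just after each record and updates aggregates incrementally in a window store). SlidingWindowSketch and countsEndingAtEachEvent are hypothetical names, and the sketch assumes the timestamps are already sorted.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified illustration of sliding-window counting for one key.
public class SlidingWindowSketch {

    // For each timestamp (sorted ascending), counts the events in
    // [t - timeDifferenceMs, t], with both bounds inclusive.
    static List<Integer> countsEndingAtEachEvent(long[] sortedTimestampsMs, long timeDifferenceMs) {
        List<Integer> counts = new ArrayList<>();
        int windowStartIdx = 0;
        for (int i = 0; i < sortedTimestampsMs.length; i++) {
            long t = sortedTimestampsMs[i];
            // Advance past events older than the window's lower bound.
            while (sortedTimestampsMs[windowStartIdx] < t - timeDifferenceMs) {
                windowStartIdx++;
            }
            counts.add(i - windowStartIdx + 1);
        }
        return counts;
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        long[] timestamps = {0L, minute, 2 * minute, 2 * minute + 1, 5 * minute};
        System.out.println(countsEndingAtEachEvent(timestamps, 2 * minute));
    }
}
```

The two-pointer scan keeps the work linear in the number of events, because the window’s lower bound only moves forward as t increases.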

Session Window Example

Finally, let’s look at session windows. They group records into sessions of activity, which is useful for analyzing user behavior. Below is a code snippet that splits each word’s occurrences into sessions; a new session starts when more than 10 minutes pass without that word appearing:

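// Reuses textLines and the imports from the tumbling example, plus
// org.apache.kafka.streams.kstream.SessionWindows.
KStream<String, Long> wordCounts = textLines
    .flatMapValues(textLine -> Arrays.asList(textLine.toLowerCase().split("\\W+")))
    .groupBy((key, word) -> word)
    // A word's session closes after more than 10 minutes without that word.
    .windowedBy(SessionWindows.with(Duration.ofMinutes(10)))
    .count()
    .toStream((windowedKey, value) -> windowedKey.key() + "@" + windowedKey.window().start() + "-" + windowedKey.window().end());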

Session windows can help in understanding how events are distributed across periods of activity and silence.
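To make the inactivity-gap rule concrete, the sketch below groups one word’s sorted timestamps into sessions in plain Java: an event joins the current session if it arrives no more than the gap after that session’s last event; otherwise it starts a new session. Kafka Streams does this incrementally in a session store and can also merge existing sessions when an out-of-order record bridges a gap, which this sketch ignores. SessionWindowSketch, Session, and sessionize are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of session windowing for a single key.
public class SessionWindowSketch {

    // A session spans its first and last event timestamps, both inclusive.
    static final class Session {
        final long start;
        final long end;
        final int count;

        Session(long start, long end, int count) {
            this.start = start;
            this.end = end;
            this.count = count;
        }

        @Override
        public String toString() {
            return "[" + start + ", " + end + "] count=" + count;
        }
    }

    // Groups ascending timestamps into sessions separated by gaps longer than inactivityGapMs.
    static List<Session> sessionize(long[] sortedTimestampsMs, long inactivityGapMs) {
        List<Session> sessions = new ArrayList<>();
        long start = 0;
        long end = 0;
        int count = 0;
        for (long t : sortedTimestampsMs) {
            // Close the current session if this event comes after too long a silence.
            if (count > 0 && t - end > inactivityGapMs) {
                sessions.add(new Session(start, end, count));
                count = 0;
            }
            if (count == 0) {
                start = t;
            }
            end = t;
            count++;
        }
        if (count > 0) {
            sessions.add(new Session(start, end, count));
        }
        return sessions;
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        long[] timestamps = {0L, 3 * minute, 12 * minute, 30 * minute, 35 * minute};
        for (Session s : sessionize(timestamps, 10 * minute)) {
            System.out.println(s);
        }
    }
}
```

With a 10-minute gap, the events at 0, 3, and 12 minutes form one session, and the 18-minute silence before the event at 30 minutes starts a second session that also includes the event at 35 minutes.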

Conclusion

We’ve covered the four types of windows in Kafka Streams: tumbling, hopping, sliding, and session windows. Each serves a different purpose. Tumbling and hopping windows suit fixed reporting intervals, sliding windows suit “last N minutes” questions evaluated per event, and session windows suit activity that arrives in bursts. The provided examples should give you a starting point to explore windowing in your Kafka applications.