Understanding Stateful and Stateless Processing in Kafka Streams (with Examples)

Updated: January 31, 2024 By: Guest Contributor

Introduction

Apache Kafka has become a widely used platform for streaming data, and Kafka Streams is its client library for building stream-processing applications on top of it. Understanding the difference between stateful and stateless processing is fundamental when working with Kafka Streams. This tutorial breaks down the difference between the two, provides code examples for each, and helps you decide when to use which.

Understanding Stateless Processing

In stateless processing, each record is handled independently of all other records. Neither information from previous records nor any maintained state affects how a new record is processed. This makes stateless operations easier to implement and reason about.

Examples of Stateless Operations in Kafka Streams

  • Map
  • Filter
  • ForEach

Example 1: Filter Operation – A filter keeps only the records that satisfy a condition, such as records whose value is greater than a threshold.

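import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;

StreamsBuilder builder = new StreamsBuilder();
// Declare the key/value serdes explicitly instead of relying on the configured defaults
KStream<String, Integer> initialStream =
    builder.stream("numbers-topic", Consumed.with(Serdes.String(), Serdes.Integer()));
// Keep only values greater than 10; the null check avoids an unboxing NullPointerException
KStream<String, Integer> filteredStream = initialStream.filter((key, value) -> value != null && value > 10);
filteredStream.to("filtered-numbers-topic", Produced.with(Serdes.String(), Serdes.Integer()));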

Here, we apply a filter to a KStream with String keys and Integer values, forwarding only the records whose value is greater than 10 to filtered-numbers-topic.
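The list above also names map and foreach, which Example 1 does not show. The sketch below chains mapValues (the value-only form of map) and foreach and runs the topology in TopologyTestDriver, so no broker is needed. It assumes Kafka Streams 3.x plus the kafka-streams-test-utils artifact on the classpath; the class name StatelessDemo, the topic doubled-numbers-topic, and the application id are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;

public class StatelessDemo {

    // Records what foreach saw; only safe here because the test driver is single-threaded
    static final List<String> FOREACH_LOG = new ArrayList<>();

    static List<KeyValue<String, Integer>> run() {
        FOREACH_LOG.clear();
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, Integer> numbers =
            builder.stream("numbers-topic", Consumed.with(Serdes.String(), Serdes.Integer()));

        // mapValues: each value is transformed on its own, the key is untouched
        KStream<String, Integer> doubled = numbers.mapValues(value -> value * 2);

        // foreach: a terminal, side-effecting operation that produces no downstream stream
        doubled.foreach((key, value) -> FOREACH_LOG.add(key + "=" + value));
        // Hypothetical output topic for this sketch
        doubled.to("doubled-numbers-topic", Produced.with(Serdes.String(), Serdes.Integer()));

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "stateless-demo");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:9092"); // never contacted by the driver

        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            TestInputTopic<String, Integer> in = driver.createInputTopic(
                "numbers-topic", Serdes.String().serializer(), Serdes.Integer().serializer());
            TestOutputTopic<String, Integer> out = driver.createOutputTopic(
                "doubled-numbers-topic", Serdes.String().deserializer(), Serdes.Integer().deserializer());

            in.pipeInput("a", 1);
            in.pipeInput("b", 5);
            in.pipeInput("a", 7);
            return out.readKeyValuesToList();
        }
    }

    public static void main(String[] args) {
        System.out.println(run());       // [KeyValue(a, 2), KeyValue(b, 10), KeyValue(a, 14)]
        System.out.println(FOREACH_LOG); // [a=2, b=10, a=14]
    }
}
```

Each output depends only on its own input record: the second "a" record yields 14 regardless of the first. mapValues is usually preferred over map when only the value changes, because map may alter the key and Kafka Streams then marks the stream for repartitioning before any later stateful operation.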

Understanding Stateful Processing

In stateful processing, the result for a record can depend on state built up from previous records (or from other streams and tables), which Kafka Streams keeps in local state stores. Examples of stateful operations include aggregations, joins, and windowing.

Examples of Stateful Operations in Kafka Streams

  • Aggregations
  • Join Operations
  • Windowing
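Join operations are listed above without an example. Below is a minimal sketch of an inner stream-table join, where each order looks up the latest customer name stored for its key. It assumes Kafka Streams 3.x with kafka-streams-test-utils on the classpath; the class name JoinDemo, the topics customers-topic, orders-topic, and enriched-orders-topic, and the sample data are all hypothetical.

```java
import java.util.List;
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;

public class JoinDemo {

    static List<KeyValue<String, String>> run() {
        StreamsBuilder builder = new StreamsBuilder();
        // Hypothetical topics. Table: latest customer name per customer id, kept in a state store
        KTable<String, String> customers =
            builder.table("customers-topic", Consumed.with(Serdes.String(), Serdes.String()));
        // Stream: order amounts keyed by the same customer id
        KStream<String, Integer> orders =
            builder.stream("orders-topic", Consumed.with(Serdes.String(), Serdes.Integer()));

        // Inner join: an order is emitted only if its key already has a row in the table
        orders.join(customers, (amount, name) -> name + " spent " + amount)
              .to("enriched-orders-topic", Produced.with(Serdes.String(), Serdes.String()));

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "join-demo");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:9092");

        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            TestInputTopic<String, String> customerIn = driver.createInputTopic(
                "customers-topic", Serdes.String().serializer(), Serdes.String().serializer());
            TestInputTopic<String, Integer> orderIn = driver.createInputTopic(
                "orders-topic", Serdes.String().serializer(), Serdes.Integer().serializer());
            TestOutputTopic<String, String> out = driver.createOutputTopic(
                "enriched-orders-topic", Serdes.String().deserializer(), Serdes.String().deserializer());

            customerIn.pipeInput("c1", "Alice");
            orderIn.pipeInput("c1", 30); // matches the stored customer "c1"
            orderIn.pipeInput("c2", 15); // no customer "c2" in the table, so it is dropped
            return out.readKeyValuesToList();
        }
    }

    public static void main(String[] args) {
        System.out.println(run()); // [KeyValue(c1, Alice spent 30)]
    }
}
```

The join is stateful because the outcome for an order depends on an earlier record from a different topic, remembered in the table's state store.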

Example 2: Aggregation Operation – Here’s how you would perform an aggregation that counts how many records have been seen for each key.

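import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;

StreamsBuilder builder = new StreamsBuilder();
KStream<String, Integer> sourceStream =
    builder.stream("numbers-topic", Consumed.with(Serdes.String(), Serdes.Integer()));
// Group records by their existing key (no repartitioning is needed)
KGroupedStream<String, Integer> groupedStream =
    sourceStream.groupByKey(Grouped.with(Serdes.String(), Serdes.Integer()));
// count() keeps a running total per key in a local state store
KTable<String, Long> aggregatedTable = groupedStream.count();

// Publish every update of the table; the counts are Longs, so use a Long serde
aggregatedTable.toStream().to("aggregated-numbers-topic", Produced.with(Serdes.String(), Serdes.Long()));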

This code counts the records seen for each key. The result is a KTable, which holds the latest count per key; calling toStream() turns the table's updates into a changelog stream that is written to aggregated-numbers-topic.
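Because count() keeps state, its output for a key depends on every earlier record with that key. The sketch below runs a count topology in TopologyTestDriver and reads the result both from the output topic and directly from the state store. It assumes Kafka Streams 3.x with kafka-streams-test-utils on the classpath; the class name CountDemo, the store name counts-store, and the application id are hypothetical.

```java
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.state.KeyValueStore;

public class CountDemo {

    // What the topology published, plus one value read straight from the state store
    static final class Result {
        final Map<String, Long> published;
        final Long storedCountForA;

        Result(Map<String, Long> published, Long storedCountForA) {
            this.published = published;
            this.storedCountForA = storedCountForA;
        }
    }

    static Result run() {
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("numbers-topic", Consumed.with(Serdes.String(), Serdes.Integer()))
               .groupByKey(Grouped.with(Serdes.String(), Serdes.Integer()))
               // Hypothetical store name; naming it lets the driver look it up below
               .count(Materialized.as("counts-store"))
               .toStream()
               .to("aggregated-numbers-topic", Produced.with(Serdes.String(), Serdes.Long()));

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "count-demo");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:9092");

        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            TestInputTopic<String, Integer> in = driver.createInputTopic(
                "numbers-topic", Serdes.String().serializer(), Serdes.Integer().serializer());
            TestOutputTopic<String, Long> out = driver.createOutputTopic(
                "aggregated-numbers-topic", Serdes.String().deserializer(), Serdes.Long().deserializer());

            in.pipeInput("a", 1);
            in.pipeInput("b", 2);
            in.pipeInput("a", 3);

            // The store remembers earlier records: "a" has been seen twice
            KeyValueStore<String, Long> store = driver.getKeyValueStore("counts-store");
            Long storedA = store.get("a");
            // Keeping only the last update per key gives the table's current contents
            return new Result(out.readKeyValuesToMap(), storedA);
        }
    }

    public static void main(String[] args) {
        Result r = run();
        System.out.println(r.published);       // {a=2, b=1}
        System.out.println(r.storedCountForA); // 2
    }
}
```

readKeyValuesToMap keeps only the last update per key; readKeyValuesToList would also show the intermediate counts, subject to record caching.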

Merging Stateful and Stateless Operations

Often, in real-world applications, stateful and stateless processing are combined to fulfill complex business requirements.

Example 3: Combining Filter and Aggregate – Imagine you want to count records per key, but only those whose value passes a filter.

....
KStream<String, Integer> filteredStream = initialStream.filter((key, value) -> value > 10);
KGroupedStream<String, Integer> groupedFilteredStream = filteredStream.groupByKey();
KTable<String, Long> countTable = groupedFilteredStream.count();
....

In this pattern, records are first filtered (stateless) and then counted (stateful).
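A complete, runnable version of this filter-then-count pattern might look like the sketch below. It assumes Kafka Streams 3.x with kafka-streams-test-utils on the classpath; the class name FilteredCountDemo, the output topic filtered-counts-topic, and the sample records are hypothetical.

```java
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.Produced;

public class FilteredCountDemo {

    static Map<String, Long> run() {
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("numbers-topic", Consumed.with(Serdes.String(), Serdes.Integer()))
               // Stateless step: drop small values before they reach the aggregation
               .filter((key, value) -> value != null && value > 10)
               .groupByKey(Grouped.with(Serdes.String(), Serdes.Integer()))
               // Stateful step: running count per key of the records that passed the filter
               .count()
               .toStream()
               // Hypothetical output topic for this sketch
               .to("filtered-counts-topic", Produced.with(Serdes.String(), Serdes.Long()));

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "filtered-count-demo");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:9092");

        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            TestInputTopic<String, Integer> in = driver.createInputTopic(
                "numbers-topic", Serdes.String().serializer(), Serdes.Integer().serializer());
            TestOutputTopic<String, Long> out = driver.createOutputTopic(
                "filtered-counts-topic", Serdes.String().deserializer(), Serdes.Long().deserializer());

            in.pipeInput("a", 5);  // filtered out
            in.pipeInput("a", 20);
            in.pipeInput("b", 11);
            in.pipeInput("a", 30);
            in.pipeInput("c", 1);  // filtered out, so "c" never gets a count
            return out.readKeyValuesToMap();
        }
    }

    public static void main(String[] args) {
        System.out.println(run()); // {a=2, b=1}
    }
}
```

Filtering before grouping also keeps the state store smaller, since keys whose records never pass the filter are never stored.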

Advanced Concepts

Stateful processing can also include more advanced concepts such as windowing, where data is aggregated over a specific time window.

Example 4: Windowing Operations – Here’s an example that counts records per key within 5-minute tumbling windows.

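import java.time.Duration;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.TimeWindows;

// builder is the StreamsBuilder created in the earlier examples
KStream<String, Integer> source =
    builder.stream("numbers-topic", Consumed.with(Serdes.String(), Serdes.Integer()));

source.groupByKey(Grouped.with(Serdes.String(), Serdes.Integer()))
    // 5-minute tumbling windows; ofSizeWithNoGrace replaces the deprecated TimeWindows.of (Kafka 3.0+)
    .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))
    .count(Materialized.as("windowed-count-store"))
    // Flatten the Windowed<String> key into a plain String such as "key@windowStartMs"
    .toStream((windowedKey, count) -> windowedKey.key() + "@" + windowedKey.window().start())
    .to("windowed-counts-topic", Produced.with(Serdes.String(), Serdes.Long()));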

In the above example, each record updates the count for its key in the 5-minute window its timestamp falls into, and each updated count is published to windowed-counts-topic.
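To see how records are assigned to windows, the sketch below pipes records with explicit timestamps through a windowed count in TopologyTestDriver. It assumes Kafka Streams 3.0+ (for TimeWindows.ofSizeWithNoGrace) with kafka-streams-test-utils on the classpath; the class name WindowedCountDemo, the key "sensor", and the timestamps are hypothetical.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.TimeWindows;

public class WindowedCountDemo {

    static Map<String, Long> run() {
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("numbers-topic", Consumed.with(Serdes.String(), Serdes.Integer()))
               .groupByKey(Grouped.with(Serdes.String(), Serdes.Integer()))
               // 5-minute tumbling windows, aligned to the epoch
               .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))
               .count()
               // Encode the window start (epoch ms) in the output key, e.g. "sensor@300000"
               .toStream((windowedKey, count) -> windowedKey.key() + "@" + windowedKey.window().start())
               .to("windowed-counts-topic", Produced.with(Serdes.String(), Serdes.Long()));

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "windowed-count-demo");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:9092");

        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            TestInputTopic<String, Integer> in = driver.createInputTopic(
                "numbers-topic", Serdes.String().serializer(), Serdes.Integer().serializer());
            TestOutputTopic<String, Long> out = driver.createOutputTopic(
                "windowed-counts-topic", Serdes.String().deserializer(), Serdes.Long().deserializer());

            // Hypothetical event times: the first two share window [0, 300000)
            in.pipeInput("sensor", 1, Instant.ofEpochMilli(0));
            in.pipeInput("sensor", 2, Instant.ofEpochMilli(60_000));
            // 300000 ms is the start of the next window, [300000, 600000)
            in.pipeInput("sensor", 3, Instant.ofEpochMilli(300_000));
            return out.readKeyValuesToMap();
        }
    }

    public static void main(String[] args) {
        Map<String, Long> counts = run();
        System.out.println(counts.get("sensor@0"));      // 2
        System.out.println(counts.get("sensor@300000")); // 1
    }
}
```

The same key gets an independent count per window, which is why the window start has to be part of the output key.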

State Store Considerations

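Stateful operations are backed by customizable state stores, which can be persistent (RocksDB, the default for the DSL) or in-memory. In both cases, fault tolerance comes from a changelog topic in Kafka that is used to rebuild the store after a failure. The choice mainly trades memory use against restore time: a persistent store keeps its data on local disk across restarts, while an in-memory store must be rebuilt from the changelog.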
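The store type is chosen through a store supplier passed to Materialized. The sketch below builds the same count over either Stores.inMemoryKeyValueStore or Stores.persistentKeyValueStore and shows that the results are identical. It assumes Kafka Streams 3.x with kafka-streams-test-utils on the classpath; the class name StoreTypeDemo, the store name counts-store, and the application ids are hypothetical.

```java
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;

public class StoreTypeDemo {

    // Runs the same count over an in-memory or a persistent (RocksDB) store, returns the count for key
    static Long countFor(String key, boolean inMemory) {
        // Hypothetical store name, used only for this sketch
        KeyValueBytesStoreSupplier supplier = inMemory
            ? Stores.inMemoryKeyValueStore("counts-store")
            : Stores.persistentKeyValueStore("counts-store");

        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("numbers-topic", Consumed.with(Serdes.String(), Serdes.Integer()))
               .groupByKey(Grouped.with(Serdes.String(), Serdes.Integer()))
               // Changelog logging is enabled by default for either store type
               .count(Materialized.<String, Long>as(supplier)
                                  .withKeySerde(Serdes.String())
                                  .withValueSerde(Serdes.Long()));

        Properties props = new Properties();
        // Distinct application ids keep the two variants' state directories apart
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, inMemory ? "store-demo-mem" : "store-demo-disk");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:9092");

        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            TestInputTopic<String, Integer> in = driver.createInputTopic(
                "numbers-topic", Serdes.String().serializer(), Serdes.Integer().serializer());
            in.pipeInput("a", 1);
            in.pipeInput("a", 2);
            in.pipeInput("b", 3);
            KeyValueStore<String, Long> store = driver.getKeyValueStore("counts-store");
            return store.get(key);
        }
    }

    public static void main(String[] args) {
        System.out.println(countFor("a", true));  // 2
        System.out.println(countFor("a", false)); // 2
    }
}
```

Only the storage backend differs; the topology's results do not. Materialized.withLoggingDisabled() would turn off the changelog topic for either store type, trading fault tolerance for less write traffic.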

Conclusion

In this tutorial, you’ve learned the difference between stateful and stateless processing in Kafka Streams and seen how each is implemented through examples. Stateless operations need no state stores, while stateful operations rely on state stores and changelog topics that must be sized and restored; recognizing which one a task needs will help make your stream processing applications more scalable and robust. Mastering these paradigms will give you a strong foundation for tackling complex streaming problems.