Kafka: How to set retention time for messages in a topic

Updated: January 30, 2024 By: Guest Contributor

Introduction

Apache Kafka is a distributed streaming platform designed for building real-time data pipelines and streaming applications. One of its core features is the ability to store records (messages) in topics for a specified period. By controlling the retention period of messages, users can manage storage costs and ensure that their Kafka clusters are not overwhelmed by stale data. This tutorial will guide you through setting up message retention time in Kafka from basic to advanced configurations.

Understanding Kafka Retention Policies

In Kafka, messages are retained in a topic either for a configured time or until each partition's log reaches a configured size. Two key properties control retention:

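  • retention.ms – The retention time in milliseconds (7 days by default). Messages older than this become eligible for deletion. Kafka deletes whole log segments, so a message can outlive this value until its segment is rolled and expires.
  • retention.bytes – The maximum size a partition's log can grow to before Kafka discards its oldest log segments. The limit applies per partition, not to the topic as a whole, and the default of -1 means no size limit.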
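
Both values are plain integers, so it can help to compute them rather than type them by hand. The following is a minimal sketch using shell arithmetic; the helper names hours_to_ms, days_to_ms and mib_to_bytes are made up for this illustration and are not part of Kafka.

```shell
# Hypothetical helpers (not part of Kafka) for computing retention values.
hours_to_ms()  { echo $(( $1 * 60 * 60 * 1000 )); }
days_to_ms()   { echo $(( $1 * 24 * 60 * 60 * 1000 )); }
mib_to_bytes() { echo $(( $1 * 1024 * 1024 )); }

hours_to_ms 24     # prints 86400000  (retention.ms for 24 hours)
days_to_ms 7       # prints 604800000 (retention.ms for 7 days)
mib_to_bytes 500   # prints 524288000 (retention.bytes for 500 MiB)
```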

Retention can be configured at the broker level, which sets the default for all topics, or per topic, where a topic-level setting overrides the broker default.

Setting Retention Time

Prerequisites: Ensure that you have a Kafka environment set up and that you can produce and consume messages to and from a Kafka topic.

Configuring Broker-Level Retention

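To set a global, broker-level retention time for all topics, modify the Kafka configuration file (server.properties), which is usually located in the Kafka config directory. The example below uses log.retention.hours; if log.retention.minutes or log.retention.ms is also set, the finer-grained property takes precedence.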

echo "log.retention.hours=48" >> /path/to/kafka/config/server.properties
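
The server.properties file shipped with Kafka usually already contains a log.retention.hours line, so appending with echo can leave two entries for the same key. As an alternative, here is a minimal sketch of a helper that replaces an existing entry instead of duplicating it. The function name set_property is hypothetical, and it assumes entries are written as key=value with no spaces around the equals sign.

```shell
# Hypothetical helper (not part of Kafka): set KEY=VALUE in a properties file,
# replacing any existing uncommented "KEY=" entry instead of appending a duplicate.
set_property() {
  file=$1; key=$2; value=$3
  tmp=$(mktemp) || return 1
  # Copy every line that does not start with "KEY=" (literal match, not a regex).
  awk -v k="$key=" 'index($0, k) != 1' "$file" > "$tmp"
  # Append the new entry.
  printf '%s=%s\n' "$key" "$value" >> "$tmp"
  # Write back in place so the file keeps its owner and permissions.
  cat "$tmp" > "$file"
  rm -f "$tmp"
}

# Usage (the path is a placeholder, as in the command above):
# set_property /path/to/kafka/config/server.properties log.retention.hours 48
```

Commented-out lines such as #log.retention.hours=... are left untouched, since they do not start with the key itself.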

Restart the Kafka broker for the changes to take effect:

bin/kafka-server-stop.sh
bin/kafka-server-start.sh /path/to/kafka/config/server.properties

Configuring Topic-Level Retention

To configure retention time for an individual topic, use the kafka-configs.sh utility that comes with Kafka.

# Set the retention period for 'my-topic' to 24 hours
bin/kafka-configs.sh --zookeeper <Zookeeper-Connect-String> --alter --entity-type topics --entity-name my-topic --add-config retention.ms=86400000

The --zookeeper option applies to older Kafka versions. If your version has deprecated or no longer supports changing topic configs through ZooKeeper, use the --bootstrap-server option with the endpoint of your Kafka cluster instead:

# Set the retention period for 'my-topic' to 24 hours using bootstrap server
bin/kafka-configs.sh --bootstrap-server <Broker-Connect-String> --alter --entity-type topics --entity-name my-topic --add-config retention.ms=86400000
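
To undo a topic-level override and fall back to the broker default, kafka-configs.sh also accepts --delete-config. The sketch below wraps that call in a small function. The function name reset_topic_retention, the KAFKA_CONFIGS and BOOTSTRAP variables, and the localhost:9092 address are all assumptions made for this example; adjust them to your environment.

```shell
# Placeholders for your environment (assumptions, not values from this tutorial).
KAFKA_CONFIGS=${KAFKA_CONFIGS:-bin/kafka-configs.sh}
BOOTSTRAP=${BOOTSTRAP:-localhost:9092}

# Hypothetical wrapper: drop the topic-level retention.ms override for topic $1,
# so the topic uses the broker-level default again.
reset_topic_retention() {
  "$KAFKA_CONFIGS" --bootstrap-server "$BOOTSTRAP" --alter \
    --entity-type topics --entity-name "$1" \
    --delete-config retention.ms
}

# Usage (needs a running cluster):
# reset_topic_retention my-topic
```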

Advanced Configurations

Setting a Size-Based Retention Policy

Aside from time-based retention, Kafka also allows you to set a size-based retention policy. Here’s how you can set a limit on the size of the stored logs for a topic:

# Set a retention size of 500MB for 'my-topic'
bin/kafka-configs.sh --bootstrap-server <Broker-Connect-String> --alter --entity-type topics --entity-name my-topic --add-config retention.bytes=524288000

Combining Time and Size Retention Policies

In some scenarios, you might want to configure both time-based and size-based retention, such as retaining messages for a maximum of 7 days or until a partition's log reaches 1GB, whichever comes first.

# Set a retention time of 7 days and a retention size of 1GB for 'my-topic'
bin/kafka-configs.sh --bootstrap-server <Broker-Connect-String> --alter --entity-type topics --entity-name my-topic \
  --add-config retention.ms=604800000,retention.bytes=1073741824

Log Compaction

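Another retention strategy is log compaction, which retains at least the most recent record for each key in a partition, regardless of the retention period or size threshold. Older records with the same key become eligible for removal. This ensures that the log always holds the latest value for every key that has not been deleted with a tombstone (a record with a null value).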

# Enable log compaction for 'my-topic'
bin/kafka-configs.sh --bootstrap-server <Broker-Connect-String> --alter --entity-type topics --entity-name my-topic \
  --add-config cleanup.policy=compact
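
To make the effect concrete, the following sketch simulates compaction on plain key,value lines. It is only a toy model of the end state, not how Kafka implements it: the real log cleaner runs in the background on closed segments and may leave older duplicates in place for a while. The function name compact and the sample records are invented for this illustration.

```shell
# Toy model of log compaction (not Kafka's implementation):
# keep only the most recent record for each key, preserving log order.
compact() {
  awk -F, '
    { last[$1] = NR; line[NR] = $0 }   # remember the newest line number per key
    END {
      for (i = 1; i <= NR; i++) {
        split(line[i], f, ",")
        if (last[f[1]] == i) print line[i]   # print only the newest record of each key
      }
    }'
}

printf '%s\n' \
  'user1,alice@old.example' \
  'user2,bob@example.com' \
  'user1,alice@new.example' | compact
# prints:
# user2,bob@example.com
# user1,alice@new.example
```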

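Log compaction is useful for topics where only the latest value for each key matters, such as the current configuration of a system or master data records. It does not keep the full history of updates: older values for a key are eventually discarded. To apply both compaction and time-based or size-based deletion to the same topic, cleanup.policy can be set to include both compact and delete.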

Verifying Your Configuration

Once you have applied your retention settings, it’s always a good idea to verify that they’re working as expected. The following command will display the current configuration for a topic, including retention policies:

bin/kafka-configs.sh --bootstrap-server <Broker-Connect-String> --describe --entity-type topics --entity-name my-topic

The output should include an entry for each override you set, such as retention.ms and retention.bytes, with the values you configured earlier.
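
For scripted checks, the value of a single setting can be pulled out of that output. The sketch below is a hedged example: the helper name config_value is hypothetical, and it assumes the describe output contains each setting as a key=value token separated by spaces or commas. The exact layout differs between Kafka versions, so check it against your own output first.

```shell
# Hypothetical helper: read `kafka-configs.sh --describe` output on stdin and
# print the value of the config key given as $1 (first matching token only).
# Assumes settings appear as "key=value" tokens separated by spaces, tabs or commas.
config_value() {
  tr ' ,\t' '\n\n\n' |
    awk -v k="$1=" 'index($0, k) == 1 { print substr($0, length(k) + 1); exit }'
}

# Usage (needs a running cluster; <Broker-Connect-String> is a placeholder):
# bin/kafka-configs.sh --bootstrap-server <Broker-Connect-String> --describe \
#   --entity-type topics --entity-name my-topic | config_value retention.ms
```

Matching only tokens that begin with the key avoids picking up longer names that merely end in it, such as log.retention.ms.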

Conclusion

Properly managing retention time for messages in Kafka is crucial for optimizing storage and ensuring data relevance. Tailoring Kafka’s retention policy based on your application’s requirements can lead to better performance and resource allocation. With the techniques covered in this tutorial, you should be equipped to configure time-based, size-based, and even compacted logs for your Kafka topics.