Introduction
Apache Kafka is a distributed streaming platform designed for building real-time data pipelines and streaming applications. One of its core features is the ability to store records (messages) in topics for a specified period. By controlling the retention period of messages, users can manage storage costs and ensure that their Kafka clusters are not overwhelmed by stale data. This tutorial will guide you through setting up message retention time in Kafka from basic to advanced configurations.
Understanding Kafka Retention Policies
In Kafka, messages are retained in topics either for a pre-configured time or until the topic reaches a certain size. Two key properties control retention:
retention.ms – the retention time in milliseconds. When set, messages older than this duration become eligible for deletion.
retention.bytes – the maximum size a log can grow to. Once this size is reached, older log segments are removed.
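Because retention.ms is expressed in milliseconds, it can help to compute the values up front rather than hard-coding magic numbers. A minimal shell sketch (the durations shown are just examples used later in this tutorial):

```shell
# Convert common retention windows to milliseconds
HOURS_24_MS=$(( 24 * 60 * 60 * 1000 ))    # 24 hours
DAYS_7_MS=$(( 7 * 24 * 60 * 60 * 1000 ))  # 7 days
echo "$HOURS_24_MS $DAYS_7_MS"            # prints "86400000 604800000"
```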
Kafka retention policy can be configured at the broker level (applies to all topics) or per topic, where specific topics can have their own retention settings.
Setting Retention Time
Prerequisites: Ensure that you have a Kafka environment set up and that you can produce and consume messages to and from a Kafka topic.
Configuring Broker-Level Retention
To set a global, broker-level retention time for all topics, modify the Kafka configuration file (server.properties), usually located in the Kafka config directory. The broker default is log.retention.hours=168 (7 days).
echo "log.retention.hours=48" >> /path/to/kafka/config/server.properties
Restart the Kafka broker for the changes to take effect:
bin/kafka-server-stop.sh
bin/kafka-server-start.sh /path/to/kafka/config/server.properties
Configuring Topic-Level Retention
To configure retention time for an individual topic, use the kafka-configs.sh utility that ships with Kafka.
# Set the retention period for 'my-topic' to 24 hours
bin/kafka-configs.sh --zookeeper <Zookeeper-Connect-String> --alter --entity-type topics --entity-name my-topic --add-config retention.ms=86400000
If you’re using a Kafka version that has deprecated Zookeeper, use the --bootstrap-server option with the endpoint of your Kafka cluster:
# Set the retention period for 'my-topic' to 24 hours using bootstrap server
bin/kafka-configs.sh --bootstrap-server <Broker-Connect-String> --alter --entity-type topics --entity-name my-topic --add-config retention.ms=86400000
Advanced Configurations
Setting a Size-Based Retention Policy
Aside from time-based retention, Kafka also allows you to set a size-based retention policy. Here’s how you can set a limit on the size of the stored logs for a topic:
# Set a retention size of 500MB for 'my-topic'
bin/kafka-configs.sh --bootstrap-server <Broker-Connect-String> --alter --entity-type topics --entity-name my-topic --add-config retention.bytes=524288000
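Note that 524288000 bytes is 500 MiB (500 × 1024 × 1024). Deriving retention.bytes values with shell arithmetic avoids off-by-a-factor mistakes; this sketch computes the two sizes used in this tutorial:

```shell
# Size-based retention values in bytes
MB_500=$(( 500 * 1024 * 1024 ))   # 500 MiB
GB_1=$(( 1024 * 1024 * 1024 ))    # 1 GiB
echo "$MB_500 $GB_1"              # prints "524288000 1073741824"
```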
Combining Time and Size Retention Policies
In some scenarios, you might want to configure both time and size-based retention policies, such as retaining messages for a maximum of 7 days or until the log size reaches 1GB, whichever comes first.
# Set a retention time of 7 days and a retention size of 1GB for 'my-topic'
bin/kafka-configs.sh --bootstrap-server <Broker-Connect-String> --alter --entity-type topics --entity-name my-topic \
--add-config retention.ms=604800000,retention.bytes=1073741824
Log Compaction
Another retention strategy is log compaction, which keeps at least the most recent record for each key instead of deleting records purely by age or size. This ensures that Kafka maintains at least one copy of every key's latest value.
# Enable log compaction for 'my-topic'
bin/kafka-configs.sh --bootstrap-server <Broker-Connect-String> --alter --entity-type topics --entity-name my-topic \
--add-config cleanup.policy=compact
Log compaction is useful for topics where the latest state for each key must always be available, such as configuration snapshots or master data records; note that it retains the most recent value per key, not the full history of updates.
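You can also combine compaction with time- or size-based deletion. A sketch, assuming the same placeholder connect string used above (the bracket syntax passes a list value to --add-config; verify it against your Kafka version):

```shell
# Sketch: apply both compaction and deletion to 'my-topic'
# (requires a running cluster; <Broker-Connect-String> is a placeholder)
bin/kafka-configs.sh --bootstrap-server <Broker-Connect-String> --alter \
  --entity-type topics --entity-name my-topic \
  --add-config cleanup.policy=[compact,delete]
```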
Verifying Your Configuration
Once you have applied your retention settings, it’s always a good idea to verify that they’re working as expected. The following command will display the current configuration for a topic, including retention policies:
bin/kafka-configs.sh --bootstrap-server <Broker-Connect-String> --describe --entity-type topics --entity-name my-topic
You should see output including entries for retention.ms and retention.bytes with the values you configured earlier.
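To sanity-check the raw numbers in that output, you can convert them back to human-readable units with shell arithmetic; a small sketch using the values from earlier in this tutorial:

```shell
# Convert retention.ms back to days and retention.bytes back to MiB
RETENTION_MS=604800000
RETENTION_BYTES=524288000
echo "$(( RETENTION_MS / 86400000 )) days"   # prints "7 days"
echo "$(( RETENTION_BYTES / 1048576 )) MiB"  # prints "500 MiB"
```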
Conclusion
Properly managing retention time for messages in Kafka is crucial for optimizing storage and ensuring data relevance. Tailoring Kafka’s retention policy based on your application’s requirements can lead to better performance and resource allocation. With the techniques covered in this tutorial, you should be equipped to configure time-based, size-based, and even compacted logs for your Kafka topics.