Kafka: How to Customize Start Offset for a Consumer

Updated: January 30, 2024 By: Guest Contributor

Introduction

Apache Kafka is a powerful streaming platform that lets you process and analyze data in real time. It is built on the concept of producers writing data to topics and consumers reading from those topics. Controlling how consumer applications behave plays a crucial role in the operation and reliability of a system, and one setting that often needs adjustment is the start offset: it determines where a consumer begins reading messages from a Kafka topic. In this tutorial, we’ll dive into how to customize the start offset for Kafka consumers using several approaches.

Understanding Offsets in Kafka

In Kafka, every message in a partition has a unique, sequential ID called an offset. Consumers can start reading records from a specific offset. By default, a new consumer group starts consuming from the latest offset, meaning it only sees messages produced after the group first subscribes. However, there are scenarios where you might want to read from the earliest messages or from a specific offset.
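
To get a feel for these numbers, you can ask a consumer for the earliest and latest offsets of a partition. The snippet below is a minimal sketch that assumes the same local broker and ‘myTopic’ topic used later in this tutorial:

import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
    TopicPartition partition0 = new TopicPartition("myTopic", 0);
    // Earliest retained offset and the offset the next produced message will receive
    Map<TopicPartition, Long> beginning = consumer.beginningOffsets(Collections.singletonList(partition0));
    Map<TopicPartition, Long> end = consumer.endOffsets(Collections.singletonList(partition0));
    System.out.printf("partition 0: earliest = %d, latest = %d%n", beginning.get(partition0), end.get(partition0));
}

The gap between the two values tells you roughly how many messages are currently retained in that partition.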

Basic Offset Configuration

To begin, let’s look at how to use the ‘auto.offset.reset’ property to tell a Kafka consumer where to start if there is no initial offset or if the current offset no longer exists (for example, if it was deleted due to the retention policy).

import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "my-group");
props.put("enable.auto.commit", "true");
props.put("auto.offset.reset", "earliest"); // Can be "latest", "earliest", or "none"
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

Here, ‘auto.offset.reset’ is set to ‘earliest’, which tells the consumer to start from the earliest available offset when no committed offset is found. Set it to ‘latest’ to start from the end of the partition instead, or to ‘none’ to have the consumer fail when no previous offset exists.
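
Failing fast with ‘none’ lets you handle a missing offset explicitly rather than silently resetting. A rough sketch, reusing the properties from above:

import java.time.Duration;
import java.util.Collections;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.NoOffsetForPartitionException;

props.put("auto.offset.reset", "none");

try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
    consumer.subscribe(Collections.singletonList("myTopic"));
    try {
        consumer.poll(Duration.ofMillis(100));
    } catch (NoOffsetForPartitionException e) {
        // No committed offset exists for this group; decide explicitly where to start
        System.err.println("No committed offset found: " + e.getMessage());
    }
}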

Subscribing to a Topic with Offset Control

Instead of just subscribing to a topic, you can assign specific partitions to the consumer and seek to a particular offset before starting to consume messages. This gives you complete control over where consumption begins.

import java.time.Duration;
import java.util.Arrays;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
TopicPartition partition0 = new TopicPartition("myTopic", 0);
consumer.assign(Arrays.asList(partition0));

// Seeking to offset 10 of partition 0
consumer.seek(partition0, 10);

// Now consuming messages
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
    }
}

The ‘assign’ method attaches the consumer to the specified TopicPartition, and ‘seek’ sets the position for that partition. You can seek to any valid offset, including 0 to start from the beginning of the partition.
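
If all you need is “start over” or “skip straight to new data” semantics, the consumer also offers convenience methods that avoid hard-coding an offset. A small sketch, continuing with the same assigned partition (and assuming java.util.Collections is imported):

// Jump to the earliest retained offset of the assigned partition
consumer.seekToBeginning(Collections.singletonList(partition0));

// Or skip everything already in the partition and read only new messages
consumer.seekToEnd(Collections.singletonList(partition0));

// The new position takes effect on the next poll()
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));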

Advanced Offset Management

While the aforementioned methods are straightforward, sometimes you need to establish offsets programmatically based on conditions such as time, external systems, or complex logic. The KafkaConsumer API provides a method called ‘offsetsForTimes’ which allows you to look up the offset by timestamp for a set of partitions.

Map<TopicPartition, Long> targetTimes = new HashMap<>();
// Specify a timestamp for each partition of interest
targetTimes.put(new TopicPartition("myTopic", 0), targetTimestamp);
Map<TopicPartition, OffsetAndTimestamp> result = consumer.offsetsForTimes(targetTimes);

// Seek to the returned offsets; the partitions must already be assigned to the consumer.
// A null value means the partition has no message with a timestamp at or after the target.
for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : result.entrySet()) {
    if (entry.getValue() != null) {
        consumer.seek(entry.getKey(), entry.getValue().offset());
    }
}

// Begin consuming messages
// ... consuming loop, as shown above ...

In this case, ‘targetTimestamp’ is an epoch time in milliseconds from which you’d like to start consuming messages. This can be useful if your consumer logic is based on time windows.
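
For example, to replay roughly the last hour of data, the target timestamp can be derived from the current clock. This is a small sketch; whether it does what you expect depends on the timestamp semantics of your topic (create time versus log-append time):

import java.time.Duration;
import java.time.Instant;

// Start consuming from approximately one hour ago
long targetTimestamp = Instant.now().minus(Duration.ofHours(1)).toEpochMilli();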

Storing Offsets Outside Kafka

In some advanced cases, you may need to store offsets outside of Kafka, typically in an external system such as a database or a distributed storage solution. With the KafkaConsumer API, you disable auto commit, persist the current position in the external store (ideally atomically with the processed results), and seek to the stored offset when the consumer starts.

props.put("enable.auto.commit", "false");
// ... other consumer configurations ...

try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
    // ... logic to determine starting offset based on the external store...
    TopicPartition partition0 = new TopicPartition("myTopic", 0);
    consumer.assign(Arrays.asList(partition0));
    consumer.seek(partition0, startingOffset);
    // ... consuming loop ...
}

This method offers the highest level of flexibility but also comes with the added complexity of managing the external offset state yourself.
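
For illustration, here is a minimal sketch of that pattern. The OffsetStore interface and the process() call are assumptions for this example (for instance, a database table keyed by topic and partition), not part of the Kafka API:

// Hypothetical interface backed by your database; not part of Kafka
interface OffsetStore {
    long loadOffset(String topic, int partition);               // last processed offset, or -1 if none
    void saveOffset(String topic, int partition, long offset);  // ideally written with the processed results
}

try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
    TopicPartition partition0 = new TopicPartition("myTopic", 0);
    consumer.assign(Arrays.asList(partition0));

    long lastProcessed = offsetStore.loadOffset("myTopic", 0);
    if (lastProcessed >= 0) {
        consumer.seek(partition0, lastProcessed + 1); // resume right after the last processed record
    } // otherwise, auto.offset.reset decides where to start

    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records) {
            process(record); // your business logic (hypothetical)
            offsetStore.saveOffset("myTopic", 0, record.offset());
        }
    }
}

If the offset is written in the same transaction as the processed results, the consumer can resume exactly where it left off after a restart, which is the main reason to take on this extra complexity.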

Conclusion

Customizing the Kafka consumer’s start offset is essential for tailoring your applications to process data effectively and efficiently. The strategies discussed range from simple configuration changes to advanced manual control, giving developers and data engineers the tools they need to manage application state and ensure seamless data processing. Choose the method that aligns best with your system’s requirements and data processing goals.