How to Reset Consumer Offsets in Kafka

Updated: January 30, 2024 By: Guest Contributor

Introduction

Apache Kafka is a popular distributed streaming platform for handling large volumes of real-time data. In Kafka, consumer groups read from topics, which are split into partitions for scalability and parallel processing. Each consumer group tracks its progress by committing an offset for every partition it reads; the committed offset marks the position of the next message the group will consume. Sometimes we need to reset these offsets, for instance, to reprocess messages after an error or a change in the processing logic.

This tutorial will walk you through how to reset consumer offsets in Kafka using several approaches, ranging from basic methods provided by Kafka to more advanced custom ones, aided by code examples and explanations.

Prerequisites

  • A working Apache Kafka cluster
  • The Kafka command-line tools
  • A basic understanding of Kafka topics and consumer groups

Understanding Offsets in Kafka

Before diving into offset resets, it’s important to understand that in Kafka, an offset is a sequential ID number assigned to each record within a partition. The consumer reads records in the order they were stored. As it processes records, it advances its position and periodically commits it. Should the consumer disconnect and reconnect, it picks up where it left off by resuming from the last committed offset.
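To make this bookkeeping concrete, here is a minimal Java sketch that models a single partition as an in-memory list. It does not use the Kafka client at all; the class OffsetModel and its methods are hypothetical names chosen for illustration, and the sketch assumes the convention that the committed offset is the offset of the next record to read.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of one partition and one group's committed offset (not the Kafka API). */
final class OffsetModel {
    private final List<String> log;   // index in this list == record offset
    private long committed = 0;       // offset of the NEXT record to read

    OffsetModel(List<String> records) {
        this.log = new ArrayList<>(records);
    }

    /** Reads up to max records starting at the committed offset, then commits the new position. */
    List<String> pollAndCommit(int max) {
        int from = (int) committed;
        int to = Math.min(log.size(), from + max);
        List<String> batch = new ArrayList<>(log.subList(from, to));
        committed = to;
        return batch;
    }

    long committedOffset() {
        return committed;
    }

    /** A reset simply overwrites the committed offset (no range check in this sketch). */
    void resetTo(long offset) {
        committed = offset;
    }

    public static void main(String[] args) {
        OffsetModel m = new OffsetModel(List.of("a", "b", "c", "d", "e"));
        System.out.println(m.pollAndCommit(3));   // [a, b, c]
        System.out.println(m.committedOffset());  // 3
        System.out.println(m.pollAndCommit(3));   // [d, e] (resumes at offset 3)
        m.resetTo(0);
        System.out.println(m.pollAndCommit(2));   // [a, b] (reprocessing after the reset)
    }
}
```

The point of the model is that a reset is nothing more than overwriting one stored number per partition; the records themselves are never touched.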

Reset Using the kafka-consumer-groups.sh Script

The kafka-consumer-groups.sh script, which comes with the Kafka distribution, can be used to list, describe, or reset consumer offsets. The basic command structure for resetting an offset is:

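./bin/kafka-consumer-groups.sh --bootstrap-server <broker> --group <consumer-group> --topic <topic> --reset-offsets --to-offset <new-offset> --execute

This command sets the committed offset of the consumer group to <new-offset> for every partition of the topic. To target specific partitions, append them to the topic name, for example --topic my-topic:0,1. The change is only applied when --execute is present; without it, the tool just prints the offsets it would set. The consumer group must also be inactive (no running consumers) while its offsets are reset, otherwise the tool rejects the request.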

Reset to Earliest or Latest

./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group my-group --topic my-topic --reset-offsets --to-earliest --execute
./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group my-group --topic my-topic --reset-offsets --to-latest --execute

These commands reset the offset of each partition to the earliest offset still available in the log or to the end of the log. This is useful when you want to reprocess all retained messages (--to-earliest) or skip everything already written and consume only new messages (--to-latest).

Shifting Offsets

If you need to adjust the offset by a certain amount, you can shift it using the --shift-by argument as shown:

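./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group my-group --topic my-topic --reset-offsets --shift-by -2 --execute

This moves the offset of each partition back by 2, which is equivalent to reprocessing the last two messages per partition. Positive values move the offset forward. A shift that would land outside the available range is adjusted to the earliest or latest offset of the partition.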
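The arithmetic behind a shift can be sketched in a few lines of Java. This is a toy model rather than the tool's actual code: the class ResetTarget and its methods are hypothetical, and it assumes that a result outside the partition's available range is clamped to the earliest or latest offset.

```java
/** Toy model of how a shifted offset is resolved for one partition (not the Kafka API). */
final class ResetTarget {
    /** Clamps a requested offset into the range the partition can actually serve. */
    static long clamp(long requested, long earliest, long latest) {
        return Math.max(earliest, Math.min(latest, requested));
    }

    /** Applies a signed shift to the current offset, then clamps the result. */
    static long shiftBy(long current, long delta, long earliest, long latest) {
        return clamp(current + delta, earliest, latest);
    }

    public static void main(String[] args) {
        long earliest = 100;  // oldest offset still retained (assumed value)
        long latest = 250;    // log end offset (assumed value)
        System.out.println(shiftBy(200, -2, earliest, latest));  // 198
        System.out.println(shiftBy(101, -5, earliest, latest));  // 100 (clamped to earliest)
        System.out.println(shiftBy(249, 10, earliest, latest));  // 250 (clamped to latest)
    }
}
```

Because the shift is applied per partition, a topic with several partitions rewinds by the given amount in each of them, not by that amount in total.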

Advanced Offset Management

For more granular control, you might choose to reset offsets to a specific timestamp. The following command achieves this:

./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group my-group --topic my-topic --reset-offsets --to-datetime 2020-01-01T00:00:00.000 --execute

This resets each partition to the earliest offset whose message timestamp is at or after the specified time (format: YYYY-MM-DDTHH:mm:SS.sss), allowing you to reprocess messages from a certain point in time.
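The lookup rule for a timestamp-based reset can be illustrated with a small Java sketch. It is a model, not the tool's implementation: the class TimestampLookup is hypothetical, treating the datetime string as UTC is an assumption of the sketch, and so is the fallback to the end of the log when no record is new enough.

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;

/** Toy model of a timestamp-based offset lookup for one partition (not the Kafka API). */
final class TimestampLookup {
    /** Parses a CLI-style datetime; interpreting it as UTC is an assumption of this sketch. */
    static long toEpochMillis(String datetime) {
        return LocalDateTime.parse(datetime).toInstant(ZoneOffset.UTC).toEpochMilli();
    }

    /**
     * Returns the first offset whose record timestamp is >= target.
     * timestamps[i] is the timestamp of the record at offset baseOffset + i.
     * Falls back to the log end offset when no record qualifies.
     */
    static long offsetForTime(long[] timestamps, long baseOffset, long target) {
        for (int i = 0; i < timestamps.length; i++) {
            if (timestamps[i] >= target) {
                return baseOffset + i;
            }
        }
        return baseOffset + timestamps.length;
    }

    public static void main(String[] args) {
        System.out.println(toEpochMillis("2020-01-01T00:00:00.000"));  // 1577836800000

        long[] timestamps = {1000L, 2000L, 3000L, 4000L};  // records at offsets 40..43
        System.out.println(offsetForTime(timestamps, 40, 2500L));  // 42
        System.out.println(offsetForTime(timestamps, 40, 2000L));  // 41 (exact match counts)
        System.out.println(offsetForTime(timestamps, 40, 5000L));  // 44 (end of log)
    }
}
```

A linear scan is used on purpose: record timestamps within a partition are not guaranteed to be increasing, so the first qualifying record is not necessarily found by binary search.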

Executing Offset Resets in Dry-run Mode

Before committing to an offset reset, it’s wise to perform a dry run, which simulates the reset and shows what offsets would be applied without making any actual changes. This is the tool’s default behavior when --execute is omitted, but passing --dry-run makes the intent explicit:

./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group my-group --topic my-topic --reset-offsets --to-earliest --dry-run

This command prints the offset each partition would be reset to if the same command were run with --execute in place of --dry-run.

Automating Offset Resets with Code

If you’re looking for even more advanced or programmable control over offset resetting, you can directly manipulate offsets using Kafka’s consumer API. Here’s an example in Java that shows how to reset offsets using the Kafka Client API:

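import java.time.Duration;
import java.util.*;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "my-group");
props.put("enable.auto.commit", "false");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
    consumer.subscribe(Collections.singletonList("my-topic"));
    // Partitions are assigned inside poll(); keep polling until the rebalance completes.
    while (consumer.assignment().isEmpty()) {
        consumer.poll(Duration.ofMillis(100));
    }
    Set<TopicPartition> assignedPartitions = consumer.assignment();
    consumer.seekToBeginning(assignedPartitions);
    // Or use seek() to set a precise offset per partition, e.g., consumer.seek(new TopicPartition("my-topic", 0), 10L);
    // A seek only moves the in-memory position; commit it so the reset applies to the group.
    Map<TopicPartition, OffsetAndMetadata> newOffsets = new HashMap<>();
    for (TopicPartition tp : assignedPartitions) {
        newOffsets.put(tp, new OffsetAndMetadata(consumer.position(tp)));
    }
    consumer.commitSync(newOffsets);
}

This code connects to the Kafka cluster, subscribes to “my-topic”, waits for partitions to be assigned, seeks to the beginning of each assigned partition (rewinding all offsets to the earliest), commits those positions for the group, and then exits. The commit is what makes the reset take effect: a seek on its own only changes the position held in memory by this consumer instance. Run it while the group’s other consumers are stopped, so that every partition is assigned to this instance. For production environments, you would need to add your own logic to determine the new offsets based on requirements.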

Conclusion

Understanding and managing consumer offsets is crucial for processing messages effectively in Kafka. With tools like kafka-consumer-groups.sh or the Kafka consumer API, you can rewind, skip ahead, or jump to a point in time whenever a situation calls for an offset reset, so that the right messages are processed again or skipped as needed.