
Understanding max.poll.interval.ms in Kafka (with examples)

Last updated: January 30, 2024

Introduction

Apache Kafka is a distributed streaming platform that provides high-throughput, fault-tolerant, publish-subscribe messaging. At its core, Kafka is designed as a durable, scalable platform for handling streams of records. One of the critical configurations that helps ensure consumer reliability in Kafka is max.poll.interval.ms. This tutorial covers what max.poll.interval.ms is, its implications, and how to use it effectively, with examples ranging from basic to advanced configurations.

Understanding max.poll.interval.ms

In Kafka, the max.poll.interval.ms configuration specifies the maximum delay allowed between successive calls to poll() when using consumer group management. If a consumer exceeds this interval, it is considered failed, and the group coordinator initiates a rebalance of the consumer group, reassigning its partitions to other consumers in the group. This setting helps ensure that partitions are not left without a consumer due to failures or stalls in the processing pipeline. The default value is 300000 ms (5 minutes).
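
To make the failure mode concrete, here is a small sketch (ours, not part of the original article; the broker address, group id, and topic name are placeholders) that sets a deliberately short max.poll.interval.ms and then stalls between polls. Once the interval is exceeded, the consumer proactively leaves the group; a manual commit then typically fails with CommitFailedException, and the next poll() has to rejoin:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");  // placeholder broker
props.put("group.id", "poll-interval-demo");       // hypothetical group id
props.put("enable.auto.commit", "false");
props.put("max.poll.interval.ms", "10000");        // deliberately short: 10 seconds
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("demo-topic"));   // placeholder topic
consumer.poll(Duration.ofSeconds(1));              // first poll joins the group

Thread.sleep(15_000);  // stall longer than max.poll.interval.ms

try {
    consumer.commitSync();  // the group has likely rebalanced without us by now
} catch (CommitFailedException e) {
    System.err.println("Evicted from the group: " + e.getMessage());
}
consumer.poll(Duration.ofSeconds(1));              // this call rejoins the group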

Basic Example

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-consumer-group");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("max.poll.interval.ms", "300000");  // 5 minutes
props.put("key.deserializer", 
"org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", 
"org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

The above code snippet is a standard Kafka consumer configuration in Java, setting max.poll.interval.ms to 5 minutes.
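
The snippet stops short of actually consuming, so here is a minimal poll loop for that consumer (my-topic is a placeholder). The key point is that each full pass, poll plus processing, must finish within the five-minute interval:

consumer.subscribe(Arrays.asList("my-topic"));

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        // All processing for the batch must complete within max.poll.interval.ms,
        // or the group coordinator will consider this consumer failed.
        System.out.printf("offset = %d, value = %s%n", record.offset(), record.value());
    }
}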

Handling Long-Running Processing Logic

You may occasionally need a consumer to process a batch of records in a way that takes longer than max.poll.interval.ms allows. Raising the timeout is an option, but it also delays failure detection for the whole group. A more robust approach is to keep calling poll() while the work happens elsewhere: pause the assigned partitions, hand the records to a background thread, and resume fetching once processing completes. Paused partitions return no records, but every poll() call still counts as liveness, so the consumer is not timed out.

import java.time.Duration;
import java.util.Arrays;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

void processRecordsWithLongProcessingTime(ConsumerRecords<String, String> records) {
    for (ConsumerRecord<String, String> record : records) {
        // Perform long-running processing logic here.
    }
}

ExecutorService executor = Executors.newSingleThreadExecutor();
Future<?> processing = null;

consumer.subscribe(Arrays.asList("my-topic"));

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

    if (!records.isEmpty()) {
        // Pause fetching so this loop can keep calling poll() and remain in
        // the group while the batch is processed on the worker thread.
        consumer.pause(consumer.assignment());
        processing = executor.submit(() -> processRecordsWithLongProcessingTime(records));
    }

    if (processing != null && processing.isDone()) {
        consumer.commitSync();                  // record the progress just made
        consumer.resume(consumer.assignment()); // start fetching again
        processing = null;
    }
}

In this example, the consumer pauses its partitions and hands each batch to a background thread. Because the loop keeps calling poll(), the consumer continues to satisfy max.poll.interval.ms even when a batch takes far longer than the interval to process. Note that a rebalance clears the paused state on reassigned partitions, so production code should also track in-flight work in a ConsumerRebalanceListener.
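
If the per-record work is only moderately slow, a simpler first step (our suggestion, not shown in the configuration above) is to shrink the batch size with max.poll.records, so each poll-process cycle stays comfortably under the interval:

// Fetch at most 50 records per poll() instead of the default 500, so a full
// processing pass is more likely to finish within max.poll.interval.ms.
props.put("max.poll.records", "50");

This trades some throughput for liveness and requires no changes to the processing code.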

Advanced Example: Custom Offset Management

There might be scenarios where you want to control exactly when and how offsets are committed. In such cases, disable auto-commit (enable.auto.commit=false) and commit offsets manually, while making sure processing still fits within max.poll.interval.ms. Let’s look at an example:

props.put("enable.auto.commit", "false");  // commit manually instead

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("my-topic"));

// Leave a safety margin below max.poll.interval.ms (300,000 ms here).
final long batchDeadlineMs = 240_000;

try {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(10));
        long batchStart = System.currentTimeMillis();
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
            // Process each record
            // ...

            // Sync offset commit to prevent duplicate processing in the event of a rebalance
            consumer.commitSync(Collections.singletonMap(new TopicPartition(record.topic(), record.partition()),
                                                         new OffsetAndMetadata(record.offset() + 1)));
            // Stop working through this batch if we are approaching
            // max.poll.interval.ms, so the next poll() happens in time.
            if (System.currentTimeMillis() - batchStart > batchDeadlineMs) {
                // Rewind to the last committed offsets so records we did not
                // reach in this batch are re-fetched by the next poll().
                for (TopicPartition tp : records.partitions()) {
                    OffsetAndMetadata committed = consumer.committed(Collections.singleton(tp)).get(tp);
                    if (committed != null)
                        consumer.seek(tp, committed.offset());
                }
                break;
            }
        }
    }
} finally {
    consumer.close();
}

In complex applications, committing offsets as you make progress is what limits message replay when a consumer fails or the group rebalances: on recovery, consumption resumes from the last committed offset rather than from the start of the batch.
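
Per-record commitSync() is safe but slow, since every commit is a blocking round trip to the coordinator. A common middle ground, sketched here as one possible pattern rather than the article's prescribed approach, is to commit each batch asynchronously and make one final synchronous commit on shutdown:

try {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
        for (ConsumerRecord<String, String> record : records) {
            // Process each record...
        }
        // Non-blocking commit keeps throughput high; the callback only logs
        // failures, because a later commit will supersede this one anyway.
        consumer.commitAsync((offsets, exception) -> {
            if (exception != null)
                System.err.println("Commit failed for " + offsets + ": " + exception);
        });
    }
} finally {
    try {
        consumer.commitSync();  // one last blocking commit before leaving the group
    } finally {
        consumer.close();
    }
}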

Conclusion

Understanding and correctly configuring max.poll.interval.ms is vital for stable and reliable Kafka consumer applications. The parameter ensures that a consumer which fails to poll within the configured interval is marked as failed, so its partitions can be reassigned and the overall health of the consumer group is maintained.
