Understanding max.poll.interval.ms in Kafka (with examples)

Updated: January 30, 2024 By: Guest Contributor

Introduction

Apache Kafka is a distributed streaming platform that allows for high-throughput, fault-tolerant, publish-subscribe messaging. Kafka, at its core, is designed to provide a durable, scalable platform for handling streams of records. One of the critical configurations that helps ensure consumer reliability in Kafka is max.poll.interval.ms. This tutorial will cover what max.poll.interval.ms is, its implications, and how to use it effectively with various examples from basic to advanced configurations.

Understanding max.poll.interval.ms

In Kafka, the max.poll.interval.ms configuration sets the maximum delay allowed between two calls to poll() when the consumer uses group management. If the application does not call poll() again within this time, the consumer is considered failed: it leaves the group, and the group coordinator triggers a rebalance that reassigns its partitions to the remaining consumers. This upper bound on the time spent between polls ensures that partitions are not left stuck behind a consumer whose processing pipeline has stalled. It is separate from session.timeout.ms, which detects a dead consumer process through heartbeats sent from a background thread.
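To make the rule concrete, here is a toy model of the check. It is a sketch only, not the Kafka client's actual code: the class PollDeadline and its methods are hypothetical names, and time is passed in explicitly so the behavior is easy to follow.

```java
import java.time.Duration;

/** Toy model of the poll-interval rule; NOT the Kafka client's real implementation. */
public class PollDeadline {
    private final long maxPollIntervalMs;
    private long lastPollMs;

    public PollDeadline(Duration maxPollInterval, long nowMs) {
        this.maxPollIntervalMs = maxPollInterval.toMillis();
        this.lastPollMs = nowMs;
    }

    /** Call whenever the application invokes poll(). */
    public void recordPoll(long nowMs) {
        lastPollMs = nowMs;
    }

    /** True once more than max.poll.interval.ms has passed since the last poll. */
    public boolean isExpired(long nowMs) {
        return nowMs - lastPollMs > maxPollIntervalMs;
    }

    /** Milliseconds left before the consumer would be considered failed. */
    public long remainingMs(long nowMs) {
        return Math.max(0, maxPollIntervalMs - (nowMs - lastPollMs));
    }

    public static void main(String[] args) {
        PollDeadline deadline = new PollDeadline(Duration.ofMinutes(5), 0);
        System.out.println(deadline.isExpired(299_000));   // still inside the interval
        System.out.println(deadline.isExpired(301_000));   // interval exceeded
        deadline.recordPoll(301_000);                      // a new poll() resets the clock
        System.out.println(deadline.remainingMs(301_000)); // full interval available again
    }
}
```

The point of the model is that only a call to poll() resets the clock; heartbeats alone do not.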

Basic Example

import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-consumer-group");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("max.poll.interval.ms", "300000");  // 5 minutes
props.put("key.deserializer", 
"org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", 
"org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

The above code snippet is a standard Kafka consumer configuration in Java. It sets max.poll.interval.ms to 5 minutes (300000 ms), which is also the default value.
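A rough way to check that a configuration fits the interval is to divide a processing-time budget by the worst-case time per record. The sketch below is illustrative only: the class PollSizing, the method maxPollRecords, the 2-second per-record estimate, and the 50% safety fraction are assumptions made for this example, not part of Kafka. The max.poll.records setting itself is a real consumer property that caps how many records a single poll() returns.

```java
import java.util.Properties;

/** Hypothetical helper for choosing max.poll.records; not part of the Kafka API. */
public class PollSizing {
    /**
     * Largest batch size whose worst-case processing time stays within
     * safetyFraction of max.poll.interval.ms. Always returns at least 1.
     */
    public static int maxPollRecords(long maxPollIntervalMs,
                                     long worstCaseMsPerRecord,
                                     double safetyFraction) {
        if (maxPollIntervalMs <= 0 || worstCaseMsPerRecord <= 0
                || safetyFraction <= 0 || safetyFraction > 1) {
            throw new IllegalArgumentException("arguments out of range");
        }
        long budgetMs = (long) (maxPollIntervalMs * safetyFraction);
        long records = Math.max(1, budgetMs / worstCaseMsPerRecord);
        return (int) Math.min(Integer.MAX_VALUE, records);
    }

    public static void main(String[] args) {
        long intervalMs = 300_000;          // 5 minutes, as in the configuration above
        long assumedMsPerRecord = 2_000;    // assumption: each record takes up to 2 s

        Properties props = new Properties();
        props.put("max.poll.interval.ms", Long.toString(intervalMs));
        props.put("max.poll.records",
                  Integer.toString(maxPollRecords(intervalMs, assumedMsPerRecord, 0.5)));

        System.out.println("max.poll.records = " + props.getProperty("max.poll.records"));
    }
}
```

With these assumed numbers, half of the interval is 150 seconds, so at 2 seconds per record the batch is capped at 75 records.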

Handling Long-Running Processing Logic

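You may occasionally need a consumer to process a batch that takes longer than max.poll.interval.ms allows. A partition assignment strategy does not help here. The usual remedies are to raise max.poll.interval.ms, to lower max.poll.records so that each poll() returns less work, or to move processing to a separate thread while the consumer keeps polling. Whichever you choose, track the offsets of completed work per partition and commit them before a rebalance takes the partitions away, so that a timeout does not force large amounts of reprocessing.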

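import java.time.Duration;
import java.util.*;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;

void processRecordsWithLongProcessingTime(List<ConsumerRecord<String, String>> records) {
    for (ConsumerRecord<String, String> record : records) {
        // Perform long-running processing logic here.
    }
}

// Next offset to read for each partition whose work has finished.
// Assumes enable.auto.commit is set to "false" for this consumer.
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();

consumer.subscribe(Arrays.asList("my-topic"), new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // Commit finished work before the partitions move to another consumer.
        // This can throw CommitFailedException if the consumer has already left the group.
        if (!offsets.isEmpty()) {
            consumer.commitSync(offsets);
        }
        offsets.clear();
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // Do not call poll() here: this callback itself runs inside poll().
    }
});

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (TopicPartition partition : records.partitions()) {
        List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
        processRecordsWithLongProcessingTime(partitionRecords);
        // Offset of the last record of this partition, not of the whole batch.
        long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
        offsets.put(partition, new OffsetAndMetadata(lastOffset + 1));
    }
    if (!offsets.isEmpty()) {
        consumer.commitSync(offsets);
    }
}

In this example, the offset of the last completed record is tracked per partition and committed after every batch, and again when partitions are revoked, so a rebalance does not discard finished work. Note that this does not lift the limit: the time spent processing the records returned by one poll() must still fit within max.poll.interval.ms, and poll() must not be called from inside the rebalance callbacks, which themselves run inside poll().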
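When a batch cannot be made small enough, a common pattern is to hand the work to a worker thread and keep calling poll() on paused partitions so the poll interval is never exceeded. The sketch below shows only the control flow and runs without the Kafka library. The Poller interface and the processWithKeepAlive method are hypothetical stand-ins; with a real KafkaConsumer the three calls would map to pause(consumer.assignment()), poll(Duration), and resume(consumer.assignment()).

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

public class KeepAlivePolling {
    /** Hypothetical stand-in for the three KafkaConsumer calls this pattern needs. */
    public interface Poller {
        void pause();   // stop fetching records for the assigned partitions
        void poll();    // keep-alive poll; returns no records while paused
        void resume();  // start fetching again
    }

    /**
     * Runs work on the worker thread while the calling thread keeps polling.
     * All Poller calls stay on the calling thread, because KafkaConsumer
     * is not safe for use from multiple threads.
     */
    public static void processWithKeepAlive(Poller poller, Runnable work,
                                            ExecutorService worker, long pollEveryMs) {
        poller.pause();
        try {
            Future<?> done = worker.submit(work);
            while (true) {
                try {
                    done.get(pollEveryMs, TimeUnit.MILLISECONDS);
                    return; // work finished
                } catch (TimeoutException stillRunning) {
                    poller.poll(); // work still running: reset the poll-interval clock
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for work", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("processing failed", e.getCause());
        } finally {
            poller.resume();
        }
    }

    public static void main(String[] args) {
        AtomicInteger polls = new AtomicInteger();
        Poller fake = new Poller() {
            public void pause()  { System.out.println("paused"); }
            public void poll()   { polls.incrementAndGet(); }
            public void resume() { System.out.println("resumed"); }
        };
        ExecutorService worker = Executors.newSingleThreadExecutor();
        Runnable slowWork = () -> {
            try { Thread.sleep(300); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
        };
        processWithKeepAlive(fake, slowWork, worker, 20);
        worker.shutdown();
        System.out.println("keep-alive polls issued: " + polls.get());
    }
}
```

A real implementation would also need to commit offsets only after the worker finishes and to handle a rebalance that happens while the worker is still running; both are left out of this sketch.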

Advanced Example: Custom Offset Management

There might be scenarios where you want to control when and how offsets are committed. In such cases, it’s essential to disable auto-commit and manually commit offsets while being aware of max.poll.interval.ms. Let’s look at an example:

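props.put("enable.auto.commit", "false");  // offsets are committed manually below
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("my-topic"));

// Processing budget per batch. The value is an assumption for this example;
// keep it well below max.poll.interval.ms (300000 ms in the configuration above).
final long maxProcessingMs = 240_000;

try {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(10));
        long batchStart = System.currentTimeMillis();
        // First unprocessed offset per partition, filled in only if the batch is cut short.
        Map<TopicPartition, Long> rewindTo = new HashMap<>();
        for (ConsumerRecord<String, String> record : records) {
            TopicPartition tp = new TopicPartition(record.topic(), record.partition());
            if (System.currentTimeMillis() - batchStart > maxProcessingMs) {
                // Out of time: skip the rest of the batch and remember where to resume.
                rewindTo.putIfAbsent(tp, record.offset());
                continue;
            }
            System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
            // Process each record
            // ...

            // Sync offset commit to limit duplicate processing in the event of a rebalance
            consumer.commitSync(Collections.singletonMap(tp, new OffsetAndMetadata(record.offset() + 1)));
        }
        // Rewind skipped partitions so the next poll() returns the unprocessed records again.
        rewindTo.forEach((tp, offset) -> consumer.seek(tp, offset));
    }
} finally {
    consumer.close();
}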

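Committing after each record limits replay after a consumer failure or rebalance to the record that was in flight, at the cost of one synchronous round trip to the broker per record. Committing once per batch is cheaper but replays more records. In either case, the time between two calls to poll() must stay below max.poll.interval.ms, or the commit will fail because the consumer has already been removed from the group.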

Conclusion

Understanding and correctly configuring max.poll.interval.ms is vital for stable and reliable Kafka consumer applications. This parameter ensures that consumers that are unable to poll within a specified time frame are considered failed, thus allowing reassignment of their partitions and maintaining the overall health of the consumer group.