Understanding Kafka max.poll.records (with examples)

Updated: January 30, 2024 By: Guest Contributor

Overview

Apache Kafka is a highly popular distributed streaming platform used by thousands of companies for high-performance data pipelines, streaming analytics, data integration, and mission-critical applications. Kafka consumers read records from Kafka topics. One important configuration parameter you need to understand to fine-tune consumer performance is max.poll.records. In this tutorial, we’ll dive deep into the implications of this parameter and demonstrate its impact with practical examples.

What is max.poll.records?

The max.poll.records configuration controls the maximum number of records that a Kafka consumer returns from a single call to poll(). By tuning this value, you can trade off consumer throughput against processing latency to match your application's requirements. Note that it does not change how much data the consumer fetches from the brokers (that is governed by settings such as fetch.max.bytes and max.partition.fetch.bytes); it only caps how many of the already-fetched records each poll() call hands to your application.

Default Behavior

The default value of max.poll.records is 500, which means that each call to poll() returns at most 500 records. Let's start with a basic example that creates a Kafka consumer and leaves max.poll.records at its default:

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class DefaultConsumerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("test-topic"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                records.forEach(record -> {
                    System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value());
                });
            }
        }
    }
}

If you run the example above, the consumer reads messages from the specified topic and processes up to 500 records per poll() invocation.

Customizing max.poll.records

Adjusting max.poll.records allows you to control consumer throughput. Let’s see how you can customize this setting:

Properties props = new Properties();
// ... other configuration settings

// Set max.poll.records to 100
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);

// Create the consumer
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

// Use consumer...

With this configuration, each poll() returns at most 100 records, so the consumer returns to poll() more frequently. This is useful when each record requires substantial processing time, because it reduces the risk of exceeding max.poll.interval.ms between polls.

Balancing max.poll.records and max.poll.interval.ms

It’s important to balance max.poll.records against another configuration parameter: max.poll.interval.ms, which sets the maximum allowed delay between calls to poll(). If processing a full batch of max.poll.records takes longer than max.poll.interval.ms, the consumer is considered failed and the group rebalances, reassigning its partitions to other members. Adjust the two settings together:

Properties props = new Properties();
// ... other configuration properties

// Allow a longer interval between polls
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "100000");
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "2000");

// Proceed with consumer creation and usage
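
A quick back-of-the-envelope check makes the relationship concrete. Suppose each record takes about 50 ms to process (an assumed figure for illustration): a full batch of 2,000 records then needs roughly 100,000 ms, which is exactly the interval configured above, leaving no headroom. A minimal sketch of that arithmetic:

public class PollBudgetCheck {
    public static void main(String[] args) {
        long perRecordMillis = 50;            // assumed average processing time per record
        int maxPollRecords = 2000;            // matches max.poll.records above
        long maxPollIntervalMillis = 100_000; // matches max.poll.interval.ms above

        long worstCaseBatchMillis = perRecordMillis * maxPollRecords; // 100,000 ms
        System.out.printf("Worst-case batch time: %d ms (limit: %d ms)%n",
                worstCaseBatchMillis, maxPollIntervalMillis);

        if (worstCaseBatchMillis >= maxPollIntervalMillis) {
            System.out.println("Risk of rebalance: lower max.poll.records or raise max.poll.interval.ms");
        }
    }
}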

Advanced Usage: Fine-tuning for Performance and Stability

In more complex scenarios, you might need to adjust other consumer settings according to the changes in max.poll.records. This section will cover advanced examples considering different use cases.

Example: Batch Processing with High Throughput

If you are processing records in batches to achieve high throughput, you could benefit from increasing max.poll.records coupled with batch processing logic:

Properties props = new Properties();
// ... more settings

// Configure batch size for high throughput
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "2000");

// Extend processing time before next poll
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "300000");

// Consumer creation and batch processing logic below

With such a configuration, the consumer receives enough records per poll to make batching worthwhile, and has enough time to process each batch without triggering a rebalance.
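
As a concrete illustration, here is a minimal sketch of such a batch-oriented consumer. The topic name batch-topic and the processBatch helper are hypothetical placeholders; auto-commit is disabled so offsets are committed only after a whole batch has been processed:

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;

public class BatchConsumerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "batch-consumer-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); // commit manually per batch
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "2000");
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "300000");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("batch-topic")); // hypothetical topic name
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                if (records.isEmpty()) {
                    continue;
                }
                List<String> batch = new ArrayList<>();
                for (ConsumerRecord<String, String> record : records) {
                    batch.add(record.value());
                }
                processBatch(batch);   // stand-in for your bulk operation, e.g. a bulk DB insert
                consumer.commitSync(); // commit offsets only after the whole batch succeeded
            }
        }
    }

    private static void processBatch(List<String> batch) {
        // Placeholder: replace with your actual batch processing
        System.out.println("Processed batch of " + batch.size() + " records");
    }
}

Committing only after the batch completes means that on failure the batch is redelivered rather than silently skipped.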

Example: Real-time Processing with Low Latency

For applications that need to deliver new data with low latency, you should aim for a smaller max.poll.records:

Properties props = new Properties();
// ... other properties
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "10");

// Consumer creation and subsequent real-time processing

This way the consumer polls more frequently and finishes each small batch quickly, reducing end-to-end latency.
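
A minimal sketch of such a low-latency loop follows. The topic name events and the handleRecord helper are hypothetical placeholders:

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class LowLatencyConsumerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "low-latency-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "10");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("events")); // hypothetical topic name
            while (true) {
                // A short timeout and a small batch cap keep each loop iteration fast,
                // so records reach the handler soon after they arrive
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(10));
                records.forEach(record -> handleRecord(record.value()));
            }
        }
    }

    private static void handleRecord(String value) {
        // Placeholder: replace with your actual low-latency processing
        System.out.println("Handled: " + value);
    }
}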

Notes

Error Handling: If the consumer cannot finish processing a batch within max.poll.interval.ms, it is removed from the group and subsequent offset commits fail (typically with a CommitFailedException). Keep this failure mode in mind while tweaking max.poll.records.
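
For example, once a rebalance has happened because the interval was exceeded, a synchronous commit fails. A minimal sketch of handling this inside a poll loop (processBatch is a hypothetical handler; consumer and records come from the surrounding loop):

import org.apache.kafka.clients.consumer.CommitFailedException;

// Inside the poll loop:
try {
    processBatch(records);  // hypothetical batch handler
    consumer.commitSync();
} catch (CommitFailedException e) {
    // Processing exceeded max.poll.interval.ms and the group rebalanced;
    // the uncommitted records will be redelivered. Consider lowering
    // max.poll.records or raising max.poll.interval.ms.
    System.err.println("Commit failed after rebalance: " + e.getMessage());
}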

Monitoring & Optimization: Monitoring the consumer lag and throughput is crucial when fine-tuning the max.poll.records. For optimizing the configurations, you may need to apply different values and monitor the results to find the sweet spot for your use case.

Conclusion

max.poll.records is a powerful setting in Kafka consumers that helps control the throughput and latency of message processing. Understanding and fine-tuning it together with related settings such as max.poll.interval.ms is essential for stable, efficient Kafka operation tailored to your specific needs.