Overview
Apache Kafka is a widely used distributed streaming platform that powers high-performance data pipelines, streaming analytics, data integration, and mission-critical applications at thousands of companies. Kafka consumers read records from Kafka topics, and one configuration parameter you need to understand to fine-tune consumer performance is max.poll.records. In this tutorial, we'll dive into the implications of this parameter and demonstrate its impact with practical examples.
What is max.poll.records?
The max.poll.records configuration controls the maximum number of records a Kafka consumer retrieves in a single call to poll(). By tuning this value, you can adjust consumer throughput and batching behavior to match your application's latency and processing-power requirements.
Default Behavior
The default value of max.poll.records is 500, meaning that by default a single poll() invocation returns up to 500 records. Let's start with a basic example that creates a Kafka consumer and leaves max.poll.records at its default:
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class DefaultConsumerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("test-topic"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                records.forEach(record -> {
                    System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
                });
            }
        }
    }
}
If you run the above example, the consumer will consume messages from the specified topic and process up to 500 records per poll invocation.
Customizing max.poll.records
Adjusting max.poll.records allows you to control consumer throughput. Let's see how to customize this setting:
Properties props = new Properties();
// ... other configuration settings
// Set max.poll.records to 100
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);
// Create the consumer
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// Use consumer...
In this configuration, the consumer is more responsive and consumes fewer records per poll, which might be useful if each record requires substantial processing time.
Balancing max.poll.records and max.poll.interval.ms
It's important to balance max.poll.records against another configuration parameter, max.poll.interval.ms, which sets the maximum allowed delay between invocations of poll(). If processing the number of records defined by max.poll.records takes longer than max.poll.interval.ms, the consumer is considered failed and is removed from the group, triggering a rebalance. Adjust the two together:
Properties props = new Properties();
// ... other configuration properties
// Allow a longer interval between polls
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "100000");
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "2000");
// Proceed with consumer creation and usage
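As a rough rule of thumb, the largest safe max.poll.records is your poll-interval budget divided by the per-record processing time. The helper below is an illustrative sketch, not part of the Kafka client API (the class PollBudget and its method are our own names); it applies a safety factor so transient slowdowns don't push processing past the deadline:

```java
// Illustrative helper, not part of the Kafka client API.
class PollBudget {
    // Largest max.poll.records that keeps total processing time within
    // max.poll.interval.ms, with a safety margin (e.g. safetyFactor = 0.8).
    static int safeMaxPollRecords(long maxPollIntervalMs, long perRecordMs, double safetyFactor) {
        long budgetMs = (long) (maxPollIntervalMs * safetyFactor);
        return (int) Math.max(1, budgetMs / perRecordMs);
    }

    public static void main(String[] args) {
        // Default max.poll.interval.ms of 300000 (5 minutes), assuming ~50 ms per record:
        System.out.println(safeMaxPollRecords(300_000, 50, 0.8));
    }
}
```

With a 300-second interval and 50 ms per record, roughly 4,800 records fit per poll even with a 20% margin, comfortably above the 500 default; the estimate only becomes constraining when per-record work is heavy.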
Advanced Usage: Fine-tuning for Performance and Stability
In more complex scenarios, you may need to adjust other consumer settings alongside max.poll.records. This section covers advanced examples for different use cases.
Example: Batch Processing with High Throughput
If you are processing records in batches to achieve high throughput, you could benefit from increasing max.poll.records coupled with batch-processing logic:
Properties props = new Properties();
// ... more settings
// Configure batch size for high throughput
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "2000");
// Extend processing time before next poll
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "300000");
// Consumer creation and batch processing logic below
With such a configuration, the consumer has enough records to process in batches and enough time to process them without triggering a rebalance.
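One way to structure the batch-processing side is to split each poll's records into fixed-size sub-batches before handing them to downstream logic. The sketch below uses a plain List as a stand-in for the ConsumerRecords returned by poll(); the class name BatchSplitter is ours, not Kafka's:

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch: split one poll's worth of records into fixed-size sub-batches.
class BatchSplitter {
    static <T> List<List<T>> partition(List<T> records, int batchSize) {
        List<List<T>> batches = new ArrayList<>();
        for (int i = 0; i < records.size(); i += batchSize) {
            // subList is a view; copy it if the batch outlives the source list.
            batches.add(records.subList(i, Math.min(i + batchSize, records.size())));
        }
        return batches;
    }
}
```

Each sub-batch can then be written to a sink in one call (a database bulk insert, for instance), which is where the throughput gain of a large max.poll.records actually materializes.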
Example: Real-time Processing with Low Latency
For applications that must deliver new data with low latency, aim for a smaller max.poll.records:
Properties props = new Properties();
// ... other properties
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "10");
// Consumer creation and subsequent real-time processing
This ensures that the consumer consumes records quickly and more frequently, thus reducing latency.
Notes
Error Handling: If the consumer cannot finish processing within max.poll.interval.ms, it will be removed from the group, and you'll need to handle the resulting rebalance and any reprocessed records. Keep this in mind while tweaking max.poll.records.
Monitoring & Optimization: Monitoring consumer lag and throughput is crucial when fine-tuning max.poll.records. To optimize the configuration, try different values and monitor the results to find the sweet spot for your use case.
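Consumer lag itself is simple arithmetic: for each partition, it's the log-end offset minus the consumer's committed offset. In a real application you would obtain these from KafkaConsumer's endOffsets() and committed() methods; the sketch below (class name LagCalculator is ours) just shows the computation over plain maps keyed by partition number:

```java
import java.util.Map;

// Illustrative sketch: total consumer lag = sum over partitions of
// (log end offset - committed offset), clamped at zero.
class LagCalculator {
    static long totalLag(Map<Integer, Long> endOffsets, Map<Integer, Long> committedOffsets) {
        long lag = 0;
        for (Map.Entry<Integer, Long> e : endOffsets.entrySet()) {
            lag += Math.max(0, e.getValue() - committedOffsets.getOrDefault(e.getKey(), 0L));
        }
        return lag;
    }
}
```

If lag keeps growing while max.poll.records is high, per-record processing is the bottleneck; if lag grows with a small max.poll.records, per-poll overhead may be the culprit instead.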
Conclusion
In conclusion, max.poll.records is a powerful Kafka consumer setting that helps control the throughput and latency of message processing. Understanding and fine-tuning it, together with related settings, is essential for stable, efficient Kafka operation tailored to your specific needs.