Kafka: Limit the memory used for buffering across all threads

Updated: January 30, 2024 By: Guest Contributor

Introduction

Apache Kafka is a distributed streaming platform that is widely used for building real-time data pipelines and streaming applications. One of the key aspects of operating a Kafka cluster is managing the memory used for buffering, especially when dealing with large data volumes or high-throughput scenarios. This tutorial delves into strategies to limit the memory used for buffering across all threads in Kafka, optimizing its performance and preventing issues due to excessive memory consumption.

Understanding Kafka Memory Buffering

On a Kafka broker, JVM heap memory for request handling goes mainly to two places. The network threads read requests from client sockets and hold responses until they are sent. The request queue feeds those requests to the I/O (request handler) threads. Clients add buffers of their own: the producer holds records waiting to be sent, and the consumer holds fetched records waiting to be processed. Log data itself is served largely from the operating system’s page cache rather than the JVM heap. If these buffers are not sized deliberately, they can crowd out memory needed for other work or even cause an OutOfMemoryError.

Configuring the Producer

The Kafka producer buffers records that have not yet been sent to the brokers. The buffer.memory setting caps the total memory the producer may use for this buffering. Because KafkaProducer is thread-safe, a single instance is usually shared by all application threads, so this one limit bounds producer buffering across all of them.

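import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("buffer.memory", 33554432L); // 32 MB, shared by every thread that uses this producer
props.put("max.block.ms", 60000);      // max time send() may block, e.g. waiting for buffer space
KafkaProducer<String, String> producer = new KafkaProducer<>(props);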

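This snippet sets buffer.memory to 32 MB, which is also the default. When the buffer is full, send() blocks for up to max.block.ms (60 seconds by default) and then fails with a timeout error. Lower buffer.memory to shrink the producer’s footprint, or raise it for high-throughput workloads, within the memory available to the JVM.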
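To see why one shared buffer.memory limit bounds buffering across every thread, it helps to model it. The class below, SharedBufferBudget, is a hypothetical and simplified illustration, not Kafka’s internal buffer pool. It hands out byte reservations from a single budget. A caller that cannot get space waits up to a timeout before giving up; that timeout plays the role of max.block.ms, the producer setting that limits how long send() may block.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Hypothetical, simplified model of a producer-wide buffer budget.
// Not Kafka's implementation; it only mimics "one shared limit + bounded wait".
final class SharedBufferBudget {
    private final Semaphore bytes;

    SharedBufferBudget(int totalBytes) {
        // One permit per byte; fair ordering so waiting threads are served FIFO.
        this.bytes = new Semaphore(totalBytes, true);
    }

    // Reserve size bytes, waiting up to maxBlockMs (analogous to max.block.ms).
    // Returns false if the budget stays exhausted for the whole wait.
    boolean tryReserve(int size, long maxBlockMs) {
        try {
            return bytes.tryAcquire(size, maxBlockMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            return false;
        }
    }

    // Give bytes back once the corresponding batch has been "sent".
    void release(int size) {
        bytes.release(size);
    }

    int available() {
        return bytes.availablePermits();
    }
}
```

In this model, each sending thread calls tryReserve before appending a record and release once its batch is acknowledged. Because every thread draws from the same budget, the total stays bounded no matter how many threads share the instance.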

Configuring the Consumer

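The Kafka consumer also has settings that affect memory usage. fetch.max.bytes limits how much data a broker returns for a single fetch request, and max.partition.fetch.bytes limits the data returned per partition within that request. Neither is an absolute maximum: if the first record batch is larger than the limit, it is still returned so the consumer can make progress. KafkaConsumer is not thread-safe, so each consuming thread normally has its own instance, and these limits apply per consumer.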

import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("fetch.max.bytes", 52428800);          // 50 MB per fetch response (the default)
props.put("max.partition.fetch.bytes", 1048576); // 1 MB per partition (the default)
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

Setting these values too low increases the number of fetch requests and can reduce consumer throughput.
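For capacity planning, the two fetch settings can be combined into a rough upper bound on fetched data held in memory. The helper below is a hypothetical back-of-the-envelope estimate, not something Kafka computes. It assumes one in-flight fetch response per broker, caps each response at min(fetch.max.bytes, partitions × max.partition.fetch.bytes), and multiplies by the number of consumer threads, since each thread owns its own KafkaConsumer. It ignores oversized batches that are returned anyway and any decompression overhead.

```java
// Hypothetical back-of-the-envelope estimate of consumer fetch memory.
// Assumes one in-flight fetch response per broker and that every consumer
// thread has the same partition layout; real usage can differ.
final class FetchMemoryEstimate {
    private FetchMemoryEstimate() {}

    // partitionsPerBroker[i] = assigned partitions (for one consumer) led by broker i.
    static long perConsumerBytes(int[] partitionsPerBroker,
                                 long fetchMaxBytes,
                                 long maxPartitionFetchBytes) {
        long total = 0;
        for (int partitions : partitionsPerBroker) {
            // A response is capped by fetch.max.bytes and by the per-partition limit.
            long perPartitionCap = partitions * maxPartitionFetchBytes;
            total += Math.min(fetchMaxBytes, perPartitionCap);
        }
        return total;
    }

    // Each consumer thread has its own KafkaConsumer, so the limits multiply.
    static long allThreadsBytes(int consumerThreads, int[] partitionsPerBroker,
                                long fetchMaxBytes, long maxPartitionFetchBytes) {
        return consumerThreads
                * perConsumerBytes(partitionsPerBroker, fetchMaxBytes, maxPartitionFetchBytes);
    }
}
```

With the values from the consumer snippet and, say, 10 assigned partitions on one broker and 100 on another, this estimate gives 10 MB + 50 MB = 60 MB per consumer, or 240 MB for four consumer threads.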

Adjusting the Broker Configuration

On the broker side, several settings help bound memory usage. queued.max.requests sets how many requests may wait in the request queue for the I/O threads before the network threads stop reading new requests. The default is 500. Lowering it limits how much memory pending requests can occupy, at the cost of pushing back on clients sooner.

# server.properties
queued.max.requests=500
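Conceptually, queued.max.requests turns the request queue into a bounded queue. When it is full, network threads wait instead of reading more requests off the sockets, so backpressure reaches the clients rather than the heap. The sketch below models this with java.util.concurrent.ArrayBlockingQueue. The class and method names are hypothetical, and this is a simplified model rather than the broker’s own code.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical model of a broker request queue bounded by queued.max.requests.
final class BoundedRequestQueue {
    private final BlockingQueue<byte[]> queue;

    BoundedRequestQueue(int queuedMaxRequests) {
        this.queue = new ArrayBlockingQueue<>(queuedMaxRequests);
    }

    // Called by a "network thread": waits up to timeoutMs for a free slot.
    // Returns false if the queue stayed full (the thread would stop reading sockets).
    boolean enqueue(byte[] request, long timeoutMs) {
        try {
            return queue.offer(request, timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    // Called by an "I/O thread": takes the next request, or null if none is waiting.
    byte[] poll() {
        return queue.poll();
    }

    int size() {
        return queue.size();
    }
}
```

Note that the bound counts requests, not bytes: two tiny requests and two maximum-size requests occupy the same number of slots.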

socket.request.max.bytes limits the size of a single request (100 MB by default). The broker checks the size before reading the request body and rejects larger requests by closing the connection.

# server.properties
# 100 MB (comments must be on their own line in a properties file)
socket.request.max.bytes=104857600
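Kafka requests travel as size-delimited frames that begin with a 4-byte signed size field. That is what lets the broker enforce socket.request.max.bytes before allocating a buffer for the request body. The sketch below shows that check using java.nio.ByteBuffer. The class name is hypothetical, and error handling is simplified to exceptions where a real broker closes the connection.

```java
import java.nio.ByteBuffer;

// Hypothetical sketch: validate a size-prefixed request frame against a
// socket.request.max.bytes-style limit before allocating memory for it.
final class FrameSizeCheck {
    private FrameSizeCheck() {}

    // Reads the 4-byte big-endian size prefix and returns a buffer for the body.
    // Throws IllegalStateException for negative or oversized frames.
    static ByteBuffer allocateBody(ByteBuffer sizePrefix, int maxRequestBytes) {
        if (sizePrefix.remaining() < 4) {
            throw new IllegalArgumentException("need 4 bytes for the size prefix");
        }
        int size = sizePrefix.getInt(); // ByteBuffer is big-endian by default
        if (size < 0) {
            throw new IllegalStateException("invalid negative frame size: " + size);
        }
        if (size > maxRequestBytes) {
            throw new IllegalStateException(
                    "frame of " + size + " bytes exceeds limit of " + maxRequestBytes);
        }
        return ByteBuffer.allocate(size); // allocated only after the check passes
    }
}
```

The key design point is ordering: an oversized request is refused on the strength of its 4-byte header, so it never costs the broker a large allocation.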

Together, these settings bound how much request data the broker holds at once. Sizing them to fit the broker’s heap helps keep it from running out of memory under heavy load.
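A quick calculation shows why a request-count limit alone is a loose memory bound: in the worst case every queued request is as large as socket.request.max.bytes. The hypothetical helper below computes that bound. Kafka also has a broker setting, queued.max.request.bytes, that caps the total bytes of queued requests directly. Its default is -1, meaning no byte limit, and the helper treats any non-positive value the same way.

```java
// Hypothetical helper: worst-case bytes held by queued requests on one broker.
final class RequestQueueBound {
    private RequestQueueBound() {}

    static long worstCaseQueuedBytes(int queuedMaxRequests,
                                     int socketRequestMaxBytes,
                                     long queuedMaxRequestBytes) {
        // Bound implied by the request count: every slot holds a maximum-size request.
        long countBound = (long) queuedMaxRequests * socketRequestMaxBytes;
        // A non-positive byte cap means "no byte limit" (Kafka's default is -1).
        if (queuedMaxRequestBytes <= 0) {
            return countBound;
        }
        return Math.min(countBound, queuedMaxRequestBytes);
    }
}
```

With 500 queued requests of up to 100 MiB each, the worst case is 52,428,800,000 bytes (about 48.8 GiB), far more than a typical broker heap. This is why lowering queued.max.requests or setting queued.max.request.bytes matters when clients send large requests.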

Monitoring and Managing Memory Usage

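Monitoring Kafka’s memory usage is essential for keeping the system stable and performant. Tools such as JConsole and VisualVM, or the metrics Kafka and the JVM expose over JMX, show memory usage in real time. Managing memory also involves sizing the heap and tuning garbage collection. Kafka’s startup scripts run the broker with the G1 garbage collector by default. The Concurrent Mark Sweep (CMS) collector recommended in older guides was deprecated in JDK 9 and removed in JDK 14.

KAFKA_HEAP_OPTS="-Xmx4G -Xms4G"
KAFKA_JVM_PERFORMANCE_OPTS="-server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35"
export KAFKA_HEAP_OPTS KAFKA_JVM_PERFORMANCE_OPTS

Here, -Xmx and -Xms set the maximum and initial heap size; setting them equal avoids heap resizing at runtime. MaxGCPauseMillis sets G1’s pause-time goal, and InitiatingHeapOccupancyPercent sets the heap occupancy at which G1 starts a concurrent marking cycle. Setting KAFKA_JVM_PERFORMANCE_OPTS replaces the startup script’s defaults, so include every flag you still want. Keep the heap moderate: Kafka relies on the operating system’s page cache for log data, so memory left outside the heap is not wasted.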
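For a quick look at heap pressure from Java, the standard java.lang.management API reports the same heap and garbage-collection figures that JConsole and VisualVM display. The sketch below reads them for the current JVM. Monitoring a running broker would instead mean connecting to its JMX port, for example with javax.management.remote.JMXConnectorFactory, which is not shown here. The class name and the 70% threshold in the usage note are illustrative assumptions.

```java
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryUsage;

// Hypothetical helper that reports heap and GC activity for the current JVM.
final class HeapReport {
    private HeapReport() {}

    // Fraction of the maximum heap in use; falls back to the committed heap
    // when no maximum is defined (getMax() returns -1 in that case).
    static double heapUsedFraction() {
        MemoryUsage heap = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage();
        long limit = heap.getMax() > 0 ? heap.getMax() : heap.getCommitted();
        return (double) heap.getUsed() / limit;
    }

    static String summary(double warnThreshold) {
        StringBuilder sb = new StringBuilder();
        double used = heapUsedFraction();
        sb.append(String.format("heap used: %.1f%%%s%n", used * 100,
                used > warnThreshold ? " (above threshold)" : ""));
        // Collector names depend on the JVM and GC in use (e.g. "G1 Young Generation").
        for (GarbageCollectorMXBean gc : ManagementFactory.getGarbageCollectorMXBeans()) {
            sb.append(String.format("%s: %d collections, %d ms%n",
                    gc.getName(), gc.getCollectionCount(), gc.getCollectionTime()));
        }
        return sb.toString();
    }
}
```

Calling HeapReport.summary(0.7) periodically and alerting on the "above threshold" marker is one simple way to spot heap growth before it turns into long GC pauses.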

Advanced Configuration: Implementing Throttling

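Throttling limits throughput rather than memory directly, but it can keep bursts of traffic from filling queues and buffers. Kafka supports two kinds: replication throttling and client quotas. Replication throttling caps the bandwidth used to copy partition data between brokers, for example during a partition reassignment. It is configured with leader.replication.throttled.rate and follower.replication.throttled.rate (in bytes per second), which are dynamic broker configs. It applies only to the replicas listed in the topic-level leader.replication.throttled.replicas and follower.replication.throttled.replicas settings. The following command sets both rates to 1 MB/s on broker 0:

bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers --entity-name 0 --alter --add-config leader.replication.throttled.rate=1048576,follower.replication.throttled.rate=1048576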

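Client-level throttling uses quotas such as producer_byte_rate and consumer_byte_rate. These are set per user or client ID with kafka-configs.sh (--entity-type users or clients), and a client that exceeds its quota has its responses delayed. For policies the built-in quotas cannot express, you can implement the ClientQuotaCallback interface and register it with the client.quota.callback.class broker setting.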
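How a quota turns into a delay can be sketched with the formula described in KIP-13, the original Kafka quotas proposal. Suppose a client’s observed rate O exceeds its quota T over a measurement window W. The broker delays it by roughly X = (O - T) / T × W, which is exactly the delay that brings the average rate over W + X back down to T, since O × W / (W + X) = T. Current brokers derive O from sampled metrics, so treat the helper below (a hypothetical name) as a simplified illustration rather than the broker’s exact calculation.

```java
// Hypothetical helper illustrating the quota delay formula X = (O - T) / T * W.
final class QuotaDelay {
    private QuotaDelay() {}

    // observedBytesPerSec: measured rate O; quotaBytesPerSec: quota T (> 0);
    // windowMs: measurement window W. Returns 0 when the client is within quota.
    static long throttleTimeMs(double observedBytesPerSec,
                               double quotaBytesPerSec,
                               long windowMs) {
        if (quotaBytesPerSec <= 0) {
            throw new IllegalArgumentException("quota must be positive");
        }
        if (observedBytesPerSec <= quotaBytesPerSec) {
            return 0L;
        }
        double excess = (observedBytesPerSec - quotaBytesPerSec) / quotaBytesPerSec;
        return Math.round(excess * windowMs);
    }
}
```

For example, a client producing at 2 MiB/s against a 1 MiB/s quota over a 10-second window would be delayed about 10 seconds, and one at 1.5 MiB/s about 5 seconds.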

Conclusion

This guide has outlined practical steps to manage and limit Kafka’s memory usage across buffering threads. By carefully configuring producers, consumers, and brokers as well as monitoring memory metrics, you can ensure that Kafka performs optimally within your system’s resource constraints.