Kafka: How to access record headers and metadata

Updated: January 30, 2024 By: Guest Contributor

Introduction

Apache Kafka is a powerful distributed streaming platform that enables you to publish and subscribe to streams of records. In addition to the key, value, and timestamp that most developers are familiar with, Kafka records can carry headers and metadata that provide additional information and context. In this tutorial, we’ll go through how to access and utilize these headers and metadata within Kafka messages.

Prerequisites

Before diving in, make sure you have the following installed:

  • Java Development Kit (JDK) 8 or later
  • Apache Kafka 0.11.0.0 or later

A Basic Understanding of Kafka Record Headers

Kafka 0.11.0.0 introduced the capability to include headers in Kafka records. Headers are key-value pairs, with the key being a string and the value being a byte array. Here’s how you might produce a message with headers:

// Import necessary classes
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Arrays;
import java.util.Properties;

// Set up properties for the Kafka producer
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

// Create the producer
Producer<String, String> producer = new KafkaProducer<>(props);

// Create headers
Header header1 = new RecordHeader("header_key", "header_value".getBytes());

// Create a producer record with headers (a null partition lets Kafka choose the partition)
ProducerRecord<String, String> record = new ProducerRecord<>("your_topic", null, "your_key", "your_value", Arrays.asList(header1));

// Send the record
producer.send(record);

// Close the producer
producer.close();

With this basic understanding, we can now look into how to consume such messages and access their headers.

Consuming Messages and Accessing Headers

To access the headers while consuming messages, you’ll utilize the ConsumerRecord class. Below is an example of how to consume messages and print their headers:

// Import necessary classes
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.header.Header;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

// Set up properties for the Kafka consumer
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "your_group_id");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// Start from the beginning of the topic when this group has no committed offsets
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

// Create the consumer
Consumer<String, String> consumer = new KafkaConsumer<>(props);

// Subscribe to the topic
consumer.subscribe(Collections.singletonList("your_topic"));

// Consume messages (a single poll here for brevity; real applications poll in a loop)
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> record : records) {
  // Access the headers
  Iterable<Header> headers = record.headers();
  headers.forEach(header -> {
      System.out.printf("Header Key: %s, Header Value: %s%n", header.key(), new String(header.value()));
  });
}

// Close the consumer
consumer.close();

Calling the headers() method on a ConsumerRecord returns every header attached to the message, so iterating over them as shown above is all it takes to inspect them.
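Often you only need one particular header rather than the whole set. The Headers interface also exposes lastHeader(key), which returns the most recently added header with that key, or null if none exists. Below is a minimal sketch; the header key trace_id and the helper lastHeaderValue are made up for illustration, and the record is built locally so no broker is required:

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header;

import java.nio.charset.StandardCharsets;

public class HeaderLookup {

    // Returns the value of the most recently added header with the given key, or null if absent
    public static String lastHeaderValue(ConsumerRecord<String, String> record, String key) {
        Header header = record.headers().lastHeader(key);
        return header == null ? null : new String(header.value(), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Build a record locally (no broker needed) just to demonstrate the lookup
        ConsumerRecord<String, String> record =
                new ConsumerRecord<>("your_topic", 0, 0L, "your_key", "your_value");
        record.headers().add("trace_id", "abc-123".getBytes(StandardCharsets.UTF_8));

        System.out.println(lastHeaderValue(record, "trace_id")); // prints abc-123
        System.out.println(lastHeaderValue(record, "missing"));  // prints null
    }
}
```

Using lastHeader is handy when a header may legitimately appear more than once (Kafka allows duplicate header keys) and only the latest value matters.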

Advanced Header Operations

In more complex scenarios, you may need to conduct additional operations on headers such as filtering or modification. Here’s an example of consuming messages and modifying the headers before forwarding them:

// Assuming the consumer setup above, plus the RecordHeader import from the producer example...

ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> record : records) {
    // Add a header (a ConsumerRecord's headers are mutable, so this modifies the record in place)
    Header newHeader = new RecordHeader("new_header_key", "new_header_value".getBytes());
    record.headers().add(newHeader);

    // Forward the record with modified headers to another topic or perform other processing
}
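To actually forward such a record, the consumed record's headers can be passed straight into a new ProducerRecord, since one of its constructors accepts an Iterable of Header. Here is a sketch; the helper name toForward and the topic name forward_topic are illustrative, and the demo builds the record locally so it runs without a broker:

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.nio.charset.StandardCharsets;

public class HeaderForwarder {

    // Copies key, value, and headers from a consumed record into a record for another topic
    public static ProducerRecord<String, String> toForward(
            ConsumerRecord<String, String> consumed, String targetTopic) {
        // A null partition lets Kafka pick the partition for the target topic
        return new ProducerRecord<>(targetTopic, null,
                consumed.key(), consumed.value(), consumed.headers());
    }

    public static void main(String[] args) {
        ConsumerRecord<String, String> consumed =
                new ConsumerRecord<>("your_topic", 0, 0L, "your_key", "your_value");
        consumed.headers().add("new_header_key",
                "new_header_value".getBytes(StandardCharsets.UTF_8));

        ProducerRecord<String, String> outgoing = toForward(consumed, "forward_topic");
        System.out.println(outgoing.topic()); // prints forward_topic
        System.out.println(new String(
                outgoing.headers().lastHeader("new_header_key").value(),
                StandardCharsets.UTF_8)); // prints new_header_value

        // With a live producer configured as in the first example: producer.send(outgoing);
    }
}
```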

In addition to manipulating headers, you can also deal with metadata such as the topic or partition from which the record was consumed, the offset of the record within the partition, and the record’s timestamp.

Accessing Metadata

Metadata associated with a Kafka message can provide valuable context about the data you’re processing. The ConsumerRecord class exposes methods that allow you to access this metadata:

// Continuing from the consuming example above:

for (ConsumerRecord<String, String> record : records) {
    System.out.println("Record Topic: " + record.topic());
    System.out.println("Record Partition: " + record.partition());
    System.out.println("Record Offset: " + record.offset());
    System.out.println("Record Timestamp: " + record.timestamp());
}

These properties are particularly useful when troubleshooting, or when your processing logic needs to behave differently depending on where a message came from or when it was produced.
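One common use of this metadata is manual offset management: tracking, per partition, the next offset to commit. The sketch below assumes the consumer is configured with enable.auto.commit=false; the helper name offsetsToCommit is illustrative, and the batch is built locally so the demo runs without a broker:

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class OffsetTracker {

    // Builds the map expected by consumer.commitSync(), committing offset + 1
    // (the position of the next record to read) for each partition seen in the batch
    public static Map<TopicPartition, OffsetAndMetadata> offsetsToCommit(
            Iterable<ConsumerRecord<String, String>> batch) {
        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
        for (ConsumerRecord<String, String> record : batch) {
            offsets.put(new TopicPartition(record.topic(), record.partition()),
                        new OffsetAndMetadata(record.offset() + 1));
        }
        return offsets;
    }

    public static void main(String[] args) {
        List<ConsumerRecord<String, String>> batch = Arrays.asList(
                new ConsumerRecord<>("your_topic", 0, 41L, "k1", "v1"),
                new ConsumerRecord<>("your_topic", 0, 42L, "k2", "v2"));

        Map<TopicPartition, OffsetAndMetadata> offsets = offsetsToCommit(batch);
        System.out.println(offsets.get(new TopicPartition("your_topic", 0)).offset()); // prints 43

        // With a live consumer: consumer.commitSync(offsets);
    }
}
```

Committing offset + 1 matters: the committed offset is interpreted as the position of the next record the group should consume, so committing the record's own offset would cause it to be reprocessed after a restart.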

Conclusion

In this tutorial, we’ve explored how to access and utilize headers and metadata within Kafka records. Understanding and using these valuable pieces of information can significantly enhance your Kafka-based applications.