
Kafka: How to access record headers and metadata

Last updated: January 30, 2024

Introduction

Apache Kafka is a powerful distributed streaming platform that enables you to publish and subscribe to streams of records. In addition to the key, value, and timestamp that most developers are familiar with, Kafka records can carry headers and metadata that provide additional information and context. In this tutorial, we’ll go through how to access and utilize these headers and metadata within Kafka messages.

Prerequisites

Before diving in, make sure you have the following installed:

  • Java Development Kit (JDK) 8 or later
  • Apache Kafka 0.11.0.0 or later

A Basic Understanding of Kafka Record Headers

Kafka 0.11.0.0 introduced the capability to include headers in Kafka records. Headers are key-value pairs, with the key being a string and the value being a byte array. Here’s how you might produce a message with headers:

// Import necessary classes
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Arrays;
import java.util.Properties;

// Set up properties for the Kafka producer
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

// Create the producer
Producer<String, String> producer = new KafkaProducer<>(props);

// Create a header (the key is a String, the value a byte array)
Header header1 = new RecordHeader("header_key", "header_value".getBytes());

// Create a producer record with headers; the null argument is the partition,
// leaving partition assignment to Kafka's partitioner
ProducerRecord<String, String> record = new ProducerRecord<>("your_topic", null, "your_key", "your_value", Arrays.asList(header1));

// Send the record
producer.send(record);

// Close the producer
producer.close();

With this basic understanding, we can now look into how to consume such messages and access their headers.

Consuming Messages and Accessing Headers

To access the headers while consuming messages, you’ll utilize the ConsumerRecord class. Below is an example of how to consume messages and print their headers:

// Import necessary classes
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.header.Header;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

// Set up properties for the Kafka consumer
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "your_group_id");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

// Create the consumer
Consumer<String, String> consumer = new KafkaConsumer<>(props);

// Subscribe to the topic
consumer.subscribe(Collections.singletonList("your_topic"));

// Consume messages (a real application would call poll() in a loop)
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> record : records) {
  // Access the headers
  Iterable<Header> headers = record.headers();
  headers.forEach(header -> {
      System.out.printf("Header Key: %s, Header Value: %s%n", header.key(), new String(header.value()));
  });
}

// Close the consumer
consumer.close();

By using the headers method on the ConsumerRecord, you can easily retrieve all the headers associated with a message.
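If you only need a particular header rather than iterating over all of them, the Headers interface also offers direct lookup methods. The sketch below uses the in-memory RecordHeaders implementation, so it runs without a broker; record.headers() on a consumed record returns the same interface, so the lookups work identically there. The "trace_id" key is just an illustrative name:

```java
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;

import java.nio.charset.StandardCharsets;

public class HeaderLookupExample {

    // Decode the most recently added value for a header key, or null if absent
    public static String lastHeaderValue(Headers headers, String key) {
        Header last = headers.lastHeader(key);
        return last == null ? null : new String(last.value(), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Build headers in memory; a key may carry multiple values
        Headers headers = new RecordHeaders();
        headers.add("trace_id", "abc-123".getBytes(StandardCharsets.UTF_8));
        headers.add("trace_id", "abc-456".getBytes(StandardCharsets.UTF_8));

        // lastHeader(key) returns the most recently added value for the key
        System.out.println(lastHeaderValue(headers, "trace_id")); // abc-456

        // headers(key) returns every value added under that key, in order
        for (Header h : headers.headers("trace_id")) {
            System.out.println(new String(h.value(), StandardCharsets.UTF_8));
        }
    }
}
```

Because a key can appear more than once, lastHeader is the usual choice when a header is treated as single-valued.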

Advanced Header Operations

In more complex scenarios, you may need to perform additional operations on headers, such as filtering or modifying them. Here’s an example of consuming messages and modifying the headers before forwarding them:

// Assuming the setup from previous examples...

ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> record : records) {
    // Add a header to the consumed record (this only changes the in-memory
    // object, not the record stored in Kafka)
    Header newHeader = new RecordHeader("new_header_key", "new_header_value".getBytes());
    record.headers().add(newHeader);

    // To forward the record with its modified headers, produce a new
    // ProducerRecord to another topic, or perform other processing here
}
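Forwarding a consumed record while preserving its headers comes down to passing record.headers() into a ProducerRecord constructor that accepts headers. Here is a minimal sketch; the helper name forwardRecord and the target-topic parameter are illustrative, not part of the Kafka API:

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;

public class HeaderForwarder {

    // Hypothetical helper: copy the key, value, and headers of a consumed
    // record into a new ProducerRecord aimed at another topic. Passing the
    // consumed record's Headers to the constructor carries every header
    // across, including any added after consumption.
    public static ProducerRecord<String, String> forwardRecord(
            ConsumerRecord<String, String> consumed, String targetTopic) {
        return new ProducerRecord<>(
                targetTopic,
                null,                // partition: let Kafka's partitioner choose
                consumed.key(),
                consumed.value(),
                consumed.headers());
    }
}
```

A call like producer.send(forwardRecord(record, "other_topic")) would then ship the record to the new topic with its headers intact.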

In addition to manipulating headers, you can also deal with metadata such as the topic or partition from which the record was consumed, the offset of the record within the partition, and the record’s timestamp.

Accessing Metadata

Metadata associated with a Kafka message can provide valuable context about the data you’re processing. The ConsumerRecord class exposes methods that allow you to access this metadata:

// Continuing from the consuming example above:

for (ConsumerRecord<String, String> record : records) {
    System.out.println("Record Topic: " + record.topic());
    System.out.println("Record Partition: " + record.partition());
    System.out.println("Record Offset: " + record.offset());
    System.out.println("Record Timestamp: " + record.timestamp());
}

These properties are particularly useful for troubleshooting, or when your processing logic needs to change behavior based on where a record came from or when it was produced.
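One common use of this metadata is stamping log lines or dead-letter records with the source coordinates of a message, so a specific record can later be located in the topic. A minimal sketch (the helper name and the "topic-partition@offset" format are my own convention, not a Kafka standard):

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;

public class RecordCoordinates {

    // Hypothetical helper: format a record's source coordinates,
    // e.g. "orders-2@15234", for log lines or dead-letter headers
    public static String coordinates(ConsumerRecord<?, ?> record) {
        return record.topic() + "-" + record.partition() + "@" + record.offset();
    }
}
```

Inside the consumption loop, System.out.println(RecordCoordinates.coordinates(record)) would then identify each record unambiguously.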

Conclusion

In this tutorial, we’ve explored how to access and utilize headers and metadata within Kafka records. Understanding and using these valuable pieces of information can significantly enhance your Kafka-based applications.
