Kafka: How to read records in JSON format

Updated: January 31, 2024 By: Guest Contributor Post a comment

Overview

Apache Kafka is a powerful distributed streaming platform that allows you to work with high volumes of data in real-time. Working with Kafka often involves reading and writing records in different formats, and one of the most common formats for data representation is JSON (JavaScript Object Notation). This tutorial will help you understand how to read records in JSON format from a Kafka topic using the Kafka Consumers API with code examples that range from basic to advanced implementations.

Understanding Kafka Consumers

A Kafka Consumer is an application that reads records from one or more Kafka topics. Consumers can work in groups to share a workload or independently. To read records in JSON format, a consumer must understand how to deserialize the binary data received from a Kafka topic into JSON records.

Prerequisites

  • Apache Kafka installed and running
  • Topic created with JSON formatted messages
  • Java environment set-up (since examples are in Java using the Kafka Clients library)
  • Maven or a similar build tool to manage dependencies

Setting up Your Kafka Consumer

Before you can read records in JSON format, you will need to set up a Kafka consumer. Let’s start by adding the kafka-clients dependency to your Maven pom.xml:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>[insert the latest version here]</version>
</dependency>

Next, initialize a KafkaConsumer with the proper configurations:

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Properties;

// Initialize consumer properties
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

// Create the KafkaConsumer
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

Creating a JSON Deserializer

Kafka does not natively understand JSON, so we read data as a string and then convert it into a JSON object. To simplify this process, you can implement a custom deserializer. Suppose we are using Google’s Gson library for this:

import com.google.gson.Gson;
import org.apache.kafka.common.serialization.Deserializer;

public class JsonDeserializer<T> implements Deserializer<T> {
    private Gson gson = new Gson();
    private Class<T> deserializedClass;

    public JsonDeserializer(Class<T> deserializedClass) {
        this.deserializedClass = deserializedClass;
    }

    @Override
    public T deserialize(String topic, byte[] bytes) {
        if (bytes == null) return null;
        return gson.fromJson(new String(bytes), deserializedClass);
    }
}

Add the Gson dependency to your pom.xml as well:

<dependency>
    <groupId>com.google.code.gson</groupId>
    <artifactId>gson</artifactId>
    <version>[insert latest version]</version>
</dependency>

Using the Consumer to Read JSON Records

With the consumer and deserializer set up, you can start reading records. You’ll need to subscribe to the topic from which you want to consume records and run the consumer in a loop:

// Define your data class to match the JSON structure
class MyRecord {
    private String field1;
    private int field2;
    // Getters and setters omitted for brevity
}

// Subscribe to topic
consumer.subscribe(Collections.singletonList("your-topic"));

// Poll for new records
while (true) {
    ConsumerRecords<String, MyRecord> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, MyRecord> record : records) {
        MyRecord jsonData = record.value();
        // Process your JSON object
    }
}

Advanced Deserialization

Selective consumption is a common requirement in more complex applications. Using Kafka Streams with a custom deserialization schema can provide the flexibility needed:

import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.common.serialization.Serdes;

StreamsBuilder builder = new StreamsBuilder();
KStream<String, MyRecord> kStream = builder.stream("your-topic", Consumed.with(Serdes.String(), new JsonSerde<>(MyRecord.class)));

kStream.foreach((key, value) -> {
    // process each record as a MyRecord instance
});

// Start the stream
Topology topology = builder.build();
KafkaStreams streams = new KafkaStreams(topology, props);
streams.start();

This approach optimizes record processing and handling within a Kafka Streams application.

Conclusion

This tutorial explored how to read and process JSON formatted Kafka records step by step. By implementing and utilizing a custom JSON deserializer, you can integrate your Kafka data with JSON-based systems smoothly and efficiently. Whether you’re working with simple consumer applications or complex Kafka Streams data processing, handling JSON records is a crucial skill in today’s data-intensive environment.