How to create a custom Kafka serializer and deserializer

Updated: January 31, 2024 By: Guest Contributor

Introduction

In the realm of big data streaming, Apache Kafka stands out as a highly scalable and fault-tolerant distributed event streaming platform capable of handling trillions of events a day. Kafka facilitates real-time data feeds through producers and consumers, which write to and read from Kafka topics respectively. To transmit data efficiently over the network, Kafka relies on serializers to convert objects into bytes and on deserializers for the reverse process. Out of the box, Kafka provides serializers and deserializers for simple types such as strings, integers, and byte arrays. However, for complex data types, or to implement custom serialization logic, you need to write your own serializers and deserializers.

This tutorial will show you how to create custom serializers and deserializers for Kafka. We assume that you have a basic understanding of Kafka and Java.

Understanding Serialization and Deserialization

Serialization is the process of translating data structures or object state into a format that can be stored or transmitted and subsequently reconstructed. In Kafka, during the production of messages, serialization converts objects into byte arrays that can be sent through the network. Deserialization, on the other hand, is the reverse process where byte arrays are converted back into objects by consumers to be processed.

To illustrate, let’s say you have a Java class called Order that represents a purchase. The serializer’s job is to take the Order object and convert it into bytes, whereas the deserializer will take those bytes and reconstruct the Order object on the consumer’s side.

Setting Up Your Environment

Before you start, ensure you have the following installed:

  • Java Development Kit (JDK) – Version 8 or above
  • Apache Kafka – The latest version can be downloaded from the Apache website
  • Apache Maven – To manage project dependencies
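
To manage these dependencies with Maven, the following is a minimal sketch of the relevant pom.xml entries; the version numbers shown are only illustrative, so use the versions that match your Kafka installation.

<dependencies>
    <!-- Kafka client APIs, including the Serializer and Deserializer interfaces -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>3.6.1</version> <!-- illustrative version -->
    </dependency>
    <!-- Jackson, used later in this tutorial for JSON (de)serialization -->
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
        <version>2.16.1</version> <!-- illustrative version -->
    </dependency>
</dependencies>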

Step 1: Define Your Custom Class

public class Order {
    private String id;
    private String product;
    private int quantity;

    // A no-argument constructor plus getters and setters go here so that
    // Jackson can serialize and deserialize the object; an all-args
    // constructor and a toString method are convenient but optional.
}

Step 2: Implement Your Custom Serializer

Implement a serializer by creating a new class that implements the org.apache.kafka.common.serialization.Serializer interface.

import java.util.Map;

import org.apache.kafka.common.serialization.Serializer;
import com.fasterxml.jackson.databind.ObjectMapper;

public class OrderSerializer implements Serializer<Order> {
    private final ObjectMapper objectMapper = new ObjectMapper();

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // Configuration code can go here
    }

    @Override
    public byte[] serialize(String topic, Order data) {
        if (data == null) {
            return null; // pass null values (tombstone records) through untouched
        }
        try {
            return objectMapper.writeValueAsBytes(data);
        } catch (Exception e) {
            throw new RuntimeException("Failed to serialize Order for topic " + topic, e);
        }
    }

    @Override
    public void close() {
        // Cleanup resources if necessary
    }
}

The serialize method is where the conversion takes place. Here we use the Jackson library to convert the Order object into its JSON representation as a byte array. Null values are passed straight through, since that is how Kafka represents tombstone records.

Step 3: Implement Your Custom Deserializer

As with the serializer, create a new class that implements org.apache.kafka.common.serialization.Deserializer.

import java.util.Map;

import org.apache.kafka.common.serialization.Deserializer;
import com.fasterxml.jackson.databind.ObjectMapper;

public class OrderDeserializer implements Deserializer<Order> {
    private final ObjectMapper objectMapper = new ObjectMapper();

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // Configuration code can go here
    }

    @Override
    public Order deserialize(String topic, byte[] data) {
        if (data == null) {
            return null; // pass null payloads (tombstone records) through untouched
        }
        try {
            return objectMapper.readValue(data, Order.class);
        } catch (Exception e) {
            throw new RuntimeException("Failed to deserialize Order from topic " + topic, e);
        }
    }

    @Override
    public void close() {
        // Cleanup resources if necessary
    }
}

The deserialize method takes a byte array and converts it back into an Order object. Just as with the serializer, exception handling is crucial, because deserialization can fail, for example when a message contains malformed or unexpected JSON.

Step 4: Configure Kafka Producer and Consumer

The next step is to update your Kafka producer and consumer configuration to use your custom serializer and deserializer.

// Producer properties
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "your.package.OrderSerializer");

// Consumer properties
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "your.package.OrderDeserializer");

Replace your.package with the package name where your OrderSerializer and OrderDeserializer are located.
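
With these properties in place, the producer and consumer can be used like any other. The following is a minimal end-to-end sketch; the topic name orders, the all-args Order constructor, and the getId() getter are illustrative assumptions rather than part of the code above.

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class OrderPipelineExample {
    public static void main(String[] args) {
        // Producer side: send one Order to the (illustrative) "orders" topic
        Properties producerProps = new Properties();
        producerProps.put("bootstrap.servers", "localhost:9092");
        producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producerProps.put("value.serializer", "your.package.OrderSerializer");

        try (KafkaProducer<String, Order> producer = new KafkaProducer<>(producerProps)) {
            Order order = new Order("order-1", "Laptop", 2); // assumes an all-args constructor
            producer.send(new ProducerRecord<>("orders", order.getId(), order)); // assumes a getId() getter
            producer.flush();
        }

        // Consumer side: read the Order back from the same topic
        Properties consumerProps = new Properties();
        consumerProps.put("bootstrap.servers", "localhost:9092");
        consumerProps.put("group.id", "test-group");
        consumerProps.put("auto.offset.reset", "earliest");
        consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        consumerProps.put("value.deserializer", "your.package.OrderDeserializer");

        try (KafkaConsumer<String, Order> consumer = new KafkaConsumer<>(consumerProps)) {
            consumer.subscribe(Collections.singletonList("orders"));
            ConsumerRecords<String, Order> records = consumer.poll(Duration.ofSeconds(5));
            for (ConsumerRecord<String, Order> record : records) {
                System.out.println("Received: " + record.value());
            }
        }
    }
}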

Testing the Custom SerDe

To ensure that your custom serializer and deserializer work as expected, start with a simple round-trip test of the two classes, then verify the full produce/consume flow against a running Kafka cluster in an integration test.
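
As a quick sanity check before standing up a cluster, you can exercise the serializer and deserializer directly in a round-trip unit test. Below is a minimal sketch assuming JUnit 5 is on the classpath and that Order has the getters and an all-args constructor alluded to in Step 1.

import static org.junit.jupiter.api.Assertions.assertEquals;

import org.junit.jupiter.api.Test;

class OrderSerdeTest {

    @Test
    void serializeThenDeserializeRestoresTheOrder() {
        OrderSerializer serializer = new OrderSerializer();
        OrderDeserializer deserializer = new OrderDeserializer();

        Order original = new Order("order-1", "Laptop", 2); // assumes an all-args constructor

        byte[] bytes = serializer.serialize("orders", original);
        Order restored = deserializer.deserialize("orders", bytes);

        // Compare field by field; assumes getters on Order
        assertEquals(original.getId(), restored.getId());
        assertEquals(original.getProduct(), restored.getProduct());
        assertEquals(original.getQuantity(), restored.getQuantity());

        serializer.close();
        deserializer.close();
    }
}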

Conclusion

In this tutorial, you learned how to create custom serializers and deserializers for Kafka. With this knowledge, you can now handle more complex data types in your Kafka producers and consumers effectively. Don’t forget to handle serialization exceptions properly and to clean up resources in the close methods to prevent resource leaks.