Sling Academy
Home/DevOps/How to create a custom Kafka serializer and deserializer

How to create a custom Kafka serializer and deserializer

Last updated: January 31, 2024

Introduction

In the realm of big data streaming, Apache Kafka stands out as a highly scalable and fault-tolerant distributed event streaming platform, which is capable of handling trillions of events a day. Kafka facilitates real-time data feeds through producers and consumers, which respectively write to and read from Kafka topics. To efficiently transmit data over the network, Kafka relies on serializers to convert object data into bytes, and deserializers for the reverse process. Out of the box, Kafka provides default serializers for simple data types like strings and integers. However, for complex data types, or to implement custom serialization logic, you may need custom serializers and deserializers.

This tutorial will show you how to create custom serializers and deserializers for Kafka. We assume that you have a basic understanding of Kafka and Java.

Understanding Serialization and Deserialization

Serialization is the process of translating data structures or object state into a format that can be stored or transmitted and subsequently reconstructed. In Kafka, during the production of messages, serialization converts objects into byte arrays that can be sent through the network. Deserialization, on the other hand, is the reverse process where byte arrays are converted back into objects by consumers to be processed.

To illustrate, let’s say you have a Java class called Order that represents a purchase. The serializer’s job is to take the Order object and convert it into bytes, whereas the deserializer will take those bytes and reconstruct the Order object on the consumer’s side.

Setting Up Your Environment

Before you start, ensure you have the following installed:

  • Java Development Kit (JDK) – Version 8 or above
  • Apache Kafka – The latest version can be downloaded from the Apache website
  • Apache Maven – To manage project dependencies

Step 1: Define Your Custom Class

public class Order {
    private String id;
    private String product;
    private int quantity;

    // constructors, getters, setters and toString methods would be here
}

Step 2: Implement Your Custom Serializer

Implement a serializer by creating a new class that implements the org.apache.kafka.common.serialization.Serializer interface.

import org.apache.kafka.common.serialization.Serializer;
import com.fasterxml.jackson.databind.ObjectMapper;

public class OrderSerializer implements Serializer<Order> {
    private final ObjectMapper objectMapper = new ObjectMapper();

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // Configuration code can go here
    }

    @Override
    public byte[] serialize(String topic, Order data) {
        try {
            return objectMapper.writeValueAsBytes(data);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public void close() {
        // Cleanup resources if necessary
    }
}

The serialize method is where the conversion takes place. Here we are using library Jackson to handle the conversion of the Order object to a JSON string, and then to a byte array.

Step 3: Implement Your Custom Deserializer

Like the serializer, create a new class that implements org.apache.kafka.common.serialization.Deserializer.

import org.apache.kafka.common.serialization.Deserializer;
import com.fasterxml.jackson.databind.ObjectMapper;

public class OrderDeserializer implements Deserializer<Order> {
    private ObjectMapper objectMapper = new ObjectMapper();

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // Configuration code can go here
    }

    @Override
    public Order deserialize(String topic, byte[] data) {
        try {
            return objectMapper.readValue(data, Order.class);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public void close() {
        // Cleanup resources if necessary
    }
}

The deserialize method takes a byte array and tries to convert it back into an Order object. Just like with the serializer, exception handling is crucial as the deserialization process can fail.

Step 4: Configure Kafka Producer and Consumer

The next step is to update your Kafka producer and consumer configuration to use your custom serializer and deserializer.

// Producer properties
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "your.package.OrderSerializer");

// Consumer properties
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "your.package.OrderDeserializer");

Replace your.package with the package name where your OrderSerializer and OrderDeserializer are located.

Testing the Custom SerDe

To ensure that your custom serializers and deserializers work as expected, you will need to write integration tests or run a Kafka cluster to test in a more realistic setting.

Conclusion

In this tutorial, you learned how to create custom serializers and deserializers in Kafka. With this knowledge, you can now handle more complex data types in your Kafka streams effectively. Don’t forget to handle serialization exceptions properly and clean up resources in the close methods to prevent memory leaks.

Next Article: Kafka: 3 ways to delete committed offsets for a consumer group

Previous Article: Kafka: How to change the number of partitions in a topic

Series: Apache Kafka Tutorials

DevOps

You May Also Like

  • How to reset Ubuntu to factory settings (4 approaches)
  • Making GET requests with cURL: A practical guide (with examples)
  • Git: What is .DS_Store and should you ignore it?
  • NGINX underscores_in_headers: Explained with examples
  • How to use Jenkins CI with private GitHub repositories
  • Terraform: Understanding State and State Files (with Examples)
  • SHA1, SHA256, and SHA512 in Terraform: A Practical Guide
  • CSRF Protection in Jenkins: An In-depth Guide (with examples)
  • Terraform: How to Merge 2 Maps
  • Terraform: How to extract filename/extension from a path
  • JSON encoding/decoding in Terraform: Explained with examples
  • Sorting Lists in Terraform: A Practical Guide
  • Terraform: How to trigger a Lambda function on resource creation
  • How to use Terraform templates
  • Understanding terraform_remote_state data source: Explained with examples
  • Jenkins Authorization: A Practical Guide (with examples)
  • Solving Jenkins Pipeline NotSerializableException: groovy.json.internal.LazyMap
  • Understanding Artifacts in Jenkins: A Practical Guide (with examples)
  • Using Jenkins with AWS EC2 and S3: A Practical Guide