How to Serialize and Deserialize Messages in Kafka

Updated: January 30, 2024 By: Guest Contributor Post a comment

Introduction

Apache Kafka is a distributed event streaming platform used extensively in modern data architectures. Kafka’s ability to handle high throughput of messages has made it indispensable for real-time analytics, data integration, and application logging. A vital but often overlooked aspect of working with Kafka is serialization and deserialization – the process to convert data structures or object states into a binary or text format that can be stored or transmitted (serialization) and later reconstructed (deserialization). This tutorial will explore various methods on how to serialize and deserialize messages in Kafka with practical code examples.

Prerequisites:

  • Basic understanding of Apache Kafka
  • Familiarity with Java programming language
  • Access to a Kafka broker for testing

Understanding Serialization Formats

In Kafka, the most common serialization formats are:

  • String Serialization
  • Byte Array Serialization
  • JSON Serialization
  • Avro Serialization
  • Custom Serialization

String and Byte Array Serialization

String Serialization in Producer

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

KafkaProducer<String, String> producer = new KafkaProducer<>(props);
producer.send(new ProducerRecord<String, String>("topic", "key", "value"));
producer.close();

Above, the KafkaProducer is configured with String serializers for both the key and value.

Byte Array Serialization in Producer

If the message consists of binary data, using byte array serialization is relevant. Here is how one may configure a producer for byte array serialization.

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");

KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props);
byte[] key = "key".getBytes();
byte[] value = "value".getBytes();
producer.send(new ProducerRecord<byte[], byte[]>("topic", key, value));
producer.close();

JSON Serialization

Using JSON libraries

For JSON serialization, you might use popular libraries like Jackson or Gson. Let’s look at an example using Jackson.

ObjectMapper objectMapper = new ObjectMapper();
String stringValue = objectMapper.writeValueAsString(myObject);

Now, the stringValue can be sent as a message to Kafka as we did earlier with String serialization.

Custom JSON Serializer

If your use case requires a custom JSON serializer, you’ll have to implement the Serializer interface provided by Kafka’s API.

public class CustomObjectSerializer implements Serializer<CustomObject> {
@Override
public void configure(Map<String, ?> configs, boolean isKey) {}

@Override
public byte[] serialize(String topic, CustomObject data) {
    try {
        return objectMapper.writeValueAsBytes(data);
    } catch (Exception e) {
        throw new SerializationException("Error serializing JSON message", e);
    }
}

@Override
public void close() {}
}

To use this serializer:

props.put("value.serializer", "com.example.CustomObjectSerializer");

Avro Serialization

The Avro format offers a compact and fast binary serialization ideal for Kafka messages. The accompanying schema helps in maintaining the contract between producer and consumer.

Using Avro

Incorporating Avro serialization into your Kafka producer looks something like this:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
props.put("schema.registry.url", "http://localhost:8081");

KafkaProducer<String, GenericRecord> producer = new KafkaProducer<>(props);
Schema.Parser parser = new Schema.Parser();

String userSchema = "{"type":"record","name":"User","fields":[{"name":"name","type":"string"}, {"name":"age","type":"int"}]}
Schema schema = parser.parse(userSchema);

GenericRecord user1 = new GenericData.Record(schema);
user1.put("name", "John Doe");
user1.put("age", 28);

producer.send(new ProducerRecord<String, GenericRecord>("users", user1);
producer.close();

Deserialization in Kafka

Deserialization in Kafka Consumers is handled much the same way as serialization in producers. For instance, here’s how you might set up a consumer using the same custom object class that was serialized.

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "test-group");
props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("value.deserializer", "com.example.CustomObjectDeserializer");

KafkaConsumer<String, CustomObject> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("topic"));

while (true) {
    ConsumerRecords<String, CustomObject> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, CustomObject> record : records) {
        System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
    }
}

Avro Deserialization

And for Avro deserialization with Confluent’s schema registry:

props.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
props.put("schema.registry.url", "http://localhost:8081");

KafkaConsumer<String, GenericRecord> consumer = new KafkaConsumer<>(props);
// the rest of the consumer setup and polling loop is the same

Advanced Serialization Techniques

For advanced use-cases, you can employ techniques like schema evolution with Avro or leverage Kafka Streams for handling serialization transparently as a part of the stream processing.

Below is a Java code example that demonstrates an advanced use-case with Kafka, specifically using Avro for schema evolution and Kafka Streams for transparent serialization within stream processing. This example assumes you have a Kafka cluster and Schema Registry set up and running.

Java Kafka Example: Avro with Kafka Streams

import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
import io.confluent.kafka.streams.serdes.avro.GenericAvroSerde;

import java.util.Properties;

public class KafkaAvroStreamsExample {
    public static void main(String[] args) {
        // Configuration for Kafka Streams
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("application.id", "kafka-avro-streams");
        props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");
        props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);

        // Create StreamBuilder
        StreamsBuilder builder = new StreamsBuilder();
        
        // Define the source of the stream
        KStream<String, GenericRecord> avroStream = builder.stream(
            "avro-topic",
            Consumed.with(Serdes.String(), new GenericAvroSerde())
        );

        // Process the stream (e.g., filtering, mapping, aggregating)
        avroStream.mapValues(value -> {
            // Process each record (e.g., transform or enrich the data)
            // ...
            return value;
        });

        // Define the topology
        KafkaStreams streams = new KafkaStreams(builder.build(), props);

        // Start processing
        streams.start();

        // Add shutdown hook for graceful closure
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}

Explanation:

  • Avro Serialization: The GenericAvroSerde class from Confluent’s Kafka Avro Serializer package is used for serialization and deserialization of Kafka records using Avro schemas. This handles schema evolution, where the schema of the data can change over time.
  • Kafka Streams: Kafka Streams API is used to define and execute the stream processing logic. It transparently handles the serialization and deserialization of data as it flows through the stream processing pipeline.
  • Configuration: The properties are set for the Kafka Streams application, including the Schema Registry URL and Avro deserialization settings.
  • Stream Processing: The KStream is used to define the source (input topic) and the processing steps (e.g., mapValues to transform each record). You can extend this to include filtering, aggregating, or joining with other streams.
  • Graceful Shutdown: A shutdown hook is added to ensure the stream processing stops gracefully.

Requirements:

  • Kafka Cluster: A running Kafka cluster is required.
  • Schema Registry: A Schema Registry service must be running and accessible.
  • Dependencies: Include Kafka Streams and Confluent Schema Registry client libraries in your project.

Note:

  • This example provides a basic framework. You’ll need to adapt and expand upon this code to fit your specific use case, especially the stream processing logic.
  • Ensure that the topic avro-topic exists in your Kafka cluster and is configured to use Avro serialization.

Conclusion

Effective serialization and deserialization strategies are crucial for the efficient transmission of messages within a Kafka cluster. Whether basic or advanced encoding is employed, understanding these mechanisms will ensure that data is not only transmitted efficiently but is also compatible between producer and consumer applications.