Kafka: How to add events to a topic with a timestamp

Updated: January 30, 2024 By: Guest Contributor

Introduction

Apache Kafka has become the backbone of many modern data-driven applications thanks to its ability to handle high-throughput, fault-tolerant distributed streaming. Timestamps play a crucial role in Kafka messages, serving purposes such as event-time processing, log retention, and message ordering. In this tutorial, we’ll dive into Kafka and learn how to add events to a topic with a timestamp, walking through basic to advanced examples.

Understanding Kafka Timestamps

Each event in Kafka, also referred to as a message, can carry a timestamp. There are two types of timestamps in Kafka:

  • CreateTime: the timestamp assigned by the producer when the event is created.
  • LogAppendTime: the timestamp assigned by the broker when the event is appended to the Kafka log.

By default, Kafka uses the producer-supplied timestamp (CreateTime), but this behavior can be configured at the broker or topic level.

Setting up Your Kafka Environment

Before we add events with timestamps, make sure you have Kafka installed and a topic created. If you don’t have a running Kafka cluster, you can quickly set one up using Docker or by downloading Kafka from the official Apache website. For this tutorial, we will assume that you have Kafka running and a topic named ‘events-topic’.
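
If the topic does not exist yet, you can create it with the kafka-topics.sh script that ships with Kafka (adjust the bootstrap server, partition count, and replication factor to match your setup):

bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic events-topic --partitions 1 --replication-factor 1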

Adding Events to a Topic with a Basic Producer

First, we’ll start by producing events using a simple Kafka producer. You can develop a Kafka producer in several languages. Here’s a basic Java example using the Kafka client library:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;

public class BasicTimestampProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            ProducerRecord<String, String> record = new ProducerRecord<>("events-topic", "key", "value");
            producer.send(record);
        }
    }
}

In the code above, we’re sending a simple record without explicitly setting a timestamp, so it defaults to the current time generated by the producer. The key-value pair is “key”:”value” for this example.

Explicitly Adding a Timestamp

Now, let’s advance our Kafka producer by setting an explicit timestamp.

long explicitTimestamp = System.currentTimeMillis();
ProducerRecord<String, String> recordWithTimestamp = new ProducerRecord<>("events-topic", null, explicitTimestamp, "key", "value with timestamp");
producer.send(recordWithTimestamp);

In this modified producer example, we’ve added a timestamp by using ‘System.currentTimeMillis()’ to get the current time in milliseconds since epoch. The ‘null’ before the timestamp argument represents the partition, which we’re allowing Kafka to choose automatically.

Custom Timestamp Extraction

If your events have embedded timestamps within the payload, you might want to extract and use those instead. Here’s an example:

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

// Assume `eventJson` is a JSON string with an embedded "timestamp" field.
// Note: readTree throws the checked JsonProcessingException, so handle or declare it.
JsonNode jsonNode = new ObjectMapper().readTree(eventJson);
long eventTimestamp = jsonNode.get("timestamp").asLong();

ProducerRecord<String, String> customTimestampRecord = new ProducerRecord<>("events-topic", null, eventTimestamp, "key", eventJson);
producer.send(customTimestampRecord);

This code snippet uses Jackson, a popular JSON library in Java, to parse the JSON string and extract the timestamp from the event payload. We then use this extracted timestamp to create our ‘ProducerRecord’.

Producer Interceptors for Timestamp Logic

For more complex timestamp logic, you can implement a custom ‘ProducerInterceptor’. Here is a simple interceptor example:

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Map;

public class TimestampInterceptor implements ProducerInterceptor<String, String> {

    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        // Replace the record's timestamp with the current system time, keeping topic,
        // partition, key, value and headers intact.
        long now = System.currentTimeMillis();
        return new ProducerRecord<>(record.topic(), record.partition(), now, record.key(), record.value(), record.headers());
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        // No-op: invoked when the broker acknowledges the record or an error occurs.
    }

    @Override
    public void close() {
        // No-op: release interceptor resources here if needed.
    }

    @Override
    public void configure(Map<String, ?> configs) {
        // No-op: read interceptor-specific configuration here if needed.
    }
}

Configure your producer to use this interceptor by adding the following line to your ‘props’:

props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, "com.example.TimestampInterceptor");

The interceptor overrides the record’s timestamp with the current system time. Other complex logic can also be added in the ‘onSend’ method to manipulate the record before it is sent to the topic.

Handling Timestamps on the Consumer Side

After you have successfully sent events with timestamps, it’s important to understand how to retrieve and use these timestamps on the consumer side:

consumerRecord.timestamp()

This line of code will retrieve the timestamp from a Kafka ‘ConsumerRecord’.
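
For instance, a minimal consumer loop (assuming the same ‘events-topic’ and a local broker) can print each record’s timestamp together with its timestamp type, which tells you whether the value is CreateTime or LogAppendTime:

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class TimestampConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "timestamp-demo");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("events-topic"));
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
            for (ConsumerRecord<String, String> record : records) {
                // timestamp() returns epoch milliseconds; timestampType() reports CreateTime or LogAppendTime
                System.out.printf("value=%s timestamp=%d type=%s%n",
                        record.value(), record.timestamp(), record.timestampType());
            }
        }
    }
}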

Choosing Timestamp Types at the Broker Level

To configure timestamps at the broker level, alter the Kafka server properties file:

log.message.timestamp.type=LogAppendTime

This line in ‘server.properties’ switches the default timestamp type from ‘CreateTime’ to ‘LogAppendTime’ for every topic in the Kafka cluster that does not override it.

Setting the Timestamp Type for a Specific Topic

If you only want to change the timestamp type for a specific topic, you can use the Kafka command-line tool:

bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter --entity-type topics --entity-name events-topic --add-config message.timestamp.type=LogAppendTime

This command adds a topic-level configuration to use ‘LogAppendTime’ timestamps.
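
You can confirm the override with the same tool:

bin/kafka-configs.sh --bootstrap-server localhost:9092 --describe --entity-type topics --entity-name events-topic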

Testing

It is equally important to verify that your producer adds timestamps correctly. Write unit or integration tests that check your producer’s behavior, especially when you implement custom timestamp logic, as sketched below.
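
As a minimal sketch, a unit test built on Kafka’s MockProducer (combined here with JUnit 5, which is an assumption about your test setup) can assert that the timestamp you set actually ends up on the record:

import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;

public class TimestampProducerTest {

    @Test
    void producerKeepsExplicitTimestamp() {
        // MockProducer records sends in memory instead of talking to a broker.
        MockProducer<String, String> producer =
                new MockProducer<>(true, new StringSerializer(), new StringSerializer());

        long explicitTimestamp = 1706572800000L; // fixed epoch millis for a deterministic test
        producer.send(new ProducerRecord<>("events-topic", null, explicitTimestamp, "key", "value"));

        ProducerRecord<String, String> sent = producer.history().get(0);
        assertEquals(explicitTimestamp, (long) sent.timestamp());
    }
}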

Conclusion

Incorporating timestamps into Kafka events allows for sophisticated event time processing, log retention policies, and message ordering strategies. Whether you’re using default producer timestamps, extracting them from event payloads, or utilizing interceptors, Kafka provides flexibility to accommodate these varying requirements.