Understand and Apply Exactly-Once Semantics in Kafka (with Examples)

Updated: January 31, 2024 By: Guest Contributor

Overview

Apache Kafka has become a cornerstone in the world of stream processing and event-driven systems. Ensuring data integrity and consistency across distributed applications is crucial, and that’s where exactly-once semantics (EOS) come into play. Kafka supports three delivery semantics: at-most-once, at-least-once, and exactly-once. At-most-once means messages may be lost but are never redelivered. At-least-once ensures that messages are never lost but may be processed more than once. Exactly-once is the gold standard: even when producers retry or applications restart, the effect of each message is recorded exactly once, which eliminates duplicates. This guarantee covers reading from and writing to Kafka topics; side effects in external systems are outside its scope.
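The difference between these guarantees comes down to when progress is recorded relative to processing. The plain-Java model below is a hypothetical illustration with no Kafka code in it: a consumer reads a five-record partition, crashes once at offset 2, and then resumes from its last committed offset. Class and method names are invented for the example.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model: one partition, one consumer that crashes exactly once. */
class DeliverySemanticsDemo {

    /**
     * Reads offsets 0..size-1. At crashOffset the consumer crashes once between
     * its "commit" and "process" steps, then restarts from the committed offset.
     * Returns every offset that was processed, in order.
     */
    static List<Integer> run(int size, int crashOffset, boolean commitBeforeProcessing) {
        List<Integer> processed = new ArrayList<>();
        int committed = 0;      // offset a restarted consumer resumes from
        int position = 0;       // next offset to read
        boolean crashed = false;
        while (position < size) {
            int offset = position;
            boolean crashNow = offset == crashOffset && !crashed;
            if (commitBeforeProcessing) {
                committed = offset + 1;          // commit first
                if (crashNow) {                  // crash before processing: record is lost
                    crashed = true;
                    position = committed;
                    continue;
                }
                processed.add(offset);
            } else {
                processed.add(offset);           // process first
                if (crashNow) {                  // crash before committing: record is replayed
                    crashed = true;
                    position = committed;
                    continue;
                }
                committed = offset + 1;
            }
            position = offset + 1;
        }
        return processed;
    }

    public static void main(String[] args) {
        System.out.println("commit, then process: " + run(5, 2, true));   // at-most-once
        System.out.println("process, then commit: " + run(5, 2, false));  // at-least-once
    }
}
```

Running `main` prints `[0, 1, 3, 4]` for commit-then-process (offset 2 is lost) and `[0, 1, 2, 2, 3, 4]` for process-then-commit (offset 2 is processed twice). Avoiding both outcomes requires recording the processing result and the progress atomically, which is the role Kafka transactions play.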

The Basics of Exactly-Once Semantics

Exactly-once semantics can be achieved in Kafka by combining idempotent producers and the transactional API. With an idempotent producer, the broker discards duplicate writes caused by producer retries, so a retry never produces a second copy. The transactional API lets a producer write a group of messages, possibly spanning multiple topic-partitions, that are either all committed or all aborted; consumers configured with isolation.level=read_committed then see only committed messages.

Configuring Idempotent Producers

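import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// Idempotence requires acks=all. Both are the defaults since Kafka 3.0,
// but setting them explicitly documents the intent.
props.put("acks", "all");
props.put("enable.idempotence", "true");

KafkaProducer<String, String> producer = new KafkaProducer<>(props);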

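With idempotence enabled, the producer is assigned a producer ID (PID) and attaches a sequence number to each batch it sends to a partition. The broker tracks the last sequence number per producer and partition and discards any batch it has already written, so a retry after a lost acknowledgement does not create a duplicate.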
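The broker-side check can be pictured with the simplified model below. It is a hypothetical sketch, not Kafka's implementation: the real broker works on record batches and keeps its state per producer ID and partition, whereas this model tracks single records in one partition and remembers only the last sequence number per producer.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical, simplified model of idempotent appends to a single partition. */
class IdempotentPartitionLog {
    private final List<String> log = new ArrayList<>();
    // Highest sequence number written so far, per producer ID.
    private final Map<Long, Integer> lastSequence = new HashMap<>();

    /** Appends the record unless it is a duplicate; returns true if it was written. */
    boolean append(long producerId, int sequence, String value) {
        int last = lastSequence.getOrDefault(producerId, -1);
        if (sequence <= last) {
            return false;                 // already written: a retry, so drop it
        }
        if (sequence != last + 1) {       // a gap means a record went missing
            throw new IllegalStateException("out-of-order sequence " + sequence);
        }
        log.add(value);
        lastSequence.put(producerId, sequence);
        return true;
    }

    List<String> contents() {
        return List.copyOf(log);
    }

    public static void main(String[] args) {
        IdempotentPartitionLog partition = new IdempotentPartitionLog();
        partition.append(42L, 0, "order-1");
        partition.append(42L, 1, "order-2");
        // The acknowledgement for sequence 1 was lost, so the producer resends it.
        boolean written = partition.append(42L, 1, "order-2");
        System.out.println(written + " " + partition.contents());
    }
}
```

Here `main` prints `false [order-1, order-2]`: the resent record is recognized and dropped. In real Kafka, a gap in the sequence is reported back to the producer as an OutOfOrderSequenceException.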

Transactional API with Exactly-Once Semantics

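import java.util.Map;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;

// initTransactions() requires "transactional.id" in the producer configuration,
// e.g. props.put("transactional.id", "my-transactional-id");
producer.initTransactions();
try {
    producer.beginTransaction();
    for (Map.Entry<String, String> entry : keysAndValues.entrySet()) {
        producer.send(new ProducerRecord<>(topic, entry.getKey(), entry.getValue()));
    }
    producer.commitTransaction();
} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
    // Fatal: this producer cannot recover and must not abort; the finally block closes it.
} catch (KafkaException e) {
    // Recoverable: abort so none of this transaction's records become visible.
    producer.abortTransaction();
} finally {
    producer.close();
}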

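This code snippet writes a batch of messages in a single transaction, so either all or none of them become visible to consumers reading with read_committed isolation. Errors fall into two groups. ProducerFencedException, OutOfOrderSequenceException, and AuthorizationException are fatal: the producer cannot recover from them, so it should be closed rather than used to abort the transaction. Any other KafkaException is recoverable: abort the transaction and, if desired, retry it.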
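Why "all or none" holds for consumers can be seen in a toy model of a partition that contains both transactional records and commit/abort markers. This is a hypothetical simplification (it assumes Java 16+ for the record type and ignores how the real consumer tracks open transactions); it only shows that a read_committed reader returns the records of committed transactions and skips those of aborted ones.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical toy model of one partition holding transactional records and markers. */
class TransactionalLogModel {
    enum Kind { DATA, COMMIT, ABORT }

    /** A data record of transaction txnId, or a COMMIT/ABORT marker for it. */
    record Entry(String txnId, Kind kind, String value) {}

    /** Returns the data values a consumer would see with the given isolation level. */
    static List<String> read(List<Entry> log, boolean readCommitted) {
        Set<String> committed = new HashSet<>();
        for (Entry e : log) {
            if (e.kind() == Kind.COMMIT) {
                committed.add(e.txnId());
            }
        }
        List<String> visible = new ArrayList<>();
        for (Entry e : log) {
            if (e.kind() != Kind.DATA) {
                continue;                               // markers are never returned
            }
            if (!readCommitted || committed.contains(e.txnId())) {
                visible.add(e.value());
            }
        }
        return visible;
    }

    public static void main(String[] args) {
        List<Entry> log = List.of(
                new Entry("t1", Kind.DATA, "a"),
                new Entry("t2", Kind.DATA, "x"),
                new Entry("t1", Kind.DATA, "b"),
                new Entry("t1", Kind.COMMIT, null),
                new Entry("t2", Kind.ABORT, null));
        System.out.println("read_uncommitted: " + read(log, false));
        System.out.println("read_committed:   " + read(log, true));
    }
}
```

With interleaved transactions `t1` (committed) and `t2` (aborted), the uncommitted reader sees `[a, x, b]` while the committed reader sees only `[a, b]`.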

Consuming Messages with Exactly-Once Semantics

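Consumers also play an important role in exactly-once semantics: after a failure, they must not re-process input whose results were already written. In a read-process-write application, this is achieved by committing the consumer's offsets in the same transaction as the output messages produced from them.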

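import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

Properties consumerProps = new Properties();
consumerProps.put("bootstrap.servers", "localhost:9092");
consumerProps.put("group.id", "test-group");
consumerProps.put("enable.auto.commit", "false");        // commit offsets explicitly
consumerProps.put("auto.offset.reset", "earliest");
consumerProps.put("isolation.level", "read_committed");  // skip records from aborted transactions
consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
consumer.subscribe(Collections.singletonList(topic));

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        // Process the record
    }
    // Committing after processing is at-least-once: a crash before this
    // call returns causes the batch to be delivered and processed again.
    consumer.commitSync();
}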

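This consumer disables auto-commit and commits offsets manually after each batch is processed, so no record is skipped. On its own, however, this gives at-least-once delivery, not exactly-once: if the application fails after processing a batch but before commitSync() completes, the batch is delivered again after a restart. Consumers that read transactional output should also set isolation.level to read_committed (the default is read_uncommitted) so that records from aborted transactions are never returned. For exactly-once in a read-process-write application, the offsets must be committed through the producer's transaction rather than with commitSync().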
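Storing offsets in the same transaction as the processed messages can be sketched as the consume-transform-produce loop below. It assumes Kafka clients 2.5 or later (for `sendOffsetsToTransaction` with `consumer.groupMetadata()`); the topic names, the `transactional.id` value, and the upper-casing "processing" step are hypothetical placeholders, and rebalance handling is omitted for brevity.

```java
import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

/** Sketch of a consume-transform-produce loop; topic names and IDs are hypothetical. */
class ExactlyOncePipeline {

    /** Offsets to commit: one past the last record processed in each partition. */
    static Map<TopicPartition, OffsetAndMetadata> nextOffsets(ConsumerRecords<String, String> records) {
        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
        for (TopicPartition tp : records.partitions()) {
            List<ConsumerRecord<String, String>> batch = records.records(tp);
            offsets.put(tp, new OffsetAndMetadata(batch.get(batch.size() - 1).offset() + 1));
        }
        return offsets;
    }

    public static void main(String[] args) {
        Properties cp = new Properties();
        cp.put("bootstrap.servers", "localhost:9092");
        cp.put("group.id", "test-group");
        cp.put("enable.auto.commit", "false");           // offsets are committed via the producer
        cp.put("isolation.level", "read_committed");
        cp.put("key.deserializer", StringDeserializer.class.getName());
        cp.put("value.deserializer", StringDeserializer.class.getName());

        Properties pp = new Properties();
        pp.put("bootstrap.servers", "localhost:9092");
        pp.put("transactional.id", "pipeline-1");         // hypothetical ID
        pp.put("key.serializer", StringSerializer.class.getName());
        pp.put("value.serializer", StringSerializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(cp);
             KafkaProducer<String, String> producer = new KafkaProducer<>(pp)) {
            consumer.subscribe(List.of("input-topic"));
            producer.initTransactions();
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                if (records.isEmpty()) {
                    continue;
                }
                producer.beginTransaction();
                try {
                    for (ConsumerRecord<String, String> r : records) {
                        String out = r.value() == null ? null : r.value().toUpperCase();
                        producer.send(new ProducerRecord<>("output-topic", r.key(), out));
                    }
                    // Output records and input offsets commit (or abort) together.
                    producer.sendOffsetsToTransaction(nextOffsets(records), consumer.groupMetadata());
                    producer.commitTransaction();
                } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
                    throw e;                                  // fatal: try-with-resources closes both clients
                } catch (KafkaException e) {
                    producer.abortTransaction();
                    // Rewind so the aborted batch is read and processed again.
                    for (TopicPartition tp : records.partitions()) {
                        consumer.seek(tp, records.records(tp).get(0).offset());
                    }
                }
            }
        }
    }
}
```

The key design choice is that the consumer never commits on its own: the offsets travel inside the producer's transaction, so a crash either exposes both the output and the new offsets or neither.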

Exactly-Once Semantics in Kafka Streams

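Kafka Streams, a client library for building applications and microservices, has native support for exactly-once semantics. Setting the processing.guarantee property is enough for Streams to apply exactly-once processing to its reads, state updates, and writes within Kafka, with no transaction code in the application. On Kafka 3.0 and later, use exactly_once_v2 (which requires brokers on version 2.5 or newer); the original exactly_once value is deprecated.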

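import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;

Properties streamsProps = new Properties();
streamsProps.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
streamsProps.put(StreamsConfig.APPLICATION_ID_CONFIG, "stream-application");
// EXACTLY_ONCE_V2 ("exactly_once_v2", Kafka 3.0+) replaces the deprecated "exactly_once"
streamsProps.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);

StreamsBuilder builder = new StreamsBuilder();
// Define the stream processing topology
KafkaStreams streams = new KafkaStreams(builder.build(), streamsProps);
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
streams.start();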

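With this setting, Streams manages transactions internally: output records, state-store changelog updates, and consumed offsets are committed atomically. Exactly-once processing also lowers the default commit.interval.ms from 30000 ms to 100 ms, so results become visible to read_committed consumers sooner, at the cost of more frequent transaction commits.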
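A complete, if trivial, exactly-once Streams application might look like the sketch below. It assumes Kafka Streams 3.0 or later (for `StreamsConfig.EXACTLY_ONCE_V2`); the topic names and the upper-casing step are hypothetical. Serdes are passed explicitly through `Consumed` and `Produced`, so no default serde configuration is needed, and the topology itself contains nothing transaction-specific.

```java
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;

/** Sketch of an exactly-once Streams app; topic names are hypothetical. */
class UppercaseApp {

    static Properties config() {
        Properties props = new Properties();
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "stream-application");
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
        return props;
    }

    static Topology topology() {
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()))
               .mapValues(v -> v == null ? null : v.toUpperCase())
               .to("output-topic", Produced.with(Serdes.String(), Serdes.String()));
        return builder.build();
    }

    public static void main(String[] args) {
        KafkaStreams streams = new KafkaStreams(topology(), config());
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
        streams.start();
    }
}
```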

Advanced Concepts

Exactly-once semantics become more complicated in multi-region Kafka deployments or with cross-cluster replication, because Kafka transactions are scoped to a single cluster. Here, we might encounter additional challenges like time synchronization, network reliability, and transaction timeout configuration, all of which need to be handled with care.
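For the timeout part, the relevant producer setting is `transaction.timeout.ms` (default 60000 ms): the transaction coordinator proactively aborts a transaction left open longer than this, and the value must not exceed the broker's `transaction.max.timeout.ms` (default 900000 ms). The helper below is a hypothetical sketch showing where it is set; the 30-second value is an arbitrary example, not a recommendation.

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

/** Hypothetical helper: transactional producer settings with an explicit timeout. */
class TransactionTimeoutConfig {

    static Properties producerProps(String transactionalId) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId);
        // The coordinator aborts a transaction left open longer than this.
        // It must not exceed the broker's transaction.max.timeout.ms.
        props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 30_000);   // example value
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        return props;
    }

    public static void main(String[] args) {
        System.out.println(producerProps("region-a-producer-1"));
    }
}
```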

Conclusion

Exactly-once semantics in Kafka enable reliable data delivery: the effect of each message is recorded exactly once within Kafka, preventing duplicates and keeping data consistent. While the implementation requires careful configuration and management of Kafka producers, consumers, and Streams applications, the resulting data integrity is often well worth the effort for critical systems relying on accurate real-time information.