How to Write a Kafka Consumer in Python

Updated: January 31, 2024 By: Guest Contributor

Overview

Apache Kafka is a powerful distributed streaming platform that is extensively used to build real-time data pipelines and streaming apps. It is highly scalable, fault-tolerant, and capable of handling trillions of events a day. One essential component of Kafka is the consumer, which reads data from Kafka topics. In this tutorial, we’ll walk through the steps to write a Kafka consumer in Python using the Confluent Kafka Python client.

Understanding Kafka Consumers

Kafka consumers read records from a Kafka cluster. Records are organized into topics, and each topic is split across a number of partitions. Consumers subscribe to one or more topics and process the feed of records as they are produced.

Prerequisites

  • An active Kafka cluster.
  • Python 3.x installed.
  • Confluent Kafka Python client installed. Install it using pip install confluent-kafka.
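
The examples below assume a topic named my_topic already exists and contains some data. If you need test records, here is a minimal producer sketch using the same client; the broker address and topic name are assumptions that match the consumer examples:

from confluent_kafka import Producer

p = Producer({'bootstrap.servers': 'localhost:9092'})

for i in range(5):
    # produce() is asynchronous; records are buffered and sent in the background
    p.produce('my_topic', value='hello {}'.format(i).encode('utf-8'))

# flush() blocks until all buffered records are delivered (or fail)
p.flush()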

Basic Kafka Consumer Example

Below is a simple example of a Kafka consumer written in Python.

from confluent_kafka import Consumer, KafkaError

conf = {
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'mygroup',
    'auto.offset.reset': 'earliest'
}

c = Consumer(conf)

def print_assignment(consumer, partitions):
    # Called once the group coordinator has assigned partitions to this consumer
    print('Assignment:', partitions)

c.subscribe(['my_topic'], on_assign=print_assignment)

try:
    while True:
        msg = c.poll(1.0)

        if msg is None:
            continue
        if msg.error():
            if msg.error().code() == KafkaError._PARTITION_EOF:
                # End of partition event
                continue
            else:
                print(msg.error())
                break

        print('Received message: {}'.format(msg.value().decode('utf-8')))
finally:
    c.close()

The above code sets the consumer configuration, subscribes to a specified topic, and listens for messages in a loop. Each message is decoded to a printable string, errors and end-of-partition events are handled, and the try/finally block ensures the consumer is closed cleanly when the loop exits.
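
One caveat: in recent versions of the client, end-of-partition events are disabled by default, so the _PARTITION_EOF branch above will not fire unless you opt in via the enable.partition.eof setting. A sketch of the adjusted configuration:

conf = {
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'mygroup',
    'auto.offset.reset': 'earliest',
    # Emit an event whenever the consumer reaches the end of a partition
    'enable.partition.eof': True
}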

Handling Messages in Batches

To improve throughput, Kafka consumers can also fetch messages in batches. The following example adjusts the consumer to process messages in batches.

from confluent_kafka import Consumer, KafkaError

conf = {
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'mygroup',
    'auto.offset.reset': 'earliest'
}

c = Consumer(conf)
c.subscribe(['my_topic'])

try:
    while True:
        msgs = c.consume(num_messages=10, timeout=1.0)
        for msg in msgs:
            if msg.error():
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    # End of partition event
                    continue
                else:
                    print(msg.error())
            else:
                print('Received message: {}'.format(msg.value().decode('utf-8')))
finally:
    c.close()

This example introduces the consume() method to fetch messages in batches. This is usually more efficient than polling for one message at a time.
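
A common refinement, combining batching with the manual offset commits covered in the next section, is to commit once per batch instead of once per message. A sketch, under the same broker and topic assumptions:

from confluent_kafka import Consumer

conf = {
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'mygroup',
    'auto.offset.reset': 'earliest',
    'enable.auto.commit': False,
}

c = Consumer(conf)
c.subscribe(['my_topic'])

try:
    while True:
        msgs = c.consume(num_messages=100, timeout=1.0)
        if not msgs:
            continue
        for msg in msgs:
            if msg.error():
                print(msg.error())
                continue
            print('Received message: {}'.format(msg.value().decode('utf-8')))
        # One synchronous commit per batch rather than one per message
        c.commit(asynchronous=False)
finally:
    c.close()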

Committing Offsets

Consumers can manually commit offsets, which provides precise control over when a record is considered consumed. The example below showcases how to manually commit offsets after processing.

from confluent_kafka import Consumer, KafkaError

conf = {
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'mygroup',
    'auto.offset.reset': 'earliest',
    'enable.auto.commit': False,
}

c = Consumer(conf)
c.subscribe(['my_topic'])

try:
    while True:
        msg = c.poll(timeout=1.0)

        if msg is None:
            continue

        if msg.error():
            print('Error:', msg.error())
            continue

        print('Processing', msg.value().decode('utf-8'))
        c.commit(message=msg)
finally:
    c.close()

With enable.auto.commit set to False, c.commit() is used to commit the offset of each message after it has been processed, giving you full control over when a message is acknowledged as consumed.
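
Note that commit() is asynchronous by default, so the call returns before the broker has acknowledged the commit. When you need the stronger guarantee that an offset is durably stored before processing continues, you can pass asynchronous=False, at the cost of a blocking round trip per commit:

# Blocks until the broker confirms the commit; raises an exception on failure
c.commit(message=msg, asynchronous=False)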

Advanced Consumer Features

Complex applications may require more sophisticated consumers for failover, rebalancing, and offset management. Kafka consumers in Python support additional features and configurations.
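
For example, offsets can be committed as explicit TopicPartition entries rather than whole messages, which is useful when you track processing positions yourself. The sketch below reworks the manual-commit loop this way; note that the committed offset is the offset of the next message to consume, hence the + 1:

from confluent_kafka import Consumer, TopicPartition

c = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'mygroup',
    'auto.offset.reset': 'earliest',
    'enable.auto.commit': False,
})
c.subscribe(['my_topic'])

try:
    while True:
        msg = c.poll(1.0)
        if msg is None:
            continue
        if msg.error():
            print(msg.error())
            continue
        print('Processing', msg.value().decode('utf-8'))
        # Commit the position of the next message to consume
        tp = TopicPartition(msg.topic(), msg.partition(), msg.offset() + 1)
        c.commit(offsets=[tp], asynchronous=False)
finally:
    c.close()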

Rebalance Listeners

Here’s how you can use a rebalance listener for processing messages.

from confluent_kafka import Consumer, KafkaError, KafkaException, OFFSET_BEGINNING

conf = {
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'group1',
    'enable.auto.commit': False,
}

def on_assign(consumer, partitions):
    for p in partitions:
        # Reset the offset to the beginning if needed
        p.offset = OFFSET_BEGINNING
    consumer.assign(partitions)

def on_revoke(consumer, partitions):
    # Give up the current assignment before the rebalance completes
    consumer.unassign()

consumer = Consumer(conf)
consumer.subscribe(['my_topic'], on_assign=on_assign, on_revoke=on_revoke)

try:
    while True:
        msg = consumer.poll(1.0)

        if msg is None:
            continue
        if msg.error():
            if msg.error().code() == KafkaError._PARTITION_EOF:
                # End of partition event
                continue
            raise KafkaException(msg.error())
        else:
            print('Received message:', msg.value().decode('utf-8'))
            consumer.commit(message=msg)
finally:
    consumer.close()

The rebalance listener callbacks are on_assign and on_revoke, which enable custom handling of group rebalances. This is useful in scenarios where you need precise control over what happens when partitions are assigned to, or revoked from, the consumer.
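
A practical use of on_revoke, sketched below under the same manual-commit configuration, is to commit processed offsets synchronously before the partitions move to another consumer in the group, which helps the new owner avoid reprocessing messages you have already handled:

def on_revoke(consumer, partitions):
    try:
        # Flush processed offsets before ownership of the partitions changes
        consumer.commit(asynchronous=False)
    except Exception as e:
        # Nothing consumed yet, or the commit failed; log and carry on
        print('Commit on revoke failed:', e)
    consumer.unassign()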

Conclusion

Writing a Kafka consumer in Python is straightforward with the Confluent Kafka client. The above examples ranged from basic to advanced usage, illustrating how you can consume messages, commit offsets, process messages in batches, and handle group rebalances. By customizing these examples to fit your specific use case, you can leverage Kafka for powerful streaming and processing of real-time data.