Overview
Apache Kafka is a powerful distributed streaming platform that is extensively used to build real-time data pipelines and streaming apps. It is highly scalable, fault-tolerant, and capable of handling trillions of events a day. One essential component of Kafka is the consumer, which reads data from Kafka topics. In this tutorial, we’ll walk through the steps to write a Kafka consumer in Python using the Confluent Kafka Python client.
Understanding Kafka Consumers
Kafka consumers read records from a Kafka cluster. These records are organized and stored in topics that are distributed over a number of partitions. Consumers subscribe to one or more topics and process the feed of records as they are produced.
Prerequisites
- An active Kafka cluster.
- Python 3.x installed.
- Confluent Kafka Python client installed. Install it using: pip install confluent-kafka
Basic Kafka Consumer Example
Below is a simple example of a Kafka consumer written in Python.
from confluent_kafka import Consumer, KafkaError

conf = {
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'mygroup',
    'auto.offset.reset': 'earliest'
}

c = Consumer(conf)

def print_assignment(consumer, partitions):
    # Called when the group assigns partitions to this consumer
    print('Assignment:', partitions)

c.subscribe(['my_topic'], on_assign=print_assignment)

while True:
    # Wait up to one second for the next message
    msg = c.poll(1.0)
    if msg is None:
        continue
    if msg.error():
        if msg.error().code() == KafkaError._PARTITION_EOF:
            # End of partition event (only delivered when
            # 'enable.partition.eof' is set to True)
            continue
        else:
            print(msg.error())
            break
    print('Received message: {}'.format(msg.value().decode('utf-8')))

# Leave the group and release resources
c.close()
The above code sets the consumer configuration, subscribes to a specified topic, and polls for messages in a loop. For each poll result it skips empty results and end-of-partition events, stops on any other error, and otherwise decodes the message value as UTF-8 and prints it.
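The decode step in the loop assumes every record carries a UTF-8 payload. A record's value can also be empty (msg.value() returns None, for example for a tombstone on a compacted topic) or contain bytes that are not valid UTF-8. Below is a minimal sketch of a more defensive helper. The name safe_decode is hypothetical and not part of the client.

```python
def safe_decode(msg, encoding="utf-8"):
    """Return the message value as text, or None if the record has no value.

    `msg` is any object with a value() method that returns bytes or None,
    such as the object returned by Consumer.poll().
    """
    raw = msg.value()
    if raw is None:
        return None
    # errors="replace" swaps undecodable bytes for U+FFFD instead of raising.
    return raw.decode(encoding, errors="replace")
```

In the loop, safe_decode(msg) could stand in for msg.value().decode('utf-8').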
Handling Messages in Batches
To reduce per-message overhead, a Kafka consumer can fetch and process messages in batches. The following example adjusts the consumer to request up to ten messages per call.
from confluent_kafka import Consumer, KafkaError

conf = {
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'mygroup',
    'auto.offset.reset': 'earliest'
}

c = Consumer(conf)
c.subscribe(['my_topic'])

try:
    while True:
        # Returns a list of up to 10 messages; the list is empty on timeout
        msgs = c.consume(num_messages=10, timeout=1.0)
        for msg in msgs:
            if msg.error():
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    # End of partition event
                    continue
                else:
                    print(msg.error())
            else:
                print('Received message: {}'.format(msg.value().decode('utf-8')))
finally:
    # Always leave the group cleanly, even after an exception
    c.close()
This example introduces the consume() method, which returns a list of messages instead of a single message. Fetching several messages per call can reduce per-call overhead compared with polling for one message at a time.
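When a batch mixes good records with error events, it can help to separate the two before processing. The following is a small sketch of that idea. The helper name split_batch is hypothetical, and it relies only on the error() and value() methods that each returned message provides.

```python
def split_batch(msgs):
    """Separate a list returned by Consumer.consume() into payloads and errors.

    Returns a tuple (values, errors): raw payloads of the good messages,
    and the error objects of the messages that carry an error.
    """
    values, errors = [], []
    for msg in msgs:
        if msg.error():
            errors.append(msg.error())
        else:
            values.append(msg.value())
    return values, errors
```

A call such as values, errors = split_batch(c.consume(num_messages=10, timeout=1.0)) would then let the application handle the whole list of payloads in one step, for instance with a single bulk write.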
Committing Offsets
Consumers can manually commit offsets, which provides precise control over when a record is considered consumed. The example below showcases how to manually commit offsets after processing.
from confluent_kafka import Consumer

conf = {
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'mygroup',
    'auto.offset.reset': 'earliest',
    # Disable automatic commits so the application decides when to commit
    'enable.auto.commit': False,
}

c = Consumer(conf)
c.subscribe(['my_topic'])

try:
    while True:
        msg = c.poll(timeout=1.0)
        if msg is None:
            continue
        if msg.error():
            print('Error:', msg.error())
            continue
        print('Processing', msg.value().decode('utf-8'))
        # Commit this message's offset only after it has been processed
        c.commit(message=msg)
finally:
    c.close()
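Here enable.auto.commit is set to False, and c.commit() commits the offset of each message after it has been processed, giving full control over when a message is acknowledged as processed. By default commit() is asynchronous; passing asynchronous=False makes the call block until the commit succeeds or fails.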
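Committing after every single message adds a round trip per record. A common compromise is to commit once every N processed messages, accepting that up to N records may be redelivered after a crash. The sketch below illustrates this under some assumptions: the function name, the handle callback, and the commit_every and max_messages parameters are all hypothetical, and the consumer is expected to be configured with enable.auto.commit set to False. Calling commit() without a message commits the current offsets of all assigned partitions.

```python
def consume_with_periodic_commit(consumer, handle, commit_every=100, max_messages=None):
    """Process messages with handle(msg) and commit synchronously every N messages.

    Stops after max_messages processed messages (runs forever if None).
    Returns the number of messages processed.
    """
    processed = 0
    uncommitted = 0
    while max_messages is None or processed < max_messages:
        msg = consumer.poll(1.0)
        if msg is None:
            continue
        if msg.error():
            print('Error:', msg.error())
            continue
        # If handle() raises, nothing further is committed, so the records
        # since the last commit are delivered again after a restart.
        handle(msg)
        processed += 1
        uncommitted += 1
        if uncommitted >= commit_every:
            consumer.commit(asynchronous=False)
            uncommitted = 0
    # Commit whatever is left once the loop ends normally.
    if uncommitted:
        consumer.commit(asynchronous=False)
    return processed
```

This gives at-least-once behavior, so the handler should tolerate seeing the same record twice.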
Advanced Consumer Features
Complex applications may require more sophisticated consumers for failover, rebalancing, and offset management. The Confluent Kafka Python client supports these cases through rebalance callbacks and additional configuration properties.
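As an illustration of the kind of configuration involved, the sketch below extends the earlier configuration dictionary with a few standard client properties. The values are illustrative assumptions, not recommendations, and should be tuned for the workload.

```python
conf = {
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'mygroup',
    'auto.offset.reset': 'earliest',
    # Commit offsets from application code instead of automatically.
    'enable.auto.commit': False,
    # Time without heartbeats after which the group considers this
    # consumer dead and rebalances its partitions (illustrative value).
    'session.timeout.ms': 45000,
    # Maximum time allowed between poll() calls before the consumer
    # is considered failed and leaves the group (illustrative value).
    'max.poll.interval.ms': 300000,
    # Emit an end-of-partition event when a partition is fully read.
    # It is off by default, so checks for KafkaError._PARTITION_EOF
    # only ever match when this is enabled.
    'enable.partition.eof': True,
}
```

A dictionary like this is passed to Consumer(conf) in the same way as in the earlier examples.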
Rebalance Listeners
Here’s how you can register rebalance callbacks so that your code runs whenever partitions are assigned to, or revoked from, the consumer.
from confluent_kafka import Consumer, KafkaError, KafkaException, OFFSET_BEGINNING

conf = {
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'group1',
    'enable.auto.commit': False,
}

def on_assign(consumer, partitions):
    for p in partitions:
        # Reset the offset to the beginning if needed. This runs on every
        # rebalance, so records will be reprocessed each time.
        p.offset = OFFSET_BEGINNING
    consumer.assign(partitions)

def on_revoke(consumer, partitions):
    # Give up the partitions that are being taken away
    consumer.unassign()

consumer = Consumer(conf)
consumer.subscribe(['my_topic'], on_assign=on_assign, on_revoke=on_revoke)

try:
    while True:
        msg = consumer.poll(1.0)
        if msg is None:
            continue
        if msg.error():
            if msg.error().code() == KafkaError._PARTITION_EOF:
                continue
            # Any other error is treated as fatal
            raise KafkaException(msg.error())
        print('Received message:', msg.value().decode('utf-8'))
        consumer.commit(message=msg)
finally:
    consumer.close()
The rebalance listener functions are on_assign and on_revoke, which enable custom handling of group rebalances. This is useful in scenarios where you want precise control when partitions are assigned or revoked.
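Rebalance callbacks do not have to change the assignment. A lighter use is simply to log what the group is doing. The sketch below assumes only that each partition object exposes topic, partition, and offset attributes, as the client's TopicPartition does. The names format_partitions, log_assign, and log_revoke are hypothetical.

```python
def format_partitions(partitions):
    """Render partitions as 'topic[partition]@offset' strings for logging."""
    return ', '.join(
        '{}[{}]@{}'.format(p.topic, p.partition, p.offset) for p in partitions
    )

def log_assign(consumer, partitions):
    # Log only; the callback does not call assign() itself.
    print('Assigned:', format_partitions(partitions))

def log_revoke(consumer, partitions):
    print('Revoked:', format_partitions(partitions))
```

These would be registered with consumer.subscribe(['my_topic'], on_assign=log_assign, on_revoke=log_revoke). Because neither callback calls assign() or unassign(), the client should fall back to its default handling of the assignment.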
Conclusion
Writing a Kafka consumer in Python is straightforward with the Confluent Kafka client. The above examples ranged from basic to advanced usage, illustrating how you can consume messages, commit offsets, process messages in batches, and handle group rebalances. By customizing these examples to fit your specific use case, you can leverage Kafka for powerful streaming and processing of real-time data.