Kafka: How to Write a Simple Producer in Python

Updated: January 30, 2024 By: Guest Contributor Post a comment

Introduction

Apache Kafka is a powerful distributed streaming platform that enables you to build real-time data pipelines and streaming applications. Kafka is widely used for building real-time streaming data pipelines that reliably get data between systems or applications. In this tutorial, we’ll focus on how Kafka can be interfaced using Python to write a simple producer that sends messages to a Kafka topic.

Before we dive into the code examples, make sure you have the following prerequisites installed:

  • Python 3.x
  • Apache Kafka
  • kafka-python package (Install it via pip with pip install kafka-python)

Setting Up Apache Kafka

Before writing our producer, we need to set up Kafka locally. Follow these steps:

1. Start the ZooKeeper service:

bin/zookeeper-server-start.sh config/zookeeper.properties

2. Start the Kafka broker:

bin/kafka-server-start.sh config/server.properties

Once both services are up and running, we can proceed to create a topic. Open another terminal and run:

bin/kafka-topics.sh --create --partitions 1 --replication-factor 1 --topic test-topic --bootstrap-server localhost:9092

Now that we have a Kafka topic, let’s start writing our Python producer.

Creating a Simple Kafka Producer

The producer is responsible for creating and sending messages to a Kafka topic. Below is a basic producer script:

from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers='localhost:9092')
producer.send('test-topic', b'Hello, Kafka!')
producer.flush()
producer.close()

In this example:

  • We import KafkaProducer from the kafka-python package.
  • We create a producer object that connects to the local Kafka instance.
  • We send a simple byte string message to our ‘test-topic’.
  • We call flush() to ensure all messages have been sent.
  • We call close() to free up resources.

The b'Hello, Kafka!' represents the message, encoded in bytes. Kafka deals with byte arrays, so it’s important to encode strings before sending them.

Serialization with JSON

While sending a byte string is simple, in practice, we often need to send more complex, structured data. For this purpose, we can serialize our data to JSON:

import json
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers='localhost:9092',
                         value_serializer=lambda v: json.dumps(v).encode('utf-8'))

producer.send('test-topic', {'key': 'value'})
producer.flush()
producer.close()

The value_serializer argument specifies how to convert the messages to bytes. Here, we use a lambda function to serialize a Python dictionary into a JSON formatted byte-string before sending the message.

Error Handling

It’s important to handle potential errors that may occur while producing messages:

from kafka.errors import KafkaError

try:
    future = producer.send('test-topic', b'Hello, Kafka!')
    future.get(timeout=10)
except KafkaError as e:
    print(f'An error occurred: {e}')
    raise e

This example makes use of the send() method’s return value, which is a Future that can be used to wait for the result synchronously with a timeout.

Synchronous and Asynchronous Sending

You can send messages either synchronously or asynchronously:

Synchronous:

from kafka import KafkaProducer
import time

producer = KafkaProducer(bootstrap_servers='localhost:9092')
for _ in range(10):
    future = producer.send('test-topic', b'Hello, Kafka!')
    result = future.get(timeout=60)
    print(result)
    time.sleep(1)

Synchronous sending waits for a response after each message, possibly including a delay between messages as in the example.

Asynchronous:

def on_send_success(record_metadata):
    print(record_metadata.topic)
    print(record_metadata.partition)
    print(record_metadata.offset)

def on_send_error(excp):
    log.error('I am an errback', exc_info=excp)

for _ in range(10):
    producer.send('test-topic', b'Hello, Kafka!').add_callback(on_send_success).add_errback(on_send_error)

In contrast, the asynchronous approach can include callbacks and error-backs and does not wait for the message to be acknowledged before continuing.

Producing Keyed Messages

With Kafka, you can send key-value pairs where messages with the same key will land in the same partition. This is crucial for maintaining order in certain cases:

producer = KafkaProducer(bootstrap_servers='localhost:9092',
                         key_serializer=str.encode,
                         value_serializer=lambda v: json.dumps(v).encode('utf-8'))
for i in range(10):
    producer.send('test-topic', key=str(i), value={'number': i})

We introduced a key_serializer to properly encode the key, ensuring that messages with the same numbered key are ordered correctly within the same partition.

Conclusion

Through this tutorial, you have learned how to set up Apache Kafka and write a simple producer in Python using kafka-python. We explored producing simple messages, using serialization for structured data, handling errors effectively, and sending synchronous and asynchronous messages. It’s now time to integrate this knowledge into your real-world applications and take advantage of Kafka’s robust messaging system.