How to Set Up Source and Sink Connectors in Kafka

Updated: January 30, 2024 By: Guest Contributor

Introduction

Apache Kafka has become the go-to platform for real-time event streaming, but its true potential is unlocked when it is connected to other data systems so data can be imported and exported seamlessly. This is the job of Kafka connectors: modular plugins that integrate Kafka with your data infrastructure without requiring you to write custom integration code.

Understanding Kafka Connectors

Kafka connectors come in two types: source and sink. A source connector ingests data from an external system into Kafka, while a sink connector exports data from Kafka to an external system. Kafka Connect, the framework that runs and manages connectors, ships with simple file connectors, can load many more as plugins, and lets you implement custom connectors, too.
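
For orientation, once a Connect worker is running (we start one with connect-standalone.sh later in this guide), you can ask it which connector plugins it has discovered through its REST API, which listens on port 8083 by default:

# List the connector plugins the Connect worker has loaded
curl -s http://localhost:8083/connector-plugins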

Before jumping into the deep end, make sure Kafka is up and running (along with ZooKeeper, if your Kafka deployment uses it), since connectors operate on top of the Kafka infrastructure. If you haven't yet, download Kafka from the official Apache Kafka website and follow the installation instructions for your operating system.
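
For a ZooKeeper-based distribution, the bundled scripts and default configuration files are enough to bring up a single-node setup locally:

# Start ZooKeeper with the bundled default configuration
bin/zookeeper-server-start.sh config/zookeeper.properties

# In a second terminal, start the Kafka broker
bin/kafka-server-start.sh config/server.properties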

Setting Up a Basic File Source Connector

Let’s start by setting up a simple file source connector to stream text files into Kafka.

cat > config/file-source-connector.properties <<EOF
name=file-source-connector
connector.class=FileStreamSource
tasks.max=1
topic=text_lines
file=/tmp/kafka-input.txt
EOF

The cat command above writes the configuration into Kafka's config directory. It tells Kafka Connect to run the FileStreamSource connector with a single task, read lines from '/tmp/kafka-input.txt', and publish each line to the 'text_lines' topic as a separate message. Every connector configuration also needs a unique 'name'; whether a connector is a source or a sink is inferred from its class. Place a text file at that location to serve as input.
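
For example, you can seed the input file and pre-create the target topic before starting the connector (relying on topic auto-creation also works, but creating the topic explicitly is more predictable):

# Create the input file the connector will tail
echo "hello from kafka connect" > /tmp/kafka-input.txt
echo "a second line of input" >> /tmp/kafka-input.txt

# Create the destination topic on the local broker
bin/kafka-topics.sh --create --topic text_lines --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1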

Run your connector using Kafka Connect in standalone mode, which is useful for development and testing:

bin/connect-standalone.sh config/connect-standalone.properties config/file-source-connector.properties
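
The first argument, config/connect-standalone.properties, is the worker configuration that ships with Kafka. The defaults relevant here look roughly like this (your copy may differ slightly):

# config/connect-standalone.properties (excerpt)
bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
offset.storage.file.filename=/tmp/connect.offsets

Depending on your Kafka version, you may also need to add the FileStream connector jar to the worker's plugin path (for example, plugin.path=libs/connect-file-3.6.1.jar), since recent releases no longer place it on the default classpath.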

View the results by consuming messages from the topic using:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic text_lines --from-beginning

If your setup is correct, you will see the content of your file being streamed as messages in the Kafka topic you specified.

Creating a MySQL Sink Connector

Moving on, let's tackle a more typical business use case: a Kafka sink connector that writes data into MySQL via JDBC.

  1. First, install the Confluent JDBC connector plugin (which provides the JdbcSinkConnector class used below) together with a MySQL JDBC driver. Download the plugin from Confluent Hub and the driver from the MySQL website, then make both visible to the Connect worker, either via its plugin.path or by placing the jars in Kafka's libs directory; a sketch follows the configuration below.
  2. Create your sink connector configuration file with the following parameters:
cat > config/mysql-sink-connector.properties <<EOF
name=mysql-sink-connector
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
topics=db_changes
connection.url=jdbc:mysql://localhost:3306/kafka_demo
connection.user=kafka_user
connection.password=kafka_password
insert.mode=insert
auto.create=true
auto.evolve=true
EOF
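
Note that JdbcSinkConnector ships with Confluent's JDBC connector plugin rather than Apache Kafka itself, so the plugin and the MySQL driver both need to be visible to the Connect worker. A minimal sketch, assuming a hypothetical /opt/kafka/plugins directory and illustrative version numbers:

# Unpack the JDBC connector (downloaded from Confluent Hub) into a plugin directory
mkdir -p /opt/kafka/plugins
unzip confluentinc-kafka-connect-jdbc-10.7.4.zip -d /opt/kafka/plugins

# Place the MySQL driver jar next to the connector's own jars
cp mysql-connector-j-8.3.0.jar /opt/kafka/plugins/confluentinc-kafka-connect-jdbc-10.7.4/lib/

# Point the Connect worker at the plugin directory (plugin.path accepts a comma-separated list)
echo "plugin.path=/opt/kafka/plugins" >> config/connect-standalone.properties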

This sink connector consumes records from the 'db_changes' topic and inserts them as rows into the 'kafka_demo' MySQL database. If the target table does not exist, 'auto.create' creates it (named after the topic by default) with columns derived from the record's value schema, and 'auto.evolve' adds columns as that schema evolves. This means the records must carry a schema, for example Avro or JSON with schemas enabled.
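
The connection settings assume that the 'kafka_demo' database and the 'kafka_user' account already exist. If they don't, something along these lines, run as a MySQL administrative user, will set them up:

# Create the database and user referenced in the connector configuration
mysql -u root -p <<'SQL'
CREATE DATABASE IF NOT EXISTS kafka_demo;
CREATE USER IF NOT EXISTS 'kafka_user'@'localhost' IDENTIFIED BY 'kafka_password';
GRANT ALL PRIVILEGES ON kafka_demo.* TO 'kafka_user'@'localhost';
FLUSH PRIVILEGES;
SQL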

Run the sink connector with:

bin/connect-standalone.sh config/connect-standalone.properties config/mysql-sink-connector.properties
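
The JDBC sink needs records that carry a schema. With the JsonConverter and schemas enabled in the worker configuration, you can verify the pipeline end to end by producing a hand-written schema-plus-payload envelope to the topic (the field names here are purely illustrative):

# Start a console producer for the db_changes topic
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic db_changes
# Paste the following as a single line and press Enter:
{"schema":{"type":"struct","name":"db_changes","optional":false,"fields":[{"field":"id","type":"int32","optional":false},{"field":"note","type":"string","optional":true}]},"payload":{"id":1,"note":"hello from kafka connect"}}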

You should now see data from the 'db_changes' topic appearing as rows in your MySQL database.

Advanced Configuration: Error Handling and Retries

As your Kafka connectors run, they may encounter issues that should be handled gracefully. This requires configuring your connectors with appropriate error handling and retry policies.

cat > config/error-handling-file-source-connector.properties <<EOF
name=error-handling-file-source-connector
connector.class=FileStreamSource
tasks.max=1
topic=text_lines
file=/tmp/kafka-input.txt
errors.tolerance=all
errors.retry.timeout=30000
errors.retry.delay.max.ms=3000
EOF

With 'errors.tolerance=all', the connector skips records that fail conversion or transformation instead of killing the task. Retriable operations are retried with delays of up to 3 seconds ('errors.retry.delay.max.ms'), and retrying is abandoned after 30 seconds ('errors.retry.timeout'), which prevents a task from getting stuck indefinitely.
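
For sink connectors specifically, Kafka Connect can also route failed records to a dead letter queue topic instead of dropping them silently. The MySQL sink from the previous section could be extended with properties along these lines (the DLQ topic name is illustrative):

# Append dead-letter-queue settings to the MySQL sink configuration
cat >> config/mysql-sink-connector.properties <<EOF
errors.tolerance=all
errors.deadletterqueue.topic.name=db_changes_dlq
errors.deadletterqueue.topic.replication.factor=1
errors.deadletterqueue.context.headers.enable=true
errors.log.enable=true
EOF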

Performance and Scalability Considerations

When deploying Kafka connectors in a production environment, it's important to consider both performance and scalability. You can scale horizontally by increasing the 'tasks.max' parameter, which spreads the workload across multiple tasks when the connector plugin supports it (the FileStream connectors, for example, always run a single task regardless of this setting). Monitoring running connectors is equally important so that issues are detected and resolved promptly.
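
A simple way to keep an eye on connectors is the worker's REST API; the status endpoint reports the state of a connector and each of its tasks:

# Check the connector and task states (RUNNING, FAILED, PAUSED, ...)
curl -s http://localhost:8083/connectors/mysql-sink-connector/status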

Conclusion

Setting up source and sink connectors in Kafka is a pivotal step in integrating your real-time data streaming platform with the rest of your data infrastructure. By understanding the basic setup and progressively diving into more advanced configuration and error handling, you can build a resilient data pipeline with Kafka at its core.