
How to Set Up Source and Sink Connectors in Kafka

Last updated: January 30, 2024

Introduction

Apache Kafka has become the go-to ecosystem for real-time event streaming, but its true potential is unlocked when it is connected to other data systems so that data can be imported and exported seamlessly. This is the job of Kafka connectors: modular plugins that let you integrate Kafka with your data infrastructure without writing custom integration code.

Understanding Kafka Connectors

Kafka connectors come in two types: source and sink. A source connector ingests data from an external system into Kafka, while a sink connector exports data from Kafka to an external system. Kafka Connect, the framework that runs and manages connectors, ships with simple file connectors, a large ecosystem of ready-made connectors is available for common systems, and you can implement custom connectors as well.
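If a Kafka Connect worker is already running, you can check which connector plugins it has available through its REST API, which listens on port 8083 by default; the call below assumes a worker on localhost:

curl -s http://localhost:8083/connector-plugins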

Before jumping into the deep end, ensure Kafka and ZooKeeper are up and running, since Kafka connectors operate on top of the Kafka infrastructure. If you haven't yet, download Kafka from the official Apache Kafka website and follow the installation instructions for your operating system.
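On a fresh installation, the two processes can be started from the Kafka directory using the configuration files that ship with it; the paths below assume the default layout of the Apache Kafka distribution:

# Start ZooKeeper (terminal 1)
bin/zookeeper-server-start.sh config/zookeeper.properties

# Start the Kafka broker (terminal 2)
bin/kafka-server-start.sh config/server.properties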

Setting Up a Basic File Source Connector

Let’s start by setting up a simple file source connector to stream text files into Kafka.

cat > config/file-source-connector.properties <<EOF
name=local-file-source
connector.class=FileStreamSource
tasks.max=1
topic=text_lines
file=/tmp/kafka-input.txt
EOF

The command above writes the configuration into Kafka's config directory. It tells Kafka Connect to run the FileStreamSource connector as a single task, read the input file at '/tmp/kafka-input.txt', and publish each line to the 'text_lines' topic. Place a text file at that location to serve as input.
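For example, you can seed the input file and optionally create the target topic up front; the sample line and partition settings below are only illustrative:

# Seed the file that the connector will read
echo "hello kafka connect" > /tmp/kafka-input.txt

# Optionally pre-create the target topic
bin/kafka-topics.sh --bootstrap-server localhost:9092 --create \
  --topic text_lines --partitions 1 --replication-factor 1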

Run your connector using Kafka Connect in standalone mode, which is useful for development and testing:

bin/connect-standalone.sh config/connect-standalone.properties config/file-source-connector.properties
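The first argument, connect-standalone.properties, is the worker configuration that ships with Kafka. The settings worth double-checking are the broker address, the converters, and the offset file; in the default file they look roughly like this:

# config/connect-standalone.properties (excerpt)
bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
offset.storage.file.filename=/tmp/connect.offsets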

View the results by consuming messages from the topic using:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic text_lines --from-beginning

If your setup is correct, you will see the content of your file being streamed as messages in the Kafka topic you specified.
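With the default JSON converter (schemas enabled), each line of the file arrives wrapped in a schema envelope, roughly like this:

{"schema":{"type":"string","optional":false},"payload":"hello kafka connect"}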

Setting Up a MySQL Sink Connector

Moving on to a more typical business use case, let's set up a sink connector that writes Kafka data to MySQL using the JDBC Sink Connector.

  1. First, install the JDBC connector plugin and a suitable MySQL JDBC driver. The 'io.confluent.connect.jdbc.JdbcSinkConnector' class used below comes from Confluent's kafka-connect-jdbc plugin, which is not bundled with Apache Kafka. Download the driver jar from the MySQL website and place it alongside the plugin on the Connect worker's plugin path (or in the Kafka libs directory).
  2. Create your sink connector configuration file with the following parameters:
cat > config/mysql-sink-connector.properties <<EOF
name=mysql-sink
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
topics=db_changes
connection.url=jdbc:mysql://localhost:3306/kafka_demo
connection.user=kafka_user
connection.password=kafka_password
insert.mode=insert
auto.create=true
auto.evolve=true
EOF

This sink connector consumes records from the 'db_changes' topic and inserts rows into a table in the 'kafka_demo' MySQL database (by default, the table is named after the topic). If the table does not exist, 'auto.create' will create it with a schema derived from the record value schema, and 'auto.evolve' will add columns as new fields appear. This means the records must carry schema information, for example JSON with schemas enabled or Avro with a schema registry.
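Because the sink derives table columns from the record value schema, messages produced to 'db_changes' need to include that schema. With the default JSON converter (schemas enabled), a record the sink can insert could be produced as follows; the column names ('id' and 'note') are purely illustrative:

bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic db_changes
# Type the record below as a single line at the producer prompt:
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":true,"field":"note"}],"optional":false},"payload":{"id":1,"note":"hello from kafka"}}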

Run your customized sink connector with:

bin/connect-standalone.sh config/connect-standalone.properties config/mysql-sink-connector.properties

You should now see data from the Kafka topic 'db_changes' appearing in your MySQL table.
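Assuming the default naming behavior, the target table carries the topic name, so a quick check with the MySQL client (using the credentials from the configuration above) could look like this:

mysql -u kafka_user -pkafka_password kafka_demo -e "SELECT * FROM db_changes;"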

Advanced Configuration: Error Handling and Retries

As your Kafka connectors run, they may encounter issues that should be handled gracefully. This requires configuring your connectors with appropriate error handling and retry policies.

cat > config/error-handling-file-source-connector.properties <<EOF
name=error-handling-file-source
connector.class=FileStreamSource
tasks.max=1
topic=text_lines
file=/tmp/kafka-input.txt
errors.tolerance=all
errors.retry.timeout=30000
errors.retry.delay.max.ms=3000
EOF

With 'errors.tolerance=all', the connector tolerates failures during record conversion and transformation instead of stopping the task. Retriable operations are retried with a delay that grows up to a maximum of 3 seconds between attempts ('errors.retry.delay.max.ms'), and retrying is abandoned after 30 seconds in total ('errors.retry.timeout'), which prevents tasks from getting stuck.
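To keep failures visible while tolerating them, you can also enable error logging and, for sink connectors, route bad records to a dead letter queue topic. These are standard Kafka Connect error-handling properties; add them to the connector configuration as needed:

# Log failed operations and the offending records
errors.log.enable=true
errors.log.include.messages=true

# Sink connectors only: route failed records to a dead letter queue topic
# errors.deadletterqueue.topic.name=dlq_db_changes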

Performance and Scalability Considerations

When deploying Kafka connectors in production, consider both performance and scalability. Production deployments typically run Kafka Connect in distributed mode, where multiple workers share the load and tolerate failures. Horizontal scaling is achieved by increasing the 'tasks.max' parameter, which spreads the workload across multiple tasks when the connector plugin supports it. Monitoring running connectors is equally important so that issues are detected and resolved promptly.
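In distributed mode, connectors are created and scaled through the worker's REST API instead of properties files. A sketch of creating the MySQL sink from earlier with more tasks might look like this; the connector name and task count are illustrative:

curl -X POST -H "Content-Type: application/json" http://localhost:8083/connectors -d '{
  "name": "mysql-sink",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "topics": "db_changes",
    "connection.url": "jdbc:mysql://localhost:3306/kafka_demo",
    "connection.user": "kafka_user",
    "connection.password": "kafka_password",
    "insert.mode": "insert",
    "auto.create": "true",
    "tasks.max": "4"
  }
}'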

Conclusion

Setting up source and sink connectors in Kafka is a pivotal step in integrating your real-time streaming platform with the rest of your data infrastructure. By understanding the basic setup and progressively moving on to custom configurations and error handling, you can build a resilient data pipeline with Kafka at its core.
