How to Use Kafka Connect: Tutorial & Examples

Updated: February 1, 2024 By: Guest Contributor

Introduction

Apache Kafka has become the backbone of real-time data processing for many organizations, offering robust distributed streaming capabilities. Kafka Connect is a component of Apache Kafka that simplifies the integration of other data systems such as databases, key-value stores, search indexes, and file systems with Kafka. In this tutorial, we’ll explore Kafka Connect and look at examples to understand how you can use it to streamline your data pipelines.

What is Kafka Connect?

Kafka Connect is a framework for scalably and reliably streaming data between Apache Kafka and external systems such as databases, key-value stores, search indexes, and file systems. It is built around ‘connectors’, specialized plugins that know how to read data from, and write data to, a specific system.

There are two kinds of connectors:

  • Source Connectors: Pull data from an external system into Kafka.
  • Sink Connectors: Push data from Kafka into an external system.

Setting Up Kafka Connect

Before you can start using Kafka Connect, you need to have a Kafka cluster up and running. For this tutorial, we will assume that you have Apache Kafka and Zookeeper installed and configured.
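
If you just need a quick local setup to follow along, the scripts bundled with the Apache Kafka distribution are enough. A minimal sketch, assuming you run each command in its own terminal from the Kafka installation directory:

# Start ZooKeeper
bin/zookeeper-server-start.sh config/zookeeper.properties

# Start a single Kafka broker
bin/kafka-server-start.sh config/server.properties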

Step 1: Install a Connector Plugin

# List the connector plugins already available on a running Connect worker
curl -s http://localhost:8083/connector-plugins

# Install an additional plugin from Confluent Hub, e.g., the JDBC connector
confluent-hub install confluentinc/kafka-connect-jdbc:latest

Installing a plugin with confluent-hub places it on your Kafka Connect worker’s plugin path, making it available for use. The FileStream connector used in the next step ships with Apache Kafka itself, so there is nothing to download for it.
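
If you install a plugin manually (for example by copying its jars) rather than through confluent-hub, make sure the directory containing it appears in the worker’s plugin.path. A sketch of the relevant line in the worker configuration, with example locations:

# In connect-standalone.properties or connect-distributed.properties
plugin.path=/usr/share/java,/usr/share/confluent-hub-components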

Step 2: Configure the Connector

Connectors are configured through simple JSON (when using the Connect REST API) or properties files (when running a standalone worker). For this example, let’s create a FileStreamSourceConnector to read data from a file and write it into a Kafka topic.

{
  "name": "local-file-source",
  "config": {
    "connector.class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
    "tasks.max": "1",
    "file": "./test.txt",
    "topic": "test-topic" 
  }
}
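
The same connector expressed as a properties file, which is the format a standalone worker reads from the command line (a sketch, assuming you save it as local-file-source.properties):

name=local-file-source
connector.class=org.apache.kafka.connect.file.FileStreamSourceConnector
tasks.max=1
file=./test.txt
topic=test-topic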

Step 3: Start the Connector

Once your connector is configured, you can create it through the Connect REST API or, in standalone mode, by passing its properties file to the worker on the command line. Using the REST API:

curl -X POST -H "Content-Type: application/json" --data @file-source-config.json http://localhost:8083/connectors

At this point, your connector should be running and data from ‘test.txt’ will be streamed into the ‘test-topic’ topic.
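
To confirm that everything is working, you can query the connector’s status through the REST API and read the topic back with the console consumer. A quick sanity check, assuming a broker on localhost:9092:

# Check the connector and task state
curl -s http://localhost:8083/connectors/local-file-source/status

# Read the records the connector has produced
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-topic --from-beginning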

Example: Using Connectors

Source Connector Example

Assume we have a MySQL database from which we would like to capture changes and publish them into Kafka. We can use the Debezium MySQL Source connector for this.

# First, install the Debezium MySQL connector
confluent-hub install debezium/debezium-connector-mysql:latest

# Configure the MySQL source connector
{
  "name": "mysql-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "localhost",
    "database.port": "3306",
    "database.user": "user",
    "database.password": "password",
    "database.server.id": "184054",
    "database.server.name": "dbserver1",
    "database.whitelist": "exampleDB",
    "database.history.kafka.bootstrap.servers": "kafka:29092",
    "database.history.kafka.topic": "schema-changes.exampleDB"
  }
}

This configuration captures change data from the exampleDB database. Changes are published to topics prefixed with ‘dbserver1’, one topic per table (for example, dbserver1.exampleDB.<table>).
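
As with the file example, the connector is registered through the Connect REST API. In the sketch below, the JSON file name and the ‘customers’ table are placeholders for illustration:

# Register the Debezium MySQL connector (config saved as mysql-source-config.json)
curl -X POST -H "Content-Type: application/json" --data @mysql-source-config.json http://localhost:8083/connectors

# Inspect change events for a hypothetical 'customers' table in exampleDB
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic dbserver1.exampleDB.customers --from-beginning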

Sink Connector Example

If we want to store our Kafka messages in Elasticsearch, we can use the Confluent Elasticsearch Sink Connector.

# Install the Elasticsearch connector
confluent-hub install confluentinc/kafka-connect-elasticsearch:latest

# Elasticsearch sink connector configuration
{
  "name": "elasticsearch-sink",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "tasks.max": "1",
    "topics": "test-topic",
    "connection.url": "http://elasticsearch:9200",
    "key.ignore": "true",
    "schema.ignore": "true"
  }
}

This sink connector will consume messages from ‘test-topic’ and write them into Elasticsearch.
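
By default the connector writes each topic into an Elasticsearch index named after the topic, so once the sink is registered you can verify the data with a simple search. A sketch, assuming the config above is saved as elasticsearch-sink-config.json and Elasticsearch is reachable at the URL from the config:

# Register the Elasticsearch sink connector
curl -X POST -H "Content-Type: application/json" --data @elasticsearch-sink-config.json http://localhost:8083/connectors

# Check that documents are arriving in the 'test-topic' index
curl -s "http://elasticsearch:9200/test-topic/_search?pretty"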

Bonus: Kafka Connect in Standalone vs Distributed Mode

Kafka Connect can run in two different modes: standalone and distributed. Standalone mode runs a single worker process on one machine and stores connector offsets in a local file; it is useful for development and testing.

Distributed mode, on the other hand, runs Connect as a cluster of worker processes, storing connector configuration, offsets, and status in Kafka topics, which gives you fault tolerance and horizontal scalability.

To run in distributed mode, you start the Connect runtime with the ‘connect-distributed.properties’ file instead of ‘connect-standalone.properties’. You also need to ensure that all your worker instances share the same group.id and internal storage topics so that they join the same cluster, as sketched below.
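
A sketch of how each mode is started, plus the cluster-level settings every distributed worker must agree on (the topic names here are just examples):

# Standalone: worker config plus one or more connector properties files
bin/connect-standalone.sh config/connect-standalone.properties local-file-source.properties

# Distributed: every worker starts with the same cluster-level settings
bin/connect-distributed.sh config/connect-distributed.properties

The settings in connect-distributed.properties that must match across workers:

group.id=connect-cluster
config.storage.topic=connect-configs
offset.storage.topic=connect-offsets
status.storage.topic=connect-status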

Conclusion

In this tutorial, we’ve covered the basics of setting up Kafka Connect, using source and sink connectors, and touched upon the differences between standalone and distributed modes. Remember, Kafka Connect’s strength lies in its ease of use and the growing catalogue of available connectors, which together make it a versatile tool for building robust, scalable data pipelines.