Kafka: How to import/export CSV/TXT data

Updated: January 30, 2024 By: Guest Contributor

Overview

Working with Apache Kafka, one of the most popular distributed streaming platforms, often requires importing and exporting data in common file formats like CSV and TXT. This tutorial walks you through the basics of handling CSV and TXT data in Kafka, using both the native Kafka clients and third-party tools. By the end, you’ll be equipped to integrate Kafka with these widely used data interchange formats.

Prerequisites

Before we embark on this Kafka journey, ensure that you have:

  • Apache Kafka installed and running
  • Basic familiarity with Kafka concepts like producers, consumers, topics, and brokers
  • A system with Java installed
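
The examples below also assume the target topic already exists (or that your broker allows automatic topic creation). If you need to create the topic yourself, here is a minimal sketch using Kafka’s AdminClient; the topic name, partition count, and replication factor are assumptions suited to a local single-broker setup.

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import java.util.Collections;
import java.util.Properties;

public class TopicSetup {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        // try-with-resources closes the admin client when we are done
        try (AdminClient admin = AdminClient.create(props)) {
            // One partition, replication factor 1: fine for local development
            NewTopic topic = new NewTopic("csv_data", 1, (short) 1);
            admin.createTopics(Collections.singleton(topic)).all().get();
            System.out.println("Topic csv_data created.");
        }
    }
}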

Importing CSV Data into Kafka

To import CSV data into Kafka, we’ll start by creating a simple producer application in Java that reads from a CSV file and sends those records to a Kafka topic.

import org.apache.kafka.clients.producer.*;
import java.io.*;
import java.util.*;

public class CSVProducer {
    public static void main(String[] args) throws Exception {
        String bootstrapServers = "localhost:9092";
        String topicName = "csv_data";

        // Set up properties for the Kafka producer
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

        // Initialize the producer and open the CSV file; try-with-resources closes
        // both even if an exception occurs, and closing the producer also flushes
        // any records still buffered in memory
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
             BufferedReader br = new BufferedReader(new FileReader("data.csv"))) {
            String line;
            while ((line = br.readLine()) != null) {
                // Send each CSV line as the value of a record on the topic
                producer.send(new ProducerRecord<>(topicName, line));
            }
        }
        System.out.println("CSV data imported into Kafka topic successfully.");
    }
}

This producer reads each line from the ‘data.csv’ file and publishes it to the ‘csv_data’ Kafka topic. The simplicity of this code belies its power: with Kafka, your CSV data is now part of a highly scalable and fault-tolerant streaming data pipeline.
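
Note that each record above is sent with a null key, so the producer’s partitioner spreads lines across partitions with no ordering guarantee between them. If per-entity ordering matters and your file happens to carry an identifier in its first column (an assumption made purely for illustration), a small change inside the read loop keys each record by that column so related rows land on the same partition:

            // Hypothetical variant of the send inside the read loop: key each
            // record by the first CSV column (assumes a simple comma-delimited
            // file whose fields contain no quoted commas)
            String[] fields = line.split(",", 2);
            producer.send(new ProducerRecord<>(topicName, fields[0], line));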

Exporting Data from Kafka to TXT

The reverse process—exporting data from a Kafka topic to a TXT file—involves creating a Kafka consumer that writes messages to a file.

import org.apache.kafka.clients.consumer.*;
import java.io.*;
import java.time.Duration;
import java.util.*;

public class TXTConsumer {
    public static void main(String[] args) throws Exception {
        String bootstrapServers = "localhost:9092";
        String groupId = "txt_group";
        String topicName = "csv_data";

        // Set up properties for the Kafka consumer
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        // Initialize the consumer and subscribe to the topic
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList(topicName));

        // Write data from Kafka to the TXT file; try-with-resources ensures the
        // writer is closed if the poll loop is ever interrupted by an exception
        try (BufferedWriter bw = new BufferedWriter(new FileWriter("exported_data.txt"))) {
            // Polls indefinitely; see the note below on adding an exit condition
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    bw.write(record.value());
                    bw.newLine();
                }
                bw.flush();
            }
        } finally {
            consumer.close();
        }
    }
}

This consumer subscribes to the ‘csv_data’ topic, retrieves messages, and writes each record’s value to ‘exported_data.txt’. One thing to note is that the poll loop runs forever. Depending on your needs, you might want to add an exit condition or logic to process only a certain number of messages, as sketched below.
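
One common pattern for such an exit condition, shown here as a sketch rather than a definitive recipe, is a JVM shutdown hook that calls consumer.wakeup(): the next poll() then throws a WakeupException, which the loop catches to finish cleanly. This assumes the consumer and writer from the example above:

        // Register before the poll loop: on shutdown (e.g. Ctrl+C), wakeup()
        // makes the next poll() throw WakeupException so the loop can terminate
        final Thread mainThread = Thread.currentThread();
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            consumer.wakeup();
            try { mainThread.join(); } catch (InterruptedException ignored) { }
        }));

        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                // ... write records to the file as in the example above ...
            }
        } catch (org.apache.kafka.common.errors.WakeupException e) {
            // Expected during shutdown; nothing to handle
        } finally {
            consumer.close();
        }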

Advanced CSV Handling

These basic examples demonstrate simple reads and writes. But real-world Kafka integration often involves dealing with complex schemas, transformations, and processing. For advanced CSV handling within a Kafka ecosystem, consider Kafka Connect with a connector that supports CSV (such as the FileStream connector or Confluent’s SpoolDir connectors).
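
To give a flavor of the Connect approach, here is a minimal standalone-mode configuration for the FileStreamSource connector that ships with Kafka; the connector name, file path, and topic are illustrative assumptions. Like our producer above, FileStreamSource treats each line as an opaque string; dedicated CSV connectors such as SpoolDir can additionally parse fields and apply schemas.

# connect-file-source.properties (a sketch for Kafka Connect standalone mode)
name=csv-file-source
connector.class=org.apache.kafka.connect.file.FileStreamSourceConnector
tasks.max=1
file=/tmp/data.csv
topic=csv_data

You would pass this file, together with a worker configuration, to the connect-standalone script bundled with Kafka.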

See also: How to Set Up Source and Sink Connectors in Kafka.

Moreover, Kafka Streams and KSQL are powerful tools for transforming and processing data in flight. KSQL, for example, allows you to filter, aggregate, and query Kafka topics with SQL-like syntax. Kafka Streams, a Java client library, enables building applications and microservices where the input and output data are stored in Kafka topics.
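
For a taste of the Streams API, here is a minimal sketch (the application id and topic names are assumptions) that reads the lines from ‘csv_data’, drops blank ones, and forwards the rest to a second topic:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import java.util.Properties;

public class CsvFilterStream {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "csv-filter-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        // Read raw CSV lines, drop blank ones, forward the rest
        builder.<String, String>stream("csv_data")
               .filter((key, value) -> value != null && !value.trim().isEmpty())
               .to("csv_data_clean");

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
        // Close the Streams app cleanly when the JVM shuts down
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}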

Read more: Getting started with KSQL for Kafka stream processing.

Conclusion

The capability to import and export CSV/TXT data with Kafka is vital for data interchange and integration. The examples provided demonstrate basic interactions with Kafka, forming a foundation for more advanced systems to build upon. To wrap up, we encourage you to explore Kafka Connect, Kafka Streams, and KSQL to truly leverage Kafka’s power with CSV and TXT data formats.