
How to Connect Kafka with Hadoop

Last updated: January 31, 2024

Introduction

Integrating Apache Kafka with Hadoop is an increasingly popular way to process large volumes of data efficiently. Apache Kafka is a distributed streaming platform that allows for high-throughput data ingestion, while Hadoop is a framework for the storage and large-scale processing of datasets on clusters of commodity hardware. In this tutorial, we'll guide you step by step through achieving a seamless integration between Kafka and Hadoop.

Prerequisites

Before we begin, ensure that you have the following software installed:

  • Apache Kafka
  • Apache Hadoop
  • Java Development Kit (JDK)

Additionally, it is beneficial to have a basic understanding of Kafka topics, producers, consumers, and the fundamental concepts of Hadoop’s HDFS and MapReduce.
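
To quickly confirm that the tools are available on your machine, you can check their versions (the Kafka script is run from the Kafka installation directory):

java -version
hadoop version
bin/kafka-topics.sh --version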

Getting Started

Start by launching a Kafka server:

bin/kafka-server-start.sh config/server.properties
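
If your Kafka installation runs in ZooKeeper mode rather than KRaft, ZooKeeper must be running first; start it in a separate terminal with the script that ships with Kafka:

bin/zookeeper-server-start.sh config/zookeeper.properties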

Create a topic in Kafka where messages will be published:

bin/kafka-topics.sh --create --topic hadoop-demo-topic --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1
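
You can confirm that the topic exists by describing it:

bin/kafka-topics.sh --describe --topic hadoop-demo-topic --bootstrap-server localhost:9092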

Test your topic by sending some messages:

bin/kafka-console-producer.sh --topic hadoop-demo-topic --bootstrap-server localhost:9092
> This is a test message
> Another test line

Consume messages to verify they are being published (open a new terminal session):

bin/kafka-console-consumer.sh --topic hadoop-demo-topic --from-beginning --bootstrap-server localhost:9092

Importing Data into Hadoop

Before you can process Kafka data with Hadoop, you must land the data in the Hadoop Distributed File System (HDFS). One common approach is Apache Flume with a Kafka source and an HDFS sink (the property names below follow Flume 1.7+, which connects to Kafka via the broker list; older releases used a zookeeperConnect property instead):

agent.sources = kafka-source
agent.sinks = hdfs-sink
agent.channels = memory-channel

agent.sources.kafka-source.type = org.apache.flume.source.kafka.KafkaSource
agent.sources.kafka-source.kafka.bootstrap.servers = localhost:9092
agent.sources.kafka-source.kafka.topics = hadoop-demo-topic
agent.sources.kafka-source.channels = memory-channel

agent.sinks.hdfs-sink.type = hdfs
agent.sinks.hdfs-sink.hdfs.path = hdfs:///user/kafka/hadoop-demo-topic
agent.sinks.hdfs-sink.channel = memory-channel

agent.channels.memory-channel.type = memory
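
Optionally, you can tune the sink and channel. For example, the following settings (standard Flume properties, shown here with illustrative values) write plain text instead of SequenceFiles, roll files every 60 seconds, and give the in-memory channel more headroom:

agent.sinks.hdfs-sink.hdfs.fileType = DataStream
agent.sinks.hdfs-sink.hdfs.rollInterval = 60
agent.channels.memory-channel.capacity = 10000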

After saving this configuration to a file (for example, job.conf), start the Flume agent:

bin/flume-ng agent --name agent --conf conf --conf-file job.conf

This starts a long-running process that continuously reads from Kafka and writes to HDFS.
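
You can verify that events are arriving in HDFS by listing and inspecting the files Flume creates (by default they use the FlumeData prefix):

hdfs dfs -ls /user/kafka/hadoop-demo-topic
hdfs dfs -cat /user/kafka/hadoop-demo-topic/FlumeData.*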

Processing Kafka Data with MapReduce

With your data in HDFS, you can now use Hadoop MapReduce to process it. Let's write a simple word count MapReduce job:

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class KafkaHadoopWordCount {

    public static class TokenizerMapper
           extends Mapper<Object, Text, Text, IntWritable> {

        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        @Override
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            // Split the incoming line into tokens and emit a (word, 1) pair per token
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                context.write(word, one);
            }
        }
    }
    // Reducer and job-setup code will follow
}

The mapper splits each line into words and emits a tuple of the word and the number one for each occurrence. Implement a reducer that sums up these counts:

    public static class IntSumReducer
       extends Reducer<Text,IntWritable,Text,IntWritable> {
        private IntWritable result = new IntWritable();

        public void reduce(Text key, Iterable<IntWritable> values,
                           Context context
                           ) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }

Proceed by setting up and running the MapReduce job:

    // ... Other code ...
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "Kafka Hadoop Word Count");
        job.setJarByClass(KafkaHadoopWordCount.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
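
Once the class is compiled and packaged (the jar and class names below are just examples matching the code above), run the job against the HDFS directory Flume writes to and inspect the output:

hadoop jar kafka-hadoop-wordcount.jar KafkaHadoopWordCount /user/kafka/hadoop-demo-topic /user/kafka/wordcount-output
hdfs dfs -cat /user/kafka/wordcount-output/part-r-00000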

Advanced Considerations

For more advanced scenarios, consider using Spark Streaming to process the Kafka stream in near real time. The example below uses the spark-streaming-kafka-0-10 integration (the older 0-8 connector found in many tutorials has been removed from recent Spark releases):

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

val sparkConf = new SparkConf().setAppName("KafkaHadoopSparkApp")
val ssc = new StreamingContext(sparkConf, Seconds(10))

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "kafka-hadoop-spark-demo",
  "auto.offset.reset" -> "earliest")
val topics = Array("hadoop-demo-topic")

val stream = KafkaUtils.createDirectStream[String, String](
  ssc, LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, String](topics, kafkaParams))

val lines = stream.map(_.value)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
wordCounts.print()

ssc.start()
ssc.awaitTermination()

This creates a Spark streaming context, builds a direct stream from Kafka, processes the data, and prints a word count for each 10-second batch.
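
To run it, submit the application with the Kafka integration library on the classpath. A minimal sketch, assuming Spark 3.x built for Scala 2.12 and that the code above lives in an object named KafkaHadoopSparkApp packaged as kafka-spark-app.jar:

bin/spark-submit --packages org.apache.spark:spark-streaming-kafka-0-10_2.12:3.5.0 --class KafkaHadoopSparkApp kafka-spark-app.jar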

Conclusion

Integrating Kafka with Hadoop requires careful planning and execution, but it can yield a robust data pipeline capable of handling massive streams of data. Connecting Kafka to Hadoop lets businesses process data in real time and perform complex analytics at scale.
