Using Kafka to import data to Hadoop

Kobe-Wan Kenobi · Nov 4, 2014

At first I was deciding between Kafka and Flume for getting events into Hadoop, where they will be stored and analyzed periodically (possibly with Oozie scheduling the analysis). I decided that Kafka is probably the better choice: we also have a component that does event processing, so this way both the batch and the event-processing components receive data in the same way.

But now I'm looking for concrete suggestions on how to get data out of the broker and into Hadoop.

I found here that Flume can be used in combination with Kafka:

  • Flume - Contains Kafka Source (consumer) and Sink (producer)

I also found, on the same page and in the Kafka documentation, that there is something called Camus:

  • Camus - LinkedIn's Kafka=>HDFS pipeline. This one is used for all data at LinkedIn, and works great.

Which of these would be the better (easier, better-documented) way to do that? Also, are there any examples or tutorials showing how to do it?

When should I use these variants over the simpler high-level consumer?

I'm open to suggestions if there is another or better solution than these two.

Thanks

Answer

sunitha · Feb 3, 2016

You can use Flume to move data from Kafka to HDFS. Flume has a Kafka source and a Kafka sink, so it is just a matter of changing a properties file. An example is given below.

Steps:

  1. Create a Kafka topic

    kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic testkafka
    
  2. Write to the topic created above using the Kafka console producer

    kafka-console-producer --broker-list localhost:9092 --topic testkafka
    
  3. Configure a Flume agent with the following properties

    # Components of the agent named "flume1"
    flume1.sources  = kafka-source-1
    flume1.channels = hdfs-channel-1
    flume1.sinks    = hdfs-sink-1
    
    # Kafka source: consumes the topic created in step 1.
    # zookeeperConnect/topic are the older Kafka source properties; newer Flume
    # releases use kafka.bootstrap.servers and kafka.topics instead.
    flume1.sources.kafka-source-1.type = org.apache.flume.source.kafka.KafkaSource
    flume1.sources.kafka-source-1.zookeeperConnect = localhost:2181
    flume1.sources.kafka-source-1.topic = testkafka
    flume1.sources.kafka-source-1.batchSize = 100
    flume1.sources.kafka-source-1.channels = hdfs-channel-1
    
    # In-memory channel that buffers events between source and sink
    flume1.channels.hdfs-channel-1.type = memory
    flume1.channels.hdfs-channel-1.capacity = 10000
    flume1.channels.hdfs-channel-1.transactionCapacity = 1000
    
    # HDFS sink: writes events as plain text, one directory per topic and day
    flume1.sinks.hdfs-sink-1.channel = hdfs-channel-1
    flume1.sinks.hdfs-sink-1.type = hdfs
    flume1.sinks.hdfs-sink-1.hdfs.writeFormat = Text
    flume1.sinks.hdfs-sink-1.hdfs.fileType = DataStream
    flume1.sinks.hdfs-sink-1.hdfs.filePrefix = test-events
    flume1.sinks.hdfs-sink-1.hdfs.useLocalTimeStamp = true
    flume1.sinks.hdfs-sink-1.hdfs.path = /tmp/kafka/%{topic}/%y-%m-%d
    # Roll to a new file after 100 events; rollSize = 0 disables size-based rolling
    flume1.sinks.hdfs-sink-1.hdfs.rollCount = 100
    flume1.sinks.hdfs-sink-1.hdfs.rollSize = 0
    

Save the above config file as example.conf

  4. Run the Flume agent

    flume-ng agent -n flume1 -c conf -f example.conf -Dflume.root.logger=INFO,console
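
Once the agent is running, the interactive producer from step 2 can also be fed from a pipe, which makes it easier to push enough events to trigger a file roll (the config rolls after 100 events). The following is only a sketch: the `gen_events` helper and the event text are made up for illustration, and it assumes the same `kafka-console-producer` command, broker address and topic as in step 2. If the Kafka CLI is not on the PATH it just prints a few sample lines.

```shell
#!/bin/sh
# Hypothetical helper: print N numbered test events, one per line.
# The console producer sends each input line as one Kafka message.
gen_events() {
    i=1
    while [ "$i" -le "$1" ]; do
        echo "test event $i"
        i=$((i + 1))
    done
}

if command -v kafka-console-producer >/dev/null 2>&1; then
    # 250 events should be enough to roll at least two files at rollCount = 100.
    gen_events 250 | kafka-console-producer --broker-list localhost:9092 --topic testkafka
else
    # Dry run: show what would be sent.
    gen_events 3
fi
```

Piping generated lines keeps the test repeatable; typing into the console producer by hand works just as well for a handful of messages.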
    

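Data will now be written to HDFS under the following path, where %{topic} is replaced by the event's topic header (here testkafka) and %y-%m-%d by the two-digit year, month and day: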

/tmp/kafka/%{topic}/%y-%m-%d
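
To check the result, it can help to work out the concrete directory first. The sketch below assumes the topic name `testkafka` and the `test-events` file prefix from the example config, that the agent's local date matches the machine running the script (the config sets `useLocalTimeStamp = true`), and that the `hdfs` client is installed. Without the client it only prints the expected path.

```shell
#!/bin/sh
# Values assumed from the example configuration above.
topic="testkafka"

# Flume's %y-%m-%d escape sequence is two-digit year, month and day,
# which matches the same format string in date(1).
day="$(date +%y-%m-%d)"
target="/tmp/kafka/${topic}/${day}"
echo "$target"

# Only query HDFS when the hdfs client is available.
if command -v hdfs >/dev/null 2>&1; then
    hdfs dfs -ls "$target"
    # The glob is quoted so that HDFS expands it, not the local shell.
    hdfs dfs -cat "$target/test-events.*" | head
fi
```

Files that Flume is still writing normally carry a `.tmp` suffix, so a listing taken right after sending events may show an in-progress file alongside the rolled ones.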