Reading gz.parquet file

Mouzzam Hussain · Mar 30, 2016 · Viewed 8.1k times

Hello, I need to read the data from gz.parquet files but don't know how. I tried it with Impala, but I get the same result as `parquet-tools cat`, without the table structure.

P.S.: Any suggestions to improve the Spark code are most welcome.

I have the following gz.parquet files as the result of a data pipeline (Twitter => Flume => Kafka => Spark Streaming => Hive/gz.parquet files). For the Flume agent I am using `agent1.sources.twitter-data.type = org.apache.flume.source.twitter.TwitterSource`.

The Spark code dequeues the data from Kafka and stores it in Hive as follows:

    import kafka.serializer.StringDecoder
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.SQLContext
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka.KafkaUtils

    val sparkConf = new SparkConf().setAppName("KafkaTweet2Hive")
    val sc = new SparkContext(sparkConf)
    val ssc = new StreamingContext(sc, Seconds(2))
    val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)

    // Create a direct Kafka stream with the given brokers and topics
    val topicsSet = topics.split(",").toSet
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topicsSet)

    // Get the data (tweets) from Kafka; each message is a (key, value) pair
    val tweets = messages.map(_._2)

    // Append the tweets to the Hive table "tweet"
    tweets.foreachRDD { rdd =>
      val hiveContext = SQLContext.getOrCreate(rdd.sparkContext)
      import hiveContext.implicits._

      val tweetsDF = rdd.toDF()
      tweetsDF.write.mode("append").saveAsTable("tweet")
    }

When I run the Spark Streaming app, it stores the data as gz.parquet files in HDFS under the /user/hive/warehouse directory, as follows:

[root@quickstart /]# hdfs dfs -ls /user/hive/warehouse/tweets
Found 469 items
-rw-r--r--   1 root supergroup          0 2016-03-30 08:36 /user/hive/warehouse/tweets/_SUCCESS
-rw-r--r--   1 root supergroup        241 2016-03-30 08:36 /user/hive/warehouse/tweets/_common_metadata
-rw-r--r--   1 root supergroup      35750 2016-03-30 08:36 /user/hive/warehouse/tweets/_metadata
-rw-r--r--   1 root supergroup      23518 2016-03-30 08:33 /user/hive/warehouse/tweets/part-r-00000-0133fcd1-f529-4dd1-9371-36bf5c3e5df3.gz.parquet
-rw-r--r--   1 root supergroup       9552 2016-03-30 08:33 /user/hive/warehouse/tweets/part-r-00000-02c44f98-bfc3-47e3-a8e7-62486a1a45e7.gz.parquet
-rw-r--r--   1 root supergroup      19228 2016-03-30 08:25 /user/hive/warehouse/tweets/part-r-00000-0321ce99-9d2b-4c52-82ab-a9ed5f7d5036.gz.parquet
-rw-r--r--   1 root supergroup        241 2016-03-30 08:25 /user/hive/warehouse/tweets/part-r-00000-03415df3-c719-4a3a-90c6-462c43cfef54.gz.parquet

The schema from the _metadata file is as follows:

    [root@quickstart /]# parquet-tools meta hdfs://quickstart.cloudera:8020/user/hive/warehouse/tweets/_metadata
    creator:       parquet-mr version 1.5.0-cdh5.5.0 (build ${buildNumber}) 
    extra:         org.apache.spark.sql.parquet.row.metadata = {"type":"struct","fields":[{"name":"tweet","type":"string","nullable":true,"metadata":{}}]} 
    
    file schema:   root 
    ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
    tweet:         OPTIONAL BINARY O:UTF8 R:0 D:1

Furthermore, if I load the data into a DataFrame in Spark, I get the following output from `df.show`:

    +--------------------+
    |               tweet|
    +--------------------+
    |��Objavro.sc...|
    |��Objavro.sc...|
    |��Objavro.sc...|
    |ڕObjavro.sch...|
    |��Objavro.sc...|
    |ֲObjavro.sch...|
    |��Objavro.sc...|
    |��Objavro.sc...|
    |֕Objavro.sch...|
    |��Objavro.sc...|
    |��Objavro.sc...|
    |��Objavro.sc...|
    |��Objavro.sc...|
    |��Objavro.sc...|
    |��Objavro.sc...|
    |��Objavro.sc...|
    |��Objavro.sc...|
    |��Objavro.sc...|
    |��Objavro.sc...|
    |��Objavro.sc...|
    +--------------------+
    only showing top 20 rows

However, I would like to see the tweets as plain text. How can I do that?

Answer

JasonWayne · Jun 30, 2016
    sqlContext.read.parquet("/user/hive/warehouse/tweets").show
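A slightly fuller sketch of the same idea, assuming a `HiveContext` named `sqlContext` is already in scope as in the question's code (the path and the `tweet` column name are taken from the question):

    // Read all gz.parquet part files in the directory; the _metadata
    // and _common_metadata summary files are handled automatically.
    val df = sqlContext.read.parquet("/user/hive/warehouse/tweets")

    // The file schema shows a single nullable string column named "tweet".
    df.printSchema()

    // Show full cell contents instead of truncating at 20 characters.
    df.select("tweet").show(20, false)

One caveat worth checking: the cells in the question's `df.show` output begin with `Objavro.sc...`, which looks like the header of an Avro object container file (Flume's TwitterSource emits Avro-serialized events). If so, the `tweet` column holds raw Avro bytes rather than plain text, and reading the Parquet files alone will not make the tweets readable; the values would first need to be decoded with an Avro reader.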