pyspark how to load compressed snappy file

Levi Pierce · Apr 25, 2015

I have compressed a file using python-snappy and put it in my HDFS store. I am now trying to read it in as shown below, but I get the following traceback. I can't find an example of how to read the file in so that I can process it. I can read the uncompressed text file version fine. Should I be using sc.sequenceFile? Thanks!

I first compressed the file and pushed it to HDFS:

python -m snappy -c gene_regions.vcf gene_regions.vcf.snappy
hdfs dfs -put gene_regions.vcf.snappy /

I then added the following to spark-env.sh:
export SPARK_EXECUTOR_MEMORY=16G                                                
export HADOOP_HOME=/usr/local/hadoop                                            

export JAVA_LIBRARY_PATH=$JAVA_LIBRARY_PATH:$HADOOP_HOME/lib/native             
export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:$HADOOP_HOME/lib/native                 
export SPARK_LIBRARY_PATH=$SPARK_LIBRARY_PATH:$HADOOP_HOME/lib/native           
export SPARK_CLASSPATH=$SPARK_CLASSPATH:$HADOOP_HOME/lib/lib/snappy-java-1.1.1.8-SNAPSHOT.jar

I then launch my Spark master and worker, and finally the IPython notebook where I execute the code below.

a_file = sc.textFile("hdfs://master:54310/gene_regions.vcf.snappy")
a_file.first()

ValueError                                Traceback (most recent call last)
<ipython-input> in <module>()
----> 1 a_file.first()

/home/user/Software/spark-1.3.0-bin-hadoop2.4/python/pyspark/rdd.pyc in first(self)
   1244         if rs:
   1245             return rs[0]
-> 1246         raise ValueError("RDD is empty")
   1247
   1248     def isEmpty(self):

ValueError: RDD is empty

Working code for the (uncompressed) text file:
a_file = sc.textFile("hdfs://master:54310/gene_regions.vcf")
a_file.first()

output: u'##fileformat=VCFv4.1'

Answer

Patrick Wendell · Apr 26, 2015

The issue here is that python-snappy is not compatible with Hadoop's snappy codec, which is what Spark uses to read the data when it sees a ".snappy" suffix. They are based on the same underlying algorithm, but they aren't compatible: you can't compress with one and decompress with the other.

You can make this work either by writing your data out to snappy with Spark or Hadoop in the first place, or by having Spark read your data as binary blobs and then manually invoking the python-snappy decompression yourself (see binaryFiles in the PySpark API docs: http://spark.apache.org/docs/latest/api/python/pyspark.html). The binary-blob approach is a bit more brittle because each input file has to fit entirely in memory, but if your data is small enough it will work.
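
A minimal sketch of both approaches, assuming the same HDFS paths as in the question; the output directory name gene_regions_snappy and the line-splitting lambda are illustrative, not part of the original answer.

# Approach 1: write the data out with Hadoop's Snappy codec so sc.textFile can read it back
rdd = sc.textFile("hdfs://master:54310/gene_regions.vcf")
rdd.saveAsTextFile("hdfs://master:54310/gene_regions_snappy",
                   compressionCodecClass="org.apache.hadoop.io.compress.SnappyCodec")
sc.textFile("hdfs://master:54310/gene_regions_snappy").first()

# Approach 2: read the whole file as a binary blob and decompress it with python-snappy
import snappy  # python-snappy

raw = sc.binaryFiles("hdfs://master:54310/gene_regions.vcf.snappy")  # RDD of (path, bytes)
# snappy.decompress handles raw snappy.compress output; a file produced by
# python-snappy's command-line tool uses the framed/stream format and would
# need the library's stream decompression helpers instead
lines = raw.flatMap(lambda kv: snappy.decompress(kv[1]).decode("utf-8").splitlines())
lines.first()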