I am reading in a CSV as a Spark DataFrame and performing machine learning operations upon it. I keep getting a Python serialization EOFError - any idea why? I thought it might be a memory issue - i.e. file exceeding available RAM - but drastically reducing the size of the DataFrame didn't prevent the EOF error.
Toy code and error below.
#set spark context
conf = SparkConf().setMaster("local").setAppName("MyApp")
sc = SparkContext(conf = conf)
sqlContext = SQLContext(sc)
#read in 500mb csv as DataFrame
df = sqlContext.read.format('com.databricks.spark.csv').options(header='true',
inferschema='true').load('myfile.csv')
#get dataframe into machine learning format
r_formula = RFormula(formula = "outcome ~ .")
mldf = r_formula.fit(df).transform(df)
#fit random forest model
rf = RandomForestClassifier(numTrees = 3, maxDepth = 2)
model = rf.fit(mldf)
result = model.transform(mldf).head()
Running the above code with spark-submit
on a single node repeatedly throws the following error, even if the size of the DataFrame is reduced prior to fitting the model (e.g. tinydf = df.sample(False, 0.00001)
:
Traceback (most recent call last):
File "/home/hduser/spark1.6/python/lib/pyspark.zip/pyspark/daemon.py", line 157,
in manager
File "/home/hduser/spark1.6/python/lib/pyspark.zip/pyspark/daemon.py", line 61,
in worker
File "/home/hduser/spark1.6/python/lib/pyspark.zip/pyspark/worker.py", line 136,
in main if read_int(infile) == SpecialLengths.END_OF_STREAM:
File "/home/hduser/spark1.6/python/lib/pyspark.zip/pyspark/serializers.py", line 545,
in read_int
raise EOFError
EOFError
The error appears to happen in the pySpark read_int function. Code for which is as follows from spark site :
def read_int(stream):
length = stream.read(4)
if not length:
raise EOFError
return struct.unpack("!i", length)[0]
This would mean that when reading 4bytes from the stream, if 0 bytes are read, EOF error is raised. The python docs are here.