I have a bizarre issue with PySpark when indexing column of strings in features. Here is my tmp.csv file:
x0,x1,x2,x3
asd2s,1e1e,1.1,0
asd2s,1e1e,0.1,0
,1e3e,1.2,0
bd34t,1e1e,5.1,1
asd2s,1e3e,0.2,0
bd34t,1e2e,4.3,1
where I have one missing value for 'x0'. At first, I'm reading features from csv file into DataFrame using pyspark_csv: https://github.com/seahboonsiew/pyspark-csv then indexing x0 with StringIndexer:
import pyspark_csv as pycsv
from pyspark.ml.feature import StringIndexer
sc.addPyFile('pyspark_csv.py')
features = pycsv.csvToDataFrame(sqlCtx, sc.textFile('tmp.csv'))
indexer = StringIndexer(inputCol='x0', outputCol='x0_idx' )
ind = indexer.fit(features).transform(features)
print ind.collect()
when calling ''ind.collect()'' Spark throws java.lang.NullPointerException. Everything works fine for complete data set, e.g., for 'x1' though.
Does anyone have a clue what is causing this and how to fix it?
Thanks in advance!
Sergey
Update:
I use Spark 1.5.1. The exact error:
File "/spark/spark-1.4.1-bin-hadoop2.6/python/pyspark/sql/dataframe.py", line 258, in show
print(self._jdf.showString(n))
File "/spark/spark-1.4.1-bin-hadoop2.6/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__
File "/spark/spark-1.4.1-bin-hadoop2.6/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o444.showString.
: java.lang.NullPointerException
at org.apache.spark.sql.types.Metadata$.org$apache$spark$sql$types$Metadata$$hash(Metadata.scala:208)
at org.apache.spark.sql.types.Metadata$$anonfun$org$apache$spark$sql$types$Metadata$$hash$2.apply(Metadata.scala:196)
at org.apache.spark.sql.types.Metadata$$anonfun$org$apache$spark$sql$types$Metadata$$hash$2.apply(Metadata.scala:196)
... etc
I've tried to create the same DataFrame without reading csv file,
df = sqlContext.createDataFrame(
[('asd2s','1e1e',1.1,0), ('asd2s','1e1e',0.1,0),
(None,'1e3e',1.2,0), ('bd34t','1e1e',5.1,1),
('asd2s','1e3e',0.2,0), ('bd34t','1e2e',4.3,1)],
['x0','x1','x2','x3'])
and it gives the same error. A bit different example works fine,
df = sqlContext.createDataFrame(
[(0, None, 1.2), (1, '06330986ed', 2.3),
(2, 'b7584c2d52', 2.5), (3, None, .8),
(4, 'bd17e19b3a', None), (5, '51b5c0f2af', 0.1)],
['id', 'x0', 'num'])
// after indexing x0
+---+----------+----+------+
| id| x0| num|x0_idx|
+---+----------+----+------+
| 0| null| 1.2| 0.0|
| 1|06330986ed| 2.3| 2.0|
| 2|b7584c2d52| 2.5| 4.0|
| 3| null| 0.8| 0.0|
| 4|bd17e19b3a|null| 1.0|
| 5|51b5c0f2af| 0.1| 3.0|
+---+----------+----+------+
Update 2:
I've just discovered the same issue in Scala, so I guess it's Spark bug not PySpark only. In particular, data frame
val df = sqlContext.createDataFrame(
Seq(("asd2s","1e1e",1.1,0), ("asd2s","1e1e",0.1,0),
(null,"1e3e",1.2,0), ("bd34t","1e1e",5.1,1),
("asd2s","1e3e",0.2,0), ("bd34t","1e2e",4.3,1))
).toDF("x0","x1","x2","x3")
throws java.lang.NullPointerException when indexing 'x0' feature. Moreover, when indexing 'x0' in the following data frame
val df = sqlContext.createDataFrame(
Seq((0, null, 1.2), (1, "b", 2.3),
(2, "c", 2.5), (3, "a", 0.8),
(4, "a", null), (5, "c", 0.1))
).toDF("id", "x0", "num")
I've got 'java.lang.UnsupportedOperationException: Schema for type Any is not supported' which is caused by missing 'num' value in 5th vector. If one replaces it with a number everything works well even having missing value in the 1st vector.
I've also tried older versions of Spark (1.4.1), and the result is the same.
It looks like module you're using converts empty strings to nulls and it is messing at some point with downstream processing. At first glance it looks like a PySpark bug.
How to fix it? A simple workaround is to either drop nulls before indexing:
features.na.drop()
or replace nulls with some placeholder:
from pyspark.sql.functions import col, when
features.withColumn(
"x0", when(col("x0").isNull(), "__SOME_PLACEHOLDER__").otherwise(col("x0")))
Also, you could use spark-csv
. It is efficient, tested and as a bonus doesn't convert empty strings to nulls
.
features = (sqlContext.read
.format('com.databricks.spark.csv')
.option("inferSchema", "true")
.option("header", "true")
.load("tmp.csv"))