I've built a pipeline that indexes multiple string columns at once, but I am stuck on encoding them. Unlike the indexer, the encoder is not an estimator, so according to the OneHotEncoder example in the docs I never call fit on it.
import org.apache.spark.ml.feature.{StringIndexer, VectorAssembler, OneHotEncoder}
import org.apache.spark.ml.Pipeline
val data = sqlContext.read.parquet("s3n://map2-test/forecaster/intermediate_data")
val df = data.select("win","bid_price","domain","size", "form_factor").na.drop()
//indexing columns
val stringColumns = Array("domain","size", "form_factor")
val index_transformers: Array[org.apache.spark.ml.PipelineStage] = stringColumns.map(
  cname => new StringIndexer()
    .setInputCol(cname)
    .setOutputCol(s"${cname}_index")
)
// Add the rest of your pipeline like VectorAssembler and algorithm
val index_pipeline = new Pipeline().setStages(index_transformers)
val index_model = index_pipeline.fit(df)
val df_indexed = index_model.transform(df)
//encoding columns
val indexColumns = df_indexed.columns.filter(x => x contains "index")
val one_hot_encoders: Array[org.apache.spark.ml.PipelineStage] = indexColumns.map(
  cname => new OneHotEncoder()
    .setInputCol(cname)
    .setOutputCol(s"${cname}_vec")
)
val one_hot_pipeline = new Pipeline().setStages(one_hot_encoders)
val df_encoded = one_hot_pipeline.transform(df_indexed)
The OneHotEncoder object doesn't have a fit method, so putting it in the same pipeline as the indexers does not work: it throws an error when I call fit on the pipeline. I also cannot call transform on the pipeline that I made from the array of pipeline stages, one_hot_encoders.
I have not found a good way to use the OneHotEncoder without creating a separate encoder and calling transform on it individually for every column I want to encode.
Spark >= 3.0
In Spark 3.0, OneHotEncoderEstimator has been renamed to OneHotEncoder:
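import org.apache.spark.ml.feature.{OneHotEncoder, OneHotEncoderModel}
val encoder = new OneHotEncoder()
  .setInputCols(indexColumns)
  .setOutputCols(indexColumns map (name => s"${name}_vec"))
// As with OneHotEncoderEstimator, the encoder has to be fitted before transforming.
encoder.fit(df_indexed).transform(df_indexed)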
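For a fuller picture, here is a minimal end-to-end sketch, assuming Spark 3.0 or later, where StringIndexer also accepts multiple columns. The local SparkSession, the toy rows, and the names toy, vecColumns and encoded are assumptions made for illustration and are not part of the question's data.

```scala
import org.apache.spark.ml.{Pipeline, PipelineStage}
import org.apache.spark.ml.feature.{OneHotEncoder, StringIndexer}
import org.apache.spark.sql.SparkSession

// Assumption: a local session for experimenting; in spark-shell one already exists.
val spark = SparkSession.builder().master("local[*]").appName("ohe-sketch").getOrCreate()
import spark.implicits._

// Hypothetical toy rows standing in for the real parquet data.
val toy = Seq(
  ("a.com", "300x250", "phone"),
  ("b.com", "728x90", "tablet"),
  ("a.com", "300x250", "desktop")
).toDF("domain", "size", "form_factor")

val stringColumns = Array("domain", "size", "form_factor")
val indexColumns = stringColumns.map(c => s"${c}_index")
val vecColumns = indexColumns.map(c => s"${c}_vec")

// One indexer and one encoder handle all columns at once.
val indexer = new StringIndexer()
  .setInputCols(stringColumns)
  .setOutputCols(indexColumns)

val encoder = new OneHotEncoder()
  .setInputCols(indexColumns)
  .setOutputCols(vecColumns)

// Both stages are estimators, so a single fit on the pipeline covers them.
val encoded = new Pipeline()
  .setStages(Array[PipelineStage](indexer, encoder))
  .fit(toy)
  .transform(toy)

encoded.select(vecColumns.head, vecColumns.tail: _*).show(false)
```

With this layout there is no need to map over the column names to build one stage per column.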
Spark >= 2.3
Spark 2.3 introduced the new classes OneHotEncoderEstimator and OneHotEncoderModel, which require fitting even when used outside a Pipeline, and which operate on multiple columns at the same time.
import org.apache.spark.ml.feature.{OneHotEncoderEstimator, OneHotEncoderModel}
val encoder = new OneHotEncoderEstimator()
  .setInputCols(indexColumns)
  .setOutputCols(indexColumns map (name => s"${name}_vec"))
encoder.fit(df_indexed).transform(df_indexed)
Spark < 2.3
Even if the transformers you use don't require fitting, you have to call the fit method on the Pipeline to create a PipelineModel, which can then be used to transform data.
one_hot_pipeline.fit(df_indexed).transform(df_indexed)
On a side note, you can combine indexing and encoding into a single Pipeline:
val pipeline = new Pipeline()
  .setStages(index_transformers ++ one_hot_encoders)
val model = pipeline.fit(df)
model.transform(df)
Edit:
The error you see means that one of your columns contains an empty String. The indexer accepts it, but it cannot be used for encoding. Depending on your requirements, you can drop these rows or use a dummy label. Unfortunately, you cannot use NULLs until SPARK-11569 is resolved.
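The two options for empty strings can be sketched as follows. This is a minimal illustration, not the only way to do it: the local SparkSession, the toy rows, and the dummy label "unknown" are assumptions chosen for the example.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

// Assumption: a local session for experimenting; in spark-shell one already exists.
val spark = SparkSession.builder().master("local[*]").appName("empty-label-sketch").getOrCreate()
import spark.implicits._

// Hypothetical toy rows, two of which contain an empty string.
val toy = Seq(
  ("a.com", "phone"),
  ("", "tablet"),
  ("b.com", "")
).toDF("domain", "form_factor")

val stringColumns = Array("domain", "form_factor")

// Option 1: drop every row that has an empty string in any of the columns.
val dropped = stringColumns.foldLeft(toy)((df, c) => df.filter(col(c) =!= ""))

// Option 2: keep the rows and substitute a dummy label ("unknown" is an arbitrary choice).
val relabeled = toy.na.replace(stringColumns.toSeq, Map("" -> "unknown"))
```

Either result can then be passed to the indexing and encoding pipeline in place of the original DataFrame.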