PCA Analysis in PySpark

lapolonio · Aug 2, 2015 · Viewed 15.3k times

I'm looking at http://spark.apache.org/docs/latest/mllib-dimensionality-reduction.html. The examples seem to cover only Java and Scala.

Does Spark MLlib support PCA analysis for Python? If so, please point me to an example. If not, how do I combine Spark with scikit-learn?

Answer

zero323 · Aug 2, 2015

Spark >= 1.5.0

Although PySpark 1.5 introduces distributed data structures (pyspark.mllib.linalg.distributed), the API is rather limited and there is no implementation of the computePrincipalComponents method.

It is possible to use either pyspark.ml.feature.PCA or pyspark.mllib.feature.PCA though. In the first case the expected input is a data frame with a vector column:

from pyspark.ml.feature import PCA as PCAml
from pyspark.ml.linalg import Vectors  # Pre 2.0 pyspark.mllib.linalg

# Three 3-dimensional rows wrapped in a single vector column
df = sqlContext.createDataFrame([
    (Vectors.dense([1, 2, 0]),),
    (Vectors.dense([2, 0, 1]),),
    (Vectors.dense([0, 1, 0]),)], ("features", ))

# Keep the top two principal components
pca = PCAml(k=2, inputCol="features", outputCol="pca")
model = pca.fit(df)
transformed = model.transform(df)
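
The transformed data frame carries the projected vectors in the pca column. A quick way to inspect the result (the pc and explainedVariance attributes are available on the fitted model in Spark 2.0 or later):

transformed.select("pca").show(truncate=False)

# Spark 2.0+ only
print(model.pc)                 # matrix of principal components
print(model.explainedVariance)  # variance explained by each component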

In Spark 2.0 or later you should use pyspark.ml.linalg.Vector in place of pyspark.mllib.linalg.Vector.

For the mllib version you'll need an RDD of Vector:

from pyspark.mllib.feature import PCA as PCAmllib
from pyspark.mllib.linalg import Vectors

rdd = sc.parallelize([
    Vectors.dense([1, 2, 0]),
    Vectors.dense([2, 0, 1]),
    Vectors.dense([0, 1, 0])])

# Keep the top two principal components
model = PCAmllib(2).fit(rdd)
transformed = model.transform(rdd)
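
Here transformed is again an RDD, with each vector projected onto the two principal components:

transformed.first()  # e.g. a DenseVector of length 2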

Spark < 1.5.0

PySpark <= 1.4.1 doesn't support distributed data structures yet, so there is no built-in method to compute PCA. If the input matrix is relatively thin you can compute the covariance matrix in a distributed manner, collect the results and perform the eigendecomposition locally on the driver.

The order of operations is more or less as below. Distributed steps are followed by the name of the operation, local ones by "*" and, where relevant, a suggested method; a sketch of the whole recipe follows the list.

  1. Create an RDD[Vector] where each element is a single row of the input matrix. You can use a numpy.ndarray for each row (parallelize)
  2. Compute column-wise statistics (reduce)
  3. Use results from 2. to center the matrix (map)
  4. Compute outer product for each row (map outer)
  5. Sum results to obtain covariance matrix (reduce +)
  6. Collect and compute eigendecomposition * (numpy.linalg.eigh)
  7. Choose top-n eigenvectors *
  8. Project the data (map)
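
A minimal sketch of those steps using plain NumPy on an RDD of rows (variable names, the sample-covariance denominator n - 1 and k = 2 are my choices, not part of the recipe above):

import numpy as np

# 1. RDD of rows, one numpy.ndarray per row (parallelize)
rows = sc.parallelize([
    np.array([1.0, 2.0, 0.0]),
    np.array([2.0, 0.0, 1.0]),
    np.array([0.0, 1.0, 0.0])])

# 2. Column-wise means (reduce)
n = rows.count()
mean = rows.reduce(lambda a, b: a + b) / n

# 3. Center the matrix (map)
centered = rows.map(lambda row: row - mean)

# 4. + 5. Outer product per row, summed into the covariance matrix (map + reduce)
cov = centered.map(lambda row: np.outer(row, row)).reduce(lambda a, b: a + b) / (n - 1)

# 6. Eigendecomposition locally on the driver (numpy.linalg.eigh)
eigenvalues, eigenvectors = np.linalg.eigh(cov)

# 7. Top-k eigenvectors; eigh returns eigenvalues in ascending order
k = 2
top = eigenvectors[:, np.argsort(eigenvalues)[::-1][:k]]

# 8. Project the centered data (map)
projected = centered.map(lambda row: row.dot(top))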

Regarding scikit-learn: you can use NumPy (it is already used by MLlib), SciPy and scikit-learn locally on the driver or on a worker the same way as usual.
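
For example, on a dataset small enough to fit on the driver (a sketch; rows is the RDD of NumPy arrays from the snippet above):

import numpy as np
from sklearn.decomposition import PCA

# Collect the (small) dataset to the driver and fit scikit-learn's PCA locally
local_data = np.array(rows.collect())
skl_pca = PCA(n_components=2).fit(local_data)

# The fitted model is an ordinary Python object, so it can be shipped to
# the workers inside a closure to project the distributed data in parallel
projected = rows.map(lambda row: skl_pca.transform(row.reshape(1, -1))[0])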