Pyspark and PCA: How can I extract the eigenvectors of this PCA? How can I calculate how much variance they are explaining?

nanounanue picture nanounanue · Oct 30, 2015 · Viewed 12.5k times · Source

I am reducing the dimensionality of a Spark DataFrame with PCA model with pyspark (using the spark ml library) as follows:

pca = PCA(k=3, inputCol="features", outputCol="pca_features")
model = pca.fit(data)

where data is a Spark DataFrame with one column labeled features which is a DenseVector of 3 dimensions:

data.take(1)
Row(features=DenseVector([0.4536,-0.43218, 0.9876]), label=u'class1')

After fitting, I transform the data:

transformed = model.transform(data)
transformed.first()
Row(features=DenseVector([0.4536,-0.43218, 0.9876]), label=u'class1', pca_features=DenseVector([-0.33256, 0.8668, 0.625]))

My question is: How can I extract the eigenvectors of this PCA? How can I calculate how much variance they are explaining?

Answer

desertnaut picture desertnaut · Nov 2, 2015

[UPDATE: From Spark 2.2 onwards, PCA and SVD are both available in PySpark - see JIRA ticket SPARK-6227 and PCA & PCAModel for Spark ML 2.2; original answer below is still applicable for older Spark versions.]

Well, it seems incredible, but indeed there is not a way to extract such information from a PCA decomposition (at least as of Spark 1.5). But again, there have been many similar "complaints" - see here, for example, for not being able to extract the best parameters from a CrossValidatorModel.

Fortunately, some months ago, I attended the 'Scalable Machine Learning' MOOC by AMPLab (Berkeley) & Databricks, i.e. the creators of Spark, where we implemented a full PCA pipeline 'by hand' as part of the homework assignments. I have modified my functions from back then (rest assured, I got full credit :-), so as to work with dataframes as inputs (instead of RDD's), of the same format as yours (i.e. Rows of DenseVectors containing the numerical features).

We first need to define an intermediate function, estimatedCovariance, as follows:

import numpy as np

def estimateCovariance(df):
    """Compute the covariance matrix for a given dataframe.

    Note:
        The multi-dimensional covariance array should be calculated using outer products.  Don't
        forget to normalize the data by first subtracting the mean.

    Args:
        df:  A Spark dataframe with a column named 'features', which (column) consists of DenseVectors.

    Returns:
        np.ndarray: A multi-dimensional array where the number of rows and columns both equal the
            length of the arrays in the input dataframe.
    """
    m = df.select(df['features']).map(lambda x: x[0]).mean()
    dfZeroMean = df.select(df['features']).map(lambda x:   x[0]).map(lambda x: x-m)  # subtract the mean

    return dfZeroMean.map(lambda x: np.outer(x,x)).sum()/df.count()

Then, we can write a main pca function as follows:

from numpy.linalg import eigh

def pca(df, k=2):
    """Computes the top `k` principal components, corresponding scores, and all eigenvalues.

    Note:
        All eigenvalues should be returned in sorted order (largest to smallest). `eigh` returns
        each eigenvectors as a column.  This function should also return eigenvectors as columns.

    Args:
        df: A Spark dataframe with a 'features' column, which (column) consists of DenseVectors.
        k (int): The number of principal components to return.

    Returns:
        tuple of (np.ndarray, RDD of np.ndarray, np.ndarray): A tuple of (eigenvectors, `RDD` of
        scores, eigenvalues).  Eigenvectors is a multi-dimensional array where the number of
        rows equals the length of the arrays in the input `RDD` and the number of columns equals
        `k`.  The `RDD` of scores has the same number of rows as `data` and consists of arrays
        of length `k`.  Eigenvalues is an array of length d (the number of features).
     """
    cov = estimateCovariance(df)
    col = cov.shape[1]
    eigVals, eigVecs = eigh(cov)
    inds = np.argsort(eigVals)
    eigVecs = eigVecs.T[inds[-1:-(col+1):-1]]  
    components = eigVecs[0:k]
    eigVals = eigVals[inds[-1:-(col+1):-1]]  # sort eigenvals
    score = df.select(df['features']).map(lambda x: x[0]).map(lambda x: np.dot(x, components.T) )
    # Return the `k` principal components, `k` scores, and all eigenvalues

    return components.T, score, eigVals

Test

Let's see first the results with the existing method, using the example data from the Spark ML PCA documentation (modifying them so as to be all DenseVectors):

 from pyspark.ml.feature import *
 from pyspark.mllib.linalg import Vectors
 data = [(Vectors.dense([0.0, 1.0, 0.0, 7.0, 0.0]),),
         (Vectors.dense([2.0, 0.0, 3.0, 4.0, 5.0]),),
         (Vectors.dense([4.0, 0.0, 0.0, 6.0, 7.0]),)]
 df = sqlContext.createDataFrame(data,["features"])
 pca_extracted = PCA(k=2, inputCol="features", outputCol="pca_features")
 model = pca_extracted.fit(df)
 model.transform(df).collect()

 [Row(features=DenseVector([0.0, 1.0, 0.0, 7.0, 0.0]), pca_features=DenseVector([1.6486, -4.0133])),
  Row(features=DenseVector([2.0, 0.0, 3.0, 4.0, 5.0]), pca_features=DenseVector([-4.6451, -1.1168])),
  Row(features=DenseVector([4.0, 0.0, 0.0, 6.0, 7.0]), pca_features=DenseVector([-6.4289, -5.338]))]

Then, with our method:

 comp, score, eigVals = pca(df)
 score.collect()

 [array([ 1.64857282,  4.0132827 ]),
  array([-4.64510433,  1.11679727]),
  array([-6.42888054,  5.33795143])]

Let me stress that we don't use any collect() methods in the functions we have defined - score is an RDD, as it should be.

Notice that the signs of our second column are all opposite from the ones derived by the existing method; but this is not an issue: according to the (freely downloadable) An Introduction to Statistical Learning, co-authored by Hastie & Tibshirani, p. 382

Each principal component loading vector is unique, up to a sign flip. This means that two different software packages will yield the same principal component loading vectors, although the signs of those loading vectors may differ. The signs may differ because each principal component loading vector specifies a direction in p-dimensional space: flipping the sign has no effect as the direction does not change. [...] Similarly, the score vectors are unique up to a sign flip, since the variance of Z is the same as the variance of −Z.

Finally, now that we have the eigenvalues available, it is trivial to write a function for the percentage of the variance explained:

 def varianceExplained(df, k=1):
     """Calculate the fraction of variance explained by the top `k` eigenvectors.

     Args:
         df: A Spark dataframe with a 'features' column, which (column) consists of DenseVectors.
         k: The number of principal components to consider.

     Returns:
         float: A number between 0 and 1 representing the percentage of variance explained
             by the top `k` eigenvectors.
     """
     components, scores, eigenvalues = pca(df, k)  
     return sum(eigenvalues[0:k])/sum(eigenvalues)


 varianceExplained(df,1)
 # 0.79439325322305299

As a test, we also check if the variance explained in our example data is 1.0, for k=5 (since the original data are 5-dimensional):

 varianceExplained(df,5)
 # 1.0

This should do your job efficiently; feel free to come up with any clarifications you may need.

[Developed & tested with Spark 1.5.0 & 1.5.1]