Feature normalization algorithm in Spark

Alex B · Dec 12, 2015 · Viewed 9.5k times

Trying to understand Spark's normalization algorithm. My small test set contains 5 vectors:

{0.95, 0.018, 0.0, 24.0, 24.0, 14.4, 70000.0},  
{1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 70000.0},  
{-1.0, -1.0, -1.0, -1.0, -1.0, -1.0, 70000.0},  
{-0.95, 0.018, 0.0, 24.0, 24.0, 14.4, 70000.0},  
{0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 70000.0},  

I would expect new Normalizer().transform(vectors) to create a JavaRDD in which each vector feature is normalized as (v-mean)/stdev across all values for feature-0, feature-1, etc.
The resulting set is:

[-1.4285714276967932E-5,-1.4285714276967932E-5,-1.4285714276967932E-5,-1.4285714276967932E-5,-1.4285714276967932E-5,-1.4285714276967932E-5,0.9999999993877552]  
[1.357142668768307E-5,2.571428214508371E-7,0.0,3.428570952677828E-4,3.428570952677828E-4,2.057142571606697E-4,0.9999998611976999]  
[-1.357142668768307E-5,2.571428214508371E-7,0.0,3.428570952677828E-4,3.428570952677828E-4,2.057142571606697E-4,0.9999998611976999]  
[1.4285714276967932E-5,1.4285714276967932E-5,1.4285714276967932E-5,1.4285714276967932E-5,1.4285714276967932E-5,1.4285714276967932E-5,0.9999999993877552]  
[0.0,0.0,0.0,0.0,0.0,0.0,1.0]  

Note that the original values 70000.0, identical in every vector, result in different 'normalized' values. Also, how was, for example, 1.357142668768307E-5 calculated when the values for that feature are 0.95, 1, -1, -0.95, 0? What's more, if I remove a feature, the results are different. I could not find any documentation on the issue.
In fact, my question is: how do I normalize all vectors in an RDD correctly?

Answer

zero323 · Dec 12, 2015

Your expectations are simply incorrect. As the official documentation clearly states, "Normalizer scales individual samples to have unit L^p norm", where the default value of p is 2. In other words, each vector is divided by its own norm; nothing is computed across vectors for a given feature. Ignoring numerical precision issues:

import org.apache.spark.mllib.feature.Normalizer
import org.apache.spark.mllib.linalg.Vectors

val rdd = sc.parallelize(Seq(
    Vectors.dense(0.95, 0.018, 0.0, 24.0, 24.0, 14.4, 70000.0),
    Vectors.dense(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 70000.0),
    Vectors.dense(-1.0, -1.0, -1.0, -1.0, -1.0, -1.0, 70000.0),
    Vectors.dense(-0.95, 0.018, 0.0, 24.0, 24.0, 14.4, 70000.0),
    Vectors.dense(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 70000.0)))

// Default Normalizer uses p = 2, i.e. the L2 norm
val normalizer = new Normalizer()
val transformed = normalizer.transform(rdd)

// Sum of the components of each normalized vector; close to 1 here
// because the last feature dominates every vector
transformed.map(_.toArray.sum).collect
// Array[Double] = Array(1.0009051182149054, 1.000085713673417,
//   0.9999142851020933, 1.00087797536153, 1.0)
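To see where a value such as 1.357142668768307E-5 comes from, the same L2 scaling can be reproduced by hand for a single vector. This is only a minimal sketch in plain Scala (no Spark involved; the names v, norm and unit are made up for illustration), not how Spark implements it internally:

```scala
// Hand-rolled L2 normalization of one vector from the question.
val v = Array(0.95, 0.018, 0.0, 24.0, 24.0, 14.4, 70000.0)

// L2 norm: square root of the sum of squared components.
// It is only slightly above 70000 because the last feature dominates.
val norm = math.sqrt(v.map(x => x * x).sum)

// Every component is divided by the norm of its own vector.
val unit = v.map(_ / norm)

println(norm)
println(unit.mkString("[", ",", "]"))  // first component is 0.95 / norm, about 1.357e-5
```

Because the norm depends on every component of the vector, removing a feature changes all results, and the same 70000.0 maps to a slightly different value in each vector.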

Normalizer doesn't provide the functionality you need, but you can use StandardScaler from ML.

import org.apache.spark.ml.feature.StandardScaler

// toDF and $"..." rely on sqlContext.implicits._ (already imported in spark-shell)
val df = rdd.map(Tuple1(_)).toDF("features")

val scaler = new StandardScaler()
  .setInputCol("features")
  .setOutputCol("scaledFeatures")
  .setWithStd(true)
  .setWithMean(true)

val transformedDF = scaler.fit(df).transform(df)

transformedDF.select($"scaledFeatures").show(5, false)

// +--------------------------------------------------------------------------------------------------------------------------+
// |scaledFeatures                                                                                                            |
// +--------------------------------------------------------------------------------------------------------------------------+
// |[0.9740388301169303,0.015272022105217588,0.0,1.0938637007095298,1.0938637007095298,1.0910691283447955,0.0]                |
// |[1.0253040317020319,1.4038947727833362,1.414213562373095,-0.6532797101459693,-0.6532797101459693,-0.6010982697825494,0.0] |
// |[-1.0253040317020319,-1.4242574689236265,-1.414213562373095,-0.805205224133404,-0.805205224133404,-0.8536605680105113,0.0]|
// |[-0.9740388301169303,0.015272022105217588,0.0,1.0938637007095298,1.0938637007095298,1.0910691283447955,0.0]               |
// |[0.0,-0.010181348070145075,0.0,-0.7292424671396867,-0.7292424671396867,-0.7273794188965303,0.0]                           |
// +--------------------------------------------------------------------------------------------------------------------------+
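
For comparison, the (v-mean)/stdev scaling from the question can also be written out on plain Scala collections. This is a rough sketch rather than Spark's implementation; it assumes the sample standard deviation (dividing by n - 1) and maps constant features to 0.0, which is consistent with the output shown above. The names rows, means, stds and scaled are made up for illustration:

```scala
// Column-wise standardization, (v - mean) / stdev, without Spark.
val rows = Seq(
  Array(0.95, 0.018, 0.0, 24.0, 24.0, 14.4, 70000.0),
  Array(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 70000.0),
  Array(-1.0, -1.0, -1.0, -1.0, -1.0, -1.0, 70000.0),
  Array(-0.95, 0.018, 0.0, 24.0, 24.0, 14.4, 70000.0),
  Array(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 70000.0))

val n = rows.length
val numFeatures = rows.head.length

// Mean of each feature across all vectors.
val means = (0 until numFeatures).map(j => rows.map(r => r(j)).sum / n)

// Sample standard deviation of each feature (assumption: divide by n - 1).
val stds = (0 until numFeatures).map { j =>
  math.sqrt(rows.map(r => math.pow(r(j) - means(j), 2)).sum / (n - 1))
}

// Constant features (stdev 0) become 0.0 instead of dividing by zero.
val scaled = rows.map { r =>
  r.indices.map { j =>
    if (stds(j) == 0.0) 0.0 else (r(j) - means(j)) / stds(j)
  }.toArray
}

scaled.foreach(r => println(r.mkString("[", ",", "]")))
```

Here every value depends on the statistics of its own feature across all five vectors, so the constant 70000.0 column collapses to 0.0 in every row.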