Trying to understand Spark's normalization algorithm. My small test set contains 5 vectors:
{0.95, 0.018, 0.0, 24.0, 24.0, 14.4, 70000.0},
{1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 70000.0},
{-1.0, -1.0, -1.0, -1.0, -1.0, -1.0, 70000.0},
{-0.95, 0.018, 0.0, 24.0, 24.0, 14.4, 70000.0},
{0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 70000.0},
I would expect new Normalizer().transform(vectors) to create a JavaRDD in which each vector feature is normalized as (v-mean)/stdev across all values of feature-0, feature-1, etc.
The resulting set is:
[-1.4285714276967932E-5,-1.4285714276967932E-5,-1.4285714276967932E-5,-1.4285714276967932E-5,-1.4285714276967932E-5,-1.4285714276967932E-5,0.9999999993877552]
[1.357142668768307E-5,2.571428214508371E-7,0.0,3.428570952677828E-4,3.428570952677828E-4,2.057142571606697E-4,0.9999998611976999]
[-1.357142668768307E-5,2.571428214508371E-7,0.0,3.428570952677828E-4,3.428570952677828E-4,2.057142571606697E-4,0.9999998611976999]
[1.4285714276967932E-5,1.4285714276967932E-5,1.4285714276967932E-5,1.4285714276967932E-5,1.4285714276967932E-5,1.4285714276967932E-5,0.9999999993877552]
[0.0,0.0,0.0,0.0,0.0,0.0,1.0]
Note that the original values 70000.0 all result in different 'normalized' values. Also, how was, for example, 1.357142668768307E-5 calculated when the values for that feature are .95, 1, -1, -.95, 0? What's more, if I remove a feature, the results are different. I could not find any documentation on the issue.
In short, my question is: how do I normalize all vectors in an RDD correctly?
Your expectations are simply incorrect. As is clearly stated in the official documentation, "Normalizer scales individual samples to have unit L^p norm", where the default value of p is 2. In other words, each vector (row) is scaled on its own, not each feature (column) across the data set. Ignoring numerical precision issues:
import org.apache.spark.mllib.feature.Normalizer
import org.apache.spark.mllib.linalg.Vectors

val rdd = sc.parallelize(Seq(
  Vectors.dense(0.95, 0.018, 0.0, 24.0, 24.0, 14.4, 70000.0),
  Vectors.dense(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 70000.0),
  Vectors.dense(-1.0, -1.0, -1.0, -1.0, -1.0, -1.0, 70000.0),
  Vectors.dense(-0.95, 0.018, 0.0, 24.0, 24.0, 14.4, 70000.0),
  Vectors.dense(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 70000.0)))

// Default p = 2: each vector is divided by its own L2 norm
val normalizer = new Normalizer()
val transformed = normalizer.transform(rdd)

// Sum of the components of each normalized vector
transformed.map(_.toArray.sum).collect
// Array[Double] = Array(1.0009051182149054, 1.000085713673417,
//   0.9999142851020933, 1.00087797536153, 1.0)
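To see where a value such as 1.357142668768307E-5 comes from, the L2 normalization of a single vector can be reproduced by hand. The following is a minimal sketch in plain Scala (no Spark required); the helper name l2Normalize is made up for illustration and is not part of the Spark API:

```scala
// Hypothetical helper, not part of Spark: divide a vector by its own L2 norm.
def l2Normalize(v: Array[Double]): Array[Double] = {
  val norm = math.sqrt(v.map(x => x * x).sum)
  // Leave an all-zero vector unchanged to avoid dividing by zero
  if (norm == 0.0) v else v.map(_ / norm)
}

val first = Array(0.95, 0.018, 0.0, 24.0, 24.0, 14.4, 70000.0)
val normalized = l2Normalize(first)

// 0.95 / sqrt(0.95^2 + 0.018^2 + 24^2 + 24^2 + 14.4^2 + 70000^2), roughly 1.357e-5
println(normalized(0))

// The squared components add up to 1 (up to rounding)
println(normalized.map(x => x * x).sum)
```

Because the norm is computed per vector, removing a feature changes that norm and therefore every value in the row, and the same 70000.0 maps to slightly different values in different rows.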
MLlib's Normalizer doesn't provide the functionality you need, but you can use StandardScaler from ML.
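import org.apache.spark.ml.feature.StandardScaler
// Outside spark-shell, toDF and $"..." also need the session's implicits in scope

val df = rdd.map(Tuple1(_)).toDF("features")

// Standardize each feature (column): subtract the mean, divide by the standard deviation
val scaler = new StandardScaler()
  .setInputCol("features")
  .setOutputCol("scaledFeatures")
  .setWithStd(true)
  .setWithMean(true)

val transformedDF = scaler.fit(df).transform(df)
transformedDF.select($"scaledFeatures").show(5, false)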
// +--------------------------------------------------------------------------------------------------------------------------+
// |scaledFeatures |
// +--------------------------------------------------------------------------------------------------------------------------+
// |[0.9740388301169303,0.015272022105217588,0.0,1.0938637007095298,1.0938637007095298,1.0910691283447955,0.0] |
// |[1.0253040317020319,1.4038947727833362,1.414213562373095,-0.6532797101459693,-0.6532797101459693,-0.6010982697825494,0.0] |
// |[-1.0253040317020319,-1.4242574689236265,-1.414213562373095,-0.805205224133404,-0.805205224133404,-0.8536605680105113,0.0]|
// |[-0.9740388301169303,0.015272022105217588,0.0,1.0938637007095298,1.0938637007095298,1.0910691283447955,0.0] |
// |[0.0,-0.010181348070145075,0.0,-0.7292424671396867,-0.7292424671396867,-0.7273794188965303,0.0] |
// +--------------------------------------------------------------------------------------------------------------------------+
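The scaled values above can be cross-checked without Spark. Below is a minimal sketch in plain Scala of column-wise standardization, (v - mean) / stdev. It assumes the sample standard deviation (dividing by n - 1) and maps constant columns to 0.0, which is what the output above suggests; the name standardize is hypothetical and not a Spark function:

```scala
// Hypothetical helper: standardize each column (feature) across all rows.
def standardize(rows: Seq[Array[Double]]): Seq[Array[Double]] = {
  val n = rows.size
  val dims = rows.head.length
  val means = (0 until dims).map(j => rows.map(_(j)).sum / n)
  // Assumption: unbiased sample standard deviation (divide by n - 1)
  val stds = (0 until dims).map { j =>
    math.sqrt(rows.map(r => math.pow(r(j) - means(j), 2)).sum / (n - 1))
  }
  rows.map { r =>
    Array.tabulate(dims) { j =>
      // A constant column has zero deviation; emit 0.0 instead of NaN
      if (stds(j) == 0.0) 0.0 else (r(j) - means(j)) / stds(j)
    }
  }
}

val data = Seq(
  Array(0.95, 0.018, 0.0, 24.0, 24.0, 14.4, 70000.0),
  Array(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 70000.0),
  Array(-1.0, -1.0, -1.0, -1.0, -1.0, -1.0, 70000.0),
  Array(-0.95, 0.018, 0.0, 24.0, 24.0, 14.4, 70000.0),
  Array(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 70000.0))

val scaled = standardize(data)
// First row; its first entry is roughly 0.974, as in the table above
println(scaled.head.mkString(", "))
```

Here the 70000.0 column is identical in every row, so it carries no information and ends up as 0.0 everywhere.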