I would like to do some DBSCAN on Spark. I have currently found 2 implementations:
I have tested the first one with the sbt configuration given on its GitHub page, but:
The functions in the jar are not the same as those in the documentation or in the source on GitHub. For example, I cannot find the train function in the jar.
I managed to run a test with the fit function (found in the jar), but a badly chosen epsilon (a little too big) sent the code into an infinite loop.
Code:
val model = DBSCAN.fit(eps, minPoints, values, parallelism)
Has anyone managed to do something with the first library?
Has anyone tested the second one?
Please try ELKI. As this is Java, it should be easy to call from Scala.
ELKI is very well optimized, and with indexes it will scale to quite large data sets.
We tried to include one of these Spark implementations in our benchmarking study - but it ran out of memory (and it was the only implementation to run out of memory ... the k-means of Spark and Mahout were also among the slowest):
Hans-Peter Kriegel, Erich Schubert, and Arthur Zimek.
The (black) art of runtime evaluation: Are we comparing algorithms or implementations?
In: Knowledge and Information Systems (KAIS). 2016, 1–38
Professor Neukirchen benchmarked parallel implementations of DBSCAN in this technical report:
Helmut Neukirchen
Survey and Performance Evaluation of DBSCAN Spatial Clustering Implementations for Big Data and High-Performance Computing Paradigms
Apparently he got some of the Spark implementations working, but noted that:
The result is devastating: none of the implementations for Apache Spark is anywhere near to the HPC implementations. In particular on bigger (but still rather small) data sets, most of them fail completely and do not even deliver correct results.
and earlier:
When running any of the "Spark DBSCAN" implementations while making use of all available cores of our cluster, we experienced out-of-memory exceptions.
(also, "Spark DBSCAN" took 2406 seconds on 928 cores, ELKI took 997 seconds on 1 core for the smaller benchmark - the other Spark implementation didn't fare too well either, in particular it did not return the correct result...)
"DBSCAN on Spark" did not crash, but returned completely wrong clusters.
While "DBSCAN on Spark" finishes faster, it delivered completely wrong clustering results. Due to the hopelessly long run-times of the DBSCAN implementations for Spark already with the maximum number of cores, we did not perform measurements with a lower number of cores.
You can wrap a double[][] array as an ELKI database:
// Adapter to load data from an existing array.
DatabaseConnection dbc = new ArrayAdapterDatabaseConnection(data);
// Create a database (which may contain multiple relations!)
Database db = new StaticArrayDatabase(dbc, null);
// Load the data into the database (do NOT forget to initialize...)
db.initialize();
// Squared Euclidean is faster than Euclidean; the radius must then be squared too (eps*eps).
Clustering<Model> c = new DBSCAN<NumberVector>(
    SquaredEuclideanDistanceFunction.STATIC, eps * eps, minpts).run(db);
for (Cluster<Model> clu : c.getAllClusters()) {
  // Process clusters
}
See also: Java API example (in particular, how to map DBIDs back to row indexes). For better performance, pass an index factory (such as new CoverTree.Factory(...)) as the second parameter to the StaticArrayDatabase constructor.
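The ELKI snippet above pairs the squared Euclidean distance with eps*eps rather than eps. A minimal plain-Java sketch of why that works: comparing the squared distance against the squared radius selects the same neighbors as comparing the Euclidean distance against the radius, while skipping the square root. The class and method names here are hypothetical and are not part of ELKI; points lying exactly on the radius boundary could in principle differ due to floating-point rounding.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration only; not part of ELKI.
final class SquaredDistanceSketch {
  // Squared Euclidean distance: sum of squared coordinate differences, no sqrt.
  static double squaredEuclidean(double[] a, double[] b) {
    double sum = 0.0;
    for (int i = 0; i < a.length; i++) {
      double d = a[i] - b[i];
      sum += d * d;
    }
    return sum;
  }

  // Row indexes whose Euclidean distance to q is at most eps.
  static List<Integer> neighborsEuclidean(double[][] data, double[] q, double eps) {
    List<Integer> result = new ArrayList<>();
    for (int i = 0; i < data.length; i++) {
      if (Math.sqrt(squaredEuclidean(data[i], q)) <= eps) {
        result.add(i);
      }
    }
    return result;
  }

  // Same range query, but comparing squared distance against a squared radius.
  static List<Integer> neighborsSquared(double[][] data, double[] q, double epsSquared) {
    List<Integer> result = new ArrayList<>();
    for (int i = 0; i < data.length; i++) {
      if (squaredEuclidean(data[i], q) <= epsSquared) {
        result.add(i);
      }
    }
    return result;
  }

  public static void main(String[] args) {
    double[][] data = {{0, 0}, {3, 4}, {6, 8}, {1, 1}};
    double[] q = {0, 0};
    double eps = 6.0;
    // Both calls select the same rows; only the threshold differs.
    System.out.println(neighborsEuclidean(data, q, eps));
    System.out.println(neighborsSquared(data, q, eps * eps));
  }
}
```

Forgetting to square the radius when switching distance functions silently changes the neighborhood size, which in turn changes which points count as core points for a given minpts.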