I was following the tutorial at http://spark.apache.org/docs/latest/quick-start.html and first created a collection from a file:
textFile = sc.textFile("README.md")
Then I tried a command to count the words:
wordCounts = textFile.flatMap(lambda line: line.split()).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a+b)
To print the collection:
wordCounts.collect()
I found how to sort it by word using sortByKey. How can I do the same thing but sort by value, which in this case is the number of times a word occurs in the document?
Sorting should usually be done before collect() is called, since collect() returns the dataset to the driver program. This is also how a Hadoop MapReduce job would be written in Java, so that the final output is written (typically) to HDFS. With the Spark API, this approach gives you the flexibility of writing the output in "raw" form wherever you want, such as to a file that can be used as input for further processing.
Using Spark's Scala API, sorting before collect() can be done by following eliasah's suggestion and calling Tuple2.swap twice, once before sorting and once after. This produces a list of tuples sorted in increasing or decreasing order of their second field (named _2), which holds the number of occurrences of the word in their first field (named _1). Below is an example of how this is scripted in spark-shell:
// this whole block can be pasted in spark-shell in :paste mode followed by <Ctrl>D
val file = sc.textFile("some_local_text_file_pathname")
val wordCounts = file.flatMap(line => line.split(" "))
  .map(word => (word, 1))
  .reduceByKey(_ + _, 1) // 2nd arg configures one task (same as number of partitions)
  .map(item => item.swap) // interchanges position of entries in each tuple
  .sortByKey(true, 1) // 1st arg configures ascending sort, 2nd arg configures one task
  .map(item => item.swap)
To reverse the ordering of the sort, use sortByKey(false, 1), since its first argument is the boolean ascending flag. Its second argument is the number of tasks (equivalent to the number of partitions), set to 1 here for testing with a small input file where only one output data file is desired; reduceByKey also takes this optional argument.
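To make the swap, sort, swap sequence concrete, here is a minimal plain-Python sketch that models the same steps on a local list, with no Spark involved. The function name sort_counts_by_value and its ascending parameter are hypothetical, chosen to mirror the first argument of sortByKey; it only illustrates the logic and says nothing about how Spark distributes the work.

```python
from collections import Counter


def sort_counts_by_value(lines, ascending=True):
    """Local model of flatMap/map/reduceByKey/swap/sortByKey/swap."""
    # flatMap + map + reduceByKey: count every whitespace-separated word.
    counts = Counter(word for line in lines for word in line.split())
    # First swap: (word, count) -> (count, word), so the count becomes the key.
    swapped = [(count, word) for word, count in counts.items()]
    # sortByKey: order by the key only (the count).
    swapped.sort(key=lambda pair: pair[0], reverse=not ascending)
    # Second swap: back to (word, count).
    return [(word, count) for count, word in swapped]


lines = ["a b a", "c a b"]
print(sort_counts_by_value(lines))                   # [('c', 1), ('b', 2), ('a', 3)]
print(sort_counts_by_value(lines, ascending=False))  # [('a', 3), ('b', 2), ('c', 1)]
```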
After this, the wordCounts RDD can be saved as text files with saveAsTextFile(directory_pathname). The directory will contain one or more part-xxxxx files (starting with part-00000), one output data file per reducer configured for the job, a _SUCCESS file if the job succeeded, and .crc files.
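If the saved output is later read back outside Spark, the part files can be read in name order. The sketch below assumes the directory was written from pyspark, where each line is the text form of a Python tuple such as ('word', 3); the helper name read_word_counts is hypothetical, and the Scala output format (word,3) would need a different parser.

```python
import ast
import glob
import os
import tempfile


def read_word_counts(directory):
    """Read (word, count) tuples from part-* files, in file-name order."""
    pairs = []
    for path in sorted(glob.glob(os.path.join(directory, "part-*"))):
        with open(path, encoding="utf-8") as handle:
            for line in handle:
                line = line.strip()
                if line:
                    # Assumes each line is a tuple literal such as ('word', 3).
                    pairs.append(ast.literal_eval(line))
    return pairs


# Usage with a stand-in directory that imitates a two-reducer output.
with tempfile.TemporaryDirectory() as out_dir:
    with open(os.path.join(out_dir, "part-00000"), "w", encoding="utf-8") as f:
        f.write("('c', 1)\n('b', 2)\n")
    with open(os.path.join(out_dir, "part-00001"), "w", encoding="utf-8") as f:
        f.write("('a', 3)\n")
    open(os.path.join(out_dir, "_SUCCESS"), "w").close()
    print(read_word_counts(out_dir))  # [('c', 1), ('b', 2), ('a', 3)]
```

The part-* pattern skips the _SUCCESS marker and the hidden .crc files, so only the data files are parsed.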
Using pyspark, a Python script very similar to the Scala script above produces effectively the same output. Here is the pyspark version demonstrating sorting a collection by value:
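file = sc.textFile("file:some_local_text_file_pathname")
# Parentheses allow the chain to span lines; a comment after a trailing backslash is a syntax error.
wordCounts = (file.flatMap(lambda line: line.strip().split(" "))
    .map(lambda word: (word, 1))
    .reduceByKey(lambda a, b: a + b, 1)  # last arg configures one reducer task
    .map(lambda kv: (kv[1], kv[0]))  # swap to (count, word); tuple-unpacking lambdas are Python 2 only
    .sortByKey(True, 1)  # 1st arg configures ascending sort, 2nd configures 1 task
    .map(lambda kv: (kv[1], kv[0])))  # swap back to (word, count)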
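To make sortByKey sort in descending order, its first argument should be False (or 0). Because splitting on a single space turns leading and trailing whitespace into empty-string "words", strip() is applied before splitting each line. The Scala script above does without it, since String.split on the JVM already discards trailing empty strings (leading ones are still kept).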
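The effect of the whitespace handling is easy to check in a plain Python session, without Spark. The sample string below is made up for illustration.

```python
line = "  to be  or not "

# Splitting on a single space keeps an empty string for every extra space.
print(line.split(" "))          # ['', '', 'to', 'be', '', 'or', 'not', '']
# strip() removes the leading and trailing ones, but not the one in the middle.
print(line.strip().split(" "))  # ['to', 'be', '', 'or', 'not']
# split() with no argument splits on runs of whitespace and drops empty strings.
print(line.split())             # ['to', 'be', 'or', 'not']
```

So split() with no argument, as used in the question, avoids counting empty strings as words even when a line contains repeated spaces.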
The main difference in the output of the Scala and Python versions of wordCount is that where Scala outputs (word,3), Python outputs (u'word', 3) under Python 2, or ('word', 3) under Python 3.
For more information on Spark RDD methods, see http://spark.apache.org/docs/1.1.0/api/python/pyspark.rdd.RDD-class.html for Python and https://spark.apache.org/docs/latest/api/scala/#org.apache.spark.rdd.RDD for Scala.
In the spark-shell, running collect() on wordCounts turns it from an RDD into an Array[(String, Int)], that is, an Array[Tuple2[String, Int]], which can itself be sorted on the second field of each Tuple2 element using:
wordCounts.collect().sortBy(_._2)
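For comparison, in pyspark collect() returns a plain Python list of tuples, so the same local sort on the second field can be done with sorted. A minimal sketch, using a hand-written list as a stand-in for the collected result:

```python
from operator import itemgetter

# Stand-in for wordCounts.collect(); the values are made up.
collected = [("spark", 3), ("the", 7), ("rdd", 1)]

# itemgetter(1) picks the count, the second field of each tuple.
by_count = sorted(collected, key=itemgetter(1))
by_count_desc = sorted(collected, key=itemgetter(1), reverse=True)

print(by_count)       # [('rdd', 1), ('spark', 3), ('the', 7)]
print(by_count_desc)  # [('the', 7), ('spark', 3), ('rdd', 1)]
```

As with the Scala Array, this sorts on the driver, so it only makes sense when the collected result fits in memory.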
sortBy also takes an implicit math.Ordering argument, as Romeo Kienzler showed in a previous answer to this question. The same sortBy(_._2) call will sort the Tuple2 elements in reverse order of their _2 fields if an implicit reverse ordering is defined before the map-reduce script is run, because that definition overrides the pre-existing ordering of Int. The reverse Int Ordering already defined by Romeo Kienzler is:
// for reverse order
implicit val sortIntegersByString = new Ordering[Int] {
  override def compare(a: Int, b: Int) = a.compare(b)*(-1)
}
Another common way to define this reverse Ordering is to swap a and b and drop the (-1) on the right-hand side of the compare definition:
// for reverse order
implicit val sortIntegersByString = new Ordering[Int] {
  override def compare(a: Int, b: Int) = b.compare(a)
}
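The equivalence of the two definitions can also be checked outside Scala. The following Python sketch mirrors both compare bodies with functools.cmp_to_key; the helper names are hypothetical and the sample data is made up.

```python
from functools import cmp_to_key


def compare(a, b):
    """Three-way comparison: negative, zero, or positive, the same sign convention as compare in Scala."""
    return (a > b) - (a < b)


def reverse_by_negation(a, b):
    # Mirrors a.compare(b) * (-1).
    return compare(a, b) * -1


def reverse_by_swapping(a, b):
    # Mirrors b.compare(a).
    return compare(b, a)


counts = [3, 7, 1, 7, 2]
print(sorted(counts, key=cmp_to_key(reverse_by_negation)))  # [7, 7, 3, 2, 1]
print(sorted(counts, key=cmp_to_key(reverse_by_swapping)))  # [7, 7, 3, 2, 1]
```

Both comparators return the same sign for every pair of integers, which is why either definition yields the same descending order.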