How can I create a TF-IDF for Text Classification using Spark?

eliasah · Jul 3, 2014

I have a CSV file in the following format:

product_id1,product_title1
product_id2,product_title2
product_id3,product_title3
product_id4,product_title4
product_id5,product_title5
[...]

Each product_idX is an integer and each product_titleX is a string, for example:

453478692, Apple iPhone 4 8Go

I'm trying to create the TF-IDF from my file so I can use it for a Naive Bayes Classifier in MLlib.

So far I am using Spark with Scala, following the tutorials on the official page and the Berkeley AMP Camp 3 and 4.

So I'm reading the file:

val file = sc.textFile("offers.csv")

Then I'm mapping it to an RDD[Array[String]]:

val tuples = file.map(line => line.split(",")).cache

and after that I'm transforming the arrays into pairs, RDD[(Int, String)]:

val pairs = tuples.map(line => (line(0).toInt, line(1)))

But I'm stuck here: I don't know how to create the Vector from these pairs and turn it into TF-IDF.

Thanks

Answer

Metropolis · Sep 23, 2014

When I did this myself (using pyspark), I started by creating two data structures from the corpus. The first is a key-value structure of

document_id, [token_ids]

The second is an inverted index like

token_id, [document_ids]

I'll call those corpus and inv_index respectively.
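
As a rough illustration of those two shapes, here is a minimal plain-Python sketch (no Spark involved) that derives the inverted index from an already tokenized corpus. The sample data and the helper name build_inverted_index are made up for the example; on a cluster both structures would live in RDDs.

```python
from collections import defaultdict

# Hypothetical sample data in the (document_id, [token_ids]) shape.
corpus = [
    (1, [10, 11, 12, 10]),
    (2, [10, 13]),
    (3, [11, 13, 14]),
]

def build_inverted_index(corpus):
    """Return sorted (token_id, [document_ids]) pairs for the given corpus."""
    index = defaultdict(set)
    for doc_id, token_ids in corpus:
        for token_id in token_ids:
            # A set keeps each document once, even if the token repeats.
            index[token_id].add(doc_id)
    return sorted((token_id, sorted(doc_ids)) for token_id, doc_ids in index.items())

inv_index = build_inverted_index(corpus)
```

Each document appears only once per token in inv_index, which is what makes its length usable as a document frequency.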

To get tf, we need to count the occurrences of each token in each document:

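from collections import Counter

def wc_per_row(row):
    # Count how often each token_id occurs in one document.
    cnt = Counter()
    for word in row:
        cnt[word] += 1
    return list(cnt.items())

# Pair each document_id with its list of (token_id, count) tuples.
tf = corpus.mapValues(wc_per_row)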
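
To make the resulting shape concrete, here is a small local sketch of the same counting step on plain Python lists instead of an RDD. The two sample documents are invented, and the counts are sorted only so the output is easy to read.

```python
from collections import Counter

# Hypothetical sample data: (document_id, [token_ids]).
corpus = [(1, [10, 11, 12, 10]), (2, [10, 13])]

def wc_per_row(row):
    """Return sorted (token_id, count) pairs for one document."""
    return sorted(Counter(row).items())

# Local stand-in for mapping over the values of the corpus RDD.
tf = [(doc_id, wc_per_row(token_ids)) for doc_id, token_ids in corpus]
```

Here token 10 occurs twice in document 1, so its pair in that document is (10, 2).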

The df of a term is simply the length of its entry in the inverted index, i.e. the number of documents that contain it. From that we can calculate the idf.

df = inv_index.mapValues(len)
num_documents = tf.count()

# At this step you can also apply some filters to make sure to keep
# only terms within a 'good' range of df.
from math import log10
# float() avoids integer division when running under Python 2.
idf = df.mapValues(lambda v: 1. + log10(float(num_documents) / v)).collect()
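
For reference, the weighting used here is idf = 1 + log10(N / df), so a token that appears in every document gets exactly 1.0 and rarer tokens get more. A minimal plain-Python sketch of that formula, with an invented inverted index and a hypothetical helper name idf_from_index:

```python
from math import log10

# Hypothetical sample data: (token_id, [document_ids]) for a 3-document corpus.
inv_index = [(10, [1, 2]), (11, [1, 3]), (12, [1]), (13, [2, 3]), (14, [3])]
num_documents = 3

def idf_from_index(inv_index, num_documents):
    """Return {token_id: 1 + log10(N / df)}, where df is the posting-list length."""
    return {
        token_id: 1.0 + log10(num_documents / len(doc_ids))
        for token_id, doc_ids in inv_index
    }

idf = idf_from_index(inv_index, num_documents)
```

Returning a dict rather than a list of pairs is a choice made for this sketch; it makes later lookups by token_id cheap.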

Now we just have to do a join on the token_id:

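def calc_tfidf(tf_tuples, idf_tuples):
    # Nested-loop join of one document's (token_id, tf) pairs with the
    # collected (token_id, idf) pairs; unmatched tokens are dropped.
    return [(k1, v1 * v2) for (k1, v1) in tf_tuples for
        (k2, v2) in idf_tuples if k1 == k2]

tfidf = tf.mapValues(lambda v: calc_tfidf(v, idf))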

This isn't a particularly performant solution, though. Calling collect to bring idf into the driver program so that it's available for the join seems like the wrong thing to do.
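
One cheap improvement, sketched here in plain Python, is to turn the collected (token_id, idf) pairs into a dict so each document needs one lookup per token instead of a scan over the whole vocabulary. The function name calc_tfidf_lookup and the sample values are hypothetical. In Spark the dict could presumably be shipped to the workers as a broadcast variable (sc.broadcast), but that part is an assumption and is not shown or tested here.

```python
def calc_tfidf_lookup(tf_tuples, idf_by_token):
    """Scale each (token_id, count) pair by its idf; skip tokens without one."""
    return [
        (token_id, count * idf_by_token[token_id])
        for token_id, count in tf_tuples
        if token_id in idf_by_token
    ]

# Hypothetical collected (token_id, idf) pairs, converted once to a dict.
idf_pairs = [(10, 1.5), (11, 2.0)]
idf_by_token = dict(idf_pairs)

# One document's term counts; token 99 stands for a term filtered out by df.
tf_row = [(10, 2), (11, 1), (99, 4)]
result = calc_tfidf_lookup(tf_row, idf_by_token)
```

Like the nested-loop version, this drops tokens that have no idf entry, so the two should produce the same pairs.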

And of course, it requires first tokenizing the documents and creating a mapping from each unique token in the vocabulary to some token_id.
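
A minimal sketch of that preprocessing step in plain Python, assuming simple lowercase whitespace tokenization (a real pipeline would likely need something smarter). The helper names tokenize and build_vocab, and the second product row, are made up for illustration.

```python
def tokenize(title):
    """Lowercase a title and split it on whitespace."""
    return title.lower().split()

def build_vocab(titles):
    """Assign consecutive token_ids to tokens in order of first appearance."""
    vocab = {}
    for title in titles:
        for token in tokenize(title):
            vocab.setdefault(token, len(vocab))
    return vocab

# First row is the example from the question; the second is hypothetical.
products = [(453478692, "Apple iPhone 4 8Go"), (1, "Apple iPad 2")]

vocab = build_vocab(title for _, title in products)

# (document_id, [token_ids]), the shape the tf step expects.
corpus = [(pid, [vocab[tok] for tok in tokenize(title)]) for pid, title in products]
```

Building the vocabulary this way needs a full pass over the data before any token_id can be used, which is one reason the overall approach takes several steps.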

If anyone can improve on this, I'm very interested.