Applying a function to each row of a big PySpark dataframe?

mommomonthewind · Aug 25, 2017 · Viewed 17.4k times

I have a big dataframe (~30M rows) and a function f. f goes through each row, checks some logic, and writes its outputs into a dictionary. The function has to be applied row by row.

I tried:

dic = dict()
for row in df.rdd.collect():
    f(row, dic)

But I always get an out-of-memory (OOM) error. I have set Docker's memory to 8GB.

How can I do this efficiently?

Answer

1.618 · Aug 25, 2017
from pyspark.sql.functions import udf, struct
from pyspark.sql.types import StringType, MapType

# sample data (sc is the SparkContext, predefined in the pyspark shell)
df = sc.parallelize([
    ['a', 'b'],
    ['c', 'd'],
    ['e', 'f']
]).toDF(('col1', 'col2'))

# build one dictionary entry per row: first column -> second column
def add_to_dict(l):
    d = {}
    d[l[0]] = l[1]
    return d

# wrap the function as a UDF that returns a map<string,string> column
add_to_dict_udf = udf(add_to_dict, MapType(StringType(), StringType()))

# struct packs all columns of a row into a single argument for the UDF
df = df.withColumn("dictionary_item", add_to_dict_udf(struct([df[x] for x in df.columns])))
df.show()

# collect the per-row dictionaries to the driver as a list
dictionary_list = [i[0] for i in df.select('dictionary_item').collect()]
print(dictionary_list)

Output (under Python 2; Python 3 prints the strings without the u prefix):

[{u'a': u'b'}, {u'c': u'd'}, {u'e': u'f'}]
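
Note that the snippet above still calls collect(), so every per-row dictionary ends up on the driver. As a rough sketch of one way to reduce that pressure with ~30M rows, the per-row logic could run inside each partition and only the partial dictionaries be merged. The names process_partition, merge_dicts and build_dictionary are hypothetical, and the "first column -> second column" rule is only a stand-in for the real logic of f, which the question does not show.

```python
def process_partition(rows):
    """Build one partial dictionary from all rows of a single partition."""
    partial = {}
    for row in rows:
        # Placeholder for the per-row logic of f (assumption):
        # map the first column to the second column.
        partial[row[0]] = row[1]
    # mapPartitions expects an iterable, so yield the single partial result.
    yield partial


def merge_dicts(left, right):
    """Combine two partial dictionaries; on duplicate keys the right value wins."""
    merged = dict(left)
    merged.update(right)
    return merged


def build_dictionary(df):
    """Run the per-row logic on the executors and merge the partial results."""
    # df is assumed to be a PySpark DataFrame such as the one built above.
    return df.rdd.mapPartitions(process_partition).reduce(merge_dicts)
```

With the sample data above, build_dictionary(df) should give a single dictionary mapping col1 to col2. The merged dictionary still has to fit in driver memory. If f really must update one driver-side dictionary row by row, iterating over df.rdd.toLocalIterator() instead of df.rdd.collect() pulls rows over one partition at a time rather than all at once.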