Memory efficient cartesian join in PySpark

mgoldwasser · Feb 6, 2017

I have a large dataset of string ids that can fit into memory on a single node in my Spark cluster. The issue is that it consumes most of that node's memory.

These ids are about 30 characters long. For example:

ids
O2LWk4MAbcrOCWo3IVM0GInelSXfcG
HbDckDXCye20kwu0gfeGpLGWnJ2yif
o43xSMBUJLOKDxkYEQbAEWk4aPQHkm

I want to write a list of all pairs of ids to a file. For example:

id1,id2
O2LWk4MAbcrOCWo3IVM0GInelSXfcG,HbDckDXCye20kwu0gfeGpLGWnJ2yif
O2LWk4MAbcrOCWo3IVM0GInelSXfcG,o43xSMBUJLOKDxkYEQbAEWk4aPQHkm
HbDckDXCye20kwu0gfeGpLGWnJ2yif,O2LWk4MAbcrOCWo3IVM0GInelSXfcG
# etc...

So I need to cross join the dataset with itself. I was hoping to do this in PySpark on a 10-node cluster, but it needs to be memory efficient.

Answer

Mariusz · Feb 6, 2017

PySpark will handle your dataset easily and in a memory-efficient way, but it will take time to process 10^8 * 10^8 records (the estimated size of the cross join result). See the sample code:

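from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType
spark = SparkSession.builder.getOrCreate()  # already defined in the pyspark shell
# Explicit single-column schema, so Spark does not need to infer it from the file
df = spark.read.csv('input.csv', header=True, schema=StructType([StructField('id', StringType())]))
df.withColumnRenamed('id', 'id1').crossJoin(df.withColumnRenamed('id', 'id2')).show()  # show() only previews the first 20 rows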
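
To put the "it will take time" remark in perspective, here is a rough back-of-the-envelope sketch in plain Python. It assumes 10^8 input ids (the figure implied by the estimate above, not stated in the question), 30-character ids as in the question, and one comma plus one newline per output row. The helper names `cross_join_rows` and `csv_bytes` are made up for illustration.

```python
def cross_join_rows(n_ids: int) -> int:
    """Rows produced by cross joining a dataset of n_ids rows with itself."""
    return n_ids * n_ids


def csv_bytes(n_rows: int, id_len: int = 30) -> int:
    """Approximate CSV size: two ids, one comma and one newline per row."""
    return n_rows * (2 * id_len + 2)


n_ids = 10**8  # assumption: row count implied by the 10^8 * 10^8 estimate
rows = cross_join_rows(n_ids)
size = csv_bytes(rows)

# Report the row count and the uncompressed size in petabytes (1 PB = 1e15 bytes)
print(f"{rows:.1e} rows, about {size / 1e15:.0f} PB of CSV")
```

Under these assumptions the full result is 10^16 rows, or roughly 620 PB of uncompressed CSV, so output size and runtime are likely to be the real constraint rather than memory. If you do want to persist the pairs instead of previewing them, replacing `.show()` with something like `.write.csv('pairs_out')` (the output path is hypothetical) lets each executor write its own partitions without collecting anything on a single node.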