Multi-processing with Spark (PySpark)

Mike · Sep 27, 2017 · Viewed 7.1k times

The use case is the following:

I have a large dataframe with a 'user_id' column (each user_id can appear in many rows), and a list of users, my_users, that I need to analyse.

Groupby, filter and aggregate could be a good idea, but the aggregation functions included in PySpark did not fit my needs. In the PySpark version I use, user-defined aggregation functions are still not fully supported, so I decided to leave that approach for now.

Instead, I simply iterate over the my_users list, filter the dataframe for each user, and analyse the result. To speed this up, I decided to use a Python multiprocessing Pool, with one task per user in my_users.

The function that does the analysis (and is passed to the pool) takes two arguments: the user_id and a path to the main dataframe (in Parquet format), on which I perform all the computations. Inside the function, I load the dataframe and work on it (a DataFrame can't be passed as an argument itself).

I get all sorts of weird errors in some of the processes (different ones on each run), which look like:

  • PythonUtils does not exist in the JVM (when reading the Parquet dataframe)


  • KeyError: 'c' not found (also when reading the Parquet dataframe; what is 'c' anyway?)

When I run it without any multiprocessing, everything runs smoothly, but slowly.

Any ideas where these errors are coming from?

Here is a code sample to make things clearer:

PYSPARK_SUBMIT_ARGS = '--driver-memory 4g --conf spark.driver.maxResultSize=3g --master local[*] pyspark-shell' #if it's relevant

# ....

def users_worker(df_path, user_id):
    df = spark.read.parquet(df_path) # The problem is here!
    ## the analysis of user_id in df is here

def users_worker_wrapper(args):
    users_worker(*args)

def analyse():
    # ...
    users_worker_args = [(df_path, user_id) for user_id in my_users]
    users_pool = Pool(processes=len(my_users))
    users_pool.map(users_worker_wrapper, users_worker_args)
    users_pool.close()
    users_pool.join()

Answer

Mike · Sep 27, 2017

Indeed, as @user6910411 commented, when I replaced the Pool with a ThreadPool (multiprocessing.pool.ThreadPool), everything worked as expected and these errors were gone.
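A minimal sketch of that change, under some assumptions: the names analyse_user and analyse_users, the max_threads cap, and the placeholder count() analysis are hypothetical and not from the original code, and the Spark session is passed in explicitly rather than read from a global. Threads all run inside the single driver process and share its one session, whereas a multiprocessing Pool gives each worker process its own copy of the driver state.

```python
from multiprocessing.pool import ThreadPool


def analyse_user(spark, df_path, user_id):
    """Load the Parquet data and analyse a single user (placeholder analysis)."""
    df = spark.read.parquet(df_path)
    user_df = df.filter(df["user_id"] == user_id)
    # Hypothetical stand-in for the real per-user analysis: count the rows.
    return user_id, user_df.count()


def analyse_users(spark, df_path, user_ids, max_threads=8):
    """Run analyse_user for every user on a pool of threads.

    max_threads is an assumed cap; one thread per user is rarely needed.
    """
    if not user_ids:
        return {}
    n_threads = min(max_threads, len(user_ids))
    with ThreadPool(processes=n_threads) as pool:
        # Threads do not pickle their arguments, so a closure over the
        # shared session is fine here.
        results = pool.map(
            lambda user_id: analyse_user(spark, df_path, user_id),
            user_ids,
        )
    return dict(results)
```

With a real session this would be called as analyse_users(spark, df_path, my_users). The Python threads mostly wait on the JVM while Spark does the work, so the GIL is unlikely to be the bottleneck here.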

The root causes of the errors themselves are also clear now. If you want me to share them, please comment below.