I have a large pandas dataframe with multiple "records", each consisting of 2 or more line items. I'm trying to efficiently perform a CPU-intensive calculation on each record using multiprocessing. Here's a simplified example with a function that just adds a random number to each record:
import pandas as pd
from random import randrange
from multiprocessing import Pool
#Trivial example function
def my_func(record):
    df.loc[df.Record == record, 'Result'] = randrange(0, 100)
    print(df)
d = {'Record': ['A', 'A', 'B', 'B'], 'Values': [100, 200, 50, 70]}
df = pd.DataFrame(d)
all_records = df['Record'].unique()
if __name__ == '__main__':
    pool = Pool(processes=2)
    pool.map(my_func, all_records)
    df.to_csv('output.csv')
The desired output is the original dataframe with a new column titled "Result" that includes a random number for each record. For example:
  Record  Values  Result
0      A     100    63.0
1      A     200    63.0
2      B      50    22.0
3      B      70    22.0
The actual result is that my CSV output isn't updated with a Result column. I can tell the processes are running from the print statement in the function. From what I've researched, each process acts on its own copy of df, and the copies are never brought back together. How can I get the results of each process reflected in a single dataframe?
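For context, here's a minimal sketch (the data dict and the function name mutate are just illustrative) of why the in-place approach fails: each worker process mutates its own copy of a module-level object, and the parent's copy is unchanged afterwards.

from multiprocessing import Pool

data = {'value': 0}

def mutate(_):
    data['value'] = 99          # changes this worker's copy only
    return data['value']

if __name__ == '__main__':
    with Pool(processes=2) as pool:
        print(pool.map(mutate, range(2)))  # [99, 99] returned from the workers
    print(data['value'])                   # still 0 in the parent process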
This might work for you:
import pandas as pd
from random import randrange
from multiprocessing import Pool
#Trivial example function
def my_func(record):
    # take an explicit copy to avoid a SettingWithCopyWarning on assignment
    sub_df = df.loc[df['Record'] == record].copy()
    sub_df['Result'] = randrange(0, 100)
    # return the results for this record as a pd.Series;
    # its index still matches the corresponding rows of the original df
    return sub_df['Result']
d = {'Record': ['A', 'A', 'B', 'B'], 'Values': [100, 200, 50, 70]}
df = pd.DataFrame(d)
all_records = df['Record'].unique()
if __name__ == '__main__':
    pool = Pool(processes=2)
    results = pool.map(my_func, all_records)
    pool.close()
    pool.join()
    # concatenate the per-record results into a single pd.Series
    results = pd.concat(results)
    # join the results back onto the original df by index
    joined_df = df.join(results)
    print(joined_df)
#   Record  Values  Result
# 0      A     100      90
# 1      A     200      90
# 2      B      50      62
# 3      B      70      62
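A variant worth noting (a sketch of my own, not part of the code above): since every line item of a record gets the same value, the workers can just return one (record, result) pair each, and the parent maps the results back onto the Record column. The function name result_for is illustrative.

import pandas as pd
from random import randrange
from multiprocessing import Pool

def result_for(record):
    # return a (record, result) pair so the parent can build a lookup dict
    return record, randrange(0, 100)

d = {'Record': ['A', 'A', 'B', 'B'], 'Values': [100, 200, 50, 70]}
df = pd.DataFrame(d)

if __name__ == '__main__':
    with Pool(processes=2) as pool:
        pairs = pool.map(result_for, df['Record'].unique())
    # broadcast each record's result to all of its rows
    df['Result'] = df['Record'].map(dict(pairs))
    print(df)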