This is my first venture into parallel processing, and I have been looking into Dask, but I am having trouble actually coding it.
I have looked at their examples and documentation, and I think dask.delayed will work best. I attempted to wrap my functions with delayed(function_name), or to add an @delayed decorator, but I can't seem to get it working properly. I preferred Dask over other methods because it is written in Python and for its (supposed) simplicity. I know Dask won't parallelize the for loop itself, but the documentation says it can be used inside a loop.
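For reference, the two forms I tried look roughly like this (the function and its argument here are just placeholders, not my real code):

from dask import delayed

def some_function(x):      # placeholder for one of my real functions
    return x * 2

# form 1: wrap the existing function at the call site
lazy_result = delayed(some_function)(10)

# form 2: decorate the function definition instead
@delayed
def another_function(x):
    return x + 1

lazy_result2 = another_function(10)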
My code passes each file through a function that feeds inputs into other functions, and it looks like this:
from dask import delayed
filenames = ['1.csv', '2.csv', '3.csv', ...]

for count, name in enumerate(filenames):
    name = name.split('.')[0]
    ...
    # then do some pre-processing, for example:
    pre_result1, pre_result2 = delayed(read_files_and_do_some_stuff)(name)
then I call a constructor and pass the pre_results into the function calls:

    fc = FunctionCalls()
    Daily = delayed(fc.function_runs)(filename=name, stringinput='Daily',
                                      input_data=pre_result1, model1=pre_result2)
What I do here is pass each file through the for loop, do some pre-processing, and then pass the results into two models.
Any thoughts or tips on how to parallelize this? I began getting odd errors and had no idea how to fix them. The code does work as is. I use a lot of pandas DataFrames, Series, and NumPy arrays, and I would prefer not to go back and change everything to work with dask.dataframe etc.
The code in my comment may be difficult to read. Here it is in a more formatted way.
In the code below, when I type print(mean_squared_error) I just get: Delayed('mean_squared_error-3009ec00-7ff5-4865-8338-1fec3f9ed138')
from dask import delayed
import pandas as pd
from sklearn.metrics import mean_squared_error as mse
filenames = ['file1.csv']
for count, name in enumerate(filenames):
    file1 = pd.read_csv(name)
    df = pd.DataFrame(file1)
    prediction = df['Close'][:-1]
    observed = df['Close'][1:]
    mean_squared_error = delayed(mse)(observed, prediction)
You need to call dask.compute (or the .compute() method on a Delayed object) to actually execute the computation; until then you only have a lazy Delayed object, which is why print shows Delayed('mean_squared_error-...'). See the dask.delayed documentation.
First, here is a plain sequential version of your loop:
import pandas as pd
from sklearn.metrics import mean_squared_error as mse
filenames = [...]
results = []
for count, name in enumerate(filenames):
    file1 = pd.read_csv(name)
    df = pd.DataFrame(file1)  # isn't this already a dataframe?
    prediction = df['Close'][:-1]
    observed = df['Close'][1:]
    mean_squared_error = mse(observed, prediction)
    results.append(mean_squared_error)
And here is the same loop parallelized with dask.delayed:

import dask
import pandas as pd
from sklearn.metrics import mean_squared_error as mse

filenames = [...]

delayed_results = []
for count, name in enumerate(filenames):
    df = dask.delayed(pd.read_csv)(name)  # lazy: nothing is read yet
    prediction = df['Close'][:-1]
    observed = df['Close'][1:]
    mean_squared_error = dask.delayed(mse)(observed, prediction)
    delayed_results.append(mean_squared_error)

results = dask.compute(*delayed_results)  # run all delayed tasks, in parallel where possible
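For the single-file example in your edit the same idea applies; a minimal sketch, reusing the names from that edit, is to just call .compute() on the one delayed result:

import dask
import pandas as pd
from sklearn.metrics import mean_squared_error as mse

df = dask.delayed(pd.read_csv)('file1.csv')
prediction = df['Close'][:-1]
observed = df['Close'][1:]
mean_squared_error = dask.delayed(mse)(observed, prediction)

print(mean_squared_error)            # still a Delayed object
print(mean_squared_error.compute())  # the actual MSE value

When you have many files, though, collecting the delayed objects in a list and calling dask.compute(*delayed_results) once is usually better, since Dask can then schedule all the tasks together and run them in parallel.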