Can dask parralelize reading fom a csv file?

Magellan88 picture Magellan88 · Oct 18, 2016 · Viewed 18.4k times · Source

I'm converting a large textfile to a hdf storage in hopes of a faster data access. The conversion works allright, however reading from the csv file is not done in parallel. It is really slow (takes about 30min for a 1GB textfile on an SSD, so my guess is that it is not IO-bound).

Is there a way to have it read in multiple threads in parralel? Sice it might be important, I'm currently forced to run under Windows -- just in case that makes any difference.

from dask import dataframe as ddf
df = ddf.read_csv("data/Measurements*.csv",
             sep=';', 
             parse_dates=["DATETIME"], 
             blocksize=1000000,
             )

df.categorize([ 'Type',
                'Condition',               
          ])

df.to_hdf("data/data.hdf", "Measurements", 'w')

Answer

MRocklin picture MRocklin · Oct 18, 2016

Yes, dask.dataframe can read in parallel. However you're running into two problems:

Pandas.read_csv only partially releases the GIL

By default dask.dataframe parallelizes with threads because most of Pandas can run in parallel in multiple threads (releases the GIL). Pandas.read_csv is an exception, especially if your resulting dataframes use object dtypes for text

dask.dataframe.to_hdf(filename) forces sequential computation

Writing to a single HDF file will force sequential computation (it's very hard to write to a single file in parallel.)

Edit: New solution

Today I would avoid HDF and use Parquet instead. I would probably use the multiprocessing or dask.distributed schedulers to avoid GIL issues on a single machine. The combination of these two should give you full linear scaling.

from dask.distributed import Client
client = Client()

df = dask.dataframe.read_csv(...)
df.to_parquet(...)

Solution

Because your dataset likely fits in memory, use dask.dataframe.read_csv to load in parallel with multiple processes, then switch immediately to Pandas.

import dask.dataframe as ddf
import dask.multiprocessing

df = ddf.read_csv("data/Measurements*.csv",  # read in parallel
             sep=';', 
             parse_dates=["DATETIME"], 
             blocksize=1000000,
             )

df = df.compute(get=dask.multiprocessing.get)     # convert to pandas

df['Type'] = df['Type'].astype('category')
df['Condition'] = df['Condition'].astype('category')

df.to_hdf('data/data.hdf', 'Measurements', format='table', mode='w')