Load CSV file with Spark

Kernael picture Kernael · Feb 28, 2015 · Viewed 321k times · Source

I'm new to Spark and I'm trying to read CSV data from a file with Spark. Here's what I am doing :

sc.textFile('file.csv')
    .map(lambda line: (line.split(',')[0], line.split(',')[1]))
    .collect()

I would expect this call to give me a list of the two first columns of my file but I'm getting this error :

File "<ipython-input-60-73ea98550983>", line 1, in <lambda>
IndexError: list index out of range

although my CSV file as more than one column.

Answer

zero323 picture zero323 · Dec 30, 2015

Spark 2.0.0+

You can use built-in csv data source directly:

spark.read.csv(
    "some_input_file.csv", header=True, mode="DROPMALFORMED", schema=schema
)

or

(spark.read
    .schema(schema)
    .option("header", "true")
    .option("mode", "DROPMALFORMED")
    .csv("some_input_file.csv"))

without including any external dependencies.

Spark < 2.0.0:

Instead of manual parsing, which is far from trivial in a general case, I would recommend spark-csv:

Make sure that Spark CSV is included in the path (--packages, --jars, --driver-class-path)

And load your data as follows:

(df = sqlContext
    .read.format("com.databricks.spark.csv")
    .option("header", "true")
    .option("inferschema", "true")
    .option("mode", "DROPMALFORMED")
    .load("some_input_file.csv"))

It can handle loading, schema inference, dropping malformed lines and doesn't require passing data from Python to the JVM.

Note:

If you know the schema, it is better to avoid schema inference and pass it to DataFrameReader. Assuming you have three columns - integer, double and string:

from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import DoubleType, IntegerType, StringType

schema = StructType([
    StructField("A", IntegerType()),
    StructField("B", DoubleType()),
    StructField("C", StringType())
])

(sqlContext
    .read
    .format("com.databricks.spark.csv")
    .schema(schema)
    .option("header", "true")
    .option("mode", "DROPMALFORMED")
    .load("some_input_file.csv"))