Getting Spark, Python, and MongoDB to work together

Philip O'Brien picture Philip O'Brien · Oct 28, 2015 · Viewed 19.4k times · Source

I'm having difficulty getting these components to knit together properly. I have Spark installed and working succesfully, I can run jobs locally, standalone, and also via YARN. I have followed the steps advised (to the best of my knowledge) here and here

I'm working on Ubuntu and the various component versions I have are

I had some difficulty following the various steps such as which jars to add to which path, so what I have added are

  • in /usr/local/share/hadoop-2.6.1/share/hadoop/mapreduce I have added mongo-hadoop-core-1.5.0-SNAPSHOT.jar
  • the following environment variables
    • export HADOOP_HOME="/usr/local/share/hadoop-2.6.1"
    • export PATH=$PATH:$HADOOP_HOME/bin
    • export SPARK_HOME="/usr/local/share/spark-1.5.1-bin-hadoop2.6"
    • export PYTHONPATH="/usr/local/share/mongo-hadoop/spark/src/main/python"
    • export PATH=$PATH:$SPARK_HOME/bin

My Python program is basic

from pyspark import SparkContext, SparkConf
import pymongo_spark
pymongo_spark.activate()

def main():
    conf = SparkConf().setAppName("pyspark test")
    sc = SparkContext(conf=conf)
    rdd = sc.mongoRDD(
        'mongodb://username:password@localhost:27017/mydb.mycollection')

if __name__ == '__main__':
    main()

I am running it using the command

$SPARK_HOME/bin/spark-submit --driver-class-path /usr/local/share/mongo-hadoop/spark/build/libs/ --master local[4] ~/sparkPythonExample/SparkPythonExample.py

and I am getting the following output as a result

Traceback (most recent call last):
  File "/home/me/sparkPythonExample/SparkPythonExample.py", line 24, in <module>
    main()
  File "/home/me/sparkPythonExample/SparkPythonExample.py", line 17, in main
    rdd = sc.mongoRDD('mongodb://username:password@localhost:27017/mydb.mycollection')
  File "/usr/local/share/mongo-hadoop/spark/src/main/python/pymongo_spark.py", line 161, in mongoRDD
    return self.mongoPairRDD(connection_string, config).values()
  File "/usr/local/share/mongo-hadoop/spark/src/main/python/pymongo_spark.py", line 143, in mongoPairRDD
    _ensure_pickles(self)
  File "/usr/local/share/mongo-hadoop/spark/src/main/python/pymongo_spark.py", line 80, in _ensure_pickles
    orig_tb)
py4j.protocol.Py4JError

According to here

This exception is raised when an exception occurs in the Java client code. For example, if you try to pop an element from an empty stack. The instance of the Java exception thrown is stored in the java_exception member.

Looking at the source code for pymongo_spark.py and the line throwing the error, it says

"Error while communicating with the JVM. Is the MongoDB Spark jar on Spark's CLASSPATH? : "

So in response I have tried to be sure the right jars are being passed, but I might be doing this all wrong, see below

$SPARK_HOME/bin/spark-submit --jars /usr/local/share/spark-1.5.1-bin-hadoop2.6/lib/mongo-hadoop-spark-1.5.0-SNAPSHOT.jar,/usr/local/share/spark-1.5.1-bin-hadoop2.6/lib/mongo-java-driver-3.0.4.jar --driver-class-path /usr/local/share/spark-1.5.1-bin-hadoop2.6/lib/mongo-java-driver-3.0.4.jar,/usr/local/share/spark-1.5.1-bin-hadoop2.6/lib/mongo-hadoop-spark-1.5.0-SNAPSHOT.jar --master local[4] ~/sparkPythonExample/SparkPythonExample.py

I have imported pymongo to the same python program to verify that I can at least access MongoDB using that, and I can.

I know there are quite a few moving parts here so if I can provide any more useful information please let me know.

Answer

zero323 picture zero323 · Nov 19, 2015

Updates:

2016-07-04

Since the last update MongoDB Spark Connector matured quite a lot. It provides up-to-date binaries and data source based API but it is using SparkConf configuration so it is subjectively less flexible than the Stratio/Spark-MongoDB.

2016-03-30

Since the original answer I found two different ways to connect to MongoDB from Spark:

While the former one seems to be relatively immature the latter one looks like a much better choice than a Mongo-Hadoop connector and provides a Spark SQL API.

# Adjust Scala and package version according to your setup
# although officially 0.11 supports only Spark 1.5
# I haven't encountered any issues on 1.6.1
bin/pyspark --packages com.stratio.datasource:spark-mongodb_2.11:0.11.0
df = (sqlContext.read
  .format("com.stratio.datasource.mongodb")
  .options(host="mongo:27017", database="foo", collection="bar")
  .load())

df.show()

## +---+----+--------------------+
## |  x|   y|                 _id|
## +---+----+--------------------+
## |1.0|-1.0|56fbe6f6e4120712c...|
## |0.0| 4.0|56fbe701e4120712c...|
## +---+----+--------------------+

It seems to be much more stable than mongo-hadoop-spark, supports predicate pushdown without static configuration and simply works.

The original answer:

Indeed, there are quite a few moving parts here. I tried to make it a little bit more manageable by building a simple Docker image which roughly matches described configuration (I've omitted Hadoop libraries for brevity though). You can find complete source on GitHub (DOI 10.5281/zenodo.47882) and build it from scratch:

git clone https://github.com/zero323/docker-mongo-spark.git
cd docker-mongo-spark
docker build -t zero323/mongo-spark .

or download an image I've pushed to Docker Hub so you can simply docker pull zero323/mongo-spark):

Start images:

docker run -d --name mongo mongo:2.6
docker run -i -t --link mongo:mongo zero323/mongo-spark /bin/bash

Start PySpark shell passing --jars and --driver-class-path:

pyspark --jars ${JARS} --driver-class-path ${SPARK_DRIVER_EXTRA_CLASSPATH}

And finally see how it works:

import pymongo
import pymongo_spark

mongo_url = 'mongodb://mongo:27017/'

client = pymongo.MongoClient(mongo_url)
client.foo.bar.insert_many([
    {"x": 1.0, "y": -1.0}, {"x": 0.0, "y": 4.0}])
client.close()

pymongo_spark.activate()
rdd = (sc.mongoRDD('{0}foo.bar'.format(mongo_url))
    .map(lambda doc: (doc.get('x'), doc.get('y'))))
rdd.collect()

## [(1.0, -1.0), (0.0, 4.0)]

Please note that mongo-hadoop seems to close the connection after the first action. So calling for example rdd.count() after the collect will throw an exception.

Based on different problems I've encountered creating this image I tend to believe that passing mongo-hadoop-1.5.0-SNAPSHOT.jar and mongo-hadoop-spark-1.5.0-SNAPSHOT.jar to both --jars and --driver-class-path is the only hard requirement.

Notes: