Create Spark DataFrame from SQL Query

opus111 · Jul 14, 2016

I'm sure this is a simple SQLContext question, but I can't find any answer in the Spark docs or on Stack Overflow.

I want to create a Spark DataFrame from a SQL query on MySQL.

For example, I have a complicated MySQL query like

SELECT a.X, b.Y, c.Z FROM FOO as a JOIN BAR as b ON ... JOIN ZOT as c ON ... WHERE ...

and I want a DataFrame with columns X, Y, and Z.

I figured out how to load entire tables into Spark, so I could load them all and then do the joins and selection there. However, that is very inefficient. I just want to load the result set generated by my SQL query.
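For context, the table-at-a-time approach I'm trying to avoid looks roughly like this (a sketch; the loadTable helper is mine, the connection settings match my attempt below, and the table names come from my real query further down):

import org.apache.spark.sql.DataFrame

val jdbcOpts = Map(
  "url" -> "jdbc:mysql://localhost:3306/local_content",
  "driver" -> "com.mysql.jdbc.Driver",
  "user" -> "root",
  "password" -> "")

// Load one whole table per call via the "dbtable" option
def loadTable(name: String): DataFrame =
  sqlContext.read.format("jdbc").options(jdbcOpts + ("dbtable" -> name)).load()

val d  = loadTable("Dialog")
val dl = loadTable("DialogLine")

// Every row of every table is shipped to Spark before the join and
// filter run, which is what makes this approach so inefficient.
val joined = d.join(dl, d("DialogID") === dl("DialogID"))
  .filter(d("InSite") === 1 && dl("Active") === 1)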

Here is my current approximation of the code, which doesn't work. The MySQL connector has a "dbtable" option that can be used to load a whole table; I am hoping there is some way to specify a query instead:

  val df = sqlContext.format("jdbc").
    option("url", "jdbc:mysql://localhost:3306/local_content").
    option("driver", "com.mysql.jdbc.Driver").
    option("useUnicode", "true").
    option("continueBatchOnError", "true").
    option("useSSL", "false").
    option("user", "root").
    option("password", "").
    sql(
      """
      select dl.DialogLineID, dlwim.Sequence, wi.WordRootID from Dialog as d
      join DialogLine as dl on dl.DialogID=d.DialogID
      join DialogLineWordInstanceMatch as dlwim on dlwim.DialogLineID=dl.DialogLineID
      join WordInstance as wi on wi.WordInstanceID=dlwim.WordInstanceID
      join WordRoot as wr on wr.WordRootID=wi.WordRootID
      where d.InSite=1 and dl.Active=1
      limit 100
      """
    ).load()

Answer

opus111 · Aug 24, 2016

I found the answer here: Bulk data migration through Spark SQL.

The dbtable parameter can be any query wrapped in parentheses with an alias. So in my case, I need to do this:

val query = """
  (select dl.DialogLineID, dlwim.Sequence, wi.WordRootID from Dialog as d
    join DialogLine as dl on dl.DialogID=d.DialogID
    join DialogLineWordInstanceMatch as dlwim on dlwim.DialogLineID=dl.DialogLineID
    join WordInstance as wi on wi.WordInstanceID=dlwim.WordInstanceID
    join WordRoot as wr on wr.WordRootID=wi.WordRootID
    where d.InSite=1 and dl.Active=1
    limit 100) foo
"""

val df = sqlContext.read.format("jdbc").
  option("url", "jdbc:mysql://localhost:3306/local_content").
  option("driver", "com.mysql.jdbc.Driver").
  option("useUnicode", "true").
  option("continueBatchOnError","true").
  option("useSSL", "false").
  option("user", "root").
  option("password", "").
  option("dbtable",query).
  load()
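The same parenthesized-query string should also work with the read.jdbc() shorthand, which takes the connection settings as a java.util.Properties (a sketch under that assumption, reusing the query string above):

import java.util.Properties

val props = new Properties()
props.setProperty("driver", "com.mysql.jdbc.Driver")
props.setProperty("user", "root")
props.setProperty("password", "")

// read.jdbc() treats its second argument as a table name, so the
// "(select ...) foo" wrapper from above works here as well.
val df2 = sqlContext.read.jdbc("jdbc:mysql://localhost:3306/local_content", query, props)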

As expected, loading each table as its own DataFrame and joining them in Spark was very inefficient.
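Note for later readers: on Spark 2.4 and newer, the JDBC source also accepts the statement directly through a "query" option, so the parentheses-and-alias wrapping isn't needed. A sketch, using the SparkSession entry point that replaced SQLContext and a trimmed-down version of my query:

// Spark 2.4+: the "query" option takes the statement directly
// (it cannot be combined with "dbtable").
val df3 = spark.read.format("jdbc").
  option("url", "jdbc:mysql://localhost:3306/local_content").
  option("driver", "com.mysql.jdbc.Driver").
  option("user", "root").
  option("password", "").
  option("query", "select dl.DialogLineID from DialogLine as dl limit 100").
  load()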