Using Spark 1.4.0, I am trying to insert data from a Spark DataFrame into a MemSQL database (which should behave exactly like a MySQL database) using insertIntoJDBC(). However, I keep getting a runtime TableAlreadyExists exception.
First I create the MemSQL table like this:
CREATE TABLE IF NOT EXISTS table1 (id INT AUTO_INCREMENT PRIMARY KEY, val INT);
Then I create a simple DataFrame in Spark and try to insert it into MemSQL like this:
val df = sc.parallelize(Array(123,234)).toDF.toDF("val")
//df: org.apache.spark.sql.DataFrame = [val: int]
df.insertIntoJDBC("jdbc:mysql://172.17.01:3306/test?user=root", "table1", false)
java.lang.RuntimeException: Table table1 already exists.
This solution applies to general JDBC connections, although the answer by @wayne is probably a better solution for MemSQL specifically.
insertIntoJDBC seems to have been deprecated as of 1.4.0, and calling it actually delegates to write.jdbc(). df.write returns a DataFrameWriter; if you want to append rows to an existing table, you have to set the writer's save mode to "append".
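Equivalently, the save mode can be given as the SaveMode enum instead of a string. A minimal sketch, assuming a DataFrame df and a java.util.Properties prop holding the connection credentials (the URL and table name are just the ones from the question):

import org.apache.spark.sql.SaveMode

// SaveMode.Append and the string "append" are equivalent: both tell the
// writer to insert rows into the existing table rather than fail with
// a TableAlreadyExists error.
df.write.mode(SaveMode.Append)
  .jdbc("jdbc:mysql://172.17.01:3306/test", "table1", prop)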
Another issue with the example in the question is that the DataFrame schema didn't match the schema of the target table: the DataFrame had only a val column, while table1 also has an id column.
The code below gives a working example from the Spark shell. I start my session with spark-shell --driver-class-path mysql-connector-java-5.1.36-bin.jar so the MySQL JDBC driver is on the classpath.
import java.util.Properties

// JDBC connection credentials
val prop = new Properties()
prop.put("user", "root")
prop.put("password", "")

// The DataFrame now matches the table schema: (id, val)
val df = sc.parallelize(Array((1, 234), (2, 1233))).toDF.toDF("id", "val")

// "append" makes the writer insert into the existing table
val dfWriter = df.write.mode("append")
dfWriter.jdbc("jdbc:mysql://172.17.01:3306/test", "table1", prop)
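As a quick sanity check, you can read the table back through the same connection; sqlContext.read.jdbc is the read-side counterpart (sqlContext here is the one the Spark shell provides, and prop is the same Properties object as above):

// Load table1 back into a DataFrame and print its contents,
// which should now include the appended rows.
val readBack = sqlContext.read.jdbc("jdbc:mysql://172.17.01:3306/test", "table1", prop)
readBack.show()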