How can you write to multiple outputs dependent on the key using Spark in a single Job.
Related: Write to multiple outputs by key Scalding Hadoop, one MapReduce Job
E.g.
sc.makeRDD(Seq((1, "a"), (1, "b"), (2, "c")))
.writeAsMultiple(prefix, compressionCodecOption)
would ensure cat prefix/1
is
a
b
and cat prefix/2
would be
c
EDIT: I've recently added a new answer that includes full imports, pimp and compression codec, see https://stackoverflow.com/a/46118044/1586965, which may be helpful in addition to the earlier answers.
If you use Spark 1.4+, this has become much, much easier thanks to the DataFrame API. (DataFrames were introduced in Spark 1.3, but partitionBy()
, which we need, was introduced in 1.4.)
If you're starting out with an RDD, you'll first need to convert it to a DataFrame:
val people_rdd = sc.parallelize(Seq((1, "alice"), (1, "bob"), (2, "charlie")))
val people_df = people_rdd.toDF("number", "name")
In Python, this same code is:
people_rdd = sc.parallelize([(1, "alice"), (1, "bob"), (2, "charlie")])
people_df = people_rdd.toDF(["number", "name"])
Once you have a DataFrame, writing to multiple outputs based on a particular key is simple. What's more -- and this is the beauty of the DataFrame API -- the code is pretty much the same across Python, Scala, Java and R:
people_df.write.partitionBy("number").text("people")
And you can easily use other output formats if you want:
people_df.write.partitionBy("number").json("people-json")
people_df.write.partitionBy("number").parquet("people-parquet")
In each of these examples, Spark will create a subdirectory for each of the keys that we've partitioned the DataFrame on:
people/
_SUCCESS
number=1/
part-abcd
part-efgh
number=2/
part-abcd
part-efgh