I need to join two ordinary RDDs on one or more columns. Logically, this operation is equivalent to a database join of two tables. I wonder whether this is possible only through Spark SQL, or whether there are other ways of doing it.
As a concrete example, consider RDD r1 with primary key ITEM_ID:

(ITEM_ID, ITEM_NAME, ITEM_UNIT, COMPANY_ID)

and RDD r2 with primary key COMPANY_ID:

(COMPANY_ID, COMPANY_NAME, COMPANY_CITY)

I want to join r1 and r2. How can this be done?
Soumya Simanta gave a good answer. However, the values in the joined RDD are Iterables, so the results may not look much like an ordinary table join.
Alternatively, you can:
// Key each RDD by companyId, then perform an inner join on that key
val mappedItems = items.map(item => (item.companyId, item))
val mappedComp = companies.map(comp => (comp.companyId, comp))
mappedItems.join(mappedComp).take(10).foreach(println)
The output would be:
(c1,(Item(1,first,2,c1),Company(c1,company-1,city-1)))
(c1,(Item(2,second,2,c1),Company(c1,company-1,city-1)))
(c2,(Item(3,third,2,c2),Company(c2,company-2,city-2)))
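For completeness, the pairing that `join` performs on keyed RDDs can be sketched in plain Scala without a cluster. The `Item` and `Company` case classes and the sample rows below are assumptions chosen to match the output shown above; this is only a sketch of the semantics, not Spark itself:

```scala
// Hypothetical case classes matching the printed output above.
case class Item(id: Int, name: String, unit: Int, companyId: String)
case class Company(companyId: String, name: String, city: String)

object JoinSketch {
  // Inner join of two keyed sequences, mimicking what
  // PairRDDFunctions.join produces: one (k, (a, b)) pair per matching key.
  def joinByKey[K, A, B](left: Seq[(K, A)], right: Seq[(K, B)]): Seq[(K, (A, B))] =
    for {
      (k, a)  <- left
      (k2, b) <- right
      if k == k2
    } yield (k, (a, b))

  def main(args: Array[String]): Unit = {
    val items = Seq(
      Item(1, "first", 2, "c1"),
      Item(2, "second", 2, "c1"),
      Item(3, "third", 2, "c2"))
    val companies = Seq(
      Company("c1", "company-1", "city-1"),
      Company("c2", "company-2", "city-2"))

    // Same keying step as in the Spark snippet above.
    val mappedItems = items.map(i => (i.companyId, i))
    val mappedComp  = companies.map(c => (c.companyId, c))

    joinByKey(mappedItems, mappedComp).foreach(println)
  }
}
```

Note that, unlike cogroup (which groups all values per key into Iterables), join yields one flat tuple per matching pair, which is why its output reads like ordinary table-join rows.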