I have, I believe, a relatively common use case for spark streaming:
I have a stream of objects that I would like to filter based on some reference data
Initially, I thought that this would be a very simple thing to achieve using a Broadcast Variable:
public void startSparkEngine() {
    Broadcast<ReferenceData> refdataBroadcast =
            sparkContext.broadcast(getRefData());

    final JavaDStream<MyObject> filteredStream = objectStream.filter(obj -> {
        final ReferenceData refData = refdataBroadcast.getValue();
        return obj.getField().equals(refData.getField());
    });

    filteredStream.foreachRDD(rdd -> {
        rdd.foreach(obj -> {
            // Final processing of filtered objects
        });
        return null;
    });
}
However, my reference data changes periodically, albeit infrequently.
I was under the impression that I could modify and re-broadcast the variable on the driver and it would be propagated to each of the workers; however, the Broadcast object is not Serializable and needs to be final.
What alternatives do I have? The three solutions I can think of are:
1. Move the reference data lookup into a forEachPartition or forEachRdd so that it resides entirely on the workers. However, the reference data lives behind a REST API, so I would also need to store some kind of timer / counter to stop the remote service being hit for every element in the stream (see the sketch after this list).
2. Restart the Spark Context every time the reference data changes, with a new Broadcast Variable.
3. Convert the reference data to an RDD, then join the streams in such a way that I am now streaming Pair<MyObject, RefData>, though this will ship the reference data with every object.
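For the first option, a minimal sketch of a per-executor cache with a TTL might look like the following. The class name RefDataCache and the getRefDataFromRest() helper are illustrative placeholders, not part of the original code; the idea is simply that each executor JVM refreshes the reference data at most once per TTL window instead of once per element.

    public class RefDataCache {
        private static ReferenceData cached;
        private static long fetchedAt = 0L;
        private static final long TTL_MS = 60_000;  // refresh at most once a minute per executor JVM

        // Called from inside foreachPartition, so it runs on the workers
        public static synchronized ReferenceData get() {
            long now = System.currentTimeMillis();
            if (cached == null || now - fetchedAt > TTL_MS) {
                cached = getRefDataFromRest();      // hypothetical REST call, once per executor per TTL window
                fetchedAt = now;
            }
            return cached;
        }

        private static ReferenceData getRefDataFromRest() {
            // Placeholder for the actual REST client call
            throw new UnsupportedOperationException("wire up your REST client here");
        }
    }

    // Usage inside the streaming job (same Spark 1.x style foreachRDD as above):
    objectStream.foreachRDD(rdd -> {
        rdd.foreachPartition(iter -> {
            ReferenceData refData = RefDataCache.get();  // at most one REST call per TTL per executor
            while (iter.hasNext()) {
                MyObject obj = iter.next();
                if (obj.getField().equals(refData.getField())) {
                    // Final processing of filtered objects
                }
            }
        });
        return null;
    });

Because the cache lives in a static field, each executor refreshes it independently and no broadcast (or driver-side re-shipping) is involved at all.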
Extending the answer by @Rohan Aletty, here is sample code for a BroadcastWrapper that refreshes the broadcast variable based on a TTL:
import java.util.Calendar;
import java.util.Date;

import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;

public class BroadcastWrapper {

    private Broadcast<ReferenceData> broadcastVar;
    private Date lastUpdatedAt = Calendar.getInstance().getTime();

    private static final BroadcastWrapper obj = new BroadcastWrapper();

    private BroadcastWrapper() {}

    public static BroadcastWrapper getInstance() {
        return obj;
    }

    public JavaSparkContext getSparkContext(SparkContext sc) {
        return JavaSparkContext.fromSparkContext(sc);
    }

    public Broadcast<ReferenceData> updateAndGet(SparkContext sparkContext) {
        Date currentDate = Calendar.getInstance().getTime();
        long diff = currentDate.getTime() - lastUpdatedAt.getTime();
        if (broadcastVar == null || diff > 60000) { // Let's say we want to refresh every 1 min = 60000 ms
            if (broadcastVar != null)
                broadcastVar.unpersist();
            lastUpdatedAt = new Date(System.currentTimeMillis());
            // Your logic to refresh the reference data
            ReferenceData data = getRefData();
            broadcastVar = getSparkContext(sparkContext).broadcast(data);
        }
        return broadcastVar;
    }
}
Your code would then look like:
public void startSparkEngine() {
    final JavaDStream<MyObject> filteredStream = objectStream.transform(stream -> {
        Broadcast<ReferenceData> refdataBroadcast =
                BroadcastWrapper.getInstance().updateAndGet(stream.context());

        return stream.filter(obj -> obj.getField().equals(refdataBroadcast.getValue().getField()));
    });

    filteredStream.foreachRDD(rdd -> {
        rdd.foreach(obj -> {
            // Final processing of filtered objects
        });
        return null;
    });
}
This worked for me on a multi-node cluster as well. Hope this helps.