How can I update a broadcast variable in spark streaming?

Andrew Stubbs · Oct 27, 2015 · Viewed 19.7k times

I have, I believe, a relatively common use case for Spark Streaming:

I have a stream of objects that I would like to filter based on some reference data.

Initially, I thought that this would be a very simple thing to achieve using a Broadcast Variable:

public void startSparkEngine() {
    Broadcast<ReferenceData> refdataBroadcast
      = sparkContext.broadcast(getRefData());

    final JavaDStream<MyObject> filteredStream = objectStream.filter(obj -> {
        final ReferenceData refData = refdataBroadcast.getValue();
        return obj.getField().equals(refData.getField());
    });

    filteredStream.foreachRDD(rdd -> {
        rdd.foreach(obj -> {
            // Final processing of filtered objects
        });
        return null;
    });
}

However, my reference data will change periodically, albeit infrequently.

I was under the impression that I could modify and re-broadcast my variable on the driver and it would be propagated to each of the workers; however, the Broadcast object is not Serializable and needs to be final.

What alternatives do I have? The three solutions I can think of are:

  1. Move the reference data lookup into a foreachPartition or foreachRDD so that it resides entirely on the workers. However, the reference data lives behind a REST API, so I would also need to store a timer / counter somewhere to stop the remote API being hit for every element in the stream (roughly sketched after this list).

  2. Restart the Spark Context every time the refdata changes, with a new Broadcast Variable.

  3. Convert the reference data to an RDD, then join it with the stream in such a way that I am now streaming Pair<MyObject, RefData>, though this will ship the reference data with every object (also sketched below).
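For option 1, roughly what I have in mind (a sketch only; RefDataCache and fetchFromRestApi are illustrative names, not existing APIs): each executor caches the reference data in a static field and refreshes it at most once per TTL, so the REST API is hit once per interval per executor rather than once per element.

public class RefDataCache {

    private static final long TTL_MS = 60_000; // refresh at most once a minute
    private static ReferenceData cached;
    private static long lastFetchMs = 0;

    // Synchronized so concurrent tasks on the same executor trigger only one fetch
    public static synchronized ReferenceData get() {
        long now = System.currentTimeMillis();
        if (cached == null || now - lastFetchMs > TTL_MS) {
            cached = fetchFromRestApi();
            lastFetchMs = now;
        }
        return cached;
    }

    private static ReferenceData fetchFromRestApi() {
        // Placeholder: call the reference data REST endpoint here
        throw new UnsupportedOperationException("wire up the REST client");
    }
}

Used from the workers, for example:

objectStream.foreachRDD(rdd -> {
    rdd.foreachPartition(iter -> {
        ReferenceData refData = RefDataCache.get(); // at most one REST call per TTL per executor
        while (iter.hasNext()) {
            MyObject obj = iter.next();
            // filter / process obj against refData
        }
    });
    return null;
});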
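And roughly what I have in mind for option 3, assuming getField() returns the join key as a String (Tuple2 is scala.Tuple2, Collections is java.util.Collections):

JavaPairRDD<String, ReferenceData> refPairs = sparkContext
    .parallelize(Collections.singletonList(getRefData()))
    .mapToPair(ref -> new Tuple2<>(ref.getField(), ref));

JavaPairDStream<MyObject, ReferenceData> joined = objectStream
    .mapToPair(obj -> new Tuple2<>(obj.getField(), obj))
    .transformToPair(rdd -> rdd.join(refPairs)
        .mapToPair(kv -> kv._2())); // (key, (obj, refData)) -> (obj, refData)

As noted, though, the join ships the reference data with every batch.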

Answer

Aastha · Dec 21, 2016

Extending the answer by @Rohan Aletty, here is sample code for a BroadcastWrapper that refreshes the broadcast variable based on a TTL:

import java.util.Calendar;
import java.util.Date;

import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;

public class BroadcastWrapper {

    private Broadcast<ReferenceData> broadcastVar;
    private Date lastUpdatedAt = Calendar.getInstance().getTime();

    private static final BroadcastWrapper obj = new BroadcastWrapper();

    private BroadcastWrapper() {}

    public static BroadcastWrapper getInstance() {
        return obj;
    }

    public JavaSparkContext getSparkContext(SparkContext sc) {
        return JavaSparkContext.fromSparkContext(sc);
    }

    public Broadcast<ReferenceData> updateAndGet(SparkContext sparkContext) {
        Date currentDate = Calendar.getInstance().getTime();
        long diff = currentDate.getTime() - lastUpdatedAt.getTime();
        if (broadcastVar == null || diff > 60000) { // let's say we want to refresh every 1 min = 60000 ms
            if (broadcastVar != null) {
                broadcastVar.unpersist(); // drop the stale copy from the executors
            }
            lastUpdatedAt = new Date(System.currentTimeMillis());

            // Your logic to refresh
            ReferenceData data = getRefData();

            broadcastVar = getSparkContext(sparkContext).broadcast(data);
        }
        return broadcastVar;
    }
}

Your code would look like:

public void startSparkEngine() {

    final JavaDStream<MyObject> filteredStream = objectStream.transform(rdd -> {
        Broadcast<ReferenceData> refdataBroadcast = BroadcastWrapper.getInstance().updateAndGet(rdd.context());

        return rdd.filter(obj -> obj.getField().equals(refdataBroadcast.getValue().getField()));
    });

    filteredStream.foreachRDD(rdd -> {
        rdd.foreach(obj -> {
            // Final processing of filtered objects
        });
        return null;
    });
}

This worked for me on a multi-node cluster as well. Hope this helps.