How to use Apache Storm tuples

dermoritz · Aug 17, 2015 · Viewed 7.9k times

I have just started with Apache Storm. I read the tutorial and looked at the examples. My problem is that all the examples work with very simple tuples (often one field containing a string). The tuples are created inline (using new Values(...)). In my case I have tuples with many fields (5 to 100). So my question is: how do I implement such a tuple with a name and a type (all primitive) for each field?

Are there any examples? (I think implementing "Tuple" directly isn't a good idea.)

thanks

Answer

Kit Menke · Aug 17, 2015

An alternative to creating the tuple with every field as a separate value is to create a bean and pass it inside the tuple.

Given the following class:

import java.io.Serializable;

public class DataBean implements Serializable {
    private static final long serialVersionUID = 1L;

    // add more properties as necessary
    private int id;
    private String word;

    public DataBean(int id, String word) {
        setId(id);
        setWord(word);
    }
    public int getId() {
        return id;
    }
    public void setId(int id) {
        this.id = id;
    }
    public String getWord() {
        return word;
    }
    public void setWord(String word) {
        this.word = word;
    }
}
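
Before wiring the bean into a topology, it can help to confirm that it really survives serialization. The sketch below is only a sanity check using plain Java serialization from the JDK; it does not involve Storm or whatever serializer Storm uses internally. The names SerializationCheck and roundTrip are made up for illustration, and the DataBean here is a trimmed, package-private copy of the class above so that the example fits in one file.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Trimmed copy of the bean above (package-private so everything fits in one file).
class DataBean implements Serializable {
    private static final long serialVersionUID = 1L;

    private final int id;
    private final String word;

    DataBean(int id, String word) {
        this.id = id;
        this.word = word;
    }

    int getId() {
        return id;
    }

    String getWord() {
        return word;
    }
}

// Hypothetical helper class, not part of Storm.
class SerializationCheck {

    // Writes the object to an in-memory byte array and reads it back.
    static <T extends Serializable> T roundTrip(T original)
            throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(original);
        }
        try (ObjectInputStream in =
                 new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            @SuppressWarnings("unchecked")
            T copy = (T) in.readObject();
            return copy;
        }
    }

    public static void main(String[] args) throws Exception {
        DataBean copy = roundTrip(new DataBean(7, "storm"));
        System.out.println(copy.getId() + " " + copy.getWord());
    }
}
```

If one of the bean's fields is not Serializable, writeObject throws java.io.NotSerializableException, which is easier to track down here than in a running topology.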

Create and emit the DataBean in one bolt:

collector.emit(new Values(bean));

Get the DataBean in the destination bolt:

@Override
public void execute(Tuple tuple, BasicOutputCollector collector) {
    try {
        DataBean bean = (DataBean)tuple.getValue(0);
        // do your bolt processing with the bean
    } catch (Exception e) {
        LOG.error("WordCountBolt error", e);
        collector.reportError(e);
    }
}

Don't forget to make your bean serializable and to register it when you set up your topology:

Config stormConfig = new Config();
stormConfig.registerSerialization(DataBean.class);
// more stuff
StormSubmitter.submitTopology("MyTopologyName", stormConfig, builder.createTopology());

Disclaimer: Beans work fine with shuffle grouping. If you need a fieldsGrouping, you should still emit the grouping key as its own primitive (or String) field. For example, in the Word Count scenario you need to group by word, so you might emit:

collector.emit(new Values(word, bean));
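
To see why the grouping key should be a separate, simple field, here is a simplified model of hash-based routing. It is an assumption for illustration, not Storm's actual code: GroupingSketch, chooseTask, and PlainBean are hypothetical names, and the real grouping logic may differ in detail. The idea is that tuples are routed by a hash of the grouping values, so the key needs a hashCode that depends on its content.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical model of hash-based routing; not taken from Storm.
class GroupingSketch {

    // Picks a task index from the hash of the grouping values.
    // floorMod keeps the result in [0, numTasks) even for negative hashes.
    static int chooseTask(List<?> groupingValues, int numTasks) {
        return Math.floorMod(Arrays.deepHashCode(groupingValues.toArray()), numTasks);
    }

    // A bean that does not override equals/hashCode, like DataBean above,
    // so its hashCode is based on object identity rather than content.
    static class PlainBean {
        final String word;

        PlainBean(String word) {
            this.word = word;
        }
    }

    public static void main(String[] args) {
        int numTasks = 4;

        // Two distinct String instances with the same content hash identically,
        // so they are always routed to the same task.
        int first = chooseTask(Arrays.asList(new String("storm")), numTasks);
        int second = chooseTask(Arrays.asList(new String("storm")), numTasks);
        System.out.println("same word, same task: " + (first == second));

        // Two beans with the same content are different objects; with
        // identity-based hash codes they are not guaranteed to share a task.
        int beanA = chooseTask(Arrays.asList(new PlainBean("storm")), numTasks);
        int beanB = chooseTask(Arrays.asList(new PlainBean("storm")), numTasks);
        System.out.println("bean tasks: " + beanA + " and " + beanB);
    }
}
```

Under this model, grouping on the word keeps all tuples for one word together, while grouping on the bean itself would only be reliable if the bean overrode equals and hashCode consistently.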