Spring batch Job read from multiple sources

xedo picture xedo · Jan 23, 2014 · Viewed 16.7k times · Source

How can I read items from multiples databases? I already know that is possible from files.
the following example works for read from multiples files

...
<job id="readMultiFileJob" xmlns="http://www.springframework.org/schema/batch">
    <step id="step1">
    <tasklet>
        <chunk reader="multiResourceReader" writer="flatFileItemWriter"
            commit-interval="1" />
    </tasklet>
    </step>
</job>
...
<bean id="multiResourceReader"
    class=" org.springframework.batch.item.file.MultiResourceItemReader">
    <property name="resources" value="file:csv/inputs/domain-*.csv" />
    <property name="delegate" ref="flatFileItemReader" />
</bean>
...

three beans like this.

<bean id="database2" class="org.springframework.batch.item.database.JdbcCursorItemReader">
    <property name="name" value="database2Reader" />
    <property name="dataSource" ref="dataSource2" />
    <property name="sql" value="select image from object where image like '%/images/%'" />
    <property name="rowMapper">
        <bean class="sym.batch.ImagesRowMapper2" />
    </property>
</bean>

Answer

Luca Basso Ricci picture Luca Basso Ricci · Jan 23, 2014

There isn't a ready-to-use component that perform what you ask; the only solution is to write a custom ItemReader<> that delegates to JdbcCursorItemReader (or to HibernateCursorItemReader or to any generic ItemReader implementation).
You need to prepare all necessary stuff (datasource, session, real database readers) and bind all delegated readers to your custom reader.

EDIT: You need to simulate a loop using recusion of ItemReader.read() and mantain reader and delegates state across job restarts.

class MyItemReader<T> implements ItemReader<T>, ItemStream {
  private ItemReader[] delegates;
  private int delegateIndex;
  private ItemReader<T> currentDelegate;
  private ExecutionContext stepExecutionContext;

  public void setDelegates(ItemReader[] delegates) {
    this.delegates = delegates;
  }

  @BeforeStep
  private void beforeStep(StepExecution stepExecution) {
    this.stepExecutionContext = stepExecution.getExecutionContext();
  }

  public T read() {
    T item = null;
    if(null != currentDelegate) {
      item = currentDelegate.read();
      if(null == item) {
        ((ItemStream)this.currentDelegate).close();
        this.currentDelegate = null;
      }
    }
    // Move to next delegate if previous was exhausted!
    if(null == item && this.delegateIndex< this.delegates.length) {
      this.currentDelegate = this.delegates[this.currentIndex++];
      ((ItemStream)this.currentDelegate).open(this.stepExecutionContext);
      update(this.stepExecutionContext);
      // Recurse to read() to simulate loop through delegates
      item = read();
    }
    return item;
  }

  public void open(ExecutionContext ctx) {
    // During open restore last active reader and restore its state
    if(ctx.containsKey("index")) {
      this.delegateIndex = ctx.getInt("index");
      this.currentDelegate = this.delegates[this.delegateIndex];
      ((ItemStream)this.currentDelegate ).open(ctx);
    }
  }

  public void update(ExecutionContext ctx) {
    // Update current delegate index and state
    ctx.putInt("index", this.delegateIndex);
    if(null != this.currentDelegate) {
      ((ItemStream)this.currentDelegate).update(ctx);
    }
  }

  public void close(ExecutionContext ctx) {
    if(null != this.currentDelegate) {
      ((ItemStream)this.currentDelegate).close();
  }
}

<bean id="myItemReader" class=path.to.MyItemReader>
  <property name="delegates">
    <array>
      <ref bean="itemReader1"/>
      <ref bean="itemReader2"/>
      <ref bean="itemReader3"/>
    </array>
  </property>
</bean>

EDIT2: Remember to set property name; this is NECESSARY to let MyItemReader.read() works correctly

<bean id="itemReader1" class="JdbcCursorItemReader">
  <property name="name" value="itemReader1" />
  <!-- Set other properties -->
</bean>