Spring batch jpaPagingItemReader why some rows are not read?

ThierryC picture ThierryC · Oct 22, 2014 · Viewed 25.2k times · Source

I 'm using Spring Batch(3.0.1.RELEASE) / JPA and an HSQLBD server database. I need to browse an entire table (using paging) and update items (one by one). So I used a jpaPagingItemReader. But when I run the job I can see that some rows are skipped, and the number of skipped rows is equal to the page size. For i.e. if my table has 12 rows and the jpaPagingItemReader.pagesize = 3 the job will read : lines 1,2,3 then lines 7,8,9 (so skip the lines 4,5,6)… Could you tell me what is wrong in my code/configuration, or maybe it's an issue with HSQLDB paging? Below is my code:

[EDIT] : The problem is with my ItemProcessor that performs modification to the POJOs Entities. Since JPAPagingItemReader made a flush between each reading, the Entities are updated ((this is what I want) . But it seems that the cursor paging is also incremented (as can be seen in the log: row ID 4, 5 and 6 have been skipped). How can I manage this issue ?

@Configuration
@EnableBatchProcessing(modular=true)
public class AppBatchConfig {
  @Inject
  private InfrastructureConfiguration infrastructureConfiguration;  
  @Inject private JobBuilderFactory jobs;
  @Inject private StepBuilderFactory steps;

  @Bean  public Job job() {
     return jobs.get("Myjob1").start(step1()).build();
  }
  @Bean  public Step step1() {  
      return steps.get("step1")
                .<SNUserPerCampaign, SNUserPerCampaign> chunk(0)
                .reader(reader()).processor(processor()).build();   
  }
  @Bean(destroyMethod = "")
@JobScope 
public ItemStreamReader<SNUserPerCampaign> reader() String trigramme) {
    JpaPagingItemReader reader = new JpaPagingItemReader();
    reader.setEntityManagerFactory(infrastructureConfiguration.getEntityManagerFactory());
    reader.setQueryString("select t from SNUserPerCampaign t where t.isactive=true");
    reader.setPageSize(3));
    return reader;
}
 @Bean @JobScope
 public ItemProcessor<SNUserPerCampaign, SNUserPerCampaign> processor() {   
     return new MyItemProcessor();
 }
}

@Configuration
@EnableBatchProcessing
public class StandaloneInfrastructureConfiguration implements InfrastructureConfiguration {
 @Inject private EntityManagerFactory emf;  
 @Override
public EntityManagerFactory getEntityManagerFactory() {
    return emf;
}
}  

from my ItemProcessor:

@Override
public SNUserPerCampaign process(SNUserPerCampaign item) throws Exception {
    //do some stuff …
   //then if (condition) update the Entity pojo :   
   item.setModificationDate(new Timestamp(System.currentTimeMillis());
   item.setIsactive = false;

}

from Spring xml config file:

<tx:annotation-driven transaction-manager="transactionManager" />     
<bean id="transactionManager" class="org.springframework.orm.jpa.JpaTransactionManager">
    <property name="entityManagerFactory" ref="entityManagerFactory" />
</bean>

<bean id="entityManagerFactory" class="org.springframework.orm.jpa.LocalContainerEntityManagerFactoryBean">
    <property name="dataSource" ref="dataSource" />
</bean>

<bean id="dataSource" class="org.springframework.jdbc.datasource.DriverManagerDataSource">
    <property name="driverClassName" value="org.hsqldb.jdbcDriver" />
    <property name="url" value="jdbc:hsqldb:hsql://localhost:9001/MYAppDB" />
    <property name="username" value="sa" />
    <property name="password" value="" />
</bean>

trace/log summarized :

11:16:05.728 TRACE MyItemProcessor - item processed: snUserInternalId=1]
11:16:06.038 TRACE MyItemProcessor - item processed: snUserInternalId=2]
11:16:06.350 TRACE MyItemProcessor - item processed: snUserInternalId=3]

11:16:06.674 DEBUG SQL- update SNUSER_CAMPAIGN  set ...etc...
11:16:06.677 DEBUG SQL- update SNUSER_CAMPAIGN  set ...etc...
11:16:06.679 DEBUG SQL- update SNUSER_CAMPAIGN  set ...etc...

11:16:06.681 DEBUG SQL- select ...etc... from  SNUSER_CAMPAIGN snuserperc0_ 

11:16:06.687 TRACE MyItemProcessor - item processed: snUserInternalId=7]
11:16:06.998 TRACE MyItemProcessor - item processed: snUserInternalId=8]
11:16:07.314 TRACE MyItemProcessor - item processed: snUserInternalId=9]

Answer

Manuel Verriez picture Manuel Verriez · Oct 24, 2014

org.springframework.batch.item.database.JpaPagingItemReader creates is own entityManager instance

(from org.springframework.batch.item.database.JpaPagingItemReader#doOpen) :

entityManager = entityManagerFactory.createEntityManager(jpaPropertyMap);

If you are within a transaction, as it seems to be, reader entities are not detached (from org.springframework.batch.item.database.JpaPagingItemReader#doReadPage):

    if (!transacted) {
        List<T> queryResult = query.getResultList();
        for (T entity : queryResult) {
            entityManager.detach(entity);
            results.add(entity);
        }//end if
    } else {
        results.addAll(query.getResultList());
        tx.commit();
    }

For this reason, when you update an item into processor, or writer, this item is still managed by reader's entityManager.

When the item reader reads the next chunk of data, it flushes the context to the database.

So, if we look at your case, after the first chunk of data processes, we have in database:

|id|active
|1 | false
|2 | false
|3 | false

org.springframework.batch.item.database.JpaPagingItemReader uses limit & offset to retrieve paginated data. So the next select created by the reader looks like :

select * from table where active = true offset 3 limits 3. 

Reader will miss the items with id 4,5,6, because they are now the first rows retrieved by database.

What you can do, as a workaround, is to use jdbc implementation (org.springframework.batch.item.database.JdbcPagingItemReader) as it does not use limit & offset. It is based on a sorted column (typically the id column), so you will not miss any data. Of course, you will have to update your data into the writer (using either JPA ou pure JDBC implementation)

Reader will be more verbose:

@Bean
public ItemReader<? extends Entity> reader() {
    JdbcPagingItemReader<Entity> reader = new JdbcPagingItemReader<Entity>();
    final SqlPagingQueryProviderFactoryBean sqlPagingQueryProviderFactoryBean = new SqlPagingQueryProviderFactoryBean();
    sqlPagingQueryProviderFactoryBean.setDataSource(dataSource);
    sqlPagingQueryProviderFactoryBean.setSelectClause("select *");
    sqlPagingQueryProviderFactoryBean.setFromClause("from <your table name>");
    sqlPagingQueryProviderFactoryBean.setWhereClause("where active = true");
    sqlPagingQueryProviderFactoryBean.setSortKey("id");
    try {
        reader.setQueryProvider(sqlPagingQueryProviderFactoryBean.getObject());
    } catch (Exception e) {
        e.printStackTrace();
    }
    reader.setDataSource(dataSource);
    reader.setPageSize(3);
    reader.setRowMapper(new BeanPropertyRowMapper<Entity>(Entity.class));
    return reader;