I am developing a Spring Batch CompositeItemReader & Writer example. I wrote the XML configuration and the code compiles successfully, but running my sample code gives the following error. What is going wrong?
org.springframework.beans.NotReadablePropertyException: Invalid property 'customerNumber' of bean class [java.lang.String]: Bean property 'customerNumber' is not readable or has an invalid getter method: Does the return type of the getter match the parameter type of the setter?
at org.springframework.beans.AbstractNestablePropertyAccessor.getPropertyValue(AbstractNestablePropertyAccessor.java:619)
at org.springframework.beans.AbstractNestablePropertyAccessor.getPropertyValue(AbstractNestablePropertyAccessor.java:610)
at org.springframework.batch.item.file.transform.BeanWrapperFieldExtractor.extract(BeanWrapperFieldExtractor.java:57)
at org.springframework.batch.item.file.transform.ExtractorLineAggregator.aggregate(ExtractorLineAggregator.java:54)
at org.springframework.batch.item.file.FlatFileItemWriter.write(FlatFileItemWriter.java:267)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:317)
at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:190)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:157)
at org.springframework.aop.support.DelegatingIntroductionInterceptor.doProceed(DelegatingIntroductionInterceptor.java:133)
at org.springframework.aop.support.DelegatingIntroductionInterceptor.invoke(DelegatingIntroductionInterceptor.java:121)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179)
at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:207)
at com.sun.proxy.$Proxy4.write(Unknown Source)
at org.springframework.batch.core.step.item.SimpleChunkProcessor.writeItems(SimpleChunkProcessor.java:175)
at org.springframework.batch.core.step.item.SimpleChunkProcessor.doWrite(SimpleChunkProcessor.java:151)
at org.springframework.batch.core.step.item.SimpleChunkProcessor.write(SimpleChunkProcessor.java:274)
at org.springframework.batch.core.step.item.SimpleChunkProcessor.process(SimpleChunkProcessor.java:199)
at org.springframework.batch.core.step.item.ChunkOrientedTasklet.execute(ChunkOrientedTasklet.java:75)
at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:406)
at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:330)
at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:133)
at org.springframework.batch.core.step.tasklet.TaskletStep$2.doInChunkContext(TaskletStep.java:271)
at org.springframework.batch.core.scope.context.StepContextRepeatCallback.doInIteration(StepContextRepeatCallback.java:77)
at org.springframework.batch.repeat.support.RepeatTemplate.getNextResult(RepeatTemplate.java:368)
at org.springframework.batch.repeat.support.RepeatTemplate.executeInternal(RepeatTemplate.java:215)
at org.springframework.batch.repeat.support.RepeatTemplate.iterate(RepeatTemplate.java:144)
at org.springframework.batch.core.step.tasklet.TaskletStep.doExecute(TaskletStep.java:257)
at org.springframework.batch.core.step.AbstractStep.execute(AbstractStep.java:198)
at org.springframework.batch.core.job.SimpleStepHandler.handleStep(SimpleStepHandler.java:148)
at org.springframework.batch.core.job.flow.JobFlowExecutor.executeStep(JobFlowExecutor.java:64)
at org.springframework.batch.core.job.flow.support.state.StepState.handle(StepState.java:67)
at org.springframework.batch.core.job.flow.support.SimpleFlow.resume(SimpleFlow.java:162)
at org.springframework.batch.core.job.flow.support.SimpleFlow.start(SimpleFlow.java:141)
at org.springframework.batch.core.job.flow.FlowJob.doExecute(FlowJob.java:134)
at org.springframework.batch.core.job.AbstractJob.execute(AbstractJob.java:304)
at org.springframework.batch.core.launch.support.SimpleJobLauncher$1.run(SimpleJobLauncher.java:135)
at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50)
at org.springframework.batch.core.launch.support.SimpleJobLauncher.run(SimpleJobLauncher.java:128)
at com.common.batch.main.CompositeMain.main(CompositeMain.java:24)
jdbc-composite-item-reader-job.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:batch="http://www.springframework.org/schema/batch" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="
http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
<import resource="classpath:context-datasource.xml" />
<!-- JobRepository and JobLauncher are configuration/setup classes -->
<bean id="jobRepository" class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean" />
<bean id="jobLauncher" class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
<property name="jobRepository" ref="jobRepository" />
</bean>
<!-- Step will need a transaction manager -->
<bean id="transactionManager"
class="org.springframework.batch.support.transaction.ResourcelessTransactionManager" />
<!-- =========================================================== -->
<job id="compositeJdbcReaderJob" xmlns="http://www.springframework.org/schema/batch">
<step id="compositeJdbcReaderStep" next="compositeJdbcReaderStep2">
<tasklet>
<chunk reader="compositeItemReader" writer="itemWriter" commit-interval="5" />
</tasklet>
</step>
<step id="compositeJdbcReaderStep2">
<tasklet>
<chunk reader="compositeItemReader2" writer="itemWriter2" commit-interval="5" />
</tasklet>
</step>
</job>
<!-- ============= Composite Item Reader ================ -->
<bean id="compositeItemReader" class="com.common.batch.reader.CompositeCursorItemReader">
<property name="unifyingMapper">
<bean class="com.common.batch.mapper.DefaultUnifyingStringItemsMapper" />
</property>
<property name="cursorItemReaders">
<list>
<ref bean="itemReader1" />
</list>
</property>
</bean>
<bean id="compositeItemReader2" class="com.common.batch.reader.CompositeCursorItemReader">
<property name="unifyingMapper">
<bean class="com.common.batch.mapper.DefaultUnifyingStringItemsMapper" />
</property>
<property name="cursorItemReaders">
<list>
<ref bean="itemReader2" />
</list>
</property>
</bean>
<!-- ========== ItemReader =============== -->
<bean id="itemReader1" class="org.springframework.batch.item.database.JdbcCursorItemReader">
<property name="dataSource" ref="dataSource" />
<property name="saveState" value="true" />
<property name="sql">
<value>
<![CDATA[ ${select.sql.customers} ]]>
</value>
</property>
<property name="rowMapper">
<bean class="com.common.batch.mapper.CustomerMapper" />
</property>
</bean>
<bean id="itemReader2" class="org.springframework.batch.item.database.JdbcCursorItemReader">
<property name="dataSource" ref="dataSource" />
<property name="saveState" value="true" />
<property name="sql">
<value>
<![CDATA[ ${select.sql.employees} ]]>
</value>
</property>
<property name="rowMapper">
<bean class="com.common.batch.mapper.EmployeeMapper" />
</property>
</bean>
<!-- ============= Flat File Writers ============= -->
<bean id="itemWriter" class="org.springframework.batch.item.file.FlatFileItemWriter" scope="step">
<property name="resource" value="${output.file.location}" />
<!-- <property name="appendAllowed" value="true" /> -->
<property name="lineAggregator">
<!-- An Aggregator which converts an object into delimited list of strings -->
<bean class="org.springframework.batch.item.file.transform.DelimitedLineAggregator">
<property name="delimiter" value="|" />
<property name="fieldExtractor">
<!-- Extractor which returns the value of beans property through reflection -->
<bean class="org.springframework.batch.item.file.transform.BeanWrapperFieldExtractor">
<property name="names" value="customerNumber,customerName,contactLastName,contactFirstName,phone,addressLine1,
addressLine2,city,state,postalCode,country,salesRepEmployeeNumber,creditLimit" />
</bean>
</property>
</bean>
</property>
</bean>
<bean id="itemWriter2" class="org.springframework.batch.item.file.FlatFileItemWriter" scope="step">
<property name="resource" value="${output.file.location}" />
<property name="appendAllowed" value="true" />
<property name="lineAggregator">
<!-- An Aggregator which converts an object into delimited list of strings -->
<bean class="org.springframework.batch.item.file.transform.DelimitedLineAggregator">
<property name="delimiter" value="|" />
<property name="fieldExtractor">
<!-- Extractor which returns the value of beans property through reflection -->
<bean class="org.springframework.batch.item.file.transform.BeanWrapperFieldExtractor">
<property name="names" value="employeeNumber,lastName,firstName,extension,email,officeCode,reportsTo,jobTitle" />
</bean>
</property>
</bean>
</property>
</bean>
</beans>
Employee.java
public class Employee implements Serializable{
private static final long serialVersionUID = 1L;
private Integer employeeNumber;
private String lastName;
private String firstName;
private String extension;
private String email;
private String officeCode;
private Integer reportsTo;
private String jobTitle;
// setters and getters
}
Customer.java
public class Customer implements Serializable{
private static final long serialVersionUID = 1L;
private Integer customerNumber;
private String customerName;
private String contactLastName;
private String contactFirstName;
private String phone;
private String addressLine1;
private String addressLine2;
private String city;
private String state;
private String postalCode;
private String country;
private Integer salesRepEmployeeNumber;
private Double creditLimit;
public Integer getCustomerNumber() {
return customerNumber;
}
public void setCustomerNumber(Integer customerNumber) {
this.customerNumber = customerNumber;
}
public String getCustomerName() {
return customerName;
}
public void setCustomerName(String customerName) {
this.customerName = customerName;
}
public String getContactLastName() {
return contactLastName;
}
public void setContactLastName(String contactLastName) {
this.contactLastName = contactLastName;
}
public String getContactFirstName() {
return contactFirstName;
}
public void setContactFirstName(String contactFirstName) {
this.contactFirstName = contactFirstName;
}
public String getPhone() {
return phone;
}
public void setPhone(String phone) {
this.phone = phone;
}
public String getAddressLine1() {
return addressLine1;
}
public void setAddressLine1(String addressLine1) {
this.addressLine1 = addressLine1;
}
public String getAddressLine2() {
return addressLine2;
}
public void setAddressLine2(String addressLine2) {
this.addressLine2 = addressLine2;
}
public String getCity() {
return city;
}
public void setCity(String city) {
this.city = city;
}
public String getState() {
return state;
}
public void setState(String state) {
this.state = state;
}
public String getPostalCode() {
return postalCode;
}
public void setPostalCode(String postalCode) {
this.postalCode = postalCode;
}
public String getCountry() {
return country;
}
public void setCountry(String country) {
this.country = country;
}
public Integer getSalesRepEmployeeNumber() {
return salesRepEmployeeNumber;
}
public void setSalesRepEmployeeNumber(Integer salesRepEmployeeNumber) {
this.salesRepEmployeeNumber = salesRepEmployeeNumber;
}
public Double getCreditLimit() {
return creditLimit;
}
public void setCreditLimit(Double creditLimit) {
this.creditLimit = creditLimit;
}
/*@Override
public String toString() {
return customerNumber + "|" + customerName + "|" + contactLastName + "|" + contactFirstName + "|" +
phone+ "|" + addressLine1 + "|" + addressLine2 + "|" + city + "|" + state+ "|" + postalCode + "|" +
country+ "|" + salesRepEmployeeNumber + "|" + creditLimit;
}*/
}
CompositeMain.java
public class CompositeMain {
@SuppressWarnings("resource")
public static void main(String[] args) {
ApplicationContext context = new ClassPathXmlApplicationContext("composite/jdbc-composite-item-reader-job.xml");
JobLauncher jobLauncher = (JobLauncher) context.getBean("jobLauncher");
Job job = (Job) context.getBean("compositeJdbcReaderJob");
JobExecution execution;
try {
execution = jobLauncher.run(job, new JobParameters());
System.out.println("Job Exit Status : "+ execution.getStatus());
} catch (JobExecutionAlreadyRunningException | JobRestartException
| JobInstanceAlreadyCompleteException | JobParametersInvalidException e) {
System.out.println(e.getMessage());
e.printStackTrace();
}
System.out.println("Done !!");
}
}
My DB details:
mysql> desc customers;
+------------------------+-------------+------+-----+---------+-------+
| Field                  | Type        | Null | Key | Default | Extra |
+------------------------+-------------+------+-----+---------+-------+
| customerNumber         | int(11)     | NO   | PRI | NULL    |       |
| customerName           | varchar(50) | NO   |     | NULL    |       |
| contactLastName        | varchar(50) | NO   |     | NULL    |       |
| contactFirstName       | varchar(50) | NO   |     | NULL    |       |
| phone                  | varchar(50) | NO   |     | NULL    |       |
| addressLine1           | varchar(50) | NO   |     | NULL    |       |
| addressLine2           | varchar(50) | YES  |     | NULL    |       |
| city                   | varchar(50) | NO   |     | NULL    |       |
| state                  | varchar(50) | YES  |     | NULL    |       |
| postalCode             | varchar(15) | YES  |     | NULL    |       |
| country                | varchar(50) | NO   |     | NULL    |       |
| salesRepEmployeeNumber | int(11)     | YES  | MUL | NULL    |       |
| creditLimit            | double      | YES  |     | NULL    |       |
+------------------------+-------------+------+-----+---------+-------+
13 rows in set (0.14 sec)
mysql> desc employees;
+----------------+--------------+------+-----+---------+-------+
| Field          | Type         | Null | Key | Default | Extra |
+----------------+--------------+------+-----+---------+-------+
| employeeNumber | int(11)      | NO   | PRI | NULL    |       |
| lastName       | varchar(50)  | NO   |     | NULL    |       |
| firstName      | varchar(50)  | NO   |     | NULL    |       |
| extension      | varchar(10)  | NO   |     | NULL    |       |
| email          | varchar(100) | NO   |     | NULL    |       |
| officeCode     | varchar(10)  | NO   | MUL | NULL    |       |
| reportsTo      | int(11)      | YES  | MUL | NULL    |       |
| jobTitle       | varchar(50)  | NO   |     | NULL    |       |
+----------------+--------------+------+-----+---------+-------+
8 rows in set (0.01 sec)
I see that a possibly related issue has already been reported: http://forum.spring.io/forum/spring-projects/container/112929-concurrent-problem-with-beanwrapperimpl-getpropertyvalue-in-3-1
pom.xml
<properties>
<java.version>1.8</java.version>
<springframework.version>4.2.4.RELEASE</springframework.version>
<springbatch.version>3.0.1.RELEASE</springbatch.version>
<mysql.version>5.1.31</mysql.version>
<joda-time.version>2.8</joda-time.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-core</artifactId>
<version>${springframework.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jdbc</artifactId>
<version>${springframework.version}</version>
</dependency>
<!-- Spring XML to/back object -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-oxm</artifactId>
<version>${springframework.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jdbc</artifactId>
<version>${springframework.version}</version>
</dependency>
<!-- Spring Batch -->
<dependency>
<groupId>org.springframework.batch</groupId>
<artifactId>spring-batch-core</artifactId>
<version>${springbatch.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.batch</groupId>
<artifactId>spring-batch-infrastructure</artifactId>
<version>${springbatch.version}</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>${mysql.version}</version>
</dependency>
<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
<version>${joda-time.version}</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>
</dependencies>
CustomerMapper.java
public class CustomerMapper implements RowMapper<Customer>{
@Override
public Customer mapRow(ResultSet rs, int rowNum) throws SQLException {
Customer customer = new Customer();
customer.setCustomerNumber(rs.getInt("customerNumber"));
customer.setCustomerName(rs.getString("customerName"));
customer.setContactLastName(rs.getString("contactLastName"));
customer.setContactFirstName(rs.getString("contactFirstName"));
customer.setPhone(rs.getString("phone"));
customer.setAddressLine1(rs.getString("addressLine1"));
customer.setAddressLine2(rs.getString("addressLine2"));
customer.setCity(rs.getString("city"));
customer.setState(rs.getString("state"));
customer.setPostalCode(rs.getString("postalCode"));
customer.setCountry(rs.getString("country"));
customer.setSalesRepEmployeeNumber(rs.getInt("salesRepEmployeeNumber"));
customer.setCreditLimit(rs.getDouble("creditLimit"));
return customer;
}
}
EmployeeMapper.java
public class EmployeeMapper implements RowMapper<Employee>{
@Override
public Employee mapRow(ResultSet rs, int rowNum) throws SQLException {
Employee employee = new Employee();
employee.setEmployeeNumber(rs.getInt("employeeNumber"));
employee.setLastName(rs.getString("lastName"));
employee.setFirstName(rs.getString("firstName"));
employee.setExtension(rs.getString("extension"));
employee.setEmail(rs.getString("email"));
employee.setOfficeCode(rs.getString("officeCode"));
employee.setReportsTo(rs.getInt("reportsTo"));
employee.setJobTitle(rs.getString("jobTitle"));
return employee;
}
}
CompositeCursorItemReader.java
public class CompositeCursorItemReader<T> implements ItemStreamReader<T> {
private List<AbstractCursorItemReader<?>> cursorItemReaders;
private UnifyingItemsMapper<T> unifyingMapper;
@Override
public T read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
// read from all registered readers
List<Object> items = new ArrayList<>();
for (AbstractCursorItemReader<?> cursorItemReader : cursorItemReaders) {
items.add(cursorItemReader.read());
}
// delegate to mapper
return unifyingMapper.mapItems(items);
}
@Override
public void update(ExecutionContext executionContext) {
for (ItemStream itemStream : cursorItemReaders) {
itemStream.update(executionContext);
}
}
@Override
public void close() throws ItemStreamException {
for (ItemStream itemStream : cursorItemReaders) {
itemStream.close();
}
}
@Override
public void open(ExecutionContext executionContext) throws ItemStreamException {
for (ItemStream itemStream : cursorItemReaders) {
itemStream.open(executionContext);
}
}
public void setUnifyingMapper(UnifyingItemsMapper<T> mapper) {
this.unifyingMapper = mapper;
}
public void setCursorItemReaders(List<AbstractCursorItemReader<?>> cursorItemReaders) {
this.cursorItemReaders = cursorItemReaders;
}
}
UnifyingItemsMapper.java
public interface UnifyingItemsMapper<T> {
T mapItems(List<?> items) throws Exception;
}
Spring is trying to read the property customerNumber from the String class:
Invalid property 'customerNumber' of bean class [java.lang.String]
That fails because String has no such property. A String should not be the item type of your job's step in the first place; the item in this context is supposed to be a Customer.
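To see why the writer fails, it may help to check which JavaBeans properties each type exposes. The sketch below does not use Spring; it relies on the JDK's java.beans.Introspector, which is the same kind of getter lookup a bean wrapper depends on. PropertyCheck and CustomerLike are made-up names for illustration only.

```java
import java.beans.IntrospectionException;
import java.beans.Introspector;
import java.beans.PropertyDescriptor;

// Hypothetical helper: reports whether a type has a readable JavaBeans property.
class PropertyCheck {

    // Illustrative stand-in for the question's Customer, trimmed to one property.
    public static class CustomerLike {
        private Integer customerNumber;

        public Integer getCustomerNumber() {
            return customerNumber;
        }

        public void setCustomerNumber(Integer customerNumber) {
            this.customerNumber = customerNumber;
        }
    }

    // Returns true when 'type' exposes a getter for the property called 'name'.
    static boolean hasReadableProperty(Class<?> type, String name) {
        try {
            for (PropertyDescriptor pd : Introspector.getBeanInfo(type).getPropertyDescriptors()) {
                if (pd.getName().equals(name) && pd.getReadMethod() != null) {
                    return true;
                }
            }
            return false;
        } catch (IntrospectionException e) {
            throw new IllegalStateException("Could not introspect " + type, e);
        }
    }
}
```

Calling hasReadableProperty(String.class, "customerNumber") finds no getter, while the same call on the customer-like bean does. That mirrors the situation in the stack trace, where the writer receives a String instead of a Customer.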
Your beans are configured correctly except for one: compositeItemReader.
The read method of this bean returns a String instead of a Customer, because of this:
public class CompositeCursorItemReader<T> implements ItemStreamReader<T> {
/** Registered ItemStreamReaders. */
private List<AbstractCursorItemReader<?>> cursorItemReaders;
/** Mandatory Unifying Mapper Implementation. */
private UnifyingItemsMapper<T> unifyingMapper;
@Override
public T read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
// read from all registered readers
List<Object> items = new ArrayList<>();
for (AbstractCursorItemReader<?> cursorItemReader : cursorItemReaders) {
items.add(cursorItemReader.read());
}
// delegate to mapper
return unifyingMapper.mapItems(items);
}
}
Its unifyingMapper property is configured as follows:
<property name="unifyingMapper">
<bean class="com.common.batch.mapper.DefaultUnifyingStringItemsMapper" />
</property>
Here is the implementation of the UnifyingItemsMapper you are using:
public class DefaultUnifyingStringItemsMapper implements UnifyingItemsMapper<String> {
/** {@inheritDoc} */
@Override
public String mapItems(List<?> items) throws Exception {
if (items != null && items.size() > 0) {
StringBuilder sb = new StringBuilder();
for (Object item : items) {
if (item != null) {
sb.append(item);
}
}
if (sb.length() > 0) {
return sb.toString();
} else {
return null;
}
} else {
return null;
}
}
}
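The solution is to create a new class that implements UnifyingItemsMapper<Customer> and configure it as the unifyingMapper of your CompositeCursorItemReader. The second step has the same problem: compositeItemReader2 feeds itemWriter2, which expects Employee items, so it needs an equivalent mapper for Employee.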
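A minimal sketch of such a mapper, assuming each composite reader wraps a single delegate as in the configuration above. CustomerUnifyingItemsMapper is a hypothetical name, the interface is repeated from the question, and Customer is trimmed to one property only so that the snippet compiles on its own; in the real project you would use the existing classes.

```java
import java.util.List;

// Same contract as the UnifyingItemsMapper interface shown in the question.
interface UnifyingItemsMapper<T> {
    T mapItems(List<?> items) throws Exception;
}

// Trimmed stand-in for the question's Customer (assumption: the real class is used in practice).
class Customer {
    private Integer customerNumber;

    public Integer getCustomerNumber() {
        return customerNumber;
    }

    public void setCustomerNumber(Integer customerNumber) {
        this.customerNumber = customerNumber;
    }
}

// Hypothetical mapper: hands the Customer through instead of flattening it to a String.
class CustomerUnifyingItemsMapper implements UnifyingItemsMapper<Customer> {

    @Override
    public Customer mapItems(List<?> items) {
        if (items == null) {
            return null;
        }
        for (Object item : items) {
            // Return the first Customer produced by the delegate readers.
            if (item instanceof Customer) {
                return (Customer) item;
            }
        }
        // Nothing was read: returning null tells the step that the input is exhausted.
        return null;
    }
}
```

To use it, point the class attribute of the nested unifyingMapper bean at this class instead of DefaultUnifyingStringItemsMapper, for example com.common.batch.mapper.CustomerUnifyingItemsMapper if you place it in that package.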
Or just use your itemReader beans directly if you don't need a UnifyingItemsMapper:
<job id="compositeJdbcReaderJob" xmlns="http://www.springframework.org/schema/batch">
<step id="compositeJdbcReaderStep" next="compositeJdbcReaderStep2">
<tasklet>
<chunk reader="itemReader1" writer="itemWriter" commit-interval="5" />
</tasklet>
</step>
<step id="compositeJdbcReaderStep2">
<tasklet>
<chunk reader="itemReader2" writer="itemWriter2" commit-interval="5" />
</tasklet>
</step>
</job>