I am having a hard time combining Spring Batch with Spring Batch Admin using Spring Boot.
Because I am using Spring Batch Admin, I had to disable @EnableBatchProcessing and manually configure the two builders it provides.
That came out of my earlier question, "Error Integrating spring-batch and Spring-Batch-admin".
Now I am trying a simple scenario: I want to pass a parameter from the job parameters into an ItemReader, using @StepScope and property injection.
I followed the example here (that example uses neither Spring Batch Admin nor Spring Boot).
And I get this exception:
2015-01-06 18:22:09.852 ERROR 10260 --- [nio-8080-exec-1] o.s.batch.core.step.AbstractStep : Exception while closing step execution resources in step step2 in job MyTestJob
java.lang.ClassCastException: com.sun.proxy.$Proxy74 cannot be cast to org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader
at org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader$$FastClassBySpringCGLIB$$ebb633d0.invoke(<generated>)
at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:204)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:717)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:157)
at org.springframework.aop.support.DelegatingIntroductionInterceptor.doProceed(DelegatingIntroductionInterceptor.java:133)
at org.springframework.aop.support.DelegatingIntroductionInterceptor.invoke(DelegatingIntroductionInterceptor.java:121)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179)
at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:653)
at org.springframework.batch.item.file.FlatFileItemReader$$EnhancerBySpringCGLIB$$e99a9c6b.close(<generated>)
at org.springframework.batch.item.support.CompositeItemStream.close(CompositeItemStream.java:85)
at org.springframework.batch.core.step.tasklet.TaskletStep.close(TaskletStep.java:305)
at org.springframework.batch.core.step.AbstractStep.execute(AbstractStep.java:267)
at org.springframework.batch.core.job.SimpleStepHandler.handleStep(SimpleStepHandler.java:148)
at org.springframework.batch.core.job.AbstractJob.handleStep(AbstractJob.java:386)
at org.springframework.batch.core.job.SimpleJob.doExecute(SimpleJob.java:135)
at org.springframework.batch.core.job.AbstractJob.execute(AbstractJob.java:304)
at org.springframework.batch.core.launch.support.SimpleJobLauncher$1.run(SimpleJobLauncher.java:135)
at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50)
at org.springframework.batch.core.launch.support.SimpleJobLauncher.run(SimpleJobLauncher.java:128)
at com.mycompany.notification.processor.service.controller.BatchJobController.job1(BatchJobController.java:38)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:221)
at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:137)
at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:110)
at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandleMethod(RequestMappingHandlerAdapter.java:777)
at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:706)
at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:85)
at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:943)
at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:877)
at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:966)
at org.springframework.web.servlet.FrameworkServlet.doGet(FrameworkServlet.java:857)
at javax.servlet.http.HttpServlet.service(HttpServlet.java:618)
at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:842)
at javax.servlet.http.HttpServlet.service(HttpServlet.java:725)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:291)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:206)
at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:88)
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:239)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:206)
at org.springframework.boot.actuate.autoconfigure.EndpointWebMvcAutoConfiguration$ApplicationContextHeaderFilter.doFilterInternal(EndpointWebMvcAutoConfiguration.java:288)
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:239)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:206)
at org.springframework.web.filter.HiddenHttpMethodFilter.doFilterInternal(HiddenHttpMethodFilter.java:77)
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:239)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:206)
at org.springframework.batch.admin.web.filter.ParameterUnpackerFilter.doFilterInternal(ParameterUnpackerFilter.java:99)
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:239)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:206)
at org.springframework.boot.actuate.trace.WebRequestTraceFilter.doFilterInternal(WebRequestTraceFilter.java:100)
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:239)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:206)
at org.springframework.boot.actuate.autoconfigure.MetricFilterAutoConfiguration$MetricsFilter.doFilterInternal(MetricFilterAutoConfiguration.java:90)
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:239)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:206)
at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:219)
at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:106)
at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:501)
at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:142)
at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:79)
at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:88)
at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:537)
at org.apache.coyote.http11.AbstractHttp11Processor.process(AbstractHttp11Processor.java:1085)
at org.apache.coyote.AbstractProtocol$AbstractConnectionHandler.process(AbstractProtocol.java:658)
at org.apache.coyote.http11.Http11NioProtocol$Http11ConnectionHandler.process(Http11NioProtocol.java:222)
at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1556)
at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.run(NioEndpoint.java:1513)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61)
at java.lang.Thread.run(Thread.java:745)
2015-01-06 18:22:09.866 INFO 10260 --- [nio-8080-exec-1] o.s.b.c.l.support.SimpleJobLauncher : Job: [SimpleJob: [name=MyTestJob]] completed with the following parameters: [{pathToFile=snids.csv, date=1420561328719}] and the following status: [FAILED]
This is how I create my Job via controller:
@RequestMapping(value = "/job1")
public @ResponseBody StatusResponse job1() {
    try {
        JobParameters jobParameters = new JobParametersBuilder()
                .addString("pathToFile", "snids.csv")
                .addDate("date", new Date())
                .toJobParameters();
        jobLauncher.run(job1, jobParameters);
        return new StatusResponse(true);
    } catch (JobInstanceAlreadyCompleteException ex) {
        return new StatusResponse(false, "This job has been completed already!");
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
}
Application.class:
@ComponentScan({"com.mycompany.notification.processor.service"})
@EnableAutoConfiguration
@Configuration
@ImportResource({
        "classpath:integration-context.xml", "classpath:launch-context.xml"})
@PropertySource("file:///d://etc/mycompany/services/pushExecuterService/pushExecuterServices.properties")
//@Import({ ServletConfiguration.class, WebappConfiguration.class })
public class Application {
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
        System.out.printf("hello man");
    }
}
And BatchConfiguration class:
@Configuration
public class BatchConfiguration {
    private static final Logger logger = LoggerFactory.getLogger(BatchConfiguration.class);

    @Inject
    private WriteToKafkaTasklet writeToKafkaTasklet;
    @Inject
    private StepBuilderFactory steps;
    @Autowired
    private DownloadFileTasklet downloadFileTasklet;
    @Inject
    private JobBuilderFactory jobs;
    @Inject
    private StepBuilderFactory stepBuilderFactory;

    @Bean
    public Job job() throws Exception {
        return this.jobs.get("MyTestJob")
                .start(downloadFileFromETStep())
                .next(processFileStep())
                .next(pushToKafkaStep())
                .build();
    }

    private Step pushToKafkaStep() {
        return this.steps.get("step3").tasklet(writeToKafkaTasklet).build();
    }

    private Step downloadFileFromETStep() {
        return this.steps.get("step1").tasklet(downloadFileTasklet).build();
    }

    private static final String OVERRIDDEN_BY_EXPRESSION = null;

    @Bean
    public Step processFileStep() {
        return stepBuilderFactory.get("step2")
                .<PushItemDTO, PushItemDTO>chunk(1) // important to be one in this case to commit after every line read
                .reader(reader(OVERRIDDEN_BY_EXPRESSION))
                // .processor(processor())
                .writer(writer())
                // .listener(logProcessListener())
                // .faultTolerant()
                // .skipLimit(10) // default is set to 0
                // .skip(MySQLIntegrityConstraintViolationException.class)
                .build();
    }

    @Bean
    public ItemWriter writer() {
        return new ItemWriter() {
            @Override
            public void write(List items) throws Exception {
                logger.debug("hello");
            }
        };
    }

    @Bean
    @Scope(value = "step", proxyMode = ScopedProxyMode.INTERFACES)
    public FlatFileItemReader<PushItemDTO> reader(@Value("#{jobParameters[pathToFile]}") String pathToFile) {
        FlatFileItemReader<PushItemDTO> itemReader = new FlatFileItemReader<PushItemDTO>();
        itemReader.setLineMapper(lineMapper());
        itemReader.setResource(new ClassPathResource(pathToFile));
        return itemReader;
    }

    @Bean
    public LineMapper<PushItemDTO> lineMapper() {
        DefaultLineMapper<PushItemDTO> lineMapper = new DefaultLineMapper<PushItemDTO>();
        DelimitedLineTokenizer lineTokenizer = new DelimitedLineTokenizer();
        // lineTokenizer.setDelimiter(",");
        lineTokenizer.setStrict(false);
        lineTokenizer.setNames(new String[]{"SNID"});
        BeanWrapperFieldSetMapper<PushItemDTO> fieldSetMapper = new BeanWrapperFieldSetMapper<PushItemDTO>();
        fieldSetMapper.setTargetType(PushItemDTO.class);
        lineMapper.setLineTokenizer(lineTokenizer);
        lineMapper.setFieldSetMapper(new PushItemFieldSetMapper());
        return lineMapper;
    }
}
Servlet Configuration:
@Configuration
@ImportResource("classpath:/org/springframework/batch/admin/web/resources/servlet-config.xml")
public class ServletConfiguration {
}
Webapp Configuration:
@Configuration
@ImportResource("classpath:/org/springframework/batch/admin/web/resources/webapp-config.xml")
public class WebappConfiguration {
}
Thank you
I edited my code and, instead of @StepScope, I added
@Scope(value = "step", proxyMode = ScopedProxyMode.INTERFACES)
as suggested.
Now I get a different exception:
org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'batchJobController': Injection of autowired dependencies failed; nested exception is org.springframework.beans.factory.BeanCreationException: Could not autowire field: private org.springframework.batch.core.Job com.mycompany.notification.processor.service.controller.BatchJobController.job1; nested exception is org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'job' defined in class path resource [com/mycompany/notification/processor/service/batch/conf/flow/BatchConfiguration.class]: Bean instantiation via factory method failed; nested exception is org.springframework.beans.BeanInstantiationException: Failed to instantiate [org.springframework.batch.core.Job]: Factory method 'job' threw exception; nested exception is org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'processFileStep' defined in class path resource [com/mycompany/notification/processor/service/batch/conf/flow/BatchConfiguration.class]: Bean instantiation via factory method failed; nested exception is org.springframework.beans.BeanInstantiationException: Failed to instantiate [org.springframework.batch.core.Step]: Factory method 'processFileStep' threw exception; nested exception is java.lang.ClassCastException: com.sun.proxy.$Proxy59 cannot be cast to org.springframework.batch.item.file.FlatFileItemReader
at org.springframework.beans.factory.annotation.AutowiredAnnotationBeanPostProcessor.postProcessPropertyValues(AutowiredAnnotationBeanPostProcessor.java:334)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.populateBean(AbstractAutowireCapableBeanFactory.java:1202)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:537)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:476)
at org.springframework.beans.factory.support.AbstractBeanFactory$1.getObject(AbstractBeanFactory.java:302)
at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:230)
at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:298)
at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:193)
at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:762)
at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:757)
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:480)
at org.springframework.boot.context.embedded.EmbeddedWebApplicationContext.refresh(EmbeddedWebApplicationContext.java:109)
at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:691)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:321)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:961)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:950)
at com.mycompany.notification.processor.service.main.Application.main(Application.java:20)
Caused by: org.springframework.beans.factory.BeanCreationException: Could not autowire field: private org.springframework.batch.core.Job com.mycompany.notification.processor.service.controller.BatchJobController.job1; nested exception is org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'job' defined in class path resource [com/mycompany/notification/processor/service/batch/conf/flow/BatchConfiguration.class]: Bean instantiation via factory method failed; nested exception is org.springframework.beans.BeanInstantiationException: Failed to instantiate [org.springframework.batch.core.Job]: Factory method 'job' threw exception; nested exception is org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'processFileStep' defined in class path resource [com/mycompany/notification/processor/service/batch/conf/flow/BatchConfiguration.class]: Bean instantiation via factory method failed; nested exception is org.springframework.beans.BeanInstantiationException: Failed to instantiate [org.springframework.batch.core.Step]: Factory method 'processFileStep' threw exception; nested exception is java.lang.ClassCastException: com.sun.proxy.$Proxy59 cannot be cast to org.springframework.batch.item.file.FlatFileItemReader
at org.springframework.beans.factory.annotation.AutowiredAnnotationBeanPostProcessor$AutowiredFieldElement.inject(AutowiredAnnotationBeanPostProcessor.java:558)
at org.springframework.beans.factory.annotation.InjectionMetadata.inject(InjectionMetadata.java:87)
at org.springframework.beans.factory.annotation.AutowiredAnnotationBeanPostProcessor.postProcessPropertyValues(AutowiredAnnotationBeanPostProcessor.java:331)
... 16 common frames omitted
Spring Batch Admin configures the step scope via XML, which uses JDK dynamic (interface-based) proxies as its proxying mechanism. However, @StepScope uses dynamically generated subclasses (class-based proxies). For this to work, instead of using the @StepScope shortcut, use the following:
@Scope(value = "step", proxyMode = ScopedProxyMode.INTERFACES)
In addition to the above update, you'll want to return the interface (instead of the implementation) so that it is proxied correctly. So in this case, you'll want to return ItemStreamReader instead of FlatFileItemReader.
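To illustrate why the declared return type matters, here is a minimal sketch using only the JDK's java.lang.reflect.Proxy, with no Spring involved. The names Reader and FileReader are hypothetical stand-ins for ItemStreamReader and FlatFileItemReader; this is not how Spring builds its scoped proxies internally, only a demonstration of the same general behavior: an interface-based proxy implements the interface but is not an instance of the concrete class, so casting it to the implementation fails.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;

public class InterfaceProxyDemo {

    // Hypothetical stand-in for an interface such as ItemStreamReader.
    public interface Reader {
        String read();
    }

    // Hypothetical stand-in for a concrete class such as FlatFileItemReader.
    public static class FileReader implements Reader {
        @Override
        public String read() {
            return "line";
        }
    }

    // Wraps the target in a JDK dynamic proxy. The proxy implements Reader
    // and delegates every call, but it does NOT extend FileReader.
    public static Reader proxyFor(Reader target) {
        InvocationHandler handler = (proxy, method, args) -> method.invoke(target, args);
        return (Reader) Proxy.newProxyInstance(
                Reader.class.getClassLoader(),
                new Class<?>[] {Reader.class},
                handler);
    }

    // Returns true when casting the candidate to the implementation class
    // throws ClassCastException.
    public static boolean castToImplementationFails(Object candidate) {
        try {
            FileReader unused = (FileReader) candidate;
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        Reader proxied = proxyFor(new FileReader());
        // Using the proxy through the interface works.
        System.out.println(proxied.read());
        // Treating the proxy as the implementation class does not.
        System.out.println(castToImplementationFails(proxied));
    }
}
```

Running main prints `line` and then `true`: calls through the interface type succeed, while the cast to the concrete class fails in the same way as the `$Proxy59 cannot be cast to ... FlatFileItemReader` error in the question. Declaring the bean method's return type as the interface avoids that cast.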