Context
We have a batch job that replicates localized country names (i.e. translations of country names into different languages) from an external DB into ours. The idea was to process all localized country names for a single country in one chunk (i.e. first chunk: all translations for Andorra, next chunk: all translations for the U.A.E., etc.). We use JdbcCursorItemReader for reading the external data, plus some Oracle analytic functions to provide the total number of translations available for each country, something like:
select country_code, language_code, localized_name, COUNT(1) OVER(PARTITION BY c_lng.country_code) as lng_count
from EXT_COUNTRY_LNG c_lng
order by c_lng.country_code, c_lng.language_code
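To make the role of the analytic function concrete, here is a framework-free Java sketch of what `COUNT(1) OVER(PARTITION BY country_code)` combined with the `ORDER BY` produces. The names `CountryLngRow` and `WindowCountSketch` are hypothetical and only illustrate the idea; the real job gets these values straight from Oracle through the cursor reader.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical row type mirroring the four columns of the query above.
final class CountryLngRow {
    final String countryCode;
    final String languageCode;
    final String localizedName;
    final int lngCount; // what COUNT(1) OVER(PARTITION BY country_code) yields for this row

    CountryLngRow(String countryCode, String languageCode, String localizedName, int lngCount) {
        this.countryCode = countryCode;
        this.languageCode = languageCode;
        this.localizedName = localizedName;
        this.lngCount = lngCount;
    }

    @Override
    public String toString() {
        return countryCode + "/" + languageCode + " " + localizedName + " (lng_count=" + lngCount + ")";
    }
}

final class WindowCountSketch {
    // Each raw row is {country_code, language_code, localized_name}.
    static List<CountryLngRow> withLngCount(List<String[]> rawRows) {
        // PARTITION BY country_code: count the rows per country ...
        Map<String, Long> perCountry = rawRows.stream()
                .collect(Collectors.groupingBy(r -> r[0], Collectors.counting()));
        // ... and attach that count to every row instead of collapsing the group (unlike GROUP BY).
        List<CountryLngRow> rows = new ArrayList<>();
        for (String[] r : rawRows) {
            rows.add(new CountryLngRow(r[0], r[1], r[2], perCountry.get(r[0]).intValue()));
        }
        // ORDER BY country_code, language_code: all rows of a country end up adjacent.
        rows.sort(Comparator.comparing((CountryLngRow row) -> row.countryCode)
                .thenComparing((CountryLngRow row) -> row.languageCode));
        return rows;
    }

    public static void main(String[] args) {
        List<String[]> raw = Arrays.asList(
                new String[] {"AE", "en", "United Arab Emirates"},
                new String[] {"AD", "fr", "Andorre"},
                new String[] {"AD", "en", "Andorra"},
                new String[] {"AE", "de", "Vereinigte Arabische Emirate"},
                new String[] {"AD", "de", "Andorra"});
        withLngCount(raw).forEach(System.out::println);
    }
}
```

Running `main` prints the three AD rows with `lng_count=3` followed by the two AE rows with `lng_count=2`: every row carries its group size, which is exactly the information a chunk boundary needs.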
Problem
So splitting this input into chunks looks simple: end the chunk once you've read exactly the number of rows given in lng_count and start a new one with the next row. In practice, though, it turns out not to be so simple :(
The first thing to try is a custom completion policy. The problem is that it doesn't have access to the last item read by the ItemReader: you have to explicitly put it into the context in the reader and get it back in the policy. I don't like that, because it requires additional reader modifications or extra reader listeners. Moreover, I don't like the same item being serialized/deserialized back and forth. And I don't feel like JobContext/StepContext is a good place for such data.
There's also RepeatContext, which looks like a better place for such data, but I wasn't able to get hold of it easily...
So we finally ended up with a solution like this:
@Bean(name = "localizedCountryNamesStep")
@JobScope
public Step insertCountryStep(
        final StepBuilderFactory stepBuilderFactory,
        final MasterdataCountryNameReader countryNameReader,
        final MasterdataCountryNameProcessor countryNameProcessor,
        final MasterdataCountryNameWriter writer) {
    /* Use the same fixed-commit policy, but update its chunk size dynamically */
    final SimpleCompletionPolicy policy = new SimpleCompletionPolicy();
    return stepBuilderFactory.get("localizedCountryNamesStep")
            .<ExtCountryLng, LocalizedCountryName> chunk(policy)
            .reader(countryNameReader)
            .listener(new ItemReadListener<ExtCountryLng>() {
                @Override
                public void beforeRead() {
                    // do nothing
                }

                @Override
                public void afterRead(final ExtCountryLng item) {
                    /* Update the chunk size after every read: subsequent reads inside the
                       same country (= same chunk) change nothing, since lngCount is the same there */
                    policy.setChunkSize(item.getLngCount());
                }

                @Override
                public void onReadError(final Exception ex) {
                    // do nothing
                }
            })
            .processor(countryNameProcessor)
            .writer(writer)
            .faultTolerant()
            .skip(RuntimeException.class)
            .skipLimit(Integer.MAX_VALUE) // Batch does not support an unlimited skip limit
            .retryLimit(0) // this disables retry only, not recovery
            .build();
}
It works and requires minimal code changes, but it still feels a bit ugly to me. So I'm wondering: is there another, more elegant way to get a dynamic chunk size in Spring Batch when all the required information is already available in the ItemReader?
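The listener-based solution in the question works because, as far as I can tell, SimpleCompletionPolicy compares the number of items started in the current chunk against its *current* chunkSize field on every completion check, so a setChunkSize() call from afterRead() takes effect immediately. Below is a framework-free simulation of that interplay; `MutableChunkSizePolicy`, `Translation` and `DynamicChunkSketch` are hypothetical names, not Spring Batch classes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-in for the step's completion policy: the limit is re-read on every check,
// so changing it mid-chunk (as the afterRead() listener does) takes effect immediately.
final class MutableChunkSizePolicy {
    private int chunkSize = 1;

    void setChunkSize(int chunkSize) {
        this.chunkSize = chunkSize;
    }

    boolean isComplete(int itemsReadInChunk) {
        return itemsReadInChunk >= chunkSize;
    }
}

// Hypothetical in-memory item: a country code plus the lng_count computed by the query.
final class Translation {
    final String countryCode;
    final int lngCount;

    Translation(String countryCode, int lngCount) {
        this.countryCode = countryCode;
        this.lngCount = lngCount;
    }
}

final class DynamicChunkSketch {
    // Returns the country codes of each chunk, in the order the chunks would be committed.
    static List<List<String>> chunk(List<Translation> input) {
        MutableChunkSizePolicy policy = new MutableChunkSizePolicy();
        Iterator<Translation> reader = input.iterator();
        List<List<String>> chunks = new ArrayList<>();
        while (reader.hasNext()) {
            List<String> chunk = new ArrayList<>();
            int read = 0;
            boolean complete = false;
            while (!complete && reader.hasNext()) {
                Translation item = reader.next();
                read++;
                // Equivalent of afterRead(): lngCount is constant within a country,
                // so only the first read of a new country actually changes the limit.
                policy.setChunkSize(item.lngCount);
                chunk.add(item.countryCode);
                complete = policy.isComplete(read);
            }
            chunks.add(chunk);
        }
        return chunks;
    }

    public static void main(String[] args) {
        List<Translation> input = Arrays.asList(
                new Translation("AD", 3), new Translation("AD", 3), new Translation("AD", 3),
                new Translation("AE", 2), new Translation("AE", 2));
        System.out.println(chunk(input)); // [[AD, AD, AD], [AE, AE]]
    }
}
```

One design consequence: the boundaries are only as reliable as lng_count. If it ever disagrees with the actual number of rows for a country, a chunk will straddle two countries; comparing adjacent items directly, as the peek-based answer does, does not have that failure mode.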
The easiest way would be to simply partition your step by country. That way each country gets its own step, and you can also process countries on separate threads for better performance.
If it needs to be a single reader, you can wrap a delegate PeekableItemReader and extend SimpleCompletionPolicy to accomplish your goal:
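import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.NonTransientResourceException;
import org.springframework.batch.item.ParseException;
import org.springframework.batch.item.PeekableItemReader;
import org.springframework.batch.item.UnexpectedInputException;
import org.springframework.batch.repeat.RepeatContext;
import org.springframework.batch.repeat.policy.SimpleCompletionPolicy;

public class CountryPeekingCompletionPolicyReader extends SimpleCompletionPolicy implements ItemReader<CountrySpecificItem> {

    private PeekableItemReader<? extends CountrySpecificItem> delegate;

    private CountrySpecificItem currentReadItem = null;

    /* Setter used by the <property name="delegate"> injection in the XML config */
    public void setDelegate(final PeekableItemReader<? extends CountrySpecificItem> delegate) {
        this.delegate = delegate;
    }

    @Override
    public CountrySpecificItem read() throws UnexpectedInputException, ParseException, NonTransientResourceException, Exception {
        currentReadItem = delegate.read();
        return currentReadItem;
    }

    @Override
    public RepeatContext start(final RepeatContext context) {
        return new ComparisonPolicyTerminationContext(context);
    }

    protected class ComparisonPolicyTerminationContext extends SimpleTerminationContext {

        public ComparisonPolicyTerminationContext(final RepeatContext context) {
            super(context);
        }

        @Override
        public boolean isComplete() {
            final CountrySpecificItem nextReadItem;
            try {
                // peek() declares checked exceptions, which isComplete() cannot throw
                nextReadItem = delegate.peek();
            } catch (final Exception e) {
                throw new IllegalStateException("Could not peek at the next item", e);
            }
            // Complete at end of input (peek() returns null) or when the next item is for another country
            return nextReadItem == null || !currentReadItem.isSameCountry(nextReadItem);
        }
    }
}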
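Then in your application context you would define the following (YourPeekableItemReader stands for any PeekableItemReader implementation, e.g. Spring Batch's SingleItemPeekableItemReader wrapping your cursor reader):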
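<batch:tasklet>
    <batch:chunk chunk-completion-policy="countrySpecificCompletionPolicy" reader="countrySpecificCompletionPolicy" writer="someWriter">
        <!-- The wrapper is not an ItemStream, so register the peekable delegate explicitly; otherwise it (and the cursor it wraps) is never opened -->
        <batch:streams>
            <batch:stream ref="peekableReader" />
        </batch:streams>
    </batch:chunk>
</batch:tasklet>
<bean id="countrySpecificCompletionPolicy" class="CountryPeekingCompletionPolicyReader">
    <property name="delegate" ref="peekableReader" />
</bean>
<bean id="peekableReader" class="YourPeekableItemReader" />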
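The decision the custom termination context makes can be tried out without Spring Batch. The sketch below is a simulation, not the Spring Batch API: `PeekingReader` is a hypothetical in-memory analogue of a PeekableItemReader, and items are reduced to bare country codes. It assumes, like the SQL above, that the input is sorted by country; no lng_count is needed at all.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical minimal analogue of a PeekableItemReader over an in-memory iterator.
final class PeekingReader<T> {
    private final Iterator<T> source;
    private T peeked; // read ahead by peek(), handed out by the next read()

    PeekingReader(Iterator<T> source) {
        this.source = source;
    }

    // Next item, or null at end of input (the ItemReader convention).
    T read() {
        if (peeked != null) {
            T item = peeked;
            peeked = null;
            return item;
        }
        return source.hasNext() ? source.next() : null;
    }

    // The item the next read() will return, without consuming it; null at end of input.
    T peek() {
        if (peeked == null && source.hasNext()) {
            peeked = source.next();
        }
        return peeked;
    }
}

final class PeekChunkSketch {
    // Items are country codes; a chunk ends when the next item is for another country or input ends.
    static List<List<String>> chunk(List<String> countryCodes) {
        PeekingReader<String> reader = new PeekingReader<>(countryCodes.iterator());
        List<List<String>> chunks = new ArrayList<>();
        List<String> chunk = new ArrayList<>();
        String current;
        while ((current = reader.read()) != null) {
            chunk.add(current);
            String next = reader.peek();
            // Same decision as isComplete(): null-safe because peek() returns null after the last item.
            if (next == null || !next.equals(current)) {
                chunks.add(chunk);
                chunk = new ArrayList<>();
            }
        }
        return chunks;
    }

    public static void main(String[] args) {
        System.out.println(chunk(Arrays.asList("AD", "AD", "AD", "AE", "AE"))); // [[AD, AD, AD], [AE, AE]]
    }
}
```

Because the boundary comes from comparing the current item with the peeked one, a chunk can never span two countries, whatever the per-country counts are.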
Edit: Thinking back over your issue, partitioning strikes me as the cleanest approach. With a partitioned step, each ItemReader (make sure it has scope="step") is passed a single countryName from the step execution context. Yes, you'll need a custom Partitioner class to build up your map of execution contexts (one entry per country) and a hard-coded commit interval large enough to accommodate your largest unit of work, but after that everything is very boilerplate, and since each slave step will be only a single chunk, restart should be a relative breeze for any countries that hit issues.
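A framework-free sketch of what that custom Partitioner would build, plus the commit-interval calculation. In Spring Batch the Partitioner's `partition(int gridSize)` returns a `Map<String, ExecutionContext>`; plain maps stand in for ExecutionContext here, and `CountryPartitionSketch`, the `"countryCode"` key and the `"country-"` partition-name prefix are hypothetical. A step-scoped reader could then pick the value up via late binding such as `#{stepExecutionContext['countryCode']}` and add a `where country_code = ?` condition to the query.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Framework-free sketch of what a per-country Partitioner would build.
// Plain maps stand in for Spring Batch's ExecutionContext.
final class CountryPartitionSketch {
    // Hypothetical key the step-scoped reader would read from the step execution context.
    static final String COUNTRY_KEY = "countryCode";

    // One entry per country: the country list, not a grid size, determines the number of partitions.
    static Map<String, Map<String, Object>> partition(List<String> countryCodes) {
        Map<String, Map<String, Object>> partitions = new LinkedHashMap<>();
        for (String code : countryCodes) {
            Map<String, Object> context = new HashMap<>();
            context.put(COUNTRY_KEY, code);
            partitions.put("country-" + code, context); // partition names must be unique
        }
        return partitions;
    }

    // Fixed commit interval large enough that every country fits into a single chunk.
    static int commitInterval(Map<String, Integer> lngCountByCountry) {
        int max = 1;
        for (int count : lngCountByCountry.values()) {
            max = Math.max(max, count);
        }
        return max;
    }

    public static void main(String[] args) {
        System.out.println(partition(Arrays.asList("AD", "AE")));
        Map<String, Integer> counts = new HashMap<>();
        counts.put("AD", 3);
        counts.put("AE", 2);
        System.out.println(commitInterval(counts)); // 3
    }
}
```

The largest lng_count could come from a `select max(cnt)` over the same grouping, or simply be a generous constant; the only requirement is that no country exceeds it, otherwise a partition would commit in more than one chunk.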