Spring Batch custom completion policy for dynamic chunk size

FlasH from Ru · May 23, 2016 · Viewed 10.2k times

Context

We have a batch job that replicates localized country names (i.e. translations of country names into different languages) from an external DB to ours. The idea was to process all localized country names for a single country in one chunk (i.e. first chunk: all translations for Andorra, next chunk: all translations for U.A.E., etc.). We use JdbcCursorItemReader to read the external data, plus some Oracle analytic functions to provide the total number of translations available for each country, something like:

select country_code, language_code, localized_name, COUNT(1) OVER(PARTITION BY c_lng.country_code) as lng_count
from EXT_COUNTRY_LNG c_lng
order by c_lng.country_code, c_lng.language_code

Problem

So cutting this input into chunks looks simple: stop the chunk once you've read exactly the number of rows specified in lng_count, and start a new one with the next row read. In practice it turned out not to be so simple :(
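The cutting rule itself can be sketched in plain Java, outside Spring Batch, to show what the job is trying to achieve. This is only an illustration: `CountryRow` and `CountDrivenChunker` are hypothetical names standing in for one row of the query result and for the chunking logic, and the rows are assumed to arrive sorted by country code, as in the query.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for one row of the query result (not a Spring Batch type).
final class CountryRow {
    final String countryCode;
    final String languageCode;
    final int lngCount; // value of COUNT(1) OVER (PARTITION BY country_code)

    CountryRow(String countryCode, String languageCode, int lngCount) {
        this.countryCode = countryCode;
        this.languageCode = languageCode;
        this.lngCount = lngCount;
    }
}

final class CountDrivenChunker {
    // Cuts rows (already sorted by country) into chunks whose size is taken
    // from the lngCount of the first row of each chunk.
    static List<List<CountryRow>> chunk(List<CountryRow> rows) {
        List<List<CountryRow>> chunks = new ArrayList<>();
        List<CountryRow> current = new ArrayList<>();
        int target = 0;
        for (CountryRow row : rows) {
            if (current.isEmpty()) {
                target = row.lngCount; // first row of a chunk sets its size
            }
            current.add(row);
            if (current.size() >= target) {
                chunks.add(current); // read as many rows as lngCount announced
                current = new ArrayList<>();
            }
        }
        if (!current.isEmpty()) {
            chunks.add(current); // input ended early; keep the partial chunk
        }
        return chunks;
    }

    public static void main(String[] args) {
        List<CountryRow> rows = List.of(
                new CountryRow("AD", "ca", 2),
                new CountryRow("AD", "en", 2),
                new CountryRow("AE", "ar", 3),
                new CountryRow("AE", "en", 3),
                new CountryRow("AE", "fr", 3));
        for (List<CountryRow> c : chunk(rows)) {
            System.out.println(c.get(0).countryCode + " -> " + c.size() + " rows");
        }
    }
}
```

Running `main` should print `AD -> 2 rows` and `AE -> 3 rows`. The difficulty discussed below is not this rule, but getting Spring Batch's completion policy to see the row that carries `lngCount`.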

The first thing to try is a custom completion policy. The problem is that it has no access to the last item read by the ItemReader: you have to explicitly put it into a context in the reader and get it back in the policy. I don't like that because it requires additional reader modifications or extra reader listeners. Moreover, I don't like the same item being serialized/deserialized back and forth, and I don't feel that JobContext/StepContext is a good place for such data.

There's also RepeatContext which looks like a better place for such data, but I was not able to get to it easily...

So in the end we came up with a solution like this:

@Bean(name = "localizedCountryNamesStep")
@JobScope
public Step insertCountryStep(
        final StepBuilderFactory stepBuilderFactory,
        final MasterdataCountryNameReader countryNameReader,
        final MasterdataCountryNameProcessor countryNameProcessor,
        final MasterdataCountryNameWriter writer) {
    /* Use the same fixed-commit policy, but update its chunk size dynamically */
    final SimpleCompletionPolicy policy = new SimpleCompletionPolicy();
    return stepBuilderFactory.get("localizedCountryNamesStep")
            .<ExtCountryLng, LocalizedCountryName> chunk(policy)
            .reader(countryNameReader)
            .listener(new ItemReadListener<ExtCountryLng>() {

                @Override
                public void beforeRead() {
                    // do nothing
                }

                @Override
                public void afterRead(final ExtCountryLng item) {
                    /* Update the chunk size after every read: subsequent reads
                    inside the same country (= same chunk) change nothing, since lngCount is always the same there */
                    policy.setChunkSize(item.getLngCount());
                }

                @Override
                public void onReadError(final Exception ex) {
                    // do nothing
                }
            })
            .processor(countryNameProcessor)
            .writer(writer)
            .faultTolerant()
            .skip(RuntimeException.class)
            .skipLimit(Integer.MAX_VALUE) // Batch does not support unlimited skip
            .retryLimit(0) // this solution disables only retry, but not recover
            .build();
}

It works and requires minimal code changes, but it still looks a bit ugly to me. So I'm wondering: is there a more elegant way to get a dynamic chunk size in Spring Batch when all the required information is already available in the ItemReader?

Answer

Dean Clark · May 23, 2016

The easiest way would be to simply partition your step by country. That way each country would get its own step, and you would also be able to thread across countries for increased performance.

If it needs to be a single reader, you can wrap a delegate PeekableItemReader and extend SimpleCompletionPolicy to accomplish your goal.

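import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.PeekableItemReader;
import org.springframework.batch.repeat.RepeatContext;
import org.springframework.batch.repeat.policy.SimpleCompletionPolicy;

// Acts as both the step's reader and its completion policy, so the policy can see the last item read
public class CountryPeekingCompletionPolicyReader extends SimpleCompletionPolicy implements ItemReader<CountrySpecificItem> {

    private PeekableItemReader<? extends CountrySpecificItem> delegate;

    private CountrySpecificItem currentReadItem = null;

    // setter required for the "delegate" property in the bean definition
    public void setDelegate(final PeekableItemReader<? extends CountrySpecificItem> delegate) {
        this.delegate = delegate;
    }

    @Override
    public CountrySpecificItem read() throws Exception {
        currentReadItem = delegate.read();
        return currentReadItem;
    }

    @Override
    public RepeatContext start(final RepeatContext context) {
        return new ComparisonPolicyTerminationContext(context);
    }

    protected class ComparisonPolicyTerminationContext extends SimpleTerminationContext {

        public ComparisonPolicyTerminationContext(final RepeatContext context) {
            super(context);
        }

        @Override
        public boolean isComplete() {
            if (currentReadItem == null) {
                return true; // no current item (end of input): nothing more to add to this chunk
            }
            final CountrySpecificItem nextReadItem;
            try {
                nextReadItem = delegate.peek(); // peek() declares checked exceptions
            } catch (final Exception e) {
                throw new IllegalStateException("Could not peek at the next item", e);
            }
            // the chunk is complete when there is no next item or it belongs to another country
            return nextReadItem == null || !currentReadItem.isSameCountry(nextReadItem);
        }
    }
}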

Then in your context you would define:

<batch:tasklet>
    <batch:chunk chunk-completion-policy="countrySpecificCompletionPolicy" reader="countrySpecificCompletionPolicy" writer="someWriter" />
</batch:tasklet>

<bean id="countrySpecificCompletionPolicy" class="CountryPeekingCompletionPolicyReader">
     <property name="delegate" ref="peekableReader" />
</bean>


<bean id="peekableReader" class="YourPeekableItemReader" />
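The look-ahead rule used by the completion policy above can be tried out in isolation. The following is a minimal plain-Java sketch, not Spring Batch code: `LookAheadChunker` is a hypothetical name, the "peek" is simulated by indexing into a list, and the `countryOf` function plays the role of `isSameCountry`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

final class LookAheadChunker {
    // Groups sorted items into chunks, closing a chunk as soon as the NEXT
    // item belongs to a different country (or there is no next item).
    static <T> List<List<T>> chunk(List<T> items, Function<T, String> countryOf) {
        List<List<T>> chunks = new ArrayList<>();
        List<T> current = new ArrayList<>();
        for (int i = 0; i < items.size(); i++) {
            T item = items.get(i);
            current.add(item);
            // "peek": look at the next item without consuming it; null marks end of input
            T next = (i + 1 < items.size()) ? items.get(i + 1) : null;
            boolean sameCountry = next != null
                    && countryOf.apply(item).equals(countryOf.apply(next));
            if (!sameCountry) {
                chunks.add(current);
                current = new ArrayList<>();
            }
        }
        return chunks;
    }

    public static void main(String[] args) {
        // Items are "country:language" strings purely for illustration
        List<String> items = List.of("AD:ca", "AD:en", "AE:ar", "AE:en", "AE:fr");
        System.out.println(chunk(items, s -> s.substring(0, 2)));
    }
}
```

Unlike the count-based approach from the question, this needs no `lng_count` column at all: the chunk boundary is derived purely from comparing the current item with the next one.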

Edit: Thinking back over your issue, partitioning strikes me as the cleanest approach. Using a partitioned step, each ItemReader (make sure scope="step") will be passed a single countryName from the step execution context. Yes, you'll need a custom Partitioner class to build up your map of execution contexts (one entry per country) and a hard-coded commit interval large enough to accommodate your largest unit of work, but after that everything is very boilerplate, and since each slave step will only be a single chunk, restart should be a relative breeze for any countries that might hit issues.
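As a rough sketch of what such a partitioner would build: in a real job this logic would live in an implementation of Spring Batch's `Partitioner` interface, whose `partition(int gridSize)` method returns a `Map<String, ExecutionContext>`. To keep the example self-contained, a plain `Map` stands in for `ExecutionContext` here, and the class name, the `partition-` prefix and the `countryCode` key are all assumptions made for illustration.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class CountryPartitionSketch {
    // Builds one "execution context" per country. A plain Map is used here as a
    // stand-in for Spring Batch's ExecutionContext so the sketch runs on its own.
    static Map<String, Map<String, Object>> partition(List<String> countryCodes) {
        Map<String, Map<String, Object>> partitions = new LinkedHashMap<>();
        for (String code : countryCodes) {
            Map<String, Object> context = new LinkedHashMap<>();
            // hypothetical key; the step-scoped reader would use it to filter its query
            context.put("countryCode", code);
            partitions.put("partition-" + code, context);
        }
        return partitions;
    }

    public static void main(String[] args) {
        // In a real job the list of countries would come from the external DB
        System.out.println(partition(List.of("AD", "AE")));
    }
}
```

With a real `ExecutionContext`, a step-scoped reader could then pick the value up through late binding, for example `#{stepExecutionContext['countryCode']}`, and restrict its query to that single country.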