I wrote a Spring Batch job that processes a List of Lists.
The reader returns a List of Lists. The processor works on each list item and returns the processed List. The writer writes the results to the DB and to SFTP from the List of Lists.
I have a use case where I call an async REST API from the Spring Batch processor. On the ListenableFuture response I implemented a ListenableFutureCallback to handle success and failure, which works as expected. However, the ItemProcessor doesn't wait for the callbacks from the async API: it returns the object (List) to the writer before the async call has returned anything.
I am not sure how to implement and handle async calls from ItemProcessor.
I did read about AsyncItemProcessor and AsyncItemWriter, but I am not sure if that is something I should use in this scenario.
I also thought of calling get() on the ListenableFuture response from AsyncRestTemplate, but as per the documentation it will block the current thread until it receives the response.
I am looking for some help on how I should implement this. Code snippet below:
Processor:
public class MailDocumentProcessor implements ItemProcessor<List<MailingDocsEntity>, List<MailingDocsEntity>> {
    ... Initialization code
    @Override
    public List<MailingDocsEntity> process(List<MailingDocsEntity> documentsList) throws Exception {
        logger.info("Entering MailingDocsEntity processor");
        List<MailingDocsEntity> synchronizedList = Collections.synchronizedList(documentsList);
        for (MailingDocsEntity mailingDocsEntity : synchronizedList) {
            System.out.println("Reading Mailing id: " + mailingDocsEntity.getMailingId());
            ..code to get the file
            //If the file is not a pdf convert it
            String fileExtension = readFromSpResponse.getFileExtension();
            String fileName = readFromSpResponse.getFileName();
            byte[] fileBytes = readFromSpResponse.getByteArray();
            try {
                //Do checks to make sure PDF file is being sent
                if (!"pdf".equalsIgnoreCase(fileExtension)) {
                    //Only doc, docx and xlsx conversions are supported
                    ...Building REquest object
                        //make async call to pdf conversion service
                        pdfService.convertDocxToPdf(request, mailingDocsEntity);
                    } else {
                        logger.error("The file cannot be converted to a pdf.\n"
                        );
                    }
                }
            } catch (Exception ex){
                logger.error("There has been an exception while processing data", ex);
            }
        }
        return synchronizedList;
    }
}
Async PdfConversion Service Class:
@Service
public class PdfService{
    @Autowired
    @Qualifier("MicroServiceAsyncRestTemplate")
    AsyncRestTemplate microServiceAsyncRestTemplate;
    public ConvertDocxToPdfResponse convertDocxToPdf(ConvertDocxToPdfRequest request, MailingDocsEntity mailingDocsEntity){
        ConvertDocxToPdfResponse pdfResponse = new ConvertDocxToPdfResponse();
        try {
            HttpHeaders headers = new HttpHeaders();
            headers.setContentType(MediaType.APPLICATION_JSON);
            HttpEntity<?> entity = new HttpEntity<>(request, headers);
            ListenableFuture<ResponseEntity<ConvertDocxToPdfResponse>> microServiceResponse = microServiceAsyncRestTemplate.postForEntity(batchMailProcessingConfiguration.getPdfUrl(), entity, ConvertDocxToPdfResponse.class);
            ConvertDocxToPdfResponse resultBody = microServiceResponse.get().getBody();
            microServiceResponse.addCallback(new ListenableFutureCallback<ResponseEntity<ConvertDocxToPdfResponse>>() {
                @Override
                public void onSuccess(ResponseEntity<ConvertDocxToPdfResponse> result) {
                    ...code to do stuff on success
                }
                @Override
                public void onFailure(Throwable ex) {
                    pdfResponse.setMessage("Exception while retrieving response");
                }
            });
        } catch (Exception e) {
            String message = "There has been an error while issuing a pdf generate request to the pdf micro service";
            pdfResponse.setMessage(message);
            logger.error(message, e);
        }
        return pdfResponse;
    }
}
My original batch job was synchronous; I am converting it to async for faster processing. I did try to look for similar questions, but could not find enough information. Any pointers or help would be highly appreciated.
Thank you!!
I did read about AsyncItemProcessor and AsyncItemWriter, but I am not sure if that is something I should use in this scenario.
Yes, AsyncItemProcessor and AsyncItemWriter are suitable for your use case. The AsyncItemProcessor will execute the logic (your REST call) of the delegate ItemProcessor for an item on a new thread. Once the item completes, the Future of the result is passed to the AsyncItemWriter to be written. The AsyncItemWriter will then unwrap the Future and write the item. The advantage of these components is that you don't have to deal with wrapping and unwrapping Futures yourself.
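To make the wrap/unwrap idea concrete, here is a minimal sketch of the same pattern using only java.util.concurrent, not the Spring Batch classes themselves. All names in it (AsyncChunkSketch, convertToPdf, processAsync, unwrap, runChunk) are hypothetical, and the "conversion" is just a file name rewrite standing in for your REST call:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

public class AsyncChunkSketch {

    // Stand-in for the delegate processor: ordinary blocking logic, one item in, one item out.
    // (Hypothetical: a real delegate would call the PDF conversion service here.)
    public static String convertToPdf(String fileName) {
        int dot = fileName.lastIndexOf('.');
        String base = dot < 0 ? fileName : fileName.substring(0, dot);
        return base + ".pdf";
    }

    // Role played by the async processor: run the delegate on another thread
    // and return a Future right away instead of the processed item.
    public static <I, O> Future<O> processAsync(ExecutorService executor,
                                                Function<I, O> delegate, I item) {
        Callable<O> task = () -> delegate.apply(item);
        return executor.submit(task);
    }

    // Role played by the async writer: unwrap each Future (get() blocks until
    // that item is done) and collect the plain items for the real writer.
    public static <O> List<O> unwrap(List<Future<O>> futures) throws Exception {
        List<O> items = new ArrayList<>();
        for (Future<O> future : futures) {
            items.add(future.get());
        }
        return items;
    }

    // Processes one chunk: all items are submitted first, then unwrapped in order.
    public static List<String> runChunk(List<String> chunk) throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        try {
            List<Future<String>> futures = new ArrayList<>();
            for (String item : chunk) {
                futures.add(processAsync(executor, AsyncChunkSketch::convertToPdf, item));
            }
            return unwrap(futures);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        // prints [letter.pdf, report.pdf]
        System.out.println(runChunk(Arrays.asList("letter.docx", "report.xlsx")));
    }
}
```

In Spring Batch the equivalent wiring is done by setting a delegate and a TaskExecutor on the AsyncItemProcessor, setting a delegate on the AsyncItemWriter, and declaring the step's chunk output type as a Future of your item type. With that arrangement the delegate processor itself can stay a plain blocking call, since it already runs off the main step thread.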
Hope this helps.