I am new to ReactiveX for Java and I've the following code block that make external http call but it is not async. We are using rxjava 1.2, and Java 1.8
private ResponseEntity<String> callExternalUrl(String url, String json, HttpMethod method) {
RestTemplate restTemplate;
HttpEntity request;
request = new HttpEntity(jsonContent, httpHeaders);
return restTemplate.exchange(url, httpMethod, request, String.class);
}
I've the following code block I found online but I couldn't totally understand it and how I can apply it to my code base.
private RxClient<RxObservableInvoker> httpClient;
public <T> Observable<T> fetchResult(String url, Func1<Response, T> mapper) {
return httpClient.target(url)
.request()
.rx()
.get()
.subscribeOn(Schedulers.io())
.map(mapper);
}
If I understand you correctly, you need something like this to wrap your existing callExternalUrl
static Observable<String> callExternalUrlAsync(String url, String json, HttpMethod method)
{
return Observable.fromCallable(() -> callExternalUrl(url, json, method))
.subscribeOn(Schedulers.io())
.flatMap(re -> {
if (re.hasBody())
return Observable.just(re.getBody());
else
return Observable.error(new RuntimeException("Bad response status " + re.getStatusCode()));
},
e -> Observable.error(e),
(Func0<Observable<? extends String>>) (() -> Observable.empty())) // I need explicit cast or it won't compile :-(
.observeOn(Schedulers.computation());
}
Short description of the code:
callExternalUrl
on the Schedulers.io
ResponseEntity<T>
into successful T
and error cases. It happens on the io
scheduler as well but it is not important as it is really short. (If there was an exception inside callExternalUrl
, it is passed as is.)Schedulers.computation
Caveats:
subscribeOn
and observeOn
flatMap
to distinguish between success and error and definitely you want some more specific exception type.Higher-order magic
If you are willing to use higher-order functions and trade a little bit of performance for less code duplication you can do something like this:
// Universal wrapper method
static <T> Observable<T> wrapCallExternalAsAsync(Func3<String, String, HttpMethod, ResponseEntity<T>> externalCall, String url, String json, HttpMethod method)
{
return Observable.fromCallable(() -> externalCall.call(url, json, method))
.subscribeOn(Schedulers.io())
.flatMap(re -> {
if (re.hasBody())
return Observable.just(re.getBody());
else
return Observable.error(new RuntimeException("Bad response status " + re.getStatusCode()));
},
e -> Observable.error(e),
(Func0<Observable<? extends T>>) (() -> Observable.empty())) // I need explicit cast or it won't compile :-(
.observeOn(Schedulers.computation());
}
static Observable<String> callExternalUrlAsync_HigherOrder(String url, String json, HttpMethod method)
{
return wrapCallExternalAsAsync(MyClass::callExternalUrl, url, json, method);
}
Where MyClass
is wherever your callExternalUrl
is.
Update (Async calls only)
private static RxClient httpClient = Rx.newClient(RxObservableInvoker.class); // here you might pass custom ExecutorService
private <T> Observable<String> executeHttpAsync(String url, String httpMethod, Entity<T> entity) {
return httpClient.target(url)
.request()
.headers(httpHeaders) // assuming httpHeaders is something global as in your example
.rx()
.method(httpMethod, entity)
.map(resp -> {
if (200 != resp.getStatus()) {
throw new RuntimeException("Bad status code " + resp.getStatus());
} else {
if (!resp.hasEntity()) {
// return null; // or error?
throw new RuntimeException("Empty response"); // or empty?
} else {
try {
return resp.readEntity(String.class);
} catch (Exception ex) {
throw new RuntimeException(ex); // wrap exception into unchecked
}
}
}
})
.observeOn(Schedulers.computation());
}
private Observable<String> executeGetAsync(String url) {
return executeHttpAsync(url, "GET", null);
}
private Observable<String> executePostAsync(String url, String json) {
return executeHttpAsync(url, "POST", Entity.json(json));
}
Again similar caveats apply:
newClient
call and observeOn
Also it is not clear from your example how exactly the body of the request (HttpEntity
) is build and whether you actually always want String
as a response as it is in your original example. Still I just replicated your logic as is. If you need something more you probably should refer to the documentation at https://jersey.java.net/documentation/2.25/media.html#json