Make Http call using ReactiveX for Java

WowBow picture WowBow · Feb 25, 2017 · Viewed 7.1k times · Source

I am new to ReactiveX for Java and I've the following code block that make external http call but it is not async. We are using rxjava 1.2, and Java 1.8

  private ResponseEntity<String> callExternalUrl(String url, String json, HttpMethod method) {

    RestTemplate restTemplate;
    HttpEntity request;

      request = new HttpEntity(jsonContent, httpHeaders);

    return restTemplate.exchange(url, httpMethod, request, String.class);

  }

I've the following code block I found online but I couldn't totally understand it and how I can apply it to my code base.

private RxClient<RxObservableInvoker> httpClient;
public <T> Observable<T> fetchResult(String url, Func1<Response, T> mapper) {

    return httpClient.target(url)
        .request()
        .rx()
        .get()
        .subscribeOn(Schedulers.io())
        .map(mapper);
  }

Answer

SergGr picture SergGr · Mar 7, 2017

If I understand you correctly, you need something like this to wrap your existing callExternalUrl

static Observable<String> callExternalUrlAsync(String url, String json, HttpMethod method)
{
    return Observable.fromCallable(() -> callExternalUrl(url, json, method))
            .subscribeOn(Schedulers.io())
            .flatMap(re -> {
                         if (re.hasBody())
                             return Observable.just(re.getBody());
                         else
                             return Observable.error(new RuntimeException("Bad response status " + re.getStatusCode()));
                     },
                     e -> Observable.error(e),
                     (Func0<Observable<? extends String>>) (() -> Observable.empty())) // I need explicit cast or it won't compile :-(
            .observeOn(Schedulers.computation());
}

Short description of the code:

  1. It schedules execution of the existing callExternalUrl on the Schedulers.io
  2. Does minimal transformation of ResponseEntity<T> into successful T and error cases. It happens on the io scheduler as well but it is not important as it is really short. (If there was an exception inside callExternalUrl, it is passed as is.)
  3. Makes subscriber to the result to be executed on Schedulers.computation

Caveats:

  1. You probably want to use your custom schedulers for both subscribeOn and observeOn
  2. You probably want to have some better logic in the first lambda passed to flatMap to distinguish between success and error and definitely you want some more specific exception type.

Higher-order magic

If you are willing to use higher-order functions and trade a little bit of performance for less code duplication you can do something like this:

// Universal wrapper method
static <T> Observable<T> wrapCallExternalAsAsync(Func3<String, String, HttpMethod, ResponseEntity<T>> externalCall, String url, String json, HttpMethod method)
{
    return Observable.fromCallable(() -> externalCall.call(url, json, method))
            .subscribeOn(Schedulers.io())
            .flatMap(re -> {
                         if (re.hasBody())
                             return Observable.just(re.getBody());
                         else
                             return Observable.error(new RuntimeException("Bad response status " + re.getStatusCode()));
                     },
                     e -> Observable.error(e),
                     (Func0<Observable<? extends T>>) (() -> Observable.empty())) // I need explicit cast or it won't compile :-(
            .observeOn(Schedulers.computation());
}

static Observable<String> callExternalUrlAsync_HigherOrder(String url, String json, HttpMethod method)
{
    return wrapCallExternalAsAsync(MyClass::callExternalUrl, url, json, method);
}

Where MyClass is wherever your callExternalUrl is.


Update (Async calls only)

private static RxClient httpClient = Rx.newClient(RxObservableInvoker.class); // here you might pass custom ExecutorService

private <T> Observable<String> executeHttpAsync(String url, String httpMethod, Entity<T> entity) {
    return httpClient.target(url)
            .request()
            .headers(httpHeaders) // assuming httpHeaders is something global as in your example
            .rx()
            .method(httpMethod, entity)
            .map(resp -> {
                if (200 != resp.getStatus()) {
                    throw new RuntimeException("Bad status code " + resp.getStatus());
                } else {
                    if (!resp.hasEntity()) {
                        // return null; // or error?
                        throw new RuntimeException("Empty response"); // or empty?
                    } else {
                        try {
                            return resp.readEntity(String.class);
                        } catch (Exception ex) {
                            throw new RuntimeException(ex); // wrap exception into unchecked
                        }
                    }
                }
            })
            .observeOn(Schedulers.computation());
}

private Observable<String> executeGetAsync(String url) {
    return executeHttpAsync(url, "GET", null);
}

private Observable<String> executePostAsync(String url, String json) {
    return executeHttpAsync(url, "POST", Entity.json(json));
}

Again similar caveats apply:

  1. You probably want to use your custom schedulers for both newClient call and observeOn
  2. You probably want to have some better logic for error handling than just checking whether it is HTTP 200 or not and definitely you want some more specific exception type. But this is all business-logic specific so it is up to you.

Also it is not clear from your example how exactly the body of the request (HttpEntity) is build and whether you actually always want String as a response as it is in your original example. Still I just replicated your logic as is. If you need something more you probably should refer to the documentation at https://jersey.java.net/documentation/2.25/media.html#json