Rahul Somasunderam

Programmer, Cyclist, Trivia Junkie.


06 September 2016

In this post we’re going to look at what you can do to use a CXF client in a non-blocking manner inside of Ratpack.

By default CXF offers a blocking client called the HTTP Transport. If you were to use CXF that way, you could wrap CXF calls inside a Blocking promise. While that does the job, it does limit your application in some way.

Since 3.0.0, CXF also offers rt-transports-http-netty-client. This uses Netty, and can be used in a non-blocking manner.

I’m going to take ihe-iti as an example of a SOAP Library. Up until version 0.3, it only supported the synchronous APIs. So if you took a wsdl like PIXConsumer, this is what your generated PortType would look like

PIXConsumerPortType.java
@WebService(name = "PIXConsumer_PortType", targetNamespace = "urn:ihe:iti:pixv3:2007")
@SOAPBinding(parameterStyle = ParameterStyle.BARE)
@XmlSeeAlso({ObjectFactory.class})
public interface PIXConsumerPortType {
    @WebMethod(operationName = "PIXConsumer_PRPA_IN201302UV02",
            action = "urn:hl7-org:v3:PRPA_IN201302UV02")
    @WebResult(name = "MCCI_IN000002UV01", targetNamespace = "urn:hl7-org:v3",
            partName = "Body")
    MCCIIN000002UV01 pixConsumerPRPAIN201302UV02(
            @WebParam(name = "PRPA_IN201302UV02", targetNamespace = "urn:hl7-org:v3",
                    partName = "Body") PRPAIN201302UV02 var1);
}

If you told wsdl2java to generated the async APIs as well, you’ll get something like this

PIXConsumerPortType.java
@WebService(name = "PIXConsumer_PortType", targetNamespace = "urn:ihe:iti:pixv3:2007")
@SOAPBinding(parameterStyle = ParameterStyle.BARE)
@XmlSeeAlso({ObjectFactory.class})
public interface PIXConsumerPortType {
    @WebMethod(operationName = "PIXConsumer_PRPA_IN201302UV02",
            action = "urn:hl7-org:v3:PRPA_IN201302UV02")
    Response<MCCIIN000002UV01> pixConsumerPRPAIN201302UV02Async(
            @WebParam(name = "PRPA_IN201302UV02", targetNamespace = "urn:hl7-org:v3",
                    partName = "Body") PRPAIN201302UV02 var1);

    @WebMethod(operationName = "PIXConsumer_PRPA_IN201302UV02",
            action = "urn:hl7-org:v3:PRPA_IN201302UV02")
    Future<?> pixConsumerPRPAIN201302UV02Async(
            @WebParam(name = "PRPA_IN201302UV02", targetNamespace = "urn:hl7-org:v3",
                    partName = "Body") PRPAIN201302UV02 var1,
            @WebParam(name = "PIXConsumer_PRPA_IN201302UV02Response", targetNamespace = "",
                    partName = "asyncHandler") AsyncHandler<MCCIIN000002UV01> var2);

    @WebMethod(operationName = "PIXConsumer_PRPA_IN201302UV02",
            action = "urn:hl7-org:v3:PRPA_IN201302UV02")
    @WebResult(name = "MCCI_IN000002UV01", targetNamespace = "urn:hl7-org:v3", partName = "Body")
    MCCIIN000002UV01 pixConsumerPRPAIN201302UV02(
            @WebParam(name = "PRPA_IN201302UV02", targetNamespace = "urn:hl7-org:v3",
                    partName = "Body") PRPAIN201302UV02 var1);
}

It is tempting to use RxJava and wrap this method into an Observable.

Response<MCCIIN000002UV01> pixConsumerPRPAIN201302UV02Async(PRPAIN201302UV02 var1);

After all, RxJava has a method with the signature

public static <T> Observable<T> from(Future<? extends T> future)

And javax.xml.ws.Response<T> extends java.util.concurrent.Future<T>

Unfortunately, that will not work as intended. You will be able to call your webservice, and get data, but all your operations will be limited by the number of threads.

To get it to work correctly, you’ll have to use the other asynchronous method, i.e.

Future<?> pixConsumerPRPAIN201302UV02Async(
        PRPAIN201302UV02 var1, AsyncHandler<MCCIIN000002UV01> var2);

You’ll have to turn the response into a Ratpack Promise first, before turning it into an Observable.

This is a how you turn the response into a Promise

import groovy.transform.TupleConstructor
import ratpack.exec.Downstream

import javax.xml.ws.AsyncHandler
import javax.xml.ws.Response

/**
 * Bridges SOAP's AsyncHandler to Ratpack's Promise.
 */
@SuppressWarnings('CatchThrowable')
@TupleConstructor
class PromiseAsyncHandlerBridge<T> implements AsyncHandler<T> {
  Downstream<T> downstream

  @Override
  void handleResponse(Response<T> res) {
    try {
      downstream.success(res.get())
    } catch (Throwable t) {
      downstream.error(t)
    }
  }

}

Then to call your webservice and get an observable, you can do this

Observable<PRPAIN201310UV02> pixResponse =
    observe(async { Downstream down ->
        servicePort.pixConsumerPRPAIN201302UV02Async(
                request, new PromiseAsyncHandlerBridge<MCCIIN000002UV01>(down)
        )
    })