Request-Callback - 6.3

Talend ESB Service Developer Guide

When a consumer sends a request to a provider, the consumer normally waits for the response before it continues with its work. When the response returns immediately, a request-response message exchange pattern is sufficient. In some scenarios, however, request processing can be delayed by a few hours or even days. In such cases, it is useful for the consumer to be able to continue its work and handle the response later. The Request-Callback message exchange pattern supports exactly this scenario: the consumer continues its processing without interruption and is notified when the response arrives at a later point in time.

WSDL for Request-Callback Message Exchange

The Request-Callback message exchange is described in WSDL as two separate one-way exchanges.

The WSDL starts with a "types" section followed by the message definitions. It also contains two "portType" definitions: one for the request from the consumer to the provider (Library) and one for the response from the provider (LibraryConsumer). The second portType, used for callbacks, contains two operations: one for a regular callback response and one for a callback fault response. Both operations in LibraryConsumer name the matching Library portType operation in a "partnerOperation" attribute.

The WSDL used in this message exchange pattern additionally has a "partnerLinkType" definition, which declares the first portType (Library) as the "service" role and the second (LibraryConsumer) as the "callback" role.

In the following WSDL, the request operation is called seekBookInBasement and the callback operation is called seekBookInBasementResponse:

Example of WSDL with request-callback exchange

<?xml version="1.0" encoding="UTF-8"?>
<definitions
  targetNamespace="http://services.talend.org/demos/Library/1.0"
  xmlns="http://schemas.xmlsoap.org/wsdl/"
  xmlns:err="http://types.talend.org/demos/GeneralObjects/ErrorHandling/1.0"
  xmlns:jms="http://schemas.xmlsoap.org/wsdl/jms/"
  xmlns:soap="http://schemas.xmlsoap.org/wsdl/soap/"
  xmlns:svn="http://types.talend.org/demos/Library/Common/1.0"
  xmlns:tns="http://services.talend.org/demos/Library/1.0"
  xmlns:tmep="http://types.talend.com/mep/1.0"
  xmlns:sdx="http://types.talend.org/service/ServiceDescription/2.0"
  xmlns:xsd="http://www.w3.org/2001/XMLSchema"
  xmlns:xs="http://www.w3.org/2001/XMLSchema">
    <types>
        <!-- Refer to the Library example for the complete WSDL -->
        ......
        ......
    </types>
    <message name="seekBookRequest">
        <part element="svn:SearchFor" name="body" />
    </message>
    <message name="seekBookInBasementRequest">
        <part element="svn:SearchInBasementFor" name="body" />
    </message>
    <message name="seekBookResponse">
        <part element="svn:ListOfBooks" name="body" />
    </message>
    <message name="seekBookError">
        <part element="err:Exceptions" name="Exception" />
    </message>
     
 
    <portType name="Library">
        <operation name="seekBook">
            <input message="tns:seekBookRequest" />
            <output message="tns:seekBookResponse" />
            <fault message="tns:seekBookError" name="error" />
        </operation>
         
        <operation name="seekBookInBasement">
            <input message="tns:seekBookInBasementRequest" />
        </operation>
    </portType>
 
    <portType name="LibraryConsumer">
        <operation name="seekBookInBasementResponse"
            sdx:partnerOperation="seekBookInBasement">
            <input message="tns:seekBookResponse" />
        </operation>
        <operation name="seekBookInBasementFault"
            sdx:faultOperation="true" sdx:partnerOperation="seekBookInBasement">
            <input message="tns:seekBookError" />
        </operation>
    </portType>
 
    <plnk:partnerLinkType name="CallbackPartnerLink"
        xmlns:plnk="http://schemas.xmlsoap.org/ws/2003/05/partner-link/">
        <plnk:role name="service">
            <plnk:portType name="tns:Library" />
        </plnk:role>
        <plnk:role name="callback">
            <plnk:portType name="tns:LibraryConsumer" />
        </plnk:role>
    </plnk:partnerLinkType>
            
    <service name="LibraryProvider">
        <port binding="tns:LibraryJmsSoap" name="LibraryJmsPort">
            <soap:address
                location="jms:jndi:dynamicQueues/library.queue?jndiInitialContextFactory=org.apache.activemq.jndi.ActiveMQInitialContextFactory&amp;jndiConnectionFactoryName=ConnectionFactory&amp;jndiURL=tcp://localhost:61616" />
        </port>
    </service>
 
    <binding name="LibraryJmsSoap" type="tns:Library">
        <soap:binding style="document" 
                transport="http://www.w3.org/2010/soapjms/" />
        <operation name="seekBook">
            <soap:operation soapAction="seekBook" />
            <input>
                <soap:body use="literal" />
            </input>
            <output>
                <soap:body use="literal" />
            </output>
            <fault name="error">
                <soap:fault name="error" use="literal" />
            </fault>
        </operation>
     
        <operation name="seekBookInBasement">
            <soap:operation soapAction="seekBookInBasement" />
            <input>
                <soap:body use="literal" />
            </input>
        </operation>
    </binding>
 
    <service name="LibraryConsumer">
        <port binding="tns:LibraryConsumerJmsSoap" name="LibraryConsumerPort">
            <soap:address
                location="jms:jndi:dynamicQueues/libraryconsumer.queue?jndiInitialContextFactory=org.apache.activemq.jndi.ActiveMQInitialContextFactory&amp;jndiConnectionFactoryName=ConnectionFactory&amp;jndiURL=tcp://localhost:61616" />
        </port>
    </service>
 
    <binding name="LibraryConsumerJmsSoap" type="tns:LibraryConsumer">
        <soap:binding style="document" 
                transport="http://www.w3.org/2010/soapjms/" />
        <operation name="seekBookInBasementResponse">
            <soap:operation soapAction="seekBookInBasementResponse" />
            <input>
                <soap:body use="literal" />
            </input>
        </operation>
        <operation name="seekBookInBasementFault">
            <soap:operation soapAction="seekBookInBasementFault" />
            <input>
                <soap:body use="literal" />
            </input>
        </operation>
    </binding>
 
</definitions>

The LibraryConsumer service waits for callback responses from the service provider (listening to the ActiveMQ queue). The operations involved in the message exchange are: seekBookInBasement, seekBookInBasementResponse and seekBookInBasementFault.
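The link between the callback operations and the request operation they answer can be read directly from the sdx:partnerOperation attributes. The following is a minimal sketch of that lookup using only the JDK's DOM parser. The class PartnerOperationLister and the trimmed SAMPLE_WSDL string are hypothetical and exist only for illustration; they are not part of Talend ESB or CXF.

```java
import java.io.StringReader;
import java.util.LinkedHashMap;
import java.util.Map;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;
import org.xml.sax.InputSource;

/** Hypothetical helper for illustration; not part of Talend ESB or CXF. */
final class PartnerOperationLister {

    static final String WSDL_NS = "http://schemas.xmlsoap.org/wsdl/";
    static final String SDX_NS = "http://types.talend.org/service/ServiceDescription/2.0";

    // Trimmed copy of the LibraryConsumer portType from the WSDL above.
    static final String SAMPLE_WSDL =
        "<definitions xmlns='" + WSDL_NS + "' xmlns:sdx='" + SDX_NS + "'>"
        + "<portType name='LibraryConsumer'>"
        + "<operation name='seekBookInBasementResponse'"
        + " sdx:partnerOperation='seekBookInBasement'/>"
        + "<operation name='seekBookInBasementFault' sdx:faultOperation='true'"
        + " sdx:partnerOperation='seekBookInBasement'/>"
        + "</portType></definitions>";

    /** Maps each callback operation name to the request operation it answers. */
    static Map<String, String> partnerOperations(String wsdl) {
        try {
            DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
            factory.setNamespaceAware(true); // required for the *NS lookups below
            Document doc = factory.newDocumentBuilder()
                .parse(new InputSource(new StringReader(wsdl)));
            Map<String, String> result = new LinkedHashMap<>();
            NodeList operations = doc.getElementsByTagNameNS(WSDL_NS, "operation");
            for (int i = 0; i < operations.getLength(); i++) {
                Element operation = (Element) operations.item(i);
                // getAttributeNS returns "" when the attribute is absent,
                // which filters out ordinary (non-callback) operations.
                String partner = operation.getAttributeNS(SDX_NS, "partnerOperation");
                if (!partner.isEmpty()) {
                    result.put(operation.getAttribute("name"), partner);
                }
            }
            return result;
        } catch (Exception e) {
            throw new IllegalStateException("Could not parse WSDL", e);
        }
    }

    public static void main(String[] args) {
        partnerOperations(SAMPLE_WSDL).forEach((callback, request) ->
            System.out.println(callback + " answers " + request));
    }
}
```

Run against the full WSDL, the same lookup would skip seekBook and seekBookInBasement themselves, because neither carries the partnerOperation attribute.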

Relation between Request and Callback Messages

A callback response is correlated with its request by a special callId header that is added to both the request and the callback message. For example, take this request message:

Request message

<soap:Envelope xmlns:soap="http://schemas.xmlsoap.org/soap/envelope/">
   <soap:Header>
      <callId xmlns="http://www.talend.com/esb/requestcallback">urn:uuid:8b64f8f3-d72b-4a6f-a206-8a3309920998</callId>
      <Action xmlns="http://www.w3.org/2005/08/addressing">http://cxf.apache.org/jaxws/dispatch/Library/InvokeOneWayRequest</Action>
      <MessageID xmlns="http://www.w3.org/2005/08/addressing">urn:uuid:1d1dce72-66b3-4c47-aebe-dd462b1e5d32</MessageID>
      <To xmlns="http://www.w3.org/2005/08/addressing">http://localhost:9090/tesb-library-tutorial/services/LibraryProvider</To>
      <ReplyTo xmlns="http://www.w3.org/2005/08/addressing">
         <Address>http://consumer.example.com:7777/soap/LibraryConsumer</Address>
      </ReplyTo>
      <RelatesTo xmlns="http://www.w3.org/2005/08/addressing" 
                 RelationshipType="message"/>
   </soap:Header>
   <soap:Body>
       <!-- some data here -->
   </soap:Body>
</soap:Envelope>

The response for this message may look like the following:

Corresponding callback response

<soap:Envelope xmlns:soap="http://schemas.xmlsoap.org/soap/envelope/">
   <soap:Header>
      <callId xmlns="http://www.talend.com/esb/requestcallback">urn:uuid:8b64f8f3-d72b-4a6f-a206-8a3309920998</callId>
      <callbackId xmlns="http://www.talend.com/esb/requestcallback">urn:uuid:bb7176ae-e6e2-44cf-91c6-7b1238c3d6f5</callbackId>
      <Action xmlns="http://www.w3.org/2005/08/addressing">seekBookInBasementResponse</Action>
      <MessageID xmlns="http://www.w3.org/2005/08/addressing">urn:uuid:511793d8-74a6-4b7c-a1bd-e852a2795fed</MessageID>
      <To xmlns="http://www.w3.org/2005/08/addressing">http://127.0.0.1:6666/soap/LibraryConsumer</To>
      <ReplyTo xmlns="http://www.w3.org/2005/08/addressing">
         <Address>http://www.w3.org/2005/08/addressing/none</Address>
      </ReplyTo>
      <RelatesTo xmlns="http://www.w3.org/2005/08/addressing" RelationshipType="message">urn:uuid:1d1dce72-66b3-4c47-aebe-dd462b1e5d32</RelatesTo>
   </soap:Header>
   <soap:Body>
     <!-- some data here -->
   </soap:Body>
</soap:Envelope>
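To make the correlation concrete, the sketch below reads the callId header from two envelopes and checks that they belong to the same call. It uses only the JDK's DOM parser. The class CallIdMatcher and the shortened sample envelopes are hypothetical and for illustration only; in a real application the call ID would normally be obtained from the call context rather than by parsing the message by hand.

```java
import java.io.StringReader;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.w3c.dom.NodeList;
import org.xml.sax.InputSource;

/** Hypothetical helper for illustration; not part of the Talend ESB API. */
final class CallIdMatcher {

    static final String RC_NS = "http://www.talend.com/esb/requestcallback";

    // Shortened versions of the request and callback envelopes shown above.
    static final String SAMPLE_REQUEST = envelope(
        "<callId xmlns='" + RC_NS + "'>urn:uuid:8b64f8f3-d72b-4a6f-a206-8a3309920998</callId>");
    static final String SAMPLE_CALLBACK = envelope(
        "<callId xmlns='" + RC_NS + "'>urn:uuid:8b64f8f3-d72b-4a6f-a206-8a3309920998</callId>"
        + "<callbackId xmlns='" + RC_NS
        + "'>urn:uuid:bb7176ae-e6e2-44cf-91c6-7b1238c3d6f5</callbackId>");

    /** Wraps the given header XML in a minimal SOAP 1.1 envelope. */
    static String envelope(String headers) {
        return "<soap:Envelope xmlns:soap='http://schemas.xmlsoap.org/soap/envelope/'>"
            + "<soap:Header>" + headers + "</soap:Header><soap:Body/></soap:Envelope>";
    }

    /** Returns the text of the first request-callback element with this name, or null. */
    static String headerValue(String soapEnvelope, String localName) {
        try {
            DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
            factory.setNamespaceAware(true);
            Document doc = factory.newDocumentBuilder()
                .parse(new InputSource(new StringReader(soapEnvelope)));
            // Simplification: searches the whole document, not only soap:Header.
            NodeList matches = doc.getElementsByTagNameNS(RC_NS, localName);
            return matches.getLength() == 0 ? null : matches.item(0).getTextContent().trim();
        } catch (Exception e) {
            throw new IllegalArgumentException("Not a parseable SOAP envelope", e);
        }
    }

    /** True when both messages carry the same non-null callId. */
    static boolean sameCall(String request, String callback) {
        String requestCallId = headerValue(request, "callId");
        return requestCallId != null && requestCallId.equals(headerValue(callback, "callId"));
    }

    public static void main(String[] args) {
        System.out.println("Call ID: " + headerValue(SAMPLE_REQUEST, "callId"));
        System.out.println("Same call: " + sameCall(SAMPLE_REQUEST, SAMPLE_CALLBACK));
    }
}
```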

Defining Participants

Participants for the request-callback message exchange can be defined in two ways: using the CXF API or using Spring-based configuration. The methods of org.talend.esb.mep.requestcallback.feature.CallContext can be used for the participant setup.

Updating the Service Provider

The Provider side setup includes creating a Provider service (that receives the request from the consumer) and a callback client that will send the callback response to the consumer. The following is an example of a provider side service Spring configuration:

Provider side service Spring configuration

<jaxws:endpoint xmlns:library="http://services.talend.org/demos/Library/1.0"
    id="LibraryProviderJMS"
    address="jms:jndi:dynamicQueues/library.queue?jndiInitialContextFactory=org.apache.activemq.jndi.ActiveMQInitialContextFactory&amp;jndiConnectionFactoryName=ConnectionFactory&amp;jndiURL=tcp://localhost:61616"
    serviceName="library:LibraryProvider" endpointName="library:LibraryJmsPort"
    implementor="#libraryServerImpl">
   <jaxws:features>
      <bean class="org.talend.esb.mep.requestcallback.feature.RequestCallbackFeature"/>
      <bean class="org.apache.cxf.feature.LoggingFeature"/>
   </jaxws:features>
</jaxws:endpoint>

Warning

The RequestCallbackFeature must be added to the service to enable request-callback functionality.

Service Provider Implementation

The service provider can be implemented using JAX-WS. In that case, the provider side method in the annotated interface may look like this:

Library JAX-WS annotated interface

@WebService(targetNamespace = "http://services.talend.org/demos/Library/1.0", 
            name = "Library")
@XmlSeeAlso({org.talend.types.demos.library.common._1.ObjectFactory.class,
org.talend.types.demos.generalobjects.errorhandling._1.ObjectFactory.class})
public interface Library {
   ...
    @SOAPBinding(parameterStyle = SOAPBinding.ParameterStyle.BARE)
    @Oneway
    @WebMethod(action = "seekBookInBasement")
    public void seekBookInBasement(
        @WebParam(partName = "body", name = "SearchInBasementFor", 
                  targetNamespace = "http://types.talend.org/demos/Library/Common/1.0")
        org.talend.types.demos.library.common._1.SearchInBasementFor body
    );
   ...
}

A JAX-WS client must be created on the provider side to send a callback because the callback message is sent to a separate endpoint opened on the consumer side. The Client object may be set up as a jaxws:client bean in Spring configuration.

Provider side jaxws:client configuration example

<jaxws:client xmlns:library="http://services.talend.org/demos/Library/1.0"
        id="callbackResponseClient" serviceName="library:LibraryConsumer"
        endpointName="library:LibraryConsumerPort" address="jms://"
        serviceClass="org.talend.services.demos.library._1_0.LibraryConsumer">
    <jaxws:features>
        <bean class="org.apache.cxf.feature.LoggingFeature"/>
    </jaxws:features>
</jaxws:client>

Like the service provider, the callback client must be prepared for the request-callback exchange before it can be used. The CallContext class provides the setupCallbackProxy method for this purpose: once it has been applied to the callback proxy object, the proxy is ready to use.

Each request-callback message has a call context object associated with it, which is necessary for proper handling of request-callback messages. The call context contains information such as the call ID, the callback ID, the address of the endpoint to which the reply should be sent, and so on. Call context objects are represented by the class org.talend.esb.mep.requestcallback.feature.CallContext. When handling a request, the call context must be extracted from the message, at the very least to provide the call ID of the message to which the callback message relates.

In the following example, the callback proxy object is injected into the service provider and the callback is sent from the same thread in which the request was received. This is rare in real-world scenarios and is done here only for illustration.

The request-callback processing on the provider side is shown below:

Request-Callback processing on provider side example

public class LibraryServerImpl implements Library {
  ...

    // Injected jaxws:client (see xml config above)
    private LibraryConsumer callbackResponseClient;

    @Resource
    private WebServiceContext wsContext;

    @Override
    public void seekBookInBasement(SearchInBasementFor body) {

        // Extract call context from message
        CallContext ctx = CallContext.getCallContext(wsContext.getMessageContext());
        
        // Some business logic here
        ListOfBooks result = new ListOfBooks();
        BookType book = new BookType();
        result.getBook().add(book);
        PersonType author = new PersonType();
        book.getAuthor().add(author);
        author.setFirstName("John");
        author.setLastName("Stripycat");
        Calendar dateOfBirth = new GregorianCalendar(202, Calendar.MAY, 17);
        author.setDateOfBirth(dateOfBirth.getTime());
        book.getTitle().add("Hunting basement inhabitants");
        book.getPublisher().add("Dusty Edition");
        book.setYearPublished("2013");

        // Set up necessary interceptors for callback proxy object
        ctx.setupCallbackProxy(callbackResponseClient);

        // Send callback message back to client
        callbackResponseClient.seekBookInBasementResponse(result);
    }
  ...
}
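In a longer-running scenario the callback would typically be sent from a different thread than the one that received the request. The following plain-Java simulation sketches that hand-off under simplifying assumptions: the call ID is captured on the request thread, the work is passed to an executor, and the callback is sent when the work finishes. DeferredCallbackSketch and CallbackSender are hypothetical names that stand in for the provider implementation and the callback client proxy. No Talend ESB or CXF classes are used, so the sketch does not show how a real CallContext is carried across threads.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Plain-Java simulation; every type here is hypothetical, not Talend ESB API. */
final class DeferredCallbackSketch {

    /** Stand-in for the callback client proxy. */
    interface CallbackSender {
        void send(String callId, String payload);
    }

    private final ExecutorService worker = Executors.newSingleThreadExecutor();
    private final CallbackSender sender;

    DeferredCallbackSketch(CallbackSender sender) {
        this.sender = sender;
    }

    /** Called on the request thread: only schedules the work, then returns. */
    void onRequest(String callId, String authorLastName) {
        // The call ID is captured here so the worker can correlate the callback.
        worker.execute(() -> sender.send(callId, "Books by " + authorLastName));
    }

    /** Stops accepting requests and waits for pending callbacks to be sent. */
    boolean shutdownAndWait(long timeoutSeconds) {
        worker.shutdown();
        try {
            return worker.awaitTermination(timeoutSeconds, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /** Runs one request through the sketch and returns the callbacks that were sent. */
    static List<String> demo() {
        List<String> sent = new CopyOnWriteArrayList<>();
        DeferredCallbackSketch provider = new DeferredCallbackSketch(
            (callId, payload) -> sent.add(callId + ": " + payload));
        provider.onRequest("urn:uuid:example-call", "Stripycat");
        provider.shutdownAndWait(5);
        return sent;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```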

Updating the Service Consumer

The consumer side configuration includes setting up a CXF client that sends the request to the provider and a CXF endpoint that receives callback messages from the provider. The client may be set up using Spring-based configuration. RequestCallbackFeature must be added to the client for it to work properly. The following is an example of the client Spring configuration:

CXF client configuration on consumer side

<jaxws:client xmlns:library="http://services.talend.org/demos/Library/1.0"
    id="library" serviceName="library:LibraryProvider"
    endpointName="library:LibraryJmsPort"
    address="jms:jndi:dynamicQueues/library.queue?jndiInitialContextFactory=org.apache.activemq.jndi.ActiveMQInitialContextFactory&amp;jndiConnectionFactoryName=ConnectionFactory&amp;jndiURL=tcp://localhost:61616"
    serviceClass="org.talend.services.demos.library._1_0.Library">
    <jaxws:features>
        <bean class="org.talend.esb.mep.requestcallback.feature.RequestCallbackFeature"/>
    </jaxws:features>
    <jaxws:properties>
        <entry key="org.talend.esb.mep.requestcallback.CallbackEndpoint">
            <ref bean="custTestServiceConsumerEndpoint"/>
        </entry>
    </jaxws:properties>
</jaxws:client>

The callback-receiving endpoint on the consumer side can be set up via Spring configuration as well:

Callback-receiving endpoint Spring configuration

<jaxws:endpoint xmlns:library="http://services.talend.org/demos/Library/1.0"
    id="LibraryConsumerJMS"
    address="jms:jndi:dynamicQueues/libraryconsumer.queue?jndiInitialContextFactory=org.apache.activemq.jndi.ActiveMQInitialContextFactory&amp;jndiConnectionFactoryName=ConnectionFactory&amp;jndiURL=tcp://localhost:61616"
    serviceName="library:LibraryConsumer" endpointName="library:LibraryConsumerPort"
    implementor="org.talend.services.demos.client.LibraryConsumerImpl">
    <jaxws:features>
        <bean class="org.talend.esb.mep.requestcallback.feature.RequestCallbackFeature"/>
        <bean class="org.apache.cxf.feature.LoggingFeature"/>
    </jaxws:features>
    <jaxws:properties>
        <entry key="jaxws.provider.interpretNullAsOneway" value="true"/>
    </jaxws:properties>
</jaxws:endpoint>

After both the provider and the consumer are set up, the request-callback message exchange can take place. The following is an example of sending a request:

Sending request from consumer to provider

public class LibraryTester {
    ...

    /** The Library proxy will be injected either by spring or by a direct call to the setter  */
    Library library;

    public void testRequestCallbackPositive() throws SeekBookError {

        // Create new request object and fill it with
        // some business data
        SearchInBasementFor request = new SearchInBasementFor();
        request.getAuthorLastName().add("Stripycat");

        // Add correlation info map to the request context
        Map<String, Object> rctx = ((BindingProvider) library).getRequestContext();
        Map<String, Object> correlationInfo = new HashMap<String, Object>();
        rctx.put(RequestCallbackFeature.CALL_INFO_PROPERTY_NAME, correlationInfo);

        // Send request to the provider
        library.seekBookInBasement(request);
    }
    ...
}

The CallContext object associated with a request-callback message contains various information about that message. The following code snippet shows how to read this information on the consumer side when a callback message is received.

Receiving callback message

@WebServiceProvider
@Features(features = { "org.talend.esb.mep.requestcallback.feature.RequestCallbackFeature"})
public class LibraryConsumerImpl implements LibraryConsumer {
    @Resource
    private WebServiceContext wsContext;
    public void seekBookInBasementResponse(ListOfBooks body) {
      
        CallContext ctx = CallContext.getCallContext(wsContext.getMessageContext());
        System.out.println("Info from CallContext:");
        if (ctx == null) {
            System.out.println("- no CallContext");
        } else {
            System.out.println("- Call ID is " + ctx.getCallId());
            System.out.println("- Callback ID is " + ctx.getCallbackId());
        }
    }
    ...
}
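Because the callback arrives on a separate endpoint, the consumer usually needs some way to hand the result back to the code that issued the request. One possible approach, sketched below in plain Java, is a registry of pending calls keyed by call ID that the callback implementation completes when a response or fault arrives. PendingCallRegistry is a hypothetical class, not part of Talend ESB, and the sketch assumes the consumer knows the call ID of each outgoing request.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical consumer-side helper; not part of the Talend ESB API. */
final class PendingCallRegistry<T> {

    private final Map<String, CompletableFuture<T>> pending = new ConcurrentHashMap<>();

    /** Registers an outgoing request and returns a future for its callback. */
    CompletableFuture<T> register(String callId) {
        return pending.computeIfAbsent(callId, id -> new CompletableFuture<>());
    }

    /** Called from the callback operation; returns false for unknown call IDs. */
    boolean complete(String callId, T response) {
        CompletableFuture<T> future = pending.remove(callId);
        return future != null && future.complete(response);
    }

    /** Called from the fault callback operation; returns false for unknown call IDs. */
    boolean fail(String callId, Throwable fault) {
        CompletableFuture<T> future = pending.remove(callId);
        return future != null && future.completeExceptionally(fault);
    }

    /** Number of requests still waiting for a callback. */
    int pendingCount() {
        return pending.size();
    }

    public static void main(String[] args) {
        PendingCallRegistry<String> registry = new PendingCallRegistry<>();
        CompletableFuture<String> books = registry.register("urn:uuid:example-call");
        // Later, inside the callback operation, using the call ID from the call context:
        registry.complete("urn:uuid:example-call", "Hunting basement inhabitants");
        System.out.println("Callback result: " + books.join());
    }
}
```

In the LibraryConsumerImpl shown above, complete would be called from seekBookInBasementResponse with the value of ctx.getCallId(), and fail from the fault operation.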

Project dependencies

To work with the Request-Callback message exchange, the following dependencies should be added:

Request-Callback message exchange pattern dependency

<dependency>
    <groupId>org.talend.esb.mep</groupId>
    <artifactId>request-callback</artifactId>
    <version>${project.version}</version>
</dependency>
<dependency>
    <groupId>org.talend.esb</groupId>
    <artifactId>transport-jms</artifactId>
    <version>${project.version}</version>
</dependency>

The CXF dependencies should be added as well:

CXF dependencies

<dependency>
    <groupId>org.apache.cxf</groupId>
    <artifactId>cxf-rt-frontend-jaxws</artifactId>
    <version>${cxf.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.cxf</groupId>
    <artifactId>cxf-rt-transports-http</artifactId>
    <version>${cxf.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.cxf</groupId>
    <artifactId>cxf-rt-ws-security</artifactId>
    <version>${cxf.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.cxf.xjc-utils</groupId>
    <artifactId>cxf-xjc-runtime</artifactId>
    <version>${cxf.xjc.version}</version>
</dependency>

To configure CXF endpoints and clients via Spring dependency injection, the Spring framework dependency should be added:

Spring framework dependency

<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-web</artifactId>
    <version>${spring.version}</version>
</dependency>