Notification - 6.3

Talend ESB Service Developer Guide

EnrichVersion
6.3
EnrichProdName
Talend Data Fabric
Talend Data Services Platform
Talend ESB
Talend MDM Platform
Talend Open Studio for ESB
Talend Real-Time Big Data Platform
task
Design and Development
Installation and Upgrade
EnrichPlatform
Talend ESB

Notification is a one-way message exchange in which a message is sent from the provider to the consumer. In Talend ESB, the Notification Message Exchange Pattern has been implemented using CXF, by setting up a CXF service on the consumer side and CXF service proxy on the provider side.

This following figure represents the Notification Message Exchange Pattern:

WSDL for Notification Message Exchange

In this message exchange, a service provider sends a message to a consumer, without expecting a reply. The service operation must only contain an output message, and the input message is not defined.

The operation in the WSDL looks like the following:

<portType name="Library">
  .....
  <operation name="newBooks" tmep:mep="notification">
    <input message="tns:newBooksNotification" />
  </operation>
</portType>

<binding name="LibraryNotificationJmsSoap" type="tns:Library">
  <soap:binding style="document" transport="http://www.w3.org/2010/soapjms/" />
  <operation name="newBooks">
    <soap:operation soapAction="newBooks" />
    <input>
      <soap:body use="literal" />
    </input>
  </operation>
</binding>

In this case, the provider is the initiator of the communication (it sends the messages), and the consumer is recipient of the messages.

Dependencies and Code Generation

The following dependencies should be added to the project to enable notification functionality:

In the pom.xml file of the project:

<dependencies>
    <!-- TESB -->
    <dependency>
        <groupId>org.talend.esb</groupId>
        <artifactId>transport-jms</artifactId>
        <version>${tesb.version}</version>
    </dependency>
         
    <!-- CXF -->
    <dependency>
        <groupId>org.apache.cxf</groupId>
        <artifactId>cxf-rt-frontend-jaxws</artifactId>
        <version>${cxf.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.cxf</groupId>
        <artifactId>cxf-rt-transports-http</artifactId>
        <version>${cxf.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.cxf</groupId>
        <artifactId>cxf-rt-ws-security</artifactId>
        <version>${cxf.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.cxf.xjc-utils</groupId>
        <artifactId>cxf-xjc-runtime</artifactId>
        <version>${cxf.xjc.version}</version>
    </dependency>

    <!-- Third parties -->
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-web</artifactId>
        <version>${spring.version}</version>
    </dependency>
 
    <!-- logging -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>${slf4j.version}</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>${slf4j.version}</version>
    </dependency>
    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>${log4j.version}</version>
    </dependency>
</dependencies>

The following plugin must be added to the Maven build cycle to include the CXF code generator:

CXF code genration plugin

<plugins>
   <plugin>
      <groupId>org.apache.cxf</groupId>
      <artifactId>cxf-codegen-plugin</artifactId>
      <version>${cxf.version}</version>
      <executions>
         <execution>
            <id>generate-sources</id>
            <phase>generate-sources</phase>
            <configuration>
               <wsdlOptions>
                  <wsdlOption>
                     <wsdl>src/main/sr-resources/Library.wsdl</wsdl>
                     <frontEnd>jaxws21</frontEnd>
                     <faultSerialVersionUID>1</faultSerialVersionUID>
                     <bindingFiles>
                        <bindingFile>src/main/sr-resources/binding.xml</bindingFile>
                     </bindingFiles>
                  </wsdlOption>
               </wsdlOptions>
            </configuration>
            <goals>
               <goal>wsdl2java</goal>
            </goals>
         </execution>
      </executions>
   </plugin>

Setting up the Service Provider

A CXF client should be set up on the provider side to send messages to the consumer.

Spring-based client set up via Spring

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:jaxws="http://cxf.apache.org/jaxws"
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:soap="http://cxf.apache.org/bindings/soap"
    xsi:schemaLocation="
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://cxf.apache.org/bindings/soap http://cxf.apache.org/schemas/configuration/soap.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
http://cxf.apache.org/jaxws http://cxf.apache.org/schemas/jaxws.xsd">

    <import resource="classpath:/META-INF/cxf/cxf.xml"/>

    <!-- CXF 3 JMS configuration style -->
    <import resource="classpath:/META-INF/tesb/tesb-cxf-transport-jms.xml"/>

    <!-- CXF client that sends notifications -->
    <jaxws:client xmlns:library="http://services.talend.org/demos/Library/1.0"
        id="publisherClient" serviceName="library:LibraryProvider"
        endpointName="library:LibraryTopicPort"
        address="jms:jndi-topic:dynamicTopics/newBooksTopic.topic?jndiInitialContextFactory=org.apache.activemq.jndi.ActiveMQInitialContextFactory&amp
;jndiConnectionFactoryName=ConnectionFactory&amp;jndiURL=tcp://localhost:61616"
        serviceClass="org.talend.services.demos.library._1_0.Library">
        <jaxws:features>
            <bean class="org.apache.cxf.ws.addressing.WSAddressingFeature"/>
        </jaxws:features>
    </jaxws:client>

    <bean id="libraryPublisher" class="org.talend.services.demos.server.LibraryPublisher">
        <property name="library" ref="publisherClient"/>
    </bean>
</beans>

Here is an example of how notification-sending method in LibraryPublisher class may be implemented:

Notification sender

public class LibraryPublisher {
    /** The Library proxy will be injected either by spring or by a direct call to the setter  */
    Library library;

    public Library getLibrary() {
        return library;
    }

    public void setLibrary(Library library) {
        this.library = library;
    }

    public void publishNewBooksNotifications() throws InterruptedException {
        List<BookType> newBooks = new LinkedList<BookType>();
        BookType book = new BookType();
        newBooks.add(book);
        PersonType author = new PersonType();
        book.getAuthor().add(author);
        author.setFirstName("Jack");
        author.setLastName("Icebear");
        Calendar dateOfBirth = new GregorianCalendar(101, Calendar.JANUARY, 2);
        author.setDateOfBirth(dateOfBirth.getTime());
        book.getTitle().add("More About Survival in the Arctic - Volume " + ndx);
        book.getPublisher().add("Frosty Edition");
        book.setYearPublished("2011");

        library.newBooks(new Date(), newBooks);
    }
}

Here, the library object is injected via Spring.

Setting up the Service Consumer

The CXF endpoint that listens for notifications should be defined on the consumer side. Here is how it looks like in Spring configuration file:

Setting up consumer side

<?xml version="1.0" encoding="UTF-8"?>
 
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
    xmlns:jaxws="http://cxf.apache.org/jaxws" xmlns:cxf="http://cxf.apache.org/core"
    xmlns:p="http://www.springframework.org/schema/p" xmlns:library="http://services.talend.org/demos/Library/1.0"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
        http://cxf.apache.org/core http://cxf.apache.org/schemas/core.xsd
        http://cxf.apache.org/jaxws http://cxf.apache.org/schemas/jaxws.xsd">
 
    <import resource="classpath:META-INF/cxf/cxf.xml" />
 
    <!-- CXF 3 JMS configuration style -->
    <import resource="classpath:META-INF/tesb/tesb-cxf-transport-jms.xml" />
 
 
    <jaxws:endpoint xmlns:library="http://services.talend.org/demos/Library/1.0"
        id="LibraryNotificationReceiver"
        address="jms:jndi-topic:dynamicTopics/newBooksTopic.topic?jndiInitialContextFactory=org.apache.activemq.jndi.ActiveMQInitialContextFactory&amp
;jndiConnectionFactoryName=ConnectionFactory&amp;jndiURL=tcp://localhost:61616"
        serviceName="library:LibraryProvider" endpointName="library:LibraryTopicPort"
        implementor="org.talend.services.demos.client.LibraryNotificationReceiverImpl">
        <jaxws:features>
            <bean class="org.apache.cxf.ws.addressing.WSAddressingFeature" />          
        </jaxws:features>
    </jaxws:endpoint>
 
</beans>

The following is an example of the implementation of the notification-receiving service:

Consumer side notification receiver

@@WebServiceProvider
public class LibraryNotificationReceiverImpl implements Library {

    @Resource
    private WebServiceContext wsContext;

    ...

    @Override
    public void newBooks(Date listDate, List<BookType> book) {
        System.out.println("****************************************************");
        System.out.println("*** newBooks notification is received **************");
        System.out.println("****************************************************");

        System.out.println("New books notification:");

        // Business logic here
    }
}