Scatter-Gather - 6.3

Talend ESB Mediation Developer Guide

The Scatter-Gather pattern from the EIP patterns allows you to route messages to a number of dynamically specified recipients and re-aggregate the responses into a single message.

Dynamic Scatter-Gather Example

In this example we want to get the best quote for beer from several different vendors. We use a dynamic Recipient List to get the request for a quote to all vendors and an Aggregator to pick the best quote out of all the responses. The routes for this are defined as:

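<camelContext xmlns="">
  <route>
    <from uri="direct:start"/>
    <recipientList><header>listOfVendors</header></recipientList>
  </route>
  <route>
    <from uri="seda:quoteAggregator"/>
    <aggregate strategyRef="aggregatorStrategy" completionTimeout="1000">
      <correlationExpression><header>quoteRequestId</header></correlationExpression>
      <to uri="mock:result"/>
    </aggregate>
  </route>
</camelContext>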

In the first route, the Recipient List reads the listOfVendors header to obtain the list of recipients, so we need to send a message like this:

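Map<String, Object> headers = new HashMap<String, Object>();
headers.put("listOfVendors", "bean:vendor1, bean:vendor2, bean:vendor3");
headers.put("quoteRequestId", "quoteRequest-1");
// template is a Camel ProducerTemplate; direct:start is the entry of the first route
template.sendBodyAndHeaders("direct:start",
    "<quote_request item=\"beer\"/>", headers);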

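To make the header format concrete, here is a minimal plain-Java sketch of how a comma-separated header value like the one above could be broken into individual endpoint URIs. It does not use Camel and is not Camel's implementation; the class name `RecipientHeaderSketch` and the method `splitRecipients` are hypothetical, and ignoring whitespace around each entry is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.List;

public class RecipientHeaderSketch {

    // Hypothetical helper: split a comma-separated header value into endpoint URIs.
    public static List<String> splitRecipients(String headerValue) {
        List<String> uris = new ArrayList<>();
        for (String part : headerValue.split(",")) {
            String uri = part.trim(); // assumption: whitespace around an entry is ignored
            if (!uri.isEmpty()) {
                uris.add(uri);
            }
        }
        return uris;
    }

    public static void main(String[] args) {
        // Same header value as in the message sent to direct:start
        for (String uri : splitRecipients("bean:vendor1, bean:vendor2, bean:vendor3")) {
            System.out.println(uri);
        }
    }
}
```

Each resulting entry names one recipient, which is why a single header value is enough to address all three vendors.
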
This message will be distributed to the following endpoints: bean:vendor1, bean:vendor2, and bean:vendor3. These are all beans that look like this:

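public class MyVendor {
    private int beerPrice;

    // POJO Producing: exchanges sent through this template go to seda:quoteAggregator
    @Produce(uri = "seda:quoteAggregator")
    private ProducerTemplate quoteAggregator;

    public MyVendor(int beerPrice) {
        this.beerPrice = beerPrice;
    }

    public void getQuote(@XPath("/quote_request/@item") String item, 
        Exchange exchange) throws Exception {
        if ("beer".equals(item)) {
            // set the price on the exchange and forward it to the aggregator route
            exchange.getIn().setBody(beerPrice);
            quoteAggregator.send(exchange);
        } else {
            throw new Exception("No quote available for " + item);
        }
    }
}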

They are loaded in Spring like this:

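<bean id="aggregatorStrategy" class=
    "org.apache.camel.spring.processor.scattergather.LowestQuoteAggregationStrategy"/>

<!-- Each vendor receives a different beer price through the MyVendor constructor -->
<bean id="vendor1" class="org.apache.camel.spring.processor.scattergather.MyVendor">
  <constructor-arg value="1"/>
</bean>

<bean id="vendor2" class="org.apache.camel.spring.processor.scattergather.MyVendor">
  <constructor-arg value="2"/>
</bean>

<bean id="vendor3" class="org.apache.camel.spring.processor.scattergather.MyVendor">
  <constructor-arg value="3"/>
</bean>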

Each bean is loaded with a different price for beer. When the message is sent to each bean endpoint, it will arrive at the MyVendor.getQuote method. This method does a simple check whether this quote request is for beer and then sets the price of beer on the exchange for retrieval at a later step. The message is forwarded on to the next step using POJO Producing (see the @Produce annotation).

At the next step we want to take the beer quotes from all vendors and find out which one was the best (i.e. the lowest!). To do this we use an Aggregator with a custom aggregation strategy. The Aggregator needs to be able to compare only the messages from this particular quote; this is easily done by specifying a correlationExpression equal to the value of the quoteRequestId header. As shown above in the message sending snippet, we set this header to quoteRequest-1. This correlation value should be unique or you may include responses that are not part of this quote. To pick the lowest quote out of the set, we use a custom aggregation strategy like

public class LowestQuoteAggregationStrategy 
    implements AggregationStrategy {
    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
        // the first time we only have the new exchange
        if (oldExchange == null) {
            return newExchange;
        }

        // keep whichever exchange carries the lower price
        if (oldExchange.getIn().getBody(int.class) 
            < newExchange.getIn().getBody(int.class)) {
            return oldExchange;
        } else {
            return newExchange;
        }
    }
}
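
The role of the correlation value can be illustrated without Camel. The following plain-Java sketch groups quotes by their request ID and keeps the lowest price in each group, which is roughly what the Aggregator and the strategy achieve together. The class `CorrelatedLowestQuoteSketch`, the nested `Quote` type, and the second request ID `quoteRequest-2` are hypothetical names introduced only for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class CorrelatedLowestQuoteSketch {

    // Hypothetical stand-in for a vendor response: correlation ID plus quoted price.
    public static final class Quote {
        final String requestId;
        final int price;

        public Quote(String requestId, int price) {
            this.requestId = requestId;
            this.price = price;
        }
    }

    // Keep the lowest price seen so far for each correlation ID.
    public static Map<String, Integer> lowestPerRequest(List<Quote> quotes) {
        Map<String, Integer> best = new TreeMap<>();
        for (Quote q : quotes) {
            // the first quote for an ID is stored as-is; later ones are compared with it
            best.merge(q.requestId, q.price, Math::min);
        }
        return best;
    }

    public static void main(String[] args) {
        List<Quote> quotes = new ArrayList<>();
        quotes.add(new Quote("quoteRequest-1", 3));
        quotes.add(new Quote("quoteRequest-2", 5)); // a different request, kept separate
        quotes.add(new Quote("quoteRequest-1", 1));
        quotes.add(new Quote("quoteRequest-1", 2));
        quotes.add(new Quote("quoteRequest-2", 4));
        System.out.println(lowestPerRequest(quotes));
    }
}
```

If two unrelated requests shared the same ID, their quotes would land in the same group and be compared against each other, which is the mix-up a unique correlation value avoids.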

Finally, we expect to get the lowest quote of $1 out of $1, $2, and $3.

result.expectedBodiesReceived(1); // expect the lowest quote

You can find the full example source here:



Static Scatter-Gather Example

You can lock down which recipients are used in the Scatter-Gather by using a static Recipient List. It looks something like this:

from("direct:start").multicast().to("seda:vendor1", "seda:vendor2", 
    "seda:vendor3");

from("seda:quoteAggregator")
    .aggregate(header("quoteRequestId"), new LowestQuoteAggregationStrategy())
    .to("mock:result");
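
As a rough, Camel-free illustration of the same idea, the sketch below scatters one request to a fixed list of vendors and gathers the lowest answer. It is not how Camel implements multicast or aggregation; the class `StaticScatterGatherSketch`, the method `lowestQuote`, and the modelling of vendors as plain functions are assumptions made for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

public class StaticScatterGatherSketch {

    // Scatter the request to every vendor in a fixed list, then gather the lowest price.
    // Returns Integer.MAX_VALUE when the vendor list is empty.
    public static int lowestQuote(String item, List<Function<String, Integer>> vendors) {
        List<CompletableFuture<Integer>> pending = new ArrayList<>();
        for (Function<String, Integer> vendor : vendors) {
            // scatter: each vendor is asked asynchronously
            pending.add(CompletableFuture.supplyAsync(() -> vendor.apply(item)));
        }
        int lowest = Integer.MAX_VALUE;
        for (CompletableFuture<Integer> quote : pending) {
            // gather: wait for each answer and keep the lowest one
            lowest = Math.min(lowest, quote.join());
        }
        return lowest;
    }

    public static void main(String[] args) {
        // The recipients are fixed in code instead of being read from a header.
        List<Function<String, Integer>> vendors = new ArrayList<>();
        vendors.add(item -> 1);
        vendors.add(item -> 2);
        vendors.add(item -> 3);
        System.out.println(lowestQuote("beer", vendors));
    }
}
```

Because the vendor list is built in code, a caller cannot redirect the request to other recipients, which is the main difference from the dynamic variant driven by the listOfVendors header.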